awakeBird Back-end Dev Engineer

Gunicorn源码阅读(三) Worker进程

2019-02-26

为了消除版本影响,本篇将 gunicorn 版本更新为19.9.0

Worker 进程

Worker 进程是真正接收并处理请求的进程,它也是个 loop,不断调用listen()来接收并处理新的请求。

初始化

Gunicorn 有很多 Worker 类型,我们先从默认的SyncWorker看起,Worker 的入口是/workers/base.pyinit_process()函数,它会做一些初始化的工作,包括创建 WorkerTmp、创建管道、加载 reloader、注册信号处理函数、加载 WSGI application,最后加载各自 Worker 的run()方法。

WorkerTmp 是 Worker 的一个属性,内部维护了一个打开的文件描述符,它会被 Master 进程循环检查文件的更新时间,如果发现长时间未更新就会被 Master 进程 kill 掉,默认时间是 30s。notify()方法会更改文件的访问权限来将更新时间同步,后面会多次看到这个方法的身影。

class WorkerTmp(object):

    def __init__(self, cfg):
        old_umask = os.umask(cfg.umask)
        fdir = cfg.worker_tmp_dir
        if fdir and not os.path.isdir(fdir):
            raise RuntimeError("%s doesn't exist. Can't create workertmp." % fdir)
        fd, name = tempfile.mkstemp(prefix="wgunicorn-", dir=fdir)

        # allows the process to write to the file
        util.chown(name, cfg.uid, cfg.gid)
        os.umask(old_umask)

        # unlink the file so we don't leak tempory files
        try:
            if not IS_CYGWIN:
                util.unlink(name)
            self._tmp = os.fdopen(fd, 'w+b', 1)
        except:
            os.close(fd)
            raise

        self.spinner = 0

    def notify(self):
        self.spinner = (self.spinner + 1) % 2
        os.fchmod(self._tmp.fileno(), self.spinner)

    def last_update(self):
        return os.fstat(self._tmp.fileno()).st_ctime

    def fileno(self):
        return self._tmp.fileno()

    def close(self):
        return self._tmp.close()

与 Master 进程类似,Worker 在注册信号处理函数时会向管道的写入端写入字节来唤醒 loop。

def init_signals(self):
    if hasattr(signal, 'set_wakeup_fd'):
        signal.set_wakeup_fd(self.PIPE[1])

Reloader 是 gunicorn 负责重新加载 WSGI application 的「监视器」,它是一个线程对象,它在启动后会监控工作目录下的.py文件,一旦这些文件被更改,Worker 进程就会终止,并由 Master 进程重新创建新的 Worker

if self.cfg.reload:
    def changed(fname):
        self.log.info("Worker reloading: %s modified", fname)
        self.alive = False
        self.cfg.worker_int(self)
        time.sleep(0.1)
        sys.exit(0)

    reloader_cls = reloader_engines[self.cfg.reload_engine]
    self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files,
                                 callback=changed)
    self.reloader.start()

class Reloader(threading.Thread):
    def __init__(self, extra_files=None, interval=1, callback=None):
        super(Reloader, self).__init__()
        self.setDaemon(True)
        self._extra_files = set(extra_files or ())
        self._extra_files_lock = threading.RLock()
        self._interval = interval
        self._callback = callback

    def add_extra_file(self, filename):
        with self._extra_files_lock:
            self._extra_files.add(filename)

    def get_files(self):
        fnames = [
            COMPILED_EXT_RE.sub('py', module.__file__)
            for module in tuple(sys.modules.values())
            if getattr(module, '__file__', None)
        ]

        with self._extra_files_lock:
            fnames.extend(self._extra_files)

        return fnames

    def run(self):
        mtimes = {}
        while True:
            for filename in self.get_files():
                try:
                    mtime = os.stat(filename).st_mtime
                except OSError:
                    continue
                old_time = mtimes.get(filename)
                if old_time is None:
                    mtimes[filename] = mtime
                    continue
                elif mtime > old_time:
                    if self._callback:
                        self._callback(filename)
            time.sleep(self._interval)

前面讲过,WSGI application 必须是一个 callable 对象,这里加载的动作就是把这个对象加载进来,作为wsgi成员变量。

def load_wsgi(self):
    try:
        self.wsgi = self.app.wsgi()
    except SyntaxError as e:
        # ...

真正的加载动作在WSGIApplication里,通过 uri 获取 module 和 obj,分别对 module 和 obj 进行__import__eval

class WSGIApplication(Application):
    def load_wsgiapp(self):
        return util.import_app(self.app_uri)

def import_app(module):
    parts = module.split(":", 1)
    if len(parts) == 1:
        module, obj = module, "application"
    else:
        module, obj = parts[0], parts[1]

    try:
        __import__(module)
    except ImportError:
        # ...

    mod = sys.modules[module]

    is_debug = logging.root.level == logging.DEBUG
    try:
        app = eval(obj, vars(mod))
    # ...

loop

接下来到了 Woker 的run()方法,它会将从 Master 进程继承来的 socket 设置为 ,之后根据 socket 的数目调用相应的方法,差别不大,这里看只有一个 socket 的情况。这里就是 Woker 进程的 loop 所在,它会通过accept()来尝试处理请求,通过wait()方法来控制 loop 的进行。

def run(self):
    timeout = self.timeout or 0.5
    for s in self.sockets:
        s.setblocking(0)

    if len(self.sockets) > 1:
        self.run_for_multiple(timeout)
    else:
        self.run_for_one(timeout)

def run_for_one(self, timeout):
    listener = self.sockets[0]
    while self.alive:
        self.notify()

        try:
            self.accept(listener)
            continue
        # ...

        if not self.is_parent_alive():
            return

        try:
            self.wait(timeout)
        except StopWaiting:
            return

先看wait()方法,通过 select 来轮训检测self.wait_fds是否就绪,这里self.wait_fdsself.wait_fds = self.sockets + [self.PIPE[0]],有 fd 就绪或超过 timeout 则返回进行下一轮 loop。可见,使 loop 立即开始的行为有下面两个:

  • 有 socket 处于就绪(可读)状态
  • 收到信号
    def wait(self, timeout):
      try:
          self.notify()
          ret = select.select(self.wait_fds, [], [], timeout)
          if ret[0]:
              if self.PIPE[0] in ret[0]:
                  os.read(self.PIPE[0], 1)
              return ret[0]
    

再看accept()方法,由于 listeners 都被设置了 Non-Block,如果 socket 未就绪,listener.accept()函数会抛出EAGAGIN错误并立即返回,反之则获得accept()之后的文件描述符,设置 block 后传入 handle 进行处理。这里就可以发现,在accept()调用后,SyncWorker 便开始进入处理一个请求的流程,同时进入阻塞状态,不能再相应其他的请求。

def accept(self, listener):
    client, addr = listener.accept()
    client.setblocking(1)
    util.close_on_exec(client)
    self.handle(listener, client, addr)

handler

进入 handler 之后,Worker 进程开始履行 gunicorn 作为 WSGI 的职责,即

  • 接收 HTTP 请求
  • 解析 HTTP 请求
  • 将HTTP请求进行封装并传给 application
  • 获得 application 返回并返回 HTTP 响应

解析 HTTP 请求是通过RequestParser来进行的,它的__next__方法会读完之前的字节流,并重新创建Request对象并返回,Request会借助Unreader来读取 socket 中的字节流,解析出 HTTP 请求中的一些信息(header、body、path 等)。

def handle(self, listener, client, addr):
    req = None
    try:
        if self.cfg.is_ssl:
            client = ssl.wrap_socket(client, server_side=True,
                **self.cfg.ssl_options)

        parser = http.RequestParser(self.cfg, client)
        req = next(parser)
        self.handle_request(listener, req, client, addr)
    # ...

class RequestParser(Parser):

    mesg_class = Request

class Parser(object):

    mesg_class = None

    def __init__(self, cfg, source):
        self.cfg = cfg
        if hasattr(source, "recv"):
            self.unreader = SocketUnreader(source)
        else:
            self.unreader = IterUnreader(source)
        self.mesg = None

        # request counter (for keepalive connetions)
        self.req_count = 0

    def __iter__(self):
        return self

    def __next__(self):
        # Stop if HTTP dictates a stop.
        if self.mesg and self.mesg.should_close():
            raise StopIteration()

        # Discard any unread body of the previous message
        if self.mesg:
            data = self.mesg.body.read(8192)
            while data:
                data = self.mesg.body.read(8192)

        # Parse the next request
        self.req_count += 1
        self.mesg = self.mesg_class(self.cfg, self.unreader, self.req_count)
        if not self.mesg:
            raise StopIteration()
        return self.mesg

    next = __next_

handle_request()方法中会构建environrespenviron是一个一个包含所有 HTTP 请求信息的 dict 对象,而resp是一个Response对象,它是 HTTP 相应的封装,同时实现了start_response方法供 WSGI application 回调,之后调用 application 获得返回体(字节流),将它写入resp中,一个 HTTP 响应流程结束。

def handle_request(self, listener, req, client, addr):
    environ = {}
    resp = None
    try:
        self.cfg.pre_request(self, req)
        request_start = datetime.now()
        resp, environ = wsgi.create(req, client, addr,
                listener.getsockname(), self.cfg)
        # Force the connection closed until someone shows
        # a buffering proxy that supports Keep-Alive to
        # the backend.
        resp.force_close()
        self.nr += 1
        if self.nr >= self.max_requests:
            self.log.info("Autorestarting worker after current request.")
            self.alive = False
        respiter = self.wsgi(environ, resp.start_response)
        try:
            if isinstance(respiter, environ['wsgi.file_wrapper']):
                resp.write_file(respiter)
            else:
                for item in respiter:
                    # 写入 resp
                    resp.write(item)
            resp.close()
            request_time = datetime.now() - request_start
            self.log.access(resp, req, environ, request_time)
        finally:
            if hasattr(respiter, "close"):
                respiter.close()

GeventWorker

下来看看最常用的 GeventWorker 的实现与 SyncWorker 有哪些不同。

首先是run()方法,也是一个 loop,在 loop 中只是循环的调用notify()方法维持 Worker 进程存活,在这之前它会将 socket 交给 gevent 的StreamServer处理,同时用worker_connections控制最大连接数。其余部分与 SyncWorker 类似。

def run(self):
    servers = []
    ssl_args = {}

    if self.cfg.is_ssl:
        ssl_args = dict(server_side=True, **self.cfg.ssl_options)

    for s in self.sockets:
        s.setblocking(1)
        pool = Pool(self.worker_connections)
        if self.server_class is not None:
            environ = base_environ(self.cfg)
            environ.update({
                "wsgi.multithread": True,
                "SERVER_SOFTWARE": VERSION,
            })
            server = self.server_class(
                s, application=self.wsgi, spawn=pool, log=self.log,
                handler_class=self.wsgi_handler, environ=environ,
                **ssl_args)
        else:
            hfun = partial(self.handle, s)
            server = StreamServer(s, handle=hfun, spawn=pool, **ssl_args)

        server.start()
        servers.append(server)

    while self.alive:
        self.notify()
        gevent.sleep(1.0)

总结

Worker 进程也是一个 loop,也会创建管道来控制 loop 的频率。它借助 WorkerTmp 来监控进程的空闲状态,借助 Reloader 来监控程序文件并自动重启。loop 中接收请求并完成解析和处理,loop 的条件是alive属性为True,而SIGTERM的信号处理函数会将该属性设置为False,使 Worker 进入空闲状态后被 Master 回收,从而配合 Master 进程实现热重启。

Gunicorn 的整个源码阅读文章到这里就结束了,对 gunicorn 的实现不能做到面面俱到,只能挑个人认为比较重要的地方拿出来讲一讲。

(End)


Comments

Content