为了消除版本影响,本篇将 gunicorn 版本更新为19.9.0
。
Worker 进程
Worker 进程是真正接收并处理请求的进程,它也是个 loop,不断调用listen()
来接收并处理新的请求。
初始化
Gunicorn 有很多 Worker 类型,我们先从默认的SyncWorker
看起,Worker 的入口是/workers/base.py
的init_process()
函数,它会做一些初始化的工作,包括创建 WorkerTmp、创建管道、加载 reloader、注册信号处理函数、加载 WSGI application,最后加载各自 Worker 的run()
方法。
WorkerTmp 是 Worker 的一个属性,内部维护了一个打开的文件描述符,它会被 Master 进程循环检查文件的更新时间,如果发现长时间未更新就会被 Master 进程 kill 掉,默认时间是 30s。notify()
方法会更改文件的访问权限来将更新时间同步,后面会多次看到这个方法的身影。
class WorkerTmp(object):
def __init__(self, cfg):
old_umask = os.umask(cfg.umask)
fdir = cfg.worker_tmp_dir
if fdir and not os.path.isdir(fdir):
raise RuntimeError("%s doesn't exist. Can't create workertmp." % fdir)
fd, name = tempfile.mkstemp(prefix="wgunicorn-", dir=fdir)
# allows the process to write to the file
util.chown(name, cfg.uid, cfg.gid)
os.umask(old_umask)
# unlink the file so we don't leak tempory files
try:
if not IS_CYGWIN:
util.unlink(name)
self._tmp = os.fdopen(fd, 'w+b', 1)
except:
os.close(fd)
raise
self.spinner = 0
def notify(self):
self.spinner = (self.spinner + 1) % 2
os.fchmod(self._tmp.fileno(), self.spinner)
def last_update(self):
return os.fstat(self._tmp.fileno()).st_ctime
def fileno(self):
return self._tmp.fileno()
def close(self):
return self._tmp.close()
与 Master 进程类似,Worker 在注册信号处理函数时会向管道的写入端写入字节来唤醒 loop。
def init_signals(self):
if hasattr(signal, 'set_wakeup_fd'):
signal.set_wakeup_fd(self.PIPE[1])
Reloader 是 gunicorn 负责重新加载 WSGI application 的「监视器」,它是一个线程对象,它在启动后会监控工作目录下的.py
文件,一旦这些文件被更改,Worker 进程就会终止,并由 Master 进程重新创建新的 Worker
if self.cfg.reload:
def changed(fname):
self.log.info("Worker reloading: %s modified", fname)
self.alive = False
self.cfg.worker_int(self)
time.sleep(0.1)
sys.exit(0)
reloader_cls = reloader_engines[self.cfg.reload_engine]
self.reloader = reloader_cls(extra_files=self.cfg.reload_extra_files,
callback=changed)
self.reloader.start()
class Reloader(threading.Thread):
def __init__(self, extra_files=None, interval=1, callback=None):
super(Reloader, self).__init__()
self.setDaemon(True)
self._extra_files = set(extra_files or ())
self._extra_files_lock = threading.RLock()
self._interval = interval
self._callback = callback
def add_extra_file(self, filename):
with self._extra_files_lock:
self._extra_files.add(filename)
def get_files(self):
fnames = [
COMPILED_EXT_RE.sub('py', module.__file__)
for module in tuple(sys.modules.values())
if getattr(module, '__file__', None)
]
with self._extra_files_lock:
fnames.extend(self._extra_files)
return fnames
def run(self):
mtimes = {}
while True:
for filename in self.get_files():
try:
mtime = os.stat(filename).st_mtime
except OSError:
continue
old_time = mtimes.get(filename)
if old_time is None:
mtimes[filename] = mtime
continue
elif mtime > old_time:
if self._callback:
self._callback(filename)
time.sleep(self._interval)
前面讲过,WSGI application 必须是一个 callable 对象,这里加载的动作就是把这个对象加载进来,作为wsgi
成员变量。
def load_wsgi(self):
try:
self.wsgi = self.app.wsgi()
except SyntaxError as e:
# ...
真正的加载动作在WSGIApplication
里,通过 uri 获取 module 和 obj,分别对 module 和 obj 进行__import__
和eval
。
class WSGIApplication(Application):
def load_wsgiapp(self):
return util.import_app(self.app_uri)
def import_app(module):
parts = module.split(":", 1)
if len(parts) == 1:
module, obj = module, "application"
else:
module, obj = parts[0], parts[1]
try:
__import__(module)
except ImportError:
# ...
mod = sys.modules[module]
is_debug = logging.root.level == logging.DEBUG
try:
app = eval(obj, vars(mod))
# ...
loop
接下来到了 Woker 的run()
方法,它会将从 Master 进程继承来的 socket 设置为 ,之后根据 socket 的数目调用相应的方法,差别不大,这里看只有一个 socket 的情况。这里就是 Woker 进程的 loop 所在,它会通过accept()
来尝试处理请求,通过wait()
方法来控制 loop 的进行。
def run(self):
timeout = self.timeout or 0.5
for s in self.sockets:
s.setblocking(0)
if len(self.sockets) > 1:
self.run_for_multiple(timeout)
else:
self.run_for_one(timeout)
def run_for_one(self, timeout):
listener = self.sockets[0]
while self.alive:
self.notify()
try:
self.accept(listener)
continue
# ...
if not self.is_parent_alive():
return
try:
self.wait(timeout)
except StopWaiting:
return
先看wait()
方法,通过 select 来轮训检测self.wait_fds
是否就绪,这里self.wait_fds
是self.wait_fds = self.sockets + [self.PIPE[0]]
,有 fd 就绪或超过 timeout 则返回进行下一轮 loop。可见,使 loop 立即开始的行为有下面两个:
- 有 socket 处于就绪(可读)状态
- 收到信号
def wait(self, timeout): try: self.notify() ret = select.select(self.wait_fds, [], [], timeout) if ret[0]: if self.PIPE[0] in ret[0]: os.read(self.PIPE[0], 1) return ret[0]
再看accept()
方法,由于 listeners 都被设置了 Non-Block,如果 socket 未就绪,listener.accept()
函数会抛出EAGAGIN
错误并立即返回,反之则获得accept()
之后的文件描述符,设置 block 后传入 handle 进行处理。这里就可以发现,在accept()
调用后,SyncWorker 便开始进入处理一个请求的流程,同时进入阻塞状态,不能再相应其他的请求。
def accept(self, listener):
client, addr = listener.accept()
client.setblocking(1)
util.close_on_exec(client)
self.handle(listener, client, addr)
handler
进入 handler 之后,Worker 进程开始履行 gunicorn 作为 WSGI 的职责,即
- 接收 HTTP 请求
- 解析 HTTP 请求
- 将HTTP请求进行封装并传给 application
- 获得 application 返回并返回 HTTP 响应
解析 HTTP 请求是通过RequestParser
来进行的,它的__next__
方法会读完之前的字节流,并重新创建Request
对象并返回,Request
会借助Unreader
来读取 socket 中的字节流,解析出 HTTP 请求中的一些信息(header、body、path 等)。
def handle(self, listener, client, addr):
req = None
try:
if self.cfg.is_ssl:
client = ssl.wrap_socket(client, server_side=True,
**self.cfg.ssl_options)
parser = http.RequestParser(self.cfg, client)
req = next(parser)
self.handle_request(listener, req, client, addr)
# ...
class RequestParser(Parser):
mesg_class = Request
class Parser(object):
mesg_class = None
def __init__(self, cfg, source):
self.cfg = cfg
if hasattr(source, "recv"):
self.unreader = SocketUnreader(source)
else:
self.unreader = IterUnreader(source)
self.mesg = None
# request counter (for keepalive connetions)
self.req_count = 0
def __iter__(self):
return self
def __next__(self):
# Stop if HTTP dictates a stop.
if self.mesg and self.mesg.should_close():
raise StopIteration()
# Discard any unread body of the previous message
if self.mesg:
data = self.mesg.body.read(8192)
while data:
data = self.mesg.body.read(8192)
# Parse the next request
self.req_count += 1
self.mesg = self.mesg_class(self.cfg, self.unreader, self.req_count)
if not self.mesg:
raise StopIteration()
return self.mesg
next = __next_
handle_request()
方法中会构建environ
和resp
,environ
是一个一个包含所有 HTTP 请求信息的 dict 对象,而resp
是一个Response
对象,它是 HTTP 相应的封装,同时实现了start_response
方法供 WSGI application 回调,之后调用 application 获得返回体(字节流),将它写入resp
中,一个 HTTP 响应流程结束。
def handle_request(self, listener, req, client, addr):
environ = {}
resp = None
try:
self.cfg.pre_request(self, req)
request_start = datetime.now()
resp, environ = wsgi.create(req, client, addr,
listener.getsockname(), self.cfg)
# Force the connection closed until someone shows
# a buffering proxy that supports Keep-Alive to
# the backend.
resp.force_close()
self.nr += 1
if self.nr >= self.max_requests:
self.log.info("Autorestarting worker after current request.")
self.alive = False
respiter = self.wsgi(environ, resp.start_response)
try:
if isinstance(respiter, environ['wsgi.file_wrapper']):
resp.write_file(respiter)
else:
for item in respiter:
# 写入 resp
resp.write(item)
resp.close()
request_time = datetime.now() - request_start
self.log.access(resp, req, environ, request_time)
finally:
if hasattr(respiter, "close"):
respiter.close()
GeventWorker
下来看看最常用的 GeventWorker 的实现与 SyncWorker 有哪些不同。
首先是run()
方法,也是一个 loop,在 loop 中只是循环的调用notify()
方法维持 Worker 进程存活,在这之前它会将 socket 交给 gevent 的StreamServer
处理,同时用worker_connections
控制最大连接数。其余部分与 SyncWorker 类似。
def run(self):
servers = []
ssl_args = {}
if self.cfg.is_ssl:
ssl_args = dict(server_side=True, **self.cfg.ssl_options)
for s in self.sockets:
s.setblocking(1)
pool = Pool(self.worker_connections)
if self.server_class is not None:
environ = base_environ(self.cfg)
environ.update({
"wsgi.multithread": True,
"SERVER_SOFTWARE": VERSION,
})
server = self.server_class(
s, application=self.wsgi, spawn=pool, log=self.log,
handler_class=self.wsgi_handler, environ=environ,
**ssl_args)
else:
hfun = partial(self.handle, s)
server = StreamServer(s, handle=hfun, spawn=pool, **ssl_args)
server.start()
servers.append(server)
while self.alive:
self.notify()
gevent.sleep(1.0)
总结
Worker 进程也是一个 loop,也会创建管道来控制 loop 的频率。它借助 WorkerTmp 来监控进程的空闲状态,借助 Reloader 来监控程序文件并自动重启。loop 中接收请求并完成解析和处理,loop 的条件是alive
属性为True
,而SIGTERM
的信号处理函数会将该属性设置为False
,使 Worker 进入空闲状态后被 Master 回收,从而配合 Master 进程实现热重启。
Gunicorn 的整个源码阅读文章到这里就结束了,对 gunicorn 的实现不能做到面面俱到,只能挑个人认为比较重要的地方拿出来讲一讲。
(End)