首页 > temp > 简明python教程 >
-
tornado服务器实现原理
本文分析的tornado版本为1.0.0, 它的代码量比较少, 便于我们找到其核心部分. 在这里可以下载1.0.0版本的tornado.
一.基本流程
使用下面的代码实现一个最简单的tornado服务器:
import tornado.httpserver import tornado.ioloop import tornado.web class MainHandler(tornado.web.RequestHandler): def get(self): self.write('hello world') if __name__ == '__main__': application = tornado.web.Application( handlers=[ (r'/', MainHandler) ] ) http_server = tornado.httpserver.HTTPServer(application) http_server.listen(8000) tornado.ioloop.IOLoop.instance().start()
这里使用了tornado的httpserver, ioloop和web三个模块, 其中httpserver就是http服务器, 它负责接收和处理连接; ioloop则是底层的事件循环系统, 负责在监听到事件时进行通知; web模块就相当于web应用.
总的来说, 一个tornado服务器可以分为四层, 工作流程大致是下面这样:
上面这张图可能有点复杂, 一时看不懂没关系, 后面会进行详细的讲解.
二.异步非阻塞socket
tornado的高性能主要来自于ioloop.IOLoop和iostream.IOStream两个类, 前者是一个事件循环, 通过epoll对不同的socket对象进行监听和调度. IOStream类则是socket对象的封装, 它依靠着IOLoop的事件循环, 实现了对socket读写功能的非阻塞+异步回调.
ioloop.IOLoop的主要代码如下:
import select import logging class IOLoop(object): _EPOLLIN = 0x001 _EPOLLPRI = 0x002 _EPOLLOUT = 0x004 _EPOLLERR = 0x008 _EPOLLHUP = 0x010 _EPOLLRDHUP = 0x2000 _EPOLLONESHOT = (1 << 30) _EPOLLET = (1 << 31) # 可以监听的事件类型,从字面上理解就行 NONE = 0 READ = _EPOLLIN WRITE = _EPOLLOUT ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP def __init__(self): self._impl = select.epoll() # 在不支持epoll的系统中, 事件通知机制会退化为kqueue或者select self._handlers = {} @classmethod def instance(cls): # 需要IOLoop对象时,不直接实例化,而是调用这个类方法,这样可以保证IOLoop是单例的 if not hasattr(cls, "_instance"): cls._instance = cls() return cls._instance def add_handler(self, fd, handler, events): self._handlers[fd] = handler self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): self._handlers.pop(fd, None) try: self._impl.unregister(fd) except (OSError, IOError): logging.debug("Error deleting fd from IOLoop", exc_info=True) def start(self): while 1: event_pairs = self._impl.poll() for fd, events in event_pairs: self._handlers[fd](fd, events)
IOLoop的本质是对epoll的封装, 它的用法比较简单: 首先, 我们可以调用add&update&remove_handler方法来设置需要监听的句柄, 对应的事件和回调函数, 然后, 只要调用start方法, IOLoop就会使用epoll一直监听下去, 并且在监听到事件时, 调用对应的回调函数, 这样就实现了监听和调度的功能.
iostream.IOStream类的主要代码如下:
import errno import logging import socket class IOStream: def __init__(self, socket, io_loop, read_chunk_size=4096): self.socket = socket self.socket.setblocking(False) self.io_loop = io_loop self.read_chunk_size = read_chunk_size self._read_buffer = "" self._write_buffer = "" self._read_delimiter = None self._read_callback = None self._write_callback = None self._state = self.io_loop.ERROR self.io_loop.add_handler( self.socket.fileno(), self._handle_events, self._state) def read_until(self, delimiter, callback): loc = self._read_buffer.find(delimiter) if loc != -1: callback(self._consume(loc + len(delimiter))) return self._read_delimiter = delimiter self._read_callback = callback self._add_io_state(self.io_loop.READ) def write(self, data, callback=None): self._write_buffer += data self._add_io_state(self.io_loop.WRITE) self._write_callback = callback def _consume(self, loc): # 这个方法负责把读取缓冲区的指定长度截下来返回 result = self._read_buffer[:loc] self._read_buffer = self._read_buffer[loc:] return result def close(self): if self.socket is not None: self.io_loop.remove_handler(self.socket.fileno()) self.socket.close() self.socket = None def _add_io_state(self, state): # 调用这个方法添加要监听的事件 if not self._state & state: self._state = self._state | state self.io_loop.update_handler(self.socket.fileno(), self._state) def _handle_events(self, fd, events): # 这个方法由事件循环进行回调 # 它首先根据事件类型调用对应方法去处理,然后根据处理结果更新在事件循环中注册的事件 if events & self.io_loop.READ: self._handle_read() if not self.socket: return if events & self.io_loop.WRITE: self._handle_write() if not self.socket: return if events & self.io_loop.ERROR: self.close() return # 判断是否还需要读&写数据,然后重新注册事件 state = self.io_loop.ERROR if self._read_delimiter: state |= self.io_loop.READ if self._write_buffer: state |= self.io_loop.WRITE if state != self._state: self._state = state self.io_loop.update_handler(self.socket.fileno(), self._state) def _handle_read(self): # 当有可读事件时触发这个方法,读取可读缓冲区的数据并写入到self._read_buffer中 try: chunk = self.socket.recv(self.read_chunk_size) except socket.error, e: if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN): return else: logging.warning("Read error on %d: %s", self.socket.fileno(), e) self.close() return if not chunk: self.close() return self._read_buffer += chunk # 如果设置了终止符,并且已经读到终止符了,就不再读取 if self._read_delimiter: loc = self._read_buffer.find(self._read_delimiter) if loc != -1: callback = self._read_callback delimiter_len = len(self._read_delimiter) self._read_callback = None self._read_delimiter = None callback(self._consume(loc + delimiter_len)) def _handle_write(self): # 当有可写事件时触发这个函数,把self._write_buffer的数据写入可写缓冲区,直到写完或者写不下为止 while self._write_buffer: try: num_bytes = self.socket.send(self._write_buffer) self._write_buffer = self._write_buffer[num_bytes:] except socket.error, e: if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN): break else: logging.warning("Write error on %d: %s", self.socket.fileno(), e) self.close() return # 写完之后,调用预先设定的回调 if not self._write_buffer and self._write_callback: callback = self._write_callback self._write_callback = None callback()
IOStream本质是一个socket对象, 只不过通过事件循环变为异步的了. 我们调用它的read_until或者write方法时, IOStream并不会马上尝试去读取或写入数据, 而是设置一个回调函数, 然后调用_add_io_state方法在事件循环中添加对可读或可写事件的监控. 然后, 事件循环在监听到事件时, 调用IOStream的_handle_events方法, 该方法根据事件的类型再调用_handle_read和_handle_write去读取或写入数据, 并调用之前设定好的回调, 这样一次读取&写入才算结束.
除此之外, IOStream还将自己的socket设置为非阻塞的状态, 避免在socket不可读&不可写的情况下产生阻塞. tornado的高性能主要就是因为事件循环回调和非阻塞socket这两点, 首先, 异步回调的机制可以使tornado在单个线程中同时维护多个socket连接, 当某个连接触发事件时, 调用回调去处理就行. 然后, socket的非阻塞状态可以避免处理事件时产生的阻塞, 从而最大程度地利用CPU时间.
总的来说, IOStream + IOLoop的工作流程如下:
三.web服务器
httpserver模块中有三个类: HTTPServer, HTTPConnection和HTTPRequest, HTTPServer相当于服务端socket的封装, 负责接收客户端的连接. 该连接会交由HTTPConnection去处理, HTTPConnection利用iostream模块读取客户端的请求数据, 然后将请求数据封装成一个HTTPRequest对象, 将这个对象交由web应用去处理.
HTTPServer的主要代码如下:
import errno import socket class HTTPServer: def __init__(self, application): self.application = application self.io_loop = ioloop.IOLoop.instance() self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._socket.setblocking(0) def listen(self, port, address=''): self._socket.bind((address, port)) self._socket.listen(128) self.io_loop.add_handler(self._socket.fileno(), self._handle_events, ioloop.IOLoop.READ) def _handle_events(self, fd, events): while 1: try: connection, address = self._socket.accept() except socket.error, e: # 已经接收到客户端后,就跳出循环 if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN): return raise stream = iostream.IOStream(connection, io_loop=self.io_loop) HTTPConnection(stream, address, self.application)
HTTPServer是web服务器端的入口, 首先, 我们通过实例化这个对象来指定web服务器所配套的web应用. 然后, 调用它的listen方法, 就会通过ioloop监听指定端口的可读事件, 也就是客户端连接. 当有客户端连接时, HTTPServer会首先实例化一个IOStream对象, 这个对象相当于对客户端socket对象的封装, 然后新建一个HTTPConnection对象去处理这个新连接.
HTTPConnection的主要代码如下:
import tornado.httputil class HTTPConnection: def __init__(self, stream, address, application): self.stream = stream self.address = address self.application = application self.stream.read_until("\r\n\r\n", self._on_headers) def _on_headers(self, data): eol = data.find("\r\n") start_line = data[:eol] method, uri, version = start_line.split(" ") headers = tornado.httputil.HTTPHeaders.parse(data[eol:]) # 这里会把请求数据解析成一个字典对象返回 self._request = HTTPRequest( connection=self, method=method, uri=uri, version=version, headers=headers, remote_ip=self.address[0]) self.application(self._request) def write(self, chunk): self.stream.write(chunk, self._on_write_complete) def _on_write_complete(self): self.stream.close()
在HTTPServer接收到新连接后, 由HTTPConnection来处理这个新连接. 首先, HTTPConnection使用IOStream异步回调地读取客户端的请求数据, 解析出请求行的内容以及请求头数据之后, 将这些数据封装到一个HTTPRequest对象中, 让web应用去处理这个请求对象. web应用处理结束后, 再调用它的write方法, 通过IOStream将响应数据写入, 最后关闭socket连接, 这样一个请求就处理完毕了.
HTTPRequest主要是对请求数据的封装, 没什么好说的. 它的主要代码如下:
import urlparse class HTTPRequest: def __init__(self, method, uri, version="HTTP/1.0", headers=None, remote_ip=None, connection=None): self.method = method self.uri = uri self.version = version self.headers = headers self.remote_ip = remote_ip self.host = self.headers.get("Host") or "127.0.0.1" self.connection = connection scheme, netloc, path, query, fragment = urlparse.urlsplit(uri) self.path = path self.query = query def write(self, chunk): # web应用调用这个方法写入响应数据,通过HTTPConnection最终由IOStream来写入数据 self.connection.write(chunk)
这样, 一个http服务器就完成了, 它的流程像是下面这样:
四.web应用
web应用的职责是, 接收web服务器发过来的请求数据, 根据这些数据执行一些逻辑之后, 返回响应结果. tornado的web模块就负责web应用这块.
我们首先分析web.Application类, 简单来说, 它的代码差不多是下面这样:
import re class Application(object): """ 实际上, 这个类还做了其它一些工作, 比如设置debug模式, 指定wsgi等等 另外, 路由的映射关系实际上是由web.URLSpec这个类进行封装的 但是, 这些都不是重点, 这段代码只是为了方便理解, 说明Application主要做什么事 """ def __init__(self, handlers): self.handlers = handlers def __call__(self, request): path = request.path h_obj = None for pattern, handler in self.handlers: if re.match(pattern, path): h_obj = handler(request) h_obj._execute()
web.Application是web应用的入口, 由刚才的代码可以看出来, 它负责路由的分发. 首先我们实例化对象并传入handlers = [(r'/', MainHandler)]这样的参数, 然后调用这个Application对象并传入request, 它就会根据请求数据所给的路径找到对应的handler类, 实例化这个handler类并调用handler的_execute方法, 让handler对象来执行具体的操作.
一般来说, 我们指定的handler类都会继承web.RequestHandler, 它的代码差不多是下面这样:
import httplib class RequestHandler(object): """ 这里的RequestHandler也只列出了最核心的代码 除此之外, RequestHandler还实现了获取和设置cookie, 用户认证以及防csrf攻击等功能 """ def __init__(self, request): self.request = request self._headers = { "Content-Type": "text/html; charset=UTF-8", } self._write_buffer = [] self._status_code = 200 def get(self): # 这个方法需要我们根据请求类型自己定义,除get外,还支持head,post,delete和put raise HTTPError(405) def write(self, chunk): self._write_buffer.append(chunk.encode('utf8')) def finish(self): # 首先生成响应状态和响应头 lines = [self.request.version + " " + str(self._status_code) + " " + httplib.responses[self._status_code]] lines.extend(["%s: %s" % (n, v) for n, v in self._headers.iteritems()]) headers = "\r\n".join(lines) + "\r\n\r\n" # 然后生成响应内容 chunk = "".join(self._write_buffer) self.request.write(headers + chunk) def _execute(self): getattr(self, self.request.method.lower())() self.finish()
RequestHandler对响应进行了封装. Application调用它的_execute方法, 就会根据请求类型反射到我们所重写的方法, 比如get方法. 在执行完我们定义的方法之后, 调用自己的finish方法来生成响应消息, 并通过request将响应消息返回.
Application和RequestHandler实现了一个web应用的框架, 用户只需要继承RequestHandler类, 然后重写请求类型的对应方法就可以了. 总的来看, 这个web应用的处理流程如下:
五.总结
综上所述, tornado服务器可以分为四层: 事件循环层, TCP传输层, HTTP层和web应用, 工作起来像是下面这样:
在写demo应用的阶段, 我们做了四件事:
- 继承RequestHandler, 重写请求类型对应的方法, 比如get方法
- 定义Application的路由
- 为HTTPServer指定app和端口
- 启动IOLoop
这样, 一个tornado应用就启动了, 一个请求的流程是这样的:
- IOLoop监听到新的客户端连接, 通知HTTPServer
- HTTPServer实例化一个HTTPConnection来处理这个新的客户端
- HTTPConnection利用IOStream异步读取客户端的请求数据
- IOStream通过IOLoop注册可读事件, 在事件触发时读取数据, 然后调用HTTPConnection的回调函数
- HTTPConnection将读取的请求数据进行解析, 用一个HTTPRequest对象封装解析后的请求数据
- HTTPConnection把HTTPRequest发送给Application
- Application通过路由找到对应的RequestHandler, 让它来处理请求
- RequestHandler通过反射找到请求类型对应的处理方法, 处理请求
- 处理完成后, RequestHandler调用HTTPRequest的write方法写入响应结果
- HTTPRequest将响应结果交给HTTPConnection, HTTPConnection使用IOStream来写入响应数据
- IOStream继续使用IOLoop异步地写入数据, 写入完毕后, 调用HTTPConnection的回调函数
- HTTPConnection被回调, 它关闭socket连接, 请求结束 (http1.1或者keep-alive的情况不讨论)