VB.net 2010 视频教程 VB.net 2010 视频教程 python基础视频教程
SQL Server 2008 视频教程 c#入门经典教程 Visual Basic从门到精通视频教程
当前位置:
首页 > temp > 简明python教程 >
  • tornado服务器实现原理

本文分析的tornado版本为1.0.0, 它的代码量比较少, 便于我们找到其核心部分. 在这里可以下载1.0.0版本的tornado.

一.基本流程

  使用下面的代码实现一个最简单的tornado服务器:

复制代码
import tornado.httpserver
import tornado.ioloop
import tornado.web


class MainHandler(tornado.web.RequestHandler):

    def get(self):
        self.write('hello world')


if __name__ == '__main__':
    application = tornado.web.Application(
        handlers=[
            (r'/', MainHandler)
        ]
    )
    http_server = tornado.httpserver.HTTPServer(application)
    http_server.listen(8000)
    tornado.ioloop.IOLoop.instance().start()
复制代码

  这里使用了tornado的httpserver, ioloop和web三个模块, 其中httpserver就是http服务器, 它负责接收和处理连接; ioloop则是底层的事件循环系统, 负责在监听到事件时进行通知; web模块就相当于web应用.

  总的来说, 一个tornado服务器可以分为四层, 工作流程大致是下面这样:

  上面这张图可能有点复杂, 一时看不懂没关系, 后面会进行详细的讲解.

二.异步非阻塞socket

  tornado的高性能主要来自于ioloop.IOLoop和iostream.IOStream两个类, 前者是一个事件循环, 通过epoll对不同的socket对象进行监听和调度. IOStream类则是socket对象的封装, 它依靠着IOLoop的事件循环, 实现了对socket读写功能的非阻塞+异步回调.

  ioloop.IOLoop的主要代码如下:

复制代码
import select
import logging


class IOLoop(object):
    _EPOLLIN = 0x001
    _EPOLLPRI = 0x002
    _EPOLLOUT = 0x004
    _EPOLLERR = 0x008
    _EPOLLHUP = 0x010
    _EPOLLRDHUP = 0x2000
    _EPOLLONESHOT = (1 << 30)
    _EPOLLET = (1 << 31)

    # 可以监听的事件类型,从字面上理解就行
    NONE = 0
    READ = _EPOLLIN
    WRITE = _EPOLLOUT
    ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP

    def __init__(self):
        self._impl = select.epoll()  # 在不支持epoll的系统中, 事件通知机制会退化为kqueue或者select
        self._handlers = {}

    @classmethod
    def instance(cls):
        # 需要IOLoop对象时,不直接实例化,而是调用这个类方法,这样可以保证IOLoop是单例的
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

    def add_handler(self, fd, handler, events):
        self._handlers[fd] = handler
        self._impl.register(fd, events | self.ERROR)

    def update_handler(self, fd, events):
        self._impl.modify(fd, events | self.ERROR)

    def remove_handler(self, fd):
        self._handlers.pop(fd, None)
        try:
            self._impl.unregister(fd)
        except (OSError, IOError):
            logging.debug("Error deleting fd from IOLoop", exc_info=True)

    def start(self):
        while 1:
            event_pairs = self._impl.poll()
            for fd, events in event_pairs:
                self._handlers[fd](fd, events)
复制代码

  IOLoop的本质是对epoll的封装, 它的用法比较简单: 首先, 我们可以调用add&update&remove_handler方法来设置需要监听的句柄, 对应的事件和回调函数, 然后, 只要调用start方法, IOLoop就会使用epoll一直监听下去, 并且在监听到事件时, 调用对应的回调函数, 这样就实现了监听和调度的功能.

  iostream.IOStream类的主要代码如下:

复制代码
import errno
import logging
import socket


class IOStream:

    def __init__(self, socket, io_loop, read_chunk_size=4096):
        self.socket = socket
        self.socket.setblocking(False)
        self.io_loop = io_loop
        self.read_chunk_size = read_chunk_size
        self._read_buffer = ""
        self._write_buffer = ""
        self._read_delimiter = None
        self._read_callback = None
        self._write_callback = None
        self._state = self.io_loop.ERROR
        self.io_loop.add_handler(
            self.socket.fileno(), self._handle_events, self._state)

    def read_until(self, delimiter, callback):
        loc = self._read_buffer.find(delimiter)
        if loc != -1:
            callback(self._consume(loc + len(delimiter)))
            return
        self._read_delimiter = delimiter
        self._read_callback = callback
        self._add_io_state(self.io_loop.READ)

    def write(self, data, callback=None):
        self._write_buffer += data
        self._add_io_state(self.io_loop.WRITE)
        self._write_callback = callback

    def _consume(self, loc):
        # 这个方法负责把读取缓冲区的指定长度截下来返回
        result = self._read_buffer[:loc]
        self._read_buffer = self._read_buffer[loc:]
        return result

    def close(self):
        if self.socket is not None:
            self.io_loop.remove_handler(self.socket.fileno())
            self.socket.close()
            self.socket = None

    def _add_io_state(self, state):
        # 调用这个方法添加要监听的事件
        if not self._state & state:
            self._state = self._state | state
            self.io_loop.update_handler(self.socket.fileno(), self._state)

    def _handle_events(self, fd, events):
        # 这个方法由事件循环进行回调
        # 它首先根据事件类型调用对应方法去处理,然后根据处理结果更新在事件循环中注册的事件
        if events & self.io_loop.READ:
            self._handle_read()
        if not self.socket:
            return
        if events & self.io_loop.WRITE:
            self._handle_write()
        if not self.socket:
            return
        if events & self.io_loop.ERROR:
            self.close()
            return
        # 判断是否还需要读&写数据,然后重新注册事件
        state = self.io_loop.ERROR
        if self._read_delimiter:
            state |= self.io_loop.READ
        if self._write_buffer:
            state |= self.io_loop.WRITE
        if state != self._state:
            self._state = state
            self.io_loop.update_handler(self.socket.fileno(), self._state)

    def _handle_read(self):
        # 当有可读事件时触发这个方法,读取可读缓冲区的数据并写入到self._read_buffer中
        try:
            chunk = self.socket.recv(self.read_chunk_size)
        except socket.error, e:
            if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return
            else:
                logging.warning("Read error on %d: %s",
                                self.socket.fileno(), e)
                self.close()
                return
        if not chunk:
            self.close()
            return
        self._read_buffer += chunk
        # 如果设置了终止符,并且已经读到终止符了,就不再读取
        if self._read_delimiter:
            loc = self._read_buffer.find(self._read_delimiter)
            if loc != -1:
                callback = self._read_callback
                delimiter_len = len(self._read_delimiter)
                self._read_callback = None
                self._read_delimiter = None
                callback(self._consume(loc + delimiter_len))

    def _handle_write(self):
        # 当有可写事件时触发这个函数,把self._write_buffer的数据写入可写缓冲区,直到写完或者写不下为止
        while self._write_buffer:
            try:
                num_bytes = self.socket.send(self._write_buffer)
                self._write_buffer = self._write_buffer[num_bytes:]
            except socket.error, e:
                if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    break
                else:
                    logging.warning("Write error on %d: %s",
                                    self.socket.fileno(), e)
                    self.close()
                    return
        # 写完之后,调用预先设定的回调
        if not self._write_buffer and self._write_callback:
            callback = self._write_callback
            self._write_callback = None
            callback()
复制代码

  IOStream本质是一个socket对象, 只不过通过事件循环变为异步的了. 我们调用它的read_until或者write方法时, IOStream并不会马上尝试去读取或写入数据, 而是设置一个回调函数, 然后调用_add_io_state方法在事件循环中添加对可读或可写事件的监控. 然后, 事件循环在监听到事件时, 调用IOStream的_handle_events方法, 该方法根据事件的类型再调用_handle_read和_handle_write去读取或写入数据, 并调用之前设定好的回调, 这样一次读取&写入才算结束.  

   除此之外, IOStream还将自己的socket设置为非阻塞的状态, 避免在socket不可读&不可写的情况下产生阻塞. tornado的高性能主要就是因为事件循环回调和非阻塞socket这两点, 首先, 异步回调的机制可以使tornado在单个线程中同时维护多个socket连接, 当某个连接触发事件时, 调用回调去处理就行. 然后, socket的非阻塞状态可以避免处理事件时产生的阻塞, 从而最大程度地利用CPU时间.

  总的来说, IOStream + IOLoop的工作流程如下:

   

三.web服务器

  httpserver模块中有三个类: HTTPServer, HTTPConnection和HTTPRequest, HTTPServer相当于服务端socket的封装, 负责接收客户端的连接. 该连接会交由HTTPConnection去处理, HTTPConnection利用iostream模块读取客户端的请求数据, 然后将请求数据封装成一个HTTPRequest对象, 将这个对象交由web应用去处理.

  HTTPServer的主要代码如下:

复制代码
import errno
import socket


class HTTPServer:

    def __init__(self, application):
        self.application = application
        self.io_loop = ioloop.IOLoop.instance()
        self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
        self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self._socket.setblocking(0)

    def listen(self, port, address=''):
        self._socket.bind((address, port))
        self._socket.listen(128)
        self.io_loop.add_handler(self._socket.fileno(),
                                 self._handle_events,
                                 ioloop.IOLoop.READ)

    def _handle_events(self, fd, events):
        while 1:
            try:
                connection, address = self._socket.accept()
            except socket.error, e:
                # 已经接收到客户端后,就跳出循环
                if e[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                    return
                raise
            stream = iostream.IOStream(connection, io_loop=self.io_loop)
            HTTPConnection(stream, address, self.application)
复制代码

  HTTPServer是web服务器端的入口, 首先, 我们通过实例化这个对象来指定web服务器所配套的web应用. 然后, 调用它的listen方法, 就会通过ioloop监听指定端口的可读事件, 也就是客户端连接. 当有客户端连接时, HTTPServer会首先实例化一个IOStream对象, 这个对象相当于对客户端socket对象的封装, 然后新建一个HTTPConnection对象去处理这个新连接.

  HTTPConnection的主要代码如下:

复制代码
import tornado.httputil


class HTTPConnection:

    def __init__(self, stream, address, application):
        self.stream = stream
        self.address = address
        self.application = application
        self.stream.read_until("\r\n\r\n", self._on_headers)

    def _on_headers(self, data):
        eol = data.find("\r\n")
        start_line = data[:eol]
        method, uri, version = start_line.split(" ")
        headers = tornado.httputil.HTTPHeaders.parse(data[eol:])  # 这里会把请求数据解析成一个字典对象返回
        self._request = HTTPRequest(
            connection=self, method=method, uri=uri, version=version,
            headers=headers, remote_ip=self.address[0])
        self.application(self._request)

    def write(self, chunk):
        self.stream.write(chunk, self._on_write_complete)

    def _on_write_complete(self):
        self.stream.close()
复制代码

  在HTTPServer接收到新连接后, 由HTTPConnection来处理这个新连接. 首先, HTTPConnection使用IOStream异步回调地读取客户端的请求数据, 解析出请求行的内容以及请求头数据之后, 将这些数据封装到一个HTTPRequest对象中, 让web应用去处理这个请求对象. web应用处理结束后, 再调用它的write方法, 通过IOStream将响应数据写入, 最后关闭socket连接, 这样一个请求就处理完毕了.

  HTTPRequest主要是对请求数据的封装, 没什么好说的. 它的主要代码如下:

复制代码
import urlparse


class HTTPRequest:

    def __init__(self, method, uri, version="HTTP/1.0", headers=None,
                 remote_ip=None, connection=None):
        self.method = method
        self.uri = uri
        self.version = version
        self.headers = headers
        self.remote_ip = remote_ip
        self.host = self.headers.get("Host") or "127.0.0.1"
        self.connection = connection

        scheme, netloc, path, query, fragment = urlparse.urlsplit(uri)
        self.path = path
        self.query = query

    def write(self, chunk):
        # web应用调用这个方法写入响应数据,通过HTTPConnection最终由IOStream来写入数据
        self.connection.write(chunk)
复制代码

  这样, 一个http服务器就完成了, 它的流程像是下面这样:

   

四.web应用

  web应用的职责是, 接收web服务器发过来的请求数据, 根据这些数据执行一些逻辑之后, 返回响应结果. tornado的web模块就负责web应用这块.

  我们首先分析web.Application类, 简单来说, 它的代码差不多是下面这样:

复制代码
import re


class Application(object):
    """
    实际上, 这个类还做了其它一些工作, 比如设置debug模式, 指定wsgi等等
    另外, 路由的映射关系实际上是由web.URLSpec这个类进行封装的
    但是, 这些都不是重点, 这段代码只是为了方便理解, 说明Application主要做什么事
    """

    def __init__(self, handlers):
        self.handlers = handlers

    def __call__(self, request):
        path = request.path
        h_obj = None
        for pattern, handler in self.handlers:
            if re.match(pattern, path):
                h_obj = handler(request)
                h_obj._execute()
复制代码

  web.Application是web应用的入口, 由刚才的代码可以看出来, 它负责路由的分发. 首先我们实例化对象并传入handlers = [(r'/', MainHandler)]这样的参数, 然后调用这个Application对象并传入request, 它就会根据请求数据所给的路径找到对应的handler类, 实例化这个handler类并调用handler的_execute方法, 让handler对象来执行具体的操作.

  一般来说, 我们指定的handler类都会继承web.RequestHandler, 它的代码差不多是下面这样:

复制代码
import httplib


class RequestHandler(object):
    """
    这里的RequestHandler也只列出了最核心的代码
    除此之外, RequestHandler还实现了获取和设置cookie, 用户认证以及防csrf攻击等功能
    """

    def __init__(self, request):
        self.request = request
        self._headers = {
            "Content-Type": "text/html; charset=UTF-8",
        }
        self._write_buffer = []
        self._status_code = 200

    def get(self):
        # 这个方法需要我们根据请求类型自己定义,除get外,还支持head,post,delete和put
        raise HTTPError(405)

    def write(self, chunk):
        self._write_buffer.append(chunk.encode('utf8'))

    def finish(self):
        # 首先生成响应状态和响应头
        lines = [self.request.version + " " + str(self._status_code) + " " +
                 httplib.responses[self._status_code]]
        lines.extend(["%s: %s" % (n, v) for n, v in self._headers.iteritems()])
        headers = "\r\n".join(lines) + "\r\n\r\n"
        # 然后生成响应内容
        chunk = "".join(self._write_buffer)
        self.request.write(headers + chunk)

    def _execute(self):
        getattr(self, self.request.method.lower())()
        self.finish()
复制代码

  RequestHandler对响应进行了封装. Application调用它的_execute方法, 就会根据请求类型反射到我们所重写的方法, 比如get方法. 在执行完我们定义的方法之后, 调用自己的finish方法来生成响应消息, 并通过request将响应消息返回.

  Application和RequestHandler实现了一个web应用的框架, 用户只需要继承RequestHandler类, 然后重写请求类型的对应方法就可以了. 总的来看, 这个web应用的处理流程如下:

五.总结

  综上所述, tornado服务器可以分为四层: 事件循环层, TCP传输层, HTTP层和web应用, 工作起来像是下面这样:

  在写demo应用的阶段, 我们做了四件事:

  • 继承RequestHandler, 重写请求类型对应的方法, 比如get方法
  • 定义Application的路由
  • 为HTTPServer指定app和端口
  • 启动IOLoop

  这样, 一个tornado应用就启动了, 一个请求的流程是这样的:

  1. IOLoop监听到新的客户端连接, 通知HTTPServer
  2. HTTPServer实例化一个HTTPConnection来处理这个新的客户端
  3. HTTPConnection利用IOStream异步读取客户端的请求数据
  4. IOStream通过IOLoop注册可读事件, 在事件触发时读取数据, 然后调用HTTPConnection的回调函数
  5. HTTPConnection将读取的请求数据进行解析, 用一个HTTPRequest对象封装解析后的请求数据
  6. HTTPConnection把HTTPRequest发送给Application
  7. Application通过路由找到对应的RequestHandler, 让它来处理请求
  8. RequestHandler通过反射找到请求类型对应的处理方法, 处理请求
  9. 处理完成后, RequestHandler调用HTTPRequest的write方法写入响应结果
  10. HTTPRequest将响应结果交给HTTPConnection, HTTPConnection使用IOStream来写入响应数据
  11. IOStream继续使用IOLoop异步地写入数据, 写入完毕后, 调用HTTPConnection的回调函数
  12. HTTPConnection被回调, 它关闭socket连接, 请求结束 (http1.1或者keep-alive的情况不讨论)

相关教程