Tornado源码阅读(一) --- IOLoop之创建ioloop

本文的测试环境是在MacOS,因此使用的多路复用的网络IO为kqueue而不是epoll,对应的IOLoop实例对象也是KQueueIOLoop。

在介绍Epoll模式的笔记中,最后写了一个tornado的使用epoll的例子。这个例子是如何工作的呢?下面来读一读tornado的源码。

启动一个tornado server很简单,只需要下面几code:

io_loop = tornado.ioloop.IOLoop.current()
callback = functools.partial(connection_ready, sock)
io_loop.add_handler(sock.fileno(), callback, io_loop.READ)
io_loop.start()

tornado.ioloop.IOLoop.current()

实际上是创建一个IO循环的对象,这里是KQueueIOLoop,Linux的系统则是EPollIOLoop。

下面是current的源码,该方法目的就是从local线程中获取KQueueIOLoop(如果存在的话,否则则新建一个)

@staticmethod
def current(instance=True):
    current = getattr(IOLoop._current, "instance", None)
    if current is None and instance:
        return IOLoop.instance()
    return current

程序首先判断 IOLoop._current对象(_current对象是一个线程local)的instance属性,如果没有current,则调用IOLoop.instance()方法创建一个IOLoop的实例作为currnet返回。由于tornado的包装,实际上IOLoop返回的并不是IOLoop的实例对象,而是KQueueIOLoop实例对象。

为什么IOLoop实例化的对象KQueueIOLoop呢?想知道答案就得揭开IOLoop.instance()神秘面纱,表面上看,该方法创建的IOLoop实例对象,并绑定到IOLoop._instance上。

@staticmethod
def instance():
    if not hasattr(IOLoop, "_instance"):
        with IOLoop._instance_lock:
            if not hasattr(IOLoop, "_instance"):
                # 新实例要经过两次check检查
                IOLoop._instance = IOLoop()
    return IOLoop._instance

IOLoop继承自Configurable基类,IOLoop 自身没有常见的初始化"构造函数"(init)。显然需要再查看Configurable基类。不看不知道,一看tornado的作者还真会玩。Configurable是一个设计很精巧的类,通过不同子类的继承来适配。基类在子类创建的时候做一些适配的事情。相比init,new称之为构造函数更准确。

class Configurable(object):
    def __new__(cls, *args, **kwargs):
        base = cls.configurable_base()
        init_kwargs = {}

        if cls is base:
            # 通过调用configured_class方法,可以绑定 base.__impl_class 为epoll还是kqueue。
            impl = cls.configured_class()
            if base.__impl_kwargs:
                init_kwargs.update(base.__impl_kwargs)
        else:
            impl = cls

        init_kwargs.update(kwargs)

        # impl 对象为对应的网络io模式,对于unix系统,因此这里是 kqueue方式,即位KQueueIOLoop类。
        instance = super(Configurable, cls).__new__(impl)

        # 通过initialize 方法,传接 并返回,这个就是上面current对象,即
        instance.initialize(*args, **init_kwargs)
        return instance

IOLoop在创建的时候,通过基类new方法调用子类的configurable_base和configurable_default适配不同子类的特性。这里通过IOLoop的configurable_default方法选择了unix系统的kqueue方式。

@classmethod
def configurable_default(cls):
    if hasattr(select, "epoll"):
        from tornado.platform.epoll import EPollIOLoop
        return EPollIOLoop
    if hasattr(select, "kqueue"):
        # Python 2.6+ on BSD or Mac
        from tornado.platform.kqueue import KQueueIOLoop
        return KQueueIOLoop
    from tornado.platform.select import SelectIOLoop
    return SelectIOLoop

根据平台确定了impl为kqueue之后,将会通过new创建实例对象,就是这一步,创建了KQueueIOLoop而不是IOLoop的对象。Configurable自身不定义initialize。这里就调用了KQueueIOLoop的initialize方法。

class KQueueIOLoop(PollIOLoop):
    def initialize(self, **kwargs):
        super(KQueueIOLoop, self).initialize(impl=_KQueue(), **kwargs)

KQueueIOLoop的方法很简单,其中实现了一个_KQueue,这个类用于操作unix系统上的kqueue的网络io相关封装,例如注册事件,poll调用等。然后KQueueIOLoop带用其父类(PollIOLoop)的initialize方法。有没有发现,调用的控制权一直在各个父类基类中跳转。大概是 IOLoop -> Configurable -> IOLoop -> KQueueIOLoop ->PollOLoop -> IOLoop -> PolIOLoop。

class PollIOLoop(IOLoop):

    def initialize(self, impl, time_func=None, **kwargs):
        # 调用父类的IOLoop的initialize方法
        super(PollIOLoop, self).initialize(**kwargs)
        # _KQueue类
        self._impl = impl

PollIOLoop继承自IOLoop,PollIOLoop调用其父类的initialize方法。此时调用make_current为None,因此又会调用IOLoop.current()的方法,怎么又是IOLoop.current?我们不就是从客户端逻辑(相对于库)调用这个方法进来的么?注意,不同于第一次客户端调用的时候,当时intances是True。也就是此时直接返回IOLoop._current.instance,前面正是因为current为None,才需要通过IOLoop的创建对象。当然此时current为None,即直接返回None。接下来自然运行make_current方法。

def initialize(self, make_current=None):
    if make_current is None:
        if IOLoop.current(instance=False) is None:
            self.make_current()
    elif make_current:
        if IOLoop.current(instance=False) is None:
            raise RuntimeError("current IOLoop already exists")
        self.make_current()

make_current方法干点啥好呢?当然你肯定想到了,既然我们之前IOLoop.current方法是为了获取IOLoop._current.instance,并且一直为None,那么make_current正好填补这个空白,创建一个绑定就好嘛。

def make_current(self):
    """Makes this the `IOLoop` for the current thread.
    将IOLoop实例对象绑定到local线程_current的instance属性。

    """
    IOLoop._current.instance = self

的确,make_current把当前的类实例(KQueueIOLoop)创建并绑定。通过前面巧妙的设计,根据平台选择了网络io的模式。接下来还得根据io模式绑定IO监听事件。继续阅读PollIOLoop,可以发现通过add_handler方法喝Waker实现。

class PollIOLoop(IOLoop):

    def initialize(self, impl, time_func=None, **kwargs):

        ...

        # posix风格的文件读取操作,网络io本质也是文件操作
        self._waker = Waker()
        # 添加事件绑定,前面条用子类KQueueIOLoop的时候,传了_KQueue类
        self.add_handler(self._waker.fileno(),
                         lambda fd, events: self._waker.consume(),
                         self.READ)

add_handler方法处理文件描述符,其中stack_context类通过wrap包装一个上下文类似的东西。具体数据结构没有仔细看,留待日后研究,总而言之,这个方法借助之前的_KQueue类注册网络io事件。

def add_handler(self, fd, handler, events):
    fd, obj = self.split_fd(fd)
    self._handlers[fd] = (obj, stack_context.wrap(handler))
    self._impl.register(fd, events | self.ERROR)

此时,ioloop对象成功的创建。创建ioloop对象之后,server还不回启动,需要调用start启动。在启动之前,也需要通过add_hanndler绑定事件函数。至于start的工作原理,下回再研究