程序员人生 网站导航

tornado协程(coroutine)原理

栏目:互联网时间:2015-06-16 08:18:18

tornado中的协程是如何工作的

本文将按以下结构进行组织,说明tornado中协程的履行原理

  • 协程定义
  • 生成器和yield语义
  • Future对象
  • ioloop对象
  • 函数装潢器coroutine
  • 总结

协程定义

Coroutines are computer program components that generalize subroutines for nonpreemptive multitasking, by allowing multiple entry points for suspending and resuming execution at certain locations.。 ―― [ 维基百科 ]

我们在平常编程中,更习惯使用的是子例程(subroutine),通俗的叫法是函数,或进程。子例程,常常只有1个入口(函数调用,实参通过传给形参开始履行),1个出口(函数return,履行终了,或引发异常,将控制权转移给调用者)。但协程是子例程基础上,1种更加宽泛定义的计算机程序模块(子例程可以看作协程的特例),它可以有多个入口点,允许从1个入口点,履行到下1个入口点之前暂停保存履行状态,等到适合的时机恢复履行状态,从下1个入口点重新开始履行,这也是协程应当具有的能力。

定义
协程代码块
1个入口点和下1个入口点(或退出点)中的代码。
协程模块
由n个入口点代码,和n个协程代码块组成。第1个入口点通常是1个函 数入口点。其组织情势如:函数入口点->协程代码块->入口点->协程代码块…,入口点和代码块相间。
线性模块
1个同步函数的函数体是线性履行的。也就是说1个模块中的每行代码,相继履行,1个模块在履行中,如果还没有履行终了,不会去履行其他模块的代码。称这样的代码模块为线性模块。

1个协程模块,如果只含有单1入口点和单1协程代码块(假定这个协程代码块全是同步代码),固然这个协程模块是1个线性履行模块,但是如果含有多个入口点和多个协程代码块,那末就不是1个线性模块。那末履行1个协程模块进程实际是分散的(不同的时间段,履行不同的协程代码块,协程代码块的履行时间段,彼此不相交),但也是顺序的(后1个协程代码块在前1个协程代码块履行结束后才履行)。两个属于同1协程模块的相继协程代码块履行的中间时间间隙,可能有很多其他协程模块的协程代码片断在履行。

生成器和yield语义

谈到协程,必须要说说python语义中的生成器(generator)。

在pep255中提到了”simple generator”和”yield语句”(此时还不是”yield表达式”)的实现。1个basic idea,提供1种函数,能够返回中间结果给调用者,然后保护函数的局部状态,以便函数当离开后,也能恢复履行

prep255及第了1个简单的例子,生成斐波那契数列:

def fib(): a, b = 0, 1 while 1: yield b a, b = b, a+b

a,b初始化为0,1。当yield b被履行,1被返回给调用者。当fib恢复履行,a,变成了1,b也是1,然后将1返回给调用者,如此循环。generator是1种非常自然的编程方式,由于对fib来讲,它的功能不变,都还是不断生成下1个斐波那契数。而对fib的调用者来讲,fib像1个列表的迭代器,不断迭代,可以获得下1个斐波那契数。

def caller(): for num in fib(): print num

生成器是1个含有yield表达式的函数,此时该函数叫生成器。1个生成器永久是异步的,即便生成器模块中含有阻塞代码。由于调用1个生成器,生成器的参数会绑定到生成器,结果返回是1个生成器对象,它的类型是types.GeneratorType,不会去履行生成器主模块中的代码。

每次调用1个GeneratorType对象的next方法,生成器函数履行到下1个yield语句或,或碰到1个return语句,或履行到生成器函数结束。

在pep342中,对Generator进1步加强,增加了GeneratorType的send方法,和yield表达式语义。yield表达式,可以作为等号右侧的表达式。如果对Generator调用send(None)方法,生成器函数会从开始1直履行到yield表达式。那末下1次对Generator调用send(argument),Generator恢复履行。那末可以在生成器函数体内取得这个argument,这个argument将会作为yield表达式的返回值。

从上面可以看到,Generator已具有协程的1些能力。如:能够暂停履行,保存状态;能够恢复履行;能够异步履行。

但是此时Generator还不是1个协程。1个真实的协程能够控制代码甚么时候继续履行。而1个Generator履行遇到1个yield表达式 或语句,会将履行控制权转移给调用者

However, it is still possible to implement coroutines on top of a generator facility, with the aid of a top-level dispatcher routine (a trampoline, essentially) that passes control explicitly to child generators identified by tokens passed back from the generators。 ―― [ 维基百科 ]

在维基百科中提到,可以实现1个顶级的调度子例程,将履行控制权转移回Generator,从而让它继续履行。在tornado中,ioLoop就是这样的顶级调度子例程,每一个协程模块通过,函数装潢器coroutine和ioLoop进行通讯,从而ioLoop可以在协程模块履行暂停后,在适合的时机重新调度协程模块履行。

不过,接下来还不能介绍coroutine和ioLoop,在介绍这二者之前,先得明白tornado中在协程环境中1个非常重要的类Future.

Future类

Future类位于tornado源码的concurrent模块中。Future类的完全代码,请查看tornado的源码。在这里截取1部份代码作为分析之用

class Future(object): def done(self): return self._done def result(self, timeout=None): self._clear_tb_log() if self._result is not None: return self._result if self._exc_info is not None: raise_exc_info(self._exc_info) self._check_done() return self._result def add_done_callback(self, fn): if self._done: fn(self) else: self._callbacks.append(fn) def set_result(self, result): self._result = result self._set_done() def _set_done(self): self._done = True for cb in self._callbacks: try: cb(self) except Exception: app_log.exception('exception calling callback %r for %r', cb, self) self._callbacks = None
Future类重要成员函数:
def done(self):
Future的_result成员是不是被设置
def result(self, timeout=None):
获得Future对象的结果
def add_done_callback(self, fn):
添加1个回调函数fn给Future对象。如果这个Future对象已done,则直接履行fn,否则将fn加入到Future类的1个成员列表中保存。
def _set_done(self):
1个内部函数,主要是遍历列表,逐一调用列表中的callback函数,也就是前面add_done_calback加如来的。
def set_result(self, result):
给Future对象设置result,并且调用_set_done。也就是说,当Future对象取得result后,所有add_done_callback加入的回调函数就会履行。

Future封装了异步操作的结果。实际是它类似于在网页html前端中,图片异步加载的占位符,但加载后终究也是1个完全的图片。Future也是一样用途,tornado使用它,终究希望它被set_result,并且调用1些回调函数。Future对象实际是coroutine函数装潢器和IOLoop的沟通使者,有着非常重要的作用。

IOLoop类

tornado框架的底层核心类,位于tornado的ioloop模块。功能方面类似win32窗口的消息循环。每一个窗口可以绑定1个窗口进程。窗口进程主要是1个消息循环在履行。消息循环主要任务是利用PeekMessage系统调用,从消息队列中取出各种类型的消息,判断消息的类型,然后交给特定的消息handler进行履行。

tornado中的IOLoop与此相比具有很大的类似性,在协程运行环境中担负着协程调度器的角色, 和win32的消息循环本质上都是1种事件循环,等待事件,然后运行对应的事件处理器(handler)。不过IOLoop主要调度处理的是IO事件(如读,写,毛病)。除此以外,还能调度callback和timeout事件。

在本博文中,我们暂时只关注callback事件,由于这个与协程调度的相干性最大。

def add_future(self, future, callback): assert is_future(future) callback = stack_context.wrap(callback) future.add_done_callback( lambda future: self.add_callback(callback, future))

add_future函数在基类IOLoop中实现,函数参数是1个Future对象和1个callback函数。当Future对象被set_result,履行1个回调函数,是个lambda函数,在lambda函数中调用IOLoop的add_callback函数。将add_future的参数callback加入到IOLoop的统1调度中,让callback在IOLoop下1次迭代中履行。

def add_callback(self, callback, *args, **kwargs): with self._callback_lock: if self._closing: raise RuntimeError("IOLoop is closing") list_empty = not self._callbacks self._callbacks.append(functools.partial( stack_context.wrap(callback), *args, **kwargs)) if list_empty and thread.get_ident() != self._thread_ident: self._waker.wake()

add_callback函数主要在IOLoop的子类PollIOLoop中实现。也很容易理解。
将传入的callback函数,利用偏函数进行包装,将所有callback真正运行时需要的参数,都绑定到生成的偏函数中,实际上就是找个地方把callback运行时需要的参数保存起来。将包装好的偏函数加入到回调函数列表。当IOLoop下1次迭代运行的时候,遍历callback函数列表,运行偏函数的时候,就不再需要传入参数履行,效果同等于用实参运行callback。

IOLoop对象调用start函数,会运行event loop。在event loop中,首先遍历callback列表,履行回调函数,然后遍历timeout列表,履行timeoutCallback。最后才履行ioHandler。

coroutine函数装潢器

函数装潢器本质是1个函数,我们称这个函数为装潢器函数。装潢器函数签名含有1个 函数对象(可调用对象callable)参数,返回的结果是1个装潢器内部定义的1个新函数对象。如果返回的函数对象被调用,装潢器函数的参数(函数对象)也会被调用。不过,会在这个参数(装潢器函数参数)调用前做1些事情,或在这个参数调用后做1些事情。实际上做的这些事情,就是利用内部自定义的函数对象对参数(原函数)的1些装潢(额外操作)

当1个函数被装潢器装潢。那末以后调用这个函数(此函数已非彼函数)的时候,实际上调用的是装潢器函数返回的内部函数对象。理解tornado中coroutine修饰的函数如何履行,主要是 理解coroutine这个装潢器函数内部定义的新函数对象所做的那些事儿。

def coroutine(func, replace_callback=True): return _make_coroutine_wrapper(func, replace_callback=True)

_make_coroutine_wrapper函数

class Runner(object): def __init__(self, gen, result_future, first_yielded): self.gen = gen self.result_future = result_future self.future = _null_future self.yield_point = None self.pending_callbacks = None self.results = None self.running = False self.finished = False self.had_exception = False self.io_loop = IOLoop.current() self.stack_context_deactivate = None if self.handle_yield(first_yielded): self.run() def run(self): if self.running or self.finished: return try: self.running = True while True: future = self.future if not future.done(): return self.future = None try: try: value = future.result() except Exception: self.had_exception = True yielded = self.gen.throw(*sys.exc_info()) else: yielded = self.gen.send(value) except (StopIteration, Return) as e: self.finished = True self.future = _null_future self.result_future.set_result(getattr(e, 'value', None)) self.result_future = None return except Exception: self.finished = True self.future = _null_future self.result_future.set_exc_info(sys.exc_info()) self.result_future = None return if not self.handle_yield(yielded): return finally: self.running = False def handle_yield(self, yielded): try: self.future = convert_yielded(yielded) except BadYieldError: self.future = TracebackFuture() self.future.set_exc_info(sys.exc_info()) if not self.future.done() or self.future is moment: self.io_loop.add_future( self.future, lambda f: self.run()) return False return True

以上的代码其实都对源码进行了1些调剂。但函数调用进入到Runner的构造函数的时候,也就是说Generator的第1次履行已终了。那末接下来,调用的是,handle_yield,对第1次Generator履行的返回结果进行处理。固然返回的结果多是多种类型。多是1个Future对象,list,dict,或其他类型对象,或普通类型。通过convert_yield,self.future保存的是1个Future对象的援用(第1次Generator履行返回的结果)。此时如果self.future还没被set_result。对为self.future绑定1个done_callback(lambda f: self.run()),加入到self.io_loop中。

在前文说到。ioloop的add_future函数中,实际上是只有当参数future,在某个地方调用了set_result, 才在履行done_callback时,将参数callback加入到IOLoop中调度。换句话说。Runner类中,self.run要等到self.future在某个代码块被set_result,IOLoop才有可能在下1次迭代的时候履行它,从而调度协程继续恢复履行。而在self.run函数中,我们可以看到将会通过Generator的send函数,恢复履行下1个协程代码块。所以关键的问题是我们需要明白Runner类中self.future,在甚么时候被set_result

从这里我们可以看到Future类的重要作用。future.set_result起到的作用是:
发送1个信号,告知IOLoop去调度暂停的协程继续履行。

我们结合下面的代码例子就能够明白协程调度的全部流程是如何进行的了。

import tornado.ioloop from tornado.gen import coroutine from tornado.concurrent import Future @coroutine def asyn_sum(a, b): print("begin calculate:sum %d+%d"%(a,b)) future = Future() def callback(a, b): print("calculating the sum of %d+%d:"%(a,b)) future.set_result(a+b) tornado.ioloop.IOLoop.instance().add_callback(callback, a, b) result = yield future print("after yielded") print("the %d+%d=%d"%(a, b, result)) def main(): asyn_sum(2,3) tornado.ioloop.IOLoop.instance().start() if __name__ == "__main__": main()

实际的运行场景是:1个协程(asyn_sum)遇到yield表达式被暂停履行后,IOLoop调用另外1个代码段(asyn_sum中的回调函数callback)履行,而在callback中,恰好可以访问到属于被暂停协程(asyn_sum)中的future对象(也就是Runner对象中的self.future的援用),callback中将future调用set_result,那末这个暂停的协程(asyn_sum)在IOLoop下1次迭代调度回调函数时中,被恢复履行。

总结

tornado中的协程实现基于python语言的Generator并且结合1个全局的调度器IOLoop,Generator通过函数装潢器coroutine和IOLoop进行通讯。IOLoop并没有直接控制能力,调度恢复被暂停的协程继续履行。future对象在协程中被yield。协程暂停,IOLoop调度另外1个代码模块履行,而在这个履行的代码模块中恰好,可以访问这个future对象,将其set_result,结果通过IOLoop间接恢复暂停协程履行。不同履行代码模块中,同享future对象,彼此合作,协程调度得顺利履行。

从这类意义上来讲,future对象,像window中的Event内核对象的作用。window中的event用于线程中同步。而协程中的yield future相当于WaitForSingleObject(event_object), 而future.set_result(result)。相当于SetEvent(event_object)。而future和Event的不同点在于,协程借future来恢复履行,而线程借Event来进行线程间同步。


------分隔线----------------------------
------分隔线----------------------------

最新技术推荐