python Tornado事件循环示例源码解析
作者:BruceChen7
这篇文章主要为大家介绍了python Tornado事件循环示例源码解析,有需要的朋友可以借鉴参考下,希望能够有所帮助,祝大家多多进步,早日升职加薪
hello world
#!/usr/bin/env python import tornado.httpserver import tornado.ioloop import tornado.options import tornado.web from tornado.options import define, options define("port", default=8888, help="run on the given port", type=int) class MainHandler(tornado.web.RequestHandler): def get(self): self.write("Hello, world") def main(): tornado.options.parse_command_line() application = tornado.web.Application([ (r"/", MainHandler), ]) http_server = tornado.httpserver.HTTPServer(application) http_server.listen(options.port) tornado.ioloop.IOLoop.instance().start() if __name__ == "__main__": main()
tornado提供了高效的异步机制,我们先不管Application实例化过程,以及http_server创建socket、bind、listen的过程,直接调IOLoop.instance().start进行源码分析。
IOLoop.instance()
@classmethod def instance(cls): """Returns a global IOLoop instance. Most single-threaded applications have a single, global IOLoop. Use this method instead of passing around IOLoop instances throughout your code. A common pattern for classes that depend on IOLoops is to use a default argument to enable programs with multiple IOLoops but not require the argument for simpler applications: class MyClass(object): def __init__(self, io_loop=None): self.io_loop = io_loop or IOLoop.instance() """ if not hasattr(cls, "_instance"): cls._instance = cls() return cls._instance
显然是一个单例模式,注意tornado中的注释,大多数单线程只能有一个ioloop。
start()
这个函数将开始事件循环。
def start(): """ Starts the I/O loop. The loop will run until one of the I/O handlers calls stop(), which will make the loop stop after the current event iteration completes. """ # 判断是否设置了,如果是,将直接退出。 if self._stopped: self._stopped = False return self._running = True while True: # Never use an infinite timeout here - it can stall epoll # 设置轮询时间 poll_timeout = 0.2 # Prevent IO event starvation by delaying new callbacks # to the next iteration of the event loop. callbacks = self._callbacks self._callbacks = [] # 及时调用回调函数 for callback in callbacks: self._run_callback(callback) if self._callbacks: poll_timeout = 0.0 # 如果设置了超时时间 if self._timeouts: # 获取当前时间 now = time.time() while self._timeouts and self._timeouts[0].deadline <= now: timeout = self._timeouts.pop(0) self._run_callback(timeout.callback) if self._timeouts: milliseconds = self._timeouts[0].deadline - now poll_timeout = min(milliseconds, poll_timeout) # 再一次检查事件循环是否在运行 if not self._running: break # 目前不清楚作用 if self._blocking_signal_threshold is not None: # clear alarm so it doesn't fire while poll is waiting for # events. signal.setitimer(signal.ITIMER_REAL, 0, 0) try: # 开始等待事件发生 # _impl初始化和poll源代码见下面 event_pairs = self._impl.poll(poll_timeout) except Exception, e: # Depending on python version and IOLoop implementation, # different exception types may be thrown and there are # two ways EINTR might be signaled: # * e.errno == errno.EINTR # * e.args is like (errno.EINTR, 'Interrupted system call') if (getattr(e, 'errno', None) == errno.EINTR or (isinstance(getattr(e, 'args', None), tuple) and len(e.args) == 2 and e.args[0] == errno.EINTR)): continue else: raise if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, self._blocking_signal_threshold, 0) # Pop one fd at a time from the set of pending fds and run # its handler. Since that handler may perform actions on # other file descriptors, there may be reentrant calls to # this IOLoop that update self._events self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() try: # 见下面的分析 self._handlers[fd](fd, events) except (KeyboardInterrupt, SystemExit): raise except (OSError, IOError), e: if e.args[0] == errno.EPIPE: # Happens when the client closes the connection pass else: logging.error("Exception in I/O handler for fd %d", fd, exc_info=True) except: logging.error("Exception in I/O handler for fd %d", fd, exc_info=True) # reset the stopped flag so another start/stop pair can be issued self._stopped = False if self._blocking_signal_threshold is not None: signal.setitimer(signal.ITIMER_REAL, 0, 0)
我们看看输入localhost:8888,event_pairs
的值:
可以看出,event_pairs是一个元组列表,其中第一个成员4为accept套接字值,1表示为事件类型。我们看看事件类型为:
_EPOLLIN = 0x001 _EPOLLPRI = 0x002 _EPOLLOUT = 0x004 _EPOLLERR = 0x008 _EPOLLHUP = 0x010 _EPOLLRDHUP = 0x2000 _EPOLLONESHOT = (1 << 30) _EPOLLET = (1 << 31) NONE = 0 READ = _EPOLLIN WRITE = _EPOLLOUT ERROR = _EPOLLERR | _EPOLLHUP | _EPOLLRDHUP
可见上述,是文件描述符4,可读事件发生。
self._impl
我们跟踪self._impl初始化过程,可以看到事件循环核心epoll是如何被使用的。在IOLoop实例化开始:
class IOLoop(object): def __init__(self, impl = None): self._impl = impl or _poll # Choose a poll implementation. Use epoll if it is available, fall back to # select() for non-Linux platforms # hasattr(object, attrname)表示某个对象中是否包含属性 if hasattr(select, "epoll"): # Python 2.6+ on Linux # 在linux上使用的是select.epoll _poll = select.epoll elif hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac _poll = _KQueue else: try: # Linux systems with our C module installed import epoll _poll = _EPoll except: # All other systems import sys if "linux" in sys.platform: logging.warning("epoll module not found; using select()") _poll = _Select
从上面的代码中,可以看到,_poll是对于多个平台下epoll、_kQueue的抽象。看一下select.epoll下的返回结果:其返回对象是一个边沿触发的polling对象,当然也可以用作水平触发。
返回的select.epoll对象的方法:
- epoll.close() 关闭epoll fd文件描述符
- epoll.fileno() 返回epoll fd文件描述符只
- epoll.register(fd, eventmask) 注册fd某个事件
- epoll.poll([timeout = -1, maxevents = -1]) wait for events. timeout in seconds
self._handlers[fd](fd, events)
显然,self._handlers[fd]是返回一个回调函数,用来处理fd上的事件events,这里测试的fd为4,事件EPOLLIN。我们来跟踪一下self._handlers变化过程。看看在IOLoop初始化的过程。
def __init__(self): self._handles = {} .... if os.name != 'nt': r, w = os.pipe() self._set_nonblocking(r) self._set_nonblocking(w) ..... self._waker_reader = os.fdopen(r, "rb", 0) self._waker_writer = os.fdopen(w, "wb", 0) # 显然这是对读管道文件描述符事件处理函数 self.add_handler(r, self._read_waker, self.READ)
add_handler(self, fd, handler, events)
def add_handler(self, fd, handler, events): self._handlers[fd] = stack_context.wrap(handler) self._impl.register(fd, events| self.ERROR)
可见add_handler干了两件事情:
- 回调函数设置,当然不仅仅是简单的将handler赋值,而是使用了stack_context.wrap包裹了该函数,具体实现,见下面。
- epoll对象添加该事件,就是在代码的第二行。所以self._handlers[fd](fd, args)实际上就是设置的回调函数。那么用stack_context.wrap()来包裹究竟是为了什么了?
update_handler(self, fd, events)
def update_handler(self, fd, events): """Changes the events we listen for fd.""" self._impl.modify(fd, events | self.ERROR)
该函数用来修改fd感兴趣的事件
以上就是python Tornado事件循环示例源码解析的详细内容,更多关于python Tornado事件循环的资料请关注脚本之家其它相关文章!