Tornado Source Code Analysis: Parsing HTTP Requests in the HTTP Server
Author: BruceChen7
The read-event callback of the listen fd
Code version: httpserver.py from Tornado 1.2
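Tornado defines the class HTTPServer to represent an HTTP server. Its constructor receives the event loop (ioloop) and the Application object, which is stored as request_callback. HTTPServer provides the following methods:
- listen() is the server's listening method. It calls bind(), so by the time the server is listening, the socket is already non-blocking and has socket.SO_REUSEADDR set.
- bind() binds the server to a port and sets the socket to non-blocking.
- start() starts the server in the ioloop. When start() is called without arguments, the single-process IOLoop model is used.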
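What bind() is described as doing can be sketched with Python 3's standard socket module. This is a minimal illustration rather than Tornado's own code; the helper name make_listen_socket and the backlog value 128 are assumptions.

```python
import socket


def make_listen_socket(port, address="127.0.0.1", backlog=128):
    """Hypothetical helper: create a listening socket as bind() is described.

    SO_REUSEADDR lets the port be rebound quickly after a restart, and the
    socket is switched to non-blocking mode before bind/listen, so accept()
    never blocks the event loop.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.setblocking(False)
    sock.bind((address, port))
    sock.listen(backlog)
    return sock
```

Passing port 0 lets the OS choose a free port, which is convenient when experimenting.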
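When start() launches the server, it registers a callback with the IOLoop for read events on the listen fd; the listen fd becoming readable means a new client is connecting to the HTTPServer. Let's look at that callback, _handle_events: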
```python
def _handle_events(self, fd, events):
    while True:
        try:
            connection, address = self._socket.accept()
        except socket.error, e:
            if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
                return
            raise
        if self.ssl_options is not None:
            assert ssl, "Python 2.6+ and OpenSSL required for SSL"
            try:
                connection = ssl.wrap_socket(connection,
                                             server_side=True,
                                             do_handshake_on_connect=False,
                                             **self.ssl_options)
            except ssl.SSLError, err:
                if err.args[0] == ssl.SSL_ERROR_EOF:
                    return connection.close()
                else:
                    raise
            except socket.error, err:
                if err.args[0] == errno.ECONNABORTED:
                    return connection.close()
                else:
                    raise
        try:
            if self.ssl_options is not None:
                stream = iostream.SSLIOStream(connection, io_loop=self.io_loop)
            else:
                stream = iostream.IOStream(connection, io_loop=self.io_loop)
            HTTPConnection(stream, address, self.request_callback,
                           self.no_keep_alive, self.xheaders)
        except:
            logging.error("Error in connection callback", exc_info=True)
```
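Setting aside the SSL handling: the callback first calls accept on the listen fd to obtain connection and address. Because accept runs inside a while True loop on a non-blocking socket, it keeps accepting until no connection is pending. At that point accept fails with EWOULDBLOCK or EAGAIN, which also covers a spurious wakeup, and Tornado simply returns: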
```python
if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
    return
```
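In other words, the listen fd has nothing more to read: no new client is waiting, so the callback returns. For each accepted connection, _handle_events creates an IOStream object, stream: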
```python
stream = iostream.IOStream(connection, io_loop=self.io_loop)
```
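The accept loop described above, which keeps calling accept until EWOULDBLOCK/EAGAIN, can be sketched in Python 3 as follows. In Python 3 that case surfaces as BlockingIOError, a subclass of OSError whose errno is EAGAIN/EWOULDBLOCK. The function name accept_all is hypothetical, and SSL handling is omitted as in the text.

```python
import errno
import socket


def accept_all(listen_sock):
    """Accept every pending connection on a non-blocking listening socket.

    Mirrors the while True / accept() loop: keep accepting until the kernel
    reports EWOULDBLOCK/EAGAIN, which means the backlog is empty.
    """
    accepted = []
    while True:
        try:
            conn, addr = listen_sock.accept()
        except OSError as e:
            if e.errno in (errno.EWOULDBLOCK, errno.EAGAIN):
                return accepted  # nothing left to accept for now
            raise
        conn.setblocking(False)  # the sketch makes each new connection non-blocking too
        accepted.append((conn, addr))
```

Draining the backlog in one callback means several clients that connect between two polls are all handled in a single wakeup.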
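Note that creating an IOStream object registers a new callback with the single-process IOLoop to handle events on the accepted fd. So once a client has connected, the IOLoop holds the following handlers (the descriptor numbers come from one particular run):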
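- descriptor 4: the callback for read events on the listen fd;
- descriptor 7: _read_waker, used to wake up the IOLoop;
- descriptor 9: the callback for read events on the newly accepted fd.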
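Descriptor 7, the waker, is an instance of the self-pipe trick: the loop watches the read end of a pipe, and writing one byte to the write end makes a blocked poll return early. A minimal sketch with Python 3's selectors module, assuming a Unix platform (os.pipe, os.set_blocking); make_waker, wake and drain are hypothetical names, with drain playing the role of _read_waker.

```python
import os
import selectors


def make_waker(selector):
    """Create a non-blocking pipe and watch its read end for readability."""
    r, w = os.pipe()
    os.set_blocking(r, False)
    os.set_blocking(w, False)
    selector.register(r, selectors.EVENT_READ, data="waker")
    return r, w


def wake(w):
    """Write one byte so that a blocked select() on the read end returns."""
    try:
        os.write(w, b"x")
    except BlockingIOError:
        pass  # pipe already full: the loop will wake up anyway


def drain(r):
    """Read and discard everything in the pipe so it stops being readable."""
    try:
        while os.read(r, 4096):
            pass
    except BlockingIOError:
        pass  # pipe is empty
```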
Callback for the accepted fd (IOStream._handle_events)
```python
def _handle_events(self, fd, events):
    # Make sure the connection still exists
    if not self.socket:
        logging.warning("Got events for closed stream %d", fd)
        return
    try:
        # A read event occurred on this connection: call the read handler
        if events & self.io_loop.READ:
            self._handle_read()
        if not self.socket:
            return
        # A write event occurred
        if events & self.io_loop.WRITE:
            # _connecting is set when this stream initiated an outgoing
            # connect(); writability means the connection attempt finished
            if self._connecting:
                self._handle_connect()
            # Ordinary write-event handling
            self._handle_write()
        if not self.socket:
            return
        # Error
        if events & self.io_loop.ERROR:
            # Remove this fd's handler from the IOLoop
            # and close the connection
            self.close()
            return
        state = self.io_loop.ERROR
        # Still reading: keep watching for READ
        if self.reading():
            state |= self.io_loop.READ
        # Still writing: keep watching for WRITE
        if self.writing():
            state |= self.io_loop.WRITE
        if state != self._state:
            self._state = state
            # Update the set of events watched for this fd
            self.io_loop.update_handler(self.socket.fileno(), self._state)
    except:
        logging.error("Uncaught exception, closing connection.",
                      exc_info=True)
        self.close()
        raise
```
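The tail of _handle_events recomputes the event mask for the fd: ERROR is always included, READ only while a read is pending, WRITE only while there is data to send, and update_handler is called only when the mask actually changes. A minimal pure-Python sketch of that bookkeeping; the flag values are illustrative (they happen to mirror the epoll bits EPOLLIN, EPOLLOUT and EPOLLERR) and the function name next_state is hypothetical.

```python
import enum


class Events(enum.IntFlag):
    # Illustrative values; Tornado's IOLoop defines its own READ/WRITE/ERROR.
    READ = 0x001
    WRITE = 0x004
    ERROR = 0x008


def next_state(current, reading, writing):
    """Return (new_state, changed), mirroring how _handle_events updates _state."""
    state = Events.ERROR  # errors are always of interest
    if reading:
        state |= Events.READ
    if writing:
        state |= Events.WRITE
    # The caller would call update_handler() only when changed is True,
    # avoiding a system call when the mask is unchanged.
    return state, state != current
```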
Creating the HTTPConnection object
Now let's return to the listen fd's callback, _handle_events. After the IOStream object stream has been created, _handle_events creates an HTTPConnection:
```python
HTTPConnection(stream, address, self.request_callback,
               self.no_keep_alive, self.xheaders)
```
Inside the HTTPConnection constructor, we can see that it calls:
```python
self.stream.read_until("\r\n\r\n", self._header_callback)
```
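Here, the HTTPConnection member stream is the stream of the accepted fd. read_until keeps reading from the accepted fd until it sees \r\n\r\n, the empty line that terminates the HTTP header block, and then calls _header_callback to parse the HTTP header fields.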
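Once read_until has delivered everything up to and including \r\n\r\n, the header callback only has to split the request line from the header fields. The following is a simplified sketch of that parsing step, not Tornado's own parsing code: it ignores header continuation lines and repeated headers, and the function name parse_request_head is hypothetical.

```python
def parse_request_head(data):
    """Split an HTTP/1.x request head (ending in CRLF CRLF) into its parts.

    Returns (method, uri, version, headers); header names are lower-cased
    so that lookups are case-insensitive.
    """
    if not data.endswith("\r\n\r\n"):
        raise ValueError("incomplete request head")
    lines = data[:-4].split("\r\n")
    method, uri, version = lines[0].split(" ")  # the request line
    if not version.startswith("HTTP/"):
        raise ValueError("malformed HTTP request line")
    headers = {}
    for line in lines[1:]:
        name, sep, value = line.partition(":")
        if not sep:
            raise ValueError("malformed header line: %r" % line)
        headers[name.strip().lower()] = value.strip()
    return method, uri, version, headers
```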
The read_until function
read_until reads from the socket until the given delimiter appears.
```python
def read_until(self, delimiter, callback):
    """Call callback when we read the given delimiter."""
    assert not self._read_callback, "Already reading"
    self._read_delimiter = delimiter
    self._read_callback = stack_context.wrap(callback)
    while True:
        # See if we've already got the data from a previous read
        if self._read_from_buffer():
            return
        self._check_closed()
        if self._read_to_buffer() == 0:
            break
    self._add_io_state(self.io_loop.READ)
```
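read_until can exit in two ways. Either _read_from_buffer finds the requested data in the buffer, in which case the callback runs and read_until returns immediately; or _read_to_buffer, which reads data from the socket into the buffer, returns 0 because no data is available right now, in which case the loop breaks and the socket's READ event is added to the IOLoop (via _add_io_state) so that reading resumes when more data arrives.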
Right after the fd is accepted, the buffer of its IOStream is obviously empty, so _read_from_buffer returns False and execution proceeds to _read_to_buffer.
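The control flow of read_until (try the buffer first, otherwise pull more bytes from the socket, and when the socket has nothing more, wait for a READ event) can be modeled with a non-blocking socketpair. This is a simplified sketch under stated assumptions: MiniStream, wants_read and on_readable are hypothetical names, wants_read merely stands in for _add_io_state(READ), and peer-close handling is omitted.

```python
import socket


class MiniStream:
    """Hypothetical, simplified model of the read_until control flow."""

    def __init__(self, sock, chunk_size=4096):
        sock.setblocking(False)
        self.sock = sock
        self.chunk_size = chunk_size
        self.buffer = bytearray()
        self.delimiter = None
        self.callback = None
        self.wants_read = False  # stands in for _add_io_state(READ)

    def read_until(self, delimiter, callback):
        assert self.callback is None, "Already reading"
        self.delimiter, self.callback = delimiter, callback
        while True:
            if self._read_from_buffer():  # enough data buffered already?
                return
            if self._read_to_buffer() == 0:  # socket has nothing more for now
                break
        self.wants_read = True  # ask the event loop to report READ later

    def on_readable(self):
        """What the event loop would call once the fd becomes readable."""
        while self._read_to_buffer():
            if self._read_from_buffer():
                return

    def _read_to_buffer(self):
        try:
            chunk = self.sock.recv(self.chunk_size)
        except BlockingIOError:  # EWOULDBLOCK/EAGAIN
            return 0
        # b"" (peer closed) also yields 0; real code would close the stream.
        self.buffer.extend(chunk)
        return len(chunk)

    def _read_from_buffer(self):
        if self.delimiter is None:
            return False  # no pending read
        loc = self.buffer.find(self.delimiter)
        if loc == -1:
            return False
        end = loc + len(self.delimiter)
        data = bytes(self.buffer[:end])
        del self.buffer[:end]  # consume the bytes handed to the callback
        callback, self.callback, self.delimiter = self.callback, None, None
        self.wants_read = False
        callback(data)
        return True
```

Bytes after the delimiter stay in the buffer, ready for the next read (for example, a request body).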
Reading from the buffer: _read_from_buffer
```python
def _read_from_buffer(self):
    """Attempts to complete the currently-pending read from the buffer.

    Returns True if the read was completed.
    """
    if self._read_bytes:
        if self._read_buffer_size() >= self._read_bytes:
            num_bytes = self._read_bytes
            callback = self._read_callback
            self._read_callback = None
            self._read_bytes = None
            self._run_callback(callback, self._consume(num_bytes))
            return True
    elif self._read_delimiter:
        _merge_prefix(self._read_buffer, sys.maxint)
        loc = self._read_buffer[0].find(self._read_delimiter)
        if loc != -1:
            callback = self._read_callback
            delimiter_len = len(self._read_delimiter)
            self._read_callback = None
            self._read_delimiter = None
            self._run_callback(callback,
                               self._consume(loc + delimiter_len))
            return True
    return False
```
_read_from_buffer supports two kinds of read:
- reading a specified number of bytes;
- reading up to a specified delimiter.
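Note that the pending read carries a callback: once the requested data is available, it is consumed from the buffer and handed to that callback. Note also that _read_from_buffer does not deal with the socket at all; it works only on data that is already in the buffer.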
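The two kinds of read can be sketched as one pure function over a buffer. For simplicity, the sketch assumes a single bytearray instead of Tornado's sequence of chunks merged with _merge_prefix, and the name try_complete is hypothetical. Like _read_from_buffer, it never touches a socket.

```python
def try_complete(buffer, num_bytes=None, delimiter=None):
    """Try to satisfy a pending read using only data already in `buffer`.

    Exactly one of num_bytes / delimiter should be set. On success the
    consumed bytes are removed from the front of `buffer` (a bytearray)
    and returned; otherwise the buffer is left untouched and None is returned.
    """
    if num_bytes is not None:
        # Fixed-size read: complete only once enough bytes are buffered
        if len(buffer) >= num_bytes:
            data = bytes(buffer[:num_bytes])
            del buffer[:num_bytes]
            return data
    elif delimiter is not None:
        # Delimiter read: complete once the delimiter appears, delimiter included
        loc = buffer.find(delimiter)
        if loc != -1:
            end = loc + len(delimiter)
            data = bytes(buffer[:end])
            del buffer[:end]
            return data
    return None
```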
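Reading data into the buffer
Data is moved from the socket into the buffer by _read_to_buffer, which in turn calls _read_from_socket. _read_from_socket performs a single read on the non-blocking socket, of at most read_chunk_size bytes (4096 by default). Note the error handling: if reading from the socket fails with any error other than EWOULDBLOCK/EAGAIN, the connection is closed.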
```python
def _read_to_buffer(self):
    """Reads from the socket and appends the result to the read buffer.

    Returns the number of bytes read.  Returns 0 if there is nothing
    to read (i.e. the read returns EWOULDBLOCK or equivalent).  On
    error closes the socket and raises an exception.
    """
    try:
        chunk = self._read_from_socket()
    except socket.error, e:
        # ssl.SSLError is a subclass of socket.error
        logging.warning("Read error on %d: %s",
                        self.socket.fileno(), e)
        self.close()
        raise
    if chunk is None:
        return 0
    self._read_buffer.append(chunk)
    if self._read_buffer_size() >= self.max_buffer_size:
        logging.error("Reached maximum read buffer size")
        self.close()
        raise IOError("Reached maximum read buffer size")
    return len(chunk)
```
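In short, this function appends the data it reads to the IOStream's _read_buffer and returns the number of bytes actually read. Note the error handling here too: if the buffered data grows to max_buffer_size, the connection is also closed.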
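The buffering and size guard can be isolated in a small sketch. read_to_buffer here is a hypothetical function, not Tornado's method: the actual read is delegated to a caller-supplied read_chunk callable (standing in for _read_from_socket, returning bytes or None) and closing is delegated to a close callable, so the logic can be exercised without a network.

```python
def read_to_buffer(read_chunk, buffer, max_buffer_size, close):
    """Append one chunk from read_chunk() to `buffer` (a bytearray).

    Returns the number of bytes added; 0 means "nothing to read". On a read
    error, or once the buffer reaches max_buffer_size, calls close() and
    raises, so a client cannot make the server buffer unbounded data.
    """
    try:
        chunk = read_chunk()
    except OSError:
        close()  # any other read error also closes the connection
        raise
    if chunk is None:
        return 0  # EWOULDBLOCK/EAGAIN: nothing available yet
    buffer.extend(chunk)
    if len(buffer) >= max_buffer_size:
        close()
        raise IOError("Reached maximum read buffer size")
    return len(chunk)
```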
Reading from the socket once: _read_from_socket
```python
def _read_from_socket(self):
    """Attempts to read from the socket.

    Returns the data read or None if there is nothing to read.
    May be overridden in subclasses.
    """
    try:
        # Read as much as possible, up to read_chunk_size bytes
        chunk = self.socket.recv(self.read_chunk_size)
    except socket.error, e:
        if e.args[0] in (errno.EWOULDBLOCK, errno.EAGAIN):
            # No data available right now
            return None
        else:
            raise
    if not chunk:
        # recv returned an empty string: the client has closed the connection
        self.close()
        return None
    return chunk
```
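The three outcomes handled by _read_from_socket (data, nothing available yet, peer closed) can be reproduced in Python 3 with a non-blocking socketpair. A minimal sketch; read_from_socket is a hypothetical module-level function, and in Python 3 the EWOULDBLOCK/EAGAIN case appears as BlockingIOError. Other socket errors propagate to the caller, as in the original.

```python
import socket


def read_from_socket(sock, chunk_size=4096):
    """Attempt a single recv on a non-blocking socket.

    Returns the data read, or None if there is nothing to read. An empty
    result from recv means the peer closed its end, so the socket is
    closed here as well and None is returned.
    """
    try:
        chunk = sock.recv(chunk_size)  # read at most chunk_size bytes
    except BlockingIOError:
        return None  # EWOULDBLOCK/EAGAIN: no data right now
    if not chunk:
        sock.close()  # orderly shutdown by the peer
        return None
    return chunk
```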
Reading from the socket in LT (level-triggered) mode: _handle_read()
```python
def _handle_read(self):
    while True:
        try:
            # Read from the socket until we get EWOULDBLOCK or equivalent.
            # SSL sockets do some internal buffering, and if the data is
            # sitting in the SSL object's buffer select() and friends
            # can't see it; the only way to find out if it's there is to
            # try to read it.
            result = self._read_to_buffer()
        except Exception:
            self.close()
            return
        if result == 0:
            break
        else:
            if self._read_from_buffer():
                return
```
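In LT mode, reading from the socket takes two steps:
- read data from the socket into the buffer with _read_to_buffer;
- then try to complete the pending read from the buffer with _read_from_buffer.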
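Reading from the socket continues until EWOULDBLOCK or EAGAIN. That is the purpose of the while True loop: it keeps reading until no data is left, returning early if _read_from_buffer completes the pending read.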
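Level-triggered readiness is what makes this loop safe: as long as unread bytes remain in the kernel buffer, every poll reports the fd as readable again. The sketch below demonstrates that with Python 3's selectors module (its registrations are level-triggered) and a socketpair; drain and is_readable are hypothetical helpers, and drain stops at BlockingIOError, the Python 3 form of EWOULDBLOCK/EAGAIN.

```python
import selectors
import socket


def drain(sock, chunk_size=4096):
    """Read from a non-blocking socket until EWOULDBLOCK/EAGAIN; return the bytes."""
    data = bytearray()
    while True:
        try:
            chunk = sock.recv(chunk_size)
        except BlockingIOError:
            break  # kernel receive buffer is empty
        if not chunk:
            break  # peer closed the connection
        data += chunk
    return bytes(data)


def is_readable(sel):
    """Poll once without blocking and report whether any fd is readable."""
    return any(mask & selectors.EVENT_READ for _, mask in sel.select(timeout=0))
```

With an edge-triggered poller, a partial read would not be reported again until new data arrived, which is why edge-triggered code must always read until EAGAIN.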