Asyncio vs. cl-async: Creating a TCP Server
1 Overview
This article compares two similar libraries, Python 3's asyncio and Common Lisp's cl-async, and looks at one difference in how each of them creates a TCP server.
2 Interface Definitions
Python asyncio: create_server (a coroutine method of the event loop, called as await loop.create_server(...))
Interface definition:
def create_server(protocol_factory, host=None, port=None, *,
                  family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                  sock=None, backlog=100, ssl=None,
                  reuse_address=None, reuse_port=None)
# => Create a TCP server (socket type SOCK_STREAM) bound to host and port.
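The sketch below shows a minimal call to create_server. It assumes Python 3.7+ for asyncio.run and get_running_loop. The protocol factory is the class EchoProtocol, a name invented for this example, and the loop calls it once per accepted connection. Passing port 0 asks the OS for a free port, and asyncio.open_connection serves as a throwaway client.

```python
import asyncio


class EchoProtocol(asyncio.Protocol):
    """Illustrative protocol: echo every received chunk back to the peer."""

    def connection_made(self, transport):
        # The loop hands over the transport once the TCP connection exists.
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data)


async def start_echo_server(host="127.0.0.1", port=0):
    loop = asyncio.get_running_loop()
    # protocol_factory is called once per accepted connection, so every
    # client gets its own EchoProtocol instance.
    return await loop.create_server(EchoProtocol, host, port)


async def demo():
    server = await start_echo_server()
    # port=0 lets the OS pick a free port; read it back from the socket.
    host, port = server.sockets[0].getsockname()[:2]
    reader, writer = await asyncio.open_connection(host, port)
    writer.write(b"hello")
    await writer.drain()
    reply = await reader.readexactly(5)
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return reply


if __name__ == "__main__":
    print(asyncio.run(demo()))
```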
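Common Lisp cl-async: tcp-server
Interface definition:
(defun tcp-server (bind-address port read-cb event-cb
                   &key connect-cb (backlog -1) stream))
;; => tcp-server
(In the newer source quoted in section 3.2, event-cb is a keyword argument. Passing it as the fourth positional argument still works but triggers a deprecation warning.)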
3 Analysis
3.1 asyncio
Unlike cl-async, Python's asyncio library introduces two abstractions: the protocol and the transport.
3.1.1 Protocol
A protocol is an abstraction over a callback-based API: the event loop calls its methods when connection events occur.
The asyncio protocol code:
# file: cpython/Lib/asyncio/protocols.py

class BaseProtocol:
    """Common base class for protocol interfaces.

    Usually user implements protocols that derived from BaseProtocol
    like Protocol or ProcessProtocol.

    The only case when BaseProtocol should be implemented directly is
    write-only transport like write pipe
    """

    def connection_made(self, transport):
        """Called when a connection is made.

        The argument is the transport representing the pipe connection.
        To receive data, wait for data_received() calls.
        When the connection is closed, connection_lost() is called.
        """

    def connection_lost(self, exc):
        """Called when the connection is lost or closed.

        The argument is an exception object or None (the latter
        meaning a regular EOF is received or the connection was
        aborted or closed).
        """

    def pause_writing(self):
        """Called when the transport's buffer goes over the high-water mark.

        Pause and resume calls are paired -- pause_writing() is called
        once when the buffer goes strictly over the high-water mark
        (even if subsequent writes increases the buffer size even
        more), and eventually resume_writing() is called once when the
        buffer size reaches the low-water mark.

        Note that if the buffer size equals the high-water mark,
        pause_writing() is not called -- it must go strictly over.
        Conversely, resume_writing() is called when the buffer size is
        equal or lower than the low-water mark.  These end conditions
        are important to ensure that things go as expected when either
        mark is zero.

        NOTE: This is the only Protocol callback that is not called
        through EventLoop.call_soon() -- if it were, it would have no
        effect when it's most needed (when the app keeps writing
        without yielding until pause_writing() is called).
        """

    def resume_writing(self):
        """Called when the transport's buffer drains below the low-water mark.

        See pause_writing() for details.
        """


class Protocol(BaseProtocol):
    """Interface for stream protocol.

    The user should implement this interface.  They can inherit from
    this class but don't need to.  The implementations here do
    nothing (they don't raise exceptions).

    When the user wants to requests a transport, they pass a protocol
    factory to a utility function (e.g., EventLoop.create_connection()).

    When the connection is made successfully, connection_made() is
    called with a suitable transport object.  Then data_received()
    will be called 0 or more times with data (bytes) received from the
    transport; finally, connection_lost() will be called exactly once
    with either an exception object or None as an argument.

    State machine of calls:

      start -> CM [-> DR*] [-> ER?] -> CL -> end

    * CM: connection_made()
    * DR: data_received()
    * ER: eof_received()
    * CL: connection_lost()
    """

    def data_received(self, data):
        """Called when some data is received.

        The argument is a bytes object.
        """

    def eof_received(self):
        """Called when the other end calls write_eof() or equivalent.

        If this returns a false value (including None), the transport
        will close itself.  If it returns a true value, closing the
        transport is up to the protocol.
        """
asyncio's BaseProtocol class defines the callbacks connection_made, connection_lost, pause_writing, and resume_writing. The Protocol class, which inherits from BaseProtocol, adds the data_received and eof_received methods. BaseProtocol's connection_made takes a transport as its argument, while Protocol's data_received takes the received data (a bytes object).
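To see the call sequence start -> CM [-> DR*] [-> ER?] -> CL -> end in practice, the sketch below records every callback that asyncio invokes on a server-side protocol. RecordingProtocol and run_once are illustrative names. The client half-closes with write_eof() so that eof_received fires before connection_lost. The OS decides how the bytes are split across data_received calls, so only their concatenation is meaningful.

```python
import asyncio


class RecordingProtocol(asyncio.Protocol):
    """Records the order in which asyncio calls the protocol callbacks."""

    def __init__(self, log, done):
        self.log = log
        self.done = done  # future resolved once connection_lost has run

    def connection_made(self, transport):
        self.log.append("connection_made")
        self.transport = transport

    def data_received(self, data):
        self.log.append(("data_received", data))

    def eof_received(self):
        self.log.append("eof_received")
        # Returning None (a false value) lets the transport close itself.
        return None

    def connection_lost(self, exc):
        self.log.append("connection_lost")
        if not self.done.done():
            self.done.set_result(None)


async def run_once():
    loop = asyncio.get_running_loop()
    log, done = [], loop.create_future()
    server = await loop.create_server(
        lambda: RecordingProtocol(log, done), "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"ping")
    await writer.drain()
    writer.write_eof()   # server side sees eof_received()
    await done           # wait until the server protocol saw connection_lost()
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return log
```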
3.1.2 Transport
A transport is an abstraction over socket operations (reading, writing, flow control, and closing).
The asyncio transport code:
# file: cpython/Lib/asyncio/transports.py

class BaseTransport:
    """Base class for transports."""

    def __init__(self, extra=None):
        if extra is None:
            extra = {}
        self._extra = extra

    def get_extra_info(self, name, default=None):
        """Get optional transport information."""
        return self._extra.get(name, default)

    def is_closing(self):
        """Return True if the transport is closing or closed."""
        raise NotImplementedError

    def close(self):
        """Close the transport.

        Buffered data will be flushed asynchronously.  No more data
        will be received.  After all buffered data is flushed, the
        protocol's connection_lost() method will (eventually) called
        with None as its argument.
        """
        raise NotImplementedError

    def set_protocol(self, protocol):
        """Set a new protocol."""
        raise NotImplementedError

    def get_protocol(self):
        """Return the current protocol."""
        raise NotImplementedError


class ReadTransport(BaseTransport):
    """Interface for read-only transports."""

    def is_reading(self):
        """Return True if the transport is receiving."""
        raise NotImplementedError

    def pause_reading(self):
        """Pause the receiving end.

        No data will be passed to the protocol's data_received()
        method until resume_reading() is called.
        """
        raise NotImplementedError

    def resume_reading(self):
        """Resume the receiving end.

        Data received will once again be passed to the protocol's
        data_received() method.
        """
        raise NotImplementedError


class WriteTransport(BaseTransport):
    """Interface for write-only transports."""

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for write flow control.

        These two values control when to call the protocol's
        pause_writing() and resume_writing() methods.  If specified,
        the low-water limit must be less than or equal to the
        high-water limit.  Neither value can be negative.

        The defaults are implementation-specific.  If only the
        high-water limit is given, the low-water limit defaults to an
        implementation-specific value less than or equal to the
        high-water limit.  Setting high to zero forces low to zero as
        well, and causes pause_writing() to be called whenever the
        buffer becomes non-empty.  Setting low to zero causes
        resume_writing() to be called only once the buffer is empty.
        Use of zero for either limit is generally sub-optimal as it
        reduces opportunities for doing I/O and computation
        concurrently.
        """
        raise NotImplementedError

    def get_write_buffer_size(self):
        """Return the current size of the write buffer."""
        raise NotImplementedError

    def write(self, data):
        """Write some data bytes to the transport.

        This does not block; it buffers the data and arranges for it
        to be sent out asynchronously.
        """
        raise NotImplementedError

    def writelines(self, list_of_data):
        """Write a list (or any iterable) of data bytes to the transport.

        The default implementation concatenates the arguments and
        calls write() on the result.
        """
        data = b''.join(list_of_data)
        self.write(data)

    def write_eof(self):
        """Close the write end after flushing buffered data.

        (This is like typing ^D into a UNIX program reading from stdin.)

        Data may still be received.
        """
        raise NotImplementedError

    def can_write_eof(self):
        """Return True if this transport supports write_eof(), False if not."""
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError


class Transport(ReadTransport, WriteTransport):
    """Interface representing a bidirectional transport.

    There may be several implementations, but typically, the user does
    not implement new transports; rather, the platform provides some
    useful transports that are implemented using the platform's best
    practices.

    The user never instantiates a transport directly; they call a
    utility function, passing it a protocol factory and other
    information necessary to create the transport and protocol.  (E.g.
    EventLoop.create_connection() or EventLoop.create_server().)

    The utility function will asynchronously create a transport and a
    protocol and hook them up by calling the protocol's
    connection_made() method, passing it the transport.

    The implementation here raises NotImplemented for every method
    except writelines(), which calls write() in a loop.
    """
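asyncio's Transport class inherits from ReadTransport and WriteTransport. As its docstring says, users never instantiate a transport directly. When a protocol factory is passed to EventLoop.create_connection() or EventLoop.create_server(), the loop asynchronously creates a transport and a protocol and connects them by calling the protocol's connection_made() method with the transport.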
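BaseTransport's get_extra_info method returns optional transport information. Examples include the underlying socket (get_extra_info('socket')) and the peer address (get_extra_info('peername')).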
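Here is a sketch of a protocol that uses the transport it receives in connection_made. get_extra_info('peername') returns the remote address, transport.write() queues bytes without blocking, and transport.close() flushes the buffer before closing. GreetingProtocol and fetch_greeting are names invented for this example. 'peername' and 'socket' are standard extra-info keys for asyncio socket transports.

```python
import asyncio


class GreetingProtocol(asyncio.Protocol):
    """Greets the peer with its own address, then closes the connection."""

    def connection_made(self, transport):
        # 'peername' is the remote address; the first two items are host, port.
        host, port = transport.get_extra_info("peername")[:2]
        transport.write(f"hello {host}\n".encode())
        # close() flushes buffered data asynchronously before closing.
        transport.close()


async def fetch_greeting():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(GreetingProtocol, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    line = await reader.readline()
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return line
```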
3.2 cl-async
When creating a TCP server, cl-async takes read-cb, connect-cb, event-cb, and similar callbacks directly as arguments.
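To make the contrast concrete, the sketch below mimics cl-async's calling convention on top of asyncio. The caller passes read_cb, event_cb, and connect_cb directly and never writes a Protocol class. The names tcp_server_cb and make_callback_protocol are hypothetical, and so are the callback signatures: read_cb(transport, data), event_cb(exc), and connect_cb(transport). They only loosely follow cl-async's read-cb, which receives the socket and the data. Internally the wrapper still has to build a Protocol, which suggests that an asyncio protocol is essentially a bundle of such callbacks.

```python
import asyncio


def make_callback_protocol(read_cb, event_cb=None, connect_cb=None):
    """Build a Protocol class that forwards to plain callbacks."""

    class CallbackProtocol(asyncio.Protocol):
        def connection_made(self, transport):
            self.transport = transport
            if connect_cb is not None:
                connect_cb(transport)

        def data_received(self, data):
            # Like cl-async's read-cb: pass the "socket" along with the data.
            read_cb(self.transport, data)

        def connection_lost(self, exc):
            # exc is None on a clean close; only report real errors.
            if exc is not None and event_cb is not None:
                event_cb(exc)

    return CallbackProtocol


async def tcp_server_cb(host, port, read_cb, event_cb=None, *,
                        connect_cb=None, backlog=100):
    """Hypothetical cl-async-flavoured wrapper around loop.create_server."""
    loop = asyncio.get_running_loop()
    protocol_cls = make_callback_protocol(read_cb, event_cb, connect_cb)
    return await loop.create_server(protocol_cls, host, port, backlog=backlog)


async def demo():
    connected = []
    server = await tcp_server_cb(
        "127.0.0.1", 0,
        read_cb=lambda sock, data: sock.write(data.upper()),
        connect_cb=connected.append)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"abc")
    await writer.drain()
    reply = await reader.readexactly(3)
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return reply, len(connected)
```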
The relevant cl-async tcp-server code:
(defun tcp-server (bind-address port read-cb &rest args)
  "Open a TCP connection asynchronously. Optionally send data out once connected via the :data keyword (can be a string or byte array)."
  (let ((event-cb-dep (car args)))
    (unless (or (keywordp event-cb-dep)
                (null event-cb-dep))
      (push :event-cb args)
      (warn "Passing event-cb as the fourth argument to tcp-server is now deprecated. Please use the :event-cb keyword instead."))
    (apply 'tcp-server-new bind-address port read-cb args)))

(defun tcp-server-new (bind-address port read-cb &key event-cb connect-cb backlog stream fd)
  "Start a TCP listener on the current event loop. Returns a tcp-server class which can be closed with close-tcp-server"
  (socket-server 'tcp-server
                 (list bind-address port)
                 read-cb
                 :event-cb event-cb
                 :connect-cb connect-cb
                 :backlog backlog
                 :stream stream
                 :fd fd))

(defun socket-server (server-class address read-cb &key event-cb connect-cb backlog stream fd)
  "Start a socket listener on the current event loop. Returns a socket-server instance which can be closed with close-socket-server"
  (check-event-loop-running)
  (let* ((server-instance (make-instance server-class :stream stream))
         (server-c (socket-server-c server-instance))
         (r-bind (socket-server-bind server-instance address fd))
         (backlog (if (or (null backlog)
                          (< backlog 0))
                      128
                      backlog))
         (r-listen (when (zerop r-bind)
                     (uv:uv-listen server-c backlog (cffi:callback socket-accept-cb)))))
    ;; check that our listener instantiated properly
    (when (or (< r-bind 0)
              (< r-listen 0))
      (unwind-protect
        (event-handler (if (zerop r-bind) r-listen r-bind) event-cb :throw t :streamish server-instance)
        (close-socket-server server-instance))
      (return-from socket-server))
    ;; make sure the server is closed/freed on exit
    (add-event-loop-exit-callback (lambda () (close-socket-server server-instance)))
    (attach-data-to-pointer server-c server-instance)
    ;; setup an accept error cb
    (save-callbacks server-c (list :read-cb read-cb :event-cb event-cb :connect-cb connect-cb))
    ;; return the listener, which can be closed by the app if needed
    server-instance))

(defclass tcp-server (tcp-mixin socket-server) ())

(defclass socket-server ()
  ((c :accessor socket-server-c :initarg :c :initform (cffi:null-pointer))
   (closed :accessor socket-server-closed :initarg :closed :initform nil)
   (stream :accessor socket-server-stream :initarg :stream :initform nil))
  (:documentation "Wraps around a connection listener."))
cl-async's tcp-server function calls tcp-server-new, which in turn calls socket-server to create a tcp-server instance. The tcp-server class inherits from socket-server (together with tcp-mixin).
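In socket-server, the callbacks are handled in (save-callbacks server-c (list :read-cb read-cb :event-cb event-cb :connect-cb connect-cb)). I could not fully follow the code beyond this point, and it leads into the C (libuv) bindings, so that is left for a later look (TODO).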
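One plausible reading of save-callbacks, based only on its name and the arguments passed above, is that it stores the callback plist in a registry keyed by the C handle (server-c). Under that reading, a C-level callback such as socket-accept-cb, which is handed to uv:uv-listen, can look up the Lisp callbacks again when libuv calls it. The Python below models that assumption and is not cl-async code. Every name in it is hypothetical, id() stands in for a pointer address, and copying the server's callbacks to the accepted client handle is likewise an assumption.

```python
# Hypothetical model of a callback registry keyed by a handle's address.
_callback_registry = {}


class FakeHandle:
    """Stand-in for a libuv server or client handle pointer."""


def save_callbacks(handle, callbacks):
    """Remember the high-level callbacks registered for a C handle."""
    _callback_registry[id(handle)] = callbacks


def get_callbacks(handle):
    return _callback_registry.get(id(handle), {})


def free_callbacks(handle):
    # Entries must be dropped explicitly when the handle is closed;
    # otherwise a reused address would find stale callbacks.
    _callback_registry.pop(id(handle), None)


def on_accept(server_handle, client_handle):
    """What a C-level accept callback might do: copy the listener's
    callbacks to the new client handle, then fire connect_cb."""
    callbacks = get_callbacks(server_handle)
    save_callbacks(client_handle, callbacks)
    connect_cb = callbacks.get("connect_cb")
    if connect_cb is not None:
        connect_cb(client_handle)


def on_read(client_handle, data):
    """Dispatch incoming bytes to the read_cb registered for the client."""
    read_cb = get_callbacks(client_handle).get("read_cb")
    if read_cb is not None:
        read_cb(client_handle, data)
```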
4 Usage Comparison
4.1 Sanic
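Sanic defines a custom HttpProtocol that implements a set of callbacks. For example, data_received feeds the incoming bytes to httptools, an HTTP parser. Methods such as on_url and on_message_complete are the parse callbacks that httptools invokes on the protocol object, which is passed in as HttpRequestParser(self).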
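The callback-driven parsing style can be illustrated without httptools itself. ToyRequestParser below is a hypothetical stand-in that calls hooks with the same names Sanic's HttpProtocol implements (on_url, on_header, on_headers_complete, on_body, on_message_complete) on the object it is given. Unlike httptools, it only handles one complete, non-chunked request per feed_data() call. RequestCollector plays the role of the protocol.

```python
class ToyRequestParser:
    """Hypothetical stand-in for httptools.HttpRequestParser: same hook
    names, but only one complete, non-chunked request per feed_data()."""

    def __init__(self, protocol):
        self.protocol = protocol
        self.method = None

    def feed_data(self, data):
        head, _, body = data.partition(b"\r\n\r\n")
        request_line, *header_lines = head.split(b"\r\n")
        self.method, url, _version = request_line.split(b" ", 2)
        self.protocol.on_url(url)
        for line in header_lines:
            name, _, value = line.partition(b":")
            self.protocol.on_header(name.strip(), value.strip())
        self.protocol.on_headers_complete()
        if body:
            self.protocol.on_body(body)
        self.protocol.on_message_complete()

    def get_method(self):
        return self.method


class RequestCollector:
    """Implements the parser hooks roughly the way HttpProtocol does."""

    def __init__(self):
        self.url, self.headers, self.body, self.done = None, [], b"", False

    def on_url(self, url):
        self.url = url

    def on_header(self, name, value):
        self.headers.append((name.decode(), value.decode("utf-8")))

    def on_headers_complete(self):
        pass  # HttpProtocol builds its Request object at this point

    def on_body(self, body):
        self.body += body

    def on_message_complete(self):
        self.done = True  # HttpProtocol schedules the request handler here
```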
Sanic creates its TCP server with asyncio as follows:
# sanic/sanic/server.py
class HttpProtocol(asyncio.Protocol):
    __slots__ = ('loop', 'transport', 'connections', 'signal',  # event loop, connection
                 'parser', 'request', 'url', 'headers',  # request params
                 'request_handler', 'request_timeout', 'request_max_size',  # request config
                 '_total_request_size', '_timeout_handler')  # connection management

    def __init__(self, *, loop, request_handler, signal=Signal(), connections={},
                 request_timeout=60, request_max_size=None):
        self.loop = loop
        self.transport = None
        self.request = None
        self.parser = None
        self.url = None
        self.headers = None
        self.signal = signal
        self.connections = connections
        self.request_handler = request_handler
        self.request_timeout = request_timeout
        self.request_max_size = request_max_size
        self._total_request_size = 0
        self._timeout_handler = None

    # -------------------------------------------- #
    # Connection
    # -------------------------------------------- #

    def connection_made(self, transport):
        self.connections[self] = True
        self._timeout_handler = self.loop.call_later(self.request_timeout, self.connection_timeout)
        self.transport = transport

    def connection_lost(self, exc):
        del self.connections[self]
        self._timeout_handler.cancel()
        self.cleanup()

    def connection_timeout(self):
        self.bail_out("Request timed out, connection closed")

    # -------------------------------------------- #
    # Parsing
    # -------------------------------------------- #

    def data_received(self, data):
        # Check for the request itself getting too large and exceeding memory limits
        self._total_request_size += len(data)
        if self._total_request_size > self.request_max_size:
            return self.bail_out("Request too large ({}), connection closed".format(self._total_request_size))

        # Create parser if this is the first time we're receiving data
        if self.parser is None:
            assert self.request is None
            self.headers = []
            self.parser = httptools.HttpRequestParser(self)

        # Parse request chunk or close connection
        try:
            self.parser.feed_data(data)
        except httptools.parser.errors.HttpParserError as e:
            self.bail_out("Invalid request data, connection closed ({})".format(e))

    def on_url(self, url):
        self.url = url

    def on_header(self, name, value):
        if name == b'Content-Length' and int(value) > self.request_max_size:
            return self.bail_out("Request body too large ({}), connection closed".format(value))

        self.headers.append((name.decode(), value.decode('utf-8')))

    def on_headers_complete(self):
        self.request = Request(
            url_bytes=self.url,
            headers=dict(self.headers),
            version=self.parser.get_http_version(),
            method=self.parser.get_method().decode()
        )

    def on_body(self, body):
        self.request.body = body

    def on_message_complete(self):
        self.loop.create_task(self.request_handler(self.request, self.write_response))

    # -------------------------------------------- #
    # Responding
    # -------------------------------------------- #

    def write_response(self, response):
        try:
            keep_alive = self.parser.should_keep_alive() and not self.signal.stopped
            self.transport.write(response.output(self.request.version, keep_alive, self.request_timeout))
            if not keep_alive:
                self.transport.close()
            else:
                self.cleanup()
        except Exception as e:
            self.bail_out("Writing request failed, connection closed {}".format(e))

    def bail_out(self, message):
        log.error(message)
        self.transport.close()

    def cleanup(self):
        self.parser = None
        self.request = None
        self.url = None
        self.headers = None
        self._total_request_size = 0

    def close_if_idle(self):
        """
        Close the connection if a request is not being sent or received
        :return: boolean - True if closed, false if staying open
        """
        if not self.parser:
            self.transport.close()
            return True
        return False


def serve(host, port, request_handler, after_start=None, before_stop=None, debug=False, request_timeout=60,
          request_max_size=None):
    # Create Event Loop
    loop = async_loop.new_event_loop()
    asyncio.set_event_loop(loop)
    # I don't think we take advantage of this
    # And it slows everything waaayyy down
    # loop.set_debug(debug)

    connections = {}
    signal = Signal()
    server_coroutine = loop.create_server(lambda: HttpProtocol(
        loop=loop,
        connections=connections,
        signal=signal,
        request_handler=request_handler,
        request_timeout=request_timeout,
        request_max_size=request_max_size,
    ), host, port)
    try:
        http_server = loop.run_until_complete(server_coroutine)
    except OSError as e:
        log.error("Unable to start server: {}".format(e))
        return
    except:
        log.exception("Unable to start server")
        return

    # Run the on_start function if provided
    if after_start:
        result = after_start(loop)
        if isawaitable(result):
            loop.run_until_complete(result)

    # Register signals for graceful termination
    for _signal in (SIGINT, SIGTERM):
        loop.add_signal_handler(_signal, loop.stop)

    try:
        loop.run_forever()
    finally:
        log.info("Stop requested, draining connections...")

        # Run the on_stop function if provided
        if before_stop:
            result = before_stop(loop)
            if isawaitable(result):
                loop.run_until_complete(result)

        # Wait for event loop to finish and all connections to drain
        http_server.close()
        loop.run_until_complete(http_server.wait_closed())

        # Complete all tasks on the loop
        signal.stopped = True
        for connection in connections.keys():
            connection.close_if_idle()

        while connections:
            loop.run_until_complete(asyncio.sleep(0.1))

        loop.close()
        log.info("Server Stopped")
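Sanic's serve() shuts down in a fixed sequence. It first stops accepting connections, then asks idle connections to close, and finally polls until the shared connections dict is empty. The sketch below reproduces that sequence with asyncio.run and an asyncio.Event in place of run_forever() and signal handlers. TrackedProtocol, serve_until, and demo are hypothetical names. One deliberate difference: in recent Python versions (3.12+, as far as I know) Server.wait_closed() also waits for active connections, so the sketch awaits it after the drain loop rather than before closing idle connections as Sanic does.

```python
import asyncio


class TrackedProtocol(asyncio.Protocol):
    """Registers itself in a shared dict, like Sanic's `connections`."""

    def __init__(self, connections):
        self.connections = connections
        self.transport = None
        self.busy = False  # stand-in for Sanic's "self.parser is not None"

    def connection_made(self, transport):
        self.transport = transport
        self.connections[self] = True

    def connection_lost(self, exc):
        self.connections.pop(self, None)

    def close_if_idle(self):
        if not self.busy:
            self.transport.close()
            return True
        return False


async def serve_until(stop, connections, started, host="127.0.0.1", port=0):
    loop = asyncio.get_running_loop()
    server = await loop.create_server(
        lambda: TrackedProtocol(connections), host, port)
    started.set_result(server.sockets[0].getsockname()[1])
    await stop.wait()                  # plays the role of loop.run_forever()
    server.close()                     # stop accepting new connections
    for conn in list(connections):     # ask idle connections to close
        conn.close_if_idle()
    while connections:                 # drain, like Sanic's sleep(0.1) loop
        await asyncio.sleep(0.1)
    await server.wait_closed()         # awaited last; see the note above


async def demo():
    loop = asyncio.get_running_loop()
    stop, started, connections = asyncio.Event(), loop.create_future(), {}
    task = asyncio.create_task(serve_until(stop, connections, started))
    port = await started
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    while not connections:             # wait until the server registered us
        await asyncio.sleep(0.01)
    stop.set()
    data = await reader.read()         # b"" once the server closes the idle conn
    writer.close()
    await writer.wait_closed()
    await task
    return data
```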
4.2 Wookie
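Wookie defines its own read-data, handle-connection, and listener-event-handler functions and passes them to cl-async as read-cb, :connect-cb, and event-cb respectively.
read-data corresponds to asyncio's data_received method. The difference is that read-data receives both the socket and the data as arguments, whereas data_received receives only the data, and the protocol keeps the transport it was given in connection_made.
In the handle-connection callback, Wookie defines header-callback, body-callback, and finish-callback closures and passes them to the http-parse parser to customize the parsing process. The resulting parser is stored in the socket's data so that read-data can find it.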
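Wookie's division of labour can be sketched in Python without any networking. The connect callback builds per-connection closures and attaches a parser to the socket's data slot. The read callback then just looks that parser up and feeds it bytes. FakeSocket, make_parser, handle_connection, and read_data are hypothetical names. make_parser is a toy incremental parser that handles Content-Length bodies only and does not implement http-parse's real behaviour.

```python
class FakeSocket:
    """Hypothetical stand-in for a cl-async socket: `data` plays the role of
    (as:socket-data sock), and write() just records outgoing bytes."""

    def __init__(self):
        self.data = {}
        self.sent = []

    def write(self, payload):
        self.sent.append(payload)


def make_parser(header_cb, body_cb, finish_cb):
    """Toy incremental parser with the same three hooks Wookie passes to
    http-parse. Content-Length bodies only; extra trailing bytes are dropped."""
    buf = bytearray()
    state = {"headers": None, "remaining": 0, "finished": False}

    def feed(data):
        buf.extend(data)
        if state["headers"] is None:
            head, sep, rest = bytes(buf).partition(b"\r\n\r\n")
            if not sep:
                return  # header block incomplete; wait for more bytes
            headers = {}
            for line in head.split(b"\r\n")[1:]:  # skip the request line
                name, _, value = line.partition(b":")
                headers[name.strip().lower()] = value.strip()
            state["headers"] = headers
            state["remaining"] = int(headers.get(b"content-length", b"0"))
            header_cb(headers)
            buf[:] = rest
        if buf and state["remaining"] > 0:
            chunk = bytes(buf[:state["remaining"]])
            state["remaining"] -= len(chunk)
            buf.clear()
            body_cb(chunk, state["remaining"] == 0)
        if state["remaining"] == 0 and not state["finished"]:
            state["finished"] = True
            finish_cb()

    return feed


def handle_connection(sock):
    """Like Wookie's connect-cb: build per-connection closures and attach
    the parser to the socket's data."""
    request = {"headers": None, "body": b""}

    def header_callback(headers):
        request["headers"] = headers

    def body_callback(chunk, finishedp):
        request["body"] += chunk

    def finish_callback():
        # Wookie would dispatch the route here; we just report the body size.
        sock.write(b"got %d bytes" % len(request["body"]))

    sock.data["request"] = request
    sock.data["parser"] = make_parser(header_callback, body_callback,
                                      finish_callback)


def read_data(sock, data):
    """Like Wookie's read-data: blindly feed bytes to the socket's parser."""
    sock.data["parser"](data)
```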
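Wookie creates its TCP server with cl-async as follows. Note that start-server still passes listener-event-handler as the fourth positional argument. The newer tcp-server quoted in section 3.2 accepts this form but warns that it is deprecated in favour of :event-cb.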
(in-package :wookie)

(defclass listener ()
  ((bind :accessor listener-bind :initarg :bind :initform nil)
   (port :accessor listener-port :initarg :port :initform 80)
   (backlog :accessor listener-backlog :initarg :backlog :initform -1))
  (:documentation "Describes an HTTP listener."))

(defun main-event-handler (event socket)
  "Handle socket events/conditions that crop up during processing."
  (let* ((socket-data (as:socket-data socket))
         (request (getf socket-data :request))
         (response (getf socket-data :response))
         (handled nil))
    ;; dispatch global errors
    (setf handled (dispatch-event event
                                  *wookie-error-handlers*
                                  *wookie-error-handler-class-precedence*
                                  event
                                  socket
                                  request
                                  response))
    ;; dispatch request errors
    (when (and request (request-error-handlers request))
      (setf handled (dispatch-event event
                                    (request-error-handlers request)
                                    (request-error-precedence request)
                                    event
                                    socket
                                    request
                                    response)))
    ;; if the event wasn't handled, try some default handling here
    (unless handled
      (handler-case (error event)
        (route-not-found ()
          (when response
            (send-response response :status 404 :body "Route for that resource not found =[.")))
        (wookie-error ()
          (when response
            (send-response response
                           :status 500
                           :body (format nil "There was an error processing your request: ~a" event))))
        (t ()
          (format t "(ev) ~a~%" event))))))

(defun listener-event-handler (ev)
  "A wrapper around main-event-handler, useful for listeners to tie into."
  (let* ((event-type (type-of ev))
         (sock (cond ((subtypep event-type 'response-error)
                      (request-socket (response-request (response-error-response ev))))
                     ((subtypep event-type 'wookie-error)
                      (wookie-error-socket ev))
                     ((subtypep event-type 'as:tcp-info)
                      (as:tcp-socket ev)))))
    (funcall 'main-event-handler ev sock)))

(defun handle-connection (sock)
  "Handles a new connection. Creates a bunch of closures that are passed into an http-parse parser which decide amongst themselves, during different points in the parsing, when to dispatch to the found router, when to send chunked content to the route, etc."
  ;; TODO pass client address info into :connect hook
  (run-hooks :connect)
  (let* ((http (make-instance 'http-parse:http-request))
         (route-path nil)
         (route nil)  ; holds the current route, filled in below once we get headers
         (route-dispatched nil)
         (request (make-instance 'request :socket sock :http http))
         (response (make-instance 'response :request request)))
    (setf (as:socket-data sock) (list :request request :response response))
    (labels ((dispatch-route ()
               ;; dispatch the current route, but only if we haven't already done so
               (when route-dispatched
                 (return-from dispatch-route))
               (setf route-dispatched t)
               (run-hooks :pre-route request response)
               (if route
                   (let ((route-fn (getf route :curried-route)))
                     (funcall route-fn request response))
                   (progn
                     (funcall 'main-event-handler (make-instance 'route-not-found :resource route-path :socket sock) sock)
                     (return-from dispatch-route)))
               (run-hooks :post-route request response))
             (header-callback (headers)
               ;; if we got the headers, it means we can find the route we're
               ;; destined to use. if the route accepts chunks and the body is
               ;; chunked, run the router now so it can set up chunk listening.
               ;; otherwise, save the route for later and let the rest of the
               ;; request come in.
               (let* ((method (http-parse:http-method http))
                      (resource (http-parse:http-resource http))
                      (parsed-uri (puri:parse-uri resource))
                      (path (puri:uri-path parsed-uri))
                      (found-route (find-route method path)))
                 (setf route-path path)
                 ;; save the parsed uri for plugins/later code
                 (setf (request-uri request) parsed-uri
                       (request-headers request) headers)
                 (run-hooks :parsed-headers request)
                 ;; set up some tracking/state values now that we have headers
                 (setf route found-route
                       (request-method request) method
                       (request-resource request) resource)
                 ;; handle "Expect: 100-continue" properly
                 (when (string= (getf headers :expect) "100-continue")
                   (if found-route
                       (as:write-socket-data sock (format nil "HTTP/1.1 100 Continue~c~c~c~c"
                                                          #\return #\newline #\return #\newline))
                       (progn
                         (funcall 'main-event-handler (make-instance 'route-not-found :resource route-path :socket sock) sock)
                         (return-from header-callback))))
                 ;; if we found a route, the route allows chunking, and we have
                 ;; chunked data, call the route now so it can set up its chunk
                 ;; handler before we start streaming the body chunks to it
                 (when (and found-route
                            (string= (getf headers :transfer-encoding) "chunked")
                            (getf found-route :allow-chunking))
                   (dispatch-route))))
             (body-callback (chunk finishedp)
               ;; forward the chunk to the callback provided in the chunk-enabled
               ;; router
               (run-hooks :body-chunk request chunk finishedp)
               (let ((request-body-cb (request-body-callback request)))
                 (when request-body-cb
                   (funcall request-body-cb chunk finishedp))))
             (finish-callback ()
               ;; make sure we always dispatch at the end.
               (run-hooks :body-complete request)
               (dispatch-route)))
      ;; make an HTTP parser. will be attached to the socket and will be
      ;; responsible for running all of the above callbacks directly as data
      ;; filters in from the read callback.
      (let ((parser (http-parse:make-parser
                      http
                      :header-callback #'header-callback
                      :body-callback #'body-callback
                      :finish-callback #'finish-callback
                      :store-body t)))
        ;; attach parser to socket-data so we can deref it in the read callback
        (setf (getf (as:socket-data sock) :parser) parser)))))

(defun read-data (sock data)
  "A simple read-cb handler that passed data to the HTTP parser attached to the socket the data is coming in on. The parser runs all necessary callbacks directly, so this function just blindly feeds the data in."
  ;; grab the parser stored in the socket and pipe the data into it
  (let ((parser (getf (as:socket-data sock) :parser)))
    (funcall parser data)))

(defgeneric start-server (listener)
  (:documentation "Start Wookie with the given listener."))

(defmethod start-server ((listener listener))
  ;; start the async server
  (as:tcp-server (listener-bind listener) (listener-port listener)
    'read-data
    'listener-event-handler
    :connect-cb 'handle-connection
    :backlog (listener-backlog listener)))