Published: 2018-01-01

Comparing TCP Server Creation: asyncio vs. cl-async


1 Overview

This article compares two similar libraries, Python 3's asyncio and Common Lisp's cl-async, focusing on a few differences in how each one creates a TCP server.

2 Interface Definitions

The create_server interface in Python's asyncio library:

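# A coroutine method of the event loop (AbstractEventLoop in asyncio/events.py),
# called as: server = await loop.create_server(...); it returns a Server object.
# Python 3.6 signature; later versions add more keyword arguments.
def create_server(self, protocol_factory, host=None, port=None,
                  *, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
                  sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None):
    """Create a TCP server (socket type SOCK_STREAM) bound to host and port."""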

The tcp-server interface in Common Lisp's cl-async:

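(defun tcp-server (bind-address port read-cb event-cb
                   &key connect-cb (backlog -1) stream))
;; => returns a tcp-server instance, which can be closed with close-tcp-server.
;; Newer cl-async versions take event-cb as the :event-cb keyword instead (see 3.2).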
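As a minimal sketch of how the asyncio signature is used in practice (assuming Python 3.7+ for asyncio.run and get_running_loop), the code below starts a server on an OS-assigned port (port=0), reads the bound address back from the returned Server object, and shuts the server down. NullProtocol is a hypothetical placeholder, not part of asyncio.

```python
import asyncio


class NullProtocol(asyncio.Protocol):
    """Hypothetical do-nothing protocol; asyncio.Protocol's default methods are no-ops."""


async def start_and_inspect():
    loop = asyncio.get_running_loop()
    # protocol_factory is any zero-argument callable returning a protocol instance.
    server = await loop.create_server(NullProtocol, host="127.0.0.1", port=0, backlog=100)
    # port=0 asks the OS for a free port; the Server object exposes its listening sockets.
    host, port = server.sockets[0].getsockname()[:2]
    server.close()               # stop accepting new connections
    await server.wait_closed()
    return host, port


host, port = asyncio.run(start_and_inspect())
print(host, port)
```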


3 Analysis

3.1 asyncio

Compared with cl-async, Python's asyncio library introduces two extra abstractions: protocols and transports.

3.1.1 Protocol

A protocol is an abstraction over a callback-based API: the event loop calls its methods when connection events occur.

asyncio's protocol code is as follows:

# file: cpython/Lib/asyncio/protocols.py

class BaseProtocol:
    """Common base class for protocol interfaces.

    Usually user implements protocols that derived from BaseProtocol
    like Protocol or ProcessProtocol.

    The only case when BaseProtocol should be implemented directly is
    write-only transport like write pipe
    """

    def connection_made(self, transport):
        """Called when a connection is made.

        The argument is the transport representing the pipe connection.
        To receive data, wait for data_received() calls.
        When the connection is closed, connection_lost() is called.
        """

    def connection_lost(self, exc):
        """Called when the connection is lost or closed.

        The argument is an exception object or None (the latter
        meaning a regular EOF is received or the connection was
        aborted or closed).
        """

    def pause_writing(self):
        """Called when the transport's buffer goes over the high-water mark.

        Pause and resume calls are paired -- pause_writing() is called
        once when the buffer goes strictly over the high-water mark
        (even if subsequent writes increases the buffer size even
        more), and eventually resume_writing() is called once when the
        buffer size reaches the low-water mark.

        Note that if the buffer size equals the high-water mark,
        pause_writing() is not called -- it must go strictly over.
        Conversely, resume_writing() is called when the buffer size is
        equal or lower than the low-water mark.  These end conditions
        are important to ensure that things go as expected when either
        mark is zero.

        NOTE: This is the only Protocol callback that is not called
        through EventLoop.call_soon() -- if it were, it would have no
        effect when it's most needed (when the app keeps writing
        without yielding until pause_writing() is called).
        """

    def resume_writing(self):
        """Called when the transport's buffer drains below the low-water mark.

        See pause_writing() for details.
        """


class Protocol(BaseProtocol):
    """Interface for stream protocol.

    The user should implement this interface.  They can inherit from
    this class but don't need to.  The implementations here do
    nothing (they don't raise exceptions).

    When the user wants to requests a transport, they pass a protocol
    factory to a utility function (e.g., EventLoop.create_connection()).

    When the connection is made successfully, connection_made() is
    called with a suitable transport object.  Then data_received()
    will be called 0 or more times with data (bytes) received from the
    transport; finally, connection_lost() will be called exactly once
    with either an exception object or None as an argument.

    State machine of calls:

      start -> CM [-> DR*] [-> ER?] -> CL -> end

    * CM: connection_made()
    * DR: data_received()
    * ER: eof_received()
    * CL: connection_lost()
    """

    def data_received(self, data):
        """Called when some data is received.

        The argument is a bytes object.
        """

    def eof_received(self):
        """Called when the other end calls write_eof() or equivalent.

        If this returns a false value (including None), the transport
        will close itself.  If it returns a true value, closing the
        transport is up to the protocol.
        """


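asyncio's BaseProtocol class defines the connection_made, connection_lost, pause_writing and resume_writing callbacks, and the Protocol class, which inherits from BaseProtocol, adds the data_received and eof_received methods. Note that BaseProtocol's connection_made takes a "transport" as its argument, while Protocol's data_received takes the received data (a bytes object).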
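To see the call sequence from the Protocol docstring (CM [-> DR*] [-> ER?] -> CL) in action, the sketch below drives a protocol over a socket.socketpair() rather than a listening server, an assumption made only to keep the example self-contained (Python 3.7+). RecordingProtocol is a hypothetical name; it records each callback as asyncio invokes it.

```python
import asyncio
import socket


class RecordingProtocol(asyncio.Protocol):
    """Hypothetical protocol that records which callbacks asyncio invokes, in order."""

    def __init__(self, done):
        self.done = done          # future resolved once the connection is lost
        self.calls = []

    def connection_made(self, transport):
        self.calls.append("CM")   # receives the transport, not data

    def data_received(self, data):
        self.calls.append("DR")   # data is a bytes object

    def eof_received(self):
        self.calls.append("ER")
        return False              # falsy: let the transport close itself

    def connection_lost(self, exc):
        self.calls.append("CL")   # exc is None for a regular EOF/close
        self.done.set_result(self.calls)


async def run_demo():
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    ours, peer = socket.socketpair()   # an already-connected pair of stream sockets
    # Hand one end to asyncio; it builds the transport and calls connection_made().
    await loop.create_connection(lambda: RecordingProtocol(done), sock=ours)
    peer.sendall(b"hello")
    peer.shutdown(socket.SHUT_WR)      # half-close: triggers eof_received() on our side
    calls = await done
    peer.close()
    return calls


calls = asyncio.run(run_demo())
print(calls)
```

Because eof_received returns a false value here, the transport closes itself, which is what produces the final connection_lost call.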

3.1.2 Transport

A transport is an abstraction over socket operations: reading, writing, flow control and closing.

asyncio's transport code is as follows:

# file: cpython/Lib/asyncio/transports.py

class BaseTransport:
    """Base class for transports."""

    def __init__(self, extra=None):
        if extra is None:
            extra = {}
        self._extra = extra

    def get_extra_info(self, name, default=None):
        """Get optional transport information."""
        return self._extra.get(name, default)

    def is_closing(self):
        """Return True if the transport is closing or closed."""
        raise NotImplementedError

    def close(self):
        """Close the transport.

        Buffered data will be flushed asynchronously.  No more data
        will be received.  After all buffered data is flushed, the
        protocol's connection_lost() method will (eventually) called
        with None as its argument.
        """
        raise NotImplementedError

    def set_protocol(self, protocol):
        """Set a new protocol."""
        raise NotImplementedError

    def get_protocol(self):
        """Return the current protocol."""
        raise NotImplementedError


class ReadTransport(BaseTransport):
    """Interface for read-only transports."""

    def is_reading(self):
        """Return True if the transport is receiving."""
        raise NotImplementedError

    def pause_reading(self):
        """Pause the receiving end.

        No data will be passed to the protocol's data_received()
        method until resume_reading() is called.
        """
        raise NotImplementedError

    def resume_reading(self):
        """Resume the receiving end.

        Data received will once again be passed to the protocol's
        data_received() method.
        """
        raise NotImplementedError


class WriteTransport(BaseTransport):
    """Interface for write-only transports."""

    def set_write_buffer_limits(self, high=None, low=None):
        """Set the high- and low-water limits for write flow control.

        These two values control when to call the protocol's
        pause_writing() and resume_writing() methods.  If specified,
        the low-water limit must be less than or equal to the
        high-water limit.  Neither value can be negative.

        The defaults are implementation-specific.  If only the
        high-water limit is given, the low-water limit defaults to an
        implementation-specific value less than or equal to the
        high-water limit.  Setting high to zero forces low to zero as
        well, and causes pause_writing() to be called whenever the
        buffer becomes non-empty.  Setting low to zero causes
        resume_writing() to be called only once the buffer is empty.
        Use of zero for either limit is generally sub-optimal as it
        reduces opportunities for doing I/O and computation
        concurrently.
        """
        raise NotImplementedError

    def get_write_buffer_size(self):
        """Return the current size of the write buffer."""
        raise NotImplementedError

    def write(self, data):
        """Write some data bytes to the transport.

        This does not block; it buffers the data and arranges for it
        to be sent out asynchronously.
        """
        raise NotImplementedError

    def writelines(self, list_of_data):
        """Write a list (or any iterable) of data bytes to the transport.

        The default implementation concatenates the arguments and
        calls write() on the result.
        """
        data = b''.join(list_of_data)
        self.write(data)

    def write_eof(self):
        """Close the write end after flushing buffered data.

        (This is like typing ^D into a UNIX program reading from stdin.)

        Data may still be received.
        """
        raise NotImplementedError

    def can_write_eof(self):
        """Return True if this transport supports write_eof(), False if not."""
        raise NotImplementedError

    def abort(self):
        """Close the transport immediately.

        Buffered data will be lost.  No more data will be received.
        The protocol's connection_lost() method will (eventually) be
        called with None as its argument.
        """
        raise NotImplementedError

class Transport(ReadTransport, WriteTransport):
    """Interface representing a bidirectional transport.

    There may be several implementations, but typically, the user does
    not implement new transports; rather, the platform provides some
    useful transports that are implemented using the platform's best
    practices.

    The user never instantiates a transport directly; they call a
    utility function, passing it a protocol factory and other
    information necessary to create the transport and protocol.  (E.g.
    EventLoop.create_connection() or EventLoop.create_server().)

    The utility function will asynchronously create a transport and a
    protocol and hook them up by calling the protocol's
    connection_made() method, passing it the transport.

    The implementation here raises NotImplemented for every method
    except writelines(), which calls write() in a loop.
    """

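asyncio's Transport class inherits from ReadTransport and WriteTransport. As its docstring says, users never instantiate a transport directly: when a protocol factory is passed to EventLoop.create_connection() or EventLoop.create_server(), asyncio asynchronously creates a transport and a protocol and hooks them up by calling the protocol's connection_made() method with the transport as its argument.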
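The hookup described above can be observed with a small echo server, sketched below assuming Python 3.7+ and a free loopback port. EchoProtocol is a hypothetical class: asyncio calls the factory once per accepted connection, builds the transport itself and passes it to connection_made(), so the protocol never touches a socket directly.

```python
import asyncio


class EchoProtocol(asyncio.Protocol):
    """Hypothetical echo protocol: asyncio creates one instance per connection."""

    instances = 0    # class-level counter, only to show the factory runs per connection

    def __init__(self):
        EchoProtocol.instances += 1
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport    # created by asyncio and handed to us here

    def data_received(self, data):
        self.transport.write(data)    # non-blocking: buffered and sent asynchronously


async def echo_twice():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(EchoProtocol, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    replies = []
    for msg in (b"first", b"second"):     # two separate client connections
        reader, writer = await asyncio.open_connection("127.0.0.1", port)
        writer.write(msg)
        await writer.drain()
        replies.append(await reader.readexactly(len(msg)))
        writer.close()
        await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return replies


replies = asyncio.run(echo_twice())
print(replies, EchoProtocol.instances)
```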

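BaseTransport's get_extra_info method returns optional information about the transport, such as the underlying socket ("socket") or the remote address ("peername").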
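A minimal sketch of get_extra_info(), assuming the standard socket transports (which fill in the "peername", "sockname" and "socket" keys) and Python 3.7+; PeerInfoProtocol is hypothetical. The server's view of its peer should equal the client's own address, and an unknown key falls back to the supplied default instead of raising.

```python
import asyncio


class PeerInfoProtocol(asyncio.Protocol):
    """Hypothetical protocol that records what get_extra_info() reports."""

    seen = {}    # class-level store so the demo can inspect it afterwards

    def connection_made(self, transport):
        self.transport = transport
        PeerInfoProtocol.seen.update(
            peername=transport.get_extra_info("peername"),   # remote (host, port)
            sockname=transport.get_extra_info("sockname"),   # local (host, port)
            socket=transport.get_extra_info("socket"),       # socket-like object
            missing=transport.get_extra_info("no-such-key", "n/a"),  # default used
        )

    def data_received(self, data):
        self.transport.close()    # close once the client has sent something


async def inspect_peer():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(PeerInfoProtocol, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    client_addr = writer.get_extra_info("sockname")   # the client's own (host, port)
    writer.write(b"hi")
    await reader.read()           # returns b"" after the server closes the connection
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return client_addr, port


client_addr, port = asyncio.run(inspect_peer())
seen = PeerInfoProtocol.seen
print(seen["peername"], client_addr, seen["missing"])
```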

3.2 cl-async

When creating a TCP server, cl-async takes the callbacks (read-cb, connect-cb, event-cb and so on) directly as arguments.
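To make the contrast concrete, the sketch below is a hypothetical Python helper (tcp_server here belongs to neither library) that accepts cl-async-style callbacks and builds an asyncio protocol from them. The asyncio transport stands in for the cl-async socket; Python 3.7+ is assumed.

```python
import asyncio


def tcp_server(host, port, read_cb, *, event_cb=None, connect_cb=None, backlog=100):
    """Hypothetical cl-async-style wrapper around loop.create_server().

    read_cb(sock, data), connect_cb(sock) and event_cb(exc) imitate cl-async's
    read-cb / connect-cb / event-cb. Call it from a coroutine and await the result.
    """

    class CallbackProtocol(asyncio.Protocol):
        # asyncio's fixed callback names simply forward to the user's functions.
        def connection_made(self, transport):
            self.transport = transport
            if connect_cb is not None:
                connect_cb(transport)

        def data_received(self, data):
            read_cb(self.transport, data)

        def connection_lost(self, exc):
            if exc is not None and event_cb is not None:
                event_cb(exc)         # report abnormal disconnects only

    loop = asyncio.get_running_loop()
    return loop.create_server(CallbackProtocol, host, port, backlog=backlog)


async def demo():
    events = []
    server = await tcp_server(
        "127.0.0.1", 0,
        lambda sock, data: sock.write(data.upper()),      # read-cb
        connect_cb=lambda sock: events.append("connected"),
        event_cb=events.append,
    )
    port = server.sockets[0].getsockname()[1]
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(b"ping")
    reply = await reader.readexactly(4)
    writer.close()
    await writer.wait_closed()
    server.close()
    await server.wait_closed()
    return reply, events


reply, events = asyncio.run(demo())
print(reply, events)
```

The design difference shows up in the wrapper: asyncio fixes the callback names on a class and creates one instance per connection, while cl-async's tcp-server receives the functions themselves, so per-connection state has to live somewhere else (Wookie, in section 4.2, keeps it in the socket's data).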

The relevant cl-async code for tcp-server:

(defun tcp-server (bind-address port read-cb &rest args)
  "Open a TCP connection asynchronously. Optionally send data out once connected
   via the :data keyword (can be a string or byte array)."
  (let ((event-cb-dep (car args)))
    (unless (or (keywordp event-cb-dep)
                (null event-cb-dep))
      (push :event-cb args)
      (warn "Passing event-cb as the fourth argument to tcp-server is now deprecated. Please use the :event-cb keyword instead."))
    (apply 'tcp-server-new
           bind-address port read-cb
           args)))

(defun tcp-server-new (bind-address port read-cb &key event-cb connect-cb backlog stream fd)
  "Start a TCP listener on the current event loop. Returns a tcp-server class
   which can be closed with close-tcp-server"
  (socket-server 'tcp-server
                 (list bind-address port) read-cb
                 :event-cb event-cb
                 :connect-cb connect-cb
                 :backlog backlog
                 :stream stream
                 :fd fd))

(defun socket-server (server-class address read-cb &key event-cb connect-cb backlog stream fd)
  "Start a socket listener on the current event loop. Returns a socket-server instance
   which can be closed with close-socket-server"
  (check-event-loop-running)
  (let* ((server-instance (make-instance server-class :stream stream))
         (server-c (socket-server-c server-instance))
         (r-bind (socket-server-bind server-instance address fd))
         (backlog (if (or (null backlog)
                          (< backlog 0))
                      128
                      backlog))
         (r-listen (when (zerop r-bind)
                     (uv:uv-listen server-c backlog (cffi:callback socket-accept-cb)))))
    ;; check that our listener instantiated properly
    (when (or (< r-bind 0)
              (< r-listen 0))
      (unwind-protect
           (event-handler (if (zerop r-bind) r-listen r-bind) event-cb :throw t :streamish server-instance)
        (close-socket-server server-instance))
      (return-from socket-server))
    ;; make sure the server is closed/freed on exit
    (add-event-loop-exit-callback (lambda ()
                                    (close-socket-server server-instance)))
    (attach-data-to-pointer server-c server-instance)
    ;; setup an accept error cb
    (save-callbacks server-c (list :read-cb read-cb :event-cb event-cb :connect-cb connect-cb))
    ;; return the listener, which can be closed by the app if needed
    server-instance))

(defclass tcp-server (tcp-mixin socket-server) ())


(defclass socket-server ()
  ((c :accessor socket-server-c :initarg :c :initform (cffi:null-pointer))
   (closed :accessor socket-server-closed :initarg :closed :initform nil)
   (stream :accessor socket-server-stream :initarg :stream :initform nil))
  (:documentation "Wraps around a connection listener."))

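cl-async's tcp-server function calls tcp-server-new, which in turn calls socket-server to create a tcp-server instance (the tcp-server class inherits from socket-server). socket-server registers the callbacks with (save-callbacks server-c (list :read-cb read-cb :event-cb event-cb :connect-cb connect-cb)). Following the trail further leads into the C bindings (libuv, via uv:uv-listen and cffi callbacks), which I don't fully understand yet, so I'll leave that for later (TODO).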
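Even without following the C bindings, two details of the argument handling in the Lisp code above can be restated. The sketch below is a hypothetical Python transliteration (tcp_server_args is not part of either library): tcp-server still accepts event-cb as a fourth positional argument but emits a warning, and socket-server replaces a nil or negative backlog with 128. It only resolves the settings and does not open a socket.

```python
import warnings

DEFAULT_BACKLOG = 128    # fallback used by socket-server


def tcp_server_args(bind_address, port, read_cb, *positional,
                    event_cb=None, connect_cb=None, backlog=None):
    """Hypothetical transliteration of cl-async's tcp-server/socket-server argument handling."""
    if positional and positional[0] is not None:
        # Old call style (tcp-server address port read-cb event-cb ...): accepted, with a warning
        # (a plain UserWarning here, mirroring Lisp's WARN).
        warnings.warn("Passing event-cb as the fourth argument to tcp-server is now "
                      "deprecated. Please use the :event-cb keyword instead.")
        event_cb = positional[0]
    # socket-server: a missing (nil) or negative backlog becomes 128.
    if backlog is None or backlog < 0:
        backlog = DEFAULT_BACKLOG
    return {"address": (bind_address, port), "read_cb": read_cb,
            "event_cb": event_cb, "connect_cb": connect_cb, "backlog": backlog}


print(tcp_server_args("127.0.0.1", 8080, print, backlog=-1)["backlog"])  # 128
```

This backlog rule is why Wookie's listener default of :backlog -1 (section 4.2) ends up as a backlog of 128.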

4 Usage Comparison

Here we look at how two asynchronous web servers, Sanic (Python) and Wookie (Common Lisp), create their TCP servers.

4.1 Sanic

Sanic defines its own HttpProtocol with a set of callbacks. For example, data_received feeds incoming data to httptools, an HTTP parser, and methods such as on_url and on_message_complete are the parse callbacks that httptools invokes on the protocol.
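The split between HttpProtocol and httptools is an inversion of control: the protocol passes itself to the parser, and the parser calls its on_* methods while data is fed in. The sketch below shows that pattern with a hypothetical ToyRequestParser, which is not httptools' implementation and only handles a complete, bodiless request delivered in a single chunk; Collector plays the role of HttpProtocol.

```python
class ToyRequestParser:
    """Hypothetical stand-in for an httptools-style parser (NOT httptools' code).

    It is constructed with a callbacks object and calls on_url / on_header /
    on_headers_complete / on_message_complete on it as data is fed in.
    """

    def __init__(self, callbacks):
        self.cb = callbacks
        self.method = None

    def feed_data(self, data):
        head, sep, _ = data.partition(b"\r\n\r\n")
        if not sep:
            raise ValueError("toy parser needs the whole header block in one chunk")
        request_line, *header_lines = head.split(b"\r\n")
        self.method, url, _version = request_line.split(b" ", 2)
        self.cb.on_url(url)
        for line in header_lines:
            name, _, value = line.partition(b":")
            self.cb.on_header(name, value.strip())
        self.cb.on_headers_complete()
        self.cb.on_message_complete()

    def get_method(self):
        return self.method


class Collector:
    """Plays the role of Sanic's HttpProtocol: it only implements the callbacks."""

    def __init__(self):
        self.url, self.headers, self.done = None, [], False

    def on_url(self, url):
        self.url = url

    def on_header(self, name, value):
        self.headers.append((name.decode(), value.decode("utf-8")))

    def on_headers_complete(self):
        pass

    def on_message_complete(self):
        self.done = True


c = Collector()
parser = ToyRequestParser(c)
parser.feed_data(b"GET /hello HTTP/1.1\r\nHost: example.com\r\n\r\n")
print(c.url, c.headers, parser.get_method(), c.done)
```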

Sanic's code for creating the TCP server through asyncio:

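# sanic/sanic/server.py
import asyncio
from inspect import isawaitable
from signal import SIGINT, SIGTERM

import httptools

try:
    import uvloop as async_loop    # use uvloop's event loop when it is installed
except ImportError:
    async_loop = asyncio

from .log import log
from .request import Request


class Signal:
    stopped = False    # set to True by serve() once shutdown begins
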

class HttpProtocol(asyncio.Protocol):
    __slots__ = ('loop', 'transport', 'connections', 'signal',  # event loop, connection
                 'parser', 'request', 'url', 'headers',  # request params
                 'request_handler', 'request_timeout', 'request_max_size',  # request config
                 '_total_request_size', '_timeout_handler')  # connection management

    def __init__(self, *, loop, request_handler, signal=Signal(), connections={}, request_timeout=60,
                 request_max_size=None):
        self.loop = loop
        self.transport = None
        self.request = None
        self.parser = None
        self.url = None
        self.headers = None
        self.signal = signal
        self.connections = connections
        self.request_handler = request_handler
        self.request_timeout = request_timeout
        self.request_max_size = request_max_size
        self._total_request_size = 0
        self._timeout_handler = None

        # -------------------------------------------- #

    # Connection
    # -------------------------------------------- #

    def connection_made(self, transport):
        self.connections[self] = True
        self._timeout_handler = self.loop.call_later(self.request_timeout, self.connection_timeout)
        self.transport = transport

    def connection_lost(self, exc):
        del self.connections[self]
        self._timeout_handler.cancel()
        self.cleanup()

    def connection_timeout(self):
        self.bail_out("Request timed out, connection closed")

        # -------------------------------------------- #

    # Parsing
    # -------------------------------------------- #

    def data_received(self, data):
        # Check for the request itself getting too large and exceeding memory limits
        self._total_request_size += len(data)
        if self._total_request_size > self.request_max_size:
            return self.bail_out("Request too large ({}), connection closed".format(self._total_request_size))

        # Create parser if this is the first time we're receiving data
        if self.parser is None:
            assert self.request is None
            self.headers = []
            self.parser = httptools.HttpRequestParser(self)

        # Parse request chunk or close connection
        try:
            self.parser.feed_data(data)
        except httptools.parser.errors.HttpParserError as e:
            self.bail_out("Invalid request data, connection closed ({})".format(e))

    def on_url(self, url):
        self.url = url

    def on_header(self, name, value):
        if name == b'Content-Length' and int(value) > self.request_max_size:
            return self.bail_out("Request body too large ({}), connection closed".format(value))

        self.headers.append((name.decode(), value.decode('utf-8')))

    def on_headers_complete(self):
        self.request = Request(
            url_bytes=self.url,
            headers=dict(self.headers),
            version=self.parser.get_http_version(),
            method=self.parser.get_method().decode()
        )

    def on_body(self, body):
        self.request.body = body

    def on_message_complete(self):
        self.loop.create_task(self.request_handler(self.request, self.write_response))

    # -------------------------------------------- #
    # Responding
    # -------------------------------------------- #

    def write_response(self, response):
        try:
            keep_alive = self.parser.should_keep_alive() and not self.signal.stopped
            self.transport.write(response.output(self.request.version, keep_alive, self.request_timeout))
            if not keep_alive:
                self.transport.close()
            else:
                self.cleanup()
        except Exception as e:
            self.bail_out("Writing request failed, connection closed {}".format(e))

    def bail_out(self, message):
        log.error(message)
        self.transport.close()

    def cleanup(self):
        self.parser = None
        self.request = None
        self.url = None
        self.headers = None
        self._total_request_size = 0

    def close_if_idle(self):
        """
        Close the connection if a request is not being sent or received
        :return: boolean - True if closed, false if staying open
        """
        if not self.parser:
            self.transport.close()
            return True
        return False


def serve(host, port, request_handler, after_start=None, before_stop=None, debug=False, request_timeout=60,
          request_max_size=None):
    # Create Event Loop
    loop = async_loop.new_event_loop()
    asyncio.set_event_loop(loop)
    # I don't think we take advantage of this
    # And it slows everything waaayyy down
    # loop.set_debug(debug)

    connections = {}
    signal = Signal()
    server_coroutine = loop.create_server(lambda: HttpProtocol(
        loop=loop,
        connections=connections,
        signal=signal,
        request_handler=request_handler,
        request_timeout=request_timeout,
        request_max_size=request_max_size,
    ), host, port)
    try:
        http_server = loop.run_until_complete(server_coroutine)
    except OSError as e:
        log.error("Unable to start server: {}".format(e))
        return
    except:
        log.exception("Unable to start server")
        return

    # Run the on_start function if provided
    if after_start:
        result = after_start(loop)
        if isawaitable(result):
            loop.run_until_complete(result)

    # Register signals for graceful termination
    for _signal in (SIGINT, SIGTERM):
        loop.add_signal_handler(_signal, loop.stop)

    try:
        loop.run_forever()
    finally:
        log.info("Stop requested, draining connections...")

        # Run the on_stop function if provided
        if before_stop:
            result = before_stop(loop)
            if isawaitable(result):
                loop.run_until_complete(result)

        # Wait for event loop to finish and all connections to drain
        http_server.close()
        loop.run_until_complete(http_server.wait_closed())

        # Complete all tasks on the loop
        signal.stopped = True
        for connection in connections.keys():
            connection.close_if_idle()

        while connections:
            loop.run_until_complete(asyncio.sleep(0.1))

        loop.close()
        log.info("Server Stopped")
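Sanic's shutdown sequence (stop listening, close idle connections, then poll until the shared connections dict is empty) can be reduced to the sketch below. TrackedProtocol and serve_then_drain are hypothetical names, every connection is treated as idle, and Python 3.7+ is assumed. One ordering detail differs from the Sanic code: wait_closed() is awaited after draining, because on newer Python versions (3.12+) Server.wait_closed() also waits for active connections, so awaiting it before closing them could block.

```python
import asyncio


class TrackedProtocol(asyncio.Protocol):
    """Hypothetical protocol that registers itself in a shared dict, as Sanic's does."""

    def __init__(self, connections):
        self.connections = connections
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        self.connections[self] = True

    def connection_lost(self, exc):
        del self.connections[self]

    def close_if_idle(self):
        # Simplification: treat every connection as idle and close it.
        self.transport.close()


async def serve_then_drain():
    loop = asyncio.get_running_loop()
    connections = {}
    server = await loop.create_server(lambda: TrackedProtocol(connections),
                                      "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    _, writer = await asyncio.open_connection("127.0.0.1", port)
    while not connections:                # wait for the server-side connection_made()
        await asyncio.sleep(0.01)
    open_before = len(connections)

    server.close()                        # 1. stop accepting new connections
    for conn in list(connections):        # 2. close idle connections (iterate over a copy)
        conn.close_if_idle()
    while connections:                    # 3. poll until connection_lost() empties the dict
        await asyncio.sleep(0.01)
    await server.wait_closed()            # no active connections remain at this point

    writer.close()
    await writer.wait_closed()
    return open_before, len(connections)


open_before, remaining = asyncio.run(serve_then_drain())
print(open_before, remaining)
```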

4.2 Wookie

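Wookie defines its own functions, such as read-data, handle-connection and listener-event-handler. read-data corresponds to asyncio's data_received, except that it receives both the socket and the data as arguments. The handle-connection callback defines header-callback, body-callback and finish-callback and passes them to the http-parse HTTP parser to customize parsing; the resulting parser is stored in the socket's data so that read-data can look it up.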

Wookie's code for creating the TCP server through cl-async:

(in-package :wookie)

(defclass listener ()
  ((bind :accessor listener-bind :initarg :bind :initform nil)
   (port :accessor listener-port :initarg :port :initform 80)
   (backlog :accessor listener-backlog :initarg :backlog :initform -1))
  (:documentation "Describes an HTTP listener."))

(defun main-event-handler (event socket)
  "Handle socket events/conditions that crop up during processing."
  (let* ((socket-data (as:socket-data socket))
         (request (getf socket-data :request))
         (response (getf socket-data :response))
         (handled nil))
    ;; dispatch global errors
    (setf handled (dispatch-event event
                                  *wookie-error-handlers*
                                  *wookie-error-handler-class-precedence*
                                  event socket request response))

    ;; dispatch request errors
    (when (and request (request-error-handlers request))
      (setf handled (dispatch-event event
                                    (request-error-handlers request)
                                    (request-error-precedence request)
                                    event socket request response)))

    ;; if the event wasn't handled, try some default handling here
    (unless handled
      (handler-case (error event)
        (route-not-found ()
          (when response
            (send-response response :status 404 :body "Route for that resource not found =[.")))
        (wookie-error ()
          (when response
            (send-response response
                           :status 500
                           :body (format nil "There was an error processing your request: ~a" event))))
        (t ()
          (format t "(ev) ~a~%" event))))))

(defun listener-event-handler (ev)
  "A wrapper around main-event-handler, useful for listeners to tie into."
  (let* ((event-type (type-of ev))
         (sock (cond ((subtypep event-type 'response-error)
                      (request-socket (response-request (response-error-response ev))))
                     ((subtypep event-type 'wookie-error)
                      (wookie-error-socket ev))
                     ((subtypep event-type 'as:tcp-info)
                      (as:tcp-socket ev)))))
    (funcall 'main-event-handler ev sock)))

(defun handle-connection (sock)
  "Handles a new connection. Creates a bunch of closures that are passed into an
   http-parse parser which decide amongst themselves, during different points in
   the parsing, when to dispatch to the found router, when to send chunked
   content to the route, etc."
  ;; TODO pass client address info into :connect hook
  (run-hooks :connect)
  (let* ((http (make-instance 'http-parse:http-request))
         (route-path nil)
         (route nil)  ; holds the current route, filled in below once we get headers
         (route-dispatched nil)
         (request (make-instance 'request :socket sock :http http))
         (response (make-instance 'response :request request)))
    (setf (as:socket-data sock) (list :request request :response response))
    (labels ((dispatch-route ()
               ;; dispatch the current route, but only if we haven't already done so
               (when route-dispatched
                 (return-from dispatch-route))
               (setf route-dispatched t)
               (run-hooks :pre-route request response)
               (if route
                   (let ((route-fn (getf route :curried-route)))
                     (funcall route-fn request response))
                   (progn
                     (funcall 'main-event-handler (make-instance 'route-not-found :resource route-path :socket sock)
                                                  sock)
                     (return-from dispatch-route)))
               (run-hooks :post-route request response))
             (header-callback (headers)
               ;; if we got the headers, it means we can find the route we're
               ;; destined to use. if the route accepts chunks and the body is
               ;; chunked, run the router now so it can set up chunk listening.
               ;; otherwise, save the route for later and let the rest of the
               ;; request come in.
               (let* ((method (http-parse:http-method http))
                      (resource (http-parse:http-resource http))
                      (parsed-uri (puri:parse-uri resource))
                      (path (puri:uri-path parsed-uri))
                      (found-route (find-route method path)))
                 (setf route-path path)
                 ;; save the parsed uri for plugins/later code
                 (setf (request-uri request) parsed-uri
                       (request-headers request) headers)
                 (run-hooks :parsed-headers request)
                 ;; set up some tracking/state values now that we have headers
                 (setf route found-route
                       (request-method request) method
                       (request-resource request) resource)
                 ;; handle "Expect: 100-continue" properly
                 (when (string= (getf headers :expect) "100-continue")
                   (if found-route
                       (as:write-socket-data sock (format nil "HTTP/1.1 100 Continue~c~c~c~c"
                                                          #\return #\newline #\return #\newline))

                       (progn
                         (funcall 'main-event-handler (make-instance 'route-not-found :resource route-path :socket sock)
                                                      sock)
                         (return-from header-callback))))
                 ;; if we found a route, the route allows chunking, and we have
                 ;; chunked data, call the route now so it can set up its chunk
                 ;; handler before we start streaming the body chunks to it
                 (when (and found-route
                            (string= (getf headers :transfer-encoding) "chunked")
                            (getf found-route :allow-chunking))
                   (dispatch-route))))
             (body-callback (chunk finishedp)
               ;; forward the chunk to the callback provided in the chunk-enabled
               ;; router
               (run-hooks :body-chunk request chunk finishedp)
               (let ((request-body-cb (request-body-callback request)))
                 (when request-body-cb
                   (funcall request-body-cb chunk finishedp))))
             (finish-callback ()
               ;; make sure we always dispatch at the end.
               (run-hooks :body-complete request)
               (dispatch-route)))
      ;; make an HTTP parser. will be attached to the socket and will be
      ;; responsible for running all of the above callbacks directly as data
      ;; filters in from the read callback.
      (let ((parser (http-parse:make-parser
                      http
                      :header-callback #'header-callback
                      :body-callback #'body-callback
                      :finish-callback #'finish-callback
                      :store-body t)))
        ;; attach parser to socket-data so we can deref it in the read callback
        (setf (getf (as:socket-data sock) :parser) parser)))))

(defun read-data (sock data)
  "A simple read-cb handler that passed data to the HTTP parser attached to the
   socket the data is coming in on. The parser runs all necessary callbacks
   directly, so this function just blindly feeds the data in."
  ;; grab the parser stored in the socket and pipe the data into it
  (let ((parser (getf (as:socket-data sock) :parser)))
    (funcall parser data)))

(defgeneric start-server (listener)
  (:documentation
    "Start Wookie with the given listener."))

(defmethod start-server ((listener listener))
  ;; start the async server
  (as:tcp-server (listener-bind listener) (listener-port listener)
    'read-data
    :event-cb 'listener-event-handler   ; keyword form; a positional event-cb is deprecated in cl-async
    :connect-cb 'handle-connection
    :backlog (listener-backlog listener)))

Author: Nisen

Email: imnisen@163.com