Like regular functions, coroutines have a final return value. The coroutine may be ended with a
``return`` statement, which causes its return value to be None, but Python does not allow
``return`` with an argument inside a generator. Instead, to specify its return value, the
-coroutine should raise a `StopIteration` with its return value as an argument::
+coroutine should raise a ``StopIteration`` with its return value as an argument::
raise StopIteration(value)
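A minimal sketch of how a driver can recover that value. It is not Chiral's scheduler: `add` and `drive` are hypothetical names. It assumes a Python 3 interpreter, where `return value` inside a generator is permitted and reaches the caller as `StopIteration(value)`, and where an explicit `raise StopIteration` inside a generator body is converted to `RuntimeError` (PEP 479). On the Python 2 interpreters this text describes, the explicit raise shown above plays the same role.

```python
def add(a, b):
    """Hypothetical coroutine: suspend once, then finish with a value."""
    yield  # a suspension point; a real coroutine would yield a WaitCondition here
    return a + b  # delivered to the driver as StopIteration(a + b)


def drive(gen):
    """Step a generator to completion and return its final value."""
    try:
        while True:
            next(gen)
    except StopIteration as stop:
        # The argument of StopIteration is the coroutine's return value.
        return stop.value


result = drive(add(2, 3))
```

Either way the driver sees the same thing: the generator ends with a `StopIteration` whose argument is the result.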
self.bound_coro = None
def __call__(self, value=None):
- """Cause `value` to be the return value of the WaitCondition."""
+ """Cause ``value`` to be the return value of the WaitCondition."""
assert self.bound_coro
self.bound_coro.resume(value)
self.bound_coro = None
def throw(self, exc=None):
"""Raise an Exception in the bound coroutine.
- If `exc` is None, ``sys.exc_info()`` will be raised instead.
+ If ``exc`` is None, ``sys.exc_info()`` will be raised instead.
"""
if exc is None:
self.bound_coro = None
def __call__(self, *args):
- """Cause `args` to be the return value of the WaitCondition."""
+ """Cause ``args`` to be the return value of the WaitCondition."""
assert self.bound_coro
self.bound_coro.resume(args)
self.bound_coro = None
def throw(self, exc=None):
"""Raise an Exception in the bound coroutine.
- If `exc` is None, ``sys.exc_info()`` will be raised instead.
+ If ``exc`` is None, ``sys.exc_info()`` will be raised instead.
"""
if exc is None:
giving access to a server may be protected by a CoroutineMutex to ensure that multiple
transactions are not started at once.
- CoroutineMutex objects have one important method, `acquire()`. This returns a `WaitCondition`,
+ CoroutineMutex objects have one important method, `acquire`. This returns a `WaitCondition`,
which will resume the coroutine once the mutex is available. The WaitCondition will result
in a context manager, as specified in PEP 342, which should immediately be passed to a ``with``
statement::
"""
Forcefully stop running this coroutine.
- If the coroutine is not in `STATE_RUNNING` or `STATE_SUSPENDED`, this does nothing.
+ If the coroutine is not in ``STATE_RUNNING`` or ``STATE_SUSPENDED``, this does nothing.
A suspended coroutine will have its current wait condition unbound; its completion
callback will then be called with a CoroutineKilledException.
"""
A separate helper coroutine, the `ThreadPoolWatcher`, is responsible for returning results
to the main thread. The watcher is initialized from `run_in_thread` the first time the thread
- pool is used. It creates a socketpair, and uses the `Reactor`'s socket event handling to wait on
+ pool is used. It creates a socketpair, and uses the reactor's socket event handling to wait on
the read end. Whenever a worker thread pushes a result onto the output queue, it writes a single
byte to the watcher's write socket. This will awaken the `ThreadPoolWatcher` coroutine in the
main thread, which ``recv``-s as many bytes as are available, and pops and dispatches that many
-"""Network event handling."""
+"""
+Network event handling.
+
+Chiral's main event loop is provided by the `Reactor` class. After program initialization,
+the Reactor is responsible for determining what internal (timer) and external (socket activity)
+events have occurred. The event loop proceeds as follows:
+
+1. Determine when the next scheduled event should happen.
+2. Identify which sockets have coroutines waiting on them.
+3. Perform a system call that waits for socket activity or a timeout, whichever comes first.
+4. Dispatch all incoming socket events.
+5. Dispatch all timer events that are ready to run.
+
+These steps are performed by `Reactor._run_once`. The main `Reactor.run` function simply calls
+``_run_once`` until it indicates that there are no more events to process.
+
+Step (3) is traditionally performed by the ``select()`` system call. However, ``select()`` requires
+that the set of "interesting" sockets be passed in to each call, and so scales O(n) as the number of
+sockets increases. Due to its internal bitfield data structure, it is also generally limited to 1024
+simultaneous file descriptors. As such, various platform-specific calls like ``epoll()`` (Linux 2.6+)
+and ``kqueue()`` (FreeBSD 4.1+, Mac OS X 10.3+) have been introduced. These have the key advantage that
+step (2) is performed implicitly, when it is determined that the socket is "interesting", and not at
+each event loop. They therefore scale O(1) with respect to the number of open, idle sockets.
+
+Python does not include ``epoll()`` or ``kqueue()`` in its standard library, so ctypes-based bindings
+are provided in `chiral.os`. When `chiral.net.netcore` is loaded, it automatically checks for the
+availability of the platform-specific Reactor classes, and falls back to `SelectReactor` if they are not
+available.
+
+Chiral automatically instantiates a `Reactor` instance and makes it available as ``chiral.net.reactor``.
+Users should not create new Reactors themselves.
+
+See the documentation for the `Reactor` class for information on its specific methods.
+"""
# Chiral, copyright (c) 2007 Jacob Potter
# This program is free software; you can redistribute it and/or modify
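The five-step loop described in the module docstring above can be sketched with nothing but ``select()``. This is an illustration under stated assumptions, not Chiral's `Reactor`: `MiniReactor`, its callback-based `schedule` and `wait_for_readable`, and `run_once` are hypothetical, and plain callbacks stand in for coroutines and WaitConditions.

```python
import heapq
import select
import socket
import time


class MiniReactor:
    """Toy select()-based event loop (hypothetical; callbacks instead of coroutines)."""

    def __init__(self):
        self._events = []        # heap of (fire_time, sequence, callback)
        self._seq = 0            # tie-breaker so callbacks are never compared
        self._read_sockets = {}  # socket -> one-shot callback

    def schedule(self, delay, callback):
        heapq.heappush(self._events, (time.time() + delay, self._seq, callback))
        self._seq += 1

    def wait_for_readable(self, sock, callback):
        self._read_sockets[sock] = callback

    def run_once(self):
        if not self._events and not self._read_sockets:
            return False  # nothing left to process
        # 1. Determine when the next scheduled event should happen.
        timeout = None
        if self._events:
            timeout = max(0.0, self._events[0][0] - time.time())
        # 2. Identify which sockets have something waiting on them.
        interesting = list(self._read_sockets)
        # 3. Wait for socket activity or the timeout, whichever comes first.
        if interesting:
            readable, _, _ = select.select(interesting, [], [], timeout)
        else:
            time.sleep(timeout)  # timers only; timeout is not None here
            readable = []
        # 4. Dispatch all incoming socket events.
        for sock in readable:
            self._read_sockets.pop(sock)(sock)
        # 5. Dispatch all timer events that are ready to run.
        now = time.time()
        while self._events and self._events[0][0] <= now:
            heapq.heappop(self._events)[2]()
        return True

    def run(self):
        while self.run_once():
            pass


log = []
a, b = socket.socketpair()
reactor = MiniReactor()
reactor.wait_for_readable(a, lambda s: log.append(s.recv(16)))
reactor.schedule(0.01, lambda: log.append("timer"))
b.sendall(b"ping")
reactor.run()
a.close()
b.close()
```

Step (2) rebuilds the list of interesting sockets on every pass, which is the O(n) cost the docstring attributes to ``select()``. An ``epoll()`` or ``kqueue()`` variant would register the socket once in `wait_for_readable` instead.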
self._close_list = weakref.WeakValueDictionary()
def close_on_exit(self, sock):
- """Add sock to a list of sockets to be closed when the reactor terminates."""
+ """Add `sock` to a list of sockets to be closed when the reactor terminates."""
self._close_list[id(sock)] = sock
def _handle_scheduled_events(self):
- """Handle any internally scheduled events."""
+ """
+ Handle any internally scheduled events.
+
+ This should only be called by `Reactor._run_once`.
+ """
while len(self._events) > 0:
next_event_time, next_event_cb = self._events[0][:2]
"""
Run one iteration of the main event handling loop.
- This should be overridden in a derived class.
+ This should only be called by `Reactor.run`.
"""
raise NotImplementedError
"""
Return a WaitCondition that will fire at some point in the future.
- The "time" parameter, if given, should be a datetime.datetime object or UNIX
- timestamp; "delay" may be either a number of seconds or a datetime.timedelta.
- time in seconds or a datetime.timedelta.
+ If both ``callbacktime`` and ``delay`` are None, the WaitCondition will fire as soon as
+ possible, during the next reactor loop. Since the reactor handles socket events
+ before scheduled calls, one can yield control to the Reactor to handle incoming
+ events (e.g. during a potentially CPU-intensive operation) with::
+
+ yield reactor.schedule()
- If both time and delay are None, the WaitCondition will fire as soon as
- possible, during the next reactor loop.
+ :param callbacktime: An absolute time or UNIX timestamp.
+ :type callbacktime: datetime.datetime, int, float
+ :param delay: A timedelta object or relative number of seconds.
+ :type delay: datetime.timedelta, int, float
"""
now = time.time()
return callback
+ def wait_for_readable(self, sock):
+ """Return a WaitCondition for readability on a socket.
+
+ When the resultant WaitCondition is yielded, ``sock`` will be added to the list of
+ sockets of interest for the next event loop. If `WaitCondition.unbind` is called
+ (due to the yielding coroutine being killed, for example), the socket will be automatically
+ removed from the list.
+
+ """
+ raise NotImplementedError
+
+ def wait_for_writeable(self, sock):
+ """
+ Return a WaitCondition for writeability on a socket.
+
+ This behaves analogously to `wait_for_readable`.
+ """
+ raise NotImplementedError
+
def time_to_next_event(self):
"""Return the time, in seconds, until the next scheduled event."""
if len(self._events) > 0:
def wait_for_readable(self, sock):
- """Return a WaitCondition for readability on sock."""
+ """Return a WaitCondition for readability on ``sock``."""
return self.WaitForEvent(sock, self, self._read_sockets)
def wait_for_writeable(self, sock):
- """Return a WaitCondition for writeability on sock."""
+ """Return a WaitCondition for writeability on ``sock``."""
return self.WaitForEvent(sock, self, self._write_sockets)
def _run_once(self):
return "<EpollReactor.WaitForEvent: fd %r>" % (self.sock.fileno(), )
def wait_for_readable(self, sock):
- """Return a WaitCondition for readability on sock."""
+ """Return a WaitCondition for readability on ``sock``."""
return self.WaitForEvent(sock, self, epoll.EPOLLIN)
def wait_for_writeable(self, sock):
- """Return a WaitCondition for writeability on sock."""
+ """Return a WaitCondition for writeability on ``sock``."""
return self.WaitForEvent(sock, self, epoll.EPOLLOUT)
def _run_once(self):
return "<KqueueReactor.WaitForEvent: fd %r>" % (self.sock.fileno(), )
def wait_for_readable(self, sock):
- """Return a WaitCondition for readability on sock."""
+ """Return a WaitCondition for readability on ``sock``."""
return self.WaitForEvent(sock, self, kqueue.EVFILT_READ)
def wait_for_writeable(self, sock):
- """Return a WaitCondition for writeability on sock."""
+ """Return a WaitCondition for writeability on ``sock``."""
return self.WaitForEvent(sock, self, kqueue.EVFILT_WRITE)
def _run_once(self):
-"""TCP connection handling classes."""
+"""
+TCP connection handling classes.
+
+Rather than manually setting sockets nonblocking and calling `Reactor.wait_for_readable`
+and `Reactor.wait_for_writeable` directly, the `TCPConnection` and `TCPServer` classes
+are provided for higher-level nonblocking connection handling. See their documentation for
+details and examples.
+"""
# Chiral, copyright (c) 2007 Jacob Potter
# This program is free software; you can redistribute it and/or modify
class TCPConnection(coroutine.Coroutine):
"""
Provides basic interface for TCP connections.
+
+ This can be used directly as a client, or subclassed and instantiated by `TCPServer`.
+ It provides higher-level utility functions to open and close the connection, read lines
+ or an exact number of bytes, and send from strings or files.
+
+ The `read_line`, `read_exactly`, and `recv` functions use an internal buffer to store data
+ after it is read. This is intended to be transparent; however, users should avoid mixing the
+ ``TCPConnection`` helper functions with direct socket access.
"""
def connection_handler(self):
Main event processing loop.
The connection_handler() method will be run as a Coroutine when the TCPConnection
- is initialized. It should be overridden in the derived class.
+ is initialized. If the TCPConnection is going to be created from a `TCPServer`, then
+ this function should be overridden in the derived class.
"""
raise NotImplementedError
yield
Completion callback.
This will be run as a completion callback when the connection handler
- terminates; see ``chiral.core.coroutine.Coroutine.add_completion_callback()``.
+ terminates; see `Coroutine.add_completion_callback`.
- By default, this swallows ConnectionClosedExceptions, and closes the connection.
+ By default, this swallows `ConnectionClosedException`, and closes the connection.
It may be overridden in a derived class.
"""
if exception:
- _exc_type, exc_value, _exc_traceback = exception
- if isinstance(exc_value, ConnectionClosedException):
+ if isinstance(exception[1], ConnectionClosedException):
return (None, None)
if self.remote_sock is not None:
self.close()
def close(self):
- """
- Call self.close() on a connection to perform a clean shutdown.
- """
+ """Perform a clean shutdown."""
if self.remote_sock is not None:
self.remote_sock.close()
self.remote_sock = None
@coroutine.as_coro
def _read_line_coro(self, max_len, delimiter):
- """Helper coroutine created by read_line if data is not immediately available."""
+ """Helper coroutine created by `read_line` if data is not immediately available."""
while True:
# Wait for the socket to be readable
yield reactor.wait_for_readable(self.remote_sock)
@coroutine.returns_waitcondition
def read_line(self, max_len = 1024, delimiter = "\r\n"):
- """
- Read a line (delimited by any member of the "delimiters" tuple) from
- the client. If more than max_length characters are read before a
- delimiter is found, a ConnectionOverflowException will be raised.
+ """Read a line from the client.
+
+ :param max_len:
+ If more than ``max_len`` characters are read before ``delimiter`` is
+ found, a ConnectionOverflowException will be raised.
+ :param delimiter:
+ End-of-line character or sequence. This will not be included in the returned line.
"""
# Check if the delimiter is already in the buffer.
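The buffer check that `read_line` begins with can be sketched in isolation. This is a rough illustration, not the method's actual body: `take_line` is a hypothetical helper, the exception class is a local stand-in for Chiral's, and the exact overflow rule (raise once more than ``max_len`` characters precede the delimiter) is an assumption drawn from the docstring.

```python
class ConnectionOverflowException(Exception):
    """Local stand-in for the exception named in the read_line docstring."""


def take_line(buffer, max_len=1024, delimiter="\r\n"):
    """Return (line, rest) if a full line is buffered, else (None, buffer).

    The delimiter is not included in the returned line.
    """
    index = buffer.find(delimiter)
    if index == -1:
        # No complete line yet; give up if too much is already buffered.
        if len(buffer) > max_len:
            raise ConnectionOverflowException("no delimiter within max_len")
        return None, buffer
    if index > max_len:
        raise ConnectionOverflowException("line longer than max_len")
    # Split around the delimiter and keep the remainder for the next call.
    return buffer[:index], buffer[index + len(delimiter):]


line, rest = take_line("GET / HTTP/1.0\r\nHost: example")
```

When `take_line` returns ``None``, a caller would wait for the socket to become readable, append the new data to the buffer, and try again, which is the role `_read_line_coro` plays in the class above.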
@coroutine.as_coro
def read_exactly(self, length, read_increment = 32768):
"""
- Read and return exactly length bytes.
+ Read and return exactly ``length`` bytes.
- If length is less than or equal to read_increment, then only length octets
- will be read from the socket; otherwise, data will be read read_increment
- octets at a time.
+ :param length: Number of bytes to return.
+ :param read_increment: Number of bytes to low-level read from the socket at a time.
"""
# If we have enough bytes already, just return them
def recv(self, buflen):
"""
Read data from the socket.
+
+ This behaves analogously to the ``recv`` system call, but will read data from
+ the internal buffer if available.
"""
+
+ if self._buffer:
+ if len(self._buffer) > buflen:
+ out = self._buffer[:buflen]
+ self._buffer = self._buffer[buflen:]
+ else:
+ out = self._buffer
+ self._buffer = ""
+
+ raise StopIteration(out)
+
while True:
# Try reading the data.
try:
@coroutine.as_coro
def _sendall_coro(self, data):
- """Helper coroutine created by sendall if not all data could be sent."""
+ """Helper coroutine created by `sendall` if not all data could be sent."""
while data:
yield reactor.wait_for_readable(self)
@coroutine.returns_waitcondition
def sendall(self, data):
"""
- Send all of data to the socket. The send() method and underlying system
- call are not guaranteed to write all the supplied data; sendall() will
- loop if necessary until all data is written.
+ Send all of ``data`` to the socket.
+
+ The `send` method and underlying system call are not guaranteed to write
+ all the supplied data; ``sendall`` will loop if necessary until all data is written.
"""
# Try writing the data.
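The loop that ``sendall`` performs can be sketched with a plain nonblocking socket. This is a blocking stand-in under stated assumptions, not Chiral's implementation: `send_all` is a hypothetical name, and a direct ``select()`` call takes the place of yielding a reactor WaitCondition for writability.

```python
import select
import socket


def send_all(sock, data):
    """Keep calling send() until every byte of data has been accepted."""
    view = memoryview(data)
    while view:
        try:
            sent = sock.send(view)  # may accept only part of the data
        except BlockingIOError:
            # Kernel buffer is full: wait until the socket is writeable again.
            select.select([], [sock], [])
            continue
        view = view[sent:]  # drop the bytes that were written


a, b = socket.socketpair()
a.setblocking(False)
payload = b"x" * 1000
send_all(a, payload)

# Drain the other end; recv() may also return the data in pieces.
received = b""
while len(received) < len(payload):
    received += b.recv(4096)
a.close()
b.close()
```

Slicing a `memoryview` avoids copying the unsent tail on every partial write, which matters once the payload is much larger than the socket buffer.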
@coroutine.as_coro
def send(self, data):
"""
- Send data, and return the number of bytes actually sent. Note that the
- send() system call does not guarantee that all of data will actually be
- sent; in most cases, sendall() should be used.
+ Send data, and return the number of bytes actually sent.
+
+ This behaves analogously to the ``send`` system call. Note that ``send``
+ does not guarantee that all of ``data`` will actually be sent; in most cases,
+ `sendall` should be used instead.
"""
while True:
# Try writing the data.
def sendfile(self, infile, offset, length):
"""
Send up to len bytes of data from infile, starting at offset.
- Returns the amount actually written, which may be less than
- all the data given. Use sendall() if all the data must be sent.
"""
if not _SENDFILE_AVAILABLE:
Connect or reconnect to the remote server.
If this TCPConnection was created by passing an existing socket object to __init__,
- then connect() cannot be used and will raise RuntimeError.
+ then ``connect`` cannot be used and will raise RuntimeError.
- Otherwise, the TCPConnection must be connected with connect() before it can be used,
+ Otherwise, the TCPConnection must be connected with ``connect`` before it can be used,
and may be reconnected after any method raises a ConnectionClosedException.
"""
If the corresponding socket has already been created and connected, i.e. by a
TCPServer calling `socket.accept()`, then it should be passed in as ``sock``.
Otherwise, a new socket is created. The TCPConnection is then in an
- unconnected state; to connect to remote_addr, call `connect()` on the
- TCPConnection.
+ unconnected state; to connect to remote_addr, call `connect`.
"""
self.remote_addr = remote_addr
# Set the socket nonblocking. Socket objects have some magic that
# pylint doesn't grok, so suppress its "no setblocking member" warning.
+
self.remote_sock.setblocking(0) # pylint: disable-msg=E1101
self._buffer = ""
socket which listens on a TCP port and accepts connections;
each connection is tracked and closed when necessary.
- The connection_class attribute sets the class that will be created for
- new connections; it should be derived from TCPConnection.
+ The ``connection_class`` attribute sets the class that will be created for
+ new connections; it should be derived from `TCPConnection`.
"""
connection_class = TCPConnection