Source code for theia.comm

"""
----------
theia.comm
----------

Theia communication module.

Defines classes for building theia servers and clients.

Theia communication module is asynchronous and is built on top of asyncio loop.

There are two main interfaces:

* ``Server`` an async server handling and managing WebSocket connections from multiple clients.
* ``Client`` an async client connection to a theia server.

The interfaces are designed to work primarily with theia events and are thread-safe.
"""

import asyncio
import json
from logging import getLogger
import websockets
from theia.model import EventSerializer


log = getLogger(__name__)


_WAIT_TIME = 0.1


[docs]class Client: """Client represents a client connection to a theia server. :param loop: :mod:`asyncio` EventLoop to use for this client. :param host: ``str``, theia server hostname. :param port: ``int``, theia server port. :param secure: ``bool``, is the connection secure. :param path: ``str``, the request path - for example: ``"/live"``, ``"/events"`` etc. :param recv: ``function``, receive handler. Called when a message is received from the server. The handler has the following signature: .. code-block:: python def handler(message): pass where: * ``message`` is the message received from the theia server. """ def __init__(self, loop, host, port, secure=False, path=None, recv=None): self.loop = loop self.host = host self.port = port self.secure = secure self.path = path self.recv_handler = recv self.serializer = EventSerializer() self.websocket = None self._is_open = False self._close_handlers = [] async def _open_websocket(self): websocket = await websockets.connect(self._get_ws_url(), loop=self.loop) self.websocket = websocket self._is_open = True asyncio.ensure_future(self._recv(), loop=self.loop) log.debug('[%s:%d]: connected', self.host, self.port)
[docs] def connect(self): """Connect to the remote server. """ self.loop.run_until_complete(self._open_websocket())
[docs] def close(self, reason=None): """Close the connection to the remote server. :param reason: ``str``, the reason for disconnecting. If not given, a default ``"normal close"`` is sent to the server. """ reason = reason or 'normal close' self._is_open = False self.websocket.close(code=1000, reason=reason) log.debug('[%s:%d]: explicitly closed. Reason=%s', self.host, self.port, reason)
def _get_ws_url(self): url = 'wss://' if self.secure else 'ws://' url += self.host if self.port: url += ':' + str(self.port) if self.path: if self.path.startswith('/'): url += self.path else: url += '/' + self.path return url
[docs] def send(self, message): """Send a ``str`` message to the remote server. :param message: ``str``, the message to be sent to the remote server. :returns: the :class:`asyncio.Handle` to the scheduled task for sending the actual data. """ return self.loop.call_soon_threadsafe(self._call_send, message)
def _call_send(self, message): asyncio.ensure_future(self.websocket.send(message), loop=self.loop)
[docs] def send_event(self, event): """Send an event to the remote server. Serializes, then sends the serialized content to the remote server. :param event: :class:`theia.model.Event`, the event to be send. :returns: the :class:`asyncio.Handle` to the scheduled task for sending the actual data. """ message = self.serializer.serialize(event) return self.send(message)
async def _recv(self): while self._is_open: try: message = await self.websocket.recv() await self._process_message(message) except websockets.ConnectionClosed as wse: self._closed(wse.code, wse.reason) log.debug('[%s:%d] connection closed', self.host, self.port) # pylint: disable=broad-except # General case except Exception as e: log.exception(e) self._closed(1006, reason=str(e)) async def _process_message(self, message): if self.recv_handler: self.recv_handler(message)
[docs] def on_close(self, handler): """Add close handler. The handles is called when the client connection is closed either by the client or by the server. :param handler: ``function``, the handler callback. The callback prototype looks like so: .. code-block:: python def callback(websocket, code, reason): pass where: * ``websocket`` :class:`websockets.WebSocketClientProtocol` is the underlying websocket. * ``code`` ``int`` is the code received when the connection was closed. Check out the `WebSocket specification`_ for the list of codes and their meaning. * ``reason`` ``str`` is the reason for closing the connection. .. _WebSocket specification: https://tools.ietf.org/html/rfc6455#section-7.4 """ self._close_handlers.append(handler)
def _closed(self, code=1000, reason=None): self._is_open = False for hnd in self._close_handlers: try: hnd(self.websocket, code, reason) except Exception as e: log.debug(e)
[docs] def is_open(self): """Check if the client connection is open. :returns: ``True`` if the client connection is open, otherwise ``False``. """ return self._is_open
[docs]class wsHandler: """Wrapper for an incoming websocket connection. Used primarily with the :class:`Server` implementation in the client connections life-cycle management. :param websocket: :class:`websockets.WebSocketClientProtocol`, the underlying websocket connection. :param path: ``str``, the request path of the websocket connection. **Note**: This class is mainly used internally and as such it is a subject of chnages in its API. """ def __init__(self, websocket, path): self.ws = websocket self.path = path self.close_handlers = []
[docs] def trigger(self, websocket): """Triggers the close handlers for this websocket. :param websocket: :class:`websockets.WebSocketClientProtocol`, the underlying websocket connection. """ for hnd in self.close_handlers: try: hnd(self.ws, self.path) # pylint: disable=broad-except except Exception as ex: log.debug(ex)
[docs] def add_close_handler(self, hnd): """Register a close handler for this connection. :param hnd: ``function``, the close handler callback. The callback receives two parameters: * ``ws`` (:class:`websockets.WebSocketClientProtocol`), the underlying websocket connection. * ``path`` (``str``), the request path of the websocket. """ self.close_handlers.append(hnd)
[docs]class Server: """Listens for and manages multiple client connections. The server is based on :mod:`asyncio` event loop. It manages the websocket connections comming from multiple clients based on the path in the websocket request connection. Provides a way to register a callback for notifying when a client connects to a particular endpoint (path), and also a way to register a callback for when the client disconnects. Instances of this class are thread-safe. :param loop: :class:`asyncio.BaseEventLoop`, the event loop. :param host: ``str``, the hostname to bind to when listening fo incoming connections. :param port: ``int``, the port to listen on. """ def __init__(self, loop, host='localhost', port=4479): self.loop = loop self.host = host self.port = port self.websockets = {} self._started = False self.actions = {} self._stop_timeout = 10
[docs] def on_action(self, path, cb): """Register a callback to listen for messages from clients that connected to this specific entrypoint (path). The callback will be called whenever a new message is received from the client on this ``path``. If multiple callbacks are registered on the same action, then they are called one by one in the same order as registered. The response from the callbacks is chained between the subsequent calls. :param path: ``str``, the request path of the incoming websocket connection. :param cb: ``function``, the callback to be called when a message is received from the client on this path. The callback handler looks like this: .. code-block:: python def callback(path, message, websocket, resp): return resp where: * ``path`` ``str``, the path on which the message was received. * ``message`` ``str``, the messge received from the websocket connection. * ``websocket`` :class:`websockets.WebSocketClientProtocol`, the underlying websocket. * ``resp`` ``str``, the response from the previous action registered on this same action The callback must return ``str`` response or ``None``. """ actions = self.actions.get(path) if not actions: actions = self.actions[path] = [] actions.append(cb)
async def _on_client_connection(self, websocket, path): self.websockets[websocket] = wsHandler(websocket, path) try: while self._started: message = await websocket.recv() resp = await self._process_req(path, message, websocket) if resp is not None: await websocket.send(str(resp)) except websockets.ConnectionClosed: self._remove_websocket(websocket) log.debug('[Server:%s:%d] Closing websocket connection: %s', self.host, self.port, websocket) except Exception as e: self._remove_websocket(websocket) log.error('[Server:%s:%d] Closing websocket connection because of unknown error: %s', self.host, self.port, websocket) log.exception(e) def _remove_websocket(self, websocket): # self.websockets.remove(websocket) hnd = self.websockets.get(websocket) if hnd is not None: del self.websockets[websocket] hnd.trigger(websocket)
[docs] def on_websocket_close(self, websocket, cb): """Register a close callback for this websocket. :param websocket: :class:`websockets.WebSocketClientProtocol`, the websocket to watch for closing. :param cb: ``function``, the callback to be called when the ``websocket`` is closed. The callback should look like this: .. code-block:: python def callback(ws, path): pass where: * ``ws`` (:class:`websockets.WebSocketClientProtocol`), the underlying websocket connection. * ``path`` (``str``), the request path of the websocket. This method returns ``True`` if the callback was added; ``False`` if the websocket is not managed by this :class:`Server` instance. """ hnd = self.websockets.get(websocket) if hnd is not None: hnd.add_close_handler(cb) return True return False
async def _process_req(self, path, message, websocket): resp = '' for reg_path, actions in self.actions.items(): if reg_path == path: try: for action in actions: resp = action(path, message, websocket, resp) # pylint: disable=broad-except # Intended to be broad as it handles generic action except Exception as e: log.exception(e) return json.dumps({"error": str(e)}) break return resp
[docs] def start(self): """Starts the server. This call blocks until the server is started or an error occurs. """ start_server = websockets.serve(self._on_client_connection, self.host, self.port, loop=self.loop) self.loop.run_until_complete(start_server) self._started = True
[docs] def stop(self): """Stops the server. Closes all client websocket connections then shuts down the server. This operation blocks until the server stops or an error occurs. """ from time import sleep wss = [ws for ws, _ in self.websockets.items()] semaphore = {'value': len(wss)} for ws in wss: try: self._remove_websocket(ws) except Exception as exc: log.exception(exc) asyncio.run_coroutine_threadsafe(self._send_close(semaphore, ws, 'server stop'), self.loop) log.debug('Server notified to stop') total_wait = 0 while semaphore['value']: sleep(_WAIT_TIME) total_wait += _WAIT_TIME if total_wait > self._stop_timeout: break if semaphore['value']: log.warning('Stop timeout reached before all connections were closed. Server will stop anyway') else: log.debug('All done. Server stopped.')
async def _send_close(self, semaphore, websocket, reason=None): await websocket.close(code=1000, reason=reason) semaphore['value'] -= 1