Source code for theia.query

"""
-----------
theia.query
-----------

Query theia server for events.

This module contains the :class:`Query` that implements API for querying theia collector server.
"""
import asyncio
import json
from logging import getLogger
from theia.comm import Client


log = getLogger(__name__)


[docs]class ResultHandler: """Represents a result of a query against a theia collector server. The result of a query to the collector server is a stream of events. A callback (see ``callback`` in :meth:`Query.live`) can be set to be called whenever an :class:`theia.model.Event` is received, but the control to close the stream is passed down to a :class:`ResultHandler`. This class wraps the :class:`theia.comm.Client` used to connect to the collector server and adds support for registering on client closed events. It also adds means to close (cancel) the connection to the theia server. :param client: :class:`theia.comm.Client`, the underlying client to the theia server. """ def __init__(self, client): self.client = client self._close_handlers = [] self.client.on_close(self._on_client_closed) def _on_client_closed(self, websocket, code, reason): for hnd in self._close_handlers: try: hnd(self.client, code, reason) except Exception as e: log.debug('ResultHandler[%s]: close hander %s error: %s', self.client, hnd, e)
[docs] def when_closed(self, closed_handler): """Register a handler to be called when the connection to the server is closed. The handler has the following signature: .. code-block:: python def closed_handler(client, code, reason): pass where: * ``client`` - :class:`theia.comm.Client`, is the underlying client connected to the theia collector. * ``code`` - ``int``, wbsocket close connection code. * ``reason`` - ``str``, the reason for closing the connection. """ self._close_handlers.append(closed_handler)
[docs] def cancel(self): """Cancel this result. Closes the underlying client connection. """ if self.client.is_open(): self.client.close()
[docs]class Query: """Represents a query to a theia collector server. The query instances are thread-safe. :param host: ``str``, the hostname (IP) of the collector server. :param port: ``int``, the port of the collector server. :param secure: ``bool``, whether to connect securely to the collector server. :param loop: :class:`asyncio.BaseEventLoop`, the event loop to use. """ def __init__(self, host, port, secure=False, loop=None): self.connections = set() self.host = host self.port = port self.secure = secure self.loop = loop if loop is not None else asyncio.get_event_loop()
[docs] def live(self, criteria, callback=None): """Make a live query to the collector. The collector will watch for any events that occur **after** the live query is registered and return those that match the given ``criteria``. :param criteria: ``dict``, the criteria filter for the events. This is a ``dict`` that can contain the following keys: * ``id``, ``str``, pattern for matching the event id. * ``source``, ``str``, pattern for matching the event source. * ``start``, ``int``, match events with timestamp greater or equal to this. * ``end``, ``int``, match events with timestamp less than or equal to this. * ``content``, ``str``, pattern for matching the event content. * ``tags``, ``list``, list of patterns to match against the event tags. :param callback: ``function``, called with the matching event content. The method takes one argument, the serialized :class:`theia.model.Event`. :returns: a :class:`ResultHandler`. """ return self._connect_and_send('/live', criteria, callback)
[docs] def find(self, criteria, callback=None): """Make a query to the collector. The collector will search for any events that occured **before** theis query was sent to the server and will return those that match the given ``criteria``. :param criteria: ``dict``, the criteria filter for the events. This is a ``dict`` that can contain the following keys: * ``id``, ``str``, pattern for matching the event id. * ``source``, ``str``, pattern for matching the event source. * ``start``, ``int``, match events with timestamp greater or equal to this. * ``end``, ``int``, match events with timestamp less than or equal to this. * ``content``, ``str``, pattern for matching the event content. * ``tags``, ``list``, list of patterns to match against the event tags. :param callback: ``function``, called with the matching event content. The method takes one argument, the serialized :class:`theia.model.Event`. :returns: a :class:`ResultHandler`. """ return self._connect_and_send('/find', criteria, callback)
def _connect_and_send(self, path, criteria, callback): client = Client(loop=self.loop, host=self.host, port=self.port, path=path, recv=callback) client.connect() self.connections.add(client) def on_client_closed(websocket, code, reason): """Remove the underlying client from the connections set once its connection is closed. """ # client was closed if client in self.connections: self.connections.remove(client) client.on_close(on_client_closed) msg = json.dumps(criteria) client.send(msg) return ResultHandler(client)