Source code for theia.model

"""
-----------
theia.model
-----------

Theia event data model.

Basic model of an Event, serializers and parsers for Event manipulation.
"""

from time import time
from collections import namedtuple
from io import StringIO, SEEK_CUR
import re


EventPreamble = namedtuple('EventPreamble', 'total header content')
"""Size information that precedes a serialized :class:`Event`.

The preamble exists only in the serialized form of an event. It records the
size of the whole event and the sizes of its header and content parts.
"""

# Per-field documentation, attached to the generated namedtuple properties.
EventPreamble.total.__doc__ = """
    ``int``, the size of the serialized event in bytes.
"""

EventPreamble.header.__doc__ = """
    ``int``, the size of the serialized event header in bytes.
"""

EventPreamble.content.__doc__ = """
    ``int``, the size of the serialized event content in bytes.
"""





class Event:
    """Event represents some event occurring at a specific time.

    Each event is uniquely identified by its ``id`` in the whole system. An
    event comes from a ``source`` and always has an associated ``timestamp``.
    The timestamp is usually generated by the event producer.

    The *content* of an event is an arbitrary string. It may be a log file
    line, some generated record, readings from a sensor or other non-structured
    or structured text.

    Each event may have a list of ``tags`` associated with it. These are
    arbitrary strings and help in filtering the events.

    An event may look like this ::

        id:331c531d-6eb4-4fb5-84d3-ea6937b01fdd
        timestamp: 1509989630.6749051
        source:/dev/sensors/door1-sensor
        tags:sensors,home,doors,door1
        Door has been unlocked.

    The constructor takes multiple arguments, of which only the id and source
    are required.

    :param id: ``str``, the event unique identifier. Must be system-wide
        unique. An UUID version 4 (random UUID) would be a good choice for
        ``id``.
    :param source: ``str``, the source of the event. It usually is the name of
        the monitored file, but if the event does not originate from a file,
        it should be set to the name of the process, system or entity that
        generated the event.
    :param timestamp: ``float``, time when the event occurred, in seconds
        (like UNIX time), with the fractional part carrying the sub-second
        precision. If no value is given (``None``), the current time is used.
        An explicit ``0`` is kept as is.
    :param tags: ``list``, list of ``str`` tags to add to this event.
    :param content: ``str``, the actual content of the event. The content may
        have an arbitrary length (or none at all).
    """

    def __init__(self, id, source, timestamp=None, tags=None, content=None):
        self.id = id
        self.source = source
        # Seconds since the epoch. Only a *missing* timestamp defaults to
        # "now": 0 (the epoch itself) is a legitimate value and must survive.
        self.timestamp = time() if timestamp is None else timestamp
        self.tags = tags or []
        self.content = content or ''

    def match(self, id=None, source=None, start=None, end=None, content=None, tags=None):
        """Check if this event matches the provided criteria.

        The event will match only if **all** criteria are satisfied. Calling
        match without any criteria yields ``True``.

        The ``id``, ``source`` and ``content`` criteria are regular
        expressions applied with :func:`re.match`, so they are anchored at the
        beginning of the value. ``start`` and ``end`` expect numeric values,
        as they operate on the :class:`Event` timestamp. ``tags`` are compared
        by exact membership.

        :param id: ``str``, regular expression against which to match the
            :class:`Event` ``id``.
        :param source: ``str``, regular expression against which to match the
            :class:`Event` ``source``.
        :param start: ``float`` or ``int``, match true if the :class:`Event`
            timestamp is greater than or equal to this value.
        :param end: ``float`` or ``int``, match true if the :class:`Event`
            timestamp is less than or equal to this value.
        :param content: ``str``, regular expression against which to match the
            :class:`Event` ``content``. An empty value means "no criterion".
        :param tags: ``list``, list of ``str`` tags. Matches true only if
            **every** listed tag is present in the Event tags.

        :returns: ``True`` if this :class:`Event` matches the criteria,
            otherwise ``False``.
        """
        return all([self._match_header_id_and_source(id, source),
                    self._match_timestamp(start, end),
                    self._match_tags(tags),
                    self._match_content(content)])

    def _match_header_id_and_source(self, id, source):
        """Match the ``id`` and ``source`` patterns; ``None`` means no criterion."""
        matches = True
        if id is not None:
            matches = _match(id, self.id)
        if matches and source is not None:
            matches = _match(source, self.source)
        return matches

    def _match_timestamp(self, start, end):
        """Check that the timestamp lies within the inclusive ``[start, end]`` range."""
        matches = True
        # Compare against None rather than truthiness so that an event stamped
        # exactly at 0 is still subject to the range filter.
        if self.timestamp is not None:
            if start is not None:
                matches = self.timestamp >= start
            if matches and end is not None:
                matches = self.timestamp <= end
        return matches

    def _match_tags(self, tags):
        """Check that every requested tag is present in this event's tags."""
        if not tags:
            return True
        return all(tag in self.tags for tag in tags)

    def _match_content(self, content):
        """Match the content pattern; an empty pattern matches everything."""
        if not content:
            return True
        return _match(content, self.content)
def _match(pattern, value):
    """Match the value against a regular expression pattern.

    The match is performed with :func:`re.match`, so the pattern is anchored
    at the beginning of the value.

    :param pattern: the regular expression to apply.
    :param value: the value to test. ``None`` never matches. Values that are
        neither ``str`` nor ``bytes`` (for example a ``uuid.UUID`` id) are
        converted with ``str()`` first.

    :returns: ``True`` if the value matches the pattern, otherwise ``False``.
    """
    if value is None:
        return False
    # re.match raises TypeError for non-text values; convert them instead.
    # bytes are left untouched so bytes patterns keep working.
    if not isinstance(value, (str, bytes)):
        value = str(value)
    return re.match(pattern, value) is not None
class EventSerializer:
    """Serializer for instances of type :class:`Event`.

    The :class:`Event` is turned into a plain text representation, which is
    then encoded (UTF-8 by default) and returned as ``bytes``.

    The representation consists of three parts: preamble, header and content.

    The preamble is the first line of every event and has this format:

        event: <total_size> <header_size> <content_size>

    where:

    * ``total_size`` is the total size of the Event (after the preamble) in
      bytes.
    * ``header_size`` is the size of the header in bytes.
    * ``content_size`` is the size of the content (after the header) in bytes.

    The header holds the values for the Event's id, timestamp, source and
    tags. Each value is serialized on a single line. The line starts with the
    name of the property, followed by a colon (``:``), then the property value.

    The content starts right after the last header line.

    Here is an example of a fully serialized :class:`Event` (Python's
    ``bytes``)::

        b'event: 155 133 22\\nid:331c531d-6eb4-4fb5-84d3-ea6937b01fdd\\ntimestamp: 1509989630.6749051\\nsource:/dev/sensors/door1-sensor\\ntags:sensors,home,doors,door1\\nDoor has been unlocked\\n'

    or as a textual representation::

        event: 155 133 22
        id:331c531d-6eb4-4fb5-84d3-ea6937b01fdd
        timestamp: 1509989630.6749051
        source:/dev/sensors/door1-sensor
        tags:sensors,home,doors,door1
        Door has been unlocked

    **Note** that the :class:`EventSerializer` adds a trailing newline
    (``\\n``) at the end. That newline is not counted in any of the sizes
    written to the preamble.

    :param encoding: ``str``, the encoding of the serialized string. Default
        ``utf-8``.
    """

    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def serialize(self, event):
        """Serializes an :class:`Event`.

        See :class:`EventSerializer` for details on the serialization format.

        :param event: :class:`Event`, the event to be serialized.

        :returns: the serialized event as ``bytes``.
        """
        header_text = self._serialize_header(event)
        # Sizes are measured on the encoded form, i.e. in bytes, not characters.
        header_len = len(header_text.encode(self.encoding))
        body_text = event.content or ''
        body_len = len(body_text.encode(self.encoding))
        preamble = 'event: %d %d %d\n' % (header_len + body_len, header_len, body_len)
        parts = (preamble, header_text, body_text, '\n')
        return ''.join(parts).encode(self.encoding)

    def _serialize_header(self, event):
        """Render the header: one ``name:value`` line per property."""
        lines = (
            'id:' + str(event.id),
            'timestamp: %.7f' % event.timestamp,
            'source:' + str(event.source),
            'tags:' + ','.join(event.tags),
        )
        return ''.join(line + '\n' for line in lines)
# Converters from the raw header value (``str``) to the value stored on the
# parsed header, keyed by the lower-cased property name.
_HEADER_FIELDS = {
    'id': lambda value: value,
    'timestamp': float,
    'source': lambda value: value,
    # An empty "tags:" line means "no tags". A bare ''.split(',') would yield
    # [''], i.e. one bogus empty tag.
    'tags': lambda value: value.split(',') if value else [],
}
class EventParser:
    """Parses an incoming bytes stream into an :class:`Event`.

    Offers methods for parsing parts of an :class:`Event` or parsing the full
    event from the incoming :class:`io.BytesIO` stream.

    The bytes read from the stream are decoded to ``str``. By default the
    parser assumes that the stream is ``utf-8`` encoded.

    :param encoding: ``str``, the encoding to be used for decoding the stream
        bytes. The default is ``utf-8``.
    """

    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def parse_header(self, hdr_size, stream):
        """Parses the :class:`Event` header into a :class:`Header` object from
        the incoming stream.

        First ``hdr_size`` bytes are read from the :class:`io.BytesIO` stream
        and are decoded to ``str``. Then each line is split at the first colon
        (``:``). The lower-cased part before the colon selects the
        :class:`Header` property; the part after the colon is the property
        value.

        :param hdr_size: ``int``, the size of the header in bytes. See
            :meth:`EventParser.parse_preamble` on how to determine the header
            size in bytes.
        :param stream: :class:`io.BytesIO`, the incoming stream to parse.

        :returns: the parsed :class:`Header` for the event.

        Raises :class:`Exception` if fewer than ``hdr_size`` bytes could be
        read, if the header contains a blank line, or if an unknown property
        is encountered. Raises :class:`ValueError` if a header line has no
        colon.
        """
        hbytes = stream.read(hdr_size)
        if len(hbytes) != hdr_size:
            raise Exception('Invalid read size from buffer. The stream is either unreadable '
                            'or corrupted. %d read, expected %d' % (len(hbytes), hdr_size))

        # Validate and convert every line first, so a malformed header fails
        # before any header object is built. The context manager closes the
        # buffer on error paths too.
        fields = []
        with StringIO(hbytes.decode(self.encoding)) as sio:
            for line in sio:
                line = line.strip()
                if not line:
                    raise Exception('Invalid header')
                prop, colon, value = line.partition(':')
                if not colon:
                    raise ValueError('Invalid header line (missing colon): %r' % line)
                prop = prop.lower()
                parse_field = _HEADER_FIELDS.get(prop)
                if not parse_field:
                    raise Exception('Unknown property in header %s' % prop)
                fields.append((prop, parse_field(value)))

        # NOTE(review): Header is defined elsewhere in this module and is
        # assumed to accept arbitrary attribute assignment - confirm.
        header = Header()
        for prop, value in fields:
            setattr(header, prop, value)
        return header

    def parse_preamble(self, stream):
        """Parses the event preamble from the incoming stream into a
        :class:`EventPreamble`.

        The event preamble is a single line read from the stream with the
        following structure::

            event: <total_size> <header_size> <content_size>

        where:

        * ``total_size`` is the total size of the Event (after the preamble)
          in bytes.
        * ``header_size`` is the size of the header in bytes.
        * ``content_size`` is the size of the content (after the header) in
          bytes.

        Note that the sizes are expressed in bytes.

        :param stream: :class:`io.BytesIO`, the stream to parse.

        :returns: the parsed :class:`EventPreamble` from the incoming stream.

        Raises :class:`EOFException` if the stream is exhausted, and
        :class:`Exception` if the line is not a valid preamble.
        """
        raw_line = stream.readline()
        if not raw_line:
            raise EOFException()
        pstr = raw_line.decode(self.encoding).strip()
        if not pstr.startswith('event:'):
            raise Exception('Invalid preamble line: [%s]' % pstr)
        # Split on any whitespace after the marker. Slicing a fixed number of
        # characters would silently eat the first digit of "event:155 ...".
        values = pstr[len('event:'):].split()
        if len(values) != 3:
            raise Exception('Invalid preamble values')
        return EventPreamble(total=int(values[0]), header=int(values[1]), content=int(values[2]))

    def parse_event(self, stream, skip_content=False):
        """Parses an event from the incoming stream.

        The parsing is done in two phases:

        1. The preamble is parsed to determine the size of the event header
           and of the event content.
        2. Then the actual event is read, either with or without the event
           content.

        If ``skip_content`` is set to ``True``, the content is not read: the
        stream is advanced past it with a relative seek, so the stream must be
        seekable. This is useful in event readers that only match on the
        header values, without spending memory and time on the content. In
        this case ``None`` is passed as the content of the :class:`Event`.

        :param stream: :class:`io.BytesIO`, the stream to parse from.
        :param skip_content: ``bool``, whether to skip the fetching of the
            content and to fetch only the :class:`Event` header. Default is
            ``False``.

        :returns: the parsed :class:`Event` from the incoming stream.

        Raises :class:`Exception` if fewer content bytes are available than
        the preamble declares.
        """
        preamble = self.parse_preamble(stream)
        header = self.parse_header(preamble.header, stream)
        content = None
        if skip_content:
            stream.seek(preamble.content, SEEK_CUR)
        else:
            raw_content = stream.read(preamble.content)
            # Check the size on the raw bytes *before* decoding: a truncated
            # read may end inside a multi-byte character, and the declared
            # size is in bytes, not characters.
            if len(raw_content) != preamble.content:
                raise Exception('Invalid content size. The stream is either unreadable or corrupted. ' +
                                'Preamble declares %d bytes, but content length is %d' %
                                (preamble.content, len(raw_content)))
            content = raw_content.decode(self.encoding)

        stream.seek(1, SEEK_CUR)  # new line after each event

        return Event(id=header.id, source=header.source, timestamp=header.timestamp,
                     tags=header.tags, content=content)
class EOFException(Exception):
    """Error raised while parsing an :class:`Event` from an underlying stream
    when the end of the stream is reached prematurely.
    """