"""
-----------
theia.model
-----------
Theia event data model.
Basic model of an Event, serializers and parsers for Event manipulation.
"""
from time import time
from collections import namedtuple
from io import StringIO, SEEK_CUR
import re
EventPreamble = namedtuple('EventPreamble', 'total header content')
"""A preamble to an :class:`Event`.

The preamble appears only in the serialized form of an event, where it
records the size of the whole event and of each of its parts.
"""

# Attach a docstring to every field accessor. The texts are kept verbatim
# (including the surrounding newlines) because they are visible at runtime
# through ``EventPreamble.<field>.__doc__``.
for _field, _field_doc in (
        ('total', '\n``int``, the size of the serialized event in bytes.\n'),
        ('header', '\n``int``, the size of the serialized event header in bytes.\n'),
        ('content', '\n``int``, the size of the serialized event content in bytes.\n'),
):
    getattr(EventPreamble, _field).__doc__ = _field_doc
# Do not leak the loop variables into the module namespace.
del _field, _field_doc
class Event:
    """Event represents some event occurring at a specific time.

    Each event is uniquely identified by its ``id`` in the whole system. An event
    comes from a ``source`` and always has an associated ``timestamp``. The
    timestamp is usually generated by the event producer.

    The *content* of an event is an arbitrary string. It may be a log file line,
    some generated record, readings from a sensor or other non-structured or
    structured text.

    Each event may have a list of ``tags`` associated with it. These are arbitrary
    strings and help in filtering the events.

    An event may look like this ::

        id:331c531d-6eb4-4fb5-84d3-ea6937b01fdd
        timestamp: 1509989630.6749051
        source:/dev/sensors/door1-sensor
        tags:sensors,home,doors,door1
        Door has been unlocked.

    The constructor takes multiple arguments, of which only the id and source are
    required.

    :param id: ``str``, the event unique identifier. Must be system-wide unique. A
        UUID version 4 (random UUID) would be a good choice for ``id``.
    :param source: ``str``, the source of the event. It usually is the name of the
        monitored file, but if the event does not originate from a file, it should be
        set to the name of the process, system or entity that generated the event.
    :param timestamp: ``float``, time when the event occurred, in seconds since the
        UNIX epoch (as returned by :func:`time.time`), with a sub-second fractional
        part. If ``None`` (the default), the current time is used. An explicit ``0``
        is kept as-is.
    :param tags: ``list``, list of ``str`` tags to add to this event. ``None``
        becomes an empty list.
    :param content: ``str``, the actual content of the event. The content may have
        an arbitrary length (or none at all); ``None`` becomes an empty string.
    """

    def __init__(self, id, source, timestamp=None, tags=None, content=None):
        self.id = id
        self.source = source
        # Compare against None explicitly: ``timestamp or time()`` would silently
        # replace a legitimate 0 (the epoch) with the current time.
        self.timestamp = time() if timestamp is None else timestamp  # seconds since epoch
        self.tags = tags or []
        self.content = content or ''

    def match(self, id=None, source=None, start=None, end=None, content=None, tags=None):
        """Check if this event matches the provided criteria.

        The event matches only if **all** of the given criteria are satisfied.
        Criteria left as ``None`` are ignored, so calling ``match`` without any
        criteria yields ``True``. Evaluation stops at the first criterion that
        fails.

        The ``id``, ``source`` and ``content`` criteria are regular expressions,
        matched against the beginning of the corresponding attribute (see
        :func:`re.match`). ``start`` and ``end`` are numeric bounds on the
        :class:`Event` timestamp, and ``tags`` is a list of literal tag names.

        :param id: ``str``, regular expression against which to match the
            :class:`Event` ``id``.
        :param source: ``str``, regular expression against which to match the
            :class:`Event` ``source``.
        :param start: ``float`` or ``int``, match only if the :class:`Event`
            timestamp is greater than or equal to this value.
        :param end: ``float`` or ``int``, match only if the :class:`Event`
            timestamp is less than or equal to this value.
        :param content: ``str``, regular expression against which to match the
            :class:`Event` ``content``. An empty pattern matches any content.
        :param tags: ``list`` of ``str``. Matches only if **every** listed tag is
            present in the event's ``tags`` (exact string comparison, not a
            regular expression).
        :returns: ``True`` if this :class:`Event` matches the criteria, otherwise ``False``.
        """
        # Short-circuit so that expensive checks (e.g. a regex over a large
        # content) are skipped once a cheaper criterion has already failed.
        return (self._match_header_id_and_source(id, source)
                and self._match_timestamp(start, end)
                and self._match_tags(tags)
                and self._match_content(content))

    def _match_header_id_and_source(self, id, source):
        """Match the ``id`` and ``source`` regex criteria (``None`` means "any")."""
        matches = True
        if id is not None:
            matches = _match(id, self.id)
        if matches and source is not None:
            matches = _match(source, self.source)
        return matches

    def _match_timestamp(self, start, end):
        """Check the timestamp against the inclusive ``[start, end]`` bounds."""
        if self.timestamp is None:
            # Nothing to compare against; do not reject the event.
            return True
        matches = True
        if start is not None:
            matches = self.timestamp >= start
        if matches and end is not None:
            matches = self.timestamp <= end
        return matches

    def _match_tags(self, tags):
        """Return ``True`` if every tag in ``tags`` is one of this event's tags."""
        if not tags:
            return True
        return all(tag in self.tags for tag in tags)

    def _match_content(self, content):
        """Match the content regex; an empty or missing pattern matches anything."""
        if not content:
            return True
        return _match(content, self.content)
def _match(pattern, value):
    """Match ``value`` against the regular expression ``pattern``.

    The match is anchored at the beginning of the value (see :func:`re.match`).
    Non-string values (e.g. a UUID object or a number used as an event id) are
    converted with :func:`str` first instead of making :func:`re.match` raise
    ``TypeError``.

    :param pattern: ``str``, the regular expression to apply.
    :param value: the value to test. ``None`` never matches.
    :returns: ``True`` if ``pattern`` matches at the start of ``value``,
        otherwise ``False``.
    """
    if value is None:
        return False
    if not isinstance(value, str):
        value = str(value)
    return re.match(pattern, value) is not None
class EventSerializer:
    """Serializer for instances of type :class:`Event`.

    Produces a plain-text representation of an :class:`Event`, encodes it with
    the serializer's encoding (UTF-8 by default) and returns the resulting
    ``bytes``.

    The representation has three parts: preamble, header and content.

    The preamble is the first line of every event and has this format::

        event: <total_size> <header_size> <content_size>

    where:

    * ``total_size`` is ``header_size + content_size`` in bytes. The preamble
      line itself and the trailing newline are not counted.
    * ``header_size`` is the size of the encoded header in bytes.
    * ``content_size`` is the size of the encoded content in bytes.

    The header holds the Event's id, timestamp, source and tags, in that order.
    Each value is written on its own line: the property name, a colon (``:``),
    then the property value. Tags are joined with commas. The content follows
    the last header line directly.

    Here is an example of a fully serialized :class:`Event` (Python's ``bytes``)::

        b'event: 155 133 22\\nid:331c531d-6eb4-4fb5-84d3-ea6937b01fdd\\ntimestamp: 1509989630.6749051\\nsource:/dev/sensors/door1-sensor\\ntags:sensors,home,doors,door1\\nDoor has been unlocked\\n'

    or as a textual representation::

        event: 155 133 22
        id:331c531d-6eb4-4fb5-84d3-ea6937b01fdd
        timestamp: 1509989630.6749051
        source:/dev/sensors/door1-sensor
        tags:sensors,home,doors,door1
        Door has been unlocked

    **Note** that the :class:`EventSerializer` adds a trailing newline (``\\n``)
    after the content.

    :param encoding: ``str``, the encoding of the serialized string. Default ``utf-8``.
    """

    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def serialize(self, event):
        """Serialize an :class:`Event` into ``bytes``.

        See :class:`EventSerializer` for details on the serialization format.

        :param event: :class:`Event`, the event to be serialized. A ``None``
            content is serialized as an empty content.
        :returns: the serialized event as ``bytes``.
        """
        header = self._serialize_header(event)
        header_len = len(header.encode(self.encoding))
        body = event.content or ''
        body_len = len(body.encode(self.encoding))
        # Sizes are byte counts of the *encoded* parts, so multi-byte characters
        # are accounted for correctly.
        preamble = 'event: %d %d %d\n' % (header_len + body_len, header_len, body_len)
        return ''.join((preamble, header, body, '\n')).encode(self.encoding)

    def _serialize_header(self, event):
        """Render the header lines (id, timestamp, source, tags), each newline-terminated."""
        lines = (
            'id:' + str(event.id),
            'timestamp: %.7f' % event.timestamp,
            'source:' + str(event.source),
            'tags:' + ','.join(event.tags),
        )
        return ''.join(line + '\n' for line in lines)
# Converters for header values, keyed by header field name. Each callable turns
# the textual value of a header line into the Python value stored on the event
# (presumably the text after the "<name>:" prefix, as passed by parse_header --
# confirm against that method).
_HEADER_FIELDS = {
    'id': lambda value: value,
    'timestamp': lambda value: float(value),
    'source': lambda value: value,
    # An event without tags is serialized as "tags:" with an empty value; map that
    # back to an empty list rather than [''] so that tags round-trip correctly.
    'tags': lambda value: value.split(',') if value else [],
}
class EventParser:
    """Parses an incoming bytes stream into an :class:`Event`.

    Offers methods for parsing parts of an :class:`Event` or parsing the full
    event from the incoming :class:`io.BytesIO` stream.

    The bytes read from the stream are decoded to ``str`` using the parser's
    encoding. By default the parser assumes that the stream is ``utf-8`` encoded.

    :param encoding: ``str``, the encoding to be used for decoding the stream bytes.
        The default is ``utf-8``.
    """

    def __init__(self, encoding='utf-8'):
        self.encoding = encoding

    def parse_preamble(self, stream):
        """Parses the event preamble from the incoming stream into a :class:`EventPreamble`.

        The event preamble is a single line read from the stream with the following
        structure::

            event: <total_size> <header_size> <content_size>

        where:

        * ``total_size`` is the combined size of the header and the content in bytes.
        * ``header_size`` is the size of the header in bytes.
        * ``content_size`` is the size of the content (after the header) in bytes.

        :param stream: :class:`io.BytesIO`, the stream to parse.
        :returns: the parsed :class:`EventPreamble` from the incoming stream.
        :raises EOFException: if the stream has no more lines to read.
        :raises Exception: if the line does not start with ``event:`` or does not
            carry exactly three values.
        """
        pstr = stream.readline()
        if not pstr:
            # readline() returns an empty bytes object only at end of stream.
            raise EOFException()
        pstr = pstr.decode(self.encoding).strip()
        if not pstr.startswith('event:'):
            raise Exception('Invalid preamble line: [%s]' % pstr)
        # split() without a separator tolerates any amount of whitespace after
        # the "event:" prefix (including none).
        values = pstr[len('event:'):].split()
        if len(values) != 3:
            raise Exception('Invalid preamble values')
        return EventPreamble(total=int(values[0]), header=int(values[1]), content=int(values[2]))

    def parse_event(self, stream, skip_content=False):
        """Parses an event from the incoming stream.

        The parsing is done in three steps:

        1. The preamble is parsed to determine the sizes of the event header and
           content (see :meth:`parse_preamble`).
        2. The header is parsed with :meth:`parse_header`.
        3. The content is either read and decoded, or skipped by seeking past it.
           The single newline that terminates each event is then skipped.

        If ``skip_content`` is set to ``True``, the actual content of the event is
        not read. This is useful in event readers that only match on header
        values, as it avoids spending memory and time on reading the content. In
        this case ``None`` is passed as the content to :class:`Event` (whose
        constructor stores it as an empty string). Both skipping and the final
        newline skip use ``stream.seek``, so the stream must be seekable.

        :param stream: :class:`io.BytesIO`, the stream to parse from.
        :param skip_content: ``bool``, whether to skip the fetching of the content and
            to fetch only the :class:`Event` header. Default is ``False``.
        :returns: the parsed :class:`Event` from the incoming stream.
        :raises Exception: if fewer content bytes can be read than the preamble
            declares.
        """
        preamble = self.parse_preamble(stream)
        header = self.parse_header(preamble.header, stream)
        content = None
        if skip_content:
            stream.seek(preamble.content, SEEK_CUR)
        else:
            raw_content = stream.read(preamble.content)
            # Validate the byte count before decoding: a truncated read may end in
            # the middle of a multi-byte character, which would otherwise surface
            # as a confusing UnicodeDecodeError. Report bytes (not decoded
            # characters) so the numbers are comparable with the declared size.
            if len(raw_content) != preamble.content:
                raise Exception('Invalid content size. The stream is either unreadable or corrupted. ' +
                                'Preamble declares %d bytes, but content length is %d' % (preamble.content, len(raw_content)))
            content = raw_content.decode(self.encoding)
        stream.seek(1, SEEK_CUR)  # skip the newline that terminates each event
        return Event(id=header.id, source=header.source, timestamp=header.timestamp,
                     tags=header.tags, content=content)
class EOFException(Exception):
    """Signals that the end of the underlying stream was reached while parsing
    an :class:`Event` from it (for example, when no preamble line could be read).
    """