"""
----------------
theia.naivestore
----------------
Naive implementation of the Event Store.
This module provides an implementation of the :class:`theia.storeapi.EventStore`
that stores the events in plain-text files.
The store writes to the files atomically, so there is no danger of leaving the
files in an inconsistent state.
The files in which the store keeps the events are plain text files that contain
serialized events, encoded in UTF-8. The events are written sequentially. Plain
text is chosen so that these files can be also be read and processed by other
tools (such as grep). The events are kept in multiple files. Each file contains
about a minute worth of events - all events that happened in that one minute time
span. The name of the file is the time span: <first-event-timestamp>-<last-event-timestamp>.
The naive store requires a root directory in which to store the events. Here is
an example of usage of the store:
.. code-block:: python
from theia.naivestore import NaiveEventStore
from theia.model import Event
from uuid import uuid4
from datetime import datetime
store = NaiveEventStore(root_dir='./data')
timestamp = datetime.now().timestamp()
store.save(Event(id=uuid4(),
source='test-example',
timestamp=timestamp,
tags=['example'],
content='event 1'))
store.save(Event(id=uuid4(),
source='test-example',
timestamp=timestamp + 10,
tags=['example'],
content='event 2'))
store.save(Event(id=uuid4(),
source='test-example',
timestamp=timestamp + 20,
tags=['example'],
content='event 3'))
# now let's search some events
for ev in store.search(ts_start=timestamp + 5):
print('Found:', ev.content)
would print::
>> Found: event 2
>> Found: event 3
"""
from tempfile import NamedTemporaryFile
from io import BytesIO, SEEK_CUR
from threading import RLock, Thread
from shutil import move
from os.path import join as join_paths, basename, dirname
from os import listdir
from collections import namedtuple
import re
import time
from logging import getLogger
from theia.storeapi import EventStore, EventStoreException
from theia.model import EventSerializer, EventParser, EOFException
log = getLogger(__name__)
[docs]class PeriodicTimer(Thread):
"""Timer that executes an action periodically with a given interval.
The timer executes the action in a separate thread (as this class is a subclass of :class:`threading.Thread`). To
run the action you must call :meth:`PeriodicTimer.start`.
The first execution of the action is delayed by ``interval`` seconds. This timer does not call the ``action``
callback every ``interval`` seconds, but rather waits ``interval`` seconds between after the action completes until
the next call. So for a long running tasks, the time of call of the action may not be evenly spaced.
You can cancel this timer by calling meth:`PeriodicTimer.cancel`.
:param interval: ``numeric``, seconds to wait between subsequent calls to ``action`` callback.
:param action: ``function``, the action callback. This callback takes no arguments.
"""
def __init__(self, interval, action):
super(PeriodicTimer, self).__init__(name='periodic-timer@%f:[%s]' % (interval, str(action)))
self.interval = interval
self.action = action
self.is_running = False
[docs] def run(self):
"""Runs the periodic timer.
Do **not** call this function directly, but rather call :meth:`PeriodicTimer.start` to run this thread.
To cancel the timer, call :meth:`PeriodicTimer.cancel`.
"""
self.is_running = True
time.sleep(self.interval)
while self.is_running:
try:
self.action()
except:
pass
time.sleep(self.interval)
[docs] def cancel(self):
"""Cancels the running timer.
The timer thread may continue running until the next cycle, then it exits.
"""
self.is_running = False
[docs]class SequentialEventReader:
"""Reads events (:class:`theia.model.Event`) from an incoming :class:`io.BytesIO` stream.
Uses an :class:`theia.model.EventParser` to parse the events from the incoming stream.
Provides two ways of parsing the events:
* Parsing the event fully - loads the header and the content of the event. See :meth:`SequentialEventReader.events`.
* Parsing only the event header - this skips the loading of the content. Useful for not wasting performance/memory
on loading and decoding the event content when not searching by the event content.
This reader implements the context manager interface and can be used in ``with`` statements. For example:
.. code-block:: python
with SequentialEventReader(stream, parser) as reader:
for event in reader.events():
print(event)
:param stream: :class:`io.BytesIO`, the incoming stream to read events from.
:param event_parser: :class:`theia.model.EventParser`, the parser used for parsing the events from the stream.
"""
def __init__(self, stream, event_parser):
self.stream = stream
self.parser = event_parser
[docs] def events(self):
"""Reads full events from the incoming stream.
Returns an iterator for the read events and yields :class:`theia.model.Event` as it becomes available in the
stream.
The iterator stops if there are no more events available in the stream or the stream closes.
"""
while True:
try:
yield self._actual_read()
except EOFException:
break
[docs] def events_no_content(self):
"""Reads events without content (just header) from the incoming stream.
Returns an iterator for the read events and yields :class:`theia.model.Event` as it becomes available in the
stream.
Note that the ``content`` property of the :class:`theia.model.Event` will always be set to None.
The iterator stops if there are no more events available in the stream or the stream closes.
"""
while True:
try:
yield self._actual_read(skip_content=True)
except EOFException:
break
[docs] def curr_event(self):
"""Reads an :class:`theia.model.Event` at the current position in the stream.
Reads the event fully.
Returns the :class:`theia.model.Event` at the current position of the stream or ``None`` if there are no more
available events to be read from the stream (the stream closes).
"""
try:
return self._actual_read()
except EOFException:
return None
def _actual_read(self, skip_content=False):
data = self.parser.parse_event(self.stream, skip_content=skip_content)
try:
self.stream.seek(1, SEEK_CUR)
except:
pass
return data
def __enter__(self):
"""Implements the context-manager enter method.
:returns: a reference to itself.
"""
return self
def __exit__(self, *args):
"""Implements the context-manager exiting method.
Closes the underlying stream.
"""
self.stream.close()
[docs]class MemoryFile:
"""File-system backed in-memory buffer.
This class wraps an :class:`io.BytesIO` buffer and backs it up with a real file in the file-system.
The writes go to the in-memory buffer, which then can be flushed to the actual file in the file-system.
The flushing of the buffer is atomic and consistent. The buffer is first flushed to a temporary file, then the
system buffers are synced, and then the temporary file is renamed as the actual file.
The instances of this class are thread-safe and can be shared between threads.
**Limitations:** This class is not optimized for large data files as it keeps all of the file data in memory. This
may cause a performance penalty in both speed and memory consumption when dealing with large files. In those cases
it is better to use other memory mapped file implementations.
:param name: ``str``, the name of the file in the file-system. This is just the filename, without the directory.
:param path: ``str``, the directory holding the file in the file-system.
"""
def __init__(self, name, path):
self.name = name
self.path = path
self.buffer = BytesIO()
self.lock = RLock()
[docs] def write(self, data):
"""Write to the data to the buffer.
This writes the data to the in-memory buffer.
:param data: ``bytes``, the data to be written to the buffer.
"""
try:
self.lock.acquire()
self.buffer.write(data)
finally:
self.lock.release()
[docs] def stream(self):
"""Returns a copy of the underlying :class:`io.BytesIO` in-memory stream.
"""
# copy the buffer
# return the copy
try:
self.lock.acquire()
return BytesIO(self.buffer.getvalue())
finally:
self.lock.release()
[docs] def flush(self):
"""Writes the in-memory buffer to the file in the file-system.
This operation is atomic and guarantees that the complete state of the buffer will be written to the file.
The underlying file will never be left in an inconsistent state. This is achieved by first writing the entire
buffer to a temporary file (in the same directory), flushing the system buffers, then if this succeeds, renaming
the temporary file as the original file name.
"""
tmpf = NamedTemporaryFile(dir=self.path, delete=False)
try:
self.lock.acquire()
tmpf.write(self.buffer.getvalue())
tmpf.flush()
move(tmpf.name, join_paths(self.path, self.name))
finally:
self.lock.release()
DataFile = namedtuple('DataFile', ['path', 'start', 'end'])
"""Represents a file containing data (events) within a given time interval (from ``start`` to ``end``).
"""
DataFile.path.__doc__ = """
``str``, the path to the data file.
"""
DataFile.start.__doc__ = """
``int``, timestamp, the start of the interval. The data file does not contain any events before this timestamp.
"""
DataFile.end.__doc__ = """
``int``, timestamp, the end of the interval. The data file does not contain any events after this timestamp.
"""
[docs]def binary_search(datafiles, timestamp):
"""Performs a binary search in the list of datafiles, for the index of the first data file that contain events that
happened at or after the provided timestamp.
:param datafiles: ``list`` of :class:`DataFile`, list of datafiles **sorted** in ascending order by the start
timestamp.
:param timestamp: ``float``, the timestamp to serch for.
:returns: the index (``int``) of the first :class:`DataFile` in the list that contains event that occurred at or
after the provided timestamp. Returns ``None`` if no such data file can be found.
"""
start = 0
end = len(datafiles) - 1
if not datafiles:
return None
if datafiles[0].start > timestamp or datafiles[-1].end < timestamp:
return None
while True:
mid = (end + start) // 2
if datafiles[mid].end >= timestamp:
end = mid
else:
start = mid
if end - start <= 1:
if datafiles[start].end >= timestamp:
return start
return end
return None
[docs]class FileIndex:
"""An index of :class:`DataFile` files loaded from the given directory.
Loads an builds an index of :class:`DataFile` files from the given directory. Each data file name must be in the
form: ``<start>-<end>``, where ``start`` and ``end`` represent the time interval of the events in that data file.
:class:`FileIndex` loads all data files and builds a total time span of all data files. The index exposes methods
for locating and searching files that contain the events within a given time interval.
:param root_dir: ``str``, the directory from which to load the data files.
"""
def __init__(self, root_dir):
self.root = root_dir
self.files = self._load_files(root_dir)
def _load_files(self, root_dir):
files = []
for file_name in listdir(root_dir):
data_file = self._load_data_file(file_name)
if data_file:
files.append(data_file)
files = sorted(files, key=lambda n: n.start)
log.info('Loaded %d files to index.', len(files))
if files:
log.info('Spanning from %d to %d', files[0].start, files[-1].end)
return files
def _load_data_file(self, fname):
if re.match(r'\d+-\d+', fname):
start, _, end = fname.partition('-')
start, end = int(start), int(end)
return DataFile(path=join_paths(self.root, fname), start=start, end=end)
return None
[docs] def find(self, ts_from, ts_to):
"""Finds the data files that contain the events within the given interval [``ts_from``, ``ts_to``].
The interval can be open at the end ([``ts_from``, ``Inf``]), by passing ``0`` for ``ts_to``. The interval
cannot be open at the start.
:param ts_from: ``float``, timestamp, find all data files containing events that have timestamp greater than or
equal to ``ts_from``.
:param ts_to: ``float``, timestamp, find all data files containing events that have timestamp less than or equal
to ``ts_to``. If ``0`` or ``None`` is passed for this parameter, then the end of the time span is open,
meaning this parameter will be ignore in the search.
:returns: a ``list`` of :class:`DataFile` files that contain the events within the given interval. Returns
``None`` if there are no files containing events within the given interval.
"""
idx = binary_search(self.files, ts_from)
if idx is None and self.files:
if self.files[0].start >= ts_from:
idx = 0
elif self.files[-1].end <= ts_from:
idx = len(self.files) - 1
if idx is not None:
found = []
while idx < len(self.files):
data_file = self.files[idx]
if ts_to:
if data_file.start > ts_to:
break # hit the last
if data_file.start >= ts_from or data_file.end >= ts_from:
found.append(data_file)
idx += 1
return found if found else None
return None
[docs] def find_event_file(self, timestamp):
"""Find the event file that contains the event with the given timestamp.
:param timestamp: ``int``, the timestamp of the event.
:returns: the :class:`DataFile` containing the event, or ``None`` if it cannot be found.
"""
idx = binary_search(self.files, timestamp)
if idx is not None:
return self.files[idx]
return None
[docs] def add_file(self, fname):
"""Add a data file to the :class:`FileIndex`.
The time span will be recalculated to incorporate this new data file.
:param fname: ``str``, the file name of the data file to be added to the file index.
"""
data_file = self._load_data_file(fname)
if data_file:
self.files.append(data_file)
self.files = sorted(self.files, key=lambda n: n.start)
[docs]class NaiveEventStore(EventStore):
"""A naive implementation of the :class:`theia.storeapi.EventStore` that keeps the event data is a plain text files.
The events are kept in plain text files, serialized by default in UTF-8. The files are human readable and the format
is designed to be (relatively) easily processed by other tools as well (such as ``grep``). Each data file contains
events that happened within one minute (60000ms). The names of the data files also reflect the time span interval,
so for example a file with name *1528631988-1528632048* contains only events that happened at or after
``1528631988``, but before ``1528632048``.
The store by default uses in-memory buffers to write new events, and flushes the buffer periodically. By default
the flushing occurs roughly every second (1000ms, see the parameter ``flush_interval``). This increases the
performance of the store, but if outage occurs within this interval, the data in the in-memory buffers will be lost.
The store can be configured to flush the events immediately on disk (by passing ``0`` for ``flush_interval``), but
this decreases the performance of the store significantly.
The instances of this class are thread-safe and can be shared between threads.
:param root_dir: ``str``, the root directory where to store the events data files.
:param flush_interval: ``int``, flush interval for the data files buffers in milliseconds. The event data files will
be flushed and persisted on disk every ``flush_interval`` milliseconds. The default value is 1000ms. To flush
immediately (no buffering), set this value equal or less than ``0``.
"""
def __init__(self, root_dir, flush_interval=1000):
self.root_dir = root_dir
self.data_file_interval = 60 # 60 seconds
self.serializer = EventSerializer()
self.index = FileIndex(root_dir)
self.open_files = {}
self.write_lock = RLock()
self.flush_interval = flush_interval # <=0 immediate, otherwise will flush periodically
self.timer = None
if flush_interval > 0:
self.timer = PeriodicTimer(flush_interval / 1000, self._flush_open_files)
self.timer.start()
log.info('Flushing buffers every %fms', (flush_interval / 1000))
def _get_event_file(self, ts_from):
data_file = self.index.find_event_file(ts_from)
if not data_file:
try:
self.write_lock.acquire()
data_file = self._get_new_data_file(ts_from)
self.index.add_file(basename(data_file.path))
finally:
self.write_lock.release()
return data_file
def _open_file(self, data_file):
return MemoryFile(name=basename(data_file.path), path=dirname(data_file.path))
def _flush_open_files(self):
for file_name, open_file in self.open_files.items():
try:
open_file.flush()
except Exception as e:
log.error('Error while flushing %s. Error: %s', file_name, e)
def _get_new_data_file(self, ts_from):
ts_end = ts_from + self.data_file_interval
return DataFile(join_paths(self.root_dir, '%d-%d' % (ts_from, ts_end)), ts_from, ts_end)
[docs] def save(self, event):
# lookup file/create file
# lock it
# save serialized event
# unlock
data_file = self._get_event_file(event.timestamp)
try:
self.write_lock.acquire()
if not self.open_files.get(data_file.path):
self.open_files[data_file.path] = self._open_file(data_file)
mem_file = self.open_files[data_file.path]
mem_file.write(self.serializer.serialize(event))
mem_file.write('\n'.encode(self.serializer.encoding))
if self.flush_interval <= 0:
mem_file.flush()
finally:
self.write_lock.release()
[docs] def search(self, ts_start, ts_end=None, flags=None, match=None, order='asc'):
if order not in ['asc', 'desc']:
raise EventStoreException('Order must be one of "asc" or "desc".')
data_files = self.index.find(ts_start, ts_end)
match_fn = self._match_forward if order == 'asc' else self._match_reverse
if data_files:
for data_file in data_files:
for event in match_fn(data_file, ts_start, ts_end, flags, match):
yield event
[docs] def close(self):
self._flush_open_files()
if self.timer:
self.timer.cancel()
self.timer.join()
log.info('Naive Store stopped')
def _search_data_file(self, data_file, ts_start, ts_end, flags, match, reverse):
if reverse:
yield from self._match_reverse(data_file, ts_start, ts_end, flags, match)
else:
yield from self._match_forward(data_file, ts_start, ts_end, flags, match)
def _match_forward(self, data_file, ts_start, ts_end, flags, match):
with self._seq_event_parser(data_file) as sqp:
for event in sqp.events():
if event.timestamp >= ts_start and (ts_end is None or event.timestamp <= ts_end):
if self._matches(event, flags, match):
yield event
def _match_reverse(self, data_file, ts_start, ts_end, flags, match):
raise Exception('Reverse order (desc) not supported right now.')
def _seq_event_parser(self, data_file):
return SequentialEventReader(open(data_file.path, 'rb'), EventParser())
def _matches(self, event, flags, match):
if flags is not None and event.tags:
for flag in flags:
if not flag in event.tags:
return False
if match and event.content and match:
if not re.search(match, event.content):
return False
return True
[docs] def get(self, event_id):
""":class:`NaiveEventStore` does not support indexing, so search by ``id`` is also not supported.
"""
log.warning('Get event by id [%s], but operation "get" is not supported in NaiveEventStore.', event_id)
raise Exception('Get event not supported.')
[docs] def delete(self, event_id):
""":class:`NaiveEventStore` does not support indexing, so a delete by ``id`` is not supported.
"""
log.warning('Delete event by id [%s], but operation "delete" is not supported in NaiveEventStore.', event_id)
raise Exception('Delete event not supported.')