"""
----------
theia.rdbs
----------
Relational database EventStore implementation.
"""
import re
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, String, Float, create_engine, desc
from sqlalchemy.orm import sessionmaker
from theia.storeapi import (EventStore,
EventWriteException,
EventReadException,
EventNotFound,
EventStoreException)
from theia.model import Event
Base = declarative_base()
[docs]class EventRecord(Base):
"""SQLAlchemy model representing the event.
"""
__tablename__ = 'events'
id = Column(String, primary_key=True)
timestamp = Column(Float, index=True)
tags = Column(String)
source = Column(String, index=True)
content = Column(String, index=True)
custom_headers = Column(String)
def __repr__(self):
return 'Event<%s @ %s>' % (self.id, str(self.timestamp))
def __str__(self):
return self.__repr__()
def __eq__(self, obj):
if obj is None:
return False
if not isinstance(obj, EventRecord):
return False
return self.id == obj.id
def __hash__(self):
return hash(self.id)
[docs]class RDBSEventStore(EventStore):
"""EventStore that persists the events in a relational database.
The implementation relies on SQLAlchemy ORM framework.
:param session_factory: the SQLAlchemy SessionMaker function.
"""
def __init__(self, session_factory):
self.session_factory = session_factory
self.bulk_size = 128
def _session(self):
"""Creates new session.
"""
return self.session_factory()
[docs] def save(self, event):
"""Stores the event in the underlying database.
Note that this method only does *INSERT* as the ``EventStore`` has no concept of *UPDATE* -
each event is only added and cannot be updated. Adding the same event twice will result in
an error.
:param event (theia.model.Event): the event to save.
This method does not return a value.
"""
event_record = EventRecord(id=event.id,
timestamp=float(event.timestamp),
tags=','.join((event.tags or [])),
source=event.source,
content=event.content)
sess = self._session()
try:
sess.add(event_record)
sess.commit()
finally:
sess.close()
[docs] def delete(self, event_id):
"""Deletes the event with the given event id.
:param event_id(str): the event id
"""
sess = self._session()
try:
event_record = sess.query(EventRecord).get(event_id)
if not event_record:
raise EventNotFound()
sess.delete(event_record)
sess.commit()
except Exception as e:
raise EventWriteException(str(e)) from e
finally:
sess.close()
[docs] def get(self, event_id):
"""Looks up an event by its id.
:param event_id(str): the event id.
:returns: a ``theia.model.Event`` if the event was found or ```theia.storeapi.EventNotFound``` if
no such event can be found.
"""
sess = self._session()
try:
event_record = sess.query(EventRecord).get(event_id)
if not event_record:
raise EventNotFound()
return Event(id=event_record.id,
timestamp=str(event_record.timestamp),
tags=(event_record.tags or '').split(','),
source=event_record.source,
content=event_record.content)
except EventNotFound as ne:
raise ne
except Exception as e:
raise EventReadException(str(e)) from e
finally:
sess.close()
[docs] def search(self, ts_start, ts_end=None, flags=None, match=None, order='asc'):
"""Performs a search through the stored events.
:param ts_start(float): *required*, match all events that occured at or later than this time.
:param ts_end(float): *optional*, match all events that occured before this time.
:param flags(list): *optional*, list of string values (regular expressions) against which to match
the event tags. Event matches only if *all* flags are matched against the event's tags.
:param match(str): *optional*, match the content of an event. This is a regular expession as well.
:param order(str): either ``asc`` or ``desc`` - sort the results ascending or descending based on the
event timestamp.
:returns: an iterator over all matched results.
"""
if not ts_start:
raise EventStoreException('start timestamp is required when searching events')
flag_matchers = None
content_matcher = None
if flags:
try:
flag_matchers = [re.compile(f) for f in flags]
except Exception:
raise EventStoreException('invalid flags match regex.')
if match:
try:
content_matcher = re.compile(match)
except Exception:
raise EventStoreException('invalid content match regex.')
def get_bulk(blk):
"""Fetch single page bulk results.
"""
sess = self._session()
qry = sess.query(EventRecord).filter(EventRecord.timestamp >= ts_start)
if ts_end:
qry.filter(EventRecord.timestamp <= ts_end)
if order == 'asc':
qry.order_by(EventRecord.timestamp)
else:
qry.order_by(desc(EventRecord.timestamp))
results = []
count = 0
for event_record in qry.limit(self.bulk_size).offset(blk*self.bulk_size).all():
count += 1
flags = (event_record.tags or '').split(',')
if flag_matchers:
if not match_all(flag_matchers, flags):
continue
if content_matcher:
if not content_matcher.fullmatch(event_record.content or ''):
continue
event = Event(id=event_record.id,
timestamp=event_record.timestamp,
tags=flags,
source=event_record.source or '',
content=event_record.content)
results.append(event)
return results, blk, count > 0
page = 0
has_more = True
while has_more:
results, page, has_more = get_bulk(page)
for result in results:
yield result
page += 1
[docs] def close(self):
"""Closes the store.
Does nothing in this implementation.
"""
pass
[docs]def match_any(matcher, values):
"""Check if the matcher matches *any* of the supplied values.
:param matcher(regex pattern): compiled regular expression.
:param values(list): list of ``str`` values to match.
:returns: ``True`` if *any* of the ``str`` values matches (fullmatch) the matcher;
otherwise ``False``.
"""
for val in values:
if matcher.fullmatch(val):
return True
return False
[docs]def match_all(matchers, values):
"""Check if *all* matchers match any of the given values.
Each matcher *must* match at least one value of the list of values.
:param matchers(list): list of compiled regular expressions.
:param values(list): list of ``str`` to match
:returns: ``True`` only if *all* of the matchers have matched at least one value of the provided
list of values.
"""
for matcher in matchers:
if not match_any(matcher, values):
return False
return True
[docs]def create_store(db_url, verbose=False):
"""Creates new RDBSEventStore.
:param db_url(str): The database URL in SQLAlchemy form.
:param verbose(bool): ``True`` to show extended messages from the store.
:returns: RDBSEventStore object.
"""
engine = create_engine(db_url, echo=verbose)
Base.metadata.create_all(engine)
session_factory = sessionmaker(bind=engine)
return RDBSEventStore(session_factory)