"""
---------------
theia.cli.query
---------------
Theia query command line interface.
"""
import asyncio
import signal
from io import BytesIO
from datetime import datetime
from theia.query import Query
from theia.model import EventParser
[docs]def get_parser(subparsers):
"""Configures the subparser for the ``query`` command.
:param argparse.ArgumentParser subparser: subparser for commands.
:returns: :class:`argparse.ArgumentParser` configured for the ``query`` command.
"""
parser = subparsers.add_parser('query', help='Query for events')
parser.add_argument('-l', '--live', action='store_true', dest='live',
help='Filter events in real time (live).')
parser.add_argument('-F', '--format-output', dest='o_format',
default='{timestamp:15} [{source:10}] {tags:15}: {content}',
metavar='FORMAT_STRING', help='Event output format string.' +
'Available properties are: id, tags, source, timestamp and content.')
parser.add_argument('-T', '--format-timestamp', dest='o_ts_format',
default='%Y-%m-%d %H:%M:%S.%f%Z', metavar='DATE_FORMAT_STRING',
help='Timestamp strftime compatible format string')
parser.add_argument('--close-timeout', dest='close_timeout', default=10, type=int,
help='Time (in milliseconds) given to the connecting websocket to ' +
'actually close the connection after receiving close frame form the server. ' +
'The default is 10ms.')
# Add the filter arguments
_setup_filter_arguments(parser)
return parser
def _setup_filter_arguments(parser):
"""Sets up the filters for the theia query.
:param argparse.ArgumentParser parser: the ``query`` command parser.
"""
parser.add_argument('--id', dest='f_id', metavar='PATTERN',
default=None, help='Filter by event id')
parser.add_argument('-s', '--source', dest='f_source', metavar='PATTERN',
default=None, help='Filter by event source')
parser.add_argument('-a', '--after', dest='f_after', metavar='TIMESTAMP',
default=0, type=int, help='Match events after this timestamp')
parser.add_argument('-b', '--before', dest='f_before', metavar='TIMESTAMP',
default=0, type=int, help='Match events before this timestamp')
parser.add_argument('-c', '--content', dest='f_content', metavar='PATTERN',
default=None, help='Match event content')
parser.add_argument('-t', '--tags', nargs='*', dest='f_tags',
metavar='PATTERN', help='Match any of the tags')
parser.add_argument('-o', '--order', dest='f_order', default='asc', metavar='ORDERING',
help='Order of results (asc or desc). Valid only for "find".')
_CRITERIA_MAP = {
'source': 'f_source',
'id': 'f_id',
'start': 'f_after',
'end': 'f_before',
'content': 'f_content',
'tags': 'f_tags'
}
def _to_criteria(args):
"""Builds a criteria ``dict`` based on the parser arguments.
"""
criteria = {}
for cr_field, arg_field in _CRITERIA_MAP.items():
if hasattr(args, arg_field) and getattr(args, arg_field):
criteria[cr_field] = getattr(args, arg_field)
if not args.live:
criteria['order'] = args.f_order
return criteria
[docs]def event_printer(event_format, time_format, parser):
"""Builds an event printer callback with the given event format and alternative
date-time format.
:param str event_format: the event format string.
:param str time_format: alternative format for the event timestamp.
:param theia.model.EventParser: event parser
:returns: event printer that takes an event data and formats is based on the
above formats.
"""
def print_event(evdata):
"""Formats and prints the event data.
:param evdata: the received event data.
"""
if isinstance(evdata, str):
print(evdata)
return
try:
event = parser.parse_event(BytesIO(evdata))
print(format_event(event, event_format, time_format))
except Exception as e:
print('> Failed to show event', e)
try:
print('Value received: ', evdata.decode('UTF-8'))
except:
print('Raw data:', evdata)
return print_event
[docs]def run_query(args):
"""Configures and runs a :class:`theia.query.Query`.
:param argparse.Namespace args: parsed command-line arguments.
"""
loop = asyncio.get_event_loop()
query = Query(host=args.server_host, port=args.port, loop=loop)
criteria_filter = _to_criteria(args)
result = None
parser = EventParser('UTF-8')
if args.live:
result = query.live(criteria_filter, event_printer(args.o_format,
args.o_ts_format,
parser))
else:
result = query.find(criteria_filter, event_printer(args.o_format,
args.o_ts_format,
parser))
def clean_stop(*arg):
"""Perform a clean stop - wait a bit for the tasks on the loop to finish.
"""
# delay the stop a bit, give it chance to actually close the loop
loop.call_later(args.close_timeout/1000, loop.stop)
result.when_closed(clean_stop)
loop.add_signal_handler(signal.SIGHUP, loop.stop)
loop.add_signal_handler(signal.SIGINT, loop.stop)
loop.add_signal_handler(signal.SIGTERM, loop.stop)
loop.run_forever()
loop.close()