"""
-----------------
theia.cli.watcher
-----------------
Theia file watcher command line interface.
"""
[docs]def get_parser(subparsers):
"""Configures the subparser for the ``watcher`` command.
:param argparse.ArgumentParser subparser: subparser for commands.
:returns: :class:`argparse.ArgumentParser` configured for the ``watcher`` command.
"""
parser = subparsers.add_parser('watch', help='Watcher daemon')
parser.add_argument('-c', '--collector-server', help='Collector server name (or IP)')
parser.add_argument('-p', '--collector-port', default=6433, type=int, help='Collector port')
parser.add_argument('--secure', action='store_true', help='Secure connection to collector')
parser.add_argument('-f', '--files', nargs='*', dest='files',
help='List of files to watch for changes')
parser.add_argument('-d', '--directory', nargs='*', dest='dirs',
help='List of directories to whose files are watched for changes')
parser.add_argument('-t', '--tags', nargs='*', dest='tags', help='List of tags')
return parser
[docs]def run_watcher(args):
"""Runs the watcher.
:param argparse.Namespace args: the parsed arguments passed to the CLI.
"""
import socket
import asyncio
import signal
from theia.comm import Client
from watchdog.observers import Observer
from theia.watcher import SourcesDaemon
hostname = socket.gethostname()
loop = asyncio.get_event_loop()
client = Client(loop=loop, host=args.collector_server,
port=args.collector_port, path='/event', secure=args.secure)
client.connect()
daemon = SourcesDaemon(observer=Observer(), client=client, tags=[hostname])
for file_path in args.files:
daemon.add_source(fpath=file_path, tags=args.tags)
loop.add_signal_handler(signal.SIGHUP, loop.stop)
loop.add_signal_handler(signal.SIGINT, loop.stop)
loop.add_signal_handler(signal.SIGTERM, loop.stop)
loop.run_forever()
loop.close()