Source code for theia.watcher

"""
-------------
theia.watcher
-------------

File watcher.

Watches files and directories for changes and emits the changes as events.
"""
from os.path import getsize, isfile, isdir, split as splitpath, realpath
from time import time
from uuid import uuid4

from watchdog.events import FileSystemEventHandler

from theia.model import Event


class FileSource:
    """Represents a source of events.

    The underlying file that is being watched does not have to exist at the moment of
    creation of this :class:`FileSource`. If it does exist, watching starts at the
    current end of the file, so only content written afterwards is reported.

    :param str file_path: the path to the file to be watched.
    :param function callback: the callback handler to be executed when the file is
        changed. The callback is called with the difference, the path to the file and
        the list of tags for this source. The method signature looks like this:

        .. code-block:: python

            def callback(diff, path, tags):
                pass

        where:

        * ``diff``, ``str`` is the difference from the last state of the file. Usually
          this is the content of the emitted event.
        * ``path``, ``str`` is the path to the file that has changed. Usually this is
          the ``source`` property of the event.
        * ``tags``, ``list`` is the list of tags associated with this event source.

    :param str enc: the file encoding. If not specified, ``UTF-8`` is assumed.
    :param list tags: list of tags associated with this source.

    :raises Exception: if ``file_path`` points to a directory.
    """

    def __init__(self, file_path, callback, enc='UTF-8', tags=None):
        self.path = file_path
        # Byte offset up to which the file content has already been reported.
        self.position = 0
        self.callback = callback
        self.enc = enc
        self.tags = tags or []
        self._setup()

    def modified(self):
        """Triggers an execution of the callback when the file has been modified.

        Loads the difference from the source file, decodes it using the configured
        encoding and calls the registered callback with the decoded text, the file
        path and the tags of this source.
        """
        diff = self._get_diff()
        diff = diff.decode(self.enc)
        self.callback(diff, self.path, self.tags)

    def moved(self, dest_path):
        """Called when the source file has been moved to another location.

        The read position is kept, only the path is updated.

        :param str dest_path: the target location of the file after the move.
        """
        self.path = dest_path

    def created(self):
        """Called when the file has actually been created.

        Resets the read position to the beginning of the file. Does not trigger the
        callback.
        """
        self.position = 0

    def removed(self):
        """Called when the file has been removed.

        Does not trigger the callback.
        """
        pass

    def _get_diff(self):
        """Read and return the bytes appended since the last read.

        If the file is now shorter than the remembered position (it has been
        truncated or rewritten in place), reading restarts from the beginning so the
        new content is not skipped.

        :returns: the raw (undecoded) bytes from the current position to the end of
            the file.
        """
        with open(self.path, 'rb') as source_file:
            # whence=2 seeks relative to the end; the return value is the file length.
            end = source_file.seek(0, 2)
            if end < self.position:
                # The file shrank below our offset: start over from the beginning.
                self.position = 0
            source_file.seek(self.position)
            diff = source_file.read()
            self.position = source_file.tell()
        return diff

    def _setup(self):
        """Initialise the read position for the watched path.

        An existing file is watched from its current end; a path that does not exist
        yet is watched from position 0; a directory is rejected.
        """
        if isfile(self.path):
            self.position = getsize(self.path)
        elif isdir(self.path):
            raise Exception('Not a file: %s' % self.path)
class DirectoryEventHandler(FileSystemEventHandler):
    """Implements :class:`watchdog.events.FileSystemEventHandler` and is used with the
    underlying :class:`watchdog.observers.Observer`.

    Reacts on events triggered by the watchdog Observer and passes them down to the
    registered handlers.

    The handlers are registered when creating the instance as a constructor argument.
    They must be specified as ``dict`` whose keys (``str``) are the names of the events
    and the entries are the event handlers themselves.

    An example of creating new :class:`DirectoryEventHandler`:

    .. code-block:: python

        def on_file_moved(src_path, dest_path):
            print("File has moved", src_path, "->", dest_path)

        event_handler = DirectoryEventHandler(handlers={
            "moved": on_file_moved
        })

    The following events are supported:

    * ``moved`` - handles the move of a file to another location. The handler takes
      two arguments: the source path and the destination path.

      .. code-block:: python

          def moved_handler(src_path, dest_path):
              pass

    * ``created`` - handles file creation. The handler takes one argument: the path of
      the created file.

      .. code-block:: python

          def created_handler(file_path):
              pass

    * ``modified`` - handles file modification. The handler takes one argument: the
      path of the modified file.

      .. code-block:: python

          def modified_handler(file_path):
              pass

    * ``deleted`` - handles file deletion. The handler takes one argument: the path of
      the deleted file.

      .. code-block:: python

          def deleted_handler(file_path):
              pass

    Events without a registered handler are silently ignored.

    :param dict handlers: a ``dict`` of handlers for specific events.
    """

    def __init__(self, handlers):
        self.handlers = handlers

    def _notify(self, event, *args):
        """Call the handler registered under the name ``event`` with ``args``, if any."""
        callback = self.handlers.get(event)
        if not callback:
            # Nothing registered for this kind of event.
            return
        callback(*args)

    def on_moved(self, event):
        """Forward a watchdog move event as ``moved(src_path, dest_path)``."""
        self._notify('moved', event.src_path, event.dest_path)

    def on_created(self, event):
        """Forward a watchdog creation event as ``created(src_path)``."""
        self._notify('created', event.src_path)

    def on_deleted(self, event):
        """Forward a watchdog deletion event as ``deleted(src_path)``."""
        self._notify('deleted', event.src_path)

    def on_modified(self, event):
        """Forward a watchdog modification event as ``modified(src_path)``."""
        self._notify('modified', event.src_path)
class SourcesDaemon:
    """Daemon that watches multiple sources for events.

    Uses :mod:`watchdog` to monitor files and directories for changes. This defaults to
    using ``inotify`` kernel subsystem on Linux systems, ``kqueue`` on MacOSX and
    BSD-like systems and ``ReadDirectoryChangesW`` on Windows.

    Files are watched through their parent directory: one non-recursive watch is
    scheduled per directory and shared by all the sources located in it. The observer
    is started as soon as the daemon is created.

    :param watchdog.observers.Observer observer: an instance of the
        :class:`watchdog.observers.Observer` to be used.
    :param theia.comm.Client client: a client to a theia collector server.
    :param list tags: initial list of default tags that are added to every file source
        watched by this daemon. The default tags come before the source's own tags.
    """

    def __init__(self, observer, client, tags=None):
        # {parent directory: {file name: FileSource}}
        self.sources = {}
        # {parent directory: watch object returned by observer.schedule}; kept so the
        # watch can be unscheduled when the last source of a directory is removed.
        self._watches = {}
        self.observer = observer
        self.client = client
        self.tags = tags or []
        self.dir_handler = DirectoryEventHandler(handlers={
            'moved': self._moved,
            'created': self._created,
            'deleted': self._deleted,
            'modified': self._modified
        })
        self.observer.start()

    def add_source(self, fpath, enc='UTF-8', tags=None):
        """Add source of events to be watched by this daemon.

        The path will be added as a file source and a list of tags will be associated
        with it. The default list of tags will be added to provided tags. Adding a path
        that is already being watched leaves the existing source untouched.

        :param str fpath: the path of the file to be watched.
        :param str enc: the file encoding. By default ``UTF-8`` is assumed.
        :param list tags: list of tags to be added to the events generated by this file
            source.
        """
        def callback(diff, _, evtags):
            """Handle an event from a :class:`FileSource` and emit an event to the
            collector server.
            """
            timestamp = time()
            # The event source is always the path the source was registered with.
            event = Event(id=str(uuid4()), source=fpath, timestamp=timestamp,
                          tags=evtags, content=diff)
            self.client.send_event(event)

        # Created first so that an invalid path is rejected before anything is
        # registered with the observer.
        fsrc = FileSource(fpath, callback, enc, self._merge_tags(tags))
        pdir, fname = self._split_path(fpath)
        files = self.sources.get(pdir)
        if not files:
            files = self.sources[pdir] = {}
        self._watch_dir(pdir)
        if files.get(fname):
            return
        files[fname] = fsrc

    def _watch_dir(self, pdir):
        """Schedule a non-recursive watch on ``pdir`` unless one is already tracked."""
        if pdir not in self._watches:
            self._watches[pdir] = self.observer.schedule(self.dir_handler, pdir,
                                                         recursive=False)

    def _merge_tags(self, tags):
        """Return the daemon's default tags followed by ``tags``.

        Returns ``None`` only when both the default tags and ``tags`` are ``None``.
        """
        if self.tags is None and tags is None:
            return None
        return (self.tags or []) + (tags or [])

    def remove_source(self, fpath):
        """Remove this path from the list of file event sources.

        When the removed file was the last one watched in its directory, the directory
        watch is unscheduled from the observer as well.

        :param str fpath: the path of the file to be removed from the watching list.
        """
        pdir, fname = self._split_path(fpath)
        files = self.sources.get(pdir)
        if files and files.get(fname):
            del files[fname]
        if files is not None and not files:
            # No sources left in this directory: forget it and release its watch.
            del self.sources[pdir]
            watch = self._watches.pop(pdir, None)
            if watch is not None:
                self.observer.unschedule(watch)

    @staticmethod
    def _split_path(fpath):
        """Resolve ``fpath`` to its real path and split it into (directory, name)."""
        fpath = realpath(fpath)
        return splitpath(fpath)

    def _get_file_source(self, src_path):
        """Return the :class:`FileSource` registered for ``src_path`` or ``None``."""
        pdir, fname = self._split_path(src_path)
        files = self.sources.get(pdir)
        if files:
            return files.get(fname)
        return None

    def _moved(self, src_path, dest_path):
        """Re-register a watched file under its new location after a move."""
        pdir, fname = self._split_path(src_path)
        files = self.sources.get(pdir)
        if files and files.get(fname):
            fsrc = files[fname]
            del files[fname]
            ndir, nfname = self._split_path(dest_path)
            # NOTE(review): no watch is scheduled here for a destination directory
            # that is not watched yet - confirm whether moved files should keep being
            # followed in that case.
            if not self.sources.get(ndir):
                self.sources[ndir] = {}
            self.sources[ndir][nfname] = fsrc
            fsrc.moved(dest_path)

    def _created(self, src_path):
        """Notify the matching source, if any, that its file has been created."""
        fsrc = self._get_file_source(src_path)
        if fsrc:
            fsrc.created()

    def _deleted(self, src_path):
        """Notify the matching source, if any, that its file has been removed."""
        fsrc = self._get_file_source(src_path)
        if fsrc:
            fsrc.removed()

    def _modified(self, src_path):
        """Notify the matching source, if any, that its file has been modified."""
        fsrc = self._get_file_source(src_path)
        if fsrc:
            fsrc.modified()