Connector Websocket API
Theia collector is the main process that aggregates events from multiple sources. It is a WebSocket server that exposes endpoints for pushing and retrieving events. When retrieving events, the client must always supply a filter.
All exposed APIs are designed as event streams, i.e. you can push events at any time over the same WebSocket without specifying in advance how many events will be pushed. Similarly, when retrieving events you receive the filter results without being told the total number of matching events. The client receives event data messages asynchronously; the only guarantee is that each event message is consistent (you won't receive half a message: either the full event passes through or none of it does).
The collector also exposes a real-time event stream that pushes events matching given filter criteria back to the client.
Event model
Each event is a textual message consisting of the following properties:
- id: the event ID. This value is unique across the whole system.
- timestamp: the time and date when the event was created. This is a UNIX-like timestamp, but may contain nanosecond information.
- source: the source of the event. It may be the path of the log file being watched for changes, the name of the sensor generating the event, etc.
- tags: comma-separated list of values acting as tags. Example: web,httpd,access-log,server1,datacenter3
- content: the actual content of the event, as plain text.
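A client pushing events has to fill in these properties itself. The following JavaScript sketch builds an event object. `createEvent` is a hypothetical helper, and two details are assumptions rather than documented rules: ids are generated as random UUIDs (the example ids in this document look like version 4 UUIDs), and timestamps are UNIX seconds padded to seven fractional digits, as in the examples. It uses `randomUUID` from the Node.js `crypto` module.

```javascript
const { randomUUID } = require('crypto');

// Hypothetical helper: build an event object with the five model properties.
// Assumption: ids are random UUIDs and timestamps are UNIX seconds with a
// fractional part (the examples show seven fractional digits).
function createEvent(source, tags, content, nowMs = Date.now()) {
  const seconds = Math.floor(nowMs / 1000);
  // Date.now() has millisecond resolution, so the last four digits are zero.
  const fraction = String(nowMs % 1000).padStart(3, '0') + '0000';
  return {
    id: randomUUID(),
    timestamp: `${seconds}.${fraction}`,
    source: source,
    tags: tags.slice(), // copy, so later changes to the caller's array do not leak in
    content: content,
  };
}

const ev = createEvent('/dev/sensors/temp0', ['sensor'], '32', 1531528042903);
console.log(ev.timestamp); // 1531528042.9030000
```

Integer arithmetic on milliseconds is used instead of `toFixed` on a floating-point value, because at this magnitude a double cannot represent seven decimal places exactly.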
Event Format
The events are plain text strings. Here is an example of an event:
id:331c531d-6eb4-4fb5-84d3-ea6937b01fdd
timestamp: 1509989630.6749051
source:/dev/sensors/door1-sensor
tags:sensors,home,doors,door1
Door has been unlocked.
The basic layout of an event looks like this:
id:<id-string>
timestamp:<unix-timestamp>.<nanos>
source:<event-source>
tags:<comma-separated-tags>
<content>
The events are expected to be UTF-8 encoded.
Each event has two main parts: the header and the event content. The header has at least four lines: id, timestamp, source and tags. These are always present, and each line starts with the header name, followed by a colon (:), followed by the header value (no empty spaces). If the header value is empty, only the header name and the colon are present:
id:331c531d-6eb4-4fb5-84d3-ea6937b01fdd
timestamp: 1509989630.6749051
source:/dev/sensors/door1-sensor
tags:
Door has been unlocked.
Custom header fields may be added later on, but the current implementation supports and understands only the four headers above. Custom fields are not processed in any way, but they are stored and returned to the clients. The order of the header lines is currently not guaranteed to be preserved. Each header MUST be on a single line. If the client sends a multiline value for a header, the collector accepts the value up to the end of the first line and ignores the rest.
The content starts after the last header and is separated by a single newline character.
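The header rules above can be illustrated with a small serializer. This is a sketch, not the collector's code: `formatEvent` is a hypothetical name, custom headers are passed as a separate object, and multiline header values are cut at the first line end, mirroring what the collector is described as doing with such values.

```javascript
// Hypothetical helper: serialize an event following the header rules above.
function formatEvent(event, customHeaders = {}) {
  // Header values must be single-line: keep only the first line, which is
  // also the part of a multiline value the collector accepts.
  const firstLine = (value) => String(value ?? '').split(/\r?\n/)[0];

  const lines = [
    'id:' + firstLine(event.id),
    'timestamp:' + firstLine(event.timestamp),
    'source:' + firstLine(event.source),
    // An empty tag list still yields the "tags:" line.
    'tags:' + firstLine((event.tags || []).join(',')),
  ];
  for (const [name, value] of Object.entries(customHeaders)) {
    lines.push(name + ':' + firstLine(value));
  }
  // The content starts after the last header, separated by a single newline.
  return lines.join('\n') + '\n' + (event.content ?? '');
}

console.log(formatEvent({
  id: '331c531d-6eb4-4fb5-84d3-ea6937b01fdd',
  timestamp: '1509989630.6749051',
  source: '/dev/sensors/door1-sensor',
  tags: [],
  content: 'Door has been unlocked.',
}));
// id:331c531d-6eb4-4fb5-84d3-ea6937b01fdd
// timestamp:1509989630.6749051
// source:/dev/sensors/door1-sensor
// tags:
// Door has been unlocked.
```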
Collector WebSocket API Endpoints
/event - Push (Add) Event
Opens a channel for pushing events.
- Path: /event
- Params: None
- Message Payload: Serialized Event string
- Response: None
Example of sending events to the collector (using wscat):
wscat -c ws://localhost:6433/event
connected (press CTRL+C to quit)
> event: 110 108 2\nid:d55507cc-3530-47c1-913d-d07db6cfebea\ntimestamp: 1531528042.9037790\nsource:/dev/sensors/temp0\ntags:sensor\n32\n
>
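The wscat example above sends the event prefixed with a line `event: 110 108 2`, which the event format section does not describe. For that event the numbers add up: the four header lines, each including its trailing newline, are 108 characters long, the content `32` is 2 characters, and 110 is their sum. The sketch below therefore assumes that a pushed message is `event: <total> <header-size> <content-size>`, a newline, the header lines, the content and a final newline, with sizes counted in UTF-8 bytes (the ASCII-only examples cannot tell bytes from characters). `frameEvent` and `pushEvents` are hypothetical names; the URL and port come from the example, and `WebSocket` is the standard browser API, also available as a global in Node.js 22 and later.

```javascript
// Assumed message layout (inferred from the wscat example, not documented):
//   event: <total> <header-size> <content-size>\n<header lines><content>\n
function frameEvent(headerLines, content) {
  // Every header line, including the last one, ends with a newline.
  const header = headerLines.map((line) => line + '\n').join('');
  const size = (s) => new TextEncoder().encode(s).length; // UTF-8 bytes (assumption)
  const h = size(header);
  const c = size(content);
  return `event: ${h + c} ${h} ${c}\n${header}${content}\n`;
}

// Open /event, send each framed event as its own message, then close.
function pushEvents(url, frames) {
  const ws = new WebSocket(url);
  ws.addEventListener('open', () => {
    for (const frame of frames) ws.send(frame);
    ws.close(); // frames passed to send() are queued ahead of the close frame
  });
  ws.addEventListener('error', () => console.error('could not push events to', url));
  return ws;
}

// Usage (requires a running collector):
// pushEvents('ws://localhost:6433/event', [
//   frameEvent(['id:d55507cc-3530-47c1-913d-d07db6cfebea',
//               'timestamp: 1531528042.9037790',
//               'source:/dev/sensors/temp0',
//               'tags:sensor'], '32'),
// ]);
```

Sending one event per WebSocket message fits the guarantee that each event message arrives whole or not at all.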
/find - Find events matching criteria
Endpoint params
- Path: /find
- Params: None
- Message Payload: the first message body must be the Criteria JSON.
- Response: Event stream
Opens a channel to find events that match given criteria. The client should post only one message, containing the filter criteria by which to match the events; the matching events are then pushed from the collector to the client over the same WebSocket. This endpoint looks up only persisted events and does not return events in real time: only events received before this channel was opened are returned. Once all matching events have been sent back to the client, the collector closes the WebSocket connection.
- The criteria message must be a JSON string with the following format:
{ "start": int, "end": int, "tags": ["string regex",...], "content": "string regex", "order": "asc|desc" }
Where:
- start (int, optional): match events after this timestamp (UNIX).
- end (int, optional): match events before this timestamp (UNIX).
- tags (array of string, optional): match events matching any of the supplied tags. The values are processed as regular expressions.
- content (string regular expression, optional): match events whose content matches the supplied regular expression.
- order (string, one of asc or desc, optional): sort order for the result. Events are sorted by timestamp. By default they are returned in ascending order (asc), which means earlier events are returned first.
Example: match all events after a timestamp that have a tag log on any web-server and contain [ERROR]:
{ "start": 1527283299, "tags": ["log", "web-server-.+"], "content": ".*\\[ERROR\\].*" }
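Regular expressions inside JSON are easy to get wrong: a backslash in the pattern must be written as \\ in the JSON text, since \[ is not a valid JSON escape. One way to avoid hand-escaping is to build the criteria in code and serialize it with `JSON.stringify`. `findCriteria` is a hypothetical helper; it only checks the field types listed above and leaves regular-expression validation to the collector.

```javascript
// Hypothetical helper: build the /find criteria message, omitting unset fields.
function findCriteria({ start, end, tags, content, order } = {}) {
  const criteria = {};
  if (start !== undefined) {
    if (!Number.isInteger(start)) throw new TypeError('start must be an int (UNIX timestamp)');
    criteria.start = start;
  }
  if (end !== undefined) {
    if (!Number.isInteger(end)) throw new TypeError('end must be an int (UNIX timestamp)');
    criteria.end = end;
  }
  if (tags !== undefined) {
    if (!Array.isArray(tags) || !tags.every((t) => typeof t === 'string')) {
      throw new TypeError('tags must be an array of strings');
    }
    criteria.tags = tags;
  }
  if (content !== undefined) criteria.content = String(content);
  if (order !== undefined) {
    if (order !== 'asc' && order !== 'desc') throw new RangeError("order must be 'asc' or 'desc'");
    criteria.order = order;
  }
  // JSON.stringify escapes every backslash in the patterns.
  return JSON.stringify(criteria);
}

// In a JavaScript string literal each regex backslash is also written twice.
const msg = findCriteria({
  start: 1527283299,
  tags: ['log', 'web-server-.+'],
  content: '.*\\[ERROR\\].*',
});
console.log(msg);
// {"start":1527283299,"tags":["log","web-server-.+"],"content":".*\\[ERROR\\].*"}
```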
Example (assuming the collector runs on localhost):
wscat -c ws://localhost:6433/find
connected (press CTRL+C to quit)
> {"start": 1531528038}
< ok
< event: 110 108 2
id:2ca00a2a-d70f-4617-b48f-a31716b1d5dc
timestamp: 1531528038.8951149
source:/dev/sensors/temp0
tags:sensor
32
Notice that the first result is the string ok, which means that the query was successfully processed by the server.
The next messages are the matched events. Every serialized event message ends with a newline.
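A /find client has to skip the initial ok and then parse each event message. The sketch below assumes that every event message starts with the `event: <total> <header-size> <content-size>` line seen in the example (there, the four header lines including their newlines are 108 characters, the content `32` is 2, and 110 is the sum) and uses the sizes to split the header from the content, so a colon inside the content cannot be mistaken for a header. The sizes are treated as UTF-8 byte counts, which the ASCII-only examples cannot confirm. `parseEventMessage` and `findEvents` are hypothetical names, and `WebSocket` is the standard browser API, also a global in Node.js 22 and later.

```javascript
// Parse one event message of the assumed form
//   event: <total> <header-size> <content-size>\n<header><content>\n
function parseEventMessage(message) {
  const nl = message.indexOf('\n');
  const m = nl < 0 ? null : /^event: (\d+) (\d+) (\d+)$/.exec(message.slice(0, nl));
  if (!m) throw new Error('not an event message: ' + message.slice(0, 40));
  const headerSize = Number(m[2]);
  const contentSize = Number(m[3]);
  // Sizes are treated as UTF-8 byte counts (assumption).
  const body = new TextEncoder().encode(message.slice(nl + 1));
  const decoder = new TextDecoder();
  const header = decoder.decode(body.subarray(0, headerSize));
  const event = {};
  for (const line of header.split('\n')) {
    const idx = line.indexOf(':');
    if (idx < 0) continue; // e.g. the empty string after the last newline
    const name = line.slice(0, idx);
    const value = line.slice(idx + 1).trim(); // drops the space in "timestamp: ..."
    event[name] = name === 'tags' ? value.split(',').filter((t) => t) : value;
  }
  event.content = decoder.decode(body.subarray(headerSize, headerSize + contentSize));
  return event;
}

// Send the criteria, skip the initial "ok", and collect events until the
// collector closes the connection.
function findEvents(url, criteria) {
  return new Promise((resolve, reject) => {
    const ws = new WebSocket(url);
    const events = [];
    let acknowledged = false;
    ws.addEventListener('open', () => ws.send(JSON.stringify(criteria)));
    ws.addEventListener('message', (msg) => {
      const data = String(msg.data);
      try {
        if (!acknowledged) {
          acknowledged = true;
          if (data.trim() !== 'ok') throw new Error('query not accepted: ' + data);
        } else {
          events.push(parseEventMessage(data));
        }
      } catch (err) {
        reject(err);
        ws.close();
      }
    });
    ws.addEventListener('error', () => reject(new Error('WebSocket error')));
    ws.addEventListener('close', () => resolve(events));
  });
}

// Usage (requires a running collector):
// findEvents('ws://localhost:6433/find', { start: 1531528038 })
//   .then((events) => console.log(events));
```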
/live - Real-time event stream
Endpoint params
- Path: /live
- Params: None
- Message Payload: the first message body must be the Criteria JSON.
- Response: Event stream
Opens a channel to monitor for events matching given criteria. The client opens a channel to the collector and receives the incoming events that match its criteria. This endpoint does not look up events in the persistent storage; it matches only events that arrive at the collector after the channel was opened.
The collector does not close this channel. If the connection times out due to inactivity, the client must open a new WebSocket connection.
The first message sent to the collector after establishing the channel must be the filter criteria object serialized as JSON string.
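Because the collector never closes a /live channel itself but the connection may time out, a long-running client will usually reconnect and resend its criteria. A minimal sketch, assuming exponential backoff between attempts (a choice of this example, not something the collector requires); `subscribeLive` and `reconnectDelay` are hypothetical names, raw event messages are handed to the callback unparsed, and `WebSocket` is the standard browser API, also a global in Node.js 22 and later.

```javascript
// Delay before reconnect attempt number `attempt` (0-based):
// 1 s, 2 s, 4 s, ... capped at 30 s. The numbers are arbitrary choices.
function reconnectDelay(attempt, baseMs = 1000, maxMs = 30000) {
  return Math.min(maxMs, baseMs * 2 ** attempt);
}

// Hypothetical helper: keep a /live subscription open, reconnecting (and
// resending the criteria) whenever the connection closes.
function subscribeLive(url, criteria, onMessage) {
  let attempt = 0;
  let stopped = false;
  let ws = null;

  const connect = () => {
    if (stopped) return; // a pending reconnect fired after stop()
    let acknowledged = false;
    ws = new WebSocket(url);
    // The first message on every new channel must be the criteria JSON.
    ws.addEventListener('open', () => ws.send(JSON.stringify(criteria)));
    ws.addEventListener('message', (msg) => {
      const data = String(msg.data);
      if (!acknowledged) {
        acknowledged = true;
        if (data.trim() === 'ok') attempt = 0; // channel works: reset the backoff
        else console.warn('unexpected first reply:', data);
        return;
      }
      onMessage(data); // raw event message, left unparsed here
    });
    // An "error" event is followed by "close", so reconnecting here covers both.
    ws.addEventListener('close', () => {
      if (!stopped) setTimeout(connect, reconnectDelay(attempt++));
    });
  };

  connect();
  // Call the returned function to stop the subscription.
  return () => {
    stopped = true;
    if (ws) ws.close();
  };
}

// Usage (requires a running collector):
// const stop = subscribeLive('ws://localhost:6433/live', { tags: ['sensor'] }, console.log);
```

Events that arrive while the client is disconnected are not replayed by /live, since it only matches events received after the channel was opened; a client that needs them could query /find for the gap.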
- The criteria object has the following format:
{ "id": "string regex", "start": int, "end": int, "tags": ["string regex",...], "source": "string regex", "content": "string regex" }
Where:
- id (string regular expression, optional): match any event whose id matches the provided regular expression.
- start (int, optional): match events after this timestamp (UNIX).
- end (int, optional): match events before this timestamp (UNIX).
- tags (array of string, optional): match events matching any of the supplied tags. The values are processed as regular expressions.
- source (string regular expression, optional): match any event whose source matches the provided regular expression.
- content (string regular expression, optional): match events whose content matches the supplied regular expression.
Example: match all events after a timestamp that have a tag log on any web-server, contain [ERROR] and come from the /var/log files (source):
{ "start": 1527283299, "tags": ["log", "web-server-.+"], "content": ".*\\[ERROR\\].*", "source": "/var/log/.+" }
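To make the criteria fields concrete, here is a client-side sketch that checks an already parsed event against a /live criteria object. It illustrates the described semantics and is not the collector's implementation; it rests on assumptions the text does not settle: all supplied fields must match, tags match when any event tag matches any of the patterns, each regular expression must match the whole value (suggested by the leading and trailing .* in the examples), start and end are exclusive bounds, and JavaScript regex syntax stands in for whatever dialect the collector uses. `matchesCriteria` and `fullMatch` are hypothetical names.

```javascript
// Compile a pattern so that it must match the whole value (assumption);
// the "s" flag lets ".*" span multi-line content.
function fullMatch(pattern) {
  return new RegExp('^(?:' + pattern + ')$', 's');
}

// Hypothetical client-side check of a parsed event against /live criteria.
// Assumptions: supplied fields combine with AND, start/end are exclusive,
// and tags match when any event tag matches any of the patterns.
function matchesCriteria(event, criteria) {
  const ts = parseFloat(event.timestamp); // also accepts a leading space
  if (criteria.start !== undefined && !(ts > criteria.start)) return false;
  if (criteria.end !== undefined && !(ts < criteria.end)) return false;
  for (const field of ['id', 'source', 'content']) {
    if (criteria[field] !== undefined && !fullMatch(criteria[field]).test(event[field] ?? '')) {
      return false;
    }
  }
  if (criteria.tags !== undefined && criteria.tags.length > 0) {
    const patterns = criteria.tags.map(fullMatch);
    if (!(event.tags || []).some((tag) => patterns.some((re) => re.test(tag)))) return false;
  }
  return true;
}

const criteria = {
  start: 1527283299,
  tags: ['log', 'web-server-.+'],
  content: '.*\\[ERROR\\].*',
  source: '/var/log/.+',
};
const event = {
  timestamp: '1527283300.25',
  source: '/var/log/httpd/error.log',
  tags: ['web-server-2'],
  content: '[ERROR] upstream timed out',
};
console.log(matchesCriteria(event, criteria)); // true
```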
Example using wscat (assuming the collector runs on localhost):
wscat -c ws://localhost:6433/live
connected (press CTRL+C to quit)
> {"start": 1531528038}
< ok
< event: 110 108 2
id:2ca00a2a-d70f-4617-b48f-a31716b1d5dc
timestamp: 1531528038.8951149
source:/dev/sensors/temp0
tags:sensor
32
Simple event parser and serializer in JavaScript
- An event parser and serializer in JavaScript.
function parseEvent(event_str) {
  let event = {}
  let lines = event_str.split('\n')
  let i = 0
  // Header lines have the form "name:value". The first line without a colon
  // marks the start of the content. Note that this simple approach would
  // mistake a content line containing a colon for a header.
  for (; i < lines.length; i++) {
    let line = lines[i]
    let idx = line.indexOf(':')
    if (idx < 0) {
      break
    }
    let prop = line.slice(0, idx)
    let value = line.slice(idx + 1)
    if (prop == 'tags') {
      // split into a list, dropping empty entries (e.g. for "tags:")
      value = value.split(',').filter(t => t)
    }
    event[prop] = value
  }
  if (i < lines.length) {
    // everything after the headers is the content
    event.content = lines.slice(i).join('\n')
  }
  return event
}

function serializeEvent(event) {
  let event_str = ''
  let guaranteed = ['id', 'timestamp', 'source', 'tags']
  // the four guaranteed headers come first
  for (let prop of guaranteed) {
    let value = event[prop]
    if (prop == 'tags') {
      value = (value || []).join(',')
    }
    event_str += prop + ':' + value + '\n'
  }
  // then any custom headers
  for (let prop in event) {
    if (!guaranteed.includes(prop) && prop != 'content') {
      event_str += prop + ':' + event[prop] + '\n'
    }
  }
  // the content follows the last header
  event_str += event.content || ''
  return event_str
}

var event_str = ['id:331c531d-6eb4-4fb5-84d3-ea6937b01fdd',
                 'timestamp: 1509989630.6749051',
                 'source:/dev/sensors/door1-sensor',
                 'tags:sensors,home,doors,door1',
                 'x-header:somevalue',
                 'Door has been unlocked.'].join('\n')

var event = parseEvent(event_str)
console.log(event)
// prints:
// { id: '331c531d-6eb4-4fb5-84d3-ea6937b01fdd',
//   timestamp: ' 1509989630.6749051',
//   source: '/dev/sensors/door1-sensor',
//   tags: [ 'sensors', 'home', 'doors', 'door1' ],
//   'x-header': 'somevalue',
//   content: 'Door has been unlocked.' }

var serialized = serializeEvent(event)
console.log(serialized)
// prints:
// id:331c531d-6eb4-4fb5-84d3-ea6937b01fdd
// timestamp: 1509989630.6749051
// source:/dev/sensors/door1-sensor
// tags:sensors,home,doors,door1
// x-header:somevalue
// Door has been unlocked.