Change streams
Note: Change streams can be consumed via WebSocket or SSE. This page covers the stream definition format. For the SSE transport, see SSE API Overview.
The "streams" collection metadata
In RESTHeart, not only documents but also databases and collections have properties. Some properties are metadata, i.e. they have a special meaning for RESTHeart and influence its behavior.
The collection metadata property streams declares change streams that clients can watch to be notified of changes to documents, and binds each stream to a given URI.
Change streams need to be defined as collection metadata. This way clients are not able to open up arbitrary change streams but only those defined (and tested) by the developers.
streams is an array of stream definitions.
Stream definition format
{ "streams": [
{
"uri": <uri>,
"stages": [
"<stage_1>",
"<stage_2>",
...
]
}
]}
| Property | Description | Mandatory |
|---|---|---|
| uri | the URI of the stream; the stream is bound under the path /<db>/<collection>/_streams/<uri> | yes |
| stages | the MongoDB aggregation pipeline stages. For more information, refer to Aggregation Pipeline in the MongoDB documentation. | yes |
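The two mandatory properties can be checked before the metadata is written to the collection. The following is a minimal sketch of such a client-side check; `validate_streams` is a hypothetical helper written for this page, not part of RESTHeart, and it only verifies that `uri` and `stages` are present with the expected types.

```python
def validate_streams(metadata):
    """Return a list of problems found in the 'streams' collection metadata."""
    streams = metadata.get("streams", [])
    if not isinstance(streams, list):
        return ["'streams' must be an array of stream definitions"]
    problems = []
    for i, definition in enumerate(streams):
        if not isinstance(definition, dict):
            problems.append(f"streams[{i}]: must be an object")
            continue
        # Both properties are marked as mandatory in the table above.
        if not isinstance(definition.get("uri"), str):
            problems.append(f"streams[{i}]: 'uri' is mandatory")
        if not isinstance(definition.get("stages"), list):
            problems.append(f"streams[{i}]: 'stages' is mandatory")
    return problems


metadata = {
    "streams": [
        {"uri": "by-tenant", "stages": [{"_$match": {"operationType": "insert"}}]},
        {"uri": "broken"},  # 'stages' is missing
    ]
}
print(validate_streams(metadata))
```

An empty list means that every definition carries both mandatory properties; whether the stages themselves are valid is still decided by MongoDB when the stream is opened.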
Note: Stages take Change Events as input, not the documents themselves. For example, the modified version of a document after a PATCH request is available in the fullDocument property of the stage's input event, i.e. event.fullDocument (see examples).
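To make the shape of the stage input concrete, the following Python sketch builds a simplified change event of the kind MongoDB emits for an update and reads the modified document from it. The values (`doc-1`, `status`, `shipped`) are made up for illustration, and real events carry additional fields.

```python
# A simplified change event for an update. Real events carry more fields,
# for example a resume token and the namespace.
change_event = {
    "operationType": "update",
    "documentKey": {"_id": "doc-1"},
    "updateDescription": {
        "updatedFields": {"status": "shipped"},
        "removedFields": [],
    },
    "fullDocument": {"_id": "doc-1", "status": "shipped"},
}

# The document's own fields are nested under fullDocument, so a stage
# has to refer to fullDocument.status rather than to status.
modified = change_event["fullDocument"]
print(modified["status"])
```

This is why a stage that filters on a document field addresses it through fullDocument, while event-level fields such as operationType are addressed directly.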
Optional Stages
The aggregation of the stream can include optional stages. These optional stages are only executed when one or more variables are specified.
For a more comprehensive understanding of how to use optional stages, please refer to the Aggregation documentation.
Escaped stage properties
MongoDB (up to v5) does not allow storing fields whose names start with $ or contain dots (.).
To store stages that use dollar-prefixed operators or the dot notation, RESTHeart automatically and transparently escapes property keys as follows:
- the $ prefix is "underscore escaped", e.g. $exists is stored as _$exists
- if the dot notation has to be used in a key name, dots are replaced with ::, e.g. SD.prop is stored as SD::prop
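The two escaping rules can be illustrated with a short Python sketch. RESTHeart applies the escaping itself, so this code is not needed in practice; `escape_key` and `escape` are hypothetical names used only to show the mapping on a sample stage.

```python
def escape_key(key):
    """Escape one property key: a leading '$' becomes '_$', dots become '::'."""
    if key.startswith("$"):
        key = "_" + key
    return key.replace(".", "::")


def escape(value):
    """Recursively escape the keys of nested objects and arrays."""
    if isinstance(value, dict):
        return {escape_key(k): escape(v) for k, v in value.items()}
    if isinstance(value, list):
        return [escape(v) for v in value]
    return value  # scalars are left untouched; only keys are escaped


stage = {"$match": {"fullDocument.status": {"$exists": True}}}
escaped_stage = escape(stage)
print(escaped_stage)
# {'_$match': {'fullDocument::status': {'_$exists': True}}}
```

Only keys are rewritten; string values, including those that contain dots or start with $, are stored as they are in this sketch.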
Per-client filtering: notify_when
Available from RESTHeart 9.4.
notify_when is an optional field in a stream definition that filters which connected clients receive each change event, without requiring a separate MongoDB cursor per client.
{
"streams": [{
"uri": "by-tenant",
"stages": [
{ "_$match": { "operationType": { "_$in": ["insert", "update"] } } }
],
"notify_when": {
"fullDocument::tenantId": { "$var": "tid" }
}
}]
}
The client passes its variable value as a query parameter when connecting:
# SSE — only receives events where fullDocument.tenantId == "acme"
curl -N \
-H 'Accept: text/event-stream' \
-H 'Authorization: Basic YWRtaW46c2VjcmV0' \
'http://localhost:8080/mydb/mycoll/_streams/by-tenant?tid=acme'
Field path notation
The notify_when key uses :: to separate the top-level change event field from the dotted path within it:
"fullDocument::tenantId" # fullDocument.tenantId
"fullDocument::org.department" # fullDocument.org.department (nested)
"documentKey::_id" # documentKey._id
Supported predicates
| Form | Description | Example |
|---|---|---|
| Scalar equality | Field value (string) equals the bound variable | "fullDocument::tenantId": { "$var": "tid" } |
| Array membership | Bound variable value is contained in a string array field | |
Design notes
- stages are sent to MongoDB and applied once on the shared cursor. Use them for fixed-value conditions (e.g. operationType, fullDocument.status).
- notify_when is evaluated server-side, once per event for each connected client. Use it for conditions that depend on per-client variables.
- If notify_when is absent, all clients receive all events (broadcast, the existing behaviour).
- If a client connects without the required query parameter, it receives all events (pass-through).
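The field path notation, the two predicate forms and the notes above can be combined into a small Python model of the per-client decision. This is a sketch of the documented behaviour, not RESTHeart's implementation: `lookup` and `should_notify` are hypothetical names, the `channels` field is invented for the array case, and requiring every key to match when notify_when has several keys is an assumption.

```python
def lookup(event, key):
    """Resolve a notify_when key such as 'fullDocument::org.department'."""
    top_level, _, nested = key.partition("::")
    value = event.get(top_level)
    for part in (nested.split(".") if nested else []):
        value = value.get(part) if isinstance(value, dict) else None
    return value


def should_notify(notify_when, event, params):
    """Decide whether one client, given its query parameters, gets the event."""
    if not notify_when:
        return True  # no notify_when: broadcast to every client
    for key, predicate in notify_when.items():
        var_name = predicate["$var"]
        if var_name not in params:
            continue  # variable not supplied: pass-through for this predicate
        field, bound = lookup(event, key), params[var_name]
        # Array membership for list fields, scalar equality otherwise.
        matched = bound in field if isinstance(field, list) else field == bound
        if not matched:
            return False  # assumption: all supplied predicates must match
    return True


event = {
    "operationType": "insert",
    "documentKey": {"_id": 1},
    "fullDocument": {"_id": 1, "tenantId": "acme", "channels": ["news", "ops"]},
}
by_tenant = {"fullDocument::tenantId": {"$var": "tid"}}

print(should_notify(by_tenant, event, {"tid": "acme"}))   # True
print(should_notify(by_tenant, event, {"tid": "other"}))  # False
print(should_notify(by_tenant, event, {}))                # True
```

The last call shows the pass-through case: a client that omits `tid` is not filtered at all, so access control should not rely on notify_when alone.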
Last-Event-ID limitation
When notify_when is defined, Last-Event-ID is ignored on reconnect and a warning is logged. The shared cursor cannot be rewound for a single client; reconnecting clients should recover missed events via the REST API.
See SSE API Overview for details on the SSE transport and event format.