Change streams

RESTHeart Cloud
Note
Change streams can be consumed via WebSocket or SSE. This page covers the stream definition format. For the SSE transport see SSE API Overview.

The "streams" collection metadata

In RESTHeart, not only documents but also databases and collections have properties. Some properties are metadata, i.e. they have a special meaning for RESTHeart and influence its behavior.

The collection metadata property streams declares the change streams that clients can watch to be notified of changes to documents, and binds each stream to a given URI.

Change streams need to be defined as collection metadata. This way clients are not able to open up arbitrary change streams but only those defined (and tested) by the developers.

streams is an array of stream definitions.

Stream definition format

{ "streams": [
    {
        "uri": <uri>,
        "stages": [
            "<stage_1>",
            "<stage_2>",
            ...
        ]
    }
]}
| Property | Description | Mandatory |
|----------|-------------|-----------|
| uri | the URI the stream is bound to, under the path /<db>/<collection>/_streams | yes |
| stages | the MongoDB aggregation pipeline stages. For more information, refer to Aggregation Pipeline in the MongoDB documentation. | yes |

Note
Stages take Change Events as input, not the documents themselves. For example, the modified version of a document after a PATCH request is available in the fullDocument property of the event passed to the stages (see examples).
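
As a minimal sketch of the note above, the following Python snippet builds a stream definition whose stage matches on a field of fullDocument, next to a trimmed change event of the kind the stage receives. The stream name shipped, the status field, the host, and the credentials are hypothetical. It also assumes that collection metadata can be written with a PUT request on the collection URI; check the collection API documentation before relying on that.

```python
import json
import urllib.request

# Hypothetical change event for an update, trimmed to the fields used here.
# The stage below sees this event, not the bare document.
change_event = {
    "operationType": "update",
    "documentKey": {"_id": "order-1"},
    "fullDocument": {"_id": "order-1", "status": "shipped"},
}

# Hypothetical stream definition: the match stage reads fullDocument.status,
# written in escaped form (_$match, and :: in place of the dot).
stream_definition = {
    "streams": [
        {
            "uri": "shipped",
            "stages": [
                {"_$match": {"fullDocument::status": "shipped"}}
            ],
        }
    ]
}

# Assumption: the metadata is stored by a PUT on the collection URI.
request = urllib.request.Request(
    "http://localhost:8080/mydb/mycoll",
    data=json.dumps(stream_definition).encode("utf-8"),
    method="PUT",
    headers={
        "Content-Type": "application/json",
        "Authorization": "Basic YWRtaW46c2VjcmV0",
    },
)

# urllib.request.urlopen(request)  # uncomment to send to a running server
print(request.get_method(), request.full_url)
```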

Optional Stages

The aggregation of the stream can include optional stages. These optional stages are only executed when one or more variables are specified.

For a more comprehensive understanding of how to use optional stages, please refer to the Aggregation documentation.

Escaped stage properties

MongoDB (up to v5) does not allow storing fields whose names start with $ or contain dots (.).

To store stages that use dollar-prefixed operators or the dot notation, RESTHeart automatically and transparently escapes the property keys as follows:

  • the $ prefix is "underscore escaped", e.g. $exists is stored as _$exists

  • if the dot notation has to be used in a key name, dots are replaced with :: e.g. SD.prop is stored as SD::prop
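
The two rules can be illustrated with a short Python sketch. RESTHeart performs this escaping itself, so the function below is not its implementation; it only shows what the stored form of a stage looks like. The names escape_key and escape are hypothetical.

```python
def escape_key(key: str) -> str:
    """Apply the two escaping rules to a single property key."""
    if key.startswith("$"):
        key = "_" + key            # $exists -> _$exists
    return key.replace(".", "::")  # SD.prop -> SD::prop


def escape(value):
    """Recursively escape the keys of nested objects and arrays."""
    if isinstance(value, dict):
        return {escape_key(k): escape(v) for k, v in value.items()}
    if isinstance(value, list):
        return [escape(v) for v in value]
    return value  # scalars are left untouched


stage = {"$match": {"SD.prop": {"$exists": True}}}
print(escape(stage))
```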

Per-client filtering: notify_when

Available from RESTHeart 9.4.

notify_when is an optional field in a stream definition that filters which connected clients receive each change event, without requiring a separate MongoDB cursor per client.

{
  "streams": [{
    "uri": "by-tenant",
    "stages": [
      { "_$match": { "operationType": { "_$in": ["insert", "update"] } } }
    ],
    "notify_when": {
      "fullDocument::tenantId": { "$var": "tid" }
    }
  }]
}

The client passes its variable value as a query parameter when connecting:

# SSE — only receives events where fullDocument.tenantId == "acme"
curl -N \
  -H 'Accept: text/event-stream' \
  -H 'Authorization: Basic YWRtaW46c2VjcmV0' \
  'http://localhost:8080/mydb/mycoll/_streams/by-tenant?tid=acme'
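
The same connection can be sketched in Python. The helper names stream_url and iter_sse_data are hypothetical, and the parser assumes only the generic SSE framing (data: lines terminated by a blank line); the actual event format is described in SSE API Overview.

```python
from urllib.parse import quote, urlencode


def stream_url(base, db, coll, uri, **variables):
    """Build /<db>/<collection>/_streams/<uri>, passing variables as query parameters."""
    path = "/".join(quote(p, safe="") for p in (db, coll, "_streams", uri))
    query = urlencode(variables)
    return f"{base}/{path}" + (f"?{query}" if query else "")


def iter_sse_data(lines):
    """Yield the data payload of each SSE event from an iterable of text lines."""
    data = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":                 # a blank line ends the current event
            if data:
                yield "\n".join(data)
                data = []
        elif line.startswith("data:"):
            value = line[len("data:"):]
            data.append(value[1:] if value.startswith(" ") else value)
        # other fields (id:, event:) and comment lines are ignored here


url = stream_url("http://localhost:8080", "mydb", "mycoll", "by-tenant", tid="acme")
print(url)

# Against a running server (not executed here):
# import urllib.request
# req = urllib.request.Request(url, headers={
#     "Accept": "text/event-stream",
#     "Authorization": "Basic YWRtaW46c2VjcmV0",
# })
# with urllib.request.urlopen(req) as resp:
#     for payload in iter_sse_data(line.decode("utf-8") for line in resp):
#         print(payload)
```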

Field path notation

The notify_when key uses :: to separate the top-level change event field from the dotted path within it:

"fullDocument::tenantId"       # fullDocument.tenantId
"fullDocument::org.department" # fullDocument.org.department (nested)
"documentKey::_id"             # documentKey._id

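To make the notation concrete, here is a minimal Python sketch that resolves such a key against a change event. The function name resolve and the sample event are hypothetical, and returning None for a missing path is an assumption of this sketch, not documented behaviour.

```python
def resolve(event, key):
    """Resolve a notify_when key such as 'fullDocument::org.department'."""
    # The part before :: names the top-level change event field,
    # the part after it is a dotted path inside that field.
    top, sep, path = key.partition("::")
    value = event.get(top)
    if not sep:
        return value
    for part in path.split("."):
        if not isinstance(value, dict):
            return None  # assumption: a missing path resolves to None
        value = value.get(part)
    return value


event = {
    "operationType": "insert",
    "documentKey": {"_id": "42"},
    "fullDocument": {"tenantId": "acme", "org": {"department": "sales"}},
}
print(resolve(event, "fullDocument::org.department"))
```
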
Supported predicates

| Form | Description | Example |
|------|-------------|---------|
| Scalar equality | Field value (string) equals the bound variable | { "fullDocument::tenantId": { "$var": "tid" } } |
| Array membership | Bound variable value is contained in a string array field | { "fullDocument::recipients": { "$var": "userId" } } |
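
The two forms can be read as a single check on the resolved field value. The sketch below is illustrative only: predicate_matches is a hypothetical name, and treating any other field type as a non-match is an assumption.

```python
def predicate_matches(field_value, bound):
    """Check one notify_when predicate against an already resolved field value."""
    if isinstance(field_value, str):
        return field_value == bound   # scalar equality
    if isinstance(field_value, list):
        return bound in field_value   # array membership
    return False                      # assumption: other types never match


print(predicate_matches("acme", "acme"))
print(predicate_matches(["alice", "bob"], "carol"))
```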

Design notes

  • stages are sent to MongoDB and applied once on the shared cursor. Use them for fixed-value conditions (e.g. operationType, fullDocument.status).

  • notify_when is evaluated server-side per event × per connected client. Use it for conditions that depend on per-client variables.

  • If notify_when is absent, all clients receive all events (broadcast — existing behaviour).

  • If a client connects without the required query parameter, it receives all events (pass-through).
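
The per-event, per-client evaluation described in these notes can be sketched as follows. This is a simplified model, not RESTHeart's implementation: the function names are hypothetical, each client is represented only by its query parameters, and combining several predicates with a logical AND is an assumption.

```python
def field(event, key):
    """Resolve a key such as 'fullDocument::tenantId' against a change event."""
    top, _, rest = key.partition("::")
    value = event.get(top)
    for part in (rest.split(".") if rest else []):
        value = value.get(part) if isinstance(value, dict) else None
    return value


def should_notify(event, notify_when, params):
    """Decide whether one client, identified by its query parameters, gets the event."""
    if not notify_when:
        return True                      # no notify_when: broadcast
    for key, predicate in notify_when.items():
        name = predicate["$var"]
        if name not in params:
            return True                  # missing parameter: pass-through
        actual, bound = field(event, key), params[name]
        hit = bound in actual if isinstance(actual, list) else actual == bound
        if not hit:
            return False                 # assumption: predicates are ANDed
    return True


def fan_out(event, notify_when, clients):
    """Return the ids of the connected clients that receive the event."""
    return [cid for cid, params in clients.items()
            if should_notify(event, notify_when, params)]


event = {"operationType": "insert", "fullDocument": {"tenantId": "acme"}}
notify_when = {"fullDocument::tenantId": {"$var": "tid"}}
clients = {"a": {"tid": "acme"}, "b": {"tid": "globex"}, "c": {}}
print(fan_out(event, notify_when, clients))
```

In this model the stages have already run once on the shared cursor, so only events that passed them ever reach fan_out.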

Last-Event-ID limitation

When notify_when is defined, Last-Event-ID is ignored on reconnect and a warning is logged. The shared cursor cannot be rewound for a single client; reconnecting clients should recover missed events via the REST API.
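
One way a reconnecting client might recover missed events is to query the collection for documents changed since it last saw an event. The sketch below builds such a request URL with the filter query parameter. It rests on assumptions about the data model: the documents carry a hypothetical updatedAt field holding an ISO 8601 string, and recovery_url is a hypothetical helper.

```python
import json
from urllib.parse import urlencode


def recovery_url(base, db, coll, tenant_id, since_iso):
    """Build a GET URL selecting one tenant's documents changed after since_iso."""
    # Hypothetical fields: tenantId and updatedAt must exist in the documents.
    query = {"tenantId": tenant_id, "updatedAt": {"$gt": since_iso}}
    encoded = urlencode({"filter": json.dumps(query, separators=(",", ":"))})
    return f"{base}/{db}/{coll}?{encoded}"


url = recovery_url("http://localhost:8080", "mydb", "mycoll",
                   "acme", "2024-01-01T00:00:00Z")
print(url)
```

Deletions cannot be recovered this way, since deleted documents no longer appear in query results.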

See SSE API Overview for details on the SSE transport and event format.