Change Streams
Introduction
If the stream all is defined on the collection messages, clients can connect via WebSocket to ws://mydomain.com/messages/_streams/all and receive real-time notifications of data changes occurring in the collection.
Modern web applications need to react promptly and efficiently to data changes in many contexts. The RESTHeart Change Streams feature helps achieve this goal.
“Change streams allow applications to access real-time data changes. […] Because change streams use the aggregation framework, applications can also filter for specific changes.”
By exposing a WebSocket server resource, RESTHeart lets clients be notified promptly, and only when necessary, avoiding network-expensive practices such as polling.
Change streams require MongoDB v3.6 or later, configured as a replica set.
Starting from RESTHeart 5.3.0, when the streams collection metadata is modified or the collection or the db is deleted, all related WebSocket connections are closed and the change streams are updated accordingly.
With RESTHeart 5.2 and earlier, the server must be restarted after modifying a stream definition or deleting its collection in order to disconnect all clients.
The streams collection metadata
In RESTHeart, not only documents but also dbs and collections have properties. Some properties are metadata, i.e. they have a special meaning for RESTHeart that influences its behavior.
The collection metadata property streams declares the change streams that clients can watch to be notified of changes to documents, and binds each of them to a given URI.
Change streams need to be defined as collection metadata. It is not possible to define a stream via a query parameter and this is by design: clients are not able to open up arbitrary change streams but only those defined (and tested) by the developers.
streams is an array of stream objects.
Stream metadata object format
stream object format
{
"uri": <uri>,
"stages": [
"<stage_1>",
"<stage_2>",
...
]
}
Property | Description | Mandatory
---|---|---
uri | the URI the stream is bound to, under the path /<db>/<collection>/_streams | yes
stages | the MongoDB aggregation pipeline stages. For more information refer to https://docs.mongodb.org/manual/core/aggregation-pipeline/ | yes
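Since both properties are mandatory, a client can catch malformed definitions before sending them. The Python sketch below is a hypothetical helper (validate_streams is not part of RESTHeart, which validates metadata on its own) that checks a streams value against the table above. Flagging duplicate uri values is an extra check assumed here, on the reasoning that each URI should bind a single stream.

```python
from typing import Any


def validate_streams(streams: Any) -> list[str]:
    """Return the problems found in a 'streams' metadata value (empty if valid).

    Hypothetical client-side check: every stream object needs a non-empty
    'uri' string and a 'stages' array made of pipeline stage objects.
    """
    if not isinstance(streams, list):
        return ["streams must be an array of stream objects"]
    problems = []
    seen_uris = set()
    for i, stream in enumerate(streams):
        if not isinstance(stream, dict):
            problems.append(f"streams[{i}] is not an object")
            continue
        uri = stream.get("uri")
        if not isinstance(uri, str) or not uri:
            problems.append(f"streams[{i}].uri is missing or not a string")
        elif uri in seen_uris:
            # Assumed extra check: one stream per URI
            problems.append(f"streams[{i}].uri '{uri}' is duplicated")
        else:
            seen_uris.add(uri)
        stages = stream.get("stages")
        if not isinstance(stages, list):
            problems.append(f"streams[{i}].stages is missing or not an array")
        elif not all(isinstance(s, dict) for s in stages):
            problems.append(f"streams[{i}].stages must contain only objects")
    return problems
```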
Notes:
- Only a subset of aggregation pipeline stages is allowed for change streams. Check MongoDB’s documentation for further information.
- Stages take change events as input instead of the documents themselves. For example, the modified version of a document after a PATCH request is available in the fullDocument property of the event the stages receive (see the examples below).
Escaping stage properties
MongoDB does not allow storing fields whose names start with $ or contain dots (.); see Restrictions on Field Names in MongoDB’s documentation.
In order to allow storing stages with dollar-prefixed operators or with keys in dot notation (to refer to properties of subdocuments), RESTHeart automatically and transparently escapes the property keys as follows:
- the $ prefix is “underscore escaped”, e.g. $exists is stored as _$exists
- if the dot notation has to be used in a key name, dots are replaced with ::, e.g. SD.prop is stored as SD::prop
In RESTHeart 1.x these escapes were not managed automatically and developers had to apply them explicitly; starting from version 2.0 this is no longer needed.
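RESTHeart applies this escaping itself; the sketch below only makes the rule concrete. It is a Python illustration (escape_keys and unescape_keys are hypothetical names, not RESTHeart functions) of how stage keys map to their stored form and back, assuming the escaping applies to object keys only and never to string values.

```python
from typing import Any


def escape_keys(value: Any) -> Any:
    """Recursively escape object keys: a leading '$' becomes '_$', '.' becomes '::'."""
    if isinstance(value, dict):
        escaped = {}
        for key, v in value.items():
            new_key = "_" + key if key.startswith("$") else key
            escaped[new_key.replace(".", "::")] = escape_keys(v)
        return escaped
    if isinstance(value, list):
        return [escape_keys(v) for v in value]
    return value  # scalars (including string values) are left untouched


def unescape_keys(value: Any) -> Any:
    """Inverse of escape_keys: a leading '_$' becomes '$', '::' becomes '.'."""
    if isinstance(value, dict):
        unescaped = {}
        for key, v in value.items():
            new_key = key[1:] if key.startswith("_$") else key
            unescaped[new_key.replace("::", ".")] = unescape_keys(v)
        return unescaped
    if isinstance(value, list):
        return [unescape_keys(v) for v in value]
    return value


if __name__ == "__main__":
    stage = {"$match": {"fullDocument.name": {"$var": "n"}}}
    print(escape_keys(stage))
```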
Examples
The following request upserts a collection defining two change streams:
- all, bound at /messages/_streams/all
- mine, bound at /messages/_streams/mine
PUT /messages HTTP/1.1
{
"streams" : [
{ "stages" : [
{
"_$match": {
"_$or" : [
{
"operationType": "insert"
},
{
"operationType": "update"
}
]
}
}
],
"uri" : "all"
},
{ "stages" : [
{
"_$match" : {
"fullDocument::name" : { "_$var" : "n" }
}
}
],
"uri" : "mine"
}
]
}
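The same request can be issued from any HTTP client. A minimal Python sketch using the standard library's urllib.request follows; the base URL http://localhost:8080, the missing authentication and the helper name build_put_collection are assumptions, so adapt them to your deployment. The request is only built here; passing it to urllib.request.urlopen would send it.

```python
import json
import urllib.request

# Stream definitions from the example above, with stage keys already escaped
STREAMS = [
    {"uri": "all",
     "stages": [{"_$match": {"_$or": [{"operationType": "insert"},
                                      {"operationType": "update"}]}}]},
    {"uri": "mine",
     "stages": [{"_$match": {"fullDocument::name": {"_$var": "n"}}}]},
]


def build_put_collection(base_url: str, collection: str,
                         streams: list) -> urllib.request.Request:
    """Build (but do not send) a PUT that upserts a collection with 'streams' metadata."""
    body = json.dumps({"streams": streams}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/{collection}",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


if __name__ == "__main__":
    # Assumed local instance; add credentials as your RESTHeart setup requires
    req = build_put_collection("http://localhost:8080", "messages", STREAMS)
    print(req.get_method(), req.full_url)
```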
Note that the $match stage specifies a condition on the name property using fullDocument::name.
This is because the change event looks like this:
{
"fullDocument": {
"_id": {
"$oid": "5e15ff5779ca449eb20fdd09"
},
"message": "hi uji, how are you?",
"name": "uji",
"_etag": {
"$oid": "5e15ff57a2e5700c3459e801"
}
},
"documentKey": {
"_id": {
"$oid": "5e15ff5779ca449eb20fdd09"
}
},
"updateDescription": null,
"operationType": "insert"
}
Note that the URIs of the change streams are listed in the collection's _links property (returned with ?rep=SHAL).
GET /messages?rep=SHAL HTTP/1.1
HTTP/1.1 200 OK
...
{
...
"_links": {
...,
"all": {
"href": "/messages/_streams/all"
},
"mine": {
"href": "/messages/_streams/mine"
}
},
...
}
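A client can discover the available streams from this representation instead of hard-coding their URIs. In the Python sketch below, stream_links is a hypothetical helper; it assumes, as in the response above, that stream links are the _links entries whose href contains /_streams/. The sample body is abridged and its self link is illustrative only.

```python
import json


def stream_links(collection_json: str) -> dict[str, str]:
    """Map stream names to hrefs from a GET /<coll>?rep=SHAL response body."""
    links = json.loads(collection_json).get("_links", {})
    return {
        name: link["href"]
        for name, link in links.items()
        if isinstance(link, dict) and "/_streams/" in link.get("href", "")
    }


if __name__ == "__main__":
    # Hypothetical, abridged response body; real responses contain more links
    body = """{"_links": {"self": {"href": "/messages"},
               "all": {"href": "/messages/_streams/all"},
               "mine": {"href": "/messages/_streams/mine"}}}"""
    print(stream_links(body))
```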
Passing variables to change streams
The query parameter avars allows passing variables to the change stream.
For example, the stream mine defined above uses a variable named “n”. If the variable is not passed via the avars query parameter, the request fails.
GET /messages/_streams/mine HTTP/1.1
HTTP/1.1 400 Bad Request
...
{
"_exceptions": [
{
"exception": "org.restheart.exchange.QueryVariableNotBoundException",
"exception message": "variable n not bound",
...
}
]
}
When the variable n is passed, the request succeeds:
GET /messages/_streams/mine?avars={"n":"uji"} HTTP/1.1
HTTP/1.1 101 Switching Protocols
...
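In the request line above the avars value is shown unencoded for readability; characters such as {, } and " are not valid in a URL as-is, so a client should percent-encode the JSON object. The Python sketch below (stream_url is a hypothetical helper; mydomain.com is the example host from the introduction) serializes the variables compactly with json.dumps and encodes them with urllib.parse.quote.

```python
import json
from typing import Optional
from urllib.parse import quote


def stream_url(host: str, collection_path: str, stream: str,
               avars: Optional[dict] = None) -> str:
    """Build the WebSocket URL of a change stream, optionally passing avars."""
    url = f"ws://{host}/{collection_path.strip('/')}/_streams/{stream}"
    if avars:
        # Compact JSON, then percent-encode every reserved character
        encoded = quote(json.dumps(avars, separators=(",", ":")), safe="")
        url += f"?avars={encoded}"
    return url


if __name__ == "__main__":
    print(stream_url("mydomain.com", "messages", "mine", {"n": "uji"}))
    # ws://mydomain.com/messages/_streams/mine?avars=%7B%22n%22%3A%22uji%22%7D
```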
Variables in stages or query
Variables can be used in change stream stages as follows:
{ "$var": "<var_name>" }
In the mine stream defined above, the variable restricts notifications to changes on documents whose name property matches the variable n:
{ "_$match": { "fullDocument::name": { "_$var": "n" } } }
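Conceptually, each {"$var": "<var_name>"} object is replaced by the corresponding value from avars before the pipeline is submitted to MongoDB. The Python sketch below is a hypothetical illustration of that substitution on the unescaped form of a stage, not RESTHeart's code; it raises an error for unbound variables, mirroring the 400 response shown earlier. VariableNotBound and bind_vars are illustrative names.

```python
from typing import Any


class VariableNotBound(Exception):
    """Hypothetical stand-in for RESTHeart's QueryVariableNotBoundException."""


def bind_vars(stage: Any, avars: dict) -> Any:
    """Recursively replace {"$var": name} objects with avars[name]."""
    if isinstance(stage, dict):
        if len(stage) == 1 and "$var" in stage:
            name = stage["$var"]
            if name not in avars:
                raise VariableNotBound(f"variable {name} not bound")
            return avars[name]
        return {k: bind_vars(v, avars) for k, v in stage.items()}
    if isinstance(stage, list):
        return [bind_vars(v, avars) for v in stage]
    return stage


if __name__ == "__main__":
    stage = {"$match": {"fullDocument.name": {"$var": "n"}}}
    print(bind_vars(stage, {"n": "uji"}))
```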
Security Information
By default, RESTHeart makes sure that the aggregation variables passed as query parameters do not contain MongoDB operators.
This check protects data from malicious query injection.
Although it is highly discouraged, it is possible to disable this check by editing the following property in the restheart.yml configuration file:
### Security
# Check if aggregation variables use operators. Allowing operators in aggregation variables
# is risky: the requester can inject operators that modify the query
aggregation-check-operators: true
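The check can be thought of as rejecting variable values that contain keys starting with $, since such keys would act as MongoDB operators once substituted into a stage. For instance, avars={"n":{"$ne":null}} would turn the mine stream into one matching every document whose name is set. The Python sketch below is a hypothetical illustration of that idea, not RESTHeart's implementation; the check_operators flag merely mirrors the aggregation-check-operators setting.

```python
import json
from typing import Any


def contains_operators(value: Any) -> bool:
    """True if any object key inside value starts with '$' (a MongoDB operator)."""
    if isinstance(value, dict):
        return any(k.startswith("$") or contains_operators(v)
                   for k, v in value.items())
    if isinstance(value, list):
        return any(contains_operators(v) for v in value)
    return False


def parse_avars(raw: str, check_operators: bool = True) -> dict:
    """Parse the avars query parameter, rejecting operators unless the check is disabled."""
    avars = json.loads(raw)
    if not isinstance(avars, dict):
        raise ValueError("avars must be a JSON object")
    if check_operators and contains_operators(avars):
        raise ValueError("aggregation variables must not contain MongoDB operators")
    return avars


if __name__ == "__main__":
    print(parse_avars('{"n": "uji"}'))
    try:
        parse_avars('{"n": {"$ne": null}}')  # injection attempt
    except ValueError as err:
        print("rejected:", err)
```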