Change Streams

Introduction

Modern web applications need to react promptly and efficiently to data changes in many contexts.

The RESTHeart PRO Change Streams feature helps achieve this goal. By exposing a WebSocket server endpoint, it allows any RFC 6455-compliant or JSR-356-compliant WebSocket client to be notified promptly about these changes, and only when necessary, avoiding network-expensive practices such as polling.
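WebSocket client libraries normally perform the opening handshake for you. As a rough illustration of what RFC 6455 compliance involves on the client side, the following Python sketch (standard library only) generates a Sec-WebSocket-Key and verifies the Sec-WebSocket-Accept header that a server must return with its 101 Switching Protocols response. The helper names are hypothetical; the GUID and the SHA-1/base64 derivation come from RFC 6455.

```python
import base64
import hashlib
import os

# GUID fixed by RFC 6455 (section 1.3) for computing Sec-WebSocket-Accept
WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"


def new_websocket_key() -> str:
    """Return a random Sec-WebSocket-Key: 16 random bytes, base64-encoded."""
    return base64.b64encode(os.urandom(16)).decode("ascii")


def expected_accept(key: str) -> str:
    """Compute the Sec-WebSocket-Accept value a compliant server must send back."""
    digest = hashlib.sha1((key + WS_GUID).encode("ascii")).digest()
    return base64.b64encode(digest).decode("ascii")


def handshake_ok(status_code: int, headers: dict, key: str) -> bool:
    """Check the server's reply to the opening handshake.

    Assumes header names in `headers` have already been lowercased.
    """
    return (
        status_code == 101
        and headers.get("upgrade", "").lower() == "websocket"
        and headers.get("sec-websocket-accept") == expected_accept(key)
    )
```

With the sample key from RFC 6455, expected_accept("dGhlIHNhbXBsZSBub25jZQ==") returns "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=", the value given in the specification.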

“Change streams allow applications to access real-time data changes. […] Because change streams use the aggregation framework, applications can also filter for specific changes.”

Change streams require at least MongoDB v3.6 configured as a Replica Set.

The streams collection metadata

In RESTHeart, not only documents but also databases and collections have properties. Some properties are metadata, i.e. they have a special meaning for RESTHeart that influences its behavior.

The collection metadata property streams declares the change streams that clients can watch to be notified about changes to documents, and binds each of them to a given URI.

Change streams need to be defined as collection metadata. It is not possible to define a stream via a query parameter, and this is by design: clients cannot open arbitrary change streams, only those defined (and tested) by the developers.

streams is an array of stream objects.

Stream metadata object format

stream object format

{
    "uri": <uri>,
    "stages": [
        <stage_1>,
        <stage_2>,
        ...
    ]
}
Property | Description | Mandatory
uri | the URI where the stream is bound, under the path /<db>/<collection>/_streams | yes
stages | the MongoDB aggregation pipeline stages. For more information refer to https://docs.mongodb.org/manual/core/aggregation-pipeline/ | yes

Notes:

  • Only a subset of aggregation pipeline stages is allowed for this feature. Check MongoDB’s documentation for further information.
  • Stages take Change Events as input, not the modified documents themselves. For example, after a PATCH request the modified version of the document is available in the fullDocument property of the change event that the stages receive (see the examples below).

Escaping stage property keys

MongoDB does not allow storing fields whose names start with $ or contain dots (.); see Restrictions on Field Names in MongoDB’s documentation.

To allow storing stages that use dollar-prefixed operators or dot notation (to refer to properties of subdocuments), RESTHeart automatically and transparently escapes property keys as follows:

  • the $ prefix is “underscore escaped”, e.g. $exists is stored as _$exists
  • dots in key names (dot notation) are replaced with ::, e.g. SD.prop is stored as SD::prop

In RESTHeart 1.x these escapes were not managed automatically, and developers had to apply them explicitly; since version 2.0 this is no longer needed.
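The two escaping rules can be illustrated with a short Python sketch. It only mirrors the rules described above on plain dicts and lists; it is not RESTHeart's actual implementation, and the function names escape_keys and unescape_keys are hypothetical.

```python
from typing import Any


def escape_keys(value: Any) -> Any:
    """Recursively escape dict keys: a leading '$' becomes '_$', '.' becomes '::'."""
    if isinstance(value, dict):
        escaped = {}
        for key, item in value.items():
            if key.startswith("$"):
                key = "_" + key  # "underscore escape" the $ prefix
            escaped[key.replace(".", "::")] = escape_keys(item)
        return escaped
    if isinstance(value, list):
        return [escape_keys(item) for item in value]
    return value  # scalars are left untouched


def unescape_keys(value: Any) -> Any:
    """Reverse escape_keys: a leading '_$' becomes '$', '::' becomes '.'."""
    if isinstance(value, dict):
        unescaped = {}
        for key, item in value.items():
            if key.startswith("_$"):
                key = key[1:]
            unescaped[key.replace("::", ".")] = unescape_keys(item)
        return unescaped
    if isinstance(value, list):
        return [unescape_keys(item) for item in value]
    return value


stage = {"$match": {"fullDocument.name": {"$var": "n"}}}
print(escape_keys(stage))  # the stored form used in the example below
```

In this sketch the round trip is exact only for keys that do not already contain :: or start with _$.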

Examples

The following request upserts a collection that defines two change streams:

  • test_stream bound at /cs_test/_streams/test_stream
  • test_stream_with_stage_params bound at /cs_test/_streams/test_stream_with_stage_params
Request
PUT /cs_test HTTP/1.1

{ 
    "streams" : [ 
      { "stages" : [],
        "uri" : "test_stream"
      },
      { "stages" : [ 
          { "_$match" : { 
              "fullDocument::name" : { "_$var" : "n" } 
              } 
          }
        ],
        "uri" : "test_stream_with_stage_params"
      }
    ] 
}
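The same request can be sent from code. The Python sketch below builds it with urllib.request from the standard library; it assumes RESTHeart is listening on http://localhost:8080 and leaves out authentication, so add whatever credentials your deployment requires. build_put_request is a hypothetical helper name.

```python
import json
import urllib.request

# Assumption: RESTHeart listens on localhost:8080 (adjust for your deployment)
BASE_URL = "http://localhost:8080"

streams_metadata = {
    "streams": [
        {"stages": [], "uri": "test_stream"},
        {
            "stages": [{"_$match": {"fullDocument::name": {"_$var": "n"}}}],
            "uri": "test_stream_with_stage_params",
        },
    ]
}


def build_put_request(collection: str, metadata: dict) -> urllib.request.Request:
    """Build (but do not send) the PUT request that upserts the collection."""
    return urllib.request.Request(
        f"{BASE_URL}/{collection}",
        data=json.dumps(metadata).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


request = build_put_request("cs_test", streams_metadata)
# To actually send it (authentication omitted): urllib.request.urlopen(request)
```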

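Note that the $match stage specifies a condition on the name property using fullDocument::name, the escaped form of fullDocument.name. This is because the stages receive Change Events, in which the affected document is nested under the fullDocument property.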
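To see why the condition targets fullDocument, here is a Python sketch with a simplified, hand-written insert event. The field names (operationType, ns, documentKey, fullDocument) follow MongoDB's change event format; the values, including the database name mydb, are made up. The helper get_path is hypothetical: it resolves a dot-notation path for simple nested fields and ignores array traversal, unlike a real $match.

```python
from typing import Any

# Simplified, made-up insert event; real events carry more fields
# (e.g. clusterTime) and a resume token as _id.
event = {
    "_id": {"_data": "resume-token-placeholder"},
    "operationType": "insert",
    "ns": {"db": "mydb", "coll": "cs_test"},  # hypothetical db name
    "documentKey": {"_id": 1},
    "fullDocument": {"_id": 1, "name": "foo"},
}


def get_path(doc: dict, path: str) -> Any:
    """Resolve a dot-notation path such as 'fullDocument.name'; None if absent."""
    current: Any = doc
    for part in path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


# The stored key 'fullDocument::name' unescapes to 'fullDocument.name'
print(get_path(event, "fullDocument::name".replace("::", ".")))  # foo
print(get_path(event, "name"))  # None: 'name' is not a top-level event field
```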


Note that the collection’s _links property includes the URIs of the change streams (returned with ?rep=SHAL).

Request
GET /cs_test?rep=SHAL HTTP/1.1
Response
HTTP/1.1 200 OK

...

{
    ...

    "_links": {
        ...,
        "test_stream": {
            "href": "/cs_test/_streams/test_stream"
        },
        "test_stream_with_stage_params": {
            "href": "/cs_test/_streams/test_stream_with_stage_params"
        }
    },

    ...

}
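A client can discover the streams a collection exposes from this representation. The Python sketch below is one possible approach, not a RESTHeart API: it keeps only the links whose href contains /_streams/. The response body is a trimmed, hand-written version of the one above, with a hypothetical self link added only to show the filtering; stream_links is a hypothetical helper name.

```python
import json

# Trimmed, hand-written body of GET /cs_test?rep=SHAL (the self link is hypothetical)
body = """
{
  "_links": {
    "self": {"href": "/cs_test"},
    "test_stream": {"href": "/cs_test/_streams/test_stream"},
    "test_stream_with_stage_params": {
      "href": "/cs_test/_streams/test_stream_with_stage_params"
    }
  }
}
"""


def stream_links(collection_repr: dict) -> dict:
    """Map each link name to its href, keeping only links under /_streams/."""
    links = collection_repr.get("_links", {})
    return {
        name: link["href"]
        for name, link in links.items()
        if isinstance(link, dict) and "/_streams/" in link.get("href", "")
    }


print(stream_links(json.loads(body)))
```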

Passing variables to change streams

The avars query parameter allows passing variables to the change stream.

For example, the stream test_stream_with_stage_params defined in the previous example uses a variable named “n”. If the variable is not passed via the avars query parameter, the request fails:

Request
GET /cs_test/_streams/test_stream_with_stage_params HTTP/1.1
Response
HTTP/1.1 400 Bad Request

...

{
    "_exceptions": [
        {
            "exception": "org.restheart.hal.metadata.QueryVariableNotBoundException", 
            "exception message": "variable n not bound", 
            ...
        }
    ]
}

When the variable n is passed, the request succeeds:

Request
GET /cs_test/_streams/test_stream_with_stage_params?avars={"n":1} HTTP/1.1
Response
HTTP/1.1 101 Switching Protocols

...
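In practice the avars value should be URL-encoded, and the stream is opened with a WebSocket client rather than a plain HTTP GET. The Python sketch below builds such a URL; it assumes RESTHeart is reachable at ws://localhost:8080 (wss:// would apply behind TLS), and stream_url is a hypothetical helper name.

```python
import json
from urllib.parse import quote, urlencode

# Assumption: RESTHeart reachable at localhost:8080 over plain ws://
WS_BASE = "ws://localhost:8080"


def stream_url(collection: str, stream: str, avars: dict) -> str:
    """Build the WebSocket URL of a change stream, URL-encoding avars as compact JSON."""
    path = f"/{quote(collection)}/_streams/{quote(stream)}"
    query = urlencode({"avars": json.dumps(avars, separators=(",", ":"))})
    return f"{WS_BASE}{path}?{query}"


print(stream_url("cs_test", "test_stream_with_stage_params", {"n": 1}))
# ws://localhost:8080/cs_test/_streams/test_stream_with_stage_params?avars=%7B%22n%22%3A1%7D
```

The resulting URL can then be passed to any RFC 6455-compliant WebSocket client.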

Variables in stages or query

Variables can be used in change stream stages as follows:

{ "$var": "<var_name>" }

In the test_stream_with_stage_params example above, the variable restricts notifications to changes on documents whose name property matches the variable n:

{ "_$match" : { "fullDocument::name" : { "_$var" : "n" } } }
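Conceptually, binding a variable means replacing each { "$var": "<var_name>" } object in the (unescaped) stages with the matching avars value, and treating a missing variable as an error, as in the 400 response shown earlier. The Python sketch below illustrates that idea only; it is not RESTHeart's code, and bind_vars and VariableNotBound are hypothetical names.

```python
from typing import Any


class VariableNotBound(Exception):
    """Raised when a stage references a variable missing from avars."""


def bind_vars(stage: Any, avars: dict) -> Any:
    """Recursively replace every {"$var": name} object with avars[name]."""
    if isinstance(stage, dict):
        if set(stage) == {"$var"}:  # the object is exactly a variable reference
            name = stage["$var"]
            if name not in avars:
                raise VariableNotBound(f"variable {name} not bound")
            return avars[name]
        return {key: bind_vars(value, avars) for key, value in stage.items()}
    if isinstance(stage, list):
        return [bind_vars(item, avars) for item in stage]
    return stage


stage = {"$match": {"fullDocument.name": {"$var": "n"}}}
print(bind_vars(stage, {"n": 1}))  # {'$match': {'fullDocument.name': 1}}
```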

Security Information

By default, RESTHeart makes sure that the aggregation variables passed as query parameters do not contain MongoDB operators.

This check protects data from malicious query injection.
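To make the risk concrete: if the avars value {"n": {"$ne": null}} were accepted, the $match stage of test_stream_with_stage_params would match changes on any document with a non-null name instead of a single name. The Python sketch below shows one way such a check can work, by rejecting any key that starts with $ at any depth; it illustrates the idea only and is not RESTHeart's implementation (contains_operators and check_avars are hypothetical names).

```python
import json
from typing import Any


def contains_operators(value: Any) -> bool:
    """True if any object key, at any nesting depth, starts with '$'."""
    if isinstance(value, dict):
        return any(
            key.startswith("$") or contains_operators(item)
            for key, item in value.items()
        )
    if isinstance(value, list):
        return any(contains_operators(item) for item in value)
    return False


def check_avars(raw_avars: str) -> dict:
    """Parse the avars query parameter and reject operator injection."""
    avars = json.loads(raw_avars)
    if contains_operators(avars):
        raise ValueError("aggregation variables must not contain operators")
    return avars


print(check_avars('{"n": "foo"}'))  # {'n': 'foo'}
```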

Although it is highly discouraged, you can disable this check by setting the following property to false in the restheart.yml configuration file.

### Security

# Check whether aggregation variables use operators. Allowing operators in aggregation variables
# is risky: the requester can inject operators that modify the query.

aggregation-check-operators: true