Change Streams

Introduction

Modern web applications need to react promptly and efficiently to data changes in many contexts. The RESTHeart Platform Change Streams feature helps achieve this goal.

“Change streams allow applications to access real-time data changes. […] Because change streams use the aggregation framework, applications can also filter for specific changes.”

RESTHeart exposes change streams as WebSocket server resources, so clients are notified promptly and only when relevant changes occur, avoiding network-expensive practices such as polling.

Change streams require at least MongoDB v3.6 configured as a Replica Set.

Always restart the server after modifying a stream definition, or deleting its collection, to disconnect all clients.

NOTE: for Change Streams to work properly, the HTTP listener must be enabled. This is the default when using our Docker images.

If you want to use this feature while running RESTHeart Platform manually, the following settings are needed in restheart-platform-security:


## restheart-platform-security/etc/security.properties
    root-proxy-pass=http://localhost:8081
    ## NOTE: change streams require HTTP (AJP doesn't support WebSocket)
    ## enable http listener in restheart-platform-core
    ## and set root-proxy-pass=http://localhost:8081
    

The streams collection metadata

In RESTHeart, not only documents but also dbs and collections have properties. Some properties are metadata, i.e. they have a special meaning for RESTHeart that influences its behavior.

The collection metadata property streams declares change streams that clients can watch to be notified of changes to documents, and binds each of them to a given URI.

Change streams need to be defined as collection metadata. It is not possible to define a stream via a query parameter and this is by design: clients are not able to open up arbitrary change streams but only those defined (and tested) by the developers.

streams is an array of stream objects.

Stream metadata object format

stream object format

{
    "uri": <uri>,
    "stages": [
        "<stage_1>",
        "<stage_2>",
        ...
    ]
}

| Property | Description | Mandatory |
|---|---|---|
| uri | specifies the URI where the stream is bound, under the path /<db>/<collection>/_streams | yes |
| stages | the MongoDB aggregation pipeline stages. For more information refer to https://docs.mongodb.org/manual/core/aggregation-pipeline/ | yes |

Notes:

  • Only a subset of aggregation pipeline stages is allowed for this feature. Check MongoDB’s documentation for further information.
  • Stages take Change Events as input, not the modified documents themselves. For example, the modified version of a document after a PATCH request is available in the event.fullDocument property of the stage's input event (see the examples below).

Escaping stage properties

MongoDB does not allow storing fields with names that start with $ or contain dots (.); see Restrictions on Field Names in MongoDB’s documentation.

To allow storing stages that use dollar-prefixed operators or the dot notation (to refer to properties of subdocuments), RESTHeart automatically and transparently escapes the property keys as follows:

  • the $ prefix is “underscore escaped”, e.g. $exists is stored as _$exists
  • if the dot notation has to be used in a key name, dots are replaced with :: e.g. SD.prop is stored as SD::prop

In RESTHeart 1.x, these escapes were not managed automatically and the developer had to apply them explicitly; starting from version 2.0 this is no longer needed.
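The two escaping rules above can be illustrated with a minimal Python sketch. It is not RESTHeart's implementation; the function name escape_keys is hypothetical and the code only mirrors the rules as stated in this section.

```python
def escape_keys(value):
    """Recursively escape keys: a leading '$' becomes '_$', dots become '::'.

    Hypothetical helper that mirrors the documented rules only.
    """
    if isinstance(value, dict):
        escaped = {}
        for key, inner in value.items():
            new_key = key.replace(".", "::")
            if new_key.startswith("$"):
                new_key = "_" + new_key
            escaped[new_key] = escape_keys(inner)
        return escaped
    if isinstance(value, list):
        return [escape_keys(item) for item in value]
    # Values other than dicts and lists are left untouched.
    return value


stage = {"$match": {"fullDocument.name": {"$var": "n"}}}
escaped_stage = escape_keys(stage)
```

Here escaped_stage has the same shape as the stored form used in the examples of this page, with _$match, fullDocument::name and _$var as keys.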

Examples

The following request upserts a collection defining two change streams:

  • all bound at /messages/_streams/all
  • mine bound at /messages/_streams/mine
Request
PUT /messages HTTP/1.1

{
    "streams": [
        {
            "stages": [
                {
                    "_$match": {
                        "_$or": [
                            { "operationType": "insert" },
                            { "operationType": "update" }
                        ]
                    }
                }
            ],
            "uri": "all"
        },
        {
            "stages": [
                {
                    "_$match": {
                        "fullDocument::name": { "_$var": "n" }
                    }
                }
            ],
            "uri": "mine"
        }
    ]
}

Note that the $match stage specifies a condition on the name property using fullDocument::name. This is because the stage receives a Change Event, which wraps the modified document and looks like:

{
    "fullDocument": {
        "_id": {
            "$oid": "5e15ff5779ca449eb20fdd09"
        },
        "message": "hi uji, how are you?",
        "name": "uji",
        "_etag": {
            "$oid": "5e15ff57a2e5700c3459e801"
        }
    },
    "documentKey": {
        "_id": {
            "$oid": "5e15ff5779ca449eb20fdd09"
        }
    },
    "updateDescription": null,
    "operationType": "insert"
}
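To make the role of the fullDocument prefix concrete, the following Python sketch resolves a dotted path against an abbreviated copy of the Change Event above. The helper get_path is hypothetical and only illustrates dot-notation lookup; it is not how MongoDB evaluates $match.

```python
def get_path(document, dotted_path):
    """Follow a dot-notation path through nested dicts; None if it is absent."""
    current = document
    for part in dotted_path.split("."):
        if not isinstance(current, dict) or part not in current:
            return None
        current = current[part]
    return current


# Abbreviated version of the Change Event shown above.
event = {
    "fullDocument": {"message": "hi uji, how are you?", "name": "uji"},
    "updateDescription": None,
    "operationType": "insert",
}

name_in_document = get_path(event, "fullDocument.name")
name_at_top_level = get_path(event, "name")
```

The name property is only reachable through fullDocument, which is why a condition on name alone would never match the event.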

Note that the _links collection property includes the URIs of the change streams (returned with ?rep=SHAL).

Request
GET /messages?rep=SHAL HTTP/1.1
Response
HTTP/1.1 200 OK

...

{
    ...

    "_links": {
        ...,
        "all": {
            "href": "/messages/_streams/all"
        },
        "mine": {
            "href": "/messages/_streams/mine"
        }
    },

    ...

}

Passing variables to change streams

The query parameter avars passes variables to the change stream.

For example, the stream mine defined above uses a variable named “n”. If the variable is not passed via the avars query parameter, the request fails.

Request
GET /messages/_streams/mine HTTP/1.1
Response
HTTP/1.1 400 Bad Request

...

{
    "_exceptions": [
        {
            "exception": "org.restheart.hal.metadata.QueryVariableNotBoundException",
            "exception message": "variable n not bound", 
            ...
        }
    ]
}

Passing the variable n, the request succeeds:

Request
GET /messages/_streams/mine?avars={"n":"uji"} HTTP/1.1
Response
HTTP/1.1 101 Switching Protocols

...
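Because the avars value is a JSON object, a client usually needs to percent-encode it before placing it in the URL. The following Python sketch builds such a URL with the standard library only. The function name stream_url and the ws://localhost:8080 base address are assumptions made for illustration; use the address of your own deployment.

```python
import json
from urllib.parse import quote


def stream_url(base_url, collection, stream, variables):
    """Build a change stream URL with the variables JSON-encoded in avars."""
    # Compact JSON keeps the query string short; quote() percent-encodes it.
    avars = quote(json.dumps(variables, separators=(",", ":")))
    return f"{base_url}/{collection}/_streams/{stream}?avars={avars}"


# The base address below is an assumption, not a RESTHeart default.
url = stream_url("ws://localhost:8080", "messages", "mine", {"n": "uji"})
```

The resulting URL can then be handed to any WebSocket client, which performs the upgrade handshake answered with 101 Switching Protocols.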

Variables in stages or query

Variables can be used in the stages of a change stream as follows:

{ "$var": "<var_name>" }

In the previous example, the stage uses the variable to restrict notifications to changes on documents whose name property matches the variable n:

{ "_$match": { "fullDocument::name": { "_$var": "n" } } }
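Conceptually, each { "$var": "<var_name>" } placeholder is replaced by the value passed in avars before the stages run, and a missing value is an error. The Python sketch below illustrates that idea on the unescaped form of the stage; bind_variables and VariableNotBound are hypothetical names, and RESTHeart's actual implementation may differ.

```python
class VariableNotBound(Exception):
    """Raised when a stage references a variable missing from avars."""


def bind_variables(value, variables):
    """Replace every {"$var": name} placeholder with variables[name]."""
    if isinstance(value, dict):
        if set(value) == {"$var"}:
            name = value["$var"]
            if name not in variables:
                raise VariableNotBound(f"variable {name} not bound")
            return variables[name]
        return {key: bind_variables(inner, variables) for key, inner in value.items()}
    if isinstance(value, list):
        return [bind_variables(item, variables) for item in value]
    return value


stage = {"$match": {"fullDocument.name": {"$var": "n"}}}
bound = bind_variables(stage, {"n": "uji"})
```

With n bound to "uji", the stage becomes a plain $match on fullDocument.name; calling bind_variables with an empty variables object raises the error instead, which corresponds to the 400 response shown earlier.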

Security Information

By default, RESTHeart makes sure that the aggregation variables passed as query parameters do not contain MongoDB operators.

This check protects data from malicious query injection.

Although it is highly discouraged, this check can be disabled by editing the following property in the restheart.yml configuration file.

### Security

# Check if aggregation variables use operators. allowing operators in aggregation variables
# is risky. requester can inject operators modifying the query

aggregation-check-operators: true
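
The kind of check described in this section can be sketched in Python as a recursive scan for keys that start with $. This is an illustration of the idea only: the function name contains_operators is hypothetical, and the exact rules RESTHeart applies may differ.

```python
def contains_operators(value):
    """Return True if any key at any depth starts with '$'."""
    if isinstance(value, dict):
        return any(
            key.startswith("$") or contains_operators(inner)
            for key, inner in value.items()
        )
    if isinstance(value, list):
        return any(contains_operators(item) for item in value)
    return False


safe_vars = {"n": "uji"}
# An injected operator would turn an equality match into "name differs from null".
injected_vars = {"n": {"$ne": None}}
```

With a check of this kind, safe_vars is accepted while injected_vars is rejected, so a requester cannot widen the $match condition by smuggling an operator into a variable.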