Looking for Cloud Services or Professional Support? Check restheart.com

Change Streams

Introduction

If the stream all is defined on the collection messages, clients can connect via WebSocket to ws://mydomain.com/messages/_streams/all and receive real time notification of data changes occurring in the collection.

Modern web applications needs to react with promptness and efficiency to data changes in many contexts. RESTHeart Change Stream feature comes in handy to achieve this goal.

“Change streams allow applications to access real-time data changes. […] Because change streams use the aggregation framework, applications can also filter for specific changes.”

Exposing a WebSocket Server resource, clients may be promptly notified about these changes only if necessary, avoiding network expensive common practices like polling.

Change streams require at least MongoDB v3.6 configured as a Replica Set.

Starting from RESTHeart 5.3.0, when the stream collection metadata is modified or the collection or the db is deleted, all related WebSocket connections are closed and the change streams are consequently updated.

With RESTHeart up to 5.2 a restart of the server is required after modifying a stream definition, or deleting its collection, in order to disconnect all clients.

The streams collection metadata

In RESTHeart, not only documents but also dbs and collections have properties. Some properties are metadata, i.e. they have a special meaning for RESTheart that influences its behavior.

The collection metadata property streams allows to declare change streams that client can watch for to be aware about changes to documents and bind them to given URI.

Change streams need to be defined as collection metadata. It is not possible to define a stream via a query parameter and this is by design: clients are not able to open up arbitrary change streams but only those defined (and tested) by the developers.

streams is an array of stream objects.

Stream metadata object format

stream object format

{
    "uri": <uri>,
    "stages": [
        "<stage_1>",
        "<stage_2>",
        ...
    ]
}
Property Description Mandatory
uri specifies the URI when the stream is bound under the path /<db>/<collection>/_streams yes
stages

the MongoDB aggregation pipeline stages.

For more information refer to https://docs.mongodb.org/manual/core/aggregation-pipeline/

yes

Notes:

  • Only a subset of aggregation pipeline stages are allowed for this features. Check MongoDB’s documentation for further informations.
  • Stages takes as input Change Events instead of the documents themselves. For example, the modified version of a document after a PATCH request is present at event.fullDocument property of the stages input event. (See examples below).

Escape stage properties informations

MongoDB does not allow to store fields with names starting with $ or containing dots (.), see Restrictions on Field Names on MongoDB’s documentation.

In order to allow storing stages with dollar prefixed operators or using the dot notation (to refer to properties of subdocuments), RESTHeart automatically and transparently escapes the properties keys as follows:

  • the $ prefix is “underscore escaped”, e.g. $exists is stored as _$exists
  • if the dot notation has to be used in a key name, dots are replaced with :: e.g. SD.prop is stored as SD::prop

In RESTHeart 1.x, these escapes are not managed automatically: the developer had to explicitly use them; starting from version 2.0 this is not needed anymore.

Examples

The following requests upsert a collection defining two change streams:

  • all bound at /messages/_streams/all
  • mine bound at /messages/_streams/mine
Request
PUT /messages HTTP/1.1

{ 
    "streams" : [ 
      { "stages" : [
          {
              "_$match": {
                "_$or" : [
                    { 
                        "operationType": "insert"
                    },
                    { 
                        "operationType": "update"
                    }
                ]
            }
          }
      ],
        "uri" : "all"
      },
      { "stages" : [ 
          { 
              "_$match" : { 
                "fullDocument::name" : { "_$var" : "n" } 
              } 
          }
        ],
        "uri" : "mine"
      }
    ] 
}

Note that the $match stage specifies a condition on the name property using fullDocument::name. This is because the Change Event looks like:

{
    "fullDocument": {
        "_id": {
            "$oid": "5e15ff5779ca449eb20fdd09"
        },
        "message": "hi uji, how are you?",
        "name": "uji",
        "_etag": {
            "$oid": "5e15ff57a2e5700c3459e801"
        }
    },
    "documentKey": {
        "_id": {
            "$oid": "5e15ff5779ca449eb20fdd09"
        }
    },
    "updateDescription": null,
    "operationType": "insert"
}

Note between the _links collection property the URIs of the change streams (returned with ?rep=SHAL).

Request
GET /messages?rep=SHAL HTTP/1.1
Response
HTTP/1.1 200 OK

...

{
    ...

    "_links": {
        ...,
        "all": {
            "href": "/messages/_streams/all"
        },
        "mine": {
            "href": "/messages/_streams/mine"
        }
    },

    ...

}

Passing variables to change streams

The query parameter avars allows to pass variables to the change stream.

For example, the previous example mine use a variable named “n”. If the variable is not passed via the avars qparam, the request fails.

Request
GET /messages/_streams/mine HTTP/1.1
Response
HTTP/1.1 400 Bad Request

...

{
    "_exceptions": [
        {
            "exception": "org.restheart.exchange.QueryVariableNotBoundException",
            "exception message": "variable n not bound", 
            ...
        }
    ]
}

Passing the variable n, the request succeeds:

Request
GET /messages/_streams/mine?avars={"n":"uji"} HTTP/1.1
Response
HTTP/1.1 101 Switching Protocols

...

Variables in stages or query

Variables can be used in change streams query as follows:

{ "$var": "<var_name>" }

In case of change stream with stage parameter previous example, the variable was used to restrict notifications only to changes on documents with a property name matching the variable n:

{ "_$match": { "fullDocument::name": { "_$var": "n" } } }

Security Informations

By default RESTHeart makes sure that the aggregation variables passed as query parameters hasn’t got inside MongoDB operators.

This behavior is required to protect data from undesirable malicious query injection.

Even though is highly discouraged, is possible to disable this check by editing the following property into restheart.yml configuration file.

### Security

# Check if aggregation variables use operators. allowing operators in aggregation variables
# is risky. requester can inject operators modifying the query

aggregation-check-operators: true