WebSockets and Change Streams

Introduction

The WebSocket API (WebSockets)

The WebSocket API is an advanced technology that makes it possible to open a two-way interactive communication session between the user’s browser and a server. With this API, you can send messages to a server and receive event-driven responses without having to poll the server for a reply.

Ref: https://developer.mozilla.org/en-US/docs/Web/API/WebSockets_API

Figure: WebSocket connection

MongoDB Change Streams

Change streams allow applications to access real-time data changes without the complexity and risk of tailing the oplog. Applications can use change streams to subscribe to all data changes on a single collection, a database, or an entire deployment, and immediately react to them. Because change streams use the aggregation framework, applications can also filter for specific changes or transform the notifications at will.

Ref: https://docs.mongodb.com/manual/changeStreams/

RESTHeart WebSockets

RESTHeart embeds a WebSocket server that exposes MongoDB’s change streams to web browsers and to any HTTP/WebSocket client (Postman, for example, supports WebSocket connections).

With RESTHeart it is possible to build web or mobile apps that are notified asynchronously, in real time, of data changes. Because change streams use the aggregation framework, applications can also filter for specific changes.

For example, if the stream all is defined on the collection messages, clients can connect via WebSocket to ws://mydomain.com/messages/_streams/all and receive real-time notifications of data changes occurring in the collection.
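As an illustration of this URL pattern, the following Python sketch builds the WebSocket URL of a stream declared on a collection. The function stream_url and its host and secure parameters are hypothetical, introduced only for this example; the path layout follows the ws://mydomain.com/messages/_streams/all example above.

```python
from urllib.parse import quote


def stream_url(host: str, collection: str, stream_uri: str, secure: bool = False) -> str:
    """Build ws(s)://<host>/<collection>/_streams/<uri> (hypothetical helper)."""
    scheme = "wss" if secure else "ws"
    # Percent-encode each path segment so unusual names cannot break the URL.
    segments = (collection, "_streams", stream_uri)
    path = "/".join(quote(segment, safe="") for segment in segments)
    return f"{scheme}://{host}/{path}"


print(stream_url("mydomain.com", "messages", "all"))
# ws://mydomain.com/messages/_streams/all
```

Use secure=True when the server is behind TLS, so that the client connects with wss://.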

Because changes are pushed over a WebSocket connection, clients are notified promptly and only when something actually changes, avoiding network-expensive practices such as polling.

Change streams require MongoDB 3.6 or later, configured as a replica set.

Starting from RESTHeart 5.3.0, when the streams collection metadata is modified, or when the collection or the database is deleted, all related WebSocket connections are closed and the change streams are updated accordingly.

How it works

The “streams” collection metadata

In RESTHeart, not only documents but also databases and collections have properties. Some properties are metadata, i.e. they have a special meaning for RESTHeart that influences its behavior.

The collection metadata property streams declares the change streams that clients can watch to be notified of changes to documents, and binds each stream to a given URI.

Change streams must be defined as collection metadata. It is not possible to define a stream via a query parameter, and this is by design: clients cannot open arbitrary change streams, only those defined (and tested) by the developers.

streams is an array of stream objects.

Stream metadata object format

stream object format

{
    "uri": "<uri>",
    "stages": [
        <stage_1>,
        <stage_2>,
        ...
    ]
}
Property | Description | Mandatory
uri | the URI the stream is bound to, under the path /<db>/<collection>/_streams | yes
stages | the MongoDB aggregation pipeline stages; for more information refer to https://docs.mongodb.org/manual/core/aggregation-pipeline/ | yes
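A client-side sanity check can catch a malformed stream object before it is sent. The sketch below is hypothetical (validate_stream is not part of RESTHeart) and only encodes the two rules in the table: uri and stages are mandatory, and stages is an array of stage objects.

```python
from typing import Any


def validate_stream(stream: dict[str, Any]) -> list[str]:
    """Return the problems found in a stream object (empty list if none).

    Hypothetical helper: it checks only the mandatory properties listed above.
    """
    problems = []
    uri = stream.get("uri")
    if not isinstance(uri, str) or not uri:
        problems.append("uri is mandatory and must be a non-empty string")
    stages = stream.get("stages")
    if not isinstance(stages, list):
        problems.append("stages is mandatory and must be an array")
    elif not all(isinstance(stage, dict) for stage in stages):
        problems.append("each stage must be an object")
    return problems


print(validate_stream({"uri": "all", "stages": [{"_$match": {"operationType": "insert"}}]}))
# []
```

This does not check which stages MongoDB accepts in a change stream; that validation still happens on the server.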

Notes:

  • Only a subset of aggregation pipeline stages is allowed in change streams. Check MongoDB’s documentation for details.
  • Stages take change events as input, not the documents themselves. For example, after a PATCH request the modified version of the document is available in the fullDocument property of the event (see the examples below).

Escaping stage property keys

MongoDB does not allow storing fields whose names start with $ or contain dots (.); see Restrictions on Field Names in MongoDB’s documentation.

To allow storing stages that use dollar-prefixed operators or dot notation (to refer to properties of subdocuments), RESTHeart automatically and transparently escapes property keys as follows:

  • the $ prefix is “underscore escaped”, e.g. $exists is stored as _$exists
  • if the dot notation has to be used in a key name, dots are replaced with ::, e.g. SD.prop is stored as SD::prop

In RESTHeart 1.x these escapes were not managed automatically and the developer had to use them explicitly; starting from version 2.0 this is no longer needed.
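To make the two rules concrete, here is a minimal Python sketch of the escaping. It is not RESTHeart’s code (RESTHeart applies these rules for you); escape_keys is a hypothetical helper that walks a stage definition and rewrites its keys the same way.

```python
from typing import Any


def escape_keys(value: Any) -> Any:
    """Recursively escape object keys: '$' prefix -> '_$', '.' -> '::'.

    Hypothetical helper mimicking the documented rules; values are untouched.
    """
    if isinstance(value, dict):
        escaped = {}
        for key, item in value.items():
            if key.startswith("$"):
                key = "_" + key  # $exists -> _$exists
            key = key.replace(".", "::")  # SD.prop -> SD::prop
            escaped[key] = escape_keys(item)
        return escaped
    if isinstance(value, list):
        return [escape_keys(item) for item in value]
    return value  # scalars, including string values, are left as they are


print(escape_keys({"$match": {"fullDocument.name": {"$var": "n"}}}))
# {'_$match': {'fullDocument::name': {'_$var': 'n'}}}
```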

Examples

The following request upserts a collection defining two change streams:

  • all bound at /messages/_streams/all
  • mine bound at /messages/_streams/mine
Request
PUT /messages HTTP/1.1

{
    "streams": [
        {
            "stages": [
                {
                    "_$match": {
                        "_$or": [
                            { "operationType": "insert" },
                            { "operationType": "update" }
                        ]
                    }
                }
            ],
            "uri": "all"
        },
        {
            "stages": [
                {
                    "_$match": {
                        "fullDocument::name": { "_$var": "n" }
                    }
                }
            ],
            "uri": "mine"
        }
    ]
}
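The same request can be prepared from a script. This sketch rests on assumptions: RESTHeart is reachable at http://localhost:8080 (a hypothetical BASE_URL), and authentication, which a real deployment normally requires, is omitted. The request is only built, not sent.

```python
import json
import urllib.request

# Assumption: address of the RESTHeart instance; adjust for your deployment.
BASE_URL = "http://localhost:8080"

# The two streams from the request above, keys already escaped.
STREAMS = [
    {"uri": "all", "stages": [{"_$match": {"_$or": [
        {"operationType": "insert"}, {"operationType": "update"}]}}]},
    {"uri": "mine", "stages": [{"_$match": {"fullDocument::name": {"_$var": "n"}}}]},
]


def build_put_request(collection: str, streams: list) -> urllib.request.Request:
    """Prepare PUT /<collection> with the streams metadata as a JSON body."""
    body = json.dumps({"streams": streams}).encode("utf-8")
    return urllib.request.Request(
        f"{BASE_URL}/{collection}",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


req = build_put_request("messages", STREAMS)
# urllib.request.urlopen(req) would send it (credentials not included here).
```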

Note that the $match stage of the mine stream specifies a condition on the name property using fullDocument::name, the escaped form of fullDocument.name. This is because stages receive change events, which look like this:

{
    "fullDocument": {
        "_id": {
            "$oid": "5e15ff5779ca449eb20fdd09"
        },
        "message": "hi uji, how are you?",
        "name": "uji",
        "_etag": {
            "$oid": "5e15ff57a2e5700c3459e801"
        }
    },
    "documentKey": {
        "_id": {
            "$oid": "5e15ff5779ca449eb20fdd09"
        }
    },
    "updateDescription": null,
    "operationType": "insert"
}
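The following sketch shows why the fullDocument prefix matters. It is a deliberately simplified, equality-only stand-in for $match (not MongoDB’s matcher), and the event is the one above with the _id and _etag fields dropped: the name field is reachable only through fullDocument.

```python
from typing import Any

_MISSING = object()  # sentinel for "path not found"


def get_path(doc: dict, dotted: str) -> Any:
    """Follow a dot-notation path such as 'fullDocument.name'."""
    current: Any = doc
    for part in dotted.split("."):
        if not isinstance(current, dict) or part not in current:
            return _MISSING
        current = current[part]
    return current


def matches(event: dict, dotted: str, expected: Any) -> bool:
    """Equality-only imitation of {"$match": {dotted: expected}}."""
    return get_path(event, dotted) == expected


event = {
    "fullDocument": {"message": "hi uji, how are you?", "name": "uji"},
    "updateDescription": None,
    "operationType": "insert",
}
print(matches(event, "fullDocument.name", "uji"))  # True
print(matches(event, "name", "uji"))  # False: name is not a top-level field
```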

The URIs of the change streams are listed in the _links property of the collection, which is returned with ?rep=SHAL.

Request
GET /messages?rep=SHAL HTTP/1.1
Response
HTTP/1.1 200 OK

...

{
    ...

    "_links": {
        ...,
        "all": {
            "href": "/messages/_streams/all"
        },
        "mine": {
            "href": "/messages/_streams/mine"
        }
    },

    ...

}
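A client can discover the available streams from this representation. The sketch below assumes that stream links are the _links entries whose href contains /_streams/; stream_links is a hypothetical helper, and the response body is a trimmed, hypothetical example with a self link standing in for the elided entries.

```python
import json


def stream_links(collection_rep: dict) -> dict[str, str]:
    """Return {name: href} for the _links entries pointing to change streams."""
    links = collection_rep.get("_links", {})
    return {
        name: link["href"]
        for name, link in links.items()
        # Assumption: stream hrefs are recognizable by their /_streams/ segment.
        if isinstance(link, dict) and "/_streams/" in link.get("href", "")
    }


response_body = json.loads("""
{"_links": {"self": {"href": "/messages"},
            "all": {"href": "/messages/_streams/all"},
            "mine": {"href": "/messages/_streams/mine"}}}
""")
print(stream_links(response_body))
# {'all': '/messages/_streams/all', 'mine': '/messages/_streams/mine'}
```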

Passing variables to change streams

The avars query parameter passes variables to the change stream.

For example, the stream mine defined above uses a variable named n. If the variable is not passed via the avars query parameter, the request fails:

Request
GET /messages/_streams/mine HTTP/1.1
Response
HTTP/1.1 400 Bad Request

...

{
    "_exceptions": [
        {
            "exception": "org.restheart.exchange.QueryVariableNotBoundException",
            "exception message": "variable n not bound", 
            ...
        }
    ]
}

When the variable n is passed, the request succeeds and the connection is upgraded to WebSocket:

Request
GET /messages/_streams/mine?avars={"n":"uji"} HTTP/1.1
Response
HTTP/1.1 101 Switching Protocols

...
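In practice the JSON value of avars is usually URL-encoded before it is put in the URL. A minimal sketch with the standard library; with_avars is a hypothetical helper and it assumes the URL has no query string yet.

```python
import json
from urllib.parse import urlencode


def with_avars(url: str, avars: dict) -> str:
    """Append avars to a stream URL as a URL-encoded JSON object."""
    query = urlencode({"avars": json.dumps(avars, separators=(",", ":"))})
    return f"{url}?{query}"


print(with_avars("ws://mydomain.com/messages/_streams/mine", {"n": "uji"}))
# ws://mydomain.com/messages/_streams/mine?avars=%7B%22n%22%3A%22uji%22%7D
```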

Variables in stages or query

Variables can be used in change stream stages as follows:

{ "$var": "<var_name>" }

In the mine stream of the previous example, the variable restricts notifications to changes on documents whose name property matches the variable n:

{ "_$match": { "fullDocument::name": { "_$var": "n" } } }
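Conceptually, binding replaces each variable reference with the value passed in avars and fails when a variable is missing. The sketch below imitates that behavior for illustration only; bind_vars and VariableNotBound are hypothetical names, and RESTHeart’s actual implementation may differ. It accepts both the $var form and the escaped _$var form.

```python
from typing import Any


class VariableNotBound(Exception):
    """Hypothetical stand-in for QueryVariableNotBoundException."""


def bind_vars(node: Any, avars: dict) -> Any:
    """Return a copy of node with every {"$var": name} replaced by avars[name]."""
    if isinstance(node, dict):
        if len(node) == 1:
            key, name = next(iter(node.items()))
            # Accept the plain and the escaped (stored) form of the operator.
            if key in ("$var", "_$var") and isinstance(name, str):
                if name not in avars:
                    raise VariableNotBound(f"variable {name} not bound")
                return avars[name]
        return {k: bind_vars(v, avars) for k, v in node.items()}
    if isinstance(node, list):
        return [bind_vars(item, avars) for item in node]
    return node  # scalars are returned unchanged


stage = {"_$match": {"fullDocument::name": {"_$var": "n"}}}
print(bind_vars(stage, {"n": "uji"}))
# {'_$match': {'fullDocument::name': 'uji'}}
```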

Security Information

By default, RESTHeart checks that the aggregation variables passed as query parameters do not contain MongoDB operators.

This check protects data from malicious query injection.
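To see what the check guards against, consider the mine stream: if avars could contain operators, passing {"n": {"$ne": "uji"}} would turn the filter into "name is not uji" and expose other users’ messages. The following sketch approximates such a check by rejecting any object key that starts with $; contains_operator is a hypothetical helper, not RESTHeart’s implementation.

```python
from typing import Any


def contains_operator(value: Any) -> bool:
    """True if any object key inside value starts with '$' (a MongoDB operator)."""
    if isinstance(value, dict):
        return any(
            key.startswith("$") or contains_operator(item)
            for key, item in value.items()
        )
    if isinstance(value, list):
        return any(contains_operator(item) for item in value)
    return False  # plain values such as "uji" are safe


print(contains_operator({"n": "uji"}))  # False
print(contains_operator({"n": {"$ne": "uji"}}))  # True
```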

Although it is highly discouraged, this check can be disabled by editing the following property in the restheart.yml configuration file (set it to false to disable the check):

### Security

# Check if aggregation variables use operators. Allowing operators in aggregation variables
# is risky: the requester can inject operators modifying the query

aggregation-check-operators: true