Tutorial: Change Streams over SSE

Note
SSE support is available from RESTHeart 9.3 onwards.

This tutorial guides you through streaming real-time MongoDB change events to HTTP clients using Server-Sent Events (SSE). You will:

  • Define a change stream on a MongoDB collection

  • Connect to the stream with Accept: text/event-stream

  • Observe live events in a browser and from the command line

  • Resume a stream after a disconnect using Last-Event-ID

  • Configure ACL permissions for SSE access

Prerequisites

Tip
To test SSE from the command line, use HTTPie (brew install httpie) or plain curl. Both support streaming responses.

Step 1: Create a Collection with a Change Stream

Change streams must be declared as collection metadata. This gives developers full control over which streams are exposed and what pipeline stages they apply.

We create a messages collection with one stream called all that forwards every insert and update:

cURL
curl -i -X PUT 'http://localhost:8080/messages' \
  -u admin:secret \
  -H 'Content-Type: application/json' \
  -d '{
    "streams": [
      {
        "uri": "all",
        "stages": [
          {
            "_$match": {
              "_$or": [
                { "operationType": "insert" },
                { "operationType": "update" }
              ]
            }
          }
        ]
      }
    ]
  }'
HTTPie
echo '{
  "streams": [
    {
      "uri": "all",
      "stages": [
        {
          "_$match": {
            "_$or": [
              { "operationType": "insert" },
              { "operationType": "update" }
            ]
          }
        }
      ]
    }
  ]
}' | http PUT http://localhost:8080/messages \
  'Authorization:Basic YWRtaW46c2VjcmV0' \
  'Content-Type:application/json'

A 201 Created response confirms the stream definition was saved (expect 200 OK instead if the messages collection already existed). The stream is now accessible at /messages/_streams/all.

Note
MongoDB operators starting with $ are stored with a _$ prefix (e.g., $match is written as _$match). RESTHeart converts them automatically. See Change Streams documentation for details.
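If you prefer to write pipelines with native MongoDB operator names, the _$ prefix used in the request body above can be applied programmatically. The following is a minimal sketch: escapeOperators is a hypothetical helper, not part of RESTHeart, and it renames every key that starts with $, so it is only meant for pipeline stages like the one in this step.

```javascript
// Hypothetical helper (not a RESTHeart API): recursively rename keys that
// start with "$" to "_$" so a native pipeline matches the stored form.
function escapeOperators(value) {
  if (Array.isArray(value)) {
    return value.map(escapeOperators);
  }
  if (value !== null && typeof value === 'object') {
    return Object.fromEntries(
      Object.entries(value).map(([key, inner]) => [
        key.startsWith('$') ? `_${key}` : key,
        escapeOperators(inner),
      ])
    );
  }
  return value; // strings, numbers, booleans, null are left untouched
}

// The same pipeline as in the cURL example, written with native operators.
const nativeStages = [
  { $match: { $or: [{ operationType: 'insert' }, { operationType: 'update' }] } },
];

// Body for PUT /messages, equivalent to the JSON shown in this step.
const streamDefinition = {
  streams: [{ uri: 'all', stages: escapeOperators(nativeStages) }],
};

console.log(JSON.stringify(streamDefinition, null, 2));
```

The printed JSON can be sent as the body of the PUT request shown above.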

Step 2: Connect via SSE

To open an SSE connection, send a GET request to the stream URL with Accept: text/event-stream. The response remains open and events arrive as they happen.

cURL
curl -N \
  -H 'Accept: text/event-stream' \
  -H 'Authorization: Basic YWRtaW46c2VjcmV0' \
  http://localhost:8080/messages/_streams/all

The -N (--no-buffer) flag disables curl's output buffering so events are printed as soon as they arrive.

HTTPie
http --stream GET http://localhost:8080/messages/_streams/all \
  'Accept:text/event-stream' \
  'Authorization:Basic YWRtaW46c2VjcmV0'
Browser (JavaScript EventSource)
// Base64("admin:secret") = "YWRtaW46c2VjcmV0"
const es = new EventSource('http://localhost:8080/messages/_streams/all', {
  // credentials are passed via a token in practice; basic auth on EventSource
  // requires a proxy or an auth token — see the ACL step below for unauthenticated access
});

es.addEventListener('change', event => {
  const data = JSON.parse(event.data);
  console.log('operationType:', data.operationType);
  console.log('document:', data.fullDocument);
});

es.onerror = err => console.error('SSE error', err);
Note
The browser EventSource API does not support custom request headers. For authenticated access from the browser, either allow $unauthenticated on the stream path (see Step 6) or pass a JWT as a query parameter and validate it with a custom authentication mechanism.

The connection is now open and waiting for events. Leave it running and continue to the next step.
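The Authorization value used in the cURL and HTTPie commands is the Base64 encoding of admin:secret. The sketch below shows how that header pair could be built in JavaScript, for example to pass to a fetch-based client that, unlike EventSource, accepts custom headers. The function name sseHeaders is an assumption made for illustration.

```javascript
// Hypothetical helper: build the request headers used in this step.
function sseHeaders(user, password) {
  // btoa is available in browsers and in Node.js 16+; it expects ASCII input.
  const token = btoa(`${user}:${password}`);
  return {
    Accept: 'text/event-stream',
    Authorization: `Basic ${token}`,
  };
}

const headers = sseHeaders('admin', 'secret');
console.log(headers.Authorization); // Basic YWRtaW46c2VjcmV0
```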

Step 3: Insert a Document and Observe the Event

In a second terminal, insert a document into the messages collection:

cURL
curl -i -X POST 'http://localhost:8080/messages' \
  -u admin:secret \
  -H 'Content-Type: application/json' \
  -d '{"message": "Hello SSE!", "name": "uji"}'
HTTPie
echo '{"message": "Hello SSE!", "name": "uji"}' \
  | http POST http://localhost:8080/messages \
    'Authorization:Basic YWRtaW46c2VjcmV0' \
    'Content-Type:application/json'

The SSE connection in the first terminal immediately receives the event:

event:change
data:{"fullDocument":{"_id":{"$oid":"62166d53ebdcd56455a1a7ab"},"message":"Hello SSE!","name":"uji","_etag":{"$oid":"62166d53ebdcd56455a1a7aa"}},"documentKey":{"_id":{"$oid":"62166d53ebdcd56455a1a7ab"}},"operationType":"insert"}

Step 4: Understanding the Event Format

Every SSE event from a MongoDB change stream contains two non-blank lines:

event:change
data:<json>
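For clients that read the raw response instead of using EventSource, the format above can be parsed with a few lines of code. This is a minimal sketch, assuming the whole text of one or more complete events is already in memory; a real client would also need to buffer partial chunks. parseSseEvents is a hypothetical name, and the id field is handled only because the SSE format allows it.

```javascript
// Parse SSE text into events. Events are separated by a blank line; each
// non-blank line is "field:value" (one optional space after the colon is
// dropped, as the SSE format allows).
function parseSseEvents(text) {
  const events = [];
  for (const block of text.split(/\r?\n\r?\n/)) {
    const event = { event: 'message', data: [] };
    for (const line of block.split(/\r?\n/)) {
      if (line === '' || line.startsWith(':')) continue; // blank or comment
      const colon = line.indexOf(':');
      const field = colon === -1 ? line : line.slice(0, colon);
      let value = colon === -1 ? '' : line.slice(colon + 1);
      if (value.startsWith(' ')) value = value.slice(1);
      if (field === 'event') event.event = value;
      else if (field === 'data') event.data.push(value);
      else if (field === 'id') event.id = value;
    }
    // Blocks without data are not dispatched.
    if (event.data.length > 0) {
      events.push({ ...event, data: event.data.join('\n') });
    }
  }
  return events;
}

// Shortened version of the insert event from Step 3.
const raw =
  'event:change\n' +
  'data:{"documentKey":{"_id":{"$oid":"62166d53ebdcd56455a1a7ab"}},"operationType":"insert"}\n' +
  '\n';

const [first] = parseSseEvents(raw);
const payload = JSON.parse(first.data);
console.log(first.event, payload.operationType); // change insert
```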

The data JSON payload contains:

Field | Description
----- | -----------
fullDocument | The complete document after the change. For update operations with lookup enabled, this reflects the document’s current state.
documentKey | The _id of the changed document.
operationType | The type of change: insert, update, replace, delete, invalidate.
updateDescription | For update operations: updatedFields and removedFields.

A typical update event looks like:

{
    "fullDocument": {
        "_id": { "$oid": "62166d53ebdcd56455a1a7ab" },
        "message": "Hello SSE! (edited)",
        "name": "uji"
    },
    "documentKey": {
        "_id": { "$oid": "62166d53ebdcd56455a1a7ab" }
    },
    "updateDescription": {
        "updatedFields": { "message": "Hello SSE! (edited)" },
        "removedFields": []
    },
    "operationType": "update"
}
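A client typically branches on operationType and reads only the fields that apply to that operation. The sketch below illustrates this with the update event shown above. describeChange is a hypothetical helper, and it assumes the document key is an ObjectId (falling back to "unknown" otherwise).

```javascript
// Hypothetical helper: turn a parsed change event into a one-line summary.
function describeChange(change) {
  const id = change.documentKey?._id?.$oid ?? 'unknown';
  switch (change.operationType) {
    case 'insert':
    case 'replace':
      return `${change.operationType} ${id}`;
    case 'update': {
      // updateDescription is only present for update operations.
      const updated = Object.keys(change.updateDescription?.updatedFields ?? {});
      const removed = change.updateDescription?.removedFields ?? [];
      return `update ${id}: set [${updated.join(', ')}], unset [${removed.join(', ')}]`;
    }
    case 'delete':
      return `delete ${id}`;
    default:
      return `${change.operationType} (no handler)`;
  }
}

const updateEvent = {
  fullDocument: {
    _id: { $oid: '62166d53ebdcd56455a1a7ab' },
    message: 'Hello SSE! (edited)',
    name: 'uji',
  },
  documentKey: { _id: { $oid: '62166d53ebdcd56455a1a7ab' } },
  updateDescription: {
    updatedFields: { message: 'Hello SSE! (edited)' },
    removedFields: [],
  },
  operationType: 'update',
};

console.log(describeChange(updateEvent));
// update 62166d53ebdcd56455a1a7ab: set [message], unset []
```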

Step 5: Resume After Disconnect with Last-Event-ID

A key advantage of SSE over WebSocket is the built-in resume protocol. When a client disconnects and reconnects, it sends the Last-Event-ID header containing the id of the last event it received. RESTHeart resumes the change stream from that position, so no events are lost as long as the resume token is still valid.

RESTHeart uses MongoDB resume tokens as event IDs. To resume manually:

curl -N \
  -H 'Accept: text/event-stream' \
  -H 'Authorization: Basic YWRtaW46c2VjcmV0' \
  -H 'Last-Event-ID: {"_data":"82..."}' \
  http://localhost:8080/messages/_streams/all
Note
The EventSource API in the browser handles Last-Event-ID automatically. When the connection drops, the browser reconnects and sends the ID of the last event it received. No additional client code is required.
Important
Resume tokens are only valid as long as the MongoDB oplog has not been truncated past the token’s position. For long-lived applications, ensure your replica set’s oplogSizeMB is large enough.
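Clients that do not use EventSource have to implement the resume step themselves: remember the id of the last event received and send it back on reconnect. The following is a minimal sketch of that bookkeeping. ResumeTracker is a hypothetical class, the ids token-1 and token-2 are placeholders rather than real resume tokens, and the sketch assumes each event carries an id field.

```javascript
// Hypothetical helper: track the last event id and build reconnect headers.
class ResumeTracker {
  constructor() {
    this.lastEventId = null;
  }

  // Call for every received event; events without an id are ignored.
  record(event) {
    if (event.id !== undefined && event.id !== '') {
      this.lastEventId = event.id;
    }
  }

  // Headers for the next connection attempt. Last-Event-ID is only added
  // once at least one id has been seen.
  headers(base = {}) {
    return this.lastEventId === null
      ? { ...base }
      : { ...base, 'Last-Event-ID': this.lastEventId };
  }
}

const tracker = new ResumeTracker();
const baseHeaders = { Accept: 'text/event-stream' };

const firstConnect = tracker.headers(baseHeaders); // no Last-Event-ID yet
tracker.record({ id: 'token-1', event: 'change', data: '{}' });
tracker.record({ id: 'token-2', event: 'change', data: '{}' });
const reconnect = tracker.headers(baseHeaders);

console.log(reconnect['Last-Event-ID']); // token-2
```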

Step 6: Configure ACL Permissions

By default, SSE connections to /_streams/ endpoints require authentication. To allow unauthenticated access (for example, in a public dashboard), create an ACL rule:

cURL
curl -i -X POST 'http://localhost:8080/acl' \
  -u admin:secret \
  -H 'Content-Type: application/json' \
  -d '{
    "_id": "unauthenticatedCanConnectToMessagesStream",
    "predicate": "path-prefix(/messages/_streams/all)",
    "priority": 0,
    "roles": ["$unauthenticated"]
  }'
HTTPie
echo '{
  "_id": "unauthenticatedCanConnectToMessagesStream",
  "predicate": "path-prefix(/messages/_streams/all)",
  "priority": 0,
  "roles": ["$unauthenticated"]
}' | http POST http://localhost:8080/acl \
  'Authorization:Basic YWRtaW46c2VjcmV0' \
  'Content-Type:application/json'

With this rule in place, the browser EventSource API can connect without credentials:

const es = new EventSource('http://localhost:8080/messages/_streams/all');
es.addEventListener('change', event => console.log(JSON.parse(event.data)));
Warning
Only allow unauthenticated access in development or when the stream contains non-sensitive data. In production, use JWT tokens or session-based authentication.
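If several streams need a rule like this, the ACL document can be generated rather than written by hand. The sketch below mirrors the fields of the rule shown in this step. unauthenticatedStreamRule and its _id naming scheme are assumptions made for illustration.

```javascript
// Hypothetical helper: build an ACL rule document for one stream path,
// using the same fields as the rule created in this step.
function unauthenticatedStreamRule(collection, streamUri) {
  return {
    _id: `unauthenticatedCanConnectTo_${collection}_${streamUri}`,
    predicate: `path-prefix(/${collection}/_streams/${streamUri})`,
    priority: 0,
    roles: ['$unauthenticated'],
  };
}

const rule = unauthenticatedStreamRule('messages', 'all');
console.log(JSON.stringify(rule, null, 2));
```

The resulting object can be posted to /acl in the same way as the cURL example.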

Step 7: Filtering Events with Variables

The same pipeline variable mechanism available in WebSocket streams works for SSE. Pass avars as a query parameter to filter events server-side:

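# Only receive events for documents where name = "uji"
# -G with --data-urlencode appends avars to the query string URL-encoded,
# which also keeps curl from treating the braces as a URL glob pattern
curl -N -G \
  -H 'Accept: text/event-stream' \
  -H 'Authorization: Basic YWRtaW46c2VjcmV0' \
  --data-urlencode 'avars={"n":"uji"}' \
  'http://localhost:8080/messages/_streams/mine'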

This requires the mine stream to be defined with a stage that uses the $var operator (written as _$var):

{
  "uri": "mine",
  "stages": [
    { "_$match": { "fullDocument::name": { "_$var": "n" } } }
  ]
}

See Using Variables in Change Streams for the full reference.
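When the stream URL is built in code, for example for EventSource, the avars JSON has to be URL-encoded. The sketch below shows one way to do that. streamUrl is a hypothetical helper and the base URL is the one used throughout this tutorial.

```javascript
// Hypothetical helper: build a stream URL, optionally with avars.
function streamUrl(baseUrl, collection, streamUri, avars) {
  const path = `${baseUrl}/${collection}/_streams/${streamUri}`;
  if (avars === undefined) return path;
  // avars is a JSON object; encode it so braces, quotes and colons
  // survive as a query parameter value.
  return `${path}?avars=${encodeURIComponent(JSON.stringify(avars))}`;
}

const url = streamUrl('http://localhost:8080', 'messages', 'mine', { n: 'uji' });
console.log(url);
// http://localhost:8080/messages/_streams/mine?avars=%7B%22n%22%3A%22uji%22%7D
```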

Sharing a Stream Between SSE and WebSocket Clients

SSE and WebSocket clients watching the same stream share the same MongoDB cursor. A single cursor fans out to all connected clients regardless of their transport. You can mix SSE and WebSocket consumers on the same stream URL simultaneously.

# Terminal 1: SSE client
curl -N -H 'Accept: text/event-stream' -u admin:secret \
  http://localhost:8080/messages/_streams/all

# Terminal 2: WebSocket client (requires websocat)
websocat --text - autoreconnect:ws://admin:secret@127.0.0.1:8080/messages/_streams/all

Both terminals receive the same events when a document changes.
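The fan-out idea can be pictured with a small in-memory model: one source publishes each change once, and every subscriber frames it for its own transport. This is a conceptual sketch only, not RESTHeart's implementation. StreamFanOut is a hypothetical class, and the assumption that WebSocket clients receive the bare JSON text is made for illustration.

```javascript
// Conceptual model: one publisher, many subscribers with different framing.
class StreamFanOut {
  constructor() {
    this.subscribers = new Set();
  }

  // Register a callback; returns a function that removes it again.
  subscribe(send) {
    this.subscribers.add(send);
    return () => this.subscribers.delete(send);
  }

  // Serialize the change once and hand it to every subscriber.
  publish(change) {
    const json = JSON.stringify(change);
    for (const send of this.subscribers) send(json);
  }
}

const sseOut = []; // what an SSE client would read
const wsOut = [];  // what a WebSocket client would read (assumed format)

const fanOut = new StreamFanOut();
fanOut.subscribe(json => sseOut.push(`event:change\ndata:${json}\n\n`));
const unsubscribeWs = fanOut.subscribe(json => wsOut.push(json));

fanOut.publish({ operationType: 'insert' }); // reaches both clients
unsubscribeWs();                             // WebSocket client disconnects
fanOut.publish({ operationType: 'update' }); // reaches only the SSE client

console.log(sseOut.length, wsOut.length); // 2 1
```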

Summary

You have successfully:

  • ✓ Defined a change stream on a MongoDB collection

  • ✓ Connected to the stream using Accept: text/event-stream

  • ✓ Received live change events via SSE

  • ✓ Used Last-Event-ID to resume after a disconnect

  • ✓ Configured ACL permissions for the SSE endpoint

  • ✓ Filtered events with aggregation pipeline variables

Next Steps