Tutorial: Change Streams over SSE
Note: SSE support is available from RESTHeart 9.3 onwards.
This tutorial guides you through streaming real-time MongoDB change events to HTTP clients using Server-Sent Events (SSE). You will:
- Define a change stream on a MongoDB collection
- Connect to the stream with Accept: text/event-stream
- Observe live events in a browser and from the command line
- Resume a stream after a disconnect using Last-Event-ID
- Configure ACL permissions for SSE access
Prerequisites
- MongoDB configured as a Replica Set (required for change streams)
- RESTHeart running (see Quick Start)
Tip: To test SSE from the command line, use HTTPie (brew install httpie) or plain curl. Both support streaming responses.
Step 1: Create a Collection with a Change Stream
Change streams must be declared as collection metadata. This gives developers full control over which streams are exposed and what pipeline stages they apply.
We create a messages collection with one stream called all that forwards every insert and update:
cURL
curl -i -X PUT 'http://localhost:8080/messages' \
-u admin:secret \
-H 'Content-Type: application/json' \
-d '{
"streams": [
{
"uri": "all",
"stages": [
{
"_$match": {
"_$or": [
{ "operationType": "insert" },
{ "operationType": "update" }
]
}
}
]
}
]
}'
HTTPie
echo '{
"streams": [
{
"uri": "all",
"stages": [
{
"_$match": {
"_$or": [
{ "operationType": "insert" },
{ "operationType": "update" }
]
}
}
]
}
]
}' | http PUT http://localhost:8080/messages \
'Authorization:Basic YWRtaW46c2VjcmV0' \
'Content-Type:application/json'
A 201 Created response confirms the stream definition was saved. The stream is now accessible at /messages/_streams/all.
Note: MongoDB operators, which start with $, are stored with a _$ prefix (e.g., $match is written as _$match). RESTHeart converts them automatically. See the Change Streams documentation for details.
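When stream definitions are generated from code rather than typed by hand, the same metadata can be assembled programmatically. The following is a minimal JavaScript sketch, not part of RESTHeart: the helper name buildStreamDefinition and its parameters are hypothetical, and the output simply mirrors the JSON body sent in the PUT request above.

```javascript
// Hypothetical helper: builds the collection metadata for one change stream
// that forwards only the given operation types (mirrors the body sent in Step 1).
function buildStreamDefinition(uri, operationTypes) {
  return {
    streams: [
      {
        uri,
        stages: [
          {
            // "_$" is the stored form of MongoDB's "$" operator prefix
            _$match: {
              _$or: operationTypes.map(operationType => ({ operationType })),
            },
          },
        ],
      },
    ],
  };
}

// A stream named "all" that matches insert and update events.
const streamBody = JSON.stringify(buildStreamDefinition('all', ['insert', 'update']), null, 2);
console.log(streamBody);
```

The resulting string can be used as the request body of the PUT shown above.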
Step 2: Connect via SSE
To open an SSE connection, send a GET request to the stream URL with Accept: text/event-stream. The response remains open and events arrive as they happen.
cURL
curl -N \
-H 'Accept: text/event-stream' \
-H 'Authorization: Basic YWRtaW46c2VjcmV0' \
http://localhost:8080/messages/_streams/all
The -N flag disables response buffering so events are printed immediately.
HTTPie
http --stream GET http://localhost:8080/messages/_streams/all \
'Accept:text/event-stream' \
'Authorization:Basic YWRtaW46c2VjcmV0'
Browser (JavaScript EventSource)
// Base64("admin:secret") = "YWRtaW46c2VjcmV0"
// EventSource cannot send an Authorization header: in practice credentials are passed
// via a token or a proxy, or the stream allows unauthenticated access (see the ACL step below)
const es = new EventSource('http://localhost:8080/messages/_streams/all');
es.addEventListener('change', event => {
const data = JSON.parse(event.data);
console.log('operationType:', data.operationType);
console.log('document:', data.fullDocument);
});
es.onerror = err => console.error('SSE error', err);
Note: The browser EventSource API does not support custom request headers. For authenticated access from the browser, either allow $unauthenticated on the stream path (see Step 6) or use a JWT token passed as a query parameter via a custom auth mechanism.
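Where a header-based login is needed, the stream can also be read with fetch, which does accept custom headers. The sketch below assumes a runtime with global fetch, btoa, and TextDecoder (for example Node.js 18 or later). The function names basicAuthHeader and readStream are hypothetical, and the reader only hands over raw text chunks rather than parsed events.

```javascript
// Hypothetical helper: builds an HTTP Basic Authorization header value.
// Assumes ASCII credentials, since btoa only handles Latin-1 input.
function basicAuthHeader(user, password) {
  return 'Basic ' + btoa(`${user}:${password}`);
}

// Hypothetical helper: opens the stream and passes each decoded text chunk
// to onChunk until the server closes the connection.
async function readStream(url, authorization, onChunk) {
  const response = await fetch(url, {
    headers: { Accept: 'text/event-stream', Authorization: authorization },
  });
  if (!response.ok) {
    throw new Error(`Unexpected status ${response.status}`);
  }
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  for (;;) {
    const { done, value } = await reader.read();
    if (done) break;
    // stream: true keeps multi-byte characters split across chunks intact
    onChunk(decoder.decode(value, { stream: true }));
  }
}

// Usage (not executed here, requires a running RESTHeart instance):
// readStream('http://localhost:8080/messages/_streams/all',
//            basicAuthHeader('admin', 'secret'),
//            chunk => console.log(chunk));
```

Unlike EventSource, this approach does not reconnect on its own, so reconnect logic is left to the caller.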
The connection is now open and waiting for events. Leave it running and continue to the next step.
Step 3: Insert a Document and Observe the Event
In a second terminal, insert a document into the messages collection:
cURL
curl -i -X POST 'http://localhost:8080/messages' \
-u admin:secret \
-H 'Content-Type: application/json' \
-d '{"message": "Hello SSE!", "name": "uji"}'
HTTPie
echo '{"message": "Hello SSE!", "name": "uji"}' \
| http POST http://localhost:8080/messages \
'Authorization:Basic YWRtaW46c2VjcmV0' \
'Content-Type:application/json'
The SSE connection in the first terminal immediately receives the event:
event:change
data:{"fullDocument":{"_id":{"$oid":"62166d53ebdcd56455a1a7ab"},"message":"Hello SSE!","name":"uji","_etag":{"$oid":"62166d53ebdcd56455a1a7aa"}},"documentKey":{"_id":{"$oid":"62166d53ebdcd56455a1a7ab"}},"operationType":"insert"}
Step 4: Understanding the Event Format
Every SSE event from a MongoDB change stream contains two non-blank lines:
event:change
data:<json>
The data JSON payload contains:
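| Field | Description |
|---|---|
| fullDocument | The complete document after the change. |
| documentKey | The _id of the changed document. |
| operationType | The type of change, for example insert or update. |
| updateDescription | For update operations, the updated and removed fields. |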
A typical update event looks like:
{
"fullDocument": {
"_id": { "$oid": "62166d53ebdcd56455a1a7ab" },
"message": "Hello SSE! (edited)",
"name": "uji"
},
"documentKey": {
"_id": { "$oid": "62166d53ebdcd56455a1a7ab" }
},
"updateDescription": {
"updatedFields": { "message": "Hello SSE! (edited)" },
"removedFields": []
},
"operationType": "update"
}
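A client that reads the raw stream instead of using EventSource has to split each event block into its fields. The following sketch parses one block according to the general text/event-stream rules (field name, colon, optional single space, value). The function name parseSseEvent is hypothetical, and the id field is handled in case the server sends one.

```javascript
// Hypothetical helper: parses one SSE event block (the lines between two blank lines).
function parseSseEvent(block) {
  const parsed = { event: 'message', data: '', id: undefined };
  const dataLines = [];
  for (const line of block.split(/\r\n|\n|\r/)) {
    if (line === '' || line.startsWith(':')) continue; // skip blank lines and comments
    const colon = line.indexOf(':');
    const field = colon === -1 ? line : line.slice(0, colon);
    let value = colon === -1 ? '' : line.slice(colon + 1);
    if (value.startsWith(' ')) value = value.slice(1); // one leading space is optional
    if (field === 'event') parsed.event = value;
    else if (field === 'data') dataLines.push(value);
    else if (field === 'id') parsed.id = value;
  }
  parsed.data = dataLines.join('\n'); // multiple data lines are joined with newlines
  return parsed;
}

// Sample block shaped like the insert event from Step 3 (payload shortened).
const sampleBlock =
  'event:change\n' +
  'data:{"documentKey":{"_id":{"$oid":"62166d53ebdcd56455a1a7ab"}},"operationType":"insert"}';
const sampleEvent = parseSseEvent(sampleBlock);
console.log(sampleEvent.event);                          // change
console.log(JSON.parse(sampleEvent.data).operationType); // insert
```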
Step 5: Resume After Disconnect with Last-Event-ID
A key advantage of SSE over WebSocket is the built-in resume protocol. When a client disconnects and reconnects, it sends the Last-Event-ID header containing the id of the last event it received. RESTHeart resumes the change stream from that position, so no events are lost as long as the resume token is still valid.
RESTHeart uses MongoDB resume tokens as event IDs. To resume manually:
curl -N \
-H 'Accept: text/event-stream' \
-H 'Authorization: Basic YWRtaW46c2VjcmV0' \
-H 'Last-Event-ID: {"_data":"82..."}' \
http://localhost:8080/messages/_streams/all
Note: The EventSource API in the browser handles Last-Event-ID automatically. When the connection drops, the browser reconnects and sends the ID of the last event it received. No additional client code is required.
Important: Resume tokens are only valid as long as the MongoDB oplog has not been truncated past the token’s position. For long-lived applications, ensure your replica set’s oplogSizeMB is large enough.
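A client that does not use EventSource has to remember the last event id itself and send it back when it reconnects. A minimal sketch, assuming events have already been parsed into objects with an optional id property. That object shape and the name createResumeTracker are hypothetical, and 'token-1' is a placeholder rather than a real resume token.

```javascript
// Hypothetical helper: remembers the id of the last event seen and
// builds the request headers for the next (re)connection.
function createResumeTracker() {
  let lastEventId = null;
  return {
    // Call for every received event; events without an id leave the position unchanged.
    record(event) {
      if (event.id !== undefined && event.id !== '') lastEventId = event.id;
    },
    // Headers for the next GET; Last-Event-ID is only added once an id is known.
    headers(authorization) {
      const result = { Accept: 'text/event-stream' };
      if (authorization) result.Authorization = authorization;
      if (lastEventId !== null) result['Last-Event-ID'] = lastEventId;
      return result;
    },
  };
}

const tracker = createResumeTracker();
tracker.record({ event: 'change', id: 'token-1', data: '{}' }); // placeholder id
console.log(tracker.headers('Basic YWRtaW46c2VjcmV0'));
```

If the server rejects the token, for instance because the oplog no longer contains that position, the client has to reconnect without the header and accept the gap.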
Step 6: Configure ACL Permissions
By default, SSE connections to /_streams/ endpoints require authentication. To allow unauthenticated access (for example, in a public dashboard), create an ACL rule:
cURL
curl -i -X POST 'http://localhost:8080/acl' \
-u admin:secret \
-H 'Content-Type: application/json' \
-d '{
"_id": "unauthenticatedCanConnectToMessagesStream",
"predicate": "path-prefix(/messages/_streams/all)",
"priority": 0,
"roles": ["$unauthenticated"]
}'
HTTPie
echo '{
"_id": "unauthenticatedCanConnectToMessagesStream",
"predicate": "path-prefix(/messages/_streams/all)",
"priority": 0,
"roles": ["$unauthenticated"]
}' | http POST http://localhost:8080/acl \
'Authorization:Basic YWRtaW46c2VjcmV0' \
'Content-Type:application/json'
With this rule in place, the browser EventSource API can connect without credentials:
const es = new EventSource('http://localhost:8080/messages/_streams/all');
es.addEventListener('change', event => console.log(JSON.parse(event.data)));
Warning: Only allow unauthenticated access in development or when the stream contains non-sensitive data. In production, use JWT tokens or session-based authentication.
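If several streams need the same kind of rule, the ACL document can be generated instead of copied. This is a sketch only: the helper name buildUnauthenticatedStreamRule and the _id naming scheme are hypothetical, while the predicate, priority, and roles fields follow the rule shown above.

```javascript
// Hypothetical helper: builds an ACL rule document that lets unauthenticated
// clients connect to one stream of one collection.
function buildUnauthenticatedStreamRule(collection, stream) {
  return {
    _id: `unauthenticatedCanConnectTo_${collection}_${stream}`, // illustrative naming only
    predicate: `path-prefix(/${collection}/_streams/${stream})`,
    priority: 0,
    roles: ['$unauthenticated'],
  };
}

console.log(JSON.stringify(buildUnauthenticatedStreamRule('messages', 'all'), null, 2));
```

Keeping the predicate scoped to a single stream path, rather than to the whole collection, limits what an anonymous client can reach.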
Step 7: Filtering Events with Variables
The same pipeline variable mechanism available in WebSocket streams works for SSE. Pass avars as a query parameter to filter events server-side:
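# Only receive events for documents where name = "uji"
# -G with --data-urlencode appends the URL-encoded avars to the query string;
# unencoded braces in the URL would be interpreted by curl's URL globbing
curl -N -G \
-H 'Accept: text/event-stream' \
-H 'Authorization: Basic YWRtaW46c2VjcmV0' \
--data-urlencode 'avars={"n":"uji"}' \
'http://localhost:8080/messages/_streams/mine'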
This requires the mine stream to be defined with a stage that uses the $var operator:
{
"uri": "mine",
"stages": [
{ "_$match": { "fullDocument::name": { "_$var": "n" } } }
]
}
See Using Variables in Change Streams for the full reference.
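From JavaScript, the avars value has to be JSON-encoded and then URL-encoded before it goes into the query string. A minimal sketch using the standard URL API; the function name streamUrlWithVars is hypothetical.

```javascript
// Hypothetical helper: appends pipeline variables to a stream URL as the avars parameter.
function streamUrlWithVars(streamUrl, vars) {
  const url = new URL(streamUrl);
  // searchParams.set percent-encodes the JSON, so braces and quotes are safe in the URL
  url.searchParams.set('avars', JSON.stringify(vars));
  return url.toString();
}

const mineUrl = streamUrlWithVars('http://localhost:8080/messages/_streams/mine', { n: 'uji' });
console.log(mineUrl);

// In a browser, the result can be passed straight to EventSource:
// const es = new EventSource(mineUrl);
```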
Sharing a Stream Between SSE and WebSocket Clients
SSE and WebSocket clients watching the same stream share the same MongoDB cursor. A single cursor fans out to all connected clients regardless of their transport. You can mix SSE and WebSocket consumers on the same stream URL simultaneously.
# Terminal 1: SSE client
curl -N -H 'Accept: text/event-stream' -u admin:secret \
http://localhost:8080/messages/_streams/all
# Terminal 2: WebSocket client (requires websocat)
websocat --text - autoreconnect:ws://admin:secret@127.0.0.1:8080/messages/_streams/all
Both terminals receive the same events when a document changes.
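The fan-out can be pictured as one source feeding a set of subscribers that differ only in how they deliver the event. The following is purely an illustration of that idea, not RESTHeart's implementation. The class name StreamFanOut, the two delivery callbacks, and the message framing are hypothetical.

```javascript
// Illustration only: one publisher (standing in for the shared cursor)
// delivering each change event to every subscriber.
class StreamFanOut {
  constructor() {
    this.subscribers = new Set();
  }
  // deliver is a callback that knows how to write to one client's transport
  subscribe(deliver) {
    this.subscribers.add(deliver);
    return () => this.subscribers.delete(deliver); // unsubscribe function
  }
  // Called once per change event, regardless of how many clients are connected
  publish(changeEvent) {
    const json = JSON.stringify(changeEvent);
    for (const deliver of this.subscribers) deliver(json);
  }
}

const fanOut = new StreamFanOut();
const sseOutput = [];
const wsOutput = [];
fanOut.subscribe(json => sseOutput.push(`event:change\ndata:${json}\n\n`)); // SSE-style framing
fanOut.subscribe(json => wsOutput.push(json)); // WebSocket-style text message
fanOut.publish({ operationType: 'insert' });
console.log(sseOutput.length, wsOutput.length); // 1 1
```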
Summary
You have successfully:
- ✓ Defined a change stream on a MongoDB collection
- ✓ Connected to the stream using Accept: text/event-stream
- ✓ Received live change events via SSE
- ✓ Used Last-Event-ID to resume after a disconnect
- ✓ Configured ACL permissions for the SSE endpoint
- ✓ Filtered events with aggregation pipeline variables
Next Steps
- Custom SseService Plugin: build SSE endpoints that don’t require MongoDB
- WebSocket Tutorial: the full-duplex alternative for the same change streams
- Using Variables in Change Streams: advanced pipeline variable reference
- Change Streams Reference: complete stream definition format