- How Change Streams Work
- How Streams Are Stored
- Stream Definition Format
- Important: Stages Operate on Change Events, Not Documents
- Listing Streams
- Adding a Stream
- Editing a Stream
- Deleting a Stream
- Copying the WebSocket URL
- Live Try Panel
- Authentication
- JavaScript Client Example
- Worked Example: Define → Connect → Trigger
- Common Stream Patterns
- API Reference
- Related Pages
WebSocket Change Streams Cloud
The WebSocket section of the RESTHeart Cloud UI lets you define, test, and manage server-side MongoDB Change Stream endpoints exposed as WebSocket connections. Clients connect via WebSocket and receive real-time Change Event notifications as documents are inserted, updated, or deleted in the watched collection.
Navigation path: Service → WebSocket
How Change Streams Work
-
You define a
streamsarray in a collection’s metadata via the UI. -
RESTHeart exposes a WebSocket endpoint at
wss://<host>/<db>/<collection>/_streams/<uri>. -
A client connects to the endpoint, authenticating with HTTP Basic Auth credentials embedded in the URL.
-
RESTHeart opens a MongoDB Change Stream on the collection, applies your pipeline stages as a filter, and forwards matching Change Events to all connected clients as JSON messages.
Why Server-Side Stream Definitions?
Stream definitions are declared server-side so clients can only subscribe to pre-approved, tested streams — they cannot open arbitrary change streams. This is the same security model used for Aggregations: clients call a named endpoint rather than posting arbitrary pipeline logic.
MongoDB Replica Set Requirement
Change Streams require MongoDB to be configured as a Replica Set (not a standalone instance). All RESTHeart Cloud Free, Shared, and Dedicated services already satisfy this requirement — no additional configuration is needed on your part.
How Streams Are Stored
Stream definitions are stored as a streams array inside a collection’s metadata — the same metadata document that holds the aggrs array for aggregations. When you add, edit, or delete a stream in the UI, RESTHeart issues a PATCH /<db>/<collection> request that updates the streams array in place. No service restart is required.
{
"streams": [
{
"uri": "all-changes",
"stages": [
{
"$match": {
"operationType": { "$in": ["insert", "update", "replace"] }
}
}
]
}
]
}
Stream Definition Format
Each entry in the streams array is a JSON object with the following fields:
| Field | Required | Description |
|---|---|---|
|
Yes |
The path segment used in the WebSocket URL. Must be unique within the collection. Example: |
|
Yes |
A MongoDB aggregation pipeline array used to filter which Change Events are forwarded to clients. Use an empty array |
Important: Stages Operate on Change Events, Not Documents
This is the most common source of confusion when writing stream pipeline stages. Unlike aggregation stages (which receive raw MongoDB documents), stream stages receive MongoDB Change Event objects.
A Change Event looks like this:
{
"operationType": "update",
"ns": { "db": "mydb", "coll": "orders" },
"documentKey": { "_id": { "$oid": "64abc..." } },
"fullDocument": {
"_id": { "$oid": "64abc..." },
"status": "shipped",
"customerId": "alice"
},
"updateDescription": {
"updatedFields": { "status": "shipped" },
"removedFields": []
}
}
To filter on a field inside the document, use fullDocument.<fieldName>:
{ "$match": { "fullDocument.status": "shipped" } }
NOT:
{ "$match": { "status": "shipped" } }
Key Change Event Fields
| Field | Description |
|---|---|
|
Type of the change: |
|
The full document after the change. Available for |
|
Namespace: |
|
The |
|
For |
Listing Streams
The WebSocket page loads all collections and fetches each collection’s /_meta to discover the streams array. Each collection is shown with its stream count.
On Dedicated plans a database selector dropdown appears at the top of the page. Switching databases refreshes the collection list automatically.
Use the Search box to filter the collection list by name (debounced, 300 ms).
Adding a Stream
-
Expand the collection you want to add a stream to.
-
Click Add Stream.
-
Enter the full stream definition as a JSON object in the editor:
{ "uri": "active-orders", "stages": [ { "$match": { "operationType": { "$in": ["insert", "update"] }, "fullDocument.status": { "$ne": "cancelled" } } } ] } -
Click Format to pretty-print and validate the JSON.
-
Click Save. The UI issues
PATCH /<db>/<collection>with the updatedstreamsarray.
|
Note
|
The editor shows the complete stream definition object { "uri": "…", "stages": […] } — not just the stages array. This makes it easy to copy and paste stream definitions between collections.
|
Editing a Stream
-
Expand the collection and find the stream entry.
-
Click Edit. The editor is pre-populated with the current stream definition.
-
The
uriis read-only after creation — it forms part of the public WebSocket URL. -
Modify the
stagesarray and click Save.
|
Important
|
When a stream definition is modified, RESTHeart automatically closes all active WebSocket connections for that stream. Clients must reconnect to receive events from the updated pipeline. |
Deleting a Stream
Click Delete on a stream entry. A confirmation dialog appears before the streams array is updated to remove the entry.
|
Warning
|
Deleting a stream immediately disconnects all clients connected to that stream’s WebSocket endpoint. The endpoint returns a 404 Not Found for any subsequent connection attempts.
|
Copying the WebSocket URL
Click Copy URL on any stream entry to copy the full wss:// endpoint URL to the clipboard. The URL format is:
wss://<host>/<db>/<collection>/_streams/<uri>
You can paste this URL directly into websocat, a browser DevTools console, or your application code.
Live Try Panel
Every stream definition has a built-in Live Try panel that lets you test the WebSocket connection directly from the UI.
Connecting
-
Click Try on a stream entry to open the Live Try panel.
-
Enter the username and password of a user registered in the
userscollection (not your RESTHeart Cloud account credentials — see Authentication below). -
Click Connect.
The panel shows the connection status cycling through:
-
Connecting — WebSocket upgrade request in flight.
-
Connected — receiving events.
-
Disconnected — connection closed cleanly.
-
Error — connection failed (error code
1006is annotated as a likely authentication failure).
Receiving Events
Incoming Change Events are displayed in a live, auto-scrolling log with timestamps and pretty-printed JSON. Up to 200 events are shown; when the limit is reached, the oldest events are discarded automatically.
Panel Actions
| Action | Description |
|---|---|
Disconnect |
Closes the WebSocket connection cleanly. |
Reconnect |
Closes and immediately re-opens the connection (useful after editing the stream definition). |
Clear |
Clears the event log without disconnecting. |
Authentication
WebSocket connections from browsers cannot set arbitrary HTTP headers, so the standard Authorization: Bearer <token> pattern used for REST calls does not apply here.
RESTHeart Cloud supports credentials embedded directly in the WebSocket URL:
wss://username:password@myservice.restheart.com/mydb/orders/_streams/all-changes
The browser converts the embedded credentials into a standard Authorization: Basic base64(username:password) header in the HTTP upgrade request.
Which Credentials to Use
| Credential type | Can authenticate WebSocket? |
|---|---|
Admin JWT (used by the Cloud UI for management operations) |
❌ No — the Admin JWT cannot be used for WebSocket authentication. |
User credentials (stored in the |
✅ Yes — use any user registered in the Users section with roles that have permission to access the stream endpoint. |
Make sure the user’s roles are covered by an ACL rule in Permissions that permits GET on the stream path.
JavaScript Client Example
const ws = new WebSocket(
"wss://alice:secret@myservice.restheart.com/mydb/orders/_streams/all-changes"
);
ws.onopen = () => {
console.log("Connected to change stream");
};
ws.onmessage = (event) => {
const changeEvent = JSON.parse(event.data);
// The full document after the change
console.log("Operation:", changeEvent.operationType);
console.log("Document:", changeEvent.fullDocument);
// For update events, see which fields changed
if (changeEvent.operationType === "update") {
console.log("Updated fields:", changeEvent.updateDescription.updatedFields);
}
};
ws.onerror = (error) => {
console.error("WebSocket error:", error);
};
ws.onclose = (event) => {
console.log("Connection closed, code:", event.code);
// code 1006 usually means authentication failed or network error
if (event.code === 1006) {
console.error("Possible authentication failure — check username and password");
}
};
Worked Example: Define → Connect → Trigger
This end-to-end example sets up a stream that notifies clients whenever an order’s status changes to "shipped".
Step 1: Define the Stream
In the WebSocket page, expand the orders collection and click Add Stream. Enter:
{
"uri": "shipped-orders",
"stages": [
{
"$match": {
"operationType": { "$in": ["update", "replace"] },
"fullDocument.status": "shipped"
}
}
]
}
Click Save.
Step 2: Connect via websocat (CLI)
websocat "wss://alice:secret@myservice.restheart.com/mydb/orders/_streams/shipped-orders"
The terminal will block, waiting for events.
Step 3: Trigger a Change
In another terminal, update an order’s status:
curl -X PATCH https://myservice.restheart.com/mydb/orders/64abc \
-H "Authorization: Basic $(echo -n alice:secret | base64)" \
-H "Content-Type: application/json" \
-d '{"status": "shipped"}'
Step 4: See the Event
The websocat terminal (or your browser console if using the Live Try panel) immediately receives a Change Event:
{
"operationType": "update",
"ns": { "db": "mydb", "coll": "orders" },
"documentKey": { "_id": { "$oid": "64abc..." } },
"fullDocument": {
"_id": { "$oid": "64abc..." },
"customerId": "alice",
"status": "shipped",
"total": 149.99
},
"updateDescription": {
"updatedFields": { "status": "shipped" },
"removedFields": []
}
}
Common Stream Patterns
All changes on a collection
{ "uri": "all-changes", "stages": [] }
Inserts only
{
"uri": "new-documents",
"stages": [
{ "$match": { "operationType": "insert" } }
]
}
Watch a specific field value
{
"uri": "urgent-tickets",
"stages": [
{
"$match": {
"operationType": { "$in": ["insert", "update", "replace"] },
"fullDocument.priority": "urgent"
}
}
]
}
Exclude deletes
{
"uri": "non-delete-events",
"stages": [
{
"$match": {
"operationType": { "$nin": ["delete"] }
}
}
]
}
API Reference
| Operation | Endpoint |
|---|---|
Read collection metadata |
|
Save streams |
|
Connect to stream |
|
Related Pages
-
Managing Collections & Documents — manage the collections that streams watch.
-
Aggregations — define server-side aggregation pipelines with a similar security model.
-
Managing Users — create the users whose credentials authenticate WebSocket connections.
-
Managing Permissions (ACL) — grant roles access to stream endpoints.
-
Dedicated vs. Free/Shared Plans — understand the database selector and path differences.
-
WebSocket Change Streams (full reference) — deep-dive into RESTHeart’s Change Stream API.