Edit Page

WebSocket Change Streams Cloud

The WebSocket section of the RESTHeart Cloud UI lets you define, test, and manage server-side MongoDB Change Stream endpoints exposed as WebSocket connections. Clients connect via WebSocket and receive real-time Change Event notifications as documents are inserted, updated, or deleted in the watched collection.

Navigation path: Service → WebSocket

How Change Streams Work

  1. You define a streams array in a collection’s metadata via the UI.

  2. RESTHeart exposes a WebSocket endpoint at wss://<host>/<db>/<collection>/_streams/<uri>.

  3. A client connects to the endpoint, authenticating with HTTP Basic Auth credentials embedded in the URL.

  4. RESTHeart opens a MongoDB Change Stream on the collection, applies your pipeline stages as a filter, and forwards matching Change Events to all connected clients as JSON messages.

Why Server-Side Stream Definitions?

Stream definitions are declared server-side so clients can only subscribe to pre-approved, tested streams — they cannot open arbitrary change streams. This is the same security model used for Aggregations: clients call a named endpoint rather than posting arbitrary pipeline logic.

MongoDB Replica Set Requirement

Change Streams require MongoDB to be configured as a Replica Set (not a standalone instance). All RESTHeart Cloud Free, Shared, and Dedicated services already satisfy this requirement — no additional configuration is needed on your part.

How Streams Are Stored

Stream definitions are stored as a streams array inside a collection’s metadata — the same metadata document that holds the aggrs array for aggregations. When you add, edit, or delete a stream in the UI, RESTHeart issues a PATCH /<db>/<collection> request that updates the streams array in place. No service restart is required.

{
  "streams": [
    {
      "uri": "all-changes",
      "stages": [
        {
          "$match": {
            "operationType": { "$in": ["insert", "update", "replace"] }
          }
        }
      ]
    }
  ]
}

Stream Definition Format

Each entry in the streams array is a JSON object with the following fields:

Field Required Description

uri

Yes

The path segment used in the WebSocket URL. Must be unique within the collection. Example: all-changes, status-updates.

stages

Yes

A MongoDB aggregation pipeline array used to filter which Change Events are forwarded to clients. Use an empty array [] to receive all Change Events for the collection.

Important: Stages Operate on Change Events, Not Documents

This is the most common source of confusion when writing stream pipeline stages. Unlike aggregation stages (which receive raw MongoDB documents), stream stages receive MongoDB Change Event objects.

A Change Event looks like this:

{
  "operationType": "update",
  "ns": { "db": "mydb", "coll": "orders" },
  "documentKey": { "_id": { "$oid": "64abc..." } },
  "fullDocument": {
    "_id": { "$oid": "64abc..." },
    "status": "shipped",
    "customerId": "alice"
  },
  "updateDescription": {
    "updatedFields": { "status": "shipped" },
    "removedFields": []
  }
}

To filter on a field inside the document, use fullDocument.<fieldName>:

{ "$match": { "fullDocument.status": "shipped" } }

NOT:

{ "$match": { "status": "shipped" } }

Key Change Event Fields

Field Description

operationType

Type of the change: "insert", "update", "replace", "delete", "drop", "rename", "dropDatabase", "invalidate".

fullDocument

The full document after the change. Available for insert, replace, and (when configured) update operations.

ns

Namespace: { "db": "…​", "coll": "…​" }.

documentKey

The _id of the affected document.

updateDescription

For update operations: { "updatedFields": {…​}, "removedFields": […​] }.

Listing Streams

The WebSocket page loads all collections and fetches each collection’s /_meta to discover the streams array. Each collection is shown with its stream count.

On Dedicated plans a database selector dropdown appears at the top of the page. Switching databases refreshes the collection list automatically.

Use the Search box to filter the collection list by name (debounced, 300 ms).

Adding a Stream

  1. Expand the collection you want to add a stream to.

  2. Click Add Stream.

  3. Enter the full stream definition as a JSON object in the editor:

    {
      "uri": "active-orders",
      "stages": [
        {
          "$match": {
            "operationType": { "$in": ["insert", "update"] },
            "fullDocument.status": { "$ne": "cancelled" }
          }
        }
      ]
    }
  4. Click Format to pretty-print and validate the JSON.

  5. Click Save. The UI issues PATCH /<db>/<collection> with the updated streams array.

Note
The editor shows the complete stream definition object { "uri": "…​", "stages": […​] } — not just the stages array. This makes it easy to copy and paste stream definitions between collections.

Editing a Stream

  1. Expand the collection and find the stream entry.

  2. Click Edit. The editor is pre-populated with the current stream definition.

  3. The uri is read-only after creation — it forms part of the public WebSocket URL.

  4. Modify the stages array and click Save.

Important
When a stream definition is modified, RESTHeart automatically closes all active WebSocket connections for that stream. Clients must reconnect to receive events from the updated pipeline.

Deleting a Stream

Click Delete on a stream entry. A confirmation dialog appears before the streams array is updated to remove the entry.

Warning
Deleting a stream immediately disconnects all clients connected to that stream’s WebSocket endpoint. The endpoint returns a 404 Not Found for any subsequent connection attempts.

Copying the WebSocket URL

Click Copy URL on any stream entry to copy the full wss:// endpoint URL to the clipboard. The URL format is:

wss://<host>/<db>/<collection>/_streams/<uri>

You can paste this URL directly into websocat, a browser DevTools console, or your application code.

Live Try Panel

Every stream definition has a built-in Live Try panel that lets you test the WebSocket connection directly from the UI.

Connecting

  1. Click Try on a stream entry to open the Live Try panel.

  2. Enter the username and password of a user registered in the users collection (not your RESTHeart Cloud account credentials — see Authentication below).

  3. Click Connect.

The panel shows the connection status cycling through:

  • Connecting — WebSocket upgrade request in flight.

  • Connected — receiving events.

  • Disconnected — connection closed cleanly.

  • Error — connection failed (error code 1006 is annotated as a likely authentication failure).

Receiving Events

Incoming Change Events are displayed in a live, auto-scrolling log with timestamps and pretty-printed JSON. Up to 200 events are shown; when the limit is reached, the oldest events are discarded automatically.

Panel Actions

Action Description

Disconnect

Closes the WebSocket connection cleanly.

Reconnect

Closes and immediately re-opens the connection (useful after editing the stream definition).

Clear

Clears the event log without disconnecting.

Authentication

WebSocket connections from browsers cannot set arbitrary HTTP headers, so the standard Authorization: Bearer <token> pattern used for REST calls does not apply here.

RESTHeart Cloud supports credentials embedded directly in the WebSocket URL:

wss://username:password@myservice.restheart.com/mydb/orders/_streams/all-changes

The browser converts the embedded credentials into a standard Authorization: Basic base64(username:password) header in the HTTP upgrade request.

Which Credentials to Use

Credential type Can authenticate WebSocket?

Admin JWT (used by the Cloud UI for management operations)

❌ No — the Admin JWT cannot be used for WebSocket authentication.

User credentials (stored in the users collection)

✅ Yes — use any user registered in the Users section with roles that have permission to access the stream endpoint.

Make sure the user’s roles are covered by an ACL rule in Permissions that permits GET on the stream path.

JavaScript Client Example

const ws = new WebSocket(
  "wss://alice:secret@myservice.restheart.com/mydb/orders/_streams/all-changes"
);

ws.onopen = () => {
  console.log("Connected to change stream");
};

ws.onmessage = (event) => {
  const changeEvent = JSON.parse(event.data);

  // The full document after the change
  console.log("Operation:", changeEvent.operationType);
  console.log("Document:", changeEvent.fullDocument);

  // For update events, see which fields changed
  if (changeEvent.operationType === "update") {
    console.log("Updated fields:", changeEvent.updateDescription.updatedFields);
  }
};

ws.onerror = (error) => {
  console.error("WebSocket error:", error);
};

ws.onclose = (event) => {
  console.log("Connection closed, code:", event.code);
  // code 1006 usually means authentication failed or network error
  if (event.code === 1006) {
    console.error("Possible authentication failure — check username and password");
  }
};

Worked Example: Define → Connect → Trigger

This end-to-end example sets up a stream that notifies clients whenever an order’s status changes to "shipped".

Step 1: Define the Stream

In the WebSocket page, expand the orders collection and click Add Stream. Enter:

{
  "uri": "shipped-orders",
  "stages": [
    {
      "$match": {
        "operationType": { "$in": ["update", "replace"] },
        "fullDocument.status": "shipped"
      }
    }
  ]
}

Click Save.

Step 2: Connect via websocat (CLI)

websocat "wss://alice:secret@myservice.restheart.com/mydb/orders/_streams/shipped-orders"

The terminal will block, waiting for events.

Step 3: Trigger a Change

In another terminal, update an order’s status:

curl -X PATCH https://myservice.restheart.com/mydb/orders/64abc \
  -H "Authorization: Basic $(echo -n alice:secret | base64)" \
  -H "Content-Type: application/json" \
  -d '{"status": "shipped"}'

Step 4: See the Event

The websocat terminal (or your browser console if using the Live Try panel) immediately receives a Change Event:

{
  "operationType": "update",
  "ns": { "db": "mydb", "coll": "orders" },
  "documentKey": { "_id": { "$oid": "64abc..." } },
  "fullDocument": {
    "_id": { "$oid": "64abc..." },
    "customerId": "alice",
    "status": "shipped",
    "total": 149.99
  },
  "updateDescription": {
    "updatedFields": { "status": "shipped" },
    "removedFields": []
  }
}

Common Stream Patterns

All changes on a collection

{ "uri": "all-changes", "stages": [] }

Inserts only

{
  "uri": "new-documents",
  "stages": [
    { "$match": { "operationType": "insert" } }
  ]
}

Watch a specific field value

{
  "uri": "urgent-tickets",
  "stages": [
    {
      "$match": {
        "operationType": { "$in": ["insert", "update", "replace"] },
        "fullDocument.priority": "urgent"
      }
    }
  ]
}

Exclude deletes

{
  "uri": "non-delete-events",
  "stages": [
    {
      "$match": {
        "operationType": { "$nin": ["delete"] }
      }
    }
  ]
}

API Reference

Operation Endpoint

Read collection metadata

GET /<db>/<collection>/_meta

Save streams

PATCH /<db>/<collection> with { "streams": […​] }

Connect to stream

wss://<host>/<db>/<collection>/_streams/<uri>