- Before Running Examples
- Introduction
- Prerequisites
- Step 1: Create a Collection with Change Streams
- Step 2: Verify the Change Streams
- Step 3: Connect to the Change Stream
- Step 4: Configure Unauthenticated Access (Optional)
- Step 5: Test Real-Time Notifications
- Step 6: Using Filtered Streams with Variables
- Summary
- Next Steps
WebSocket API Tutorial
🔧 Configuration
Before Running Examples
To run the examples on this page, you need a RESTHeart instance.
Option 1: Use RESTHeart Cloud (Recommended)
The fastest way to get started is with RESTHeart Cloud. Create a free service in minutes:
-
Sign up at cloud.restheart.com
-
Create a free API service
-
Set up your root user following the Root User Setup guide
-
Use the configuration panel above to set your service URL and credentials
Tip
|
All code examples on this page will automatically use your configured RESTHeart Cloud credentials. |
Option 2: Run RESTHeart Locally
If you prefer local development, follow the Setup Guide to install RESTHeart on your machine.
Note
|
Local instances run at http://localhost:8080 with default credentials admin:secret
|
Introduction
This tutorial will guide you through creating a real-time messaging system using RESTHeart’s WebSocket API. You’ll learn how to:
-
Define change streams on a MongoDB collection
-
Connect to WebSocket streams to receive real-time notifications
-
Filter events using MongoDB aggregation pipelines
-
Configure permissions for WebSocket access
By the end of this tutorial, you’ll have a working real-time notification system that responds instantly to data changes.
Prerequisites
-
MongoDB configured as a Replica Set (required for change streams)
Tip
|
For testing WebSocket connections, install websocat, a command-line client for WebSockets. Installation instructions: https://github.com/vi/websocat#installation or download binaries from https://github.com/vi/websocat/releases |
Step 1: Create a Collection with Change Streams
First, we’ll create a messages
collection and define two change streams:
-
all - streams all insert and update operations, bound at
/messages/_streams/all
-
mine - streams messages filtered by name, bound at
/messages/_streams/mine
cURL
curl -i -X PUT '[RESTHEART-URL]/messages' \
-H 'Authorization: Basic [BASIC-AUTH]' \
-H 'Content-Type: application/json' \
-d '{
"streams" : [
{ "stages" : [
{
"_$match": {
"_$or" : [ { "operationType": "insert" }, { "operationType": "update" } ]
}
}
],
"uri" : "all"
},
{ "stages" : [
{ "_$match" : { "fullDocument::name" : { "_$var" : "n" } } }
],
"uri" : "mine"
}
]
}'
HTTPie
echo '{
"streams" : [
{ "stages" : [
{
"_$match": {
"_$or" : [ { "operationType": "insert" }, { "operationType": "update" } ]
}
}
],
"uri" : "all"
},
{ "stages" : [
{ "_$match" : { "fullDocument::name" : { "_$var" : "n" } } }
],
"uri" : "mine"
}
]
}' | http PUT [RESTHEART-URL]/messages \
'Authorization:Basic [BASIC-AUTH]' \
'Content-Type:application/json'
JavaScript
fetch('[RESTHEART-URL]/messages', {
method: 'PUT',
headers: {
'Authorization': 'Basic [BASIC-AUTH]',
'Content-Type': 'application/json'
},
body: JSON.stringify({
"streams" : [
{ "stages" : [
{
"_$match": {
"_$or" : [ { "operationType": "insert" }, { "operationType": "update" } ]
}
}
],
"uri" : "all"
},
{ "stages" : [
{ "_$match" : { "fullDocument::name" : { "_$var" : "n" } } }
],
"uri" : "mine"
}
]
})
})
.then(response => {
if (!response.ok) throw new Error(`HTTP ${response.status}`);
return response.json();
})
.then(data => console.log(data))
.catch(error => console.error('Error:', error));
The request body defines the streams
array with two change stream configurations:
-
all stream: Uses a
$match
stage to filter forinsert
orupdate
operations -
mine stream: Uses a
$match
stage with a variablen
to filter documents by thename
field
Note
|
The $match stage uses fullDocument::name to access the name property within the change event structure.
|
Understanding Change Events
When a document changes in MongoDB, the change stream emits a change event. Here’s what a typical change event looks like:
{
"fullDocument": {
"_id": { "$oid": "5e15ff5779ca449eb20fdd09" },
"message": "hi uji, how are you?",
"name": "uji",
"_etag": { "$oid": "5e15ff57a2e5700c3459e801" }
},
"documentKey": {
"_id": { "$oid": "5e15ff5779ca449eb20fdd09" }
},
"updateDescription": null,
"operationType": "insert"
}
The change event contains:
-
fullDocument
- the complete document after the change -
documentKey
- the_id
of the changed document -
operationType
- the type of operation (insert
,update
,delete
, etc.) -
updateDescription
- details about updated fields (for update operations)
This is why we use fullDocument::name
in our match stage - we’re accessing the name
field within the fullDocument
object.
Step 2: Verify the Change Streams
Let’s verify that our change streams were created successfully by checking the collection metadata with the SHAL
representation.
cURL
curl -i -X GET '[RESTHEART-URL]/messages?rep=SHAL' \
-H 'Authorization: Basic [BASIC-AUTH]'
HTTPie
http GET [RESTHEART-URL]/messages rep==SHAL \
'Authorization:Basic [BASIC-AUTH]'
JavaScript
fetch('[RESTHEART-URL]/messages?rep=SHAL', {
method: 'GET',
headers: {
'Authorization': 'Basic [BASIC-AUTH]'
}
})
.then(response => {
if (!response.ok) throw new Error(`HTTP ${response.status}`);
return response.json();
})
.then(data => console.log(data))
.catch(error => console.error('Error:', error));
You should see the _links
property containing references to your change streams:
{
"_links": {
"all": {
"href": "/messages/_streams/all"
},
"mine": {
"href": "/messages/_streams/mine"
}
}
}
Great! The change streams are now configured and ready to use.
Optional: Using Conditional Stages
Alternatively, you can define a single change stream that returns either all messages or only those sent by a specific name
, depending on whether a variable is provided. This is achieved using optional stages with the $ifvar
operator:
{
"streams" : [
{ "stages" : [
{ "$ifvar": [ "n", { "_$match" : { "fullDocument::name" : { "_$var" : "n" } } } ] }
],
"uri" : "withOptionalStage"
}
]
}
The $ifvar
operator checks if the variable n
is provided. If it is, the $match
stage is applied; otherwise, all documents pass through.
Step 3: Connect to the Change Stream
Now let’s connect to the change stream using WebSocket. We’ll use websocat
to establish a WebSocket connection.
Connecting with Authentication
Connect to the all
stream using the default admin credentials:
$ websocat --text - autoreconnect:ws://admin:secret@127.0.0.1:8080/messages/_streams/all
The connection is now established and waiting for events. The autoreconnect:
prefix ensures the connection automatically reconnects if it drops.
Step 4: Configure Unauthenticated Access (Optional)
For public-facing applications or development purposes, you might want to allow WebSocket connections without authentication. Let’s create an ACL permission for this.
cURL
curl -i -X POST '[RESTHEART-URL]/acl' \
-H 'Authorization: Basic [BASIC-AUTH]' \
-H 'Content-Type: application/json' \
-d '{
"_id": "unauthenticatedCanConnectToMyWebSocket",
"predicate": "path-prefix('"'"'/messages/_streams/all'"'"')",
"priority": 0,
"roles": [ "$unauthenticated" ]
}'
HTTPie
echo '{
"_id": "unauthenticatedCanConnectToMyWebSocket",
"predicate": "path-prefix('"'"'/messages/_streams/all'"'"')",
"priority": 0,
"roles": [ "$unauthenticated" ]
}' | http POST [RESTHEART-URL]/acl \
'Authorization:Basic [BASIC-AUTH]' \
'Content-Type:application/json'
JavaScript
fetch('[RESTHEART-URL]/acl', {
method: 'POST',
headers: {
'Authorization': 'Basic [BASIC-AUTH]',
'Content-Type': 'application/json'
},
body: JSON.stringify({
"_id": "unauthenticatedCanConnectToMyWebSocket",
"predicate": "path-prefix('/messages/_streams/all')",
"priority": 0,
"roles": [ "$unauthenticated" ]
})
})
.then(response => {
if (!response.ok) throw new Error(`HTTP ${response.status}`);
return response.json();
})
.then(data => console.log(data))
.catch(error => console.error('Error:', error));
This ACL permission grants the $unauthenticated
role access to the WebSocket endpoint at /messages/_streams/all
.
Testing Unauthenticated Connection
With this permission in place, you can now connect to the WebSocket without providing credentials:
$ websocat --text - autoreconnect:ws://127.0.0.1:8080/messages/_streams/all
Warning
|
Be careful when allowing unauthenticated access in production environments. Only use this for development or when appropriate security measures are in place. |
Step 5: Test Real-Time Notifications
Now for the exciting part! Let’s create a document and see the real-time notification in action.
Keep your WebSocket connection open in one terminal, then in another terminal, create a new message:
cURL
curl -i -X POST '[RESTHEART-URL]/messages' \
-H 'Authorization: Basic [BASIC-AUTH]' \
-H 'Content-Type: application/json' \
-d '{
"message": "Hello WebSockets!",
"name": "uji"
}'
HTTPie
echo '{
"message": "Hello WebSockets!",
"name": "uji"
}' | http POST [RESTHEART-URL]/messages \
'Authorization:Basic [BASIC-AUTH]' \
'Content-Type:application/json'
JavaScript
fetch('[RESTHEART-URL]/messages', {
method: 'POST',
headers: {
'Authorization': 'Basic [BASIC-AUTH]',
'Content-Type': 'application/json'
},
body: JSON.stringify({
"message": "Hello WebSockets!",
"name": "uji"
})
})
.then(response => {
if (!response.ok) throw new Error(`HTTP ${response.status}`);
return response.json();
})
.then(data => console.log(data))
.catch(error => console.error('Error:', error));
Observing the Real-Time Event
Immediately after creating the document, you should see the following output in your websocat
terminal:
$ websocat --text - autoreconnect:ws://127.0.0.1:8080/messages/_streams/all
{"fullDocument":{"_id":{"$oid":"62166d53ebdcd56455a1a7ab"},"message":"Hello WebSockets!","name":"uji","_etag":{"$oid":"62166d53ebdcd56455a1a7aa"}},"documentKey":{"_id":{"$oid":"62166d53ebdcd56455a1a7ab"}},"operationType":"insert"}
Success! The change event was received in real-time through the WebSocket connection.
Step 6: Using Filtered Streams with Variables
Remember the mine
stream we created earlier? It uses a variable n
to filter messages by name. Let’s test it.
Connecting with a Query Parameter
Connect to the mine
stream and pass the n
variable as a query parameter:
$ websocat --text - autoreconnect:ws://127.0.0.1:8080/messages/_streams/mine?n=uji
Now, only messages where name
equals "uji" will be streamed to this WebSocket connection.
Testing the Filter
Create two messages with different names:
# This message will be received (name=uji)
curl -X POST 'http://127.0.0.1:8080/messages' \
-u admin:secret \
-H 'Content-Type: application/json' \
-d '{"message": "This is for uji", "name": "uji"}'
# This message will NOT be received (name=andrea)
curl -X POST 'http://127.0.0.1:8080/messages' \
-u admin:secret \
-H 'Content-Type: application/json' \
-d '{"message": "This is for andrea", "name": "andrea"}'
Your WebSocket connection will only receive the first message because it matches the filter condition name=uji
.
Summary
Congratulations! You’ve successfully:
-
✓ Created a MongoDB collection with change streams
-
✓ Defined aggregation pipelines to filter events
-
✓ Connected to WebSocket streams
-
✓ Configured ACL permissions for unauthenticated access
-
✓ Received real-time notifications when documents change
-
✓ Used query parameters to filter streamed events
Next Steps
Now that you understand the basics, you can:
-
Explore more complex aggregation pipelines in change streams
-
Build a web application using JavaScript WebSocket API (
new WebSocket(…)
) -
Implement authentication with JWT tokens for WebSocket connections
-
Use change streams for real-time dashboards, chat applications, or live data feeds
For more information, check out: