Custom SseService Plugin
RESTHeart Cloud|
Note
|
SSE support is available from RESTHeart 9.3 onwards. |
The SseService plugin interface lets you build any SSE endpoint without a MongoDB connection. Use it for live dashboards, system metrics, IoT feeds, log tailing, or any application where the server pushes a stream of events to HTTP clients.
This works in RESTHeart’s standalone mode (java -jar restheart.jar -s) — no MongoDB is required.
The SseService Interface
public interface SseService extends Plugin {
void onConnect(ServerSentEventConnection connection, String lastEventId);
}
onConnect is called once per client connection. Use the connection object to push events and register cleanup tasks. The lastEventId parameter carries the Last-Event-ID header value when a client reconnects after a drop — it is null on the first connection.
@RegisterPlugin Annotation
Annotate your class with @RegisterPlugin to register it with the framework:
| Parameter | Description | Mandatory | Default |
|---|---|---|---|
|
Unique plugin name |
yes |
— |
|
Human-readable description |
yes |
— |
|
The URI path where the SSE endpoint is exposed |
no |
|
|
Whether the plugin is enabled without explicit configuration |
no |
|
|
When |
no |
|
ServerSentEventConnection Methods
The connection parameter passed to onConnect is Undertow’s ServerSentEventConnection. Key methods:
| Method | Description |
|---|---|
|
Send one SSE event. |
|
Send a plain |
|
Returns |
|
Send a keep-alive comment every |
|
Register a callback invoked when the client disconnects or the connection is shut down. |
|
Close the connection from the server side. |
Complete Example: UTC Clock Feed
The sse-clock example (source) exposes a live UTC clock at /sse/clock, pushing one tick event per second:
@RegisterPlugin(
name = "clockSse",
description = "Pushes a UTC timestamp tick event every second",
defaultURI = "/sse/clock",
secure = false
)
public class ClockSseService implements SseService {
@Override
public void onConnect(ServerSentEventConnection conn, String lastEventId) {
// keep-alive comment every 15 seconds to prevent proxy timeouts
conn.setKeepAliveTime(15_000);
// run the event loop on a virtual thread — does not block I/O
Thread.ofVirtual().start(() -> {
try {
while (conn.isOpen()) {
conn.send(Instant.now().toString(), "tick", null, null);
Thread.sleep(Duration.ofSeconds(1));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// clean up resources when the client disconnects
conn.addCloseTask(c -> { /* release any resources here */ });
}
}
The SSE wire format for each tick event:
event:tick
data:2026-03-17T14:32:01.123456789Z
event:tick
data:2026-03-17T14:32:02.234567890Z
Build and Deploy
# Build the plugin JAR
./mvnw package -pl examples/sse-clock
# Copy it to the plugins directory
cp examples/sse-clock/target/sse-clock.jar /path/to/restheart/plugins/
# Run RESTHeart in standalone mode (no MongoDB needed)
java -jar restheart.jar -s
Testing
cURL
curl -N http://localhost:8080/sse/clock
HTTPie
http --stream GET http://localhost:8080/sse/clock
Browser (JavaScript EventSource)
<!DOCTYPE html>
<html>
<body>
<p>Current time: <span id="time">—</span></p>
<script>
const es = new EventSource('http://localhost:8080/sse/clock');
es.addEventListener('tick', event => {
document.getElementById('time').textContent = event.data;
});
es.onerror = err => console.error('SSE error', err);
</script>
</body>
</html>
Overriding the URI via Configuration
The endpoint URI can be changed in restheart.yml without recompiling:
/clockSse/uri: /api/clock
To disable the plugin entirely:
/clockSse/enabled: false
Securing the Endpoint
Set secure = true in @RegisterPlugin to require authentication:
@RegisterPlugin(
name = "metricsStream",
description = "Internal metrics feed",
defaultURI = "/sse/metrics",
secure = true // 401 for unauthenticated requests
)
public class MetricsSseService implements SseService {
@Override
public void onConnect(ServerSentEventConnection conn, String lastEventId) { ... }
}
Then create an ACL rule to allow only the ops role:
curl -i -X POST 'http://localhost:8080/acl' \
-u admin:secret \
-H 'Content-Type: application/json' \
-d '{
"_id": "opsCanReadMetricsStream",
"predicate": "path-prefix(/sse/metrics)",
"priority": 0,
"roles": ["ops"]
}'
Handling Reconnections with Last-Event-ID
When a client reconnects, the browser EventSource sends the id of the last event it received. Use this to resume from the right position:
@RegisterPlugin(name = "eventFeed", description = "Resumable event feed", defaultURI = "/sse/events")
public class EventFeedSseService implements SseService {
@Override
public void onConnect(ServerSentEventConnection conn, String lastEventId) {
// determine where to start based on the reconnect id
long startSequence = lastEventId != null ? Long.parseLong(lastEventId) : 0L;
Thread.ofVirtual().start(() -> {
long seq = startSequence;
try {
while (conn.isOpen()) {
String event = fetchEvent(seq); // your data source
conn.send(event, "update", String.valueOf(seq), null);
seq++;
Thread.sleep(Duration.ofMillis(500));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
private String fetchEvent(long seq) { return "{\"seq\":" + seq + "}"; }
}
The client wire exchange looks like:
# First connection — server sends events with sequential IDs
event:update
id:0
data:{"seq":0}
event:update
id:1
data:{"seq":1}
# Client disconnects then reconnects with:
# Last-Event-ID: 1
# Server resumes from seq=2
Broadcasting to Multiple Clients
To fan out events to all connected clients, maintain a shared session registry:
@RegisterPlugin(name = "broadcastFeed", description = "Fan-out feed", defaultURI = "/sse/broadcast")
public class BroadcastSseService implements SseService {
private static final Set<ServerSentEventConnection> SESSIONS =
Collections.synchronizedSet(new HashSet<>());
@Override
public void onConnect(ServerSentEventConnection conn, String lastEventId) {
SESSIONS.add(conn);
conn.addCloseTask(c -> SESSIONS.remove(conn));
// broadcast is driven externally, e.g. from an Initializer that
// calls BroadcastSseService.broadcast(message) on some event
}
public static void broadcast(String data) {
new HashSet<>(SESSIONS).forEach(conn -> {
if (conn.isOpen()) conn.send(data, "update", null, null);
});
}
}
|
Note
|
ServerSentEventConnection.send() is thread-safe. You can call it from any thread — including virtual threads, scheduled tasks, or message consumers.
|
Next Steps
-
Tutorial: Change Streams over SSE — stream MongoDB change events without writing a plugin
-
Plugin Framework Overview — learn about the full plugin model
-
Services — the HTTP request/response sibling of
SseService -
WebSocket API Overview — bidirectional alternative for MongoDB change streams