Edit Page

Custom SseService Plugin

RESTHeart Cloud
Note
SSE support is available from RESTHeart 9.3 onwards.

The SseService plugin interface lets you build any SSE endpoint without a MongoDB connection. Use it for live dashboards, system metrics, IoT feeds, log tailing, or any application where the server pushes a stream of events to HTTP clients.

This works in RESTHeart’s standalone mode (java -jar restheart.jar -s) — no MongoDB is required.

The SseService Interface

public interface SseService extends Plugin {
    void onConnect(ServerSentEventConnection connection, String lastEventId);
}

onConnect is called once per client connection. Use the connection object to push events and register cleanup tasks. The lastEventId parameter carries the Last-Event-ID header value when a client reconnects after a drop — it is null on the first connection.

@RegisterPlugin Annotation

Annotate your class with @RegisterPlugin to register it with the framework:

Parameter Description Mandatory Default

name

Unique plugin name

yes

—

description

Human-readable description

yes

—

defaultURI

The URI path where the SSE endpoint is exposed

no

/<name>

enabledByDefault

Whether the plugin is enabled without explicit configuration

no

true

secure

When true, the auth/authz pipeline runs before onConnect — unauthenticated requests get 401 Unauthorized

no

false

ServerSentEventConnection Methods

The connection parameter passed to onConnect is Undertow’s ServerSentEventConnection. Key methods:

Method Description

send(String data, String event, String id, EventCallback cb)

Send one SSE event. event sets the event: field (used for browser addEventListener). id sets the id: field for client-side Last-Event-ID tracking. Pass null to omit either.

send(String data)

Send a plain data: event with no event type or id.

isOpen()

Returns true while the client is connected.

setKeepAliveTime(long millis)

Send a keep-alive comment every millis milliseconds to prevent proxies from closing idle connections.

addCloseTask(ChannelListener<ServerSentEventConnection> task)

Register a callback invoked when the client disconnects or the connection is shut down.

shutdown()

Close the connection from the server side.

Complete Example: UTC Clock Feed

The sse-clock example (source) exposes a live UTC clock at /sse/clock, pushing one tick event per second:

@RegisterPlugin(
    name        = "clockSse",
    description = "Pushes a UTC timestamp tick event every second",
    defaultURI  = "/sse/clock",
    secure      = false
)
public class ClockSseService implements SseService {

    @Override
    public void onConnect(ServerSentEventConnection conn, String lastEventId) {
        // keep-alive comment every 15 seconds to prevent proxy timeouts
        conn.setKeepAliveTime(15_000);

        // run the event loop on a virtual thread — does not block I/O
        Thread.ofVirtual().start(() -> {
            try {
                while (conn.isOpen()) {
                    conn.send(Instant.now().toString(), "tick", null, null);
                    Thread.sleep(Duration.ofSeconds(1));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // clean up resources when the client disconnects
        conn.addCloseTask(c -> { /* release any resources here */ });
    }
}

The SSE wire format for each tick event:

event:tick
data:2026-03-17T14:32:01.123456789Z

event:tick
data:2026-03-17T14:32:02.234567890Z

Build and Deploy

# Build the plugin JAR
./mvnw package -pl examples/sse-clock

# Copy it to the plugins directory
cp examples/sse-clock/target/sse-clock.jar /path/to/restheart/plugins/

# Run RESTHeart in standalone mode (no MongoDB needed)
java -jar restheart.jar -s

Testing

cURL
curl -N http://localhost:8080/sse/clock
HTTPie
http --stream GET http://localhost:8080/sse/clock
Browser (JavaScript EventSource)
<!DOCTYPE html>
<html>
<body>
  <p>Current time: <span id="time">—</span></p>

  <script>
    const es = new EventSource('http://localhost:8080/sse/clock');

    es.addEventListener('tick', event => {
      document.getElementById('time').textContent = event.data;
    });

    es.onerror = err => console.error('SSE error', err);
  </script>
</body>
</html>

Overriding the URI via Configuration

The endpoint URI can be changed in restheart.yml without recompiling:

/clockSse/uri: /api/clock

To disable the plugin entirely:

/clockSse/enabled: false

Securing the Endpoint

Set secure = true in @RegisterPlugin to require authentication:

@RegisterPlugin(
    name       = "metricsStream",
    description = "Internal metrics feed",
    defaultURI = "/sse/metrics",
    secure     = true          // 401 for unauthenticated requests
)
public class MetricsSseService implements SseService {
    @Override
    public void onConnect(ServerSentEventConnection conn, String lastEventId) { ... }
}

Then create an ACL rule to allow only the ops role:

curl -i -X POST 'http://localhost:8080/acl' \
  -u admin:secret \
  -H 'Content-Type: application/json' \
  -d '{
    "_id": "opsCanReadMetricsStream",
    "predicate": "path-prefix(/sse/metrics)",
    "priority": 0,
    "roles": ["ops"]
  }'

Handling Reconnections with Last-Event-ID

When a client reconnects, the browser EventSource sends the id of the last event it received. Use this to resume from the right position:

@RegisterPlugin(name = "eventFeed", description = "Resumable event feed", defaultURI = "/sse/events")
public class EventFeedSseService implements SseService {

    @Override
    public void onConnect(ServerSentEventConnection conn, String lastEventId) {
        // determine where to start based on the reconnect id
        long startSequence = lastEventId != null ? Long.parseLong(lastEventId) : 0L;

        Thread.ofVirtual().start(() -> {
            long seq = startSequence;
            try {
                while (conn.isOpen()) {
                    String event = fetchEvent(seq); // your data source
                    conn.send(event, "update", String.valueOf(seq), null);
                    seq++;
                    Thread.sleep(Duration.ofMillis(500));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    private String fetchEvent(long seq) { return "{\"seq\":" + seq + "}"; }
}

The client wire exchange looks like:

# First connection — server sends events with sequential IDs
event:update
id:0
data:{"seq":0}

event:update
id:1
data:{"seq":1}

# Client disconnects then reconnects with:
# Last-Event-ID: 1
# Server resumes from seq=2

Broadcasting to Multiple Clients

To fan out events to all connected clients, maintain a shared session registry:

@RegisterPlugin(name = "broadcastFeed", description = "Fan-out feed", defaultURI = "/sse/broadcast")
public class BroadcastSseService implements SseService {

    private static final Set<ServerSentEventConnection> SESSIONS =
        Collections.synchronizedSet(new HashSet<>());

    @Override
    public void onConnect(ServerSentEventConnection conn, String lastEventId) {
        SESSIONS.add(conn);

        conn.addCloseTask(c -> SESSIONS.remove(conn));

        // broadcast is driven externally, e.g. from an Initializer that
        // calls BroadcastSseService.broadcast(message) on some event
    }

    public static void broadcast(String data) {
        new HashSet<>(SESSIONS).forEach(conn -> {
            if (conn.isOpen()) conn.send(data, "update", null, null);
        });
    }
}
Note
ServerSentEventConnection.send() is thread-safe. You can call it from any thread — including virtual threads, scheduled tasks, or message consumers.

Next Steps