Subscribers Overview
Subscribers are typed callbacks that receive message envelopes on matched topic patterns. The SubscriptionManager routes envelopes to matching subscribers after the pipeline completes — with full isolation between callbacks.
The Subscription Model
EdgeStream uses a push-based pub/sub model. You register a callback for a topic pattern; when a matching message is processed, your callback is invoked with the complete IEnvelope. No polling, no promise-based waiting — your callback fires the moment the pipeline completes.
// Basic subscription
const subscription = edgeStream.subscribe(
'bas', // server ID
'workflow.execution.*', // topic pattern
(envelope: IEnvelope) => { // callback
console.log('Message received:', envelope.body);
}
);
// subscription.id — unique ID, e.g. 'sub-a1b2c3d4'
// subscription.active — true while subscribed
// subscription.unsubscribe() — remove this subscription
Core Components
SubscriptionManager
Central registry of all active subscriptions. Receives publish calls from the pipeline and routes envelopes to matching callbacks. One instance per server.
TopicMatcher
Evaluates subscription patterns against message topics using exact match and .* wildcard. No regex — uses efficient string operations.
SubscriptionRegistry
The internal data structure (Map of ID → SubscriberEntry) that SubscriptionManager uses to store and look up subscriptions. Used by observability to enumerate active subscriptions.
ISubscription Handle
Returned by every subscribe() call. Holds the subscription ID and exposes unsubscribe(). Store this reference for cleanup.
Subscription Lifecycle
Register
edgeStream.subscribe(serverId, topic, callback) — generates a unique subscription ID ('sub-xxxx') and stores the entry in SubscriptionManager.
Match
When a message arrives, TopicMatcher.matches(pattern, topic) is called for each subscription. Matching subscriptions are queued for delivery.
Deliver
All matching subscriber callbacks are invoked concurrently. Async callbacks are collected in a Promise.all(). Each callback's error is isolated.
Cleanup
subscription.unsubscribe() removes the entry from SubscriptionManager. Any future messages no longer invoke this callback.
Topic Pattern Rules
| Pattern | Matches | Does Not Match |
|---|---|---|
workflow.started | workflow.started (exact) | workflow.started.sub |
workflow.* | workflow.started, workflow.done | workflow.node.started |
agent.chat.* | agent.chat.token, agent.chat.done | agent.chat, agent.other |
workflow.execution.* | workflow.execution.started | workflow.execution.node.done |
subscription.unsubscribe() in the cleanup function of useEffect. Leaked subscriptions accumulate and cause memory growth in long-running applications.