Portal Community

The Subscription Model

EdgeStream uses a push-based pub/sub model. You register a callback for a topic pattern; when a matching message is processed, your callback is invoked with the complete IEnvelope. No polling, no promise-based waiting — your callback fires the moment the pipeline completes.

// Basic subscription
const subscription = edgeStream.subscribe(
  'bas',                     // server ID
  'workflow.execution.*',    // topic pattern
  (envelope: IEnvelope) => { // callback
    console.log('Message received:', envelope.body);
  }
);

// subscription.id   — unique ID, e.g. 'sub-a1b2c3d4'
// subscription.active — true while subscribed
// subscription.unsubscribe() — remove this subscription

Core Components

SubscriptionManager

Central registry of all active subscriptions. Receives publish calls from the pipeline and routes envelopes to matching callbacks. One instance per server.

TopicMatcher

Evaluates subscription patterns against message topics using exact match and .* wildcard. No regex — uses efficient string operations.

SubscriptionRegistry

The internal data structure (Map of ID → SubscriberEntry) that SubscriptionManager uses to store and look up subscriptions. Used by observability to enumerate active subscriptions.

ISubscription Handle

Returned by every subscribe() call. Holds the subscription ID and exposes unsubscribe(). Store this reference for cleanup.

Subscription Lifecycle

1

Register

edgeStream.subscribe(serverId, topic, callback) — generates a unique subscription ID ('sub-xxxx') and stores the entry in SubscriptionManager.

2

Match

When a message arrives, TopicMatcher.matches(pattern, topic) is called for each subscription. Matching subscriptions are queued for delivery.

3

Deliver

All matching subscriber callbacks are invoked concurrently. Async callbacks are collected in a Promise.all(). Each callback's error is isolated.

4

Cleanup

subscription.unsubscribe() removes the entry from SubscriptionManager. Any future messages no longer invoke this callback.

Topic Pattern Rules

PatternMatchesDoes Not Match
workflow.startedworkflow.started (exact)workflow.started.sub
workflow.*workflow.started, workflow.doneworkflow.node.started
agent.chat.*agent.chat.token, agent.chat.doneagent.chat, agent.other
workflow.execution.*workflow.execution.startedworkflow.execution.node.done
Always Unsubscribe Subscriptions remain active until explicitly removed. In React components, always call subscription.unsubscribe() in the cleanup function of useEffect. Leaked subscriptions accumulate and cause memory growth in long-running applications.