EdgeStream
Creating a Subscription
Subscribe to a topic by calling edgeStream.subscribe(serverId, topic, callback). The returned ISubscription handle lets you unsubscribe when you are done.
The subscribe() API
// Signature
subscribe<TBody = unknown>(
  serverId: string,
  topic: string,
  callback: SubscriberCallback<TBody>
): ISubscription

// SubscriberCallback type
type SubscriberCallback<TBody = unknown> =
  (envelope: IEnvelope<TBody>) => void | Promise<void>
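To make the handle lifecycle concrete, here is a minimal in-memory sketch. It is not EdgeStream's implementation: it assumes a single implicit server, exact-topic matching, and field names (id, active, unsubscribe, meta.topic, meta.receivedAt) taken from the examples on this page. TinyStream and the Sketch-suffixed types are hypothetical, and the numeric receivedAt is a guess.

```typescript
// Hypothetical shapes inferred from this page's examples; not the library's definitions.
interface EnvelopeSketch<TBody = unknown> {
  body: TBody;
  meta: { topic: string; receivedAt: number; correlationId?: string };
}
type CallbackSketch<TBody = unknown> =
  (envelope: EnvelopeSketch<TBody>) => void | Promise<void>;
interface SubscriptionSketch {
  readonly id: string;
  readonly active: boolean;
  unsubscribe(): void;
}

// Minimal stand-in: one implicit server, exact-topic matching only.
class TinyStream {
  private subs = new Map<string, { topic: string; cb: CallbackSketch<any> }>();
  private nextId = 1;

  subscribe<TBody = unknown>(topic: string, cb: CallbackSketch<TBody>): SubscriptionSketch {
    const id = `sub-${this.nextId++}`;
    const subs = this.subs;
    subs.set(id, { topic, cb });
    return {
      id,
      // The handle is active for as long as it is still registered
      get active() { return subs.has(id); },
      unsubscribe() { subs.delete(id); },
    };
  }

  publish<TBody>(topic: string, body: TBody): void {
    this.subs.forEach(({ topic: subscribed, cb }) => {
      if (subscribed === topic) {
        void cb({ body, meta: { topic, receivedAt: Date.now() } });
      }
    });
  }
}

// Usage: deliveries stop once the handle is unsubscribed
const tiny = new TinyStream();
const seen: string[] = [];
const handle = tiny.subscribe<{ status: string }>('workflow.done', (e) => {
  seen.push(e.body.status);
});
tiny.publish('workflow.done', { status: 'completed' });
handle.unsubscribe();
tiny.publish('workflow.done', { status: 'failed' }); // not delivered
```

Keeping the handle around and calling unsubscribe() during teardown avoids callbacks firing against state that no longer exists.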
Basic Subscription
import { createEdgeStream, IEnvelope } from 'edge-stream-js';

const stream = createEdgeStream();
// ... setup and start ...

interface WorkflowEvent {
  nodeId: string;
  status: 'started' | 'completed' | 'failed';
  output?: unknown;
}

// Subscribe with typed body
const sub = stream.subscribe<WorkflowEvent>(
  'bas',                  // server ID
  'workflow.execution.*', // topic pattern
  (envelope: IEnvelope<WorkflowEvent>) => {
    const { nodeId, status } = envelope.body;
    console.log(`Node ${nodeId}: ${status}`);
    console.log('Topic:', envelope.meta.topic);
    console.log('Received at:', envelope.meta.receivedAt);
    console.log('Correlation:', envelope.meta.correlationId);
  }
);

// sub.id → 'sub-a1b2c3d4'
// sub.active → true
// Later: sub.unsubscribe()
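TypeScript type arguments are erased at compile time, so subscribe<WorkflowEvent> by itself does not check the payload at runtime. If messages can come from producers you do not control, a type guard inside the callback may be worthwhile. The sketch below is one way to do that; isWorkflowEvent and describeEvent are hypothetical helpers, not part of edge-stream-js, and whether EdgeStream validates bodies itself is not stated here.

```typescript
interface WorkflowEvent {
  nodeId: string;
  status: 'started' | 'completed' | 'failed';
  output?: unknown;
}

// Hypothetical helper: narrows an unknown payload to WorkflowEvent
function isWorkflowEvent(value: unknown): value is WorkflowEvent {
  if (typeof value !== 'object' || value === null) return false;
  const v = value as Record<string, unknown>;
  return (
    typeof v['nodeId'] === 'string' &&
    (v['status'] === 'started' || v['status'] === 'completed' || v['status'] === 'failed')
  );
}

// Stand-in for the body of a subscriber callback
function describeEvent(body: unknown): string {
  return isWorkflowEvent(body)
    ? `Node ${body.nodeId}: ${body.status}`
    : 'ignored malformed event';
}
```

Inside a real callback you would pass envelope.body to the guard and return early when it fails.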
Server-Level Subscribe
You can also subscribe directly on a server instance, which is useful when you already hold a reference to the server:
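// Non-null assertion: assumes the 'bas' server is registered.
// ChatMessage and chatStore are application-defined and not shown here.
const server = stream.server('bas')!;
const sub = server.subscriptionManager.subscribe<ChatMessage>(
  'agent.chat.*', // topic pattern (no server ID argument)
  (envelope) => {
    chatStore.appendMessage(envelope.body);
  }
);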
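The non-null assertion in the example above suggests that stream.server() can return undefined for an unknown ID. Assuming that is the case, a small guard gives a clearer error than a later "cannot read properties of undefined". This is a sketch; requireServer and ServerLookup are hypothetical and only model the one method used here.

```typescript
// Hypothetical minimal shape; the real stream object has more members.
interface ServerLookup<TServer> {
  server(id: string): TServer | undefined;
}

// Returns the server or throws a descriptive error if it is not registered
function requireServer<TServer>(stream: ServerLookup<TServer>, id: string): TServer {
  const server = stream.server(id);
  if (server === undefined) {
    throw new Error(`EdgeStream server '${id}' is not registered`);
  }
  return server;
}
```

With this helper, requireServer(stream, 'bas') would replace stream.server('bas')! and fail fast at the lookup site.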
Multiple Subscriptions to One Topic
Every subscriber on a topic receives each matching message independently. If one subscriber's callback fails, the others still receive the message:
// Three subscribers on the same topic
const sub1 = stream.subscribe('bas', 'workflow.*', (e) => logStore.add(e));
const sub2 = stream.subscribe('bas', 'workflow.*', (e) => metricsStore.track(e));
const sub3 = stream.subscribe('bas', 'workflow.*', (e) => uiStore.update(e));
// All three receive every 'workflow.*' message independently
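The isolation guarantee described above can be pictured as a dispatch loop that wraps each callback in its own try/catch. The following is a sketch of that general technique, not EdgeStream's actual dispatcher; dispatchIsolated and Handler are hypothetical names.

```typescript
type Handler<T> = (message: T) => void;

// Calls every handler in order. A throwing handler is reported through onError
// and does not stop the remaining handlers. Returns the number of failures.
function dispatchIsolated<T>(
  handlers: Handler<T>[],
  message: T,
  onError: (err: unknown) => void = () => {}
): number {
  let failures = 0;
  for (const handler of handlers) {
    try {
      handler(message);
    } catch (err) {
      failures++;
      onError(err);
    }
  }
  return failures;
}

// Usage: the middle handler throws, the first and last still run
const received: string[] = [];
const failures = dispatchIsolated<string>(
  [
    (m) => { received.push(`log:${m}`); },
    () => { throw new Error('metrics down'); },
    (m) => { received.push(`ui:${m}`); },
  ],
  'workflow.started'
);
```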
Async Subscriber
// Hypothetical payload shape; without a type argument, envelope.body is `unknown`
interface ApprovalRequest {
  userId: string;
}

// Async callback: EdgeStream awaits all async subscribers concurrently
const sub = stream.subscribe<ApprovalRequest>('bas', 'hil.approval.*', async (envelope) => {
  try {
    await hilInboxStore.processApprovalRequest(envelope.body);
    await notificationService.notifyUser(envelope.body.userId);
  } catch (error) {
    // Catch internally: never let async subscriber errors propagate
    console.error('[HILSubscriber] Failed:', error);
  }
});
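The comment in the example above says EdgeStream awaits async subscribers concurrently. One way such a dispatcher could work is sketched below with Promise.all, where each callback is wrapped so that a rejection is collected instead of aborting the others. This illustrates the general technique only; dispatchConcurrently and AsyncHandler are hypothetical names, and EdgeStream's internals may differ.

```typescript
type AsyncHandler<T> = (message: T) => void | Promise<void>;

// Starts every handler before awaiting any of them, then waits for all to settle.
// Errors (sync throws or rejections) are collected and returned, not rethrown.
async function dispatchConcurrently<T>(
  handlers: AsyncHandler<T>[],
  message: T
): Promise<unknown[]> {
  const errors: unknown[] = [];
  await Promise.all(
    handlers.map(async (handler) => {
      try {
        await handler(message);
      } catch (err) {
        errors.push(err);
      }
    })
  );
  return errors;
}

// Demo handlers: a slow one, a fast one, and one that rejects
const order: string[] = [];
const demoHandlers: AsyncHandler<string>[] = [
  async (m) => {
    await new Promise<void>((resolve) => setTimeout(resolve, 20));
    order.push(`slow:${m}`);
  },
  (m) => { order.push(`fast:${m}`); },
  async () => { throw new Error('subscriber failed'); },
];
```

Because the slow handler does not block the fast one, total latency is roughly that of the slowest subscriber rather than the sum. Catching inside your own callback, as the example above does, keeps error handling close to the code that knows how to recover.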