Portal Community

The subscribe() API

// Signature
/**
 * Registers `callback` for messages matching `topic` on the server
 * identified by `serverId`.
 *
 * @typeParam TBody - Type of the envelope body handed to the callback;
 *   defaults to `unknown` when no type argument is supplied.
 * @param serverId - ID of the target server (e.g. `'bas'`).
 * @param topic - Topic or topic pattern to match (e.g. `'workflow.execution.*'`).
 * @param callback - Invoked with the envelope of each matching message.
 * @returns A subscription handle exposing `id`, `active` and `unsubscribe()`.
 */
subscribe<TBody = unknown>(
  serverId: string,
  topic: string,
  callback: SubscriberCallback<TBody>
): ISubscription

// SubscriberCallback type
/**
 * Handler passed to `subscribe()`. It receives the message envelope and may
 * either complete synchronously or return a promise.
 *
 * @typeParam TBody - Type of `envelope.body`; defaults to `unknown`.
 */
type SubscriberCallback<TBody = unknown> =
  (envelope: IEnvelope<TBody>) => void | Promise<void>

Basic Subscription

import { createEdgeStream, IEnvelope } from 'edge-stream-js';

/** Shape of the body carried by the workflow execution messages below. */
interface WorkflowEvent {
  nodeId: string;
  status: 'started' | 'completed' | 'failed';
  output?: unknown;
}

const stream = createEdgeStream();
// ... setup and start ...

/** Logs the node status together with the envelope's metadata. */
const logWorkflowEvent = (envelope: IEnvelope<WorkflowEvent>): void => {
  const { body, meta } = envelope;
  console.log(`Node ${body.nodeId}: ${body.status}`);
  console.log('Topic:', meta.topic);
  console.log('Received at:', meta.receivedAt);
  console.log('Correlation:', meta.correlationId);
};

// Typed subscription — arguments are: server ID, topic pattern, handler
const sub = stream.subscribe<WorkflowEvent>(
  'bas',
  'workflow.execution.*',
  logWorkflowEvent
);

// sub.id       → 'sub-a1b2c3d4'
// sub.active   → true
// Later: sub.unsubscribe()

Server-Level Subscribe

You can also subscribe directly on a server instance — useful when you already have a reference to the server:

// Look the server up once and fail fast with a clear message if it is
// missing, instead of asserting non-null and crashing on the property
// access below with an opaque TypeError.
const server = stream.server('bas');
if (server == null) {
  throw new Error("EdgeStream server 'bas' not found");
}

// Server-level subscribe takes only the topic pattern and the callback;
// the target server is the instance itself.
const sub = server.subscriptionManager.subscribe<ChatMessage>(
  'agent.chat.*',
  (envelope) => {
    chatStore.appendMessage(envelope.body);
  }
);

Multiple Subscriptions to One Topic

Multiple subscribers on the same topic all receive the message independently. One subscriber's failure does not affect others:

// Register three independent handlers against one shared topic pattern
const WORKFLOW_TOPIC = 'workflow.*';

const sub1 = stream.subscribe('bas', WORKFLOW_TOPIC, (envelope) =>
  logStore.add(envelope)
);
const sub2 = stream.subscribe('bas', WORKFLOW_TOPIC, (envelope) =>
  metricsStore.track(envelope)
);
const sub3 = stream.subscribe('bas', WORKFLOW_TOPIC, (envelope) =>
  uiStore.update(envelope)
);

// Each of the three handlers is delivered every 'workflow.*' message independently

Async Subscriber

/** Body of the `hil.approval.*` messages as consumed by this example. */
interface HilApprovalRequest {
  // Assumed to be a string — TODO confirm against notificationService.notifyUser
  userId: string;
}

// Async callback — EdgeStream awaits all async subscribers concurrently.
// The type argument matters: without it TBody defaults to `unknown`, and
// reading `envelope.body.userId` does not type-check under `strict`.
const sub = stream.subscribe<HilApprovalRequest>(
  'bas',
  'hil.approval.*',
  async (envelope) => {
    const request = envelope.body;
    try {
      await hilInboxStore.processApprovalRequest(request);
      await notificationService.notifyUser(request.userId);
    } catch (error) {
      // Catch internally — never let async subscriber errors propagate
      console.error('[HILSubscriber] Failed:', error);
    }
  }
);