Portal Community

Synchronous Callback

// Sync — fast, ideal for pure state updates
stream.subscribe('bas', 'workflow.execution.*', (envelope) => {
  const { nodeId, status } = envelope.body as WorkflowNodeEvent;
  workflowStore.setNodeStatus(nodeId, status);
});

Async Callback

// Async — for I/O operations, API calls, database writes
stream.subscribe('bas', 'hil.approval.*', async (envelope) => {
  const request = envelope.body as HilApprovalRequest;
  await hilStore.addPendingTask(request);
  await emailService.notifyApprovers(request.approverIds, request.taskId);
});

Named Function Pattern (Recommended)

Use named functions instead of anonymous arrows — the function name appears in observability logs and stack traces:

// Named function — better for debugging
async function handleWorkflowNodeEvent(envelope: IEnvelope<WorkflowNodeEvent>) {
  try {
    const { nodeId, status, output } = envelope.body;
    observerStore.updateNode(nodeId, { status, output });
  } catch (error) {
    console.error('[WorkflowSubscriber] Failed to update observer store:', error);
  }
}

stream.subscribe('bas', 'workflow.execution.*', handleWorkflowNodeEvent);
// → SubscriberRegistry shows callbackName: 'handleWorkflowNodeEvent'

Discriminated Union Pattern

type AgentEvent =
  | { eventType: 'token'; token: string }
  | { eventType: 'tool-call'; toolName: string }
  | { eventType: 'done' };

stream.subscribe<AgentEvent>('bas', 'agent.chat.*', (envelope) => {
  switch (envelope.body.eventType) {
    case 'token':
      chatStore.appendToken(envelope.body.token);  // typed ✅
      break;
    case 'tool-call':
      chatStore.showToolCall(envelope.body.toolName);  // typed ✅
      break;
    case 'done':
      chatStore.markComplete();
      break;
  }
});

Error Handling in Callbacks

Always catch errors inside callbacks. An uncaught error in one subscriber does not affect others — but it's still logged as a failure in the observability system:

stream.subscribe('bas', 'workflow.*', async (envelope) => {
  try {
    await processMessage(envelope.body);
  } catch (error) {
    // Safe error handling — never rethrow
    errorStore.record({
      error,
      messageId: envelope.meta.id,
      topic: envelope.meta.topic
    });
  }
});

Topic Guard Pattern

// One subscription with internal routing by topic
stream.subscribe('bas', 'workflow.*', (envelope) => {
  const topic = envelope.meta.topic || '';

  if (topic.includes('.started')) {
    handleStarted(envelope.body as WorkflowStartedEvent);
  } else if (topic.includes('.completed')) {
    handleCompleted(envelope.body as WorkflowCompletedEvent);
  } else if (topic.includes('.failed')) {
    handleFailed(envelope.body as WorkflowFailedEvent);
  }
});