EdgeStream
Callback Patterns
Subscriber callbacks can be synchronous or async. They receive typed IEnvelope objects and should handle their own errors. Named functions give better observability in HooksMonitor and stack traces.
Synchronous Callback
// Sync — fast, ideal for pure state updates
stream.subscribe('bas', 'workflow.execution.*', (envelope) => {
const { nodeId, status } = envelope.body as WorkflowNodeEvent;
workflowStore.setNodeStatus(nodeId, status);
});
Async Callback
// Async — for I/O operations, API calls, database writes
stream.subscribe('bas', 'hil.approval.*', async (envelope) => {
const request = envelope.body as HilApprovalRequest;
await hilStore.addPendingTask(request);
await emailService.notifyApprovers(request.approverIds, request.taskId);
});
Named Function Pattern (Recommended)
Use named functions instead of anonymous arrows — the function name appears in observability logs and stack traces:
// Named function — better for debugging
async function handleWorkflowNodeEvent(envelope: IEnvelope<WorkflowNodeEvent>) {
try {
const { nodeId, status, output } = envelope.body;
observerStore.updateNode(nodeId, { status, output });
} catch (error) {
console.error('[WorkflowSubscriber] Failed to update observer store:', error);
}
}
stream.subscribe('bas', 'workflow.execution.*', handleWorkflowNodeEvent);
// → SubscriberRegistry shows callbackName: 'handleWorkflowNodeEvent'
Discriminated Union Pattern
type AgentEvent =
| { eventType: 'token'; token: string }
| { eventType: 'tool-call'; toolName: string }
| { eventType: 'done' };
stream.subscribe<AgentEvent>('bas', 'agent.chat.*', (envelope) => {
switch (envelope.body.eventType) {
case 'token':
chatStore.appendToken(envelope.body.token); // typed ✅
break;
case 'tool-call':
chatStore.showToolCall(envelope.body.toolName); // typed ✅
break;
case 'done':
chatStore.markComplete();
break;
}
});
Error Handling in Callbacks
Always catch errors inside callbacks. An uncaught error in one subscriber does not affect others — but it's still logged as a failure in the observability system:
stream.subscribe('bas', 'workflow.*', async (envelope) => {
try {
await processMessage(envelope.body);
} catch (error) {
// Safe error handling — never rethrow
errorStore.record({
error,
messageId: envelope.meta.id,
topic: envelope.meta.topic
});
}
});
Topic Guard Pattern
// One subscription with internal routing by topic
stream.subscribe('bas', 'workflow.*', (envelope) => {
const topic = envelope.meta.topic || '';
if (topic.includes('.started')) {
handleStarted(envelope.body as WorkflowStartedEvent);
} else if (topic.includes('.completed')) {
handleCompleted(envelope.body as WorkflowCompletedEvent);
} else if (topic.includes('.failed')) {
handleFailed(envelope.body as WorkflowFailedEvent);
}
});