Pipeline Error Handling
EdgeStream distinguishes between hook errors (thrown exceptions), graceful aborts (context.abort()), and subscriber errors — each handled differently to maximize resilience.
Three Types of Pipeline Errors
| Error Type | Cause | Pipeline Effect | Message Effect |
|---|---|---|---|
| Hook throws | Unhandled exception in hook.execute() | Pipeline aborts | Message dropped, error logged |
| context.abort() | Hook intentionally stops processing | Pipeline aborts cleanly | Message dropped, reason logged |
| Subscriber throws | Exception in subscriber callback | No effect on pipeline | Other subscribers still receive |
Hook Exception Handling
If a hook's execute() throws, the pipeline engine catches the exception, logs it, and aborts. No subsequent hooks run. The NormalizationHook deliberately catches its own exceptions and returns continue: true — normalization failures are non-fatal.
// Safe hook pattern — catch internally and decide whether to abort
export class SafeEnrichmentHook extends BaseHook {
readonly name = 'SafeEnrichmentHook';
readonly priority = 150;
async execute(context: IPipelineContext): Promise<HookResult> {
try {
const data = await this.externalService.fetch(context.envelope.meta.id);
context.metadata.enrichmentData = data;
} catch (error) {
// Log warning but allow pipeline to continue
console.warn('[SafeEnrichmentHook] Enrichment failed, continuing:', error);
context.metadata.enrichmentFailed = true;
// Return continue: true — message still delivered without enrichment
}
return { continue: true };
}
}
Graceful Abort vs. Exception
// Preferred: use context.abort() for intentional stops
async execute(context: IPipelineContext): Promise<HookResult> {
if (!isValid(context.body)) {
context.abort('Validation failed: missing required field "tenantId"');
return { continue: false }; // must return after abort
}
return { continue: true };
}
// Avoid: letting exceptions propagate (less informative error messages)
async execute(context: IPipelineContext): Promise<HookResult> {
if (!isValid(context.body)) {
throw new Error('Invalid message'); // caught by pipeline, less informative
}
return { continue: true };
}
Server-Level Error Events
Unhandled pipeline errors bubble up to the EdgeStream facade as lifecycle events:
edgeStream.on('stream:error', (event) => {
console.error('EdgeStream error:', event.data?.error);
// Log to monitoring system
monitoring.captureException(event.data?.error);
});
edgeStream.on('server:error', (event) => {
console.error(`Server ${event.serverId} error:`, event.data?.error);
});
Subscriber Error Isolation
Subscriber errors are logged to the pipeline observer but never rethrow or affect other subscribers. Design subscribers to be resilient — catch their own errors and update local state:
// Defensive subscriber
edgeStream.subscribe('bas', 'workflow.*', async (envelope) => {
try {
await workflowStore.updateFromEvent(envelope.body);
} catch (error) {
// Never let subscriber errors propagate
console.error('[WorkflowSubscriber] Failed to update store:', error);
errorReporter.captureException(error, { topic: envelope.meta.topic });
}
});
NormalizationHook — Non-Fatal by Design
The NormalizationHook catches all its internal errors and returns continue: true even on failure. This means the pipeline continues with the un-normalized body. Subscribers must handle both normalized and raw body shapes in this edge case:
// NormalizationHook error handling (from source)
try {
// ... normalize body
return { continue: true };
} catch (error) {
const errorMsg = error instanceof Error ? error.message : String(error);
console.error(`[NormalizationHook] Error: ${errorMsg}`);
// Non-fatal — normalization is optional, pipeline continues
return { continue: true, error: errorMsg };
}