Portal Community

Three Types of Pipeline Errors

Error TypeCausePipeline EffectMessage Effect
Hook throwsUnhandled exception in hook.execute()Pipeline abortsMessage dropped, error logged
context.abort()Hook intentionally stops processingPipeline aborts cleanlyMessage dropped, reason logged
Subscriber throwsException in subscriber callbackNo effect on pipelineOther subscribers still receive

Hook Exception Handling

If a hook's execute() throws, the pipeline engine catches the exception, logs it, and aborts. No subsequent hooks run. The NormalizationHook deliberately catches its own exceptions and returns continue: true — normalization failures are non-fatal.

// Safe hook pattern — catch internally and decide whether to abort
export class SafeEnrichmentHook extends BaseHook {
  readonly name = 'SafeEnrichmentHook';
  readonly priority = 150;

  async execute(context: IPipelineContext): Promise<HookResult> {
    try {
      const data = await this.externalService.fetch(context.envelope.meta.id);
      context.metadata.enrichmentData = data;
    } catch (error) {
      // Log warning but allow pipeline to continue
      console.warn('[SafeEnrichmentHook] Enrichment failed, continuing:', error);
      context.metadata.enrichmentFailed = true;
      // Return continue: true — message still delivered without enrichment
    }
    return { continue: true };
  }
}

Graceful Abort vs. Exception

// Preferred: use context.abort() for intentional stops
async execute(context: IPipelineContext): Promise<HookResult> {
  if (!isValid(context.body)) {
    context.abort('Validation failed: missing required field "tenantId"');
    return { continue: false }; // must return after abort
  }
  return { continue: true };
}

// Avoid: letting exceptions propagate (less informative error messages)
async execute(context: IPipelineContext): Promise<HookResult> {
  if (!isValid(context.body)) {
    throw new Error('Invalid message'); // caught by pipeline, less informative
  }
  return { continue: true };
}

Server-Level Error Events

Unhandled pipeline errors bubble up to the EdgeStream facade as lifecycle events:

edgeStream.on('stream:error', (event) => {
  console.error('EdgeStream error:', event.data?.error);
  // Log to monitoring system
  monitoring.captureException(event.data?.error);
});

edgeStream.on('server:error', (event) => {
  console.error(`Server ${event.serverId} error:`, event.data?.error);
});

Subscriber Error Isolation

Subscriber errors are logged to the pipeline observer but never rethrow or affect other subscribers. Design subscribers to be resilient — catch their own errors and update local state:

// Defensive subscriber
edgeStream.subscribe('bas', 'workflow.*', async (envelope) => {
  try {
    await workflowStore.updateFromEvent(envelope.body);
  } catch (error) {
    // Never let subscriber errors propagate
    console.error('[WorkflowSubscriber] Failed to update store:', error);
    errorReporter.captureException(error, { topic: envelope.meta.topic });
  }
});

NormalizationHook — Non-Fatal by Design

The NormalizationHook catches all its internal errors and returns continue: true even on failure. This means the pipeline continues with the un-normalized body. Subscribers must handle both normalized and raw body shapes in this edge case:

// NormalizationHook error handling (from source)
try {
  // ... normalize body
  return { continue: true };
} catch (error) {
  const errorMsg = error instanceof Error ? error.message : String(error);
  console.error(`[NormalizationHook] Error: ${errorMsg}`);
  // Non-fatal — normalization is optional, pipeline continues
  return { continue: true, error: errorMsg };
}