EdgeStream
Short-Circuit
A hook can stop all further processing by returning { continue: false }. The message is then dropped and never reaches any subscriber. Short-circuiting is the mechanism behind filtering, blocking, and deduplication.
How Short-Circuit Works
When any hook returns { continue: false }, the pipeline engine immediately stops executing remaining hooks and does not call SubscriptionManager.publish(). The message is dropped cleanly.
// Pipeline engine (simplified)
for (const hook of sortedHooks) {
  const result = await hook.execute(context);
  if (!result.continue || context.isAborted) {
    // Short-circuit: stop here, message dropped
    logger.debug(`Pipeline aborted at hook: ${hook.name}`);
    return;
  }
}
// Only reached if ALL hooks returned continue: true
await subscriptionManager.publish(topic, envelope);
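To make the control flow easier to experiment with, here is a minimal, self-contained sketch of the same loop. The `Hook` and `PipelineContext` shapes, the `runPipeline` function, and the ascending priority sort are assumptions made for illustration; they are not the actual EdgeStream types or engine.

```typescript
// Minimal stand-ins for the EdgeStream types (assumed shapes, illustration only).
interface HookResult { continue: boolean; error?: string }
interface PipelineContext { body: unknown; isAborted: boolean }
interface Hook {
  name: string;
  priority: number;
  execute(context: PipelineContext): Promise<HookResult>;
}

// Runs hooks in ascending priority order (an assumption of this sketch) and
// returns true only if the message would have been published.
async function runPipeline(
  hooks: Hook[],
  context: PipelineContext,
  executed: string[] = [],
): Promise<boolean> {
  const sortedHooks = [...hooks].sort((a, b) => a.priority - b.priority);
  for (const hook of sortedHooks) {
    executed.push(hook.name);
    const result = await hook.execute(context);
    if (!result.continue || context.isAborted) {
      return false; // short-circuit: later hooks never run, nothing is published
    }
  }
  return true; // every hook returned continue: true
}

// Three hypothetical hooks: the second one drops every message.
const passThrough: Hook = { name: 'PassThrough', priority: 100, execute: async () => ({ continue: true }) };
const dropAll: Hook = { name: 'DropAll', priority: 110, execute: async () => ({ continue: false }) };
const neverReached: Hook = { name: 'NeverReached', priority: 120, execute: async () => ({ continue: true }) };
```

Running `runPipeline([passThrough, dropAll, neverReached], ...)` in this sketch executes only the first two hooks and resolves to `false`, which corresponds to the publish call being skipped.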
Short-Circuit Use Cases
Message Filtering
// Only pass through messages for the current tenant
export class TenantFilterHook extends BaseHook {
  readonly name = 'TenantFilterHook';
  readonly priority = 130;
  constructor(private readonly allowedTenantId: string) { super(); }
  async execute(context: IPipelineContext): Promise<HookResult> {
    const messageTenantId = (context.body as any)?.data?.metadata?.tenantId;
    // Messages that carry no tenantId are not filtered and pass through
    if (messageTenantId && messageTenantId !== this.allowedTenantId) {
      // Silently drop: wrong tenant
      return { continue: false };
    }
    return { continue: true };
  }
}
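The filter condition can also be expressed without the `as any` cast. The sketch below is one possible way to do that; `readTenantId` and `shouldDrop` are hypothetical helper names, and treating a non-string tenantId as missing is a choice of this sketch rather than documented EdgeStream behavior.

```typescript
// Hypothetical helper: reads body.data.metadata.tenantId without `as any`.
function readTenantId(body: unknown): string | undefined {
  if (typeof body !== 'object' || body === null) return undefined;
  const data = (body as Record<string, unknown>).data;
  if (typeof data !== 'object' || data === null) return undefined;
  const metadata = (data as Record<string, unknown>).metadata;
  if (typeof metadata !== 'object' || metadata === null) return undefined;
  const tenantId = (metadata as Record<string, unknown>).tenantId;
  // Non-string values are treated as "no tenant id" (assumption of this sketch).
  return typeof tenantId === 'string' ? tenantId : undefined;
}

// Mirrors the hook's condition: drop only when a tenant id is present and differs.
function shouldDrop(body: unknown, allowedTenantId: string): boolean {
  const tenantId = readTenantId(body);
  return Boolean(tenantId) && tenantId !== allowedTenantId;
}
```

A hook's execute method could then return `{ continue: !shouldDrop(context.body, this.allowedTenantId) }`.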
Duplicate Detection
export class ExactDuplicateFilter extends BaseHook {
  readonly name = 'ExactDuplicateFilter';
  readonly priority = 125;
  // Note: this Set is never pruned, so it grows for the lifetime of the hook
  private processed = new Set<string>();
  async execute(context: IPipelineContext): Promise<HookResult> {
    const id = context.envelope.meta.id;
    if (this.processed.has(id)) {
      // Already processed this exact envelope: drop silently
      return { continue: false };
    }
    this.processed.add(id);
    return { continue: true };
  }
}
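Because the set of processed ids in the filter above is never pruned, a long-running process may want to cap it. The following is a minimal sketch of one way to do that with first-in, first-out eviction; `BoundedIdSet`, `seenBefore`, and `maxSize` are hypothetical names and not part of EdgeStream.

```typescript
// Hypothetical helper: remembers at most `maxSize` ids, evicting the oldest first.
class BoundedIdSet {
  private readonly ids = new Set<string>();

  constructor(private readonly maxSize: number) {}

  // Returns true if the id was already seen; otherwise records it and returns false.
  seenBefore(id: string): boolean {
    if (this.ids.has(id)) return true;
    this.ids.add(id);
    if (this.ids.size > this.maxSize) {
      // A Set iterates in insertion order, so the first value is the oldest.
      const oldest = this.ids.values().next().value as string;
      this.ids.delete(oldest);
    }
    return false;
  }

  get size(): number {
    return this.ids.size;
  }
}
```

The trade-off is that a duplicate arriving after its id has been evicted is no longer detected, so the cap should be sized for the window in which duplicates are expected.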
Schema Validation Gate
import Ajv, { ValidateFunction } from 'ajv';
export class SchemaValidationHook extends BaseHook {
  readonly name = 'SchemaValidationHook';
  readonly priority = 100;
  // Topic -> compiled validator; populate with e.g. new Ajv().compile(schema)
  private validators = new Map<string, ValidateFunction>();
  async execute(context: IPipelineContext): Promise<HookResult> {
    const topic = context.envelope.meta.topic || '';
    const validator = this.validators.get(topic);
    if (!validator) {
      return { continue: true }; // no schema registered for this topic
    }
    const valid = validator(context.body);
    if (!valid) {
      // Join all validation messages into one reason string
      const errors = validator.errors?.map(e => e.message).join('; ') ?? 'unknown error';
      context.abort(`Schema validation failed: ${errors}`);
      return { continue: false, error: errors };
    }
    return { continue: true };
  }
}
context.abort() vs. return { continue: false }
| Approach | When to Use | Logs Reason? |
|---|---|---|
| context.abort('reason') + return { continue: false } | When you want a reason logged for observability | Yes; the reason is visible in HooksMonitor |
| return { continue: false } only | Silent filter where no reason is needed (e.g., tenant filter) | No; clean drop |
// With reason (logged to observer)
context.abort('Message rejected: tenant mismatch');
return { continue: false };
// Without reason (silent drop)
return { continue: false };
Abort Does Not Throw
context.abort() sets an internal flag and records the reason. It does not throw, so code after the call still runs. Always pair it with an explicit return { continue: false }; do not rely on the abort flag alone to stop the pipeline:
// WRONG: context.isAborted is set, but the hook still tells the pipeline to continue
async execute(context: IPipelineContext): Promise<HookResult> {
  context.abort('Stop this message');
  return { continue: true }; // Bug! Contradicts the abort above
}
// CORRECT
async execute(context: IPipelineContext): Promise<HookResult> {
  context.abort('Stop this message');
  return { continue: false }; // Consistent: abort + stop
}
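The flag-and-reason behavior described in this section can be illustrated with a small stand-in context. This is only a sketch: `SketchContext`, its `abortReason` field, and `rejectingHook` are hypothetical and do not describe how the real pipeline context is implemented.

```typescript
// Assumed shape of the abort-related part of a pipeline context (illustration only).
class SketchContext {
  isAborted = false;
  abortReason?: string;

  // Records the reason and sets the flag; deliberately does not throw.
  abort(reason: string): void {
    this.isAborted = true;
    this.abortReason = reason;
  }
}

// A hook body that aborts and then stops the pipeline explicitly.
async function rejectingHook(
  context: SketchContext,
  trace: string[],
): Promise<{ continue: boolean }> {
  context.abort('Stop this message');
  trace.push('after abort'); // still runs, because abort() returned normally
  return { continue: false }; // the explicit stop signal
}
```

Since the statement after `abort()` still executes in this sketch, the hook's return value is what reports the final decision, which is why the two should always agree.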
Next: Hooks Guide
For a complete reference of all six hook types and how to implement each one, proceed to Guide 3: Hooks.