EdgeStream
Pre-Pipeline Stage
Pre-pipeline hooks run on the normalized envelope before it reaches the core processing stage — the right place for enrichment, routing decisions, and cross-cutting concerns like rate limiting and tenant isolation.
When Pre-Pipeline Hooks Run
Pre-pipeline hooks execute after normalization (priority 110) and before the core processing stage. They receive a fully normalized IEnvelope and can read, transform, or abort it. Common pre-pipeline hook priorities are 120–199.
What Belongs in Pre-Pipeline Hooks
- Message enrichment — add computed fields to
context.metadata(e.g., resolved user object, tenant config) - Routing decisions — determine target topic, fork to alternate topics
- Rate limiting — check and enforce per-tenant message rate
- Deduplication — skip already-processed messages by correlation ID
- Header injection — add tracing headers to outgoing messages
- Tenant isolation validation — confirm tenant ID matches authenticated user
Implementing a Pre-Pipeline Hook
import { BaseHook, IPipelineContext, HookResult } from 'edge-stream-js';
// Enriches each message with resolved tenant configuration
export class TenantEnrichmentHook extends BaseHook {
readonly name = 'TenantEnrichmentHook';
readonly priority = 120; // just after NormalizationHook (110)
constructor(private tenantService: TenantService) {
super();
}
async execute(context: IPipelineContext): Promise<HookResult> {
const tenantId = (context.body as any)?.data?.metadata?.tenantId;
if (!tenantId) {
// Not all messages have tenantId — pass through
return { continue: true };
}
try {
const tenant = await this.tenantService.get(tenantId);
// Store in metadata — available to all subsequent hooks and subscribers
context.metadata.tenant = tenant;
context.metadata.tenantId = tenantId;
context.addActivity(`Enriched with tenant: ${tenant.name}`);
} catch (error) {
// Enrichment failure — abort to prevent unauthorized processing
context.abort(`Tenant ${tenantId} not found or inactive`);
return { continue: false };
}
return { continue: true };
}
}
Deduplication Hook Pattern
export class DeduplicationHook extends BaseHook {
readonly name = 'DeduplicationHook';
readonly priority = 130;
private seen = new Set<string>();
private readonly maxSize = 10000;
async execute(context: IPipelineContext): Promise<HookResult> {
const messageId = context.envelope.meta.id;
const correlationId = context.envelope.meta.correlationId;
const key = correlationId || messageId;
if (this.seen.has(key)) {
// Duplicate — abort silently
context.abort(`Duplicate message: ${key}`);
return { continue: false };
}
this.seen.add(key);
// Evict oldest entries when cache is full
if (this.seen.size > this.maxSize) {
const first = this.seen.values().next().value;
this.seen.delete(first);
}
return { continue: true };
}
}
Rate Limiting Hook Pattern
export class RateLimitHook extends BaseHook {
readonly name = 'RateLimitHook';
readonly priority = 140;
private counters = new Map<string, { count: number; resetAt: number }>();
private readonly maxPerWindow: number;
private readonly windowMs: number;
constructor(options: { maxPerWindow: number; windowMs: number }) {
super();
this.maxPerWindow = options.maxPerWindow;
this.windowMs = options.windowMs;
}
async execute(context: IPipelineContext): Promise<HookResult> {
const tenantId = context.metadata.tenantId as string || 'global';
const now = Date.now();
let entry = this.counters.get(tenantId);
if (!entry || now > entry.resetAt) {
entry = { count: 0, resetAt: now + this.windowMs };
this.counters.set(tenantId, entry);
}
entry.count++;
if (entry.count > this.maxPerWindow) {
context.abort(`Rate limit exceeded for tenant: ${tenantId}`);
return { continue: false, error: 'rate_limit_exceeded' };
}
return { continue: true };
}
}
Registering Pre-Pipeline Hooks
const server = edgeStream.server('bas')!;
// Hooks are added to the incoming pipeline
server.incomingPipeline.addHook(new TenantEnrichmentHook(tenantService));
server.incomingPipeline.addHook(new DeduplicationHook());
server.incomingPipeline.addHook(new RateLimitHook({
maxPerWindow: 100,
windowMs: 60_000, // 100 messages per minute per tenant
}));
Context Metadata Flows Forward
Data written to
context.metadata in any hook is accessible to all subsequent hooks and — importantly — to subscriber callbacks via the envelope's attributes field if you copy it there.