EdgeStream
Pre-Pipeline Hook
Pre-pipeline hooks operate on the normalized envelope before core processing. They are the standard location for enrichment, authorization checks, routing decisions, and filtering that applies to all messages on a server.
Priority Range: 120–199
Pre-pipeline hooks sit just after normalization (110) and before the core processing stage (200+). They receive a fully parsed NormalizedEnvelopeBody and can read business metadata without parsing raw bytes.
Enrichment Hook Pattern
import { BaseHook, IPipelineContext, HookResult } from 'edge-stream-js';
export class UserContextEnrichmentHook extends BaseHook {
readonly name = 'UserContextEnrichmentHook';
readonly priority = 120;
constructor(private userService: UserService) { super(); }
async execute(context: IPipelineContext): Promise<HookResult> {
const body = context.body as any;
const userId = body?.data?.metadata?.userId;
if (!userId) return { continue: true }; // not all messages have userId
const user = await this.userService.findById(userId);
if (!user) return { continue: true }; // user lookup failure is non-fatal
// Attach to metadata — accessible in all downstream hooks
context.metadata.user = user;
context.metadata.userId = userId;
context.metadata.tenantId = user.tenantId;
return { continue: true };
}
}
Authorization Hook Pattern
export class TenantBoundaryHook extends BaseHook {
readonly name = 'TenantBoundaryHook';
readonly priority = 135;
constructor(private currentTenantId: string) { super(); }
async execute(context: IPipelineContext): Promise<HookResult> {
const messageTenantId = context.metadata.tenantId as string;
// If message has a tenantId and it doesn't match, reject
if (messageTenantId && messageTenantId !== this.currentTenantId) {
context.abort(`Tenant boundary violation: ${messageTenantId} !== ${this.currentTenantId}`);
return { continue: false };
}
return { continue: true };
}
}
Routing Decision Hook
export class TopicRoutingHook extends BaseHook {
readonly name = 'TopicRoutingHook';
readonly priority = 160;
async execute(context: IPipelineContext): Promise<HookResult> {
const topic = context.envelope.meta.topic || '';
// Tag message with routing attributes for subscriber filtering
if (topic.startsWith('agent.chat')) {
context.envelope.attributes = {
...context.envelope.attributes,
routingClass: 'streaming',
priority: 'high',
};
} else if (topic.startsWith('workflow.execution')) {
context.envelope.attributes = {
...context.envelope.attributes,
routingClass: 'workflow',
priority: 'normal',
};
}
return { continue: true };
}
}
Pre-Pipeline Hook Registration
const server = edgeStream.server('bas')!;
// Add in any order — pipeline sorts by priority automatically
server.incomingPipeline.addHook(new UserContextEnrichmentHook(userService));
server.incomingPipeline.addHook(new TenantBoundaryHook(currentTenantId));
server.incomingPipeline.addHook(new TopicRoutingHook());