EdgeStream
Post-Pipeline Stage
Post-pipeline hooks run after the core processing stage and just before subscriber delivery — the right place for audit logging, metrics emission, acknowledgment, and side effects that should happen regardless of subscriber outcome.
Post-Pipeline Hook Position
Post-pipeline hooks use priorities of 300 and above. They execute after all processing hooks and before the terminal PublishToSubscribersHook. Since they run after processing, the message body is in its final, fully-transformed state.
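The ordering rule above can be illustrated with a minimal sketch. It assumes hooks run in ascending priority order, which is consistent with the text but not confirmed by it. `HookLike`, `POST_PIPELINE_MIN_PRIORITY`, `isPostPipeline`, `executionOrder`, and `SomeProcessingHook` are hypothetical names used for illustration only, not part of EdgeStream.

```typescript
// Hypothetical minimal hook shape; the real BaseHook may carry more members.
interface HookLike {
  readonly name: string;
  readonly priority: number;
}

// Boundary taken from the text: priorities of 300 and above are post-pipeline.
const POST_PIPELINE_MIN_PRIORITY = 300;

function isPostPipeline(hook: HookLike): boolean {
  return hook.priority >= POST_PIPELINE_MIN_PRIORITY;
}

// Assumption: lower priority numbers run first.
function executionOrder(hooks: readonly HookLike[]): string[] {
  return [...hooks].sort((a, b) => a.priority - b.priority).map(h => h.name);
}

const hooks: HookLike[] = [
  { name: 'MessageSnapshotHook', priority: 330 },
  { name: 'AuditLogHook', priority: 300 },
  { name: 'SomeProcessingHook', priority: 200 }, // hypothetical processing-stage hook
  { name: 'MetricsHook', priority: 310 },
];

console.log(executionOrder(hooks));
console.log(hooks.filter(isPostPipeline).map(h => h.name));
```

Under that assumption, the processing-stage hook runs first, followed by the audit, metrics, and snapshot hooks in that order.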
Audit Logging Hook
export class AuditLogHook extends BaseHook {
  readonly name = 'AuditLogHook';
  readonly priority = 300;

  constructor(private auditService: AuditService) { super(); }

  async execute(context: IPipelineContext): Promise<HookResult> {
    const { envelope, metadata } = context;
    // Audit every message as fire-and-forget: the promise is deliberately not awaited
    this.auditService.log({
      messageId: envelope.meta.id,
      topic: envelope.meta.topic,
      serverId: envelope.meta.serverId,
      transport: envelope.meta.transport,
      tenantId: metadata.tenantId as string,
      receivedAt: envelope.meta.receivedAt,
      // Serialized length in UTF-16 code units, not bytes. The `?? null` guard keeps an
      // undefined body from making JSON.stringify return undefined and throw on .length.
      bodySize: JSON.stringify(context.body ?? null).length,
    }).catch(err => console.warn('[AuditLogHook] Non-critical audit failure:', err));
    // Always continue: an audit failure must not block delivery
    return { continue: true };
  }
}
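The `bodySize` field above is the length of the serialized string, which counts UTF-16 code units rather than bytes. If a byte count is wanted, a helper along the following lines is one option. `safeBodySize` is a hypothetical name, not part of EdgeStream, and the sentinel values `0` and `-1` are assumptions chosen for this sketch.

```typescript
// Returns the UTF-8 byte length of the JSON form of `body`.
// Sentinels (assumptions): 0 when there is nothing to serialize,
// -1 when serialization fails (for example, circular references or BigInt values).
function safeBodySize(body: unknown): number {
  try {
    const json = JSON.stringify(body);
    // JSON.stringify returns undefined for undefined, functions, and symbols.
    if (json === undefined) return 0;
    return new TextEncoder().encode(json).length;
  } catch {
    return -1;
  }
}

console.log(safeBodySize({ a: 1 }));   // 7, from the 7 ASCII characters of {"a":1}
console.log(safeBodySize('é'));        // 4, two quote bytes plus a 2-byte character
console.log(safeBodySize(undefined));  // 0
```

Because the helper never throws, it can be called inside the audit payload without risking an exception that would interrupt the hook.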
Metrics Emission Hook
export class MetricsHook extends BaseHook {
  readonly name = 'MetricsHook';
  readonly priority = 310;
  private counters = new Map<string, number>();

  async execute(context: IPipelineContext): Promise<HookResult> {
    const topic = context.envelope.meta.topic || 'unknown';
    const topicPrefix = topic.split('.')[0];
    // Increment the in-memory counter for this topic prefix
    const key = `messages.${topicPrefix}`;
    this.counters.set(key, (this.counters.get(key) || 0) + 1);
    // Emit to the metrics system (metricsClient is expected to be in module scope)
    metricsClient.increment('edge_stream.messages_processed', 1, {
      topic: topicPrefix,
      transport: context.envelope.meta.transport,
      serverId: context.envelope.meta.serverId,
    });
    context.addActivity(`Metrics emitted for topic prefix: ${topicPrefix}`);
    return { continue: true };
  }

  getCounters(): ReadonlyMap<string, number> {
    return this.counters;
  }
}
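The prefix-and-count logic in the hook above can be exercised on its own. The following is a minimal sketch, not the hook itself: `topicPrefix` and `PrefixCounters` are hypothetical names, and the fallback to `'unknown'` for an empty first segment is an added assumption that the hook does not make.

```typescript
// Mirrors the hook's prefix rule: first dot-separated segment, or 'unknown'.
function topicPrefix(topic: string | undefined): string {
  const first = (topic || 'unknown').split('.')[0];
  // A topic such as '.created' yields an empty first segment; fall back to 'unknown'.
  return first ? first : 'unknown';
}

class PrefixCounters {
  private readonly counters = new Map<string, number>();

  // Increments the counter for the topic's prefix and returns the key used.
  record(topic: string | undefined): string {
    const key = `messages.${topicPrefix(topic)}`;
    this.counters.set(key, (this.counters.get(key) ?? 0) + 1);
    return key;
  }

  // Returns a plain-object copy so callers cannot mutate internal state.
  snapshot(): Record<string, number> {
    const out: Record<string, number> = {};
    this.counters.forEach((value, key) => { out[key] = value; });
    return out;
  }
}

const counters = new PrefixCounters();
counters.record('orders.created');
counters.record('orders.cancelled');
counters.record('billing.invoice.paid');
counters.record(undefined);
console.log(counters.snapshot());
// Counts: messages.orders = 2, messages.billing = 1, messages.unknown = 1
```

Returning a copy from `snapshot()` is a design choice for this sketch. The hook's `getCounters()` instead exposes the live map behind a `ReadonlyMap` type, which is cheaper but reflects later updates.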
Acknowledgment Hook
Some systems require explicit message acknowledgment. The post-pipeline stage is the right place — after the message has been fully processed but before subscribers run:
export class AcknowledgmentHook extends BaseHook {
  readonly name = 'AcknowledgmentHook';
  readonly priority = 320;

  constructor(private ackService: MessageAckService) { super(); }

  async execute(context: IPipelineContext): Promise<HookResult> {
    const messageId = context.envelope.meta.id;
    const correlationId = context.envelope.meta.correlationId;
    if (correlationId) {
      await this.ackService.acknowledge(messageId, correlationId);
    }
    return { continue: true };
  }
}
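Because the hook above awaits the acknowledgment, a slow or failing ack service delays or interrupts the pipeline. Where that is not acceptable, one option is to bound the wait and swallow failures. The sketch below assumes that policy is wanted. `withTimeout`, `acknowledgeSafely`, and the 500 ms default are hypothetical and not part of EdgeStream.

```typescript
// Rejects if `work` has not settled within `ms` milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new Error(`Timed out after ${ms} ms`)), ms);
  });
  // Clear the timer once either side settles so it does not keep the process alive.
  return Promise.race([work, timeout]).finally(() => {
    if (timer !== undefined) clearTimeout(timer);
  });
}

// Resolves to true when the ack succeeded, false when it failed or timed out.
// It never rejects, so a caller can await it without risking a thrown error.
async function acknowledgeSafely(ack: () => Promise<void>, ms = 500): Promise<boolean> {
  try {
    await withTimeout(ack(), ms);
    return true;
  } catch (err) {
    console.warn('[AcknowledgmentHook] Ack failed or timed out:', err);
    return false;
  }
}

acknowledgeSafely(() => Promise.resolve()).then(ok => console.log('acknowledged:', ok));
```

Inside the hook, the call would wrap `this.ackService.acknowledge(messageId, correlationId)` in the `ack` callback. Whether a failed acknowledgment should really be ignored depends on the delivery guarantees the upstream system expects.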
Snapshot / Cache Hook
export class MessageSnapshotHook extends BaseHook {
  readonly name = 'MessageSnapshotHook';
  readonly priority = 330;
  private cache = new Map<string, IEnvelope[]>();
  private readonly maxPerTopic = 50;

  async execute(context: IPipelineContext): Promise<HookResult> {
    const topic = context.envelope.meta.topic || 'default';
    if (!this.cache.has(topic)) {
      this.cache.set(topic, []);
    }
    const topicCache = this.cache.get(topic)!;
    topicCache.push(context.envelope);
    // Keep only the last maxPerTopic messages
    if (topicCache.length > this.maxPerTopic) {
      topicCache.shift();
    }
    return { continue: true };
  }

  getRecent(topic: string, count = 10): IEnvelope[] {
    // slice(-0) would return the whole array, so handle non-positive counts explicitly
    if (count <= 0) return [];
    const all = this.cache.get(topic) || [];
    return all.slice(-count);
  }
}
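The eviction and lookup behaviour of the snapshot cache can be tried in isolation. The following is a minimal sketch of the same bounded per-topic buffer with the envelope type made generic. `RecentByTopic` is a hypothetical name, not part of EdgeStream.

```typescript
class RecentByTopic<T> {
  private readonly cache = new Map<string, T[]>();
  private readonly maxPerTopic: number;

  constructor(maxPerTopic: number) {
    this.maxPerTopic = maxPerTopic;
  }

  // Appends an item and drops the oldest one once the topic exceeds its limit.
  push(topic: string, item: T): void {
    const list = this.cache.get(topic) ?? [];
    list.push(item);
    if (list.length > this.maxPerTopic) {
      list.shift();
    }
    this.cache.set(topic, list);
  }

  // Returns up to `count` of the newest items, oldest first.
  getRecent(topic: string, count = 10): T[] {
    // slice(-0) is slice(0) and would return everything, so guard it.
    if (count <= 0) return [];
    return (this.cache.get(topic) ?? []).slice(-count);
  }
}

const recent = new RecentByTopic<number>(3);
for (const n of [1, 2, 3, 4, 5]) {
  recent.push('orders', n);
}
console.log(recent.getRecent('orders'));    // [ 3, 4, 5 ]
console.log(recent.getRecent('orders', 2)); // [ 4, 5 ]
console.log(recent.getRecent('missing'));   // []
```

`Array.prototype.shift` is linear in the array length, which is negligible at 50 items per topic. A much larger limit might justify a ring buffer instead.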
Post-Pipeline Hooks Must Not Abort
Post-pipeline hooks should almost never call context.abort(). By this stage the message has been fully validated, processed, and is ready for delivery. Aborting in post-pipeline wastes all preceding processing. If you need to conditionally block delivery, use a processing-stage hook instead.
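A conditional block placed in the processing stage might look like the following sketch. Everything here is an assumption made for illustration: `TenantGateHook`, the `GateContext` shape, the `abort(reason)` signature, and the priority of 250 (chosen only because it is below 300) are not taken from EdgeStream's API.

```typescript
// Hypothetical stand-ins for the real EdgeStream types.
interface HookResult {
  continue: boolean;
}

interface GateContext {
  metadata: Record<string, unknown>;
  abort(reason: string): void; // assumed signature
}

class TenantGateHook {
  readonly name = 'TenantGateHook';
  readonly priority = 250; // assumed to fall in the processing stage (below 300)
  private readonly blocked: ReadonlySet<string>;

  constructor(blockedTenants: Iterable<string>) {
    this.blocked = new Set(blockedTenants);
  }

  async execute(context: GateContext): Promise<HookResult> {
    const tenantId = context.metadata['tenantId'];
    if (typeof tenantId === 'string' && this.blocked.has(tenantId)) {
      // Stop here, before later processing work is spent on this message.
      context.abort(`Tenant ${tenantId} is blocked`);
      return { continue: false };
    }
    return { continue: true };
  }
}

const gate = new TenantGateHook(['tenant-blocked']);
const demoContext: GateContext = {
  metadata: { tenantId: 'tenant-blocked' },
  abort: reason => console.log('aborted:', reason),
};
gate.execute(demoContext).then(result => console.log(result));
```

Placing the gate before the post-pipeline hooks means blocked messages are never audited, counted, or acknowledged as processed, which is usually the intended outcome.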