
Post-Pipeline Hook Position

Post-pipeline hooks use priorities of 300 and above. They execute after all processing hooks and before the terminal PublishToSubscribersHook. Because they run after processing, they see the message body in its final, fully transformed state.
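
The ordering described above can be sketched as follows. This is a minimal illustration, not the pipeline's actual scheduler: it assumes lower priority numbers run first, `HookLike`, `executionOrder`, `isPostPipeline`, and `SomeProcessingHook` (with its priority of 200) are hypothetical names introduced here, and the terminal PublishToSubscribersHook is left out because the text does not say how it is placed after the post-pipeline hooks.

```typescript
// Hypothetical stand-in for the hook shape; the real BaseHook has more members.
interface HookLike {
  readonly name: string;
  readonly priority: number;
}

// From the text: post-pipeline hooks use priorities of 300 and above.
const POST_PIPELINE_MIN_PRIORITY = 300;

// Assumption: hooks run in ascending priority order.
function executionOrder(hooks: readonly HookLike[]): string[] {
  return [...hooks]
    .sort((a, b) => a.priority - b.priority)
    .map(h => h.name);
}

function isPostPipeline(hook: HookLike): boolean {
  return hook.priority >= POST_PIPELINE_MIN_PRIORITY;
}

const hooks: HookLike[] = [
  { name: 'MessageSnapshotHook', priority: 330 },
  { name: 'AuditLogHook', priority: 300 },
  { name: 'SomeProcessingHook', priority: 200 }, // hypothetical processing-stage hook
  { name: 'MetricsHook', priority: 310 },
  { name: 'AcknowledgmentHook', priority: 320 },
];

// Registration order does not matter; priority decides the sequence.
const order = executionOrder(hooks);
const postPipelineCount = hooks.filter(isPostPipeline).length;
```

Under these assumptions, the four hooks on this page run in the order audit, metrics, acknowledgment, snapshot, after every hook with a priority below 300.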

Audit Logging Hook

export class AuditLogHook extends BaseHook {
  readonly name = 'AuditLogHook';
  readonly priority = 300;

  constructor(private auditService: AuditService) { super(); }

  async execute(context: IPipelineContext): Promise<HookResult> {
    const { envelope, metadata } = context;

    // Audit every message — fire-and-forget (don't await)
    this.auditService.log({
      messageId: envelope.meta.id,
      topic: envelope.meta.topic,
      serverId: envelope.meta.serverId,
      transport: envelope.meta.transport,
      tenantId: metadata.tenantId as string,
      receivedAt: envelope.meta.receivedAt,
      // Size in UTF-16 code units, not bytes; stringify returns undefined for an undefined body
      bodySize: (JSON.stringify(context.body) ?? '').length,
    }).catch(err => console.warn('[AuditLogHook] Non-critical audit failure:', err));

    // Always continue — audit failure must not block delivery
    return { continue: true };
  }
}
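
One caveat with the fire-and-forget call in the hook above: `.catch()` only handles a rejected promise. If building the log entry throws synchronously (for example, `JSON.stringify` throws a `TypeError` on a circular body), the exception escapes `execute`. A small wrapper can cover both cases. This is a sketch only; `fireAndForget` is a hypothetical helper and not part of the hook API.

```typescript
// Hypothetical helper: start an async task without awaiting it, and route
// both synchronous throws and asynchronous rejections to the same handler.
function fireAndForget(
  task: () => Promise<unknown>,
  onError: (err: unknown) => void,
): void {
  try {
    // Attach the rejection handler without awaiting, so the caller is not delayed.
    task().catch(onError);
  } catch (err) {
    // task() threw before it could return a promise.
    onError(err);
  }
}

// Demonstration with a task that fails synchronously.
const failures: unknown[] = [];
fireAndForget(
  () => { throw new Error('serialization failed'); },
  err => failures.push(err),
);
```

Inside `execute`, the audit call would then be wrapped as `fireAndForget(() => this.auditService.log({ ... }), err => console.warn(...))`, so the hook can still return `{ continue: true }` when the entry cannot be built.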

Metrics Emission Hook

export class MetricsHook extends BaseHook {
  readonly name = 'MetricsHook';
  readonly priority = 310;

  private counters = new Map<string, number>();

  async execute(context: IPipelineContext): Promise<HookResult> {
    const topic = context.envelope.meta.topic || 'unknown';
    const topicPrefix = topic.split('.')[0];

    // Increment counters
    const key = `messages.${topicPrefix}`;
    this.counters.set(key, (this.counters.get(key) || 0) + 1);

    // Emit to metrics system
    metricsClient.increment('edge_stream.messages_processed', 1, {
      topic: topicPrefix,
      transport: context.envelope.meta.transport,
      serverId: context.envelope.meta.serverId,
    });

    context.addActivity(`Metrics emitted for topic prefix: ${topicPrefix}`);
    return { continue: true };
  }

  getCounters(): ReadonlyMap<string, number> {
    return this.counters;
  }
}

Acknowledgment Hook

Some systems require explicit message acknowledgment. The post-pipeline stage is the right place for it: the message has been fully processed but has not yet been published to subscribers.

export class AcknowledgmentHook extends BaseHook {
  readonly name = 'AcknowledgmentHook';
  readonly priority = 320;

  constructor(private ackService: MessageAckService) { super(); }

  async execute(context: IPipelineContext): Promise<HookResult> {
    const messageId = context.envelope.meta.id;
    const correlationId = context.envelope.meta.correlationId;

    if (correlationId) {
      await this.ackService.acknowledge(messageId, correlationId);
    }

    return { continue: true };
  }
}

Snapshot / Cache Hook

export class MessageSnapshotHook extends BaseHook {
  readonly name = 'MessageSnapshotHook';
  readonly priority = 330;

  private cache = new Map<string, IEnvelope[]>();
  private readonly maxPerTopic = 50;

  async execute(context: IPipelineContext): Promise<HookResult> {
    const topic = context.envelope.meta.topic || 'default';

    if (!this.cache.has(topic)) {
      this.cache.set(topic, []);
    }

    const topicCache = this.cache.get(topic)!;
    topicCache.push(context.envelope);

    // Keep only the last N messages
    if (topicCache.length > this.maxPerTopic) {
      topicCache.shift();
    }

    return { continue: true };
  }

  getRecent(topic: string, count = 10): IEnvelope[] {
    const all = this.cache.get(topic) || [];
    // slice(-0) returns the whole array, so handle non-positive counts explicitly
    return count > 0 ? all.slice(-count) : [];
  }
}
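
The eviction and lookup behavior of the snapshot cache can be tried in isolation. The sketch below repeats the same push-then-shift logic in a standalone class; `RecentBuffer` and the topic names are hypothetical and exist only for this illustration. It also returns an empty array for a `count` of 0, because `slice(-0)` would otherwise return every cached item.

```typescript
// Hypothetical standalone version of the per-topic bounded cache.
class RecentBuffer<T> {
  private readonly byTopic = new Map<string, T[]>();

  constructor(private readonly maxPerTopic: number) {}

  push(topic: string, item: T): void {
    const items = this.byTopic.get(topic) ?? [];
    items.push(item);
    // One item is added per call, so dropping the oldest keeps the cap.
    if (items.length > this.maxPerTopic) {
      items.shift();
    }
    this.byTopic.set(topic, items);
  }

  getRecent(topic: string, count = 10): T[] {
    const all = this.byTopic.get(topic) ?? [];
    return count > 0 ? all.slice(-count) : [];
  }
}

const buffer = new RecentBuffer<number>(3);
for (const n of [1, 2, 3, 4, 5]) {
  buffer.push('orders.created', n);
}

const lastTwo = buffer.getRecent('orders.created', 2); // [4, 5]
const retained = buffer.getRecent('orders.created');   // [3, 4, 5]
const unknownTopic = buffer.getRecent('no.such.topic'); // []
const zero = buffer.getRecent('orders.created', 0);     // []
```

With a cap of 3, the two oldest items are evicted, and results come back oldest first.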

Post-Pipeline Hooks Must Not Abort

Post-pipeline hooks should almost never call context.abort(). By this stage the message has been validated and processed and is ready for delivery, so aborting here wastes all the preceding work. If you need to conditionally block delivery, use a processing-stage hook instead.
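
As a rough sketch of that last recommendation, a conditional block might sit in a processing-stage hook like the one below. Everything here is an assumption made for illustration: `TenantGateHook`, the priority value 200, the `abort(reason)` signature, and the `PipelineContextLike` stand-in are hypothetical, and the real hooks on this page are `async` and extend `BaseHook`. The sketch is kept synchronous and free of dependencies so it can run on its own.

```typescript
// Hypothetical minimal stand-ins; the real IPipelineContext and HookResult may differ.
interface HookResult {
  continue: boolean;
}

interface PipelineContextLike {
  metadata: Record<string, unknown>;
  abort(reason: string): void; // assumed signature
}

class TenantGateHook {
  readonly name = 'TenantGateHook';
  // Assumed processing-stage priority, below the post-pipeline floor of 300.
  readonly priority = 200;

  constructor(private readonly blockedTenants: ReadonlySet<string>) {}

  execute(context: PipelineContextLike): HookResult {
    const tenantId = context.metadata.tenantId;
    if (typeof tenantId === 'string' && this.blockedTenants.has(tenantId)) {
      // Stop early, before later processing and post-pipeline work is spent.
      context.abort(`Tenant ${tenantId} is blocked`);
      return { continue: false };
    }
    return { continue: true };
  }
}

const abortReasons: string[] = [];
const gate = new TenantGateHook(new Set(['tenant-b']));

const blocked = gate.execute({
  metadata: { tenantId: 'tenant-b' },
  abort: reason => abortReasons.push(reason),
});
const allowed = gate.execute({
  metadata: { tenantId: 'tenant-a' },
  abort: reason => abortReasons.push(reason),
});
```

Placing the check at this stage means a blocked message never reaches the audit, metrics, acknowledgment, or snapshot hooks.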