Portal Community

When Pre-Pipeline Hooks Run

Pre-pipeline hooks execute after normalization (priority 110) and before the core processing stage. They receive a fully normalized IEnvelope and can read, transform, or abort it. Common pre-pipeline hook priorities are 120–199.

What Belongs in Pre-Pipeline Hooks

Implementing a Pre-Pipeline Hook

import { BaseHook, IPipelineContext, HookResult } from 'edge-stream-js';

// Enriches each message with resolved tenant configuration
export class TenantEnrichmentHook extends BaseHook {
  readonly name = 'TenantEnrichmentHook';
  readonly priority = 120; // just after NormalizationHook (110)

  constructor(private tenantService: TenantService) {
    super();
  }

  async execute(context: IPipelineContext): Promise<HookResult> {
    const tenantId = (context.body as any)?.data?.metadata?.tenantId;

    if (!tenantId) {
      // Not all messages have tenantId — pass through
      return { continue: true };
    }

    try {
      const tenant = await this.tenantService.get(tenantId);
      // Store in metadata — available to all subsequent hooks and subscribers
      context.metadata.tenant = tenant;
      context.metadata.tenantId = tenantId;
      context.addActivity(`Enriched with tenant: ${tenant.name}`);
    } catch (error) {
      // Enrichment failure — abort to prevent unauthorized processing
      context.abort(`Tenant ${tenantId} not found or inactive`);
      return { continue: false };
    }

    return { continue: true };
  }
}

Deduplication Hook Pattern

export class DeduplicationHook extends BaseHook {
  readonly name = 'DeduplicationHook';
  readonly priority = 130;

  private seen = new Set<string>();
  private readonly maxSize = 10000;

  async execute(context: IPipelineContext): Promise<HookResult> {
    const messageId = context.envelope.meta.id;
    const correlationId = context.envelope.meta.correlationId;
    const key = correlationId || messageId;

    if (this.seen.has(key)) {
      // Duplicate — abort silently
      context.abort(`Duplicate message: ${key}`);
      return { continue: false };
    }

    this.seen.add(key);

    // Evict oldest entries when cache is full
    if (this.seen.size > this.maxSize) {
      const first = this.seen.values().next().value;
      this.seen.delete(first);
    }

    return { continue: true };
  }
}

Rate Limiting Hook Pattern

export class RateLimitHook extends BaseHook {
  readonly name = 'RateLimitHook';
  readonly priority = 140;

  private counters = new Map<string, { count: number; resetAt: number }>();
  private readonly maxPerWindow: number;
  private readonly windowMs: number;

  constructor(options: { maxPerWindow: number; windowMs: number }) {
    super();
    this.maxPerWindow = options.maxPerWindow;
    this.windowMs = options.windowMs;
  }

  async execute(context: IPipelineContext): Promise<HookResult> {
    const tenantId = context.metadata.tenantId as string || 'global';
    const now = Date.now();

    let entry = this.counters.get(tenantId);
    if (!entry || now > entry.resetAt) {
      entry = { count: 0, resetAt: now + this.windowMs };
      this.counters.set(tenantId, entry);
    }

    entry.count++;

    if (entry.count > this.maxPerWindow) {
      context.abort(`Rate limit exceeded for tenant: ${tenantId}`);
      return { continue: false, error: 'rate_limit_exceeded' };
    }

    return { continue: true };
  }
}

Registering Pre-Pipeline Hooks

const server = edgeStream.server('bas')!;

// Hooks are added to the incoming pipeline
server.incomingPipeline.addHook(new TenantEnrichmentHook(tenantService));
server.incomingPipeline.addHook(new DeduplicationHook());
server.incomingPipeline.addHook(new RateLimitHook({
  maxPerWindow: 100,
  windowMs: 60_000, // 100 messages per minute per tenant
}));
Context Metadata Flows Forward Data written to context.metadata in any hook is accessible to all subsequent hooks and — importantly — to subscriber callbacks via the envelope's attributes field if you copy it there.