Portal Community

Priority Range: 120–199

Pre-pipeline hooks sit just after normalization (110) and before the core processing stage (200+). They receive a fully parsed NormalizedEnvelopeBody and can read business metadata without parsing raw bytes.

Enrichment Hook Pattern

import { BaseHook, IPipelineContext, HookResult } from 'edge-stream-js';

export class UserContextEnrichmentHook extends BaseHook {
  readonly name = 'UserContextEnrichmentHook';
  readonly priority = 120;

  constructor(private userService: UserService) { super(); }

  async execute(context: IPipelineContext): Promise<HookResult> {
    const body = context.body as any;
    const userId = body?.data?.metadata?.userId;

    if (!userId) return { continue: true }; // not all messages have userId

    const user = await this.userService.findById(userId);
    if (!user) return { continue: true }; // user lookup failure is non-fatal

    // Attach to metadata — accessible in all downstream hooks
    context.metadata.user = user;
    context.metadata.userId = userId;
    context.metadata.tenantId = user.tenantId;
    return { continue: true };
  }
}

Authorization Hook Pattern

export class TenantBoundaryHook extends BaseHook {
  readonly name = 'TenantBoundaryHook';
  readonly priority = 135;

  constructor(private currentTenantId: string) { super(); }

  async execute(context: IPipelineContext): Promise<HookResult> {
    const messageTenantId = context.metadata.tenantId as string;

    // If message has a tenantId and it doesn't match, reject
    if (messageTenantId && messageTenantId !== this.currentTenantId) {
      context.abort(`Tenant boundary violation: ${messageTenantId} !== ${this.currentTenantId}`);
      return { continue: false };
    }
    return { continue: true };
  }
}

Routing Decision Hook

export class TopicRoutingHook extends BaseHook {
  readonly name = 'TopicRoutingHook';
  readonly priority = 160;

  async execute(context: IPipelineContext): Promise<HookResult> {
    const topic = context.envelope.meta.topic || '';

    // Tag message with routing attributes for subscriber filtering
    if (topic.startsWith('agent.chat')) {
      context.envelope.attributes = {
        ...context.envelope.attributes,
        routingClass: 'streaming',
        priority: 'high',
      };
    } else if (topic.startsWith('workflow.execution')) {
      context.envelope.attributes = {
        ...context.envelope.attributes,
        routingClass: 'workflow',
        priority: 'normal',
      };
    }
    return { continue: true };
  }
}

Pre-Pipeline Hook Registration

const server = edgeStream.server('bas')!;

// Add in any order — pipeline sorts by priority automatically
server.incomingPipeline.addHook(new UserContextEnrichmentHook(userService));
server.incomingPipeline.addHook(new TenantBoundaryHook(currentTenantId));
server.incomingPipeline.addHook(new TopicRoutingHook());