Portal Community

Outgoing Pipeline Trigger

The outgoing pipeline runs when edgeStream.send(serverId, topic, body) or server.send(topic, body) is called. The message travels through server.outgoingPipeline before being serialized and handed to the transport.

Auth Token Injection Hook

export class AuthHeaderHook extends BaseHook {
  readonly name = 'AuthHeaderHook';
  readonly priority = 50;

  constructor(private authService: AuthService) { super(); }

  async execute(context: IPipelineContext): Promise<HookResult> {
    const token = await this.authService.getToken();
    context.envelope.headers = {
      ...context.envelope.headers,
      'Authorization': `Bearer ${token}`,
      'X-Tenant-Id': context.metadata.tenantId as string || '',
    };
    return { continue: true };
  }
}

Message Signing Hook

export class HmacSignatureHook extends BaseHook {
  readonly name = 'HmacSignatureHook';
  readonly priority = 60;

  constructor(private secretKey: string) { super(); }

  async execute(context: IPipelineContext): Promise<HookResult> {
    const payload = JSON.stringify(context.body);
    const keyData = new TextEncoder().encode(this.secretKey);
    const msgData = new TextEncoder().encode(payload);

    const key = await crypto.subtle.importKey(
      'raw', keyData, { name: 'HMAC', hash: 'SHA-256' }, false, ['sign']
    );
    const signature = await crypto.subtle.sign('HMAC', key, msgData);
    const sigHex = Array.from(new Uint8Array(signature))
      .map(b => b.toString(16).padStart(2, '0')).join('');

    context.envelope.headers = {
      ...context.envelope.headers,
      'X-EdgeStream-Signature': sigHex,
    };
    return { continue: true };
  }
}

Outbound Rate Limit Hook

export class OutboundRateLimitHook extends BaseHook {
  readonly name = 'OutboundRateLimitHook';
  readonly priority = 40;

  private queue: Array<() => void> = [];
  private lastSent = 0;
  private readonly minIntervalMs: number;

  constructor(options: { maxPerSecond: number }) {
    super();
    this.minIntervalMs = 1000 / options.maxPerSecond;
  }

  async execute(context: IPipelineContext): Promise<HookResult> {
    const now = Date.now();
    const wait = this.minIntervalMs - (now - this.lastSent);

    if (wait > 0) {
      await new Promise(resolve => setTimeout(resolve, wait));
    }

    this.lastSent = Date.now();
    return { continue: true };
  }
}

Registration on Outgoing Pipeline

const server = edgeStream.server('bas')!;

server.outgoingPipeline.addHook(new OutboundRateLimitHook({ maxPerSecond: 50 }));
server.outgoingPipeline.addHook(new AuthHeaderHook(authService));
server.outgoingPipeline.addHook(new HmacSignatureHook(process.env.HMAC_SECRET!));