EdgeStream
Outgoing Hook
Outgoing hooks process messages as they leave the application toward the transport — the right place for signing, encryption, auth header injection, and outbound logging.
Outgoing Pipeline Trigger
The outgoing pipeline runs when edgeStream.send(serverId, topic, body) or server.send(topic, body) is called. The message travels through server.outgoingPipeline before being serialized and handed to the transport.
Auth Token Injection Hook
/**
 * Outgoing hook that stamps authentication headers onto the envelope.
 *
 * On every execution it asks the injected auth service for a token and
 * replaces `context.envelope.headers` with a copy of the existing headers
 * plus:
 *  - `Authorization`: the token in `Bearer <token>` form
 *  - `X-Tenant-Id`: `context.metadata.tenantId`, or an empty string when
 *    that value is falsy
 */
export class AuthHeaderHook extends BaseHook {
  readonly name = 'AuthHeaderHook';
  readonly priority = 50;

  /**
   * @param authService Source of the token, queried via `getToken()` on
   *   each `execute` call.
   */
  constructor(private authService: AuthService) {
    super();
  }

  /**
   * Adds the auth and tenant headers to the outgoing envelope.
   *
   * @param context Pipeline context whose envelope headers are replaced.
   * @returns `{ continue: true }` once the headers have been written.
   */
  async execute(context: IPipelineContext): Promise<HookResult> {
    const bearerToken = await this.authService.getToken();
    const { envelope, metadata } = context;

    // Entries listed after the spread win over any same-named existing header.
    const mergedHeaders = {
      ...envelope.headers,
      'Authorization': `Bearer ${bearerToken}`,
      'X-Tenant-Id': (metadata.tenantId as string) || '',
    };
    envelope.headers = mergedHeaders;

    return { continue: true };
  }
}
Message Signing Hook
/**
 * Outgoing hook that signs the message body with HMAC-SHA-256.
 *
 * The signature is computed over `JSON.stringify(context.body)` encoded as
 * UTF-8, rendered as lowercase hex, and written to the
 * `X-EdgeStream-Signature` header. Existing headers are preserved.
 */
export class HmacSignatureHook extends BaseHook {
  readonly name = 'HmacSignatureHook';
  readonly priority = 60;

  /**
   * Pending or completed key import. The secret is fixed at construction,
   * so the key is imported once and reused instead of being re-imported
   * for every message. Storing the promise (not the resolved key) lets
   * concurrent first calls share a single import.
   */
  private signingKey?: ReturnType<typeof crypto.subtle.importKey>;

  /**
   * @param secretKey Shared secret; its UTF-8 bytes are used as the raw
   *   HMAC key.
   */
  constructor(private secretKey: string) {
    super();
  }

  /**
   * Signs the body and attaches the hex signature header.
   *
   * @param context Pipeline context; `body` is read and
   *   `envelope.headers` is replaced with a copy that includes the
   *   signature.
   * @returns `{ continue: true }` after the header has been written.
   */
  async execute(context: IPipelineContext): Promise<HookResult> {
    const payload = JSON.stringify(context.body);
    const msgData = new TextEncoder().encode(payload);

    const key = await this.getSigningKey();
    const signature = await crypto.subtle.sign('HMAC', key, msgData);

    // Two lowercase hex digits per byte.
    const sigHex = Array.from(new Uint8Array(signature))
      .map(b => b.toString(16).padStart(2, '0'))
      .join('');

    context.envelope.headers = {
      ...context.envelope.headers,
      'X-EdgeStream-Signature': sigHex,
    };
    return { continue: true };
  }

  /** Imports the HMAC key on first use and returns the cached import afterwards. */
  private getSigningKey(): ReturnType<typeof crypto.subtle.importKey> {
    if (this.signingKey === undefined) {
      const keyData = new TextEncoder().encode(this.secretKey);
      this.signingKey = crypto.subtle.importKey(
        'raw',
        keyData,
        { name: 'HMAC', hash: 'SHA-256' },
        false, // not extractable
        ['sign'],
      );
    }
    return this.signingKey;
  }
}
Outbound Rate Limit Hook
/**
 * Outgoing hook that spaces messages at least `1000 / maxPerSecond`
 * milliseconds apart by delaying `execute` until the message's slot.
 *
 * Each call reserves its send slot synchronously, before awaiting, so
 * calls that overlap in time are released one interval apart rather than
 * all waking together after the same delay.
 */
export class OutboundRateLimitHook extends BaseHook {
  readonly name = 'OutboundRateLimitHook';
  readonly priority = 40;

  /** Earliest timestamp (ms since epoch) at which the next message may go out. */
  private nextSlotAt = 0;
  private readonly minIntervalMs: number;

  /**
   * @param options.maxPerSecond Maximum messages per second; must be a
   *   finite number greater than zero.
   * @throws RangeError if `maxPerSecond` is not a finite positive number.
   */
  constructor(options: { maxPerSecond: number }) {
    super();
    const { maxPerSecond } = options;
    if (!Number.isFinite(maxPerSecond) || maxPerSecond <= 0) {
      throw new RangeError(
        `maxPerSecond must be a finite number greater than 0, got ${maxPerSecond}`,
      );
    }
    this.minIntervalMs = 1000 / maxPerSecond;
  }

  /**
   * Waits, if necessary, until this message's reserved slot.
   *
   * @param context Pipeline context; not inspected by this hook.
   * @returns `{ continue: true }` once the slot has been reached.
   */
  async execute(context: IPipelineContext): Promise<HookResult> {
    const now = Date.now();

    // Claim the slot before any await: a later call that starts while this
    // one is still sleeping must see the advanced `nextSlotAt`.
    const slot = Math.max(now, this.nextSlotAt);
    this.nextSlotAt = slot + this.minIntervalMs;

    const wait = slot - now;
    if (wait > 0) {
      await new Promise<void>(resolve => setTimeout(resolve, wait));
    }
    return { continue: true };
  }
}
Registration on Outgoing Pipeline
// Look up the target server and fail with a clear message if it is missing,
// rather than asserting non-null and failing later on a property access.
const server = edgeStream.server('bas');
if (!server) {
  throw new Error("EdgeStream server 'bas' was not found");
}

// The signing hook needs a real secret; an unset or empty HMAC_SECRET would
// otherwise only surface as a failure when a message is signed.
const hmacSecret = process.env.HMAC_SECRET;
if (!hmacSecret) {
  throw new Error('HMAC_SECRET environment variable must be set to sign outgoing messages');
}

// Hooks are added in ascending order of their declared priority values
// (40, 50, 60): rate limiting, then auth headers, then signing.
server.outgoingPipeline.addHook(new OutboundRateLimitHook({ maxPerSecond: 50 }));
server.outgoingPipeline.addHook(new AuthHeaderHook(authService));
server.outgoingPipeline.addHook(new HmacSignatureHook(hmacSecret));