Normalization Stage
The NormalizationHook converts raw incoming messages into standardized CloudEvents envelopes — without ever touching the business data payload.
What Normalization Does
When a message arrives from a transport, it's in whatever format the backend chose to send — plain JSON, a custom object, or a partially structured payload. The normalization stage converts this into the standardized NormalizedEnvelopeBody format that the rest of the pipeline and subscribers expect.
body.data or body.data.metadata. Your business payload is completely protected.
NormalizationHook Specification
export class NormalizationHook implements IHook {
readonly name = 'NormalizationHook';
readonly priority = 110; // runs AFTER: Decrypt(90), VerifySignature(95), Validation(100)
readonly enabled: boolean;
constructor(enabled: boolean = true) { ... }
async execute(context: IPipelineContext): Promise<HookResult> {
// Skip if already CloudEvents format
if (this.isAlreadyNormalized(context.envelope)) {
return { continue: true };
}
// ... normalize body
}
}
The Normalized Body Shape
After normalization, envelope.body conforms to NormalizedEnvelopeBody:
export interface NormalizedEnvelopeBody {
// CloudEvents protocol wrapper (added by NormalizationHook)
specversion: string; // always '1.0'
type: string; // inferred event type (e.g., 'com.bizfirst.workflow.event')
source: string; // e.g., 'https://bas-server/edge-stream'
id: string; // UUID
time?: string; // ISO timestamp
subject?: string; // topic string
datacontenttype?: string; // 'application/json'
dataschema?: string; // 'https://schema.bizfirst.com/event/v1'
// BizFirstAI business data — COMPLETELY UNTOUCHED
data: {
metadata?: { // routing metadata
workflowId?: string;
userId?: string;
correlationId?: string;
priority?: string;
tags?: string[];
};
data?: { [key: string]: unknown }; // actual business payload
};
// Original message preserved for audit trail
_original?: {
format: string; // e.g., 'json'
body: unknown; // snapshot of the original message
timestamp: string; // when normalization occurred
};
}
Event Type Inference
The hook infers the CloudEvents type field from heuristics applied to the business data — reading only, never modifying:
// Type inference logic (read-only heuristics)
if (data?.type) → use data.type
if (data?.eventType) → use data.eventType
if (data?.messageType) → use data.messageType
if (data?.author && text) → 'com.bizfirst.chat.message'
if (data?.formID) → 'com.bizfirst.form.submission'
if (data?.workflowId) → 'com.bizfirst.workflow.event'
else → 'com.bizfirst.generic.event'
Execution Order Requirement
The NormalizationHook must run after security hooks to ensure data integrity:
DecryptHook (priority 90)
Decrypts the message body first — NormalizationHook should never see encrypted data.
VerifySignatureHook (priority 95)
Verifies HMAC/signature on the original format — before any transformation changes the bytes.
ValidationHook (priority 100)
Validates the original message structure — schema validation on raw input.
NormalizationHook (priority 110)
Now safe to normalize — data is decrypted, verified, and validated. Wraps in CloudEvents structure.
Registering the Hook
import { NormalizationHook } from 'edge-stream-js';
const server = edgeStream.server('bas')!;
// Add to the incoming pipeline
server.incomingPipeline.addHook(new NormalizationHook(true));
// Or create disabled (no-op) — useful for testing
const noopHook = new NormalizationHook(false);
Skipping Already-Normalized Messages
If the message body already has CloudEvents fields (specversion + type), the hook skips normalization. This prevents double-normalization when a message passes through multiple pipeline stages.
// Hook checks first — returns early if already normalized
private isAlreadyNormalized(envelope: IEnvelope): boolean {
const body = envelope.body as any;
return (
envelope.meta.protocol === 'cloudevents' ||
(body && typeof body.specversion === 'string' && typeof body.type === 'string')
);
}
Using NormalizationService Directly
Subscribers that want normalized access without using the hook can use the standalone NormalizationService:
import { NormalizationService } from 'edge-stream-js';
edgeStream.subscribe('bas', 'some.topic', (rawEnvelope) => {
const normalized = NormalizationService.normalize(rawEnvelope);
console.log(normalized.type); // 'com.bizfirst.workflow.event'
console.log(normalized.data.metadata); // routing metadata
console.log(normalized.data.data); // business payload
});