NormalizationHook
The NormalizationHook runs early in the incoming pipeline, after the security hooks. It wraps raw messages in the CloudEvents envelope format and leaves the business payload itself unmodified.
Purpose and Position
Raw messages arriving from a transport can vary in shape. The NormalizationHook runs at priority 110, after the security hooks (decrypt at 90, verify at 95, validate at 100), and converts whatever format the message arrives in into the standardized NormalizedEnvelopeBody that all downstream hooks and subscribers expect.
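The ordering described above can be sketched as a sort by ascending priority. This is only an illustration, assuming that lower priority values run first; the PipelineHook interface and the orderHooks helper are hypothetical names and are not part of edge-stream-js.

```typescript
// Hypothetical shape for illustration; not the edge-stream-js hook interface.
interface PipelineHook {
  name: string;
  priority: number;
}

// Assumption: hooks with lower priority values run earlier.
function orderHooks(hooks: PipelineHook[]): PipelineHook[] {
  // Copy before sorting so the caller's array is not mutated.
  return [...hooks].sort((a, b) => a.priority - b.priority);
}

const ordered = orderHooks([
  { name: 'normalize', priority: 110 },
  { name: 'validate', priority: 100 },
  { name: 'decrypt', priority: 90 },
  { name: 'verify', priority: 95 },
]);

// Names in execution order: decrypt, verify, validate, normalize
const executionOrder = ordered.map((h) => h.name);
```

Under that assumption, normalization always sees a message that has already been decrypted, verified, and validated.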
Constructor
import { NormalizationHook, createNormalizationHook } from 'edge-stream-js';
// Direct instantiation
const hook = new NormalizationHook(true); // enabled
const noop = new NormalizationHook(false); // disabled (pass-through)
// Factory function
const hookFromFactory = createNormalizationHook(true); // distinct name: `hook` is already declared above
What Gets Added to the Body
After normalization, envelope.body gains CloudEvents protocol fields at the root. The original business payload is nested, unmodified, under data.data, and a snapshot of the original message is kept under _original:
// Before normalization: raw JSON body
{
  "workflowId": "wf-123",
  "nodeId": "node-456",
  "status": "completed"
}
// After normalization: CloudEvents wrapper added
{
  "specversion": "1.0",
  "type": "com.bizfirst.workflow.event",
  "source": "https://bas-server/edge-stream",
  "id": "4f3a2b1c-...",
  "time": "2026-05-25T09:00:00.000Z",
  "subject": "workflow.execution.node-completed",
  "datacontenttype": "application/json",
  "dataschema": "https://schema.bizfirst.com/event/v1",
  "data": {
    "metadata": {},
    "data": {
      "workflowId": "wf-123", // ← original business data, untouched
      "nodeId": "node-456",
      "status": "completed"
    }
  },
  "_original": {
    "format": "json",
    "body": { "workflowId": "wf-123", ... }, // snapshot of original
    "timestamp": "2026-05-25T09:00:00.123Z"
  }
}
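The wrapping shown above can be approximated with a small function. This is a sketch under stated assumptions, not the library's implementation: SketchNormalizedBody and wrapAsCloudEvent are hypothetical names, the id and timestamp are passed in by the caller, and subject and dataschema are left out because the text does not say how they are derived.

```typescript
// Hypothetical types for illustration; the real NormalizedEnvelopeBody may differ.
type JsonObject = Record<string, unknown>;

interface SketchNormalizedBody {
  specversion: string;
  type: string;
  source: string;
  id: string;
  time: string;
  datacontenttype: string;
  data: { metadata: JsonObject; data: JsonObject };
  _original: { format: string; body: JsonObject; timestamp: string };
}

function wrapAsCloudEvent(raw: JsonObject, id: string, now: Date): SketchNormalizedBody {
  const timestamp = now.toISOString();
  return {
    specversion: '1.0',
    type: 'com.bizfirst.workflow.event',
    source: 'https://bas-server/edge-stream',
    id,
    time: timestamp,
    datacontenttype: 'application/json',
    // The business payload is nested as-is; none of its fields are changed.
    data: { metadata: {}, data: raw },
    // Shallow copy kept as a snapshot of the original body.
    _original: { format: 'json', body: { ...raw }, timestamp },
  };
}

const raw = { workflowId: 'wf-123', nodeId: 'node-456', status: 'completed' };
const wrapped = wrapAsCloudEvent(raw, 'example-id', new Date('2026-05-25T09:00:00.000Z'));
```

Here wrapped.data.data is the same object as raw, while wrapped._original.body is a separate shallow copy.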
Envelope Meta Changes
After normalization, the hook also updates envelope.meta.protocol to 'cloudevents' and sets audit attributes:
envelope.meta.protocol = 'cloudevents';
envelope.attributes.normalized = true;
envelope.attributes.normalizedAt = '2026-05-25T09:00:00.123Z';
envelope.attributes.originalProtocol = 'json';
envelope.attributes.originalMessagePreserved = true;
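A minimal sketch of these updates as a single helper. It assumes the original protocol is read from envelope.meta.protocol before that field is overwritten; the source does not state where the value comes from. SketchEnvelope, SketchAttributes, and markNormalized are hypothetical names, not edge-stream-js exports.

```typescript
// Hypothetical envelope shape for illustration only.
interface SketchAttributes {
  normalized?: boolean;
  normalizedAt?: string;
  originalProtocol?: string;
  originalMessagePreserved?: boolean;
}

interface SketchEnvelope {
  meta: { protocol: string };
  attributes: SketchAttributes;
}

function markNormalized(envelope: SketchEnvelope, now: Date): void {
  // Assumption: capture the incoming protocol before replacing it.
  const originalProtocol = envelope.meta.protocol;

  envelope.meta.protocol = 'cloudevents';
  envelope.attributes.normalized = true;
  envelope.attributes.normalizedAt = now.toISOString();
  envelope.attributes.originalProtocol = originalProtocol;
  envelope.attributes.originalMessagePreserved = true;
}

const sketchEnvelope: SketchEnvelope = { meta: { protocol: 'json' }, attributes: {} };
markNormalized(sketchEnvelope, new Date('2026-05-25T09:00:00.123Z'));
```

Downstream code can then inspect attributes.normalized and attributes.originalProtocol to see how a message entered the pipeline.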
Idempotent — Safe to Run Multiple Times
If the message is already in CloudEvents format (it has both specversion and type), the hook skips normalization. This prevents double normalization in multi-hop pipelines.
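The skip condition can be sketched as a guard that checks for the two marker fields. The function name isAlreadyCloudEvent is hypothetical, and the requirement that both fields be strings is an assumption; the hook's actual check may be looser or stricter.

```typescript
// Hypothetical guard; edge-stream-js may implement this check differently.
function isAlreadyCloudEvent(body: unknown): boolean {
  if (typeof body !== 'object' || body === null) {
    return false;
  }
  const candidate = body as Record<string, unknown>;
  // Assumption: both marker fields must be present as strings.
  return typeof candidate['specversion'] === 'string'
    && typeof candidate['type'] === 'string';
}

const rawCheck = isAlreadyCloudEvent({ workflowId: 'wf-123', status: 'completed' });
const normalizedCheck = isAlreadyCloudEvent({
  specversion: '1.0',
  type: 'com.bizfirst.workflow.event',
  data: { metadata: {}, data: { workflowId: 'wf-123' } },
});
```

A hook that calls such a guard first can return the message unchanged on a second pass, which is what makes repeated runs safe.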
Using NormalizationService Without the Hook
import { NormalizationService, IEnvelope } from 'edge-stream-js';
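// Normalize a raw envelope outside the pipeline
function processRawMessage(rawEnvelope: IEnvelope) {
  const normalized = NormalizationService.normalize(rawEnvelope);

  // Metadata may be empty (as in the example above), hence the optional lookup
  const workflowId = normalized.data.metadata?.workflowId;
  // Original business payload, nested unchanged under data.data
  const payload = normalized.data.data;

  return { workflowId, payload };
}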