Portal Community

What Normalization Does

When a message arrives from a transport, it's in whatever format the backend chose to send — plain JSON, a custom object, or a partially structured payload. The normalization stage converts this into the standardized NormalizedEnvelopeBody format that the rest of the pipeline and subscribers expect.

Sacred Rule: Business Data is Never Modified The NormalizationHook adds CloudEvents protocol fields at the root level of the body. It never modifies body.data or body.data.metadata. Your business payload is completely protected.

NormalizationHook Specification

export class NormalizationHook implements IHook {
  readonly name = 'NormalizationHook';
  readonly priority = 110; // runs AFTER: Decrypt(90), VerifySignature(95), Validation(100)
  readonly enabled: boolean;

  constructor(enabled: boolean = true) { ... }

  async execute(context: IPipelineContext): Promise<HookResult> {
    // Skip if already CloudEvents format
    if (this.isAlreadyNormalized(context.envelope)) {
      return { continue: true };
    }
    // ... normalize body
  }
}

The Normalized Body Shape

After normalization, envelope.body conforms to NormalizedEnvelopeBody:

export interface NormalizedEnvelopeBody {
  // CloudEvents protocol wrapper (added by NormalizationHook)
  specversion: string;          // always '1.0'
  type: string;                 // inferred event type (e.g., 'com.bizfirst.workflow.event')
  source: string;               // e.g., 'https://bas-server/edge-stream'
  id: string;                   // UUID
  time?: string;                // ISO timestamp
  subject?: string;             // topic string
  datacontenttype?: string;     // 'application/json'
  dataschema?: string;          // 'https://schema.bizfirst.com/event/v1'

  // BizFirstAI business data — COMPLETELY UNTOUCHED
  data: {
    metadata?: {                // routing metadata
      workflowId?: string;
      userId?: string;
      correlationId?: string;
      priority?: string;
      tags?: string[];
    };
    data?: { [key: string]: unknown }; // actual business payload
  };

  // Original message preserved for audit trail
  _original?: {
    format: string;     // e.g., 'json'
    body: unknown;      // snapshot of the original message
    timestamp: string;  // when normalization occurred
  };
}

Event Type Inference

The hook infers the CloudEvents type field from heuristics applied to the business data — reading only, never modifying:

// Type inference logic (read-only heuristics)
if (data?.type)          → use data.type
if (data?.eventType)     → use data.eventType
if (data?.messageType)   → use data.messageType
if (data?.author && text) → 'com.bizfirst.chat.message'
if (data?.formID)         → 'com.bizfirst.form.submission'
if (data?.workflowId)     → 'com.bizfirst.workflow.event'
else                      → 'com.bizfirst.generic.event'

Execution Order Requirement

The NormalizationHook must run after security hooks to ensure data integrity:

90

DecryptHook (priority 90)

Decrypts the message body first — NormalizationHook should never see encrypted data.

95

VerifySignatureHook (priority 95)

Verifies HMAC/signature on the original format — before any transformation changes the bytes.

100

ValidationHook (priority 100)

Validates the original message structure — schema validation on raw input.

110

NormalizationHook (priority 110)

Now safe to normalize — data is decrypted, verified, and validated. Wraps in CloudEvents structure.

Registering the Hook

import { NormalizationHook } from 'edge-stream-js';

const server = edgeStream.server('bas')!;

// Add to the incoming pipeline
server.incomingPipeline.addHook(new NormalizationHook(true));

// Or create disabled (no-op) — useful for testing
const noopHook = new NormalizationHook(false);

Skipping Already-Normalized Messages

If the message body already has CloudEvents fields (specversion + type), the hook skips normalization. This prevents double-normalization when a message passes through multiple pipeline stages.

// Hook checks first — returns early if already normalized
private isAlreadyNormalized(envelope: IEnvelope): boolean {
  const body = envelope.body as any;
  return (
    envelope.meta.protocol === 'cloudevents' ||
    (body && typeof body.specversion === 'string' && typeof body.type === 'string')
  );
}

Using NormalizationService Directly

Subscribers that want normalized access without using the hook can use the standalone NormalizationService:

import { NormalizationService } from 'edge-stream-js';

edgeStream.subscribe('bas', 'some.topic', (rawEnvelope) => {
  const normalized = NormalizationService.normalize(rawEnvelope);
  console.log(normalized.type);           // 'com.bizfirst.workflow.event'
  console.log(normalized.data.metadata);  // routing metadata
  console.log(normalized.data.data);      // business payload
});