Portal Community

The Pipeline at a Glance

The EdgeStream pipeline is linear and deterministic. A message enters at the transport and exits (or is aborted) before reaching subscribers. Each stage is composed of one or more hooks that execute in priority order.

T

Transport Receipt

Raw bytes arrive over SignalR, WebSocket, SSE, or HTTP polling. The transport emits a raw message event to the server.

N

Normalization Stage

NormalizationHook (priority 110) converts raw bytes into a standardized IEnvelope with CloudEvents metadata. Runs after decryption (90) and verification (95).

P

Incoming Pipeline Hooks

All registered incoming hooks execute in priority order. Each hook receives the IPipelineContext and can read/modify body, set metadata, or abort the pipeline.

D

Subscriber Delivery

SubscriptionManager.publish() matches the envelope topic against all registered subscriber patterns and invokes matching callbacks concurrently.

O

Observability Events

Each stage emits events to IPipelineObserver — feeding HooksMonitor and SubscribersMonitor in real time.

Incoming vs. Outgoing Pipeline

Each IServer instance maintains two independent pipelines:

PipelineDirectionTriggered ByTerminates At
IncomingPipelineTransport → AppMessage received from transportSubscriber callbacks
OutgoingPipelineApp → Transportserver.send() callTransport send()

Pipeline Context

Every hook receives the same IPipelineContext object for the duration of one message's journey through the pipeline. The context is mutable — hooks communicate via body and metadata.

export interface IPipelineContext<TBody = unknown> {
  readonly envelope: IEnvelope<TBody>; // immutable reference to the envelope
  readonly serverId: string;
  body: TBody;                          // mutable — hooks transform this
  metadata: Record<string, unknown>;   // shared scratch space between hooks
  abort(reason?: string): void;         // abort the pipeline
  readonly isAborted: boolean;
  pause(request: IUserInputRequest): IPipelinePause; // pause for user input
  readonly isPaused: boolean;
  readonly activityLog: readonly string[];
  addActivity(message: string): void;
}

Hook Priority Reference

PriorityHookStagePurpose
5HookActivityLoggerBothCaptures timing before any other hook runs
30Custom auth hooksIncomingToken validation, tenant verification
50Custom filter hooksBothMessage filtering, deduplication
90DecryptHookIncomingDecrypts encrypted body
95VerifySignatureHookIncomingVerifies HMAC/JWT signature
100ValidationHookIncomingSchema validation
110NormalizationHookIncomingCloudEvents normalization
200+Custom enrichment hooksBothBusiness data enrichment

Pipeline Lifecycle Events

The EdgeStream facade emits events for key pipeline moments:

edgeStream.on('message:received', (event) => {
  // fired when transport delivers a raw message
  console.log('Message from server:', event.serverId);
});

edgeStream.on('pipeline:paused', (event) => {
  // fired when a hook calls context.pause()
  console.log('Pipeline paused, waiting for user input');
});

edgeStream.on('pipeline:resumed', (event) => {
  // fired when the paused pipeline is resumed
  console.log('Pipeline resumed, continuing delivery');
});
Pipeline is Linear Messages cannot skip stages. If the normalization hook is missing, raw bytes reach subscriber callbacks — which typically causes errors. Always register NormalizationHook on incoming pipelines that receive JSON messages.