Message Pipeline
Every EdgeStream message passes through an ordered sequence of stages — from raw bytes at the transport to typed envelope delivery to subscribers. Understanding the pipeline is the foundation of effective EdgeStream development.
The Pipeline at a Glance
The EdgeStream pipeline is linear and deterministic. A message enters at the transport and either reaches subscriber delivery or is aborted by a hook before it gets there. Each stage is composed of one or more hooks that execute in ascending priority order (lower numbers run first).
Transport Receipt
Raw bytes arrive over SignalR, WebSocket, SSE, or HTTP polling. The transport emits a raw message event to the server.
Normalization Stage
NormalizationHook (priority 110) converts raw bytes into a standardized IEnvelope with CloudEvents metadata. It runs after decryption (90), signature verification (95), and schema validation (100).
Incoming Pipeline Hooks
All registered incoming hooks execute in priority order. Each hook receives the IPipelineContext and can read/modify body, set metadata, or abort the pipeline.
Subscriber Delivery
SubscriptionManager.publish() matches the envelope topic against all registered subscriber patterns and invokes matching callbacks concurrently.
Observability Events
Each stage emits events to IPipelineObserver — feeding HooksMonitor and SubscribersMonitor in real time.
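The stages above can be sketched as a small runner: sort hooks by priority, execute them in order, and skip delivery as soon as one aborts. This is only an illustration under stated assumptions. `SketchContext`, `SketchHook`, `createContext`, and `runIncoming` are hypothetical names, not EdgeStream APIs, and the real pipeline also handles pausing and observer events.

```typescript
// Hypothetical, trimmed-down shapes for illustration only; the real
// EdgeStream interfaces have more members than shown here.
interface SketchContext {
  body: unknown;
  metadata: Record<string, unknown>;
  isAborted: boolean;
  abortReason: string | undefined;
  abort(reason?: string): void;
}

interface SketchHook {
  name: string;
  priority: number;
  execute(ctx: SketchContext): void;
}

function createContext(body: unknown): SketchContext {
  const ctx: SketchContext = {
    body,
    metadata: {},
    isAborted: false,
    abortReason: undefined,
    abort(reason?: string): void {
      ctx.isAborted = true;
      ctx.abortReason = reason;
    },
  };
  return ctx;
}

// Runs hooks in ascending priority order, stops at the first abort, and
// only hands the context to `deliver` when no hook aborted.
function runIncoming(
  hooks: SketchHook[],
  ctx: SketchContext,
  deliver: (ctx: SketchContext) => void,
): string[] {
  const executed: string[] = [];
  const ordered = [...hooks].sort((a, b) => a.priority - b.priority);
  for (const hook of ordered) {
    hook.execute(ctx);
    executed.push(hook.name);
    if (ctx.isAborted) {
      return executed; // aborted: subscribers never see the message
    }
  }
  deliver(ctx);
  return executed;
}

// Registration order is deliberately shuffled; priority decides run order.
const sketchHooks: SketchHook[] = [
  { name: "enrich", priority: 200, execute: (c) => { c.metadata["enriched"] = true; } },
  { name: "auth", priority: 30, execute: (c) => { if (!c.metadata["token"]) c.abort("missing token"); } },
  { name: "filter", priority: 50, execute: () => {} },
];

let deliveredCount = 0;
const rejected = createContext({ hello: "world" });
const ranForRejected = runIncoming(sketchHooks, rejected, () => { deliveredCount += 1; });
console.log(ranForRejected, rejected.abortReason, deliveredCount);
```

In this sketch the message carries no token, so only the auth hook runs and the delivery callback is never invoked. Setting `metadata["token"]` before the run lets all three hooks execute in the order auth, filter, enrich.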
Incoming vs. Outgoing Pipeline
Each IServer instance maintains two independent pipelines:
| Pipeline | Direction | Triggered By | Terminates At |
|---|---|---|---|
| IncomingPipeline | Transport → App | Message received from transport | Subscriber callbacks |
| OutgoingPipeline | App → Transport | server.send() call | Transport send() |
Pipeline Context
Every hook receives the same IPipelineContext object for the duration of one message's journey through the pipeline. The context is mutable — hooks communicate via body and metadata.
export interface IPipelineContext<TBody = unknown> {
  readonly envelope: IEnvelope<TBody>; // immutable reference to the envelope
  readonly serverId: string;
  body: TBody; // mutable: hooks transform this
  metadata: Record<string, unknown>; // shared scratch space between hooks
  abort(reason?: string): void; // abort the pipeline
  readonly isAborted: boolean;
  pause(request: IUserInputRequest): IPipelinePause; // pause for user input
  readonly isPaused: boolean;
  readonly activityLog: readonly string[];
  addActivity(message: string): void;
}
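To make the "hooks communicate via body and metadata" idea concrete, here is a minimal sketch of two hooks cooperating through a context. It uses only a subset of the members listed above. `ContextSubset`, `makeTestContext`, `Order`, `tagHighValue`, and `applyDefaults` are hypothetical names introduced for illustration, and the in-memory context is a stand-in rather than EdgeStream's implementation.

```typescript
// Subset of the IPipelineContext members listed above; envelope, serverId,
// pause and isPaused are omitted to keep the sketch short.
interface ContextSubset<TBody> {
  body: TBody;
  metadata: Record<string, unknown>;
  readonly isAborted: boolean;
  readonly activityLog: readonly string[];
  abort(reason?: string): void;
  addActivity(message: string): void;
}

// Hypothetical in-memory stand-in, e.g. for unit-testing hooks in isolation.
function makeTestContext<TBody>(body: TBody): ContextSubset<TBody> {
  let aborted = false;
  const log: string[] = [];
  return {
    body,
    metadata: {},
    get isAborted() { return aborted; },
    get activityLog() { return log; },
    abort(reason?: string): void {
      aborted = true;
      // Logging the reason is a choice of this stand-in, not documented behavior.
      log.push(`aborted: ${reason ?? "no reason given"}`);
    },
    addActivity(message: string): void { log.push(message); },
  };
}

interface Order { id: string; amount: number; currency?: string; }

// First hook: records a decision in metadata for later hooks to read.
function tagHighValue(ctx: ContextSubset<Order>): void {
  ctx.metadata["highValue"] = ctx.body.amount >= 1000;
  ctx.addActivity("tagHighValue: evaluated amount");
}

// Second hook: aborts on bad input, otherwise replaces the body with a
// transformed copy and reacts to what the first hook left in metadata.
function applyDefaults(ctx: ContextSubset<Order>): void {
  if (ctx.body.amount < 0) {
    ctx.abort("negative amount");
    return;
  }
  ctx.body = { ...ctx.body, currency: ctx.body.currency ?? "USD" };
  if (ctx.metadata["highValue"] === true) {
    ctx.addActivity("applyDefaults: high-value order");
  }
}

const orderCtx = makeTestContext<Order>({ id: "o1", amount: 1500 });
tagHighValue(orderCtx);
applyDefaults(orderCtx);
console.log(orderCtx.body, orderCtx.activityLog);
```

Keeping hooks as plain functions of the context, as assumed here, makes them easy to exercise without a running server or transport.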
Hook Priority Reference
| Priority | Hook | Stage | Purpose |
|---|---|---|---|
| 5 | HookActivityLogger | Both | Captures timing before any other hook runs |
| 30 | Custom auth hooks | Incoming | Token validation, tenant verification |
| 50 | Custom filter hooks | Both | Message filtering, deduplication |
| 90 | DecryptHook | Incoming | Decrypts encrypted body |
| 95 | VerifySignatureHook | Incoming | Verifies HMAC/JWT signature |
| 100 | ValidationHook | Incoming | Schema validation |
| 110 | NormalizationHook | Incoming | CloudEvents normalization |
| 200+ | Custom enrichment hooks | Both | Business data enrichment |
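When choosing a priority for a custom hook, it helps to check which built-in hooks will already have run at that point. The sketch below encodes the incoming built-in priorities from the table and answers that question. `builtInIncoming` and `builtInsBefore` are hypothetical helpers, not part of EdgeStream, and the sketch assumes ascending priority order as described earlier.

```typescript
// Built-in hook priorities copied from the table above.
const builtInIncoming: ReadonlyArray<{ name: string; priority: number }> = [
  { name: "HookActivityLogger", priority: 5 },
  { name: "DecryptHook", priority: 90 },
  { name: "VerifySignatureHook", priority: 95 },
  { name: "ValidationHook", priority: 100 },
  { name: "NormalizationHook", priority: 110 },
];

// Hypothetical helper: names of built-in hooks that have already run when a
// custom hook with the given priority executes. Equal priorities are excluded
// because the ordering between ties is not specified here.
function builtInsBefore(priority: number): string[] {
  return builtInIncoming
    .filter((hook) => hook.priority < priority)
    .map((hook) => hook.name);
}

console.log(builtInsBefore(30));
console.log(builtInsBefore(200));
```

Under these assumptions, a custom auth hook at priority 30 runs before DecryptHook, so it would see the body still encrypted and should probably rely on headers or metadata. An enrichment hook at 200 or above runs after normalization and can work with the typed envelope.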
Pipeline Lifecycle Events
The EdgeStream facade emits events for key pipeline moments:
edgeStream.on('message:received', (event) => {
  // fired when transport delivers a raw message
  console.log('Message from server:', event.serverId);
});

edgeStream.on('pipeline:paused', (event) => {
  // fired when a hook calls context.pause()
  console.log('Pipeline paused, waiting for user input');
});

edgeStream.on('pipeline:resumed', (event) => {
  // fired when the paused pipeline is resumed
  console.log('Pipeline resumed, continuing delivery');
});
Use NormalizationHook on incoming pipelines that receive JSON messages.