Delivery Stage
After all hooks pass, the SubscriptionManager delivers the final envelope to every matching subscriber. Delivery is concurrent, isolated, and fully observable.
How Delivery Works
When the pipeline completes without abortion, SubscriptionManager.publish(topic, envelope) is called. It iterates over all registered subscribers, evaluates each against the topic pattern, and invokes matching callbacks.
// SubscriptionManager.publish — actual source
async publish<TBody>(topic: string, envelope: IEnvelope<TBody>): Promise<void> {
const promises: Promise<void>[] = [];
for (const entry of this.subscribers.values()) {
if (this.topicMatches(entry.topic, topic)) {
const start = Date.now();
// Fire queued event — for SubscribersMonitor
this.observer?.onSubscriberQueued({ messageId, subscriberId: entry.id, ... });
try {
const result = entry.callback(envelope);
if (result instanceof Promise) {
// Async subscribers are collected and awaited together
promises.push(result.then(() => {
this.observer?.onSubscriberDelivered({ ... });
}).catch(err => {
this.observer?.onSubscriberFailed({ ... });
// Error is logged but does NOT stop other subscribers
}));
}
} catch (syncError) {
// Sync errors are also isolated — other subscribers continue
this.observer?.onSubscriberFailed({ ... });
}
}
}
await Promise.all(promises); // wait for all async subscribers
}
Topic Matching at Delivery
The topicMatches(pattern, actual) method evaluates each registered subscription pattern against the incoming envelope's topic:
private topicMatches(pattern: string, actual: string): boolean {
// Exact match
if (pattern === actual) return true;
// Wildcard: 'workflow.*' matches 'workflow.started'
if (pattern.endsWith('.*')) {
const prefix = pattern.slice(0, -2);
return actual.startsWith(`${prefix}.`) && actual.length > prefix.length + 1;
}
return false;
}
// Examples:
// pattern='workflow.*' topic='workflow.started' → true
// pattern='workflow.*' topic='workflow.node.done' → false (multi-level)
// pattern='workflow.*' topic='other.started' → false
Subscriber Isolation
Subscriber failures are isolated — one subscriber throwing an error never affects others. The SubscriptionManager catches both sync and async errors per-subscriber:
Error Isolation
Subscriber A throws → Subscriber B still receives the message. Error is logged to observer but not rethrown.
Concurrent Delivery
All matching subscribers are invoked concurrently via Promise.all. No subscriber waits for another.
Observability Events
Each delivery emits onSubscriberQueued, onSubscriberDelivered, or onSubscriberFailed to the observer.
Delivery Observability Events
| Event | When | Contains |
|---|---|---|
onSubscriberQueued | Before callback is invoked | messageId, subscriberId, topic, envelopeSnapshot |
onSubscriberDelivered | After callback completes successfully | messageId, subscriberId, topic, processingTime, callbackName |
onSubscriberFailed | After callback throws | messageId, subscriberId, topic, error, processingTime |
Subscription Handle
Every call to subscribe() returns an ISubscription handle with a unique generated ID:
export interface ISubscription {
readonly id: string; // e.g., 'sub-a1b2c3d4'
readonly active: boolean;
unsubscribe(): void;
}
// Usage
const sub = edgeStream.subscribe('bas', 'workflow.*', handler);
console.log(sub.id); // 'sub-xxxx'
console.log(sub.active); // true
sub.unsubscribe();
console.log(sub.active); // false — removed from SubscriptionManager