EdgeStream
SubscriptionManager
SubscriptionManager is the central registry and message router for a server instance. It maintains the list of active subscriptions and delivers envelopes to all matching callbacks when publish() is called.
SubscriptionManager Interface
export interface ISubscriptionManager {
subscribe<TBody>(topic: string, callback: SubscriberCallback<TBody>): ISubscription;
publish<TBody>(topic: string, envelope: IEnvelope<TBody>, messageId?: string): Promise<void>;
setObserver(observer: IPipelineObserver): void;
readonly subscriptions: ReadonlyArray<ISubscription>;
clear(): void;
}
Internal Data Structures
// Each registered subscription is stored as a SubscriberEntry
interface SubscriberEntry {
id: string; // 'sub-a1b2c3d4'
topic: string; // the registered pattern
callback: SubscriberCallback; // the callback function
}
// The registry is a simple Map — O(1) lookup by ID
private subscribers: Map<string, SubscriberEntry> = new Map();
// All live subscription handles (for external enumeration)
private _subscriptions: Subscription[] = [];
Publish Flow
When the pipeline completes, it calls subscriptionManager.publish(topic, envelope). The manager iterates all subscribers and evaluates patterns:
// Simplified publish flow
async publish(topic, envelope, messageId?) {
const promises = [];
for (const entry of this.subscribers.values()) {
if (this.topicMatches(entry.topic, topic)) {
// 1. Notify observer (feeds SubscribersMonitor)
observer?.onSubscriberQueued({ messageId, subscriberId: entry.id, topic });
// 2. Invoke callback
try {
const result = entry.callback(envelope);
if (result instanceof Promise) {
promises.push(result
.then(() => observer?.onSubscriberDelivered({ ... }))
.catch(err => observer?.onSubscriberFailed({ ... }))
);
} else {
observer?.onSubscriberDelivered({ ... });
}
} catch (syncError) {
observer?.onSubscriberFailed({ ... });
}
}
}
await Promise.all(promises); // wait for all async deliveries
}
Accessing the SubscriptionManager
// Via the server instance
const manager = edgeStream.server('bas')!.subscriptionManager;
// Subscribe
const sub = manager.subscribe('workflow.*', handler);
// Enumerate active subscriptions
for (const sub of manager.subscriptions) {
console.log(sub.id, sub.active);
}
// Clear all subscriptions (useful in tests)
manager.clear();
Observer Integration
// Set an observer to receive delivery events
manager.setObserver({
onSubscriberQueued(event) {
monitorStore.trackQueued(event.subscriberId, event.messageId);
},
onSubscriberDelivered(event) {
monitorStore.trackDelivered(event.subscriberId, event.processingTime);
},
onSubscriberFailed(event) {
errorStore.recordSubscriberFailure(event.error, event.subscriberId);
},
// ... other observer methods
});