Portal Community

SubscriptionManager Interface

/**
 * Public contract of the subscription manager: callers register topic
 * callbacks, publish envelopes to them, and can observe delivery events.
 */
export interface ISubscriptionManager {
  /** Live subscription handles, exposed read-only for external enumeration. */
  readonly subscriptions: readonly ISubscription[];

  /** Sets the observer that receives delivery events. */
  setObserver(observer: IPipelineObserver): void;

  /**
   * Registers `callback` under `topic` (a pattern such as 'workflow.*')
   * and returns the handle for the new subscription.
   */
  subscribe<TBody>(topic: string, callback: SubscriberCallback<TBody>): ISubscription;

  /**
   * Delivers `envelope` to every subscriber whose registered pattern matches
   * `topic`. The returned promise settles after all async deliveries finish.
   * `messageId` is optional and is forwarded in observer events.
   */
  publish<TBody>(topic: string, envelope: IEnvelope<TBody>, messageId?: string): Promise<void>;

  /** Clears all subscriptions (useful in tests). */
  clear(): void;
}

Internal Data Structures

/** Internal record stored for each registered subscription. */
interface SubscriberEntry {
  /** Subscription ID, e.g. 'sub-a1b2c3d4'. */
  id: string;

  /** The topic pattern this subscription was registered with. */
  topic: string;

  /** The callback function for this subscription. */
  callback: SubscriberCallback;
}

// Registry of entries keyed by subscription ID; a plain Map gives O(1) lookup by ID
private subscribers = new Map<string, SubscriberEntry>();

// Every live subscription handle, kept so callers can enumerate them
private _subscriptions: Subscription[] = [];

Publish Flow

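When the pipeline completes, it calls subscriptionManager.publish(topic, envelope). The manager iterates over every registered subscriber, tests each subscriber's topic pattern against the published topic, and invokes the callback of each subscriber whose pattern matches: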

// Simplified publish flow
//
// Delivers `envelope` to every subscriber whose registered pattern matches
// `topic`, reporting each step to the observer (if one is set). Synchronous
// callbacks have finished by the time the loop ends; promise-returning
// callbacks run concurrently and are awaited together at the end.
// `{ ... }` marks event payloads elided from this simplified listing.
async publish(topic, envelope, messageId?) {
  // Collects one settled-tracking promise per async callback
  const promises = [];

  // Map iteration follows insertion order, so subscribers are visited in the
  // order their entries were added to the registry.
  for (const entry of this.subscribers.values()) {
    // entry.topic is the registered pattern; topic is the published topic
    if (this.topicMatches(entry.topic, topic)) {
      // 1. Notify observer (feeds SubscribersMonitor)
      // `observer` is optional: every call uses `?.`, so nothing is reported
      // when it is unset. NOTE(review): presumably the observer installed via
      // setObserver() — confirm against the full implementation.
      observer?.onSubscriberQueued({ messageId, subscriberId: entry.id, topic });

      // 2. Invoke callback
      try {
        const result = entry.callback(envelope);
        if (result instanceof Promise) {
          // Async callback: report the outcome once the promise settles.
          // The .catch is chained after the .then, so it handles a rejected
          // callback AND a throw from onSubscriberDelivered itself.
          promises.push(result
            .then(() => observer?.onSubscriberDelivered({ ... }))
            .catch(err => observer?.onSubscriberFailed({ ... }))
          );
        } else {
          // Sync callback (or any return value that is not a native Promise,
          // including non-native thenables): reported as delivered right away.
          observer?.onSubscriberDelivered({ ... });
        }
      } catch (syncError) {
        // Reached when the callback throws synchronously, or when the
        // synchronous onSubscriberDelivered call above throws.
        observer?.onSubscriberFailed({ ... });
      }
    }
  }

  // Each pushed promise already has a .catch attached, so this rejects only
  // if onSubscriberFailed itself throws inside that handler.
  await Promise.all(promises); // wait for all async deliveries
}

Accessing the SubscriptionManager

// Via the server instance. Check the lookup result explicitly so a missing
// server fails with a clear message instead of a TypeError on property access.
const server = edgeStream.server('bas');
if (!server) {
  throw new Error("EdgeStream server 'bas' is not available");
}
const manager = server.subscriptionManager;

// Subscribe
const sub = manager.subscribe('workflow.*', handler);

// Enumerate active subscriptions
// (the loop variable has its own name so it does not shadow `sub` above)
for (const subscription of manager.subscriptions) {
  console.log(subscription.id, subscription.active);
}

// Clear all subscriptions (useful in tests)
manager.clear();

Observer Integration

// Register an observer so delivery events flow into the monitoring stores
manager.setObserver({
  // A matching subscriber is about to be invoked for a message
  onSubscriberQueued({ subscriberId, messageId }) {
    monitorStore.trackQueued(subscriberId, messageId);
  },
  // The subscriber's callback completed
  onSubscriberDelivered({ subscriberId, processingTime }) {
    monitorStore.trackDelivered(subscriberId, processingTime);
  },
  // The subscriber's callback threw or rejected
  onSubscriberFailed({ error, subscriberId }) {
    errorStore.recordSubscriberFailure(error, subscriberId);
  },
  // ... other observer methods
});