Portal Community

How Delivery Works

When the pipeline completes without abortion, SubscriptionManager.publish(topic, envelope) is called. It iterates over all registered subscribers, evaluates each against the topic pattern, and invokes matching callbacks.

// SubscriptionManager.publish — actual source
async publish<TBody>(topic: string, envelope: IEnvelope<TBody>): Promise<void> {
  const promises: Promise<void>[] = [];

  for (const entry of this.subscribers.values()) {
    if (this.topicMatches(entry.topic, topic)) {
      const start = Date.now();
      // Fire queued event — for SubscribersMonitor
      this.observer?.onSubscriberQueued({ messageId, subscriberId: entry.id, ... });

      try {
        const result = entry.callback(envelope);
        if (result instanceof Promise) {
          // Async subscribers are collected and awaited together
          promises.push(result.then(() => {
            this.observer?.onSubscriberDelivered({ ... });
          }).catch(err => {
            this.observer?.onSubscriberFailed({ ... });
            // Error is logged but does NOT stop other subscribers
          }));
        }
      } catch (syncError) {
        // Sync errors are also isolated — other subscribers continue
        this.observer?.onSubscriberFailed({ ... });
      }
    }
  }

  await Promise.all(promises); // wait for all async subscribers
}

Topic Matching at Delivery

The topicMatches(pattern, actual) method evaluates each registered subscription pattern against the incoming envelope's topic:

private topicMatches(pattern: string, actual: string): boolean {
  // Exact match
  if (pattern === actual) return true;

  // Wildcard: 'workflow.*' matches 'workflow.started'
  if (pattern.endsWith('.*')) {
    const prefix = pattern.slice(0, -2);
    return actual.startsWith(`${prefix}.`) && actual.length > prefix.length + 1;
  }

  return false;
}

// Examples:
// pattern='workflow.*'    topic='workflow.started'   → true
// pattern='workflow.*'    topic='workflow.node.done'  → false (multi-level)
// pattern='workflow.*'    topic='other.started'       → false

Subscriber Isolation

Subscriber failures are isolated — one subscriber throwing an error never affects others. The SubscriptionManager catches both sync and async errors per-subscriber:

Error Isolation

Subscriber A throws → Subscriber B still receives the message. Error is logged to observer but not rethrown.

Concurrent Delivery

All matching subscribers are invoked concurrently via Promise.all. No subscriber waits for another.

Observability Events

Each delivery emits onSubscriberQueued, onSubscriberDelivered, or onSubscriberFailed to the observer.

Delivery Observability Events

EventWhenContains
onSubscriberQueuedBefore callback is invokedmessageId, subscriberId, topic, envelopeSnapshot
onSubscriberDeliveredAfter callback completes successfullymessageId, subscriberId, topic, processingTime, callbackName
onSubscriberFailedAfter callback throwsmessageId, subscriberId, topic, error, processingTime

Subscription Handle

Every call to subscribe() returns an ISubscription handle with a unique generated ID:

export interface ISubscription {
  readonly id: string;    // e.g., 'sub-a1b2c3d4'
  readonly active: boolean;
  unsubscribe(): void;
}

// Usage
const sub = edgeStream.subscribe('bas', 'workflow.*', handler);
console.log(sub.id);     // 'sub-xxxx'
console.log(sub.active); // true

sub.unsubscribe();
console.log(sub.active); // false — removed from SubscriptionManager