Portal Community

IMessagePlugin Interface

/**
 * Contract every message plugin fulfils: it names the topic namespace it
 * owns, lists its message types, recognises and validates envelopes, and
 * supplies the hooks its pipeline needs.
 */
interface IMessagePlugin {
  /** Namespace of topics owned by the plugin; has to be unique within the registry. */
  readonly topicNamespace: string;

  /** Every message type name that lives under this namespace. */
  readonly messageTypes: ReadonlyArray<string>;

  /**
   * Tells whether the plugin is responsible for an envelope.
   *
   * @param envelope - Envelope to inspect.
   * @returns `true` when this plugin handles the envelope.
   */
  canHandle(envelope: IEnvelope): boolean;

  /**
   * Checks the message body against the schema the plugin expects.
   *
   * @param envelope - Envelope whose body is validated.
   * @returns The validation outcome.
   */
  validate(envelope: IEnvelope): ValidationResult;

  /**
   * Builds the default hooks this plugin needs in its pipeline.
   *
   * @returns The default set of hooks.
   */
  createDefaultHooks(): Array<IHook>;
}

/** Outcome reported by a plugin's `validate()` call. */
interface ValidationResult {
  /** Whether the envelope passed validation. */
  valid: boolean;
  /** Optional messages describing what failed; may be omitted. */
  errors?: Array<string>;
}

Example: Inventory Event Plugin

import type { IMessagePlugin, IEnvelope, IHook, ValidationResult } from 'edge-stream-js';
import { NormalizationHook } from 'edge-stream-js';

// 1. Define TypeScript interfaces for your message types
/**
 * Body shape of an `inventory.lowStock` message.
 *
 * The subscription example in this document reads `productId`,
 * `productName`, `currentStock` and `threshold` from it.
 */
export interface InventoryLowStockEvent {
  /** Identifier of the affected product. */
  productId: string;
  /** Display name of the affected product. */
  productName: string;
  /** Stock level reported by the event. */
  currentStock: number;
  /** NOTE(review): presumably the stock level that triggers the alert — confirm. */
  threshold: number;
  /** Identifier of the warehouse the event refers to. */
  warehouseId: string;
  /** NOTE(review): presumably a timestamp string; the format is not shown here — confirm. */
  alertedAt: string;
}

/**
 * Body shape of a restock event.
 *
 * NOTE(review): presumably carried by `inventory.restocked` messages — confirm
 * against the publisher.
 */
export interface InventoryRestockedEvent {
  /** Identifier of the restocked product. */
  productId: string;
  /** Stock level before the restock. */
  previousStock: number;
  /** Stock level after the restock. */
  newStock: number;
  /** NOTE(review): presumably a timestamp string; the format is not shown here — confirm. */
  restockedAt: string;
  /** Identifier of the supplier associated with the restock. */
  supplierId: string;
}

// 2. Implement the plugin
/**
 * Message plugin for the `inventory` topic namespace.
 *
 * Declares the inventory message types, recognises envelopes whose topic
 * falls under the namespace, validates the two event bodies that have
 * required-field rules, and supplies the hooks the pipeline should run.
 */
export class InventoryEventPlugin implements IMessagePlugin {
  /** Topic namespace owned by this plugin. */
  readonly topicNamespace = 'inventory';

  /** Message type names declared under the `inventory` namespace. */
  readonly messageTypes = [
    'inventory.lowStock',
    'inventory.restocked',
    'inventory.reserved',
    'inventory.released',
  ] as const;

  /**
   * Reports whether the envelope belongs to this plugin.
   *
   * @param envelope - Envelope whose `meta.topic` is inspected.
   * @returns `true` when the topic starts with `inventory.`.
   */
  canHandle(envelope: IEnvelope): boolean {
    return envelope.meta.topic.startsWith('inventory.');
  }

  /**
   * Checks the required fields of the envelope body for the topics that
   * have rules; every other topic is reported as valid.
   *
   * - `inventory.lowStock`: `productId` must be truthy and `currentStock`
   *   must be a number.
   * - `inventory.restocked`: `productId` and `supplierId` must be truthy.
   *
   * A `null`/`undefined` body is treated as an empty object, so it yields a
   * failed result for the topics above instead of throwing.
   *
   * @param envelope - Envelope to validate.
   * @returns `{ valid: true }`, or `{ valid: false, errors }` with one message.
   */
  validate(envelope: IEnvelope): ValidationResult {
    // Default a missing body to {} so the property reads below cannot throw;
    // any non-nullish body is inspected exactly as it arrived.
    const body = (envelope.body ?? {}) as {
      productId?: unknown;
      currentStock?: unknown;
      supplierId?: unknown;
    };

    switch (envelope.meta.topic) {
      case 'inventory.lowStock':
        if (!body.productId || typeof body.currentStock !== 'number') {
          return { valid: false, errors: ['productId and currentStock (number) are required'] };
        }
        break;

      case 'inventory.restocked':
        if (!body.productId || !body.supplierId) {
          return { valid: false, errors: ['productId and supplierId are required'] };
        }
        break;

      default:
        // No field rules are defined for any other topic.
        break;
    }

    return { valid: true };
  }

  /**
   * Creates the hooks this plugin wants in its pipeline, in the order listed.
   *
   * NOTE(review): `InventoryEnrichmentHook` and `InventoryAuditHook` are not
   * imported in the example shown here; they must be in scope wherever this
   * class is defined.
   *
   * @returns A new array with one fresh instance of each hook.
   */
  createDefaultHooks(): IHook[] {
    return [
      new NormalizationHook(),        // priority 110 — CloudEvents normalization
      new InventoryEnrichmentHook(),  // priority 200 — add product details from cache
      new InventoryAuditHook(),       // priority 300 — log to audit trail
    ];
  }
}

Registering and Using the Plugin

import { InventoryEventPlugin } from './inventory-plugin';

const inventoryPlugin = new InventoryEventPlugin();

// Look the server up without a non-null assertion. If 'bas' is not available,
// stop here with a descriptive error rather than failing later with an opaque
// TypeError on `server.incomingPipeline`.
const server = stream.server('bas');
if (!server) {
  throw new Error("Stream server 'bas' is not available; cannot register inventory hooks");
}

// Register default hooks on the incoming pipeline, in the order the plugin returns them
for (const hook of inventoryPlugin.createDefaultHooks()) {
  server.incomingPipeline.addHook(hook);
}

// Subscribe with the low-stock body type so the handler reads typed fields,
// then forward the four values the alert needs.
stream.subscribe<InventoryLowStockEvent>('bas', 'inventory.lowStock', ({ body }) => {
  alertService.sendLowStockAlert(
    body.productId,
    body.productName,
    body.currentStock,
    body.threshold,
  );
});

Plugin Checklist

| Item | Description |
| --- | --- |
| Unique namespace | Pick a noun prefix that does not conflict with existing plugins (workflow, agent, ui, system) |
| TypeScript interfaces | Export one interface per message type |
| canHandle() | Return true only for envelopes in your namespace |
| validate() | Check required fields per message type, return clear error messages |
| createDefaultHooks() | At minimum, include NormalizationHook. Add domain-specific hooks at higher priorities. |
| Registration | Register in the Plugin Registry so other teams can discover it |