EdgeStream
Building a Custom Plugin
A custom plugin packages domain-specific message types, TypeScript interfaces, validation, and default hook configuration for a topic namespace. Build one when you introduce a new message domain that multiple teams will consume.
IMessagePlugin Interface
interface IMessagePlugin {
  /** Topic namespace this plugin owns; must be unique in the registry */
  readonly topicNamespace: string;

  /** All message type names under this namespace */
  readonly messageTypes: readonly string[];

  /** Returns true if this plugin handles the given envelope */
  canHandle(envelope: IEnvelope): boolean;

  /** Validates message body against expected schema */
  validate(envelope: IEnvelope): ValidationResult;

  /** Creates the set of default hooks for this plugin's pipeline needs */
  createDefaultHooks(): IHook[];
}

interface ValidationResult {
  valid: boolean;
  errors?: string[];
}
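To make the role of each member concrete, here is a minimal sketch of how a registry could enforce namespace uniqueness and route an envelope through `canHandle` and `validate`. `SketchRegistry`, `SketchPlugin`, `SketchEnvelope`, and `SketchValidationResult` are hypothetical stand-ins defined locally so the snippet runs on its own. They are not the edge-stream-js Plugin Registry API, and the envelope shape (`meta.topic`, `body`) is assumed from the inventory example on this page.

```typescript
// Local stand-ins (assumptions): the real IEnvelope and IMessagePlugin come from edge-stream-js.
interface SketchEnvelope {
  meta: { topic: string };
  body: unknown;
}

interface SketchValidationResult {
  valid: boolean;
  errors?: string[];
}

interface SketchPlugin {
  readonly topicNamespace: string;
  canHandle(envelope: SketchEnvelope): boolean;
  validate(envelope: SketchEnvelope): SketchValidationResult;
}

// Hypothetical registry: rejects duplicate namespaces and routes envelopes to the owning plugin.
class SketchRegistry {
  private readonly plugins: SketchPlugin[] = [];

  register(plugin: SketchPlugin): void {
    const taken = this.plugins.some((p) => p.topicNamespace === plugin.topicNamespace);
    if (taken) {
      throw new Error(`Namespace already registered: ${plugin.topicNamespace}`);
    }
    this.plugins.push(plugin);
  }

  // Returns the validation result from the first plugin that claims the envelope.
  validate(envelope: SketchEnvelope): SketchValidationResult {
    const owner = this.plugins.find((p) => p.canHandle(envelope));
    if (!owner) {
      return { valid: false, errors: [`No plugin handles topic ${envelope.meta.topic}`] };
    }
    return owner.validate(envelope);
  }
}

// A trivial plugin used only to exercise the registry.
const pingPlugin: SketchPlugin = {
  topicNamespace: 'ping',
  canHandle: (envelope) => envelope.meta.topic.startsWith('ping.'),
  validate: (envelope) =>
    typeof envelope.body === 'string'
      ? { valid: true }
      : { valid: false, errors: ['body must be a string'] },
};

const registry = new SketchRegistry();
registry.register(pingPlugin);

const okResult = registry.validate({ meta: { topic: 'ping.sent' }, body: 'hello' });
const unknownResult = registry.validate({ meta: { topic: 'orders.created' }, body: {} });
```

Registering `pingPlugin` a second time throws, which is one way to honor the "must be unique in the registry" requirement on `topicNamespace`.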
Example: Inventory Event Plugin
import type { IMessagePlugin, IEnvelope, IHook, ValidationResult } from 'edge-stream-js';
import { NormalizationHook } from 'edge-stream-js';
// InventoryEnrichmentHook and InventoryAuditHook are domain hooks assumed to be
// defined elsewhere in your codebase; import them from wherever they live.

// 1. Define TypeScript interfaces for your message types
export interface InventoryLowStockEvent {
  productId: string;
  productName: string;
  currentStock: number;
  threshold: number;
  warehouseId: string;
  alertedAt: string;
}

export interface InventoryRestockedEvent {
  productId: string;
  previousStock: number;
  newStock: number;
  restockedAt: string;
  supplierId: string;
}

// 2. Implement the plugin
export class InventoryEventPlugin implements IMessagePlugin {
  readonly topicNamespace = 'inventory';
  readonly messageTypes = [
    'inventory.lowStock',
    'inventory.restocked',
    'inventory.reserved',
    'inventory.released',
  ] as const;

  canHandle(envelope: IEnvelope): boolean {
    return envelope.meta.topic.startsWith('inventory.');
  }

  validate(envelope: IEnvelope): ValidationResult {
    const topic = envelope.meta.topic;
    // Treat a missing body as empty so the checks below fail cleanly instead of throwing
    const body = (envelope.body ?? {}) as Record<string, unknown>;

    if (topic === 'inventory.lowStock') {
      if (!body.productId || typeof body.currentStock !== 'number') {
        return { valid: false, errors: ['productId and currentStock (number) are required'] };
      }
    }
    if (topic === 'inventory.restocked') {
      if (!body.productId || !body.supplierId) {
        return { valid: false, errors: ['productId and supplierId are required'] };
      }
    }
    // Topics without explicit checks (reserved, released) pass validation
    return { valid: true };
  }

  createDefaultHooks(): IHook[] {
    return [
      new NormalizationHook(),       // priority 110: CloudEvents normalization
      new InventoryEnrichmentHook(), // priority 200: add product details from cache
      new InventoryAuditHook(),      // priority 300: log to audit trail
    ];
  }
}
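As the number of message types grows, the chain of `if` blocks in `validate()` can become hard to maintain. One alternative, sketched here, is a table of per-topic field rules that reports every failing field rather than only the first. `FieldRule`, `inventoryRules`, `validateBody`, and `SketchValidationResult` are hypothetical names introduced for illustration, and the function takes a plain topic string and body so it runs without edge-stream-js. It is slightly stricter than the example above because it also checks that the ID fields are strings.

```typescript
// Hypothetical helper types; not part of edge-stream-js.
type FieldType = 'string' | 'number';
type FieldRule = { field: string; type: FieldType };

interface SketchValidationResult {
  valid: boolean;
  errors?: string[];
}

// Required fields per topic, based on the checks in the plugin's validate() method.
const inventoryRules: Record<string, FieldRule[]> = {
  'inventory.lowStock': [
    { field: 'productId', type: 'string' },
    { field: 'currentStock', type: 'number' },
  ],
  'inventory.restocked': [
    { field: 'productId', type: 'string' },
    { field: 'supplierId', type: 'string' },
  ],
};

function validateBody(topic: string, body: unknown): SketchValidationResult {
  // Topics without rules pass, as in the plugin example
  const rules = inventoryRules[topic] ?? [];
  // A null or non-object body is treated as an empty record
  const record = (typeof body === 'object' && body !== null ? body : {}) as Record<string, unknown>;
  const errors: string[] = [];

  for (const rule of rules) {
    const value = record[rule.field];
    // Empty strings count as missing, like the truthiness check in the plugin example
    if (typeof value !== rule.type || value === '') {
      errors.push(`${rule.field} (${rule.type}) is required`);
    }
  }
  return errors.length === 0 ? { valid: true } : { valid: false, errors };
}

const good = validateBody('inventory.lowStock', { productId: 'p-1', currentStock: 3 });
const bad = validateBody('inventory.restocked', { productId: '' });
```

Inside a plugin, `validate()` could delegate to such a helper by passing `envelope.meta.topic` and `envelope.body`.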
Registering and Using the Plugin
import { InventoryEventPlugin } from './inventory-plugin';
import type { InventoryLowStockEvent } from './inventory-plugin';

// `stream` and `alertService` are assumed to be set up elsewhere in your application.
const inventoryPlugin = new InventoryEventPlugin();
const server = stream.server('bas')!;

// Register default hooks
inventoryPlugin.createDefaultHooks().forEach(hook => {
  server.incomingPipeline.addHook(hook);
});

// Type-safe subscription using the plugin's interface
stream.subscribe<InventoryLowStockEvent>('bas', 'inventory.lowStock', (envelope) => {
  const { productId, productName, currentStock, threshold } = envelope.body;
  alertService.sendLowStockAlert(productId, productName, currentStock, threshold);
});
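The registration snippet subscribes a handler but never calls the plugin's `validate()`. Whether edge-stream-js invokes it automatically is not stated here, so the sketch below assumes it does not and shows one way to guard a handler. `withValidation`, `SketchValidator`, and the other `Sketch*` types are hypothetical and defined locally so the snippet runs on its own.

```typescript
// Local stand-ins (assumptions) for the edge-stream-js envelope and result types.
interface SketchEnvelope<T = unknown> {
  meta: { topic: string };
  body: T;
}

interface SketchValidationResult {
  valid: boolean;
  errors?: string[];
}

interface SketchValidator {
  validate(envelope: SketchEnvelope): SketchValidationResult;
}

// Wraps a handler so it only runs for envelopes the plugin accepts.
function withValidation<T>(
  plugin: SketchValidator,
  handler: (envelope: SketchEnvelope<T>) => void,
  onInvalid: (errors: string[]) => void = () => {},
): (envelope: SketchEnvelope<T>) => void {
  return (envelope) => {
    const result = plugin.validate(envelope);
    if (!result.valid) {
      onInvalid(result.errors ?? []);
      return;
    }
    handler(envelope);
  };
}

// Toy validator used only for this demonstration: requires a productId on the body.
const toyPlugin: SketchValidator = {
  validate: (envelope) => {
    const body = (envelope.body ?? {}) as Record<string, unknown>;
    return body.productId
      ? { valid: true }
      : { valid: false, errors: ['productId is required'] };
  },
};

const handled: string[] = [];
const rejected: string[] = [];

const guarded = withValidation<{ productId: string }>(
  toyPlugin,
  (envelope) => { handled.push(envelope.body.productId); },
  (errors) => { rejected.push(...errors); },
);

guarded({ meta: { topic: 'inventory.lowStock' }, body: { productId: 'p-1' } });
// Simulates a malformed message arriving at runtime despite the declared type
guarded({ meta: { topic: 'inventory.lowStock' }, body: {} as { productId: string } });
```

In the registration example, the wrapped function would take the place of the bare callback passed to `stream.subscribe`, assuming the callback receives the envelope as shown there.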
Plugin Checklist
| Item | Description |
|---|---|
| Unique namespace | Pick a noun prefix that does not conflict with existing plugins (workflow, agent, ui, system) |
| TypeScript interfaces | Export one interface per message type |
| canHandle() | Return true only for envelopes in your namespace |
| validate() | Check required fields per message type, return clear error messages |
| createDefaultHooks() | At minimum, include NormalizationHook. Give domain-specific hooks higher priority numbers than NormalizationHook (200 and 300 versus 110 in the example). |
| Registration | Register in the Plugin Registry so other teams can discover it |
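Several checklist items can be verified mechanically before a plugin is registered. The following is a rough sketch of such a check. `checkPlugin`, `RESERVED_NAMESPACES`, and the `Sketch*` types are hypothetical helpers, not part of edge-stream-js, and the reserved set simply copies the namespaces named in the table.

```typescript
// Local stand-ins (assumptions) for the edge-stream-js types.
interface SketchEnvelope {
  meta: { topic: string };
  body: unknown;
}

interface SketchPlugin {
  readonly topicNamespace: string;
  readonly messageTypes: readonly string[];
  canHandle(envelope: SketchEnvelope): boolean;
  createDefaultHooks(): unknown[];
}

// Namespaces the checklist lists as already taken.
const RESERVED_NAMESPACES = new Set(['workflow', 'agent', 'ui', 'system']);

// Returns a list of problems; an empty list means every check passed.
function checkPlugin(plugin: SketchPlugin): string[] {
  const problems: string[] = [];
  const prefix = `${plugin.topicNamespace}.`;

  if (RESERVED_NAMESPACES.has(plugin.topicNamespace)) {
    problems.push(`namespace "${plugin.topicNamespace}" is already in use`);
  }

  for (const type of plugin.messageTypes) {
    if (!type.startsWith(prefix)) {
      problems.push(`message type "${type}" is outside the namespace`);
    } else if (!plugin.canHandle({ meta: { topic: type }, body: {} })) {
      problems.push(`canHandle() rejects its own type "${type}"`);
    }
  }

  // Arbitrary topic from another namespace, used only as a probe
  const foreignTopic = 'system.probe';
  if (plugin.topicNamespace !== 'system' && plugin.canHandle({ meta: { topic: foreignTopic }, body: {} })) {
    problems.push('canHandle() accepts a topic from another namespace');
  }

  if (plugin.createDefaultHooks().length === 0) {
    problems.push('createDefaultHooks() returned no hooks');
  }
  return problems;
}

const goodPlugin: SketchPlugin = {
  topicNamespace: 'inventory',
  messageTypes: ['inventory.lowStock', 'inventory.restocked'],
  canHandle: (envelope) => envelope.meta.topic.startsWith('inventory.'),
  createDefaultHooks: () => [{ name: 'placeholder-hook' }],
};

const badPlugin: SketchPlugin = {
  topicNamespace: 'ui',
  messageTypes: ['inventory.lowStock'],
  canHandle: () => true,
  createDefaultHooks: () => [],
};

const goodProblems = checkPlugin(goodPlugin);
const badProblems = checkPlugin(badPlugin);
```

A check like this could run in a unit test for each plugin. It does not cover the checklist items that need human judgment, such as the clarity of error messages.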