EdgeStream
Message Plugins Overview
Message plugins are domain-specific packages that define the message types, TypeScript interfaces, validation rules, and default hook behavior for a topic namespace. A plugin "owns" a prefix like workflow or agent and provides everything subscribers need to work with those messages.
Plugin vs Hook
| Hook | Message Plugin | |
|---|---|---|
| What it is | Pipeline stage that transforms/filters one message | A package that bundles message types + validation + hooks for a topic domain |
| Granularity | Single responsibility (decrypt, validate, normalize...) | Domain-level (all workflow messages, all agent messages) |
| What it provides | execute(context) | TypeScript interfaces, topic namespace, validator, default hooks |
| Examples | NormalizationHook, TenantFilterHook | WorkflowEventPlugin, AgentMessagePlugin |
Plugin Categories
| Plugin | Topic Namespace | Powers |
|---|---|---|
WorkflowEventPlugin | workflow.* | Flow Studio Observer Panel — execution events, node completions, failures |
AgentMessagePlugin | agent.* | Octopus chat — streaming tokens, tool calls, turn completions |
UiActionPlugin | ui.* | UI interactions — button clicks, form submits, navigation events |
SystemEventPlugin | system.* | Platform events — health checks, deployments, config changes |
What a Plugin Provides
interface IMessagePlugin {
/** Topic namespace this plugin owns */
readonly topicNamespace: string; // e.g., 'workflow'
/** All message type names this plugin handles */
readonly messageTypes: readonly string[]; // e.g., ['workflow.started', 'workflow.nodeCompleted']
/** TypeScript interfaces for each message type (documentation only) */
readonly messageSchemas: Record<string, unknown>;
/** Validate an envelope — returns true if this plugin should handle it */
canHandle(envelope: IEnvelope): boolean;
/** Validate message body against the expected schema */
validate(envelope: IEnvelope): ValidationResult;
/** Default hooks to register for this plugin's topic namespace */
createDefaultHooks(): IHook[];
}
Using a Plugin
import { WorkflowEventPlugin } from '@edge-stream/workflow-plugin';
const plugin = new WorkflowEventPlugin();
const server = stream.server('bas')!;
// Register the plugin's default hooks into the pipeline
plugin.createDefaultHooks().forEach(hook => {
server.incomingPipeline.addHook(hook);
});
// Subscribe using the plugin's known topic namespace
stream.subscribe('bas', 'workflow.*', (envelope) => {
if (!plugin.canHandle(envelope)) return;
const validation = plugin.validate(envelope);
if (!validation.valid) {
console.warn('Invalid workflow message:', validation.errors);
return;
}
// envelope.body is now typed as WorkflowEvent
console.log('Workflow event:', envelope.body);
});