Processing Stage
The processing stage is where domain-specific message transformation happens — content mapping, format conversion, business rule application, and interactive user input collection.
Processing Stage Position
Processing hooks run after the pre-pipeline stage (120–199) and before post-pipeline hooks (300+), so they occupy priorities 200–299. This is where the business logic of message handling lives: hooks transform the normalized envelope into the form that subscribers expect.
| Priority Range | Stage | Typical Operations |
|---|---|---|
| 5 | Observability | HookActivityLogger |
| 90–110 | Security + Normalization | Decrypt, Verify, Normalize |
| 120–199 | Pre-pipeline | Enrichment, Rate limiting, Dedup |
| 200–299 | Processing | Transformation, Business logic, HIL |
| 300+ | Post-pipeline | Audit, Metrics, Side effects |
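The ranges in the table can be expressed as a small lookup. The sketch below is illustrative and not part of the framework: `stageFor`, `orderHooks`, and the `'unassigned'` label for priorities outside the listed ranges are hypothetical names. It assumes hooks run in ascending priority order, which the stage ordering above implies.

```typescript
// Hypothetical helper types; the stage names mirror the table above.
type Stage =
  | 'observability'
  | 'security-normalization'
  | 'pre-pipeline'
  | 'processing'
  | 'post-pipeline'
  | 'unassigned';

interface PrioritizedHook {
  name: string;
  priority: number;
}

// Map a hook priority to the stage whose range contains it.
function stageFor(priority: number): Stage {
  if (priority === 5) return 'observability';
  if (priority >= 90 && priority <= 110) return 'security-normalization';
  if (priority >= 120 && priority <= 199) return 'pre-pipeline';
  if (priority >= 200 && priority <= 299) return 'processing';
  if (priority >= 300) return 'post-pipeline';
  return 'unassigned'; // gaps between the documented ranges
}

// Assumption: a lower priority number runs earlier.
function orderHooks<T extends PrioritizedHook>(hooks: T[]): T[] {
  return [...hooks].sort((a, b) => a.priority - b.priority);
}

const ordered = orderHooks([
  { name: 'HilFormHook', priority: 250 },
  { name: 'WorkflowEventMapperHook', priority: 200 },
  { name: 'HookActivityLogger', priority: 5 },
  { name: 'ContentRoutingHook', priority: 210 },
]);
```

Under that assumption, `ordered` lists the logger first, followed by the three processing hooks in the order 200, 210, 250.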
Message Transformation Hook
Processing hooks can reshape the message body to match what subscribers need. The transformed body replaces both context.body and context.envelope.body, so later hooks and subscribers see the same shape:
```typescript
export class WorkflowEventMapperHook extends BaseHook {
  readonly name = 'WorkflowEventMapperHook';
  readonly priority = 200;

  async execute(context: IPipelineContext): Promise<HookResult> {
    const raw = context.body as any;

    // Map from CloudEvents body to domain-specific shape
    if (raw?.data?.data?.eventType?.startsWith('workflow.')) {
      const mapped: WorkflowDomainEvent = {
        nodeId: raw.data.data.nodeId,
        workflowId: raw.data.metadata?.workflowId || context.envelope.meta.correlationId,
        eventType: raw.data.data.eventType,
        status: raw.data.data.status,
        output: raw.data.data.output,
        duration: raw.data.data.durationMs,
        timestamp: new Date(raw.time || raw.data.data.timestamp),
      };
      context.body = mapped;
      context.envelope.body = mapped;
    }

    return { continue: true };
  }
}
```
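The mapping itself can be pulled out into a pure function, which avoids the `as any` cast and is easy to unit-test. This is a sketch under stated assumptions: the `WorkflowDomainEvent` and `RawWorkflowBody` shapes below are inferred from the field paths the hook reads and are not the real type definitions, `mapWorkflowEvent` is a hypothetical name, and the fallback to the current time is an added assumption.

```typescript
// Assumed shape of the domain event; the real WorkflowDomainEvent is not shown here.
interface WorkflowDomainEvent {
  nodeId?: string;
  workflowId: string;
  eventType: string;
  status?: string;
  output?: unknown;
  duration?: number;
  timestamp: Date;
}

// Assumed shape of the incoming body, inferred from the paths the hook reads.
interface RawWorkflowBody {
  time?: string;
  data?: {
    metadata?: { workflowId?: string };
    data?: {
      eventType?: string;
      nodeId?: string;
      status?: string;
      output?: unknown;
      durationMs?: number;
      timestamp?: string;
    };
  };
}

// Returns the mapped event, or undefined when the body is not a workflow event.
function mapWorkflowEvent(
  raw: RawWorkflowBody,
  fallbackWorkflowId: string,
): WorkflowDomainEvent | undefined {
  const inner = raw.data?.data;
  const eventType = inner?.eventType;
  if (!inner || typeof eventType !== 'string' || !eventType.startsWith('workflow.')) {
    return undefined;
  }
  return {
    nodeId: inner.nodeId,
    workflowId: raw.data?.metadata?.workflowId || fallbackWorkflowId,
    eventType,
    status: inner.status,
    output: inner.output,
    duration: inner.durationMs,
    // Assumption: fall back to the current time when neither timestamp is present.
    timestamp: new Date(raw.time || inner.timestamp || Date.now()),
  };
}
```

A hook could then call `mapWorkflowEvent(context.body, context.envelope.meta.correlationId)` and assign the result only when it is defined.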
Interactive Processing — HIL Pause
The processing stage is where Human-in-the-Loop (HIL) interruption happens. A hook can pause the pipeline and wait for user input before continuing. When a pipeline is paused, the envelope is held in memory and the pipeline resumes asynchronously when the user responds.
```typescript
import { randomUUID } from 'node:crypto'; // Node.js; browsers expose crypto.randomUUID() globally

export class HilFormHook extends BaseHook {
  readonly name = 'HilFormHook';
  readonly priority = 250;
  canPause = true;

  async execute(context: IPipelineContext): Promise<HookResult> {
    const body = context.body as any;

    // Only intercept HIL form requests
    if (body?.eventType !== 'hil.form.request') {
      return { continue: true };
    }

    // Pause the pipeline and wait for the user's response
    context.pause({
      id: randomUUID(),
      type: 'form',
      message: body.prompt || 'Please complete this form to continue',
      payload: { formSchema: body.formSchema },
      timeoutMs: body.timeoutMs || 300_000, // 5 minutes default
    });

    // execute() returns here with the pipeline marked as paused.
    // When resume() is called, the pipeline continues with the next hook.
    return { continue: true, paused: true };
  }
}
```
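One way to picture "held in memory, resumed asynchronously" is a map of pending promises keyed by pause id. The following is a minimal sketch of that idea and not the framework's actual pause implementation: `PauseRegistry`, `PauseRequest`, and both method signatures are assumptions made for illustration.

```typescript
// Hypothetical request shape, modelled on the fields passed to context.pause().
interface PauseRequest {
  id: string;
  type: string;
  message: string;
  payload?: unknown;
  timeoutMs: number;
}

interface PendingPause {
  resolve: (response: unknown) => void;
  timer: ReturnType<typeof setTimeout>;
}

class PauseRegistry {
  private pending = new Map<string, PendingPause>();

  // Returns a promise that settles when resume() is called or the timeout fires.
  pause(request: PauseRequest): Promise<unknown> {
    return new Promise<unknown>((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(request.id);
        reject(new Error(`Pause ${request.id} timed out after ${request.timeoutMs} ms`));
      }, request.timeoutMs);
      this.pending.set(request.id, { resolve, timer });
    });
  }

  // Delivers the user's response; returns false if the id is unknown or already settled.
  resume(id: string, response: unknown): boolean {
    const entry = this.pending.get(id);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(id);
    entry.resolve(response);
    return true;
  }

  get size(): number {
    return this.pending.size;
  }
}
```

Because the pending state lives only in process memory in this sketch, a restart would lose every paused pipeline. A durable store would be needed if pauses must survive restarts.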
Content-Type Routing Hook
```typescript
// Tags each envelope with a routing decision based on its topic
export class ContentRoutingHook extends BaseHook {
  readonly name = 'ContentRoutingHook';
  readonly priority = 210;

  async execute(context: IPipelineContext): Promise<HookResult> {
    const topic = context.envelope.meta.topic || '';

    // Record the routing decision as attributes so subscribers can filter on it
    if (topic.startsWith('agent.chat')) {
      context.envelope.attributes = {
        ...context.envelope.attributes,
        domain: 'agent',
        requiresStreaming: true,
      };
    } else if (topic.startsWith('workflow.')) {
      context.envelope.attributes = {
        ...context.envelope.attributes,
        domain: 'workflow',
        requiresStreaming: false,
      };
    }

    return { continue: true };
  }
}
```
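On the consuming side, a subscriber might use the attributes set by the routing hook to decide whether to handle an envelope. The sketch below assumes a simple exact-match filter; `RoutedEnvelope`, `AttributeFilter`, and `matchesAttributes` are hypothetical names and not part of the documented API.

```typescript
// Hypothetical minimal envelope shape with the fields the routing hook touches.
interface RoutedEnvelope {
  meta: { topic?: string };
  attributes?: Record<string, unknown>;
}

type AttributeFilter = Record<string, unknown>;

// True when every key in the filter is present on the envelope with an identical value.
function matchesAttributes(envelope: RoutedEnvelope, filter: AttributeFilter): boolean {
  const attrs = envelope.attributes ?? {};
  return Object.entries(filter).every(([key, expected]) => attrs[key] === expected);
}
```

For example, a streaming subscriber could accept only envelopes where `matchesAttributes(envelope, { domain: 'agent', requiresStreaming: true })` is true. Envelopes whose topic matched neither branch carry no `domain` attribute and are rejected by any filter that names one.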
Async Processing with External Services
Processing hooks can perform async work — database lookups, API calls, cache reads:
```typescript
export class FormSchemaResolverHook extends BaseHook {
  readonly name = 'FormSchemaResolverHook';
  readonly priority = 220;

  constructor(private formService: FormService) {
    super();
  }

  async execute(context: IPipelineContext): Promise<HookResult> {
    const body = context.body as any;
    if (!body?.formId) return { continue: true };

    // Async lookup: the hook awaits resolution before the pipeline continues
    const schema = await this.formService.getSchema(body.formId);
    if (!schema) {
      context.abort(`Form schema not found: ${body.formId}`);
      return { continue: false };
    }

    // Attach schema to metadata for downstream hooks
    context.metadata.formSchema = schema;
    return { continue: true };
  }
}
```
Every execute() method is async, and the pipeline awaits each hook before proceeding to the next. A long-running operation in one hook therefore delays delivery of that message to subscribers, so keep external calls short or bound them with a timeout.
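Since each hook blocks the ones after it, it can help to cap how long a hook waits on an external call. The helper below is a generic sketch built on `Promise.race`; `withTimeout` and `HookTimeoutError` are hypothetical names, not framework APIs.

```typescript
// Hypothetical error type so callers can tell a timeout from other failures.
class HookTimeoutError extends Error {
  constructor(label: string, ms: number) {
    super(`${label} exceeded ${ms} ms`);
    this.name = 'HookTimeoutError';
  }
}

// Resolves with the work's result, or rejects if it takes longer than `ms`.
async function withTimeout<T>(work: Promise<T>, ms: number, label: string): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new HookTimeoutError(label, ms)), ms);
  });
  try {
    return await Promise.race([work, timeout]);
  } finally {
    // Always clear the timer so it does not keep the process alive.
    clearTimeout(timer);
  }
}
```

Inside a hook this might look like `await withTimeout(this.formService.getSchema(body.formId), 2_000, 'FormSchemaResolverHook')`, where the 2-second budget is an arbitrary example value. The helper stops waiting but does not cancel the underlying operation, which keeps running unless the service supports cancellation.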