Processing Stage Position

Processing hooks occupy priorities 200–299: they run after the pre-pipeline stage (120–199) and before post-pipeline hooks (300+). This is where the business logic of message handling lives: each hook transforms the normalized envelope into the form subscribers expect.

| Priority Range | Stage | Typical Operations |
|---|---|---|
| 5 | Observability | HookActivityLogger |
| 90–110 | Security + Normalization | Decrypt, Verify, Normalize |
| 120–199 | Pre-pipeline | Enrichment, Rate limiting, Dedup |
| 200–299 | Processing | Transformation, Business logic, HIL |
| 300+ | Post-pipeline | Audit, Metrics, Side effects |
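
As a rough illustration of the ranges above, the sketch below classifies a priority into its stage and orders the four hooks from this page by priority. It assumes that lower priority numbers run first, which the table ordering suggests but does not state outright; `stageForPriority`, `executionOrder`, and `HookLike` are hypothetical names, not part of the pipeline API.

```typescript
// Hypothetical helper: map a hook priority onto the stage names from the table.
type Stage =
  | 'observability'
  | 'security-normalization'
  | 'pre-pipeline'
  | 'processing'
  | 'post-pipeline'
  | 'unassigned';

function stageForPriority(priority: number): Stage {
  if (priority === 5) return 'observability';
  if (priority >= 90 && priority <= 110) return 'security-normalization';
  if (priority >= 120 && priority <= 199) return 'pre-pipeline';
  if (priority >= 200 && priority <= 299) return 'processing';
  if (priority >= 300) return 'post-pipeline';
  return 'unassigned'; // gaps such as 6 to 89 are not described in the table
}

// Minimal stand-in for a hook: only the fields needed for ordering.
interface HookLike {
  name: string;
  priority: number;
}

// Assumption: lower priority numbers execute first.
function executionOrder(hooks: HookLike[]): string[] {
  return [...hooks].sort((a, b) => a.priority - b.priority).map((h) => h.name);
}

const hooks: HookLike[] = [
  { name: 'HilFormHook', priority: 250 },
  { name: 'WorkflowEventMapperHook', priority: 200 },
  { name: 'FormSchemaResolverHook', priority: 220 },
  { name: 'ContentRoutingHook', priority: 210 },
];

console.log(executionOrder(hooks));
console.log(hooks.map((h) => `${h.name}: ${stageForPriority(h.priority)}`));
```

Under that assumption, the mapper (200) runs before routing (210) and schema resolution (220), and the HIL hook (250) runs last, so it sees the already-transformed body.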

Message Transformation Hook

Processing hooks can reshape the message body to match what subscribers need. In the example below, the transformed body replaces context.body and is mirrored onto context.envelope.body so the two stay in sync:

export class WorkflowEventMapperHook extends BaseHook {
  readonly name = 'WorkflowEventMapperHook';
  readonly priority = 200;

  async execute(context: IPipelineContext): Promise<HookResult> {
    const raw = context.body as any;

    // Map from CloudEvents body to domain-specific shape
    if (raw?.data?.data?.eventType?.startsWith('workflow.')) {
      const mapped: WorkflowDomainEvent = {
        nodeId: raw.data.data.nodeId,
        workflowId: raw.data.metadata?.workflowId || context.envelope.meta.correlationId,
        eventType: raw.data.data.eventType,
        status: raw.data.data.status,
        output: raw.data.data.output,
        duration: raw.data.data.durationMs,
        timestamp: new Date(raw.time || raw.data.data.timestamp),
      };

      context.body = mapped;
      context.envelope.body = mapped;
    }

    return { continue: true };
  }
}
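
To make the nested `data.data` shape easier to see, here is the same mapping pulled out into a pure function and applied to a sample payload. This is a sketch only: the field types in `WorkflowDomainEvent`, the helper name `mapWorkflowEvent`, and the sample event are assumptions made for illustration.

```typescript
// Assumed shape of the domain event; field types are illustrative guesses.
interface WorkflowDomainEvent {
  nodeId: string;
  workflowId: string;
  eventType: string;
  status: string;
  output: unknown;
  duration: number;
  timestamp: Date;
}

// Pure version of the mapping, usable without a pipeline context.
function mapWorkflowEvent(raw: any, correlationId: string): WorkflowDomainEvent | undefined {
  const inner = raw?.data?.data;
  if (typeof inner?.eventType !== 'string' || !inner.eventType.startsWith('workflow.')) {
    return undefined; // not a workflow event: the caller leaves the body untouched
  }
  return {
    nodeId: inner.nodeId,
    // Fall back to the correlation id when the event carries no workflowId
    workflowId: raw.data.metadata?.workflowId || correlationId,
    eventType: inner.eventType,
    status: inner.status,
    output: inner.output,
    duration: inner.durationMs,
    timestamp: new Date(raw.time || inner.timestamp),
  };
}

// Hypothetical CloudEvents-style input with the nested data.data payload.
const sample = {
  type: 'com.example.workflow',
  time: '2024-01-01T00:00:00Z',
  data: {
    metadata: {},
    data: {
      eventType: 'workflow.node.completed',
      nodeId: 'node-7',
      status: 'success',
      output: { rows: 3 },
      durationMs: 125,
    },
  },
};

const mapped = mapWorkflowEvent(sample, 'corr-123');
console.log(mapped?.workflowId, mapped?.duration);
```

Keeping the mapping pure like this makes it straightforward to unit test separately from the hook, which then only has to assign the result to the context.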

Interactive Processing — HIL Pause

The processing stage is where Human-in-the-Loop (HIL) interruption happens. A hook can pause the pipeline and wait for user input before continuing. When a pipeline is paused, the envelope is held in memory and the pipeline resumes asynchronously when the user responds.

export class HilFormHook extends BaseHook {
  readonly name = 'HilFormHook';
  readonly priority = 250;
  canPause = true;

  async execute(context: IPipelineContext): Promise<HookResult> {
    const body = context.body as any;

    // Only intercept HIL form requests
    if (body?.eventType !== 'hil.form.request') {
      return { continue: true };
    }

    // Pause the pipeline and wait for the user's response
    context.pause({
      id: crypto.randomUUID(),
      type: 'form',
      message: body.prompt || 'Please complete this form to continue',
      payload: { formSchema: body.formSchema },
      timeoutMs: body.timeoutMs || 300_000, // default: 5 minutes
    });

    // The pipeline is now paused and execute() returns immediately.
    // When resume() is called, the pipeline continues with the next hook.
    return { continue: true, paused: true };
  }
}
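
The page does not show how pausing and resuming work internally. One plausible way to hold a paused request in memory and continue asynchronously is a registry of pending promises with a timeout, sketched below. `PauseRegistry`, `PauseRequest`, and `isPaused` are hypothetical names, and this is not the pipeline's actual implementation.

```typescript
// Assumed request shape, modeled on the arguments passed to context.pause() above.
interface PauseRequest {
  id: string;
  type: string;
  message: string;
  payload?: unknown;
  timeoutMs: number;
}

interface PendingPause {
  resolve: (response: unknown) => void;
  timer: ReturnType<typeof setTimeout>;
}

// Hypothetical in-memory registry of paused requests.
class PauseRegistry {
  private pending = new Map<string, PendingPause>();

  // Returns a promise that settles when resume() is called or the timeout elapses.
  pause(request: PauseRequest): Promise<unknown> {
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(request.id);
        reject(new Error(`Pause ${request.id} timed out after ${request.timeoutMs} ms`));
      }, request.timeoutMs);
      this.pending.set(request.id, { resolve, timer });
    });
  }

  // Delivers the user's response; returns false if nothing is waiting under that id.
  resume(id: string, response: unknown): boolean {
    const entry = this.pending.get(id);
    if (!entry) return false;
    clearTimeout(entry.timer);
    this.pending.delete(id);
    entry.resolve(response);
    return true;
  }

  isPaused(id: string): boolean {
    return this.pending.has(id);
  }
}

const registry = new PauseRegistry();
const waiting = registry.pause({
  id: 'form-1',
  type: 'form',
  message: 'Please complete this form to continue',
  timeoutMs: 300_000,
});
waiting.then((response) => console.log('resumed with', response));

// Later, when the user submits the form:
registry.resume('form-1', { approved: true });
```

Because the pending entry lives only in process memory in this sketch, a restart would lose paused requests; whether the real pipeline persists them is not stated here.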

Content-Type Routing Hook

// Routes messages to topic-specific processing paths
export class ContentRoutingHook extends BaseHook {
  readonly name = 'ContentRoutingHook';
  readonly priority = 210;

  async execute(context: IPipelineContext): Promise<HookResult> {
    const topic = context.envelope.meta.topic || '';

    // Tag envelope with routing decision — subscribers can filter
    if (topic.startsWith('agent.chat')) {
      context.envelope.attributes = {
        ...context.envelope.attributes,
        domain: 'agent',
        requiresStreaming: true,
      };
    } else if (topic.startsWith('workflow.')) {
      context.envelope.attributes = {
        ...context.envelope.attributes,
        domain: 'workflow',
        requiresStreaming: false,
      };
    }

    return { continue: true };
  }
}
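
On the consuming side, a subscriber could use the `domain` attribute set above to ignore envelopes it does not care about. The following is a minimal sketch; `RoutedEnvelope`, `EnvelopeHandler`, and `forDomain` are hypothetical names rather than part of the subscriber API.

```typescript
// Assumed envelope shape, reduced to the fields the routing hook touches.
interface RoutedEnvelope {
  meta: { topic?: string };
  attributes?: { domain?: string; requiresStreaming?: boolean };
  body: unknown;
}

type EnvelopeHandler = (envelope: RoutedEnvelope) => void;

// Wraps a handler so it only fires for envelopes tagged with the given domain.
function forDomain(domain: string, handler: EnvelopeHandler): EnvelopeHandler {
  return (envelope) => {
    if (envelope.attributes?.domain === domain) {
      handler(envelope);
    }
  };
}

const seen: string[] = [];
const workflowOnly = forDomain('workflow', (envelope) => {
  seen.push(envelope.meta.topic ?? '');
});

// The agent envelope is skipped; the workflow envelope is handled.
workflowOnly({
  meta: { topic: 'agent.chat.message' },
  attributes: { domain: 'agent', requiresStreaming: true },
  body: {},
});
workflowOnly({
  meta: { topic: 'workflow.node.completed' },
  attributes: { domain: 'workflow', requiresStreaming: false },
  body: {},
});

console.log(seen);
```

Tagging in the hook and filtering in the subscriber keeps the routing decision in one place, so subscribers do not need to repeat the topic-prefix checks.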

Async Processing with External Services

Processing hooks can perform async work — database lookups, API calls, cache reads:

export class FormSchemaResolverHook extends BaseHook {
  readonly name = 'FormSchemaResolverHook';
  readonly priority = 220;

  constructor(private formService: FormService) { super(); }

  async execute(context: IPipelineContext): Promise<HookResult> {
    const body = context.body as any;

    if (!body?.formId) return { continue: true };

    // Async lookup — hook awaits resolution before pipeline continues
    const schema = await this.formService.getSchema(body.formId);
    if (!schema) {
      context.abort(`Form schema not found: ${body.formId}`);
      return { continue: false };
    }

    // Attach schema to metadata for downstream hooks
    context.metadata.formSchema = schema;
    return { continue: true };
  }
}

Processing Hooks Are Async

All hook execute() methods are async. The pipeline awaits each hook before proceeding to the next. Long-running async operations in hooks will delay message delivery to subscribers.
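
Since each hook is awaited in turn, one way to keep a slow hook from stalling delivery indefinitely is to give every hook a time budget. The sketch below shows a sequential runner with such a guard. `withTimeout`, `runHooks`, and `SimpleHook` are hypothetical names, and lower-priority-first ordering is an assumption; the real pipeline runner may behave differently.

```typescript
interface SimpleHookResult {
  continue: boolean;
}

// Minimal stand-in for a hook; the context is reduced to a plain record.
interface SimpleHook {
  name: string;
  priority: number;
  execute(context: Record<string, unknown>): Promise<SimpleHookResult>;
}

// Rejects if the work does not settle within ms milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number, label: string): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`${label} exceeded ${ms} ms`)), ms);
    work.then(
      (value) => {
        clearTimeout(timer);
        resolve(value);
      },
      (error) => {
        clearTimeout(timer);
        reject(error);
      },
    );
  });
}

// Runs hooks one at a time in priority order and stops when a hook returns continue: false.
async function runHooks(
  hooks: SimpleHook[],
  context: Record<string, unknown>,
  budgetMs: number,
): Promise<string[]> {
  const ran: string[] = [];
  const ordered = [...hooks].sort((a, b) => a.priority - b.priority);
  for (const hook of ordered) {
    const result = await withTimeout(hook.execute(context), budgetMs, hook.name);
    ran.push(hook.name);
    if (!result.continue) break;
  }
  return ran;
}

runHooks(
  [
    { name: 'resolver', priority: 220, execute: async () => ({ continue: true }) },
    { name: 'mapper', priority: 200, execute: async () => ({ continue: true }) },
  ],
  {},
  1_000,
).then((ran) => console.log('executed:', ran));
```

Note that the timeout only stops the runner from waiting; it does not cancel the underlying work, which would need its own cancellation mechanism such as an AbortSignal.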