Portal Community

Interface Definition

namespace BizFirst.Ai.ProcessEngine.EventBus;

/// <summary>
/// In-process bus that fans workflow events out to their registered handlers.
/// </summary>
public interface IWorkflowEventBus
{
    /// <summary>
    /// Publish an event to all registered handlers for <typeparamref name="TEvent"/>.
    /// Handlers are invoked in registration order, and all of them have executed
    /// before the returned task completes.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the failure policy is described as configurable. The
    /// "Publish / Subscribe Semantics" section states that, by default, handler
    /// exceptions are caught and logged while the remaining handlers still run;
    /// presumably an AggregateException is thrown only when the bus is configured
    /// to surface handler failures — confirm against the implementation.
    /// </remarks>
    /// <typeparam name="TEvent">Event type being published; must derive from WorkflowEvent.</typeparam>
    /// <param name="event">The event instance to deliver.</param>
    /// <param name="ct">Cancellation token forwarded to every handler.</param>
    Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default)
        where TEvent : WorkflowEvent;

    /// <summary>
    /// Programmatic subscription — for use in tests or dynamic registration.
    /// Prefer DI-based <see cref="IWorkflowEventHandler{TEvent}"/> for production code.
    /// </summary>
    /// <typeparam name="TEvent">Event type the delegate consumes; must derive from WorkflowEvent.</typeparam>
    /// <param name="handler">Delegate that receives the event and a cancellation token.</param>
    void Subscribe<TEvent>(Func<TEvent, CancellationToken, Task> handler)
        where TEvent : WorkflowEvent;
}

IWorkflowEventHandler<TEvent>

/// <summary>
/// Implement to consume a specific workflow event type.
/// Register as <see cref="IWorkflowEventHandler{TEvent}"/> in DI — the bus
/// discovers all handlers at publish time.
/// </summary>
/// <typeparam name="TEvent">The workflow event type this handler consumes; must derive from WorkflowEvent.</typeparam>
public interface IWorkflowEventHandler<TEvent> where TEvent : WorkflowEvent
{
    /// <summary>Handles one published event of type <typeparamref name="TEvent"/>.</summary>
    /// <param name="event">The event being delivered.</param>
    /// <param name="ct">Cancellation token passed along with the event.</param>
    Task HandleAsync(TEvent @event, CancellationToken ct);
}

Registration in DI

// Register the bus implementation (done once in the engine setup)
builder.Services.AddSingleton<IWorkflowEventBus, WorkflowEventDispatcher>();

// Register your handler against the closed handler interface for the event it consumes
builder.Services.AddTransient<
    IWorkflowEventHandler<WorkflowCompleted>,
    WebhookDeliveryHandler>();

// Or use the extension method (if provided by the SDK).
// NOTE(review): this is shown as an alternative to the explicit AddTransient call above —
// presumably use one or the other, since applying both would likely register the
// handler twice; confirm against the SDK's AddWorkflowEventHandler implementation.
builder.Services.AddWorkflowEventHandler<WorkflowCompleted, WebhookDeliveryHandler>();

Publish / Subscribe Semantics

| Aspect | Behaviour |
| --- | --- |
| Delivery model | Synchronous in-process fan-out — all handlers execute before `PublishAsync` returns |
| Handler order | DI registration order |
| Handler failure | By default, handler exceptions are caught and logged; other handlers still run |
| Cancellation | `CancellationToken` is forwarded to every handler |
| Thread safety | The bus is Singleton — handlers must be reentrant |

Injecting IWorkflowEventBus into an Executor

/// <summary>
/// Node executor that takes an order approval decision from its input and
/// announces it on the workflow event bus before reporting the node result.
/// </summary>
public class OrderApprovalExecutor : BaseNodeExecutor<ApprovalInput, ApprovalOutput>
{
    private readonly IWorkflowEventBus _bus;

    /// <summary>Creates the executor with the bus used to publish decision events.</summary>
    /// <param name="eventBus">The workflow event bus injected into this executor.</param>
    public OrderApprovalExecutor(IWorkflowEventBus eventBus) => _bus = eventBus;

    /// <summary>
    /// Publishes an <c>OrderDecisionEvent</c> describing the incoming decision, then
    /// returns a successful result whose output carries that same decision.
    /// </summary>
    /// <param name="ctx">Execution context supplying the identifiers and the node input.</param>
    /// <param name="ct">Cancellation token handed on to the event bus.</param>
    /// <returns>A success result wrapping an <c>ApprovalOutput</c>.</returns>
    protected override async Task<NodeExecutionResult> ExecuteInternalAsync(
        NodeExecutionContext ctx, CancellationToken ct)
    {
        // Expected values: approved / rejected
        var outcome = ctx.Input.Decision;

        var decisionEvent = new OrderDecisionEvent
        {
            ExecutionId = ctx.ExecutionId,
            ProcessId   = ctx.ProcessId,
            TenantId    = ctx.TenantId,
            OrderId     = ctx.Input.OrderId,
            Decision    = outcome
        };

        // Raise the domain event first so downstream systems can react to it.
        await _bus.PublishAsync(decisionEvent, ct);

        var output = new ApprovalOutput { Decision = outcome };
        return NodeExecutionResult.Success(output);
    }
}