Flow Studio
IWorkflowEventBus
The core interface for publishing and subscribing to workflow-level events. Inject it into node executors to publish custom domain events; register IWorkflowEventHandler implementations to consume them.
Interface Definition
namespace BizFirst.Ai.ProcessEngine.EventBus;
public interface IWorkflowEventBus
{
/// <summary>
/// Publish an event to all registered handlers for TEvent.
/// Handlers are called synchronously in registration order.
/// Throws AggregateException if any handler fails (configurable).
/// </summary>
Task PublishAsync<TEvent>(TEvent @event, CancellationToken ct = default)
where TEvent : WorkflowEvent;
/// <summary>
/// Programmatic subscription — for use in tests or dynamic registration.
/// Prefer DI-based IWorkflowEventHandler<TEvent> for production code.
/// </summary>
void Subscribe<TEvent>(Func<TEvent, CancellationToken, Task> handler)
where TEvent : WorkflowEvent;
}
IWorkflowEventHandler<TEvent>
/// <summary>
/// Implement to consume a specific workflow event type.
/// Register as IWorkflowEventHandler<TEvent> in DI — the bus
/// discovers all handlers at publish time.
/// </summary>
public interface IWorkflowEventHandler<TEvent> where TEvent : WorkflowEvent
{
Task HandleAsync(TEvent @event, CancellationToken ct);
}
Registration in DI
// Register the bus implementation (done once in the engine setup)
builder.Services.AddSingleton<IWorkflowEventBus, WorkflowEventDispatcher>();
// Register your handler
builder.Services.AddTransient<
IWorkflowEventHandler<WorkflowCompleted>,
WebhookDeliveryHandler>();
// Or use the extension method (if provided by the SDK)
builder.Services.AddWorkflowEventHandler<WorkflowCompleted, WebhookDeliveryHandler>();
Publish / Subscribe Semantics
| Aspect | Behaviour |
|---|---|
| Delivery model | Synchronous in-process fan-out — all handlers execute before PublishAsync returns |
| Handler order | DI registration order |
| Handler failure | By default, handler exceptions are caught and logged; other handlers still run |
| Cancellation | CancellationToken is forwarded to every handler |
| Thread safety | The bus is Singleton — handlers must be reentrant |
Injecting IWorkflowEventBus into an Executor
public class OrderApprovalExecutor : BaseNodeExecutor<ApprovalInput, ApprovalOutput>
{
private readonly IWorkflowEventBus _eventBus;
public OrderApprovalExecutor(IWorkflowEventBus eventBus)
{
_eventBus = eventBus;
}
protected override async Task<NodeExecutionResult> ExecuteInternalAsync(
NodeExecutionContext ctx, CancellationToken ct)
{
var decision = ctx.Input.Decision; // approved / rejected
// Publish domain event so downstream systems can react
await _eventBus.PublishAsync(new OrderDecisionEvent
{
ExecutionId = ctx.ExecutionId,
ProcessId = ctx.ProcessId,
TenantId = ctx.TenantId,
OrderId = ctx.Input.OrderId,
Decision = decision
}, ct);
return NodeExecutionResult.Success(new ApprovalOutput { Decision = decision });
}
}