Flow Studio
INodeEventSubscriber
The full interface contract — four async methods, one for each stage of the node execution lifecycle. Implement all four; leave no-op bodies for events you don't need.
Interface Definition
The interface lives in ProcessEngine/Events/INodeEventSubscriber.cs. All four methods are required — the interface has no default implementations. For events you don't care about, return Task.CompletedTask.
namespace BizFirst.Ai.ProcessEngine.Events;

/// <summary>
/// Observer for node-level lifecycle events. Implementations are discovered
/// via DI and called by NodeEventDispatcher during every node execution.
/// Context passed to methods is a snapshot; mutations have no effect.
/// </summary>
public interface INodeEventSubscriber
{
    /// <summary>Called before the executor runs. Throwing cancels execution.</summary>
    Task OnBeforeExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct);

    /// <summary>Called after a successful result is returned.</summary>
    Task OnAfterExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct);

    /// <summary>Called when the executor throws. Exception is available on args.</summary>
    Task OnErrorAsync(NodeExecutionEventArgs args, CancellationToken ct);

    /// <summary>Called when the engine skips this node (conditional routing).</summary>
    Task OnSkippedAsync(NodeExecutionEventArgs args, CancellationToken ct);
}
Method Signatures Explained
| Method | When Called | Key Args Available | Can Cancel? |
|---|---|---|---|
| OnBeforeExecuteAsync | After the engine resolves the executor, before ExecuteInternalAsync | Context snapshot, input data | Yes (throw) |
| OnAfterExecuteAsync | After the executor returns a Success result | Context snapshot, NodeExecutionResult | No |
| OnErrorAsync | When the executor throws any exception | Context snapshot, Exception, retry count | No |
| OnSkippedAsync | When the router decides to skip this node | Context snapshot, SkipReason enum | No |
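The "Can Cancel?" column can be illustrated with a small guard subscriber that throws from OnBeforeExecuteAsync. This is a sketch under stated assumptions, not engine code: BlockedNodeTypeGuard is a hypothetical name, the two types at the top are stand-ins so the sample compiles on its own, and NodeId and NodeType are assumed to be strings (the real NodeExecutionEventArgs may type them differently).

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for the engine types so this sketch compiles on its own.
// Assumption: NodeId and NodeType are strings; the real class has more members.
public sealed class NodeExecutionEventArgs
{
    public string NodeId { get; init; } = "";
    public string NodeType { get; init; } = "";
}

public interface INodeEventSubscriber
{
    Task OnBeforeExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct);
    Task OnAfterExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct);
    Task OnErrorAsync(NodeExecutionEventArgs args, CancellationToken ct);
    Task OnSkippedAsync(NodeExecutionEventArgs args, CancellationToken ct);
}

// Hypothetical guard: refuses to run node types that appear on a deny list.
public sealed class BlockedNodeTypeGuard : INodeEventSubscriber
{
    private readonly HashSet<string> _blocked;

    public BlockedNodeTypeGuard(IEnumerable<string> blockedNodeTypes)
        => _blocked = new HashSet<string>(blockedNodeTypes, StringComparer.OrdinalIgnoreCase);

    public Task OnBeforeExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct)
    {
        // Per the table above, OnBeforeExecuteAsync is the only hook where
        // throwing stops the node from running.
        if (_blocked.Contains(args.NodeType))
        {
            throw new InvalidOperationException(
                $"Node {args.NodeId} of type '{args.NodeType}' is blocked by policy.");
        }
        return Task.CompletedTask;
    }

    // The remaining hooks cannot cancel anything, so they are no-ops here.
    public Task OnAfterExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => Task.CompletedTask;

    public Task OnErrorAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => Task.CompletedTask;

    public Task OnSkippedAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => Task.CompletedTask;
}
```

Because a throw in this hook is treated as a node failure, a guard like this should throw only for conditions that genuinely warrant stopping the node.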
Minimal Implementation
A subscriber that only cares about errors — all other methods are no-ops:
public class ErrorAlertSubscriber : INodeEventSubscriber
{
    private readonly IAlertService _alerts;

    public ErrorAlertSubscriber(IAlertService alerts) => _alerts = alerts;

    public Task OnBeforeExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => Task.CompletedTask;

    public Task OnAfterExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => Task.CompletedTask;

    public async Task OnErrorAsync(NodeExecutionEventArgs args, CancellationToken ct)
    {
        await _alerts.SendAsync(new Alert
        {
            Title = $"Node {args.NodeId} failed",
            Message = args.Exception!.Message,
            Severity = AlertSeverity.High
        }, ct);
    }

    public Task OnSkippedAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => Task.CompletedTask;
}
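Since the interface has no default implementations, the no-op bodies repeat in every subscriber. One way to cut that repetition is a small abstract base class with virtual no-op methods. The sketch below assumes such a helper is acceptable in your codebase: NodeEventSubscriberBase and ErrorCountingSubscriber are hypothetical names that the engine does not ship, and the two types at the top are stand-ins so the sample compiles on its own.

```csharp
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for the engine types so this sketch compiles on its own.
public sealed class NodeExecutionEventArgs
{
    public string NodeId { get; init; } = ""; // assumption: string identifier
}

public interface INodeEventSubscriber
{
    Task OnBeforeExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct);
    Task OnAfterExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct);
    Task OnErrorAsync(NodeExecutionEventArgs args, CancellationToken ct);
    Task OnSkippedAsync(NodeExecutionEventArgs args, CancellationToken ct);
}

// Hypothetical helper: every hook is a virtual no-op, so a subclass
// overrides only the events it cares about.
public abstract class NodeEventSubscriberBase : INodeEventSubscriber
{
    public virtual Task OnBeforeExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => Task.CompletedTask;

    public virtual Task OnAfterExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => Task.CompletedTask;

    public virtual Task OnErrorAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => Task.CompletedTask;

    public virtual Task OnSkippedAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => Task.CompletedTask;
}

// Example subclass: counts failures and ignores every other event.
public sealed class ErrorCountingSubscriber : NodeEventSubscriberBase
{
    private int _errors;

    public int Errors => Volatile.Read(ref _errors);

    public override Task OnErrorAsync(NodeExecutionEventArgs args, CancellationToken ct)
    {
        // Interlocked keeps the counter correct if nodes fail concurrently.
        Interlocked.Increment(ref _errors);
        return Task.CompletedTask;
    }
}
```

The subclass still satisfies INodeEventSubscriber, so it can be registered like any other subscriber.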
Full Audit Subscriber Example
A subscriber that writes a row to an audit table at every lifecycle stage:
public class AuditNodeSubscriber : INodeEventSubscriber
{
    private readonly IAuditRepository _audit;

    public AuditNodeSubscriber(IAuditRepository audit) => _audit = audit;

    public async Task OnBeforeExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct)
    {
        await _audit.WriteAsync(new NodeAuditEntry
        {
            ExecutionId = args.ExecutionId,
            NodeId = args.NodeId,
            NodeType = args.NodeType,
            Event = AuditEvent.Started,
            Timestamp = DateTimeOffset.UtcNow
        }, ct);
    }

    public async Task OnAfterExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct)
    {
        await _audit.WriteAsync(new NodeAuditEntry
        {
            ExecutionId = args.ExecutionId,
            NodeId = args.NodeId,
            Event = AuditEvent.Completed,
            DurationMs = args.DurationMs,
            Timestamp = DateTimeOffset.UtcNow
        }, ct);
    }

    public async Task OnErrorAsync(NodeExecutionEventArgs args, CancellationToken ct)
    {
        await _audit.WriteAsync(new NodeAuditEntry
        {
            ExecutionId = args.ExecutionId,
            NodeId = args.NodeId,
            Event = AuditEvent.Failed,
            ErrorMessage = args.Exception!.Message,
            RetryCount = args.RetryCount,
            Timestamp = DateTimeOffset.UtcNow
        }, ct);
    }

    public async Task OnSkippedAsync(NodeExecutionEventArgs args, CancellationToken ct)
    {
        await _audit.WriteAsync(new NodeAuditEntry
        {
            ExecutionId = args.ExecutionId,
            NodeId = args.NodeId,
            Event = AuditEvent.Skipped,
            SkipReason = args.SkipReason.ToString(),
            Timestamp = DateTimeOffset.UtcNow
        }, ct);
    }
}
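Subscribers are discovered via DI, so each implementation has to be registered against the interface. The fragment below is a sketch that assumes the host uses Microsoft.Extensions.DependencyInjection and that a scoped lifetime suits your subscribers; neither is confirmed by this page. AddNodeSubscribers is a hypothetical extension method, and the subscribers' own dependencies (IAuditRepository, IAlertService) must be registered separately.

```csharp
using Microsoft.Extensions.DependencyInjection;

public static class NodeSubscriberRegistration
{
    // Hypothetical helper; call it from your composition root.
    public static IServiceCollection AddNodeSubscribers(this IServiceCollection services)
    {
        // Registrations against the same interface accumulate. With this
        // container, resolving IEnumerable<INodeEventSubscriber> yields the
        // implementations in the order they were registered.
        services.AddScoped<INodeEventSubscriber, AuditNodeSubscriber>();
        services.AddScoped<INodeEventSubscriber, ErrorAlertSubscriber>();
        return services;
    }
}
```

Registering the audit subscriber first means it writes its row before the alert subscriber runs, provided the engine calls subscribers in registration order.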
Dispatcher Behaviour
The NodeEventDispatcher (internal engine class) handles the orchestration:
- Resolves all INodeEventSubscriber instances from the DI scope.
- Calls them in registration order.
- If a subscriber's method throws, the exception is caught, logged at Warning level, and the next subscriber is still called.
- The dispatcher never lets subscriber exceptions propagate to the execution engine — a broken subscriber does not fail the workflow.
Exception in OnBeforeExecuteAsync: The rule above applies to all events except OnBeforeExecuteAsync. If a subscriber throws there, the engine treats it as a node failure. This is intentional but dangerous, so design OnBefore subscribers to be extremely robust.
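The two behaviours can be pictured with a simplified dispatcher. This is an illustrative sketch only: the real NodeEventDispatcher is internal and its code is not shown on this page. SketchDispatcher, ProbeSubscriber, and the logWarning callback are hypothetical names, the engine types are replaced by stand-ins, and stopping at the first OnBefore failure is an assumption.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Stand-ins for the engine types so this sketch compiles on its own.
public sealed class NodeExecutionEventArgs
{
    public string NodeId { get; init; } = ""; // assumption: string identifier
}

public interface INodeEventSubscriber
{
    Task OnBeforeExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct);
    Task OnAfterExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct);
    Task OnErrorAsync(NodeExecutionEventArgs args, CancellationToken ct);
    Task OnSkippedAsync(NodeExecutionEventArgs args, CancellationToken ct);
}

// Hypothetical dispatcher that mirrors the rules described above.
public sealed class SketchDispatcher
{
    private readonly IReadOnlyList<INodeEventSubscriber> _subscribers;
    private readonly Action<string> _logWarning;

    public SketchDispatcher(IReadOnlyList<INodeEventSubscriber> subscribers, Action<string> logWarning)
    {
        _subscribers = subscribers;
        _logWarning = logWarning;
    }

    // OnBefore: no try/catch, so a throwing subscriber surfaces to the caller,
    // which would then treat it as a node failure.
    public async Task DispatchBeforeAsync(NodeExecutionEventArgs args, CancellationToken ct)
    {
        foreach (var subscriber in _subscribers)
            await subscriber.OnBeforeExecuteAsync(args, ct);
    }

    // All other events: failures are logged and the loop continues.
    public Task DispatchAfterAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => DispatchIsolatedAsync(s => s.OnAfterExecuteAsync(args, ct));

    public Task DispatchErrorAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => DispatchIsolatedAsync(s => s.OnErrorAsync(args, ct));

    public Task DispatchSkippedAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => DispatchIsolatedAsync(s => s.OnSkippedAsync(args, ct));

    private async Task DispatchIsolatedAsync(Func<INodeEventSubscriber, Task> invoke)
    {
        foreach (var subscriber in _subscribers)
        {
            try
            {
                await invoke(subscriber);
            }
            catch (Exception ex)
            {
                // Swallow and log so one broken subscriber cannot fail the workflow.
                _logWarning($"{subscriber.GetType().Name} threw: {ex.Message}");
            }
        }
    }
}

// Test double: counts calls and optionally fails every hook.
public sealed class ProbeSubscriber : INodeEventSubscriber
{
    private readonly bool _throws;

    public ProbeSubscriber(bool throws) => _throws = throws;

    public int Calls { get; private set; }

    private Task Hit()
    {
        Calls++;
        return _throws
            ? Task.FromException(new InvalidOperationException("probe failure"))
            : Task.CompletedTask;
    }

    public Task OnBeforeExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct) => Hit();
    public Task OnAfterExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct) => Hit();
    public Task OnErrorAsync(NodeExecutionEventArgs args, CancellationToken ct) => Hit();
    public Task OnSkippedAsync(NodeExecutionEventArgs args, CancellationToken ct) => Hit();
}
```

With a failing ProbeSubscriber registered ahead of a healthy one, DispatchAfterAsync still reaches the healthy subscriber and logs one warning, while DispatchBeforeAsync throws before the healthy subscriber is called. An OnBefore subscriber that does non-essential work, such as auditing, may therefore want its own try/catch so that only deliberate vetoes escape.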