Portal Community

Interface Definition

The interface lives in ProcessEngine/Events/INodeEventSubscriber.cs. All four methods are required — the interface has no default implementations. For events you don't care about, return Task.CompletedTask.

namespace BizFirst.Ai.ProcessEngine.Events;

/// <summary>
/// Observer for node-level lifecycle events. Implementations are discovered
/// via DI and called by NodeEventDispatcher during every node execution.
/// Context passed to methods is a snapshot — mutations have no effect.
/// </summary>
public interface INodeEventSubscriber
{
    /// <summary>Called before the executor runs. Throwing cancels execution.</summary>
    Task OnBeforeExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct);

    /// <summary>Called after a successful result is returned.</summary>
    Task OnAfterExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct);

    /// <summary>Called when the executor throws. Exception is available on args.</summary>
    Task OnErrorAsync(NodeExecutionEventArgs args, CancellationToken ct);

    /// <summary>Called when the engine skips this node (conditional routing).</summary>
    Task OnSkippedAsync(NodeExecutionEventArgs args, CancellationToken ct);
}

Method Signatures Explained

MethodWhen CalledKey Args AvailableCan Cancel?
OnBeforeExecuteAsyncAfter engine resolves the executor, before ExecuteInternalAsyncContext snapshot, input dataYes — throw
OnAfterExecuteAsyncAfter executor returns a Success resultContext snapshot, NodeExecutionResultNo
OnErrorAsyncWhen executor throws any exceptionContext snapshot, Exception, retry countNo
OnSkippedAsyncWhen the router decides to skip this nodeContext snapshot, SkipReason enumNo

Minimal Implementation

A subscriber that only cares about errors — all other methods are no-ops:

public class ErrorAlertSubscriber : INodeEventSubscriber
{
    private readonly IAlertService _alerts;

    public ErrorAlertSubscriber(IAlertService alerts) => _alerts = alerts;

    public Task OnBeforeExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => Task.CompletedTask;

    public Task OnAfterExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => Task.CompletedTask;

    public async Task OnErrorAsync(NodeExecutionEventArgs args, CancellationToken ct)
    {
        await _alerts.SendAsync(new Alert
        {
            Title    = $"Node {args.NodeId} failed",
            Message  = args.Exception!.Message,
            Severity = AlertSeverity.High
        }, ct);
    }

    public Task OnSkippedAsync(NodeExecutionEventArgs args, CancellationToken ct)
        => Task.CompletedTask;
}

Full Audit Subscriber Example

A subscriber that writes a row to an audit table at every lifecycle stage:

public class AuditNodeSubscriber : INodeEventSubscriber
{
    private readonly IAuditRepository _audit;

    public AuditNodeSubscriber(IAuditRepository audit) => _audit = audit;

    public async Task OnBeforeExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct)
    {
        await _audit.WriteAsync(new NodeAuditEntry
        {
            ExecutionId = args.ExecutionId,
            NodeId      = args.NodeId,
            NodeType    = args.NodeType,
            Event       = AuditEvent.Started,
            Timestamp   = DateTimeOffset.UtcNow
        }, ct);
    }

    public async Task OnAfterExecuteAsync(NodeExecutionEventArgs args, CancellationToken ct)
    {
        await _audit.WriteAsync(new NodeAuditEntry
        {
            ExecutionId  = args.ExecutionId,
            NodeId       = args.NodeId,
            Event        = AuditEvent.Completed,
            DurationMs   = args.DurationMs,
            Timestamp    = DateTimeOffset.UtcNow
        }, ct);
    }

    public async Task OnErrorAsync(NodeExecutionEventArgs args, CancellationToken ct)
    {
        await _audit.WriteAsync(new NodeAuditEntry
        {
            ExecutionId = args.ExecutionId,
            NodeId      = args.NodeId,
            Event       = AuditEvent.Failed,
            ErrorMessage= args.Exception!.Message,
            RetryCount  = args.RetryCount,
            Timestamp   = DateTimeOffset.UtcNow
        }, ct);
    }

    public async Task OnSkippedAsync(NodeExecutionEventArgs args, CancellationToken ct)
    {
        await _audit.WriteAsync(new NodeAuditEntry
        {
            ExecutionId = args.ExecutionId,
            NodeId      = args.NodeId,
            Event       = AuditEvent.Skipped,
            SkipReason  = args.SkipReason.ToString(),
            Timestamp   = DateTimeOffset.UtcNow
        }, ct);
    }
}

Dispatcher Behaviour

The NodeEventDispatcher (internal engine class) handles the orchestration:

Exception in OnBeforeExecuteAsync: The rule above applies to all events except OnBeforeExecuteAsync. If a subscriber throws there, the engine treats it as a node failure. This is intentional but dangerous — design OnBefore subscribers to be extremely robust.