Portal Community

Example 1: Local Embedding Server (AI Inference)

Loads a local ONNX embedding model once at startup and serves embedding requests from all workflow executions concurrently.

/// <summary>
/// Server node that owns one ONNX <see cref="InferenceSession"/> and answers
/// text-embedding requests, admitting at most <c>MaxConcurrency</c> requests at a time.
/// </summary>
public class LocalEmbeddingServerNode : BaseServerNode
{
    private readonly LocalEmbeddingOptions _options;
    private InferenceSession? _session;           // ONNX runtime session; null until loaded and again after disposal
    private readonly SemaphoreSlim _concurrencyGate;

    public override string NodeId      => "local-embedding";
    public override string DisplayName => "Local Text Embedding Server";

    /// <summary>
    /// Captures the options and sizes the concurrency gate from <c>MaxConcurrency</c>.
    /// </summary>
    /// <param name="registry">Registry forwarded to the base node.</param>
    /// <param name="options">Embedding options (model path, concurrency limit, warm-up flag).</param>
    /// <param name="logger">Logger forwarded to the base node.</param>
    public LocalEmbeddingServerNode(
        IServerNodeRegistry registry,
        IOptions<LocalEmbeddingOptions> options,
        ILogger<LocalEmbeddingServerNode> logger)
        : base(registry, logger)
    {
        _options         = options.Value;
        _concurrencyGate = new SemaphoreSlim(_options.MaxConcurrency, _options.MaxConcurrency);
    }

    /// <summary>
    /// Loads the model from <c>ModelPath</c> on a thread-pool thread and, when
    /// <c>WarmupOnStart</c> is set, runs the warm-up routine.
    /// </summary>
    /// <param name="ct">Cancels scheduling of the load and is passed to the warm-up.</param>
    protected override async Task InitialiseAsync(CancellationToken ct)
    {
        // Model load — ~5–30s depending on model size.
        // The session constructor is synchronous, so it is pushed off the calling thread.
        await Task.Run(() =>
        {
            _session = new InferenceSession(_options.ModelPath);
        }, ct);

        if (_options.WarmupOnStart)
        {
            await WarmupAsync(ct);
        }
    }

    /// <summary>
    /// Embeds the string found in the payload's <c>"text"</c> property.
    /// </summary>
    /// <param name="request">Request whose JSON payload carries a string <c>"text"</c> property.</param>
    /// <param name="ct">Cancels the wait for a concurrency slot and the embedding call.</param>
    /// <returns>
    /// A successful response whose data holds <c>vector</c>, <c>dimensions</c> and <c>model</c>
    /// (the model file name without extension).
    /// </returns>
    /// <exception cref="ArgumentException">
    /// The payload has no <c>"text"</c> property, or its value is not a JSON string.
    /// </exception>
    public override async Task<ServerNodeResponse> HandleRequestAsync(
        ServerNodeRequest request, CancellationToken ct)
    {
        // TryGetProperty + kind check so that a missing, null or non-string "text" all
        // surface as the same ArgumentException rather than KeyNotFoundException /
        // InvalidOperationException from the JSON accessors.
        if (!request.Payload.RootElement.TryGetProperty("text", out var textElement)
            || textElement.ValueKind != JsonValueKind.String)
        {
            throw new ArgumentException("text is required");
        }

        var text = textElement.GetString()
                   ?? throw new ArgumentException("text is required");

        await _concurrencyGate.WaitAsync(ct);
        try
        {
            var vector = await EmbedAsync(text, ct);
            return new ServerNodeResponse
            {
                Success = true,
                Data    = JsonSerializer.SerializeToDocument(new
                {
                    vector     = vector,
                    dimensions = vector.Length,
                    model      = Path.GetFileNameWithoutExtension(_options.ModelPath)
                })
            };
        }
        finally
        {
            // Always hand the slot back, including when embedding throws or is cancelled.
            _concurrencyGate.Release();
        }
    }

    /// <summary>
    /// Reports the node as ready only while a session is loaded.
    /// </summary>
    public override ServerNodeHealth GetHealth() => new()
    {
        IsReady    = _session is not null,
        Status     = _session is not null ? "healthy" : "starting",
        CheckedAt  = DateTimeOffset.UtcNow,
        Diagnostics = new() { ["maxConcurrency"] = _options.MaxConcurrency }
    };

    /// <summary>
    /// Disposes the ONNX session and clears the field so that <see cref="GetHealth"/>
    /// no longer reports the node as ready.
    /// </summary>
    /// <param name="ct">Unused; disposal is synchronous.</param>
    protected override Task DisposeResourcesAsync(CancellationToken ct)
    {
        // Detach first, then dispose: a health check racing with shutdown must not
        // see a non-null reference to an already-disposed session.
        var session = _session;
        _session = null;
        session?.Dispose();

        // NOTE(review): _concurrencyGate is deliberately left undisposed here — a request
        // still inside HandleRequestAsync calls Release() in its finally block, which
        // throws ObjectDisposedException on a disposed SemaphoreSlim. Confirm whether the
        // base class drains in-flight requests before calling this method.
        return Task.CompletedTask;
    }
}

Example 2: WebSocket Listener

Maintains a persistent WebSocket connection to a trading feed. Workflow nodes can query the latest price snapshot without establishing a new connection per call.

/// <summary>
/// Server node that holds one WebSocket connection to a price feed and answers
/// "latest price for symbol" queries from an in-memory snapshot.
/// </summary>
public class TradingFeedServerNode : BaseServerNode
{
    // Latest price per symbol; read by request handlers.
    // NOTE(review): presumably written by ReceiveLoopAsync, which is not shown here.
    private readonly ConcurrentDictionary<string, decimal> _latestPrices = new();
    private ClientWebSocket? _ws;

    public override string NodeId      => "trading-feed";
    public override string DisplayName => "Live Trading Price Feed";

    /// <summary>
    /// Connects to the feed endpoint and starts the background receive loop.
    /// </summary>
    /// <param name="ct">Cancels the connect; the same token is handed to the receive loop.</param>
    protected override async Task InitialiseAsync(CancellationToken ct)
    {
        _ws = new ClientWebSocket();
        await _ws.ConnectAsync(new Uri("wss://feed.exchange.example.com/prices"), ct);

        // Fire-and-forget: the returned task is discarded, so a fault in the loop is
        // not observed by this method.
        _ = Task.Run(() => ReceiveLoopAsync(ct), ct);   // background receive loop
    }

    /// <summary>
    /// Looks up the most recent price for the symbol named in the payload.
    /// </summary>
    /// <param name="request">Request whose JSON payload carries a string <c>"symbol"</c> property.</param>
    /// <param name="ct">Unused; the lookup completes synchronously.</param>
    /// <returns>
    /// A successful response with <c>symbol</c> and <c>price</c>, or a failed response with
    /// an error message when no price has been recorded for the symbol.
    /// </returns>
    /// <exception cref="ArgumentException">
    /// The payload has no <c>"symbol"</c> property, or its value is not a JSON string.
    /// </exception>
    public override Task<ServerNodeResponse> HandleRequestAsync(
        ServerNodeRequest request, CancellationToken ct)
    {
        // Validate up front: a null key would otherwise make ConcurrentDictionary.TryGetValue
        // throw ArgumentNullException, and a missing property would throw KeyNotFoundException.
        if (!request.Payload.RootElement.TryGetProperty("symbol", out var symbolElement)
            || symbolElement.ValueKind != JsonValueKind.String)
        {
            throw new ArgumentException("symbol is required");
        }

        var symbol = symbolElement.GetString()
                     ?? throw new ArgumentException("symbol is required");

        if (_latestPrices.TryGetValue(symbol, out var price))
        {
            return Task.FromResult(new ServerNodeResponse
            {
                Success = true,
                Data    = JsonSerializer.SerializeToDocument(new { symbol, price })
            });
        }

        return Task.FromResult(new ServerNodeResponse
        {
            Success      = false,
            ErrorMessage = $"No price data available for symbol '{symbol}'"
        });
    }

    /// <summary>
    /// Reports the node as ready only while the WebSocket is in the <c>Open</c> state.
    /// </summary>
    public override ServerNodeHealth GetHealth() => new()
    {
        IsReady = _ws?.State == WebSocketState.Open,
        Status  = _ws?.State.ToString() ?? "not started",
        CheckedAt = DateTimeOffset.UtcNow,
        Diagnostics = new() { ["trackedSymbols"] = _latestPrices.Count }
    };

    /// <summary>
    /// Attempts a normal close handshake when the socket is open, then disposes it.
    /// </summary>
    /// <param name="ct">Cancels the close handshake.</param>
    protected override async Task DisposeResourcesAsync(CancellationToken ct)
    {
        try
        {
            if (_ws?.State == WebSocketState.Open)
            {
                await _ws.CloseAsync(WebSocketCloseStatus.NormalClosure, "shutdown", ct);
            }
        }
        finally
        {
            // Dispose even when the close handshake throws or is cancelled; otherwise the
            // socket would leak on exactly the shutdown paths where the peer is misbehaving.
            _ws?.Dispose();
        }
    }
}

Example 3: CDC Consumer

A Change Data Capture consumer that maintains a cursor on a database transaction log. Workflow nodes can request the latest batch of changed records since the last call.

/// <summary>
/// Server node that keeps a single log-sequence-number cursor and, on each request,
/// returns the changes fetched from that cursor onwards and then advances it.
/// </summary>
public class SqlCdcConsumerServerNode : BaseServerNode
{
    private long _lastLsn = 0;   // Log Sequence Number — cursor position

    // Serialises the read-fetch-persist-advance sequence. Without it two overlapping
    // requests read the same cursor, fetch the same batch, and whichever finishes last
    // overwrites the cursor — possibly moving it backwards.
    private readonly SemaphoreSlim _cursorGate = new(1, 1);

    public override string NodeId      => "sql-cdc-consumer";
    public override string DisplayName => "SQL Server CDC Consumer";

    /// <summary>
    /// Restores the cursor from the checkpoint store.
    /// </summary>
    /// <param name="ct">Passed through to the checkpoint load.</param>
    protected override async Task InitialiseAsync(CancellationToken ct)
    {
        // Load last-known LSN from persistent store
        var checkpoint = await LoadCheckpointAsync(ct);
        Interlocked.Exchange(ref _lastLsn, checkpoint);
    }

    /// <summary>
    /// Fetches the changes for the requested table starting at the current cursor,
    /// persists the new cursor, and returns the batch.
    /// </summary>
    /// <param name="request">Request whose JSON payload carries a string <c>"table"</c> property.</param>
    /// <param name="ct">Cancels the wait for the cursor gate, the fetch and the checkpoint save.</param>
    /// <returns>
    /// A successful response whose data holds <c>changes</c>, <c>fromLsn</c> (cursor before the
    /// fetch), <c>toLsn</c> (cursor after the fetch) and <c>changeCount</c>.
    /// </returns>
    /// <exception cref="ArgumentException">
    /// The payload has no <c>"table"</c> property, or its value is not a JSON string.
    /// </exception>
    public override async Task<ServerNodeResponse> HandleRequestAsync(
        ServerNodeRequest request, CancellationToken ct)
    {
        if (!request.Payload.RootElement.TryGetProperty("table", out var tableElement)
            || tableElement.ValueKind != JsonValueKind.String)
        {
            throw new ArgumentException("table is required");
        }

        var table = tableElement.GetString()
                    ?? throw new ArgumentException("table is required");

        await _cursorGate.WaitAsync(ct);
        try
        {
            // Snapshot the cursor BEFORE it is advanced so the response can report the
            // real start of the range; reading _lastLsn afterwards would yield toLsn.
            var fromLsn = Interlocked.Read(ref _lastLsn);
            var (changes, newLsn) = await FetchChangesAsync(table, fromLsn, ct);

            // Persist first, advance second: if the save throws, the in-memory cursor is
            // untouched and the next request fetches the same batch again instead of
            // silently skipping changes the caller never received.
            await SaveCheckpointAsync(newLsn, ct);
            Interlocked.Exchange(ref _lastLsn, newLsn);

            return new ServerNodeResponse
            {
                Success = true,
                Data    = JsonSerializer.SerializeToDocument(new
                {
                    changes   = changes,
                    fromLsn   = fromLsn,
                    toLsn     = newLsn,
                    changeCount = changes.Count
                })
            };
        }
        finally
        {
            _cursorGate.Release();
        }
    }

    /// <summary>
    /// Always reports healthy and exposes the current cursor as a diagnostic.
    /// </summary>
    public override ServerNodeHealth GetHealth() => new()
    {
        IsReady   = true,
        Status    = "healthy",
        CheckedAt = DateTimeOffset.UtcNow,
        // Interlocked.Read keeps the 64-bit read atomic on 32-bit runtimes.
        Diagnostics = new() { ["lastLsn"] = Interlocked.Read(ref _lastLsn) }
    };

    /// <summary>
    /// No resources are released here.
    /// </summary>
    /// <remarks>
    /// NOTE(review): the cursor gate is left undisposed because a request still inside
    /// <see cref="HandleRequestAsync"/> releases it in a finally block, and releasing a
    /// disposed SemaphoreSlim throws. Confirm whether the base class drains in-flight
    /// requests before calling this method.
    /// </remarks>
    /// <param name="ct">Unused.</param>
    protected override Task DisposeResourcesAsync(CancellationToken ct) => Task.CompletedTask;
}
Pattern summary: All three examples share the same structure — heavy initialisation in InitialiseAsync, stateful shared data in instance fields, thread-safe access via SemaphoreSlim / ConcurrentDictionary / Interlocked, and graceful cleanup in DisposeResourcesAsync. The HandleRequestAsync method is always concise: extract parameters, access shared state, return a structured response.