Flow Studio
Examples
The following three real-world server node implementations — an AI inference server (ONNX embedding), a WebSocket event listener, and a CDC consumer — illustrate the same pattern across different use cases.
Example 1: Local Embedding Server (AI Inference)
Loads a local ONNX embedding model once at startup and serves embedding requests from all workflow executions concurrently, with the number of simultaneous inferences capped at MaxConcurrency by a SemaphoreSlim.
/// <summary>
/// Server node that loads a local ONNX embedding model once and answers embedding
/// requests, limiting how many embeddings run at the same time.
/// </summary>
public class LocalEmbeddingServerNode : BaseServerNode
{
    private readonly LocalEmbeddingOptions _options;

    // ONNX runtime session; null until the model has been loaded and again after disposal.
    private InferenceSession? _session;

    // Caps the number of EmbedAsync calls in flight at once (initial and max count = MaxConcurrency).
    // Deliberately not disposed on shutdown: a request still inside HandleRequestAsync would hit
    // ObjectDisposedException when its finally block calls Release().
    private readonly SemaphoreSlim _concurrencyGate;

    public override string NodeId => "local-embedding";

    public override string DisplayName => "Local Text Embedding Server";

    /// <summary>Creates the node and sizes the concurrency gate from the configured options.</summary>
    /// <param name="registry">Registry forwarded to the base server node.</param>
    /// <param name="options">Embedding options (model path, max concurrency, warm-up flag).</param>
    /// <param name="logger">Logger forwarded to the base server node.</param>
    public LocalEmbeddingServerNode(
        IServerNodeRegistry registry,
        IOptions<LocalEmbeddingOptions> options,
        ILogger<LocalEmbeddingServerNode> logger)
        : base(registry, logger)
    {
        _options = options.Value;
        _concurrencyGate = new SemaphoreSlim(_options.MaxConcurrency, _options.MaxConcurrency);
    }

    /// <summary>Loads the ONNX model and, if configured, runs the warm-up routine.</summary>
    /// <param name="ct">Cancels the model load / warm-up.</param>
    protected override async Task InitialiseAsync(CancellationToken ct)
    {
        // Model load — ~5–30s depending on model size; pushed to the thread pool because the
        // InferenceSession constructor is synchronous.
        await Task.Run(() =>
        {
            _session = new InferenceSession(_options.ModelPath);
        }, ct);

        if (_options.WarmupOnStart)
        {
            await WarmupAsync(ct);
        }
    }

    /// <summary>Embeds the <c>text</c> property of the request payload and returns the vector.</summary>
    /// <param name="request">Request whose JSON payload must contain a string <c>text</c> property.</param>
    /// <param name="ct">Cancels waiting for a concurrency slot and the embedding itself.</param>
    /// <returns>A successful response carrying <c>vector</c>, <c>dimensions</c> and <c>model</c>.</returns>
    /// <exception cref="ArgumentException">The <c>text</c> property is JSON null.</exception>
    public override async Task<ServerNodeResponse> HandleRequestAsync(
        ServerNodeRequest request, CancellationToken ct)
    {
        var text = request.Payload.RootElement.GetProperty("text").GetString()
            ?? throw new ArgumentException("text is required");

        await _concurrencyGate.WaitAsync(ct);
        try
        {
            var vector = await EmbedAsync(text, ct);
            return new ServerNodeResponse
            {
                Success = true,
                Data = JsonSerializer.SerializeToDocument(new
                {
                    vector = vector,
                    dimensions = vector.Length,
                    model = Path.GetFileNameWithoutExtension(_options.ModelPath)
                })
            };
        }
        finally
        {
            // Always hand the slot back, including on cancellation or failure of EmbedAsync.
            _concurrencyGate.Release();
        }
    }

    /// <summary>Reports the node as ready once a session exists and until it is disposed.</summary>
    public override ServerNodeHealth GetHealth()
    {
        // Read the field once so IsReady and Status can never disagree.
        var isLoaded = _session is not null;
        return new()
        {
            IsReady = isLoaded,
            Status = isLoaded ? "healthy" : "starting",
            CheckedAt = DateTimeOffset.UtcNow,
            Diagnostics = new() { ["maxConcurrency"] = _options.MaxConcurrency }
        };
    }

    /// <summary>Releases the ONNX session. Safe to call more than once.</summary>
    /// <param name="ct">Unused; disposal is synchronous.</param>
    protected override Task DisposeResourcesAsync(CancellationToken ct)
    {
        // Swap the field to null before disposing so GetHealth stops reporting "healthy"
        // for a disposed session, and so a repeated call cannot dispose it twice.
        Interlocked.Exchange(ref _session, null)?.Dispose();
        return Task.CompletedTask;
    }
}
Example 2: WebSocket Listener
Maintains a persistent WebSocket connection to a trading feed. Workflow nodes can query the latest price snapshot without establishing a new connection per call.
/// <summary>
/// Server node that keeps one WebSocket connection to a price feed open and answers
/// "latest price for symbol" requests from an in-memory snapshot.
/// </summary>
public class TradingFeedServerNode : BaseServerNode
{
    // Latest price per symbol; read by request handlers.
    // NOTE(review): presumably written by ReceiveLoopAsync, which is not shown here — confirm.
    private readonly ConcurrentDictionary<string, decimal> _latestPrices = new();

    private ClientWebSocket? _ws;

    public override string NodeId => "trading-feed";

    public override string DisplayName => "Live Trading Price Feed";

    /// <summary>Connects to the feed and starts the background receive loop.</summary>
    /// <param name="ct">Cancels the connect; the same token is also handed to the receive loop.</param>
    protected override async Task InitialiseAsync(CancellationToken ct)
    {
        _ws = new ClientWebSocket();
        await _ws.ConnectAsync(new Uri("wss://feed.exchange.example.com/prices"), ct);

        // Background receive loop (fire-and-forget).
        // NOTE(review): the loop's lifetime is tied to the initialisation token and its task is
        // never observed, so a fault in the loop goes unnoticed — confirm this is intended.
        _ = Task.Run(() => ReceiveLoopAsync(ct), ct);
    }

    /// <summary>Returns the most recent price recorded for the requested symbol.</summary>
    /// <param name="request">Request whose JSON payload must contain a string <c>symbol</c> property.</param>
    /// <param name="ct">Unused; the lookup is synchronous.</param>
    /// <returns>
    /// A successful response with <c>symbol</c> and <c>price</c>, or a failed response with an
    /// error message when no price has been recorded for the symbol.
    /// </returns>
    /// <exception cref="ArgumentException">The <c>symbol</c> property is JSON null.</exception>
    public override Task<ServerNodeResponse> HandleRequestAsync(
        ServerNodeRequest request, CancellationToken ct)
    {
        // A null key would make ConcurrentDictionary.TryGetValue throw ArgumentNullException;
        // reject it up front with a message that names the missing field.
        var symbol = request.Payload.RootElement.GetProperty("symbol").GetString()
            ?? throw new ArgumentException("symbol is required");

        if (_latestPrices.TryGetValue(symbol, out var price))
        {
            return Task.FromResult(new ServerNodeResponse
            {
                Success = true,
                Data = JsonSerializer.SerializeToDocument(new { symbol, price })
            });
        }

        return Task.FromResult(new ServerNodeResponse
        {
            Success = false,
            ErrorMessage = $"No price data available for symbol '{symbol}'"
        });
    }

    /// <summary>Reports readiness from the WebSocket state and exposes the tracked-symbol count.</summary>
    public override ServerNodeHealth GetHealth()
    {
        // Read the state once so IsReady and Status describe the same instant.
        WebSocketState? state = _ws?.State;
        return new()
        {
            IsReady = state == WebSocketState.Open,
            Status = state?.ToString() ?? "not started",
            CheckedAt = DateTimeOffset.UtcNow,
            Diagnostics = new() { ["trackedSymbols"] = _latestPrices.Count }
        };
    }

    /// <summary>Closes the WebSocket gracefully when it is open, then disposes it.</summary>
    /// <param name="ct">Cancels the close handshake.</param>
    protected override async Task DisposeResourcesAsync(CancellationToken ct)
    {
        var socket = _ws;
        if (socket is null)
        {
            return;
        }

        try
        {
            if (socket.State == WebSocketState.Open)
            {
                await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "shutdown", ct);
            }
        }
        finally
        {
            // Dispose even when the close handshake throws or is cancelled; otherwise the
            // socket would leak.
            socket.Dispose();
        }
    }
}
Example 3: CDC Consumer
A Change Data Capture consumer that maintains a cursor on a database transaction log. Workflow nodes can request the latest batch of changed records since the last call.
/// <summary>
/// Change Data Capture consumer that keeps a cursor (log sequence number) and hands out
/// the changes recorded since that cursor on each request.
/// </summary>
public class SqlCdcConsumerServerNode : BaseServerNode
{
    // Log Sequence Number — cursor position. Accessed through Interlocked so that the
    // lock-free read in GetHealth is atomic even where 64-bit reads are not.
    private long _lastLsn = 0;

    // Serialises read-cursor -> fetch -> checkpoint -> advance-cursor. Without it, two concurrent
    // requests read the same cursor and receive the same batch, and a slow request can move
    // the cursor backwards. Not disposed on shutdown: an in-flight request would otherwise
    // fail in its finally block when calling Release().
    private readonly SemaphoreSlim _cursorGate = new(1, 1);

    public override string NodeId => "sql-cdc-consumer";

    public override string DisplayName => "SQL Server CDC Consumer";

    /// <summary>Restores the cursor from the persisted checkpoint.</summary>
    /// <param name="ct">Cancels the checkpoint load.</param>
    protected override async Task InitialiseAsync(CancellationToken ct)
    {
        // Load last-known LSN from persistent store
        var checkpoint = await LoadCheckpointAsync(ct);
        Interlocked.Exchange(ref _lastLsn, checkpoint);
    }

    /// <summary>Fetches the changes for a table since the current cursor and advances the cursor.</summary>
    /// <param name="request">Request whose JSON payload must contain a string <c>table</c> property.</param>
    /// <param name="ct">Cancels waiting for the cursor, the fetch and the checkpoint save.</param>
    /// <returns>
    /// A successful response with <c>changes</c>, <c>fromLsn</c> (cursor before the fetch),
    /// <c>toLsn</c> (cursor after the fetch) and <c>changeCount</c>.
    /// </returns>
    /// <exception cref="ArgumentException">The <c>table</c> property is JSON null.</exception>
    public override async Task<ServerNodeResponse> HandleRequestAsync(
        ServerNodeRequest request, CancellationToken ct)
    {
        var table = request.Payload.RootElement.GetProperty("table").GetString()
            ?? throw new ArgumentException("table is required");

        // NOTE(review): a single cursor is shared by every table; advancing it for one table
        // looks like it would skip changes for other tables in the same LSN range — confirm
        // whether callers only ever use one table.
        await _cursorGate.WaitAsync(ct);
        try
        {
            // Capture the starting position BEFORE the cursor moves; reading _lastLsn after the
            // update would report fromLsn == toLsn.
            var fromLsn = Interlocked.Read(ref _lastLsn);
            var (changes, newLsn) = await FetchChangesAsync(table, fromLsn, ct);

            // Persist first, then advance the in-memory cursor: if the save fails, the cursor
            // stays put and the same batch is fetched again instead of being silently skipped.
            await SaveCheckpointAsync(newLsn, ct);
            Interlocked.Exchange(ref _lastLsn, newLsn);

            return new ServerNodeResponse
            {
                Success = true,
                Data = JsonSerializer.SerializeToDocument(new
                {
                    changes = changes,
                    fromLsn = fromLsn,
                    toLsn = newLsn,
                    changeCount = changes.Count
                })
            };
        }
        finally
        {
            _cursorGate.Release();
        }
    }

    /// <summary>Always reports healthy and exposes the current cursor position.</summary>
    public override ServerNodeHealth GetHealth() => new()
    {
        IsReady = true,
        Status = "healthy",
        CheckedAt = DateTimeOffset.UtcNow,
        Diagnostics = new() { ["lastLsn"] = Interlocked.Read(ref _lastLsn) }
    };

    /// <summary>Nothing to release.</summary>
    /// <param name="ct">Unused.</param>
    protected override Task DisposeResourcesAsync(CancellationToken ct) => Task.CompletedTask;
}
Pattern summary: All three examples share the same structure — heavy initialisation in
InitialiseAsync, stateful shared data in instance fields, thread-safe access via SemaphoreSlim / ConcurrentDictionary / Interlocked, and graceful cleanup in DisposeResourcesAsync. The HandleRequestAsync method is always concise: extract parameters, access shared state, return a structured response.