Flow Studio
Reading Pinned Data in Executors
ctx.ExecutionMemory.GetPinnedDataAsync(nodeId) retrieves the pinned data stored by a previous run of the specified node. The call is a real asynchronous database read, not an in-memory lookup. The data returned is whatever was written by the most recent run that set non-null pinned data for that node.
IExecutionMemory — Pinned Data Methods
// IExecutionMemory.cs (pinned data methods only)
public interface IExecutionMemory
{
    // Read pinned data for any node in this workflow by nodeId
    Task<object?> GetPinnedDataAsync(string nodeId, CancellationToken ct = default);

    // Typed convenience overload: deserializes JSON to T
    Task<T?> GetPinnedDataAsync<T>(string nodeId, CancellationToken ct = default)
        where T : class;

    // ... ephemeral output methods are separate (GetNodeOutput, SetNodeOutput, etc.)
}
Basic Pattern — Reading Own Previous Pinned Data
The most common pattern is a node reading its own pinned data from the last run in order to build on it.
// ConversationMemoryNode.cs
public override async Task<NodeExecutionResult> ExecuteAsync(
    NodeExecutionContext ctx,
    CancellationToken ct)
{
    // Read what this node pinned in the previous run
    var history = await ctx.ExecutionMemory
        .GetPinnedDataAsync<ConversationHistory>(ctx.NodeId, ct);

    // history is null on first run (no previous pinned data)
    history ??= new ConversationHistory { Messages = new List<ChatMessage>() };

    // Add new message from this run's input
    var userMessage = ctx.ExecutionMemory.GetNodeOutput<string>("input-node");
    history.Messages.Add(new ChatMessage { Role = "user", Content = userMessage });

    var aiReply = await _llm.ChatAsync(history.Messages, ct);
    history.Messages.Add(new ChatMessage { Role = "assistant", Content = aiReply });

    // Return the reply as output, and the updated history as pinned data
    return NodeExecutionResult.Success(
        output: aiReply,
        pinnedData: history
    );
}
Reading Another Node's Pinned Data
A node can read the pinned data set by any other node in the same workflow by passing that node's ID. This enables cross-node durable state sharing.
// DownstreamAnalyticsNode.cs
public override async Task<NodeExecutionResult> ExecuteAsync(
    NodeExecutionContext ctx,
    CancellationToken ct)
{
    // Read pinned data from a different node in this workflow
    var cacheData = await ctx.ExecutionMemory
        .GetPinnedDataAsync<ApiResponseCache>("api-fetch-node", ct);

    if (cacheData is not null && cacheData.FetchedAt > DateTime.UtcNow.AddHours(-1))
    {
        // Cache is fresh: use pinned data instead of making a new API call
        ctx.Observability.Logger.LogInformation("Using cached API response from previous run");
        return NodeExecutionResult.Success(cacheData.Data);
    }

    // Cache stale or absent: fetch fresh
    var fresh = await _apiClient.FetchAsync(ct);
    return NodeExecutionResult.Success(
        output: fresh,
        pinnedData: new ApiResponseCache { Data = fresh, FetchedAt = DateTime.UtcNow }
    );
}
Untyped vs. Typed Overloads
| Method | Return Type | Notes |
|---|---|---|
| GetPinnedDataAsync(nodeId) | Task<object?> | Untyped; returns a JsonElement or null |
| GetPinnedDataAsync<T>(nodeId) | Task<T?> | Typed; deserializes JSON to T, and returns null if there is no data or deserialization fails |
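When the untyped overload is used, the caller has to inspect the returned value itself. The following is a minimal sketch, assuming the value arrives as a boxed System.Text.Json JsonElement as the table states. The PinnedDataReader helper and the ApiResponseCache shape shown here are hypothetical and exist only for illustration; they are not part of Flow Studio.

```csharp
using System;
using System.Text.Json;

// Hypothetical payload shape, for illustration only.
public sealed class ApiResponseCache
{
    public string Data { get; set; } = "";
    public DateTime FetchedAt { get; set; }
}

// Hypothetical helper: converts the untyped result of
// GetPinnedDataAsync(nodeId) into a typed object.
public static class PinnedDataReader
{
    // Returns null when there is no data, the value is not a JsonElement,
    // or the JSON cannot be converted to T.
    public static T? Convert<T>(object? raw) where T : class
    {
        if (raw is not JsonElement element)
            return null;

        if (element.ValueKind is JsonValueKind.Undefined or JsonValueKind.Null)
            return null;

        try
        {
            return element.Deserialize<T>();
        }
        catch (JsonException)
        {
            // The stored JSON does not match T; treat it as absent.
            return null;
        }
    }
}
```

In an executor this would be called as PinnedDataReader.Convert<ApiResponseCache>(await ctx.ExecutionMemory.GetPinnedDataAsync(nodeId, ct)). In most cases the typed overload is the simpler choice; the untyped one is mainly useful when the shape of the data is not known in advance.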
Null Handling
GetPinnedDataAsync returns null in three cases:
- The node has never set pinned data (first run).
- The pinned data row has expired (ExpiresAt is in the past).
- The pinned data was explicitly cleared via ClearPinnedDataAsync.
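Always null-check the return value and provide a sensible default before using it. The typed overload also returns null when deserialization fails, so a null result does not always mean that no data was stored.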
GetPinnedDataAsync reads from the database, not from memory. Unlike GetNodeOutput (which reads from an in-memory dictionary populated during this execution), GetPinnedDataAsync always makes a database query. Call it once and keep the result in a local variable if you need it multiple times within the same ExecuteAsync call.
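The read-once advice can be illustrated with a small sketch. CountingMemory below is a hypothetical stand-in for the database-backed implementation, used only to make the number of simulated queries visible; the real IExecutionMemory is not reproduced here.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the database-backed store; counts simulated queries.
public sealed class CountingMemory
{
    private readonly Dictionary<string, object> _rows = new();

    public int QueryCount { get; private set; }

    public void Seed(string nodeId, object value) => _rows[nodeId] = value;

    public Task<T?> GetPinnedDataAsync<T>(string nodeId, CancellationToken ct = default)
        where T : class
    {
        QueryCount++; // each call stands for one database round trip
        _rows.TryGetValue(nodeId, out var value);
        return Task.FromResult(value as T);
    }
}

public static class ReadOnceExample
{
    // Returns the message count plus the total character length.
    public static async Task<int> SummarizeAsync(CountingMemory memory, string nodeId)
    {
        // One read; the local variable is reused for every later access.
        var messages = await memory.GetPinnedDataAsync<List<string>>(nodeId)
                       ?? new List<string>();

        var count = messages.Count;
        var totalLength = 0;
        foreach (var message in messages)
            totalLength += message.Length;

        return count + totalLength;
    }
}
```

Both the count and the length calculation use the same local list, so the simulated store is queried once per call to SummarizeAsync, however many times the data is consulted afterwards.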
Pinned data is scoped to the processId, not the executionId. If the same process runs concurrently (multiple simultaneous executions), all of them share the same pinned data row. Writes are last-write-wins: concurrent writes are neither detected as conflicts nor merged. Design your pinned data schema to handle this if concurrent execution is expected.
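To make the concurrency caveat concrete, the sketch below simulates two executions of the same process that both read the shared row before either one writes. PinnedRowStore is a hypothetical in-memory model keyed by process and node; it only mirrors the overwrite behaviour described above and is not the real storage layer.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory model of one pinned data row per (processId, nodeId).
public sealed class PinnedRowStore
{
    private readonly Dictionary<(string ProcessId, string NodeId), List<string>> _rows = new();

    // Returns a copy, mimicking deserialization of the stored JSON.
    public List<string> Read(string processId, string nodeId) =>
        _rows.TryGetValue((processId, nodeId), out var row)
            ? new List<string>(row)
            : new List<string>();

    // Overwrites the whole row: no merge and no conflict check.
    public void Write(string processId, string nodeId, List<string> value) =>
        _rows[(processId, nodeId)] = new List<string>(value);
}

public static class LastWriteWinsDemo
{
    public static List<string> Run()
    {
        var store = new PinnedRowStore();

        // Two executions of the same process read the row before either writes.
        var seenByA = store.Read("process-1", "memory-node");
        var seenByB = store.Read("process-1", "memory-node");

        seenByA.Add("message from execution A");
        seenByB.Add("message from execution B");

        store.Write("process-1", "memory-node", seenByA);
        store.Write("process-1", "memory-node", seenByB); // replaces A's write

        return store.Read("process-1", "memory-node");
    }
}
```

After both writes, the row holds only the entry from execution B, because B's copy never contained A's addition. Pinned data that accumulates across runs, such as a history list or a counter, is the kind of payload most exposed to this.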