EdgeInteract
Callback Subscription
The callback subscription is the server-side subscription to interactions.callback.{interactionId}. The InteractionPipeline manages this subscription internally as part of PublishAndWaitAsync(). This page explains what happens under the hood.
The Callback Topic
Each interaction gets a unique callback topic: interactions.callback.{interactionId}. This topic:
- Is created transiently when the interaction is published
- Only two parties use it: the client (publishes
InteractionResponse) and the server pipeline (subscribes) - Is destroyed after the response or timeout is received
- Cannot be subscribed to by arbitrary clients — topic access control enforces this
How PublishAndWaitAsync Uses the Callback
// Simplified internal implementation of PublishAndWaitAsync
public async Task<InteractionResponse> PublishAndWaitAsync(
InteractionRequest request,
CancellationToken ct = default)
{
// 1. Assign interactionId
request = request with { InteractionId = Guid.NewGuid().ToString() };
// 2. Set up callback subscription BEFORE publishing
// (to avoid race conditions where the response arrives before we subscribe)
var tcs = new TaskCompletionSource<InteractionResponse>();
var callbackTopic = request.CallbackTopic
?? $"interactions.callback.{request.InteractionId}";
var callbackSub = await _edgeStream.SubscribeAsync(callbackTopic, msg =>
{
var response = JsonSerializer.Deserialize<InteractionResponse>(msg.Body);
if (response?.Outcome == "timeout")
{
tcs.TrySetException(new InteractionTimeoutException(request.InteractionId));
}
else
{
tcs.TrySetResult(response!);
}
});
try
{
// 3. Run pre-send hooks
await _hookRunner.RunBeforePublishAsync(request, ct);
// 4. Publish to EdgeStream
await _edgeStream.PublishAsync($"interactions.{request.TargetUserId}",
JsonSerializer.Serialize(request));
// 5. Set up timeout timer
var timeoutToken = new CancellationTokenSource(request.TimeoutMs);
// 6. Await response (or timeout)
using var combined = CancellationTokenSource.CreateLinkedTokenSource(
ct, timeoutToken.Token);
return await tcs.Task.WaitAsync(combined.Token);
}
finally
{
// 7. Always clean up the callback subscription
await callbackSub.UnsubscribeAsync();
}
}
Race Condition: Subscribe Before Publish
A critical implementation detail: the callback subscription must be set up before the request is published to EdgeStream. If the subscription is set up after publishing, there is a window where a fast client could respond before the server is subscribed — causing the response to be lost.
Always Subscribe Before Publishing
This is handled automatically by
PublishAndWaitAsync(). If you are implementing a custom interaction publisher (not recommended), ensure your callback subscription is established before publishing the request.
Custom Callback Topics
Override callbackTopic when you need the response routed to a different service or subscription:
// Route response to a shared response aggregator
var request = new InteractionRequest {
type: 'approval',
targetUserId: 'usr_abc',
payload: { ... },
timeoutMs: 86_400_000,
callbackTopic: 'approval-service/responses'
};
// Your aggregator service is subscribed to 'approval-service/responses'
// and handles all approval responses for that service