
The Callback Topic

Each interaction gets a unique callback topic: interactions.callback.{interactionId}.

How PublishAndWaitAsync Uses the Callback

// Simplified internal implementation of PublishAndWaitAsync
public async Task<InteractionResponse> PublishAndWaitAsync(
    InteractionRequest request,
    CancellationToken ct = default)
{
    // 1. Assign interactionId
    request = request with { InteractionId = Guid.NewGuid().ToString() };

    // 2. Set up callback subscription BEFORE publishing
    //    (to avoid race conditions where the response arrives before we subscribe)
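    //    RunContinuationsAsynchronously keeps the awaiting code from running
    //    inline on the subscription callback's thread.
    var tcs = new TaskCompletionSource<InteractionResponse>(
        TaskCreationOptions.RunContinuationsAsynchronously);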
    var callbackTopic = request.CallbackTopic
        ?? $"interactions.callback.{request.InteractionId}";

    var callbackSub = await _edgeStream.SubscribeAsync(callbackTopic, msg =>
    {
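        var response = JsonSerializer.Deserialize<InteractionResponse>(msg.Body);
        if (response is null)
        {
            // A body that deserializes to null is not a usable response; fail instead of returning null
            tcs.TrySetException(new InvalidOperationException(
                $"Empty callback response for interaction {request.InteractionId}."));
        }
        else if (response.Outcome == "timeout")
        {
            tcs.TrySetException(new InteractionTimeoutException(request.InteractionId));
        }
        else
        {
            tcs.TrySetResult(response);
        }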
    });

    try
    {
        // 3. Run pre-send hooks
        await _hookRunner.RunBeforePublishAsync(request, ct);

        // 4. Publish to EdgeStream
        await _edgeStream.PublishAsync($"interactions.{request.TargetUserId}",
            JsonSerializer.Serialize(request));

        // 5. Start the timeout timer (disposed when the try block exits)
        using var timeoutCts = new CancellationTokenSource(request.TimeoutMs);

        // 6. Await the response; cancels on caller cancellation or local timeout
        using var combined = CancellationTokenSource.CreateLinkedTokenSource(
            ct, timeoutCts.Token);

        return await tcs.Task.WaitAsync(combined.Token);
    }
    finally
    {
        // 7. Always clean up the callback subscription
        await callbackSub.UnsubscribeAsync();
    }
}
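Awaiting with a linked token, as in step 6, has one consequence worth noting: when the local timer fires, `WaitAsync` throws an `OperationCanceledException`, which is the same exception type a caller's own cancellation produces. The following is a minimal sketch of how a wrapper might tell the two apart, assuming .NET 6 or later (for `Task.WaitAsync`). The `CallbackWait` class, the `WaitWithTimeoutAsync` helper, and the choice of `TimeoutException` are illustrative assumptions, not part of the portal SDK.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CallbackWait
{
    // Hypothetical helper: waits for `pending`, translating a local timeout into
    // TimeoutException while letting caller cancellation surface unchanged.
    public static async Task<T> WaitWithTimeoutAsync<T>(
        Task<T> pending, int timeoutMs, CancellationToken ct = default)
    {
        using var timeoutCts = new CancellationTokenSource(timeoutMs);
        using var combined = CancellationTokenSource.CreateLinkedTokenSource(
            ct, timeoutCts.Token);
        try
        {
            return await pending.WaitAsync(combined.Token);
        }
        catch (OperationCanceledException)
            when (timeoutCts.IsCancellationRequested && !ct.IsCancellationRequested)
        {
            // Only the timer fired, so report a timeout rather than a cancellation.
            throw new TimeoutException($"No response within {timeoutMs} ms.");
        }
    }
}
```

A caller could then catch `TimeoutException` separately from `OperationCanceledException` and decide, for example, to retry only in the timeout case.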

Race Condition: Subscribe Before Publish

A critical implementation detail: the callback subscription must be set up before the request is published to EdgeStream. If the subscription is set up after publishing, there is a window in which a fast client can respond before the server is subscribed, and that response is lost.

Always Subscribe Before Publishing: This is handled automatically by PublishAndWaitAsync(). If you are implementing a custom interaction publisher (not recommended), ensure your callback subscription is established before publishing the request.
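The ordering problem can be reproduced with a small in-memory model. The sketch below assumes a broker that does not retain messages for topics with no current subscriber, which is what the lost-response behavior described above implies. `InMemoryBus` and `RaceDemo` are hypothetical stand-ins and do not reflect the real EdgeStream API.

```csharp
#nullable enable
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical stand-in for a broker without message retention:
// a publish with no current subscriber is silently dropped.
public sealed class InMemoryBus
{
    private readonly ConcurrentDictionary<string, List<Action<string>>> _handlers = new();

    public void Subscribe(string topic, Action<string> handler)
    {
        var list = _handlers.GetOrAdd(topic, _ => new List<Action<string>>());
        lock (list) { list.Add(handler); }
    }

    public void Publish(string topic, string body)
    {
        if (!_handlers.TryGetValue(topic, out var list)) return; // no subscriber: lost
        Action<string>[] snapshot;
        lock (list) { snapshot = list.ToArray(); }
        foreach (var handler in snapshot) handler(body);
    }
}

public static class RaceDemo
{
    // Returns the response body that was received, or null if it was lost.
    public static string? Run(bool subscribeFirst)
    {
        var bus = new InMemoryBus();
        const string requestTopic = "interactions.usr_abc";
        const string callbackTopic = "interactions.callback.demo";
        string? received = null;

        // A "fast client" that answers synchronously as soon as the request arrives.
        bus.Subscribe(requestTopic, _ => bus.Publish(callbackTopic, "approved"));

        if (subscribeFirst) bus.Subscribe(callbackTopic, body => received = body);
        bus.Publish(requestTopic, "request");
        if (!subscribeFirst) bus.Subscribe(callbackTopic, body => received = body); // too late

        return received;
    }
}
```

In this model, `RaceDemo.Run(subscribeFirst: true)` returns the response, while `RaceDemo.Run(subscribeFirst: false)` returns null because the reply was published before any handler existed for the callback topic.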

Custom Callback Topics

Override callbackTopic when you need the response routed to a different service or subscription:

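// Route the response to a shared response aggregator.
// Property names follow the PascalCase used by InteractionRequest above;
// the payload is elided here as in the original example.
var request = new InteractionRequest
{
    Type = "approval",
    TargetUserId = "usr_abc",
    Payload = { ... },
    TimeoutMs = 86_400_000, // 24 hours
    CallbackTopic = "approval-service/responses"
};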

// Your aggregator service is subscribed to 'approval-service/responses'
// and handles all approval responses for that service
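Because a shared topic carries responses for many interactions, the aggregator has to correlate each message with its originating request. One possible shape for that correlation is sketched below, keyed on the interaction ID. The `AggregatedResponse` record, the `ResponseAggregator` class, and the camelCase JSON field names are assumptions made for illustration; the real response schema may differ.

```csharp
#nullable enable
using System.Collections.Concurrent;
using System.Text.Json;
using System.Threading.Tasks;

// Hypothetical response shape; the real InteractionResponse may carry more fields.
public sealed record AggregatedResponse(string InteractionId, string Outcome);

public sealed class ResponseAggregator
{
    private static readonly JsonSerializerOptions Options =
        new() { PropertyNameCaseInsensitive = true };

    private readonly ConcurrentDictionary<string, TaskCompletionSource<AggregatedResponse>>
        _pending = new();

    // Call before publishing the request so the response cannot arrive unregistered.
    public Task<AggregatedResponse> Register(string interactionId)
    {
        var tcs = new TaskCompletionSource<AggregatedResponse>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _pending[interactionId] = tcs;
        return tcs.Task;
    }

    // Intended as the handler for the shared callback topic.
    // Returns false for malformed bodies or unknown interaction IDs.
    public bool HandleMessage(string jsonBody)
    {
        AggregatedResponse? response;
        try { response = JsonSerializer.Deserialize<AggregatedResponse>(jsonBody, Options); }
        catch (JsonException) { return false; }

        if (response?.InteractionId is null) return false;

        // Remove first so each pending interaction is completed at most once.
        return _pending.TryRemove(response.InteractionId, out var tcs)
            && tcs.TrySetResult(response);
    }
}
```

The same subscribe-before-publish rule applies here: `Register` should run before the request is published, otherwise a fast response would find no pending entry and be dropped.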