Portal Community

Integration Map

ProductEdgeStream RoleTopic NamespaceTransport
Flow StudioWorkflow execution events to Observer Panelworkflow.execution.*SignalR
OctopusLLM token streaming, agent eventsagent.chat.*, ancp.*SignalR
WorkDeskNotifications, HIL approvalsnotifications.*, hil.*SignalR
Atlas FormsForm interaction eventsform.*SignalR
ANCPAgent-to-agent protocol messagesancp.*SignalR / WS
EdgeInteractUI interaction events from forms/widgetsinteract.*SignalR

Flow Studio — Observer Panel

The Flow Studio Observer Panel is the most data-intensive EdgeStream consumer. It receives a continuous stream of node execution events and renders them as a live animated graph.

Topic Conventions

workflow.execution.started        // workflow began
workflow.execution.node-started   // individual node started
workflow.execution.node-completed // node finished successfully
workflow.execution.node-failed    // node threw an error
workflow.execution.paused         // HIL pause requested
workflow.execution.resumed        // HIL resume confirmed
workflow.execution.completed      // workflow done
workflow.execution.failed         // workflow-level failure

Observer Panel Subscription Setup

// In flow-studio observer store
export function setupObserverSubscriptions(workflowId: string) {
  return edgeStream.subscribe(
    'bas',
    'workflow.execution.*',
    (envelope: IEnvelope<WorkflowExecutionEvent>) => {
      if (envelope.meta.correlationId !== workflowId) return;

      const { nodeId, eventType, status, output, error } = envelope.body;
      observerStore.handleExecutionEvent(eventType, { nodeId, status, output, error });
    }
  );
}

Octopus — Agent Chat Streaming

Octopus streams LLM responses token-by-token. The backend publishes each token as a separate EdgeStream message; the chat UI accumulates them.

// Per-session token subscription
export function subscribeToAgentStream(
  conversationId: string,
  onToken: (token: string) => void,
  onComplete: () => void
) {
  return edgeStream.subscribe(
    'bas',
    `agent.chat.${conversationId}.*`,
    (envelope: IEnvelope<AgentStreamEvent>) => {
      switch (envelope.body.eventType) {
        case 'token':
          onToken(envelope.body.token);
          break;
        case 'tool-call':
          onToolCallStarted(envelope.body.toolName);
          break;
        case 'tool-result':
          onToolCallCompleted(envelope.body.result);
          break;
        case 'done':
          onComplete();
          break;
      }
    }
  );
}

WorkDesk — HIL Inbox

When a workflow pauses for human approval, WorkDesk receives an EdgeStream message on the user's personal topic. The user approves or rejects via the WorkDesk UI, and the response flows back over EdgeStream to resume the workflow.

// HIL approval request arrives at WorkDesk
edgeStream.subscribe(
  'bas',
  `hil.approval.${currentUserId}`,
  async (envelope: IEnvelope<HilApprovalRequest>) => {
    hilInboxStore.addTask({
      taskId: envelope.body.taskId,
      workflowName: envelope.body.workflowName,
      requestedBy: envelope.body.requestedBy,
      deadline: new Date(envelope.body.deadline),
      formSchema: envelope.body.formSchema,
    });
  }
);

// User submits decision
async function submitApproval(taskId: string, decision: 'approved' | 'rejected', comment: string) {
  await edgeStream.send('bas', 'hil.approval.response', {
    taskId,
    decision,
    comment,
    decidedBy: currentUserId,
    decidedAt: new Date().toISOString(),
  });
}

Backend Publisher Pattern

On the .NET backend, services publish EdgeStream messages via the IEdgeStreamPublisher service. This is the counterpart to frontend subscriptions.

// C# backend — inject IEdgeStreamPublisher
public class WorkflowExecutionService
{
    private readonly IEdgeStreamPublisher _publisher;

    public async Task OnNodeCompleted(string workflowId, string nodeId, object output)
    {
        await _publisher.PublishAsync(
            topic: "workflow.execution.node-completed",
            payload: new {
                workflowId,
                nodeId,
                eventType = "node-completed",
                status = "completed",
                output,
                timestamp = DateTime.UtcNow
            },
            tenantId: currentTenantId,
            correlationId: workflowId
        );
    }
}

Server Configuration in BizFirstGO Apps

Standard EdgeStream configuration used across BizFirstGO frontend applications:

// edge-stream-config.ts (shared across apps)
import { createEdgeStream, NormalizationHook } from 'edge-stream-js';
import { HookActivityLogger } from 'observability-hooks-js';

export function createBizFirstEdgeStream(accessToken: string) {
  const stream = createEdgeStream({
    logLevel: import.meta.env.DEV ? 'debug' : 'info',
    enableDevtools: import.meta.env.DEV,
  });

  stream.registerServer({
    id: 'bas',
    type: 'bas',
    name: 'BizFirstGO Application Server',
    url: import.meta.env.VITE_EDGE_STREAM_HUB_URL,
    transportConfig: {
      type: 'signalr',
      url: import.meta.env.VITE_EDGE_STREAM_HUB_URL,
      accessToken,
      reconnect: {
        maxAttempts: 0,
        initialDelayMs: 1000,
        maxDelayMs: 60000,
        backoffMultiplier: 2,
      },
    },
  });

  const server = stream.server('bas')!;

  if (import.meta.env.DEV) {
    server.incomingPipeline.addHook(new HookActivityLogger());
  }

  server.incomingPipeline.addHook(new NormalizationHook(true));

  return stream;
}