BizFirstGO Integration
Where EdgeStream powers Flow Studio, Octopus, WorkDesk, and other BizFirstGO products — with topic namespaces and integration patterns for each.
Integration Map
| Product | EdgeStream Role | Topic Namespace | Transport |
|---|---|---|---|
| Flow Studio | Workflow execution events to Observer Panel | workflow.execution.* | SignalR |
| Octopus | LLM token streaming, agent events | agent.chat.*, ancp.* | SignalR |
| WorkDesk | Notifications, HIL approvals | notifications.*, hil.* | SignalR |
| Atlas Forms | Form interaction events | form.* | SignalR |
| ANCP | Agent-to-agent protocol messages | ancp.* | SignalR / WS |
| EdgeInteract | UI interaction events from forms/widgets | interact.* | SignalR |
Flow Studio — Observer Panel
The Flow Studio Observer Panel is the most data-intensive EdgeStream consumer. It receives a continuous stream of node execution events and renders them as a live animated graph.
Topic Conventions
workflow.execution.started // workflow began
workflow.execution.node-started // individual node started
workflow.execution.node-completed // node finished successfully
workflow.execution.node-failed // node threw an error
workflow.execution.paused // HIL pause requested
workflow.execution.resumed // HIL resume confirmed
workflow.execution.completed // workflow done
workflow.execution.failed // workflow-level failure
Observer Panel Subscription Setup
// In flow-studio observer store
export function setupObserverSubscriptions(workflowId: string) {
return edgeStream.subscribe(
'bas',
'workflow.execution.*',
(envelope: IEnvelope<WorkflowExecutionEvent>) => {
if (envelope.meta.correlationId !== workflowId) return;
const { nodeId, eventType, status, output, error } = envelope.body;
observerStore.handleExecutionEvent(eventType, { nodeId, status, output, error });
}
);
}
Octopus — Agent Chat Streaming
Octopus streams LLM responses token-by-token. The backend publishes each token as a separate EdgeStream message; the chat UI accumulates them.
// Per-session token subscription
export function subscribeToAgentStream(
conversationId: string,
onToken: (token: string) => void,
onComplete: () => void
) {
return edgeStream.subscribe(
'bas',
`agent.chat.${conversationId}.*`,
(envelope: IEnvelope<AgentStreamEvent>) => {
switch (envelope.body.eventType) {
case 'token':
onToken(envelope.body.token);
break;
case 'tool-call':
onToolCallStarted(envelope.body.toolName);
break;
case 'tool-result':
onToolCallCompleted(envelope.body.result);
break;
case 'done':
onComplete();
break;
}
}
);
}
WorkDesk — HIL Inbox
When a workflow pauses for human approval, WorkDesk receives an EdgeStream message on the user's personal topic. The user approves or rejects via the WorkDesk UI, and the response flows back over EdgeStream to resume the workflow.
// HIL approval request arrives at WorkDesk
edgeStream.subscribe(
'bas',
`hil.approval.${currentUserId}`,
async (envelope: IEnvelope<HilApprovalRequest>) => {
hilInboxStore.addTask({
taskId: envelope.body.taskId,
workflowName: envelope.body.workflowName,
requestedBy: envelope.body.requestedBy,
deadline: new Date(envelope.body.deadline),
formSchema: envelope.body.formSchema,
});
}
);
// User submits decision
async function submitApproval(taskId: string, decision: 'approved' | 'rejected', comment: string) {
await edgeStream.send('bas', 'hil.approval.response', {
taskId,
decision,
comment,
decidedBy: currentUserId,
decidedAt: new Date().toISOString(),
});
}
Backend Publisher Pattern
On the .NET backend, services publish EdgeStream messages via the IEdgeStreamPublisher service. This is the counterpart to frontend subscriptions.
// C# backend — inject IEdgeStreamPublisher
public class WorkflowExecutionService
{
private readonly IEdgeStreamPublisher _publisher;
public async Task OnNodeCompleted(string workflowId, string nodeId, object output)
{
await _publisher.PublishAsync(
topic: "workflow.execution.node-completed",
payload: new {
workflowId,
nodeId,
eventType = "node-completed",
status = "completed",
output,
timestamp = DateTime.UtcNow
},
tenantId: currentTenantId,
correlationId: workflowId
);
}
}
Server Configuration in BizFirstGO Apps
Standard EdgeStream configuration used across BizFirstGO frontend applications:
// edge-stream-config.ts (shared across apps)
import { createEdgeStream, NormalizationHook } from 'edge-stream-js';
import { HookActivityLogger } from 'observability-hooks-js';
export function createBizFirstEdgeStream(accessToken: string) {
const stream = createEdgeStream({
logLevel: import.meta.env.DEV ? 'debug' : 'info',
enableDevtools: import.meta.env.DEV,
});
stream.registerServer({
id: 'bas',
type: 'bas',
name: 'BizFirstGO Application Server',
url: import.meta.env.VITE_EDGE_STREAM_HUB_URL,
transportConfig: {
type: 'signalr',
url: import.meta.env.VITE_EDGE_STREAM_HUB_URL,
accessToken,
reconnect: {
maxAttempts: 0,
initialDelayMs: 1000,
maxDelayMs: 60000,
backoffMultiplier: 2,
},
},
});
const server = stream.server('bas')!;
if (import.meta.env.DEV) {
server.incomingPipeline.addHook(new HookActivityLogger());
}
server.incomingPipeline.addHook(new NormalizationHook(true));
return stream;
}