
Workflow Execution Events

The Flow Studio Observer Panel uses EdgeStream to display live node execution state. Every time a workflow node starts, completes, or fails, the backend publishes an event on the workflow.execution.* topic namespace. The frontend subscribes and updates the visual graph in real time.

// Frontend: subscribe to all workflow execution events
const subscription = edgeStream.subscribe(
  'bas',
  'workflow.execution.*',
  (envelope: IEnvelope<WorkflowNodeEvent>) => {
    const { nodeId, status, output } = envelope.body;
    observerStore.updateNodeStatus(nodeId, status);
    if (output) observerStore.setNodeOutput(nodeId, output);
  }
);

// Backend (C#): publish via IEdgeStreamPublisher
await publisher.PublishAsync(
  "workflow.execution.node-completed",
  new WorkflowNodeEvent { NodeId = nodeId, Status = "completed", Output = result },
  tenantId: tenantId
);
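
The handler above delegates to an observer store. As a rough illustration of what such a store might track, here is a minimal sketch. The `ObserverStore` class, the `applyEvent` helper, and the `'started'` and `'failed'` status strings are assumptions made for this example; only `nodeId`, `status`, `output`, and the `"completed"` value appear in the samples above.

```typescript
// Hypothetical event shape: only nodeId, status, and output come from the samples above.
type NodeStatus = 'started' | 'completed' | 'failed';

interface WorkflowNodeEvent {
  nodeId: string;
  status: NodeStatus;
  output?: unknown;
}

interface NodeState {
  status: NodeStatus;
  output?: unknown;
}

// Illustrative in-memory store keyed by node ID (not the real observerStore).
class ObserverStore {
  private readonly nodes = new Map<string, NodeState>();

  updateNodeStatus(nodeId: string, status: NodeStatus): void {
    // Keep any previously recorded output when only the status changes.
    const previous = this.nodes.get(nodeId);
    this.nodes.set(nodeId, { ...previous, status });
  }

  setNodeOutput(nodeId: string, output: unknown): void {
    const previous = this.nodes.get(nodeId);
    if (previous) this.nodes.set(nodeId, { ...previous, output });
  }

  get(nodeId: string): NodeState | undefined {
    return this.nodes.get(nodeId);
  }
}

// Mirrors the subscription callback: update status first, then attach output if present.
function applyEvent(store: ObserverStore, event: WorkflowNodeEvent): void {
  store.updateNodeStatus(event.nodeId, event.status);
  // Compare against undefined so falsy outputs such as 0 or '' are still recorded.
  if (event.output !== undefined) store.setNodeOutput(event.nodeId, event.output);
}
```

Because each event carries the node ID, the store can apply events for different nodes independently and the graph only needs to re-render the node that changed.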

LLM Token Streaming (Octopus)

Octopus agent responses stream token-by-token over EdgeStream. Each token is published as a small message under the agent.chat.* topic namespace. The chat UI subscribes and appends tokens to the current message bubble as they arrive, which produces the streaming effect without polling.

// Subscribe to streaming tokens for a specific conversation
edgeStream.subscribe(
  'bas',
  `agent.chat.${conversationId}.*`,
  (envelope: IEnvelope<AgentTokenEvent>) => {
    if (envelope.body.type === 'token') {
      chatStore.appendToken(envelope.body.token);
    } else if (envelope.body.type === 'done') {
      chatStore.markComplete();
    }
  }
);
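
To make the token/done handling concrete, the following sketch models the event as a discriminated union and accumulates tokens in a small store. The exact `AgentTokenEvent` shape, the `ChatStore` class, and the choice to ignore tokens that arrive after `done` are assumptions for illustration, not the actual Octopus types.

```typescript
// Assumed event shape, inferred from the fields used in the handler above.
type AgentTokenEvent =
  | { type: 'token'; token: string }
  | { type: 'done' };

// Illustrative store for a single in-progress assistant message.
class ChatStore {
  private text = '';
  private complete = false;

  appendToken(token: string): void {
    // Assumption: tokens arriving after completion are ignored.
    if (this.complete) return;
    this.text += token;
  }

  markComplete(): void {
    this.complete = true;
  }

  snapshot(): { text: string; complete: boolean } {
    return { text: this.text, complete: this.complete };
  }
}

// The union lets the compiler narrow event.token to the 'token' branch only.
function handleTokenEvent(store: ChatStore, event: AgentTokenEvent): void {
  if (event.type === 'token') {
    store.appendToken(event.token);
  } else {
    store.markComplete();
  }
}
```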

Human-in-the-Loop Approval

HIL (Human-in-the-Loop) workflows pause execution and wait for human approval. EdgeStream carries the approval request to the WorkDesk UI and delivers the response back to the workflow engine. The pipeline's pause() feature enables the flow-control side of this pattern.

// Approval request arrives on a topic
edgeStream.subscribe('bas', 'hil.approval.request', async (envelope) => {
  const { taskId, requestedBy, deadline, formSchema } = envelope.body;
  // Show approval UI to user
  hilInboxStore.addPendingApproval({ taskId, requestedBy, deadline, formSchema });
});

// User submits a decision: send the response
// (taskId comes from the pending approval the user acted on)
await edgeStream.send('bas', 'hil.approval.response', {
  taskId,
  decision: 'approved',
  comment: 'Looks good',
  decidedBy: currentUserId,
});
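
The request and response are tied together by `taskId`. One way the inbox side could keep track of that pairing is sketched below. The `HilInbox` class, the `'rejected'` decision value, and the assumption that `deadline` is an ISO 8601 string are all hypothetical; the source only shows the field names.

```typescript
// Assumed shapes based on the fields in the samples above.
interface ApprovalRequest {
  taskId: string;
  requestedBy: string;
  deadline: string; // assumed to be an ISO 8601 timestamp
}

type Decision = 'approved' | 'rejected'; // 'rejected' is an assumption

interface ApprovalResponse {
  taskId: string;
  decision: Decision;
  comment?: string;
  decidedBy: string;
}

// Illustrative inbox that pairs each response with a pending request by taskId.
class HilInbox {
  private readonly pending = new Map<string, ApprovalRequest>();

  addPendingApproval(request: ApprovalRequest): void {
    this.pending.set(request.taskId, request);
  }

  // Builds the response payload and clears the pending entry; throws if the task is unknown.
  decide(taskId: string, decision: Decision, decidedBy: string, comment?: string): ApprovalResponse {
    if (!this.pending.delete(taskId)) {
      throw new Error(`No pending approval for task ${taskId}`);
    }
    return comment === undefined
      ? { taskId, decision, decidedBy }
      : { taskId, decision, comment, decidedBy };
  }

  // Pending requests whose deadline is earlier than the given time.
  overdue(now: Date): ApprovalRequest[] {
    return Array.from(this.pending.values()).filter(
      (request) => Date.parse(request.deadline) < now.getTime()
    );
  }
}
```

The object returned by `decide` has the same fields as the `hil.approval.response` payload shown above, so it could be passed straight to `edgeStream.send`.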

Real-Time Notifications

WorkDesk receives notifications for task assignments, mentions, and system alerts. Each client holds a single subscription on its own per-user topic namespace, so a user only receives the notifications addressed to them.

// Per-user notification subscription
edgeStream.subscribe(
  'bas',
  `notifications.user.${userId}.*`,
  (envelope: IEnvelope<Notification>) => {
    notificationStore.push({
      id: envelope.meta.id,
      type: envelope.body.notificationType,
      title: envelope.body.title,
      at: envelope.meta.receivedAt,
    });
  }
);
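
Because the user ID is interpolated directly into the topic pattern, an ID containing `.` or `*` could change which topics the pattern matches. The helper below is a small sketch of a client-side guard. The function name and the assumption that user IDs consist only of letters, digits, hyphens, and underscores are hypothetical, and a check like this would complement server-side authorization rather than replace it.

```typescript
// Hypothetical helper: builds the per-user topic pattern and rejects IDs
// that contain topic syntax such as '.' or '*'.
function userNotificationTopic(userId: string): string {
  // Assumption: valid user IDs are letters, digits, hyphens, and underscores.
  if (!/^[A-Za-z0-9_-]+$/.test(userId)) {
    throw new Error(`Invalid user ID for topic: ${JSON.stringify(userId)}`);
  }
  return `notifications.user.${userId}.*`;
}
```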

ANCP Protocol Messages

The Agent Node Communication Protocol (ANCP) uses EdgeStream as its transport. ANCP messages flow between agent nodes during workflow execution: tool calls, results, memory reads, and orchestration signals all travel on the ancp.* namespace.

// ANCP tool call message
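edgeStream.subscribe('bas', 'ancp.tool-call', async (envelope) => {
  const { nodeId, toolName, args, correlationId } = envelope.body;
  try {
    const result = await toolRegistry.execute(toolName, args);
    // Echo correlationId so the caller can match this result to its request
    await edgeStream.send('bas', 'ancp.tool-result', { correlationId, result, nodeId });
  } catch (err) {
    // Avoid an unhandled rejection; how failures are reported to the caller is not specified here
    console.error(`Tool call ${toolName} failed (correlationId ${correlationId})`, err);
  }
});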
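
The handler above is the responder side of a correlated exchange. The calling side needs some way to match each `ancp.tool-result` back to the call that caused it. The sketch below shows one common approach, a map of pending calls keyed by `correlationId`. The `ToolCallClient` class, the injected `send` function, and the counter-based ID format are assumptions for illustration, not part of ANCP as documented here.

```typescript
// Stand-in for edgeStream.send so the sketch stays self-contained.
type Send = (topic: string, body: unknown) => void;

interface ToolResult {
  correlationId: string;
  result: unknown;
  nodeId: string;
}

// Illustrative caller side: one pending promise per outstanding correlationId.
class ToolCallClient {
  private readonly pending = new Map<string, (result: unknown) => void>();
  private readonly send: Send;
  private nextId = 0;

  constructor(send: Send) {
    this.send = send;
  }

  call(nodeId: string, toolName: string, args: unknown): Promise<unknown> {
    // Hypothetical ID scheme; a real system would likely use UUIDs.
    const correlationId = `call-${++this.nextId}`;
    return new Promise<unknown>((resolve) => {
      // Register the resolver before sending so a fast reply cannot be missed.
      this.pending.set(correlationId, resolve);
      this.send('ancp.tool-call', { nodeId, toolName, args, correlationId });
    });
  }

  // Feed every 'ancp.tool-result' message here; returns false for unknown IDs.
  handleResult(message: ToolResult): boolean {
    const resolve = this.pending.get(message.correlationId);
    if (!resolve) return false;
    this.pending.delete(message.correlationId);
    resolve(message.result);
    return true;
  }

  pendingCount(): number {
    return this.pending.size;
  }
}
```

A production version would also need a timeout so that a call whose result never arrives does not stay pending forever; that is left out here to keep the sketch short.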

Form Interaction Events

Atlas Forms publishes interaction events over EdgeStream when users open, complete, or abandon a form. The backend can react as soon as an event arrives, triggering downstream workflow steps without polling.

// Subscribe to all form events for a workflow
edgeStream.subscribe(
  'bas',
  `form.${workflowId}.*`,
  (envelope: IEnvelope<FormEvent>) => {
    switch (envelope.body.eventType) {
      case 'form.opened':
        analytics.track('form_opened', envelope.body);
        break;
      case 'form.submitted':
        workflowEngine.resumeAfterForm(envelope.body.formId, envelope.body.data);
        break;
      case 'form.abandoned':
        workflowEngine.handleFormTimeout(envelope.body.formId);
        break;
    }
  }
);
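
If `FormEvent` is modeled as a discriminated union on `eventType`, the compiler can verify that the switch covers every event kind. The sketch below assumes a hypothetical `FormEvent` shape and a `routeFormEvent` function that returns an action name instead of calling the real `analytics` and `workflowEngine` services.

```typescript
// Assumed event shapes; only eventType, formId, and data appear in the sample above.
type FormEvent =
  | { eventType: 'form.opened'; formId: string }
  | { eventType: 'form.submitted'; formId: string; data: Record<string, unknown> }
  | { eventType: 'form.abandoned'; formId: string };

// Hypothetical action names standing in for the real side effects.
type FormAction = 'track' | 'resume' | 'timeout';

function routeFormEvent(event: FormEvent): FormAction {
  switch (event.eventType) {
    case 'form.opened':
      return 'track';
    case 'form.submitted':
      return 'resume';
    case 'form.abandoned':
      return 'timeout';
    default: {
      // Compile-time exhaustiveness check: adding a new eventType to the
      // union makes this assignment fail until a case is added for it.
      const unreachable: never = event;
      throw new Error(`Unhandled form event: ${JSON.stringify(unreachable)}`);
    }
  }
}
```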

Choosing EdgeStream vs. Direct HTTP

| Scenario | EdgeStream | Direct HTTP |
|---|---|---|
| Real-time UI updates | Preferred: push model, no polling | Requires polling or manual SSE |
| LLM token streaming | Preferred: per-token events, typed | Complex with HTTP streaming |
| CRUD operations | Not recommended | Preferred |
| File uploads | Not recommended | Preferred (multipart) |
| Workflow event fanout | Preferred: multiple subscribers | Would need an event bus |
| Request/response | Use only when correlation is needed | Preferred (simple) |