EdgeStream
Agent Message Plugin
The Agent Message Plugin owns the agent.* topic namespace. It carries Octopus agent messages from the LLM backend to the chat UI — including streaming tokens, tool calls, and turn completions.
Topic Namespace
agent.chat.turnStarted // New agent turn begins (user sent message)
agent.chat.token // Streaming token — one per LLM output token
agent.chat.toolCallStarted // Agent invoked a tool
agent.chat.toolCallCompleted // Tool call finished, result available
agent.chat.turnCompleted // Full agent response assembled
agent.chat.error // Agent error (timeout, API failure, etc.)
agent.status // Agent availability/health status
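The plugin owns every agent.* topic, so client code can check a topic string against this list before routing it. The sketch below assumes topics are plain dot-separated strings. `AGENT_TOPICS`, `AgentTopic`, `isAgentNamespace` and `isAgentTopic` are hypothetical helpers, not part of EdgeStream:

```typescript
// Hypothetical helper: the agent.* topics listed above, as a typed constant.
const AGENT_TOPICS = [
  'agent.chat.turnStarted',
  'agent.chat.token',
  'agent.chat.toolCallStarted',
  'agent.chat.toolCallCompleted',
  'agent.chat.turnCompleted',
  'agent.chat.error',
  'agent.status',
] as const;

type AgentTopic = (typeof AGENT_TOPICS)[number];

// True for any topic in the agent.* namespace owned by this plugin.
function isAgentNamespace(topic: string): boolean {
  return topic.startsWith('agent.');
}

// Narrows a string to one of the documented agent topics.
function isAgentTopic(topic: string): topic is AgentTopic {
  return (AGENT_TOPICS as readonly string[]).includes(topic);
}
```

The namespace check accepts any agent.* topic, while `isAgentTopic` only accepts the seven documented ones. This lets a client distinguish an unknown agent topic from a topic that belongs to another plugin.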
Message Formats
// agent.chat.token: high-frequency streaming message
interface AgentTokenMessage {
  sessionId: string;
  turnId: string;
  agentId: string;
  token: string;       // single LLM output token (e.g., "Hello", " world")
  tokenIndex: number;  // position in the stream
  isLast: boolean;     // true on final token
}

// agent.chat.turnCompleted
interface AgentTurnCompletedMessage {
  sessionId: string;
  turnId: string;
  agentId: string;
  fullText: string;    // complete assembled response
  toolCallsUsed: string[];
  inputTokens: number;
  outputTokens: number;
  durationMs: number;
}

// agent.chat.toolCallStarted
interface AgentToolCallMessage {
  sessionId: string;
  turnId: string;
  toolName: string;
  toolCallId: string;
  arguments: Record<string, unknown>;
}
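The token stream and agent.chat.turnCompleted describe the same response, so a client can rebuild the text from token messages and check it against `fullText`. This sketch relies on assumptions that the formats above do not state. It assumes `tokenIndex` starts at 0 and increases by one per token, that concatenating tokens in index order reproduces `fullText`, and that `turnId` is unique across sessions. `TokenAssembler` is a hypothetical name. The two interfaces are copied from above so the block stands alone:

```typescript
// Copied from the Message Formats above.
interface AgentTokenMessage {
  sessionId: string;
  turnId: string;
  agentId: string;
  token: string;
  tokenIndex: number;
  isLast: boolean;
}

interface AgentTurnCompletedMessage {
  sessionId: string;
  turnId: string;
  agentId: string;
  fullText: string;
  toolCallsUsed: string[];
  inputTokens: number;
  outputTokens: number;
  durationMs: number;
}

// Hypothetical helper: stores tokens per turn, keyed by tokenIndex,
// so a duplicated message overwrites itself and a missing one leaves a gap.
class TokenAssembler {
  private tokens = new Map<string, Map<number, string>>();

  add(msg: AgentTokenMessage): void {
    let turn = this.tokens.get(msg.turnId);
    if (!turn) {
      turn = new Map<number, string>();
      this.tokens.set(msg.turnId, turn);
    }
    turn.set(msg.tokenIndex, msg.token);
  }

  // Concatenates tokens in index order (assumes indices run 0..n-1).
  // Returns undefined if the turn is unknown or an index is missing.
  text(turnId: string): string | undefined {
    const turn = this.tokens.get(turnId);
    if (!turn) return undefined;
    let out = '';
    for (let i = 0; i < turn.size; i++) {
      const t = turn.get(i);
      if (t === undefined) return undefined;
      out += t;
    }
    return out;
  }

  // Compares the streamed text with the fullText from agent.chat.turnCompleted.
  matches(done: AgentTurnCompletedMessage): boolean {
    return this.text(done.turnId) === done.fullText;
  }
}
```

Keying tokens by `tokenIndex` instead of appending them makes a duplicated message harmless and a dropped one detectable. If `matches` returns false, one option is to replace the streamed text with `fullText`.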
Streaming Chat Implementation
import { useCallback, useState } from 'react';
import { useSubscription } from 'edge-stream-js-react';

// AgentTokenMessage and AgentToolCallMessage are the interfaces from Message Formats.
// Each handler receives an envelope and reads the message from envelope.body.
function ChatStreamingWindow({ sessionId }: { sessionId: string }) {
  const [streamingText, setStreamingText] = useState('');
  const [isStreaming, setIsStreaming] = useState(false);
  const [toolCalls, setToolCalls] = useState<string[]>([]);

  // agent.chat.token: append tokens for this session; hide the cursor after the last one
  const tokenHandler = useCallback((envelope: { body: AgentTokenMessage }) => {
    const { token, sessionId: sid, isLast } = envelope.body;
    if (sid !== sessionId) return;
    setStreamingText(prev => prev + token);
    if (isLast) setIsStreaming(false);
  }, [sessionId]);

  // agent.chat.turnStarted: reset the view for a new turn (only sessionId is read here)
  const turnHandler = useCallback((envelope: { body: { sessionId: string } }) => {
    if (envelope.body.sessionId !== sessionId) return;
    setIsStreaming(true);
    setStreamingText('');
    setToolCalls([]);
  }, [sessionId]);

  // agent.chat.toolCallStarted: record each tool the agent invokes
  const toolHandler = useCallback((envelope: { body: AgentToolCallMessage }) => {
    if (envelope.body.sessionId !== sessionId) return;
    setToolCalls(prev => [...prev, envelope.body.toolName]);
  }, [sessionId]);

  // Use the 'chat' server, which has WebSocket transport
  useSubscription('chat', 'agent.chat.token', tokenHandler);
  useSubscription('chat', 'agent.chat.turnStarted', turnHandler);
  useSubscription('chat', 'agent.chat.toolCallStarted', toolHandler);

  return (
    <div>
      {toolCalls.length > 0 && (
        <div className="tool-calls">
          Using tools: {toolCalls.join(', ')}
        </div>
      )}
      <div className="agent-response">
        {streamingText}
        {isStreaming && <span className="cursor">|</span>}
      </div>
    </div>
  );
}
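The component keeps its state in three useState hooks, each updated by its own handler. The same transitions can be written as a pure reducer, which is easy to unit test and could back a useReducer hook. `ChatState`, `ChatEvent` and `chatReducer` are hypothetical names. The flattened event shape (topic plus body fields) is an assumption for illustration, not the EdgeStream envelope format:

```typescript
// Hypothetical state mirroring ChatStreamingWindow's three useState hooks.
interface ChatState {
  streamingText: string;
  isStreaming: boolean;
  toolCalls: string[];
}

// Hypothetical flattened events: topic name plus the body fields the handlers read.
type ChatEvent =
  | { topic: 'agent.chat.turnStarted'; sessionId: string }
  | { topic: 'agent.chat.token'; sessionId: string; token: string; isLast: boolean }
  | { topic: 'agent.chat.toolCallStarted'; sessionId: string; toolName: string };

const initialChatState: ChatState = { streamingText: '', isStreaming: false, toolCalls: [] };

// Applies one event. Events for other sessions are ignored, as in the component.
function chatReducer(sessionId: string, state: ChatState, event: ChatEvent): ChatState {
  if (event.sessionId !== sessionId) return state;
  switch (event.topic) {
    case 'agent.chat.turnStarted':
      // New turn: clear text and tool list, show the cursor
      return { streamingText: '', isStreaming: true, toolCalls: [] };
    case 'agent.chat.token':
      // Append the token; stop streaming only on the final token
      return {
        ...state,
        streamingText: state.streamingText + event.token,
        isStreaming: event.isLast ? false : state.isStreaming,
      };
    case 'agent.chat.toolCallStarted':
      return { ...state, toolCalls: [...state.toolCalls, event.toolName] };
  }
}
```

With React's useReducer, the reducer would be wrapped as `(state, event) => chatReducer(sessionId, state, event)`, and each subscription handler would reduce to a single dispatch call.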
High-Frequency Token Handling
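Token messages arrive at LLM output speed, potentially 50-100 tokens per second. Calling setState once per token can trigger one React render per token. To avoid this, accumulate tokens outside React state and flush them at most once per animation frame:
import { useCallback, useEffect, useRef, useState } from 'react';
import { useSubscription } from 'edge-stream-js-react';

// Pattern: accumulate tokens in a ref, flush to state on requestAnimationFrame
function EfficientTokenStream({ sessionId }: { sessionId: string }) {
  const [text, setText] = useState('');
  const bufferRef = useRef('');
  const rafRef = useRef<number>(0);

  const tokenHandler = useCallback((envelope: { body: AgentTokenMessage }) => {
    if (envelope.body.sessionId !== sessionId) return;
    bufferRef.current += envelope.body.token;
    // Only schedule one RAF at a time
    if (!rafRef.current) {
      rafRef.current = requestAnimationFrame(() => {
        // Copy and clear the buffer before calling setText: React may run the
        // updater later, when bufferRef.current has already been reset to ''.
        const chunk = bufferRef.current;
        bufferRef.current = '';
        rafRef.current = 0;
        setText(prev => prev + chunk);
      });
    }
  }, [sessionId]);

  // Cancel a pending frame on unmount so setText is not called afterwards
  useEffect(() => () => cancelAnimationFrame(rafRef.current), []);

  useSubscription('chat', 'agent.chat.token', tokenHandler);
  return <pre>{text}</pre>;
}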
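The same batching idea can be separated from React so it runs and tests without a browser. In this sketch, the caller supplies the scheduler. `TokenBatcher` and `Schedule` are hypothetical names. In a browser, the scheduler would be `requestAnimationFrame` and `flush` would call `setText(prev => prev + chunk)`:

```typescript
// Hypothetical scheduler type: runs the callback once, later
// (requestAnimationFrame in a browser, a manual queue in tests).
type Schedule = (cb: () => void) => void;

// Coalesces tokens that arrive before the next scheduled flush into one chunk.
class TokenBatcher {
  private buffer = '';
  private scheduled = false;
  private readonly schedule: Schedule;
  private readonly flush: (chunk: string) => void;

  constructor(schedule: Schedule, flush: (chunk: string) => void) {
    this.schedule = schedule;
    this.flush = flush;
  }

  push(token: string): void {
    this.buffer += token;
    if (this.scheduled) return; // a flush is already pending
    this.scheduled = true;
    this.schedule(() => {
      const chunk = this.buffer; // take the buffer before clearing it
      this.buffer = '';
      this.scheduled = false;
      this.flush(chunk);
    });
  }
}
```

At 100 tokens per second on a 60 Hz display, this caps flushes at about 60 per second (one per frame) instead of 100 separate state updates. The saving is larger when tokens arrive in bursts, because a burst that lands within one frame costs a single render.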
Use the 'chat' Server
Agent messages use the 'chat' server (WebSocket transport), not the 'bas' server (SignalR). The WebSocket transport has lower overhead for high-frequency streaming. The namespaces are also isolated, so token floods do not affect workflow event delivery.
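One way to keep subscriptions from landing on the wrong server is to derive the server name from the topic. This sketch assumes 'chat' and 'bas' are the only servers in play and that every non-agent topic belongs on 'bas'. `ServerName`, `serverForTopic` and `subscriptionTarget` are hypothetical helpers:

```typescript
// Hypothetical: the two server names mentioned on this page.
type ServerName = 'chat' | 'bas';

// agent.* topics go to 'chat'; any other topic is assumed to be served by 'bas'.
function serverForTopic(topic: string): ServerName {
  return topic.startsWith('agent.') ? 'chat' : 'bas';
}

// Returns the [server, topic] pair expected as the first two subscription arguments.
function subscriptionTarget(topic: string): [ServerName, string] {
  return [serverForTopic(topic), topic];
}
```

A call such as `useSubscription(...subscriptionTarget('agent.chat.token'), tokenHandler)` would then select 'chat' automatically. This only works if useSubscription's parameter types accept a plain server name and topic string.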