diff --git a/src/tools/task/runtime.rs b/src/tools/task/runtime.rs index be78f8b..e29d9fa 100644 --- a/src/tools/task/runtime.rs +++ b/src/tools/task/runtime.rs @@ -11,6 +11,7 @@ use crate::agent::{AgentLoop, AgentRuntimeConfig, EmittedMessageHandler, Persist use crate::bus::ChatMessage; use crate::bus::message::{OutboundMessage, OutboundEventKind}; use crate::bus::MessageBus; +use crate::providers::StreamDelta; use crate::config::{LLMProviderConfig, SubagentsConfig}; use crate::storage::{ConversationRepository, SessionStore}; use crate::tools::{ToolContext, ToolRegistry}; @@ -107,6 +108,7 @@ struct SubAgentEmitter { metadata: HashMap, store: Arc, sub_session_id: String, + stream_message_id: std::sync::Mutex>, } #[async_trait] @@ -159,6 +161,41 @@ impl EmittedMessageHandler for SubAgentEmitter { self.persist_todo_write_result(&message); } } + + async fn handle_stream_delta(&self, delta: &StreamDelta) { + let message_id = { + let mut guard = self.stream_message_id.lock().unwrap(); + guard.get_or_insert_with(|| uuid::Uuid::new_v4().to_string()).clone() + }; + + let outbound = if delta.content.is_empty() && delta.reasoning_content.is_none() { + OutboundMessage::stream_end( + &self.channel_name, + &self.chat_id, + None, + &message_id, + self.metadata.clone(), + ) + } else { + OutboundMessage::stream_delta( + &self.channel_name, + &self.chat_id, + None, + &message_id, + &delta.content, + delta.reasoning_content.clone(), + self.metadata.clone(), + ) + }; + + if let Err(error) = self.bus.publish_outbound(outbound).await { + tracing::error!(error = %error, channel = %self.channel_name, "Failed to publish sub-agent stream delta"); + } + } + + async fn set_stream_message_id(&self, id: &str) { + *self.stream_message_id.lock().unwrap() = Some(id.to_string()); + } } impl SubAgentEmitter { @@ -316,6 +353,7 @@ impl DefaultSubAgentRuntime { metadata, store: self.store.clone(), sub_session_id: session.session_id.clone(), + stream_message_id: std::sync::Mutex::new(None), }, self.conversation_repository.clone(), session.session_id.clone(), diff --git a/web/src/components/Chat/MessageBubble.tsx b/web/src/components/Chat/MessageBubble.tsx index 643f75b..e8a2751 100644 --- a/web/src/components/Chat/MessageBubble.tsx +++ b/web/src/components/Chat/MessageBubble.tsx @@ -684,6 +684,7 @@ export function MessageBubble({ message, onNavigateToSubAgent, showThinking = tr )} {/* AI 和工具消息使用 Markdown 渲染 */} + {message.content.trim() && (
+ )} )} {message.attachments && message.attachments.length > 0 && (
diff --git a/web/src/hooks/useChat.ts b/web/src/hooks/useChat.ts index 4af92cf..7bc19a7 100644 --- a/web/src/hooks/useChat.ts +++ b/web/src/hooks/useChat.ts @@ -28,6 +28,7 @@ import type { Channel, ChannelList, StreamDelta, + StreamEnd, } from '../types/protocol' // 简化后的层级状态 @@ -187,6 +188,9 @@ export function useChat(): UseChatReturn { || message.type === 'tool_pending' || message.type === 'assistant_response') { return (message as ToolCall | ToolResult | ToolPending | AssistantResponse).subagent_task_id } + if (message.type === 'stream_delta' || message.type === 'stream_end') { + return (message as StreamDelta | StreamEnd).subagent_task_id + } return undefined } @@ -258,6 +262,18 @@ export function useChat(): UseChatReturn { subagentTaskId: msg.subagent_task_id, } } + case 'stream_delta': { + const msg = message as StreamDelta + return { + id: msg.id, + role: 'assistant' as const, + content: msg.delta, + timestamp: Math.floor(Date.now() / 1000), + type: 'message' as const, + subagentTaskId: msg.subagent_task_id, + reasoningContent: msg.reasoning_delta, + } + } case 'error': { return { id: generateMessageId(), @@ -272,15 +288,48 @@ export function useChat(): UseChatReturn { } } - // Append a server message to the sub-agent view + // Append a server message to the sub-agent view (with streaming delta accumulation) const appendToSubAgentViewMessage = (message: WsOutbound) => { + // stream_delta: accumulate into existing message by ID, or create new + if (message.type === 'stream_delta') { + const msg = message as StreamDelta + setSubAgentView((prev) => { + if (!prev) return prev + const existingIdx = prev.messages.findIndex(m => m.id === msg.id && m.type === 'message') + if (existingIdx >= 0) { + const updated = [...prev.messages] + const existing = updated[existingIdx] + updated[existingIdx] = { + ...existing, + content: existing.content + msg.delta, + reasoningContent: msg.reasoning_delta + ? (existing.reasoningContent || '') + msg.reasoning_delta + : existing.reasoningContent, + } + return { ...prev, messages: updated } + } + const chatMsg = serverMessageToChatMessage(message) + return chatMsg ? { ...prev, messages: [...prev.messages, chatMsg] } : prev + }) + return + } + // stream_end: no-op, assistant_response will replace + if (message.type === 'stream_end') return + // Other messages: assistant_response replaces streamed message by ID const chatMsg = serverMessageToChatMessage(message) if (chatMsg) { - setSubAgentView((prev) => - prev - ? { ...prev, messages: [...prev.messages, chatMsg] } - : prev - ) + setSubAgentView((prev) => { + if (!prev) return prev + if (message.type === 'assistant_response') { + const existingIdx = prev.messages.findIndex(m => m.id === chatMsg.id && m.type === 'message') + if (existingIdx >= 0) { + const updated = [...prev.messages] + updated[existingIdx] = chatMsg + return { ...prev, messages: updated } + } + } + return { ...prev, messages: [...prev.messages, chatMsg] } + }) } }