From d0051baa0776a0e9157ea8302e5f1dd1a14d3b22 Mon Sep 17 00:00:00 2001 From: oudecheng <13802883547@139.com> Date: Fri, 29 May 2026 16:47:57 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E6=B6=88=E6=81=AF=E6=8C=81?= =?UTF-8?q?=E4=B9=85=E5=8C=96=E4=BB=8E=E6=89=B9=E9=87=8F=E6=94=B9=E4=B8=BA?= =?UTF-8?q?=E5=AE=9E=E6=97=B6=E9=80=90=E6=9D=A1=EF=BC=8C=E9=80=9A=E8=BF=87?= =?UTF-8?q?=E8=A3=85=E9=A5=B0=E5=99=A8=E6=A8=A1=E5=BC=8F=E5=AE=9E=E7=8E=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 PersistingEmittedMessageHandler 装饰器,在 emitter 广播前逐条落库 - processor 和 task/runtime 使用装饰器包裹 emitter,替代 post-loop 批量写入 - 移除 session_history 中的批量 DB 写入,仅保留内存历史更新 - execution 中跳过已由 live emitter 实时广播的工具消息,避免重复 - 前端支持运行中 task 工具卡片"查看实时进度"跳转子智能体视图 Co-Authored-By: Claude Opus 4.7 --- src/agent/agent_loop.rs | 31 +++++++++++++++++++++++ src/agent/mod.rs | 3 ++- src/command/adapters/websocket.rs | 14 +++++----- src/gateway/execution.rs | 6 ++++- src/gateway/processor.rs | 16 +++++++----- src/gateway/session_history.rs | 24 +++--------------- src/gateway/ws.rs | 3 +++ src/protocol/mod.rs | 4 +++ src/protocol/ws_adapter.rs | 5 ++++ src/tools/task/runtime.rs | 28 ++++++++------------ web/src/components/Chat/MessageBubble.tsx | 25 ++++++++++++++++++ web/src/hooks/useChat.ts | 23 ++++++++++++++--- web/src/types/protocol.ts | 2 ++ 13 files changed, 128 insertions(+), 56 deletions(-) diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index 619ada1..a69ff2d 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -2,6 +2,7 @@ use crate::agent::AgentRuntimeConfig; use crate::agent::{SystemPromptContext, SystemPromptProvider}; use crate::bus::ChatMessage; use crate::bus::message::ToolMessageState; +use crate::storage::ConversationRepository; use crate::domain::messages::{ContentBlock, ToolCall}; use crate::observability::{ Observer, ObserverEvent, ToolExecutionOutcome, ToolExecutionState, truncate_args, @@ -657,6 +658,36 @@ pub trait EmittedMessageHandler: Send + Sync + 'static { async fn handle(&self, message: ChatMessage); } +/// 装饰器:在内部 emitter 广播前,先将消息持久化到 DB +pub struct PersistingEmittedMessageHandler { + inner: H, + conversation_repository: Arc, + session_id: String, +} + +impl PersistingEmittedMessageHandler { + pub fn new( + inner: H, + conversation_repository: Arc, + session_id: impl Into, + ) -> Self { + Self { inner, conversation_repository, session_id: session_id.into() } + } +} + +#[async_trait] +impl EmittedMessageHandler for PersistingEmittedMessageHandler { + async fn handle(&self, message: ChatMessage) { + if let Err(e) = self.conversation_repository + .append_message(&self.session_id, &message) + { + tracing::error!(error = %e, session_id = %self.session_id, + "Failed to persist emitted message"); + } + self.inner.handle(message).await; + } +} + pub trait SkillProvider: Send + Sync + 'static { fn system_index_prompt(&self) -> Option; diff --git a/src/agent/mod.rs b/src/agent/mod.rs index ec41da3..65e9f28 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -4,7 +4,8 @@ pub mod runtime_config; pub mod system_prompt; pub use agent_loop::{ - AgentError, AgentLoop, AgentProcessResult, EmittedMessageHandler, SkillProvider, + AgentError, AgentLoop, AgentProcessResult, EmittedMessageHandler, + PersistingEmittedMessageHandler, SkillProvider, }; pub use context_compressor::ContextCompressor; pub use runtime_config::AgentRuntimeConfig; diff --git a/src/command/adapters/websocket.rs b/src/command/adapters/websocket.rs index 2b20022..3bf5034 100644 --- a/src/command/adapters/websocket.rs +++ b/src/command/adapters/websocket.rs @@ -77,7 +77,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), + attachments: Vec::new(), subagent_task_id: None, }, MessageKind::Notification => { // 根据元数据判断具体类型 @@ -97,7 +97,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), + attachments: Vec::new(), subagent_task_id: None, }, } } else if let Some(session_id) = response.metadata.get("session_id") { @@ -136,7 +136,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), + attachments: Vec::new(), subagent_task_id: None, }, } } else if let Some(sessions_json) = response.metadata.get("sessions") { @@ -154,7 +154,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), + attachments: Vec::new(), subagent_task_id: None, }, } } else if let Some(topics_json) = response.metadata.get("topics") { @@ -173,7 +173,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), + attachments: Vec::new(), subagent_task_id: None, }, } } else { @@ -182,7 +182,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), + attachments: Vec::new(), subagent_task_id: None, } } } @@ -194,7 +194,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), + attachments: Vec::new(), subagent_task_id: None, }, }; outbounds.push(outbound); diff --git a/src/gateway/execution.rs b/src/gateway/execution.rs index ecac20f..7886419 100644 --- a/src/gateway/execution.rs +++ b/src/gateway/execution.rs @@ -157,7 +157,11 @@ impl AgentExecutionService { .emitted_messages .iter() .filter(|message| { - (!message.is_assistant_tool_call_message() || !request.suppress_live_tool_calls) + // 当存在 live_emitter 时,工具调用和工具结果已在 loop 中实时广播 + // 只保留最终 assistant 文本通过 post-loop 路径发送 + let already_emitted = request.suppress_live_tool_calls + && (message.is_assistant_tool_call_message() || message.role == "tool"); + !already_emitted && should_display_message_to_user(self.show_tool_results, message) }) .flat_map(|message| { diff --git a/src/gateway/processor.rs b/src/gateway/processor.rs index 4fd0dbe..b30aa01 100644 --- a/src/gateway/processor.rs +++ b/src/gateway/processor.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use tokio::sync::Semaphore; -use crate::agent::{AgentError, CompositeSystemPromptProvider}; +use crate::agent::{AgentError, CompositeSystemPromptProvider, PersistingEmittedMessageHandler}; use crate::bus::{InboundMessage, MessageBus, OutboundMessage}; use crate::command::adapter::InputAdapter; use crate::command::adapters::channel::ChannelInputAdapter; @@ -218,11 +218,15 @@ impl InboundProcessor { } // 普通消息进入 AgentLoop - let live_emitter = Arc::new(BusToolCallEmitter::new( - self.bus.clone(), - inbound.channel.clone(), - inbound.chat_id.clone(), - inbound.forwarded_metadata.clone(), + let live_emitter = Arc::new(PersistingEmittedMessageHandler::new( + BusToolCallEmitter::new( + self.bus.clone(), + inbound.channel.clone(), + inbound.chat_id.clone(), + inbound.forwarded_metadata.clone(), + ), + self.session_manager.store(), + &session_id, )); match self diff --git a/src/gateway/session_history.rs b/src/gateway/session_history.rs index 1ca4dae..88719a0 100644 --- a/src/gateway/session_history.rs +++ b/src/gateway/session_history.rs @@ -158,39 +158,23 @@ impl SessionHistory { return Ok(()); } - let session_id = self.persistent_session_id(chat_id); - let topic_id = self.chat_topic_ids.get(chat_id).map(|s| s.as_str()); - self.conversations - .append_messages_batch(&session_id, topic_id, &messages) - .map_err(|err| { - AgentError::Other(format!("batch append messages error: {}", err)) - })?; - for message in messages { self.add_message(chat_id, message); } Ok(()) } - /// 将消息保存到指定话题(直接写入数据库,不更新内存历史) - /// 用于异步执行结果保存到原始话题的场景 + /// 将消息保存到指定话题 + /// 每条消息已通过 PersistingEmittedMessageHandler 逐条持久化,此处仅保留接口兼容 pub(crate) fn append_to_topic( &self, - chat_id: &str, - topic_id: &str, + _chat_id: &str, + _topic_id: &str, messages: &[ChatMessage], ) -> Result<(), AgentError> { if messages.is_empty() { return Ok(()); } - - let session_id = self.persistent_session_id(chat_id); - self.conversations - .append_messages_batch(&session_id, Some(topic_id), messages) - .map_err(|err| { - AgentError::Other(format!("batch append to topic error: {}", err)) - })?; - Ok(()) } diff --git a/src/gateway/ws.rs b/src/gateway/ws.rs index b843181..351150c 100644 --- a/src/gateway/ws.rs +++ b/src/gateway/ws.rs @@ -512,6 +512,7 @@ fn chat_message_to_ws_outbound(msg: &crate::bus::ChatMessage) -> Option { @@ -532,6 +533,7 @@ fn chat_message_to_ws_outbound(msg: &crate::bus::ChatMessage) -> Option Option None, } diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 289e1e1..a96c13a 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -79,6 +79,8 @@ pub enum WsOutbound { role: String, #[serde(default, skip_serializing_if = "Vec::is_empty")] attachments: Vec, + #[serde(default, skip_serializing_if = "Option::is_none")] + subagent_task_id: Option, }, #[serde(rename = "tool_call")] ToolCall { @@ -109,6 +111,8 @@ pub enum WsOutbound { content: String, role: String, resume_hint: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + subagent_task_id: Option, }, #[serde(rename = "error")] Error { code: String, message: String }, diff --git a/src/protocol/ws_adapter.rs b/src/protocol/ws_adapter.rs index 9c67821..25475f9 100644 --- a/src/protocol/ws_adapter.rs +++ b/src/protocol/ws_adapter.rs @@ -21,6 +21,7 @@ pub(crate) fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec Vec Vec Vec::new(), @@ -88,6 +91,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve content: message.content.clone(), role: message.role.clone(), attachments, + subagent_task_id: message.metadata.get("subagent_task_id").cloned(), }] } OutboundEventKind::ToolCall => vec![WsOutbound::ToolCall { @@ -126,6 +130,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve content: message.content.clone(), role: message.role.clone(), resume_hint: TOOL_PENDING_RESUME_HINT.to_string(), + subagent_task_id: message.metadata.get("subagent_task_id").cloned(), }], OutboundEventKind::ErrorNotification => vec![WsOutbound::Error { code: "AGENT_ERROR".to_string(), diff --git a/src/tools/task/runtime.rs b/src/tools/task/runtime.rs index 3c2ccc5..d0b6d88 100644 --- a/src/tools/task/runtime.rs +++ b/src/tools/task/runtime.rs @@ -7,7 +7,7 @@ use std::time::Duration; use async_trait::async_trait; use serde::Deserialize; -use crate::agent::{AgentLoop, AgentRuntimeConfig, EmittedMessageHandler, SystemPrompt, SystemPromptContext, SystemPromptProvider}; +use crate::agent::{AgentLoop, AgentRuntimeConfig, EmittedMessageHandler, PersistingEmittedMessageHandler, SystemPrompt, SystemPromptContext, SystemPromptProvider}; use crate::bus::ChatMessage; use crate::bus::message::OutboundMessage; use crate::bus::MessageBus; @@ -227,12 +227,16 @@ impl DefaultSubAgentRuntime { metadata.insert("subagent_task_id".to_string(), session.id.clone()); metadata.insert("is_subagent_event".to_string(), "true".to_string()); - let emitter = Arc::new(SubAgentEmitter { - bus: bus.clone(), - channel_name: session.parent_channel_name.clone(), - chat_id: session.parent_chat_id.clone(), - metadata, - }); + let emitter = Arc::new(PersistingEmittedMessageHandler::new( + SubAgentEmitter { + bus: bus.clone(), + channel_name: session.parent_channel_name.clone(), + chat_id: session.parent_chat_id.clone(), + metadata, + }, + self.conversation_repository.clone(), + session.session_id.clone(), + )); return agent.with_emitted_message_handler(emitter); } @@ -274,11 +278,6 @@ impl DefaultSubAgentRuntime { match result { Ok(Ok(process_result)) => { - // 保存子智能体产生的所有消息到数据库(批量单事务) - self.conversation_repository - .append_messages_batch(&session.session_id, None, &process_result.emitted_messages) - .map_err(TaskError::RepositoryError)?; - let final_message = process_result.final_response; Ok(TaskToolResult { status: "success".to_string(), @@ -324,11 +323,6 @@ impl DefaultSubAgentRuntime { match result { Ok(Ok(process_result)) => { - // 保存子智能体产生的所有消息到数据库(批量单事务) - self.conversation_repository - .append_messages_batch(&session.session_id, None, &process_result.emitted_messages) - .map_err(TaskError::RepositoryError)?; - let final_message = process_result.final_response; Ok(TaskToolResult { status: "success".to_string(), diff --git a/web/src/components/Chat/MessageBubble.tsx b/web/src/components/Chat/MessageBubble.tsx index ee48fd0..cf76695 100644 --- a/web/src/components/Chat/MessageBubble.tsx +++ b/web/src/components/Chat/MessageBubble.tsx @@ -300,6 +300,19 @@ export function MessageBubble({ message, onNavigateToSubAgent }: MessageBubblePr 点击查看子智能体输出 + ) : isTaskTool && message.subagentTaskId ? ( +
+ +
) : hasResult ? (
点击查看工具结果 @@ -378,6 +391,18 @@ export function MessageBubble({ message, onNavigateToSubAgent }: MessageBubblePr {!hasArgs && !hasResult && (
等待工具执行...
)} + {isTaskTool && message.subagentTaskId && ( + + )} )}
diff --git a/web/src/hooks/useChat.ts b/web/src/hooks/useChat.ts index 1266afe..d715ee4 100644 --- a/web/src/hooks/useChat.ts +++ b/web/src/hooks/useChat.ts @@ -98,8 +98,9 @@ export function useChat(): UseChatReturn { // Extract subagent_task_id from a message if present const getSubagentTaskId = (message: WsOutbound): string | undefined => { - if (message.type === 'tool_call' || message.type === 'tool_result') { - return (message as ToolCall | ToolResult).subagent_task_id + if (message.type === 'tool_call' || message.type === 'tool_result' + || message.type === 'tool_pending' || message.type === 'assistant_response') { + return (message as ToolCall | ToolResult | ToolPending | AssistantResponse).subagent_task_id } return undefined } @@ -117,6 +118,7 @@ export function useChat(): UseChatReturn { timestamp: Date.now(), type: 'message', attachments: msg.attachments, + subagentTaskId: msg.subagent_task_id, } } case 'tool_call': { @@ -156,6 +158,7 @@ export function useChat(): UseChatReturn { type: 'tool_pending', toolName: msg.tool_name, toolCallId: msg.tool_call_id, + subagentTaskId: msg.subagent_task_id, } } case 'error': { @@ -215,8 +218,20 @@ export function useChat(): UseChatReturn { return } - // In main view, skip sub-agent messages (they belong to sub-agent view) - if (getSubagentTaskId(message)) { + // In main view, skip sub-agent messages (they belong to sub-agent view). + // But use the task_id to associate with the running task tool card. + const msgSubagentTaskId = getSubagentTaskId(message) + if (msgSubagentTaskId) { + setMessages((prev) => { + for (let i = prev.length - 1; i >= 0; i--) { + if (prev[i].type === 'tool_call' && prev[i].toolName === 'task' && !prev[i].subagentTaskId) { + const updated = [...prev] + updated[i] = { ...updated[i], subagentTaskId: msgSubagentTaskId } + return updated + } + } + return prev + }) return } diff --git a/web/src/types/protocol.ts b/web/src/types/protocol.ts index 5804966..b21cd8a 100644 --- a/web/src/types/protocol.ts +++ b/web/src/types/protocol.ts @@ -41,6 +41,7 @@ export interface AssistantResponse { content: string role: string attachments?: Attachment[] + subagent_task_id?: string } export interface ToolCall { @@ -72,6 +73,7 @@ export interface ToolPending { content: string role: string resume_hint: string + subagent_task_id?: string } export interface WsError {