From 06756a4816ac7da0b7da1faee625f8f478731679 Mon Sep 17 00:00:00 2001 From: oudecheng <13802883547@139.com> Date: Fri, 29 May 2026 18:09:00 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E6=B6=88=E6=81=AF?= =?UTF-8?q?=E6=8C=81=E4=B9=85=E5=8C=96=E7=BC=BA=E5=A4=B1=20topic=20?= =?UTF-8?q?=E5=85=B3=E8=81=94=E5=92=8C=20assistant=20=E6=96=87=E6=9C=AC?= =?UTF-8?q?=E4=B8=A2=E5=A4=B1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - PersistingEmittedMessageHandler 新增 topic_id 参数,使用 append_message_with_topic 替代 append_message - agent_loop 的所有退出路径中为最终 assistant 文本添加 emit_live_tool_call_message - 更新 finalize_result filter,live_emitter 存在时抑制所有消息的 post-loop 广播 Co-Authored-By: Claude Opus 4.7 --- src/agent/agent_loop.rs | 11 +++++++++-- src/gateway/execution.rs | 7 ++----- src/gateway/processor.rs | 1 + src/tools/task/runtime.rs | 1 + 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index a69ff2d..efb0249 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -663,6 +663,7 @@ pub struct PersistingEmittedMessageHandler { inner: H, conversation_repository: Arc, session_id: String, + topic_id: Option, } impl PersistingEmittedMessageHandler { @@ -670,8 +671,9 @@ impl PersistingEmittedMessageHandler { inner: H, conversation_repository: Arc, session_id: impl Into, + topic_id: Option, ) -> Self { - Self { inner, conversation_repository, session_id: session_id.into() } + Self { inner, conversation_repository, session_id: session_id.into(), topic_id } } } @@ -679,7 +681,7 @@ impl PersistingEmittedMessageHandler { impl EmittedMessageHandler for PersistingEmittedMessageHandler { async fn handle(&self, message: ChatMessage) { if let Err(e) = self.conversation_repository - .append_message(&self.session_id, &message) + .append_message_with_topic(&self.session_id, self.topic_id.as_deref(), &message) { tracing::error!(error = %e, session_id = %self.session_id, "Failed to persist emitted message"); @@ -915,6 +917,7 @@ impl AgentLoop { let assistant_message = ChatMessage::assistant(recoverable_llm_message(&e.to_string())); emitted_messages.push(assistant_message.clone()); + self.emit_live_tool_call_message(assistant_message.clone()).await; return Ok(AgentProcessResult { final_response: assistant_message, emitted_messages, @@ -939,6 +942,7 @@ impl AgentLoop { ChatMessage::assistant(response.content) }; emitted_messages.push(assistant_message.clone()); + self.emit_live_tool_call_message(assistant_message.clone()).await; return Ok(AgentProcessResult { final_response: assistant_message, emitted_messages, @@ -1044,6 +1048,7 @@ impl AgentLoop { tool_call.name, )); emitted_messages.push(assistant_message.clone()); + self.emit_live_tool_call_message(assistant_message.clone()).await; return Ok(AgentProcessResult { final_response: assistant_message, emitted_messages, @@ -1120,6 +1125,7 @@ impl AgentLoop { ChatMessage::assistant(response.content) }; emitted_messages.push(assistant_message.clone()); + self.emit_live_tool_call_message(assistant_message.clone()).await; Ok(AgentProcessResult { final_response: assistant_message, emitted_messages, @@ -1135,6 +1141,7 @@ impl AgentLoop { ); let final_message = ChatMessage::assistant(recoverable_llm_message(&e.to_string())); emitted_messages.push(final_message.clone()); + self.emit_live_tool_call_message(final_message.clone()).await; Ok(AgentProcessResult { final_response: final_message, emitted_messages, diff --git a/src/gateway/execution.rs b/src/gateway/execution.rs index 7886419..e92dcf2 100644 --- a/src/gateway/execution.rs +++ b/src/gateway/execution.rs @@ -157,11 +157,8 @@ impl AgentExecutionService { .emitted_messages .iter() .filter(|message| { - // 当存在 live_emitter 时,工具调用和工具结果已在 loop 中实时广播 - // 只保留最终 assistant 文本通过 post-loop 路径发送 - let already_emitted = request.suppress_live_tool_calls - && (message.is_assistant_tool_call_message() || message.role == "tool"); - !already_emitted + // 当存在 live_emitter 时,所有消息已在 loop 中实时广播,不需要 post-loop 发送 + !request.suppress_live_tool_calls && should_display_message_to_user(self.show_tool_results, message) }) .flat_map(|message| { diff --git a/src/gateway/processor.rs b/src/gateway/processor.rs index b30aa01..87c734e 100644 --- a/src/gateway/processor.rs +++ b/src/gateway/processor.rs @@ -227,6 +227,7 @@ impl InboundProcessor { ), self.session_manager.store(), &session_id, + current_topic.clone(), )); match self diff --git a/src/tools/task/runtime.rs b/src/tools/task/runtime.rs index d0b6d88..e811cb4 100644 --- a/src/tools/task/runtime.rs +++ b/src/tools/task/runtime.rs @@ -236,6 +236,7 @@ impl DefaultSubAgentRuntime { }, self.conversation_repository.clone(), session.session_id.clone(), + session.parent_topic_id.clone(), )); return agent.with_emitted_message_handler(emitter);