diff --git a/src/gateway/execution.rs b/src/gateway/execution.rs index e6430ee..b58792f 100644 --- a/src/gateway/execution.rs +++ b/src/gateway/execution.rs @@ -38,6 +38,7 @@ pub(crate) struct FinalizeAgentResultRequest<'a> { pub(crate) metadata: &'a HashMap, pub(crate) suppress_live_tool_calls: bool, pub(crate) execution_kind: &'a str, + pub(crate) original_topic_id: Option, } pub(crate) struct FinalizedAgentResult { @@ -78,10 +79,14 @@ impl AgentExecutionService { session: &mut Session, request: FinalizeAgentResultRequest<'_>, ) -> Result { - if !session.matches_current_user_turn(request.chat_id, request.user_message) { + // 检查是否是最新的用户回合 + let is_current_turn = + session.matches_current_user_turn(request.chat_id, request.user_message); + + if !is_current_turn { let (latest_user_id, latest_user_preview, compression_in_flight, history_len) = session.stale_result_diagnostics(request.chat_id); - tracing::warn!( + tracing::info!( channel = %request.channel_name, chat_id = %request.chat_id, user_message_id = %request.user_message.id, @@ -90,41 +95,66 @@ impl AgentExecutionService { compression_in_flight, history_len, execution_kind = %request.execution_kind, - "Skipping stale agent result because a newer user message is already present" + original_topic_id = ?request.original_topic_id, + "User switched topic during agent execution - saving result to original topic" ); - - return Ok(FinalizedAgentResult { - outbound_messages: Vec::new(), - should_schedule_compaction: false, - }); } - session - .append_persisted_messages(request.chat_id, request.result.emitted_messages.clone())?; + // 确定保存消息的话题 ID + // 如果是最新回合,使用当前话题;否则使用原始话题 + let target_topic_id = if is_current_turn { + session.current_topic(request.chat_id) + } else { + request.original_topic_id.as_deref() + }; - let outbound_messages = request - .result - .emitted_messages - .iter() - .filter(|message| { - (!message.is_assistant_tool_call_message() || !request.suppress_live_tool_calls) - && should_display_message_to_user(self.show_tool_results, message) - }) - .flat_map(|message| { - OutboundMessage::from_chat_message( - request.channel_name, - request.chat_id, - None, // session_id - None, - request.metadata, - message, - ) - }) - .collect(); + // 将结果消息保存到确定的话题 + if let Some(topic_id) = target_topic_id { + if let Err(err) = session.append_messages_to_topic( + request.chat_id, + topic_id, + &request.result.emitted_messages, + ) { + tracing::error!( + error = %err, + topic_id = %topic_id, + "Failed to append messages to topic" + ); + } + } + + // 只有当是最新回合时才发送 outbound 消息给用户 + // 如果用户已经切换到其他话题,只保存结果,不发送消息(避免打扰) + let outbound_messages = if is_current_turn { + request + .result + .emitted_messages + .iter() + .filter(|message| { + (!message.is_assistant_tool_call_message() || !request.suppress_live_tool_calls) + && should_display_message_to_user(self.show_tool_results, message) + }) + .flat_map(|message| { + OutboundMessage::from_chat_message( + request.channel_name, + request.chat_id, + None, // session_id + None, + request.metadata, + message, + ) + }) + .collect() + } else { + Vec::new() + }; + + // 只有当是最新回合时才触发历史压缩 + let should_schedule_compaction = is_current_turn; Ok(FinalizedAgentResult { outbound_messages, - should_schedule_compaction: true, + should_schedule_compaction, }) } @@ -132,7 +162,7 @@ impl AgentExecutionService { &self, request: MessageExecutionRequest<'_>, ) -> Result, AgentError> { - let (history, agent, user_message, user_message_count) = { + let (history, agent, user_message, user_message_count, original_topic_id) = { let mut session_guard = request.session.lock().await; session_guard.ensure_persistent_session(request.chat_id)?; @@ -156,6 +186,11 @@ impl AgentExecutionService { let history_before = session_guard.get_or_create_history(request.chat_id).clone(); let user_message_count = history_before.iter().filter(|m| m.role == "user").count(); + // 在添加用户消息前,记录当前话题 ID + let original_topic_id = session_guard + .current_topic(request.chat_id) + .map(|s| s.to_string()); + let user_message = session_guard.create_user_message(&enriched_content, media_refs); session_guard.append_persisted_message(request.chat_id, user_message.clone())?; @@ -172,7 +207,7 @@ impl AgentExecutionService { agent = agent.with_emitted_message_handler(handler); } - (history, agent, user_message, user_message_count) + (history, agent, user_message, user_message_count, original_topic_id) }; // 构建系统提示词上下文 @@ -195,6 +230,7 @@ impl AgentExecutionService { metadata: &metadata, suppress_live_tool_calls: request.live_emitter.is_some(), execution_kind: "message", + original_topic_id, }, ) .await @@ -204,7 +240,7 @@ impl AgentExecutionService { &self, request: ScheduledExecutionRequest<'_>, ) -> Result, AgentError> { - let (history, agent, user_message, user_message_count) = { + let (history, agent, user_message, user_message_count, original_topic_id) = { let mut session_guard = request.session.lock().await; session_guard.ensure_persistent_session(request.chat_id)?; @@ -230,6 +266,11 @@ impl AgentExecutionService { let history_before = session_guard.get_or_create_history(request.chat_id).clone(); let user_message_count = history_before.iter().filter(|m| m.role == "user").count(); + // 在添加用户消息前,记录当前话题 ID + let original_topic_id = session_guard + .current_topic(request.chat_id) + .map(|s| s.to_string()); + let user_message = session_guard.create_user_message(request.prompt, Vec::new()); session_guard.append_persisted_message(request.chat_id, user_message.clone())?; @@ -239,13 +280,13 @@ impl AgentExecutionService { let agent = session_guard.create_agent_with_provider_config( request.chat_id, - request.notification_chat_id, // 传入真实 chat_id + request.notification_chat_id, // 传入真实 chat_id Some(request.sender_id), Some(&user_message.id), request.provider_config.clone(), )?; - (history, agent, user_message, user_message_count) + (history, agent, user_message, user_message_count, original_topic_id) }; // 构建系统提示词上下文 @@ -267,6 +308,7 @@ impl AgentExecutionService { metadata: request.metadata, suppress_live_tool_calls: false, execution_kind: "scheduled_task", + original_topic_id, }, ) .await diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 0e86f29..8f87532 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -305,6 +305,16 @@ impl Session { self.history.append_persisted_messages(chat_id, messages) } + /// 将消息保存到指定话题(直接写入数据库,不更新内存历史) + pub fn append_messages_to_topic( + &self, + chat_id: &str, + topic_id: &str, + messages: &[ChatMessage], + ) -> Result<(), AgentError> { + self.history.append_to_topic(chat_id, topic_id, messages) + } + pub fn create_user_message(&self, content: &str, media_refs: Vec) -> ChatMessage { if media_refs.is_empty() { ChatMessage::user(content) diff --git a/src/gateway/session_history.rs b/src/gateway/session_history.rs index 3893929..dccc53e 100644 --- a/src/gateway/session_history.rs +++ b/src/gateway/session_history.rs @@ -212,6 +212,27 @@ impl SessionHistory { Ok(()) } + /// 将消息保存到指定话题(直接写入数据库,不更新内存历史) + /// 用于异步执行结果保存到原始话题的场景 + pub(crate) fn append_to_topic( + &self, + chat_id: &str, + topic_id: &str, + messages: &[ChatMessage], + ) -> Result<(), AgentError> { + let session_id = self.persistent_session_id(chat_id); + + for message in messages { + self.conversations + .append_message_with_topic(&session_id, Some(topic_id), message) + .map_err(|err| { + AgentError::Other(format!("append message to topic error: {}", err)) + })?; + } + + Ok(()) + } + pub(crate) fn latest_user_message(&self, chat_id: &str) -> Option<&ChatMessage> { self.get_history(chat_id) .and_then(|history| history.iter().rev().find(|message| message.role == "user"))