diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index 1beba50..63af2a6 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -853,121 +853,6 @@ impl AgentLoop { &self.tools } - /// Sanitize message history by removing assistant messages with tool_calls - /// that don't have corresponding tool result messages, at ANY position in - /// the history (not just trailing). - /// - /// Incomplete sequences can appear in the middle of history when: - /// 1. The process was interrupted mid-execution (before commit cb58d9f), - /// then a new user message was appended, burying the orphan. - /// 2. History compaction preserves orphaned tool_calls from a pre-fix era - /// or from a race condition between persistence and snapshot. - /// - /// Sending such incomplete sequences to the API causes errors like - /// "insufficient tool messages following tool_calls message". - /// - /// Returns the number of messages removed. - fn sanitize_incomplete_tool_call_sequences(messages: &mut Vec) -> usize { - use std::collections::HashSet; - - let mut removed = 0; - - // Phase 1: Single reverse pass to find ALL assistant messages with - // incomplete tool_calls, regardless of position. - // - // Scanning right-to-left means we encounter tool results before their - // parent assistants, so we naturally know which tool_call_ids have - // corresponding results. - let mut resolved_ids: HashSet = HashSet::new(); - let mut with_parent: HashSet = HashSet::new(); - let mut remove_indices: Vec = Vec::new(); - - // Phase 1: Reverse pass — collect tool result IDs first, then - // validate each assistant's tool_calls against already-seen results. - // - // Because we scan right-to-left, any tool result we've already seen - // appears AFTER the current message in forward order. This correctly - // identifies which assistant tool_calls have corresponding results. - for i in (0..messages.len()).rev() { - let msg = &messages[i]; - - if msg.role == "tool" { - if let Some(ref tc_id) = msg.tool_call_id { - resolved_ids.insert(tc_id.clone()); - } - } - - if msg.role == "assistant" - && msg.tool_calls.as_ref().map_or(false, |calls| !calls.is_empty()) - { - let tool_calls = msg.tool_calls.as_ref().unwrap(); - let all_have_results = tool_calls - .iter() - .all(|tc| resolved_ids.contains(&tc.id)); - - if all_have_results { - for tc in tool_calls.iter() { - with_parent.insert(tc.id.clone()); - } - } else { - let missing_count = tool_calls - .iter() - .filter(|tc| !resolved_ids.contains(&tc.id)) - .count(); - - tracing::warn!( - tool_call_count = tool_calls.len(), - missing_tool_results = missing_count, - message_id = %msg.id, - message_index = i, - "Removing assistant message with incomplete tool call sequence — \ - tool results were never persisted (likely due to process interruption \ - or history compaction preserving an orphan)" - ); - - remove_indices.push(i); - } - } - } - - // Remove in descending index order to avoid shifting - for &idx in &remove_indices { - messages.remove(idx); - removed += 1; - } - - // Phase 2: Forward pass to remove ALL orphaned tool messages (not just - // trailing ones). A tool message is orphaned if its tool_call_id has no - // matching parent assistant remaining in the history. - if !with_parent.is_empty() || !resolved_ids.is_empty() { - let mut i = 0; - while i < messages.len() { - let msg = &messages[i]; - if msg.role == "tool" { - let is_orphaned = match &msg.tool_call_id { - Some(tc_id) => !with_parent.contains(tc_id), - None => true, - }; - if is_orphaned { - tracing::warn!( - tool_call_id = ?msg.tool_call_id, - message_id = %msg.id, - message_index = i, - "Removing orphaned tool result message — its parent assistant \ - tool_calls message was removed or never persisted" - ); - messages.remove(i); - removed += 1; - continue; - } - } - i += 1; - } - } - - removed - } - /// Process a message using the provided conversation history. /// History management is handled externally by SessionManager. /// @@ -993,7 +878,7 @@ impl AgentLoop { // Sanitize: remove any trailing incomplete tool call sequences // that may have been persisted before a process interruption. - Self::sanitize_incomplete_tool_call_sequences(&mut messages); + crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); // Track tool calls for loop detection let mut loop_detector = LoopDetector::new(LoopDetectorConfig::default()); @@ -1936,7 +1821,7 @@ mod tests { // Tool result for call_1 is MISSING — incomplete sequence ]; - let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages); + let removed = crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); assert_eq!(removed, 1); assert_eq!(messages.len(), 1); assert_eq!(messages[0].role, "user"); @@ -1957,7 +1842,7 @@ mod tests { ChatMessage::tool("call_1", "calculator", "2"), ]; - let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages); + let removed = crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); assert_eq!(removed, 0); assert_eq!(messages.len(), 3); } @@ -1987,7 +1872,7 @@ mod tests { // Also missing tool result for call_2 ]; - let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages); + let removed = crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); // Should remove both trailing assistant messages with incomplete tool calls assert_eq!(removed, 2); assert_eq!(messages.len(), 2); @@ -2021,7 +1906,7 @@ mod tests { // Missing tool result for call_2 ]; - let removed_count = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages); + let removed_count = crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); // Phase 1 removes the assistant message (call_2 has no result). // Phase 2 removes the orphaned tool result for call_1 (its parent // assistant was removed). @@ -2038,7 +1923,7 @@ mod tests { ChatMessage::user("how are you"), ]; - let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages); + let removed = crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); assert_eq!(removed, 0); assert_eq!(messages.len(), 3); } @@ -2046,7 +1931,7 @@ mod tests { #[test] fn test_sanitize_handles_empty_messages() { let mut messages: Vec = vec![]; - let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages); + let removed = crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); assert_eq!(removed, 0); } @@ -2058,7 +1943,7 @@ mod tests { ChatMessage::tool("call_1", "calculator", "2"), ]; - let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages); + let removed = crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); assert_eq!(removed, 1); assert!(messages.is_empty()); } @@ -2086,7 +1971,7 @@ mod tests { ChatMessage::tool("call_2", "read", "contents of README"), ]; - let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages); + let removed = crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); assert_eq!(removed, 0); assert_eq!(messages.len(), 4); } @@ -2119,7 +2004,7 @@ mod tests { // Missing tool result for call_2 — only THIS sequence should be trimmed ]; - let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages); + let removed = crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); assert_eq!(removed, 1); // First complete sequence preserved (5 messages), user message for second // question preserved @@ -2157,7 +2042,7 @@ mod tests { ChatMessage::tool("call_2", "read", "file contents"), ]; - let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages); + let removed = crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); // call_1 assistant removed (1), rest preserved (4) assert_eq!(removed, 1); assert_eq!(messages.len(), 4); @@ -2197,7 +2082,7 @@ mod tests { ChatMessage::user("third"), ]; - let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages); + let removed = crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); assert_eq!(removed, 2); assert_eq!(messages.len(), 3); assert_eq!(messages[0].content, "first"); @@ -2242,7 +2127,7 @@ mod tests { ChatMessage::tool("call_valid", "good_tool", "valid result"), ]; - let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages); + let removed = crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); // call_has_result/call_no_result assistant (1) + its orphaned tool result (1) = 2 removed assert_eq!(removed, 2); assert_eq!(messages.len(), 4); @@ -2304,7 +2189,7 @@ mod tests { ChatMessage::assistant("task 3 is done"), ]; - let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages); + let removed = crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut messages); // Removed: assistant with t2_call_1/t2_call_2 (1 message) assert_eq!(removed, 1); // Original 10 messages - 1 = 9 diff --git a/src/bus/message.rs b/src/bus/message.rs index 87f5778..49b0912 100644 --- a/src/bus/message.rs +++ b/src/bus/message.rs @@ -1,5 +1,5 @@ use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use crate::domain::messages::ToolCall; @@ -237,6 +237,123 @@ impl ChatMessage { } } +// ============================================================================ +// Message sanitization +// ============================================================================ + +/// Sanitize message history by removing assistant messages with `tool_calls` +/// that don't have corresponding tool result messages, at ANY position in +/// the history (not just trailing). +/// +/// Incomplete sequences can appear in the middle of history when: +/// 1. The process was interrupted mid-execution, then a new user message +/// was appended, burying the orphan. +/// 2. History compaction preserves orphaned `tool_calls` from a pre-fix era +/// or from a race condition between persistence and snapshot. +/// +/// Sending such incomplete sequences to the API causes errors like +/// "insufficient tool messages following tool_calls message". +/// +/// Returns the number of messages removed. +pub(crate) fn sanitize_incomplete_tool_call_sequences(messages: &mut Vec) -> usize { + let mut removed = 0; + + // Phase 1: Single reverse pass to find ALL assistant messages with + // incomplete tool_calls, regardless of position. + // + // Scanning right-to-left means we encounter tool results before their + // parent assistants, so we naturally know which tool_call_ids have + // corresponding results. + let mut resolved_ids: HashSet = HashSet::new(); + let mut with_parent: HashSet = HashSet::new(); + let mut remove_indices: Vec = Vec::new(); + + // Reverse pass — collect tool result IDs first, then validate each + // assistant's tool_calls against already-seen results. + // + // Because we scan right-to-left, any tool result we've already seen + // appears AFTER the current message in forward order. This correctly + // identifies which assistant tool_calls have corresponding results. + for i in (0..messages.len()).rev() { + let msg = &messages[i]; + + if msg.role == "tool" { + if let Some(ref tc_id) = msg.tool_call_id { + resolved_ids.insert(tc_id.clone()); + } + } + + if msg.role == "assistant" + && msg.tool_calls.as_ref().map_or(false, |calls| !calls.is_empty()) + { + let tool_calls = msg.tool_calls.as_ref().unwrap(); + let all_have_results = tool_calls + .iter() + .all(|tc| resolved_ids.contains(&tc.id)); + + if all_have_results { + for tc in tool_calls.iter() { + with_parent.insert(tc.id.clone()); + } + } else { + let missing_count = tool_calls + .iter() + .filter(|tc| !resolved_ids.contains(&tc.id)) + .count(); + + tracing::warn!( + tool_call_count = tool_calls.len(), + missing_tool_results = missing_count, + message_id = %msg.id, + message_index = i, + "Removing assistant message with incomplete tool call sequence — \ + tool results were never persisted (likely due to process interruption \ + or history compaction preserving an orphan)" + ); + + remove_indices.push(i); + } + } + } + + // Remove in descending index order to avoid shifting + for &idx in &remove_indices { + messages.remove(idx); + removed += 1; + } + + // Phase 2: Forward pass to remove ALL orphaned tool messages (not just + // trailing ones). A tool message is orphaned if its tool_call_id has no + // matching parent assistant remaining in the history. + if !with_parent.is_empty() || !resolved_ids.is_empty() { + let mut i = 0; + while i < messages.len() { + let msg = &messages[i]; + if msg.role == "tool" { + let is_orphaned = match &msg.tool_call_id { + Some(tc_id) => !with_parent.contains(tc_id), + None => true, + }; + if is_orphaned { + tracing::warn!( + tool_call_id = ?msg.tool_call_id, + message_id = %msg.id, + message_index = i, + "Removing orphaned tool result message — its parent assistant \ + tool_calls message was removed or never persisted" + ); + messages.remove(i); + removed += 1; + continue; + } + } + i += 1; + } + } + + removed +} + // ============================================================================ // InboundMessage - Message from Channel to Bus (user input) // ============================================================================ diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 95f2a96..956a9f9 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -729,29 +729,45 @@ impl SessionStore { let delta_messages = load_messages_between(&tx, session_id, snapshot_end_seq, current_max_seq)?; - let mut next_seq = current_max_seq + 1; let now = current_timestamp(); + + // Collect all new messages first, then sanitize incomplete tool call + // sequences before writing to DB. This prevents orphaned tool_calls + // (without corresponding tool results) from being persisted permanently + // when compaction preserves an incomplete sequence from the snapshot or + // captures a partial sequence from delta messages. + let mut new_messages: Vec = Vec::new(); + + for message in preserved_system_messages { + new_messages.push(clone_message_for_compaction(message, message.timestamp)); + } + + new_messages.push(clone_message_for_compaction(summary_message, now)); + + for message in preserved_messages.iter().chain(delta_messages.iter()) { + new_messages.push(clone_message_for_compaction(message, message.timestamp)); + } + + let removed = + crate::bus::message::sanitize_incomplete_tool_call_sequences(&mut new_messages); + if removed > 0 { + tracing::warn!( + removed_count = removed, + session_id = %session_id, + "Compaction removed incomplete tool call sequences from new history" + ); + } + + // Write sanitized messages to DB + let mut next_seq = current_max_seq + 1; let mut inserted_count = 0_i64; let mut active_user_turn_count = 0_i64; - for message in preserved_system_messages { - let copied = clone_message_for_compaction(message, message.timestamp); - insert_message_with_seq(&tx, session_id, next_seq, &copied)?; - next_seq += 1; - inserted_count += 1; - } - - let summary_copy = clone_message_for_compaction(summary_message, now); - insert_message_with_seq(&tx, session_id, next_seq, &summary_copy)?; - next_seq += 1; - inserted_count += 1; - - for message in preserved_messages.iter().chain(delta_messages.iter()) { - let copied = clone_message_for_compaction(message, message.timestamp); - if copied.role == "user" { + for message in &new_messages { + if message.role == "user" { active_user_turn_count += 1; } - insert_message_with_seq(&tx, session_id, next_seq, &copied)?; + insert_message_with_seq(&tx, session_id, next_seq, message)?; next_seq += 1; inserted_count += 1; } diff --git a/web/src/components/Chat/MessageBubble.tsx b/web/src/components/Chat/MessageBubble.tsx index 80a25f5..080dbe1 100644 --- a/web/src/components/Chat/MessageBubble.tsx +++ b/web/src/components/Chat/MessageBubble.tsx @@ -379,11 +379,6 @@ export function MessageBubble({ message, onNavigateToSubAgent }: MessageBubblePr )} - {!taskResult && !isTaskTool && !hasResult && ( -
- 等待工具执行... -
- )} {isTaskTool && !taskResult && !message.subagentTaskId && (
子智能体正在执行... @@ -455,9 +450,6 @@ export function MessageBubble({ message, onNavigateToSubAgent }: MessageBubblePr
)} - {!hasArgs && !hasResult && ( -
等待工具执行...
- )} {isTaskTool && message.subagentTaskId && ( + )} + + {/* 回到底部 */} + {showScrollToBottom && ( + + )} + ) }