feat: 优化消息历史清理逻辑,支持移除任意位置的未完成工具调用序列
This commit is contained in:
parent
0d6880f6a3
commit
a11fdac86a
@ -838,118 +838,116 @@ impl AgentLoop {
|
|||||||
&self.tools
|
&self.tools
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sanitize message history by removing trailing assistant messages with
|
/// Sanitize message history by removing assistant messages with tool_calls
|
||||||
/// tool_calls that don't have corresponding tool result messages.
|
/// that don't have corresponding tool result messages, at ANY position in
|
||||||
|
/// the history (not just trailing).
|
||||||
///
|
///
|
||||||
/// This can happen if the process was interrupted mid-execution: the
|
/// Incomplete sequences can appear in the middle of history when:
|
||||||
/// assistant message with tool_calls was persisted but the tool results
|
/// 1. The process was interrupted mid-execution (before commit cb58d9f),
|
||||||
/// were not. Sending such incomplete sequences to the API causes errors
|
/// then a new user message was appended, burying the orphan.
|
||||||
/// like "insufficient tool messages following tool_calls message".
|
/// 2. History compaction preserves orphaned tool_calls from a pre-fix era
|
||||||
|
/// or from a race condition between persistence and snapshot.
|
||||||
|
///
|
||||||
|
/// Sending such incomplete sequences to the API causes errors like
|
||||||
|
/// "insufficient tool messages following tool_calls message".
|
||||||
///
|
///
|
||||||
/// Returns the number of messages removed.
|
/// Returns the number of messages removed.
|
||||||
fn sanitize_incomplete_tool_call_sequences(messages: &mut Vec<ChatMessage>) -> usize {
|
fn sanitize_incomplete_tool_call_sequences(messages: &mut Vec<ChatMessage>) -> usize {
|
||||||
|
use std::collections::HashSet;
|
||||||
|
|
||||||
let mut removed = 0;
|
let mut removed = 0;
|
||||||
|
|
||||||
// Phase 1: Remove trailing assistant messages with tool_calls that lack
|
// Phase 1: Single reverse pass to find ALL assistant messages with
|
||||||
// corresponding tool result messages. We loop because removing one may
|
// incomplete tool_calls, regardless of position.
|
||||||
// expose another incomplete sequence.
|
//
|
||||||
loop {
|
// Scanning right-to-left means we encounter tool results before their
|
||||||
let last_assistant_idx = match messages.iter().rposition(|m| {
|
// parent assistants, so we naturally know which tool_call_ids have
|
||||||
m.role == "assistant"
|
// corresponding results.
|
||||||
&& m.tool_calls
|
let mut resolved_ids: HashSet<String> = HashSet::new();
|
||||||
.as_ref()
|
let mut with_parent: HashSet<String> = HashSet::new();
|
||||||
.map_or(false, |calls| !calls.is_empty())
|
let mut remove_indices: Vec<usize> = Vec::new();
|
||||||
}) {
|
|
||||||
Some(idx) => idx,
|
|
||||||
None => break,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Collect all tool_call_ids from this assistant message
|
// Phase 1: Reverse pass — collect tool result IDs first, then
|
||||||
let tool_call_ids: Vec<&str> = messages[last_assistant_idx]
|
// validate each assistant's tool_calls against already-seen results.
|
||||||
.tool_calls
|
//
|
||||||
.as_ref()
|
// Because we scan right-to-left, any tool result we've already seen
|
||||||
.unwrap()
|
// appears AFTER the current message in forward order. This correctly
|
||||||
.iter()
|
// identifies which assistant tool_calls have corresponding results.
|
||||||
.map(|tc| tc.id.as_str())
|
for i in (0..messages.len()).rev() {
|
||||||
.collect();
|
let msg = &messages[i];
|
||||||
|
|
||||||
// Check if ALL tool_call_ids have corresponding tool messages
|
if msg.role == "tool" {
|
||||||
// appearing AFTER this assistant message
|
if let Some(ref tc_id) = msg.tool_call_id {
|
||||||
let all_have_results = tool_call_ids.iter().all(|&tc_id| {
|
resolved_ids.insert(tc_id.clone());
|
||||||
messages[last_assistant_idx + 1..]
|
}
|
||||||
.iter()
|
|
||||||
.any(|m| m.role == "tool" && m.tool_call_id.as_deref() == Some(tc_id))
|
|
||||||
});
|
|
||||||
|
|
||||||
if all_have_results {
|
|
||||||
// Complete sequence found, stop trimming
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let tool_call_count = tool_call_ids.len();
|
if msg.role == "assistant"
|
||||||
let missing_count = tool_call_ids
|
&& msg.tool_calls.as_ref().map_or(false, |calls| !calls.is_empty())
|
||||||
|
{
|
||||||
|
let tool_calls = msg.tool_calls.as_ref().unwrap();
|
||||||
|
let all_have_results = tool_calls
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|&&tc_id| {
|
.all(|tc| resolved_ids.contains(&tc.id));
|
||||||
!messages[last_assistant_idx + 1..]
|
|
||||||
|
if all_have_results {
|
||||||
|
for tc in tool_calls.iter() {
|
||||||
|
with_parent.insert(tc.id.clone());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let missing_count = tool_calls
|
||||||
.iter()
|
.iter()
|
||||||
.any(|m| m.role == "tool" && m.tool_call_id.as_deref() == Some(tc_id))
|
.filter(|tc| !resolved_ids.contains(&tc.id))
|
||||||
})
|
|
||||||
.count();
|
.count();
|
||||||
|
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
tool_call_count,
|
tool_call_count = tool_calls.len(),
|
||||||
missing_tool_results = missing_count,
|
missing_tool_results = missing_count,
|
||||||
message_id = %messages[last_assistant_idx].id,
|
message_id = %msg.id,
|
||||||
|
message_index = i,
|
||||||
"Removing assistant message with incomplete tool call sequence — \
|
"Removing assistant message with incomplete tool call sequence — \
|
||||||
tool results were never persisted (likely due to process interruption)"
|
tool results were never persisted (likely due to process interruption \
|
||||||
|
or history compaction preserving an orphan)"
|
||||||
);
|
);
|
||||||
|
|
||||||
messages.remove(last_assistant_idx);
|
remove_indices.push(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove in descending index order to avoid shifting
|
||||||
|
for &idx in &remove_indices {
|
||||||
|
messages.remove(idx);
|
||||||
removed += 1;
|
removed += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Phase 2: Remove orphaned trailing tool messages that no longer have a
|
// Phase 2: Forward pass to remove ALL orphaned tool messages (not just
|
||||||
// corresponding assistant tool_calls message before them. These are left
|
// trailing ones). A tool message is orphaned if its tool_call_id has no
|
||||||
// over from Phase 1 removals.
|
// matching parent assistant remaining in the history.
|
||||||
//
|
if !with_parent.is_empty() || !resolved_ids.is_empty() {
|
||||||
// We work backwards: a tool message at the end of the sequence is
|
let mut i = 0;
|
||||||
// orphaned if none of the preceding messages is an assistant with a
|
while i < messages.len() {
|
||||||
// matching tool_call in its tool_calls.
|
let msg = &messages[i];
|
||||||
while let Some(last_idx) = messages.last().map(|_| messages.len() - 1) {
|
if msg.role == "tool" {
|
||||||
let last = &messages[last_idx];
|
let is_orphaned = match &msg.tool_call_id {
|
||||||
if last.role != "tool" {
|
Some(tc_id) => !with_parent.contains(tc_id),
|
||||||
break;
|
None => true,
|
||||||
}
|
|
||||||
|
|
||||||
let tool_id = match &last.tool_call_id {
|
|
||||||
Some(id) => id.as_str(),
|
|
||||||
None => break, // tool message without tool_call_id — shouldn't happen, but safe
|
|
||||||
};
|
};
|
||||||
|
if is_orphaned {
|
||||||
// Check if any preceding assistant message has this tool_call_id in
|
|
||||||
// its tool_calls
|
|
||||||
let has_parent = messages[..last_idx].iter().any(|m| {
|
|
||||||
m.role == "assistant"
|
|
||||||
&& m.tool_calls
|
|
||||||
.as_ref()
|
|
||||||
.map_or(false, |calls| {
|
|
||||||
calls.iter().any(|tc| tc.id == tool_id)
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
if has_parent {
|
|
||||||
break; // This tool message has a valid parent, stop
|
|
||||||
}
|
|
||||||
|
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
tool_call_id = %tool_id,
|
tool_call_id = ?msg.tool_call_id,
|
||||||
message_id = %last.id,
|
message_id = %msg.id,
|
||||||
|
message_index = i,
|
||||||
"Removing orphaned tool result message — its parent assistant \
|
"Removing orphaned tool result message — its parent assistant \
|
||||||
tool_calls message was removed or never persisted"
|
tool_calls message was removed or never persisted"
|
||||||
);
|
);
|
||||||
|
messages.remove(i);
|
||||||
messages.remove(last_idx);
|
|
||||||
removed += 1;
|
removed += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
i += 1;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
removed
|
removed
|
||||||
@ -2096,6 +2094,190 @@ mod tests {
|
|||||||
assert_eq!(messages[3].content, "the answer is 2");
|
assert_eq!(messages[3].content, "the answer is 2");
|
||||||
assert_eq!(messages[4].content, "second question");
|
assert_eq!(messages[4].content, "second question");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_sanitize_removes_mid_history_orphaned_tool_calls() {
|
||||||
|
// Bug scenario: orphaned tool_calls in the MIDDLE of history,
|
||||||
|
// followed by a complete sequence. The old trailing-only sanitizer
|
||||||
|
// would stop at the complete sequence and never remove the orphan.
|
||||||
|
let mut messages = vec![
|
||||||
|
ChatMessage::user("first question"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"orphaned tool call",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "call_1".to_string(),
|
||||||
|
name: "calculator".to_string(),
|
||||||
|
arguments: serde_json::json!({"expression": "1+1"}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
// Missing tool result for call_1 — ORPHAN in the middle
|
||||||
|
ChatMessage::user("second question"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"valid tool call",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "call_2".to_string(),
|
||||||
|
name: "read".to_string(),
|
||||||
|
arguments: serde_json::json!({"path": "README.md"}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
ChatMessage::tool("call_2", "read", "file contents"),
|
||||||
|
];
|
||||||
|
|
||||||
|
let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages);
|
||||||
|
// call_1 assistant removed (1), rest preserved (4)
|
||||||
|
assert_eq!(removed, 1);
|
||||||
|
assert_eq!(messages.len(), 4);
|
||||||
|
assert_eq!(messages[0].role, "user");
|
||||||
|
assert_eq!(messages[0].content, "first question");
|
||||||
|
assert_eq!(messages[1].role, "user");
|
||||||
|
assert_eq!(messages[1].content, "second question");
|
||||||
|
// The complete call_2 sequence is preserved
|
||||||
|
assert_eq!(messages[2].role, "assistant");
|
||||||
|
assert_eq!(messages[3].role, "tool");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_sanitize_removes_multiple_mid_history_orphans() {
|
||||||
|
// Multiple orphaned tool_calls scattered throughout history
|
||||||
|
let mut messages = vec![
|
||||||
|
ChatMessage::user("first"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"orphan 1",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "orphan_1".to_string(),
|
||||||
|
name: "tool_a".to_string(),
|
||||||
|
arguments: serde_json::json!({}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
// Missing result for orphan_1
|
||||||
|
ChatMessage::user("second"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"orphan 2",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "orphan_2".to_string(),
|
||||||
|
name: "tool_b".to_string(),
|
||||||
|
arguments: serde_json::json!({}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
// Missing result for orphan_2
|
||||||
|
ChatMessage::user("third"),
|
||||||
|
];
|
||||||
|
|
||||||
|
let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages);
|
||||||
|
assert_eq!(removed, 2);
|
||||||
|
assert_eq!(messages.len(), 3);
|
||||||
|
assert_eq!(messages[0].content, "first");
|
||||||
|
assert_eq!(messages[1].content, "second");
|
||||||
|
assert_eq!(messages[2].content, "third");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_sanitize_removes_orphaned_tool_results_for_removed_mid_assistant() {
|
||||||
|
// When removing a mid-history assistant with partial tool results,
|
||||||
|
// both the assistant AND its orphaned tool results must be removed.
|
||||||
|
// Assistant has 2 tool calls, only 1 has a result → assistant is
|
||||||
|
// incomplete → both assistant and its lone tool result removed.
|
||||||
|
let mut messages = vec![
|
||||||
|
ChatMessage::user("first question"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"two tool calls, only one has result",
|
||||||
|
vec![
|
||||||
|
ToolCall {
|
||||||
|
id: "call_has_result".to_string(),
|
||||||
|
name: "tool_a".to_string(),
|
||||||
|
arguments: serde_json::json!({}),
|
||||||
|
},
|
||||||
|
ToolCall {
|
||||||
|
id: "call_no_result".to_string(),
|
||||||
|
name: "tool_b".to_string(),
|
||||||
|
arguments: serde_json::json!({}),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
ChatMessage::tool("call_has_result", "tool_a", "some result"),
|
||||||
|
// Missing tool result for call_no_result → incomplete sequence
|
||||||
|
ChatMessage::user("second question"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"valid tool call",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "call_valid".to_string(),
|
||||||
|
name: "good_tool".to_string(),
|
||||||
|
arguments: serde_json::json!({}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
ChatMessage::tool("call_valid", "good_tool", "valid result"),
|
||||||
|
];
|
||||||
|
|
||||||
|
let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages);
|
||||||
|
// call_has_result/call_no_result assistant (1) + its orphaned tool result (1) = 2 removed
|
||||||
|
assert_eq!(removed, 2);
|
||||||
|
assert_eq!(messages.len(), 4);
|
||||||
|
assert_eq!(messages[0].content, "first question");
|
||||||
|
assert_eq!(messages[1].content, "second question");
|
||||||
|
assert_eq!(messages[2].role, "assistant");
|
||||||
|
assert_eq!(messages[3].role, "tool");
|
||||||
|
// Verify the remaining tool belongs to call_valid
|
||||||
|
assert_eq!(messages[3].tool_call_id.as_deref(), Some("call_valid"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_sanitize_handles_complex_interleaved_history() {
|
||||||
|
// Complete → Orphaned → Complete: a realistic scenario after
|
||||||
|
// history compaction
|
||||||
|
let mut messages = vec![
|
||||||
|
ChatMessage::user("task 1"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"doing task 1",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "t1_call".to_string(),
|
||||||
|
name: "read".to_string(),
|
||||||
|
arguments: serde_json::json!({"path": "a.txt"}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
ChatMessage::tool("t1_call", "read", "content A"),
|
||||||
|
ChatMessage::assistant("task 1 is done"),
|
||||||
|
// End of task 1 — complete sequence
|
||||||
|
|
||||||
|
ChatMessage::user("task 2"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"doing task 2 — this got interrupted",
|
||||||
|
vec![
|
||||||
|
ToolCall {
|
||||||
|
id: "t2_call_1".to_string(),
|
||||||
|
name: "write".to_string(),
|
||||||
|
arguments: serde_json::json!({"path": "b.txt"}),
|
||||||
|
},
|
||||||
|
ToolCall {
|
||||||
|
id: "t2_call_2".to_string(),
|
||||||
|
name: "calculator".to_string(),
|
||||||
|
arguments: serde_json::json!({"expression": "2+2"}),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
// Missing BOTH tool results — process was killed here
|
||||||
|
// End of task 2 — orphaned sequence in the middle
|
||||||
|
|
||||||
|
ChatMessage::user("task 3"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"doing task 3",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "t3_call".to_string(),
|
||||||
|
name: "search".to_string(),
|
||||||
|
arguments: serde_json::json!({"query": "hello"}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
ChatMessage::tool("t3_call", "search", "found results"),
|
||||||
|
ChatMessage::assistant("task 3 is done"),
|
||||||
|
];
|
||||||
|
|
||||||
|
let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages);
|
||||||
|
// Removed: assistant with t2_call_1/t2_call_2 (1 message)
|
||||||
|
assert_eq!(removed, 1);
|
||||||
|
// Original 10 messages - 1 = 9
|
||||||
|
assert_eq!(messages.len(), 9);
|
||||||
|
assert_eq!(messages[4].content, "task 2");
|
||||||
|
assert_eq!(messages[5].content, "task 3");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user