Compare commits
5 Commits
15dfc48837
...
1d4ebb27a7
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1d4ebb27a7 | ||
|
|
1b571e943f | ||
|
|
a11fdac86a | ||
|
|
0d6880f6a3 | ||
|
|
a783abd0e3 |
@ -646,6 +646,8 @@ pub struct AgentLoop {
|
|||||||
observer: Option<Arc<dyn Observer>>,
|
observer: Option<Arc<dyn Observer>>,
|
||||||
emitted_message_handler: Option<Arc<dyn EmittedMessageHandler>>,
|
emitted_message_handler: Option<Arc<dyn EmittedMessageHandler>>,
|
||||||
max_iterations: usize,
|
max_iterations: usize,
|
||||||
|
/// 取消信号接收端:Agent 在每次迭代开始时检查是否被取消
|
||||||
|
cancel_token: Option<tokio::sync::watch::Receiver<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@ -742,6 +744,7 @@ impl AgentLoop {
|
|||||||
tool_context: ToolContext::default(),
|
tool_context: ToolContext::default(),
|
||||||
observer: None,
|
observer: None,
|
||||||
emitted_message_handler: None,
|
emitted_message_handler: None,
|
||||||
|
cancel_token: None,
|
||||||
max_iterations,
|
max_iterations,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -764,6 +767,7 @@ impl AgentLoop {
|
|||||||
tool_context: ToolContext::default(),
|
tool_context: ToolContext::default(),
|
||||||
observer: None,
|
observer: None,
|
||||||
emitted_message_handler: None,
|
emitted_message_handler: None,
|
||||||
|
cancel_token: None,
|
||||||
max_iterations,
|
max_iterations,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -787,6 +791,7 @@ impl AgentLoop {
|
|||||||
tool_context: ToolContext::default(),
|
tool_context: ToolContext::default(),
|
||||||
observer: None,
|
observer: None,
|
||||||
emitted_message_handler: None,
|
emitted_message_handler: None,
|
||||||
|
cancel_token: None,
|
||||||
max_iterations,
|
max_iterations,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -814,6 +819,7 @@ impl AgentLoop {
|
|||||||
tool_context: ToolContext::default(),
|
tool_context: ToolContext::default(),
|
||||||
observer: None,
|
observer: None,
|
||||||
emitted_message_handler: None,
|
emitted_message_handler: None,
|
||||||
|
cancel_token: None,
|
||||||
max_iterations,
|
max_iterations,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -834,122 +840,129 @@ impl AgentLoop {
|
|||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 设置取消信号接收端。
|
||||||
|
///
|
||||||
|
/// Agent 在每次迭代开始时检查 `cancel_token.has_changed()`,
|
||||||
|
/// 如果已收到取消信号则提前返回。
|
||||||
|
pub fn with_cancel_token(mut self, token: tokio::sync::watch::Receiver<()>) -> Self {
|
||||||
|
self.cancel_token = Some(token);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
pub fn tools(&self) -> &Arc<ToolRegistry> {
|
pub fn tools(&self) -> &Arc<ToolRegistry> {
|
||||||
&self.tools
|
&self.tools
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sanitize message history by removing trailing assistant messages with
|
/// Sanitize message history by removing assistant messages with tool_calls
|
||||||
/// tool_calls that don't have corresponding tool result messages.
|
/// that don't have corresponding tool result messages, at ANY position in
|
||||||
|
/// the history (not just trailing).
|
||||||
///
|
///
|
||||||
/// This can happen if the process was interrupted mid-execution: the
|
/// Incomplete sequences can appear in the middle of history when:
|
||||||
/// assistant message with tool_calls was persisted but the tool results
|
/// 1. The process was interrupted mid-execution (before commit cb58d9f),
|
||||||
/// were not. Sending such incomplete sequences to the API causes errors
|
/// then a new user message was appended, burying the orphan.
|
||||||
/// like "insufficient tool messages following tool_calls message".
|
/// 2. History compaction preserves orphaned tool_calls from a pre-fix era
|
||||||
|
/// or from a race condition between persistence and snapshot.
|
||||||
|
///
|
||||||
|
/// Sending such incomplete sequences to the API causes errors like
|
||||||
|
/// "insufficient tool messages following tool_calls message".
|
||||||
///
|
///
|
||||||
/// Returns the number of messages removed.
|
/// Returns the number of messages removed.
|
||||||
fn sanitize_incomplete_tool_call_sequences(messages: &mut Vec<ChatMessage>) -> usize {
|
fn sanitize_incomplete_tool_call_sequences(messages: &mut Vec<ChatMessage>) -> usize {
|
||||||
|
use std::collections::HashSet;
|
||||||
|
|
||||||
let mut removed = 0;
|
let mut removed = 0;
|
||||||
|
|
||||||
// Phase 1: Remove trailing assistant messages with tool_calls that lack
|
// Phase 1: Single reverse pass to find ALL assistant messages with
|
||||||
// corresponding tool result messages. We loop because removing one may
|
// incomplete tool_calls, regardless of position.
|
||||||
// expose another incomplete sequence.
|
//
|
||||||
loop {
|
// Scanning right-to-left means we encounter tool results before their
|
||||||
let last_assistant_idx = match messages.iter().rposition(|m| {
|
// parent assistants, so we naturally know which tool_call_ids have
|
||||||
m.role == "assistant"
|
// corresponding results.
|
||||||
&& m.tool_calls
|
let mut resolved_ids: HashSet<String> = HashSet::new();
|
||||||
.as_ref()
|
let mut with_parent: HashSet<String> = HashSet::new();
|
||||||
.map_or(false, |calls| !calls.is_empty())
|
let mut remove_indices: Vec<usize> = Vec::new();
|
||||||
}) {
|
|
||||||
Some(idx) => idx,
|
|
||||||
None => break,
|
|
||||||
};
|
|
||||||
|
|
||||||
// Collect all tool_call_ids from this assistant message
|
// Phase 1: Reverse pass — collect tool result IDs first, then
|
||||||
let tool_call_ids: Vec<&str> = messages[last_assistant_idx]
|
// validate each assistant's tool_calls against already-seen results.
|
||||||
.tool_calls
|
//
|
||||||
.as_ref()
|
// Because we scan right-to-left, any tool result we've already seen
|
||||||
.unwrap()
|
// appears AFTER the current message in forward order. This correctly
|
||||||
.iter()
|
// identifies which assistant tool_calls have corresponding results.
|
||||||
.map(|tc| tc.id.as_str())
|
for i in (0..messages.len()).rev() {
|
||||||
.collect();
|
let msg = &messages[i];
|
||||||
|
|
||||||
// Check if ALL tool_call_ids have corresponding tool messages
|
if msg.role == "tool" {
|
||||||
// appearing AFTER this assistant message
|
if let Some(ref tc_id) = msg.tool_call_id {
|
||||||
let all_have_results = tool_call_ids.iter().all(|&tc_id| {
|
resolved_ids.insert(tc_id.clone());
|
||||||
messages[last_assistant_idx + 1..]
|
}
|
||||||
.iter()
|
|
||||||
.any(|m| m.role == "tool" && m.tool_call_id.as_deref() == Some(tc_id))
|
|
||||||
});
|
|
||||||
|
|
||||||
if all_have_results {
|
|
||||||
// Complete sequence found, stop trimming
|
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let tool_call_count = tool_call_ids.len();
|
if msg.role == "assistant"
|
||||||
let missing_count = tool_call_ids
|
&& msg.tool_calls.as_ref().map_or(false, |calls| !calls.is_empty())
|
||||||
.iter()
|
{
|
||||||
.filter(|&&tc_id| {
|
let tool_calls = msg.tool_calls.as_ref().unwrap();
|
||||||
!messages[last_assistant_idx + 1..]
|
let all_have_results = tool_calls
|
||||||
|
.iter()
|
||||||
|
.all(|tc| resolved_ids.contains(&tc.id));
|
||||||
|
|
||||||
|
if all_have_results {
|
||||||
|
for tc in tool_calls.iter() {
|
||||||
|
with_parent.insert(tc.id.clone());
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let missing_count = tool_calls
|
||||||
.iter()
|
.iter()
|
||||||
.any(|m| m.role == "tool" && m.tool_call_id.as_deref() == Some(tc_id))
|
.filter(|tc| !resolved_ids.contains(&tc.id))
|
||||||
})
|
.count();
|
||||||
.count();
|
|
||||||
|
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
tool_call_count,
|
tool_call_count = tool_calls.len(),
|
||||||
missing_tool_results = missing_count,
|
missing_tool_results = missing_count,
|
||||||
message_id = %messages[last_assistant_idx].id,
|
message_id = %msg.id,
|
||||||
"Removing assistant message with incomplete tool call sequence — \
|
message_index = i,
|
||||||
tool results were never persisted (likely due to process interruption)"
|
"Removing assistant message with incomplete tool call sequence — \
|
||||||
);
|
tool results were never persisted (likely due to process interruption \
|
||||||
|
or history compaction preserving an orphan)"
|
||||||
|
);
|
||||||
|
|
||||||
messages.remove(last_assistant_idx);
|
remove_indices.push(i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove in descending index order to avoid shifting
|
||||||
|
for &idx in &remove_indices {
|
||||||
|
messages.remove(idx);
|
||||||
removed += 1;
|
removed += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Phase 2: Remove orphaned trailing tool messages that no longer have a
|
// Phase 2: Forward pass to remove ALL orphaned tool messages (not just
|
||||||
// corresponding assistant tool_calls message before them. These are left
|
// trailing ones). A tool message is orphaned if its tool_call_id has no
|
||||||
// over from Phase 1 removals.
|
// matching parent assistant remaining in the history.
|
||||||
//
|
if !with_parent.is_empty() || !resolved_ids.is_empty() {
|
||||||
// We work backwards: a tool message at the end of the sequence is
|
let mut i = 0;
|
||||||
// orphaned if none of the preceding messages is an assistant with a
|
while i < messages.len() {
|
||||||
// matching tool_call in its tool_calls.
|
let msg = &messages[i];
|
||||||
while let Some(last_idx) = messages.last().map(|_| messages.len() - 1) {
|
if msg.role == "tool" {
|
||||||
let last = &messages[last_idx];
|
let is_orphaned = match &msg.tool_call_id {
|
||||||
if last.role != "tool" {
|
Some(tc_id) => !with_parent.contains(tc_id),
|
||||||
break;
|
None => true,
|
||||||
|
};
|
||||||
|
if is_orphaned {
|
||||||
|
tracing::warn!(
|
||||||
|
tool_call_id = ?msg.tool_call_id,
|
||||||
|
message_id = %msg.id,
|
||||||
|
message_index = i,
|
||||||
|
"Removing orphaned tool result message — its parent assistant \
|
||||||
|
tool_calls message was removed or never persisted"
|
||||||
|
);
|
||||||
|
messages.remove(i);
|
||||||
|
removed += 1;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
i += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
let tool_id = match &last.tool_call_id {
|
|
||||||
Some(id) => id.as_str(),
|
|
||||||
None => break, // tool message without tool_call_id — shouldn't happen, but safe
|
|
||||||
};
|
|
||||||
|
|
||||||
// Check if any preceding assistant message has this tool_call_id in
|
|
||||||
// its tool_calls
|
|
||||||
let has_parent = messages[..last_idx].iter().any(|m| {
|
|
||||||
m.role == "assistant"
|
|
||||||
&& m.tool_calls
|
|
||||||
.as_ref()
|
|
||||||
.map_or(false, |calls| {
|
|
||||||
calls.iter().any(|tc| tc.id == tool_id)
|
|
||||||
})
|
|
||||||
});
|
|
||||||
|
|
||||||
if has_parent {
|
|
||||||
break; // This tool message has a valid parent, stop
|
|
||||||
}
|
|
||||||
|
|
||||||
tracing::warn!(
|
|
||||||
tool_call_id = %tool_id,
|
|
||||||
message_id = %last.id,
|
|
||||||
"Removing orphaned tool result message — its parent assistant \
|
|
||||||
tool_calls message was removed or never persisted"
|
|
||||||
);
|
|
||||||
|
|
||||||
messages.remove(last_idx);
|
|
||||||
removed += 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
removed
|
removed
|
||||||
@ -990,6 +1003,25 @@ impl AgentLoop {
|
|||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
tracing::debug!(iteration, "Agent iteration started");
|
tracing::debug!(iteration, "Agent iteration started");
|
||||||
|
|
||||||
|
// 检查取消信号
|
||||||
|
if let Some(ref token) = self.cancel_token {
|
||||||
|
if token.has_changed().unwrap_or(false) {
|
||||||
|
tracing::info!(iteration, "Agent execution cancelled by user");
|
||||||
|
let cancel_message = format!(
|
||||||
|
"\n\n[用户已取消执行。已迭代 {} 次,取消前共生成了 {} 条消息。]",
|
||||||
|
iteration,
|
||||||
|
emitted_messages.len()
|
||||||
|
);
|
||||||
|
let assistant_message = ChatMessage::assistant(cancel_message);
|
||||||
|
emitted_messages.push(assistant_message.clone());
|
||||||
|
self.emit_live_tool_call_message(assistant_message.clone()).await;
|
||||||
|
return Ok(AgentProcessResult {
|
||||||
|
final_response: assistant_message,
|
||||||
|
emitted_messages,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Build request
|
// Build request
|
||||||
let tool_defs = self.tools.get_definitions();
|
let tool_defs = self.tools.get_definitions();
|
||||||
let tools = if tool_defs.is_empty() {
|
let tools = if tool_defs.is_empty() {
|
||||||
@ -2096,6 +2128,190 @@ mod tests {
|
|||||||
assert_eq!(messages[3].content, "the answer is 2");
|
assert_eq!(messages[3].content, "the answer is 2");
|
||||||
assert_eq!(messages[4].content, "second question");
|
assert_eq!(messages[4].content, "second question");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_sanitize_removes_mid_history_orphaned_tool_calls() {
|
||||||
|
// Bug scenario: orphaned tool_calls in the MIDDLE of history,
|
||||||
|
// followed by a complete sequence. The old trailing-only sanitizer
|
||||||
|
// would stop at the complete sequence and never remove the orphan.
|
||||||
|
let mut messages = vec![
|
||||||
|
ChatMessage::user("first question"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"orphaned tool call",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "call_1".to_string(),
|
||||||
|
name: "calculator".to_string(),
|
||||||
|
arguments: serde_json::json!({"expression": "1+1"}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
// Missing tool result for call_1 — ORPHAN in the middle
|
||||||
|
ChatMessage::user("second question"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"valid tool call",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "call_2".to_string(),
|
||||||
|
name: "read".to_string(),
|
||||||
|
arguments: serde_json::json!({"path": "README.md"}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
ChatMessage::tool("call_2", "read", "file contents"),
|
||||||
|
];
|
||||||
|
|
||||||
|
let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages);
|
||||||
|
// call_1 assistant removed (1), rest preserved (4)
|
||||||
|
assert_eq!(removed, 1);
|
||||||
|
assert_eq!(messages.len(), 4);
|
||||||
|
assert_eq!(messages[0].role, "user");
|
||||||
|
assert_eq!(messages[0].content, "first question");
|
||||||
|
assert_eq!(messages[1].role, "user");
|
||||||
|
assert_eq!(messages[1].content, "second question");
|
||||||
|
// The complete call_2 sequence is preserved
|
||||||
|
assert_eq!(messages[2].role, "assistant");
|
||||||
|
assert_eq!(messages[3].role, "tool");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_sanitize_removes_multiple_mid_history_orphans() {
|
||||||
|
// Multiple orphaned tool_calls scattered throughout history
|
||||||
|
let mut messages = vec![
|
||||||
|
ChatMessage::user("first"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"orphan 1",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "orphan_1".to_string(),
|
||||||
|
name: "tool_a".to_string(),
|
||||||
|
arguments: serde_json::json!({}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
// Missing result for orphan_1
|
||||||
|
ChatMessage::user("second"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"orphan 2",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "orphan_2".to_string(),
|
||||||
|
name: "tool_b".to_string(),
|
||||||
|
arguments: serde_json::json!({}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
// Missing result for orphan_2
|
||||||
|
ChatMessage::user("third"),
|
||||||
|
];
|
||||||
|
|
||||||
|
let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages);
|
||||||
|
assert_eq!(removed, 2);
|
||||||
|
assert_eq!(messages.len(), 3);
|
||||||
|
assert_eq!(messages[0].content, "first");
|
||||||
|
assert_eq!(messages[1].content, "second");
|
||||||
|
assert_eq!(messages[2].content, "third");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_sanitize_removes_orphaned_tool_results_for_removed_mid_assistant() {
|
||||||
|
// When removing a mid-history assistant with partial tool results,
|
||||||
|
// both the assistant AND its orphaned tool results must be removed.
|
||||||
|
// Assistant has 2 tool calls, only 1 has a result → assistant is
|
||||||
|
// incomplete → both assistant and its lone tool result removed.
|
||||||
|
let mut messages = vec![
|
||||||
|
ChatMessage::user("first question"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"two tool calls, only one has result",
|
||||||
|
vec![
|
||||||
|
ToolCall {
|
||||||
|
id: "call_has_result".to_string(),
|
||||||
|
name: "tool_a".to_string(),
|
||||||
|
arguments: serde_json::json!({}),
|
||||||
|
},
|
||||||
|
ToolCall {
|
||||||
|
id: "call_no_result".to_string(),
|
||||||
|
name: "tool_b".to_string(),
|
||||||
|
arguments: serde_json::json!({}),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
ChatMessage::tool("call_has_result", "tool_a", "some result"),
|
||||||
|
// Missing tool result for call_no_result → incomplete sequence
|
||||||
|
ChatMessage::user("second question"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"valid tool call",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "call_valid".to_string(),
|
||||||
|
name: "good_tool".to_string(),
|
||||||
|
arguments: serde_json::json!({}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
ChatMessage::tool("call_valid", "good_tool", "valid result"),
|
||||||
|
];
|
||||||
|
|
||||||
|
let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages);
|
||||||
|
// call_has_result/call_no_result assistant (1) + its orphaned tool result (1) = 2 removed
|
||||||
|
assert_eq!(removed, 2);
|
||||||
|
assert_eq!(messages.len(), 4);
|
||||||
|
assert_eq!(messages[0].content, "first question");
|
||||||
|
assert_eq!(messages[1].content, "second question");
|
||||||
|
assert_eq!(messages[2].role, "assistant");
|
||||||
|
assert_eq!(messages[3].role, "tool");
|
||||||
|
// Verify the remaining tool belongs to call_valid
|
||||||
|
assert_eq!(messages[3].tool_call_id.as_deref(), Some("call_valid"));
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_sanitize_handles_complex_interleaved_history() {
|
||||||
|
// Complete → Orphaned → Complete: a realistic scenario after
|
||||||
|
// history compaction
|
||||||
|
let mut messages = vec![
|
||||||
|
ChatMessage::user("task 1"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"doing task 1",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "t1_call".to_string(),
|
||||||
|
name: "read".to_string(),
|
||||||
|
arguments: serde_json::json!({"path": "a.txt"}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
ChatMessage::tool("t1_call", "read", "content A"),
|
||||||
|
ChatMessage::assistant("task 1 is done"),
|
||||||
|
// End of task 1 — complete sequence
|
||||||
|
|
||||||
|
ChatMessage::user("task 2"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"doing task 2 — this got interrupted",
|
||||||
|
vec![
|
||||||
|
ToolCall {
|
||||||
|
id: "t2_call_1".to_string(),
|
||||||
|
name: "write".to_string(),
|
||||||
|
arguments: serde_json::json!({"path": "b.txt"}),
|
||||||
|
},
|
||||||
|
ToolCall {
|
||||||
|
id: "t2_call_2".to_string(),
|
||||||
|
name: "calculator".to_string(),
|
||||||
|
arguments: serde_json::json!({"expression": "2+2"}),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
),
|
||||||
|
// Missing BOTH tool results — process was killed here
|
||||||
|
// End of task 2 — orphaned sequence in the middle
|
||||||
|
|
||||||
|
ChatMessage::user("task 3"),
|
||||||
|
ChatMessage::assistant_with_tool_calls(
|
||||||
|
"doing task 3",
|
||||||
|
vec![ToolCall {
|
||||||
|
id: "t3_call".to_string(),
|
||||||
|
name: "search".to_string(),
|
||||||
|
arguments: serde_json::json!({"query": "hello"}),
|
||||||
|
}],
|
||||||
|
),
|
||||||
|
ChatMessage::tool("t3_call", "search", "found results"),
|
||||||
|
ChatMessage::assistant("task 3 is done"),
|
||||||
|
];
|
||||||
|
|
||||||
|
let removed = AgentLoop::sanitize_incomplete_tool_call_sequences(&mut messages);
|
||||||
|
// Removed: assistant with t2_call_1/t2_call_2 (1 message)
|
||||||
|
assert_eq!(removed, 1);
|
||||||
|
// Original 10 messages - 1 = 9
|
||||||
|
assert_eq!(messages.len(), 9);
|
||||||
|
assert_eq!(messages[4].content, "task 2");
|
||||||
|
assert_eq!(messages[5].content, "task 3");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
|||||||
@ -1931,15 +1931,24 @@ impl FeishuChannel {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Restore all code block placeholders in the given text.
|
||||||
|
fn restore_code_blocks(text: &str, code_blocks: &[String]) -> String {
|
||||||
|
let mut result = text.to_string();
|
||||||
|
for (i, cb) in code_blocks.iter().enumerate() {
|
||||||
|
result = result.replace(&format!("\x00CODE{}\x00", i), cb);
|
||||||
|
}
|
||||||
|
result
|
||||||
|
}
|
||||||
|
|
||||||
let mut elements: Vec<serde_json::Value> = Vec::new();
|
let mut elements: Vec<serde_json::Value> = Vec::new();
|
||||||
let mut last_end = 0;
|
let mut last_end = 0;
|
||||||
|
|
||||||
for m in patterns.heading_re.find_iter(&protected) {
|
for m in patterns.heading_re.find_iter(&protected) {
|
||||||
let before = &protected[last_end..m.start()].trim();
|
let before = protected[last_end..m.start()].trim();
|
||||||
if !before.is_empty() {
|
if !before.is_empty() {
|
||||||
elements.push(serde_json::json!({
|
elements.push(serde_json::json!({
|
||||||
"tag": "markdown",
|
"tag": "markdown",
|
||||||
"content": before
|
"content": restore_code_blocks(before, &code_blocks)
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1963,14 +1972,9 @@ impl FeishuChannel {
|
|||||||
|
|
||||||
let remaining = protected[last_end..].trim();
|
let remaining = protected[last_end..].trim();
|
||||||
if !remaining.is_empty() {
|
if !remaining.is_empty() {
|
||||||
// Restore code blocks
|
|
||||||
let mut final_content = remaining.to_string();
|
|
||||||
for (i, cb) in code_blocks.iter().enumerate() {
|
|
||||||
final_content = final_content.replace(&format!("\x00CODE{}\x00", i), cb);
|
|
||||||
}
|
|
||||||
elements.push(serde_json::json!({
|
elements.push(serde_json::json!({
|
||||||
"tag": "markdown",
|
"tag": "markdown",
|
||||||
"content": final_content
|
"content": restore_code_blocks(remaining, &code_blocks)
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -133,6 +133,11 @@ impl InputAdapter for ChannelInputAdapter {
|
|||||||
return Ok(Some(Command::GetCurrentSession));
|
return Ok(Some(Command::GetCurrentSession));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 解析 /stop 命令 - 停止当前执行的 Agent
|
||||||
|
if trimmed == "/stop" {
|
||||||
|
return Ok(Some(Command::StopExecution));
|
||||||
|
}
|
||||||
|
|
||||||
// 解析 /help 命令 - 显示所有支持的命令
|
// 解析 /help 命令 - 显示所有支持的命令
|
||||||
if trimmed == "/help" {
|
if trimmed == "/help" {
|
||||||
return Ok(Some(Command::Help));
|
return Ok(Some(Command::Help));
|
||||||
|
|||||||
@ -134,6 +134,11 @@ impl InputAdapter for CliInputAdapter {
|
|||||||
return Ok(Some(Command::GetCurrentSession));
|
return Ok(Some(Command::GetCurrentSession));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 解析 /stop 命令 - 停止当前执行的 Agent
|
||||||
|
if trimmed == "/stop" {
|
||||||
|
return Ok(Some(Command::StopExecution));
|
||||||
|
}
|
||||||
|
|
||||||
// 解析 /help 命令 - 显示所有支持的命令
|
// 解析 /help 命令 - 显示所有支持的命令
|
||||||
if trimmed == "/help" {
|
if trimmed == "/help" {
|
||||||
return Ok(Some(Command::Help));
|
return Ok(Some(Command::Help));
|
||||||
|
|||||||
@ -11,6 +11,7 @@ pub mod load_topic;
|
|||||||
pub mod save_session;
|
pub mod save_session;
|
||||||
pub mod save_topic;
|
pub mod save_topic;
|
||||||
pub mod session;
|
pub mod session;
|
||||||
|
pub mod stop_execution;
|
||||||
pub mod switch_topic;
|
pub mod switch_topic;
|
||||||
|
|
||||||
// 导出公共函数供其他模块复用
|
// 导出公共函数供其他模块复用
|
||||||
|
|||||||
57
src/command/handlers/stop_execution.rs
Normal file
57
src/command/handlers/stop_execution.rs
Normal file
@ -0,0 +1,57 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
use crate::command::context::CommandContext;
|
||||||
|
use crate::command::handler::{CommandHandler, CommandMetadata};
|
||||||
|
use crate::command::response::{CommandError, CommandResponse, MessageKind};
|
||||||
|
use crate::command::Command;
|
||||||
|
use crate::gateway::cancel_manager::CancelManager;
|
||||||
|
|
||||||
|
/// 处理 StopExecution 命令:按话题取消当前正在执行的 Agent。
|
||||||
|
pub struct StopExecutionCommandHandler {
|
||||||
|
cancel_manager: CancelManager,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StopExecutionCommandHandler {
|
||||||
|
pub fn new(cancel_manager: CancelManager) -> Self {
|
||||||
|
Self { cancel_manager }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl CommandHandler for StopExecutionCommandHandler {
|
||||||
|
fn can_handle(&self, cmd: &Command) -> bool {
|
||||||
|
matches!(cmd, Command::StopExecution)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn metadata(&self) -> Option<CommandMetadata> {
|
||||||
|
Some(CommandMetadata {
|
||||||
|
name: "stop",
|
||||||
|
description: "停止当前话题正在执行的 Agent",
|
||||||
|
usage: "/stop",
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
&self,
|
||||||
|
_cmd: Command,
|
||||||
|
ctx: CommandContext,
|
||||||
|
) -> Result<CommandResponse, CommandError> {
|
||||||
|
let topic_id = match ctx.topic_id.as_deref() {
|
||||||
|
Some(id) => id,
|
||||||
|
None => {
|
||||||
|
return Ok(CommandResponse::success(ctx.request_id)
|
||||||
|
.with_message(MessageKind::Notification, "当前没有活跃的话题,无法停止"));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let cancelled = self.cancel_manager.cancel_by_topic(topic_id).await;
|
||||||
|
|
||||||
|
if cancelled {
|
||||||
|
Ok(CommandResponse::success(ctx.request_id)
|
||||||
|
.with_message(MessageKind::Notification, "正在停止当前任务..."))
|
||||||
|
} else {
|
||||||
|
Ok(CommandResponse::success(ctx.request_id)
|
||||||
|
.with_message(MessageKind::Notification, "当前没有正在执行的任务"))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -52,6 +52,8 @@ pub enum Command {
|
|||||||
channel: String,
|
channel: String,
|
||||||
chat_id: String,
|
chat_id: String,
|
||||||
},
|
},
|
||||||
|
/// 停止当前正在执行的 Agent
|
||||||
|
StopExecution,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Command {
|
impl Command {
|
||||||
@ -72,6 +74,7 @@ impl Command {
|
|||||||
Command::LoadTaskMessages { .. } => "load_task_messages",
|
Command::LoadTaskMessages { .. } => "load_task_messages",
|
||||||
Command::ListSchedulerJobs => "list_scheduler_jobs",
|
Command::ListSchedulerJobs => "list_scheduler_jobs",
|
||||||
Command::LoadChatMessages { .. } => "load_chat_messages",
|
Command::LoadChatMessages { .. } => "load_chat_messages",
|
||||||
|
Command::StopExecution => "stop_execution",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -23,6 +23,8 @@ pub(crate) struct AgentBuildRequest<'a> {
|
|||||||
pub(crate) sender_id: Option<&'a str>,
|
pub(crate) sender_id: Option<&'a str>,
|
||||||
pub(crate) message_id: Option<&'a str>,
|
pub(crate) message_id: Option<&'a str>,
|
||||||
pub(crate) provider_config: LLMProviderConfig,
|
pub(crate) provider_config: LLMProviderConfig,
|
||||||
|
/// 取消信号接收端(可选):Agent 在每次迭代时检查是否被取消
|
||||||
|
pub(crate) cancel_token: Option<tokio::sync::watch::Receiver<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AgentFactory {
|
impl AgentFactory {
|
||||||
@ -64,7 +66,7 @@ impl AgentFactory {
|
|||||||
let tool_chat_id = request
|
let tool_chat_id = request
|
||||||
.notification_chat_id
|
.notification_chat_id
|
||||||
.unwrap_or(request.session_chat_id);
|
.unwrap_or(request.session_chat_id);
|
||||||
agent.with_tool_context(ToolContext {
|
let mut agent = agent.with_tool_context(ToolContext {
|
||||||
channel_name: Some(request.channel_name.to_string()),
|
channel_name: Some(request.channel_name.to_string()),
|
||||||
sender_id: request.sender_id.map(str::to_string),
|
sender_id: request.sender_id.map(str::to_string),
|
||||||
chat_id: Some(tool_chat_id.to_string()),
|
chat_id: Some(tool_chat_id.to_string()),
|
||||||
@ -73,7 +75,12 @@ impl AgentFactory {
|
|||||||
message_id: request.message_id.map(str::to_string),
|
message_id: request.message_id.map(str::to_string),
|
||||||
message_seq: None,
|
message_seq: None,
|
||||||
subagent_description: None,
|
subagent_description: None,
|
||||||
})
|
});
|
||||||
|
// 如果有取消信号接收端,注入 Agent
|
||||||
|
if let Some(token) = request.cancel_token {
|
||||||
|
agent = agent.with_cancel_token(token);
|
||||||
|
}
|
||||||
|
agent
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
61
src/gateway/cancel_manager.rs
Normal file
61
src/gateway/cancel_manager.rs
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::{Mutex, watch};
|
||||||
|
|
||||||
|
/// 共享的 Agent 取消注册表。
|
||||||
|
///
|
||||||
|
/// 每个正在执行的 Agent 在启动前按 topic_id 注册一个 watch::Sender,
|
||||||
|
/// 外部(如 /stop 命令)通过 cancel_by_topic() 发送取消信号。
|
||||||
|
/// Agent 循环内部通过 watch::Receiver::has_changed() 检测取消。
|
||||||
|
///
|
||||||
|
/// key 使用 topic_id(UUID),全局唯一,精确到话题级别。
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub struct CancelManager {
|
||||||
|
tokens: Arc<Mutex<HashMap<String, watch::Sender<()>>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CancelManager {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
tokens: Arc::new(Mutex::new(HashMap::new())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 按 topic_id 注册一个取消通道,返回 receiver 供 Agent 持有。
|
||||||
|
///
|
||||||
|
/// 如果同 topic_id 已有注册,旧 sender 被覆盖并 drop,
|
||||||
|
/// 旧 receiver 将收到通道关闭信号。
|
||||||
|
pub async fn register(&self, topic_id: &str) -> watch::Receiver<()> {
|
||||||
|
let (tx, rx) = watch::channel(());
|
||||||
|
self.tokens.lock().await.insert(topic_id.to_string(), tx);
|
||||||
|
rx
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 按 topic_id 发送取消信号并移除注册条目。
|
||||||
|
///
|
||||||
|
/// 返回 `true` 表示找到了对应的任务并发送了取消信号,
|
||||||
|
/// 返回 `false` 表示没有找到对应的任务(可能已经完成或从未注册)。
|
||||||
|
pub async fn cancel_by_topic(&self, topic_id: &str) -> bool {
|
||||||
|
if let Some(tx) = self.tokens.lock().await.remove(topic_id) {
|
||||||
|
// send 可能失败(receiver 已被 drop),这不影响语义
|
||||||
|
let _ = tx.send(());
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// 按 topic_id 正常完成后清理注册条目(幂等)。
|
||||||
|
///
|
||||||
|
/// 与 cancel_by_topic() 不同,此方法不发送取消信号,仅移除条目。
|
||||||
|
/// 如果条目已被 cancel_by_topic() 移除,此调用为 no-op。
|
||||||
|
pub async fn remove_by_topic(&self, topic_id: &str) {
|
||||||
|
self.tokens.lock().await.remove(topic_id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for CancelManager {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self::new()
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1,6 +1,7 @@
|
|||||||
pub mod agent_factory;
|
pub mod agent_factory;
|
||||||
pub mod agent_prompt_provider;
|
pub mod agent_prompt_provider;
|
||||||
pub mod agent_task_executor;
|
pub mod agent_task_executor;
|
||||||
|
pub mod cancel_manager;
|
||||||
pub mod cli_session;
|
pub mod cli_session;
|
||||||
pub mod command;
|
pub mod command;
|
||||||
pub mod compaction;
|
pub mod compaction;
|
||||||
@ -42,6 +43,7 @@ use crate::scheduler::Scheduler;
|
|||||||
use crate::skills::SkillRuntime;
|
use crate::skills::SkillRuntime;
|
||||||
use crate::tools::task::repository::TaskRepository;
|
use crate::tools::task::repository::TaskRepository;
|
||||||
use agent_task_executor::{AgentTaskExecutor, SchedulerMaintenanceService};
|
use agent_task_executor::{AgentTaskExecutor, SchedulerMaintenanceService};
|
||||||
|
use cancel_manager::CancelManager;
|
||||||
use outbound_dispatcher::OutboundDispatcher;
|
use outbound_dispatcher::OutboundDispatcher;
|
||||||
use processor::InboundProcessor;
|
use processor::InboundProcessor;
|
||||||
use runtime::build_session_manager_with_sender;
|
use runtime::build_session_manager_with_sender;
|
||||||
@ -55,6 +57,7 @@ pub struct GatewayState {
|
|||||||
pub channel_manager: ChannelManager,
|
pub channel_manager: ChannelManager,
|
||||||
pub bus: Arc<MessageBus>,
|
pub bus: Arc<MessageBus>,
|
||||||
pub task_repository: Arc<dyn TaskRepository>,
|
pub task_repository: Arc<dyn TaskRepository>,
|
||||||
|
pub cancel_manager: CancelManager,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl GatewayState {
|
impl GatewayState {
|
||||||
@ -96,12 +99,15 @@ impl GatewayState {
|
|||||||
Some(bus.clone()),
|
Some(bus.clone()),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
|
let cancel_manager = CancelManager::new();
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
config,
|
config,
|
||||||
session_manager,
|
session_manager,
|
||||||
channel_manager,
|
channel_manager,
|
||||||
bus,
|
bus,
|
||||||
task_repository,
|
task_repository,
|
||||||
|
cancel_manager,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -122,7 +128,7 @@ impl GatewayState {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
let inbound_processor =
|
let inbound_processor =
|
||||||
InboundProcessor::new(self.bus.clone(), self.session_manager.clone(), semaphore, provider_config);
|
InboundProcessor::new(self.bus.clone(), self.session_manager.clone(), semaphore, provider_config, self.cancel_manager.clone());
|
||||||
tokio::spawn(inbound_processor.run());
|
tokio::spawn(inbound_processor.run());
|
||||||
|
|
||||||
// Spawn outbound dispatcher
|
// Spawn outbound dispatcher
|
||||||
|
|||||||
@ -14,9 +14,11 @@ use crate::command::handlers::load_topic::LoadTopicCommandHandler;
|
|||||||
use crate::command::handlers::save_session::SaveSessionCommandHandler;
|
use crate::command::handlers::save_session::SaveSessionCommandHandler;
|
||||||
use crate::command::handlers::save_topic::SaveTopicCommandHandler;
|
use crate::command::handlers::save_topic::SaveTopicCommandHandler;
|
||||||
use crate::command::handlers::session::SessionCommandHandler;
|
use crate::command::handlers::session::SessionCommandHandler;
|
||||||
|
use crate::command::handlers::stop_execution::StopExecutionCommandHandler;
|
||||||
use crate::command::handlers::switch_topic::SwitchTopicCommandHandler;
|
use crate::command::handlers::switch_topic::SwitchTopicCommandHandler;
|
||||||
use crate::config::LLMProviderConfig;
|
use crate::config::LLMProviderConfig;
|
||||||
use crate::gateway::agent_prompt_provider::AgentPromptProvider;
|
use crate::gateway::agent_prompt_provider::AgentPromptProvider;
|
||||||
|
use crate::gateway::cancel_manager::CancelManager;
|
||||||
use crate::providers::{create_provider, ProviderRuntimeConfig};
|
use crate::providers::{create_provider, ProviderRuntimeConfig};
|
||||||
use crate::skills::SkillPromptProvider;
|
use crate::skills::SkillPromptProvider;
|
||||||
use crate::storage::persistent_session_id;
|
use crate::storage::persistent_session_id;
|
||||||
@ -31,6 +33,7 @@ pub struct InboundProcessor {
|
|||||||
semaphore: Arc<Semaphore>,
|
semaphore: Arc<Semaphore>,
|
||||||
provider_config: LLMProviderConfig,
|
provider_config: LLMProviderConfig,
|
||||||
command_router: Arc<CommandRouter>,
|
command_router: Arc<CommandRouter>,
|
||||||
|
cancel_manager: CancelManager,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InboundProcessor {
|
impl InboundProcessor {
|
||||||
@ -39,6 +42,7 @@ impl InboundProcessor {
|
|||||||
session_manager: SessionManager,
|
session_manager: SessionManager,
|
||||||
semaphore: Arc<Semaphore>,
|
semaphore: Arc<Semaphore>,
|
||||||
provider_config: LLMProviderConfig,
|
provider_config: LLMProviderConfig,
|
||||||
|
cancel_manager: CancelManager,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
// 创建命令路由器并注册处理器
|
// 创建命令路由器并注册处理器
|
||||||
let mut command_router = CommandRouter::new();
|
let mut command_router = CommandRouter::new();
|
||||||
@ -97,12 +101,18 @@ impl InboundProcessor {
|
|||||||
let metadata = command_router.metadata_arc();
|
let metadata = command_router.metadata_arc();
|
||||||
command_router.register(Box::new(HelpCommandHandler::new(metadata)));
|
command_router.register(Box::new(HelpCommandHandler::new(metadata)));
|
||||||
|
|
||||||
|
// 注册 stop_execution 处理器
|
||||||
|
command_router.register(Box::new(StopExecutionCommandHandler::new(
|
||||||
|
cancel_manager.clone(),
|
||||||
|
)));
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
bus,
|
bus,
|
||||||
session_manager,
|
session_manager,
|
||||||
semaphore,
|
semaphore,
|
||||||
provider_config,
|
provider_config,
|
||||||
command_router: Arc::new(command_router),
|
command_router: Arc::new(command_router),
|
||||||
|
cancel_manager,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -236,6 +246,18 @@ impl InboundProcessor {
|
|||||||
current_topic.clone(),
|
current_topic.clone(),
|
||||||
));
|
));
|
||||||
|
|
||||||
|
// 保存 channel 和 chat_id 用于后续清理(因 match 中可能 move inbound)
|
||||||
|
let channel = inbound.channel.clone();
|
||||||
|
let chat_id = inbound.chat_id.clone();
|
||||||
|
|
||||||
|
// 按 topic_id 注册取消信号:Agent 构建时通过 Session 消费该 receiver
|
||||||
|
if let Some(ref topic_id) = current_topic {
|
||||||
|
let cancel_rx = self.cancel_manager.register(topic_id).await;
|
||||||
|
self.session_manager
|
||||||
|
.set_agent_cancel_token(&channel, &chat_id, cancel_rx)
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
|
||||||
match self
|
match self
|
||||||
.session_manager
|
.session_manager
|
||||||
.handle_message(
|
.handle_message(
|
||||||
@ -308,6 +330,11 @@ impl InboundProcessor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 清理取消信号注册(幂等:如果已被 cancel_by_topic() 移除则为 no-op)
|
||||||
|
if let Some(ref topic_id) = current_topic {
|
||||||
|
self.cancel_manager.remove_by_topic(topic_id).await;
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -45,6 +45,9 @@ pub struct Session {
|
|||||||
compressor: ContextCompressor,
|
compressor: ContextCompressor,
|
||||||
history: SessionHistory,
|
history: SessionHistory,
|
||||||
store: Arc<SessionStore>,
|
store: Arc<SessionStore>,
|
||||||
|
/// 等待中的取消信号接收端(按 chat_id 索引)。
|
||||||
|
/// 在 Agent 执行前由外部注入,Agent 构建时消费。
|
||||||
|
pending_cancel_tokens: HashMap<String, tokio::sync::watch::Receiver<()>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct BusToolCallEmitter {
|
pub struct BusToolCallEmitter {
|
||||||
@ -163,6 +166,7 @@ impl Session {
|
|||||||
skill_events,
|
skill_events,
|
||||||
),
|
),
|
||||||
store,
|
store,
|
||||||
|
pending_cancel_tokens: HashMap::new(),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -179,6 +183,19 @@ impl Session {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 存入待使用的取消信号接收端。
|
||||||
|
///
|
||||||
|
/// 在 Agent 执行前由处理器调用,Agent 构建时(create_agent)自动消费。
|
||||||
|
/// 每个 chat_id 同时只允许一个 pending token;新 token 会替换旧 token。
|
||||||
|
pub fn set_cancel_receiver(
|
||||||
|
&mut self,
|
||||||
|
chat_id: &str,
|
||||||
|
receiver: tokio::sync::watch::Receiver<()>,
|
||||||
|
) {
|
||||||
|
self.pending_cancel_tokens
|
||||||
|
.insert(chat_id.to_string(), receiver);
|
||||||
|
}
|
||||||
|
|
||||||
/// 获取当前话题 ID(指定 chat)
|
/// 获取当前话题 ID(指定 chat)
|
||||||
pub fn current_topic(&self, chat_id: &str) -> Option<&str> {
|
pub fn current_topic(&self, chat_id: &str) -> Option<&str> {
|
||||||
self.history.chat_topic(chat_id)
|
self.history.chat_topic(chat_id)
|
||||||
@ -420,7 +437,7 @@ impl Session {
|
|||||||
|
|
||||||
/// 创建一个临时的 AgentLoop 实例来处理消息
|
/// 创建一个临时的 AgentLoop 实例来处理消息
|
||||||
pub fn create_agent(
|
pub fn create_agent(
|
||||||
&self,
|
&mut self,
|
||||||
chat_id: &str,
|
chat_id: &str,
|
||||||
sender_id: Option<&str>,
|
sender_id: Option<&str>,
|
||||||
message_id: Option<&str>,
|
message_id: Option<&str>,
|
||||||
@ -435,13 +452,15 @@ impl Session {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn create_agent_with_provider_config(
|
pub fn create_agent_with_provider_config(
|
||||||
&self,
|
&mut self,
|
||||||
session_chat_id: &str,
|
session_chat_id: &str,
|
||||||
notification_chat_id: Option<&str>,
|
notification_chat_id: Option<&str>,
|
||||||
sender_id: Option<&str>,
|
sender_id: Option<&str>,
|
||||||
message_id: Option<&str>,
|
message_id: Option<&str>,
|
||||||
provider_config: LLMProviderConfig,
|
provider_config: LLMProviderConfig,
|
||||||
) -> Result<AgentLoop, AgentError> {
|
) -> Result<AgentLoop, AgentError> {
|
||||||
|
// 消费 pending 的取消信号接收端(如果存在)
|
||||||
|
let cancel_token = self.pending_cancel_tokens.remove(session_chat_id);
|
||||||
self.agent_factory.create(AgentBuildRequest {
|
self.agent_factory.create(AgentBuildRequest {
|
||||||
channel_name: &self.channel_name,
|
channel_name: &self.channel_name,
|
||||||
session_chat_id,
|
session_chat_id,
|
||||||
@ -449,6 +468,7 @@ impl Session {
|
|||||||
sender_id,
|
sender_id,
|
||||||
message_id,
|
message_id,
|
||||||
provider_config,
|
provider_config,
|
||||||
|
cancel_token,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -607,6 +627,20 @@ impl SessionManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// 存入 Agent 取消信号接收端,供 Agent 构建时消费。
|
||||||
|
///
|
||||||
|
/// 在 Agent 执行前由处理器调用。Agent 在 create_agent() 时自动取出。
|
||||||
|
pub async fn set_agent_cancel_token(
|
||||||
|
&self,
|
||||||
|
channel_name: &str,
|
||||||
|
chat_id: &str,
|
||||||
|
token: tokio::sync::watch::Receiver<()>,
|
||||||
|
) {
|
||||||
|
if let Some(session) = self.get(channel_name).await {
|
||||||
|
session.lock().await.set_cancel_receiver(chat_id, token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// 更新最后活跃时间
|
/// 更新最后活跃时间
|
||||||
pub async fn touch(&self, channel_name: &str) {
|
pub async fn touch(&self, channel_name: &str) {
|
||||||
self.lifecycle.touch(channel_name).await;
|
self.lifecycle.touch(channel_name).await;
|
||||||
|
|||||||
@ -17,6 +17,7 @@ use crate::command::handlers::load_task_messages::LoadTaskMessagesCommandHandler
|
|||||||
use crate::command::handlers::load_topic::LoadTopicCommandHandler;
|
use crate::command::handlers::load_topic::LoadTopicCommandHandler;
|
||||||
use crate::command::handlers::save_session::SaveSessionCommandHandler;
|
use crate::command::handlers::save_session::SaveSessionCommandHandler;
|
||||||
use crate::command::handlers::session::SessionCommandHandler;
|
use crate::command::handlers::session::SessionCommandHandler;
|
||||||
|
use crate::command::handlers::stop_execution::StopExecutionCommandHandler;
|
||||||
use crate::command::handlers::switch_topic::SwitchTopicCommandHandler;
|
use crate::command::handlers::switch_topic::SwitchTopicCommandHandler;
|
||||||
use crate::gateway::agent_prompt_provider::AgentPromptProvider;
|
use crate::gateway::agent_prompt_provider::AgentPromptProvider;
|
||||||
use crate::protocol::{WsInbound, WsOutbound, MediaSummary, parse_inbound, serialize_outbound};
|
use crate::protocol::{WsInbound, WsOutbound, MediaSummary, parse_inbound, serialize_outbound};
|
||||||
@ -405,6 +406,10 @@ async fn handle_inbound(
|
|||||||
router.register(Box::new(ListSchedulerJobsCommandHandler::new(store.clone())));
|
router.register(Box::new(ListSchedulerJobsCommandHandler::new(store.clone())));
|
||||||
// 注册 load_chat_messages 处理器
|
// 注册 load_chat_messages 处理器
|
||||||
router.register(Box::new(LoadChatMessagesCommandHandler::new()));
|
router.register(Box::new(LoadChatMessagesCommandHandler::new()));
|
||||||
|
// 注册 stop_execution 处理器
|
||||||
|
router.register(Box::new(StopExecutionCommandHandler::new(
|
||||||
|
state.cancel_manager.clone(),
|
||||||
|
)));
|
||||||
|
|
||||||
// 构建命令上下文
|
// 构建命令上下文
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
|
|||||||
@ -187,6 +187,8 @@ pub enum WsOutbound {
|
|||||||
SchedulerJobList {
|
SchedulerJobList {
|
||||||
jobs: Vec<SchedulerJobSummary>,
|
jobs: Vec<SchedulerJobSummary>,
|
||||||
},
|
},
|
||||||
|
#[serde(rename = "execution_cancelled")]
|
||||||
|
ExecutionCancelled { message: String },
|
||||||
#[serde(rename = "pong")]
|
#[serde(rename = "pong")]
|
||||||
Pong,
|
Pong,
|
||||||
}
|
}
|
||||||
|
|||||||
@ -155,6 +155,7 @@ fn test_tool_result_outbound_serialization() {
|
|||||||
content: "工具结果: calculator\n\n2".to_string(),
|
content: "工具结果: calculator\n\n2".to_string(),
|
||||||
role: "tool".to_string(),
|
role: "tool".to_string(),
|
||||||
subagent_task_id: None,
|
subagent_task_id: None,
|
||||||
|
duration_ms: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let json = serde_json::to_string(&msg).unwrap();
|
let json = serde_json::to_string(&msg).unwrap();
|
||||||
|
|||||||
@ -51,6 +51,7 @@ function App() {
|
|||||||
requestTopicList,
|
requestTopicList,
|
||||||
enterSubAgentView,
|
enterSubAgentView,
|
||||||
exitSubAgentView,
|
exitSubAgentView,
|
||||||
|
handleStop,
|
||||||
} = useChat()
|
} = useChat()
|
||||||
|
|
||||||
const { status, sendMessage } = useWebSocket({
|
const { status, sendMessage } = useWebSocket({
|
||||||
@ -127,6 +128,9 @@ function App() {
|
|||||||
case 'save':
|
case 'save':
|
||||||
cmd = { type: 'save_topic', filepath: args[0] || undefined, include_subagents: false }
|
cmd = { type: 'save_topic', filepath: args[0] || undefined, include_subagents: false }
|
||||||
break
|
break
|
||||||
|
case 'stop':
|
||||||
|
cmd = { type: 'stop_execution' }
|
||||||
|
break
|
||||||
default:
|
default:
|
||||||
alert(`Unknown command: /${command}`)
|
alert(`Unknown command: /${command}`)
|
||||||
return
|
return
|
||||||
@ -147,6 +151,12 @@ function App() {
|
|||||||
[sendMessage, handleMessage, handleCommand, sessionId, chatId, isReadOnly]
|
[sendMessage, handleMessage, handleCommand, sessionId, chatId, isReadOnly]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const handleStopExecution = useCallback(() => {
|
||||||
|
const cmd = handleStop()
|
||||||
|
handleCommand(cmd)
|
||||||
|
sendMessage({ type: 'command', payload: JSON.stringify(cmd) })
|
||||||
|
}, [sendMessage, handleCommand, handleStop])
|
||||||
|
|
||||||
const handleCreateTopic = useCallback(() => {
|
const handleCreateTopic = useCallback(() => {
|
||||||
if (isReadOnly || !sessionId) {
|
if (isReadOnly || !sessionId) {
|
||||||
return
|
return
|
||||||
@ -415,6 +425,7 @@ function App() {
|
|||||||
}
|
}
|
||||||
onSendMessage={subAgentView || schedulerView ? () => {} : handleSendMessage}
|
onSendMessage={subAgentView || schedulerView ? () => {} : handleSendMessage}
|
||||||
onNavigateToSubAgent={handleNavigateToSubAgent}
|
onNavigateToSubAgent={handleNavigateToSubAgent}
|
||||||
|
onStop={handleStopExecution}
|
||||||
/>
|
/>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|||||||
@ -9,6 +9,7 @@ interface ChatContainerProps {
|
|||||||
channelName?: string
|
channelName?: string
|
||||||
onSendMessage: (content: string, attachments: Attachment[]) => void
|
onSendMessage: (content: string, attachments: Attachment[]) => void
|
||||||
onNavigateToSubAgent?: (taskId: string, description: string) => void
|
onNavigateToSubAgent?: (taskId: string, description: string) => void
|
||||||
|
onStop?: () => void
|
||||||
}
|
}
|
||||||
|
|
||||||
export function ChatContainer({
|
export function ChatContainer({
|
||||||
@ -18,6 +19,7 @@ export function ChatContainer({
|
|||||||
channelName,
|
channelName,
|
||||||
onSendMessage,
|
onSendMessage,
|
||||||
onNavigateToSubAgent,
|
onNavigateToSubAgent,
|
||||||
|
onStop,
|
||||||
}: ChatContainerProps) {
|
}: ChatContainerProps) {
|
||||||
return (
|
return (
|
||||||
<div className="flex h-full flex-col">
|
<div className="flex h-full flex-col">
|
||||||
@ -26,6 +28,7 @@ export function ChatContainer({
|
|||||||
</div>
|
</div>
|
||||||
<MessageInput
|
<MessageInput
|
||||||
onSend={onSendMessage}
|
onSend={onSendMessage}
|
||||||
|
onStop={onStop}
|
||||||
disabled={isLoading}
|
disabled={isLoading}
|
||||||
isLoading={isLoading}
|
isLoading={isLoading}
|
||||||
isReadOnly={isReadOnly}
|
isReadOnly={isReadOnly}
|
||||||
|
|||||||
@ -207,7 +207,7 @@ export function MessageBubble({ message, onNavigateToSubAgent }: MessageBubblePr
|
|||||||
const displayContent = hasResult ? stripToolResultPrefix(message.resultContent!) : ''
|
const displayContent = hasResult ? stripToolResultPrefix(message.resultContent!) : ''
|
||||||
|
|
||||||
const isTaskTool = message.toolName === 'task'
|
const isTaskTool = message.toolName === 'task'
|
||||||
const taskResult = isTaskTool && hasResult ? parseTaskResult(message.resultContent!) : null
|
const taskResult = isTaskTool && hasResult ? parseTaskResult(displayContent) : null
|
||||||
const isSubAgent = !!message.subagentTaskId
|
const isSubAgent = !!message.subagentTaskId
|
||||||
const subagentType = (message.arguments as Record<string, unknown> | null)?.subagent_type as string || 'general'
|
const subagentType = (message.arguments as Record<string, unknown> | null)?.subagent_type as string || 'general'
|
||||||
const taskDescription = (message.arguments as Record<string, unknown> | null)?.description as string || ''
|
const taskDescription = (message.arguments as Record<string, unknown> | null)?.description as string || ''
|
||||||
|
|||||||
@ -1,4 +1,4 @@
|
|||||||
import { Send, Loader2, Sparkles, Eye, Paperclip, X, FileIcon, ImageIcon, MusicIcon, VideoIcon } from 'lucide-react'
|
import { Send, Loader2, Square, Sparkles, Eye, Paperclip, X, FileIcon, ImageIcon, MusicIcon, VideoIcon } from 'lucide-react'
|
||||||
import { useState, useRef, useEffect } from 'react'
|
import { useState, useRef, useEffect } from 'react'
|
||||||
import type { Attachment } from '../../types/protocol'
|
import type { Attachment } from '../../types/protocol'
|
||||||
|
|
||||||
@ -6,6 +6,7 @@ const MAX_FILE_SIZE = 50 * 1024 * 1024 // 50MB
|
|||||||
|
|
||||||
interface MessageInputProps {
|
interface MessageInputProps {
|
||||||
onSend: (content: string, attachments: Attachment[]) => void
|
onSend: (content: string, attachments: Attachment[]) => void
|
||||||
|
onStop?: () => void
|
||||||
disabled?: boolean
|
disabled?: boolean
|
||||||
isLoading?: boolean
|
isLoading?: boolean
|
||||||
placeholder?: string
|
placeholder?: string
|
||||||
@ -29,6 +30,7 @@ function getMediaType(mimeType: string): string {
|
|||||||
|
|
||||||
export function MessageInput({
|
export function MessageInput({
|
||||||
onSend,
|
onSend,
|
||||||
|
onStop,
|
||||||
disabled = false,
|
disabled = false,
|
||||||
isLoading = false,
|
isLoading = false,
|
||||||
placeholder = '输入消息...按 / 查看命令',
|
placeholder = '输入消息...按 / 查看命令',
|
||||||
@ -366,18 +368,28 @@ export function MessageInput({
|
|||||||
<Sparkles className="absolute right-3 top-1/2 -translate-y-1/2 h-4 w-4 text-zinc-600 pointer-events-none" />
|
<Sparkles className="absolute right-3 top-1/2 -translate-y-1/2 h-4 w-4 text-zinc-600 pointer-events-none" />
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
{/* 发送按钮 */}
|
{/* 发送/停止按钮 */}
|
||||||
<button
|
{isLoading && onStop ? (
|
||||||
onClick={handleSend}
|
<button
|
||||||
disabled={disabled || (!content.trim() && attachments.length === 0)}
|
onClick={onStop}
|
||||||
className="flex h-10 w-10 shrink-0 items-center justify-center rounded-xl bg-gradient-to-r from-[#00f0ff] to-[#3b82f6] text-white shadow-lg shadow-[#00f0ff]/20 hover:shadow-xl hover:shadow-[#00f0ff]/30 hover:scale-105 disabled:opacity-50 disabled:hover:scale-100 disabled:cursor-not-allowed transition-all"
|
title="停止执行"
|
||||||
>
|
className="flex h-10 w-10 shrink-0 items-center justify-center rounded-xl bg-red-500/10 border border-red-500/20 text-red-400 hover:bg-red-500/20 hover:text-red-300 transition-all"
|
||||||
{disabled ? (
|
>
|
||||||
<Loader2 className="h-4 w-4 animate-spin" />
|
<Square className="h-4 w-4" />
|
||||||
) : (
|
</button>
|
||||||
<Send className="h-4 w-4" />
|
) : (
|
||||||
)}
|
<button
|
||||||
</button>
|
onClick={handleSend}
|
||||||
|
disabled={disabled || (!content.trim() && attachments.length === 0)}
|
||||||
|
className="flex h-10 w-10 shrink-0 items-center justify-center rounded-xl bg-gradient-to-r from-[#00f0ff] to-[#3b82f6] text-white shadow-lg shadow-[#00f0ff]/20 hover:shadow-xl hover:shadow-[#00f0ff]/30 hover:scale-105 disabled:opacity-50 disabled:hover:scale-100 disabled:cursor-not-allowed transition-all"
|
||||||
|
>
|
||||||
|
{disabled ? (
|
||||||
|
<Loader2 className="h-4 w-4 animate-spin" />
|
||||||
|
) : (
|
||||||
|
<Send className="h-4 w-4" />
|
||||||
|
)}
|
||||||
|
</button>
|
||||||
|
)}
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
{/* 提示 */}
|
{/* 提示 */}
|
||||||
|
|||||||
@ -72,6 +72,9 @@ interface UseChatReturn {
|
|||||||
schedulerView: SchedulerJobView | null
|
schedulerView: SchedulerJobView | null
|
||||||
enterSchedulerJobView: (lookup: SchedulerJobSessionLookup, jobId: string, description: string) => Command
|
enterSchedulerJobView: (lookup: SchedulerJobSessionLookup, jobId: string, description: string) => Command
|
||||||
exitSchedulerJobView: () => void
|
exitSchedulerJobView: () => void
|
||||||
|
|
||||||
|
// 停止当前 Agent 执行
|
||||||
|
handleStop: () => Command
|
||||||
}
|
}
|
||||||
|
|
||||||
interface SubAgentView {
|
interface SubAgentView {
|
||||||
@ -257,8 +260,8 @@ export function useChat(): UseChatReturn {
|
|||||||
const msgSubagentTaskId = getSubagentTaskId(message)
|
const msgSubagentTaskId = getSubagentTaskId(message)
|
||||||
if (msgSubagentTaskId && msgSubagentTaskId === currentSubAgentView.taskId) {
|
if (msgSubagentTaskId && msgSubagentTaskId === currentSubAgentView.taskId) {
|
||||||
appendToSubAgentViewMessage(message)
|
appendToSubAgentViewMessage(message)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
return
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// In main view, skip sub-agent messages (they belong to sub-agent view).
|
// In main view, skip sub-agent messages (they belong to sub-agent view).
|
||||||
@ -410,6 +413,21 @@ export function useChat(): UseChatReturn {
|
|||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case 'execution_cancelled': {
|
||||||
|
setMessages((prev) => [
|
||||||
|
...prev,
|
||||||
|
{
|
||||||
|
id: generateMessageId(),
|
||||||
|
role: 'assistant',
|
||||||
|
content: (message as { type: 'execution_cancelled'; message: string }).message,
|
||||||
|
timestamp: Date.now(),
|
||||||
|
type: 'message',
|
||||||
|
},
|
||||||
|
])
|
||||||
|
setIsLoading(false)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
|
||||||
case 'error': {
|
case 'error': {
|
||||||
setMessages((prev) => [
|
setMessages((prev) => [
|
||||||
...prev,
|
...prev,
|
||||||
@ -568,6 +586,10 @@ export function useChat(): UseChatReturn {
|
|||||||
setSchedulerView(null)
|
setSchedulerView(null)
|
||||||
}, [])
|
}, [])
|
||||||
|
|
||||||
|
const handleStop = useCallback((): Command => {
|
||||||
|
return { type: 'stop_execution' }
|
||||||
|
}, [])
|
||||||
|
|
||||||
// Memoize messages: sub-agent view > scheduler view > main
|
// Memoize messages: sub-agent view > scheduler view > main
|
||||||
const resolvedMessages = useMemo(() => {
|
const resolvedMessages = useMemo(() => {
|
||||||
if (subAgentView) {
|
if (subAgentView) {
|
||||||
@ -612,5 +634,6 @@ export function useChat(): UseChatReturn {
|
|||||||
schedulerView,
|
schedulerView,
|
||||||
enterSchedulerJobView,
|
enterSchedulerJobView,
|
||||||
exitSchedulerJobView,
|
exitSchedulerJobView,
|
||||||
|
handleStop,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -192,6 +192,11 @@ export interface TaskMessagesLoaded {
|
|||||||
summary?: string
|
summary?: string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface ExecutionCancelled {
|
||||||
|
type: 'execution_cancelled'
|
||||||
|
message: string
|
||||||
|
}
|
||||||
|
|
||||||
export type WsOutbound =
|
export type WsOutbound =
|
||||||
| AssistantResponse
|
| AssistantResponse
|
||||||
| ToolCall
|
| ToolCall
|
||||||
@ -207,6 +212,7 @@ export type WsOutbound =
|
|||||||
| ChannelList
|
| ChannelList
|
||||||
| TaskMessagesLoaded
|
| TaskMessagesLoaded
|
||||||
| SchedulerJobList
|
| SchedulerJobList
|
||||||
|
| ExecutionCancelled
|
||||||
| Pong
|
| Pong
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
@ -284,6 +290,10 @@ export interface LoadChatMessagesCommand {
|
|||||||
chat_id: string
|
chat_id: string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export interface StopExecutionCommand {
|
||||||
|
type: 'stop_execution'
|
||||||
|
}
|
||||||
|
|
||||||
export type Command =
|
export type Command =
|
||||||
| CreateSessionCommand
|
| CreateSessionCommand
|
||||||
| ListSessionsCommand
|
| ListSessionsCommand
|
||||||
@ -299,6 +309,7 @@ export type Command =
|
|||||||
| LoadTaskMessagesCommand
|
| LoadTaskMessagesCommand
|
||||||
| ListSchedulerJobsCommand
|
| ListSchedulerJobsCommand
|
||||||
| LoadChatMessagesCommand
|
| LoadChatMessagesCommand
|
||||||
|
| StopExecutionCommand
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// UI Types
|
// UI Types
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user