Compare commits

...

3 Commits

Author SHA1 Message Date
e37dea886b refactor(tools): 优化交互式进程输出捕获逻辑
- 删除了 PendingUserAction 相关的冗余辅助消息发送代码
- 引入自适应 drain_until_stable 函数循环读取输出直到稳定
- 用 drain_until_stable 替代固定延时等待以捕获最终提示内容
- 确保进程等待 stdin 时完整且及时地捕获所有输出数据
- 移除过时的常量和注释,简化代码逻辑
- 保持对最大循环次数和间隔时间的限制防止死循环
2026-06-13 17:40:44 +08:00
640829ce52 fix(agent): 优化等待工具输出内容的提取逻辑
- 跳过标记行、session_id元数据和空行
- 跳过提示行,提取提示行之后的实际内容
- 限制提取内容最多20行,防止消息过长
- 当提取内容为空时,使用默认提示消息
- 改善助手消息的显示内容格式
2026-06-13 17:40:30 +08:00
229221aab1 refactor(todo): 重构待办事项管理逻辑及更新状态规则
- 移除 TodoItem 中的 priority、created_at 和 updated_at 字段
- 强制每个任务都必须有唯一 id,且由用户负责生成
- 修改合并模式逻辑,merge=true 下保留未提及的旧任务
- 支持已完成和已取消任务重新激活(状态改回 pending 或 in_progress)
- 禁止 in_progress 状态退回到 pending,必须标记为 completed 或 cancelled
- 优化状态转换校验,允许特定状态间合法切换
- 简化任务变更消息,移除详细的新增/更新/移除统计
- 更新文档和示例,明确 id 必须由用户生成和使用
- 修复和补充测试,增强状态转换和合并模式验证
- 调整任务时间戳生成逻辑,统一使用当前时间及索引
- 该变更提供更合理的任务状态机械及管理模式,提升稳定性和易用性
2026-06-13 17:38:18 +08:00
8 changed files with 340 additions and 498 deletions

View File

@ -20,8 +20,6 @@ use std::time::Instant;
/// Minimum characters to keep when truncating /// Minimum characters to keep when truncating
const TRUNCATION_SUFFIX_LEN: usize = 200; const TRUNCATION_SUFFIX_LEN: usize = 200;
const PENDING_USER_ACTION_MARKER: &str = "__PICOBOT_PENDING_USER_ACTION__"; const PENDING_USER_ACTION_MARKER: &str = "__PICOBOT_PENDING_USER_ACTION__";
const DEFAULT_PENDING_ASSISTANT_MESSAGE: &str =
"工具已经启动并进入等待用户操作的状态。请先完成外部操作,完成后直接告诉我继续。";
const RECOVERABLE_LLM_ERROR_MESSAGE: &str = "模型服务暂时不可用或响应超时。请稍后重试。"; const RECOVERABLE_LLM_ERROR_MESSAGE: &str = "模型服务暂时不可用或响应超时。请稍后重试。";
const SUPPORTED_IMAGE_MIME_TYPES: &[&str] = &["image/jpeg", "image/png", "image/gif", "image/webp"]; const SUPPORTED_IMAGE_MIME_TYPES: &[&str] = &["image/jpeg", "image/png", "image/gif", "image/webp"];
@ -1091,31 +1089,9 @@ impl AgentLoop {
} }
} }
if let Some((tool_call, pending_result)) = response
.tool_calls
.iter()
.zip(tool_results.iter())
.find(|(_, result)| result.state == ToolExecutionState::PendingUserAction)
{
let assistant_message = ChatMessage::assistant(format!(
"{}\n\n当前等待中的工具: {}",
pending_result
.output
.lines()
.next()
.filter(|line| !line.trim().is_empty())
.unwrap_or(DEFAULT_PENDING_ASSISTANT_MESSAGE),
tool_call.name,
));
emitted_messages.push(assistant_message.clone());
self.emit_live_tool_call_message(assistant_message.clone()).await;
return Ok(AgentProcessResult {
final_response: assistant_message,
emitted_messages,
});
}
// Loop continues to next iteration with updated messages // Loop continues to next iteration with updated messages
// PendingUserAction 工具的结果已在上方加入 messages
// 模型将在下一轮看到完整的终端输出并生成智能回复
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
tracing::debug!( tracing::debug!(
iteration, iteration,

View File

@ -76,9 +76,6 @@ impl CommandHandler for ListTodosCommandHandler {
id: r.id, id: r.id,
content: r.content, content: r.content,
status: r.status, status: r.status,
priority: r.priority,
created_at: r.created_at,
updated_at: r.updated_at,
}) })
.collect(); .collect();

View File

@ -141,9 +141,15 @@ impl BusToolCallEmitter {
let topic_id = self.metadata.get("topic_id").filter(|t| !t.is_empty()).cloned(); let topic_id = self.metadata.get("topic_id").filter(|t| !t.is_empty()).cloned();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64;
let records: Vec<crate::storage::TodoRecord> = todos_array let records: Vec<crate::storage::TodoRecord> = todos_array
.iter() .iter()
.filter_map(|item| { .enumerate()
.filter_map(|(idx, item)| {
Some(crate::storage::TodoRecord { Some(crate::storage::TodoRecord {
id: item.get("id")?.as_str()?.to_string(), id: item.get("id")?.as_str()?.to_string(),
scope_key: scope_key.clone(), scope_key: scope_key.clone(),
@ -151,9 +157,9 @@ impl BusToolCallEmitter {
topic_id: topic_id.clone(), topic_id: topic_id.clone(),
content: item.get("content")?.as_str()?.to_string(), content: item.get("content")?.as_str()?.to_string(),
status: item.get("status")?.as_str()?.to_string(), status: item.get("status")?.as_str()?.to_string(),
priority: item.get("priority")?.as_str()?.to_string(), priority: "medium".to_string(),
created_at: item.get("created_at")?.as_i64()?, created_at: now + idx as i64,
updated_at: item.get("updated_at")?.as_i64()?, updated_at: now,
}) })
}) })
.collect(); .collect();

View File

@ -28,7 +28,7 @@ const TODO_WRITE_INSTRUCTIONS: &str = r#"
### merge ### merge
- `merge: false` todo - `merge: false` todo
- `merge: true` **使 merge=true id** - `merge: true` **使 merge=true**
### ###
- `pending` - `pending`
@ -39,33 +39,34 @@ const TODO_WRITE_INSTRUCTIONS: &str = r#"
### ###
1. `in_progress` 1. `in_progress`
2. `in_progress` 2. `in_progress`
3. `completed` `cancelled` 3. `completed` `cancelled` `in_progress` `pending`
4. completed 4. `in_progress` 退 `pending` `completed` `cancelled`
5. `content` 5. completed
6. ** `id`** id idid todo_write `current_todos` 6. `content`
7. ** `id`** id `"r9Tg8Kq2"`使 idid todo_write `current_todos`
### 使 ### 使
pending in_progress id id
```json ```json
{"merge": true, "todos": [{"content": "修复登录 bug", "status": "in_progress"}]} {"merge": true, "todos": [{"id": "aB3kLm9x", "content": "修复登录 bug", "status": "in_progress"}]}
``` ```
```json ```json
{"merge": true, "todos": [{"content": "补充测试", "status": "pending"}]} {"merge": true, "todos": [{"id": "pQ7nWy2z", "content": "补充测试", "status": "pending"}]}
``` ```
** id** current_todos 使 id
```json ```json
{"merge": true, "todos": [{"id": "abc-123", "content": "修复登录 bug", "status": "completed"}]} {"merge": true, "todos": [{"id": "aB3kLm9x", "content": "修复登录 bug", "status": "completed"}]}
``` ```
```json ```json
{"merge": true, "todos": [ {"merge": true, "todos": [
{"id": "abc-123", "content": "修复登录 bug", "status": "completed"}, {"id": "aB3kLm9x", "content": "修复登录 bug", "status": "completed"},
{"content": "代码审查", "status": "in_progress"} {"id": "pQ7nWy2z", "content": "补充测试", "status": "in_progress"}
]} ]}
``` ```
"#; "#;

View File

@ -88,9 +88,6 @@ pub struct TodoItemSummary {
pub id: String, pub id: String,
pub content: String, pub content: String,
pub status: String, pub status: String,
pub priority: String,
pub created_at: i64,
pub updated_at: i64,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]

View File

@ -182,12 +182,17 @@ impl BashTool {
let session_line = session_id let session_line = session_id
.map(|id| format!("[session_id: {}]\n", id)) .map(|id| format!("[session_id: {}]\n", id))
.unwrap_or_default(); .unwrap_or_default();
let output_section = if output.trim().is_empty() {
"(进程尚未输出内容。进程正在等待输入,请使用 session_id 和 stdin_input 参数发送输入内容。)"
} else {
&self.truncate_output(output.trim())
};
format!( format!(
"{}\n{}{}\n\n{}", "{}\n{}{}\n\n{}",
PENDING_USER_ACTION_MARKER, PENDING_USER_ACTION_MARKER,
session_line, session_line,
hint, hint,
self.truncate_output(output.trim()) output_section
) )
} }
@ -260,6 +265,38 @@ async fn drain_available_chunks(
} }
} }
/// 自适应 drain循环读取直到输出稳定确保进程的所有提示内容都被捕获
///
/// - 每次 drain 后等待 200ms 再检查是否有新数据
/// - 最多循环 10 次(即最多等待 2 秒)
/// - 如果连续一次 drain 没有新数据,立即返回
async fn drain_until_stable(
rx: &mut mpsc::UnboundedReceiver<(bool, String)>,
stdout_buf: &Arc<Mutex<String>>,
stderr_buf: &Arc<Mutex<String>>,
) {
const DRAIN_INTERVAL_MS: u64 = 200;
const MAX_DRAIN_ROUNDS: u32 = 10;
for _ in 0..MAX_DRAIN_ROUNDS {
let prev_stdout_len = stdout_buf.lock().await.len();
let prev_stderr_len = stderr_buf.lock().await.len();
drain_available_chunks(rx, stdout_buf, stderr_buf).await;
let new_stdout_len = stdout_buf.lock().await.len();
let new_stderr_len = stderr_buf.lock().await.len();
// 如果没有新数据,说明输出已稳定
if new_stdout_len == prev_stdout_len && new_stderr_len == prev_stderr_len {
break;
}
// 有新数据,等待后再次检查
tokio::time::sleep(Duration::from_millis(DRAIN_INTERVAL_MS)).await;
}
}
impl Default for BashTool { impl Default for BashTool {
fn default() -> Self { fn default() -> Self {
Self::new(Arc::new(ShellSessionManager::new())) Self::new(Arc::new(ShellSessionManager::new()))
@ -472,25 +509,25 @@ impl BashTool {
// Periodic safety net: check OS-level process state // Periodic safety net: check OS-level process state
if let Some(pid) = child.id() { if let Some(pid) = child.id() {
if crate::platform::is_process_waiting_on_stdin(pid) == Some(true) { if crate::platform::is_process_waiting_on_stdin(pid) == Some(true) {
// 自适应 drain等待输出稳定
if let Some(rx_ref) = rx.as_mut() { if let Some(rx_ref) = rx.as_mut() {
drain_available_chunks(rx_ref, &stdout_buf, &stderr_buf).await; drain_until_stable(rx_ref, &stdout_buf, &stderr_buf).await;
} }
let combined = format_command_output(&stdout_buf.lock().await, &stderr_buf.lock().await, None); let combined = format_command_output(&stdout_buf.lock().await, &stderr_buf.lock().await, None);
if !combined.trim().is_empty() { // 始终创建 session即使输出为空进程可能还没写出提示
if let Some(stdin) = child_stdin { if let Some(stdin) = child_stdin {
if let Some(rx_val) = rx.take() { if let Some(rx_val) = rx.take() {
let session_id = self.session_manager.save_session( let session_id = self.session_manager.save_session(
child, stdin, rx_val, child, stdin, rx_val,
stdout_buf.lock().await.clone(), stdout_buf.lock().await.clone(),
stderr_buf.lock().await.clone(), stderr_buf.lock().await.clone(),
).await; ).await;
return Ok(self.pending_output(&combined, Some(&session_id))); return Ok(self.pending_output(&combined, Some(&session_id)));
}
} }
let _ = child.start_kill();
let _ = child.wait().await;
return Ok(self.pending_output(&combined, None));
} }
let _ = child.start_kill();
let _ = child.wait().await;
return Ok(self.pending_output(&combined, None));
} }
} }
@ -523,9 +560,12 @@ impl BashTool {
// OS-level check: if blocked on stdin, save as session // OS-level check: if blocked on stdin, save as session
if let Some(pid) = child.id() { if let Some(pid) = child.id() {
if crate::platform::is_process_waiting_on_stdin(pid) == Some(true) if crate::platform::is_process_waiting_on_stdin(pid) == Some(true) {
&& !combined.trim().is_empty() // 自适应 drain等待输出稳定
{ if let Some(rx_ref) = rx.as_mut() {
drain_until_stable(rx_ref, &stdout_buf, &stderr_buf).await;
}
let combined = format_command_output(&stdout_buf.lock().await, &stderr_buf.lock().await, None);
if let Some(stdin) = child_stdin { if let Some(stdin) = child_stdin {
if let Some(rx_val) = rx.take() { if let Some(rx_val) = rx.take() {
let session_id = self.session_manager.save_session( let session_id = self.session_manager.save_session(

View File

@ -57,20 +57,20 @@ impl Default for InMemoryTaskRepository {
#[async_trait] #[async_trait]
impl TaskRepository for InMemoryTaskRepository { impl TaskRepository for InMemoryTaskRepository {
async fn save_task_session(&self, session: &TaskSession) -> Result<(), StorageError> { async fn save_task_session(&self, session: &TaskSession) -> Result<(), StorageError> {
tracing::warn!( tracing::debug!(
task_id = %session.id, task_id = %session.id,
session_id = %session.session_id, session_id = %session.session_id,
state = ?session.state, state = ?session.state,
"REPO_SAVE: Saving task session" "Saving task session"
); );
self.sessions self.sessions
.write() .write()
.unwrap() .unwrap()
.insert(session.id.clone(), session.clone()); .insert(session.id.clone(), session.clone());
tracing::warn!( tracing::debug!(
task_id = %session.id, task_id = %session.id,
total_tasks = self.sessions.read().unwrap().len(), total_tasks = self.sessions.read().unwrap().len(),
"REPO_SAVE: Task session saved, current repository size" "Task session saved, current repository size"
); );
Ok(()) Ok(())
} }
@ -79,11 +79,11 @@ impl TaskRepository for InMemoryTaskRepository {
let sessions = self.sessions.read().unwrap(); let sessions = self.sessions.read().unwrap();
let total = sessions.len(); let total = sessions.len();
let keys: Vec<&str> = sessions.keys().map(|k| k.as_str()).collect(); let keys: Vec<&str> = sessions.keys().map(|k| k.as_str()).collect();
tracing::warn!( tracing::debug!(
lookup_task_id = %task_id, lookup_task_id = %task_id,
total_tasks = total, total_tasks = total,
all_keys = ?keys, all_keys = ?keys,
"REPO_LOOKUP: Looking up task session" "Looking up task session"
); );
Ok(sessions.get(task_id).cloned()) Ok(sessions.get(task_id).cloned())
} }

File diff suppressed because it is too large Load Diff