diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index c89b29e..68a08b5 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -20,8 +20,6 @@ use std::time::Instant; /// Minimum characters to keep when truncating const TRUNCATION_SUFFIX_LEN: usize = 200; const PENDING_USER_ACTION_MARKER: &str = "__PICOBOT_PENDING_USER_ACTION__"; -const DEFAULT_PENDING_ASSISTANT_MESSAGE: &str = - "工具已经启动并进入等待用户操作的状态。请先完成外部操作,完成后直接告诉我继续。"; const RECOVERABLE_LLM_ERROR_MESSAGE: &str = "模型服务暂时不可用或响应超时。请稍后重试。"; const SUPPORTED_IMAGE_MIME_TYPES: &[&str] = &["image/jpeg", "image/png", "image/gif", "image/webp"]; @@ -1091,45 +1089,9 @@ impl AgentLoop { } } - if let Some((tool_call, pending_result)) = response - .tool_calls - .iter() - .zip(tool_results.iter()) - .find(|(_, result)| result.state == ToolExecutionState::PendingUserAction) - { - // 从工具输出中提取有意义的终端内容 - // 跳过:标记行、session_id 元数据、空行、以及提示行(取提示行之后的实际内容) - let content: String = pending_result - .output - .lines() - .map(|l| l.trim()) - .filter(|line| { - !line.is_empty() - && !line.starts_with("__PICOBOT_") - && !line.starts_with("[session_id:") - }) - .skip(1) // 跳过第一行(提示行,如"进程正在等待输入...") - .take(20) // 最多取 20 行避免过长 - .collect::>() - .join("\n"); - let display_content = if content.is_empty() { - DEFAULT_PENDING_ASSISTANT_MESSAGE - } else { - &content - }; - let assistant_message = ChatMessage::assistant(format!( - "{}\n\n当前等待中的工具: {}", - display_content, tool_call.name, - )); - emitted_messages.push(assistant_message.clone()); - self.emit_live_tool_call_message(assistant_message.clone()).await; - return Ok(AgentProcessResult { - final_response: assistant_message, - emitted_messages, - }); - } - // Loop continues to next iteration with updated messages + // PendingUserAction 工具的结果已在上方加入 messages, + // 模型将在下一轮看到完整的终端输出并生成智能回复 #[cfg(debug_assertions)] tracing::debug!( iteration, diff --git a/src/tools/bash.rs b/src/tools/bash.rs index a4f5f44..0ac5361 100644 --- a/src/tools/bash.rs +++ b/src/tools/bash.rs @@ -182,12 +182,17 @@ impl BashTool { let session_line = session_id .map(|id| format!("[session_id: {}]\n", id)) .unwrap_or_default(); + let output_section = if output.trim().is_empty() { + "(进程尚未输出内容。进程正在等待输入,请使用 session_id 和 stdin_input 参数发送输入内容。)" + } else { + &self.truncate_output(output.trim()) + }; format!( "{}\n{}{}\n\n{}", PENDING_USER_ACTION_MARKER, session_line, hint, - self.truncate_output(output.trim()) + output_section ) } @@ -260,6 +265,38 @@ async fn drain_available_chunks( } } +/// 自适应 drain:循环读取直到输出稳定,确保进程的所有提示内容都被捕获 +/// +/// - 每次 drain 后等待 200ms 再检查是否有新数据 +/// - 最多循环 10 次(即最多等待 2 秒) +/// - 如果连续一次 drain 没有新数据,立即返回 +async fn drain_until_stable( + rx: &mut mpsc::UnboundedReceiver<(bool, String)>, + stdout_buf: &Arc>, + stderr_buf: &Arc>, +) { + const DRAIN_INTERVAL_MS: u64 = 200; + const MAX_DRAIN_ROUNDS: u32 = 10; + + for _ in 0..MAX_DRAIN_ROUNDS { + let prev_stdout_len = stdout_buf.lock().await.len(); + let prev_stderr_len = stderr_buf.lock().await.len(); + + drain_available_chunks(rx, stdout_buf, stderr_buf).await; + + let new_stdout_len = stdout_buf.lock().await.len(); + let new_stderr_len = stderr_buf.lock().await.len(); + + // 如果没有新数据,说明输出已稳定 + if new_stdout_len == prev_stdout_len && new_stderr_len == prev_stderr_len { + break; + } + + // 有新数据,等待后再次检查 + tokio::time::sleep(Duration::from_millis(DRAIN_INTERVAL_MS)).await; + } +} + impl Default for BashTool { fn default() -> Self { Self::new(Arc::new(ShellSessionManager::new())) @@ -472,25 +509,25 @@ impl BashTool { // Periodic safety net: check OS-level process state if let Some(pid) = child.id() { if crate::platform::is_process_waiting_on_stdin(pid) == Some(true) { + // 自适应 drain,等待输出稳定 if let Some(rx_ref) = rx.as_mut() { - drain_available_chunks(rx_ref, &stdout_buf, &stderr_buf).await; + drain_until_stable(rx_ref, &stdout_buf, &stderr_buf).await; } let combined = format_command_output(&stdout_buf.lock().await, &stderr_buf.lock().await, None); - if !combined.trim().is_empty() { - if let Some(stdin) = child_stdin { - if let Some(rx_val) = rx.take() { - let session_id = self.session_manager.save_session( - child, stdin, rx_val, - stdout_buf.lock().await.clone(), - stderr_buf.lock().await.clone(), - ).await; - return Ok(self.pending_output(&combined, Some(&session_id))); - } + // 始终创建 session,即使输出为空(进程可能还没写出提示) + if let Some(stdin) = child_stdin { + if let Some(rx_val) = rx.take() { + let session_id = self.session_manager.save_session( + child, stdin, rx_val, + stdout_buf.lock().await.clone(), + stderr_buf.lock().await.clone(), + ).await; + return Ok(self.pending_output(&combined, Some(&session_id))); } - let _ = child.start_kill(); - let _ = child.wait().await; - return Ok(self.pending_output(&combined, None)); } + let _ = child.start_kill(); + let _ = child.wait().await; + return Ok(self.pending_output(&combined, None)); } } @@ -523,9 +560,12 @@ impl BashTool { // OS-level check: if blocked on stdin, save as session if let Some(pid) = child.id() { - if crate::platform::is_process_waiting_on_stdin(pid) == Some(true) - && !combined.trim().is_empty() - { + if crate::platform::is_process_waiting_on_stdin(pid) == Some(true) { + // 自适应 drain,等待输出稳定 + if let Some(rx_ref) = rx.as_mut() { + drain_until_stable(rx_ref, &stdout_buf, &stderr_buf).await; + } + let combined = format_command_output(&stdout_buf.lock().await, &stderr_buf.lock().await, None); if let Some(stdin) = child_stdin { if let Some(rx_val) = rx.take() { let session_id = self.session_manager.save_session(