refactor(tools): 优化交互式进程输出捕获逻辑

- 删除了 PendingUserAction 相关的冗余辅助消息发送代码
- 引入自适应 drain_until_stable 函数循环读取输出直到稳定
- 用 drain_until_stable 替代固定延时等待以捕获最终提示内容
- 确保进程等待 stdin 时完整且及时地捕获所有输出数据
- 移除过时的常量和注释,简化代码逻辑
- 保持对最大循环次数和间隔时间的限制防止死循环
This commit is contained in:
ooodc 2026-06-13 14:56:44 +08:00
parent 4598370fd2
commit 93945b626c
2 changed files with 38 additions and 48 deletions

View File

@ -20,8 +20,6 @@ use std::time::Instant;
/// Minimum characters to keep when truncating /// Minimum characters to keep when truncating
const TRUNCATION_SUFFIX_LEN: usize = 200; const TRUNCATION_SUFFIX_LEN: usize = 200;
const PENDING_USER_ACTION_MARKER: &str = "__PICOBOT_PENDING_USER_ACTION__"; const PENDING_USER_ACTION_MARKER: &str = "__PICOBOT_PENDING_USER_ACTION__";
const DEFAULT_PENDING_ASSISTANT_MESSAGE: &str =
"工具已经启动并进入等待用户操作的状态。请先完成外部操作,完成后直接告诉我继续。";
const RECOVERABLE_LLM_ERROR_MESSAGE: &str = "模型服务暂时不可用或响应超时。请稍后重试。"; const RECOVERABLE_LLM_ERROR_MESSAGE: &str = "模型服务暂时不可用或响应超时。请稍后重试。";
const SUPPORTED_IMAGE_MIME_TYPES: &[&str] = &["image/jpeg", "image/png", "image/gif", "image/webp"]; const SUPPORTED_IMAGE_MIME_TYPES: &[&str] = &["image/jpeg", "image/png", "image/gif", "image/webp"];
@ -1091,45 +1089,9 @@ impl AgentLoop {
} }
} }
if let Some((tool_call, pending_result)) = response
.tool_calls
.iter()
.zip(tool_results.iter())
.find(|(_, result)| result.state == ToolExecutionState::PendingUserAction)
{
// 从工具输出中提取有意义的终端内容
// 跳过标记行、session_id 元数据、空行、以及提示行(取提示行之后的实际内容)
let content: String = pending_result
.output
.lines()
.map(|l| l.trim())
.filter(|line| {
!line.is_empty()
&& !line.starts_with("__PICOBOT_")
&& !line.starts_with("[session_id:")
})
.skip(1) // 跳过第一行(提示行,如"进程正在等待输入..."
.take(20) // 最多取 20 行避免过长
.collect::<Vec<_>>()
.join("\n");
let display_content = if content.is_empty() {
DEFAULT_PENDING_ASSISTANT_MESSAGE
} else {
&content
};
let assistant_message = ChatMessage::assistant(format!(
"{}\n\n当前等待中的工具: {}",
display_content, tool_call.name,
));
emitted_messages.push(assistant_message.clone());
self.emit_live_tool_call_message(assistant_message.clone()).await;
return Ok(AgentProcessResult {
final_response: assistant_message,
emitted_messages,
});
}
// Loop continues to next iteration with updated messages // Loop continues to next iteration with updated messages
// PendingUserAction 工具的结果已在上方加入 messages
// 模型将在下一轮看到完整的终端输出并生成智能回复
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
tracing::debug!( tracing::debug!(
iteration, iteration,

View File

@ -18,8 +18,6 @@ use crate::tools::{extract_u64, extract_bool, check_null_args};
const MAX_TIMEOUT_SECS: u64 = 600; const MAX_TIMEOUT_SECS: u64 = 600;
const MAX_OUTPUT_CHARS: usize = 50_000; const MAX_OUTPUT_CHARS: usize = 50_000;
const PENDING_USER_ACTION_MARKER: &str = "__PICOBOT_PENDING_USER_ACTION__"; const PENDING_USER_ACTION_MARKER: &str = "__PICOBOT_PENDING_USER_ACTION__";
/// 检测到 stdin 等待后,给 read_stream 任务将最后的提示内容传入 channel 的时间
const STDIN_FLUSH_MS: u64 = 500;
const INTERACTIVE_HINT: &str = const INTERACTIVE_HINT: &str =
"进程正在等待输入。请使用 session_id 和 stdin_input 参数回复交互内容。"; "进程正在等待输入。请使用 session_id 和 stdin_input 参数回复交互内容。";
const NON_INTERACTIVE_HINT: &str = const NON_INTERACTIVE_HINT: &str =
@ -267,6 +265,38 @@ async fn drain_available_chunks(
} }
} }
/// 自适应 drain循环读取直到输出稳定确保进程的所有提示内容都被捕获
///
/// - 每次 drain 后等待 200ms 再检查是否有新数据
/// - 最多循环 10 次(即最多等待 2 秒)
/// - 如果连续一次 drain 没有新数据,立即返回
async fn drain_until_stable(
rx: &mut mpsc::UnboundedReceiver<(bool, String)>,
stdout_buf: &Arc<Mutex<String>>,
stderr_buf: &Arc<Mutex<String>>,
) {
const DRAIN_INTERVAL_MS: u64 = 200;
const MAX_DRAIN_ROUNDS: u32 = 10;
for _ in 0..MAX_DRAIN_ROUNDS {
let prev_stdout_len = stdout_buf.lock().await.len();
let prev_stderr_len = stderr_buf.lock().await.len();
drain_available_chunks(rx, stdout_buf, stderr_buf).await;
let new_stdout_len = stdout_buf.lock().await.len();
let new_stderr_len = stderr_buf.lock().await.len();
// 如果没有新数据,说明输出已稳定
if new_stdout_len == prev_stdout_len && new_stderr_len == prev_stderr_len {
break;
}
// 有新数据,等待后再次检查
tokio::time::sleep(Duration::from_millis(DRAIN_INTERVAL_MS)).await;
}
}
impl Default for BashTool { impl Default for BashTool {
fn default() -> Self { fn default() -> Self {
Self::new(Arc::new(ShellSessionManager::new())) Self::new(Arc::new(ShellSessionManager::new()))
@ -479,10 +509,9 @@ impl BashTool {
// Periodic safety net: check OS-level process state // Periodic safety net: check OS-level process state
if let Some(pid) = child.id() { if let Some(pid) = child.id() {
if crate::platform::is_process_waiting_on_stdin(pid) == Some(true) { if crate::platform::is_process_waiting_on_stdin(pid) == Some(true) {
// 给 read_stream 任务时间将最后的提示内容传入 channel // 自适应 drain等待输出稳定
tokio::time::sleep(Duration::from_millis(STDIN_FLUSH_MS)).await;
if let Some(rx_ref) = rx.as_mut() { if let Some(rx_ref) = rx.as_mut() {
drain_available_chunks(rx_ref, &stdout_buf, &stderr_buf).await; drain_until_stable(rx_ref, &stdout_buf, &stderr_buf).await;
} }
let combined = format_command_output(&stdout_buf.lock().await, &stderr_buf.lock().await, None); let combined = format_command_output(&stdout_buf.lock().await, &stderr_buf.lock().await, None);
// 始终创建 session即使输出为空进程可能还没写出提示 // 始终创建 session即使输出为空进程可能还没写出提示
@ -532,10 +561,9 @@ impl BashTool {
// OS-level check: if blocked on stdin, save as session // OS-level check: if blocked on stdin, save as session
if let Some(pid) = child.id() { if let Some(pid) = child.id() {
if crate::platform::is_process_waiting_on_stdin(pid) == Some(true) { if crate::platform::is_process_waiting_on_stdin(pid) == Some(true) {
// 给 read_stream 任务时间将最后的提示内容传入 channel // 自适应 drain等待输出稳定
tokio::time::sleep(Duration::from_millis(STDIN_FLUSH_MS)).await;
if let Some(rx_ref) = rx.as_mut() { if let Some(rx_ref) = rx.as_mut() {
drain_available_chunks(rx_ref, &stdout_buf, &stderr_buf).await; drain_until_stable(rx_ref, &stdout_buf, &stderr_buf).await;
} }
let combined = format_command_output(&stdout_buf.lock().await, &stderr_buf.lock().await, None); let combined = format_command_output(&stdout_buf.lock().await, &stderr_buf.lock().await, None);
if let Some(stdin) = child_stdin { if let Some(stdin) = child_stdin {