From 988e77123c8e5cef965337aab560a67abfc48d0f Mon Sep 17 00:00:00 2001 From: ooodc <549496103@qq.com> Date: Sat, 6 Jun 2026 10:41:52 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=B2=BE=E7=AE=80=E6=97=A5=E5=BF=97?= =?UTF-8?q?=E8=AE=B0=E5=BD=95=EF=BC=8C=E7=A7=BB=E9=99=A4=E5=86=97=E4=BD=99?= =?UTF-8?q?=E4=BF=A1=E6=81=AF=EF=BC=8C=E4=BC=98=E5=8C=96=E8=B0=83=E8=AF=95?= =?UTF-8?q?=E8=BE=93=E5=87=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/agent/agent_loop.rs | 95 +++--------------------------- src/bus/mod.rs | 58 ++---------------- src/gateway/outbound_dispatcher.rs | 61 ++++--------------- src/gateway/session.rs | 68 ++------------------- 4 files changed, 28 insertions(+), 254 deletions(-) diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index 19f39a3..21cd5a4 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -885,13 +885,8 @@ impl AgentLoop { let mut emitted_messages = Vec::new(); for iteration in 0..self.max_iterations { - tracing::info!( - iteration = iteration, - message_count = messages.len(), - emitted_count = emitted_messages.len(), - max_iterations = self.max_iterations, - "AgentLoop: iteration start" - ); + #[cfg(debug_assertions)] + tracing::debug!(iteration, "Agent iteration started"); // 检查取消信号 if let Some(ref token) = self.cancel_token { @@ -958,8 +953,6 @@ impl AgentLoop { .map(|message| chat_message_to_llm_message(message, &mut image_budget)), ); - let request_msg_count = messages_for_llm.len(); - let has_tools = tools.is_some(); let request = ChatCompletionRequest { messages: messages_for_llm, temperature: None, @@ -967,31 +960,12 @@ impl AgentLoop { tools, }; - tracing::info!( - iteration = iteration, - request_messages = request_msg_count, - has_tools = has_tools, - "AgentLoop: calling LLM" - ); - let llm_start = std::time::Instant::now(); let response = match (*self.provider).chat(request).await { - Ok(response) => { - let llm_ms = llm_start.elapsed().as_millis(); - tracing::info!( - iteration = iteration, - llm_ms = llm_ms, - response_len = response.content.len(), - tool_calls_count = response.tool_calls.len(), - "AgentLoop: LLM response received" - ); - response - } + Ok(response) => response, Err(e) => { - let llm_ms = llm_start.elapsed().as_millis(); tracing::error!( provider = %self.provider.name(), model = %self.provider.model_id(), - llm_ms = llm_ms, error = %e, error_details = %format_error_chain(e.as_ref()), "LLM request failed" @@ -1063,15 +1037,7 @@ impl AgentLoop { .await; // Execute tools and add results to messages - let tools_start = std::time::Instant::now(); let tool_results = self.execute_tools(&response.tool_calls).await; - let tools_ms = tools_start.elapsed().as_millis(); - tracing::info!( - iteration = iteration, - tool_count = response.tool_calls.len(), - tools_ms = tools_ms, - "AgentLoop: tools executed, emitting results" - ); for (tool_call, result) in response.tool_calls.iter().zip(tool_results.iter()) { // Truncate tool result if too large @@ -1150,21 +1116,15 @@ impl AgentLoop { } // Loop continues to next iteration with updated messages - tracing::info!( - iteration = iteration, + #[cfg(debug_assertions)] + tracing::debug!( + iteration, message_count = messages.len(), - emitted_count = emitted_messages.len(), - "AgentLoop: tool results emitted, continuing to next iteration" + "Tool execution complete, continuing to next iteration" ); } // Max iterations reached - ask LLM for a summary based on completed work - tracing::warn!( - max_iterations = self.max_iterations, - message_count = messages.len(), - emitted_count = emitted_messages.len(), - "AgentLoop: MAX ITERATIONS REACHED, forcing summary" - ); tracing::warn!("Max iterations reached, requesting final summary from LLM"); // Add a message asking for summary @@ -1251,56 +1211,15 @@ impl AgentLoop { } async fn emit_live_tool_call_message(&self, message: ChatMessage) { - let start = std::time::Instant::now(); - let content_len = message.content.len(); - let has_tool_calls = message.tool_calls.is_some(); - tracing::info!( - content_len = content_len, - has_tool_calls = has_tool_calls, - role = %message.role, - "AgentLoop: emit_live_tool_call_message START" - ); if let Some(handler) = &self.emitted_message_handler { handler.handle(message).await; } - let elapsed_ms = start.elapsed().as_millis(); - if elapsed_ms > 100 { - tracing::warn!( - elapsed_ms = elapsed_ms, - "AgentLoop: emit_live_tool_call_message SLOW (>100ms)" - ); - } else { - tracing::info!( - elapsed_ms = elapsed_ms, - "AgentLoop: emit_live_tool_call_message DONE" - ); - } } async fn emit_tool_result(&self, message: ChatMessage, duration_ms: Option) { - let start = std::time::Instant::now(); - let content_len = message.content.len(); - tracing::info!( - tool_name = %message.tool_name.as_deref().unwrap_or("unknown"), - content_len = content_len, - tool_duration_ms = duration_ms, - "AgentLoop: emit_tool_result START" - ); if let Some(handler) = &self.emitted_message_handler { handler.handle_tool_result(message, duration_ms).await; } - let elapsed_ms = start.elapsed().as_millis(); - if elapsed_ms > 100 { - tracing::warn!( - elapsed_ms = elapsed_ms, - "AgentLoop: emit_tool_result SLOW (>100ms)" - ); - } else { - tracing::info!( - elapsed_ms = elapsed_ms, - "AgentLoop: emit_tool_result DONE" - ); - } } /// Determine whether to execute tools in parallel or sequentially. diff --git a/src/bus/mod.rs b/src/bus/mod.rs index 022cbb8..de0d507 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -54,60 +54,12 @@ impl MessageBus { /// Publish a message to the outbound queue pub async fn publish_outbound(&self, msg: OutboundMessage) -> Result<(), BusError> { - let event_kind = format!("{:?}", msg.event_kind); - let channel_name = msg.channel.clone(); - let chat_id = msg.chat_id.clone(); - let content_len = msg.content.len(); #[cfg(debug_assertions)] - tracing::debug!(channel = %channel_name, chat_id = %chat_id, content_len = content_len, "Bus: publishing outbound message"); - - // Try non-blocking send first for diagnostics - match self.outbound_tx.try_send(msg) { - Ok(()) => { - tracing::debug!( - event_kind = %event_kind, - channel = %channel_name, - chat_id = %chat_id, - "Bus: outbound sent immediately" - ); - return Ok(()); - } - Err(tokio::sync::mpsc::error::TrySendError::Full(msg)) => { - let msg_content_len = msg.content.len(); - tracing::warn!( - event_kind = %event_kind, - channel = %channel_name, - chat_id = %chat_id, - content_len = msg_content_len, - "Bus: outbound channel FULL, blocking until capacity available..." - ); - let start = std::time::Instant::now(); - let result = self.outbound_tx.send(msg).await; - let wait_ms = start.elapsed().as_millis(); - match &result { - Ok(()) => { - tracing::info!( - event_kind = %event_kind, - channel = %channel_name, - chat_id = %chat_id, - wait_ms = wait_ms, - "Bus: outbound sent after blocking (channel was full)" - ); - } - Err(_) => { - tracing::error!( - event_kind = %event_kind, - wait_ms = wait_ms, - "Bus: outbound channel closed while blocked" - ); - } - } - return result.map_err(|_| BusError::Closed); - } - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { - return Err(BusError::Closed); - } - } + tracing::debug!(channel = %msg.channel, chat_id = %msg.chat_id, content_len = %msg.content.len(), "Bus: publishing outbound message"); + self.outbound_tx + .send(msg) + .await + .map_err(|_| BusError::Closed) } /// Consume an outbound message from the outbound queue. diff --git a/src/gateway/outbound_dispatcher.rs b/src/gateway/outbound_dispatcher.rs index ad5cbf4..3f6c604 100644 --- a/src/gateway/outbound_dispatcher.rs +++ b/src/gateway/outbound_dispatcher.rs @@ -33,33 +33,21 @@ impl OutboundDispatcher { pub async fn run(&self) { tracing::info!("OutboundDispatcher started"); - let mut total_processed: u64 = 0; loop { - let consume_start = std::time::Instant::now(); let msg = match self.bus.consume_outbound().await { Some(msg) => msg, None => { - tracing::info!(total_processed = total_processed, "Outbound bus closed, stopping dispatcher"); + tracing::info!("Outbound bus closed, stopping dispatcher"); break; } }; - let consume_wait_ms = consume_start.elapsed().as_millis(); - let event_kind = format!("{:?}", msg.event_kind); - let channel_name = msg.channel.clone(); - let chat_id = msg.chat_id.clone(); - let _content_len = msg.content.len(); - total_processed += 1; - - if consume_wait_ms > 100 { - tracing::warn!( - event_kind = %event_kind, - channel = %channel_name, - chat_id = %chat_id, - total_processed = total_processed, - consume_wait_ms = consume_wait_ms, - "OutboundDispatcher: waited long for next message (bus may be draining slowly)" - ); - } + #[cfg(debug_assertions)] + tracing::debug!( + channel = %msg.channel, + chat_id = %msg.chat_id, + content_len = msg.content.len(), + "OutboundDispatcher received message" + ); // Skip messages with virtual scheduler chat IDs (e.g., "scheduler/job_id") // These are internal messages from SilentAgentTask that should not be sent externally @@ -77,37 +65,8 @@ impl OutboundDispatcher { match channel { Some(ch) => { - let send_start = std::time::Instant::now(); - let event_kind = format!("{:?}", msg.event_kind); - match self.send_with_retry(&*ch, msg).await { - Ok(()) => { - let send_ms = send_start.elapsed().as_millis(); - if send_ms > 500 { - tracing::warn!( - channel = %channel_name, - event_kind = %event_kind, - send_ms = send_ms, - "OutboundDispatcher: slow channel send" - ); - } else { - tracing::info!( - channel = %channel_name, - event_kind = %event_kind, - send_ms = send_ms, - "OutboundDispatcher: message sent to channel" - ); - } - } - Err(error) => { - let send_ms = send_start.elapsed().as_millis(); - tracing::error!( - channel = %channel_name, - event_kind = %event_kind, - send_ms = send_ms, - error = %error, - "OutboundDispatcher: failed to send after retries" - ); - } + if let Err(error) = self.send_with_retry(&*ch, msg).await { + tracing::error!(channel = %channel_name, error = %error, "Failed to send message after retries"); } } None => { diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 7cb4066..b21d5ae 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -76,44 +76,16 @@ impl BusToolCallEmitter { #[async_trait] impl EmittedMessageHandler for BusToolCallEmitter { async fn handle(&self, message: ChatMessage) { - let outbounds = OutboundMessage::from_chat_message( + for outbound in OutboundMessage::from_chat_message( &self.channel_name, &self.chat_id, None, // session_id None, &self.metadata, &message, - ); - let count = outbounds.len(); - tracing::info!( - channel = %self.channel_name, - chat_id = %self.chat_id, - msg_count = count, - msg_role = %message.role, - has_tool_calls = message.tool_calls.is_some(), - "BusToolCallEmitter::handle publishing messages" - ); - for (i, outbound) in outbounds.into_iter().enumerate() { - let event_kind = format!("{:?}", outbound.event_kind); - let content_len = outbound.content.len(); + ) { if let Err(error) = self.bus.publish_outbound(outbound).await { - tracing::error!( - error = %error, - channel = %self.channel_name, - chat_id = %self.chat_id, - event_kind = %event_kind, - "Failed to publish live outbound tool call" - ); - } else { - tracing::info!( - channel = %self.channel_name, - chat_id = %self.chat_id, - event_kind = %event_kind, - idx = i, - total = count, - content_len = content_len, - "BusToolCallEmitter: published message to bus" - ); + tracing::error!(error = %error, channel = %self.channel_name, chat_id = %self.chat_id, "Failed to publish live outbound tool call"); } } } @@ -123,44 +95,16 @@ impl EmittedMessageHandler for BusToolCallEmitter { if let Some(ms) = duration_ms { metadata.insert("tool_duration_ms".to_string(), ms.to_string()); } - let outbounds = OutboundMessage::from_chat_message( + for outbound in OutboundMessage::from_chat_message( &self.channel_name, &self.chat_id, None, // session_id None, &metadata, &message, - ); - let count = outbounds.len(); - tracing::info!( - channel = %self.channel_name, - chat_id = %self.chat_id, - msg_count = count, - tool_name = %message.tool_name.as_deref().unwrap_or("unknown"), - tool_state = ?message.tool_state, - content_len = message.content.len(), - duration_ms = duration_ms, - "BusToolCallEmitter::handle_tool_result publishing messages" - ); - for (i, outbound) in outbounds.into_iter().enumerate() { - let event_kind = format!("{:?}", outbound.event_kind); + ) { if let Err(error) = self.bus.publish_outbound(outbound).await { - tracing::error!( - error = %error, - channel = %self.channel_name, - chat_id = %self.chat_id, - event_kind = %event_kind, - "Failed to publish live outbound tool result" - ); - } else { - tracing::info!( - channel = %self.channel_name, - chat_id = %self.chat_id, - event_kind = %event_kind, - idx = i, - total = count, - "BusToolCallEmitter: published tool result to bus" - ); + tracing::error!(error = %error, channel = %self.channel_name, chat_id = %self.chat_id, "Failed to publish live outbound tool call"); } } }