diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index 21cd5a4..19f39a3 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -885,8 +885,13 @@ impl AgentLoop { let mut emitted_messages = Vec::new(); for iteration in 0..self.max_iterations { - #[cfg(debug_assertions)] - tracing::debug!(iteration, "Agent iteration started"); + tracing::info!( + iteration = iteration, + message_count = messages.len(), + emitted_count = emitted_messages.len(), + max_iterations = self.max_iterations, + "AgentLoop: iteration start" + ); // 检查取消信号 if let Some(ref token) = self.cancel_token { @@ -953,6 +958,8 @@ impl AgentLoop { .map(|message| chat_message_to_llm_message(message, &mut image_budget)), ); + let request_msg_count = messages_for_llm.len(); + let has_tools = tools.is_some(); let request = ChatCompletionRequest { messages: messages_for_llm, temperature: None, @@ -960,12 +967,31 @@ impl AgentLoop { tools, }; + tracing::info!( + iteration = iteration, + request_messages = request_msg_count, + has_tools = has_tools, + "AgentLoop: calling LLM" + ); + let llm_start = std::time::Instant::now(); let response = match (*self.provider).chat(request).await { - Ok(response) => response, + Ok(response) => { + let llm_ms = llm_start.elapsed().as_millis(); + tracing::info!( + iteration = iteration, + llm_ms = llm_ms, + response_len = response.content.len(), + tool_calls_count = response.tool_calls.len(), + "AgentLoop: LLM response received" + ); + response + } Err(e) => { + let llm_ms = llm_start.elapsed().as_millis(); tracing::error!( provider = %self.provider.name(), model = %self.provider.model_id(), + llm_ms = llm_ms, error = %e, error_details = %format_error_chain(e.as_ref()), "LLM request failed" @@ -1037,7 +1063,15 @@ impl AgentLoop { .await; // Execute tools and add results to messages + let tools_start = std::time::Instant::now(); let tool_results = self.execute_tools(&response.tool_calls).await; + let tools_ms = tools_start.elapsed().as_millis(); + tracing::info!( + iteration = iteration, + tool_count = response.tool_calls.len(), + tools_ms = tools_ms, + "AgentLoop: tools executed, emitting results" + ); for (tool_call, result) in response.tool_calls.iter().zip(tool_results.iter()) { // Truncate tool result if too large @@ -1116,15 +1150,21 @@ impl AgentLoop { } // Loop continues to next iteration with updated messages - #[cfg(debug_assertions)] - tracing::debug!( - iteration, + tracing::info!( + iteration = iteration, message_count = messages.len(), - "Tool execution complete, continuing to next iteration" + emitted_count = emitted_messages.len(), + "AgentLoop: tool results emitted, continuing to next iteration" ); } // Max iterations reached - ask LLM for a summary based on completed work + tracing::warn!( + max_iterations = self.max_iterations, + message_count = messages.len(), + emitted_count = emitted_messages.len(), + "AgentLoop: MAX ITERATIONS REACHED, forcing summary" + ); tracing::warn!("Max iterations reached, requesting final summary from LLM"); // Add a message asking for summary @@ -1211,15 +1251,56 @@ impl AgentLoop { } async fn emit_live_tool_call_message(&self, message: ChatMessage) { + let start = std::time::Instant::now(); + let content_len = message.content.len(); + let has_tool_calls = message.tool_calls.is_some(); + tracing::info!( + content_len = content_len, + has_tool_calls = has_tool_calls, + role = %message.role, + "AgentLoop: emit_live_tool_call_message START" + ); if let Some(handler) = &self.emitted_message_handler { handler.handle(message).await; } + let elapsed_ms = start.elapsed().as_millis(); + if elapsed_ms > 100 { + tracing::warn!( + elapsed_ms = elapsed_ms, + "AgentLoop: emit_live_tool_call_message SLOW (>100ms)" + ); + } else { + tracing::info!( + elapsed_ms = elapsed_ms, + "AgentLoop: emit_live_tool_call_message DONE" + ); + } } async fn emit_tool_result(&self, message: ChatMessage, duration_ms: Option) { + let start = std::time::Instant::now(); + let content_len = message.content.len(); + tracing::info!( + tool_name = %message.tool_name.as_deref().unwrap_or("unknown"), + content_len = content_len, + tool_duration_ms = duration_ms, + "AgentLoop: emit_tool_result START" + ); if let Some(handler) = &self.emitted_message_handler { handler.handle_tool_result(message, duration_ms).await; } + let elapsed_ms = start.elapsed().as_millis(); + if elapsed_ms > 100 { + tracing::warn!( + elapsed_ms = elapsed_ms, + "AgentLoop: emit_tool_result SLOW (>100ms)" + ); + } else { + tracing::info!( + elapsed_ms = elapsed_ms, + "AgentLoop: emit_tool_result DONE" + ); + } } /// Determine whether to execute tools in parallel or sequentially. diff --git a/src/bus/mod.rs b/src/bus/mod.rs index de0d507..022cbb8 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -54,12 +54,60 @@ impl MessageBus { /// Publish a message to the outbound queue pub async fn publish_outbound(&self, msg: OutboundMessage) -> Result<(), BusError> { + let event_kind = format!("{:?}", msg.event_kind); + let channel_name = msg.channel.clone(); + let chat_id = msg.chat_id.clone(); + let content_len = msg.content.len(); #[cfg(debug_assertions)] - tracing::debug!(channel = %msg.channel, chat_id = %msg.chat_id, content_len = %msg.content.len(), "Bus: publishing outbound message"); - self.outbound_tx - .send(msg) - .await - .map_err(|_| BusError::Closed) + tracing::debug!(channel = %channel_name, chat_id = %chat_id, content_len = content_len, "Bus: publishing outbound message"); + + // Try non-blocking send first for diagnostics + match self.outbound_tx.try_send(msg) { + Ok(()) => { + tracing::debug!( + event_kind = %event_kind, + channel = %channel_name, + chat_id = %chat_id, + "Bus: outbound sent immediately" + ); + return Ok(()); + } + Err(tokio::sync::mpsc::error::TrySendError::Full(msg)) => { + let msg_content_len = msg.content.len(); + tracing::warn!( + event_kind = %event_kind, + channel = %channel_name, + chat_id = %chat_id, + content_len = msg_content_len, + "Bus: outbound channel FULL, blocking until capacity available..." + ); + let start = std::time::Instant::now(); + let result = self.outbound_tx.send(msg).await; + let wait_ms = start.elapsed().as_millis(); + match &result { + Ok(()) => { + tracing::info!( + event_kind = %event_kind, + channel = %channel_name, + chat_id = %chat_id, + wait_ms = wait_ms, + "Bus: outbound sent after blocking (channel was full)" + ); + } + Err(_) => { + tracing::error!( + event_kind = %event_kind, + wait_ms = wait_ms, + "Bus: outbound channel closed while blocked" + ); + } + } + return result.map_err(|_| BusError::Closed); + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => { + return Err(BusError::Closed); + } + } } /// Consume an outbound message from the outbound queue. diff --git a/src/gateway/outbound_dispatcher.rs b/src/gateway/outbound_dispatcher.rs index 3f6c604..ad5cbf4 100644 --- a/src/gateway/outbound_dispatcher.rs +++ b/src/gateway/outbound_dispatcher.rs @@ -33,21 +33,33 @@ impl OutboundDispatcher { pub async fn run(&self) { tracing::info!("OutboundDispatcher started"); + let mut total_processed: u64 = 0; loop { + let consume_start = std::time::Instant::now(); let msg = match self.bus.consume_outbound().await { Some(msg) => msg, None => { - tracing::info!("Outbound bus closed, stopping dispatcher"); + tracing::info!(total_processed = total_processed, "Outbound bus closed, stopping dispatcher"); break; } }; - #[cfg(debug_assertions)] - tracing::debug!( - channel = %msg.channel, - chat_id = %msg.chat_id, - content_len = msg.content.len(), - "OutboundDispatcher received message" - ); + let consume_wait_ms = consume_start.elapsed().as_millis(); + let event_kind = format!("{:?}", msg.event_kind); + let channel_name = msg.channel.clone(); + let chat_id = msg.chat_id.clone(); + let _content_len = msg.content.len(); + total_processed += 1; + + if consume_wait_ms > 100 { + tracing::warn!( + event_kind = %event_kind, + channel = %channel_name, + chat_id = %chat_id, + total_processed = total_processed, + consume_wait_ms = consume_wait_ms, + "OutboundDispatcher: waited long for next message (bus may be draining slowly)" + ); + } // Skip messages with virtual scheduler chat IDs (e.g., "scheduler/job_id") // These are internal messages from SilentAgentTask that should not be sent externally @@ -65,8 +77,37 @@ impl OutboundDispatcher { match channel { Some(ch) => { - if let Err(error) = self.send_with_retry(&*ch, msg).await { - tracing::error!(channel = %channel_name, error = %error, "Failed to send message after retries"); + let send_start = std::time::Instant::now(); + let event_kind = format!("{:?}", msg.event_kind); + match self.send_with_retry(&*ch, msg).await { + Ok(()) => { + let send_ms = send_start.elapsed().as_millis(); + if send_ms > 500 { + tracing::warn!( + channel = %channel_name, + event_kind = %event_kind, + send_ms = send_ms, + "OutboundDispatcher: slow channel send" + ); + } else { + tracing::info!( + channel = %channel_name, + event_kind = %event_kind, + send_ms = send_ms, + "OutboundDispatcher: message sent to channel" + ); + } + } + Err(error) => { + let send_ms = send_start.elapsed().as_millis(); + tracing::error!( + channel = %channel_name, + event_kind = %event_kind, + send_ms = send_ms, + error = %error, + "OutboundDispatcher: failed to send after retries" + ); + } } } None => { diff --git a/src/gateway/session.rs b/src/gateway/session.rs index b21d5ae..7cb4066 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -76,16 +76,44 @@ impl BusToolCallEmitter { #[async_trait] impl EmittedMessageHandler for BusToolCallEmitter { async fn handle(&self, message: ChatMessage) { - for outbound in OutboundMessage::from_chat_message( + let outbounds = OutboundMessage::from_chat_message( &self.channel_name, &self.chat_id, None, // session_id None, &self.metadata, &message, - ) { + ); + let count = outbounds.len(); + tracing::info!( + channel = %self.channel_name, + chat_id = %self.chat_id, + msg_count = count, + msg_role = %message.role, + has_tool_calls = message.tool_calls.is_some(), + "BusToolCallEmitter::handle publishing messages" + ); + for (i, outbound) in outbounds.into_iter().enumerate() { + let event_kind = format!("{:?}", outbound.event_kind); + let content_len = outbound.content.len(); if let Err(error) = self.bus.publish_outbound(outbound).await { - tracing::error!(error = %error, channel = %self.channel_name, chat_id = %self.chat_id, "Failed to publish live outbound tool call"); + tracing::error!( + error = %error, + channel = %self.channel_name, + chat_id = %self.chat_id, + event_kind = %event_kind, + "Failed to publish live outbound tool call" + ); + } else { + tracing::info!( + channel = %self.channel_name, + chat_id = %self.chat_id, + event_kind = %event_kind, + idx = i, + total = count, + content_len = content_len, + "BusToolCallEmitter: published message to bus" + ); } } } @@ -95,16 +123,44 @@ impl EmittedMessageHandler for BusToolCallEmitter { if let Some(ms) = duration_ms { metadata.insert("tool_duration_ms".to_string(), ms.to_string()); } - for outbound in OutboundMessage::from_chat_message( + let outbounds = OutboundMessage::from_chat_message( &self.channel_name, &self.chat_id, None, // session_id None, &metadata, &message, - ) { + ); + let count = outbounds.len(); + tracing::info!( + channel = %self.channel_name, + chat_id = %self.chat_id, + msg_count = count, + tool_name = %message.tool_name.as_deref().unwrap_or("unknown"), + tool_state = ?message.tool_state, + content_len = message.content.len(), + duration_ms = duration_ms, + "BusToolCallEmitter::handle_tool_result publishing messages" + ); + for (i, outbound) in outbounds.into_iter().enumerate() { + let event_kind = format!("{:?}", outbound.event_kind); if let Err(error) = self.bus.publish_outbound(outbound).await { - tracing::error!(error = %error, channel = %self.channel_name, chat_id = %self.chat_id, "Failed to publish live outbound tool call"); + tracing::error!( + error = %error, + channel = %self.channel_name, + chat_id = %self.chat_id, + event_kind = %event_kind, + "Failed to publish live outbound tool result" + ); + } else { + tracing::info!( + channel = %self.channel_name, + chat_id = %self.chat_id, + event_kind = %event_kind, + idx = i, + total = count, + "BusToolCallEmitter: published tool result to bus" + ); } } }