feat: 增强日志记录,添加消息处理和工具执行的详细信息

This commit is contained in:
ooodc 2026-06-06 09:26:50 +08:00
parent bd13cffe14
commit fb90641774
4 changed files with 254 additions and 28 deletions

View File

@ -885,8 +885,13 @@ impl AgentLoop {
let mut emitted_messages = Vec::new(); let mut emitted_messages = Vec::new();
for iteration in 0..self.max_iterations { for iteration in 0..self.max_iterations {
#[cfg(debug_assertions)] tracing::info!(
tracing::debug!(iteration, "Agent iteration started"); iteration = iteration,
message_count = messages.len(),
emitted_count = emitted_messages.len(),
max_iterations = self.max_iterations,
"AgentLoop: iteration start"
);
// 检查取消信号 // 检查取消信号
if let Some(ref token) = self.cancel_token { if let Some(ref token) = self.cancel_token {
@ -953,6 +958,8 @@ impl AgentLoop {
.map(|message| chat_message_to_llm_message(message, &mut image_budget)), .map(|message| chat_message_to_llm_message(message, &mut image_budget)),
); );
let request_msg_count = messages_for_llm.len();
let has_tools = tools.is_some();
let request = ChatCompletionRequest { let request = ChatCompletionRequest {
messages: messages_for_llm, messages: messages_for_llm,
temperature: None, temperature: None,
@ -960,12 +967,31 @@ impl AgentLoop {
tools, tools,
}; };
tracing::info!(
iteration = iteration,
request_messages = request_msg_count,
has_tools = has_tools,
"AgentLoop: calling LLM"
);
let llm_start = std::time::Instant::now();
let response = match (*self.provider).chat(request).await { let response = match (*self.provider).chat(request).await {
Ok(response) => response, Ok(response) => {
let llm_ms = llm_start.elapsed().as_millis();
tracing::info!(
iteration = iteration,
llm_ms = llm_ms,
response_len = response.content.len(),
tool_calls_count = response.tool_calls.len(),
"AgentLoop: LLM response received"
);
response
}
Err(e) => { Err(e) => {
let llm_ms = llm_start.elapsed().as_millis();
tracing::error!( tracing::error!(
provider = %self.provider.name(), provider = %self.provider.name(),
model = %self.provider.model_id(), model = %self.provider.model_id(),
llm_ms = llm_ms,
error = %e, error = %e,
error_details = %format_error_chain(e.as_ref()), error_details = %format_error_chain(e.as_ref()),
"LLM request failed" "LLM request failed"
@ -1037,7 +1063,15 @@ impl AgentLoop {
.await; .await;
// Execute tools and add results to messages // Execute tools and add results to messages
let tools_start = std::time::Instant::now();
let tool_results = self.execute_tools(&response.tool_calls).await; let tool_results = self.execute_tools(&response.tool_calls).await;
let tools_ms = tools_start.elapsed().as_millis();
tracing::info!(
iteration = iteration,
tool_count = response.tool_calls.len(),
tools_ms = tools_ms,
"AgentLoop: tools executed, emitting results"
);
for (tool_call, result) in response.tool_calls.iter().zip(tool_results.iter()) { for (tool_call, result) in response.tool_calls.iter().zip(tool_results.iter()) {
// Truncate tool result if too large // Truncate tool result if too large
@ -1116,15 +1150,21 @@ impl AgentLoop {
} }
// Loop continues to next iteration with updated messages // Loop continues to next iteration with updated messages
#[cfg(debug_assertions)] tracing::info!(
tracing::debug!( iteration = iteration,
iteration,
message_count = messages.len(), message_count = messages.len(),
"Tool execution complete, continuing to next iteration" emitted_count = emitted_messages.len(),
"AgentLoop: tool results emitted, continuing to next iteration"
); );
} }
// Max iterations reached - ask LLM for a summary based on completed work // Max iterations reached - ask LLM for a summary based on completed work
tracing::warn!(
max_iterations = self.max_iterations,
message_count = messages.len(),
emitted_count = emitted_messages.len(),
"AgentLoop: MAX ITERATIONS REACHED, forcing summary"
);
tracing::warn!("Max iterations reached, requesting final summary from LLM"); tracing::warn!("Max iterations reached, requesting final summary from LLM");
// Add a message asking for summary // Add a message asking for summary
@ -1211,15 +1251,56 @@ impl AgentLoop {
} }
async fn emit_live_tool_call_message(&self, message: ChatMessage) { async fn emit_live_tool_call_message(&self, message: ChatMessage) {
let start = std::time::Instant::now();
let content_len = message.content.len();
let has_tool_calls = message.tool_calls.is_some();
tracing::info!(
content_len = content_len,
has_tool_calls = has_tool_calls,
role = %message.role,
"AgentLoop: emit_live_tool_call_message START"
);
if let Some(handler) = &self.emitted_message_handler { if let Some(handler) = &self.emitted_message_handler {
handler.handle(message).await; handler.handle(message).await;
} }
let elapsed_ms = start.elapsed().as_millis();
if elapsed_ms > 100 {
tracing::warn!(
elapsed_ms = elapsed_ms,
"AgentLoop: emit_live_tool_call_message SLOW (>100ms)"
);
} else {
tracing::info!(
elapsed_ms = elapsed_ms,
"AgentLoop: emit_live_tool_call_message DONE"
);
}
} }
async fn emit_tool_result(&self, message: ChatMessage, duration_ms: Option<u64>) { async fn emit_tool_result(&self, message: ChatMessage, duration_ms: Option<u64>) {
let start = std::time::Instant::now();
let content_len = message.content.len();
tracing::info!(
tool_name = %message.tool_name.as_deref().unwrap_or("unknown"),
content_len = content_len,
tool_duration_ms = duration_ms,
"AgentLoop: emit_tool_result START"
);
if let Some(handler) = &self.emitted_message_handler { if let Some(handler) = &self.emitted_message_handler {
handler.handle_tool_result(message, duration_ms).await; handler.handle_tool_result(message, duration_ms).await;
} }
let elapsed_ms = start.elapsed().as_millis();
if elapsed_ms > 100 {
tracing::warn!(
elapsed_ms = elapsed_ms,
"AgentLoop: emit_tool_result SLOW (>100ms)"
);
} else {
tracing::info!(
elapsed_ms = elapsed_ms,
"AgentLoop: emit_tool_result DONE"
);
}
} }
/// Determine whether to execute tools in parallel or sequentially. /// Determine whether to execute tools in parallel or sequentially.

View File

@ -54,12 +54,60 @@ impl MessageBus {
/// Publish a message to the outbound queue /// Publish a message to the outbound queue
pub async fn publish_outbound(&self, msg: OutboundMessage) -> Result<(), BusError> { pub async fn publish_outbound(&self, msg: OutboundMessage) -> Result<(), BusError> {
let event_kind = format!("{:?}", msg.event_kind);
let channel_name = msg.channel.clone();
let chat_id = msg.chat_id.clone();
let content_len = msg.content.len();
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
tracing::debug!(channel = %msg.channel, chat_id = %msg.chat_id, content_len = %msg.content.len(), "Bus: publishing outbound message"); tracing::debug!(channel = %channel_name, chat_id = %chat_id, content_len = content_len, "Bus: publishing outbound message");
self.outbound_tx
.send(msg) // Try non-blocking send first for diagnostics
.await match self.outbound_tx.try_send(msg) {
.map_err(|_| BusError::Closed) Ok(()) => {
tracing::debug!(
event_kind = %event_kind,
channel = %channel_name,
chat_id = %chat_id,
"Bus: outbound sent immediately"
);
return Ok(());
}
Err(tokio::sync::mpsc::error::TrySendError::Full(msg)) => {
let msg_content_len = msg.content.len();
tracing::warn!(
event_kind = %event_kind,
channel = %channel_name,
chat_id = %chat_id,
content_len = msg_content_len,
"Bus: outbound channel FULL, blocking until capacity available..."
);
let start = std::time::Instant::now();
let result = self.outbound_tx.send(msg).await;
let wait_ms = start.elapsed().as_millis();
match &result {
Ok(()) => {
tracing::info!(
event_kind = %event_kind,
channel = %channel_name,
chat_id = %chat_id,
wait_ms = wait_ms,
"Bus: outbound sent after blocking (channel was full)"
);
}
Err(_) => {
tracing::error!(
event_kind = %event_kind,
wait_ms = wait_ms,
"Bus: outbound channel closed while blocked"
);
}
}
return result.map_err(|_| BusError::Closed);
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => {
return Err(BusError::Closed);
}
}
} }
/// Consume an outbound message from the outbound queue. /// Consume an outbound message from the outbound queue.

View File

@ -33,21 +33,33 @@ impl OutboundDispatcher {
pub async fn run(&self) { pub async fn run(&self) {
tracing::info!("OutboundDispatcher started"); tracing::info!("OutboundDispatcher started");
let mut total_processed: u64 = 0;
loop { loop {
let consume_start = std::time::Instant::now();
let msg = match self.bus.consume_outbound().await { let msg = match self.bus.consume_outbound().await {
Some(msg) => msg, Some(msg) => msg,
None => { None => {
tracing::info!("Outbound bus closed, stopping dispatcher"); tracing::info!(total_processed = total_processed, "Outbound bus closed, stopping dispatcher");
break; break;
} }
}; };
#[cfg(debug_assertions)] let consume_wait_ms = consume_start.elapsed().as_millis();
tracing::debug!( let event_kind = format!("{:?}", msg.event_kind);
channel = %msg.channel, let channel_name = msg.channel.clone();
chat_id = %msg.chat_id, let chat_id = msg.chat_id.clone();
content_len = msg.content.len(), let _content_len = msg.content.len();
"OutboundDispatcher received message" total_processed += 1;
);
if consume_wait_ms > 100 {
tracing::warn!(
event_kind = %event_kind,
channel = %channel_name,
chat_id = %chat_id,
total_processed = total_processed,
consume_wait_ms = consume_wait_ms,
"OutboundDispatcher: waited long for next message (bus may be draining slowly)"
);
}
// Skip messages with virtual scheduler chat IDs (e.g., "scheduler/job_id") // Skip messages with virtual scheduler chat IDs (e.g., "scheduler/job_id")
// These are internal messages from SilentAgentTask that should not be sent externally // These are internal messages from SilentAgentTask that should not be sent externally
@ -65,8 +77,37 @@ impl OutboundDispatcher {
match channel { match channel {
Some(ch) => { Some(ch) => {
if let Err(error) = self.send_with_retry(&*ch, msg).await { let send_start = std::time::Instant::now();
tracing::error!(channel = %channel_name, error = %error, "Failed to send message after retries"); let event_kind = format!("{:?}", msg.event_kind);
match self.send_with_retry(&*ch, msg).await {
Ok(()) => {
let send_ms = send_start.elapsed().as_millis();
if send_ms > 500 {
tracing::warn!(
channel = %channel_name,
event_kind = %event_kind,
send_ms = send_ms,
"OutboundDispatcher: slow channel send"
);
} else {
tracing::info!(
channel = %channel_name,
event_kind = %event_kind,
send_ms = send_ms,
"OutboundDispatcher: message sent to channel"
);
}
}
Err(error) => {
let send_ms = send_start.elapsed().as_millis();
tracing::error!(
channel = %channel_name,
event_kind = %event_kind,
send_ms = send_ms,
error = %error,
"OutboundDispatcher: failed to send after retries"
);
}
} }
} }
None => { None => {

View File

@ -76,16 +76,44 @@ impl BusToolCallEmitter {
#[async_trait] #[async_trait]
impl EmittedMessageHandler for BusToolCallEmitter { impl EmittedMessageHandler for BusToolCallEmitter {
async fn handle(&self, message: ChatMessage) { async fn handle(&self, message: ChatMessage) {
for outbound in OutboundMessage::from_chat_message( let outbounds = OutboundMessage::from_chat_message(
&self.channel_name, &self.channel_name,
&self.chat_id, &self.chat_id,
None, // session_id None, // session_id
None, None,
&self.metadata, &self.metadata,
&message, &message,
) { );
let count = outbounds.len();
tracing::info!(
channel = %self.channel_name,
chat_id = %self.chat_id,
msg_count = count,
msg_role = %message.role,
has_tool_calls = message.tool_calls.is_some(),
"BusToolCallEmitter::handle publishing messages"
);
for (i, outbound) in outbounds.into_iter().enumerate() {
let event_kind = format!("{:?}", outbound.event_kind);
let content_len = outbound.content.len();
if let Err(error) = self.bus.publish_outbound(outbound).await { if let Err(error) = self.bus.publish_outbound(outbound).await {
tracing::error!(error = %error, channel = %self.channel_name, chat_id = %self.chat_id, "Failed to publish live outbound tool call"); tracing::error!(
error = %error,
channel = %self.channel_name,
chat_id = %self.chat_id,
event_kind = %event_kind,
"Failed to publish live outbound tool call"
);
} else {
tracing::info!(
channel = %self.channel_name,
chat_id = %self.chat_id,
event_kind = %event_kind,
idx = i,
total = count,
content_len = content_len,
"BusToolCallEmitter: published message to bus"
);
} }
} }
} }
@ -95,16 +123,44 @@ impl EmittedMessageHandler for BusToolCallEmitter {
if let Some(ms) = duration_ms { if let Some(ms) = duration_ms {
metadata.insert("tool_duration_ms".to_string(), ms.to_string()); metadata.insert("tool_duration_ms".to_string(), ms.to_string());
} }
for outbound in OutboundMessage::from_chat_message( let outbounds = OutboundMessage::from_chat_message(
&self.channel_name, &self.channel_name,
&self.chat_id, &self.chat_id,
None, // session_id None, // session_id
None, None,
&metadata, &metadata,
&message, &message,
) { );
let count = outbounds.len();
tracing::info!(
channel = %self.channel_name,
chat_id = %self.chat_id,
msg_count = count,
tool_name = %message.tool_name.as_deref().unwrap_or("unknown"),
tool_state = ?message.tool_state,
content_len = message.content.len(),
duration_ms = duration_ms,
"BusToolCallEmitter::handle_tool_result publishing messages"
);
for (i, outbound) in outbounds.into_iter().enumerate() {
let event_kind = format!("{:?}", outbound.event_kind);
if let Err(error) = self.bus.publish_outbound(outbound).await { if let Err(error) = self.bus.publish_outbound(outbound).await {
tracing::error!(error = %error, channel = %self.channel_name, chat_id = %self.chat_id, "Failed to publish live outbound tool call"); tracing::error!(
error = %error,
channel = %self.channel_name,
chat_id = %self.chat_id,
event_kind = %event_kind,
"Failed to publish live outbound tool result"
);
} else {
tracing::info!(
channel = %self.channel_name,
chat_id = %self.chat_id,
event_kind = %event_kind,
idx = i,
total = count,
"BusToolCallEmitter: published tool result to bus"
);
} }
} }
} }