添加 tracing 日志支持,替换 println! 输出,增强错误处理和调试信息

This commit is contained in:
xiaoxixi 2026-04-06 23:11:41 +08:00
parent 09899ddb91
commit 4ed2f986a1
9 changed files with 71 additions and 33 deletions

View File

@ -19,3 +19,6 @@ futures-util = "0.3"
clap = { version = "4", features = ["derive"] } clap = { version = "4", features = ["derive"] }
dirs = "6.0.0" dirs = "6.0.0"
prost = "0.14" prost = "0.14"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tracing-appender = "0.2"

View File

@ -29,6 +29,8 @@ impl AgentLoop {
}) })
.collect(); .collect();
tracing::debug!(history_len = self.history.len(), "Sending request to LLM");
let request = ChatCompletionRequest { let request = ChatCompletionRequest {
messages, messages,
temperature: None, temperature: None,
@ -37,7 +39,12 @@ impl AgentLoop {
}; };
let response = (*self.provider).chat(request).await let response = (*self.provider).chat(request).await
.map_err(|e| AgentError::LlmError(e.to_string()))?; .map_err(|e| {
tracing::error!(error = %e, "LLM request failed");
AgentError::LlmError(e.to_string())
})?;
tracing::debug!(response_len = response.content.len(), "LLM response received");
let assistant_message = ChatMessage::assistant(response.content); let assistant_message = ChatMessage::assistant(response.content);
self.history.push(assistant_message.clone()); self.history.push(assistant_message.clone());
@ -46,7 +53,9 @@ impl AgentLoop {
} }
pub fn clear_history(&mut self) { pub fn clear_history(&mut self) {
let len = self.history.len();
self.history.clear(); self.history.clear();
tracing::debug!(previous_len = len, "Chat history cleared");
} }
pub fn history(&self) -> &[ChatMessage] { pub fn history(&self) -> &[ChatMessage] {

View File

@ -264,7 +264,7 @@ impl FeishuChannel {
/// Handle incoming message - delegate to message handler and send response /// Handle incoming message - delegate to message handler and send response
async fn handle_message(&self, open_id: &str, chat_id: &str, content: &str) -> Result<(), ChannelError> { async fn handle_message(&self, open_id: &str, chat_id: &str, content: &str) -> Result<(), ChannelError> {
println!("Feishu: processing message from {} in chat {}: {}", open_id, chat_id, content); tracing::info!(open_id, chat_id, "Processing message from Feishu");
// Delegate to message handler (Gateway) // Delegate to message handler (Gateway)
let response = self.message_handler let response = self.message_handler
@ -277,7 +277,7 @@ impl FeishuChannel {
let receive_id_type = if chat_id.starts_with("oc_") { "chat_id" } else { "open_id" }; let receive_id_type = if chat_id.starts_with("oc_") { "chat_id" } else { "open_id" };
self.send_message(receive_id, receive_id_type, &response).await?; self.send_message(receive_id, receive_id_type, &response).await?;
println!("Feishu: sent response to {}", receive_id); tracing::info!(receive_id, "Sent response to Feishu");
Ok(()) Ok(())
} }
@ -363,14 +363,14 @@ impl FeishuChannel {
let (wss_url, client_config) = self.get_ws_endpoint(&self.http_client).await?; let (wss_url, client_config) = self.get_ws_endpoint(&self.http_client).await?;
let service_id = Self::extract_service_id(&wss_url); let service_id = Self::extract_service_id(&wss_url);
println!("Feishu: connecting to {}", wss_url); tracing::info!(url = %wss_url, "Connecting to Feishu WebSocket");
let (ws_stream, _) = tokio_tungstenite::connect_async(&wss_url) let (ws_stream, _) = tokio_tungstenite::connect_async(&wss_url)
.await .await
.map_err(|e| ChannelError::ConnectionError(format!("WebSocket connection failed: {}", e)))?; .map_err(|e| ChannelError::ConnectionError(format!("WebSocket connection failed: {}", e)))?;
*self.connected.write().await = true; *self.connected.write().await = true;
println!("Feishu channel connected"); tracing::info!("Feishu WebSocket connected");
let (mut write, mut read) = ws_stream.split(); let (mut write, mut read) = ws_stream.split();
@ -409,20 +409,20 @@ impl FeishuChannel {
Ok(Some(parsed)) => { Ok(Some(parsed)) => {
// Send ACK immediately (Feishu requires within 3 s) // Send ACK immediately (Feishu requires within 3 s)
if let Err(e) = Self::send_ack(&frame, &mut write).await { if let Err(e) = Self::send_ack(&frame, &mut write).await {
eprintln!("Error sending ack: {}", e); tracing::error!(error = %e, "Failed to send ACK to Feishu");
} }
// Then process message asynchronously (don't await) // Then process message asynchronously (don't await)
let channel = self.clone(); let channel = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = channel.handle_message(&parsed.open_id, &parsed.chat_id, &parsed.content).await { if let Err(e) = channel.handle_message(&parsed.open_id, &parsed.chat_id, &parsed.content).await {
eprintln!("Error handling message: {}", e); tracing::error!(error = %e, open_id = %parsed.open_id, chat_id = %parsed.chat_id, "Failed to handle Feishu message");
} }
}); });
} }
Ok(None) => {} Ok(None) => {}
Err(e) => { Err(e) => {
eprintln!("Error handling frame: {}", e); tracing::warn!(error = %e, "Failed to parse Feishu frame");
} }
} }
} }
@ -442,10 +442,11 @@ impl FeishuChannel {
let _ = write.send(tokio_tungstenite::tungstenite::Message::Binary(pong.encode_to_vec().into())).await; let _ = write.send(tokio_tungstenite::tungstenite::Message::Binary(pong.encode_to_vec().into())).await;
} }
Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => { Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => {
tracing::debug!("Feishu WebSocket closed");
break; break;
} }
Some(Err(e)) => { Some(Err(e)) => {
eprintln!("WS error: {}", e); tracing::warn!(error = %e, "Feishu WebSocket error");
break; break;
} }
_ => {} _ => {}
@ -465,12 +466,12 @@ impl FeishuChannel {
payload: None, payload: None,
}; };
if write.send(tokio_tungstenite::tungstenite::Message::Binary(ping.encode_to_vec().into())).await.is_err() { if write.send(tokio_tungstenite::tungstenite::Message::Binary(ping.encode_to_vec().into())).await.is_err() {
eprintln!("Feishu: ping failed, reconnecting"); tracing::warn!("Feishu ping failed, reconnecting");
break; break;
} }
} }
_ = shutdown_rx.recv() => { _ = shutdown_rx.recv() => {
println!("Feishu channel shutdown signal received"); tracing::info!("Feishu channel shutdown signal received");
break; break;
} }
} }
@ -552,13 +553,13 @@ impl Channel for FeishuChannel {
let shutdown_rx = shutdown_tx.subscribe(); let shutdown_rx = shutdown_tx.subscribe();
match channel.run_ws_loop(shutdown_rx).await { match channel.run_ws_loop(shutdown_rx).await {
Ok(_) => { Ok(_) => {
println!("Feishu WebSocket disconnected"); tracing::info!("Feishu WebSocket disconnected");
} }
Err(e) => { Err(e) => {
consecutive_failures += 1; consecutive_failures += 1;
eprintln!("Feishu WebSocket error (attempt {}): {}", consecutive_failures, e); tracing::error!(attempt = consecutive_failures, error = %e, "Feishu WebSocket error");
if consecutive_failures >= max_failures { if consecutive_failures >= max_failures {
eprintln!("Feishu channel: max failures reached, stopping"); tracing::error!("Feishu channel: max failures reached, stopping");
break; break;
} }
} }
@ -568,15 +569,15 @@ impl Channel for FeishuChannel {
break; break;
} }
println!("Feishu channel retrying in 5s..."); tracing::info!("Feishu channel retrying in 5s...");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
} }
*channel.running.write().await = false; *channel.running.write().await = false;
println!("Feishu channel stopped"); tracing::info!("Feishu channel stopped");
}); });
println!("Feishu channel started"); tracing::info!("Feishu channel started");
Ok(()) Ok(())
} }

View File

@ -52,9 +52,9 @@ impl ChannelManager {
.write() .write()
.await .await
.insert("feishu".to_string(), Arc::new(channel)); .insert("feishu".to_string(), Arc::new(channel));
println!("Feishu channel registered"); tracing::info!("Feishu channel registered");
} else { } else {
println!("Feishu channel disabled in config"); tracing::info!("Feishu channel disabled in config");
} }
} }
Ok(()) Ok(())
@ -63,9 +63,9 @@ impl ChannelManager {
pub async fn start_all(&self) -> Result<(), ChannelError> { pub async fn start_all(&self) -> Result<(), ChannelError> {
let channels = self.channels.read().await; let channels = self.channels.read().await;
for (name, channel) in channels.iter() { for (name, channel) in channels.iter() {
println!("Starting channel: {}", name); tracing::info!(channel = %name, "Starting channel");
if let Err(e) = channel.start().await { if let Err(e) = channel.start().await {
eprintln!("Warning: Failed to start channel {}: {}", name, e); tracing::error!(channel = %name, error = %e, "Failed to start channel");
} }
} }
Ok(()) Ok(())
@ -74,9 +74,9 @@ impl ChannelManager {
pub async fn stop_all(&self) -> Result<(), ChannelError> { pub async fn stop_all(&self) -> Result<(), ChannelError> {
let mut channels = self.channels.write().await; let mut channels = self.channels.write().await;
for (name, channel) in channels.iter() { for (name, channel) in channels.iter() {
println!("Stopping channel: {}", name); tracing::info!(channel = %name, "Stopping channel");
if let Err(e) = channel.stop().await { if let Err(e) = channel.stop().await {
eprintln!("Error stopping channel {}: {}", name, e); tracing::error!(channel = %name, error = %e, "Error stopping channel");
} }
} }
channels.clear(); channels.clear();

View File

@ -11,7 +11,7 @@ fn parse_message(raw: &str) -> Result<WsOutbound, serde_json::Error> {
pub async fn run(gateway_url: &str) -> Result<(), Box<dyn std::error::Error>> { pub async fn run(gateway_url: &str) -> Result<(), Box<dyn std::error::Error>> {
let (ws_stream, _) = connect_async(gateway_url).await?; let (ws_stream, _) = connect_async(gateway_url).await?;
println!("Connected to gateway"); tracing::info!(url = %gateway_url, "Connected to gateway");
let (mut sender, mut receiver) = ws_stream.split(); let (mut sender, mut receiver) = ws_stream.split();
@ -35,6 +35,7 @@ pub async fn run(gateway_url: &str) -> Result<(), Box<dyn std::error::Error>> {
input.write_output(&format!("Error: {}", message)).await?; input.write_output(&format!("Error: {}", message)).await?;
} }
WsOutbound::SessionEstablished { session_id } => { WsOutbound::SessionEstablished { session_id } => {
tracing::debug!(session_id = %session_id, "Session established");
input.write_output(&format!("Session: {}\n", session_id)).await?; input.write_output(&format!("Session: {}\n", session_id)).await?;
} }
_ => {} _ => {}
@ -42,6 +43,7 @@ pub async fn run(gateway_url: &str) -> Result<(), Box<dyn std::error::Error>> {
} }
} }
Some(Ok(Message::Close(_))) | None => { Some(Ok(Message::Close(_))) | None => {
tracing::info!("Gateway disconnected");
input.write_output("Gateway disconnected").await?; input.write_output("Gateway disconnected").await?;
break; break;
} }
@ -75,14 +77,14 @@ pub async fn run(gateway_url: &str) -> Result<(), Box<dyn std::error::Error>> {
}; };
if let Ok(text) = serialize_inbound(&inbound) { if let Ok(text) = serialize_inbound(&inbound) {
if sender.send(Message::Text(text.into())).await.is_err() { if sender.send(Message::Text(text.into())).await.is_err() {
eprintln!("Failed to send message"); tracing::error!("Failed to send message to gateway");
break; break;
} }
} }
} }
Ok(None) => break, Ok(None) => break,
Err(e) => { Err(e) => {
eprintln!("Input error: {}", e); tracing::error!(error = %e, "Input error");
break; break;
} }
} }

View File

@ -138,13 +138,13 @@ impl Config {
fn load_from(path: &Path) -> Result<Self, Box<dyn std::error::Error>> { fn load_from(path: &Path) -> Result<Self, Box<dyn std::error::Error>> {
load_env_file()?; load_env_file()?;
let content = if path.exists() { let content = if path.exists() {
println!("Config loaded from: {}", path.display()); tracing::info!(path = %path.display(), "Config loaded");
fs::read_to_string(path)? fs::read_to_string(path)?
} else { } else {
// Fallback to current directory // Fallback to current directory
let fallback = Path::new("config.json"); let fallback = Path::new("config.json");
if fallback.exists() { if fallback.exists() {
println!("Config loaded from: {}", fallback.display()); tracing::info!(path = %fallback.display(), "Config loaded from fallback path");
fs::read_to_string(fallback)? fs::read_to_string(fallback)?
} else { } else {
return Err(Box::new(ConfigError::ConfigNotFound( return Err(Box::new(ConfigError::ConfigNotFound(

View File

@ -64,7 +64,7 @@ pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn
let addr = format!("{}:{}", bind_host, bind_port); let addr = format!("{}:{}", bind_host, bind_port);
let listener = TcpListener::bind(&addr).await?; let listener = TcpListener::bind(&addr).await?;
println!("Gateway listening on {}", addr); tracing::info!(address = %addr, "Gateway listening");
// Graceful shutdown using oneshot channel // Graceful shutdown using oneshot channel
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
@ -73,7 +73,7 @@ pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn
// Spawn ctrl_c handler // Spawn ctrl_c handler
tokio::spawn(async move { tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok(); tokio::signal::ctrl_c().await.ok();
println!("Shutting down..."); tracing::info!("Shutdown signal received");
let _ = channel_manager.stop_all().await; let _ = channel_manager.stop_all().await;
let _ = shutdown_tx.send(()); let _ = shutdown_tx.send(());
}); });

View File

@ -36,8 +36,10 @@ impl Session {
/// 获取或创建指定 chat_id 的 AgentLoop /// 获取或创建指定 chat_id 的 AgentLoop
pub async fn get_or_create_agent(&mut self, chat_id: &str) -> Result<Arc<Mutex<AgentLoop>>, AgentError> { pub async fn get_or_create_agent(&mut self, chat_id: &str) -> Result<Arc<Mutex<AgentLoop>>, AgentError> {
if let Some(agent) = self.chat_agents.get(chat_id) { if let Some(agent) = self.chat_agents.get(chat_id) {
tracing::trace!(chat_id = %chat_id, "Reusing existing agent");
return Ok(agent.clone()); return Ok(agent.clone());
} }
tracing::debug!(chat_id = %chat_id, "Creating new agent for chat");
let agent = AgentLoop::new(self.provider_config.clone())?; let agent = AgentLoop::new(self.provider_config.clone())?;
let arc = Arc::new(Mutex::new(agent)); let arc = Arc::new(Mutex::new(agent));
self.chat_agents.insert(chat_id.to_string(), arc.clone()); self.chat_agents.insert(chat_id.to_string(), arc.clone());
@ -99,8 +101,15 @@ impl SessionManager {
let mut inner = self.inner.lock().await; let mut inner = self.inner.lock().await;
let should_recreate = if let Some(last_active) = inner.session_timestamps.get(channel_name) { let should_recreate = if let Some(last_active) = inner.session_timestamps.get(channel_name) {
last_active.elapsed() > inner.session_ttl let elapsed = last_active.elapsed();
if elapsed > inner.session_ttl {
tracing::info!(channel = %channel_name, elapsed_hours = elapsed.as_secs() / 3600, "Session expired, recreating");
true
} else {
false
}
} else { } else {
tracing::debug!(channel = %channel_name, "Creating new session");
true true
}; };
@ -140,6 +149,8 @@ impl SessionManager {
chat_id: &str, chat_id: &str,
content: &str, content: &str,
) -> Result<String, AgentError> { ) -> Result<String, AgentError> {
tracing::debug!(channel = %channel_name, chat_id = %chat_id, content_len = content.len(), "Routing message to agent");
// 确保 session 存在(可能需要重建) // 确保 session 存在(可能需要重建)
self.ensure_session(channel_name).await?; self.ensure_session(channel_name).await?;
@ -161,6 +172,8 @@ impl SessionManager {
let user_msg = ChatMessage::user(content); let user_msg = ChatMessage::user(content);
let response = agent.process(user_msg).await?; let response = agent.process(user_msg).await?;
tracing::debug!(channel = %channel_name, chat_id = %chat_id, response_len = response.content.len(), "Agent response received");
Ok(response.content) Ok(response.content)
} }

View File

@ -20,7 +20,7 @@ async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) {
let provider_config = match state.config.get_provider_config("default") { let provider_config = match state.config.get_provider_config("default") {
Ok(cfg) => cfg, Ok(cfg) => cfg,
Err(e) => { Err(e) => {
eprintln!("Failed to get provider config: {}", e); tracing::error!(error = %e, "Failed to get provider config");
return; return;
} }
}; };
@ -32,12 +32,13 @@ async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) {
let session = match Session::new(channel_name.clone(), provider_config, sender).await { let session = match Session::new(channel_name.clone(), provider_config, sender).await {
Ok(s) => Arc::new(Mutex::new(s)), Ok(s) => Arc::new(Mutex::new(s)),
Err(e) => { Err(e) => {
eprintln!("Failed to create session: {}", e); tracing::error!(error = %e, "Failed to create session");
return; return;
} }
}; };
let session_id = session.lock().await.id; let session_id = session.lock().await.id;
tracing::info!(session_id = %session_id, "CLI session established");
let _ = session.lock().await.send(WsOutbound::SessionEstablished { let _ = session.lock().await.send(WsOutbound::SessionEstablished {
session_id: session_id.to_string(), session_id: session_id.to_string(),
@ -46,10 +47,12 @@ async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) {
let (mut ws_sender, mut ws_receiver) = ws.split(); let (mut ws_sender, mut ws_receiver) = ws.split();
let mut receiver = receiver; let mut receiver = receiver;
let session_id_for_sender = session_id;
tokio::spawn(async move { tokio::spawn(async move {
while let Some(msg) = receiver.recv().await { while let Some(msg) = receiver.recv().await {
if let Ok(text) = serialize_outbound(&msg) { if let Ok(text) = serialize_outbound(&msg) {
if ws_sender.send(WsMessage::Text(text.into())).await.is_err() { if ws_sender.send(WsMessage::Text(text.into())).await.is_err() {
tracing::debug!(session_id = %session_id_for_sender, "WebSocket send error");
break; break;
} }
} }
@ -65,6 +68,7 @@ async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) {
handle_inbound(&session, inbound).await; handle_inbound(&session, inbound).await;
} }
Err(e) => { Err(e) => {
tracing::warn!(error = %e, "Failed to parse inbound message");
let _ = session.lock().await.send(WsOutbound::Error { let _ = session.lock().await.send(WsOutbound::Error {
code: "PARSE_ERROR".to_string(), code: "PARSE_ERROR".to_string(),
message: e.to_string(), message: e.to_string(),
@ -73,11 +77,14 @@ async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) {
} }
} }
Ok(WsMessage::Close(_)) | Err(_) => { Ok(WsMessage::Close(_)) | Err(_) => {
tracing::debug!(session_id = %session_id, "WebSocket closed");
break; break;
} }
_ => {} _ => {}
} }
} }
tracing::info!(session_id = %session_id, "CLI session ended");
} }
async fn handle_inbound(session: &Arc<Mutex<Session>>, inbound: WsInbound) { async fn handle_inbound(session: &Arc<Mutex<Session>>, inbound: WsInbound) {
@ -100,6 +107,7 @@ async fn handle_inbound(session: &Arc<Mutex<Session>>, inbound: WsInbound) {
let agent = match session_guard.get_or_create_agent(&chat_id).await { let agent = match session_guard.get_or_create_agent(&chat_id).await {
Ok(a) => a, Ok(a) => a,
Err(e) => { Err(e) => {
tracing::error!(chat_id = %chat_id, error = %e, "Failed to get or create agent");
let _ = session_guard.send(WsOutbound::Error { let _ = session_guard.send(WsOutbound::Error {
code: "AGENT_ERROR".to_string(), code: "AGENT_ERROR".to_string(),
message: e.to_string(), message: e.to_string(),
@ -112,6 +120,7 @@ async fn handle_inbound(session: &Arc<Mutex<Session>>, inbound: WsInbound) {
let mut agent = agent.lock().await; let mut agent = agent.lock().await;
match agent.process(user_msg).await { match agent.process(user_msg).await {
Ok(response) => { Ok(response) => {
tracing::debug!(chat_id = %chat_id, "Agent response sent");
let _ = session.lock().await.send(WsOutbound::AssistantResponse { let _ = session.lock().await.send(WsOutbound::AssistantResponse {
id: response.id, id: response.id,
content: response.content, content: response.content,
@ -119,6 +128,7 @@ async fn handle_inbound(session: &Arc<Mutex<Session>>, inbound: WsInbound) {
}).await; }).await;
} }
Err(e) => { Err(e) => {
tracing::error!(chat_id = %chat_id, error = %e, "Agent process error");
let _ = session.lock().await.send(WsOutbound::Error { let _ = session.lock().await.send(WsOutbound::Error {
code: "LLM_ERROR".to_string(), code: "LLM_ERROR".to_string(),
message: e.to_string(), message: e.to_string(),