diff --git a/Cargo.toml b/Cargo.toml index af5c027..d6633e5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,3 +19,6 @@ futures-util = "0.3" clap = { version = "4", features = ["derive"] } dirs = "6.0.0" prost = "0.14" +tracing = "0.1" +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } +tracing-appender = "0.2" diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index ee2134b..4555cc3 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -29,6 +29,8 @@ impl AgentLoop { }) .collect(); + tracing::debug!(history_len = self.history.len(), "Sending request to LLM"); + let request = ChatCompletionRequest { messages, temperature: None, @@ -37,7 +39,12 @@ impl AgentLoop { }; let response = (*self.provider).chat(request).await - .map_err(|e| AgentError::LlmError(e.to_string()))?; + .map_err(|e| { + tracing::error!(error = %e, "LLM request failed"); + AgentError::LlmError(e.to_string()) + })?; + + tracing::debug!(response_len = response.content.len(), "LLM response received"); let assistant_message = ChatMessage::assistant(response.content); self.history.push(assistant_message.clone()); @@ -46,7 +53,9 @@ impl AgentLoop { } pub fn clear_history(&mut self) { + let len = self.history.len(); self.history.clear(); + tracing::debug!(previous_len = len, "Chat history cleared"); } pub fn history(&self) -> &[ChatMessage] { diff --git a/src/channels/feishu.rs b/src/channels/feishu.rs index e8ec13b..c982acb 100644 --- a/src/channels/feishu.rs +++ b/src/channels/feishu.rs @@ -264,7 +264,7 @@ impl FeishuChannel { /// Handle incoming message - delegate to message handler and send response async fn handle_message(&self, open_id: &str, chat_id: &str, content: &str) -> Result<(), ChannelError> { - println!("Feishu: processing message from {} in chat {}: {}", open_id, chat_id, content); + tracing::info!(open_id, chat_id, "Processing message from Feishu"); // Delegate to message handler (Gateway) let response = self.message_handler @@ -277,7 +277,7 @@ impl FeishuChannel { let receive_id_type = if chat_id.starts_with("oc_") { "chat_id" } else { "open_id" }; self.send_message(receive_id, receive_id_type, &response).await?; - println!("Feishu: sent response to {}", receive_id); + tracing::info!(receive_id, "Sent response to Feishu"); Ok(()) } @@ -363,14 +363,14 @@ impl FeishuChannel { let (wss_url, client_config) = self.get_ws_endpoint(&self.http_client).await?; let service_id = Self::extract_service_id(&wss_url); - println!("Feishu: connecting to {}", wss_url); + tracing::info!(url = %wss_url, "Connecting to Feishu WebSocket"); let (ws_stream, _) = tokio_tungstenite::connect_async(&wss_url) .await .map_err(|e| ChannelError::ConnectionError(format!("WebSocket connection failed: {}", e)))?; *self.connected.write().await = true; - println!("Feishu channel connected"); + tracing::info!("Feishu WebSocket connected"); let (mut write, mut read) = ws_stream.split(); @@ -409,20 +409,20 @@ impl FeishuChannel { Ok(Some(parsed)) => { // Send ACK immediately (Feishu requires within 3 s) if let Err(e) = Self::send_ack(&frame, &mut write).await { - eprintln!("Error sending ack: {}", e); + tracing::error!(error = %e, "Failed to send ACK to Feishu"); } // Then process message asynchronously (don't await) let channel = self.clone(); tokio::spawn(async move { if let Err(e) = channel.handle_message(&parsed.open_id, &parsed.chat_id, &parsed.content).await { - eprintln!("Error handling message: {}", e); + tracing::error!(error = %e, open_id = %parsed.open_id, chat_id = %parsed.chat_id, "Failed to handle Feishu message"); } }); } Ok(None) => {} Err(e) => { - eprintln!("Error handling frame: {}", e); + tracing::warn!(error = %e, "Failed to parse Feishu frame"); } } } @@ -442,10 +442,11 @@ impl FeishuChannel { let _ = write.send(tokio_tungstenite::tungstenite::Message::Binary(pong.encode_to_vec().into())).await; } Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => { + tracing::debug!("Feishu WebSocket closed"); break; } Some(Err(e)) => { - eprintln!("WS error: {}", e); + tracing::warn!(error = %e, "Feishu WebSocket error"); break; } _ => {} @@ -465,12 +466,12 @@ impl FeishuChannel { payload: None, }; if write.send(tokio_tungstenite::tungstenite::Message::Binary(ping.encode_to_vec().into())).await.is_err() { - eprintln!("Feishu: ping failed, reconnecting"); + tracing::warn!("Feishu ping failed, reconnecting"); break; } } _ = shutdown_rx.recv() => { - println!("Feishu channel shutdown signal received"); + tracing::info!("Feishu channel shutdown signal received"); break; } } @@ -552,13 +553,13 @@ impl Channel for FeishuChannel { let shutdown_rx = shutdown_tx.subscribe(); match channel.run_ws_loop(shutdown_rx).await { Ok(_) => { - println!("Feishu WebSocket disconnected"); + tracing::info!("Feishu WebSocket disconnected"); } Err(e) => { consecutive_failures += 1; - eprintln!("Feishu WebSocket error (attempt {}): {}", consecutive_failures, e); + tracing::error!(attempt = consecutive_failures, error = %e, "Feishu WebSocket error"); if consecutive_failures >= max_failures { - eprintln!("Feishu channel: max failures reached, stopping"); + tracing::error!("Feishu channel: max failures reached, stopping"); break; } } @@ -568,15 +569,15 @@ impl Channel for FeishuChannel { break; } - println!("Feishu channel retrying in 5s..."); + tracing::info!("Feishu channel retrying in 5s..."); tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } *channel.running.write().await = false; - println!("Feishu channel stopped"); + tracing::info!("Feishu channel stopped"); }); - println!("Feishu channel started"); + tracing::info!("Feishu channel started"); Ok(()) } diff --git a/src/channels/manager.rs b/src/channels/manager.rs index 2369d21..6b1a0ec 100644 --- a/src/channels/manager.rs +++ b/src/channels/manager.rs @@ -52,9 +52,9 @@ impl ChannelManager { .write() .await .insert("feishu".to_string(), Arc::new(channel)); - println!("Feishu channel registered"); + tracing::info!("Feishu channel registered"); } else { - println!("Feishu channel disabled in config"); + tracing::info!("Feishu channel disabled in config"); } } Ok(()) @@ -63,9 +63,9 @@ impl ChannelManager { pub async fn start_all(&self) -> Result<(), ChannelError> { let channels = self.channels.read().await; for (name, channel) in channels.iter() { - println!("Starting channel: {}", name); + tracing::info!(channel = %name, "Starting channel"); if let Err(e) = channel.start().await { - eprintln!("Warning: Failed to start channel {}: {}", name, e); + tracing::error!(channel = %name, error = %e, "Failed to start channel"); } } Ok(()) @@ -74,9 +74,9 @@ impl ChannelManager { pub async fn stop_all(&self) -> Result<(), ChannelError> { let mut channels = self.channels.write().await; for (name, channel) in channels.iter() { - println!("Stopping channel: {}", name); + tracing::info!(channel = %name, "Stopping channel"); if let Err(e) = channel.stop().await { - eprintln!("Error stopping channel {}: {}", name, e); + tracing::error!(channel = %name, error = %e, "Error stopping channel"); } } channels.clear(); diff --git a/src/client/mod.rs b/src/client/mod.rs index 64d2b44..29998d2 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -11,7 +11,7 @@ fn parse_message(raw: &str) -> Result { pub async fn run(gateway_url: &str) -> Result<(), Box> { let (ws_stream, _) = connect_async(gateway_url).await?; - println!("Connected to gateway"); + tracing::info!(url = %gateway_url, "Connected to gateway"); let (mut sender, mut receiver) = ws_stream.split(); @@ -35,6 +35,7 @@ pub async fn run(gateway_url: &str) -> Result<(), Box> { input.write_output(&format!("Error: {}", message)).await?; } WsOutbound::SessionEstablished { session_id } => { + tracing::debug!(session_id = %session_id, "Session established"); input.write_output(&format!("Session: {}\n", session_id)).await?; } _ => {} @@ -42,6 +43,7 @@ pub async fn run(gateway_url: &str) -> Result<(), Box> { } } Some(Ok(Message::Close(_))) | None => { + tracing::info!("Gateway disconnected"); input.write_output("Gateway disconnected").await?; break; } @@ -75,14 +77,14 @@ pub async fn run(gateway_url: &str) -> Result<(), Box> { }; if let Ok(text) = serialize_inbound(&inbound) { if sender.send(Message::Text(text.into())).await.is_err() { - eprintln!("Failed to send message"); + tracing::error!("Failed to send message to gateway"); break; } } } Ok(None) => break, Err(e) => { - eprintln!("Input error: {}", e); + tracing::error!(error = %e, "Input error"); break; } } diff --git a/src/config/mod.rs b/src/config/mod.rs index dc6b315..f15dabc 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -138,13 +138,13 @@ impl Config { fn load_from(path: &Path) -> Result> { load_env_file()?; let content = if path.exists() { - println!("Config loaded from: {}", path.display()); + tracing::info!(path = %path.display(), "Config loaded"); fs::read_to_string(path)? } else { // Fallback to current directory let fallback = Path::new("config.json"); if fallback.exists() { - println!("Config loaded from: {}", fallback.display()); + tracing::info!(path = %fallback.display(), "Config loaded from fallback path"); fs::read_to_string(fallback)? } else { return Err(Box::new(ConfigError::ConfigNotFound( diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index e75b9ba..88c9165 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -64,7 +64,7 @@ pub async fn run(host: Option, port: Option) -> Result<(), Box(); @@ -73,7 +73,7 @@ pub async fn run(host: Option, port: Option) -> Result<(), Box Result>, AgentError> { if let Some(agent) = self.chat_agents.get(chat_id) { + tracing::trace!(chat_id = %chat_id, "Reusing existing agent"); return Ok(agent.clone()); } + tracing::debug!(chat_id = %chat_id, "Creating new agent for chat"); let agent = AgentLoop::new(self.provider_config.clone())?; let arc = Arc::new(Mutex::new(agent)); self.chat_agents.insert(chat_id.to_string(), arc.clone()); @@ -99,8 +101,15 @@ impl SessionManager { let mut inner = self.inner.lock().await; let should_recreate = if let Some(last_active) = inner.session_timestamps.get(channel_name) { - last_active.elapsed() > inner.session_ttl + let elapsed = last_active.elapsed(); + if elapsed > inner.session_ttl { + tracing::info!(channel = %channel_name, elapsed_hours = elapsed.as_secs() / 3600, "Session expired, recreating"); + true + } else { + false + } } else { + tracing::debug!(channel = %channel_name, "Creating new session"); true }; @@ -140,6 +149,8 @@ impl SessionManager { chat_id: &str, content: &str, ) -> Result { + tracing::debug!(channel = %channel_name, chat_id = %chat_id, content_len = content.len(), "Routing message to agent"); + // 确保 session 存在(可能需要重建) self.ensure_session(channel_name).await?; @@ -161,6 +172,8 @@ impl SessionManager { let user_msg = ChatMessage::user(content); let response = agent.process(user_msg).await?; + tracing::debug!(channel = %channel_name, chat_id = %chat_id, response_len = response.content.len(), "Agent response received"); + Ok(response.content) } diff --git a/src/gateway/ws.rs b/src/gateway/ws.rs index 91e9b2d..bdab4d1 100644 --- a/src/gateway/ws.rs +++ b/src/gateway/ws.rs @@ -20,7 +20,7 @@ async fn handle_socket(ws: WebSocket, state: Arc) { let provider_config = match state.config.get_provider_config("default") { Ok(cfg) => cfg, Err(e) => { - eprintln!("Failed to get provider config: {}", e); + tracing::error!(error = %e, "Failed to get provider config"); return; } }; @@ -32,12 +32,13 @@ async fn handle_socket(ws: WebSocket, state: Arc) { let session = match Session::new(channel_name.clone(), provider_config, sender).await { Ok(s) => Arc::new(Mutex::new(s)), Err(e) => { - eprintln!("Failed to create session: {}", e); + tracing::error!(error = %e, "Failed to create session"); return; } }; let session_id = session.lock().await.id; + tracing::info!(session_id = %session_id, "CLI session established"); let _ = session.lock().await.send(WsOutbound::SessionEstablished { session_id: session_id.to_string(), @@ -46,10 +47,12 @@ async fn handle_socket(ws: WebSocket, state: Arc) { let (mut ws_sender, mut ws_receiver) = ws.split(); let mut receiver = receiver; + let session_id_for_sender = session_id; tokio::spawn(async move { while let Some(msg) = receiver.recv().await { if let Ok(text) = serialize_outbound(&msg) { if ws_sender.send(WsMessage::Text(text.into())).await.is_err() { + tracing::debug!(session_id = %session_id_for_sender, "WebSocket send error"); break; } } @@ -65,6 +68,7 @@ async fn handle_socket(ws: WebSocket, state: Arc) { handle_inbound(&session, inbound).await; } Err(e) => { + tracing::warn!(error = %e, "Failed to parse inbound message"); let _ = session.lock().await.send(WsOutbound::Error { code: "PARSE_ERROR".to_string(), message: e.to_string(), @@ -73,11 +77,14 @@ async fn handle_socket(ws: WebSocket, state: Arc) { } } Ok(WsMessage::Close(_)) | Err(_) => { + tracing::debug!(session_id = %session_id, "WebSocket closed"); break; } _ => {} } } + + tracing::info!(session_id = %session_id, "CLI session ended"); } async fn handle_inbound(session: &Arc>, inbound: WsInbound) { @@ -100,6 +107,7 @@ async fn handle_inbound(session: &Arc>, inbound: WsInbound) { let agent = match session_guard.get_or_create_agent(&chat_id).await { Ok(a) => a, Err(e) => { + tracing::error!(chat_id = %chat_id, error = %e, "Failed to get or create agent"); let _ = session_guard.send(WsOutbound::Error { code: "AGENT_ERROR".to_string(), message: e.to_string(), @@ -112,6 +120,7 @@ async fn handle_inbound(session: &Arc>, inbound: WsInbound) { let mut agent = agent.lock().await; match agent.process(user_msg).await { Ok(response) => { + tracing::debug!(chat_id = %chat_id, "Agent response sent"); let _ = session.lock().await.send(WsOutbound::AssistantResponse { id: response.id, content: response.content, @@ -119,6 +128,7 @@ async fn handle_inbound(session: &Arc>, inbound: WsInbound) { }).await; } Err(e) => { + tracing::error!(chat_id = %chat_id, error = %e, "Agent process error"); let _ = session.lock().await.send(WsOutbound::Error { code: "LLM_ERROR".to_string(), message: e.to_string(),