diff --git a/src/bootstrap.rs b/src/bootstrap.rs index 937c325..c13a391 100644 --- a/src/bootstrap.rs +++ b/src/bootstrap.rs @@ -1,3 +1,26 @@ pub fn initialize_process_runtime() { let _ = rustls::crypto::ring::default_provider().install_default(); + + // Install a global panic hook so that any panic in a spawned task + // is logged with a full backtrace before the process exits. Without + // this hook, panics are silent (or print a brief message to stderr) + // which makes root-causing crashes difficult. + let default_hook = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |info| { + // Use tracing so the panic appears in the same log stream as everything else. + tracing::error!( + panic.payload = ?info.payload(), + panic.location = ?info.location(), + "FATAL: process panicked — collecting backtrace" + ); + // Print a compact backtrace to stderr as well (backtrace is not + // captured by tracing). + let backtrace = std::backtrace::Backtrace::capture(); + if backtrace.status() == std::backtrace::BacktraceStatus::Captured { + eprintln!("FATAL panic backtrace:\n{}", backtrace); + } + // Delegate to the default hook which prints the panic message and + // optionally the RUST_BACKTRACE-based backtrace. + default_hook(info); + })); } \ No newline at end of file diff --git a/src/bus/mod.rs b/src/bus/mod.rs index 7b09308..de0d507 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -43,18 +43,13 @@ impl MessageBus { .map_err(|_| BusError::Closed) } - /// Consume a message from the inbound queue - pub async fn consume_inbound(&self) -> InboundMessage { - let msg = self - .inbound_rx - .lock() - .await - .recv() - .await - .expect("bus inbound closed"); + /// Consume a message from the inbound queue. + /// Returns `None` when the channel is closed (all senders dropped). + pub async fn consume_inbound(&self) -> Option { + let msg = self.inbound_rx.lock().await.recv().await?; #[cfg(debug_assertions)] tracing::debug!(channel = %msg.channel, sender = %msg.sender_id, chat = %msg.chat_id, "Bus: consuming inbound message"); - msg + Some(msg) } /// Publish a message to the outbound queue @@ -67,14 +62,10 @@ impl MessageBus { .map_err(|_| BusError::Closed) } - /// Consume an outbound message from the outbound queue - pub async fn consume_outbound(&self) -> OutboundMessage { - self.outbound_rx - .lock() - .await - .recv() - .await - .expect("bus outbound closed") + /// Consume an outbound message from the outbound queue. + /// Returns `None` when the channel is closed (all senders dropped). + pub async fn consume_outbound(&self) -> Option { + self.outbound_rx.lock().await.recv().await } } diff --git a/src/channels/wechat.rs b/src/channels/wechat.rs index 00112b1..79c2e63 100644 --- a/src/channels/wechat.rs +++ b/src/channels/wechat.rs @@ -8,6 +8,7 @@ use std::sync::{ use std::time::UNIX_EPOCH; use async_trait::async_trait; +use futures_util::FutureExt; use tokio::sync::RwLock; use tokio::task::JoinHandle; use wechatbot::{BotOptions, SendContent, WeChatBot}; @@ -246,24 +247,39 @@ impl Channel for WechatChannel { let running = self.running.clone(); let handle = tokio::spawn(async move { - match bot.login(force_login).await { - Ok(creds) => { - tracing::info!( - channel = %channel_name, - account_id = %creds.account_id, - user_id = %creds.user_id, - "WeChat login succeeded" - ); + // Use catch_unwind to prevent a panic in the WeChat SDK (login or + // long-poll loop) from crashing the entire process. Any panic is + // logged and the channel is cleanly marked as stopped. + // AssertUnwindSafe is needed because WeChatBot contains internal + // locks (RwLock) that are not RefUnwindSafe. + let result = std::panic::AssertUnwindSafe(async { + match bot.login(force_login).await { + Ok(creds) => { + tracing::info!( + channel = %channel_name, + account_id = %creds.account_id, + user_id = %creds.user_id, + "WeChat login succeeded" + ); + } + Err(error) => { + tracing::error!(channel = %channel_name, error = %error, "WeChat login failed"); + return; + } } - Err(error) => { - running.store(false, Ordering::SeqCst); - tracing::error!(channel = %channel_name, error = %error, "WeChat login failed"); - return; - } - } - if let Err(error) = bot.run().await { - tracing::error!(channel = %channel_name, error = %error, "WeChat channel stopped with error"); + if let Err(error) = bot.run().await { + tracing::error!(channel = %channel_name, error = %error, "WeChat channel stopped with error"); + } + }) + .catch_unwind() + .await; + + if let Err(_panic) = result { + tracing::error!( + channel = %channel_name, + "WeChat bot task panicked — marking channel as stopped" + ); } running.store(false, Ordering::SeqCst); diff --git a/src/gateway/outbound_dispatcher.rs b/src/gateway/outbound_dispatcher.rs index a4bfe71..3f6c604 100644 --- a/src/gateway/outbound_dispatcher.rs +++ b/src/gateway/outbound_dispatcher.rs @@ -34,7 +34,13 @@ impl OutboundDispatcher { tracing::info!("OutboundDispatcher started"); loop { - let msg = self.bus.consume_outbound().await; + let msg = match self.bus.consume_outbound().await { + Some(msg) => msg, + None => { + tracing::info!("Outbound bus closed, stopping dispatcher"); + break; + } + }; #[cfg(debug_assertions)] tracing::debug!( channel = %msg.channel, diff --git a/src/gateway/processor.rs b/src/gateway/processor.rs index 87c734e..7bf15e1 100644 --- a/src/gateway/processor.rs +++ b/src/gateway/processor.rs @@ -114,8 +114,14 @@ impl InboundProcessor { ); loop { - // 1. 消费消息 - let inbound = self.bus.consume_inbound().await; + // 1. 消费消息 (channel 关闭时返回 None,优雅退出) + let inbound = match self.bus.consume_inbound().await { + Some(msg) => msg, + None => { + tracing::info!("Inbound bus closed, stopping inbound processor"); + break; + } + }; #[cfg(debug_assertions)] { diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 3505d79..0f07325 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -1753,7 +1753,8 @@ mod tests { let msg = tokio::time::timeout(std::time::Duration::from_millis(500), bus.consume_outbound()) .await - .expect("should have received an outbound message"); + .expect("timeout waiting for outbound message") + .expect("bus outbound closed"); assert_eq!(msg.event_kind, OutboundEventKind::ToolResult); } diff --git a/src/gateway/session_message_sender.rs b/src/gateway/session_message_sender.rs index 50cb569..cc8c1db 100644 --- a/src/gateway/session_message_sender.rs +++ b/src/gateway/session_message_sender.rs @@ -138,7 +138,7 @@ mod tests { } ); - let msg = bus.consume_outbound().await; + let msg = bus.consume_outbound().await.expect("bus outbound closed"); assert_eq!(msg.content, "hello"); assert_eq!(msg.media.len(), 1); assert_eq!(msg.media[0].media_type, "image"); diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 5593bef..fd3754c 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -1681,7 +1681,8 @@ mod tests { bus.consume_outbound(), ) .await - .unwrap(); + .expect("timeout waiting for outbound message") + .expect("bus outbound closed"); assert_eq!(outbound.channel, "test-channel"); assert_eq!(outbound.chat_id, "oc_demo"); assert!(outbound.content.contains("定时任务执行失败"));