diff --git a/src/gateway/scheduled_agent_task_service.rs b/src/gateway/scheduled_agent_task_service.rs index 214a9f8..66bd633 100644 --- a/src/gateway/scheduled_agent_task_service.rs +++ b/src/gateway/scheduled_agent_task_service.rs @@ -34,7 +34,10 @@ impl ScheduledAgentTaskService { prompt: &str, options: ScheduledAgentTaskOptions, ) -> Result, AgentError> { - let session = self.lifecycle.active_session(channel_name).await?; + // 根据 chat_id 自动选择 Session: + // - scheduler/ 开头:使用定时任务专用 Session(独立实例,不与用户消息竞争锁) + // - 其他:使用主 Session + let session = self.lifecycle.active_session_for_chat_id(channel_name, chat_id).await?; let sender_id = options .sender_id .clone() diff --git a/src/gateway/session_lifecycle.rs b/src/gateway/session_lifecycle.rs index 0ff3be9..3d02ead 100644 --- a/src/gateway/session_lifecycle.rs +++ b/src/gateway/session_lifecycle.rs @@ -32,6 +32,7 @@ impl SessionLifecycleService { self.session_pool.touch(channel_name).await; } + /// 获取活跃的主 Session(用于用户消息) pub(crate) async fn active_session( &self, channel_name: &str, @@ -43,6 +44,21 @@ impl SessionLifecycleService { .ok_or_else(|| AgentError::Other("Session not found".to_string())) } + /// 根据 chat_id 自动选择并获取 Session + /// - scheduler/ 开头:返回定时任务专用 Session + /// - 其他:返回主 Session + pub(crate) async fn active_session_for_chat_id( + &self, + channel_name: &str, + chat_id: &str, + ) -> Result>, AgentError> { + self.session_pool.ensure_session_for_chat_id(channel_name, chat_id).await?; + self.touch(channel_name).await; + self.session_pool.get_for_chat_id(channel_name, chat_id) + .await + .ok_or_else(|| AgentError::Other("Session not found".to_string())) + } + pub(crate) async fn cleanup_expired_sessions(&self) -> usize { self.session_pool.cleanup_expired_sessions().await } diff --git a/src/gateway/session_pool.rs b/src/gateway/session_pool.rs index a26ada7..546f57a 100644 --- a/src/gateway/session_pool.rs +++ b/src/gateway/session_pool.rs @@ -10,6 +10,11 @@ use crate::protocol::WsOutbound; use super::session::Session; use super::session_factory::SessionFactory; +/// 判断 chat_id 是否是定时任务专用(以 "scheduler/" 开头) +pub(crate) fn is_scheduler_chat_id(chat_id: &str) -> bool { + chat_id.starts_with("scheduler/") +} + #[derive(Clone)] pub(crate) struct SessionPool { inner: Arc>, @@ -18,7 +23,10 @@ pub(crate) struct SessionPool { } struct SessionPoolInner { + /// 主 Session:用于用户消息 sessions: HashMap>>, + /// 定时任务专用 Session:独立的实例,避免与用户消息竞争锁 + scheduler_sessions: HashMap>>, session_timestamps: HashMap, } @@ -27,6 +35,7 @@ impl SessionPool { Self { inner: Arc::new(Mutex::new(SessionPoolInner { sessions: HashMap::new(), + scheduler_sessions: HashMap::new(), session_timestamps: HashMap::new(), })), session_factory, @@ -34,11 +43,29 @@ impl SessionPool { } } + /// 确保主 Session 存在(用于用户消息) pub(crate) async fn ensure_session(&self, channel_name: &str) -> Result<(), AgentError> { + self.ensure_session_internal(channel_name, false).await + } + + /// 确保定时任务专用 Session 存在 + pub(crate) async fn ensure_scheduler_session(&self, channel_name: &str) -> Result<(), AgentError> { + self.ensure_session_internal(channel_name, true).await + } + + /// 内部方法:创建 Session(根据 is_scheduler 选择存储位置) + async fn ensure_session_internal(&self, channel_name: &str, is_scheduler: bool) -> Result<(), AgentError> { let mut inner = self.inner.lock().await; + // 选择对应的存储 + let sessions = if is_scheduler { + &mut inner.scheduler_sessions + } else { + &mut inner.sessions + }; + // 简化:只检查 session 是否存在,不做超时判断 - if inner.sessions.contains_key(channel_name) { + if sessions.contains_key(channel_name) { return Ok(()); } @@ -49,9 +76,7 @@ impl SessionPool { .create(channel_name.to_string(), user_tx) .await?; - inner - .sessions - .insert(channel_name.to_string(), Arc::new(Mutex::new(session))); + sessions.insert(channel_name.to_string(), Arc::new(Mutex::new(session))); inner .session_timestamps .insert(channel_name.to_string(), Instant::now()); @@ -59,10 +84,36 @@ impl SessionPool { Ok(()) } + /// 获取主 Session(用于用户消息) pub(crate) async fn get(&self, channel_name: &str) -> Option>> { self.inner.lock().await.sessions.get(channel_name).cloned() } + /// 获取定时任务专用 Session + pub(crate) async fn get_scheduler_session(&self, channel_name: &str) -> Option>> { + self.inner.lock().await.scheduler_sessions.get(channel_name).cloned() + } + + /// 根据 chat_id 自动选择 Session + /// - scheduler/ 开头:返回定时任务专用 Session + /// - 其他:返回主 Session + pub(crate) async fn get_for_chat_id(&self, channel_name: &str, chat_id: &str) -> Option>> { + if is_scheduler_chat_id(chat_id) { + self.get_scheduler_session(channel_name).await + } else { + self.get(channel_name).await + } + } + + /// 确保 Session 存在(根据 chat_id 自动选择) + pub(crate) async fn ensure_session_for_chat_id(&self, channel_name: &str, chat_id: &str) -> Result<(), AgentError> { + if is_scheduler_chat_id(chat_id) { + self.ensure_scheduler_session(channel_name).await + } else { + self.ensure_session(channel_name).await + } + } + pub(crate) async fn touch(&self, channel_name: &str) { self.inner .lock() @@ -101,7 +152,10 @@ impl SessionPool { .collect(); for channel_name in &expired_channels { + // 清理主 Session inner.sessions.remove(channel_name); + // 清理定时任务专用 Session + inner.scheduler_sessions.remove(channel_name); inner.session_timestamps.remove(channel_name); }