From acc8f63da0677c4ac327a65c04f0712a48f52aba Mon Sep 17 00:00:00 2001 From: ooodc <549496103@qq.com> Date: Tue, 28 Apr 2026 14:32:14 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=20SessionLifecycleSe?= =?UTF-8?q?rvice=20=E6=A8=A1=E5=9D=97=EF=BC=8C=E9=87=8D=E6=9E=84=E4=BC=9A?= =?UTF-8?q?=E8=AF=9D=E7=AE=A1=E7=90=86=E9=80=BB=E8=BE=91=E4=BB=A5=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E4=BC=9A=E8=AF=9D=E7=94=9F=E5=91=BD=E5=91=A8=E6=9C=9F?= =?UTF-8?q?=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot --- src/gateway/mod.rs | 1 + src/gateway/session.rs | 36 +++++++---------------- src/gateway/session_lifecycle.rs | 49 ++++++++++++++++++++++++++++++++ 3 files changed, 60 insertions(+), 26 deletions(-) create mode 100644 src/gateway/session_lifecycle.rs diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 88a98ea..b131307 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -12,6 +12,7 @@ pub mod prompt; pub mod prompt_injector; pub mod session; pub mod session_factory; +pub mod session_lifecycle; pub mod session_pool; pub mod tool_registry_factory; pub mod ws; diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 6e1435d..09b9d07 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -31,7 +31,7 @@ use super::memory_maintenance::{ }; use super::prompt_injector::PromptInjector; use super::session_factory::SessionFactory; -use super::session_pool::SessionPool; +use super::session_lifecycle::SessionLifecycleService; use super::tool_registry_factory::ToolRegistryFactory; fn preview_text(content: &str, max_chars: usize) -> String { @@ -455,7 +455,7 @@ pub struct SessionManager { skills: Arc, store: Arc, show_tool_results: bool, - session_pool: SessionPool, + lifecycle: SessionLifecycleService, cli_sessions: CliSessionService, } @@ -499,7 +499,7 @@ impl SessionManager { prompt_injector, store.clone(), ); - let session_pool = SessionPool::new(session_ttl_hours, session_factory); + let lifecycle = SessionLifecycleService::new(session_ttl_hours, session_factory); let cli_sessions = CliSessionService::new(store.clone()); Ok(Self { @@ -509,7 +509,7 @@ impl SessionManager { skills, store, show_tool_results, - session_pool, + lifecycle, cli_sessions, }) } @@ -569,21 +569,21 @@ impl SessionManager { /// 确保 session 存在且未超时,超时则重建 pub async fn ensure_session(&self, channel_name: &str) -> Result<(), AgentError> { - self.session_pool.ensure_session(channel_name).await + self.lifecycle.ensure_session(channel_name).await } /// 获取 session(不检查超时) pub async fn get(&self, channel_name: &str) -> Option>> { - self.session_pool.get(channel_name).await + self.lifecycle.get(channel_name).await } /// 更新最后活跃时间 pub async fn touch(&self, channel_name: &str) { - self.session_pool.touch(channel_name).await; + self.lifecycle.touch(channel_name).await; } pub async fn cleanup_expired_sessions(&self) -> usize { - self.session_pool.cleanup_expired_sessions().await + self.lifecycle.cleanup_expired_sessions().await } /// 处理消息:路由到对应 session 的 agent @@ -610,17 +610,7 @@ impl SessionManager { } } - // 确保 session 存在(可能需要重建) - self.ensure_session(channel_name).await?; - - // 更新活跃时间 - self.touch(channel_name).await; - - // 获取 session - let session = self - .get(channel_name) - .await - .ok_or_else(|| AgentError::Other("Session not found".to_string()))?; + let session = self.lifecycle.active_session(channel_name).await?; let outbound_messages = AgentExecutionService::new(self.show_tool_results) .prepare_and_execute_message(MessageExecutionRequest { @@ -652,13 +642,7 @@ impl SessionManager { prompt: &str, options: ScheduledAgentTaskOptions, ) -> Result, AgentError> { - self.ensure_session(channel_name).await?; - self.touch(channel_name).await; - - let session = self - .get(channel_name) - .await - .ok_or_else(|| AgentError::Other("Session not found".to_string()))?; + let session = self.lifecycle.active_session(channel_name).await?; let sender_id = options .sender_id diff --git a/src/gateway/session_lifecycle.rs b/src/gateway/session_lifecycle.rs new file mode 100644 index 0000000..e81af98 --- /dev/null +++ b/src/gateway/session_lifecycle.rs @@ -0,0 +1,49 @@ +use std::sync::Arc; + +use tokio::sync::Mutex; + +use crate::agent::AgentError; + +use super::session::Session; +use super::session_factory::SessionFactory; +use super::session_pool::SessionPool; + +#[derive(Clone)] +pub(crate) struct SessionLifecycleService { + session_pool: SessionPool, +} + +impl SessionLifecycleService { + pub(crate) fn new(session_ttl_hours: u64, session_factory: SessionFactory) -> Self { + Self { + session_pool: SessionPool::new(session_ttl_hours, session_factory), + } + } + + pub(crate) async fn ensure_session(&self, channel_name: &str) -> Result<(), AgentError> { + self.session_pool.ensure_session(channel_name).await + } + + pub(crate) async fn get(&self, channel_name: &str) -> Option>> { + self.session_pool.get(channel_name).await + } + + pub(crate) async fn touch(&self, channel_name: &str) { + self.session_pool.touch(channel_name).await; + } + + pub(crate) async fn active_session( + &self, + channel_name: &str, + ) -> Result>, AgentError> { + self.ensure_session(channel_name).await?; + self.touch(channel_name).await; + self.get(channel_name) + .await + .ok_or_else(|| AgentError::Other("Session not found".to_string())) + } + + pub(crate) async fn cleanup_expired_sessions(&self) -> usize { + self.session_pool.cleanup_expired_sessions().await + } +}