feat: 添加 SessionLifecycleService 模块,重构会话管理逻辑以优化会话生命周期处理

Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
ooodc 2026-04-28 14:32:14 +08:00
parent 8f27bd2735
commit acc8f63da0
3 changed files with 60 additions and 26 deletions

View File

@ -12,6 +12,7 @@ pub mod prompt;
pub mod prompt_injector; pub mod prompt_injector;
pub mod session; pub mod session;
pub mod session_factory; pub mod session_factory;
pub mod session_lifecycle;
pub mod session_pool; pub mod session_pool;
pub mod tool_registry_factory; pub mod tool_registry_factory;
pub mod ws; pub mod ws;

View File

@ -31,7 +31,7 @@ use super::memory_maintenance::{
}; };
use super::prompt_injector::PromptInjector; use super::prompt_injector::PromptInjector;
use super::session_factory::SessionFactory; use super::session_factory::SessionFactory;
use super::session_pool::SessionPool; use super::session_lifecycle::SessionLifecycleService;
use super::tool_registry_factory::ToolRegistryFactory; use super::tool_registry_factory::ToolRegistryFactory;
fn preview_text(content: &str, max_chars: usize) -> String { fn preview_text(content: &str, max_chars: usize) -> String {
@ -455,7 +455,7 @@ pub struct SessionManager {
skills: Arc<SkillRuntime>, skills: Arc<SkillRuntime>,
store: Arc<SessionStore>, store: Arc<SessionStore>,
show_tool_results: bool, show_tool_results: bool,
session_pool: SessionPool, lifecycle: SessionLifecycleService,
cli_sessions: CliSessionService, cli_sessions: CliSessionService,
} }
@ -499,7 +499,7 @@ impl SessionManager {
prompt_injector, prompt_injector,
store.clone(), store.clone(),
); );
let session_pool = SessionPool::new(session_ttl_hours, session_factory); let lifecycle = SessionLifecycleService::new(session_ttl_hours, session_factory);
let cli_sessions = CliSessionService::new(store.clone()); let cli_sessions = CliSessionService::new(store.clone());
Ok(Self { Ok(Self {
@ -509,7 +509,7 @@ impl SessionManager {
skills, skills,
store, store,
show_tool_results, show_tool_results,
session_pool, lifecycle,
cli_sessions, cli_sessions,
}) })
} }
@ -569,21 +569,21 @@ impl SessionManager {
/// 确保 session 存在且未超时,超时则重建 /// 确保 session 存在且未超时,超时则重建
pub async fn ensure_session(&self, channel_name: &str) -> Result<(), AgentError> { pub async fn ensure_session(&self, channel_name: &str) -> Result<(), AgentError> {
self.session_pool.ensure_session(channel_name).await self.lifecycle.ensure_session(channel_name).await
} }
/// 获取 session不检查超时 /// 获取 session不检查超时
pub async fn get(&self, channel_name: &str) -> Option<Arc<Mutex<Session>>> { pub async fn get(&self, channel_name: &str) -> Option<Arc<Mutex<Session>>> {
self.session_pool.get(channel_name).await self.lifecycle.get(channel_name).await
} }
/// 更新最后活跃时间 /// 更新最后活跃时间
pub async fn touch(&self, channel_name: &str) { pub async fn touch(&self, channel_name: &str) {
self.session_pool.touch(channel_name).await; self.lifecycle.touch(channel_name).await;
} }
pub async fn cleanup_expired_sessions(&self) -> usize { pub async fn cleanup_expired_sessions(&self) -> usize {
self.session_pool.cleanup_expired_sessions().await self.lifecycle.cleanup_expired_sessions().await
} }
/// 处理消息:路由到对应 session 的 agent /// 处理消息:路由到对应 session 的 agent
@ -610,17 +610,7 @@ impl SessionManager {
} }
} }
// 确保 session 存在(可能需要重建) let session = self.lifecycle.active_session(channel_name).await?;
self.ensure_session(channel_name).await?;
// 更新活跃时间
self.touch(channel_name).await;
// 获取 session
let session = self
.get(channel_name)
.await
.ok_or_else(|| AgentError::Other("Session not found".to_string()))?;
let outbound_messages = AgentExecutionService::new(self.show_tool_results) let outbound_messages = AgentExecutionService::new(self.show_tool_results)
.prepare_and_execute_message(MessageExecutionRequest { .prepare_and_execute_message(MessageExecutionRequest {
@ -652,13 +642,7 @@ impl SessionManager {
prompt: &str, prompt: &str,
options: ScheduledAgentTaskOptions, options: ScheduledAgentTaskOptions,
) -> Result<Vec<OutboundMessage>, AgentError> { ) -> Result<Vec<OutboundMessage>, AgentError> {
self.ensure_session(channel_name).await?; let session = self.lifecycle.active_session(channel_name).await?;
self.touch(channel_name).await;
let session = self
.get(channel_name)
.await
.ok_or_else(|| AgentError::Other("Session not found".to_string()))?;
let sender_id = options let sender_id = options
.sender_id .sender_id

View File

@ -0,0 +1,49 @@
use std::sync::Arc;
use tokio::sync::Mutex;
use crate::agent::AgentError;
use super::session::Session;
use super::session_factory::SessionFactory;
use super::session_pool::SessionPool;
#[derive(Clone)]
pub(crate) struct SessionLifecycleService {
session_pool: SessionPool,
}
impl SessionLifecycleService {
pub(crate) fn new(session_ttl_hours: u64, session_factory: SessionFactory) -> Self {
Self {
session_pool: SessionPool::new(session_ttl_hours, session_factory),
}
}
pub(crate) async fn ensure_session(&self, channel_name: &str) -> Result<(), AgentError> {
self.session_pool.ensure_session(channel_name).await
}
pub(crate) async fn get(&self, channel_name: &str) -> Option<Arc<Mutex<Session>>> {
self.session_pool.get(channel_name).await
}
pub(crate) async fn touch(&self, channel_name: &str) {
self.session_pool.touch(channel_name).await;
}
pub(crate) async fn active_session(
&self,
channel_name: &str,
) -> Result<Arc<Mutex<Session>>, AgentError> {
self.ensure_session(channel_name).await?;
self.touch(channel_name).await;
self.get(channel_name)
.await
.ok_or_else(|| AgentError::Other("Session not found".to_string()))
}
pub(crate) async fn cleanup_expired_sessions(&self) -> usize {
self.session_pool.cleanup_expired_sessions().await
}
}