From ac7576bb4b06f2b0bef24eb05af401267345fe09 Mon Sep 17 00:00:00 2001 From: xiaoxixi Date: Tue, 28 Apr 2026 22:29:24 +0800 Subject: [PATCH] =?UTF-8?q?feat(session):=20Phase=203=20-=20SessionManager?= =?UTF-8?q?=20=E5=AE=8C=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - SessionManager::new 接收 Arc 参数 - create_session / get_or_create_session 写入/恢复 Storage - handle_message 支持无 dialog_id 时自动查找最近活跃 session 或创建新 session - 实现 list_dialogs() — 从 Storage 读取最近 10 条 - 实现 switch_dialog() — 从 Storage 恢复 session - 实现 delete_dialog() — 软删除 Storage + 内存移除 - 实现 rename_dialog() — 更新 Storage 和内存 title - 实现 archive_dialog()(空实现,archive 概念已删除) - 新增 start_cleanup_task() 后台 TTL 清理任务 - GatewayConfig 新增 session_db_path 和 cleanup_interval_minutes 配置 - Gateway::new 改为 async,创建 Storage 并启动清理任务 - rename_dialog / delete_dialog 改为 async(需 .await) - WsOutbound::SystemNotification 已在 Phase 2 添加 --- src/config/mod.rs | 6 ++ src/gateway/mod.rs | 27 +++++-- src/session/session.rs | 173 ++++++++++++++++++++++++++++++++++++----- 3 files changed, 180 insertions(+), 26 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index eed56ca..d44f949 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -136,6 +136,10 @@ pub struct GatewayConfig { pub port: u16, #[serde(default, rename = "session_ttl_hours")] pub session_ttl_hours: Option, + #[serde(default, rename = "cleanup_interval_minutes")] + pub cleanup_interval_minutes: Option, + #[serde(default, rename = "session_db_path")] + pub session_db_path: Option, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -162,6 +166,8 @@ impl Default for GatewayConfig { host: default_gateway_host(), port: default_gateway_port(), session_ttl_hours: None, + cleanup_interval_minutes: None, + session_db_path: None, } } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 2683316..6a002bf 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -20,7 +20,7 @@ pub struct GatewayState { } impl GatewayState { - pub fn new() -> Result> { + pub async fn new() -> Result> { let config = Config::load_default()?; // Initialize workspace directory: expand path and ensure it exists @@ -41,7 +41,24 @@ impl GatewayState { // Session TTL from config (default 4 hours) let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4); - let session_manager = SessionManager::new(session_ttl_hours, provider_config.clone())?; + // Initialize Storage + let db_path = if let Some(ref path) = config.gateway.session_db_path { + std::path::PathBuf::from(path) + } else { + workspace_path.join(".picobot_sessions.db") + }; + let storage = Arc::new( + crate::storage::Storage::new(&db_path).await + .map_err(|e| format!("failed to initialize session storage: {}", e))? + ); + tracing::info!("Session storage: {}", db_path.display()); + + let session_manager = SessionManager::new(session_ttl_hours, provider_config.clone(), storage.clone())?; + + // Start background cleanup task (default 60 minutes) + let cleanup_interval = config.gateway.cleanup_interval_minutes.unwrap_or(60); + Arc::new(session_manager.clone()).start_cleanup_task(cleanup_interval); + tracing::info!("Session cleanup task started (interval: {} min)", cleanup_interval); // Create CLI Chat Channel first (needed for ChannelManager) let cli_chat_channel = Arc::new(CliChatChannel::new()); @@ -173,7 +190,7 @@ impl GatewayState { .map_err(|e| ChannelError::Other(e.to_string())) } RenameDialog { session_id, title } => { - session_manager.rename_dialog(&session_id, &title) + session_manager.rename_dialog(&session_id, &title).await .map(|()| SessionEvent::DialogRenamed { session_id, title }) .map_err(|e| ChannelError::Other(e.to_string())) } @@ -183,7 +200,7 @@ impl GatewayState { .map_err(|e| ChannelError::Other(e.to_string())) } DeleteDialog { session_id } => { - session_manager.delete_dialog(&session_id) + session_manager.delete_dialog(&session_id).await .map(|()| SessionEvent::DialogDeleted { session_id }) .map_err(|e| ChannelError::Other(e.to_string())) } @@ -213,7 +230,7 @@ pub async fn run(host: Option, port: Option) -> Result<(), Box, skills_loader: Arc, + storage: Arc, } struct SessionManagerInner { @@ -630,7 +631,11 @@ pub static SLASH_COMMANDS: &[SlashCommand] = &[ ]; impl SessionManager { - pub fn new(session_ttl_hours: u64, provider_config: LLMProviderConfig) -> Result { + pub fn new( + session_ttl_hours: u64, + provider_config: LLMProviderConfig, + storage: Arc, + ) -> Result { let skills_loader = SkillsLoader::new(); skills_loader.load_skills(); let skills_loader = Arc::new(skills_loader); @@ -646,6 +651,7 @@ impl SessionManager { provider_config, tools, skills_loader, + storage, }) } @@ -653,6 +659,42 @@ impl SessionManager { self.tools.clone() } + /// 启动后台 TTL 清理任务 + pub fn start_cleanup_task(self: Arc, interval_mins: u64) { + let cleanup_interval = Duration::from_secs(interval_mins * 60); + tokio::spawn(async move { + loop { + tokio::time::sleep(cleanup_interval).await; + self.run_cleanup().await; + } + }); + } + + /// 执行一次 TTL 清理:释放内存中过期的 session,Storage 记录保留 + async fn run_cleanup(&self) { + let inner = self.inner.lock().await; + let now = Instant::now(); + let ttl = inner.session_ttl; + + let expired: Vec = inner + .session_timestamps + .iter() + .filter(|(_, last_touch)| now.duration_since(**last_touch) > ttl) + .map(|(id, _)| id.clone()) + .collect(); + + drop(inner); + + if !expired.is_empty() { + let mut inner = self.inner.lock().await; + for id in &expired { + inner.sessions.remove(id); + inner.session_timestamps.remove(id); + } + tracing::debug!(count = expired.len(), "Cleaned up expired sessions"); + } + } + /// 获取所有可用的斜杠命令 pub fn get_slash_commands(&self) -> &[SlashCommand] { SLASH_COMMANDS @@ -758,13 +800,30 @@ impl SessionManager { .map(ToOwned::to_owned) .unwrap_or_else(|| "新对话".to_string()); + // Write to Storage first + let now = chrono::Utc::now().timestamp_millis(); + let meta = crate::storage::session::SessionMeta { + id: session_id_str.clone(), + channel: channel.to_string(), + chat_id: chat_id.to_string(), + dialog_id: dialog_id.clone(), + title: title.clone(), + created_at: now, + last_active_at: now, + message_count: 0, + routing_info: if routing_info.is_empty() { None } else { Some(routing_info.clone()) }, + deleted_at: None, + }; + self.storage.upsert_session(&meta).await + .map_err(|e| AgentError::Other(format!("failed to create session in storage: {}", e)))?; + let (user_tx, _rx) = mpsc::channel::(100); let session = Session::new( unified_id.clone(), self.provider_config.clone(), user_tx, self.tools.clone(), - None, // storage injected in Phase 3 + Some(self.storage.clone()), routing_info, title.clone(), ).await?; @@ -786,6 +845,28 @@ impl SessionManager { return Ok(session.clone()); } + // Try to restore from Storage + match self.storage.get_session(&session_id_str).await { + Ok(meta) => { + let (user_tx, _rx) = mpsc::channel::(100); + let session = Session::from_storage( + unified_id.clone(), + self.provider_config.clone(), + user_tx, + self.tools.clone(), + self.storage.clone(), + ).await?; + + let arc = Arc::new(Mutex::new(session)); + inner.sessions.insert(session_id_str.clone(), arc.clone()); + inner.session_timestamps.insert(session_id_str, Instant::now()); + return Ok(arc); + } + Err(_) => { + // Session not in Storage, create new + } + } + // Create new session let (user_tx, _rx) = mpsc::channel::(100); let session = Session::new( @@ -793,9 +874,9 @@ impl SessionManager { self.provider_config.clone(), user_tx, self.tools.clone(), - None, // storage injected in Phase 3 - String::new(), // routing_info - set by channel layer in Phase 3 - format!("Dialog {}", &unified_id.dialog_id), + Some(self.storage.clone()), + String::new(), + format!("新对话"), ).await?; let arc = Arc::new(Mutex::new(session)); @@ -823,32 +904,67 @@ impl SessionManager { pub async fn switch_dialog( &self, - _channel: &str, - _chat_id: &str, - _dialog_id: &str, + channel: &str, + chat_id: &str, + dialog_id: &str, ) -> Result { - Err(AgentError::Other("switch_dialog not applicable in new architecture".to_string())) + let unified_id = UnifiedSessionId::new(channel, chat_id, dialog_id); + // Ensure session is loaded into memory + self.get_or_create_session(&unified_id).await?; + Ok(unified_id) } pub async fn list_dialogs( &self, - _channel: &str, - _chat_id: &str, + channel: &str, + chat_id: &str, _include_archived: bool, ) -> Result<(Vec, Option), AgentError> { - Ok((vec![], None)) + let metas = self.storage.list_sessions(channel, chat_id, 10).await + .map_err(|e| AgentError::Other(format!("failed to list dialogs: {}", e)))?; + + let dialogs: Vec = metas.into_iter().map(|meta| { + DialogInfo { + session_id: UnifiedSessionId::new(channel, chat_id, &meta.dialog_id), + title: meta.title, + created_at: meta.created_at, + last_active_at: meta.last_active_at, + message_count: meta.message_count, + archived_at: None, + } + }).collect(); + + Ok((dialogs, None)) } - pub fn rename_dialog(&self, _session_id: &UnifiedSessionId, _title: &str) -> Result<(), AgentError> { - Err(AgentError::Other("rename_dialog not available".to_string())) + pub async fn rename_dialog(&self, session_id: &UnifiedSessionId, title: &str) -> Result<(), AgentError> { + // Update in-memory session + let session = self.get_or_create_session(session_id).await?; + let mut session_guard = session.lock().await; + session_guard.title = title.to_string(); + session_guard.persist_session_meta().await + .map_err(|e| AgentError::Other(format!("failed to rename dialog: {}", e)))?; + Ok(()) + } + + pub async fn delete_dialog(&self, session_id: &UnifiedSessionId) -> Result<(), AgentError> { + let session_id_str = session_id.to_string(); + + // Soft delete from Storage + self.storage.soft_delete_session(&session_id_str).await + .map_err(|e| AgentError::Other(format!("failed to delete dialog: {}", e)))?; + + // Remove from memory + let mut inner = self.inner.lock().await; + inner.sessions.remove(&session_id_str); + inner.session_timestamps.remove(&session_id_str); + + Ok(()) } pub fn archive_dialog(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> { - Err(AgentError::Other("archive_dialog not available".to_string())) - } - - pub fn delete_dialog(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> { - Err(AgentError::Other("delete_dialog not available".to_string())) + // Archive concept removed - just return OK + Ok(()) } pub fn clear_dialog_history(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> { @@ -864,8 +980,23 @@ impl SessionManager { content: &str, media: Vec, ) -> Result { - let dialog_id = dialog_id.unwrap_or(DEFAULT_DIALOG_ID); - let unified_id = UnifiedSessionId::new(channel, chat_id, dialog_id); + // Determine dialog_id: if not provided, find most recent active or create new + let unified_id = if let Some(did) = dialog_id { + UnifiedSessionId::new(channel, chat_id, did) + } else { + // Find active session from Storage + let ttl_millis = self.inner.lock().await.session_ttl.as_millis() as i64; + match self.storage.find_active_session(channel, chat_id, ttl_millis).await { + Ok(Some(meta)) => { + UnifiedSessionId::new(channel, chat_id, &meta.dialog_id) + } + Ok(None) | Err(_) => { + // Create new session + let (new_id, _) = self.create_session(channel, chat_id, None, String::new()).await?; + new_id + } + } + }; let session = self.get_or_create_session(&unified_id).await?; // Check for slash command