feat(session): Phase 3 - SessionManager 完善

- SessionManager::new 接收 Arc<Storage> 参数
- create_session / get_or_create_session 写入/恢复 Storage
- handle_message 支持无 dialog_id 时自动查找最近活跃 session 或创建新 session
- 实现 list_dialogs() — 从 Storage 读取最近 10 条
- 实现 switch_dialog() — 从 Storage 恢复 session
- 实现 delete_dialog() — 软删除 Storage + 内存移除
- 实现 rename_dialog() — 更新 Storage 和内存 title
- 实现 archive_dialog()(空实现,archive 概念已删除)
- 新增 start_cleanup_task() 后台 TTL 清理任务
- GatewayConfig 新增 session_db_path 和 cleanup_interval_minutes 配置
- Gateway::new 改为 async,创建 Storage 并启动清理任务
- rename_dialog / delete_dialog 改为 async(需 .await)
- WsOutbound::SystemNotification 已在 Phase 2 添加
This commit is contained in:
xiaoxixi 2026-04-28 22:29:24 +08:00
parent 5f7ffd28ef
commit ac7576bb4b
3 changed files with 180 additions and 26 deletions

View File

@ -136,6 +136,10 @@ pub struct GatewayConfig {
pub port: u16, pub port: u16,
#[serde(default, rename = "session_ttl_hours")] #[serde(default, rename = "session_ttl_hours")]
pub session_ttl_hours: Option<u64>, pub session_ttl_hours: Option<u64>,
#[serde(default, rename = "cleanup_interval_minutes")]
pub cleanup_interval_minutes: Option<u64>,
#[serde(default, rename = "session_db_path")]
pub session_db_path: Option<String>,
} }
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
@ -162,6 +166,8 @@ impl Default for GatewayConfig {
host: default_gateway_host(), host: default_gateway_host(),
port: default_gateway_port(), port: default_gateway_port(),
session_ttl_hours: None, session_ttl_hours: None,
cleanup_interval_minutes: None,
session_db_path: None,
} }
} }
} }

View File

@ -20,7 +20,7 @@ pub struct GatewayState {
} }
impl GatewayState { impl GatewayState {
pub fn new() -> Result<Self, Box<dyn std::error::Error>> { pub async fn new() -> Result<Self, Box<dyn std::error::Error>> {
let config = Config::load_default()?; let config = Config::load_default()?;
// Initialize workspace directory: expand path and ensure it exists // Initialize workspace directory: expand path and ensure it exists
@ -41,7 +41,24 @@ impl GatewayState {
// Session TTL from config (default 4 hours) // Session TTL from config (default 4 hours)
let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4); let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4);
let session_manager = SessionManager::new(session_ttl_hours, provider_config.clone())?; // Initialize Storage
let db_path = if let Some(ref path) = config.gateway.session_db_path {
std::path::PathBuf::from(path)
} else {
workspace_path.join(".picobot_sessions.db")
};
let storage = Arc::new(
crate::storage::Storage::new(&db_path).await
.map_err(|e| format!("failed to initialize session storage: {}", e))?
);
tracing::info!("Session storage: {}", db_path.display());
let session_manager = SessionManager::new(session_ttl_hours, provider_config.clone(), storage.clone())?;
// Start background cleanup task (default 60 minutes)
let cleanup_interval = config.gateway.cleanup_interval_minutes.unwrap_or(60);
Arc::new(session_manager.clone()).start_cleanup_task(cleanup_interval);
tracing::info!("Session cleanup task started (interval: {} min)", cleanup_interval);
// Create CLI Chat Channel first (needed for ChannelManager) // Create CLI Chat Channel first (needed for ChannelManager)
let cli_chat_channel = Arc::new(CliChatChannel::new()); let cli_chat_channel = Arc::new(CliChatChannel::new());
@ -173,7 +190,7 @@ impl GatewayState {
.map_err(|e| ChannelError::Other(e.to_string())) .map_err(|e| ChannelError::Other(e.to_string()))
} }
RenameDialog { session_id, title } => { RenameDialog { session_id, title } => {
session_manager.rename_dialog(&session_id, &title) session_manager.rename_dialog(&session_id, &title).await
.map(|()| SessionEvent::DialogRenamed { session_id, title }) .map(|()| SessionEvent::DialogRenamed { session_id, title })
.map_err(|e| ChannelError::Other(e.to_string())) .map_err(|e| ChannelError::Other(e.to_string()))
} }
@ -183,7 +200,7 @@ impl GatewayState {
.map_err(|e| ChannelError::Other(e.to_string())) .map_err(|e| ChannelError::Other(e.to_string()))
} }
DeleteDialog { session_id } => { DeleteDialog { session_id } => {
session_manager.delete_dialog(&session_id) session_manager.delete_dialog(&session_id).await
.map(|()| SessionEvent::DialogDeleted { session_id }) .map(|()| SessionEvent::DialogDeleted { session_id })
.map_err(|e| ChannelError::Other(e.to_string())) .map_err(|e| ChannelError::Other(e.to_string()))
} }
@ -213,7 +230,7 @@ pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn
logging::init_logging(); logging::init_logging();
tracing::info!("Starting PicoBot Gateway"); tracing::info!("Starting PicoBot Gateway");
let state = Arc::new(GatewayState::new()?); let state = Arc::new(GatewayState::new().await?);
// Initialize and start channels with workspace directory // Initialize and start channels with workspace directory
state.channel_manager.init( state.channel_manager.init(

View File

@ -554,6 +554,7 @@ pub struct SessionManager {
provider_config: LLMProviderConfig, provider_config: LLMProviderConfig,
tools: Arc<ToolRegistry>, tools: Arc<ToolRegistry>,
skills_loader: Arc<SkillsLoader>, skills_loader: Arc<SkillsLoader>,
storage: Arc<Storage>,
} }
struct SessionManagerInner { struct SessionManagerInner {
@ -630,7 +631,11 @@ pub static SLASH_COMMANDS: &[SlashCommand] = &[
]; ];
impl SessionManager { impl SessionManager {
pub fn new(session_ttl_hours: u64, provider_config: LLMProviderConfig) -> Result<Self, AgentError> { pub fn new(
session_ttl_hours: u64,
provider_config: LLMProviderConfig,
storage: Arc<Storage>,
) -> Result<Self, AgentError> {
let skills_loader = SkillsLoader::new(); let skills_loader = SkillsLoader::new();
skills_loader.load_skills(); skills_loader.load_skills();
let skills_loader = Arc::new(skills_loader); let skills_loader = Arc::new(skills_loader);
@ -646,6 +651,7 @@ impl SessionManager {
provider_config, provider_config,
tools, tools,
skills_loader, skills_loader,
storage,
}) })
} }
@ -653,6 +659,42 @@ impl SessionManager {
self.tools.clone() self.tools.clone()
} }
/// 启动后台 TTL 清理任务
pub fn start_cleanup_task(self: Arc<Self>, interval_mins: u64) {
let cleanup_interval = Duration::from_secs(interval_mins * 60);
tokio::spawn(async move {
loop {
tokio::time::sleep(cleanup_interval).await;
self.run_cleanup().await;
}
});
}
/// 执行一次 TTL 清理:释放内存中过期的 sessionStorage 记录保留
async fn run_cleanup(&self) {
let inner = self.inner.lock().await;
let now = Instant::now();
let ttl = inner.session_ttl;
let expired: Vec<String> = inner
.session_timestamps
.iter()
.filter(|(_, last_touch)| now.duration_since(**last_touch) > ttl)
.map(|(id, _)| id.clone())
.collect();
drop(inner);
if !expired.is_empty() {
let mut inner = self.inner.lock().await;
for id in &expired {
inner.sessions.remove(id);
inner.session_timestamps.remove(id);
}
tracing::debug!(count = expired.len(), "Cleaned up expired sessions");
}
}
/// 获取所有可用的斜杠命令 /// 获取所有可用的斜杠命令
pub fn get_slash_commands(&self) -> &[SlashCommand] { pub fn get_slash_commands(&self) -> &[SlashCommand] {
SLASH_COMMANDS SLASH_COMMANDS
@ -758,13 +800,30 @@ impl SessionManager {
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.unwrap_or_else(|| "新对话".to_string()); .unwrap_or_else(|| "新对话".to_string());
// Write to Storage first
let now = chrono::Utc::now().timestamp_millis();
let meta = crate::storage::session::SessionMeta {
id: session_id_str.clone(),
channel: channel.to_string(),
chat_id: chat_id.to_string(),
dialog_id: dialog_id.clone(),
title: title.clone(),
created_at: now,
last_active_at: now,
message_count: 0,
routing_info: if routing_info.is_empty() { None } else { Some(routing_info.clone()) },
deleted_at: None,
};
self.storage.upsert_session(&meta).await
.map_err(|e| AgentError::Other(format!("failed to create session in storage: {}", e)))?;
let (user_tx, _rx) = mpsc::channel::<WsOutbound>(100); let (user_tx, _rx) = mpsc::channel::<WsOutbound>(100);
let session = Session::new( let session = Session::new(
unified_id.clone(), unified_id.clone(),
self.provider_config.clone(), self.provider_config.clone(),
user_tx, user_tx,
self.tools.clone(), self.tools.clone(),
None, // storage injected in Phase 3 Some(self.storage.clone()),
routing_info, routing_info,
title.clone(), title.clone(),
).await?; ).await?;
@ -786,6 +845,28 @@ impl SessionManager {
return Ok(session.clone()); return Ok(session.clone());
} }
// Try to restore from Storage
match self.storage.get_session(&session_id_str).await {
Ok(meta) => {
let (user_tx, _rx) = mpsc::channel::<WsOutbound>(100);
let session = Session::from_storage(
unified_id.clone(),
self.provider_config.clone(),
user_tx,
self.tools.clone(),
self.storage.clone(),
).await?;
let arc = Arc::new(Mutex::new(session));
inner.sessions.insert(session_id_str.clone(), arc.clone());
inner.session_timestamps.insert(session_id_str, Instant::now());
return Ok(arc);
}
Err(_) => {
// Session not in Storage, create new
}
}
// Create new session // Create new session
let (user_tx, _rx) = mpsc::channel::<WsOutbound>(100); let (user_tx, _rx) = mpsc::channel::<WsOutbound>(100);
let session = Session::new( let session = Session::new(
@ -793,9 +874,9 @@ impl SessionManager {
self.provider_config.clone(), self.provider_config.clone(),
user_tx, user_tx,
self.tools.clone(), self.tools.clone(),
None, // storage injected in Phase 3 Some(self.storage.clone()),
String::new(), // routing_info - set by channel layer in Phase 3 String::new(),
format!("Dialog {}", &unified_id.dialog_id), format!("新对话"),
).await?; ).await?;
let arc = Arc::new(Mutex::new(session)); let arc = Arc::new(Mutex::new(session));
@ -823,32 +904,67 @@ impl SessionManager {
pub async fn switch_dialog( pub async fn switch_dialog(
&self, &self,
_channel: &str, channel: &str,
_chat_id: &str, chat_id: &str,
_dialog_id: &str, dialog_id: &str,
) -> Result<UnifiedSessionId, AgentError> { ) -> Result<UnifiedSessionId, AgentError> {
Err(AgentError::Other("switch_dialog not applicable in new architecture".to_string())) let unified_id = UnifiedSessionId::new(channel, chat_id, dialog_id);
// Ensure session is loaded into memory
self.get_or_create_session(&unified_id).await?;
Ok(unified_id)
} }
pub async fn list_dialogs( pub async fn list_dialogs(
&self, &self,
_channel: &str, channel: &str,
_chat_id: &str, chat_id: &str,
_include_archived: bool, _include_archived: bool,
) -> Result<(Vec<DialogInfo>, Option<String>), AgentError> { ) -> Result<(Vec<DialogInfo>, Option<String>), AgentError> {
Ok((vec![], None)) let metas = self.storage.list_sessions(channel, chat_id, 10).await
.map_err(|e| AgentError::Other(format!("failed to list dialogs: {}", e)))?;
let dialogs: Vec<DialogInfo> = metas.into_iter().map(|meta| {
DialogInfo {
session_id: UnifiedSessionId::new(channel, chat_id, &meta.dialog_id),
title: meta.title,
created_at: meta.created_at,
last_active_at: meta.last_active_at,
message_count: meta.message_count,
archived_at: None,
}
}).collect();
Ok((dialogs, None))
} }
pub fn rename_dialog(&self, _session_id: &UnifiedSessionId, _title: &str) -> Result<(), AgentError> { pub async fn rename_dialog(&self, session_id: &UnifiedSessionId, title: &str) -> Result<(), AgentError> {
Err(AgentError::Other("rename_dialog not available".to_string())) // Update in-memory session
let session = self.get_or_create_session(session_id).await?;
let mut session_guard = session.lock().await;
session_guard.title = title.to_string();
session_guard.persist_session_meta().await
.map_err(|e| AgentError::Other(format!("failed to rename dialog: {}", e)))?;
Ok(())
}
pub async fn delete_dialog(&self, session_id: &UnifiedSessionId) -> Result<(), AgentError> {
let session_id_str = session_id.to_string();
// Soft delete from Storage
self.storage.soft_delete_session(&session_id_str).await
.map_err(|e| AgentError::Other(format!("failed to delete dialog: {}", e)))?;
// Remove from memory
let mut inner = self.inner.lock().await;
inner.sessions.remove(&session_id_str);
inner.session_timestamps.remove(&session_id_str);
Ok(())
} }
pub fn archive_dialog(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> { pub fn archive_dialog(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> {
Err(AgentError::Other("archive_dialog not available".to_string())) // Archive concept removed - just return OK
} Ok(())
pub fn delete_dialog(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> {
Err(AgentError::Other("delete_dialog not available".to_string()))
} }
pub fn clear_dialog_history(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> { pub fn clear_dialog_history(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> {
@ -864,8 +980,23 @@ impl SessionManager {
content: &str, content: &str,
media: Vec<crate::bus::MediaItem>, media: Vec<crate::bus::MediaItem>,
) -> Result<HandleResult, AgentError> { ) -> Result<HandleResult, AgentError> {
let dialog_id = dialog_id.unwrap_or(DEFAULT_DIALOG_ID); // Determine dialog_id: if not provided, find most recent active or create new
let unified_id = UnifiedSessionId::new(channel, chat_id, dialog_id); let unified_id = if let Some(did) = dialog_id {
UnifiedSessionId::new(channel, chat_id, did)
} else {
// Find active session from Storage
let ttl_millis = self.inner.lock().await.session_ttl.as_millis() as i64;
match self.storage.find_active_session(channel, chat_id, ttl_millis).await {
Ok(Some(meta)) => {
UnifiedSessionId::new(channel, chat_id, &meta.dialog_id)
}
Ok(None) | Err(_) => {
// Create new session
let (new_id, _) = self.create_session(channel, chat_id, None, String::new()).await?;
new_id
}
}
};
let session = self.get_or_create_session(&unified_id).await?; let session = self.get_or_create_session(&unified_id).await?;
// Check for slash command // Check for slash command