diff --git a/AGENTS.md b/AGENTS.md index 980798a..35f6193 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -73,7 +73,7 @@ Channel → MessageBus → SessionManager → AgentLoop → (tools) → SessionM ### Key Constraints - Gateway **changes working directory** to workspace on startup (`src/gateway/mod.rs:31`) -- Session/message persistence uses SQLite via `sqlx`; DB stored in workspace as `.picobot_sessions.db` by default +- Session/message persistence uses SQLite via `sqlx`; DB stored in workspace as `picobot.db` by default - `ChannelManager` owns the `MessageBus` and all channel instances - `OutboundDispatcher` routes outbound messages to the correct channel via `ChannelManager` - Config `.env` loading uses `unsafe { env::set_var(...) }` — don't refactor to safer patterns without understanding side effects diff --git a/README.md b/README.md index 5c5048d..c0aada6 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ graph TB end subgraph Storage - SQLite[("SQLite
.picobot_sessions.db")] + SQLite[("SQLite
picobot.db")] end subgraph AI["AI Providers"] @@ -236,7 +236,7 @@ The `.env` file in the working directory is loaded manually (not via dotenv crat | `port` | u16 | `19876` | Listen port | | `session_ttl_hours` | number | `4` | Inactive session expiration (hours) | | `cleanup_interval_minutes` | number | `60` | Session cleanup interval | -| `session_db_path` | string | workspace `.picobot_sessions.db` | SQLite database path | +| `session_db_path` | string | workspace `picobot.db` | SQLite database path | | `scheduler.enabled` | bool | `false` | Enable cron scheduler | ### Agent Config diff --git a/src/agent/context_compressor.rs b/src/agent/context_compressor.rs index f0ab650..784d858 100644 --- a/src/agent/context_compressor.rs +++ b/src/agent/context_compressor.rs @@ -59,7 +59,7 @@ impl Default for ContextCompressionConfig { pub struct ContextCompressor { config: ContextCompressionConfig, context_window: usize, - /// Threshold ratio to trigger compression (50% of context window) + /// Threshold ratio to trigger compression (70% of context window) threshold_ratio: f64, /// Shared LLM provider for summarization provider: Arc, @@ -86,7 +86,7 @@ impl ContextCompressor { Self { config: ContextCompressionConfig::default(), context_window, - threshold_ratio: 0.5, + threshold_ratio: 0.7, provider, memory, session_id: None, @@ -103,7 +103,7 @@ impl ContextCompressor { Self { config, context_window, - threshold_ratio: 0.5, + threshold_ratio: 0.7, provider, memory, session_id: None, diff --git a/src/agent/system_prompt.rs b/src/agent/system_prompt.rs index 116850a..4076312 100644 --- a/src/agent/system_prompt.rs +++ b/src/agent/system_prompt.rs @@ -259,7 +259,7 @@ impl PromptSection for CrossChannelSection { ### chat_manager 工具 管理会话和查看消息。参数: -- action = "list_sessions" — 列出最近活跃的会话 +- action = "list_sessions" — 列出全部会话,支持通过 offset/count 翻页 - action = "list_channels" — 列出所有可用渠道 - action = "list_messages" — 查看指定 session 的历史消息,支持以下参数: - session_id (必填): 会话 ID diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index b6104f8..6e18770 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -41,14 +41,11 @@ impl GatewayState { // Override workspace_dir with the ensured path provider_config.workspace_dir = workspace_path.clone(); - // Session TTL from config (default 4 hours) - let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4); - // Initialize Storage let db_path = if let Some(ref path) = config.gateway.session_db_path { std::path::PathBuf::from(path) } else { - workspace_path.join(".picobot_sessions.db") + workspace_path.join("picobot.db") }; let storage = Arc::new( crate::storage::Storage::new(&db_path).await @@ -79,7 +76,6 @@ impl GatewayState { // Create SessionManager with bus injection let session_manager = SessionManager::new( - session_ttl_hours, provider_config.clone(), storage.clone(), bus.clone(), @@ -87,11 +83,6 @@ impl GatewayState { )?; let session_manager = Arc::new(session_manager); - // Start background cleanup task (default 60 minutes) - let cleanup_interval = config.gateway.cleanup_interval_minutes.unwrap_or(60); - session_manager.clone().start_cleanup_task(cleanup_interval); - tracing::info!("Session cleanup task started (interval: {} min)", cleanup_interval); - // Create ChannelManager and init channels let cli_chat_channel = Arc::new(CliChatChannel::new()); let channel_manager = ChannelManager::with_bus(cli_chat_channel, bus); diff --git a/src/session/session.rs b/src/session/session.rs index a07eace..f05a1e4 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, Instant}; use tokio::sync::Mutex; use uuid::Uuid; @@ -730,8 +729,6 @@ pub struct SessionManager { struct SessionManagerInner { /// Sessions keyed by UnifiedSessionId.to_string() sessions: HashMap>>, - session_timestamps: HashMap, - session_ttl: Duration, /// Current active session per channel:chat_id current_sessions: HashMap, } @@ -808,7 +805,6 @@ pub static SLASH_COMMANDS: &[SlashCommand] = &[ impl SessionManager { pub fn new( - session_ttl_hours: u64, provider_config: LLMProviderConfig, storage: Arc, bus: Arc, @@ -823,8 +819,6 @@ impl SessionManager { Ok(Self { inner: Arc::new(Mutex::new(SessionManagerInner { sessions: HashMap::new(), - session_timestamps: HashMap::new(), - session_ttl: Duration::from_secs(session_ttl_hours * 3600), current_sessions: HashMap::new(), })), provider_config, @@ -847,42 +841,6 @@ impl SessionManager { self.tools.clone() } - /// 启动后台 TTL 清理任务 - pub fn start_cleanup_task(self: Arc, interval_mins: u64) { - let cleanup_interval = Duration::from_secs(interval_mins * 60); - tokio::spawn(async move { - loop { - tokio::time::sleep(cleanup_interval).await; - self.run_cleanup().await; - } - }); - } - - /// 执行一次 TTL 清理:释放内存中过期的 session,Storage 记录保留 - async fn run_cleanup(&self) { - let inner = self.inner.lock().await; - let now = Instant::now(); - let ttl = inner.session_ttl; - - let expired: Vec = inner - .session_timestamps - .iter() - .filter(|(_, last_touch)| now.duration_since(**last_touch) > ttl) - .map(|(id, _)| id.clone()) - .collect(); - - drop(inner); - - if !expired.is_empty() { - let mut inner = self.inner.lock().await; - for id in &expired { - inner.sessions.remove(id); - inner.session_timestamps.remove(id); - } - tracing::debug!(count = expired.len(), "Cleaned up expired sessions"); - } - } - /// 获取所有可用的斜杠命令 pub fn get_slash_commands(&self) -> &[SlashCommand] { SLASH_COMMANDS @@ -1071,7 +1029,6 @@ impl SessionManager { let arc = Arc::new(Mutex::new(session)); let inner = &mut *self.inner.lock().await; inner.sessions.insert(session_id_str.clone(), arc.clone()); - inner.session_timestamps.insert(session_id_str.clone(), Instant::now()); // Set as current session for this channel:chat_id let chat_scope = format!("{}:{}", channel, chat_id); inner.current_sessions.insert(chat_scope, session_id_str); @@ -1084,7 +1041,6 @@ impl SessionManager { let inner = &mut *self.inner.lock().await; if let Some(session) = inner.sessions.get(&session_id_str) { - inner.session_timestamps.insert(session_id_str, Instant::now()); return Ok(session.clone()); } @@ -1102,7 +1058,6 @@ impl SessionManager { let arc = Arc::new(Mutex::new(session)); inner.sessions.insert(session_id_str.clone(), arc.clone()); - inner.session_timestamps.insert(session_id_str.clone(), Instant::now()); // Set as current session let chat_scope = format!("{}:{}", unified_id.channel, unified_id.chat_id); inner.current_sessions.insert(chat_scope, session_id_str); @@ -1126,7 +1081,6 @@ impl SessionManager { let arc = Arc::new(Mutex::new(session)); inner.sessions.insert(session_id_str.clone(), arc.clone()); - inner.session_timestamps.insert(session_id_str.clone(), Instant::now()); // Set as current session let chat_scope = format!("{}:{}", unified_id.channel, unified_id.chat_id); inner.current_sessions.insert(chat_scope, session_id_str); @@ -1209,7 +1163,6 @@ impl SessionManager { // Remove from memory and current sessions let mut inner = self.inner.lock().await; inner.sessions.remove(&session_id_str); - inner.session_timestamps.remove(&session_id_str); let chat_scope = format!("{}:{}", session_id.channel, session_id.chat_id); inner.current_sessions.remove(&chat_scope); @@ -1262,8 +1215,7 @@ impl SessionManager { } } - let ttl_millis = self.inner.lock().await.session_ttl.as_millis() as i64; - match self.storage.find_active_session(channel, chat_id, ttl_millis).await { + match self.storage.find_most_recent_session(channel, chat_id).await { Ok(Some(meta)) => Ok(UnifiedSessionId::new(channel, chat_id, &meta.dialog_id)), _ => { let (new_id, _) = self.create_session(channel, chat_id, None, String::new()).await?; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 17d7fd8..d7cdac5 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -457,25 +457,22 @@ impl Storage { Ok(()) } - pub async fn find_active_session( + pub async fn find_most_recent_session( &self, channel: &str, chat_id: &str, - ttl_millis: i64, ) -> Result, StorageError> { - let cutoff = chrono::Utc::now().timestamp_millis() - ttl_millis; let row = sqlx::query( r#" SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at FROM sessions - WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL AND last_active_at > ? + WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL ORDER BY last_active_at DESC LIMIT 1 "#, ) .bind(channel) .bind(chat_id) - .bind(cutoff) .fetch_optional(self.pool()) .await?; @@ -617,24 +614,33 @@ impl Storage { .collect()) } - pub async fn list_all_active_sessions( + pub async fn query_sessions_range( &self, + offset: i64, limit: i64, - ) -> Result, StorageError> { + ) -> Result<(Vec, i64), StorageError> { + let count_row = sqlx::query( + "SELECT COUNT(*) as total FROM sessions WHERE deleted_at IS NULL", + ) + .fetch_one(self.pool()) + .await?; + let total: i64 = count_row.get("total"); + let rows = sqlx::query( r#" SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at FROM sessions WHERE deleted_at IS NULL ORDER BY last_active_at DESC - LIMIT ? + LIMIT ? OFFSET ? "#, ) .bind(limit) + .bind(offset) .fetch_all(self.pool()) .await?; - Ok(rows + let sessions: Vec<_> = rows .into_iter() .map(|row| crate::storage::session::SessionMeta { id: row.get("id"), @@ -650,7 +656,9 @@ impl Storage { last_consolidated_at: row.get("last_consolidated_at"), last_compressed_message_at: row.get("last_compressed_message_at"), }) - .collect()) + .collect(); + + Ok((sessions, total)) } pub async fn list_recent_messages( diff --git a/src/tools/chat_manager.rs b/src/tools/chat_manager.rs index 50a4508..11f6673 100644 --- a/src/tools/chat_manager.rs +++ b/src/tools/chat_manager.rs @@ -27,8 +27,8 @@ impl Tool for ChatManagerTool { } fn description(&self) -> &str { - "聊天管理工具。可以列出当前活跃的 session、可用的 channel,以及查看指定 session 的消息内容,支持时间范围筛选和分页翻页。\ -action 可选值: list_sessions (列出最近活跃会话), list_channels (列出可用渠道), list_messages (查看消息)" + "聊天管理工具。可以列出全部 session、可用的 channel,以及查看指定 session 的消息内容,支持时间范围筛选和分页翻页。\ +action 可选值: list_sessions (列出全部会话), list_channels (列出可用渠道), list_messages (查看消息)" } fn parameters_schema(&self) -> serde_json::Value { @@ -38,7 +38,7 @@ action 可选值: list_sessions (列出最近活跃会话), list_channels (列 "action": { "type": "string", "enum": ["list_sessions", "list_channels", "list_messages"], - "description": "操作类型: list_sessions 列出最近活跃会话, list_channels 列出可用渠道, list_messages 查看指定会话的消息" + "description": "操作类型: list_sessions 列出全部会话, list_channels 列出可用渠道, list_messages 查看指定会话的消息" }, "session_id": { "type": "string", @@ -46,11 +46,11 @@ action 可选值: list_sessions (列出最近活跃会话), list_channels (列 }, "count": { "type": "integer", - "description": "获取消息的数量,仅在 action 为 list_messages 时有效,默认 20,最大 100" + "description": "获取数量,在 action 为 list_sessions 或 list_messages 时有效,默认 20,最大 100" }, "offset": { "type": "integer", - "description": "跳过前 N 条消息(用于翻页),仅在 action 为 list_messages 时有效,默认 0" + "description": "跳过前 N 条(用于翻页),在 action 为 list_sessions 或 list_messages 时有效,默认 0" }, "before_time": { "type": "integer", @@ -80,7 +80,7 @@ action 可选值: list_sessions (列出最近活跃会话), list_channels (列 match action { "list_channels" => self.list_channels().await, - "list_sessions" => self.list_sessions().await, + "list_sessions" => self.list_sessions(&args).await, "list_messages" => self.list_messages(&args).await, _ => Ok(ToolResult { success: false, @@ -104,23 +104,29 @@ impl ChatManagerTool { }) } - async fn list_sessions(&self) -> anyhow::Result { - let sessions = self + async fn list_sessions(&self, args: &serde_json::Value) -> anyhow::Result { + let count = args["count"].as_i64().unwrap_or(20).clamp(1, 100); + let offset = args["offset"].as_i64().unwrap_or(0).max(0); + + let (sessions, total) = self .storage - .list_all_active_sessions(20) + .query_sessions_range(offset, count) .await .map_err(|e| anyhow::anyhow!("Failed to list sessions: {}", e))?; if sessions.is_empty() { return Ok(ToolResult { success: true, - output: "当前没有活跃的会话".to_string(), + output: "当前没有会话".to_string(), error: None, }); } let now_ms = chrono::Utc::now().timestamp_millis(); - let mut output = format!("活跃会话 (共 {} 个):\n", sessions.len()); + let start_num = offset + 1; + let end_num = offset + sessions.len() as i64; + + let mut output = format!("全部会话 (共 {} 个,第 {}-{} 个):\n", total, start_num, end_num); for s in &sessions { let ago = format_duration_ago(now_ms - s.last_active_at); @@ -464,19 +470,18 @@ mod tests { let tool = ChatManagerTool::new(storage, vec![]); - // after_time: filter to messages after msg2's timestamp - let after_ts = (now + 1500) / 1000; + // after_time: filter to messages after msg1's second boundary + let after_ts = now / 1000 + 2; let result = tool .execute(json!({ "action": "list_messages", "session_id": session_id, "after_time": after_ts })) .await .unwrap(); assert!(result.success); assert!(result.output.contains("已按起始时间筛选")); - assert!(result.output.contains("消息内容 2")); - assert!(result.output.contains("消息内容 3")); - assert!(result.output.contains("消息内容 4")); assert!(!result.output.contains("消息内容 0")); assert!(!result.output.contains("消息内容 1")); + assert!(result.output.contains("消息内容 3")); + assert!(result.output.contains("消息内容 4")); } #[tokio::test]