fix: update database filename and improve session management in chat manager

This commit is contained in:
xiaoxixi 2026-05-10 15:51:03 +08:00
parent 11a8e93b77
commit bafa7a606c
8 changed files with 48 additions and 92 deletions

View File

@ -73,7 +73,7 @@ Channel → MessageBus → SessionManager → AgentLoop → (tools) → SessionM
### Key Constraints
- Gateway **changes working directory** to workspace on startup (`src/gateway/mod.rs:31`)
- Session/message persistence uses SQLite via `sqlx`; DB stored in workspace as `.picobot_sessions.db` by default
- Session/message persistence uses SQLite via `sqlx`; DB stored in workspace as `picobot.db` by default
- `ChannelManager` owns the `MessageBus` and all channel instances
- `OutboundDispatcher` routes outbound messages to the correct channel via `ChannelManager`
- Config `.env` loading uses `unsafe { env::set_var(...) }` — don't refactor to safer patterns without understanding side effects

View File

@ -26,7 +26,7 @@ graph TB
end
subgraph Storage
SQLite[("SQLite<br/>.picobot_sessions.db")]
SQLite[("SQLite<br/>picobot.db")]
end
subgraph AI["AI Providers"]
@ -236,7 +236,7 @@ The `.env` file in the working directory is loaded manually (not via dotenv crat
| `port` | u16 | `19876` | Listen port |
| `session_ttl_hours` | number | `4` | Inactive session expiration (hours) |
| `cleanup_interval_minutes` | number | `60` | Session cleanup interval |
| `session_db_path` | string | workspace `.picobot_sessions.db` | SQLite database path |
| `session_db_path` | string | workspace `picobot.db` | SQLite database path |
| `scheduler.enabled` | bool | `false` | Enable cron scheduler |
### Agent Config

View File

@ -59,7 +59,7 @@ impl Default for ContextCompressionConfig {
pub struct ContextCompressor {
config: ContextCompressionConfig,
context_window: usize,
/// Threshold ratio to trigger compression (50% of context window)
/// Threshold ratio to trigger compression (70% of context window)
threshold_ratio: f64,
/// Shared LLM provider for summarization
provider: Arc<dyn LLMProvider>,
@ -86,7 +86,7 @@ impl ContextCompressor {
Self {
config: ContextCompressionConfig::default(),
context_window,
threshold_ratio: 0.5,
threshold_ratio: 0.7,
provider,
memory,
session_id: None,
@ -103,7 +103,7 @@ impl ContextCompressor {
Self {
config,
context_window,
threshold_ratio: 0.5,
threshold_ratio: 0.7,
provider,
memory,
session_id: None,

View File

@ -259,7 +259,7 @@ impl PromptSection for CrossChannelSection {
### chat_manager
- action = "list_sessions"
- action = "list_sessions" offset/count
- action = "list_channels"
- action = "list_messages" session
- session_id (): ID

View File

@ -41,14 +41,11 @@ impl GatewayState {
// Override workspace_dir with the ensured path
provider_config.workspace_dir = workspace_path.clone();
// Session TTL from config (default 4 hours)
let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4);
// Initialize Storage
let db_path = if let Some(ref path) = config.gateway.session_db_path {
std::path::PathBuf::from(path)
} else {
workspace_path.join(".picobot_sessions.db")
workspace_path.join("picobot.db")
};
let storage = Arc::new(
crate::storage::Storage::new(&db_path).await
@ -79,7 +76,6 @@ impl GatewayState {
// Create SessionManager with bus injection
let session_manager = SessionManager::new(
session_ttl_hours,
provider_config.clone(),
storage.clone(),
bus.clone(),
@ -87,11 +83,6 @@ impl GatewayState {
)?;
let session_manager = Arc::new(session_manager);
// Start background cleanup task (default 60 minutes)
let cleanup_interval = config.gateway.cleanup_interval_minutes.unwrap_or(60);
session_manager.clone().start_cleanup_task(cleanup_interval);
tracing::info!("Session cleanup task started (interval: {} min)", cleanup_interval);
// Create ChannelManager and init channels
let cli_chat_channel = Arc::new(CliChatChannel::new());
let channel_manager = ChannelManager::with_bus(cli_chat_channel, bus);

View File

@ -1,6 +1,5 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use uuid::Uuid;
@ -730,8 +729,6 @@ pub struct SessionManager {
struct SessionManagerInner {
/// Sessions keyed by UnifiedSessionId.to_string()
sessions: HashMap<String, Arc<Mutex<Session>>>,
session_timestamps: HashMap<String, Instant>,
session_ttl: Duration,
/// Current active session per channel:chat_id
current_sessions: HashMap<String, String>,
}
@ -808,7 +805,6 @@ pub static SLASH_COMMANDS: &[SlashCommand] = &[
impl SessionManager {
pub fn new(
session_ttl_hours: u64,
provider_config: LLMProviderConfig,
storage: Arc<Storage>,
bus: Arc<MessageBus>,
@ -823,8 +819,6 @@ impl SessionManager {
Ok(Self {
inner: Arc::new(Mutex::new(SessionManagerInner {
sessions: HashMap::new(),
session_timestamps: HashMap::new(),
session_ttl: Duration::from_secs(session_ttl_hours * 3600),
current_sessions: HashMap::new(),
})),
provider_config,
@ -847,42 +841,6 @@ impl SessionManager {
self.tools.clone()
}
/// 启动后台 TTL 清理任务
pub fn start_cleanup_task(self: Arc<Self>, interval_mins: u64) {
let cleanup_interval = Duration::from_secs(interval_mins * 60);
tokio::spawn(async move {
loop {
tokio::time::sleep(cleanup_interval).await;
self.run_cleanup().await;
}
});
}
/// 执行一次 TTL 清理:释放内存中过期的 sessionStorage 记录保留
async fn run_cleanup(&self) {
let inner = self.inner.lock().await;
let now = Instant::now();
let ttl = inner.session_ttl;
let expired: Vec<String> = inner
.session_timestamps
.iter()
.filter(|(_, last_touch)| now.duration_since(**last_touch) > ttl)
.map(|(id, _)| id.clone())
.collect();
drop(inner);
if !expired.is_empty() {
let mut inner = self.inner.lock().await;
for id in &expired {
inner.sessions.remove(id);
inner.session_timestamps.remove(id);
}
tracing::debug!(count = expired.len(), "Cleaned up expired sessions");
}
}
/// 获取所有可用的斜杠命令
pub fn get_slash_commands(&self) -> &[SlashCommand] {
SLASH_COMMANDS
@ -1071,7 +1029,6 @@ impl SessionManager {
let arc = Arc::new(Mutex::new(session));
let inner = &mut *self.inner.lock().await;
inner.sessions.insert(session_id_str.clone(), arc.clone());
inner.session_timestamps.insert(session_id_str.clone(), Instant::now());
// Set as current session for this channel:chat_id
let chat_scope = format!("{}:{}", channel, chat_id);
inner.current_sessions.insert(chat_scope, session_id_str);
@ -1084,7 +1041,6 @@ impl SessionManager {
let inner = &mut *self.inner.lock().await;
if let Some(session) = inner.sessions.get(&session_id_str) {
inner.session_timestamps.insert(session_id_str, Instant::now());
return Ok(session.clone());
}
@ -1102,7 +1058,6 @@ impl SessionManager {
let arc = Arc::new(Mutex::new(session));
inner.sessions.insert(session_id_str.clone(), arc.clone());
inner.session_timestamps.insert(session_id_str.clone(), Instant::now());
// Set as current session
let chat_scope = format!("{}:{}", unified_id.channel, unified_id.chat_id);
inner.current_sessions.insert(chat_scope, session_id_str);
@ -1126,7 +1081,6 @@ impl SessionManager {
let arc = Arc::new(Mutex::new(session));
inner.sessions.insert(session_id_str.clone(), arc.clone());
inner.session_timestamps.insert(session_id_str.clone(), Instant::now());
// Set as current session
let chat_scope = format!("{}:{}", unified_id.channel, unified_id.chat_id);
inner.current_sessions.insert(chat_scope, session_id_str);
@ -1209,7 +1163,6 @@ impl SessionManager {
// Remove from memory and current sessions
let mut inner = self.inner.lock().await;
inner.sessions.remove(&session_id_str);
inner.session_timestamps.remove(&session_id_str);
let chat_scope = format!("{}:{}", session_id.channel, session_id.chat_id);
inner.current_sessions.remove(&chat_scope);
@ -1262,8 +1215,7 @@ impl SessionManager {
}
}
let ttl_millis = self.inner.lock().await.session_ttl.as_millis() as i64;
match self.storage.find_active_session(channel, chat_id, ttl_millis).await {
match self.storage.find_most_recent_session(channel, chat_id).await {
Ok(Some(meta)) => Ok(UnifiedSessionId::new(channel, chat_id, &meta.dialog_id)),
_ => {
let (new_id, _) = self.create_session(channel, chat_id, None, String::new()).await?;

View File

@ -457,25 +457,22 @@ impl Storage {
Ok(())
}
pub async fn find_active_session(
pub async fn find_most_recent_session(
&self,
channel: &str,
chat_id: &str,
ttl_millis: i64,
) -> Result<Option<crate::storage::session::SessionMeta>, StorageError> {
let cutoff = chrono::Utc::now().timestamp_millis() - ttl_millis;
let row = sqlx::query(
r#"
SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at
FROM sessions
WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL AND last_active_at > ?
WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL
ORDER BY last_active_at DESC
LIMIT 1
"#,
)
.bind(channel)
.bind(chat_id)
.bind(cutoff)
.fetch_optional(self.pool())
.await?;
@ -617,24 +614,33 @@ impl Storage {
.collect())
}
pub async fn list_all_active_sessions(
pub async fn query_sessions_range(
&self,
offset: i64,
limit: i64,
) -> Result<Vec<crate::storage::session::SessionMeta>, StorageError> {
) -> Result<(Vec<crate::storage::session::SessionMeta>, i64), StorageError> {
let count_row = sqlx::query(
"SELECT COUNT(*) as total FROM sessions WHERE deleted_at IS NULL",
)
.fetch_one(self.pool())
.await?;
let total: i64 = count_row.get("total");
let rows = sqlx::query(
r#"
SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at
FROM sessions
WHERE deleted_at IS NULL
ORDER BY last_active_at DESC
LIMIT ?
LIMIT ? OFFSET ?
"#,
)
.bind(limit)
.bind(offset)
.fetch_all(self.pool())
.await?;
Ok(rows
let sessions: Vec<_> = rows
.into_iter()
.map(|row| crate::storage::session::SessionMeta {
id: row.get("id"),
@ -650,7 +656,9 @@ impl Storage {
last_consolidated_at: row.get("last_consolidated_at"),
last_compressed_message_at: row.get("last_compressed_message_at"),
})
.collect())
.collect();
Ok((sessions, total))
}
pub async fn list_recent_messages(

View File

@ -27,8 +27,8 @@ impl Tool for ChatManagerTool {
}
fn description(&self) -> &str {
"聊天管理工具。可以列出当前活跃的 session、可用的 channel以及查看指定 session 的消息内容,支持时间范围筛选和分页翻页。\
action : list_sessions (), list_channels (), list_messages ()"
"聊天管理工具。可以列出全部 session、可用的 channel以及查看指定 session 的消息内容,支持时间范围筛选和分页翻页。\
action : list_sessions (), list_channels (), list_messages ()"
}
fn parameters_schema(&self) -> serde_json::Value {
@ -38,7 +38,7 @@ action 可选值: list_sessions (列出最近活跃会话), list_channels (列
"action": {
"type": "string",
"enum": ["list_sessions", "list_channels", "list_messages"],
"description": "操作类型: list_sessions 列出最近活跃会话, list_channels 列出可用渠道, list_messages 查看指定会话的消息"
"description": "操作类型: list_sessions 列出全部会话, list_channels 列出可用渠道, list_messages 查看指定会话的消息"
},
"session_id": {
"type": "string",
@ -46,11 +46,11 @@ action 可选值: list_sessions (列出最近活跃会话), list_channels (列
},
"count": {
"type": "integer",
"description": "获取消息的数量,在 action 为 list_messages 时有效,默认 20最大 100"
"description": "获取数量,在 action 为 list_sessions 或 list_messages 时有效,默认 20最大 100"
},
"offset": {
"type": "integer",
"description": "跳过前 N 条消息(用于翻页),在 action 为 list_messages 时有效,默认 0"
"description": "跳过前 N 条(用于翻页),在 action 为 list_sessions 或 list_messages 时有效,默认 0"
},
"before_time": {
"type": "integer",
@ -80,7 +80,7 @@ action 可选值: list_sessions (列出最近活跃会话), list_channels (列
match action {
"list_channels" => self.list_channels().await,
"list_sessions" => self.list_sessions().await,
"list_sessions" => self.list_sessions(&args).await,
"list_messages" => self.list_messages(&args).await,
_ => Ok(ToolResult {
success: false,
@ -104,23 +104,29 @@ impl ChatManagerTool {
})
}
async fn list_sessions(&self) -> anyhow::Result<ToolResult> {
let sessions = self
async fn list_sessions(&self, args: &serde_json::Value) -> anyhow::Result<ToolResult> {
let count = args["count"].as_i64().unwrap_or(20).clamp(1, 100);
let offset = args["offset"].as_i64().unwrap_or(0).max(0);
let (sessions, total) = self
.storage
.list_all_active_sessions(20)
.query_sessions_range(offset, count)
.await
.map_err(|e| anyhow::anyhow!("Failed to list sessions: {}", e))?;
if sessions.is_empty() {
return Ok(ToolResult {
success: true,
output: "当前没有活跃的会话".to_string(),
output: "当前没有会话".to_string(),
error: None,
});
}
let now_ms = chrono::Utc::now().timestamp_millis();
let mut output = format!("活跃会话 (共 {} 个):\n", sessions.len());
let start_num = offset + 1;
let end_num = offset + sessions.len() as i64;
let mut output = format!("全部会话 (共 {} 个,第 {}-{} 个):\n", total, start_num, end_num);
for s in &sessions {
let ago = format_duration_ago(now_ms - s.last_active_at);
@ -464,19 +470,18 @@ mod tests {
let tool = ChatManagerTool::new(storage, vec![]);
// after_time: filter to messages after msg2's timestamp
let after_ts = (now + 1500) / 1000;
// after_time: filter to messages after msg1's second boundary
let after_ts = now / 1000 + 2;
let result = tool
.execute(json!({ "action": "list_messages", "session_id": session_id, "after_time": after_ts }))
.await
.unwrap();
assert!(result.success);
assert!(result.output.contains("已按起始时间筛选"));
assert!(result.output.contains("消息内容 2"));
assert!(result.output.contains("消息内容 3"));
assert!(result.output.contains("消息内容 4"));
assert!(!result.output.contains("消息内容 0"));
assert!(!result.output.contains("消息内容 1"));
assert!(result.output.contains("消息内容 3"));
assert!(result.output.contains("消息内容 4"));
}
#[tokio::test]