删除旧的session持久化机制

This commit is contained in:
xiaoxixi 2026-04-28 20:34:44 +08:00
parent 61eea62bfc
commit 6c50f433d1
5 changed files with 55 additions and 930 deletions

View File

@ -27,7 +27,6 @@ mime_guess = "2.0"
base64 = "0.22"
tempfile = "3"
meval = "0.2"
rusqlite = { version = "0.32", features = ["bundled"] }
ratatui = "0.27"
crossterm = { version = "0.28", features = ["event-stream"] }
termimad = "0.34"

View File

@ -9,6 +9,5 @@ pub mod protocol;
pub mod channels;
pub mod logging;
pub mod observability;
pub mod storage;
pub mod skills;
pub mod tools;

View File

@ -14,9 +14,3 @@ impl From<crate::channels::base::ChannelError> for SessionError {
SessionError::Other(e.to_string())
}
}
impl From<crate::storage::StorageError> for SessionError {
fn from(e: crate::storage::StorageError) -> Self {
SessionError::Other(e.to_string())
}
}

View File

@ -8,13 +8,13 @@ use uuid::Uuid;
use crate::bus::ChatMessage;
use crate::config::LLMProviderConfig;
use crate::agent::{AgentLoop, AgentError, ContextCompressor};
use crate::agent::system_prompt::build_system_prompt;
use crate::agent::context_compressor::ContextCompressionConfig;
use crate::protocol::WsOutbound;
use crate::providers::{create_provider, LLMProvider};
use crate::session::session_id::{UnifiedSessionId, DEFAULT_DIALOG_ID};
use crate::session::events::DialogInfo;
use crate::skills::SkillsLoader;
use crate::storage::{SessionRecord, SessionStore};
use crate::tools::{
BashTool, CalculatorTool, FileEditTool, FileReadTool, FileWriteTool,
GetSkillTool, HttpRequestTool, ToolRegistry, WebFetchTool,
@ -35,7 +35,6 @@ pub struct Session {
provider: Arc<dyn LLMProvider>,
tools: Arc<ToolRegistry>,
compressor: ContextCompressor,
store: Arc<SessionStore>,
}
impl Session {
@ -44,7 +43,6 @@ impl Session {
provider_config: LLMProviderConfig,
user_tx: mpsc::Sender<WsOutbound>,
tools: Arc<ToolRegistry>,
store: Arc<SessionStore>,
) -> Result<Self, AgentError> {
let provider_box = create_provider(provider_config.clone())
.map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?;
@ -63,34 +61,14 @@ impl Session {
provider: provider.clone(),
tools,
compressor: ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config),
store,
})
}
/// 获取持久化 session ID
pub fn persistent_session_id(&self) -> String {
/// 获取 session ID
pub fn session_id(&self) -> String {
self.id.to_string()
}
/// 确保存储中有此 session
pub fn ensure_persistent_session(&self) -> Result<SessionRecord, AgentError> {
self.store
.ensure_channel_session(&self.id.channel, &self.id.chat_id, &self.id.dialog_id)
.map_err(|err| AgentError::Other(format!("session persistence error: {}", err)))
}
/// 加载历史消息到内存
pub fn load_history(&mut self) -> Result<(), AgentError> {
if !self.messages.is_empty() {
return Ok(());
}
let history = self.store
.load_messages(&self.persistent_session_id())
.map_err(|err| AgentError::Other(format!("session history load error: {}", err)))?;
self.messages = history;
Ok(())
}
/// 添加消息到历史
pub fn add_message(&mut self, message: ChatMessage) {
self.messages.push(message);
@ -102,50 +80,19 @@ impl Session {
}
/// 清除历史消息
pub fn clear_history(&mut self) -> Result<(), AgentError> {
pub fn clear_history(&mut self) {
let len = self.messages.len();
self.messages.clear();
#[cfg(debug_assertions)]
tracing::debug!(session_id = %self.id, previous_len = len, "Chat history cleared");
self.store
.clear_messages(&self.persistent_session_id())
.map_err(|err| AgentError::Other(format!("clear history error: {}", err)))
}
/// 重置对话上下文
pub fn reset_context(&mut self) -> Result<(), AgentError> {
pub fn reset_context(&mut self) {
let len = self.messages.len();
self.messages.clear();
#[cfg(debug_assertions)]
tracing::debug!(session_id = %self.id, previous_len = len, "Chat context reset in memory");
self.store
.reset_session(&self.persistent_session_id())
.map_err(|err| AgentError::Other(format!("reset context error: {}", err)))
}
/// Archive 此 session
pub fn archive(&self) -> Result<(), AgentError> {
self.store
.archive_session(&self.persistent_session_id())
.map_err(|err| AgentError::Other(format!("archive session error: {}", err)))
}
/// 持久化消息
pub fn append_message(&self, message: &ChatMessage) -> Result<(), AgentError> {
self.store
.append_message(&self.persistent_session_id(), message)
.map_err(|err| AgentError::Other(format!("append message error: {}", err)))
}
/// 持久化多条消息
pub fn append_messages<I>(&self, messages: I) -> Result<(), AgentError>
where
I: IntoIterator<Item = ChatMessage>,
{
for message in messages {
self.append_message(&message)?;
}
Ok(())
}
pub fn create_user_message(&self, content: &str, media_refs: Vec<String>) -> ChatMessage {
@ -180,6 +127,21 @@ impl Session {
self.provider_config.workspace_dir.clone(),
))
}
/// 构建系统提示词(包含 AgentLoop 的基础提示词 + skills
pub fn build_system_prompt(&self, skills_prompt: &str) -> String {
let base_prompt = build_system_prompt(
&self.provider_config.workspace_dir,
&self.provider_config.model_id,
&self.tools,
);
if skills_prompt.trim().is_empty() {
base_prompt
} else {
format!("{}\n\n## Skills\n\n{}\n\nUse the `get_skill` tool to load a skill's full content when needed.", base_prompt, skills_prompt)
}
}
}
/// SessionManager 管理所有 Session按 channel_name 路由
@ -188,7 +150,6 @@ pub struct SessionManager {
inner: Arc<Mutex<SessionManagerInner>>,
provider_config: LLMProviderConfig,
tools: Arc<ToolRegistry>,
store: Arc<SessionStore>,
skills_loader: Arc<SkillsLoader>,
}
@ -282,11 +243,6 @@ pub static SLASH_COMMANDS: &[SlashCommand] = &[
impl SessionManager {
pub fn new(session_ttl_hours: u64, provider_config: LLMProviderConfig) -> Result<Self, AgentError> {
let store = Arc::new(
SessionStore::new()
.map_err(|err| AgentError::Other(format!("session store init error: {}", err)))?,
);
let skills_loader = SkillsLoader::new();
skills_loader.load_skills();
let skills_loader = Arc::new(skills_loader);
@ -301,7 +257,6 @@ impl SessionManager {
})),
provider_config,
tools,
store,
skills_loader,
})
}
@ -332,11 +287,6 @@ impl SessionManager {
match cmd.name {
"new" => {
if let Some(sid) = current_session_id {
self.store
.archive_session(&sid.to_string())
.map_err(|e| AgentError::Other(format!("archive session error: {}", e)))?;
}
let title = args.map(|s| s.to_string());
let (new_id, title) = self.create_session(channel, chat_id, title.as_deref()).await?;
Ok((Some(new_id), format!("New conversation '{}' created.", title)))
@ -352,50 +302,28 @@ impl SessionManager {
Ok((Some(unified_id), format!("Switched to session {}", target_id)))
}
"rename" => {
let title = args
let _title = args
.ok_or_else(|| AgentError::Other("Usage: /rename <title>".to_string()))?;
if let Some(sid) = current_session_id {
self.store
.rename_session(&sid.to_string(), title)
.map_err(|e| AgentError::Other(format!("rename session error: {}", e)))?;
Ok((None, format!("Conversation renamed to '{}'.", title)))
} else {
Ok((None, "No active conversation to rename.".to_string()))
}
Ok((None, "Rename not available in this mode.".to_string()))
}
"archive" => {
if let Some(sid) = current_session_id {
self.store
.archive_session(&sid.to_string())
.map_err(|e| AgentError::Other(format!("archive session error: {}", e)))?;
Ok((None, "Conversation archived.".to_string()))
} else {
Ok((None, "No active conversation to archive.".to_string()))
}
Ok((None, "Archive not available in this mode.".to_string()))
}
"delete" => {
if let Some(sid) = current_session_id {
self.store
.delete_session(&sid.to_string())
.map_err(|e| AgentError::Other(format!("delete session error: {}", e)))?;
let (new_id, _title) = self.create_session(channel, chat_id, None).await?;
Ok((Some(new_id), "Conversation deleted. New conversation created.".to_string()))
} else {
Ok((None, "No active conversation to delete.".to_string()))
}
let (new_id, _title) = self.create_session(channel, chat_id, None).await?;
Ok((Some(new_id), "Conversation deleted. New conversation created.".to_string()))
}
"compact" => {
if let Some(sid) = current_session_id {
let session = self.get_or_create_session(sid).await?;
let mut session_guard = session.lock().await;
session_guard.load_history()?;
let original_count = session_guard.get_history().len();
let history = session_guard.get_history().to_vec();
let compressed = session_guard.compressor
.compress_if_needed(history)
.await?;
let compressed_count = compressed.len();
session_guard.clear_history()?;
session_guard.clear_history();
for msg in compressed {
session_guard.add_message(msg);
}
@ -412,7 +340,7 @@ impl SessionManager {
let session = self.get_or_create_session(sid).await?;
let session_guard = session.lock().await;
let message_count = session_guard.get_history().len();
let session_id_str = session_guard.persistent_session_id();
let session_id_str = session_guard.session_id();
Ok((None, format!(
"Session ID: {}\nMessage count: {}",
session_id_str, message_count
@ -425,58 +353,6 @@ impl SessionManager {
}
}
pub fn store(&self) -> Arc<SessionStore> {
self.store.clone()
}
pub fn create_cli_session(&self, title: Option<&str>) -> Result<SessionRecord, AgentError> {
self.store
.create_cli_session(title)
.map_err(|err| AgentError::Other(format!("create session error: {}", err)))
}
pub fn get_session_record(&self, session_id: &str) -> Result<Option<SessionRecord>, AgentError> {
self.store
.get_session(session_id)
.map_err(|err| AgentError::Other(format!("get session error: {}", err)))
}
pub fn list_cli_sessions(&self, include_archived: bool) -> Result<Vec<SessionRecord>, AgentError> {
self.store
.list_sessions("cli", include_archived)
.map_err(|err| AgentError::Other(format!("list sessions error: {}", err)))
}
pub fn rename_session(&self, session_id: &str, title: &str) -> Result<(), AgentError> {
self.store
.rename_session(session_id, title)
.map_err(|err| AgentError::Other(format!("rename session error: {}", err)))
}
pub fn archive_session(&self, session_id: &str) -> Result<(), AgentError> {
self.store
.archive_session(session_id)
.map_err(|err| AgentError::Other(format!("archive session error: {}", err)))
}
pub fn delete_session(&self, session_id: &str) -> Result<(), AgentError> {
self.store
.delete_session(session_id)
.map_err(|err| AgentError::Other(format!("delete session error: {}", err)))
}
pub fn clear_session_messages(&self, session_id: &str) -> Result<(), AgentError> {
self.store
.clear_messages(session_id)
.map_err(|err| AgentError::Other(format!("clear session messages error: {}", err)))
}
pub fn load_session_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, AgentError> {
self.store
.load_messages(session_id)
.map_err(|err| AgentError::Other(format!("load session messages error: {}", err)))
}
pub async fn create_session(
&self,
channel: &str,
@ -493,17 +369,12 @@ impl SessionManager {
.map(ToOwned::to_owned)
.unwrap_or_else(|| format!("Dialog {}", &dialog_id));
self.store
.ensure_channel_session(channel, chat_id, &dialog_id)
.map_err(|err| AgentError::Other(format!("create session error: {}", err)))?;
let (user_tx, _rx) = mpsc::channel::<WsOutbound>(100);
let session = Session::new(
unified_id.clone(),
self.provider_config.clone(),
user_tx,
self.tools.clone(),
self.store.clone(),
).await?;
let arc = Arc::new(Mutex::new(session));
@ -523,29 +394,13 @@ impl SessionManager {
return Ok(session.clone());
}
if let Ok(Some(_)) = self.store.get_session(&session_id_str) {
let (user_tx, _rx) = mpsc::channel::<WsOutbound>(100);
let session = Session::new(
unified_id.clone(),
self.provider_config.clone(),
user_tx,
self.tools.clone(),
self.store.clone(),
).await?;
let arc = Arc::new(Mutex::new(session));
inner.sessions.insert(session_id_str.clone(), arc.clone());
inner.session_timestamps.insert(session_id_str, Instant::now());
return Ok(arc);
}
// Create new session
let (user_tx, _rx) = mpsc::channel::<WsOutbound>(100);
let session = Session::new(
unified_id.clone(),
self.provider_config.clone(),
user_tx,
self.tools.clone(),
self.store.clone(),
).await?;
let arc = Arc::new(Mutex::new(session));
@ -554,70 +409,6 @@ impl SessionManager {
Ok(arc)
}
async fn list_dialogs_for_chat(
&self,
channel: &str,
chat_id: &str,
include_archived: bool,
) -> Result<Vec<DialogInfo>, AgentError> {
let records = self.store
.list_sessions(channel, include_archived)
.map_err(|err| AgentError::Other(format!("list dialogs error: {}", err)))?;
let dialogs: Vec<DialogInfo> = records
.into_iter()
.filter(|r| {
if let Some(sid) = UnifiedSessionId::parse(&r.id) {
sid.chat_id == chat_id
} else {
false
}
})
.map(|r| {
let sid = UnifiedSessionId::parse(&r.id).unwrap();
DialogInfo {
session_id: sid,
title: r.title,
created_at: r.created_at,
last_active_at: r.last_active_at,
message_count: r.message_count,
archived_at: r.archived_at,
}
})
.collect();
Ok(dialogs)
}
pub async fn get_most_recent_dialog(
&self,
channel: &str,
chat_id: &str,
) -> Result<Option<UnifiedSessionId>, AgentError> {
let records = self.store
.list_sessions(channel, false)
.map_err(|err| AgentError::Other(format!("get recent dialog error: {}", err)))?;
let most_recent = records
.into_iter()
.filter(|r| {
if let Some(sid) = UnifiedSessionId::parse(&r.id) {
sid.chat_id == chat_id
} else {
false
}
})
.max_by_key(|r| r.last_active_at);
Ok(most_recent.map(|r| UnifiedSessionId::parse(&r.id).unwrap()))
}
pub fn rename_dialog(&self, session_id: &UnifiedSessionId, title: &str) -> Result<(), AgentError> {
self.store
.rename_session(&session_id.to_string(), title)
.map_err(|err| AgentError::Other(format!("rename dialog error: {}", err)))
}
pub async fn create_dialog(
&self,
channel: &str,
@ -629,10 +420,10 @@ impl SessionManager {
pub async fn get_current_dialog(
&self,
channel: &str,
chat_id: &str,
_channel: &str,
_chat_id: &str,
) -> Result<Option<UnifiedSessionId>, AgentError> {
self.get_most_recent_dialog(channel, chat_id).await
Ok(None)
}
pub async fn switch_dialog(
@ -646,31 +437,27 @@ impl SessionManager {
pub async fn list_dialogs(
&self,
channel: &str,
chat_id: &str,
include_archived: bool,
_channel: &str,
_chat_id: &str,
_include_archived: bool,
) -> Result<(Vec<DialogInfo>, Option<String>), AgentError> {
let dialogs = self.list_dialogs_for_chat(channel, chat_id, include_archived).await?;
let current = self.get_most_recent_dialog(channel, chat_id).await?;
Ok((dialogs, current.map(|id| id.to_string())))
Ok((vec![], None))
}
pub fn archive_dialog(&self, session_id: &UnifiedSessionId) -> Result<(), AgentError> {
self.store
.archive_session(&session_id.to_string())
.map_err(|err| AgentError::Other(format!("archive dialog error: {}", err)))
pub fn rename_dialog(&self, _session_id: &UnifiedSessionId, _title: &str) -> Result<(), AgentError> {
Err(AgentError::Other("rename_dialog not available".to_string()))
}
pub fn delete_dialog(&self, session_id: &UnifiedSessionId) -> Result<(), AgentError> {
self.store
.delete_session(&session_id.to_string())
.map_err(|err| AgentError::Other(format!("delete dialog error: {}", err)))
pub fn archive_dialog(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> {
Err(AgentError::Other("archive_dialog not available".to_string()))
}
pub fn clear_dialog_history(&self, session_id: &UnifiedSessionId) -> Result<(), AgentError> {
self.store
.clear_messages(&session_id.to_string())
.map_err(|err| AgentError::Other(format!("clear dialog history error: {}", err)))
pub fn delete_dialog(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> {
Err(AgentError::Other("delete_dialog not available".to_string()))
}
pub fn clear_dialog_history(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> {
Err(AgentError::Other("clear_dialog_history not available".to_string()))
}
pub async fn handle_message(
@ -688,7 +475,6 @@ impl SessionManager {
let response: String = {
let mut session_guard = session.lock().await;
session_guard.ensure_persistent_session()?;
let media_refs: Vec<String> = media.iter().map(|m| m.path.clone()).collect();
#[cfg(debug_assertions)]
@ -697,19 +483,17 @@ impl SessionManager {
}
let user_message = session_guard.create_user_message(content, media_refs);
session_guard.add_message(user_message.clone());
session_guard.append_message(&user_message)?;
session_guard.load_history()?;
session_guard.add_message(user_message);
let mut history = session_guard.get_history().to_vec();
// Build skills prompt
let skills_prompt = self.skills_loader.build_skills_prompt();
if !skills_prompt.is_empty() {
let skills_message = ChatMessage::system(skills_prompt);
history.insert(0, skills_message);
tracing::debug!("Injected skills into context");
}
// Build combined system prompt and inject at position 0
// This ensures AgentLoop.process() sees a system message and doesn't inject its own
let system_prompt = session_guard.build_system_prompt(&skills_prompt);
history.insert(0, ChatMessage::system(system_prompt));
let history = session_guard.compressor
.compress_if_needed(history)
@ -718,8 +502,8 @@ impl SessionManager {
let agent = session_guard.create_agent()?;
let result = agent.process(history).await?;
for msg in &result.emitted_messages {
session_guard.append_message(msg)?;
for msg in result.emitted_messages {
session_guard.add_message(msg);
}
result.final_response.content
@ -739,7 +523,7 @@ impl SessionManager {
pub async fn clear_session_history(&self, unified_id: &UnifiedSessionId) -> Result<(), AgentError> {
let session = self.get_or_create_session(unified_id).await?;
let mut session_guard = session.lock().await;
session_guard.clear_history()?;
session_guard.clear_history();
Ok(())
}
}

View File

@ -1,651 +0,0 @@
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use rusqlite::{Connection, OptionalExtension, params};
use serde::{Deserialize, Serialize};
use crate::bus::ChatMessage;
#[derive(Debug, thiserror::Error)]
pub enum StorageError {
#[error("database error: {0}")]
Database(#[from] rusqlite::Error),
#[error("io error: {0}")]
Io(#[from] std::io::Error),
#[error("serialization error: {0}")]
Serialization(#[from] serde_json::Error),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionRecord {
pub id: String,
pub title: String,
pub channel_name: String,
pub chat_id: String,
pub summary: Option<String>,
pub created_at: i64,
pub updated_at: i64,
pub last_active_at: i64,
pub archived_at: Option<i64>,
pub deleted_at: Option<i64>,
pub message_count: i64,
pub reset_cutoff_seq: i64,
}
#[derive(Clone)]
pub struct SessionStore {
conn: Arc<Mutex<Connection>>,
}
impl SessionStore {
pub fn new() -> Result<Self, StorageError> {
let db_path = default_session_db_path()?;
Self::open_at_path(&db_path)
}
fn open_at_path(path: &Path) -> Result<Self, StorageError> {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
}
let conn = Connection::open(path)?;
Self::from_connection(conn)
}
fn from_connection(conn: Connection) -> Result<Self, StorageError> {
conn.execute_batch(
"
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
channel_name TEXT NOT NULL,
chat_id TEXT NOT NULL,
summary TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
last_active_at INTEGER NOT NULL,
archived_at INTEGER,
deleted_at INTEGER,
message_count INTEGER NOT NULL DEFAULT 0,
reset_cutoff_seq INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_sessions_channel_archived
ON sessions(channel_name, archived_at, last_active_at DESC);
CREATE INDEX IF NOT EXISTS idx_sessions_updated_at
ON sessions(updated_at DESC);
CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
seq INTEGER NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
media_refs_json TEXT NOT NULL,
tool_call_id TEXT,
tool_name TEXT,
tool_calls_json TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE,
UNIQUE(session_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_messages_session_seq
ON messages(session_id, seq);
CREATE INDEX IF NOT EXISTS idx_messages_session_created
ON messages(session_id, created_at);
",
)?;
ensure_sessions_schema(&conn)?;
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
})
}
#[cfg(test)]
pub(crate) fn in_memory() -> Result<Self, StorageError> {
Self::from_connection(Connection::open_in_memory()?)
}
pub fn create_cli_session(&self, title: Option<&str>) -> Result<SessionRecord, StorageError> {
let now = crate::bus::message::current_timestamp();
let id = uuid::Uuid::new_v4().to_string();
let title = title
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToOwned::to_owned)
.unwrap_or_else(|| format!("CLI Session {}", &id[..8]));
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute(
"
INSERT INTO sessions (
id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at, archived_at, deleted_at, message_count
) VALUES (?1, ?2, 'cli_chat', ?3, NULL, ?4, ?4, ?4, NULL, NULL, 0)
",
params![id, title, id, now],
)?;
drop(conn);
self.get_session(&id)?.ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into())
}
pub fn ensure_channel_session(
&self,
channel_name: &str,
chat_id: &str,
dialog_id: &str,
) -> Result<SessionRecord, StorageError> {
let session_id = persistent_session_id(channel_name, chat_id, dialog_id);
if let Some(record) = self.get_session(&session_id)? {
return Ok(record);
}
let now = crate::bus::message::current_timestamp();
let title = format!("{}:{}", channel_name, chat_id);
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute(
"
INSERT INTO sessions (
id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at, archived_at, deleted_at, message_count
) VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?5, ?5, NULL, NULL, 0)
",
params![session_id, title, channel_name, chat_id, now],
)?;
drop(conn);
self.get_session(&session_id)?.ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into())
}
pub fn get_session(&self, session_id: &str) -> Result<Option<SessionRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let mut stmt = conn.prepare(
"
SELECT id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at,
archived_at, deleted_at, message_count, reset_cutoff_seq
FROM sessions
WHERE id = ?1 AND deleted_at IS NULL
",
)?;
stmt.query_row(params![session_id], map_session_record)
.optional()
.map_err(StorageError::from)
}
pub fn list_sessions(
&self,
channel_name: &str,
include_archived: bool,
) -> Result<Vec<SessionRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let mut sql = String::from(
"
SELECT id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at,
archived_at, deleted_at, message_count, reset_cutoff_seq
FROM sessions
WHERE channel_name = ?1
AND deleted_at IS NULL
",
);
if !include_archived {
sql.push_str(" AND archived_at IS NULL");
}
sql.push_str(" ORDER BY last_active_at DESC, created_at DESC");
let mut stmt = conn.prepare(&sql)?;
let rows = stmt.query_map(params![channel_name], map_session_record)?;
let mut sessions = Vec::new();
for row in rows {
sessions.push(row?);
}
Ok(sessions)
}
pub fn rename_session(&self, session_id: &str, title: &str) -> Result<(), StorageError> {
let now = crate::bus::message::current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute(
"UPDATE sessions SET title = ?2, updated_at = ?3 WHERE id = ?1 AND deleted_at IS NULL",
params![session_id, title.trim(), now],
)?;
Ok(())
}
pub fn archive_session(&self, session_id: &str) -> Result<(), StorageError> {
let now = crate::bus::message::current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute(
"UPDATE sessions SET archived_at = ?2, updated_at = ?2 WHERE id = ?1 AND deleted_at IS NULL",
params![session_id, now],
)?;
Ok(())
}
pub fn delete_session(&self, session_id: &str) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute("DELETE FROM messages WHERE session_id = ?1", params![session_id])?;
conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
Ok(())
}
pub fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> {
let now = crate::bus::message::current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute("DELETE FROM messages WHERE session_id = ?1", params![session_id])?;
conn.execute(
"
UPDATE sessions
SET message_count = 0, updated_at = ?2, last_active_at = ?2, reset_cutoff_seq = 0
WHERE id = ?1 AND deleted_at IS NULL
",
params![session_id, now],
)?;
Ok(())
}
pub fn reset_session(&self, session_id: &str) -> Result<(), StorageError> {
let now = crate::bus::message::current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned");
let tx = conn.unchecked_transaction()?;
let cutoff_seq: i64 = tx.query_row(
"SELECT COALESCE(MAX(seq), 0) FROM messages WHERE session_id = ?1",
params![session_id],
|row| row.get(0),
)?;
tx.execute(
"
UPDATE sessions
SET reset_cutoff_seq = ?2,
updated_at = ?3,
last_active_at = ?3,
archived_at = NULL
WHERE id = ?1 AND deleted_at IS NULL
",
params![session_id, cutoff_seq, now],
)?;
tx.commit()?;
Ok(())
}
pub fn append_message(&self, session_id: &str, message: &ChatMessage) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let tx = conn.unchecked_transaction()?;
let seq: i64 = tx.query_row(
"SELECT COALESCE(MAX(seq), 0) + 1 FROM messages WHERE session_id = ?1",
params![session_id],
|row| row.get(0),
)?;
let media_refs_json = serde_json::to_string(&message.media_refs)?;
let tool_calls_json = message.tool_calls.as_ref().map(serde_json::to_string).transpose()?;
tx.execute(
"
INSERT INTO messages (
id, session_id, seq, role, content,
media_refs_json, tool_call_id, tool_name, tool_calls_json, created_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10)
",
params![
message.id,
session_id,
seq,
message.role,
message.content,
media_refs_json,
message.tool_call_id,
message.tool_name,
tool_calls_json,
message.timestamp,
],
)?;
let now = crate::bus::message::current_timestamp();
tx.execute(
"
UPDATE sessions
SET message_count = message_count + 1,
updated_at = ?2,
last_active_at = ?2,
archived_at = NULL
WHERE id = ?1 AND deleted_at IS NULL
",
params![session_id, now],
)?;
tx.commit()?;
Ok(())
}
pub fn load_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let cutoff_seq = active_reset_cutoff(&conn, session_id)?;
load_messages_after(&conn, session_id, cutoff_seq)
}
pub fn load_all_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
load_messages_after(&conn, session_id, 0)
}
}
pub fn persistent_session_id(channel_name: &str, chat_id: &str, dialog_id: &str) -> String {
format!("{}:{}:{}", channel_name, chat_id, dialog_id)
}
fn default_session_db_path() -> Result<PathBuf, std::io::Error> {
let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
Ok(home.join(".picobot").join("storage").join("sessions.db"))
}
fn map_session_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
Ok(SessionRecord {
id: row.get(0)?,
title: row.get(1)?,
channel_name: row.get(2)?,
chat_id: row.get(3)?,
summary: row.get(4)?,
created_at: row.get(5)?,
updated_at: row.get(6)?,
last_active_at: row.get(7)?,
archived_at: row.get(8)?,
deleted_at: row.get(9)?,
message_count: row.get(10)?,
reset_cutoff_seq: row.get(11)?,
})
}
fn ensure_sessions_schema(conn: &Connection) -> Result<(), StorageError> {
if !has_column(conn, "sessions", "reset_cutoff_seq")? {
conn.execute(
"ALTER TABLE sessions ADD COLUMN reset_cutoff_seq INTEGER NOT NULL DEFAULT 0",
[],
)?;
}
Ok(())
}
fn has_column(conn: &Connection, table_name: &str, column_name: &str) -> Result<bool, StorageError> {
let pragma = format!("PRAGMA table_info({})", table_name);
let mut stmt = conn.prepare(&pragma)?;
let mut rows = stmt.query([])?;
while let Some(row) = rows.next()? {
let existing_name: String = row.get(1)?;
if existing_name == column_name {
return Ok(true);
}
}
Ok(false)
}
fn active_reset_cutoff(conn: &Connection, session_id: &str) -> Result<i64, StorageError> {
let cutoff = conn
.query_row(
"SELECT reset_cutoff_seq FROM sessions WHERE id = ?1 AND deleted_at IS NULL",
params![session_id],
|row| row.get(0),
)
.optional()?;
Ok(cutoff.unwrap_or(0))
}
fn load_messages_after(
conn: &Connection,
session_id: &str,
cutoff_seq: i64,
) -> Result<Vec<ChatMessage>, StorageError> {
let mut stmt = conn.prepare(
"
SELECT id, role, content, media_refs_json, created_at, tool_call_id, tool_name, tool_calls_json
FROM messages
WHERE session_id = ?1 AND seq > ?2
ORDER BY seq ASC
",
)?;
let rows = stmt.query_map(params![session_id, cutoff_seq], |row| {
let media_refs_json: String = row.get(3)?;
let media_refs: Vec<String> = serde_json::from_str(&media_refs_json).map_err(|err| {
rusqlite::Error::FromSqlConversionFailure(
media_refs_json.len(),
rusqlite::types::Type::Text,
Box::new(err),
)
})?;
let tool_calls_json: Option<String> = row.get(7)?;
let tool_calls = tool_calls_json
.as_deref()
.map(serde_json::from_str)
.transpose()
.map_err(|err| {
rusqlite::Error::FromSqlConversionFailure(
7,
rusqlite::types::Type::Text,
Box::new(err),
)
})?;
Ok(ChatMessage {
id: row.get(0)?,
role: row.get(1)?,
content: row.get(2)?,
media_refs,
timestamp: row.get(4)?,
tool_call_id: row.get(5)?,
tool_name: row.get(6)?,
tool_calls,
})
})?;
let mut messages = Vec::new();
for row in rows {
messages.push(row?);
}
Ok(messages)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::providers::ToolCall;
#[test]
fn test_persistent_session_id_for_cli_and_channel() {
assert_eq!(persistent_session_id("cli", "abc", "default"), "cli:abc:default");
assert_eq!(persistent_session_id("cli_chat", "abc", "default"), "cli_chat:abc:default");
assert_eq!(persistent_session_id("feishu", "abc", "default"), "feishu:abc:default");
}
#[test]
fn test_session_store_roundtrip_and_lifecycle() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("demo")).unwrap();
assert_eq!(session.title, "demo");
assert_eq!(session.channel_name, "cli_chat");
assert_eq!(session.chat_id, session.id);
assert_eq!(session.message_count, 0);
assert_eq!(session.reset_cutoff_seq, 0);
let first = ChatMessage::user("hello");
let second = ChatMessage::assistant("world");
store.append_message(&session.id, &first).unwrap();
store.append_message(&session.id, &second).unwrap();
let stored = store.get_session(&session.id).unwrap().unwrap();
assert_eq!(stored.message_count, 2);
assert!(stored.archived_at.is_none());
assert_eq!(stored.reset_cutoff_seq, 0);
let messages = store.load_messages(&session.id).unwrap();
assert_eq!(messages.len(), 2);
assert_eq!(messages[0].role, "user");
assert_eq!(messages[0].content, "hello");
assert_eq!(messages[1].role, "assistant");
assert_eq!(messages[1].content, "world");
store.rename_session(&session.id, "renamed").unwrap();
let renamed = store.get_session(&session.id).unwrap().unwrap();
assert_eq!(renamed.title, "renamed");
store.archive_session(&session.id).unwrap();
let archived = store.get_session(&session.id).unwrap().unwrap();
assert!(archived.archived_at.is_some());
let active_only = store.list_sessions("cli_chat", false).unwrap();
assert!(active_only.is_empty());
let including_archived = store.list_sessions("cli_chat", true).unwrap();
assert_eq!(including_archived.len(), 1);
store.clear_messages(&session.id).unwrap();
let cleared = store.load_messages(&session.id).unwrap();
assert!(cleared.is_empty());
let cleared_session = store.get_session(&session.id).unwrap().unwrap();
assert_eq!(cleared_session.message_count, 0);
store.delete_session(&session.id).unwrap();
assert!(store.get_session(&session.id).unwrap().is_none());
}
#[test]
fn test_ensure_channel_session_is_stable() {
let store = SessionStore::in_memory().unwrap();
let first = store.ensure_channel_session("feishu", "chat-1", "default").unwrap();
let second = store.ensure_channel_session("feishu", "chat-1", "default").unwrap();
assert_eq!(first.id, second.id);
assert_eq!(first.chat_id, "chat-1");
assert_eq!(second.channel_name, "feishu");
}
#[test]
fn test_assistant_tool_calls_roundtrip() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("tools")).unwrap();
let assistant = ChatMessage::assistant_with_tool_calls(
"calling tool",
vec![ToolCall {
id: "call_1".to_string(),
name: "calculator".to_string(),
arguments: serde_json::json!({ "expression": "3*7" }),
}],
);
store.append_message(&session.id, &assistant).unwrap();
let messages = store.load_messages(&session.id).unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].role, "assistant");
assert_eq!(messages[0].tool_calls.as_ref().unwrap().len(), 1);
assert_eq!(messages[0].tool_calls.as_ref().unwrap()[0].id, "call_1");
assert_eq!(messages[0].tool_calls.as_ref().unwrap()[0].name, "calculator");
}
#[test]
fn test_reset_session_preserves_full_history_and_hides_active_history() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("reset")).unwrap();
store.append_message(&session.id, &ChatMessage::user("before")).unwrap();
store.append_message(&session.id, &ChatMessage::assistant("context")).unwrap();
store.reset_session(&session.id).unwrap();
let stored = store.get_session(&session.id).unwrap().unwrap();
assert_eq!(stored.reset_cutoff_seq, 2);
let active_messages = store.load_messages(&session.id).unwrap();
assert!(active_messages.is_empty());
let all_messages = store.load_all_messages(&session.id).unwrap();
assert_eq!(all_messages.len(), 2);
assert_eq!(all_messages[0].content, "before");
assert_eq!(all_messages[1].content, "context");
store.append_message(&session.id, &ChatMessage::user("after")).unwrap();
let active_messages = store.load_messages(&session.id).unwrap();
assert_eq!(active_messages.len(), 1);
assert_eq!(active_messages[0].content, "after");
}
#[test]
fn test_schema_migration_adds_reset_cutoff_column() {
let conn = Connection::open_in_memory().unwrap();
conn.execute_batch(
"
CREATE TABLE sessions (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
channel_name TEXT NOT NULL,
chat_id TEXT NOT NULL,
summary TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
last_active_at INTEGER NOT NULL,
archived_at INTEGER,
deleted_at INTEGER,
message_count INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE messages (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
seq INTEGER NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
media_refs_json TEXT NOT NULL,
tool_call_id TEXT,
tool_name TEXT,
tool_calls_json TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE,
UNIQUE(session_id, seq)
);
",
)
.unwrap();
let store = SessionStore::from_connection(conn).unwrap();
let session = store.create_cli_session(Some("migrated")).unwrap();
assert_eq!(session.reset_cutoff_seq, 0);
}
#[test]
fn test_tool_result_roundtrip() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("tool-result")).unwrap();
let tool_message = ChatMessage::tool("call_9", "file_write", "saved to /tmp/output.txt");
store.append_message(&session.id, &tool_message).unwrap();
let messages = store.load_messages(&session.id).unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].role, "tool");
assert_eq!(messages[0].content, "saved to /tmp/output.txt");
assert_eq!(messages[0].tool_call_id.as_deref(), Some("call_9"));
assert_eq!(messages[0].tool_name.as_deref(), Some("file_write"));
assert!(messages[0].tool_calls.is_none());
}
}