diff --git a/Cargo.toml b/Cargo.toml index f557e7f..c8b5a04 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ mime_guess = "2.0" base64 = "0.22" tempfile = "3" meval = "0.2" -rusqlite = { version = "0.32", features = ["bundled"] } ratatui = "0.27" crossterm = { version = "0.28", features = ["event-stream"] } termimad = "0.34" diff --git a/src/lib.rs b/src/lib.rs index 5b270ca..a624c64 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,5 @@ pub mod protocol; pub mod channels; pub mod logging; pub mod observability; -pub mod storage; pub mod skills; pub mod tools; diff --git a/src/session/error.rs b/src/session/error.rs index c98a1ec..3b9f3cb 100644 --- a/src/session/error.rs +++ b/src/session/error.rs @@ -14,9 +14,3 @@ impl From for SessionError { SessionError::Other(e.to_string()) } } - -impl From for SessionError { - fn from(e: crate::storage::StorageError) -> Self { - SessionError::Other(e.to_string()) - } -} diff --git a/src/session/session.rs b/src/session/session.rs index 0d8bbfd..9a634cc 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -8,13 +8,13 @@ use uuid::Uuid; use crate::bus::ChatMessage; use crate::config::LLMProviderConfig; use crate::agent::{AgentLoop, AgentError, ContextCompressor}; +use crate::agent::system_prompt::build_system_prompt; use crate::agent::context_compressor::ContextCompressionConfig; use crate::protocol::WsOutbound; use crate::providers::{create_provider, LLMProvider}; use crate::session::session_id::{UnifiedSessionId, DEFAULT_DIALOG_ID}; use crate::session::events::DialogInfo; use crate::skills::SkillsLoader; -use crate::storage::{SessionRecord, SessionStore}; use crate::tools::{ BashTool, CalculatorTool, FileEditTool, FileReadTool, FileWriteTool, GetSkillTool, HttpRequestTool, ToolRegistry, WebFetchTool, @@ -35,7 +35,6 @@ pub struct Session { provider: Arc, tools: Arc, compressor: ContextCompressor, - store: Arc, } impl Session { @@ -44,7 +43,6 @@ impl Session { provider_config: LLMProviderConfig, user_tx: mpsc::Sender, tools: Arc, - store: Arc, ) -> Result { let provider_box = create_provider(provider_config.clone()) .map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?; @@ -63,34 +61,14 @@ impl Session { provider: provider.clone(), tools, compressor: ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config), - store, }) } - /// 获取持久化 session ID - pub fn persistent_session_id(&self) -> String { + /// 获取 session ID + pub fn session_id(&self) -> String { self.id.to_string() } - /// 确保存储中有此 session - pub fn ensure_persistent_session(&self) -> Result { - self.store - .ensure_channel_session(&self.id.channel, &self.id.chat_id, &self.id.dialog_id) - .map_err(|err| AgentError::Other(format!("session persistence error: {}", err))) - } - - /// 加载历史消息到内存 - pub fn load_history(&mut self) -> Result<(), AgentError> { - if !self.messages.is_empty() { - return Ok(()); - } - let history = self.store - .load_messages(&self.persistent_session_id()) - .map_err(|err| AgentError::Other(format!("session history load error: {}", err)))?; - self.messages = history; - Ok(()) - } - /// 添加消息到历史 pub fn add_message(&mut self, message: ChatMessage) { self.messages.push(message); @@ -102,50 +80,19 @@ impl Session { } /// 清除历史消息 - pub fn clear_history(&mut self) -> Result<(), AgentError> { + pub fn clear_history(&mut self) { let len = self.messages.len(); self.messages.clear(); #[cfg(debug_assertions)] tracing::debug!(session_id = %self.id, previous_len = len, "Chat history cleared"); - self.store - .clear_messages(&self.persistent_session_id()) - .map_err(|err| AgentError::Other(format!("clear history error: {}", err))) } /// 重置对话上下文 - pub fn reset_context(&mut self) -> Result<(), AgentError> { + pub fn reset_context(&mut self) { let len = self.messages.len(); self.messages.clear(); #[cfg(debug_assertions)] tracing::debug!(session_id = %self.id, previous_len = len, "Chat context reset in memory"); - self.store - .reset_session(&self.persistent_session_id()) - .map_err(|err| AgentError::Other(format!("reset context error: {}", err))) - } - - /// Archive 此 session - pub fn archive(&self) -> Result<(), AgentError> { - self.store - .archive_session(&self.persistent_session_id()) - .map_err(|err| AgentError::Other(format!("archive session error: {}", err))) - } - - /// 持久化消息 - pub fn append_message(&self, message: &ChatMessage) -> Result<(), AgentError> { - self.store - .append_message(&self.persistent_session_id(), message) - .map_err(|err| AgentError::Other(format!("append message error: {}", err))) - } - - /// 持久化多条消息 - pub fn append_messages(&self, messages: I) -> Result<(), AgentError> - where - I: IntoIterator, - { - for message in messages { - self.append_message(&message)?; - } - Ok(()) } pub fn create_user_message(&self, content: &str, media_refs: Vec) -> ChatMessage { @@ -180,6 +127,21 @@ impl Session { self.provider_config.workspace_dir.clone(), )) } + + /// 构建系统提示词(包含 AgentLoop 的基础提示词 + skills) + pub fn build_system_prompt(&self, skills_prompt: &str) -> String { + let base_prompt = build_system_prompt( + &self.provider_config.workspace_dir, + &self.provider_config.model_id, + &self.tools, + ); + + if skills_prompt.trim().is_empty() { + base_prompt + } else { + format!("{}\n\n## Skills\n\n{}\n\nUse the `get_skill` tool to load a skill's full content when needed.", base_prompt, skills_prompt) + } + } } /// SessionManager 管理所有 Session,按 channel_name 路由 @@ -188,7 +150,6 @@ pub struct SessionManager { inner: Arc>, provider_config: LLMProviderConfig, tools: Arc, - store: Arc, skills_loader: Arc, } @@ -282,11 +243,6 @@ pub static SLASH_COMMANDS: &[SlashCommand] = &[ impl SessionManager { pub fn new(session_ttl_hours: u64, provider_config: LLMProviderConfig) -> Result { - let store = Arc::new( - SessionStore::new() - .map_err(|err| AgentError::Other(format!("session store init error: {}", err)))?, - ); - let skills_loader = SkillsLoader::new(); skills_loader.load_skills(); let skills_loader = Arc::new(skills_loader); @@ -301,7 +257,6 @@ impl SessionManager { })), provider_config, tools, - store, skills_loader, }) } @@ -332,11 +287,6 @@ impl SessionManager { match cmd.name { "new" => { - if let Some(sid) = current_session_id { - self.store - .archive_session(&sid.to_string()) - .map_err(|e| AgentError::Other(format!("archive session error: {}", e)))?; - } let title = args.map(|s| s.to_string()); let (new_id, title) = self.create_session(channel, chat_id, title.as_deref()).await?; Ok((Some(new_id), format!("New conversation '{}' created.", title))) @@ -352,50 +302,28 @@ impl SessionManager { Ok((Some(unified_id), format!("Switched to session {}", target_id))) } "rename" => { - let title = args + let _title = args .ok_or_else(|| AgentError::Other("Usage: /rename ".to_string()))?; - if let Some(sid) = current_session_id { - self.store - .rename_session(&sid.to_string(), title) - .map_err(|e| AgentError::Other(format!("rename session error: {}", e)))?; - Ok((None, format!("Conversation renamed to '{}'.", title))) - } else { - Ok((None, "No active conversation to rename.".to_string())) - } + Ok((None, "Rename not available in this mode.".to_string())) } "archive" => { - if let Some(sid) = current_session_id { - self.store - .archive_session(&sid.to_string()) - .map_err(|e| AgentError::Other(format!("archive session error: {}", e)))?; - Ok((None, "Conversation archived.".to_string())) - } else { - Ok((None, "No active conversation to archive.".to_string())) - } + Ok((None, "Archive not available in this mode.".to_string())) } "delete" => { - if let Some(sid) = current_session_id { - self.store - .delete_session(&sid.to_string()) - .map_err(|e| AgentError::Other(format!("delete session error: {}", e)))?; - let (new_id, _title) = self.create_session(channel, chat_id, None).await?; - Ok((Some(new_id), "Conversation deleted. New conversation created.".to_string())) - } else { - Ok((None, "No active conversation to delete.".to_string())) - } + let (new_id, _title) = self.create_session(channel, chat_id, None).await?; + Ok((Some(new_id), "Conversation deleted. New conversation created.".to_string())) } "compact" => { if let Some(sid) = current_session_id { let session = self.get_or_create_session(sid).await?; let mut session_guard = session.lock().await; - session_guard.load_history()?; let original_count = session_guard.get_history().len(); let history = session_guard.get_history().to_vec(); let compressed = session_guard.compressor .compress_if_needed(history) .await?; let compressed_count = compressed.len(); - session_guard.clear_history()?; + session_guard.clear_history(); for msg in compressed { session_guard.add_message(msg); } @@ -412,7 +340,7 @@ impl SessionManager { let session = self.get_or_create_session(sid).await?; let session_guard = session.lock().await; let message_count = session_guard.get_history().len(); - let session_id_str = session_guard.persistent_session_id(); + let session_id_str = session_guard.session_id(); Ok((None, format!( "Session ID: {}\nMessage count: {}", session_id_str, message_count @@ -425,58 +353,6 @@ impl SessionManager { } } - pub fn store(&self) -> Arc<SessionStore> { - self.store.clone() - } - - pub fn create_cli_session(&self, title: Option<&str>) -> Result<SessionRecord, AgentError> { - self.store - .create_cli_session(title) - .map_err(|err| AgentError::Other(format!("create session error: {}", err))) - } - - pub fn get_session_record(&self, session_id: &str) -> Result<Option<SessionRecord>, AgentError> { - self.store - .get_session(session_id) - .map_err(|err| AgentError::Other(format!("get session error: {}", err))) - } - - pub fn list_cli_sessions(&self, include_archived: bool) -> Result<Vec<SessionRecord>, AgentError> { - self.store - .list_sessions("cli", include_archived) - .map_err(|err| AgentError::Other(format!("list sessions error: {}", err))) - } - - pub fn rename_session(&self, session_id: &str, title: &str) -> Result<(), AgentError> { - self.store - .rename_session(session_id, title) - .map_err(|err| AgentError::Other(format!("rename session error: {}", err))) - } - - pub fn archive_session(&self, session_id: &str) -> Result<(), AgentError> { - self.store - .archive_session(session_id) - .map_err(|err| AgentError::Other(format!("archive session error: {}", err))) - } - - pub fn delete_session(&self, session_id: &str) -> Result<(), AgentError> { - self.store - .delete_session(session_id) - .map_err(|err| AgentError::Other(format!("delete session error: {}", err))) - } - - pub fn clear_session_messages(&self, session_id: &str) -> Result<(), AgentError> { - self.store - .clear_messages(session_id) - .map_err(|err| AgentError::Other(format!("clear session messages error: {}", err))) - } - - pub fn load_session_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, AgentError> { - self.store - .load_messages(session_id) - .map_err(|err| AgentError::Other(format!("load session messages error: {}", err))) - } - pub async fn create_session( &self, channel: &str, @@ -493,17 +369,12 @@ impl SessionManager { .map(ToOwned::to_owned) .unwrap_or_else(|| format!("Dialog {}", &dialog_id)); - self.store - .ensure_channel_session(channel, chat_id, &dialog_id) - .map_err(|err| AgentError::Other(format!("create session error: {}", err)))?; - let (user_tx, _rx) = mpsc::channel::<WsOutbound>(100); let session = Session::new( unified_id.clone(), self.provider_config.clone(), user_tx, self.tools.clone(), - self.store.clone(), ).await?; let arc = Arc::new(Mutex::new(session)); @@ -523,29 +394,13 @@ impl SessionManager { return Ok(session.clone()); } - if let Ok(Some(_)) = self.store.get_session(&session_id_str) { - let (user_tx, _rx) = mpsc::channel::<WsOutbound>(100); - let session = Session::new( - unified_id.clone(), - self.provider_config.clone(), - user_tx, - self.tools.clone(), - self.store.clone(), - ).await?; - - let arc = Arc::new(Mutex::new(session)); - inner.sessions.insert(session_id_str.clone(), arc.clone()); - inner.session_timestamps.insert(session_id_str, Instant::now()); - return Ok(arc); - } - + // Create new session let (user_tx, _rx) = mpsc::channel::<WsOutbound>(100); let session = Session::new( unified_id.clone(), self.provider_config.clone(), user_tx, self.tools.clone(), - self.store.clone(), ).await?; let arc = Arc::new(Mutex::new(session)); @@ -554,70 +409,6 @@ impl SessionManager { Ok(arc) } - async fn list_dialogs_for_chat( - &self, - channel: &str, - chat_id: &str, - include_archived: bool, - ) -> Result<Vec<DialogInfo>, AgentError> { - let records = self.store - .list_sessions(channel, include_archived) - .map_err(|err| AgentError::Other(format!("list dialogs error: {}", err)))?; - - let dialogs: Vec<DialogInfo> = records - .into_iter() - .filter(|r| { - if let Some(sid) = UnifiedSessionId::parse(&r.id) { - sid.chat_id == chat_id - } else { - false - } - }) - .map(|r| { - let sid = UnifiedSessionId::parse(&r.id).unwrap(); - DialogInfo { - session_id: sid, - title: r.title, - created_at: r.created_at, - last_active_at: r.last_active_at, - message_count: r.message_count, - archived_at: r.archived_at, - } - }) - .collect(); - - Ok(dialogs) - } - - pub async fn get_most_recent_dialog( - &self, - channel: &str, - chat_id: &str, - ) -> Result<Option<UnifiedSessionId>, AgentError> { - let records = self.store - .list_sessions(channel, false) - .map_err(|err| AgentError::Other(format!("get recent dialog error: {}", err)))?; - - let most_recent = records - .into_iter() - .filter(|r| { - if let Some(sid) = UnifiedSessionId::parse(&r.id) { - sid.chat_id == chat_id - } else { - false - } - }) - .max_by_key(|r| r.last_active_at); - - Ok(most_recent.map(|r| UnifiedSessionId::parse(&r.id).unwrap())) - } - - pub fn rename_dialog(&self, session_id: &UnifiedSessionId, title: &str) -> Result<(), AgentError> { - self.store - .rename_session(&session_id.to_string(), title) - .map_err(|err| AgentError::Other(format!("rename dialog error: {}", err))) - } - pub async fn create_dialog( &self, channel: &str, @@ -629,10 +420,10 @@ impl SessionManager { pub async fn get_current_dialog( &self, - channel: &str, - chat_id: &str, + _channel: &str, + _chat_id: &str, ) -> Result<Option<UnifiedSessionId>, AgentError> { - self.get_most_recent_dialog(channel, chat_id).await + Ok(None) } pub async fn switch_dialog( @@ -646,31 +437,27 @@ impl SessionManager { pub async fn list_dialogs( &self, - channel: &str, - chat_id: &str, - include_archived: bool, + _channel: &str, + _chat_id: &str, + _include_archived: bool, ) -> Result<(Vec<DialogInfo>, Option<String>), AgentError> { - let dialogs = self.list_dialogs_for_chat(channel, chat_id, include_archived).await?; - let current = self.get_most_recent_dialog(channel, chat_id).await?; - Ok((dialogs, current.map(|id| id.to_string()))) + Ok((vec![], None)) } - pub fn archive_dialog(&self, session_id: &UnifiedSessionId) -> Result<(), AgentError> { - self.store - .archive_session(&session_id.to_string()) - .map_err(|err| AgentError::Other(format!("archive dialog error: {}", err))) + pub fn rename_dialog(&self, _session_id: &UnifiedSessionId, _title: &str) -> Result<(), AgentError> { + Err(AgentError::Other("rename_dialog not available".to_string())) } - pub fn delete_dialog(&self, session_id: &UnifiedSessionId) -> Result<(), AgentError> { - self.store - .delete_session(&session_id.to_string()) - .map_err(|err| AgentError::Other(format!("delete dialog error: {}", err))) + pub fn archive_dialog(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> { + Err(AgentError::Other("archive_dialog not available".to_string())) } - pub fn clear_dialog_history(&self, session_id: &UnifiedSessionId) -> Result<(), AgentError> { - self.store - .clear_messages(&session_id.to_string()) - .map_err(|err| AgentError::Other(format!("clear dialog history error: {}", err))) + pub fn delete_dialog(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> { + Err(AgentError::Other("delete_dialog not available".to_string())) + } + + pub fn clear_dialog_history(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> { + Err(AgentError::Other("clear_dialog_history not available".to_string())) } pub async fn handle_message( @@ -688,7 +475,6 @@ impl SessionManager { let response: String = { let mut session_guard = session.lock().await; - session_guard.ensure_persistent_session()?; let media_refs: Vec<String> = media.iter().map(|m| m.path.clone()).collect(); #[cfg(debug_assertions)] @@ -697,19 +483,17 @@ impl SessionManager { } let user_message = session_guard.create_user_message(content, media_refs); - session_guard.add_message(user_message.clone()); - session_guard.append_message(&user_message)?; - - session_guard.load_history()?; + session_guard.add_message(user_message); let mut history = session_guard.get_history().to_vec(); + // Build skills prompt let skills_prompt = self.skills_loader.build_skills_prompt(); - if !skills_prompt.is_empty() { - let skills_message = ChatMessage::system(skills_prompt); - history.insert(0, skills_message); - tracing::debug!("Injected skills into context"); - } + + // Build combined system prompt and inject at position 0 + // This ensures AgentLoop.process() sees a system message and doesn't inject its own + let system_prompt = session_guard.build_system_prompt(&skills_prompt); + history.insert(0, ChatMessage::system(system_prompt)); let history = session_guard.compressor .compress_if_needed(history) @@ -718,8 +502,8 @@ impl SessionManager { let agent = session_guard.create_agent()?; let result = agent.process(history).await?; - for msg in &result.emitted_messages { - session_guard.append_message(msg)?; + for msg in result.emitted_messages { + session_guard.add_message(msg); } result.final_response.content @@ -739,7 +523,7 @@ impl SessionManager { pub async fn clear_session_history(&self, unified_id: &UnifiedSessionId) -> Result<(), AgentError> { let session = self.get_or_create_session(unified_id).await?; let mut session_guard = session.lock().await; - session_guard.clear_history()?; + session_guard.clear_history(); Ok(()) } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs deleted file mode 100644 index 4002220..0000000 --- a/src/storage/mod.rs +++ /dev/null @@ -1,651 +0,0 @@ -use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; - -use rusqlite::{Connection, OptionalExtension, params}; -use serde::{Deserialize, Serialize}; - -use crate::bus::ChatMessage; - -#[derive(Debug, thiserror::Error)] -pub enum StorageError { - #[error("database error: {0}")] - Database(#[from] rusqlite::Error), - #[error("io error: {0}")] - Io(#[from] std::io::Error), - #[error("serialization error: {0}")] - Serialization(#[from] serde_json::Error), -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct SessionRecord { - pub id: String, - pub title: String, - pub channel_name: String, - pub chat_id: String, - pub summary: Option<String>, - pub created_at: i64, - pub updated_at: i64, - pub last_active_at: i64, - pub archived_at: Option<i64>, - pub deleted_at: Option<i64>, - pub message_count: i64, - pub reset_cutoff_seq: i64, -} - -#[derive(Clone)] -pub struct SessionStore { - conn: Arc<Mutex<Connection>>, -} - -impl SessionStore { - pub fn new() -> Result<Self, StorageError> { - let db_path = default_session_db_path()?; - Self::open_at_path(&db_path) - } - - fn open_at_path(path: &Path) -> Result<Self, StorageError> { - if let Some(parent) = path.parent() { - std::fs::create_dir_all(parent)?; - } - - let conn = Connection::open(path)?; - Self::from_connection(conn) - } - - fn from_connection(conn: Connection) -> Result<Self, StorageError> { - conn.execute_batch( - " - PRAGMA journal_mode = WAL; - PRAGMA foreign_keys = ON; - - CREATE TABLE IF NOT EXISTS sessions ( - id TEXT PRIMARY KEY, - title TEXT NOT NULL, - channel_name TEXT NOT NULL, - chat_id TEXT NOT NULL, - summary TEXT, - created_at INTEGER NOT NULL, - updated_at INTEGER NOT NULL, - last_active_at INTEGER NOT NULL, - archived_at INTEGER, - deleted_at INTEGER, - message_count INTEGER NOT NULL DEFAULT 0, - reset_cutoff_seq INTEGER NOT NULL DEFAULT 0 - ); - - CREATE INDEX IF NOT EXISTS idx_sessions_channel_archived - ON sessions(channel_name, archived_at, last_active_at DESC); - CREATE INDEX IF NOT EXISTS idx_sessions_updated_at - ON sessions(updated_at DESC); - - CREATE TABLE IF NOT EXISTS messages ( - id TEXT PRIMARY KEY, - session_id TEXT NOT NULL, - seq INTEGER NOT NULL, - role TEXT NOT NULL, - content TEXT NOT NULL, - media_refs_json TEXT NOT NULL, - tool_call_id TEXT, - tool_name TEXT, - tool_calls_json TEXT, - created_at INTEGER NOT NULL, - FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE, - UNIQUE(session_id, seq) - ); - - CREATE INDEX IF NOT EXISTS idx_messages_session_seq - ON messages(session_id, seq); - CREATE INDEX IF NOT EXISTS idx_messages_session_created - ON messages(session_id, created_at); - ", - )?; - - ensure_sessions_schema(&conn)?; - - Ok(Self { - conn: Arc::new(Mutex::new(conn)), - }) - } - - #[cfg(test)] - pub(crate) fn in_memory() -> Result<Self, StorageError> { - Self::from_connection(Connection::open_in_memory()?) - } - - pub fn create_cli_session(&self, title: Option<&str>) -> Result<SessionRecord, StorageError> { - let now = crate::bus::message::current_timestamp(); - let id = uuid::Uuid::new_v4().to_string(); - let title = title - .map(str::trim) - .filter(|value| !value.is_empty()) - .map(ToOwned::to_owned) - .unwrap_or_else(|| format!("CLI Session {}", &id[..8])); - - let conn = self.conn.lock().expect("session db mutex poisoned"); - conn.execute( - " - INSERT INTO sessions ( - id, title, channel_name, chat_id, summary, - created_at, updated_at, last_active_at, archived_at, deleted_at, message_count - ) VALUES (?1, ?2, 'cli_chat', ?3, NULL, ?4, ?4, ?4, NULL, NULL, 0) - ", - params![id, title, id, now], - )?; - - drop(conn); - self.get_session(&id)?.ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into()) - } - - pub fn ensure_channel_session( - &self, - channel_name: &str, - chat_id: &str, - dialog_id: &str, - ) -> Result<SessionRecord, StorageError> { - let session_id = persistent_session_id(channel_name, chat_id, dialog_id); - if let Some(record) = self.get_session(&session_id)? { - return Ok(record); - } - - let now = crate::bus::message::current_timestamp(); - let title = format!("{}:{}", channel_name, chat_id); - let conn = self.conn.lock().expect("session db mutex poisoned"); - conn.execute( - " - INSERT INTO sessions ( - id, title, channel_name, chat_id, summary, - created_at, updated_at, last_active_at, archived_at, deleted_at, message_count - ) VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?5, ?5, NULL, NULL, 0) - ", - params![session_id, title, channel_name, chat_id, now], - )?; - drop(conn); - - self.get_session(&session_id)?.ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into()) - } - - pub fn get_session(&self, session_id: &str) -> Result<Option<SessionRecord>, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); - let mut stmt = conn.prepare( - " - SELECT id, title, channel_name, chat_id, summary, - created_at, updated_at, last_active_at, - archived_at, deleted_at, message_count, reset_cutoff_seq - FROM sessions - WHERE id = ?1 AND deleted_at IS NULL - ", - )?; - - stmt.query_row(params![session_id], map_session_record) - .optional() - .map_err(StorageError::from) - } - - pub fn list_sessions( - &self, - channel_name: &str, - include_archived: bool, - ) -> Result<Vec<SessionRecord>, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); - let mut sql = String::from( - " - SELECT id, title, channel_name, chat_id, summary, - created_at, updated_at, last_active_at, - archived_at, deleted_at, message_count, reset_cutoff_seq - FROM sessions - WHERE channel_name = ?1 - AND deleted_at IS NULL - ", - ); - - if !include_archived { - sql.push_str(" AND archived_at IS NULL"); - } - - sql.push_str(" ORDER BY last_active_at DESC, created_at DESC"); - - let mut stmt = conn.prepare(&sql)?; - let rows = stmt.query_map(params![channel_name], map_session_record)?; - let mut sessions = Vec::new(); - for row in rows { - sessions.push(row?); - } - Ok(sessions) - } - - pub fn rename_session(&self, session_id: &str, title: &str) -> Result<(), StorageError> { - let now = crate::bus::message::current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); - conn.execute( - "UPDATE sessions SET title = ?2, updated_at = ?3 WHERE id = ?1 AND deleted_at IS NULL", - params![session_id, title.trim(), now], - )?; - Ok(()) - } - - pub fn archive_session(&self, session_id: &str) -> Result<(), StorageError> { - let now = crate::bus::message::current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); - conn.execute( - "UPDATE sessions SET archived_at = ?2, updated_at = ?2 WHERE id = ?1 AND deleted_at IS NULL", - params![session_id, now], - )?; - Ok(()) - } - - pub fn delete_session(&self, session_id: &str) -> Result<(), StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); - conn.execute("DELETE FROM messages WHERE session_id = ?1", params![session_id])?; - conn.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?; - Ok(()) - } - - pub fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> { - let now = crate::bus::message::current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); - conn.execute("DELETE FROM messages WHERE session_id = ?1", params![session_id])?; - conn.execute( - " - UPDATE sessions - SET message_count = 0, updated_at = ?2, last_active_at = ?2, reset_cutoff_seq = 0 - WHERE id = ?1 AND deleted_at IS NULL - ", - params![session_id, now], - )?; - Ok(()) - } - - pub fn reset_session(&self, session_id: &str) -> Result<(), StorageError> { - let now = crate::bus::message::current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); - let tx = conn.unchecked_transaction()?; - - let cutoff_seq: i64 = tx.query_row( - "SELECT COALESCE(MAX(seq), 0) FROM messages WHERE session_id = ?1", - params![session_id], - |row| row.get(0), - )?; - - tx.execute( - " - UPDATE sessions - SET reset_cutoff_seq = ?2, - updated_at = ?3, - last_active_at = ?3, - archived_at = NULL - WHERE id = ?1 AND deleted_at IS NULL - ", - params![session_id, cutoff_seq, now], - )?; - - tx.commit()?; - Ok(()) - } - - pub fn append_message(&self, session_id: &str, message: &ChatMessage) -> Result<(), StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); - let tx = conn.unchecked_transaction()?; - - let seq: i64 = tx.query_row( - "SELECT COALESCE(MAX(seq), 0) + 1 FROM messages WHERE session_id = ?1", - params![session_id], - |row| row.get(0), - )?; - - let media_refs_json = serde_json::to_string(&message.media_refs)?; - let tool_calls_json = message.tool_calls.as_ref().map(serde_json::to_string).transpose()?; - tx.execute( - " - INSERT INTO messages ( - id, session_id, seq, role, content, - media_refs_json, tool_call_id, tool_name, tool_calls_json, created_at - ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10) - ", - params![ - message.id, - session_id, - seq, - message.role, - message.content, - media_refs_json, - message.tool_call_id, - message.tool_name, - tool_calls_json, - message.timestamp, - ], - )?; - - let now = crate::bus::message::current_timestamp(); - tx.execute( - " - UPDATE sessions - SET message_count = message_count + 1, - updated_at = ?2, - last_active_at = ?2, - archived_at = NULL - WHERE id = ?1 AND deleted_at IS NULL - ", - params![session_id, now], - )?; - - tx.commit()?; - Ok(()) - } - - pub fn load_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); - let cutoff_seq = active_reset_cutoff(&conn, session_id)?; - load_messages_after(&conn, session_id, cutoff_seq) - } - - pub fn load_all_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); - load_messages_after(&conn, session_id, 0) - } -} - -pub fn persistent_session_id(channel_name: &str, chat_id: &str, dialog_id: &str) -> String { - format!("{}:{}:{}", channel_name, chat_id, dialog_id) -} - -fn default_session_db_path() -> Result<PathBuf, std::io::Error> { - let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from(".")); - Ok(home.join(".picobot").join("storage").join("sessions.db")) -} - -fn map_session_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> { - Ok(SessionRecord { - id: row.get(0)?, - title: row.get(1)?, - channel_name: row.get(2)?, - chat_id: row.get(3)?, - summary: row.get(4)?, - created_at: row.get(5)?, - updated_at: row.get(6)?, - last_active_at: row.get(7)?, - archived_at: row.get(8)?, - deleted_at: row.get(9)?, - message_count: row.get(10)?, - reset_cutoff_seq: row.get(11)?, - }) -} - -fn ensure_sessions_schema(conn: &Connection) -> Result<(), StorageError> { - if !has_column(conn, "sessions", "reset_cutoff_seq")? { - conn.execute( - "ALTER TABLE sessions ADD COLUMN reset_cutoff_seq INTEGER NOT NULL DEFAULT 0", - [], - )?; - } - - Ok(()) -} - -fn has_column(conn: &Connection, table_name: &str, column_name: &str) -> Result<bool, StorageError> { - let pragma = format!("PRAGMA table_info({})", table_name); - let mut stmt = conn.prepare(&pragma)?; - let mut rows = stmt.query([])?; - - while let Some(row) = rows.next()? { - let existing_name: String = row.get(1)?; - if existing_name == column_name { - return Ok(true); - } - } - - Ok(false) -} - -fn active_reset_cutoff(conn: &Connection, session_id: &str) -> Result<i64, StorageError> { - let cutoff = conn - .query_row( - "SELECT reset_cutoff_seq FROM sessions WHERE id = ?1 AND deleted_at IS NULL", - params![session_id], - |row| row.get(0), - ) - .optional()?; - - Ok(cutoff.unwrap_or(0)) -} - -fn load_messages_after( - conn: &Connection, - session_id: &str, - cutoff_seq: i64, -) -> Result<Vec<ChatMessage>, StorageError> { - let mut stmt = conn.prepare( - " - SELECT id, role, content, media_refs_json, created_at, tool_call_id, tool_name, tool_calls_json - FROM messages - WHERE session_id = ?1 AND seq > ?2 - ORDER BY seq ASC - ", - )?; - - let rows = stmt.query_map(params![session_id, cutoff_seq], |row| { - let media_refs_json: String = row.get(3)?; - let media_refs: Vec<String> = serde_json::from_str(&media_refs_json).map_err(|err| { - rusqlite::Error::FromSqlConversionFailure( - media_refs_json.len(), - rusqlite::types::Type::Text, - Box::new(err), - ) - })?; - - let tool_calls_json: Option<String> = row.get(7)?; - let tool_calls = tool_calls_json - .as_deref() - .map(serde_json::from_str) - .transpose() - .map_err(|err| { - rusqlite::Error::FromSqlConversionFailure( - 7, - rusqlite::types::Type::Text, - Box::new(err), - ) - })?; - - Ok(ChatMessage { - id: row.get(0)?, - role: row.get(1)?, - content: row.get(2)?, - media_refs, - timestamp: row.get(4)?, - tool_call_id: row.get(5)?, - tool_name: row.get(6)?, - tool_calls, - }) - })?; - - let mut messages = Vec::new(); - for row in rows { - messages.push(row?); - } - Ok(messages) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::providers::ToolCall; - - #[test] - fn test_persistent_session_id_for_cli_and_channel() { - assert_eq!(persistent_session_id("cli", "abc", "default"), "cli:abc:default"); - assert_eq!(persistent_session_id("cli_chat", "abc", "default"), "cli_chat:abc:default"); - assert_eq!(persistent_session_id("feishu", "abc", "default"), "feishu:abc:default"); - } - - #[test] - fn test_session_store_roundtrip_and_lifecycle() { - let store = SessionStore::in_memory().unwrap(); - - let session = store.create_cli_session(Some("demo")).unwrap(); - assert_eq!(session.title, "demo"); - assert_eq!(session.channel_name, "cli_chat"); - assert_eq!(session.chat_id, session.id); - assert_eq!(session.message_count, 0); - assert_eq!(session.reset_cutoff_seq, 0); - - let first = ChatMessage::user("hello"); - let second = ChatMessage::assistant("world"); - store.append_message(&session.id, &first).unwrap(); - store.append_message(&session.id, &second).unwrap(); - - let stored = store.get_session(&session.id).unwrap().unwrap(); - assert_eq!(stored.message_count, 2); - assert!(stored.archived_at.is_none()); - assert_eq!(stored.reset_cutoff_seq, 0); - - let messages = store.load_messages(&session.id).unwrap(); - assert_eq!(messages.len(), 2); - assert_eq!(messages[0].role, "user"); - assert_eq!(messages[0].content, "hello"); - assert_eq!(messages[1].role, "assistant"); - assert_eq!(messages[1].content, "world"); - - store.rename_session(&session.id, "renamed").unwrap(); - let renamed = store.get_session(&session.id).unwrap().unwrap(); - assert_eq!(renamed.title, "renamed"); - - store.archive_session(&session.id).unwrap(); - let archived = store.get_session(&session.id).unwrap().unwrap(); - assert!(archived.archived_at.is_some()); - - let active_only = store.list_sessions("cli_chat", false).unwrap(); - assert!(active_only.is_empty()); - - let including_archived = store.list_sessions("cli_chat", true).unwrap(); - assert_eq!(including_archived.len(), 1); - - store.clear_messages(&session.id).unwrap(); - let cleared = store.load_messages(&session.id).unwrap(); - assert!(cleared.is_empty()); - let cleared_session = store.get_session(&session.id).unwrap().unwrap(); - assert_eq!(cleared_session.message_count, 0); - - store.delete_session(&session.id).unwrap(); - assert!(store.get_session(&session.id).unwrap().is_none()); - } - - #[test] - fn test_ensure_channel_session_is_stable() { - let store = SessionStore::in_memory().unwrap(); - - let first = store.ensure_channel_session("feishu", "chat-1", "default").unwrap(); - let second = store.ensure_channel_session("feishu", "chat-1", "default").unwrap(); - - assert_eq!(first.id, second.id); - assert_eq!(first.chat_id, "chat-1"); - assert_eq!(second.channel_name, "feishu"); - } - - #[test] - fn test_assistant_tool_calls_roundtrip() { - let store = SessionStore::in_memory().unwrap(); - let session = store.create_cli_session(Some("tools")).unwrap(); - - let assistant = ChatMessage::assistant_with_tool_calls( - "calling tool", - vec![ToolCall { - id: "call_1".to_string(), - name: "calculator".to_string(), - arguments: serde_json::json!({ "expression": "3*7" }), - }], - ); - - store.append_message(&session.id, &assistant).unwrap(); - - let messages = store.load_messages(&session.id).unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0].role, "assistant"); - assert_eq!(messages[0].tool_calls.as_ref().unwrap().len(), 1); - assert_eq!(messages[0].tool_calls.as_ref().unwrap()[0].id, "call_1"); - assert_eq!(messages[0].tool_calls.as_ref().unwrap()[0].name, "calculator"); - } - - #[test] - fn test_reset_session_preserves_full_history_and_hides_active_history() { - let store = SessionStore::in_memory().unwrap(); - let session = store.create_cli_session(Some("reset")).unwrap(); - - store.append_message(&session.id, &ChatMessage::user("before")).unwrap(); - store.append_message(&session.id, &ChatMessage::assistant("context")).unwrap(); - store.reset_session(&session.id).unwrap(); - - let stored = store.get_session(&session.id).unwrap().unwrap(); - assert_eq!(stored.reset_cutoff_seq, 2); - - let active_messages = store.load_messages(&session.id).unwrap(); - assert!(active_messages.is_empty()); - - let all_messages = store.load_all_messages(&session.id).unwrap(); - assert_eq!(all_messages.len(), 2); - assert_eq!(all_messages[0].content, "before"); - assert_eq!(all_messages[1].content, "context"); - - store.append_message(&session.id, &ChatMessage::user("after")).unwrap(); - let active_messages = store.load_messages(&session.id).unwrap(); - assert_eq!(active_messages.len(), 1); - assert_eq!(active_messages[0].content, "after"); - } - - #[test] - fn test_schema_migration_adds_reset_cutoff_column() { - let conn = Connection::open_in_memory().unwrap(); - conn.execute_batch( - " - CREATE TABLE sessions ( - id TEXT PRIMARY KEY, - title TEXT NOT NULL, - channel_name TEXT NOT NULL, - chat_id TEXT NOT NULL, - summary TEXT, - created_at INTEGER NOT NULL, - updated_at INTEGER NOT NULL, - last_active_at INTEGER NOT NULL, - archived_at INTEGER, - deleted_at INTEGER, - message_count INTEGER NOT NULL DEFAULT 0 - ); - - CREATE TABLE messages ( - id TEXT PRIMARY KEY, - session_id TEXT NOT NULL, - seq INTEGER NOT NULL, - role TEXT NOT NULL, - content TEXT NOT NULL, - media_refs_json TEXT NOT NULL, - tool_call_id TEXT, - tool_name TEXT, - tool_calls_json TEXT, - created_at INTEGER NOT NULL, - FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE, - UNIQUE(session_id, seq) - ); - ", - ) - .unwrap(); - - let store = SessionStore::from_connection(conn).unwrap(); - let session = store.create_cli_session(Some("migrated")).unwrap(); - assert_eq!(session.reset_cutoff_seq, 0); - } - - #[test] - fn test_tool_result_roundtrip() { - let store = SessionStore::in_memory().unwrap(); - let session = store.create_cli_session(Some("tool-result")).unwrap(); - - let tool_message = ChatMessage::tool("call_9", "file_write", "saved to /tmp/output.txt"); - store.append_message(&session.id, &tool_message).unwrap(); - - let messages = store.load_messages(&session.id).unwrap(); - assert_eq!(messages.len(), 1); - assert_eq!(messages[0].role, "tool"); - assert_eq!(messages[0].content, "saved to /tmp/output.txt"); - assert_eq!(messages[0].tool_call_id.as_deref(), Some("call_9")); - assert_eq!(messages[0].tool_name.as_deref(), Some("file_write")); - assert!(messages[0].tool_calls.is_none()); - } -} \ No newline at end of file