diff --git a/Cargo.toml b/Cargo.toml index bf5aaae..c25fb28 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,7 +33,9 @@ base64 = "0.22" image = { version = "0.25", default-features = false, features = ["jpeg", "png", "gif", "webp"] } tempfile = "3" meval = "0.2" -rusqlite = { version = "0.32", features = ["bundled"] } +rusqlite = { version = "0.39", features = ["bundled"] } +r2d2 = "0.8" +r2d2_sqlite = "0.34" rustls = { version = "0.23", features = ["ring"] } wechatbot = { path = "vendor/wechatbot" } encoding_rs = "0.8" diff --git a/src/storage/error.rs b/src/storage/error.rs index 5e67e53..fb3705f 100644 --- a/src/storage/error.rs +++ b/src/storage/error.rs @@ -6,4 +6,6 @@ pub enum StorageError { Io(#[from] std::io::Error), #[error("serialization error: {0}")] Serialization(#[from] serde_json::Error), + #[error("connection pool error: {0}")] + Pool(#[from] r2d2::Error), } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 28c37a3..0185a1f 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,7 +1,8 @@ #[cfg(not(test))] use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex}; +use r2d2::Pool; +use r2d2_sqlite::SqliteConnectionManager; use rusqlite::{Connection, OptionalExtension, params}; use crate::bus::ChatMessage; @@ -24,13 +25,13 @@ pub use records::{ #[derive(Clone)] pub struct SessionStore { - conn: Arc>, + pool: Pool, } impl SessionStore { #[cfg(test)] pub fn new() -> Result { - Self::from_connection(Connection::open_in_memory()?) + Self::in_memory() } #[cfg(not(test))] @@ -45,11 +46,15 @@ impl SessionStore { std::fs::create_dir_all(parent)?; } - let conn = Connection::open(path)?; - Self::from_connection(conn) + let path_str = path.to_string_lossy().to_string(); + let conn = Connection::open(&path_str)?; + Self::from_connection(conn, &path_str) } - fn from_connection(conn: Connection) -> Result { + /// Initialize a SessionStore from a connection and its file path. + /// The connection is used for schema initialization only; the pool + /// manages subsequent connections using the same file path. + fn from_connection(conn: Connection, db_uri: &str) -> Result { conn.busy_timeout(std::time::Duration::from_secs(5))?; conn.execute_batch( " @@ -213,14 +218,35 @@ impl SessionStore { ensure_scheduler_schema(&conn)?; ensure_memory_scope_key_migration(&conn)?; - Ok(Self { - conn: Arc::new(Mutex::new(conn)), - }) + drop(conn); + + let manager = SqliteConnectionManager::file(db_uri) + .with_init(|c| { + c.busy_timeout(std::time::Duration::from_secs(5))?; + Ok(()) + }); + let pool = Pool::builder() + .max_size(8) + .build(manager)?; + + Ok(Self { pool }) } #[cfg(test)] pub(crate) fn in_memory() -> Result { - Self::from_connection(Connection::open_in_memory()?) + // Use a temp file so the database survives across pool connections. + // Temp dir is cleaned by the OS eventually; tests that need cleanup + // can call std::fs::remove_file on the path. + let path = std::env::temp_dir() + .join(format!("picobot_test_{}.db", uuid::Uuid::new_v4())); + let conn = Connection::open(&path)?; + let path_str = path.to_string_lossy().to_string(); + // ignore unused mut warning for manager in tests + #[allow(unused_mut)] + let store = Self::from_connection(conn, &path_str)?; + // Clean up temp file when the store is dropped + // We can't easily do this automatically, but the files are small. + Ok(store) } pub fn create_session( @@ -244,7 +270,7 @@ impl SessionStore { } }); - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute( " INSERT INTO sessions ( @@ -256,8 +282,7 @@ impl SessionStore { params![&session_id, title, channel_name, id, now], )?; - drop(conn); - self.get_session(&session_id)? + get_session_with_conn(&conn, &session_id)? .ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into()) } @@ -282,12 +307,12 @@ impl SessionStore { chat_id: &str, title: &str, ) -> Result { - if let Some(record) = self.get_session(session_id)? { + let conn = self.pool.get()?; + if let Some(record) = get_session_with_conn(&conn, session_id)? { return Ok(record); } let now = current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); conn.execute( " INSERT INTO sessions ( @@ -298,28 +323,14 @@ impl SessionStore { ", params![session_id, title, channel_name, chat_id, now], )?; - drop(conn); - self.get_session(session_id)? + get_session_with_conn(&conn, session_id)? .ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into()) } pub fn get_session(&self, session_id: &str) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); - let mut stmt = conn.prepare( - " - SELECT id, title, channel_name, chat_id, summary, - created_at, updated_at, last_active_at, - archived_at, deleted_at, message_count, - user_turn_count, agent_prompt_reinjection_count - FROM sessions - WHERE id = ?1 AND deleted_at IS NULL - ", - )?; - - stmt.query_row(params![session_id], map_session_record) - .optional() - .map_err(StorageError::from) + let conn = self.pool.get()?; + get_session_with_conn(&conn, session_id) } /// Find sessions whose id ends with the given suffix (used for task session lookup) @@ -327,7 +338,7 @@ impl SessionStore { &self, suffix: &str, ) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let pattern = format!("%{}", suffix); let mut stmt = conn.prepare( " @@ -354,7 +365,7 @@ impl SessionStore { channel_name: &str, include_archived: bool, ) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let mut sql = String::from( " SELECT id, title, channel_name, chat_id, summary, @@ -384,7 +395,7 @@ impl SessionStore { pub fn rename_session(&self, session_id: &str, title: &str) -> Result<(), StorageError> { let now = current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute( "UPDATE sessions SET title = ?2, updated_at = ?3 WHERE id = ?1 AND deleted_at IS NULL", params![session_id, title.trim(), now], @@ -394,7 +405,7 @@ impl SessionStore { pub fn archive_session(&self, session_id: &str) -> Result<(), StorageError> { let now = current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute( "UPDATE sessions SET archived_at = ?2, updated_at = ?2 WHERE id = ?1 AND deleted_at IS NULL", params![session_id, now], @@ -403,7 +414,7 @@ impl SessionStore { } pub fn delete_session(&self, session_id: &str) -> Result<(), StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute( "DELETE FROM messages WHERE session_id = ?1", params![session_id], @@ -423,7 +434,7 @@ impl SessionStore { let now = current_timestamp(); let id = format!("topic:{}", uuid::Uuid::new_v4()); - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute( "INSERT INTO topics (id, session_id, title, description, created_at, updated_at, last_active_at, message_count) VALUES (?1, ?2, ?3, ?4, ?5, ?5, ?5, 0)", params![&id, session_id, title, description.unwrap_or(""), now], @@ -435,7 +446,7 @@ impl SessionStore { } pub fn get_topic(&self, topic_id: &str) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let mut stmt = conn.prepare( "SELECT id, session_id, title, description, created_at, updated_at, last_active_at, message_count FROM topics WHERE id = ?1", )?; @@ -457,7 +468,7 @@ impl SessionStore { } pub fn list_topics(&self, session_id: &str) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let mut stmt = conn.prepare( "SELECT id, session_id, title, description, created_at, updated_at, last_active_at, message_count FROM topics WHERE session_id = ?1 ORDER BY last_active_at DESC" )?; @@ -484,7 +495,7 @@ impl SessionStore { pub fn update_topic_title(&self, topic_id: &str, title: &str) -> Result<(), StorageError> { let now = current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute( "UPDATE topics SET title = ?2, updated_at = ?3 WHERE id = ?1", params![topic_id, title.trim(), now], @@ -494,7 +505,7 @@ impl SessionStore { pub fn update_topic_description(&self, topic_id: &str, description: &str) -> Result<(), StorageError> { let now = current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute( "UPDATE topics SET description = ?2, updated_at = ?3 WHERE id = ?1", params![topic_id, description, now], @@ -503,7 +514,7 @@ impl SessionStore { } pub fn delete_topic(&self, topic_id: &str) -> Result<(), StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; // Messages 的 topic_id 会被设为 NULL(ON DELETE SET NULL) conn.execute("DELETE FROM topics WHERE id = ?1", params![topic_id])?; Ok(()) @@ -511,7 +522,7 @@ impl SessionStore { pub fn touch_topic(&self, topic_id: &str) -> Result<(), StorageError> { let now = current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute( "UPDATE topics SET last_active_at = ?2 WHERE id = ?1", params![topic_id, now], @@ -521,7 +532,7 @@ impl SessionStore { pub fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> { let now = current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute( "DELETE FROM messages WHERE session_id = ?1", params![session_id], @@ -555,7 +566,7 @@ impl SessionStore { topic_id: Option<&str>, message: &ChatMessage, ) -> Result<(), StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let tx = conn.unchecked_transaction()?; let seq: i64 = tx.query_row( @@ -631,7 +642,7 @@ impl SessionStore { return Ok(()); } - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let tx = conn.unchecked_transaction()?; let mut seq: i64 = tx.query_row( @@ -716,7 +727,7 @@ impl SessionStore { summary_message: &ChatMessage, preserved_messages: &[ChatMessage], ) -> Result { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let tx = conn.unchecked_transaction()?; let current_max_seq: i64 = tx.query_row( @@ -804,7 +815,7 @@ impl SessionStore { pub fn mark_agent_prompt_reinjected(&self, session_id: &str) -> Result<(), StorageError> { let now = current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute( " UPDATE sessions @@ -826,7 +837,7 @@ impl SessionStore { skill_name: Option<&str>, payload: &serde_json::Value, ) -> Result<(), StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute( " INSERT INTO skill_events ( @@ -849,7 +860,7 @@ impl SessionStore { &self, session_id: Option<&str>, ) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let sql = if session_id.is_some() { " SELECT id, session_id, event_type, skill_name, payload_json, created_at @@ -882,7 +893,7 @@ impl SessionStore { pub fn put_memory(&self, input: &MemoryUpsert) -> Result { let now = current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let tx = conn.unchecked_transaction()?; let existing: Option<(String, i64)> = tx @@ -940,9 +951,9 @@ impl SessionStore { )?; tx.commit()?; - drop(conn); - self.get_memory( + get_memory_with_conn( + &conn, &input.scope_kind, &input.scope_key, &input.namespace, @@ -958,23 +969,8 @@ impl SessionStore { namespace: &str, memory_key: &str, ) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); - let mut stmt = conn.prepare( - " - SELECT id, scope_kind, scope_key, namespace, memory_key, content, - source_type, source_session_id, source_message_id, source_message_seq, - source_channel_name, source_chat_id, created_at, updated_at - FROM memories - WHERE scope_kind = ?1 AND scope_key = ?2 AND namespace = ?3 AND memory_key = ?4 - ", - )?; - - stmt.query_row( - params![scope_kind, scope_key, namespace, memory_key], - map_memory_record, - ) - .optional() - .map_err(StorageError::from) + let conn = self.pool.get()?; + get_memory_with_conn(&conn, scope_kind, scope_key, namespace, memory_key) } pub fn list_memories( @@ -984,7 +980,7 @@ impl SessionStore { namespace: Option<&str>, limit: usize, ) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let limit = limit.max(1) as i64; let mut memories = Vec::new(); @@ -1029,7 +1025,7 @@ impl SessionStore { } pub fn list_memory_scope_keys(&self, scope_kind: &str) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let mut stmt = conn.prepare( " SELECT DISTINCT scope_key @@ -1052,7 +1048,7 @@ impl SessionStore { scope_kind: &str, scope_key: &str, ) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let mut stmt = conn.prepare( " SELECT id, scope_kind, scope_key, namespace, memory_key, content, @@ -1098,7 +1094,7 @@ impl SessionStore { namespace: &str, memory_key: &str, ) -> Result { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let changed = conn.execute( " DELETE FROM memories @@ -1114,7 +1110,7 @@ impl SessionStore { input: &SchedulerJobUpsert, ) -> Result { let now = current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute( " INSERT INTO scheduler_jobs ( @@ -1163,9 +1159,7 @@ impl SessionStore { now, ], )?; - drop(conn); - - self.get_scheduler_job(&input.id)? + get_scheduler_job_with_conn(&conn, &input.id)? .ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into()) } @@ -1173,28 +1167,15 @@ impl SessionStore { &self, job_id: &str, ) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); - let mut stmt = conn.prepare( - " - SELECT id, kind, schedule_json, interval_secs, startup_delay_secs, - target_json, payload_json, enabled, state, last_status, last_error, - run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at, - created_at, updated_at - FROM scheduler_jobs - WHERE id = ?1 - ", - )?; - - stmt.query_row(params![job_id], map_scheduler_job_record) - .optional() - .map_err(StorageError::from) + let conn = self.pool.get()?; + get_scheduler_job_with_conn(&conn, job_id) } pub fn list_scheduler_jobs( &self, enabled_only: bool, ) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let sql = if enabled_only { " SELECT id, kind, schedule_json, interval_secs, startup_delay_secs, @@ -1226,7 +1207,7 @@ impl SessionStore { } pub fn list_running_scheduler_jobs(&self) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let sql = " SELECT id, kind, schedule_json, interval_secs, startup_delay_secs, target_json, payload_json, enabled, state, last_status, last_error, @@ -1247,7 +1228,7 @@ impl SessionStore { } pub fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute("DELETE FROM scheduler_jobs WHERE id = ?1", params![job_id])?; Ok(()) } @@ -1264,7 +1245,7 @@ impl SessionStore { paused_at: Option, completed_at: Option, ) -> Result<(), StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.execute( " UPDATE scheduler_jobs @@ -1303,7 +1284,7 @@ impl SessionStore { namespace: Option<&str>, limit: usize, ) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let limit = limit.max(1) as i64; let query = quote_fts_query(query); let mut memories = Vec::new(); @@ -1366,7 +1347,7 @@ impl SessionStore { namespace: Option<&str>, limit: usize, ) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let limit = limit.max(1) as i64; let query = quote_fts_or_query(queries); if query.is_empty() { @@ -1426,12 +1407,12 @@ impl SessionStore { } pub fn load_messages(&self, session_id: &str) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; load_messages_after(&conn, session_id, 0) } pub fn load_messages_for_topic(&self, topic_id: &str) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; let mut stmt = conn.prepare( " SELECT id, role, content, system_context, reasoning_content, media_refs_json, created_at, tool_call_id, tool_name, tool_calls_json, tool_duration_ms @@ -1493,12 +1474,12 @@ impl SessionStore { } pub fn load_all_messages(&self, session_id: &str) -> Result, StorageError> { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; load_messages_after(&conn, session_id, 0) } pub fn count_active_user_messages(&self, session_id: &str) -> Result { - let conn = self.conn.lock().expect("session db mutex poisoned"); + let conn = self.pool.get()?; conn.query_row( " SELECT COUNT(*) @@ -1526,6 +1507,69 @@ fn default_session_db_path() -> Result { Ok(home.join(".picobot").join("storage").join("sessions.db")) } +/// 使用已有连接查询 session(避免从池中重复借用) +fn get_session_with_conn(conn: &Connection, session_id: &str) -> Result, StorageError> { + let mut stmt = conn.prepare( + " + SELECT id, title, channel_name, chat_id, summary, + created_at, updated_at, last_active_at, + archived_at, deleted_at, message_count, + user_turn_count, agent_prompt_reinjection_count + FROM sessions + WHERE id = ?1 AND deleted_at IS NULL + ", + )?; + + stmt.query_row(params![session_id], map_session_record) + .optional() + .map_err(StorageError::from) +} + +fn get_memory_with_conn( + conn: &Connection, + scope_kind: &str, + scope_key: &str, + namespace: &str, + memory_key: &str, +) -> Result, StorageError> { + let mut stmt = conn.prepare( + " + SELECT id, scope_kind, scope_key, namespace, memory_key, content, + source_type, source_session_id, source_message_id, source_message_seq, + source_channel_name, source_chat_id, created_at, updated_at + FROM memories + WHERE scope_kind = ?1 AND scope_key = ?2 AND namespace = ?3 AND memory_key = ?4 + ", + )?; + + stmt.query_row( + params![scope_kind, scope_key, namespace, memory_key], + map_memory_record, + ) + .optional() + .map_err(StorageError::from) +} + +fn get_scheduler_job_with_conn( + conn: &Connection, + job_id: &str, +) -> Result, StorageError> { + let mut stmt = conn.prepare( + " + SELECT id, kind, schedule_json, interval_secs, startup_delay_secs, + target_json, payload_json, enabled, state, last_status, last_error, + run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at, + created_at, updated_at + FROM scheduler_jobs + WHERE id = ?1 + ", + )?; + + stmt.query_row(params![job_id], map_scheduler_job_record) + .optional() + .map_err(StorageError::from) +} + fn map_session_record(row: &rusqlite::Row<'_>) -> rusqlite::Result { Ok(SessionRecord { id: row.get(0)?, @@ -2117,7 +2161,8 @@ mod tests { #[test] fn test_schema_migration_adds_user_turn_and_reinjection_columns() { - let conn = Connection::open_in_memory().unwrap(); + let tmp = std::env::temp_dir().join(format!("picobot_test_mig2_{}.db", uuid::Uuid::new_v4())); + let conn = Connection::open(&tmp).unwrap(); conn.execute_batch( " CREATE TABLE sessions ( @@ -2152,7 +2197,8 @@ mod tests { ) .unwrap(); - let store = SessionStore::from_connection(conn).unwrap(); + let path_str = tmp.to_string_lossy().to_string(); + let store = SessionStore::from_connection(conn, &path_str).unwrap(); let session = store.create_cli_session(Some("migrated")).unwrap(); assert_eq!(session.user_turn_count, 0); assert_eq!(session.agent_prompt_reinjection_count, 0); @@ -2160,7 +2206,9 @@ mod tests { #[test] fn test_schema_migration_adds_reasoning_content_column_to_messages() { - let conn = Connection::open_in_memory().unwrap(); + let tmp = std::env::temp_dir() + .join(format!("picobot_test_mig_{}.db", uuid::Uuid::new_v4())); + let conn = Connection::open(&tmp).unwrap(); conn.execute_batch( " CREATE TABLE sessions ( @@ -2195,8 +2243,9 @@ mod tests { ) .unwrap(); - let _store = SessionStore::from_connection(conn).unwrap(); - let conn = _store.conn.lock().unwrap(); + let path_str = tmp.to_string_lossy().to_string(); + let _store = SessionStore::from_connection(conn, &path_str).unwrap(); + let conn = _store.pool.get().unwrap(); assert!(has_column(&conn, "messages", "reasoning_content").unwrap()); } diff --git a/web/src/components/Chat/MessageBubble.tsx b/web/src/components/Chat/MessageBubble.tsx index 38a02bf..37f54ee 100644 --- a/web/src/components/Chat/MessageBubble.tsx +++ b/web/src/components/Chat/MessageBubble.tsx @@ -647,8 +647,9 @@ export function MessageBubble({ message, onNavigateToSubAgent }: MessageBubblePr // 段落 p: ({ children }) =>

{children}

, // 列表 - ul: ({ children }) =>
    {children}
, - ol: ({ children }) =>
    {children}
, + ul: ({ children }) =>
    {children}
, + ol: ({ children }) =>
    {children}
, + li: ({ children }) =>
  • {children}
  • , // 链接 a: ({ href, children }) => ( wrappers + list-inside + block

    pushes marker and content apart. + Use list-outside so the marker sits outside the

    flow. + ============================================ */ +.markdown-content li > p { + margin: 0; +} + /* ============================================ Code block styling ============================================ */