feat: 更新数据库连接管理,使用连接池优化会话存储,添加新的错误处理

This commit is contained in:
ooodc 2026-06-07 21:35:47 +08:00
parent cca913b610
commit c0d4f65de4
5 changed files with 175 additions and 112 deletions

View File

@ -33,7 +33,9 @@ base64 = "0.22"
image = { version = "0.25", default-features = false, features = ["jpeg", "png", "gif", "webp"] } image = { version = "0.25", default-features = false, features = ["jpeg", "png", "gif", "webp"] }
tempfile = "3" tempfile = "3"
meval = "0.2" meval = "0.2"
rusqlite = { version = "0.32", features = ["bundled"] } rusqlite = { version = "0.39", features = ["bundled"] }
r2d2 = "0.8"
r2d2_sqlite = "0.34"
rustls = { version = "0.23", features = ["ring"] } rustls = { version = "0.23", features = ["ring"] }
wechatbot = { path = "vendor/wechatbot" } wechatbot = { path = "vendor/wechatbot" }
encoding_rs = "0.8" encoding_rs = "0.8"

View File

@ -6,4 +6,6 @@ pub enum StorageError {
Io(#[from] std::io::Error), Io(#[from] std::io::Error),
#[error("serialization error: {0}")] #[error("serialization error: {0}")]
Serialization(#[from] serde_json::Error), Serialization(#[from] serde_json::Error),
#[error("connection pool error: {0}")]
Pool(#[from] r2d2::Error),
} }

View File

@ -1,7 +1,8 @@
#[cfg(not(test))] #[cfg(not(test))]
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use r2d2::Pool;
use r2d2_sqlite::SqliteConnectionManager;
use rusqlite::{Connection, OptionalExtension, params}; use rusqlite::{Connection, OptionalExtension, params};
use crate::bus::ChatMessage; use crate::bus::ChatMessage;
@ -24,13 +25,13 @@ pub use records::{
#[derive(Clone)] #[derive(Clone)]
pub struct SessionStore { pub struct SessionStore {
conn: Arc<Mutex<Connection>>, pool: Pool<SqliteConnectionManager>,
} }
impl SessionStore { impl SessionStore {
#[cfg(test)] #[cfg(test)]
pub fn new() -> Result<Self, StorageError> { pub fn new() -> Result<Self, StorageError> {
Self::from_connection(Connection::open_in_memory()?) Self::in_memory()
} }
#[cfg(not(test))] #[cfg(not(test))]
@ -45,11 +46,15 @@ impl SessionStore {
std::fs::create_dir_all(parent)?; std::fs::create_dir_all(parent)?;
} }
let conn = Connection::open(path)?; let path_str = path.to_string_lossy().to_string();
Self::from_connection(conn) let conn = Connection::open(&path_str)?;
Self::from_connection(conn, &path_str)
} }
fn from_connection(conn: Connection) -> Result<Self, StorageError> { /// Initialize a SessionStore from a connection and its file path.
/// The connection is used for schema initialization only; the pool
/// manages subsequent connections using the same file path.
fn from_connection(conn: Connection, db_uri: &str) -> Result<Self, StorageError> {
conn.busy_timeout(std::time::Duration::from_secs(5))?; conn.busy_timeout(std::time::Duration::from_secs(5))?;
conn.execute_batch( conn.execute_batch(
" "
@ -213,14 +218,35 @@ impl SessionStore {
ensure_scheduler_schema(&conn)?; ensure_scheduler_schema(&conn)?;
ensure_memory_scope_key_migration(&conn)?; ensure_memory_scope_key_migration(&conn)?;
Ok(Self { drop(conn);
conn: Arc::new(Mutex::new(conn)),
}) let manager = SqliteConnectionManager::file(db_uri)
.with_init(|c| {
c.busy_timeout(std::time::Duration::from_secs(5))?;
Ok(())
});
let pool = Pool::builder()
.max_size(8)
.build(manager)?;
Ok(Self { pool })
} }
#[cfg(test)] #[cfg(test)]
pub(crate) fn in_memory() -> Result<Self, StorageError> { pub(crate) fn in_memory() -> Result<Self, StorageError> {
Self::from_connection(Connection::open_in_memory()?) // Use a temp file so the database survives across pool connections.
// Temp dir is cleaned by the OS eventually; tests that need cleanup
// can call std::fs::remove_file on the path.
let path = std::env::temp_dir()
.join(format!("picobot_test_{}.db", uuid::Uuid::new_v4()));
let conn = Connection::open(&path)?;
let path_str = path.to_string_lossy().to_string();
// ignore unused mut warning for manager in tests
#[allow(unused_mut)]
let store = Self::from_connection(conn, &path_str)?;
// Clean up temp file when the store is dropped
// We can't easily do this automatically, but the files are small.
Ok(store)
} }
pub fn create_session( pub fn create_session(
@ -244,7 +270,7 @@ impl SessionStore {
} }
}); });
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute( conn.execute(
" "
INSERT INTO sessions ( INSERT INTO sessions (
@ -256,8 +282,7 @@ impl SessionStore {
params![&session_id, title, channel_name, id, now], params![&session_id, title, channel_name, id, now],
)?; )?;
drop(conn); get_session_with_conn(&conn, &session_id)?
self.get_session(&session_id)?
.ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into()) .ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into())
} }
@ -282,12 +307,12 @@ impl SessionStore {
chat_id: &str, chat_id: &str,
title: &str, title: &str,
) -> Result<SessionRecord, StorageError> { ) -> Result<SessionRecord, StorageError> {
if let Some(record) = self.get_session(session_id)? { let conn = self.pool.get()?;
if let Some(record) = get_session_with_conn(&conn, session_id)? {
return Ok(record); return Ok(record);
} }
let now = current_timestamp(); let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute( conn.execute(
" "
INSERT INTO sessions ( INSERT INTO sessions (
@ -298,28 +323,14 @@ impl SessionStore {
", ",
params![session_id, title, channel_name, chat_id, now], params![session_id, title, channel_name, chat_id, now],
)?; )?;
drop(conn);
self.get_session(session_id)? get_session_with_conn(&conn, session_id)?
.ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into()) .ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into())
} }
pub fn get_session(&self, session_id: &str) -> Result<Option<SessionRecord>, StorageError> { pub fn get_session(&self, session_id: &str) -> Result<Option<SessionRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let mut stmt = conn.prepare( get_session_with_conn(&conn, session_id)
"
SELECT id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at,
archived_at, deleted_at, message_count,
user_turn_count, agent_prompt_reinjection_count
FROM sessions
WHERE id = ?1 AND deleted_at IS NULL
",
)?;
stmt.query_row(params![session_id], map_session_record)
.optional()
.map_err(StorageError::from)
} }
/// Find sessions whose id ends with the given suffix (used for task session lookup) /// Find sessions whose id ends with the given suffix (used for task session lookup)
@ -327,7 +338,7 @@ impl SessionStore {
&self, &self,
suffix: &str, suffix: &str,
) -> Result<Vec<SessionRecord>, StorageError> { ) -> Result<Vec<SessionRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let pattern = format!("%{}", suffix); let pattern = format!("%{}", suffix);
let mut stmt = conn.prepare( let mut stmt = conn.prepare(
" "
@ -354,7 +365,7 @@ impl SessionStore {
channel_name: &str, channel_name: &str,
include_archived: bool, include_archived: bool,
) -> Result<Vec<SessionRecord>, StorageError> { ) -> Result<Vec<SessionRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let mut sql = String::from( let mut sql = String::from(
" "
SELECT id, title, channel_name, chat_id, summary, SELECT id, title, channel_name, chat_id, summary,
@ -384,7 +395,7 @@ impl SessionStore {
pub fn rename_session(&self, session_id: &str, title: &str) -> Result<(), StorageError> { pub fn rename_session(&self, session_id: &str, title: &str) -> Result<(), StorageError> {
let now = current_timestamp(); let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute( conn.execute(
"UPDATE sessions SET title = ?2, updated_at = ?3 WHERE id = ?1 AND deleted_at IS NULL", "UPDATE sessions SET title = ?2, updated_at = ?3 WHERE id = ?1 AND deleted_at IS NULL",
params![session_id, title.trim(), now], params![session_id, title.trim(), now],
@ -394,7 +405,7 @@ impl SessionStore {
pub fn archive_session(&self, session_id: &str) -> Result<(), StorageError> { pub fn archive_session(&self, session_id: &str) -> Result<(), StorageError> {
let now = current_timestamp(); let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute( conn.execute(
"UPDATE sessions SET archived_at = ?2, updated_at = ?2 WHERE id = ?1 AND deleted_at IS NULL", "UPDATE sessions SET archived_at = ?2, updated_at = ?2 WHERE id = ?1 AND deleted_at IS NULL",
params![session_id, now], params![session_id, now],
@ -403,7 +414,7 @@ impl SessionStore {
} }
pub fn delete_session(&self, session_id: &str) -> Result<(), StorageError> { pub fn delete_session(&self, session_id: &str) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute( conn.execute(
"DELETE FROM messages WHERE session_id = ?1", "DELETE FROM messages WHERE session_id = ?1",
params![session_id], params![session_id],
@ -423,7 +434,7 @@ impl SessionStore {
let now = current_timestamp(); let now = current_timestamp();
let id = format!("topic:{}", uuid::Uuid::new_v4()); let id = format!("topic:{}", uuid::Uuid::new_v4());
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute( conn.execute(
"INSERT INTO topics (id, session_id, title, description, created_at, updated_at, last_active_at, message_count) VALUES (?1, ?2, ?3, ?4, ?5, ?5, ?5, 0)", "INSERT INTO topics (id, session_id, title, description, created_at, updated_at, last_active_at, message_count) VALUES (?1, ?2, ?3, ?4, ?5, ?5, ?5, 0)",
params![&id, session_id, title, description.unwrap_or(""), now], params![&id, session_id, title, description.unwrap_or(""), now],
@ -435,7 +446,7 @@ impl SessionStore {
} }
pub fn get_topic(&self, topic_id: &str) -> Result<Option<TopicRecord>, StorageError> { pub fn get_topic(&self, topic_id: &str) -> Result<Option<TopicRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let mut stmt = conn.prepare( let mut stmt = conn.prepare(
"SELECT id, session_id, title, description, created_at, updated_at, last_active_at, message_count FROM topics WHERE id = ?1", "SELECT id, session_id, title, description, created_at, updated_at, last_active_at, message_count FROM topics WHERE id = ?1",
)?; )?;
@ -457,7 +468,7 @@ impl SessionStore {
} }
pub fn list_topics(&self, session_id: &str) -> Result<Vec<TopicRecord>, StorageError> { pub fn list_topics(&self, session_id: &str) -> Result<Vec<TopicRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let mut stmt = conn.prepare( let mut stmt = conn.prepare(
"SELECT id, session_id, title, description, created_at, updated_at, last_active_at, message_count FROM topics WHERE session_id = ?1 ORDER BY last_active_at DESC" "SELECT id, session_id, title, description, created_at, updated_at, last_active_at, message_count FROM topics WHERE session_id = ?1 ORDER BY last_active_at DESC"
)?; )?;
@ -484,7 +495,7 @@ impl SessionStore {
pub fn update_topic_title(&self, topic_id: &str, title: &str) -> Result<(), StorageError> { pub fn update_topic_title(&self, topic_id: &str, title: &str) -> Result<(), StorageError> {
let now = current_timestamp(); let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute( conn.execute(
"UPDATE topics SET title = ?2, updated_at = ?3 WHERE id = ?1", "UPDATE topics SET title = ?2, updated_at = ?3 WHERE id = ?1",
params![topic_id, title.trim(), now], params![topic_id, title.trim(), now],
@ -494,7 +505,7 @@ impl SessionStore {
pub fn update_topic_description(&self, topic_id: &str, description: &str) -> Result<(), StorageError> { pub fn update_topic_description(&self, topic_id: &str, description: &str) -> Result<(), StorageError> {
let now = current_timestamp(); let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute( conn.execute(
"UPDATE topics SET description = ?2, updated_at = ?3 WHERE id = ?1", "UPDATE topics SET description = ?2, updated_at = ?3 WHERE id = ?1",
params![topic_id, description, now], params![topic_id, description, now],
@ -503,7 +514,7 @@ impl SessionStore {
} }
pub fn delete_topic(&self, topic_id: &str) -> Result<(), StorageError> { pub fn delete_topic(&self, topic_id: &str) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
// Messages 的 topic_id 会被设为 NULLON DELETE SET NULL // Messages 的 topic_id 会被设为 NULLON DELETE SET NULL
conn.execute("DELETE FROM topics WHERE id = ?1", params![topic_id])?; conn.execute("DELETE FROM topics WHERE id = ?1", params![topic_id])?;
Ok(()) Ok(())
@ -511,7 +522,7 @@ impl SessionStore {
pub fn touch_topic(&self, topic_id: &str) -> Result<(), StorageError> { pub fn touch_topic(&self, topic_id: &str) -> Result<(), StorageError> {
let now = current_timestamp(); let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute( conn.execute(
"UPDATE topics SET last_active_at = ?2 WHERE id = ?1", "UPDATE topics SET last_active_at = ?2 WHERE id = ?1",
params![topic_id, now], params![topic_id, now],
@ -521,7 +532,7 @@ impl SessionStore {
pub fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> { pub fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> {
let now = current_timestamp(); let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute( conn.execute(
"DELETE FROM messages WHERE session_id = ?1", "DELETE FROM messages WHERE session_id = ?1",
params![session_id], params![session_id],
@ -555,7 +566,7 @@ impl SessionStore {
topic_id: Option<&str>, topic_id: Option<&str>,
message: &ChatMessage, message: &ChatMessage,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let tx = conn.unchecked_transaction()?; let tx = conn.unchecked_transaction()?;
let seq: i64 = tx.query_row( let seq: i64 = tx.query_row(
@ -631,7 +642,7 @@ impl SessionStore {
return Ok(()); return Ok(());
} }
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let tx = conn.unchecked_transaction()?; let tx = conn.unchecked_transaction()?;
let mut seq: i64 = tx.query_row( let mut seq: i64 = tx.query_row(
@ -716,7 +727,7 @@ impl SessionStore {
summary_message: &ChatMessage, summary_message: &ChatMessage,
preserved_messages: &[ChatMessage], preserved_messages: &[ChatMessage],
) -> Result<bool, StorageError> { ) -> Result<bool, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let tx = conn.unchecked_transaction()?; let tx = conn.unchecked_transaction()?;
let current_max_seq: i64 = tx.query_row( let current_max_seq: i64 = tx.query_row(
@ -804,7 +815,7 @@ impl SessionStore {
pub fn mark_agent_prompt_reinjected(&self, session_id: &str) -> Result<(), StorageError> { pub fn mark_agent_prompt_reinjected(&self, session_id: &str) -> Result<(), StorageError> {
let now = current_timestamp(); let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute( conn.execute(
" "
UPDATE sessions UPDATE sessions
@ -826,7 +837,7 @@ impl SessionStore {
skill_name: Option<&str>, skill_name: Option<&str>,
payload: &serde_json::Value, payload: &serde_json::Value,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute( conn.execute(
" "
INSERT INTO skill_events ( INSERT INTO skill_events (
@ -849,7 +860,7 @@ impl SessionStore {
&self, &self,
session_id: Option<&str>, session_id: Option<&str>,
) -> Result<Vec<SkillEventRecord>, StorageError> { ) -> Result<Vec<SkillEventRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let sql = if session_id.is_some() { let sql = if session_id.is_some() {
" "
SELECT id, session_id, event_type, skill_name, payload_json, created_at SELECT id, session_id, event_type, skill_name, payload_json, created_at
@ -882,7 +893,7 @@ impl SessionStore {
pub fn put_memory(&self, input: &MemoryUpsert) -> Result<MemoryRecord, StorageError> { pub fn put_memory(&self, input: &MemoryUpsert) -> Result<MemoryRecord, StorageError> {
let now = current_timestamp(); let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let tx = conn.unchecked_transaction()?; let tx = conn.unchecked_transaction()?;
let existing: Option<(String, i64)> = tx let existing: Option<(String, i64)> = tx
@ -940,9 +951,9 @@ impl SessionStore {
)?; )?;
tx.commit()?; tx.commit()?;
drop(conn);
self.get_memory( get_memory_with_conn(
&conn,
&input.scope_kind, &input.scope_kind,
&input.scope_key, &input.scope_key,
&input.namespace, &input.namespace,
@ -958,23 +969,8 @@ impl SessionStore {
namespace: &str, namespace: &str,
memory_key: &str, memory_key: &str,
) -> Result<Option<MemoryRecord>, StorageError> { ) -> Result<Option<MemoryRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let mut stmt = conn.prepare( get_memory_with_conn(&conn, scope_kind, scope_key, namespace, memory_key)
"
SELECT id, scope_kind, scope_key, namespace, memory_key, content,
source_type, source_session_id, source_message_id, source_message_seq,
source_channel_name, source_chat_id, created_at, updated_at
FROM memories
WHERE scope_kind = ?1 AND scope_key = ?2 AND namespace = ?3 AND memory_key = ?4
",
)?;
stmt.query_row(
params![scope_kind, scope_key, namespace, memory_key],
map_memory_record,
)
.optional()
.map_err(StorageError::from)
} }
pub fn list_memories( pub fn list_memories(
@ -984,7 +980,7 @@ impl SessionStore {
namespace: Option<&str>, namespace: Option<&str>,
limit: usize, limit: usize,
) -> Result<Vec<MemoryRecord>, StorageError> { ) -> Result<Vec<MemoryRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let limit = limit.max(1) as i64; let limit = limit.max(1) as i64;
let mut memories = Vec::new(); let mut memories = Vec::new();
@ -1029,7 +1025,7 @@ impl SessionStore {
} }
pub fn list_memory_scope_keys(&self, scope_kind: &str) -> Result<Vec<String>, StorageError> { pub fn list_memory_scope_keys(&self, scope_kind: &str) -> Result<Vec<String>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let mut stmt = conn.prepare( let mut stmt = conn.prepare(
" "
SELECT DISTINCT scope_key SELECT DISTINCT scope_key
@ -1052,7 +1048,7 @@ impl SessionStore {
scope_kind: &str, scope_kind: &str,
scope_key: &str, scope_key: &str,
) -> Result<Vec<MemoryRecord>, StorageError> { ) -> Result<Vec<MemoryRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let mut stmt = conn.prepare( let mut stmt = conn.prepare(
" "
SELECT id, scope_kind, scope_key, namespace, memory_key, content, SELECT id, scope_kind, scope_key, namespace, memory_key, content,
@ -1098,7 +1094,7 @@ impl SessionStore {
namespace: &str, namespace: &str,
memory_key: &str, memory_key: &str,
) -> Result<bool, StorageError> { ) -> Result<bool, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let changed = conn.execute( let changed = conn.execute(
" "
DELETE FROM memories DELETE FROM memories
@ -1114,7 +1110,7 @@ impl SessionStore {
input: &SchedulerJobUpsert, input: &SchedulerJobUpsert,
) -> Result<SchedulerJobRecord, StorageError> { ) -> Result<SchedulerJobRecord, StorageError> {
let now = current_timestamp(); let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute( conn.execute(
" "
INSERT INTO scheduler_jobs ( INSERT INTO scheduler_jobs (
@ -1163,9 +1159,7 @@ impl SessionStore {
now, now,
], ],
)?; )?;
drop(conn); get_scheduler_job_with_conn(&conn, &input.id)?
self.get_scheduler_job(&input.id)?
.ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into()) .ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into())
} }
@ -1173,28 +1167,15 @@ impl SessionStore {
&self, &self,
job_id: &str, job_id: &str,
) -> Result<Option<SchedulerJobRecord>, StorageError> { ) -> Result<Option<SchedulerJobRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let mut stmt = conn.prepare( get_scheduler_job_with_conn(&conn, job_id)
"
SELECT id, kind, schedule_json, interval_secs, startup_delay_secs,
target_json, payload_json, enabled, state, last_status, last_error,
run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at,
created_at, updated_at
FROM scheduler_jobs
WHERE id = ?1
",
)?;
stmt.query_row(params![job_id], map_scheduler_job_record)
.optional()
.map_err(StorageError::from)
} }
pub fn list_scheduler_jobs( pub fn list_scheduler_jobs(
&self, &self,
enabled_only: bool, enabled_only: bool,
) -> Result<Vec<SchedulerJobRecord>, StorageError> { ) -> Result<Vec<SchedulerJobRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let sql = if enabled_only { let sql = if enabled_only {
" "
SELECT id, kind, schedule_json, interval_secs, startup_delay_secs, SELECT id, kind, schedule_json, interval_secs, startup_delay_secs,
@ -1226,7 +1207,7 @@ impl SessionStore {
} }
pub fn list_running_scheduler_jobs(&self) -> Result<Vec<SchedulerJobRecord>, StorageError> { pub fn list_running_scheduler_jobs(&self) -> Result<Vec<SchedulerJobRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let sql = " let sql = "
SELECT id, kind, schedule_json, interval_secs, startup_delay_secs, SELECT id, kind, schedule_json, interval_secs, startup_delay_secs,
target_json, payload_json, enabled, state, last_status, last_error, target_json, payload_json, enabled, state, last_status, last_error,
@ -1247,7 +1228,7 @@ impl SessionStore {
} }
pub fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> { pub fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute("DELETE FROM scheduler_jobs WHERE id = ?1", params![job_id])?; conn.execute("DELETE FROM scheduler_jobs WHERE id = ?1", params![job_id])?;
Ok(()) Ok(())
} }
@ -1264,7 +1245,7 @@ impl SessionStore {
paused_at: Option<i64>, paused_at: Option<i64>,
completed_at: Option<i64>, completed_at: Option<i64>,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.execute( conn.execute(
" "
UPDATE scheduler_jobs UPDATE scheduler_jobs
@ -1303,7 +1284,7 @@ impl SessionStore {
namespace: Option<&str>, namespace: Option<&str>,
limit: usize, limit: usize,
) -> Result<Vec<MemoryRecord>, StorageError> { ) -> Result<Vec<MemoryRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let limit = limit.max(1) as i64; let limit = limit.max(1) as i64;
let query = quote_fts_query(query); let query = quote_fts_query(query);
let mut memories = Vec::new(); let mut memories = Vec::new();
@ -1366,7 +1347,7 @@ impl SessionStore {
namespace: Option<&str>, namespace: Option<&str>,
limit: usize, limit: usize,
) -> Result<Vec<MemoryRecord>, StorageError> { ) -> Result<Vec<MemoryRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let limit = limit.max(1) as i64; let limit = limit.max(1) as i64;
let query = quote_fts_or_query(queries); let query = quote_fts_or_query(queries);
if query.is_empty() { if query.is_empty() {
@ -1426,12 +1407,12 @@ impl SessionStore {
} }
pub fn load_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> { pub fn load_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
load_messages_after(&conn, session_id, 0) load_messages_after(&conn, session_id, 0)
} }
pub fn load_messages_for_topic(&self, topic_id: &str) -> Result<Vec<ChatMessage>, StorageError> { pub fn load_messages_for_topic(&self, topic_id: &str) -> Result<Vec<ChatMessage>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
let mut stmt = conn.prepare( let mut stmt = conn.prepare(
" "
SELECT id, role, content, system_context, reasoning_content, media_refs_json, created_at, tool_call_id, tool_name, tool_calls_json, tool_duration_ms SELECT id, role, content, system_context, reasoning_content, media_refs_json, created_at, tool_call_id, tool_name, tool_calls_json, tool_duration_ms
@ -1493,12 +1474,12 @@ impl SessionStore {
} }
pub fn load_all_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> { pub fn load_all_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
load_messages_after(&conn, session_id, 0) load_messages_after(&conn, session_id, 0)
} }
pub fn count_active_user_messages(&self, session_id: &str) -> Result<i64, StorageError> { pub fn count_active_user_messages(&self, session_id: &str) -> Result<i64, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.pool.get()?;
conn.query_row( conn.query_row(
" "
SELECT COUNT(*) SELECT COUNT(*)
@ -1526,6 +1507,69 @@ fn default_session_db_path() -> Result<PathBuf, std::io::Error> {
Ok(home.join(".picobot").join("storage").join("sessions.db")) Ok(home.join(".picobot").join("storage").join("sessions.db"))
} }
/// 使用已有连接查询 session避免从池中重复借用
fn get_session_with_conn(conn: &Connection, session_id: &str) -> Result<Option<SessionRecord>, StorageError> {
let mut stmt = conn.prepare(
"
SELECT id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at,
archived_at, deleted_at, message_count,
user_turn_count, agent_prompt_reinjection_count
FROM sessions
WHERE id = ?1 AND deleted_at IS NULL
",
)?;
stmt.query_row(params![session_id], map_session_record)
.optional()
.map_err(StorageError::from)
}
fn get_memory_with_conn(
conn: &Connection,
scope_kind: &str,
scope_key: &str,
namespace: &str,
memory_key: &str,
) -> Result<Option<MemoryRecord>, StorageError> {
let mut stmt = conn.prepare(
"
SELECT id, scope_kind, scope_key, namespace, memory_key, content,
source_type, source_session_id, source_message_id, source_message_seq,
source_channel_name, source_chat_id, created_at, updated_at
FROM memories
WHERE scope_kind = ?1 AND scope_key = ?2 AND namespace = ?3 AND memory_key = ?4
",
)?;
stmt.query_row(
params![scope_kind, scope_key, namespace, memory_key],
map_memory_record,
)
.optional()
.map_err(StorageError::from)
}
fn get_scheduler_job_with_conn(
conn: &Connection,
job_id: &str,
) -> Result<Option<SchedulerJobRecord>, StorageError> {
let mut stmt = conn.prepare(
"
SELECT id, kind, schedule_json, interval_secs, startup_delay_secs,
target_json, payload_json, enabled, state, last_status, last_error,
run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at,
created_at, updated_at
FROM scheduler_jobs
WHERE id = ?1
",
)?;
stmt.query_row(params![job_id], map_scheduler_job_record)
.optional()
.map_err(StorageError::from)
}
fn map_session_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> { fn map_session_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
Ok(SessionRecord { Ok(SessionRecord {
id: row.get(0)?, id: row.get(0)?,
@ -2117,7 +2161,8 @@ mod tests {
#[test] #[test]
fn test_schema_migration_adds_user_turn_and_reinjection_columns() { fn test_schema_migration_adds_user_turn_and_reinjection_columns() {
let conn = Connection::open_in_memory().unwrap(); let tmp = std::env::temp_dir().join(format!("picobot_test_mig2_{}.db", uuid::Uuid::new_v4()));
let conn = Connection::open(&tmp).unwrap();
conn.execute_batch( conn.execute_batch(
" "
CREATE TABLE sessions ( CREATE TABLE sessions (
@ -2152,7 +2197,8 @@ mod tests {
) )
.unwrap(); .unwrap();
let store = SessionStore::from_connection(conn).unwrap(); let path_str = tmp.to_string_lossy().to_string();
let store = SessionStore::from_connection(conn, &path_str).unwrap();
let session = store.create_cli_session(Some("migrated")).unwrap(); let session = store.create_cli_session(Some("migrated")).unwrap();
assert_eq!(session.user_turn_count, 0); assert_eq!(session.user_turn_count, 0);
assert_eq!(session.agent_prompt_reinjection_count, 0); assert_eq!(session.agent_prompt_reinjection_count, 0);
@ -2160,7 +2206,9 @@ mod tests {
#[test] #[test]
fn test_schema_migration_adds_reasoning_content_column_to_messages() { fn test_schema_migration_adds_reasoning_content_column_to_messages() {
let conn = Connection::open_in_memory().unwrap(); let tmp = std::env::temp_dir()
.join(format!("picobot_test_mig_{}.db", uuid::Uuid::new_v4()));
let conn = Connection::open(&tmp).unwrap();
conn.execute_batch( conn.execute_batch(
" "
CREATE TABLE sessions ( CREATE TABLE sessions (
@ -2195,8 +2243,9 @@ mod tests {
) )
.unwrap(); .unwrap();
let _store = SessionStore::from_connection(conn).unwrap(); let path_str = tmp.to_string_lossy().to_string();
let conn = _store.conn.lock().unwrap(); let _store = SessionStore::from_connection(conn, &path_str).unwrap();
let conn = _store.pool.get().unwrap();
assert!(has_column(&conn, "messages", "reasoning_content").unwrap()); assert!(has_column(&conn, "messages", "reasoning_content").unwrap());
} }

View File

@ -647,8 +647,9 @@ export function MessageBubble({ message, onNavigateToSubAgent }: MessageBubblePr
// 段落 // 段落
p: ({ children }) => <p className="mb-2 last:mb-0">{children}</p>, p: ({ children }) => <p className="mb-2 last:mb-0">{children}</p>,
// 列表 // 列表
ul: ({ children }) => <ul className="list-disc list-inside mb-2 space-y-1">{children}</ul>, ul: ({ children }) => <ul className="list-disc list-outside mb-2 space-y-1 pl-5">{children}</ul>,
ol: ({ children }) => <ol className="list-decimal list-inside mb-2 space-y-1">{children}</ol>, ol: ({ children }) => <ol className="list-decimal list-outside mb-2 space-y-1 pl-5">{children}</ol>,
li: ({ children }) => <li className="[&>p]:m-0">{children}</li>,
// 链接 // 链接
a: ({ href, children }) => ( a: ({ href, children }) => (
<a <a

View File

@ -265,6 +265,15 @@ body {
.typing-indicator span:nth-child(2) { animation-delay: 0.2s; } .typing-indicator span:nth-child(2) { animation-delay: 0.2s; }
.typing-indicator span:nth-child(3) { animation-delay: 0.4s; } .typing-indicator span:nth-child(3) { animation-delay: 0.4s; }
/* ============================================
Markdown list kill extra spacing from loose-list <p> wrappers
list-inside + block <p> pushes marker and content apart.
Use list-outside so the marker sits outside the <p> flow.
============================================ */
.markdown-content li > p {
margin: 0;
}
/* ============================================ /* ============================================
Code block styling Code block styling
============================================ */ ============================================ */