2413 lines
85 KiB
Rust

#[cfg(not(test))]
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};
use rusqlite::{Connection, OptionalExtension, params};
use crate::bus::ChatMessage;
pub mod error;
pub mod records;
pub use error::StorageError;
pub use records::{
MemoryRecord, MemoryUpsert, SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus,
SchedulerJobUpsert, SessionRecord, SkillEventRecord,
};
/// Handle to the SQLite-backed store holding sessions, messages, skill events,
/// memories and scheduler jobs.
///
/// Clones share one underlying connection through the `Arc<Mutex<_>>`, so every
/// database call made through any clone is serialized on that mutex.
#[derive(Clone)]
pub struct SessionStore {
// The single SQLite connection shared by all clones of this store.
conn: Arc<Mutex<Connection>>,
}
impl SessionStore {
/// Builds a store backed by a fresh in-memory SQLite database (test builds only).
#[cfg(test)]
pub fn new() -> Result<Self, StorageError> {
    let conn = Connection::open_in_memory()?;
    Self::from_connection(conn)
}
#[cfg(not(test))]
pub fn new() -> Result<Self, StorageError> {
let db_path = default_session_db_path()?;
Self::open_at_path(&db_path)
}
/// Opens (or creates) the database file at `path`, creating the parent
/// directory tree first when the path has a parent component.
#[cfg(not(test))]
fn open_at_path(path: &Path) -> Result<Self, StorageError> {
    // `transpose` turns the optional directory creation into a single `Result`
    // so an I/O failure propagates through `?`.
    path.parent().map(std::fs::create_dir_all).transpose()?;
    Self::from_connection(Connection::open(path)?)
}
/// Prepares `conn` for use by the store and wraps it in a [`SessionStore`].
///
/// Sets a five second busy timeout, then runs one batch that switches on WAL
/// journaling and foreign-key enforcement and creates (if missing) the
/// `sessions`, `messages`, `skill_events`, `memories` and `scheduler_jobs`
/// tables, their indexes, the `memories_fts` FTS5 table, and the triggers that
/// mirror inserts, updates and deletes on `memories` into that FTS table.
/// Afterwards the three `ensure_*_schema` helpers run against the connection.
///
/// # Errors
/// Propagates any failure from the busy-timeout call, the pragma/DDL batch, or
/// the `ensure_*_schema` helpers.
fn from_connection(conn: Connection) -> Result<Self, StorageError> {
// Wait up to five seconds on a busy database rather than failing right away.
conn.busy_timeout(std::time::Duration::from_secs(5))?;
// Every CREATE statement in the batch uses IF NOT EXISTS, so running it
// against an already-initialized database leaves existing objects alone.
conn.execute_batch(
"
PRAGMA journal_mode = WAL;
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS sessions (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
channel_name TEXT NOT NULL,
chat_id TEXT NOT NULL,
summary TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
last_active_at INTEGER NOT NULL,
archived_at INTEGER,
deleted_at INTEGER,
message_count INTEGER NOT NULL DEFAULT 0,
reset_cutoff_seq INTEGER NOT NULL DEFAULT 0,
user_turn_count INTEGER NOT NULL DEFAULT 0,
agent_prompt_reinjection_count INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_sessions_channel_archived
ON sessions(channel_name, archived_at, last_active_at DESC);
CREATE INDEX IF NOT EXISTS idx_sessions_updated_at
ON sessions(updated_at DESC);
CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
seq INTEGER NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
system_context TEXT,
reasoning_content TEXT,
media_refs_json TEXT NOT NULL,
tool_call_id TEXT,
tool_name TEXT,
tool_calls_json TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE,
UNIQUE(session_id, seq)
);
CREATE INDEX IF NOT EXISTS idx_messages_session_seq
ON messages(session_id, seq);
CREATE INDEX IF NOT EXISTS idx_messages_session_created
ON messages(session_id, created_at);
CREATE TABLE IF NOT EXISTS skill_events (
id TEXT PRIMARY KEY,
session_id TEXT,
event_type TEXT NOT NULL,
skill_name TEXT,
payload_json TEXT NOT NULL,
created_at INTEGER NOT NULL,
FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_skill_events_session_created
ON skill_events(session_id, created_at DESC);
CREATE INDEX IF NOT EXISTS idx_skill_events_type_created
ON skill_events(event_type, created_at DESC);
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
scope_kind TEXT NOT NULL,
scope_key TEXT NOT NULL,
namespace TEXT NOT NULL,
memory_key TEXT NOT NULL,
content TEXT NOT NULL,
source_type TEXT NOT NULL,
source_session_id TEXT,
source_message_id TEXT,
source_message_seq INTEGER,
source_channel_name TEXT,
source_chat_id TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
UNIQUE(scope_kind, scope_key, namespace, memory_key)
);
CREATE INDEX IF NOT EXISTS idx_memories_scope_updated
ON memories(scope_kind, scope_key, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_memories_scope_namespace_updated
ON memories(scope_kind, scope_key, namespace, updated_at DESC);
CREATE INDEX IF NOT EXISTS idx_memories_source_session
ON memories(source_session_id, updated_at DESC);
CREATE TABLE IF NOT EXISTS scheduler_jobs (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL,
schedule_json TEXT NOT NULL DEFAULT '{}',
interval_secs INTEGER NOT NULL DEFAULT 0,
startup_delay_secs INTEGER NOT NULL DEFAULT 0,
target_json TEXT NOT NULL,
payload_json TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
state TEXT NOT NULL DEFAULT 'scheduled',
last_status TEXT,
last_error TEXT,
run_count INTEGER NOT NULL DEFAULT 0,
max_runs INTEGER,
last_fired_at INTEGER,
next_fire_at INTEGER,
paused_at INTEGER,
completed_at INTEGER,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_scheduler_jobs_enabled_next_fire
ON scheduler_jobs(enabled, state, next_fire_at ASC);
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
namespace,
memory_key,
content,
content='memories',
content_rowid='rowid'
);
CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
INSERT INTO memories_fts(rowid, namespace, memory_key, content)
VALUES (new.rowid, new.namespace, new.memory_key, new.content);
END;
CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, namespace, memory_key, content)
VALUES ('delete', old.rowid, old.namespace, old.memory_key, old.content);
END;
CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, namespace, memory_key, content)
VALUES ('delete', old.rowid, old.namespace, old.memory_key, old.content);
INSERT INTO memories_fts(rowid, namespace, memory_key, content)
VALUES (new.rowid, new.namespace, new.memory_key, new.content);
END;
",
)?;
// NOTE(review): these helpers are not visible from this part of the file;
// presumably they upgrade databases created with an older schema — confirm.
ensure_sessions_schema(&conn)?;
ensure_messages_schema(&conn)?;
ensure_scheduler_schema(&conn)?;
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
})
}
/// Crate-internal test helper that returns a store over a fresh in-memory database.
#[cfg(test)]
pub(crate) fn in_memory() -> Result<Self, StorageError> {
    let conn = Connection::open_in_memory()?;
    Self::from_connection(conn)
}
pub fn create_cli_session(&self, title: Option<&str>) -> Result<SessionRecord, StorageError> {
let now = current_timestamp();
let id = uuid::Uuid::new_v4().to_string();
let title = title
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToOwned::to_owned)
.unwrap_or_else(|| format!("CLI Session {}", &id[..8]));
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute(
"
INSERT INTO sessions (
id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at, archived_at, deleted_at, message_count,
reset_cutoff_seq, user_turn_count, agent_prompt_reinjection_count
) VALUES (?1, ?2, 'cli', ?3, NULL, ?4, ?4, ?4, NULL, NULL, 0, 0, 0, 0)
",
params![id, title, id, now],
)?;
drop(conn);
self.get_session(&id)?
.ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into())
}
/// Returns the session for `(channel_name, chat_id)`, creating it on first use.
///
/// The session id comes from [`persistent_session_id`]; a newly created
/// session is titled `"{channel_name}:{chat_id}"`.
///
/// The mutex is not held between the initial lookup and the insert, so two
/// callers sharing this store can both miss the lookup. The insert therefore
/// uses `ON CONFLICT(id) DO NOTHING`: the loser of that race reads back the
/// winner's row instead of failing on the primary key.
///
/// # Errors
/// Propagates database errors. Yields `QueryReturnedNoRows` when the row is
/// still not visible through `get_session` after the insert (for example a
/// row whose `deleted_at` is set, which `get_session` filters out).
pub fn ensure_channel_session(
    &self,
    channel_name: &str,
    chat_id: &str,
) -> Result<SessionRecord, StorageError> {
    let session_id = persistent_session_id(channel_name, chat_id);
    // Fast path: the session already exists.
    if let Some(record) = self.get_session(&session_id)? {
        return Ok(record);
    }
    let now = current_timestamp();
    let title = format!("{}:{}", channel_name, chat_id);
    let conn = self.conn.lock().expect("session db mutex poisoned");
    conn.execute(
        "
INSERT INTO sessions (
id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at, archived_at, deleted_at, message_count,
reset_cutoff_seq, user_turn_count, agent_prompt_reinjection_count
) VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?5, ?5, NULL, NULL, 0, 0, 0, 0)
ON CONFLICT(id) DO NOTHING
",
        params![session_id, title, channel_name, chat_id, now],
    )?;
    // Release the guard before `get_session` takes the (non-reentrant) mutex again.
    drop(conn);
    self.get_session(&session_id)?
        .ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into())
}
/// Looks up one session by id, ignoring rows whose `deleted_at` is set.
///
/// Returns `Ok(None)` when no matching row exists.
pub fn get_session(&self, session_id: &str) -> Result<Option<SessionRecord>, StorageError> {
    const SELECT_SESSION: &str = "
SELECT id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at,
archived_at, deleted_at, message_count, reset_cutoff_seq,
user_turn_count, agent_prompt_reinjection_count
FROM sessions
WHERE id = ?1 AND deleted_at IS NULL
";
    let conn = self.conn.lock().expect("session db mutex poisoned");
    let record = conn
        .query_row(SELECT_SESSION, params![session_id], map_session_record)
        .optional()?;
    Ok(record)
}
/// Lists the non-deleted sessions of `channel_name`, most recently active first
/// (ties broken by newest `created_at`).
///
/// Archived sessions are included only when `include_archived` is `true`.
pub fn list_sessions(
    &self,
    channel_name: &str,
    include_archived: bool,
) -> Result<Vec<SessionRecord>, StorageError> {
    const SELECT_FOR_CHANNEL: &str = "
SELECT id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at,
archived_at, deleted_at, message_count, reset_cutoff_seq,
user_turn_count, agent_prompt_reinjection_count
FROM sessions
WHERE channel_name = ?1
AND deleted_at IS NULL
";
    const ORDERING: &str = " ORDER BY last_active_at DESC, created_at DESC";
    let conn = self.conn.lock().expect("session db mutex poisoned");
    let archived_filter = if include_archived {
        ""
    } else {
        " AND archived_at IS NULL"
    };
    let sql = [SELECT_FOR_CHANNEL, archived_filter, ORDERING].concat();
    let mut stmt = conn.prepare(&sql)?;
    let rows = stmt.query_map(params![channel_name], map_session_record)?;
    // Stops at the first row that fails to map, like an explicit push loop would.
    let sessions = rows.collect::<Result<Vec<_>, _>>()?;
    Ok(sessions)
}
/// Sets the title of a non-deleted session to the trimmed `title` and bumps
/// `updated_at`. Succeeds without effect when no row matches.
pub fn rename_session(&self, session_id: &str, title: &str) -> Result<(), StorageError> {
    const RENAME: &str =
        "UPDATE sessions SET title = ?2, updated_at = ?3 WHERE id = ?1 AND deleted_at IS NULL";
    let now = current_timestamp();
    let trimmed_title = title.trim();
    let conn = self.conn.lock().expect("session db mutex poisoned");
    conn.execute(RENAME, params![session_id, trimmed_title, now])?;
    Ok(())
}
/// Marks a non-deleted session as archived by stamping `archived_at` and
/// `updated_at` with the current timestamp. Succeeds without effect when no
/// row matches.
pub fn archive_session(&self, session_id: &str) -> Result<(), StorageError> {
    const ARCHIVE: &str =
        "UPDATE sessions SET archived_at = ?2, updated_at = ?2 WHERE id = ?1 AND deleted_at IS NULL";
    let archived_at = current_timestamp();
    let conn = self.conn.lock().expect("session db mutex poisoned");
    conn.execute(ARCHIVE, params![session_id, archived_at])?;
    Ok(())
}
/// Permanently removes a session row together with its messages.
///
/// Both deletes run inside one transaction so a failure of the second
/// statement cannot leave a session behind with its messages already gone.
/// Rows in `skill_events` that reference the session are removed by the
/// `ON DELETE CASCADE` foreign key declared in the schema.
///
/// # Errors
/// Propagates any database error; in that case nothing is deleted.
pub fn delete_session(&self, session_id: &str) -> Result<(), StorageError> {
    let conn = self.conn.lock().expect("session db mutex poisoned");
    let tx = conn.unchecked_transaction()?;
    tx.execute(
        "DELETE FROM messages WHERE session_id = ?1",
        params![session_id],
    )?;
    tx.execute("DELETE FROM sessions WHERE id = ?1", params![session_id])?;
    tx.commit()?;
    Ok(())
}
/// Deletes every message of a session and resets its counters.
///
/// `message_count`, `reset_cutoff_seq`, `user_turn_count` and
/// `agent_prompt_reinjection_count` go back to zero, and `updated_at` /
/// `last_active_at` are stamped with the current timestamp. The delete and
/// the counter reset share one transaction so the stored counters can never
/// disagree with the message rows after a partial failure.
///
/// # Errors
/// Propagates any database error; in that case no change is persisted.
pub fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> {
    let now = current_timestamp();
    let conn = self.conn.lock().expect("session db mutex poisoned");
    let tx = conn.unchecked_transaction()?;
    tx.execute(
        "DELETE FROM messages WHERE session_id = ?1",
        params![session_id],
    )?;
    tx.execute(
        "
UPDATE sessions
SET message_count = 0,
updated_at = ?2,
last_active_at = ?2,
reset_cutoff_seq = 0,
user_turn_count = 0,
agent_prompt_reinjection_count = 0
WHERE id = ?1 AND deleted_at IS NULL
",
        params![session_id, now],
    )?;
    tx.commit()?;
    Ok(())
}
/// Starts a fresh active history for a session without deleting any messages.
///
/// Inside one transaction, `reset_cutoff_seq` is moved to the highest stored
/// message `seq` (0 when there are none), the user-turn and prompt-reinjection
/// counters are zeroed, `archived_at` is cleared, and the activity timestamps
/// are refreshed.
pub fn reset_session(&self, session_id: &str) -> Result<(), StorageError> {
    const LATEST_SEQ: &str = "SELECT COALESCE(MAX(seq), 0) FROM messages WHERE session_id = ?1";
    const MOVE_CUTOFF: &str = "
UPDATE sessions
SET reset_cutoff_seq = ?2,
updated_at = ?3,
last_active_at = ?3,
archived_at = NULL,
user_turn_count = 0,
agent_prompt_reinjection_count = 0
WHERE id = ?1 AND deleted_at IS NULL
";
    let now = current_timestamp();
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let tx = guard.unchecked_transaction()?;
    let latest_seq = tx.query_row(LATEST_SEQ, params![session_id], |row| {
        row.get::<_, i64>(0)
    })?;
    tx.execute(MOVE_CUTOFF, params![session_id, latest_seq, now])?;
    tx.commit()?;
    Ok(())
}
/// Appends `message` to a session and updates the session's bookkeeping.
///
/// In one transaction the message is stored with the next free `seq` for the
/// session (highest existing `seq` plus one), then the session row gets
/// `message_count + 1`, `user_turn_count + 1` when the role is `"user"`,
/// refreshed activity timestamps, and a cleared `archived_at`.
///
/// # Errors
/// Propagates database and JSON serialization errors; the transaction is not
/// committed when any step fails.
pub fn append_message(
    &self,
    session_id: &str,
    message: &ChatMessage,
) -> Result<(), StorageError> {
    const NEXT_SEQ: &str = "SELECT COALESCE(MAX(seq), 0) + 1 FROM messages WHERE session_id = ?1";
    const INSERT_MESSAGE: &str = "
INSERT INTO messages (
id, session_id, seq, role, content,
system_context, reasoning_content, media_refs_json, tool_call_id, tool_name, tool_calls_json, created_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
";
    const TOUCH_SESSION: &str = "
UPDATE sessions
SET message_count = message_count + 1,
user_turn_count = user_turn_count + ?3,
updated_at = ?2,
last_active_at = ?2,
archived_at = NULL
WHERE id = ?1 AND deleted_at IS NULL
";
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let tx = guard.unchecked_transaction()?;
    let next_seq = tx.query_row(NEXT_SEQ, params![session_id], |row| row.get::<_, i64>(0))?;
    let media_refs_json = serde_json::to_string(&message.media_refs)?;
    let tool_calls_json = match message.tool_calls.as_ref() {
        Some(calls) => Some(serde_json::to_string(calls)?),
        None => None,
    };
    tx.execute(
        INSERT_MESSAGE,
        params![
            message.id,
            session_id,
            next_seq,
            message.role,
            message.content,
            message.system_context,
            message.reasoning_content,
            media_refs_json,
            message.tool_call_id,
            message.tool_name,
            tool_calls_json,
            message.timestamp,
        ],
    )?;
    let now = current_timestamp();
    // Only user messages advance the user-turn counter.
    let user_turn_delta = i64::from(message.role == "user");
    tx.execute(TOUCH_SESSION, params![session_id, now, user_turn_delta])?;
    tx.commit()?;
    Ok(())
}
/// Replaces a session's active history with a compacted copy, in one transaction.
///
/// The compacted history is appended after the current highest `seq` in this
/// order: `preserved_system_messages`, `summary_message`, `preserved_messages`,
/// then any messages loaded from the range between `snapshot_end_seq` and the
/// current highest `seq`. `reset_cutoff_seq` is then moved to that previous
/// highest `seq`, so the newly appended rows are the only ones above the cutoff.
///
/// Returns `Ok(false)` without writing anything when the session's cutoff no
/// longer equals `expected_reset_cutoff_seq`, or when `snapshot_end_seq` is
/// not above the cutoff or is beyond the highest stored `seq`. Returns
/// `Ok(true)` after a successful commit.
///
/// # Errors
/// Propagates errors from the database and from the helper functions.
pub fn compact_active_history(
&self,
session_id: &str,
expected_reset_cutoff_seq: i64,
snapshot_end_seq: i64,
preserved_system_messages: &[ChatMessage],
summary_message: &ChatMessage,
preserved_messages: &[ChatMessage],
) -> Result<bool, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let tx = conn.unchecked_transaction()?;
// Bail out if the cutoff moved since the caller took its snapshot.
let current_cutoff = active_reset_cutoff(&tx, session_id)?;
if current_cutoff != expected_reset_cutoff_seq {
return Ok(false);
}
let current_max_seq: i64 = tx.query_row(
"SELECT COALESCE(MAX(seq), 0) FROM messages WHERE session_id = ?1",
params![session_id],
|row| row.get(0),
)?;
// The snapshot end must lie above the cutoff and not past the last stored message.
if snapshot_end_seq <= current_cutoff || snapshot_end_seq > current_max_seq {
return Ok(false);
}
// NOTE(review): presumably the messages that arrived after the snapshot was
// taken; the exact bounds depend on `load_messages_between` — confirm.
let delta_messages =
load_messages_between(&tx, session_id, snapshot_end_seq, current_max_seq)?;
let mut next_seq = current_max_seq + 1;
let now = current_timestamp();
let mut inserted_count = 0_i64;
let mut active_user_turn_count = 0_i64;
// System messages are copied first and keep their own timestamps.
for message in preserved_system_messages {
let copied = clone_message_for_compaction(message, message.timestamp);
insert_message_with_seq(&tx, session_id, next_seq, &copied)?;
next_seq += 1;
inserted_count += 1;
}
// The summary copy is created with the current timestamp.
let summary_copy = clone_message_for_compaction(summary_message, now);
insert_message_with_seq(&tx, session_id, next_seq, &summary_copy)?;
next_seq += 1;
inserted_count += 1;
// Only this last group contributes to the recomputed user-turn count.
for message in preserved_messages.iter().chain(delta_messages.iter()) {
let copied = clone_message_for_compaction(message, message.timestamp);
if copied.role == "user" {
active_user_turn_count += 1;
}
insert_message_with_seq(&tx, session_id, next_seq, &copied)?;
next_seq += 1;
inserted_count += 1;
}
// `message_count` grows by the number of copies (the old rows are kept),
// while `user_turn_count` is replaced by the count for the new active history.
tx.execute(
"
UPDATE sessions
SET reset_cutoff_seq = ?2,
message_count = message_count + ?3,
user_turn_count = ?4,
updated_at = ?5,
last_active_at = ?5,
archived_at = NULL
WHERE id = ?1 AND deleted_at IS NULL
",
params![
session_id,
current_max_seq,
inserted_count,
active_user_turn_count,
now,
],
)?;
tx.commit()?;
Ok(true)
}
/// Increments `agent_prompt_reinjection_count` for a non-deleted session,
/// refreshes its activity timestamps and clears `archived_at`.
pub fn mark_agent_prompt_reinjected(&self, session_id: &str) -> Result<(), StorageError> {
    const BUMP_REINJECTION: &str = "
UPDATE sessions
SET agent_prompt_reinjection_count = agent_prompt_reinjection_count + 1,
updated_at = ?2,
last_active_at = ?2,
archived_at = NULL
WHERE id = ?1 AND deleted_at IS NULL
";
    let touched_at = current_timestamp();
    let guard = self.conn.lock().expect("session db mutex poisoned");
    guard.execute(BUMP_REINJECTION, params![session_id, touched_at])?;
    Ok(())
}
pub fn append_skill_event(
&self,
session_id: Option<&str>,
event_type: &str,
skill_name: Option<&str>,
payload: &serde_json::Value,
) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute(
"
INSERT INTO skill_events (
id, session_id, event_type, skill_name, payload_json, created_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6)
",
params![
uuid::Uuid::new_v4().to_string(),
session_id,
event_type,
skill_name,
serde_json::to_string(payload)?,
current_timestamp(),
],
)?;
Ok(())
}
/// Lists skill events oldest-first.
///
/// With `Some(session_id)` only that session's events are returned; with
/// `None` only events that have no session (`session_id IS NULL`) are returned.
pub fn list_skill_events(
    &self,
    session_id: Option<&str>,
) -> Result<Vec<SkillEventRecord>, StorageError> {
    const FOR_SESSION: &str = "
SELECT id, session_id, event_type, skill_name, payload_json, created_at
FROM skill_events
WHERE session_id = ?1
ORDER BY created_at ASC
";
    const WITHOUT_SESSION: &str = "
SELECT id, session_id, event_type, skill_name, payload_json, created_at
FROM skill_events
WHERE session_id IS NULL
ORDER BY created_at ASC
";
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let sql = match session_id {
        Some(_) => FOR_SESSION,
        None => WITHOUT_SESSION,
    };
    let mut stmt = guard.prepare(sql)?;
    let rows = match session_id {
        Some(id) => stmt.query_map(params![id], map_skill_event_record)?,
        None => stmt.query_map([], map_skill_event_record)?,
    };
    let events = rows.collect::<Result<Vec<_>, _>>()?;
    Ok(events)
}
pub fn put_memory(&self, input: &MemoryUpsert) -> Result<MemoryRecord, StorageError> {
let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned");
let tx = conn.unchecked_transaction()?;
let existing: Option<(String, i64)> = tx
.query_row(
"
SELECT id, created_at
FROM memories
WHERE scope_kind = ?1 AND scope_key = ?2 AND namespace = ?3 AND memory_key = ?4
",
params![
input.scope_kind,
input.scope_key,
input.namespace,
input.memory_key,
],
|row| Ok((row.get(0)?, row.get(1)?)),
)
.optional()?;
let (id, created_at) = existing.unwrap_or_else(|| (uuid::Uuid::new_v4().to_string(), now));
tx.execute(
"
INSERT INTO memories (
id, scope_kind, scope_key, namespace, memory_key, content,
source_type, source_session_id, source_message_id, source_message_seq,
source_channel_name, source_chat_id, created_at, updated_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14)
ON CONFLICT(scope_kind, scope_key, namespace, memory_key) DO UPDATE SET
content = excluded.content,
source_type = excluded.source_type,
source_session_id = excluded.source_session_id,
source_message_id = excluded.source_message_id,
source_message_seq = excluded.source_message_seq,
source_channel_name = excluded.source_channel_name,
source_chat_id = excluded.source_chat_id,
updated_at = excluded.updated_at
",
params![
id,
input.scope_kind,
input.scope_key,
input.namespace,
input.memory_key,
input.content,
input.source_type,
input.source_session_id,
input.source_message_id,
input.source_message_seq,
input.source_channel_name,
input.source_chat_id,
created_at,
now,
],
)?;
tx.commit()?;
drop(conn);
self.get_memory(
&input.scope_kind,
&input.scope_key,
&input.namespace,
&input.memory_key,
)?
.ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into())
}
/// Fetches the memory stored under `(scope_kind, scope_key, namespace, memory_key)`.
///
/// Returns `Ok(None)` when no such row exists.
pub fn get_memory(
    &self,
    scope_kind: &str,
    scope_key: &str,
    namespace: &str,
    memory_key: &str,
) -> Result<Option<MemoryRecord>, StorageError> {
    const SELECT_MEMORY: &str = "
SELECT id, scope_kind, scope_key, namespace, memory_key, content,
source_type, source_session_id, source_message_id, source_message_seq,
source_channel_name, source_chat_id, created_at, updated_at
FROM memories
WHERE scope_kind = ?1 AND scope_key = ?2 AND namespace = ?3 AND memory_key = ?4
";
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let record = guard
        .query_row(
            SELECT_MEMORY,
            params![scope_kind, scope_key, namespace, memory_key],
            map_memory_record,
        )
        .optional()?;
    Ok(record)
}
/// Lists up to `limit` memories of a scope, most recently updated first.
///
/// When `namespace` is given, only that namespace is considered. A `limit`
/// of zero is treated as one.
pub fn list_memories(
    &self,
    scope_kind: &str,
    scope_key: &str,
    namespace: Option<&str>,
    limit: usize,
) -> Result<Vec<MemoryRecord>, StorageError> {
    const IN_NAMESPACE: &str = "
SELECT id, scope_kind, scope_key, namespace, memory_key, content,
source_type, source_session_id, source_message_id, source_message_seq,
source_channel_name, source_chat_id, created_at, updated_at
FROM memories
WHERE scope_kind = ?1 AND scope_key = ?2 AND namespace = ?3
ORDER BY updated_at DESC, created_at DESC
LIMIT ?4
";
    const ANY_NAMESPACE: &str = "
SELECT id, scope_kind, scope_key, namespace, memory_key, content,
source_type, source_session_id, source_message_id, source_message_seq,
source_channel_name, source_chat_id, created_at, updated_at
FROM memories
WHERE scope_kind = ?1 AND scope_key = ?2
ORDER BY updated_at DESC, created_at DESC
LIMIT ?3
";
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let row_cap = std::cmp::max(limit, 1) as i64;
    let sql = match namespace {
        Some(_) => IN_NAMESPACE,
        None => ANY_NAMESPACE,
    };
    let mut stmt = guard.prepare(sql)?;
    let rows = match namespace {
        Some(ns) => stmt.query_map(
            params![scope_kind, scope_key, ns, row_cap],
            map_memory_record,
        )?,
        None => stmt.query_map(params![scope_kind, scope_key, row_cap], map_memory_record)?,
    };
    let memories = rows.collect::<Result<Vec<_>, _>>()?;
    Ok(memories)
}
/// Returns the distinct `scope_key` values that have at least one memory of
/// the given `scope_kind`, sorted ascending.
pub fn list_memory_scope_keys(&self, scope_kind: &str) -> Result<Vec<String>, StorageError> {
    const DISTINCT_KEYS: &str = "
SELECT DISTINCT scope_key
FROM memories
WHERE scope_kind = ?1
ORDER BY scope_key ASC
";
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let mut stmt = guard.prepare(DISTINCT_KEYS)?;
    let rows = stmt.query_map(params![scope_kind], |row| row.get::<_, String>(0))?;
    let scope_keys = rows.collect::<Result<Vec<_>, _>>()?;
    Ok(scope_keys)
}
/// Returns the distinct `scope_key` values of `scope_kind` that have at least
/// one memory whose `updated_at` is strictly greater than `since_timestamp`,
/// sorted ascending.
pub fn list_memory_scope_keys_updated_since(
    &self,
    scope_kind: &str,
    since_timestamp: i64,
) -> Result<Vec<String>, StorageError> {
    const RECENT_KEYS: &str = "
SELECT DISTINCT scope_key
FROM memories
WHERE scope_kind = ?1 AND updated_at > ?2
ORDER BY scope_key ASC
";
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let mut stmt = guard.prepare(RECENT_KEYS)?;
    let rows = stmt.query_map(params![scope_kind, since_timestamp], |row| {
        row.get::<_, String>(0)
    })?;
    let scope_keys = rows.collect::<Result<Vec<_>, _>>()?;
    Ok(scope_keys)
}
/// Returns every memory of a scope with no limit, ordered by newest
/// `updated_at`, then by `namespace` and `memory_key` ascending.
pub fn list_memories_for_scope(
    &self,
    scope_kind: &str,
    scope_key: &str,
) -> Result<Vec<MemoryRecord>, StorageError> {
    const ALL_IN_SCOPE: &str = "
SELECT id, scope_kind, scope_key, namespace, memory_key, content,
source_type, source_session_id, source_message_id, source_message_seq,
source_channel_name, source_chat_id, created_at, updated_at
FROM memories
WHERE scope_kind = ?1 AND scope_key = ?2
ORDER BY updated_at DESC, namespace ASC, memory_key ASC
";
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let mut stmt = guard.prepare(ALL_IN_SCOPE)?;
    let rows = stmt.query_map(params![scope_kind, scope_key], map_memory_record)?;
    let memories = rows.collect::<Result<Vec<_>, _>>()?;
    Ok(memories)
}
pub fn update_memory(
&self,
input: &MemoryUpsert,
) -> Result<Option<MemoryRecord>, StorageError> {
if self
.get_memory(
&input.scope_kind,
&input.scope_key,
&input.namespace,
&input.memory_key,
)?
.is_none()
{
return Ok(None);
}
self.put_memory(input).map(Some)
}
/// Deletes the memory stored under `(scope_kind, scope_key, namespace, memory_key)`.
///
/// Returns `true` when a row was removed and `false` when nothing matched.
pub fn delete_memory(
    &self,
    scope_kind: &str,
    scope_key: &str,
    namespace: &str,
    memory_key: &str,
) -> Result<bool, StorageError> {
    const DELETE_MEMORY: &str = "
DELETE FROM memories
WHERE scope_kind = ?1 AND scope_key = ?2 AND namespace = ?3 AND memory_key = ?4
";
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let removed_rows = guard.execute(
        DELETE_MEMORY,
        params![scope_kind, scope_key, namespace, memory_key],
    )?;
    Ok(removed_rows != 0)
}
pub fn upsert_scheduler_job(
&self,
input: &SchedulerJobUpsert,
) -> Result<SchedulerJobRecord, StorageError> {
let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute(
"
INSERT INTO scheduler_jobs (
id, kind, schedule_json, interval_secs, startup_delay_secs,
target_json, payload_json, enabled, state, last_status, last_error,
run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at,
created_at, updated_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?18)
ON CONFLICT(id) DO UPDATE SET
kind = excluded.kind,
schedule_json = excluded.schedule_json,
interval_secs = excluded.interval_secs,
startup_delay_secs = excluded.startup_delay_secs,
target_json = excluded.target_json,
payload_json = excluded.payload_json,
enabled = excluded.enabled,
state = excluded.state,
last_status = excluded.last_status,
last_error = excluded.last_error,
run_count = excluded.run_count,
max_runs = excluded.max_runs,
last_fired_at = excluded.last_fired_at,
next_fire_at = excluded.next_fire_at,
paused_at = excluded.paused_at,
completed_at = excluded.completed_at,
updated_at = excluded.updated_at
",
params![
input.id,
input.kind,
serde_json::to_string(&input.schedule)?,
input.interval_secs,
input.startup_delay_secs,
serde_json::to_string(&input.target)?,
serde_json::to_string(&input.payload)?,
if input.enabled { 1 } else { 0 },
input.state.as_str(),
input.last_status.as_ref().map(SchedulerJobStatus::as_str),
input.last_error,
input.run_count,
input.max_runs,
input.last_fired_at,
input.next_fire_at,
input.paused_at,
input.completed_at,
now,
],
)?;
drop(conn);
self.get_scheduler_job(&input.id)?
.ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into())
}
/// Fetches one scheduler job by id, or `Ok(None)` when it does not exist.
pub fn get_scheduler_job(
    &self,
    job_id: &str,
) -> Result<Option<SchedulerJobRecord>, StorageError> {
    const SELECT_JOB: &str = "
SELECT id, kind, schedule_json, interval_secs, startup_delay_secs,
target_json, payload_json, enabled, state, last_status, last_error,
run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at,
created_at, updated_at
FROM scheduler_jobs
WHERE id = ?1
";
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let job = guard
        .query_row(SELECT_JOB, params![job_id], map_scheduler_job_record)
        .optional()?;
    Ok(job)
}
/// Lists scheduler jobs ordered by `next_fire_at` (falling back to
/// `created_at` when it is NULL), then by id.
///
/// With `enabled_only` set, only rows whose `enabled` column is 1 are returned.
pub fn list_scheduler_jobs(
    &self,
    enabled_only: bool,
) -> Result<Vec<SchedulerJobRecord>, StorageError> {
    const ENABLED_JOBS: &str = "
SELECT id, kind, schedule_json, interval_secs, startup_delay_secs,
target_json, payload_json, enabled, state, last_status, last_error,
run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at,
created_at, updated_at
FROM scheduler_jobs
WHERE enabled = 1
ORDER BY COALESCE(next_fire_at, created_at) ASC, id ASC
";
    const ALL_JOBS: &str = "
SELECT id, kind, schedule_json, interval_secs, startup_delay_secs,
target_json, payload_json, enabled, state, last_status, last_error,
run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at,
created_at, updated_at
FROM scheduler_jobs
ORDER BY COALESCE(next_fire_at, created_at) ASC, id ASC
";
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let mut stmt = guard.prepare(match enabled_only {
        true => ENABLED_JOBS,
        false => ALL_JOBS,
    })?;
    let rows = stmt.query_map([], map_scheduler_job_record)?;
    let jobs = rows.collect::<Result<Vec<_>, _>>()?;
    Ok(jobs)
}
/// Removes the scheduler job with the given id. Succeeds without effect when
/// no such job exists.
pub fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> {
    const DELETE_JOB: &str = "DELETE FROM scheduler_jobs WHERE id = ?1";
    let guard = self.conn.lock().expect("session db mutex poisoned");
    guard.execute(DELETE_JOB, params![job_id])?;
    Ok(())
}
pub fn update_scheduler_job_runtime(
&self,
job_id: &str,
state: SchedulerJobState,
last_status: Option<SchedulerJobStatus>,
last_error: Option<&str>,
run_count: i64,
last_fired_at: Option<i64>,
next_fire_at: Option<i64>,
paused_at: Option<i64>,
completed_at: Option<i64>,
) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute(
"
UPDATE scheduler_jobs
SET state = ?2,
last_status = ?3,
last_error = ?4,
run_count = ?5,
last_fired_at = ?6,
next_fire_at = ?7,
paused_at = ?8,
completed_at = ?9,
updated_at = ?10
WHERE id = ?1
",
params![
job_id,
state.as_str(),
last_status.as_ref().map(SchedulerJobStatus::as_str),
last_error,
run_count,
last_fired_at,
next_fire_at,
paused_at,
completed_at,
current_timestamp(),
],
)?;
Ok(())
}
/// Full-text searches the memories of a scope and returns up to `limit`
/// matches, best `bm25` rank first and most recently updated first among ties.
///
/// `query` is passed through `quote_fts_query` before being used as the FTS5
/// `MATCH` expression. When `namespace` is given, only that namespace is
/// searched. A `limit` of zero is treated as one.
///
/// If quoting leaves an empty expression the search returns no results
/// without touching the database, matching `search_memories_any`.
///
/// # Errors
/// Propagates database errors, including FTS5 query errors.
pub fn search_memories(
    &self,
    scope_kind: &str,
    scope_key: &str,
    query: &str,
    namespace: Option<&str>,
    limit: usize,
) -> Result<Vec<MemoryRecord>, StorageError> {
    let conn = self.conn.lock().expect("session db mutex poisoned");
    let limit = limit.max(1) as i64;
    let query = quote_fts_query(query);
    // An empty MATCH expression cannot match anything; skip the query entirely.
    if query.is_empty() {
        return Ok(Vec::new());
    }
    let mut memories = Vec::new();
    if let Some(namespace) = namespace {
        let mut stmt = conn.prepare(
            "
SELECT m.id, m.scope_kind, m.scope_key, m.namespace, m.memory_key, m.content,
m.source_type, m.source_session_id, m.source_message_id, m.source_message_seq,
m.source_channel_name, m.source_chat_id, m.created_at, m.updated_at
FROM memories_fts f
JOIN memories m ON m.rowid = f.rowid
WHERE memories_fts MATCH ?1
AND m.scope_kind = ?2
AND m.scope_key = ?3
AND m.namespace = ?4
ORDER BY bm25(memories_fts), m.updated_at DESC
LIMIT ?5
",
        )?;
        let rows = stmt.query_map(
            params![query, scope_kind, scope_key, namespace, limit],
            map_memory_record,
        )?;
        for row in rows {
            memories.push(row?);
        }
    } else {
        let mut stmt = conn.prepare(
            "
SELECT m.id, m.scope_kind, m.scope_key, m.namespace, m.memory_key, m.content,
m.source_type, m.source_session_id, m.source_message_id, m.source_message_seq,
m.source_channel_name, m.source_chat_id, m.created_at, m.updated_at
FROM memories_fts f
JOIN memories m ON m.rowid = f.rowid
WHERE memories_fts MATCH ?1
AND m.scope_kind = ?2
AND m.scope_key = ?3
ORDER BY bm25(memories_fts), m.updated_at DESC
LIMIT ?4
",
        )?;
        let rows = stmt.query_map(
            params![query, scope_kind, scope_key, limit],
            map_memory_record,
        )?;
        for row in rows {
            memories.push(row?);
        }
    }
    Ok(memories)
}
/// Full-text searches the memories of a scope using the combined expression
/// that `quote_fts_or_query` builds from `queries`.
///
/// Returns up to `limit` matches (zero is treated as one), best `bm25` rank
/// first and most recently updated first among ties. When `namespace` is
/// given, only that namespace is searched. An empty combined expression
/// yields an empty result without running a query.
pub fn search_memories_any(
    &self,
    scope_kind: &str,
    scope_key: &str,
    queries: &[String],
    namespace: Option<&str>,
    limit: usize,
) -> Result<Vec<MemoryRecord>, StorageError> {
    const IN_NAMESPACE: &str = "
SELECT m.id, m.scope_kind, m.scope_key, m.namespace, m.memory_key, m.content,
m.source_type, m.source_session_id, m.source_message_id, m.source_message_seq,
m.source_channel_name, m.source_chat_id, m.created_at, m.updated_at
FROM memories_fts f
JOIN memories m ON m.rowid = f.rowid
WHERE memories_fts MATCH ?1
AND m.scope_kind = ?2
AND m.scope_key = ?3
AND m.namespace = ?4
ORDER BY bm25(memories_fts), m.updated_at DESC
LIMIT ?5
";
    const ANY_NAMESPACE: &str = "
SELECT m.id, m.scope_kind, m.scope_key, m.namespace, m.memory_key, m.content,
m.source_type, m.source_session_id, m.source_message_id, m.source_message_seq,
m.source_channel_name, m.source_chat_id, m.created_at, m.updated_at
FROM memories_fts f
JOIN memories m ON m.rowid = f.rowid
WHERE memories_fts MATCH ?1
AND m.scope_kind = ?2
AND m.scope_key = ?3
ORDER BY bm25(memories_fts), m.updated_at DESC
LIMIT ?4
";
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let row_cap = std::cmp::max(limit, 1) as i64;
    let match_expr = quote_fts_or_query(queries);
    if match_expr.is_empty() {
        return Ok(Vec::new());
    }
    let sql = match namespace {
        Some(_) => IN_NAMESPACE,
        None => ANY_NAMESPACE,
    };
    let mut stmt = guard.prepare(sql)?;
    let rows = match namespace {
        Some(ns) => stmt.query_map(
            params![match_expr, scope_kind, scope_key, ns, row_cap],
            map_memory_record,
        )?,
        None => stmt.query_map(
            params![match_expr, scope_kind, scope_key, row_cap],
            map_memory_record,
        )?,
    };
    let memories = rows.collect::<Result<Vec<_>, _>>()?;
    Ok(memories)
}
/// Loads the session's active messages: those that `load_messages_after`
/// returns for the cutoff reported by `active_reset_cutoff`.
pub fn load_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> {
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let cutoff = active_reset_cutoff(&guard, session_id)?;
    let active = load_messages_after(&guard, session_id, cutoff)?;
    Ok(active)
}
/// Loads the session's messages ignoring the reset cutoff, by asking
/// `load_messages_after` for everything after sequence 0.
pub fn load_all_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> {
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let everything = load_messages_after(&guard, session_id, 0)?;
    Ok(everything)
}
/// Counts the messages with role `user` whose `seq` lies above the session's
/// current reset cutoff.
pub fn count_active_user_messages(&self, session_id: &str) -> Result<i64, StorageError> {
    const COUNT_USER_MESSAGES: &str = "
SELECT COUNT(*)
FROM messages
WHERE session_id = ?1 AND seq > ?2 AND role = 'user'
";
    let guard = self.conn.lock().expect("session db mutex poisoned");
    let cutoff = active_reset_cutoff(&guard, session_id)?;
    let user_messages: i64 =
        guard.query_row(COUNT_USER_MESSAGES, params![session_id, cutoff], |row| {
            row.get(0)
        })?;
    Ok(user_messages)
}
}
/// Derives the stable session id for a chat on a channel.
///
/// On the `cli` channel the chat id is used unchanged; on every other channel
/// the id is `"{channel_name}:{chat_id}"`.
pub fn persistent_session_id(channel_name: &str, chat_id: &str) -> String {
    match channel_name {
        "cli" => chat_id.to_owned(),
        other => [other, chat_id].join(":"),
    }
}
/// Returns the default database location, `<home>/.picobot/storage/sessions.db`.
///
/// Falls back to the current directory (`.`) when no home directory can be
/// determined; as written this function never returns an error.
#[cfg(not(test))]
fn default_session_db_path() -> Result<PathBuf, std::io::Error> {
    let mut db_path = match dirs::home_dir() {
        Some(home) => home,
        None => PathBuf::from("."),
    };
    db_path.push(".picobot");
    db_path.push("storage");
    db_path.push("sessions.db");
    Ok(db_path)
}
/// Converts one row into a [`SessionRecord`].
///
/// Columns are read by position, so the SELECT must list them in this order:
/// id, title, channel_name, chat_id, summary, created_at, updated_at,
/// last_active_at, archived_at, deleted_at, message_count, reset_cutoff_seq,
/// user_turn_count, agent_prompt_reinjection_count.
fn map_session_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord> {
    let id = row.get(0)?;
    let title = row.get(1)?;
    let channel_name = row.get(2)?;
    let chat_id = row.get(3)?;
    let summary = row.get(4)?;
    let created_at = row.get(5)?;
    let updated_at = row.get(6)?;
    let last_active_at = row.get(7)?;
    let archived_at = row.get(8)?;
    let deleted_at = row.get(9)?;
    let message_count = row.get(10)?;
    let reset_cutoff_seq = row.get(11)?;
    let user_turn_count = row.get(12)?;
    let agent_prompt_reinjection_count = row.get(13)?;
    Ok(SessionRecord {
        id,
        title,
        channel_name,
        chat_id,
        summary,
        created_at,
        updated_at,
        last_active_at,
        archived_at,
        deleted_at,
        message_count,
        reset_cutoff_seq,
        user_turn_count,
        agent_prompt_reinjection_count,
    })
}
fn map_skill_event_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<SkillEventRecord> {
let payload_json: String = row.get(4)?;
let payload = serde_json::from_str(&payload_json).map_err(|err| {
rusqlite::Error::FromSqlConversionFailure(4, rusqlite::types::Type::Text, Box::new(err))
})?;
Ok(SkillEventRecord {
id: row.get(0)?,
session_id: row.get(1)?,
event_type: row.get(2)?,
skill_name: row.get(3)?,
payload,
created_at: row.get(5)?,
})
}
/// Converts a result row into a [`MemoryRecord`].
///
/// Columns are read by position (0 through 13): identity and scope first, then the
/// content, the `source_*` provenance fields, and finally the two timestamps.
fn map_memory_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<MemoryRecord> {
    // Identity and addressing.
    let id = row.get(0)?;
    let scope_kind = row.get(1)?;
    let scope_key = row.get(2)?;
    let namespace = row.get(3)?;
    let memory_key = row.get(4)?;
    let content = row.get(5)?;

    // Provenance of the memory.
    let source_type = row.get(6)?;
    let source_session_id = row.get(7)?;
    let source_message_id = row.get(8)?;
    let source_message_seq = row.get(9)?;
    let source_channel_name = row.get(10)?;
    let source_chat_id = row.get(11)?;

    // Timestamps.
    let created_at = row.get(12)?;
    let updated_at = row.get(13)?;

    Ok(MemoryRecord {
        id,
        scope_kind,
        scope_key,
        namespace,
        memory_key,
        content,
        source_type,
        source_session_id,
        source_message_id,
        source_message_seq,
        source_channel_name,
        source_chat_id,
        created_at,
        updated_at,
    })
}
fn map_scheduler_job_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<SchedulerJobRecord> {
let schedule_json: String = row.get(2)?;
let target_json: String = row.get(5)?;
let payload_json: String = row.get(6)?;
let state: String = row.get(8)?;
let last_status: Option<String> = row.get(9)?;
let schedule = serde_json::from_str(&schedule_json).map_err(|err| {
rusqlite::Error::FromSqlConversionFailure(2, rusqlite::types::Type::Text, Box::new(err))
})?;
let target = serde_json::from_str(&target_json).map_err(|err| {
rusqlite::Error::FromSqlConversionFailure(5, rusqlite::types::Type::Text, Box::new(err))
})?;
let payload = serde_json::from_str(&payload_json).map_err(|err| {
rusqlite::Error::FromSqlConversionFailure(6, rusqlite::types::Type::Text, Box::new(err))
})?;
Ok(SchedulerJobRecord {
id: row.get(0)?,
kind: row.get(1)?,
schedule,
interval_secs: row.get(3)?,
startup_delay_secs: row.get(4)?,
target,
payload,
enabled: row.get::<_, i64>(7)? != 0,
state: SchedulerJobState::from_str(&state).ok_or_else(|| {
rusqlite::Error::FromSqlConversionFailure(
8,
rusqlite::types::Type::Text,
format!("invalid scheduler job state: {}", state).into(),
)
})?,
last_status: last_status.and_then(|value| SchedulerJobStatus::from_str(&value)),
last_error: row.get(10)?,
run_count: row.get(11)?,
max_runs: row.get(12)?,
last_fired_at: row.get(13)?,
next_fire_at: row.get(14)?,
paused_at: row.get(15)?,
completed_at: row.get(16)?,
created_at: row.get(17)?,
updated_at: row.get(18)?,
})
}
/// Adds the columns that older `sessions` tables lack.
///
/// Each entry pairs a column name with the `ALTER TABLE` statement that creates
/// it; the statement runs only when the column is not already present.
fn ensure_sessions_schema(conn: &Connection) -> Result<(), StorageError> {
    const MIGRATIONS: [(&str, &str); 3] = [
        (
            "reset_cutoff_seq",
            "ALTER TABLE sessions ADD COLUMN reset_cutoff_seq INTEGER NOT NULL DEFAULT 0",
        ),
        (
            "user_turn_count",
            "ALTER TABLE sessions ADD COLUMN user_turn_count INTEGER NOT NULL DEFAULT 0",
        ),
        (
            "agent_prompt_reinjection_count",
            "ALTER TABLE sessions ADD COLUMN agent_prompt_reinjection_count INTEGER NOT NULL DEFAULT 0",
        ),
    ];

    for &(column, ddl) in MIGRATIONS.iter() {
        let already_present = has_column(conn, "sessions", column)?;
        if !already_present {
            add_column_if_missing(conn, ddl)?;
        }
    }
    Ok(())
}
/// Adds the columns that older `messages` tables lack (`system_context` and
/// `reasoning_content`), skipping any column that already exists.
fn ensure_messages_schema(conn: &Connection) -> Result<(), StorageError> {
    const MIGRATIONS: [(&str, &str); 2] = [
        (
            "system_context",
            "ALTER TABLE messages ADD COLUMN system_context TEXT",
        ),
        (
            "reasoning_content",
            "ALTER TABLE messages ADD COLUMN reasoning_content TEXT",
        ),
    ];

    for &(column, ddl) in MIGRATIONS.iter() {
        let already_present = has_column(conn, "messages", column)?;
        if !already_present {
            add_column_if_missing(conn, ddl)?;
        }
    }
    Ok(())
}
fn ensure_scheduler_schema(conn: &Connection) -> Result<(), StorageError> {
if !has_column(conn, "scheduler_jobs", "schedule_json")? {
conn.execute(
"ALTER TABLE scheduler_jobs ADD COLUMN schedule_json TEXT NOT NULL DEFAULT '{}'",
[],
)?;
}
if !has_column(conn, "scheduler_jobs", "state")? {
conn.execute(
"ALTER TABLE scheduler_jobs ADD COLUMN state TEXT NOT NULL DEFAULT 'scheduled'",
[],
)?;
}
if !has_column(conn, "scheduler_jobs", "last_status")? {
conn.execute("ALTER TABLE scheduler_jobs ADD COLUMN last_status TEXT", [])?;
}
if !has_column(conn, "scheduler_jobs", "last_error")? {
conn.execute("ALTER TABLE scheduler_jobs ADD COLUMN last_error TEXT", [])?;
}
if !has_column(conn, "scheduler_jobs", "run_count")? {
conn.execute(
"ALTER TABLE scheduler_jobs ADD COLUMN run_count INTEGER NOT NULL DEFAULT 0",
[],
)?;
}
if !has_column(conn, "scheduler_jobs", "max_runs")? {
conn.execute("ALTER TABLE scheduler_jobs ADD COLUMN max_runs INTEGER", [])?;
}
if !has_column(conn, "scheduler_jobs", "paused_at")? {
conn.execute(
"ALTER TABLE scheduler_jobs ADD COLUMN paused_at INTEGER",
[],
)?;
}
if !has_column(conn, "scheduler_jobs", "completed_at")? {
conn.execute(
"ALTER TABLE scheduler_jobs ADD COLUMN completed_at INTEGER",
[],
)?;
}
Ok(())
}
/// Reports whether `table_name` currently has a column called `column_name`.
///
/// The check runs `PRAGMA table_info(<table_name>)` and compares the second
/// result column (index 1, the column name) of each row against `column_name`
/// using exact string equality.
///
/// `table_name` is interpolated directly into the PRAGMA text, so it must be a
/// trusted identifier.
fn has_column(
    conn: &Connection,
    table_name: &str,
    column_name: &str,
) -> Result<bool, StorageError> {
    let pragma_sql = format!("PRAGMA table_info({})", table_name);
    let mut statement = conn.prepare(&pragma_sql)?;
    let column_names = statement.query_map([], |row| row.get::<_, String>(1))?;
    for existing_name in column_names {
        // Stop at the first match; later rows are never fetched.
        if existing_name? == column_name {
            return Ok(true);
        }
    }
    Ok(false)
}
fn add_column_if_missing(conn: &Connection, sql: &str) -> Result<(), StorageError> {
match conn.execute(sql, []) {
Ok(_) => Ok(()),
Err(rusqlite::Error::SqliteFailure(_, Some(message)))
if message.contains("duplicate column name") =>
{
Ok(())
}
Err(error) => Err(StorageError::Database(error)),
}
}
/// Reads `reset_cutoff_seq` for a session that has not been deleted.
///
/// Returns `0` when no matching row exists (unknown id, or `deleted_at` is set).
fn active_reset_cutoff(conn: &Connection, session_id: &str) -> Result<i64, StorageError> {
    const CUTOFF_SQL: &str =
        "SELECT reset_cutoff_seq FROM sessions WHERE id = ?1 AND deleted_at IS NULL";

    let stored_cutoff: Option<i64> = conn
        .query_row(CUTOFF_SQL, params![session_id], |row| row.get(0))
        .optional()?;

    match stored_cutoff {
        Some(seq) => Ok(seq),
        None => Ok(0),
    }
}
/// Inserts `message` into the `messages` table of `session_id` at position `seq`.
///
/// `media_refs` is always stored as JSON text; `tool_calls` is stored as JSON text
/// when present and as NULL otherwise. The message's `timestamp` is written to the
/// `created_at` column. `tool_state` is not persisted.
///
/// # Errors
/// Returns a [`StorageError`] when JSON serialization or the INSERT fails.
fn insert_message_with_seq(
    conn: &rusqlite::Transaction<'_>,
    session_id: &str,
    seq: i64,
    message: &ChatMessage,
) -> Result<(), StorageError> {
    const INSERT_SQL: &str = "
INSERT INTO messages (
id, session_id, seq, role, content,
system_context, reasoning_content, media_refs_json, tool_call_id, tool_name, tool_calls_json, created_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12)
";

    let serialized_media_refs = serde_json::to_string(&message.media_refs)?;
    let serialized_tool_calls = match &message.tool_calls {
        Some(calls) => Some(serde_json::to_string(calls)?),
        None => None,
    };

    conn.execute(
        INSERT_SQL,
        params![
            message.id,
            session_id,
            seq,
            message.role,
            message.content,
            message.system_context,
            message.reasoning_content,
            serialized_media_refs,
            message.tool_call_id,
            message.tool_name,
            serialized_tool_calls,
            message.timestamp,
        ],
    )?;
    Ok(())
}
/// Produces a copy of `message` with a freshly generated UUID v4 id and the given
/// `timestamp`; every other field is cloned from the original.
fn clone_message_for_compaction(message: &ChatMessage, timestamp: i64) -> ChatMessage {
    // Destructure so that the two replaced fields are explicitly ignored.
    let ChatMessage {
        id: _,
        timestamp: _,
        role,
        content,
        media_refs,
        system_context,
        reasoning_content,
        tool_call_id,
        tool_name,
        tool_state,
        tool_calls,
    } = message;

    ChatMessage {
        id: uuid::Uuid::new_v4().to_string(),
        role: role.clone(),
        content: content.clone(),
        media_refs: media_refs.clone(),
        timestamp,
        system_context: system_context.clone(),
        reasoning_content: reasoning_content.clone(),
        tool_call_id: tool_call_id.clone(),
        tool_name: tool_name.clone(),
        tool_state: tool_state.clone(),
        tool_calls: tool_calls.clone(),
    }
}
/// Decodes one row of a message SELECT into a [`ChatMessage`].
///
/// Expects the column order used by `load_messages_between` and
/// `load_messages_after`:
/// `id, role, content, system_context, reasoning_content, media_refs_json,
/// created_at, tool_call_id, tool_name, tool_calls_json`.
///
/// `media_refs_json` (column 5) and, when non-NULL, `tool_calls_json` (column 9)
/// are parsed as JSON. A parse failure is reported as a
/// `FromSqlConversionFailure` carrying the index of the offending column.
/// `tool_state` is not stored and is always `None` on load.
fn chat_message_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<ChatMessage> {
    const MEDIA_REFS_COLUMN: usize = 5;
    const TOOL_CALLS_COLUMN: usize = 9;

    let media_refs_json: String = row.get(MEDIA_REFS_COLUMN)?;
    let media_refs: Vec<String> = serde_json::from_str(&media_refs_json).map_err(|err| {
        // The first argument is the column index, not the length of the text.
        rusqlite::Error::FromSqlConversionFailure(
            MEDIA_REFS_COLUMN,
            rusqlite::types::Type::Text,
            Box::new(err),
        )
    })?;

    let tool_calls_json: Option<String> = row.get(TOOL_CALLS_COLUMN)?;
    let tool_calls = tool_calls_json
        .as_deref()
        .map(serde_json::from_str)
        .transpose()
        .map_err(|err| {
            rusqlite::Error::FromSqlConversionFailure(
                TOOL_CALLS_COLUMN,
                rusqlite::types::Type::Text,
                Box::new(err),
            )
        })?;

    Ok(ChatMessage {
        id: row.get(0)?,
        role: row.get(1)?,
        content: row.get(2)?,
        system_context: row.get(3)?,
        reasoning_content: row.get(4)?,
        media_refs,
        timestamp: row.get(6)?,
        tool_call_id: row.get(7)?,
        tool_name: row.get(8)?,
        tool_state: None,
        tool_calls,
    })
}

/// Loads the messages of `session_id` whose `seq` lies in the half-open range
/// `(start_seq_exclusive, end_seq_inclusive]`, ordered by ascending `seq`.
///
/// # Errors
/// Returns a [`StorageError`] when the query fails or a row cannot be decoded.
fn load_messages_between(
    conn: &rusqlite::Transaction<'_>,
    session_id: &str,
    start_seq_exclusive: i64,
    end_seq_inclusive: i64,
) -> Result<Vec<ChatMessage>, StorageError> {
    let mut stmt = conn.prepare(
        "
SELECT id, role, content, system_context, reasoning_content, media_refs_json, created_at, tool_call_id, tool_name, tool_calls_json
FROM messages
WHERE session_id = ?1 AND seq > ?2 AND seq <= ?3
ORDER BY seq ASC
",
    )?;
    let rows = stmt.query_map(
        params![session_id, start_seq_exclusive, end_seq_inclusive],
        chat_message_from_row,
    )?;
    // Stops at the first row that fails to decode.
    let messages = rows.collect::<rusqlite::Result<Vec<_>>>()?;
    Ok(messages)
}

/// Loads the messages of `session_id` whose `seq` is strictly greater than
/// `cutoff_seq`, ordered by ascending `seq`.
///
/// # Errors
/// Returns a [`StorageError`] when the query fails or a row cannot be decoded.
fn load_messages_after(
    conn: &Connection,
    session_id: &str,
    cutoff_seq: i64,
) -> Result<Vec<ChatMessage>, StorageError> {
    let mut stmt = conn.prepare(
        "
SELECT id, role, content, system_context, reasoning_content, media_refs_json, created_at, tool_call_id, tool_name, tool_calls_json
FROM messages
WHERE session_id = ?1 AND seq > ?2
ORDER BY seq ASC
",
    )?;
    let rows = stmt.query_map(params![session_id, cutoff_seq], chat_message_from_row)?;
    // Stops at the first row that fails to decode.
    let messages = rows.collect::<rusqlite::Result<Vec<_>>>()?;
    Ok(messages)
}
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn current_timestamp() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("system clock before unix epoch");
    let millis = since_epoch.as_millis();
    millis as i64
}
/// Wraps `query` in double quotes, doubling every embedded double quote, so the
/// whole input is passed to FTS as a single quoted string.
fn quote_fts_query(query: &str) -> String {
    let mut quoted = String::with_capacity(query.len() + 2);
    quoted.push('"');
    for ch in query.chars() {
        if ch == '"' {
            // An embedded quote is escaped by writing it twice.
            quoted.push('"');
        }
        quoted.push(ch);
    }
    quoted.push('"');
    quoted
}
fn quote_fts_or_query(queries: &[String]) -> String {
queries
.iter()
.map(|query| query.trim())
.filter(|query| !query.is_empty())
.map(quote_fts_query)
.collect::<Vec<_>>()
.join(" OR ")
}
// Unit tests for the session store. Every store used here is obtained either from
// `SessionStore::in_memory()` or from `SessionStore::from_connection` over an
// in-memory SQLite connection, so no test touches the on-disk database.
#[cfg(test)]
mod tests {
use super::*;
use crate::bus::SYSTEM_CONTEXT_AGENT_PROMPT;
use crate::domain::messages::ToolCall;
// CLI ids pass through unchanged; other channels get a `channel:` prefix.
#[test]
fn test_persistent_session_id_for_cli_and_channel() {
assert_eq!(persistent_session_id("cli", "abc"), "abc");
assert_eq!(persistent_session_id("feishu", "abc"), "feishu:abc");
}
// Walks one CLI session through create -> append -> rename -> archive ->
// clear -> delete, checking the stored counters at each step.
#[test]
fn test_session_store_roundtrip_and_lifecycle() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("demo")).unwrap();
assert_eq!(session.title, "demo");
assert_eq!(session.channel_name, "cli");
assert_eq!(session.chat_id, session.id);
assert_eq!(session.message_count, 0);
assert_eq!(session.reset_cutoff_seq, 0);
assert_eq!(session.user_turn_count, 0);
assert_eq!(session.agent_prompt_reinjection_count, 0);
let first = ChatMessage::user("hello");
let second = ChatMessage::assistant("world");
store.append_message(&session.id, &first).unwrap();
store.append_message(&session.id, &second).unwrap();
let stored = store.get_session(&session.id).unwrap().unwrap();
assert_eq!(stored.message_count, 2);
assert!(stored.archived_at.is_none());
assert_eq!(stored.reset_cutoff_seq, 0);
// Only the user message counts as a turn.
assert_eq!(stored.user_turn_count, 1);
assert_eq!(stored.agent_prompt_reinjection_count, 0);
let messages = store.load_messages(&session.id).unwrap();
assert_eq!(messages.len(), 2);
assert_eq!(messages[0].role, "user");
assert_eq!(messages[0].content, "hello");
assert_eq!(messages[1].role, "assistant");
assert_eq!(messages[1].content, "world");
store.rename_session(&session.id, "renamed").unwrap();
let renamed = store.get_session(&session.id).unwrap().unwrap();
assert_eq!(renamed.title, "renamed");
store.archive_session(&session.id).unwrap();
let archived = store.get_session(&session.id).unwrap().unwrap();
assert!(archived.archived_at.is_some());
// Archived sessions are listed only when explicitly requested.
let active_only = store.list_sessions("cli", false).unwrap();
assert!(active_only.is_empty());
let including_archived = store.list_sessions("cli", true).unwrap();
assert_eq!(including_archived.len(), 1);
store.clear_messages(&session.id).unwrap();
let cleared = store.load_messages(&session.id).unwrap();
assert!(cleared.is_empty());
let cleared_session = store.get_session(&session.id).unwrap().unwrap();
assert_eq!(cleared_session.message_count, 0);
assert_eq!(cleared_session.user_turn_count, 0);
assert_eq!(cleared_session.agent_prompt_reinjection_count, 0);
store.delete_session(&session.id).unwrap();
assert!(store.get_session(&session.id).unwrap().is_none());
}
// Ensuring the same (channel, chat) pair twice must yield the same session id.
#[test]
fn test_ensure_channel_session_is_stable() {
let store = SessionStore::in_memory().unwrap();
let first = store.ensure_channel_session("feishu", "chat-1").unwrap();
let second = store.ensure_channel_session("feishu", "chat-1").unwrap();
assert_eq!(first.id, second.id);
assert_eq!(first.chat_id, "chat-1");
assert_eq!(second.channel_name, "feishu");
}
// Tool calls attached to an assistant message survive a store/load cycle.
#[test]
fn test_assistant_tool_calls_roundtrip() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("tools")).unwrap();
let assistant = ChatMessage::assistant_with_tool_calls(
"calling tool",
vec![ToolCall {
id: "call_1".to_string(),
name: "calculator".to_string(),
arguments: serde_json::json!({ "expression": "3*7" }),
}],
);
store.append_message(&session.id, &assistant).unwrap();
let messages = store.load_messages(&session.id).unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].role, "assistant");
assert_eq!(messages[0].tool_calls.as_ref().unwrap().len(), 1);
assert_eq!(messages[0].tool_calls.as_ref().unwrap()[0].id, "call_1");
assert_eq!(
messages[0].tool_calls.as_ref().unwrap()[0].name,
"calculator"
);
}
// `reasoning_content` is persisted alongside the visible content.
#[test]
fn test_assistant_reasoning_content_roundtrip() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("reasoning")).unwrap();
let assistant = ChatMessage::assistant_with_reasoning("final answer", "hidden reasoning");
store.append_message(&session.id, &assistant).unwrap();
let messages = store.load_messages(&session.id).unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].content, "final answer");
assert_eq!(
messages[0].reasoning_content.as_deref(),
Some("hidden reasoning")
);
}
// A reset hides earlier messages from `load_messages` but keeps them
// reachable through `load_all_messages`.
#[test]
fn test_reset_session_preserves_full_history_and_hides_active_history() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("reset")).unwrap();
store
.append_message(&session.id, &ChatMessage::user("before"))
.unwrap();
store
.append_message(&session.id, &ChatMessage::assistant("context"))
.unwrap();
store.reset_session(&session.id).unwrap();
let stored = store.get_session(&session.id).unwrap().unwrap();
// The cutoff lands on the seq of the last message present at reset time.
assert_eq!(stored.reset_cutoff_seq, 2);
assert_eq!(stored.user_turn_count, 0);
assert_eq!(stored.agent_prompt_reinjection_count, 0);
let active_messages = store.load_messages(&session.id).unwrap();
assert!(active_messages.is_empty());
let all_messages = store.load_all_messages(&session.id).unwrap();
assert_eq!(all_messages.len(), 2);
assert_eq!(all_messages[0].content, "before");
assert_eq!(all_messages[1].content, "context");
store
.append_message(&session.id, &ChatMessage::user("after"))
.unwrap();
let active_messages = store.load_messages(&session.id).unwrap();
assert_eq!(active_messages.len(), 1);
assert_eq!(active_messages[0].content, "after");
let stored = store.get_session(&session.id).unwrap().unwrap();
assert_eq!(stored.user_turn_count, 1);
}
// Opening a store over a legacy schema (no reset/turn/reinjection columns on
// `sessions`) must still produce sessions with those fields defaulted to 0.
#[test]
fn test_schema_migration_adds_reset_cutoff_column() {
let conn = Connection::open_in_memory().unwrap();
conn.execute_batch(
"
CREATE TABLE sessions (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
channel_name TEXT NOT NULL,
chat_id TEXT NOT NULL,
summary TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
last_active_at INTEGER NOT NULL,
archived_at INTEGER,
deleted_at INTEGER,
message_count INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE messages (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
seq INTEGER NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
media_refs_json TEXT NOT NULL,
tool_call_id TEXT,
tool_name TEXT,
tool_calls_json TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE,
UNIQUE(session_id, seq)
);
",
)
.unwrap();
let store = SessionStore::from_connection(conn).unwrap();
let session = store.create_cli_session(Some("migrated")).unwrap();
assert_eq!(session.reset_cutoff_seq, 0);
assert_eq!(session.user_turn_count, 0);
assert_eq!(session.agent_prompt_reinjection_count, 0);
}
// Opening a store over a legacy `messages` table must add `reasoning_content`.
#[test]
fn test_schema_migration_adds_reasoning_content_column_to_messages() {
let conn = Connection::open_in_memory().unwrap();
conn.execute_batch(
"
CREATE TABLE sessions (
id TEXT PRIMARY KEY,
title TEXT NOT NULL,
channel_name TEXT NOT NULL,
chat_id TEXT NOT NULL,
summary TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
last_active_at INTEGER NOT NULL,
archived_at INTEGER,
deleted_at INTEGER,
message_count INTEGER NOT NULL DEFAULT 0
);
CREATE TABLE messages (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
seq INTEGER NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
media_refs_json TEXT NOT NULL,
tool_call_id TEXT,
tool_name TEXT,
tool_calls_json TEXT,
created_at INTEGER NOT NULL,
FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE,
UNIQUE(session_id, seq)
);
",
)
.unwrap();
let _store = SessionStore::from_connection(conn).unwrap();
// Inspect the migrated schema through the store's own connection.
let conn = _store.conn.lock().unwrap();
assert!(has_column(&conn, "messages", "reasoning_content").unwrap());
}
// Only `user` messages after the reset cutoff are counted.
#[test]
fn test_count_active_user_messages_respects_reset_cutoff_seq() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("count-users")).unwrap();
store
.append_message(&session.id, &ChatMessage::system("agent"))
.unwrap();
store
.append_message(&session.id, &ChatMessage::user("u1"))
.unwrap();
store
.append_message(&session.id, &ChatMessage::assistant("a1"))
.unwrap();
store
.append_message(&session.id, &ChatMessage::user("u2"))
.unwrap();
assert_eq!(store.count_active_user_messages(&session.id).unwrap(), 2);
store.reset_session(&session.id).unwrap();
assert_eq!(store.count_active_user_messages(&session.id).unwrap(), 0);
store
.append_message(&session.id, &ChatMessage::system("agent-again"))
.unwrap();
store
.append_message(&session.id, &ChatMessage::user("u3"))
.unwrap();
store
.append_message(&session.id, &ChatMessage::user("u4"))
.unwrap();
assert_eq!(store.count_active_user_messages(&session.id).unwrap(), 2);
}
// Compaction takes a snapshot of the first 9 messages, while two more (u5, a5)
// arrive before it runs. The rebuilt active segment must contain the preserved
// system prompt, the summary, the preserved tail of the snapshot, and the two
// late messages.
#[test]
fn test_compact_active_history_rebuilds_active_segment_with_delta_messages() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("compact-history")).unwrap();
let agent_prompt = ChatMessage::system_with_context(
"agent",
Some(SYSTEM_CONTEXT_AGENT_PROMPT.to_string()),
);
let seed_messages = vec![
agent_prompt.clone(),
ChatMessage::user("u1"),
ChatMessage::assistant("a1"),
ChatMessage::user("u2"),
ChatMessage::assistant("a2"),
ChatMessage::user("u3"),
ChatMessage::assistant("a3"),
ChatMessage::user("u4"),
ChatMessage::assistant("a4"),
];
for message in &seed_messages {
store.append_message(&session.id, message).unwrap();
}
let snapshot_end_seq = store
.get_session(&session.id)
.unwrap()
.unwrap()
.message_count;
// Indices 3.. of the nine seed messages: u2, a2, u3, a3, u4, a4.
let preserved_messages = store.load_messages(&session.id).unwrap()[3..].to_vec();
let preserved_system_messages = vec![agent_prompt];
store
.append_message(&session.id, &ChatMessage::user("u5"))
.unwrap();
store
.append_message(&session.id, &ChatMessage::assistant("a5"))
.unwrap();
let summary_message = ChatMessage::system("[Compressed History]\n\nsummary");
let compacted = store
.compact_active_history(
&session.id,
0,
snapshot_end_seq,
&preserved_system_messages,
&summary_message,
&preserved_messages,
)
.unwrap();
assert!(compacted);
let active_messages = store.load_messages(&session.id).unwrap();
// 1 system prompt + 1 summary + 6 preserved + 2 late messages.
assert_eq!(active_messages.len(), 10);
assert_eq!(active_messages[0].role, "system");
assert_eq!(active_messages[0].content, "agent");
assert_eq!(
active_messages[0].system_context.as_deref(),
Some(SYSTEM_CONTEXT_AGENT_PROMPT)
);
assert_eq!(active_messages[1].role, "system");
assert_eq!(
active_messages[1].content,
"[Compressed History]\n\nsummary"
);
assert_eq!(active_messages[2].content, "u2");
assert_eq!(active_messages[3].content, "a2");
assert_eq!(active_messages[8].content, "u5");
assert_eq!(active_messages[9].content, "a5");
let stored = store.get_session(&session.id).unwrap().unwrap();
// The cutoff covers all 11 messages that existed before compaction.
assert_eq!(stored.reset_cutoff_seq, 11);
assert_eq!(stored.user_turn_count, 4);
let all_messages = store.load_all_messages(&session.id).unwrap();
// 11 original rows plus the 10 rebuilt ones.
assert_eq!(all_messages.len(), 21);
}
// Each call bumps `agent_prompt_reinjection_count` by one.
#[test]
fn test_mark_agent_prompt_reinjected_increments_counter() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("prompt")).unwrap();
store.mark_agent_prompt_reinjected(&session.id).unwrap();
store.mark_agent_prompt_reinjected(&session.id).unwrap();
let stored = store.get_session(&session.id).unwrap().unwrap();
assert_eq!(stored.agent_prompt_reinjection_count, 2);
}
// Tool-result messages keep their call id and tool name and carry no tool calls.
#[test]
fn test_tool_result_roundtrip() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("tool-result")).unwrap();
let tool_message = ChatMessage::tool("call_9", "file_write", "saved to /tmp/output.txt");
store.append_message(&session.id, &tool_message).unwrap();
let messages = store.load_messages(&session.id).unwrap();
assert_eq!(messages.len(), 1);
assert_eq!(messages[0].role, "tool");
assert_eq!(messages[0].content, "saved to /tmp/output.txt");
assert_eq!(messages[0].tool_call_id.as_deref(), Some("call_9"));
assert_eq!(messages[0].tool_name.as_deref(), Some("file_write"));
assert!(messages[0].tool_calls.is_none());
}
// Events recorded without a session and events recorded for a session are
// listed separately.
#[test]
fn test_skill_events_roundtrip() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("skill-events")).unwrap();
store
.append_skill_event(None, "discovered", None, &serde_json::json!({"count": 2}))
.unwrap();
store
.append_skill_event(
Some(&session.id),
"activated",
Some("code-review"),
&serde_json::json!({"source": "project"}),
)
.unwrap();
let global_events = store.list_skill_events(None).unwrap();
assert_eq!(global_events.len(), 1);
assert_eq!(global_events[0].event_type, "discovered");
assert_eq!(global_events[0].payload["count"], 2);
let session_events = store.list_skill_events(Some(&session.id)).unwrap();
assert_eq!(session_events.len(), 1);
assert_eq!(session_events[0].event_type, "activated");
assert_eq!(session_events[0].skill_name.as_deref(), Some("code-review"));
assert_eq!(session_events[0].payload["source"], "project");
}
// A stored memory returns its content and all `source_*` provenance fields.
#[test]
fn test_memory_roundtrip_with_source_fields() {
let store = SessionStore::in_memory().unwrap();
let saved = store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "profile".to_string(),
memory_key: "language".to_string(),
content: "Rust".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-1".to_string()),
source_message_id: Some("msg-1".to_string()),
source_message_seq: Some(7),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-1".to_string()),
})
.unwrap();
assert_eq!(saved.content, "Rust");
assert_eq!(saved.source_type, "message");
assert_eq!(saved.source_session_id.as_deref(), Some("feishu:chat-1"));
assert_eq!(saved.source_message_id.as_deref(), Some("msg-1"));
assert_eq!(saved.source_message_seq, Some(7));
let fetched = store
.get_memory("user", "feishu:user-1", "profile", "language")
.unwrap()
.unwrap();
assert_eq!(fetched.id, saved.id);
assert_eq!(fetched.source_chat_id.as_deref(), Some("chat-1"));
}
// Full-text search must follow the latest content of a memory: the old text
// stops matching after an overwrite, and nothing matches after deletion.
#[test]
fn test_memory_fts_tracks_upsert_and_delete() {
let store = SessionStore::in_memory().unwrap();
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "preferences".to_string(),
memory_key: "editor".to_string(),
content: "Prefers rust-analyzer and cargo test output".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-2".to_string()),
source_message_id: Some("msg-2".to_string()),
source_message_seq: Some(3),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-2".to_string()),
})
.unwrap();
let hits = store
.search_memories("user", "feishu:user-1", "rust-analyzer", None, 10)
.unwrap();
assert_eq!(hits.len(), 1);
assert_eq!(hits[0].memory_key, "editor");
// Same scope/namespace/key as above, so this overwrites the first memory.
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "preferences".to_string(),
memory_key: "editor".to_string(),
content: "Prefers clippy diagnostics".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-3".to_string()),
source_message_id: Some("msg-3".to_string()),
source_message_seq: Some(4),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-3".to_string()),
})
.unwrap();
let old_hits = store
.search_memories("user", "feishu:user-1", "rust-analyzer", None, 10)
.unwrap();
assert!(old_hits.is_empty());
let new_hits = store
.search_memories("user", "feishu:user-1", "clippy", None, 10)
.unwrap();
assert_eq!(new_hits.len(), 1);
let deleted = store
.delete_memory("user", "feishu:user-1", "preferences", "editor")
.unwrap();
assert!(deleted);
let hits_after_delete = store
.search_memories("user", "feishu:user-1", "clippy", None, 10)
.unwrap();
assert!(hits_after_delete.is_empty());
}
// A query equal to the memory key finds the memory even though the content
// does not contain that text.
#[test]
fn test_memory_search_matches_memory_key_field() {
let store = SessionStore::in_memory().unwrap();
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "preferences".to_string(),
memory_key: "email_folder_preference".to_string(),
content: "用户提到邮件时默认查看代收邮箱。".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-8".to_string()),
source_message_id: Some("msg-8".to_string()),
source_message_seq: Some(8),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-8".to_string()),
})
.unwrap();
let hits = store
.search_memories("user", "feishu:user-1", "email_folder_preference", None, 10)
.unwrap();
assert_eq!(hits.len(), 1);
assert_eq!(hits[0].memory_key, "email_folder_preference");
}
// Searching with several keywords returns every memory matching any of them.
#[test]
fn test_search_memories_any_matches_multiple_keywords_once() {
let store = SessionStore::in_memory().unwrap();
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "preferences".to_string(),
memory_key: "editor".to_string(),
content: "Prefers rust-analyzer and cargo test output".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-2".to_string()),
source_message_id: Some("msg-2".to_string()),
source_message_seq: Some(3),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-2".to_string()),
})
.unwrap();
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "tasks".to_string(),
memory_key: "quality".to_string(),
content: "Tracks clippy warnings before release".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-3".to_string()),
source_message_id: Some("msg-3".to_string()),
source_message_seq: Some(4),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-3".to_string()),
})
.unwrap();
let hits = store
.search_memories_any(
"user",
"feishu:user-1",
&["rust-analyzer".to_string(), "clippy".to_string()],
None,
10,
)
.unwrap();
assert_eq!(hits.len(), 2);
assert!(hits.iter().any(|memory| memory.memory_key == "editor"));
assert!(hits.iter().any(|memory| memory.memory_key == "quality"));
}
// Scope keys are listed per scope kind, and a full-scope read returns only the
// memories of the requested scope key.
#[test]
fn test_memory_scope_listing_and_full_scope_read() {
let store = SessionStore::in_memory().unwrap();
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-2".to_string(),
namespace: "preferences".to_string(),
memory_key: "style".to_string(),
content: "偏好简洁表达".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-2".to_string()),
source_message_id: Some("msg-2".to_string()),
source_message_seq: Some(2),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-2".to_string()),
})
.unwrap();
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "profile".to_string(),
memory_key: "work".to_string(),
content: "用户在做AI产品".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-1".to_string()),
source_message_id: Some("msg-1".to_string()),
source_message_seq: Some(1),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-1".to_string()),
})
.unwrap();
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "patterns".to_string(),
memory_key: "workflow".to_string(),
content: "习惯先问方案再要代码".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-1".to_string()),
source_message_id: Some("msg-3".to_string()),
source_message_seq: Some(3),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-1".to_string()),
})
.unwrap();
let scope_keys = store.list_memory_scope_keys("user").unwrap();
// user-2 was inserted first, yet user-1 is expected first in the listing.
assert_eq!(
scope_keys,
vec!["feishu:user-1".to_string(), "feishu:user-2".to_string()]
);
let full_scope = store
.list_memories_for_scope("user", "feishu:user-1")
.unwrap();
assert_eq!(full_scope.len(), 2);
assert!(
full_scope
.iter()
.all(|memory| memory.scope_key == "feishu:user-1")
);
assert!(full_scope.iter().any(|memory| memory.memory_key == "work"));
assert!(
full_scope
.iter()
.any(|memory| memory.memory_key == "workflow")
);
}
// A scheduler job can be stored, then have its runtime fields updated and read
// back.
#[test]
fn test_scheduler_job_roundtrip_and_runtime_update() {
let store = SessionStore::in_memory().unwrap();
let saved = store
.upsert_scheduler_job(&SchedulerJobUpsert {
id: "heartbeat".to_string(),
kind: "outbound_message".to_string(),
schedule: serde_json::json!({
"type": "interval",
"seconds": 300,
"startup_delay_secs": 10,
}),
interval_secs: 300,
startup_delay_secs: 10,
target: serde_json::json!({
"channel": "feishu",
"chat_id": "oc_demo",
}),
payload: serde_json::json!({
"content": "heartbeat",
}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: Some(3),
last_fired_at: None,
next_fire_at: Some(1_700_000_000_000),
paused_at: None,
completed_at: None,
})
.unwrap();
assert_eq!(saved.id, "heartbeat");
assert_eq!(saved.kind, "outbound_message");
assert_eq!(saved.state, SchedulerJobState::Scheduled);
assert_eq!(saved.max_runs, Some(3));
store
.update_scheduler_job_runtime(
"heartbeat",
SchedulerJobState::Completed,
Some(SchedulerJobStatus::Ok),
None,
1,
Some(1_700_000_000_000),
None,
None,
Some(1_700_000_000_100),
)
.unwrap();
let fetched = store.get_scheduler_job("heartbeat").unwrap().unwrap();
assert_eq!(fetched.state, SchedulerJobState::Completed);
assert_eq!(fetched.last_status, Some(SchedulerJobStatus::Ok));
assert_eq!(fetched.run_count, 1);
assert_eq!(fetched.completed_at, Some(1_700_000_000_100));
}
// Only scopes written after the cutoff timestamp are reported as recent.
#[test]
fn test_list_memory_scope_keys_updated_since_filters_recent_scopes() {
let store = SessionStore::in_memory().unwrap();
let first = store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "profile".to_string(),
memory_key: "work".to_string(),
content: "用户在做AI产品".to_string(),
source_type: "message".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
})
.unwrap();
let cutoff = first.updated_at;
// Sleep briefly so the second write cannot share the first write's timestamp.
std::thread::sleep(std::time::Duration::from_millis(2));
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-2".to_string(),
namespace: "preferences".to_string(),
memory_key: "style".to_string(),
content: "偏好简洁表达".to_string(),
source_type: "message".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
})
.unwrap();
let recent_scope_keys = store
.list_memory_scope_keys_updated_since("user", cutoff)
.unwrap();
assert_eq!(recent_scope_keys, vec!["feishu:user-2".to_string()]);
}
}