diff --git a/docs/plans/2026-04-28-phase1-storage-implementation.md b/docs/plans/2026-04-28-phase1-storage-implementation.md
new file mode 100644
index 0000000..d6b1b16
--- /dev/null
+++ b/docs/plans/2026-04-28-phase1-storage-implementation.md
@@ -0,0 +1,877 @@
+# Phase 1: Storage Foundation Implementation Plan
+
+> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
+
+**Goal:** Create the `src/storage/` module and implement a SQLite persistence layer that provides the storage infrastructure for the later Session extensions.
+
+**Architecture:** Use `sqlx` with SQLite. `SqlitePool` provides an async connection pool, every Storage operation is async, and `Storage` owns the pool's lifecycle.
+
+**Tech Stack:** `sqlx` (sqlite, tokio runtime), `serde`, `chrono` (timestamps), `tokio::time::sleep` (retry backoff)
+
+---
+
+## Task 1: Add dependencies
+
+**Files:**
+- Modify: `Cargo.toml:36` (append to the end of `[dependencies]`)
+
+**Step 1: Add the sqlx + sqlite dependency**
+
+Append to the end of the `[dependencies]` section in `Cargo.toml`:
+
+```toml
+# sqlx 0.8 names its Tokio runtime feature `runtime-tokio`
+sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "macros", "chrono"] }
+```
+
+**Step 2: Run cargo check to verify the dependency**
+
+Run: `cargo check 2>&1`
+Expected: no errors; the dependency resolves
+
+**Step 3: Commit**
+
+```bash
+git add Cargo.toml
+git commit -m "deps: add sqlx + sqlite dependency"
+```
+
+---
+
+## Task 2: Create the Storage error type
+
+**Files:**
+- Create: `src/storage/error.rs`
+
+**Step 1: Write the StorageError enum**
+
+```rust
+use thiserror::Error;
+
+/// Errors returned by the storage layer.
+#[derive(Error, Debug)]
+pub enum StorageError {
+    #[error("session not found: {0}")]
+    NotFound(String),
+
+    #[error("session already exists: {0}")]
+    AlreadyExists(String),
+
+    #[error("database error: {0}")]
+    Database(#[from] sqlx::Error),
+
+    #[error("serialization error: {0}")]
+    Serialization(String),
+}
+```
+
+**Step 2: Verify the build**
+
+Run: `cargo build --lib 2>&1 | head -30`
+Expected: the build succeeds. `src/storage/error.rs` is not compiled yet, because no `mod storage` declaration exists until Task 3.
+
+**Step 3: Commit**
+
+```bash
+git add src/storage/error.rs
+git commit -m "feat(storage): add StorageError type"
+```
+
+---
+
+## Task 3: Create the Storage module skeleton
+
+**Files:**
+- Create: `src/storage/mod.rs`
+- Create: `src/storage/session.rs`
+- Create: `src/storage/message.rs`
+- Modify: `src/lib.rs`
+
+**Step 1: Create `src/storage/mod.rs`**
+
+```rust
+pub mod error;
+pub mod session;
+pub mod message;
+
+pub use error::StorageError;
+```
+
+**Step 2: Create `src/storage/session.rs` (empty shell)**
+
+```rust
+// Placeholder for Session CRUD operations
+```
+
+**Step 3: Create `src/storage/message.rs` (empty shell)**
+
+```rust
+// Placeholder for Message CRUD operations
+```
+
+**Step 4: Add the storage module to `src/lib.rs`**
+
+Append to the end of `src/lib.rs`:
+
+```rust
+pub mod storage;
+```
+
+**Step 5: Verify the build**
+
+Run: `cargo build --lib 2>&1`
+Expected: the build succeeds (empty shell modules)
+
+**Step 6: Commit**
+
+```bash
+git add src/storage/ src/lib.rs
+git commit -m "feat(storage): create storage module skeleton"
+```
+
+---
+
+## Task 4: Implement the Storage struct
+
+**Files:**
+- Modify: `src/storage/mod.rs`
+
+**Step 1: Write the Storage struct and its initialization logic**
+
+```rust
+use sqlx::{Pool, Sqlite, SqlitePool};
+use std::path::Path;
+
+pub struct Storage {
+    pool: Pool<Sqlite>,
+}
+
+impl Storage {
+    /// Open the database, creating it if it does not exist.
+    pub async fn new(db_path: &Path) -> Result<Self, StorageError> {
+        // mode=rwc: read/write, create the file when missing
+        let database_url = format!("sqlite:{}?mode=rwc", db_path.display());
+        let pool = SqlitePool::connect(&database_url).await?;
+
+        let storage = Self { pool };
+        storage.init_schema().await?;
+        Ok(storage)
+    }
+
+    /// Initialize the database schema (idempotent).
+    async fn init_schema(&self) -> Result<(), StorageError> {
+        sqlx::query(
+            r#"
+            CREATE TABLE IF NOT EXISTS sessions (
+                id TEXT PRIMARY KEY,
+                channel TEXT NOT NULL,
+                chat_id TEXT NOT NULL,
+                dialog_id TEXT NOT NULL,
+                title TEXT NOT NULL DEFAULT '新对话',
+                created_at INTEGER NOT NULL,
+                last_active_at INTEGER NOT NULL,
+                message_count INTEGER DEFAULT 0,
+                routing_info TEXT,
+                deleted_at INTEGER,
+                UNIQUE(channel, chat_id, dialog_id)
+            )
+            "#,
+        )
+        .execute(&self.pool)
+        .await?;
+
+        sqlx::query(
+            r#"
+            CREATE INDEX IF NOT EXISTS idx_sessions_chat
+            ON sessions(channel, chat_id, deleted_at)
+            "#,
+        )
+        .execute(&self.pool)
+        .await?;
+
+        sqlx::query(
+            r#"
+            CREATE TABLE IF NOT EXISTS messages (
+                id TEXT PRIMARY KEY,
+                session_id TEXT NOT NULL,
+                seq INTEGER NOT NULL,
+                role TEXT NOT NULL,
+                content TEXT NOT NULL,
+                media_refs TEXT,
+                tool_call_id TEXT,
+                tool_name TEXT,
+                tool_calls TEXT,
+                created_at INTEGER NOT NULL,
+                FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE
+            )
+            "#,
+        )
+        .execute(&self.pool)
+        .await?;
+
+        sqlx::query(
+            r#"
+            CREATE INDEX IF NOT EXISTS idx_messages_session_seq
+            ON messages(session_id, seq)
+            "#,
+        )
+        .execute(&self.pool)
+        .await?;
+
+        Ok(())
+    }
+
+    /// Borrow the connection pool (used by the internal CRUD code).
+    pub(crate) fn pool(&self) -> &Pool<Sqlite> {
+        &self.pool
+    }
+}
+```
+
+**Step 2: Verify the build**
+
+Run: `cargo build --lib 2>&1`
+Expected: the build succeeds
+
+**Step 3: Commit**
+
+```bash
+git add src/storage/mod.rs
+git commit -m "feat(storage): implement Storage struct and initialization"
+```
+
+---
+
+## Task 5: Define the SessionMeta and MessageMeta data structures
+
+**Files:**
+- Modify: `src/storage/session.rs`
+- Modify: `src/storage/message.rs`
+
+**Step 1: Define SessionMeta in `session.rs`**
+
+```rust
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct SessionMeta {
+    pub id: String,
+    pub channel: String,
+    pub chat_id: String,
+    pub dialog_id: String,
+    pub title: String,
+    pub created_at: i64,
+    pub last_active_at: i64,
+    pub message_count: i64,
+    pub routing_info: Option<String>,
+    pub deleted_at: Option<i64>,
+}
+```
+
+**Step 2: Define MessageMeta in `message.rs`**
+
+```rust
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct MessageMeta {
+    pub id: String,
+    pub session_id: String,
+    pub seq: i64,
+    pub role: String,
+    pub content: String,
+    pub media_refs: Option<String>,
+    pub tool_call_id: Option<String>,
+    pub tool_name: Option<String>,
+    pub tool_calls: Option<String>,
+    pub created_at: i64,
+}
+```
+
+**Step 3: Verify the build**
+
+Run: `cargo build --lib 2>&1`
+Expected: the build succeeds
+
+**Step 4: Commit**
+
+```bash
+git add src/storage/session.rs src/storage/message.rs
+git commit -m "feat(storage): define SessionMeta and MessageMeta data structures"
+```
+
+---
+
+## Task 6: Implement Session CRUD operations
+
+**Files:**
+- Modify: `src/storage/session.rs`
+
+**Step 1: Write the Session CRUD methods**
+
+```rust
+use sqlx::sqlite::SqliteRow;
+use sqlx::Row;
+
+use super::{Storage, StorageError};
+
+/// Map a `sessions` row to a SessionMeta (shared by all read queries).
+fn row_to_session(row: SqliteRow) -> SessionMeta {
+    SessionMeta {
+        id: row.get("id"),
+        channel: row.get("channel"),
+        chat_id: row.get("chat_id"),
+        dialog_id: row.get("dialog_id"),
+        title: row.get("title"),
+        created_at: row.get("created_at"),
+        last_active_at: row.get("last_active_at"),
+        message_count: row.get("message_count"),
+        routing_info: row.get("routing_info"),
+        deleted_at: row.get("deleted_at"),
+    }
+}
+
+impl Storage {
+    /// Insert a session, or update its mutable fields when the id already exists.
+    pub async fn upsert_session(&self, meta: &SessionMeta) -> Result<(), StorageError> {
+        sqlx::query(
+            r#"
+            INSERT INTO sessions (id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at)
+            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+            ON CONFLICT(id) DO UPDATE SET
+                title = excluded.title,
+                last_active_at = excluded.last_active_at,
+                message_count = excluded.message_count,
+                routing_info = excluded.routing_info,
+                deleted_at = excluded.deleted_at
+            "#,
+        )
+        .bind(&meta.id)
+        .bind(&meta.channel)
+        .bind(&meta.chat_id)
+        .bind(&meta.dialog_id)
+        .bind(&meta.title)
+        .bind(meta.created_at)
+        .bind(meta.last_active_at)
+        .bind(meta.message_count)
+        .bind(&meta.routing_info)
+        .bind(meta.deleted_at)
+        .execute(self.pool())
+        .await?;
+
+        Ok(())
+    }
+
+    /// Load a session by id; soft-deleted sessions are reported as NotFound.
+    pub async fn get_session(&self, id: &str) -> Result<SessionMeta, StorageError> {
+        let row = sqlx::query(
+            r#"
+            SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at
+            FROM sessions WHERE id = ? AND deleted_at IS NULL
+            "#,
+        )
+        .bind(id)
+        .fetch_optional(self.pool())
+        .await?
+        .ok_or_else(|| StorageError::NotFound(id.to_string()))?;
+
+        Ok(row_to_session(row))
+    }
+
+    /// List the live sessions of a chat, most recently active first.
+    pub async fn list_sessions(
+        &self,
+        channel: &str,
+        chat_id: &str,
+        limit: i64,
+    ) -> Result<Vec<SessionMeta>, StorageError> {
+        let rows = sqlx::query(
+            r#"
+            SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at
+            FROM sessions
+            WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL
+            ORDER BY last_active_at DESC
+            LIMIT ?
+            "#,
+        )
+        .bind(channel)
+        .bind(chat_id)
+        .bind(limit)
+        .fetch_all(self.pool())
+        .await?;
+
+        Ok(rows.into_iter().map(row_to_session).collect())
+    }
+
+    /// Update the message count and last-active timestamp of a session.
+    pub async fn touch_session(
+        &self,
+        id: &str,
+        message_count: i64,
+        last_active_at: i64,
+    ) -> Result<(), StorageError> {
+        sqlx::query(
+            r#"
+            UPDATE sessions SET message_count = ?, last_active_at = ?
+            WHERE id = ?
+            "#,
+        )
+        .bind(message_count)
+        .bind(last_active_at)
+        .bind(id)
+        .execute(self.pool())
+        .await?;
+
+        Ok(())
+    }
+
+    /// Mark a session as deleted by stamping `deleted_at` (milliseconds).
+    pub async fn soft_delete_session(&self, id: &str) -> Result<(), StorageError> {
+        let now = chrono::Utc::now().timestamp_millis();
+        sqlx::query(r#"UPDATE sessions SET deleted_at = ? WHERE id = ?"#)
+            .bind(now)
+            .bind(id)
+            .execute(self.pool())
+            .await?;
+
+        Ok(())
+    }
+
+    /// Find the most recently active, non-expired session under channel:chat_id.
+    pub async fn find_active_session(
+        &self,
+        channel: &str,
+        chat_id: &str,
+        ttl_millis: i64,
+    ) -> Result<Option<SessionMeta>, StorageError> {
+        let cutoff = chrono::Utc::now().timestamp_millis() - ttl_millis;
+        let row = sqlx::query(
+            r#"
+            SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at
+            FROM sessions
+            WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL AND last_active_at > ?
+            ORDER BY last_active_at DESC
+            LIMIT 1
+            "#,
+        )
+        .bind(channel)
+        .bind(chat_id)
+        .bind(cutoff)
+        .fetch_optional(self.pool())
+        .await?;
+
+        Ok(row.map(row_to_session))
+    }
+}
+```
+
+> Note: the CRUD methods call `pool()`, which is `pub(crate)`. This `impl Storage` block lives in `session.rs`, a child module of `storage` in the same crate, so `pool()` is accessible. `Storage` and `StorageError` are imported from the parent module with `use super::{Storage, StorageError};`.
+
+**Step 2: Verify the build**
+
+Run: `cargo build --lib 2>&1`
+Expected: the build succeeds
+
+**Step 3: Commit**
+
+```bash
+git add src/storage/session.rs
+git commit -m "feat(storage): implement Session CRUD operations"
+```
+
+---
+
+## Task 7: Implement Message CRUD operations
+
+**Files:**
+- Modify: `src/storage/message.rs`
+
+**Step 1: Write the Message CRUD methods**
+
+```rust
+use sqlx::Row;
+
+use super::{Storage, StorageError};
+
+impl Storage {
+    /// Append one message and return its sequence number.
+    pub async fn append_message(
+        &self,
+        session_id: &str,
+        msg: &MessageMeta,
+    ) -> Result<i64, StorageError> {
+        sqlx::query(
+            r#"
+            INSERT INTO messages (id, session_id, seq, role, content, media_refs, tool_call_id, tool_name, tool_calls, created_at)
+            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+            "#,
+        )
+        .bind(&msg.id)
+        .bind(session_id)
+        .bind(msg.seq)
+        .bind(&msg.role)
+        .bind(&msg.content)
+        .bind(&msg.media_refs)
+        .bind(&msg.tool_call_id)
+        .bind(&msg.tool_name)
+        .bind(&msg.tool_calls)
+        .bind(msg.created_at)
+        .execute(self.pool())
+        .await?;
+
+        Ok(msg.seq)
+    }
+
+    /// Append messages one by one (not wrapped in a transaction).
+    pub async fn append_messages(
+        &self,
+        session_id: &str,
+        msgs: &[MessageMeta],
+    ) -> Result<Vec<i64>, StorageError> {
+        let mut seqs = Vec::with_capacity(msgs.len());
+        for msg in msgs {
+            let seq = self.append_message(session_id, msg).await?;
+            seqs.push(seq);
+        }
+        Ok(seqs)
+    }
+
+    /// Load the messages of a session with seq >= from_seq, in ascending order.
+    pub async fn load_messages(
+        &self,
+        session_id: &str,
+        from_seq: i64,
+    ) -> Result<Vec<MessageMeta>, StorageError> {
+        let rows = sqlx::query(
+            r#"
+            SELECT id, session_id, seq, role, content, media_refs, tool_call_id, tool_name, tool_calls, created_at
+            FROM messages
+            WHERE session_id = ? AND seq >= ?
+            ORDER BY seq ASC
+            "#,
+        )
+        .bind(session_id)
+        .bind(from_seq)
+        .fetch_all(self.pool())
+        .await?;
+
+        Ok(rows
+            .into_iter()
+            .map(|row| MessageMeta {
+                id: row.get("id"),
+                session_id: row.get("session_id"),
+                seq: row.get("seq"),
+                role: row.get("role"),
+                content: row.get("content"),
+                media_refs: row.get("media_refs"),
+                tool_call_id: row.get("tool_call_id"),
+                tool_name: row.get("tool_name"),
+                tool_calls: row.get("tool_calls"),
+                created_at: row.get("created_at"),
+            })
+            .collect())
+    }
+
+    /// Delete every message of a session.
+    pub async fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> {
+        sqlx::query(r#"DELETE FROM messages WHERE session_id = ?"#)
+            .bind(session_id)
+            .execute(self.pool())
+            .await?;
+        Ok(())
+    }
+}
+```
+
+> Note: as in `session.rs`, this `impl Storage` block lives in a child module of `storage`, so it can call `pool()`.
+
+**Step 2: Verify the build**
+
+Run: `cargo build --lib 2>&1`
+Expected: the build succeeds
+
+**Step 3: Commit**
+
+```bash
+git add src/storage/message.rs
+git commit -m "feat(storage): implement Message CRUD operations"
+```
+
+---
+
+## Task 8: Implement write retry logic
+
+**Files:**
+- Modify: `src/storage/mod.rs`
+
+**Step 1: Add an append_message variant with retries to Storage**
+
+Add the following to `mod.rs`:
+
+```rust
+use tokio::time::{sleep, Duration};
+
+use self::message::MessageMeta;
+
+impl Storage {
+    /// Append a message with retries.
+    /// Retries up to 3 times (100/200/300 ms backoff); returns the last error if all attempts fail.
+    pub async fn append_message_with_retry(
+        &self,
+        session_id: &str,
+        msg: &MessageMeta,
+    ) -> Result<i64, StorageError> {
+        let delays = [100u64, 200, 300];
+        let mut retries = 0;
+
+        loop {
+            match self.append_message(session_id, msg).await {
+                Ok(seq) => return Ok(seq),
+                // A backoff slot is still available: log, wait, try again.
+                Err(e) if retries < delays.len() => {
+                    tracing::warn!("Storage write failed, retrying: {}", e);
+                    sleep(Duration::from_millis(delays[retries])).await;
+                    retries += 1;
+                }
+                Err(e) => {
+                    tracing::error!("Storage write failed after retries: {}", e);
+                    return Err(e);
+                }
+            }
+        }
+    }
+}
+```
+
+> Note: `mod.rs` needs `MessageMeta` in scope, hence `use self::message::MessageMeta;`.
+
+**Step 2: Verify the build**
+
+Run: `cargo build --lib 2>&1`
+Expected: the build succeeds
+
+**Step 3: Commit**
+
+```bash
+git add src/storage/mod.rs
+git commit -m "feat(storage): add write retry logic"
+```
+
+---
+
+## Task 9: Write Storage unit tests
+
+**Files:**
+- Modify: `src/storage/mod.rs` (add a test module)
+
+**Step 1: Write the Storage unit tests**
+
+Append to the end of `src/storage/mod.rs`:
+
+```rust
+#[cfg(test)]
+mod tests {
+    use super::message::MessageMeta;
+    use super::session::SessionMeta;
+    use super::*;
+    use tempfile::{tempdir, TempDir};
+
+    /// Returns the storage plus the TempDir guard; the directory is removed
+    /// when the guard is dropped at the end of the test.
+    async fn create_test_storage() -> (Storage, TempDir) {
+        let dir = tempdir().unwrap();
+        let db_path = dir.path().join("test.db");
+        let storage = Storage::new(&db_path).await.unwrap();
+        (storage, dir)
+    }
+
+    /// Baseline session used by most tests.
+    fn sample_session() -> SessionMeta {
+        SessionMeta {
+            id: "cli_chat:sid123:dialog1".to_string(),
+            channel: "cli_chat".to_string(),
+            chat_id: "sid123".to_string(),
+            dialog_id: "dialog1".to_string(),
+            title: "测试".to_string(),
+            created_at: 1000,
+            last_active_at: 1000,
+            message_count: 0,
+            routing_info: None,
+            deleted_at: None,
+        }
+    }
+
+    #[tokio::test]
+    async fn test_upsert_and_get_session() {
+        let (storage, _dir) = create_test_storage().await;
+
+        let meta = SessionMeta {
+            title: "测试会话".to_string(),
+            routing_info: Some(r#"{"type":"cli"}"#.to_string()),
+            ..sample_session()
+        };
+
+        storage.upsert_session(&meta).await.unwrap();
+
+        let loaded = storage.get_session(&meta.id).await.unwrap();
+        assert_eq!(loaded.title, "测试会话");
+        assert_eq!(loaded.channel, "cli_chat");
+    }
+
+    #[tokio::test]
+    async fn test_get_nonexistent_session() {
+        let (storage, _dir) = create_test_storage().await;
+
+        let result = storage.get_session("nonexistent").await;
+        assert!(matches!(result, Err(StorageError::NotFound(_))));
+    }
+
+    #[tokio::test]
+    async fn test_list_sessions() {
+        let (storage, _dir) = create_test_storage().await;
+
+        for i in 0..5 {
+            let meta = SessionMeta {
+                id: format!("cli_chat:sid123:dialog{}", i),
+                channel: "cli_chat".to_string(),
+                chat_id: "sid123".to_string(),
+                dialog_id: format!("dialog{}", i),
+                title: format!("会话{}", i),
+                created_at: i as i64 * 1000,
+                last_active_at: i as i64 * 1000,
+                message_count: i,
+                routing_info: None,
+                deleted_at: None,
+            };
+            storage.upsert_session(&meta).await.unwrap();
+        }
+
+        let sessions = storage.list_sessions("cli_chat", "sid123", 10).await.unwrap();
+        assert_eq!(sessions.len(), 5);
+        // Ordered by last_active_at DESC
+        assert_eq!(sessions[0].dialog_id, "dialog4");
+    }
+
+    #[tokio::test]
+    async fn test_soft_delete() {
+        let (storage, _dir) = create_test_storage().await;
+
+        let meta = sample_session();
+        storage.upsert_session(&meta).await.unwrap();
+        storage.soft_delete_session(&meta.id).await.unwrap();
+
+        let result = storage.get_session(&meta.id).await;
+        assert!(matches!(result, Err(StorageError::NotFound(_))));
+    }
+
+    #[tokio::test]
+    async fn test_append_and_load_messages() {
+        let (storage, _dir) = create_test_storage().await;
+
+        let session_meta = sample_session();
+        storage.upsert_session(&session_meta).await.unwrap();
+
+        let msg = MessageMeta {
+            id: "msg1".to_string(),
+            session_id: session_meta.id.clone(),
+            seq: 1,
+            role: "user".to_string(),
+            content: "你好".to_string(),
+            media_refs: None,
+            tool_call_id: None,
+            tool_name: None,
+            tool_calls: None,
+            created_at: 1000,
+        };
+
+        let seq = storage.append_message(&session_meta.id, &msg).await.unwrap();
+        assert_eq!(seq, 1);
+
+        let loaded = storage.load_messages(&session_meta.id, 0).await.unwrap();
+        assert_eq!(loaded.len(), 1);
+        assert_eq!(loaded[0].content, "你好");
+    }
+
+    #[tokio::test]
+    async fn test_touch_session() {
+        let (storage, _dir) = create_test_storage().await;
+
+        let meta = sample_session();
+        storage.upsert_session(&meta).await.unwrap();
+
+        storage.touch_session(&meta.id, 5, 2000).await.unwrap();
+
+        let loaded = storage.get_session(&meta.id).await.unwrap();
+        assert_eq!(loaded.message_count, 5);
+        assert_eq!(loaded.last_active_at, 2000);
+    }
+}
+```
+
+> The tests need the `tempfile` dependency in `Cargo.toml` (already present). No `defer` helper is required: each test keeps the returned `TempDir` guard alive in `_dir`, and the temporary directory is deleted when the guard is dropped.
+
+**Step 2: Run the tests**
+
+Run: `cargo test storage::tests --lib 2>&1`
+Expected: all 6 tests PASS
+
+**Step 3: Commit**
+
+```bash
+git add src/storage/mod.rs
+git commit -m "test(storage): add Storage unit tests"
+```
+
+---
+
+## Summary
+
+| Task | Files changed | Key deliverable |
+|------|---------------|-----------------|
+| 1 | `Cargo.toml` | sqlx dependency |
+| 2 | `src/storage/error.rs` | StorageError |
+| 3 | `src/storage/{mod.rs,session.rs,message.rs}`, `src/lib.rs` | Module skeleton |
+| 4 | `src/storage/mod.rs` | Storage struct + init_schema |
+| 5 | `src/storage/session.rs`, `message.rs` | SessionMeta, MessageMeta |
+| 6 | `src/storage/session.rs` | Session CRUD |
+| 7 | `src/storage/message.rs` | Message CRUD |
+| 8 | `src/storage/mod.rs` | append_message_with_retry |
+| 9 | `src/storage/mod.rs` | 6 unit tests |
+
+**After Phase 1:** the Storage module can be used on its own, provides the full persistence layer, and can be safely integrated into Session and SessionManager.
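The retry policy described in Task 8 (retry up to three times, backing off 100/200/300 ms) can be checked in isolation from sqlx. The following is a minimal sketch under stated assumptions, not the plan's implementation: `retry_with_backoff` is a hypothetical helper name, it is synchronous and uses `std::thread::sleep` in place of `tokio::time::sleep`, and the closure stands in for `append_message`.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper (not part of the plan): calls `op` once, then retries
/// once per entry in `delays_ms`, sleeping for that entry before each retry.
/// `op` receives the number of retries performed so far.
pub fn retry_with_backoff<T, E, F>(delays_ms: &[u64], mut op: F) -> Result<T, E>
where
    F: FnMut(usize) -> Result<T, E>,
{
    let mut retries = 0;
    loop {
        match op(retries) {
            Ok(value) => return Ok(value),
            Err(err) => match delays_ms.get(retries) {
                // A backoff slot is left: wait, then try again.
                Some(&ms) => {
                    sleep(Duration::from_millis(ms));
                    retries += 1;
                }
                // Backoff schedule exhausted: surface the last error.
                None => return Err(err),
            },
        }
    }
}
```

With `&[100, 200, 300]` the operation runs at most four times: the initial attempt plus three retries. Driving the schedule from a slice keeps the attempt count and the delays in one place, so the two cannot drift apart.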