# Phase 1: Storage Foundation Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** Create the `src/storage/` module and implement a SQLite persistence layer that provides the storage infrastructure for the upcoming Session extensions.

**Architecture:** Use `sqlx` with `sqlite`. A `SqlitePool` provides the async connection pool, every Storage operation is async, and `Storage` owns the pool's lifecycle.

**Tech Stack:** `sqlx` (sqlite, tokio), `serde`, `chrono` (timestamps), `tokio::time::sleep` (retry backoff)

---

## Task 1: Add dependencies

**Files:**
- Modify: `Cargo.toml:36` (append to the end of `[dependencies]`)

**Step 1: Add the sqlx + sqlite dependency**

Append to the end of `[dependencies]` in `Cargo.toml`:

```toml
# sqlx 0.8 names its Tokio runtime feature "runtime-tokio"
sqlx = { version = "0.8", features = ["sqlite", "runtime-tokio", "macros", "chrono"] }
```

> Note: later tasks also use `thiserror`, `tracing`, `chrono`, `serde`, and `tokio` directly. This plan assumes they are already declared in `Cargo.toml`.

**Step 2: Run cargo check to verify the dependency**

Run: `cargo check 2>&1`
Expected: no errors, dependencies resolve successfully

**Step 3: Commit**

```bash
git add Cargo.toml
git commit -m "deps: add sqlx + sqlite dependencies"
```

---

## Task 2: Create the Storage error type

**Files:**
- Create: `src/storage/error.rs`

**Step 1: Write the StorageError enum**

```rust
use thiserror::Error;

#[derive(Error, Debug)]
pub enum StorageError {
    #[error("session not found: {0}")]
    NotFound(String),
    #[error("session already exists: {0}")]
    AlreadyExists(String),
    #[error("database error: {0}")]
    Database(#[from] sqlx::Error),
    #[error("serialization error: {0}")]
    Serialization(String),
}
```

**Step 2: Verify compilation**

Run: `cargo build --lib 2>&1 | head -30`
Expected: the build succeeds. `error.rs` is not compiled yet because the `storage` module is only declared in Task 3, so this step just confirms that nothing else broke.

**Step 3: Commit**

```bash
git add src/storage/error.rs
git commit -m "feat(storage): add StorageError type"
```

---

## Task 3: Create the Storage module skeleton

**Files:**
- Create: `src/storage/mod.rs`
- Create: `src/storage/session.rs`
- Create: `src/storage/message.rs`

**Step 1: Create `src/storage/mod.rs`**

```rust
pub mod error;
pub mod session;
pub mod message;

pub use error::StorageError;
```

**Step 2: Create `src/storage/session.rs` (empty shell)**

```rust
// Placeholder for Session CRUD operations
```

**Step 3: Create `src/storage/message.rs` (empty shell)**

```rust
// Placeholder for Message CRUD operations
```

**Step 4: Add the storage module to `src/lib.rs`**

Append to the end of `src/lib.rs`:

```rust
pub mod storage;
```

**Step 5: Verify compilation**

Run: `cargo build --lib 2>&1`
Expected: build succeeds (empty shell modules)

**Step 6: Commit**

```bash
git add src/storage/ src/lib.rs
git commit -m "feat(storage): create storage module skeleton"
```

---

## Task 4: Implement the main Storage struct

**Files:**
- Modify: `src/storage/mod.rs`

**Step 1: Write the Storage struct and initialization logic**

Add below the `pub mod` and `pub use` lines from Task 3:

```rust
use sqlx::{Pool, Sqlite, SqlitePool};
use std::path::Path;

pub struct Storage {
    pool: Pool<Sqlite>,
}

impl Storage {
    /// Open the database, creating it if it does not exist.
    pub async fn new(db_path: &Path) -> Result<Self, StorageError> {
        // mode=rwc: read/write, create the file when missing
        let database_url = format!("sqlite:{}?mode=rwc", db_path.display());
        let pool = SqlitePool::connect(&database_url).await?;
        let storage = Self { pool };
        storage.init_schema().await?;
        Ok(storage)
    }

    /// Initialize the database schema.
    async fn init_schema(&self) -> Result<(), StorageError> {
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS sessions (
                id TEXT PRIMARY KEY,
                channel TEXT NOT NULL,
                chat_id TEXT NOT NULL,
                dialog_id TEXT NOT NULL,
                title TEXT NOT NULL DEFAULT '新对话',
                created_at INTEGER NOT NULL,
                last_active_at INTEGER NOT NULL,
                message_count INTEGER DEFAULT 0,
                routing_info TEXT,
                deleted_at INTEGER,
                UNIQUE(channel, chat_id, dialog_id)
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            r#"
            CREATE INDEX IF NOT EXISTS idx_sessions_chat
            ON sessions(channel, chat_id, deleted_at)
            "#,
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS messages (
                id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                seq INTEGER NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                media_refs TEXT,
                tool_call_id TEXT,
                tool_name TEXT,
                tool_calls TEXT,
                created_at INTEGER NOT NULL,
                FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            r#"
            CREATE INDEX IF NOT EXISTS idx_messages_session_seq
            ON messages(session_id, seq)
            "#,
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Borrow the connection pool (used by the internal CRUD code).
    pub(crate) fn pool(&self) -> &Pool<Sqlite> {
        &self.pool
    }
}
```

**Step 2: Verify compilation**

Run: `cargo build --lib 2>&1`
Expected: build succeeds

**Step 3: Commit**

```bash
git add src/storage/mod.rs
git commit -m "feat(storage): implement Storage struct and initialization"
```

---

## Task 5: Define the SessionMeta and MessageMeta data structures

**Files:**
- Modify: `src/storage/session.rs`
- Modify: `src/storage/message.rs`
- Modify: `src/storage/mod.rs` (re-exports)

**Step 1: Define SessionMeta in `session.rs`**

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionMeta {
    pub id: String,
    pub channel: String,
    pub chat_id: String,
    pub dialog_id: String,
    pub title: String,
    pub created_at: i64,
    pub last_active_at: i64,
    pub message_count: i64,
    pub routing_info: Option<String>,
    pub deleted_at: Option<i64>,
}
```

**Step 2: Define MessageMeta in `message.rs`**

```rust
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageMeta {
    pub id: String,
    pub session_id: String,
    pub seq: i64,
    pub role: String,
    pub content: String,
    pub media_refs: Option<String>,
    pub tool_call_id: Option<String>,
    pub tool_name: Option<String>,
    pub tool_calls: Option<String>,
    pub created_at: i64,
}
```

**Step 3: Re-export the types from `mod.rs`**

Add `pub use session::SessionMeta;` and `pub use message::MessageMeta;` below the existing `pub use error::StorageError;` line, so that the retry code in Task 8 and the tests in Task 9 can refer to both types from `mod.rs`.

**Step 4: Verify compilation**

Run: `cargo build --lib 2>&1`
Expected: build succeeds

**Step 5: Commit**

```bash
git add src/storage/session.rs src/storage/message.rs src/storage/mod.rs
git commit -m "feat(storage): define SessionMeta and MessageMeta data structures"
```

---

## Task 6: Implement Session CRUD operations

**Files:**
- Modify: `src/storage/session.rs`

**Step 1: Write the Session CRUD methods**

Add below the `SessionMeta` definition from Task 5:

```rust
use sqlx::Row;

use super::{Storage, StorageError};

impl Storage {
    /// Insert a session, or update its mutable fields if the id already exists.
    pub async fn upsert_session(&self, meta: &SessionMeta) -> Result<(), StorageError> {
        sqlx::query(
            r#"
            INSERT INTO sessions (id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                title = excluded.title,
                last_active_at = excluded.last_active_at,
                message_count = excluded.message_count,
                routing_info = excluded.routing_info,
                deleted_at = excluded.deleted_at
            "#,
        )
        .bind(&meta.id)
        .bind(&meta.channel)
        .bind(&meta.chat_id)
        .bind(&meta.dialog_id)
        .bind(&meta.title)
        .bind(meta.created_at)
        .bind(meta.last_active_at)
        .bind(meta.message_count)
        .bind(&meta.routing_info)
        .bind(meta.deleted_at)
        .execute(self.pool())
        .await?;
        Ok(())
    }

    /// Load a session by id; soft-deleted sessions count as not found.
    pub async fn get_session(&self, id: &str) -> Result<SessionMeta, StorageError> {
        let row = sqlx::query(
            r#"
            SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at
            FROM sessions
            WHERE id = ? AND deleted_at IS NULL
            "#,
        )
        .bind(id)
        .fetch_optional(self.pool())
        .await?
        .ok_or_else(|| StorageError::NotFound(id.to_string()))?;

        Ok(SessionMeta {
            id: row.get("id"),
            channel: row.get("channel"),
            chat_id: row.get("chat_id"),
            dialog_id: row.get("dialog_id"),
            title: row.get("title"),
            created_at: row.get("created_at"),
            last_active_at: row.get("last_active_at"),
            message_count: row.get("message_count"),
            routing_info: row.get("routing_info"),
            deleted_at: row.get("deleted_at"),
        })
    }

    /// List the sessions of a chat, most recently active first.
    pub async fn list_sessions(
        &self,
        channel: &str,
        chat_id: &str,
        limit: i64,
    ) -> Result<Vec<SessionMeta>, StorageError> {
        let rows = sqlx::query(
            r#"
            SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at
            FROM sessions
            WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL
            ORDER BY last_active_at DESC
            LIMIT ?
            "#,
        )
        .bind(channel)
        .bind(chat_id)
        .bind(limit)
        .fetch_all(self.pool())
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| SessionMeta {
                id: row.get("id"),
                channel: row.get("channel"),
                chat_id: row.get("chat_id"),
                dialog_id: row.get("dialog_id"),
                title: row.get("title"),
                created_at: row.get("created_at"),
                last_active_at: row.get("last_active_at"),
                message_count: row.get("message_count"),
                routing_info: row.get("routing_info"),
                deleted_at: row.get("deleted_at"),
            })
            .collect())
    }

    /// Update the message count and last-active timestamp of a session.
    pub async fn touch_session(
        &self,
        id: &str,
        message_count: i64,
        last_active_at: i64,
    ) -> Result<(), StorageError> {
        sqlx::query(
            r#"
            UPDATE sessions
            SET message_count = ?, last_active_at = ?
            WHERE id = ?
            "#,
        )
        .bind(message_count)
        .bind(last_active_at)
        .bind(id)
        .execute(self.pool())
        .await?;
        Ok(())
    }

    /// Mark a session as deleted by setting `deleted_at` to the current time.
    pub async fn soft_delete_session(&self, id: &str) -> Result<(), StorageError> {
        let now = chrono::Utc::now().timestamp_millis();
        sqlx::query(r#"UPDATE sessions SET deleted_at = ? WHERE id = ?"#)
            .bind(now)
            .bind(id)
            .execute(self.pool())
            .await?;
        Ok(())
    }

    /// Find the most recently active, unexpired session under channel:chat_id.
    pub async fn find_active_session(
        &self,
        channel: &str,
        chat_id: &str,
        ttl_millis: i64,
    ) -> Result<Option<SessionMeta>, StorageError> {
        let cutoff = chrono::Utc::now().timestamp_millis() - ttl_millis;
        let row = sqlx::query(
            r#"
            SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at
            FROM sessions
            WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL AND last_active_at > ?
            ORDER BY last_active_at DESC
            LIMIT 1
            "#,
        )
        .bind(channel)
        .bind(chat_id)
        .bind(cutoff)
        .fetch_optional(self.pool())
        .await?;

        match row {
            Some(row) => Ok(Some(SessionMeta {
                id: row.get("id"),
                channel: row.get("channel"),
                chat_id: row.get("chat_id"),
                dialog_id: row.get("dialog_id"),
                title: row.get("title"),
                created_at: row.get("created_at"),
                last_active_at: row.get("last_active_at"),
                message_count: row.get("message_count"),
                routing_info: row.get("routing_info"),
                deleted_at: row.get("deleted_at"),
            })),
            None => Ok(None),
        }
    }
}
```

> Note: the CRUD methods need access to the pool. `pool()` is `pub(crate)`, and Rust allows `impl Storage` blocks in any module of the same crate, so the `impl` block in `session.rs` can call `self.pool()` directly.

**Step 2: Verify compilation**

Run: `cargo build --lib 2>&1`
Expected: build succeeds

**Step 3: Commit**

```bash
git add src/storage/session.rs
git commit -m "feat(storage): implement Session CRUD operations"
```

---

## Task 7: Implement Message CRUD operations

**Files:**
- Modify: `src/storage/message.rs`

**Step 1: Write the Message CRUD methods**

Add below the `MessageMeta` definition from Task 5:

```rust
use sqlx::Row;

use super::{Storage, StorageError};

impl Storage {
    /// Insert one message and return its sequence number.
    pub async fn append_message(
        &self,
        session_id: &str,
        msg: &MessageMeta,
    ) -> Result<i64, StorageError> {
        sqlx::query(
            r#"
            INSERT INTO messages (id, session_id, seq, role, content, media_refs, tool_call_id, tool_name, tool_calls, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&msg.id)
        .bind(session_id)
        .bind(msg.seq)
        .bind(&msg.role)
        .bind(&msg.content)
        .bind(&msg.media_refs)
        .bind(&msg.tool_call_id)
        .bind(&msg.tool_name)
        .bind(&msg.tool_calls)
        .bind(msg.created_at)
        .execute(self.pool())
        .await?;
        Ok(msg.seq)
    }

    /// Insert messages one by one, stopping at the first failure.
    pub async fn append_messages(
        &self,
        session_id: &str,
        msgs: &[MessageMeta],
    ) -> Result<Vec<i64>, StorageError> {
        let mut seqs = Vec::with_capacity(msgs.len());
        for msg in msgs {
            let seq = self.append_message(session_id, msg).await?;
            seqs.push(seq);
        }
        Ok(seqs)
    }

    /// Load the messages of a session with seq >= from_seq, in ascending order.
    pub async fn load_messages(
        &self,
        session_id: &str,
        from_seq: i64,
    ) -> Result<Vec<MessageMeta>, StorageError> {
        let rows = sqlx::query(
            r#"
            SELECT id, session_id, seq, role, content, media_refs, tool_call_id, tool_name, tool_calls, created_at
            FROM messages
            WHERE session_id = ? AND seq >= ?
            ORDER BY seq ASC
            "#,
        )
        .bind(session_id)
        .bind(from_seq)
        .fetch_all(self.pool())
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| MessageMeta {
                id: row.get("id"),
                session_id: row.get("session_id"),
                seq: row.get("seq"),
                role: row.get("role"),
                content: row.get("content"),
                media_refs: row.get("media_refs"),
                tool_call_id: row.get("tool_call_id"),
                tool_name: row.get("tool_name"),
                tool_calls: row.get("tool_calls"),
                created_at: row.get("created_at"),
            })
            .collect())
    }

    /// Delete every message of a session.
    pub async fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> {
        sqlx::query(r#"DELETE FROM messages WHERE session_id = ?"#)
            .bind(session_id)
            .execute(self.pool())
            .await?;
        Ok(())
    }
}
```

> Note: as in `session.rs`, this `impl Storage` block imports `Storage` and `StorageError` from the parent module and reaches the pool through `pool()`.

**Step 2: Verify compilation**

Run: `cargo build --lib 2>&1`
Expected: build succeeds

**Step 3: Commit**

```bash
git add src/storage/message.rs
git commit -m "feat(storage): implement Message CRUD operations"
```

---

## Task 8: Implement write retry logic

**Files:**
- Modify: `src/storage/mod.rs`

**Step 1: Add an append_message variant with retries to Storage**

Add a new `impl Storage` block to `mod.rs`:

```rust
use tokio::time::{sleep, Duration};

impl Storage {
    /// Append a message with retries.
    /// Retries up to 3 times (100/200/300 ms backoff); returns the last error if every attempt fails.
    pub async fn append_message_with_retry(
        &self,
        session_id: &str,
        msg: &MessageMeta,
    ) -> Result<i64, StorageError> {
        let delays = [100u64, 200, 300];
        let mut retries = 0;
        loop {
            match self.append_message(session_id, msg).await {
                Ok(seq) => return Ok(seq),
                // A retry is still available: log, back off, try again.
                Err(e) if retries < delays.len() => {
                    tracing::warn!("Storage write failed, retrying: {}", e);
                    sleep(Duration::from_millis(delays[retries])).await;
                    retries += 1;
                }
                // Initial attempt plus all retries failed.
                Err(e) => {
                    tracing::error!("Storage write failed after retries: {}", e);
                    return Err(e);
                }
            }
        }
    }
}
```

> Note: `mod.rs` needs `MessageMeta` in scope (re-exported in Task 5). `use sqlx::Row;` is only required in `session.rs` and `message.rs`, where rows are read.

**Step 2: Verify compilation**

Run: `cargo build --lib 2>&1`
Expected: build succeeds

**Step 3: Commit**

```bash
git add src/storage/mod.rs
git commit -m "feat(storage): add write retry logic"
```

---

## Task 9: Write Storage unit tests

**Files:**
- Modify: `src/storage/mod.rs` (add a test module)

**Step 1: Write the Storage unit tests**

Append to the end of `src/storage/mod.rs`:

```rust
#[cfg(test)]
mod tests {
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Create a Storage backed by a fresh temporary directory.
    /// The returned TempDir must stay alive for the whole test;
    /// the directory is removed when it is dropped.
    async fn create_test_storage() -> (Storage, TempDir) {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let storage = Storage::new(&db_path).await.unwrap();
        (storage, dir)
    }

    #[tokio::test]
    async fn test_upsert_and_get_session() {
        let (storage, _dir) = create_test_storage().await;

        let meta = SessionMeta {
            id: "cli_chat:sid123:dialog1".to_string(),
            channel: "cli_chat".to_string(),
            chat_id: "sid123".to_string(),
            dialog_id: "dialog1".to_string(),
            title: "测试会话".to_string(),
            created_at: 1000,
            last_active_at: 1000,
            message_count: 0,
            routing_info: Some(r#"{"type":"cli"}"#.to_string()),
            deleted_at: None,
        };

        storage.upsert_session(&meta).await.unwrap();
        let loaded = storage.get_session(&meta.id).await.unwrap();
        assert_eq!(loaded.title, "测试会话");
        assert_eq!(loaded.channel, "cli_chat");
    }

    #[tokio::test]
    async fn test_get_nonexistent_session() {
        let (storage, _dir) = create_test_storage().await;

        let result = storage.get_session("nonexistent").await;
        assert!(matches!(result, Err(StorageError::NotFound(_))));
    }

    #[tokio::test]
    async fn test_list_sessions() {
        let (storage, _dir) = create_test_storage().await;

        for i in 0..5i64 {
            let meta = SessionMeta {
                id: format!("cli_chat:sid123:dialog{}", i),
                channel: "cli_chat".to_string(),
                chat_id: "sid123".to_string(),
                dialog_id: format!("dialog{}", i),
                title: format!("会话{}", i),
                created_at: i * 1000,
                last_active_at: i * 1000,
                message_count: i,
                routing_info: None,
                deleted_at: None,
            };
            storage.upsert_session(&meta).await.unwrap();
        }

        let sessions = storage.list_sessions("cli_chat", "sid123", 10).await.unwrap();
        assert_eq!(sessions.len(), 5);
        // Sorted by last_active_at DESC
        assert_eq!(sessions[0].dialog_id, "dialog4");
    }

    #[tokio::test]
    async fn test_soft_delete() {
        let (storage, _dir) = create_test_storage().await;

        let meta = SessionMeta {
            id: "cli_chat:sid123:dialog1".to_string(),
            channel: "cli_chat".to_string(),
            chat_id: "sid123".to_string(),
            dialog_id: "dialog1".to_string(),
            title: "测试".to_string(),
            created_at: 1000,
            last_active_at: 1000,
            message_count: 0,
            routing_info: None,
            deleted_at: None,
        };

        storage.upsert_session(&meta).await.unwrap();
        storage.soft_delete_session(&meta.id).await.unwrap();

        let result = storage.get_session(&meta.id).await;
        assert!(matches!(result, Err(StorageError::NotFound(_))));
    }

    #[tokio::test]
    async fn test_append_and_load_messages() {
        let (storage, _dir) = create_test_storage().await;

        let session_meta = SessionMeta {
            id: "cli_chat:sid123:dialog1".to_string(),
            channel: "cli_chat".to_string(),
            chat_id: "sid123".to_string(),
            dialog_id: "dialog1".to_string(),
            title: "测试".to_string(),
            created_at: 1000,
            last_active_at: 1000,
            message_count: 0,
            routing_info: None,
            deleted_at: None,
        };
        storage.upsert_session(&session_meta).await.unwrap();

        let msg = MessageMeta {
            id: "msg1".to_string(),
            session_id: session_meta.id.clone(),
            seq: 1,
            role: "user".to_string(),
            content: "你好".to_string(),
            media_refs: None,
            tool_call_id: None,
            tool_name: None,
            tool_calls: None,
            created_at: 1000,
        };

        let seq = storage.append_message(&session_meta.id, &msg).await.unwrap();
        assert_eq!(seq, 1);

        let loaded = storage.load_messages(&session_meta.id, 0).await.unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].content, "你好");
    }

    #[tokio::test]
    async fn test_touch_session() {
        let (storage, _dir) = create_test_storage().await;

        let meta = SessionMeta {
            id: "cli_chat:sid123:dialog1".to_string(),
            channel: "cli_chat".to_string(),
            chat_id: "sid123".to_string(),
            dialog_id: "dialog1".to_string(),
            title: "测试".to_string(),
            created_at: 1000,
            last_active_at: 1000,
            message_count: 0,
            routing_info: None,
            deleted_at: None,
        };
        storage.upsert_session(&meta).await.unwrap();

        storage.touch_session(&meta.id, 5, 2000).await.unwrap();

        let loaded = storage.get_session(&meta.id).await.unwrap();
        assert_eq!(loaded.message_count, 5);
        assert_eq!(loaded.last_active_at, 2000);
    }
}
```

> Note: the `tempfile` dependency is already present in `Cargo.toml`. Rust has no built-in `defer`; each test keeps the returned `TempDir` guard (`_dir`) alive instead, and the temporary directory is removed when the guard is dropped at the end of the test.

**Step 2: Run the tests**

Run: `cargo test storage::tests --lib 2>&1`
Expected: all 6 tests PASS

**Step 3: Commit**

```bash
git add src/storage/mod.rs
git commit -m "test(storage): write Storage unit tests"
```

---

## Summary

| Task | Files changed | Key deliverable |
|------|---------------|-----------------|
| 1 | `Cargo.toml` | sqlx dependency |
| 2 | `src/storage/error.rs` | StorageError |
| 3 | `src/storage/{mod.rs,session.rs,message.rs}`, `src/lib.rs` | Module skeleton |
| 4 | `src/storage/mod.rs` | Storage struct + init_schema |
| 5 | `src/storage/session.rs`, `message.rs`, `mod.rs` | SessionMeta, MessageMeta |
| 6 | `src/storage/session.rs` | Session CRUD |
| 7 | `src/storage/message.rs` | Message CRUD |
| 8 | `src/storage/mod.rs` | append_message_with_retry |
| 9 | `src/storage/mod.rs` | 6 unit tests |

**After Phase 1:** The Storage module can be used on its own, provides complete persistence, and can be safely integrated into Session and SessionManager.
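
The retry policy described in Task 8 (one initial attempt, then retries after 100, 200, and 300 ms) can be tried out without sqlx or tokio. The sketch below is a synchronous, standard-library-only stand-in and not the plan's implementation: `retry_with_backoff` and `flaky_demo` are hypothetical names introduced here for illustration, and `std::thread::sleep` replaces `tokio::time::sleep`.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Runs `op` once, then retries after each delay in `delays_ms` until it
/// succeeds or the delays are exhausted. Returns the last error on failure.
/// (Hypothetical helper, not part of the Storage module.)
pub fn retry_with_backoff<T, E>(
    delays_ms: &[u64],
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut delays = delays_ms.iter();
    loop {
        match op() {
            Ok(value) => return Ok(value),
            Err(err) => match delays.next() {
                // A delay is left: back off, then try again.
                Some(&ms) => sleep(Duration::from_millis(ms)),
                // No delays left: give up with the most recent error.
                None => return Err(err),
            },
        }
    }
}

/// Hypothetical flaky operation: fails `failures` times, then returns 42.
/// Returns the final result together with the number of calls made.
pub fn flaky_demo(failures: u32, delays_ms: &[u64]) -> (Result<i64, String>, u32) {
    let mut calls = 0;
    let result = retry_with_backoff(delays_ms, || {
        calls += 1;
        if calls <= failures {
            Err(format!("attempt {calls} failed"))
        } else {
            Ok(42)
        }
    });
    (result, calls)
}
```

With three delays the operation runs at most four times. For example, `flaky_demo(2, &[100, 200, 300])` succeeds on the third call, while `flaky_demo(10, &[100, 200, 300])` gives up after the fourth call and returns that call's error.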