PicoBot/docs/plans/2026-04-28-phase1-storage-implementation.md
xiaoxixi 575f264773 docs: add Phase 1 Storage foundation implementation plan
9 tasks:
1. Add the sqlx + sqlite dependency
2. Create the StorageError type
3. Create the storage module skeleton
4. Implement the main Storage struct + init_schema
5. Define SessionMeta / MessageMeta
6. Implement Session CRUD
7. Implement Message CRUD
8. Add write retry logic
9. Write 7 unit tests
2026-04-28 21:46:32 +08:00


Phase 1: Storage Foundation Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

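Goal: Create the src/storage/ module and implement a SQLite persistence layer that provides the Storage infrastructure for later Session extensions.

Architecture: Use sqlx + sqlite with an asynchronous connection pool (SqlitePool). All Storage operations are async, and Storage manages the lifecycle of the connection pool internally.

Tech Stack: sqlx (sqlite, tokio), serde, chrono (timestamps), tokio::time::sleep (retry backoff)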


Task 1: Add dependencies

Files:

  • Modify: Cargo.toml:36 (append to the end of [dependencies])

Step 1: Add the sqlx + sqlite dependency

Append to the end of [dependencies] in Cargo.toml:

sqlx = { version = "0.8", features = ["sqlite", "runtime-tokio", "macros", "chrono"] }

Step 2: Run cargo check to verify the dependency

Run: cargo check 2>&1
Expected: no errors; dependency resolution succeeds

Step 3: Commit

git add Cargo.toml
git commit -m "deps: add sqlx + sqlite dependency"

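Task 2: Create the StorageError type

Files:

  • Create: src/storage/error.rs

Step 1: Write the StorageError enum (it uses the thiserror crate, which Task 1 does not add, so thiserror must already be listed in Cargo.toml)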

use thiserror::Error;

#[derive(Error, Debug)]
pub enum StorageError {
    #[error("session not found: {0}")]
    NotFound(String),

    #[error("session already exists: {0}")]
    AlreadyExists(String),

    #[error("database error: {0}")]
    Database(#[from] sqlx::Error),

    #[error("serialization error: {0}")]
    Serialization(String),
}
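
To make the two derive attributes less opaque, here is a hand-written, stdlib-only sketch of roughly what `#[error("...")]` and `#[from]` provide. Everything in it is illustrative: `SketchError`, `parse_count`, and `find` are hypothetical names, and `ParseIntError` stands in for `sqlx::Error` so the example needs no external crates.

```rust
use std::fmt;
use std::num::ParseIntError;

/// Hand-written stand-in for two `StorageError` variants (illustration only).
/// `ParseIntError` plays the role of `sqlx::Error`.
#[derive(Debug)]
enum SketchError {
    NotFound(String),
    Database(ParseIntError),
}

// Roughly what `#[error("...")]` provides: one Display message per variant.
impl fmt::Display for SketchError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SketchError::NotFound(id) => write!(f, "session not found: {}", id),
            SketchError::Database(err) => write!(f, "database error: {}", err),
        }
    }
}

impl std::error::Error for SketchError {}

// Roughly what `#[from]` provides: a From impl, so `?` converts automatically.
impl From<ParseIntError> for SketchError {
    fn from(err: ParseIntError) -> Self {
        SketchError::Database(err)
    }
}

/// Hypothetical caller: `?` turns a ParseIntError into SketchError::Database.
fn parse_count(raw: &str) -> Result<i64, SketchError> {
    let count = raw.parse::<i64>()?;
    Ok(count)
}

/// Hypothetical lookup that always misses, to show the NotFound message.
fn find(id: &str) -> Result<(), SketchError> {
    Err(SketchError::NotFound(id.to_string()))
}
```

This is why the CRUD methods in later tasks can end a sqlx call with `.await?` and still return `Result<_, StorageError>`.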

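Step 2: Verify compilation

Run: cargo build --lib 2>&1 | head -30
Expected: the build succeeds, but error.rs is not compiled yet because no module declaration references it. The file is first type-checked once the storage module is wired in (Task 3).

Step 3: Commit

git add src/storage/error.rs
git commit -m "feat(storage): add StorageError type"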

Task 3: Create the storage module skeleton

Files:

  • Create: src/storage/mod.rs
  • Create: src/storage/session.rs
  • Create: src/storage/message.rs

Step 1: Create src/storage/mod.rs

pub mod error;
pub mod session;
pub mod message;

pub use error::StorageError;

Step 2: Create src/storage/session.rs (empty shell)

// Placeholder for Session CRUD operations

Step 3: Create src/storage/message.rs (empty shell)

// Placeholder for Message CRUD operations

Step 4: Add the storage module to src/lib.rs

Append to the end of src/lib.rs:

pub mod storage;

Step 5: Verify compilation

Run: cargo build --lib 2>&1
Expected: build succeeds (empty shell modules)

Step 6: Commit

git add src/storage/ src/lib.rs
git commit -m "feat(storage): create storage module skeleton"

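Task 4: Implement the main Storage struct

Files:

  • Modify: src/storage/mod.rs

Step 1: Write the Storage struct and initialization logic (keep the mod declarations and the pub use from Task 3 at the top of the file)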

use sqlx::{Pool, Sqlite, SqlitePool};
use std::path::Path;

pub struct Storage {
    pool: Pool<Sqlite>,
}

impl Storage {
    /// Open or create the database
    pub async fn new(db_path: &Path) -> Result<Self, StorageError> {
        let database_url = format!("sqlite:{}?mode=rwc", db_path.display());
        let pool = SqlitePool::connect(&database_url).await?;

        let storage = Self { pool };
        storage.init_schema().await?;
        Ok(storage)
    }

    /// Initialize the database schema
    async fn init_schema(&self) -> Result<(), StorageError> {
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS sessions (
                id              TEXT PRIMARY KEY,
                channel        TEXT NOT NULL,
                chat_id        TEXT NOT NULL,
                dialog_id      TEXT NOT NULL,
                title          TEXT NOT NULL DEFAULT '新对话',
                created_at     INTEGER NOT NULL,
                last_active_at INTEGER NOT NULL,
                message_count  INTEGER DEFAULT 0,
                routing_info   TEXT,
                deleted_at     INTEGER,
                UNIQUE(channel, chat_id, dialog_id)
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            r#"
            CREATE INDEX IF NOT EXISTS idx_sessions_chat
            ON sessions(channel, chat_id, deleted_at)
            "#,
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS messages (
                id          TEXT PRIMARY KEY,
                session_id  TEXT NOT NULL,
                seq         INTEGER NOT NULL,
                role        TEXT NOT NULL,
                content     TEXT NOT NULL,
                media_refs  TEXT,
                tool_call_id TEXT,
                tool_name   TEXT,
                tool_calls  TEXT,
                created_at  INTEGER NOT NULL,
                FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            r#"
            CREATE INDEX IF NOT EXISTS idx_messages_session_seq
            ON messages(session_id, seq)
            "#,
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Return a reference to the connection pool (for internal CRUD use)
    pub(crate) fn pool(&self) -> &Pool<Sqlite> {
        &self.pool
    }
}
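
The connection string that Storage::new passes to SqlitePool::connect is an ordinary string built from the path. The sketch below isolates that one step so it can be checked without a database; `sqlite_url` is a hypothetical helper name and the example path is made up. The `mode=rwc` query parameter asks SQLite to open the file read-write and create it if it does not exist.

```rust
use std::path::Path;

/// Builds the URL in the same way as `Storage::new`: `sqlite:<path>?mode=rwc`.
/// `sqlite_url` is a hypothetical helper, not part of the plan.
fn sqlite_url(db_path: &Path) -> String {
    format!("sqlite:{}?mode=rwc", db_path.display())
}
```

One caveat of the string form: a path containing `?` or `#` would be misread as part of the URL. If that matters, building the options with `SqliteConnectOptions::new().filename(path).create_if_missing(true)` is a possible alternative that avoids URL parsing.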

Step 2: Verify compilation

Run: cargo build --lib 2>&1
Expected: build succeeds

Step 3: Commit

git add src/storage/mod.rs
git commit -m "feat(storage): implement main Storage struct and initialization"

Task 5: Define the SessionMeta and MessageMeta data structures

Files:

  • Modify: src/storage/session.rs
  • Modify: src/storage/message.rs

Step 1: Define SessionMeta in session.rs

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionMeta {
    pub id: String,
    pub channel: String,
    pub chat_id: String,
    pub dialog_id: String,
    pub title: String,
    pub created_at: i64,
    pub last_active_at: i64,
    pub message_count: i64,
    pub routing_info: Option<String>,
    pub deleted_at: Option<i64>,
}

Step 2: Define MessageMeta in message.rs

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageMeta {
    pub id: String,
    pub session_id: String,
    pub seq: i64,
    pub role: String,
    pub content: String,
    pub media_refs: Option<String>,
    pub tool_call_id: Option<String>,
    pub tool_name: Option<String>,
    pub tool_calls: Option<String>,
    pub created_at: i64,
}

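Step 3: Verify compilation

Run: cargo build --lib 2>&1
Expected: build succeeds

Step 4: Commit

git add src/storage/session.rs src/storage/message.rs
git commit -m "feat(storage): define SessionMeta and MessageMeta data structures"

Task 6: Implement Session CRUD operations

Files:

  • Modify: src/storage/session.rs

Step 1: Write the Session CRUD methods (upsert_session, get_session, list_sessions, touch_session, soft_delete_session, find_active_session)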

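use sqlx::Row;
// SessionMeta is defined in this file; Storage and StorageError come from the parent module.
use super::{Storage, StorageError};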

impl Storage {
    pub async fn upsert_session(&self, meta: &SessionMeta) -> Result<(), StorageError> {
        sqlx::query(
            r#"
            INSERT INTO sessions (id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                title = excluded.title,
                last_active_at = excluded.last_active_at,
                message_count = excluded.message_count,
                routing_info = excluded.routing_info,
                deleted_at = excluded.deleted_at
            "#,
        )
        .bind(&meta.id)
        .bind(&meta.channel)
        .bind(&meta.chat_id)
        .bind(&meta.dialog_id)
        .bind(&meta.title)
        .bind(meta.created_at)
        .bind(meta.last_active_at)
        .bind(meta.message_count)
        .bind(&meta.routing_info)
        .bind(meta.deleted_at)
        .execute(self.pool())
        .await?;

        Ok(())
    }

    pub async fn get_session(&self, id: &str) -> Result<SessionMeta, StorageError> {
        let row = sqlx::query(
            r#"
            SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at
            FROM sessions WHERE id = ? AND deleted_at IS NULL
            "#,
        )
        .bind(id)
        .fetch_optional(self.pool())
        .await?
        .ok_or_else(|| StorageError::NotFound(id.to_string()))?;

        Ok(SessionMeta {
            id: row.get("id"),
            channel: row.get("channel"),
            chat_id: row.get("chat_id"),
            dialog_id: row.get("dialog_id"),
            title: row.get("title"),
            created_at: row.get("created_at"),
            last_active_at: row.get("last_active_at"),
            message_count: row.get("message_count"),
            routing_info: row.get("routing_info"),
            deleted_at: row.get("deleted_at"),
        })
    }

    pub async fn list_sessions(
        &self,
        channel: &str,
        chat_id: &str,
        limit: i64,
    ) -> Result<Vec<SessionMeta>, StorageError> {
        let rows = sqlx::query(
            r#"
            SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at
            FROM sessions
            WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL
            ORDER BY last_active_at DESC
            LIMIT ?
            "#,
        )
        .bind(channel)
        .bind(chat_id)
        .bind(limit)
        .fetch_all(self.pool())
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| SessionMeta {
                id: row.get("id"),
                channel: row.get("channel"),
                chat_id: row.get("chat_id"),
                dialog_id: row.get("dialog_id"),
                title: row.get("title"),
                created_at: row.get("created_at"),
                last_active_at: row.get("last_active_at"),
                message_count: row.get("message_count"),
                routing_info: row.get("routing_info"),
                deleted_at: row.get("deleted_at"),
            })
            .collect())
    }

    pub async fn touch_session(
        &self,
        id: &str,
        message_count: i64,
        last_active_at: i64,
    ) -> Result<(), StorageError> {
        sqlx::query(
            r#"
            UPDATE sessions SET message_count = ?, last_active_at = ?
            WHERE id = ?
            "#,
        )
        .bind(message_count)
        .bind(last_active_at)
        .bind(id)
        .execute(self.pool())
        .await?;

        Ok(())
    }

    pub async fn soft_delete_session(&self, id: &str) -> Result<(), StorageError> {
        let now = chrono::Utc::now().timestamp_millis();
        sqlx::query(
            r#"UPDATE sessions SET deleted_at = ? WHERE id = ?"#,
        )
        .bind(now)
        .bind(id)
        .execute(self.pool())
        .await?;

        Ok(())
    }

    /// Find the most recently active, non-expired session under channel:chat_id
    pub async fn find_active_session(
        &self,
        channel: &str,
        chat_id: &str,
        ttl_millis: i64,
    ) -> Result<Option<SessionMeta>, StorageError> {
        let cutoff = chrono::Utc::now().timestamp_millis() - ttl_millis;
        let row = sqlx::query(
            r#"
            SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at
            FROM sessions
            WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL AND last_active_at > ?
            ORDER BY last_active_at DESC
            LIMIT 1
            "#,
        )
        .bind(channel)
        .bind(chat_id)
        .bind(cutoff)
        .fetch_optional(self.pool())
        .await?;

        match row {
            Some(row) => Ok(Some(SessionMeta {
                id: row.get("id"),
                channel: row.get("channel"),
                chat_id: row.get("chat_id"),
                dialog_id: row.get("dialog_id"),
                title: row.get("title"),
                created_at: row.get("created_at"),
                last_active_at: row.get("last_active_at"),
                message_count: row.get("message_count"),
                routing_info: row.get("routing_info"),
                deleted_at: row.get("deleted_at"),
            })),
            None => Ok(None),
        }
    }
}

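Note: these CRUD methods call pool(), which is pub(crate). Because session.rs is a submodule of the same crate, its impl Storage block can call pool() directly, so the methods do not have to live in mod.rs. session.rs only needs to import Storage and StorageError from the parent module (use super::{Storage, StorageError};).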
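
The ON CONFLICT(id) clause in upsert_session overwrites only title, last_active_at, message_count, routing_info, and deleted_at. The channel, chat_id, dialog_id, and created_at of an existing row are left untouched. The in-memory model below is a rough sketch of that behavior using a subset of the columns; `Row` and `upsert` are hypothetical names, and a `HashMap` stands in for the sessions table.

```rust
use std::collections::HashMap;

/// Simplified stand-in for a row of the `sessions` table (illustration only).
#[derive(Debug, Clone, PartialEq)]
struct Row {
    id: String,
    channel: String,
    title: String,
    created_at: i64,
    last_active_at: i64,
    message_count: i64,
}

/// Mirrors `INSERT ... ON CONFLICT(id) DO UPDATE SET ...` for the modeled columns:
/// a new id inserts the whole row, an existing id updates only the mutable fields.
fn upsert(table: &mut HashMap<String, Row>, incoming: Row) {
    match table.get_mut(&incoming.id) {
        Some(existing) => {
            existing.title = incoming.title;
            existing.last_active_at = incoming.last_active_at;
            existing.message_count = incoming.message_count;
            // channel and created_at are deliberately not overwritten.
        }
        None => {
            table.insert(incoming.id.clone(), incoming);
        }
    }
}
```

A practical consequence is that callers can pass a freshly built SessionMeta on every update without accidentally resetting created_at.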

Step 2: Verify compilation

Run: cargo build --lib 2>&1
Expected: build succeeds

Step 3: Commit

git add src/storage/session.rs
git commit -m "feat(storage): implement Session CRUD operations"

Task 7: Implement Message CRUD operations

Files:

  • Modify: src/storage/message.rs

Step 1: Write Message CRUD

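use sqlx::Row;
// MessageMeta is defined in this file; Storage and StorageError come from the parent module.
use super::{Storage, StorageError};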

impl Storage {
    pub async fn append_message(&self, session_id: &str, msg: &MessageMeta) -> Result<i64, StorageError> {
        sqlx::query(
            r#"
            INSERT INTO messages (id, session_id, seq, role, content, media_refs, tool_call_id, tool_name, tool_calls, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&msg.id)
        .bind(session_id)
        .bind(msg.seq)
        .bind(&msg.role)
        .bind(&msg.content)
        .bind(&msg.media_refs)
        .bind(&msg.tool_call_id)
        .bind(&msg.tool_name)
        .bind(&msg.tool_calls)
        .bind(msg.created_at)
        .execute(self.pool())
        .await?;

        Ok(msg.seq)
    }

    pub async fn append_messages(
        &self,
        session_id: &str,
        msgs: &[MessageMeta],
    ) -> Result<Vec<i64>, StorageError> {
        let mut seqs = Vec::with_capacity(msgs.len());
        for msg in msgs {
            let seq = self.append_message(session_id, msg).await?;
            seqs.push(seq);
        }
        Ok(seqs)
    }

    pub async fn load_messages(
        &self,
        session_id: &str,
        from_seq: i64,
    ) -> Result<Vec<MessageMeta>, StorageError> {
        let rows = sqlx::query(
            r#"
            SELECT id, session_id, seq, role, content, media_refs, tool_call_id, tool_name, tool_calls, created_at
            FROM messages
            WHERE session_id = ? AND seq >= ?
            ORDER BY seq ASC
            "#,
        )
        .bind(session_id)
        .bind(from_seq)
        .fetch_all(self.pool())
        .await?;

        Ok(rows
            .into_iter()
            .map(|row| MessageMeta {
                id: row.get("id"),
                session_id: row.get("session_id"),
                seq: row.get("seq"),
                role: row.get("role"),
                content: row.get("content"),
                media_refs: row.get("media_refs"),
                tool_call_id: row.get("tool_call_id"),
                tool_name: row.get("tool_name"),
                tool_calls: row.get("tool_calls"),
                created_at: row.get("created_at"),
            })
            .collect())
    }

    pub async fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> {
        sqlx::query(r#"DELETE FROM messages WHERE session_id = ?"#)
            .bind(session_id)
            .execute(self.pool())
            .await?;
        Ok(())
    }
}

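Note: as with session.rs, this impl Storage block can stay in message.rs, because pool() is pub(crate) and therefore callable from any module in the crate. message.rs only needs to import Storage and StorageError from the parent module. Also note that append_messages inserts rows one at a time without a transaction, so a failure partway through leaves the earlier messages stored.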
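
The semantics of load_messages (rows with seq >= from_seq, in ascending seq order) can be checked against a small in-memory model. This is only a sketch: `Msg`, `load_from`, `next_seq`, and `msg` are hypothetical names, and `next_seq` additionally assumes that the caller assigns seq and that numbering starts at 1, which the plan does not state explicitly.

```rust
/// Minimal stand-in for a row of the `messages` table (illustration only).
#[derive(Debug, Clone, PartialEq)]
struct Msg {
    session_id: String,
    seq: i64,
    content: String,
}

/// In-memory equivalent of `load_messages`: same filter and ordering as the SQL.
fn load_from(all: &[Msg], session_id: &str, from_seq: i64) -> Vec<Msg> {
    let mut out: Vec<Msg> = all
        .iter()
        .filter(|m| m.session_id == session_id && m.seq >= from_seq)
        .cloned()
        .collect();
    out.sort_by_key(|m| m.seq);
    out
}

/// Hypothetical helper: the next sequence number for a session,
/// assuming numbering starts at 1 and the caller assigns `seq`.
fn next_seq(all: &[Msg], session_id: &str) -> i64 {
    all.iter()
        .filter(|m| m.session_id == session_id)
        .map(|m| m.seq)
        .max()
        .map_or(1, |last| last + 1)
}

/// Small constructor to keep examples short.
fn msg(session_id: &str, seq: i64, content: &str) -> Msg {
    Msg {
        session_id: session_id.to_string(),
        seq,
        content: content.to_string(),
    }
}
```

Passing from_seq = 0 returns the whole history of a session, which is how the test in Task 9 uses it.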

Step 2: Verify compilation

Run: cargo build --lib 2>&1
Expected: build succeeds

Step 3: Commit

git add src/storage/message.rs
git commit -m "feat(storage): implement Message CRUD operations"

Task 8: Implement write retry logic

Files:

  • Modify: src/storage/mod.rs

Step 1: Add append_message with retries to Storage

Add to the Storage impl block in mod.rs:

use tokio::time::{sleep, Duration};

impl Storage {
    /// Append a message with retry logic.
    /// After a failed attempt, retries up to 3 times (100/200/300 ms backoff);
    /// if the last retry also fails, returns that error.
    pub async fn append_message_with_retry(
        &self,
        session_id: &str,
        msg: &MessageMeta,
    ) -> Result<i64, StorageError> {
        // One delay per retry; the initial attempt is not counted.
        const DELAYS_MS: [u64; 3] = [100, 200, 300];
        let mut retries = 0;

        loop {
            match self.append_message(session_id, msg).await {
                Ok(seq) => return Ok(seq),
                Err(e) if retries < DELAYS_MS.len() => {
                    tracing::warn!("Storage write failed, retrying: {}", e);
                    sleep(Duration::from_millis(DELAYS_MS[retries])).await;
                    retries += 1;
                }
                Err(e) => {
                    tracing::error!("Storage write failed after retries: {}", e);
                    return Err(e);
                }
            }
        }
    }
}

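Note: this method reads no rows, so mod.rs does not need use sqlx::Row; for it. What mod.rs does need is MessageMeta in scope, for example via pub use message::MessageMeta; (and pub use session::SessionMeta; for the tests in Task 9).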
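
The retry policy itself can be exercised without a database or an async runtime. The following is a synchronous, stdlib-only sketch of a "one initial attempt, then one retry per configured delay" loop; `retry_with_backoff` is a hypothetical helper, not part of the plan, and it blocks the thread with `std::thread::sleep` where the real method would await `tokio::time::sleep`.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Runs `op` once, then retries after each delay in `delays_ms` while it keeps failing.
/// Returns the first success, or the error from the final attempt.
/// Hypothetical helper for illustration; blocking, not async.
fn retry_with_backoff<T, E>(
    delays_ms: &[u64],
    mut op: impl FnMut() -> Result<T, E>,
) -> Result<T, E> {
    let mut retries = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // Still have a delay left: wait, then try again.
            Err(_) if retries < delays_ms.len() => {
                sleep(Duration::from_millis(delays_ms[retries]));
                retries += 1;
            }
            // Delays exhausted: surface the last error to the caller.
            Err(err) => return Err(err),
        }
    }
}
```

With three delays the operation runs at most four times. A refinement worth considering is to retry only on errors that are plausibly transient (for example a busy or locked database), since retrying a constraint violation cannot succeed.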

Step 2: Verify compilation

Run: cargo build --lib 2>&1
Expected: build succeeds

Step 3: Commit

git add src/storage/mod.rs
git commit -m "feat(storage): add write retry logic"

Task 9: Write Storage unit tests

Files:

  • Modify: src/storage/mod.rs (add a test module)

Step 1: Write the Storage tests

Append to the end of src/storage/mod.rs:

#[cfg(test)]
mod tests {
    use super::*;
    use super::message::MessageMeta;
    use super::session::SessionMeta;
    use tempfile::{tempdir, TempDir};

    /// Creates a Storage backed by a fresh temporary directory.
    /// The returned TempDir must stay alive for the whole test:
    /// dropping it deletes the directory and the database file.
    async fn create_test_storage() -> (Storage, TempDir) {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let storage = Storage::new(&db_path).await.unwrap();

        (storage, dir)
    }

    #[tokio::test]
    async fn test_upsert_and_get_session() {
        let (storage, _dir) = create_test_storage().await;

        let meta = SessionMeta {
            id: "cli_chat:sid123:dialog1".to_string(),
            channel: "cli_chat".to_string(),
            chat_id: "sid123".to_string(),
            dialog_id: "dialog1".to_string(),
            title: "测试会话".to_string(),
            created_at: 1000,
            last_active_at: 1000,
            message_count: 0,
            routing_info: Some(r#"{"type":"cli"}"#.to_string()),
            deleted_at: None,
        };

        storage.upsert_session(&meta).await.unwrap();

        let loaded = storage.get_session(&meta.id).await.unwrap();
        assert_eq!(loaded.title, "测试会话");
        assert_eq!(loaded.channel, "cli_chat");
    }

    #[tokio::test]
    async fn test_get_nonexistent_session() {
        let (storage, _dir) = create_test_storage().await;

        let result = storage.get_session("nonexistent").await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), StorageError::NotFound(_)));
    }

    #[tokio::test]
    async fn test_list_sessions() {
        let (storage, _dir) = create_test_storage().await;

        for i in 0..5 {
            let meta = SessionMeta {
                id: format!("cli_chat:sid123:dialog{}", i),
                channel: "cli_chat".to_string(),
                chat_id: "sid123".to_string(),
                dialog_id: format!("dialog{}", i),
                title: format!("会话{}", i),
                created_at: i as i64 * 1000,
                last_active_at: i as i64 * 1000,
                message_count: i,
                routing_info: None,
                deleted_at: None,
            };
            storage.upsert_session(&meta).await.unwrap();
        }

        let sessions = storage.list_sessions("cli_chat", "sid123", 10).await.unwrap();
        assert_eq!(sessions.len(), 5);
        // Sorted by last_active_at DESC
        assert_eq!(sessions[0].dialog_id, "dialog4");
    }

    #[tokio::test]
    async fn test_soft_delete() {
        let (storage, _dir) = create_test_storage().await;

        let meta = SessionMeta {
            id: "cli_chat:sid123:dialog1".to_string(),
            channel: "cli_chat".to_string(),
            chat_id: "sid123".to_string(),
            dialog_id: "dialog1".to_string(),
            title: "测试".to_string(),
            created_at: 1000,
            last_active_at: 1000,
            message_count: 0,
            routing_info: None,
            deleted_at: None,
        };

        storage.upsert_session(&meta).await.unwrap();
        storage.soft_delete_session(&meta.id).await.unwrap();

        // A soft-deleted session is hidden from get_session.
        let result = storage.get_session(&meta.id).await;
        assert!(result.is_err());
        assert!(matches!(result.unwrap_err(), StorageError::NotFound(_)));
    }

    #[tokio::test]
    async fn test_append_and_load_messages() {
        let (storage, _dir) = create_test_storage().await;

        let session_meta = SessionMeta {
            id: "cli_chat:sid123:dialog1".to_string(),
            channel: "cli_chat".to_string(),
            chat_id: "sid123".to_string(),
            dialog_id: "dialog1".to_string(),
            title: "测试".to_string(),
            created_at: 1000,
            last_active_at: 1000,
            message_count: 0,
            routing_info: None,
            deleted_at: None,
        };
        storage.upsert_session(&session_meta).await.unwrap();

        let msg = MessageMeta {
            id: "msg1".to_string(),
            session_id: session_meta.id.clone(),
            seq: 1,
            role: "user".to_string(),
            content: "你好".to_string(),
            media_refs: None,
            tool_call_id: None,
            tool_name: None,
            tool_calls: None,
            created_at: 1000,
        };

        let seq = storage.append_message(&session_meta.id, &msg).await.unwrap();
        assert_eq!(seq, 1);

        let loaded = storage.load_messages(&session_meta.id, 0).await.unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].content, "你好");
    }

    #[tokio::test]
    async fn test_touch_session() {
        let (storage, _dir) = create_test_storage().await;

        let meta = SessionMeta {
            id: "cli_chat:sid123:dialog1".to_string(),
            channel: "cli_chat".to_string(),
            chat_id: "sid123".to_string(),
            dialog_id: "dialog1".to_string(),
            title: "测试".to_string(),
            created_at: 1000,
            last_active_at: 1000,
            message_count: 0,
            routing_info: None,
            deleted_at: None,
        };
        storage.upsert_session(&meta).await.unwrap();

        storage.touch_session(&meta.id, 5, 2000).await.unwrap();

        let loaded = storage.get_session(&meta.id).await.unwrap();
        assert_eq!(loaded.message_count, 5);
        assert_eq!(loaded.last_active_at, 2000);
    }
}

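The tempfile dependency is already present in Cargo.toml, so nothing needs to be added. Rust has no defer statement, and a helper such as fn defer<F: FnOnce()>(f: F) { f() } would run the closure immediately instead of at scope exit. The idiomatic equivalent is an RAII guard: TempDir deletes its directory when dropped, so holding the value until the end of each test gives the same cleanup.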
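
For cleanup that is not already tied to a value with its own Drop impl, a defer-style helper that really runs at scope exit can be sketched with a small guard type. The names `Defer`, `defer`, and `cleanup_ran_after_scope` are hypothetical; this is a minimal illustration of the pattern, not something the plan requires.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Runs the stored closure exactly once when the guard goes out of scope.
struct Defer<F: FnOnce()>(Option<F>);

impl<F: FnOnce()> Drop for Defer<F> {
    fn drop(&mut self) {
        // take() leaves None behind, so the closure cannot run twice.
        if let Some(f) = self.0.take() {
            f();
        }
    }
}

/// Wraps a closure in a guard; bind the result to a named variable
/// (for example `_guard`) so it lives until the end of the scope.
fn defer<F: FnOnce()>(f: F) -> Defer<F> {
    Defer(Some(f))
}

/// Usage: reports whether cleanup had run inside the scope and after it.
fn cleanup_ran_after_scope() -> (bool, bool) {
    let cleaned = Rc::new(Cell::new(false));
    let during = {
        let flag = Rc::clone(&cleaned);
        let _guard = defer(move || flag.set(true));
        cleaned.get() // read while the guard is still alive
    };
    (during, cleaned.get())
}
```

Note that binding the guard to a bare `_` instead of `_guard` would drop it immediately, which defeats the purpose.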

Step 2: Run the tests

Run: cargo test storage::tests --lib 2>&1
Expected: all 6 tests PASS

Step 3: Commit

git add src/storage/mod.rs
git commit -m "test(storage): add Storage unit tests"

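Summary

| Task | Files changed | Key deliverable |
|------|---------------|-----------------|
| 1 | Cargo.toml | sqlx dependency |
| 2 | src/storage/error.rs | StorageError |
| 3 | src/storage/{mod.rs,session.rs,message.rs}, src/lib.rs | Module skeleton |
| 4 | src/storage/mod.rs | Storage struct + init_schema |
| 5 | src/storage/session.rs, message.rs | SessionMeta, MessageMeta |
| 6 | src/storage/session.rs | Session CRUD |
| 7 | src/storage/message.rs | Message CRUD |
| 8 | src/storage/mod.rs | append_message_with_retry |
| 9 | src/storage/mod.rs | 6 unit tests |

After Phase 1: the Storage module can be used on its own, provides complete persistence, and can be safely integrated into Session and SessionManager.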