PicoBot/docs/plans/2026-04-28-phase1-storage-implementation.md
xiaoxixi 575f264773 docs: add Phase 1 Storage foundation implementation plan
9 tasks:
1. Add sqlx + sqlite dependencies
2. Create the StorageError type
3. Create the storage module skeleton
4. Implement the Storage struct + init_schema
5. Define SessionMeta / MessageMeta
6. Implement Session CRUD
7. Implement Message CRUD
8. Add write retry logic
9. Write 7 unit tests
2026-04-28 21:46:32 +08:00


# Phase 1: Storage Foundation Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
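**Goal:** Create the `src/storage/` module and implement a SQLite persistence layer that provides the Storage infrastructure for later Session extensions.
**Architecture:** Use `sqlx` + `sqlite` with an async connection pool (`SqlitePool`). All Storage operations are async, and `Storage` manages the pool's lifecycle internally.
**Tech Stack:** `sqlx` (sqlite, tokio), `serde`, `chrono` (timestamps), `tokio::time::sleep` (retry backoff)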
---
## Task 1: Add dependencies
**Files:**
- Modify: `Cargo.toml:36` (append to the end of `[dependencies]`)
**Step 1: Add the sqlx + sqlite dependency**
Append to the end of the `[dependencies]` section in `Cargo.toml`:
```toml
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite", "macros", "chrono"] }
```
**Step 2: Run cargo check to verify the dependency**
Run: `cargo check 2>&1`
Expected: no errors; dependency resolution succeeds
**Step 3: Commit**
```bash
git add Cargo.toml
git commit -m "deps: add sqlx + sqlite dependency"
```
---
## Task 2: Create the StorageError type
**Files:**
- Create: `src/storage/error.rs`
**Step 1: Write the StorageError enum**
```rust
use thiserror::Error;

#[derive(Error, Debug)]
pub enum StorageError {
    #[error("session not found: {0}")]
    NotFound(String),
    #[error("session already exists: {0}")]
    AlreadyExists(String),
    #[error("database error: {0}")]
    Database(#[from] sqlx::Error),
    #[error("serialization error: {0}")]
    Serialization(String),
}
```
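
To make the roles of `#[error(...)]` and `#[from]` concrete, here is a rough std-only sketch of what the derive provides: a `Display` message per variant and a `From` conversion so that `?` can lift a database error into `StorageError`. `DbError` is a hypothetical stand-in for `sqlx::Error`, and `run_query` and `load` are illustrative helpers; none of them are part of the plan, and the real derive also wires up `Error::source`.

```rust
use std::fmt;

/// Hypothetical stand-in for `sqlx::Error` (assumption, illustration only).
#[derive(Debug)]
pub struct DbError(pub String);

impl fmt::Display for DbError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

#[derive(Debug)]
pub enum StorageError {
    NotFound(String),
    Database(DbError),
}

// Roughly what `#[error("...")]` generates: one message per variant.
impl fmt::Display for StorageError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            StorageError::NotFound(id) => write!(f, "session not found: {}", id),
            StorageError::Database(e) => write!(f, "database error: {}", e),
        }
    }
}

impl std::error::Error for StorageError {}

// Roughly what `#[from]` generates: lets `?` convert the inner error.
impl From<DbError> for StorageError {
    fn from(e: DbError) -> Self {
        StorageError::Database(e)
    }
}

/// Illustrative helper: fails with a `DbError` when `ok` is false.
fn run_query(ok: bool) -> Result<u32, DbError> {
    if ok {
        Ok(1)
    } else {
        Err(DbError("disk I/O error".to_string()))
    }
}

/// The `?` operator converts `DbError` into `StorageError` through `From`.
pub fn load(ok: bool) -> Result<u32, StorageError> {
    let rows = run_query(ok)?;
    Ok(rows)
}
```

This is why the CRUD methods in later tasks can end a sqlx call with `.await?` while returning `Result<_, StorageError>`.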
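**Step 2: Verify the build**
Run: `cargo build --lib 2>&1 | head -30`
Expected: the build succeeds. `error.rs` is not compiled yet because nothing declares `mod storage` until Task 3, so this step only confirms that the crate still builds.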
**Step 3: Commit**
```bash
git add src/storage/error.rs
git commit -m "feat(storage): add StorageError type"
```
---
## Task 3: Create the Storage module skeleton
**Files:**
- Create: `src/storage/mod.rs`
- Create: `src/storage/session.rs`
- Create: `src/storage/message.rs`
**Step 1: Create `src/storage/mod.rs`**
```rust
pub mod error;
pub mod session;
pub mod message;
pub use error::StorageError;
```
**Step 2: Create `src/storage/session.rs` (empty stub)**
```rust
// Placeholder for Session CRUD operations
```
**Step 3: Create `src/storage/message.rs` (empty stub)**
```rust
// Placeholder for Message CRUD operations
```
**Step 4: Add the storage module to `src/lib.rs`**
Append to the end of `src/lib.rs`:
```rust
pub mod storage;
```
**Step 5: Verify the build**
Run: `cargo build --lib 2>&1`
Expected: the build succeeds (stub modules)
**Step 6: Commit**
```bash
git add src/storage/ src/lib.rs
git commit -m "feat(storage): create storage module skeleton"
```
---
## Task 4: Implement the Storage struct
**Files:**
- Modify: `src/storage/mod.rs`
**Step 1: Write the Storage struct and initialization logic**
```rust
use sqlx::{Pool, Sqlite, SqlitePool};
use std::path::Path;

pub struct Storage {
    pool: Pool<Sqlite>,
}

impl Storage {
    /// Opens the database, creating the file if it does not exist.
    pub async fn new(db_path: &Path) -> Result<Self, StorageError> {
        // `mode=rwc` opens read-write and creates the file when missing.
        let database_url = format!("sqlite:{}?mode=rwc", db_path.display());
        let pool = SqlitePool::connect(&database_url).await?;
        let storage = Self { pool };
        storage.init_schema().await?;
        Ok(storage)
    }

    /// Initializes the database schema. Every statement uses IF NOT EXISTS,
    /// so calling this on an existing database is harmless.
    /// The default session title '新对话' means "New conversation".
    async fn init_schema(&self) -> Result<(), StorageError> {
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS sessions (
                id TEXT PRIMARY KEY,
                channel TEXT NOT NULL,
                chat_id TEXT NOT NULL,
                dialog_id TEXT NOT NULL,
                title TEXT NOT NULL DEFAULT '新对话',
                created_at INTEGER NOT NULL,
                last_active_at INTEGER NOT NULL,
                message_count INTEGER DEFAULT 0,
                routing_info TEXT,
                deleted_at INTEGER,
                UNIQUE(channel, chat_id, dialog_id)
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            r#"
            CREATE INDEX IF NOT EXISTS idx_sessions_chat
            ON sessions(channel, chat_id, deleted_at)
            "#,
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS messages (
                id TEXT PRIMARY KEY,
                session_id TEXT NOT NULL,
                seq INTEGER NOT NULL,
                role TEXT NOT NULL,
                content TEXT NOT NULL,
                media_refs TEXT,
                tool_call_id TEXT,
                tool_name TEXT,
                tool_calls TEXT,
                created_at INTEGER NOT NULL,
                FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE
            )
            "#,
        )
        .execute(&self.pool)
        .await?;

        sqlx::query(
            r#"
            CREATE INDEX IF NOT EXISTS idx_messages_session_seq
            ON messages(session_id, seq)
            "#,
        )
        .execute(&self.pool)
        .await?;

        Ok(())
    }

    /// Returns a reference to the connection pool (used by the CRUD impls).
    pub(crate) fn pool(&self) -> &Pool<Sqlite> {
        &self.pool
    }
}
```
**Step 2: Verify the build**
Run: `cargo build --lib 2>&1`
Expected: the build succeeds
**Step 3: Commit**
```bash
git add src/storage/mod.rs
git commit -m "feat(storage): implement Storage struct and initialization"
```
---
## Task 5: Define the SessionMeta and MessageMeta data structures
**Files:**
- Modify: `src/storage/session.rs`
- Modify: `src/storage/message.rs`
**Step 1: Define SessionMeta in `session.rs`**
```rust
use serde::{Deserialize, Serialize};

/// One row of the `sessions` table. Timestamps are Unix milliseconds.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SessionMeta {
    pub id: String,
    pub channel: String,
    pub chat_id: String,
    pub dialog_id: String,
    pub title: String,
    pub created_at: i64,
    pub last_active_at: i64,
    pub message_count: i64,
    pub routing_info: Option<String>,
    /// `None` while the session is live; set by a soft delete.
    pub deleted_at: Option<i64>,
}
```
**Step 2: Define MessageMeta in `message.rs`**
```rust
use serde::{Deserialize, Serialize};

/// One row of the `messages` table. `seq` orders messages within a session.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MessageMeta {
    pub id: String,
    pub session_id: String,
    pub seq: i64,
    pub role: String,
    pub content: String,
    pub media_refs: Option<String>,
    pub tool_call_id: Option<String>,
    pub tool_name: Option<String>,
    pub tool_calls: Option<String>,
    pub created_at: i64,
}
```
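
The tests in Task 9 use session ids of the form `channel:chat_id:dialog_id`, and the CRUD code in Task 6 stores timestamps as Unix milliseconds. A minimal std-only sketch of two helpers that would produce such values follows; `session_id` and `now_millis` are hypothetical names, and treating the id as a plain `:`-joined string is an assumption drawn from the test fixtures, not something the plan mandates.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Hypothetical helper: joins the three key parts with ':' (assumed format).
pub fn session_id(channel: &str, chat_id: &str, dialog_id: &str) -> String {
    format!("{}:{}:{}", channel, chat_id, dialog_id)
}

/// Hypothetical helper: current time as Unix milliseconds, matching the
/// INTEGER timestamp columns. Returns 0 if the clock is before the epoch.
pub fn now_millis() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as i64)
        .unwrap_or(0)
}
```

If any of the three parts can itself contain `:`, the joined id is no longer unambiguous; the `UNIQUE(channel, chat_id, dialog_id)` constraint in the schema still guards uniqueness in that case.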
**Step 3: Verify the build**
Run: `cargo build --lib 2>&1`
Expected: the build succeeds
**Step 4: Commit**
```bash
git add src/storage/session.rs src/storage/message.rs
git commit -m "feat(storage): define SessionMeta and MessageMeta data structures"
```
---
## Task 6: Implement Session CRUD operations
**Files:**
- Modify: `src/storage/session.rs`
**Step 1: Write `upsert_session` and the other Session CRUD methods**
```rust
use sqlx::{sqlite::SqliteRow, Row};

use super::{Storage, StorageError};

/// Maps a `sessions` row to `SessionMeta`. `Row::get` panics on a missing
/// column, so every SELECT below must list all ten columns.
fn session_from_row(row: SqliteRow) -> SessionMeta {
    SessionMeta {
        id: row.get("id"),
        channel: row.get("channel"),
        chat_id: row.get("chat_id"),
        dialog_id: row.get("dialog_id"),
        title: row.get("title"),
        created_at: row.get("created_at"),
        last_active_at: row.get("last_active_at"),
        message_count: row.get("message_count"),
        routing_info: row.get("routing_info"),
        deleted_at: row.get("deleted_at"),
    }
}

impl Storage {
    /// Inserts a session, or updates its mutable fields if the id already exists.
    pub async fn upsert_session(&self, meta: &SessionMeta) -> Result<(), StorageError> {
        sqlx::query(
            r#"
            INSERT INTO sessions (id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            ON CONFLICT(id) DO UPDATE SET
                title = excluded.title,
                last_active_at = excluded.last_active_at,
                message_count = excluded.message_count,
                routing_info = excluded.routing_info,
                deleted_at = excluded.deleted_at
            "#,
        )
        .bind(&meta.id)
        .bind(&meta.channel)
        .bind(&meta.chat_id)
        .bind(&meta.dialog_id)
        .bind(&meta.title)
        .bind(meta.created_at)
        .bind(meta.last_active_at)
        .bind(meta.message_count)
        .bind(&meta.routing_info)
        .bind(meta.deleted_at)
        .execute(self.pool())
        .await?;
        Ok(())
    }

    /// Fetches a live (not soft-deleted) session by id.
    pub async fn get_session(&self, id: &str) -> Result<SessionMeta, StorageError> {
        let row = sqlx::query(
            r#"
            SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at
            FROM sessions WHERE id = ? AND deleted_at IS NULL
            "#,
        )
        .bind(id)
        .fetch_optional(self.pool())
        .await?
        .ok_or_else(|| StorageError::NotFound(id.to_string()))?;
        Ok(session_from_row(row))
    }

    /// Lists live sessions for a chat, most recently active first.
    pub async fn list_sessions(
        &self,
        channel: &str,
        chat_id: &str,
        limit: i64,
    ) -> Result<Vec<SessionMeta>, StorageError> {
        let rows = sqlx::query(
            r#"
            SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at
            FROM sessions
            WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL
            ORDER BY last_active_at DESC
            LIMIT ?
            "#,
        )
        .bind(channel)
        .bind(chat_id)
        .bind(limit)
        .fetch_all(self.pool())
        .await?;
        Ok(rows.into_iter().map(session_from_row).collect())
    }

    /// Updates the message count and last-active timestamp of a session.
    pub async fn touch_session(
        &self,
        id: &str,
        message_count: i64,
        last_active_at: i64,
    ) -> Result<(), StorageError> {
        sqlx::query(
            r#"
            UPDATE sessions SET message_count = ?, last_active_at = ?
            WHERE id = ?
            "#,
        )
        .bind(message_count)
        .bind(last_active_at)
        .bind(id)
        .execute(self.pool())
        .await?;
        Ok(())
    }

    /// Marks a session as deleted by setting `deleted_at` to the current time.
    pub async fn soft_delete_session(&self, id: &str) -> Result<(), StorageError> {
        let now = chrono::Utc::now().timestamp_millis();
        sqlx::query(r#"UPDATE sessions SET deleted_at = ? WHERE id = ?"#)
            .bind(now)
            .bind(id)
            .execute(self.pool())
            .await?;
        Ok(())
    }

    /// Finds the most recently active, unexpired session under channel:chat_id.
    pub async fn find_active_session(
        &self,
        channel: &str,
        chat_id: &str,
        ttl_millis: i64,
    ) -> Result<Option<SessionMeta>, StorageError> {
        let cutoff = chrono::Utc::now().timestamp_millis() - ttl_millis;
        let row = sqlx::query(
            r#"
            SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at
            FROM sessions
            WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL AND last_active_at > ?
            ORDER BY last_active_at DESC
            LIMIT 1
            "#,
        )
        .bind(channel)
        .bind(chat_id)
        .bind(cutoff)
        .fetch_optional(self.pool())
        .await?;
        Ok(row.map(session_from_row))
    }
}
```
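> Note: these methods live in an `impl Storage` block inside `session.rs`, so that file must import `Storage` and `StorageError` from the parent module (`use super::{Storage, StorageError};`). `pool()` is `pub(crate)`, which makes it callable from any module in the crate, including `session.rs`.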
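
The selection rule in `find_active_session` (not soft-deleted, last active strictly after `now - ttl`, newest first) can be checked without a database. The sketch below restates that WHERE / ORDER BY clause over an in-memory slice; `SessionRow` and `find_active` are hypothetical names used only for illustration, and `now_millis` is passed in so the rule is deterministic.

```rust
/// Hypothetical, trimmed-down view of a `sessions` row.
#[derive(Debug, Clone, PartialEq)]
pub struct SessionRow {
    pub id: String,
    pub last_active_at: i64,
    pub deleted_at: Option<i64>,
}

/// In-memory equivalent of:
///   WHERE deleted_at IS NULL AND last_active_at > cutoff
///   ORDER BY last_active_at DESC LIMIT 1
pub fn find_active(
    rows: &[SessionRow],
    now_millis: i64,
    ttl_millis: i64,
) -> Option<&SessionRow> {
    let cutoff = now_millis - ttl_millis;
    rows.iter()
        .filter(|r| r.deleted_at.is_none() && r.last_active_at > cutoff)
        .max_by_key(|r| r.last_active_at)
}
```

Because the comparison is strict, a session whose `last_active_at` equals the cutoff counts as expired.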
**Step 2: Verify the build**
Run: `cargo build --lib 2>&1`
Expected: the build succeeds
**Step 3: Commit**
```bash
git add src/storage/session.rs
git commit -m "feat(storage): implement Session CRUD operations"
```
---
## Task 7: Implement Message CRUD operations
**Files:**
- Modify: `src/storage/message.rs`
**Step 1: Write the Message CRUD methods**
```rust
use sqlx::Row;

use super::{Storage, StorageError};

impl Storage {
    /// Inserts one message and returns its `seq`.
    /// The row is stored under the `session_id` argument; `msg.session_id` is not read.
    pub async fn append_message(
        &self,
        session_id: &str,
        msg: &MessageMeta,
    ) -> Result<i64, StorageError> {
        sqlx::query(
            r#"
            INSERT INTO messages (id, session_id, seq, role, content, media_refs, tool_call_id, tool_name, tool_calls, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&msg.id)
        .bind(session_id)
        .bind(msg.seq)
        .bind(&msg.role)
        .bind(&msg.content)
        .bind(&msg.media_refs)
        .bind(&msg.tool_call_id)
        .bind(&msg.tool_name)
        .bind(&msg.tool_calls)
        .bind(msg.created_at)
        .execute(self.pool())
        .await?;
        Ok(msg.seq)
    }

    /// Appends messages one by one. There is no transaction here, so a failure
    /// midway leaves the earlier messages stored.
    pub async fn append_messages(
        &self,
        session_id: &str,
        msgs: &[MessageMeta],
    ) -> Result<Vec<i64>, StorageError> {
        let mut seqs = Vec::with_capacity(msgs.len());
        for msg in msgs {
            let seq = self.append_message(session_id, msg).await?;
            seqs.push(seq);
        }
        Ok(seqs)
    }

    /// Loads the messages of a session with `seq >= from_seq`, in ascending order.
    pub async fn load_messages(
        &self,
        session_id: &str,
        from_seq: i64,
    ) -> Result<Vec<MessageMeta>, StorageError> {
        let rows = sqlx::query(
            r#"
            SELECT id, session_id, seq, role, content, media_refs, tool_call_id, tool_name, tool_calls, created_at
            FROM messages
            WHERE session_id = ? AND seq >= ?
            ORDER BY seq ASC
            "#,
        )
        .bind(session_id)
        .bind(from_seq)
        .fetch_all(self.pool())
        .await?;
        Ok(rows
            .into_iter()
            .map(|row| MessageMeta {
                id: row.get("id"),
                session_id: row.get("session_id"),
                seq: row.get("seq"),
                role: row.get("role"),
                content: row.get("content"),
                media_refs: row.get("media_refs"),
                tool_call_id: row.get("tool_call_id"),
                tool_name: row.get("tool_name"),
                tool_calls: row.get("tool_calls"),
                created_at: row.get("created_at"),
            })
            .collect())
    }

    /// Deletes every message that belongs to a session.
    pub async fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> {
        sqlx::query(r#"DELETE FROM messages WHERE session_id = ?"#)
            .bind(session_id)
            .execute(self.pool())
            .await?;
        Ok(())
    }
}
```
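> Note: as with the Session methods, this `impl Storage` block lives in `message.rs`. It needs `use super::{Storage, StorageError};` and reaches `pool()` through its `pub(crate)` visibility.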
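
`append_message` stores whatever `seq` the caller supplies, and `load_messages` returns rows with `seq >= from_seq` in ascending order. One way a caller might allocate `seq` values is to continue from the session's `message_count`; that 1-based, gap-free numbering is an assumption (the Task 9 test happens to use `seq: 1` for the first message), and `next_seqs` and `load_from` are hypothetical helpers. A std-only sketch:

```rust
/// Hypothetical allocator: the next `batch_len` seq values after
/// `message_count` messages already stored (assumes 1-based, gap-free seqs).
pub fn next_seqs(message_count: i64, batch_len: usize) -> Vec<i64> {
    (1..=batch_len as i64)
        .map(|offset| message_count + offset)
        .collect()
}

/// In-memory equivalent of `WHERE seq >= from_seq ORDER BY seq ASC`
/// over `(seq, content)` pairs.
pub fn load_from(stored: &[(i64, String)], from_seq: i64) -> Vec<(i64, String)> {
    let mut out: Vec<(i64, String)> = stored
        .iter()
        .filter(|(seq, _)| *seq >= from_seq)
        .cloned()
        .collect();
    out.sort_by_key(|(seq, _)| *seq);
    out
}
```

Passing the last seen `seq + 1` as `from_seq` then yields only the messages appended since the previous load.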
**Step 2: Verify the build**
Run: `cargo build --lib 2>&1`
Expected: the build succeeds
**Step 3: Commit**
```bash
git add src/storage/message.rs
git commit -m "feat(storage): implement Message CRUD operations"
```
---
## Task 8: Implement write retry logic
**Files:**
- Modify: `src/storage/mod.rs`
**Step 1: Add an `append_message` variant with retries to Storage**
Add to `mod.rs` as an additional `impl Storage` block:
```rust
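use self::message::MessageMeta;
use tokio::time::{sleep, Duration};

impl Storage {
    /// Appends a message, retrying on failure.
    /// One initial attempt plus up to 3 retries (100/200/300 ms backoff);
    /// returns the last error if every attempt fails.
    pub async fn append_message_with_retry(
        &self,
        session_id: &str,
        msg: &MessageMeta,
    ) -> Result<i64, StorageError> {
        const RETRY_DELAYS_MS: [u64; 3] = [100, 200, 300];
        let mut attempt = 0;
        loop {
            match self.append_message(session_id, msg).await {
                Ok(seq) => return Ok(seq),
                // A delay is still available: log, back off, and try again.
                Err(e) if attempt < RETRY_DELAYS_MS.len() => {
                    tracing::warn!("Storage write failed, retrying: {}", e);
                    sleep(Duration::from_millis(RETRY_DELAYS_MS[attempt])).await;
                    attempt += 1;
                }
                // All delays used up: give up and surface the last error.
                Err(e) => {
                    tracing::error!("Storage write failed after retries: {}", e);
                    return Err(e);
                }
            }
        }
    }
}
```
> Note: `mod.rs` needs `MessageMeta` in scope (`use self::message::MessageMeta;`). `sqlx::Row` is not required here because this method reads no rows.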
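
The retry pattern can be tried in isolation with a synchronous, std-only sketch. `retry_with_backoff` is a hypothetical helper, not part of the plan: it makes one initial attempt plus one retry per entry in `delays_ms`, sleeping for that entry before each retry, and returns the last error once the delays run out.

```rust
use std::thread::sleep;
use std::time::Duration;

/// Hypothetical helper: runs `op`, retrying once per entry in `delays_ms`.
/// Returns the first success, or the last error after all retries fail.
pub fn retry_with_backoff<T, E, F>(delays_ms: &[u64], mut op: F) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
{
    let mut attempt = 0;
    loop {
        match op() {
            Ok(value) => return Ok(value),
            // A delay is still available: back off, then try again.
            Err(_) if attempt < delays_ms.len() => {
                sleep(Duration::from_millis(delays_ms[attempt]));
                attempt += 1;
            }
            // No delays left: surface the last error to the caller.
            Err(e) => return Err(e),
        }
    }
}
```

With `&[100, 200, 300]` this performs at most four calls to `op`. It retries on every error; a production version would probably skip retries for errors that cannot succeed on a second try, such as a primary-key conflict.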
**Step 2: Verify the build**
Run: `cargo build --lib 2>&1`
Expected: the build succeeds
**Step 3: Commit**
```bash
git add src/storage/mod.rs
git commit -m "feat(storage): add write retry logic"
```
---
## Task 9: Write the Storage unit tests
**Files:**
- Modify: `src/storage/mod.rs` (add a test module)
**Step 1: Write the Storage tests**
Append to the end of `src/storage/mod.rs`:
```rust
#[cfg(test)]
mod tests {
    use super::message::MessageMeta;
    use super::session::SessionMeta;
    use super::*;
    use tempfile::{tempdir, TempDir};

    /// Creates a Storage backed by a temporary database file. Keep the returned
    /// `TempDir` alive for the whole test; dropping it deletes the directory.
    async fn create_test_storage() -> (Storage, TempDir) {
        let dir = tempdir().unwrap();
        let db_path = dir.path().join("test.db");
        let storage = Storage::new(&db_path).await.unwrap();
        (storage, dir)
    }

    /// Session fixture for channel `cli_chat`, chat `sid123`.
    fn sample_session(dialog_id: &str) -> SessionMeta {
        SessionMeta {
            id: format!("cli_chat:sid123:{}", dialog_id),
            channel: "cli_chat".to_string(),
            chat_id: "sid123".to_string(),
            dialog_id: dialog_id.to_string(),
            title: "测试会话".to_string(),
            created_at: 1000,
            last_active_at: 1000,
            message_count: 0,
            routing_info: None,
            deleted_at: None,
        }
    }

    /// Message fixture with `seq = 1` for the given session.
    fn sample_message(session_id: &str) -> MessageMeta {
        MessageMeta {
            id: "msg1".to_string(),
            session_id: session_id.to_string(),
            seq: 1,
            role: "user".to_string(),
            content: "你好".to_string(),
            media_refs: None,
            tool_call_id: None,
            tool_name: None,
            tool_calls: None,
            created_at: 1000,
        }
    }

    #[tokio::test]
    async fn test_upsert_and_get_session() {
        let (storage, _dir) = create_test_storage().await;
        let meta = SessionMeta {
            routing_info: Some(r#"{"type":"cli"}"#.to_string()),
            ..sample_session("dialog1")
        };
        storage.upsert_session(&meta).await.unwrap();
        let loaded = storage.get_session(&meta.id).await.unwrap();
        assert_eq!(loaded.title, "测试会话");
        assert_eq!(loaded.channel, "cli_chat");
    }

    #[tokio::test]
    async fn test_get_nonexistent_session() {
        let (storage, _dir) = create_test_storage().await;
        let result = storage.get_session("nonexistent").await;
        assert!(matches!(result, Err(StorageError::NotFound(_))));
    }

    #[tokio::test]
    async fn test_list_sessions() {
        let (storage, _dir) = create_test_storage().await;
        for i in 0..5i64 {
            let meta = SessionMeta {
                title: format!("会话{}", i),
                created_at: i * 1000,
                last_active_at: i * 1000,
                message_count: i,
                ..sample_session(&format!("dialog{}", i))
            };
            storage.upsert_session(&meta).await.unwrap();
        }
        let sessions = storage.list_sessions("cli_chat", "sid123", 10).await.unwrap();
        assert_eq!(sessions.len(), 5);
        // Sorted by last_active_at DESC
        assert_eq!(sessions[0].dialog_id, "dialog4");
    }

    #[tokio::test]
    async fn test_soft_delete() {
        let (storage, _dir) = create_test_storage().await;
        let meta = sample_session("dialog1");
        storage.upsert_session(&meta).await.unwrap();
        storage.soft_delete_session(&meta.id).await.unwrap();
        let result = storage.get_session(&meta.id).await;
        assert!(matches!(result, Err(StorageError::NotFound(_))));
    }

    #[tokio::test]
    async fn test_append_and_load_messages() {
        let (storage, _dir) = create_test_storage().await;
        let session_meta = sample_session("dialog1");
        storage.upsert_session(&session_meta).await.unwrap();
        let msg = sample_message(&session_meta.id);
        let seq = storage.append_message(&session_meta.id, &msg).await.unwrap();
        assert_eq!(seq, 1);
        let loaded = storage.load_messages(&session_meta.id, 0).await.unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].content, "你好");
    }

    #[tokio::test]
    async fn test_clear_messages() {
        let (storage, _dir) = create_test_storage().await;
        let session_meta = sample_session("dialog1");
        storage.upsert_session(&session_meta).await.unwrap();
        let msg = sample_message(&session_meta.id);
        storage.append_message(&session_meta.id, &msg).await.unwrap();
        storage.clear_messages(&session_meta.id).await.unwrap();
        let loaded = storage.load_messages(&session_meta.id, 0).await.unwrap();
        assert!(loaded.is_empty());
    }

    #[tokio::test]
    async fn test_touch_session() {
        let (storage, _dir) = create_test_storage().await;
        let meta = sample_session("dialog1");
        storage.upsert_session(&meta).await.unwrap();
        storage.touch_session(&meta.id, 5, 2000).await.unwrap();
        let loaded = storage.get_session(&meta.id).await.unwrap();
        assert_eq!(loaded.message_count, 5);
        assert_eq!(loaded.last_active_at, 2000);
    }
}
```
> `tempfile` must be listed in `Cargo.toml` (it already is). No `defer` helper is needed: `TempDir` deletes its directory when dropped, so each test keeps the returned guard alive in `_dir`.
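
Cleanup that should run when a test ends is usually expressed in Rust as a value whose `Drop` impl does the work, which is how `tempfile::TempDir` removes its directory. A minimal std-only sketch of that pattern follows; `CleanupGuard` is a hypothetical type shown only to illustrate the mechanism.

```rust
/// Hypothetical guard: runs the stored closure exactly once, when dropped.
pub struct CleanupGuard<F: FnOnce()> {
    on_drop: Option<F>,
}

impl<F: FnOnce()> CleanupGuard<F> {
    pub fn new(f: F) -> Self {
        Self { on_drop: Some(f) }
    }
}

impl<F: FnOnce()> Drop for CleanupGuard<F> {
    fn drop(&mut self) {
        // `take()` moves the closure out so it can be called by value.
        if let Some(f) = self.on_drop.take() {
            f();
        }
    }
}
```

Bind the guard to a named variable such as `_guard` or `_dir`. A bare `let _ = ...` drops the value immediately, which would run the cleanup before the test body.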
**Step 2: Run the tests**
Run: `cargo test storage::tests --lib 2>&1`
Expected: all 7 tests PASS
**Step 3: Commit**
```bash
git add src/storage/mod.rs
git commit -m "test(storage): write Storage unit tests"
```
---
## Summary
| Task | Files changed | Key deliverable |
|------|---------------|-----------------|
| 1 | `Cargo.toml` | sqlx dependency |
| 2 | `src/storage/error.rs` | StorageError |
| 3 | `src/storage/{mod.rs,session.rs,message.rs}`, `src/lib.rs` | Module skeleton |
| 4 | `src/storage/mod.rs` | Storage struct + init_schema |
| 5 | `src/storage/session.rs`, `message.rs` | SessionMeta, MessageMeta |
| 6 | `src/storage/session.rs` | Session CRUD |
| 7 | `src/storage/message.rs` | Message CRUD |
| 8 | `src/storage/mod.rs` | append_message_with_retry |
| 9 | `src/storage/mod.rs` | 7 unit tests |
**After Phase 1:** the Storage module can be used on its own, provides complete persistence, and can be safely integrated into Session and SessionManager.