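# Incremental Startup Recovery Implementation Plan

> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

**Goal:** After a restart, PicoBot no longer loads the entire `messages` table. Instead it recovers incrementally from the `last_compressed_message_at` marker and substitutes Timeline entries for the portion that has already been compressed.

**Architecture:** Add a `last_compressed_message_at` column to the `sessions` table; add a `created_timelines` flag to the return value of `compress_if_needed`; on recovery, load messages incrementally by timestamp and prepend the 3 most recent Timelines; always derive `seq_counter` from `MAX(seq)` in SQLite.

**Tech Stack:** Rust, sqlx (SQLite), tokio
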
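
The recovery decision summarized above can be sketched as follows. This is a minimal illustration under stated assumptions, not the actual implementation: `RecoveryPlan` and `plan_recovery` are hypothetical names introduced here, and the count of 3 timelines is taken from the architecture summary.

```rust
/// Hypothetical summary of what a restart should load (names are illustrative only).
#[derive(Debug, PartialEq)]
pub enum RecoveryPlan {
    /// No compression has happened yet: load every message, as before.
    Full,
    /// Compression happened: prepend recent Timelines, then load only newer messages.
    Incremental { after_ts: i64, timeline_count: usize },
}

/// Chooses the plan from the persisted `last_compressed_message_at` marker.
pub fn plan_recovery(last_compressed_message_at: Option<i64>) -> RecoveryPlan {
    match last_compressed_message_at {
        Some(after_ts) => RecoveryPlan::Incremental { after_ts, timeline_count: 3 },
        None => RecoveryPlan::Full,
    }
}
```
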
---
### Task 1: Add the column to SessionMeta and the database DDL
**Files:**
- Modify: `src/storage/session.rs:15`
- Modify: `src/storage/mod.rs:44-45` (DDL), `:172-180` (migration)
- Modify: `src/storage/mod.rs:317-326` (upsert_session SQL + ON CONFLICT)
- Modify: `src/storage/mod.rs:345-369` (get_session SELECT + struct)
- Modify: `src/storage/mod.rs:380-406`, `:454-479`, `:564-588`, `:728`, `:754` (list_sessions variants and test mocks)
**Step 1: Add the field to SessionMeta in `src/storage/session.rs`**
After `last_consolidated_at: Option<i64>`, add one line:
```rust
pub last_compressed_message_at: Option<i64>,
```
**Step 2: Add the column to the DDL schema**
In the `CREATE TABLE sessions` statement in `src/storage/mod.rs` (line 44), add a comma after `last_consolidated_at INTEGER`, followed by:
```sql
last_compressed_message_at INTEGER
```
**Step 3: Add the column via migration**
In `src/storage/mod.rs`, after line 182 (after the existing migration's `); .ok();`), add a new migration:
```rust
// Migration: add the last_compressed_message_at column.
// .ok() discards the error raised when the column already exists.
sqlx::query(
    r#"ALTER TABLE sessions ADD COLUMN last_compressed_message_at INTEGER"#,
)
.execute(&self.pool)
.await
.ok();
```
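
SQLite's `ALTER TABLE ... ADD COLUMN` has no `IF NOT EXISTS` form, so the migration relies on `.ok()` to get past the failure on later startups, which also hides unrelated failures. Below is a minimal sketch of a narrower check. It assumes the error can be inspected as text and that SQLite reports this case with the phrase "duplicate column name"; `is_duplicate_column_error` and `tolerate_existing_column` are hypothetical helpers, not part of sqlx or PicoBot.

```rust
/// Returns true when `message` looks like SQLite's error for re-adding an existing column.
/// Assumption: the message contains the text "duplicate column name".
pub fn is_duplicate_column_error(message: &str) -> bool {
    message.to_ascii_lowercase().contains("duplicate column name")
}

/// Treats "column already exists" as success and passes every other error through.
pub fn tolerate_existing_column(result: Result<(), String>) -> Result<(), String> {
    match result {
        Err(msg) if is_duplicate_column_error(&msg) => Ok(()),
        other => other,
    }
}
```

In the real migration the `Result` would come from `.execute(...).await`, with the error converted to a string (or matched on its database error variant) before the check.
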
**Step 4: Add the column to the upsert_session SQL**
At `src/storage/mod.rs` line 317: add `last_compressed_message_at` to the INSERT column list, add a `?` to VALUES, and add `last_compressed_message_at = excluded.last_compressed_message_at` to the ON CONFLICT DO UPDATE SET clause. After line 338, add `.bind(meta.last_compressed_message_at)`.
**Step 5: Add the column to the get_session SELECT**
At `src/storage/mod.rs` line 348: add `last_compressed_message_at` to the SELECT columns. After line 368, add:
```rust
last_compressed_message_at: row.get("last_compressed_message_at"),
```
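**Step 6: Other places that SELECT from sessions (the list_sessions variants)**
Likewise add `last_compressed_message_at` to the SELECT columns and the struct construction, and to the mock SessionMeta constructions in the tests (lines 728, 754, etc.).
**Step 7: Compile check**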
```bash
cargo check 2>&1
```
**Step 8: Commit**
```bash
git add src/storage/session.rs src/storage/mod.rs
git commit -m "feat(storage): add last_compressed_message_at column to sessions"
```
---
### Task 2: Add new loading methods to Storage
**Files:**
- Modify: `src/storage/mod.rs` (after load_messages)
- Modify: `src/storage/memory.rs` (after cleanup_old_timelines)
**Step 1: `get_max_message_seq`**
In `src/storage/mod.rs`, add after the `load_messages` function:
```rust
pub async fn get_max_message_seq(&self, session_id: &str) -> Result<i64, StorageError> {
    let row = sqlx::query(
        "SELECT COALESCE(MAX(seq), 0) as max_seq FROM messages WHERE session_id = ?",
    )
    .bind(session_id)
    .fetch_one(self.pool())
    .await?;
    Ok(row.get::<i64, _>("max_seq"))
}
```
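
`COALESCE(MAX(seq), 0)` yields 0 for a session with no messages, so the recovered counter starts at 1 once Task 5 sets `seq_counter = max_seq + 1`. An in-memory sketch of the same rule, where `next_seq` is a hypothetical helper used only for illustration:

```rust
/// Mirrors `COALESCE(MAX(seq), 0) + 1` over an in-memory list of sequence numbers.
pub fn next_seq(existing: &[i64]) -> i64 {
    existing.iter().copied().max().unwrap_or(0) + 1
}
```
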
**Step 2: `load_messages_after_timestamp`**
```rust
pub async fn load_messages_after_timestamp(
    &self,
    session_id: &str,
    after_ts: i64,
) -> Result<Vec<crate::storage::message::MessageMeta>, StorageError> {
    let rows = sqlx::query(
        r#"
        SELECT id, session_id, seq, role, content, media_refs, tool_call_id, tool_name, tool_calls, source, created_at
        FROM messages
        WHERE session_id = ? AND created_at > ?
        ORDER BY seq ASC
        "#,
    )
    .bind(session_id)
    .bind(after_ts)
    .fetch_all(self.pool())
    .await?;
    Ok(rows
        .into_iter()
        .map(|row| crate::storage::message::MessageMeta {
            id: row.get("id"),
            session_id: row.get("session_id"),
            seq: row.get("seq"),
            role: row.get("role"),
            content: row.get("content"),
            media_refs: row.get("media_refs"),
            tool_call_id: row.get("tool_call_id"),
            tool_name: row.get("tool_name"),
            tool_calls: row.get("tool_calls"),
            source: row.get("source"),
            created_at: row.get("created_at"),
        })
        .collect())
}
```
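
The query's semantics can be checked against a small in-memory model. The comparison is strict, so a message stamped exactly at the marker is not reloaded. Task 7 writes the marker with `timestamp_millis()`, so the filter only behaves as intended if `messages.created_at` uses the same unit; the plan does not state this, so treat it as an assumption to verify. `Msg` and `messages_after` are illustrative names.

```rust
/// Minimal stand-in for a stored message (illustrative fields only).
#[derive(Debug, Clone, PartialEq)]
pub struct Msg {
    pub seq: i64,
    pub created_at: i64,
}

/// Keeps messages strictly newer than `after_ts`, ordered by `seq` ascending,
/// matching `WHERE created_at > ? ORDER BY seq ASC`.
pub fn messages_after(all: &[Msg], after_ts: i64) -> Vec<Msg> {
    let mut tail: Vec<Msg> = all
        .iter()
        .filter(|m| m.created_at > after_ts)
        .cloned()
        .collect();
    tail.sort_by_key(|m| m.seq);
    tail
}
```
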
**Step 3: `load_session_timelines`**
In `src/storage/memory.rs`, add after `cleanup_old_timelines` (before the `}` on line 252):
```rust
pub async fn load_session_timelines(
    &self,
    session_id: &str,
    limit: usize,
) -> Result<Vec<MemoryEntry>, StorageError> {
    let rows = sqlx::query(
        r#"
        SELECT id, key, content, category, importance,
               session_id, created_at, updated_at
        FROM memories
        WHERE session_id = ? AND category = 'timeline'
        ORDER BY created_at DESC
        LIMIT ?
        "#,
    )
    .bind(session_id)
    .bind(limit as i64)
    .fetch_all(self.pool())
    .await?;
    parse_memory_rows(&rows)
}
```
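
Note that this query returns timelines newest first, which is why Task 5 reverses them before prepending. A minimal in-memory sketch of the same selection, assuming a simplified `Entry` type that stands in for `MemoryEntry` (both `Entry` and `latest_timelines` are hypothetical):

```rust
/// Simplified stand-in for a row of the `memories` table.
#[derive(Debug, Clone, PartialEq)]
pub struct Entry {
    pub session_id: String,
    pub category: String,
    pub created_at: i64,
    pub content: String,
}

/// Timeline entries for one session, newest first, capped at `limit`.
pub fn latest_timelines(all: &[Entry], session_id: &str, limit: usize) -> Vec<Entry> {
    let mut timelines: Vec<Entry> = all
        .iter()
        .filter(|e| e.session_id == session_id && e.category == "timeline")
        .cloned()
        .collect();
    // Descending by creation time, like `ORDER BY created_at DESC`.
    timelines.sort_by(|a, b| b.created_at.cmp(&a.created_at));
    timelines.truncate(limit);
    timelines
}
```
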
**Step 4: Compile check**
```bash
cargo check 2>&1
```
**Step 5: Commit**
```bash
git add src/storage/mod.rs src/storage/memory.rs
git commit -m "feat(storage): add load_messages_after_timestamp, load_session_timelines, get_max_message_seq"
```
---
### Task 3: Introduce CompressionResult in ContextCompressor
**Files:**
- Modify: `src/agent/context_compressor.rs:172-274` (compress_if_needed)
- Modify: `src/agent/context_compressor.rs:320-402` (compress_once)
**Step 1: Define CompressionResult**
In context_compressor.rs, add after the `ContextCompressor` struct definition:
```rust
pub struct CompressionResult {
    pub history: Vec<ChatMessage>,
    pub created_timelines: bool,
}
```
**Step 2: Change the compress_if_needed signature and return value**
Change `pub async fn compress_if_needed(&self, mut history: Vec<ChatMessage>) -> Result<Vec<ChatMessage>, AgentError>` to:
```rust
pub async fn compress_if_needed(
    &self,
    mut history: Vec<ChatMessage>,
) -> Result<CompressionResult, AgentError> {
```
Change the internal `return Ok(history)` statements to `return Ok(CompressionResult { history, created_timelines: false })` (the Tier 1 fast-trim path and the path where no compression is needed).
**Step 3: Modify the LLM summarization pass**
Maintain a `created_timelines` flag across the compression loop:
```rust
let mut created_timelines = false;
for pass in 0..self.config.max_passes {
    // ...
    match self.compress_once(...).await {
        Ok(Some(compressed)) => {
            current_history = compressed;
            created_timelines = true;
        }
        // ...
    }
}
```
Finally, return:
```rust
Ok(CompressionResult { history: current_history, created_timelines })
```
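
The flag accumulation can be exercised in isolation with a synchronous sketch. This is not the real compressor: history is modeled as `Vec<String>`, `compress_passes` is a hypothetical function, and stopping at the first pass that returns `None` is an assumption about the elided match arms.

```rust
/// Simplified stand-in: history is a list of strings instead of `ChatMessage`.
#[derive(Debug, PartialEq)]
pub struct CompressionResult {
    pub history: Vec<String>,
    pub created_timelines: bool,
}

/// Runs up to `max_passes` passes. `compress_once` returns `Some(shorter)` when a
/// pass summarized something, or `None` when there was nothing left to compress.
pub fn compress_passes<F>(
    history: Vec<String>,
    max_passes: usize,
    mut compress_once: F,
) -> CompressionResult
where
    F: FnMut(&[String]) -> Option<Vec<String>>,
{
    let mut current = history;
    let mut created_timelines = false;
    for _pass in 0..max_passes {
        match compress_once(&current) {
            Some(compressed) => {
                current = compressed;
                // Stays true once any pass has produced a summary.
                created_timelines = true;
            }
            None => break,
        }
    }
    CompressionResult { history: current, created_timelines }
}
```
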
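**Step 4: Update all compress_if_needed callers**
Change every `compress_if_needed(history)` call site to `compress_if_needed(history).await?.history`. The call sites in `handle_message` and `/compact` also need `created_timelines`, so keep the whole result there (see Task 7).
**Step 5: Compile check**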
```bash
cargo check 2>&1
```
**Step 6: Commit**
```bash
git add src/agent/context_compressor.rs src/session/session.rs
git commit -m "feat(compressor): return CompressionResult with created_timelines flag"
```
---
### Task 4: Session struct and persistence
**Files:**
- Modify: `src/session/session.rs:52-74` (Session struct)
- Modify: `src/session/session.rs:76-121` (Session::new)
- Modify: `src/session/session.rs:298-320` (persist_session_meta)
**Step 1: Add the field to the Session struct**
After `pub last_consolidated_at: Option<i64>`, add:
```rust
pub last_compressed_message_at: Option<i64>,
```
**Step 2: Initialize in Session::new**
After `last_consolidated_at: None`, add:
```rust
last_compressed_message_at: None,
```
**Step 3: Add the field in persist_session_meta**
After `last_consolidated_at: self.last_consolidated_at`, add:
```rust
last_compressed_message_at: self.last_compressed_message_at,
```
**Step 4: Compile check**
```bash
cargo check 2>&1
```
**Step 5: Commit**
```bash
git add src/session/session.rs
git commit -m "feat(session): add last_compressed_message_at field to Session and persist"
```
---
### Task 5: Incremental recovery in Session::from_storage()
**Files:**
- Modify: `src/session/session.rs:124-189` (from_storage)
**Step 1: Rewrite from_storage**
Replace the existing implementation with:
```rust
pub async fn from_storage(
    id: UnifiedSessionId,
    provider_config: LLMProviderConfig,
    tools: Arc<ToolRegistry>,
    storage: StdArc<Storage>,
    memory_manager: Arc<crate::memory::MemoryManager>,
) -> Result<Self, AgentError> {
    let session_meta = storage.get_session(&id.to_string()).await
        .map_err(|e| AgentError::Other(format!("failed to load session from storage: {}", e)))?;
    let mut provider_box = create_provider(provider_config.clone())
        .map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?;
    provider_box.set_storage(storage.clone());
    let provider: Arc<dyn LLMProvider> = Arc::from(provider_box);
    let compressor_config = ContextCompressionConfig {
        protect_first_n: 2,
        ..Default::default()
    };
    let mut compressor = ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config, memory_manager.clone());
    compressor.set_session_id(Some(id.to_string()));
    // Determine recovery strategy
    let mut chat_messages: Vec<ChatMessage> = Vec::new();
    if let Some(after_ts) = session_meta.last_compressed_message_at {
        // Load last 4 timelines to determine if there are > 3
        let timelines = storage
            .load_session_timelines(&id.to_string(), 4)
            .await
            .unwrap_or_default();
        let has_more_timelines = timelines.len() > 3;
        // Insert hint if more timelines exist
        if has_more_timelines {
            chat_messages.push(ChatMessage::user(
                "[Earlier conversation summaries exist. \
                 Use `timeline_recall` to search if needed.]"
            ));
        }
        // Insert latest 3 timelines as context (reversed: oldest first)
        for tl in timelines.iter().take(3).rev() {
            chat_messages.push(ChatMessage::user(format!(
                "[Previous Context]\n{}", tl.content
            )));
        }
        // Load raw messages after compressed timestamp
        let tail = storage
            .load_messages_after_timestamp(&id.to_string(), after_ts)
            .await
            .unwrap_or_default();
        let mut tail_msgs: Vec<ChatMessage> = tail.into_iter().map(|m| {
            ChatMessage {
                id: m.id,
                role: m.role,
                content: m.content,
                media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(),
                timestamp: m.created_at,
                tool_call_id: m.tool_call_id,
                tool_name: m.tool_name,
                tool_calls: m.tool_calls
                    .and_then(|tc| serde_json::from_str::<Vec<crate::providers::ToolCall>>(&tc).ok())
                    .filter(|v| !v.is_empty()),
                source: m.source.and_then(|s| serde_json::from_str(&s).ok()),
            }
        }).collect();
        repair_tool_call_chains(&mut tail_msgs);
        chat_messages.extend(tail_msgs);
    } else {
        // No prior compression: load all messages (existing behavior)
        let messages = storage.load_messages(&id.to_string(), 0).await
            .map_err(|e| AgentError::Other(format!("failed to load messages from storage: {}", e)))?;
        chat_messages = messages.into_iter().map(|m| {
            ChatMessage {
                id: m.id,
                role: m.role,
                content: m.content,
                media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(),
                timestamp: m.created_at,
                tool_call_id: m.tool_call_id,
                tool_name: m.tool_name,
                tool_calls: m.tool_calls
                    .and_then(|tc| serde_json::from_str::<Vec<crate::providers::ToolCall>>(&tc).ok())
                    .filter(|v| !v.is_empty()),
                source: m.source.and_then(|s| serde_json::from_str(&s).ok()),
            }
        }).collect();
        repair_tool_call_chains(&mut chat_messages);
    }
    // seq_counter from actual DB max
    let max_seq = storage
        .get_max_message_seq(&id.to_string())
        .await
        .unwrap_or(0);
    let seq_counter = max_seq + 1;
    let total_message_count = session_meta.message_count;
    Ok(Self {
        id: id.clone(),
        title: session_meta.title,
        created_at: session_meta.created_at,
        last_active_at: session_meta.last_active_at,
        message_count: session_meta.message_count,
        total_message_count,
        messages: chat_messages,
        seq_counter,
        provider_config: provider_config.clone(),
        provider: provider.clone(),
        tools,
        compressor,
        storage: Some(storage),
        routing_info: session_meta.routing_info.unwrap_or_default(),
        last_consolidated_at: session_meta.last_consolidated_at,
        last_compressed_message_at: session_meta.last_compressed_message_at,
        memory_manager,
    })
}
```
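
The timeline windowing inside the incremental branch (fetch 4, show 3, hint if a 4th exists) is easy to get backwards, so here is a standalone sketch of just that step. It works on plain strings rather than `ChatMessage`, and `timeline_prefix` is a hypothetical helper name.

```rust
/// Builds the context prefix from timelines given newest first (as the query returns them).
/// At most 3 summaries are included, oldest first; a hint is prepended when a 4th exists.
pub fn timeline_prefix(newest_first: &[String]) -> Vec<String> {
    let mut prefix = Vec::new();
    if newest_first.len() > 3 {
        prefix.push(
            "[Earlier conversation summaries exist. Use `timeline_recall` to search if needed.]"
                .to_string(),
        );
    }
    // take(3) keeps the three newest; rev() restores chronological order.
    for content in newest_first.iter().take(3).rev() {
        prefix.push(format!("[Previous Context]\n{}", content));
    }
    prefix
}
```

Passing the result of a 4-row query is enough to decide whether the hint is needed, without a separate `COUNT(*)` round trip.
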
**Step 2: Compile check**
```bash
cargo check 2>&1
```
**Step 3: Commit**
```bash
git add src/session/session.rs
git commit -m "feat(session): incremental recovery from storage using compressed timeline"
```
---
### Task 6: Add a conversation-history hint to the system prompt
**Files:**
- Modify: `src/agent/system_prompt.rs:289-304` (MemorySection)
- Modify: `src/agent/system_prompt.rs:16-23` (PromptContext)
- Modify: `src/agent/system_prompt.rs:343-358` (build_system_prompt free function)
- Modify: `src/session/session.rs:411-426` (build_system_prompt)
**Step 1: Add the has_compressed_history field to PromptContext**
```rust
pub struct PromptContext<'a> {
    pub workspace_dir: &'a Path,
    pub model_name: &'a str,
    pub tools: &'a ToolRegistry,
    pub session_id: Option<&'a str>,
    pub memory_context: Option<&'a str>,
    pub has_compressed_history: bool,
}
```
**Step 2: Add HistorySection**
Add after MemorySection:
```rust
pub struct HistorySection;
impl PromptSection for HistorySection {
    fn name(&self) -> &str {
        "history"
    }
    fn build(&self, ctx: &PromptContext<'_>) -> String {
        if ctx.has_compressed_history {
            "## Conversation History\nSummaries of earlier conversation have been archived. To review earlier context, search with the `timeline_recall` tool.".to_string()
        } else {
            String::new()
        }
    }
}
```
**Step 3: Register in SystemPromptBuilder::with_defaults**
In the sections vec of `with_defaults()`, add `Box::new(HistorySection)` after `Box::new(MemorySection)`.
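
`HistorySection` returns an empty string when there is no compressed history, so the builder has to tolerate empty sections. Whether the real `SystemPromptBuilder` already skips them is not shown in this plan. The sketch below assumes it should, using simplified stand-ins: `build_prompt` and `StaticSection` are hypothetical, and `PromptContext` is reduced to the one flag that matters here.

```rust
/// Simplified context: only the flag relevant to this example.
pub struct PromptContext {
    pub has_compressed_history: bool,
}

pub trait PromptSection {
    fn name(&self) -> &str;
    fn build(&self, ctx: &PromptContext) -> String;
}

/// Hypothetical fixed-text section standing in for the other default sections.
pub struct StaticSection(pub &'static str);

impl PromptSection for StaticSection {
    fn name(&self) -> &str {
        "static"
    }
    fn build(&self, _ctx: &PromptContext) -> String {
        self.0.to_string()
    }
}

pub struct HistorySection;

impl PromptSection for HistorySection {
    fn name(&self) -> &str {
        "history"
    }
    fn build(&self, ctx: &PromptContext) -> String {
        if ctx.has_compressed_history {
            "## History\nUse `timeline_recall` to search archived summaries.".to_string()
        } else {
            String::new()
        }
    }
}

/// Hypothetical builder: joins non-empty sections with a blank line between them.
pub fn build_prompt(sections: &[Box<dyn PromptSection>], ctx: &PromptContext) -> String {
    sections
        .iter()
        .map(|s| s.build(ctx))
        .filter(|text| !text.trim().is_empty())
        .collect::<Vec<_>>()
        .join("\n\n")
}
```
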
**Step 4: Update the build_system_prompt signature and call**
```rust
pub fn build_system_prompt(
    workspace_dir: &Path,
    model_name: &str,
    tools: &ToolRegistry,
    session_id: Option<&str>,
    memory_context: Option<&str>,
    has_compressed_history: bool,
) -> String {
    let ctx = PromptContext {
        workspace_dir,
        model_name,
        tools,
        session_id,
        memory_context,
        has_compressed_history,
    };
    SystemPromptBuilder::with_defaults().build(&ctx)
}
```
**Step 5: Update Session::build_system_prompt**
```rust
pub fn build_system_prompt(&self, skills_prompt: &str, memory_context: Option<&str>) -> String {
    let base_prompt = build_system_prompt(
        &self.provider_config.workspace_dir,
        &self.provider_config.model_id,
        &self.tools,
        Some(&self.id.to_string()),
        memory_context,
        self.last_compressed_message_at.is_some(),
    );
    if skills_prompt.trim().is_empty() {
        base_prompt
    } else {
        format!("{}\n\n## Skills\n\n{}\n\nUse the `get_skill` tool to load a skill's full content when needed.", base_prompt, skills_prompt)
    }
}
```
**Step 6: Update all other build_system_prompt callers**
Search for every call to `build_system_prompt(` and add a trailing `false` argument to each. The main ones are the two calls in `src/agent/agent_loop.rs`.
**Step 7: Compile check**
```bash
cargo check 2>&1
```
**Step 8: Commit**
```bash
git add src/agent/system_prompt.rs src/session/session.rs src/agent/agent_loop.rs
git commit -m "feat(system-prompt): add history section for archived conversation context"
```
---
### Task 7: Record the compression marker in handle_message and /compact
**Files:**
- Modify: `src/session/session.rs:1339-1355` (handle_message, after compression)
- Modify: `src/session/session.rs:1372-1376` (handle_message, overflow retry)
- Modify: `src/session/session.rs:851-872` (/compact command)
**Step 1: handle_message normal flow**
At the `compress_if_needed(history).await?` call (line 1346), change the code to:
```rust
let result = session_guard.compressor
    .compress_if_needed(history)
    .await?;
if result.created_timelines {
    session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
    if let Err(e) = session_guard.persist_session_meta().await {
        tracing::warn!(error = %e, "Failed to persist compressed message marker");
    }
}
let mut history = result.history;
```
Also delete the separate `persist_session_meta` call that follows (lines 1350-1355); it is now folded into the logic above.
**Step 2: handle_message overflow-retry flow**
```rust
let raw = session_guard.get_history().to_vec();
let result = session_guard.compressor.compress_if_needed(raw).await?;
if result.created_timelines {
    session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
    let _ = session_guard.persist_session_meta().await;
}
let mut retry = result.history;
retry.insert(0, ChatMessage::system(system_prompt));
agent.process(retry).await?
```
**Step 3: /compact command**
```rust
let result = session_guard.compressor
    .compress_if_needed(history)
    .await?;
let compressed_count = result.history.len();
if result.created_timelines {
    session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
    let _ = session_guard.persist_session_meta().await;
}
session_guard.clear_history();
for msg in result.history {
    session_guard.add_message(msg, false).await
        .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
}
```
Also confirm that `compress_if_needed` resolves correctly at this call site (it should already be in scope).
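
All three call sites in this task repeat the same "set the marker only if timelines were created" rule. A minimal sketch of that rule as a pure function, which could make it unit-testable without a session. `record_compression` and `now_millis` are hypothetical helpers; the plan itself uses `chrono::Utc::now().timestamp_millis()`, and the sketch substitutes the standard library so that it stays self-contained.

```rust
use std::time::{SystemTime, UNIX_EPOCH};

/// Current Unix time in milliseconds, using only the standard library.
/// Falls back to 0 if the system clock is set before the Unix epoch.
pub fn now_millis() -> i64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as i64)
        .unwrap_or(0)
}

/// Updates the marker only when compression actually created timelines.
/// Returns true when the caller should persist the session metadata.
pub fn record_compression(marker: &mut Option<i64>, created_timelines: bool, now_ms: i64) -> bool {
    if created_timelines {
        *marker = Some(now_ms);
    }
    created_timelines
}
```

A call site would then read roughly `if record_compression(&mut marker, result.created_timelines, now_millis()) { /* persist */ }`, leaving an existing marker untouched when no timeline was written.
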
**Step 4: Compile check**
```bash
cargo check 2>&1
```
**Step 5: Commit**
```bash
git add src/session/session.rs
git commit -m "feat(session): record last_compressed_message_at after compression"
```
---
### Task 8: Full build and tests
**Step 1: Full compile**
```bash
cargo check 2>&1
```
Fix all compile errors and make sure every file is consistent.
**Step 2: Run the unit tests**
```bash
cargo test --lib 2>&1
```
**Step 3: Commit once the tests pass**
```bash
git add -A
git commit -m "chore: fix remaining compilation and test issues for incremental recovery"
```
**Step 4: Run lint**
```bash
cargo clippy --lib 2>&1 | head -50
```
Fix any warnings.
---
### Task 9: Verify & commit the design doc
**Step 1: Final verification**
```bash
cargo test --lib 2>&1
```
**Step 2: Commit the design doc**
```bash
git add docs/plans/2026-05-10-incremental-session-recovery-design.md
git commit -m "docs: add incremental session recovery design doc"
```