diff --git a/docs/plans/2026-05-10-incremental-session-recovery-design.md b/docs/plans/2026-05-10-incremental-session-recovery-design.md new file mode 100644 index 0000000..6de44ce --- /dev/null +++ b/docs/plans/2026-05-10-incremental-session-recovery-design.md @@ -0,0 +1,90 @@ +# 启动增量恢复设计 + +## 问题 + +PicoBot 重启后,`Session::from_storage()` 全量加载 `messages` 表,恢复的 history 可能直接超出上下文窗口,首次 LLM 调用即触发压缩,浪费 token。 + +## 设计 + +### 核心思路 + +用 `last_compressed_message_at` 标记最后压缩时刻。恢复时: +- 加载该标记之后的原始消息 +- 用该 session 的 Timeline 条目替代已压缩部分 +- `seq_counter` 统一从 SQLite 查 `MAX(seq) + 1` + +``` +messages 表 memories(timeline) +┌──────────────────────────┐ ┌───────────────────────────┐ +│ created_at = T1..T5 │ ← 跳过 │ session = feishu:oc:dialog │ +│ (压缩已覆盖,用Timeline替代)│ │ created_at 降序 │ +├──────────────────────────┤ ├───────────────────────────┤ +│ created_at > T6 │ ← 加载 │ 只取最近 3 条 │ +└──────────────────────────┘ └───────────────────────────┘ +``` + +### 数据变更 + +**`sessions` 表加列:** +```sql +last_compressed_message_at INTEGER +``` + +**`SessionMeta` / `Session` 加字段:** `last_compressed_message_at: Option` + +### Storage 层新增方法 + +| 方法 | SQL | +|------|-----| +| `get_max_message_seq(session_id)` | `SELECT MAX(seq) FROM messages WHERE session_id = ?` | +| `load_messages_after_timestamp(session_id, after_ts)` | `WHERE created_at > ?` | +| `load_session_timelines(session_id, limit)` | `WHERE session_id = ? AND category = 'timeline' ORDER BY created_at DESC LIMIT ?` | + +### 压缩跟踪 + +`compress_if_needed()` 返回值改为 `CompressionResult { history, created_timelines: bool }`。 +`compress_once()` 中 LLM 摘要路径才置 `true`(Tier 2),Tier 1/3 不产生 Timeline。 + +**记录时机**(`handle_message` 正常流、溢出重试流、`/compact` 统一): +```rust +if result.created_timelines { + session.last_compressed_message_at = Some(now()); + session.persist_session_meta().await; +} +``` + +### Session::from_storage() 恢复逻辑 + +有压缩标记时: +1. `load_session_timelines(limit=4)` → 取 3 条给 LLM,第 4 条判"有更多" +2. 有更多 → 插入提示 user 消息 +3. 逐条插入 Timeline 为 `[Previous Context]` user 消息 +4. `load_messages_after_timestamp(after_ts)` → 原始尾消息 +5. `repair_tool_call_chains` + +无压缩标记 → 全量加载(现有行为)。 + +统一:`seq_counter = MAX(seq) + 1` + +### 系统提示词 + +`Session.last_compressed_message_at` 非空时追加: +``` +## 历史会话 +之前的对话摘要已归档。如需回顾历史上下文,使用 `timeline_recall` 工具搜索。 +``` + +## 改动清单 + +| # | 文件 | 改动 | +|---|------|------| +| 1 | `storage/session.rs` | `SessionMeta` 加 `last_compressed_message_at` | +| 2 | `storage/mod.rs` | DDL migration + upsert/get_session 加列 | +| 3 | `storage/mod.rs` | 新增 `get_max_message_seq`, `load_messages_after_timestamp` | +| 4 | `storage/memory.rs` | 新增 `load_session_timelines` | +| 5 | `agent/context_compressor.rs` | 返回值改为 `CompressionResult` 含 `created_timelines` | +| 6 | `session/session.rs` | `Session` 加字段,`persist_session_meta` 加字段 | +| 7 | `session/session.rs` | `from_storage()` 重写恢复逻辑 | +| 8 | `session/session.rs` | `handle_message()` 压缩后记录标记 | +| 9 | `session/session.rs` | `/compact` 命令压缩后记录标记 | +| 10 | `session/session.rs` | `build_system_prompt()` 注入 `last_compressed_message_at` | diff --git a/docs/plans/2026-05-10-incremental-session-recovery.md b/docs/plans/2026-05-10-incremental-session-recovery.md new file mode 100644 index 0000000..3c02216 --- /dev/null +++ b/docs/plans/2026-05-10-incremental-session-recovery.md @@ -0,0 +1,674 @@ +# 启动增量恢复 Implementation Plan + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** PicoBot 重启后不再全量加载 messages 表,而是基于 `last_compressed_message_at` 标记增量恢复,用 Timeline 替代已压缩部分。 + +**Architecture:** 在 `sessions` 表加 `last_compressed_message_at` 列,`compress_if_needed` 返回值增加 `created_timelines` 标志,恢复时按时间戳增量加载消息并用近 3 条 Timeline 前置,`seq_counter` 统一从 SQLite 查 MAX(seq)。 + +**Tech Stack:** Rust, sqlx (SQLite), tokio + +--- + +### Task 1: SessionMeta 和数据库 DDL 加列 + +**Files:** +- Modify: `src/storage/session.rs:15` +- Modify: `src/storage/mod.rs:44-45` (DDL), `:172-180` (migration) +- Modify: `src/storage/mod.rs:317-326` (upsert_session SQL + ON CONFLICT) +- Modify: `src/storage/mod.rs:345-369` (get_session SELECT + struct) +- Modify: `src/storage/mod.rs:380-406`, `:454-479`, `:564-588`, `:728`, `:754` (list_sessions 及测试 mock) + +**Step 1: 在 `src/storage/session.rs` SessionMeta 加字段** + +在 `last_consolidated_at: Option` 后加一行: +```rust +pub last_compressed_message_at: Option, +``` + +**Step 2: DDL schema 加列** + +在 `src/storage/mod.rs` 的 CREATE TABLE sessions 中 (line 44),`last_consolidated_at INTEGER` 后加逗号和: +```sql +last_compressed_message_at INTEGER +``` + +**Step 3: migration 加列** + +在 `src/storage/mod.rs` line 182 之后(现有 migration 的 `); .ok();` 之后),添加新 migration: +```rust +// Migration: add last_compressed_message_at column if not exists +sqlx::query( + r#"ALTER TABLE sessions ADD COLUMN last_compressed_message_at INTEGER"#, +) +.execute(&self.pool) +.await +.ok(); +``` + +**Step 4: upsert_session SQL 加列** + +`src/storage/mod.rs` line 317: INSERT 列列表加 `last_compressed_message_at`,VALUES 加 `?`,ON CONFLICT DO UPDATE SET 加 `last_compressed_message_at = excluded.last_compressed_message_at`。line 338 后加 `.bind(meta.last_compressed_message_at)`。 + +**Step 5: get_session SELECT 加列** + +`src/storage/mod.rs` line 348: SELECT 列加 `last_compressed_message_at`。line 368 后加: +```rust +last_compressed_message_at: row.get("last_compressed_message_at"), +``` + +**Step 6: 其他 SELECT sessions 的地方(list_sessions 多个变体)** + +同样补 `last_compressed_message_at` 到 SELECT 列和 struct 构造。以及测试中的 mock SessionMeta 构造(line 728, 754 等)。 + +**Step 7: 编译检查** + +```bash +cargo check 2>&1 +``` + +**Step 8: Commit** + +```bash +git add src/storage/session.rs src/storage/mod.rs +git commit -m "feat(storage): add last_compressed_message_at column to sessions" +``` + +--- + +### Task 2: Storage 新增加载方法 + +**Files:** +- Modify: `src/storage/mod.rs` (在 load_messages 之后) +- Modify: `src/storage/memory.rs` (在 cleanup_old_timelines 之后) + +**Step 1: `get_max_message_seq`** + +在 `src/storage/mod.rs` 中 `load_messages` 函数后面添加: +```rust +pub async fn get_max_message_seq(&self, session_id: &str) -> Result { + let row = sqlx::query( + "SELECT COALESCE(MAX(seq), 0) as max_seq FROM messages WHERE session_id = ?", + ) + .bind(session_id) + .fetch_one(self.pool()) + .await?; + Ok(row.get::("max_seq")) +} +``` + +**Step 2: `load_messages_after_timestamp`** + +```rust +pub async fn load_messages_after_timestamp( + &self, + session_id: &str, + after_ts: i64, +) -> Result, StorageError> { + let rows = sqlx::query( + r#" + SELECT id, session_id, seq, role, content, media_refs, tool_call_id, tool_name, tool_calls, source, created_at + FROM messages + WHERE session_id = ? AND created_at > ? + ORDER BY seq ASC + "#, + ) + .bind(session_id) + .bind(after_ts) + .fetch_all(self.pool()) + .await?; + + Ok(rows.into_iter().map(|row| crate::storage::message::MessageMeta { + id: row.get("id"), + session_id: row.get("session_id"), + seq: row.get("seq"), + role: row.get("role"), + content: row.get("content"), + media_refs: row.get("media_refs"), + tool_call_id: row.get("tool_call_id"), + tool_name: row.get("tool_name"), + tool_calls: row.get("tool_calls"), + source: row.get("source"), + created_at: row.get("created_at"), + }).collect()) +} +``` + +**Step 3: `load_session_timelines`** + +在 `src/storage/memory.rs` 的 `cleanup_old_timelines` 之后(line 252 的 `}` 之前)添加: +```rust +pub async fn load_session_timelines( + &self, + session_id: &str, + limit: usize, +) -> Result, StorageError> { + let rows = sqlx::query( + r#" + SELECT id, key, content, category, importance, + session_id, created_at, updated_at + FROM memories + WHERE session_id = ? AND category = 'timeline' + ORDER BY created_at DESC + LIMIT ? + "#, + ) + .bind(session_id) + .bind(limit as i64) + .fetch_all(self.pool()) + .await?; + + parse_memory_rows(&rows) +} +``` + +**Step 4: 编译检查** + +```bash +cargo check 2>&1 +``` + +**Step 5: Commit** + +```bash +git add src/storage/mod.rs src/storage/memory.rs +git commit -m "feat(storage): add load_messages_after_timestamp, load_session_timelines, get_max_message_seq" +``` + +--- + +### Task 3: ContextCompressor 引入 CompressionResult + +**Files:** +- Modify: `src/agent/context_compressor.rs:172-274` (compress_if_needed) +- Modify: `src/agent/context_compressor.rs:320-402` (compress_once) + +**Step 1: 定义 CompressionResult** + +在 context_compressor.rs 中 `ContextCompressor` struct 定义之后添加: +```rust +pub struct CompressionResult { + pub history: Vec, + pub created_timelines: bool, +} +``` + +**Step 2: 修改 compress_if_needed 签名和返回** + +将 `pub async fn compress_if_needed(&self, mut history: Vec) -> Result, AgentError>` 改为: +```rust +pub async fn compress_if_needed( + &self, + mut history: Vec, +) -> Result { +``` + +内部的 `return Ok(history)` 改为 `return Ok(CompressionResult { history, created_timelines: false })`(Tier 1 fast trim 和不需要压缩时)。 + +**Step 3: 修改 LLM summarization pass 部分** + +在压缩循环中维护一个 `created_timelines` 标志: +```rust +let mut created_timelines = false; +for pass in 0..self.config.max_passes { + // ... + match self.compress_once(...).await { + Ok(Some(compressed)) => { + current_history = compressed; + created_timelines = true; + } + // ... + } +} +``` + +最后返回: +```rust +Ok(CompressionResult { history: current_history, created_timelines }) +``` + +**Step 4: 更新所有 compress_if_needed 调用方** + +所有 `compress_if_needed(history)` 改为 `compress_if_needed(history).await?.history`。在 `handle_message` 和 `/compact` 中还需要用到 `created_timelines`。 + +**Step 5: 编译检查** + +```bash +cargo check 2>&1 +``` + +**Step 6: Commit** + +```bash +git add src/agent/context_compressor.rs src/session/session.rs +git commit -m "feat(compressor): return CompressionResult with created_timelines flag" +``` + +--- + +### Task 4: Session 结构体和持久化 + +**Files:** +- Modify: `src/session/session.rs:52-74` (Session struct) +- Modify: `src/session/session.rs:76-121` (Session::new) +- Modify: `src/session/session.rs:298-320` (persist_session_meta) + +**Step 1: Session struct 加字段** + +在 `pub last_consolidated_at: Option` 后加: +```rust +pub last_compressed_message_at: Option, +``` + +**Step 2: Session::new 初始化** + +在 `last_consolidated_at: None` 后加: +```rust +last_compressed_message_at: None, +``` + +**Step 3: persist_session_meta 加字段** + +在 `last_consolidated_at: self.last_consolidated_at` 后加: +```rust +last_compressed_message_at: self.last_compressed_message_at, +``` + +**Step 4: 编译检查** + +```bash +cargo check 2>&1 +``` + +**Step 5: Commit** + +```bash +git add src/session/session.rs +git commit -m "feat(session): add last_compressed_message_at field to Session and persist" +``` + +--- + +### Task 5: Session::from_storage() 增量恢复 + +**Files:** +- Modify: `src/session/session.rs:124-189` (from_storage) + +**Step 1: 重写 from_storage** + +替换现有实现为: + +```rust +pub async fn from_storage( + id: UnifiedSessionId, + provider_config: LLMProviderConfig, + tools: Arc, + storage: StdArc, + memory_manager: Arc, +) -> Result { + let session_meta = storage.get_session(&id.to_string()).await + .map_err(|e| AgentError::Other(format!("failed to load session from storage: {}", e)))?; + + let mut provider_box = create_provider(provider_config.clone()) + .map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?; + provider_box.set_storage(storage.clone()); + let provider: Arc = Arc::from(provider_box); + + let compressor_config = ContextCompressionConfig { + protect_first_n: 2, + ..Default::default() + }; + + let mut compressor = ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config, memory_manager.clone()); + compressor.set_session_id(Some(id.to_string())); + + // Determine recovery strategy + let mut chat_messages: Vec = Vec::new(); + + if let Some(after_ts) = session_meta.last_compressed_message_at { + // Load last 4 timelines to determine if there are > 3 + let timelines = storage + .load_session_timelines(&id.to_string(), 4) + .await + .unwrap_or_default(); + + let has_more_timelines = timelines.len() > 3; + + // Insert hint if more timelines exist + if has_more_timelines { + chat_messages.push(ChatMessage::user( + "[Earlier conversation summaries exist. \ + Use `timeline_recall` to search if needed.]" + )); + } + + // Insert latest 3 timelines as context (reversed: oldest first) + for tl in timelines.iter().take(3).rev() { + chat_messages.push(ChatMessage::user(format!( + "[Previous Context]\n{}", tl.content + ))); + } + + // Load raw messages after compressed timestamp + let tail = storage + .load_messages_after_timestamp(&id.to_string(), after_ts) + .await + .unwrap_or_default(); + + let mut tail_msgs: Vec = tail.into_iter().map(|m| { + ChatMessage { + id: m.id, + role: m.role, + content: m.content, + media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(), + timestamp: m.created_at, + tool_call_id: m.tool_call_id, + tool_name: m.tool_name, + tool_calls: m.tool_calls + .and_then(|tc| serde_json::from_str::>(&tc).ok()) + .filter(|v| !v.is_empty()), + source: m.source.and_then(|s| serde_json::from_str(&s).ok()), + } + }).collect(); + + repair_tool_call_chains(&mut tail_msgs); + chat_messages.extend(tail_msgs); + } else { + // No prior compression — load all messages (existing behavior) + let messages = storage.load_messages(&id.to_string(), 0).await + .map_err(|e| AgentError::Other(format!("failed to load messages from storage: {}", e)))?; + + chat_messages = messages.into_iter().map(|m| { + ChatMessage { + id: m.id, + role: m.role, + content: m.content, + media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(), + timestamp: m.created_at, + tool_call_id: m.tool_call_id, + tool_name: m.tool_name, + tool_calls: m.tool_calls + .and_then(|tc| serde_json::from_str::>(&tc).ok()) + .filter(|v| !v.is_empty()), + source: m.source.and_then(|s| serde_json::from_str(&s).ok()), + } + }).collect(); + + repair_tool_call_chains(&mut chat_messages); + } + + // seq_counter from actual DB max + let max_seq = storage + .get_max_message_seq(&id.to_string()) + .await + .unwrap_or(0); + let seq_counter = max_seq + 1; + let total_message_count = session_meta.message_count; + + Ok(Self { + id: id.clone(), + title: session_meta.title, + created_at: session_meta.created_at, + last_active_at: session_meta.last_active_at, + message_count: session_meta.message_count, + total_message_count, + messages: chat_messages, + seq_counter, + provider_config: provider_config.clone(), + provider: provider.clone(), + tools, + compressor, + storage: Some(storage), + routing_info: session_meta.routing_info.unwrap_or_default(), + last_consolidated_at: session_meta.last_consolidated_at, + last_compressed_message_at: session_meta.last_compressed_message_at, + memory_manager, + }) +} +``` + +**Step 2: 编译检查** + +```bash +cargo check 2>&1 +``` + +**Step 3: Commit** + +```bash +git add src/session/session.rs +git commit -m "feat(session): incremental recovery from storage using compressed timeline" +``` + +--- + +### Task 6: 系统提示词加历史会话提示 + +**Files:** +- Modify: `src/agent/system_prompt.rs:289-304` (MemorySection) +- Modify: `src/agent/system_prompt.rs:16-23` (PromptContext) +- Modify: `src/agent/system_prompt.rs:343-358` (build_system_prompt free function) +- Modify: `src/session/session.rs:411-426` (build_system_prompt) + +**Step 1: PromptContext 加 has_compressed_history 字段** + +```rust +pub struct PromptContext<'a> { + pub workspace_dir: &'a Path, + pub model_name: &'a str, + pub tools: &'a ToolRegistry, + pub session_id: Option<&'a str>, + pub memory_context: Option<&'a str>, + pub has_compressed_history: bool, +} +``` + +**Step 2: 加 HistorySection** + +在 MemorySection 后面添加: +```rust +pub struct HistorySection; + +impl PromptSection for HistorySection { + fn name(&self) -> &str { + "history" + } + + fn build(&self, ctx: &PromptContext<'_>) -> String { + if ctx.has_compressed_history { + "## 历史会话\n之前的对话摘要已归档。如需回顾历史上下文,使用 `timeline_recall` 工具搜索。".to_string() + } else { + String::new() + } + } +} +``` + +**Step 3: 注册到 SystemPromptBuilder::with_defaults** + +在 `with_defaults()` 的 sections vec 中 `Box::new(MemorySection)` 后加 `Box::new(HistorySection)`。 + +**Step 4: 更新 build_system_prompt 签名和调用** + +```rust +pub fn build_system_prompt( + workspace_dir: &Path, + model_name: &str, + tools: &ToolRegistry, + session_id: Option<&str>, + memory_context: Option<&str>, + has_compressed_history: bool, +) -> String { + let ctx = PromptContext { + workspace_dir, + model_name, + tools, + session_id, + memory_context, + has_compressed_history, + }; + SystemPromptBuilder::with_defaults().build(&ctx) +} +``` + +**Step 5: 更新 Session::build_system_prompt** + +```rust +pub fn build_system_prompt(&self, skills_prompt: &str, memory_context: Option<&str>) -> String { + let base_prompt = build_system_prompt( + &self.provider_config.workspace_dir, + &self.provider_config.model_id, + &self.tools, + Some(&self.id.to_string()), + memory_context, + self.last_compressed_message_at.is_some(), + ); + + if skills_prompt.trim().is_empty() { + base_prompt + } else { + format!("{}\n\n## Skills\n\n{}\n\nUse the `get_skill` tool to load a skill's full content when needed.", base_prompt, skills_prompt) + } +} +``` + +**Step 6: 更新所有其他 build_system_prompt 调用方** + +搜索 `build_system_prompt(` 的所有调用位置,每个都要加 `false` 参数。主要有 `agent/agent_loop.rs` 中的两个调用。 + +**Step 7: 编译检查** + +```bash +cargo check 2>&1 +``` + +**Step 8: Commit** + +```bash +git add src/agent/system_prompt.rs src/session/session.rs src/agent/agent_loop.rs +git commit -m "feat(system-prompt): add history section for archived conversation context" +``` + +--- + +### Task 7: handle_message 和 /compact 记录压缩标记 + +**Files:** +- Modify: `src/session/session.rs:1339-1355` (handle_message 压缩后) +- Modify: `src/session/session.rs:1372-1376` (handle_message 溢出重试) +- Modify: `src/session/session.rs:851-872` (/compact 命令) + +**Step 1: handle_message 正常流** + +在 `compress_if_needed(history).await?` 之后(line 1346),改为: +```rust +let result = session_guard.compressor + .compress_if_needed(history) + .await?; +if result.created_timelines { + session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis()); + if let Err(e) = session_guard.persist_session_meta().await { + tracing::warn!(error = %e, "Failed to persist compressed message marker"); + } +} +let mut history = result.history; +``` + +同时删除后面(line 1350-1355)单独的 `persist_session_meta` 调用(现在已合入上面的逻辑)。 + +**Step 2: handle_message 溢出重试流** + +```rust +let raw = session_guard.get_history().to_vec(); +let result = session_guard.compressor.compress_if_needed(raw).await?; +if result.created_timelines { + session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis()); + let _ = session_guard.persist_session_meta().await; +} +let mut retry = result.history; +retry.insert(0, ChatMessage::system(system_prompt)); +agent.process(retry).await? +``` + +**Step 3: /compact 命令** + +```rust +let result = session_guard.compressor + .compress_if_needed(history) + .await?; +let compressed_count = result.history.len(); +if result.created_timelines { + session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis()); + let _ = session_guard.persist_session_meta().await; +} +session_guard.clear_history(); +for msg in result.history { + session_guard.add_message(msg, false).await + .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; +} +``` + +同时确认 `compress_if_needed` 的 import 正常(已在 scope 中)。 + +**Step 4: 编译检查** + +```bash +cargo check 2>&1 +``` + +**Step 5: Commit** + +```bash +git add src/session/session.rs +git commit -m "feat(session): record last_compressed_message_at after compression" +``` + +--- + +### Task 8: 全局编译和测试 + +**Step 1: 全局编译** + +```bash +cargo check 2>&1 +``` + +修复所有编译错误,确保全部文件一致。 + +**Step 2: 运行单元测试** + +```bash +cargo test --lib 2>&1 +``` + +**Step 3: 测试通过后 commit** + +```bash +git add -A +git commit -m "chore: fix remaining compilation and test issues for incremental recovery" +``` + +**Step 4: 运行 lint** + +```bash +cargo clippy --lib 2>&1 | head -50 +``` + +修复任何 warning。 + +--- + +### Task 9: 验证 & 提交设计文档 + +**Step 1: 最终验证** + +```bash +cargo test --lib 2>&1 +``` + +**Step 2: Commit 设计文档** + +```bash +git add docs/plans/2026-05-10-incremental-session-recovery-design.md +git commit -m "docs: add incremental session recovery design doc" +```