PicoBot/docs/plans/2026-05-10-incremental-session-recovery.md


Incremental Session Recovery on Startup: Implementation Plan

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

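Goal: After a restart, PicoBot no longer loads the entire `messages` table. Instead, it recovers incrementally from the `last_compressed_message_at` marker and replaces the already-compressed part of the conversation with Timeline summaries.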

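Architecture: Add a `last_compressed_message_at` column to the `sessions` table. Extend the return value of `compress_if_needed` with a `created_timelines` flag. On recovery, load only the messages newer than that timestamp and prepend the 3 most recent Timelines. Always derive `seq_counter` from SQLite with `MAX(seq)`.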

Tech Stack: Rust, sqlx (SQLite), tokio


Task 1: Add the column to SessionMeta and the database DDL

Files:

  • Modify: src/storage/session.rs:15
  • Modify: src/storage/mod.rs:44-45 (DDL), :172-180 (migration)
  • Modify: src/storage/mod.rs:317-326 (upsert_session SQL + ON CONFLICT)
  • Modify: src/storage/mod.rs:345-369 (get_session SELECT + struct)
  • Modify: src/storage/mod.rs:380-406, :454-479, :564-588, :728, :754 (list_sessions and test mocks)

Step 1: Add the field to SessionMeta in src/storage/session.rs

After `last_consolidated_at: Option<i64>`, add:

```rust
pub last_compressed_message_at: Option<i64>,
```

Step 2: Add the column to the DDL schema

In the `CREATE TABLE sessions` statement in `src/storage/mod.rs` (line 44), add a comma after `last_consolidated_at INTEGER`, followed by:

```sql
last_compressed_message_at INTEGER
```

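Step 3: Add a migration for existing databases

In `src/storage/mod.rs`, right after the `.ok();` that ends the existing migration (around line 182), add a new migration:

```rust
// Migration: add the last_compressed_message_at column. SQLite has no
// `ADD COLUMN IF NOT EXISTS`, so on an already-migrated database this
// statement fails with "duplicate column name"; `.ok()` discards that
// error (and, as written, any other error as well).
sqlx::query(
    r#"ALTER TABLE sessions ADD COLUMN last_compressed_message_at INTEGER"#,
)
.execute(&self.pool)
.await
.ok();
```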
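The `.ok()` above discards every error, not just the expected one. Below is a minimal std-only sketch of a narrower alternative. It assumes the error's `Display` text contains SQLite's `duplicate column name` message (with sqlx you would pass `e.to_string()`). The helper name `migration_outcome` is hypothetical.

```rust
/// Hypothetical helper: classify the result of an `ALTER TABLE ... ADD COLUMN`
/// migration. Re-running it on an already-migrated database fails with
/// "duplicate column name: <col>", which is treated as success; any other
/// error (locked database, read-only file, SQL typo) is passed through.
pub fn migration_outcome(result: Result<(), String>) -> Result<(), String> {
    match result {
        Ok(()) => Ok(()),
        // The column already exists: the migration has effectively been applied.
        Err(msg) if msg.contains("duplicate column name") => Ok(()),
        Err(msg) => Err(msg),
    }
}
```

With this check, a locked or read-only database surfaces as an error instead of silently leaving the column missing.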

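Step 4: Add the column to the upsert_session SQL

At `src/storage/mod.rs` line 317, make three changes:

  • Append `last_compressed_message_at` to the INSERT column list.
  • Add a matching `?` to `VALUES`.
  • Add `last_compressed_message_at = excluded.last_compressed_message_at` to the `ON CONFLICT DO UPDATE SET` clause.

After line 338, add `.bind(meta.last_compressed_message_at)`. Positional `?` parameters are bound in order, so the new `.bind` must occupy the same position in the bind chain as the new column in the column list. If the column is appended last, its bind also goes last.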
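Keeping the column list, the `?` placeholders, and the `excluded.` assignments aligned by hand is where this step usually goes wrong. The sketch below is a hypothetical helper (not in the codebase) that derives all three from a single slice. The table and key names used with it are illustrative assumptions.

```rust
/// Hypothetical helper: build an SQLite upsert whose column list, `?`
/// placeholders and `excluded.` assignments all come from one slice, so a
/// newly added column cannot be missing from any of the three.
pub fn build_upsert(table: &str, key: &str, columns: &[&str]) -> String {
    let cols = columns.join(", ");
    // One positional placeholder per column, in the same order.
    let placeholders = vec!["?"; columns.len()].join(", ");
    // On conflict, overwrite every non-key column with the incoming value.
    let updates = columns
        .iter()
        .filter(|c| **c != key)
        .map(|c| format!("{c} = excluded.{c}"))
        .collect::<Vec<_>>()
        .join(", ");
    format!(
        "INSERT INTO {table} ({cols}) VALUES ({placeholders}) \
         ON CONFLICT({key}) DO UPDATE SET {updates}"
    )
}
```

The `.bind` calls must then follow the same slice order.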

Step 5: Add the column to the get_session SELECT

At `src/storage/mod.rs` line 348, add `last_compressed_message_at` to the SELECT column list. After line 368, add:

```rust
last_compressed_message_at: row.get("last_compressed_message_at"),
```

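Step 6: Update the other queries that SELECT from sessions (the list_sessions variants)

Likewise, add `last_compressed_message_at` to their SELECT column lists and struct construction. Also add it to the mock `SessionMeta` values constructed in tests (lines 728, 754, etc.).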

Step 7: Compile check

```bash
cargo check 2>&1
```

Step 8: Commit

```bash
git add src/storage/session.rs src/storage/mod.rs
git commit -m "feat(storage): add last_compressed_message_at column to sessions"
```

Task 2: Add new loading methods to Storage

Files:

  • Modify: src/storage/mod.rs (after load_messages)
  • Modify: src/storage/memory.rs (after cleanup_old_timelines)

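Step 1: get_max_message_seq

In `src/storage/mod.rs`, add the following after the `load_messages` function:

```rust
/// Highest stored `seq` for the session, or 0 if it has no messages
/// (`MAX` over zero rows is NULL; `COALESCE` maps it to 0).
pub async fn get_max_message_seq(&self, session_id: &str) -> Result<i64, StorageError> {
    let row = sqlx::query(
        "SELECT COALESCE(MAX(seq), 0) as max_seq FROM messages WHERE session_id = ?",
    )
    .bind(session_id)
    .fetch_one(self.pool())
    .await?;
    Ok(row.get::<i64, _>("max_seq"))
}
```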
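The counter must come from the database because, after incremental recovery, the in-memory history holds only the tail. Its length no longer tracks the stored `seq` values. Below is a std-only model of the query plus the `max_seq + 1` step from Task 5. The name `next_seq` is hypothetical.

```rust
/// In-memory model of
/// `SELECT COALESCE(MAX(seq), 0) FROM messages WHERE session_id = ?`
/// followed by `seq_counter = max_seq + 1`.
pub fn next_seq(stored_seqs: &[i64]) -> i64 {
    // MAX over zero rows is NULL in SQL; COALESCE turns it into 0.
    stored_seqs.iter().copied().max().unwrap_or(0) + 1
}
```

For stored seqs `1, 2, 3, 40, 41`, this yields 42. A counter derived from a two-message recovered tail would yield 3 instead and collide with an existing row.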

Step 2: load_messages_after_timestamp

Also in `src/storage/mod.rs`, add a method that loads only the messages written after the compression marker. The comparison is strict (`>`), and `after_ts` must use the same unit as `messages.created_at`. Task 7 writes the marker with `timestamp_millis()`, so this assumes `created_at` is stored in Unix milliseconds.

```rust
/// Messages of the session created strictly after `after_ts`, in seq order.
pub async fn load_messages_after_timestamp(
    &self,
    session_id: &str,
    after_ts: i64,
) -> Result<Vec<crate::storage::message::MessageMeta>, StorageError> {
    let rows = sqlx::query(
        r#"
        SELECT id, session_id, seq, role, content, media_refs, tool_call_id, tool_name, tool_calls, source, created_at
        FROM messages
        WHERE session_id = ? AND created_at > ?
        ORDER BY seq ASC
        "#,
    )
    .bind(session_id)
    .bind(after_ts)
    .fetch_all(self.pool())
    .await?;

    Ok(rows.into_iter().map(|row| crate::storage::message::MessageMeta {
        id: row.get("id"),
        session_id: row.get("session_id"),
        seq: row.get("seq"),
        role: row.get("role"),
        content: row.get("content"),
        media_refs: row.get("media_refs"),
        tool_call_id: row.get("tool_call_id"),
        tool_name: row.get("tool_name"),
        tool_calls: row.get("tool_calls"),
        source: row.get("source"),
        created_at: row.get("created_at"),
    }).collect())
}
```
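The filter and ordering semantics of this query can be modeled in memory. The sketch below uses a hypothetical `StoredMessage` row type, with timestamps assumed to be Unix milliseconds:

```rust
/// Minimal stand-in for a stored message row.
#[derive(Debug, Clone, PartialEq)]
pub struct StoredMessage {
    pub seq: i64,
    pub created_at: i64, // assumed to be Unix milliseconds
}

/// In-memory model of `WHERE created_at > ? ORDER BY seq ASC`.
pub fn messages_after(rows: &[StoredMessage], after_ts: i64) -> Vec<StoredMessage> {
    let mut tail: Vec<StoredMessage> = rows
        .iter()
        // Strict comparison: a message stamped exactly at the marker is
        // treated as already covered by the compression.
        .filter(|m| m.created_at > after_ts)
        .cloned()
        .collect();
    // Order by seq, not by insertion order of the input rows.
    tail.sort_by_key(|m| m.seq);
    tail
}
```

Note the failure mode when units disagree. A seconds-based `created_at` such as `1_700_000_000` is never greater than a millisecond marker such as `1_700_000_000_000`, so nothing is returned, and the session restarts with only its timelines.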

Step 3: load_session_timelines

In `src/storage/memory.rs`, add the following after `cleanup_old_timelines` (before the closing `}` on line 252):

```rust
/// Up to `limit` timeline entries for the session, newest first.
pub async fn load_session_timelines(
    &self,
    session_id: &str,
    limit: usize,
) -> Result<Vec<MemoryEntry>, StorageError> {
    let rows = sqlx::query(
        r#"
        SELECT id, key, content, category, importance,
               session_id, created_at, updated_at
        FROM memories
        WHERE session_id = ? AND category = 'timeline'
        ORDER BY created_at DESC
        LIMIT ?
        "#,
    )
    .bind(session_id)
    .bind(limit as i64)
    .fetch_all(self.pool())
    .await?;

    parse_memory_rows(&rows)
}
```

Step 4: Compile check

```bash
cargo check 2>&1
```

Step 5: Commit

```bash
git add src/storage/mod.rs src/storage/memory.rs
git commit -m "feat(storage): add load_messages_after_timestamp, load_session_timelines, get_max_message_seq"
```

Task 3: Introduce CompressionResult in ContextCompressor

Files:

  • Modify: src/agent/context_compressor.rs:172-274 (compress_if_needed)
  • Modify: src/agent/context_compressor.rs:320-402 (compress_once)

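Step 1: Define CompressionResult

In `context_compressor.rs`, add the following after the `ContextCompressor` struct definition:

```rust
/// Output of `compress_if_needed`.
pub struct CompressionResult {
    /// The (possibly compressed) history to use from now on.
    pub history: Vec<ChatMessage>,
    /// True if at least one LLM summarization pass produced timeline
    /// entries, i.e. the caller should advance `last_compressed_message_at`.
    pub created_timelines: bool,
}
```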

Step 2: Change the compress_if_needed signature and return type

Change `pub async fn compress_if_needed(&self, mut history: Vec<ChatMessage>) -> Result<Vec<ChatMessage>, AgentError>` to:

```rust
pub async fn compress_if_needed(
    &self,
    mut history: Vec<ChatMessage>,
) -> Result<CompressionResult, AgentError> {
    // ... body as before, with the return values changed as described below
}
```

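Change each early `return Ok(history)` inside the function to `return Ok(CompressionResult { history, created_timelines: false })`. This applies to the Tier 1 fast-trim path and to the path where no compression is needed.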

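Step 3: Update the LLM summarization passes

Track a `created_timelines` flag across the compression loop. It becomes true as soon as any pass produces a summary:

```rust
let mut created_timelines = false;
for pass in 0..self.config.max_passes {
    // ... existing per-pass logic unchanged
    match self.compress_once(...).await {
        Ok(Some(compressed)) => {
            current_history = compressed;
            // This pass summarized part of the history into a timeline entry.
            created_timelines = true;
        }
        // ... existing Ok(None) / Err arms unchanged
    }
}
```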

Finally, return:

```rust
Ok(CompressionResult { history: current_history, created_timelines })
```
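Below is a synchronous, std-only model of the loop above. `String` stands in for `ChatMessage`, and a closure stands in for `compress_once`. Stopping the loop on `None` is an assumption about the elided arms; the point is that the flag is sticky across passes.

```rust
/// Mirrors the proposed `CompressionResult`, with `String` in place of `ChatMessage`.
#[derive(Debug, PartialEq)]
pub struct CompressionResult {
    pub history: Vec<String>,
    pub created_timelines: bool,
}

/// Runs up to `max_passes` passes. `compress_once` returns `Some(new_history)`
/// when a pass summarized something, or `None` when nothing more can be done.
pub fn compress_passes<F>(history: Vec<String>, max_passes: usize, mut compress_once: F) -> CompressionResult
where
    F: FnMut(&[String]) -> Option<Vec<String>>,
{
    let mut current = history;
    let mut created_timelines = false;
    for _ in 0..max_passes {
        match compress_once(&current) {
            Some(next) => {
                current = next;
                // Once any pass succeeds, the flag stays true.
                created_timelines = true;
            }
            None => break,
        }
    }
    CompressionResult { history: current, created_timelines }
}
```

For example, suppose one pass shrinks a history to `["summary", "c"]` and a second pass cannot shrink it further. The result has `created_timelines: true`. A history that no pass touches returns `false`, so the caller leaves the marker alone.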

Step 4: Update all callers of compress_if_needed

Change every `compress_if_needed(history)` call to `compress_if_needed(history).await?.history`. In handle_message and /compact, `created_timelines` is also needed (see Task 7).

Step 5: Compile check

```bash
cargo check 2>&1
```

Step 6: Commit

```bash
git add src/agent/context_compressor.rs src/session/session.rs
git commit -m "feat(compressor): return CompressionResult with created_timelines flag"
```

Task 4: Session struct and persistence

Files:

  • Modify: src/session/session.rs:52-74 (Session struct)
  • Modify: src/session/session.rs:76-121 (Session::new)
  • Modify: src/session/session.rs:298-320 (persist_session_meta)

Step 1: Add the field to the Session struct

After `pub last_consolidated_at: Option<i64>`, add:

```rust
pub last_compressed_message_at: Option<i64>,
```

Step 2: Initialize it in Session::new

After `last_consolidated_at: None`, add:

```rust
last_compressed_message_at: None,
```

Step 3: Include the field in persist_session_meta

After `last_consolidated_at: self.last_consolidated_at`, add:

```rust
last_compressed_message_at: self.last_compressed_message_at,
```

Step 4: Compile check

```bash
cargo check 2>&1
```

Step 5: Commit

```bash
git add src/session/session.rs
git commit -m "feat(session): add last_compressed_message_at field to Session and persist"
```

Task 5: Incremental recovery in Session::from_storage()

Files:

  • Modify: src/session/session.rs:124-189 (from_storage)

Step 1: Rewrite from_storage

Replace the existing implementation with:

```rust
pub async fn from_storage(
    id: UnifiedSessionId,
    provider_config: LLMProviderConfig,
    tools: Arc<ToolRegistry>,
    storage: StdArc<Storage>,
    memory_manager: Arc<crate::memory::MemoryManager>,
) -> Result<Self, AgentError> {
    let session_id = id.to_string();

    let session_meta = storage.get_session(&session_id).await
        .map_err(|e| AgentError::Other(format!("failed to load session from storage: {}", e)))?;

    let mut provider_box = create_provider(provider_config.clone())
        .map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?;
    provider_box.set_storage(storage.clone());
    let provider: Arc<dyn LLMProvider> = Arc::from(provider_box);

    let compressor_config = ContextCompressionConfig {
        protect_first_n: 2,
        ..Default::default()
    };

    let mut compressor = ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config, memory_manager.clone());
    compressor.set_session_id(Some(session_id.clone()));

    // Recovery strategy: with a compression marker, rebuild the context from
    // recent timelines plus the raw messages written after the marker;
    // without one, fall back to loading every message (existing behavior).
    let mut chat_messages: Vec<ChatMessage> = Vec::new();

    if let Some(after_ts) = session_meta.last_compressed_message_at {
        // Fetch 4 timelines (newest first) to tell whether more than 3 exist
        // without a separate COUNT query. Timelines are supplementary context,
        // so a load failure degrades recovery instead of failing it.
        let timelines = storage
            .load_session_timelines(&session_id, 4)
            .await
            .unwrap_or_else(|e| {
                tracing::warn!(error = %e, "failed to load session timelines");
                Vec::new()
            });

        // Point the model at timeline_recall when older summaries are not shown.
        if timelines.len() > 3 {
            chat_messages.push(ChatMessage::user(
                "[Earlier conversation summaries exist. \
                 Use `timeline_recall` to search if needed.]"
            ));
        }

        // Prepend the latest 3 timelines, reversed so the oldest comes first.
        for tl in timelines.iter().take(3).rev() {
            chat_messages.push(ChatMessage::user(format!(
                "[Previous Context]\n{}", tl.content
            )));
        }

        // Raw messages written after the marker. These are the live tail of
        // the conversation, so a load failure is propagated, not swallowed.
        let tail = storage
            .load_messages_after_timestamp(&session_id, after_ts)
            .await
            .map_err(|e| AgentError::Other(format!("failed to load messages from storage: {}", e)))?;

        let mut tail_msgs: Vec<ChatMessage> = tail.into_iter().map(|m| {
            ChatMessage {
                id: m.id,
                role: m.role,
                content: m.content,
                media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(),
                timestamp: m.created_at,
                tool_call_id: m.tool_call_id,
                tool_name: m.tool_name,
                tool_calls: m.tool_calls
                    .and_then(|tc| serde_json::from_str::<Vec<crate::providers::ToolCall>>(&tc).ok())
                    .filter(|v| !v.is_empty()),
                source: m.source.and_then(|s| serde_json::from_str(&s).ok()),
            }
        }).collect();

        // The tail may start with tool results whose tool calls were compressed away.
        repair_tool_call_chains(&mut tail_msgs);
        chat_messages.extend(tail_msgs);
    } else {
        // No prior compression: load all messages (existing behavior).
        let messages = storage.load_messages(&session_id, 0).await
            .map_err(|e| AgentError::Other(format!("failed to load messages from storage: {}", e)))?;

        chat_messages = messages.into_iter().map(|m| {
            ChatMessage {
                id: m.id,
                role: m.role,
                content: m.content,
                media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(),
                timestamp: m.created_at,
                tool_call_id: m.tool_call_id,
                tool_name: m.tool_name,
                tool_calls: m.tool_calls
                    .and_then(|tc| serde_json::from_str::<Vec<crate::providers::ToolCall>>(&tc).ok())
                    .filter(|v| !v.is_empty()),
                source: m.source.and_then(|s| serde_json::from_str(&s).ok()),
            }
        }).collect();

        repair_tool_call_chains(&mut chat_messages);
    }

    // Derive seq_counter from the DB, not from the in-memory history, which
    // holds only the tail after incremental recovery. Falling back to 0 on
    // error would reuse existing seq values, so the error is propagated.
    let max_seq = storage
        .get_max_message_seq(&session_id)
        .await
        .map_err(|e| AgentError::Other(format!("failed to read max message seq: {}", e)))?;
    let seq_counter = max_seq + 1;
    let total_message_count = session_meta.message_count;

    Ok(Self {
        id,
        title: session_meta.title,
        created_at: session_meta.created_at,
        last_active_at: session_meta.last_active_at,
        message_count: session_meta.message_count,
        total_message_count,
        messages: chat_messages,
        seq_counter,
        provider_config,
        provider,
        tools,
        compressor,
        storage: Some(storage),
        routing_info: session_meta.routing_info.unwrap_or_default(),
        last_consolidated_at: session_meta.last_consolidated_at,
        last_compressed_message_at: session_meta.last_compressed_message_at,
        memory_manager,
    })
}
```
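The prelude logic above fetches 4 timelines, shows at most 3 oldest-first, and adds a hint when a fourth exists. Below, that logic is isolated as a std-only function. `recovery_prelude` is a hypothetical name, and plain strings stand in for `ChatMessage::user`:

```rust
/// Builds the messages placed before the raw tail during incremental recovery.
/// `newest_first` is what `load_session_timelines(session_id, 4)` returns
/// (`ORDER BY created_at DESC LIMIT 4`).
pub fn recovery_prelude(newest_first: &[String]) -> Vec<String> {
    let mut prelude = Vec::new();
    // Fetching one more than we display reveals whether older summaries exist.
    if newest_first.len() > 3 {
        prelude.push(
            "[Earlier conversation summaries exist. Use `timeline_recall` to search if needed.]"
                .to_string(),
        );
    }
    // Keep the newest three, then flip them so the oldest comes first.
    for content in newest_first.iter().take(3).rev() {
        prelude.push(format!("[Previous Context]\n{content}"));
    }
    prelude
}
```

Given timelines `t4, t3, t2, t1` (newest first), it yields the hint followed by `t2`, `t3`, `t4`. Given only `b, a`, it yields `a`, `b` with no hint.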

Step 2: Compile check

```bash
cargo check 2>&1
```

Step 3: Commit

```bash
git add src/session/session.rs
git commit -m "feat(session): incremental recovery from storage using compressed timeline"
```

Task 6: Mention archived history in the system prompt

Files:

  • Modify: src/agent/system_prompt.rs:289-304 (MemorySection)
  • Modify: src/agent/system_prompt.rs:16-23 (PromptContext)
  • Modify: src/agent/system_prompt.rs:343-358 (build_system_prompt free function)
  • Modify: src/session/session.rs:411-426 (build_system_prompt)

Step 1: Add a has_compressed_history field to PromptContext

```rust
pub struct PromptContext<'a> {
    pub workspace_dir: &'a Path,
    pub model_name: &'a str,
    pub tools: &'a ToolRegistry,
    pub session_id: Option<&'a str>,
    pub memory_context: Option<&'a str>,
    pub has_compressed_history: bool,
}
```

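Step 2: Add HistorySection

Add the following after MemorySection:

```rust
/// Tells the model that earlier turns were archived as timelines.
/// Renders nothing when the session has never been compressed.
pub struct HistorySection;

impl PromptSection for HistorySection {
    fn name(&self) -> &str {
        "history"
    }

    fn build(&self, ctx: &PromptContext<'_>) -> String {
        if ctx.has_compressed_history {
            "## Conversation History\nEarlier conversation summaries have been archived. To review past context, use the `timeline_recall` tool to search them.".to_string()
        } else {
            String::new()
        }
    }
}
```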
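Below is a self-contained sketch of how the section behaves. It uses stripped-down stand-ins for `PromptContext` and `PromptSection`, and the prompt text is abbreviated. Whether the real `SystemPromptBuilder` skips sections that render to an empty string is an assumption here; the hypothetical `render` below does skip them. If the real builder does not, a disabled `HistorySection` would leave an extra blank separator in the prompt.

```rust
/// Stand-in for the real context; only the field this section reads is modeled.
pub struct PromptContext {
    pub has_compressed_history: bool,
}

pub trait PromptSection {
    fn name(&self) -> &str;
    fn build(&self, ctx: &PromptContext) -> String;
}

pub struct HistorySection;

impl PromptSection for HistorySection {
    fn name(&self) -> &str {
        "history"
    }

    fn build(&self, ctx: &PromptContext) -> String {
        if ctx.has_compressed_history {
            "## Conversation History\nEarlier summaries are archived; use `timeline_recall`.".to_string()
        } else {
            String::new()
        }
    }
}

/// Hypothetical builder: join non-empty sections with blank lines, so a
/// section that renders nothing leaves no trace in the prompt.
pub fn render(sections: &[Box<dyn PromptSection>], ctx: &PromptContext) -> String {
    sections
        .iter()
        .map(|s| s.build(ctx))
        .filter(|out| !out.is_empty())
        .collect::<Vec<_>>()
        .join("\n\n")
}
```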

Step 3: Register it in SystemPromptBuilder::with_defaults

In the `sections` vec in `with_defaults()`, add `Box::new(HistorySection)` after `Box::new(MemorySection)`.

Step 4: Update the build_system_prompt signature and body

```rust
pub fn build_system_prompt(
    workspace_dir: &Path,
    model_name: &str,
    tools: &ToolRegistry,
    session_id: Option<&str>,
    memory_context: Option<&str>,
    has_compressed_history: bool,
) -> String {
    let ctx = PromptContext {
        workspace_dir,
        model_name,
        tools,
        session_id,
        memory_context,
        has_compressed_history,
    };
    SystemPromptBuilder::with_defaults().build(&ctx)
}
```

Step 5: Update Session::build_system_prompt

```rust
pub fn build_system_prompt(&self, skills_prompt: &str, memory_context: Option<&str>) -> String {
    let base_prompt = build_system_prompt(
        &self.provider_config.workspace_dir,
        &self.provider_config.model_id,
        &self.tools,
        Some(&self.id.to_string()),
        memory_context,
        // A compression marker means earlier turns now live only in timelines.
        self.last_compressed_message_at.is_some(),
    );

    if skills_prompt.trim().is_empty() {
        base_prompt
    } else {
        format!("{}\n\n## Skills\n\n{}\n\nUse the `get_skill` tool to load a skill's full content when needed.", base_prompt, skills_prompt)
    }
}
```

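Step 6: Update all other callers of build_system_prompt

Search for every call to the free function `build_system_prompt(` and pass `false` as the new last argument. The main ones are the two calls in `agent/agent_loop.rs`. Calls to the `Session::build_system_prompt` method keep their current signature. Any code that constructs a `PromptContext` literal directly also needs the new field.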

Step 7: Compile check

```bash
cargo check 2>&1
```

Step 8: Commit

```bash
git add src/agent/system_prompt.rs src/session/session.rs src/agent/agent_loop.rs
git commit -m "feat(system-prompt): add history section for archived conversation context"
```

Task 7: Record the compression marker in handle_message and /compact

Files:

  • Modify: src/session/session.rs:1339-1355 (handle_message, after compression)
  • Modify: src/session/session.rs:1372-1376 (handle_message, overflow retry)
  • Modify: src/session/session.rs:851-872 (/compact command)

Step 1: handle_message, normal path

Replace the `compress_if_needed(history).await?` call (line 1346) with:

```rust
let result = session_guard.compressor
    .compress_if_needed(history)
    .await?;
// Advance the marker only when LLM summarization wrote timeline entries;
// a Tier 1 trim or a no-op leaves it unchanged.
if result.created_timelines {
    session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
    if let Err(e) = session_guard.persist_session_meta().await {
        tracing::warn!(error = %e, "Failed to persist compressed message marker");
    }
}
let mut history = result.history;
```

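Also delete the separate `persist_session_meta` call that follows (lines 1350-1355); it is now part of the block above.

Note on the marker value: `Utc::now()` is later than every message already stored when compression ran, including any messages the compressor kept verbatim instead of summarizing. Recovery loads only rows with `created_at > last_compressed_message_at`, so such kept-verbatim messages are not reloaded after a restart. If that matters, record the `created_at` of the last message actually folded into a timeline instead. The marker is in milliseconds, so `messages.created_at` must be stored in milliseconds too.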

Step 2: handle_message, overflow-retry path

```rust
let raw = session_guard.get_history().to_vec();
let result = session_guard.compressor.compress_if_needed(raw).await?;
if result.created_timelines {
    session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
    if let Err(e) = session_guard.persist_session_meta().await {
        tracing::warn!(error = %e, "Failed to persist compressed message marker");
    }
}
let mut retry = result.history;
retry.insert(0, ChatMessage::system(system_prompt));
agent.process(retry).await?
```

Step 3: /compact command

```rust
let result = session_guard.compressor
    .compress_if_needed(history)
    .await?;
let compressed_count = result.history.len();
if result.created_timelines {
    session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
    if let Err(e) = session_guard.persist_session_meta().await {
        tracing::warn!(error = %e, "Failed to persist compressed message marker");
    }
}
session_guard.clear_history();
for msg in result.history {
    session_guard.add_message(msg, false).await
        .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
}
```

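No new import should be needed here: `compress_if_needed` is a method on the compressor, which is already in scope. `CompressionResult` only needs importing where the type is named explicitly.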
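The three call sites in this task repeat the same marker logic. Below is a hypothetical std-only helper that factors it out; the caller still calls `persist_session_meta` when it returns true. Refusing to move the marker backwards is an added safeguard against wall-clock adjustments, not something the plan requires:

```rust
/// Hypothetical helper: advance the compression marker when, and only when,
/// compression created timeline entries. Returns true if the caller should
/// persist the session meta.
pub fn record_compression_marker(
    marker: &mut Option<i64>,
    created_timelines: bool,
    now_ms: i64,
) -> bool {
    if !created_timelines {
        // Tier 1 trims and no-op calls must not move the marker.
        return false;
    }
    // Never move the marker backwards (e.g. after a wall-clock adjustment).
    let next = match *marker {
        Some(old) => old.max(now_ms),
        None => now_ms,
    };
    *marker = Some(next);
    true
}
```

At each call site, this becomes `if record_compression_marker(&mut session_guard.last_compressed_message_at, result.created_timelines, chrono::Utc::now().timestamp_millis()) { /* persist_session_meta, log on error */ }`.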

Step 4: Compile check

```bash
cargo check 2>&1
```

Step 5: Commit

```bash
git add src/session/session.rs
git commit -m "feat(session): record last_compressed_message_at after compression"
```

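Task 8: Full build and tests

Step 1: Full build

```bash
cargo check 2>&1
```

Fix every compilation error and make sure all files are consistent with one another.

Step 2: Run unit tests

```bash
cargo test --lib 2>&1
```

Step 3: Commit once the tests pass

```bash
git add -A
git commit -m "chore: fix remaining compilation and test issues for incremental recovery"
```

Step 4: Run lint

```bash
cargo clippy --lib 2>&1 | head -50
```

Fix any warnings, then commit the fixes. `head -50` shows only the first 50 lines of output, so re-run until clippy is clean.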


Task 9: Verify and commit the design doc

Step 1: Final verification

```bash
cargo test --lib 2>&1
```

Step 2: Commit the design doc

```bash
git add docs/plans/2026-05-10-incremental-session-recovery-design.md
git commit -m "docs: add incremental session recovery design doc"
```