diff --git a/PERSISTENCE.md b/PERSISTENCE.md new file mode 100644 index 0000000..70e5007 --- /dev/null +++ b/PERSISTENCE.md @@ -0,0 +1,300 @@ +# PicoBot 持久化设计说明 + +本文档介绍 PicoBot 当前的会话持久化实现,目标读者是需要维护或集成该模块的技术人员。 + +## 1. 总览 + +PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据库文件: + +- 默认路径:`~/.picobot/storage/sessions.db` +- 初始化入口:`SessionStore::new()` +- 核心实现:`src/storage/mod.rs` + +数据库启动时会完成以下初始化: + +- 打开 SQLite 连接 +- 创建父目录 +- 打开 WAL 模式 +- 打开外键约束 +- 自动建表和建索引 + +当前持久化只覆盖两类核心数据: + +- `sessions`:会话元数据 +- `messages`:会话内的消息流水 + +内存中的 `Session` 负责运行态处理,SQLite 负责跨进程、跨重启保留历史。整体设计是“内存缓存 + SQLite 事实来源”。 + +## 2. 持久化在系统中的位置 + +相关模块职责如下: + +- `src/gateway/session.rs` + - 管理运行时 `Session` + - 在收到消息时确保持久化会话存在 + - 首次访问某个 `chat_id` 时从数据库加载历史 + - 在新消息产生后同时写入数据库和内存历史 +- `src/storage/mod.rs` + - 封装 SQLite 访问 + - 提供会话和消息的增删改查 +- `src/bus/message.rs` + - 定义持久化消息结构 `ChatMessage` +- `src/providers/*` + - 将历史消息转换为不同 LLM provider 需要的格式 + +典型关系如下: + +1. 网关收到用户消息。 +2. `SessionManager` 定位到对应 channel 的运行时 `Session`。 +3. `Session::ensure_persistent_session(chat_id)` 确保数据库里有对应会话记录。 +4. `Session::ensure_chat_loaded(chat_id)` 在内存中没有历史时,从 `messages` 表加载该会话全部历史。 +5. 新的用户消息先写入 `messages`,再放入内存历史。 +6. Agent 执行后产生的 assistant/tool 消息按实际顺序继续写入 `messages`。 +7. 下次进程重启或 session 过期后,可从数据库完整恢复上下文。 + +## 3. 会话标识规则 + +数据库中的会话主键并不总是随机 UUID,而是依据 channel 类型区分: + +- CLI 会话:`session_id == chat_id` +- 非 CLI 会话:`session_id = "{channel_name}:{chat_id}"` + +这套规则由 `persistent_session_id(channel_name, chat_id)` 统一生成,目的是: + +- 对 CLI 支持显式创建、切换和管理多个会话 +- 对外部渠道(例如飞书)让同一个 chat 稳定映射到同一条持久化会话 + +## 4. 表结构 + +### 4.1 sessions + +保存会话级元数据,每条记录代表一个可被恢复的历史会话。 + +字段说明: + +| 字段 | 类型 | 含义 | 当前用途 | +| --- | --- | --- | --- | +| `id` | `TEXT PRIMARY KEY` | 会话主键 | 作为会话唯一标识,被 `messages.session_id` 引用 | +| `title` | `TEXT NOT NULL` | 会话标题 | CLI 展示、重命名 | +| `channel_name` | `TEXT NOT NULL` | 来源渠道名 | 例如 `cli`、`feishu` | +| `chat_id` | `TEXT NOT NULL` | 渠道侧会话标识 | 用于恢复和路由到同一聊天 | +| `summary` | `TEXT` | 会话摘要 | 预留字段,当前 schema 中存在,但当前代码未写入实际摘要 | +| `created_at` | `INTEGER NOT NULL` | 创建时间 | 毫秒级 Unix 时间戳 | +| `updated_at` | `INTEGER NOT NULL` | 最近元数据更新时间 | 重命名、归档、追加消息时更新 | +| `last_active_at` | `INTEGER NOT NULL` | 最近活跃时间 | 追加消息、清空历史时更新,用于排序 | +| `archived_at` | `INTEGER` | 归档时间 | 非空表示会话已归档 | +| `deleted_at` | `INTEGER` | 删除时间 | 预留字段,当前读取逻辑会过滤该字段,但当前删除实现是物理删除 | +| `message_count` | `INTEGER NOT NULL DEFAULT 0` | 消息数 | 追加消息时自增,清空历史时重置 | + +索引: + +- `idx_sessions_channel_archived(channel_name, archived_at, last_active_at DESC)` + - 用于按渠道列出会话,并支持过滤归档态和按最近活跃排序 +- `idx_sessions_updated_at(updated_at DESC)` + - 用于最近更新时间维度的查询优化 + +### 4.2 messages + +保存会话中的消息流水。这里的“消息”不仅包括用户和助手文本,还包括工具调用结果。 + +字段说明: + +| 字段 | 类型 | 含义 | 当前用途 | +| --- | --- | --- | --- | +| `id` | `TEXT PRIMARY KEY` | 消息唯一标识 | 对应 `ChatMessage.id` | +| `session_id` | `TEXT NOT NULL` | 所属会话 | 外键指向 `sessions.id` | +| `seq` | `INTEGER NOT NULL` | 会话内顺序号 | 保证同一会话消息顺序稳定 | +| `role` | `TEXT NOT NULL` | 消息角色 | 典型值为 `user`、`assistant`、`system`、`tool` | +| `content` | `TEXT NOT NULL` | 消息正文 | 文本内容或工具结果文本 | +| `media_refs_json` | `TEXT NOT NULL` | 媒体引用列表 JSON | 存储附件、本地文件路径等上下文引用 | +| `tool_call_id` | `TEXT` | 工具调用 ID | 仅 `role=tool` 时通常有值,用来关联某次工具结果对应哪一个 tool call | +| `tool_name` | `TEXT` | 工具名称 | 例如 `calculator`、`file_write` | +| `tool_calls_json` | `TEXT` | assistant 发起的工具调用列表 JSON | 仅 assistant 发出工具调用时有值 | +| `created_at` | `INTEGER NOT NULL` | 消息创建时间 | 毫秒级 Unix 时间戳 | + +约束和索引: + +- 外键:`FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE` +- 唯一约束:`UNIQUE(session_id, seq)`,确保同一会话内顺序号不重复 +- 索引: + - `idx_messages_session_seq(session_id, seq)`,按顺序读取历史 + - `idx_messages_session_created(session_id, created_at)`,按时间维度检索 + +## 5. 字段与运行时结构的映射 + +持久化层存储的消息对象是 `ChatMessage`,关键映射关系如下: + +| `ChatMessage` 字段 | 对应数据库字段 | 说明 | +| --- | --- | --- | +| `id` | `messages.id` | 消息唯一 ID | +| `role` | `messages.role` | 消息角色 | +| `content` | `messages.content` | 文本主体 | +| `media_refs` | `messages.media_refs_json` | 序列化为 JSON 数组 | +| `timestamp` | `messages.created_at` | 时间戳 | +| `tool_call_id` | `messages.tool_call_id` | 工具结果与调用的关联 ID | +| `tool_name` | `messages.tool_name` | 工具名 | +| `tool_calls` | `messages.tool_calls_json` | assistant 发起的工具调用列表 | + +设计上分成 `tool_call_id` 和 `tool_calls_json` 两种字段,是因为两者表达的是不同方向的信息: + +- `tool_calls_json` 表示“assistant 想调用哪些工具” +- `tool_call_id` 表示“这一条 tool 结果是在回应哪一次工具调用” + +## 6. 数据写入流程 + +### 6.1 创建会话 + +有两种进入方式: + +- CLI 模式调用 `create_cli_session()` 显式创建会话 +- 渠道消息进入时调用 `ensure_channel_session()` 自动创建或复用会话 + +创建时会写入 `sessions` 表,初始状态: + +- `summary = NULL` +- `archived_at = NULL` +- `deleted_at = NULL` +- `message_count = 0` + +### 6.2 追加消息 + +消息持久化统一走 `append_message()`,写入过程是一个 SQLite 事务: + +1. 查询当前会话 `MAX(seq) + 1` 作为下一条消息顺序。 +2. 将 `media_refs` 序列化为 `media_refs_json`。 +3. 将 `tool_calls` 序列化为 `tool_calls_json`。 +4. 插入一条 `messages` 记录。 +5. 更新 `sessions.message_count`、`updated_at`、`last_active_at`。 +6. 将 `sessions.archived_at` 置空。 +7. 提交事务。 + +其中第 6 步很重要:归档会话一旦收到新消息,会自动恢复为活跃态。 + +### 6.3 读取历史 + +`load_messages(session_id)` 会按 `seq ASC` 读取整个消息历史,并把 JSON 字段反序列化回 `ChatMessage`。 + +因此它恢复的是“逻辑顺序”,而不是简单按创建时间排序。只要 `seq` 连续,重放顺序就稳定。 + +## 7. 典型时序 + +### 7.1 普通问答 + +1. 用户消息进入网关。 +2. 如果数据库中没有对应会话,先插入一条 `sessions`。 +3. 用户消息写入 `messages`,`role = user`。 +4. Agent 基于历史生成回复。 +5. assistant 回复写入 `messages`,`role = assistant`。 +6. 会话的 `message_count` 增加 2,`last_active_at` 更新时间。 + +### 7.2 带工具调用的问答 + +1. assistant 先生成一条带 `tool_calls_json` 的消息,`role = assistant`。 +2. 系统执行对应工具。 +3. 每个工具结果作为独立消息写入 `messages`,`role = tool`。 +4. 这些 `tool` 消息会带 `tool_call_id` 和 `tool_name`。 +5. assistant 最终整理工具结果后再写入一条普通回复。 + +这样保存后,即使进程重启,后续仍能完整恢复: + +- assistant 当时发起了哪些工具调用 +- 每个工具调用返回了什么 +- 最终 assistant 给了什么结论 + +## 8. 会话生命周期操作 + +### 8.1 重命名 + +`rename_session(session_id, title)`: + +- 更新 `sessions.title` +- 更新 `sessions.updated_at` + +### 8.2 归档 + +`archive_session(session_id)`: + +- 将 `sessions.archived_at` 设为当前时间 +- 更新 `sessions.updated_at` +- 不删除消息数据 + +列出会话时: + +- `include_archived = false` 只返回 `archived_at IS NULL` 的会话 +- `include_archived = true` 返回全部未删除会话 + +### 8.3 清空消息 + +`clear_messages(session_id)`: + +- 删除该会话在 `messages` 中的所有记录 +- 将 `sessions.message_count` 重置为 0 +- 更新 `updated_at` 和 `last_active_at` +- 保留会话本身 + +这适合“保留会话入口,但丢弃聊天内容”的场景。 + +### 8.4 删除会话 + +`delete_session(session_id)`: + +- 显式删除 `messages` +- 再删除 `sessions` + +虽然表结构中存在 `deleted_at` 字段,并且查询时也会过滤 `deleted_at IS NULL`,但当前实现并没有做软删除,而是直接物理删除。换句话说: + +- `deleted_at` 当前是保留字段 +- 如果后续需要回收站或审计恢复,可以基于它演进成软删除 + +## 9. 并发与一致性 + +当前 `SessionStore` 的一致性策略比较直接: + +- 进程内使用 `Arc>` 保护单连接访问 +- 追加消息时使用 SQLite 事务 +- 单条消息的写入与会话计数更新在同一事务中完成 + +这意味着: + +- 对单进程场景,消息顺序和 `message_count` 是一致的 +- `seq` 通过事务内 `MAX(seq) + 1` 分配,避免同一连接并发下的顺序错乱 +- WAL 模式提升读取和写入并存时的稳定性 + +需要注意的是,当前设计主要面向单进程本地运行。如果未来要扩展到多进程或多实例共享同一数据库,需要重新评估: + +- 单连接模型 +- `MAX(seq) + 1` 的扩展性 +- 会话加载缓存和跨实例同步 + +## 10. 当前实现中的保留点 + +下面这些字段或能力已经在 schema 中出现,但还没有完整业务闭环: + +- `sessions.summary` + - 当前代码没有把 `ContextCompressor` 产出的摘要写回数据库 + - 目前摘要只参与运行时上下文压缩,不参与持久化 +- `sessions.deleted_at` + - 当前查询逻辑兼容软删除 + - 当前删除实现仍然是物理删除 + +这说明当前 schema 已经为“会话摘要”和“软删除”预留了演进空间,但并未完全落地。 + +## 11. 给维护者的快速判断指南 + +如果你要排查持久化问题,可以先按下面的思路判断: + +- 会话查不到:先看 `persistent_session_id` 是否和实际 `channel_name/chat_id` 一致 +- 重启后没历史:检查 `ensure_chat_loaded()` 调用链,以及数据库文件路径是否正确 +- 消息顺序不对:检查 `messages.seq` +- 工具调用上下文异常:同时检查 `tool_calls_json` 和 `tool_call_id` +- 会话列表里看不到记录:检查 `archived_at` 和 `include_archived` 参数 +- 清空后仍有上下文:确认是内存历史没清掉,还是数据库 `messages` 没删掉 + +## 12. 总结 + +PicoBot 当前的持久化设计比较克制,核心目标只有两个: + +- 让同一会话在重启后可以恢复上下文 +- 让工具调用链可以被完整回放 + +从实现上看,它不是通用 ORM,也不是复杂事件存储,而是一层针对聊天历史的轻量 SQLite 封装。对于当前单机、单进程、聊天驱动的运行方式,这个设计足够直接,也便于继续演进。 \ No newline at end of file diff --git a/src/storage/mod.rs b/src/storage/mod.rs index f1d53c0..cf5c58e 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -98,10 +98,6 @@ impl SessionStore { ", )?; - if !table_has_column(&conn, "messages", "tool_calls_json")? { - conn.execute("ALTER TABLE messages ADD COLUMN tool_calls_json TEXT", [])?; - } - Ok(Self { conn: Arc::new(Mutex::new(conn)), }) @@ -507,19 +503,4 @@ mod tests { assert_eq!(messages[0].tool_name.as_deref(), Some("file_write")); assert!(messages[0].tool_calls.is_none()); } -} - -fn table_has_column(conn: &Connection, table: &str, column: &str) -> Result { - let pragma = format!("PRAGMA table_info({})", table); - let mut stmt = conn.prepare(&pragma)?; - let mut rows = stmt.query([])?; - - while let Some(row) = rows.next()? { - let name: String = row.get(1)?; - if name == column { - return Ok(true); - } - } - - Ok(false) } \ No newline at end of file