fix: advance last_consolidated_at after compression, update docs

xiaoxixi 2026-05-08 21:56:05 +08:00
parent 81e9f1e7db
commit 488e10dceb
5 changed files with 438 additions and 435 deletions


@@ -8,9 +8,10 @@
 ## Config
-- Config file: `~/.picobot/config.json` or `./config.json` (fallback order, see `src/config/mod.rs:213`)
+- Config load order: `~/.picobot/config.json` then fallback to `./config.json` (`src/config/mod.rs:237-267`)
-- `.env` is loaded manually (not via dotenv crate); env var placeholders `<VAR_NAME>` in config JSON are substituted
+- `.env` (cwd) is loaded with a custom parser, not via dotenv crate; env var placeholders `<VAR_NAME>` in config JSON are substituted
 - Config example: `config.example.json`
+- `session_ttl_hours` defaults to 4 in code when absent (`src/gateway/mod.rs:44`); config.example.json shows 168 as a suggestion
 ## Tests
@@ -52,6 +53,7 @@ Channel → MessageBus → SessionManager → AgentLoop → (tools) → SessionM
 | `tools` | Agent tools (bash, file ops, http, web, get_skill) | `ToolRegistry`, `Tool` trait |
 | `skills` | Skills loading, management, and prompt building | `SkillsLoader`, `Skill` |
 | `storage` | SQLite persistence for sessions and messages | `Storage`, `SessionMeta`, `MessageMeta` |
+| `scheduler` | Cron-based job scheduling, next-run computation | `Scheduler`, `Schedule`, `next_run_for_schedule()` |
 | `observability` | Observer pattern for agent/tool telemetry events | `Observer` trait, `ObserverEvent`, `MultiObserver` |
 | `protocol` | WebSocket protocol message types | `WsInbound`, `WsOutbound`, `SessionSummary` |
 | `config` | Config loading, env substitution, path resolution | `Config`, `LLMProviderConfig` |

422
README.md

@@ -0,0 +1,422 @@
# PicoBot
A multi-channel AI agent framework with a WebSocket gateway and TUI client, supporting OpenAI-compatible and Anthropic LLM providers, tool calling, session persistence, and cron-based scheduling.
## System Architecture
```mermaid
graph TB
subgraph Clients
TUI["🖥️ CLI Chat (TUI)"]
FS["📱 Feishu/Lark"]
end
subgraph Gateway["Gateway Server (127.0.0.1:19876)"]
HTTP["HTTP Endpoints<br/>GET /health<br/>GET /ws (WebSocket upgrade)"]
WS["WebSocket Handler"]
CD["ChannelManager"]
SP["SessionManager"]
AL["AgentLoop"]
end
subgraph Bus["MessageBus"]
IB["Inbound Channel"]
OB["Outbound Channel"]
CC["Control Channel"]
end
subgraph Storage
SQLite[("SQLite<br/>.picobot_sessions.db")]
end
subgraph AI["AI Providers"]
OpenAI["OpenAI / DashScope"]
Anthropic["Anthropic Claude"]
end
TUI <-->|WebSocket| WS
FS <-->|Webhook| HTTP
CD -->|InboundMessage| IB
IB -->|DialogEvent| SP
CC -->|ControlMessage| SP
SP <--> AL
AL -->|API Call| OpenAI
AL -->|API Call| Anthropic
AL -->|Tool Call| Tools
SP -->|OutboundMessage| OB
OB --> CD
SP --> SQLite
Tools --> SQLite
subgraph Tools
Bash["Bash"]
FileIO["File Read/Write/Edit"]
Web["HTTP Request / Web Fetch"]
Calc["Calculator"]
Skill["Get Skill"]
Msg["Send Message"]
Cron["Cron Jobs"]
end
```
### Core Data Flow
```mermaid
sequenceDiagram
participant Channel as Channel<br/>(CLI/Feishu)
participant Bus as MessageBus
participant SM as SessionManager
participant AL as AgentLoop
participant LLM as LLM Provider
participant Tool as Tools
Channel->>Bus: InboundMessage (user input)
Bus->>SM: DialogEvent
SM->>SM: Load/Resolve Session
SM->>AL: Process (session state)
AL->>LLM: ChatCompletionRequest
LLM-->>AL: response / tool_calls
alt Tool Calls
AL->>Tool: execute tool
Tool-->>AL: result
AL->>LLM: continue with tool result
end
AL-->>SM: AgentProcessResult (text + token count)
SM->>SM: Persist to SQLite
SM->>Bus: OutboundMessage
Bus->>Channel: response to user
```
## Features
### Multi-Channel Support
- **CLI Chat Client** — Full TUI with session management, Markdown rendering, slash commands
- **Feishu (Lark)** — Webhook-based integration with typing indicators and media support
### Multi-Provider LLM
- OpenAI-compatible API (GPT-4, DashScope, Volcengine, etc.)
- Anthropic Messages API (Claude)
- Cross-provider JSON Schema normalization for tool calling compatibility
### Session Management
- Multi-session conversations per channel/chat
- Create, switch, rename, archive, delete dialogs via slash commands or WebSocket
- SQLite-persisted session history with automatic TTL-based cleanup
- Context compression for long conversations approaching token limits
### Tool System
| Tool | Description |
|------|-------------|
| `bash` | Execute shell commands in workspace |
| `file_read` | Read file contents |
| `file_write` | Create/overwrite files |
| `file_edit` | Precise string substitution in files |
| `http_request` | Make HTTP API requests |
| `web_fetch` | Fetch and parse web pages |
| `calculator` | Evaluate mathematical expressions |
| `get_skill` | Load agent skills from local skill files |
| `send_message` | Send messages to other channels |
| `cron_add/list/remove/enable/disable/update` | Manage scheduled jobs |
### Scheduling
- Cron-based recurring jobs with optional timezone support
- One-shot (`at`) and interval (`every`) schedules
- Jobs trigger agent processing via specified channel/chat
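As a rough illustration of how the one-shot and interval schedules above could resolve to a next run time, here is a minimal sketch. The enum shape, its field names, and the `next_run` function are assumptions made for this example and are not taken from the scheduler module; the cron variant is left out because it needs an expression parser.

```rust
/// Hypothetical, simplified schedule type. Field names are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Schedule {
    /// Fire once at an absolute time (Unix milliseconds).
    At { at_ms: i64 },
    /// Fire every `every_ms` milliseconds, counted from `anchor_ms`.
    Every { every_ms: i64, anchor_ms: i64 },
}

/// Return the first run time strictly after `now_ms`, or `None` when the
/// schedule will never fire again.
fn next_run(schedule: Schedule, now_ms: i64) -> Option<i64> {
    match schedule {
        // A one-shot job only runs if its time is still in the future.
        Schedule::At { at_ms } => (at_ms > now_ms).then_some(at_ms),
        Schedule::Every { every_ms, anchor_ms } => {
            if every_ms <= 0 {
                return None; // a non-positive interval can never fire
            }
            if now_ms < anchor_ms {
                return Some(anchor_ms); // the first tick is the anchor itself
            }
            // Number of whole intervals already completed since the anchor.
            let intervals_elapsed = (now_ms - anchor_ms) / every_ms;
            Some(anchor_ms + (intervals_elapsed + 1) * every_ms)
        }
    }
}
```

Under these assumptions an interval job anchored at 0 with a 1000 ms period, queried at 2500 ms, would next run at 3000 ms, and a one-shot job whose time has passed yields `None`.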
### Skills System
- Load Markdown skill files from `~/.picobot/skills` and `~/.agent/skills`
- Skills inject specialized system prompts for specific tasks
- Automatic hot-reload on file changes
### Observability
- Observer pattern for agent and tool telemetry
- Events: `AgentStart`, `AgentEnd`, `ToolCallStart`, `ToolCall`
- Structured JSON logging with file rotation
## Quick Start
### Prerequisites
- Rust nightly (edition 2024) — use `rustup` to install
### Build
```bash
cargo build
```
### Configure
1. Create `config.json` (or `~/.picobot/config.json`):
```json
{
  "providers": {
    "openai": {
      "type": "openai",
      "base_url": "https://api.openai.com/v1",
      "api_key": "<OPENAI_API_KEY>"
    }
  },
  "models": {
    "gpt-4o": {
      "model_id": "gpt-4o",
      "temperature": 0.7,
      "max_tokens": 4096
    }
  },
  "agents": {
    "default": {
      "provider": "openai",
      "model": "gpt-4o",
      "max_tool_iterations": 20,
      "token_limit": 128000
    }
  }
}
```
2. Set API keys via `.env` file (one `KEY=VALUE` per line):
```env
OPENAI_API_KEY=sk-xxxxx
```
### Run
**Start gateway server:**
```bash
cargo run -- gateway
```
Binds `127.0.0.1:19876` by default. Override with `--host` and `--port`.
**Connect CLI client:**
```bash
cargo run -- chat
```
Connects to `ws://127.0.0.1:19876/ws`. Override with `--gateway-url`.
## Configuration Reference
Config load order: `~/.picobot/config.json` first, then `./config.json` as the fallback.
### Full Config Structure
```mermaid
graph LR
Config["config.json"]
Config --> Providers["providers<br/>ProviderConfig{}"]
Config --> Models["models<br/>ModelConfig{}"]
Config --> Agents["agents<br/>AgentConfig{}"]
Config --> Gateway["gateway<br/>GatewayConfig"]
Config --> Client["client<br/>ClientConfig"]
Config --> Channels["channels<br/>ChannelConfig{}"]
Config --> Workspace["workspace_dir"]
Providers --> PT["type (openai / anthropic)<br/>base_url<br/>api_key<br/>extra_headers"]
Models --> MT["model_id<br/>temperature<br/>max_tokens"]
Agents --> AT["provider (ref)<br/>model (ref)<br/>max_tool_iterations<br/>token_limit"]
Gateway --> GT["host / port<br/>session_ttl_hours<br/>cleanup_interval_minutes<br/>session_db_path<br/>scheduler"]
Channels --> CT["feishu: app_id, app_secret<br/>allow_from, agent, media_dir"]
```
### Environment Variables
The `.env` file in the working directory is read by a custom parser (not the dotenv crate). Placeholders written as `<VAR_NAME>` in `config.json` are replaced with the value of the matching variable at load time.
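The substitution step can be pictured with the following sketch. It is not the code in `src/config/mod.rs`: the function names, the handling of `#` comment lines, and the choice to leave unknown placeholders untouched are all assumptions made for illustration.

```rust
use std::collections::HashMap;

/// Parse `KEY=VALUE` lines. Blank lines and lines starting with `#` are
/// skipped (comment handling is an assumption, not confirmed by the source).
fn parse_env(contents: &str) -> HashMap<String, String> {
    contents
        .lines()
        .map(str::trim)
        .filter(|line| !line.is_empty() && !line.starts_with('#'))
        .filter_map(|line| line.split_once('='))
        .map(|(key, value)| (key.trim().to_string(), value.trim().to_string()))
        .collect()
}

/// Replace every `<VAR_NAME>` placeholder found in `vars`.
/// Unknown placeholders and stray `<` characters are left as they are.
fn substitute_placeholders(input: &str, vars: &HashMap<String, String>) -> String {
    let mut out = String::with_capacity(input.len());
    let mut rest = input;
    while let Some(start) = rest.find('<') {
        out.push_str(&rest[..start]);
        let after = &rest[start + 1..];
        // Look for a closing '>' and check that the text between looks like a variable name.
        let replacement = after.find('>').and_then(|end| {
            let name = &after[..end];
            let is_var = !name.is_empty()
                && name
                    .chars()
                    .all(|c| c.is_ascii_uppercase() || c.is_ascii_digit() || c == '_');
            if is_var {
                vars.get(name).map(|value| (value, end))
            } else {
                None
            }
        });
        match replacement {
            Some((value, end)) => {
                out.push_str(value);
                rest = &after[end + 1..];
            }
            None => {
                // Not a known placeholder: keep the '<' and continue scanning after it.
                out.push('<');
                rest = after;
            }
        }
    }
    out.push_str(rest);
    out
}
```

With a `.env` containing `OPENAI_API_KEY=sk-test`, the string `"api_key": "<OPENAI_API_KEY>"` would become `"api_key": "sk-test"` under this sketch.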
### Gateway Config
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `host` | string | `127.0.0.1` | Bind address |
| `port` | u16 | `19876` | Listen port |
| `session_ttl_hours` | number | `4` | Inactive session expiration (hours) |
| `cleanup_interval_minutes` | number | `60` | Session cleanup interval |
| `session_db_path` | string | workspace `.picobot_sessions.db` | SQLite database path |
| `scheduler.enabled` | bool | `false` | Enable cron scheduler |
### Agent Config
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| `provider` | string | — | Provider name (key in `providers`) |
| `model` | string | — | Model name (key in `models`) |
| `max_tool_iterations` | number | `20` | Max tool call iterations per turn |
| `token_limit` | number | `128000` | Context window token limit |
## Slash Commands
Available in CLI chat and Feishu:
| Command | Alias | Description |
|---------|-------|-------------|
| `/new` | `/刷新` | Create a new dialog |
| `/list` | `/对话列表` | List all dialogs |
| `/switch <id>` | — | Switch to a dialog |
| `/rename <title>` | — | Rename current dialog |
| `/archive` | — | Archive current dialog |
| `/delete` | — | Delete current dialog |
| `/clear` | `/清空` | Clear current dialog history |
## WebSocket Protocol
The gateway exposes a WebSocket endpoint at `/ws`. Messages use typed JSON with a `type` discriminator field.
### Client → Server (WsInbound)
| Type | Fields |
|------|--------|
| `user_input` | `content`, `channel?`, `chat_id?`, `sender_id?` |
| `create_session` | `title?` |
| `list_sessions` | `include_archived` |
| `load_session` | `session_id` |
| `rename_session` | `session_id?`, `title` |
| `archive_session` | `session_id?` |
| `delete_session` | `session_id?` |
| `clear_history` | `chat_id?`, `session_id?` |
| `get_slash_commands` | — |
| `ping` | — |
### Server → Client (WsOutbound)
| Type | Fields |
|------|--------|
| `assistant_response` | `session_id`, `response`, `tokens_used?`, `tool_calls?` |
| `session_list` | `sessions[]` |
| `session_loaded` | `session_id`, `messages[]` |
| `session_created` | `session_id`, `title` |
| `session_renamed` | `session_id`, `title` |
| `session_archived` | `session_id` |
| `session_deleted` | `session_id` |
| `slash_commands` | `commands[]` |
| `error` | `message` |
| `pong` | — |
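To make the `type` discriminator concrete, the sketch below hand-builds the JSON frame for three of the inbound message types. It covers only a hypothetical subset of `WsInbound`, and the manual string building is for illustration only; how `src/protocol.rs` actually serializes its types is not shown in this document.

```rust
/// Hypothetical subset of the client-to-server messages in the table above.
enum WsInbound {
    UserInput { content: String },
    LoadSession { session_id: String },
    Ping,
}

/// Escape a string so it can sit inside a JSON string literal.
fn escape_json(s: &str) -> String {
    let mut out = String::with_capacity(s.len());
    for c in s.chars() {
        match c {
            '"' => out.push_str("\\\""),
            '\\' => out.push_str("\\\\"),
            '\n' => out.push_str("\\n"),
            '\r' => out.push_str("\\r"),
            '\t' => out.push_str("\\t"),
            c if (c as u32) < 0x20 => out.push_str(&format!("\\u{:04x}", c as u32)),
            c => out.push(c),
        }
    }
    out
}

impl WsInbound {
    /// Value of the `type` field for this message.
    fn type_name(&self) -> &'static str {
        match self {
            WsInbound::UserInput { .. } => "user_input",
            WsInbound::LoadSession { .. } => "load_session",
            WsInbound::Ping => "ping",
        }
    }

    /// Serialize to a JSON object whose first key is the `type` discriminator.
    fn to_json(&self) -> String {
        let ty = self.type_name();
        match self {
            WsInbound::UserInput { content } => {
                format!("{{\"type\":\"{ty}\",\"content\":\"{}\"}}", escape_json(content))
            }
            WsInbound::LoadSession { session_id } => {
                format!("{{\"type\":\"{ty}\",\"session_id\":\"{}\"}}", escape_json(session_id))
            }
            WsInbound::Ping => format!("{{\"type\":\"{ty}\"}}"),
        }
    }
}
```

A client that receives a frame can read the `type` field first and then decode the remaining fields according to the matching row of the tables.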
## HTTP Endpoints
| Method | Path | Description |
|--------|------|-------------|
| `GET` | `/health` | Health check — returns `{"status":"ok","version":"x.y.z"}` |
| `GET` | `/ws` | WebSocket upgrade for chat clients |
## Testing
```bash
# Unit tests (no external dependencies)
cargo test --lib
# Integration tests (require API keys)
cp tests/test.env.example tests/test.env
# Fill in your API keys in tests/test.env
cargo test --test test_integration -- --ignored
cargo test --test test_tool_calling -- --ignored
cargo test --test test_request_format -- --ignored
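# Run only the ignored tests in every test target (use --include-ignored to run ignored and regular tests together)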
cargo test -- --ignored
```
Integration tests are marked `#[ignore]`, so a plain `cargo test` skips them; they make real API calls.
## Project Structure
```
├── src/
│ ├── main.rs # CLI entrypoint (clap-based subcommands)
│ ├── lib.rs # Module declarations
│ ├── gateway/ # HTTP/WS server, GatewayState initialization
│ │ ├── mod.rs
│ │ ├── http.rs # Health endpoint
│ │ └── ws.rs # WebSocket handler
│ ├── client/ # TUI chat client
│ │ ├── mod.rs
│ │ └── tui/ # Ratatui-based terminal UI
│ ├── channels/ # Channel integrations
│ │ ├── base.rs # Channel trait
│ │ ├── cli_chat.rs # CLI WebSocket channel
│ │ ├── feishu.rs # Feishu/Lark webhook channel
│ │ ├── manager.rs # ChannelManager
│ │ └── slash_command.rs # Slash command parser
│ ├── bus/ # Async message bus
│ │ ├── mod.rs # MessageBus (tokio mpsc channels)
│ │ ├── message.rs # Message types
│ │ └── dispatcher.rs # OutboundDispatcher
│ ├── session/ # Session & dialog management
│ │ ├── mod.rs
│ │ ├── session.rs # Session, SessionManager
│ │ ├── session_id.rs # UnifiedSessionId
│ │ ├── commands.rs # SessionCommand enum
│ │ └── events.rs # SessionEvent, DialogInfo
│ ├── agent/ # LLM interaction loop
│ │ ├── mod.rs
│ │ ├── agent_loop.rs # AgentLoop (stateless)
│ │ ├── context_compressor.rs # Token estimation & summarization
│ │ └── system_prompt.rs # System prompt builder
│ ├── providers/ # LLM API clients
│ │ ├── mod.rs # Factory: create_provider()
│ │ ├── traits.rs # LLMProvider trait
│ │ ├── openai.rs # OpenAI-compatible client
│ │ └── anthropic.rs # Anthropic Messages API client
│ ├── tools/ # Agent tools
│ │ ├── mod.rs # create_default_tools()
│ │ ├── registry.rs # ToolRegistry
│ │ ├── traits.rs # Tool trait, ToolResult
│ │ ├── schema.rs # Cross-provider JSON Schema cleaner
│ │ ├── bash.rs # Shell command execution
│ │ ├── calculator.rs # Math expression evaluator
│ │ ├── chat_manager.rs # Session management tool
│ │ ├── cron.rs # Cron job management tools
│ │ ├── file_read.rs # File reader
│ │ ├── file_write.rs # File writer
│ │ ├── file_edit.rs # File editor (string substitution)
│ │ ├── get_skill.rs # Skill loader tool
│ │ ├── http_request.rs # HTTP request tool
│ │ ├── send_message.rs # Cross-channel messaging
│ │ └── web_fetch.rs # Web page fetcher
│ ├── skills/ # Skills loading from markdown files
│ │ └── mod.rs # SkillsLoader, Skill
│ ├── storage/ # SQLite persistence
│ │ ├── mod.rs # Storage, schema init
│ │ ├── session.rs # Session CRUD operations
│ │ ├── message.rs # Message persistence
│ │ ├── scheduler.rs # ScheduledJob, JobRun storage
│ │ └── error.rs # StorageError
│ ├── scheduler/ # Cron scheduler runtime
│ │ ├── mod.rs # Scheduler, next_run_for_schedule()
│ │ └── types.rs # Schedule enum (At/Every/Cron)
│ ├── observability/ # Telemetry observer pattern
│ │ └── mod.rs # Observer trait, ObserverEvent, MultiObserver
│ ├── protocol.rs # WebSocket message types (WsInbound/WsOutbound)
│ ├── config/ # Config loading & env substitution
│ │ └── mod.rs # Config, LLMProviderConfig, load_env_file()
│ └── logging.rs # Tracing subscriber init with file rotation
├── tests/
│ ├── test_integration.rs # LLM provider integration tests
│ ├── test_tool_calling.rs # Tool calling integration tests
│ ├── test_request_format.rs # Request format tests
│ ├── test_scheduler.rs # Scheduler unit tests
│ ├── test.env.example # Test environment template
│ └── test.env # Actual test keys (gitignored)
├── reference/ # Third-party reference code (do not modify)
├── config.example.json # Full config example
└── Cargo.toml
```
## Key Dependencies
| Crate | Purpose |
|-------|---------|
| `axum` + `tokio-tungstenite` | HTTP server & WebSocket |
| `sqlx` (SQLite) | Session/Message/Job persistence |
| `reqwest` (rustls) | LLM API & external HTTP calls |
| `ratatui` + `crossterm` | Terminal UI |
| `clap` | CLI argument parsing |
| `tracing` + `tracing-subscriber` | Structured logging |
| `cron` + `chrono-tz` | Cron schedule parsing |
| `meval` | Mathematical expression evaluation |
| `uuid` | Session/Dialog ID generation |
| `dirs` | Platform config directory resolution |


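@@ -1,433 +0,0 @@
# Session Management Detailed Design
## 1. Design Goals
1. Persist session data to SQLite so that it can be restored after a system restart
2. Support the full Dialog lifecycle (create / list / switch / rename / delete)
3. TTL-based automatic cleanup of in-memory state (the DB retains all data)
4. Have the LLM generate a session title automatically, to help both the user and the AI understand the conversation context
5. Write every message to the DB in real time; on failure, retry and then raise an alert
---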
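## 2. Concepts
### 2.1 Hierarchy
```
Channel
└── Chat
    └── Dialog
        └── Session (runtime instance)
```
- **Channel**: the source channel of a message (e.g. `cli_chat`, `feishu`)
- **Chat**: one chat within a channel (e.g. a CLI session ID or a Feishu open_conversation_id)
- **Dialog**: one of several independent conversation threads within a chat (e.g. a new dialog created with `/new`)
- **Session**: the runtime instance of a Dialog, holding the message history, LLM configuration, tools, and so on
### 2.2 UnifiedSessionId
```
Format:   {channel}:{chat_id}:{dialog_id}
Examples: cli_chat:sid_abc123:dialog_xyz
          feishu:oc_123456:default
```
| Field | Description |
|------|------|
| channel | Channel identifier |
| chat_id | Chat identifier |
| dialog_id | Dialog identifier (defaults to `default`) |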
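The format above can be parsed and rendered with a few lines of Rust. This is a sketch, not the contents of `session_id.rs`: the struct layout, the `parse` method, and the decision to fall back to `default` when the dialog part is missing are assumptions for illustration.

```rust
use std::fmt;

/// Sketch of the three-part session key described above.
#[derive(Debug, Clone, PartialEq, Eq)]
struct UnifiedSessionId {
    channel: String,
    chat_id: String,
    dialog_id: String,
}

impl UnifiedSessionId {
    /// Parse `{channel}:{chat_id}:{dialog_id}`.
    /// A missing dialog part falls back to `default` (an assumption based on
    /// the table above). Any extra `:` characters stay inside `dialog_id`.
    fn parse(s: &str) -> Option<Self> {
        let mut parts = s.splitn(3, ':');
        let channel = parts.next()?;
        let chat_id = parts.next()?;
        let dialog_id = parts.next().unwrap_or("default");
        if channel.is_empty() || chat_id.is_empty() || dialog_id.is_empty() {
            return None;
        }
        Some(Self {
            channel: channel.to_string(),
            chat_id: chat_id.to_string(),
            dialog_id: dialog_id.to_string(),
        })
    }
}

impl fmt::Display for UnifiedSessionId {
    /// Render back to the canonical `channel:chat_id:dialog_id` form.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}:{}:{}", self.channel, self.chat_id, self.dialog_id)
    }
}
```

Because the rendered string is also the `id` primary key of the `sessions` table in this design, parsing and formatting should round-trip without loss.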
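### 2.3 Relationship Between Session and Dialog
- Each Dialog corresponds to one `Session` instance at runtime
- A `Session` lives in memory and is looked up by its `UnifiedSessionId`
- A `Dialog` is the persisted record in Storage; the `Session` is its runtime projection
---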
## 3. Database Schema
### 3.1 The sessions table
```sql
CREATE TABLE sessions (
    id TEXT PRIMARY KEY,
    channel TEXT NOT NULL,
    chat_id TEXT NOT NULL,
    dialog_id TEXT NOT NULL,
    title TEXT NOT NULL,
    created_at INTEGER NOT NULL,
    last_active_at INTEGER NOT NULL,
    message_count INTEGER DEFAULT 0,
    deleted_at INTEGER,
    UNIQUE(channel, chat_id, dialog_id)
);
```
> Note: the `archived_at` column has been removed; the design no longer has an archive concept.
### 3.2 The messages table
```sql
CREATE TABLE messages (
    id TEXT PRIMARY KEY,
    session_id TEXT NOT NULL,
    seq INTEGER NOT NULL,
    role TEXT NOT NULL,
    content TEXT NOT NULL,
    media_refs TEXT,
    tool_call_id TEXT,
    tool_name TEXT,
    tool_calls TEXT,
    created_at INTEGER NOT NULL,
    FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE
);
CREATE INDEX idx_messages_session_seq ON messages(session_id, seq);
CREATE INDEX idx_sessions_chat ON sessions(channel, chat_id, deleted_at);
```
---
## 4. Storage API
### 4.1 Responsibilities of Storage
- The single persistent source of truth
- Constructed by the caller (GatewayState) and injected into `SessionManager` as an `Arc<Storage>`
- Every failed write is retried 3 times (with back-off delays of 100 ms, 200 ms, 300 ms); if it still fails, a system-notification alert is raised
### 4.2 Session Operations
| Method | Description |
|------|------|
| `new(db_path) -> Storage` | Open or create the database |
| `upsert_session(meta) -> Result<(), StorageError>` | Insert or update session metadata |
| `get_session(id) -> Result<SessionMeta, StorageError>` | Fetch a single session |
| `list_sessions(channel, chat_id, limit) -> Result<Vec<SessionMeta>>` | The most recent N sessions (used by `/sessions`) |
| `delete_session(id) -> Result<(), StorageError>` | Physically delete a session and its messages |
| `touch_session(id, message_count, last_active_at)` | Update the message count and last-active time |
### 4.3 Message Operations
| Method | Description |
|------|------|
| `append_message(session_id, msg) -> Result<i64, StorageError>` | Append one message and return its seq |
| `append_messages(session_id, msgs) -> Result<Vec<i64>, StorageError>` | Append a batch of messages |
| `load_messages(session_id, from_seq) -> Result<Vec<MessageMeta>>` | Load messages starting from the given seq |
| `clear_messages(session_id) -> Result<(), StorageError>` | Clear the messages (the session is kept) |
### 4.4 Handling Write Failures
```rust
use std::time::Duration;
use tokio::time::sleep;

async fn append_message_with_retry(&self, session_id: &str, msg: &MessageMeta) -> Result<i64, StorageError> {
    // Back-off before each of the three retries, in milliseconds.
    const RETRY_DELAYS_MS: [u64; 3] = [100, 200, 300];
    let mut retries = RETRY_DELAYS_MS.iter();
    loop {
        match self.append_message(session_id, msg) {
            Ok(seq) => return Ok(seq),
            Err(e) => match retries.next() {
                Some(delay) => {
                    tracing::warn!("Storage write failed, retrying in {delay} ms: {e}");
                    sleep(Duration::from_millis(*delay)).await;
                }
                // All retries exhausted: the caller sends a system notification through the Session.
                None => return Err(e),
            },
        }
    }
}
```
---
## 5. Session Structure
```rust
pub struct Session {
    pub id: UnifiedSessionId,
    pub title: String,            // Session title (set by the user or generated by the LLM)
    pub created_at: i64,          // Creation time (ms)
    pub last_active_at: i64,      // Last-active time (ms)
    pub message_count: i64,       // User message count (triggers automatic title generation)
    pub total_message_count: i64, // Total count, including system messages
    messages: Vec<ChatMessage>,   // In-memory message history (after compression)
    seq_counter: i64,             // seq of the next message
    provider_config: LLMProviderConfig,
    provider: Arc<dyn LLMProvider>,
    tools: Arc<ToolRegistry>,
    compressor: ContextCompressor,
    user_tx: mpsc::Sender<WsOutbound>,
    storage: Arc<Storage>,        // Persistence sink
}
```
### 5.1 Initialization Flow
```
new() or from_storage()
Inject the storage reference
Create provider, tools, compressor
Load messages from Storage (from_seq = 0)
Set seq_counter = messages.len() + 1
Return the Session instance
```
### 5.2 Message Management
```rust
pub async fn add_message(&mut self, msg: ChatMessage) -> Result<(), StorageError> {
    // 1. Assign the sequence number: seq = seq_counter; seq_counter += 1
    // 2. Convert to MessageMeta
    // 3. Write to Storage (retry, then alert)
    // 4. Update memory: messages.push(msg)
    // 5. Update the counters: message_count / total_message_count += 1
    // 6. Update last_active_at
}
```
### 5.3 System Notification Interface (not recorded in history)
```rust
impl Session {
    /// Send a system notification (not recorded in the session history).
    pub async fn send_system_notification(&self, content: &str) {
        let msg = WsOutbound::SystemNotification {
            content: content.to_string(),
        };
        let _ = self.user_tx.send(msg).await;
    }
}
```
### 5.4 Automatic Title Generation
When the title is set:
1. When the Session is first created (the initial title is "Dialog {dialog_id}")
2. When `message_count` reaches the threshold (10 messages) and the title is still the default, it is replaced with an LLM-generated title
3. When the user runs the `/rename` command to change it manually

Generation prompt:
```
Given the following conversation history, generate a short session title (5-15 Chinese characters)
that summarizes the core topic of the conversation or the user's main need. Return only the title, with no explanation.
History:
{messages}
```
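The trigger condition for the LLM-generated title can be written as a small predicate. The function and constant names below are hypothetical; the sketch also treats "reaches the threshold" as "at or above 10", which is one reading of the rule, whereas the `handle_message` flow in this document checks for exactly 10.

```rust
/// Threshold from the rule above: generate a title once 10 user messages exist.
const AUTO_TITLE_AFTER_N_MESSAGES: i64 = 10;

/// Default title assigned when a session is first created.
fn default_title(dialog_id: &str) -> String {
    format!("Dialog {dialog_id}")
}

/// True when the title should be replaced with an LLM-generated one:
/// enough user messages have accumulated and the user has not renamed the dialog.
fn should_auto_title(message_count: i64, title: &str, dialog_id: &str) -> bool {
    message_count >= AUTO_TITLE_AFTER_N_MESSAGES && title == default_title(dialog_id)
}
```

Comparing against the default title is what keeps a manual `/rename` from being overwritten later.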
---
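## 6. SessionManager Design
### 6.1 Data Structures
```rust
pub struct SessionManager {
    inner: Arc<Mutex<SessionManagerInner>>,
    provider_config: LLMProviderConfig,
    tools: Arc<ToolRegistry>,
    skills_loader: Arc<SkillsLoader>,
    storage: Arc<Storage>, // Injected by the caller
    cleanup_interval: Duration,
}
struct SessionManagerInner {
    sessions: HashMap<String, Arc<Mutex<Session>>>,
    session_timestamps: HashMap<String, Instant>,
    session_ttl: Duration,
}
```
### 6.2 Complete handle_message Flow
```
handle_message(channel, sender_id, chat_id, dialog_id, content, media)
├── 1. Determine the UnifiedSessionId
│   │
│   ├── dialog_id present → use it directly
│   │
│   └── dialog_id absent → find the most recently active session under channel:chat_id
│       ├── found and not expired → use that session
│       └── not found or expired → create a new session
├── 2. Get or create the Session
│      in memory → refresh session_timestamps
│      not in memory → restore from Storage, or create a new Session
├── 3. Add the user message and persist it
│      seq = seq_counter; seq_counter += 1
│      Storage.append_message() (retry on failure → if it still fails, send_system_notification)
│      messages.push(user_msg)
│      message_count += 1
├── 4. Check the condition for automatic title generation
│      message_count == 10 → call the LLM → update the title → write it back to Storage
├── 5. Inject skills_prompt (after index 0)
├── 6. For a new session, inject the welcome message (as a system message, not counted in message_count)
├── 7. Compress the context (if needed)
├── 8. Call AgentLoop
├── 9. Save the agent's response message and persist it (same procedure)
└── 10. Return the final response
```
**Welcome message** (injected into the history only when a new session is created):
```
New dialog created! Session ID: {dialog_id}
Use /sessions to list all dialogs and /switch <id> to switch between them.
```
### 6.3 Dialog Lifecycle
| Operation | Method | Description |
|------|------|------|
| Create | `create_dialog()` | Generate a dialog_id, create the Session, write it to Storage |
| List | `list_dialogs()` | Read from Storage (limit=10) |
| Switch | `switch_dialog()` | Load the session from Storage and activate it in memory |
| Rename | `rename_dialog()` | Update the title in Storage and in memory |
| Delete | `delete_dialog()` | Remove the in-memory session and delete the Storage record |
| Soft reset | **Removed** | Users start a new session with `/new` instead |
### 6.4 TTL Cleanup
```rust
use std::sync::Arc;
use tokio::time::sleep;

// Takes `Arc<Self>` so the spawned task owns a handle to the manager;
// a task spawned with `tokio::spawn` cannot borrow `&self`.
fn start_cleanup_task(self: Arc<Self>) {
    let interval = self.cleanup_interval;
    tokio::spawn(async move {
        loop {
            sleep(interval).await;
            self.run_cleanup().await;
        }
    });
}
async fn run_cleanup(&self) {
    // Scan session_timestamps
    // Timed-out session → remove it from the in-memory HashMap
    // The session record in Storage is kept (it can be reloaded when the user switches back)
}
```
Cleanup policy:
- An in-memory session times out → only the memory is released; the Storage record is kept
- The user switches back to that session → it is reloaded from Storage into memory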
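The body of `run_cleanup` is only outlined above. One way the eviction pass might look is sketched here over plain maps; the function name `evict_expired`, its generic session type, and the explicit `now` parameter (which makes the logic testable) are assumptions for illustration, not the actual implementation.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Remove every session that has been idle for longer than `ttl` from both
/// in-memory maps and return the evicted ids in sorted order.
/// Persistent storage is deliberately not touched.
fn evict_expired<S>(
    sessions: &mut HashMap<String, S>,
    timestamps: &mut HashMap<String, Instant>,
    ttl: Duration,
    now: Instant,
) -> Vec<String> {
    // Collect the ids first, because the maps cannot be mutated while iterating.
    let mut expired: Vec<String> = timestamps
        .iter()
        .filter(|(_, last_active)| now.saturating_duration_since(**last_active) > ttl)
        .map(|(id, _)| id.clone())
        .collect();
    expired.sort();
    for id in &expired {
        sessions.remove(id);
        timestamps.remove(id);
    }
    expired
}
```

Passing `now` in, rather than calling `Instant::now()` inside, keeps the pass deterministic; the periodic task would supply the current time on each tick.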
---
## 7. Slash Commands
| Command | Trigger | Description |
|------|--------|------|
| new | `/new [title]` | Create a new dialog |
| sessions | `/sessions` | List the 10 most recent dialogs in the current chat |
| switch | `/switch <dialog_id>` | Switch to the given dialog |
| rename | `/rename <new title>` | Rename the current dialog |
| delete | `/delete` | Delete the current dialog (memory + Storage) |
| compact | `/compact` | Trigger context compression manually |
| info | `/info` | Show information about the current dialog |
---
## 8. Error Handling
```rust
pub enum StorageError {
    NotFound(String),
    AlreadyExists(String),
    Database(String),
    Serialization(String),
}
```
| Scenario | Handling |
|------|------|
| Storage write fails | Retry 3 times → send a system-notification alert |
| Storage read fails | If the session is in memory, keep using the in-memory data |
| Session does not exist | Create a new session |
| Concurrent conflict | Protected by a SQLite transaction |
---
## 9. Configuration
```json
{
"session": {
"ttl_hours": 24,
"cleanup_interval_minutes": 60,
"auto_title_after_n_messages": 10,
"storage_retry_delays_ms": [100, 200, 300]
}
}
```
---
## 10. File Structure
```
src/
├── storage/
│   ├── mod.rs          # Main Storage module
│   ├── session.rs      # Session CRUD
│   ├── message.rs      # Message CRUD
│   └── error.rs        # StorageError
└── session/
    ├── mod.rs          # Exports Session, SessionManager
    ├── session.rs      # Session, SessionManager implementation
    ├── session_id.rs   # UnifiedSessionId (existing)
    ├── commands.rs     # SessionCommand (existing)
    ├── events.rs       # SessionEvent, DialogInfo (existing)
    └── error.rs        # SessionError (existing)
```
---
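## 11. Implementation Order
### Phase 1: Storage Foundations
1. The `Storage` struct and database initialization
2. Session CRUD (upsert_session, get_session, list_sessions, delete_session)
3. Message CRUD (append_message, load_messages)
4. Retry logic for failed writes
5. Unit tests
### Phase 2: Session Extensions
1. Extend the `Session` struct (add the storage reference, the count fields, and seq_counter)
2. Integrate persistence into `add_message`
3. The `send_system_notification` interface
4. The `from_storage()` restore logic
5. The LLM call for automatic title generation
### Phase 3: Completing SessionManager
1. Integrate `Arc<Storage>` into `SessionManager`
2. Implement `list_dialogs()` (limit=10)
3. Implement `switch_dialog()` (load from Storage)
4. Implement `delete_dialog()` (memory + Storage)
5. Implement `rename_dialog()`
6. Background TTL cleanup task
7. Integration tests
### Phase 4: Slash Commands
1. Implement `/sessions` (list the 10 most recent)
2. Implement `/switch`
3. Implement `/rename`
4. Implement `/delete`
5. End-to-end tests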


@@ -96,6 +96,11 @@ impl ContextCompressor {
         self.session_id = id;
     }
+    /// Always true — memory is always available (memory system is always on).
+    pub fn has_memory(&self) -> bool {
+        true
+    }
     /// Get the compression threshold in tokens.
     fn threshold(&self) -> usize {
         (self.context_window as f64 * self.threshold_ratio) as usize


@@ -1328,6 +1328,13 @@ impl SessionManager {
             .compress_if_needed(history)
             .await?;
+        // Advance consolidation pointer — future compressions skip already-processed messages
+        let now = chrono::Utc::now().timestamp_millis();
+        session_guard.last_consolidated_at = Some(now);
+        if let Err(e) = session_guard.persist_session_meta().await {
+            tracing::warn!(error = %e, "Failed to persist consolidation timestamp");
+        }
         let agent = session_guard.create_agent_with_notify(notify_tx)?;
         let result = agent.process(history).await?;
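
The hunk above only writes `last_consolidated_at`; how the compressor reads it back is not part of this commit view. The sketch below shows one plausible way a later pass could use the pointer to skip already-processed messages. The `StoredMessage` type and the `pending_for_consolidation` function are hypothetical and exist only for this example.

```rust
/// Hypothetical, trimmed-down message record; only the timestamp matters here.
#[derive(Debug, Clone, PartialEq)]
struct StoredMessage {
    created_at: i64, // Unix milliseconds
    content: String,
}

/// Return the messages a later compression pass still has to look at:
/// everything created after the consolidation pointer, or every message
/// when no compression has run yet (`None`).
fn pending_for_consolidation(
    messages: &[StoredMessage],
    last_consolidated_at: Option<i64>,
) -> Vec<&StoredMessage> {
    messages
        .iter()
        .filter(|m| match last_consolidated_at {
            Some(cutoff) => m.created_at > cutoff,
            None => true,
        })
        .collect()
}
```

Under this reading, a pointer that is never advanced would make every pass reconsider the full history, which is the behavior the commit message describes fixing.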