From 488e10dcebcdc7949b5cf212fd24add1229dfc62 Mon Sep 17 00:00:00 2001 From: xiaoxixi Date: Fri, 8 May 2026 21:56:05 +0800 Subject: [PATCH] fix: advance last_consolidated_at after compression, update docs --- AGENTS.md | 6 +- README.md | 422 +++++++++++++++++++++++++++++++ session_plan.md | 433 -------------------------------- src/agent/context_compressor.rs | 5 + src/session/session.rs | 7 + 5 files changed, 438 insertions(+), 435 deletions(-) delete mode 100644 session_plan.md diff --git a/AGENTS.md b/AGENTS.md index d063b68..980798a 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -8,9 +8,10 @@ ## Config -- Config file: `~/.picobot/config.json` or `./config.json` (fallback order, see `src/config/mod.rs:213`) -- `.env` is loaded manually (not via dotenv crate); env var placeholders `` in config JSON are substituted +- Config load order: `~/.picobot/config.json` then fallback to `./config.json` (`src/config/mod.rs:237-267`) +- `.env` (cwd) is loaded with a custom parser, not via dotenv crate; env var placeholders `` in config JSON are substituted - Config example: `config.example.json` +- `session_ttl_hours` defaults to 4 in code when absent (`src/gateway/mod.rs:44`); config.example.json shows 168 as a suggestion ## Tests @@ -52,6 +53,7 @@ Channel → MessageBus → SessionManager → AgentLoop → (tools) → SessionM | `tools` | Agent tools (bash, file ops, http, web, get_skill) | `ToolRegistry`, `Tool` trait | | `skills` | Skills loading, management, and prompt building | `SkillsLoader`, `Skill` | | `storage` | SQLite persistence for sessions and messages | `Storage`, `SessionMeta`, `MessageMeta` | +| `scheduler` | Cron-based job scheduling, next-run computation | `Scheduler`, `Schedule`, `next_run_for_schedule()` | | `observability` | Observer pattern for agent/tool telemetry events | `Observer` trait, `ObserverEvent`, `MultiObserver` | | `protocol` | WebSocket protocol message types | `WsInbound`, `WsOutbound`, `SessionSummary` | | `config` | Config loading, env substitution, path resolution | `Config`, `LLMProviderConfig` | diff --git a/README.md b/README.md index e69de29..5c5048d 100644 --- a/README.md +++ b/README.md @@ -0,0 +1,422 @@ +# PicoBot + +A multi-channel AI agent framework with a WebSocket gateway and TUI client, supporting OpenAI-compatible and Anthropic LLM providers, tool calling, session persistence, and cron-based scheduling. + +## System Architecture + +```mermaid +graph TB + subgraph Clients + TUI["🖥️ CLI Chat (TUI)"] + FS["📱 Feishu/Lark"] + end + + subgraph Gateway["Gateway Server (127.0.0.1:19876)"] + HTTP["HTTP Endpoints
GET /health
GET /ws (WebSocket upgrade)"] + WS["WebSocket Handler"] + CD["ChannelManager"] + SP["SessionManager"] + AL["AgentLoop"] + end + + subgraph Bus["MessageBus"] + IB["Inbound Channel"] + OB["Outbound Channel"] + CC["Control Channel"] + end + + subgraph Storage + SQLite[("SQLite
.picobot_sessions.db")] + end + + subgraph AI["AI Providers"] + OpenAI["OpenAI / DashScope"] + Anthropic["Anthropic Claude"] + end + + TUI <-->|WebSocket| WS + FS <-->|Webhook| HTTP + + CD -->|InboundMessage| IB + IB -->|DialogEvent| SP + CC -->|ControlMessage| SP + SP <--> AL + AL -->|API Call| OpenAI + AL -->|API Call| Anthropic + AL -->|Tool Call| Tools + SP -->|OutboundMessage| OB + OB --> CD + SP --> SQLite + Tools --> SQLite + + subgraph Tools + Bash["Bash"] + FileIO["File Read/Write/Edit"] + Web["HTTP Request / Web Fetch"] + Calc["Calculator"] + Skill["Get Skill"] + Msg["Send Message"] + Cron["Cron Jobs"] + end +``` + +### Core Data Flow + +```mermaid +sequenceDiagram + participant Channel as Channel
(CLI/Feishu) + participant Bus as MessageBus + participant SM as SessionManager + participant AL as AgentLoop + participant LLM as LLM Provider + participant Tool as Tools + + Channel->>Bus: InboundMessage (user input) + Bus->>SM: DialogEvent + SM->>SM: Load/Resolve Session + SM->>AL: Process (session state) + AL->>LLM: ChatCompletionRequest + LLM-->>AL: response / tool_calls + alt Tool Calls + AL->>Tool: execute tool + Tool-->>AL: result + AL->>LLM: continue with tool result + end + AL-->>SM: AgentProcessResult (text + token count) + SM->>SM: Persist to SQLite + SM->>Bus: OutboundMessage + Bus->>Channel: response to user +``` + +## Features + +### Multi-Channel Support +- **CLI Chat Client** — Full TUI with session management, Markdown rendering, slash commands +- **Feishu (Lark)** — Webhook-based integration with typing indicators and media support + +### Multi-Provider LLM +- OpenAI-compatible API (GPT-4, DashScope, Volcengine, etc.) +- Anthropic Messages API (Claude) +- Cross-provider JSON Schema normalization for tool calling compatibility + +### Session Management +- Multi-session conversations per channel/chat +- Create, switch, rename, archive, delete dialogs via slash commands or WebSocket +- SQLite-persisted session history with automatic TTL-based cleanup +- Context compression for long conversations approaching token limits + +### Tool System +| Tool | Description | +|------|-------------| +| `bash` | Execute shell commands in workspace | +| `file_read` | Read file contents | +| `file_write` | Create/overwrite files | +| `file_edit` | Precise string substitution in files | +| `http_request` | Make HTTP API requests | +| `web_fetch` | Fetch and parse web pages | +| `calculator` | Evaluate mathematical expressions | +| `get_skill` | Load agent skills from local skill files | +| `send_message` | Send messages to other channels | +| `cron_add/list/remove/enable/disable/update` | Manage scheduled jobs | + +### Scheduling +- Cron-based recurring jobs with optional timezone support +- One-shot (`at`) and interval (`every`) schedules +- Jobs trigger agent processing via specified channel/chat + +### Skills System +- Load Markdown skill files from `~/.picobot/skills` and `~/.agent/skills` +- Skills inject specialized system prompts for specific tasks +- Automatic hot-reload on file changes + +### Observability +- Observer pattern for agent and tool telemetry +- Events: `AgentStart`, `AgentEnd`, `ToolCallStart`, `ToolCall` +- Structured JSON logging with file rotation + +## Quick Start + +### Prerequisites +- Rust nightly (edition 2024) — use `rustup` to install + +### Build + +```bash +cargo build +``` + +### Configure + +1. Create `config.json` (or `~/.picobot/config.json`): + +```json +{ + "providers": { + "openai": { + "type": "openai", + "base_url": "https://api.openai.com/v1", + "api_key": "" + } + }, + "models": { + "gpt-4o": { + "model_id": "gpt-4o", + "temperature": 0.7, + "max_tokens": 4096 + } + }, + "agents": { + "default": { + "provider": "openai", + "model": "gpt-4o", + "max_tool_iterations": 20, + "token_limit": 128000 + } + } +} +``` + +2. Set API keys via `.env` file (one `KEY=VALUE` per line): + +```env +OPENAI_API_KEY=sk-xxxxx +``` + +### Run + +**Start gateway server:** + +```bash +cargo run -- gateway +``` + +Binds `127.0.0.1:19876` by default. Override with `--host` and `--port`. + +**Connect CLI client:** + +```bash +cargo run -- chat +``` + +Connects to `ws://127.0.0.1:19876/ws`. Override with `--gateway-url`. + +## Configuration Reference + +Config load order: `~/.picobot/config.json` → `./config.json` (fallback). + +### Full Config Structure + +```mermaid +graph LR + Config["config.json"] + Config --> Providers["providers
ProviderConfig{}"] + Config --> Models["models
ModelConfig{}"] + Config --> Agents["agents
AgentConfig{}"] + Config --> Gateway["gateway
GatewayConfig"] + Config --> Client["client
ClientConfig"] + Config --> Channels["channels
ChannelConfig{}"] + Config --> Workspace["workspace_dir"] + + Providers --> PT["type (openai / anthropic)
base_url
api_key
extra_headers"] + Models --> MT["model_id
temperature
max_tokens"] + Agents --> AT["provider (ref)
model (ref)
max_tool_iterations
token_limit"] + Gateway --> GT["host / port
session_ttl_hours
cleanup_interval_minutes
session_db_path
scheduler"] + Channels --> CT["feishu: app_id, app_secret
allow_from, agent, media_dir"] +``` + +### Environment Variables + +The `.env` file in the working directory is loaded manually (not via dotenv crate). Placeholders in `config.json` written as `` are substituted at load time. + +### Gateway Config + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `host` | string | `127.0.0.1` | Bind address | +| `port` | u16 | `19876` | Listen port | +| `session_ttl_hours` | number | `4` | Inactive session expiration (hours) | +| `cleanup_interval_minutes` | number | `60` | Session cleanup interval | +| `session_db_path` | string | workspace `.picobot_sessions.db` | SQLite database path | +| `scheduler.enabled` | bool | `false` | Enable cron scheduler | + +### Agent Config + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `provider` | string | — | Provider name (key in `providers`) | +| `model` | string | — | Model name (key in `models`) | +| `max_tool_iterations` | number | `20` | Max tool call iterations per turn | +| `token_limit` | number | `128000` | Context window token limit | + +## Slash Commands + +Available in CLI chat and Feishu: + +| Command | Alias | Description | +|---------|-------|-------------| +| `/new` | `/刷新` | Create a new dialog | +| `/list` | `/对话列表` | List all dialogs | +| `/switch ` | — | Switch to a dialog | +| `/rename ` | — | Rename current dialog | +| `/archive` | — | Archive current dialog | +| `/delete` | — | Delete current dialog | +| `/clear` | `/清空` | Clear current dialog history | + +## WebSocket Protocol + +The gateway exposes a WebSocket endpoint at `/ws`. Messages use typed JSON with a `type` discriminator field. + +### Client → Server (WsInbound) + +| Type | Fields | +|------|--------| +| `user_input` | `content`, `channel?`, `chat_id?`, `sender_id?` | +| `create_session` | `title?` | +| `list_sessions` | `include_archived` | +| `load_session` | `session_id` | +| `rename_session` | `session_id?`, `title` | +| `archive_session` | `session_id?` | +| `delete_session` | `session_id?` | +| `clear_history` | `chat_id?`, `session_id?` | +| `get_slash_commands` | — | +| `ping` | — | + +### Server → Client (WsOutbound) + +| Type | Fields | +|------|--------| +| `assistant_response` | `session_id`, `response`, `tokens_used?`, `tool_calls?` | +| `session_list` | `sessions[]` | +| `session_loaded` | `session_id`, `messages[]` | +| `session_created` | `session_id`, `title` | +| `session_renamed` | `session_id`, `title` | +| `session_archived` | `session_id` | +| `session_deleted` | `session_id` | +| `slash_commands` | `commands[]` | +| `error` | `message` | +| `pong` | — | + +## HTTP Endpoints + +| Method | Path | Description | +|--------|------|-------------| +| `GET` | `/health` | Health check — returns `{"status":"ok","version":"x.y.z"}` | +| `GET` | `/ws` | WebSocket upgrade for chat clients | + +## Testing + +```bash +# Unit tests (no external dependencies) +cargo test --lib + +# Integration tests (require API keys) +cp tests/test.env.example tests/test.env +# Fill in your API keys in tests/test.env +cargo test --test test_integration -- --ignored +cargo test --test test_tool_calling -- --ignored +cargo test --test test_request_format -- --ignored + +# Run all tests +cargo test -- --ignored +``` + +Integration tests are `#[ignore]` by default because they make real API calls. + +## Project Structure + +``` +├── src/ +│ ├── main.rs # CLI entrypoint (clap-based subcommands) +│ ├── lib.rs # Module declarations +│ ├── gateway/ # HTTP/WS server, GatewayState initialization +│ │ ├── mod.rs +│ │ ├── http.rs # Health endpoint +│ │ └── ws.rs # WebSocket handler +│ ├── client/ # TUI chat client +│ │ ├── mod.rs +│ │ └── tui/ # Ratatui-based terminal UI +│ ├── channels/ # Channel integrations +│ │ ├── base.rs # Channel trait +│ │ ├── cli_chat.rs # CLI WebSocket channel +│ │ ├── feishu.rs # Feishu/Lark webhook channel +│ │ ├── manager.rs # ChannelManager +│ │ └── slash_command.rs # Slash command parser +│ ├── bus/ # Async message bus +│ │ ├── mod.rs # MessageBus (tokio mpsc channels) +│ │ ├── message.rs # Message types +│ │ └── dispatcher.rs # OutboundDispatcher +│ ├── session/ # Session & dialog management +│ │ ├── mod.rs +│ │ ├── session.rs # Session, SessionManager +│ │ ├── session_id.rs # UnifiedSessionId +│ │ ├── commands.rs # SessionCommand enum +│ │ └── events.rs # SessionEvent, DialogInfo +│ ├── agent/ # LLM interaction loop +│ │ ├── mod.rs +│ │ ├── agent_loop.rs # AgentLoop (stateless) +│ │ ├── context_compressor.rs # Token estimation & summarization +│ │ └── system_prompt.rs # System prompt builder +│ ├── providers/ # LLM API clients +│ │ ├── mod.rs # Factory: create_provider() +│ │ ├── traits.rs # LLMProvider trait +│ │ ├── openai.rs # OpenAI-compatible client +│ │ └── anthropic.rs # Anthropic Messages API client +│ ├── tools/ # Agent tools +│ │ ├── mod.rs # create_default_tools() +│ │ ├── registry.rs # ToolRegistry +│ │ ├── traits.rs # Tool trait, ToolResult +│ │ ├── schema.rs # Cross-provider JSON Schema cleaner +│ │ ├── bash.rs # Shell command execution +│ │ ├── calculator.rs # Math expression evaluator +│ │ ├── chat_manager.rs # Session management tool +│ │ ├── cron.rs # Cron job management tools +│ │ ├── file_read.rs # File reader +│ │ ├── file_write.rs # File writer +│ │ ├── file_edit.rs # File editor (string substitution) +│ │ ├── get_skill.rs # Skill loader tool +│ │ ├── http_request.rs # HTTP request tool +│ │ ├── send_message.rs # Cross-channel messaging +│ │ └── web_fetch.rs # Web page fetcher +│ ├── skills/ # Skills loading from markdown files +│ │ └── mod.rs # SkillsLoader, Skill +│ ├── storage/ # SQLite persistence +│ │ ├── mod.rs # Storage, schema init +│ │ ├── session.rs # Session CRUD operations +│ │ ├── message.rs # Message persistence +│ │ ├── scheduler.rs # ScheduledJob, JobRun storage +│ │ └── error.rs # StorageError +│ ├── scheduler/ # Cron scheduler runtime +│ │ ├── mod.rs # Scheduler, next_run_for_schedule() +│ │ └── types.rs # Schedule enum (At/Every/Cron) +│ ├── observability/ # Telemetry observer pattern +│ │ └── mod.rs # Observer trait, ObserverEvent, MultiObserver +│ ├── protocol.rs # WebSocket message types (WsInbound/WsOutbound) +│ ├── config/ # Config loading & env substitution +│ │ └── mod.rs # Config, LLMProviderConfig, load_env_file() +│ └── logging.rs # Tracing subscriber init with file rotation +├── tests/ +│ ├── test_integration.rs # LLM provider integration tests +│ ├── test_tool_calling.rs # Tool calling integration tests +│ ├── test_request_format.rs # Request format tests +│ ├── test_scheduler.rs # Scheduler unit tests +│ ├── test.env.example # Test environment template +│ └── test.env # Actual test keys (gitignored) +├── reference/ # Third-party reference code (do not modify) +├── config.example.json # Full config example +└── Cargo.toml +``` + +## Key Dependencies + +| Crate | Purpose | +|-------|---------| +| `axum` + `tokio-tungstenite` | HTTP server & WebSocket | +| `sqlx` (SQLite) | Session/Message/Job persistence | +| `reqwest` (rustls) | LLM API & external HTTP calls | +| `ratatui` + `crossterm` | Terminal UI | +| `clap` | CLI argument parsing | +| `tracing` + `tracing-subscriber` | Structured logging | +| `cron` + `chrono-tz` | Cron schedule parsing | +| `meval` | Mathematical expression evaluation | +| `uuid` | Session/Dialog ID generation | +| `dirs` | Platform config directory resolution | diff --git a/session_plan.md b/session_plan.md deleted file mode 100644 index 7fbee17..0000000 --- a/session_plan.md +++ /dev/null @@ -1,433 +0,0 @@ -# Session 管理详细设计 - -## 一、设计目标 - -1. Session 数据持久化到 SQLite,系统重启后可恢复 -2. 支持 Dialog 的完整生命周期管理(创建/列表/切换/重命名/删除) -3. 基于 TTL 的自动内存清理(DB 保留所有数据) -4. LLM 自动生成会话标题(title),帮助用户和 AI 理解对话上下文 -5. 每条消息实时写入 DB,失败后重试 + 告警 - ---- - -## 二、概念定义 - -### 2.1 层级结构 - -``` -Channel (渠道) -└── Chat (聊天) - └── Dialog (对话) - └── Session (会话实例) -``` - -- **Channel**:消息来源渠道(如 `cli_chat`、`feishu`) -- **Chat**:同一渠道下的一个聊天会话(如某个 CLI session ID 或飞书 open_conversation_id) -- **Dialog**:聊天内的多个独立对话线程(如 `/new` 创建的新对话) -- **Session**:一个 Dialog 的运行时实例,包含消息历史、LLM 配置、工具等 - -### 2.2 UnifiedSessionId - -``` -格式:{channel}:{chat_id}:{dialog_id} -示例:cli_chat:sid_abc123:dialog_xyz - feishu:oc_123456:default -``` - -| 字段 | 说明 | -|------|------| -| channel | 渠道标识 | -| chat_id | 聊天标识 | -| dialog_id | 对话标识(默认 `default`) | - -### 2.3 Session 与 Dialog 的关系 - -- 每个 Dialog 在运行时对应一个 `Session` 实例 -- `Session` 存在于内存中,可通过 `UnifiedSessionId` 访问 -- `Dialog` 是 Storage 中的持久化记录,`Session` 是其运行时投影 - ---- - -## 三、数据库 Schema - -### 3.1 sessions 表 - -```sql -CREATE TABLE sessions ( - id TEXT PRIMARY KEY, - channel TEXT NOT NULL, - chat_id TEXT NOT NULL, - dialog_id TEXT NOT NULL, - title TEXT NOT NULL, - created_at INTEGER NOT NULL, - last_active_at INTEGER NOT NULL, - message_count INTEGER DEFAULT 0, - deleted_at INTEGER, - UNIQUE(channel, chat_id, dialog_id) -); -``` - -> 注意:已删除 `archived_at` 字段,不保留归档概念。 - -### 3.2 messages 表 - -```sql -CREATE TABLE messages ( - id TEXT PRIMARY KEY, - session_id TEXT NOT NULL, - seq INTEGER NOT NULL, - role TEXT NOT NULL, - content TEXT NOT NULL, - media_refs TEXT, - tool_call_id TEXT, - tool_name TEXT, - tool_calls TEXT, - created_at INTEGER NOT NULL, - FOREIGN KEY (session_id) REFERENCES sessions(id) ON DELETE CASCADE -); - -CREATE INDEX idx_messages_session_seq ON messages(session_id, seq); -CREATE INDEX idx_sessions_chat ON sessions(channel, chat_id, deleted_at); -``` - ---- - -## 四、Storage API - -### 4.1 Storage 职责 - -- 唯一的持久化 source of truth -- 由调用方(GatewayState)构造,通过 `Arc<Storage>` 注入 `SessionManager` -- 所有写操作失败后重试 3 次(100ms, 200ms, 300ms 退避),仍失败则触发系统通知告警 - -### 4.2 Session 操作 - -| 方法 | 说明 | -|------|------| -| `new(db_path) -> Storage` | 打开/创建数据库 | -| `upsert_session(meta) -> Result<(), StorageError>` | 插入或更新 session 元数据 | -| `get_session(id) -> Result<SessionMeta, StorageError>` | 获取单个 session | -| `list_sessions(channel, chat_id, limit) -> Result<Vec<SessionMeta>>` | 最近 N 条(供 `/sessions`) | -| `delete_session(id) -> Result<(), StorageError>` | 物理删除 session 及关联消息 | -| `touch_session(id, message_count, last_active_at)` | 更新计数和最后活跃时间 | - -### 4.3 Message 操作 - -| 方法 | 说明 | -|------|------| -| `append_message(session_id, msg) -> Result<i64, StorageError>` | 追加单条消息,返回 seq | -| `append_messages(session_id, msgs) -> Result<Vec<i64>, StorageError>` | 批量追加 | -| `load_messages(session_id, from_seq) -> Result<Vec<MessageMeta>>` | 从指定 seq 加载消息 | -| `clear_messages(session_id) -> Result<(), StorageError>` | 清除消息(保留 session) | - -### 4.4 写入失败处理 - -```rust -async fn append_message_with_retry(&self, session_id: &str, msg: &MessageMeta) -> Result<i64, StorageError> { - let delays = [100, 200, 300]; - for (i, delay) in delays.iter().enumerate() { - match self.append_message(session_id, msg) { - Ok(seq) => return Ok(seq), - Err(e) if i < delays.len() - 1 => { - sleep(Duration::from_millis(*delay)).await; - tracing::warn!("Storage write failed, retrying: {}", e); - } - Err(e) => { - // 全部重试失败后,通过 Session 发送系统通知 - return Err(e); - } - } - } - unreachable!() -} -``` - ---- - -## 五、Session 结构 - -```rust -pub struct Session { - pub id: UnifiedSessionId, - pub title: String, // 会话标题(用户指定或 LLM 自动生成) - pub created_at: i64, // 创建时间(ms) - pub last_active_at: i64, // 最后活跃时间(ms) - pub message_count: i64, // 用户消息计数(触发 title 自动生成) - pub total_message_count: i64, // 含系统消息的总数 - - messages: Vec<ChatMessage>, // 内存中的消息历史(压缩后) - seq_counter: i64, // 下一个消息的 seq - - provider_config: LLMProviderConfig, - provider: Arc<dyn LLMProvider>, - tools: Arc<ToolRegistry>, - compressor: ContextCompressor, - user_tx: mpsc::Sender<WsOutbound>, - storage: Arc<Storage>, // 持久化 sink -} -``` - -### 5.1 初始化流程 - -``` -new() 或 from_storage() - ↓ -注入 storage 引用 - ↓ -创建 provider, tools, compressor - ↓ -从 Storage 加载 messages(from_seq = 0) - ↓ -设置 seq_counter = messages.len() + 1 - ↓ -返回 Session 实例 -``` - -### 5.2 消息管理 - -```rust -pub async fn add_message(&mut self, msg: ChatMessage) -> Result<(), StorageError> { - // 1. 分配序号: seq = seq_counter; seq_counter += 1 - // 2. 转换为 MessageMeta - // 3. 写入 Storage(重试 + 告警) - // 4. 更新内存: messages.push(msg) - // 5. 更新计数: message_count / total_message_count += 1 - // 6. 更新 last_active_at -} -``` - -### 5.3 系统通知接口(不记历史) - -```rust -impl Session { - /// 发送系统通知(不记录进 session 历史) - pub async fn send_system_notification(&self, content: &str) { - let msg = WsOutbound::SystemNotification { - content: content.to_string(), - }; - let _ = self.user_tx.send(msg).await; - } -} -``` - -### 5.4 Title 自动生成 - -调用时机: -1. Session 首次创建时(初始 title = "Dialog {dialog_id}") -2. `message_count` 达到阈值(10 条)且 title 仍为默认值时,自动更新为 LLM 生成 -3. 用户执行 `/rename` 命令手动更新 - -生成 Prompt: -``` -给定以下对话历史,生成一个简短的会话标题(5-15 个中文字符), -概括这个对话的核心内容或用户的主要需求。只返回一个标题,不要解释。 - -历史: -{messages} -``` - ---- - -## 六、SessionManager 设计 - -### 6.1 数据结构 - -```rust -pub struct SessionManager { - inner: Arc<Mutex<SessionManagerInner>>, - provider_config: LLMProviderConfig, - tools: Arc<ToolRegistry>, - skills_loader: Arc<SkillsLoader>, - storage: Arc<Storage>, // 由调用方注入 - cleanup_interval: Duration, -} - -struct SessionManagerInner { - sessions: HashMap<String, Arc<Mutex<Session>>>, - session_timestamps: HashMap<String, Instant>, - session_ttl: Duration, -} -``` - -### 6.2 handle_message 完整流程 - -``` -handle_message(channel, sender_id, chat_id, dialog_id, content, media) - │ - ├── 1. 确定 UnifiedSessionId - │ │ - │ ├── dialog_id 有值 → 直接使用 - │ │ - │ └── dialog_id 无值 → 查找 channel:chat_id 下最近活跃的 session - │ ├── 找到且未过期 → 使用该 session - │ └── 未找到或已过期 → 创建新 session - │ - ├── 2. 获取或创建 Session - │ 有 → 更新 session_timestamps - │ 无 → 从 Storage 恢复 或 创建新 Session - │ - ├── 3. 添加用户消息并持久化 - │ seq = seq_counter; seq_counter += 1 - │ Storage.append_message()(失败重试 → 仍失败则 send_system_notification) - │ messages.push(user_msg) - │ message_count += 1 - │ - ├── 4. 检查 title 自动生成条件 - │ message_count == 10 → 调用 LLM 生成 → 更新 title → 写回 Storage - │ - ├── 5. 注入 skills_prompt(index 0 之后) - │ - ├── 6. 新 session 注入欢迎消息(作为系统消息,不计入 message_count) - │ - ├── 7. 上下文压缩(如需要) - │ - ├── 8. 调用 AgentLoop - │ - ├── 9. 保存 Agent 响应消息并持久化(同样流程) - │ - └── 10. 返回最终响应 -``` - -**欢迎消息**(仅新 session 创建时注入历史): -``` -新对话已创建!会话 ID: {dialog_id} -使用 /sessions 查看所有对话,/switch <id> 切换对话。 -``` - -### 6.3 Dialog 生命周期 - -| 操作 | 方法 | 说明 | -|------|------|------| -| 创建 | `create_dialog()` | 生成 dialog_id,创建 Session,写入 Storage | -| 列表 | `list_dialogs()` | 从 Storage 读取,limit=10 | -| 切换 | `switch_dialog()` | 从 Storage 加载 session,激活到内存 | -| 重命名 | `rename_dialog()` | 更新 Storage 和内存 title | -| 删除 | `delete_dialog()` | 删除内存 session + 删除 Storage 记录 | -| 软重置 | **已删除** | 用户直接 `/new` 开新 session | - -### 6.4 TTL 清理 - -```rust -fn start_cleanup_task(&self) { - tokio::spawn(async move { - loop { - sleep(cleanup_interval).await; - self.run_cleanup().await; - } - }); -} - -async fn run_cleanup(&self) { - // 扫描 session_timestamps - // 超时的 session → 从内存 HashMap 移除 - // Storage 中的 session 记录保留(用户切回可重新加载) -} -``` - -清理策略: -- 内存 session 超时 → 仅释放内存,Storage 记录保留 -- 用户切换回该 session → 从 Storage 重新加载到内存 - ---- - -## 七、斜杠命令 - -| 命令 | 触发词 | 说明 | -|------|--------|------| -| new | `/new [标题]` | 创建新 dialog | -| sessions | `/sessions` | 列出当前 chat 最近 10 条 dialog | -| switch | `/switch <dialog_id>` | 切换到指定 dialog | -| rename | `/rename <新标题>` | 重命名当前 dialog | -| delete | `/delete` | 删除当前 dialog(内存 + Storage) | -| compact | `/compact` | 手动触发上下文压缩 | -| info | `/info` | 显示当前 dialog 信息 | - ---- - -## 八、错误处理 - -```rust -pub enum StorageError { - NotFound(String), - AlreadyExists(String), - Database(String), - Serialization(String), -} -``` - -| 场景 | 处理 | -|------|------| -| Storage 写入失败 | 重试 3 次 → 发送系统通知告警 | -| Storage 读取失败 | 若 session 在内存中,继续使用内存数据 | -| Session 不存在 | 创建新 session | -| 并发冲突 | SQLite transaction 保护 | - ---- - -## 九、配置项 - -```json -{ - "session": { - "ttl_hours": 24, - "cleanup_interval_minutes": 60, - "auto_title_after_n_messages": 10, - "storage_retry_delays_ms": [100, 200, 300] - } -} -``` - ---- - -## 十、文件结构 - -``` -src/ -├── storage/ -│ ├── mod.rs # Storage 主模块 -│ ├── session.rs # Session CRUD -│ ├── message.rs # Message CRUD -│ └── error.rs # StorageError -│ -└── session/ - ├── mod.rs # 导出 Session, SessionManager - ├── session.rs # Session, SessionManager 实现 - ├── session_id.rs # UnifiedSessionId(已有) - ├── commands.rs # SessionCommand(已有) - ├── events.rs # SessionEvent, DialogInfo(已有) - └── error.rs # SessionError(已有) -``` - ---- - -## 十一、实现顺序 - -### Phase 1: Storage 基础 -1. `Storage` 结构和数据库初始化 -2. Session CRUD(upsert_session, get_session, list_sessions, delete_session) -3. Message CRUD(append_message, load_messages) -4. 写入失败重试逻辑 -5. 单元测试 - -### Phase 2: Session 扩展 -1. 扩展 `Session` 结构体(添加 storage 引用、计数字段、seq_counter) -2. `add_message` 持久化集成 -3. `send_system_notification` 接口 -4. `from_storage()` 恢复逻辑 -5. Title 自动生成 LLM 调用 - -### Phase 3: SessionManager 完善 -1. 将 `Arc<Storage>` 集成到 `SessionManager` -2. 实现 `list_dialogs()`(limit=10) -3. 实现 `switch_dialog()`(从 Storage 加载) -4. 实现 `delete_dialog()`(内存 + Storage) -5. 实现 `rename_dialog()` -6. 后台 TTL 清理任务 -7. 集成测试 - -### Phase 4: 斜杠命令 -1. 实现 `/sessions`(列出最近 10 条) -2. 实现 `/switch` -3. 实现 `/rename` -4. 实现 `/delete` -5. 端到端测试 diff --git a/src/agent/context_compressor.rs b/src/agent/context_compressor.rs index 0bf43ac..61f3ba5 100644 --- a/src/agent/context_compressor.rs +++ b/src/agent/context_compressor.rs @@ -96,6 +96,11 @@ impl ContextCompressor { self.session_id = id; } + /// Always true — memory is always available (memory system is always on). + pub fn has_memory(&self) -> bool { + true + } + /// Get the compression threshold in tokens. fn threshold(&self) -> usize { (self.context_window as f64 * self.threshold_ratio) as usize diff --git a/src/session/session.rs b/src/session/session.rs index 2e20853..631e29e 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -1328,6 +1328,13 @@ impl SessionManager { .compress_if_needed(history) .await?; + // Advance consolidation pointer — future compressions skip already-processed messages + let now = chrono::Utc::now().timestamp_millis(); + session_guard.last_consolidated_at = Some(now); + if let Err(e) = session_guard.persist_session_meta().await { + tracing::warn!(error = %e, "Failed to persist consolidation timestamp"); + } + let agent = session_guard.create_agent_with_notify(notify_tx)?; let result = agent.process(history).await?;