Compare commits

...

7 Commits

Author SHA1 Message Date
oudecheng
a2d4ed9193 feat: 添加历史话题管理功能,支持获取和记录每个 chat 的历史话题 2026-05-15 17:43:12 +08:00
oudecheng
34938f57b8 feat: 添加 SessionManager 支持到 SessionCommandHandler,优化会话管理 2026-05-15 16:51:06 +08:00
oudecheng
2cc3b1ce9c feat: 添加话题管理功能,支持按 chat_id 隔离话题 2026-05-15 16:41:00 +08:00
oudecheng
e709773464 feat: 添加话题管理功能,支持切换和持久化话题历史 2026-05-15 15:28:07 +08:00
oudecheng
2e13f6932c feat: Enhance session management with topic support
- Added topic management capabilities, allowing users to create, switch, and query topics within sessions.
- Updated command structure to include new commands: SwitchSession and GetCurrentSession.
- Introduced TopicRecord for managing topic data in the storage layer.
- Modified session handlers to accommodate topic operations, including listing and loading topics.
- Enhanced database schema to support topics, including new tables and relationships.
- Updated input adapters to recognize new commands and handle topic-related actions.
- Improved logging for session and topic operations to aid in debugging and monitoring.
2026-05-15 15:01:58 +08:00
oudecheng
025c0b5d7f feat: 添加 session_id 支持到 OutboundMessage,优化会话管理 2026-05-15 10:00:17 +08:00
oudecheng
0095ace411 feat: 添加按通道查询会话功能,优化会话列表处理 2026-05-15 08:43:33 +08:00
24 changed files with 1007 additions and 257 deletions

272
README.md
View File

@ -4,14 +4,15 @@ PicoBot 是一个用 Rust 构建的多通道 Agent 网关。它把消息接入
当前代码库已经实现以下核心能力: 当前代码库已经实现以下核心能力:
- 基于 Gateway 的统一消息入口,支持 WebSocket CLI 与飞书通道 - 基于 Gateway 的统一消息入口,支持 WebSocket CLI、飞书通道和微信通道
- 面向工具调用的 Agent 循环,支持多轮 tool calling - 面向工具调用的 Agent 循环,支持多轮 tool calling
- SQLite 持久化会话、消息、长期记忆、技能事件和调度任务 - SQLite 持久化会话、消息、长期记忆、技能事件和调度任务
- 基于用户维度的长期记忆检索与写入机制 - 基于用户维度的长期记忆检索与写入机制
- 基于 SKILL.md 的项目级 / 用户级技能加载与运行时管理 - 基于 SKILL.md 的项目级 / 用户级 / OpenClaw 级技能加载与运行时管理
- 定时任务系统,支持延迟、周期、绝对时间和 cron 调度 - 定时任务系统,支持延迟、周期、绝对时间和 cron 调度
- 超长上下文压缩与历史摘要 - 超长上下文压缩与历史摘要
- 持久化 Agent 配置文件注入与周期性重注入 - 持久化 Agent 配置文件注入与周期性重注入
- 会话管理支持按通道查询和切换
## 1. 项目定位 ## 1. 项目定位
@ -37,110 +38,116 @@ PicoBot 的设计目标不是“只会聊天”的单进程 Bot而是一个
### 2.1 消息流转图 ### 2.1 消息流转图
```mermaid ```text
sequenceDiagram 用户消息输入
autonumber
participant U as User / External Chat
participant C as Channel ┌─────────────┐
participant B as MessageBus │ Channel │ ◄── WebSocket CLI / 飞书等
participant P as InboundProcessor └──────┬──────┘
participant SM as SessionManager │ publish_inbound
participant SES as Session
participant AES as AgentExecutionService ┌─────────────┐
participant AL as AgentLoop │ MessageBus │ ◄── 统一消息总线
participant T as ToolRegistry └──────┬──────┘
participant DB as SQLite │ consume_inbound
participant OD as OutboundDispatcher
┌──────────────────┐
│ InboundProcessor │ ◄── 入站处理器
└────────┬─────────┘
│ handle_message
┌────────────────┐
│ SessionManager │ ◄── 会话管理器
└────────┬───────┘
│ 定位/创建 Session
┌────────────────────────┐
│ AgentExecutionService │ ◄── 准备上下文、执行 Agent
└────────┬───────────────┘
┌────┴────┐
▼ ▼
┌───────┐ ┌──────────┐
│SQLite │ │ AgentLoop│ ◄── 调用 LLM、工具执行
└───────┘ └────┬─────┘
▲ │
│ ▼
│ ┌──────────┐
└────┤ToolRegistry│ ◄── 工具注册表
└──────────┘
┌─────────────────┐
│ OutboundDispatcher │ ◄── 出站分发
└────────┬────────┘
│ dispatch
┌─────────────────┐
│ Channel │
└────────┬────────┘
│ 最终回复
用户
U->>C: 输入消息 关键步骤说明:
C->>B: publish_inbound 1. Channel 接收外部消息 → 2. MessageBus 统一路由 → 3. InboundProcessor 处理
B->>P: consume_inbound 4. SessionManager 定位 Session → 5. AgentExecutionService 执行
P->>SM: handle_message(channel, sender, chat, content) 6. 消息持久化到 SQLite → 7. AgentLoop 推理/工具调用
SM->>SES: active_session(channel) 8. 结果经 OutboundDispatcher 返回 Channel
AES->>SES: ensure_persistent_session / ensure_chat_loaded
AES->>DB: 追加 user / system 消息
AES->>AL: process(history)
AL->>T: 调用工具
T-->>AL: 返回 tool result
AL-->>AES: emitted_messages
AES->>DB: 按真实顺序持久化 assistant / tool / assistant
AES->>SES: 安排后台历史压缩
AES-->>SM: outbound messages
SM-->>P: outbound messages
P->>B: publish_outbound
B->>OD: consume_outbound
OD->>C: dispatch
C-->>U: 最终回复
``` ```
### 2.2 项目架构图 ### 2.2 项目架构图
```mermaid ```text
flowchart TB 接入层 (Edge)
subgraph Edge[接入层]
CLI[CLI Client / WebSocket] ├── CLI Client / WebSocket ──┐
FEI[Feishu Channel] ├── Feishu Channel ─────────┼──► 网关与运行时编排 (Gateway)
HTTP[HTTP Health / WS Gateway] └── HTTP Health / WS ────────┘ │
end ├── ChannelManager ◄──┐
├── MessageBus ────────┼── 双向通信
├── InboundProcessor ──┘
├── OutboundDispatcher
├── SessionManager
├── SessionLifecycle / Message / ScheduledTask Services
└── AgentExecutionService ◄── 调用 Agent 执行层
Agent 执行层 (Agent) ◄─────────────────────────────────────────┘
├── AgentLoop ──► ToolRegistry (工具调用)
├── ContextCompressor (上下文压缩)
├── SkillRuntime (技能系统)
└── LLM Providers (OpenAI / Anthropic)
subgraph Gateway[网关与运行时编排] 持久化与后台能力 (Runtime)
CM[ChannelManager]
BUS[MessageBus] ├── SessionStore / SQLite (会话/消息/记忆存储)
IP[InboundProcessor] ├── Scheduler (定时任务调度)
OD[OutboundDispatcher] └── Memory Maintenance (记忆维护)
SSM[SessionManager]
SVC[SessionLifecycle / Message / ScheduledTask Services]
AES[AgentExecutionService]
end
subgraph Agent[Agent 执行层] 数据流向:
LOOP[AgentLoop] - 接入层 ◄──► 网关 ◄──► Agent 执行层
COMP[ContextCompressor] - 网关 ◄──► 持久化层 (SQLite)
SK[SkillRuntime] - Scheduler ◄──► 总线 ◄──► SessionManager
TOOLS[ToolRegistry]
PROV[LLM Providers]
end
subgraph Runtime[持久化与后台能力]
STORE[SessionStore / SQLite]
SCH[Scheduler]
MEM[Memory Maintenance]
end
CLI --> HTTP
FEI --> CM
HTTP --> CM
CM --> BUS
BUS --> IP
IP --> SSM
SSM --> SVC
SVC --> AES
AES --> LOOP
LOOP --> TOOLS
LOOP --> SK
LOOP --> PROV
AES --> COMP
SSM --> STORE
AES --> STORE
SCH --> BUS
SCH --> SSM
MEM --> STORE
BUS --> OD
OD --> CM
``` ```
主要模块如下: 主要模块如下:
- src/gateway网关生命周期、InboundProcessor、OutboundDispatcher、SessionManager以及消息执行、调度任务执行、Prompt 注入、历史压缩和记忆维护编排 - src/agentAgentLoop、上下文压缩器、运行时配置、系统提示构建
- src/bus消息总线队列与消息结构定义不包含渠道投递逻辑 - src/bus消息总线队列与消息结构定义不包含渠道投递逻辑
- src/agentAgentLoop 与上下文压缩器 - src/channels渠道适配层当前已有 CLI、飞书、微信通道
- src/cli本地 CLI 客户端、输入命令解析
- src/clientWebSocket CLI 客户端实现
- src/command命令系统包括处理器、适配器、上下文和响应处理
- src/config配置解析与默认值定义
- src/domain领域模型包含消息和工具定义
- src/gateway网关生命周期、InboundProcessor、OutboundDispatcher、SessionManager以及消息执行、调度任务执行、Prompt 注入、历史压缩和记忆维护编排
- src/providers不同 LLM Provider 的统一抽象,当前支持 openai 和 anthropic - src/providers不同 LLM Provider 的统一抽象,当前支持 openai 和 anthropic
- src/tools内置工具集合与 ToolRegistry - src/tools内置工具集合与 ToolRegistry
- src/storageSQLite 持久化实现 - src/storageSQLite 持久化实现
- src/channels渠道适配层当前已有 CLI 与飞书通道
- src/scheduler数据库驱动的计划任务调度器 - src/scheduler数据库驱动的计划任务调度器
- src/skills技能发现、加载与运行时管理 - src/skills技能发现、加载与运行时管理
- src/client / src/cli本地 CLI 客户端、输入命令解析与会话交互
- src/protocolWebSocket 入站 / 出站协议结构 - src/protocolWebSocket 入站 / 出站协议结构
## 3. 消息机制 ## 3. 消息机制
@ -379,8 +386,10 @@ PicoBot 支持基于文件系统的技能系统,用来给 Agent 注入某一
- 用户级技能:~/.picobot/skills/*/SKILL.md - 用户级技能:~/.picobot/skills/*/SKILL.md
- 用户 Agent 级技能:~/.agents/skills/*/SKILL.md - 用户 Agent 级技能:~/.agents/skills/*/SKILL.md
- 用户 OpenClaw 级技能:~/.openclaw/skills/*/SKILL.md
- 项目级技能:.picobot/skills/*/SKILL.md - 项目级技能:.picobot/skills/*/SKILL.md
- 项目 Agent 级技能:.agents/skills/*/SKILL.md - 项目 Agent 级技能:.agents/skills/*/SKILL.md
- 项目 OpenClaw 级技能:.openclaw/skills/*/SKILL.md
### 7.2 最小 SKILL.md 格式 ### 7.2 最小 SKILL.md 格式
@ -459,7 +468,7 @@ skills 配置示例:
{ {
"skills": { "skills": {
"enabled": true, "enabled": true,
"sources": ["user", "user_agent", "project", "project_agent"], "sources": ["user", "user_agent", "user_openclaw", "project", "project_agent", "project_openclaw"],
"max_index_chars": 4000, "max_index_chars": 4000,
"max_listed_skills": 32 "max_listed_skills": 32
} }
@ -654,6 +663,7 @@ silent_agent_task 和 agent_task 使用同一套 Agent 执行能力,但路由
- WebSocket CLI 客户端 - WebSocket CLI 客户端
- 飞书通道 - 飞书通道
- 微信通道
### 10.2 Gateway 接口 ### 10.2 Gateway 接口
@ -678,13 +688,15 @@ cargo run -- agent
CLI 中已实现的交互命令包括: CLI 中已实现的交互命令包括:
- /new [title] - /new [title] - 创建新会话
- /reset - /reset - 重置当前会话上下文
- /sessions - /sessions - 列出当前通道的所有会话(支持跨通道隔离)
- /use <session> - /use <session> - 切换到指定会话
- /rename <title> - /rename <title> - 重命名当前会话
- /archive - /archive - 归档当前会话
- /delete - /delete - 删除指定会话
- /clear - 清屏
- /quit - 退出 CLI
- /clear - /clear
- /quit - /quit
@ -707,7 +719,8 @@ CLI 中已实现的交互命令包括:
"base_url": "<OPENAI_BASE_URL>", "base_url": "<OPENAI_BASE_URL>",
"api_key": "<OPENAI_API_KEY>", "api_key": "<OPENAI_API_KEY>",
"extra_headers": {}, "extra_headers": {},
"llm_timeout_secs": 120 "llm_timeout_secs": 120,
"memory_maintenance_timeout_secs": 600
} }
}, },
"models": { "models": {
@ -744,12 +757,12 @@ CLI 中已实现的交互命令包括:
常用配置项: 常用配置项:
- providersProvider 连接信息 - providersProvider 连接信息,包含 llm_timeout_secsLLM 调用超时,默认 120 秒)和 memory_maintenance_timeout_secs记忆维护超时默认 600 秒)
- models模型参数包括上下文窗口估算所用的 context_window_tokens - models模型参数包括上下文窗口估算所用的 context_window_tokens
- agentsAgent 级别的工具轮次、工具结果裁剪与上下文裁剪参数 - agentsAgent 级别的工具轮次、工具结果裁剪与上下文裁剪参数
- gateway监听地址、端口、工具结果展示、会话 TTL、Prompt 重注入策略 - gateway监听地址、端口、工具结果展示、会话 TTL、Prompt 重注入策略
- scheduler调度器开关、worker 队列容量、误触发策略和任务列表 - scheduler调度器开关、worker 队列容量、误触发策略和任务列表
- channels飞书等通道配置 - channels飞书、微信等通道配置
- skills技能来源与索引限制 - skills技能来源与索引限制
- tools工具启用/禁用配置(通过 disabled 列表指定禁用的工具) - tools工具启用/禁用配置(通过 disabled 列表指定禁用的工具)
- time.timezone时区默认应使用 IANA 时区名,例如 Asia/Shanghai - time.timezone时区默认应使用 IANA 时区名,例如 Asia/Shanghai
@ -761,6 +774,46 @@ CLI 中已实现的交互命令包括:
1. 复制并修改 config.json或把配置放到 ~/.picobot/config.json 1. 复制并修改 config.json或把配置放到 ~/.picobot/config.json
2. 配置好 Provider 的 base_url、api_key、model_id 2. 配置好 Provider 的 base_url、api_key、model_id
3. 如果要接飞书,再补充 channels.feishu 配置 3. 如果要接飞书,再补充 channels.feishu 配置
4. 如果要接微信,再补充 channels.wechat 配置
飞书通道配置示例:
```json
{
"channels": {
"feishu": {
"type": "feishu",
"app_id": "<FEISHU_APP_ID>",
"app_secret": "<FEISHU_APP_SECRET>",
"enabled": true,
"allow_from": ["*"],
"agent": "default",
"media_dir": "~/.picobot/media/feishu",
"reaction_emoji": "Typing",
"max_message_chars": 20000,
"reply_context_max_chars": 20000
}
}
}
```
微信通道配置示例:
```json
{
"channels": {
"wechat": {
"type": "wechat",
"enabled": true,
"allow_from": ["*"],
"agent": "default",
"base_url": "https://ilinkai.weixin.qq.com",
"cred_path": "~/.picobot/wechat/credentials.json",
"force_login": false
}
}
}
```
### 12.2 启动网关 ### 12.2 启动网关
@ -797,17 +850,23 @@ curl http://127.0.0.1:19876/health
```text ```text
PicoBot/ PicoBot/
├── src/ ├── src/
│ ├── agent/ # AgentLoop、上下文压缩 │ ├── agent/ # AgentLoop、上下文压缩、运行时配置、系统提示
│ ├── bus/ # 消息总线与消息结构 │ ├── bus/ # 消息总线与消息结构
│ ├── channels/ # CLI / 飞书等渠道适配 │ ├── channels/ # CLI / 飞书 / 微信等渠道适配
│ ├── cli/ # CLI 输入命令 │ ├── cli/ # CLI 输入命令与通道实现
│ ├── client/ # WebSocket CLI 客户端 │ ├── client/ # WebSocket CLI 客户端
│ ├── command/ # 命令系统(处理器、适配器、上下文)
│ ├── config/ # 配置解析 │ ├── config/ # 配置解析
│ ├── gateway/ # Gateway、Session 编排、WS/HTTP 控制面 │ ├── domain/ # 领域模型(消息、工具定义)
│ ├── gateway/ # Gateway、Session 编排、WS/HTTP 控制面、执行服务
│ ├── logging/ # 日志配置
│ ├── observability/ # 可观测性支持
│ ├── platform/ # 平台抽象
│ ├── providers/ # OpenAI / Anthropic Provider │ ├── providers/ # OpenAI / Anthropic Provider
│ ├── scheduler/ # 定时任务系统 │ ├── scheduler/ # 定时任务系统
│ ├── skills/ # 技能运行时 │ ├── skills/ # 技能运行时
│ ├── storage/ # SQLite 持久化 │ ├── storage/ # SQLite 持久化(存储、端口、记录、错误)
│ ├── text/ # 文本处理工具
│ └── tools/ # 内置工具集合 │ └── tools/ # 内置工具集合
├── docs/ ├── docs/
│ ├── IMPLEMENTATION_LOG.md │ ├── IMPLEMENTATION_LOG.md
@ -824,8 +883,11 @@ PicoBot/
- docs/PERSISTENCE.md持久化结构是否与代码一致 - docs/PERSISTENCE.md持久化结构是否与代码一致
- src/gateway/session.rs会话状态、会话路由和运行时服务编排 - src/gateway/session.rs会话状态、会话路由和运行时服务编排
- src/gateway/execution.rsAgent 执行服务
- src/storage/mod.rsSQLite schema 变更 - src/storage/mod.rsSQLite schema 变更
- src/config/mod.rs配置项变更是否同步到 README - src/config/mod.rs配置项变更是否同步到 README
- src/bus/message.rs消息结构变更如 OutboundMessage 新增 session_id
- src/command/handlers/:命令处理器实现
## 15. 总结 ## 15. 总结

View File

@ -251,7 +251,12 @@ impl InboundMessage {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct OutboundMessage { pub struct OutboundMessage {
pub channel: String, pub channel: String,
/// 消息发送目标 ID如飞书 open_id、微信 chat_id
/// 注意:这始终是原始入站消息的 chat_id不会被修改为会话 ID
pub chat_id: String, pub chat_id: String,
/// 内部会话 ID对应 sessions.id
/// 用于会话管理和消息持久化,与消息发送目标无关
pub session_id: Option<String>,
pub content: String, pub content: String,
pub reply_to: Option<String>, pub reply_to: Option<String>,
pub media: Vec<MediaItem>, pub media: Vec<MediaItem>,
@ -281,6 +286,7 @@ impl OutboundMessage {
pub fn assistant( pub fn assistant(
channel: impl Into<String>, channel: impl Into<String>,
chat_id: impl Into<String>, chat_id: impl Into<String>,
session_id: Option<String>,
content: impl Into<String>, content: impl Into<String>,
reply_to: Option<String>, reply_to: Option<String>,
metadata: HashMap<String, String>, metadata: HashMap<String, String>,
@ -288,6 +294,7 @@ impl OutboundMessage {
Self { Self {
channel: channel.into(), channel: channel.into(),
chat_id: chat_id.into(), chat_id: chat_id.into(),
session_id,
content: content.into(), content: content.into(),
reply_to, reply_to,
media: Vec::new(), media: Vec::new(),
@ -303,11 +310,12 @@ impl OutboundMessage {
pub fn scheduler_notification( pub fn scheduler_notification(
channel: impl Into<String>, channel: impl Into<String>,
chat_id: impl Into<String>, chat_id: impl Into<String>,
session_id: Option<String>,
content: impl Into<String>, content: impl Into<String>,
reply_to: Option<String>, reply_to: Option<String>,
metadata: HashMap<String, String>, metadata: HashMap<String, String>,
) -> Self { ) -> Self {
let mut message = Self::assistant(channel, chat_id, content, reply_to, metadata); let mut message = Self::assistant(channel, chat_id, session_id, content, reply_to, metadata);
message.event_kind = OutboundEventKind::SchedulerNotification; message.event_kind = OutboundEventKind::SchedulerNotification;
message message
} }
@ -315,11 +323,12 @@ impl OutboundMessage {
pub fn error_notification( pub fn error_notification(
channel: impl Into<String>, channel: impl Into<String>,
chat_id: impl Into<String>, chat_id: impl Into<String>,
session_id: Option<String>,
content: impl Into<String>, content: impl Into<String>,
reply_to: Option<String>, reply_to: Option<String>,
metadata: HashMap<String, String>, metadata: HashMap<String, String>,
) -> Self { ) -> Self {
let mut message = Self::assistant(channel, chat_id, content, reply_to, metadata); let mut message = Self::assistant(channel, chat_id, session_id, content, reply_to, metadata);
message.event_kind = OutboundEventKind::ErrorNotification; message.event_kind = OutboundEventKind::ErrorNotification;
message message
} }
@ -327,6 +336,7 @@ impl OutboundMessage {
pub fn tool_call( pub fn tool_call(
channel: impl Into<String>, channel: impl Into<String>,
chat_id: impl Into<String>, chat_id: impl Into<String>,
session_id: Option<String>,
message_id: impl Into<String>, message_id: impl Into<String>,
tool_name: impl Into<String>, tool_name: impl Into<String>,
tool_arguments: serde_json::Value, tool_arguments: serde_json::Value,
@ -338,6 +348,7 @@ impl OutboundMessage {
Self { Self {
channel: channel.into(), channel: channel.into(),
chat_id: chat_id.into(), chat_id: chat_id.into(),
session_id,
content, content,
reply_to, reply_to,
media: Vec::new(), media: Vec::new(),
@ -353,6 +364,7 @@ impl OutboundMessage {
pub fn tool_result( pub fn tool_result(
channel: impl Into<String>, channel: impl Into<String>,
chat_id: impl Into<String>, chat_id: impl Into<String>,
session_id: Option<String>,
tool_call_id: impl Into<String>, tool_call_id: impl Into<String>,
tool_name: impl Into<String>, tool_name: impl Into<String>,
content: impl Into<String>, content: impl Into<String>,
@ -365,6 +377,7 @@ impl OutboundMessage {
Self { Self {
channel: channel.into(), channel: channel.into(),
chat_id: chat_id.into(), chat_id: chat_id.into(),
session_id,
content, content,
reply_to, reply_to,
media: Vec::new(), media: Vec::new(),
@ -380,6 +393,7 @@ impl OutboundMessage {
pub fn tool_pending( pub fn tool_pending(
channel: impl Into<String>, channel: impl Into<String>,
chat_id: impl Into<String>, chat_id: impl Into<String>,
session_id: Option<String>,
tool_call_id: impl Into<String>, tool_call_id: impl Into<String>,
tool_name: impl Into<String>, tool_name: impl Into<String>,
content: impl Into<String>, content: impl Into<String>,
@ -392,6 +406,7 @@ impl OutboundMessage {
Self { Self {
channel: channel.into(), channel: channel.into(),
chat_id: chat_id.into(), chat_id: chat_id.into(),
session_id,
content, content,
reply_to, reply_to,
media: Vec::new(), media: Vec::new(),
@ -407,6 +422,7 @@ impl OutboundMessage {
pub fn from_chat_message( pub fn from_chat_message(
channel: &str, channel: &str,
chat_id: &str, chat_id: &str,
session_id: Option<String>,
reply_to: Option<String>, reply_to: Option<String>,
metadata: &HashMap<String, String>, metadata: &HashMap<String, String>,
message: &ChatMessage, message: &ChatMessage,
@ -419,6 +435,7 @@ impl OutboundMessage {
outbound.push(Self::assistant( outbound.push(Self::assistant(
channel.to_string(), channel.to_string(),
chat_id.to_string(), chat_id.to_string(),
session_id.clone(),
message.content.clone(), message.content.clone(),
reply_to.clone(), reply_to.clone(),
metadata.clone(), metadata.clone(),
@ -429,6 +446,7 @@ impl OutboundMessage {
Self::tool_call( Self::tool_call(
channel.to_string(), channel.to_string(),
chat_id.to_string(), chat_id.to_string(),
session_id.clone(),
tool_call.id.clone(), tool_call.id.clone(),
tool_call.name.clone(), tool_call.name.clone(),
tool_call.arguments.clone(), tool_call.arguments.clone(),
@ -441,6 +459,7 @@ impl OutboundMessage {
vec![Self::assistant( vec![Self::assistant(
channel.to_string(), channel.to_string(),
chat_id.to_string(), chat_id.to_string(),
session_id,
message.content.clone(), message.content.clone(),
reply_to, reply_to,
metadata.clone(), metadata.clone(),
@ -455,6 +474,7 @@ impl OutboundMessage {
ToolMessageState::Completed => vec![Self::tool_result( ToolMessageState::Completed => vec![Self::tool_result(
channel.to_string(), channel.to_string(),
chat_id.to_string(), chat_id.to_string(),
session_id,
message.tool_call_id.clone().unwrap_or_default(), message.tool_call_id.clone().unwrap_or_default(),
message.tool_name.clone().unwrap_or_default(), message.tool_name.clone().unwrap_or_default(),
message.content.clone(), message.content.clone(),
@ -464,6 +484,7 @@ impl OutboundMessage {
ToolMessageState::PendingUserAction => vec![Self::tool_pending( ToolMessageState::PendingUserAction => vec![Self::tool_pending(
channel.to_string(), channel.to_string(),
chat_id.to_string(), chat_id.to_string(),
session_id,
message.tool_call_id.clone().unwrap_or_default(), message.tool_call_id.clone().unwrap_or_default(),
message.tool_name.clone().unwrap_or_default(), message.tool_name.clone().unwrap_or_default(),
message.content.clone(), message.content.clone(),
@ -562,6 +583,7 @@ mod tests {
TEST_CHANNEL, TEST_CHANNEL,
"chat-1", "chat-1",
None, None,
None,
&HashMap::new(), &HashMap::new(),
&message, &message,
); );
@ -599,6 +621,7 @@ mod tests {
TEST_CHANNEL, TEST_CHANNEL,
"chat-1", "chat-1",
None, None,
None,
&HashMap::new(), &HashMap::new(),
&message, &message,
); );
@ -618,6 +641,7 @@ mod tests {
TEST_CHANNEL, TEST_CHANNEL,
"chat-1", "chat-1",
None, None,
None,
&HashMap::new(), &HashMap::new(),
&message, &message,
); );
@ -639,6 +663,7 @@ mod tests {
TEST_CHANNEL, TEST_CHANNEL,
"chat-1", "chat-1",
None, None,
None,
&HashMap::new(), &HashMap::new(),
&message, &message,
); );

View File

@ -119,6 +119,7 @@ mod tests {
.send(OutboundMessage::assistant( .send(OutboundMessage::assistant(
"cli", "cli",
"session-1", "session-1",
None, // session_id
"hello", "hello",
None, None,
HashMap::new(), HashMap::new(),
@ -143,6 +144,7 @@ mod tests {
.send(OutboundMessage::assistant( .send(OutboundMessage::assistant(
"cli", "cli",
"session-1", "session-1",
None, // session_id
"hello", "hello",
None, None,
HashMap::new(), HashMap::new(),

View File

@ -78,14 +78,19 @@ impl InputAdapter for ChannelInputAdapter {
})); }));
} }
// 解析 /use 命令 // 解析 /use 命令 - 切换会话(支持 session_id 或序号)
if let Some(session_id) = trimmed.strip_prefix("/use ") { if let Some(session_id) = trimmed.strip_prefix("/use ") {
let session_id = session_id.trim(); let session_id = session_id.trim();
return Ok(Some(Command::LoadSession { return Ok(Some(Command::SwitchSession {
session_id: session_id.to_string(), session_id: session_id.to_string(),
})); }));
} }
// 解析 /current 命令 - 获取当前会话信息
if trimmed == "/current" {
return Ok(Some(Command::GetCurrentSession));
}
// 不是命令,返回 None // 不是命令,返回 None
Ok(None) Ok(None)
} }

View File

@ -79,14 +79,19 @@ impl InputAdapter for CliInputAdapter {
})); }));
} }
// 解析 /use 命令 // 解析 /use 命令 - 切换会话(支持 session_id 或序号)
if let Some(session_id) = trimmed.strip_prefix("/use ") { if let Some(session_id) = trimmed.strip_prefix("/use ") {
let session_id = session_id.trim(); let session_id = session_id.trim();
return Ok(Some(Command::LoadSession { return Ok(Some(Command::SwitchSession {
session_id: session_id.to_string(), session_id: session_id.to_string(),
})); }));
} }
// 解析 /current 命令 - 获取当前会话信息
if trimmed == "/current" {
return Ok(Some(Command::GetCurrentSession));
}
// 不是命令,返回 None // 不是命令,返回 None
Ok(None) Ok(None)
} }

View File

@ -8,6 +8,8 @@ pub struct CommandContext {
pub request_id: Uuid, pub request_id: Uuid,
/// 当前会话ID /// 当前会话ID
pub session_id: Option<String>, pub session_id: Option<String>,
/// 当前话题ID
pub topic_id: Option<String>,
/// 当前聊天ID /// 当前聊天ID
pub chat_id: Option<String>, pub chat_id: Option<String>,
/// 发送者ID /// 发送者ID
@ -24,6 +26,7 @@ impl CommandContext {
Self { Self {
request_id: Uuid::new_v4(), request_id: Uuid::new_v4(),
session_id: None, session_id: None,
topic_id: None,
chat_id: None, chat_id: None,
sender_id: sender_id.into(), sender_id: sender_id.into(),
channel_name: channel_name.into(), channel_name: channel_name.into(),
@ -37,6 +40,12 @@ impl CommandContext {
self self
} }
/// 设置话题ID
pub fn with_topic_id(mut self, topic_id: impl Into<String>) -> Self {
self.topic_id = Some(topic_id.into());
self
}
/// 设置聊天ID /// 设置聊天ID
pub fn with_chat_id(mut self, chat_id: impl Into<String>) -> Self { pub fn with_chat_id(mut self, chat_id: impl Into<String>) -> Self {
self.chat_id = Some(chat_id.into()); self.chat_id = Some(chat_id.into());
@ -55,6 +64,8 @@ impl CommandContext {
pub struct AdapterContext { pub struct AdapterContext {
/// 当前会话ID /// 当前会话ID
pub session_id: Option<String>, pub session_id: Option<String>,
/// 当前话题ID
pub topic_id: Option<String>,
/// 当前聊天ID /// 当前聊天ID
pub chat_id: Option<String>, pub chat_id: Option<String>,
/// 发送者ID /// 发送者ID
@ -66,6 +77,7 @@ impl AdapterContext {
pub fn new(sender_id: impl Into<String>) -> Self { pub fn new(sender_id: impl Into<String>) -> Self {
Self { Self {
session_id: None, session_id: None,
topic_id: None,
chat_id: None, chat_id: None,
sender_id: sender_id.into(), sender_id: sender_id.into(),
} }
@ -77,6 +89,12 @@ impl AdapterContext {
self self
} }
/// 设置话题ID
pub fn with_topic_id(mut self, topic_id: impl Into<String>) -> Self {
self.topic_id = Some(topic_id.into());
self
}
/// 设置聊天ID /// 设置聊天ID
pub fn with_chat_id(mut self, chat_id: impl Into<String>) -> Self { pub fn with_chat_id(mut self, chat_id: impl Into<String>) -> Self {
self.chat_id = Some(chat_id.into()); self.chat_id = Some(chat_id.into());

View File

@ -128,11 +128,39 @@ async fn handle_save_session(
include_all: bool, include_all: bool,
ctx: CommandContext, ctx: CommandContext,
) -> Result<CommandResponse, CommandError> { ) -> Result<CommandResponse, CommandError> {
tracing::debug!(
ctx_session_id = ?ctx.session_id,
ctx_chat_id = ?ctx.chat_id,
channel = %ctx.channel_name,
"SaveSession command received"
);
let session_id = ctx let session_id = ctx
.session_id .session_id
.as_deref() .as_deref()
.ok_or_else(|| CommandError::new("NO_SESSION", "No active session".to_string()))?; .ok_or_else(|| CommandError::new("NO_SESSION", "No active session".to_string()))?;
tracing::debug!(session_id = %session_id, "Attempting to save session");
// 先检查会话是否存在
match handler.store.get_session(session_id) {
Ok(Some(record)) => {
tracing::debug!(
session_id = %session_id,
title = %record.title,
chat_id = %record.chat_id,
message_count = record.message_count,
"Session found for saving"
);
}
Ok(None) => {
tracing::warn!(session_id = %session_id, "Session not found in store");
}
Err(e) => {
tracing::error!(session_id = %session_id, error = %e, "Error querying session");
}
}
// 调用公共函数 // 调用公共函数
let output_path = save_session_to_file( let output_path = save_session_to_file(
session_id, session_id,

View File

@ -2,23 +2,35 @@ use crate::command::context::CommandContext;
use crate::command::handler::CommandHandler; use crate::command::handler::CommandHandler;
use crate::command::response::{CommandError, CommandResponse, MessageKind}; use crate::command::response::{CommandError, CommandResponse, MessageKind};
use crate::command::Command; use crate::command::Command;
use crate::gateway::cli_session::CliSessionService; use crate::gateway::session::SessionManager;
use crate::storage::SessionStore;
use async_trait::async_trait; use async_trait::async_trait;
use std::sync::Arc;
/// 会话命令处理器 /// 会话命令处理器
/// ///
/// 处理与会话管理相关的命令 /// 处理与会话管理相关的命令
pub struct SessionCommandHandler { pub struct SessionCommandHandler {
cli_sessions: CliSessionService, store: Arc<SessionStore>,
session_manager: Option<SessionManager>,
} }
impl SessionCommandHandler { impl SessionCommandHandler {
/// 创建新的会话命令处理器 /// 创建新的会话命令处理器
/// ///
/// # Arguments /// # Arguments
/// * `cli_sessions` - CLI 会话服务 /// * `store` - Session 存储
pub(crate) fn new(cli_sessions: CliSessionService) -> Self { pub(crate) fn new(store: Arc<SessionStore>) -> Self {
Self { cli_sessions } Self {
store,
session_manager: None,
}
}
/// 设置 SessionManager用于 CreateSession 命令自动切换话题)
pub fn with_session_manager(mut self, session_manager: SessionManager) -> Self {
self.session_manager = Some(session_manager);
self
} }
} }
@ -47,37 +59,63 @@ async fn handle_create_session(
title: Option<String>, title: Option<String>,
ctx: CommandContext, ctx: CommandContext,
) -> Result<CommandResponse, CommandError> { ) -> Result<CommandResponse, CommandError> {
let record = handler // 获取当前 session_id如果没有则报错
.cli_sessions let session_id = ctx.session_id.as_deref()
.create_with_channel(&ctx.channel_name, title.as_deref()) .ok_or_else(|| CommandError::new("NO_SESSION", "No active session. Please ensure a session exists first."))?;
.map_err(|e| CommandError::new("CREATE_SESSION_ERROR", e.to_string()))?;
// 创建新话题(在同一个 Session 内)
let topic_title = title.unwrap_or_else(|| {
format!("Topic {}", &uuid::Uuid::new_v4().to_string()[..8])
});
let topic = handler
.store
.create_topic(session_id, &topic_title, None)
.map_err(|e| CommandError::new("CREATE_TOPIC_ERROR", e.to_string()))?;
// 获取 chat_id
let chat_id = ctx.chat_id.as_deref()
.ok_or_else(|| CommandError::new("NO_CHAT_ID", "No chat_id in context"))?;
// 如果有 SessionManager自动切换到新话题
if let Some(ref session_manager) = handler.session_manager {
if let Some(session) = session_manager.get(&ctx.channel_name).await {
let mut session_guard = session.lock().await;
session_guard.switch_topic(chat_id, &topic.id)
.map_err(|e| CommandError::new("SWITCH_TOPIC_ERROR", e.to_string()))?;
}
}
Ok(CommandResponse::success(ctx.request_id) Ok(CommandResponse::success(ctx.request_id)
.with_message(MessageKind::Notification, &record.title) .with_message(MessageKind::Notification, &topic.title)
.with_metadata("session_id", &record.id) .with_metadata("topic_id", &topic.id)
.with_metadata("channel_name", &record.channel_name) .with_metadata("session_id", &topic.session_id)
.with_metadata("message_count", &record.message_count.to_string())) .with_metadata("message_count", &topic.message_count.to_string()))
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use crate::command::response::MessageKind; use crate::storage::SessionStore;
use crate::storage::{SessionRecord, SessionStore};
use std::sync::Arc; use std::sync::Arc;
fn create_test_service() -> CliSessionService { fn create_test_handler() -> SessionCommandHandler {
let store = Arc::new(SessionStore::in_memory().unwrap()); let store = Arc::new(SessionStore::in_memory().unwrap());
CliSessionService::new(store) SessionCommandHandler::new(store)
} }
#[tokio::test] #[tokio::test]
async fn test_create_session_with_title() { async fn test_create_session_with_title() {
let service = create_test_service(); let handler = create_test_handler();
let handler = SessionCommandHandler::new(service); // 需要先创建一个 session
let ctx = CommandContext::new("test", "test"); let store = handler.store.clone();
let session = store.create_session("cli", Some("test session")).unwrap();
let ctx = CommandContext::new("test", "cli")
.with_session_id(&session.id)
.with_chat_id(&session.id);
let cmd = Command::CreateSession { let cmd = Command::CreateSession {
title: Some("my session".to_string()), title: Some("my topic".to_string()),
}; };
let result = handler.handle(cmd, ctx).await; let result = handler.handle(cmd, ctx).await;
@ -85,16 +123,18 @@ mod tests {
assert!(result.is_ok()); assert!(result.is_ok());
let resp = result.unwrap(); let resp = result.unwrap();
assert!(resp.success); assert!(resp.success);
assert_eq!(resp.messages.len(), 1); assert!(resp.metadata.contains_key("topic_id"));
assert_eq!(resp.messages[0].content, "my session");
assert!(resp.metadata.contains_key("session_id"));
} }
#[tokio::test] #[tokio::test]
async fn test_create_session_without_title() { async fn test_create_session_without_title() {
let service = create_test_service(); let handler = create_test_handler();
let handler = SessionCommandHandler::new(service); let store = handler.store.clone();
let ctx = CommandContext::new("test", "test"); let session = store.create_session("cli", Some("test session")).unwrap();
let ctx = CommandContext::new("test", "cli")
.with_session_id(&session.id)
.with_chat_id(&session.id);
let cmd = Command::CreateSession { title: None }; let cmd = Command::CreateSession { title: None };
let result = handler.handle(cmd, ctx).await; let result = handler.handle(cmd, ctx).await;
@ -102,15 +142,11 @@ mod tests {
assert!(result.is_ok()); assert!(result.is_ok());
let resp = result.unwrap(); let resp = result.unwrap();
assert!(resp.success); assert!(resp.success);
assert_eq!(resp.messages.len(), 1);
// 自动生成的标题
assert!(!resp.messages[0].content.is_empty());
} }
#[test] #[test]
fn test_can_handle() { fn test_can_handle() {
let service = create_test_service(); let handler = create_test_handler();
let handler = SessionCommandHandler::new(service);
assert!(handler.can_handle(&Command::CreateSession { title: None })); assert!(handler.can_handle(&Command::CreateSession { title: None }));
} }

View File

@ -2,28 +2,39 @@ use crate::command::context::CommandContext;
use crate::command::handler::CommandHandler; use crate::command::handler::CommandHandler;
use crate::command::response::{CommandError, CommandResponse, MessageKind}; use crate::command::response::{CommandError, CommandResponse, MessageKind};
use crate::command::Command; use crate::command::Command;
use crate::gateway::cli_session::CliSessionService; use crate::gateway::session::SessionManager;
use crate::protocol::SessionSummary; use crate::storage::SessionStore;
use async_trait::async_trait; use async_trait::async_trait;
use std::sync::Arc;
/// 会话查询命令处理器 /// 会话查询命令处理器
/// ///
/// 处理 ListSessions 和 LoadSession 命令 /// 处理 ListSessions、LoadSession 和 SwitchSession 命令(现在操作 Topic
pub struct SessionQueryCommandHandler { pub struct SessionQueryCommandHandler {
cli_sessions: CliSessionService, store: Arc<SessionStore>,
session_manager: Option<SessionManager>,
} }
impl SessionQueryCommandHandler { impl SessionQueryCommandHandler {
/// 创建新的会话查询命令处理器 /// 创建新的会话查询命令处理器
pub fn new(cli_sessions: CliSessionService) -> Self { pub fn new(store: Arc<SessionStore>) -> Self {
Self { cli_sessions } Self {
store,
session_manager: None,
}
}
/// 设置 SessionManager用于 SwitchSession 命令)
pub fn with_session_manager(mut self, session_manager: SessionManager) -> Self {
self.session_manager = Some(session_manager);
self
} }
} }
#[async_trait] #[async_trait]
impl CommandHandler for SessionQueryCommandHandler { impl CommandHandler for SessionQueryCommandHandler {
fn can_handle(&self, cmd: &Command) -> bool { fn can_handle(&self, cmd: &Command) -> bool {
matches!(cmd, Command::ListSessions { .. } | Command::LoadSession { .. }) matches!(cmd, Command::ListSessions { .. } | Command::LoadSession { .. } | Command::SwitchSession { .. } | Command::GetCurrentSession)
} }
async fn handle( async fn handle(
@ -38,82 +49,228 @@ impl CommandHandler for SessionQueryCommandHandler {
Command::LoadSession { session_id } => { Command::LoadSession { session_id } => {
handle_load_session(self, session_id, ctx).await handle_load_session(self, session_id, ctx).await
} }
Command::SwitchSession { session_id } => {
handle_switch_session(self, session_id, ctx).await
}
Command::GetCurrentSession => {
handle_get_current_session(self, ctx).await
}
_ => unreachable!(), _ => unreachable!(),
} }
} }
} }
/// 处理列出会话命令 /// 处理列出命令
async fn handle_list_sessions( async fn handle_list_sessions(
handler: &SessionQueryCommandHandler, handler: &SessionQueryCommandHandler,
include_archived: bool, _include_archived: bool,
ctx: CommandContext, ctx: CommandContext,
) -> Result<CommandResponse, CommandError> { ) -> Result<CommandResponse, CommandError> {
let records = handler // 获取当前 session_id
.cli_sessions let session_id = ctx.session_id.as_deref()
.list(include_archived) .ok_or_else(|| CommandError::new("NO_SESSION", "No active session"))?;
.map_err(|e| CommandError::new("LIST_SESSIONS_ERROR", e.to_string()))?;
let summaries: Vec<SessionSummary> = records // 查询该 session 的所有 topic
.into_iter() let topics = handler
.map(|r| SessionSummary { .store
session_id: r.id, .list_topics(session_id)
title: r.title, .map_err(|e| CommandError::new("LIST_TOPICS_ERROR", e.to_string()))?;
channel_name: r.channel_name,
chat_id: r.chat_id,
message_count: r.message_count,
last_active_at: r.last_active_at,
archived_at: r.archived_at,
})
.collect();
// 将会话列表序列化为 JSON 存储在 metadata 中 // 获取当前 topic ID
let sessions_json = let current_topic_id = ctx.topic_id.as_deref().unwrap_or("");
serde_json::to_string(&summaries).map_err(|e| CommandError::new("SERIALIZE_ERROR", e.to_string()))?;
// 构建可读的会话列表消息 // 构建表格格式的话题列表消息
let message = if summaries.is_empty() { let message = if topics.is_empty() {
"No sessions found.".to_string() "No topics found. Use /new <title> to create a topic.".to_string()
} else { } else {
let mut lines = vec![format!("Found {} session(s):", summaries.len())]; let mut lines = vec![format!("Found {} topic(s):", topics.len())];
for summary in &summaries { lines.push(String::new());
let archived_info = summary
.archived_at // 表格头部
.map(|_| " [archived]") lines.push("┌────┬─────────────────┬──────────────────────┬──────────┬─────────────────┐".to_string());
.unwrap_or(""); lines.push("│ No │ Topic ID │ Title │ Messages │ Last Active │".to_string());
lines.push("├────┼─────────────────┼──────────────────────┼──────────┼─────────────────┤".to_string());
// 表格内容
for (idx, topic) in topics.iter().enumerate() {
let row_num = idx + 1;
let is_current = topic.id == current_topic_id;
let num_marker = if is_current { " * ".to_string() } else { format!(" {:<2}", row_num) };
// 截断过长的字段
let topic_id_display = if topic.id.len() > 15 {
format!("{}...", &topic.id[..12])
} else {
topic.id.clone()
};
let title_display = if topic.title.len() > 20 {
format!("{}...", &topic.title[..17])
} else {
topic.title.clone()
};
let last_active = format_time_ago(topic.last_active_at);
lines.push(format!( lines.push(format!(
" - {}: {}{}", "│{}│ {:<15} │ {:<20} │ {:<8} │ {:<15} │",
summary.session_id, summary.title, archived_info num_marker,
topic_id_display,
title_display,
topic.message_count,
last_active
)); ));
} }
lines.push("".to_string());
lines.push("Use /use <session_id> to switch to a session".to_string()); // 表格底部
lines.push("└────┴─────────────────┴──────────────────────┴──────────┴─────────────────┘".to_string());
lines.push(String::new());
lines.push("* = current topic".to_string());
lines.push("Use /use <number> or /use <topic_id> to switch".to_string());
lines.join("\n") lines.join("\n")
}; };
let topics_json = serde_json::to_string(&topics)
.map_err(|e| CommandError::new("SERIALIZE_ERROR", e.to_string()))?;
Ok(CommandResponse::success(ctx.request_id) Ok(CommandResponse::success(ctx.request_id)
.with_message(MessageKind::Notification, &message) .with_message(MessageKind::Notification, &message)
.with_metadata("sessions", &sessions_json) .with_metadata("topics", &topics_json)
.with_metadata("count", &summaries.len().to_string())) .with_metadata("count", &topics.len().to_string())
.with_metadata("current_topic_id", current_topic_id))
} }
/// 处理加载会话命令 /// 处理加载命令
async fn handle_load_session( async fn handle_load_session(
handler: &SessionQueryCommandHandler, handler: &SessionQueryCommandHandler,
session_id: String, topic_id: String,
ctx: CommandContext, ctx: CommandContext,
) -> Result<CommandResponse, CommandError> { ) -> Result<CommandResponse, CommandError> {
let record = handler let topic = handler
.cli_sessions .store
.get(&session_id) .get_topic(&topic_id)
.map_err(|e| CommandError::new("LOAD_SESSION_ERROR", e.to_string()))? .map_err(|e| CommandError::new("LOAD_TOPIC_ERROR", e.to_string()))?
.ok_or_else(|| CommandError::new("SESSION_NOT_FOUND", format!("Session not found: {}", session_id)))?; .ok_or_else(|| CommandError::new("TOPIC_NOT_FOUND", format!("Topic not found: {}", topic_id)))?;
Ok(CommandResponse::success(ctx.request_id) Ok(CommandResponse::success(ctx.request_id)
.with_message(MessageKind::Notification, &record.title) .with_message(MessageKind::Notification, &topic.title)
.with_metadata("session_id", &record.id) .with_metadata("topic_id", &topic.id)
.with_metadata("title", &record.title) .with_metadata("title", &topic.title)
.with_metadata("message_count", &record.message_count.to_string())) .with_metadata("message_count", &topic.message_count.to_string()))
}
/// 格式化时间为相对时间(如 "2 mins ago"
fn format_time_ago(timestamp_ms: i64) -> String {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64;
let diff_ms = now - timestamp_ms;
let diff_secs = diff_ms / 1000;
if diff_secs < 60 {
"just now".to_string()
} else if diff_secs < 3600 {
format!("{} mins ago", diff_secs / 60)
} else if diff_secs < 86400 {
format!("{} hours ago", diff_secs / 3600)
} else {
format!("{} days ago", diff_secs / 86400)
}
}
/// 处理获取当前话题命令
async fn handle_get_current_session(
handler: &SessionQueryCommandHandler,
ctx: CommandContext,
) -> Result<CommandResponse, CommandError> {
let topic_id = ctx.topic_id.as_deref()
.ok_or_else(|| CommandError::new("NO_CURRENT_TOPIC", "No current topic"))?;
let topic = handler
.store
.get_topic(topic_id)
.map_err(|e| CommandError::new("GET_TOPIC_ERROR", e.to_string()))?
.ok_or_else(|| CommandError::new("TOPIC_NOT_FOUND", format!("Topic not found: {}", topic_id)))?;
let last_active = format_time_ago(topic.last_active_at);
let created_at = format_time_ago(topic.created_at);
let message = format!(
"Current Topic:\n\n Topic ID: {}\n Title: {}\n Messages: {}\n Created: {}\n Last Active: {}",
topic.id,
topic.title,
topic.message_count,
created_at,
last_active
);
Ok(CommandResponse::success(ctx.request_id)
.with_message(MessageKind::Notification, &message)
.with_metadata("topic_id", &topic.id)
.with_metadata("title", &topic.title)
.with_metadata("message_count", &topic.message_count.to_string()))
}
/// 处理切换话题命令
async fn handle_switch_session(
handler: &SessionQueryCommandHandler,
topic_id: String,
ctx: CommandContext,
) -> Result<CommandResponse, CommandError> {
// 获取当前 session_id 和 chat_id
let session_id = ctx.session_id.as_deref()
.ok_or_else(|| CommandError::new("NO_SESSION", "No active session"))?;
let chat_id = ctx.chat_id.as_deref()
.ok_or_else(|| CommandError::new("NO_CHAT_ID", "No chat_id in context"))?;
// 尝试解析为序号
let target_topic_id = if let Ok(index) = topic_id.parse::<usize>() {
let topics = handler
.store
.list_topics(session_id)
.map_err(|e| CommandError::new("LIST_TOPICS_ERROR", e.to_string()))?;
let index = index.saturating_sub(1);
if index >= topics.len() {
return Err(CommandError::new(
"INVALID_TOPIC_INDEX",
format!("Topic index {} is out of range (1-{})", index + 1, topics.len())
));
}
topics[index].id.clone()
} else {
topic_id
};
// 验证目标话题存在
let topic = handler
.store
.get_topic(&target_topic_id)
.map_err(|e| CommandError::new("SWITCH_TOPIC_ERROR", e.to_string()))?
.ok_or_else(|| CommandError::new("TOPIC_NOT_FOUND", format!("Topic not found: {}", target_topic_id)))?;
// 如果有 SessionManager实际切换话题历史
if let Some(ref session_manager) = handler.session_manager {
if let Some(session) = session_manager.get(&ctx.channel_name).await {
let mut session_guard = session.lock().await;
session_guard.switch_topic(chat_id, &target_topic_id)
.map_err(|e| CommandError::new("SWITCH_TOPIC_ERROR", e.to_string()))?;
}
}
// 返回切换成功响应
let message = format!(
"✓ Switched to topic: {} ({} messages)",
topic.title, topic.message_count
);
Ok(CommandResponse::success(ctx.request_id)
.with_message(MessageKind::Notification, &message)
.with_metadata("topic_id", &topic.id)
.with_metadata("title", &topic.title)
.with_metadata("message_count", &topic.message_count.to_string()))
} }
#[cfg(test)] #[cfg(test)]
@ -122,16 +279,20 @@ mod tests {
use crate::storage::SessionStore; use crate::storage::SessionStore;
use std::sync::Arc; use std::sync::Arc;
fn create_test_service() -> CliSessionService { fn create_test_handler() -> SessionQueryCommandHandler {
let store = Arc::new(SessionStore::in_memory().unwrap()); let store = Arc::new(SessionStore::in_memory().unwrap());
CliSessionService::new(store) SessionQueryCommandHandler::new(store)
} }
#[tokio::test] #[tokio::test]
async fn test_list_sessions_empty() { async fn test_list_sessions_empty() {
let service = create_test_service(); let handler = create_test_handler();
let handler = SessionQueryCommandHandler::new(service); // 需要先创建一个 session 和 topic
let ctx = CommandContext::new("test", "test"); let store = handler.store.clone();
let session = store.create_session("cli", Some("test")).unwrap();
let ctx = CommandContext::new("test", "cli")
.with_session_id(&session.id);
let cmd = Command::ListSessions { let cmd = Command::ListSessions {
include_archived: false, include_archived: false,
}; };
@ -141,18 +302,20 @@ mod tests {
assert!(result.is_ok()); assert!(result.is_ok());
let resp = result.unwrap(); let resp = result.unwrap();
assert!(resp.success); assert!(resp.success);
assert!(resp.messages[0].content.contains("No sessions")); assert!(resp.messages[0].content.contains("No topics"));
} }
#[tokio::test] #[tokio::test]
async fn test_list_sessions_with_items() { async fn test_list_sessions_with_items() {
let service = create_test_service(); let handler = create_test_handler();
let handler = SessionQueryCommandHandler::new(service.clone()); let store = handler.store.clone();
let session = store.create_session("cli", Some("test")).unwrap();
// 创建一些会话 // 创建一个 topic
service.create(Some("test session")).unwrap(); store.create_topic(&session.id, "Test Topic", None).unwrap();
let ctx = CommandContext::new("test", "test"); let ctx = CommandContext::new("test", "cli")
.with_session_id(&session.id);
let cmd = Command::ListSessions { let cmd = Command::ListSessions {
include_archived: false, include_archived: false,
}; };
@ -162,14 +325,17 @@ mod tests {
assert!(result.is_ok()); assert!(result.is_ok());
let resp = result.unwrap(); let resp = result.unwrap();
assert!(resp.success); assert!(resp.success);
assert!(resp.metadata.contains_key("sessions")); assert!(resp.metadata.contains_key("topics"));
} }
#[tokio::test] #[tokio::test]
async fn test_load_session_not_found() { async fn test_load_session_not_found() {
let service = create_test_service(); let handler = create_test_handler();
let handler = SessionQueryCommandHandler::new(service); let store = handler.store.clone();
let ctx = CommandContext::new("test", "test"); let session = store.create_session("cli", Some("test")).unwrap();
let ctx = CommandContext::new("test", "test")
.with_session_id(&session.id);
let cmd = Command::LoadSession { let cmd = Command::LoadSession {
session_id: "nonexistent".to_string(), session_id: "nonexistent".to_string(),
}; };
@ -181,15 +347,15 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_load_session_success() { async fn test_load_session_success() {
let service = create_test_service(); let handler = create_test_handler();
let handler = SessionQueryCommandHandler::new(service.clone()); let store = handler.store.clone();
let session = store.create_session("cli", Some("test")).unwrap();
let topic = store.create_topic(&session.id, "Test Topic", None).unwrap();
// 创建会话 let ctx = CommandContext::new("test", "test")
let record = service.create(Some("test session")).unwrap(); .with_session_id(&session.id);
let ctx = CommandContext::new("test", "test");
let cmd = Command::LoadSession { let cmd = Command::LoadSession {
session_id: record.id.clone(), session_id: topic.id.clone(),
}; };
let result = handler.handle(cmd, ctx).await; let result = handler.handle(cmd, ctx).await;
@ -197,6 +363,6 @@ mod tests {
assert!(result.is_ok()); assert!(result.is_ok());
let resp = result.unwrap(); let resp = result.unwrap();
assert!(resp.success); assert!(resp.success);
assert_eq!(resp.metadata.get("session_id").unwrap(), &record.id); assert_eq!(resp.metadata.get("topic_id").unwrap(), &topic.id);
} }
} }

View File

@ -11,17 +11,21 @@ use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")] #[serde(tag = "type", rename_all = "snake_case")]
pub enum Command { pub enum Command {
/// 创建新 /// 创建新题(在同一个 Session 内)
CreateSession { title: Option<String> }, CreateSession { title: Option<String> },
/// 保存话内容到 Markdown 文件 /// 保存内容到 Markdown 文件
SaveSession { SaveSession {
filepath: Option<String>, filepath: Option<String>,
include_all: bool, include_all: bool,
}, },
/// 列出会话 /// 列出当前 Session 的所有话题
ListSessions { include_archived: bool }, ListSessions { include_archived: bool },
/// 加载指定 /// 加载指定
LoadSession { session_id: String }, LoadSession { session_id: String },
/// 切换到指定话题(清理当前历史并加载新话题)
SwitchSession { session_id: String },
/// 获取当前话题信息
GetCurrentSession,
} }
impl Command { impl Command {
@ -32,6 +36,8 @@ impl Command {
Command::SaveSession { .. } => "save_session", Command::SaveSession { .. } => "save_session",
Command::ListSessions { .. } => "list_sessions", Command::ListSessions { .. } => "list_sessions",
Command::LoadSession { .. } => "load_session", Command::LoadSession { .. } => "load_session",
Command::SwitchSession { .. } => "switch_session",
Command::GetCurrentSession => "get_current_session",
} }
} }
} }

View File

@ -42,6 +42,17 @@ impl CliSessionService {
.map_err(|err| AgentError::Other(format!("list sessions error: {}", err))) .map_err(|err| AgentError::Other(format!("list sessions error: {}", err)))
} }
/// 列出指定通道的会话
pub(crate) fn list_by_channel(
&self,
channel_name: &str,
include_archived: bool,
) -> Result<Vec<SessionRecord>, AgentError> {
self.store
.list_sessions(channel_name, include_archived)
.map_err(|err| AgentError::Other(format!("list sessions error: {}", err)))
}
pub(crate) fn rename(&self, session_id: &str, title: &str) -> Result<(), AgentError> { pub(crate) fn rename(&self, session_id: &str, title: &str) -> Result<(), AgentError> {
self.store self.store
.rename_session(session_id, title) .rename_session(session_id, title)

View File

@ -114,6 +114,7 @@ impl AgentExecutionService {
OutboundMessage::from_chat_message( OutboundMessage::from_chat_message(
request.channel_name, request.channel_name,
request.chat_id, request.chat_id,
None, // session_id
None, None,
request.metadata, request.metadata,
message, message,

View File

@ -14,6 +14,7 @@ use crate::command::Command;
use crate::config::LLMProviderConfig; use crate::config::LLMProviderConfig;
use crate::gateway::agent_prompt_provider::AgentPromptProvider; use crate::gateway::agent_prompt_provider::AgentPromptProvider;
use crate::skills::SkillPromptProvider; use crate::skills::SkillPromptProvider;
use crate::storage::persistent_session_id;
use super::session::{BusToolCallEmitter, SessionManager}; use super::session::{BusToolCallEmitter, SessionManager};
@ -35,13 +36,17 @@ impl InboundProcessor {
) -> Self { ) -> Self {
// 创建命令路由器并注册处理器 // 创建命令路由器并注册处理器
let mut command_router = CommandRouter::new(); let mut command_router = CommandRouter::new();
let store = session_manager.store();
// 注册 Session 处理器 // 注册 Session 处理器
let cli_sessions = session_manager.cli_sessions(); let session_handler = SessionCommandHandler::new(store.clone())
command_router.register(Box::new(SessionCommandHandler::new(cli_sessions.clone()))); .with_session_manager(session_manager.clone());
command_router.register(Box::new(session_handler));
// 注册 session_query 处理器 // 注册 session_query 处理器
command_router.register(Box::new(SessionQueryCommandHandler::new(cli_sessions))); let session_query_handler = SessionQueryCommandHandler::new(store)
.with_session_manager(session_manager.clone());
command_router.register(Box::new(session_query_handler));
// 注册 save_session 处理器 // 注册 save_session 处理器
let store = session_manager.store(); let store = session_manager.store();
@ -115,42 +120,37 @@ impl InboundProcessor {
} }
async fn process_one(&self, inbound: InboundMessage) -> Result<(), AgentError> { async fn process_one(&self, inbound: InboundMessage) -> Result<(), AgentError> {
// 计算正确的 session_id根据 channel_name 和 chat_id
let session_id = persistent_session_id(&inbound.channel, &inbound.chat_id);
// 使用 ChannelInputAdapter 尝试解析命令 // 使用 ChannelInputAdapter 尝试解析命令
let adapter = ChannelInputAdapter::new(); let adapter = ChannelInputAdapter::new();
let ctx = crate::command::context::AdapterContext::new(&inbound.channel) let ctx = crate::command::context::AdapterContext::new(&inbound.channel)
.with_session_id(&inbound.chat_id); .with_session_id(&session_id);
if let Ok(Some(cmd)) = adapter.try_parse(&inbound.content, ctx) { if let Ok(Some(cmd)) = adapter.try_parse(&inbound.content, ctx) {
// 使用命令路由器处理 // 使用命令路由器处理
let cmd_ctx = crate::command::context::CommandContext::new(&inbound.channel, &inbound.channel) let cmd_ctx = crate::command::context::CommandContext::new(&inbound.channel, &inbound.channel)
.with_session_id(&inbound.chat_id); .with_session_id(&session_id)
.with_chat_id(&inbound.chat_id);
// 记录是否是创建会话命令(用于后续自动切换 // 记录是否是创建会话命令(用于后续处理
let is_create_session = matches!(cmd, Command::CreateSession { .. }); let _is_create_session = matches!(cmd, Command::CreateSession { .. });
let response = self.command_router.dispatch_with_response(cmd, cmd_ctx).await; let response = self.command_router.dispatch_with_response(cmd, cmd_ctx).await;
// 发送响应给用户 // 发送响应给用户
if response.success { if response.success {
// 如果是创建会话,更新 chat_id 到新会话
let target_chat_id = if let Some(session_id) = response.metadata.get("session_id") {
if is_create_session {
// 自动切换到新会话
session_id.clone()
} else {
inbound.chat_id.clone()
}
} else {
inbound.chat_id.clone()
};
// 提取响应消息 // 提取响应消息
// chat_id 保持为 inbound.chat_id飞书 open_id
// session_id 放入 metadata 用于会话管理
for msg in &response.messages { for msg in &response.messages {
if let Err(error) = self if let Err(error) = self
.bus .bus
.publish_outbound(OutboundMessage::assistant( .publish_outbound(OutboundMessage::assistant(
inbound.channel.clone(), inbound.channel.clone(),
target_chat_id.clone(), inbound.chat_id.clone(),
response.metadata.get("session_id").cloned(),
msg.content.clone(), msg.content.clone(),
None, None,
inbound.forwarded_metadata.clone(), inbound.forwarded_metadata.clone(),
@ -166,6 +166,7 @@ impl InboundProcessor {
.publish_outbound(OutboundMessage::assistant( .publish_outbound(OutboundMessage::assistant(
inbound.channel.clone(), inbound.channel.clone(),
inbound.chat_id.clone(), inbound.chat_id.clone(),
response.metadata.get("session_id").cloned(),
format!("Error [{}]: {}", error.code, error.message), format!("Error [{}]: {}", error.code, error.message),
None, None,
inbound.forwarded_metadata.clone(), inbound.forwarded_metadata.clone(),
@ -216,6 +217,7 @@ impl InboundProcessor {
.publish_outbound(OutboundMessage::error_notification( .publish_outbound(OutboundMessage::error_notification(
inbound.channel, inbound.channel,
inbound.chat_id, inbound.chat_id,
None, // session_id
error.to_string(), error.to_string(),
None, None,
metadata, metadata,

View File

@ -102,6 +102,7 @@ pub(crate) fn build_session_manager_with_sender(
agent_factory, agent_factory,
conversations, conversations,
skill_events, skill_events,
store.clone(),
chat_history_ttl_hours, chat_history_ttl_hours,
); );
let lifecycle = SessionLifecycleService::new(session_factory, session_ttl_hours); let lifecycle = SessionLifecycleService::new(session_factory, session_ttl_hours);

View File

@ -32,6 +32,7 @@ use super::session_message_service::SessionMessageService;
/// Session 按 channel 隔离,每个 channel 一个 Session /// Session 按 channel 隔离,每个 channel 一个 Session
/// History 按 chat_id 隔离,由 Session 统一管理 /// History 按 chat_id 隔离,由 Session 统一管理
/// Topic 按 chat_id 隔离,存储在 SessionHistory 中
pub struct Session { pub struct Session {
pub id: Uuid, pub id: Uuid,
pub channel_name: String, pub channel_name: String,
@ -41,6 +42,7 @@ pub struct Session {
agent_factory: AgentFactory, agent_factory: AgentFactory,
compressor: ContextCompressor, compressor: ContextCompressor,
history: SessionHistory, history: SessionHistory,
store: Arc<SessionStore>,
} }
pub struct BusToolCallEmitter { pub struct BusToolCallEmitter {
@ -79,6 +81,7 @@ impl EmittedMessageHandler for BusToolCallEmitter {
for outbound in OutboundMessage::from_chat_message( for outbound in OutboundMessage::from_chat_message(
&self.channel_name, &self.channel_name,
&self.chat_id, &self.chat_id,
None, // session_id
None, None,
&self.metadata, &self.metadata,
&message, &message,
@ -119,6 +122,7 @@ impl Session {
conversations, conversations,
skill_events, skill_events,
chat_history_ttl_hours, chat_history_ttl_hours,
store,
) )
.await .await
} }
@ -132,6 +136,7 @@ impl Session {
conversations: Arc<dyn ConversationRepository>, conversations: Arc<dyn ConversationRepository>,
skill_events: Arc<dyn SkillEventRepository>, skill_events: Arc<dyn SkillEventRepository>,
chat_history_ttl_hours: Option<u64>, chat_history_ttl_hours: Option<u64>,
store: Arc<SessionStore>,
) -> Result<Self, AgentError> { ) -> Result<Self, AgentError> {
Ok(Self { Ok(Self {
id: Uuid::new_v4(), id: Uuid::new_v4(),
@ -147,6 +152,7 @@ impl Session {
skill_events, skill_events,
chat_history_ttl_hours, chat_history_ttl_hours,
), ),
store,
}) })
} }
@ -154,14 +160,78 @@ impl Session {
self.history.persistent_session_id(chat_id) self.history.persistent_session_id(chat_id)
} }
/// 设置当前话题 ID指定 chat
pub fn set_current_topic(&mut self, chat_id: &str, topic_id: Option<String>) {
if let Some(topic_id) = topic_id {
self.history.set_chat_topic(chat_id, topic_id);
} else {
self.history.clear_chat_topic(chat_id);
}
}
/// 获取当前话题 ID指定 chat
pub fn current_topic(&self, chat_id: &str) -> Option<&str> {
self.history.chat_topic(chat_id)
}
/// 获取历史所对应的话题 ID指定 chat
pub fn history_topic(&self, chat_id: &str) -> Option<&str> {
self.history.history_topic(chat_id)
}
/// 切换话题 - 清除当前历史并加载新话题的历史
pub fn switch_topic(&mut self, chat_id: &str, topic_id: &str) -> Result<(), AgentError> {
// 清除当前历史
self.history.remove_history(chat_id);
// 加载新话题的历史
let messages = self
.store
.load_messages_for_topic(topic_id)
.map_err(|e| AgentError::Other(format!("load topic messages error: {}", e)))?;
self.history.set_history(chat_id, messages);
self.history.set_chat_topic(chat_id, topic_id.to_string());
tracing::info!(
topic_id = %topic_id,
chat_id = %chat_id,
"Switched to topic"
);
Ok(())
}
pub fn ensure_persistent_session(&self, chat_id: &str) -> Result<SessionRecord, AgentError> { pub fn ensure_persistent_session(&self, chat_id: &str) -> Result<SessionRecord, AgentError> {
self.history.ensure_persistent_session(chat_id) self.history.ensure_persistent_session(chat_id)
} }
pub fn ensure_chat_loaded(&mut self, chat_id: &str) -> Result<(), AgentError> { pub fn ensure_chat_loaded(&mut self, chat_id: &str) -> Result<(), AgentError> {
// 检查历史是否存在且对应正确的话题
let current_topic = self.history.chat_topic(chat_id);
let stored_topic = self.history.history_topic(chat_id);
if self.chat_history_exists(chat_id) {
// 如果历史已存在,但话题不匹配,需要重新加载
if current_topic != stored_topic {
tracing::info!(
chat_id = %chat_id,
current_topic = ?current_topic,
stored_topic = ?stored_topic,
"Topic changed, reloading history"
);
self.reload_chat_history(chat_id)?;
}
return Ok(());
}
// 历史不存在,正常加载
self.history.ensure_chat_loaded(chat_id) self.history.ensure_chat_loaded(chat_id)
} }
fn chat_history_exists(&self, chat_id: &str) -> bool {
self.history.get_history(chat_id).is_some()
}
pub fn ensure_agent_prompt_before_user_message( pub fn ensure_agent_prompt_before_user_message(
&mut self, &mut self,
chat_id: &str, chat_id: &str,
@ -197,13 +267,29 @@ impl Session {
self.history.reset_chat_context(chat_id) self.history.reset_chat_context(chat_id)
} }
/// 将消息写入内存与持久化层 /// 将消息写入内存与持久化层(使用当前 topic
pub fn append_persisted_message( pub fn append_persisted_message(
&mut self, &mut self,
chat_id: &str, chat_id: &str,
message: ChatMessage, message: ChatMessage,
) -> Result<(), AgentError> { ) -> Result<(), AgentError> {
self.history.append_persisted_message(chat_id, message) let session_id = self.persistent_session_id(chat_id);
let topic_id = self.history.chat_topic(chat_id).map(|s| s.to_string());
self.store
.append_message_with_topic(&session_id, topic_id.as_deref(), &message)
.map_err(|err| {
AgentError::Other(format!("append message persistence error: {}", err))
})?;
self.add_message(chat_id, message);
// 更新 topic 的最后活跃时间
if let Some(ref topic_id) = topic_id {
if let Err(e) = self.store.touch_topic(topic_id) {
tracing::warn!(error = %e, topic_id = %topic_id, "Failed to touch topic");
}
}
Ok(())
} }
pub fn append_persisted_messages<I>( pub fn append_persisted_messages<I>(
@ -282,7 +368,18 @@ impl Session {
} }
pub(crate) fn reload_chat_history(&mut self, chat_id: &str) -> Result<(), AgentError> { pub(crate) fn reload_chat_history(&mut self, chat_id: &str) -> Result<(), AgentError> {
self.history.reload_chat_history(chat_id) // 如果当前有 topic加载该 topic 的消息
if let Some(topic_id) = self.history.chat_topic(chat_id) {
let messages = self
.store
.load_messages_for_topic(topic_id)
.map_err(|e| AgentError::Other(format!("load topic messages error: {}", e)))?;
self.history.set_history(chat_id, messages);
} else {
// 否则加载 session 的所有消息
self.history.reload_chat_history(chat_id)?;
}
Ok(())
} }
pub(crate) fn store(&self) -> Arc<dyn ConversationRepository> { pub(crate) fn store(&self) -> Arc<dyn ConversationRepository> {

View File

@ -6,7 +6,7 @@ use crate::agent::AgentError;
use crate::config::LLMProviderConfig; use crate::config::LLMProviderConfig;
use crate::protocol::WsOutbound; use crate::protocol::WsOutbound;
use crate::skills::SkillRuntime; use crate::skills::SkillRuntime;
use crate::storage::{ConversationRepository, SkillEventRepository}; use crate::storage::{ConversationRepository, SessionStore, SkillEventRepository};
use super::agent_factory::AgentFactory; use super::agent_factory::AgentFactory;
use super::session::Session; use super::session::Session;
@ -18,6 +18,7 @@ pub(crate) struct SessionFactory {
agent_factory: AgentFactory, agent_factory: AgentFactory,
conversations: Arc<dyn ConversationRepository>, conversations: Arc<dyn ConversationRepository>,
skill_events: Arc<dyn SkillEventRepository>, skill_events: Arc<dyn SkillEventRepository>,
store: Arc<SessionStore>,
chat_history_ttl_hours: Option<u64>, chat_history_ttl_hours: Option<u64>,
} }
@ -28,6 +29,7 @@ impl SessionFactory {
agent_factory: AgentFactory, agent_factory: AgentFactory,
conversations: Arc<dyn ConversationRepository>, conversations: Arc<dyn ConversationRepository>,
skill_events: Arc<dyn SkillEventRepository>, skill_events: Arc<dyn SkillEventRepository>,
store: Arc<SessionStore>,
chat_history_ttl_hours: Option<u64>, chat_history_ttl_hours: Option<u64>,
) -> Self { ) -> Self {
Self { Self {
@ -36,6 +38,7 @@ impl SessionFactory {
agent_factory, agent_factory,
conversations, conversations,
skill_events, skill_events,
store,
chat_history_ttl_hours, chat_history_ttl_hours,
} }
} }
@ -54,6 +57,7 @@ impl SessionFactory {
self.conversations.clone(), self.conversations.clone(),
self.skill_events.clone(), self.skill_events.clone(),
self.chat_history_ttl_hours, self.chat_history_ttl_hours,
self.store.clone(),
) )
.await .await
} }

View File

@ -25,6 +25,8 @@ fn current_timestamp() -> i64 {
pub(crate) struct SessionHistory { pub(crate) struct SessionHistory {
channel_name: String, channel_name: String,
chat_histories: HashMap<String, Vec<ChatMessage>>, chat_histories: HashMap<String, Vec<ChatMessage>>,
chat_topic_ids: HashMap<String, String>, // 每个 chat 的当前 topic
history_topic_ids: HashMap<String, String>, // 每个 chat 的历史所对应的话题
compression_in_flight: HashSet<String>, compression_in_flight: HashSet<String>,
conversations: Arc<dyn ConversationRepository>, conversations: Arc<dyn ConversationRepository>,
skill_events: Arc<dyn SkillEventRepository>, skill_events: Arc<dyn SkillEventRepository>,
@ -41,6 +43,8 @@ impl SessionHistory {
Self { Self {
channel_name: channel_name.into(), channel_name: channel_name.into(),
chat_histories: HashMap::new(), chat_histories: HashMap::new(),
chat_topic_ids: HashMap::new(),
history_topic_ids: HashMap::new(),
compression_in_flight: HashSet::new(), compression_in_flight: HashSet::new(),
conversations, conversations,
skill_events, skill_events,
@ -113,6 +117,34 @@ impl SessionHistory {
self.chat_histories.get(chat_id) self.chat_histories.get(chat_id)
} }
pub(crate) fn set_history(&mut self, chat_id: &str, history: Vec<ChatMessage>) {
self.chat_histories.insert(chat_id.to_string(), history);
// 记录历史对应的话题(当前设置的话题)
if let Some(topic_id) = self.chat_topic_ids.get(chat_id) {
self.history_topic_ids.insert(chat_id.to_string(), topic_id.clone());
}
}
/// 获取指定 chat 的历史所对应的话题
pub(crate) fn history_topic(&self, chat_id: &str) -> Option<&str> {
self.history_topic_ids.get(chat_id).map(|s| s.as_str())
}
/// 设置指定 chat 的当前 topic
pub(crate) fn set_chat_topic(&mut self, chat_id: &str, topic_id: String) {
self.chat_topic_ids.insert(chat_id.to_string(), topic_id);
}
/// 获取指定 chat 的当前 topic
pub(crate) fn chat_topic(&self, chat_id: &str) -> Option<&str> {
self.chat_topic_ids.get(chat_id).map(|s| s.as_str())
}
/// 清除指定 chat 的 topic
pub(crate) fn clear_chat_topic(&mut self, chat_id: &str) {
self.chat_topic_ids.remove(chat_id);
}
pub(crate) fn add_message(&mut self, chat_id: &str, message: ChatMessage) { pub(crate) fn add_message(&mut self, chat_id: &str, message: ChatMessage) {
self.get_or_create_history(chat_id).push(message); self.get_or_create_history(chat_id).push(message);
} }
@ -120,6 +152,7 @@ impl SessionHistory {
pub(crate) fn remove_history(&mut self, chat_id: &str) { pub(crate) fn remove_history(&mut self, chat_id: &str) {
self.chat_histories.remove(chat_id); self.chat_histories.remove(chat_id);
self.compression_in_flight.remove(chat_id); self.compression_in_flight.remove(chat_id);
self.history_topic_ids.remove(chat_id);
} }
pub(crate) fn clear_chat_history(&mut self, chat_id: &str) -> Result<(), AgentError> { pub(crate) fn clear_chat_history(&mut self, chat_id: &str) -> Result<(), AgentError> {

View File

@ -47,6 +47,7 @@ impl SessionMessageSender for BusSessionMessageSender {
.publish_outbound(OutboundMessage::assistant( .publish_outbound(OutboundMessage::assistant(
channel_name.to_string(), channel_name.to_string(),
chat_id.to_string(), chat_id.to_string(),
None, // session_id
text, text,
None, None,
metadata.clone(), metadata.clone(),
@ -68,6 +69,7 @@ impl SessionMessageSender for BusSessionMessageSender {
let mut outbound = OutboundMessage::assistant( let mut outbound = OutboundMessage::assistant(
channel_name.to_string(), channel_name.to_string(),
chat_id.to_string(), chat_id.to_string(),
None, // session_id
String::new(), String::new(),
None, None,
metadata.clone(), metadata.clone(),

View File

@ -44,6 +44,7 @@ impl SessionMessageService {
} }
let session = self.lifecycle.active_session(channel_name).await?; let session = self.lifecycle.active_session(channel_name).await?;
let outbound_messages = AgentExecutionService::new(self.show_tool_results) let outbound_messages = AgentExecutionService::new(self.show_tool_results)
.prepare_and_execute_message(MessageExecutionRequest { .prepare_and_execute_message(MessageExecutionRequest {
session, session,

View File

@ -41,6 +41,7 @@ async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) {
let runtime_session_id = uuid::Uuid::new_v4().to_string(); let runtime_session_id = uuid::Uuid::new_v4().to_string();
let mut current_session_id = initial_record.id.clone(); let mut current_session_id = initial_record.id.clone();
let mut current_topic_id: Option<String> = None;
state state
.channel_manager .channel_manager
.cli_channel() .cli_channel()
@ -85,6 +86,7 @@ async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) {
&sender, &sender,
&runtime_session_id, &runtime_session_id,
&mut current_session_id, &mut current_session_id,
&mut current_topic_id,
inbound, inbound,
) )
.await .await
@ -132,6 +134,7 @@ async fn handle_inbound(
sender: &mpsc::Sender<WsOutbound>, sender: &mpsc::Sender<WsOutbound>,
runtime_session_id: &str, runtime_session_id: &str,
current_session_id: &mut String, current_session_id: &mut String,
current_topic_id: &mut Option<String>,
inbound: WsInbound, inbound: WsInbound,
) -> Result<(), AgentError> { ) -> Result<(), AgentError> {
match inbound { match inbound {
@ -204,7 +207,7 @@ async fn handle_inbound(
}; };
// 创建命令路由器 // 创建命令路由器
let cli_sessions = state.session_manager.cli_sessions(); let _cli_sessions = state.session_manager.cli_sessions();
let store = state.session_manager.store(); let store = state.session_manager.store();
let skills = state.session_manager.skills(); let skills = state.session_manager.skills();
let provider_config = state.config.get_provider_config("default") let provider_config = state.config.get_provider_config("default")
@ -221,16 +224,29 @@ async fn handle_inbound(
])); ]));
let mut router = CommandRouter::new(); let mut router = CommandRouter::new();
router.register(Box::new(SessionCommandHandler::new(cli_sessions.clone()))); // 注册 Session 处理器,添加 SessionManager
router.register(Box::new(SessionQueryCommandHandler::new(cli_sessions))); let session_handler = SessionCommandHandler::new(store.clone())
.with_session_manager(state.session_manager.clone());
router.register(Box::new(session_handler));
// 注册 SessionQuery 处理器
let session_query_handler = SessionQueryCommandHandler::new(store.clone())
.with_session_manager(state.session_manager.clone());
router.register(Box::new(session_query_handler));
router.register(Box::new(SaveSessionCommandHandler::new( router.register(Box::new(SaveSessionCommandHandler::new(
store, store,
system_prompt_provider, system_prompt_provider,
))); )));
// 构建命令上下文 // 构建命令上下文
tracing::debug!(
current_session_id = %current_session_id,
current_topic_id = ?current_topic_id,
"Building CommandContext for WebSocket command"
);
let cmd_ctx = CommandContext::new("websocket", "cli") let cmd_ctx = CommandContext::new("websocket", "cli")
.with_session_id(current_session_id.as_str()); .with_session_id(current_session_id.as_str())
.with_chat_id(current_session_id.as_str())
.with_topic_id(current_topic_id.as_deref().unwrap_or(""));
// 执行命令 // 执行命令
let response = router.dispatch_with_response(cmd, cmd_ctx).await; let response = router.dispatch_with_response(cmd, cmd_ctx).await;
@ -239,6 +255,11 @@ async fn handle_inbound(
if response.success { if response.success {
// 更新当前会话 ID如果是创建会话 // 更新当前会话 ID如果是创建会话
if let Some(session_id) = response.metadata.get("session_id") { if let Some(session_id) = response.metadata.get("session_id") {
tracing::info!(
old_session_id = %current_session_id,
new_session_id = %session_id,
"Updating current_session_id"
);
*current_session_id = session_id.clone(); *current_session_id = session_id.clone();
state state
.channel_manager .channel_manager
@ -250,6 +271,21 @@ async fn handle_inbound(
) )
.await; .await;
} }
// 更新当前话题 ID如果是创建话题或切换话题
if let Some(topic_id) = response.metadata.get("topic_id") {
tracing::info!(
old_topic_id = ?current_topic_id,
new_topic_id = %topic_id,
"Updating current_topic_id"
);
*current_topic_id = Some(topic_id.clone());
}
} else if let Some(ref error) = response.error {
tracing::warn!(
error_code = %error.code,
error_message = %error.message,
"Command failed"
);
} }
// 适配并发送响应 // 适配并发送响应

View File

@ -201,6 +201,7 @@ mod tests {
let message = OutboundMessage::tool_call( let message = OutboundMessage::tool_call(
"cli", "cli",
"session-1", "session-1",
None, // session_id
"call-1", "call-1",
"calculator", "calculator",
json!({"expression": "1 + 1"}), json!({"expression": "1 + 1"}),

View File

@ -409,6 +409,7 @@ impl Scheduler {
.publish_outbound(OutboundMessage::error_notification( .publish_outbound(OutboundMessage::error_notification(
channel, channel,
chat_id, chat_id,
None, // session_id
format!( format!(
"定时任务执行失败:{}\n{}", "定时任务执行失败:{}\n{}",
job.id, job.id,
@ -904,6 +905,7 @@ fn build_outbound_message(job: &RuntimeJob) -> anyhow::Result<OutboundMessage> {
Ok(OutboundMessage::scheduler_notification( Ok(OutboundMessage::scheduler_notification(
channel, channel,
chat_id, chat_id,
None, // session_id
content.to_string(), content.to_string(),
job.target.reply_to.clone(), job.target.reply_to.clone(),
metadata, metadata,

View File

@ -17,7 +17,7 @@ pub use ports::{
}; };
pub use records::{ pub use records::{
MemoryRecord, MemoryUpsert, SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, MemoryRecord, MemoryUpsert, SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus,
SchedulerJobUpsert, SessionRecord, SkillEventRecord, SchedulerJobUpsert, SessionRecord, SkillEventRecord, TopicRecord,
}; };
#[derive(Clone)] #[derive(Clone)]
@ -79,6 +79,7 @@ impl SessionStore {
CREATE TABLE IF NOT EXISTS messages ( CREATE TABLE IF NOT EXISTS messages (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
session_id TEXT NOT NULL, session_id TEXT NOT NULL,
topic_id TEXT,
seq INTEGER NOT NULL, seq INTEGER NOT NULL,
role TEXT NOT NULL, role TEXT NOT NULL,
content TEXT NOT NULL, content TEXT NOT NULL,
@ -90,6 +91,7 @@ impl SessionStore {
tool_calls_json TEXT, tool_calls_json TEXT,
created_at INTEGER NOT NULL, created_at INTEGER NOT NULL,
FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE, FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE,
FOREIGN KEY(topic_id) REFERENCES topics(id) ON DELETE SET NULL,
UNIQUE(session_id, seq) UNIQUE(session_id, seq)
); );
@ -98,6 +100,21 @@ impl SessionStore {
CREATE INDEX IF NOT EXISTS idx_messages_session_created CREATE INDEX IF NOT EXISTS idx_messages_session_created
ON messages(session_id, created_at); ON messages(session_id, created_at);
CREATE TABLE IF NOT EXISTS topics (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
title TEXT NOT NULL,
description TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL,
last_active_at INTEGER NOT NULL,
message_count INTEGER NOT NULL DEFAULT 0,
FOREIGN KEY(session_id) REFERENCES sessions(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_topics_session
ON topics(session_id, last_active_at DESC);
CREATE TABLE IF NOT EXISTS skill_events ( CREATE TABLE IF NOT EXISTS skill_events (
id TEXT PRIMARY KEY, id TEXT PRIMARY KEY,
session_id TEXT, session_id TEXT,
@ -211,6 +228,8 @@ impl SessionStore {
) -> Result<SessionRecord, StorageError> { ) -> Result<SessionRecord, StorageError> {
let now = current_timestamp(); let now = current_timestamp();
let id = uuid::Uuid::new_v4().to_string(); let id = uuid::Uuid::new_v4().to_string();
// 统一使用 persistent_session_id 格式
let session_id = persistent_session_id(channel_name, &id);
let title = title let title = title
.map(str::trim) .map(str::trim)
.filter(|value| !value.is_empty()) .filter(|value| !value.is_empty())
@ -232,11 +251,11 @@ impl SessionStore {
reset_cutoff_seq, user_turn_count, agent_prompt_reinjection_count reset_cutoff_seq, user_turn_count, agent_prompt_reinjection_count
) VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?5, ?5, NULL, NULL, 0, 0, 0, 0) ) VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?5, ?5, NULL, NULL, 0, 0, 0, 0)
", ",
params![id, title, channel_name, id, now], params![&session_id, title, channel_name, id, now],
)?; )?;
drop(conn); drop(conn);
self.get_session(&id)? self.get_session(&session_id)?
.ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into()) .ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into())
} }
@ -354,6 +373,103 @@ impl SessionStore {
Ok(()) Ok(())
} }
// ==================== Topic Methods ====================
pub fn create_topic(
&self,
session_id: &str,
title: &str,
description: Option<&str>,
) -> Result<TopicRecord, StorageError> {
let now = current_timestamp();
let id = format!("topic:{}", uuid::Uuid::new_v4());
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute(
"INSERT INTO topics (id, session_id, title, description, created_at, updated_at, last_active_at, message_count) VALUES (?1, ?2, ?3, ?4, ?5, ?5, ?5, 0)",
params![&id, session_id, title, description.unwrap_or(""), now],
)?;
drop(conn);
self.get_topic(&id)?
.ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into())
}
pub fn get_topic(&self, topic_id: &str) -> Result<Option<TopicRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let mut stmt = conn.prepare(
"SELECT id, session_id, title, description, created_at, updated_at, last_active_at, message_count FROM topics WHERE id = ?1",
)?;
stmt.query_row(params![topic_id], |row| {
Ok(TopicRecord {
id: row.get(0)?,
session_id: row.get(1)?,
title: row.get(2)?,
description: row.get(3)?,
created_at: row.get(4)?,
updated_at: row.get(5)?,
last_active_at: row.get(6)?,
message_count: row.get(7)?,
})
})
.optional()
.map_err(StorageError::from)
}
pub fn list_topics(&self, session_id: &str) -> Result<Vec<TopicRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let mut stmt = conn.prepare(
"SELECT id, session_id, title, description, created_at, updated_at, last_active_at, message_count FROM topics WHERE session_id = ?1 ORDER BY last_active_at DESC"
)?;
let rows = stmt.query_map(params![session_id], |row| {
Ok(TopicRecord {
id: row.get(0)?,
session_id: row.get(1)?,
title: row.get(2)?,
description: row.get(3)?,
created_at: row.get(4)?,
updated_at: row.get(5)?,
last_active_at: row.get(6)?,
message_count: row.get(7)?,
})
})?;
let mut topics = Vec::new();
for row in rows {
topics.push(row?);
}
Ok(topics)
}
pub fn update_topic_title(&self, topic_id: &str, title: &str) -> Result<(), StorageError> {
let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute(
"UPDATE topics SET title = ?2, updated_at = ?3 WHERE id = ?1",
params![topic_id, title.trim(), now],
)?;
Ok(())
}
pub fn delete_topic(&self, topic_id: &str) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
// Messages 的 topic_id 会被设为 NULLON DELETE SET NULL
conn.execute("DELETE FROM topics WHERE id = ?1", params![topic_id])?;
Ok(())
}
pub fn touch_topic(&self, topic_id: &str) -> Result<(), StorageError> {
let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute(
"UPDATE topics SET last_active_at = ?2 WHERE id = ?1",
params![topic_id, now],
)?;
Ok(())
}
pub fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> { pub fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> {
let now = current_timestamp(); let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.conn.lock().expect("session db mutex poisoned");
@ -410,6 +526,15 @@ impl SessionStore {
&self, &self,
session_id: &str, session_id: &str,
message: &ChatMessage, message: &ChatMessage,
) -> Result<(), StorageError> {
self.append_message_with_topic(session_id, None, message)
}
pub fn append_message_with_topic(
&self,
session_id: &str,
topic_id: Option<&str>,
message: &ChatMessage,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.conn.lock().expect("session db mutex poisoned");
let tx = conn.unchecked_transaction()?; let tx = conn.unchecked_transaction()?;
@ -429,13 +554,14 @@ impl SessionStore {
tx.execute( tx.execute(
" "
INSERT INTO messages ( INSERT INTO messages (
id, session_id, seq, role, content, id, session_id, topic_id, seq, role, content,
system_context, reasoning_content, media_refs_json, tool_call_id, tool_name, tool_calls_json, created_at system_context, reasoning_content, media_refs_json, tool_call_id, tool_name, tool_calls_json, created_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12) ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13)
", ",
params![ params![
message.id, message.id,
session_id, session_id,
topic_id,
seq, seq,
message.role, message.role,
message.content, message.content,
@ -1177,6 +1303,62 @@ impl SessionStore {
load_messages_after(&conn, session_id, cutoff_seq) load_messages_after(&conn, session_id, cutoff_seq)
} }
pub fn load_messages_for_topic(&self, topic_id: &str) -> Result<Vec<ChatMessage>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let mut stmt = conn.prepare(
"
SELECT id, role, content, system_context, reasoning_content, media_refs_json, created_at, tool_call_id, tool_name, tool_calls_json
FROM messages
WHERE topic_id = ?1
ORDER BY seq ASC
",
)?;
let rows = stmt.query_map(params![topic_id], |row| {
let media_refs_json: String = row.get(5)?;
let media_refs: Vec<String> = serde_json::from_str(&media_refs_json).map_err(|err| {
rusqlite::Error::FromSqlConversionFailure(
media_refs_json.len(),
rusqlite::types::Type::Text,
Box::new(err),
)
})?;
let tool_calls_json: Option<String> = row.get(9)?;
let tool_calls = tool_calls_json
.as_deref()
.map(serde_json::from_str)
.transpose()
.map_err(|err| {
rusqlite::Error::FromSqlConversionFailure(
9,
rusqlite::types::Type::Text,
Box::new(err),
)
})?;
Ok(ChatMessage {
id: row.get(0)?,
role: row.get(1)?,
content: row.get(2)?,
system_context: row.get(3)?,
reasoning_content: row.get(4)?,
media_refs,
timestamp: row.get(6)?,
tool_call_id: row.get(7)?,
tool_name: row.get(8)?,
tool_state: None,
tool_calls,
})
})?;
let mut messages = Vec::new();
for row in rows {
messages.push(row?);
}
Ok(messages)
}
pub fn load_all_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> { pub fn load_all_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.conn.lock().expect("session db mutex poisoned");
load_messages_after(&conn, session_id, 0) load_messages_after(&conn, session_id, 0)
@ -1349,6 +1531,18 @@ fn ensure_messages_schema(conn: &Connection) -> Result<(), StorageError> {
)?; )?;
} }
if !has_column(conn, "messages", "topic_id")? {
add_column_if_missing(conn, "ALTER TABLE messages ADD COLUMN topic_id TEXT")?;
// 添加外键约束SQLite 不支持 ALTER TABLE ADD FOREIGN KEY需要重建表
// 这里只添加列,外键约束由应用层保证
}
// 创建 topic_id 索引(如果不存在)
conn.execute(
"CREATE INDEX IF NOT EXISTS idx_messages_topic_seq ON messages(topic_id, seq) WHERE topic_id IS NOT NULL",
[],
)?;
Ok(()) Ok(())
} }

View File

@ -28,6 +28,18 @@ pub struct SessionRecord {
pub agent_prompt_reinjection_count: i64, pub agent_prompt_reinjection_count: i64,
} }
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TopicRecord {
pub id: String,
pub session_id: String,
pub title: String,
pub description: Option<String>,
pub created_at: i64,
pub updated_at: i64,
pub last_active_at: i64,
pub message_count: i64,
}
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MemoryRecord { pub struct MemoryRecord {
pub id: String, pub id: String,