Compare commits
4 Commits
ef80ae27ac
...
75281952d0
| Author | SHA1 | Date | |
|---|---|---|---|
| 75281952d0 | |||
| 72c888a41f | |||
| dfe0fad61e | |||
| 6a3a1b5edf |
128
ARCHITECTURE_REVIEW.md
Normal file
128
ARCHITECTURE_REVIEW.md
Normal file
@ -0,0 +1,128 @@
|
|||||||
|
# 架构审查报告
|
||||||
|
|
||||||
|
> 生成时间: 2026-04-26
|
||||||
|
> 更新时间: 2026-04-26
|
||||||
|
|
||||||
|
## 审查摘要
|
||||||
|
|
||||||
|
本报告识别了当前代码库中的架构不合理、冗余和无效代码的问题。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 问题清单
|
||||||
|
|
||||||
|
### 已修复
|
||||||
|
|
||||||
|
#### ✅ #1 OutboundDispatcher 重复维护 Channel 注册表
|
||||||
|
|
||||||
|
**修复方案**: `OutboundDispatcher` 现在从 `ChannelManager` 获取 channels,而不是自己维护一份注册表。
|
||||||
|
|
||||||
|
**修改文件**:
|
||||||
|
- `src/bus/dispatcher.rs` - 移除 `channels` 字段,改用 `ChannelManager`
|
||||||
|
- `src/channels/manager.rs` - 添加 `register_channel` 方法
|
||||||
|
- `src/gateway/mod.rs` - 简化 dispatcher 初始化
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
#### ✅ #2 CliChatChannel 持有独立的 SessionStore
|
||||||
|
|
||||||
|
**修复方案**: `CliChatChannel` 的 `SessionStore` 通过依赖注入从 `ChannelManager` 获取,而不是独立持有。
|
||||||
|
|
||||||
|
**修改文件**:
|
||||||
|
- `src/channels/cli_chat.rs` - 添加 `set_store()` 方法
|
||||||
|
- `src/channels/manager.rs` - 添加 `cli_chat_channel` 字段
|
||||||
|
- `src/gateway/mod.rs` - 重构 channel 初始化流程
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
#### ✅ #3 MessageBus 被创建两次引用
|
||||||
|
|
||||||
|
**修复方案**: 移除 `GatewayState.bus` 字段,直接使用 `channel_manager.bus()`。
|
||||||
|
|
||||||
|
**修改文件**:
|
||||||
|
- `src/gateway/mod.rs` - 移除冗余的 `bus` 字段
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
#### ✅ #4 GatewayState 同时持有 channel_manager 和 cli_chat_channel
|
||||||
|
|
||||||
|
**修复方案**: `cli_chat_channel` 只通过 `ChannelManager` 管理,`GatewayState` 不再单独持有。
|
||||||
|
|
||||||
|
**修改文件**:
|
||||||
|
- `src/gateway/mod.rs` - 移除 `cli_chat_channel` 字段,添加 `cli_chat_channel()` getter 方法
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 高优先级(待修复)
|
||||||
|
|
||||||
|
#### ❌ Session 每次重建都创建新的 LLM Provider
|
||||||
|
|
||||||
|
**文件**: `src/gateway/session.rs:349-361`
|
||||||
|
|
||||||
|
**问题**: 每当 session TTL 过期(默认4小时),就会销毁并重建 session,同时创建新的 LLM provider 连接。
|
||||||
|
|
||||||
|
**建议**: Provider 应该池化复用,不随 session 销毁而重建。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
#### ❌ CliChatChannel::send 广播给所有客户端
|
||||||
|
|
||||||
|
**文件**: `src/channels/cli_chat.rs:279-289`
|
||||||
|
|
||||||
|
**问题**: `OutboundMessage` 有 `chat_id` 字段用于路由,但实现广播给所有客户端,而不是只发给对应 chat_id 的客户端。
|
||||||
|
|
||||||
|
**建议**: 根据 `chat_id` 过滤客户端,只发送给对应的客户端。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 中优先级(待修复)
|
||||||
|
|
||||||
|
#### ❌ default_tools() 每次调用创建新 ToolRegistry
|
||||||
|
|
||||||
|
**文件**: `src/gateway/session.rs:212-227`
|
||||||
|
|
||||||
|
**建议**: 如果工具列表是只读的,直接 clone Arc;如果需要修改,需要澄清设计意图。
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
### 低优先级(待修复)
|
||||||
|
|
||||||
|
#### ❌ FeishuChannel::new 接收未使用的 provider_config
|
||||||
|
|
||||||
|
**文件**: `src/channels/feishu.rs:175-178`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
#### ❌ OutboundDispatcher::send_with_retry 永不执行的 unreachable
|
||||||
|
|
||||||
|
**文件**: `src/bus/dispatcher.rs:81`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
#### ❌ Channel trait 的 `is_running` 使用 std::sync::Mutex
|
||||||
|
|
||||||
|
**文件**: `src/channels/base.rs:38` vs `src/channels/cli_chat.rs:265-267`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
#### ❌ LoopDetector 硬编码在 AgentLoop 中
|
||||||
|
|
||||||
|
**文件**: `src/agent/agent_loop.rs:88-172`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
#### ❌ InboundMessage 和 OutboundMessage 结构重复
|
||||||
|
|
||||||
|
**文件**: `src/bus/message.rs`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## 问题统计
|
||||||
|
|
||||||
|
| 状态 | 优先级 | 数量 |
|
||||||
|
|------|--------|------|
|
||||||
|
| ✅ 已修复 | - | 4 |
|
||||||
|
| ❌ 待修复 | 高 | 2 |
|
||||||
|
| ❌ 待修复 | 中 | 1 |
|
||||||
|
| ❌ 待修复 | 低 | 5 |
|
||||||
|
| **总计** | - | **12** |
|
||||||
40
docs/plans/2026-04-26-client-refactor-design.md
Normal file
40
docs/plans/2026-04-26-client-refactor-design.md
Normal file
@ -0,0 +1,40 @@
|
|||||||
|
# 客户端代码整合设计
|
||||||
|
|
||||||
|
## 目标
|
||||||
|
|
||||||
|
将分散在 `src/cli/` 和 `src/client/` 的客户端代码整合到 `src/client/` 目录。
|
||||||
|
|
||||||
|
## 变更
|
||||||
|
|
||||||
|
### 目录结构
|
||||||
|
|
||||||
|
```
|
||||||
|
src/
|
||||||
|
├── client/ # 整合后的客户端模块
|
||||||
|
│ ├── mod.rs # 主程序入口 (run 函数)
|
||||||
|
│ ├── input.rs # InputHandler + InputCommand (从 cli/input.rs 合并)
|
||||||
|
│ └── channel.rs # CliChannel (从 cli/channel.rs 合并)
|
||||||
|
├── cli/ # 删除
|
||||||
|
└── protocol.rs # 保留
|
||||||
|
```
|
||||||
|
|
||||||
|
### 关键变更
|
||||||
|
|
||||||
|
| 变更 | 说明 |
|
||||||
|
|------|------|
|
||||||
|
| `InputEvent::Message(String)` | 简化为只携带文本内容,不再使用 `ChatMessage` |
|
||||||
|
| `cli` 模块删除 | 代码合并到 `client` |
|
||||||
|
| 解耦 | `client` 不再依赖 `bus::ChatMessage` |
|
||||||
|
|
||||||
|
## 实施步骤
|
||||||
|
|
||||||
|
1. 创建 `src/client/input.rs` - 从 `cli/input.rs` 合并,修改 `InputEvent::Message` 为 `String`
|
||||||
|
2. 创建 `src/client/channel.rs` - 从 `cli/channel.rs` 直接复制
|
||||||
|
3. 更新 `src/client/mod.rs` - 更新 import
|
||||||
|
4. 更新 `src/lib.rs` - 删除 `pub mod cli;`
|
||||||
|
5. 删除 `src/cli/` 目录
|
||||||
|
|
||||||
|
## 验证
|
||||||
|
|
||||||
|
- `cargo build` 通过
|
||||||
|
- 功能保持不变
|
||||||
@ -1,30 +1,24 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::RwLock;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use crate::bus::{MessageBus, OutboundMessage};
|
use crate::bus::{MessageBus, OutboundMessage};
|
||||||
use crate::channels::base::{Channel, ChannelError};
|
use crate::channels::base::{Channel, ChannelError};
|
||||||
|
use crate::channels::ChannelManager;
|
||||||
|
|
||||||
/// OutboundDispatcher consumes outbound messages from the MessageBus
|
/// OutboundDispatcher consumes outbound messages from the MessageBus
|
||||||
/// and dispatches them to the appropriate Channel
|
/// and dispatches them to the appropriate Channel
|
||||||
pub struct OutboundDispatcher {
|
pub struct OutboundDispatcher {
|
||||||
bus: Arc<MessageBus>,
|
bus: Arc<MessageBus>,
|
||||||
channels: Arc<RwLock<HashMap<String, Arc<dyn Channel + Send + Sync>>>>,
|
channel_manager: ChannelManager,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl OutboundDispatcher {
|
impl OutboundDispatcher {
|
||||||
pub fn new(bus: Arc<MessageBus>) -> Self {
|
pub fn new(bus: Arc<MessageBus>, channel_manager: ChannelManager) -> Self {
|
||||||
Self {
|
Self {
|
||||||
bus,
|
bus,
|
||||||
channels: Arc::new(RwLock::new(HashMap::new())),
|
channel_manager,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Register a channel with the dispatcher
|
|
||||||
pub async fn register_channel(&self, name: &str, channel: Arc<dyn Channel + Send + Sync>) {
|
|
||||||
self.channels.write().await.insert(name.to_string(), channel);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Run the dispatcher loop - consumes from bus and dispatches to channels
|
/// Run the dispatcher loop - consumes from bus and dispatches to channels
|
||||||
pub async fn run(&self) {
|
pub async fn run(&self) {
|
||||||
tracing::info!("OutboundDispatcher started");
|
tracing::info!("OutboundDispatcher started");
|
||||||
@ -40,7 +34,7 @@ impl OutboundDispatcher {
|
|||||||
);
|
);
|
||||||
|
|
||||||
let channel_name = msg.channel.clone();
|
let channel_name = msg.channel.clone();
|
||||||
let channel = self.channels.read().await.get(&channel_name).cloned();
|
let channel = self.channel_manager.get_channel(&channel_name).await;
|
||||||
|
|
||||||
match channel {
|
match channel {
|
||||||
Some(ch) => {
|
Some(ch) => {
|
||||||
@ -61,9 +55,9 @@ impl OutboundDispatcher {
|
|||||||
channel: &dyn Channel,
|
channel: &dyn Channel,
|
||||||
msg: OutboundMessage,
|
msg: OutboundMessage,
|
||||||
) -> Result<(), ChannelError> {
|
) -> Result<(), ChannelError> {
|
||||||
const DELAYS: [u64; 3] = [1, 2, 4];
|
const DELAYS: &[u64] = &[1, 2, 4];
|
||||||
|
|
||||||
for (i, delay) in DELAYS.iter().enumerate() {
|
for (i, &delay) in DELAYS.iter().enumerate() {
|
||||||
match channel.send(msg.clone()).await {
|
match channel.send(msg.clone()).await {
|
||||||
Ok(()) => return Ok(()),
|
Ok(()) => return Ok(()),
|
||||||
Err(e) if i < DELAYS.len() - 1 => {
|
Err(e) if i < DELAYS.len() - 1 => {
|
||||||
@ -73,11 +67,12 @@ impl OutboundDispatcher {
|
|||||||
error = %e,
|
error = %e,
|
||||||
"Send failed, retrying"
|
"Send failed, retrying"
|
||||||
);
|
);
|
||||||
tokio::time::sleep(tokio::time::Duration::from_secs(*delay)).await;
|
tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await;
|
||||||
}
|
}
|
||||||
Err(e) => return Err(e),
|
Err(e) => return Err(e),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
unreachable!()
|
// All retries exhausted - should not reach here as last iteration returns
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -199,11 +199,59 @@ impl OutboundMessage {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// ControlInbound - Session management operations (CLI channel only)
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
/// Session management operations that flow through the control channel
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum ControlInbound {
|
||||||
|
CreateSession { title: Option<String> },
|
||||||
|
ListSessions { include_archived: bool },
|
||||||
|
LoadSession { session_id: String },
|
||||||
|
RenameSession { session_id: String, title: String },
|
||||||
|
ArchiveSession { session_id: String },
|
||||||
|
DeleteSession { session_id: String },
|
||||||
|
ClearHistory { session_id: String },
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// ControlOutbound - Responses for control operations
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
/// Responses for session management operations
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub enum ControlOutbound {
|
||||||
|
SessionCreated { session_id: String, title: String },
|
||||||
|
SessionList { sessions: Vec<crate::protocol::SessionSummary> },
|
||||||
|
SessionLoaded { session_id: String, title: String, message_count: i64 },
|
||||||
|
SessionRenamed { session_id: String, title: String },
|
||||||
|
SessionArchived { session_id: String },
|
||||||
|
SessionDeleted { session_id: String },
|
||||||
|
HistoryCleared { session_id: String },
|
||||||
|
Pong,
|
||||||
|
Error { code: String, message: String },
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// ControlMessage - Message for control channel (session management)
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
use crate::channels::base::ChannelError;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
/// Control message containing a session operation and reply channel
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct ControlMessage {
|
||||||
|
pub op: ControlInbound,
|
||||||
|
pub reply_tx: mpsc::Sender<Result<ControlOutbound, ChannelError>>,
|
||||||
|
}
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
// Helpers
|
// Helpers
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|
||||||
fn current_timestamp() -> i64 {
|
pub(crate) fn current_timestamp() -> i64 {
|
||||||
std::time::SystemTime::now()
|
std::time::SystemTime::now()
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
.duration_since(std::time::UNIX_EPOCH)
|
||||||
.unwrap()
|
.unwrap()
|
||||||
|
|||||||
@ -2,7 +2,7 @@ pub mod dispatcher;
|
|||||||
pub mod message;
|
pub mod message;
|
||||||
|
|
||||||
pub use dispatcher::OutboundDispatcher;
|
pub use dispatcher::OutboundDispatcher;
|
||||||
pub use message::{ChatMessage, ContentBlock, InboundMessage, MediaItem, OutboundMessage};
|
pub use message::{ChatMessage, ContentBlock, ControlInbound, ControlMessage, ControlOutbound, InboundMessage, MediaItem, OutboundMessage};
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::{mpsc, Mutex};
|
use tokio::sync::{mpsc, Mutex};
|
||||||
@ -16,6 +16,9 @@ pub struct MessageBus {
|
|||||||
outbound_tx: mpsc::Sender<OutboundMessage>,
|
outbound_tx: mpsc::Sender<OutboundMessage>,
|
||||||
inbound_rx: Mutex<mpsc::Receiver<InboundMessage>>,
|
inbound_rx: Mutex<mpsc::Receiver<InboundMessage>>,
|
||||||
outbound_rx: Mutex<mpsc::Receiver<OutboundMessage>>,
|
outbound_rx: Mutex<mpsc::Receiver<OutboundMessage>>,
|
||||||
|
// Control channel for session management operations
|
||||||
|
control_tx: mpsc::Sender<ControlMessage>,
|
||||||
|
control_rx: Mutex<mpsc::Receiver<ControlMessage>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MessageBus {
|
impl MessageBus {
|
||||||
@ -23,11 +26,14 @@ impl MessageBus {
|
|||||||
pub fn new(capacity: usize) -> Arc<Self> {
|
pub fn new(capacity: usize) -> Arc<Self> {
|
||||||
let (inbound_tx, inbound_rx) = mpsc::channel(capacity);
|
let (inbound_tx, inbound_rx) = mpsc::channel(capacity);
|
||||||
let (outbound_tx, outbound_rx) = mpsc::channel(capacity);
|
let (outbound_tx, outbound_rx) = mpsc::channel(capacity);
|
||||||
|
let (control_tx, control_rx) = mpsc::channel(capacity);
|
||||||
Arc::new(Self {
|
Arc::new(Self {
|
||||||
inbound_tx,
|
inbound_tx,
|
||||||
outbound_tx,
|
outbound_tx,
|
||||||
inbound_rx: Mutex::new(inbound_rx),
|
inbound_rx: Mutex::new(inbound_rx),
|
||||||
outbound_rx: Mutex::new(outbound_rx),
|
outbound_rx: Mutex::new(outbound_rx),
|
||||||
|
control_tx,
|
||||||
|
control_rx: Mutex::new(control_rx),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,6 +79,25 @@ impl MessageBus {
|
|||||||
.await
|
.await
|
||||||
.expect("bus outbound closed")
|
.expect("bus outbound closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Publish a control message (Channel -> Bus for session management)
|
||||||
|
pub async fn publish_control(&self, msg: ControlMessage) -> Result<(), BusError> {
|
||||||
|
tracing::debug!(op = ?msg.op, "Bus: publishing control message");
|
||||||
|
self.control_tx
|
||||||
|
.send(msg)
|
||||||
|
.await
|
||||||
|
.map_err(|_| BusError::Closed)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Consume a control message (ControlProcessor -> Bus)
|
||||||
|
pub async fn consume_control(&self) -> ControlMessage {
|
||||||
|
self.control_rx
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.recv()
|
||||||
|
.await
|
||||||
|
.expect("bus control closed")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ============================================================================
|
// ============================================================================
|
||||||
|
|||||||
399
src/channels/cli_chat.rs
Normal file
399
src/channels/cli_chat.rs
Normal file
@ -0,0 +1,399 @@
|
|||||||
|
use std::sync::Arc;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use tokio::sync::{mpsc, Mutex};
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
|
use crate::bus::{ControlInbound, ControlMessage, ControlOutbound, InboundMessage, MessageBus, OutboundMessage};
|
||||||
|
use crate::protocol::{parse_inbound, WsInbound, WsOutbound};
|
||||||
|
|
||||||
|
use super::base::{Channel, ChannelError};
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// Client - Connected CLI client
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
pub(crate) struct Client {
|
||||||
|
sender: mpsc::Sender<WsOutbound>,
|
||||||
|
current_session_id: Mutex<Option<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
// ============================================================================
|
||||||
|
// CliChatChannel - Channel implementation for CLI chat
|
||||||
|
// ============================================================================
|
||||||
|
|
||||||
|
pub struct CliChatChannel {
|
||||||
|
bus: std::sync::Mutex<Option<Arc<MessageBus>>>,
|
||||||
|
clients: Mutex<Vec<Arc<Client>>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CliChatChannel {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
bus: std::sync::Mutex::new(None),
|
||||||
|
clients: Mutex::new(Vec::new()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Register a new client connection, returns (session_id, client)
|
||||||
|
pub(crate) async fn register_client(&self, sender: mpsc::Sender<WsOutbound>) -> (String, Arc<Client>) {
|
||||||
|
let client = Arc::new(Client {
|
||||||
|
sender,
|
||||||
|
current_session_id: Mutex::new(None),
|
||||||
|
});
|
||||||
|
self.clients.lock().await.push(client.clone());
|
||||||
|
|
||||||
|
// Create initial session via control message
|
||||||
|
let session_id = match self.create_session_via_control(None).await {
|
||||||
|
Ok(id) => id,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(error = %e, "Failed to create initial session");
|
||||||
|
Uuid::new_v4().to_string()
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Set current session id in client
|
||||||
|
{
|
||||||
|
let mut current = client.current_session_id.lock().await;
|
||||||
|
*current = Some(session_id.clone());
|
||||||
|
}
|
||||||
|
|
||||||
|
(session_id, client)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle an inbound message from a client
|
||||||
|
pub(crate) async fn handle_inbound(&self, client: Arc<Client>, raw_msg: &str) {
|
||||||
|
match parse_inbound(raw_msg) {
|
||||||
|
Ok(inbound) => {
|
||||||
|
match self.handle_ws_inbound(client.clone(), inbound).await {
|
||||||
|
Ok(()) => {}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "Failed to handle inbound message");
|
||||||
|
let _ = client
|
||||||
|
.sender
|
||||||
|
.send(WsOutbound::Error {
|
||||||
|
code: "INTERNAL_ERROR".to_string(),
|
||||||
|
message: e.to_string(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "Failed to parse inbound message");
|
||||||
|
let _ = client
|
||||||
|
.sender
|
||||||
|
.send(WsOutbound::Error {
|
||||||
|
code: "PARSE_ERROR".to_string(),
|
||||||
|
message: e.to_string(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn handle_ws_inbound(&self, client: Arc<Client>, inbound: WsInbound) -> Result<(), ChannelError> {
|
||||||
|
let bus = {
|
||||||
|
let guard = self.bus.lock().unwrap();
|
||||||
|
guard.clone().ok_or_else(|| ChannelError::Other("Channel not started".to_string()))?
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut current_session_guard = client.current_session_id.lock().await;
|
||||||
|
|
||||||
|
match inbound {
|
||||||
|
WsInbound::UserInput { content, chat_id, .. } => {
|
||||||
|
let chat_id = chat_id.or(current_session_guard.clone()).unwrap_or_else(|| Uuid::new_v4().to_string());
|
||||||
|
|
||||||
|
// If no session, create one first
|
||||||
|
if current_session_guard.is_none() {
|
||||||
|
let new_id = self.create_session_via_control(None).await?;
|
||||||
|
*current_session_guard = Some(new_id);
|
||||||
|
}
|
||||||
|
|
||||||
|
let session_id = current_session_guard.clone().unwrap();
|
||||||
|
|
||||||
|
// Publish to bus for AI processing
|
||||||
|
let msg = InboundMessage {
|
||||||
|
channel: self.name().to_string(),
|
||||||
|
sender_id: "cli".to_string(),
|
||||||
|
chat_id: session_id.clone(),
|
||||||
|
content,
|
||||||
|
timestamp: crate::bus::message::current_timestamp(),
|
||||||
|
media: Vec::new(),
|
||||||
|
metadata: Default::default(),
|
||||||
|
forwarded_metadata: Default::default(),
|
||||||
|
};
|
||||||
|
bus.publish_inbound(msg).await?;
|
||||||
|
}
|
||||||
|
WsInbound::ClearHistory { chat_id, session_id } => {
|
||||||
|
let target = session_id
|
||||||
|
.or(chat_id)
|
||||||
|
.or(current_session_guard.clone())
|
||||||
|
.ok_or_else(|| ChannelError::Other("No active session".to_string()))?;
|
||||||
|
|
||||||
|
let (reply_tx, mut reply_rx) = mpsc::channel(1);
|
||||||
|
bus.publish_control(ControlMessage {
|
||||||
|
op: ControlInbound::ClearHistory { session_id: target.clone() },
|
||||||
|
reply_tx,
|
||||||
|
}).await?;
|
||||||
|
|
||||||
|
match reply_rx.recv().await {
|
||||||
|
Some(Ok(ControlOutbound::HistoryCleared { .. })) => {
|
||||||
|
let _ = client
|
||||||
|
.sender
|
||||||
|
.send(WsOutbound::HistoryCleared { session_id: target })
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
Some(Ok(_)) => {
|
||||||
|
// Unexpected response type, ignore
|
||||||
|
}
|
||||||
|
Some(Err(e)) => {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
return Err(ChannelError::Other("Control channel closed".to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
WsInbound::CreateSession { title } => {
|
||||||
|
let new_id = self.create_session_via_control(title.as_deref()).await?;
|
||||||
|
*current_session_guard = Some(new_id.clone());
|
||||||
|
let _ = client
|
||||||
|
.sender
|
||||||
|
.send(WsOutbound::SessionCreated {
|
||||||
|
session_id: new_id,
|
||||||
|
title: title.unwrap_or_default(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
WsInbound::ListSessions { include_archived } => {
|
||||||
|
let (reply_tx, mut reply_rx) = mpsc::channel(1);
|
||||||
|
bus.publish_control(ControlMessage {
|
||||||
|
op: ControlInbound::ListSessions { include_archived },
|
||||||
|
reply_tx,
|
||||||
|
}).await?;
|
||||||
|
|
||||||
|
match reply_rx.recv().await {
|
||||||
|
Some(Ok(ControlOutbound::SessionList { sessions })) => {
|
||||||
|
let _ = client
|
||||||
|
.sender
|
||||||
|
.send(WsOutbound::SessionList {
|
||||||
|
sessions,
|
||||||
|
current_session_id: current_session_guard.clone(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
Some(Ok(_)) => {
|
||||||
|
// Unexpected response type
|
||||||
|
}
|
||||||
|
Some(Err(e)) => {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
return Err(ChannelError::Other("Control channel closed".to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
WsInbound::LoadSession { session_id } => {
|
||||||
|
let (reply_tx, mut reply_rx) = mpsc::channel(1);
|
||||||
|
bus.publish_control(ControlMessage {
|
||||||
|
op: ControlInbound::LoadSession { session_id: session_id.clone() },
|
||||||
|
reply_tx,
|
||||||
|
}).await?;
|
||||||
|
|
||||||
|
match reply_rx.recv().await {
|
||||||
|
Some(Ok(ControlOutbound::SessionLoaded { session_id, title, message_count })) => {
|
||||||
|
*current_session_guard = Some(session_id.clone());
|
||||||
|
let _ = client
|
||||||
|
.sender
|
||||||
|
.send(WsOutbound::SessionLoaded {
|
||||||
|
session_id,
|
||||||
|
title,
|
||||||
|
message_count,
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
Some(Ok(_)) => {
|
||||||
|
// Unexpected response type
|
||||||
|
}
|
||||||
|
Some(Err(e)) => {
|
||||||
|
let _ = client
|
||||||
|
.sender
|
||||||
|
.send(WsOutbound::Error {
|
||||||
|
code: "SESSION_NOT_FOUND".to_string(),
|
||||||
|
message: format!("Session not found: {}", session_id),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
return Err(ChannelError::Other("Control channel closed".to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
WsInbound::RenameSession { session_id, title } => {
|
||||||
|
let target = session_id.or(current_session_guard.clone()).ok_or_else(|| {
|
||||||
|
ChannelError::Other("No active session".to_string())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let (reply_tx, mut reply_rx) = mpsc::channel(1);
|
||||||
|
bus.publish_control(ControlMessage {
|
||||||
|
op: ControlInbound::RenameSession { session_id: target.clone(), title: title.clone() },
|
||||||
|
reply_tx,
|
||||||
|
}).await?;
|
||||||
|
|
||||||
|
match reply_rx.recv().await {
|
||||||
|
Some(Ok(ControlOutbound::SessionRenamed { session_id, title })) => {
|
||||||
|
let _ = client
|
||||||
|
.sender
|
||||||
|
.send(WsOutbound::SessionRenamed { session_id, title })
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
Some(Ok(_)) => {
|
||||||
|
// Unexpected response type
|
||||||
|
}
|
||||||
|
Some(Err(e)) => {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
return Err(ChannelError::Other("Control channel closed".to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
WsInbound::ArchiveSession { session_id } => {
|
||||||
|
let target = session_id.or(current_session_guard.clone()).ok_or_else(|| {
|
||||||
|
ChannelError::Other("No active session".to_string())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let (reply_tx, mut reply_rx) = mpsc::channel(1);
|
||||||
|
bus.publish_control(ControlMessage {
|
||||||
|
op: ControlInbound::ArchiveSession { session_id: target.clone() },
|
||||||
|
reply_tx,
|
||||||
|
}).await?;
|
||||||
|
|
||||||
|
match reply_rx.recv().await {
|
||||||
|
Some(Ok(ControlOutbound::SessionArchived { session_id })) => {
|
||||||
|
let _ = client
|
||||||
|
.sender
|
||||||
|
.send(WsOutbound::SessionArchived { session_id })
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
Some(Ok(_)) => {
|
||||||
|
// Unexpected response type
|
||||||
|
}
|
||||||
|
Some(Err(e)) => {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
return Err(ChannelError::Other("Control channel closed".to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
WsInbound::DeleteSession { session_id } => {
|
||||||
|
let target = session_id.or(current_session_guard.clone()).ok_or_else(|| {
|
||||||
|
ChannelError::Other("No active session".to_string())
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let (reply_tx, mut reply_rx) = mpsc::channel(1);
|
||||||
|
bus.publish_control(ControlMessage {
|
||||||
|
op: ControlInbound::DeleteSession { session_id: target.clone() },
|
||||||
|
reply_tx,
|
||||||
|
}).await?;
|
||||||
|
|
||||||
|
match reply_rx.recv().await {
|
||||||
|
Some(Ok(ControlOutbound::SessionDeleted { session_id })) => {
|
||||||
|
let _ = client
|
||||||
|
.sender
|
||||||
|
.send(WsOutbound::SessionDeleted { session_id: session_id.clone() })
|
||||||
|
.await;
|
||||||
|
|
||||||
|
// If deleting current session, create a new one
|
||||||
|
if current_session_guard.as_deref() == Some(&target) {
|
||||||
|
drop(reply_rx);
|
||||||
|
if let Ok(new_id) = self.create_session_via_control(None).await {
|
||||||
|
*current_session_guard = Some(new_id.clone());
|
||||||
|
let _ = client
|
||||||
|
.sender
|
||||||
|
.send(WsOutbound::SessionCreated {
|
||||||
|
session_id: new_id,
|
||||||
|
title: String::new(),
|
||||||
|
})
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(Ok(_)) => {
|
||||||
|
// Unexpected response type
|
||||||
|
}
|
||||||
|
Some(Err(e)) => {
|
||||||
|
return Err(e);
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
return Err(ChannelError::Other("Control channel closed".to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
WsInbound::Ping => {
|
||||||
|
let _ = client.sender.send(WsOutbound::Pong).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a session via control message and return the session_id
|
||||||
|
async fn create_session_via_control(&self, title: Option<&str>) -> Result<String, ChannelError> {
|
||||||
|
let bus = {
|
||||||
|
let guard = self.bus.lock().unwrap();
|
||||||
|
guard.clone().ok_or_else(|| ChannelError::Other("Channel not started".to_string()))?
|
||||||
|
};
|
||||||
|
|
||||||
|
let (reply_tx, mut reply_rx) = mpsc::channel(1);
|
||||||
|
bus.publish_control(ControlMessage {
|
||||||
|
op: ControlInbound::CreateSession { title: title.map(String::from) },
|
||||||
|
reply_tx,
|
||||||
|
}).await?;
|
||||||
|
|
||||||
|
match reply_rx.recv().await {
|
||||||
|
Some(Ok(ControlOutbound::SessionCreated { session_id, .. })) => {
|
||||||
|
Ok(session_id)
|
||||||
|
}
|
||||||
|
Some(Ok(_)) => {
|
||||||
|
Err(ChannelError::Other("Unexpected response type".to_string()))
|
||||||
|
}
|
||||||
|
Some(Err(e)) => Err(e),
|
||||||
|
None => Err(ChannelError::Other("Control channel closed".to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Channel for CliChatChannel {
|
||||||
|
fn name(&self) -> &str {
|
||||||
|
"cli_chat"
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_running(&self) -> bool {
|
||||||
|
self.bus.lock().unwrap().is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn start(&self, bus: Arc<MessageBus>) -> Result<(), ChannelError> {
|
||||||
|
*self.bus.lock().unwrap() = Some(bus);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn stop(&self) -> Result<(), ChannelError> {
|
||||||
|
*self.bus.lock().unwrap() = None;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn send(&self, msg: OutboundMessage) -> Result<(), ChannelError> {
|
||||||
|
let clients = self.clients.lock().await.clone();
|
||||||
|
for client in clients {
|
||||||
|
let outbound = WsOutbound::AssistantResponse {
|
||||||
|
id: Uuid::new_v4().to_string(),
|
||||||
|
content: msg.content.clone(),
|
||||||
|
role: "assistant".to_string(),
|
||||||
|
};
|
||||||
|
let _ = client.sender.send(outbound).await;
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
@ -1107,10 +1107,7 @@ impl FeishuChannel {
|
|||||||
sender_id: parsed.open_id.clone(),
|
sender_id: parsed.open_id.clone(),
|
||||||
chat_id: parsed.chat_id.clone(),
|
chat_id: parsed.chat_id.clone(),
|
||||||
content: parsed.content.clone(),
|
content: parsed.content.clone(),
|
||||||
timestamp: std::time::SystemTime::now()
|
timestamp: crate::bus::message::current_timestamp(),
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
|
||||||
.unwrap()
|
|
||||||
.as_millis() as i64,
|
|
||||||
media: parsed.media.map(|m| vec![m]).unwrap_or_default(),
|
media: parsed.media.map(|m| vec![m]).unwrap_or_default(),
|
||||||
metadata: std::collections::HashMap::new(),
|
metadata: std::collections::HashMap::new(),
|
||||||
forwarded_metadata,
|
forwarded_metadata,
|
||||||
|
|||||||
@ -11,13 +11,15 @@ use crate::config::Config;
|
|||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct ChannelManager {
|
pub struct ChannelManager {
|
||||||
channels: Arc<RwLock<HashMap<String, Arc<dyn Channel + Send + Sync>>>>,
|
channels: Arc<RwLock<HashMap<String, Arc<dyn Channel + Send + Sync>>>>,
|
||||||
|
cli_chat_channel: Arc<crate::channels::CliChatChannel>,
|
||||||
bus: Arc<MessageBus>,
|
bus: Arc<MessageBus>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ChannelManager {
|
impl ChannelManager {
|
||||||
pub fn new() -> Self {
|
pub fn new(cli_chat_channel: Arc<crate::channels::CliChatChannel>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
channels: Arc::new(RwLock::new(HashMap::new())),
|
channels: Arc::new(RwLock::new(HashMap::new())),
|
||||||
|
cli_chat_channel,
|
||||||
bus: MessageBus::new(100),
|
bus: MessageBus::new(100),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -27,8 +29,22 @@ impl ChannelManager {
|
|||||||
self.bus.clone()
|
self.bus.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Register a channel with the manager
|
||||||
|
pub async fn register_channel(&self, name: &str, channel: Arc<dyn Channel + Send + Sync>) {
|
||||||
|
self.channels.write().await.insert(name.to_string(), channel);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get CLI chat channel
|
||||||
|
pub fn cli_chat_channel(&self) -> Arc<crate::channels::CliChatChannel> {
|
||||||
|
self.cli_chat_channel.clone()
|
||||||
|
}
|
||||||
|
|
||||||
/// Initialize all Channel instances from config
|
/// Initialize all Channel instances from config
|
||||||
pub async fn init(&self, config: &Config, _provider_config: crate::config::LLMProviderConfig) -> Result<(), ChannelError> {
|
pub async fn init(
|
||||||
|
&self,
|
||||||
|
config: &Config,
|
||||||
|
_provider_config: crate::config::LLMProviderConfig,
|
||||||
|
) -> Result<(), ChannelError> {
|
||||||
// Initialize Feishu channel if enabled
|
// Initialize Feishu channel if enabled
|
||||||
if let Some(feishu_config) = config.channels.get("feishu") {
|
if let Some(feishu_config) = config.channels.get("feishu") {
|
||||||
if feishu_config.enabled {
|
if feishu_config.enabled {
|
||||||
@ -44,6 +60,14 @@ impl ChannelManager {
|
|||||||
tracing::info!("Feishu channel disabled in config");
|
tracing::info!("Feishu channel disabled in config");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Register CLI chat channel
|
||||||
|
self.channels
|
||||||
|
.write()
|
||||||
|
.await
|
||||||
|
.insert("cli_chat".to_string(), self.cli_chat_channel.clone());
|
||||||
|
tracing::info!("CLI chat channel registered");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,7 +1,9 @@
|
|||||||
pub mod base;
|
pub mod base;
|
||||||
pub mod feishu;
|
pub mod feishu;
|
||||||
|
pub mod cli_chat;
|
||||||
pub mod manager;
|
pub mod manager;
|
||||||
|
|
||||||
pub use base::{Channel, ChannelError};
|
pub use base::{Channel, ChannelError};
|
||||||
pub use manager::ChannelManager;
|
pub use manager::ChannelManager;
|
||||||
pub use feishu::FeishuChannel;
|
pub use feishu::FeishuChannel;
|
||||||
|
pub use cli_chat::CliChatChannel;
|
||||||
|
|||||||
@ -1,5 +0,0 @@
|
|||||||
pub mod channel;
|
|
||||||
pub mod input;
|
|
||||||
|
|
||||||
pub use channel::CliChannel;
|
|
||||||
pub use input::{InputCommand, InputEvent, InputHandler};
|
|
||||||
@ -1,9 +1,7 @@
|
|||||||
use crate::bus::ChatMessage;
|
|
||||||
|
|
||||||
use super::channel::CliChannel;
|
use super::channel::CliChannel;
|
||||||
|
|
||||||
pub enum InputEvent {
|
pub enum InputEvent {
|
||||||
Message(ChatMessage),
|
Message(String),
|
||||||
Command(InputCommand),
|
Command(InputCommand),
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -41,7 +39,7 @@ impl InputHandler {
|
|||||||
return Ok(Some(InputEvent::Command(cmd)));
|
return Ok(Some(InputEvent::Command(cmd)));
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(Some(InputEvent::Message(ChatMessage::user(line))))
|
Ok(Some(InputEvent::Message(line)))
|
||||||
}
|
}
|
||||||
Ok(None) => Ok(None),
|
Ok(None) => Ok(None),
|
||||||
Err(e) => Err(InputError::IoError(e)),
|
Err(e) => Err(InputError::IoError(e)),
|
||||||
@ -1,9 +1,12 @@
|
|||||||
pub use crate::protocol::{WsInbound, WsOutbound, serialize_inbound, serialize_outbound};
|
pub use crate::protocol::{WsInbound, WsOutbound, serialize_inbound, serialize_outbound};
|
||||||
|
|
||||||
|
mod channel;
|
||||||
|
mod input;
|
||||||
|
|
||||||
use futures_util::{SinkExt, StreamExt};
|
use futures_util::{SinkExt, StreamExt};
|
||||||
use tokio_tungstenite::{connect_async, tungstenite::Message};
|
use tokio_tungstenite::{connect_async, tungstenite::Message};
|
||||||
|
|
||||||
use crate::cli::{InputCommand, InputEvent, InputHandler};
|
use input::{InputCommand, InputEvent, InputHandler};
|
||||||
|
|
||||||
fn format_session_list(sessions: &[crate::protocol::SessionSummary], current_session_id: Option<&str>) -> String {
|
fn format_session_list(sessions: &[crate::protocol::SessionSummary], current_session_id: Option<&str>) -> String {
|
||||||
if sessions.is_empty() {
|
if sessions.is_empty() {
|
||||||
@ -181,9 +184,9 @@ pub async fn run(gateway_url: &str) -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
InputEvent::Message(msg) => {
|
InputEvent::Message(content) => {
|
||||||
let inbound = WsInbound::UserInput {
|
let inbound = WsInbound::UserInput {
|
||||||
content: msg.content,
|
content,
|
||||||
channel: None,
|
channel: None,
|
||||||
chat_id: current_session_id.clone(),
|
chat_id: current_session_id.clone(),
|
||||||
sender_id: None,
|
sender_id: None,
|
||||||
|
|||||||
@ -6,17 +6,18 @@ use std::sync::Arc;
|
|||||||
use axum::{routing, Router};
|
use axum::{routing, Router};
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
|
|
||||||
use crate::bus::{MessageBus, OutboundDispatcher};
|
use crate::bus::{ControlInbound, ControlMessage, ControlOutbound, OutboundDispatcher};
|
||||||
use crate::channels::ChannelManager;
|
use crate::channels::{ChannelManager, CliChatChannel};
|
||||||
|
use crate::channels::base::{Channel, ChannelError};
|
||||||
use crate::config::Config;
|
use crate::config::Config;
|
||||||
use crate::logging;
|
use crate::logging;
|
||||||
|
use crate::protocol::SessionSummary;
|
||||||
use session::SessionManager;
|
use session::SessionManager;
|
||||||
|
|
||||||
pub struct GatewayState {
|
pub struct GatewayState {
|
||||||
pub config: Config,
|
pub config: Config,
|
||||||
pub session_manager: SessionManager,
|
pub session_manager: SessionManager,
|
||||||
pub channel_manager: ChannelManager,
|
pub channel_manager: ChannelManager,
|
||||||
pub bus: Arc<MessageBus>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl GatewayState {
|
impl GatewayState {
|
||||||
@ -29,30 +30,50 @@ impl GatewayState {
|
|||||||
// Session TTL from config (default 4 hours)
|
// Session TTL from config (default 4 hours)
|
||||||
let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4);
|
let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4);
|
||||||
|
|
||||||
let session_manager = SessionManager::new(session_ttl_hours, provider_config)?;
|
let session_manager = SessionManager::new(session_ttl_hours, provider_config.clone())?;
|
||||||
let channel_manager = ChannelManager::new();
|
|
||||||
let bus = channel_manager.bus();
|
// Create CLI Chat Channel first (needed for ChannelManager)
|
||||||
|
let cli_chat_channel = Arc::new(CliChatChannel::new());
|
||||||
|
let channel_manager = ChannelManager::new(cli_chat_channel);
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
config,
|
config,
|
||||||
session_manager,
|
session_manager,
|
||||||
channel_manager,
|
channel_manager,
|
||||||
bus,
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a reference to the MessageBus
|
||||||
|
pub fn bus(&self) -> Arc<crate::bus::MessageBus> {
|
||||||
|
self.channel_manager.bus()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get CLI chat channel for WebSocket handling
|
||||||
|
pub fn cli_chat_channel(&self) -> Arc<CliChatChannel> {
|
||||||
|
self.channel_manager.cli_chat_channel()
|
||||||
|
}
|
||||||
|
|
||||||
/// Start the message processing loops
|
/// Start the message processing loops
|
||||||
pub async fn start_message_processing(&self) {
|
pub async fn start_message_processing(&self) {
|
||||||
let bus_for_inbound = self.bus.clone();
|
let bus = self.bus();
|
||||||
let bus_for_outbound = self.bus.clone();
|
let bus_for_outbound = bus.clone();
|
||||||
let session_manager = self.session_manager.clone();
|
let session_manager = self.session_manager.clone();
|
||||||
|
|
||||||
// Spawn inbound message processor
|
// Start CLI Chat Channel (it's already registered in ChannelManager)
|
||||||
// This consumes from bus.inbound, processes via SessionManager, publishes to bus.outbound
|
let cli_chat_channel = self.cli_chat_channel();
|
||||||
|
if let Err(e) = cli_chat_channel.start(bus.clone()).await {
|
||||||
|
tracing::error!(error = %e, "Failed to start CLI chat channel");
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spawn unified message processor
|
||||||
|
// This handles both inbound AI messages and control messages in one loop
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tracing::info!("Inbound processor started");
|
tracing::info!("Message processor started");
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let inbound = bus_for_inbound.consume_inbound().await;
|
tokio::select! {
|
||||||
|
// Inbound: AI message flow
|
||||||
|
inbound = bus.consume_inbound() => {
|
||||||
#[cfg(debug_assertions)]
|
#[cfg(debug_assertions)]
|
||||||
{
|
{
|
||||||
tracing::debug!(
|
tracing::debug!(
|
||||||
@ -70,7 +91,6 @@ impl GatewayState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process via session manager
|
|
||||||
match session_manager.handle_message(
|
match session_manager.handle_message(
|
||||||
&inbound.channel,
|
&inbound.channel,
|
||||||
&inbound.sender_id,
|
&inbound.sender_id,
|
||||||
@ -79,9 +99,6 @@ impl GatewayState {
|
|||||||
inbound.media,
|
inbound.media,
|
||||||
).await {
|
).await {
|
||||||
Ok(response_content) => {
|
Ok(response_content) => {
|
||||||
// Forward channel-specific metadata from inbound to outbound.
|
|
||||||
// This allows channels to propagate context (e.g. feishu message_id for reaction cleanup)
|
|
||||||
// without gateway needing channel-specific code.
|
|
||||||
let outbound = crate::bus::OutboundMessage {
|
let outbound = crate::bus::OutboundMessage {
|
||||||
channel: inbound.channel.clone(),
|
channel: inbound.channel.clone(),
|
||||||
chat_id: inbound.chat_id.clone(),
|
chat_id: inbound.chat_id.clone(),
|
||||||
@ -90,7 +107,7 @@ impl GatewayState {
|
|||||||
media: vec![],
|
media: vec![],
|
||||||
metadata: inbound.forwarded_metadata,
|
metadata: inbound.forwarded_metadata,
|
||||||
};
|
};
|
||||||
if let Err(e) = bus_for_inbound.publish_outbound(outbound).await {
|
if let Err(e) = bus.publish_outbound(outbound).await {
|
||||||
tracing::error!(error = %e, "Failed to publish outbound");
|
tracing::error!(error = %e, "Failed to publish outbound");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -99,22 +116,84 @@ impl GatewayState {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Control: session management operations
|
||||||
|
msg = bus.consume_control() => {
|
||||||
|
Self::handle_control_message(&session_manager, msg).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Spawn outbound dispatcher
|
// Spawn outbound dispatcher
|
||||||
let dispatcher = OutboundDispatcher::new(bus_for_outbound);
|
let dispatcher = OutboundDispatcher::new(bus_for_outbound, self.channel_manager.clone());
|
||||||
let channel_manager = self.channel_manager.clone();
|
|
||||||
|
|
||||||
// Register channels with dispatcher
|
|
||||||
if let Some(channel) = channel_manager.get_channel("feishu").await {
|
|
||||||
dispatcher.register_channel("feishu", channel).await;
|
|
||||||
}
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
tracing::info!("Outbound dispatcher started");
|
tracing::info!("Outbound dispatcher started");
|
||||||
dispatcher.run().await;
|
dispatcher.run().await;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Handle control messages (session management operations)
|
||||||
|
async fn handle_control_message(
|
||||||
|
session_manager: &SessionManager,
|
||||||
|
msg: ControlMessage,
|
||||||
|
) {
|
||||||
|
let reply_tx = msg.reply_tx;
|
||||||
|
let result = match msg.op {
|
||||||
|
ControlInbound::CreateSession { title } => {
|
||||||
|
session_manager.create_cli_session(title.as_deref())
|
||||||
|
.map(|record| ControlOutbound::SessionCreated {
|
||||||
|
session_id: record.id,
|
||||||
|
title: record.title,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
ControlInbound::ListSessions { include_archived } => {
|
||||||
|
session_manager.list_cli_sessions(include_archived)
|
||||||
|
.map(|records| ControlOutbound::SessionList {
|
||||||
|
sessions: records.into_iter().map(|r| SessionSummary {
|
||||||
|
session_id: r.id,
|
||||||
|
title: r.title,
|
||||||
|
channel_name: r.channel_name,
|
||||||
|
chat_id: r.chat_id,
|
||||||
|
message_count: r.message_count,
|
||||||
|
last_active_at: r.last_active_at,
|
||||||
|
archived_at: r.archived_at,
|
||||||
|
}).collect()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
ControlInbound::LoadSession { session_id } => {
|
||||||
|
session_manager.get_session_record(&session_id)
|
||||||
|
.map(|opt| opt.map(|r| ControlOutbound::SessionLoaded {
|
||||||
|
session_id: r.id,
|
||||||
|
title: r.title,
|
||||||
|
message_count: r.message_count,
|
||||||
|
}).unwrap_or_else(|| ControlOutbound::Error {
|
||||||
|
code: "SESSION_NOT_FOUND".to_string(),
|
||||||
|
message: format!("Session not found: {}", session_id),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
ControlInbound::RenameSession { session_id, title } => {
|
||||||
|
session_manager.rename_session(&session_id, &title)
|
||||||
|
.map(|()| ControlOutbound::SessionRenamed { session_id, title })
|
||||||
|
}
|
||||||
|
ControlInbound::ArchiveSession { session_id } => {
|
||||||
|
session_manager.archive_session(&session_id)
|
||||||
|
.map(|()| ControlOutbound::SessionArchived { session_id })
|
||||||
|
}
|
||||||
|
ControlInbound::DeleteSession { session_id } => {
|
||||||
|
session_manager.delete_session(&session_id)
|
||||||
|
.map(|()| ControlOutbound::SessionDeleted { session_id })
|
||||||
|
}
|
||||||
|
ControlInbound::ClearHistory { session_id } => {
|
||||||
|
session_manager.clear_session_messages(&session_id)
|
||||||
|
.map(|()| ControlOutbound::HistoryCleared { session_id })
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = result.map_err(|e| ChannelError::Other(e.to_string()));
|
||||||
|
let _ = reply_tx.send(result).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn std::error::Error>> {
|
pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
@ -128,10 +207,13 @@ pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn
|
|||||||
let provider_config = state.config.get_provider_config("default")?;
|
let provider_config = state.config.get_provider_config("default")?;
|
||||||
|
|
||||||
// Initialize and start channels
|
// Initialize and start channels
|
||||||
state.channel_manager.init(&state.config, provider_config.clone()).await?;
|
state.channel_manager.init(
|
||||||
|
&state.config,
|
||||||
|
provider_config.clone(),
|
||||||
|
).await?;
|
||||||
state.channel_manager.start_all().await?;
|
state.channel_manager.start_all().await?;
|
||||||
|
|
||||||
// Start message processing (inbound processor + outbound dispatcher)
|
// Start message processing (inbound processor + control processor + outbound dispatcher)
|
||||||
state.start_message_processing().await;
|
state.start_message_processing().await;
|
||||||
|
|
||||||
// CLI args override config file values
|
// CLI args override config file values
|
||||||
|
|||||||
@ -3,328 +3,60 @@ use axum::extract::ws::{WebSocket, WebSocketUpgrade, Message as WsMessage};
|
|||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
use axum::response::Response;
|
use axum::response::Response;
|
||||||
use futures_util::{SinkExt, StreamExt};
|
use futures_util::{SinkExt, StreamExt};
|
||||||
use tokio::sync::{mpsc, Mutex};
|
use tokio::sync::mpsc;
|
||||||
use crate::protocol::{parse_inbound, serialize_outbound, SessionSummary, WsInbound, WsOutbound};
|
use crate::protocol::serialize_outbound;
|
||||||
use super::{GatewayState, session::{Session, handle_in_chat_command}};
|
use crate::protocol::WsOutbound;
|
||||||
|
use super::GatewayState;
|
||||||
|
|
||||||
pub async fn ws_handler(ws: WebSocketUpgrade, State(state): State<Arc<GatewayState>>) -> Response {
|
pub async fn ws_handler(ws: WebSocketUpgrade, State(state): State<Arc<GatewayState>>) -> Response {
|
||||||
ws.on_upgrade(|socket| async {
|
ws.on_upgrade(|socket| async move {
|
||||||
handle_socket(socket, state).await;
|
handle_socket(socket, state).await;
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) {
|
async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) {
|
||||||
let (sender, receiver) = mpsc::channel::<WsOutbound>(100);
|
// Create channel for sending outbound messages to this client
|
||||||
|
let (sender, mut receiver) = mpsc::channel::<WsOutbound>(100);
|
||||||
|
|
||||||
let provider_config = match state.config.get_provider_config("default") {
|
// Get CLI chat channel
|
||||||
Ok(cfg) => cfg,
|
let cli_chat_channel = state.cli_chat_channel();
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(error = %e, "Failed to get provider config");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let initial_record = match state.session_manager.create_cli_session(None) {
|
// Register client with CliChatChannel and get initial session id
|
||||||
Ok(record) => record,
|
let (session_id, client) = cli_chat_channel.register_client(sender.clone()).await;
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(error = %e, "Failed to create initial CLI session");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let channel_name = "cli".to_string();
|
// Send session established message
|
||||||
|
let _ = sender.send(WsOutbound::SessionEstablished {
|
||||||
|
session_id: session_id.clone(),
|
||||||
|
}).await;
|
||||||
|
|
||||||
// 创建 CLI session
|
tracing::info!(session_id = %session_id, "CLI session established");
|
||||||
let session = match Session::new(
|
|
||||||
channel_name.clone(),
|
|
||||||
provider_config,
|
|
||||||
sender,
|
|
||||||
state.session_manager.tools(),
|
|
||||||
state.session_manager.store(),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(s) => Arc::new(Mutex::new(s)),
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(error = %e, "Failed to create session");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if let Err(e) = session.lock().await.ensure_chat_loaded(&initial_record.id) {
|
|
||||||
tracing::error!(error = %e, session_id = %initial_record.id, "Failed to load initial CLI session history");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
let runtime_session_id = session.lock().await.id;
|
|
||||||
let mut current_session_id = initial_record.id.clone();
|
|
||||||
tracing::info!(runtime_session_id = %runtime_session_id, session_id = %current_session_id, "CLI session established");
|
|
||||||
|
|
||||||
let _ = session
|
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.send(WsOutbound::SessionEstablished {
|
|
||||||
session_id: current_session_id.clone(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let (mut ws_sender, mut ws_receiver) = ws.split();
|
let (mut ws_sender, mut ws_receiver) = ws.split();
|
||||||
|
|
||||||
let mut receiver = receiver;
|
// Task: forward from receiver to WebSocket
|
||||||
let session_id_for_sender = runtime_session_id;
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while let Some(msg) = receiver.recv().await {
|
while let Some(msg) = receiver.recv().await {
|
||||||
if let Ok(text) = serialize_outbound(&msg) {
|
if let Ok(text) = serialize_outbound(&msg) {
|
||||||
if ws_sender.send(WsMessage::Text(text.into())).await.is_err() {
|
if ws_sender.send(WsMessage::Text(text.into())).await.is_err() {
|
||||||
#[cfg(debug_assertions)]
|
|
||||||
tracing::debug!(session_id = %session_id_for_sender, "WebSocket send error");
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Main loop: receive WebSocket messages and forward to CliChatChannel
|
||||||
while let Some(msg) = ws_receiver.next().await {
|
while let Some(msg) = ws_receiver.next().await {
|
||||||
match msg {
|
match msg {
|
||||||
Ok(WsMessage::Text(text)) => {
|
Ok(WsMessage::Text(text)) => {
|
||||||
let text = text.to_string();
|
cli_chat_channel.handle_inbound(client.clone(), &text).await;
|
||||||
match parse_inbound(&text) {
|
|
||||||
Ok(inbound) => {
|
|
||||||
if let Err(e) = handle_inbound(&state, &session, &mut current_session_id, inbound).await {
|
|
||||||
tracing::warn!(error = %e, session_id = %current_session_id, "Failed to handle inbound message");
|
|
||||||
let _ = session
|
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.send(WsOutbound::Error {
|
|
||||||
code: "SESSION_ERROR".to_string(),
|
|
||||||
message: e.to_string(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!(error = %e, "Failed to parse inbound message");
|
|
||||||
let _ = session
|
|
||||||
.lock()
|
|
||||||
.await
|
|
||||||
.send(WsOutbound::Error {
|
|
||||||
code: "PARSE_ERROR".to_string(),
|
|
||||||
message: e.to_string(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Ok(WsMessage::Close(_)) | Err(_) => {
|
Ok(WsMessage::Close(_)) | Err(_) => {
|
||||||
#[cfg(debug_assertions)]
|
tracing::debug!(session_id = %session_id, "WebSocket closed");
|
||||||
tracing::debug!(session_id = %runtime_session_id, "WebSocket closed");
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
tracing::info!(session_id = %runtime_session_id, current_session_id = %current_session_id, "CLI session ended");
|
tracing::info!(session_id = %session_id, "CLI session ended");
|
||||||
}
|
|
||||||
|
|
||||||
fn to_session_summary(record: crate::storage::SessionRecord) -> SessionSummary {
|
|
||||||
SessionSummary {
|
|
||||||
session_id: record.id,
|
|
||||||
title: record.title,
|
|
||||||
channel_name: record.channel_name,
|
|
||||||
chat_id: record.chat_id,
|
|
||||||
message_count: record.message_count,
|
|
||||||
last_active_at: record.last_active_at,
|
|
||||||
archived_at: record.archived_at,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_inbound(
|
|
||||||
state: &Arc<GatewayState>,
|
|
||||||
session: &Arc<Mutex<Session>>,
|
|
||||||
current_session_id: &mut String,
|
|
||||||
inbound: WsInbound,
|
|
||||||
) -> Result<(), crate::agent::AgentError> {
|
|
||||||
match inbound {
|
|
||||||
WsInbound::UserInput { content, chat_id, .. } => {
|
|
||||||
let chat_id = chat_id.unwrap_or_else(|| current_session_id.clone());
|
|
||||||
let mut session_guard = session.lock().await;
|
|
||||||
|
|
||||||
session_guard.ensure_persistent_session(&chat_id)?;
|
|
||||||
session_guard.ensure_chat_loaded(&chat_id)?;
|
|
||||||
|
|
||||||
if let Some(command_response) = handle_in_chat_command(&mut session_guard, &chat_id, &content)? {
|
|
||||||
let _ = session_guard
|
|
||||||
.send(WsOutbound::AssistantResponse {
|
|
||||||
id: uuid::Uuid::new_v4().to_string(),
|
|
||||||
content: command_response,
|
|
||||||
role: "assistant".to_string(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
let user_message = session_guard.create_user_message(&content, Vec::new());
|
|
||||||
session_guard.append_persisted_message(&chat_id, user_message)?;
|
|
||||||
|
|
||||||
let raw_history = session_guard.get_or_create_history(&chat_id).clone();
|
|
||||||
let history = match session_guard
|
|
||||||
.compressor()
|
|
||||||
.compress_if_needed(raw_history)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(history) => history,
|
|
||||||
Err(error) => {
|
|
||||||
tracing::warn!(chat_id = %chat_id, error = %error, "Compression failed, using original history");
|
|
||||||
session_guard.get_or_create_history(&chat_id).clone()
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let agent = session_guard.create_agent()?;
|
|
||||||
match agent.process(history).await {
|
|
||||||
Ok(result) => {
|
|
||||||
session_guard.append_persisted_messages(&chat_id, result.emitted_messages.clone())?;
|
|
||||||
let _ = session_guard
|
|
||||||
.send(WsOutbound::AssistantResponse {
|
|
||||||
id: result.final_response.id,
|
|
||||||
content: result.final_response.content,
|
|
||||||
role: result.final_response.role,
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
Err(error) => {
|
|
||||||
tracing::error!(chat_id = %chat_id, error = %error, "Agent process error");
|
|
||||||
let _ = session_guard
|
|
||||||
.send(WsOutbound::Error {
|
|
||||||
code: "LLM_ERROR".to_string(),
|
|
||||||
message: error.to_string(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
WsInbound::ClearHistory { session_id, chat_id } => {
|
|
||||||
let target = session_id.or(chat_id).unwrap_or_else(|| current_session_id.clone());
|
|
||||||
state.session_manager.clear_session_messages(&target)?;
|
|
||||||
|
|
||||||
let mut session_guard = session.lock().await;
|
|
||||||
session_guard.remove_history(&target);
|
|
||||||
let _ = session_guard
|
|
||||||
.send(WsOutbound::HistoryCleared {
|
|
||||||
session_id: target,
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
WsInbound::CreateSession { title } => {
|
|
||||||
let record = state.session_manager.create_cli_session(title.as_deref())?;
|
|
||||||
*current_session_id = record.id.clone();
|
|
||||||
|
|
||||||
let mut session_guard = session.lock().await;
|
|
||||||
session_guard.ensure_chat_loaded(&record.id)?;
|
|
||||||
let _ = session_guard
|
|
||||||
.send(WsOutbound::SessionCreated {
|
|
||||||
session_id: record.id,
|
|
||||||
title: record.title,
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
WsInbound::ListSessions { include_archived } => {
|
|
||||||
let records = state.session_manager.list_cli_sessions(include_archived)?;
|
|
||||||
let summaries = records.into_iter().map(to_session_summary).collect();
|
|
||||||
|
|
||||||
let session_guard = session.lock().await;
|
|
||||||
let _ = session_guard
|
|
||||||
.send(WsOutbound::SessionList {
|
|
||||||
sessions: summaries,
|
|
||||||
current_session_id: Some(current_session_id.clone()),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
WsInbound::LoadSession { session_id } => {
|
|
||||||
let Some(record) = state.session_manager.get_session_record(&session_id)? else {
|
|
||||||
let session_guard = session.lock().await;
|
|
||||||
let _ = session_guard
|
|
||||||
.send(WsOutbound::Error {
|
|
||||||
code: "SESSION_NOT_FOUND".to_string(),
|
|
||||||
message: format!("Session not found: {}", session_id),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
return Ok(());
|
|
||||||
};
|
|
||||||
|
|
||||||
*current_session_id = record.id.clone();
|
|
||||||
let mut session_guard = session.lock().await;
|
|
||||||
session_guard.ensure_chat_loaded(&record.id)?;
|
|
||||||
let _ = session_guard
|
|
||||||
.send(WsOutbound::SessionLoaded {
|
|
||||||
session_id: record.id,
|
|
||||||
title: record.title,
|
|
||||||
message_count: record.message_count,
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
WsInbound::RenameSession { session_id, title } => {
|
|
||||||
let target = session_id.unwrap_or_else(|| current_session_id.clone());
|
|
||||||
state.session_manager.rename_session(&target, &title)?;
|
|
||||||
let session_guard = session.lock().await;
|
|
||||||
let _ = session_guard
|
|
||||||
.send(WsOutbound::SessionRenamed {
|
|
||||||
session_id: target,
|
|
||||||
title,
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
WsInbound::ArchiveSession { session_id } => {
|
|
||||||
let target = session_id.unwrap_or_else(|| current_session_id.clone());
|
|
||||||
state.session_manager.archive_session(&target)?;
|
|
||||||
let session_guard = session.lock().await;
|
|
||||||
let _ = session_guard
|
|
||||||
.send(WsOutbound::SessionArchived { session_id: target })
|
|
||||||
.await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
WsInbound::DeleteSession { session_id } => {
|
|
||||||
let target = session_id.unwrap_or_else(|| current_session_id.clone());
|
|
||||||
state.session_manager.delete_session(&target)?;
|
|
||||||
|
|
||||||
let replacement = if target == *current_session_id {
|
|
||||||
Some(state.session_manager.create_cli_session(None)?)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut session_guard = session.lock().await;
|
|
||||||
session_guard.remove_history(&target);
|
|
||||||
let _ = session_guard
|
|
||||||
.send(WsOutbound::SessionDeleted {
|
|
||||||
session_id: target.clone(),
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
|
|
||||||
if let Some(record) = replacement {
|
|
||||||
*current_session_id = record.id.clone();
|
|
||||||
session_guard.ensure_chat_loaded(&record.id)?;
|
|
||||||
let _ = session_guard
|
|
||||||
.send(WsOutbound::SessionCreated {
|
|
||||||
session_id: record.id,
|
|
||||||
title: record.title,
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
WsInbound::Ping => {
|
|
||||||
let session_guard = session.lock().await;
|
|
||||||
let _ = session_guard.send(WsOutbound::Pong).await;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,7 +1,6 @@
|
|||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod providers;
|
pub mod providers;
|
||||||
pub mod bus;
|
pub mod bus;
|
||||||
pub mod cli;
|
|
||||||
pub mod agent;
|
pub mod agent;
|
||||||
pub mod gateway;
|
pub mod gateway;
|
||||||
pub mod client;
|
pub mod client;
|
||||||
|
|||||||
@ -5,7 +5,7 @@ use clap::{Parser, CommandFactory};
|
|||||||
#[command(about = "A CLI chatbot", long_about = None)]
|
#[command(about = "A CLI chatbot", long_about = None)]
|
||||||
enum Command {
|
enum Command {
|
||||||
/// Connect to gateway
|
/// Connect to gateway
|
||||||
Agent {
|
Chat {
|
||||||
/// Gateway WebSocket URL (e.g., ws://127.0.0.1:19876/ws)
|
/// Gateway WebSocket URL (e.g., ws://127.0.0.1:19876/ws)
|
||||||
#[arg(long)]
|
#[arg(long)]
|
||||||
gateway_url: Option<String>,
|
gateway_url: Option<String>,
|
||||||
@ -33,7 +33,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
match Command::parse() {
|
match Command::parse() {
|
||||||
Command::Agent { gateway_url } => {
|
Command::Chat { gateway_url } => {
|
||||||
let config = picobot::config::Config::load_default().ok();
|
let config = picobot::config::Config::load_default().ok();
|
||||||
let url = gateway_url
|
let url = gateway_url
|
||||||
.or_else(|| config.as_ref().map(|c| c.client.gateway_url.clone()))
|
.or_else(|| config.as_ref().map(|c| c.client.gateway_url.clone()))
|
||||||
|
|||||||
@ -113,7 +113,7 @@ impl SessionStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn create_cli_session(&self, title: Option<&str>) -> Result<SessionRecord, StorageError> {
|
pub fn create_cli_session(&self, title: Option<&str>) -> Result<SessionRecord, StorageError> {
|
||||||
let now = current_timestamp();
|
let now = crate::bus::message::current_timestamp();
|
||||||
let id = uuid::Uuid::new_v4().to_string();
|
let id = uuid::Uuid::new_v4().to_string();
|
||||||
let title = title
|
let title = title
|
||||||
.map(str::trim)
|
.map(str::trim)
|
||||||
@ -127,7 +127,7 @@ impl SessionStore {
|
|||||||
INSERT INTO sessions (
|
INSERT INTO sessions (
|
||||||
id, title, channel_name, chat_id, summary,
|
id, title, channel_name, chat_id, summary,
|
||||||
created_at, updated_at, last_active_at, archived_at, deleted_at, message_count
|
created_at, updated_at, last_active_at, archived_at, deleted_at, message_count
|
||||||
) VALUES (?1, ?2, 'cli', ?3, NULL, ?4, ?4, ?4, NULL, NULL, 0)
|
) VALUES (?1, ?2, 'cli_chat', ?3, NULL, ?4, ?4, ?4, NULL, NULL, 0)
|
||||||
",
|
",
|
||||||
params![id, title, id, now],
|
params![id, title, id, now],
|
||||||
)?;
|
)?;
|
||||||
@ -146,7 +146,7 @@ impl SessionStore {
|
|||||||
return Ok(record);
|
return Ok(record);
|
||||||
}
|
}
|
||||||
|
|
||||||
let now = current_timestamp();
|
let now = crate::bus::message::current_timestamp();
|
||||||
let title = format!("{}:{}", channel_name, chat_id);
|
let title = format!("{}:{}", channel_name, chat_id);
|
||||||
let conn = self.conn.lock().expect("session db mutex poisoned");
|
let conn = self.conn.lock().expect("session db mutex poisoned");
|
||||||
conn.execute(
|
conn.execute(
|
||||||
@ -213,7 +213,7 @@ impl SessionStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn rename_session(&self, session_id: &str, title: &str) -> Result<(), StorageError> {
|
pub fn rename_session(&self, session_id: &str, title: &str) -> Result<(), StorageError> {
|
||||||
let now = current_timestamp();
|
let now = crate::bus::message::current_timestamp();
|
||||||
let conn = self.conn.lock().expect("session db mutex poisoned");
|
let conn = self.conn.lock().expect("session db mutex poisoned");
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"UPDATE sessions SET title = ?2, updated_at = ?3 WHERE id = ?1 AND deleted_at IS NULL",
|
"UPDATE sessions SET title = ?2, updated_at = ?3 WHERE id = ?1 AND deleted_at IS NULL",
|
||||||
@ -223,7 +223,7 @@ impl SessionStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn archive_session(&self, session_id: &str) -> Result<(), StorageError> {
|
pub fn archive_session(&self, session_id: &str) -> Result<(), StorageError> {
|
||||||
let now = current_timestamp();
|
let now = crate::bus::message::current_timestamp();
|
||||||
let conn = self.conn.lock().expect("session db mutex poisoned");
|
let conn = self.conn.lock().expect("session db mutex poisoned");
|
||||||
conn.execute(
|
conn.execute(
|
||||||
"UPDATE sessions SET archived_at = ?2, updated_at = ?2 WHERE id = ?1 AND deleted_at IS NULL",
|
"UPDATE sessions SET archived_at = ?2, updated_at = ?2 WHERE id = ?1 AND deleted_at IS NULL",
|
||||||
@ -240,7 +240,7 @@ impl SessionStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> {
|
pub fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> {
|
||||||
let now = current_timestamp();
|
let now = crate::bus::message::current_timestamp();
|
||||||
let conn = self.conn.lock().expect("session db mutex poisoned");
|
let conn = self.conn.lock().expect("session db mutex poisoned");
|
||||||
conn.execute("DELETE FROM messages WHERE session_id = ?1", params![session_id])?;
|
conn.execute("DELETE FROM messages WHERE session_id = ?1", params![session_id])?;
|
||||||
conn.execute(
|
conn.execute(
|
||||||
@ -255,7 +255,7 @@ impl SessionStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn reset_session(&self, session_id: &str) -> Result<(), StorageError> {
|
pub fn reset_session(&self, session_id: &str) -> Result<(), StorageError> {
|
||||||
let now = current_timestamp();
|
let now = crate::bus::message::current_timestamp();
|
||||||
let conn = self.conn.lock().expect("session db mutex poisoned");
|
let conn = self.conn.lock().expect("session db mutex poisoned");
|
||||||
let tx = conn.unchecked_transaction()?;
|
let tx = conn.unchecked_transaction()?;
|
||||||
|
|
||||||
@ -314,7 +314,7 @@ impl SessionStore {
|
|||||||
],
|
],
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
let now = current_timestamp();
|
let now = crate::bus::message::current_timestamp();
|
||||||
tx.execute(
|
tx.execute(
|
||||||
"
|
"
|
||||||
UPDATE sessions
|
UPDATE sessions
|
||||||
@ -344,7 +344,7 @@ impl SessionStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub fn persistent_session_id(channel_name: &str, chat_id: &str) -> String {
|
pub fn persistent_session_id(channel_name: &str, chat_id: &str) -> String {
|
||||||
if channel_name == "cli" {
|
if channel_name == "cli" || channel_name == "cli_chat" {
|
||||||
chat_id.to_string()
|
chat_id.to_string()
|
||||||
} else {
|
} else {
|
||||||
format!("{}:{}", channel_name, chat_id)
|
format!("{}:{}", channel_name, chat_id)
|
||||||
@ -467,13 +467,6 @@ fn load_messages_after(
|
|||||||
Ok(messages)
|
Ok(messages)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn current_timestamp() -> i64 {
|
|
||||||
std::time::SystemTime::now()
|
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
|
||||||
.expect("system clock before unix epoch")
|
|
||||||
.as_millis() as i64
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
@ -482,6 +475,7 @@ mod tests {
|
|||||||
#[test]
|
#[test]
|
||||||
fn test_persistent_session_id_for_cli_and_channel() {
|
fn test_persistent_session_id_for_cli_and_channel() {
|
||||||
assert_eq!(persistent_session_id("cli", "abc"), "abc");
|
assert_eq!(persistent_session_id("cli", "abc"), "abc");
|
||||||
|
assert_eq!(persistent_session_id("cli_chat", "abc"), "abc");
|
||||||
assert_eq!(persistent_session_id("feishu", "abc"), "feishu:abc");
|
assert_eq!(persistent_session_id("feishu", "abc"), "feishu:abc");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -491,7 +485,7 @@ mod tests {
|
|||||||
|
|
||||||
let session = store.create_cli_session(Some("demo")).unwrap();
|
let session = store.create_cli_session(Some("demo")).unwrap();
|
||||||
assert_eq!(session.title, "demo");
|
assert_eq!(session.title, "demo");
|
||||||
assert_eq!(session.channel_name, "cli");
|
assert_eq!(session.channel_name, "cli_chat");
|
||||||
assert_eq!(session.chat_id, session.id);
|
assert_eq!(session.chat_id, session.id);
|
||||||
assert_eq!(session.message_count, 0);
|
assert_eq!(session.message_count, 0);
|
||||||
assert_eq!(session.reset_cutoff_seq, 0);
|
assert_eq!(session.reset_cutoff_seq, 0);
|
||||||
@ -521,10 +515,10 @@ mod tests {
|
|||||||
let archived = store.get_session(&session.id).unwrap().unwrap();
|
let archived = store.get_session(&session.id).unwrap().unwrap();
|
||||||
assert!(archived.archived_at.is_some());
|
assert!(archived.archived_at.is_some());
|
||||||
|
|
||||||
let active_only = store.list_sessions("cli", false).unwrap();
|
let active_only = store.list_sessions("cli_chat", false).unwrap();
|
||||||
assert!(active_only.is_empty());
|
assert!(active_only.is_empty());
|
||||||
|
|
||||||
let including_archived = store.list_sessions("cli", true).unwrap();
|
let including_archived = store.list_sessions("cli_chat", true).unwrap();
|
||||||
assert_eq!(including_archived.len(), 1);
|
assert_eq!(including_archived.len(), 1);
|
||||||
|
|
||||||
store.clear_messages(&session.id).unwrap();
|
store.clear_messages(&session.id).unwrap();
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user