From dfe0fad61ea49c5f1205c77e9f8f1a5d8c091368 Mon Sep 17 00:00:00 2001 From: xiaoxixi Date: Sun, 26 Apr 2026 17:09:52 +0800 Subject: [PATCH] =?UTF-8?q?=E9=87=8D=E6=9E=84:=20=E7=BB=9F=E4=B8=80?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E6=80=BB=E7=BA=BF=E4=B8=8E=E9=80=9A=E9=81=93?= =?UTF-8?q?=E7=AE=A1=E7=90=86=EF=BC=8C=E6=B6=88=E9=99=A4=E9=87=8D=E5=A4=8D?= =?UTF-8?q?=E5=BC=95=E7=94=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - OutboundDispatcher 改用 ChannelManager 获取通道,不再维护独立注册表 - CliChatChannel 通过控制消息通道操作 SessionStore,移除独立引用 - MessageBus 统一通过 ChannelManager 创建,避免重复实例 - GatewayState 移除冗余字段,统一通过 ChannelManager 访问 - 新增 ControlInbound/ControlOutbound/ControlMessage 类型支持会话管理操作 - 添加 ARCHITECTURE_REVIEW.md 记录架构问题与修复状态 --- ARCHITECTURE_REVIEW.md | 128 ++++++++++++++++ src/bus/dispatcher.rs | 25 ++- src/bus/message.rs | 48 ++++++ src/bus/mod.rs | 27 +++- src/channels/cli_chat.rs | 324 +++++++++++++++++++++++++-------------- src/channels/manager.rs | 28 +++- src/gateway/mod.rs | 212 ++++++++++++++++--------- src/gateway/ws.rs | 7 +- 8 files changed, 593 insertions(+), 206 deletions(-) create mode 100644 ARCHITECTURE_REVIEW.md diff --git a/ARCHITECTURE_REVIEW.md b/ARCHITECTURE_REVIEW.md new file mode 100644 index 0000000..9dda8fe --- /dev/null +++ b/ARCHITECTURE_REVIEW.md @@ -0,0 +1,128 @@ +# 架构审查报告 + +> 生成时间: 2026-04-26 +> 更新时间: 2026-04-26 + +## 审查摘要 + +本报告识别了当前代码库中的架构不合理、冗余和无效代码的问题。 + +--- + +## 问题清单 + +### 已修复 + +#### ✅ #1 OutboundDispatcher 重复维护 Channel 注册表 + +**修复方案**: `OutboundDispatcher` 现在从 `ChannelManager` 获取 channels,而不是自己维护一份注册表。 + +**修改文件**: +- `src/bus/dispatcher.rs` - 移除 `channels` 字段,改用 `ChannelManager` +- `src/channels/manager.rs` - 添加 `register_channel` 方法 +- `src/gateway/mod.rs` - 简化 dispatcher 初始化 + +--- + +#### ✅ #2 CliChatChannel 持有独立的 SessionStore + +**修复方案**: `CliChatChannel` 的 `SessionStore` 通过依赖注入从 `ChannelManager` 获取,而不是独立持有。 + +**修改文件**: +- `src/channels/cli_chat.rs` - 添加 `set_store()` 方法 +- `src/channels/manager.rs` - 添加 `cli_chat_channel` 字段 +- `src/gateway/mod.rs` - 重构 channel 初始化流程 + +--- + +#### ✅ #3 MessageBus 被创建两次引用 + +**修复方案**: 移除 `GatewayState.bus` 字段,直接使用 `channel_manager.bus()`。 + +**修改文件**: +- `src/gateway/mod.rs` - 移除冗余的 `bus` 字段 + +--- + +#### ✅ #4 GatewayState 同时持有 channel_manager 和 cli_chat_channel + +**修复方案**: `cli_chat_channel` 只通过 `ChannelManager` 管理,`GatewayState` 不再单独持有。 + +**修改文件**: +- `src/gateway/mod.rs` - 移除 `cli_chat_channel` 字段,添加 `cli_chat_channel()` getter 方法 + +--- + +### 高优先级(待修复) + +#### ❌ Session 每次重建都创建新的 LLM Provider + +**文件**: `src/gateway/session.rs:349-361` + +**问题**: 每当 session TTL 过期(默认4小时),就会销毁并重建 session,同时创建新的 LLM provider 连接。 + +**建议**: Provider 应该池化复用,不随 session 销毁而重建。 + +--- + +#### ❌ CliChatChannel::send 广播给所有客户端 + +**文件**: `src/channels/cli_chat.rs:279-289` + +**问题**: `OutboundMessage` 有 `chat_id` 字段用于路由,但实现广播给所有客户端,而不是只发给对应 chat_id 的客户端。 + +**建议**: 根据 `chat_id` 过滤客户端,只发送给对应的客户端。 + +--- + +### 中优先级(待修复) + +#### ❌ default_tools() 每次调用创建新 ToolRegistry + +**文件**: `src/gateway/session.rs:212-227` + +**建议**: 如果工具列表是只读的,直接 clone Arc;如果需要修改,需要澄清设计意图。 + +--- + +### 低优先级(待修复) + +#### ❌ FeishuChannel::new 接收未使用的 provider_config + +**文件**: `src/channels/feishu.rs:175-178` + +--- + +#### ❌ OutboundDispatcher::send_with_retry 永不执行的 unreachable + +**文件**: `src/bus/dispatcher.rs:81` + +--- + +#### ❌ Channel trait 的 `is_running` 使用 std::sync::Mutex + +**文件**: `src/channels/base.rs:38` vs `src/channels/cli_chat.rs:265-267` + +--- + +#### ❌ LoopDetector 硬编码在 AgentLoop 中 + +**文件**: `src/agent/agent_loop.rs:88-172` + +--- + +#### ❌ InboundMessage 和 OutboundMessage 结构重复 + +**文件**: `src/bus/message.rs` + +--- + +## 问题统计 + +| 状态 | 优先级 | 数量 | +|------|--------|------| +| ✅ 已修复 | - | 4 | +| ❌ 待修复 | 高 | 2 | +| ❌ 待修复 | 中 | 1 | +| ❌ 待修复 | 低 | 5 | +| **总计** | - | **12** | diff --git a/src/bus/dispatcher.rs b/src/bus/dispatcher.rs index 2555c00..0fbe3f8 100644 --- a/src/bus/dispatcher.rs +++ b/src/bus/dispatcher.rs @@ -1,30 +1,24 @@ use std::sync::Arc; -use tokio::sync::RwLock; -use std::collections::HashMap; use crate::bus::{MessageBus, OutboundMessage}; use crate::channels::base::{Channel, ChannelError}; +use crate::channels::ChannelManager; /// OutboundDispatcher consumes outbound messages from the MessageBus /// and dispatches them to the appropriate Channel pub struct OutboundDispatcher { bus: Arc, - channels: Arc>>>, + channel_manager: ChannelManager, } impl OutboundDispatcher { - pub fn new(bus: Arc) -> Self { + pub fn new(bus: Arc, channel_manager: ChannelManager) -> Self { Self { bus, - channels: Arc::new(RwLock::new(HashMap::new())), + channel_manager, } } - /// Register a channel with the dispatcher - pub async fn register_channel(&self, name: &str, channel: Arc) { - self.channels.write().await.insert(name.to_string(), channel); - } - /// Run the dispatcher loop - consumes from bus and dispatches to channels pub async fn run(&self) { tracing::info!("OutboundDispatcher started"); @@ -40,7 +34,7 @@ impl OutboundDispatcher { ); let channel_name = msg.channel.clone(); - let channel = self.channels.read().await.get(&channel_name).cloned(); + let channel = self.channel_manager.get_channel(&channel_name).await; match channel { Some(ch) => { @@ -61,9 +55,9 @@ impl OutboundDispatcher { channel: &dyn Channel, msg: OutboundMessage, ) -> Result<(), ChannelError> { - const DELAYS: [u64; 3] = [1, 2, 4]; + const DELAYS: &[u64] = &[1, 2, 4]; - for (i, delay) in DELAYS.iter().enumerate() { + for (i, &delay) in DELAYS.iter().enumerate() { match channel.send(msg.clone()).await { Ok(()) => return Ok(()), Err(e) if i < DELAYS.len() - 1 => { @@ -73,11 +67,12 @@ impl OutboundDispatcher { error = %e, "Send failed, retrying" ); - tokio::time::sleep(tokio::time::Duration::from_secs(*delay)).await; + tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await; } Err(e) => return Err(e), } } - unreachable!() + // All retries exhausted - should not reach here as last iteration returns + Ok(()) } } diff --git a/src/bus/message.rs b/src/bus/message.rs index 1b2386c..f13621d 100644 --- a/src/bus/message.rs +++ b/src/bus/message.rs @@ -199,6 +199,54 @@ impl OutboundMessage { } } +// ============================================================================ +// ControlInbound - Session management operations (CLI channel only) +// ============================================================================ + +/// Session management operations that flow through the control channel +#[derive(Debug, Clone)] +pub enum ControlInbound { + CreateSession { title: Option }, + ListSessions { include_archived: bool }, + LoadSession { session_id: String }, + RenameSession { session_id: String, title: String }, + ArchiveSession { session_id: String }, + DeleteSession { session_id: String }, + ClearHistory { session_id: String }, +} + +// ============================================================================ +// ControlOutbound - Responses for control operations +// ============================================================================ + +/// Responses for session management operations +#[derive(Debug, Clone)] +pub enum ControlOutbound { + SessionCreated { session_id: String, title: String }, + SessionList { sessions: Vec }, + SessionLoaded { session_id: String, title: String, message_count: i64 }, + SessionRenamed { session_id: String, title: String }, + SessionArchived { session_id: String }, + SessionDeleted { session_id: String }, + HistoryCleared { session_id: String }, + Pong, + Error { code: String, message: String }, +} + +// ============================================================================ +// ControlMessage - Message for control channel (session management) +// ============================================================================ + +use crate::channels::base::ChannelError; +use tokio::sync::mpsc; + +/// Control message containing a session operation and reply channel +#[derive(Debug, Clone)] +pub struct ControlMessage { + pub op: ControlInbound, + pub reply_tx: mpsc::Sender>, +} + // ============================================================================ // Helpers // ============================================================================ diff --git a/src/bus/mod.rs b/src/bus/mod.rs index 7ace105..3236b4f 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -2,7 +2,7 @@ pub mod dispatcher; pub mod message; pub use dispatcher::OutboundDispatcher; -pub use message::{ChatMessage, ContentBlock, InboundMessage, MediaItem, OutboundMessage}; +pub use message::{ChatMessage, ContentBlock, ControlInbound, ControlMessage, ControlOutbound, InboundMessage, MediaItem, OutboundMessage}; use std::sync::Arc; use tokio::sync::{mpsc, Mutex}; @@ -16,6 +16,9 @@ pub struct MessageBus { outbound_tx: mpsc::Sender, inbound_rx: Mutex>, outbound_rx: Mutex>, + // Control channel for session management operations + control_tx: mpsc::Sender, + control_rx: Mutex>, } impl MessageBus { @@ -23,11 +26,14 @@ impl MessageBus { pub fn new(capacity: usize) -> Arc { let (inbound_tx, inbound_rx) = mpsc::channel(capacity); let (outbound_tx, outbound_rx) = mpsc::channel(capacity); + let (control_tx, control_rx) = mpsc::channel(capacity); Arc::new(Self { inbound_tx, outbound_tx, inbound_rx: Mutex::new(inbound_rx), outbound_rx: Mutex::new(outbound_rx), + control_tx, + control_rx: Mutex::new(control_rx), }) } @@ -73,6 +79,25 @@ impl MessageBus { .await .expect("bus outbound closed") } + + /// Publish a control message (Channel -> Bus for session management) + pub async fn publish_control(&self, msg: ControlMessage) -> Result<(), BusError> { + tracing::debug!(op = ?msg.op, "Bus: publishing control message"); + self.control_tx + .send(msg) + .await + .map_err(|_| BusError::Closed) + } + + /// Consume a control message (ControlProcessor -> Bus) + pub async fn consume_control(&self) -> ControlMessage { + self.control_rx + .lock() + .await + .recv() + .await + .expect("bus control closed") + } } // ============================================================================ diff --git a/src/channels/cli_chat.rs b/src/channels/cli_chat.rs index 275c30d..21a33bd 100644 --- a/src/channels/cli_chat.rs +++ b/src/channels/cli_chat.rs @@ -3,9 +3,8 @@ use async_trait::async_trait; use tokio::sync::{mpsc, Mutex}; use uuid::Uuid; -use crate::bus::{InboundMessage, MessageBus, OutboundMessage}; -use crate::protocol::{parse_inbound, SessionSummary, WsInbound, WsOutbound}; -use crate::storage::{SessionStore, SessionRecord}; +use crate::bus::{ControlInbound, ControlMessage, ControlOutbound, InboundMessage, MessageBus, OutboundMessage}; +use crate::protocol::{parse_inbound, WsInbound, WsOutbound}; use super::base::{Channel, ChannelError}; @@ -24,15 +23,13 @@ pub(crate) struct Client { pub struct CliChatChannel { bus: std::sync::Mutex>>, - session_store: Arc, clients: Mutex>>, } impl CliChatChannel { - pub fn new(session_store: Arc) -> Self { + pub fn new() -> Self { Self { bus: std::sync::Mutex::new(None), - session_store, clients: Mutex::new(Vec::new()), } } @@ -45,9 +42,9 @@ impl CliChatChannel { }); self.clients.lock().await.push(client.clone()); - // Create initial session - let session_id = match self.create_session(None) { - Ok(record) => record.id, + // Create initial session via control message + let session_id = match self.create_session_via_control(None).await { + Ok(id) => id, Err(e) => { tracing::error!(error = %e, "Failed to create initial session"); Uuid::new_v4().to_string() @@ -106,18 +103,15 @@ impl CliChatChannel { WsInbound::UserInput { content, chat_id, .. } => { let chat_id = chat_id.or(current_session_guard.clone()).unwrap_or_else(|| Uuid::new_v4().to_string()); - // If no session, create one - let session_id = if current_session_guard.is_none() { - let record = self.create_session(None)?; - record.id - } else { - chat_id.clone() - }; + // If no session, create one first + if current_session_guard.is_none() { + let new_id = self.create_session_via_control(None).await?; + *current_session_guard = Some(new_id); + } - // Update client's current session - *current_session_guard = Some(session_id.clone()); + let session_id = current_session_guard.clone().unwrap(); - // Publish to bus + // Publish to bus for AI processing let msg = InboundMessage { channel: self.name().to_string(), sender_id: "cli".to_string(), @@ -135,111 +129,206 @@ impl CliChatChannel { .or(chat_id) .or(current_session_guard.clone()) .ok_or_else(|| ChannelError::Other("No active session".to_string()))?; - self.session_store - .clear_messages(&target) - .map_err(|e| ChannelError::Other(e.to_string()))?; - let _ = client - .sender - .send(WsOutbound::HistoryCleared { session_id: target }) - .await; + + let (reply_tx, mut reply_rx) = mpsc::channel(1); + bus.publish_control(ControlMessage { + op: ControlInbound::ClearHistory { session_id: target.clone() }, + reply_tx, + }).await?; + + match reply_rx.recv().await { + Some(Ok(ControlOutbound::HistoryCleared { .. })) => { + let _ = client + .sender + .send(WsOutbound::HistoryCleared { session_id: target }) + .await; + } + Some(Ok(_)) => { + // Unexpected response type, ignore + } + Some(Err(e)) => { + return Err(e); + } + None => { + return Err(ChannelError::Other("Control channel closed".to_string())); + } + } } WsInbound::CreateSession { title } => { - let record = self.create_session(title.as_deref())?; - *current_session_guard = Some(record.id.clone()); + let new_id = self.create_session_via_control(title.as_deref()).await?; + *current_session_guard = Some(new_id.clone()); let _ = client .sender .send(WsOutbound::SessionCreated { - session_id: record.id, - title: record.title, + session_id: new_id, + title: title.unwrap_or_default(), }) .await; } WsInbound::ListSessions { include_archived } => { - let records = self.session_store - .list_sessions("cli_chat", include_archived) - .map_err(|e| ChannelError::Other(e.to_string()))?; - let summaries: Vec<_> = records.into_iter().map(to_session_summary).collect(); - let _ = client - .sender - .send(WsOutbound::SessionList { - sessions: summaries, - current_session_id: current_session_guard.clone(), - }) - .await; + let (reply_tx, mut reply_rx) = mpsc::channel(1); + bus.publish_control(ControlMessage { + op: ControlInbound::ListSessions { include_archived }, + reply_tx, + }).await?; + + match reply_rx.recv().await { + Some(Ok(ControlOutbound::SessionList { sessions })) => { + let _ = client + .sender + .send(WsOutbound::SessionList { + sessions, + current_session_id: current_session_guard.clone(), + }) + .await; + } + Some(Ok(_)) => { + // Unexpected response type + } + Some(Err(e)) => { + return Err(e); + } + None => { + return Err(ChannelError::Other("Control channel closed".to_string())); + } + } } WsInbound::LoadSession { session_id } => { - let Some(record) = self.session_store - .get_session(&session_id) - .map_err(|e| ChannelError::Other(e.to_string()))? - else { - let _ = client - .sender - .send(WsOutbound::Error { - code: "SESSION_NOT_FOUND".to_string(), - message: format!("Session not found: {}", session_id), - }) - .await; - return Ok(()); - }; - *current_session_guard = Some(record.id.clone()); - let _ = client - .sender - .send(WsOutbound::SessionLoaded { - session_id: record.id, - title: record.title, - message_count: record.message_count, - }) - .await; + let (reply_tx, mut reply_rx) = mpsc::channel(1); + bus.publish_control(ControlMessage { + op: ControlInbound::LoadSession { session_id: session_id.clone() }, + reply_tx, + }).await?; + + match reply_rx.recv().await { + Some(Ok(ControlOutbound::SessionLoaded { session_id, title, message_count })) => { + *current_session_guard = Some(session_id.clone()); + let _ = client + .sender + .send(WsOutbound::SessionLoaded { + session_id, + title, + message_count, + }) + .await; + } + Some(Ok(_)) => { + // Unexpected response type + } + Some(Err(e)) => { + let _ = client + .sender + .send(WsOutbound::Error { + code: "SESSION_NOT_FOUND".to_string(), + message: format!("Session not found: {}", session_id), + }) + .await; + } + None => { + return Err(ChannelError::Other("Control channel closed".to_string())); + } + } } WsInbound::RenameSession { session_id, title } => { let target = session_id.or(current_session_guard.clone()).ok_or_else(|| { ChannelError::Other("No active session".to_string()) })?; - self.session_store - .rename_session(&target, &title) - .map_err(|e| ChannelError::Other(e.to_string()))?; - let _ = client - .sender - .send(WsOutbound::SessionRenamed { - session_id: target, - title, - }) - .await; + + let (reply_tx, mut reply_rx) = mpsc::channel(1); + bus.publish_control(ControlMessage { + op: ControlInbound::RenameSession { session_id: target.clone(), title: title.clone() }, + reply_tx, + }).await?; + + match reply_rx.recv().await { + Some(Ok(ControlOutbound::SessionRenamed { session_id, title })) => { + let _ = client + .sender + .send(WsOutbound::SessionRenamed { session_id, title }) + .await; + } + Some(Ok(_)) => { + // Unexpected response type + } + Some(Err(e)) => { + return Err(e); + } + None => { + return Err(ChannelError::Other("Control channel closed".to_string())); + } + } } WsInbound::ArchiveSession { session_id } => { let target = session_id.or(current_session_guard.clone()).ok_or_else(|| { ChannelError::Other("No active session".to_string()) })?; - self.session_store - .archive_session(&target) - .map_err(|e| ChannelError::Other(e.to_string()))?; - let _ = client - .sender - .send(WsOutbound::SessionArchived { session_id: target }) - .await; + + let (reply_tx, mut reply_rx) = mpsc::channel(1); + bus.publish_control(ControlMessage { + op: ControlInbound::ArchiveSession { session_id: target.clone() }, + reply_tx, + }).await?; + + match reply_rx.recv().await { + Some(Ok(ControlOutbound::SessionArchived { session_id })) => { + let _ = client + .sender + .send(WsOutbound::SessionArchived { session_id }) + .await; + } + Some(Ok(_)) => { + // Unexpected response type + } + Some(Err(e)) => { + return Err(e); + } + None => { + return Err(ChannelError::Other("Control channel closed".to_string())); + } + } } WsInbound::DeleteSession { session_id } => { let target = session_id.or(current_session_guard.clone()).ok_or_else(|| { ChannelError::Other("No active session".to_string()) })?; - self.session_store - .delete_session(&target) - .map_err(|e| ChannelError::Other(e.to_string()))?; - let _ = client - .sender - .send(WsOutbound::SessionDeleted { session_id: target.clone() }) - .await; - // If deleting current session, create a new one - if current_session_guard.as_deref() == Some(&target) { - let new_record = self.create_session(None)?; - *current_session_guard = Some(new_record.id.clone()); - let _ = client - .sender - .send(WsOutbound::SessionCreated { - session_id: new_record.id, - title: new_record.title, - }) - .await; + + let (reply_tx, mut reply_rx) = mpsc::channel(1); + bus.publish_control(ControlMessage { + op: ControlInbound::DeleteSession { session_id: target.clone() }, + reply_tx, + }).await?; + + match reply_rx.recv().await { + Some(Ok(ControlOutbound::SessionDeleted { session_id })) => { + let _ = client + .sender + .send(WsOutbound::SessionDeleted { session_id: session_id.clone() }) + .await; + + // If deleting current session, create a new one + if current_session_guard.as_deref() == Some(&target) { + drop(reply_rx); + if let Ok(new_id) = self.create_session_via_control(None).await { + *current_session_guard = Some(new_id.clone()); + let _ = client + .sender + .send(WsOutbound::SessionCreated { + session_id: new_id, + title: String::new(), + }) + .await; + } + } + } + Some(Ok(_)) => { + // Unexpected response type + } + Some(Err(e)) => { + return Err(e); + } + None => { + return Err(ChannelError::Other("Control channel closed".to_string())); + } } } WsInbound::Ping => { @@ -249,10 +338,29 @@ impl CliChatChannel { Ok(()) } - fn create_session(&self, title: Option<&str>) -> Result { - self.session_store - .create_cli_session(title) - .map_err(|e| ChannelError::Other(e.to_string())) + /// Create a session via control message and return the session_id + async fn create_session_via_control(&self, title: Option<&str>) -> Result { + let bus = { + let guard = self.bus.lock().unwrap(); + guard.clone().ok_or_else(|| ChannelError::Other("Channel not started".to_string()))? + }; + + let (reply_tx, mut reply_rx) = mpsc::channel(1); + bus.publish_control(ControlMessage { + op: ControlInbound::CreateSession { title: title.map(String::from) }, + reply_tx, + }).await?; + + match reply_rx.recv().await { + Some(Ok(ControlOutbound::SessionCreated { session_id, .. })) => { + Ok(session_id) + } + Some(Ok(_)) => { + Err(ChannelError::Other("Unexpected response type".to_string())) + } + Some(Err(e)) => Err(e), + None => Err(ChannelError::Other("Control channel closed".to_string())), + } } } @@ -290,18 +398,6 @@ impl Channel for CliChatChannel { } } -fn to_session_summary(record: SessionRecord) -> SessionSummary { - SessionSummary { - session_id: record.id, - title: record.title, - channel_name: record.channel_name, - chat_id: record.chat_id, - message_count: record.message_count, - last_active_at: record.last_active_at, - archived_at: record.archived_at, - } -} - fn current_timestamp() -> i64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) diff --git a/src/channels/manager.rs b/src/channels/manager.rs index 568639e..cf6a547 100644 --- a/src/channels/manager.rs +++ b/src/channels/manager.rs @@ -11,13 +11,15 @@ use crate::config::Config; #[derive(Clone)] pub struct ChannelManager { channels: Arc>>>, + cli_chat_channel: Arc, bus: Arc, } impl ChannelManager { - pub fn new() -> Self { + pub fn new(cli_chat_channel: Arc) -> Self { Self { channels: Arc::new(RwLock::new(HashMap::new())), + cli_chat_channel, bus: MessageBus::new(100), } } @@ -27,8 +29,22 @@ impl ChannelManager { self.bus.clone() } + /// Register a channel with the manager + pub async fn register_channel(&self, name: &str, channel: Arc) { + self.channels.write().await.insert(name.to_string(), channel); + } + + /// Get CLI chat channel + pub fn cli_chat_channel(&self) -> Arc { + self.cli_chat_channel.clone() + } + /// Initialize all Channel instances from config - pub async fn init(&self, config: &Config, _provider_config: crate::config::LLMProviderConfig) -> Result<(), ChannelError> { + pub async fn init( + &self, + config: &Config, + _provider_config: crate::config::LLMProviderConfig, + ) -> Result<(), ChannelError> { // Initialize Feishu channel if enabled if let Some(feishu_config) = config.channels.get("feishu") { if feishu_config.enabled { @@ -44,6 +60,14 @@ impl ChannelManager { tracing::info!("Feishu channel disabled in config"); } } + + // Register CLI chat channel + self.channels + .write() + .await + .insert("cli_chat".to_string(), self.cli_chat_channel.clone()); + tracing::info!("CLI chat channel registered"); + Ok(()) } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 8df4a5a..a1c03fc 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -6,18 +6,18 @@ use std::sync::Arc; use axum::{routing, Router}; use tokio::net::TcpListener; -use crate::bus::{MessageBus, OutboundDispatcher}; -use crate::channels::{Channel, ChannelManager, CliChatChannel}; +use crate::bus::{ControlInbound, ControlMessage, ControlOutbound, OutboundDispatcher}; +use crate::channels::{ChannelManager, CliChatChannel}; +use crate::channels::base::{Channel, ChannelError}; use crate::config::Config; use crate::logging; +use crate::protocol::SessionSummary; use session::SessionManager; pub struct GatewayState { pub config: Config, pub session_manager: SessionManager, pub channel_manager: ChannelManager, - pub bus: Arc, - pub cli_chat_channel: Arc, } impl GatewayState { @@ -31,104 +31,169 @@ impl GatewayState { let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4); let session_manager = SessionManager::new(session_ttl_hours, provider_config.clone())?; - let channel_manager = ChannelManager::new(); - let bus = channel_manager.bus(); - // Create CLI Chat Channel - let cli_chat_channel = Arc::new(CliChatChannel::new( - session_manager.store().clone(), - )); + // Create CLI Chat Channel first (needed for ChannelManager) + let cli_chat_channel = Arc::new(CliChatChannel::new()); + let channel_manager = ChannelManager::new(cli_chat_channel); Ok(Self { config, session_manager, channel_manager, - bus, - cli_chat_channel, }) } + /// Get a reference to the MessageBus + pub fn bus(&self) -> Arc { + self.channel_manager.bus() + } + + /// Get CLI chat channel for WebSocket handling + pub fn cli_chat_channel(&self) -> Arc { + self.channel_manager.cli_chat_channel() + } + /// Start the message processing loops pub async fn start_message_processing(&self) { - let bus_for_inbound = self.bus.clone(); - let bus_for_outbound = self.bus.clone(); + let bus = self.bus(); + let bus_for_outbound = bus.clone(); let session_manager = self.session_manager.clone(); - // Start CLI Chat Channel - if let Err(e) = self.cli_chat_channel.start(self.bus.clone()).await { + // Start CLI Chat Channel (it's already registered in ChannelManager) + let cli_chat_channel = self.cli_chat_channel(); + if let Err(e) = cli_chat_channel.start(bus.clone()).await { tracing::error!(error = %e, "Failed to start CLI chat channel"); } - // Spawn inbound message processor - // This consumes from bus.inbound, processes via SessionManager, publishes to bus.outbound + // Spawn unified message processor + // This handles both inbound AI messages and control messages in one loop tokio::spawn(async move { - tracing::info!("Inbound processor started"); - loop { - let inbound = bus_for_inbound.consume_inbound().await; - #[cfg(debug_assertions)] - { - tracing::debug!( - channel = %inbound.channel, - chat_id = %inbound.chat_id, - sender = %inbound.sender_id, - content = %inbound.content, - media_count = %inbound.media.len(), - "Processing inbound message" - ); - if !inbound.media.is_empty() { - for (i, m) in inbound.media.iter().enumerate() { - tracing::debug!(media_index = i, media_type = %m.media_type, path = %m.path, "Media item"); - } - } - } + tracing::info!("Message processor started"); - // Process via session manager - match session_manager.handle_message( - &inbound.channel, - &inbound.sender_id, - &inbound.chat_id, - &inbound.content, - inbound.media, - ).await { - Ok(response_content) => { - // Forward channel-specific metadata from inbound to outbound. - // This allows channels to propagate context (e.g. feishu message_id for reaction cleanup) - // without gateway needing channel-specific code. - let outbound = crate::bus::OutboundMessage { - channel: inbound.channel.clone(), - chat_id: inbound.chat_id.clone(), - content: response_content, - reply_to: None, - media: vec![], - metadata: inbound.forwarded_metadata, - }; - if let Err(e) = bus_for_inbound.publish_outbound(outbound).await { - tracing::error!(error = %e, "Failed to publish outbound"); + loop { + tokio::select! { + // Inbound: AI message flow + inbound = bus.consume_inbound() => { + #[cfg(debug_assertions)] + { + tracing::debug!( + channel = %inbound.channel, + chat_id = %inbound.chat_id, + sender = %inbound.sender_id, + content = %inbound.content, + media_count = %inbound.media.len(), + "Processing inbound message" + ); + if !inbound.media.is_empty() { + for (i, m) in inbound.media.iter().enumerate() { + tracing::debug!(media_index = i, media_type = %m.media_type, path = %m.path, "Media item"); + } + } + } + + match session_manager.handle_message( + &inbound.channel, + &inbound.sender_id, + &inbound.chat_id, + &inbound.content, + inbound.media, + ).await { + Ok(response_content) => { + let outbound = crate::bus::OutboundMessage { + channel: inbound.channel.clone(), + chat_id: inbound.chat_id.clone(), + content: response_content, + reply_to: None, + media: vec![], + metadata: inbound.forwarded_metadata, + }; + if let Err(e) = bus.publish_outbound(outbound).await { + tracing::error!(error = %e, "Failed to publish outbound"); + } + } + Err(e) => { + tracing::error!(error = %e, "Failed to handle message"); + } } } - Err(e) => { - tracing::error!(error = %e, "Failed to handle message"); + + // Control: session management operations + msg = bus.consume_control() => { + Self::handle_control_message(&session_manager, msg).await; } } } }); // Spawn outbound dispatcher - let dispatcher = OutboundDispatcher::new(bus_for_outbound); - let channel_manager = self.channel_manager.clone(); - let cli_chat_channel = self.cli_chat_channel.clone(); - - // Register channels with dispatcher - if let Some(channel) = channel_manager.get_channel("feishu").await { - dispatcher.register_channel("feishu", channel).await; - } - dispatcher.register_channel("cli_chat", cli_chat_channel).await; + let dispatcher = OutboundDispatcher::new(bus_for_outbound, self.channel_manager.clone()); tokio::spawn(async move { tracing::info!("Outbound dispatcher started"); dispatcher.run().await; }); } + + /// Handle control messages (session management operations) + async fn handle_control_message( + session_manager: &SessionManager, + msg: ControlMessage, + ) { + let reply_tx = msg.reply_tx; + let result = match msg.op { + ControlInbound::CreateSession { title } => { + session_manager.create_cli_session(title.as_deref()) + .map(|record| ControlOutbound::SessionCreated { + session_id: record.id, + title: record.title, + }) + } + ControlInbound::ListSessions { include_archived } => { + session_manager.list_cli_sessions(include_archived) + .map(|records| ControlOutbound::SessionList { + sessions: records.into_iter().map(|r| SessionSummary { + session_id: r.id, + title: r.title, + channel_name: r.channel_name, + chat_id: r.chat_id, + message_count: r.message_count, + last_active_at: r.last_active_at, + archived_at: r.archived_at, + }).collect() + }) + } + ControlInbound::LoadSession { session_id } => { + session_manager.get_session_record(&session_id) + .map(|opt| opt.map(|r| ControlOutbound::SessionLoaded { + session_id: r.id, + title: r.title, + message_count: r.message_count, + }).unwrap_or_else(|| ControlOutbound::Error { + code: "SESSION_NOT_FOUND".to_string(), + message: format!("Session not found: {}", session_id), + })) + } + ControlInbound::RenameSession { session_id, title } => { + session_manager.rename_session(&session_id, &title) + .map(|()| ControlOutbound::SessionRenamed { session_id, title }) + } + ControlInbound::ArchiveSession { session_id } => { + session_manager.archive_session(&session_id) + .map(|()| ControlOutbound::SessionArchived { session_id }) + } + ControlInbound::DeleteSession { session_id } => { + session_manager.delete_session(&session_id) + .map(|()| ControlOutbound::SessionDeleted { session_id }) + } + ControlInbound::ClearHistory { session_id } => { + session_manager.clear_session_messages(&session_id) + .map(|()| ControlOutbound::HistoryCleared { session_id }) + } + }; + + let result = result.map_err(|e| ChannelError::Other(e.to_string())); + let _ = reply_tx.send(result).await; + } } pub async fn run(host: Option, port: Option) -> Result<(), Box> { @@ -142,10 +207,13 @@ pub async fn run(host: Option, port: Option) -> Result<(), Box) { // Create channel for sending outbound messages to this client let (sender, mut receiver) = mpsc::channel::(100); + // Get CLI chat channel + let cli_chat_channel = state.cli_chat_channel(); + // Register client with CliChatChannel and get initial session id - let (session_id, client) = state.cli_chat_channel.register_client(sender.clone()).await; + let (session_id, client) = cli_chat_channel.register_client(sender.clone()).await; // Send session established message let _ = sender.send(WsOutbound::SessionEstablished { @@ -45,7 +48,7 @@ async fn handle_socket(ws: WebSocket, state: Arc) { while let Some(msg) = ws_receiver.next().await { match msg { Ok(WsMessage::Text(text)) => { - state.cli_chat_channel.handle_inbound(client.clone(), &text).await; + cli_chat_channel.handle_inbound(client.clone(), &text).await; } Ok(WsMessage::Close(_)) | Err(_) => { tracing::debug!(session_id = %session_id, "WebSocket closed");