重构: 统一消息总线与通道管理,消除重复引用

- OutboundDispatcher 改用 ChannelManager 获取通道,不再维护独立注册表
- CliChatChannel 通过控制消息通道操作 SessionStore,移除独立引用
- MessageBus 统一通过 ChannelManager 创建,避免重复实例
- GatewayState 移除冗余字段,统一通过 ChannelManager 访问
- 新增 ControlInbound/ControlOutbound/ControlMessage 类型支持会话管理操作
- 添加 ARCHITECTURE_REVIEW.md 记录架构问题与修复状态
This commit is contained in:
xiaoxixi 2026-04-26 17:09:52 +08:00
parent 6a3a1b5edf
commit dfe0fad61e
8 changed files with 593 additions and 206 deletions

128
ARCHITECTURE_REVIEW.md Normal file
View File

@ -0,0 +1,128 @@
# 架构审查报告
> 生成时间: 2026-04-26
> 更新时间: 2026-04-26
## 审查摘要
本报告识别了当前代码库中的架构不合理、冗余和无效代码的问题。
---
## 问题清单
### 已修复
#### ✅ #1 OutboundDispatcher 重复维护 Channel 注册表
**修复方案**: `OutboundDispatcher` 现在从 `ChannelManager` 获取 channels而不是自己维护一份注册表。
**修改文件**:
- `src/bus/dispatcher.rs` - 移除 `channels` 字段,改用 `ChannelManager`
- `src/channels/manager.rs` - 添加 `register_channel` 方法
- `src/gateway/mod.rs` - 简化 dispatcher 初始化
---
#### ✅ #2 CliChatChannel 持有独立的 SessionStore
**修复方案**: `CliChatChannel``SessionStore` 通过依赖注入从 `ChannelManager` 获取,而不是独立持有。
**修改文件**:
- `src/channels/cli_chat.rs` - 添加 `set_store()` 方法
- `src/channels/manager.rs` - 添加 `cli_chat_channel` 字段
- `src/gateway/mod.rs` - 重构 channel 初始化流程
---
#### ✅ #3 MessageBus 被创建两次引用
**修复方案**: 移除 `GatewayState.bus` 字段,直接使用 `channel_manager.bus()`
**修改文件**:
- `src/gateway/mod.rs` - 移除冗余的 `bus` 字段
---
#### ✅ #4 GatewayState 同时持有 channel_manager 和 cli_chat_channel
**修复方案**: `cli_chat_channel` 只通过 `ChannelManager` 管理,`GatewayState` 不再单独持有。
**修改文件**:
- `src/gateway/mod.rs` - 移除 `cli_chat_channel` 字段,添加 `cli_chat_channel()` getter 方法
---
### 高优先级(待修复)
#### ❌ Session 每次重建都创建新的 LLM Provider
**文件**: `src/gateway/session.rs:349-361`
**问题**: 每当 session TTL 过期默认4小时就会销毁并重建 session同时创建新的 LLM provider 连接。
**建议**: Provider 应该池化复用,不随 session 销毁而重建。
---
#### ❌ CliChatChannel::send 广播给所有客户端
**文件**: `src/channels/cli_chat.rs:279-289`
**问题**: `OutboundMessage``chat_id` 字段用于路由,但实现广播给所有客户端,而不是只发给对应 chat_id 的客户端。
**建议**: 根据 `chat_id` 过滤客户端,只发送给对应的客户端。
---
### 中优先级(待修复)
#### ❌ default_tools() 每次调用创建新 ToolRegistry
**文件**: `src/gateway/session.rs:212-227`
**建议**: 如果工具列表是只读的,直接 clone Arc如果需要修改需要澄清设计意图。
---
### 低优先级(待修复)
#### ❌ FeishuChannel::new 接收未使用的 provider_config
**文件**: `src/channels/feishu.rs:175-178`
---
#### ❌ OutboundDispatcher::send_with_retry 永不执行的 unreachable
**文件**: `src/bus/dispatcher.rs:81`
---
#### ❌ Channel trait 的 `is_running` 使用 std::sync::Mutex
**文件**: `src/channels/base.rs:38` vs `src/channels/cli_chat.rs:265-267`
---
#### ❌ LoopDetector 硬编码在 AgentLoop 中
**文件**: `src/agent/agent_loop.rs:88-172`
---
#### ❌ InboundMessage 和 OutboundMessage 结构重复
**文件**: `src/bus/message.rs`
---
## 问题统计
| 状态 | 优先级 | 数量 |
|------|--------|------|
| ✅ 已修复 | - | 4 |
| ❌ 待修复 | 高 | 2 |
| ❌ 待修复 | 中 | 1 |
| ❌ 待修复 | 低 | 5 |
| **总计** | - | **12** |

View File

@ -1,30 +1,24 @@
use std::sync::Arc;
use tokio::sync::RwLock;
use std::collections::HashMap;
use crate::bus::{MessageBus, OutboundMessage};
use crate::channels::base::{Channel, ChannelError};
use crate::channels::ChannelManager;
/// OutboundDispatcher consumes outbound messages from the MessageBus
/// and dispatches them to the appropriate Channel
pub struct OutboundDispatcher {
bus: Arc<MessageBus>,
channels: Arc<RwLock<HashMap<String, Arc<dyn Channel + Send + Sync>>>>,
channel_manager: ChannelManager,
}
impl OutboundDispatcher {
pub fn new(bus: Arc<MessageBus>) -> Self {
pub fn new(bus: Arc<MessageBus>, channel_manager: ChannelManager) -> Self {
Self {
bus,
channels: Arc::new(RwLock::new(HashMap::new())),
channel_manager,
}
}
/// Register a channel with the dispatcher
pub async fn register_channel(&self, name: &str, channel: Arc<dyn Channel + Send + Sync>) {
self.channels.write().await.insert(name.to_string(), channel);
}
/// Run the dispatcher loop - consumes from bus and dispatches to channels
pub async fn run(&self) {
tracing::info!("OutboundDispatcher started");
@ -40,7 +34,7 @@ impl OutboundDispatcher {
);
let channel_name = msg.channel.clone();
let channel = self.channels.read().await.get(&channel_name).cloned();
let channel = self.channel_manager.get_channel(&channel_name).await;
match channel {
Some(ch) => {
@ -61,9 +55,9 @@ impl OutboundDispatcher {
channel: &dyn Channel,
msg: OutboundMessage,
) -> Result<(), ChannelError> {
const DELAYS: [u64; 3] = [1, 2, 4];
const DELAYS: &[u64] = &[1, 2, 4];
for (i, delay) in DELAYS.iter().enumerate() {
for (i, &delay) in DELAYS.iter().enumerate() {
match channel.send(msg.clone()).await {
Ok(()) => return Ok(()),
Err(e) if i < DELAYS.len() - 1 => {
@ -73,11 +67,12 @@ impl OutboundDispatcher {
error = %e,
"Send failed, retrying"
);
tokio::time::sleep(tokio::time::Duration::from_secs(*delay)).await;
tokio::time::sleep(tokio::time::Duration::from_secs(delay)).await;
}
Err(e) => return Err(e),
}
}
unreachable!()
// All retries exhausted - should not reach here as last iteration returns
Ok(())
}
}

View File

@ -199,6 +199,54 @@ impl OutboundMessage {
}
}
// ============================================================================
// ControlInbound - Session management operations (CLI channel only)
// ============================================================================
/// Session management operations that flow through the control channel
#[derive(Debug, Clone)]
pub enum ControlInbound {
CreateSession { title: Option<String> },
ListSessions { include_archived: bool },
LoadSession { session_id: String },
RenameSession { session_id: String, title: String },
ArchiveSession { session_id: String },
DeleteSession { session_id: String },
ClearHistory { session_id: String },
}
// ============================================================================
// ControlOutbound - Responses for control operations
// ============================================================================
/// Responses for session management operations
#[derive(Debug, Clone)]
pub enum ControlOutbound {
SessionCreated { session_id: String, title: String },
SessionList { sessions: Vec<crate::protocol::SessionSummary> },
SessionLoaded { session_id: String, title: String, message_count: i64 },
SessionRenamed { session_id: String, title: String },
SessionArchived { session_id: String },
SessionDeleted { session_id: String },
HistoryCleared { session_id: String },
Pong,
Error { code: String, message: String },
}
// ============================================================================
// ControlMessage - Message for control channel (session management)
// ============================================================================
use crate::channels::base::ChannelError;
use tokio::sync::mpsc;
/// Control message containing a session operation and reply channel
#[derive(Debug, Clone)]
pub struct ControlMessage {
pub op: ControlInbound,
pub reply_tx: mpsc::Sender<Result<ControlOutbound, ChannelError>>,
}
// ============================================================================
// Helpers
// ============================================================================

View File

@ -2,7 +2,7 @@ pub mod dispatcher;
pub mod message;
pub use dispatcher::OutboundDispatcher;
pub use message::{ChatMessage, ContentBlock, InboundMessage, MediaItem, OutboundMessage};
pub use message::{ChatMessage, ContentBlock, ControlInbound, ControlMessage, ControlOutbound, InboundMessage, MediaItem, OutboundMessage};
use std::sync::Arc;
use tokio::sync::{mpsc, Mutex};
@ -16,6 +16,9 @@ pub struct MessageBus {
outbound_tx: mpsc::Sender<OutboundMessage>,
inbound_rx: Mutex<mpsc::Receiver<InboundMessage>>,
outbound_rx: Mutex<mpsc::Receiver<OutboundMessage>>,
// Control channel for session management operations
control_tx: mpsc::Sender<ControlMessage>,
control_rx: Mutex<mpsc::Receiver<ControlMessage>>,
}
impl MessageBus {
@ -23,11 +26,14 @@ impl MessageBus {
pub fn new(capacity: usize) -> Arc<Self> {
let (inbound_tx, inbound_rx) = mpsc::channel(capacity);
let (outbound_tx, outbound_rx) = mpsc::channel(capacity);
let (control_tx, control_rx) = mpsc::channel(capacity);
Arc::new(Self {
inbound_tx,
outbound_tx,
inbound_rx: Mutex::new(inbound_rx),
outbound_rx: Mutex::new(outbound_rx),
control_tx,
control_rx: Mutex::new(control_rx),
})
}
@ -73,6 +79,25 @@ impl MessageBus {
.await
.expect("bus outbound closed")
}
/// Publish a control message (Channel -> Bus for session management)
pub async fn publish_control(&self, msg: ControlMessage) -> Result<(), BusError> {
tracing::debug!(op = ?msg.op, "Bus: publishing control message");
self.control_tx
.send(msg)
.await
.map_err(|_| BusError::Closed)
}
/// Consume a control message (ControlProcessor -> Bus)
pub async fn consume_control(&self) -> ControlMessage {
self.control_rx
.lock()
.await
.recv()
.await
.expect("bus control closed")
}
}
// ============================================================================

View File

@ -3,9 +3,8 @@ use async_trait::async_trait;
use tokio::sync::{mpsc, Mutex};
use uuid::Uuid;
use crate::bus::{InboundMessage, MessageBus, OutboundMessage};
use crate::protocol::{parse_inbound, SessionSummary, WsInbound, WsOutbound};
use crate::storage::{SessionStore, SessionRecord};
use crate::bus::{ControlInbound, ControlMessage, ControlOutbound, InboundMessage, MessageBus, OutboundMessage};
use crate::protocol::{parse_inbound, WsInbound, WsOutbound};
use super::base::{Channel, ChannelError};
@ -24,15 +23,13 @@ pub(crate) struct Client {
pub struct CliChatChannel {
bus: std::sync::Mutex<Option<Arc<MessageBus>>>,
session_store: Arc<SessionStore>,
clients: Mutex<Vec<Arc<Client>>>,
}
impl CliChatChannel {
pub fn new(session_store: Arc<SessionStore>) -> Self {
pub fn new() -> Self {
Self {
bus: std::sync::Mutex::new(None),
session_store,
clients: Mutex::new(Vec::new()),
}
}
@ -45,9 +42,9 @@ impl CliChatChannel {
});
self.clients.lock().await.push(client.clone());
// Create initial session
let session_id = match self.create_session(None) {
Ok(record) => record.id,
// Create initial session via control message
let session_id = match self.create_session_via_control(None).await {
Ok(id) => id,
Err(e) => {
tracing::error!(error = %e, "Failed to create initial session");
Uuid::new_v4().to_string()
@ -106,18 +103,15 @@ impl CliChatChannel {
WsInbound::UserInput { content, chat_id, .. } => {
let chat_id = chat_id.or(current_session_guard.clone()).unwrap_or_else(|| Uuid::new_v4().to_string());
// If no session, create one
let session_id = if current_session_guard.is_none() {
let record = self.create_session(None)?;
record.id
} else {
chat_id.clone()
};
// If no session, create one first
if current_session_guard.is_none() {
let new_id = self.create_session_via_control(None).await?;
*current_session_guard = Some(new_id);
}
// Update client's current session
*current_session_guard = Some(session_id.clone());
let session_id = current_session_guard.clone().unwrap();
// Publish to bus
// Publish to bus for AI processing
let msg = InboundMessage {
channel: self.name().to_string(),
sender_id: "cli".to_string(),
@ -135,111 +129,206 @@ impl CliChatChannel {
.or(chat_id)
.or(current_session_guard.clone())
.ok_or_else(|| ChannelError::Other("No active session".to_string()))?;
self.session_store
.clear_messages(&target)
.map_err(|e| ChannelError::Other(e.to_string()))?;
let _ = client
.sender
.send(WsOutbound::HistoryCleared { session_id: target })
.await;
let (reply_tx, mut reply_rx) = mpsc::channel(1);
bus.publish_control(ControlMessage {
op: ControlInbound::ClearHistory { session_id: target.clone() },
reply_tx,
}).await?;
match reply_rx.recv().await {
Some(Ok(ControlOutbound::HistoryCleared { .. })) => {
let _ = client
.sender
.send(WsOutbound::HistoryCleared { session_id: target })
.await;
}
Some(Ok(_)) => {
// Unexpected response type, ignore
}
Some(Err(e)) => {
return Err(e);
}
None => {
return Err(ChannelError::Other("Control channel closed".to_string()));
}
}
}
WsInbound::CreateSession { title } => {
let record = self.create_session(title.as_deref())?;
*current_session_guard = Some(record.id.clone());
let new_id = self.create_session_via_control(title.as_deref()).await?;
*current_session_guard = Some(new_id.clone());
let _ = client
.sender
.send(WsOutbound::SessionCreated {
session_id: record.id,
title: record.title,
session_id: new_id,
title: title.unwrap_or_default(),
})
.await;
}
WsInbound::ListSessions { include_archived } => {
let records = self.session_store
.list_sessions("cli_chat", include_archived)
.map_err(|e| ChannelError::Other(e.to_string()))?;
let summaries: Vec<_> = records.into_iter().map(to_session_summary).collect();
let _ = client
.sender
.send(WsOutbound::SessionList {
sessions: summaries,
current_session_id: current_session_guard.clone(),
})
.await;
let (reply_tx, mut reply_rx) = mpsc::channel(1);
bus.publish_control(ControlMessage {
op: ControlInbound::ListSessions { include_archived },
reply_tx,
}).await?;
match reply_rx.recv().await {
Some(Ok(ControlOutbound::SessionList { sessions })) => {
let _ = client
.sender
.send(WsOutbound::SessionList {
sessions,
current_session_id: current_session_guard.clone(),
})
.await;
}
Some(Ok(_)) => {
// Unexpected response type
}
Some(Err(e)) => {
return Err(e);
}
None => {
return Err(ChannelError::Other("Control channel closed".to_string()));
}
}
}
WsInbound::LoadSession { session_id } => {
let Some(record) = self.session_store
.get_session(&session_id)
.map_err(|e| ChannelError::Other(e.to_string()))?
else {
let _ = client
.sender
.send(WsOutbound::Error {
code: "SESSION_NOT_FOUND".to_string(),
message: format!("Session not found: {}", session_id),
})
.await;
return Ok(());
};
*current_session_guard = Some(record.id.clone());
let _ = client
.sender
.send(WsOutbound::SessionLoaded {
session_id: record.id,
title: record.title,
message_count: record.message_count,
})
.await;
let (reply_tx, mut reply_rx) = mpsc::channel(1);
bus.publish_control(ControlMessage {
op: ControlInbound::LoadSession { session_id: session_id.clone() },
reply_tx,
}).await?;
match reply_rx.recv().await {
Some(Ok(ControlOutbound::SessionLoaded { session_id, title, message_count })) => {
*current_session_guard = Some(session_id.clone());
let _ = client
.sender
.send(WsOutbound::SessionLoaded {
session_id,
title,
message_count,
})
.await;
}
Some(Ok(_)) => {
// Unexpected response type
}
Some(Err(e)) => {
let _ = client
.sender
.send(WsOutbound::Error {
code: "SESSION_NOT_FOUND".to_string(),
message: format!("Session not found: {}", session_id),
})
.await;
}
None => {
return Err(ChannelError::Other("Control channel closed".to_string()));
}
}
}
WsInbound::RenameSession { session_id, title } => {
let target = session_id.or(current_session_guard.clone()).ok_or_else(|| {
ChannelError::Other("No active session".to_string())
})?;
self.session_store
.rename_session(&target, &title)
.map_err(|e| ChannelError::Other(e.to_string()))?;
let _ = client
.sender
.send(WsOutbound::SessionRenamed {
session_id: target,
title,
})
.await;
let (reply_tx, mut reply_rx) = mpsc::channel(1);
bus.publish_control(ControlMessage {
op: ControlInbound::RenameSession { session_id: target.clone(), title: title.clone() },
reply_tx,
}).await?;
match reply_rx.recv().await {
Some(Ok(ControlOutbound::SessionRenamed { session_id, title })) => {
let _ = client
.sender
.send(WsOutbound::SessionRenamed { session_id, title })
.await;
}
Some(Ok(_)) => {
// Unexpected response type
}
Some(Err(e)) => {
return Err(e);
}
None => {
return Err(ChannelError::Other("Control channel closed".to_string()));
}
}
}
WsInbound::ArchiveSession { session_id } => {
let target = session_id.or(current_session_guard.clone()).ok_or_else(|| {
ChannelError::Other("No active session".to_string())
})?;
self.session_store
.archive_session(&target)
.map_err(|e| ChannelError::Other(e.to_string()))?;
let _ = client
.sender
.send(WsOutbound::SessionArchived { session_id: target })
.await;
let (reply_tx, mut reply_rx) = mpsc::channel(1);
bus.publish_control(ControlMessage {
op: ControlInbound::ArchiveSession { session_id: target.clone() },
reply_tx,
}).await?;
match reply_rx.recv().await {
Some(Ok(ControlOutbound::SessionArchived { session_id })) => {
let _ = client
.sender
.send(WsOutbound::SessionArchived { session_id })
.await;
}
Some(Ok(_)) => {
// Unexpected response type
}
Some(Err(e)) => {
return Err(e);
}
None => {
return Err(ChannelError::Other("Control channel closed".to_string()));
}
}
}
WsInbound::DeleteSession { session_id } => {
let target = session_id.or(current_session_guard.clone()).ok_or_else(|| {
ChannelError::Other("No active session".to_string())
})?;
self.session_store
.delete_session(&target)
.map_err(|e| ChannelError::Other(e.to_string()))?;
let _ = client
.sender
.send(WsOutbound::SessionDeleted { session_id: target.clone() })
.await;
// If deleting current session, create a new one
if current_session_guard.as_deref() == Some(&target) {
let new_record = self.create_session(None)?;
*current_session_guard = Some(new_record.id.clone());
let _ = client
.sender
.send(WsOutbound::SessionCreated {
session_id: new_record.id,
title: new_record.title,
})
.await;
let (reply_tx, mut reply_rx) = mpsc::channel(1);
bus.publish_control(ControlMessage {
op: ControlInbound::DeleteSession { session_id: target.clone() },
reply_tx,
}).await?;
match reply_rx.recv().await {
Some(Ok(ControlOutbound::SessionDeleted { session_id })) => {
let _ = client
.sender
.send(WsOutbound::SessionDeleted { session_id: session_id.clone() })
.await;
// If deleting current session, create a new one
if current_session_guard.as_deref() == Some(&target) {
drop(reply_rx);
if let Ok(new_id) = self.create_session_via_control(None).await {
*current_session_guard = Some(new_id.clone());
let _ = client
.sender
.send(WsOutbound::SessionCreated {
session_id: new_id,
title: String::new(),
})
.await;
}
}
}
Some(Ok(_)) => {
// Unexpected response type
}
Some(Err(e)) => {
return Err(e);
}
None => {
return Err(ChannelError::Other("Control channel closed".to_string()));
}
}
}
WsInbound::Ping => {
@ -249,10 +338,29 @@ impl CliChatChannel {
Ok(())
}
fn create_session(&self, title: Option<&str>) -> Result<SessionRecord, ChannelError> {
self.session_store
.create_cli_session(title)
.map_err(|e| ChannelError::Other(e.to_string()))
/// Create a session via control message and return the session_id
async fn create_session_via_control(&self, title: Option<&str>) -> Result<String, ChannelError> {
let bus = {
let guard = self.bus.lock().unwrap();
guard.clone().ok_or_else(|| ChannelError::Other("Channel not started".to_string()))?
};
let (reply_tx, mut reply_rx) = mpsc::channel(1);
bus.publish_control(ControlMessage {
op: ControlInbound::CreateSession { title: title.map(String::from) },
reply_tx,
}).await?;
match reply_rx.recv().await {
Some(Ok(ControlOutbound::SessionCreated { session_id, .. })) => {
Ok(session_id)
}
Some(Ok(_)) => {
Err(ChannelError::Other("Unexpected response type".to_string()))
}
Some(Err(e)) => Err(e),
None => Err(ChannelError::Other("Control channel closed".to_string())),
}
}
}
@ -290,18 +398,6 @@ impl Channel for CliChatChannel {
}
}
fn to_session_summary(record: SessionRecord) -> SessionSummary {
SessionSummary {
session_id: record.id,
title: record.title,
channel_name: record.channel_name,
chat_id: record.chat_id,
message_count: record.message_count,
last_active_at: record.last_active_at,
archived_at: record.archived_at,
}
}
fn current_timestamp() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)

View File

@ -11,13 +11,15 @@ use crate::config::Config;
#[derive(Clone)]
pub struct ChannelManager {
channels: Arc<RwLock<HashMap<String, Arc<dyn Channel + Send + Sync>>>>,
cli_chat_channel: Arc<crate::channels::CliChatChannel>,
bus: Arc<MessageBus>,
}
impl ChannelManager {
pub fn new() -> Self {
pub fn new(cli_chat_channel: Arc<crate::channels::CliChatChannel>) -> Self {
Self {
channels: Arc::new(RwLock::new(HashMap::new())),
cli_chat_channel,
bus: MessageBus::new(100),
}
}
@ -27,8 +29,22 @@ impl ChannelManager {
self.bus.clone()
}
/// Register a channel with the manager
pub async fn register_channel(&self, name: &str, channel: Arc<dyn Channel + Send + Sync>) {
self.channels.write().await.insert(name.to_string(), channel);
}
/// Get CLI chat channel
pub fn cli_chat_channel(&self) -> Arc<crate::channels::CliChatChannel> {
self.cli_chat_channel.clone()
}
/// Initialize all Channel instances from config
pub async fn init(&self, config: &Config, _provider_config: crate::config::LLMProviderConfig) -> Result<(), ChannelError> {
pub async fn init(
&self,
config: &Config,
_provider_config: crate::config::LLMProviderConfig,
) -> Result<(), ChannelError> {
// Initialize Feishu channel if enabled
if let Some(feishu_config) = config.channels.get("feishu") {
if feishu_config.enabled {
@ -44,6 +60,14 @@ impl ChannelManager {
tracing::info!("Feishu channel disabled in config");
}
}
// Register CLI chat channel
self.channels
.write()
.await
.insert("cli_chat".to_string(), self.cli_chat_channel.clone());
tracing::info!("CLI chat channel registered");
Ok(())
}

View File

@ -6,18 +6,18 @@ use std::sync::Arc;
use axum::{routing, Router};
use tokio::net::TcpListener;
use crate::bus::{MessageBus, OutboundDispatcher};
use crate::channels::{Channel, ChannelManager, CliChatChannel};
use crate::bus::{ControlInbound, ControlMessage, ControlOutbound, OutboundDispatcher};
use crate::channels::{ChannelManager, CliChatChannel};
use crate::channels::base::{Channel, ChannelError};
use crate::config::Config;
use crate::logging;
use crate::protocol::SessionSummary;
use session::SessionManager;
pub struct GatewayState {
pub config: Config,
pub session_manager: SessionManager,
pub channel_manager: ChannelManager,
pub bus: Arc<MessageBus>,
pub cli_chat_channel: Arc<CliChatChannel>,
}
impl GatewayState {
@ -31,104 +31,169 @@ impl GatewayState {
let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4);
let session_manager = SessionManager::new(session_ttl_hours, provider_config.clone())?;
let channel_manager = ChannelManager::new();
let bus = channel_manager.bus();
// Create CLI Chat Channel
let cli_chat_channel = Arc::new(CliChatChannel::new(
session_manager.store().clone(),
));
// Create CLI Chat Channel first (needed for ChannelManager)
let cli_chat_channel = Arc::new(CliChatChannel::new());
let channel_manager = ChannelManager::new(cli_chat_channel);
Ok(Self {
config,
session_manager,
channel_manager,
bus,
cli_chat_channel,
})
}
/// Get a reference to the MessageBus
pub fn bus(&self) -> Arc<crate::bus::MessageBus> {
self.channel_manager.bus()
}
/// Get CLI chat channel for WebSocket handling
pub fn cli_chat_channel(&self) -> Arc<CliChatChannel> {
self.channel_manager.cli_chat_channel()
}
/// Start the message processing loops
pub async fn start_message_processing(&self) {
let bus_for_inbound = self.bus.clone();
let bus_for_outbound = self.bus.clone();
let bus = self.bus();
let bus_for_outbound = bus.clone();
let session_manager = self.session_manager.clone();
// Start CLI Chat Channel
if let Err(e) = self.cli_chat_channel.start(self.bus.clone()).await {
// Start CLI Chat Channel (it's already registered in ChannelManager)
let cli_chat_channel = self.cli_chat_channel();
if let Err(e) = cli_chat_channel.start(bus.clone()).await {
tracing::error!(error = %e, "Failed to start CLI chat channel");
}
// Spawn inbound message processor
// This consumes from bus.inbound, processes via SessionManager, publishes to bus.outbound
// Spawn unified message processor
// This handles both inbound AI messages and control messages in one loop
tokio::spawn(async move {
tracing::info!("Inbound processor started");
loop {
let inbound = bus_for_inbound.consume_inbound().await;
#[cfg(debug_assertions)]
{
tracing::debug!(
channel = %inbound.channel,
chat_id = %inbound.chat_id,
sender = %inbound.sender_id,
content = %inbound.content,
media_count = %inbound.media.len(),
"Processing inbound message"
);
if !inbound.media.is_empty() {
for (i, m) in inbound.media.iter().enumerate() {
tracing::debug!(media_index = i, media_type = %m.media_type, path = %m.path, "Media item");
}
}
}
tracing::info!("Message processor started");
// Process via session manager
match session_manager.handle_message(
&inbound.channel,
&inbound.sender_id,
&inbound.chat_id,
&inbound.content,
inbound.media,
).await {
Ok(response_content) => {
// Forward channel-specific metadata from inbound to outbound.
// This allows channels to propagate context (e.g. feishu message_id for reaction cleanup)
// without gateway needing channel-specific code.
let outbound = crate::bus::OutboundMessage {
channel: inbound.channel.clone(),
chat_id: inbound.chat_id.clone(),
content: response_content,
reply_to: None,
media: vec![],
metadata: inbound.forwarded_metadata,
};
if let Err(e) = bus_for_inbound.publish_outbound(outbound).await {
tracing::error!(error = %e, "Failed to publish outbound");
loop {
tokio::select! {
// Inbound: AI message flow
inbound = bus.consume_inbound() => {
#[cfg(debug_assertions)]
{
tracing::debug!(
channel = %inbound.channel,
chat_id = %inbound.chat_id,
sender = %inbound.sender_id,
content = %inbound.content,
media_count = %inbound.media.len(),
"Processing inbound message"
);
if !inbound.media.is_empty() {
for (i, m) in inbound.media.iter().enumerate() {
tracing::debug!(media_index = i, media_type = %m.media_type, path = %m.path, "Media item");
}
}
}
match session_manager.handle_message(
&inbound.channel,
&inbound.sender_id,
&inbound.chat_id,
&inbound.content,
inbound.media,
).await {
Ok(response_content) => {
let outbound = crate::bus::OutboundMessage {
channel: inbound.channel.clone(),
chat_id: inbound.chat_id.clone(),
content: response_content,
reply_to: None,
media: vec![],
metadata: inbound.forwarded_metadata,
};
if let Err(e) = bus.publish_outbound(outbound).await {
tracing::error!(error = %e, "Failed to publish outbound");
}
}
Err(e) => {
tracing::error!(error = %e, "Failed to handle message");
}
}
}
Err(e) => {
tracing::error!(error = %e, "Failed to handle message");
// Control: session management operations
msg = bus.consume_control() => {
Self::handle_control_message(&session_manager, msg).await;
}
}
}
});
// Spawn outbound dispatcher
let dispatcher = OutboundDispatcher::new(bus_for_outbound);
let channel_manager = self.channel_manager.clone();
let cli_chat_channel = self.cli_chat_channel.clone();
// Register channels with dispatcher
if let Some(channel) = channel_manager.get_channel("feishu").await {
dispatcher.register_channel("feishu", channel).await;
}
dispatcher.register_channel("cli_chat", cli_chat_channel).await;
let dispatcher = OutboundDispatcher::new(bus_for_outbound, self.channel_manager.clone());
tokio::spawn(async move {
tracing::info!("Outbound dispatcher started");
dispatcher.run().await;
});
}
/// Handle control messages (session management operations)
async fn handle_control_message(
session_manager: &SessionManager,
msg: ControlMessage,
) {
let reply_tx = msg.reply_tx;
let result = match msg.op {
ControlInbound::CreateSession { title } => {
session_manager.create_cli_session(title.as_deref())
.map(|record| ControlOutbound::SessionCreated {
session_id: record.id,
title: record.title,
})
}
ControlInbound::ListSessions { include_archived } => {
session_manager.list_cli_sessions(include_archived)
.map(|records| ControlOutbound::SessionList {
sessions: records.into_iter().map(|r| SessionSummary {
session_id: r.id,
title: r.title,
channel_name: r.channel_name,
chat_id: r.chat_id,
message_count: r.message_count,
last_active_at: r.last_active_at,
archived_at: r.archived_at,
}).collect()
})
}
ControlInbound::LoadSession { session_id } => {
session_manager.get_session_record(&session_id)
.map(|opt| opt.map(|r| ControlOutbound::SessionLoaded {
session_id: r.id,
title: r.title,
message_count: r.message_count,
}).unwrap_or_else(|| ControlOutbound::Error {
code: "SESSION_NOT_FOUND".to_string(),
message: format!("Session not found: {}", session_id),
}))
}
ControlInbound::RenameSession { session_id, title } => {
session_manager.rename_session(&session_id, &title)
.map(|()| ControlOutbound::SessionRenamed { session_id, title })
}
ControlInbound::ArchiveSession { session_id } => {
session_manager.archive_session(&session_id)
.map(|()| ControlOutbound::SessionArchived { session_id })
}
ControlInbound::DeleteSession { session_id } => {
session_manager.delete_session(&session_id)
.map(|()| ControlOutbound::SessionDeleted { session_id })
}
ControlInbound::ClearHistory { session_id } => {
session_manager.clear_session_messages(&session_id)
.map(|()| ControlOutbound::HistoryCleared { session_id })
}
};
let result = result.map_err(|e| ChannelError::Other(e.to_string()));
let _ = reply_tx.send(result).await;
}
}
pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn std::error::Error>> {
@ -142,10 +207,13 @@ pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn
let provider_config = state.config.get_provider_config("default")?;
// Initialize and start channels
state.channel_manager.init(&state.config, provider_config.clone()).await?;
state.channel_manager.init(
&state.config,
provider_config.clone(),
).await?;
state.channel_manager.start_all().await?;
// Start message processing (inbound processor + outbound dispatcher)
// Start message processing (inbound processor + control processor + outbound dispatcher)
state.start_message_processing().await;
// CLI args override config file values

View File

@ -18,8 +18,11 @@ async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) {
// Create channel for sending outbound messages to this client
let (sender, mut receiver) = mpsc::channel::<WsOutbound>(100);
// Get CLI chat channel
let cli_chat_channel = state.cli_chat_channel();
// Register client with CliChatChannel and get initial session id
let (session_id, client) = state.cli_chat_channel.register_client(sender.clone()).await;
let (session_id, client) = cli_chat_channel.register_client(sender.clone()).await;
// Send session established message
let _ = sender.send(WsOutbound::SessionEstablished {
@ -45,7 +48,7 @@ async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) {
while let Some(msg) = ws_receiver.next().await {
match msg {
Ok(WsMessage::Text(text)) => {
state.cli_chat_channel.handle_inbound(client.clone(), &text).await;
cli_chat_channel.handle_inbound(client.clone(), &text).await;
}
Ok(WsMessage::Close(_)) | Err(_) => {
tracing::debug!(session_id = %session_id, "WebSocket closed");