From 6756a3d0ae08650031fda7b89dc662e443abdf48 Mon Sep 17 00:00:00 2001 From: ooodc <549496103@qq.com> Date: Tue, 28 Apr 2026 14:52:33 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=20OutboundDispatcher?= =?UTF-8?q?=20=E6=A8=A1=E5=9D=97=EF=BC=8C=E9=87=8D=E6=9E=84=E6=B6=88?= =?UTF-8?q?=E6=81=AF=E5=88=86=E5=8F=91=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E6=B8=A0=E9=81=93=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot --- README.md | 4 ++-- src/bus/mod.rs | 12 +++++----- src/channels/base.rs | 2 +- src/gateway/mod.rs | 4 +++- .../outbound_dispatcher.rs} | 22 +++++++++---------- 5 files changed, 21 insertions(+), 23 deletions(-) rename src/{bus/dispatcher.rs => gateway/outbound_dispatcher.rs} (77%) diff --git a/README.md b/README.md index 2b7f579..37b98e5 100644 --- a/README.md +++ b/README.md @@ -37,8 +37,8 @@ PicoBot 的设计目标不是“只会聊天”的单进程 Bot,而是一个 主要模块如下: -- src/gateway:网关入口、HTTP 健康检查、WebSocket 控制面、Session 池、CLI 会话服务与 Agent 执行编排 -- src/bus:消息总线与消息结构定义 +- src/gateway:网关入口、HTTP 健康检查、WebSocket 控制面、Session 池、CLI 会话服务、OutboundDispatcher 与 Agent 执行编排 +- src/bus:消息总线队列与消息结构定义,不包含渠道投递逻辑 - src/agent:AgentLoop 与上下文压缩器 - src/providers:不同 LLM Provider 的统一抽象,当前支持 openai 和 anthropic - src/tools:内置工具集合 diff --git a/src/bus/mod.rs b/src/bus/mod.rs index c0325b0..7b09308 100644 --- a/src/bus/mod.rs +++ b/src/bus/mod.rs @@ -1,8 +1,6 @@ -pub mod dispatcher; pub mod message; pub use crate::domain::messages::ContentBlock; -pub use dispatcher::OutboundDispatcher; pub use message::{ ChatMessage, InboundMessage, MediaItem, OutboundMessage, SYSTEM_CONTEXT_AGENT_PROMPT, SYSTEM_CONTEXT_HISTORY_COMPACTION, SYSTEM_CONTEXT_SCHEDULED_PROMPT, @@ -12,7 +10,7 @@ use std::sync::Arc; use tokio::sync::{Mutex, mpsc}; // ============================================================================ -// MessageBus - Async message queue for Channel <-> Agent communication +// MessageBus - async inbound/outbound queues // ============================================================================ pub struct MessageBus { @@ -35,7 +33,7 @@ impl MessageBus { }) } - /// Publish an inbound message (Channel -> Bus) + /// Publish a message to the inbound queue pub async fn publish_inbound(&self, msg: InboundMessage) -> Result<(), BusError> { #[cfg(debug_assertions)] tracing::debug!(channel = %msg.channel, sender = %msg.sender_id, chat = %msg.chat_id, content_len = %msg.content.len(), media_count = %msg.media.len(), "Bus: publishing inbound message"); @@ -45,7 +43,7 @@ impl MessageBus { .map_err(|_| BusError::Closed) } - /// Consume an inbound message (Agent -> Bus) + /// Consume a message from the inbound queue pub async fn consume_inbound(&self) -> InboundMessage { let msg = self .inbound_rx @@ -59,7 +57,7 @@ impl MessageBus { msg } - /// Publish an outbound message (Agent -> Bus) + /// Publish a message to the outbound queue pub async fn publish_outbound(&self, msg: OutboundMessage) -> Result<(), BusError> { #[cfg(debug_assertions)] tracing::debug!(channel = %msg.channel, chat_id = %msg.chat_id, content_len = %msg.content.len(), "Bus: publishing outbound message"); @@ -69,7 +67,7 @@ impl MessageBus { .map_err(|_| BusError::Closed) } - /// Consume an outbound message (Dispatcher -> Bus) + /// Consume an outbound message from the outbound queue pub async fn consume_outbound(&self) -> OutboundMessage { self.outbound_rx .lock() diff --git a/src/channels/base.rs b/src/channels/base.rs index d91afea..e929399 100644 --- a/src/channels/base.rs +++ b/src/channels/base.rs @@ -43,7 +43,7 @@ pub trait Channel: Send + Sync + 'static { /// Stop the channel async fn stop(&self) -> Result<(), ChannelError>; - /// Send a message to the channel (called by OutboundDispatcher) + /// Send a message to the channel (called by gateway outbound dispatch) async fn send(&self, msg: OutboundMessage) -> Result<(), ChannelError>; /// Send a streaming delta (optional, for channels that support it) diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 235cf66..e3481d3 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -8,6 +8,7 @@ pub mod http; pub mod memory_maintenance; pub mod memory_maintenance_coordinator; pub mod message_prepare; +pub mod outbound_dispatcher; pub mod processor; pub mod prompt; pub mod prompt_injector; @@ -27,7 +28,7 @@ use std::collections::HashMap; use std::sync::Arc; use tokio::net::TcpListener; -use crate::bus::{MessageBus, OutboundDispatcher}; +use crate::bus::MessageBus; use crate::channels::ChannelManager; use crate::config::Config; use crate::config::LLMProviderConfig; @@ -35,6 +36,7 @@ use crate::logging; use crate::scheduler::Scheduler; use crate::skills::SkillRuntime; use agent_task_executor::{AgentTaskExecutor, SchedulerMaintenanceService}; +use outbound_dispatcher::OutboundDispatcher; use processor::InboundProcessor; use session::SessionManager; diff --git a/src/bus/dispatcher.rs b/src/gateway/outbound_dispatcher.rs similarity index 77% rename from src/bus/dispatcher.rs rename to src/gateway/outbound_dispatcher.rs index 9678b9f..11a37da 100644 --- a/src/bus/dispatcher.rs +++ b/src/gateway/outbound_dispatcher.rs @@ -1,12 +1,12 @@ use std::collections::HashMap; use std::sync::Arc; + use tokio::sync::RwLock; use crate::bus::{MessageBus, OutboundMessage}; use crate::channels::base::{Channel, ChannelError}; -/// OutboundDispatcher consumes outbound messages from the MessageBus -/// and dispatches them to the appropriate Channel +/// Consumes outbound messages from MessageBus and dispatches them to channels. pub struct OutboundDispatcher { bus: Arc, channels: Arc>>>, @@ -20,7 +20,6 @@ impl OutboundDispatcher { } } - /// Register a channel with the dispatcher pub async fn register_channel(&self, name: &str, channel: Arc) { self.channels .write() @@ -28,7 +27,6 @@ impl OutboundDispatcher { .insert(name.to_string(), channel); } - /// Run the dispatcher loop - consumes from bus and dispatches to channels pub async fn run(&self) { tracing::info!("OutboundDispatcher started"); @@ -47,8 +45,8 @@ impl OutboundDispatcher { match channel { Some(ch) => { - if let Err(e) = self.send_with_retry(&*ch, msg).await { - tracing::error!(channel = %channel_name, error = %e, "Failed to send message after retries"); + if let Err(error) = self.send_with_retry(&*ch, msg).await { + tracing::error!(channel = %channel_name, error = %error, "Failed to send message after retries"); } } None => { @@ -58,7 +56,6 @@ impl OutboundDispatcher { } } - /// Send a message with exponential retry async fn send_with_retry( &self, channel: &dyn Channel, @@ -66,21 +63,22 @@ impl OutboundDispatcher { ) -> Result<(), ChannelError> { const DELAYS: [u64; 3] = [1, 2, 4]; - for (i, delay) in DELAYS.iter().enumerate() { + for (attempt_index, delay) in DELAYS.iter().enumerate() { match channel.send(msg.clone()).await { Ok(()) => return Ok(()), - Err(e) if i < DELAYS.len() - 1 => { + Err(error) if attempt_index < DELAYS.len() - 1 => { tracing::warn!( - attempt = i + 1, + attempt = attempt_index + 1, delay = delay, - error = %e, + error = %error, "Send failed, retrying" ); tokio::time::sleep(tokio::time::Duration::from_secs(*delay)).await; } - Err(e) => return Err(e), + Err(error) => return Err(error), } } + unreachable!() } }