feat: 添加 OutboundDispatcher 模块,重构消息分发逻辑,优化渠道消息处理

Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
ooodc 2026-04-28 14:52:33 +08:00
parent c547b88a12
commit 6756a3d0ae
5 changed files with 21 additions and 23 deletions

View File

@ -37,8 +37,8 @@ PicoBot 的设计目标不是“只会聊天”的单进程 Bot而是一个
主要模块如下: 主要模块如下:
- src/gateway网关入口、HTTP 健康检查、WebSocket 控制面、Session 池、CLI 会话服务与 Agent 执行编排 - src/gateway网关入口、HTTP 健康检查、WebSocket 控制面、Session 池、CLI 会话服务、OutboundDispatcher 与 Agent 执行编排
- src/bus消息总线与消息结构定义 - src/bus消息总线队列与消息结构定义,不包含渠道投递逻辑
- src/agentAgentLoop 与上下文压缩器 - src/agentAgentLoop 与上下文压缩器
- src/providers不同 LLM Provider 的统一抽象,当前支持 openai 和 anthropic - src/providers不同 LLM Provider 的统一抽象,当前支持 openai 和 anthropic
- src/tools内置工具集合 - src/tools内置工具集合

View File

@ -1,8 +1,6 @@
pub mod dispatcher;
pub mod message; pub mod message;
pub use crate::domain::messages::ContentBlock; pub use crate::domain::messages::ContentBlock;
pub use dispatcher::OutboundDispatcher;
pub use message::{ pub use message::{
ChatMessage, InboundMessage, MediaItem, OutboundMessage, SYSTEM_CONTEXT_AGENT_PROMPT, ChatMessage, InboundMessage, MediaItem, OutboundMessage, SYSTEM_CONTEXT_AGENT_PROMPT,
SYSTEM_CONTEXT_HISTORY_COMPACTION, SYSTEM_CONTEXT_SCHEDULED_PROMPT, SYSTEM_CONTEXT_HISTORY_COMPACTION, SYSTEM_CONTEXT_SCHEDULED_PROMPT,
@ -12,7 +10,7 @@ use std::sync::Arc;
use tokio::sync::{Mutex, mpsc}; use tokio::sync::{Mutex, mpsc};
// ============================================================================ // ============================================================================
// MessageBus - Async message queue for Channel <-> Agent communication // MessageBus - async inbound/outbound queues
// ============================================================================ // ============================================================================
pub struct MessageBus { pub struct MessageBus {
@ -35,7 +33,7 @@ impl MessageBus {
}) })
} }
/// Publish an inbound message (Channel -> Bus) /// Publish a message to the inbound queue
pub async fn publish_inbound(&self, msg: InboundMessage) -> Result<(), BusError> { pub async fn publish_inbound(&self, msg: InboundMessage) -> Result<(), BusError> {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
tracing::debug!(channel = %msg.channel, sender = %msg.sender_id, chat = %msg.chat_id, content_len = %msg.content.len(), media_count = %msg.media.len(), "Bus: publishing inbound message"); tracing::debug!(channel = %msg.channel, sender = %msg.sender_id, chat = %msg.chat_id, content_len = %msg.content.len(), media_count = %msg.media.len(), "Bus: publishing inbound message");
@ -45,7 +43,7 @@ impl MessageBus {
.map_err(|_| BusError::Closed) .map_err(|_| BusError::Closed)
} }
/// Consume an inbound message (Agent -> Bus) /// Consume a message from the inbound queue
pub async fn consume_inbound(&self) -> InboundMessage { pub async fn consume_inbound(&self) -> InboundMessage {
let msg = self let msg = self
.inbound_rx .inbound_rx
@ -59,7 +57,7 @@ impl MessageBus {
msg msg
} }
/// Publish an outbound message (Agent -> Bus) /// Publish a message to the outbound queue
pub async fn publish_outbound(&self, msg: OutboundMessage) -> Result<(), BusError> { pub async fn publish_outbound(&self, msg: OutboundMessage) -> Result<(), BusError> {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
tracing::debug!(channel = %msg.channel, chat_id = %msg.chat_id, content_len = %msg.content.len(), "Bus: publishing outbound message"); tracing::debug!(channel = %msg.channel, chat_id = %msg.chat_id, content_len = %msg.content.len(), "Bus: publishing outbound message");
@ -69,7 +67,7 @@ impl MessageBus {
.map_err(|_| BusError::Closed) .map_err(|_| BusError::Closed)
} }
/// Consume an outbound message (Dispatcher -> Bus) /// Consume an outbound message from the outbound queue
pub async fn consume_outbound(&self) -> OutboundMessage { pub async fn consume_outbound(&self) -> OutboundMessage {
self.outbound_rx self.outbound_rx
.lock() .lock()

View File

@ -43,7 +43,7 @@ pub trait Channel: Send + Sync + 'static {
/// Stop the channel /// Stop the channel
async fn stop(&self) -> Result<(), ChannelError>; async fn stop(&self) -> Result<(), ChannelError>;
/// Send a message to the channel (called by OutboundDispatcher) /// Send a message to the channel (called by gateway outbound dispatch)
async fn send(&self, msg: OutboundMessage) -> Result<(), ChannelError>; async fn send(&self, msg: OutboundMessage) -> Result<(), ChannelError>;
/// Send a streaming delta (optional, for channels that support it) /// Send a streaming delta (optional, for channels that support it)

View File

@ -8,6 +8,7 @@ pub mod http;
pub mod memory_maintenance; pub mod memory_maintenance;
pub mod memory_maintenance_coordinator; pub mod memory_maintenance_coordinator;
pub mod message_prepare; pub mod message_prepare;
pub mod outbound_dispatcher;
pub mod processor; pub mod processor;
pub mod prompt; pub mod prompt;
pub mod prompt_injector; pub mod prompt_injector;
@ -27,7 +28,7 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use crate::bus::{MessageBus, OutboundDispatcher}; use crate::bus::MessageBus;
use crate::channels::ChannelManager; use crate::channels::ChannelManager;
use crate::config::Config; use crate::config::Config;
use crate::config::LLMProviderConfig; use crate::config::LLMProviderConfig;
@ -35,6 +36,7 @@ use crate::logging;
use crate::scheduler::Scheduler; use crate::scheduler::Scheduler;
use crate::skills::SkillRuntime; use crate::skills::SkillRuntime;
use agent_task_executor::{AgentTaskExecutor, SchedulerMaintenanceService}; use agent_task_executor::{AgentTaskExecutor, SchedulerMaintenanceService};
use outbound_dispatcher::OutboundDispatcher;
use processor::InboundProcessor; use processor::InboundProcessor;
use session::SessionManager; use session::SessionManager;

View File

@ -1,12 +1,12 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::bus::{MessageBus, OutboundMessage}; use crate::bus::{MessageBus, OutboundMessage};
use crate::channels::base::{Channel, ChannelError}; use crate::channels::base::{Channel, ChannelError};
/// OutboundDispatcher consumes outbound messages from the MessageBus /// Consumes outbound messages from MessageBus and dispatches them to channels.
/// and dispatches them to the appropriate Channel
pub struct OutboundDispatcher { pub struct OutboundDispatcher {
bus: Arc<MessageBus>, bus: Arc<MessageBus>,
channels: Arc<RwLock<HashMap<String, Arc<dyn Channel + Send + Sync>>>>, channels: Arc<RwLock<HashMap<String, Arc<dyn Channel + Send + Sync>>>>,
@ -20,7 +20,6 @@ impl OutboundDispatcher {
} }
} }
/// Register a channel with the dispatcher
pub async fn register_channel(&self, name: &str, channel: Arc<dyn Channel + Send + Sync>) { pub async fn register_channel(&self, name: &str, channel: Arc<dyn Channel + Send + Sync>) {
self.channels self.channels
.write() .write()
@ -28,7 +27,6 @@ impl OutboundDispatcher {
.insert(name.to_string(), channel); .insert(name.to_string(), channel);
} }
/// Run the dispatcher loop - consumes from bus and dispatches to channels
pub async fn run(&self) { pub async fn run(&self) {
tracing::info!("OutboundDispatcher started"); tracing::info!("OutboundDispatcher started");
@ -47,8 +45,8 @@ impl OutboundDispatcher {
match channel { match channel {
Some(ch) => { Some(ch) => {
if let Err(e) = self.send_with_retry(&*ch, msg).await { if let Err(error) = self.send_with_retry(&*ch, msg).await {
tracing::error!(channel = %channel_name, error = %e, "Failed to send message after retries"); tracing::error!(channel = %channel_name, error = %error, "Failed to send message after retries");
} }
} }
None => { None => {
@ -58,7 +56,6 @@ impl OutboundDispatcher {
} }
} }
/// Send a message with exponential retry
async fn send_with_retry( async fn send_with_retry(
&self, &self,
channel: &dyn Channel, channel: &dyn Channel,
@ -66,21 +63,22 @@ impl OutboundDispatcher {
) -> Result<(), ChannelError> { ) -> Result<(), ChannelError> {
const DELAYS: [u64; 3] = [1, 2, 4]; const DELAYS: [u64; 3] = [1, 2, 4];
for (i, delay) in DELAYS.iter().enumerate() { for (attempt_index, delay) in DELAYS.iter().enumerate() {
match channel.send(msg.clone()).await { match channel.send(msg.clone()).await {
Ok(()) => return Ok(()), Ok(()) => return Ok(()),
Err(e) if i < DELAYS.len() - 1 => { Err(error) if attempt_index < DELAYS.len() - 1 => {
tracing::warn!( tracing::warn!(
attempt = i + 1, attempt = attempt_index + 1,
delay = delay, delay = delay,
error = %e, error = %error,
"Send failed, retrying" "Send failed, retrying"
); );
tokio::time::sleep(tokio::time::Duration::from_secs(*delay)).await; tokio::time::sleep(tokio::time::Duration::from_secs(*delay)).await;
} }
Err(e) => return Err(e), Err(error) => return Err(error),
} }
} }
unreachable!() unreachable!()
} }
} }