use std::sync::Arc; use tokio::sync::RwLock; use std::collections::HashMap; use crate::bus::{MessageBus, OutboundMessage}; use crate::channels::base::{Channel, ChannelError}; /// OutboundDispatcher consumes outbound messages from the MessageBus /// and dispatches them to the appropriate Channel pub struct OutboundDispatcher { bus: Arc, channels: Arc>>>, } impl OutboundDispatcher { pub fn new(bus: Arc) -> Self { Self { bus, channels: Arc::new(RwLock::new(HashMap::new())), } } /// Register a channel with the dispatcher pub async fn register_channel(&self, name: &str, channel: Arc) { self.channels.write().await.insert(name.to_string(), channel); } /// Run the dispatcher loop - consumes from bus and dispatches to channels pub async fn run(&self) { tracing::info!("OutboundDispatcher started"); loop { let msg = self.bus.consume_outbound().await; #[cfg(debug_assertions)] tracing::debug!( channel = %msg.channel, chat_id = %msg.chat_id, content_len = msg.content.len(), "OutboundDispatcher received message" ); let channel_name = msg.channel.clone(); let channel = self.channels.read().await.get(&channel_name).cloned(); match channel { Some(ch) => { if let Err(e) = self.send_with_retry(&*ch, msg).await { tracing::error!(channel = %channel_name, error = %e, "Failed to send message after retries"); } } None => { tracing::warn!(channel = %channel_name, "No channel found for message"); } } } } /// Send a message with exponential retry async fn send_with_retry( &self, channel: &dyn Channel, msg: OutboundMessage, ) -> Result<(), ChannelError> { const DELAYS: [u64; 3] = [1, 2, 4]; for (i, delay) in DELAYS.iter().enumerate() { match channel.send(msg.clone()).await { Ok(()) => return Ok(()), Err(e) if i < DELAYS.len() - 1 => { tracing::warn!( attempt = i + 1, delay = delay, error = %e, "Send failed, retrying" ); tokio::time::sleep(tokio::time::Duration::from_secs(*delay)).await; } Err(e) => return Err(e), } } unreachable!() } }