- Removed InputHandler and related input event handling code. - Updated GatewayState to handle new session commands for dialogs. - Introduced UnifiedSessionId for managing session identifiers across channels and chats. - Refactored Session and SessionManager to manage dialogs instead of sessions. - Added methods for creating, listing, switching, renaming, archiving, and deleting dialogs. - Updated storage functions to accommodate dialog IDs in persistent session management. - Enhanced tests to cover new dialog functionalities and ensure stability.
121 lines
4.1 KiB
Rust
121 lines
4.1 KiB
Rust
pub mod dispatcher;
|
|
pub mod message;
|
|
|
|
pub use dispatcher::OutboundDispatcher;
|
|
pub use message::{ChatMessage, ContentBlock, ControlMessage, InboundMessage, MediaItem, OutboundMessage};
|
|
|
|
use std::sync::Arc;
|
|
use tokio::sync::{mpsc, Mutex};
|
|
|
|
// ============================================================================
|
|
// MessageBus - Async message queue for Channel <-> Agent communication
|
|
// ============================================================================
|
|
|
|
pub struct MessageBus {
|
|
inbound_tx: mpsc::Sender<InboundMessage>,
|
|
outbound_tx: mpsc::Sender<OutboundMessage>,
|
|
inbound_rx: Mutex<mpsc::Receiver<InboundMessage>>,
|
|
outbound_rx: Mutex<mpsc::Receiver<OutboundMessage>>,
|
|
// Control channel for session management operations
|
|
control_tx: mpsc::Sender<ControlMessage>,
|
|
control_rx: Mutex<mpsc::Receiver<ControlMessage>>,
|
|
}
|
|
|
|
impl MessageBus {
|
|
/// Create a new MessageBus with the given channel capacity
|
|
pub fn new(capacity: usize) -> Arc<Self> {
|
|
let (inbound_tx, inbound_rx) = mpsc::channel(capacity);
|
|
let (outbound_tx, outbound_rx) = mpsc::channel(capacity);
|
|
let (control_tx, control_rx) = mpsc::channel(capacity);
|
|
Arc::new(Self {
|
|
inbound_tx,
|
|
outbound_tx,
|
|
inbound_rx: Mutex::new(inbound_rx),
|
|
outbound_rx: Mutex::new(outbound_rx),
|
|
control_tx,
|
|
control_rx: Mutex::new(control_rx),
|
|
})
|
|
}
|
|
|
|
/// Publish an inbound message (Channel -> Bus)
|
|
pub async fn publish_inbound(&self, msg: InboundMessage) -> Result<(), BusError> {
|
|
#[cfg(debug_assertions)]
|
|
tracing::debug!(channel = %msg.channel, sender = %msg.sender_id, chat = %msg.chat_id, content_len = %msg.content.len(), media_count = %msg.media.len(), "Bus: publishing inbound message");
|
|
self.inbound_tx
|
|
.send(msg)
|
|
.await
|
|
.map_err(|_| BusError::Closed)
|
|
}
|
|
|
|
/// Consume an inbound message (Agent -> Bus)
|
|
pub async fn consume_inbound(&self) -> InboundMessage {
|
|
let msg = self.inbound_rx
|
|
.lock()
|
|
.await
|
|
.recv()
|
|
.await
|
|
.expect("bus inbound closed");
|
|
#[cfg(debug_assertions)]
|
|
tracing::debug!(channel = %msg.channel, sender = %msg.sender_id, chat = %msg.chat_id, "Bus: consuming inbound message");
|
|
msg
|
|
}
|
|
|
|
/// Publish an outbound message (Agent -> Bus)
|
|
pub async fn publish_outbound(&self, msg: OutboundMessage) -> Result<(), BusError> {
|
|
#[cfg(debug_assertions)]
|
|
tracing::debug!(channel = %msg.channel, chat_id = %msg.chat_id, content_len = %msg.content.len(), "Bus: publishing outbound message");
|
|
self.outbound_tx
|
|
.send(msg)
|
|
.await
|
|
.map_err(|_| BusError::Closed)
|
|
}
|
|
|
|
/// Consume an outbound message (Dispatcher -> Bus)
|
|
pub async fn consume_outbound(&self) -> OutboundMessage {
|
|
self.outbound_rx
|
|
.lock()
|
|
.await
|
|
.recv()
|
|
.await
|
|
.expect("bus outbound closed")
|
|
}
|
|
|
|
/// Publish a control message (Channel -> Bus for session management)
|
|
pub async fn publish_control(&self, msg: ControlMessage) -> Result<(), BusError> {
|
|
tracing::debug!(op = ?msg.op, "Bus: publishing control message");
|
|
self.control_tx
|
|
.send(msg)
|
|
.await
|
|
.map_err(|_| BusError::Closed)
|
|
}
|
|
|
|
/// Consume a control message (ControlProcessor -> Bus)
|
|
pub async fn consume_control(&self) -> ControlMessage {
|
|
self.control_rx
|
|
.lock()
|
|
.await
|
|
.recv()
|
|
.await
|
|
.expect("bus control closed")
|
|
}
|
|
}
|
|
|
|
// ============================================================================
|
|
// BusError
|
|
// ============================================================================
|
|
|
|
#[derive(Debug)]
|
|
pub enum BusError {
|
|
Closed,
|
|
}
|
|
|
|
impl std::fmt::Display for BusError {
|
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
match self {
|
|
BusError::Closed => write!(f, "Bus channel closed"),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl std::error::Error for BusError {}
|