pub mod http; pub mod ws; use std::sync::Arc; use axum::{routing, Router}; use tokio::net::TcpListener; use crate::bus::{ControlMessage, OutboundDispatcher}; use crate::channels::{ChannelManager, CliChatChannel}; use crate::channels::base::{Channel, ChannelError}; use crate::config::{Config, expand_path, ensure_workspace_dir}; use crate::logging; use crate::session::SessionManager; pub struct GatewayState { pub config: Config, pub workspace_dir: std::path::PathBuf, pub session_manager: SessionManager, pub channel_manager: ChannelManager, } impl GatewayState { pub fn new() -> Result> { let config = Config::load_default()?; // Initialize workspace directory: expand path and ensure it exists let workspace_path = expand_path(&config.workspace_dir); let workspace_path = ensure_workspace_dir(&workspace_path)?; // Switch current working directory to workspace std::env::set_current_dir(&workspace_path) .map_err(|e| format!("Failed to switch to workspace directory {}: {}", workspace_path.display(), e))?; tracing::info!("Using workspace directory: {}", workspace_path.display()); // Get provider config for SessionManager let mut provider_config = config.get_provider_config("default")?; // Override workspace_dir with the ensured path provider_config.workspace_dir = workspace_path.clone(); // Session TTL from config (default 4 hours) let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4); let session_manager = SessionManager::new(session_ttl_hours, provider_config.clone())?; // Create CLI Chat Channel first (needed for ChannelManager) let cli_chat_channel = Arc::new(CliChatChannel::new()); let channel_manager = ChannelManager::new(cli_chat_channel); Ok(Self { config, workspace_dir: workspace_path, session_manager, channel_manager, }) } /// Get a reference to the MessageBus pub fn bus(&self) -> Arc { self.channel_manager.bus() } /// Get CLI chat channel for WebSocket handling pub fn cli_chat_channel(&self) -> Arc { self.channel_manager.cli_chat_channel() } /// Start the message processing loops pub async fn start_message_processing(&self) { let bus = self.bus(); let bus_for_outbound = bus.clone(); let session_manager = self.session_manager.clone(); // Start CLI Chat Channel (it's already registered in ChannelManager) let cli_chat_channel = self.cli_chat_channel(); if let Err(e) = cli_chat_channel.start(bus.clone()).await { tracing::error!(error = %e, "Failed to start CLI chat channel"); } // Spawn unified message processor // This handles both inbound AI messages and control messages in one loop tokio::spawn(async move { tracing::info!("Message processor started"); loop { tokio::select! { // Inbound: AI message flow inbound = bus.consume_inbound() => { match session_manager.handle_message( &inbound.channel, &inbound.sender_id, &inbound.chat_id, inbound.dialog_id.as_deref(), &inbound.content, inbound.media, ).await { Ok(crate::session::session::HandleResult::AgentResponse(content)) => { let outbound = crate::bus::OutboundMessage { channel: inbound.channel.clone(), chat_id: inbound.chat_id.clone(), content, reply_to: None, media: vec![], metadata: inbound.forwarded_metadata, }; if let Err(e) = bus.publish_outbound(outbound).await { tracing::error!(error = %e, "Failed to publish outbound"); } } Ok(crate::session::session::HandleResult::CommandOutput(content)) => { let outbound = crate::bus::OutboundMessage { channel: inbound.channel.clone(), chat_id: inbound.chat_id.clone(), content, reply_to: None, media: vec![], metadata: inbound.forwarded_metadata, }; if let Err(e) = bus.publish_outbound(outbound).await { tracing::error!(error = %e, "Failed to publish outbound"); } } Err(e) => { tracing::error!(error = %e, "Failed to handle message"); } } } // Control: session management operations msg = bus.consume_control() => { Self::handle_control_message(&session_manager, msg).await; } } } }); // Spawn outbound dispatcher let dispatcher = OutboundDispatcher::new(bus_for_outbound, self.channel_manager.clone()); tokio::spawn(async move { tracing::info!("Outbound dispatcher started"); dispatcher.run().await; }); } /// Handle control messages (session management operations) async fn handle_control_message( session_manager: &SessionManager, msg: ControlMessage, ) { use crate::session::{SessionCommand::*, SessionEvent}; let reply_tx = msg.reply_tx; let result: Result = match msg.op { CreateDialog { channel, chat_id, title } => { session_manager.create_dialog(&channel, &chat_id, title.as_deref()).await .map(|(session_id, title)| SessionEvent::DialogCreated { session_id, title }) .map_err(|e| ChannelError::Other(e.to_string())) } ListDialogs { channel, chat_id, include_archived } => { session_manager.list_dialogs(&channel, &chat_id, include_archived).await .map(|(dialogs, current_dialog_id)| SessionEvent::DialogList { dialogs, current_dialog_id }) .map_err(|e| ChannelError::Other(e.to_string())) } GetCurrentDialog { channel, chat_id } => { session_manager.get_current_dialog(&channel, &chat_id).await .map(|session_id| SessionEvent::CurrentDialog { session_id }) .map_err(|e| ChannelError::Other(e.to_string())) } SwitchDialog { channel, chat_id, dialog_id } => { session_manager.switch_dialog(&channel, &chat_id, &dialog_id).await .map(|session_id| SessionEvent::DialogSwitched { session_id }) .map_err(|e| ChannelError::Other(e.to_string())) } RenameDialog { session_id, title } => { session_manager.rename_dialog(&session_id, &title) .map(|()| SessionEvent::DialogRenamed { session_id, title }) .map_err(|e| ChannelError::Other(e.to_string())) } ArchiveDialog { session_id } => { session_manager.archive_dialog(&session_id) .map(|()| SessionEvent::DialogArchived { session_id }) .map_err(|e| ChannelError::Other(e.to_string())) } DeleteDialog { session_id } => { session_manager.delete_dialog(&session_id) .map(|()| SessionEvent::DialogDeleted { session_id }) .map_err(|e| ChannelError::Other(e.to_string())) } ClearHistory { session_id } => { session_manager.clear_dialog_history(&session_id) .map(|()| SessionEvent::HistoryCleared { session_id }) .map_err(|e| ChannelError::Other(e.to_string())) } GetSlashCommands { channel: _, chat_id: _ } => { let commands = session_manager.get_slash_commands().to_vec(); Ok(SessionEvent::SlashCommandsList { commands }) } ExecuteSlashCommand { command, args, channel, chat_id, current_session_id } => { session_manager.execute_slash_command(&command, args.as_deref(), &channel, &chat_id, current_session_id.as_ref()) .await .map(|(new_id, msg)| SessionEvent::SlashCommandExecuted { new_session_id: new_id, message: msg }) .map_err(|e| ChannelError::Other(e.to_string())) } }; let _ = reply_tx.send(result).await; } } pub async fn run(host: Option, port: Option) -> Result<(), Box> { // Initialize logging logging::init_logging(); tracing::info!("Starting PicoBot Gateway"); let state = Arc::new(GatewayState::new()?); // Initialize and start channels with workspace directory state.channel_manager.init( &state.config, state.workspace_dir.clone(), ).await?; state.channel_manager.start_all().await?; // Start message processing (inbound processor + control processor + outbound dispatcher) state.start_message_processing().await; // CLI args override config file values let bind_host = host.unwrap_or_else(|| state.config.gateway.host.clone()); let bind_port = port.unwrap_or(state.config.gateway.port); let app = Router::new() .route("/health", routing::get(http::health)) .route("/ws", routing::get(ws::ws_handler)) .with_state(state.clone()); let addr = format!("{}:{}", bind_host, bind_port); let listener = TcpListener::bind(&addr).await?; tracing::info!(address = %addr, "Gateway listening"); // Graceful shutdown using oneshot channel let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>(); let channel_manager = state.channel_manager.clone(); // Spawn ctrl_c handler tokio::spawn(async move { tokio::signal::ctrl_c().await.ok(); tracing::info!("Shutdown signal received"); let _ = channel_manager.stop_all().await; let _ = shutdown_tx.send(()); }); // Serve with graceful shutdown axum::serve(listener, app) .with_graceful_shutdown(async { shutdown_rx.await.ok(); }) .await?; Ok(()) }