249 lines
10 KiB
Rust
249 lines
10 KiB
Rust
pub mod http;
|
|
pub mod ws;
|
|
|
|
use std::sync::Arc;
|
|
use axum::{routing, Router};
|
|
use tokio::net::TcpListener;
|
|
|
|
use crate::bus::{ControlMessage, OutboundDispatcher};
|
|
use crate::channels::{ChannelManager, CliChatChannel};
|
|
use crate::channels::base::{Channel, ChannelError};
|
|
use crate::config::{Config, expand_path, ensure_workspace_dir};
|
|
use crate::logging;
|
|
use crate::session::SessionManager;
|
|
|
|
pub struct GatewayState {
|
|
pub config: Config,
|
|
pub workspace_dir: std::path::PathBuf,
|
|
pub session_manager: SessionManager,
|
|
pub channel_manager: ChannelManager,
|
|
}
|
|
|
|
impl GatewayState {
|
|
pub fn new() -> Result<Self, Box<dyn std::error::Error>> {
|
|
let config = Config::load_default()?;
|
|
|
|
// Initialize workspace directory: expand path and ensure it exists
|
|
let workspace_path = expand_path(&config.workspace_dir);
|
|
let workspace_path = ensure_workspace_dir(&workspace_path)?;
|
|
|
|
// Switch current working directory to workspace
|
|
std::env::set_current_dir(&workspace_path)
|
|
.map_err(|e| format!("Failed to switch to workspace directory {}: {}", workspace_path.display(), e))?;
|
|
|
|
tracing::info!("Using workspace directory: {}", workspace_path.display());
|
|
|
|
// Get provider config for SessionManager
|
|
let mut provider_config = config.get_provider_config("default")?;
|
|
// Override workspace_dir with the ensured path
|
|
provider_config.workspace_dir = workspace_path.clone();
|
|
|
|
// Session TTL from config (default 4 hours)
|
|
let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4);
|
|
|
|
let session_manager = SessionManager::new(session_ttl_hours, provider_config.clone())?;
|
|
|
|
// Create CLI Chat Channel first (needed for ChannelManager)
|
|
let cli_chat_channel = Arc::new(CliChatChannel::new());
|
|
let channel_manager = ChannelManager::new(cli_chat_channel);
|
|
|
|
Ok(Self {
|
|
config,
|
|
workspace_dir: workspace_path,
|
|
session_manager,
|
|
channel_manager,
|
|
})
|
|
}
|
|
|
|
/// Get a reference to the MessageBus
|
|
pub fn bus(&self) -> Arc<crate::bus::MessageBus> {
|
|
self.channel_manager.bus()
|
|
}
|
|
|
|
/// Get CLI chat channel for WebSocket handling
|
|
pub fn cli_chat_channel(&self) -> Arc<CliChatChannel> {
|
|
self.channel_manager.cli_chat_channel()
|
|
}
|
|
|
|
/// Start the message processing loops
|
|
pub async fn start_message_processing(&self) {
|
|
let bus = self.bus();
|
|
let bus_for_outbound = bus.clone();
|
|
let session_manager = self.session_manager.clone();
|
|
|
|
// Start CLI Chat Channel (it's already registered in ChannelManager)
|
|
let cli_chat_channel = self.cli_chat_channel();
|
|
if let Err(e) = cli_chat_channel.start(bus.clone()).await {
|
|
tracing::error!(error = %e, "Failed to start CLI chat channel");
|
|
}
|
|
|
|
// Spawn unified message processor
|
|
// This handles both inbound AI messages and control messages in one loop
|
|
tokio::spawn(async move {
|
|
tracing::info!("Message processor started");
|
|
|
|
loop {
|
|
tokio::select! {
|
|
// Inbound: AI message flow
|
|
inbound = bus.consume_inbound() => {
|
|
match session_manager.handle_message(
|
|
&inbound.channel,
|
|
&inbound.sender_id,
|
|
&inbound.chat_id,
|
|
inbound.dialog_id.as_deref(),
|
|
&inbound.content,
|
|
inbound.media,
|
|
).await {
|
|
Ok(response_content) => {
|
|
let outbound = crate::bus::OutboundMessage {
|
|
channel: inbound.channel.clone(),
|
|
chat_id: inbound.chat_id.clone(),
|
|
content: response_content,
|
|
reply_to: None,
|
|
media: vec![],
|
|
metadata: inbound.forwarded_metadata,
|
|
};
|
|
if let Err(e) = bus.publish_outbound(outbound).await {
|
|
tracing::error!(error = %e, "Failed to publish outbound");
|
|
}
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(error = %e, "Failed to handle message");
|
|
}
|
|
}
|
|
}
|
|
|
|
// Control: session management operations
|
|
msg = bus.consume_control() => {
|
|
Self::handle_control_message(&session_manager, msg).await;
|
|
}
|
|
}
|
|
}
|
|
});
|
|
|
|
// Spawn outbound dispatcher
|
|
let dispatcher = OutboundDispatcher::new(bus_for_outbound, self.channel_manager.clone());
|
|
|
|
tokio::spawn(async move {
|
|
tracing::info!("Outbound dispatcher started");
|
|
dispatcher.run().await;
|
|
});
|
|
}
|
|
|
|
/// Handle control messages (session management operations)
|
|
async fn handle_control_message(
|
|
session_manager: &SessionManager,
|
|
msg: ControlMessage,
|
|
) {
|
|
use crate::session::{SessionCommand::*, SessionEvent};
|
|
|
|
let reply_tx = msg.reply_tx;
|
|
let result: Result<SessionEvent, ChannelError> = match msg.op {
|
|
CreateDialog { channel, chat_id, title } => {
|
|
session_manager.create_dialog(&channel, &chat_id, title.as_deref()).await
|
|
.map(|(session_id, title)| SessionEvent::DialogCreated { session_id, title })
|
|
.map_err(|e| ChannelError::Other(e.to_string()))
|
|
}
|
|
ListDialogs { channel, chat_id, include_archived } => {
|
|
session_manager.list_dialogs(&channel, &chat_id, include_archived).await
|
|
.map(|(dialogs, current_dialog_id)| SessionEvent::DialogList { dialogs, current_dialog_id })
|
|
.map_err(|e| ChannelError::Other(e.to_string()))
|
|
}
|
|
GetCurrentDialog { channel, chat_id } => {
|
|
session_manager.get_current_dialog(&channel, &chat_id).await
|
|
.map(|session_id| SessionEvent::CurrentDialog { session_id })
|
|
.map_err(|e| ChannelError::Other(e.to_string()))
|
|
}
|
|
SwitchDialog { channel, chat_id, dialog_id } => {
|
|
session_manager.switch_dialog(&channel, &chat_id, &dialog_id).await
|
|
.map(|session_id| SessionEvent::DialogSwitched { session_id })
|
|
.map_err(|e| ChannelError::Other(e.to_string()))
|
|
}
|
|
RenameDialog { session_id, title } => {
|
|
session_manager.rename_dialog(&session_id, &title)
|
|
.map(|()| SessionEvent::DialogRenamed { session_id, title })
|
|
.map_err(|e| ChannelError::Other(e.to_string()))
|
|
}
|
|
ArchiveDialog { session_id } => {
|
|
session_manager.archive_dialog(&session_id)
|
|
.map(|()| SessionEvent::DialogArchived { session_id })
|
|
.map_err(|e| ChannelError::Other(e.to_string()))
|
|
}
|
|
DeleteDialog { session_id } => {
|
|
session_manager.delete_dialog(&session_id)
|
|
.map(|()| SessionEvent::DialogDeleted { session_id })
|
|
.map_err(|e| ChannelError::Other(e.to_string()))
|
|
}
|
|
ClearHistory { session_id } => {
|
|
session_manager.clear_dialog_history(&session_id)
|
|
.map(|()| SessionEvent::HistoryCleared { session_id })
|
|
.map_err(|e| ChannelError::Other(e.to_string()))
|
|
}
|
|
GetSlashCommands { channel: _, chat_id: _ } => {
|
|
let commands = session_manager.get_slash_commands().to_vec();
|
|
Ok(SessionEvent::SlashCommandsList { commands })
|
|
}
|
|
ExecuteSlashCommand { command, channel, chat_id, current_session_id } => {
|
|
session_manager.execute_slash_command(&command, &channel, &chat_id, current_session_id.as_ref())
|
|
.await
|
|
.map(|(new_id, msg)| SessionEvent::SlashCommandExecuted { new_session_id: new_id, message: msg })
|
|
.map_err(|e| ChannelError::Other(e.to_string()))
|
|
}
|
|
};
|
|
|
|
let _ = reply_tx.send(result).await;
|
|
}
|
|
}
|
|
|
|
pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn std::error::Error>> {
|
|
// Initialize logging
|
|
logging::init_logging();
|
|
tracing::info!("Starting PicoBot Gateway");
|
|
|
|
let state = Arc::new(GatewayState::new()?);
|
|
|
|
// Initialize and start channels with workspace directory
|
|
state.channel_manager.init(
|
|
&state.config,
|
|
state.workspace_dir.clone(),
|
|
).await?;
|
|
state.channel_manager.start_all().await?;
|
|
|
|
// Start message processing (inbound processor + control processor + outbound dispatcher)
|
|
state.start_message_processing().await;
|
|
|
|
// CLI args override config file values
|
|
let bind_host = host.unwrap_or_else(|| state.config.gateway.host.clone());
|
|
let bind_port = port.unwrap_or(state.config.gateway.port);
|
|
|
|
let app = Router::new()
|
|
.route("/health", routing::get(http::health))
|
|
.route("/ws", routing::get(ws::ws_handler))
|
|
.with_state(state.clone());
|
|
|
|
let addr = format!("{}:{}", bind_host, bind_port);
|
|
let listener = TcpListener::bind(&addr).await?;
|
|
tracing::info!(address = %addr, "Gateway listening");
|
|
|
|
// Graceful shutdown using oneshot channel
|
|
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel::<()>();
|
|
let channel_manager = state.channel_manager.clone();
|
|
|
|
// Spawn ctrl_c handler
|
|
tokio::spawn(async move {
|
|
tokio::signal::ctrl_c().await.ok();
|
|
tracing::info!("Shutdown signal received");
|
|
let _ = channel_manager.stop_all().await;
|
|
let _ = shutdown_tx.send(());
|
|
});
|
|
|
|
// Serve with graceful shutdown
|
|
axum::serve(listener, app)
|
|
.with_graceful_shutdown(async {
|
|
shutdown_rx.await.ok();
|
|
})
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|