From 205b8149339b3548c79970f9c8d6f094b2994fca Mon Sep 17 00:00:00 2001 From: xiaoski Date: Tue, 5 May 2026 00:17:19 +0800 Subject: [PATCH] feat: wire scheduler into GatewayState startup and message processing --- src/gateway/mod.rs | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 8da35b6..fe28eff 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -11,12 +11,15 @@ use crate::channels::base::{Channel, ChannelError}; use crate::config::{Config, expand_path, ensure_workspace_dir}; use crate::logging; use crate::session::SessionManager; +use crate::scheduler::Scheduler; +use crate::scheduler::store as scheduler_store; pub struct GatewayState { pub config: Config, pub workspace_dir: std::path::PathBuf, pub session_manager: Arc, pub channel_manager: ChannelManager, + pub pool: sqlx::SqlitePool, } impl GatewayState { @@ -53,6 +56,8 @@ impl GatewayState { ); tracing::info!("Session storage: {}", db_path.display()); + let pool = storage.pool().clone(); + // Create MessageBus first (shared by SessionManager and ChannelManager) let bus = MessageBus::new(100); @@ -75,11 +80,21 @@ impl GatewayState { let available_channels = channel_manager.list_channel_names().await; session_manager.register_outbound_tool(available_channels); + // Initialize scheduler if enabled in config + let scheduler_config = config.gateway.scheduler.clone().unwrap_or_default(); + if scheduler_config.enabled { + scheduler_store::SchedulerStore::init(&pool) + .await + .map_err(|e| format!("failed to initialize scheduler store: {}", e))?; + tracing::info!("Scheduler store initialized"); + } + Ok(Self { config, workspace_dir: workspace_path, session_manager: session_manager.clone(), channel_manager, + pool, }) } @@ -168,6 +183,20 @@ impl GatewayState { tracing::info!("Outbound dispatcher started"); dispatcher.run().await; }); + + // Spawn scheduler background task if enabled + let scheduler_config = self.config.gateway.scheduler.clone().unwrap_or_default(); + if scheduler_config.enabled { + let sched = Arc::new(Scheduler::new( + self.pool.clone(), + self.session_manager.clone(), + scheduler_config, + )); + tokio::spawn(async move { + sched.run().await; + }); + tracing::info!("Scheduler background task spawned"); + } } /// Handle control messages (session management operations)