feat: wire scheduler into GatewayState startup and message processing

This commit is contained in:
xiaoski 2026-05-05 00:17:19 +08:00
parent 3a94b9718f
commit 205b814933

View File

@ -11,12 +11,15 @@ use crate::channels::base::{Channel, ChannelError};
use crate::config::{Config, expand_path, ensure_workspace_dir}; use crate::config::{Config, expand_path, ensure_workspace_dir};
use crate::logging; use crate::logging;
use crate::session::SessionManager; use crate::session::SessionManager;
use crate::scheduler::Scheduler;
use crate::scheduler::store as scheduler_store;
pub struct GatewayState { pub struct GatewayState {
pub config: Config, pub config: Config,
pub workspace_dir: std::path::PathBuf, pub workspace_dir: std::path::PathBuf,
pub session_manager: Arc<SessionManager>, pub session_manager: Arc<SessionManager>,
pub channel_manager: ChannelManager, pub channel_manager: ChannelManager,
pub pool: sqlx::SqlitePool,
} }
impl GatewayState { impl GatewayState {
@ -53,6 +56,8 @@ impl GatewayState {
); );
tracing::info!("Session storage: {}", db_path.display()); tracing::info!("Session storage: {}", db_path.display());
let pool = storage.pool().clone();
// Create MessageBus first (shared by SessionManager and ChannelManager) // Create MessageBus first (shared by SessionManager and ChannelManager)
let bus = MessageBus::new(100); let bus = MessageBus::new(100);
@ -75,11 +80,21 @@ impl GatewayState {
let available_channels = channel_manager.list_channel_names().await; let available_channels = channel_manager.list_channel_names().await;
session_manager.register_outbound_tool(available_channels); session_manager.register_outbound_tool(available_channels);
// Initialize scheduler if enabled in config
let scheduler_config = config.gateway.scheduler.clone().unwrap_or_default();
if scheduler_config.enabled {
scheduler_store::SchedulerStore::init(&pool)
.await
.map_err(|e| format!("failed to initialize scheduler store: {}", e))?;
tracing::info!("Scheduler store initialized");
}
Ok(Self { Ok(Self {
config, config,
workspace_dir: workspace_path, workspace_dir: workspace_path,
session_manager: session_manager.clone(), session_manager: session_manager.clone(),
channel_manager, channel_manager,
pool,
}) })
} }
@ -168,6 +183,20 @@ impl GatewayState {
tracing::info!("Outbound dispatcher started"); tracing::info!("Outbound dispatcher started");
dispatcher.run().await; dispatcher.run().await;
}); });
// Spawn scheduler background task if enabled
let scheduler_config = self.config.gateway.scheduler.clone().unwrap_or_default();
if scheduler_config.enabled {
let sched = Arc::new(Scheduler::new(
self.pool.clone(),
self.session_manager.clone(),
scheduler_config,
));
tokio::spawn(async move {
sched.run().await;
});
tracing::info!("Scheduler background task spawned");
}
} }
/// Handle control messages (session management operations) /// Handle control messages (session management operations)