From 3a94b9718f5e78c89308e864a7300e3269ea558f Mon Sep 17 00:00:00 2001 From: xiaoski Date: Tue, 5 May 2026 00:16:07 +0800 Subject: [PATCH] feat: add Scheduler run loop and next_run_for_schedule --- src/scheduler/mod.rs | 285 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 285 insertions(+) diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index c3c5ba3..12b82aa 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -2,4 +2,289 @@ pub mod types; pub mod store; pub mod tools; +use std::sync::Arc; +use std::time::Instant; +use tokio::time; + +use crate::config::SchedulerConfig; +use crate::session::session::HandleResult; +use crate::session::SessionManager; + pub use types::{JobRun, Schedule, ScheduledJob}; + +/// Compute the next execution time (Unix ms) for a schedule, given `from` (Unix ms). +/// Returns `None` if no next time can be determined (e.g., invalid cron expression). +pub fn next_run_for_schedule(schedule: &Schedule, from: i64) -> Option { + use chrono::{TimeZone, Utc}; + use std::str::FromStr; + + match schedule { + Schedule::At { at } => Some(*at), + Schedule::Every { every_ms } => Some(from + *every_ms as i64), + Schedule::Cron { expr, tz } => { + let cron_schedule = cron::Schedule::from_str(expr.as_str()).ok()?; + let from_secs = (from / 1000) as i64; + let from_nanos = ((from % 1000) * 1_000_000) as u32; + let from_dt = Utc.timestamp_opt(from_secs, from_nanos).single()?; + + let next_utc = if let Some(tz_str) = tz { + let tz: chrono_tz::Tz = tz_str.parse().ok()?; + let _from_local = from_dt.with_timezone(&tz); + let next_local = cron_schedule.upcoming(tz).next()?; + next_local.with_timezone(&Utc) + } else { + cron_schedule.upcoming(Utc).next()? + }; + + Some(next_utc.timestamp_millis()) + } + } +} + +fn now_ms() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64 +} + +/// The scheduler runs as a background tokio task, periodically checking for due jobs +/// and executing them via `SessionManager::handle_cron_message`. +pub struct Scheduler { + pool: sqlx::SqlitePool, + session_manager: Arc, + config: SchedulerConfig, +} + +impl Scheduler { + pub fn new( + pool: sqlx::SqlitePool, + session_manager: Arc, + config: SchedulerConfig, + ) -> Self { + Self { + pool, + session_manager, + config, + } + } + + /// Run the scheduler loop. This is a long-running async function meant to be + /// spawned as a tokio background task. + pub async fn run(self: Arc) { + let poll_duration = time::Duration::from_secs(self.config.poll_interval_secs); + let mut interval = time::interval(poll_duration); + + interval.tick().await; + + tracing::info!( + "Scheduler started (poll interval: {}s, max concurrent: {})", + self.config.poll_interval_secs, + self.config.max_concurrent, + ); + + loop { + interval.tick().await; + + let now = now_ms(); + + let due = match store::SchedulerStore::due_jobs(&self.pool, now, self.config.max_concurrent).await { + Ok(jobs) => jobs, + Err(e) => { + tracing::error!("scheduler: failed to query due jobs: {}", e); + continue; + } + }; + + if due.is_empty() { + continue; + } + + tracing::info!("scheduler: found {} due job(s)", due.len()); + + for job in &due { + let start = Instant::now(); + let started_at = now_ms(); + + if let Err(e) = store::SchedulerStore::touch_last_run(&self.pool, &job.id, started_at).await { + tracing::error!(job_id = %job.id, "scheduler: failed to touch last_run_at: {}", e); + continue; + } + + tracing::info!( + job_id = %job.id, + job_name = %job.name, + "scheduler: executing cron job" + ); + + let result = self + .session_manager + .handle_cron_message( + &job.channel, + &job.chat_id, + &job.prompt, + &job.id, + &job.name, + ) + .await; + + let finished_at = now_ms(); + let duration_ms = start.elapsed().as_millis() as i64; + + match result { + Ok(HandleResult::AgentResponse(output)) => { + let output_truncated = if output.len() > 8000 { + format!("{}...[truncated]", &output[..8000]) + } else { + output.clone() + }; + + let run = JobRun { + id: 0, + job_id: job.id.clone(), + started_at, + finished_at, + status: "ok".to_string(), + output: Some(output_truncated), + error: None, + duration_ms, + }; + + if let Err(e) = store::SchedulerStore::record_run(&self.pool, &run).await { + tracing::error!(job_id = %job.id, "scheduler: failed to record run: {}", e); + } + + if let Err(e) = store::SchedulerStore::set_last_status(&self.pool, &job.id, "ok", None).await { + tracing::error!(job_id = %job.id, "scheduler: failed to set last_status: {}", e); + } + + tracing::info!( + job_id = %job.id, + duration_ms = %duration_ms, + "scheduler: job completed successfully" + ); + } + Ok(HandleResult::CommandOutput(output)) => { + let run = JobRun { + id: 0, + job_id: job.id.clone(), + started_at, + finished_at, + status: "ok".to_string(), + output: Some(output), + error: None, + duration_ms, + }; + + let _ = store::SchedulerStore::record_run(&self.pool, &run).await; + } + Err(e) => { + let error_str = e.to_string(); + let run = JobRun { + id: 0, + job_id: job.id.clone(), + started_at, + finished_at, + status: "error".to_string(), + output: None, + error: Some(error_str.clone()), + duration_ms, + }; + + if let Err(e2) = store::SchedulerStore::record_run(&self.pool, &run).await { + tracing::error!(job_id = %job.id, "scheduler: failed to record error run: {}", e2); + } + + if let Err(e2) = store::SchedulerStore::set_last_status( + &self.pool, &job.id, "error", Some(&error_str), + ).await { + tracing::error!(job_id = %job.id, "scheduler: failed to set error status: {}", e2); + } + + tracing::error!( + job_id = %job.id, + duration_ms = %duration_ms, + error = %error_str, + "scheduler: job failed" + ); + } + } + + if let Err(e) = self.reschedule_after_run(job).await { + tracing::error!(job_id = %job.id, "scheduler: failed to reschedule: {}", e); + } + } + } + } + + /// After a job runs, compute its next execution time or disable/delete it. + async fn reschedule_after_run( + &self, + job: &ScheduledJob, + ) -> Result<(), Box> { + let now = now_ms(); + + match &job.schedule { + Schedule::At { .. } => { + if job.delete_after_run { + store::SchedulerStore::remove_job(&self.pool, &job.id).await?; + tracing::info!(job_id = %job.id, "scheduler: one-shot job deleted after run"); + } else { + store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?; + tracing::info!(job_id = %job.id, "scheduler: one-shot job disabled after run"); + } + } + Schedule::Every { .. } | Schedule::Cron { .. } => { + if let Some(next) = next_run_for_schedule(&job.schedule, now) { + store::SchedulerStore::set_next_run(&self.pool, &job.id, next).await?; + tracing::info!(job_id = %job.id, next_run_at = %next, "scheduler: job rescheduled"); + } else { + tracing::error!(job_id = %job.id, "scheduler: could not compute next run -- disabling job"); + store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?; + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_next_run_at_schedule() { + let now = 1000000; + let next = next_run_for_schedule(&Schedule::At { at: 2000000 }, now); + assert_eq!(next, Some(2000000)); + } + + #[test] + fn test_next_run_every_schedule() { + let now = 1000000; + let next = next_run_for_schedule(&Schedule::Every { every_ms: 5000 }, now); + assert_eq!(next, Some(1005000)); + } + + #[test] + fn test_next_run_cron_every_minute() { + let expr = "0 * * * * *".to_string(); + let schedule = Schedule::Cron { expr, tz: None }; + let now = 1000000; + let next = next_run_for_schedule(&schedule, now); + assert!(next.is_some()); + assert!(next.unwrap() > now); + } + + #[test] + fn test_next_run_cron_every_day_at_9am() { + let expr = "0 0 9 * * *".to_string(); + let schedule = Schedule::Cron { expr, tz: None }; + let now = 1000000; + let next = next_run_for_schedule(&schedule, now); + assert!(next.is_some()); + let next_ms = next.unwrap(); + assert!(next_ms > now); + } +}