pub mod types; pub mod store; pub mod tools; use std::sync::Arc; use std::time::Instant; use tokio::time; use crate::config::SchedulerConfig; use crate::session::session::HandleResult; use crate::session::SessionManager; pub use types::{JobRun, Schedule, ScheduledJob}; /// Compute the next execution time (Unix ms) for a schedule, given `from` (Unix ms). /// Returns `None` if no next time can be determined (e.g., invalid cron expression). pub fn next_run_for_schedule(schedule: &Schedule, from: i64) -> Option { use chrono::{TimeZone, Utc}; use std::str::FromStr; match schedule { Schedule::At { at } => Some(*at), Schedule::Every { every_ms } => Some(from + *every_ms as i64), Schedule::Cron { expr, tz } => { let cron_schedule = cron::Schedule::from_str(expr.as_str()).ok()?; let from_secs = (from / 1000) as i64; let from_nanos = ((from % 1000) * 1_000_000) as u32; let from_dt = Utc.timestamp_opt(from_secs, from_nanos).single()?; let next_utc = if let Some(tz_str) = tz { let tz: chrono_tz::Tz = tz_str.parse().ok()?; let _from_local = from_dt.with_timezone(&tz); let next_local = cron_schedule.upcoming(tz).next()?; next_local.with_timezone(&Utc) } else { cron_schedule.upcoming(Utc).next()? }; Some(next_utc.timestamp_millis()) } } } fn now_ms() -> i64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as i64 } /// The scheduler runs as a background tokio task, periodically checking for due jobs /// and executing them via `SessionManager::handle_cron_message`. pub struct Scheduler { pool: sqlx::SqlitePool, session_manager: Arc, config: SchedulerConfig, } impl Scheduler { pub fn new( pool: sqlx::SqlitePool, session_manager: Arc, config: SchedulerConfig, ) -> Self { Self { pool, session_manager, config, } } /// Run the scheduler loop. This is a long-running async function meant to be /// spawned as a tokio background task. pub async fn run(self: Arc) { let poll_duration = time::Duration::from_secs(self.config.poll_interval_secs); let mut interval = time::interval(poll_duration); interval.tick().await; tracing::info!( "Scheduler started (poll interval: {}s, max concurrent: {})", self.config.poll_interval_secs, self.config.max_concurrent, ); loop { interval.tick().await; let now = now_ms(); let due = match store::SchedulerStore::due_jobs(&self.pool, now, self.config.max_concurrent).await { Ok(jobs) => jobs, Err(e) => { tracing::error!("scheduler: failed to query due jobs: {}", e); continue; } }; if due.is_empty() { continue; } tracing::info!("scheduler: found {} due job(s)", due.len()); for job in &due { let start = Instant::now(); let started_at = now_ms(); if let Err(e) = store::SchedulerStore::touch_last_run(&self.pool, &job.id, started_at).await { tracing::error!(job_id = %job.id, "scheduler: failed to touch last_run_at: {}", e); continue; } tracing::info!( job_id = %job.id, job_name = %job.name, "scheduler: executing cron job" ); let result = self .session_manager .handle_cron_message( &job.channel, &job.chat_id, &job.prompt, &job.id, &job.name, ) .await; let finished_at = now_ms(); let duration_ms = start.elapsed().as_millis() as i64; match result { Ok(HandleResult::AgentResponse(output)) => { let output_truncated = if output.len() > 8000 { format!("{}...[truncated]", &output[..8000]) } else { output.clone() }; let run = JobRun { id: 0, job_id: job.id.clone(), started_at, finished_at, status: "ok".to_string(), output: Some(output_truncated), error: None, duration_ms, }; if let Err(e) = store::SchedulerStore::record_run(&self.pool, &run).await { tracing::error!(job_id = %job.id, "scheduler: failed to record run: {}", e); } if let Err(e) = store::SchedulerStore::set_last_status(&self.pool, &job.id, "ok", None).await { tracing::error!(job_id = %job.id, "scheduler: failed to set last_status: {}", e); } tracing::info!( job_id = %job.id, duration_ms = %duration_ms, "scheduler: job completed successfully" ); } Ok(HandleResult::CommandOutput(output)) => { let run = JobRun { id: 0, job_id: job.id.clone(), started_at, finished_at, status: "ok".to_string(), output: Some(output), error: None, duration_ms, }; let _ = store::SchedulerStore::record_run(&self.pool, &run).await; } Err(e) => { let error_str = e.to_string(); let run = JobRun { id: 0, job_id: job.id.clone(), started_at, finished_at, status: "error".to_string(), output: None, error: Some(error_str.clone()), duration_ms, }; if let Err(e2) = store::SchedulerStore::record_run(&self.pool, &run).await { tracing::error!(job_id = %job.id, "scheduler: failed to record error run: {}", e2); } if let Err(e2) = store::SchedulerStore::set_last_status( &self.pool, &job.id, "error", Some(&error_str), ).await { tracing::error!(job_id = %job.id, "scheduler: failed to set error status: {}", e2); } tracing::error!( job_id = %job.id, duration_ms = %duration_ms, error = %error_str, "scheduler: job failed" ); } } if let Err(e) = self.reschedule_after_run(job).await { tracing::error!(job_id = %job.id, "scheduler: failed to reschedule: {}", e); } } } } /// After a job runs, compute its next execution time or disable/delete it. async fn reschedule_after_run( &self, job: &ScheduledJob, ) -> anyhow::Result<()> { let now = now_ms(); match &job.schedule { Schedule::At { .. } => { if job.delete_after_run { store::SchedulerStore::remove_job(&self.pool, &job.id).await?; tracing::info!(job_id = %job.id, "scheduler: one-shot job deleted after run"); } else { store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?; tracing::info!(job_id = %job.id, "scheduler: one-shot job disabled after run"); } } Schedule::Every { .. } | Schedule::Cron { .. } => { if let Some(next) = next_run_for_schedule(&job.schedule, now) { store::SchedulerStore::set_next_run(&self.pool, &job.id, next).await?; tracing::info!(job_id = %job.id, next_run_at = %next, "scheduler: job rescheduled"); } else { tracing::error!(job_id = %job.id, "scheduler: could not compute next run -- disabling job"); store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?; } } } Ok(()) } } #[cfg(test)] mod tests { use super::*; #[test] fn test_next_run_at_schedule() { let now = 1000000; let next = next_run_for_schedule(&Schedule::At { at: 2000000 }, now); assert_eq!(next, Some(2000000)); } #[test] fn test_next_run_every_schedule() { let now = 1000000; let next = next_run_for_schedule(&Schedule::Every { every_ms: 5000 }, now); assert_eq!(next, Some(1005000)); } #[test] fn test_next_run_cron_every_minute() { let expr = "0 * * * * *".to_string(); let schedule = Schedule::Cron { expr, tz: None }; let now = 1000000; let next = next_run_for_schedule(&schedule, now); assert!(next.is_some()); assert!(next.unwrap() > now); } #[test] fn test_next_run_cron_every_day_at_9am() { let expr = "0 0 9 * * *".to_string(); let schedule = Schedule::Cron { expr, tz: None }; let now = 1000000; let next = next_run_for_schedule(&schedule, now); assert!(next.is_some()); let next_ms = next.unwrap(); assert!(next_ms > now); } }