feat: add Scheduler run loop and next_run_for_schedule

This commit is contained in:
xiaoski 2026-05-05 00:16:07 +08:00
parent 0757638c6f
commit 3a94b9718f

View File

@ -2,4 +2,289 @@ pub mod types;
pub mod store; pub mod store;
pub mod tools; pub mod tools;
use std::sync::Arc;
use std::time::Instant;
use tokio::time;
use crate::config::SchedulerConfig;
use crate::session::session::HandleResult;
use crate::session::SessionManager;
pub use types::{JobRun, Schedule, ScheduledJob}; pub use types::{JobRun, Schedule, ScheduledJob};
/// Compute the next execution time (Unix ms) for a schedule, given `from` (Unix ms).
/// Returns `None` if no next time can be determined (e.g., invalid cron expression).
pub fn next_run_for_schedule(schedule: &Schedule, from: i64) -> Option<i64> {
use chrono::{TimeZone, Utc};
use std::str::FromStr;
match schedule {
Schedule::At { at } => Some(*at),
Schedule::Every { every_ms } => Some(from + *every_ms as i64),
Schedule::Cron { expr, tz } => {
let cron_schedule = cron::Schedule::from_str(expr.as_str()).ok()?;
let from_secs = (from / 1000) as i64;
let from_nanos = ((from % 1000) * 1_000_000) as u32;
let from_dt = Utc.timestamp_opt(from_secs, from_nanos).single()?;
let next_utc = if let Some(tz_str) = tz {
let tz: chrono_tz::Tz = tz_str.parse().ok()?;
let _from_local = from_dt.with_timezone(&tz);
let next_local = cron_schedule.upcoming(tz).next()?;
next_local.with_timezone(&Utc)
} else {
cron_schedule.upcoming(Utc).next()?
};
Some(next_utc.timestamp_millis())
}
}
}
fn now_ms() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64
}
/// The scheduler runs as a background tokio task, periodically checking for due jobs
/// and executing them via `SessionManager::handle_cron_message`.
pub struct Scheduler {
pool: sqlx::SqlitePool,
session_manager: Arc<SessionManager>,
config: SchedulerConfig,
}
impl Scheduler {
pub fn new(
pool: sqlx::SqlitePool,
session_manager: Arc<SessionManager>,
config: SchedulerConfig,
) -> Self {
Self {
pool,
session_manager,
config,
}
}
/// Run the scheduler loop. This is a long-running async function meant to be
/// spawned as a tokio background task.
pub async fn run(self: Arc<Self>) {
let poll_duration = time::Duration::from_secs(self.config.poll_interval_secs);
let mut interval = time::interval(poll_duration);
interval.tick().await;
tracing::info!(
"Scheduler started (poll interval: {}s, max concurrent: {})",
self.config.poll_interval_secs,
self.config.max_concurrent,
);
loop {
interval.tick().await;
let now = now_ms();
let due = match store::SchedulerStore::due_jobs(&self.pool, now, self.config.max_concurrent).await {
Ok(jobs) => jobs,
Err(e) => {
tracing::error!("scheduler: failed to query due jobs: {}", e);
continue;
}
};
if due.is_empty() {
continue;
}
tracing::info!("scheduler: found {} due job(s)", due.len());
for job in &due {
let start = Instant::now();
let started_at = now_ms();
if let Err(e) = store::SchedulerStore::touch_last_run(&self.pool, &job.id, started_at).await {
tracing::error!(job_id = %job.id, "scheduler: failed to touch last_run_at: {}", e);
continue;
}
tracing::info!(
job_id = %job.id,
job_name = %job.name,
"scheduler: executing cron job"
);
let result = self
.session_manager
.handle_cron_message(
&job.channel,
&job.chat_id,
&job.prompt,
&job.id,
&job.name,
)
.await;
let finished_at = now_ms();
let duration_ms = start.elapsed().as_millis() as i64;
match result {
Ok(HandleResult::AgentResponse(output)) => {
let output_truncated = if output.len() > 8000 {
format!("{}...[truncated]", &output[..8000])
} else {
output.clone()
};
let run = JobRun {
id: 0,
job_id: job.id.clone(),
started_at,
finished_at,
status: "ok".to_string(),
output: Some(output_truncated),
error: None,
duration_ms,
};
if let Err(e) = store::SchedulerStore::record_run(&self.pool, &run).await {
tracing::error!(job_id = %job.id, "scheduler: failed to record run: {}", e);
}
if let Err(e) = store::SchedulerStore::set_last_status(&self.pool, &job.id, "ok", None).await {
tracing::error!(job_id = %job.id, "scheduler: failed to set last_status: {}", e);
}
tracing::info!(
job_id = %job.id,
duration_ms = %duration_ms,
"scheduler: job completed successfully"
);
}
Ok(HandleResult::CommandOutput(output)) => {
let run = JobRun {
id: 0,
job_id: job.id.clone(),
started_at,
finished_at,
status: "ok".to_string(),
output: Some(output),
error: None,
duration_ms,
};
let _ = store::SchedulerStore::record_run(&self.pool, &run).await;
}
Err(e) => {
let error_str = e.to_string();
let run = JobRun {
id: 0,
job_id: job.id.clone(),
started_at,
finished_at,
status: "error".to_string(),
output: None,
error: Some(error_str.clone()),
duration_ms,
};
if let Err(e2) = store::SchedulerStore::record_run(&self.pool, &run).await {
tracing::error!(job_id = %job.id, "scheduler: failed to record error run: {}", e2);
}
if let Err(e2) = store::SchedulerStore::set_last_status(
&self.pool, &job.id, "error", Some(&error_str),
).await {
tracing::error!(job_id = %job.id, "scheduler: failed to set error status: {}", e2);
}
tracing::error!(
job_id = %job.id,
duration_ms = %duration_ms,
error = %error_str,
"scheduler: job failed"
);
}
}
if let Err(e) = self.reschedule_after_run(job).await {
tracing::error!(job_id = %job.id, "scheduler: failed to reschedule: {}", e);
}
}
}
}
/// After a job runs, compute its next execution time or disable/delete it.
async fn reschedule_after_run(
&self,
job: &ScheduledJob,
) -> Result<(), Box<dyn std::error::Error>> {
let now = now_ms();
match &job.schedule {
Schedule::At { .. } => {
if job.delete_after_run {
store::SchedulerStore::remove_job(&self.pool, &job.id).await?;
tracing::info!(job_id = %job.id, "scheduler: one-shot job deleted after run");
} else {
store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?;
tracing::info!(job_id = %job.id, "scheduler: one-shot job disabled after run");
}
}
Schedule::Every { .. } | Schedule::Cron { .. } => {
if let Some(next) = next_run_for_schedule(&job.schedule, now) {
store::SchedulerStore::set_next_run(&self.pool, &job.id, next).await?;
tracing::info!(job_id = %job.id, next_run_at = %next, "scheduler: job rescheduled");
} else {
tracing::error!(job_id = %job.id, "scheduler: could not compute next run -- disabling job");
store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?;
}
}
}
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_next_run_at_schedule() {
let now = 1000000;
let next = next_run_for_schedule(&Schedule::At { at: 2000000 }, now);
assert_eq!(next, Some(2000000));
}
#[test]
fn test_next_run_every_schedule() {
let now = 1000000;
let next = next_run_for_schedule(&Schedule::Every { every_ms: 5000 }, now);
assert_eq!(next, Some(1005000));
}
#[test]
fn test_next_run_cron_every_minute() {
let expr = "0 * * * * *".to_string();
let schedule = Schedule::Cron { expr, tz: None };
let now = 1000000;
let next = next_run_for_schedule(&schedule, now);
assert!(next.is_some());
assert!(next.unwrap() > now);
}
#[test]
fn test_next_run_cron_every_day_at_9am() {
let expr = "0 0 9 * * *".to_string();
let schedule = Schedule::Cron { expr, tz: None };
let now = 1000000;
let next = next_run_for_schedule(&schedule, now);
assert!(next.is_some());
let next_ms = next.unwrap();
assert!(next_ms > now);
}
}