feat: 添加恢复中断调度任务的功能，支持从上次会话中恢复运行中的任务

This commit is contained in:
oudecheng 2026-05-12 16:18:45 +08:00
parent 53e069c97c
commit 7238bd20d8
3 changed files with 102 additions and 0 deletions

View File

@ -94,6 +94,10 @@ impl Scheduler {
tracing::error!(error = %error, "Failed to sync scheduler config jobs"); tracing::error!(error = %error, "Failed to sync scheduler config jobs");
} }
if let Err(error) = self.recover_interrupted_jobs().await {
tracing::error!(error = %error, "Failed to recover interrupted scheduler jobs");
}
let tick_ms = self.config.tick_resolution_ms.max(100); let tick_ms = self.config.tick_resolution_ms.max(100);
let mut ticker = tokio::time::interval(std::time::Duration::from_millis(tick_ms)); let mut ticker = tokio::time::interval(std::time::Duration::from_millis(tick_ms));
@ -132,6 +136,77 @@ impl Scheduler {
Ok(()) Ok(())
} }
/// Resets jobs that a previous session left in the running state so they can be
/// scheduled again.
///
/// Every record returned by `list_running_scheduler_jobs` is written back with
/// state `Scheduled`, last status `Error`, and an error message that records the
/// restart time. The run count and the last-fired / paused / completed timestamps
/// are passed through unchanged.
///
/// `next_fire_at` is kept as stored unless it is already due (`<= now`). In that
/// case the configured misfire policy decides:
///
/// * `CatchUp` keeps the overdue timestamp.
/// * `Skip` recomputes the fire time from the stored schedule via
///   `compute_next_fire_at`. If the schedule cannot be deserialized or the
///   computation fails, a warning is logged and the overdue timestamp is kept,
///   so a failure never silently clears the job's fire time.
///
/// Returns the number of jobs that were reset.
///
/// # Errors
///
/// Fails if the running jobs cannot be listed or if persisting an update fails.
/// Jobs after the failing one are not processed.
async fn recover_interrupted_jobs(&self) -> anyhow::Result<usize> {
    let now = Utc::now();
    let running_jobs = self.jobs.list_running_scheduler_jobs()?;

    // Both values depend only on `now`, so compute them once for all jobs.
    let now_ms = now.timestamp_millis();
    let error_msg = format!(
        "Job interrupted due to application restart at {}",
        now.to_rfc3339()
    );

    let mut recovered_count = 0;
    for record in running_jobs {
        let next_fire_at = match record.next_fire_at {
            Some(timestamp) if timestamp <= now_ms => match self.config.misfire_policy {
                SchedulerMisfirePolicy::CatchUp => Some(timestamp),
                SchedulerMisfirePolicy::Skip => {
                    match deserialize_schedule(
                        &record.schedule,
                        record.interval_secs,
                        record.startup_delay_secs,
                    ) {
                        Ok(schedule) => match compute_next_fire_at(
                            &schedule,
                            now,
                            Some(timestamp),
                            self.config.misfire_policy,
                            self.timezone,
                        ) {
                            // `Ok(None)` is the helper's own answer and is kept as-is.
                            Ok(next) => next,
                            Err(error) => {
                                tracing::warn!(
                                    job_id = %record.id,
                                    error = %error,
                                    "Failed to compute next fire time for interrupted job; keeping overdue timestamp"
                                );
                                Some(timestamp)
                            }
                        },
                        Err(error) => {
                            tracing::warn!(
                                job_id = %record.id,
                                error = %error,
                                "Failed to deserialize schedule for interrupted job; keeping overdue timestamp"
                            );
                            Some(timestamp)
                        }
                    }
                }
            },
            // Not yet due, or no fire time stored: leave untouched.
            other => other,
        };

        self.jobs.update_scheduler_job_runtime(
            &record.id,
            SchedulerJobState::Scheduled,
            Some(SchedulerJobStatus::Error),
            Some(&error_msg),
            record.run_count,
            record.last_fired_at,
            next_fire_at,
            record.paused_at,
            record.completed_at,
        )?;
        recovered_count += 1;

        tracing::info!(
            job_id = %record.id,
            old_next_fire_at = ?record.next_fire_at,
            new_next_fire_at = ?next_fire_at,
            "Recovered interrupted job from previous session"
        );
    }

    if recovered_count > 0 {
        tracing::info!(
            recovered_count = recovered_count,
            "Scheduler recovered {} interrupted jobs from previous session",
            recovered_count
        );
    }

    Ok(recovered_count)
}
async fn process_tick(&self) -> anyhow::Result<()> { async fn process_tick(&self) -> anyhow::Result<()> {
let now = Utc::now(); let now = Utc::now();
let jobs = self.jobs.list_scheduler_jobs(true)?; let jobs = self.jobs.list_scheduler_jobs(true)?;

View File

@ -957,6 +957,27 @@ impl SessionStore {
Ok(jobs) Ok(jobs)
} }
/// Loads every row of `scheduler_jobs` whose `state` column equals `'running'`.
///
/// Rows are ordered by `COALESCE(next_fire_at, created_at)` ascending, then by
/// `id` ascending, and each row is converted with `map_scheduler_job_record`.
///
/// # Errors
///
/// Returns a `StorageError` if the statement cannot be prepared, the query
/// fails, or any row fails to map; the first failing row aborts the load.
///
/// # Panics
///
/// Panics if the connection mutex is poisoned.
pub fn list_running_scheduler_jobs(&self) -> Result<Vec<SchedulerJobRecord>, StorageError> {
    let query = "
SELECT id, kind, schedule_json, interval_secs, startup_delay_secs,
target_json, payload_json, enabled, state, last_status, last_error,
run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at,
created_at, updated_at
FROM scheduler_jobs
WHERE state = 'running'
ORDER BY COALESCE(next_fire_at, created_at) ASC, id ASC
";
    let db = self.conn.lock().expect("session db mutex poisoned");
    let mut statement = db.prepare(query)?;
    // Collecting into `Result` stops at the first row that fails to map.
    let running = statement
        .query_map([], map_scheduler_job_record)?
        .collect::<Result<Vec<_>, _>>()?;
    Ok(running)
}
pub fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> { pub fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned"); let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute("DELETE FROM scheduler_jobs WHERE id = ?1", params![job_id])?; conn.execute("DELETE FROM scheduler_jobs WHERE id = ?1", params![job_id])?;

View File

@ -90,6 +90,8 @@ pub trait SchedulerJobRepository: Send + Sync + 'static {
enabled_only: bool, enabled_only: bool,
) -> Result<Vec<SchedulerJobRecord>, StorageError>; ) -> Result<Vec<SchedulerJobRecord>, StorageError>;
/// Lists the scheduler jobs that are currently recorded in the running state.
///
/// Returns a `StorageError` if the underlying store cannot be read.
fn list_running_scheduler_jobs(&self) -> Result<Vec<SchedulerJobRecord>, StorageError>;
fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError>; fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError>;
fn update_scheduler_job_runtime( fn update_scheduler_job_runtime(
@ -253,6 +255,10 @@ impl SchedulerJobRepository for super::SessionStore {
super::SessionStore::list_scheduler_jobs(self, enabled_only) super::SessionStore::list_scheduler_jobs(self, enabled_only)
} }
/// Forwards to the inherent `SessionStore::list_running_scheduler_jobs`.
fn list_running_scheduler_jobs(&self) -> Result<Vec<SchedulerJobRecord>, StorageError> {
    // The explicit `super::SessionStore::` path makes it clear that the inherent
    // method is the target, not this same-named trait method.
    super::SessionStore::list_running_scheduler_jobs(self)
}
fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> { fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> {
super::SessionStore::delete_scheduler_job(self, job_id) super::SessionStore::delete_scheduler_job(self, job_id)
} }