From 7238bd20d8701d63363eaa461bf36600cecabc7b Mon Sep 17 00:00:00 2001 From: oudecheng <13802883547@139.com> Date: Tue, 12 May 2026 16:18:45 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E6=81=A2=E5=A4=8D?= =?UTF-8?q?=E4=B8=AD=E6=96=AD=E8=B0=83=E5=BA=A6=E4=BB=BB=E5=8A=A1=E7=9A=84?= =?UTF-8?q?=E5=8A=9F=E8=83=BD=EF=BC=8C=E6=94=AF=E6=8C=81=E4=BB=8E=E4=B8=8A?= =?UTF-8?q?=E6=AC=A1=E4=BC=9A=E8=AF=9D=E4=B8=AD=E6=81=A2=E5=A4=8D=E8=BF=90?= =?UTF-8?q?=E8=A1=8C=E4=B8=AD=E7=9A=84=E4=BB=BB=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/scheduler/mod.rs | 75 ++++++++++++++++++++++++++++++++++++++++++++ src/storage/mod.rs | 21 +++++++++++++ src/storage/ports.rs | 6 ++++ 3 files changed, 102 insertions(+) diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 8fe109a..63f4227 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -94,6 +94,10 @@ impl Scheduler { tracing::error!(error = %error, "Failed to sync scheduler config jobs"); } + if let Err(error) = self.recover_interrupted_jobs().await { + tracing::error!(error = %error, "Failed to recover interrupted scheduler jobs"); + } + let tick_ms = self.config.tick_resolution_ms.max(100); let mut ticker = tokio::time::interval(std::time::Duration::from_millis(tick_ms)); @@ -132,6 +136,77 @@ impl Scheduler { Ok(()) } + async fn recover_interrupted_jobs(&self) -> anyhow::Result { + let now = Utc::now(); + let running_jobs = self.jobs.list_running_scheduler_jobs()?; + let mut recovered_count = 0; + + for record in running_jobs { + let error_msg = format!( + "Job interrupted due to application restart at {}", + now.to_rfc3339() + ); + + let next_fire_at = match record.next_fire_at { + Some(timestamp) if timestamp <= now.timestamp_millis() => { + match self.config.misfire_policy { + SchedulerMisfirePolicy::CatchUp => Some(timestamp), + SchedulerMisfirePolicy::Skip => { + let schedule_result = deserialize_schedule( + &record.schedule, + record.interval_secs, + record.startup_delay_secs, + ); + match schedule_result { + Ok(schedule) => compute_next_fire_at( + &schedule, + now, + Some(timestamp), + self.config.misfire_policy, + self.timezone, + ) + .ok() + .flatten(), + Err(_) => Some(timestamp), + } + } + } + } + other => other, + }; + + self.jobs.update_scheduler_job_runtime( + &record.id, + SchedulerJobState::Scheduled, + Some(SchedulerJobStatus::Error), + Some(&error_msg), + record.run_count, + record.last_fired_at, + next_fire_at, + record.paused_at, + record.completed_at, + )?; + + recovered_count += 1; + tracing::info!( + job_id = %record.id, + old_next_fire_at = ?record.next_fire_at, + new_next_fire_at = ?next_fire_at, + "Recovered interrupted job from previous session" + ); + } + + if recovered_count > 0 { + tracing::info!( + recovered_count = recovered_count, + "Scheduler recovered {} interrupted jobs from previous session", + recovered_count + ); + } + + Ok(recovered_count) + } + async fn process_tick(&self) -> anyhow::Result<()> { let now = Utc::now(); let jobs = self.jobs.list_scheduler_jobs(true)?; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 3f3aae5..3dded44 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -957,6 +957,27 @@ impl SessionStore { Ok(jobs) } + pub fn list_running_scheduler_jobs(&self) -> Result, StorageError> { + let conn = self.conn.lock().expect("session db mutex poisoned"); + let sql = " + SELECT id, kind, schedule_json, interval_secs, startup_delay_secs, + target_json, payload_json, enabled, state, last_status, last_error, + run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at, + created_at, updated_at + FROM scheduler_jobs + WHERE state = 'running' + ORDER BY COALESCE(next_fire_at, created_at) ASC, id ASC + "; + + let mut stmt = conn.prepare(sql)?; + let rows = stmt.query_map([], map_scheduler_job_record)?; + let mut jobs = Vec::new(); + for row in rows { + jobs.push(row?); + } + Ok(jobs) + } + pub fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> { let conn = self.conn.lock().expect("session db mutex poisoned"); conn.execute("DELETE FROM scheduler_jobs WHERE id = ?1", params![job_id])?; diff --git a/src/storage/ports.rs b/src/storage/ports.rs index a2edc72..3a18aba 100644 --- a/src/storage/ports.rs +++ b/src/storage/ports.rs @@ -90,6 +90,8 @@ pub trait SchedulerJobRepository: Send + Sync + 'static { enabled_only: bool, ) -> Result, StorageError>; + fn list_running_scheduler_jobs(&self) -> Result, StorageError>; + fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError>; fn update_scheduler_job_runtime( @@ -253,6 +255,10 @@ impl SchedulerJobRepository for super::SessionStore { super::SessionStore::list_scheduler_jobs(self, enabled_only) } + fn list_running_scheduler_jobs(&self) -> Result, StorageError> { + super::SessionStore::list_running_scheduler_jobs(self) + } + fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> { super::SessionStore::delete_scheduler_job(self, job_id) }