From 4f7a8ed6454dbd8ed7574f046fed9c1028515383 Mon Sep 17 00:00:00 2001 From: ooodc <549496103@qq.com> Date: Fri, 8 May 2026 09:37:15 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=BC=BA=E8=B0=83=E5=BA=A6?= =?UTF-8?q?=E5=99=A8=E4=BB=BB=E5=8A=A1=E6=9B=B4=E6=96=B0=E9=80=BB=E8=BE=91?= =?UTF-8?q?=EF=BC=8C=E6=94=AF=E6=8C=81=E6=8C=81=E4=B9=85=E5=8C=96=E8=BF=90?= =?UTF-8?q?=E8=A1=8C=E6=97=B6=E7=8A=B6=E6=80=81=E5=B9=B6=E5=8C=B9=E9=85=8D?= =?UTF-8?q?=E7=8E=B0=E6=9C=89=E4=BB=BB=E5=8A=A1=E5=AE=9A=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/scheduler/mod.rs | 169 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 166 insertions(+), 3 deletions(-) diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 271f0f4..821ea48 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -126,7 +126,11 @@ impl Scheduler { }) { let runtime = RuntimeJob::from_config(&job, now, self.config.misfire_policy, self.timezone)?; - self.jobs.upsert_scheduler_job(&runtime.to_upsert())?; + let mut upsert = runtime.to_upsert(); + if let Some(existing) = self.jobs.get_scheduler_job(&runtime.id)? { + preserve_persisted_runtime(&mut upsert, &existing); + } + self.jobs.upsert_scheduler_job(&upsert)?; } Ok(()) } @@ -285,6 +289,60 @@ impl Scheduler { } } +fn preserve_persisted_runtime(input: &mut SchedulerJobUpsert, existing: &SchedulerJobRecord) { + if !scheduler_job_definition_matches(input, existing) { + return; + } + + input.last_status = existing.last_status.clone(); + input.last_error = existing.last_error.clone(); + input.run_count = existing.run_count; + input.max_runs = existing.max_runs; + input.last_fired_at = existing.last_fired_at; + input.next_fire_at = existing.next_fire_at; + + if existing.state == SchedulerJobState::Running { + input.state = SchedulerJobState::Scheduled; + input.paused_at = None; + input.completed_at = None; + return; + } + + input.state = existing.state.clone(); + input.paused_at = existing.paused_at; + input.completed_at = existing.completed_at; +} + +fn scheduler_job_definition_matches( + input: &SchedulerJobUpsert, + existing: &SchedulerJobRecord, +) -> bool { + let input_schedule = serde_json::from_value::(input.schedule.clone()).ok(); + let existing_schedule = + deserialize_schedule(&existing.schedule, existing.interval_secs, existing.startup_delay_secs) + .ok(); + let input_target = serde_json::from_value::(input.target.clone()).ok(); + let existing_target = serde_json::from_value::(existing.target.clone()).ok(); + let targets_match = match (input_target, existing_target) { + (Some(input_target), Some(existing_target)) => { + input_target.channel == existing_target.channel + && input_target.chat_id == existing_target.chat_id + && input_target.session_chat_id == existing_target.session_chat_id + && input_target.reply_to == existing_target.reply_to + } + (None, None) => true, + _ => false, + }; + + input.kind == existing.kind + && input_schedule == existing_schedule + && input.interval_secs == existing.interval_secs + && input.startup_delay_secs == existing.startup_delay_secs + && targets_match + && input.payload == existing.payload + && input.enabled == existing.enabled +} + #[derive(Debug, Clone)] struct RuntimeJob { id: String, @@ -314,6 +372,13 @@ impl RuntimeJob { timezone: Tz, ) -> anyhow::Result { let schedule = job.resolved_schedule()?; + let (interval_secs, startup_delay_secs) = match &schedule { + SchedulerSchedule::Interval { + seconds, + startup_delay_secs, + } => (*seconds as i64, *startup_delay_secs as i64), + _ => (0, 0), + }; let initial_state = if job.enabled { SchedulerJobState::Scheduled } else { @@ -328,8 +393,8 @@ impl RuntimeJob { Ok(Self { id: job.id.clone(), kind: job.kind.clone(), - interval_secs: job.interval_secs as i64, - startup_delay_secs: job.startup_delay_secs as i64, + interval_secs, + startup_delay_secs, schedule, target: job.target.clone(), payload: job.payload.clone(), @@ -1243,6 +1308,104 @@ mod tests { assert!(saved.next_fire_at.is_some()); } + #[test] + fn sync_config_jobs_preserves_persisted_next_fire_at_for_matching_jobs() { + let store = Arc::new(SessionStore::in_memory().unwrap()); + let persisted_next_fire_at = 1_700_000_300_000; + let config_job = SchedulerJobConfig { + id: "agent.heartbeat".to_string(), + enabled: true, + kind: SchedulerJobKind::OutboundMessage, + schedule: Some(SchedulerSchedule::Interval { + seconds: 300, + startup_delay_secs: 0, + }), + startup_delay_secs: 0, + interval_secs: 0, + target: SchedulerJobTarget { + channel: Some("test-channel".to_string()), + chat_id: Some("oc_demo".to_string()), + session_chat_id: None, + reply_to: None, + }, + payload: serde_json::json!({ + "content": "ping" + }), + }; + + store + .upsert_scheduler_job(&SchedulerJobUpsert { + id: "agent.heartbeat".to_string(), + kind: "outbound_message".to_string(), + schedule: serde_json::json!({ + "type": "interval", + "seconds": 300, + "startup_delay_secs": 0 + }), + interval_secs: 300, + startup_delay_secs: 0, + target: serde_json::json!({ + "channel": "test-channel", + "chat_id": "oc_demo" + }), + payload: serde_json::json!({ + "content": "ping" + }), + enabled: true, + state: SchedulerJobState::Scheduled, + last_status: Some(SchedulerJobStatus::Ok), + last_error: None, + run_count: 3, + max_runs: None, + last_fired_at: Some(1_700_000_000_000), + next_fire_at: Some(persisted_next_fire_at), + paused_at: None, + completed_at: None, + }) + .unwrap(); + + let probe_runtime = RuntimeJob::from_config( + &config_job, + Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap(), + SchedulerMisfirePolicy::Skip, + chrono_tz::Asia::Shanghai, + ) + .unwrap(); + let probe_existing = store + .get_scheduler_job("agent.heartbeat") + .unwrap() + .unwrap(); + let probe_upsert = probe_runtime.to_upsert(); + assert!(scheduler_job_definition_matches(&probe_upsert, &probe_existing)); + + let (agent_task_executor, maintenance_service) = test_scheduler_services(); + let scheduler = Scheduler::new( + MessageBus::new(8), + SchedulerConfig { + enabled: true, + tick_resolution_ms: 1000, + worker_queue_capacity: 64, + misfire_policy: SchedulerMisfirePolicy::Skip, + jobs: vec![config_job], + }, + chrono_tz::Asia::Shanghai, + store.clone(), + agent_task_executor, + maintenance_service, + ); + + scheduler.sync_config_jobs().unwrap(); + + let saved = store + .get_scheduler_job("agent.heartbeat") + .unwrap() + .unwrap(); + + assert_eq!(saved.next_fire_at, Some(persisted_next_fire_at)); + assert_eq!(saved.run_count, 3); + assert_eq!(saved.last_status, Some(SchedulerJobStatus::Ok)); + } + #[tokio::test] async fn silent_agent_task_failure_notifies_primary_chat() { let store = Arc::new(SessionStore::in_memory().unwrap());