feat: 增强调度器任务更新逻辑,支持持久化运行时状态并匹配现有任务定义

This commit is contained in:
ooodc 2026-05-08 09:37:15 +08:00
parent 42eb9f85d5
commit 4f7a8ed645

View File

@ -126,7 +126,11 @@ impl Scheduler {
}) {
let runtime =
RuntimeJob::from_config(&job, now, self.config.misfire_policy, self.timezone)?;
self.jobs.upsert_scheduler_job(&runtime.to_upsert())?;
let mut upsert = runtime.to_upsert();
if let Some(existing) = self.jobs.get_scheduler_job(&runtime.id)? {
preserve_persisted_runtime(&mut upsert, &existing);
}
self.jobs.upsert_scheduler_job(&upsert)?;
}
Ok(())
}
@ -285,6 +289,60 @@ impl Scheduler {
}
}
fn preserve_persisted_runtime(input: &mut SchedulerJobUpsert, existing: &SchedulerJobRecord) {
if !scheduler_job_definition_matches(input, existing) {
return;
}
input.last_status = existing.last_status.clone();
input.last_error = existing.last_error.clone();
input.run_count = existing.run_count;
input.max_runs = existing.max_runs;
input.last_fired_at = existing.last_fired_at;
input.next_fire_at = existing.next_fire_at;
if existing.state == SchedulerJobState::Running {
input.state = SchedulerJobState::Scheduled;
input.paused_at = None;
input.completed_at = None;
return;
}
input.state = existing.state.clone();
input.paused_at = existing.paused_at;
input.completed_at = existing.completed_at;
}
fn scheduler_job_definition_matches(
input: &SchedulerJobUpsert,
existing: &SchedulerJobRecord,
) -> bool {
let input_schedule = serde_json::from_value::<SchedulerSchedule>(input.schedule.clone()).ok();
let existing_schedule =
deserialize_schedule(&existing.schedule, existing.interval_secs, existing.startup_delay_secs)
.ok();
let input_target = serde_json::from_value::<SchedulerJobTarget>(input.target.clone()).ok();
let existing_target = serde_json::from_value::<SchedulerJobTarget>(existing.target.clone()).ok();
let targets_match = match (input_target, existing_target) {
(Some(input_target), Some(existing_target)) => {
input_target.channel == existing_target.channel
&& input_target.chat_id == existing_target.chat_id
&& input_target.session_chat_id == existing_target.session_chat_id
&& input_target.reply_to == existing_target.reply_to
}
(None, None) => true,
_ => false,
};
input.kind == existing.kind
&& input_schedule == existing_schedule
&& input.interval_secs == existing.interval_secs
&& input.startup_delay_secs == existing.startup_delay_secs
&& targets_match
&& input.payload == existing.payload
&& input.enabled == existing.enabled
}
#[derive(Debug, Clone)]
struct RuntimeJob {
id: String,
@ -314,6 +372,13 @@ impl RuntimeJob {
timezone: Tz,
) -> anyhow::Result<Self> {
let schedule = job.resolved_schedule()?;
let (interval_secs, startup_delay_secs) = match &schedule {
SchedulerSchedule::Interval {
seconds,
startup_delay_secs,
} => (*seconds as i64, *startup_delay_secs as i64),
_ => (0, 0),
};
let initial_state = if job.enabled {
SchedulerJobState::Scheduled
} else {
@ -328,8 +393,8 @@ impl RuntimeJob {
Ok(Self {
id: job.id.clone(),
kind: job.kind.clone(),
interval_secs: job.interval_secs as i64,
startup_delay_secs: job.startup_delay_secs as i64,
interval_secs,
startup_delay_secs,
schedule,
target: job.target.clone(),
payload: job.payload.clone(),
@ -1243,6 +1308,104 @@ mod tests {
assert!(saved.next_fire_at.is_some());
}
#[test]
fn sync_config_jobs_preserves_persisted_next_fire_at_for_matching_jobs() {
let store = Arc::new(SessionStore::in_memory().unwrap());
let persisted_next_fire_at = 1_700_000_300_000;
let config_job = SchedulerJobConfig {
id: "agent.heartbeat".to_string(),
enabled: true,
kind: SchedulerJobKind::OutboundMessage,
schedule: Some(SchedulerSchedule::Interval {
seconds: 300,
startup_delay_secs: 0,
}),
startup_delay_secs: 0,
interval_secs: 0,
target: SchedulerJobTarget {
channel: Some("test-channel".to_string()),
chat_id: Some("oc_demo".to_string()),
session_chat_id: None,
reply_to: None,
},
payload: serde_json::json!({
"content": "ping"
}),
};
store
.upsert_scheduler_job(&SchedulerJobUpsert {
id: "agent.heartbeat".to_string(),
kind: "outbound_message".to_string(),
schedule: serde_json::json!({
"type": "interval",
"seconds": 300,
"startup_delay_secs": 0
}),
interval_secs: 300,
startup_delay_secs: 0,
target: serde_json::json!({
"channel": "test-channel",
"chat_id": "oc_demo"
}),
payload: serde_json::json!({
"content": "ping"
}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: Some(SchedulerJobStatus::Ok),
last_error: None,
run_count: 3,
max_runs: None,
last_fired_at: Some(1_700_000_000_000),
next_fire_at: Some(persisted_next_fire_at),
paused_at: None,
completed_at: None,
})
.unwrap();
let probe_runtime = RuntimeJob::from_config(
&config_job,
Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap(),
SchedulerMisfirePolicy::Skip,
chrono_tz::Asia::Shanghai,
)
.unwrap();
let probe_existing = store
.get_scheduler_job("agent.heartbeat")
.unwrap()
.unwrap();
let probe_upsert = probe_runtime.to_upsert();
assert!(scheduler_job_definition_matches(&probe_upsert, &probe_existing));
let (agent_task_executor, maintenance_service) = test_scheduler_services();
let scheduler = Scheduler::new(
MessageBus::new(8),
SchedulerConfig {
enabled: true,
tick_resolution_ms: 1000,
worker_queue_capacity: 64,
misfire_policy: SchedulerMisfirePolicy::Skip,
jobs: vec![config_job],
},
chrono_tz::Asia::Shanghai,
store.clone(),
agent_task_executor,
maintenance_service,
);
scheduler.sync_config_jobs().unwrap();
let saved = store
.get_scheduler_job("agent.heartbeat")
.unwrap()
.unwrap();
assert_eq!(saved.next_fire_at, Some(persisted_next_fire_at));
assert_eq!(saved.run_count, 3);
assert_eq!(saved.last_status, Some(SchedulerJobStatus::Ok));
}
#[tokio::test]
async fn silent_agent_task_failure_notifies_primary_chat() {
let store = Arc::new(SessionStore::in_memory().unwrap());