From f48b132bb9720c5d7990964072a7dd4840cf2b87 Mon Sep 17 00:00:00 2001 From: ooodc <549496103@qq.com> Date: Tue, 28 Apr 2026 15:40:50 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=87=8D=E6=9E=84=E8=B0=83=E5=BA=A6?= =?UTF-8?q?=E5=99=A8=E5=AD=98=E5=82=A8=E9=80=BB=E8=BE=91=EF=BC=8C=E4=BD=BF?= =?UTF-8?q?=E7=94=A8=20SchedulerJobRepository=20=E6=9B=BF=E4=BB=A3=20Sessi?= =?UTF-8?q?onStore=EF=BC=8C=E6=B7=BB=E5=8A=A0=E6=9B=B4=E6=96=B0=E8=B0=83?= =?UTF-8?q?=E5=BA=A6=E4=BD=9C=E4=B8=9A=E8=BF=90=E8=A1=8C=E6=97=B6=E7=9A=84?= =?UTF-8?q?=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/scheduler/mod.rs | 19 ++++++++++--------- src/storage/ports.rs | 43 +++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 51 insertions(+), 11 deletions(-) diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index cf7d4ca..fd7487d 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -13,7 +13,8 @@ use crate::config::{ SchedulerMisfirePolicy, SchedulerSchedule, }; use crate::storage::{ - SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, SchedulerJobUpsert, SessionStore, + SchedulerJobRecord, SchedulerJobRepository, SchedulerJobState, SchedulerJobStatus, + SchedulerJobUpsert, }; #[derive(Debug, Clone, Default)] @@ -61,7 +62,7 @@ pub struct Scheduler { bus: Arc, config: SchedulerConfig, timezone: Tz, - store: Arc, + jobs: Arc, agent_task_executor: Arc, maintenance_executor: Arc, } @@ -71,7 +72,7 @@ impl Scheduler { bus: Arc, config: SchedulerConfig, timezone: Tz, - store: Arc, + jobs: Arc, agent_task_executor: A, maintenance_executor: M, ) -> Self @@ -83,7 +84,7 @@ impl Scheduler { bus, config, timezone, - store, + jobs, agent_task_executor: Arc::new(agent_task_executor), maintenance_executor: Arc::new(maintenance_executor), } @@ -128,14 +129,14 @@ impl Scheduler { }) { let runtime = RuntimeJob::from_config(&job, now, self.config.misfire_policy, self.timezone)?; - self.store.upsert_scheduler_job(&runtime.to_upsert())?; + self.jobs.upsert_scheduler_job(&runtime.to_upsert())?; } Ok(()) } async fn process_tick(&self) -> anyhow::Result<()> { let now = Utc::now(); - let jobs = self.store.list_scheduler_jobs(true)?; + let jobs = self.jobs.list_scheduler_jobs(true)?; for record in jobs { let Some(mut job) = @@ -145,7 +146,7 @@ impl Scheduler { }; if record.next_fire_at.is_none() && job.next_fire_at.is_some() { - self.store.update_scheduler_job_runtime( + self.jobs.update_scheduler_job_runtime( &job.id, job.state.clone(), job.last_status.clone(), @@ -162,7 +163,7 @@ impl Scheduler { continue; } - self.store.update_scheduler_job_runtime( + self.jobs.update_scheduler_job_runtime( &job.id, SchedulerJobState::Running, job.last_status.clone(), @@ -192,7 +193,7 @@ impl Scheduler { tracing::error!(job_id = %job.id, error = %error, "Scheduler job failed"); } - self.store.update_scheduler_job_runtime( + self.jobs.update_scheduler_job_runtime( &job.id, job.state.clone(), status, diff --git a/src/storage/ports.rs b/src/storage/ports.rs index 12b46c0..697e93d 100644 --- a/src/storage/ports.rs +++ b/src/storage/ports.rs @@ -1,6 +1,6 @@ use super::{ - MemoryRecord, MemoryUpsert, SchedulerJobRecord, SchedulerJobUpsert, SkillEventRecord, - StorageError, + MemoryRecord, MemoryUpsert, SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, + SchedulerJobUpsert, SkillEventRecord, StorageError, }; pub trait MemoryRepository: Send + Sync + 'static { @@ -56,6 +56,19 @@ pub trait SchedulerJobRepository: Send + Sync + 'static { ) -> Result, StorageError>; fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError>; + + fn update_scheduler_job_runtime( + &self, + job_id: &str, + state: SchedulerJobState, + last_status: Option, + last_error: Option<&str>, + run_count: i64, + last_fired_at: Option, + next_fire_at: Option, + paused_at: Option, + completed_at: Option, + ) -> Result<(), StorageError>; } pub trait SkillEventRepository: Send + Sync + 'static { @@ -148,6 +161,32 @@ impl SchedulerJobRepository for super::SessionStore { fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> { super::SessionStore::delete_scheduler_job(self, job_id) } + + fn update_scheduler_job_runtime( + &self, + job_id: &str, + state: SchedulerJobState, + last_status: Option, + last_error: Option<&str>, + run_count: i64, + last_fired_at: Option, + next_fire_at: Option, + paused_at: Option, + completed_at: Option, + ) -> Result<(), StorageError> { + super::SessionStore::update_scheduler_job_runtime( + self, + job_id, + state, + last_status, + last_error, + run_count, + last_fired_at, + next_fire_at, + paused_at, + completed_at, + ) + } } impl SkillEventRepository for super::SessionStore {