diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index cf7d4ca..fd7487d 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -13,7 +13,8 @@ use crate::config::{ SchedulerMisfirePolicy, SchedulerSchedule, }; use crate::storage::{ - SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, SchedulerJobUpsert, SessionStore, + SchedulerJobRecord, SchedulerJobRepository, SchedulerJobState, SchedulerJobStatus, + SchedulerJobUpsert, }; #[derive(Debug, Clone, Default)] @@ -61,7 +62,7 @@ pub struct Scheduler { bus: Arc, config: SchedulerConfig, timezone: Tz, - store: Arc, + jobs: Arc, agent_task_executor: Arc, maintenance_executor: Arc, } @@ -71,7 +72,7 @@ impl Scheduler { bus: Arc, config: SchedulerConfig, timezone: Tz, - store: Arc, + jobs: Arc, agent_task_executor: A, maintenance_executor: M, ) -> Self @@ -83,7 +84,7 @@ impl Scheduler { bus, config, timezone, - store, + jobs, agent_task_executor: Arc::new(agent_task_executor), maintenance_executor: Arc::new(maintenance_executor), } @@ -128,14 +129,14 @@ impl Scheduler { }) { let runtime = RuntimeJob::from_config(&job, now, self.config.misfire_policy, self.timezone)?; - self.store.upsert_scheduler_job(&runtime.to_upsert())?; + self.jobs.upsert_scheduler_job(&runtime.to_upsert())?; } Ok(()) } async fn process_tick(&self) -> anyhow::Result<()> { let now = Utc::now(); - let jobs = self.store.list_scheduler_jobs(true)?; + let jobs = self.jobs.list_scheduler_jobs(true)?; for record in jobs { let Some(mut job) = @@ -145,7 +146,7 @@ impl Scheduler { }; if record.next_fire_at.is_none() && job.next_fire_at.is_some() { - self.store.update_scheduler_job_runtime( + self.jobs.update_scheduler_job_runtime( &job.id, job.state.clone(), job.last_status.clone(), @@ -162,7 +163,7 @@ impl Scheduler { continue; } - self.store.update_scheduler_job_runtime( + self.jobs.update_scheduler_job_runtime( &job.id, SchedulerJobState::Running, job.last_status.clone(), @@ -192,7 +193,7 @@ impl Scheduler { tracing::error!(job_id = %job.id, error = %error, "Scheduler job failed"); } - self.store.update_scheduler_job_runtime( + self.jobs.update_scheduler_job_runtime( &job.id, job.state.clone(), status, diff --git a/src/storage/ports.rs b/src/storage/ports.rs index 12b46c0..697e93d 100644 --- a/src/storage/ports.rs +++ b/src/storage/ports.rs @@ -1,6 +1,6 @@ use super::{ - MemoryRecord, MemoryUpsert, SchedulerJobRecord, SchedulerJobUpsert, SkillEventRecord, - StorageError, + MemoryRecord, MemoryUpsert, SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, + SchedulerJobUpsert, SkillEventRecord, StorageError, }; pub trait MemoryRepository: Send + Sync + 'static { @@ -56,6 +56,19 @@ pub trait SchedulerJobRepository: Send + Sync + 'static { ) -> Result, StorageError>; fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError>; + + fn update_scheduler_job_runtime( + &self, + job_id: &str, + state: SchedulerJobState, + last_status: Option, + last_error: Option<&str>, + run_count: i64, + last_fired_at: Option, + next_fire_at: Option, + paused_at: Option, + completed_at: Option, + ) -> Result<(), StorageError>; } pub trait SkillEventRepository: Send + Sync + 'static { @@ -148,6 +161,32 @@ impl SchedulerJobRepository for super::SessionStore { fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> { super::SessionStore::delete_scheduler_job(self, job_id) } + + fn update_scheduler_job_runtime( + &self, + job_id: &str, + state: SchedulerJobState, + last_status: Option, + last_error: Option<&str>, + run_count: i64, + last_fired_at: Option, + next_fire_at: Option, + paused_at: Option, + completed_at: Option, + ) -> Result<(), StorageError> { + super::SessionStore::update_scheduler_job_runtime( + self, + job_id, + state, + last_status, + last_error, + run_count, + last_fired_at, + next_fire_at, + paused_at, + completed_at, + ) + } } impl SkillEventRepository for super::SessionStore {