feat: 重构调度器存储逻辑,使用 SchedulerJobRepository 替代 SessionStore,添加更新调度作业运行时的方法

This commit is contained in:
ooodc 2026-04-28 15:40:50 +08:00
parent 90e44950cb
commit f48b132bb9
2 changed files with 51 additions and 11 deletions

View File

@ -13,7 +13,8 @@ use crate::config::{
SchedulerMisfirePolicy, SchedulerSchedule, SchedulerMisfirePolicy, SchedulerSchedule,
}; };
use crate::storage::{ use crate::storage::{
SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, SchedulerJobUpsert, SessionStore, SchedulerJobRecord, SchedulerJobRepository, SchedulerJobState, SchedulerJobStatus,
SchedulerJobUpsert,
}; };
#[derive(Debug, Clone, Default)] #[derive(Debug, Clone, Default)]
@ -61,7 +62,7 @@ pub struct Scheduler {
bus: Arc<MessageBus>, bus: Arc<MessageBus>,
config: SchedulerConfig, config: SchedulerConfig,
timezone: Tz, timezone: Tz,
store: Arc<SessionStore>, jobs: Arc<dyn SchedulerJobRepository>,
agent_task_executor: Arc<dyn AgentTaskExecutor>, agent_task_executor: Arc<dyn AgentTaskExecutor>,
maintenance_executor: Arc<dyn MaintenanceExecutor>, maintenance_executor: Arc<dyn MaintenanceExecutor>,
} }
@ -71,7 +72,7 @@ impl Scheduler {
bus: Arc<MessageBus>, bus: Arc<MessageBus>,
config: SchedulerConfig, config: SchedulerConfig,
timezone: Tz, timezone: Tz,
store: Arc<SessionStore>, jobs: Arc<dyn SchedulerJobRepository>,
agent_task_executor: A, agent_task_executor: A,
maintenance_executor: M, maintenance_executor: M,
) -> Self ) -> Self
@ -83,7 +84,7 @@ impl Scheduler {
bus, bus,
config, config,
timezone, timezone,
store, jobs,
agent_task_executor: Arc::new(agent_task_executor), agent_task_executor: Arc::new(agent_task_executor),
maintenance_executor: Arc::new(maintenance_executor), maintenance_executor: Arc::new(maintenance_executor),
} }
@ -128,14 +129,14 @@ impl Scheduler {
}) { }) {
let runtime = let runtime =
RuntimeJob::from_config(&job, now, self.config.misfire_policy, self.timezone)?; RuntimeJob::from_config(&job, now, self.config.misfire_policy, self.timezone)?;
self.store.upsert_scheduler_job(&runtime.to_upsert())?; self.jobs.upsert_scheduler_job(&runtime.to_upsert())?;
} }
Ok(()) Ok(())
} }
async fn process_tick(&self) -> anyhow::Result<()> { async fn process_tick(&self) -> anyhow::Result<()> {
let now = Utc::now(); let now = Utc::now();
let jobs = self.store.list_scheduler_jobs(true)?; let jobs = self.jobs.list_scheduler_jobs(true)?;
for record in jobs { for record in jobs {
let Some(mut job) = let Some(mut job) =
@ -145,7 +146,7 @@ impl Scheduler {
}; };
if record.next_fire_at.is_none() && job.next_fire_at.is_some() { if record.next_fire_at.is_none() && job.next_fire_at.is_some() {
self.store.update_scheduler_job_runtime( self.jobs.update_scheduler_job_runtime(
&job.id, &job.id,
job.state.clone(), job.state.clone(),
job.last_status.clone(), job.last_status.clone(),
@ -162,7 +163,7 @@ impl Scheduler {
continue; continue;
} }
self.store.update_scheduler_job_runtime( self.jobs.update_scheduler_job_runtime(
&job.id, &job.id,
SchedulerJobState::Running, SchedulerJobState::Running,
job.last_status.clone(), job.last_status.clone(),
@ -192,7 +193,7 @@ impl Scheduler {
tracing::error!(job_id = %job.id, error = %error, "Scheduler job failed"); tracing::error!(job_id = %job.id, error = %error, "Scheduler job failed");
} }
self.store.update_scheduler_job_runtime( self.jobs.update_scheduler_job_runtime(
&job.id, &job.id,
job.state.clone(), job.state.clone(),
status, status,

View File

@ -1,6 +1,6 @@
use super::{ use super::{
MemoryRecord, MemoryUpsert, SchedulerJobRecord, SchedulerJobUpsert, SkillEventRecord, MemoryRecord, MemoryUpsert, SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus,
StorageError, SchedulerJobUpsert, SkillEventRecord, StorageError,
}; };
pub trait MemoryRepository: Send + Sync + 'static { pub trait MemoryRepository: Send + Sync + 'static {
@ -56,6 +56,19 @@ pub trait SchedulerJobRepository: Send + Sync + 'static {
) -> Result<Vec<SchedulerJobRecord>, StorageError>; ) -> Result<Vec<SchedulerJobRecord>, StorageError>;
fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError>; fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError>;
fn update_scheduler_job_runtime(
&self,
job_id: &str,
state: SchedulerJobState,
last_status: Option<SchedulerJobStatus>,
last_error: Option<&str>,
run_count: i64,
last_fired_at: Option<i64>,
next_fire_at: Option<i64>,
paused_at: Option<i64>,
completed_at: Option<i64>,
) -> Result<(), StorageError>;
} }
pub trait SkillEventRepository: Send + Sync + 'static { pub trait SkillEventRepository: Send + Sync + 'static {
@ -148,6 +161,32 @@ impl SchedulerJobRepository for super::SessionStore {
fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> { fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> {
super::SessionStore::delete_scheduler_job(self, job_id) super::SessionStore::delete_scheduler_job(self, job_id)
} }
fn update_scheduler_job_runtime(
&self,
job_id: &str,
state: SchedulerJobState,
last_status: Option<SchedulerJobStatus>,
last_error: Option<&str>,
run_count: i64,
last_fired_at: Option<i64>,
next_fire_at: Option<i64>,
paused_at: Option<i64>,
completed_at: Option<i64>,
) -> Result<(), StorageError> {
super::SessionStore::update_scheduler_job_runtime(
self,
job_id,
state,
last_status,
last_error,
run_count,
last_fired_at,
next_fire_at,
paused_at,
completed_at,
)
}
} }
impl SkillEventRepository for super::SessionStore { impl SkillEventRepository for super::SessionStore {