From 8415e85026787503fd735d5842641ee819ee1f7d Mon Sep 17 00:00:00 2001 From: xiaoski Date: Tue, 5 May 2026 00:12:10 +0800 Subject: [PATCH] feat: add SchedulerStore with SQLite schema and CRUD --- src/scheduler/store.rs | 669 ++++++++++++++++++++++++++++++++++++++++- 1 file changed, 668 insertions(+), 1 deletion(-) diff --git a/src/scheduler/store.rs b/src/scheduler/store.rs index 27cd0ca..c1635c6 100644 --- a/src/scheduler/store.rs +++ b/src/scheduler/store.rs @@ -1 +1,668 @@ -// Stub — will be filled in Task 5 +use sqlx::Row; +use sqlx::SqlitePool; + +use super::types::{JobRun, Schedule, ScheduledJob}; + +/// Persistence layer for scheduled jobs and run history. +/// Uses a shared `sqlx::SqlitePool` (obtained from `crate::storage::Storage::pool()`). +pub struct SchedulerStore; + +impl SchedulerStore { + /// Initialize the scheduler tables. Idempotent (CREATE TABLE IF NOT EXISTS). + pub async fn init(pool: &SqlitePool) -> Result<(), Box> { + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS scheduled_jobs ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + schedule TEXT NOT NULL, + prompt TEXT NOT NULL, + channel TEXT NOT NULL, + chat_id TEXT NOT NULL, + model TEXT, + enabled INTEGER NOT NULL DEFAULT 1, + delete_after_run INTEGER NOT NULL DEFAULT 0, + next_run_at INTEGER NOT NULL, + last_run_at INTEGER, + last_status TEXT, + last_error TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ) + "#, + ) + .execute(pool) + .await?; + + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS job_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id TEXT NOT NULL REFERENCES scheduled_jobs(id) ON DELETE CASCADE, + started_at INTEGER NOT NULL, + finished_at INTEGER NOT NULL, + status TEXT NOT NULL, + output TEXT, + error TEXT, + duration_ms INTEGER NOT NULL + ) + "#, + ) + .execute(pool) + .await?; + + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_jobs_next_run ON scheduled_jobs(enabled, next_run_at)", + ) + .execute(pool) + .await?; + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_runs_job_id ON job_runs(job_id)") + .execute(pool) + .await?; + + Ok(()) + } + + /// Insert a new job. Returns an error if a job with the same ID already exists. + pub async fn add_job( + pool: &SqlitePool, + job: &ScheduledJob, + ) -> Result<(), Box> { + let schedule_json = serde_json::to_string(&job.schedule)?; + sqlx::query( + r#" + INSERT INTO scheduled_jobs + (id, name, schedule, prompt, channel, chat_id, model, + enabled, delete_after_run, next_run_at, last_run_at, + last_status, last_error, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind(&job.id) + .bind(&job.name) + .bind(&schedule_json) + .bind(&job.prompt) + .bind(&job.channel) + .bind(&job.chat_id) + .bind(&job.model) + .bind(job.enabled as i32) + .bind(job.delete_after_run as i32) + .bind(job.next_run_at) + .bind(job.last_run_at) + .bind(&job.last_status) + .bind(&job.last_error) + .bind(job.created_at) + .bind(job.updated_at) + .execute(pool) + .await?; + Ok(()) + } + + /// Fetch a single job by ID. + pub async fn get_job( + pool: &SqlitePool, + id: &str, + ) -> Result> { + let row = sqlx::query("SELECT * FROM scheduled_jobs WHERE id = ?") + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| format!("job not found: {id}"))?; + Ok(row_to_job(&row)?) + } + + /// List all jobs, ordered by next_run_at ascending. + pub async fn list_jobs( + pool: &SqlitePool, + ) -> Result, Box> { + let rows = sqlx::query("SELECT * FROM scheduled_jobs ORDER BY next_run_at ASC") + .fetch_all(pool) + .await?; + rows.iter().map(row_to_job).collect() + } + + /// Delete a job (cascades to job_runs). + pub async fn remove_job( + pool: &SqlitePool, + id: &str, + ) -> Result<(), Box> { + sqlx::query("DELETE FROM scheduled_jobs WHERE id = ?") + .bind(id) + .execute(pool) + .await?; + Ok(()) + } + + /// Enable or disable a job. + pub async fn set_enabled( + pool: &SqlitePool, + id: &str, + enabled: bool, + ) -> Result<(), Box> { + sqlx::query("UPDATE scheduled_jobs SET enabled = ?, updated_at = ? WHERE id = ?") + .bind(enabled as i32) + .bind(now_ms()) + .bind(id) + .execute(pool) + .await?; + Ok(()) + } + + /// Update selective fields on a job. Pass `None` for fields that should not change. + pub async fn update_job( + pool: &SqlitePool, + id: &str, + prompt: Option, + schedule: Option, + channel: Option, + chat_id: Option, + model: Option, + ) -> Result<(), Box> { + let now = now_ms(); + + if let Some(p) = prompt { + sqlx::query( + "UPDATE scheduled_jobs SET prompt = ?, updated_at = ? WHERE id = ?", + ) + .bind(&p) + .bind(now) + .bind(id) + .execute(pool) + .await?; + } + if let Some(s) = schedule { + let json = serde_json::to_string(&s)?; + sqlx::query( + "UPDATE scheduled_jobs SET schedule = ?, updated_at = ? WHERE id = ?", + ) + .bind(&json) + .bind(now) + .bind(id) + .execute(pool) + .await?; + } + if let Some(c) = channel { + sqlx::query( + "UPDATE scheduled_jobs SET channel = ?, updated_at = ? WHERE id = ?", + ) + .bind(&c) + .bind(now) + .bind(id) + .execute(pool) + .await?; + } + if let Some(c) = chat_id { + sqlx::query( + "UPDATE scheduled_jobs SET chat_id = ?, updated_at = ? WHERE id = ?", + ) + .bind(&c) + .bind(now) + .bind(id) + .execute(pool) + .await?; + } + if let Some(m) = model { + sqlx::query( + "UPDATE scheduled_jobs SET model = ?, updated_at = ? WHERE id = ?", + ) + .bind(&m) + .bind(now) + .bind(id) + .execute(pool) + .await?; + } + Ok(()) + } + + /// Update next_run_at and last_run_at for a job (used during reschedule). + pub async fn set_next_run( + pool: &SqlitePool, + id: &str, + next_run_at: i64, + ) -> Result<(), Box> { + let now = now_ms(); + sqlx::query( + "UPDATE scheduled_jobs SET next_run_at = ?, last_run_at = ?, updated_at = ? WHERE id = ?", + ) + .bind(next_run_at) + .bind(now) + .bind(now) + .bind(id) + .execute(pool) + .await?; + Ok(()) + } + + /// Set last_run_at (used when starting job execution). + pub async fn touch_last_run( + pool: &SqlitePool, + id: &str, + at: i64, + ) -> Result<(), Box> { + sqlx::query( + "UPDATE scheduled_jobs SET last_run_at = ?, updated_at = ? WHERE id = ?", + ) + .bind(at) + .bind(at) + .bind(id) + .execute(pool) + .await?; + Ok(()) + } + + /// Set last_status and last_error after job completion. + pub async fn set_last_status( + pool: &SqlitePool, + id: &str, + status: &str, + error: Option<&str>, + ) -> Result<(), Box> { + let now = now_ms(); + sqlx::query( + "UPDATE scheduled_jobs SET last_status = ?, last_error = ?, updated_at = ? WHERE id = ?", + ) + .bind(status) + .bind(error) + .bind(now) + .bind(id) + .execute(pool) + .await?; + Ok(()) + } + + /// Fetch enabled jobs whose next_run_at <= now, up to `limit`. + pub async fn due_jobs( + pool: &SqlitePool, + now: i64, + limit: usize, + ) -> Result, Box> { + let rows = sqlx::query( + "SELECT * FROM scheduled_jobs WHERE enabled = 1 AND next_run_at <= ? ORDER BY next_run_at ASC LIMIT ?", + ) + .bind(now) + .bind(limit as i64) + .fetch_all(pool) + .await?; + rows.iter().map(row_to_job).collect() + } + + /// Record a run execution. + pub async fn record_run( + pool: &SqlitePool, + run: &JobRun, + ) -> Result<(), Box> { + sqlx::query( + r#" + INSERT INTO job_runs (job_id, started_at, finished_at, status, output, error, duration_ms) + VALUES (?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind(&run.job_id) + .bind(run.started_at) + .bind(run.finished_at) + .bind(&run.status) + .bind(&run.output) + .bind(&run.error) + .bind(run.duration_ms) + .execute(pool) + .await?; + Ok(()) + } + + /// List the most recent `limit` runs for a job, newest first. + pub async fn list_runs( + pool: &SqlitePool, + job_id: &str, + limit: usize, + ) -> Result, Box> { + let rows = sqlx::query( + "SELECT * FROM job_runs WHERE job_id = ? ORDER BY finished_at DESC LIMIT ?", + ) + .bind(job_id) + .bind(limit as i64) + .fetch_all(pool) + .await?; + rows.iter() + .map(|r| { + Ok(JobRun { + id: r.try_get("id")?, + job_id: r.try_get("job_id")?, + started_at: r.try_get("started_at")?, + finished_at: r.try_get("finished_at")?, + status: r.try_get("status")?, + output: r.try_get("output")?, + error: r.try_get("error")?, + duration_ms: r.try_get("duration_ms")?, + }) + }) + .collect() + } + + /// Delete disabled jobs whose updated_at is before `before`. + pub async fn cleanup_disabled( + pool: &SqlitePool, + before: i64, + ) -> Result<(), Box> { + sqlx::query( + "DELETE FROM scheduled_jobs WHERE enabled = 0 AND updated_at < ?", + ) + .bind(before) + .execute(pool) + .await?; + Ok(()) + } +} + +fn now_ms() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64 +} + +fn row_to_job(row: &sqlx::sqlite::SqliteRow) -> Result> { + let schedule_json: String = row.try_get("schedule")?; + let schedule: Schedule = serde_json::from_str(&schedule_json)?; + Ok(ScheduledJob { + id: row.try_get("id")?, + name: row.try_get("name")?, + schedule, + prompt: row.try_get("prompt")?, + channel: row.try_get("channel")?, + chat_id: row.try_get("chat_id")?, + model: row.try_get("model")?, + enabled: row.try_get::("enabled")? != 0, + delete_after_run: row.try_get::("delete_after_run")? != 0, + next_run_at: row.try_get("next_run_at")?, + last_run_at: row.try_get("last_run_at")?, + last_status: row.try_get("last_status")?, + last_error: row.try_get("last_error")?, + created_at: row.try_get("created_at")?, + updated_at: row.try_get("updated_at")?, + }) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::scheduler::types::{JobRun, Schedule, ScheduledJob}; + use sqlx::SqlitePool; + + fn now() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as i64 + } + + async fn setup_pool() -> SqlitePool { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + SchedulerStore::init(&pool).await.unwrap(); + pool + } + + #[tokio::test] + async fn test_init_creates_tables() { + let pool = setup_pool().await; + let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scheduled_jobs") + .fetch_one(&pool) + .await + .unwrap(); + assert_eq!(row.0, 0); + } + + #[tokio::test] + async fn test_add_and_get_job() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-1".into(), + name: "test job".into(), + schedule: Schedule::Every { every_ms: 3600000 }, + prompt: "say hello".into(), + channel: "cli_chat".into(), + chat_id: "conn-1".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t + 3600000, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + let got = SchedulerStore::get_job(&pool, "job-1").await.unwrap(); + assert_eq!(got.id, "job-1"); + assert_eq!(got.name, "test job"); + assert_eq!(got.prompt, "say hello"); + } + + #[tokio::test] + async fn test_list_jobs() { + let pool = setup_pool().await; + let t = now(); + for i in 0..3 { + let job = ScheduledJob { + id: format!("job-{}", i), + name: format!("job {}", i), + schedule: Schedule::Every { every_ms: 3600000 }, + prompt: "ping".into(), + channel: "cli_chat".into(), + chat_id: "conn-1".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t + 1000, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + } + let jobs = SchedulerStore::list_jobs(&pool).await.unwrap(); + assert_eq!(jobs.len(), 3); + } + + #[tokio::test] + async fn test_remove_job() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-rm".into(), + name: "remove me".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), + channel: "cli_chat".into(), + chat_id: "c".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + SchedulerStore::remove_job(&pool, "job-rm").await.unwrap(); + let result = SchedulerStore::get_job(&pool, "job-rm").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_set_enabled() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-toggle".into(), + name: "toggle".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), + channel: "cli_chat".into(), + chat_id: "c".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + SchedulerStore::set_enabled(&pool, "job-toggle", false).await.unwrap(); + let got = SchedulerStore::get_job(&pool, "job-toggle").await.unwrap(); + assert!(!got.enabled); + } + + #[tokio::test] + async fn test_due_jobs_only_returns_enabled_and_overdue() { + let pool = setup_pool().await; + let t = now(); + let jobs = vec![ + ScheduledJob { + id: "due".into(), + name: "due".into(), + schedule: Schedule::At { at: t }, + prompt: "1".into(), + channel: "cli_chat".into(), + chat_id: "c".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t - 1000, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }, + ScheduledJob { + id: "future".into(), + name: "future".into(), + schedule: Schedule::At { at: t + 99999999 }, + prompt: "2".into(), + channel: "cli_chat".into(), + chat_id: "c".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t + 99999999, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }, + ScheduledJob { + id: "disabled-due".into(), + name: "disabled due".into(), + schedule: Schedule::At { at: t }, + prompt: "3".into(), + channel: "cli_chat".into(), + chat_id: "c".into(), + model: None, + enabled: false, + delete_after_run: false, + next_run_at: t - 1000, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }, + ]; + for j in &jobs { + SchedulerStore::add_job(&pool, j).await.unwrap(); + } + let due = SchedulerStore::due_jobs(&pool, t, 10).await.unwrap(); + assert_eq!(due.len(), 1); + assert_eq!(due[0].id, "due"); + } + + #[tokio::test] + async fn test_record_run_and_list_runs() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-run".into(), + name: "run test".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), + channel: "cli_chat".into(), + chat_id: "c".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + + let run = JobRun { + id: 0, + job_id: "job-run".into(), + started_at: t, + finished_at: t + 500, + status: "ok".into(), + output: Some("hello".into()), + error: None, + duration_ms: 500, + }; + SchedulerStore::record_run(&pool, &run).await.unwrap(); + let runs = SchedulerStore::list_runs(&pool, "job-run", 10) + .await + .unwrap(); + assert_eq!(runs.len(), 1); + assert_eq!(runs[0].status, "ok"); + assert_eq!(runs[0].output.as_deref(), Some("hello")); + } + + #[tokio::test] + async fn test_update_job() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-update".into(), + name: "old name".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "old prompt".into(), + channel: "feishu".into(), + chat_id: "oc_1".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + SchedulerStore::update_job( + &pool, + "job-update", + Some("new prompt".into()), + Some(Schedule::Every { every_ms: 60000 }), + None, + None, + None, + ) + .await + .unwrap(); + let got = SchedulerStore::get_job(&pool, "job-update").await.unwrap(); + assert_eq!(got.prompt, "new prompt"); + } +}