feat: add SchedulerStore with SQLite schema and CRUD

This commit is contained in:
xiaoski 2026-05-05 00:12:10 +08:00
parent eccae20a0a
commit 8415e85026

View File

@ -1 +1,668 @@
// Stub — will be filled in Task 5 use sqlx::Row;
use sqlx::SqlitePool;
use super::types::{JobRun, Schedule, ScheduledJob};
/// Persistence layer for scheduled jobs and run history.
/// Uses a shared `sqlx::SqlitePool` (obtained from `crate::storage::Storage::pool()`).
pub struct SchedulerStore;
impl SchedulerStore {
/// Initialize the scheduler tables. Idempotent (CREATE TABLE IF NOT EXISTS).
pub async fn init(pool: &SqlitePool) -> Result<(), Box<dyn std::error::Error>> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS scheduled_jobs (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
schedule TEXT NOT NULL,
prompt TEXT NOT NULL,
channel TEXT NOT NULL,
chat_id TEXT NOT NULL,
model TEXT,
enabled INTEGER NOT NULL DEFAULT 1,
delete_after_run INTEGER NOT NULL DEFAULT 0,
next_run_at INTEGER NOT NULL,
last_run_at INTEGER,
last_status TEXT,
last_error TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
)
"#,
)
.execute(pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS job_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT NOT NULL REFERENCES scheduled_jobs(id) ON DELETE CASCADE,
started_at INTEGER NOT NULL,
finished_at INTEGER NOT NULL,
status TEXT NOT NULL,
output TEXT,
error TEXT,
duration_ms INTEGER NOT NULL
)
"#,
)
.execute(pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_jobs_next_run ON scheduled_jobs(enabled, next_run_at)",
)
.execute(pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_runs_job_id ON job_runs(job_id)")
.execute(pool)
.await?;
Ok(())
}
/// Insert a new job. Returns an error if a job with the same ID already exists.
pub async fn add_job(
pool: &SqlitePool,
job: &ScheduledJob,
) -> Result<(), Box<dyn std::error::Error>> {
let schedule_json = serde_json::to_string(&job.schedule)?;
sqlx::query(
r#"
INSERT INTO scheduled_jobs
(id, name, schedule, prompt, channel, chat_id, model,
enabled, delete_after_run, next_run_at, last_run_at,
last_status, last_error, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(&job.id)
.bind(&job.name)
.bind(&schedule_json)
.bind(&job.prompt)
.bind(&job.channel)
.bind(&job.chat_id)
.bind(&job.model)
.bind(job.enabled as i32)
.bind(job.delete_after_run as i32)
.bind(job.next_run_at)
.bind(job.last_run_at)
.bind(&job.last_status)
.bind(&job.last_error)
.bind(job.created_at)
.bind(job.updated_at)
.execute(pool)
.await?;
Ok(())
}
/// Fetch a single job by ID.
pub async fn get_job(
pool: &SqlitePool,
id: &str,
) -> Result<ScheduledJob, Box<dyn std::error::Error>> {
let row = sqlx::query("SELECT * FROM scheduled_jobs WHERE id = ?")
.bind(id)
.fetch_optional(pool)
.await?
.ok_or_else(|| format!("job not found: {id}"))?;
Ok(row_to_job(&row)?)
}
/// List all jobs, ordered by next_run_at ascending.
pub async fn list_jobs(
pool: &SqlitePool,
) -> Result<Vec<ScheduledJob>, Box<dyn std::error::Error>> {
let rows = sqlx::query("SELECT * FROM scheduled_jobs ORDER BY next_run_at ASC")
.fetch_all(pool)
.await?;
rows.iter().map(row_to_job).collect()
}
/// Delete a job (cascades to job_runs).
pub async fn remove_job(
pool: &SqlitePool,
id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
sqlx::query("DELETE FROM scheduled_jobs WHERE id = ?")
.bind(id)
.execute(pool)
.await?;
Ok(())
}
/// Enable or disable a job.
pub async fn set_enabled(
pool: &SqlitePool,
id: &str,
enabled: bool,
) -> Result<(), Box<dyn std::error::Error>> {
sqlx::query("UPDATE scheduled_jobs SET enabled = ?, updated_at = ? WHERE id = ?")
.bind(enabled as i32)
.bind(now_ms())
.bind(id)
.execute(pool)
.await?;
Ok(())
}
/// Update selective fields on a job. Pass `None` for fields that should not change.
pub async fn update_job(
pool: &SqlitePool,
id: &str,
prompt: Option<String>,
schedule: Option<Schedule>,
channel: Option<String>,
chat_id: Option<String>,
model: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let now = now_ms();
if let Some(p) = prompt {
sqlx::query(
"UPDATE scheduled_jobs SET prompt = ?, updated_at = ? WHERE id = ?",
)
.bind(&p)
.bind(now)
.bind(id)
.execute(pool)
.await?;
}
if let Some(s) = schedule {
let json = serde_json::to_string(&s)?;
sqlx::query(
"UPDATE scheduled_jobs SET schedule = ?, updated_at = ? WHERE id = ?",
)
.bind(&json)
.bind(now)
.bind(id)
.execute(pool)
.await?;
}
if let Some(c) = channel {
sqlx::query(
"UPDATE scheduled_jobs SET channel = ?, updated_at = ? WHERE id = ?",
)
.bind(&c)
.bind(now)
.bind(id)
.execute(pool)
.await?;
}
if let Some(c) = chat_id {
sqlx::query(
"UPDATE scheduled_jobs SET chat_id = ?, updated_at = ? WHERE id = ?",
)
.bind(&c)
.bind(now)
.bind(id)
.execute(pool)
.await?;
}
if let Some(m) = model {
sqlx::query(
"UPDATE scheduled_jobs SET model = ?, updated_at = ? WHERE id = ?",
)
.bind(&m)
.bind(now)
.bind(id)
.execute(pool)
.await?;
}
Ok(())
}
/// Update next_run_at and last_run_at for a job (used during reschedule).
pub async fn set_next_run(
pool: &SqlitePool,
id: &str,
next_run_at: i64,
) -> Result<(), Box<dyn std::error::Error>> {
let now = now_ms();
sqlx::query(
"UPDATE scheduled_jobs SET next_run_at = ?, last_run_at = ?, updated_at = ? WHERE id = ?",
)
.bind(next_run_at)
.bind(now)
.bind(now)
.bind(id)
.execute(pool)
.await?;
Ok(())
}
/// Set last_run_at (used when starting job execution).
pub async fn touch_last_run(
pool: &SqlitePool,
id: &str,
at: i64,
) -> Result<(), Box<dyn std::error::Error>> {
sqlx::query(
"UPDATE scheduled_jobs SET last_run_at = ?, updated_at = ? WHERE id = ?",
)
.bind(at)
.bind(at)
.bind(id)
.execute(pool)
.await?;
Ok(())
}
/// Set last_status and last_error after job completion.
pub async fn set_last_status(
pool: &SqlitePool,
id: &str,
status: &str,
error: Option<&str>,
) -> Result<(), Box<dyn std::error::Error>> {
let now = now_ms();
sqlx::query(
"UPDATE scheduled_jobs SET last_status = ?, last_error = ?, updated_at = ? WHERE id = ?",
)
.bind(status)
.bind(error)
.bind(now)
.bind(id)
.execute(pool)
.await?;
Ok(())
}
/// Fetch enabled jobs whose next_run_at <= now, up to `limit`.
pub async fn due_jobs(
pool: &SqlitePool,
now: i64,
limit: usize,
) -> Result<Vec<ScheduledJob>, Box<dyn std::error::Error>> {
let rows = sqlx::query(
"SELECT * FROM scheduled_jobs WHERE enabled = 1 AND next_run_at <= ? ORDER BY next_run_at ASC LIMIT ?",
)
.bind(now)
.bind(limit as i64)
.fetch_all(pool)
.await?;
rows.iter().map(row_to_job).collect()
}
/// Record a run execution.
pub async fn record_run(
pool: &SqlitePool,
run: &JobRun,
) -> Result<(), Box<dyn std::error::Error>> {
sqlx::query(
r#"
INSERT INTO job_runs (job_id, started_at, finished_at, status, output, error, duration_ms)
VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(&run.job_id)
.bind(run.started_at)
.bind(run.finished_at)
.bind(&run.status)
.bind(&run.output)
.bind(&run.error)
.bind(run.duration_ms)
.execute(pool)
.await?;
Ok(())
}
/// List the most recent `limit` runs for a job, newest first.
pub async fn list_runs(
pool: &SqlitePool,
job_id: &str,
limit: usize,
) -> Result<Vec<JobRun>, Box<dyn std::error::Error>> {
let rows = sqlx::query(
"SELECT * FROM job_runs WHERE job_id = ? ORDER BY finished_at DESC LIMIT ?",
)
.bind(job_id)
.bind(limit as i64)
.fetch_all(pool)
.await?;
rows.iter()
.map(|r| {
Ok(JobRun {
id: r.try_get("id")?,
job_id: r.try_get("job_id")?,
started_at: r.try_get("started_at")?,
finished_at: r.try_get("finished_at")?,
status: r.try_get("status")?,
output: r.try_get("output")?,
error: r.try_get("error")?,
duration_ms: r.try_get("duration_ms")?,
})
})
.collect()
}
/// Delete disabled jobs whose updated_at is before `before`.
pub async fn cleanup_disabled(
pool: &SqlitePool,
before: i64,
) -> Result<(), Box<dyn std::error::Error>> {
sqlx::query(
"DELETE FROM scheduled_jobs WHERE enabled = 0 AND updated_at < ?",
)
.bind(before)
.execute(pool)
.await?;
Ok(())
}
}
fn now_ms() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64
}
fn row_to_job(row: &sqlx::sqlite::SqliteRow) -> Result<ScheduledJob, Box<dyn std::error::Error>> {
let schedule_json: String = row.try_get("schedule")?;
let schedule: Schedule = serde_json::from_str(&schedule_json)?;
Ok(ScheduledJob {
id: row.try_get("id")?,
name: row.try_get("name")?,
schedule,
prompt: row.try_get("prompt")?,
channel: row.try_get("channel")?,
chat_id: row.try_get("chat_id")?,
model: row.try_get("model")?,
enabled: row.try_get::<i32, _>("enabled")? != 0,
delete_after_run: row.try_get::<i32, _>("delete_after_run")? != 0,
next_run_at: row.try_get("next_run_at")?,
last_run_at: row.try_get("last_run_at")?,
last_status: row.try_get("last_status")?,
last_error: row.try_get("last_error")?,
created_at: row.try_get("created_at")?,
updated_at: row.try_get("updated_at")?,
})
}
#[cfg(test)]
mod tests {
use super::*;
use crate::scheduler::types::{JobRun, Schedule, ScheduledJob};
use sqlx::SqlitePool;
fn now() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64
}
async fn setup_pool() -> SqlitePool {
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
SchedulerStore::init(&pool).await.unwrap();
pool
}
#[tokio::test]
async fn test_init_creates_tables() {
let pool = setup_pool().await;
let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scheduled_jobs")
.fetch_one(&pool)
.await
.unwrap();
assert_eq!(row.0, 0);
}
#[tokio::test]
async fn test_add_and_get_job() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: "job-1".into(),
name: "test job".into(),
schedule: Schedule::Every { every_ms: 3600000 },
prompt: "say hello".into(),
channel: "cli_chat".into(),
chat_id: "conn-1".into(),
model: None,
enabled: true,
delete_after_run: false,
next_run_at: t + 3600000,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
let got = SchedulerStore::get_job(&pool, "job-1").await.unwrap();
assert_eq!(got.id, "job-1");
assert_eq!(got.name, "test job");
assert_eq!(got.prompt, "say hello");
}
#[tokio::test]
async fn test_list_jobs() {
let pool = setup_pool().await;
let t = now();
for i in 0..3 {
let job = ScheduledJob {
id: format!("job-{}", i),
name: format!("job {}", i),
schedule: Schedule::Every { every_ms: 3600000 },
prompt: "ping".into(),
channel: "cli_chat".into(),
chat_id: "conn-1".into(),
model: None,
enabled: true,
delete_after_run: false,
next_run_at: t + 1000,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
}
let jobs = SchedulerStore::list_jobs(&pool).await.unwrap();
assert_eq!(jobs.len(), 3);
}
#[tokio::test]
async fn test_remove_job() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: "job-rm".into(),
name: "remove me".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "hi".into(),
channel: "cli_chat".into(),
chat_id: "c".into(),
model: None,
enabled: true,
delete_after_run: false,
next_run_at: t,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
SchedulerStore::remove_job(&pool, "job-rm").await.unwrap();
let result = SchedulerStore::get_job(&pool, "job-rm").await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_set_enabled() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: "job-toggle".into(),
name: "toggle".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "hi".into(),
channel: "cli_chat".into(),
chat_id: "c".into(),
model: None,
enabled: true,
delete_after_run: false,
next_run_at: t,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
SchedulerStore::set_enabled(&pool, "job-toggle", false).await.unwrap();
let got = SchedulerStore::get_job(&pool, "job-toggle").await.unwrap();
assert!(!got.enabled);
}
#[tokio::test]
async fn test_due_jobs_only_returns_enabled_and_overdue() {
let pool = setup_pool().await;
let t = now();
let jobs = vec![
ScheduledJob {
id: "due".into(),
name: "due".into(),
schedule: Schedule::At { at: t },
prompt: "1".into(),
channel: "cli_chat".into(),
chat_id: "c".into(),
model: None,
enabled: true,
delete_after_run: false,
next_run_at: t - 1000,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
},
ScheduledJob {
id: "future".into(),
name: "future".into(),
schedule: Schedule::At { at: t + 99999999 },
prompt: "2".into(),
channel: "cli_chat".into(),
chat_id: "c".into(),
model: None,
enabled: true,
delete_after_run: false,
next_run_at: t + 99999999,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
},
ScheduledJob {
id: "disabled-due".into(),
name: "disabled due".into(),
schedule: Schedule::At { at: t },
prompt: "3".into(),
channel: "cli_chat".into(),
chat_id: "c".into(),
model: None,
enabled: false,
delete_after_run: false,
next_run_at: t - 1000,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
},
];
for j in &jobs {
SchedulerStore::add_job(&pool, j).await.unwrap();
}
let due = SchedulerStore::due_jobs(&pool, t, 10).await.unwrap();
assert_eq!(due.len(), 1);
assert_eq!(due[0].id, "due");
}
#[tokio::test]
async fn test_record_run_and_list_runs() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: "job-run".into(),
name: "run test".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "hi".into(),
channel: "cli_chat".into(),
chat_id: "c".into(),
model: None,
enabled: true,
delete_after_run: false,
next_run_at: t,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
let run = JobRun {
id: 0,
job_id: "job-run".into(),
started_at: t,
finished_at: t + 500,
status: "ok".into(),
output: Some("hello".into()),
error: None,
duration_ms: 500,
};
SchedulerStore::record_run(&pool, &run).await.unwrap();
let runs = SchedulerStore::list_runs(&pool, "job-run", 10)
.await
.unwrap();
assert_eq!(runs.len(), 1);
assert_eq!(runs[0].status, "ok");
assert_eq!(runs[0].output.as_deref(), Some("hello"));
}
#[tokio::test]
async fn test_update_job() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: "job-update".into(),
name: "old name".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "old prompt".into(),
channel: "feishu".into(),
chat_id: "oc_1".into(),
model: None,
enabled: true,
delete_after_run: false,
next_run_at: t,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
SchedulerStore::update_job(
&pool,
"job-update",
Some("new prompt".into()),
Some(Schedule::Every { every_ms: 60000 }),
None,
None,
None,
)
.await
.unwrap();
let got = SchedulerStore::get_job(&pool, "job-update").await.unwrap();
assert_eq!(got.prompt, "new prompt");
}
}