refactor: move scheduler store to storage module, cron tools to tools module
- storage/scheduler.rs: ScheduledJob/JobRun types + CRUD on Storage - tools/cron.rs: 6 cron agent tools (add/list/remove/enable/disable/update) - scheduler/types.rs: keep only Schedule enum - scheduler/mod.rs: use Arc<Storage> instead of raw SqlitePool - gateway/mod.rs: inject Storage directly, replace pool field - storage/mod.rs: scheduler tables in init_schema
This commit is contained in:
parent
0056bfbd23
commit
62f4326131
2356
docs/superpowers/plans/2026-05-04-scheduled-tasks.md
Normal file
2356
docs/superpowers/plans/2026-05-04-scheduled-tasks.md
Normal file
File diff suppressed because it is too large
Load Diff
@ -12,14 +12,13 @@ use crate::config::{Config, expand_path, ensure_workspace_dir};
|
||||
use crate::logging;
|
||||
use crate::session::SessionManager;
|
||||
use crate::scheduler::Scheduler;
|
||||
use crate::scheduler::store as scheduler_store;
|
||||
|
||||
pub struct GatewayState {
|
||||
pub config: Config,
|
||||
pub workspace_dir: std::path::PathBuf,
|
||||
pub session_manager: Arc<SessionManager>,
|
||||
pub channel_manager: ChannelManager,
|
||||
pub pool: sqlx::SqlitePool,
|
||||
pub storage: Arc<crate::storage::Storage>,
|
||||
}
|
||||
|
||||
impl GatewayState {
|
||||
@ -56,8 +55,6 @@ impl GatewayState {
|
||||
);
|
||||
tracing::info!("Session storage: {}", db_path.display());
|
||||
|
||||
let pool = storage.pool().clone();
|
||||
|
||||
// Create MessageBus first (shared by SessionManager and ChannelManager)
|
||||
let bus = MessageBus::new(100);
|
||||
|
||||
@ -84,30 +81,24 @@ impl GatewayState {
|
||||
// Initialize scheduler if enabled in config
|
||||
let scheduler_config = config.gateway.scheduler.clone().unwrap_or_default();
|
||||
if scheduler_config.enabled {
|
||||
scheduler_store::SchedulerStore::init(&pool)
|
||||
.await
|
||||
.map_err(|e| format!("failed to initialize scheduler store: {}", e))?;
|
||||
tracing::info!("Scheduler store initialized");
|
||||
|
||||
// Register cron tools
|
||||
let scheduler_pool = pool.clone();
|
||||
session_manager.tools().register(
|
||||
crate::scheduler::tools::CronAddTool::new(scheduler_pool.clone(), valid_channels),
|
||||
crate::tools::cron::CronAddTool::new(storage.clone(), valid_channels),
|
||||
);
|
||||
session_manager.tools().register(
|
||||
crate::scheduler::tools::CronListTool::new(scheduler_pool.clone()),
|
||||
crate::tools::cron::CronListTool::new(storage.clone()),
|
||||
);
|
||||
session_manager.tools().register(
|
||||
crate::scheduler::tools::CronRemoveTool::new(scheduler_pool.clone()),
|
||||
crate::tools::cron::CronRemoveTool::new(storage.clone()),
|
||||
);
|
||||
session_manager.tools().register(
|
||||
crate::scheduler::tools::CronEnableTool::new(scheduler_pool.clone()),
|
||||
crate::tools::cron::CronEnableTool::new(storage.clone()),
|
||||
);
|
||||
session_manager.tools().register(
|
||||
crate::scheduler::tools::CronDisableTool::new(scheduler_pool.clone()),
|
||||
crate::tools::cron::CronDisableTool::new(storage.clone()),
|
||||
);
|
||||
session_manager.tools().register(
|
||||
crate::scheduler::tools::CronUpdateTool::new(scheduler_pool.clone()),
|
||||
crate::tools::cron::CronUpdateTool::new(storage.clone()),
|
||||
);
|
||||
tracing::info!("Cron tools registered");
|
||||
}
|
||||
@ -117,7 +108,7 @@ impl GatewayState {
|
||||
workspace_dir: workspace_path,
|
||||
session_manager: session_manager.clone(),
|
||||
channel_manager,
|
||||
pool,
|
||||
storage,
|
||||
})
|
||||
}
|
||||
|
||||
@ -211,7 +202,7 @@ impl GatewayState {
|
||||
let scheduler_config = self.config.gateway.scheduler.clone().unwrap_or_default();
|
||||
if scheduler_config.enabled {
|
||||
let sched = Arc::new(Scheduler::new(
|
||||
self.pool.clone(),
|
||||
self.storage.clone(),
|
||||
self.session_manager.clone(),
|
||||
scheduler_config,
|
||||
));
|
||||
|
||||
@ -1,6 +1,4 @@
|
||||
pub mod types;
|
||||
pub mod store;
|
||||
pub mod tools;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
@ -9,8 +7,9 @@ use tokio::time;
|
||||
use crate::config::SchedulerConfig;
|
||||
use crate::session::session::HandleResult;
|
||||
use crate::session::SessionManager;
|
||||
use crate::storage::{JobRun, ScheduledJob, Storage};
|
||||
|
||||
pub use types::{JobRun, Schedule, ScheduledJob};
|
||||
pub use types::Schedule;
|
||||
|
||||
/// Compute the next execution time (Unix ms) for a schedule, given `from` (Unix ms).
|
||||
/// Returns `None` if no next time can be determined (e.g., invalid cron expression).
|
||||
@ -51,19 +50,19 @@ fn now_ms() -> i64 {
|
||||
/// The scheduler runs as a background tokio task, periodically checking for due jobs
|
||||
/// and executing them via `SessionManager::handle_cron_message`.
|
||||
pub struct Scheduler {
|
||||
pool: sqlx::SqlitePool,
|
||||
storage: Arc<Storage>,
|
||||
session_manager: Arc<SessionManager>,
|
||||
config: SchedulerConfig,
|
||||
}
|
||||
|
||||
impl Scheduler {
|
||||
pub fn new(
|
||||
pool: sqlx::SqlitePool,
|
||||
storage: Arc<Storage>,
|
||||
session_manager: Arc<SessionManager>,
|
||||
config: SchedulerConfig,
|
||||
) -> Self {
|
||||
Self {
|
||||
pool,
|
||||
storage,
|
||||
session_manager,
|
||||
config,
|
||||
}
|
||||
@ -88,7 +87,7 @@ impl Scheduler {
|
||||
|
||||
let now = now_ms();
|
||||
|
||||
let due = match store::SchedulerStore::due_jobs(&self.pool, now, self.config.max_concurrent).await {
|
||||
let due = match self.storage.due_scheduled_jobs(now, self.config.max_concurrent).await {
|
||||
Ok(jobs) => jobs,
|
||||
Err(e) => {
|
||||
tracing::error!("scheduler: failed to query due jobs: {}", e);
|
||||
@ -106,7 +105,7 @@ impl Scheduler {
|
||||
let start = Instant::now();
|
||||
let started_at = now_ms();
|
||||
|
||||
if let Err(e) = store::SchedulerStore::touch_last_run(&self.pool, &job.id, started_at).await {
|
||||
if let Err(e) = self.storage.touch_scheduled_job_last_run(&job.id, started_at).await {
|
||||
tracing::error!(job_id = %job.id, "scheduler: failed to touch last_run_at: {}", e);
|
||||
continue;
|
||||
}
|
||||
@ -150,11 +149,11 @@ impl Scheduler {
|
||||
duration_ms,
|
||||
};
|
||||
|
||||
if let Err(e) = store::SchedulerStore::record_run(&self.pool, &run).await {
|
||||
if let Err(e) = self.storage.record_scheduled_job_run(&run).await {
|
||||
tracing::error!(job_id = %job.id, "scheduler: failed to record run: {}", e);
|
||||
}
|
||||
|
||||
if let Err(e) = store::SchedulerStore::set_last_status(&self.pool, &job.id, "ok", None).await {
|
||||
if let Err(e) = self.storage.set_scheduled_job_last_status(&job.id, "ok", None).await {
|
||||
tracing::error!(job_id = %job.id, "scheduler: failed to set last_status: {}", e);
|
||||
}
|
||||
|
||||
@ -176,7 +175,7 @@ impl Scheduler {
|
||||
duration_ms,
|
||||
};
|
||||
|
||||
let _ = store::SchedulerStore::record_run(&self.pool, &run).await;
|
||||
let _ = self.storage.record_scheduled_job_run(&run).await;
|
||||
}
|
||||
Err(e) => {
|
||||
let error_str = e.to_string();
|
||||
@ -191,12 +190,12 @@ impl Scheduler {
|
||||
duration_ms,
|
||||
};
|
||||
|
||||
if let Err(e2) = store::SchedulerStore::record_run(&self.pool, &run).await {
|
||||
if let Err(e2) = self.storage.record_scheduled_job_run(&run).await {
|
||||
tracing::error!(job_id = %job.id, "scheduler: failed to record error run: {}", e2);
|
||||
}
|
||||
|
||||
if let Err(e2) = store::SchedulerStore::set_last_status(
|
||||
&self.pool, &job.id, "error", Some(&error_str),
|
||||
if let Err(e2) = self.storage.set_scheduled_job_last_status(
|
||||
&job.id, "error", Some(&error_str),
|
||||
).await {
|
||||
tracing::error!(job_id = %job.id, "scheduler: failed to set error status: {}", e2);
|
||||
}
|
||||
@ -218,29 +217,26 @@ impl Scheduler {
|
||||
}
|
||||
|
||||
/// After a job runs, compute its next execution time or disable/delete it.
|
||||
async fn reschedule_after_run(
|
||||
&self,
|
||||
job: &ScheduledJob,
|
||||
) -> anyhow::Result<()> {
|
||||
async fn reschedule_after_run(&self, job: &ScheduledJob) -> anyhow::Result<()> {
|
||||
let now = now_ms();
|
||||
|
||||
match &job.schedule {
|
||||
Schedule::At { .. } => {
|
||||
if job.delete_after_run {
|
||||
store::SchedulerStore::remove_job(&self.pool, &job.id).await?;
|
||||
self.storage.remove_scheduled_job(&job.id).await?;
|
||||
tracing::info!(job_id = %job.id, "scheduler: one-shot job deleted after run");
|
||||
} else {
|
||||
store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?;
|
||||
self.storage.set_scheduled_job_enabled(&job.id, false).await?;
|
||||
tracing::info!(job_id = %job.id, "scheduler: one-shot job disabled after run");
|
||||
}
|
||||
}
|
||||
Schedule::Every { .. } | Schedule::Cron { .. } => {
|
||||
if let Some(next) = next_run_for_schedule(&job.schedule, now) {
|
||||
store::SchedulerStore::set_next_run(&self.pool, &job.id, next).await?;
|
||||
self.storage.set_scheduled_job_next_run(&job.id, next).await?;
|
||||
tracing::info!(job_id = %job.id, next_run_at = %next, "scheduler: job rescheduled");
|
||||
} else {
|
||||
tracing::error!(job_id = %job.id, "scheduler: could not compute next run -- disabling job");
|
||||
store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?;
|
||||
self.storage.set_scheduled_job_enabled(&job.id, false).await?;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,668 +0,0 @@
|
||||
use sqlx::Row;
|
||||
use sqlx::SqlitePool;
|
||||
|
||||
use super::types::{JobRun, Schedule, ScheduledJob};
|
||||
|
||||
/// Persistence layer for scheduled jobs and run history.
|
||||
/// Uses a shared `sqlx::SqlitePool` (obtained from `crate::storage::Storage::pool()`).
|
||||
pub struct SchedulerStore;
|
||||
|
||||
impl SchedulerStore {
|
||||
/// Initialize the scheduler tables. Idempotent (CREATE TABLE IF NOT EXISTS).
|
||||
pub async fn init(pool: &SqlitePool) -> anyhow::Result<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS scheduled_jobs (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
schedule TEXT NOT NULL,
|
||||
prompt TEXT NOT NULL,
|
||||
channel TEXT NOT NULL,
|
||||
chat_id TEXT NOT NULL,
|
||||
model TEXT,
|
||||
enabled INTEGER NOT NULL DEFAULT 1,
|
||||
delete_after_run INTEGER NOT NULL DEFAULT 0,
|
||||
next_run_at INTEGER NOT NULL,
|
||||
last_run_at INTEGER,
|
||||
last_status TEXT,
|
||||
last_error TEXT,
|
||||
created_at INTEGER NOT NULL,
|
||||
updated_at INTEGER NOT NULL
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS job_runs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
job_id TEXT NOT NULL REFERENCES scheduled_jobs(id) ON DELETE CASCADE,
|
||||
started_at INTEGER NOT NULL,
|
||||
finished_at INTEGER NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
output TEXT,
|
||||
error TEXT,
|
||||
duration_ms INTEGER NOT NULL
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
"CREATE INDEX IF NOT EXISTS idx_jobs_next_run ON scheduled_jobs(enabled, next_run_at)",
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_runs_job_id ON job_runs(job_id)")
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Insert a new job. Returns an error if a job with the same ID already exists.
|
||||
pub async fn add_job(
|
||||
pool: &SqlitePool,
|
||||
job: &ScheduledJob,
|
||||
) -> anyhow::Result<()> {
|
||||
let schedule_json = serde_json::to_string(&job.schedule)?;
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO scheduled_jobs
|
||||
(id, name, schedule, prompt, channel, chat_id, model,
|
||||
enabled, delete_after_run, next_run_at, last_run_at,
|
||||
last_status, last_error, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(&job.id)
|
||||
.bind(&job.name)
|
||||
.bind(&schedule_json)
|
||||
.bind(&job.prompt)
|
||||
.bind(&job.channel)
|
||||
.bind(&job.chat_id)
|
||||
.bind(&job.model)
|
||||
.bind(job.enabled as i32)
|
||||
.bind(job.delete_after_run as i32)
|
||||
.bind(job.next_run_at)
|
||||
.bind(job.last_run_at)
|
||||
.bind(&job.last_status)
|
||||
.bind(&job.last_error)
|
||||
.bind(job.created_at)
|
||||
.bind(job.updated_at)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetch a single job by ID.
|
||||
pub async fn get_job(
|
||||
pool: &SqlitePool,
|
||||
id: &str,
|
||||
) -> anyhow::Result<ScheduledJob> {
|
||||
let row = sqlx::query("SELECT * FROM scheduled_jobs WHERE id = ?")
|
||||
.bind(id)
|
||||
.fetch_optional(pool)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("job not found: {id}"))?;
|
||||
row_to_job(&row)
|
||||
}
|
||||
|
||||
/// List all jobs, ordered by next_run_at ascending.
|
||||
pub async fn list_jobs(
|
||||
pool: &SqlitePool,
|
||||
) -> anyhow::Result<Vec<ScheduledJob>> {
|
||||
let rows = sqlx::query("SELECT * FROM scheduled_jobs ORDER BY next_run_at ASC")
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
rows.iter().map(row_to_job).collect()
|
||||
}
|
||||
|
||||
/// Delete a job (cascades to job_runs).
|
||||
pub async fn remove_job(
|
||||
pool: &SqlitePool,
|
||||
id: &str,
|
||||
) -> anyhow::Result<()> {
|
||||
sqlx::query("DELETE FROM scheduled_jobs WHERE id = ?")
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Enable or disable a job.
|
||||
pub async fn set_enabled(
|
||||
pool: &SqlitePool,
|
||||
id: &str,
|
||||
enabled: bool,
|
||||
) -> anyhow::Result<()> {
|
||||
sqlx::query("UPDATE scheduled_jobs SET enabled = ?, updated_at = ? WHERE id = ?")
|
||||
.bind(enabled as i32)
|
||||
.bind(now_ms())
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update selective fields on a job. Pass `None` for fields that should not change.
|
||||
pub async fn update_job(
|
||||
pool: &SqlitePool,
|
||||
id: &str,
|
||||
prompt: Option<String>,
|
||||
schedule: Option<Schedule>,
|
||||
channel: Option<String>,
|
||||
chat_id: Option<String>,
|
||||
model: Option<String>,
|
||||
) -> anyhow::Result<()> {
|
||||
let now = now_ms();
|
||||
|
||||
if let Some(p) = prompt {
|
||||
sqlx::query(
|
||||
"UPDATE scheduled_jobs SET prompt = ?, updated_at = ? WHERE id = ?",
|
||||
)
|
||||
.bind(&p)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
}
|
||||
if let Some(s) = schedule {
|
||||
let json = serde_json::to_string(&s)?;
|
||||
sqlx::query(
|
||||
"UPDATE scheduled_jobs SET schedule = ?, updated_at = ? WHERE id = ?",
|
||||
)
|
||||
.bind(&json)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
}
|
||||
if let Some(c) = channel {
|
||||
sqlx::query(
|
||||
"UPDATE scheduled_jobs SET channel = ?, updated_at = ? WHERE id = ?",
|
||||
)
|
||||
.bind(&c)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
}
|
||||
if let Some(c) = chat_id {
|
||||
sqlx::query(
|
||||
"UPDATE scheduled_jobs SET chat_id = ?, updated_at = ? WHERE id = ?",
|
||||
)
|
||||
.bind(&c)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
}
|
||||
if let Some(m) = model {
|
||||
sqlx::query(
|
||||
"UPDATE scheduled_jobs SET model = ?, updated_at = ? WHERE id = ?",
|
||||
)
|
||||
.bind(&m)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update next_run_at and last_run_at for a job (used during reschedule).
|
||||
pub async fn set_next_run(
|
||||
pool: &SqlitePool,
|
||||
id: &str,
|
||||
next_run_at: i64,
|
||||
) -> anyhow::Result<()> {
|
||||
let now = now_ms();
|
||||
sqlx::query(
|
||||
"UPDATE scheduled_jobs SET next_run_at = ?, last_run_at = ?, updated_at = ? WHERE id = ?",
|
||||
)
|
||||
.bind(next_run_at)
|
||||
.bind(now)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set last_run_at (used when starting job execution).
|
||||
pub async fn touch_last_run(
|
||||
pool: &SqlitePool,
|
||||
id: &str,
|
||||
at: i64,
|
||||
) -> anyhow::Result<()> {
|
||||
sqlx::query(
|
||||
"UPDATE scheduled_jobs SET last_run_at = ?, updated_at = ? WHERE id = ?",
|
||||
)
|
||||
.bind(at)
|
||||
.bind(at)
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set last_status and last_error after job completion.
|
||||
pub async fn set_last_status(
|
||||
pool: &SqlitePool,
|
||||
id: &str,
|
||||
status: &str,
|
||||
error: Option<&str>,
|
||||
) -> anyhow::Result<()> {
|
||||
let now = now_ms();
|
||||
sqlx::query(
|
||||
"UPDATE scheduled_jobs SET last_status = ?, last_error = ?, updated_at = ? WHERE id = ?",
|
||||
)
|
||||
.bind(status)
|
||||
.bind(error)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetch enabled jobs whose next_run_at <= now, up to `limit`.
|
||||
pub async fn due_jobs(
|
||||
pool: &SqlitePool,
|
||||
now: i64,
|
||||
limit: usize,
|
||||
) -> anyhow::Result<Vec<ScheduledJob>> {
|
||||
let rows = sqlx::query(
|
||||
"SELECT * FROM scheduled_jobs WHERE enabled = 1 AND next_run_at <= ? ORDER BY next_run_at ASC LIMIT ?",
|
||||
)
|
||||
.bind(now)
|
||||
.bind(limit as i64)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
rows.iter().map(row_to_job).collect()
|
||||
}
|
||||
|
||||
/// Record a run execution.
|
||||
pub async fn record_run(
|
||||
pool: &SqlitePool,
|
||||
run: &JobRun,
|
||||
) -> anyhow::Result<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO job_runs (job_id, started_at, finished_at, status, output, error, duration_ms)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(&run.job_id)
|
||||
.bind(run.started_at)
|
||||
.bind(run.finished_at)
|
||||
.bind(&run.status)
|
||||
.bind(&run.output)
|
||||
.bind(&run.error)
|
||||
.bind(run.duration_ms)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// List the most recent `limit` runs for a job, newest first.
|
||||
pub async fn list_runs(
|
||||
pool: &SqlitePool,
|
||||
job_id: &str,
|
||||
limit: usize,
|
||||
) -> anyhow::Result<Vec<JobRun>> {
|
||||
let rows = sqlx::query(
|
||||
"SELECT * FROM job_runs WHERE job_id = ? ORDER BY finished_at DESC LIMIT ?",
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(limit as i64)
|
||||
.fetch_all(pool)
|
||||
.await?;
|
||||
rows.iter()
|
||||
.map(|r| {
|
||||
Ok(JobRun {
|
||||
id: r.try_get("id")?,
|
||||
job_id: r.try_get("job_id")?,
|
||||
started_at: r.try_get("started_at")?,
|
||||
finished_at: r.try_get("finished_at")?,
|
||||
status: r.try_get("status")?,
|
||||
output: r.try_get("output")?,
|
||||
error: r.try_get("error")?,
|
||||
duration_ms: r.try_get("duration_ms")?,
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Delete disabled jobs whose updated_at is before `before`.
|
||||
pub async fn cleanup_disabled(
|
||||
pool: &SqlitePool,
|
||||
before: i64,
|
||||
) -> anyhow::Result<()> {
|
||||
sqlx::query(
|
||||
"DELETE FROM scheduled_jobs WHERE enabled = 0 AND updated_at < ?",
|
||||
)
|
||||
.bind(before)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn now_ms() -> i64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as i64
|
||||
}
|
||||
|
||||
fn row_to_job(row: &sqlx::sqlite::SqliteRow) -> anyhow::Result<ScheduledJob> {
|
||||
let schedule_json: String = row.try_get("schedule")?;
|
||||
let schedule: Schedule = serde_json::from_str(&schedule_json)?;
|
||||
Ok(ScheduledJob {
|
||||
id: row.try_get("id")?,
|
||||
name: row.try_get("name")?,
|
||||
schedule,
|
||||
prompt: row.try_get("prompt")?,
|
||||
channel: row.try_get("channel")?,
|
||||
chat_id: row.try_get("chat_id")?,
|
||||
model: row.try_get("model")?,
|
||||
enabled: row.try_get::<i32, _>("enabled")? != 0,
|
||||
delete_after_run: row.try_get::<i32, _>("delete_after_run")? != 0,
|
||||
next_run_at: row.try_get("next_run_at")?,
|
||||
last_run_at: row.try_get("last_run_at")?,
|
||||
last_status: row.try_get("last_status")?,
|
||||
last_error: row.try_get("last_error")?,
|
||||
created_at: row.try_get("created_at")?,
|
||||
updated_at: row.try_get("updated_at")?,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::scheduler::types::{JobRun, Schedule, ScheduledJob};
|
||||
use sqlx::SqlitePool;
|
||||
|
||||
fn now() -> i64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis() as i64
|
||||
}
|
||||
|
||||
async fn setup_pool() -> SqlitePool {
|
||||
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
|
||||
SchedulerStore::init(&pool).await.unwrap();
|
||||
pool
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_init_creates_tables() {
|
||||
let pool = setup_pool().await;
|
||||
let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scheduled_jobs")
|
||||
.fetch_one(&pool)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(row.0, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_add_and_get_job() {
|
||||
let pool = setup_pool().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: "job-1".into(),
|
||||
name: "test job".into(),
|
||||
schedule: Schedule::Every { every_ms: 3600000 },
|
||||
prompt: "say hello".into(),
|
||||
channel: "cli_chat".into(),
|
||||
chat_id: "conn-1".into(),
|
||||
model: None,
|
||||
enabled: true,
|
||||
delete_after_run: false,
|
||||
next_run_at: t + 3600000,
|
||||
last_run_at: None,
|
||||
last_status: None,
|
||||
last_error: None,
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
||||
let got = SchedulerStore::get_job(&pool, "job-1").await.unwrap();
|
||||
assert_eq!(got.id, "job-1");
|
||||
assert_eq!(got.name, "test job");
|
||||
assert_eq!(got.prompt, "say hello");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_jobs() {
|
||||
let pool = setup_pool().await;
|
||||
let t = now();
|
||||
for i in 0..3 {
|
||||
let job = ScheduledJob {
|
||||
id: format!("job-{}", i),
|
||||
name: format!("job {}", i),
|
||||
schedule: Schedule::Every { every_ms: 3600000 },
|
||||
prompt: "ping".into(),
|
||||
channel: "cli_chat".into(),
|
||||
chat_id: "conn-1".into(),
|
||||
model: None,
|
||||
enabled: true,
|
||||
delete_after_run: false,
|
||||
next_run_at: t + 1000,
|
||||
last_run_at: None,
|
||||
last_status: None,
|
||||
last_error: None,
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
||||
}
|
||||
let jobs = SchedulerStore::list_jobs(&pool).await.unwrap();
|
||||
assert_eq!(jobs.len(), 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remove_job() {
|
||||
let pool = setup_pool().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: "job-rm".into(),
|
||||
name: "remove me".into(),
|
||||
schedule: Schedule::Every { every_ms: 1000 },
|
||||
prompt: "hi".into(),
|
||||
channel: "cli_chat".into(),
|
||||
chat_id: "c".into(),
|
||||
model: None,
|
||||
enabled: true,
|
||||
delete_after_run: false,
|
||||
next_run_at: t,
|
||||
last_run_at: None,
|
||||
last_status: None,
|
||||
last_error: None,
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
||||
SchedulerStore::remove_job(&pool, "job-rm").await.unwrap();
|
||||
let result = SchedulerStore::get_job(&pool, "job-rm").await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_enabled() {
|
||||
let pool = setup_pool().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: "job-toggle".into(),
|
||||
name: "toggle".into(),
|
||||
schedule: Schedule::Every { every_ms: 1000 },
|
||||
prompt: "hi".into(),
|
||||
channel: "cli_chat".into(),
|
||||
chat_id: "c".into(),
|
||||
model: None,
|
||||
enabled: true,
|
||||
delete_after_run: false,
|
||||
next_run_at: t,
|
||||
last_run_at: None,
|
||||
last_status: None,
|
||||
last_error: None,
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
||||
SchedulerStore::set_enabled(&pool, "job-toggle", false).await.unwrap();
|
||||
let got = SchedulerStore::get_job(&pool, "job-toggle").await.unwrap();
|
||||
assert!(!got.enabled);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_due_jobs_only_returns_enabled_and_overdue() {
|
||||
let pool = setup_pool().await;
|
||||
let t = now();
|
||||
let jobs = vec![
|
||||
ScheduledJob {
|
||||
id: "due".into(),
|
||||
name: "due".into(),
|
||||
schedule: Schedule::At { at: t },
|
||||
prompt: "1".into(),
|
||||
channel: "cli_chat".into(),
|
||||
chat_id: "c".into(),
|
||||
model: None,
|
||||
enabled: true,
|
||||
delete_after_run: false,
|
||||
next_run_at: t - 1000,
|
||||
last_run_at: None,
|
||||
last_status: None,
|
||||
last_error: None,
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
},
|
||||
ScheduledJob {
|
||||
id: "future".into(),
|
||||
name: "future".into(),
|
||||
schedule: Schedule::At { at: t + 99999999 },
|
||||
prompt: "2".into(),
|
||||
channel: "cli_chat".into(),
|
||||
chat_id: "c".into(),
|
||||
model: None,
|
||||
enabled: true,
|
||||
delete_after_run: false,
|
||||
next_run_at: t + 99999999,
|
||||
last_run_at: None,
|
||||
last_status: None,
|
||||
last_error: None,
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
},
|
||||
ScheduledJob {
|
||||
id: "disabled-due".into(),
|
||||
name: "disabled due".into(),
|
||||
schedule: Schedule::At { at: t },
|
||||
prompt: "3".into(),
|
||||
channel: "cli_chat".into(),
|
||||
chat_id: "c".into(),
|
||||
model: None,
|
||||
enabled: false,
|
||||
delete_after_run: false,
|
||||
next_run_at: t - 1000,
|
||||
last_run_at: None,
|
||||
last_status: None,
|
||||
last_error: None,
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
},
|
||||
];
|
||||
for j in &jobs {
|
||||
SchedulerStore::add_job(&pool, j).await.unwrap();
|
||||
}
|
||||
let due = SchedulerStore::due_jobs(&pool, t, 10).await.unwrap();
|
||||
assert_eq!(due.len(), 1);
|
||||
assert_eq!(due[0].id, "due");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_record_run_and_list_runs() {
|
||||
let pool = setup_pool().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: "job-run".into(),
|
||||
name: "run test".into(),
|
||||
schedule: Schedule::Every { every_ms: 1000 },
|
||||
prompt: "hi".into(),
|
||||
channel: "cli_chat".into(),
|
||||
chat_id: "c".into(),
|
||||
model: None,
|
||||
enabled: true,
|
||||
delete_after_run: false,
|
||||
next_run_at: t,
|
||||
last_run_at: None,
|
||||
last_status: None,
|
||||
last_error: None,
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
||||
|
||||
let run = JobRun {
|
||||
id: 0,
|
||||
job_id: "job-run".into(),
|
||||
started_at: t,
|
||||
finished_at: t + 500,
|
||||
status: "ok".into(),
|
||||
output: Some("hello".into()),
|
||||
error: None,
|
||||
duration_ms: 500,
|
||||
};
|
||||
SchedulerStore::record_run(&pool, &run).await.unwrap();
|
||||
let runs = SchedulerStore::list_runs(&pool, "job-run", 10)
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(runs.len(), 1);
|
||||
assert_eq!(runs[0].status, "ok");
|
||||
assert_eq!(runs[0].output.as_deref(), Some("hello"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_job() {
|
||||
let pool = setup_pool().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: "job-update".into(),
|
||||
name: "old name".into(),
|
||||
schedule: Schedule::Every { every_ms: 1000 },
|
||||
prompt: "old prompt".into(),
|
||||
channel: "feishu".into(),
|
||||
chat_id: "oc_1".into(),
|
||||
model: None,
|
||||
enabled: true,
|
||||
delete_after_run: false,
|
||||
next_run_at: t,
|
||||
last_run_at: None,
|
||||
last_status: None,
|
||||
last_error: None,
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
||||
SchedulerStore::update_job(
|
||||
&pool,
|
||||
"job-update",
|
||||
Some("new prompt".into()),
|
||||
Some(Schedule::Every { every_ms: 60000 }),
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
let got = SchedulerStore::get_job(&pool, "job-update").await.unwrap();
|
||||
assert_eq!(got.prompt, "new prompt");
|
||||
}
|
||||
}
|
||||
@ -14,37 +14,3 @@ pub enum Schedule {
|
||||
#[serde(rename = "cron")]
|
||||
Cron { expr: String, tz: Option<String> },
|
||||
}
|
||||
|
||||
/// A scheduled job stored in the database.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ScheduledJob {
|
||||
pub id: String,
|
||||
pub name: String,
|
||||
/// JSON-serialized `Schedule` stored as TEXT in SQLite.
|
||||
pub schedule: Schedule,
|
||||
pub prompt: String,
|
||||
pub channel: String,
|
||||
pub chat_id: String,
|
||||
pub model: Option<String>,
|
||||
pub enabled: bool,
|
||||
pub delete_after_run: bool,
|
||||
pub next_run_at: i64,
|
||||
pub last_run_at: Option<i64>,
|
||||
pub last_status: Option<String>,
|
||||
pub last_error: Option<String>,
|
||||
pub created_at: i64,
|
||||
pub updated_at: i64,
|
||||
}
|
||||
|
||||
/// A single execution record for a job.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct JobRun {
|
||||
pub id: i64,
|
||||
pub job_id: String,
|
||||
pub started_at: i64,
|
||||
pub finished_at: i64,
|
||||
pub status: String,
|
||||
pub output: Option<String>,
|
||||
pub error: Option<String>,
|
||||
pub duration_ms: i64,
|
||||
}
|
||||
|
||||
@ -1,15 +1,17 @@
|
||||
pub mod error;
|
||||
pub mod session;
|
||||
pub mod message;
|
||||
pub mod scheduler;
|
||||
pub mod session;
|
||||
|
||||
pub use error::StorageError;
|
||||
pub use scheduler::{JobRun, ScheduledJob};
|
||||
|
||||
use sqlx::{Pool, Row, Sqlite, SqlitePool};
|
||||
use tokio::time::{sleep, Duration};
|
||||
use std::path::Path;
|
||||
|
||||
pub struct Storage {
|
||||
pool: Pool<Sqlite>,
|
||||
pub(crate) pool: Pool<Sqlite>,
|
||||
}
|
||||
|
||||
impl Storage {
|
||||
@ -109,6 +111,66 @@ impl Storage {
|
||||
.execute(&self.pool)
|
||||
.await?;
|
||||
|
||||
if let Err(e) = Self::init_scheduler_schema(&self.pool).await {
|
||||
tracing::warn!("Failed to init scheduler schema (tables may already exist): {}", e);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Initialize the scheduler tables (idempotent).
|
||||
pub(crate) async fn init_scheduler_schema(pool: &Pool<Sqlite>) -> Result<(), StorageError> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS scheduled_jobs (
|
||||
id TEXT PRIMARY KEY,
|
||||
name TEXT NOT NULL,
|
||||
schedule TEXT NOT NULL,
|
||||
prompt TEXT NOT NULL,
|
||||
channel TEXT NOT NULL,
|
||||
chat_id TEXT NOT NULL,
|
||||
model TEXT,
|
||||
enabled INTEGER NOT NULL DEFAULT 1,
|
||||
delete_after_run INTEGER NOT NULL DEFAULT 0,
|
||||
next_run_at INTEGER NOT NULL,
|
||||
last_run_at INTEGER,
|
||||
last_status TEXT,
|
||||
last_error TEXT,
|
||||
created_at INTEGER NOT NULL,
|
||||
updated_at INTEGER NOT NULL
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
r#"
|
||||
CREATE TABLE IF NOT EXISTS job_runs (
|
||||
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
||||
job_id TEXT NOT NULL REFERENCES scheduled_jobs(id) ON DELETE CASCADE,
|
||||
started_at INTEGER NOT NULL,
|
||||
finished_at INTEGER NOT NULL,
|
||||
status TEXT NOT NULL,
|
||||
output TEXT,
|
||||
error TEXT,
|
||||
duration_ms INTEGER NOT NULL
|
||||
)
|
||||
"#,
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query(
|
||||
"CREATE INDEX IF NOT EXISTS idx_jobs_next_run ON scheduled_jobs(enabled, next_run_at)",
|
||||
)
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
sqlx::query("CREATE INDEX IF NOT EXISTS idx_runs_job_id ON job_runs(job_id)")
|
||||
.execute(pool)
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
||||
551
src/storage/scheduler.rs
Normal file
551
src/storage/scheduler.rs
Normal file
@ -0,0 +1,551 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use sqlx::Row;
|
||||
|
||||
use crate::scheduler::Schedule;
|
||||
|
||||
/// A scheduled job stored in the database.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ScheduledJob {
|
||||
pub id: String,
|
||||
pub name: String,
|
||||
/// JSON-serialized `Schedule` stored as TEXT in SQLite.
|
||||
pub schedule: Schedule,
|
||||
pub prompt: String,
|
||||
pub channel: String,
|
||||
pub chat_id: String,
|
||||
pub model: Option<String>,
|
||||
pub enabled: bool,
|
||||
pub delete_after_run: bool,
|
||||
pub next_run_at: i64,
|
||||
pub last_run_at: Option<i64>,
|
||||
pub last_status: Option<String>,
|
||||
pub last_error: Option<String>,
|
||||
pub created_at: i64,
|
||||
pub updated_at: i64,
|
||||
}
|
||||
|
||||
/// A single execution record for a job.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct JobRun {
|
||||
pub id: i64,
|
||||
pub job_id: String,
|
||||
pub started_at: i64,
|
||||
pub finished_at: i64,
|
||||
pub status: String,
|
||||
pub output: Option<String>,
|
||||
pub error: Option<String>,
|
||||
pub duration_ms: i64,
|
||||
}
|
||||
|
||||
impl crate::storage::Storage {
|
||||
/// Insert a new scheduled job.
|
||||
pub async fn add_scheduled_job(&self, job: &ScheduledJob) -> anyhow::Result<()> {
|
||||
let schedule_json = serde_json::to_string(&job.schedule)?;
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO scheduled_jobs
|
||||
(id, name, schedule, prompt, channel, chat_id, model,
|
||||
enabled, delete_after_run, next_run_at, last_run_at,
|
||||
last_status, last_error, created_at, updated_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(&job.id)
|
||||
.bind(&job.name)
|
||||
.bind(&schedule_json)
|
||||
.bind(&job.prompt)
|
||||
.bind(&job.channel)
|
||||
.bind(&job.chat_id)
|
||||
.bind(&job.model)
|
||||
.bind(job.enabled as i32)
|
||||
.bind(job.delete_after_run as i32)
|
||||
.bind(job.next_run_at)
|
||||
.bind(job.last_run_at)
|
||||
.bind(&job.last_status)
|
||||
.bind(&job.last_error)
|
||||
.bind(job.created_at)
|
||||
.bind(job.updated_at)
|
||||
.execute(self.pool())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetch a single scheduled job by ID.
|
||||
pub async fn get_scheduled_job(&self, id: &str) -> anyhow::Result<ScheduledJob> {
|
||||
let row = sqlx::query("SELECT * FROM scheduled_jobs WHERE id = ?")
|
||||
.bind(id)
|
||||
.fetch_optional(self.pool())
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("job not found: {id}"))?;
|
||||
row_to_job(&row)
|
||||
}
|
||||
|
||||
/// List all scheduled jobs, ordered by next_run_at ascending.
|
||||
pub async fn list_scheduled_jobs(&self) -> anyhow::Result<Vec<ScheduledJob>> {
|
||||
let rows = sqlx::query("SELECT * FROM scheduled_jobs ORDER BY next_run_at ASC")
|
||||
.fetch_all(self.pool())
|
||||
.await?;
|
||||
rows.iter().map(row_to_job).collect()
|
||||
}
|
||||
|
||||
/// Delete a scheduled job (cascades to job_runs).
|
||||
pub async fn remove_scheduled_job(&self, id: &str) -> anyhow::Result<()> {
|
||||
sqlx::query("DELETE FROM scheduled_jobs WHERE id = ?")
|
||||
.bind(id)
|
||||
.execute(self.pool())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Enable or disable a scheduled job.
|
||||
pub async fn set_scheduled_job_enabled(&self, id: &str, enabled: bool) -> anyhow::Result<()> {
|
||||
sqlx::query("UPDATE scheduled_jobs SET enabled = ?, updated_at = ? WHERE id = ?")
|
||||
.bind(enabled as i32)
|
||||
.bind(now_ms())
|
||||
.bind(id)
|
||||
.execute(self.pool())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update selective fields on a scheduled job.
|
||||
pub async fn update_scheduled_job(
|
||||
&self,
|
||||
id: &str,
|
||||
prompt: Option<String>,
|
||||
schedule: Option<Schedule>,
|
||||
channel: Option<String>,
|
||||
chat_id: Option<String>,
|
||||
model: Option<String>,
|
||||
) -> anyhow::Result<()> {
|
||||
let now = now_ms();
|
||||
|
||||
if let Some(p) = prompt {
|
||||
sqlx::query("UPDATE scheduled_jobs SET prompt = ?, updated_at = ? WHERE id = ?")
|
||||
.bind(&p)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(self.pool())
|
||||
.await?;
|
||||
}
|
||||
if let Some(s) = schedule {
|
||||
let json = serde_json::to_string(&s)?;
|
||||
sqlx::query("UPDATE scheduled_jobs SET schedule = ?, updated_at = ? WHERE id = ?")
|
||||
.bind(&json)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(self.pool())
|
||||
.await?;
|
||||
}
|
||||
if let Some(c) = channel {
|
||||
sqlx::query("UPDATE scheduled_jobs SET channel = ?, updated_at = ? WHERE id = ?")
|
||||
.bind(&c)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(self.pool())
|
||||
.await?;
|
||||
}
|
||||
if let Some(c) = chat_id {
|
||||
sqlx::query("UPDATE scheduled_jobs SET chat_id = ?, updated_at = ? WHERE id = ?")
|
||||
.bind(&c)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(self.pool())
|
||||
.await?;
|
||||
}
|
||||
if let Some(m) = model {
|
||||
sqlx::query("UPDATE scheduled_jobs SET model = ?, updated_at = ? WHERE id = ?")
|
||||
.bind(&m)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(self.pool())
|
||||
.await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Update next_run_at and last_run_at for a job.
|
||||
pub async fn set_scheduled_job_next_run(&self, id: &str, next_run_at: i64) -> anyhow::Result<()> {
|
||||
let now = now_ms();
|
||||
sqlx::query(
|
||||
"UPDATE scheduled_jobs SET next_run_at = ?, last_run_at = ?, updated_at = ? WHERE id = ?",
|
||||
)
|
||||
.bind(next_run_at)
|
||||
.bind(now)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(self.pool())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set last_run_at for a job (used when starting execution).
|
||||
pub async fn touch_scheduled_job_last_run(&self, id: &str, at: i64) -> anyhow::Result<()> {
|
||||
sqlx::query("UPDATE scheduled_jobs SET last_run_at = ?, updated_at = ? WHERE id = ?")
|
||||
.bind(at)
|
||||
.bind(at)
|
||||
.bind(id)
|
||||
.execute(self.pool())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Set last_status and last_error after job completion.
|
||||
pub async fn set_scheduled_job_last_status(
|
||||
&self,
|
||||
id: &str,
|
||||
status: &str,
|
||||
error: Option<&str>,
|
||||
) -> anyhow::Result<()> {
|
||||
let now = now_ms();
|
||||
sqlx::query(
|
||||
"UPDATE scheduled_jobs SET last_status = ?, last_error = ?, updated_at = ? WHERE id = ?",
|
||||
)
|
||||
.bind(status)
|
||||
.bind(error)
|
||||
.bind(now)
|
||||
.bind(id)
|
||||
.execute(self.pool())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Fetch enabled jobs whose next_run_at <= now, up to `limit`.
|
||||
pub async fn due_scheduled_jobs(
|
||||
&self,
|
||||
now: i64,
|
||||
limit: usize,
|
||||
) -> anyhow::Result<Vec<ScheduledJob>> {
|
||||
let rows = sqlx::query(
|
||||
"SELECT * FROM scheduled_jobs WHERE enabled = 1 AND next_run_at <= ? ORDER BY next_run_at ASC LIMIT ?",
|
||||
)
|
||||
.bind(now)
|
||||
.bind(limit as i64)
|
||||
.fetch_all(self.pool())
|
||||
.await?;
|
||||
rows.iter().map(row_to_job).collect()
|
||||
}
|
||||
|
||||
/// Record a job execution run.
|
||||
pub async fn record_scheduled_job_run(&self, run: &JobRun) -> anyhow::Result<()> {
|
||||
sqlx::query(
|
||||
r#"
|
||||
INSERT INTO job_runs (job_id, started_at, finished_at, status, output, error, duration_ms)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
"#,
|
||||
)
|
||||
.bind(&run.job_id)
|
||||
.bind(run.started_at)
|
||||
.bind(run.finished_at)
|
||||
.bind(&run.status)
|
||||
.bind(&run.output)
|
||||
.bind(&run.error)
|
||||
.bind(run.duration_ms)
|
||||
.execute(self.pool())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// List recent runs for a job, newest first.
|
||||
pub async fn list_scheduled_job_runs(
|
||||
&self,
|
||||
job_id: &str,
|
||||
limit: usize,
|
||||
) -> anyhow::Result<Vec<JobRun>> {
|
||||
let rows = sqlx::query(
|
||||
"SELECT * FROM job_runs WHERE job_id = ? ORDER BY finished_at DESC LIMIT ?",
|
||||
)
|
||||
.bind(job_id)
|
||||
.bind(limit as i64)
|
||||
.fetch_all(self.pool())
|
||||
.await?;
|
||||
rows.iter()
|
||||
.map(|r| {
|
||||
Ok(JobRun {
|
||||
id: r.try_get("id")?,
|
||||
job_id: r.try_get("job_id")?,
|
||||
started_at: r.try_get("started_at")?,
|
||||
finished_at: r.try_get("finished_at")?,
|
||||
status: r.try_get("status")?,
|
||||
output: r.try_get("output")?,
|
||||
error: r.try_get("error")?,
|
||||
duration_ms: r.try_get("duration_ms")?,
|
||||
})
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Delete disabled jobs whose updated_at is before `before`.
|
||||
pub async fn cleanup_disabled_scheduled_jobs(&self, before: i64) -> anyhow::Result<()> {
|
||||
sqlx::query("DELETE FROM scheduled_jobs WHERE enabled = 0 AND updated_at < ?")
|
||||
.bind(before)
|
||||
.execute(self.pool())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
fn now_ms() -> i64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_millis() as i64
|
||||
}
|
||||
|
||||
fn row_to_job(row: &sqlx::sqlite::SqliteRow) -> anyhow::Result<ScheduledJob> {
|
||||
let schedule_json: String = row.try_get("schedule")?;
|
||||
let schedule: Schedule = serde_json::from_str(&schedule_json)?;
|
||||
Ok(ScheduledJob {
|
||||
id: row.try_get("id")?,
|
||||
name: row.try_get("name")?,
|
||||
schedule,
|
||||
prompt: row.try_get("prompt")?,
|
||||
channel: row.try_get("channel")?,
|
||||
chat_id: row.try_get("chat_id")?,
|
||||
model: row.try_get("model")?,
|
||||
enabled: row.try_get::<i32, _>("enabled")? != 0,
|
||||
delete_after_run: row.try_get::<i32, _>("delete_after_run")? != 0,
|
||||
next_run_at: row.try_get("next_run_at")?,
|
||||
last_run_at: row.try_get("last_run_at")?,
|
||||
last_status: row.try_get("last_status")?,
|
||||
last_error: row.try_get("last_error")?,
|
||||
created_at: row.try_get("created_at")?,
|
||||
updated_at: row.try_get("updated_at")?,
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::ScheduledJob;
|
||||
use crate::scheduler::Schedule;
|
||||
use crate::storage::Storage;
|
||||
use sqlx::SqlitePool;
|
||||
|
||||
fn now() -> i64 {
|
||||
std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis() as i64
|
||||
}
|
||||
|
||||
async fn setup_storage() -> Storage {
|
||||
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
|
||||
let storage = Storage { pool };
|
||||
Storage::init_scheduler_schema(storage.pool()).await.unwrap();
|
||||
storage
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_init_creates_tables() {
|
||||
let storage = setup_storage().await;
|
||||
let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scheduled_jobs")
|
||||
.fetch_one(storage.pool())
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(row.0, 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_add_and_get_job() {
|
||||
let storage = setup_storage().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: "job-1".into(),
|
||||
name: "test job".into(),
|
||||
schedule: Schedule::Every { every_ms: 3600000 },
|
||||
prompt: "say hello".into(),
|
||||
channel: "cli_chat".into(),
|
||||
chat_id: "conn-1".into(),
|
||||
model: None,
|
||||
enabled: true,
|
||||
delete_after_run: false,
|
||||
next_run_at: t + 3600000,
|
||||
last_run_at: None,
|
||||
last_status: None,
|
||||
last_error: None,
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
storage.add_scheduled_job(&job).await.unwrap();
|
||||
let got = storage.get_scheduled_job("job-1").await.unwrap();
|
||||
assert_eq!(got.id, "job-1");
|
||||
assert_eq!(got.name, "test job");
|
||||
assert_eq!(got.prompt, "say hello");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_list_jobs() {
|
||||
let storage = setup_storage().await;
|
||||
let t = now();
|
||||
for i in 0..3 {
|
||||
let job = ScheduledJob {
|
||||
id: format!("job-{}", i),
|
||||
name: format!("job {}", i),
|
||||
schedule: Schedule::Every { every_ms: 3600000 },
|
||||
prompt: "ping".into(),
|
||||
channel: "cli_chat".into(),
|
||||
chat_id: "conn-1".into(),
|
||||
model: None,
|
||||
enabled: true,
|
||||
delete_after_run: false,
|
||||
next_run_at: t + 1000,
|
||||
last_run_at: None,
|
||||
last_status: None,
|
||||
last_error: None,
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
storage.add_scheduled_job(&job).await.unwrap();
|
||||
}
|
||||
let jobs = storage.list_scheduled_jobs().await.unwrap();
|
||||
assert_eq!(jobs.len(), 3);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_remove_job() {
|
||||
let storage = setup_storage().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: "job-rm".into(),
|
||||
name: "remove me".into(),
|
||||
schedule: Schedule::Every { every_ms: 1000 },
|
||||
prompt: "hi".into(),
|
||||
channel: "cli_chat".into(),
|
||||
chat_id: "c".into(),
|
||||
model: None,
|
||||
enabled: true,
|
||||
delete_after_run: false,
|
||||
next_run_at: t,
|
||||
last_run_at: None,
|
||||
last_status: None,
|
||||
last_error: None,
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
storage.add_scheduled_job(&job).await.unwrap();
|
||||
storage.remove_scheduled_job("job-rm").await.unwrap();
|
||||
let result = storage.get_scheduled_job("job-rm").await;
|
||||
assert!(result.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_set_enabled() {
|
||||
let storage = setup_storage().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: "job-toggle".into(),
|
||||
name: "toggle".into(),
|
||||
schedule: Schedule::Every { every_ms: 1000 },
|
||||
prompt: "hi".into(),
|
||||
channel: "cli_chat".into(),
|
||||
chat_id: "c".into(),
|
||||
model: None,
|
||||
enabled: true,
|
||||
delete_after_run: false,
|
||||
next_run_at: t,
|
||||
last_run_at: None,
|
||||
last_status: None,
|
||||
last_error: None,
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
storage.add_scheduled_job(&job).await.unwrap();
|
||||
storage.set_scheduled_job_enabled("job-toggle", false).await.unwrap();
|
||||
let got = storage.get_scheduled_job("job-toggle").await.unwrap();
|
||||
assert!(!got.enabled);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_due_jobs_only_returns_enabled_and_overdue() {
|
||||
let storage = setup_storage().await;
|
||||
let t = now();
|
||||
let jobs = vec![
|
||||
ScheduledJob {
|
||||
id: "due".into(), name: "due".into(),
|
||||
schedule: Schedule::At { at: t }, prompt: "1".into(),
|
||||
channel: "cli_chat".into(), chat_id: "c".into(),
|
||||
model: None, enabled: true, delete_after_run: false,
|
||||
next_run_at: t - 1000, last_run_at: None,
|
||||
last_status: None, last_error: None,
|
||||
created_at: t, updated_at: t,
|
||||
},
|
||||
ScheduledJob {
|
||||
id: "future".into(), name: "future".into(),
|
||||
schedule: Schedule::At { at: t + 99999999 }, prompt: "2".into(),
|
||||
channel: "cli_chat".into(), chat_id: "c".into(),
|
||||
model: None, enabled: true, delete_after_run: false,
|
||||
next_run_at: t + 99999999, last_run_at: None,
|
||||
last_status: None, last_error: None,
|
||||
created_at: t, updated_at: t,
|
||||
},
|
||||
ScheduledJob {
|
||||
id: "disabled-due".into(), name: "disabled due".into(),
|
||||
schedule: Schedule::At { at: t }, prompt: "3".into(),
|
||||
channel: "cli_chat".into(), chat_id: "c".into(),
|
||||
model: None, enabled: false, delete_after_run: false,
|
||||
next_run_at: t - 1000, last_run_at: None,
|
||||
last_status: None, last_error: None,
|
||||
created_at: t, updated_at: t,
|
||||
},
|
||||
];
|
||||
for j in &jobs {
|
||||
storage.add_scheduled_job(j).await.unwrap();
|
||||
}
|
||||
let due = storage.due_scheduled_jobs(t, 10).await.unwrap();
|
||||
assert_eq!(due.len(), 1);
|
||||
assert_eq!(due[0].id, "due");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_record_run_and_list_runs() {
|
||||
let storage = setup_storage().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: "job-run".into(), name: "run test".into(),
|
||||
schedule: Schedule::Every { every_ms: 1000 },
|
||||
prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
|
||||
model: None, enabled: true, delete_after_run: false,
|
||||
next_run_at: t, last_run_at: None,
|
||||
last_status: None, last_error: None,
|
||||
created_at: t, updated_at: t,
|
||||
};
|
||||
storage.add_scheduled_job(&job).await.unwrap();
|
||||
|
||||
let run = super::JobRun {
|
||||
id: 0, job_id: "job-run".into(),
|
||||
started_at: t, finished_at: t + 500,
|
||||
status: "ok".into(), output: Some("hello".into()),
|
||||
error: None, duration_ms: 500,
|
||||
};
|
||||
storage.record_scheduled_job_run(&run).await.unwrap();
|
||||
let runs = storage.list_scheduled_job_runs("job-run", 10).await.unwrap();
|
||||
assert_eq!(runs.len(), 1);
|
||||
assert_eq!(runs[0].status, "ok");
|
||||
assert_eq!(runs[0].output.as_deref(), Some("hello"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_update_job() {
|
||||
let storage = setup_storage().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: "job-update".into(), name: "old name".into(),
|
||||
schedule: Schedule::Every { every_ms: 1000 },
|
||||
prompt: "old prompt".into(), channel: "feishu".into(),
|
||||
chat_id: "oc_1".into(), model: None,
|
||||
enabled: true, delete_after_run: false,
|
||||
next_run_at: t, last_run_at: None,
|
||||
last_status: None, last_error: None,
|
||||
created_at: t, updated_at: t,
|
||||
};
|
||||
storage.add_scheduled_job(&job).await.unwrap();
|
||||
storage.update_scheduled_job(
|
||||
"job-update",
|
||||
Some("new prompt".into()),
|
||||
Some(Schedule::Every { every_ms: 60000 }),
|
||||
None, None, None,
|
||||
).await.unwrap();
|
||||
let got = storage.get_scheduled_job("job-update").await.unwrap();
|
||||
assert_eq!(got.prompt, "new prompt");
|
||||
}
|
||||
}
|
||||
@ -1,11 +1,11 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use serde_json::{json, Value};
|
||||
use sqlx::SqlitePool;
|
||||
use uuid::Uuid;
|
||||
|
||||
use crate::scheduler::next_run_for_schedule;
|
||||
use crate::scheduler::store::SchedulerStore;
|
||||
use crate::scheduler::types::{Schedule, ScheduledJob};
|
||||
use crate::scheduler::{next_run_for_schedule, Schedule};
|
||||
use crate::storage::{ScheduledJob, Storage};
|
||||
use crate::tools::traits::{Tool, ToolResult};
|
||||
|
||||
fn now_ms() -> i64 {
|
||||
@ -15,17 +15,15 @@ fn now_ms() -> i64 {
|
||||
.as_millis() as i64
|
||||
}
|
||||
|
||||
// ── CronAddTool ──────────────────────────────────────────────────────────────
|
||||
|
||||
pub struct CronAddTool {
|
||||
pool: SqlitePool,
|
||||
storage: Arc<Storage>,
|
||||
valid_channels: Vec<String>,
|
||||
}
|
||||
|
||||
impl CronAddTool {
|
||||
pub fn new(pool: SqlitePool, valid_channels: Vec<String>) -> Self {
|
||||
pub fn new(storage: Arc<Storage>, valid_channels: Vec<String>) -> Self {
|
||||
Self {
|
||||
pool,
|
||||
storage,
|
||||
valid_channels,
|
||||
}
|
||||
}
|
||||
@ -163,7 +161,7 @@ impl Tool for CronAddTool {
|
||||
updated_at: now,
|
||||
};
|
||||
|
||||
SchedulerStore::add_job(&self.pool, &job).await?;
|
||||
self.storage.add_scheduled_job(&job).await?;
|
||||
|
||||
Ok(ToolResult {
|
||||
success: true,
|
||||
@ -179,12 +177,12 @@ impl Tool for CronAddTool {
|
||||
// ── CronListTool ─────────────────────────────────────────────────────────────
|
||||
|
||||
pub struct CronListTool {
|
||||
pool: SqlitePool,
|
||||
storage: Arc<Storage>,
|
||||
}
|
||||
|
||||
impl CronListTool {
|
||||
pub fn new(pool: SqlitePool) -> Self {
|
||||
Self { pool }
|
||||
pub fn new(storage: Arc<Storage>) -> Self {
|
||||
Self { storage }
|
||||
}
|
||||
}
|
||||
|
||||
@ -220,7 +218,7 @@ impl Tool for CronListTool {
|
||||
.get("status")
|
||||
.and_then(|v| v.as_str())
|
||||
.unwrap_or("all");
|
||||
let jobs = SchedulerStore::list_jobs(&self.pool).await?;
|
||||
let jobs = self.storage.list_scheduled_jobs().await?;
|
||||
|
||||
let filtered: Vec<&ScheduledJob> = match filter {
|
||||
"enabled" => jobs.iter().filter(|j| j.enabled).collect(),
|
||||
@ -262,12 +260,12 @@ impl Tool for CronListTool {
|
||||
// ── CronRemoveTool ───────────────────────────────────────────────────────────
|
||||
|
||||
pub struct CronRemoveTool {
|
||||
pool: SqlitePool,
|
||||
storage: Arc<Storage>,
|
||||
}
|
||||
|
||||
impl CronRemoveTool {
|
||||
pub fn new(pool: SqlitePool) -> Self {
|
||||
Self { pool }
|
||||
pub fn new(storage: Arc<Storage>) -> Self {
|
||||
Self { storage }
|
||||
}
|
||||
}
|
||||
|
||||
@ -308,7 +306,7 @@ impl Tool for CronRemoveTool {
|
||||
});
|
||||
}
|
||||
|
||||
match SchedulerStore::get_job(&self.pool, &job_id).await {
|
||||
match self.storage.get_scheduled_job(&job_id).await {
|
||||
Ok(_) => {}
|
||||
Err(_) => {
|
||||
return Ok(ToolResult {
|
||||
@ -319,7 +317,7 @@ impl Tool for CronRemoveTool {
|
||||
}
|
||||
}
|
||||
|
||||
SchedulerStore::remove_job(&self.pool, &job_id).await?;
|
||||
self.storage.remove_scheduled_job(&job_id).await?;
|
||||
Ok(ToolResult {
|
||||
success: true,
|
||||
output: format!("Job {} deleted.", job_id),
|
||||
@ -331,12 +329,12 @@ impl Tool for CronRemoveTool {
|
||||
// ── CronEnableTool ───────────────────────────────────────────────────────────
|
||||
|
||||
pub struct CronEnableTool {
|
||||
pool: SqlitePool,
|
||||
storage: Arc<Storage>,
|
||||
}
|
||||
|
||||
impl CronEnableTool {
|
||||
pub fn new(pool: SqlitePool) -> Self {
|
||||
Self { pool }
|
||||
pub fn new(storage: Arc<Storage>) -> Self {
|
||||
Self { storage }
|
||||
}
|
||||
}
|
||||
|
||||
@ -377,14 +375,16 @@ impl Tool for CronEnableTool {
|
||||
});
|
||||
}
|
||||
|
||||
let job = SchedulerStore::get_job(&self.pool, &job_id)
|
||||
let job = self
|
||||
.storage
|
||||
.get_scheduled_job(&job_id)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Job {} not found: {}", job_id, e))?;
|
||||
|
||||
let next = next_run_for_schedule(&job.schedule, now_ms());
|
||||
SchedulerStore::set_enabled(&self.pool, &job_id, true).await?;
|
||||
self.storage.set_scheduled_job_enabled(&job_id, true).await?;
|
||||
if let Some(n) = next {
|
||||
SchedulerStore::set_next_run(&self.pool, &job_id, n).await?;
|
||||
self.storage.set_scheduled_job_next_run(&job_id, n).await?;
|
||||
}
|
||||
|
||||
Ok(ToolResult {
|
||||
@ -398,12 +398,12 @@ impl Tool for CronEnableTool {
|
||||
// ── CronDisableTool ──────────────────────────────────────────────────────────
|
||||
|
||||
pub struct CronDisableTool {
|
||||
pool: SqlitePool,
|
||||
storage: Arc<Storage>,
|
||||
}
|
||||
|
||||
impl CronDisableTool {
|
||||
pub fn new(pool: SqlitePool) -> Self {
|
||||
Self { pool }
|
||||
pub fn new(storage: Arc<Storage>) -> Self {
|
||||
Self { storage }
|
||||
}
|
||||
}
|
||||
|
||||
@ -444,10 +444,12 @@ impl Tool for CronDisableTool {
|
||||
});
|
||||
}
|
||||
|
||||
let _ = SchedulerStore::get_job(&self.pool, &job_id)
|
||||
let _ = self
|
||||
.storage
|
||||
.get_scheduled_job(&job_id)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Job {} not found: {}", job_id, e))?;
|
||||
SchedulerStore::set_enabled(&self.pool, &job_id, false).await?;
|
||||
self.storage.set_scheduled_job_enabled(&job_id, false).await?;
|
||||
|
||||
Ok(ToolResult {
|
||||
success: true,
|
||||
@ -460,12 +462,12 @@ impl Tool for CronDisableTool {
|
||||
// ── CronUpdateTool ───────────────────────────────────────────────────────────
|
||||
|
||||
pub struct CronUpdateTool {
|
||||
pool: SqlitePool,
|
||||
storage: Arc<Storage>,
|
||||
}
|
||||
|
||||
impl CronUpdateTool {
|
||||
pub fn new(pool: SqlitePool) -> Self {
|
||||
Self { pool }
|
||||
pub fn new(storage: Arc<Storage>) -> Self {
|
||||
Self { storage }
|
||||
}
|
||||
}
|
||||
|
||||
@ -526,7 +528,9 @@ impl Tool for CronUpdateTool {
|
||||
});
|
||||
}
|
||||
|
||||
let _ = SchedulerStore::get_job(&self.pool, &job_id)
|
||||
let _ = self
|
||||
.storage
|
||||
.get_scheduled_job(&job_id)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("Job {} not found: {}", job_id, e))?;
|
||||
|
||||
@ -554,13 +558,14 @@ impl Tool for CronUpdateTool {
|
||||
.and_then(|v| v.as_str())
|
||||
.map(|s| s.to_string());
|
||||
|
||||
SchedulerStore::update_job(&self.pool, &job_id, prompt, schedule, channel, chat_id, model)
|
||||
self.storage
|
||||
.update_scheduled_job(&job_id, prompt, schedule, channel, chat_id, model)
|
||||
.await?;
|
||||
|
||||
if args.get("schedule").is_some() {
|
||||
let job = SchedulerStore::get_job(&self.pool, &job_id).await?;
|
||||
let job = self.storage.get_scheduled_job(&job_id).await?;
|
||||
if let Some(next) = next_run_for_schedule(&job.schedule, now_ms()) {
|
||||
SchedulerStore::set_next_run(&self.pool, &job_id, next).await?;
|
||||
self.storage.set_scheduled_job_next_run(&job_id, next).await?;
|
||||
}
|
||||
}
|
||||
|
||||
@ -575,15 +580,15 @@ impl Tool for CronUpdateTool {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::scheduler::store::SchedulerStore;
|
||||
use crate::scheduler::types::{Schedule, ScheduledJob};
|
||||
use crate::scheduler::Schedule;
|
||||
use crate::storage::{ScheduledJob, Storage};
|
||||
use serde_json::json;
|
||||
use sqlx::SqlitePool;
|
||||
|
||||
async fn setup_pool() -> SqlitePool {
|
||||
async fn setup_storage() -> Arc<Storage> {
|
||||
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
|
||||
SchedulerStore::init(&pool).await.unwrap();
|
||||
pool
|
||||
Storage::init_scheduler_schema(&pool).await.unwrap();
|
||||
Arc::new(Storage { pool })
|
||||
}
|
||||
|
||||
fn now() -> i64 {
|
||||
@ -595,8 +600,8 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cron_add_tool() {
|
||||
let pool = setup_pool().await;
|
||||
let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]);
|
||||
let storage = setup_storage().await;
|
||||
let tool = CronAddTool::new(storage.clone(), vec!["cli_chat".to_string()]);
|
||||
let result = tool
|
||||
.execute(json!({
|
||||
"schedule": {"type": "every", "every_ms": 3600000},
|
||||
@ -610,15 +615,15 @@ mod tests {
|
||||
assert!(result.success);
|
||||
assert!(result.output.contains("hourly report"));
|
||||
|
||||
let jobs = SchedulerStore::list_jobs(&pool).await.unwrap();
|
||||
let jobs = storage.list_scheduled_jobs().await.unwrap();
|
||||
assert_eq!(jobs.len(), 1);
|
||||
assert_eq!(jobs[0].name, "hourly report");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cron_add_invalid_channel() {
|
||||
let pool = setup_pool().await;
|
||||
let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]);
|
||||
let storage = setup_storage().await;
|
||||
let tool = CronAddTool::new(storage.clone(), vec!["cli_chat".to_string()]);
|
||||
let result = tool
|
||||
.execute(json!({
|
||||
"schedule": {"type": "every", "every_ms": 3600000},
|
||||
@ -635,7 +640,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cron_list_tool() {
|
||||
let pool = setup_pool().await;
|
||||
let storage = setup_storage().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: uuid::Uuid::new_v4().to_string(),
|
||||
@ -654,9 +659,9 @@ mod tests {
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
||||
storage.add_scheduled_job(&job).await.unwrap();
|
||||
|
||||
let tool = CronListTool::new(pool.clone());
|
||||
let tool = CronListTool::new(storage.clone());
|
||||
let result = tool.execute(json!({})).await.unwrap();
|
||||
assert!(result.success);
|
||||
assert!(result.output.contains("list-test"));
|
||||
@ -664,7 +669,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cron_remove_tool() {
|
||||
let pool = setup_pool().await;
|
||||
let storage = setup_storage().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: "job-rm-tool".into(),
|
||||
@ -683,22 +688,20 @@ mod tests {
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
||||
storage.add_scheduled_job(&job).await.unwrap();
|
||||
|
||||
let tool = CronRemoveTool::new(pool.clone());
|
||||
let tool = CronRemoveTool::new(storage.clone());
|
||||
let result = tool
|
||||
.execute(json!({"job_id": "job-rm-tool"}))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.success);
|
||||
assert!(SchedulerStore::get_job(&pool, "job-rm-tool")
|
||||
.await
|
||||
.is_err());
|
||||
assert!(storage.get_scheduled_job("job-rm-tool").await.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cron_enable_disable_tools() {
|
||||
let pool = setup_pool().await;
|
||||
let storage = setup_storage().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: "job-toggle-tool".into(),
|
||||
@ -717,36 +720,32 @@ mod tests {
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
||||
storage.add_scheduled_job(&job).await.unwrap();
|
||||
|
||||
let disable_tool = CronDisableTool::new(pool.clone());
|
||||
let disable_tool = CronDisableTool::new(storage.clone());
|
||||
let result = disable_tool
|
||||
.execute(json!({"job_id": "job-toggle-tool"}))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.success);
|
||||
|
||||
let got = SchedulerStore::get_job(&pool, "job-toggle-tool")
|
||||
.await
|
||||
.unwrap();
|
||||
let got = storage.get_scheduled_job("job-toggle-tool").await.unwrap();
|
||||
assert!(!got.enabled);
|
||||
|
||||
let enable_tool = CronEnableTool::new(pool.clone());
|
||||
let enable_tool = CronEnableTool::new(storage.clone());
|
||||
let result = enable_tool
|
||||
.execute(json!({"job_id": "job-toggle-tool"}))
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(result.success);
|
||||
|
||||
let got = SchedulerStore::get_job(&pool, "job-toggle-tool")
|
||||
.await
|
||||
.unwrap();
|
||||
let got = storage.get_scheduled_job("job-toggle-tool").await.unwrap();
|
||||
assert!(got.enabled);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_cron_update_tool() {
|
||||
let pool = setup_pool().await;
|
||||
let storage = setup_storage().await;
|
||||
let t = now();
|
||||
let job = ScheduledJob {
|
||||
id: "job-update-tool".into(),
|
||||
@ -767,9 +766,9 @@ mod tests {
|
||||
created_at: t,
|
||||
updated_at: t,
|
||||
};
|
||||
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
||||
storage.add_scheduled_job(&job).await.unwrap();
|
||||
|
||||
let tool = CronUpdateTool::new(pool.clone());
|
||||
let tool = CronUpdateTool::new(storage.clone());
|
||||
let result = tool
|
||||
.execute(json!({
|
||||
"job_id": "job-update-tool",
|
||||
@ -780,9 +779,7 @@ mod tests {
|
||||
.unwrap();
|
||||
assert!(result.success);
|
||||
|
||||
let got = SchedulerStore::get_job(&pool, "job-update-tool")
|
||||
.await
|
||||
.unwrap();
|
||||
let got = storage.get_scheduled_job("job-update-tool").await.unwrap();
|
||||
assert_eq!(got.prompt, "new prompt");
|
||||
}
|
||||
}
|
||||
@ -1,5 +1,6 @@
|
||||
pub mod bash;
|
||||
pub mod calculator;
|
||||
pub mod cron;
|
||||
pub mod file_edit;
|
||||
pub mod file_read;
|
||||
pub mod file_write;
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user