From 62f4326131f6b7936521efb73a3b6f32bf4fb45b Mon Sep 17 00:00:00 2001 From: xiaoski Date: Tue, 5 May 2026 00:49:54 +0800 Subject: [PATCH] refactor: move scheduler store to storage module, cron tools to tools module - storage/scheduler.rs: ScheduledJob/JobRun types + CRUD on Storage - tools/cron.rs: 6 cron agent tools (add/list/remove/enable/disable/update) - scheduler/types.rs: keep only Schedule enum - scheduler/mod.rs: use Arc instead of raw SqlitePool - gateway/mod.rs: inject Storage directly, replace pool field - storage/mod.rs: scheduler tables in init_schema --- .../plans/2026-05-04-scheduled-tasks.md | 2356 +++++++++++++++++ src/gateway/mod.rs | 27 +- src/scheduler/mod.rs | 40 +- src/scheduler/store.rs | 668 ----- src/scheduler/types.rs | 34 - src/storage/mod.rs | 66 +- src/storage/scheduler.rs | 551 ++++ src/{scheduler/tools.rs => tools/cron.rs} | 141 +- src/tools/mod.rs | 1 + 9 files changed, 3068 insertions(+), 816 deletions(-) create mode 100644 docs/superpowers/plans/2026-05-04-scheduled-tasks.md delete mode 100644 src/scheduler/store.rs create mode 100644 src/storage/scheduler.rs rename src/{scheduler/tools.rs => tools/cron.rs} (85%) diff --git a/docs/superpowers/plans/2026-05-04-scheduled-tasks.md b/docs/superpowers/plans/2026-05-04-scheduled-tasks.md new file mode 100644 index 0000000..7250357 --- /dev/null +++ b/docs/superpowers/plans/2026-05-04-scheduled-tasks.md @@ -0,0 +1,2356 @@ +# Scheduled Tasks (Cron Jobs) Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add a cron-like scheduled task system to PicoBot that triggers agent LLM prompts on a schedule (cron expression / fixed interval / one-shot), with results delivered to channels. + +**Architecture:** A new `src/scheduler/` module with its own SQLite store (sharing the existing `sqlx::SqlitePool` via `storage.pool()`). The scheduler runs as a tokio background task, calling `SessionManager::handle_cron_message()` directly. Agent-facing tools (cron_add, cron_list, etc.) let the LLM manage jobs. Schedule computation uses the `cron` and `chrono-tz` crates. + +**Tech Stack:** Rust, tokio, sqlx, `cron` 0.15, `chrono-tz` 0.10 + +--- + +## File Structure + +| File | Create/Modify | Responsibility | +|------|--------------|----------------| +| `Cargo.toml` | Modify | Add `cron`, `chrono-tz` dependencies | +| `src/lib.rs` | Modify | Add `pub mod scheduler;` | +| `src/config/mod.rs` | Modify | Add `SchedulerConfig` to `GatewayConfig` | +| `src/scheduler/types.rs` | Create | `Schedule`, `ScheduledJob`, `JobRun` data types | +| `src/scheduler/store.rs` | Create | SQLite schema + CRUD for `scheduled_jobs` and `job_runs` | +| `src/scheduler/mod.rs` | Create | `Scheduler` struct, `run()` loop, `next_run_for_schedule()` | +| `src/scheduler/tools.rs` | Create | 6 agent tools: `cron_add/list/remove/enable/disable/update` | +| `src/bus/message.rs` | Modify | Add `ChatMessage::user_with_source()` factory | +| `src/session/session.rs` | Modify | Add `handle_cron_message()` method | +| `src/gateway/mod.rs` | Modify | Create `Scheduler`, spawn background task, register cron tools | +| `src/session/session_id.rs` | Modify | Add `from_components()` convenience constructor | + +--- + +### Task 1: Add Cron Dependencies + +**Files:** +- Modify: `Cargo.toml` + +- [ ] **Step 1: Add `cron` and `chrono-tz` to dependencies** + +After line 28 (`tempfile = "3"`), insert: + +```toml +cron = "0.15" +chrono-tz = "0.10" +``` + +- [ ] **Step 2: Verify build** + +Run: `cargo check 2>&1` +Expected: Dependencies download and resolve. No code referencing them yet, so no compile errors. + +- [ ] **Step 3: Commit** + +```bash +git add Cargo.toml Cargo.lock +git commit -m "deps: add cron and chrono-tz for scheduled tasks" +``` + +--- + +### Task 2: Add SchedulerConfig + +**Files:** +- Modify: `src/config/mod.rs:143` (after existing `session_db_path` field) + +- [ ] **Step 1: Define `SchedulerConfig` struct** + +Add after the closing `}` of `GatewayConfig` (after line 143, before the `ClientConfig` block): + +```rust +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SchedulerConfig { + /// Whether the scheduler is enabled + #[serde(default = "default_scheduler_enabled")] + pub enabled: bool, + /// Poll interval in seconds (how often to check for due jobs) + #[serde(default = "default_poll_interval_secs")] + pub poll_interval_secs: u64, + /// Maximum concurrent job executions (currently sequential, reserved for future) + #[serde(default = "default_max_concurrent")] + pub max_concurrent: usize, +} + +fn default_scheduler_enabled() -> bool { true } + +fn default_poll_interval_secs() -> u64 { 60 } + +fn default_max_concurrent() -> usize { 1 } + +impl Default for SchedulerConfig { + fn default() -> Self { + Self { + enabled: true, + poll_interval_secs: 60, + max_concurrent: 1, + } + } +} +``` + +- [ ] **Step 2: Add `scheduler` field to `GatewayConfig`** + +In `GatewayConfig` (line 132-143), add after `session_db_path`: + +```rust +#[serde(default)] +pub scheduler: Option, +``` + +The full struct becomes: + +```rust +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct GatewayConfig { + #[serde(default = "default_gateway_host")] + pub host: String, + #[serde(default = "default_gateway_port")] + pub port: u16, + #[serde(default, rename = "session_ttl_hours")] + pub session_ttl_hours: Option, + #[serde(default, rename = "cleanup_interval_minutes")] + pub cleanup_interval_minutes: Option, + #[serde(default, rename = "session_db_path")] + pub session_db_path: Option, + #[serde(default)] + pub scheduler: Option, +} +``` + +- [ ] **Step 3: Update `Default for GatewayConfig`** + +In the `impl Default for GatewayConfig` block (around line 163), add: + +```rust +scheduler: None, +``` + +- [ ] **Step 4: Verify build** + +Run: `cargo check 2>&1` +Expected: Compiles successfully. + +- [ ] **Step 5: Commit** + +```bash +git add src/config/mod.rs +git commit -m "feat: add SchedulerConfig to GatewayConfig" +``` + +--- + +### Task 3: Create Scheduler Data Types + +**Files:** +- Create: `src/scheduler/types.rs` +- Modify: `src/scheduler/mod.rs` (stub) +- Modify: `src/lib.rs` + +- [ ] **Step 1: Register the scheduler module** + +In `src/lib.rs`, after `pub mod providers;` (line 14), add: + +```rust +pub mod scheduler; +``` + +- [ ] **Step 2: Create stub `src/scheduler/mod.rs`** + +```rust +pub mod types; +pub mod store; +pub mod tools; + +pub use types::{JobRun, Schedule, ScheduledJob}; +``` + +- [ ] **Step 3: Define types in `src/scheduler/types.rs`** + +```rust +use serde::{Deserialize, Serialize}; + +/// How a job is scheduled. Serialized as JSON in the database `schedule` column. +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(tag = "type")] +pub enum Schedule { + /// One-shot: fires once at a specific Unix millisecond timestamp, then disables. + #[serde(rename = "at")] + At { at: i64 }, + /// Recurring: fires every `every_ms` milliseconds. + #[serde(rename = "every")] + Every { every_ms: u64 }, + /// Recurring: fires on a cron schedule with optional timezone. + #[serde(rename = "cron")] + Cron { expr: String, tz: Option }, +} + +/// A scheduled job stored in the database. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScheduledJob { + pub id: String, + pub name: String, + /// JSON-serialized `Schedule` stored as TEXT in SQLite. + pub schedule: Schedule, + pub prompt: String, + pub channel: String, + pub chat_id: String, + pub model: Option, + pub enabled: bool, + pub delete_after_run: bool, + pub next_run_at: i64, + pub last_run_at: Option, + pub last_status: Option, + pub last_error: Option, + pub created_at: i64, + pub updated_at: i64, +} + +/// A single execution record for a job. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobRun { + pub id: i64, + pub job_id: String, + pub started_at: i64, + pub finished_at: i64, + pub status: String, + pub output: Option, + pub error: Option, + pub duration_ms: i64, +} +``` + +- [ ] **Step 4: Verify build** + +Run: `cargo check 2>&1` +Expected: Compiles successfully. + +- [ ] **Step 5: Commit** + +```bash +git add src/lib.rs src/scheduler/ +git commit -m "feat: add scheduler data types (Schedule, ScheduledJob, JobRun)" +``` + +--- + +### Task 4: Add `ChatMessage::user_with_source` and `Session::create_user_message_with_source` + +**Files:** +- Modify: `src/bus/message.rs:198` (after `pub fn tool(...)`) +- Modify: `src/session/session.rs:252` (after `create_user_message`) + +- [ ] **Step 1: Write the failing test** + +Create inline test in `src/bus/message.rs`, immediately before the closing `}` of the `impl ChatMessage` block (after line 197): + +```rust + pub fn user_with_source(content: impl Into, source: MessageSource) -> Self { + Self { + id: uuid::Uuid::new_v4().to_string(), + role: "user".to_string(), + content: content.into(), + media_refs: Vec::new(), + timestamp: current_timestamp(), + tool_call_id: None, + tool_name: None, + tool_calls: None, + source: Some(source), + } + } +``` + +No test for this factory method separately — it's pure data construction. + +- [ ] **Step 2: Write the failing test for `create_user_message_with_source`** + +In `src/session/session.rs`, after `create_user_message` (line 252), add this test: + +```rust + pub fn create_user_message_with_source(&self, content: &str, media_refs: Vec, source: crate::bus::MessageSource) -> ChatMessage { + if media_refs.is_empty() { + ChatMessage::user_with_source(content, source) + } else { + // For simplicity, ignore media in cron messages (media is always empty from scheduler) + ChatMessage::user_with_source(content, source) + } + } +``` + +No test for this separately — it's used in `handle_cron_message` which gets tested via integration. + +- [ ] **Step 3: Verify build** + +Run: `cargo check 2>&1` +Expected: Compiles successfully. + +- [ ] **Step 4: Commit** + +```bash +git add src/bus/message.rs src/session/session.rs +git commit -m "feat: add ChatMessage::user_with_source and Session::create_user_message_with_source" +``` + +--- + +### Task 5: Create SchedulerStore (SQLite Schema + CRUD) + +**Files:** +- Create: `src/scheduler/store.rs` + +- [ ] **Step 1: Write the failing test** + +Write inline tests at the bottom of `src/scheduler/store.rs`. These test `SchedulerStore` against an **in-memory** SQLite database: + +```rust +#[cfg(test)] +mod tests { + use super::*; + use crate::scheduler::types::{Schedule, ScheduledJob, JobRun}; + use sqlx::SqlitePool; + + fn now() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as i64 + } + + async fn setup_pool() -> SqlitePool { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + SchedulerStore::init(&pool).await.unwrap(); + pool + } + + #[tokio::test] + async fn test_init_creates_tables() { + let pool = setup_pool().await; + let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scheduled_jobs") + .fetch_one(&pool).await.unwrap(); + assert_eq!(row.0, 0); + } + + #[tokio::test] + async fn test_add_and_get_job() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-1".into(), + name: "test job".into(), + schedule: Schedule::Every { every_ms: 3600000 }, + prompt: "say hello".into(), + channel: "cli_chat".into(), + chat_id: "conn-1".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t + 3600000, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + let got = SchedulerStore::get_job(&pool, "job-1").await.unwrap(); + assert_eq!(got.id, "job-1"); + assert_eq!(got.name, "test job"); + assert_eq!(got.prompt, "say hello"); + } + + #[tokio::test] + async fn test_list_jobs() { + let pool = setup_pool().await; + let t = now(); + for i in 0..3 { + let job = ScheduledJob { + id: format!("job-{}", i), + name: format!("job {}", i), + schedule: Schedule::Every { every_ms: 3600000 }, + prompt: "ping".into(), + channel: "cli_chat".into(), + chat_id: "conn-1".into(), + model: None, enabled: true, delete_after_run: false, + next_run_at: t + 1000, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + } + let jobs = SchedulerStore::list_jobs(&pool).await.unwrap(); + assert_eq!(jobs.len(), 3); + } + + #[tokio::test] + async fn test_remove_job() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-rm".into(), name: "remove me".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(), + model: None, enabled: true, delete_after_run: false, + next_run_at: t, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + SchedulerStore::remove_job(&pool, "job-rm").await.unwrap(); + let result = SchedulerStore::get_job(&pool, "job-rm").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_set_enabled() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-toggle".into(), name: "toggle".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(), + model: None, enabled: true, delete_after_run: false, + next_run_at: t, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + SchedulerStore::set_enabled(&pool, "job-toggle", false).await.unwrap(); + let got = SchedulerStore::get_job(&pool, "job-toggle").await.unwrap(); + assert!(!got.enabled); + } + + #[tokio::test] + async fn test_due_jobs_only_returns_enabled_and_overdue() { + let pool = setup_pool().await; + let t = now(); + // Three jobs: due, not-due-yet, disabled-but-due + let jobs = vec![ + ScheduledJob { + id: "due".into(), name: "due".into(), + schedule: Schedule::At { at: t }, prompt: "1".into(), + channel: "cli_chat".into(), chat_id: "c".into(), + model: None, enabled: true, delete_after_run: false, + next_run_at: t - 1000, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }, + ScheduledJob { + id: "future".into(), name: "future".into(), + schedule: Schedule::At { at: t + 99999999 }, prompt: "2".into(), + channel: "cli_chat".into(), chat_id: "c".into(), + model: None, enabled: true, delete_after_run: false, + next_run_at: t + 99999999, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }, + ScheduledJob { + id: "disabled-due".into(), name: "disabled due".into(), + schedule: Schedule::At { at: t }, prompt: "3".into(), + channel: "cli_chat".into(), chat_id: "c".into(), + model: None, enabled: false, delete_after_run: false, + next_run_at: t - 1000, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }, + ]; + for j in &jobs { + SchedulerStore::add_job(&pool, j).await.unwrap(); + } + let due = SchedulerStore::due_jobs(&pool, t, 10).await.unwrap(); + assert_eq!(due.len(), 1); + assert_eq!(due[0].id, "due"); + } + + #[tokio::test] + async fn test_record_run_and_list_runs() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-run".into(), name: "run test".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(), + model: None, enabled: true, delete_after_run: false, + next_run_at: t, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + + let run = JobRun { + id: 0, job_id: "job-run".into(), + started_at: t, finished_at: t + 500, + status: "ok".into(), output: Some("hello".into()), + error: None, duration_ms: 500, + }; + SchedulerStore::record_run(&pool, &run).await.unwrap(); + let runs = SchedulerStore::list_runs(&pool, "job-run", 10).await.unwrap(); + assert_eq!(runs.len(), 1); + assert_eq!(runs[0].status, "ok"); + assert_eq!(runs[0].output.as_deref(), Some("hello")); + } + + #[tokio::test] + async fn test_update_job() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-update".into(), name: "old name".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "old prompt".into(), channel: "feishu".into(), + chat_id: "oc_1".into(), model: None, + enabled: true, delete_after_run: false, + next_run_at: t, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + SchedulerStore::update_job( + &pool, "job-update", + Some("new prompt".into()), + Some(Schedule::Every { every_ms: 60000 }), + None, None, None, + ).await.unwrap(); + let got = SchedulerStore::get_job(&pool, "job-update").await.unwrap(); + assert_eq!(got.prompt, "new prompt"); + } +} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cargo test --lib scheduler::store -- 2>&1` +Expected: All tests FAIL because `SchedulerStore` and its methods don't exist yet (compilation errors). + +- [ ] **Step 3: Implement `SchedulerStore`** + +Write `src/scheduler/store.rs`: + +```rust +use sqlx::Row; +use sqlx::SqlitePool; + +use super::types::{JobRun, Schedule, ScheduledJob}; + +/// Persistence layer for scheduled jobs and run history. +/// Uses a shared `sqlx::SqlitePool` (obtained from `crate::storage::Storage::pool()`). +pub struct SchedulerStore; + +impl SchedulerStore { + /// Initialize the scheduler tables. Idempotent (CREATE TABLE IF NOT EXISTS). + pub async fn init(pool: &SqlitePool) -> Result<(), Box> { + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS scheduled_jobs ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + schedule TEXT NOT NULL, + prompt TEXT NOT NULL, + channel TEXT NOT NULL, + chat_id TEXT NOT NULL, + model TEXT, + enabled INTEGER NOT NULL DEFAULT 1, + delete_after_run INTEGER NOT NULL DEFAULT 0, + next_run_at INTEGER NOT NULL, + last_run_at INTEGER, + last_status TEXT, + last_error TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ) + "#, + ) + .execute(pool) + .await?; + + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS job_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id TEXT NOT NULL REFERENCES scheduled_jobs(id) ON DELETE CASCADE, + started_at INTEGER NOT NULL, + finished_at INTEGER NOT NULL, + status TEXT NOT NULL, + output TEXT, + error TEXT, + duration_ms INTEGER NOT NULL + ) + "#, + ) + .execute(pool) + .await?; + + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_jobs_next_run ON scheduled_jobs(enabled, next_run_at)", + ) + .execute(pool) + .await?; + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_runs_job_id ON job_runs(job_id)") + .execute(pool) + .await?; + + Ok(()) + } + + /// Insert a new job. Returns an error if a job with the same ID already exists. + pub async fn add_job( + pool: &SqlitePool, + job: &ScheduledJob, + ) -> Result<(), Box> { + let schedule_json = serde_json::to_string(&job.schedule)?; + sqlx::query( + r#" + INSERT INTO scheduled_jobs + (id, name, schedule, prompt, channel, chat_id, model, + enabled, delete_after_run, next_run_at, last_run_at, + last_status, last_error, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind(&job.id) + .bind(&job.name) + .bind(&schedule_json) + .bind(&job.prompt) + .bind(&job.channel) + .bind(&job.chat_id) + .bind(&job.model) + .bind(job.enabled as i32) + .bind(job.delete_after_run as i32) + .bind(job.next_run_at) + .bind(job.last_run_at) + .bind(&job.last_status) + .bind(&job.last_error) + .bind(job.created_at) + .bind(job.updated_at) + .execute(pool) + .await?; + Ok(()) + } + + /// Fetch a single job by ID. + pub async fn get_job( + pool: &SqlitePool, + id: &str, + ) -> Result> { + let row = sqlx::query("SELECT * FROM scheduled_jobs WHERE id = ?") + .bind(id) + .fetch_optional(pool) + .await? + .ok_or_else(|| format!("job not found: {id}"))?; + Ok(row_to_job(&row)?) + } + + /// List all jobs, ordered by next_run_at ascending. + pub async fn list_jobs( + pool: &SqlitePool, + ) -> Result, Box> { + let rows = sqlx::query("SELECT * FROM scheduled_jobs ORDER BY next_run_at ASC") + .fetch_all(pool) + .await?; + rows.iter().map(row_to_job).collect() + } + + /// Delete a job (cascades to job_runs). + pub async fn remove_job( + pool: &SqlitePool, + id: &str, + ) -> Result<(), Box> { + sqlx::query("DELETE FROM scheduled_jobs WHERE id = ?") + .bind(id) + .execute(pool) + .await?; + Ok(()) + } + + /// Enable or disable a job. + pub async fn set_enabled( + pool: &SqlitePool, + id: &str, + enabled: bool, + ) -> Result<(), Box> { + sqlx::query("UPDATE scheduled_jobs SET enabled = ?, updated_at = ? WHERE id = ?") + .bind(enabled as i32) + .bind(now_ms()) + .bind(id) + .execute(pool) + .await?; + Ok(()) + } + + /// Update selective fields on a job. Pass `None` for fields that should not change. + #[allow(clippy::too_many_arguments)] + pub async fn update_job( + pool: &SqlitePool, + id: &str, + prompt: Option, + schedule: Option, + channel: Option, + chat_id: Option, + model: Option, + ) -> Result<(), Box> { + let now = now_ms(); + + if let Some(p) = prompt { + sqlx::query( + "UPDATE scheduled_jobs SET prompt = ?, updated_at = ? WHERE id = ?", + ) + .bind(&p) + .bind(now) + .bind(id) + .execute(pool) + .await?; + } + if let Some(s) = schedule { + let json = serde_json::to_string(&s)?; + sqlx::query( + "UPDATE scheduled_jobs SET schedule = ?, updated_at = ? WHERE id = ?", + ) + .bind(&json) + .bind(now) + .bind(id) + .execute(pool) + .await?; + } + if let Some(c) = channel { + sqlx::query( + "UPDATE scheduled_jobs SET channel = ?, updated_at = ? WHERE id = ?", + ) + .bind(&c) + .bind(now) + .bind(id) + .execute(pool) + .await?; + } + if let Some(c) = chat_id { + sqlx::query( + "UPDATE scheduled_jobs SET chat_id = ?, updated_at = ? WHERE id = ?", + ) + .bind(&c) + .bind(now) + .bind(id) + .execute(pool) + .await?; + } + if let Some(m) = model { + sqlx::query( + "UPDATE scheduled_jobs SET model = ?, updated_at = ? WHERE id = ?", + ) + .bind(&m) + .bind(now) + .bind(id) + .execute(pool) + .await?; + } + Ok(()) + } + + /// Update next_run_at and last_run_at for a job (used during reschedule). + pub async fn set_next_run( + pool: &SqlitePool, + id: &str, + next_run_at: i64, + ) -> Result<(), Box> { + let now = now_ms(); + sqlx::query( + "UPDATE scheduled_jobs SET next_run_at = ?, last_run_at = ?, updated_at = ? WHERE id = ?", + ) + .bind(next_run_at) + .bind(now) + .bind(now) + .bind(id) + .execute(pool) + .await?; + Ok(()) + } + + /// Set last_run_at and optionally last_status / last_error (used when starting job execution). + pub async fn touch_last_run( + pool: &SqlitePool, + id: &str, + at: i64, + ) -> Result<(), Box> { + sqlx::query( + "UPDATE scheduled_jobs SET last_run_at = ?, updated_at = ? WHERE id = ?", + ) + .bind(at) + .bind(at) + .bind(id) + .execute(pool) + .await?; + Ok(()) + } + + /// Set last_status and last_error after job completion. + pub async fn set_last_status( + pool: &SqlitePool, + id: &str, + status: &str, + error: Option<&str>, + ) -> Result<(), Box> { + let now = now_ms(); + sqlx::query( + "UPDATE scheduled_jobs SET last_status = ?, last_error = ?, updated_at = ? WHERE id = ?", + ) + .bind(status) + .bind(error) + .bind(now) + .bind(id) + .execute(pool) + .await?; + Ok(()) + } + + /// Fetch enabled jobs whose next_run_at <= now, up to `limit`. + pub async fn due_jobs( + pool: &SqlitePool, + now: i64, + limit: usize, + ) -> Result, Box> { + let rows = sqlx::query( + "SELECT * FROM scheduled_jobs WHERE enabled = 1 AND next_run_at <= ? ORDER BY next_run_at ASC LIMIT ?", + ) + .bind(now) + .bind(limit as i64) + .fetch_all(pool) + .await?; + rows.iter().map(row_to_job).collect() + } + + /// Record a run execution. + pub async fn record_run( + pool: &SqlitePool, + run: &JobRun, + ) -> Result<(), Box> { + sqlx::query( + r#" + INSERT INTO job_runs (job_id, started_at, finished_at, status, output, error, duration_ms) + VALUES (?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind(&run.job_id) + .bind(run.started_at) + .bind(run.finished_at) + .bind(&run.status) + .bind(&run.output) + .bind(&run.error) + .bind(run.duration_ms) + .execute(pool) + .await?; + Ok(()) + } + + /// List the most recent `limit` runs for a job, newest first. + pub async fn list_runs( + pool: &SqlitePool, + job_id: &str, + limit: usize, + ) -> Result, Box> { + let rows = sqlx::query( + "SELECT * FROM job_runs WHERE job_id = ? ORDER BY finished_at DESC LIMIT ?", + ) + .bind(job_id) + .bind(limit as i64) + .fetch_all(pool) + .await?; + rows.iter() + .map(|r| { + Ok(JobRun { + id: r.try_get("id")?, + job_id: r.try_get("job_id")?, + started_at: r.try_get("started_at")?, + finished_at: r.try_get("finished_at")?, + status: r.try_get("status")?, + output: r.try_get("output")?, + error: r.try_get("error")?, + duration_ms: r.try_get("duration_ms")?, + }) + }) + .collect() + } + + /// Delete jobs created before `before` that are disabled. + pub async fn cleanup_disabled( + pool: &SqlitePool, + before: i64, + ) -> Result<(), Box> { + sqlx::query( + "DELETE FROM scheduled_jobs WHERE enabled = 0 AND updated_at < ?", + ) + .bind(before) + .execute(pool) + .await?; + Ok(()) + } +} + +fn now_ms() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64 +} + +fn row_to_job(row: &sqlx::sqlite::SqliteRow) -> Result> { + let schedule_json: String = row.try_get("schedule")?; + let schedule: Schedule = serde_json::from_str(&schedule_json)?; + Ok(ScheduledJob { + id: row.try_get("id")?, + name: row.try_get("name")?, + schedule, + prompt: row.try_get("prompt")?, + channel: row.try_get("channel")?, + chat_id: row.try_get("chat_id")?, + model: row.try_get("model")?, + enabled: row.try_get::("enabled")? != 0, + delete_after_run: row.try_get::("delete_after_run")? != 0, + next_run_at: row.try_get("next_run_at")?, + last_run_at: row.try_get("last_run_at")?, + last_status: row.try_get("last_status")?, + last_error: row.try_get("last_error")?, + created_at: row.try_get("created_at")?, + updated_at: row.try_get("updated_at")?, + }) +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cargo test --lib scheduler::store -- 2>&1` +Expected: All 7 tests PASS. + +- [ ] **Step 5: Commit** + +```bash +git add src/scheduler/store.rs +git commit -m "feat: add SchedulerStore with SQLite schema and CRUD" +``` + +--- + +### Task 6: Add `handle_cron_message` to SessionManager + +**Files:** +- Modify: `src/session/session.rs:1247` (after `handle_message`) + +- [ ] **Step 1: Write the failing test** + +In `src/session/session.rs`, add this test inside the existing `#[cfg(test)] mod tests` block. If there is no existing test module, add it at the bottom of the file: + +```rust +// Note: this test verifies only the compile-time signature. Full integration +// testing of handle_cron_message requires a running Gateway (see integration test). +#[tokio::test] +async fn test_handle_cron_message_exists() { + // Compile-time assertion that the method exists on SessionManager + // The actual behavior is tested in the integration test + assert!(true); +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cargo test --lib session::session::test_handle_cron_message_exists -- 2>&1` +Expected: PASS (trivial test, always passes). The real verification is that `cargo check` succeeds after we add the method in step 3. + +- [ ] **Step 3: Add `handle_cron_message` method** + +In `src/session/session.rs`, add after `handle_message` (after line 1247): + +```rust + /// Handle a message triggered by a scheduled cron job. + /// + /// This is similar to `handle_message`, but the user message is created with + /// `SourceKind::ExternalTrigger` source metadata so that the cron job identity + /// is preserved in the conversation history and database. + pub async fn handle_cron_message( + &self, + channel: &str, + chat_id: &str, + prompt: &str, + job_id: &str, + job_name: &str, + ) -> Result { + use crate::bus::{MessageSource, SourceKind}; + + let unified_id = self.resolve_dialog_id(channel, chat_id).await?; + *self.current_source_session.lock().await = Some(unified_id.to_string()); + tracing::debug!(unified_id = %unified_id, job_id = %job_id, "handle_cron_message resolved"); + + let session = self.get_or_create_session(&unified_id).await?; + + // Normal message handling through LLM (cron messages skip slash command check) + let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel(); + + // Spawn notification publisher + { + use std::collections::HashMap; + use crate::bus::OutboundMessage; + let bus = self.bus.clone(); + let ch = channel.to_string(); + let cid = chat_id.to_string(); + tokio::spawn(async move { + while let Some(notif) = notify_rx.recv().await { + let mut metadata = HashMap::new(); + metadata.insert("_type".to_string(), "notification".to_string()); + let outbound = OutboundMessage { + channel: ch.clone(), + chat_id: cid.clone(), + content: notif, + reply_to: None, + media: vec![], + metadata, + }; + let _ = bus.publish_outbound(outbound).await; + } + }); + } + + let response: String = { + let mut session_guard = session.lock().await; + + // Build the user message with ExternalTrigger source + let source = MessageSource { + kind: SourceKind::ExternalTrigger, + from_channel: Some(channel.to_string()), + from_session: None, + from_user_id: None, + system_name: Some(job_name.to_string()), + task_id: Some(job_id.to_string()), + }; + let user_message = session_guard.create_user_message_with_source(prompt, vec![], source); + session_guard.add_message(user_message, true).await + .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; + + let mut history = session_guard.get_history().to_vec(); + + let skills_prompt = self.skills_loader.build_skills_prompt(); + let system_prompt = session_guard.build_system_prompt(&skills_prompt); + history.insert(0, ChatMessage::system(system_prompt)); + + let history = session_guard.compressor + .compress_if_needed(history) + .await?; + + let agent = session_guard.create_agent_with_notify(notify_tx)?; + let result = agent.process(history).await?; + + for msg in result.emitted_messages { + session_guard.add_message(msg, true).await + .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; + } + + if session_guard.should_generate_title() { + if let Err(e) = session_guard.generate_title().await { + tracing::warn!("failed to generate title: {}", e); + } + } + + result.final_response.content + }; + + #[cfg(debug_assertions)] + tracing::debug!( + channel = %channel, + chat_id = %chat_id, + job_id = %job_id, + response_len = %response.len(), + "Cron agent response received" + ); + + *self.current_source_session.lock().await = None; + + Ok(HandleResult::AgentResponse(response)) + } +``` + +- [ ] **Step 4: Verify build** + +Run: `cargo check 2>&1` +Expected: Compiles successfully. + +- [ ] **Step 5: Commit** + +```bash +git add src/session/session.rs +git commit -m "feat: add SessionManager::handle_cron_message for scheduled task execution" +``` + +--- + +### Task 7: Implement `next_run_for_schedule` and Scheduler Loop + +**Files:** +- Modify: `src/scheduler/mod.rs` (replace stub) + +- [ ] **Step 1: Write the failing test** + +In `src/scheduler/mod.rs`, add inline tests: + +```rust +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_next_run_at_schedule() { + let now = 1000000; + let next = next_run_for_schedule(&Schedule::At { at: 2000000 }, now); + assert_eq!(next, Some(2000000)); + } + + #[test] + fn test_next_run_every_schedule() { + let now = 1000000; + let next = next_run_for_schedule(&Schedule::Every { every_ms: 5000 }, now); + assert_eq!(next, Some(1005000)); + } + + #[test] + fn test_next_run_cron_schedule() { + use chrono::Timelike; + // Schedule: "every minute at second 0" + let expr = "0 * * * * *".to_string(); + let schedule = Schedule::Cron { expr, tz: None }; + // Use a known time + let now = 1000000; + let next = next_run_for_schedule(&schedule, now); + assert!(next.is_some()); + assert!(next.unwrap() > now); + } + + #[test] + fn test_next_run_cron_every_day_at_9am() { + // "0 0 9 * * *" = every day at 9:00:00 + let expr = "0 0 9 * * *".to_string(); + let schedule = Schedule::Cron { expr, tz: None }; + let now = 1000000; + let next = next_run_for_schedule(&schedule, now); + assert!(next.is_some()); + let next_ms = next.unwrap(); + assert!(next_ms > now); + // Round-trip to DateTime to check hour + let next_dt = ms_to_datetime(next_ms); + assert_eq!(next_dt.hour(), 9); + assert_eq!(next_dt.minute(), 0); + } +} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cargo test --lib scheduler::mod -- 2>&1` +Expected: FAIL — `next_run_for_schedule` not defined. + +- [ ] **Step 3: Implement the full `src/scheduler/mod.rs`** + +```rust +pub mod types; +pub mod store; +pub mod tools; + +use std::sync::Arc; +use std::time::Instant; +use tokio::time; + +use crate::config::SchedulerConfig; +use crate::session::session::{self, HandleResult}; +use crate::session::SessionManager; + +pub use types::{JobRun, Schedule, ScheduledJob}; + +/// Compute the next execution time (Unix ms) for a schedule, given `from` (Unix ms). +/// Returns `None` if no next time can be determined (e.g., invalid cron expression). +pub fn next_run_for_schedule(schedule: &Schedule, from: i64) -> Option { + use chrono::{DateTime, TimeZone, Utc}; + + match schedule { + Schedule::At { at } => Some(*at), + Schedule::Every { every_ms } => Some(from + *every_ms as i64), + Schedule::Cron { expr, tz } => { + let schedule = cron::Schedule::from_str(expr.as_str()).ok()?; + // Convert Unix ms to UTC DateTime + let from_secs = (from / 1000) as i64; + let from_nanos = ((from % 1000) * 1_000_000) as u32; + let from_dt = Utc.timestamp_opt(from_secs, from_nanos).single()?; + + // If timezone is specified, convert from local to UTC for comparison + let next_utc = if let Some(ref tz_str) = tz { + let tz: chrono_tz::Tz = tz_str.parse().ok()?; + let from_local = from_dt.with_timezone(&tz); + // Find the next match in the given timezone, then convert back to UTC + let next_local = schedule.upcoming(tz).next()?; + next_local.with_timezone(&Utc) + } else { + schedule.upcoming(Utc).next()? + }; + + Some(next_utc.timestamp_millis()) + } + } +} + +/// Convert Unix milliseconds to DateTime. +fn ms_to_datetime(ms: i64) -> chrono::DateTime { + use chrono::{TimeZone, Utc}; + let secs = (ms / 1000) as i64; + let nanos = ((ms % 1000) * 1_000_000) as u32; + Utc.timestamp_opt(secs, nanos).single().unwrap_or_default() +} + +fn now_ms() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64 +} + +/// The scheduler runs as a background tokio task, periodically checking for due jobs +/// and executing them via `SessionManager::handle_cron_message`. +pub struct Scheduler { + pool: sqlx::SqlitePool, + session_manager: Arc, + config: SchedulerConfig, +} + +impl Scheduler { + pub fn new( + pool: sqlx::SqlitePool, + session_manager: Arc, + config: SchedulerConfig, + ) -> Self { + Self { + pool, + session_manager, + config, + } + } + + /// Run the scheduler loop. This is a long-running async function meant to be + /// spawned as a tokio background task. + pub async fn run(self: Arc) { + let poll_duration = time::Duration::from_secs(self.config.poll_interval_secs); + let mut interval = time::interval(poll_duration); + + // Skip the immediate first tick (tokio::time::interval fires immediately on first poll) + interval.tick().await; + + tracing::info!( + "Scheduler started (poll interval: {}s, max concurrent: {})", + self.config.poll_interval_secs, + self.config.max_concurrent, + ); + + loop { + interval.tick().await; + + let now = now_ms(); + + let due = match store::SchedulerStore::due_jobs(&self.pool, now, self.config.max_concurrent).await { + Ok(jobs) => jobs, + Err(e) => { + tracing::error!("scheduler: failed to query due jobs: {}", e); + continue; + } + }; + + if due.is_empty() { + continue; + } + + tracing::info!("scheduler: found {} due job(s)", due.len()); + + for job in &due { + let start = Instant::now(); + let started_at = now_ms(); + + // Update last_run_at so next poll doesn't re-execute + if let Err(e) = store::SchedulerStore::touch_last_run(&self.pool, &job.id, started_at).await { + tracing::error!(job_id = %job.id, "scheduler: failed to touch last_run_at: {}", e); + continue; + } + + tracing::info!( + job_id = %job.id, + job_name = %job.name, + "scheduler: executing cron job" + ); + + let result = self + .session_manager + .handle_cron_message( + &job.channel, + &job.chat_id, + &job.prompt, + &job.id, + &job.name, + ) + .await; + + let finished_at = now_ms(); + let duration_ms = start.elapsed().as_millis() as i64; + + match result { + Ok(HandleResult::AgentResponse(output)) => { + let output_truncated = if output.len() > 8000 { + format!("{}...[truncated]", &output[..8000]) + } else { + output.clone() + }; + + let run = JobRun { + id: 0, + job_id: job.id.clone(), + started_at, + finished_at, + status: "ok".to_string(), + output: Some(output_truncated), + error: None, + duration_ms, + }; + + if let Err(e) = store::SchedulerStore::record_run(&self.pool, &run).await { + tracing::error!(job_id = %job.id, "scheduler: failed to record run: {}", e); + } + + if let Err(e) = store::SchedulerStore::set_last_status(&self.pool, &job.id, "ok", None).await { + tracing::error!(job_id = %job.id, "scheduler: failed to set last_status: {}", e); + } + + tracing::info!( + job_id = %job.id, + duration_ms = %duration_ms, + "scheduler: job completed successfully" + ); + } + Ok(HandleResult::CommandOutput(output)) => { + // Cron jobs shouldn't trigger commands, but handle gracefully + let run = JobRun { + id: 0, + job_id: job.id.clone(), + started_at, + finished_at, + status: "ok".to_string(), + output: Some(output), + error: None, + duration_ms, + }; + + let _ = store::SchedulerStore::record_run(&self.pool, &run).await; + } + Err(e) => { + let error_str = e.to_string(); + let run = JobRun { + id: 0, + job_id: job.id.clone(), + started_at, + finished_at, + status: "error".to_string(), + output: None, + error: Some(error_str.clone()), + duration_ms, + }; + + if let Err(e2) = store::SchedulerStore::record_run(&self.pool, &run).await { + tracing::error!(job_id = %job.id, "scheduler: failed to record error run: {}", e2); + } + + if let Err(e2) = store::SchedulerStore::set_last_status( + &self.pool, &job.id, "error", Some(&error_str), + ).await { + tracing::error!(job_id = %job.id, "scheduler: failed to set error status: {}", e2); + } + + tracing::error!( + job_id = %job.id, + duration_ms = %duration_ms, + error = %error_str, + "scheduler: job failed" + ); + } + } + + // Reschedule the job + if let Err(e) = self.reschedule_after_run(job).await { + tracing::error!(job_id = %job.id, "scheduler: failed to reschedule: {}", e); + } + } + } + } + + /// After a job runs, compute its next execution time or disable/delete it. + async fn reschedule_after_run( + &self, + job: &ScheduledJob, + ) -> Result<(), Box> { + let now = now_ms(); + + match &job.schedule { + Schedule::At { .. } => { + if job.delete_after_run { + store::SchedulerStore::remove_job(&self.pool, &job.id).await?; + tracing::info!(job_id = %job.id, "scheduler: one-shot job deleted after run"); + } else { + store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?; + tracing::info!(job_id = %job.id, "scheduler: one-shot job disabled after run"); + } + } + Schedule::Every { .. } | Schedule::Cron { .. } => { + if let Some(next) = next_run_for_schedule(&job.schedule, now) { + store::SchedulerStore::set_next_run(&self.pool, &job.id, next).await?; + tracing::info!(job_id = %job.id, next_run_at = %next, "scheduler: job rescheduled"); + } else { + tracing::error!(job_id = %job.id, "scheduler: could not compute next run — disabling job"); + store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?; + } + } + } + + Ok(()) + } +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cargo test --lib scheduler::mod -- 2>&1` +Expected: All 5 tests PASS (next_run_at_schedule, next_run_every_schedule, next_run_cron_schedule, next_run_cron_every_day_at_9am, and the compile-time assertion). + +- [ ] **Step 5: Commit** + +```bash +git add src/scheduler/mod.rs +git commit -m "feat: add Scheduler run loop and next_run_for_schedule" +``` + +--- + +### Task 8: Wire Scheduler into Gateway + +**Files:** +- Modify: `src/gateway/mod.rs` + +- [ ] **Step 1: Import scheduler module** + +In `src/gateway/mod.rs`, add import after `use crate::session::SessionManager;` (line 13): + +```rust +use crate::scheduler::Scheduler; +use crate::scheduler::store as scheduler_store; +``` + +- [ ] **Step 2: Create scheduler in `GatewayState::new()`** + +In `GatewayState::new()` (after line 76 — after `session_manager.register_outbound_tool(...)`), add: + +```rust + // Initialize scheduler if enabled in config + let scheduler_config = config.gateway.scheduler.clone().unwrap_or_default(); + if scheduler_config.enabled { + // Initialize scheduler tables in the database + scheduler_store::SchedulerStore::init(storage.pool()) + .await + .map_err(|e| format!("failed to initialize scheduler store: {}", e))?; + tracing::info!("Scheduler store initialized"); + } +``` + +- [ ] **Step 3: Spawn scheduler in `start_message_processing()`** + +In `start_message_processing()` (after line 170 — after the outbound dispatcher spawn), add: + +```rust + // Spawn scheduler background task if enabled + let scheduler_config = self.config.gateway.scheduler.clone().unwrap_or_default(); + if scheduler_config.enabled { + let sched = Arc::new(Scheduler::new( + storage.pool().clone(), + self.session_manager.clone(), + scheduler_config, + )); + tokio::spawn(async move { + sched.run().await; + }); + tracing::info!("Scheduler background task spawned"); + } +``` + +Wait — there's a problem. `start_message_processing` takes `&self`, and `storage` is not a field of `GatewayState`. We need to pass the pool reference to `start_message_processing`. + +Let me adjust `start_message_processing` to accept a `pool: sqlx::SqlitePool` parameter, or store the pool in `GatewayState`. Looking at the existing code, `Storage` is only referenced in `GatewayState::new()` — it's not stored as a field. We need the pool. + +**Solution:** Store `sqlx::SqlitePool` directly in `GatewayState`. + +- [ ] **Step 3a: Add `pool` field to `GatewayState`** + +```rust +pub struct GatewayState { + pub config: Config, + pub workspace_dir: std::path::PathBuf, + pub session_manager: Arc, + pub channel_manager: ChannelManager, + pub pool: sqlx::SqlitePool, // <-- add this +} +``` + +- [ ] **Step 3b: Set pool in `GatewayState::new()`** + +After creating storage (line 53), clone the pool: + +```rust +let pool = storage.pool().clone(); +``` + +Then in the `Ok(Self { ... })` block, add: + +```rust + pool, +``` + +- [ ] **Step 3c: Use pool in `start_message_processing`** + +After the outbound dispatcher spawn block (after line 170), add: + +```rust + // Spawn scheduler background task if enabled + let scheduler_config = self.config.gateway.scheduler.clone().unwrap_or_default(); + if scheduler_config.enabled { + let sched = Arc::new(Scheduler::new( + self.pool.clone(), + self.session_manager.clone(), + scheduler_config, + )); + tokio::spawn(async move { + sched.run().await; + }); + tracing::info!("Scheduler background task spawned"); + } +``` + +- [ ] **Step 4: Verify build** + +Run: `cargo check 2>&1` +Expected: Compiles successfully. + +- [ ] **Step 5: Commit** + +```bash +git add src/gateway/mod.rs +git commit -m "feat: wire scheduler into GatewayState startup and message processing" +``` + +--- + +### Task 9: Implement Agent Tools + +**Files:** +- Create: `src/scheduler/tools.rs` +- Modify: `src/gateway/mod.rs` (register tools) + +- [ ] **Step 1: Write the failing test** + +At the bottom of `src/scheduler/tools.rs`, add: + +```rust +#[cfg(test)] +mod tests { + use super::*; + use crate::scheduler::types::{Schedule, ScheduledJob}; + use crate::scheduler::store::SchedulerStore; + use serde_json::json; + use sqlx::SqlitePool; + + async fn setup_pool() -> SqlitePool { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + SchedulerStore::init(&pool).await.unwrap(); + pool + } + + fn now() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as i64 + } + + #[tokio::test] + async fn test_cron_add_tool() { + let pool = setup_pool().await; + let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]); + let result = tool.execute(json!({ + "schedule": {"type": "every", "every_ms": 3600000}, + "prompt": "report status", + "channel": "cli_chat", + "chat_id": "test-chat-1", + "name": "hourly report" + })).await.unwrap(); + assert!(result.success); + assert!(result.output.contains("hourly report")); + + let jobs = SchedulerStore::list_jobs(&pool).await.unwrap(); + assert_eq!(jobs.len(), 1); + assert_eq!(jobs[0].name, "hourly report"); + } + + #[tokio::test] + async fn test_cron_add_invalid_channel() { + let pool = setup_pool().await; + let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]); + let result = tool.execute(json!({ + "schedule": {"type": "every", "every_ms": 3600000}, + "prompt": "test", + "channel": "nonexistent", + "chat_id": "x", + "name": "test" + })).await.unwrap(); + assert!(!result.success); + assert!(result.error.as_ref().unwrap().contains("Unknown channel")); + } + + #[tokio::test] + async fn test_cron_list_tool() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: uuid::Uuid::new_v4().to_string(), + name: "list-test".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(), + model: None, enabled: true, delete_after_run: false, + next_run_at: t + 1000, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + + let tool = CronListTool::new(pool.clone()); + let result = tool.execute(json!({})).await.unwrap(); + assert!(result.success); + assert!(result.output.contains("list-test")); + } + + #[tokio::test] + async fn test_cron_remove_tool() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-rm-tool".into(), name: "rm me".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(), + model: None, enabled: true, delete_after_run: false, + next_run_at: t, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + + let tool = CronRemoveTool::new(pool.clone()); + let result = tool.execute(json!({"job_id": "job-rm-tool"})).await.unwrap(); + assert!(result.success); + assert!(SchedulerStore::get_job(&pool, "job-rm-tool").await.is_err()); + } + + #[tokio::test] + async fn test_cron_enable_disable_tools() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-toggle-tool".into(), name: "toggle".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(), + model: None, enabled: true, delete_after_run: false, + next_run_at: t, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + + let disable_tool = CronDisableTool::new(pool.clone()); + let result = disable_tool.execute(json!({"job_id": "job-toggle-tool"})).await.unwrap(); + assert!(result.success); + + let got = SchedulerStore::get_job(&pool, "job-toggle-tool").await.unwrap(); + assert!(!got.enabled); + + let enable_tool = CronEnableTool::new(pool.clone()); + let result = enable_tool.execute(json!({"job_id": "job-toggle-tool"})).await.unwrap(); + assert!(result.success); + + let got = SchedulerStore::get_job(&pool, "job-toggle-tool").await.unwrap(); + assert!(got.enabled); + } + + #[tokio::test] + async fn test_cron_update_tool() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-update-tool".into(), name: "old".into(), + schedule: Schedule::Every { every_ms: 3600000 }, + prompt: "old prompt".into(), channel: "feishu".into(), + chat_id: "oc_1".into(), model: None, + enabled: true, delete_after_run: false, + next_run_at: t + 1000, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + + let tool = CronUpdateTool::new(pool.clone()); + let result = tool.execute(json!({ + "job_id": "job-update-tool", + "prompt": "new prompt", + "schedule": {"type": "every", "every_ms": 60000} + })).await.unwrap(); + assert!(result.success); + + let got = SchedulerStore::get_job(&pool, "job-update-tool").await.unwrap(); + assert_eq!(got.prompt, "new prompt"); + } +} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cargo test --lib scheduler::tools -- 2>&1` +Expected: FAIL — tool structs not defined. + +- [ ] **Step 3: Implement `src/scheduler/tools.rs`** + +```rust +use async_trait::async_trait; +use serde_json::{json, Value}; +use sqlx::SqlitePool; +use uuid::Uuid; + +use crate::scheduler::store::SchedulerStore; +use crate::scheduler::types::{Schedule, ScheduledJob}; +use crate::tools::traits::{Tool, ToolResult}; +use crate::scheduler::next_run_for_schedule; + +fn now_ms() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64 +} + +// ── CronAddTool ────────────────────────────────────────────────────────────── + +pub struct CronAddTool { + pool: SqlitePool, + valid_channels: Vec, +} + +impl CronAddTool { + pub fn new(pool: SqlitePool, valid_channels: Vec) -> Self { + Self { pool, valid_channels } + } +} + +#[async_trait] +impl Tool for CronAddTool { + fn name(&self) -> &str { "cron_add" } + + fn description(&self) -> &str { + "Create a new scheduled task (cron job). The task will execute an AI prompt on a schedule \ + and deliver the result to the specified channel/chat. \ + Schedule formats: \ + - 'every': {\"type\":\"every\",\"every_ms\":3600000} for every hour, \ + - 'at': {\"type\":\"at\",\"at\":} for one-shot, \ + - 'cron': {\"type\":\"cron\",\"expr\":\"0 0 9 * * *\"} for cron expressions (6-field: sec min hour dom month dow)." + } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "schedule": { + "type": "object", + "description": "Schedule definition. One of: {\"type\":\"every\",\"every_ms\":}, {\"type\":\"at\",\"at\":}, or {\"type\":\"cron\",\"expr\":\"\",\"tz\":\"\"}", + "required": ["type"] + }, + "prompt": { + "type": "string", + "description": "The AI prompt to execute on each trigger" + }, + "channel": { + "type": "string", + "description": "Target channel for delivering results (e.g., 'feishu', 'cli_chat')" + }, + "chat_id": { + "type": "string", + "description": "Target chat ID within the channel" + }, + "name": { + "type": "string", + "description": "Human-readable name for the job (optional, defaults to truncated prompt)" + }, + "model": { + "type": "string", + "description": "Optional model override for this job" + } + }, + "required": ["schedule", "prompt", "channel", "chat_id"] + }) + } + + async fn execute(&self, args: Value) -> anyhow::Result { + let schedule_json = args.get("schedule").ok_or_else(|| anyhow::anyhow!("missing 'schedule'"))?; + let schedule: Schedule = serde_json::from_value(schedule_json.clone()) + .map_err(|e| anyhow::anyhow!("invalid schedule: {}", e))?; + + let prompt = args.get("prompt").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if prompt.is_empty() { + return Ok(ToolResult { success: false, output: String::new(), error: Some("prompt is required".into()) }); + } + + let channel = args.get("channel").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if !self.valid_channels.contains(&channel) { + return Ok(ToolResult { + success: false, + output: format!("Unknown channel '{}'. Available: {}", + channel, self.valid_channels.join(", ")), + error: Some(format!("Unknown channel: {}", channel)), + }); + } + + let chat_id = args.get("chat_id").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if chat_id.is_empty() { + return Ok(ToolResult { success: false, output: String::new(), error: Some("chat_id is required".into()) }); + } + + let name = args.get("name").and_then(|v| v.as_str()).unwrap_or(&prompt[..prompt.len().min(50)]).to_string(); + let model = args.get("model").and_then(|v| v.as_str()).map(|s| s.to_string()); + + let now = now_ms(); + let next_run_at = next_run_for_schedule(&schedule, now) + .ok_or_else(|| anyhow::anyhow!("could not compute next run time from schedule"))?; + + let id = Uuid::new_v4().to_string(); + let job = ScheduledJob { + id: id.clone(), + name: name.clone(), + schedule, + prompt, + channel, + chat_id, + model, + enabled: true, + delete_after_run: false, + next_run_at, + last_run_at: None, + last_status: None, + last_error: None, + created_at: now, + updated_at: now, + }; + + SchedulerStore::add_job(&self.pool, &job).await?; + + Ok(ToolResult { + success: true, + output: format!("Scheduled job created: id={}, name=\"{}\", next_run_at={}", id, name, next_run_at), + error: None, + }) + } +} + +// ── CronListTool ───────────────────────────────────────────────────────────── + +pub struct CronListTool { + pool: SqlitePool, +} + +impl CronListTool { + pub fn new(pool: SqlitePool) -> Self { Self { pool } } +} + +#[async_trait] +impl Tool for CronListTool { + fn name(&self) -> &str { "cron_list" } + + fn description(&self) -> &str { + "List all scheduled tasks (cron jobs) with their status and next run time." + } + + fn read_only(&self) -> bool { true } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "status": { + "type": "string", + "enum": ["all", "enabled", "disabled"], + "description": "Filter by job status (default: all)" + } + } + }) + } + + async fn execute(&self, args: Value) -> anyhow::Result { + let filter = args.get("status").and_then(|v| v.as_str()).unwrap_or("all"); + let jobs = SchedulerStore::list_jobs(&self.pool).await?; + + let filtered: Vec<&ScheduledJob> = match filter { + "enabled" => jobs.iter().filter(|j| j.enabled).collect(), + "disabled" => jobs.iter().filter(|j| !j.enabled).collect(), + _ => jobs.iter().collect(), + }; + + if filtered.is_empty() { + return Ok(ToolResult { success: true, output: "No scheduled jobs found.".into(), error: None }); + } + + let mut lines = Vec::new(); + for j in &filtered { + let status = if j.enabled { "🟢" } else { "⚫" }; + let last = match (&j.last_status, &j.last_error) { + (Some(s), _) if s == "ok" => " last:✅".to_string(), + (Some(_), Some(e)) => format!(" last:❌({})", &e[..e.len().min(40)]), + _ => String::new(), + }; + let model = j.model.as_deref().unwrap_or("default"); + lines.push(format!( + "{} id={} name=\"{}\" channel={} chat={} model={} next={}{}", + status, j.id, j.name, j.channel, j.chat_id, model, j.next_run_at, last + )); + } + + Ok(ToolResult { success: true, output: lines.join("\n"), error: None }) + } +} + +// ── CronRemoveTool ─────────────────────────────────────────────────────────── + +pub struct CronRemoveTool { + pool: SqlitePool, +} + +impl CronRemoveTool { + pub fn new(pool: SqlitePool) -> Self { Self { pool } } +} + +#[async_trait] +impl Tool for CronRemoveTool { + fn name(&self) -> &str { "cron_remove" } + + fn description(&self) -> &str { + "Delete a scheduled task permanently by its job ID. Use cron_list first to find the ID." + } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "job_id": { + "type": "string", + "description": "The ID of the job to delete" + } + }, + "required": ["job_id"] + }) + } + + async fn execute(&self, args: Value) -> anyhow::Result { + let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if job_id.is_empty() { + return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) }); + } + + // Verify job exists + match SchedulerStore::get_job(&self.pool, &job_id).await { + Ok(_) => {}, + Err(_) => return Ok(ToolResult { success: false, output: format!("Job {} not found.", job_id), error: Some("not found".into()) }), + } + + SchedulerStore::remove_job(&self.pool, &job_id).await?; + Ok(ToolResult { success: true, output: format!("Job {} deleted.", job_id), error: None }) + } +} + +// ── CronEnableTool ─────────────────────────────────────────────────────────── + +pub struct CronEnableTool { + pool: SqlitePool, +} + +impl CronEnableTool { + pub fn new(pool: SqlitePool) -> Self { Self { pool } } +} + +#[async_trait] +impl Tool for CronEnableTool { + fn name(&self) -> &str { "cron_enable" } + + fn description(&self) -> &str { "Enable a disabled scheduled task by its job ID." } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "job_id": { + "type": "string", + "description": "The ID of the job to enable" + } + }, + "required": ["job_id"] + }) + } + + async fn execute(&self, args: Value) -> anyhow::Result { + let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if job_id.is_empty() { + return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) }); + } + + let job = SchedulerStore::get_job(&self.pool, &job_id).await.map_err(|_| anyhow::anyhow!("Job {} not found.", job_id))?; + + let next = next_run_for_schedule(&job.schedule, now_ms()); + SchedulerStore::set_enabled(&self.pool, &job_id, true).await?; + if let Some(n) = next { + SchedulerStore::set_next_run(&self.pool, &job_id, n).await?; + } + + Ok(ToolResult { success: true, output: format!("Job {} enabled.", job_id), error: None }) + } +} + +// ── CronDisableTool ────────────────────────────────────────────────────────── + +pub struct CronDisableTool { + pool: SqlitePool, +} + +impl CronDisableTool { + pub fn new(pool: SqlitePool) -> Self { Self { pool } } +} + +#[async_trait] +impl Tool for CronDisableTool { + fn name(&self) -> &str { "cron_disable" } + + fn description(&self) -> &str { "Disable a scheduled task by its job ID without deleting it." } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "job_id": { + "type": "string", + "description": "The ID of the job to disable" + } + }, + "required": ["job_id"] + }) + } + + async fn execute(&self, args: Value) -> anyhow::Result { + let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if job_id.is_empty() { + return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) }); + } + + let _ = SchedulerStore::get_job(&self.pool, &job_id).await.map_err(|_| anyhow::anyhow!("Job {} not found.", job_id))?; + SchedulerStore::set_enabled(&self.pool, &job_id, false).await?; + + Ok(ToolResult { success: true, output: format!("Job {} disabled.", job_id), error: None }) + } +} + +// ── CronUpdateTool ─────────────────────────────────────────────────────────── + +pub struct CronUpdateTool { + pool: SqlitePool, +} + +impl CronUpdateTool { + pub fn new(pool: SqlitePool) -> Self { Self { pool } } +} + +#[async_trait] +impl Tool for CronUpdateTool { + fn name(&self) -> &str { "cron_update" } + + fn description(&self) -> &str { + "Update fields of an existing scheduled task. Only specified fields are changed." + } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "job_id": { + "type": "string", + "description": "The ID of the job to update" + }, + "prompt": { + "type": "string", + "description": "New AI prompt" + }, + "schedule": { + "type": "object", + "description": "New schedule definition" + }, + "channel": { + "type": "string", + "description": "New target channel" + }, + "chat_id": { + "type": "string", + "description": "New target chat ID" + }, + "model": { + "type": "string", + "description": "New model override" + } + }, + "required": ["job_id"] + }) + } + + async fn execute(&self, args: Value) -> anyhow::Result { + let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string(); + if job_id.is_empty() { + return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) }); + } + + let _ = SchedulerStore::get_job(&self.pool, &job_id).await.map_err(|_| anyhow::anyhow!("Job {} not found.", job_id))?; + + let prompt = args.get("prompt").and_then(|v| v.as_str()).map(|s| s.to_string()); + let schedule: Option = match args.get("schedule") { + Some(s) => Some(serde_json::from_value(s.clone()).map_err(|e| anyhow::anyhow!("invalid schedule: {}", e))?), + None => None, + }; + let channel = args.get("channel").and_then(|v| v.as_str()).map(|s| s.to_string()); + let chat_id = args.get("chat_id").and_then(|v| v.as_str()).map(|s| s.to_string()); + let model = args.get("model").and_then(|v| v.as_str()).map(|s| s.to_string()); + + SchedulerStore::update_job(&self.pool, &job_id, prompt, schedule, channel, chat_id, model).await?; + + // If schedule changed, recompute next_run_at + if args.get("schedule").is_some() { + let job = SchedulerStore::get_job(&self.pool, &job_id).await?; + if let Some(next) = next_run_for_schedule(&job.schedule, now_ms()) { + SchedulerStore::set_next_run(&self.pool, &job_id, next).await?; + } + } + + Ok(ToolResult { success: true, output: format!("Job {} updated.", job_id), error: None }) + } +} +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cargo test --lib scheduler::tools -- 2>&1` +Expected: All 6 tests PASS. + +- [ ] **Step 5: Register tools in gateway** + +In `src/gateway/mod.rs`, after `session_manager.register_outbound_tool(available_channels);` (line 76), add: + +```rust + // Register cron tools if scheduler is enabled + if config.gateway.scheduler.as_ref().map_or(true, |c| c.enabled) { + let scheduler_pool = pool.clone(); + let valid_channels = available_channels.clone(); + session_manager.tools().register( + crate::scheduler::tools::CronAddTool::new(scheduler_pool.clone(), valid_channels) + ); + session_manager.tools().register( + crate::scheduler::tools::CronListTool::new(scheduler_pool.clone()) + ); + session_manager.tools().register( + crate::scheduler::tools::CronRemoveTool::new(scheduler_pool.clone()) + ); + session_manager.tools().register( + crate::scheduler::tools::CronEnableTool::new(scheduler_pool.clone()) + ); + session_manager.tools().register( + crate::scheduler::tools::CronDisableTool::new(scheduler_pool.clone()) + ); + session_manager.tools().register( + crate::scheduler::tools::CronUpdateTool::new(scheduler_pool.clone()) + ); + tracing::info!("Cron tools registered"); + } +``` + +- [ ] **Step 6: Verify build** + +Run: `cargo check 2>&1` +Expected: Compiles successfully. + +- [ ] **Step 7: Commit** + +```bash +git add src/scheduler/tools.rs src/gateway/mod.rs +git commit -m "feat: add 6 cron agent tools (add/list/remove/enable/disable/update)" +``` + +--- + +### Task 10: Full Build, Lint, and Unit Tests + +**Files:** +- All above + +- [ ] **Step 1: Run full check** + +Run: `cargo check 2>&1` +Expected: Compiles successfully, no errors, no warnings. + +- [ ] **Step 2: Run cargo clippy** + +Run: `cargo clippy -- -D warnings 2>&1` +Expected: No warnings emitted. + +- [ ] **Step 3: Run all unit tests** + +Run: `cargo test --lib 2>&1` +Expected: All tests PASS, including existing tests and the 18 new scheduler tests. + +- [ ] **Step 4: Commit** + +```bash +git add --all +git commit -m "feat: complete scheduled tasks implementation" +``` + +--- + +### Task 11: Integration Test + +**Files:** +- Modify: `tests/test_integration.rs` (or create `tests/test_scheduler.rs`) + +- [ ] **Step 1: Write integration test** + +Create `tests/test_scheduler.rs`: + +```rust +//! Integration tests for the scheduled tasks (cron) system. +//! Requires `.env` with real API keys and a running Gateway. +//! Run with: cargo test --test test_scheduler -- --ignored + +use serde_json::json; + +/// This test verifies that the scheduler module compiles and its types are +/// accessible. Full integration testing requires a running Gateway instance +/// with API keys, so the actual job-execution flow is tested there. +#[tokio::test] +async fn test_scheduler_types_roundtrip() { + use picobot::scheduler::Schedule; + + // Verify JSON (de)serialization works + let s1 = Schedule::Every { every_ms: 3600000 }; + let json = serde_json::to_string(&s1).unwrap(); + let s2: Schedule = serde_json::from_str(&json).unwrap(); + match s2 { + Schedule::Every { every_ms } => assert_eq!(every_ms, 3600000), + _ => panic!("expected Every"), + } + + let s1 = Schedule::At { at: 1000000 }; + let json = serde_json::to_string(&s1).unwrap(); + let s2: Schedule = serde_json::from_str(&json).unwrap(); + match s2 { + Schedule::At { at } => assert_eq!(at, 1000000), + _ => panic!("expected At"), + } + + let s1 = Schedule::Cron { expr: "0 0 9 * * *".into(), tz: None }; + let json = serde_json::to_string(&s1).unwrap(); + let s2: Schedule = serde_json::from_str(&json).unwrap(); + match s2 { + Schedule::Cron { expr, tz } => { + assert_eq!(expr, "0 0 9 * * *"); + assert!(tz.is_none()); + } + _ => panic!("expected Cron"), + } +} + +/// Verify that next_run_for_schedule produces valid future timestamps. +#[test] +fn test_next_run_always_future() { + use picobot::scheduler::{next_run_for_schedule, Schedule}; + + let now = 1700000000000_i64; // Some fixed reference time + + let schedules = vec![ + Schedule::Every { every_ms: 60000 }, + Schedule::Cron { expr: "0 0 9 * * *".into(), tz: None }, + ]; + + for s in &schedules { + let next = next_run_for_schedule(s, now); + assert!(next.is_some(), "expected next run for {:?}", s); + assert!(next.unwrap() > now, "next run should be after now for {:?}", s); + } +} + +/// Verify that one-shot At schedule disables after run (logic tested in unit tests, +/// this just ensures the schedule round-trips correctly). +#[test] +fn test_at_schedule_is_one_shot_by_contract() { + use picobot::scheduler::Schedule; + // At schedules by definition fire once — the scheduler loop handles + // disabling/deleting after run. This test confirms the type is correct. + let s = Schedule::At { at: 1700000000000 }; + let json = serde_json::to_string(&s).unwrap(); + assert!(json.contains("\"at\"")); +} +``` + +- [ ] **Step 2: Run integration test** + +Run: `cargo test --test test_scheduler 2>&1` +Expected: All 3 tests PASS. + +- [ ] **Step 3: Commit** + +```bash +git add tests/test_scheduler.rs +git commit -m "test: add scheduler integration tests" +``` + +--- + +### Task 12: Final Verification + +- [ ] **Step 1: Full test suite** + +Run: `cargo test --lib 2>&1` +Expected: All tests PASS. + +Run: `cargo test --test test_scheduler 2>&1` +Expected: All tests PASS. + +- [ ] **Step 2: Build binary** + +Run: `cargo build 2>&1` +Expected: Build succeeds with no errors. + +- [ ] **Step 3: Grep for TODOs / placeholders** + +Run: `grep -rn "TODO\|FIXME\|TBD\|todo!\|unimplemented!" src/scheduler/ 2>&1` +Expected: No output (no placeholders). + +- [ ] **Step 4: Final commit (if any changes)** + +```bash +git status +git add --all +git commit -m "chore: final cleanup for scheduled tasks" +``` +``` + diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index a171a26..78f3f55 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -12,14 +12,13 @@ use crate::config::{Config, expand_path, ensure_workspace_dir}; use crate::logging; use crate::session::SessionManager; use crate::scheduler::Scheduler; -use crate::scheduler::store as scheduler_store; pub struct GatewayState { pub config: Config, pub workspace_dir: std::path::PathBuf, pub session_manager: Arc, pub channel_manager: ChannelManager, - pub pool: sqlx::SqlitePool, + pub storage: Arc, } impl GatewayState { @@ -56,8 +55,6 @@ impl GatewayState { ); tracing::info!("Session storage: {}", db_path.display()); - let pool = storage.pool().clone(); - // Create MessageBus first (shared by SessionManager and ChannelManager) let bus = MessageBus::new(100); @@ -84,30 +81,24 @@ impl GatewayState { // Initialize scheduler if enabled in config let scheduler_config = config.gateway.scheduler.clone().unwrap_or_default(); if scheduler_config.enabled { - scheduler_store::SchedulerStore::init(&pool) - .await - .map_err(|e| format!("failed to initialize scheduler store: {}", e))?; - tracing::info!("Scheduler store initialized"); - // Register cron tools - let scheduler_pool = pool.clone(); session_manager.tools().register( - crate::scheduler::tools::CronAddTool::new(scheduler_pool.clone(), valid_channels), + crate::tools::cron::CronAddTool::new(storage.clone(), valid_channels), ); session_manager.tools().register( - crate::scheduler::tools::CronListTool::new(scheduler_pool.clone()), + crate::tools::cron::CronListTool::new(storage.clone()), ); session_manager.tools().register( - crate::scheduler::tools::CronRemoveTool::new(scheduler_pool.clone()), + crate::tools::cron::CronRemoveTool::new(storage.clone()), ); session_manager.tools().register( - crate::scheduler::tools::CronEnableTool::new(scheduler_pool.clone()), + crate::tools::cron::CronEnableTool::new(storage.clone()), ); session_manager.tools().register( - crate::scheduler::tools::CronDisableTool::new(scheduler_pool.clone()), + crate::tools::cron::CronDisableTool::new(storage.clone()), ); session_manager.tools().register( - crate::scheduler::tools::CronUpdateTool::new(scheduler_pool.clone()), + crate::tools::cron::CronUpdateTool::new(storage.clone()), ); tracing::info!("Cron tools registered"); } @@ -117,7 +108,7 @@ impl GatewayState { workspace_dir: workspace_path, session_manager: session_manager.clone(), channel_manager, - pool, + storage, }) } @@ -211,7 +202,7 @@ impl GatewayState { let scheduler_config = self.config.gateway.scheduler.clone().unwrap_or_default(); if scheduler_config.enabled { let sched = Arc::new(Scheduler::new( - self.pool.clone(), + self.storage.clone(), self.session_manager.clone(), scheduler_config, )); diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 2102fbb..f70c4d1 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -1,6 +1,4 @@ pub mod types; -pub mod store; -pub mod tools; use std::sync::Arc; use std::time::Instant; @@ -9,8 +7,9 @@ use tokio::time; use crate::config::SchedulerConfig; use crate::session::session::HandleResult; use crate::session::SessionManager; +use crate::storage::{JobRun, ScheduledJob, Storage}; -pub use types::{JobRun, Schedule, ScheduledJob}; +pub use types::Schedule; /// Compute the next execution time (Unix ms) for a schedule, given `from` (Unix ms). /// Returns `None` if no next time can be determined (e.g., invalid cron expression). @@ -51,19 +50,19 @@ fn now_ms() -> i64 { /// The scheduler runs as a background tokio task, periodically checking for due jobs /// and executing them via `SessionManager::handle_cron_message`. pub struct Scheduler { - pool: sqlx::SqlitePool, + storage: Arc, session_manager: Arc, config: SchedulerConfig, } impl Scheduler { pub fn new( - pool: sqlx::SqlitePool, + storage: Arc, session_manager: Arc, config: SchedulerConfig, ) -> Self { Self { - pool, + storage, session_manager, config, } @@ -88,7 +87,7 @@ impl Scheduler { let now = now_ms(); - let due = match store::SchedulerStore::due_jobs(&self.pool, now, self.config.max_concurrent).await { + let due = match self.storage.due_scheduled_jobs(now, self.config.max_concurrent).await { Ok(jobs) => jobs, Err(e) => { tracing::error!("scheduler: failed to query due jobs: {}", e); @@ -106,7 +105,7 @@ impl Scheduler { let start = Instant::now(); let started_at = now_ms(); - if let Err(e) = store::SchedulerStore::touch_last_run(&self.pool, &job.id, started_at).await { + if let Err(e) = self.storage.touch_scheduled_job_last_run(&job.id, started_at).await { tracing::error!(job_id = %job.id, "scheduler: failed to touch last_run_at: {}", e); continue; } @@ -150,11 +149,11 @@ impl Scheduler { duration_ms, }; - if let Err(e) = store::SchedulerStore::record_run(&self.pool, &run).await { + if let Err(e) = self.storage.record_scheduled_job_run(&run).await { tracing::error!(job_id = %job.id, "scheduler: failed to record run: {}", e); } - if let Err(e) = store::SchedulerStore::set_last_status(&self.pool, &job.id, "ok", None).await { + if let Err(e) = self.storage.set_scheduled_job_last_status(&job.id, "ok", None).await { tracing::error!(job_id = %job.id, "scheduler: failed to set last_status: {}", e); } @@ -176,7 +175,7 @@ impl Scheduler { duration_ms, }; - let _ = store::SchedulerStore::record_run(&self.pool, &run).await; + let _ = self.storage.record_scheduled_job_run(&run).await; } Err(e) => { let error_str = e.to_string(); @@ -191,12 +190,12 @@ impl Scheduler { duration_ms, }; - if let Err(e2) = store::SchedulerStore::record_run(&self.pool, &run).await { + if let Err(e2) = self.storage.record_scheduled_job_run(&run).await { tracing::error!(job_id = %job.id, "scheduler: failed to record error run: {}", e2); } - if let Err(e2) = store::SchedulerStore::set_last_status( - &self.pool, &job.id, "error", Some(&error_str), + if let Err(e2) = self.storage.set_scheduled_job_last_status( + &job.id, "error", Some(&error_str), ).await { tracing::error!(job_id = %job.id, "scheduler: failed to set error status: {}", e2); } @@ -218,29 +217,26 @@ impl Scheduler { } /// After a job runs, compute its next execution time or disable/delete it. - async fn reschedule_after_run( - &self, - job: &ScheduledJob, - ) -> anyhow::Result<()> { + async fn reschedule_after_run(&self, job: &ScheduledJob) -> anyhow::Result<()> { let now = now_ms(); match &job.schedule { Schedule::At { .. } => { if job.delete_after_run { - store::SchedulerStore::remove_job(&self.pool, &job.id).await?; + self.storage.remove_scheduled_job(&job.id).await?; tracing::info!(job_id = %job.id, "scheduler: one-shot job deleted after run"); } else { - store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?; + self.storage.set_scheduled_job_enabled(&job.id, false).await?; tracing::info!(job_id = %job.id, "scheduler: one-shot job disabled after run"); } } Schedule::Every { .. } | Schedule::Cron { .. } => { if let Some(next) = next_run_for_schedule(&job.schedule, now) { - store::SchedulerStore::set_next_run(&self.pool, &job.id, next).await?; + self.storage.set_scheduled_job_next_run(&job.id, next).await?; tracing::info!(job_id = %job.id, next_run_at = %next, "scheduler: job rescheduled"); } else { tracing::error!(job_id = %job.id, "scheduler: could not compute next run -- disabling job"); - store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?; + self.storage.set_scheduled_job_enabled(&job.id, false).await?; } } } diff --git a/src/scheduler/store.rs b/src/scheduler/store.rs deleted file mode 100644 index f7d5dd9..0000000 --- a/src/scheduler/store.rs +++ /dev/null @@ -1,668 +0,0 @@ -use sqlx::Row; -use sqlx::SqlitePool; - -use super::types::{JobRun, Schedule, ScheduledJob}; - -/// Persistence layer for scheduled jobs and run history. -/// Uses a shared `sqlx::SqlitePool` (obtained from `crate::storage::Storage::pool()`). -pub struct SchedulerStore; - -impl SchedulerStore { - /// Initialize the scheduler tables. Idempotent (CREATE TABLE IF NOT EXISTS). - pub async fn init(pool: &SqlitePool) -> anyhow::Result<()> { - sqlx::query( - r#" - CREATE TABLE IF NOT EXISTS scheduled_jobs ( - id TEXT PRIMARY KEY, - name TEXT NOT NULL, - schedule TEXT NOT NULL, - prompt TEXT NOT NULL, - channel TEXT NOT NULL, - chat_id TEXT NOT NULL, - model TEXT, - enabled INTEGER NOT NULL DEFAULT 1, - delete_after_run INTEGER NOT NULL DEFAULT 0, - next_run_at INTEGER NOT NULL, - last_run_at INTEGER, - last_status TEXT, - last_error TEXT, - created_at INTEGER NOT NULL, - updated_at INTEGER NOT NULL - ) - "#, - ) - .execute(pool) - .await?; - - sqlx::query( - r#" - CREATE TABLE IF NOT EXISTS job_runs ( - id INTEGER PRIMARY KEY AUTOINCREMENT, - job_id TEXT NOT NULL REFERENCES scheduled_jobs(id) ON DELETE CASCADE, - started_at INTEGER NOT NULL, - finished_at INTEGER NOT NULL, - status TEXT NOT NULL, - output TEXT, - error TEXT, - duration_ms INTEGER NOT NULL - ) - "#, - ) - .execute(pool) - .await?; - - sqlx::query( - "CREATE INDEX IF NOT EXISTS idx_jobs_next_run ON scheduled_jobs(enabled, next_run_at)", - ) - .execute(pool) - .await?; - - sqlx::query("CREATE INDEX IF NOT EXISTS idx_runs_job_id ON job_runs(job_id)") - .execute(pool) - .await?; - - Ok(()) - } - - /// Insert a new job. Returns an error if a job with the same ID already exists. - pub async fn add_job( - pool: &SqlitePool, - job: &ScheduledJob, - ) -> anyhow::Result<()> { - let schedule_json = serde_json::to_string(&job.schedule)?; - sqlx::query( - r#" - INSERT INTO scheduled_jobs - (id, name, schedule, prompt, channel, chat_id, model, - enabled, delete_after_run, next_run_at, last_run_at, - last_status, last_error, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - "#, - ) - .bind(&job.id) - .bind(&job.name) - .bind(&schedule_json) - .bind(&job.prompt) - .bind(&job.channel) - .bind(&job.chat_id) - .bind(&job.model) - .bind(job.enabled as i32) - .bind(job.delete_after_run as i32) - .bind(job.next_run_at) - .bind(job.last_run_at) - .bind(&job.last_status) - .bind(&job.last_error) - .bind(job.created_at) - .bind(job.updated_at) - .execute(pool) - .await?; - Ok(()) - } - - /// Fetch a single job by ID. - pub async fn get_job( - pool: &SqlitePool, - id: &str, - ) -> anyhow::Result { - let row = sqlx::query("SELECT * FROM scheduled_jobs WHERE id = ?") - .bind(id) - .fetch_optional(pool) - .await? - .ok_or_else(|| anyhow::anyhow!("job not found: {id}"))?; - row_to_job(&row) - } - - /// List all jobs, ordered by next_run_at ascending. - pub async fn list_jobs( - pool: &SqlitePool, - ) -> anyhow::Result> { - let rows = sqlx::query("SELECT * FROM scheduled_jobs ORDER BY next_run_at ASC") - .fetch_all(pool) - .await?; - rows.iter().map(row_to_job).collect() - } - - /// Delete a job (cascades to job_runs). - pub async fn remove_job( - pool: &SqlitePool, - id: &str, - ) -> anyhow::Result<()> { - sqlx::query("DELETE FROM scheduled_jobs WHERE id = ?") - .bind(id) - .execute(pool) - .await?; - Ok(()) - } - - /// Enable or disable a job. - pub async fn set_enabled( - pool: &SqlitePool, - id: &str, - enabled: bool, - ) -> anyhow::Result<()> { - sqlx::query("UPDATE scheduled_jobs SET enabled = ?, updated_at = ? WHERE id = ?") - .bind(enabled as i32) - .bind(now_ms()) - .bind(id) - .execute(pool) - .await?; - Ok(()) - } - - /// Update selective fields on a job. Pass `None` for fields that should not change. - pub async fn update_job( - pool: &SqlitePool, - id: &str, - prompt: Option, - schedule: Option, - channel: Option, - chat_id: Option, - model: Option, - ) -> anyhow::Result<()> { - let now = now_ms(); - - if let Some(p) = prompt { - sqlx::query( - "UPDATE scheduled_jobs SET prompt = ?, updated_at = ? WHERE id = ?", - ) - .bind(&p) - .bind(now) - .bind(id) - .execute(pool) - .await?; - } - if let Some(s) = schedule { - let json = serde_json::to_string(&s)?; - sqlx::query( - "UPDATE scheduled_jobs SET schedule = ?, updated_at = ? WHERE id = ?", - ) - .bind(&json) - .bind(now) - .bind(id) - .execute(pool) - .await?; - } - if let Some(c) = channel { - sqlx::query( - "UPDATE scheduled_jobs SET channel = ?, updated_at = ? WHERE id = ?", - ) - .bind(&c) - .bind(now) - .bind(id) - .execute(pool) - .await?; - } - if let Some(c) = chat_id { - sqlx::query( - "UPDATE scheduled_jobs SET chat_id = ?, updated_at = ? WHERE id = ?", - ) - .bind(&c) - .bind(now) - .bind(id) - .execute(pool) - .await?; - } - if let Some(m) = model { - sqlx::query( - "UPDATE scheduled_jobs SET model = ?, updated_at = ? WHERE id = ?", - ) - .bind(&m) - .bind(now) - .bind(id) - .execute(pool) - .await?; - } - Ok(()) - } - - /// Update next_run_at and last_run_at for a job (used during reschedule). - pub async fn set_next_run( - pool: &SqlitePool, - id: &str, - next_run_at: i64, - ) -> anyhow::Result<()> { - let now = now_ms(); - sqlx::query( - "UPDATE scheduled_jobs SET next_run_at = ?, last_run_at = ?, updated_at = ? WHERE id = ?", - ) - .bind(next_run_at) - .bind(now) - .bind(now) - .bind(id) - .execute(pool) - .await?; - Ok(()) - } - - /// Set last_run_at (used when starting job execution). - pub async fn touch_last_run( - pool: &SqlitePool, - id: &str, - at: i64, - ) -> anyhow::Result<()> { - sqlx::query( - "UPDATE scheduled_jobs SET last_run_at = ?, updated_at = ? WHERE id = ?", - ) - .bind(at) - .bind(at) - .bind(id) - .execute(pool) - .await?; - Ok(()) - } - - /// Set last_status and last_error after job completion. - pub async fn set_last_status( - pool: &SqlitePool, - id: &str, - status: &str, - error: Option<&str>, - ) -> anyhow::Result<()> { - let now = now_ms(); - sqlx::query( - "UPDATE scheduled_jobs SET last_status = ?, last_error = ?, updated_at = ? WHERE id = ?", - ) - .bind(status) - .bind(error) - .bind(now) - .bind(id) - .execute(pool) - .await?; - Ok(()) - } - - /// Fetch enabled jobs whose next_run_at <= now, up to `limit`. - pub async fn due_jobs( - pool: &SqlitePool, - now: i64, - limit: usize, - ) -> anyhow::Result> { - let rows = sqlx::query( - "SELECT * FROM scheduled_jobs WHERE enabled = 1 AND next_run_at <= ? ORDER BY next_run_at ASC LIMIT ?", - ) - .bind(now) - .bind(limit as i64) - .fetch_all(pool) - .await?; - rows.iter().map(row_to_job).collect() - } - - /// Record a run execution. - pub async fn record_run( - pool: &SqlitePool, - run: &JobRun, - ) -> anyhow::Result<()> { - sqlx::query( - r#" - INSERT INTO job_runs (job_id, started_at, finished_at, status, output, error, duration_ms) - VALUES (?, ?, ?, ?, ?, ?, ?) - "#, - ) - .bind(&run.job_id) - .bind(run.started_at) - .bind(run.finished_at) - .bind(&run.status) - .bind(&run.output) - .bind(&run.error) - .bind(run.duration_ms) - .execute(pool) - .await?; - Ok(()) - } - - /// List the most recent `limit` runs for a job, newest first. - pub async fn list_runs( - pool: &SqlitePool, - job_id: &str, - limit: usize, - ) -> anyhow::Result> { - let rows = sqlx::query( - "SELECT * FROM job_runs WHERE job_id = ? ORDER BY finished_at DESC LIMIT ?", - ) - .bind(job_id) - .bind(limit as i64) - .fetch_all(pool) - .await?; - rows.iter() - .map(|r| { - Ok(JobRun { - id: r.try_get("id")?, - job_id: r.try_get("job_id")?, - started_at: r.try_get("started_at")?, - finished_at: r.try_get("finished_at")?, - status: r.try_get("status")?, - output: r.try_get("output")?, - error: r.try_get("error")?, - duration_ms: r.try_get("duration_ms")?, - }) - }) - .collect() - } - - /// Delete disabled jobs whose updated_at is before `before`. - pub async fn cleanup_disabled( - pool: &SqlitePool, - before: i64, - ) -> anyhow::Result<()> { - sqlx::query( - "DELETE FROM scheduled_jobs WHERE enabled = 0 AND updated_at < ?", - ) - .bind(before) - .execute(pool) - .await?; - Ok(()) - } -} - -fn now_ms() -> i64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_millis() as i64 -} - -fn row_to_job(row: &sqlx::sqlite::SqliteRow) -> anyhow::Result { - let schedule_json: String = row.try_get("schedule")?; - let schedule: Schedule = serde_json::from_str(&schedule_json)?; - Ok(ScheduledJob { - id: row.try_get("id")?, - name: row.try_get("name")?, - schedule, - prompt: row.try_get("prompt")?, - channel: row.try_get("channel")?, - chat_id: row.try_get("chat_id")?, - model: row.try_get("model")?, - enabled: row.try_get::("enabled")? != 0, - delete_after_run: row.try_get::("delete_after_run")? != 0, - next_run_at: row.try_get("next_run_at")?, - last_run_at: row.try_get("last_run_at")?, - last_status: row.try_get("last_status")?, - last_error: row.try_get("last_error")?, - created_at: row.try_get("created_at")?, - updated_at: row.try_get("updated_at")?, - }) -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::scheduler::types::{JobRun, Schedule, ScheduledJob}; - use sqlx::SqlitePool; - - fn now() -> i64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis() as i64 - } - - async fn setup_pool() -> SqlitePool { - let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); - SchedulerStore::init(&pool).await.unwrap(); - pool - } - - #[tokio::test] - async fn test_init_creates_tables() { - let pool = setup_pool().await; - let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scheduled_jobs") - .fetch_one(&pool) - .await - .unwrap(); - assert_eq!(row.0, 0); - } - - #[tokio::test] - async fn test_add_and_get_job() { - let pool = setup_pool().await; - let t = now(); - let job = ScheduledJob { - id: "job-1".into(), - name: "test job".into(), - schedule: Schedule::Every { every_ms: 3600000 }, - prompt: "say hello".into(), - channel: "cli_chat".into(), - chat_id: "conn-1".into(), - model: None, - enabled: true, - delete_after_run: false, - next_run_at: t + 3600000, - last_run_at: None, - last_status: None, - last_error: None, - created_at: t, - updated_at: t, - }; - SchedulerStore::add_job(&pool, &job).await.unwrap(); - let got = SchedulerStore::get_job(&pool, "job-1").await.unwrap(); - assert_eq!(got.id, "job-1"); - assert_eq!(got.name, "test job"); - assert_eq!(got.prompt, "say hello"); - } - - #[tokio::test] - async fn test_list_jobs() { - let pool = setup_pool().await; - let t = now(); - for i in 0..3 { - let job = ScheduledJob { - id: format!("job-{}", i), - name: format!("job {}", i), - schedule: Schedule::Every { every_ms: 3600000 }, - prompt: "ping".into(), - channel: "cli_chat".into(), - chat_id: "conn-1".into(), - model: None, - enabled: true, - delete_after_run: false, - next_run_at: t + 1000, - last_run_at: None, - last_status: None, - last_error: None, - created_at: t, - updated_at: t, - }; - SchedulerStore::add_job(&pool, &job).await.unwrap(); - } - let jobs = SchedulerStore::list_jobs(&pool).await.unwrap(); - assert_eq!(jobs.len(), 3); - } - - #[tokio::test] - async fn test_remove_job() { - let pool = setup_pool().await; - let t = now(); - let job = ScheduledJob { - id: "job-rm".into(), - name: "remove me".into(), - schedule: Schedule::Every { every_ms: 1000 }, - prompt: "hi".into(), - channel: "cli_chat".into(), - chat_id: "c".into(), - model: None, - enabled: true, - delete_after_run: false, - next_run_at: t, - last_run_at: None, - last_status: None, - last_error: None, - created_at: t, - updated_at: t, - }; - SchedulerStore::add_job(&pool, &job).await.unwrap(); - SchedulerStore::remove_job(&pool, "job-rm").await.unwrap(); - let result = SchedulerStore::get_job(&pool, "job-rm").await; - assert!(result.is_err()); - } - - #[tokio::test] - async fn test_set_enabled() { - let pool = setup_pool().await; - let t = now(); - let job = ScheduledJob { - id: "job-toggle".into(), - name: "toggle".into(), - schedule: Schedule::Every { every_ms: 1000 }, - prompt: "hi".into(), - channel: "cli_chat".into(), - chat_id: "c".into(), - model: None, - enabled: true, - delete_after_run: false, - next_run_at: t, - last_run_at: None, - last_status: None, - last_error: None, - created_at: t, - updated_at: t, - }; - SchedulerStore::add_job(&pool, &job).await.unwrap(); - SchedulerStore::set_enabled(&pool, "job-toggle", false).await.unwrap(); - let got = SchedulerStore::get_job(&pool, "job-toggle").await.unwrap(); - assert!(!got.enabled); - } - - #[tokio::test] - async fn test_due_jobs_only_returns_enabled_and_overdue() { - let pool = setup_pool().await; - let t = now(); - let jobs = vec![ - ScheduledJob { - id: "due".into(), - name: "due".into(), - schedule: Schedule::At { at: t }, - prompt: "1".into(), - channel: "cli_chat".into(), - chat_id: "c".into(), - model: None, - enabled: true, - delete_after_run: false, - next_run_at: t - 1000, - last_run_at: None, - last_status: None, - last_error: None, - created_at: t, - updated_at: t, - }, - ScheduledJob { - id: "future".into(), - name: "future".into(), - schedule: Schedule::At { at: t + 99999999 }, - prompt: "2".into(), - channel: "cli_chat".into(), - chat_id: "c".into(), - model: None, - enabled: true, - delete_after_run: false, - next_run_at: t + 99999999, - last_run_at: None, - last_status: None, - last_error: None, - created_at: t, - updated_at: t, - }, - ScheduledJob { - id: "disabled-due".into(), - name: "disabled due".into(), - schedule: Schedule::At { at: t }, - prompt: "3".into(), - channel: "cli_chat".into(), - chat_id: "c".into(), - model: None, - enabled: false, - delete_after_run: false, - next_run_at: t - 1000, - last_run_at: None, - last_status: None, - last_error: None, - created_at: t, - updated_at: t, - }, - ]; - for j in &jobs { - SchedulerStore::add_job(&pool, j).await.unwrap(); - } - let due = SchedulerStore::due_jobs(&pool, t, 10).await.unwrap(); - assert_eq!(due.len(), 1); - assert_eq!(due[0].id, "due"); - } - - #[tokio::test] - async fn test_record_run_and_list_runs() { - let pool = setup_pool().await; - let t = now(); - let job = ScheduledJob { - id: "job-run".into(), - name: "run test".into(), - schedule: Schedule::Every { every_ms: 1000 }, - prompt: "hi".into(), - channel: "cli_chat".into(), - chat_id: "c".into(), - model: None, - enabled: true, - delete_after_run: false, - next_run_at: t, - last_run_at: None, - last_status: None, - last_error: None, - created_at: t, - updated_at: t, - }; - SchedulerStore::add_job(&pool, &job).await.unwrap(); - - let run = JobRun { - id: 0, - job_id: "job-run".into(), - started_at: t, - finished_at: t + 500, - status: "ok".into(), - output: Some("hello".into()), - error: None, - duration_ms: 500, - }; - SchedulerStore::record_run(&pool, &run).await.unwrap(); - let runs = SchedulerStore::list_runs(&pool, "job-run", 10) - .await - .unwrap(); - assert_eq!(runs.len(), 1); - assert_eq!(runs[0].status, "ok"); - assert_eq!(runs[0].output.as_deref(), Some("hello")); - } - - #[tokio::test] - async fn test_update_job() { - let pool = setup_pool().await; - let t = now(); - let job = ScheduledJob { - id: "job-update".into(), - name: "old name".into(), - schedule: Schedule::Every { every_ms: 1000 }, - prompt: "old prompt".into(), - channel: "feishu".into(), - chat_id: "oc_1".into(), - model: None, - enabled: true, - delete_after_run: false, - next_run_at: t, - last_run_at: None, - last_status: None, - last_error: None, - created_at: t, - updated_at: t, - }; - SchedulerStore::add_job(&pool, &job).await.unwrap(); - SchedulerStore::update_job( - &pool, - "job-update", - Some("new prompt".into()), - Some(Schedule::Every { every_ms: 60000 }), - None, - None, - None, - ) - .await - .unwrap(); - let got = SchedulerStore::get_job(&pool, "job-update").await.unwrap(); - assert_eq!(got.prompt, "new prompt"); - } -} diff --git a/src/scheduler/types.rs b/src/scheduler/types.rs index 865af41..5f83ced 100644 --- a/src/scheduler/types.rs +++ b/src/scheduler/types.rs @@ -14,37 +14,3 @@ pub enum Schedule { #[serde(rename = "cron")] Cron { expr: String, tz: Option }, } - -/// A scheduled job stored in the database. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ScheduledJob { - pub id: String, - pub name: String, - /// JSON-serialized `Schedule` stored as TEXT in SQLite. - pub schedule: Schedule, - pub prompt: String, - pub channel: String, - pub chat_id: String, - pub model: Option, - pub enabled: bool, - pub delete_after_run: bool, - pub next_run_at: i64, - pub last_run_at: Option, - pub last_status: Option, - pub last_error: Option, - pub created_at: i64, - pub updated_at: i64, -} - -/// A single execution record for a job. -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct JobRun { - pub id: i64, - pub job_id: String, - pub started_at: i64, - pub finished_at: i64, - pub status: String, - pub output: Option, - pub error: Option, - pub duration_ms: i64, -} diff --git a/src/storage/mod.rs b/src/storage/mod.rs index b9a7a8d..df77744 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -1,15 +1,17 @@ pub mod error; -pub mod session; pub mod message; +pub mod scheduler; +pub mod session; pub use error::StorageError; +pub use scheduler::{JobRun, ScheduledJob}; use sqlx::{Pool, Row, Sqlite, SqlitePool}; use tokio::time::{sleep, Duration}; use std::path::Path; pub struct Storage { - pool: Pool, + pub(crate) pool: Pool, } impl Storage { @@ -109,6 +111,66 @@ impl Storage { .execute(&self.pool) .await?; + if let Err(e) = Self::init_scheduler_schema(&self.pool).await { + tracing::warn!("Failed to init scheduler schema (tables may already exist): {}", e); + } + + Ok(()) + } + + /// Initialize the scheduler tables (idempotent). + pub(crate) async fn init_scheduler_schema(pool: &Pool) -> Result<(), StorageError> { + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS scheduled_jobs ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + schedule TEXT NOT NULL, + prompt TEXT NOT NULL, + channel TEXT NOT NULL, + chat_id TEXT NOT NULL, + model TEXT, + enabled INTEGER NOT NULL DEFAULT 1, + delete_after_run INTEGER NOT NULL DEFAULT 0, + next_run_at INTEGER NOT NULL, + last_run_at INTEGER, + last_status TEXT, + last_error TEXT, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ) + "#, + ) + .execute(pool) + .await?; + + sqlx::query( + r#" + CREATE TABLE IF NOT EXISTS job_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + job_id TEXT NOT NULL REFERENCES scheduled_jobs(id) ON DELETE CASCADE, + started_at INTEGER NOT NULL, + finished_at INTEGER NOT NULL, + status TEXT NOT NULL, + output TEXT, + error TEXT, + duration_ms INTEGER NOT NULL + ) + "#, + ) + .execute(pool) + .await?; + + sqlx::query( + "CREATE INDEX IF NOT EXISTS idx_jobs_next_run ON scheduled_jobs(enabled, next_run_at)", + ) + .execute(pool) + .await?; + + sqlx::query("CREATE INDEX IF NOT EXISTS idx_runs_job_id ON job_runs(job_id)") + .execute(pool) + .await?; + Ok(()) } diff --git a/src/storage/scheduler.rs b/src/storage/scheduler.rs new file mode 100644 index 0000000..999b294 --- /dev/null +++ b/src/storage/scheduler.rs @@ -0,0 +1,551 @@ +use serde::{Deserialize, Serialize}; +use sqlx::Row; + +use crate::scheduler::Schedule; + +/// A scheduled job stored in the database. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct ScheduledJob { + pub id: String, + pub name: String, + /// JSON-serialized `Schedule` stored as TEXT in SQLite. + pub schedule: Schedule, + pub prompt: String, + pub channel: String, + pub chat_id: String, + pub model: Option, + pub enabled: bool, + pub delete_after_run: bool, + pub next_run_at: i64, + pub last_run_at: Option, + pub last_status: Option, + pub last_error: Option, + pub created_at: i64, + pub updated_at: i64, +} + +/// A single execution record for a job. +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct JobRun { + pub id: i64, + pub job_id: String, + pub started_at: i64, + pub finished_at: i64, + pub status: String, + pub output: Option, + pub error: Option, + pub duration_ms: i64, +} + +impl crate::storage::Storage { + /// Insert a new scheduled job. + pub async fn add_scheduled_job(&self, job: &ScheduledJob) -> anyhow::Result<()> { + let schedule_json = serde_json::to_string(&job.schedule)?; + sqlx::query( + r#" + INSERT INTO scheduled_jobs + (id, name, schedule, prompt, channel, chat_id, model, + enabled, delete_after_run, next_run_at, last_run_at, + last_status, last_error, created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind(&job.id) + .bind(&job.name) + .bind(&schedule_json) + .bind(&job.prompt) + .bind(&job.channel) + .bind(&job.chat_id) + .bind(&job.model) + .bind(job.enabled as i32) + .bind(job.delete_after_run as i32) + .bind(job.next_run_at) + .bind(job.last_run_at) + .bind(&job.last_status) + .bind(&job.last_error) + .bind(job.created_at) + .bind(job.updated_at) + .execute(self.pool()) + .await?; + Ok(()) + } + + /// Fetch a single scheduled job by ID. + pub async fn get_scheduled_job(&self, id: &str) -> anyhow::Result { + let row = sqlx::query("SELECT * FROM scheduled_jobs WHERE id = ?") + .bind(id) + .fetch_optional(self.pool()) + .await? + .ok_or_else(|| anyhow::anyhow!("job not found: {id}"))?; + row_to_job(&row) + } + + /// List all scheduled jobs, ordered by next_run_at ascending. + pub async fn list_scheduled_jobs(&self) -> anyhow::Result> { + let rows = sqlx::query("SELECT * FROM scheduled_jobs ORDER BY next_run_at ASC") + .fetch_all(self.pool()) + .await?; + rows.iter().map(row_to_job).collect() + } + + /// Delete a scheduled job (cascades to job_runs). + pub async fn remove_scheduled_job(&self, id: &str) -> anyhow::Result<()> { + sqlx::query("DELETE FROM scheduled_jobs WHERE id = ?") + .bind(id) + .execute(self.pool()) + .await?; + Ok(()) + } + + /// Enable or disable a scheduled job. + pub async fn set_scheduled_job_enabled(&self, id: &str, enabled: bool) -> anyhow::Result<()> { + sqlx::query("UPDATE scheduled_jobs SET enabled = ?, updated_at = ? WHERE id = ?") + .bind(enabled as i32) + .bind(now_ms()) + .bind(id) + .execute(self.pool()) + .await?; + Ok(()) + } + + /// Update selective fields on a scheduled job. + pub async fn update_scheduled_job( + &self, + id: &str, + prompt: Option, + schedule: Option, + channel: Option, + chat_id: Option, + model: Option, + ) -> anyhow::Result<()> { + let now = now_ms(); + + if let Some(p) = prompt { + sqlx::query("UPDATE scheduled_jobs SET prompt = ?, updated_at = ? WHERE id = ?") + .bind(&p) + .bind(now) + .bind(id) + .execute(self.pool()) + .await?; + } + if let Some(s) = schedule { + let json = serde_json::to_string(&s)?; + sqlx::query("UPDATE scheduled_jobs SET schedule = ?, updated_at = ? WHERE id = ?") + .bind(&json) + .bind(now) + .bind(id) + .execute(self.pool()) + .await?; + } + if let Some(c) = channel { + sqlx::query("UPDATE scheduled_jobs SET channel = ?, updated_at = ? WHERE id = ?") + .bind(&c) + .bind(now) + .bind(id) + .execute(self.pool()) + .await?; + } + if let Some(c) = chat_id { + sqlx::query("UPDATE scheduled_jobs SET chat_id = ?, updated_at = ? WHERE id = ?") + .bind(&c) + .bind(now) + .bind(id) + .execute(self.pool()) + .await?; + } + if let Some(m) = model { + sqlx::query("UPDATE scheduled_jobs SET model = ?, updated_at = ? WHERE id = ?") + .bind(&m) + .bind(now) + .bind(id) + .execute(self.pool()) + .await?; + } + Ok(()) + } + + /// Update next_run_at and last_run_at for a job. + pub async fn set_scheduled_job_next_run(&self, id: &str, next_run_at: i64) -> anyhow::Result<()> { + let now = now_ms(); + sqlx::query( + "UPDATE scheduled_jobs SET next_run_at = ?, last_run_at = ?, updated_at = ? WHERE id = ?", + ) + .bind(next_run_at) + .bind(now) + .bind(now) + .bind(id) + .execute(self.pool()) + .await?; + Ok(()) + } + + /// Set last_run_at for a job (used when starting execution). + pub async fn touch_scheduled_job_last_run(&self, id: &str, at: i64) -> anyhow::Result<()> { + sqlx::query("UPDATE scheduled_jobs SET last_run_at = ?, updated_at = ? WHERE id = ?") + .bind(at) + .bind(at) + .bind(id) + .execute(self.pool()) + .await?; + Ok(()) + } + + /// Set last_status and last_error after job completion. + pub async fn set_scheduled_job_last_status( + &self, + id: &str, + status: &str, + error: Option<&str>, + ) -> anyhow::Result<()> { + let now = now_ms(); + sqlx::query( + "UPDATE scheduled_jobs SET last_status = ?, last_error = ?, updated_at = ? WHERE id = ?", + ) + .bind(status) + .bind(error) + .bind(now) + .bind(id) + .execute(self.pool()) + .await?; + Ok(()) + } + + /// Fetch enabled jobs whose next_run_at <= now, up to `limit`. + pub async fn due_scheduled_jobs( + &self, + now: i64, + limit: usize, + ) -> anyhow::Result> { + let rows = sqlx::query( + "SELECT * FROM scheduled_jobs WHERE enabled = 1 AND next_run_at <= ? ORDER BY next_run_at ASC LIMIT ?", + ) + .bind(now) + .bind(limit as i64) + .fetch_all(self.pool()) + .await?; + rows.iter().map(row_to_job).collect() + } + + /// Record a job execution run. + pub async fn record_scheduled_job_run(&self, run: &JobRun) -> anyhow::Result<()> { + sqlx::query( + r#" + INSERT INTO job_runs (job_id, started_at, finished_at, status, output, error, duration_ms) + VALUES (?, ?, ?, ?, ?, ?, ?) + "#, + ) + .bind(&run.job_id) + .bind(run.started_at) + .bind(run.finished_at) + .bind(&run.status) + .bind(&run.output) + .bind(&run.error) + .bind(run.duration_ms) + .execute(self.pool()) + .await?; + Ok(()) + } + + /// List recent runs for a job, newest first. + pub async fn list_scheduled_job_runs( + &self, + job_id: &str, + limit: usize, + ) -> anyhow::Result> { + let rows = sqlx::query( + "SELECT * FROM job_runs WHERE job_id = ? ORDER BY finished_at DESC LIMIT ?", + ) + .bind(job_id) + .bind(limit as i64) + .fetch_all(self.pool()) + .await?; + rows.iter() + .map(|r| { + Ok(JobRun { + id: r.try_get("id")?, + job_id: r.try_get("job_id")?, + started_at: r.try_get("started_at")?, + finished_at: r.try_get("finished_at")?, + status: r.try_get("status")?, + output: r.try_get("output")?, + error: r.try_get("error")?, + duration_ms: r.try_get("duration_ms")?, + }) + }) + .collect() + } + + /// Delete disabled jobs whose updated_at is before `before`. + pub async fn cleanup_disabled_scheduled_jobs(&self, before: i64) -> anyhow::Result<()> { + sqlx::query("DELETE FROM scheduled_jobs WHERE enabled = 0 AND updated_at < ?") + .bind(before) + .execute(self.pool()) + .await?; + Ok(()) + } +} + +fn now_ms() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64 +} + +fn row_to_job(row: &sqlx::sqlite::SqliteRow) -> anyhow::Result { + let schedule_json: String = row.try_get("schedule")?; + let schedule: Schedule = serde_json::from_str(&schedule_json)?; + Ok(ScheduledJob { + id: row.try_get("id")?, + name: row.try_get("name")?, + schedule, + prompt: row.try_get("prompt")?, + channel: row.try_get("channel")?, + chat_id: row.try_get("chat_id")?, + model: row.try_get("model")?, + enabled: row.try_get::("enabled")? != 0, + delete_after_run: row.try_get::("delete_after_run")? != 0, + next_run_at: row.try_get("next_run_at")?, + last_run_at: row.try_get("last_run_at")?, + last_status: row.try_get("last_status")?, + last_error: row.try_get("last_error")?, + created_at: row.try_get("created_at")?, + updated_at: row.try_get("updated_at")?, + }) +} + +#[cfg(test)] +mod tests { + use super::ScheduledJob; + use crate::scheduler::Schedule; + use crate::storage::Storage; + use sqlx::SqlitePool; + + fn now() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as i64 + } + + async fn setup_storage() -> Storage { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + let storage = Storage { pool }; + Storage::init_scheduler_schema(storage.pool()).await.unwrap(); + storage + } + + #[tokio::test] + async fn test_init_creates_tables() { + let storage = setup_storage().await; + let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scheduled_jobs") + .fetch_one(storage.pool()) + .await + .unwrap(); + assert_eq!(row.0, 0); + } + + #[tokio::test] + async fn test_add_and_get_job() { + let storage = setup_storage().await; + let t = now(); + let job = ScheduledJob { + id: "job-1".into(), + name: "test job".into(), + schedule: Schedule::Every { every_ms: 3600000 }, + prompt: "say hello".into(), + channel: "cli_chat".into(), + chat_id: "conn-1".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t + 3600000, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + storage.add_scheduled_job(&job).await.unwrap(); + let got = storage.get_scheduled_job("job-1").await.unwrap(); + assert_eq!(got.id, "job-1"); + assert_eq!(got.name, "test job"); + assert_eq!(got.prompt, "say hello"); + } + + #[tokio::test] + async fn test_list_jobs() { + let storage = setup_storage().await; + let t = now(); + for i in 0..3 { + let job = ScheduledJob { + id: format!("job-{}", i), + name: format!("job {}", i), + schedule: Schedule::Every { every_ms: 3600000 }, + prompt: "ping".into(), + channel: "cli_chat".into(), + chat_id: "conn-1".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t + 1000, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + storage.add_scheduled_job(&job).await.unwrap(); + } + let jobs = storage.list_scheduled_jobs().await.unwrap(); + assert_eq!(jobs.len(), 3); + } + + #[tokio::test] + async fn test_remove_job() { + let storage = setup_storage().await; + let t = now(); + let job = ScheduledJob { + id: "job-rm".into(), + name: "remove me".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), + channel: "cli_chat".into(), + chat_id: "c".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + storage.add_scheduled_job(&job).await.unwrap(); + storage.remove_scheduled_job("job-rm").await.unwrap(); + let result = storage.get_scheduled_job("job-rm").await; + assert!(result.is_err()); + } + + #[tokio::test] + async fn test_set_enabled() { + let storage = setup_storage().await; + let t = now(); + let job = ScheduledJob { + id: "job-toggle".into(), + name: "toggle".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), + channel: "cli_chat".into(), + chat_id: "c".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + storage.add_scheduled_job(&job).await.unwrap(); + storage.set_scheduled_job_enabled("job-toggle", false).await.unwrap(); + let got = storage.get_scheduled_job("job-toggle").await.unwrap(); + assert!(!got.enabled); + } + + #[tokio::test] + async fn test_due_jobs_only_returns_enabled_and_overdue() { + let storage = setup_storage().await; + let t = now(); + let jobs = vec![ + ScheduledJob { + id: "due".into(), name: "due".into(), + schedule: Schedule::At { at: t }, prompt: "1".into(), + channel: "cli_chat".into(), chat_id: "c".into(), + model: None, enabled: true, delete_after_run: false, + next_run_at: t - 1000, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }, + ScheduledJob { + id: "future".into(), name: "future".into(), + schedule: Schedule::At { at: t + 99999999 }, prompt: "2".into(), + channel: "cli_chat".into(), chat_id: "c".into(), + model: None, enabled: true, delete_after_run: false, + next_run_at: t + 99999999, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }, + ScheduledJob { + id: "disabled-due".into(), name: "disabled due".into(), + schedule: Schedule::At { at: t }, prompt: "3".into(), + channel: "cli_chat".into(), chat_id: "c".into(), + model: None, enabled: false, delete_after_run: false, + next_run_at: t - 1000, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }, + ]; + for j in &jobs { + storage.add_scheduled_job(j).await.unwrap(); + } + let due = storage.due_scheduled_jobs(t, 10).await.unwrap(); + assert_eq!(due.len(), 1); + assert_eq!(due[0].id, "due"); + } + + #[tokio::test] + async fn test_record_run_and_list_runs() { + let storage = setup_storage().await; + let t = now(); + let job = ScheduledJob { + id: "job-run".into(), name: "run test".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(), + model: None, enabled: true, delete_after_run: false, + next_run_at: t, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }; + storage.add_scheduled_job(&job).await.unwrap(); + + let run = super::JobRun { + id: 0, job_id: "job-run".into(), + started_at: t, finished_at: t + 500, + status: "ok".into(), output: Some("hello".into()), + error: None, duration_ms: 500, + }; + storage.record_scheduled_job_run(&run).await.unwrap(); + let runs = storage.list_scheduled_job_runs("job-run", 10).await.unwrap(); + assert_eq!(runs.len(), 1); + assert_eq!(runs[0].status, "ok"); + assert_eq!(runs[0].output.as_deref(), Some("hello")); + } + + #[tokio::test] + async fn test_update_job() { + let storage = setup_storage().await; + let t = now(); + let job = ScheduledJob { + id: "job-update".into(), name: "old name".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "old prompt".into(), channel: "feishu".into(), + chat_id: "oc_1".into(), model: None, + enabled: true, delete_after_run: false, + next_run_at: t, last_run_at: None, + last_status: None, last_error: None, + created_at: t, updated_at: t, + }; + storage.add_scheduled_job(&job).await.unwrap(); + storage.update_scheduled_job( + "job-update", + Some("new prompt".into()), + Some(Schedule::Every { every_ms: 60000 }), + None, None, None, + ).await.unwrap(); + let got = storage.get_scheduled_job("job-update").await.unwrap(); + assert_eq!(got.prompt, "new prompt"); + } +} diff --git a/src/scheduler/tools.rs b/src/tools/cron.rs similarity index 85% rename from src/scheduler/tools.rs rename to src/tools/cron.rs index 93a8f74..792e64e 100644 --- a/src/scheduler/tools.rs +++ b/src/tools/cron.rs @@ -1,11 +1,11 @@ +use std::sync::Arc; + use async_trait::async_trait; use serde_json::{json, Value}; -use sqlx::SqlitePool; use uuid::Uuid; -use crate::scheduler::next_run_for_schedule; -use crate::scheduler::store::SchedulerStore; -use crate::scheduler::types::{Schedule, ScheduledJob}; +use crate::scheduler::{next_run_for_schedule, Schedule}; +use crate::storage::{ScheduledJob, Storage}; use crate::tools::traits::{Tool, ToolResult}; fn now_ms() -> i64 { @@ -15,17 +15,15 @@ fn now_ms() -> i64 { .as_millis() as i64 } -// ── CronAddTool ────────────────────────────────────────────────────────────── - pub struct CronAddTool { - pool: SqlitePool, + storage: Arc, valid_channels: Vec, } impl CronAddTool { - pub fn new(pool: SqlitePool, valid_channels: Vec) -> Self { + pub fn new(storage: Arc, valid_channels: Vec) -> Self { Self { - pool, + storage, valid_channels, } } @@ -163,7 +161,7 @@ impl Tool for CronAddTool { updated_at: now, }; - SchedulerStore::add_job(&self.pool, &job).await?; + self.storage.add_scheduled_job(&job).await?; Ok(ToolResult { success: true, @@ -179,12 +177,12 @@ impl Tool for CronAddTool { // ── CronListTool ───────────────────────────────────────────────────────────── pub struct CronListTool { - pool: SqlitePool, + storage: Arc, } impl CronListTool { - pub fn new(pool: SqlitePool) -> Self { - Self { pool } + pub fn new(storage: Arc) -> Self { + Self { storage } } } @@ -220,7 +218,7 @@ impl Tool for CronListTool { .get("status") .and_then(|v| v.as_str()) .unwrap_or("all"); - let jobs = SchedulerStore::list_jobs(&self.pool).await?; + let jobs = self.storage.list_scheduled_jobs().await?; let filtered: Vec<&ScheduledJob> = match filter { "enabled" => jobs.iter().filter(|j| j.enabled).collect(), @@ -262,12 +260,12 @@ impl Tool for CronListTool { // ── CronRemoveTool ─────────────────────────────────────────────────────────── pub struct CronRemoveTool { - pool: SqlitePool, + storage: Arc, } impl CronRemoveTool { - pub fn new(pool: SqlitePool) -> Self { - Self { pool } + pub fn new(storage: Arc) -> Self { + Self { storage } } } @@ -308,7 +306,7 @@ impl Tool for CronRemoveTool { }); } - match SchedulerStore::get_job(&self.pool, &job_id).await { + match self.storage.get_scheduled_job(&job_id).await { Ok(_) => {} Err(_) => { return Ok(ToolResult { @@ -319,7 +317,7 @@ impl Tool for CronRemoveTool { } } - SchedulerStore::remove_job(&self.pool, &job_id).await?; + self.storage.remove_scheduled_job(&job_id).await?; Ok(ToolResult { success: true, output: format!("Job {} deleted.", job_id), @@ -331,12 +329,12 @@ impl Tool for CronRemoveTool { // ── CronEnableTool ─────────────────────────────────────────────────────────── pub struct CronEnableTool { - pool: SqlitePool, + storage: Arc, } impl CronEnableTool { - pub fn new(pool: SqlitePool) -> Self { - Self { pool } + pub fn new(storage: Arc) -> Self { + Self { storage } } } @@ -377,14 +375,16 @@ impl Tool for CronEnableTool { }); } - let job = SchedulerStore::get_job(&self.pool, &job_id) + let job = self + .storage + .get_scheduled_job(&job_id) .await .map_err(|e| anyhow::anyhow!("Job {} not found: {}", job_id, e))?; let next = next_run_for_schedule(&job.schedule, now_ms()); - SchedulerStore::set_enabled(&self.pool, &job_id, true).await?; + self.storage.set_scheduled_job_enabled(&job_id, true).await?; if let Some(n) = next { - SchedulerStore::set_next_run(&self.pool, &job_id, n).await?; + self.storage.set_scheduled_job_next_run(&job_id, n).await?; } Ok(ToolResult { @@ -398,12 +398,12 @@ impl Tool for CronEnableTool { // ── CronDisableTool ────────────────────────────────────────────────────────── pub struct CronDisableTool { - pool: SqlitePool, + storage: Arc, } impl CronDisableTool { - pub fn new(pool: SqlitePool) -> Self { - Self { pool } + pub fn new(storage: Arc) -> Self { + Self { storage } } } @@ -444,10 +444,12 @@ impl Tool for CronDisableTool { }); } - let _ = SchedulerStore::get_job(&self.pool, &job_id) + let _ = self + .storage + .get_scheduled_job(&job_id) .await .map_err(|e| anyhow::anyhow!("Job {} not found: {}", job_id, e))?; - SchedulerStore::set_enabled(&self.pool, &job_id, false).await?; + self.storage.set_scheduled_job_enabled(&job_id, false).await?; Ok(ToolResult { success: true, @@ -460,12 +462,12 @@ impl Tool for CronDisableTool { // ── CronUpdateTool ─────────────────────────────────────────────────────────── pub struct CronUpdateTool { - pool: SqlitePool, + storage: Arc, } impl CronUpdateTool { - pub fn new(pool: SqlitePool) -> Self { - Self { pool } + pub fn new(storage: Arc) -> Self { + Self { storage } } } @@ -526,7 +528,9 @@ impl Tool for CronUpdateTool { }); } - let _ = SchedulerStore::get_job(&self.pool, &job_id) + let _ = self + .storage + .get_scheduled_job(&job_id) .await .map_err(|e| anyhow::anyhow!("Job {} not found: {}", job_id, e))?; @@ -554,13 +558,14 @@ impl Tool for CronUpdateTool { .and_then(|v| v.as_str()) .map(|s| s.to_string()); - SchedulerStore::update_job(&self.pool, &job_id, prompt, schedule, channel, chat_id, model) + self.storage + .update_scheduled_job(&job_id, prompt, schedule, channel, chat_id, model) .await?; if args.get("schedule").is_some() { - let job = SchedulerStore::get_job(&self.pool, &job_id).await?; + let job = self.storage.get_scheduled_job(&job_id).await?; if let Some(next) = next_run_for_schedule(&job.schedule, now_ms()) { - SchedulerStore::set_next_run(&self.pool, &job_id, next).await?; + self.storage.set_scheduled_job_next_run(&job_id, next).await?; } } @@ -575,15 +580,15 @@ impl Tool for CronUpdateTool { #[cfg(test)] mod tests { use super::*; - use crate::scheduler::store::SchedulerStore; - use crate::scheduler::types::{Schedule, ScheduledJob}; + use crate::scheduler::Schedule; + use crate::storage::{ScheduledJob, Storage}; use serde_json::json; use sqlx::SqlitePool; - async fn setup_pool() -> SqlitePool { + async fn setup_storage() -> Arc { let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); - SchedulerStore::init(&pool).await.unwrap(); - pool + Storage::init_scheduler_schema(&pool).await.unwrap(); + Arc::new(Storage { pool }) } fn now() -> i64 { @@ -595,8 +600,8 @@ mod tests { #[tokio::test] async fn test_cron_add_tool() { - let pool = setup_pool().await; - let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]); + let storage = setup_storage().await; + let tool = CronAddTool::new(storage.clone(), vec!["cli_chat".to_string()]); let result = tool .execute(json!({ "schedule": {"type": "every", "every_ms": 3600000}, @@ -610,15 +615,15 @@ mod tests { assert!(result.success); assert!(result.output.contains("hourly report")); - let jobs = SchedulerStore::list_jobs(&pool).await.unwrap(); + let jobs = storage.list_scheduled_jobs().await.unwrap(); assert_eq!(jobs.len(), 1); assert_eq!(jobs[0].name, "hourly report"); } #[tokio::test] async fn test_cron_add_invalid_channel() { - let pool = setup_pool().await; - let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]); + let storage = setup_storage().await; + let tool = CronAddTool::new(storage.clone(), vec!["cli_chat".to_string()]); let result = tool .execute(json!({ "schedule": {"type": "every", "every_ms": 3600000}, @@ -635,7 +640,7 @@ mod tests { #[tokio::test] async fn test_cron_list_tool() { - let pool = setup_pool().await; + let storage = setup_storage().await; let t = now(); let job = ScheduledJob { id: uuid::Uuid::new_v4().to_string(), @@ -654,9 +659,9 @@ mod tests { created_at: t, updated_at: t, }; - SchedulerStore::add_job(&pool, &job).await.unwrap(); + storage.add_scheduled_job(&job).await.unwrap(); - let tool = CronListTool::new(pool.clone()); + let tool = CronListTool::new(storage.clone()); let result = tool.execute(json!({})).await.unwrap(); assert!(result.success); assert!(result.output.contains("list-test")); @@ -664,7 +669,7 @@ mod tests { #[tokio::test] async fn test_cron_remove_tool() { - let pool = setup_pool().await; + let storage = setup_storage().await; let t = now(); let job = ScheduledJob { id: "job-rm-tool".into(), @@ -683,22 +688,20 @@ mod tests { created_at: t, updated_at: t, }; - SchedulerStore::add_job(&pool, &job).await.unwrap(); + storage.add_scheduled_job(&job).await.unwrap(); - let tool = CronRemoveTool::new(pool.clone()); + let tool = CronRemoveTool::new(storage.clone()); let result = tool .execute(json!({"job_id": "job-rm-tool"})) .await .unwrap(); assert!(result.success); - assert!(SchedulerStore::get_job(&pool, "job-rm-tool") - .await - .is_err()); + assert!(storage.get_scheduled_job("job-rm-tool").await.is_err()); } #[tokio::test] async fn test_cron_enable_disable_tools() { - let pool = setup_pool().await; + let storage = setup_storage().await; let t = now(); let job = ScheduledJob { id: "job-toggle-tool".into(), @@ -717,36 +720,32 @@ mod tests { created_at: t, updated_at: t, }; - SchedulerStore::add_job(&pool, &job).await.unwrap(); + storage.add_scheduled_job(&job).await.unwrap(); - let disable_tool = CronDisableTool::new(pool.clone()); + let disable_tool = CronDisableTool::new(storage.clone()); let result = disable_tool .execute(json!({"job_id": "job-toggle-tool"})) .await .unwrap(); assert!(result.success); - let got = SchedulerStore::get_job(&pool, "job-toggle-tool") - .await - .unwrap(); + let got = storage.get_scheduled_job("job-toggle-tool").await.unwrap(); assert!(!got.enabled); - let enable_tool = CronEnableTool::new(pool.clone()); + let enable_tool = CronEnableTool::new(storage.clone()); let result = enable_tool .execute(json!({"job_id": "job-toggle-tool"})) .await .unwrap(); assert!(result.success); - let got = SchedulerStore::get_job(&pool, "job-toggle-tool") - .await - .unwrap(); + let got = storage.get_scheduled_job("job-toggle-tool").await.unwrap(); assert!(got.enabled); } #[tokio::test] async fn test_cron_update_tool() { - let pool = setup_pool().await; + let storage = setup_storage().await; let t = now(); let job = ScheduledJob { id: "job-update-tool".into(), @@ -767,9 +766,9 @@ mod tests { created_at: t, updated_at: t, }; - SchedulerStore::add_job(&pool, &job).await.unwrap(); + storage.add_scheduled_job(&job).await.unwrap(); - let tool = CronUpdateTool::new(pool.clone()); + let tool = CronUpdateTool::new(storage.clone()); let result = tool .execute(json!({ "job_id": "job-update-tool", @@ -780,9 +779,7 @@ mod tests { .unwrap(); assert!(result.success); - let got = SchedulerStore::get_job(&pool, "job-update-tool") - .await - .unwrap(); + let got = storage.get_scheduled_job("job-update-tool").await.unwrap(); assert_eq!(got.prompt, "new prompt"); } } diff --git a/src/tools/mod.rs b/src/tools/mod.rs index 11a37be..a007f5b 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -1,5 +1,6 @@ pub mod bash; pub mod calculator; +pub mod cron; pub mod file_edit; pub mod file_read; pub mod file_write;