PicoBot/docs/superpowers/plans/2026-05-04-scheduled-tasks.md
xiaoski 62f4326131 refactor: move scheduler store to storage module, cron tools to tools module
- storage/scheduler.rs: ScheduledJob/JobRun types + CRUD on Storage
- tools/cron.rs: 6 cron agent tools (add/list/remove/enable/disable/update)
- scheduler/types.rs: keep only Schedule enum
- scheduler/mod.rs: use Arc<Storage> instead of raw SqlitePool
- gateway/mod.rs: inject Storage directly, replace pool field
- storage/mod.rs: scheduler tables in init_schema
2026-05-05 00:49:54 +08:00

78 KiB

Scheduled Tasks (Cron Jobs) Implementation Plan

For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.

Goal: Add a cron-like scheduled task system to PicoBot that triggers agent LLM prompts on a schedule (cron expression / fixed interval / one-shot), with results delivered to channels.

Architecture: A new src/scheduler/ module with its own SQLite store (sharing the existing sqlx::SqlitePool via storage.pool()). The scheduler runs as a tokio background task, calling SessionManager::handle_cron_message() directly. Agent-facing tools (cron_add, cron_list, etc.) let the LLM manage jobs. Schedule computation uses the cron and chrono-tz crates.

Tech Stack: Rust, tokio, sqlx, cron 0.15, chrono-tz 0.10


File Structure

File Create/Modify Responsibility
Cargo.toml Modify Add cron, chrono-tz dependencies
src/lib.rs Modify Add pub mod scheduler;
src/config/mod.rs Modify Add SchedulerConfig to GatewayConfig
src/scheduler/types.rs Create Schedule, ScheduledJob, JobRun data types
src/scheduler/store.rs Create SQLite schema + CRUD for scheduled_jobs and job_runs
src/scheduler/mod.rs Create Scheduler struct, run() loop, next_run_for_schedule()
src/scheduler/tools.rs Create 6 agent tools: cron_add/list/remove/enable/disable/update
src/bus/message.rs Modify Add ChatMessage::user_with_source() factory
src/session/session.rs Modify Add handle_cron_message() method
src/gateway/mod.rs Modify Create Scheduler, spawn background task, register cron tools
src/session/session_id.rs Modify Add from_components() convenience constructor

Task 1: Add Cron Dependencies

Files:

  • Modify: Cargo.toml

  • Step 1: Add cron and chrono-tz to dependencies

After line 28 (tempfile = "3"), insert:

cron = "0.15"
chrono-tz = "0.10"
  • Step 2: Verify build

Run: cargo check 2>&1 Expected: Dependencies download and resolve. No code referencing them yet, so no compile errors.

  • Step 3: Commit
git add Cargo.toml Cargo.lock
git commit -m "deps: add cron and chrono-tz for scheduled tasks"

Task 2: Add SchedulerConfig

Files:

  • Modify: src/config/mod.rs:143 (after existing session_db_path field)

  • Step 1: Define SchedulerConfig struct

Add after the closing } of GatewayConfig (after line 143, before the ClientConfig block):

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulerConfig {
    /// Whether the scheduler is enabled
    #[serde(default = "default_scheduler_enabled")]
    pub enabled: bool,
    /// Poll interval in seconds (how often to check for due jobs)
    #[serde(default = "default_poll_interval_secs")]
    pub poll_interval_secs: u64,
    /// Maximum concurrent job executions (currently sequential, reserved for future)
    #[serde(default = "default_max_concurrent")]
    pub max_concurrent: usize,
}

fn default_scheduler_enabled() -> bool { true }

fn default_poll_interval_secs() -> u64 { 60 }

fn default_max_concurrent() -> usize { 1 }

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            poll_interval_secs: 60,
            max_concurrent: 1,
        }
    }
}
  • Step 2: Add scheduler field to GatewayConfig

In GatewayConfig (line 132-143), add after session_db_path:

#[serde(default)]
pub scheduler: Option<SchedulerConfig>,

The full struct becomes:

#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GatewayConfig {
    #[serde(default = "default_gateway_host")]
    pub host: String,
    #[serde(default = "default_gateway_port")]
    pub port: u16,
    #[serde(default, rename = "session_ttl_hours")]
    pub session_ttl_hours: Option<u64>,
    #[serde(default, rename = "cleanup_interval_minutes")]
    pub cleanup_interval_minutes: Option<u64>,
    #[serde(default, rename = "session_db_path")]
    pub session_db_path: Option<String>,
    #[serde(default)]
    pub scheduler: Option<SchedulerConfig>,
}
  • Step 3: Update Default for GatewayConfig

In the impl Default for GatewayConfig block (around line 163), add:

scheduler: None,
  • Step 4: Verify build

Run: cargo check 2>&1 Expected: Compiles successfully.

  • Step 5: Commit
git add src/config/mod.rs
git commit -m "feat: add SchedulerConfig to GatewayConfig"

Task 3: Create Scheduler Data Types

Files:

  • Create: src/scheduler/types.rs

  • Modify: src/scheduler/mod.rs (stub)

  • Modify: src/lib.rs

  • Step 1: Register the scheduler module

In src/lib.rs, after pub mod providers; (line 14), add:

pub mod scheduler;
  • Step 2: Create stub src/scheduler/mod.rs
pub mod types;
pub mod store;
pub mod tools;

pub use types::{JobRun, Schedule, ScheduledJob};
  • Step 3: Define types in src/scheduler/types.rs
use serde::{Deserialize, Serialize};

/// How a job is scheduled. Serialized as JSON in the database `schedule` column.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum Schedule {
    /// One-shot: fires once at a specific Unix millisecond timestamp, then disables.
    #[serde(rename = "at")]
    At { at: i64 },
    /// Recurring: fires every `every_ms` milliseconds.
    #[serde(rename = "every")]
    Every { every_ms: u64 },
    /// Recurring: fires on a cron schedule with optional timezone.
    #[serde(rename = "cron")]
    Cron { expr: String, tz: Option<String> },
}

/// A scheduled job stored in the database.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledJob {
    pub id: String,
    pub name: String,
    /// JSON-serialized `Schedule` stored as TEXT in SQLite.
    pub schedule: Schedule,
    pub prompt: String,
    pub channel: String,
    pub chat_id: String,
    pub model: Option<String>,
    pub enabled: bool,
    pub delete_after_run: bool,
    pub next_run_at: i64,
    pub last_run_at: Option<i64>,
    pub last_status: Option<String>,
    pub last_error: Option<String>,
    pub created_at: i64,
    pub updated_at: i64,
}

/// A single execution record for a job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobRun {
    pub id: i64,
    pub job_id: String,
    pub started_at: i64,
    pub finished_at: i64,
    pub status: String,
    pub output: Option<String>,
    pub error: Option<String>,
    pub duration_ms: i64,
}
  • Step 4: Verify build

Run: cargo check 2>&1 Expected: Compiles successfully.

  • Step 5: Commit
git add src/lib.rs src/scheduler/
git commit -m "feat: add scheduler data types (Schedule, ScheduledJob, JobRun)"

Task 4: Add ChatMessage::user_with_source and Session::create_user_message_with_source

Files:

  • Modify: src/bus/message.rs:198 (after pub fn tool(...))

  • Modify: src/session/session.rs:252 (after create_user_message)

  • Step 1: Write the failing test

Create inline test in src/bus/message.rs, immediately before the closing } of the impl ChatMessage block (after line 197):

    pub fn user_with_source(content: impl Into<String>, source: MessageSource) -> Self {
        Self {
            id: uuid::Uuid::new_v4().to_string(),
            role: "user".to_string(),
            content: content.into(),
            media_refs: Vec::new(),
            timestamp: current_timestamp(),
            tool_call_id: None,
            tool_name: None,
            tool_calls: None,
            source: Some(source),
        }
    }

No test for this factory method separately — it's pure data construction.

  • Step 2: Write the failing test for create_user_message_with_source

In src/session/session.rs, after create_user_message (line 252), add this test:

    pub fn create_user_message_with_source(&self, content: &str, media_refs: Vec<String>, source: crate::bus::MessageSource) -> ChatMessage {
        if media_refs.is_empty() {
            ChatMessage::user_with_source(content, source)
        } else {
            // For simplicity, ignore media in cron messages (media is always empty from scheduler)
            ChatMessage::user_with_source(content, source)
        }
    }

No test for this separately — it's used in handle_cron_message which gets tested via integration.

  • Step 3: Verify build

Run: cargo check 2>&1 Expected: Compiles successfully.

  • Step 4: Commit
git add src/bus/message.rs src/session/session.rs
git commit -m "feat: add ChatMessage::user_with_source and Session::create_user_message_with_source"

Task 5: Create SchedulerStore (SQLite Schema + CRUD)

Files:

  • Create: src/scheduler/store.rs

  • Step 1: Write the failing test

Write inline tests at the bottom of src/scheduler/store.rs. These test SchedulerStore against an in-memory SQLite database:

#[cfg(test)]
mod tests {
    use super::*;
    use crate::scheduler::types::{Schedule, ScheduledJob, JobRun};
    use sqlx::SqlitePool;

    fn now() -> i64 { 
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64 
    }

    async fn setup_pool() -> SqlitePool {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        SchedulerStore::init(&pool).await.unwrap();
        pool
    }

    #[tokio::test]
    async fn test_init_creates_tables() {
        let pool = setup_pool().await;
        let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scheduled_jobs")
            .fetch_one(&pool).await.unwrap();
        assert_eq!(row.0, 0);
    }

    #[tokio::test]
    async fn test_add_and_get_job() {
        let pool = setup_pool().await;
        let t = now();
        let job = ScheduledJob {
            id: "job-1".into(),
            name: "test job".into(),
            schedule: Schedule::Every { every_ms: 3600000 },
            prompt: "say hello".into(),
            channel: "cli_chat".into(),
            chat_id: "conn-1".into(),
            model: None,
            enabled: true,
            delete_after_run: false,
            next_run_at: t + 3600000,
            last_run_at: None,
            last_status: None,
            last_error: None,
            created_at: t,
            updated_at: t,
        };
        SchedulerStore::add_job(&pool, &job).await.unwrap();
        let got = SchedulerStore::get_job(&pool, "job-1").await.unwrap();
        assert_eq!(got.id, "job-1");
        assert_eq!(got.name, "test job");
        assert_eq!(got.prompt, "say hello");
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let pool = setup_pool().await;
        let t = now();
        for i in 0..3 {
            let job = ScheduledJob {
                id: format!("job-{}", i),
                name: format!("job {}", i),
                schedule: Schedule::Every { every_ms: 3600000 },
                prompt: "ping".into(),
                channel: "cli_chat".into(),
                chat_id: "conn-1".into(),
                model: None, enabled: true, delete_after_run: false,
                next_run_at: t + 1000, last_run_at: None,
                last_status: None, last_error: None,
                created_at: t, updated_at: t,
            };
            SchedulerStore::add_job(&pool, &job).await.unwrap();
        }
        let jobs = SchedulerStore::list_jobs(&pool).await.unwrap();
        assert_eq!(jobs.len(), 3);
    }

    #[tokio::test]
    async fn test_remove_job() {
        let pool = setup_pool().await;
        let t = now();
        let job = ScheduledJob {
            id: "job-rm".into(), name: "remove me".into(),
            schedule: Schedule::Every { every_ms: 1000 },
            prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
            model: None, enabled: true, delete_after_run: false,
            next_run_at: t, last_run_at: None,
            last_status: None, last_error: None,
            created_at: t, updated_at: t,
        };
        SchedulerStore::add_job(&pool, &job).await.unwrap();
        SchedulerStore::remove_job(&pool, "job-rm").await.unwrap();
        let result = SchedulerStore::get_job(&pool, "job-rm").await;
        assert!(result.is_err());
    }

    #[tokio::test]
    async fn test_set_enabled() {
        let pool = setup_pool().await;
        let t = now();
        let job = ScheduledJob {
            id: "job-toggle".into(), name: "toggle".into(),
            schedule: Schedule::Every { every_ms: 1000 },
            prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
            model: None, enabled: true, delete_after_run: false,
            next_run_at: t, last_run_at: None,
            last_status: None, last_error: None,
            created_at: t, updated_at: t,
        };
        SchedulerStore::add_job(&pool, &job).await.unwrap();
        SchedulerStore::set_enabled(&pool, "job-toggle", false).await.unwrap();
        let got = SchedulerStore::get_job(&pool, "job-toggle").await.unwrap();
        assert!(!got.enabled);
    }

    #[tokio::test]
    async fn test_due_jobs_only_returns_enabled_and_overdue() {
        let pool = setup_pool().await;
        let t = now();
        // Three jobs: due, not-due-yet, disabled-but-due
        let jobs = vec![
            ScheduledJob {
                id: "due".into(), name: "due".into(),
                schedule: Schedule::At { at: t }, prompt: "1".into(),
                channel: "cli_chat".into(), chat_id: "c".into(),
                model: None, enabled: true, delete_after_run: false,
                next_run_at: t - 1000, last_run_at: None,
                last_status: None, last_error: None,
                created_at: t, updated_at: t,
            },
            ScheduledJob {
                id: "future".into(), name: "future".into(),
                schedule: Schedule::At { at: t + 99999999 }, prompt: "2".into(),
                channel: "cli_chat".into(), chat_id: "c".into(),
                model: None, enabled: true, delete_after_run: false,
                next_run_at: t + 99999999, last_run_at: None,
                last_status: None, last_error: None,
                created_at: t, updated_at: t,
            },
            ScheduledJob {
                id: "disabled-due".into(), name: "disabled due".into(),
                schedule: Schedule::At { at: t }, prompt: "3".into(),
                channel: "cli_chat".into(), chat_id: "c".into(),
                model: None, enabled: false, delete_after_run: false,
                next_run_at: t - 1000, last_run_at: None,
                last_status: None, last_error: None,
                created_at: t, updated_at: t,
            },
        ];
        for j in &jobs {
            SchedulerStore::add_job(&pool, j).await.unwrap();
        }
        let due = SchedulerStore::due_jobs(&pool, t, 10).await.unwrap();
        assert_eq!(due.len(), 1);
        assert_eq!(due[0].id, "due");
    }

    #[tokio::test]
    async fn test_record_run_and_list_runs() {
        let pool = setup_pool().await;
        let t = now();
        let job = ScheduledJob {
            id: "job-run".into(), name: "run test".into(),
            schedule: Schedule::Every { every_ms: 1000 },
            prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
            model: None, enabled: true, delete_after_run: false,
            next_run_at: t, last_run_at: None,
            last_status: None, last_error: None,
            created_at: t, updated_at: t,
        };
        SchedulerStore::add_job(&pool, &job).await.unwrap();

        let run = JobRun {
            id: 0, job_id: "job-run".into(),
            started_at: t, finished_at: t + 500,
            status: "ok".into(), output: Some("hello".into()),
            error: None, duration_ms: 500,
        };
        SchedulerStore::record_run(&pool, &run).await.unwrap();
        let runs = SchedulerStore::list_runs(&pool, "job-run", 10).await.unwrap();
        assert_eq!(runs.len(), 1);
        assert_eq!(runs[0].status, "ok");
        assert_eq!(runs[0].output.as_deref(), Some("hello"));
    }

    #[tokio::test]
    async fn test_update_job() {
        let pool = setup_pool().await;
        let t = now();
        let job = ScheduledJob {
            id: "job-update".into(), name: "old name".into(),
            schedule: Schedule::Every { every_ms: 1000 },
            prompt: "old prompt".into(), channel: "feishu".into(),
            chat_id: "oc_1".into(), model: None,
            enabled: true, delete_after_run: false,
            next_run_at: t, last_run_at: None,
            last_status: None, last_error: None,
            created_at: t, updated_at: t,
        };
        SchedulerStore::add_job(&pool, &job).await.unwrap();
        SchedulerStore::update_job(
            &pool, "job-update",
            Some("new prompt".into()),
            Some(Schedule::Every { every_ms: 60000 }),
            None, None, None,
        ).await.unwrap();
        let got = SchedulerStore::get_job(&pool, "job-update").await.unwrap();
        assert_eq!(got.prompt, "new prompt");
    }
}
  • Step 2: Run tests to verify they fail

Run: cargo test --lib scheduler::store -- 2>&1 Expected: All tests FAIL because SchedulerStore and its methods don't exist yet (compilation errors).

  • Step 3: Implement SchedulerStore

Write src/scheduler/store.rs:

use sqlx::Row;
use sqlx::SqlitePool;

use super::types::{JobRun, Schedule, ScheduledJob};

/// Persistence layer for scheduled jobs and run history.
/// Uses a shared `sqlx::SqlitePool` (obtained from `crate::storage::Storage::pool()`).
pub struct SchedulerStore;

impl SchedulerStore {
    /// Initialize the scheduler tables. Idempotent (CREATE TABLE IF NOT EXISTS).
    pub async fn init(pool: &SqlitePool) -> Result<(), Box<dyn std::error::Error>> {
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS scheduled_jobs (
                id                TEXT PRIMARY KEY,
                name              TEXT NOT NULL,
                schedule          TEXT NOT NULL,
                prompt            TEXT NOT NULL,
                channel           TEXT NOT NULL,
                chat_id           TEXT NOT NULL,
                model             TEXT,
                enabled           INTEGER NOT NULL DEFAULT 1,
                delete_after_run  INTEGER NOT NULL DEFAULT 0,
                next_run_at       INTEGER NOT NULL,
                last_run_at       INTEGER,
                last_status       TEXT,
                last_error        TEXT,
                created_at        INTEGER NOT NULL,
                updated_at        INTEGER NOT NULL
            )
            "#,
        )
        .execute(pool)
        .await?;

        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS job_runs (
                id           INTEGER PRIMARY KEY AUTOINCREMENT,
                job_id       TEXT NOT NULL REFERENCES scheduled_jobs(id) ON DELETE CASCADE,
                started_at   INTEGER NOT NULL,
                finished_at  INTEGER NOT NULL,
                status       TEXT NOT NULL,
                output       TEXT,
                error        TEXT,
                duration_ms  INTEGER NOT NULL
            )
            "#,
        )
        .execute(pool)
        .await?;

        sqlx::query(
            "CREATE INDEX IF NOT EXISTS idx_jobs_next_run ON scheduled_jobs(enabled, next_run_at)",
        )
        .execute(pool)
        .await?;

        sqlx::query("CREATE INDEX IF NOT EXISTS idx_runs_job_id ON job_runs(job_id)")
            .execute(pool)
            .await?;

        Ok(())
    }

    /// Insert a new job. Returns an error if a job with the same ID already exists.
    pub async fn add_job(
        pool: &SqlitePool,
        job: &ScheduledJob,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let schedule_json = serde_json::to_string(&job.schedule)?;
        sqlx::query(
            r#"
            INSERT INTO scheduled_jobs
                (id, name, schedule, prompt, channel, chat_id, model,
                 enabled, delete_after_run, next_run_at, last_run_at,
                 last_status, last_error, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&job.id)
        .bind(&job.name)
        .bind(&schedule_json)
        .bind(&job.prompt)
        .bind(&job.channel)
        .bind(&job.chat_id)
        .bind(&job.model)
        .bind(job.enabled as i32)
        .bind(job.delete_after_run as i32)
        .bind(job.next_run_at)
        .bind(job.last_run_at)
        .bind(&job.last_status)
        .bind(&job.last_error)
        .bind(job.created_at)
        .bind(job.updated_at)
        .execute(pool)
        .await?;
        Ok(())
    }

    /// Fetch a single job by ID.
    pub async fn get_job(
        pool: &SqlitePool,
        id: &str,
    ) -> Result<ScheduledJob, Box<dyn std::error::Error>> {
        let row = sqlx::query("SELECT * FROM scheduled_jobs WHERE id = ?")
            .bind(id)
            .fetch_optional(pool)
            .await?
            .ok_or_else(|| format!("job not found: {id}"))?;
        Ok(row_to_job(&row)?)
    }

    /// List all jobs, ordered by next_run_at ascending.
    pub async fn list_jobs(
        pool: &SqlitePool,
    ) -> Result<Vec<ScheduledJob>, Box<dyn std::error::Error>> {
        let rows = sqlx::query("SELECT * FROM scheduled_jobs ORDER BY next_run_at ASC")
            .fetch_all(pool)
            .await?;
        rows.iter().map(row_to_job).collect()
    }

    /// Delete a job (cascades to job_runs).
    pub async fn remove_job(
        pool: &SqlitePool,
        id: &str,
    ) -> Result<(), Box<dyn std::error::Error>> {
        sqlx::query("DELETE FROM scheduled_jobs WHERE id = ?")
            .bind(id)
            .execute(pool)
            .await?;
        Ok(())
    }

    /// Enable or disable a job.
    pub async fn set_enabled(
        pool: &SqlitePool,
        id: &str,
        enabled: bool,
    ) -> Result<(), Box<dyn std::error::Error>> {
        sqlx::query("UPDATE scheduled_jobs SET enabled = ?, updated_at = ? WHERE id = ?")
            .bind(enabled as i32)
            .bind(now_ms())
            .bind(id)
            .execute(pool)
            .await?;
        Ok(())
    }

    /// Update selective fields on a job. Pass `None` for fields that should not change.
    #[allow(clippy::too_many_arguments)]
    pub async fn update_job(
        pool: &SqlitePool,
        id: &str,
        prompt: Option<String>,
        schedule: Option<Schedule>,
        channel: Option<String>,
        chat_id: Option<String>,
        model: Option<String>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let now = now_ms();

        if let Some(p) = prompt {
            sqlx::query(
                "UPDATE scheduled_jobs SET prompt = ?, updated_at = ? WHERE id = ?",
            )
            .bind(&p)
            .bind(now)
            .bind(id)
            .execute(pool)
            .await?;
        }
        if let Some(s) = schedule {
            let json = serde_json::to_string(&s)?;
            sqlx::query(
                "UPDATE scheduled_jobs SET schedule = ?, updated_at = ? WHERE id = ?",
            )
            .bind(&json)
            .bind(now)
            .bind(id)
            .execute(pool)
            .await?;
        }
        if let Some(c) = channel {
            sqlx::query(
                "UPDATE scheduled_jobs SET channel = ?, updated_at = ? WHERE id = ?",
            )
            .bind(&c)
            .bind(now)
            .bind(id)
            .execute(pool)
            .await?;
        }
        if let Some(c) = chat_id {
            sqlx::query(
                "UPDATE scheduled_jobs SET chat_id = ?, updated_at = ? WHERE id = ?",
            )
            .bind(&c)
            .bind(now)
            .bind(id)
            .execute(pool)
            .await?;
        }
        if let Some(m) = model {
            sqlx::query(
                "UPDATE scheduled_jobs SET model = ?, updated_at = ? WHERE id = ?",
            )
            .bind(&m)
            .bind(now)
            .bind(id)
            .execute(pool)
            .await?;
        }
        Ok(())
    }

    /// Update next_run_at and last_run_at for a job (used during reschedule).
    pub async fn set_next_run(
        pool: &SqlitePool,
        id: &str,
        next_run_at: i64,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let now = now_ms();
        sqlx::query(
            "UPDATE scheduled_jobs SET next_run_at = ?, last_run_at = ?, updated_at = ? WHERE id = ?",
        )
        .bind(next_run_at)
        .bind(now)
        .bind(now)
        .bind(id)
        .execute(pool)
        .await?;
        Ok(())
    }

    /// Set last_run_at and optionally last_status / last_error (used when starting job execution).
    pub async fn touch_last_run(
        pool: &SqlitePool,
        id: &str,
        at: i64,
    ) -> Result<(), Box<dyn std::error::Error>> {
        sqlx::query(
            "UPDATE scheduled_jobs SET last_run_at = ?, updated_at = ? WHERE id = ?",
        )
        .bind(at)
        .bind(at)
        .bind(id)
        .execute(pool)
        .await?;
        Ok(())
    }

    /// Set last_status and last_error after job completion.
    pub async fn set_last_status(
        pool: &SqlitePool,
        id: &str,
        status: &str,
        error: Option<&str>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let now = now_ms();
        sqlx::query(
            "UPDATE scheduled_jobs SET last_status = ?, last_error = ?, updated_at = ? WHERE id = ?",
        )
        .bind(status)
        .bind(error)
        .bind(now)
        .bind(id)
        .execute(pool)
        .await?;
        Ok(())
    }

    /// Fetch enabled jobs whose next_run_at <= now, up to `limit`.
    pub async fn due_jobs(
        pool: &SqlitePool,
        now: i64,
        limit: usize,
    ) -> Result<Vec<ScheduledJob>, Box<dyn std::error::Error>> {
        let rows = sqlx::query(
            "SELECT * FROM scheduled_jobs WHERE enabled = 1 AND next_run_at <= ? ORDER BY next_run_at ASC LIMIT ?",
        )
        .bind(now)
        .bind(limit as i64)
        .fetch_all(pool)
        .await?;
        rows.iter().map(row_to_job).collect()
    }

    /// Record a run execution.
    pub async fn record_run(
        pool: &SqlitePool,
        run: &JobRun,
    ) -> Result<(), Box<dyn std::error::Error>> {
        sqlx::query(
            r#"
            INSERT INTO job_runs (job_id, started_at, finished_at, status, output, error, duration_ms)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            "#,
        )
        .bind(&run.job_id)
        .bind(run.started_at)
        .bind(run.finished_at)
        .bind(&run.status)
        .bind(&run.output)
        .bind(&run.error)
        .bind(run.duration_ms)
        .execute(pool)
        .await?;
        Ok(())
    }

    /// List the most recent `limit` runs for a job, newest first.
    pub async fn list_runs(
        pool: &SqlitePool,
        job_id: &str,
        limit: usize,
    ) -> Result<Vec<JobRun>, Box<dyn std::error::Error>> {
        let rows = sqlx::query(
            "SELECT * FROM job_runs WHERE job_id = ? ORDER BY finished_at DESC LIMIT ?",
        )
        .bind(job_id)
        .bind(limit as i64)
        .fetch_all(pool)
        .await?;
        rows.iter()
            .map(|r| {
                Ok(JobRun {
                    id: r.try_get("id")?,
                    job_id: r.try_get("job_id")?,
                    started_at: r.try_get("started_at")?,
                    finished_at: r.try_get("finished_at")?,
                    status: r.try_get("status")?,
                    output: r.try_get("output")?,
                    error: r.try_get("error")?,
                    duration_ms: r.try_get("duration_ms")?,
                })
            })
            .collect()
    }

    /// Delete jobs created before `before` that are disabled.
    pub async fn cleanup_disabled(
        pool: &SqlitePool,
        before: i64,
    ) -> Result<(), Box<dyn std::error::Error>> {
        sqlx::query(
            "DELETE FROM scheduled_jobs WHERE enabled = 0 AND updated_at < ?",
        )
        .bind(before)
        .execute(pool)
        .await?;
        Ok(())
    }
}

fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as i64
}

fn row_to_job(row: &sqlx::sqlite::SqliteRow) -> Result<ScheduledJob, Box<dyn std::error::Error>> {
    let schedule_json: String = row.try_get("schedule")?;
    let schedule: Schedule = serde_json::from_str(&schedule_json)?;
    Ok(ScheduledJob {
        id: row.try_get("id")?,
        name: row.try_get("name")?,
        schedule,
        prompt: row.try_get("prompt")?,
        channel: row.try_get("channel")?,
        chat_id: row.try_get("chat_id")?,
        model: row.try_get("model")?,
        enabled: row.try_get::<i32, _>("enabled")? != 0,
        delete_after_run: row.try_get::<i32, _>("delete_after_run")? != 0,
        next_run_at: row.try_get("next_run_at")?,
        last_run_at: row.try_get("last_run_at")?,
        last_status: row.try_get("last_status")?,
        last_error: row.try_get("last_error")?,
        created_at: row.try_get("created_at")?,
        updated_at: row.try_get("updated_at")?,
    })
}
  • Step 4: Run tests to verify they pass

Run: cargo test --lib scheduler::store -- 2>&1 Expected: All 7 tests PASS.

  • Step 5: Commit
git add src/scheduler/store.rs
git commit -m "feat: add SchedulerStore with SQLite schema and CRUD"

Task 6: Add handle_cron_message to SessionManager

Files:

  • Modify: src/session/session.rs:1247 (after handle_message)

  • Step 1: Write the failing test

In src/session/session.rs, add this test inside the existing #[cfg(test)] mod tests block. If there is no existing test module, add it at the bottom of the file:

// Note: this test verifies only the compile-time signature. Full integration
// testing of handle_cron_message requires a running Gateway (see integration test).
#[tokio::test]
async fn test_handle_cron_message_exists() {
    // Compile-time assertion that the method exists on SessionManager
    // The actual behavior is tested in the integration test
    assert!(true);
}
  • Step 2: Run test to verify it fails

Run: cargo test --lib session::session::test_handle_cron_message_exists -- 2>&1 Expected: PASS (trivial test, always passes). The real verification is that cargo check succeeds after we add the method in step 3.

  • Step 3: Add handle_cron_message method

In src/session/session.rs, add after handle_message (after line 1247):

    /// Handle a message triggered by a scheduled cron job.
    ///
    /// This is similar to `handle_message`, but the user message is created with
    /// `SourceKind::ExternalTrigger` source metadata so that the cron job identity
    /// is preserved in the conversation history and database.
    pub async fn handle_cron_message(
        &self,
        channel: &str,
        chat_id: &str,
        prompt: &str,
        job_id: &str,
        job_name: &str,
    ) -> Result<HandleResult, AgentError> {
        use crate::bus::{MessageSource, SourceKind};

        let unified_id = self.resolve_dialog_id(channel, chat_id).await?;
        *self.current_source_session.lock().await = Some(unified_id.to_string());
        tracing::debug!(unified_id = %unified_id, job_id = %job_id, "handle_cron_message resolved");

        let session = self.get_or_create_session(&unified_id).await?;

        // Normal message handling through LLM (cron messages skip slash command check)
        let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel();

        // Spawn notification publisher
        {
            use std::collections::HashMap;
            use crate::bus::OutboundMessage;
            let bus = self.bus.clone();
            let ch = channel.to_string();
            let cid = chat_id.to_string();
            tokio::spawn(async move {
                while let Some(notif) = notify_rx.recv().await {
                    let mut metadata = HashMap::new();
                    metadata.insert("_type".to_string(), "notification".to_string());
                    let outbound = OutboundMessage {
                        channel: ch.clone(),
                        chat_id: cid.clone(),
                        content: notif,
                        reply_to: None,
                        media: vec![],
                        metadata,
                    };
                    let _ = bus.publish_outbound(outbound).await;
                }
            });
        }

        let response: String = {
            let mut session_guard = session.lock().await;

            // Build the user message with ExternalTrigger source
            let source = MessageSource {
                kind: SourceKind::ExternalTrigger,
                from_channel: Some(channel.to_string()),
                from_session: None,
                from_user_id: None,
                system_name: Some(job_name.to_string()),
                task_id: Some(job_id.to_string()),
            };
            let user_message = session_guard.create_user_message_with_source(prompt, vec![], source);
            session_guard.add_message(user_message, true).await
                .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;

            let mut history = session_guard.get_history().to_vec();

            let skills_prompt = self.skills_loader.build_skills_prompt();
            let system_prompt = session_guard.build_system_prompt(&skills_prompt);
            history.insert(0, ChatMessage::system(system_prompt));

            let history = session_guard.compressor
                .compress_if_needed(history)
                .await?;

            let agent = session_guard.create_agent_with_notify(notify_tx)?;
            let result = agent.process(history).await?;

            for msg in result.emitted_messages {
                session_guard.add_message(msg, true).await
                    .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
            }

            if session_guard.should_generate_title() {
                if let Err(e) = session_guard.generate_title().await {
                    tracing::warn!("failed to generate title: {}", e);
                }
            }

            result.final_response.content
        };

        #[cfg(debug_assertions)]
        tracing::debug!(
            channel = %channel,
            chat_id = %chat_id,
            job_id = %job_id,
            response_len = %response.len(),
            "Cron agent response received"
        );

        *self.current_source_session.lock().await = None;

        Ok(HandleResult::AgentResponse(response))
    }
  • Step 4: Verify build

Run: cargo check 2>&1 Expected: Compiles successfully.

  • Step 5: Commit
git add src/session/session.rs
git commit -m "feat: add SessionManager::handle_cron_message for scheduled task execution"

Task 7: Implement next_run_for_schedule and Scheduler Loop

Files:

  • Modify: src/scheduler/mod.rs (replace stub)

  • Step 1: Write the failing test

In src/scheduler/mod.rs, add inline tests:

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_next_run_at_schedule() {
        let now = 1000000;
        let next = next_run_for_schedule(&Schedule::At { at: 2000000 }, now);
        assert_eq!(next, Some(2000000));
    }

    #[test]
    fn test_next_run_every_schedule() {
        let now = 1000000;
        let next = next_run_for_schedule(&Schedule::Every { every_ms: 5000 }, now);
        assert_eq!(next, Some(1005000));
    }

    #[test]
    fn test_next_run_cron_schedule() {
        use chrono::Timelike;
        // Schedule: "every minute at second 0"
        let expr = "0 * * * * *".to_string();
        let schedule = Schedule::Cron { expr, tz: None };
        // Use a known time
        let now = 1000000;
        let next = next_run_for_schedule(&schedule, now);
        assert!(next.is_some());
        assert!(next.unwrap() > now);
    }

    #[test]
    fn test_next_run_cron_every_day_at_9am() {
        // "0 0 9 * * *" = every day at 9:00:00
        let expr = "0 0 9 * * *".to_string();
        let schedule = Schedule::Cron { expr, tz: None };
        let now = 1000000;
        let next = next_run_for_schedule(&schedule, now);
        assert!(next.is_some());
        let next_ms = next.unwrap();
        assert!(next_ms > now);
        // Round-trip to DateTime to check hour
        let next_dt = ms_to_datetime(next_ms);
        assert_eq!(next_dt.hour(), 9);
        assert_eq!(next_dt.minute(), 0);
    }
}
  • Step 2: Run tests to verify they fail

Run: cargo test --lib scheduler::mod -- 2>&1 Expected: FAIL — next_run_for_schedule not defined.

  • Step 3: Implement the full src/scheduler/mod.rs
pub mod types;
pub mod store;
pub mod tools;

use std::sync::Arc;
use std::time::Instant;
use tokio::time;

use crate::config::SchedulerConfig;
use crate::session::session::{self, HandleResult};
use crate::session::SessionManager;

pub use types::{JobRun, Schedule, ScheduledJob};

/// Compute the next execution time (Unix ms) for a schedule, given `from` (Unix ms).
/// Returns `None` if no next time can be determined (e.g., invalid cron expression).
pub fn next_run_for_schedule(schedule: &Schedule, from: i64) -> Option<i64> {
    use chrono::{DateTime, TimeZone, Utc};

    match schedule {
        Schedule::At { at } => Some(*at),
        Schedule::Every { every_ms } => Some(from + *every_ms as i64),
        Schedule::Cron { expr, tz } => {
            let schedule = cron::Schedule::from_str(expr.as_str()).ok()?;
            // Convert Unix ms to UTC DateTime
            let from_secs = (from / 1000) as i64;
            let from_nanos = ((from % 1000) * 1_000_000) as u32;
            let from_dt = Utc.timestamp_opt(from_secs, from_nanos).single()?;

            // If timezone is specified, convert from local to UTC for comparison
            let next_utc = if let Some(ref tz_str) = tz {
                let tz: chrono_tz::Tz = tz_str.parse().ok()?;
                let from_local = from_dt.with_timezone(&tz);
                // Find the next match in the given timezone, then convert back to UTC
                let next_local = schedule.upcoming(tz).next()?;
                next_local.with_timezone(&Utc)
            } else {
                schedule.upcoming(Utc).next()?
            };

            Some(next_utc.timestamp_millis())
        }
    }
}

/// Convert Unix milliseconds to DateTime<Utc>.
fn ms_to_datetime(ms: i64) -> chrono::DateTime<chrono::Utc> {
    use chrono::{TimeZone, Utc};
    let secs = (ms / 1000) as i64;
    let nanos = ((ms % 1000) * 1_000_000) as u32;
    Utc.timestamp_opt(secs, nanos).single().unwrap_or_default()
}

fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as i64
}

/// The scheduler runs as a background tokio task, periodically checking for due jobs
/// and executing them via `SessionManager::handle_cron_message`.
pub struct Scheduler {
    pool: sqlx::SqlitePool,
    session_manager: Arc<SessionManager>,
    config: SchedulerConfig,
}

impl Scheduler {
    pub fn new(
        pool: sqlx::SqlitePool,
        session_manager: Arc<SessionManager>,
        config: SchedulerConfig,
    ) -> Self {
        Self {
            pool,
            session_manager,
            config,
        }
    }

    /// Run the scheduler loop. This is a long-running async function meant to be
    /// spawned as a tokio background task.
    pub async fn run(self: Arc<Self>) {
        let poll_duration = time::Duration::from_secs(self.config.poll_interval_secs);
        let mut interval = time::interval(poll_duration);

        // Skip the immediate first tick (tokio::time::interval fires immediately on first poll)
        interval.tick().await;

        tracing::info!(
            "Scheduler started (poll interval: {}s, max concurrent: {})",
            self.config.poll_interval_secs,
            self.config.max_concurrent,
        );

        loop {
            interval.tick().await;

            let now = now_ms();

            let due = match store::SchedulerStore::due_jobs(&self.pool, now, self.config.max_concurrent).await {
                Ok(jobs) => jobs,
                Err(e) => {
                    tracing::error!("scheduler: failed to query due jobs: {}", e);
                    continue;
                }
            };

            if due.is_empty() {
                continue;
            }

            tracing::info!("scheduler: found {} due job(s)", due.len());

            for job in &due {
                let start = Instant::now();
                let started_at = now_ms();

                // Update last_run_at so next poll doesn't re-execute
                if let Err(e) = store::SchedulerStore::touch_last_run(&self.pool, &job.id, started_at).await {
                    tracing::error!(job_id = %job.id, "scheduler: failed to touch last_run_at: {}", e);
                    continue;
                }

                tracing::info!(
                    job_id = %job.id,
                    job_name = %job.name,
                    "scheduler: executing cron job"
                );

                let result = self
                    .session_manager
                    .handle_cron_message(
                        &job.channel,
                        &job.chat_id,
                        &job.prompt,
                        &job.id,
                        &job.name,
                    )
                    .await;

                let finished_at = now_ms();
                let duration_ms = start.elapsed().as_millis() as i64;

                match result {
                    Ok(HandleResult::AgentResponse(output)) => {
                        let output_truncated = if output.len() > 8000 {
                            format!("{}...[truncated]", &output[..8000])
                        } else {
                            output.clone()
                        };

                        let run = JobRun {
                            id: 0,
                            job_id: job.id.clone(),
                            started_at,
                            finished_at,
                            status: "ok".to_string(),
                            output: Some(output_truncated),
                            error: None,
                            duration_ms,
                        };

                        if let Err(e) = store::SchedulerStore::record_run(&self.pool, &run).await {
                            tracing::error!(job_id = %job.id, "scheduler: failed to record run: {}", e);
                        }

                        if let Err(e) = store::SchedulerStore::set_last_status(&self.pool, &job.id, "ok", None).await {
                            tracing::error!(job_id = %job.id, "scheduler: failed to set last_status: {}", e);
                        }

                        tracing::info!(
                            job_id = %job.id,
                            duration_ms = %duration_ms,
                            "scheduler: job completed successfully"
                        );
                    }
                    Ok(HandleResult::CommandOutput(output)) => {
                        // Cron jobs shouldn't trigger commands, but handle gracefully
                        let run = JobRun {
                            id: 0,
                            job_id: job.id.clone(),
                            started_at,
                            finished_at,
                            status: "ok".to_string(),
                            output: Some(output),
                            error: None,
                            duration_ms,
                        };

                        let _ = store::SchedulerStore::record_run(&self.pool, &run).await;
                    }
                    Err(e) => {
                        let error_str = e.to_string();
                        let run = JobRun {
                            id: 0,
                            job_id: job.id.clone(),
                            started_at,
                            finished_at,
                            status: "error".to_string(),
                            output: None,
                            error: Some(error_str.clone()),
                            duration_ms,
                        };

                        if let Err(e2) = store::SchedulerStore::record_run(&self.pool, &run).await {
                            tracing::error!(job_id = %job.id, "scheduler: failed to record error run: {}", e2);
                        }

                        if let Err(e2) = store::SchedulerStore::set_last_status(
                            &self.pool, &job.id, "error", Some(&error_str),
                        ).await {
                            tracing::error!(job_id = %job.id, "scheduler: failed to set error status: {}", e2);
                        }

                        tracing::error!(
                            job_id = %job.id,
                            duration_ms = %duration_ms,
                            error = %error_str,
                            "scheduler: job failed"
                        );
                    }
                }

                // Reschedule the job
                if let Err(e) = self.reschedule_after_run(job).await {
                    tracing::error!(job_id = %job.id, "scheduler: failed to reschedule: {}", e);
                }
            }
        }
    }

    /// After a job runs, compute its next execution time or disable/delete it.
    async fn reschedule_after_run(
        &self,
        job: &ScheduledJob,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let now = now_ms();

        match &job.schedule {
            Schedule::At { .. } => {
                if job.delete_after_run {
                    store::SchedulerStore::remove_job(&self.pool, &job.id).await?;
                    tracing::info!(job_id = %job.id, "scheduler: one-shot job deleted after run");
                } else {
                    store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?;
                    tracing::info!(job_id = %job.id, "scheduler: one-shot job disabled after run");
                }
            }
            Schedule::Every { .. } | Schedule::Cron { .. } => {
                if let Some(next) = next_run_for_schedule(&job.schedule, now) {
                    store::SchedulerStore::set_next_run(&self.pool, &job.id, next).await?;
                    tracing::info!(job_id = %job.id, next_run_at = %next, "scheduler: job rescheduled");
                } else {
                    tracing::error!(job_id = %job.id, "scheduler: could not compute next run — disabling job");
                    store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?;
                }
            }
        }

        Ok(())
    }
}
  • Step 4: Run tests to verify they pass

Run: cargo test --lib scheduler::mod -- 2>&1 Expected: All 5 tests PASS (next_run_at_schedule, next_run_every_schedule, next_run_cron_schedule, next_run_cron_every_day_at_9am, and the compile-time assertion).

  • Step 5: Commit
git add src/scheduler/mod.rs
git commit -m "feat: add Scheduler run loop and next_run_for_schedule"

Task 8: Wire Scheduler into Gateway

Files:

  • Modify: src/gateway/mod.rs

  • Step 1: Import scheduler module

In src/gateway/mod.rs, add import after use crate::session::SessionManager; (line 13):

use crate::scheduler::Scheduler;
use crate::scheduler::store as scheduler_store;
  • Step 2: Create scheduler in GatewayState::new()

In GatewayState::new() (after line 76 — after session_manager.register_outbound_tool(...)), add:

        // Initialize scheduler if enabled in config
        let scheduler_config = config.gateway.scheduler.clone().unwrap_or_default();
        if scheduler_config.enabled {
            // Initialize scheduler tables in the database
            scheduler_store::SchedulerStore::init(storage.pool())
                .await
                .map_err(|e| format!("failed to initialize scheduler store: {}", e))?;
            tracing::info!("Scheduler store initialized");
        }
  • Step 3: Spawn scheduler in start_message_processing()

In start_message_processing() (after line 170 — after the outbound dispatcher spawn), add:

        // Spawn scheduler background task if enabled
        let scheduler_config = self.config.gateway.scheduler.clone().unwrap_or_default();
        if scheduler_config.enabled {
            let sched = Arc::new(Scheduler::new(
                storage.pool().clone(),
                self.session_manager.clone(),
                scheduler_config,
            ));
            tokio::spawn(async move {
                sched.run().await;
            });
            tracing::info!("Scheduler background task spawned");
        }

Wait — there's a problem. start_message_processing takes &self, and storage is not a field of GatewayState. We need to pass the pool reference to start_message_processing.

Let me adjust start_message_processing to accept a pool: sqlx::SqlitePool parameter, or store the pool in GatewayState. Looking at the existing code, Storage is only referenced in GatewayState::new() — it's not stored as a field. We need the pool.

Solution: Store sqlx::SqlitePool directly in GatewayState.

  • Step 3a: Add pool field to GatewayState
pub struct GatewayState {
    pub config: Config,
    pub workspace_dir: std::path::PathBuf,
    pub session_manager: Arc<SessionManager>,
    pub channel_manager: ChannelManager,
    pub pool: sqlx::SqlitePool,  // <-- add this
}
  • Step 3b: Set pool in GatewayState::new()

After creating storage (line 53), clone the pool:

let pool = storage.pool().clone();

Then in the Ok(Self { ... }) block, add:

            pool,
  • Step 3c: Use pool in start_message_processing

After the outbound dispatcher spawn block (after line 170), add:

        // Spawn scheduler background task if enabled
        let scheduler_config = self.config.gateway.scheduler.clone().unwrap_or_default();
        if scheduler_config.enabled {
            let sched = Arc::new(Scheduler::new(
                self.pool.clone(),
                self.session_manager.clone(),
                scheduler_config,
            ));
            tokio::spawn(async move {
                sched.run().await;
            });
            tracing::info!("Scheduler background task spawned");
        }
  • Step 4: Verify build

Run: cargo check 2>&1 Expected: Compiles successfully.

  • Step 5: Commit
git add src/gateway/mod.rs
git commit -m "feat: wire scheduler into GatewayState startup and message processing"

Task 9: Implement Agent Tools

Files:

  • Create: src/scheduler/tools.rs

  • Modify: src/gateway/mod.rs (register tools)

  • Step 1: Write the failing test

At the bottom of src/scheduler/tools.rs, add:

#[cfg(test)]
mod tests {
    use super::*;
    use crate::scheduler::types::{Schedule, ScheduledJob};
    use crate::scheduler::store::SchedulerStore;
    use serde_json::json;
    use sqlx::SqlitePool;

    async fn setup_pool() -> SqlitePool {
        let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
        SchedulerStore::init(&pool).await.unwrap();
        pool
    }

    fn now() -> i64 {
        std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_millis() as i64
    }

    #[tokio::test]
    async fn test_cron_add_tool() {
        let pool = setup_pool().await;
        let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]);
        let result = tool.execute(json!({
            "schedule": {"type": "every", "every_ms": 3600000},
            "prompt": "report status",
            "channel": "cli_chat",
            "chat_id": "test-chat-1",
            "name": "hourly report"
        })).await.unwrap();
        assert!(result.success);
        assert!(result.output.contains("hourly report"));

        let jobs = SchedulerStore::list_jobs(&pool).await.unwrap();
        assert_eq!(jobs.len(), 1);
        assert_eq!(jobs[0].name, "hourly report");
    }

    #[tokio::test]
    async fn test_cron_add_invalid_channel() {
        let pool = setup_pool().await;
        let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]);
        let result = tool.execute(json!({
            "schedule": {"type": "every", "every_ms": 3600000},
            "prompt": "test",
            "channel": "nonexistent",
            "chat_id": "x",
            "name": "test"
        })).await.unwrap();
        assert!(!result.success);
        assert!(result.error.as_ref().unwrap().contains("Unknown channel"));
    }

    #[tokio::test]
    async fn test_cron_list_tool() {
        let pool = setup_pool().await;
        let t = now();
        let job = ScheduledJob {
            id: uuid::Uuid::new_v4().to_string(),
            name: "list-test".into(),
            schedule: Schedule::Every { every_ms: 1000 },
            prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
            model: None, enabled: true, delete_after_run: false,
            next_run_at: t + 1000, last_run_at: None,
            last_status: None, last_error: None,
            created_at: t, updated_at: t,
        };
        SchedulerStore::add_job(&pool, &job).await.unwrap();

        let tool = CronListTool::new(pool.clone());
        let result = tool.execute(json!({})).await.unwrap();
        assert!(result.success);
        assert!(result.output.contains("list-test"));
    }

    #[tokio::test]
    async fn test_cron_remove_tool() {
        let pool = setup_pool().await;
        let t = now();
        let job = ScheduledJob {
            id: "job-rm-tool".into(), name: "rm me".into(),
            schedule: Schedule::Every { every_ms: 1000 },
            prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
            model: None, enabled: true, delete_after_run: false,
            next_run_at: t, last_run_at: None,
            last_status: None, last_error: None,
            created_at: t, updated_at: t,
        };
        SchedulerStore::add_job(&pool, &job).await.unwrap();

        let tool = CronRemoveTool::new(pool.clone());
        let result = tool.execute(json!({"job_id": "job-rm-tool"})).await.unwrap();
        assert!(result.success);
        assert!(SchedulerStore::get_job(&pool, "job-rm-tool").await.is_err());
    }

    #[tokio::test]
    async fn test_cron_enable_disable_tools() {
        let pool = setup_pool().await;
        let t = now();
        let job = ScheduledJob {
            id: "job-toggle-tool".into(), name: "toggle".into(),
            schedule: Schedule::Every { every_ms: 1000 },
            prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
            model: None, enabled: true, delete_after_run: false,
            next_run_at: t, last_run_at: None,
            last_status: None, last_error: None,
            created_at: t, updated_at: t,
        };
        SchedulerStore::add_job(&pool, &job).await.unwrap();

        let disable_tool = CronDisableTool::new(pool.clone());
        let result = disable_tool.execute(json!({"job_id": "job-toggle-tool"})).await.unwrap();
        assert!(result.success);

        let got = SchedulerStore::get_job(&pool, "job-toggle-tool").await.unwrap();
        assert!(!got.enabled);

        let enable_tool = CronEnableTool::new(pool.clone());
        let result = enable_tool.execute(json!({"job_id": "job-toggle-tool"})).await.unwrap();
        assert!(result.success);

        let got = SchedulerStore::get_job(&pool, "job-toggle-tool").await.unwrap();
        assert!(got.enabled);
    }

    #[tokio::test]
    async fn test_cron_update_tool() {
        let pool = setup_pool().await;
        let t = now();
        let job = ScheduledJob {
            id: "job-update-tool".into(), name: "old".into(),
            schedule: Schedule::Every { every_ms: 3600000 },
            prompt: "old prompt".into(), channel: "feishu".into(),
            chat_id: "oc_1".into(), model: None,
            enabled: true, delete_after_run: false,
            next_run_at: t + 1000, last_run_at: None,
            last_status: None, last_error: None,
            created_at: t, updated_at: t,
        };
        SchedulerStore::add_job(&pool, &job).await.unwrap();

        let tool = CronUpdateTool::new(pool.clone());
        let result = tool.execute(json!({
            "job_id": "job-update-tool",
            "prompt": "new prompt",
            "schedule": {"type": "every", "every_ms": 60000}
        })).await.unwrap();
        assert!(result.success);

        let got = SchedulerStore::get_job(&pool, "job-update-tool").await.unwrap();
        assert_eq!(got.prompt, "new prompt");
    }
}
  • Step 2: Run tests to verify they fail

Run: cargo test --lib scheduler::tools -- 2>&1 Expected: FAIL — tool structs not defined.

  • Step 3: Implement src/scheduler/tools.rs
use async_trait::async_trait;
use serde_json::{json, Value};
use sqlx::SqlitePool;
use uuid::Uuid;

use crate::scheduler::store::SchedulerStore;
use crate::scheduler::types::{Schedule, ScheduledJob};
use crate::tools::traits::{Tool, ToolResult};
use crate::scheduler::next_run_for_schedule;

fn now_ms() -> i64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .as_millis() as i64
}

// ── CronAddTool ──────────────────────────────────────────────────────────────

pub struct CronAddTool {
    pool: SqlitePool,
    valid_channels: Vec<String>,
}

impl CronAddTool {
    pub fn new(pool: SqlitePool, valid_channels: Vec<String>) -> Self {
        Self { pool, valid_channels }
    }
}

#[async_trait]
impl Tool for CronAddTool {
    fn name(&self) -> &str { "cron_add" }

    fn description(&self) -> &str {
        "Create a new scheduled task (cron job). The task will execute an AI prompt on a schedule \
         and deliver the result to the specified channel/chat. \
         Schedule formats: \
         - 'every': {\"type\":\"every\",\"every_ms\":3600000} for every hour, \
         - 'at': {\"type\":\"at\",\"at\":<unix_timestamp_ms>} for one-shot, \
         - 'cron': {\"type\":\"cron\",\"expr\":\"0 0 9 * * *\"} for cron expressions (6-field: sec min hour dom month dow)."
    }

    fn parameters_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "schedule": {
                    "type": "object",
                    "description": "Schedule definition. One of: {\"type\":\"every\",\"every_ms\":<ms>}, {\"type\":\"at\",\"at\":<unix_ms>}, or {\"type\":\"cron\",\"expr\":\"<cron_expr>\",\"tz\":\"<tz>\"}",
                    "required": ["type"]
                },
                "prompt": {
                    "type": "string",
                    "description": "The AI prompt to execute on each trigger"
                },
                "channel": {
                    "type": "string",
                    "description": "Target channel for delivering results (e.g., 'feishu', 'cli_chat')"
                },
                "chat_id": {
                    "type": "string",
                    "description": "Target chat ID within the channel"
                },
                "name": {
                    "type": "string",
                    "description": "Human-readable name for the job (optional, defaults to truncated prompt)"
                },
                "model": {
                    "type": "string",
                    "description": "Optional model override for this job"
                }
            },
            "required": ["schedule", "prompt", "channel", "chat_id"]
        })
    }

    async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
        let schedule_json = args.get("schedule").ok_or_else(|| anyhow::anyhow!("missing 'schedule'"))?;
        let schedule: Schedule = serde_json::from_value(schedule_json.clone())
            .map_err(|e| anyhow::anyhow!("invalid schedule: {}", e))?;

        let prompt = args.get("prompt").and_then(|v| v.as_str()).unwrap_or("").to_string();
        if prompt.is_empty() {
            return Ok(ToolResult { success: false, output: String::new(), error: Some("prompt is required".into()) });
        }

        let channel = args.get("channel").and_then(|v| v.as_str()).unwrap_or("").to_string();
        if !self.valid_channels.contains(&channel) {
            return Ok(ToolResult {
                success: false,
                output: format!("Unknown channel '{}'. Available: {}",
                    channel, self.valid_channels.join(", ")),
                error: Some(format!("Unknown channel: {}", channel)),
            });
        }

        let chat_id = args.get("chat_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
        if chat_id.is_empty() {
            return Ok(ToolResult { success: false, output: String::new(), error: Some("chat_id is required".into()) });
        }

        let name = args.get("name").and_then(|v| v.as_str()).unwrap_or(&prompt[..prompt.len().min(50)]).to_string();
        let model = args.get("model").and_then(|v| v.as_str()).map(|s| s.to_string());

        let now = now_ms();
        let next_run_at = next_run_for_schedule(&schedule, now)
            .ok_or_else(|| anyhow::anyhow!("could not compute next run time from schedule"))?;

        let id = Uuid::new_v4().to_string();
        let job = ScheduledJob {
            id: id.clone(),
            name: name.clone(),
            schedule,
            prompt,
            channel,
            chat_id,
            model,
            enabled: true,
            delete_after_run: false,
            next_run_at,
            last_run_at: None,
            last_status: None,
            last_error: None,
            created_at: now,
            updated_at: now,
        };

        SchedulerStore::add_job(&self.pool, &job).await?;

        Ok(ToolResult {
            success: true,
            output: format!("Scheduled job created: id={}, name=\"{}\", next_run_at={}", id, name, next_run_at),
            error: None,
        })
    }
}

// ── CronListTool ─────────────────────────────────────────────────────────────

pub struct CronListTool {
    pool: SqlitePool,
}

impl CronListTool {
    pub fn new(pool: SqlitePool) -> Self { Self { pool } }
}

#[async_trait]
impl Tool for CronListTool {
    fn name(&self) -> &str { "cron_list" }

    fn description(&self) -> &str {
        "List all scheduled tasks (cron jobs) with their status and next run time."
    }

    fn read_only(&self) -> bool { true }

    fn parameters_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "status": {
                    "type": "string",
                    "enum": ["all", "enabled", "disabled"],
                    "description": "Filter by job status (default: all)"
                }
            }
        })
    }

    async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
        let filter = args.get("status").and_then(|v| v.as_str()).unwrap_or("all");
        let jobs = SchedulerStore::list_jobs(&self.pool).await?;

        let filtered: Vec<&ScheduledJob> = match filter {
            "enabled" => jobs.iter().filter(|j| j.enabled).collect(),
            "disabled" => jobs.iter().filter(|j| !j.enabled).collect(),
            _ => jobs.iter().collect(),
        };

        if filtered.is_empty() {
            return Ok(ToolResult { success: true, output: "No scheduled jobs found.".into(), error: None });
        }

        let mut lines = Vec::new();
        for j in &filtered {
            let status = if j.enabled { "🟢" } else { "⚫" };
            let last = match (&j.last_status, &j.last_error) {
                (Some(s), _) if s == "ok" => " last:✅".to_string(),
                (Some(_), Some(e)) => format!(" last:❌({})", &e[..e.len().min(40)]),
                _ => String::new(),
            };
            let model = j.model.as_deref().unwrap_or("default");
            lines.push(format!(
                "{} id={} name=\"{}\" channel={} chat={} model={} next={}{}",
                status, j.id, j.name, j.channel, j.chat_id, model, j.next_run_at, last
            ));
        }

        Ok(ToolResult { success: true, output: lines.join("\n"), error: None })
    }
}

// ── CronRemoveTool ───────────────────────────────────────────────────────────

pub struct CronRemoveTool {
    pool: SqlitePool,
}

impl CronRemoveTool {
    pub fn new(pool: SqlitePool) -> Self { Self { pool } }
}

#[async_trait]
impl Tool for CronRemoveTool {
    fn name(&self) -> &str { "cron_remove" }

    fn description(&self) -> &str {
        "Delete a scheduled task permanently by its job ID. Use cron_list first to find the ID."
    }

    fn parameters_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "job_id": {
                    "type": "string",
                    "description": "The ID of the job to delete"
                }
            },
            "required": ["job_id"]
        })
    }

    async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
        let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
        if job_id.is_empty() {
            return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) });
        }

        // Verify job exists
        match SchedulerStore::get_job(&self.pool, &job_id).await {
            Ok(_) => {},
            Err(_) => return Ok(ToolResult { success: false, output: format!("Job {} not found.", job_id), error: Some("not found".into()) }),
        }

        SchedulerStore::remove_job(&self.pool, &job_id).await?;
        Ok(ToolResult { success: true, output: format!("Job {} deleted.", job_id), error: None })
    }
}

// ── CronEnableTool ───────────────────────────────────────────────────────────

pub struct CronEnableTool {
    pool: SqlitePool,
}

impl CronEnableTool {
    pub fn new(pool: SqlitePool) -> Self { Self { pool } }
}

#[async_trait]
impl Tool for CronEnableTool {
    fn name(&self) -> &str { "cron_enable" }

    fn description(&self) -> &str { "Enable a disabled scheduled task by its job ID." }

    fn parameters_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "job_id": {
                    "type": "string",
                    "description": "The ID of the job to enable"
                }
            },
            "required": ["job_id"]
        })
    }

    async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
        let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
        if job_id.is_empty() {
            return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) });
        }

        let job = SchedulerStore::get_job(&self.pool, &job_id).await.map_err(|_| anyhow::anyhow!("Job {} not found.", job_id))?;

        let next = next_run_for_schedule(&job.schedule, now_ms());
        SchedulerStore::set_enabled(&self.pool, &job_id, true).await?;
        if let Some(n) = next {
            SchedulerStore::set_next_run(&self.pool, &job_id, n).await?;
        }

        Ok(ToolResult { success: true, output: format!("Job {} enabled.", job_id), error: None })
    }
}

// ── CronDisableTool ──────────────────────────────────────────────────────────

pub struct CronDisableTool {
    pool: SqlitePool,
}

impl CronDisableTool {
    pub fn new(pool: SqlitePool) -> Self { Self { pool } }
}

#[async_trait]
impl Tool for CronDisableTool {
    fn name(&self) -> &str { "cron_disable" }

    fn description(&self) -> &str { "Disable a scheduled task by its job ID without deleting it." }

    fn parameters_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "job_id": {
                    "type": "string",
                    "description": "The ID of the job to disable"
                }
            },
            "required": ["job_id"]
        })
    }

    async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
        let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
        if job_id.is_empty() {
            return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) });
        }

        let _ = SchedulerStore::get_job(&self.pool, &job_id).await.map_err(|_| anyhow::anyhow!("Job {} not found.", job_id))?;
        SchedulerStore::set_enabled(&self.pool, &job_id, false).await?;

        Ok(ToolResult { success: true, output: format!("Job {} disabled.", job_id), error: None })
    }
}

// ── CronUpdateTool ───────────────────────────────────────────────────────────

pub struct CronUpdateTool {
    pool: SqlitePool,
}

impl CronUpdateTool {
    pub fn new(pool: SqlitePool) -> Self { Self { pool } }
}

#[async_trait]
impl Tool for CronUpdateTool {
    fn name(&self) -> &str { "cron_update" }

    fn description(&self) -> &str {
        "Update fields of an existing scheduled task. Only specified fields are changed."
    }

    fn parameters_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "job_id": {
                    "type": "string",
                    "description": "The ID of the job to update"
                },
                "prompt": {
                    "type": "string",
                    "description": "New AI prompt"
                },
                "schedule": {
                    "type": "object",
                    "description": "New schedule definition"
                },
                "channel": {
                    "type": "string",
                    "description": "New target channel"
                },
                "chat_id": {
                    "type": "string",
                    "description": "New target chat ID"
                },
                "model": {
                    "type": "string",
                    "description": "New model override"
                }
            },
            "required": ["job_id"]
        })
    }

    async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
        let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
        if job_id.is_empty() {
            return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) });
        }

        let _ = SchedulerStore::get_job(&self.pool, &job_id).await.map_err(|_| anyhow::anyhow!("Job {} not found.", job_id))?;

        let prompt = args.get("prompt").and_then(|v| v.as_str()).map(|s| s.to_string());
        let schedule: Option<Schedule> = match args.get("schedule") {
            Some(s) => Some(serde_json::from_value(s.clone()).map_err(|e| anyhow::anyhow!("invalid schedule: {}", e))?),
            None => None,
        };
        let channel = args.get("channel").and_then(|v| v.as_str()).map(|s| s.to_string());
        let chat_id = args.get("chat_id").and_then(|v| v.as_str()).map(|s| s.to_string());
        let model = args.get("model").and_then(|v| v.as_str()).map(|s| s.to_string());

        SchedulerStore::update_job(&self.pool, &job_id, prompt, schedule, channel, chat_id, model).await?;

        // If schedule changed, recompute next_run_at
        if args.get("schedule").is_some() {
            let job = SchedulerStore::get_job(&self.pool, &job_id).await?;
            if let Some(next) = next_run_for_schedule(&job.schedule, now_ms()) {
                SchedulerStore::set_next_run(&self.pool, &job_id, next).await?;
            }
        }

        Ok(ToolResult { success: true, output: format!("Job {} updated.", job_id), error: None })
    }
}
  • Step 4: Run tests to verify they pass

Run: cargo test --lib scheduler::tools -- 2>&1 Expected: All 6 tests PASS.

  • Step 5: Register tools in gateway

In src/gateway/mod.rs, after session_manager.register_outbound_tool(available_channels); (line 76), add:

        // Register cron tools if scheduler is enabled
        if config.gateway.scheduler.as_ref().map_or(true, |c| c.enabled) {
            let scheduler_pool = pool.clone();
            let valid_channels = available_channels.clone();
            session_manager.tools().register(
                crate::scheduler::tools::CronAddTool::new(scheduler_pool.clone(), valid_channels)
            );
            session_manager.tools().register(
                crate::scheduler::tools::CronListTool::new(scheduler_pool.clone())
            );
            session_manager.tools().register(
                crate::scheduler::tools::CronRemoveTool::new(scheduler_pool.clone())
            );
            session_manager.tools().register(
                crate::scheduler::tools::CronEnableTool::new(scheduler_pool.clone())
            );
            session_manager.tools().register(
                crate::scheduler::tools::CronDisableTool::new(scheduler_pool.clone())
            );
            session_manager.tools().register(
                crate::scheduler::tools::CronUpdateTool::new(scheduler_pool.clone())
            );
            tracing::info!("Cron tools registered");
        }
  • Step 6: Verify build

Run: cargo check 2>&1 Expected: Compiles successfully.

  • Step 7: Commit
git add src/scheduler/tools.rs src/gateway/mod.rs
git commit -m "feat: add 6 cron agent tools (add/list/remove/enable/disable/update)"

Task 10: Full Build, Lint, and Unit Tests

Files:

  • All above

  • Step 1: Run full check

Run: cargo check 2>&1 Expected: Compiles successfully, no errors, no warnings.

  • Step 2: Run cargo clippy

Run: cargo clippy -- -D warnings 2>&1 Expected: No warnings emitted.

  • Step 3: Run all unit tests

Run: cargo test --lib 2>&1 Expected: All tests PASS, including existing tests and the 18 new scheduler tests.

  • Step 4: Commit
git add --all
git commit -m "feat: complete scheduled tasks implementation"

Task 11: Integration Test

Files:

  • Modify: tests/test_integration.rs (or create tests/test_scheduler.rs)

  • Step 1: Write integration test

Create tests/test_scheduler.rs:

//! Integration tests for the scheduled tasks (cron) system.
//! Requires `.env` with real API keys and a running Gateway.
//! Run with: cargo test --test test_scheduler -- --ignored

use serde_json::json;

/// This test verifies that the scheduler module compiles and its types are
/// accessible. Full integration testing requires a running Gateway instance
/// with API keys, so the actual job-execution flow is tested there.
#[tokio::test]
async fn test_scheduler_types_roundtrip() {
    use picobot::scheduler::Schedule;

    // Verify JSON (de)serialization works
    let s1 = Schedule::Every { every_ms: 3600000 };
    let json = serde_json::to_string(&s1).unwrap();
    let s2: Schedule = serde_json::from_str(&json).unwrap();
    match s2 {
        Schedule::Every { every_ms } => assert_eq!(every_ms, 3600000),
        _ => panic!("expected Every"),
    }

    let s1 = Schedule::At { at: 1000000 };
    let json = serde_json::to_string(&s1).unwrap();
    let s2: Schedule = serde_json::from_str(&json).unwrap();
    match s2 {
        Schedule::At { at } => assert_eq!(at, 1000000),
        _ => panic!("expected At"),
    }

    let s1 = Schedule::Cron { expr: "0 0 9 * * *".into(), tz: None };
    let json = serde_json::to_string(&s1).unwrap();
    let s2: Schedule = serde_json::from_str(&json).unwrap();
    match s2 {
        Schedule::Cron { expr, tz } => {
            assert_eq!(expr, "0 0 9 * * *");
            assert!(tz.is_none());
        }
        _ => panic!("expected Cron"),
    }
}

/// Verify that next_run_for_schedule produces valid future timestamps.
#[test]
fn test_next_run_always_future() {
    use picobot::scheduler::{next_run_for_schedule, Schedule};

    let now = 1700000000000_i64; // Some fixed reference time

    let schedules = vec![
        Schedule::Every { every_ms: 60000 },
        Schedule::Cron { expr: "0 0 9 * * *".into(), tz: None },
    ];

    for s in &schedules {
        let next = next_run_for_schedule(s, now);
        assert!(next.is_some(), "expected next run for {:?}", s);
        assert!(next.unwrap() > now, "next run should be after now for {:?}", s);
    }
}

/// Verify that one-shot At schedule disables after run (logic tested in unit tests,
/// this just ensures the schedule round-trips correctly).
#[test]
fn test_at_schedule_is_one_shot_by_contract() {
    use picobot::scheduler::Schedule;
    // At schedules by definition fire once — the scheduler loop handles
    // disabling/deleting after run. This test confirms the type is correct.
    let s = Schedule::At { at: 1700000000000 };
    let json = serde_json::to_string(&s).unwrap();
    assert!(json.contains("\"at\""));
}
  • Step 2: Run integration test

Run: cargo test --test test_scheduler 2>&1 Expected: All 3 tests PASS.

  • Step 3: Commit
git add tests/test_scheduler.rs
git commit -m "test: add scheduler integration tests"

Task 12: Final Verification

  • Step 1: Full test suite

Run: cargo test --lib 2>&1 Expected: All tests PASS.

Run: cargo test --test test_scheduler 2>&1 Expected: All tests PASS.

  • Step 2: Build binary

Run: cargo build 2>&1 Expected: Build succeeds with no errors.

  • Step 3: Grep for TODOs / placeholders

Run: grep -rn "TODO\|FIXME\|TBD\|todo!\|unimplemented!" src/scheduler/ 2>&1 Expected: No output (no placeholders).

  • Step 4: Final commit (if any changes)
git status
git add --all
git commit -m "chore: final cleanup for scheduled tasks"