Scheduled Tasks (Cron Jobs) Implementation Plan
For agentic workers: REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (- [ ]) syntax for tracking.
Goal: Add a cron-like scheduled task system to PicoBot that triggers agent LLM prompts on a schedule (cron expression / fixed interval / one-shot), with results delivered to channels.
Architecture: A new src/scheduler/ module with its own SQLite store (sharing the existing sqlx::SqlitePool via storage.pool()). The scheduler runs as a tokio background task, calling SessionManager::handle_cron_message() directly. Agent-facing tools (cron_add, cron_list, etc.) let the LLM manage jobs. Schedule computation uses the cron and chrono-tz crates.
Tech Stack: Rust, tokio, sqlx, cron 0.15, chrono-tz 0.10
File Structure
| File | Create/Modify | Responsibility |
|---|---|---|
| Cargo.toml | Modify | Add cron, chrono-tz dependencies |
| src/lib.rs | Modify | Add pub mod scheduler; |
| src/config/mod.rs | Modify | Add SchedulerConfig to GatewayConfig |
| src/scheduler/types.rs | Create | Schedule, ScheduledJob, JobRun data types |
| src/scheduler/store.rs | Create | SQLite schema + CRUD for scheduled_jobs and job_runs |
| src/scheduler/mod.rs | Create | Scheduler struct, run() loop, next_run_for_schedule() |
| src/scheduler/tools.rs | Create | 6 agent tools: cron_add/list/remove/enable/disable/update |
| src/bus/message.rs | Modify | Add ChatMessage::user_with_source() factory |
| src/session/session.rs | Modify | Add handle_cron_message() method |
| src/gateway/mod.rs | Modify | Create Scheduler, spawn background task, register cron tools |
| src/session/session_id.rs | Modify | Add from_components() convenience constructor |
Task 1: Add Cron Dependencies
Files:
- Modify: Cargo.toml
- Step 1: Add cron and chrono-tz to dependencies
After line 28 (tempfile = "3"), insert:
cron = "0.15"
chrono-tz = "0.10"
- Step 2: Verify build
Run: cargo check 2>&1
Expected: Dependencies download and resolve. No code referencing them yet, so no compile errors.
- Step 3: Commit
git add Cargo.toml Cargo.lock
git commit -m "deps: add cron and chrono-tz for scheduled tasks"
Task 2: Add SchedulerConfig
Files:
- Modify: src/config/mod.rs:143 (after existing session_db_path field)
- Step 1: Define SchedulerConfig struct
Add after the closing } of GatewayConfig (after line 143, before the ClientConfig block):
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulerConfig {
/// Whether the scheduler is enabled
#[serde(default = "default_scheduler_enabled")]
pub enabled: bool,
/// Poll interval in seconds (how often to check for due jobs)
#[serde(default = "default_poll_interval_secs")]
pub poll_interval_secs: u64,
/// Maximum concurrent job executions (currently sequential, reserved for future)
#[serde(default = "default_max_concurrent")]
pub max_concurrent: usize,
}
fn default_scheduler_enabled() -> bool { true }
fn default_poll_interval_secs() -> u64 { 60 }
fn default_max_concurrent() -> usize { 1 }
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
enabled: true,
poll_interval_secs: 60,
max_concurrent: 1,
}
}
}
- Step 2: Add scheduler field to GatewayConfig
In GatewayConfig (line 132-143), add after session_db_path:
#[serde(default)]
pub scheduler: Option<SchedulerConfig>,
The full struct becomes:
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GatewayConfig {
#[serde(default = "default_gateway_host")]
pub host: String,
#[serde(default = "default_gateway_port")]
pub port: u16,
#[serde(default, rename = "session_ttl_hours")]
pub session_ttl_hours: Option<u64>,
#[serde(default, rename = "cleanup_interval_minutes")]
pub cleanup_interval_minutes: Option<u64>,
#[serde(default, rename = "session_db_path")]
pub session_db_path: Option<String>,
#[serde(default)]
pub scheduler: Option<SchedulerConfig>,
}
- Step 3: Update Default for GatewayConfig
In the impl Default for GatewayConfig block (around line 163), add:
scheduler: None,
- Step 4: Verify build
Run: cargo check 2>&1
Expected: Compiles successfully.
- Step 5: Commit
git add src/config/mod.rs
git commit -m "feat: add SchedulerConfig to GatewayConfig"
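Because the scheduler section is an Option, the gateway has to resolve it into concrete settings at startup. The following is a minimal sketch of one way to do that, not the project's actual code: the struct is a stand-in for SchedulerConfig with the serde attributes omitted, and effective_scheduler_config is a hypothetical helper name. The clamping to 1 is an added assumption; it is there because tokio::time::interval panics on a zero period and a due_jobs limit of 0 would never return a job.

```rust
/// Stand-in for the plan's SchedulerConfig (serde attributes omitted).
#[derive(Debug, Clone, PartialEq)]
pub struct SchedulerConfig {
    pub enabled: bool,
    pub poll_interval_secs: u64,
    pub max_concurrent: usize,
}

impl Default for SchedulerConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            poll_interval_secs: 60,
            max_concurrent: 1,
        }
    }
}

/// Hypothetical helper: turn the optional config section into usable settings.
pub fn effective_scheduler_config(section: Option<SchedulerConfig>) -> SchedulerConfig {
    // A missing section falls back to the defaults.
    let mut cfg = section.unwrap_or_default();
    // Assumption: clamp to at least 1 so the poll loop always has a valid
    // interval and can fetch at least one due job per tick.
    cfg.poll_interval_secs = cfg.poll_interval_secs.max(1);
    cfg.max_concurrent = cfg.max_concurrent.max(1);
    cfg
}
```

With this shape, a config file that omits the scheduler section behaves the same as one that spells out the defaults.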
Task 3: Create Scheduler Data Types
Files:
- Create: src/scheduler/types.rs
- Modify: src/scheduler/mod.rs (stub)
- Modify: src/lib.rs
- Step 1: Register the scheduler module
In src/lib.rs, after pub mod providers; (line 14), add:
pub mod scheduler;
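- Step 2: Create stub src/scheduler/mod.rs
Also create empty src/scheduler/store.rs and src/scheduler/tools.rs so that the mod declarations in the stub resolve and the build in Step 4 succeeds; later tasks fill them in.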
pub mod types;
pub mod store;
pub mod tools;
pub use types::{JobRun, Schedule, ScheduledJob};
- Step 3: Define types in src/scheduler/types.rs
use serde::{Deserialize, Serialize};
/// How a job is scheduled. Serialized as JSON in the database `schedule` column.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum Schedule {
/// One-shot: fires once at a specific Unix millisecond timestamp, then disables.
#[serde(rename = "at")]
At { at: i64 },
/// Recurring: fires every `every_ms` milliseconds.
#[serde(rename = "every")]
Every { every_ms: u64 },
/// Recurring: fires on a cron schedule with optional timezone.
#[serde(rename = "cron")]
Cron { expr: String, tz: Option<String> },
}
/// A scheduled job stored in the database.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScheduledJob {
pub id: String,
pub name: String,
/// JSON-serialized `Schedule` stored as TEXT in SQLite.
pub schedule: Schedule,
pub prompt: String,
pub channel: String,
pub chat_id: String,
pub model: Option<String>,
pub enabled: bool,
pub delete_after_run: bool,
pub next_run_at: i64,
pub last_run_at: Option<i64>,
pub last_status: Option<String>,
pub last_error: Option<String>,
pub created_at: i64,
pub updated_at: i64,
}
/// A single execution record for a job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobRun {
pub id: i64,
pub job_id: String,
pub started_at: i64,
pub finished_at: i64,
pub status: String,
pub output: Option<String>,
pub error: Option<String>,
pub duration_ms: i64,
}
- Step 4: Verify build
Run: cargo check 2>&1
Expected: Compiles successfully.
- Step 5: Commit
git add src/lib.rs src/scheduler/
git commit -m "feat: add scheduler data types (Schedule, ScheduledJob, JobRun)"
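The schedule column stores the Schedule enum as internally tagged JSON. As a rough illustration of the expected text, the sketch below renders each variant by hand. It is not how the plan serializes values (that goes through serde_json); the enum is a stand-in without serde derives, schedule_json is a hypothetical name, and the strings are assumed to need no JSON escaping.

```rust
/// Stand-in for the plan's Schedule enum (serde derives omitted).
#[derive(Debug, Clone)]
pub enum Schedule {
    At { at: i64 },
    Every { every_ms: u64 },
    Cron { expr: String, tz: Option<String> },
}

/// Hand-written rendering of the JSON shape implied by `#[serde(tag = "type")]`
/// plus the per-variant renames. Assumes strings contain nothing that needs escaping.
pub fn schedule_json(s: &Schedule) -> String {
    match s {
        Schedule::At { at } => format!(r#"{{"type":"at","at":{at}}}"#),
        Schedule::Every { every_ms } => {
            format!(r#"{{"type":"every","every_ms":{every_ms}}}"#)
        }
        Schedule::Cron { expr, tz } => {
            // An absent timezone is written as JSON null.
            let tz_json = match tz {
                Some(name) => format!("\"{name}\""),
                None => "null".to_string(),
            };
            format!(r#"{{"type":"cron","expr":"{expr}","tz":{tz_json}}}"#)
        }
    }
}
```

Knowing this shape helps when inspecting the scheduled_jobs table by hand or when writing a migration that touches stored schedules.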
Task 4: Add ChatMessage::user_with_source and Session::create_user_message_with_source
Files:
- Modify: src/bus/message.rs:198 (after pub fn tool(...))
- Modify: src/session/session.rs:252 (after create_user_message)
- Step 1: Add the user_with_source factory
Add this factory method in src/bus/message.rs, immediately before the closing } of the impl ChatMessage block (after line 197):
pub fn user_with_source(content: impl Into<String>, source: MessageSource) -> Self {
Self {
id: uuid::Uuid::new_v4().to_string(),
role: "user".to_string(),
content: content.into(),
media_refs: Vec::new(),
timestamp: current_timestamp(),
tool_call_id: None,
tool_name: None,
tool_calls: None,
source: Some(source),
}
}
No separate test is needed for this factory method, since it is pure data construction.
- Step 2: Add create_user_message_with_source
In src/session/session.rs, after create_user_message (line 252), add this method:
pub fn create_user_message_with_source(&self, content: &str, _media_refs: Vec<String>, source: crate::bus::MessageSource) -> ChatMessage {
// Media is ignored for cron messages: the scheduler always passes an empty list.
ChatMessage::user_with_source(content, source)
}
No test for this separately — it's used in handle_cron_message which gets tested via integration.
- Step 3: Verify build
Run: cargo check 2>&1
Expected: Compiles successfully.
- Step 4: Commit
git add src/bus/message.rs src/session/session.rs
git commit -m "feat: add ChatMessage::user_with_source and Session::create_user_message_with_source"
Task 5: Create SchedulerStore (SQLite Schema + CRUD)
Files:
- Create: src/scheduler/store.rs
- Step 1: Write the failing test
Write inline tests at the bottom of src/scheduler/store.rs. These test SchedulerStore against an in-memory SQLite database:
#[cfg(test)]
mod tests {
use super::*;
use crate::scheduler::types::{Schedule, ScheduledJob, JobRun};
use sqlx::SqlitePool;
fn now() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64
}
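async fn setup_pool() -> SqlitePool {
// Cap the pool at one connection: each new connection to sqlite::memory:
// opens its own empty database, so a second connection would not see the tables.
let pool = sqlx::sqlite::SqlitePoolOptions::new()
.max_connections(1)
.connect("sqlite::memory:")
.await
.unwrap();
SchedulerStore::init(&pool).await.unwrap();
pool
}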
#[tokio::test]
async fn test_init_creates_tables() {
let pool = setup_pool().await;
let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scheduled_jobs")
.fetch_one(&pool).await.unwrap();
assert_eq!(row.0, 0);
}
#[tokio::test]
async fn test_add_and_get_job() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: "job-1".into(),
name: "test job".into(),
schedule: Schedule::Every { every_ms: 3600000 },
prompt: "say hello".into(),
channel: "cli_chat".into(),
chat_id: "conn-1".into(),
model: None,
enabled: true,
delete_after_run: false,
next_run_at: t + 3600000,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
let got = SchedulerStore::get_job(&pool, "job-1").await.unwrap();
assert_eq!(got.id, "job-1");
assert_eq!(got.name, "test job");
assert_eq!(got.prompt, "say hello");
}
#[tokio::test]
async fn test_list_jobs() {
let pool = setup_pool().await;
let t = now();
for i in 0..3 {
let job = ScheduledJob {
id: format!("job-{}", i),
name: format!("job {}", i),
schedule: Schedule::Every { every_ms: 3600000 },
prompt: "ping".into(),
channel: "cli_chat".into(),
chat_id: "conn-1".into(),
model: None, enabled: true, delete_after_run: false,
next_run_at: t + 1000, last_run_at: None,
last_status: None, last_error: None,
created_at: t, updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
}
let jobs = SchedulerStore::list_jobs(&pool).await.unwrap();
assert_eq!(jobs.len(), 3);
}
#[tokio::test]
async fn test_remove_job() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: "job-rm".into(), name: "remove me".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
model: None, enabled: true, delete_after_run: false,
next_run_at: t, last_run_at: None,
last_status: None, last_error: None,
created_at: t, updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
SchedulerStore::remove_job(&pool, "job-rm").await.unwrap();
let result = SchedulerStore::get_job(&pool, "job-rm").await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_set_enabled() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: "job-toggle".into(), name: "toggle".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
model: None, enabled: true, delete_after_run: false,
next_run_at: t, last_run_at: None,
last_status: None, last_error: None,
created_at: t, updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
SchedulerStore::set_enabled(&pool, "job-toggle", false).await.unwrap();
let got = SchedulerStore::get_job(&pool, "job-toggle").await.unwrap();
assert!(!got.enabled);
}
#[tokio::test]
async fn test_due_jobs_only_returns_enabled_and_overdue() {
let pool = setup_pool().await;
let t = now();
// Three jobs: due, not-due-yet, disabled-but-due
let jobs = vec![
ScheduledJob {
id: "due".into(), name: "due".into(),
schedule: Schedule::At { at: t }, prompt: "1".into(),
channel: "cli_chat".into(), chat_id: "c".into(),
model: None, enabled: true, delete_after_run: false,
next_run_at: t - 1000, last_run_at: None,
last_status: None, last_error: None,
created_at: t, updated_at: t,
},
ScheduledJob {
id: "future".into(), name: "future".into(),
schedule: Schedule::At { at: t + 99999999 }, prompt: "2".into(),
channel: "cli_chat".into(), chat_id: "c".into(),
model: None, enabled: true, delete_after_run: false,
next_run_at: t + 99999999, last_run_at: None,
last_status: None, last_error: None,
created_at: t, updated_at: t,
},
ScheduledJob {
id: "disabled-due".into(), name: "disabled due".into(),
schedule: Schedule::At { at: t }, prompt: "3".into(),
channel: "cli_chat".into(), chat_id: "c".into(),
model: None, enabled: false, delete_after_run: false,
next_run_at: t - 1000, last_run_at: None,
last_status: None, last_error: None,
created_at: t, updated_at: t,
},
];
for j in &jobs {
SchedulerStore::add_job(&pool, j).await.unwrap();
}
let due = SchedulerStore::due_jobs(&pool, t, 10).await.unwrap();
assert_eq!(due.len(), 1);
assert_eq!(due[0].id, "due");
}
#[tokio::test]
async fn test_record_run_and_list_runs() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: "job-run".into(), name: "run test".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
model: None, enabled: true, delete_after_run: false,
next_run_at: t, last_run_at: None,
last_status: None, last_error: None,
created_at: t, updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
let run = JobRun {
id: 0, job_id: "job-run".into(),
started_at: t, finished_at: t + 500,
status: "ok".into(), output: Some("hello".into()),
error: None, duration_ms: 500,
};
SchedulerStore::record_run(&pool, &run).await.unwrap();
let runs = SchedulerStore::list_runs(&pool, "job-run", 10).await.unwrap();
assert_eq!(runs.len(), 1);
assert_eq!(runs[0].status, "ok");
assert_eq!(runs[0].output.as_deref(), Some("hello"));
}
#[tokio::test]
async fn test_update_job() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: "job-update".into(), name: "old name".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "old prompt".into(), channel: "feishu".into(),
chat_id: "oc_1".into(), model: None,
enabled: true, delete_after_run: false,
next_run_at: t, last_run_at: None,
last_status: None, last_error: None,
created_at: t, updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
SchedulerStore::update_job(
&pool, "job-update",
Some("new prompt".into()),
Some(Schedule::Every { every_ms: 60000 }),
None, None, None,
).await.unwrap();
let got = SchedulerStore::get_job(&pool, "job-update").await.unwrap();
assert_eq!(got.prompt, "new prompt");
}
}
- Step 2: Run tests to verify they fail
Run: cargo test --lib scheduler::store -- 2>&1
Expected: All tests FAIL because SchedulerStore and its methods don't exist yet (compilation errors).
- Step 3: Implement SchedulerStore
Write src/scheduler/store.rs:
use sqlx::Row;
use sqlx::SqlitePool;
use super::types::{JobRun, Schedule, ScheduledJob};
/// Persistence layer for scheduled jobs and run history.
/// Uses a shared `sqlx::SqlitePool` (obtained from `crate::storage::Storage::pool()`).
pub struct SchedulerStore;
impl SchedulerStore {
/// Initialize the scheduler tables. Idempotent (CREATE TABLE IF NOT EXISTS).
pub async fn init(pool: &SqlitePool) -> Result<(), Box<dyn std::error::Error>> {
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS scheduled_jobs (
id TEXT PRIMARY KEY,
name TEXT NOT NULL,
schedule TEXT NOT NULL,
prompt TEXT NOT NULL,
channel TEXT NOT NULL,
chat_id TEXT NOT NULL,
model TEXT,
enabled INTEGER NOT NULL DEFAULT 1,
delete_after_run INTEGER NOT NULL DEFAULT 0,
next_run_at INTEGER NOT NULL,
last_run_at INTEGER,
last_status TEXT,
last_error TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
)
"#,
)
.execute(pool)
.await?;
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS job_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
job_id TEXT NOT NULL REFERENCES scheduled_jobs(id) ON DELETE CASCADE,
started_at INTEGER NOT NULL,
finished_at INTEGER NOT NULL,
status TEXT NOT NULL,
output TEXT,
error TEXT,
duration_ms INTEGER NOT NULL
)
"#,
)
.execute(pool)
.await?;
sqlx::query(
"CREATE INDEX IF NOT EXISTS idx_jobs_next_run ON scheduled_jobs(enabled, next_run_at)",
)
.execute(pool)
.await?;
sqlx::query("CREATE INDEX IF NOT EXISTS idx_runs_job_id ON job_runs(job_id)")
.execute(pool)
.await?;
Ok(())
}
/// Insert a new job. Returns an error if a job with the same ID already exists.
pub async fn add_job(
pool: &SqlitePool,
job: &ScheduledJob,
) -> Result<(), Box<dyn std::error::Error>> {
let schedule_json = serde_json::to_string(&job.schedule)?;
sqlx::query(
r#"
INSERT INTO scheduled_jobs
(id, name, schedule, prompt, channel, chat_id, model,
enabled, delete_after_run, next_run_at, last_run_at,
last_status, last_error, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(&job.id)
.bind(&job.name)
.bind(&schedule_json)
.bind(&job.prompt)
.bind(&job.channel)
.bind(&job.chat_id)
.bind(&job.model)
.bind(job.enabled as i32)
.bind(job.delete_after_run as i32)
.bind(job.next_run_at)
.bind(job.last_run_at)
.bind(&job.last_status)
.bind(&job.last_error)
.bind(job.created_at)
.bind(job.updated_at)
.execute(pool)
.await?;
Ok(())
}
/// Fetch a single job by ID.
pub async fn get_job(
pool: &SqlitePool,
id: &str,
) -> Result<ScheduledJob, Box<dyn std::error::Error>> {
let row = sqlx::query("SELECT * FROM scheduled_jobs WHERE id = ?")
.bind(id)
.fetch_optional(pool)
.await?
.ok_or_else(|| format!("job not found: {id}"))?;
Ok(row_to_job(&row)?)
}
/// List all jobs, ordered by next_run_at ascending.
pub async fn list_jobs(
pool: &SqlitePool,
) -> Result<Vec<ScheduledJob>, Box<dyn std::error::Error>> {
let rows = sqlx::query("SELECT * FROM scheduled_jobs ORDER BY next_run_at ASC")
.fetch_all(pool)
.await?;
rows.iter().map(row_to_job).collect()
}
/// Delete a job (cascades to job_runs).
pub async fn remove_job(
pool: &SqlitePool,
id: &str,
) -> Result<(), Box<dyn std::error::Error>> {
sqlx::query("DELETE FROM scheduled_jobs WHERE id = ?")
.bind(id)
.execute(pool)
.await?;
Ok(())
}
/// Enable or disable a job.
pub async fn set_enabled(
pool: &SqlitePool,
id: &str,
enabled: bool,
) -> Result<(), Box<dyn std::error::Error>> {
sqlx::query("UPDATE scheduled_jobs SET enabled = ?, updated_at = ? WHERE id = ?")
.bind(enabled as i32)
.bind(now_ms())
.bind(id)
.execute(pool)
.await?;
Ok(())
}
/// Update selective fields on a job. Pass `None` for fields that should not change.
#[allow(clippy::too_many_arguments)]
pub async fn update_job(
pool: &SqlitePool,
id: &str,
prompt: Option<String>,
schedule: Option<Schedule>,
channel: Option<String>,
chat_id: Option<String>,
model: Option<String>,
) -> Result<(), Box<dyn std::error::Error>> {
let now = now_ms();
if let Some(p) = prompt {
sqlx::query(
"UPDATE scheduled_jobs SET prompt = ?, updated_at = ? WHERE id = ?",
)
.bind(&p)
.bind(now)
.bind(id)
.execute(pool)
.await?;
}
if let Some(s) = schedule {
let json = serde_json::to_string(&s)?;
sqlx::query(
"UPDATE scheduled_jobs SET schedule = ?, updated_at = ? WHERE id = ?",
)
.bind(&json)
.bind(now)
.bind(id)
.execute(pool)
.await?;
}
if let Some(c) = channel {
sqlx::query(
"UPDATE scheduled_jobs SET channel = ?, updated_at = ? WHERE id = ?",
)
.bind(&c)
.bind(now)
.bind(id)
.execute(pool)
.await?;
}
if let Some(c) = chat_id {
sqlx::query(
"UPDATE scheduled_jobs SET chat_id = ?, updated_at = ? WHERE id = ?",
)
.bind(&c)
.bind(now)
.bind(id)
.execute(pool)
.await?;
}
if let Some(m) = model {
sqlx::query(
"UPDATE scheduled_jobs SET model = ?, updated_at = ? WHERE id = ?",
)
.bind(&m)
.bind(now)
.bind(id)
.execute(pool)
.await?;
}
Ok(())
}
/// Update next_run_at and last_run_at for a job (used during reschedule).
pub async fn set_next_run(
pool: &SqlitePool,
id: &str,
next_run_at: i64,
) -> Result<(), Box<dyn std::error::Error>> {
let now = now_ms();
sqlx::query(
"UPDATE scheduled_jobs SET next_run_at = ?, last_run_at = ?, updated_at = ? WHERE id = ?",
)
.bind(next_run_at)
.bind(now)
.bind(now)
.bind(id)
.execute(pool)
.await?;
Ok(())
}
/// Set last_run_at (and updated_at) to `at`. Called when job execution starts.
pub async fn touch_last_run(
pool: &SqlitePool,
id: &str,
at: i64,
) -> Result<(), Box<dyn std::error::Error>> {
sqlx::query(
"UPDATE scheduled_jobs SET last_run_at = ?, updated_at = ? WHERE id = ?",
)
.bind(at)
.bind(at)
.bind(id)
.execute(pool)
.await?;
Ok(())
}
/// Set last_status and last_error after job completion.
pub async fn set_last_status(
pool: &SqlitePool,
id: &str,
status: &str,
error: Option<&str>,
) -> Result<(), Box<dyn std::error::Error>> {
let now = now_ms();
sqlx::query(
"UPDATE scheduled_jobs SET last_status = ?, last_error = ?, updated_at = ? WHERE id = ?",
)
.bind(status)
.bind(error)
.bind(now)
.bind(id)
.execute(pool)
.await?;
Ok(())
}
/// Fetch enabled jobs whose next_run_at <= now, up to `limit`.
pub async fn due_jobs(
pool: &SqlitePool,
now: i64,
limit: usize,
) -> Result<Vec<ScheduledJob>, Box<dyn std::error::Error>> {
let rows = sqlx::query(
"SELECT * FROM scheduled_jobs WHERE enabled = 1 AND next_run_at <= ? ORDER BY next_run_at ASC LIMIT ?",
)
.bind(now)
.bind(limit as i64)
.fetch_all(pool)
.await?;
rows.iter().map(row_to_job).collect()
}
/// Record a run execution.
pub async fn record_run(
pool: &SqlitePool,
run: &JobRun,
) -> Result<(), Box<dyn std::error::Error>> {
sqlx::query(
r#"
INSERT INTO job_runs (job_id, started_at, finished_at, status, output, error, duration_ms)
VALUES (?, ?, ?, ?, ?, ?, ?)
"#,
)
.bind(&run.job_id)
.bind(run.started_at)
.bind(run.finished_at)
.bind(&run.status)
.bind(&run.output)
.bind(&run.error)
.bind(run.duration_ms)
.execute(pool)
.await?;
Ok(())
}
/// List the most recent `limit` runs for a job, newest first.
pub async fn list_runs(
pool: &SqlitePool,
job_id: &str,
limit: usize,
) -> Result<Vec<JobRun>, Box<dyn std::error::Error>> {
let rows = sqlx::query(
"SELECT * FROM job_runs WHERE job_id = ? ORDER BY finished_at DESC LIMIT ?",
)
.bind(job_id)
.bind(limit as i64)
.fetch_all(pool)
.await?;
rows.iter()
.map(|r| {
Ok(JobRun {
id: r.try_get("id")?,
job_id: r.try_get("job_id")?,
started_at: r.try_get("started_at")?,
finished_at: r.try_get("finished_at")?,
status: r.try_get("status")?,
output: r.try_get("output")?,
error: r.try_get("error")?,
duration_ms: r.try_get("duration_ms")?,
})
})
.collect()
}
/// Delete disabled jobs whose updated_at is earlier than `before`.
pub async fn cleanup_disabled(
pool: &SqlitePool,
before: i64,
) -> Result<(), Box<dyn std::error::Error>> {
sqlx::query(
"DELETE FROM scheduled_jobs WHERE enabled = 0 AND updated_at < ?",
)
.bind(before)
.execute(pool)
.await?;
Ok(())
}
}
fn now_ms() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64
}
fn row_to_job(row: &sqlx::sqlite::SqliteRow) -> Result<ScheduledJob, Box<dyn std::error::Error>> {
let schedule_json: String = row.try_get("schedule")?;
let schedule: Schedule = serde_json::from_str(&schedule_json)?;
Ok(ScheduledJob {
id: row.try_get("id")?,
name: row.try_get("name")?,
schedule,
prompt: row.try_get("prompt")?,
channel: row.try_get("channel")?,
chat_id: row.try_get("chat_id")?,
model: row.try_get("model")?,
enabled: row.try_get::<i32, _>("enabled")? != 0,
delete_after_run: row.try_get::<i32, _>("delete_after_run")? != 0,
next_run_at: row.try_get("next_run_at")?,
last_run_at: row.try_get("last_run_at")?,
last_status: row.try_get("last_status")?,
last_error: row.try_get("last_error")?,
created_at: row.try_get("created_at")?,
updated_at: row.try_get("updated_at")?,
})
}
- Step 4: Run tests to verify they pass
Run: cargo test --lib scheduler::store -- 2>&1
Expected: All 8 tests PASS.
- Step 5: Commit
git add src/scheduler/store.rs
git commit -m "feat: add SchedulerStore with SQLite schema and CRUD"
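The due_jobs query is the heart of the store: enabled rows with next_run_at at or before now, oldest first, capped at a limit. The sketch below restates that selection over an in-memory slice so the rule can be checked without a database. JobRow is a trimmed, hypothetical stand-in for ScheduledJob, and this function is only an illustration of the SQL's semantics, not a replacement for it.

```rust
/// Trimmed stand-in for ScheduledJob: only the columns the query looks at.
#[derive(Debug, Clone)]
pub struct JobRow {
    pub id: String,
    pub enabled: bool,
    pub next_run_at: i64,
}

/// In-memory equivalent of:
/// SELECT * FROM scheduled_jobs
/// WHERE enabled = 1 AND next_run_at <= ? ORDER BY next_run_at ASC LIMIT ?
pub fn due_jobs(rows: &[JobRow], now: i64, limit: usize) -> Vec<JobRow> {
    let mut due: Vec<JobRow> = rows
        .iter()
        .filter(|j| j.enabled && j.next_run_at <= now)
        .cloned()
        .collect();
    // Oldest due job first, then apply the limit.
    due.sort_by_key(|j| j.next_run_at);
    due.truncate(limit);
    due
}
```

Note that the limit comes from max_concurrent in the scheduler loop, so with the default of 1 only the most overdue job is picked up per poll.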
Task 6: Add handle_cron_message to SessionManager
Files:
- Modify: src/session/session.rs:1247 (after handle_message)
- Step 1: Write the failing test
In src/session/session.rs, add this test inside the existing #[cfg(test)] mod tests block. If there is no existing test module, add it at the bottom of the file:
// Note: this test verifies only that the method exists. Full integration
// testing of handle_cron_message requires a running Gateway (see integration test).
#[test]
fn test_handle_cron_message_exists() {
// Compile-time check: naming the method fails to build until it is defined.
let _ = crate::session::SessionManager::handle_cron_message;
}
- Step 2: Run test to verify it fails
Run: cargo test --lib session::session::test_handle_cron_message_exists -- 2>&1
Expected: FAIL with a compilation error, because handle_cron_message is not defined until Step 3. After Step 3 the test compiles and passes.
- Step 3: Add handle_cron_message method
In src/session/session.rs, add after handle_message (after line 1247):
/// Handle a message triggered by a scheduled cron job.
///
/// This is similar to `handle_message`, but the user message is created with
/// `SourceKind::ExternalTrigger` source metadata so that the cron job identity
/// is preserved in the conversation history and database.
pub async fn handle_cron_message(
&self,
channel: &str,
chat_id: &str,
prompt: &str,
job_id: &str,
job_name: &str,
) -> Result<HandleResult, AgentError> {
use crate::bus::{MessageSource, SourceKind};
let unified_id = self.resolve_dialog_id(channel, chat_id).await?;
*self.current_source_session.lock().await = Some(unified_id.to_string());
tracing::debug!(unified_id = %unified_id, job_id = %job_id, "handle_cron_message resolved");
let session = self.get_or_create_session(&unified_id).await?;
// Normal message handling through LLM (cron messages skip slash command check)
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel();
// Spawn notification publisher
{
use std::collections::HashMap;
use crate::bus::OutboundMessage;
let bus = self.bus.clone();
let ch = channel.to_string();
let cid = chat_id.to_string();
tokio::spawn(async move {
while let Some(notif) = notify_rx.recv().await {
let mut metadata = HashMap::new();
metadata.insert("_type".to_string(), "notification".to_string());
let outbound = OutboundMessage {
channel: ch.clone(),
chat_id: cid.clone(),
content: notif,
reply_to: None,
media: vec![],
metadata,
};
let _ = bus.publish_outbound(outbound).await;
}
});
}
let response: String = {
let mut session_guard = session.lock().await;
// Build the user message with ExternalTrigger source
let source = MessageSource {
kind: SourceKind::ExternalTrigger,
from_channel: Some(channel.to_string()),
from_session: None,
from_user_id: None,
system_name: Some(job_name.to_string()),
task_id: Some(job_id.to_string()),
};
let user_message = session_guard.create_user_message_with_source(prompt, vec![], source);
session_guard.add_message(user_message, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
let mut history = session_guard.get_history().to_vec();
let skills_prompt = self.skills_loader.build_skills_prompt();
let system_prompt = session_guard.build_system_prompt(&skills_prompt);
history.insert(0, ChatMessage::system(system_prompt));
let history = session_guard.compressor
.compress_if_needed(history)
.await?;
let agent = session_guard.create_agent_with_notify(notify_tx)?;
let result = agent.process(history).await?;
for msg in result.emitted_messages {
session_guard.add_message(msg, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
}
if session_guard.should_generate_title() {
if let Err(e) = session_guard.generate_title().await {
tracing::warn!("failed to generate title: {}", e);
}
}
result.final_response.content
};
#[cfg(debug_assertions)]
tracing::debug!(
channel = %channel,
chat_id = %chat_id,
job_id = %job_id,
response_len = %response.len(),
"Cron agent response received"
);
*self.current_source_session.lock().await = None;
Ok(HandleResult::AgentResponse(response))
}
- Step 4: Verify build
Run: cargo check 2>&1
Expected: Compiles successfully.
- Step 5: Commit
git add src/session/session.rs
git commit -m "feat: add SessionManager::handle_cron_message for scheduled task execution"
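One detail worth noting in handle_cron_message: current_source_session is cleared only on the success path, so an early return through `?` leaves the previous value in place. A common way to cover every exit path is a small RAII guard. The sketch below shows the pattern with std::sync::Mutex as a stand-in; the real field appears to be an async mutex (it is locked with .lock().await), so the names SourceSessionGuard and run_job are hypothetical and the code would need adapting before use.

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical guard: records the active session id on creation and
/// clears it again when dropped.
pub struct SourceSessionGuard {
    slot: Arc<Mutex<Option<String>>>,
}

impl SourceSessionGuard {
    pub fn set(slot: Arc<Mutex<Option<String>>>, id: &str) -> Self {
        *slot.lock().unwrap() = Some(id.to_string());
        Self { slot }
    }
}

impl Drop for SourceSessionGuard {
    fn drop(&mut self) {
        // Runs on every exit path, including early error returns.
        if let Ok(mut current) = self.slot.lock() {
            *current = None;
        }
    }
}

/// Stand-in for the job body: returns the session id seen while running,
/// or fails early when `fail` is set.
pub fn run_job(slot: Arc<Mutex<Option<String>>>, fail: bool) -> Result<String, String> {
    let _guard = SourceSessionGuard::set(slot.clone(), "session-1");
    if fail {
        return Err("agent error".to_string());
    }
    // Read the slot in its own statement so the lock is released before
    // the guard's Drop runs at the end of the function.
    let seen = slot.lock().unwrap().clone();
    Ok(seen.unwrap_or_default())
}
```

Whether this matters in practice depends on how other code reads current_source_session; the sketch only illustrates the cleanup pattern.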
Task 7: Implement next_run_for_schedule and Scheduler Loop
Files:
- Modify: src/scheduler/mod.rs (replace stub)
- Step 1: Write the failing test
In src/scheduler/mod.rs, add inline tests:
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_next_run_at_schedule() {
let now = 1000000;
let next = next_run_for_schedule(&Schedule::At { at: 2000000 }, now);
assert_eq!(next, Some(2000000));
}
#[test]
fn test_next_run_every_schedule() {
let now = 1000000;
let next = next_run_for_schedule(&Schedule::Every { every_ms: 5000 }, now);
assert_eq!(next, Some(1005000));
}
#[test]
fn test_next_run_cron_schedule() {
// Schedule: "every minute at second 0"
let expr = "0 * * * * *".to_string();
let schedule = Schedule::Cron { expr, tz: None };
// Use a known time
let now = 1000000;
let next = next_run_for_schedule(&schedule, now);
assert!(next.is_some());
assert!(next.unwrap() > now);
}
#[test]
fn test_next_run_cron_every_day_at_9am() {
// Timelike provides hour() and minute() on DateTime
use chrono::Timelike;
// "0 0 9 * * *" = every day at 9:00:00
let expr = "0 0 9 * * *".to_string();
let schedule = Schedule::Cron { expr, tz: None };
let now = 1000000;
let next = next_run_for_schedule(&schedule, now);
assert!(next.is_some());
let next_ms = next.unwrap();
assert!(next_ms > now);
// Round-trip to DateTime to check hour
let next_dt = ms_to_datetime(next_ms);
assert_eq!(next_dt.hour(), 9);
assert_eq!(next_dt.minute(), 0);
}
}
}
- Step 2: Run tests to verify they fail
Run: cargo test --lib scheduler::tests -- 2>&1
Expected: FAIL — next_run_for_schedule not defined.
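For the Every case the tests expect a plain from + every_ms. Since every_ms is a u64 and timestamps are i64, a very large interval would wrap when cast. The following is a small sketch of an overflow-safe variant; next_every is a hypothetical helper, and rejecting a zero interval is an added assumption (a zero interval would make the job due on every poll).

```rust
/// Hypothetical helper: next fire time for a fixed interval, in Unix ms.
/// Returns None when the interval is zero or the result would not fit in i64.
pub fn next_every(from_ms: i64, every_ms: u64) -> Option<i64> {
    // Reject intervals that cannot be represented as i64 instead of wrapping.
    let step = i64::try_from(every_ms).ok()?;
    if step == 0 {
        return None;
    }
    from_ms.checked_add(step)
}
```

Because next_run_for_schedule already returns Option<i64>, a helper like this could slot into the Every branch without changing the function's signature.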
- Step 3: Implement the full src/scheduler/mod.rs
pub mod types;
pub mod store;
pub mod tools;
use std::sync::Arc;
use std::time::Instant;
use tokio::time;
use crate::config::SchedulerConfig;
use crate::session::session::{self, HandleResult};
use crate::session::SessionManager;
pub use types::{JobRun, Schedule, ScheduledJob};
/// Compute the next execution time (Unix ms) for a schedule, given `from` (Unix ms).
/// Returns `None` if no next time can be determined (e.g., invalid cron expression).
pub fn next_run_for_schedule(schedule: &Schedule, from: i64) -> Option<i64> {
use chrono::{TimeZone, Utc};
// FromStr must be in scope for cron::Schedule::from_str
use std::str::FromStr;
match schedule {
Schedule::At { at } => Some(*at),
Schedule::Every { every_ms } => Some(from + *every_ms as i64),
Schedule::Cron { expr, tz } => {
let schedule = cron::Schedule::from_str(expr.as_str()).ok()?;
// Convert Unix ms to UTC DateTime
let from_dt = Utc.timestamp_millis_opt(from).single()?;
// Search after `from` rather than after the wall clock, so the result
// depends only on the arguments.
let next_utc = if let Some(tz_str) = tz {
let tz: chrono_tz::Tz = tz_str.parse().ok()?;
let from_local = from_dt.with_timezone(&tz);
// Find the next match in the given timezone, then convert back to UTC
schedule.after(&from_local).next()?.with_timezone(&Utc)
} else {
schedule.after(&from_dt).next()?
};
Some(next_utc.timestamp_millis())
}
}
}
/// Convert Unix milliseconds to DateTime<Utc>.
fn ms_to_datetime(ms: i64) -> chrono::DateTime<chrono::Utc> {
use chrono::{TimeZone, Utc};
let secs = (ms / 1000) as i64;
let nanos = ((ms % 1000) * 1_000_000) as u32;
Utc.timestamp_opt(secs, nanos).single().unwrap_or_default()
}
fn now_ms() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64
}
/// The scheduler runs as a background tokio task, periodically checking for due jobs
/// and executing them via `SessionManager::handle_cron_message`.
pub struct Scheduler {
pool: sqlx::SqlitePool,
session_manager: Arc<SessionManager>,
config: SchedulerConfig,
}
impl Scheduler {
pub fn new(
pool: sqlx::SqlitePool,
session_manager: Arc<SessionManager>,
config: SchedulerConfig,
) -> Self {
Self {
pool,
session_manager,
config,
}
}
/// Run the scheduler loop. This is a long-running async function meant to be
/// spawned as a tokio background task.
pub async fn run(self: Arc<Self>) {
let poll_duration = time::Duration::from_secs(self.config.poll_interval_secs);
let mut interval = time::interval(poll_duration);
// Skip the immediate first tick (tokio::time::interval fires immediately on first poll)
interval.tick().await;
tracing::info!(
"Scheduler started (poll interval: {}s, max concurrent: {})",
self.config.poll_interval_secs,
self.config.max_concurrent,
);
loop {
interval.tick().await;
let now = now_ms();
let due = match store::SchedulerStore::due_jobs(&self.pool, now, self.config.max_concurrent).await {
Ok(jobs) => jobs,
Err(e) => {
tracing::error!("scheduler: failed to query due jobs: {}", e);
continue;
}
};
if due.is_empty() {
continue;
}
tracing::info!("scheduler: found {} due job(s)", due.len());
for job in &due {
let start = Instant::now();
let started_at = now_ms();
// Update last_run_at so next poll doesn't re-execute
if let Err(e) = store::SchedulerStore::touch_last_run(&self.pool, &job.id, started_at).await {
tracing::error!(job_id = %job.id, "scheduler: failed to touch last_run_at: {}", e);
continue;
}
tracing::info!(
job_id = %job.id,
job_name = %job.name,
"scheduler: executing cron job"
);
let result = self
.session_manager
.handle_cron_message(
&job.channel,
&job.chat_id,
&job.prompt,
&job.id,
&job.name,
)
.await;
let finished_at = now_ms();
let duration_ms = start.elapsed().as_millis() as i64;
match result {
Ok(HandleResult::AgentResponse(output)) => {
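let output_truncated = if output.len() > 8000 {
// Back off to a char boundary so slicing cannot panic on multi-byte UTF-8
let mut end = 8000;
while !output.is_char_boundary(end) {
end -= 1;
}
format!("{}...[truncated]", &output[..end])
} else {
output.clone()
};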
let run = JobRun {
id: 0,
job_id: job.id.clone(),
started_at,
finished_at,
status: "ok".to_string(),
output: Some(output_truncated),
error: None,
duration_ms,
};
if let Err(e) = store::SchedulerStore::record_run(&self.pool, &run).await {
tracing::error!(job_id = %job.id, "scheduler: failed to record run: {}", e);
}
if let Err(e) = store::SchedulerStore::set_last_status(&self.pool, &job.id, "ok", None).await {
tracing::error!(job_id = %job.id, "scheduler: failed to set last_status: {}", e);
}
tracing::info!(
job_id = %job.id,
duration_ms = %duration_ms,
"scheduler: job completed successfully"
);
}
Ok(HandleResult::CommandOutput(output)) => {
// Cron jobs shouldn't trigger commands, but handle gracefully
let run = JobRun {
id: 0,
job_id: job.id.clone(),
started_at,
finished_at,
status: "ok".to_string(),
output: Some(output),
error: None,
duration_ms,
};
let _ = store::SchedulerStore::record_run(&self.pool, &run).await;
}
Err(e) => {
let error_str = e.to_string();
let run = JobRun {
id: 0,
job_id: job.id.clone(),
started_at,
finished_at,
status: "error".to_string(),
output: None,
error: Some(error_str.clone()),
duration_ms,
};
if let Err(e2) = store::SchedulerStore::record_run(&self.pool, &run).await {
tracing::error!(job_id = %job.id, "scheduler: failed to record error run: {}", e2);
}
if let Err(e2) = store::SchedulerStore::set_last_status(
&self.pool, &job.id, "error", Some(&error_str),
).await {
tracing::error!(job_id = %job.id, "scheduler: failed to set error status: {}", e2);
}
tracing::error!(
job_id = %job.id,
duration_ms = %duration_ms,
error = %error_str,
"scheduler: job failed"
);
}
}
// Reschedule the job
if let Err(e) = self.reschedule_after_run(job).await {
tracing::error!(job_id = %job.id, "scheduler: failed to reschedule: {}", e);
}
}
}
}
/// After a job runs, compute its next execution time or disable/delete it.
async fn reschedule_after_run(
&self,
job: &ScheduledJob,
) -> Result<(), Box<dyn std::error::Error>> {
let now = now_ms();
match &job.schedule {
Schedule::At { .. } => {
if job.delete_after_run {
store::SchedulerStore::remove_job(&self.pool, &job.id).await?;
tracing::info!(job_id = %job.id, "scheduler: one-shot job deleted after run");
} else {
store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?;
tracing::info!(job_id = %job.id, "scheduler: one-shot job disabled after run");
}
}
Schedule::Every { .. } | Schedule::Cron { .. } => {
if let Some(next) = next_run_for_schedule(&job.schedule, now) {
store::SchedulerStore::set_next_run(&self.pool, &job.id, next).await?;
tracing::info!(job_id = %job.id, next_run_at = %next, "scheduler: job rescheduled");
} else {
tracing::error!(job_id = %job.id, "scheduler: could not compute next run — disabling job");
store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?;
}
}
}
Ok(())
}
}
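The reschedule decision above can be checked without a database or clock by modelling it as a pure function. The following is a minimal sketch under stated assumptions: `ScheduleKind`, `AfterRun`, and `decide_after_run` are hypothetical names that do not exist in the plan, and the `Cron` variant is left out because it needs the `cron` crate.

```rust
/// Simplified stand-in for the plan's `Schedule` enum (illustrative only).
#[derive(Debug, Clone, PartialEq)]
pub enum ScheduleKind {
    /// One-shot at a fixed Unix-ms timestamp.
    At { at: i64 },
    /// Fixed interval in milliseconds.
    Every { every_ms: u64 },
}

/// What the scheduler should do with a job once a run has finished.
#[derive(Debug, PartialEq)]
pub enum AfterRun {
    Delete,
    Disable,
    RunAgainAt(i64),
}

/// Pure version of the reschedule decision: no storage access, no clock reads.
pub fn decide_after_run(kind: &ScheduleKind, delete_after_run: bool, now_ms: i64) -> AfterRun {
    match kind {
        // One-shot jobs never run twice: remove them or park them as disabled.
        ScheduleKind::At { .. } if delete_after_run => AfterRun::Delete,
        ScheduleKind::At { .. } => AfterRun::Disable,
        // Interval jobs are measured from the time the run finished, so a slow
        // run pushes the next one back rather than building a backlog.
        ScheduleKind::Every { every_ms } => AfterRun::RunAgainAt(now_ms + *every_ms as i64),
    }
}
```

Keeping the decision separate from the store calls would let the one-shot and interval branches be unit-tested with fixed timestamps.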
- Step 4: Run tests to verify they pass
Run: cargo test --lib scheduler::tests 2>&1
Expected: All 5 tests PASS (next_run_at_schedule, next_run_every_schedule, next_run_cron_schedule, next_run_cron_every_day_at_9am, and the compile-time assertion).
- Step 5: Commit
git add src/scheduler/mod.rs
git commit -m "feat: add Scheduler run loop and next_run_for_schedule"
Task 8: Wire Scheduler into Gateway
Files:
- Modify: src/gateway/mod.rs
- Step 1: Import scheduler module
In src/gateway/mod.rs, add import after use crate::session::SessionManager; (line 13):
use crate::scheduler::Scheduler;
use crate::scheduler::store as scheduler_store;
- Step 2: Create scheduler in GatewayState::new()
In GatewayState::new() (after line 76 — after session_manager.register_outbound_tool(...)), add:
// Initialize scheduler if enabled in config
let scheduler_config = config.gateway.scheduler.clone().unwrap_or_default();
if scheduler_config.enabled {
// Initialize scheduler tables in the database
scheduler_store::SchedulerStore::init(storage.pool())
.await
.map_err(|e| format!("failed to initialize scheduler store: {}", e))?;
tracing::info!("Scheduler store initialized");
}
- Step 3: Spawn scheduler in start_message_processing()
start_message_processing() takes &self, and storage is not a field of GatewayState: Storage is only referenced inside GatewayState::new(). The scheduler therefore cannot call storage.pool() at spawn time. Store the sqlx::SqlitePool directly in GatewayState (Steps 3a and 3b) and spawn the scheduler from that field (Step 3c).
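The pattern here is to keep a cheaply cloneable handle as a struct field so that a `&self` method can hand a clone to a background task. A rough std-only sketch of that shape follows; `FakePool` and `State` are hypothetical stand-ins for `sqlx::SqlitePool` and `GatewayState`, and `std::thread::spawn` stands in for `tokio::spawn`.

```rust
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Stand-in for `sqlx::SqlitePool`: cloning shares the same underlying state.
#[derive(Clone, Default)]
pub struct FakePool {
    log: Arc<Mutex<Vec<String>>>,
}

impl FakePool {
    pub fn record(&self, entry: &str) {
        self.log.lock().unwrap().push(entry.to_string());
    }

    pub fn entries(&self) -> Vec<String> {
        self.log.lock().unwrap().clone()
    }
}

/// Stand-in for `GatewayState`, keeping the handle as a field.
pub struct State {
    pub pool: FakePool,
}

impl State {
    /// Takes `&self`, like `start_message_processing`; the worker owns its clone.
    pub fn spawn_worker(&self) -> JoinHandle<()> {
        let pool = self.pool.clone();
        thread::spawn(move || pool.record("scheduler tick"))
    }
}
```

Because the clone shares state with the original, work done by the background task is visible through the field held by the owner.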
- Step 3a: Add pool field to GatewayState
pub struct GatewayState {
pub config: Config,
pub workspace_dir: std::path::PathBuf,
pub session_manager: Arc<SessionManager>,
pub channel_manager: ChannelManager,
pub pool: sqlx::SqlitePool, // <-- add this
}
- Step 3b: Set pool in GatewayState::new()
After creating storage (line 53), clone the pool:
let pool = storage.pool().clone();
Then in the Ok(Self { ... }) block, add:
pool,
- Step 3c: Use pool in start_message_processing
After the outbound dispatcher spawn block (after line 170), add:
// Spawn scheduler background task if enabled
let scheduler_config = self.config.gateway.scheduler.clone().unwrap_or_default();
if scheduler_config.enabled {
let sched = Arc::new(Scheduler::new(
self.pool.clone(),
self.session_manager.clone(),
scheduler_config,
));
tokio::spawn(async move {
sched.run().await;
});
tracing::info!("Scheduler background task spawned");
}
- Step 4: Verify build
Run: cargo check 2>&1
Expected: Compiles successfully.
- Step 5: Commit
git add src/gateway/mod.rs
git commit -m "feat: wire scheduler into GatewayState startup and message processing"
Task 9: Implement Agent Tools
Files:
- Create: src/scheduler/tools.rs
- Modify: src/gateway/mod.rs (register tools)
- Step 1: Write the failing test
At the bottom of src/scheduler/tools.rs, add:
#[cfg(test)]
mod tests {
use super::*;
use crate::scheduler::types::{Schedule, ScheduledJob};
use crate::scheduler::store::SchedulerStore;
use serde_json::json;
use sqlx::SqlitePool;
async fn setup_pool() -> SqlitePool {
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
SchedulerStore::init(&pool).await.unwrap();
pool
}
fn now() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64
}
#[tokio::test]
async fn test_cron_add_tool() {
let pool = setup_pool().await;
let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]);
let result = tool.execute(json!({
"schedule": {"type": "every", "every_ms": 3600000},
"prompt": "report status",
"channel": "cli_chat",
"chat_id": "test-chat-1",
"name": "hourly report"
})).await.unwrap();
assert!(result.success);
assert!(result.output.contains("hourly report"));
let jobs = SchedulerStore::list_jobs(&pool).await.unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].name, "hourly report");
}
#[tokio::test]
async fn test_cron_add_invalid_channel() {
let pool = setup_pool().await;
let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]);
let result = tool.execute(json!({
"schedule": {"type": "every", "every_ms": 3600000},
"prompt": "test",
"channel": "nonexistent",
"chat_id": "x",
"name": "test"
})).await.unwrap();
assert!(!result.success);
assert!(result.error.as_ref().unwrap().contains("Unknown channel"));
}
#[tokio::test]
async fn test_cron_list_tool() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: uuid::Uuid::new_v4().to_string(),
name: "list-test".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
model: None, enabled: true, delete_after_run: false,
next_run_at: t + 1000, last_run_at: None,
last_status: None, last_error: None,
created_at: t, updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
let tool = CronListTool::new(pool.clone());
let result = tool.execute(json!({})).await.unwrap();
assert!(result.success);
assert!(result.output.contains("list-test"));
}
#[tokio::test]
async fn test_cron_remove_tool() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: "job-rm-tool".into(), name: "rm me".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
model: None, enabled: true, delete_after_run: false,
next_run_at: t, last_run_at: None,
last_status: None, last_error: None,
created_at: t, updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
let tool = CronRemoveTool::new(pool.clone());
let result = tool.execute(json!({"job_id": "job-rm-tool"})).await.unwrap();
assert!(result.success);
assert!(SchedulerStore::get_job(&pool, "job-rm-tool").await.is_err());
}
#[tokio::test]
async fn test_cron_enable_disable_tools() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: "job-toggle-tool".into(), name: "toggle".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
model: None, enabled: true, delete_after_run: false,
next_run_at: t, last_run_at: None,
last_status: None, last_error: None,
created_at: t, updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
let disable_tool = CronDisableTool::new(pool.clone());
let result = disable_tool.execute(json!({"job_id": "job-toggle-tool"})).await.unwrap();
assert!(result.success);
let got = SchedulerStore::get_job(&pool, "job-toggle-tool").await.unwrap();
assert!(!got.enabled);
let enable_tool = CronEnableTool::new(pool.clone());
let result = enable_tool.execute(json!({"job_id": "job-toggle-tool"})).await.unwrap();
assert!(result.success);
let got = SchedulerStore::get_job(&pool, "job-toggle-tool").await.unwrap();
assert!(got.enabled);
}
#[tokio::test]
async fn test_cron_update_tool() {
let pool = setup_pool().await;
let t = now();
let job = ScheduledJob {
id: "job-update-tool".into(), name: "old".into(),
schedule: Schedule::Every { every_ms: 3600000 },
prompt: "old prompt".into(), channel: "feishu".into(),
chat_id: "oc_1".into(), model: None,
enabled: true, delete_after_run: false,
next_run_at: t + 1000, last_run_at: None,
last_status: None, last_error: None,
created_at: t, updated_at: t,
};
SchedulerStore::add_job(&pool, &job).await.unwrap();
let tool = CronUpdateTool::new(pool.clone());
let result = tool.execute(json!({
"job_id": "job-update-tool",
"prompt": "new prompt",
"schedule": {"type": "every", "every_ms": 60000}
})).await.unwrap();
assert!(result.success);
let got = SchedulerStore::get_job(&pool, "job-update-tool").await.unwrap();
assert_eq!(got.prompt, "new prompt");
}
}
- Step 2: Run tests to verify they fail
Run: cargo test --lib scheduler::tools -- 2>&1
Expected: FAIL — tool structs not defined.
- Step 3: Implement src/scheduler/tools.rs
use async_trait::async_trait;
use serde_json::{json, Value};
use sqlx::SqlitePool;
use uuid::Uuid;
use crate::scheduler::store::SchedulerStore;
use crate::scheduler::types::{Schedule, ScheduledJob};
use crate::tools::traits::{Tool, ToolResult};
use crate::scheduler::next_run_for_schedule;
fn now_ms() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64
}
// ── CronAddTool ──────────────────────────────────────────────────────────────
pub struct CronAddTool {
pool: SqlitePool,
valid_channels: Vec<String>,
}
impl CronAddTool {
pub fn new(pool: SqlitePool, valid_channels: Vec<String>) -> Self {
Self { pool, valid_channels }
}
}
#[async_trait]
impl Tool for CronAddTool {
fn name(&self) -> &str { "cron_add" }
fn description(&self) -> &str {
"Create a new scheduled task (cron job). The task will execute an AI prompt on a schedule \
and deliver the result to the specified channel/chat. \
Schedule formats: \
- 'every': {\"type\":\"every\",\"every_ms\":3600000} for every hour, \
- 'at': {\"type\":\"at\",\"at\":<unix_timestamp_ms>} for one-shot, \
- 'cron': {\"type\":\"cron\",\"expr\":\"0 0 9 * * *\"} for cron expressions (6-field: sec min hour dom month dow)."
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"schedule": {
"type": "object",
"description": "Schedule definition. One of: {\"type\":\"every\",\"every_ms\":<ms>}, {\"type\":\"at\",\"at\":<unix_ms>}, or {\"type\":\"cron\",\"expr\":\"<cron_expr>\",\"tz\":\"<tz>\"}",
"required": ["type"]
},
"prompt": {
"type": "string",
"description": "The AI prompt to execute on each trigger"
},
"channel": {
"type": "string",
"description": "Target channel for delivering results (e.g., 'feishu', 'cli_chat')"
},
"chat_id": {
"type": "string",
"description": "Target chat ID within the channel"
},
"name": {
"type": "string",
"description": "Human-readable name for the job (optional, defaults to truncated prompt)"
},
"model": {
"type": "string",
"description": "Optional model override for this job"
}
},
"required": ["schedule", "prompt", "channel", "chat_id"]
})
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let schedule_json = args.get("schedule").ok_or_else(|| anyhow::anyhow!("missing 'schedule'"))?;
let schedule: Schedule = serde_json::from_value(schedule_json.clone())
.map_err(|e| anyhow::anyhow!("invalid schedule: {}", e))?;
let prompt = args.get("prompt").and_then(|v| v.as_str()).unwrap_or("").to_string();
if prompt.is_empty() {
return Ok(ToolResult { success: false, output: String::new(), error: Some("prompt is required".into()) });
}
let channel = args.get("channel").and_then(|v| v.as_str()).unwrap_or("").to_string();
if !self.valid_channels.contains(&channel) {
return Ok(ToolResult {
success: false,
output: format!("Unknown channel '{}'. Available: {}",
channel, self.valid_channels.join(", ")),
error: Some(format!("Unknown channel: {}", channel)),
});
}
let chat_id = args.get("chat_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
if chat_id.is_empty() {
return Ok(ToolResult { success: false, output: String::new(), error: Some("chat_id is required".into()) });
}
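let name = args
.get("name")
.and_then(|v| v.as_str())
.map(str::to_string)
// Default to the first 50 characters of the prompt (char-safe for UTF-8 prompts)
.unwrap_or_else(|| prompt.chars().take(50).collect());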
let model = args.get("model").and_then(|v| v.as_str()).map(|s| s.to_string());
let now = now_ms();
let next_run_at = next_run_for_schedule(&schedule, now)
.ok_or_else(|| anyhow::anyhow!("could not compute next run time from schedule"))?;
let id = Uuid::new_v4().to_string();
let job = ScheduledJob {
id: id.clone(),
name: name.clone(),
schedule,
prompt,
channel,
chat_id,
model,
enabled: true,
delete_after_run: false,
next_run_at,
last_run_at: None,
last_status: None,
last_error: None,
created_at: now,
updated_at: now,
};
SchedulerStore::add_job(&self.pool, &job).await?;
Ok(ToolResult {
success: true,
output: format!("Scheduled job created: id={}, name=\"{}\", next_run_at={}", id, name, next_run_at),
error: None,
})
}
}
// ── CronListTool ─────────────────────────────────────────────────────────────
pub struct CronListTool {
pool: SqlitePool,
}
impl CronListTool {
pub fn new(pool: SqlitePool) -> Self { Self { pool } }
}
#[async_trait]
impl Tool for CronListTool {
fn name(&self) -> &str { "cron_list" }
fn description(&self) -> &str {
"List all scheduled tasks (cron jobs) with their status and next run time."
}
fn read_only(&self) -> bool { true }
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": ["all", "enabled", "disabled"],
"description": "Filter by job status (default: all)"
}
}
})
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let filter = args.get("status").and_then(|v| v.as_str()).unwrap_or("all");
let jobs = SchedulerStore::list_jobs(&self.pool).await?;
let filtered: Vec<&ScheduledJob> = match filter {
"enabled" => jobs.iter().filter(|j| j.enabled).collect(),
"disabled" => jobs.iter().filter(|j| !j.enabled).collect(),
_ => jobs.iter().collect(),
};
if filtered.is_empty() {
return Ok(ToolResult { success: true, output: "No scheduled jobs found.".into(), error: None });
}
let mut lines = Vec::new();
for j in &filtered {
let status = if j.enabled { "🟢" } else { "⚫" };
let last = match (&j.last_status, &j.last_error) {
(Some(s), _) if s == "ok" => " last:✅".to_string(),
(Some(_), Some(e)) => format!(" last:❌({})", e.chars().take(40).collect::<String>()),
_ => String::new(),
};
let model = j.model.as_deref().unwrap_or("default");
lines.push(format!(
"{} id={} name=\"{}\" channel={} chat={} model={} next={}{}",
status, j.id, j.name, j.channel, j.chat_id, model, j.next_run_at, last
));
}
Ok(ToolResult { success: true, output: lines.join("\n"), error: None })
}
}
// ── CronRemoveTool ───────────────────────────────────────────────────────────
pub struct CronRemoveTool {
pool: SqlitePool,
}
impl CronRemoveTool {
pub fn new(pool: SqlitePool) -> Self { Self { pool } }
}
#[async_trait]
impl Tool for CronRemoveTool {
fn name(&self) -> &str { "cron_remove" }
fn description(&self) -> &str {
"Delete a scheduled task permanently by its job ID. Use cron_list first to find the ID."
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"job_id": {
"type": "string",
"description": "The ID of the job to delete"
}
},
"required": ["job_id"]
})
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
if job_id.is_empty() {
return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) });
}
// Verify job exists
match SchedulerStore::get_job(&self.pool, &job_id).await {
Ok(_) => {},
Err(_) => return Ok(ToolResult { success: false, output: format!("Job {} not found.", job_id), error: Some("not found".into()) }),
}
SchedulerStore::remove_job(&self.pool, &job_id).await?;
Ok(ToolResult { success: true, output: format!("Job {} deleted.", job_id), error: None })
}
}
// ── CronEnableTool ───────────────────────────────────────────────────────────
pub struct CronEnableTool {
pool: SqlitePool,
}
impl CronEnableTool {
pub fn new(pool: SqlitePool) -> Self { Self { pool } }
}
#[async_trait]
impl Tool for CronEnableTool {
fn name(&self) -> &str { "cron_enable" }
fn description(&self) -> &str { "Enable a disabled scheduled task by its job ID." }
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"job_id": {
"type": "string",
"description": "The ID of the job to enable"
}
},
"required": ["job_id"]
})
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
if job_id.is_empty() {
return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) });
}
let job = SchedulerStore::get_job(&self.pool, &job_id).await.map_err(|_| anyhow::anyhow!("Job {} not found.", job_id))?;
let next = next_run_for_schedule(&job.schedule, now_ms());
SchedulerStore::set_enabled(&self.pool, &job_id, true).await?;
if let Some(n) = next {
SchedulerStore::set_next_run(&self.pool, &job_id, n).await?;
}
Ok(ToolResult { success: true, output: format!("Job {} enabled.", job_id), error: None })
}
}
// ── CronDisableTool ──────────────────────────────────────────────────────────
pub struct CronDisableTool {
pool: SqlitePool,
}
impl CronDisableTool {
pub fn new(pool: SqlitePool) -> Self { Self { pool } }
}
#[async_trait]
impl Tool for CronDisableTool {
fn name(&self) -> &str { "cron_disable" }
fn description(&self) -> &str { "Disable a scheduled task by its job ID without deleting it." }
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"job_id": {
"type": "string",
"description": "The ID of the job to disable"
}
},
"required": ["job_id"]
})
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
if job_id.is_empty() {
return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) });
}
let _ = SchedulerStore::get_job(&self.pool, &job_id).await.map_err(|_| anyhow::anyhow!("Job {} not found.", job_id))?;
SchedulerStore::set_enabled(&self.pool, &job_id, false).await?;
Ok(ToolResult { success: true, output: format!("Job {} disabled.", job_id), error: None })
}
}
// ── CronUpdateTool ───────────────────────────────────────────────────────────
pub struct CronUpdateTool {
pool: SqlitePool,
}
impl CronUpdateTool {
pub fn new(pool: SqlitePool) -> Self { Self { pool } }
}
#[async_trait]
impl Tool for CronUpdateTool {
fn name(&self) -> &str { "cron_update" }
fn description(&self) -> &str {
"Update fields of an existing scheduled task. Only specified fields are changed."
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"job_id": {
"type": "string",
"description": "The ID of the job to update"
},
"prompt": {
"type": "string",
"description": "New AI prompt"
},
"schedule": {
"type": "object",
"description": "New schedule definition"
},
"channel": {
"type": "string",
"description": "New target channel"
},
"chat_id": {
"type": "string",
"description": "New target chat ID"
},
"model": {
"type": "string",
"description": "New model override"
}
},
"required": ["job_id"]
})
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
if job_id.is_empty() {
return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) });
}
let _ = SchedulerStore::get_job(&self.pool, &job_id).await.map_err(|_| anyhow::anyhow!("Job {} not found.", job_id))?;
let prompt = args.get("prompt").and_then(|v| v.as_str()).map(|s| s.to_string());
let schedule: Option<Schedule> = match args.get("schedule") {
Some(s) => Some(serde_json::from_value(s.clone()).map_err(|e| anyhow::anyhow!("invalid schedule: {}", e))?),
None => None,
};
let channel = args.get("channel").and_then(|v| v.as_str()).map(|s| s.to_string());
let chat_id = args.get("chat_id").and_then(|v| v.as_str()).map(|s| s.to_string());
let model = args.get("model").and_then(|v| v.as_str()).map(|s| s.to_string());
SchedulerStore::update_job(&self.pool, &job_id, prompt, schedule, channel, chat_id, model).await?;
// If schedule changed, recompute next_run_at
if args.get("schedule").is_some() {
let job = SchedulerStore::get_job(&self.pool, &job_id).await?;
if let Some(next) = next_run_for_schedule(&job.schedule, now_ms()) {
SchedulerStore::set_next_run(&self.pool, &job_id, next).await?;
}
}
Ok(ToolResult { success: true, output: format!("Job {} updated.", job_id), error: None })
}
}
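Job names and error previews in this module are shortened before display. Byte-range slicing of a Rust `&str` panics when the cut lands inside a multi-byte character, which matters for non-ASCII prompts. A small sketch of two safe alternatives, using hypothetical helper names `take_chars` and `cap_bytes` that are not part of the plan:

```rust
/// Keep at most `max_chars` characters (not bytes) of `s`.
pub fn take_chars(s: &str, max_chars: usize) -> String {
    s.chars().take(max_chars).collect()
}

/// Cap `s` at `max_bytes`, backing off to the nearest earlier char boundary.
pub fn cap_bytes(s: &str, max_bytes: usize) -> &str {
    if s.len() <= max_bytes {
        return s;
    }
    let mut end = max_bytes;
    // Index 0 is always a boundary, so this loop terminates.
    while !s.is_char_boundary(end) {
        end -= 1;
    }
    &s[..end]
}
```

`take_chars` suits display limits counted in characters, while `cap_bytes` suits storage limits counted in bytes.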
- Step 4: Run tests to verify they pass
Run: cargo test --lib scheduler::tools -- 2>&1
Expected: All 6 tests PASS.
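The cron_add description asks for 6-field expressions (sec min hour dom month dow), but callers often supply the 5-field Unix form. One possible pre-check, sketched below, counts fields before the expression reaches the parser so the error can carry a hint. `cron_field_count` and `check_cron_shape` are hypothetical helpers, and accepting a seventh (year) field is an assumption about the `cron` crate that should be confirmed against its documentation.

```rust
/// Count whitespace-separated fields in a cron expression.
pub fn cron_field_count(expr: &str) -> usize {
    expr.split_whitespace().count()
}

/// Hypothetical pre-check: reject the 5-field Unix form early, with a hint.
pub fn check_cron_shape(expr: &str) -> Result<(), String> {
    let expr = expr.trim();
    match cron_field_count(expr) {
        // Assumption: 6 fields, or 7 when an optional year field is present.
        6 | 7 => Ok(()),
        5 => Err(format!(
            "'{expr}' has 5 fields; prepend a seconds field, e.g. '0 {expr}'"
        )),
        n => Err(format!(
            "'{expr}' has {n} fields; expected 6 (sec min hour dom month dow)"
        )),
    }
}
```

This only checks the shape of the expression; field contents are still validated by the cron parser.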
- Step 5: Register tools in gateway
In src/gateway/mod.rs, after session_manager.register_outbound_tool(available_channels); (line 76), add:
// Register cron tools if scheduler is enabled
if config.gateway.scheduler.clone().unwrap_or_default().enabled {
let scheduler_pool = pool.clone();
let valid_channels = available_channels.clone();
session_manager.tools().register(
crate::scheduler::tools::CronAddTool::new(scheduler_pool.clone(), valid_channels)
);
session_manager.tools().register(
crate::scheduler::tools::CronListTool::new(scheduler_pool.clone())
);
session_manager.tools().register(
crate::scheduler::tools::CronRemoveTool::new(scheduler_pool.clone())
);
session_manager.tools().register(
crate::scheduler::tools::CronEnableTool::new(scheduler_pool.clone())
);
session_manager.tools().register(
crate::scheduler::tools::CronDisableTool::new(scheduler_pool.clone())
);
session_manager.tools().register(
crate::scheduler::tools::CronUpdateTool::new(scheduler_pool.clone())
);
tracing::info!("Cron tools registered");
}
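Two common ways to gate on an optional config section behave differently when the section is absent, and the tool registration should agree with the `unwrap_or_default().enabled` check used when initializing and spawning the scheduler. The sketch below illustrates the difference with a stand-in `SchedulerConfig`; the derived default of `enabled = false` is an assumption, since the real default is defined in src/config/mod.rs.

```rust
/// Illustrative stand-in for the plan's `SchedulerConfig`.
#[derive(Clone, Debug, Default)]
pub struct SchedulerConfig {
    /// Derived `Default` gives `false`; the real default may differ.
    pub enabled: bool,
}

/// A missing section counts as enabled.
pub fn enabled_map_or(cfg: Option<&SchedulerConfig>) -> bool {
    cfg.map_or(true, |c| c.enabled)
}

/// A missing section falls back to `SchedulerConfig::default()`.
pub fn enabled_or_default(cfg: Option<&SchedulerConfig>) -> bool {
    cfg.cloned().unwrap_or_default().enabled
}
```

If the two forms are mixed and the default is disabled, cron tools could be registered against tables that were never created.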
- Step 6: Verify build
Run: cargo check 2>&1
Expected: Compiles successfully.
- Step 7: Commit
git add src/scheduler/tools.rs src/gateway/mod.rs
git commit -m "feat: add 6 cron agent tools (add/list/remove/enable/disable/update)"
Task 10: Full Build, Lint, and Unit Tests
Files:
- All above
- Step 1: Run full check
Run: cargo check 2>&1
Expected: Compiles successfully, no errors, no warnings.
- Step 2: Run cargo clippy
Run: cargo clippy -- -D warnings 2>&1
Expected: No warnings emitted.
- Step 3: Run all unit tests
Run: cargo test --lib 2>&1
Expected: All tests PASS, including existing tests and the 18 new scheduler tests.
- Step 4: Commit
git add --all
git commit -m "feat: complete scheduled tasks implementation"
Task 11: Integration Test
Files:
- Modify: tests/test_integration.rs (or create tests/test_scheduler.rs)
- Step 1: Write integration test
Create tests/test_scheduler.rs:
//! Integration tests for the scheduled tasks (cron) system.
//! These tests exercise only public types and pure functions, so they need no
//! `.env`, API keys, or running Gateway.
//! Run with: cargo test --test test_scheduler
/// This test verifies that the scheduler module compiles and its types are
/// accessible. Full integration testing requires a running Gateway instance
/// with API keys, so the actual job-execution flow is tested there.
#[tokio::test]
async fn test_scheduler_types_roundtrip() {
use picobot::scheduler::Schedule;
// Verify JSON (de)serialization works
let s1 = Schedule::Every { every_ms: 3600000 };
let json = serde_json::to_string(&s1).unwrap();
let s2: Schedule = serde_json::from_str(&json).unwrap();
match s2 {
Schedule::Every { every_ms } => assert_eq!(every_ms, 3600000),
_ => panic!("expected Every"),
}
let s1 = Schedule::At { at: 1000000 };
let json = serde_json::to_string(&s1).unwrap();
let s2: Schedule = serde_json::from_str(&json).unwrap();
match s2 {
Schedule::At { at } => assert_eq!(at, 1000000),
_ => panic!("expected At"),
}
let s1 = Schedule::Cron { expr: "0 0 9 * * *".into(), tz: None };
let json = serde_json::to_string(&s1).unwrap();
let s2: Schedule = serde_json::from_str(&json).unwrap();
match s2 {
Schedule::Cron { expr, tz } => {
assert_eq!(expr, "0 0 9 * * *");
assert!(tz.is_none());
}
_ => panic!("expected Cron"),
}
}
/// Verify that next_run_for_schedule produces valid future timestamps.
#[test]
fn test_next_run_always_future() {
use picobot::scheduler::{next_run_for_schedule, Schedule};
let now = 1700000000000_i64; // Some fixed reference time
let schedules = vec![
Schedule::Every { every_ms: 60000 },
Schedule::Cron { expr: "0 0 9 * * *".into(), tz: None },
];
for s in &schedules {
let next = next_run_for_schedule(s, now);
assert!(next.is_some(), "expected next run for {:?}", s);
assert!(next.unwrap() > now, "next run should be after now for {:?}", s);
}
}
/// Verify that one-shot At schedule disables after run (logic tested in unit tests,
/// this just ensures the schedule round-trips correctly).
#[test]
fn test_at_schedule_is_one_shot_by_contract() {
use picobot::scheduler::Schedule;
// At schedules by definition fire once — the scheduler loop handles
// disabling/deleting after run. This test confirms the type is correct.
let s = Schedule::At { at: 1700000000000 };
let json = serde_json::to_string(&s).unwrap();
assert!(json.contains("\"at\""));
}
- Step 2: Run integration test
Run: cargo test --test test_scheduler 2>&1
Expected: All 3 tests PASS.
- Step 3: Commit
git add tests/test_scheduler.rs
git commit -m "test: add scheduler integration tests"
Task 12: Final Verification
- Step 1: Full test suite
Run: cargo test --lib 2>&1
Expected: All tests PASS.
Run: cargo test --test test_scheduler 2>&1
Expected: All tests PASS.
- Step 2: Build binary
Run: cargo build 2>&1
Expected: Build succeeds with no errors.
- Step 3: Grep for TODOs / placeholders
Run: grep -rn "TODO\|FIXME\|TBD\|todo!\|unimplemented!" src/scheduler/ 2>&1
Expected: No output (no placeholders).
- Step 4: Final commit (if any changes)
git status
git add --all
git commit -m "chore: final cleanup for scheduled tasks"