- storage/scheduler.rs: ScheduledJob/JobRun types + CRUD on Storage - tools/cron.rs: 6 cron agent tools (add/list/remove/enable/disable/update) - scheduler/types.rs: keep only Schedule enum - scheduler/mod.rs: use Arc<Storage> instead of raw SqlitePool - gateway/mod.rs: inject Storage directly, replace pool field - storage/mod.rs: scheduler tables in init_schema
2357 lines
78 KiB
Markdown
2357 lines
78 KiB
Markdown
# Scheduled Tasks (Cron Jobs) Implementation Plan
|
|
|
|
> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
|
|
|
|
**Goal:** Add a cron-like scheduled task system to PicoBot that triggers agent LLM prompts on a schedule (cron expression / fixed interval / one-shot), with results delivered to channels.
|
|
|
|
**Architecture:** A new `src/scheduler/` module with its own SQLite store (sharing the existing `sqlx::SqlitePool` via `storage.pool()`). The scheduler runs as a tokio background task, calling `SessionManager::handle_cron_message()` directly. Agent-facing tools (cron_add, cron_list, etc.) let the LLM manage jobs. Schedule computation uses the `cron` and `chrono-tz` crates.
|
|
|
|
**Tech Stack:** Rust, tokio, sqlx, `cron` 0.15, `chrono-tz` 0.10
|
|
|
|
---
|
|
|
|
## File Structure
|
|
|
|
| File | Create/Modify | Responsibility |
|
|
|------|--------------|----------------|
|
|
| `Cargo.toml` | Modify | Add `cron`, `chrono-tz` dependencies |
|
|
| `src/lib.rs` | Modify | Add `pub mod scheduler;` |
|
|
| `src/config/mod.rs` | Modify | Add `SchedulerConfig` to `GatewayConfig` |
|
|
| `src/scheduler/types.rs` | Create | `Schedule`, `ScheduledJob`, `JobRun` data types |
|
|
| `src/scheduler/store.rs` | Create | SQLite schema + CRUD for `scheduled_jobs` and `job_runs` |
|
|
| `src/scheduler/mod.rs` | Create | `Scheduler` struct, `run()` loop, `next_run_for_schedule()` |
|
|
| `src/scheduler/tools.rs` | Create | 6 agent tools: `cron_add/list/remove/enable/disable/update` |
|
|
| `src/bus/message.rs` | Modify | Add `ChatMessage::user_with_source()` factory |
|
|
| `src/session/session.rs` | Modify | Add `handle_cron_message()` method |
|
|
| `src/gateway/mod.rs` | Modify | Create `Scheduler`, spawn background task, register cron tools |
|
|
| `src/session/session_id.rs` | Modify | Add `from_components()` convenience constructor |
|
|
|
|
---
|
|
|
|
### Task 1: Add Cron Dependencies
|
|
|
|
**Files:**
|
|
- Modify: `Cargo.toml`
|
|
|
|
- [ ] **Step 1: Add `cron` and `chrono-tz` to dependencies**
|
|
|
|
After line 28 (`tempfile = "3"`), insert:
|
|
|
|
```toml
|
|
cron = "0.15"
|
|
chrono-tz = "0.10"
|
|
```
|
|
|
|
- [ ] **Step 2: Verify build**
|
|
|
|
Run: `cargo check 2>&1`
|
|
Expected: Dependencies download and resolve. No code referencing them yet, so no compile errors.
|
|
|
|
- [ ] **Step 3: Commit**
|
|
|
|
```bash
|
|
git add Cargo.toml Cargo.lock
|
|
git commit -m "deps: add cron and chrono-tz for scheduled tasks"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 2: Add SchedulerConfig
|
|
|
|
**Files:**
|
|
- Modify: `src/config/mod.rs:143` (after existing `session_db_path` field)
|
|
|
|
- [ ] **Step 1: Define `SchedulerConfig` struct**
|
|
|
|
Add after the closing `}` of `GatewayConfig` (after line 143, before the `ClientConfig` block):
|
|
|
|
```rust
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct SchedulerConfig {
|
|
/// Whether the scheduler is enabled
|
|
#[serde(default = "default_scheduler_enabled")]
|
|
pub enabled: bool,
|
|
/// Poll interval in seconds (how often to check for due jobs)
|
|
#[serde(default = "default_poll_interval_secs")]
|
|
pub poll_interval_secs: u64,
|
|
/// Maximum concurrent job executions (currently sequential, reserved for future)
|
|
#[serde(default = "default_max_concurrent")]
|
|
pub max_concurrent: usize,
|
|
}
|
|
|
|
fn default_scheduler_enabled() -> bool { true }
|
|
|
|
fn default_poll_interval_secs() -> u64 { 60 }
|
|
|
|
fn default_max_concurrent() -> usize { 1 }
|
|
|
|
impl Default for SchedulerConfig {
|
|
fn default() -> Self {
|
|
Self {
|
|
enabled: true,
|
|
poll_interval_secs: 60,
|
|
max_concurrent: 1,
|
|
}
|
|
}
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 2: Add `scheduler` field to `GatewayConfig`**
|
|
|
|
In `GatewayConfig` (line 132-143), add after `session_db_path`:
|
|
|
|
```rust
|
|
#[serde(default)]
|
|
pub scheduler: Option<SchedulerConfig>,
|
|
```
|
|
|
|
The full struct becomes:
|
|
|
|
```rust
|
|
#[derive(Debug, Clone, Deserialize, Serialize)]
|
|
pub struct GatewayConfig {
|
|
#[serde(default = "default_gateway_host")]
|
|
pub host: String,
|
|
#[serde(default = "default_gateway_port")]
|
|
pub port: u16,
|
|
#[serde(default, rename = "session_ttl_hours")]
|
|
pub session_ttl_hours: Option<u64>,
|
|
#[serde(default, rename = "cleanup_interval_minutes")]
|
|
pub cleanup_interval_minutes: Option<u64>,
|
|
#[serde(default, rename = "session_db_path")]
|
|
pub session_db_path: Option<String>,
|
|
#[serde(default)]
|
|
pub scheduler: Option<SchedulerConfig>,
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 3: Update `Default for GatewayConfig`**
|
|
|
|
In the `impl Default for GatewayConfig` block (around line 163), add:
|
|
|
|
```rust
|
|
scheduler: None,
|
|
```
|
|
|
|
- [ ] **Step 4: Verify build**
|
|
|
|
Run: `cargo check 2>&1`
|
|
Expected: Compiles successfully.
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/config/mod.rs
|
|
git commit -m "feat: add SchedulerConfig to GatewayConfig"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 3: Create Scheduler Data Types
|
|
|
|
**Files:**
|
|
- Create: `src/scheduler/types.rs`
|
|
- Modify: `src/scheduler/mod.rs` (stub)
|
|
- Modify: `src/lib.rs`
|
|
|
|
- [ ] **Step 1: Register the scheduler module**
|
|
|
|
In `src/lib.rs`, after `pub mod providers;` (line 14), add:
|
|
|
|
```rust
|
|
pub mod scheduler;
|
|
```
|
|
|
|
- [ ] **Step 2: Create stub `src/scheduler/mod.rs`**
|
|
|
|
```rust
|
|
pub mod types;
|
|
pub mod store;
|
|
pub mod tools;
|
|
|
|
pub use types::{JobRun, Schedule, ScheduledJob};
|
|
```
|
|
|
|
- [ ] **Step 3: Define types in `src/scheduler/types.rs`**
|
|
|
|
```rust
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
/// How a job is scheduled. Serialized as JSON in the database `schedule` column.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
#[serde(tag = "type")]
|
|
pub enum Schedule {
|
|
/// One-shot: fires once at a specific Unix millisecond timestamp, then disables.
|
|
#[serde(rename = "at")]
|
|
At { at: i64 },
|
|
/// Recurring: fires every `every_ms` milliseconds.
|
|
#[serde(rename = "every")]
|
|
Every { every_ms: u64 },
|
|
/// Recurring: fires on a cron schedule with optional timezone.
|
|
#[serde(rename = "cron")]
|
|
Cron { expr: String, tz: Option<String> },
|
|
}
|
|
|
|
/// A scheduled job stored in the database.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct ScheduledJob {
|
|
pub id: String,
|
|
pub name: String,
|
|
/// JSON-serialized `Schedule` stored as TEXT in SQLite.
|
|
pub schedule: Schedule,
|
|
pub prompt: String,
|
|
pub channel: String,
|
|
pub chat_id: String,
|
|
pub model: Option<String>,
|
|
pub enabled: bool,
|
|
pub delete_after_run: bool,
|
|
pub next_run_at: i64,
|
|
pub last_run_at: Option<i64>,
|
|
pub last_status: Option<String>,
|
|
pub last_error: Option<String>,
|
|
pub created_at: i64,
|
|
pub updated_at: i64,
|
|
}
|
|
|
|
/// A single execution record for a job.
|
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
|
pub struct JobRun {
|
|
pub id: i64,
|
|
pub job_id: String,
|
|
pub started_at: i64,
|
|
pub finished_at: i64,
|
|
pub status: String,
|
|
pub output: Option<String>,
|
|
pub error: Option<String>,
|
|
pub duration_ms: i64,
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Verify build**
|
|
|
|
Run: `cargo check 2>&1`
|
|
Expected: Compiles successfully.
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/lib.rs src/scheduler/
|
|
git commit -m "feat: add scheduler data types (Schedule, ScheduledJob, JobRun)"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 4: Add `ChatMessage::user_with_source` and `Session::create_user_message_with_source`
|
|
|
|
**Files:**
|
|
- Modify: `src/bus/message.rs:198` (after `pub fn tool(...)`)
|
|
- Modify: `src/session/session.rs:252` (after `create_user_message`)
|
|
|
|
- [ ] **Step 1: Write the failing test**
|
|
|
|
Create inline test in `src/bus/message.rs`, immediately before the closing `}` of the `impl ChatMessage` block (after line 197):
|
|
|
|
```rust
|
|
pub fn user_with_source(content: impl Into<String>, source: MessageSource) -> Self {
|
|
Self {
|
|
id: uuid::Uuid::new_v4().to_string(),
|
|
role: "user".to_string(),
|
|
content: content.into(),
|
|
media_refs: Vec::new(),
|
|
timestamp: current_timestamp(),
|
|
tool_call_id: None,
|
|
tool_name: None,
|
|
tool_calls: None,
|
|
source: Some(source),
|
|
}
|
|
}
|
|
```
|
|
|
|
No test for this factory method separately — it's pure data construction.
|
|
|
|
- [ ] **Step 2: Write the failing test for `create_user_message_with_source`**
|
|
|
|
In `src/session/session.rs`, after `create_user_message` (line 252), add this test:
|
|
|
|
```rust
|
|
pub fn create_user_message_with_source(&self, content: &str, media_refs: Vec<String>, source: crate::bus::MessageSource) -> ChatMessage {
|
|
if media_refs.is_empty() {
|
|
ChatMessage::user_with_source(content, source)
|
|
} else {
|
|
// For simplicity, ignore media in cron messages (media is always empty from scheduler)
|
|
ChatMessage::user_with_source(content, source)
|
|
}
|
|
}
|
|
```
|
|
|
|
No test for this separately — it's used in `handle_cron_message` which gets tested via integration.
|
|
|
|
- [ ] **Step 3: Verify build**
|
|
|
|
Run: `cargo check 2>&1`
|
|
Expected: Compiles successfully.
|
|
|
|
- [ ] **Step 4: Commit**
|
|
|
|
```bash
|
|
git add src/bus/message.rs src/session/session.rs
|
|
git commit -m "feat: add ChatMessage::user_with_source and Session::create_user_message_with_source"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 5: Create SchedulerStore (SQLite Schema + CRUD)
|
|
|
|
**Files:**
|
|
- Create: `src/scheduler/store.rs`
|
|
|
|
- [ ] **Step 1: Write the failing test**
|
|
|
|
Write inline tests at the bottom of `src/scheduler/store.rs`. These test `SchedulerStore` against an **in-memory** SQLite database:
|
|
|
|
```rust
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use crate::scheduler::types::{Schedule, ScheduledJob, JobRun};
|
|
use sqlx::SqlitePool;
|
|
|
|
fn now() -> i64 {
|
|
std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap()
|
|
.as_millis() as i64
|
|
}
|
|
|
|
async fn setup_pool() -> SqlitePool {
|
|
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
|
|
SchedulerStore::init(&pool).await.unwrap();
|
|
pool
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_init_creates_tables() {
|
|
let pool = setup_pool().await;
|
|
let row: (i64,) = sqlx::query_as("SELECT COUNT(*) FROM scheduled_jobs")
|
|
.fetch_one(&pool).await.unwrap();
|
|
assert_eq!(row.0, 0);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_add_and_get_job() {
|
|
let pool = setup_pool().await;
|
|
let t = now();
|
|
let job = ScheduledJob {
|
|
id: "job-1".into(),
|
|
name: "test job".into(),
|
|
schedule: Schedule::Every { every_ms: 3600000 },
|
|
prompt: "say hello".into(),
|
|
channel: "cli_chat".into(),
|
|
chat_id: "conn-1".into(),
|
|
model: None,
|
|
enabled: true,
|
|
delete_after_run: false,
|
|
next_run_at: t + 3600000,
|
|
last_run_at: None,
|
|
last_status: None,
|
|
last_error: None,
|
|
created_at: t,
|
|
updated_at: t,
|
|
};
|
|
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
|
let got = SchedulerStore::get_job(&pool, "job-1").await.unwrap();
|
|
assert_eq!(got.id, "job-1");
|
|
assert_eq!(got.name, "test job");
|
|
assert_eq!(got.prompt, "say hello");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_list_jobs() {
|
|
let pool = setup_pool().await;
|
|
let t = now();
|
|
for i in 0..3 {
|
|
let job = ScheduledJob {
|
|
id: format!("job-{}", i),
|
|
name: format!("job {}", i),
|
|
schedule: Schedule::Every { every_ms: 3600000 },
|
|
prompt: "ping".into(),
|
|
channel: "cli_chat".into(),
|
|
chat_id: "conn-1".into(),
|
|
model: None, enabled: true, delete_after_run: false,
|
|
next_run_at: t + 1000, last_run_at: None,
|
|
last_status: None, last_error: None,
|
|
created_at: t, updated_at: t,
|
|
};
|
|
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
|
}
|
|
let jobs = SchedulerStore::list_jobs(&pool).await.unwrap();
|
|
assert_eq!(jobs.len(), 3);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_remove_job() {
|
|
let pool = setup_pool().await;
|
|
let t = now();
|
|
let job = ScheduledJob {
|
|
id: "job-rm".into(), name: "remove me".into(),
|
|
schedule: Schedule::Every { every_ms: 1000 },
|
|
prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
|
|
model: None, enabled: true, delete_after_run: false,
|
|
next_run_at: t, last_run_at: None,
|
|
last_status: None, last_error: None,
|
|
created_at: t, updated_at: t,
|
|
};
|
|
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
|
SchedulerStore::remove_job(&pool, "job-rm").await.unwrap();
|
|
let result = SchedulerStore::get_job(&pool, "job-rm").await;
|
|
assert!(result.is_err());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_set_enabled() {
|
|
let pool = setup_pool().await;
|
|
let t = now();
|
|
let job = ScheduledJob {
|
|
id: "job-toggle".into(), name: "toggle".into(),
|
|
schedule: Schedule::Every { every_ms: 1000 },
|
|
prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
|
|
model: None, enabled: true, delete_after_run: false,
|
|
next_run_at: t, last_run_at: None,
|
|
last_status: None, last_error: None,
|
|
created_at: t, updated_at: t,
|
|
};
|
|
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
|
SchedulerStore::set_enabled(&pool, "job-toggle", false).await.unwrap();
|
|
let got = SchedulerStore::get_job(&pool, "job-toggle").await.unwrap();
|
|
assert!(!got.enabled);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_due_jobs_only_returns_enabled_and_overdue() {
|
|
let pool = setup_pool().await;
|
|
let t = now();
|
|
// Three jobs: due, not-due-yet, disabled-but-due
|
|
let jobs = vec![
|
|
ScheduledJob {
|
|
id: "due".into(), name: "due".into(),
|
|
schedule: Schedule::At { at: t }, prompt: "1".into(),
|
|
channel: "cli_chat".into(), chat_id: "c".into(),
|
|
model: None, enabled: true, delete_after_run: false,
|
|
next_run_at: t - 1000, last_run_at: None,
|
|
last_status: None, last_error: None,
|
|
created_at: t, updated_at: t,
|
|
},
|
|
ScheduledJob {
|
|
id: "future".into(), name: "future".into(),
|
|
schedule: Schedule::At { at: t + 99999999 }, prompt: "2".into(),
|
|
channel: "cli_chat".into(), chat_id: "c".into(),
|
|
model: None, enabled: true, delete_after_run: false,
|
|
next_run_at: t + 99999999, last_run_at: None,
|
|
last_status: None, last_error: None,
|
|
created_at: t, updated_at: t,
|
|
},
|
|
ScheduledJob {
|
|
id: "disabled-due".into(), name: "disabled due".into(),
|
|
schedule: Schedule::At { at: t }, prompt: "3".into(),
|
|
channel: "cli_chat".into(), chat_id: "c".into(),
|
|
model: None, enabled: false, delete_after_run: false,
|
|
next_run_at: t - 1000, last_run_at: None,
|
|
last_status: None, last_error: None,
|
|
created_at: t, updated_at: t,
|
|
},
|
|
];
|
|
for j in &jobs {
|
|
SchedulerStore::add_job(&pool, j).await.unwrap();
|
|
}
|
|
let due = SchedulerStore::due_jobs(&pool, t, 10).await.unwrap();
|
|
assert_eq!(due.len(), 1);
|
|
assert_eq!(due[0].id, "due");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_record_run_and_list_runs() {
|
|
let pool = setup_pool().await;
|
|
let t = now();
|
|
let job = ScheduledJob {
|
|
id: "job-run".into(), name: "run test".into(),
|
|
schedule: Schedule::Every { every_ms: 1000 },
|
|
prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
|
|
model: None, enabled: true, delete_after_run: false,
|
|
next_run_at: t, last_run_at: None,
|
|
last_status: None, last_error: None,
|
|
created_at: t, updated_at: t,
|
|
};
|
|
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
|
|
|
let run = JobRun {
|
|
id: 0, job_id: "job-run".into(),
|
|
started_at: t, finished_at: t + 500,
|
|
status: "ok".into(), output: Some("hello".into()),
|
|
error: None, duration_ms: 500,
|
|
};
|
|
SchedulerStore::record_run(&pool, &run).await.unwrap();
|
|
let runs = SchedulerStore::list_runs(&pool, "job-run", 10).await.unwrap();
|
|
assert_eq!(runs.len(), 1);
|
|
assert_eq!(runs[0].status, "ok");
|
|
assert_eq!(runs[0].output.as_deref(), Some("hello"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_update_job() {
|
|
let pool = setup_pool().await;
|
|
let t = now();
|
|
let job = ScheduledJob {
|
|
id: "job-update".into(), name: "old name".into(),
|
|
schedule: Schedule::Every { every_ms: 1000 },
|
|
prompt: "old prompt".into(), channel: "feishu".into(),
|
|
chat_id: "oc_1".into(), model: None,
|
|
enabled: true, delete_after_run: false,
|
|
next_run_at: t, last_run_at: None,
|
|
last_status: None, last_error: None,
|
|
created_at: t, updated_at: t,
|
|
};
|
|
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
|
SchedulerStore::update_job(
|
|
&pool, "job-update",
|
|
Some("new prompt".into()),
|
|
Some(Schedule::Every { every_ms: 60000 }),
|
|
None, None, None,
|
|
).await.unwrap();
|
|
let got = SchedulerStore::get_job(&pool, "job-update").await.unwrap();
|
|
assert_eq!(got.prompt, "new prompt");
|
|
}
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `cargo test --lib scheduler::store -- 2>&1`
|
|
Expected: All tests FAIL because `SchedulerStore` and its methods don't exist yet (compilation errors).
|
|
|
|
- [ ] **Step 3: Implement `SchedulerStore`**
|
|
|
|
Write `src/scheduler/store.rs`:
|
|
|
|
```rust
|
|
use sqlx::Row;
|
|
use sqlx::SqlitePool;
|
|
|
|
use super::types::{JobRun, Schedule, ScheduledJob};
|
|
|
|
/// Persistence layer for scheduled jobs and run history.
|
|
/// Uses a shared `sqlx::SqlitePool` (obtained from `crate::storage::Storage::pool()`).
|
|
pub struct SchedulerStore;
|
|
|
|
impl SchedulerStore {
|
|
/// Initialize the scheduler tables. Idempotent (CREATE TABLE IF NOT EXISTS).
|
|
pub async fn init(pool: &SqlitePool) -> Result<(), Box<dyn std::error::Error>> {
|
|
sqlx::query(
|
|
r#"
|
|
CREATE TABLE IF NOT EXISTS scheduled_jobs (
|
|
id TEXT PRIMARY KEY,
|
|
name TEXT NOT NULL,
|
|
schedule TEXT NOT NULL,
|
|
prompt TEXT NOT NULL,
|
|
channel TEXT NOT NULL,
|
|
chat_id TEXT NOT NULL,
|
|
model TEXT,
|
|
enabled INTEGER NOT NULL DEFAULT 1,
|
|
delete_after_run INTEGER NOT NULL DEFAULT 0,
|
|
next_run_at INTEGER NOT NULL,
|
|
last_run_at INTEGER,
|
|
last_status TEXT,
|
|
last_error TEXT,
|
|
created_at INTEGER NOT NULL,
|
|
updated_at INTEGER NOT NULL
|
|
)
|
|
"#,
|
|
)
|
|
.execute(pool)
|
|
.await?;
|
|
|
|
sqlx::query(
|
|
r#"
|
|
CREATE TABLE IF NOT EXISTS job_runs (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
job_id TEXT NOT NULL REFERENCES scheduled_jobs(id) ON DELETE CASCADE,
|
|
started_at INTEGER NOT NULL,
|
|
finished_at INTEGER NOT NULL,
|
|
status TEXT NOT NULL,
|
|
output TEXT,
|
|
error TEXT,
|
|
duration_ms INTEGER NOT NULL
|
|
)
|
|
"#,
|
|
)
|
|
.execute(pool)
|
|
.await?;
|
|
|
|
sqlx::query(
|
|
"CREATE INDEX IF NOT EXISTS idx_jobs_next_run ON scheduled_jobs(enabled, next_run_at)",
|
|
)
|
|
.execute(pool)
|
|
.await?;
|
|
|
|
sqlx::query("CREATE INDEX IF NOT EXISTS idx_runs_job_id ON job_runs(job_id)")
|
|
.execute(pool)
|
|
.await?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
/// Insert a new job. Returns an error if a job with the same ID already exists.
|
|
pub async fn add_job(
|
|
pool: &SqlitePool,
|
|
job: &ScheduledJob,
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
let schedule_json = serde_json::to_string(&job.schedule)?;
|
|
sqlx::query(
|
|
r#"
|
|
INSERT INTO scheduled_jobs
|
|
(id, name, schedule, prompt, channel, chat_id, model,
|
|
enabled, delete_after_run, next_run_at, last_run_at,
|
|
last_status, last_error, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
"#,
|
|
)
|
|
.bind(&job.id)
|
|
.bind(&job.name)
|
|
.bind(&schedule_json)
|
|
.bind(&job.prompt)
|
|
.bind(&job.channel)
|
|
.bind(&job.chat_id)
|
|
.bind(&job.model)
|
|
.bind(job.enabled as i32)
|
|
.bind(job.delete_after_run as i32)
|
|
.bind(job.next_run_at)
|
|
.bind(job.last_run_at)
|
|
.bind(&job.last_status)
|
|
.bind(&job.last_error)
|
|
.bind(job.created_at)
|
|
.bind(job.updated_at)
|
|
.execute(pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Fetch a single job by ID.
|
|
pub async fn get_job(
|
|
pool: &SqlitePool,
|
|
id: &str,
|
|
) -> Result<ScheduledJob, Box<dyn std::error::Error>> {
|
|
let row = sqlx::query("SELECT * FROM scheduled_jobs WHERE id = ?")
|
|
.bind(id)
|
|
.fetch_optional(pool)
|
|
.await?
|
|
.ok_or_else(|| format!("job not found: {id}"))?;
|
|
Ok(row_to_job(&row)?)
|
|
}
|
|
|
|
/// List all jobs, ordered by next_run_at ascending.
|
|
pub async fn list_jobs(
|
|
pool: &SqlitePool,
|
|
) -> Result<Vec<ScheduledJob>, Box<dyn std::error::Error>> {
|
|
let rows = sqlx::query("SELECT * FROM scheduled_jobs ORDER BY next_run_at ASC")
|
|
.fetch_all(pool)
|
|
.await?;
|
|
rows.iter().map(row_to_job).collect()
|
|
}
|
|
|
|
/// Delete a job (cascades to job_runs).
|
|
pub async fn remove_job(
|
|
pool: &SqlitePool,
|
|
id: &str,
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
sqlx::query("DELETE FROM scheduled_jobs WHERE id = ?")
|
|
.bind(id)
|
|
.execute(pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Enable or disable a job.
|
|
pub async fn set_enabled(
|
|
pool: &SqlitePool,
|
|
id: &str,
|
|
enabled: bool,
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
sqlx::query("UPDATE scheduled_jobs SET enabled = ?, updated_at = ? WHERE id = ?")
|
|
.bind(enabled as i32)
|
|
.bind(now_ms())
|
|
.bind(id)
|
|
.execute(pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Update selective fields on a job. Pass `None` for fields that should not change.
|
|
#[allow(clippy::too_many_arguments)]
|
|
pub async fn update_job(
|
|
pool: &SqlitePool,
|
|
id: &str,
|
|
prompt: Option<String>,
|
|
schedule: Option<Schedule>,
|
|
channel: Option<String>,
|
|
chat_id: Option<String>,
|
|
model: Option<String>,
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
let now = now_ms();
|
|
|
|
if let Some(p) = prompt {
|
|
sqlx::query(
|
|
"UPDATE scheduled_jobs SET prompt = ?, updated_at = ? WHERE id = ?",
|
|
)
|
|
.bind(&p)
|
|
.bind(now)
|
|
.bind(id)
|
|
.execute(pool)
|
|
.await?;
|
|
}
|
|
if let Some(s) = schedule {
|
|
let json = serde_json::to_string(&s)?;
|
|
sqlx::query(
|
|
"UPDATE scheduled_jobs SET schedule = ?, updated_at = ? WHERE id = ?",
|
|
)
|
|
.bind(&json)
|
|
.bind(now)
|
|
.bind(id)
|
|
.execute(pool)
|
|
.await?;
|
|
}
|
|
if let Some(c) = channel {
|
|
sqlx::query(
|
|
"UPDATE scheduled_jobs SET channel = ?, updated_at = ? WHERE id = ?",
|
|
)
|
|
.bind(&c)
|
|
.bind(now)
|
|
.bind(id)
|
|
.execute(pool)
|
|
.await?;
|
|
}
|
|
if let Some(c) = chat_id {
|
|
sqlx::query(
|
|
"UPDATE scheduled_jobs SET chat_id = ?, updated_at = ? WHERE id = ?",
|
|
)
|
|
.bind(&c)
|
|
.bind(now)
|
|
.bind(id)
|
|
.execute(pool)
|
|
.await?;
|
|
}
|
|
if let Some(m) = model {
|
|
sqlx::query(
|
|
"UPDATE scheduled_jobs SET model = ?, updated_at = ? WHERE id = ?",
|
|
)
|
|
.bind(&m)
|
|
.bind(now)
|
|
.bind(id)
|
|
.execute(pool)
|
|
.await?;
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Update next_run_at and last_run_at for a job (used during reschedule).
|
|
pub async fn set_next_run(
|
|
pool: &SqlitePool,
|
|
id: &str,
|
|
next_run_at: i64,
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
let now = now_ms();
|
|
sqlx::query(
|
|
"UPDATE scheduled_jobs SET next_run_at = ?, last_run_at = ?, updated_at = ? WHERE id = ?",
|
|
)
|
|
.bind(next_run_at)
|
|
.bind(now)
|
|
.bind(now)
|
|
.bind(id)
|
|
.execute(pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Set last_run_at and optionally last_status / last_error (used when starting job execution).
|
|
pub async fn touch_last_run(
|
|
pool: &SqlitePool,
|
|
id: &str,
|
|
at: i64,
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
sqlx::query(
|
|
"UPDATE scheduled_jobs SET last_run_at = ?, updated_at = ? WHERE id = ?",
|
|
)
|
|
.bind(at)
|
|
.bind(at)
|
|
.bind(id)
|
|
.execute(pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Set last_status and last_error after job completion.
|
|
pub async fn set_last_status(
|
|
pool: &SqlitePool,
|
|
id: &str,
|
|
status: &str,
|
|
error: Option<&str>,
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
let now = now_ms();
|
|
sqlx::query(
|
|
"UPDATE scheduled_jobs SET last_status = ?, last_error = ?, updated_at = ? WHERE id = ?",
|
|
)
|
|
.bind(status)
|
|
.bind(error)
|
|
.bind(now)
|
|
.bind(id)
|
|
.execute(pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Fetch enabled jobs whose next_run_at <= now, up to `limit`.
|
|
pub async fn due_jobs(
|
|
pool: &SqlitePool,
|
|
now: i64,
|
|
limit: usize,
|
|
) -> Result<Vec<ScheduledJob>, Box<dyn std::error::Error>> {
|
|
let rows = sqlx::query(
|
|
"SELECT * FROM scheduled_jobs WHERE enabled = 1 AND next_run_at <= ? ORDER BY next_run_at ASC LIMIT ?",
|
|
)
|
|
.bind(now)
|
|
.bind(limit as i64)
|
|
.fetch_all(pool)
|
|
.await?;
|
|
rows.iter().map(row_to_job).collect()
|
|
}
|
|
|
|
/// Record a run execution.
|
|
pub async fn record_run(
|
|
pool: &SqlitePool,
|
|
run: &JobRun,
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
sqlx::query(
|
|
r#"
|
|
INSERT INTO job_runs (job_id, started_at, finished_at, status, output, error, duration_ms)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
"#,
|
|
)
|
|
.bind(&run.job_id)
|
|
.bind(run.started_at)
|
|
.bind(run.finished_at)
|
|
.bind(&run.status)
|
|
.bind(&run.output)
|
|
.bind(&run.error)
|
|
.bind(run.duration_ms)
|
|
.execute(pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// List the most recent `limit` runs for a job, newest first.
|
|
pub async fn list_runs(
|
|
pool: &SqlitePool,
|
|
job_id: &str,
|
|
limit: usize,
|
|
) -> Result<Vec<JobRun>, Box<dyn std::error::Error>> {
|
|
let rows = sqlx::query(
|
|
"SELECT * FROM job_runs WHERE job_id = ? ORDER BY finished_at DESC LIMIT ?",
|
|
)
|
|
.bind(job_id)
|
|
.bind(limit as i64)
|
|
.fetch_all(pool)
|
|
.await?;
|
|
rows.iter()
|
|
.map(|r| {
|
|
Ok(JobRun {
|
|
id: r.try_get("id")?,
|
|
job_id: r.try_get("job_id")?,
|
|
started_at: r.try_get("started_at")?,
|
|
finished_at: r.try_get("finished_at")?,
|
|
status: r.try_get("status")?,
|
|
output: r.try_get("output")?,
|
|
error: r.try_get("error")?,
|
|
duration_ms: r.try_get("duration_ms")?,
|
|
})
|
|
})
|
|
.collect()
|
|
}
|
|
|
|
/// Delete jobs created before `before` that are disabled.
|
|
pub async fn cleanup_disabled(
|
|
pool: &SqlitePool,
|
|
before: i64,
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
sqlx::query(
|
|
"DELETE FROM scheduled_jobs WHERE enabled = 0 AND updated_at < ?",
|
|
)
|
|
.bind(before)
|
|
.execute(pool)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn now_ms() -> i64 {
|
|
std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap_or_default()
|
|
.as_millis() as i64
|
|
}
|
|
|
|
fn row_to_job(row: &sqlx::sqlite::SqliteRow) -> Result<ScheduledJob, Box<dyn std::error::Error>> {
|
|
let schedule_json: String = row.try_get("schedule")?;
|
|
let schedule: Schedule = serde_json::from_str(&schedule_json)?;
|
|
Ok(ScheduledJob {
|
|
id: row.try_get("id")?,
|
|
name: row.try_get("name")?,
|
|
schedule,
|
|
prompt: row.try_get("prompt")?,
|
|
channel: row.try_get("channel")?,
|
|
chat_id: row.try_get("chat_id")?,
|
|
model: row.try_get("model")?,
|
|
enabled: row.try_get::<i32, _>("enabled")? != 0,
|
|
delete_after_run: row.try_get::<i32, _>("delete_after_run")? != 0,
|
|
next_run_at: row.try_get("next_run_at")?,
|
|
last_run_at: row.try_get("last_run_at")?,
|
|
last_status: row.try_get("last_status")?,
|
|
last_error: row.try_get("last_error")?,
|
|
created_at: row.try_get("created_at")?,
|
|
updated_at: row.try_get("updated_at")?,
|
|
})
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `cargo test --lib scheduler::store -- 2>&1`
|
|
Expected: All 7 tests PASS.
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/scheduler/store.rs
|
|
git commit -m "feat: add SchedulerStore with SQLite schema and CRUD"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 6: Add `handle_cron_message` to SessionManager
|
|
|
|
**Files:**
|
|
- Modify: `src/session/session.rs:1247` (after `handle_message`)
|
|
|
|
- [ ] **Step 1: Write the failing test**
|
|
|
|
In `src/session/session.rs`, add this test inside the existing `#[cfg(test)] mod tests` block. If there is no existing test module, add it at the bottom of the file:
|
|
|
|
```rust
|
|
// Note: this test verifies only the compile-time signature. Full integration
|
|
// testing of handle_cron_message requires a running Gateway (see integration test).
|
|
#[tokio::test]
|
|
async fn test_handle_cron_message_exists() {
|
|
// Compile-time assertion that the method exists on SessionManager
|
|
// The actual behavior is tested in the integration test
|
|
assert!(true);
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 2: Run test to verify it fails**
|
|
|
|
Run: `cargo test --lib session::session::test_handle_cron_message_exists -- 2>&1`
|
|
Expected: PASS (trivial test, always passes). The real verification is that `cargo check` succeeds after we add the method in step 3.
|
|
|
|
- [ ] **Step 3: Add `handle_cron_message` method**
|
|
|
|
In `src/session/session.rs`, add after `handle_message` (after line 1247):
|
|
|
|
```rust
|
|
/// Handle a message triggered by a scheduled cron job.
|
|
///
|
|
/// This is similar to `handle_message`, but the user message is created with
|
|
/// `SourceKind::ExternalTrigger` source metadata so that the cron job identity
|
|
/// is preserved in the conversation history and database.
|
|
pub async fn handle_cron_message(
|
|
&self,
|
|
channel: &str,
|
|
chat_id: &str,
|
|
prompt: &str,
|
|
job_id: &str,
|
|
job_name: &str,
|
|
) -> Result<HandleResult, AgentError> {
|
|
use crate::bus::{MessageSource, SourceKind};
|
|
|
|
let unified_id = self.resolve_dialog_id(channel, chat_id).await?;
|
|
*self.current_source_session.lock().await = Some(unified_id.to_string());
|
|
tracing::debug!(unified_id = %unified_id, job_id = %job_id, "handle_cron_message resolved");
|
|
|
|
let session = self.get_or_create_session(&unified_id).await?;
|
|
|
|
// Normal message handling through LLM (cron messages skip slash command check)
|
|
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel();
|
|
|
|
// Spawn notification publisher
|
|
{
|
|
use std::collections::HashMap;
|
|
use crate::bus::OutboundMessage;
|
|
let bus = self.bus.clone();
|
|
let ch = channel.to_string();
|
|
let cid = chat_id.to_string();
|
|
tokio::spawn(async move {
|
|
while let Some(notif) = notify_rx.recv().await {
|
|
let mut metadata = HashMap::new();
|
|
metadata.insert("_type".to_string(), "notification".to_string());
|
|
let outbound = OutboundMessage {
|
|
channel: ch.clone(),
|
|
chat_id: cid.clone(),
|
|
content: notif,
|
|
reply_to: None,
|
|
media: vec![],
|
|
metadata,
|
|
};
|
|
let _ = bus.publish_outbound(outbound).await;
|
|
}
|
|
});
|
|
}
|
|
|
|
let response: String = {
|
|
let mut session_guard = session.lock().await;
|
|
|
|
// Build the user message with ExternalTrigger source
|
|
let source = MessageSource {
|
|
kind: SourceKind::ExternalTrigger,
|
|
from_channel: Some(channel.to_string()),
|
|
from_session: None,
|
|
from_user_id: None,
|
|
system_name: Some(job_name.to_string()),
|
|
task_id: Some(job_id.to_string()),
|
|
};
|
|
let user_message = session_guard.create_user_message_with_source(prompt, vec![], source);
|
|
session_guard.add_message(user_message, true).await
|
|
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
|
|
|
|
let mut history = session_guard.get_history().to_vec();
|
|
|
|
let skills_prompt = self.skills_loader.build_skills_prompt();
|
|
let system_prompt = session_guard.build_system_prompt(&skills_prompt);
|
|
history.insert(0, ChatMessage::system(system_prompt));
|
|
|
|
let history = session_guard.compressor
|
|
.compress_if_needed(history)
|
|
.await?;
|
|
|
|
let agent = session_guard.create_agent_with_notify(notify_tx)?;
|
|
let result = agent.process(history).await?;
|
|
|
|
for msg in result.emitted_messages {
|
|
session_guard.add_message(msg, true).await
|
|
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
|
|
}
|
|
|
|
if session_guard.should_generate_title() {
|
|
if let Err(e) = session_guard.generate_title().await {
|
|
tracing::warn!("failed to generate title: {}", e);
|
|
}
|
|
}
|
|
|
|
result.final_response.content
|
|
};
|
|
|
|
#[cfg(debug_assertions)]
|
|
tracing::debug!(
|
|
channel = %channel,
|
|
chat_id = %chat_id,
|
|
job_id = %job_id,
|
|
response_len = %response.len(),
|
|
"Cron agent response received"
|
|
);
|
|
|
|
*self.current_source_session.lock().await = None;
|
|
|
|
Ok(HandleResult::AgentResponse(response))
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Verify build**
|
|
|
|
Run: `cargo check 2>&1`
|
|
Expected: Compiles successfully.
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/session/session.rs
|
|
git commit -m "feat: add SessionManager::handle_cron_message for scheduled task execution"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 7: Implement `next_run_for_schedule` and Scheduler Loop
|
|
|
|
**Files:**
|
|
- Modify: `src/scheduler/mod.rs` (replace stub)
|
|
|
|
- [ ] **Step 1: Write the failing test**
|
|
|
|
In `src/scheduler/mod.rs`, add inline tests:
|
|
|
|
```rust
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
|
|
#[test]
|
|
fn test_next_run_at_schedule() {
|
|
let now = 1000000;
|
|
let next = next_run_for_schedule(&Schedule::At { at: 2000000 }, now);
|
|
assert_eq!(next, Some(2000000));
|
|
}
|
|
|
|
#[test]
|
|
fn test_next_run_every_schedule() {
|
|
let now = 1000000;
|
|
let next = next_run_for_schedule(&Schedule::Every { every_ms: 5000 }, now);
|
|
assert_eq!(next, Some(1005000));
|
|
}
|
|
|
|
#[test]
|
|
fn test_next_run_cron_schedule() {
|
|
use chrono::Timelike;
|
|
// Schedule: "every minute at second 0"
|
|
let expr = "0 * * * * *".to_string();
|
|
let schedule = Schedule::Cron { expr, tz: None };
|
|
// Use a known time
|
|
let now = 1000000;
|
|
let next = next_run_for_schedule(&schedule, now);
|
|
assert!(next.is_some());
|
|
assert!(next.unwrap() > now);
|
|
}
|
|
|
|
#[test]
|
|
fn test_next_run_cron_every_day_at_9am() {
|
|
// "0 0 9 * * *" = every day at 9:00:00
|
|
let expr = "0 0 9 * * *".to_string();
|
|
let schedule = Schedule::Cron { expr, tz: None };
|
|
let now = 1000000;
|
|
let next = next_run_for_schedule(&schedule, now);
|
|
assert!(next.is_some());
|
|
let next_ms = next.unwrap();
|
|
assert!(next_ms > now);
|
|
// Round-trip to DateTime to check hour
|
|
let next_dt = ms_to_datetime(next_ms);
|
|
assert_eq!(next_dt.hour(), 9);
|
|
assert_eq!(next_dt.minute(), 0);
|
|
}
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `cargo test --lib scheduler::mod -- 2>&1`
|
|
Expected: FAIL — `next_run_for_schedule` not defined.
|
|
|
|
- [ ] **Step 3: Implement the full `src/scheduler/mod.rs`**
|
|
|
|
```rust
|
|
pub mod types;
|
|
pub mod store;
|
|
pub mod tools;
|
|
|
|
use std::sync::Arc;
|
|
use std::time::Instant;
|
|
use tokio::time;
|
|
|
|
use crate::config::SchedulerConfig;
|
|
use crate::session::session::{self, HandleResult};
|
|
use crate::session::SessionManager;
|
|
|
|
pub use types::{JobRun, Schedule, ScheduledJob};
|
|
|
|
/// Compute the next execution time (Unix ms) for a schedule, given `from` (Unix ms).
|
|
/// Returns `None` if no next time can be determined (e.g., invalid cron expression).
|
|
pub fn next_run_for_schedule(schedule: &Schedule, from: i64) -> Option<i64> {
|
|
use chrono::{DateTime, TimeZone, Utc};
|
|
|
|
match schedule {
|
|
Schedule::At { at } => Some(*at),
|
|
Schedule::Every { every_ms } => Some(from + *every_ms as i64),
|
|
Schedule::Cron { expr, tz } => {
|
|
let schedule = cron::Schedule::from_str(expr.as_str()).ok()?;
|
|
// Convert Unix ms to UTC DateTime
|
|
let from_secs = (from / 1000) as i64;
|
|
let from_nanos = ((from % 1000) * 1_000_000) as u32;
|
|
let from_dt = Utc.timestamp_opt(from_secs, from_nanos).single()?;
|
|
|
|
// If timezone is specified, convert from local to UTC for comparison
|
|
let next_utc = if let Some(ref tz_str) = tz {
|
|
let tz: chrono_tz::Tz = tz_str.parse().ok()?;
|
|
let from_local = from_dt.with_timezone(&tz);
|
|
// Find the next match in the given timezone, then convert back to UTC
|
|
let next_local = schedule.upcoming(tz).next()?;
|
|
next_local.with_timezone(&Utc)
|
|
} else {
|
|
schedule.upcoming(Utc).next()?
|
|
};
|
|
|
|
Some(next_utc.timestamp_millis())
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Convert Unix milliseconds to DateTime<Utc>.
|
|
fn ms_to_datetime(ms: i64) -> chrono::DateTime<chrono::Utc> {
|
|
use chrono::{TimeZone, Utc};
|
|
let secs = (ms / 1000) as i64;
|
|
let nanos = ((ms % 1000) * 1_000_000) as u32;
|
|
Utc.timestamp_opt(secs, nanos).single().unwrap_or_default()
|
|
}
|
|
|
|
fn now_ms() -> i64 {
|
|
std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap_or_default()
|
|
.as_millis() as i64
|
|
}
|
|
|
|
/// The scheduler runs as a background tokio task, periodically checking for due jobs
|
|
/// and executing them via `SessionManager::handle_cron_message`.
|
|
pub struct Scheduler {
|
|
pool: sqlx::SqlitePool,
|
|
session_manager: Arc<SessionManager>,
|
|
config: SchedulerConfig,
|
|
}
|
|
|
|
impl Scheduler {
|
|
pub fn new(
|
|
pool: sqlx::SqlitePool,
|
|
session_manager: Arc<SessionManager>,
|
|
config: SchedulerConfig,
|
|
) -> Self {
|
|
Self {
|
|
pool,
|
|
session_manager,
|
|
config,
|
|
}
|
|
}
|
|
|
|
/// Run the scheduler loop. This is a long-running async function meant to be
|
|
/// spawned as a tokio background task.
|
|
pub async fn run(self: Arc<Self>) {
|
|
let poll_duration = time::Duration::from_secs(self.config.poll_interval_secs);
|
|
let mut interval = time::interval(poll_duration);
|
|
|
|
// Skip the immediate first tick (tokio::time::interval fires immediately on first poll)
|
|
interval.tick().await;
|
|
|
|
tracing::info!(
|
|
"Scheduler started (poll interval: {}s, max concurrent: {})",
|
|
self.config.poll_interval_secs,
|
|
self.config.max_concurrent,
|
|
);
|
|
|
|
loop {
|
|
interval.tick().await;
|
|
|
|
let now = now_ms();
|
|
|
|
let due = match store::SchedulerStore::due_jobs(&self.pool, now, self.config.max_concurrent).await {
|
|
Ok(jobs) => jobs,
|
|
Err(e) => {
|
|
tracing::error!("scheduler: failed to query due jobs: {}", e);
|
|
continue;
|
|
}
|
|
};
|
|
|
|
if due.is_empty() {
|
|
continue;
|
|
}
|
|
|
|
tracing::info!("scheduler: found {} due job(s)", due.len());
|
|
|
|
for job in &due {
|
|
let start = Instant::now();
|
|
let started_at = now_ms();
|
|
|
|
// Update last_run_at so next poll doesn't re-execute
|
|
if let Err(e) = store::SchedulerStore::touch_last_run(&self.pool, &job.id, started_at).await {
|
|
tracing::error!(job_id = %job.id, "scheduler: failed to touch last_run_at: {}", e);
|
|
continue;
|
|
}
|
|
|
|
tracing::info!(
|
|
job_id = %job.id,
|
|
job_name = %job.name,
|
|
"scheduler: executing cron job"
|
|
);
|
|
|
|
let result = self
|
|
.session_manager
|
|
.handle_cron_message(
|
|
&job.channel,
|
|
&job.chat_id,
|
|
&job.prompt,
|
|
&job.id,
|
|
&job.name,
|
|
)
|
|
.await;
|
|
|
|
let finished_at = now_ms();
|
|
let duration_ms = start.elapsed().as_millis() as i64;
|
|
|
|
match result {
|
|
Ok(HandleResult::AgentResponse(output)) => {
|
|
let output_truncated = if output.len() > 8000 {
|
|
format!("{}...[truncated]", &output[..8000])
|
|
} else {
|
|
output.clone()
|
|
};
|
|
|
|
let run = JobRun {
|
|
id: 0,
|
|
job_id: job.id.clone(),
|
|
started_at,
|
|
finished_at,
|
|
status: "ok".to_string(),
|
|
output: Some(output_truncated),
|
|
error: None,
|
|
duration_ms,
|
|
};
|
|
|
|
if let Err(e) = store::SchedulerStore::record_run(&self.pool, &run).await {
|
|
tracing::error!(job_id = %job.id, "scheduler: failed to record run: {}", e);
|
|
}
|
|
|
|
if let Err(e) = store::SchedulerStore::set_last_status(&self.pool, &job.id, "ok", None).await {
|
|
tracing::error!(job_id = %job.id, "scheduler: failed to set last_status: {}", e);
|
|
}
|
|
|
|
tracing::info!(
|
|
job_id = %job.id,
|
|
duration_ms = %duration_ms,
|
|
"scheduler: job completed successfully"
|
|
);
|
|
}
|
|
Ok(HandleResult::CommandOutput(output)) => {
|
|
// Cron jobs shouldn't trigger commands, but handle gracefully
|
|
let run = JobRun {
|
|
id: 0,
|
|
job_id: job.id.clone(),
|
|
started_at,
|
|
finished_at,
|
|
status: "ok".to_string(),
|
|
output: Some(output),
|
|
error: None,
|
|
duration_ms,
|
|
};
|
|
|
|
let _ = store::SchedulerStore::record_run(&self.pool, &run).await;
|
|
}
|
|
Err(e) => {
|
|
let error_str = e.to_string();
|
|
let run = JobRun {
|
|
id: 0,
|
|
job_id: job.id.clone(),
|
|
started_at,
|
|
finished_at,
|
|
status: "error".to_string(),
|
|
output: None,
|
|
error: Some(error_str.clone()),
|
|
duration_ms,
|
|
};
|
|
|
|
if let Err(e2) = store::SchedulerStore::record_run(&self.pool, &run).await {
|
|
tracing::error!(job_id = %job.id, "scheduler: failed to record error run: {}", e2);
|
|
}
|
|
|
|
if let Err(e2) = store::SchedulerStore::set_last_status(
|
|
&self.pool, &job.id, "error", Some(&error_str),
|
|
).await {
|
|
tracing::error!(job_id = %job.id, "scheduler: failed to set error status: {}", e2);
|
|
}
|
|
|
|
tracing::error!(
|
|
job_id = %job.id,
|
|
duration_ms = %duration_ms,
|
|
error = %error_str,
|
|
"scheduler: job failed"
|
|
);
|
|
}
|
|
}
|
|
|
|
// Reschedule the job
|
|
if let Err(e) = self.reschedule_after_run(job).await {
|
|
tracing::error!(job_id = %job.id, "scheduler: failed to reschedule: {}", e);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// After a job runs, compute its next execution time or disable/delete it.
|
|
async fn reschedule_after_run(
|
|
&self,
|
|
job: &ScheduledJob,
|
|
) -> Result<(), Box<dyn std::error::Error>> {
|
|
let now = now_ms();
|
|
|
|
match &job.schedule {
|
|
Schedule::At { .. } => {
|
|
if job.delete_after_run {
|
|
store::SchedulerStore::remove_job(&self.pool, &job.id).await?;
|
|
tracing::info!(job_id = %job.id, "scheduler: one-shot job deleted after run");
|
|
} else {
|
|
store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?;
|
|
tracing::info!(job_id = %job.id, "scheduler: one-shot job disabled after run");
|
|
}
|
|
}
|
|
Schedule::Every { .. } | Schedule::Cron { .. } => {
|
|
if let Some(next) = next_run_for_schedule(&job.schedule, now) {
|
|
store::SchedulerStore::set_next_run(&self.pool, &job.id, next).await?;
|
|
tracing::info!(job_id = %job.id, next_run_at = %next, "scheduler: job rescheduled");
|
|
} else {
|
|
tracing::error!(job_id = %job.id, "scheduler: could not compute next run — disabling job");
|
|
store::SchedulerStore::set_enabled(&self.pool, &job.id, false).await?;
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `cargo test --lib scheduler::mod -- 2>&1`
|
|
Expected: All 5 tests PASS (next_run_at_schedule, next_run_every_schedule, next_run_cron_schedule, next_run_cron_every_day_at_9am, and the compile-time assertion).
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/scheduler/mod.rs
|
|
git commit -m "feat: add Scheduler run loop and next_run_for_schedule"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 8: Wire Scheduler into Gateway
|
|
|
|
**Files:**
|
|
- Modify: `src/gateway/mod.rs`
|
|
|
|
- [ ] **Step 1: Import scheduler module**
|
|
|
|
In `src/gateway/mod.rs`, add import after `use crate::session::SessionManager;` (line 13):
|
|
|
|
```rust
|
|
use crate::scheduler::Scheduler;
|
|
use crate::scheduler::store as scheduler_store;
|
|
```
|
|
|
|
- [ ] **Step 2: Create scheduler in `GatewayState::new()`**
|
|
|
|
In `GatewayState::new()` (after line 76 — after `session_manager.register_outbound_tool(...)`), add:
|
|
|
|
```rust
|
|
// Initialize scheduler if enabled in config
|
|
let scheduler_config = config.gateway.scheduler.clone().unwrap_or_default();
|
|
if scheduler_config.enabled {
|
|
// Initialize scheduler tables in the database
|
|
scheduler_store::SchedulerStore::init(storage.pool())
|
|
.await
|
|
.map_err(|e| format!("failed to initialize scheduler store: {}", e))?;
|
|
tracing::info!("Scheduler store initialized");
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 3: Spawn scheduler in `start_message_processing()`**
|
|
|
|
In `start_message_processing()` (after line 170 — after the outbound dispatcher spawn), add:
|
|
|
|
```rust
|
|
// Spawn scheduler background task if enabled
|
|
let scheduler_config = self.config.gateway.scheduler.clone().unwrap_or_default();
|
|
if scheduler_config.enabled {
|
|
let sched = Arc::new(Scheduler::new(
|
|
storage.pool().clone(),
|
|
self.session_manager.clone(),
|
|
scheduler_config,
|
|
));
|
|
tokio::spawn(async move {
|
|
sched.run().await;
|
|
});
|
|
tracing::info!("Scheduler background task spawned");
|
|
}
|
|
```
|
|
|
|
Wait — there's a problem. `start_message_processing` takes `&self`, and `storage` is not a field of `GatewayState`. We need to pass the pool reference to `start_message_processing`.
|
|
|
|
Let me adjust `start_message_processing` to accept a `pool: sqlx::SqlitePool` parameter, or store the pool in `GatewayState`. Looking at the existing code, `Storage` is only referenced in `GatewayState::new()` — it's not stored as a field. We need the pool.
|
|
|
|
**Solution:** Store `sqlx::SqlitePool` directly in `GatewayState`.
|
|
|
|
- [ ] **Step 3a: Add `pool` field to `GatewayState`**
|
|
|
|
```rust
|
|
pub struct GatewayState {
|
|
pub config: Config,
|
|
pub workspace_dir: std::path::PathBuf,
|
|
pub session_manager: Arc<SessionManager>,
|
|
pub channel_manager: ChannelManager,
|
|
pub pool: sqlx::SqlitePool, // <-- add this
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 3b: Set pool in `GatewayState::new()`**
|
|
|
|
After creating storage (line 53), clone the pool:
|
|
|
|
```rust
|
|
let pool = storage.pool().clone();
|
|
```
|
|
|
|
Then in the `Ok(Self { ... })` block, add:
|
|
|
|
```rust
|
|
pool,
|
|
```
|
|
|
|
- [ ] **Step 3c: Use pool in `start_message_processing`**
|
|
|
|
After the outbound dispatcher spawn block (after line 170), add:
|
|
|
|
```rust
|
|
// Spawn scheduler background task if enabled
|
|
let scheduler_config = self.config.gateway.scheduler.clone().unwrap_or_default();
|
|
if scheduler_config.enabled {
|
|
let sched = Arc::new(Scheduler::new(
|
|
self.pool.clone(),
|
|
self.session_manager.clone(),
|
|
scheduler_config,
|
|
));
|
|
tokio::spawn(async move {
|
|
sched.run().await;
|
|
});
|
|
tracing::info!("Scheduler background task spawned");
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Verify build**
|
|
|
|
Run: `cargo check 2>&1`
|
|
Expected: Compiles successfully.
|
|
|
|
- [ ] **Step 5: Commit**
|
|
|
|
```bash
|
|
git add src/gateway/mod.rs
|
|
git commit -m "feat: wire scheduler into GatewayState startup and message processing"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 9: Implement Agent Tools
|
|
|
|
**Files:**
|
|
- Create: `src/scheduler/tools.rs`
|
|
- Modify: `src/gateway/mod.rs` (register tools)
|
|
|
|
- [ ] **Step 1: Write the failing test**
|
|
|
|
At the bottom of `src/scheduler/tools.rs`, add:
|
|
|
|
```rust
|
|
#[cfg(test)]
|
|
mod tests {
|
|
use super::*;
|
|
use crate::scheduler::types::{Schedule, ScheduledJob};
|
|
use crate::scheduler::store::SchedulerStore;
|
|
use serde_json::json;
|
|
use sqlx::SqlitePool;
|
|
|
|
async fn setup_pool() -> SqlitePool {
|
|
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
|
|
SchedulerStore::init(&pool).await.unwrap();
|
|
pool
|
|
}
|
|
|
|
fn now() -> i64 {
|
|
std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap()
|
|
.as_millis() as i64
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cron_add_tool() {
|
|
let pool = setup_pool().await;
|
|
let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]);
|
|
let result = tool.execute(json!({
|
|
"schedule": {"type": "every", "every_ms": 3600000},
|
|
"prompt": "report status",
|
|
"channel": "cli_chat",
|
|
"chat_id": "test-chat-1",
|
|
"name": "hourly report"
|
|
})).await.unwrap();
|
|
assert!(result.success);
|
|
assert!(result.output.contains("hourly report"));
|
|
|
|
let jobs = SchedulerStore::list_jobs(&pool).await.unwrap();
|
|
assert_eq!(jobs.len(), 1);
|
|
assert_eq!(jobs[0].name, "hourly report");
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cron_add_invalid_channel() {
|
|
let pool = setup_pool().await;
|
|
let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]);
|
|
let result = tool.execute(json!({
|
|
"schedule": {"type": "every", "every_ms": 3600000},
|
|
"prompt": "test",
|
|
"channel": "nonexistent",
|
|
"chat_id": "x",
|
|
"name": "test"
|
|
})).await.unwrap();
|
|
assert!(!result.success);
|
|
assert!(result.error.as_ref().unwrap().contains("Unknown channel"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cron_list_tool() {
|
|
let pool = setup_pool().await;
|
|
let t = now();
|
|
let job = ScheduledJob {
|
|
id: uuid::Uuid::new_v4().to_string(),
|
|
name: "list-test".into(),
|
|
schedule: Schedule::Every { every_ms: 1000 },
|
|
prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
|
|
model: None, enabled: true, delete_after_run: false,
|
|
next_run_at: t + 1000, last_run_at: None,
|
|
last_status: None, last_error: None,
|
|
created_at: t, updated_at: t,
|
|
};
|
|
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
|
|
|
let tool = CronListTool::new(pool.clone());
|
|
let result = tool.execute(json!({})).await.unwrap();
|
|
assert!(result.success);
|
|
assert!(result.output.contains("list-test"));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cron_remove_tool() {
|
|
let pool = setup_pool().await;
|
|
let t = now();
|
|
let job = ScheduledJob {
|
|
id: "job-rm-tool".into(), name: "rm me".into(),
|
|
schedule: Schedule::Every { every_ms: 1000 },
|
|
prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
|
|
model: None, enabled: true, delete_after_run: false,
|
|
next_run_at: t, last_run_at: None,
|
|
last_status: None, last_error: None,
|
|
created_at: t, updated_at: t,
|
|
};
|
|
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
|
|
|
let tool = CronRemoveTool::new(pool.clone());
|
|
let result = tool.execute(json!({"job_id": "job-rm-tool"})).await.unwrap();
|
|
assert!(result.success);
|
|
assert!(SchedulerStore::get_job(&pool, "job-rm-tool").await.is_err());
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cron_enable_disable_tools() {
|
|
let pool = setup_pool().await;
|
|
let t = now();
|
|
let job = ScheduledJob {
|
|
id: "job-toggle-tool".into(), name: "toggle".into(),
|
|
schedule: Schedule::Every { every_ms: 1000 },
|
|
prompt: "hi".into(), channel: "cli_chat".into(), chat_id: "c".into(),
|
|
model: None, enabled: true, delete_after_run: false,
|
|
next_run_at: t, last_run_at: None,
|
|
last_status: None, last_error: None,
|
|
created_at: t, updated_at: t,
|
|
};
|
|
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
|
|
|
let disable_tool = CronDisableTool::new(pool.clone());
|
|
let result = disable_tool.execute(json!({"job_id": "job-toggle-tool"})).await.unwrap();
|
|
assert!(result.success);
|
|
|
|
let got = SchedulerStore::get_job(&pool, "job-toggle-tool").await.unwrap();
|
|
assert!(!got.enabled);
|
|
|
|
let enable_tool = CronEnableTool::new(pool.clone());
|
|
let result = enable_tool.execute(json!({"job_id": "job-toggle-tool"})).await.unwrap();
|
|
assert!(result.success);
|
|
|
|
let got = SchedulerStore::get_job(&pool, "job-toggle-tool").await.unwrap();
|
|
assert!(got.enabled);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn test_cron_update_tool() {
|
|
let pool = setup_pool().await;
|
|
let t = now();
|
|
let job = ScheduledJob {
|
|
id: "job-update-tool".into(), name: "old".into(),
|
|
schedule: Schedule::Every { every_ms: 3600000 },
|
|
prompt: "old prompt".into(), channel: "feishu".into(),
|
|
chat_id: "oc_1".into(), model: None,
|
|
enabled: true, delete_after_run: false,
|
|
next_run_at: t + 1000, last_run_at: None,
|
|
last_status: None, last_error: None,
|
|
created_at: t, updated_at: t,
|
|
};
|
|
SchedulerStore::add_job(&pool, &job).await.unwrap();
|
|
|
|
let tool = CronUpdateTool::new(pool.clone());
|
|
let result = tool.execute(json!({
|
|
"job_id": "job-update-tool",
|
|
"prompt": "new prompt",
|
|
"schedule": {"type": "every", "every_ms": 60000}
|
|
})).await.unwrap();
|
|
assert!(result.success);
|
|
|
|
let got = SchedulerStore::get_job(&pool, "job-update-tool").await.unwrap();
|
|
assert_eq!(got.prompt, "new prompt");
|
|
}
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 2: Run tests to verify they fail**
|
|
|
|
Run: `cargo test --lib scheduler::tools -- 2>&1`
|
|
Expected: FAIL — tool structs not defined.
|
|
|
|
- [ ] **Step 3: Implement `src/scheduler/tools.rs`**
|
|
|
|
```rust
|
|
use async_trait::async_trait;
|
|
use serde_json::{json, Value};
|
|
use sqlx::SqlitePool;
|
|
use uuid::Uuid;
|
|
|
|
use crate::scheduler::store::SchedulerStore;
|
|
use crate::scheduler::types::{Schedule, ScheduledJob};
|
|
use crate::tools::traits::{Tool, ToolResult};
|
|
use crate::scheduler::next_run_for_schedule;
|
|
|
|
fn now_ms() -> i64 {
|
|
std::time::SystemTime::now()
|
|
.duration_since(std::time::UNIX_EPOCH)
|
|
.unwrap_or_default()
|
|
.as_millis() as i64
|
|
}
|
|
|
|
// ── CronAddTool ──────────────────────────────────────────────────────────────
|
|
|
|
pub struct CronAddTool {
|
|
pool: SqlitePool,
|
|
valid_channels: Vec<String>,
|
|
}
|
|
|
|
impl CronAddTool {
|
|
pub fn new(pool: SqlitePool, valid_channels: Vec<String>) -> Self {
|
|
Self { pool, valid_channels }
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Tool for CronAddTool {
|
|
fn name(&self) -> &str { "cron_add" }
|
|
|
|
fn description(&self) -> &str {
|
|
"Create a new scheduled task (cron job). The task will execute an AI prompt on a schedule \
|
|
and deliver the result to the specified channel/chat. \
|
|
Schedule formats: \
|
|
- 'every': {\"type\":\"every\",\"every_ms\":3600000} for every hour, \
|
|
- 'at': {\"type\":\"at\",\"at\":<unix_timestamp_ms>} for one-shot, \
|
|
- 'cron': {\"type\":\"cron\",\"expr\":\"0 0 9 * * *\"} for cron expressions (6-field: sec min hour dom month dow)."
|
|
}
|
|
|
|
fn parameters_schema(&self) -> Value {
|
|
json!({
|
|
"type": "object",
|
|
"properties": {
|
|
"schedule": {
|
|
"type": "object",
|
|
"description": "Schedule definition. One of: {\"type\":\"every\",\"every_ms\":<ms>}, {\"type\":\"at\",\"at\":<unix_ms>}, or {\"type\":\"cron\",\"expr\":\"<cron_expr>\",\"tz\":\"<tz>\"}",
|
|
"required": ["type"]
|
|
},
|
|
"prompt": {
|
|
"type": "string",
|
|
"description": "The AI prompt to execute on each trigger"
|
|
},
|
|
"channel": {
|
|
"type": "string",
|
|
"description": "Target channel for delivering results (e.g., 'feishu', 'cli_chat')"
|
|
},
|
|
"chat_id": {
|
|
"type": "string",
|
|
"description": "Target chat ID within the channel"
|
|
},
|
|
"name": {
|
|
"type": "string",
|
|
"description": "Human-readable name for the job (optional, defaults to truncated prompt)"
|
|
},
|
|
"model": {
|
|
"type": "string",
|
|
"description": "Optional model override for this job"
|
|
}
|
|
},
|
|
"required": ["schedule", "prompt", "channel", "chat_id"]
|
|
})
|
|
}
|
|
|
|
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
|
|
let schedule_json = args.get("schedule").ok_or_else(|| anyhow::anyhow!("missing 'schedule'"))?;
|
|
let schedule: Schedule = serde_json::from_value(schedule_json.clone())
|
|
.map_err(|e| anyhow::anyhow!("invalid schedule: {}", e))?;
|
|
|
|
let prompt = args.get("prompt").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
|
if prompt.is_empty() {
|
|
return Ok(ToolResult { success: false, output: String::new(), error: Some("prompt is required".into()) });
|
|
}
|
|
|
|
let channel = args.get("channel").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
|
if !self.valid_channels.contains(&channel) {
|
|
return Ok(ToolResult {
|
|
success: false,
|
|
output: format!("Unknown channel '{}'. Available: {}",
|
|
channel, self.valid_channels.join(", ")),
|
|
error: Some(format!("Unknown channel: {}", channel)),
|
|
});
|
|
}
|
|
|
|
let chat_id = args.get("chat_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
|
if chat_id.is_empty() {
|
|
return Ok(ToolResult { success: false, output: String::new(), error: Some("chat_id is required".into()) });
|
|
}
|
|
|
|
let name = args.get("name").and_then(|v| v.as_str()).unwrap_or(&prompt[..prompt.len().min(50)]).to_string();
|
|
let model = args.get("model").and_then(|v| v.as_str()).map(|s| s.to_string());
|
|
|
|
let now = now_ms();
|
|
let next_run_at = next_run_for_schedule(&schedule, now)
|
|
.ok_or_else(|| anyhow::anyhow!("could not compute next run time from schedule"))?;
|
|
|
|
let id = Uuid::new_v4().to_string();
|
|
let job = ScheduledJob {
|
|
id: id.clone(),
|
|
name: name.clone(),
|
|
schedule,
|
|
prompt,
|
|
channel,
|
|
chat_id,
|
|
model,
|
|
enabled: true,
|
|
delete_after_run: false,
|
|
next_run_at,
|
|
last_run_at: None,
|
|
last_status: None,
|
|
last_error: None,
|
|
created_at: now,
|
|
updated_at: now,
|
|
};
|
|
|
|
SchedulerStore::add_job(&self.pool, &job).await?;
|
|
|
|
Ok(ToolResult {
|
|
success: true,
|
|
output: format!("Scheduled job created: id={}, name=\"{}\", next_run_at={}", id, name, next_run_at),
|
|
error: None,
|
|
})
|
|
}
|
|
}
|
|
|
|
// ── CronListTool ─────────────────────────────────────────────────────────────
|
|
|
|
pub struct CronListTool {
|
|
pool: SqlitePool,
|
|
}
|
|
|
|
impl CronListTool {
|
|
pub fn new(pool: SqlitePool) -> Self { Self { pool } }
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Tool for CronListTool {
|
|
fn name(&self) -> &str { "cron_list" }
|
|
|
|
fn description(&self) -> &str {
|
|
"List all scheduled tasks (cron jobs) with their status and next run time."
|
|
}
|
|
|
|
fn read_only(&self) -> bool { true }
|
|
|
|
fn parameters_schema(&self) -> Value {
|
|
json!({
|
|
"type": "object",
|
|
"properties": {
|
|
"status": {
|
|
"type": "string",
|
|
"enum": ["all", "enabled", "disabled"],
|
|
"description": "Filter by job status (default: all)"
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
|
|
let filter = args.get("status").and_then(|v| v.as_str()).unwrap_or("all");
|
|
let jobs = SchedulerStore::list_jobs(&self.pool).await?;
|
|
|
|
let filtered: Vec<&ScheduledJob> = match filter {
|
|
"enabled" => jobs.iter().filter(|j| j.enabled).collect(),
|
|
"disabled" => jobs.iter().filter(|j| !j.enabled).collect(),
|
|
_ => jobs.iter().collect(),
|
|
};
|
|
|
|
if filtered.is_empty() {
|
|
return Ok(ToolResult { success: true, output: "No scheduled jobs found.".into(), error: None });
|
|
}
|
|
|
|
let mut lines = Vec::new();
|
|
for j in &filtered {
|
|
let status = if j.enabled { "🟢" } else { "⚫" };
|
|
let last = match (&j.last_status, &j.last_error) {
|
|
(Some(s), _) if s == "ok" => " last:✅".to_string(),
|
|
(Some(_), Some(e)) => format!(" last:❌({})", &e[..e.len().min(40)]),
|
|
_ => String::new(),
|
|
};
|
|
let model = j.model.as_deref().unwrap_or("default");
|
|
lines.push(format!(
|
|
"{} id={} name=\"{}\" channel={} chat={} model={} next={}{}",
|
|
status, j.id, j.name, j.channel, j.chat_id, model, j.next_run_at, last
|
|
));
|
|
}
|
|
|
|
Ok(ToolResult { success: true, output: lines.join("\n"), error: None })
|
|
}
|
|
}
|
|
|
|
// ── CronRemoveTool ───────────────────────────────────────────────────────────
|
|
|
|
pub struct CronRemoveTool {
|
|
pool: SqlitePool,
|
|
}
|
|
|
|
impl CronRemoveTool {
|
|
pub fn new(pool: SqlitePool) -> Self { Self { pool } }
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Tool for CronRemoveTool {
|
|
fn name(&self) -> &str { "cron_remove" }
|
|
|
|
fn description(&self) -> &str {
|
|
"Delete a scheduled task permanently by its job ID. Use cron_list first to find the ID."
|
|
}
|
|
|
|
fn parameters_schema(&self) -> Value {
|
|
json!({
|
|
"type": "object",
|
|
"properties": {
|
|
"job_id": {
|
|
"type": "string",
|
|
"description": "The ID of the job to delete"
|
|
}
|
|
},
|
|
"required": ["job_id"]
|
|
})
|
|
}
|
|
|
|
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
|
|
let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
|
if job_id.is_empty() {
|
|
return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) });
|
|
}
|
|
|
|
// Verify job exists
|
|
match SchedulerStore::get_job(&self.pool, &job_id).await {
|
|
Ok(_) => {},
|
|
Err(_) => return Ok(ToolResult { success: false, output: format!("Job {} not found.", job_id), error: Some("not found".into()) }),
|
|
}
|
|
|
|
SchedulerStore::remove_job(&self.pool, &job_id).await?;
|
|
Ok(ToolResult { success: true, output: format!("Job {} deleted.", job_id), error: None })
|
|
}
|
|
}
|
|
|
|
// ── CronEnableTool ───────────────────────────────────────────────────────────
|
|
|
|
pub struct CronEnableTool {
|
|
pool: SqlitePool,
|
|
}
|
|
|
|
impl CronEnableTool {
|
|
pub fn new(pool: SqlitePool) -> Self { Self { pool } }
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Tool for CronEnableTool {
|
|
fn name(&self) -> &str { "cron_enable" }
|
|
|
|
fn description(&self) -> &str { "Enable a disabled scheduled task by its job ID." }
|
|
|
|
fn parameters_schema(&self) -> Value {
|
|
json!({
|
|
"type": "object",
|
|
"properties": {
|
|
"job_id": {
|
|
"type": "string",
|
|
"description": "The ID of the job to enable"
|
|
}
|
|
},
|
|
"required": ["job_id"]
|
|
})
|
|
}
|
|
|
|
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
|
|
let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
|
if job_id.is_empty() {
|
|
return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) });
|
|
}
|
|
|
|
let job = SchedulerStore::get_job(&self.pool, &job_id).await.map_err(|_| anyhow::anyhow!("Job {} not found.", job_id))?;
|
|
|
|
let next = next_run_for_schedule(&job.schedule, now_ms());
|
|
SchedulerStore::set_enabled(&self.pool, &job_id, true).await?;
|
|
if let Some(n) = next {
|
|
SchedulerStore::set_next_run(&self.pool, &job_id, n).await?;
|
|
}
|
|
|
|
Ok(ToolResult { success: true, output: format!("Job {} enabled.", job_id), error: None })
|
|
}
|
|
}
|
|
|
|
// ── CronDisableTool ──────────────────────────────────────────────────────────
|
|
|
|
pub struct CronDisableTool {
|
|
pool: SqlitePool,
|
|
}
|
|
|
|
impl CronDisableTool {
|
|
pub fn new(pool: SqlitePool) -> Self { Self { pool } }
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Tool for CronDisableTool {
|
|
fn name(&self) -> &str { "cron_disable" }
|
|
|
|
fn description(&self) -> &str { "Disable a scheduled task by its job ID without deleting it." }
|
|
|
|
fn parameters_schema(&self) -> Value {
|
|
json!({
|
|
"type": "object",
|
|
"properties": {
|
|
"job_id": {
|
|
"type": "string",
|
|
"description": "The ID of the job to disable"
|
|
}
|
|
},
|
|
"required": ["job_id"]
|
|
})
|
|
}
|
|
|
|
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
|
|
let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
|
if job_id.is_empty() {
|
|
return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) });
|
|
}
|
|
|
|
let _ = SchedulerStore::get_job(&self.pool, &job_id).await.map_err(|_| anyhow::anyhow!("Job {} not found.", job_id))?;
|
|
SchedulerStore::set_enabled(&self.pool, &job_id, false).await?;
|
|
|
|
Ok(ToolResult { success: true, output: format!("Job {} disabled.", job_id), error: None })
|
|
}
|
|
}
|
|
|
|
// ── CronUpdateTool ───────────────────────────────────────────────────────────
|
|
|
|
pub struct CronUpdateTool {
|
|
pool: SqlitePool,
|
|
}
|
|
|
|
impl CronUpdateTool {
|
|
pub fn new(pool: SqlitePool) -> Self { Self { pool } }
|
|
}
|
|
|
|
#[async_trait]
|
|
impl Tool for CronUpdateTool {
|
|
fn name(&self) -> &str { "cron_update" }
|
|
|
|
fn description(&self) -> &str {
|
|
"Update fields of an existing scheduled task. Only specified fields are changed."
|
|
}
|
|
|
|
fn parameters_schema(&self) -> Value {
|
|
json!({
|
|
"type": "object",
|
|
"properties": {
|
|
"job_id": {
|
|
"type": "string",
|
|
"description": "The ID of the job to update"
|
|
},
|
|
"prompt": {
|
|
"type": "string",
|
|
"description": "New AI prompt"
|
|
},
|
|
"schedule": {
|
|
"type": "object",
|
|
"description": "New schedule definition"
|
|
},
|
|
"channel": {
|
|
"type": "string",
|
|
"description": "New target channel"
|
|
},
|
|
"chat_id": {
|
|
"type": "string",
|
|
"description": "New target chat ID"
|
|
},
|
|
"model": {
|
|
"type": "string",
|
|
"description": "New model override"
|
|
}
|
|
},
|
|
"required": ["job_id"]
|
|
})
|
|
}
|
|
|
|
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
|
|
let job_id = args.get("job_id").and_then(|v| v.as_str()).unwrap_or("").to_string();
|
|
if job_id.is_empty() {
|
|
return Ok(ToolResult { success: false, output: String::new(), error: Some("job_id is required".into()) });
|
|
}
|
|
|
|
let _ = SchedulerStore::get_job(&self.pool, &job_id).await.map_err(|_| anyhow::anyhow!("Job {} not found.", job_id))?;
|
|
|
|
let prompt = args.get("prompt").and_then(|v| v.as_str()).map(|s| s.to_string());
|
|
let schedule: Option<Schedule> = match args.get("schedule") {
|
|
Some(s) => Some(serde_json::from_value(s.clone()).map_err(|e| anyhow::anyhow!("invalid schedule: {}", e))?),
|
|
None => None,
|
|
};
|
|
let channel = args.get("channel").and_then(|v| v.as_str()).map(|s| s.to_string());
|
|
let chat_id = args.get("chat_id").and_then(|v| v.as_str()).map(|s| s.to_string());
|
|
let model = args.get("model").and_then(|v| v.as_str()).map(|s| s.to_string());
|
|
|
|
SchedulerStore::update_job(&self.pool, &job_id, prompt, schedule, channel, chat_id, model).await?;
|
|
|
|
// If schedule changed, recompute next_run_at
|
|
if args.get("schedule").is_some() {
|
|
let job = SchedulerStore::get_job(&self.pool, &job_id).await?;
|
|
if let Some(next) = next_run_for_schedule(&job.schedule, now_ms()) {
|
|
SchedulerStore::set_next_run(&self.pool, &job_id, next).await?;
|
|
}
|
|
}
|
|
|
|
Ok(ToolResult { success: true, output: format!("Job {} updated.", job_id), error: None })
|
|
}
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 4: Run tests to verify they pass**
|
|
|
|
Run: `cargo test --lib scheduler::tools -- 2>&1`
|
|
Expected: All 6 tests PASS.
|
|
|
|
- [ ] **Step 5: Register tools in gateway**
|
|
|
|
In `src/gateway/mod.rs`, after `session_manager.register_outbound_tool(available_channels);` (line 76), add:
|
|
|
|
```rust
|
|
// Register cron tools if scheduler is enabled
|
|
if config.gateway.scheduler.as_ref().map_or(true, |c| c.enabled) {
|
|
let scheduler_pool = pool.clone();
|
|
let valid_channels = available_channels.clone();
|
|
session_manager.tools().register(
|
|
crate::scheduler::tools::CronAddTool::new(scheduler_pool.clone(), valid_channels)
|
|
);
|
|
session_manager.tools().register(
|
|
crate::scheduler::tools::CronListTool::new(scheduler_pool.clone())
|
|
);
|
|
session_manager.tools().register(
|
|
crate::scheduler::tools::CronRemoveTool::new(scheduler_pool.clone())
|
|
);
|
|
session_manager.tools().register(
|
|
crate::scheduler::tools::CronEnableTool::new(scheduler_pool.clone())
|
|
);
|
|
session_manager.tools().register(
|
|
crate::scheduler::tools::CronDisableTool::new(scheduler_pool.clone())
|
|
);
|
|
session_manager.tools().register(
|
|
crate::scheduler::tools::CronUpdateTool::new(scheduler_pool.clone())
|
|
);
|
|
tracing::info!("Cron tools registered");
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 6: Verify build**
|
|
|
|
Run: `cargo check 2>&1`
|
|
Expected: Compiles successfully.
|
|
|
|
- [ ] **Step 7: Commit**
|
|
|
|
```bash
|
|
git add src/scheduler/tools.rs src/gateway/mod.rs
|
|
git commit -m "feat: add 6 cron agent tools (add/list/remove/enable/disable/update)"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 10: Full Build, Lint, and Unit Tests
|
|
|
|
**Files:**
|
|
- All above
|
|
|
|
- [ ] **Step 1: Run full check**
|
|
|
|
Run: `cargo check 2>&1`
|
|
Expected: Compiles successfully, no errors, no warnings.
|
|
|
|
- [ ] **Step 2: Run cargo clippy**
|
|
|
|
Run: `cargo clippy -- -D warnings 2>&1`
|
|
Expected: No warnings emitted.
|
|
|
|
- [ ] **Step 3: Run all unit tests**
|
|
|
|
Run: `cargo test --lib 2>&1`
|
|
Expected: All tests PASS, including existing tests and the 18 new scheduler tests.
|
|
|
|
- [ ] **Step 4: Commit**
|
|
|
|
```bash
|
|
git add --all
|
|
git commit -m "feat: complete scheduled tasks implementation"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 11: Integration Test
|
|
|
|
**Files:**
|
|
- Modify: `tests/test_integration.rs` (or create `tests/test_scheduler.rs`)
|
|
|
|
- [ ] **Step 1: Write integration test**
|
|
|
|
Create `tests/test_scheduler.rs`:
|
|
|
|
```rust
|
|
//! Integration tests for the scheduled tasks (cron) system.
|
|
//! Requires `.env` with real API keys and a running Gateway.
|
|
//! Run with: cargo test --test test_scheduler -- --ignored
|
|
|
|
use serde_json::json;
|
|
|
|
/// This test verifies that the scheduler module compiles and its types are
|
|
/// accessible. Full integration testing requires a running Gateway instance
|
|
/// with API keys, so the actual job-execution flow is tested there.
|
|
#[tokio::test]
|
|
async fn test_scheduler_types_roundtrip() {
|
|
use picobot::scheduler::Schedule;
|
|
|
|
// Verify JSON (de)serialization works
|
|
let s1 = Schedule::Every { every_ms: 3600000 };
|
|
let json = serde_json::to_string(&s1).unwrap();
|
|
let s2: Schedule = serde_json::from_str(&json).unwrap();
|
|
match s2 {
|
|
Schedule::Every { every_ms } => assert_eq!(every_ms, 3600000),
|
|
_ => panic!("expected Every"),
|
|
}
|
|
|
|
let s1 = Schedule::At { at: 1000000 };
|
|
let json = serde_json::to_string(&s1).unwrap();
|
|
let s2: Schedule = serde_json::from_str(&json).unwrap();
|
|
match s2 {
|
|
Schedule::At { at } => assert_eq!(at, 1000000),
|
|
_ => panic!("expected At"),
|
|
}
|
|
|
|
let s1 = Schedule::Cron { expr: "0 0 9 * * *".into(), tz: None };
|
|
let json = serde_json::to_string(&s1).unwrap();
|
|
let s2: Schedule = serde_json::from_str(&json).unwrap();
|
|
match s2 {
|
|
Schedule::Cron { expr, tz } => {
|
|
assert_eq!(expr, "0 0 9 * * *");
|
|
assert!(tz.is_none());
|
|
}
|
|
_ => panic!("expected Cron"),
|
|
}
|
|
}
|
|
|
|
/// Verify that next_run_for_schedule produces valid future timestamps.
|
|
#[test]
|
|
fn test_next_run_always_future() {
|
|
use picobot::scheduler::{next_run_for_schedule, Schedule};
|
|
|
|
let now = 1700000000000_i64; // Some fixed reference time
|
|
|
|
let schedules = vec![
|
|
Schedule::Every { every_ms: 60000 },
|
|
Schedule::Cron { expr: "0 0 9 * * *".into(), tz: None },
|
|
];
|
|
|
|
for s in &schedules {
|
|
let next = next_run_for_schedule(s, now);
|
|
assert!(next.is_some(), "expected next run for {:?}", s);
|
|
assert!(next.unwrap() > now, "next run should be after now for {:?}", s);
|
|
}
|
|
}
|
|
|
|
/// Verify that one-shot At schedule disables after run (logic tested in unit tests,
|
|
/// this just ensures the schedule round-trips correctly).
|
|
#[test]
|
|
fn test_at_schedule_is_one_shot_by_contract() {
|
|
use picobot::scheduler::Schedule;
|
|
// At schedules by definition fire once — the scheduler loop handles
|
|
// disabling/deleting after run. This test confirms the type is correct.
|
|
let s = Schedule::At { at: 1700000000000 };
|
|
let json = serde_json::to_string(&s).unwrap();
|
|
assert!(json.contains("\"at\""));
|
|
}
|
|
```
|
|
|
|
- [ ] **Step 2: Run integration test**
|
|
|
|
Run: `cargo test --test test_scheduler 2>&1`
|
|
Expected: All 3 tests PASS.
|
|
|
|
- [ ] **Step 3: Commit**
|
|
|
|
```bash
|
|
git add tests/test_scheduler.rs
|
|
git commit -m "test: add scheduler integration tests"
|
|
```
|
|
|
|
---
|
|
|
|
### Task 12: Final Verification
|
|
|
|
- [ ] **Step 1: Full test suite**
|
|
|
|
Run: `cargo test --lib 2>&1`
|
|
Expected: All tests PASS.
|
|
|
|
Run: `cargo test --test test_scheduler 2>&1`
|
|
Expected: All tests PASS.
|
|
|
|
- [ ] **Step 2: Build binary**
|
|
|
|
Run: `cargo build 2>&1`
|
|
Expected: Build succeeds with no errors.
|
|
|
|
- [ ] **Step 3: Grep for TODOs / placeholders**
|
|
|
|
Run: `grep -rn "TODO\|FIXME\|TBD\|todo!\|unimplemented!" src/scheduler/ 2>&1`
|
|
Expected: No output (no placeholders).
|
|
|
|
- [ ] **Step 4: Final commit (if any changes)**
|
|
|
|
```bash
|
|
git status
|
|
git add --all
|
|
git commit -m "chore: final cleanup for scheduled tasks"
|
|
```
|
|
```
|
|
|