diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index fe28eff..a171a26 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -78,6 +78,7 @@ impl GatewayState { // Register send_message tool with available channel names let available_channels = channel_manager.list_channel_names().await; + let valid_channels = available_channels.clone(); session_manager.register_outbound_tool(available_channels); // Initialize scheduler if enabled in config @@ -87,6 +88,28 @@ impl GatewayState { .await .map_err(|e| format!("failed to initialize scheduler store: {}", e))?; tracing::info!("Scheduler store initialized"); + + // Register cron tools + let scheduler_pool = pool.clone(); + session_manager.tools().register( + crate::scheduler::tools::CronAddTool::new(scheduler_pool.clone(), valid_channels), + ); + session_manager.tools().register( + crate::scheduler::tools::CronListTool::new(scheduler_pool.clone()), + ); + session_manager.tools().register( + crate::scheduler::tools::CronRemoveTool::new(scheduler_pool.clone()), + ); + session_manager.tools().register( + crate::scheduler::tools::CronEnableTool::new(scheduler_pool.clone()), + ); + session_manager.tools().register( + crate::scheduler::tools::CronDisableTool::new(scheduler_pool.clone()), + ); + session_manager.tools().register( + crate::scheduler::tools::CronUpdateTool::new(scheduler_pool.clone()), + ); + tracing::info!("Cron tools registered"); } Ok(Self { diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 12b82aa..ca43c89 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -221,7 +221,7 @@ impl Scheduler { async fn reschedule_after_run( &self, job: &ScheduledJob, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { let now = now_ms(); match &job.schedule { diff --git a/src/scheduler/store.rs b/src/scheduler/store.rs index c1635c6..b061c76 100644 --- a/src/scheduler/store.rs +++ b/src/scheduler/store.rs @@ -9,7 +9,7 @@ pub struct SchedulerStore; impl SchedulerStore { /// Initialize the scheduler tables. Idempotent (CREATE TABLE IF NOT EXISTS). - pub async fn init(pool: &SqlitePool) -> Result<(), Box> { + pub async fn init(pool: &SqlitePool) -> anyhow::Result<()> { sqlx::query( r#" CREATE TABLE IF NOT EXISTS scheduled_jobs ( @@ -68,7 +68,7 @@ impl SchedulerStore { pub async fn add_job( pool: &SqlitePool, job: &ScheduledJob, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { let schedule_json = serde_json::to_string(&job.schedule)?; sqlx::query( r#" @@ -103,19 +103,19 @@ impl SchedulerStore { pub async fn get_job( pool: &SqlitePool, id: &str, - ) -> Result> { + ) -> anyhow::Result { let row = sqlx::query("SELECT * FROM scheduled_jobs WHERE id = ?") .bind(id) .fetch_optional(pool) .await? - .ok_or_else(|| format!("job not found: {id}"))?; + .ok_or_else(|| anyhow::anyhow!("job not found: {id}"))?; Ok(row_to_job(&row)?) } /// List all jobs, ordered by next_run_at ascending. pub async fn list_jobs( pool: &SqlitePool, - ) -> Result, Box> { + ) -> anyhow::Result> { let rows = sqlx::query("SELECT * FROM scheduled_jobs ORDER BY next_run_at ASC") .fetch_all(pool) .await?; @@ -126,7 +126,7 @@ impl SchedulerStore { pub async fn remove_job( pool: &SqlitePool, id: &str, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { sqlx::query("DELETE FROM scheduled_jobs WHERE id = ?") .bind(id) .execute(pool) @@ -139,7 +139,7 @@ impl SchedulerStore { pool: &SqlitePool, id: &str, enabled: bool, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { sqlx::query("UPDATE scheduled_jobs SET enabled = ?, updated_at = ? WHERE id = ?") .bind(enabled as i32) .bind(now_ms()) @@ -158,7 +158,7 @@ impl SchedulerStore { channel: Option, chat_id: Option, model: Option, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { let now = now_ms(); if let Some(p) = prompt { @@ -220,7 +220,7 @@ impl SchedulerStore { pool: &SqlitePool, id: &str, next_run_at: i64, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { let now = now_ms(); sqlx::query( "UPDATE scheduled_jobs SET next_run_at = ?, last_run_at = ?, updated_at = ? WHERE id = ?", @@ -239,7 +239,7 @@ impl SchedulerStore { pool: &SqlitePool, id: &str, at: i64, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { sqlx::query( "UPDATE scheduled_jobs SET last_run_at = ?, updated_at = ? WHERE id = ?", ) @@ -257,7 +257,7 @@ impl SchedulerStore { id: &str, status: &str, error: Option<&str>, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { let now = now_ms(); sqlx::query( "UPDATE scheduled_jobs SET last_status = ?, last_error = ?, updated_at = ? WHERE id = ?", @@ -276,7 +276,7 @@ impl SchedulerStore { pool: &SqlitePool, now: i64, limit: usize, - ) -> Result, Box> { + ) -> anyhow::Result> { let rows = sqlx::query( "SELECT * FROM scheduled_jobs WHERE enabled = 1 AND next_run_at <= ? ORDER BY next_run_at ASC LIMIT ?", ) @@ -291,7 +291,7 @@ impl SchedulerStore { pub async fn record_run( pool: &SqlitePool, run: &JobRun, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { sqlx::query( r#" INSERT INTO job_runs (job_id, started_at, finished_at, status, output, error, duration_ms) @@ -315,7 +315,7 @@ impl SchedulerStore { pool: &SqlitePool, job_id: &str, limit: usize, - ) -> Result, Box> { + ) -> anyhow::Result> { let rows = sqlx::query( "SELECT * FROM job_runs WHERE job_id = ? ORDER BY finished_at DESC LIMIT ?", ) @@ -343,7 +343,7 @@ impl SchedulerStore { pub async fn cleanup_disabled( pool: &SqlitePool, before: i64, - ) -> Result<(), Box> { + ) -> anyhow::Result<()> { sqlx::query( "DELETE FROM scheduled_jobs WHERE enabled = 0 AND updated_at < ?", ) @@ -361,7 +361,7 @@ fn now_ms() -> i64 { .as_millis() as i64 } -fn row_to_job(row: &sqlx::sqlite::SqliteRow) -> Result> { +fn row_to_job(row: &sqlx::sqlite::SqliteRow) -> anyhow::Result { let schedule_json: String = row.try_get("schedule")?; let schedule: Schedule = serde_json::from_str(&schedule_json)?; Ok(ScheduledJob { diff --git a/src/scheduler/tools.rs b/src/scheduler/tools.rs index 33ee302..93a8f74 100644 --- a/src/scheduler/tools.rs +++ b/src/scheduler/tools.rs @@ -1 +1,788 @@ -// Stub — will be filled in Task 9 +use async_trait::async_trait; +use serde_json::{json, Value}; +use sqlx::SqlitePool; +use uuid::Uuid; + +use crate::scheduler::next_run_for_schedule; +use crate::scheduler::store::SchedulerStore; +use crate::scheduler::types::{Schedule, ScheduledJob}; +use crate::tools::traits::{Tool, ToolResult}; + +fn now_ms() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64 +} + +// ── CronAddTool ────────────────────────────────────────────────────────────── + +pub struct CronAddTool { + pool: SqlitePool, + valid_channels: Vec, +} + +impl CronAddTool { + pub fn new(pool: SqlitePool, valid_channels: Vec) -> Self { + Self { + pool, + valid_channels, + } + } +} + +#[async_trait] +impl Tool for CronAddTool { + fn name(&self) -> &str { + "cron_add" + } + + fn description(&self) -> &str { + "Create a new scheduled task (cron job). The task will execute an AI prompt on a schedule \ + and deliver the result to the specified channel/chat. \ + Schedule formats: \ + - 'every': {\"type\":\"every\",\"every_ms\":3600000} for every hour, \ + - 'at': {\"type\":\"at\",\"at\":} for one-shot, \ + - 'cron': {\"type\":\"cron\",\"expr\":\"0 0 9 * * *\"} for cron expressions (6-field: sec min hour dom month dow)." + } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "schedule": { + "type": "object", + "description": "Schedule definition. One of: {\"type\":\"every\",\"every_ms\":}, {\"type\":\"at\",\"at\":}, or {\"type\":\"cron\",\"expr\":\"\",\"tz\":\"\"}", + "required": ["type"] + }, + "prompt": { + "type": "string", + "description": "The AI prompt to execute on each trigger" + }, + "channel": { + "type": "string", + "description": "Target channel for delivering results (e.g., 'feishu', 'cli_chat')" + }, + "chat_id": { + "type": "string", + "description": "Target chat ID within the channel" + }, + "name": { + "type": "string", + "description": "Human-readable name for the job (optional, defaults to truncated prompt)" + }, + "model": { + "type": "string", + "description": "Optional model override for this job" + } + }, + "required": ["schedule", "prompt", "channel", "chat_id"] + }) + } + + async fn execute(&self, args: Value) -> anyhow::Result { + let schedule_json = args + .get("schedule") + .ok_or_else(|| anyhow::anyhow!("missing 'schedule'"))?; + let schedule: Schedule = serde_json::from_value(schedule_json.clone()) + .map_err(|e| anyhow::anyhow!("invalid schedule: {}", e))?; + + let prompt = args + .get("prompt") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + if prompt.is_empty() { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("prompt is required".into()), + }); + } + + let channel = args + .get("channel") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + if !self.valid_channels.contains(&channel) { + return Ok(ToolResult { + success: false, + output: format!( + "Unknown channel '{}'. Available: {}", + channel, + self.valid_channels.join(", ") + ), + error: Some(format!("Unknown channel: {}", channel)), + }); + } + + let chat_id = args + .get("chat_id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + if chat_id.is_empty() { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("chat_id is required".into()), + }); + } + + let name = args + .get("name") + .and_then(|v| v.as_str()) + .unwrap_or(&prompt[..prompt.len().min(50)]) + .to_string(); + let model = args + .get("model") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + let now = now_ms(); + let next_run_at = next_run_for_schedule(&schedule, now) + .ok_or_else(|| anyhow::anyhow!("could not compute next run time from schedule"))?; + + let id = Uuid::new_v4().to_string(); + let job = ScheduledJob { + id: id.clone(), + name: name.clone(), + schedule, + prompt, + channel, + chat_id, + model, + enabled: true, + delete_after_run: false, + next_run_at, + last_run_at: None, + last_status: None, + last_error: None, + created_at: now, + updated_at: now, + }; + + SchedulerStore::add_job(&self.pool, &job).await?; + + Ok(ToolResult { + success: true, + output: format!( + "Scheduled job created: id={}, name=\"{}\", next_run_at={}", + id, name, next_run_at + ), + error: None, + }) + } +} + +// ── CronListTool ───────────────────────────────────────────────────────────── + +pub struct CronListTool { + pool: SqlitePool, +} + +impl CronListTool { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl Tool for CronListTool { + fn name(&self) -> &str { + "cron_list" + } + + fn description(&self) -> &str { + "List all scheduled tasks (cron jobs) with their status and next run time." + } + + fn read_only(&self) -> bool { + true + } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "status": { + "type": "string", + "enum": ["all", "enabled", "disabled"], + "description": "Filter by job status (default: all)" + } + } + }) + } + + async fn execute(&self, args: Value) -> anyhow::Result { + let filter = args + .get("status") + .and_then(|v| v.as_str()) + .unwrap_or("all"); + let jobs = SchedulerStore::list_jobs(&self.pool).await?; + + let filtered: Vec<&ScheduledJob> = match filter { + "enabled" => jobs.iter().filter(|j| j.enabled).collect(), + "disabled" => jobs.iter().filter(|j| !j.enabled).collect(), + _ => jobs.iter().collect(), + }; + + if filtered.is_empty() { + return Ok(ToolResult { + success: true, + output: "No scheduled jobs found.".into(), + error: None, + }); + } + + let mut lines = Vec::new(); + for j in &filtered { + let status = if j.enabled { "enabled" } else { "disabled" }; + let last = match (&j.last_status, &j.last_error) { + (Some(s), _) if s == "ok" => " last:ok".to_string(), + (Some(_), Some(e)) => format!(" last:err({})", &e[..e.len().min(40)]), + _ => String::new(), + }; + let model = j.model.as_deref().unwrap_or("default"); + lines.push(format!( + "[{}] id={} name=\"{}\" channel={} chat={} model={} next={}{}", + status, j.id, j.name, j.channel, j.chat_id, model, j.next_run_at, last + )); + } + + Ok(ToolResult { + success: true, + output: lines.join("\n"), + error: None, + }) + } +} + +// ── CronRemoveTool ─────────────────────────────────────────────────────────── + +pub struct CronRemoveTool { + pool: SqlitePool, +} + +impl CronRemoveTool { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl Tool for CronRemoveTool { + fn name(&self) -> &str { + "cron_remove" + } + + fn description(&self) -> &str { + "Delete a scheduled task permanently by its job ID. Use cron_list first to find the ID." + } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "job_id": { + "type": "string", + "description": "The ID of the job to delete" + } + }, + "required": ["job_id"] + }) + } + + async fn execute(&self, args: Value) -> anyhow::Result { + let job_id = args + .get("job_id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + if job_id.is_empty() { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("job_id is required".into()), + }); + } + + match SchedulerStore::get_job(&self.pool, &job_id).await { + Ok(_) => {} + Err(_) => { + return Ok(ToolResult { + success: false, + output: format!("Job {} not found.", job_id), + error: Some("not found".into()), + }); + } + } + + SchedulerStore::remove_job(&self.pool, &job_id).await?; + Ok(ToolResult { + success: true, + output: format!("Job {} deleted.", job_id), + error: None, + }) + } +} + +// ── CronEnableTool ─────────────────────────────────────────────────────────── + +pub struct CronEnableTool { + pool: SqlitePool, +} + +impl CronEnableTool { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl Tool for CronEnableTool { + fn name(&self) -> &str { + "cron_enable" + } + + fn description(&self) -> &str { + "Enable a disabled scheduled task by its job ID." + } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "job_id": { + "type": "string", + "description": "The ID of the job to enable" + } + }, + "required": ["job_id"] + }) + } + + async fn execute(&self, args: Value) -> anyhow::Result { + let job_id = args + .get("job_id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + if job_id.is_empty() { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("job_id is required".into()), + }); + } + + let job = SchedulerStore::get_job(&self.pool, &job_id) + .await + .map_err(|e| anyhow::anyhow!("Job {} not found: {}", job_id, e))?; + + let next = next_run_for_schedule(&job.schedule, now_ms()); + SchedulerStore::set_enabled(&self.pool, &job_id, true).await?; + if let Some(n) = next { + SchedulerStore::set_next_run(&self.pool, &job_id, n).await?; + } + + Ok(ToolResult { + success: true, + output: format!("Job {} enabled.", job_id), + error: None, + }) + } +} + +// ── CronDisableTool ────────────────────────────────────────────────────────── + +pub struct CronDisableTool { + pool: SqlitePool, +} + +impl CronDisableTool { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl Tool for CronDisableTool { + fn name(&self) -> &str { + "cron_disable" + } + + fn description(&self) -> &str { + "Disable a scheduled task by its job ID without deleting it." + } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "job_id": { + "type": "string", + "description": "The ID of the job to disable" + } + }, + "required": ["job_id"] + }) + } + + async fn execute(&self, args: Value) -> anyhow::Result { + let job_id = args + .get("job_id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + if job_id.is_empty() { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("job_id is required".into()), + }); + } + + let _ = SchedulerStore::get_job(&self.pool, &job_id) + .await + .map_err(|e| anyhow::anyhow!("Job {} not found: {}", job_id, e))?; + SchedulerStore::set_enabled(&self.pool, &job_id, false).await?; + + Ok(ToolResult { + success: true, + output: format!("Job {} disabled.", job_id), + error: None, + }) + } +} + +// ── CronUpdateTool ─────────────────────────────────────────────────────────── + +pub struct CronUpdateTool { + pool: SqlitePool, +} + +impl CronUpdateTool { + pub fn new(pool: SqlitePool) -> Self { + Self { pool } + } +} + +#[async_trait] +impl Tool for CronUpdateTool { + fn name(&self) -> &str { + "cron_update" + } + + fn description(&self) -> &str { + "Update fields of an existing scheduled task. Only specified fields are changed." + } + + fn parameters_schema(&self) -> Value { + json!({ + "type": "object", + "properties": { + "job_id": { + "type": "string", + "description": "The ID of the job to update" + }, + "prompt": { + "type": "string", + "description": "New AI prompt" + }, + "schedule": { + "type": "object", + "description": "New schedule definition" + }, + "channel": { + "type": "string", + "description": "New target channel" + }, + "chat_id": { + "type": "string", + "description": "New target chat ID" + }, + "model": { + "type": "string", + "description": "New model override" + } + }, + "required": ["job_id"] + }) + } + + async fn execute(&self, args: Value) -> anyhow::Result { + let job_id = args + .get("job_id") + .and_then(|v| v.as_str()) + .unwrap_or("") + .to_string(); + if job_id.is_empty() { + return Ok(ToolResult { + success: false, + output: String::new(), + error: Some("job_id is required".into()), + }); + } + + let _ = SchedulerStore::get_job(&self.pool, &job_id) + .await + .map_err(|e| anyhow::anyhow!("Job {} not found: {}", job_id, e))?; + + let prompt = args + .get("prompt") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + let schedule: Option = match args.get("schedule") { + Some(s) => Some( + serde_json::from_value(s.clone()) + .map_err(|e| anyhow::anyhow!("invalid schedule: {}", e))?, + ), + None => None, + }; + let channel = args + .get("channel") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + let chat_id = args + .get("chat_id") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + let model = args + .get("model") + .and_then(|v| v.as_str()) + .map(|s| s.to_string()); + + SchedulerStore::update_job(&self.pool, &job_id, prompt, schedule, channel, chat_id, model) + .await?; + + if args.get("schedule").is_some() { + let job = SchedulerStore::get_job(&self.pool, &job_id).await?; + if let Some(next) = next_run_for_schedule(&job.schedule, now_ms()) { + SchedulerStore::set_next_run(&self.pool, &job_id, next).await?; + } + } + + Ok(ToolResult { + success: true, + output: format!("Job {} updated.", job_id), + error: None, + }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::scheduler::store::SchedulerStore; + use crate::scheduler::types::{Schedule, ScheduledJob}; + use serde_json::json; + use sqlx::SqlitePool; + + async fn setup_pool() -> SqlitePool { + let pool = SqlitePool::connect("sqlite::memory:").await.unwrap(); + SchedulerStore::init(&pool).await.unwrap(); + pool + } + + fn now() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_millis() as i64 + } + + #[tokio::test] + async fn test_cron_add_tool() { + let pool = setup_pool().await; + let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]); + let result = tool + .execute(json!({ + "schedule": {"type": "every", "every_ms": 3600000}, + "prompt": "report status", + "channel": "cli_chat", + "chat_id": "test-chat-1", + "name": "hourly report" + })) + .await + .unwrap(); + assert!(result.success); + assert!(result.output.contains("hourly report")); + + let jobs = SchedulerStore::list_jobs(&pool).await.unwrap(); + assert_eq!(jobs.len(), 1); + assert_eq!(jobs[0].name, "hourly report"); + } + + #[tokio::test] + async fn test_cron_add_invalid_channel() { + let pool = setup_pool().await; + let tool = CronAddTool::new(pool.clone(), vec!["cli_chat".to_string()]); + let result = tool + .execute(json!({ + "schedule": {"type": "every", "every_ms": 3600000}, + "prompt": "test", + "channel": "nonexistent", + "chat_id": "x", + "name": "test" + })) + .await + .unwrap(); + assert!(!result.success); + assert!(result.error.as_ref().unwrap().contains("Unknown channel")); + } + + #[tokio::test] + async fn test_cron_list_tool() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: uuid::Uuid::new_v4().to_string(), + name: "list-test".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), + channel: "cli_chat".into(), + chat_id: "c".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t + 1000, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + + let tool = CronListTool::new(pool.clone()); + let result = tool.execute(json!({})).await.unwrap(); + assert!(result.success); + assert!(result.output.contains("list-test")); + } + + #[tokio::test] + async fn test_cron_remove_tool() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-rm-tool".into(), + name: "rm me".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), + channel: "cli_chat".into(), + chat_id: "c".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + + let tool = CronRemoveTool::new(pool.clone()); + let result = tool + .execute(json!({"job_id": "job-rm-tool"})) + .await + .unwrap(); + assert!(result.success); + assert!(SchedulerStore::get_job(&pool, "job-rm-tool") + .await + .is_err()); + } + + #[tokio::test] + async fn test_cron_enable_disable_tools() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-toggle-tool".into(), + name: "toggle".into(), + schedule: Schedule::Every { every_ms: 1000 }, + prompt: "hi".into(), + channel: "cli_chat".into(), + chat_id: "c".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + + let disable_tool = CronDisableTool::new(pool.clone()); + let result = disable_tool + .execute(json!({"job_id": "job-toggle-tool"})) + .await + .unwrap(); + assert!(result.success); + + let got = SchedulerStore::get_job(&pool, "job-toggle-tool") + .await + .unwrap(); + assert!(!got.enabled); + + let enable_tool = CronEnableTool::new(pool.clone()); + let result = enable_tool + .execute(json!({"job_id": "job-toggle-tool"})) + .await + .unwrap(); + assert!(result.success); + + let got = SchedulerStore::get_job(&pool, "job-toggle-tool") + .await + .unwrap(); + assert!(got.enabled); + } + + #[tokio::test] + async fn test_cron_update_tool() { + let pool = setup_pool().await; + let t = now(); + let job = ScheduledJob { + id: "job-update-tool".into(), + name: "old".into(), + schedule: Schedule::Every { + every_ms: 3600000, + }, + prompt: "old prompt".into(), + channel: "feishu".into(), + chat_id: "oc_1".into(), + model: None, + enabled: true, + delete_after_run: false, + next_run_at: t + 1000, + last_run_at: None, + last_status: None, + last_error: None, + created_at: t, + updated_at: t, + }; + SchedulerStore::add_job(&pool, &job).await.unwrap(); + + let tool = CronUpdateTool::new(pool.clone()); + let result = tool + .execute(json!({ + "job_id": "job-update-tool", + "prompt": "new prompt", + "schedule": {"type": "every", "every_ms": 60000} + })) + .await + .unwrap(); + assert!(result.success); + + let got = SchedulerStore::get_job(&pool, "job-update-tool") + .await + .unwrap(); + assert_eq!(got.prompt, "new prompt"); + } +}