PicoBot/src/tools/cron.rs
2026-05-07 16:28:54 +08:00

801 lines
24 KiB
Rust

use std::sync::Arc;
use async_trait::async_trait;
use serde_json::{json, Value};
use uuid::Uuid;
use crate::scheduler::{next_run_for_schedule, Schedule};
use crate::storage::{ScheduledJob, Storage};
use crate::tools::traits::{Tool, ToolResult};
fn now_ms() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64
}
pub struct CronAddTool {
storage: Arc<Storage>,
valid_channels: Vec<String>,
}
impl CronAddTool {
pub fn new(storage: Arc<Storage>, valid_channels: Vec<String>) -> Self {
Self {
storage,
valid_channels,
}
}
}
#[async_trait]
impl Tool for CronAddTool {
fn name(&self) -> &str {
"cron_add"
}
fn description(&self) -> &str {
"Create a new scheduled task (cron job). The task will execute an AI prompt on a schedule \
and deliver the result to the specified channel/chat. \
Important: the execution environment is a fresh session with no access to your current \
conversation history. The prompt parameter MUST include all necessary context: \
what to do, the target audience, required output format, and any background information. \
Schedule formats: \
- 'every': {\"type\":\"every\",\"every_ms\":3600000} for every hour, \
- 'at': {\"type\":\"at\",\"at\":<unix_timestamp_ms>} for one-shot, \
- 'cron': {\"type\":\"cron\",\"expr\":\"0 0 9 * * *\"} for cron expressions (6-field: sec min hour dom month dow)."
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"schedule": {
"type": "object",
"description": "Schedule definition. One of: {\"type\":\"every\",\"every_ms\":<ms>}, {\"type\":\"at\",\"at\":<unix_ms>}, or {\"type\":\"cron\",\"expr\":\"<cron_expr>\",\"tz\":\"<tz>\"}",
"required": ["type"]
},
"prompt": {
"type": "string",
"description": "The AI prompt to execute on each trigger"
},
"channel": {
"type": "string",
"description": "Target channel for delivering results (e.g., 'feishu', 'cli_chat')"
},
"chat_id": {
"type": "string",
"description": "Target chat ID within the channel"
},
"name": {
"type": "string",
"description": "Human-readable name for the job (optional, defaults to truncated prompt)"
},
"model": {
"type": "string",
"description": "Optional model override for this job"
}
},
"required": ["schedule", "prompt", "channel", "chat_id"]
})
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let schedule_json = args
.get("schedule")
.ok_or_else(|| anyhow::anyhow!("missing 'schedule'"))?;
let schedule: Schedule = serde_json::from_value(schedule_json.clone())
.map_err(|e| anyhow::anyhow!("invalid schedule: {}", e))?;
let prompt = args
.get("prompt")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if prompt.is_empty() {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("prompt is required".into()),
});
}
let channel = args
.get("channel")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if !self.valid_channels.contains(&channel) {
return Ok(ToolResult {
success: false,
output: format!(
"Unknown channel '{}'. Available: {}",
channel,
self.valid_channels.join(", ")
),
error: Some(format!("Unknown channel: {}", channel)),
});
}
let chat_id = args
.get("chat_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if chat_id.is_empty() {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("chat_id is required".into()),
});
}
let name = args
.get("name")
.and_then(|v| v.as_str())
.unwrap_or_else(|| {
// char-boundary-safe truncation to 50 bytes
let limit = 50;
if prompt.len() <= limit {
prompt.as_str()
} else {
let mut end = limit;
while !prompt.is_char_boundary(end) {
end -= 1;
}
&prompt[..end]
}
})
.to_string();
let model = args
.get("model")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let now = now_ms();
let next_run_at = next_run_for_schedule(&schedule, now)
.ok_or_else(|| anyhow::anyhow!("could not compute next run time from schedule"))?;
let id = Uuid::new_v4().to_string()[..8].to_string();
let job = ScheduledJob {
id: id.clone(),
name: name.clone(),
schedule,
prompt,
channel,
chat_id,
model,
enabled: true,
delete_after_run: false,
next_run_at,
last_run_at: None,
last_status: None,
last_error: None,
created_at: now,
updated_at: now,
};
self.storage.add_scheduled_job(&job).await?;
Ok(ToolResult {
success: true,
output: format!(
"Scheduled job created: id={}, name=\"{}\", next_run_at={}",
id, name, next_run_at
),
error: None,
})
}
}
// ── CronListTool ─────────────────────────────────────────────────────────────
pub struct CronListTool {
storage: Arc<Storage>,
}
impl CronListTool {
pub fn new(storage: Arc<Storage>) -> Self {
Self { storage }
}
}
#[async_trait]
impl Tool for CronListTool {
fn name(&self) -> &str {
"cron_list"
}
fn description(&self) -> &str {
"List all scheduled tasks (cron jobs) with their status and next run time."
}
fn read_only(&self) -> bool {
true
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"status": {
"type": "string",
"enum": ["all", "enabled", "disabled"],
"description": "Filter by job status (default: all)"
}
}
})
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let filter = args
.get("status")
.and_then(|v| v.as_str())
.unwrap_or("all");
let jobs = self.storage.list_scheduled_jobs().await?;
let filtered: Vec<&ScheduledJob> = match filter {
"enabled" => jobs.iter().filter(|j| j.enabled).collect(),
"disabled" => jobs.iter().filter(|j| !j.enabled).collect(),
_ => jobs.iter().collect(),
};
if filtered.is_empty() {
return Ok(ToolResult {
success: true,
output: "No scheduled jobs found.".into(),
error: None,
});
}
let mut lines = Vec::new();
for j in &filtered {
let status = if j.enabled { "enabled" } else { "disabled" };
let last = match (&j.last_status, &j.last_error) {
(Some(s), _) if s == "ok" => " last:ok".to_string(),
(Some(_), Some(e)) => format!(" last:err({})", &e[..e.len().min(40)]),
_ => String::new(),
};
let model = j.model.as_deref().unwrap_or("default");
lines.push(format!(
"[{}] id={} name=\"{}\" channel={} chat={} model={} next={}{}",
status, j.id, j.name, j.channel, j.chat_id, model, j.next_run_at, last
));
}
Ok(ToolResult {
success: true,
output: lines.join("\n"),
error: None,
})
}
}
// ── CronRemoveTool ───────────────────────────────────────────────────────────
pub struct CronRemoveTool {
storage: Arc<Storage>,
}
impl CronRemoveTool {
pub fn new(storage: Arc<Storage>) -> Self {
Self { storage }
}
}
#[async_trait]
impl Tool for CronRemoveTool {
fn name(&self) -> &str {
"cron_remove"
}
fn description(&self) -> &str {
"Delete a scheduled task permanently by its job ID. Use cron_list first to find the ID."
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"job_id": {
"type": "string",
"description": "The ID of the job to delete"
}
},
"required": ["job_id"]
})
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let job_id = args
.get("job_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if job_id.is_empty() {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("job_id is required".into()),
});
}
match self.storage.get_scheduled_job(&job_id).await {
Ok(_) => {}
Err(_) => {
return Ok(ToolResult {
success: false,
output: format!("Job {} not found.", job_id),
error: Some("not found".into()),
});
}
}
self.storage.remove_scheduled_job(&job_id).await?;
Ok(ToolResult {
success: true,
output: format!("Job {} deleted.", job_id),
error: None,
})
}
}
// ── CronEnableTool ───────────────────────────────────────────────────────────
pub struct CronEnableTool {
storage: Arc<Storage>,
}
impl CronEnableTool {
pub fn new(storage: Arc<Storage>) -> Self {
Self { storage }
}
}
#[async_trait]
impl Tool for CronEnableTool {
fn name(&self) -> &str {
"cron_enable"
}
fn description(&self) -> &str {
"Enable a disabled scheduled task by its job ID."
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"job_id": {
"type": "string",
"description": "The ID of the job to enable"
}
},
"required": ["job_id"]
})
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let job_id = args
.get("job_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if job_id.is_empty() {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("job_id is required".into()),
});
}
let job = self
.storage
.get_scheduled_job(&job_id)
.await
.map_err(|e| anyhow::anyhow!("Job {} not found: {}", job_id, e))?;
let next = next_run_for_schedule(&job.schedule, now_ms());
self.storage.set_scheduled_job_enabled(&job_id, true).await?;
if let Some(n) = next {
self.storage.set_scheduled_job_next_run(&job_id, n).await?;
}
Ok(ToolResult {
success: true,
output: format!("Job {} enabled.", job_id),
error: None,
})
}
}
// ── CronDisableTool ──────────────────────────────────────────────────────────
pub struct CronDisableTool {
storage: Arc<Storage>,
}
impl CronDisableTool {
pub fn new(storage: Arc<Storage>) -> Self {
Self { storage }
}
}
#[async_trait]
impl Tool for CronDisableTool {
fn name(&self) -> &str {
"cron_disable"
}
fn description(&self) -> &str {
"Disable a scheduled task by its job ID without deleting it."
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"job_id": {
"type": "string",
"description": "The ID of the job to disable"
}
},
"required": ["job_id"]
})
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let job_id = args
.get("job_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if job_id.is_empty() {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("job_id is required".into()),
});
}
let _ = self
.storage
.get_scheduled_job(&job_id)
.await
.map_err(|e| anyhow::anyhow!("Job {} not found: {}", job_id, e))?;
self.storage.set_scheduled_job_enabled(&job_id, false).await?;
Ok(ToolResult {
success: true,
output: format!("Job {} disabled.", job_id),
error: None,
})
}
}
// ── CronUpdateTool ───────────────────────────────────────────────────────────
pub struct CronUpdateTool {
storage: Arc<Storage>,
}
impl CronUpdateTool {
pub fn new(storage: Arc<Storage>) -> Self {
Self { storage }
}
}
#[async_trait]
impl Tool for CronUpdateTool {
fn name(&self) -> &str {
"cron_update"
}
fn description(&self) -> &str {
"Update fields of an existing scheduled task. Only specified fields are changed."
}
fn parameters_schema(&self) -> Value {
json!({
"type": "object",
"properties": {
"job_id": {
"type": "string",
"description": "The ID of the job to update"
},
"prompt": {
"type": "string",
"description": "New AI prompt"
},
"schedule": {
"type": "object",
"description": "New schedule definition"
},
"channel": {
"type": "string",
"description": "New target channel"
},
"chat_id": {
"type": "string",
"description": "New target chat ID"
},
"model": {
"type": "string",
"description": "New model override"
}
},
"required": ["job_id"]
})
}
async fn execute(&self, args: Value) -> anyhow::Result<ToolResult> {
let job_id = args
.get("job_id")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
if job_id.is_empty() {
return Ok(ToolResult {
success: false,
output: String::new(),
error: Some("job_id is required".into()),
});
}
let _ = self
.storage
.get_scheduled_job(&job_id)
.await
.map_err(|e| anyhow::anyhow!("Job {} not found: {}", job_id, e))?;
let prompt = args
.get("prompt")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let schedule: Option<Schedule> = match args.get("schedule") {
Some(s) => Some(
serde_json::from_value(s.clone())
.map_err(|e| anyhow::anyhow!("invalid schedule: {}", e))?,
),
None => None,
};
let channel = args
.get("channel")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let chat_id = args
.get("chat_id")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let model = args
.get("model")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
self.storage
.update_scheduled_job(&job_id, prompt, schedule, channel, chat_id, model)
.await?;
if args.get("schedule").is_some() {
let job = self.storage.get_scheduled_job(&job_id).await?;
if let Some(next) = next_run_for_schedule(&job.schedule, now_ms()) {
self.storage.set_scheduled_job_next_run(&job_id, next).await?;
}
}
Ok(ToolResult {
success: true,
output: format!("Job {} updated.", job_id),
error: None,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::scheduler::Schedule;
use crate::storage::{ScheduledJob, Storage};
use serde_json::json;
use sqlx::SqlitePool;
async fn setup_storage() -> Arc<Storage> {
let pool = SqlitePool::connect("sqlite::memory:").await.unwrap();
Storage::init_scheduler_schema(&pool).await.unwrap();
Arc::new(Storage { pool })
}
fn now() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64
}
#[tokio::test]
async fn test_cron_add_tool() {
let storage = setup_storage().await;
let tool = CronAddTool::new(storage.clone(), vec!["cli_chat".to_string()]);
let result = tool
.execute(json!({
"schedule": {"type": "every", "every_ms": 3600000},
"prompt": "report status",
"channel": "cli_chat",
"chat_id": "test-chat-1",
"name": "hourly report"
}))
.await
.unwrap();
assert!(result.success);
assert!(result.output.contains("hourly report"));
let jobs = storage.list_scheduled_jobs().await.unwrap();
assert_eq!(jobs.len(), 1);
assert_eq!(jobs[0].name, "hourly report");
}
#[tokio::test]
async fn test_cron_add_invalid_channel() {
let storage = setup_storage().await;
let tool = CronAddTool::new(storage.clone(), vec!["cli_chat".to_string()]);
let result = tool
.execute(json!({
"schedule": {"type": "every", "every_ms": 3600000},
"prompt": "test",
"channel": "nonexistent",
"chat_id": "x",
"name": "test"
}))
.await
.unwrap();
assert!(!result.success);
assert!(result.error.as_ref().unwrap().contains("Unknown channel"));
}
#[tokio::test]
async fn test_cron_list_tool() {
let storage = setup_storage().await;
let t = now();
let job = ScheduledJob {
id: uuid::Uuid::new_v4().to_string(),
name: "list-test".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "hi".into(),
channel: "cli_chat".into(),
chat_id: "c".into(),
model: None,
enabled: true,
delete_after_run: false,
next_run_at: t + 1000,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
};
storage.add_scheduled_job(&job).await.unwrap();
let tool = CronListTool::new(storage.clone());
let result = tool.execute(json!({})).await.unwrap();
assert!(result.success);
assert!(result.output.contains("list-test"));
}
#[tokio::test]
async fn test_cron_remove_tool() {
let storage = setup_storage().await;
let t = now();
let job = ScheduledJob {
id: "job-rm-tool".into(),
name: "rm me".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "hi".into(),
channel: "cli_chat".into(),
chat_id: "c".into(),
model: None,
enabled: true,
delete_after_run: false,
next_run_at: t,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
};
storage.add_scheduled_job(&job).await.unwrap();
let tool = CronRemoveTool::new(storage.clone());
let result = tool
.execute(json!({"job_id": "job-rm-tool"}))
.await
.unwrap();
assert!(result.success);
assert!(storage.get_scheduled_job("job-rm-tool").await.is_err());
}
#[tokio::test]
async fn test_cron_enable_disable_tools() {
let storage = setup_storage().await;
let t = now();
let job = ScheduledJob {
id: "job-toggle-tool".into(),
name: "toggle".into(),
schedule: Schedule::Every { every_ms: 1000 },
prompt: "hi".into(),
channel: "cli_chat".into(),
chat_id: "c".into(),
model: None,
enabled: true,
delete_after_run: false,
next_run_at: t,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
};
storage.add_scheduled_job(&job).await.unwrap();
let disable_tool = CronDisableTool::new(storage.clone());
let result = disable_tool
.execute(json!({"job_id": "job-toggle-tool"}))
.await
.unwrap();
assert!(result.success);
let got = storage.get_scheduled_job("job-toggle-tool").await.unwrap();
assert!(!got.enabled);
let enable_tool = CronEnableTool::new(storage.clone());
let result = enable_tool
.execute(json!({"job_id": "job-toggle-tool"}))
.await
.unwrap();
assert!(result.success);
let got = storage.get_scheduled_job("job-toggle-tool").await.unwrap();
assert!(got.enabled);
}
#[tokio::test]
async fn test_cron_update_tool() {
let storage = setup_storage().await;
let t = now();
let job = ScheduledJob {
id: "job-update-tool".into(),
name: "old".into(),
schedule: Schedule::Every {
every_ms: 3600000,
},
prompt: "old prompt".into(),
channel: "feishu".into(),
chat_id: "oc_1".into(),
model: None,
enabled: true,
delete_after_run: false,
next_run_at: t + 1000,
last_run_at: None,
last_status: None,
last_error: None,
created_at: t,
updated_at: t,
};
storage.add_scheduled_job(&job).await.unwrap();
let tool = CronUpdateTool::new(storage.clone());
let result = tool
.execute(json!({
"job_id": "job-update-tool",
"prompt": "new prompt",
"schedule": {"type": "every", "every_ms": 60000}
}))
.await
.unwrap();
assert!(result.success);
let got = storage.get_scheduled_job("job-update-tool").await.unwrap();
assert_eq!(got.prompt, "new prompt");
}
}