use std::collections::HashSet; use std::sync::Arc; use async_trait::async_trait; use serde_json::json; use crate::config::SchedulerSchedule; use crate::storage::{ SchedulerJobRecord, SchedulerJobState, SchedulerJobUpsert, SessionStore, }; use crate::tools::traits::{Tool, ToolResult}; pub struct SchedulerManageTool { store: Arc, known_agents: Arc>, } impl SchedulerManageTool { pub fn new(store: Arc, known_agents: HashSet) -> Self { Self { store, known_agents: Arc::new(known_agents), } } } #[async_trait] impl Tool for SchedulerManageTool { fn name(&self) -> &str { "scheduler_manage" } fn description(&self) -> &str { "Manage DB-backed scheduled jobs. Supports actions: list, get, put, delete, pause, resume. Jobs persist in SQLite and are executed by the scheduler runtime. When creating agent_task jobs, keep prompt/system_prompt focused on the work to perform; do not restate execution times unless the task logic truly depends on them, because the trigger already controls timing." } fn parameters_schema(&self) -> serde_json::Value { let mut allowed_agents = self .known_agents .iter() .cloned() .collect::>(); allowed_agents.sort(); let agent_hint = if allowed_agents.is_empty() { "agent_task payload.agent may be omitted or set to 'default'.".to_string() } else { format!( "agent_task payload.agent may be omitted, set to 'default', or use one of configured agents: {}.", allowed_agents.join(", ") ) }; json!({ "type": "object", "properties": { "action": { "type": "string", "enum": ["list", "get", "put", "delete", "pause", "resume"] }, "id": { "type": "string", "description": "Job id" }, "enabled_only": { "type": "boolean", "description": "Only for list action" }, "kind": { "type": "string", "enum": ["internal_event", "outbound_message", "agent_task"] }, "schedule": { "type": "object", "description": "Schedule object, for example {type: 'interval', seconds: 300} or {type: 'cron', expression: '0 9 * * *'}" }, "target": { "type": "object" }, "payload": { "type": "object", "description": format!("Job payload. agent_task supports prompt, agent, fresh_session, system_prompt, sender_id, metadata. For agent_task, write prompt/system_prompt as execution instructions and avoid repeating schedule phrases or execution times such as 每天9点 or 每小时 unless the task itself must reason about time. {} outbound_message expects content. internal_event expects event.", agent_hint) }, "max_runs": { "type": ["integer", "null"] } }, "required": ["action"] }) } async fn execute(&self, args: serde_json::Value) -> anyhow::Result { self.execute_with_context(&crate::tools::ToolContext::default(), args) .await } async fn execute_with_context( &self, context: &crate::tools::ToolContext, args: serde_json::Value, ) -> anyhow::Result { if args.is_null() { return Ok(error_result( "Missing required parameters: scheduler_manage expects a JSON object like {\"action\":\"list\"}", )); } if !args.is_object() { return Ok(error_result( "Invalid parameters: scheduler_manage expects a JSON object", )); } let action = match args.get("action").and_then(|value| value.as_str()) { Some(action) => action, None => return Ok(error_result("Missing required parameter: action")), }; let output = match action { "list" => { let enabled_only = args .get("enabled_only") .and_then(|value| value.as_bool()) .unwrap_or(false); let jobs = self.store.list_scheduler_jobs(enabled_only)?; json!(jobs.iter().map(record_to_json).collect::>()) } "get" => { let id = require_str(&args, "id")?; match self.store.get_scheduler_job(id)? { Some(record) => record_to_json(&record), None => return Ok(error_result(&format!("scheduler job '{}' not found", id))), } } "put" => { let input = build_upsert(context, &args, &self.known_agents)?; let record = self.store.upsert_scheduler_job(&input)?; record_to_json(&record) } "delete" => { let id = require_str(&args, "id")?; self.store.delete_scheduler_job(id)?; json!({"status": "deleted", "id": id}) } "pause" => { let id = require_str(&args, "id")?; let record = self .store .get_scheduler_job(id)? .ok_or_else(|| anyhow::anyhow!("scheduler job '{}' not found", id))?; let mut input = record_to_upsert(&record); input.enabled = false; input.state = SchedulerJobState::Paused; input.paused_at = Some(current_timestamp()); input.next_fire_at = None; let saved = self.store.upsert_scheduler_job(&input)?; record_to_json(&saved) } "resume" => { let id = require_str(&args, "id")?; let record = self .store .get_scheduler_job(id)? .ok_or_else(|| anyhow::anyhow!("scheduler job '{}' not found", id))?; let mut input = record_to_upsert(&record); input.enabled = true; input.state = SchedulerJobState::Scheduled; input.paused_at = None; input.completed_at = None; input.next_fire_at = None; let saved = self.store.upsert_scheduler_job(&input)?; record_to_json(&saved) } _ => return Ok(error_result("Unsupported action")), }; Ok(ToolResult { success: true, output: serde_json::to_string_pretty(&output)?, error: None, }) } } fn build_upsert( context: &crate::tools::ToolContext, args: &serde_json::Value, known_agents: &HashSet, ) -> anyhow::Result { let id = require_str(args, "id")?.to_string(); let kind = require_str(args, "kind")?.to_string(); let schedule_value = args .get("schedule") .cloned() .ok_or_else(|| anyhow::anyhow!("Missing required parameter: schedule"))?; let schedule: SchedulerSchedule = serde_json::from_value(schedule_value.clone())?; schedule.validate(&id)?; let (interval_secs, startup_delay_secs) = match &schedule { SchedulerSchedule::Interval { seconds, startup_delay_secs, } => (*seconds as i64, *startup_delay_secs as i64), _ => (0, 0), }; let payload = args.get("payload").cloned().unwrap_or_else(|| json!({})); let target = enrich_target_from_context( args.get("target").cloned().unwrap_or_else(|| json!({})), context, ); if kind == "agent_task" { validate_agent_task_payload(&payload, known_agents)?; validate_target_fields(&target, &["channel", "chat_id"], "agent_task")?; } else if kind == "outbound_message" { validate_outbound_message_payload(&payload)?; validate_target_fields(&target, &["channel", "chat_id"], "outbound_message")?; } Ok(SchedulerJobUpsert { id, kind, schedule: serde_json::to_value(schedule)?, interval_secs, startup_delay_secs, target, payload, enabled: args.get("enabled").and_then(|value| value.as_bool()).unwrap_or(true), state: if args.get("enabled").and_then(|value| value.as_bool()).unwrap_or(true) { SchedulerJobState::Scheduled } else { SchedulerJobState::Paused }, last_status: None, last_error: None, run_count: 0, max_runs: args.get("max_runs").and_then(|value| value.as_i64()), last_fired_at: None, next_fire_at: None, paused_at: None, completed_at: None, }) } fn enrich_target_from_context( target: serde_json::Value, context: &crate::tools::ToolContext, ) -> serde_json::Value { let mut object = match target { serde_json::Value::Object(map) => map, _ => return target, }; if !has_non_empty_string(&object, "channel") { if let Some(channel_name) = context.channel_name.as_ref().filter(|value| !value.trim().is_empty()) { object.insert("channel".to_string(), serde_json::Value::String(channel_name.clone())); } } if !has_non_empty_string(&object, "chat_id") { if let Some(chat_id) = context.chat_id.as_ref().filter(|value| !value.trim().is_empty()) { object.insert("chat_id".to_string(), serde_json::Value::String(chat_id.clone())); } } serde_json::Value::Object(object) } fn has_non_empty_string(object: &serde_json::Map, field: &str) -> bool { object .get(field) .and_then(|value| value.as_str()) .map(|value| !value.trim().is_empty()) .unwrap_or(false) } fn validate_agent_task_payload(payload: &serde_json::Value, known_agents: &HashSet) -> anyhow::Result<()> { let Some(prompt) = payload.get("prompt").and_then(|value| value.as_str()) else { anyhow::bail!("agent_task payload.prompt is required and must be a string") }; if prompt.trim().is_empty() { anyhow::bail!("agent_task payload.prompt cannot be empty") } let Some(agent_name) = payload.get("agent").and_then(|value| value.as_str()) else { return Ok(()); }; let normalized = agent_name.trim(); if normalized.is_empty() || normalized == "default" || known_agents.contains(normalized) { return Ok(()); } anyhow::bail!(unknown_agent_message(normalized, known_agents)) } fn unknown_agent_message(agent_name: &str, known_agents: &HashSet) -> String { let mut configured_agents = known_agents.iter().cloned().collect::>(); configured_agents.sort(); let configured_hint = if configured_agents.is_empty() { "No named agents are configured; use payload.agent='default' or omit payload.agent.".to_string() } else { format!( "payload.agent must be omitted, set to 'default', or use one of configured agents: default, {}.", configured_agents.join(", ") ) }; format!( "Unknown agent '{}' for agent_task payload.agent. {} '{}' is not an agent. If you mean a skill, do not put it in payload.agent.", agent_name, configured_hint, agent_name, ) } fn validate_outbound_message_payload(payload: &serde_json::Value) -> anyhow::Result<()> { let Some(content) = payload.get("content").and_then(|value| value.as_str()) else { anyhow::bail!("outbound_message payload.content is required and must be a string") }; if content.trim().is_empty() { anyhow::bail!("outbound_message payload.content cannot be empty") } Ok(()) } fn validate_target_fields( target: &serde_json::Value, required_fields: &[&str], kind: &str, ) -> anyhow::Result<()> { let object = target .as_object() .ok_or_else(|| anyhow::anyhow!("{} target must be an object", kind))?; for field in required_fields { let Some(value) = object.get(*field).and_then(|value| value.as_str()) else { anyhow::bail!("{} target.{} is required and must be a string", kind, field) }; if value.trim().is_empty() { anyhow::bail!("{} target.{} cannot be empty", kind, field) } } Ok(()) } fn record_to_json(record: &SchedulerJobRecord) -> serde_json::Value { json!({ "id": record.id, "kind": record.kind, "schedule": record.schedule, "target": record.target, "payload": record.payload, "enabled": record.enabled, "state": record.state, "last_status": record.last_status, "last_error": record.last_error, "run_count": record.run_count, "max_runs": record.max_runs, "last_fired_at": record.last_fired_at, "next_fire_at": record.next_fire_at, "paused_at": record.paused_at, "completed_at": record.completed_at, "created_at": record.created_at, "updated_at": record.updated_at, }) } fn record_to_upsert(record: &SchedulerJobRecord) -> SchedulerJobUpsert { SchedulerJobUpsert { id: record.id.clone(), kind: record.kind.clone(), schedule: record.schedule.clone(), interval_secs: record.interval_secs, startup_delay_secs: record.startup_delay_secs, target: record.target.clone(), payload: record.payload.clone(), enabled: record.enabled, state: record.state.clone(), last_status: record.last_status.clone(), last_error: record.last_error.clone(), run_count: record.run_count, max_runs: record.max_runs, last_fired_at: record.last_fired_at, next_fire_at: record.next_fire_at, paused_at: record.paused_at, completed_at: record.completed_at, } } fn require_str<'a>(args: &'a serde_json::Value, key: &str) -> anyhow::Result<&'a str> { args.get(key) .and_then(|value| value.as_str()) .ok_or_else(|| anyhow::anyhow!("Missing required parameter: {}", key)) } fn error_result(message: &str) -> ToolResult { ToolResult { success: false, output: String::new(), error: Some(message.to_string()), } } fn current_timestamp() -> i64 { std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap_or_default() .as_millis() as i64 } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_scheduler_manage_put_and_get() { let store = Arc::new(SessionStore::in_memory().unwrap()); let tool = SchedulerManageTool::new(store, HashSet::new()); let put_result = tool .execute(json!({ "action": "put", "id": "heartbeat", "kind": "outbound_message", "schedule": { "type": "interval", "seconds": 60 }, "target": { "channel": "feishu", "chat_id": "oc_demo" }, "payload": { "content": "ping" } })) .await .unwrap(); assert!(put_result.success); let get_result = tool .execute(json!({ "action": "get", "id": "heartbeat" })) .await .unwrap(); assert!(get_result.success); assert!(get_result.output.contains("heartbeat")); assert!(get_result.output.contains("outbound_message")); } #[tokio::test] async fn test_scheduler_manage_put_agent_task() { let store = Arc::new(SessionStore::in_memory().unwrap()); let tool = SchedulerManageTool::new(store, HashSet::from(["planner".to_string()])); let put_result = tool .execute(json!({ "action": "put", "id": "agent.daily_summary", "kind": "agent_task", "schedule": { "type": "cron", "expression": "0 9 * * *" }, "target": { "channel": "feishu", "chat_id": "oc_demo" }, "payload": { "prompt": "请总结今天待办", "agent": "planner" } })) .await .unwrap(); assert!(put_result.success); assert!(put_result.output.contains("agent_task")); } #[tokio::test] async fn test_scheduler_manage_rejects_outbound_message_without_target() { let store = Arc::new(SessionStore::in_memory().unwrap()); let tool = SchedulerManageTool::new(store, HashSet::new()); let put_result = tool .execute(json!({ "action": "put", "id": "massage_reminder", "kind": "outbound_message", "schedule": { "type": "interval", "seconds": 60 }, "payload": { "content": "⏰ 时间到了!该去按摩了!💆" } })) .await; assert!(put_result.is_err()); let error = put_result.err().unwrap().to_string(); assert!(error.contains("outbound_message target.channel is required")); } #[tokio::test] async fn test_scheduler_manage_put_uses_tool_context_target_defaults() { let store = Arc::new(SessionStore::in_memory().unwrap()); let tool = SchedulerManageTool::new(store.clone(), HashSet::new()); let put_result = tool .execute_with_context( &crate::tools::ToolContext { channel_name: Some("feishu".to_string()), sender_id: Some("user-1".to_string()), chat_id: Some("oc_demo".to_string()), session_id: Some("feishu:oc_demo".to_string()), message_id: Some("msg-1".to_string()), message_seq: Some(1), }, json!({ "action": "put", "id": "work_reminder", "kind": "outbound_message", "schedule": { "type": "interval", "seconds": 60 }, "payload": { "content": "ping" } }), ) .await .unwrap(); assert!(put_result.success); let saved = store.get_scheduler_job("work_reminder").unwrap().unwrap(); assert_eq!(saved.target["channel"], "feishu"); assert_eq!(saved.target["chat_id"], "oc_demo"); } #[tokio::test] async fn test_scheduler_manage_rejects_unknown_agent_task_agent() { let store = Arc::new(SessionStore::in_memory().unwrap()); let tool = SchedulerManageTool::new(store, HashSet::from(["planner".to_string()])); let put_result = tool .execute(json!({ "action": "put", "id": "agent.daily_summary", "kind": "agent_task", "schedule": { "type": "cron", "expression": "0 9 * * *" }, "target": { "channel": "feishu", "chat_id": "oc_demo" }, "payload": { "prompt": "请总结今天待办", "agent": "missing-agent" } })) .await; assert!(put_result.is_err()); let error = put_result.err().unwrap().to_string(); assert!(error.contains("Unknown agent 'missing-agent' for agent_task payload.agent")); assert!(error.contains("payload.agent must be omitted, set to 'default', or use one of configured agents: default, planner")); assert!(error.contains("If you mean a skill, do not put it in payload.agent")); } #[tokio::test] async fn test_scheduler_manage_accepts_default_agent() { let store = Arc::new(SessionStore::in_memory().unwrap()); let tool = SchedulerManageTool::new(store, HashSet::from(["planner".to_string()])); let put_result = tool .execute(json!({ "action": "put", "id": "agent.default_summary", "kind": "agent_task", "schedule": { "type": "cron", "expression": "0 9 * * *" }, "target": { "channel": "feishu", "chat_id": "oc_demo" }, "payload": { "prompt": "请总结今天待办", "agent": "default" } })) .await .unwrap(); assert!(put_result.success); } #[tokio::test] async fn test_scheduler_manage_rejects_null_args_locally() { let store = Arc::new(SessionStore::in_memory().unwrap()); let tool = SchedulerManageTool::new(store, HashSet::new()); let result = tool.execute(serde_json::Value::Null).await.unwrap(); assert!(!result.success); assert_eq!( result.error.as_deref(), Some("Missing required parameters: scheduler_manage expects a JSON object like {\"action\":\"list\"}") ); } #[test] fn test_scheduler_manage_schema_advises_against_repeating_schedule_in_agent_task_prompt() { let store = Arc::new(SessionStore::in_memory().unwrap()); let tool = SchedulerManageTool::new(store, HashSet::new()); let schema = tool.parameters_schema(); let payload_description = schema["properties"]["payload"]["description"] .as_str() .unwrap(); assert!(payload_description.contains("avoid repeating schedule phrases or execution times")); assert!(payload_description.contains("每天9点")); assert!(payload_description.contains("每小时")); } }