diff --git a/Cargo.toml b/Cargo.toml index 4baa31e..b9d6b4c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,8 @@ tracing = "0.1" tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } tracing-appender = "0.2" anyhow = "1.0" +chrono = { version = "0.4", features = ["serde"] } +cron = { version = "0.13", features = ["serde"] } mime_guess = "2.0" base64 = "0.22" tempfile = "3" diff --git a/README.md b/README.md index cfda533..67faeb2 100644 --- a/README.md +++ b/README.md @@ -81,3 +81,58 @@ Add skills in config.json: "max_listed_skills": 32 } } + +Scheduler + +PicoBot now includes a DB-backed scheduler for heartbeat, delayed jobs, interval jobs, one-shot absolute-time jobs, and cron jobs. + +Current behavior: +- Scheduler runs as a background loop inside gateway lifecycle. +- Job definitions and runtime state are persisted in SQLite instead of JSON files. +- Supported schedule types: delay, interval, at, cron. +- Supported job kinds: internal_event, outbound_message. +- Built-in internal event: session_cleanup, used to clear expired in-memory channel sessions. +- Built-in management tool: scheduler_manage. + +Config example: + +{ + "scheduler": { + "enabled": true, + "tick_resolution_ms": 1000, + "misfire_policy": "skip", + "jobs": [ + { + "id": "session.cleanup", + "kind": "internal_event", + "schedule": { + "type": "interval", + "seconds": 300 + }, + "payload": { + "event": "session_cleanup" + } + }, + { + "id": "daily.reminder", + "kind": "outbound_message", + "schedule": { + "type": "cron", + "expression": "0 9 * * *" + }, + "target": { + "channel": "feishu", + "chat_id": "oc_xxx" + }, + "payload": { + "content": "每日提醒" + } + } + ] + } +} + +Runtime management: +- Use scheduler_manage with action=list|get|put|delete|pause|resume. +- Jobs created by the tool are written into SQLite and picked up by the scheduler loop. +- Config-defined jobs are also synced into SQLite on startup. diff --git a/docs/PERSISTENCE.md b/docs/PERSISTENCE.md index e615366..58d01e2 100644 --- a/docs/PERSISTENCE.md +++ b/docs/PERSISTENCE.md @@ -18,10 +18,13 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 - 打开外键约束 - 自动建表和建索引 -当前持久化只覆盖两类核心数据: +当前持久化覆盖以下核心数据: - `sessions`:会话元数据 - `messages`:会话内的消息流水 +- `skill_events`:技能发现和使用事件 +- `memories`:长期记忆 +- `scheduler_jobs`:定时任务定义和运行状态 内存中的 `Session` 负责运行态处理,SQLite 负责跨进程、跨重启保留历史。整体设计是“内存缓存 + SQLite 事实来源”。 @@ -145,9 +148,45 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 - `tool_calls_json` 表示“assistant 想调用哪些工具” - `tool_call_id` 表示“这一条 tool 结果是在回应哪一次工具调用” -## 6. 数据写入流程 +## 6. scheduler_jobs -### 6.1 创建会话 +`scheduler_jobs` 是 PicoBot 计划任务的事实来源。它同时保存任务定义和最近一次运行后的状态,使 scheduler 在重启后仍能恢复调度。 + +字段说明: + +| 字段 | 类型 | 含义 | +| --- | --- | --- | +| `id` | `TEXT PRIMARY KEY` | 任务唯一标识 | +| `kind` | `TEXT NOT NULL` | 任务类型,当前支持 `internal_event`、`outbound_message` | +| `schedule_json` | `TEXT NOT NULL` | 统一 schedule 定义,JSON 形式保存 `delay` / `interval` / `at` / `cron` | +| `interval_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 | +| `startup_delay_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 | +| `target_json` | `TEXT NOT NULL` | 任务目标,例如 channel/chat_id | +| `payload_json` | `TEXT NOT NULL` | 任务执行载荷 | +| `enabled` | `INTEGER NOT NULL DEFAULT 1` | 是否启用 | +| `state` | `TEXT NOT NULL DEFAULT 'scheduled'` | 生命周期状态:`scheduled` / `running` / `paused` / `completed` | +| `last_status` | `TEXT` | 最近一次运行结果:`ok` / `error` / `skipped` | +| `last_error` | `TEXT` | 最近一次执行错误信息 | +| `run_count` | `INTEGER NOT NULL DEFAULT 0` | 已执行次数 | +| `max_runs` | `INTEGER` | 最大执行次数,达到后转为 completed | +| `last_fired_at` | `INTEGER` | 最近一次触发时间 | +| `next_fire_at` | `INTEGER` | 下一次计划触发时间 | +| `paused_at` | `INTEGER` | 暂停时间 | +| `completed_at` | `INTEGER` | 完成时间 | +| `created_at` | `INTEGER NOT NULL` | 创建时间 | +| `updated_at` | `INTEGER NOT NULL` | 最近更新时间 | + +运行语义: + +- scheduler 启动后会先把 config 中声明的 jobs 同步进 `scheduler_jobs`。 +- 运行时新建、暂停、恢复、删除任务可以通过 `scheduler_manage` 工具直接操作数据库。 +- repeating job 会在每次执行后更新 `run_count`、`last_fired_at`、`next_fire_at`。 +- one-shot job(`delay` / `at`)完成后会进入 `completed` 状态,不再调度。 +- 内置 `internal_event` 当前包含 `session_cleanup`,用于回收超时的内存 session 缓存。 + +## 7. 数据写入流程 + +### 7.1 创建会话 有两种进入方式: @@ -161,7 +200,7 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 - `deleted_at = NULL` - `message_count = 0` -### 6.2 追加消息 +### 7.2 追加消息 消息持久化统一走 `append_message()`,写入过程是一个 SQLite 事务: @@ -177,7 +216,7 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 另外,只有 `role = user` 的消息会递增 `user_turn_count`;`system`、`assistant`、`tool` 消息不会影响周期注入阈值的判定。 -### 6.3 读取历史 +### 7.3 读取历史 `load_messages(session_id)` 会按 `seq ASC` 读取当前活动段历史,并把 JSON 字段反序列化回 `ChatMessage`。活动段的定义是: @@ -188,9 +227,9 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 因此运行态恢复的是“当前活动段的逻辑顺序”,而不是简单按创建时间排序。只要 `seq` 连续,重放顺序就稳定。 -## 7. 典型时序 +## 8. 典型时序 -### 7.1 普通问答 +### 8.1 普通问答 1. 用户消息进入网关。 2. 如果数据库中没有对应会话,先插入一条 `sessions`。 @@ -199,7 +238,7 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 5. assistant 回复写入 `messages`,`role = assistant`。 6. 会话的 `message_count` 增加 2,`last_active_at` 更新时间。 -### 7.2 带工具调用的问答 +### 8.2 带工具调用的问答 1. assistant 先生成一条带 `tool_calls_json` 的消息,`role = assistant`。 2. 系统执行对应工具。 @@ -213,16 +252,16 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 - 每个工具调用返回了什么 - 最终 assistant 给了什么结论 -## 8. 会话生命周期操作 +## 9. 会话生命周期操作 -### 8.1 重命名 +### 9.1 重命名 `rename_session(session_id, title)`: - 更新 `sessions.title` - 更新 `sessions.updated_at` -### 8.2 归档 +### 9.2 归档 `archive_session(session_id)`: @@ -235,7 +274,7 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 - `include_archived = false` 只返回 `archived_at IS NULL` 的会话 - `include_archived = true` 返回全部未删除会话 -### 8.3 清空消息 +### 9.3 清空消息 `clear_messages(session_id)`: diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index c5584d2..4f916cf 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -21,7 +21,7 @@ use std::time::Instant; const MAX_TOOL_RESULT_CHARS: usize = 16_000; /// Minimum characters to keep when truncating const TRUNCATION_SUFFIX_LEN: usize = 200; -const MEMORY_TOOL_USAGE_SYSTEM_PROMPT: &str = "你可以在处理任务过程中使用长期记忆工具。读取记忆时,优先使用 memory_search:当你需要用户长期偏好、稳定事实、历史决策、持续任务上下文时,先 search;已知 namespace/key 时可用 get;需要浏览最近记忆时可用 list。写入或修改记忆时,再使用 memory_manage。仅在遇到高价值且未来仍有用的信息时写入记忆:用户长期偏好、稳定事实、用户对你的纠正、持续任务/项目上下文、明确决策。不要保存一次性工具结果、临时列表、敏感凭证或不确定推测。写入时优先使用规范 namespace:preferences、profile、tasks、decisions,并优先调用 memory_manage(action='put');同一 namespace/key 可直接覆盖更新。检索时应提供 queries 数组,尽量同时放入中文关键词、英文别名,以及可能的 snake_case memory_key 词,例如 queries=['email', '邮件', 'email_folder_preference']。"; +const MEMORY_TOOL_USAGE_SYSTEM_PROMPT: &str = "你可以在处理任务过程中使用长期记忆工具。读取记忆时,优先使用 memory_search:当用户的任务描述缺少相关指代,上下文存在模糊不清,执行任务需要用户长期偏好、稳定事实、历史决策、持续任务上下文时,先 search;已知 namespace/key 时可用 get;需要浏览最近记忆时可用 list。写入或修改记忆时,再使用 memory_manage。仅在遇到高价值且未来仍有用的信息时写入记忆:用户长期偏好、稳定事实、用户对你的纠正、持续任务/项目上下文、明确决策。不要保存一次性工具结果、临时列表、敏感凭证或不确定推测。写入时优先使用规范 namespace:preferences、profile、tasks、decisions,并优先调用 memory_manage(action='put');同一 namespace/key 可直接覆盖更新。检索时应提供 queries 数组,尽量同时放入中文关键词、英文别名,以及可能的 snake_case memory_key 词,例如 queries=['email', '邮件', 'email_folder_preference']。"; const PENDING_USER_ACTION_MARKER: &str = "__PICOBOT_PENDING_USER_ACTION__"; const DEFAULT_PENDING_ASSISTANT_MESSAGE: &str = "工具已经启动并进入等待用户操作的状态。请先完成外部操作,完成后直接告诉我继续。"; diff --git a/src/config/mod.rs b/src/config/mod.rs index 74484a2..b0b37c3 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1,9 +1,11 @@ +use chrono::{DateTime, Utc}; use regex::Regex; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::env; use std::fs; use std::path::{Path, PathBuf}; +use std::str::FromStr; #[derive(Debug, Clone, Deserialize, Serialize)] pub struct Config { @@ -13,6 +15,8 @@ pub struct Config { #[serde(default)] pub gateway: GatewayConfig, #[serde(default)] + pub scheduler: SchedulerConfig, + #[serde(default)] pub client: ClientConfig, #[serde(default)] pub channels: HashMap, @@ -148,6 +152,198 @@ pub struct ClientConfig { pub gateway_url: String, } +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SchedulerConfig { + #[serde(default)] + pub enabled: bool, + #[serde(default = "default_scheduler_tick_resolution_ms")] + pub tick_resolution_ms: u64, + #[serde(default = "default_scheduler_worker_queue_capacity")] + pub worker_queue_capacity: usize, + #[serde(default)] + pub misfire_policy: SchedulerMisfirePolicy, + #[serde(default)] + pub jobs: Vec, +} + +#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)] +#[serde(rename_all = "snake_case")] +pub enum SchedulerMisfirePolicy { + CatchUp, + #[default] + Skip, +} + +#[derive(Debug, Clone, Deserialize, Serialize)] +pub struct SchedulerJobConfig { + pub id: String, + #[serde(default = "default_scheduler_job_enabled")] + pub enabled: bool, + pub kind: SchedulerJobKind, + #[serde(default)] + pub schedule: Option, + #[serde(default)] + pub startup_delay_secs: u64, + #[serde(default)] + pub interval_secs: u64, + #[serde(default)] + pub target: SchedulerJobTarget, + #[serde(default)] + pub payload: serde_json::Value, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum SchedulerJobKind { + InternalEvent, + OutboundMessage, +} + +#[derive(Debug, Clone, Deserialize, Serialize, Default)] +pub struct SchedulerJobTarget { + #[serde(default)] + pub channel: Option, + #[serde(default)] + pub chat_id: Option, + #[serde(default)] + pub reply_to: Option, +} + +#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)] +#[serde(tag = "type", rename_all = "snake_case")] +pub enum SchedulerSchedule { + Delay { + seconds: u64, + }, + Interval { + seconds: u64, + #[serde(default)] + startup_delay_secs: u64, + }, + At { + timestamp: String, + }, + Cron { + expression: String, + }, +} + +impl SchedulerJobConfig { + pub fn resolved_schedule(&self) -> Result { + if let Some(schedule) = &self.schedule { + schedule.validate(&self.id)?; + return Ok(schedule.normalized_for_storage()); + } + + if self.interval_secs > 0 { + return Ok(SchedulerSchedule::Interval { + seconds: self.interval_secs, + startup_delay_secs: self.startup_delay_secs, + }); + } + + Err(ConfigError::InvalidSchedulerJob(format!( + "scheduler job '{}' requires schedule or interval_secs", + self.id + ))) + } +} + +impl SchedulerSchedule { + pub fn validate(&self, job_id: &str) -> Result<(), ConfigError> { + match self { + SchedulerSchedule::Delay { seconds } => { + if *seconds == 0 { + return Err(ConfigError::InvalidSchedulerJob(format!( + "scheduler job '{}' delay.seconds must be greater than 0", + job_id + ))); + } + } + SchedulerSchedule::Interval { seconds, .. } => { + if *seconds == 0 { + return Err(ConfigError::InvalidSchedulerJob(format!( + "scheduler job '{}' interval.seconds must be greater than 0", + job_id + ))); + } + } + SchedulerSchedule::At { timestamp } => { + DateTime::parse_from_rfc3339(timestamp).map_err(|err| { + ConfigError::InvalidSchedulerJob(format!( + "scheduler job '{}' invalid at.timestamp '{}': {}", + job_id, timestamp, err + )) + })?; + } + SchedulerSchedule::Cron { expression } => { + parse_scheduler_cron(expression).map_err(|err| { + ConfigError::InvalidSchedulerJob(format!( + "scheduler job '{}' invalid cron.expression '{}': {}", + job_id, expression, err + )) + })?; + } + } + + Ok(()) + } + + pub fn is_one_shot(&self) -> bool { + matches!(self, SchedulerSchedule::Delay { .. } | SchedulerSchedule::At { .. }) + } + + pub fn normalized_for_storage(&self) -> Self { + match self { + SchedulerSchedule::At { timestamp } => { + let parsed = DateTime::parse_from_rfc3339(timestamp) + .map(|value| value.with_timezone(&Utc).to_rfc3339()) + .unwrap_or_else(|_| timestamp.clone()); + SchedulerSchedule::At { timestamp: parsed } + } + other => other.clone(), + } + } + + pub fn display(&self) -> String { + match self { + SchedulerSchedule::Delay { seconds } => format!("delay:{}s", seconds), + SchedulerSchedule::Interval { + seconds, + startup_delay_secs, + } => format!("interval:{}s:start_delay:{}s", seconds, startup_delay_secs), + SchedulerSchedule::At { timestamp } => format!("at:{}", timestamp), + SchedulerSchedule::Cron { expression } => format!("cron:{}", expression), + } + } +} + +fn parse_scheduler_cron(expression: &str) -> Result { + let normalized = normalize_cron_expression(expression); + cron::Schedule::from_str(&normalized) +} + +fn normalize_cron_expression(expression: &str) -> String { + let parts: Vec<&str> = expression.split_whitespace().collect(); + if parts.len() == 5 { + format!("0 {}", expression.trim()) + } else { + expression.trim().to_string() + } +} + +fn default_scheduler_tick_resolution_ms() -> u64 { + 1_000 +} + +fn default_scheduler_worker_queue_capacity() -> usize { + 64 +} + +fn default_scheduler_job_enabled() -> bool { + true +} + fn default_gateway_host() -> String { "127.0.0.1".to_string() } @@ -184,6 +380,18 @@ impl Default for ClientConfig { } } +impl Default for SchedulerConfig { + fn default() -> Self { + Self { + enabled: false, + tick_resolution_ms: default_scheduler_tick_resolution_ms(), + worker_queue_capacity: default_scheduler_worker_queue_capacity(), + misfire_policy: SchedulerMisfirePolicy::default(), + jobs: Vec::new(), + } + } +} + #[derive(Debug, Clone)] pub struct LLMProviderConfig { pub provider_type: String, @@ -268,6 +476,7 @@ pub enum ConfigError { AgentNotFound(String), ProviderNotFound(String), ModelNotFound(String), + InvalidSchedulerJob(String), } impl std::fmt::Display for ConfigError { @@ -277,6 +486,7 @@ impl std::fmt::Display for ConfigError { ConfigError::AgentNotFound(name) => write!(f, "Agent not found: {}", name), ConfigError::ProviderNotFound(name) => write!(f, "Provider not found: {}", name), ConfigError::ModelNotFound(name) => write!(f, "Model not found: {}", name), + ConfigError::InvalidSchedulerJob(message) => write!(f, "Invalid scheduler job: {}", message), } } } @@ -471,4 +681,196 @@ mod tests { let config = Config::load(file.path().to_str().unwrap()).unwrap(); assert!(config.gateway.show_tool_results); } + + #[test] + fn test_scheduler_config_defaults() { + let file = write_test_config(); + let config = Config::load(file.path().to_str().unwrap()).unwrap(); + + assert!(!config.scheduler.enabled); + assert_eq!(config.scheduler.tick_resolution_ms, 1_000); + assert_eq!(config.scheduler.worker_queue_capacity, 64); + assert_eq!(config.scheduler.misfire_policy, SchedulerMisfirePolicy::Skip); + assert!(config.scheduler.jobs.is_empty()); + } + + #[test] + fn test_scheduler_config_loads_interval_compat_jobs() { + let file = tempfile::NamedTempFile::new().unwrap(); + std::fs::write( + file.path(), + r#"{ + "providers": { + "aliyun": { + "type": "openai", + "base_url": "https://example.invalid/v1", + "api_key": "test-key", + "extra_headers": {} + } + }, + "models": { + "qwen-plus": { + "model_id": "qwen-plus" + } + }, + "agents": { + "default": { + "provider": "aliyun", + "model": "qwen-plus" + } + }, + "scheduler": { + "enabled": true, + "tick_resolution_ms": 500, + "worker_queue_capacity": 8, + "misfire_policy": "catch_up", + "jobs": [ + { + "id": "heartbeat.reminder", + "kind": "outbound_message", + "interval_secs": 60, + "startup_delay_secs": 5, + "target": { + "channel": "feishu", + "chat_id": "oc_demo" + }, + "payload": { + "content": "heartbeat" + } + } + ] + } +}"#, + ) + .unwrap(); + + let config = Config::load(file.path().to_str().unwrap()).unwrap(); + assert!(config.scheduler.enabled); + assert_eq!(config.scheduler.tick_resolution_ms, 500); + assert_eq!(config.scheduler.worker_queue_capacity, 8); + assert_eq!(config.scheduler.misfire_policy, SchedulerMisfirePolicy::CatchUp); + assert_eq!(config.scheduler.jobs.len(), 1); + + let job = &config.scheduler.jobs[0]; + assert_eq!(job.id, "heartbeat.reminder"); + assert!(job.enabled); + assert_eq!(job.kind, SchedulerJobKind::OutboundMessage); + assert_eq!(job.interval_secs, 60); + assert_eq!(job.startup_delay_secs, 5); + assert_eq!(job.target.channel.as_deref(), Some("feishu")); + assert_eq!(job.target.chat_id.as_deref(), Some("oc_demo")); + assert_eq!(job.payload.get("content").and_then(|value| value.as_str()), Some("heartbeat")); + assert_eq!(job.resolved_schedule().unwrap(), SchedulerSchedule::Interval { + seconds: 60, + startup_delay_secs: 5, + }); + } + + #[test] + fn test_scheduler_config_loads_schedule_variants() { + let file = tempfile::NamedTempFile::new().unwrap(); + std::fs::write( + file.path(), + r#"{ + "providers": { + "aliyun": { + "type": "openai", + "base_url": "https://example.invalid/v1", + "api_key": "test-key", + "extra_headers": {} + } + }, + "models": { + "qwen-plus": { + "model_id": "qwen-plus" + } + }, + "agents": { + "default": { + "provider": "aliyun", + "model": "qwen-plus" + } + }, + "scheduler": { + "enabled": true, + "jobs": [ + { + "id": "delay.job", + "kind": "internal_event", + "schedule": { + "type": "delay", + "seconds": 30 + } + }, + { + "id": "at.job", + "kind": "outbound_message", + "schedule": { + "type": "at", + "timestamp": "2026-04-23T09:00:00Z" + }, + "target": { + "channel": "feishu", + "chat_id": "oc_demo" + }, + "payload": { + "content": "at run" + } + }, + { + "id": "cron.job", + "kind": "internal_event", + "schedule": { + "type": "cron", + "expression": "0 9 * * *" + } + } + ] + } +}"#, + ) + .unwrap(); + + let config = Config::load(file.path().to_str().unwrap()).unwrap(); + assert_eq!(config.scheduler.jobs.len(), 3); + assert_eq!( + config.scheduler.jobs[0].resolved_schedule().unwrap(), + SchedulerSchedule::Delay { seconds: 30 } + ); + assert_eq!( + config.scheduler.jobs[1].resolved_schedule().unwrap(), + SchedulerSchedule::At { + timestamp: "2026-04-23T09:00:00+00:00".to_string(), + } + ); + assert_eq!( + config.scheduler.jobs[2].resolved_schedule().unwrap(), + SchedulerSchedule::Cron { + expression: "0 9 * * *".to_string(), + } + ); + } + + #[test] + fn test_scheduler_schedule_validation_rejects_invalid_values() { + assert!(SchedulerSchedule::Delay { seconds: 0 } + .validate("delay.job") + .is_err()); + assert!(SchedulerSchedule::Interval { + seconds: 0, + startup_delay_secs: 0, + } + .validate("interval.job") + .is_err()); + assert!(SchedulerSchedule::At { + timestamp: "bad timestamp".to_string(), + } + .validate("at.job") + .is_err()); + assert!(SchedulerSchedule::Cron { + expression: "bad cron".to_string(), + } + .validate("cron.job") + .is_err()); + } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index fb99e23..2f711ef 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -10,6 +10,7 @@ use crate::bus::{MessageBus, OutboundDispatcher}; use crate::channels::ChannelManager; use crate::config::Config; use crate::logging; +use crate::scheduler::Scheduler; use crate::skills::SkillRuntime; use session::{BusToolCallEmitter, SessionManager}; @@ -148,6 +149,20 @@ pub async fn run(host: Option, port: Option) -> Result<(), Box, port: Option) -> Result<(), Box, store: Arc) -> ToolReg registry.register(FileWriteTool::new()); registry.register(FileEditTool::new()); registry.register(MemorySearchTool::new(store.clone())); - registry.register(MemoryManageTool::new(store)); + registry.register(MemoryManageTool::new(store.clone())); + registry.register(SchedulerManageTool::new(store)); registry.register(SkillListTool::new(skills.clone())); registry.register(SkillManageTool::new(skills)); registry.register(BashTool::new()); @@ -571,6 +572,29 @@ impl SessionManager { inner.session_timestamps.insert(channel_name.to_string(), Instant::now()); } + pub async fn cleanup_expired_sessions(&self) -> usize { + let mut inner = self.inner.lock().await; + let now = Instant::now(); + let expired_channels: Vec = inner + .session_timestamps + .iter() + .filter_map(|(channel_name, last_active)| { + if now.duration_since(*last_active) > inner.session_ttl { + Some(channel_name.clone()) + } else { + None + } + }) + .collect(); + + for channel_name in &expired_channels { + inner.sessions.remove(channel_name); + inner.session_timestamps.remove(channel_name); + } + + expired_channels.len() + } + /// 处理消息:路由到对应 session 的 agent pub async fn handle_message( &self, diff --git a/src/lib.rs b/src/lib.rs index 79d0d80..ae19698 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,7 @@ pub mod protocol; pub mod channels; pub mod logging; pub mod observability; +pub mod scheduler; pub mod storage; pub mod tools; pub mod skills; diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs new file mode 100644 index 0000000..054d338 --- /dev/null +++ b/src/scheduler/mod.rs @@ -0,0 +1,558 @@ +use std::collections::HashMap; +use std::str::FromStr; +use std::sync::Arc; + +use chrono::{DateTime, Duration as ChronoDuration, TimeZone, Utc}; +use tokio::sync::watch; + +use crate::bus::{MessageBus, OutboundMessage}; +use crate::config::{ + SchedulerConfig, SchedulerJobConfig, SchedulerJobKind, SchedulerJobTarget, SchedulerMisfirePolicy, + SchedulerSchedule, +}; +use crate::gateway::session::SessionManager; +use crate::storage::{ + SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, SchedulerJobUpsert, SessionStore, +}; + +pub struct Scheduler { + bus: Arc, + config: SchedulerConfig, + store: Arc, + session_manager: SessionManager, +} + +impl Scheduler { + pub fn new( + bus: Arc, + config: SchedulerConfig, + store: Arc, + session_manager: SessionManager, + ) -> Self { + Self { + bus, + config, + store, + session_manager, + } + } + + pub async fn run(&self, mut shutdown_rx: watch::Receiver) { + if !self.config.enabled { + tracing::info!("Scheduler disabled; skipping startup"); + return; + } + + if let Err(error) = self.sync_config_jobs() { + tracing::error!(error = %error, "Failed to sync scheduler config jobs"); + } + + let tick_ms = self.config.tick_resolution_ms.max(100); + let mut ticker = tokio::time::interval(std::time::Duration::from_millis(tick_ms)); + + tracing::info!(tick_resolution_ms = tick_ms, "Scheduler started"); + + loop { + tokio::select! { + _ = ticker.tick() => { + if let Err(error) = self.process_tick().await { + tracing::error!(error = %error, "Scheduler tick failed"); + } + } + changed = shutdown_rx.changed() => { + if changed.is_ok() && *shutdown_rx.borrow() { + tracing::info!("Scheduler shutdown requested"); + break; + } + } + } + } + } + + fn sync_config_jobs(&self) -> anyhow::Result<()> { + let now = Utc::now(); + for job in &self.config.jobs { + let runtime = RuntimeJob::from_config(job, now, self.config.misfire_policy)?; + self.store.upsert_scheduler_job(&runtime.to_upsert())?; + } + Ok(()) + } + + async fn process_tick(&self) -> anyhow::Result<()> { + let now = Utc::now(); + let jobs = self.store.list_scheduler_jobs(true)?; + + for record in jobs { + let Some(mut job) = RuntimeJob::from_record(&record, self.config.misfire_policy)? else { + continue; + }; + + if !job.is_due(now) { + continue; + } + + self.store.update_scheduler_job_runtime( + &job.id, + SchedulerJobState::Running, + job.last_status.clone(), + job.last_error.as_deref(), + job.run_count, + job.last_fired_at, + job.next_fire_at, + job.paused_at, + job.completed_at, + )?; + + let execution_result = self.execute_job(&job).await; + job.after_execution(now, execution_result.as_ref().err().map(|err| err.to_string()), self.config.misfire_policy)?; + + let status = if execution_result.is_ok() { + Some(SchedulerJobStatus::Ok) + } else { + Some(SchedulerJobStatus::Error) + }; + + if let Err(error) = &execution_result { + tracing::error!(job_id = %job.id, error = %error, "Scheduler job failed"); + } + + self.store.update_scheduler_job_runtime( + &job.id, + job.state.clone(), + status, + job.last_error.as_deref(), + job.run_count, + job.last_fired_at, + job.next_fire_at, + job.paused_at, + job.completed_at, + )?; + } + + Ok(()) + } + + async fn execute_job(&self, job: &RuntimeJob) -> anyhow::Result<()> { + match job.kind { + SchedulerJobKind::OutboundMessage => { + let message = build_outbound_message(job)?; + self.bus.publish_outbound(message).await?; + } + SchedulerJobKind::InternalEvent => { + execute_internal_event(&self.session_manager, job).await?; + } + } + + Ok(()) + } +} + +#[derive(Debug, Clone)] +struct RuntimeJob { + id: String, + kind: SchedulerJobKind, + schedule: SchedulerSchedule, + target: SchedulerJobTarget, + payload: serde_json::Value, + enabled: bool, + state: SchedulerJobState, + last_status: Option, + last_error: Option, + run_count: i64, + max_runs: Option, + last_fired_at: Option, + next_fire_at: Option, + paused_at: Option, + completed_at: Option, + interval_secs: i64, + startup_delay_secs: i64, +} + +impl RuntimeJob { + fn from_config( + job: &SchedulerJobConfig, + now: DateTime, + misfire_policy: SchedulerMisfirePolicy, + ) -> anyhow::Result { + let schedule = job.resolved_schedule()?; + let initial_state = if job.enabled { + SchedulerJobState::Scheduled + } else { + SchedulerJobState::Paused + }; + let next_fire_at = if job.enabled { + compute_initial_next_fire_at(&schedule, now, None, misfire_policy)? + } else { + None + }; + + Ok(Self { + id: job.id.clone(), + kind: job.kind.clone(), + interval_secs: job.interval_secs as i64, + startup_delay_secs: job.startup_delay_secs as i64, + schedule, + target: job.target.clone(), + payload: job.payload.clone(), + enabled: job.enabled, + state: initial_state, + last_status: None, + last_error: None, + run_count: 0, + max_runs: None, + last_fired_at: None, + next_fire_at, + paused_at: None, + completed_at: None, + }) + } + + fn from_record( + record: &SchedulerJobRecord, + misfire_policy: SchedulerMisfirePolicy, + ) -> anyhow::Result> { + let kind = match record.kind.as_str() { + "internal_event" => SchedulerJobKind::InternalEvent, + "outbound_message" => SchedulerJobKind::OutboundMessage, + other => { + tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind"); + return Ok(None); + } + }; + + let schedule = deserialize_schedule(&record.schedule, record.interval_secs, record.startup_delay_secs)?; + let now = Utc::now(); + let next_fire_at = match (record.enabled, record.state.clone(), record.next_fire_at) { + (false, _, _) => None, + (_, SchedulerJobState::Paused, _) => None, + (_, SchedulerJobState::Completed, _) => None, + (_, _, some_next) if some_next.is_some() => some_next, + _ => compute_initial_next_fire_at(&schedule, now, record.last_fired_at, misfire_policy)?, + }; + + Ok(Some(Self { + id: record.id.clone(), + kind, + schedule, + target: record.target.clone().try_into()?, + payload: record.payload.clone(), + enabled: record.enabled, + state: record.state.clone(), + last_status: record.last_status.clone(), + last_error: record.last_error.clone(), + run_count: record.run_count, + max_runs: record.max_runs, + last_fired_at: record.last_fired_at, + next_fire_at, + paused_at: record.paused_at, + completed_at: record.completed_at, + interval_secs: record.interval_secs, + startup_delay_secs: record.startup_delay_secs, + })) + } + + fn is_due(&self, now: DateTime) -> bool { + self.enabled + && self.state == SchedulerJobState::Scheduled + && self.next_fire_at.map(|value| value <= now.timestamp_millis()).unwrap_or(false) + } + + fn after_execution( + &mut self, + now: DateTime, + last_error: Option, + misfire_policy: SchedulerMisfirePolicy, + ) -> anyhow::Result<()> { + self.run_count += 1; + self.last_fired_at = Some(now.timestamp_millis()); + self.last_error = last_error; + + if self.schedule.is_one_shot() { + self.state = SchedulerJobState::Completed; + self.next_fire_at = None; + self.completed_at = Some(now.timestamp_millis()); + return Ok(()); + } + + if let Some(max_runs) = self.max_runs { + if self.run_count >= max_runs { + self.state = SchedulerJobState::Completed; + self.next_fire_at = None; + self.completed_at = Some(now.timestamp_millis()); + return Ok(()); + } + } + + let reference_ms = self.next_fire_at.or(self.last_fired_at); + self.state = SchedulerJobState::Scheduled; + self.completed_at = None; + self.next_fire_at = compute_next_fire_at(&self.schedule, now, reference_ms, misfire_policy)?; + Ok(()) + } + + fn to_upsert(&self) -> SchedulerJobUpsert { + SchedulerJobUpsert { + id: self.id.clone(), + kind: match self.kind { + SchedulerJobKind::InternalEvent => "internal_event".to_string(), + SchedulerJobKind::OutboundMessage => "outbound_message".to_string(), + }, + schedule: serde_json::to_value(&self.schedule).unwrap_or_else(|_| serde_json::json!({})), + interval_secs: self.interval_secs, + startup_delay_secs: self.startup_delay_secs, + target: serde_json::to_value(&self.target).unwrap_or_else(|_| serde_json::json!({})), + payload: self.payload.clone(), + enabled: self.enabled, + state: self.state.clone(), + last_status: self.last_status.clone(), + last_error: self.last_error.clone(), + run_count: self.run_count, + max_runs: self.max_runs, + last_fired_at: self.last_fired_at, + next_fire_at: self.next_fire_at, + paused_at: self.paused_at, + completed_at: self.completed_at, + } + } +} + +fn deserialize_schedule( + schedule_json: &serde_json::Value, + interval_secs: i64, + startup_delay_secs: i64, +) -> anyhow::Result { + if !schedule_json.is_null() && schedule_json != &serde_json::json!({}) { + return Ok(serde_json::from_value(schedule_json.clone())?); + } + + if interval_secs > 0 { + return Ok(SchedulerSchedule::Interval { + seconds: interval_secs as u64, + startup_delay_secs: startup_delay_secs as u64, + }); + } + + anyhow::bail!("scheduler job is missing schedule definition") +} + +fn compute_initial_next_fire_at( + schedule: &SchedulerSchedule, + now: DateTime, + last_fired_at: Option, + misfire_policy: SchedulerMisfirePolicy, +) -> anyhow::Result> { + match last_fired_at { + Some(last_fired_at) => compute_next_fire_at(schedule, now, Some(last_fired_at), misfire_policy), + None => match schedule { + SchedulerSchedule::Delay { seconds } => Ok(Some((now + ChronoDuration::seconds(*seconds as i64)).timestamp_millis())), + SchedulerSchedule::Interval { + seconds, + startup_delay_secs, + } => { + let delay = if *startup_delay_secs > 0 { *startup_delay_secs } else { *seconds }; + Ok(Some((now + ChronoDuration::seconds(delay as i64)).timestamp_millis())) + } + SchedulerSchedule::At { timestamp } => Ok(Some(parse_rfc3339_to_utc(timestamp)?.timestamp_millis())), + SchedulerSchedule::Cron { expression } => { + let schedule = parse_scheduler_cron(expression)?; + Ok(schedule.after(&now).next().map(|next| next.timestamp_millis())) + } + }, + } +} + +fn compute_next_fire_at( + schedule: &SchedulerSchedule, + now: DateTime, + reference_ms: Option, + misfire_policy: SchedulerMisfirePolicy, +) -> anyhow::Result> { + match schedule { + SchedulerSchedule::Delay { .. } | SchedulerSchedule::At { .. } => Ok(None), + SchedulerSchedule::Interval { seconds, .. } => { + let interval_ms = (*seconds as i64) * 1_000; + let baseline = reference_ms.unwrap_or_else(|| now.timestamp_millis()); + let next_ms = match misfire_policy { + SchedulerMisfirePolicy::Skip => now.timestamp_millis() + interval_ms, + SchedulerMisfirePolicy::CatchUp => { + let mut candidate = baseline + interval_ms; + while candidate <= now.timestamp_millis() { + candidate += interval_ms; + } + candidate + } + }; + Ok(Some(next_ms)) + } + SchedulerSchedule::Cron { expression } => { + let schedule = parse_scheduler_cron(expression)?; + let anchor = match misfire_policy { + SchedulerMisfirePolicy::Skip => now, + SchedulerMisfirePolicy::CatchUp => reference_ms + .and_then(ts_millis_to_utc) + .unwrap_or(now), + }; + Ok(schedule.after(&anchor).next().map(|next| next.timestamp_millis())) + } + } +} + +fn parse_rfc3339_to_utc(value: &str) -> anyhow::Result> { + Ok(DateTime::parse_from_rfc3339(value)?.with_timezone(&Utc)) +} + +fn parse_scheduler_cron(expression: &str) -> anyhow::Result { + let normalized = normalize_cron_expression(expression); + Ok(cron::Schedule::from_str(&normalized)?) +} + +fn normalize_cron_expression(expression: &str) -> String { + let parts: Vec<&str> = expression.split_whitespace().collect(); + if parts.len() == 5 { + format!("0 {}", expression.trim()) + } else { + expression.trim().to_string() + } +} + +fn ts_millis_to_utc(value: i64) -> Option> { + Utc.timestamp_millis_opt(value).single() +} + +fn build_outbound_message(job: &RuntimeJob) -> anyhow::Result { + let channel = job + .target + .channel + .clone() + .ok_or_else(|| anyhow::anyhow!("outbound scheduler job requires target.channel"))?; + let chat_id = job + .target + .chat_id + .clone() + .ok_or_else(|| anyhow::anyhow!("outbound scheduler job requires target.chat_id"))?; + let content = job + .payload + .get("content") + .and_then(|value| value.as_str()) + .ok_or_else(|| anyhow::anyhow!("outbound scheduler job payload.content must be a string"))?; + + let mut metadata = HashMap::new(); + metadata.insert("scheduler_job_id".to_string(), job.id.clone()); + + Ok(OutboundMessage::assistant( + channel, + chat_id, + content.to_string(), + job.target.reply_to.clone(), + metadata, + )) +} + +async fn execute_internal_event(session_manager: &SessionManager, job: &RuntimeJob) -> anyhow::Result<()> { + let event = job + .payload + .get("event") + .and_then(|value| value.as_str()) + .unwrap_or("session_cleanup"); + + match event { + "session_cleanup" => { + let removed = session_manager.cleanup_expired_sessions().await; + tracing::info!(job_id = %job.id, removed, "Scheduler session cleanup completed"); + Ok(()) + } + other => anyhow::bail!("unsupported internal scheduler event: {}", other), + } +} + +impl TryFrom for SchedulerJobTarget { + type Error = anyhow::Error; + + fn try_from(value: serde_json::Value) -> Result { + Ok(serde_json::from_value(value)?) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn runtime_job_skip_policy_advances_from_now() { + let now = Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap(); + let next = compute_next_fire_at( + &SchedulerSchedule::Interval { + seconds: 60, + startup_delay_secs: 0, + }, + now, + Some(now.timestamp_millis() - 10 * 60 * 1_000), + SchedulerMisfirePolicy::Skip, + ) + .unwrap() + .unwrap(); + + assert_eq!(next, now.timestamp_millis() + 60_000); + } + + #[test] + fn runtime_job_catch_up_policy_moves_past_now() { + let now = Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap(); + let next = compute_next_fire_at( + &SchedulerSchedule::Interval { + seconds: 60, + startup_delay_secs: 0, + }, + now, + Some(now.timestamp_millis() - 10 * 60 * 1_000), + SchedulerMisfirePolicy::CatchUp, + ) + .unwrap() + .unwrap(); + + assert!(next > now.timestamp_millis()); + assert_eq!((next - now.timestamp_millis()) % 60_000, 0); + } + + #[test] + fn runtime_job_from_record_uses_persisted_schedule() { + let record = SchedulerJobRecord { + id: "heartbeat".to_string(), + kind: "outbound_message".to_string(), + schedule: serde_json::json!({ + "type": "interval", + "seconds": 120, + "startup_delay_secs": 10 + }), + interval_secs: 0, + startup_delay_secs: 0, + target: serde_json::json!({ + "channel": "feishu", + "chat_id": "oc_demo" + }), + payload: serde_json::json!({"content": "hello"}), + enabled: true, + state: SchedulerJobState::Scheduled, + last_status: None, + last_error: None, + run_count: 0, + max_runs: None, + last_fired_at: None, + next_fire_at: Some(1_700_000_010_000), + paused_at: None, + completed_at: None, + created_at: 1_700_000_000_000, + updated_at: 1_700_000_000_000, + }; + + let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip) + .unwrap() + .unwrap(); + + assert_eq!(job.schedule, SchedulerSchedule::Interval { + seconds: 120, + startup_delay_secs: 10, + }); + assert_eq!(job.next_fire_at, Some(1_700_000_010_000)); + } +} \ No newline at end of file diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 9db2395..95c3bee 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -77,11 +77,118 @@ pub struct MemoryUpsert { pub source_chat_id: Option, } +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum SchedulerJobState { + Scheduled, + Running, + Paused, + Completed, +} + +impl SchedulerJobState { + pub fn as_str(&self) -> &'static str { + match self { + SchedulerJobState::Scheduled => "scheduled", + SchedulerJobState::Running => "running", + SchedulerJobState::Paused => "paused", + SchedulerJobState::Completed => "completed", + } + } + + pub fn from_str(value: &str) -> Option { + match value { + "scheduled" => Some(Self::Scheduled), + "running" => Some(Self::Running), + "paused" => Some(Self::Paused), + "completed" => Some(Self::Completed), + _ => None, + } + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum SchedulerJobStatus { + Ok, + Error, + Skipped, +} + +impl SchedulerJobStatus { + pub fn as_str(&self) -> &'static str { + match self { + SchedulerJobStatus::Ok => "ok", + SchedulerJobStatus::Error => "error", + SchedulerJobStatus::Skipped => "skipped", + } + } + + pub fn from_str(value: &str) -> Option { + match value { + "ok" => Some(Self::Ok), + "error" => Some(Self::Error), + "skipped" => Some(Self::Skipped), + _ => None, + } + } +} + +impl Default for SchedulerJobState { + fn default() -> Self { + Self::Scheduled + } +} + #[derive(Clone)] pub struct SessionStore { conn: Arc>, } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct SchedulerJobRecord { + pub id: String, + pub kind: String, + pub schedule: serde_json::Value, + pub interval_secs: i64, + pub startup_delay_secs: i64, + pub target: serde_json::Value, + pub payload: serde_json::Value, + pub enabled: bool, + pub state: SchedulerJobState, + pub last_status: Option, + pub last_error: Option, + pub run_count: i64, + pub max_runs: Option, + pub last_fired_at: Option, + pub next_fire_at: Option, + pub paused_at: Option, + pub completed_at: Option, + pub created_at: i64, + pub updated_at: i64, +} + +#[derive(Debug, Clone)] +pub struct SchedulerJobUpsert { + pub id: String, + pub kind: String, + pub schedule: serde_json::Value, + pub interval_secs: i64, + pub startup_delay_secs: i64, + pub target: serde_json::Value, + pub payload: serde_json::Value, + pub enabled: bool, + pub state: SchedulerJobState, + pub last_status: Option, + pub last_error: Option, + pub run_count: i64, + pub max_runs: Option, + pub last_fired_at: Option, + pub next_fire_at: Option, + pub paused_at: Option, + pub completed_at: Option, +} + impl SessionStore { pub fn new() -> Result { let db_path = default_session_db_path()?; @@ -185,6 +292,31 @@ impl SessionStore { CREATE INDEX IF NOT EXISTS idx_memories_source_session ON memories(source_session_id, updated_at DESC); + CREATE TABLE IF NOT EXISTS scheduler_jobs ( + id TEXT PRIMARY KEY, + kind TEXT NOT NULL, + schedule_json TEXT NOT NULL DEFAULT '{}', + interval_secs INTEGER NOT NULL DEFAULT 0, + startup_delay_secs INTEGER NOT NULL DEFAULT 0, + target_json TEXT NOT NULL, + payload_json TEXT NOT NULL, + enabled INTEGER NOT NULL DEFAULT 1, + state TEXT NOT NULL DEFAULT 'scheduled', + last_status TEXT, + last_error TEXT, + run_count INTEGER NOT NULL DEFAULT 0, + max_runs INTEGER, + last_fired_at INTEGER, + next_fire_at INTEGER, + paused_at INTEGER, + completed_at INTEGER, + created_at INTEGER NOT NULL, + updated_at INTEGER NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_scheduler_jobs_enabled_next_fire + ON scheduler_jobs(enabled, state, next_fire_at ASC); + CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5( namespace, memory_key, @@ -213,6 +345,7 @@ impl SessionStore { )?; ensure_sessions_schema(&conn)?; + ensure_scheduler_schema(&conn)?; Ok(Self { conn: Arc::new(Mutex::new(conn)), @@ -717,6 +850,162 @@ impl SessionStore { Ok(changed > 0) } + pub fn upsert_scheduler_job(&self, input: &SchedulerJobUpsert) -> Result { + let now = current_timestamp(); + let conn = self.conn.lock().expect("session db mutex poisoned"); + conn.execute( + " + INSERT INTO scheduler_jobs ( + id, kind, schedule_json, interval_secs, startup_delay_secs, + target_json, payload_json, enabled, state, last_status, last_error, + run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at, + created_at, updated_at + ) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?18) + ON CONFLICT(id) DO UPDATE SET + kind = excluded.kind, + schedule_json = excluded.schedule_json, + interval_secs = excluded.interval_secs, + startup_delay_secs = excluded.startup_delay_secs, + target_json = excluded.target_json, + payload_json = excluded.payload_json, + enabled = excluded.enabled, + state = excluded.state, + last_status = excluded.last_status, + last_error = excluded.last_error, + run_count = excluded.run_count, + max_runs = excluded.max_runs, + last_fired_at = excluded.last_fired_at, + next_fire_at = excluded.next_fire_at, + paused_at = excluded.paused_at, + completed_at = excluded.completed_at, + updated_at = excluded.updated_at + ", + params![ + input.id, + input.kind, + serde_json::to_string(&input.schedule)?, + input.interval_secs, + input.startup_delay_secs, + serde_json::to_string(&input.target)?, + serde_json::to_string(&input.payload)?, + if input.enabled { 1 } else { 0 }, + input.state.as_str(), + input.last_status.as_ref().map(SchedulerJobStatus::as_str), + input.last_error, + input.run_count, + input.max_runs, + input.last_fired_at, + input.next_fire_at, + input.paused_at, + input.completed_at, + now, + ], + )?; + drop(conn); + + self.get_scheduler_job(&input.id)? + .ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into()) + } + + pub fn get_scheduler_job(&self, job_id: &str) -> Result, StorageError> { + let conn = self.conn.lock().expect("session db mutex poisoned"); + let mut stmt = conn.prepare( + " + SELECT id, kind, schedule_json, interval_secs, startup_delay_secs, + target_json, payload_json, enabled, state, last_status, last_error, + run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at, + created_at, updated_at + FROM scheduler_jobs + WHERE id = ?1 + ", + )?; + + stmt.query_row(params![job_id], map_scheduler_job_record) + .optional() + .map_err(StorageError::from) + } + + pub fn list_scheduler_jobs(&self, enabled_only: bool) -> Result, StorageError> { + let conn = self.conn.lock().expect("session db mutex poisoned"); + let sql = if enabled_only { + " + SELECT id, kind, schedule_json, interval_secs, startup_delay_secs, + target_json, payload_json, enabled, state, last_status, last_error, + run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at, + created_at, updated_at + FROM scheduler_jobs + WHERE enabled = 1 + ORDER BY COALESCE(next_fire_at, created_at) ASC, id ASC + " + } else { + " + SELECT id, kind, schedule_json, interval_secs, startup_delay_secs, + target_json, payload_json, enabled, state, last_status, last_error, + run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at, + created_at, updated_at + FROM scheduler_jobs + ORDER BY COALESCE(next_fire_at, created_at) ASC, id ASC + " + }; + + let mut stmt = conn.prepare(sql)?; + let rows = stmt.query_map([], map_scheduler_job_record)?; + let mut jobs = Vec::new(); + for row in rows { + jobs.push(row?); + } + Ok(jobs) + } + + pub fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> { + let conn = self.conn.lock().expect("session db mutex poisoned"); + conn.execute("DELETE FROM scheduler_jobs WHERE id = ?1", params![job_id])?; + Ok(()) + } + + pub fn update_scheduler_job_runtime( + &self, + job_id: &str, + state: SchedulerJobState, + last_status: Option, + last_error: Option<&str>, + run_count: i64, + last_fired_at: Option, + next_fire_at: Option, + paused_at: Option, + completed_at: Option, + ) -> Result<(), StorageError> { + let conn = self.conn.lock().expect("session db mutex poisoned"); + conn.execute( + " + UPDATE scheduler_jobs + SET state = ?2, + last_status = ?3, + last_error = ?4, + run_count = ?5, + last_fired_at = ?6, + next_fire_at = ?7, + paused_at = ?8, + completed_at = ?9, + updated_at = ?10 + WHERE id = ?1 + ", + params![ + job_id, + state.as_str(), + last_status.as_ref().map(SchedulerJobStatus::as_str), + last_error, + run_count, + last_fired_at, + next_fire_at, + paused_at, + completed_at, + current_timestamp(), + ], + )?; + Ok(()) + } + pub fn search_memories( &self, scope_kind: &str, @@ -933,6 +1222,64 @@ fn map_memory_record(row: &rusqlite::Row<'_>) -> rusqlite::Result }) } +fn map_scheduler_job_record(row: &rusqlite::Row<'_>) -> rusqlite::Result { + let schedule_json: String = row.get(2)?; + let target_json: String = row.get(5)?; + let payload_json: String = row.get(6)?; + let state: String = row.get(8)?; + let last_status: Option = row.get(9)?; + + let schedule = serde_json::from_str(&schedule_json).map_err(|err| { + rusqlite::Error::FromSqlConversionFailure( + 2, + rusqlite::types::Type::Text, + Box::new(err), + ) + })?; + let target = serde_json::from_str(&target_json).map_err(|err| { + rusqlite::Error::FromSqlConversionFailure( + 5, + rusqlite::types::Type::Text, + Box::new(err), + ) + })?; + let payload = serde_json::from_str(&payload_json).map_err(|err| { + rusqlite::Error::FromSqlConversionFailure( + 6, + rusqlite::types::Type::Text, + Box::new(err), + ) + })?; + + Ok(SchedulerJobRecord { + id: row.get(0)?, + kind: row.get(1)?, + schedule, + interval_secs: row.get(3)?, + startup_delay_secs: row.get(4)?, + target, + payload, + enabled: row.get::<_, i64>(7)? != 0, + state: SchedulerJobState::from_str(&state).ok_or_else(|| { + rusqlite::Error::FromSqlConversionFailure( + 8, + rusqlite::types::Type::Text, + format!("invalid scheduler job state: {}", state).into(), + ) + })?, + last_status: last_status.and_then(|value| SchedulerJobStatus::from_str(&value)), + last_error: row.get(10)?, + run_count: row.get(11)?, + max_runs: row.get(12)?, + last_fired_at: row.get(13)?, + next_fire_at: row.get(14)?, + paused_at: row.get(15)?, + completed_at: row.get(16)?, + created_at: row.get(17)?, + updated_at: row.get(18)?, + }) +} + fn ensure_sessions_schema(conn: &Connection) -> Result<(), StorageError> { if !has_column(conn, "sessions", "reset_cutoff_seq")? { conn.execute( @@ -958,6 +1305,66 @@ fn ensure_sessions_schema(conn: &Connection) -> Result<(), StorageError> { Ok(()) } +fn ensure_scheduler_schema(conn: &Connection) -> Result<(), StorageError> { + if !has_column(conn, "scheduler_jobs", "schedule_json")? { + conn.execute( + "ALTER TABLE scheduler_jobs ADD COLUMN schedule_json TEXT NOT NULL DEFAULT '{}'", + [], + )?; + } + + if !has_column(conn, "scheduler_jobs", "state")? { + conn.execute( + "ALTER TABLE scheduler_jobs ADD COLUMN state TEXT NOT NULL DEFAULT 'scheduled'", + [], + )?; + } + + if !has_column(conn, "scheduler_jobs", "last_status")? { + conn.execute( + "ALTER TABLE scheduler_jobs ADD COLUMN last_status TEXT", + [], + )?; + } + + if !has_column(conn, "scheduler_jobs", "last_error")? { + conn.execute( + "ALTER TABLE scheduler_jobs ADD COLUMN last_error TEXT", + [], + )?; + } + + if !has_column(conn, "scheduler_jobs", "run_count")? { + conn.execute( + "ALTER TABLE scheduler_jobs ADD COLUMN run_count INTEGER NOT NULL DEFAULT 0", + [], + )?; + } + + if !has_column(conn, "scheduler_jobs", "max_runs")? { + conn.execute( + "ALTER TABLE scheduler_jobs ADD COLUMN max_runs INTEGER", + [], + )?; + } + + if !has_column(conn, "scheduler_jobs", "paused_at")? { + conn.execute( + "ALTER TABLE scheduler_jobs ADD COLUMN paused_at INTEGER", + [], + )?; + } + + if !has_column(conn, "scheduler_jobs", "completed_at")? { + conn.execute( + "ALTER TABLE scheduler_jobs ADD COLUMN completed_at INTEGER", + [], + )?; + } + + Ok(()) +} + fn has_column(conn: &Connection, table_name: &str, column_name: &str) -> Result { let pragma = format!("PRAGMA table_info({})", table_name); let mut stmt = conn.prepare(&pragma)?; @@ -1503,4 +1910,65 @@ mod tests { assert!(hits.iter().any(|memory| memory.memory_key == "editor")); assert!(hits.iter().any(|memory| memory.memory_key == "quality")); } + + #[test] + fn test_scheduler_job_roundtrip_and_runtime_update() { + let store = SessionStore::in_memory().unwrap(); + + let saved = store + .upsert_scheduler_job(&SchedulerJobUpsert { + id: "heartbeat".to_string(), + kind: "outbound_message".to_string(), + schedule: serde_json::json!({ + "type": "interval", + "seconds": 300, + "startup_delay_secs": 10, + }), + interval_secs: 300, + startup_delay_secs: 10, + target: serde_json::json!({ + "channel": "feishu", + "chat_id": "oc_demo", + }), + payload: serde_json::json!({ + "content": "heartbeat", + }), + enabled: true, + state: SchedulerJobState::Scheduled, + last_status: None, + last_error: None, + run_count: 0, + max_runs: Some(3), + last_fired_at: None, + next_fire_at: Some(1_700_000_000_000), + paused_at: None, + completed_at: None, + }) + .unwrap(); + + assert_eq!(saved.id, "heartbeat"); + assert_eq!(saved.kind, "outbound_message"); + assert_eq!(saved.state, SchedulerJobState::Scheduled); + assert_eq!(saved.max_runs, Some(3)); + + store + .update_scheduler_job_runtime( + "heartbeat", + SchedulerJobState::Completed, + Some(SchedulerJobStatus::Ok), + None, + 1, + Some(1_700_000_000_000), + None, + None, + Some(1_700_000_000_100), + ) + .unwrap(); + + let fetched = store.get_scheduler_job("heartbeat").unwrap().unwrap(); + assert_eq!(fetched.state, SchedulerJobState::Completed); + assert_eq!(fetched.last_status, Some(SchedulerJobStatus::Ok)); + assert_eq!(fetched.run_count, 1); + assert_eq!(fetched.completed_at, Some(1_700_000_000_100)); + } } \ No newline at end of file diff --git a/src/tools/mod.rs b/src/tools/mod.rs index 384f447..53b29ef 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -7,6 +7,7 @@ pub mod http_request; pub mod memory_manage; pub mod memory_search; pub mod registry; +pub mod scheduler_manage; pub mod schema; pub mod skill_manage; pub mod traits; @@ -21,6 +22,7 @@ pub use http_request::HttpRequestTool; pub use memory_manage::MemoryManageTool; pub use memory_search::MemorySearchTool; pub use registry::ToolRegistry; +pub use scheduler_manage::SchedulerManageTool; pub use schema::{CleaningStrategy, SchemaCleanr}; pub use skill_manage::{SkillListTool, SkillManageTool}; pub use traits::{Tool, ToolContext, ToolResult}; diff --git a/src/tools/scheduler_manage.rs b/src/tools/scheduler_manage.rs new file mode 100644 index 0000000..a73a378 --- /dev/null +++ b/src/tools/scheduler_manage.rs @@ -0,0 +1,291 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use serde_json::json; + +use crate::config::SchedulerSchedule; +use crate::storage::{ + SchedulerJobRecord, SchedulerJobState, SchedulerJobUpsert, SessionStore, +}; +use crate::tools::traits::{Tool, ToolResult}; + +pub struct SchedulerManageTool { + store: Arc, +} + +impl SchedulerManageTool { + pub fn new(store: Arc) -> Self { + Self { store } + } +} + +#[async_trait] +impl Tool for SchedulerManageTool { + fn name(&self) -> &str { + "scheduler_manage" + } + + fn description(&self) -> &str { + "Manage DB-backed scheduled jobs. Supports actions: list, get, put, delete, pause, resume. Jobs persist in SQLite and are executed by the scheduler runtime." + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": ["list", "get", "put", "delete", "pause", "resume"] + }, + "id": { + "type": "string", + "description": "Job id" + }, + "enabled_only": { + "type": "boolean", + "description": "Only for list action" + }, + "kind": { + "type": "string", + "enum": ["internal_event", "outbound_message"] + }, + "schedule": { + "type": "object", + "description": "Schedule object, for example {type: 'interval', seconds: 300} or {type: 'cron', expression: '0 9 * * *'}" + }, + "target": { + "type": "object" + }, + "payload": { + "type": "object" + }, + "max_runs": { + "type": ["integer", "null"] + } + }, + "required": ["action"] + }) + } + + async fn execute(&self, args: serde_json::Value) -> anyhow::Result { + let action = match args.get("action").and_then(|value| value.as_str()) { + Some(action) => action, + None => return Ok(error_result("Missing required parameter: action")), + }; + + let output = match action { + "list" => { + let enabled_only = args + .get("enabled_only") + .and_then(|value| value.as_bool()) + .unwrap_or(false); + let jobs = self.store.list_scheduler_jobs(enabled_only)?; + json!(jobs.iter().map(record_to_json).collect::>()) + } + "get" => { + let id = require_str(&args, "id")?; + match self.store.get_scheduler_job(id)? { + Some(record) => record_to_json(&record), + None => return Ok(error_result(&format!("scheduler job '{}' not found", id))), + } + } + "put" => { + let input = build_upsert(&args)?; + let record = self.store.upsert_scheduler_job(&input)?; + record_to_json(&record) + } + "delete" => { + let id = require_str(&args, "id")?; + self.store.delete_scheduler_job(id)?; + json!({"status": "deleted", "id": id}) + } + "pause" => { + let id = require_str(&args, "id")?; + let record = self + .store + .get_scheduler_job(id)? + .ok_or_else(|| anyhow::anyhow!("scheduler job '{}' not found", id))?; + let mut input = record_to_upsert(&record); + input.enabled = false; + input.state = SchedulerJobState::Paused; + input.paused_at = Some(current_timestamp()); + input.next_fire_at = None; + let saved = self.store.upsert_scheduler_job(&input)?; + record_to_json(&saved) + } + "resume" => { + let id = require_str(&args, "id")?; + let record = self + .store + .get_scheduler_job(id)? + .ok_or_else(|| anyhow::anyhow!("scheduler job '{}' not found", id))?; + let mut input = record_to_upsert(&record); + input.enabled = true; + input.state = SchedulerJobState::Scheduled; + input.paused_at = None; + input.completed_at = None; + input.next_fire_at = None; + let saved = self.store.upsert_scheduler_job(&input)?; + record_to_json(&saved) + } + _ => return Ok(error_result("Unsupported action")), + }; + + Ok(ToolResult { + success: true, + output: serde_json::to_string_pretty(&output)?, + error: None, + }) + } +} + +fn build_upsert(args: &serde_json::Value) -> anyhow::Result { + let id = require_str(args, "id")?.to_string(); + let kind = require_str(args, "kind")?.to_string(); + let schedule_value = args + .get("schedule") + .cloned() + .ok_or_else(|| anyhow::anyhow!("Missing required parameter: schedule"))?; + let schedule: SchedulerSchedule = serde_json::from_value(schedule_value.clone())?; + schedule.validate(&id)?; + + let (interval_secs, startup_delay_secs) = match &schedule { + SchedulerSchedule::Interval { + seconds, + startup_delay_secs, + } => (*seconds as i64, *startup_delay_secs as i64), + _ => (0, 0), + }; + + Ok(SchedulerJobUpsert { + id, + kind, + schedule: serde_json::to_value(schedule)?, + interval_secs, + startup_delay_secs, + target: args.get("target").cloned().unwrap_or_else(|| json!({})), + payload: args.get("payload").cloned().unwrap_or_else(|| json!({})), + enabled: args.get("enabled").and_then(|value| value.as_bool()).unwrap_or(true), + state: if args.get("enabled").and_then(|value| value.as_bool()).unwrap_or(true) { + SchedulerJobState::Scheduled + } else { + SchedulerJobState::Paused + }, + last_status: None, + last_error: None, + run_count: 0, + max_runs: args.get("max_runs").and_then(|value| value.as_i64()), + last_fired_at: None, + next_fire_at: None, + paused_at: None, + completed_at: None, + }) +} + +fn record_to_json(record: &SchedulerJobRecord) -> serde_json::Value { + json!({ + "id": record.id, + "kind": record.kind, + "schedule": record.schedule, + "target": record.target, + "payload": record.payload, + "enabled": record.enabled, + "state": record.state, + "last_status": record.last_status, + "last_error": record.last_error, + "run_count": record.run_count, + "max_runs": record.max_runs, + "last_fired_at": record.last_fired_at, + "next_fire_at": record.next_fire_at, + "paused_at": record.paused_at, + "completed_at": record.completed_at, + "created_at": record.created_at, + "updated_at": record.updated_at, + }) +} + +fn record_to_upsert(record: &SchedulerJobRecord) -> SchedulerJobUpsert { + SchedulerJobUpsert { + id: record.id.clone(), + kind: record.kind.clone(), + schedule: record.schedule.clone(), + interval_secs: record.interval_secs, + startup_delay_secs: record.startup_delay_secs, + target: record.target.clone(), + payload: record.payload.clone(), + enabled: record.enabled, + state: record.state.clone(), + last_status: record.last_status.clone(), + last_error: record.last_error.clone(), + run_count: record.run_count, + max_runs: record.max_runs, + last_fired_at: record.last_fired_at, + next_fire_at: record.next_fire_at, + paused_at: record.paused_at, + completed_at: record.completed_at, + } +} + +fn require_str<'a>(args: &'a serde_json::Value, key: &str) -> anyhow::Result<&'a str> { + args.get(key) + .and_then(|value| value.as_str()) + .ok_or_else(|| anyhow::anyhow!("Missing required parameter: {}", key)) +} + +fn error_result(message: &str) -> ToolResult { + ToolResult { + success: false, + output: String::new(), + error: Some(message.to_string()), + } +} + +fn current_timestamp() -> i64 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_millis() as i64 +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_scheduler_manage_put_and_get() { + let store = Arc::new(SessionStore::in_memory().unwrap()); + let tool = SchedulerManageTool::new(store); + + let put_result = tool + .execute(json!({ + "action": "put", + "id": "heartbeat", + "kind": "outbound_message", + "schedule": { + "type": "interval", + "seconds": 60 + }, + "target": { + "channel": "feishu", + "chat_id": "oc_demo" + }, + "payload": { + "content": "ping" + } + })) + .await + .unwrap(); + assert!(put_result.success); + + let get_result = tool + .execute(json!({ + "action": "get", + "id": "heartbeat" + })) + .await + .unwrap(); + assert!(get_result.success); + assert!(get_result.output.contains("heartbeat")); + assert!(get_result.output.contains("outbound_message")); + } +} \ No newline at end of file