feat: add DB-backed scheduler for heartbeat, delayed jobs, interval jobs, and cron jobs

- Add scheduler module with SQLite persistence
- Support schedule types: delay, interval, at, cron
- Support job kinds: internal_event, outbound_message
- Add scheduler_manage tool for runtime management
- Add session_cleanup internal event for expired sessions
- Update memory tool usage prompt for better context awareness
- Add chrono and cron dependencies
This commit is contained in:
ooodc 2026-04-22 20:32:18 +08:00
parent 302e6ef6b9
commit a3ae8acde5
12 changed files with 1873 additions and 15 deletions

View File

@ -24,6 +24,8 @@ tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tracing-appender = "0.2"
anyhow = "1.0"
chrono = { version = "0.4", features = ["serde"] }
cron = { version = "0.13", features = ["serde"] }
mime_guess = "2.0"
base64 = "0.22"
tempfile = "3"

View File

@ -81,3 +81,58 @@ Add skills in config.json:
"max_listed_skills": 32
}
}
Scheduler
PicoBot now includes a DB-backed scheduler for heartbeat, delayed jobs, interval jobs, one-shot absolute-time jobs, and cron jobs.
Current behavior:
- Scheduler runs as a background loop inside gateway lifecycle.
- Job definitions and runtime state are persisted in SQLite instead of JSON files.
- Supported schedule types: delay, interval, at, cron.
- Supported job kinds: internal_event, outbound_message.
- Built-in internal event: session_cleanup, used to clear expired in-memory channel sessions.
- Built-in management tool: scheduler_manage.
Config example:
{
"scheduler": {
"enabled": true,
"tick_resolution_ms": 1000,
"misfire_policy": "skip",
"jobs": [
{
"id": "session.cleanup",
"kind": "internal_event",
"schedule": {
"type": "interval",
"seconds": 300
},
"payload": {
"event": "session_cleanup"
}
},
{
"id": "daily.reminder",
"kind": "outbound_message",
"schedule": {
"type": "cron",
"expression": "0 9 * * *"
},
"target": {
"channel": "feishu",
"chat_id": "oc_xxx"
},
"payload": {
"content": "每日提醒"
}
}
]
}
}
Runtime management:
- Use scheduler_manage with action=list|get|put|delete|pause|resume.
- Jobs created by the tool are written into SQLite and picked up by the scheduler loop.
- Config-defined jobs are also synced into SQLite on startup.

View File

@ -18,10 +18,13 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
- 打开外键约束
- 自动建表和建索引
当前持久化只覆盖两类核心数据:
当前持久化覆盖以下核心数据:
- `sessions`:会话元数据
- `messages`:会话内的消息流水
- `skill_events`:技能发现和使用事件
- `memories`:长期记忆
- `scheduler_jobs`:定时任务定义和运行状态
内存中的 `Session` 负责运行态处理SQLite 负责跨进程、跨重启保留历史。整体设计是“内存缓存 + SQLite 事实来源”。
@ -145,9 +148,45 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
- `tool_calls_json` 表示“assistant 想调用哪些工具”
- `tool_call_id` 表示“这一条 tool 结果是在回应哪一次工具调用”
## 6. 数据写入流程
## 6. scheduler_jobs
### 6.1 创建会话
`scheduler_jobs` 是 PicoBot 计划任务的事实来源。它同时保存任务定义和最近一次运行后的状态,使 scheduler 在重启后仍能恢复调度。
字段说明:
| 字段 | 类型 | 含义 |
| --- | --- | --- |
| `id` | `TEXT PRIMARY KEY` | 任务唯一标识 |
| `kind` | `TEXT NOT NULL` | 任务类型,当前支持 `internal_event``outbound_message` |
| `schedule_json` | `TEXT NOT NULL` | 统一 schedule 定义JSON 形式保存 `delay` / `interval` / `at` / `cron` |
| `interval_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 |
| `startup_delay_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 |
| `target_json` | `TEXT NOT NULL` | 任务目标,例如 channel/chat_id |
| `payload_json` | `TEXT NOT NULL` | 任务执行载荷 |
| `enabled` | `INTEGER NOT NULL DEFAULT 1` | 是否启用 |
| `state` | `TEXT NOT NULL DEFAULT 'scheduled'` | 生命周期状态:`scheduled` / `running` / `paused` / `completed` |
| `last_status` | `TEXT` | 最近一次运行结果:`ok` / `error` / `skipped` |
| `last_error` | `TEXT` | 最近一次执行错误信息 |
| `run_count` | `INTEGER NOT NULL DEFAULT 0` | 已执行次数 |
| `max_runs` | `INTEGER` | 最大执行次数,达到后转为 completed |
| `last_fired_at` | `INTEGER` | 最近一次触发时间 |
| `next_fire_at` | `INTEGER` | 下一次计划触发时间 |
| `paused_at` | `INTEGER` | 暂停时间 |
| `completed_at` | `INTEGER` | 完成时间 |
| `created_at` | `INTEGER NOT NULL` | 创建时间 |
| `updated_at` | `INTEGER NOT NULL` | 最近更新时间 |
运行语义:
- scheduler 启动后会先把 config 中声明的 jobs 同步进 `scheduler_jobs`
- 运行时新建、暂停、恢复、删除任务可以通过 `scheduler_manage` 工具直接操作数据库。
- repeating job 会在每次执行后更新 `run_count``last_fired_at``next_fire_at`
- one-shot job`delay` / `at`)完成后会进入 `completed` 状态,不再调度。
- 内置 `internal_event` 当前包含 `session_cleanup`,用于回收超时的内存 session 缓存。
## 7. 数据写入流程
### 7.1 创建会话
有两种进入方式:
@ -161,7 +200,7 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
- `deleted_at = NULL`
- `message_count = 0`
### 6.2 追加消息
### 7.2 追加消息
消息持久化统一走 `append_message()`,写入过程是一个 SQLite 事务:
@ -177,7 +216,7 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
另外,只有 `role = user` 的消息会递增 `user_turn_count``system``assistant``tool` 消息不会影响周期注入阈值的判定。
### 6.3 读取历史
### 7.3 读取历史
`load_messages(session_id)` 会按 `seq ASC` 读取当前活动段历史,并把 JSON 字段反序列化回 `ChatMessage`。活动段的定义是:
@ -188,9 +227,9 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
因此运行态恢复的是“当前活动段的逻辑顺序”,而不是简单按创建时间排序。只要 `seq` 连续,重放顺序就稳定。
## 7. 典型时序
## 8. 典型时序
### 7.1 普通问答
### 8.1 普通问答
1. 用户消息进入网关。
2. 如果数据库中没有对应会话,先插入一条 `sessions`
@ -199,7 +238,7 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
5. assistant 回复写入 `messages``role = assistant`
6. 会话的 `message_count` 增加 2`last_active_at` 更新时间。
### 7.2 带工具调用的问答
### 8.2 带工具调用的问答
1. assistant 先生成一条带 `tool_calls_json` 的消息,`role = assistant`
2. 系统执行对应工具。
@ -213,16 +252,16 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
- 每个工具调用返回了什么
- 最终 assistant 给了什么结论
## 8. 会话生命周期操作
## 9. 会话生命周期操作
### 8.1 重命名
### 9.1 重命名
`rename_session(session_id, title)`
- 更新 `sessions.title`
- 更新 `sessions.updated_at`
### 8.2 归档
### 9.2 归档
`archive_session(session_id)`
@ -235,7 +274,7 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
- `include_archived = false` 只返回 `archived_at IS NULL` 的会话
- `include_archived = true` 返回全部未删除会话
### 8.3 清空消息
### 9.3 清空消息
`clear_messages(session_id)`

View File

@ -21,7 +21,7 @@ use std::time::Instant;
const MAX_TOOL_RESULT_CHARS: usize = 16_000;
/// Minimum characters to keep when truncating
const TRUNCATION_SUFFIX_LEN: usize = 200;
const MEMORY_TOOL_USAGE_SYSTEM_PROMPT: &str = "你可以在处理任务过程中使用长期记忆工具。读取记忆时,优先使用 memory_search需要用户长期偏好、稳定事实、历史决策、持续任务上下文时,先 search已知 namespace/key 时可用 get需要浏览最近记忆时可用 list。写入或修改记忆时再使用 memory_manage。仅在遇到高价值且未来仍有用的信息时写入记忆用户长期偏好、稳定事实、用户对你的纠正、持续任务/项目上下文、明确决策。不要保存一次性工具结果、临时列表、敏感凭证或不确定推测。写入时优先使用规范 namespacepreferences、profile、tasks、decisions并优先调用 memory_manage(action='put');同一 namespace/key 可直接覆盖更新。检索时应提供 queries 数组,尽量同时放入中文关键词、英文别名,以及可能的 snake_case memory_key 词,例如 queries=['email', '邮件', 'email_folder_preference']。";
const MEMORY_TOOL_USAGE_SYSTEM_PROMPT: &str = "你可以在处理任务过程中使用长期记忆工具。读取记忆时,优先使用 memory_search用户的任务描述缺少相关指代,上下文存在模糊不清,执行任务需要用户长期偏好、稳定事实、历史决策、持续任务上下文时,先 search已知 namespace/key 时可用 get需要浏览最近记忆时可用 list。写入或修改记忆时再使用 memory_manage。仅在遇到高价值且未来仍有用的信息时写入记忆用户长期偏好、稳定事实、用户对你的纠正、持续任务/项目上下文、明确决策。不要保存一次性工具结果、临时列表、敏感凭证或不确定推测。写入时优先使用规范 namespacepreferences、profile、tasks、decisions并优先调用 memory_manage(action='put');同一 namespace/key 可直接覆盖更新。检索时应提供 queries 数组,尽量同时放入中文关键词、英文别名,以及可能的 snake_case memory_key 词,例如 queries=['email', '邮件', 'email_folder_preference']。";
const PENDING_USER_ACTION_MARKER: &str = "__PICOBOT_PENDING_USER_ACTION__";
const DEFAULT_PENDING_ASSISTANT_MESSAGE: &str = "工具已经启动并进入等待用户操作的状态。请先完成外部操作,完成后直接告诉我继续。";

View File

@ -1,9 +1,11 @@
use chrono::{DateTime, Utc};
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::env;
use std::fs;
use std::path::{Path, PathBuf};
use std::str::FromStr;
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct Config {
@ -13,6 +15,8 @@ pub struct Config {
#[serde(default)]
pub gateway: GatewayConfig,
#[serde(default)]
pub scheduler: SchedulerConfig,
#[serde(default)]
pub client: ClientConfig,
#[serde(default)]
pub channels: HashMap<String, FeishuChannelConfig>,
@ -148,6 +152,198 @@ pub struct ClientConfig {
pub gateway_url: String,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SchedulerConfig {
#[serde(default)]
pub enabled: bool,
#[serde(default = "default_scheduler_tick_resolution_ms")]
pub tick_resolution_ms: u64,
#[serde(default = "default_scheduler_worker_queue_capacity")]
pub worker_queue_capacity: usize,
#[serde(default)]
pub misfire_policy: SchedulerMisfirePolicy,
#[serde(default)]
pub jobs: Vec<SchedulerJobConfig>,
}
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum SchedulerMisfirePolicy {
CatchUp,
#[default]
Skip,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SchedulerJobConfig {
pub id: String,
#[serde(default = "default_scheduler_job_enabled")]
pub enabled: bool,
pub kind: SchedulerJobKind,
#[serde(default)]
pub schedule: Option<SchedulerSchedule>,
#[serde(default)]
pub startup_delay_secs: u64,
#[serde(default)]
pub interval_secs: u64,
#[serde(default)]
pub target: SchedulerJobTarget,
#[serde(default)]
pub payload: serde_json::Value,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SchedulerJobKind {
InternalEvent,
OutboundMessage,
}
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
pub struct SchedulerJobTarget {
#[serde(default)]
pub channel: Option<String>,
#[serde(default)]
pub chat_id: Option<String>,
#[serde(default)]
pub reply_to: Option<String>,
}
#[derive(Debug, Clone, Deserialize, Serialize, PartialEq, Eq)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SchedulerSchedule {
Delay {
seconds: u64,
},
Interval {
seconds: u64,
#[serde(default)]
startup_delay_secs: u64,
},
At {
timestamp: String,
},
Cron {
expression: String,
},
}
impl SchedulerJobConfig {
pub fn resolved_schedule(&self) -> Result<SchedulerSchedule, ConfigError> {
if let Some(schedule) = &self.schedule {
schedule.validate(&self.id)?;
return Ok(schedule.normalized_for_storage());
}
if self.interval_secs > 0 {
return Ok(SchedulerSchedule::Interval {
seconds: self.interval_secs,
startup_delay_secs: self.startup_delay_secs,
});
}
Err(ConfigError::InvalidSchedulerJob(format!(
"scheduler job '{}' requires schedule or interval_secs",
self.id
)))
}
}
impl SchedulerSchedule {
pub fn validate(&self, job_id: &str) -> Result<(), ConfigError> {
match self {
SchedulerSchedule::Delay { seconds } => {
if *seconds == 0 {
return Err(ConfigError::InvalidSchedulerJob(format!(
"scheduler job '{}' delay.seconds must be greater than 0",
job_id
)));
}
}
SchedulerSchedule::Interval { seconds, .. } => {
if *seconds == 0 {
return Err(ConfigError::InvalidSchedulerJob(format!(
"scheduler job '{}' interval.seconds must be greater than 0",
job_id
)));
}
}
SchedulerSchedule::At { timestamp } => {
DateTime::parse_from_rfc3339(timestamp).map_err(|err| {
ConfigError::InvalidSchedulerJob(format!(
"scheduler job '{}' invalid at.timestamp '{}': {}",
job_id, timestamp, err
))
})?;
}
SchedulerSchedule::Cron { expression } => {
parse_scheduler_cron(expression).map_err(|err| {
ConfigError::InvalidSchedulerJob(format!(
"scheduler job '{}' invalid cron.expression '{}': {}",
job_id, expression, err
))
})?;
}
}
Ok(())
}
pub fn is_one_shot(&self) -> bool {
matches!(self, SchedulerSchedule::Delay { .. } | SchedulerSchedule::At { .. })
}
pub fn normalized_for_storage(&self) -> Self {
match self {
SchedulerSchedule::At { timestamp } => {
let parsed = DateTime::parse_from_rfc3339(timestamp)
.map(|value| value.with_timezone(&Utc).to_rfc3339())
.unwrap_or_else(|_| timestamp.clone());
SchedulerSchedule::At { timestamp: parsed }
}
other => other.clone(),
}
}
pub fn display(&self) -> String {
match self {
SchedulerSchedule::Delay { seconds } => format!("delay:{}s", seconds),
SchedulerSchedule::Interval {
seconds,
startup_delay_secs,
} => format!("interval:{}s:start_delay:{}s", seconds, startup_delay_secs),
SchedulerSchedule::At { timestamp } => format!("at:{}", timestamp),
SchedulerSchedule::Cron { expression } => format!("cron:{}", expression),
}
}
}
fn parse_scheduler_cron(expression: &str) -> Result<cron::Schedule, cron::error::Error> {
let normalized = normalize_cron_expression(expression);
cron::Schedule::from_str(&normalized)
}
fn normalize_cron_expression(expression: &str) -> String {
let parts: Vec<&str> = expression.split_whitespace().collect();
if parts.len() == 5 {
format!("0 {}", expression.trim())
} else {
expression.trim().to_string()
}
}
fn default_scheduler_tick_resolution_ms() -> u64 {
1_000
}
fn default_scheduler_worker_queue_capacity() -> usize {
64
}
fn default_scheduler_job_enabled() -> bool {
true
}
fn default_gateway_host() -> String {
"127.0.0.1".to_string()
}
@ -184,6 +380,18 @@ impl Default for ClientConfig {
}
}
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
enabled: false,
tick_resolution_ms: default_scheduler_tick_resolution_ms(),
worker_queue_capacity: default_scheduler_worker_queue_capacity(),
misfire_policy: SchedulerMisfirePolicy::default(),
jobs: Vec::new(),
}
}
}
#[derive(Debug, Clone)]
pub struct LLMProviderConfig {
pub provider_type: String,
@ -268,6 +476,7 @@ pub enum ConfigError {
AgentNotFound(String),
ProviderNotFound(String),
ModelNotFound(String),
InvalidSchedulerJob(String),
}
impl std::fmt::Display for ConfigError {
@ -277,6 +486,7 @@ impl std::fmt::Display for ConfigError {
ConfigError::AgentNotFound(name) => write!(f, "Agent not found: {}", name),
ConfigError::ProviderNotFound(name) => write!(f, "Provider not found: {}", name),
ConfigError::ModelNotFound(name) => write!(f, "Model not found: {}", name),
ConfigError::InvalidSchedulerJob(message) => write!(f, "Invalid scheduler job: {}", message),
}
}
}
@ -471,4 +681,196 @@ mod tests {
let config = Config::load(file.path().to_str().unwrap()).unwrap();
assert!(config.gateway.show_tool_results);
}
#[test]
fn test_scheduler_config_defaults() {
let file = write_test_config();
let config = Config::load(file.path().to_str().unwrap()).unwrap();
assert!(!config.scheduler.enabled);
assert_eq!(config.scheduler.tick_resolution_ms, 1_000);
assert_eq!(config.scheduler.worker_queue_capacity, 64);
assert_eq!(config.scheduler.misfire_policy, SchedulerMisfirePolicy::Skip);
assert!(config.scheduler.jobs.is_empty());
}
#[test]
fn test_scheduler_config_loads_interval_compat_jobs() {
let file = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
file.path(),
r#"{
"providers": {
"aliyun": {
"type": "openai",
"base_url": "https://example.invalid/v1",
"api_key": "test-key",
"extra_headers": {}
}
},
"models": {
"qwen-plus": {
"model_id": "qwen-plus"
}
},
"agents": {
"default": {
"provider": "aliyun",
"model": "qwen-plus"
}
},
"scheduler": {
"enabled": true,
"tick_resolution_ms": 500,
"worker_queue_capacity": 8,
"misfire_policy": "catch_up",
"jobs": [
{
"id": "heartbeat.reminder",
"kind": "outbound_message",
"interval_secs": 60,
"startup_delay_secs": 5,
"target": {
"channel": "feishu",
"chat_id": "oc_demo"
},
"payload": {
"content": "heartbeat"
}
}
]
}
}"#,
)
.unwrap();
let config = Config::load(file.path().to_str().unwrap()).unwrap();
assert!(config.scheduler.enabled);
assert_eq!(config.scheduler.tick_resolution_ms, 500);
assert_eq!(config.scheduler.worker_queue_capacity, 8);
assert_eq!(config.scheduler.misfire_policy, SchedulerMisfirePolicy::CatchUp);
assert_eq!(config.scheduler.jobs.len(), 1);
let job = &config.scheduler.jobs[0];
assert_eq!(job.id, "heartbeat.reminder");
assert!(job.enabled);
assert_eq!(job.kind, SchedulerJobKind::OutboundMessage);
assert_eq!(job.interval_secs, 60);
assert_eq!(job.startup_delay_secs, 5);
assert_eq!(job.target.channel.as_deref(), Some("feishu"));
assert_eq!(job.target.chat_id.as_deref(), Some("oc_demo"));
assert_eq!(job.payload.get("content").and_then(|value| value.as_str()), Some("heartbeat"));
assert_eq!(job.resolved_schedule().unwrap(), SchedulerSchedule::Interval {
seconds: 60,
startup_delay_secs: 5,
});
}
#[test]
fn test_scheduler_config_loads_schedule_variants() {
let file = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
file.path(),
r#"{
"providers": {
"aliyun": {
"type": "openai",
"base_url": "https://example.invalid/v1",
"api_key": "test-key",
"extra_headers": {}
}
},
"models": {
"qwen-plus": {
"model_id": "qwen-plus"
}
},
"agents": {
"default": {
"provider": "aliyun",
"model": "qwen-plus"
}
},
"scheduler": {
"enabled": true,
"jobs": [
{
"id": "delay.job",
"kind": "internal_event",
"schedule": {
"type": "delay",
"seconds": 30
}
},
{
"id": "at.job",
"kind": "outbound_message",
"schedule": {
"type": "at",
"timestamp": "2026-04-23T09:00:00Z"
},
"target": {
"channel": "feishu",
"chat_id": "oc_demo"
},
"payload": {
"content": "at run"
}
},
{
"id": "cron.job",
"kind": "internal_event",
"schedule": {
"type": "cron",
"expression": "0 9 * * *"
}
}
]
}
}"#,
)
.unwrap();
let config = Config::load(file.path().to_str().unwrap()).unwrap();
assert_eq!(config.scheduler.jobs.len(), 3);
assert_eq!(
config.scheduler.jobs[0].resolved_schedule().unwrap(),
SchedulerSchedule::Delay { seconds: 30 }
);
assert_eq!(
config.scheduler.jobs[1].resolved_schedule().unwrap(),
SchedulerSchedule::At {
timestamp: "2026-04-23T09:00:00+00:00".to_string(),
}
);
assert_eq!(
config.scheduler.jobs[2].resolved_schedule().unwrap(),
SchedulerSchedule::Cron {
expression: "0 9 * * *".to_string(),
}
);
}
#[test]
fn test_scheduler_schedule_validation_rejects_invalid_values() {
assert!(SchedulerSchedule::Delay { seconds: 0 }
.validate("delay.job")
.is_err());
assert!(SchedulerSchedule::Interval {
seconds: 0,
startup_delay_secs: 0,
}
.validate("interval.job")
.is_err());
assert!(SchedulerSchedule::At {
timestamp: "bad timestamp".to_string(),
}
.validate("at.job")
.is_err());
assert!(SchedulerSchedule::Cron {
expression: "bad cron".to_string(),
}
.validate("cron.job")
.is_err());
}
}

View File

@ -10,6 +10,7 @@ use crate::bus::{MessageBus, OutboundDispatcher};
use crate::channels::ChannelManager;
use crate::config::Config;
use crate::logging;
use crate::scheduler::Scheduler;
use crate::skills::SkillRuntime;
use session::{BusToolCallEmitter, SessionManager};
@ -148,6 +149,20 @@ pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn
// Start message processing (inbound processor + outbound dispatcher)
state.start_message_processing().await;
let (scheduler_shutdown_tx, scheduler_shutdown_rx) = tokio::sync::watch::channel(false);
if state.config.scheduler.enabled {
let scheduler = Scheduler::new(
state.bus.clone(),
state.config.scheduler.clone(),
state.session_manager.store(),
state.session_manager.clone(),
);
tokio::spawn(async move {
scheduler.run(scheduler_shutdown_rx).await;
});
}
// CLI args override config file values
let bind_host = host.unwrap_or_else(|| state.config.gateway.host.clone());
let bind_port = port.unwrap_or(state.config.gateway.port);
@ -169,6 +184,7 @@ pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn
tokio::spawn(async move {
tokio::signal::ctrl_c().await.ok();
tracing::info!("Shutdown signal received");
let _ = scheduler_shutdown_tx.send(true);
let _ = channel_manager.stop_all().await;
let _ = shutdown_tx.send(());
});

View File

@ -13,7 +13,7 @@ use crate::skills::SkillRuntime;
use crate::storage::{SessionRecord, SessionStore, persistent_session_id};
use crate::tools::{
BashTool, CalculatorTool, FileEditTool, FileReadTool, FileWriteTool,
HttpRequestTool, MemoryManageTool, MemorySearchTool, SkillListTool, SkillManageTool, ToolContext, ToolRegistry,
HttpRequestTool, MemoryManageTool, MemorySearchTool, SchedulerManageTool, SkillListTool, SkillManageTool, ToolContext, ToolRegistry,
WebFetchTool,
};
@ -380,7 +380,8 @@ fn default_tools(skills: Arc<SkillRuntime>, store: Arc<SessionStore>) -> ToolReg
registry.register(FileWriteTool::new());
registry.register(FileEditTool::new());
registry.register(MemorySearchTool::new(store.clone()));
registry.register(MemoryManageTool::new(store));
registry.register(MemoryManageTool::new(store.clone()));
registry.register(SchedulerManageTool::new(store));
registry.register(SkillListTool::new(skills.clone()));
registry.register(SkillManageTool::new(skills));
registry.register(BashTool::new());
@ -571,6 +572,29 @@ impl SessionManager {
inner.session_timestamps.insert(channel_name.to_string(), Instant::now());
}
pub async fn cleanup_expired_sessions(&self) -> usize {
let mut inner = self.inner.lock().await;
let now = Instant::now();
let expired_channels: Vec<String> = inner
.session_timestamps
.iter()
.filter_map(|(channel_name, last_active)| {
if now.duration_since(*last_active) > inner.session_ttl {
Some(channel_name.clone())
} else {
None
}
})
.collect();
for channel_name in &expired_channels {
inner.sessions.remove(channel_name);
inner.session_timestamps.remove(channel_name);
}
expired_channels.len()
}
/// 处理消息:路由到对应 session 的 agent
pub async fn handle_message(
&self,

View File

@ -9,6 +9,7 @@ pub mod protocol;
pub mod channels;
pub mod logging;
pub mod observability;
pub mod scheduler;
pub mod storage;
pub mod tools;
pub mod skills;

558
src/scheduler/mod.rs Normal file
View File

@ -0,0 +1,558 @@
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use chrono::{DateTime, Duration as ChronoDuration, TimeZone, Utc};
use tokio::sync::watch;
use crate::bus::{MessageBus, OutboundMessage};
use crate::config::{
SchedulerConfig, SchedulerJobConfig, SchedulerJobKind, SchedulerJobTarget, SchedulerMisfirePolicy,
SchedulerSchedule,
};
use crate::gateway::session::SessionManager;
use crate::storage::{
SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, SchedulerJobUpsert, SessionStore,
};
pub struct Scheduler {
bus: Arc<MessageBus>,
config: SchedulerConfig,
store: Arc<SessionStore>,
session_manager: SessionManager,
}
impl Scheduler {
pub fn new(
bus: Arc<MessageBus>,
config: SchedulerConfig,
store: Arc<SessionStore>,
session_manager: SessionManager,
) -> Self {
Self {
bus,
config,
store,
session_manager,
}
}
pub async fn run(&self, mut shutdown_rx: watch::Receiver<bool>) {
if !self.config.enabled {
tracing::info!("Scheduler disabled; skipping startup");
return;
}
if let Err(error) = self.sync_config_jobs() {
tracing::error!(error = %error, "Failed to sync scheduler config jobs");
}
let tick_ms = self.config.tick_resolution_ms.max(100);
let mut ticker = tokio::time::interval(std::time::Duration::from_millis(tick_ms));
tracing::info!(tick_resolution_ms = tick_ms, "Scheduler started");
loop {
tokio::select! {
_ = ticker.tick() => {
if let Err(error) = self.process_tick().await {
tracing::error!(error = %error, "Scheduler tick failed");
}
}
changed = shutdown_rx.changed() => {
if changed.is_ok() && *shutdown_rx.borrow() {
tracing::info!("Scheduler shutdown requested");
break;
}
}
}
}
}
fn sync_config_jobs(&self) -> anyhow::Result<()> {
let now = Utc::now();
for job in &self.config.jobs {
let runtime = RuntimeJob::from_config(job, now, self.config.misfire_policy)?;
self.store.upsert_scheduler_job(&runtime.to_upsert())?;
}
Ok(())
}
async fn process_tick(&self) -> anyhow::Result<()> {
let now = Utc::now();
let jobs = self.store.list_scheduler_jobs(true)?;
for record in jobs {
let Some(mut job) = RuntimeJob::from_record(&record, self.config.misfire_policy)? else {
continue;
};
if !job.is_due(now) {
continue;
}
self.store.update_scheduler_job_runtime(
&job.id,
SchedulerJobState::Running,
job.last_status.clone(),
job.last_error.as_deref(),
job.run_count,
job.last_fired_at,
job.next_fire_at,
job.paused_at,
job.completed_at,
)?;
let execution_result = self.execute_job(&job).await;
job.after_execution(now, execution_result.as_ref().err().map(|err| err.to_string()), self.config.misfire_policy)?;
let status = if execution_result.is_ok() {
Some(SchedulerJobStatus::Ok)
} else {
Some(SchedulerJobStatus::Error)
};
if let Err(error) = &execution_result {
tracing::error!(job_id = %job.id, error = %error, "Scheduler job failed");
}
self.store.update_scheduler_job_runtime(
&job.id,
job.state.clone(),
status,
job.last_error.as_deref(),
job.run_count,
job.last_fired_at,
job.next_fire_at,
job.paused_at,
job.completed_at,
)?;
}
Ok(())
}
async fn execute_job(&self, job: &RuntimeJob) -> anyhow::Result<()> {
match job.kind {
SchedulerJobKind::OutboundMessage => {
let message = build_outbound_message(job)?;
self.bus.publish_outbound(message).await?;
}
SchedulerJobKind::InternalEvent => {
execute_internal_event(&self.session_manager, job).await?;
}
}
Ok(())
}
}
#[derive(Debug, Clone)]
struct RuntimeJob {
id: String,
kind: SchedulerJobKind,
schedule: SchedulerSchedule,
target: SchedulerJobTarget,
payload: serde_json::Value,
enabled: bool,
state: SchedulerJobState,
last_status: Option<SchedulerJobStatus>,
last_error: Option<String>,
run_count: i64,
max_runs: Option<i64>,
last_fired_at: Option<i64>,
next_fire_at: Option<i64>,
paused_at: Option<i64>,
completed_at: Option<i64>,
interval_secs: i64,
startup_delay_secs: i64,
}
impl RuntimeJob {
fn from_config(
job: &SchedulerJobConfig,
now: DateTime<Utc>,
misfire_policy: SchedulerMisfirePolicy,
) -> anyhow::Result<Self> {
let schedule = job.resolved_schedule()?;
let initial_state = if job.enabled {
SchedulerJobState::Scheduled
} else {
SchedulerJobState::Paused
};
let next_fire_at = if job.enabled {
compute_initial_next_fire_at(&schedule, now, None, misfire_policy)?
} else {
None
};
Ok(Self {
id: job.id.clone(),
kind: job.kind.clone(),
interval_secs: job.interval_secs as i64,
startup_delay_secs: job.startup_delay_secs as i64,
schedule,
target: job.target.clone(),
payload: job.payload.clone(),
enabled: job.enabled,
state: initial_state,
last_status: None,
last_error: None,
run_count: 0,
max_runs: None,
last_fired_at: None,
next_fire_at,
paused_at: None,
completed_at: None,
})
}
fn from_record(
record: &SchedulerJobRecord,
misfire_policy: SchedulerMisfirePolicy,
) -> anyhow::Result<Option<Self>> {
let kind = match record.kind.as_str() {
"internal_event" => SchedulerJobKind::InternalEvent,
"outbound_message" => SchedulerJobKind::OutboundMessage,
other => {
tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind");
return Ok(None);
}
};
let schedule = deserialize_schedule(&record.schedule, record.interval_secs, record.startup_delay_secs)?;
let now = Utc::now();
let next_fire_at = match (record.enabled, record.state.clone(), record.next_fire_at) {
(false, _, _) => None,
(_, SchedulerJobState::Paused, _) => None,
(_, SchedulerJobState::Completed, _) => None,
(_, _, some_next) if some_next.is_some() => some_next,
_ => compute_initial_next_fire_at(&schedule, now, record.last_fired_at, misfire_policy)?,
};
Ok(Some(Self {
id: record.id.clone(),
kind,
schedule,
target: record.target.clone().try_into()?,
payload: record.payload.clone(),
enabled: record.enabled,
state: record.state.clone(),
last_status: record.last_status.clone(),
last_error: record.last_error.clone(),
run_count: record.run_count,
max_runs: record.max_runs,
last_fired_at: record.last_fired_at,
next_fire_at,
paused_at: record.paused_at,
completed_at: record.completed_at,
interval_secs: record.interval_secs,
startup_delay_secs: record.startup_delay_secs,
}))
}
fn is_due(&self, now: DateTime<Utc>) -> bool {
self.enabled
&& self.state == SchedulerJobState::Scheduled
&& self.next_fire_at.map(|value| value <= now.timestamp_millis()).unwrap_or(false)
}
fn after_execution(
&mut self,
now: DateTime<Utc>,
last_error: Option<String>,
misfire_policy: SchedulerMisfirePolicy,
) -> anyhow::Result<()> {
self.run_count += 1;
self.last_fired_at = Some(now.timestamp_millis());
self.last_error = last_error;
if self.schedule.is_one_shot() {
self.state = SchedulerJobState::Completed;
self.next_fire_at = None;
self.completed_at = Some(now.timestamp_millis());
return Ok(());
}
if let Some(max_runs) = self.max_runs {
if self.run_count >= max_runs {
self.state = SchedulerJobState::Completed;
self.next_fire_at = None;
self.completed_at = Some(now.timestamp_millis());
return Ok(());
}
}
let reference_ms = self.next_fire_at.or(self.last_fired_at);
self.state = SchedulerJobState::Scheduled;
self.completed_at = None;
self.next_fire_at = compute_next_fire_at(&self.schedule, now, reference_ms, misfire_policy)?;
Ok(())
}
fn to_upsert(&self) -> SchedulerJobUpsert {
SchedulerJobUpsert {
id: self.id.clone(),
kind: match self.kind {
SchedulerJobKind::InternalEvent => "internal_event".to_string(),
SchedulerJobKind::OutboundMessage => "outbound_message".to_string(),
},
schedule: serde_json::to_value(&self.schedule).unwrap_or_else(|_| serde_json::json!({})),
interval_secs: self.interval_secs,
startup_delay_secs: self.startup_delay_secs,
target: serde_json::to_value(&self.target).unwrap_or_else(|_| serde_json::json!({})),
payload: self.payload.clone(),
enabled: self.enabled,
state: self.state.clone(),
last_status: self.last_status.clone(),
last_error: self.last_error.clone(),
run_count: self.run_count,
max_runs: self.max_runs,
last_fired_at: self.last_fired_at,
next_fire_at: self.next_fire_at,
paused_at: self.paused_at,
completed_at: self.completed_at,
}
}
}
fn deserialize_schedule(
schedule_json: &serde_json::Value,
interval_secs: i64,
startup_delay_secs: i64,
) -> anyhow::Result<SchedulerSchedule> {
if !schedule_json.is_null() && schedule_json != &serde_json::json!({}) {
return Ok(serde_json::from_value(schedule_json.clone())?);
}
if interval_secs > 0 {
return Ok(SchedulerSchedule::Interval {
seconds: interval_secs as u64,
startup_delay_secs: startup_delay_secs as u64,
});
}
anyhow::bail!("scheduler job is missing schedule definition")
}
fn compute_initial_next_fire_at(
schedule: &SchedulerSchedule,
now: DateTime<Utc>,
last_fired_at: Option<i64>,
misfire_policy: SchedulerMisfirePolicy,
) -> anyhow::Result<Option<i64>> {
match last_fired_at {
Some(last_fired_at) => compute_next_fire_at(schedule, now, Some(last_fired_at), misfire_policy),
None => match schedule {
SchedulerSchedule::Delay { seconds } => Ok(Some((now + ChronoDuration::seconds(*seconds as i64)).timestamp_millis())),
SchedulerSchedule::Interval {
seconds,
startup_delay_secs,
} => {
let delay = if *startup_delay_secs > 0 { *startup_delay_secs } else { *seconds };
Ok(Some((now + ChronoDuration::seconds(delay as i64)).timestamp_millis()))
}
SchedulerSchedule::At { timestamp } => Ok(Some(parse_rfc3339_to_utc(timestamp)?.timestamp_millis())),
SchedulerSchedule::Cron { expression } => {
let schedule = parse_scheduler_cron(expression)?;
Ok(schedule.after(&now).next().map(|next| next.timestamp_millis()))
}
},
}
}
fn compute_next_fire_at(
schedule: &SchedulerSchedule,
now: DateTime<Utc>,
reference_ms: Option<i64>,
misfire_policy: SchedulerMisfirePolicy,
) -> anyhow::Result<Option<i64>> {
match schedule {
SchedulerSchedule::Delay { .. } | SchedulerSchedule::At { .. } => Ok(None),
SchedulerSchedule::Interval { seconds, .. } => {
let interval_ms = (*seconds as i64) * 1_000;
let baseline = reference_ms.unwrap_or_else(|| now.timestamp_millis());
let next_ms = match misfire_policy {
SchedulerMisfirePolicy::Skip => now.timestamp_millis() + interval_ms,
SchedulerMisfirePolicy::CatchUp => {
let mut candidate = baseline + interval_ms;
while candidate <= now.timestamp_millis() {
candidate += interval_ms;
}
candidate
}
};
Ok(Some(next_ms))
}
SchedulerSchedule::Cron { expression } => {
let schedule = parse_scheduler_cron(expression)?;
let anchor = match misfire_policy {
SchedulerMisfirePolicy::Skip => now,
SchedulerMisfirePolicy::CatchUp => reference_ms
.and_then(ts_millis_to_utc)
.unwrap_or(now),
};
Ok(schedule.after(&anchor).next().map(|next| next.timestamp_millis()))
}
}
}
fn parse_rfc3339_to_utc(value: &str) -> anyhow::Result<DateTime<Utc>> {
Ok(DateTime::parse_from_rfc3339(value)?.with_timezone(&Utc))
}
fn parse_scheduler_cron(expression: &str) -> anyhow::Result<cron::Schedule> {
let normalized = normalize_cron_expression(expression);
Ok(cron::Schedule::from_str(&normalized)?)
}
fn normalize_cron_expression(expression: &str) -> String {
let parts: Vec<&str> = expression.split_whitespace().collect();
if parts.len() == 5 {
format!("0 {}", expression.trim())
} else {
expression.trim().to_string()
}
}
fn ts_millis_to_utc(value: i64) -> Option<DateTime<Utc>> {
Utc.timestamp_millis_opt(value).single()
}
fn build_outbound_message(job: &RuntimeJob) -> anyhow::Result<OutboundMessage> {
let channel = job
.target
.channel
.clone()
.ok_or_else(|| anyhow::anyhow!("outbound scheduler job requires target.channel"))?;
let chat_id = job
.target
.chat_id
.clone()
.ok_or_else(|| anyhow::anyhow!("outbound scheduler job requires target.chat_id"))?;
let content = job
.payload
.get("content")
.and_then(|value| value.as_str())
.ok_or_else(|| anyhow::anyhow!("outbound scheduler job payload.content must be a string"))?;
let mut metadata = HashMap::new();
metadata.insert("scheduler_job_id".to_string(), job.id.clone());
Ok(OutboundMessage::assistant(
channel,
chat_id,
content.to_string(),
job.target.reply_to.clone(),
metadata,
))
}
async fn execute_internal_event(session_manager: &SessionManager, job: &RuntimeJob) -> anyhow::Result<()> {
let event = job
.payload
.get("event")
.and_then(|value| value.as_str())
.unwrap_or("session_cleanup");
match event {
"session_cleanup" => {
let removed = session_manager.cleanup_expired_sessions().await;
tracing::info!(job_id = %job.id, removed, "Scheduler session cleanup completed");
Ok(())
}
other => anyhow::bail!("unsupported internal scheduler event: {}", other),
}
}
impl TryFrom<serde_json::Value> for SchedulerJobTarget {
type Error = anyhow::Error;
fn try_from(value: serde_json::Value) -> Result<Self, Self::Error> {
Ok(serde_json::from_value(value)?)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn runtime_job_skip_policy_advances_from_now() {
let now = Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap();
let next = compute_next_fire_at(
&SchedulerSchedule::Interval {
seconds: 60,
startup_delay_secs: 0,
},
now,
Some(now.timestamp_millis() - 10 * 60 * 1_000),
SchedulerMisfirePolicy::Skip,
)
.unwrap()
.unwrap();
assert_eq!(next, now.timestamp_millis() + 60_000);
}
#[test]
fn runtime_job_catch_up_policy_moves_past_now() {
let now = Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap();
let next = compute_next_fire_at(
&SchedulerSchedule::Interval {
seconds: 60,
startup_delay_secs: 0,
},
now,
Some(now.timestamp_millis() - 10 * 60 * 1_000),
SchedulerMisfirePolicy::CatchUp,
)
.unwrap()
.unwrap();
assert!(next > now.timestamp_millis());
assert_eq!((next - now.timestamp_millis()) % 60_000, 0);
}
#[test]
fn runtime_job_from_record_uses_persisted_schedule() {
let record = SchedulerJobRecord {
id: "heartbeat".to_string(),
kind: "outbound_message".to_string(),
schedule: serde_json::json!({
"type": "interval",
"seconds": 120,
"startup_delay_secs": 10
}),
interval_secs: 0,
startup_delay_secs: 0,
target: serde_json::json!({
"channel": "feishu",
"chat_id": "oc_demo"
}),
payload: serde_json::json!({"content": "hello"}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: None,
last_fired_at: None,
next_fire_at: Some(1_700_000_010_000),
paused_at: None,
completed_at: None,
created_at: 1_700_000_000_000,
updated_at: 1_700_000_000_000,
};
let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip)
.unwrap()
.unwrap();
assert_eq!(job.schedule, SchedulerSchedule::Interval {
seconds: 120,
startup_delay_secs: 10,
});
assert_eq!(job.next_fire_at, Some(1_700_000_010_000));
}
}

View File

@ -77,11 +77,118 @@ pub struct MemoryUpsert {
pub source_chat_id: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SchedulerJobState {
Scheduled,
Running,
Paused,
Completed,
}
impl SchedulerJobState {
pub fn as_str(&self) -> &'static str {
match self {
SchedulerJobState::Scheduled => "scheduled",
SchedulerJobState::Running => "running",
SchedulerJobState::Paused => "paused",
SchedulerJobState::Completed => "completed",
}
}
pub fn from_str(value: &str) -> Option<Self> {
match value {
"scheduled" => Some(Self::Scheduled),
"running" => Some(Self::Running),
"paused" => Some(Self::Paused),
"completed" => Some(Self::Completed),
_ => None,
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum SchedulerJobStatus {
Ok,
Error,
Skipped,
}
impl SchedulerJobStatus {
pub fn as_str(&self) -> &'static str {
match self {
SchedulerJobStatus::Ok => "ok",
SchedulerJobStatus::Error => "error",
SchedulerJobStatus::Skipped => "skipped",
}
}
pub fn from_str(value: &str) -> Option<Self> {
match value {
"ok" => Some(Self::Ok),
"error" => Some(Self::Error),
"skipped" => Some(Self::Skipped),
_ => None,
}
}
}
impl Default for SchedulerJobState {
fn default() -> Self {
Self::Scheduled
}
}
#[derive(Clone)]
pub struct SessionStore {
conn: Arc<Mutex<Connection>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SchedulerJobRecord {
pub id: String,
pub kind: String,
pub schedule: serde_json::Value,
pub interval_secs: i64,
pub startup_delay_secs: i64,
pub target: serde_json::Value,
pub payload: serde_json::Value,
pub enabled: bool,
pub state: SchedulerJobState,
pub last_status: Option<SchedulerJobStatus>,
pub last_error: Option<String>,
pub run_count: i64,
pub max_runs: Option<i64>,
pub last_fired_at: Option<i64>,
pub next_fire_at: Option<i64>,
pub paused_at: Option<i64>,
pub completed_at: Option<i64>,
pub created_at: i64,
pub updated_at: i64,
}
#[derive(Debug, Clone)]
pub struct SchedulerJobUpsert {
pub id: String,
pub kind: String,
pub schedule: serde_json::Value,
pub interval_secs: i64,
pub startup_delay_secs: i64,
pub target: serde_json::Value,
pub payload: serde_json::Value,
pub enabled: bool,
pub state: SchedulerJobState,
pub last_status: Option<SchedulerJobStatus>,
pub last_error: Option<String>,
pub run_count: i64,
pub max_runs: Option<i64>,
pub last_fired_at: Option<i64>,
pub next_fire_at: Option<i64>,
pub paused_at: Option<i64>,
pub completed_at: Option<i64>,
}
impl SessionStore {
pub fn new() -> Result<Self, StorageError> {
let db_path = default_session_db_path()?;
@ -185,6 +292,31 @@ impl SessionStore {
CREATE INDEX IF NOT EXISTS idx_memories_source_session
ON memories(source_session_id, updated_at DESC);
CREATE TABLE IF NOT EXISTS scheduler_jobs (
id TEXT PRIMARY KEY,
kind TEXT NOT NULL,
schedule_json TEXT NOT NULL DEFAULT '{}',
interval_secs INTEGER NOT NULL DEFAULT 0,
startup_delay_secs INTEGER NOT NULL DEFAULT 0,
target_json TEXT NOT NULL,
payload_json TEXT NOT NULL,
enabled INTEGER NOT NULL DEFAULT 1,
state TEXT NOT NULL DEFAULT 'scheduled',
last_status TEXT,
last_error TEXT,
run_count INTEGER NOT NULL DEFAULT 0,
max_runs INTEGER,
last_fired_at INTEGER,
next_fire_at INTEGER,
paused_at INTEGER,
completed_at INTEGER,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_scheduler_jobs_enabled_next_fire
ON scheduler_jobs(enabled, state, next_fire_at ASC);
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
namespace,
memory_key,
@ -213,6 +345,7 @@ impl SessionStore {
)?;
ensure_sessions_schema(&conn)?;
ensure_scheduler_schema(&conn)?;
Ok(Self {
conn: Arc::new(Mutex::new(conn)),
@ -717,6 +850,162 @@ impl SessionStore {
Ok(changed > 0)
}
pub fn upsert_scheduler_job(&self, input: &SchedulerJobUpsert) -> Result<SchedulerJobRecord, StorageError> {
let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute(
"
INSERT INTO scheduler_jobs (
id, kind, schedule_json, interval_secs, startup_delay_secs,
target_json, payload_json, enabled, state, last_status, last_error,
run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at,
created_at, updated_at
) VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?14, ?15, ?16, ?17, ?18, ?18)
ON CONFLICT(id) DO UPDATE SET
kind = excluded.kind,
schedule_json = excluded.schedule_json,
interval_secs = excluded.interval_secs,
startup_delay_secs = excluded.startup_delay_secs,
target_json = excluded.target_json,
payload_json = excluded.payload_json,
enabled = excluded.enabled,
state = excluded.state,
last_status = excluded.last_status,
last_error = excluded.last_error,
run_count = excluded.run_count,
max_runs = excluded.max_runs,
last_fired_at = excluded.last_fired_at,
next_fire_at = excluded.next_fire_at,
paused_at = excluded.paused_at,
completed_at = excluded.completed_at,
updated_at = excluded.updated_at
",
params![
input.id,
input.kind,
serde_json::to_string(&input.schedule)?,
input.interval_secs,
input.startup_delay_secs,
serde_json::to_string(&input.target)?,
serde_json::to_string(&input.payload)?,
if input.enabled { 1 } else { 0 },
input.state.as_str(),
input.last_status.as_ref().map(SchedulerJobStatus::as_str),
input.last_error,
input.run_count,
input.max_runs,
input.last_fired_at,
input.next_fire_at,
input.paused_at,
input.completed_at,
now,
],
)?;
drop(conn);
self.get_scheduler_job(&input.id)?
.ok_or_else(|| rusqlite::Error::QueryReturnedNoRows.into())
}
pub fn get_scheduler_job(&self, job_id: &str) -> Result<Option<SchedulerJobRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let mut stmt = conn.prepare(
"
SELECT id, kind, schedule_json, interval_secs, startup_delay_secs,
target_json, payload_json, enabled, state, last_status, last_error,
run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at,
created_at, updated_at
FROM scheduler_jobs
WHERE id = ?1
",
)?;
stmt.query_row(params![job_id], map_scheduler_job_record)
.optional()
.map_err(StorageError::from)
}
pub fn list_scheduler_jobs(&self, enabled_only: bool) -> Result<Vec<SchedulerJobRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let sql = if enabled_only {
"
SELECT id, kind, schedule_json, interval_secs, startup_delay_secs,
target_json, payload_json, enabled, state, last_status, last_error,
run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at,
created_at, updated_at
FROM scheduler_jobs
WHERE enabled = 1
ORDER BY COALESCE(next_fire_at, created_at) ASC, id ASC
"
} else {
"
SELECT id, kind, schedule_json, interval_secs, startup_delay_secs,
target_json, payload_json, enabled, state, last_status, last_error,
run_count, max_runs, last_fired_at, next_fire_at, paused_at, completed_at,
created_at, updated_at
FROM scheduler_jobs
ORDER BY COALESCE(next_fire_at, created_at) ASC, id ASC
"
};
let mut stmt = conn.prepare(sql)?;
let rows = stmt.query_map([], map_scheduler_job_record)?;
let mut jobs = Vec::new();
for row in rows {
jobs.push(row?);
}
Ok(jobs)
}
pub fn delete_scheduler_job(&self, job_id: &str) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute("DELETE FROM scheduler_jobs WHERE id = ?1", params![job_id])?;
Ok(())
}
pub fn update_scheduler_job_runtime(
&self,
job_id: &str,
state: SchedulerJobState,
last_status: Option<SchedulerJobStatus>,
last_error: Option<&str>,
run_count: i64,
last_fired_at: Option<i64>,
next_fire_at: Option<i64>,
paused_at: Option<i64>,
completed_at: Option<i64>,
) -> Result<(), StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
conn.execute(
"
UPDATE scheduler_jobs
SET state = ?2,
last_status = ?3,
last_error = ?4,
run_count = ?5,
last_fired_at = ?6,
next_fire_at = ?7,
paused_at = ?8,
completed_at = ?9,
updated_at = ?10
WHERE id = ?1
",
params![
job_id,
state.as_str(),
last_status.as_ref().map(SchedulerJobStatus::as_str),
last_error,
run_count,
last_fired_at,
next_fire_at,
paused_at,
completed_at,
current_timestamp(),
],
)?;
Ok(())
}
pub fn search_memories(
&self,
scope_kind: &str,
@ -933,6 +1222,64 @@ fn map_memory_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<MemoryRecord>
})
}
fn map_scheduler_job_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<SchedulerJobRecord> {
let schedule_json: String = row.get(2)?;
let target_json: String = row.get(5)?;
let payload_json: String = row.get(6)?;
let state: String = row.get(8)?;
let last_status: Option<String> = row.get(9)?;
let schedule = serde_json::from_str(&schedule_json).map_err(|err| {
rusqlite::Error::FromSqlConversionFailure(
2,
rusqlite::types::Type::Text,
Box::new(err),
)
})?;
let target = serde_json::from_str(&target_json).map_err(|err| {
rusqlite::Error::FromSqlConversionFailure(
5,
rusqlite::types::Type::Text,
Box::new(err),
)
})?;
let payload = serde_json::from_str(&payload_json).map_err(|err| {
rusqlite::Error::FromSqlConversionFailure(
6,
rusqlite::types::Type::Text,
Box::new(err),
)
})?;
Ok(SchedulerJobRecord {
id: row.get(0)?,
kind: row.get(1)?,
schedule,
interval_secs: row.get(3)?,
startup_delay_secs: row.get(4)?,
target,
payload,
enabled: row.get::<_, i64>(7)? != 0,
state: SchedulerJobState::from_str(&state).ok_or_else(|| {
rusqlite::Error::FromSqlConversionFailure(
8,
rusqlite::types::Type::Text,
format!("invalid scheduler job state: {}", state).into(),
)
})?,
last_status: last_status.and_then(|value| SchedulerJobStatus::from_str(&value)),
last_error: row.get(10)?,
run_count: row.get(11)?,
max_runs: row.get(12)?,
last_fired_at: row.get(13)?,
next_fire_at: row.get(14)?,
paused_at: row.get(15)?,
completed_at: row.get(16)?,
created_at: row.get(17)?,
updated_at: row.get(18)?,
})
}
fn ensure_sessions_schema(conn: &Connection) -> Result<(), StorageError> {
if !has_column(conn, "sessions", "reset_cutoff_seq")? {
conn.execute(
@ -958,6 +1305,66 @@ fn ensure_sessions_schema(conn: &Connection) -> Result<(), StorageError> {
Ok(())
}
fn ensure_scheduler_schema(conn: &Connection) -> Result<(), StorageError> {
if !has_column(conn, "scheduler_jobs", "schedule_json")? {
conn.execute(
"ALTER TABLE scheduler_jobs ADD COLUMN schedule_json TEXT NOT NULL DEFAULT '{}'",
[],
)?;
}
if !has_column(conn, "scheduler_jobs", "state")? {
conn.execute(
"ALTER TABLE scheduler_jobs ADD COLUMN state TEXT NOT NULL DEFAULT 'scheduled'",
[],
)?;
}
if !has_column(conn, "scheduler_jobs", "last_status")? {
conn.execute(
"ALTER TABLE scheduler_jobs ADD COLUMN last_status TEXT",
[],
)?;
}
if !has_column(conn, "scheduler_jobs", "last_error")? {
conn.execute(
"ALTER TABLE scheduler_jobs ADD COLUMN last_error TEXT",
[],
)?;
}
if !has_column(conn, "scheduler_jobs", "run_count")? {
conn.execute(
"ALTER TABLE scheduler_jobs ADD COLUMN run_count INTEGER NOT NULL DEFAULT 0",
[],
)?;
}
if !has_column(conn, "scheduler_jobs", "max_runs")? {
conn.execute(
"ALTER TABLE scheduler_jobs ADD COLUMN max_runs INTEGER",
[],
)?;
}
if !has_column(conn, "scheduler_jobs", "paused_at")? {
conn.execute(
"ALTER TABLE scheduler_jobs ADD COLUMN paused_at INTEGER",
[],
)?;
}
if !has_column(conn, "scheduler_jobs", "completed_at")? {
conn.execute(
"ALTER TABLE scheduler_jobs ADD COLUMN completed_at INTEGER",
[],
)?;
}
Ok(())
}
fn has_column(conn: &Connection, table_name: &str, column_name: &str) -> Result<bool, StorageError> {
let pragma = format!("PRAGMA table_info({})", table_name);
let mut stmt = conn.prepare(&pragma)?;
@ -1503,4 +1910,65 @@ mod tests {
assert!(hits.iter().any(|memory| memory.memory_key == "editor"));
assert!(hits.iter().any(|memory| memory.memory_key == "quality"));
}
#[test]
fn test_scheduler_job_roundtrip_and_runtime_update() {
let store = SessionStore::in_memory().unwrap();
let saved = store
.upsert_scheduler_job(&SchedulerJobUpsert {
id: "heartbeat".to_string(),
kind: "outbound_message".to_string(),
schedule: serde_json::json!({
"type": "interval",
"seconds": 300,
"startup_delay_secs": 10,
}),
interval_secs: 300,
startup_delay_secs: 10,
target: serde_json::json!({
"channel": "feishu",
"chat_id": "oc_demo",
}),
payload: serde_json::json!({
"content": "heartbeat",
}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: Some(3),
last_fired_at: None,
next_fire_at: Some(1_700_000_000_000),
paused_at: None,
completed_at: None,
})
.unwrap();
assert_eq!(saved.id, "heartbeat");
assert_eq!(saved.kind, "outbound_message");
assert_eq!(saved.state, SchedulerJobState::Scheduled);
assert_eq!(saved.max_runs, Some(3));
store
.update_scheduler_job_runtime(
"heartbeat",
SchedulerJobState::Completed,
Some(SchedulerJobStatus::Ok),
None,
1,
Some(1_700_000_000_000),
None,
None,
Some(1_700_000_000_100),
)
.unwrap();
let fetched = store.get_scheduler_job("heartbeat").unwrap().unwrap();
assert_eq!(fetched.state, SchedulerJobState::Completed);
assert_eq!(fetched.last_status, Some(SchedulerJobStatus::Ok));
assert_eq!(fetched.run_count, 1);
assert_eq!(fetched.completed_at, Some(1_700_000_000_100));
}
}

View File

@ -7,6 +7,7 @@ pub mod http_request;
pub mod memory_manage;
pub mod memory_search;
pub mod registry;
pub mod scheduler_manage;
pub mod schema;
pub mod skill_manage;
pub mod traits;
@ -21,6 +22,7 @@ pub use http_request::HttpRequestTool;
pub use memory_manage::MemoryManageTool;
pub use memory_search::MemorySearchTool;
pub use registry::ToolRegistry;
pub use scheduler_manage::SchedulerManageTool;
pub use schema::{CleaningStrategy, SchemaCleanr};
pub use skill_manage::{SkillListTool, SkillManageTool};
pub use traits::{Tool, ToolContext, ToolResult};

View File

@ -0,0 +1,291 @@
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::json;
use crate::config::SchedulerSchedule;
use crate::storage::{
SchedulerJobRecord, SchedulerJobState, SchedulerJobUpsert, SessionStore,
};
use crate::tools::traits::{Tool, ToolResult};
pub struct SchedulerManageTool {
store: Arc<SessionStore>,
}
impl SchedulerManageTool {
pub fn new(store: Arc<SessionStore>) -> Self {
Self { store }
}
}
#[async_trait]
impl Tool for SchedulerManageTool {
fn name(&self) -> &str {
"scheduler_manage"
}
fn description(&self) -> &str {
"Manage DB-backed scheduled jobs. Supports actions: list, get, put, delete, pause, resume. Jobs persist in SQLite and are executed by the scheduler runtime."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["list", "get", "put", "delete", "pause", "resume"]
},
"id": {
"type": "string",
"description": "Job id"
},
"enabled_only": {
"type": "boolean",
"description": "Only for list action"
},
"kind": {
"type": "string",
"enum": ["internal_event", "outbound_message"]
},
"schedule": {
"type": "object",
"description": "Schedule object, for example {type: 'interval', seconds: 300} or {type: 'cron', expression: '0 9 * * *'}"
},
"target": {
"type": "object"
},
"payload": {
"type": "object"
},
"max_runs": {
"type": ["integer", "null"]
}
},
"required": ["action"]
})
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let action = match args.get("action").and_then(|value| value.as_str()) {
Some(action) => action,
None => return Ok(error_result("Missing required parameter: action")),
};
let output = match action {
"list" => {
let enabled_only = args
.get("enabled_only")
.and_then(|value| value.as_bool())
.unwrap_or(false);
let jobs = self.store.list_scheduler_jobs(enabled_only)?;
json!(jobs.iter().map(record_to_json).collect::<Vec<_>>())
}
"get" => {
let id = require_str(&args, "id")?;
match self.store.get_scheduler_job(id)? {
Some(record) => record_to_json(&record),
None => return Ok(error_result(&format!("scheduler job '{}' not found", id))),
}
}
"put" => {
let input = build_upsert(&args)?;
let record = self.store.upsert_scheduler_job(&input)?;
record_to_json(&record)
}
"delete" => {
let id = require_str(&args, "id")?;
self.store.delete_scheduler_job(id)?;
json!({"status": "deleted", "id": id})
}
"pause" => {
let id = require_str(&args, "id")?;
let record = self
.store
.get_scheduler_job(id)?
.ok_or_else(|| anyhow::anyhow!("scheduler job '{}' not found", id))?;
let mut input = record_to_upsert(&record);
input.enabled = false;
input.state = SchedulerJobState::Paused;
input.paused_at = Some(current_timestamp());
input.next_fire_at = None;
let saved = self.store.upsert_scheduler_job(&input)?;
record_to_json(&saved)
}
"resume" => {
let id = require_str(&args, "id")?;
let record = self
.store
.get_scheduler_job(id)?
.ok_or_else(|| anyhow::anyhow!("scheduler job '{}' not found", id))?;
let mut input = record_to_upsert(&record);
input.enabled = true;
input.state = SchedulerJobState::Scheduled;
input.paused_at = None;
input.completed_at = None;
input.next_fire_at = None;
let saved = self.store.upsert_scheduler_job(&input)?;
record_to_json(&saved)
}
_ => return Ok(error_result("Unsupported action")),
};
Ok(ToolResult {
success: true,
output: serde_json::to_string_pretty(&output)?,
error: None,
})
}
}
fn build_upsert(args: &serde_json::Value) -> anyhow::Result<SchedulerJobUpsert> {
let id = require_str(args, "id")?.to_string();
let kind = require_str(args, "kind")?.to_string();
let schedule_value = args
.get("schedule")
.cloned()
.ok_or_else(|| anyhow::anyhow!("Missing required parameter: schedule"))?;
let schedule: SchedulerSchedule = serde_json::from_value(schedule_value.clone())?;
schedule.validate(&id)?;
let (interval_secs, startup_delay_secs) = match &schedule {
SchedulerSchedule::Interval {
seconds,
startup_delay_secs,
} => (*seconds as i64, *startup_delay_secs as i64),
_ => (0, 0),
};
Ok(SchedulerJobUpsert {
id,
kind,
schedule: serde_json::to_value(schedule)?,
interval_secs,
startup_delay_secs,
target: args.get("target").cloned().unwrap_or_else(|| json!({})),
payload: args.get("payload").cloned().unwrap_or_else(|| json!({})),
enabled: args.get("enabled").and_then(|value| value.as_bool()).unwrap_or(true),
state: if args.get("enabled").and_then(|value| value.as_bool()).unwrap_or(true) {
SchedulerJobState::Scheduled
} else {
SchedulerJobState::Paused
},
last_status: None,
last_error: None,
run_count: 0,
max_runs: args.get("max_runs").and_then(|value| value.as_i64()),
last_fired_at: None,
next_fire_at: None,
paused_at: None,
completed_at: None,
})
}
fn record_to_json(record: &SchedulerJobRecord) -> serde_json::Value {
json!({
"id": record.id,
"kind": record.kind,
"schedule": record.schedule,
"target": record.target,
"payload": record.payload,
"enabled": record.enabled,
"state": record.state,
"last_status": record.last_status,
"last_error": record.last_error,
"run_count": record.run_count,
"max_runs": record.max_runs,
"last_fired_at": record.last_fired_at,
"next_fire_at": record.next_fire_at,
"paused_at": record.paused_at,
"completed_at": record.completed_at,
"created_at": record.created_at,
"updated_at": record.updated_at,
})
}
fn record_to_upsert(record: &SchedulerJobRecord) -> SchedulerJobUpsert {
SchedulerJobUpsert {
id: record.id.clone(),
kind: record.kind.clone(),
schedule: record.schedule.clone(),
interval_secs: record.interval_secs,
startup_delay_secs: record.startup_delay_secs,
target: record.target.clone(),
payload: record.payload.clone(),
enabled: record.enabled,
state: record.state.clone(),
last_status: record.last_status.clone(),
last_error: record.last_error.clone(),
run_count: record.run_count,
max_runs: record.max_runs,
last_fired_at: record.last_fired_at,
next_fire_at: record.next_fire_at,
paused_at: record.paused_at,
completed_at: record.completed_at,
}
}
fn require_str<'a>(args: &'a serde_json::Value, key: &str) -> anyhow::Result<&'a str> {
args.get(key)
.and_then(|value| value.as_str())
.ok_or_else(|| anyhow::anyhow!("Missing required parameter: {}", key))
}
fn error_result(message: &str) -> ToolResult {
ToolResult {
success: false,
output: String::new(),
error: Some(message.to_string()),
}
}
fn current_timestamp() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_scheduler_manage_put_and_get() {
let store = Arc::new(SessionStore::in_memory().unwrap());
let tool = SchedulerManageTool::new(store);
let put_result = tool
.execute(json!({
"action": "put",
"id": "heartbeat",
"kind": "outbound_message",
"schedule": {
"type": "interval",
"seconds": 60
},
"target": {
"channel": "feishu",
"chat_id": "oc_demo"
},
"payload": {
"content": "ping"
}
}))
.await
.unwrap();
assert!(put_result.success);
let get_result = tool
.execute(json!({
"action": "get",
"id": "heartbeat"
}))
.await
.unwrap();
assert!(get_result.success);
assert!(get_result.output.contains("heartbeat"));
assert!(get_result.output.contains("outbound_message"));
}
}