967 lines
33 KiB
Rust

use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use chrono::{DateTime, Duration as ChronoDuration, TimeZone, Utc};
use chrono_tz::Tz;
use tokio::sync::watch;
use crate::bus::{MessageBus, OutboundMessage};
use crate::config::{
SchedulerConfig, SchedulerJobConfig, SchedulerJobKind, SchedulerJobTarget, SchedulerMisfirePolicy,
SchedulerSchedule,
};
use crate::gateway::session::SessionManager;
use crate::gateway::session::ScheduledAgentTaskOptions;
use crate::storage::{
SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, SchedulerJobUpsert, SessionStore,
};
/// Periodic job runner that loads scheduler jobs from the store on every tick
/// and executes the ones that are due.
pub struct Scheduler {
/// Message bus used to publish outbound messages produced by jobs.
bus: Arc<MessageBus>,
/// Scheduler settings (enabled flag, tick resolution, misfire policy, configured jobs).
config: SchedulerConfig,
/// Time zone in which cron expressions are evaluated.
timezone: Tz,
/// Persistent store holding scheduler job records and their runtime state.
store: Arc<SessionStore>,
/// Session manager used by internal-event and agent-task jobs.
session_manager: SessionManager,
}
impl Scheduler {
/// Builds a scheduler from its collaborators.
///
/// Nothing is started here; call [`Scheduler::run`] to begin ticking.
pub fn new(
bus: Arc<MessageBus>,
config: SchedulerConfig,
timezone: Tz,
store: Arc<SessionStore>,
session_manager: SessionManager,
) -> Self {
Self {
bus,
config,
timezone,
store,
session_manager,
}
}
/// Runs the scheduler loop until shutdown is requested.
///
/// Returns immediately when the scheduler is disabled in the configuration.
/// Otherwise the configured jobs are synced into the store (a sync failure is
/// logged but does not prevent startup) and `process_tick` is invoked on a
/// fixed interval. The interval is `tick_resolution_ms`, clamped to at least
/// 100 ms.
///
/// The loop exits when `shutdown_rx` observes `true`, or when the sending half
/// of the channel has been dropped. In the latter case `changed()` fails
/// immediately on every poll, so continuing to select on it would spin the
/// loop at full speed.
pub async fn run(&self, mut shutdown_rx: watch::Receiver<bool>) {
    if !self.config.enabled {
        tracing::info!("Scheduler disabled; skipping startup");
        return;
    }
    if let Err(error) = self.sync_config_jobs() {
        tracing::error!(error = %error, "Failed to sync scheduler config jobs");
    }
    let tick_ms = self.config.tick_resolution_ms.max(100);
    let mut ticker = tokio::time::interval(std::time::Duration::from_millis(tick_ms));
    // A long-running job must not be followed by a burst of back-to-back ticks
    // (the default behaviour); resume ticking at the normal cadence instead.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    tracing::info!(tick_resolution_ms = tick_ms, "Scheduler started");
    loop {
        tokio::select! {
            _ = ticker.tick() => {
                if let Err(error) = self.process_tick().await {
                    tracing::error!(error = %error, "Scheduler tick failed");
                }
            }
            changed = shutdown_rx.changed() => {
                match changed {
                    Ok(()) if *shutdown_rx.borrow() => {
                        tracing::info!("Scheduler shutdown requested");
                        break;
                    }
                    // Value changed but shutdown was not requested; keep running.
                    Ok(()) => {}
                    // Sender dropped: no shutdown signal can ever arrive.
                    Err(_) => {
                        tracing::warn!("Scheduler shutdown channel closed; stopping scheduler");
                        break;
                    }
                }
            }
        }
    }
}
/// Upserts every effective configured job into the store.
///
/// Each job is converted to a `RuntimeJob` relative to the current time and
/// written through `upsert_scheduler_job`. The first conversion or store error
/// aborts the sync.
fn sync_config_jobs(&self) -> anyhow::Result<()> {
    let synced_at = Utc::now();
    let time_config = crate::config::TimeConfig {
        timezone: self.timezone.name().to_string(),
    };
    for configured in self.config.effective_jobs(&time_config) {
        let upsert = RuntimeJob::from_config(
            &configured,
            synced_at,
            self.config.misfire_policy,
            self.timezone,
        )?
        .to_upsert();
        self.store.upsert_scheduler_job(&upsert)?;
    }
    Ok(())
}
async fn process_tick(&self) -> anyhow::Result<()> {
let now = Utc::now();
let jobs = self.store.list_scheduler_jobs(true)?;
for record in jobs {
let Some(mut job) = RuntimeJob::from_record(&record, self.config.misfire_policy, self.timezone)? else {
continue;
};
if record.next_fire_at.is_none() && job.next_fire_at.is_some() {
self.store.update_scheduler_job_runtime(
&job.id,
job.state.clone(),
job.last_status.clone(),
job.last_error.as_deref(),
job.run_count,
job.last_fired_at,
job.next_fire_at,
job.paused_at,
job.completed_at,
)?;
}
if !job.is_due(now) {
continue;
}
self.store.update_scheduler_job_runtime(
&job.id,
SchedulerJobState::Running,
job.last_status.clone(),
job.last_error.as_deref(),
job.run_count,
job.last_fired_at,
job.next_fire_at,
job.paused_at,
job.completed_at,
)?;
let execution_result = self.execute_job(&job).await;
job.after_execution(
now,
execution_result.as_ref().err().map(|err| err.to_string()),
self.config.misfire_policy,
self.timezone,
)?;
let status = if execution_result.is_ok() {
Some(SchedulerJobStatus::Ok)
} else {
Some(SchedulerJobStatus::Error)
};
if let Err(error) = &execution_result {
tracing::error!(job_id = %job.id, error = %error, "Scheduler job failed");
}
self.store.update_scheduler_job_runtime(
&job.id,
job.state.clone(),
status,
job.last_error.as_deref(),
job.run_count,
job.last_fired_at,
job.next_fire_at,
job.paused_at,
job.completed_at,
)?;
}
Ok(())
}
/// Executes a job according to its kind.
///
/// Outbound-message and agent-task jobs produce messages that are published
/// on the bus in order; internal events produce none. The first error stops
/// execution and is returned.
async fn execute_job(&self, job: &RuntimeJob) -> anyhow::Result<()> {
    let to_publish = match job.kind {
        SchedulerJobKind::OutboundMessage => vec![build_outbound_message(job)?],
        SchedulerJobKind::InternalEvent => {
            execute_internal_event(&self.session_manager, job).await?;
            Vec::new()
        }
        SchedulerJobKind::AgentTask => execute_agent_task(&self.session_manager, job).await?,
    };
    for outbound in to_publish {
        self.bus.publish_outbound(outbound).await?;
    }
    Ok(())
}
}
/// In-memory view of a scheduler job: its definition plus mutable runtime state.
///
/// Built either from configuration (`from_config`) or from a stored record
/// (`from_record`). The `*_at` timestamps that this file assigns or compares
/// (`last_fired_at`, `next_fire_at`, `completed_at`) are Unix epoch milliseconds.
#[derive(Debug, Clone)]
struct RuntimeJob {
/// Unique job identifier.
id: String,
/// What the job does when it fires.
kind: SchedulerJobKind,
/// When the job fires.
schedule: SchedulerSchedule,
/// Delivery target (channel / chat / reply-to) for message-producing jobs.
target: SchedulerJobTarget,
/// Kind-specific JSON payload (e.g. `content`, `event`, `prompt`).
payload: serde_json::Value,
/// Disabled jobs are never due.
enabled: bool,
/// Lifecycle state; only `Scheduled` jobs are considered due.
state: SchedulerJobState,
/// Outcome of the most recent run, if any.
last_status: Option<SchedulerJobStatus>,
/// Error message of the most recent run, if it failed.
last_error: Option<String>,
/// Number of executions so far.
run_count: i64,
/// Optional cap on executions; the job completes once `run_count` reaches it.
max_runs: Option<i64>,
/// Time of the most recent execution.
last_fired_at: Option<i64>,
/// Time at which the job should fire next; `None` when nothing is pending.
next_fire_at: Option<i64>,
/// Time at which the job was paused, carried through unchanged by this file.
paused_at: Option<i64>,
/// Time at which the job completed.
completed_at: Option<i64>,
/// Interval column; used as a fallback when the stored schedule JSON is null or empty.
interval_secs: i64,
/// Startup-delay column accompanying `interval_secs` in the same fallback.
startup_delay_secs: i64,
}
impl RuntimeJob {
/// Builds a fresh runtime job from a configuration entry.
///
/// Enabled jobs start as `Scheduled` with their first fire time computed
/// relative to `now`; disabled jobs start as `Paused` with no fire time.
/// Run statistics always start empty.
fn from_config(
    job: &SchedulerJobConfig,
    now: DateTime<Utc>,
    misfire_policy: SchedulerMisfirePolicy,
    timezone: Tz,
) -> anyhow::Result<Self> {
    let schedule = job.resolved_schedule()?;
    let (state, next_fire_at) = if job.enabled {
        let first_fire = compute_initial_next_fire_at(&schedule, now, None, misfire_policy, timezone)?;
        (SchedulerJobState::Scheduled, first_fire)
    } else {
        (SchedulerJobState::Paused, None)
    };
    let runtime = Self {
        id: job.id.clone(),
        kind: job.kind.clone(),
        schedule,
        target: job.target.clone(),
        payload: job.payload.clone(),
        enabled: job.enabled,
        state,
        last_status: None,
        last_error: None,
        run_count: 0,
        max_runs: None,
        last_fired_at: None,
        next_fire_at,
        paused_at: None,
        completed_at: None,
        interval_secs: job.interval_secs as i64,
        startup_delay_secs: job.startup_delay_secs as i64,
    };
    Ok(runtime)
}
fn from_record(
record: &SchedulerJobRecord,
misfire_policy: SchedulerMisfirePolicy,
timezone: Tz,
) -> anyhow::Result<Option<Self>> {
let kind = match record.kind.as_str() {
"internal_event" => SchedulerJobKind::InternalEvent,
"outbound_message" => SchedulerJobKind::OutboundMessage,
"agent_task" => SchedulerJobKind::AgentTask,
other => {
tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind");
return Ok(None);
}
};
let schedule = deserialize_schedule(&record.schedule, record.interval_secs, record.startup_delay_secs)?;
let now = Utc::now();
let next_fire_at = match (record.enabled, record.state.clone(), record.next_fire_at) {
(false, _, _) => None,
(_, SchedulerJobState::Paused, _) => None,
(_, SchedulerJobState::Completed, _) => None,
(_, _, some_next) if some_next.is_some() => some_next,
_ => compute_initial_next_fire_at(&schedule, now, record.last_fired_at, misfire_policy, timezone)?,
};
Ok(Some(Self {
id: record.id.clone(),
kind,
schedule,
target: record.target.clone().try_into()?,
payload: record.payload.clone(),
enabled: record.enabled,
state: record.state.clone(),
last_status: record.last_status.clone(),
last_error: record.last_error.clone(),
run_count: record.run_count,
max_runs: record.max_runs,
last_fired_at: record.last_fired_at,
next_fire_at,
paused_at: record.paused_at,
completed_at: record.completed_at,
interval_secs: record.interval_secs,
startup_delay_secs: record.startup_delay_secs,
}))
}
/// Returns `true` when the job is enabled, in the `Scheduled` state, and its
/// next fire time is at or before `now`.
fn is_due(&self, now: DateTime<Utc>) -> bool {
    if !self.enabled || self.state != SchedulerJobState::Scheduled {
        return false;
    }
    let now_ms = now.timestamp_millis();
    matches!(self.next_fire_at, Some(fire_at) if fire_at <= now_ms)
}
/// Updates runtime state after one execution attempt that started at `now`.
///
/// Always bumps `run_count`, stamps `last_fired_at` and stores `last_error`.
/// One-shot schedules and jobs that reached `max_runs` become `Completed`;
/// everything else is rescheduled via `compute_next_fire_at`.
fn after_execution(
    &mut self,
    now: DateTime<Utc>,
    last_error: Option<String>,
    misfire_policy: SchedulerMisfirePolicy,
    timezone: Tz,
) -> anyhow::Result<()> {
    let fired_ms = now.timestamp_millis();
    self.run_count += 1;
    self.last_fired_at = Some(fired_ms);
    self.last_error = last_error;

    let finished = self.schedule.is_one_shot()
        || matches!(self.max_runs, Some(limit) if self.run_count >= limit);
    if finished {
        self.state = SchedulerJobState::Completed;
        self.next_fire_at = None;
        self.completed_at = Some(fired_ms);
        return Ok(());
    }

    // Prefer the fire time that was just served as the catch-up reference and
    // fall back to the execution time stamped above.
    let reference_ms = self.next_fire_at.or(self.last_fired_at);
    self.state = SchedulerJobState::Scheduled;
    self.completed_at = None;
    self.next_fire_at = compute_next_fire_at(&self.schedule, now, reference_ms, misfire_policy, timezone)?;
    Ok(())
}
/// Converts the job into the store's upsert representation.
///
/// The kind is written as its snake_case string; schedule and target are
/// serialized to JSON, falling back to an empty object if serialization fails.
fn to_upsert(&self) -> SchedulerJobUpsert {
    let kind = match self.kind {
        SchedulerJobKind::AgentTask => "agent_task",
        SchedulerJobKind::OutboundMessage => "outbound_message",
        SchedulerJobKind::InternalEvent => "internal_event",
    }
    .to_string();
    let schedule = match serde_json::to_value(&self.schedule) {
        Ok(json) => json,
        Err(_) => serde_json::json!({}),
    };
    let target = match serde_json::to_value(&self.target) {
        Ok(json) => json,
        Err(_) => serde_json::json!({}),
    };
    SchedulerJobUpsert {
        id: self.id.clone(),
        kind,
        schedule,
        interval_secs: self.interval_secs,
        startup_delay_secs: self.startup_delay_secs,
        target,
        payload: self.payload.clone(),
        enabled: self.enabled,
        state: self.state.clone(),
        last_status: self.last_status.clone(),
        last_error: self.last_error.clone(),
        run_count: self.run_count,
        max_runs: self.max_runs,
        last_fired_at: self.last_fired_at,
        next_fire_at: self.next_fire_at,
        paused_at: self.paused_at,
        completed_at: self.completed_at,
    }
}
}
/// Resolves the schedule of a stored job.
///
/// A non-null, non-empty `schedule_json` is deserialized directly. Otherwise
/// the `interval_secs` / `startup_delay_secs` columns are used to build an
/// interval schedule, provided `interval_secs` is positive.
///
/// A negative `startup_delay_secs` is treated as "no startup delay". A plain
/// `as u64` cast would wrap it into an enormous unsigned value.
///
/// # Errors
///
/// Fails when the JSON cannot be deserialized or when neither source yields a
/// schedule.
fn deserialize_schedule(
    schedule_json: &serde_json::Value,
    interval_secs: i64,
    startup_delay_secs: i64,
) -> anyhow::Result<SchedulerSchedule> {
    let is_empty_object = schedule_json
        .as_object()
        .map_or(false, |fields| fields.is_empty());
    if !schedule_json.is_null() && !is_empty_object {
        return Ok(serde_json::from_value(schedule_json.clone())?);
    }
    // `try_from` only succeeds for non-negative values; zero is rejected below.
    match u64::try_from(interval_secs) {
        Ok(seconds) if seconds > 0 => Ok(SchedulerSchedule::Interval {
            seconds,
            startup_delay_secs: u64::try_from(startup_delay_secs).unwrap_or(0),
        }),
        _ => anyhow::bail!("scheduler job is missing schedule definition"),
    }
}
/// Computes the first fire time (epoch milliseconds) for a job that has no
/// persisted `next_fire_at`.
///
/// When the job has fired before, this defers to `compute_next_fire_at` with
/// the last fire time as reference. Otherwise:
/// * `Delay`: `now` plus the delay,
/// * `Interval`: `now` plus the startup delay, or plus one interval when the
///   startup delay is zero,
/// * `At`: the parsed RFC 3339 timestamp,
/// * `Cron`: the first occurrence after `now`, evaluated in `timezone`.
fn compute_initial_next_fire_at(
    schedule: &SchedulerSchedule,
    now: DateTime<Utc>,
    last_fired_at: Option<i64>,
    misfire_policy: SchedulerMisfirePolicy,
    timezone: Tz,
) -> anyhow::Result<Option<i64>> {
    if let Some(previous_fire) = last_fired_at {
        return compute_next_fire_at(schedule, now, Some(previous_fire), misfire_policy, timezone);
    }
    let millis_after = |secs: i64| (now + ChronoDuration::seconds(secs)).timestamp_millis();
    let first_fire = match schedule {
        SchedulerSchedule::Delay { seconds } => Some(millis_after(*seconds as i64)),
        SchedulerSchedule::Interval {
            seconds,
            startup_delay_secs,
        } => {
            let wait = if *startup_delay_secs > 0 {
                *startup_delay_secs
            } else {
                *seconds
            };
            Some(millis_after(wait as i64))
        }
        SchedulerSchedule::At { timestamp } => Some(parse_rfc3339_to_utc(timestamp)?.timestamp_millis()),
        SchedulerSchedule::Cron { expression } => {
            let cron_schedule = parse_scheduler_cron(expression)?;
            let local_now = now.with_timezone(&timezone);
            cron_schedule
                .after(&local_now)
                .next()
                .map(|fire| fire.with_timezone(&Utc).timestamp_millis())
        }
    };
    Ok(first_fire)
}
/// Computes the fire time (epoch milliseconds) that follows an execution.
///
/// * `Delay` / `At` are one-shot and never fire again (`None`).
/// * `Interval` with `Skip`: one interval after `now`.
/// * `Interval` with `CatchUp`: the first point on the grid
///   `reference + k * interval` (k >= 1) that lies strictly after `now`;
///   `reference_ms` defaults to `now` when absent.
/// * `Cron`: the first occurrence after `now` (`Skip`) or after the reference
///   time (`CatchUp`), evaluated in `timezone`.
///
/// The catch-up point is computed in closed form rather than by stepping one
/// interval at a time. A zero-second interval therefore can no longer loop
/// forever, and a long outage costs O(1). All millisecond arithmetic
/// saturates instead of wrapping.
fn compute_next_fire_at(
    schedule: &SchedulerSchedule,
    now: DateTime<Utc>,
    reference_ms: Option<i64>,
    misfire_policy: SchedulerMisfirePolicy,
    timezone: Tz,
) -> anyhow::Result<Option<i64>> {
    match schedule {
        SchedulerSchedule::Delay { .. } | SchedulerSchedule::At { .. } => Ok(None),
        SchedulerSchedule::Interval { seconds, .. } => {
            let now_ms = now.timestamp_millis();
            let interval_ms = i64::try_from(*seconds)
                .unwrap_or(i64::MAX)
                .saturating_mul(1_000);
            let next_ms = match misfire_policy {
                SchedulerMisfirePolicy::Skip => now_ms.saturating_add(interval_ms),
                // No grid to align to; behave like `Skip` (fire again at `now`).
                SchedulerMisfirePolicy::CatchUp if interval_ms == 0 => now_ms,
                SchedulerMisfirePolicy::CatchUp => {
                    let first = reference_ms.unwrap_or(now_ms).saturating_add(interval_ms);
                    if first > now_ms {
                        first
                    } else {
                        // Whole intervals needed to move strictly past `now`.
                        let missed = now_ms.saturating_sub(first) / interval_ms + 1;
                        first.saturating_add(missed.saturating_mul(interval_ms))
                    }
                }
            };
            Ok(Some(next_ms))
        }
        SchedulerSchedule::Cron { expression } => {
            let schedule = parse_scheduler_cron(expression)?;
            let anchor = match misfire_policy {
                SchedulerMisfirePolicy::Skip => now.with_timezone(&timezone),
                SchedulerMisfirePolicy::CatchUp => reference_ms
                    .and_then(ts_millis_to_utc)
                    .map(|value| value.with_timezone(&timezone))
                    .unwrap_or_else(|| now.with_timezone(&timezone)),
            };
            Ok(schedule
                .after(&anchor)
                .next()
                .map(|next| next.with_timezone(&Utc).timestamp_millis()))
        }
    }
}
/// Parses an RFC 3339 timestamp and converts it to UTC.
fn parse_rfc3339_to_utc(value: &str) -> anyhow::Result<DateTime<Utc>> {
    let parsed = DateTime::parse_from_rfc3339(value)?;
    Ok(parsed.with_timezone(&Utc))
}
fn parse_scheduler_cron(expression: &str) -> anyhow::Result<cron::Schedule> {
let normalized = normalize_cron_expression(expression);
Ok(cron::Schedule::from_str(&normalized)?)
}
/// Trims a cron expression and, when it has exactly five whitespace-separated
/// fields, prepends a `0` seconds field. Any other field count is returned
/// trimmed but otherwise unchanged.
fn normalize_cron_expression(expression: &str) -> String {
    let trimmed = expression.trim();
    match trimmed.split_whitespace().count() {
        5 => ["0", trimmed].join(" "),
        _ => trimmed.to_owned(),
    }
}
fn ts_millis_to_utc(value: i64) -> Option<DateTime<Utc>> {
Utc.timestamp_millis_opt(value).single()
}
fn build_outbound_message(job: &RuntimeJob) -> anyhow::Result<OutboundMessage> {
let channel = job
.target
.channel
.clone()
.ok_or_else(|| anyhow::anyhow!("outbound scheduler job requires target.channel"))?;
let chat_id = job
.target
.chat_id
.clone()
.ok_or_else(|| anyhow::anyhow!("outbound scheduler job requires target.chat_id"))?;
let content = job
.payload
.get("content")
.and_then(|value| value.as_str())
.ok_or_else(|| anyhow::anyhow!("outbound scheduler job payload.content must be a string"))?;
let mut metadata = HashMap::new();
metadata.insert("scheduler_job_id".to_string(), job.id.clone());
Ok(OutboundMessage::assistant(
channel,
chat_id,
content.to_string(),
job.target.reply_to.clone(),
metadata,
))
}
/// Runs an internal-event job.
///
/// The event name comes from `payload.event` and defaults to
/// `session_cleanup` when absent or not a string. Supported events are
/// `session_cleanup` and `memory_maintenance`; anything else is an error.
async fn execute_internal_event(session_manager: &SessionManager, job: &RuntimeJob) -> anyhow::Result<()> {
    let event = match job.payload.get("event").and_then(|field| field.as_str()) {
        Some(name) => name,
        None => "session_cleanup",
    };
    if event == "session_cleanup" {
        let removed = session_manager.cleanup_expired_sessions().await;
        tracing::info!(job_id = %job.id, removed, "Scheduler session cleanup completed");
        return Ok(());
    }
    if event != "memory_maintenance" {
        anyhow::bail!("unsupported internal scheduler event: {}", event);
    }
    let results = session_manager
        .run_memory_maintenance_for_all_scopes(job.last_fired_at)
        .await?;
    for result in results.iter() {
        let output = &result.output;
        tracing::info!(
            job_id = %job.id,
            scope_key = %result.scope_key,
            user_facts = output.user_facts.len(),
            preferences = output.preferences.len(),
            behavior_patterns = output.behavior_patterns.len(),
            merges = output.merges.len(),
            conflicts = output.conflicts.len(),
            low_value = output.low_value_ids.len(),
            "Scheduler completed memory maintenance model run"
        );
    }
    tracing::info!(job_id = %job.id, scope_count = results.len(), "Scheduler memory maintenance triggered");
    Ok(())
}
async fn execute_agent_task(
session_manager: &SessionManager,
job: &RuntimeJob,
) -> anyhow::Result<Vec<OutboundMessage>> {
let channel_name = job
.target
.channel
.as_deref()
.ok_or_else(|| anyhow::anyhow!("agent_task requires target.channel"))?;
let chat_id = job
.target
.chat_id
.as_deref()
.ok_or_else(|| anyhow::anyhow!("agent_task requires target.chat_id"))?;
let prompt = job
.payload
.get("prompt")
.and_then(|value| value.as_str())
.ok_or_else(|| anyhow::anyhow!("agent_task payload.prompt must be a string"))?;
let options = parse_scheduled_agent_task_options(job)?;
session_manager
.run_scheduled_agent_task(channel_name, chat_id, prompt, options)
.await
.map_err(|error| anyhow::anyhow!(error.to_string()))
}
/// Extracts the optional agent-task settings from the job payload.
///
/// `sender_id`, `system_prompt` and `agent` are taken when present as strings;
/// `fresh_session` defaults to `false`; `metadata` is parsed with
/// `parse_metadata_map`.
fn parse_scheduled_agent_task_options(job: &RuntimeJob) -> anyhow::Result<ScheduledAgentTaskOptions> {
    let payload = &job.payload;
    // Reads an optional string field; non-string values are treated as absent.
    let text_field = |key: &str| -> Option<String> {
        payload
            .get(key)
            .and_then(|field| field.as_str())
            .map(str::to_owned)
    };
    let fresh_session = match payload.get("fresh_session").and_then(|field| field.as_bool()) {
        Some(flag) => flag,
        None => false,
    };
    let options = ScheduledAgentTaskOptions {
        sender_id: text_field("sender_id"),
        fresh_session,
        system_prompt: text_field("system_prompt"),
        metadata: parse_metadata_map(payload.get("metadata"))?,
        agent: text_field("agent"),
    };
    Ok(options)
}
fn parse_metadata_map(value: Option<&serde_json::Value>) -> anyhow::Result<HashMap<String, String>> {
let Some(value) = value else {
return Ok(HashMap::new());
};
let object = value
.as_object()
.ok_or_else(|| anyhow::anyhow!("agent_task payload.metadata must be an object"))?;
let mut metadata = HashMap::with_capacity(object.len());
for (key, value) in object {
let stringified = match value {
serde_json::Value::String(inner) => inner.clone(),
serde_json::Value::Null => "null".to_string(),
serde_json::Value::Bool(inner) => inner.to_string(),
serde_json::Value::Number(inner) => inner.to_string(),
_ => {
return Err(anyhow::anyhow!(
"agent_task payload.metadata field '{}' must be a string, number, bool, or null",
key
))
}
};
metadata.insert(key.clone(), stringified);
}
Ok(metadata)
}
#[cfg(test)]
mod agent_task_tests {
use super::*;
/// A stored record whose kind is `agent_task` is loaded as
/// `SchedulerJobKind::AgentTask` with its payload preserved.
#[test]
fn runtime_job_from_record_supports_agent_task_kind() {
let record = SchedulerJobRecord {
id: "agent.daily_summary".to_string(),
kind: "agent_task".to_string(),
schedule: serde_json::json!({
"type": "interval",
"seconds": 300
}),
interval_secs: 0,
startup_delay_secs: 0,
target: serde_json::json!({
"channel": "feishu",
"chat_id": "oc_demo"
}),
payload: serde_json::json!({
"prompt": "请总结今天待办"
}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: None,
last_fired_at: None,
next_fire_at: Some(1_700_000_010_000),
paused_at: None,
completed_at: None,
created_at: 1_700_000_000_000,
updated_at: 1_700_000_000_000,
};
// The outer `unwrap` checks that loading succeeded, the inner one that the kind was supported.
let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip, chrono_tz::Asia::Shanghai)
.unwrap()
.unwrap();
assert_eq!(job.kind, SchedulerJobKind::AgentTask);
assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办"));
}
/// All optional agent-task payload fields are picked up, and non-string
/// metadata values (number, bool) are stringified.
#[test]
fn parse_scheduled_agent_task_options_supports_fresh_session_and_metadata() {
let job = RuntimeJob {
id: "agent.daily_summary".to_string(),
kind: SchedulerJobKind::AgentTask,
schedule: SchedulerSchedule::Interval {
seconds: 300,
startup_delay_secs: 0,
},
target: SchedulerJobTarget {
channel: Some("feishu".to_string()),
chat_id: Some("oc_demo".to_string()),
reply_to: None,
},
payload: serde_json::json!({
"prompt": "请总结今天待办",
"agent": "planner",
"sender_id": "scheduler-bot",
"fresh_session": true,
"system_prompt": "你是日报助手",
"metadata": {
"job_type": "daily_summary",
"priority": 1,
"urgent": false
}
}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: None,
last_fired_at: None,
next_fire_at: None,
paused_at: None,
completed_at: None,
interval_secs: 300,
startup_delay_secs: 0,
};
let options = parse_scheduled_agent_task_options(&job).unwrap();
assert_eq!(options.agent.as_deref(), Some("planner"));
assert_eq!(options.sender_id.as_deref(), Some("scheduler-bot"));
assert!(options.fresh_session);
assert_eq!(options.system_prompt.as_deref(), Some("你是日报助手"));
assert_eq!(options.metadata.get("job_type").map(String::as_str), Some("daily_summary"));
// Numeric and boolean metadata values arrive as their string form.
assert_eq!(options.metadata.get("priority").map(String::as_str), Some("1"));
assert_eq!(options.metadata.get("urgent").map(String::as_str), Some("false"));
}
}
impl TryFrom<serde_json::Value> for SchedulerJobTarget {
type Error = anyhow::Error;
fn try_from(value: serde_json::Value) -> Result<Self, Self::Error> {
Ok(serde_json::from_value(value)?)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::collections::HashMap;
use crate::bus::MessageBus;
use crate::config::{BUILTIN_MEMORY_MAINTENANCE_JOB_ID, LLMProviderConfig};
use crate::gateway::session::SessionManager;
use crate::skills::SkillRuntime;
use crate::storage::{SchedulerJobUpsert, SessionStore};
/// With the `Skip` policy an interval job is rescheduled exactly one interval
/// after `now`, regardless of how old the reference time is.
#[test]
fn runtime_job_skip_policy_advances_from_now() {
let now = Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap();
let next = compute_next_fire_at(
&SchedulerSchedule::Interval {
seconds: 60,
startup_delay_secs: 0,
},
now,
// Reference lies ten minutes in the past.
Some(now.timestamp_millis() - 10 * 60 * 1_000),
SchedulerMisfirePolicy::Skip,
chrono_tz::Asia::Shanghai,
)
.unwrap()
.unwrap();
assert_eq!(next, now.timestamp_millis() + 60_000);
}
/// With the `CatchUp` policy the next fire time is strictly after `now` and
/// stays aligned to the interval grid anchored at the reference time.
#[test]
fn runtime_job_catch_up_policy_moves_past_now() {
let now = Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap();
let next = compute_next_fire_at(
&SchedulerSchedule::Interval {
seconds: 60,
startup_delay_secs: 0,
},
now,
// Reference lies ten minutes (a whole number of intervals) in the past.
Some(now.timestamp_millis() - 10 * 60 * 1_000),
SchedulerMisfirePolicy::CatchUp,
chrono_tz::Asia::Shanghai,
)
.unwrap()
.unwrap();
assert!(next > now.timestamp_millis());
assert_eq!((next - now.timestamp_millis()) % 60_000, 0);
}
/// The schedule JSON stored on the record takes precedence over the
/// `interval_secs` / `startup_delay_secs` columns, and a persisted
/// `next_fire_at` is kept as-is.
#[test]
fn runtime_job_from_record_uses_persisted_schedule() {
let record = SchedulerJobRecord {
id: "heartbeat".to_string(),
kind: "outbound_message".to_string(),
schedule: serde_json::json!({
"type": "interval",
"seconds": 120,
"startup_delay_secs": 10
}),
interval_secs: 0,
startup_delay_secs: 0,
target: serde_json::json!({
"channel": "feishu",
"chat_id": "oc_demo"
}),
payload: serde_json::json!({"content": "hello"}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: None,
last_fired_at: None,
next_fire_at: Some(1_700_000_010_000),
paused_at: None,
completed_at: None,
created_at: 1_700_000_000_000,
updated_at: 1_700_000_000_000,
};
let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip, chrono_tz::Asia::Shanghai)
.unwrap()
.unwrap();
assert_eq!(job.schedule, SchedulerSchedule::Interval {
seconds: 120,
startup_delay_secs: 10,
});
assert_eq!(job.next_fire_at, Some(1_700_000_010_000));
}
/// A job inserted directly into the store without `next_fire_at` gets one
/// persisted by the first tick, without being executed in that tick.
#[tokio::test]
async fn process_tick_persists_initial_next_fire_at_for_db_created_jobs() {
let store = Arc::new(SessionStore::in_memory().unwrap());
store
.upsert_scheduler_job(&SchedulerJobUpsert {
id: "massage_reminder".to_string(),
kind: "outbound_message".to_string(),
schedule: serde_json::json!({
"type": "interval",
"seconds": 60
}),
interval_secs: 60,
startup_delay_secs: 0,
target: serde_json::json!({
"channel": "feishu",
"chat_id": "oc_demo"
}),
payload: serde_json::json!({
"content": "ping"
}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: Some(1),
last_fired_at: None,
next_fire_at: None,
paused_at: None,
completed_at: None,
})
.unwrap();
let provider_config = LLMProviderConfig {
provider_type: "openai".to_string(),
name: "default".to_string(),
base_url: "http://localhost".to_string(),
api_key: "test-key".to_string(),
extra_headers: HashMap::new(),
llm_timeout_secs: 30,
model_id: "test-model".to_string(),
temperature: Some(0.0),
max_tokens: None,
model_extra: HashMap::new(),
max_tool_iterations: 4,
tool_result_max_chars: 20_000,
context_tool_result_trim_chars: 20_000,
};
let session_manager = SessionManager::new(
4,
100,
false,
"Asia/Shanghai".to_string(),
provider_config.clone(),
HashMap::from([("default".to_string(), provider_config)]),
Arc::new(SkillRuntime::default()),
)
.unwrap();
let scheduler = Scheduler::new(
MessageBus::new(8),
SchedulerConfig {
enabled: true,
tick_resolution_ms: 1000,
worker_queue_capacity: 64,
misfire_policy: SchedulerMisfirePolicy::Skip,
jobs: Vec::new(),
},
chrono_tz::Asia::Shanghai,
store.clone(),
session_manager,
);
scheduler.process_tick().await.unwrap();
let saved = store.get_scheduler_job("massage_reminder").unwrap().unwrap();
assert!(saved.next_fire_at.is_some());
// The first fire time is one interval (60 s) in the future, so the job must not have run yet.
assert_eq!(saved.run_count, 0);
assert_eq!(saved.state, SchedulerJobState::Scheduled);
}
/// Syncing the default configuration stores the built-in memory-maintenance
/// job as an enabled, scheduled `internal_event` with a cron schedule and a
/// computed first fire time.
#[test]
fn sync_config_jobs_persists_builtin_memory_maintenance_job() {
let store = Arc::new(SessionStore::in_memory().unwrap());
let provider_config = LLMProviderConfig {
provider_type: "openai".to_string(),
name: "default".to_string(),
base_url: "http://localhost".to_string(),
api_key: "test-key".to_string(),
extra_headers: HashMap::new(),
llm_timeout_secs: 30,
model_id: "test-model".to_string(),
temperature: Some(0.0),
max_tokens: None,
model_extra: HashMap::new(),
max_tool_iterations: 4,
tool_result_max_chars: 20_000,
context_tool_result_trim_chars: 20_000,
};
let session_manager = SessionManager::new(
4,
100,
false,
"Asia/Shanghai".to_string(),
provider_config.clone(),
HashMap::from([("default".to_string(), provider_config)]),
Arc::new(SkillRuntime::default()),
)
.unwrap();
let scheduler = Scheduler::new(
MessageBus::new(8),
SchedulerConfig::default(),
chrono_tz::Asia::Shanghai,
store.clone(),
session_manager,
);
scheduler.sync_config_jobs().unwrap();
let saved = store
.get_scheduler_job(BUILTIN_MEMORY_MAINTENANCE_JOB_ID)
.unwrap()
.unwrap();
assert_eq!(saved.id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID);
assert_eq!(saved.kind, "internal_event");
assert!(saved.enabled);
assert_eq!(saved.state, SchedulerJobState::Scheduled);
assert_eq!(saved.payload.get("event").and_then(|value| value.as_str()), Some("memory_maintenance"));
assert_eq!(
saved.schedule,
serde_json::json!({
"type": "cron",
"expression": "0 */4 * * *"
})
);
assert_eq!(saved.payload.get("local_time").and_then(|value| value.as_str()), Some("every_4_hours"));
assert!(saved.next_fire_at.is_some());
}
/// Cron expressions are evaluated in the configured time zone, not in UTC.
#[test]
fn cron_schedule_uses_configured_timezone() {
// 18:00 UTC is 02:00 the next day in Asia/Shanghai (UTC+8).
let now = Utc.with_ymd_and_hms(2026, 4, 23, 18, 0, 0).single().unwrap();
let next = compute_next_fire_at(
&SchedulerSchedule::Cron {
expression: "0 3 * * *".to_string(),
},
now,
None,
SchedulerMisfirePolicy::Skip,
chrono_tz::Asia::Shanghai,
)
.unwrap()
.unwrap();
let next_utc = ts_millis_to_utc(next).unwrap();
// The next 03:00 Shanghai time is 19:00 UTC on the same UTC day.
assert_eq!(next_utc, Utc.with_ymd_and_hms(2026, 4, 23, 19, 0, 0).single().unwrap());
}
}