1842 lines
63 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, Duration as ChronoDuration, TimeZone, Utc};
use chrono_tz::Tz;
use tokio::sync::watch;
use crate::bus::{MessageBus, OutboundMessage};
use crate::config::{
SchedulerConfig, SchedulerJobConfig, SchedulerJobKind, SchedulerJobTarget,
SchedulerMisfirePolicy, SchedulerSchedule,
};
use crate::storage::{
SchedulerJobRecord, SchedulerJobRepository, SchedulerJobState, SchedulerJobStatus,
SchedulerJobUpsert,
};
/// Optional per-run settings handed to an [`AgentTaskExecutor`] together with the prompt.
///
/// In this file the values are read from a job's JSON payload by
/// `parse_scheduled_agent_task_options`.
#[derive(Debug, Clone, Default)]
pub struct ScheduledAgentTaskOptions {
/// Payload field `sender_id`, when it is present and a string.
pub sender_id: Option<String>,
/// Payload field `system_prompt`, when it is present and a string.
pub system_prompt: Option<String>,
/// Payload field `metadata`, with every scalar value rendered as a string.
pub metadata: HashMap<String, String>,
/// Payload field `agent`, when it is present and a string.
pub agent: Option<String>,
}
/// Result of one memory-maintenance run, as reported by a [`MaintenanceExecutor`].
///
/// The scheduler only logs these values; their exact meaning is defined by the
/// executor implementation.
#[derive(Debug, Clone)]
pub struct MaintenanceRunSummary {
/// Identifier of the scope the run was performed for.
pub scope_key: String,
/// Number of merges reported for the scope.
pub merges: usize,
/// Number of conflicts reported for the scope.
pub conflicts: usize,
/// Number of low-value items reported for the scope.
pub low_value: usize,
}
/// Runs agent prompts on behalf of the scheduler.
#[async_trait]
pub trait AgentTaskExecutor: Send + Sync {
/// Executes an `agent_task` job and returns the outbound messages it produced.
///
/// The scheduler publishes every returned message on the bus.
async fn execute(
&self,
channel_name: &str,
chat_id: &str,
prompt: &str,
options: ScheduledAgentTaskOptions,
) -> anyhow::Result<Vec<OutboundMessage>>;
/// Executes a SilentAgentTask; supports keeping `session_chat_id` and
/// `notification_chat_id` separate.
async fn execute_silent(
&self,
channel_name: &str,
session_chat_id: &str,
notification_chat_id: Option<&str>,
prompt: &str,
options: ScheduledAgentTaskOptions,
) -> anyhow::Result<Vec<OutboundMessage>>;
}
/// Housekeeping operations triggered by `internal_event` scheduler jobs.
#[async_trait]
pub trait MaintenanceExecutor: Send + Sync {
/// Handles the `session_cleanup` event; the returned count is logged as `removed`.
async fn cleanup_expired_sessions(&self) -> usize;
/// Handles the `memory_maintenance` event; returns one summary per processed scope.
async fn run_memory_maintenance_for_all_scopes(&self) -> anyhow::Result<Vec<MaintenanceRunSummary>>;
}
/// Periodically polls persisted scheduler jobs and executes the ones that are due.
pub struct Scheduler {
// Bus used to publish outbound messages produced by jobs.
bus: Arc<MessageBus>,
// Scheduler settings (enabled flag, tick resolution, misfire policy, configured jobs).
config: SchedulerConfig,
// Time zone in which cron expressions are evaluated.
timezone: Tz,
// Persistent store for job definitions and runtime state.
jobs: Arc<dyn SchedulerJobRepository>,
// Executor for `agent_task` and `silent_agent_task` jobs.
agent_task_executor: Arc<dyn AgentTaskExecutor>,
// Executor for `internal_event` jobs.
maintenance_executor: Arc<dyn MaintenanceExecutor>,
}
impl Scheduler {
/// Creates a scheduler from its collaborators.
///
/// The concrete executors are moved behind `Arc<dyn …>` trait objects so the
/// scheduler type itself is not generic.
pub fn new<A, M>(
    bus: Arc<MessageBus>,
    config: SchedulerConfig,
    timezone: Tz,
    jobs: Arc<dyn SchedulerJobRepository>,
    agent_task_executor: A,
    maintenance_executor: M,
) -> Self
where
    A: AgentTaskExecutor + 'static,
    M: MaintenanceExecutor + 'static,
{
    let agent_task_executor: Arc<dyn AgentTaskExecutor> = Arc::new(agent_task_executor);
    let maintenance_executor: Arc<dyn MaintenanceExecutor> = Arc::new(maintenance_executor);
    Self {
        bus,
        config,
        timezone,
        jobs,
        agent_task_executor,
        maintenance_executor,
    }
}
/// Runs the scheduler loop until a shutdown is requested.
///
/// Does nothing when the scheduler is disabled in the configuration. Otherwise
/// it syncs configured jobs into the repository, recovers jobs left in the
/// `Running` state, and then processes one tick per `tick_resolution_ms`
/// (clamped to at least 100 ms). Failures in any of these steps are logged and
/// do not stop the loop.
///
/// The loop exits when `shutdown_rx` observes the value `true`. If the sending
/// half of the channel is dropped without requesting shutdown, the scheduler
/// keeps ticking but stops polling the channel.
pub async fn run(&self, mut shutdown_rx: watch::Receiver<bool>) {
    if !self.config.enabled {
        tracing::info!("Scheduler disabled; skipping startup");
        return;
    }
    if let Err(error) = self.sync_config_jobs() {
        tracing::error!(error = %error, "Failed to sync scheduler config jobs");
    }
    if let Err(error) = self.recover_interrupted_jobs().await {
        tracing::error!(error = %error, "Failed to recover interrupted scheduler jobs");
    }
    let tick_ms = self.config.tick_resolution_ms.max(100);
    let mut ticker = tokio::time::interval(std::time::Duration::from_millis(tick_ms));
    tracing::info!(tick_resolution_ms = tick_ms, "Scheduler started");
    // Once the sender is gone `changed()` resolves to `Err` immediately on every
    // poll; polling it again would turn this loop into a busy spin.
    let mut shutdown_channel_open = true;
    loop {
        tokio::select! {
            _ = ticker.tick() => {
                if let Err(error) = self.process_tick().await {
                    tracing::error!(error = %error, "Scheduler tick failed");
                }
            }
            changed = shutdown_rx.changed(), if shutdown_channel_open => {
                match changed {
                    Ok(()) => {
                        if *shutdown_rx.borrow() {
                            tracing::info!("Scheduler shutdown requested");
                            break;
                        }
                    }
                    Err(_) => {
                        shutdown_channel_open = false;
                        tracing::warn!(
                            "Scheduler shutdown channel closed without a shutdown request; no longer watching it"
                        );
                    }
                }
            }
        }
    }
}
/// Writes every effective configured job into the repository.
///
/// When a job with the same id is already stored, its persisted runtime state
/// is carried over via `preserve_persisted_runtime`. The first failure aborts
/// the sync and is returned to the caller.
fn sync_config_jobs(&self) -> anyhow::Result<()> {
    let synced_at = Utc::now();
    let time_config = crate::config::TimeConfig {
        timezone: self.timezone.name().to_string(),
    };
    for job in self.config.effective_jobs(&time_config) {
        let runtime = RuntimeJob::from_config(
            &job,
            synced_at,
            self.config.misfire_policy,
            self.timezone,
        )?;
        let mut upsert = runtime.to_upsert();
        let persisted = self.jobs.get_scheduler_job(&runtime.id)?;
        if let Some(existing) = persisted {
            preserve_persisted_runtime(&mut upsert, &existing);
        }
        self.jobs.upsert_scheduler_job(&upsert)?;
    }
    Ok(())
}
/// Resets jobs that are still marked `Running` in the repository.
///
/// Each such job is moved back to `Scheduled` with an `Error` status and an
/// explanatory message. An overdue `next_fire_at` is kept as-is under the
/// `CatchUp` policy and recomputed from the current time under `Skip`; if the
/// stored schedule cannot be decoded the overdue timestamp is kept.
///
/// Returns the number of recovered jobs.
async fn recover_interrupted_jobs(&self) -> anyhow::Result<usize> {
    let now = Utc::now();
    let now_ms = now.timestamp_millis();
    let policy = self.config.misfire_policy;
    let error_msg = format!(
        "Job interrupted due to application restart at {}",
        now.to_rfc3339()
    );
    let mut recovered_count = 0;
    for record in self.jobs.list_running_scheduler_jobs()? {
        let overdue = record.next_fire_at.filter(|fire_at| *fire_at <= now_ms);
        let next_fire_at = match (overdue, policy) {
            // Not overdue (or never scheduled): keep whatever is stored.
            (None, _) => record.next_fire_at,
            (Some(missed), SchedulerMisfirePolicy::CatchUp) => Some(missed),
            (Some(missed), SchedulerMisfirePolicy::Skip) => {
                let decoded = deserialize_schedule(
                    &record.schedule,
                    record.interval_secs,
                    record.startup_delay_secs,
                );
                match decoded {
                    Ok(schedule) => {
                        compute_next_fire_at(&schedule, now, Some(missed), policy, self.timezone)
                            .ok()
                            .flatten()
                    }
                    Err(_) => Some(missed),
                }
            }
        };
        self.jobs.update_scheduler_job_runtime(
            &record.id,
            SchedulerJobState::Scheduled,
            Some(SchedulerJobStatus::Error),
            Some(&error_msg),
            record.run_count,
            record.last_fired_at,
            next_fire_at,
            record.paused_at,
            record.completed_at,
        )?;
        recovered_count += 1;
        tracing::info!(
            job_id = %record.id,
            old_next_fire_at = ?record.next_fire_at,
            new_next_fire_at = ?next_fire_at,
            "Recovered interrupted job from previous session"
        );
    }
    if recovered_count > 0 {
        tracing::info!(
            recovered_count = recovered_count,
            "Scheduler recovered {} interrupted jobs from previous session",
            recovered_count
        );
    }
    Ok(recovered_count)
}
async fn process_tick(&self) -> anyhow::Result<()> {
let now = Utc::now();
let jobs = self.jobs.list_scheduler_jobs(true)?;
for record in jobs {
let Some(mut job) =
RuntimeJob::from_record(&record, self.config.misfire_policy, self.timezone)?
else {
continue;
};
if record.next_fire_at.is_none() && job.next_fire_at.is_some() {
self.jobs.update_scheduler_job_runtime(
&job.id,
job.state.clone(),
job.last_status.clone(),
job.last_error.as_deref(),
job.run_count,
job.last_fired_at,
job.next_fire_at,
job.paused_at,
job.completed_at,
)?;
}
if !job.is_due(now) {
continue;
}
self.jobs.update_scheduler_job_runtime(
&job.id,
SchedulerJobState::Running,
job.last_status.clone(),
job.last_error.as_deref(),
job.run_count,
job.last_fired_at,
job.next_fire_at,
job.paused_at,
job.completed_at,
)?;
let execution_result = self.execute_job(&job).await;
job.after_execution(
now,
execution_result.as_ref().err().map(|err| err.to_string()),
self.config.misfire_policy,
self.timezone,
)?;
let status = if execution_result.is_ok() {
Some(SchedulerJobStatus::Ok)
} else {
Some(SchedulerJobStatus::Error)
};
if let Err(error) = &execution_result {
tracing::error!(job_id = %job.id, error = %error, "Scheduler job failed");
}
self.jobs.update_scheduler_job_runtime(
&job.id,
job.state.clone(),
status,
job.last_error.as_deref(),
job.run_count,
job.last_fired_at,
job.next_fire_at,
job.paused_at,
job.completed_at,
)?;
}
Ok(())
}
/// Dispatches a due job to the handler for its kind.
///
/// * `OutboundMessage` – builds the message from the payload and publishes it.
/// * `InternalEvent` – delegates to the maintenance executor.
/// * `AgentTask` – runs the agent and publishes every message it returns.
/// * `SilentAgentTask` – runs the agent in a dedicated session; on any failure
///   (payload parsing or execution) an error notification is published on a
///   best-effort basis before the error is returned.
async fn execute_job(&self, job: &RuntimeJob) -> anyhow::Result<()> {
    match job.kind {
        SchedulerJobKind::OutboundMessage => {
            let outbound = build_outbound_message(job)?;
            self.bus.publish_outbound(outbound).await?;
        }
        SchedulerJobKind::InternalEvent => {
            let executor = self.maintenance_executor.as_ref();
            execute_internal_event(executor, job).await?;
        }
        SchedulerJobKind::AgentTask => {
            let chat_id = required_notification_chat_id(job, "agent_task")?;
            let replies =
                execute_agent_task(self.agent_task_executor.as_ref(), job, chat_id).await?;
            for reply in replies {
                self.bus.publish_outbound(reply).await?;
            }
        }
        SchedulerJobKind::SilentAgentTask => {
            // The session uses a virtual chat id (by default `scheduler/<job_id>`).
            let session_chat_id = resolve_execution_chat_id(job)?;
            // The real chat id from `target.chat_id` is forwarded separately.
            let notification_chat_id = job.target.chat_id.as_deref();
            tracing::info!(
                job_id = %job.id,
                session_chat_id = %session_chat_id,
                notification_chat_id = ?notification_chat_id,
                "Executing silent agent task"
            );
            // Parameter extraction and execution share one failure path so the
            // user is notified no matter which step failed.
            let attempt: anyhow::Result<()> = async {
                let prompt = extract_prompt(job)?;
                let options = parse_scheduled_agent_task_options(job)?;
                let channel_name = job.target.channel.as_deref().unwrap_or_default();
                self.agent_task_executor
                    .execute_silent(
                        channel_name,
                        &session_chat_id,
                        notification_chat_id,
                        prompt,
                        options,
                    )
                    .await?;
                Ok(())
            }
            .await;
            if let Err(error) = attempt {
                if let Err(notify_error) =
                    self.notify_silent_agent_task_failure(job, &error).await
                {
                    tracing::error!(
                        job_id = %job.id,
                        error = %notify_error,
                        "Failed to publish silent scheduler failure notification"
                    );
                }
                return Err(error);
            }
        }
    }
    Ok(())
}
/// Publishes an error notification for a failed `silent_agent_task` job.
///
/// Requires both `target.channel` and `target.chat_id`; returns an error if
/// either is missing or if publishing fails.
async fn notify_silent_agent_task_failure(
    &self,
    job: &RuntimeJob,
    error: &anyhow::Error,
) -> anyhow::Result<()> {
    let Some(channel) = job.target.channel.clone() else {
        return Err(anyhow::anyhow!("silent_agent_task requires target.channel"));
    };
    let chat_id = required_notification_chat_id(job, "silent_agent_task")?.to_string();
    let metadata = HashMap::from([
        ("scheduler_job_id".to_string(), job.id.clone()),
        (
            "scheduler_job_kind".to_string(),
            "silent_agent_task".to_string(),
        ),
    ]);
    let text = format!(
        "定时任务执行失败:{}\n{}",
        job.id,
        summarize_scheduler_error(error)
    );
    let notification = OutboundMessage::error_notification(
        channel,
        chat_id,
        None, // session_id
        text,
        job.target.reply_to.clone(),
        metadata,
    );
    self.bus
        .publish_outbound(notification)
        .await
        .map_err(|publish_error| anyhow::anyhow!(publish_error.to_string()))
}
}
/// Copies persisted runtime state from `existing` into `input`.
///
/// Nothing is copied when the job definition changed. A record that was left
/// in the `Running` state is re-queued as `Scheduled` with its pause and
/// completion timestamps cleared.
fn preserve_persisted_runtime(input: &mut SchedulerJobUpsert, existing: &SchedulerJobRecord) {
    if !scheduler_job_definition_matches(input, existing) {
        return;
    }
    input.last_status = existing.last_status.clone();
    input.last_error = existing.last_error.clone();
    input.run_count = existing.run_count;
    input.max_runs = existing.max_runs;
    input.last_fired_at = existing.last_fired_at;
    input.next_fire_at = existing.next_fire_at;

    let was_running = existing.state == SchedulerJobState::Running;
    input.state = if was_running {
        SchedulerJobState::Scheduled
    } else {
        existing.state.clone()
    };
    input.paused_at = if was_running { None } else { existing.paused_at };
    input.completed_at = if was_running {
        None
    } else {
        existing.completed_at
    };
}
/// Reports whether a configured job and a stored record describe the same job.
///
/// Compares kind, schedule, legacy interval columns, target (channel, chat id,
/// session chat id, reply-to), payload and the enabled flag. Schedules and
/// targets are compared after decoding; values that fail to decode only match
/// when both sides fail.
fn scheduler_job_definition_matches(
    input: &SchedulerJobUpsert,
    existing: &SchedulerJobRecord,
) -> bool {
    let plain_fields_match = input.kind == existing.kind
        && input.interval_secs == existing.interval_secs
        && input.startup_delay_secs == existing.startup_delay_secs
        && input.payload == existing.payload
        && input.enabled == existing.enabled;
    if !plain_fields_match {
        return false;
    }

    let decode_target =
        |raw: &serde_json::Value| serde_json::from_value::<SchedulerJobTarget>(raw.clone()).ok();
    let same_target = match (decode_target(&input.target), decode_target(&existing.target)) {
        (Some(wanted), Some(stored)) => {
            wanted.channel == stored.channel
                && wanted.chat_id == stored.chat_id
                && wanted.session_chat_id == stored.session_chat_id
                && wanted.reply_to == stored.reply_to
        }
        (None, None) => true,
        _ => false,
    };
    if !same_target {
        return false;
    }

    let wanted_schedule =
        serde_json::from_value::<SchedulerSchedule>(input.schedule.clone()).ok();
    let stored_schedule = deserialize_schedule(
        &existing.schedule,
        existing.interval_secs,
        existing.startup_delay_secs,
    )
    .ok();
    wanted_schedule == stored_schedule
}
/// In-memory view of a scheduler job: its definition plus mutable runtime state.
///
/// Built either from configuration (`from_config`) or from a persisted record
/// (`from_record`). All timestamps are Unix epoch milliseconds.
#[derive(Debug, Clone)]
struct RuntimeJob {
// Unique job identifier.
id: String,
// What the job does when it fires.
kind: SchedulerJobKind,
// When the job fires.
schedule: SchedulerSchedule,
// Channel / chat the job is addressed to.
target: SchedulerJobTarget,
// Kind-specific JSON payload (e.g. `content`, `prompt`, `event`).
payload: serde_json::Value,
// Disabled jobs are never due.
enabled: bool,
// Lifecycle state; only `Scheduled` jobs can be due.
state: SchedulerJobState,
// Outcome of the most recent run, if any.
last_status: Option<SchedulerJobStatus>,
// Error text of the most recent run, if it failed.
last_error: Option<String>,
// Number of completed executions.
run_count: i64,
// Optional cap on executions; the job completes once `run_count` reaches it.
max_runs: Option<i64>,
// Time recorded for the most recent execution.
last_fired_at: Option<i64>,
// Next time the job becomes due; `None` when not scheduled.
next_fire_at: Option<i64>,
// Pause timestamp carried over from the persisted record.
paused_at: Option<i64>,
// Set when the job reaches the `Completed` state.
completed_at: Option<i64>,
// Legacy interval column; mirrors the schedule for interval jobs, else 0.
interval_secs: i64,
// Legacy startup-delay column; mirrors the schedule for interval jobs, else 0.
startup_delay_secs: i64,
}
impl RuntimeJob {
/// Builds a fresh runtime job from its configuration entry.
///
/// Enabled jobs start `Scheduled` with their first fire time computed from
/// `now`; disabled jobs start `Paused` without a fire time. Run counters and
/// history always start empty.
fn from_config(
    job: &SchedulerJobConfig,
    now: DateTime<Utc>,
    misfire_policy: SchedulerMisfirePolicy,
    timezone: Tz,
) -> anyhow::Result<Self> {
    let schedule = job.resolved_schedule()?;
    // Only interval schedules populate the legacy interval columns.
    let (interval_secs, startup_delay_secs) = if let SchedulerSchedule::Interval {
        seconds,
        startup_delay_secs,
    } = &schedule
    {
        (*seconds as i64, *startup_delay_secs as i64)
    } else {
        (0, 0)
    };
    let (state, next_fire_at) = if job.enabled {
        let first_fire =
            compute_initial_next_fire_at(&schedule, now, None, misfire_policy, timezone)?;
        (SchedulerJobState::Scheduled, first_fire)
    } else {
        (SchedulerJobState::Paused, None)
    };
    Ok(Self {
        id: job.id.clone(),
        kind: job.kind.clone(),
        schedule,
        target: job.target.clone(),
        payload: job.payload.clone(),
        enabled: job.enabled,
        state,
        last_status: None,
        last_error: None,
        run_count: 0,
        max_runs: None,
        last_fired_at: None,
        next_fire_at,
        paused_at: None,
        completed_at: None,
        interval_secs,
        startup_delay_secs,
    })
}
fn from_record(
record: &SchedulerJobRecord,
misfire_policy: SchedulerMisfirePolicy,
timezone: Tz,
) -> anyhow::Result<Option<Self>> {
let kind = match record.kind.as_str() {
"internal_event" => SchedulerJobKind::InternalEvent,
"outbound_message" => SchedulerJobKind::OutboundMessage,
"agent_task" => SchedulerJobKind::AgentTask,
"silent_agent_task" => SchedulerJobKind::SilentAgentTask,
other => {
tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind");
return Ok(None);
}
};
let schedule = deserialize_schedule(
&record.schedule,
record.interval_secs,
record.startup_delay_secs,
)?;
let now = Utc::now();
let next_fire_at = match (record.enabled, record.state.clone(), record.next_fire_at) {
(false, _, _) => None,
(_, SchedulerJobState::Paused, _) => None,
(_, SchedulerJobState::Completed, _) => None,
(_, _, some_next) if some_next.is_some() => some_next,
_ => compute_initial_next_fire_at(
&schedule,
now,
record.last_fired_at,
misfire_policy,
timezone,
)?,
};
Ok(Some(Self {
id: record.id.clone(),
kind,
schedule,
target: record.target.clone().try_into()?,
payload: record.payload.clone(),
enabled: record.enabled,
state: record.state.clone(),
last_status: record.last_status.clone(),
last_error: record.last_error.clone(),
run_count: record.run_count,
max_runs: record.max_runs,
last_fired_at: record.last_fired_at,
next_fire_at,
paused_at: record.paused_at,
completed_at: record.completed_at,
interval_secs: record.interval_secs,
startup_delay_secs: record.startup_delay_secs,
}))
}
/// Returns `true` when the job is enabled, `Scheduled`, and its fire time is
/// at or before `now`.
fn is_due(&self, now: DateTime<Utc>) -> bool {
    if !self.enabled || self.state != SchedulerJobState::Scheduled {
        return false;
    }
    matches!(self.next_fire_at, Some(fire_at) if fire_at <= now.timestamp_millis())
}
/// Updates runtime state after one execution attempt.
///
/// Increments the run counter, records `now` as the fire time and stores the
/// error text (if any). One-shot schedules and jobs that reached `max_runs`
/// become `Completed`; all others are rescheduled relative to the previous
/// fire time.
fn after_execution(
    &mut self,
    now: DateTime<Utc>,
    last_error: Option<String>,
    misfire_policy: SchedulerMisfirePolicy,
    timezone: Tz,
) -> anyhow::Result<()> {
    let fired_at_ms = now.timestamp_millis();
    self.run_count += 1;
    self.last_fired_at = Some(fired_at_ms);
    self.last_error = last_error;

    let run_limit_reached = self
        .max_runs
        .map_or(false, |limit| self.run_count >= limit);
    if self.schedule.is_one_shot() || run_limit_reached {
        self.state = SchedulerJobState::Completed;
        self.next_fire_at = None;
        self.completed_at = Some(fired_at_ms);
        return Ok(());
    }

    // Prefer the fire time that just elapsed so catch-up stays on the original grid.
    let reference_ms = self.next_fire_at.or(self.last_fired_at);
    self.state = SchedulerJobState::Scheduled;
    self.completed_at = None;
    let upcoming =
        compute_next_fire_at(&self.schedule, now, reference_ms, misfire_policy, timezone)?;
    self.next_fire_at = upcoming;
    Ok(())
}
/// Converts the job into the repository's upsert representation.
///
/// The schedule and target are serialized to JSON; if serialization fails an
/// empty JSON object is stored instead.
fn to_upsert(&self) -> SchedulerJobUpsert {
    let kind_name = match self.kind {
        SchedulerJobKind::InternalEvent => "internal_event",
        SchedulerJobKind::OutboundMessage => "outbound_message",
        SchedulerJobKind::AgentTask => "agent_task",
        SchedulerJobKind::SilentAgentTask => "silent_agent_task",
    };
    let schedule =
        serde_json::to_value(&self.schedule).unwrap_or_else(|_| serde_json::json!({}));
    let target = serde_json::to_value(&self.target).unwrap_or_else(|_| serde_json::json!({}));
    SchedulerJobUpsert {
        id: self.id.clone(),
        kind: kind_name.to_string(),
        schedule,
        interval_secs: self.interval_secs,
        startup_delay_secs: self.startup_delay_secs,
        target,
        payload: self.payload.clone(),
        enabled: self.enabled,
        state: self.state.clone(),
        last_status: self.last_status.clone(),
        last_error: self.last_error.clone(),
        run_count: self.run_count,
        max_runs: self.max_runs,
        last_fired_at: self.last_fired_at,
        next_fire_at: self.next_fire_at,
        paused_at: self.paused_at,
        completed_at: self.completed_at,
    }
}
}
fn deserialize_schedule(
schedule_json: &serde_json::Value,
interval_secs: i64,
startup_delay_secs: i64,
) -> anyhow::Result<SchedulerSchedule> {
if !schedule_json.is_null() && schedule_json != &serde_json::json!({}) {
return Ok(serde_json::from_value(schedule_json.clone())?);
}
if interval_secs > 0 {
return Ok(SchedulerSchedule::Interval {
seconds: interval_secs as u64,
startup_delay_secs: startup_delay_secs as u64,
});
}
anyhow::bail!("scheduler job is missing schedule definition")
}
/// Computes the first fire time (epoch milliseconds) for a job without a stored one.
///
/// When the job has fired before, this defers to `compute_next_fire_at` using
/// the last fire time as reference. Otherwise:
/// * `Delay` – `now` plus the delay.
/// * `Interval` – `now` plus the startup delay, or plus one interval when the
///   startup delay is zero.
/// * `At` – the parsed RFC 3339 timestamp.
/// * `Cron` – the first occurrence after `now`, evaluated in `timezone`.
fn compute_initial_next_fire_at(
    schedule: &SchedulerSchedule,
    now: DateTime<Utc>,
    last_fired_at: Option<i64>,
    misfire_policy: SchedulerMisfirePolicy,
    timezone: Tz,
) -> anyhow::Result<Option<i64>> {
    if let Some(last_fired_at) = last_fired_at {
        return compute_next_fire_at(schedule, now, Some(last_fired_at), misfire_policy, timezone);
    }
    let millis_after = |secs: i64| (now + ChronoDuration::seconds(secs)).timestamp_millis();
    let first_fire = match schedule {
        SchedulerSchedule::Delay { seconds } => Some(millis_after(*seconds as i64)),
        SchedulerSchedule::Interval {
            seconds,
            startup_delay_secs,
        } => {
            let delay = if *startup_delay_secs > 0 {
                *startup_delay_secs
            } else {
                *seconds
            };
            Some(millis_after(delay as i64))
        }
        SchedulerSchedule::At { timestamp } => {
            Some(parse_rfc3339_to_utc(timestamp)?.timestamp_millis())
        }
        SchedulerSchedule::Cron { expression } => {
            let cron_schedule = parse_scheduler_cron(expression)?;
            let local_now = now.with_timezone(&timezone);
            cron_schedule
                .after(&local_now)
                .next()
                .map(|occurrence| occurrence.with_timezone(&Utc).timestamp_millis())
        }
    };
    Ok(first_fire)
}
/// Computes the fire time (epoch milliseconds) that follows an execution.
///
/// * `Delay` / `At` – one-shot schedules; always `None`.
/// * `Interval` – under `Skip`, one interval after `now`. Under `CatchUp`, the
///   first point on the grid `reference_ms + k * interval` that lies strictly
///   after `now` (the reference defaults to `now`).
/// * `Cron` – the first occurrence after `now` (`Skip`) or after the reference
///   time (`CatchUp`, falling back to `now`), evaluated in `timezone`.
///
/// The interval arithmetic is closed-form and saturating: a zero-length
/// interval yields `now` instead of looping forever, and very large values
/// cannot overflow.
///
/// # Errors
/// Returns an error when a cron expression cannot be parsed.
fn compute_next_fire_at(
    schedule: &SchedulerSchedule,
    now: DateTime<Utc>,
    reference_ms: Option<i64>,
    misfire_policy: SchedulerMisfirePolicy,
    timezone: Tz,
) -> anyhow::Result<Option<i64>> {
    match schedule {
        SchedulerSchedule::Delay { .. } | SchedulerSchedule::At { .. } => Ok(None),
        SchedulerSchedule::Interval { seconds, .. } => {
            let now_ms = now.timestamp_millis();
            // Saturate rather than wrap: a wrapped (negative) interval would
            // schedule into the past.
            let interval_ms = i64::try_from(*seconds)
                .unwrap_or(i64::MAX)
                .saturating_mul(1_000);
            let next_ms = match misfire_policy {
                SchedulerMisfirePolicy::Skip => now_ms.saturating_add(interval_ms),
                SchedulerMisfirePolicy::CatchUp => {
                    let baseline = reference_ms.unwrap_or(now_ms);
                    let first = baseline.saturating_add(interval_ms);
                    if interval_ms <= 0 {
                        // Degenerate interval: there is no grid to advance along.
                        now_ms
                    } else if first > now_ms {
                        first
                    } else {
                        // Number of whole intervals needed to move strictly past `now`.
                        let missed = now_ms.saturating_sub(first) / interval_ms + 1;
                        first.saturating_add(missed.saturating_mul(interval_ms))
                    }
                }
            };
            Ok(Some(next_ms))
        }
        SchedulerSchedule::Cron { expression } => {
            let schedule = parse_scheduler_cron(expression)?;
            let anchor = match misfire_policy {
                SchedulerMisfirePolicy::Skip => now.with_timezone(&timezone),
                SchedulerMisfirePolicy::CatchUp => reference_ms
                    .and_then(ts_millis_to_utc)
                    .map(|value| value.with_timezone(&timezone))
                    .unwrap_or_else(|| now.with_timezone(&timezone)),
            };
            Ok(schedule
                .after(&anchor)
                .next()
                .map(|next| next.with_timezone(&Utc).timestamp_millis()))
        }
    }
}
/// Parses an RFC 3339 timestamp and converts it to UTC.
fn parse_rfc3339_to_utc(value: &str) -> anyhow::Result<DateTime<Utc>> {
    let parsed = DateTime::parse_from_rfc3339(value)?;
    Ok(parsed.with_timezone(&Utc))
}
/// Normalizes a cron expression and parses it with the `cron` crate.
fn parse_scheduler_cron(expression: &str) -> anyhow::Result<cron::Schedule> {
    let schedule = cron::Schedule::from_str(&normalize_cron_expression(expression))?;
    Ok(schedule)
}
/// Brings a cron expression into the form expected by the `cron` crate.
///
/// A five-field expression gets a leading `0` seconds field; any other
/// expression is only trimmed. The day-of-week field is then renumbered from
/// standard cron (0 or 7 = Sunday, 1 = Monday, …, 6 = Saturday) to the crate's
/// numbering (1 = Sunday, 2 = Monday, …, 7 = Saturday).
fn normalize_cron_expression(expression: &str) -> String {
    let trimmed = expression.trim();
    let field_count = trimmed.split_whitespace().count();
    let with_seconds = match field_count {
        5 => format!("0 {}", trimmed),
        _ => trimmed.to_string(),
    };
    convert_weekday_field(&with_seconds)
}
/// Renumbers the day-of-week field of a six-field cron expression from the
/// standard convention to the `cron` crate's convention.
///
/// Standard: 0/7 = Sunday, 1 = Monday, 2 = Tuesday, …, 6 = Saturday.
/// Crate:    1 = Sunday, 2 = Monday, 3 = Tuesday, …, 7 = Saturday.
///
/// Expressions that do not have exactly six fields are returned unchanged.
fn convert_weekday_field(expression: &str) -> String {
    let fields: Vec<&str> = expression.split_whitespace().collect();
    match fields.as_slice() {
        [second, minute, hour, day_of_month, month, weekday] => format!(
            "{} {} {} {} {} {}",
            second,
            minute,
            hour,
            day_of_month,
            month,
            convert_cron_weekday(weekday)
        ),
        _ => expression.to_string(),
    }
}
/// Renumbers every item of a day-of-week field.
///
/// The wildcards `*` and `?` are returned unchanged; otherwise each
/// comma-separated item is converted individually and the list is rejoined.
fn convert_cron_weekday(field: &str) -> String {
    if matches!(field, "*" | "?") {
        return field.to_string();
    }
    field
        .split(',')
        .map(|item| convert_weekday_item(item.trim()))
        .collect::<Vec<String>>()
        .join(",")
}
/// Converts one comma-separated item of a day-of-week field (a single value,
/// a range, or either with a `/step` suffix) from standard cron numbering to
/// the `cron` crate's numbering.
///
/// Ranges that end on Sunday written as `7` (for example `6-7`, `1-7`, `0-7`)
/// cannot be renumbered end-by-end, because Sunday becomes the *smallest*
/// value (1) in the crate's numbering and the range would run backwards.
/// Such ranges are rewritten into an equivalent list, e.g. `6-7` -> `1,7` and
/// `5-7` -> `1,6-7`.
fn convert_weekday_item(item: &str) -> String {
    let (base, step) = match item.split_once('/') {
        Some((base, step)) => (base, Some(step)),
        None => (item, None),
    };
    if let Some(rewritten) = convert_range_ending_on_sunday(base, step) {
        return rewritten;
    }
    let converted = convert_weekday_range_or_value(base);
    match step {
        Some(step) => format!("{}/{}", converted, step),
        None => converted,
    }
}
/// Rewrites a standard-cron range `start-7` (optionally stepped) into the
/// crate's numbering. Returns `None` when `base` is not such a range or the
/// step is not a positive integer, so the caller falls back to the plain
/// end-by-end conversion.
fn convert_range_ending_on_sunday(base: &str, step: Option<&str>) -> Option<String> {
    let (start, end) = base.split_once('-')?;
    if end.trim() != "7" {
        return None;
    }
    let start: u32 = match start.trim() {
        digit @ ("0" | "1" | "2" | "3" | "4" | "5" | "6") => digit.parse().ok()?,
        _ => return None,
    };
    if start == 0 {
        // Sunday through Sunday covers the whole week; stepping from Sunday (0)
        // maps one-to-one onto stepping from 1 in the crate's numbering.
        return Some(match step {
            Some(step) => format!("1-7/{}", step),
            None => "1-7".to_string(),
        });
    }
    // `start` (1..=6) maps to `start + 1`; Saturday (6) maps to 7.
    let first = start + 1;
    match step {
        None => Some(if first == 7 {
            "1,7".to_string()
        } else {
            format!("1,{}-7", first)
        }),
        Some(step) => {
            let step_size: u32 = step.trim().parse().ok().filter(|size| *size > 0)?;
            let weekdays = if first == 7 {
                "7".to_string()
            } else {
                format!("{}-7/{}", first, step)
            };
            // Sunday (standard 7) is part of the stepped range only when the
            // step lands exactly on it.
            let includes_sunday = (7 - start) % step_size == 0;
            Some(if includes_sunday {
                format!("1,{}", weekdays)
            } else {
                weekdays
            })
        }
    }
}
/// Converts a `start-end` range end-by-end, or a single value when `item` is
/// not a two-part range. Ranges ending on Sunday (`7`) are handled by
/// `convert_weekday_item` before this function is reached.
fn convert_weekday_range_or_value(item: &str) -> String {
    let parts: Vec<&str> = item.split('-').collect();
    if parts.len() == 2 {
        let start = convert_single_weekday(parts[0].trim());
        let end = convert_single_weekday(parts[1].trim());
        format!("{}-{}", start, end)
    } else {
        convert_single_weekday(item)
    }
}
/// Converts a single day-of-week number from standard cron numbering
/// (0 or 7 = Sunday, 1 = Monday, …, 6 = Saturday) to the `cron` crate's
/// numbering (1 = Sunday, 2 = Monday, …, 7 = Saturday).
///
/// Anything else (for example day names or `*`) is returned unchanged.
fn convert_single_weekday(day: &str) -> String {
    match day {
        "0" | "7" => "1".to_string(), // Sunday
        "1" => "2".to_string(),       // Monday
        "2" => "3".to_string(),       // Tuesday
        "3" => "4".to_string(),       // Wednesday
        "4" => "5".to_string(),       // Thursday
        "5" => "6".to_string(),       // Friday
        "6" => "7".to_string(),       // Saturday
        _ => day.to_string(),
    }
}
fn ts_millis_to_utc(value: i64) -> Option<DateTime<Utc>> {
Utc.timestamp_millis_opt(value).single()
}
fn build_outbound_message(job: &RuntimeJob) -> anyhow::Result<OutboundMessage> {
let channel = job
.target
.channel
.clone()
.ok_or_else(|| anyhow::anyhow!("outbound scheduler job requires target.channel"))?;
let chat_id = job
.target
.chat_id
.clone()
.ok_or_else(|| anyhow::anyhow!("outbound scheduler job requires target.chat_id"))?;
let content = job
.payload
.get("content")
.and_then(|value| value.as_str())
.ok_or_else(|| {
anyhow::anyhow!("outbound scheduler job payload.content must be a string")
})?;
let mut metadata = HashMap::new();
metadata.insert("scheduler_job_id".to_string(), job.id.clone());
Ok(OutboundMessage::scheduler_notification(
channel,
chat_id,
None, // session_id
content.to_string(),
job.target.reply_to.clone(),
metadata,
))
}
/// Runs the maintenance action named by `payload.event`.
///
/// The event defaults to `session_cleanup` when the payload has no string
/// `event` field. `memory_maintenance` logs one line per scope summary.
/// Any other event name is an error.
async fn execute_internal_event(
    maintenance_executor: &dyn MaintenanceExecutor,
    job: &RuntimeJob,
) -> anyhow::Result<()> {
    let requested = job.payload.get("event").and_then(|value| value.as_str());
    match requested.unwrap_or("session_cleanup") {
        "session_cleanup" => {
            let removed = maintenance_executor.cleanup_expired_sessions().await;
            tracing::info!(job_id = %job.id, removed, "Scheduler session cleanup completed");
        }
        "memory_maintenance" => {
            let results = maintenance_executor
                .run_memory_maintenance_for_all_scopes()
                .await?;
            results.iter().for_each(|result| {
                tracing::info!(
                    job_id = %job.id,
                    scope_key = %result.scope_key,
                    merges = result.merges,
                    conflicts = result.conflicts,
                    low_value = result.low_value,
                    "Scheduler completed memory maintenance model run"
                );
            });
            tracing::info!(job_id = %job.id, scope_count = results.len(), "Scheduler memory maintenance triggered");
        }
        other => anyhow::bail!("unsupported internal scheduler event: {}", other),
    }
    Ok(())
}
/// Runs an `agent_task` job through the executor and returns its messages.
///
/// Requires `target.channel` and a string `payload.prompt`.
async fn execute_agent_task(
    agent_task_executor: &dyn AgentTaskExecutor,
    job: &RuntimeJob,
    execution_chat_id: &str,
) -> anyhow::Result<Vec<OutboundMessage>> {
    let Some(channel_name) = job.target.channel.as_deref() else {
        anyhow::bail!("scheduled agent task requires target.channel");
    };
    let prompt = extract_prompt(job)?;
    let options = parse_scheduled_agent_task_options(job)?;
    let outcome = agent_task_executor
        .execute(channel_name, execution_chat_id, prompt, options)
        .await;
    outcome
}
/// Returns `payload.prompt`, or an error when it is missing or not a string.
fn extract_prompt(job: &RuntimeJob) -> anyhow::Result<&str> {
    match job.payload.get("prompt").and_then(|value| value.as_str()) {
        Some(prompt) => Ok(prompt),
        None => Err(anyhow::anyhow!(
            "agent_task payload.prompt must be a string"
        )),
    }
}
/// Returns `target.chat_id`, or an error naming `kind_name` when it is missing.
fn required_notification_chat_id<'a>(
    job: &'a RuntimeJob,
    kind_name: &str,
) -> anyhow::Result<&'a str> {
    match job.target.chat_id.as_deref() {
        Some(chat_id) => Ok(chat_id),
        None => Err(anyhow::anyhow!("{} requires target.chat_id", kind_name)),
    }
}
/// Determines the chat id an agent job runs its session under.
///
/// `AgentTask` uses the (required) `target.chat_id`. `SilentAgentTask` uses
/// `target.session_chat_id` when set, otherwise an id derived from the job id.
/// Other job kinds are rejected.
fn resolve_execution_chat_id(job: &RuntimeJob) -> anyhow::Result<String> {
    match job.kind {
        SchedulerJobKind::AgentTask => {
            let chat_id = required_notification_chat_id(job, "agent_task")?;
            Ok(chat_id.to_string())
        }
        SchedulerJobKind::SilentAgentTask => {
            let session_chat_id = match &job.target.session_chat_id {
                Some(explicit) => explicit.clone(),
                None => derive_silent_session_chat_id(&job.id),
            };
            Ok(session_chat_id)
        }
        _ => anyhow::bail!("execution chat id is only supported for agent task kinds"),
    }
}
/// Builds the default session chat id for a silent agent job: `scheduler/<job_id>`.
fn derive_silent_session_chat_id(job_id: &str) -> String {
    let mut session_chat_id = String::from("scheduler/");
    session_chat_id.push_str(job_id);
    session_chat_id
}
/// Renders an error as a single line of at most 240 characters.
///
/// Newlines are replaced with spaces; longer texts are cut after 240
/// characters and suffixed with `...`.
fn summarize_scheduler_error(error: &anyhow::Error) -> String {
    const MAX_LEN: usize = 240;
    let single_line = error.to_string().replace('\n', " ");
    // The byte offset of character number MAX_LEN exists only when the text is too long.
    match single_line.char_indices().nth(MAX_LEN) {
        None => single_line,
        Some((cut, _)) => format!("{}...", &single_line[..cut]),
    }
}
/// Reads the optional agent-task settings from a job payload.
///
/// `sender_id`, `system_prompt` and `agent` are taken only when they are
/// strings; `metadata` must be an object of scalar values when present.
fn parse_scheduled_agent_task_options(
    job: &RuntimeJob,
) -> anyhow::Result<ScheduledAgentTaskOptions> {
    let string_field = |name: &str| -> Option<String> {
        job.payload
            .get(name)
            .and_then(|value| value.as_str())
            .map(ToString::to_string)
    };
    let sender_id = string_field("sender_id");
    let system_prompt = string_field("system_prompt");
    let agent = string_field("agent");
    let metadata = parse_metadata_map(job.payload.get("metadata"))?;
    Ok(ScheduledAgentTaskOptions {
        sender_id,
        system_prompt,
        metadata,
        agent,
    })
}
fn parse_metadata_map(
value: Option<&serde_json::Value>,
) -> anyhow::Result<HashMap<String, String>> {
let Some(value) = value else {
return Ok(HashMap::new());
};
let object = value
.as_object()
.ok_or_else(|| anyhow::anyhow!("agent_task payload.metadata must be an object"))?;
let mut metadata = HashMap::with_capacity(object.len());
for (key, value) in object {
let stringified = match value {
serde_json::Value::String(inner) => inner.clone(),
serde_json::Value::Null => "null".to_string(),
serde_json::Value::Bool(inner) => inner.to_string(),
serde_json::Value::Number(inner) => inner.to_string(),
_ => {
return Err(anyhow::anyhow!(
"agent_task payload.metadata field '{}' must be a string, number, bool, or null",
key
));
}
};
metadata.insert(key.clone(), stringified);
}
Ok(metadata)
}
#[cfg(test)]
mod agent_task_tests {
    use super::*;

    /// Persisted record fixture: an enabled, scheduled job on a 300-second interval.
    fn interval_record(
        id: &str,
        kind: &str,
        target: serde_json::Value,
        payload: serde_json::Value,
    ) -> SchedulerJobRecord {
        SchedulerJobRecord {
            id: id.to_string(),
            kind: kind.to_string(),
            schedule: serde_json::json!({
                "type": "interval",
                "seconds": 300
            }),
            interval_secs: 0,
            startup_delay_secs: 0,
            target,
            payload,
            enabled: true,
            state: SchedulerJobState::Scheduled,
            last_status: None,
            last_error: None,
            run_count: 0,
            max_runs: None,
            last_fired_at: None,
            next_fire_at: Some(1_700_000_010_000),
            paused_at: None,
            completed_at: None,
            created_at: 1_700_000_000_000,
            updated_at: 1_700_000_000_000,
        }
    }

    /// Runtime job fixture: a 300-second interval job addressed to `test-channel` / `oc_demo`.
    fn interval_job(id: &str, kind: SchedulerJobKind, payload: serde_json::Value) -> RuntimeJob {
        RuntimeJob {
            id: id.to_string(),
            kind,
            schedule: SchedulerSchedule::Interval {
                seconds: 300,
                startup_delay_secs: 0,
            },
            target: SchedulerJobTarget {
                channel: Some("test-channel".to_string()),
                chat_id: Some("oc_demo".to_string()),
                session_chat_id: None,
                reply_to: None,
            },
            payload,
            enabled: true,
            state: SchedulerJobState::Scheduled,
            last_status: None,
            last_error: None,
            run_count: 0,
            max_runs: None,
            last_fired_at: None,
            next_fire_at: None,
            paused_at: None,
            completed_at: None,
            interval_secs: 300,
            startup_delay_secs: 0,
        }
    }

    /// Decodes a record that is expected to be supported.
    fn load(record: &SchedulerJobRecord) -> RuntimeJob {
        RuntimeJob::from_record(
            record,
            SchedulerMisfirePolicy::Skip,
            chrono_tz::Asia::Shanghai,
        )
        .unwrap()
        .unwrap()
    }

    #[test]
    fn runtime_job_from_record_supports_agent_task_kind() {
        let record = interval_record(
            "agent.daily_summary",
            "agent_task",
            serde_json::json!({
                "channel": "test-channel",
                "chat_id": "oc_demo"
            }),
            serde_json::json!({
                "prompt": "请总结今天待办"
            }),
        );
        let job = load(&record);
        assert_eq!(job.kind, SchedulerJobKind::AgentTask);
        let prompt = job.payload.get("prompt").and_then(|value| value.as_str());
        assert_eq!(prompt, Some("请总结今天待办"));
    }

    #[test]
    fn runtime_job_from_record_supports_silent_agent_task_kind() {
        let record = interval_record(
            "agent.daily_summary.background",
            "silent_agent_task",
            serde_json::json!({
                "channel": "test-channel",
                "chat_id": "oc_demo",
                "session_chat_id": "scheduler/agent.daily_summary.background"
            }),
            serde_json::json!({
                "prompt": "请后台整理今天待办"
            }),
        );
        let job = load(&record);
        assert_eq!(job.kind, SchedulerJobKind::SilentAgentTask);
        assert_eq!(
            job.target.session_chat_id.as_deref(),
            Some("scheduler/agent.daily_summary.background")
        );
    }

    #[test]
    fn parse_scheduled_agent_task_options_supports_fresh_session_and_metadata() {
        let job = interval_job(
            "agent.daily_summary",
            SchedulerJobKind::AgentTask,
            serde_json::json!({
                "prompt": "请总结今天待办",
                "agent": "planner",
                "sender_id": "scheduler-bot",
                "fresh_session": true,
                "system_prompt": "你是日报助手",
                "metadata": {
                    "job_type": "daily_summary",
                    "priority": 1,
                    "urgent": false
                }
            }),
        );
        let options = parse_scheduled_agent_task_options(&job).unwrap();
        assert_eq!(options.agent.as_deref(), Some("planner"));
        assert_eq!(options.sender_id.as_deref(), Some("scheduler-bot"));
        assert_eq!(options.system_prompt.as_deref(), Some("你是日报助手"));
        let metadata_value = |key: &str| options.metadata.get(key).map(String::as_str);
        assert_eq!(metadata_value("job_type"), Some("daily_summary"));
        assert_eq!(metadata_value("priority"), Some("1"));
        assert_eq!(metadata_value("urgent"), Some("false"));
    }

    #[test]
    fn resolve_execution_chat_id_uses_dedicated_session_for_silent_agent_tasks() {
        let job = interval_job(
            "agent.daily_summary.background",
            SchedulerJobKind::SilentAgentTask,
            serde_json::json!({
                "prompt": "请后台整理今天待办"
            }),
        );
        let session_chat_id = resolve_execution_chat_id(&job).unwrap();
        assert_eq!(session_chat_id, "scheduler/agent.daily_summary.background");
    }
}
/// Decodes a job target from its stored JSON form.
impl TryFrom<serde_json::Value> for SchedulerJobTarget {
    type Error = anyhow::Error;

    fn try_from(value: serde_json::Value) -> Result<Self, Self::Error> {
        let target = serde_json::from_value::<Self>(value)?;
        Ok(target)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{Datelike, Timelike};
use crate::bus::MessageBus;
use crate::config::BUILTIN_MEMORY_MAINTENANCE_JOB_ID;
use crate::storage::{SchedulerJobUpsert, SessionStore};
/// Agent executor stub: accepts every task and produces no messages.
#[derive(Clone)]
struct TestAgentTaskExecutor;

#[async_trait::async_trait]
impl AgentTaskExecutor for TestAgentTaskExecutor {
    async fn execute(
        &self,
        _channel_name: &str,
        _chat_id: &str,
        _prompt: &str,
        _options: ScheduledAgentTaskOptions,
    ) -> anyhow::Result<Vec<OutboundMessage>> {
        Ok(vec![])
    }

    async fn execute_silent(
        &self,
        _channel_name: &str,
        _session_chat_id: &str,
        _notification_chat_id: Option<&str>,
        _prompt: &str,
        _options: ScheduledAgentTaskOptions,
    ) -> anyhow::Result<Vec<OutboundMessage>> {
        Ok(vec![])
    }
}
/// Maintenance executor stub: removes nothing and reports no scopes.
#[derive(Clone)]
struct TestMaintenanceExecutor;

#[async_trait::async_trait]
impl MaintenanceExecutor for TestMaintenanceExecutor {
    async fn cleanup_expired_sessions(&self) -> usize {
        let removed: usize = 0;
        removed
    }

    async fn run_memory_maintenance_for_all_scopes(&self) -> anyhow::Result<Vec<MaintenanceRunSummary>> {
        Ok(vec![])
    }
}
/// Returns the pair of stub executors passed to `Scheduler::new` by the tests
/// in this module.
fn test_scheduler_services() -> (TestAgentTaskExecutor, TestMaintenanceExecutor) {
    let agent_tasks = TestAgentTaskExecutor;
    let maintenance = TestMaintenanceExecutor;
    (agent_tasks, maintenance)
}
/// With the `Skip` misfire policy, a 60-second interval job whose reference
/// timestamp is ten minutes old is rescheduled exactly one interval after
/// `now`.
#[test]
fn runtime_job_skip_policy_advances_from_now() {
    let now = Utc
        .timestamp_millis_opt(1_700_000_000_000)
        .single()
        .unwrap();
    let now_ms = now.timestamp_millis();
    let ten_minutes_ago_ms = now_ms - 10 * 60 * 1_000;
    let every_minute = SchedulerSchedule::Interval {
        seconds: 60,
        startup_delay_secs: 0,
    };

    let next = compute_next_fire_at(
        &every_minute,
        now,
        Some(ten_minutes_ago_ms),
        SchedulerMisfirePolicy::Skip,
        chrono_tz::Asia::Shanghai,
    )
    .unwrap()
    .unwrap();

    assert_eq!(next, now_ms + 60_000);
}
/// With the `CatchUp` misfire policy, the next fire time lies strictly after
/// `now` and is a whole number of 60-second intervals away from it.
#[test]
fn runtime_job_catch_up_policy_moves_past_now() {
    let now = Utc
        .timestamp_millis_opt(1_700_000_000_000)
        .single()
        .unwrap();
    let now_ms = now.timestamp_millis();
    let ten_minutes_ago_ms = now_ms - 10 * 60 * 1_000;
    let every_minute = SchedulerSchedule::Interval {
        seconds: 60,
        startup_delay_secs: 0,
    };

    let next = compute_next_fire_at(
        &every_minute,
        now,
        Some(ten_minutes_ago_ms),
        SchedulerMisfirePolicy::CatchUp,
        chrono_tz::Asia::Shanghai,
    )
    .unwrap()
    .unwrap();

    assert!(next > now_ms);
    assert_eq!((next - now_ms) % 60_000, 0);
}
/// `RuntimeJob::from_record` must take the schedule from the record's JSON
/// `schedule` field (120 s / 10 s delay here) even though the flat
/// `interval_secs` / `startup_delay_secs` columns are zero, and must keep the
/// stored `next_fire_at`.
#[test]
fn runtime_job_from_record_uses_persisted_schedule() {
    let persisted_schedule = serde_json::json!({
        "type": "interval",
        "seconds": 120,
        "startup_delay_secs": 10
    });
    let persisted_target = serde_json::json!({
        "channel": "test-channel",
        "chat_id": "oc_demo"
    });
    let record = SchedulerJobRecord {
        id: String::from("heartbeat"),
        kind: String::from("outbound_message"),
        schedule: persisted_schedule,
        interval_secs: 0,
        startup_delay_secs: 0,
        target: persisted_target,
        payload: serde_json::json!({"content": "hello"}),
        enabled: true,
        state: SchedulerJobState::Scheduled,
        last_status: None,
        last_error: None,
        run_count: 0,
        max_runs: None,
        last_fired_at: None,
        next_fire_at: Some(1_700_000_010_000),
        paused_at: None,
        completed_at: None,
        created_at: 1_700_000_000_000,
        updated_at: 1_700_000_000_000,
    };

    let job = RuntimeJob::from_record(
        &record,
        SchedulerMisfirePolicy::Skip,
        chrono_tz::Asia::Shanghai,
    )
    .unwrap()
    .unwrap();

    let expected_schedule = SchedulerSchedule::Interval {
        seconds: 120,
        startup_delay_secs: 10,
    };
    assert_eq!(job.schedule, expected_schedule);
    assert_eq!(job.next_fire_at, Some(1_700_000_010_000));
}
/// A job written straight to the store without a `next_fire_at` gets one
/// persisted by the first `process_tick`, without being counted as run and
/// while staying in the `Scheduled` state.
#[tokio::test]
async fn process_tick_persists_initial_next_fire_at_for_db_created_jobs() {
    let store = Arc::new(SessionStore::in_memory().unwrap());

    // Job as it would look when created directly in the database: it has a
    // schedule but no computed fire time yet.
    let db_created_job = SchedulerJobUpsert {
        id: String::from("massage_reminder"),
        kind: String::from("outbound_message"),
        schedule: serde_json::json!({
            "type": "interval",
            "seconds": 60
        }),
        interval_secs: 60,
        startup_delay_secs: 0,
        target: serde_json::json!({
            "channel": "test-channel",
            "chat_id": "oc_demo"
        }),
        payload: serde_json::json!({
            "content": "ping"
        }),
        enabled: true,
        state: SchedulerJobState::Scheduled,
        last_status: None,
        last_error: None,
        run_count: 0,
        max_runs: Some(1),
        last_fired_at: None,
        next_fire_at: None,
        paused_at: None,
        completed_at: None,
    };
    store.upsert_scheduler_job(&db_created_job).unwrap();

    let (agent_task_executor, maintenance_service) = test_scheduler_services();
    let config = SchedulerConfig {
        enabled: true,
        tick_resolution_ms: 1000,
        worker_queue_capacity: 64,
        misfire_policy: SchedulerMisfirePolicy::Skip,
        jobs: Vec::new(),
    };
    let scheduler = Scheduler::new(
        MessageBus::new(8),
        config,
        chrono_tz::Asia::Shanghai,
        store.clone(),
        agent_task_executor,
        maintenance_service,
    );

    scheduler.process_tick().await.unwrap();

    let saved = store
        .get_scheduler_job("massage_reminder")
        .unwrap()
        .unwrap();
    assert!(saved.next_fire_at.is_some());
    assert_eq!(saved.run_count, 0);
    assert_eq!(saved.state, SchedulerJobState::Scheduled);
}
/// With the default configuration, `sync_config_jobs` stores the built-in
/// memory-maintenance job: an enabled, scheduled `internal_event` job on the
/// `0 */4 * * *` cron with a computed `next_fire_at`.
#[test]
fn sync_config_jobs_persists_builtin_memory_maintenance_job() {
    let store = Arc::new(SessionStore::in_memory().unwrap());
    let (agent_task_executor, maintenance_service) = test_scheduler_services();
    let scheduler = Scheduler::new(
        MessageBus::new(8),
        SchedulerConfig::default(),
        chrono_tz::Asia::Shanghai,
        store.clone(),
        agent_task_executor,
        maintenance_service,
    );

    scheduler.sync_config_jobs().unwrap();

    let saved = store
        .get_scheduler_job(BUILTIN_MEMORY_MAINTENANCE_JOB_ID)
        .unwrap()
        .unwrap();

    // Identity and lifecycle fields.
    assert_eq!(saved.id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID);
    assert_eq!(saved.kind, "internal_event");
    assert!(saved.enabled);
    assert_eq!(saved.state, SchedulerJobState::Scheduled);

    // Payload event name.
    let event = saved.payload.get("event").and_then(|field| field.as_str());
    assert_eq!(event, Some("memory_maintenance"));

    // Persisted schedule definition.
    let expected_schedule = serde_json::json!({
        "type": "cron",
        "expression": "0 */4 * * *"
    });
    assert_eq!(saved.schedule, expected_schedule);

    // Payload `local_time` marker.
    let local_time = saved
        .payload
        .get("local_time")
        .and_then(|field| field.as_str());
    assert_eq!(local_time, Some("every_4_hours"));

    assert!(saved.next_fire_at.is_some());
}
/// When a configured job's definition matches what is already persisted,
/// `sync_config_jobs` must leave the stored runtime state (`next_fire_at`,
/// `run_count`, `last_status`) untouched.
#[test]
fn sync_config_jobs_preserves_persisted_next_fire_at_for_matching_jobs() {
    let store = Arc::new(SessionStore::in_memory().unwrap());
    let persisted_next_fire_at = 1_700_000_300_000;

    // Job definition as it appears in the configuration.
    let config_job = SchedulerJobConfig {
        id: String::from("agent.heartbeat"),
        enabled: true,
        kind: SchedulerJobKind::OutboundMessage,
        schedule: Some(SchedulerSchedule::Interval {
            seconds: 300,
            startup_delay_secs: 0,
        }),
        startup_delay_secs: 0,
        interval_secs: 0,
        target: SchedulerJobTarget {
            channel: Some(String::from("test-channel")),
            chat_id: Some(String::from("oc_demo")),
            session_chat_id: None,
            reply_to: None,
        },
        payload: serde_json::json!({
            "content": "ping"
        }),
    };

    // Same job as already stored, carrying runtime progress from earlier runs.
    let already_persisted = SchedulerJobUpsert {
        id: String::from("agent.heartbeat"),
        kind: String::from("outbound_message"),
        schedule: serde_json::json!({
            "type": "interval",
            "seconds": 300,
            "startup_delay_secs": 0
        }),
        interval_secs: 300,
        startup_delay_secs: 0,
        target: serde_json::json!({
            "channel": "test-channel",
            "chat_id": "oc_demo"
        }),
        payload: serde_json::json!({
            "content": "ping"
        }),
        enabled: true,
        state: SchedulerJobState::Scheduled,
        last_status: Some(SchedulerJobStatus::Ok),
        last_error: None,
        run_count: 3,
        max_runs: None,
        last_fired_at: Some(1_700_000_000_000),
        next_fire_at: Some(persisted_next_fire_at),
        paused_at: None,
        completed_at: None,
    };
    store.upsert_scheduler_job(&already_persisted).unwrap();

    // Precondition: the config-derived upsert is considered the same
    // definition as the stored record.
    let probe_now = Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap();
    let probe_runtime = RuntimeJob::from_config(
        &config_job,
        probe_now,
        SchedulerMisfirePolicy::Skip,
        chrono_tz::Asia::Shanghai,
    )
    .unwrap();
    let probe_existing = store
        .get_scheduler_job("agent.heartbeat")
        .unwrap()
        .unwrap();
    let probe_upsert = probe_runtime.to_upsert();
    assert!(scheduler_job_definition_matches(&probe_upsert, &probe_existing));

    let (agent_task_executor, maintenance_service) = test_scheduler_services();
    let config = SchedulerConfig {
        enabled: true,
        tick_resolution_ms: 1000,
        worker_queue_capacity: 64,
        misfire_policy: SchedulerMisfirePolicy::Skip,
        jobs: vec![config_job],
    };
    let scheduler = Scheduler::new(
        MessageBus::new(8),
        config,
        chrono_tz::Asia::Shanghai,
        store.clone(),
        agent_task_executor,
        maintenance_service,
    );

    scheduler.sync_config_jobs().unwrap();

    let saved = store
        .get_scheduler_job("agent.heartbeat")
        .unwrap()
        .unwrap();
    assert_eq!(saved.next_fire_at, Some(persisted_next_fire_at));
    assert_eq!(saved.run_count, 3);
    assert_eq!(saved.last_status, Some(SchedulerJobStatus::Ok));
}
/// When a silent agent task fails (here: the payload is empty and the error
/// mentions `payload.prompt`), a failure notice naming the job is published
/// to the target's `channel` / `chat_id`.
#[tokio::test]
async fn silent_agent_task_failure_notifies_primary_chat() {
    let store = Arc::new(SessionStore::in_memory().unwrap());
    let bus = MessageBus::new(8);
    let (agent_task_executor, maintenance_service) = test_scheduler_services();
    let config = SchedulerConfig {
        enabled: true,
        tick_resolution_ms: 1000,
        worker_queue_capacity: 64,
        misfire_policy: SchedulerMisfirePolicy::Skip,
        jobs: Vec::new(),
    };
    let scheduler = Scheduler::new(
        bus.clone(),
        config,
        chrono_tz::Asia::Shanghai,
        store,
        agent_task_executor,
        maintenance_service,
    );

    // Session chat and notification chat are deliberately different.
    let target = SchedulerJobTarget {
        channel: Some(String::from("test-channel")),
        chat_id: Some(String::from("oc_demo")),
        session_chat_id: Some(String::from("scheduler/agent.daily_summary.background")),
        reply_to: None,
    };
    let job = RuntimeJob {
        id: String::from("agent.daily_summary.background"),
        kind: SchedulerJobKind::SilentAgentTask,
        schedule: SchedulerSchedule::Interval {
            seconds: 300,
            startup_delay_secs: 0,
        },
        target,
        // Empty payload: there is no `prompt` entry.
        payload: serde_json::json!({}),
        enabled: true,
        state: SchedulerJobState::Scheduled,
        last_status: None,
        last_error: None,
        run_count: 0,
        max_runs: None,
        last_fired_at: None,
        next_fire_at: None,
        paused_at: None,
        completed_at: None,
        interval_secs: 300,
        startup_delay_secs: 0,
    };

    let error = scheduler.execute_job(&job).await.unwrap_err();
    let error_text = error.to_string();
    assert!(error_text.contains("payload.prompt"));

    // The notice must already be queued; wait only briefly for it.
    let wait_limit = std::time::Duration::from_millis(100);
    let delivery = tokio::time::timeout(wait_limit, bus.consume_outbound())
        .await
        .expect("timeout waiting for outbound message");
    let outbound = delivery.expect("bus outbound closed");

    assert_eq!(outbound.channel, "test-channel");
    assert_eq!(outbound.chat_id, "oc_demo");
    assert!(outbound.content.contains("定时任务执行失败"));
    assert!(outbound.content.contains("agent.daily_summary.background"));
}
/// Cron expressions are evaluated in the scheduler's timezone: at
/// 2026-04-23 18:00 UTC (02:00 on the 24th in Asia/Shanghai, UTC+8) the next
/// `0 3 * * *` run is 03:00 Shanghai time, i.e. 2026-04-23 19:00 UTC.
#[test]
fn cron_schedule_uses_configured_timezone() {
    let now = Utc
        .with_ymd_and_hms(2026, 4, 23, 18, 0, 0)
        .single()
        .unwrap();
    let daily_at_three = SchedulerSchedule::Cron {
        expression: String::from("0 3 * * *"),
    };

    let next = compute_next_fire_at(
        &daily_at_three,
        now,
        None,
        SchedulerMisfirePolicy::Skip,
        chrono_tz::Asia::Shanghai,
    )
    .unwrap()
    .unwrap();

    let next_utc = ts_millis_to_utc(next).unwrap();
    let expected_utc = Utc
        .with_ymd_and_hms(2026, 4, 23, 19, 0, 0)
        .single()
        .unwrap();
    assert_eq!(next_utc, expected_utc);
}
/// Diagnostic test: prints the next fire time `parse_scheduler_cron` yields
/// for each day-of-week expression and asserts that `1-5` skips the weekend.
///
/// The labels use standard cron numbering (1 = Monday ... 6 = Saturday,
/// 7 = Sunday). That is the numbering the assertions in this test rely on:
/// from both Saturday and Sunday, `1-5` must jump to Monday. Earlier labels
/// described a 1 = Sunday numbering and contradicted those assertions.
#[test]
fn debug_cron_weekday_definitions() {
    let test_cases = [
        ("0 9 * * 1", "1=周一"),
        ("0 9 * * 2", "2=周二"),
        ("0 9 * * 3", "3=周三"),
        ("0 9 * * 4", "4=周四"),
        ("0 9 * * 5", "5=周五"),
        ("0 9 * * 6", "6=周六"),
        ("0 9 * * 7", "7=周日"),
        ("0 9 * * 1-5", "1-5=周一到周五(工作日)"),
        ("0 9 * * 2-6", "2-6=周二到周六"),
    ];

    // Start from Saturday 2026-04-25 (10:00 UTC, viewed in Asia/Shanghai).
    let saturday = Utc.with_ymd_and_hms(2026, 4, 25, 10, 0, 0).single().unwrap();
    let shanghai_saturday = saturday.with_timezone(&chrono_tz::Asia::Shanghai);
    println!(
        "\n=== 从周六 {} ({:?}) 开始测试 ===",
        shanghai_saturday,
        shanghai_saturday.weekday()
    );
    for (expr, desc) in &test_cases {
        let schedule = parse_scheduler_cron(expr).unwrap();
        let next = schedule.after(&shanghai_saturday).next().unwrap();
        println!("{}: 下次执行 {} (星期: {:?})", desc, next, next.weekday());
    }

    // Verify the workday expression `1-5`.
    println!("\n=== 验证正确的工作日表达式 1-5标准 cron===");
    let schedule_workday = parse_scheduler_cron("0 9 * * 1-5").unwrap();
    let sat_next = schedule_workday.after(&shanghai_saturday).next().unwrap();
    println!("周六 -> 1-5 下次执行: {} (星期: {:?})", sat_next, sat_next.weekday());
    assert_eq!(sat_next.weekday(), chrono::Weekday::Mon, "1-5 应该从周六跳到周一");

    // Starting from Sunday.
    let sunday = Utc.with_ymd_and_hms(2026, 4, 26, 10, 0, 0).single().unwrap();
    let shanghai_sunday = sunday.with_timezone(&chrono_tz::Asia::Shanghai);
    let sun_next = schedule_workday.after(&shanghai_sunday).next().unwrap();
    println!("周日 -> 1-5 下次执行: {} (星期: {:?})", sun_next, sun_next.weekday());
    assert_eq!(sun_next.weekday(), chrono::Weekday::Mon, "1-5 应该从周日跳到周一");

    // Starting from Monday 07:00 Shanghai time; look the fire time up once
    // and reuse it for both printed fields.
    let shanghai_monday = chrono_tz::Asia::Shanghai
        .with_ymd_and_hms(2026, 4, 27, 7, 0, 0)
        .single()
        .unwrap();
    let mon_next = schedule_workday.after(&shanghai_monday).next().unwrap();
    println!(
        "周一早上7点 -> 1-5 下次执行: {} (星期: {:?})",
        mon_next,
        mon_next.weekday()
    );
}
/// Tests the standard-cron weekday conversion.
///
/// Standard cron syntax is accepted:
/// - 0 or 7 = Sunday
/// - 1 = Monday
/// - ...
/// - 6 = Saturday
///
/// Workdays (Monday through Friday) are therefore written as `1-5`.
#[test]
fn standard_cron_weekday_conversion() {
    let shanghai = chrono_tz::Asia::Shanghai;
    // Standard cron: `1-5` means Monday through Friday, 09:00.
    let schedule_std = parse_scheduler_cron("0 9 * * 1-5").unwrap();

    // From Saturday (2026-04-25 10:00 UTC).
    let shanghai_saturday = Utc
        .with_ymd_and_hms(2026, 4, 25, 10, 0, 0)
        .single()
        .unwrap()
        .with_timezone(&shanghai);
    let sat_next = schedule_std.after(&shanghai_saturday).next().unwrap();
    println!("周六 -> 标准 cron 1-5 下次执行: {} (星期: {:?})", sat_next, sat_next.weekday());
    assert_eq!(sat_next.weekday(), chrono::Weekday::Mon, "标准 cron 1-5 应该从周六跳到周一");

    // From Sunday (2026-04-26 10:00 UTC).
    let shanghai_sunday = Utc
        .with_ymd_and_hms(2026, 4, 26, 10, 0, 0)
        .single()
        .unwrap()
        .with_timezone(&shanghai);
    let sun_next = schedule_std.after(&shanghai_sunday).next().unwrap();
    println!("周日 -> 标准 cron 1-5 下次执行: {} (星期: {:?})", sun_next, sun_next.weekday());
    assert_eq!(sun_next.weekday(), chrono::Weekday::Mon, "标准 cron 1-5 应该从周日跳到周一");

    // From Monday 07:00 Shanghai time: the job still fires the same day.
    let shanghai_monday = shanghai
        .with_ymd_and_hms(2026, 4, 27, 7, 0, 0)
        .single()
        .unwrap();
    let mon_next = schedule_std.after(&shanghai_monday).next().unwrap();
    println!("周一早上 -> 标准 cron 1-5 下次执行: {} (星期: {:?})", mon_next, mon_next.weekday());
    assert_eq!(mon_next.weekday(), chrono::Weekday::Mon, "标准 cron 1-5 应该当天执行");
    assert_eq!(mon_next.hour(), 9, "应该是上海时间9点");

    // From Friday (2026-05-01 10:00 UTC): the next run is the following Monday.
    let shanghai_friday = Utc
        .with_ymd_and_hms(2026, 5, 1, 10, 0, 0)
        .single()
        .unwrap()
        .with_timezone(&shanghai);
    let fri_next = schedule_std.after(&shanghai_friday).next().unwrap();
    println!("周五 -> 标准 cron 1-5 下次执行: {} (星期: {:?})", fri_next, fri_next.weekday());
    assert_eq!(fri_next.weekday(), chrono::Weekday::Mon, "标准 cron 1-5 应该从周五跳到下周一");
}
/// Tests the weekday conversion helpers, one table of
/// `(standard cron input, expected converted output)` pairs per helper.
#[test]
fn test_weekday_conversion_helper() {
    // Single values: Sunday, Monday, Friday, Saturday, and `7` as the
    // alternative standard-cron spelling of Sunday.
    for (standard, converted) in [("0", "1"), ("1", "2"), ("5", "6"), ("6", "7"), ("7", "1")] {
        assert_eq!(convert_single_weekday(standard), converted);
    }

    // Ranges: Monday-Friday, Sunday-Saturday, and `0-7`.
    // NOTE(review): in standard cron `0-7` covers every day, but the expected
    // `1-1` looks like it would match Sunday only; confirm this is intended.
    for (standard, converted) in [("1-5", "2-6"), ("0-6", "1-7"), ("0-7", "1-1")] {
        assert_eq!(convert_weekday_range_or_value(standard), converted);
    }

    // Step expressions are passed through unchanged.
    assert_eq!(convert_weekday_item("*/2"), "*/2");

    // Whole fields, in the original order: lists (Mon/Wed/Fri, Sun+Sat), a
    // mixed range plus value (Mon-Fri + Sunday), then the special characters.
    for (standard, converted) in [
        ("1,3,5", "2,4,6"),
        ("0,6", "1,7"),
        ("1-5,7", "2-6,1"),
        ("*", "*"),
        ("?", "?"),
    ] {
        assert_eq!(convert_cron_weekday(standard), converted);
    }
}
}