1697 lines
58 KiB
Rust
1697 lines
58 KiB
Rust
use std::collections::HashMap;
|
||
use std::str::FromStr;
|
||
use std::sync::Arc;
|
||
|
||
use async_trait::async_trait;
|
||
use chrono::{DateTime, Duration as ChronoDuration, TimeZone, Utc};
|
||
use chrono_tz::Tz;
|
||
use tokio::sync::watch;
|
||
|
||
use crate::bus::{MessageBus, OutboundMessage};
|
||
use crate::config::{
|
||
SchedulerConfig, SchedulerJobConfig, SchedulerJobKind, SchedulerJobTarget,
|
||
SchedulerMisfirePolicy, SchedulerSchedule,
|
||
};
|
||
use crate::storage::{
|
||
SchedulerJobRecord, SchedulerJobRepository, SchedulerJobState, SchedulerJobStatus,
|
||
SchedulerJobUpsert,
|
||
};
|
||
|
||
#[derive(Debug, Clone, Default)]
|
||
pub struct ScheduledAgentTaskOptions {
|
||
pub sender_id: Option<String>,
|
||
pub fresh_session: bool,
|
||
pub system_prompt: Option<String>,
|
||
pub metadata: HashMap<String, String>,
|
||
pub agent: Option<String>,
|
||
}
|
||
|
||
#[derive(Debug, Clone)]
|
||
pub struct MaintenanceRunSummary {
|
||
pub scope_key: String,
|
||
pub merges: usize,
|
||
pub conflicts: usize,
|
||
pub low_value: usize,
|
||
}
|
||
|
||
#[async_trait]
|
||
pub trait AgentTaskExecutor: Send + Sync {
|
||
async fn execute(
|
||
&self,
|
||
channel_name: &str,
|
||
chat_id: &str,
|
||
prompt: &str,
|
||
options: ScheduledAgentTaskOptions,
|
||
) -> anyhow::Result<Vec<OutboundMessage>>;
|
||
}
|
||
|
||
#[async_trait]
|
||
pub trait MaintenanceExecutor: Send + Sync {
|
||
async fn cleanup_expired_sessions(&self) -> usize;
|
||
|
||
async fn run_memory_maintenance_for_all_scopes(&self) -> anyhow::Result<Vec<MaintenanceRunSummary>>;
|
||
}
|
||
|
||
pub struct Scheduler {
|
||
bus: Arc<MessageBus>,
|
||
config: SchedulerConfig,
|
||
timezone: Tz,
|
||
jobs: Arc<dyn SchedulerJobRepository>,
|
||
agent_task_executor: Arc<dyn AgentTaskExecutor>,
|
||
maintenance_executor: Arc<dyn MaintenanceExecutor>,
|
||
}
|
||
|
||
impl Scheduler {
|
||
pub fn new<A, M>(
|
||
bus: Arc<MessageBus>,
|
||
config: SchedulerConfig,
|
||
timezone: Tz,
|
||
jobs: Arc<dyn SchedulerJobRepository>,
|
||
agent_task_executor: A,
|
||
maintenance_executor: M,
|
||
) -> Self
|
||
where
|
||
A: AgentTaskExecutor + 'static,
|
||
M: MaintenanceExecutor + 'static,
|
||
{
|
||
Self {
|
||
bus,
|
||
config,
|
||
timezone,
|
||
jobs,
|
||
agent_task_executor: Arc::new(agent_task_executor),
|
||
maintenance_executor: Arc::new(maintenance_executor),
|
||
}
|
||
}
|
||
|
||
pub async fn run(&self, mut shutdown_rx: watch::Receiver<bool>) {
|
||
if !self.config.enabled {
|
||
tracing::info!("Scheduler disabled; skipping startup");
|
||
return;
|
||
}
|
||
|
||
if let Err(error) = self.sync_config_jobs() {
|
||
tracing::error!(error = %error, "Failed to sync scheduler config jobs");
|
||
}
|
||
|
||
let tick_ms = self.config.tick_resolution_ms.max(100);
|
||
let mut ticker = tokio::time::interval(std::time::Duration::from_millis(tick_ms));
|
||
|
||
tracing::info!(tick_resolution_ms = tick_ms, "Scheduler started");
|
||
|
||
loop {
|
||
tokio::select! {
|
||
_ = ticker.tick() => {
|
||
if let Err(error) = self.process_tick().await {
|
||
tracing::error!(error = %error, "Scheduler tick failed");
|
||
}
|
||
}
|
||
changed = shutdown_rx.changed() => {
|
||
if changed.is_ok() && *shutdown_rx.borrow() {
|
||
tracing::info!("Scheduler shutdown requested");
|
||
break;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
fn sync_config_jobs(&self) -> anyhow::Result<()> {
|
||
let now = Utc::now();
|
||
for job in self.config.effective_jobs(&crate::config::TimeConfig {
|
||
timezone: self.timezone.name().to_string(),
|
||
}) {
|
||
let runtime =
|
||
RuntimeJob::from_config(&job, now, self.config.misfire_policy, self.timezone)?;
|
||
let mut upsert = runtime.to_upsert();
|
||
if let Some(existing) = self.jobs.get_scheduler_job(&runtime.id)? {
|
||
preserve_persisted_runtime(&mut upsert, &existing);
|
||
}
|
||
self.jobs.upsert_scheduler_job(&upsert)?;
|
||
}
|
||
Ok(())
|
||
}
|
||
|
||
async fn process_tick(&self) -> anyhow::Result<()> {
|
||
let now = Utc::now();
|
||
let jobs = self.jobs.list_scheduler_jobs(true)?;
|
||
|
||
for record in jobs {
|
||
let Some(mut job) =
|
||
RuntimeJob::from_record(&record, self.config.misfire_policy, self.timezone)?
|
||
else {
|
||
continue;
|
||
};
|
||
|
||
if record.next_fire_at.is_none() && job.next_fire_at.is_some() {
|
||
self.jobs.update_scheduler_job_runtime(
|
||
&job.id,
|
||
job.state.clone(),
|
||
job.last_status.clone(),
|
||
job.last_error.as_deref(),
|
||
job.run_count,
|
||
job.last_fired_at,
|
||
job.next_fire_at,
|
||
job.paused_at,
|
||
job.completed_at,
|
||
)?;
|
||
}
|
||
|
||
if !job.is_due(now) {
|
||
continue;
|
||
}
|
||
|
||
self.jobs.update_scheduler_job_runtime(
|
||
&job.id,
|
||
SchedulerJobState::Running,
|
||
job.last_status.clone(),
|
||
job.last_error.as_deref(),
|
||
job.run_count,
|
||
job.last_fired_at,
|
||
job.next_fire_at,
|
||
job.paused_at,
|
||
job.completed_at,
|
||
)?;
|
||
|
||
let execution_result = self.execute_job(&job).await;
|
||
job.after_execution(
|
||
now,
|
||
execution_result.as_ref().err().map(|err| err.to_string()),
|
||
self.config.misfire_policy,
|
||
self.timezone,
|
||
)?;
|
||
|
||
let status = if execution_result.is_ok() {
|
||
Some(SchedulerJobStatus::Ok)
|
||
} else {
|
||
Some(SchedulerJobStatus::Error)
|
||
};
|
||
|
||
if let Err(error) = &execution_result {
|
||
tracing::error!(job_id = %job.id, error = %error, "Scheduler job failed");
|
||
}
|
||
|
||
self.jobs.update_scheduler_job_runtime(
|
||
&job.id,
|
||
job.state.clone(),
|
||
status,
|
||
job.last_error.as_deref(),
|
||
job.run_count,
|
||
job.last_fired_at,
|
||
job.next_fire_at,
|
||
job.paused_at,
|
||
job.completed_at,
|
||
)?;
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn execute_job(&self, job: &RuntimeJob) -> anyhow::Result<()> {
|
||
match job.kind {
|
||
SchedulerJobKind::OutboundMessage => {
|
||
let message = build_outbound_message(job)?;
|
||
self.bus.publish_outbound(message).await?;
|
||
}
|
||
SchedulerJobKind::InternalEvent => {
|
||
execute_internal_event(self.maintenance_executor.as_ref(), job).await?;
|
||
}
|
||
SchedulerJobKind::AgentTask => {
|
||
let outbound_messages = execute_agent_task(
|
||
self.agent_task_executor.as_ref(),
|
||
job,
|
||
required_notification_chat_id(job, "agent_task")?,
|
||
)
|
||
.await?;
|
||
for message in outbound_messages {
|
||
self.bus.publish_outbound(message).await?;
|
||
}
|
||
}
|
||
SchedulerJobKind::SilentAgentTask => {
|
||
let execution_chat_id = resolve_execution_chat_id(job)?;
|
||
if let Err(error) =
|
||
execute_agent_task(self.agent_task_executor.as_ref(), job, &execution_chat_id)
|
||
.await
|
||
{
|
||
if let Err(notify_error) =
|
||
self.notify_silent_agent_task_failure(job, &error).await
|
||
{
|
||
tracing::error!(
|
||
job_id = %job.id,
|
||
error = %notify_error,
|
||
"Failed to publish silent scheduler failure notification"
|
||
);
|
||
}
|
||
return Err(error);
|
||
}
|
||
}
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
|
||
async fn notify_silent_agent_task_failure(
|
||
&self,
|
||
job: &RuntimeJob,
|
||
error: &anyhow::Error,
|
||
) -> anyhow::Result<()> {
|
||
let channel = job
|
||
.target
|
||
.channel
|
||
.clone()
|
||
.ok_or_else(|| anyhow::anyhow!("silent_agent_task requires target.channel"))?;
|
||
let chat_id = required_notification_chat_id(job, "silent_agent_task")?.to_string();
|
||
|
||
let mut metadata = HashMap::new();
|
||
metadata.insert("scheduler_job_id".to_string(), job.id.clone());
|
||
metadata.insert(
|
||
"scheduler_job_kind".to_string(),
|
||
"silent_agent_task".to_string(),
|
||
);
|
||
|
||
self.bus
|
||
.publish_outbound(OutboundMessage::error_notification(
|
||
channel,
|
||
chat_id,
|
||
format!(
|
||
"定时任务执行失败:{}\n{}",
|
||
job.id,
|
||
summarize_scheduler_error(error)
|
||
),
|
||
job.target.reply_to.clone(),
|
||
metadata,
|
||
))
|
||
.await
|
||
.map_err(|error| anyhow::anyhow!(error.to_string()))
|
||
}
|
||
}
|
||
|
||
fn preserve_persisted_runtime(input: &mut SchedulerJobUpsert, existing: &SchedulerJobRecord) {
|
||
if !scheduler_job_definition_matches(input, existing) {
|
||
return;
|
||
}
|
||
|
||
input.last_status = existing.last_status.clone();
|
||
input.last_error = existing.last_error.clone();
|
||
input.run_count = existing.run_count;
|
||
input.max_runs = existing.max_runs;
|
||
input.last_fired_at = existing.last_fired_at;
|
||
input.next_fire_at = existing.next_fire_at;
|
||
|
||
if existing.state == SchedulerJobState::Running {
|
||
input.state = SchedulerJobState::Scheduled;
|
||
input.paused_at = None;
|
||
input.completed_at = None;
|
||
return;
|
||
}
|
||
|
||
input.state = existing.state.clone();
|
||
input.paused_at = existing.paused_at;
|
||
input.completed_at = existing.completed_at;
|
||
}
|
||
|
||
fn scheduler_job_definition_matches(
|
||
input: &SchedulerJobUpsert,
|
||
existing: &SchedulerJobRecord,
|
||
) -> bool {
|
||
let input_schedule = serde_json::from_value::<SchedulerSchedule>(input.schedule.clone()).ok();
|
||
let existing_schedule =
|
||
deserialize_schedule(&existing.schedule, existing.interval_secs, existing.startup_delay_secs)
|
||
.ok();
|
||
let input_target = serde_json::from_value::<SchedulerJobTarget>(input.target.clone()).ok();
|
||
let existing_target = serde_json::from_value::<SchedulerJobTarget>(existing.target.clone()).ok();
|
||
let targets_match = match (input_target, existing_target) {
|
||
(Some(input_target), Some(existing_target)) => {
|
||
input_target.channel == existing_target.channel
|
||
&& input_target.chat_id == existing_target.chat_id
|
||
&& input_target.session_chat_id == existing_target.session_chat_id
|
||
&& input_target.reply_to == existing_target.reply_to
|
||
}
|
||
(None, None) => true,
|
||
_ => false,
|
||
};
|
||
|
||
input.kind == existing.kind
|
||
&& input_schedule == existing_schedule
|
||
&& input.interval_secs == existing.interval_secs
|
||
&& input.startup_delay_secs == existing.startup_delay_secs
|
||
&& targets_match
|
||
&& input.payload == existing.payload
|
||
&& input.enabled == existing.enabled
|
||
}
|
||
|
||
#[derive(Debug, Clone)]
|
||
struct RuntimeJob {
|
||
id: String,
|
||
kind: SchedulerJobKind,
|
||
schedule: SchedulerSchedule,
|
||
target: SchedulerJobTarget,
|
||
payload: serde_json::Value,
|
||
enabled: bool,
|
||
state: SchedulerJobState,
|
||
last_status: Option<SchedulerJobStatus>,
|
||
last_error: Option<String>,
|
||
run_count: i64,
|
||
max_runs: Option<i64>,
|
||
last_fired_at: Option<i64>,
|
||
next_fire_at: Option<i64>,
|
||
paused_at: Option<i64>,
|
||
completed_at: Option<i64>,
|
||
interval_secs: i64,
|
||
startup_delay_secs: i64,
|
||
}
|
||
|
||
impl RuntimeJob {
|
||
fn from_config(
|
||
job: &SchedulerJobConfig,
|
||
now: DateTime<Utc>,
|
||
misfire_policy: SchedulerMisfirePolicy,
|
||
timezone: Tz,
|
||
) -> anyhow::Result<Self> {
|
||
let schedule = job.resolved_schedule()?;
|
||
let (interval_secs, startup_delay_secs) = match &schedule {
|
||
SchedulerSchedule::Interval {
|
||
seconds,
|
||
startup_delay_secs,
|
||
} => (*seconds as i64, *startup_delay_secs as i64),
|
||
_ => (0, 0),
|
||
};
|
||
let initial_state = if job.enabled {
|
||
SchedulerJobState::Scheduled
|
||
} else {
|
||
SchedulerJobState::Paused
|
||
};
|
||
let next_fire_at = if job.enabled {
|
||
compute_initial_next_fire_at(&schedule, now, None, misfire_policy, timezone)?
|
||
} else {
|
||
None
|
||
};
|
||
|
||
Ok(Self {
|
||
id: job.id.clone(),
|
||
kind: job.kind.clone(),
|
||
interval_secs,
|
||
startup_delay_secs,
|
||
schedule,
|
||
target: job.target.clone(),
|
||
payload: job.payload.clone(),
|
||
enabled: job.enabled,
|
||
state: initial_state,
|
||
last_status: None,
|
||
last_error: None,
|
||
run_count: 0,
|
||
max_runs: None,
|
||
last_fired_at: None,
|
||
next_fire_at,
|
||
paused_at: None,
|
||
completed_at: None,
|
||
})
|
||
}
|
||
|
||
fn from_record(
|
||
record: &SchedulerJobRecord,
|
||
misfire_policy: SchedulerMisfirePolicy,
|
||
timezone: Tz,
|
||
) -> anyhow::Result<Option<Self>> {
|
||
let kind = match record.kind.as_str() {
|
||
"internal_event" => SchedulerJobKind::InternalEvent,
|
||
"outbound_message" => SchedulerJobKind::OutboundMessage,
|
||
"agent_task" => SchedulerJobKind::AgentTask,
|
||
"silent_agent_task" => SchedulerJobKind::SilentAgentTask,
|
||
other => {
|
||
tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind");
|
||
return Ok(None);
|
||
}
|
||
};
|
||
|
||
let schedule = deserialize_schedule(
|
||
&record.schedule,
|
||
record.interval_secs,
|
||
record.startup_delay_secs,
|
||
)?;
|
||
let now = Utc::now();
|
||
let next_fire_at = match (record.enabled, record.state.clone(), record.next_fire_at) {
|
||
(false, _, _) => None,
|
||
(_, SchedulerJobState::Paused, _) => None,
|
||
(_, SchedulerJobState::Completed, _) => None,
|
||
(_, _, some_next) if some_next.is_some() => some_next,
|
||
_ => compute_initial_next_fire_at(
|
||
&schedule,
|
||
now,
|
||
record.last_fired_at,
|
||
misfire_policy,
|
||
timezone,
|
||
)?,
|
||
};
|
||
|
||
Ok(Some(Self {
|
||
id: record.id.clone(),
|
||
kind,
|
||
schedule,
|
||
target: record.target.clone().try_into()?,
|
||
payload: record.payload.clone(),
|
||
enabled: record.enabled,
|
||
state: record.state.clone(),
|
||
last_status: record.last_status.clone(),
|
||
last_error: record.last_error.clone(),
|
||
run_count: record.run_count,
|
||
max_runs: record.max_runs,
|
||
last_fired_at: record.last_fired_at,
|
||
next_fire_at,
|
||
paused_at: record.paused_at,
|
||
completed_at: record.completed_at,
|
||
interval_secs: record.interval_secs,
|
||
startup_delay_secs: record.startup_delay_secs,
|
||
}))
|
||
}
|
||
|
||
fn is_due(&self, now: DateTime<Utc>) -> bool {
|
||
self.enabled
|
||
&& self.state == SchedulerJobState::Scheduled
|
||
&& self
|
||
.next_fire_at
|
||
.map(|value| value <= now.timestamp_millis())
|
||
.unwrap_or(false)
|
||
}
|
||
|
||
fn after_execution(
|
||
&mut self,
|
||
now: DateTime<Utc>,
|
||
last_error: Option<String>,
|
||
misfire_policy: SchedulerMisfirePolicy,
|
||
timezone: Tz,
|
||
) -> anyhow::Result<()> {
|
||
self.run_count += 1;
|
||
self.last_fired_at = Some(now.timestamp_millis());
|
||
self.last_error = last_error;
|
||
|
||
if self.schedule.is_one_shot() {
|
||
self.state = SchedulerJobState::Completed;
|
||
self.next_fire_at = None;
|
||
self.completed_at = Some(now.timestamp_millis());
|
||
return Ok(());
|
||
}
|
||
|
||
if let Some(max_runs) = self.max_runs {
|
||
if self.run_count >= max_runs {
|
||
self.state = SchedulerJobState::Completed;
|
||
self.next_fire_at = None;
|
||
self.completed_at = Some(now.timestamp_millis());
|
||
return Ok(());
|
||
}
|
||
}
|
||
|
||
let reference_ms = self.next_fire_at.or(self.last_fired_at);
|
||
self.state = SchedulerJobState::Scheduled;
|
||
self.completed_at = None;
|
||
self.next_fire_at =
|
||
compute_next_fire_at(&self.schedule, now, reference_ms, misfire_policy, timezone)?;
|
||
Ok(())
|
||
}
|
||
|
||
fn to_upsert(&self) -> SchedulerJobUpsert {
|
||
SchedulerJobUpsert {
|
||
id: self.id.clone(),
|
||
kind: match self.kind {
|
||
SchedulerJobKind::InternalEvent => "internal_event".to_string(),
|
||
SchedulerJobKind::OutboundMessage => "outbound_message".to_string(),
|
||
SchedulerJobKind::AgentTask => "agent_task".to_string(),
|
||
SchedulerJobKind::SilentAgentTask => "silent_agent_task".to_string(),
|
||
},
|
||
schedule: serde_json::to_value(&self.schedule)
|
||
.unwrap_or_else(|_| serde_json::json!({})),
|
||
interval_secs: self.interval_secs,
|
||
startup_delay_secs: self.startup_delay_secs,
|
||
target: serde_json::to_value(&self.target).unwrap_or_else(|_| serde_json::json!({})),
|
||
payload: self.payload.clone(),
|
||
enabled: self.enabled,
|
||
state: self.state.clone(),
|
||
last_status: self.last_status.clone(),
|
||
last_error: self.last_error.clone(),
|
||
run_count: self.run_count,
|
||
max_runs: self.max_runs,
|
||
last_fired_at: self.last_fired_at,
|
||
next_fire_at: self.next_fire_at,
|
||
paused_at: self.paused_at,
|
||
completed_at: self.completed_at,
|
||
}
|
||
}
|
||
}
|
||
|
||
fn deserialize_schedule(
|
||
schedule_json: &serde_json::Value,
|
||
interval_secs: i64,
|
||
startup_delay_secs: i64,
|
||
) -> anyhow::Result<SchedulerSchedule> {
|
||
if !schedule_json.is_null() && schedule_json != &serde_json::json!({}) {
|
||
return Ok(serde_json::from_value(schedule_json.clone())?);
|
||
}
|
||
|
||
if interval_secs > 0 {
|
||
return Ok(SchedulerSchedule::Interval {
|
||
seconds: interval_secs as u64,
|
||
startup_delay_secs: startup_delay_secs as u64,
|
||
});
|
||
}
|
||
|
||
anyhow::bail!("scheduler job is missing schedule definition")
|
||
}
|
||
|
||
fn compute_initial_next_fire_at(
|
||
schedule: &SchedulerSchedule,
|
||
now: DateTime<Utc>,
|
||
last_fired_at: Option<i64>,
|
||
misfire_policy: SchedulerMisfirePolicy,
|
||
timezone: Tz,
|
||
) -> anyhow::Result<Option<i64>> {
|
||
match last_fired_at {
|
||
Some(last_fired_at) => {
|
||
compute_next_fire_at(schedule, now, Some(last_fired_at), misfire_policy, timezone)
|
||
}
|
||
None => match schedule {
|
||
SchedulerSchedule::Delay { seconds } => Ok(Some(
|
||
(now + ChronoDuration::seconds(*seconds as i64)).timestamp_millis(),
|
||
)),
|
||
SchedulerSchedule::Interval {
|
||
seconds,
|
||
startup_delay_secs,
|
||
} => {
|
||
let delay = if *startup_delay_secs > 0 {
|
||
*startup_delay_secs
|
||
} else {
|
||
*seconds
|
||
};
|
||
Ok(Some(
|
||
(now + ChronoDuration::seconds(delay as i64)).timestamp_millis(),
|
||
))
|
||
}
|
||
SchedulerSchedule::At { timestamp } => {
|
||
Ok(Some(parse_rfc3339_to_utc(timestamp)?.timestamp_millis()))
|
||
}
|
||
SchedulerSchedule::Cron { expression } => {
|
||
let schedule = parse_scheduler_cron(expression)?;
|
||
let local_now = now.with_timezone(&timezone);
|
||
Ok(schedule
|
||
.after(&local_now)
|
||
.next()
|
||
.map(|next| next.with_timezone(&Utc).timestamp_millis()))
|
||
}
|
||
},
|
||
}
|
||
}
|
||
|
||
fn compute_next_fire_at(
|
||
schedule: &SchedulerSchedule,
|
||
now: DateTime<Utc>,
|
||
reference_ms: Option<i64>,
|
||
misfire_policy: SchedulerMisfirePolicy,
|
||
timezone: Tz,
|
||
) -> anyhow::Result<Option<i64>> {
|
||
match schedule {
|
||
SchedulerSchedule::Delay { .. } | SchedulerSchedule::At { .. } => Ok(None),
|
||
SchedulerSchedule::Interval { seconds, .. } => {
|
||
let interval_ms = (*seconds as i64) * 1_000;
|
||
let baseline = reference_ms.unwrap_or_else(|| now.timestamp_millis());
|
||
let next_ms = match misfire_policy {
|
||
SchedulerMisfirePolicy::Skip => now.timestamp_millis() + interval_ms,
|
||
SchedulerMisfirePolicy::CatchUp => {
|
||
let mut candidate = baseline + interval_ms;
|
||
while candidate <= now.timestamp_millis() {
|
||
candidate += interval_ms;
|
||
}
|
||
candidate
|
||
}
|
||
};
|
||
Ok(Some(next_ms))
|
||
}
|
||
SchedulerSchedule::Cron { expression } => {
|
||
let schedule = parse_scheduler_cron(expression)?;
|
||
let anchor = match misfire_policy {
|
||
SchedulerMisfirePolicy::Skip => now.with_timezone(&timezone),
|
||
SchedulerMisfirePolicy::CatchUp => reference_ms
|
||
.and_then(ts_millis_to_utc)
|
||
.map(|value| value.with_timezone(&timezone))
|
||
.unwrap_or_else(|| now.with_timezone(&timezone)),
|
||
};
|
||
Ok(schedule
|
||
.after(&anchor)
|
||
.next()
|
||
.map(|next| next.with_timezone(&Utc).timestamp_millis()))
|
||
}
|
||
}
|
||
}
|
||
|
||
fn parse_rfc3339_to_utc(value: &str) -> anyhow::Result<DateTime<Utc>> {
|
||
Ok(DateTime::parse_from_rfc3339(value)?.with_timezone(&Utc))
|
||
}
|
||
|
||
fn parse_scheduler_cron(expression: &str) -> anyhow::Result<cron::Schedule> {
|
||
let normalized = normalize_cron_expression(expression);
|
||
Ok(cron::Schedule::from_str(&normalized)?)
|
||
}
|
||
|
||
fn normalize_cron_expression(expression: &str) -> String {
|
||
let parts: Vec<&str> = expression.split_whitespace().collect();
|
||
let expression_with_seconds = if parts.len() == 5 {
|
||
format!("0 {}", expression.trim())
|
||
} else {
|
||
expression.trim().to_string()
|
||
};
|
||
|
||
// 转换星期字段为标准 cron 到 cron crate 格式
|
||
// 标准 cron: 0=周日, 1=周一, ..., 6=周六, 7=周日
|
||
// cron crate: 1=周日, 2=周一, ..., 6=周五, 7=周六
|
||
convert_weekday_field(&expression_with_seconds)
|
||
}
|
||
|
||
/// 将标准 cron 的星期字段转换为 cron crate 格式
|
||
/// 标准: 0/7=周日, 1=周一, 2=周二, 3=周三, 4=周四, 5=周五, 6=周六
|
||
/// crate: 1=周日, 2=周一, 3=周二, 4=周三, 5=周四, 6=周五, 7=周六
|
||
fn convert_weekday_field(expression: &str) -> String {
|
||
let parts: Vec<&str> = expression.split_whitespace().collect();
|
||
if parts.len() != 6 {
|
||
return expression.to_string();
|
||
}
|
||
|
||
let weekday_field = parts[5];
|
||
let converted = convert_cron_weekday(weekday_field);
|
||
|
||
format!("{} {} {} {} {} {}", parts[0], parts[1], parts[2], parts[3], parts[4], converted)
|
||
}
|
||
|
||
/// 转换星期表达式中的数字
|
||
fn convert_cron_weekday(field: &str) -> String {
|
||
if field == "*" || field == "?" {
|
||
return field.to_string();
|
||
}
|
||
|
||
// 处理列表(逗号分隔)
|
||
let items: Vec<&str> = field.split(',').collect();
|
||
let converted_items: Vec<String> = items.iter().map(|item| {
|
||
convert_weekday_item(item.trim())
|
||
}).collect();
|
||
|
||
converted_items.join(",")
|
||
}
|
||
|
||
/// 转换单个星期项(可能是范围、步长或单个值)
|
||
fn convert_weekday_item(item: &str) -> String {
|
||
// 检查是否有步长
|
||
if let Some(pos) = item.find('/') {
|
||
let base = &item[..pos];
|
||
let step = &item[pos + 1..];
|
||
let converted_base = convert_weekday_range_or_value(base);
|
||
return format!("{}/{}", converted_base, step);
|
||
}
|
||
|
||
// 检查是否是范围
|
||
if item.contains('-') {
|
||
return convert_weekday_range_or_value(item);
|
||
}
|
||
|
||
// 单个值
|
||
convert_single_weekday(item)
|
||
}
|
||
|
||
/// 转换范围或单个值
|
||
fn convert_weekday_range_or_value(item: &str) -> String {
|
||
let parts: Vec<&str> = item.split('-').collect();
|
||
if parts.len() == 2 {
|
||
let start = convert_single_weekday(parts[0].trim());
|
||
let end = convert_single_weekday(parts[1].trim());
|
||
format!("{}-{}", start, end)
|
||
} else {
|
||
convert_single_weekday(item)
|
||
}
|
||
}
|
||
|
||
/// 转换单个星期数字
|
||
fn convert_single_weekday(day: &str) -> String {
|
||
match day {
|
||
"0" | "7" => "1".to_string(), // 周日 -> 1
|
||
"1" => "2".to_string(), // 周一 -> 2
|
||
"2" => "3".to_string(), // 周二 -> 3
|
||
"3" => "4".to_string(), // 周三 -> 4
|
||
"4" => "5".to_string(), // 周四 -> 5
|
||
"5" => "6".to_string(), // 周五 -> 6
|
||
"6" => "7".to_string(), // 周六 -> 7
|
||
_ => day.to_string(), // 其他(如字母)保持不变
|
||
}
|
||
}
|
||
|
||
fn ts_millis_to_utc(value: i64) -> Option<DateTime<Utc>> {
|
||
Utc.timestamp_millis_opt(value).single()
|
||
}
|
||
|
||
fn build_outbound_message(job: &RuntimeJob) -> anyhow::Result<OutboundMessage> {
|
||
let channel = job
|
||
.target
|
||
.channel
|
||
.clone()
|
||
.ok_or_else(|| anyhow::anyhow!("outbound scheduler job requires target.channel"))?;
|
||
let chat_id = job
|
||
.target
|
||
.chat_id
|
||
.clone()
|
||
.ok_or_else(|| anyhow::anyhow!("outbound scheduler job requires target.chat_id"))?;
|
||
let content = job
|
||
.payload
|
||
.get("content")
|
||
.and_then(|value| value.as_str())
|
||
.ok_or_else(|| {
|
||
anyhow::anyhow!("outbound scheduler job payload.content must be a string")
|
||
})?;
|
||
|
||
let mut metadata = HashMap::new();
|
||
metadata.insert("scheduler_job_id".to_string(), job.id.clone());
|
||
|
||
Ok(OutboundMessage::scheduler_notification(
|
||
channel,
|
||
chat_id,
|
||
content.to_string(),
|
||
job.target.reply_to.clone(),
|
||
metadata,
|
||
))
|
||
}
|
||
|
||
async fn execute_internal_event(
|
||
maintenance_executor: &dyn MaintenanceExecutor,
|
||
job: &RuntimeJob,
|
||
) -> anyhow::Result<()> {
|
||
let event = job
|
||
.payload
|
||
.get("event")
|
||
.and_then(|value| value.as_str())
|
||
.unwrap_or("session_cleanup");
|
||
|
||
match event {
|
||
"session_cleanup" => {
|
||
let removed = maintenance_executor.cleanup_expired_sessions().await;
|
||
tracing::info!(job_id = %job.id, removed, "Scheduler session cleanup completed");
|
||
Ok(())
|
||
}
|
||
"memory_maintenance" => {
|
||
let results = maintenance_executor.run_memory_maintenance_for_all_scopes().await?;
|
||
for result in &results {
|
||
tracing::info!(
|
||
job_id = %job.id,
|
||
scope_key = %result.scope_key,
|
||
merges = result.merges,
|
||
conflicts = result.conflicts,
|
||
low_value = result.low_value,
|
||
"Scheduler completed memory maintenance model run"
|
||
);
|
||
}
|
||
tracing::info!(job_id = %job.id, scope_count = results.len(), "Scheduler memory maintenance triggered");
|
||
Ok(())
|
||
}
|
||
other => anyhow::bail!("unsupported internal scheduler event: {}", other),
|
||
}
|
||
}
|
||
|
||
async fn execute_agent_task(
|
||
agent_task_executor: &dyn AgentTaskExecutor,
|
||
job: &RuntimeJob,
|
||
execution_chat_id: &str,
|
||
) -> anyhow::Result<Vec<OutboundMessage>> {
|
||
let channel_name = job
|
||
.target
|
||
.channel
|
||
.as_deref()
|
||
.ok_or_else(|| anyhow::anyhow!("scheduled agent task requires target.channel"))?;
|
||
let prompt = job
|
||
.payload
|
||
.get("prompt")
|
||
.and_then(|value| value.as_str())
|
||
.ok_or_else(|| anyhow::anyhow!("agent_task payload.prompt must be a string"))?;
|
||
let options = parse_scheduled_agent_task_options(job)?;
|
||
|
||
agent_task_executor
|
||
.execute(channel_name, execution_chat_id, prompt, options)
|
||
.await
|
||
}
|
||
|
||
fn required_notification_chat_id<'a>(
|
||
job: &'a RuntimeJob,
|
||
kind_name: &str,
|
||
) -> anyhow::Result<&'a str> {
|
||
job.target
|
||
.chat_id
|
||
.as_deref()
|
||
.ok_or_else(|| anyhow::anyhow!("{} requires target.chat_id", kind_name))
|
||
}
|
||
|
||
fn resolve_execution_chat_id(job: &RuntimeJob) -> anyhow::Result<String> {
|
||
match job.kind {
|
||
SchedulerJobKind::AgentTask => {
|
||
Ok(required_notification_chat_id(job, "agent_task")?.to_string())
|
||
}
|
||
SchedulerJobKind::SilentAgentTask => Ok(job
|
||
.target
|
||
.session_chat_id
|
||
.clone()
|
||
.unwrap_or_else(|| derive_silent_session_chat_id(&job.id))),
|
||
_ => anyhow::bail!("execution chat id is only supported for agent task kinds"),
|
||
}
|
||
}
|
||
|
||
fn derive_silent_session_chat_id(job_id: &str) -> String {
|
||
format!("scheduler/{}", job_id)
|
||
}
|
||
|
||
fn summarize_scheduler_error(error: &anyhow::Error) -> String {
|
||
let text = error.to_string().replace('\n', " ");
|
||
const MAX_LEN: usize = 240;
|
||
if text.chars().count() <= MAX_LEN {
|
||
text
|
||
} else {
|
||
let summary = text.chars().take(MAX_LEN).collect::<String>();
|
||
format!("{}...", summary)
|
||
}
|
||
}
|
||
|
||
fn parse_scheduled_agent_task_options(
|
||
job: &RuntimeJob,
|
||
) -> anyhow::Result<ScheduledAgentTaskOptions> {
|
||
let sender_id = job
|
||
.payload
|
||
.get("sender_id")
|
||
.and_then(|value| value.as_str())
|
||
.map(ToString::to_string);
|
||
let fresh_session = job
|
||
.payload
|
||
.get("fresh_session")
|
||
.and_then(|value| value.as_bool())
|
||
.unwrap_or(false);
|
||
let system_prompt = job
|
||
.payload
|
||
.get("system_prompt")
|
||
.and_then(|value| value.as_str())
|
||
.map(ToString::to_string);
|
||
let agent = job
|
||
.payload
|
||
.get("agent")
|
||
.and_then(|value| value.as_str())
|
||
.map(ToString::to_string);
|
||
let metadata = parse_metadata_map(job.payload.get("metadata"))?;
|
||
|
||
Ok(ScheduledAgentTaskOptions {
|
||
sender_id,
|
||
fresh_session,
|
||
system_prompt,
|
||
metadata,
|
||
agent,
|
||
})
|
||
}
|
||
|
||
fn parse_metadata_map(
|
||
value: Option<&serde_json::Value>,
|
||
) -> anyhow::Result<HashMap<String, String>> {
|
||
let Some(value) = value else {
|
||
return Ok(HashMap::new());
|
||
};
|
||
|
||
let object = value
|
||
.as_object()
|
||
.ok_or_else(|| anyhow::anyhow!("agent_task payload.metadata must be an object"))?;
|
||
let mut metadata = HashMap::with_capacity(object.len());
|
||
|
||
for (key, value) in object {
|
||
let stringified = match value {
|
||
serde_json::Value::String(inner) => inner.clone(),
|
||
serde_json::Value::Null => "null".to_string(),
|
||
serde_json::Value::Bool(inner) => inner.to_string(),
|
||
serde_json::Value::Number(inner) => inner.to_string(),
|
||
_ => {
|
||
return Err(anyhow::anyhow!(
|
||
"agent_task payload.metadata field '{}' must be a string, number, bool, or null",
|
||
key
|
||
));
|
||
}
|
||
};
|
||
metadata.insert(key.clone(), stringified);
|
||
}
|
||
|
||
Ok(metadata)
|
||
}
|
||
|
||
#[cfg(test)]
|
||
mod agent_task_tests {
|
||
use super::*;
|
||
|
||
#[test]
|
||
fn runtime_job_from_record_supports_agent_task_kind() {
|
||
let record = SchedulerJobRecord {
|
||
id: "agent.daily_summary".to_string(),
|
||
kind: "agent_task".to_string(),
|
||
schedule: serde_json::json!({
|
||
"type": "interval",
|
||
"seconds": 300
|
||
}),
|
||
interval_secs: 0,
|
||
startup_delay_secs: 0,
|
||
target: serde_json::json!({
|
||
"channel": "test-channel",
|
||
"chat_id": "oc_demo"
|
||
}),
|
||
payload: serde_json::json!({
|
||
"prompt": "请总结今天待办"
|
||
}),
|
||
enabled: true,
|
||
state: SchedulerJobState::Scheduled,
|
||
last_status: None,
|
||
last_error: None,
|
||
run_count: 0,
|
||
max_runs: None,
|
||
last_fired_at: None,
|
||
next_fire_at: Some(1_700_000_010_000),
|
||
paused_at: None,
|
||
completed_at: None,
|
||
created_at: 1_700_000_000_000,
|
||
updated_at: 1_700_000_000_000,
|
||
};
|
||
|
||
let job = RuntimeJob::from_record(
|
||
&record,
|
||
SchedulerMisfirePolicy::Skip,
|
||
chrono_tz::Asia::Shanghai,
|
||
)
|
||
.unwrap()
|
||
.unwrap();
|
||
|
||
assert_eq!(job.kind, SchedulerJobKind::AgentTask);
|
||
assert_eq!(
|
||
job.payload.get("prompt").and_then(|value| value.as_str()),
|
||
Some("请总结今天待办")
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn runtime_job_from_record_supports_silent_agent_task_kind() {
|
||
let record = SchedulerJobRecord {
|
||
id: "agent.daily_summary.background".to_string(),
|
||
kind: "silent_agent_task".to_string(),
|
||
schedule: serde_json::json!({
|
||
"type": "interval",
|
||
"seconds": 300
|
||
}),
|
||
interval_secs: 0,
|
||
startup_delay_secs: 0,
|
||
target: serde_json::json!({
|
||
"channel": "test-channel",
|
||
"chat_id": "oc_demo",
|
||
"session_chat_id": "scheduler/agent.daily_summary.background"
|
||
}),
|
||
payload: serde_json::json!({
|
||
"prompt": "请后台整理今天待办"
|
||
}),
|
||
enabled: true,
|
||
state: SchedulerJobState::Scheduled,
|
||
last_status: None,
|
||
last_error: None,
|
||
run_count: 0,
|
||
max_runs: None,
|
||
last_fired_at: None,
|
||
next_fire_at: Some(1_700_000_010_000),
|
||
paused_at: None,
|
||
completed_at: None,
|
||
created_at: 1_700_000_000_000,
|
||
updated_at: 1_700_000_000_000,
|
||
};
|
||
|
||
let job = RuntimeJob::from_record(
|
||
&record,
|
||
SchedulerMisfirePolicy::Skip,
|
||
chrono_tz::Asia::Shanghai,
|
||
)
|
||
.unwrap()
|
||
.unwrap();
|
||
|
||
assert_eq!(job.kind, SchedulerJobKind::SilentAgentTask);
|
||
assert_eq!(
|
||
job.target.session_chat_id.as_deref(),
|
||
Some("scheduler/agent.daily_summary.background")
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn parse_scheduled_agent_task_options_supports_fresh_session_and_metadata() {
|
||
let job = RuntimeJob {
|
||
id: "agent.daily_summary".to_string(),
|
||
kind: SchedulerJobKind::AgentTask,
|
||
schedule: SchedulerSchedule::Interval {
|
||
seconds: 300,
|
||
startup_delay_secs: 0,
|
||
},
|
||
target: SchedulerJobTarget {
|
||
channel: Some("test-channel".to_string()),
|
||
chat_id: Some("oc_demo".to_string()),
|
||
session_chat_id: None,
|
||
reply_to: None,
|
||
},
|
||
payload: serde_json::json!({
|
||
"prompt": "请总结今天待办",
|
||
"agent": "planner",
|
||
"sender_id": "scheduler-bot",
|
||
"fresh_session": true,
|
||
"system_prompt": "你是日报助手",
|
||
"metadata": {
|
||
"job_type": "daily_summary",
|
||
"priority": 1,
|
||
"urgent": false
|
||
}
|
||
}),
|
||
enabled: true,
|
||
state: SchedulerJobState::Scheduled,
|
||
last_status: None,
|
||
last_error: None,
|
||
run_count: 0,
|
||
max_runs: None,
|
||
last_fired_at: None,
|
||
next_fire_at: None,
|
||
paused_at: None,
|
||
completed_at: None,
|
||
interval_secs: 300,
|
||
startup_delay_secs: 0,
|
||
};
|
||
|
||
let options = parse_scheduled_agent_task_options(&job).unwrap();
|
||
assert_eq!(options.agent.as_deref(), Some("planner"));
|
||
assert_eq!(options.sender_id.as_deref(), Some("scheduler-bot"));
|
||
assert!(options.fresh_session);
|
||
assert_eq!(options.system_prompt.as_deref(), Some("你是日报助手"));
|
||
assert_eq!(
|
||
options.metadata.get("job_type").map(String::as_str),
|
||
Some("daily_summary")
|
||
);
|
||
assert_eq!(
|
||
options.metadata.get("priority").map(String::as_str),
|
||
Some("1")
|
||
);
|
||
assert_eq!(
|
||
options.metadata.get("urgent").map(String::as_str),
|
||
Some("false")
|
||
);
|
||
}
|
||
|
||
#[test]
|
||
fn resolve_execution_chat_id_uses_dedicated_session_for_silent_agent_tasks() {
|
||
let job = RuntimeJob {
|
||
id: "agent.daily_summary.background".to_string(),
|
||
kind: SchedulerJobKind::SilentAgentTask,
|
||
schedule: SchedulerSchedule::Interval {
|
||
seconds: 300,
|
||
startup_delay_secs: 0,
|
||
},
|
||
target: SchedulerJobTarget {
|
||
channel: Some("test-channel".to_string()),
|
||
chat_id: Some("oc_demo".to_string()),
|
||
session_chat_id: None,
|
||
reply_to: None,
|
||
},
|
||
payload: serde_json::json!({
|
||
"prompt": "请后台整理今天待办"
|
||
}),
|
||
enabled: true,
|
||
state: SchedulerJobState::Scheduled,
|
||
last_status: None,
|
||
last_error: None,
|
||
run_count: 0,
|
||
max_runs: None,
|
||
last_fired_at: None,
|
||
next_fire_at: None,
|
||
paused_at: None,
|
||
completed_at: None,
|
||
interval_secs: 300,
|
||
startup_delay_secs: 0,
|
||
};
|
||
|
||
assert_eq!(
|
||
resolve_execution_chat_id(&job).unwrap(),
|
||
"scheduler/agent.daily_summary.background"
|
||
);
|
||
}
|
||
}
|
||
|
||
impl TryFrom<serde_json::Value> for SchedulerJobTarget {
|
||
type Error = anyhow::Error;
|
||
|
||
fn try_from(value: serde_json::Value) -> Result<Self, Self::Error> {
|
||
Ok(serde_json::from_value(value)?)
|
||
}
|
||
}
|
||
|
||
#[cfg(test)]
|
||
mod tests {
|
||
use super::*;
|
||
use chrono::{Datelike, Timelike};
|
||
use crate::bus::MessageBus;
|
||
use crate::config::BUILTIN_MEMORY_MAINTENANCE_JOB_ID;
|
||
use crate::storage::{SchedulerJobUpsert, SessionStore};
|
||
|
||
#[derive(Clone)]
|
||
struct TestAgentTaskExecutor;
|
||
|
||
#[async_trait::async_trait]
|
||
impl AgentTaskExecutor for TestAgentTaskExecutor {
|
||
async fn execute(
|
||
&self,
|
||
_channel_name: &str,
|
||
_chat_id: &str,
|
||
_prompt: &str,
|
||
_options: ScheduledAgentTaskOptions,
|
||
) -> anyhow::Result<Vec<OutboundMessage>> {
|
||
Ok(Vec::new())
|
||
}
|
||
}
|
||
|
||
#[derive(Clone)]
|
||
struct TestMaintenanceExecutor;
|
||
|
||
#[async_trait::async_trait]
|
||
impl MaintenanceExecutor for TestMaintenanceExecutor {
|
||
async fn cleanup_expired_sessions(&self) -> usize {
|
||
0
|
||
}
|
||
|
||
async fn run_memory_maintenance_for_all_scopes(&self) -> anyhow::Result<Vec<MaintenanceRunSummary>> {
|
||
Ok(Vec::new())
|
||
}
|
||
}
|
||
|
||
fn test_scheduler_services() -> (TestAgentTaskExecutor, TestMaintenanceExecutor) {
|
||
(TestAgentTaskExecutor, TestMaintenanceExecutor)
|
||
}
|
||
|
||
#[test]
|
||
fn runtime_job_skip_policy_advances_from_now() {
|
||
let now = Utc
|
||
.timestamp_millis_opt(1_700_000_000_000)
|
||
.single()
|
||
.unwrap();
|
||
let next = compute_next_fire_at(
|
||
&SchedulerSchedule::Interval {
|
||
seconds: 60,
|
||
startup_delay_secs: 0,
|
||
},
|
||
now,
|
||
Some(now.timestamp_millis() - 10 * 60 * 1_000),
|
||
SchedulerMisfirePolicy::Skip,
|
||
chrono_tz::Asia::Shanghai,
|
||
)
|
||
.unwrap()
|
||
.unwrap();
|
||
|
||
assert_eq!(next, now.timestamp_millis() + 60_000);
|
||
}
|
||
|
||
#[test]
|
||
fn runtime_job_catch_up_policy_moves_past_now() {
|
||
let now = Utc
|
||
.timestamp_millis_opt(1_700_000_000_000)
|
||
.single()
|
||
.unwrap();
|
||
let next = compute_next_fire_at(
|
||
&SchedulerSchedule::Interval {
|
||
seconds: 60,
|
||
startup_delay_secs: 0,
|
||
},
|
||
now,
|
||
Some(now.timestamp_millis() - 10 * 60 * 1_000),
|
||
SchedulerMisfirePolicy::CatchUp,
|
||
chrono_tz::Asia::Shanghai,
|
||
)
|
||
.unwrap()
|
||
.unwrap();
|
||
|
||
assert!(next > now.timestamp_millis());
|
||
assert_eq!((next - now.timestamp_millis()) % 60_000, 0);
|
||
}
|
||
|
||
#[test]
|
||
fn runtime_job_from_record_uses_persisted_schedule() {
|
||
let record = SchedulerJobRecord {
|
||
id: "heartbeat".to_string(),
|
||
kind: "outbound_message".to_string(),
|
||
schedule: serde_json::json!({
|
||
"type": "interval",
|
||
"seconds": 120,
|
||
"startup_delay_secs": 10
|
||
}),
|
||
interval_secs: 0,
|
||
startup_delay_secs: 0,
|
||
target: serde_json::json!({
|
||
"channel": "test-channel",
|
||
"chat_id": "oc_demo"
|
||
}),
|
||
payload: serde_json::json!({"content": "hello"}),
|
||
enabled: true,
|
||
state: SchedulerJobState::Scheduled,
|
||
last_status: None,
|
||
last_error: None,
|
||
run_count: 0,
|
||
max_runs: None,
|
||
last_fired_at: None,
|
||
next_fire_at: Some(1_700_000_010_000),
|
||
paused_at: None,
|
||
completed_at: None,
|
||
created_at: 1_700_000_000_000,
|
||
updated_at: 1_700_000_000_000,
|
||
};
|
||
|
||
let job = RuntimeJob::from_record(
|
||
&record,
|
||
SchedulerMisfirePolicy::Skip,
|
||
chrono_tz::Asia::Shanghai,
|
||
)
|
||
.unwrap()
|
||
.unwrap();
|
||
|
||
assert_eq!(
|
||
job.schedule,
|
||
SchedulerSchedule::Interval {
|
||
seconds: 120,
|
||
startup_delay_secs: 10,
|
||
}
|
||
);
|
||
assert_eq!(job.next_fire_at, Some(1_700_000_010_000));
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn process_tick_persists_initial_next_fire_at_for_db_created_jobs() {
|
||
let store = Arc::new(SessionStore::in_memory().unwrap());
|
||
store
|
||
.upsert_scheduler_job(&SchedulerJobUpsert {
|
||
id: "massage_reminder".to_string(),
|
||
kind: "outbound_message".to_string(),
|
||
schedule: serde_json::json!({
|
||
"type": "interval",
|
||
"seconds": 60
|
||
}),
|
||
interval_secs: 60,
|
||
startup_delay_secs: 0,
|
||
target: serde_json::json!({
|
||
"channel": "test-channel",
|
||
"chat_id": "oc_demo"
|
||
}),
|
||
payload: serde_json::json!({
|
||
"content": "ping"
|
||
}),
|
||
enabled: true,
|
||
state: SchedulerJobState::Scheduled,
|
||
last_status: None,
|
||
last_error: None,
|
||
run_count: 0,
|
||
max_runs: Some(1),
|
||
last_fired_at: None,
|
||
next_fire_at: None,
|
||
paused_at: None,
|
||
completed_at: None,
|
||
})
|
||
.unwrap();
|
||
|
||
let (agent_task_executor, maintenance_service) = test_scheduler_services();
|
||
let scheduler = Scheduler::new(
|
||
MessageBus::new(8),
|
||
SchedulerConfig {
|
||
enabled: true,
|
||
tick_resolution_ms: 1000,
|
||
worker_queue_capacity: 64,
|
||
misfire_policy: SchedulerMisfirePolicy::Skip,
|
||
jobs: Vec::new(),
|
||
},
|
||
chrono_tz::Asia::Shanghai,
|
||
store.clone(),
|
||
agent_task_executor,
|
||
maintenance_service,
|
||
);
|
||
|
||
scheduler.process_tick().await.unwrap();
|
||
|
||
let saved = store
|
||
.get_scheduler_job("massage_reminder")
|
||
.unwrap()
|
||
.unwrap();
|
||
assert!(saved.next_fire_at.is_some());
|
||
assert_eq!(saved.run_count, 0);
|
||
assert_eq!(saved.state, SchedulerJobState::Scheduled);
|
||
}
|
||
|
||
#[test]
|
||
fn sync_config_jobs_persists_builtin_memory_maintenance_job() {
|
||
let store = Arc::new(SessionStore::in_memory().unwrap());
|
||
|
||
let (agent_task_executor, maintenance_service) = test_scheduler_services();
|
||
let scheduler = Scheduler::new(
|
||
MessageBus::new(8),
|
||
SchedulerConfig::default(),
|
||
chrono_tz::Asia::Shanghai,
|
||
store.clone(),
|
||
agent_task_executor,
|
||
maintenance_service,
|
||
);
|
||
|
||
scheduler.sync_config_jobs().unwrap();
|
||
|
||
let saved = store
|
||
.get_scheduler_job(BUILTIN_MEMORY_MAINTENANCE_JOB_ID)
|
||
.unwrap()
|
||
.unwrap();
|
||
|
||
assert_eq!(saved.id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID);
|
||
assert_eq!(saved.kind, "internal_event");
|
||
assert!(saved.enabled);
|
||
assert_eq!(saved.state, SchedulerJobState::Scheduled);
|
||
assert_eq!(
|
||
saved.payload.get("event").and_then(|value| value.as_str()),
|
||
Some("memory_maintenance")
|
||
);
|
||
assert_eq!(
|
||
saved.schedule,
|
||
serde_json::json!({
|
||
"type": "cron",
|
||
"expression": "0 */4 * * *"
|
||
})
|
||
);
|
||
assert_eq!(
|
||
saved
|
||
.payload
|
||
.get("local_time")
|
||
.and_then(|value| value.as_str()),
|
||
Some("every_4_hours")
|
||
);
|
||
assert!(saved.next_fire_at.is_some());
|
||
}
|
||
|
||
#[test]
|
||
fn sync_config_jobs_preserves_persisted_next_fire_at_for_matching_jobs() {
|
||
let store = Arc::new(SessionStore::in_memory().unwrap());
|
||
let persisted_next_fire_at = 1_700_000_300_000;
|
||
let config_job = SchedulerJobConfig {
|
||
id: "agent.heartbeat".to_string(),
|
||
enabled: true,
|
||
kind: SchedulerJobKind::OutboundMessage,
|
||
schedule: Some(SchedulerSchedule::Interval {
|
||
seconds: 300,
|
||
startup_delay_secs: 0,
|
||
}),
|
||
startup_delay_secs: 0,
|
||
interval_secs: 0,
|
||
target: SchedulerJobTarget {
|
||
channel: Some("test-channel".to_string()),
|
||
chat_id: Some("oc_demo".to_string()),
|
||
session_chat_id: None,
|
||
reply_to: None,
|
||
},
|
||
payload: serde_json::json!({
|
||
"content": "ping"
|
||
}),
|
||
};
|
||
|
||
store
|
||
.upsert_scheduler_job(&SchedulerJobUpsert {
|
||
id: "agent.heartbeat".to_string(),
|
||
kind: "outbound_message".to_string(),
|
||
schedule: serde_json::json!({
|
||
"type": "interval",
|
||
"seconds": 300,
|
||
"startup_delay_secs": 0
|
||
}),
|
||
interval_secs: 300,
|
||
startup_delay_secs: 0,
|
||
target: serde_json::json!({
|
||
"channel": "test-channel",
|
||
"chat_id": "oc_demo"
|
||
}),
|
||
payload: serde_json::json!({
|
||
"content": "ping"
|
||
}),
|
||
enabled: true,
|
||
state: SchedulerJobState::Scheduled,
|
||
last_status: Some(SchedulerJobStatus::Ok),
|
||
last_error: None,
|
||
run_count: 3,
|
||
max_runs: None,
|
||
last_fired_at: Some(1_700_000_000_000),
|
||
next_fire_at: Some(persisted_next_fire_at),
|
||
paused_at: None,
|
||
completed_at: None,
|
||
})
|
||
.unwrap();
|
||
|
||
let probe_runtime = RuntimeJob::from_config(
|
||
&config_job,
|
||
Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap(),
|
||
SchedulerMisfirePolicy::Skip,
|
||
chrono_tz::Asia::Shanghai,
|
||
)
|
||
.unwrap();
|
||
let probe_existing = store
|
||
.get_scheduler_job("agent.heartbeat")
|
||
.unwrap()
|
||
.unwrap();
|
||
let probe_upsert = probe_runtime.to_upsert();
|
||
assert!(scheduler_job_definition_matches(&probe_upsert, &probe_existing));
|
||
|
||
let (agent_task_executor, maintenance_service) = test_scheduler_services();
|
||
let scheduler = Scheduler::new(
|
||
MessageBus::new(8),
|
||
SchedulerConfig {
|
||
enabled: true,
|
||
tick_resolution_ms: 1000,
|
||
worker_queue_capacity: 64,
|
||
misfire_policy: SchedulerMisfirePolicy::Skip,
|
||
jobs: vec![config_job],
|
||
},
|
||
chrono_tz::Asia::Shanghai,
|
||
store.clone(),
|
||
agent_task_executor,
|
||
maintenance_service,
|
||
);
|
||
|
||
scheduler.sync_config_jobs().unwrap();
|
||
|
||
let saved = store
|
||
.get_scheduler_job("agent.heartbeat")
|
||
.unwrap()
|
||
.unwrap();
|
||
|
||
assert_eq!(saved.next_fire_at, Some(persisted_next_fire_at));
|
||
assert_eq!(saved.run_count, 3);
|
||
assert_eq!(saved.last_status, Some(SchedulerJobStatus::Ok));
|
||
}
|
||
|
||
#[tokio::test]
|
||
async fn silent_agent_task_failure_notifies_primary_chat() {
|
||
let store = Arc::new(SessionStore::in_memory().unwrap());
|
||
let bus = MessageBus::new(8);
|
||
let (agent_task_executor, maintenance_service) = test_scheduler_services();
|
||
let scheduler = Scheduler::new(
|
||
bus.clone(),
|
||
SchedulerConfig {
|
||
enabled: true,
|
||
tick_resolution_ms: 1000,
|
||
worker_queue_capacity: 64,
|
||
misfire_policy: SchedulerMisfirePolicy::Skip,
|
||
jobs: Vec::new(),
|
||
},
|
||
chrono_tz::Asia::Shanghai,
|
||
store,
|
||
agent_task_executor,
|
||
maintenance_service,
|
||
);
|
||
|
||
let job = RuntimeJob {
|
||
id: "agent.daily_summary.background".to_string(),
|
||
kind: SchedulerJobKind::SilentAgentTask,
|
||
schedule: SchedulerSchedule::Interval {
|
||
seconds: 300,
|
||
startup_delay_secs: 0,
|
||
},
|
||
target: SchedulerJobTarget {
|
||
channel: Some("test-channel".to_string()),
|
||
chat_id: Some("oc_demo".to_string()),
|
||
session_chat_id: Some("scheduler/agent.daily_summary.background".to_string()),
|
||
reply_to: None,
|
||
},
|
||
payload: serde_json::json!({}),
|
||
enabled: true,
|
||
state: SchedulerJobState::Scheduled,
|
||
last_status: None,
|
||
last_error: None,
|
||
run_count: 0,
|
||
max_runs: None,
|
||
last_fired_at: None,
|
||
next_fire_at: None,
|
||
paused_at: None,
|
||
completed_at: None,
|
||
interval_secs: 300,
|
||
startup_delay_secs: 0,
|
||
};
|
||
|
||
let error = scheduler.execute_job(&job).await.unwrap_err();
|
||
assert!(error.to_string().contains("payload.prompt"));
|
||
|
||
let outbound = tokio::time::timeout(
|
||
std::time::Duration::from_millis(100),
|
||
bus.consume_outbound(),
|
||
)
|
||
.await
|
||
.unwrap();
|
||
assert_eq!(outbound.channel, "test-channel");
|
||
assert_eq!(outbound.chat_id, "oc_demo");
|
||
assert!(outbound.content.contains("定时任务执行失败"));
|
||
assert!(outbound.content.contains("agent.daily_summary.background"));
|
||
}
|
||
|
||
#[test]
|
||
fn cron_schedule_uses_configured_timezone() {
|
||
let now = Utc
|
||
.with_ymd_and_hms(2026, 4, 23, 18, 0, 0)
|
||
.single()
|
||
.unwrap();
|
||
let next = compute_next_fire_at(
|
||
&SchedulerSchedule::Cron {
|
||
expression: "0 3 * * *".to_string(),
|
||
},
|
||
now,
|
||
None,
|
||
SchedulerMisfirePolicy::Skip,
|
||
chrono_tz::Asia::Shanghai,
|
||
)
|
||
.unwrap()
|
||
.unwrap();
|
||
|
||
let next_utc = ts_millis_to_utc(next).unwrap();
|
||
assert_eq!(
|
||
next_utc,
|
||
Utc.with_ymd_and_hms(2026, 4, 23, 19, 0, 0)
|
||
.single()
|
||
.unwrap()
|
||
);
|
||
}
|
||
|
||
|
||
#[test]
|
||
fn debug_cron_weekday_definitions() {
|
||
// 重大发现:cron crate 的星期定义是反常规的!
|
||
// 1 = 周日, 2 = 周一, ..., 6 = 周五, 7 = 周六
|
||
// 0 是无效的!
|
||
let test_cases = vec![
|
||
("0 9 * * 1", "1=周日"),
|
||
("0 9 * * 2", "2=周一"),
|
||
("0 9 * * 3", "3=周二"),
|
||
("0 9 * * 4", "4=周三"),
|
||
("0 9 * * 5", "5=周四"),
|
||
("0 9 * * 6", "6=周五"),
|
||
("0 9 * * 7", "7=周六"),
|
||
("0 9 * * 1-5", "1-5=周日到周四"),
|
||
("0 9 * * 2-6", "2-6=周一到周五(这才是正确的工作日)"),
|
||
];
|
||
|
||
// 从周六(2026-04-25)开始测试
|
||
let saturday = Utc.with_ymd_and_hms(2026, 4, 25, 10, 0, 0).single().unwrap();
|
||
let shanghai_saturday = saturday.with_timezone(&chrono_tz::Asia::Shanghai);
|
||
println!("\n=== 从周六 {} ({:?}) 开始测试 ===", shanghai_saturday, shanghai_saturday.weekday());
|
||
|
||
for (expr, desc) in &test_cases {
|
||
let schedule = parse_scheduler_cron(expr).unwrap();
|
||
let next = schedule.after(&shanghai_saturday).next().unwrap();
|
||
println!("{}: 下次执行 {} (星期: {:?})", desc, next, next.weekday());
|
||
}
|
||
|
||
// 验证正确的工作日表达式
|
||
println!("\n=== 验证正确的工作日表达式 1-5(标准 cron)===");
|
||
let schedule_workday = parse_scheduler_cron("0 9 * * 1-5").unwrap();
|
||
|
||
let sat_next = schedule_workday.after(&shanghai_saturday).next().unwrap();
|
||
println!("周六 -> 1-5 下次执行: {} (星期: {:?})", sat_next, sat_next.weekday());
|
||
assert_eq!(sat_next.weekday(), chrono::Weekday::Mon, "1-5 应该从周六跳到周一");
|
||
|
||
// 从周日开始
|
||
let sunday = Utc.with_ymd_and_hms(2026, 4, 26, 10, 0, 0).single().unwrap();
|
||
let shanghai_sunday = sunday.with_timezone(&chrono_tz::Asia::Shanghai);
|
||
let sun_next = schedule_workday.after(&shanghai_sunday).next().unwrap();
|
||
println!("周日 -> 1-5 下次执行: {} (星期: {:?})", sun_next, sun_next.weekday());
|
||
assert_eq!(sun_next.weekday(), chrono::Weekday::Mon, "1-5 应该从周日跳到周一");
|
||
|
||
// 从周一早上7点开始
|
||
let shanghai_monday = chrono_tz::Asia::Shanghai.with_ymd_and_hms(2026, 4, 27, 7, 0, 0).single().unwrap();
|
||
println!("周一早上7点 -> 1-5 下次执行: {} (星期: {:?})",
|
||
schedule_workday.after(&shanghai_monday).next().unwrap(),
|
||
schedule_workday.after(&shanghai_monday).next().unwrap().weekday());
|
||
}
|
||
|
||
/// 测试标准 cron 星期转换功能
|
||
/// 现在可以使用标准 cron 语法:
|
||
/// - 0 或 7 = 周日
|
||
/// - 1 = 周一
|
||
/// - ...
|
||
/// - 6 = 周六
|
||
/// 工作日(周一到周五)应该使用 1-5
|
||
#[test]
|
||
fn standard_cron_weekday_conversion() {
|
||
// 测试:标准 cron 的 1-5 应该表示周一到周五
|
||
let saturday = Utc.with_ymd_and_hms(2026, 4, 25, 10, 0, 0).single().unwrap();
|
||
let shanghai_saturday = saturday.with_timezone(&chrono_tz::Asia::Shanghai);
|
||
|
||
// 现在使用标准 cron:1-5 表示周一到周五
|
||
let schedule_std = parse_scheduler_cron("0 9 * * 1-5").unwrap();
|
||
|
||
let sat_next = schedule_std.after(&shanghai_saturday).next().unwrap();
|
||
println!("周六 -> 标准 cron 1-5 下次执行: {} (星期: {:?})", sat_next, sat_next.weekday());
|
||
assert_eq!(sat_next.weekday(), chrono::Weekday::Mon, "标准 cron 1-5 应该从周六跳到周一");
|
||
|
||
// 从周日开始
|
||
let sunday = Utc.with_ymd_and_hms(2026, 4, 26, 10, 0, 0).single().unwrap();
|
||
let shanghai_sunday = sunday.with_timezone(&chrono_tz::Asia::Shanghai);
|
||
let sun_next = schedule_std.after(&shanghai_sunday).next().unwrap();
|
||
println!("周日 -> 标准 cron 1-5 下次执行: {} (星期: {:?})", sun_next, sun_next.weekday());
|
||
assert_eq!(sun_next.weekday(), chrono::Weekday::Mon, "标准 cron 1-5 应该从周日跳到周一");
|
||
|
||
// 从周一开始(上海时间周一早上7点)
|
||
let shanghai_monday = chrono_tz::Asia::Shanghai.with_ymd_and_hms(2026, 4, 27, 7, 0, 0).single().unwrap();
|
||
let mon_next = schedule_std.after(&shanghai_monday).next().unwrap();
|
||
println!("周一早上 -> 标准 cron 1-5 下次执行: {} (星期: {:?})", mon_next, mon_next.weekday());
|
||
assert_eq!(mon_next.weekday(), chrono::Weekday::Mon, "标准 cron 1-5 应该当天执行");
|
||
assert_eq!(mon_next.hour(), 9, "应该是上海时间9点");
|
||
|
||
// 从周五开始(应该下周周一)
|
||
let friday = Utc.with_ymd_and_hms(2026, 5, 1, 10, 0, 0).single().unwrap(); // 周五
|
||
let shanghai_friday = friday.with_timezone(&chrono_tz::Asia::Shanghai);
|
||
let fri_next = schedule_std.after(&shanghai_friday).next().unwrap();
|
||
println!("周五 -> 标准 cron 1-5 下次执行: {} (星期: {:?})", fri_next, fri_next.weekday());
|
||
assert_eq!(fri_next.weekday(), chrono::Weekday::Mon, "标准 cron 1-5 应该从周五跳到下周一");
|
||
}
|
||
|
||
/// 测试转换辅助函数
|
||
#[test]
|
||
fn test_weekday_conversion_helper() {
|
||
// 测试单个值
|
||
assert_eq!(convert_single_weekday("0"), "1"); // 周日
|
||
assert_eq!(convert_single_weekday("1"), "2"); // 周一
|
||
assert_eq!(convert_single_weekday("5"), "6"); // 周五
|
||
assert_eq!(convert_single_weekday("6"), "7"); // 周六
|
||
assert_eq!(convert_single_weekday("7"), "1"); // 周日(标准 cron 兼容写法)
|
||
|
||
// 测试范围
|
||
assert_eq!(convert_weekday_range_or_value("1-5"), "2-6"); // 周一到周五
|
||
assert_eq!(convert_weekday_range_or_value("0-6"), "1-7"); // 周日到周六
|
||
assert_eq!(convert_weekday_range_or_value("0-7"), "1-1"); // 周日(循环)
|
||
|
||
// 测试列表
|
||
assert_eq!(convert_cron_weekday("1,3,5"), "2,4,6"); // 周一、三、五
|
||
assert_eq!(convert_cron_weekday("0,6"), "1,7"); // 周日和周六
|
||
|
||
// 测试步长
|
||
assert_eq!(convert_weekday_item("*/2"), "*/2"); // 步长保持不变
|
||
|
||
// 测试混合
|
||
assert_eq!(convert_cron_weekday("1-5,7"), "2-6,1"); // 周一到周五 + 周日
|
||
|
||
// 测试特殊字符
|
||
assert_eq!(convert_cron_weekday("*"), "*");
|
||
assert_eq!(convert_cron_weekday("?"), "?");
|
||
}
|
||
}
|