feat(timezone): 添加时区支持,增强调度和日志功能

This commit is contained in:
ooodc 2026-04-23 23:50:08 +08:00
parent e24a081293
commit e6f23858b8
7 changed files with 248 additions and 45 deletions

View File

@ -25,7 +25,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tracing-appender = "0.2" tracing-appender = "0.2"
anyhow = "1.0" anyhow = "1.0"
chrono = { version = "0.4", features = ["serde"] } chrono = { version = "0.4", features = ["serde"] }
chrono-tz = "0.10"
cron = { version = "0.13", features = ["serde"] } cron = { version = "0.13", features = ["serde"] }
iana-time-zone = "0.1"
mime_guess = "2.0" mime_guess = "2.0"
base64 = "0.22" base64 = "0.22"
tempfile = "3" tempfile = "3"

View File

@ -37,8 +37,7 @@
## 写入规则 ## 写入规则
- 写入或修改记忆时,再使用 memory_manage。 - 写入或修改记忆时,再使用 memory_manage。
- 仅在遇到高价值且未来仍有用的信息时写入记忆:用户长期偏好、稳定事实、用户对你的纠正、持续任务或项目上下文、明确决策等。 - 遇到高价值且未来仍有用的信息时写入记忆:用户长期偏好、稳定事实、用户对你的纠正、持续任务或项目上下文、明确决策等。
- 不要保存一次性工具结果、临时列表、敏感凭证或不确定推测。
- 写入时优先使用规范 namespacepreferences、profile、tasks、decisions。 - 写入时优先使用规范 namespacepreferences、profile、tasks、decisions。
- 优先调用 memory_manage(action='put');同一 namespace/key 可直接覆盖更新。 - 优先调用 memory_manage(action='put');同一 namespace/key 可直接覆盖更新。
@ -51,3 +50,4 @@
## 最后检查 ## 最后检查
如果你决定跳过记忆搜索,应先确认当前请求确实属于上述少数例外,而不是因为你忘了检索,或因为你误以为单凭当前消息就足够。 如果你决定跳过记忆搜索,应先确认当前请求确实属于上述少数例外,而不是因为你忘了检索,或因为你误以为单凭当前消息就足够。
如果你决定跳过记忆保存,确定当前确实没有记忆需要保存的。

View File

@ -1,4 +1,5 @@
use chrono::{DateTime, Utc}; use chrono::{DateTime, Utc};
use chrono_tz::Tz;
use regex::Regex; use regex::Regex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
@ -13,6 +14,8 @@ pub struct Config {
pub models: HashMap<String, ModelConfig>, pub models: HashMap<String, ModelConfig>,
pub agents: HashMap<String, AgentConfig>, pub agents: HashMap<String, AgentConfig>,
#[serde(default)] #[serde(default)]
pub time: TimeConfig,
#[serde(default)]
pub gateway: GatewayConfig, pub gateway: GatewayConfig,
#[serde(default)] #[serde(default)]
pub scheduler: SchedulerConfig, pub scheduler: SchedulerConfig,
@ -24,6 +27,31 @@ pub struct Config {
pub skills: SkillsConfig, pub skills: SkillsConfig,
} }
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimeConfig {
#[serde(default = "default_timezone")]
pub timezone: String,
}
impl TimeConfig {
pub fn parse_timezone(&self) -> Result<Tz, ConfigError> {
self.timezone.parse::<Tz>().map_err(|_| {
ConfigError::InvalidTimezone(format!(
"unsupported timezone '{}', expected an IANA timezone like 'Asia/Shanghai'",
self.timezone
))
})
}
}
impl Default for TimeConfig {
fn default() -> Self {
Self {
timezone: default_timezone(),
}
}
}
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SkillsConfig { pub struct SkillsConfig {
#[serde(default = "default_skills_enabled")] #[serde(default = "default_skills_enabled")]
@ -127,7 +155,7 @@ pub struct AgentConfig {
} }
fn default_max_tool_iterations() -> usize { fn default_max_tool_iterations() -> usize {
20 100
} }
fn default_token_limit() -> usize { fn default_token_limit() -> usize {
@ -259,27 +287,27 @@ impl SchedulerJobConfig {
} }
impl SchedulerConfig { impl SchedulerConfig {
pub fn builtin_jobs() -> Vec<SchedulerJobConfig> { pub fn builtin_jobs(time: &TimeConfig) -> Vec<SchedulerJobConfig> {
vec![SchedulerJobConfig { vec![SchedulerJobConfig {
id: BUILTIN_MEMORY_MAINTENANCE_JOB_ID.to_string(), id: BUILTIN_MEMORY_MAINTENANCE_JOB_ID.to_string(),
enabled: true, enabled: true,
kind: SchedulerJobKind::InternalEvent, kind: SchedulerJobKind::InternalEvent,
schedule: Some(SchedulerSchedule::Cron { schedule: Some(SchedulerSchedule::Cron {
expression: "0 19 * * *".to_string(), expression: "0 3 * * *".to_string(),
}), }),
startup_delay_secs: 0, startup_delay_secs: 0,
interval_secs: 0, interval_secs: 0,
target: SchedulerJobTarget::default(), target: SchedulerJobTarget::default(),
payload: serde_json::json!({ payload: serde_json::json!({
"event": "memory_maintenance", "event": "memory_maintenance",
"time_zone": "Asia/Shanghai", "time_zone": time.timezone,
"local_time": "03:00" "local_time": "03:00"
}), }),
}] }]
} }
pub fn effective_jobs(&self) -> Vec<SchedulerJobConfig> { pub fn effective_jobs(&self, time: &TimeConfig) -> Vec<SchedulerJobConfig> {
let mut jobs = Self::builtin_jobs(); let mut jobs = Self::builtin_jobs(time);
for configured in &self.jobs { for configured in &self.jobs {
if let Some(existing) = jobs.iter_mut().find(|job| job.id == configured.id) { if let Some(existing) = jobs.iter_mut().find(|job| job.id == configured.id) {
@ -400,6 +428,23 @@ fn default_gateway_url() -> String {
"ws://127.0.0.1:19876/ws".to_string() "ws://127.0.0.1:19876/ws".to_string()
} }
fn default_timezone() -> String {
detect_system_timezone().unwrap_or_else(default_beijing_timezone)
}
fn detect_system_timezone() -> Option<String> {
let detected = iana_time_zone::get_timezone().ok()?;
if detected.parse::<Tz>().is_ok() {
Some(detected)
} else {
None
}
}
fn default_beijing_timezone() -> String {
"Asia/Shanghai".to_string()
}
fn default_agent_prompt_reinject_every() -> u64 { fn default_agent_prompt_reinject_every() -> u64 {
100 100
} }
@ -486,6 +531,7 @@ impl Config {
}; };
let content = resolve_env_placeholders(&content); let content = resolve_env_placeholders(&content);
let config: Config = serde_json::from_str(&content)?; let config: Config = serde_json::from_str(&content)?;
config.time.parse_timezone()?;
Ok(config) Ok(config)
} }
@ -523,6 +569,7 @@ pub enum ConfigError {
ProviderNotFound(String), ProviderNotFound(String),
ModelNotFound(String), ModelNotFound(String),
InvalidSchedulerJob(String), InvalidSchedulerJob(String),
InvalidTimezone(String),
} }
impl std::fmt::Display for ConfigError { impl std::fmt::Display for ConfigError {
@ -533,6 +580,7 @@ impl std::fmt::Display for ConfigError {
ConfigError::ProviderNotFound(name) => write!(f, "Provider not found: {}", name), ConfigError::ProviderNotFound(name) => write!(f, "Provider not found: {}", name),
ConfigError::ModelNotFound(name) => write!(f, "Model not found: {}", name), ConfigError::ModelNotFound(name) => write!(f, "Model not found: {}", name),
ConfigError::InvalidSchedulerJob(message) => write!(f, "Invalid scheduler job: {}", message), ConfigError::InvalidSchedulerJob(message) => write!(f, "Invalid scheduler job: {}", message),
ConfigError::InvalidTimezone(message) => write!(f, "Invalid timezone: {}", message),
} }
} }
} }
@ -729,6 +777,79 @@ mod tests {
assert_eq!(config.gateway.agent_prompt_reinject_every, 100); assert_eq!(config.gateway.agent_prompt_reinject_every, 100);
} }
#[test]
fn test_config_loads_configured_timezone() {
let file = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
file.path(),
r#"{
"providers": {
"aliyun": {
"type": "openai",
"base_url": "https://example.invalid/v1",
"api_key": "test-key",
"extra_headers": {}
}
},
"models": {
"qwen-plus": {
"model_id": "qwen-plus"
}
},
"agents": {
"default": {
"provider": "aliyun",
"model": "qwen-plus"
}
},
"time": {
"timezone": "Asia/Shanghai"
}
}"#,
)
.unwrap();
let config = Config::load(file.path().to_str().unwrap()).unwrap();
assert_eq!(config.time.timezone, "Asia/Shanghai");
assert_eq!(config.time.parse_timezone().unwrap(), chrono_tz::Asia::Shanghai);
}
#[test]
fn test_config_rejects_invalid_timezone() {
let file = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
file.path(),
r#"{
"providers": {
"aliyun": {
"type": "openai",
"base_url": "https://example.invalid/v1",
"api_key": "test-key",
"extra_headers": {}
}
},
"models": {
"qwen-plus": {
"model_id": "qwen-plus"
}
},
"agents": {
"default": {
"provider": "aliyun",
"model": "qwen-plus"
}
},
"time": {
"timezone": "Mars/Base"
}
}"#,
)
.unwrap();
let error = Config::load(file.path().to_str().unwrap()).unwrap_err();
assert!(error.to_string().contains("Invalid timezone"));
}
#[test] #[test]
fn test_gateway_config_can_enable_tool_results() { fn test_gateway_config_can_enable_tool_results() {
let file = tempfile::NamedTempFile::new().unwrap(); let file = tempfile::NamedTempFile::new().unwrap();
@ -765,6 +886,39 @@ mod tests {
assert!(config.gateway.show_tool_results); assert!(config.gateway.show_tool_results);
} }
#[test]
fn test_agent_config_defaults_max_tool_iterations_to_100() {
let file = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
file.path(),
r#"{
"providers": {
"aliyun": {
"type": "openai",
"base_url": "https://example.invalid/v1",
"api_key": "test-key",
"extra_headers": {}
}
},
"models": {
"qwen-plus": {
"model_id": "qwen-plus"
}
},
"agents": {
"default": {
"provider": "aliyun",
"model": "qwen-plus"
}
}
}"#,
)
.unwrap();
let config = Config::load(file.path().to_str().unwrap()).unwrap();
assert_eq!(config.agents["default"].max_tool_iterations, 100);
}
#[test] #[test]
fn test_scheduler_config_defaults() { fn test_scheduler_config_defaults() {
let file = write_test_config(); let file = write_test_config();
@ -776,14 +930,14 @@ mod tests {
assert_eq!(config.scheduler.misfire_policy, SchedulerMisfirePolicy::Skip); assert_eq!(config.scheduler.misfire_policy, SchedulerMisfirePolicy::Skip);
assert!(config.scheduler.jobs.is_empty()); assert!(config.scheduler.jobs.is_empty());
let effective_jobs = config.scheduler.effective_jobs(); let effective_jobs = config.scheduler.effective_jobs(&config.time);
assert_eq!(effective_jobs.len(), 1); assert_eq!(effective_jobs.len(), 1);
assert_eq!(effective_jobs[0].id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID); assert_eq!(effective_jobs[0].id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID);
assert_eq!(effective_jobs[0].kind, SchedulerJobKind::InternalEvent); assert_eq!(effective_jobs[0].kind, SchedulerJobKind::InternalEvent);
assert_eq!( assert_eq!(
effective_jobs[0].resolved_schedule().unwrap(), effective_jobs[0].resolved_schedule().unwrap(),
SchedulerSchedule::Cron { SchedulerSchedule::Cron {
expression: "0 19 * * *".to_string(), expression: "0 3 * * *".to_string(),
} }
); );
} }
@ -818,7 +972,9 @@ mod tests {
payload: serde_json::json!({"event": "custom"}), payload: serde_json::json!({"event": "custom"}),
}); });
let effective_jobs = scheduler.effective_jobs(); let effective_jobs = scheduler.effective_jobs(&TimeConfig {
timezone: "Asia/Shanghai".to_string(),
});
assert_eq!(effective_jobs.len(), 2); assert_eq!(effective_jobs.len(), 2);
assert_eq!(effective_jobs[0].id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID); assert_eq!(effective_jobs[0].id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID);
assert!(!effective_jobs[0].enabled); assert!(!effective_jobs[0].enabled);

View File

@ -28,7 +28,8 @@
- 默认短而清楚,按信息密度组织内容。 - 默认短而清楚,按信息密度组织内容。
- 如果任务涉及文件、命令、配置或下一步操作,优先给出最关键的那部分。 - 如果任务涉及文件、命令、配置或下一步操作,优先给出最关键的那部分。
- 如果存在限制、风险或前提条件,要直接说明。 - 如果存在限制、风险或前提条件,要直接说明。
- 在信息不足时先补关键前提,在信息充分时直接执行
## 补充要求 ## PICO配置
- 默认路径为[basedir]:~/.picobot
- 在信息不足时先补关键前提,在信息充分时直接执行。 - Skill安装在[basedir]/skills

View File

@ -24,9 +24,7 @@ pub struct GatewayState {
} }
impl GatewayState { impl GatewayState {
pub fn new() -> Result<Self, Box<dyn std::error::Error>> { pub fn from_config(config: Config) -> Result<Self, Box<dyn std::error::Error>> {
let config = Config::load_default()?;
// Get provider config for SessionManager // Get provider config for SessionManager
let provider_config = config.get_provider_config("default")?; let provider_config = config.get_provider_config("default")?;
let mut provider_configs = HashMap::<String, LLMProviderConfig>::new(); let mut provider_configs = HashMap::<String, LLMProviderConfig>::new();
@ -140,11 +138,14 @@ impl GatewayState {
} }
pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn std::error::Error>> { pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn std::error::Error>> {
let config = Config::load_default()?;
let timezone = config.time.parse_timezone()?;
// Initialize logging // Initialize logging
logging::init_logging(); logging::init_logging(timezone);
tracing::info!("Starting PicoBot Gateway"); tracing::info!("Starting PicoBot Gateway");
let state = Arc::new(GatewayState::new()?); let state = Arc::new(GatewayState::from_config(config)?);
// Get provider config for channels // Get provider config for channels
let provider_config = state.config.get_provider_config("default")?; let provider_config = state.config.get_provider_config("default")?;
@ -161,6 +162,7 @@ pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn
let scheduler = Scheduler::new( let scheduler = Scheduler::new(
state.bus.clone(), state.bus.clone(),
state.config.scheduler.clone(), state.config.scheduler.clone(),
timezone,
state.session_manager.store(), state.session_manager.store(),
state.session_manager.clone(), state.session_manager.clone(),
); );

View File

@ -1,5 +1,6 @@
use std::path::PathBuf; use std::path::PathBuf;
use chrono::Local; use chrono::Utc;
use chrono_tz::Tz;
use tracing_appender::rolling::{RollingFileAppender, Rotation}; use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_subscriber::{ use tracing_subscriber::{
fmt, fmt,
@ -9,12 +10,14 @@ use tracing_subscriber::{
EnvFilter, EnvFilter,
}; };
#[derive(Clone, Copy, Debug, Default)] #[derive(Clone, Copy, Debug)]
struct LocalTimestamp; struct ConfiguredTimestamp {
timezone: Tz,
}
impl FormatTime for LocalTimestamp { impl FormatTime for ConfiguredTimestamp {
fn format_time(&self, writer: &mut tracing_subscriber::fmt::format::Writer<'_>) -> std::fmt::Result { fn format_time(&self, writer: &mut tracing_subscriber::fmt::format::Writer<'_>) -> std::fmt::Result {
write!(writer, "{}", Local::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true)) write!(writer, "{}", Utc::now().with_timezone(&self.timezone).to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
} }
} }
@ -32,7 +35,7 @@ pub fn get_default_config_path() -> PathBuf {
/// Initialize logging with file appender /// Initialize logging with file appender
/// Logs are written to ~/.picobot/logs/ with daily rotation /// Logs are written to ~/.picobot/logs/ with daily rotation
pub fn init_logging() { pub fn init_logging(timezone: Tz) {
let log_dir = get_default_log_dir(); let log_dir = get_default_log_dir();
// Create log directory if it doesn't exist // Create log directory if it doesn't exist
@ -55,14 +58,14 @@ pub fn init_logging() {
let file_layer = fmt::layer() let file_layer = fmt::layer()
.with_writer(file_appender) .with_writer(file_appender)
.with_timer(LocalTimestamp) .with_timer(ConfiguredTimestamp { timezone })
.with_ansi(false) .with_ansi(false)
.with_target(true) .with_target(true)
.with_level(true) .with_level(true)
.with_thread_ids(true); .with_thread_ids(true);
let console_layer = fmt::layer() let console_layer = fmt::layer()
.with_timer(LocalTimestamp) .with_timer(ConfiguredTimestamp { timezone })
.with_target(true) .with_target(true)
.with_level(true); .with_level(true);
@ -76,12 +79,12 @@ pub fn init_logging() {
} }
/// Initialize logging without file output (console only) /// Initialize logging without file output (console only)
pub fn init_logging_console_only() { pub fn init_logging_console_only(timezone: Tz) {
let env_filter = EnvFilter::try_from_default_env() let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info")); .unwrap_or_else(|_| EnvFilter::new("info"));
let console_layer = fmt::layer() let console_layer = fmt::layer()
.with_timer(LocalTimestamp) .with_timer(ConfiguredTimestamp { timezone })
.with_target(true) .with_target(true)
.with_level(true); .with_level(true);

View File

@ -3,6 +3,7 @@ use std::str::FromStr;
use std::sync::Arc; use std::sync::Arc;
use chrono::{DateTime, Duration as ChronoDuration, TimeZone, Utc}; use chrono::{DateTime, Duration as ChronoDuration, TimeZone, Utc};
use chrono_tz::Tz;
use tokio::sync::watch; use tokio::sync::watch;
use crate::bus::{MessageBus, OutboundMessage}; use crate::bus::{MessageBus, OutboundMessage};
@ -19,6 +20,7 @@ use crate::storage::{
pub struct Scheduler { pub struct Scheduler {
bus: Arc<MessageBus>, bus: Arc<MessageBus>,
config: SchedulerConfig, config: SchedulerConfig,
timezone: Tz,
store: Arc<SessionStore>, store: Arc<SessionStore>,
session_manager: SessionManager, session_manager: SessionManager,
} }
@ -27,12 +29,14 @@ impl Scheduler {
pub fn new( pub fn new(
bus: Arc<MessageBus>, bus: Arc<MessageBus>,
config: SchedulerConfig, config: SchedulerConfig,
timezone: Tz,
store: Arc<SessionStore>, store: Arc<SessionStore>,
session_manager: SessionManager, session_manager: SessionManager,
) -> Self { ) -> Self {
Self { Self {
bus, bus,
config, config,
timezone,
store, store,
session_manager, session_manager,
} }
@ -72,8 +76,8 @@ impl Scheduler {
fn sync_config_jobs(&self) -> anyhow::Result<()> { fn sync_config_jobs(&self) -> anyhow::Result<()> {
let now = Utc::now(); let now = Utc::now();
for job in self.config.effective_jobs() { for job in self.config.effective_jobs(&crate::config::TimeConfig { timezone: self.timezone.name().to_string() }) {
let runtime = RuntimeJob::from_config(&job, now, self.config.misfire_policy)?; let runtime = RuntimeJob::from_config(&job, now, self.config.misfire_policy, self.timezone)?;
self.store.upsert_scheduler_job(&runtime.to_upsert())?; self.store.upsert_scheduler_job(&runtime.to_upsert())?;
} }
Ok(()) Ok(())
@ -84,7 +88,7 @@ impl Scheduler {
let jobs = self.store.list_scheduler_jobs(true)?; let jobs = self.store.list_scheduler_jobs(true)?;
for record in jobs { for record in jobs {
let Some(mut job) = RuntimeJob::from_record(&record, self.config.misfire_policy)? else { let Some(mut job) = RuntimeJob::from_record(&record, self.config.misfire_policy, self.timezone)? else {
continue; continue;
}; };
@ -119,7 +123,12 @@ impl Scheduler {
)?; )?;
let execution_result = self.execute_job(&job).await; let execution_result = self.execute_job(&job).await;
job.after_execution(now, execution_result.as_ref().err().map(|err| err.to_string()), self.config.misfire_policy)?; job.after_execution(
now,
execution_result.as_ref().err().map(|err| err.to_string()),
self.config.misfire_policy,
self.timezone,
)?;
let status = if execution_result.is_ok() { let status = if execution_result.is_ok() {
Some(SchedulerJobStatus::Ok) Some(SchedulerJobStatus::Ok)
@ -194,6 +203,7 @@ impl RuntimeJob {
job: &SchedulerJobConfig, job: &SchedulerJobConfig,
now: DateTime<Utc>, now: DateTime<Utc>,
misfire_policy: SchedulerMisfirePolicy, misfire_policy: SchedulerMisfirePolicy,
timezone: Tz,
) -> anyhow::Result<Self> { ) -> anyhow::Result<Self> {
let schedule = job.resolved_schedule()?; let schedule = job.resolved_schedule()?;
let initial_state = if job.enabled { let initial_state = if job.enabled {
@ -202,7 +212,7 @@ impl RuntimeJob {
SchedulerJobState::Paused SchedulerJobState::Paused
}; };
let next_fire_at = if job.enabled { let next_fire_at = if job.enabled {
compute_initial_next_fire_at(&schedule, now, None, misfire_policy)? compute_initial_next_fire_at(&schedule, now, None, misfire_policy, timezone)?
} else { } else {
None None
}; };
@ -231,6 +241,7 @@ impl RuntimeJob {
fn from_record( fn from_record(
record: &SchedulerJobRecord, record: &SchedulerJobRecord,
misfire_policy: SchedulerMisfirePolicy, misfire_policy: SchedulerMisfirePolicy,
timezone: Tz,
) -> anyhow::Result<Option<Self>> { ) -> anyhow::Result<Option<Self>> {
let kind = match record.kind.as_str() { let kind = match record.kind.as_str() {
"internal_event" => SchedulerJobKind::InternalEvent, "internal_event" => SchedulerJobKind::InternalEvent,
@ -249,7 +260,7 @@ impl RuntimeJob {
(_, SchedulerJobState::Paused, _) => None, (_, SchedulerJobState::Paused, _) => None,
(_, SchedulerJobState::Completed, _) => None, (_, SchedulerJobState::Completed, _) => None,
(_, _, some_next) if some_next.is_some() => some_next, (_, _, some_next) if some_next.is_some() => some_next,
_ => compute_initial_next_fire_at(&schedule, now, record.last_fired_at, misfire_policy)?, _ => compute_initial_next_fire_at(&schedule, now, record.last_fired_at, misfire_policy, timezone)?,
}; };
Ok(Some(Self { Ok(Some(Self {
@ -284,6 +295,7 @@ impl RuntimeJob {
now: DateTime<Utc>, now: DateTime<Utc>,
last_error: Option<String>, last_error: Option<String>,
misfire_policy: SchedulerMisfirePolicy, misfire_policy: SchedulerMisfirePolicy,
timezone: Tz,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
self.run_count += 1; self.run_count += 1;
self.last_fired_at = Some(now.timestamp_millis()); self.last_fired_at = Some(now.timestamp_millis());
@ -308,7 +320,7 @@ impl RuntimeJob {
let reference_ms = self.next_fire_at.or(self.last_fired_at); let reference_ms = self.next_fire_at.or(self.last_fired_at);
self.state = SchedulerJobState::Scheduled; self.state = SchedulerJobState::Scheduled;
self.completed_at = None; self.completed_at = None;
self.next_fire_at = compute_next_fire_at(&self.schedule, now, reference_ms, misfire_policy)?; self.next_fire_at = compute_next_fire_at(&self.schedule, now, reference_ms, misfire_policy, timezone)?;
Ok(()) Ok(())
} }
@ -363,9 +375,10 @@ fn compute_initial_next_fire_at(
now: DateTime<Utc>, now: DateTime<Utc>,
last_fired_at: Option<i64>, last_fired_at: Option<i64>,
misfire_policy: SchedulerMisfirePolicy, misfire_policy: SchedulerMisfirePolicy,
timezone: Tz,
) -> anyhow::Result<Option<i64>> { ) -> anyhow::Result<Option<i64>> {
match last_fired_at { match last_fired_at {
Some(last_fired_at) => compute_next_fire_at(schedule, now, Some(last_fired_at), misfire_policy), Some(last_fired_at) => compute_next_fire_at(schedule, now, Some(last_fired_at), misfire_policy, timezone),
None => match schedule { None => match schedule {
SchedulerSchedule::Delay { seconds } => Ok(Some((now + ChronoDuration::seconds(*seconds as i64)).timestamp_millis())), SchedulerSchedule::Delay { seconds } => Ok(Some((now + ChronoDuration::seconds(*seconds as i64)).timestamp_millis())),
SchedulerSchedule::Interval { SchedulerSchedule::Interval {
@ -378,7 +391,8 @@ fn compute_initial_next_fire_at(
SchedulerSchedule::At { timestamp } => Ok(Some(parse_rfc3339_to_utc(timestamp)?.timestamp_millis())), SchedulerSchedule::At { timestamp } => Ok(Some(parse_rfc3339_to_utc(timestamp)?.timestamp_millis())),
SchedulerSchedule::Cron { expression } => { SchedulerSchedule::Cron { expression } => {
let schedule = parse_scheduler_cron(expression)?; let schedule = parse_scheduler_cron(expression)?;
Ok(schedule.after(&now).next().map(|next| next.timestamp_millis())) let local_now = now.with_timezone(&timezone);
Ok(schedule.after(&local_now).next().map(|next| next.with_timezone(&Utc).timestamp_millis()))
} }
}, },
} }
@ -389,6 +403,7 @@ fn compute_next_fire_at(
now: DateTime<Utc>, now: DateTime<Utc>,
reference_ms: Option<i64>, reference_ms: Option<i64>,
misfire_policy: SchedulerMisfirePolicy, misfire_policy: SchedulerMisfirePolicy,
timezone: Tz,
) -> anyhow::Result<Option<i64>> { ) -> anyhow::Result<Option<i64>> {
match schedule { match schedule {
SchedulerSchedule::Delay { .. } | SchedulerSchedule::At { .. } => Ok(None), SchedulerSchedule::Delay { .. } | SchedulerSchedule::At { .. } => Ok(None),
@ -410,12 +425,13 @@ fn compute_next_fire_at(
SchedulerSchedule::Cron { expression } => { SchedulerSchedule::Cron { expression } => {
let schedule = parse_scheduler_cron(expression)?; let schedule = parse_scheduler_cron(expression)?;
let anchor = match misfire_policy { let anchor = match misfire_policy {
SchedulerMisfirePolicy::Skip => now, SchedulerMisfirePolicy::Skip => now.with_timezone(&timezone),
SchedulerMisfirePolicy::CatchUp => reference_ms SchedulerMisfirePolicy::CatchUp => reference_ms
.and_then(ts_millis_to_utc) .and_then(ts_millis_to_utc)
.unwrap_or(now), .map(|value| value.with_timezone(&timezone))
.unwrap_or_else(|| now.with_timezone(&timezone)),
}; };
Ok(schedule.after(&anchor).next().map(|next| next.timestamp_millis())) Ok(schedule.after(&anchor).next().map(|next| next.with_timezone(&Utc).timestamp_millis()))
} }
} }
} }
@ -630,7 +646,7 @@ mod agent_task_tests {
updated_at: 1_700_000_000_000, updated_at: 1_700_000_000_000,
}; };
let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip) let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip, chrono_tz::Asia::Shanghai)
.unwrap() .unwrap()
.unwrap(); .unwrap();
@ -718,6 +734,7 @@ mod tests {
now, now,
Some(now.timestamp_millis() - 10 * 60 * 1_000), Some(now.timestamp_millis() - 10 * 60 * 1_000),
SchedulerMisfirePolicy::Skip, SchedulerMisfirePolicy::Skip,
chrono_tz::Asia::Shanghai,
) )
.unwrap() .unwrap()
.unwrap(); .unwrap();
@ -736,6 +753,7 @@ mod tests {
now, now,
Some(now.timestamp_millis() - 10 * 60 * 1_000), Some(now.timestamp_millis() - 10 * 60 * 1_000),
SchedulerMisfirePolicy::CatchUp, SchedulerMisfirePolicy::CatchUp,
chrono_tz::Asia::Shanghai,
) )
.unwrap() .unwrap()
.unwrap(); .unwrap();
@ -775,7 +793,7 @@ mod tests {
updated_at: 1_700_000_000_000, updated_at: 1_700_000_000_000,
}; };
let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip) let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip, chrono_tz::Asia::Shanghai)
.unwrap() .unwrap()
.unwrap(); .unwrap();
@ -851,6 +869,7 @@ mod tests {
misfire_policy: SchedulerMisfirePolicy::Skip, misfire_policy: SchedulerMisfirePolicy::Skip,
jobs: Vec::new(), jobs: Vec::new(),
}, },
chrono_tz::Asia::Shanghai,
store.clone(), store.clone(),
session_manager, session_manager,
); );
@ -893,6 +912,7 @@ mod tests {
let scheduler = Scheduler::new( let scheduler = Scheduler::new(
MessageBus::new(8), MessageBus::new(8),
SchedulerConfig::default(), SchedulerConfig::default(),
chrono_tz::Asia::Shanghai,
store.clone(), store.clone(),
session_manager, session_manager,
); );
@ -913,9 +933,28 @@ mod tests {
saved.schedule, saved.schedule,
serde_json::json!({ serde_json::json!({
"type": "cron", "type": "cron",
"expression": "0 19 * * *" "expression": "0 3 * * *"
}) })
); );
assert!(saved.next_fire_at.is_some()); assert!(saved.next_fire_at.is_some());
} }
#[test]
fn cron_schedule_uses_configured_timezone() {
let now = Utc.with_ymd_and_hms(2026, 4, 23, 18, 0, 0).single().unwrap();
let next = compute_next_fire_at(
&SchedulerSchedule::Cron {
expression: "0 3 * * *".to_string(),
},
now,
None,
SchedulerMisfirePolicy::Skip,
chrono_tz::Asia::Shanghai,
)
.unwrap()
.unwrap();
let next_utc = ts_millis_to_utc(next).unwrap();
assert_eq!(next_utc, Utc.with_ymd_and_hms(2026, 4, 23, 19, 0, 0).single().unwrap());
}
} }