Compare commits

..

4 Commits

11 changed files with 523 additions and 57 deletions

View File

@ -25,7 +25,9 @@ tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] }
tracing-appender = "0.2"
anyhow = "1.0"
chrono = { version = "0.4", features = ["serde"] }
chrono-tz = "0.10"
cron = { version = "0.13", features = ["serde"] }
iana-time-zone = "0.1"
mime_guess = "2.0"
base64 = "0.22"
tempfile = "3"

33
config.json Normal file
View File

@ -0,0 +1,33 @@
{
"providers": {
"default": {
"type": "openai",
"base_url": "<OPENAI_BASE_URL>",
"api_key": "<OPENAI_API_KEY>",
"extra_headers": {}
}
},
"models": {
"default": {
"model_id": "<OPENAI_MODEL_NAME>",
"temperature": 0.2
}
},
"agents": {
"default": {
"provider": "default",
"model": "default"
}
},
"gateway": {
"host": "0.0.0.0",
"port": 19876,
"agent_prompt_reinject_every": 100
},
"scheduler": {
"enabled": true,
"tick_resolution_ms": 1000,
"misfire_policy": "skip",
"jobs": []
}
}

View File

@ -27,6 +27,13 @@ const PENDING_USER_ACTION_MARKER: &str = "__PICOBOT_PENDING_USER_ACTION__";
const DEFAULT_PENDING_ASSISTANT_MESSAGE: &str = "工具已经启动并进入等待用户操作的状态。请先完成外部操作,完成后直接告诉我继续。";
const RECOVERABLE_LLM_ERROR_MESSAGE: &str = "模型服务暂时不可用或响应超时。请稍后重试。";
const SUPPORTED_IMAGE_MIME_TYPES: &[&str] = &[
"image/jpeg",
"image/png",
"image/gif",
"image/webp",
];
/// Build content blocks from text and media paths
fn build_content_blocks(text: &str, media_paths: &[String]) -> Vec<ContentBlock> {
let mut blocks = Vec::new();
@ -38,6 +45,11 @@ fn build_content_blocks(text: &str, media_paths: &[String]) -> Vec<ContentBlock>
// Add image blocks for media paths
for path in media_paths {
if supported_image_mime_type(path).is_none() {
tracing::debug!(media_path = %path, "Skipping non-image media ref for LLM image block");
continue;
}
if let Ok((mime_type, base64_data)) = encode_image_to_base64(path) {
let url = format!("data:{};base64,{}", mime_type, base64_data);
blocks.push(ContentBlock::image_url(url));
@ -52,18 +64,32 @@ fn build_content_blocks(text: &str, media_paths: &[String]) -> Vec<ContentBlock>
blocks
}
fn supported_image_mime_type(path: &str) -> Option<String> {
let mime = mime_guess::from_path(path).first_or_octet_stream();
let essence = mime.essence_str();
if SUPPORTED_IMAGE_MIME_TYPES.contains(&essence) {
Some(essence.to_string())
} else {
None
}
}
/// Encode an image file to base64 data URL
fn encode_image_to_base64(path: &str) -> Result<(String, String), std::io::Error> {
use base64::{Engine as _, engine::general_purpose::STANDARD};
let mime = supported_image_mime_type(path).ok_or_else(|| {
std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("unsupported image media type for path: {}", path),
)
})?;
let mut file = std::fs::File::open(path)?;
let mut buffer = Vec::new();
file.read_to_end(&mut buffer)?;
let mime = mime_guess::from_path(path)
.first_or_octet_stream()
.to_string();
let encoded = STANDARD.encode(&buffer);
Ok((mime, encoded))
}
@ -779,6 +805,7 @@ impl AgentLoop {
mod tests {
use super::*;
use crate::observability::{MultiObserver, Observer};
use tempfile::tempdir;
struct TestObserver {
events: std::sync::Mutex<Vec<ObserverEvent>>,
@ -881,6 +908,31 @@ mod tests {
assert_eq!(output.as_deref(), Some("请完成授权"));
assert!(parse_pending_tool_output("normal output").is_none());
}
#[test]
fn test_build_content_blocks_skips_non_image_media_refs() {
let temp_dir = tempdir().unwrap();
let pdf_path = temp_dir.path().join("demo.pdf");
std::fs::write(&pdf_path, b"%PDF-1.4").unwrap();
let blocks = build_content_blocks("hello", &[pdf_path.to_string_lossy().to_string()]);
assert_eq!(blocks.len(), 1);
assert!(matches!(&blocks[0], ContentBlock::Text { text } if text == "hello"));
}
#[test]
fn test_build_content_blocks_keeps_supported_images() {
let temp_dir = tempdir().unwrap();
let jpg_path = temp_dir.path().join("demo.jpg");
std::fs::write(&jpg_path, b"fake-jpeg-data").unwrap();
let blocks = build_content_blocks("hello", &[jpg_path.to_string_lossy().to_string()]);
assert_eq!(blocks.len(), 2);
assert!(matches!(&blocks[0], ContentBlock::Text { text } if text == "hello"));
assert!(matches!(&blocks[1], ContentBlock::ImageUrl { image_url } if image_url.url.starts_with("data:image/jpeg;base64,")));
}
}
#[derive(Debug)]

View File

@ -37,11 +37,17 @@
## 写入规则
- 写入或修改记忆时,再使用 memory_manage。
- 仅在遇到高价值且未来仍有用的信息时写入记忆:用户长期偏好、稳定事实、用户对你的纠正、持续任务或项目上下文、明确决策。
- 不要保存一次性工具结果、临时列表、敏感凭证或不确定推测。
- 遇到高价值且未来仍有用的信息时写入记忆:用户长期偏好、稳定事实、用户对你的纠正、持续任务或项目上下文、明确决策等。
- 写入时优先使用规范 namespacepreferences、profile、tasks、decisions。
- 优先调用 memory_manage(action='put');同一 namespace/key 可直接覆盖更新。
### 以下场景视为高价值加分
- 用户多次交互优化输出
- 用户对你的纠正
- 确定的事实,路径/地址/网址等
- 用户独特的表达,缩写/非常规的表达
## 最后检查
如果你决定跳过记忆搜索,应先确认当前请求确实属于上述少数例外,而不是因为你忘了检索,或因为你误以为单凭当前消息就足够。
如果你决定跳过记忆保存,确定当前确实没有记忆需要保存的。

View File

@ -1,4 +1,5 @@
use chrono::{DateTime, Utc};
use chrono_tz::Tz;
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
@ -13,6 +14,8 @@ pub struct Config {
pub models: HashMap<String, ModelConfig>,
pub agents: HashMap<String, AgentConfig>,
#[serde(default)]
pub time: TimeConfig,
#[serde(default)]
pub gateway: GatewayConfig,
#[serde(default)]
pub scheduler: SchedulerConfig,
@ -24,6 +27,31 @@ pub struct Config {
pub skills: SkillsConfig,
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct TimeConfig {
#[serde(default = "default_timezone")]
pub timezone: String,
}
impl TimeConfig {
pub fn parse_timezone(&self) -> Result<Tz, ConfigError> {
self.timezone.parse::<Tz>().map_err(|_| {
ConfigError::InvalidTimezone(format!(
"unsupported timezone '{}', expected an IANA timezone like 'Asia/Shanghai'",
self.timezone
))
})
}
}
impl Default for TimeConfig {
fn default() -> Self {
Self {
timezone: default_timezone(),
}
}
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct SkillsConfig {
#[serde(default = "default_skills_enabled")]
@ -127,7 +155,7 @@ pub struct AgentConfig {
}
fn default_max_tool_iterations() -> usize {
20
100
}
fn default_token_limit() -> usize {
@ -259,27 +287,27 @@ impl SchedulerJobConfig {
}
impl SchedulerConfig {
pub fn builtin_jobs() -> Vec<SchedulerJobConfig> {
pub fn builtin_jobs(time: &TimeConfig) -> Vec<SchedulerJobConfig> {
vec![SchedulerJobConfig {
id: BUILTIN_MEMORY_MAINTENANCE_JOB_ID.to_string(),
enabled: true,
kind: SchedulerJobKind::InternalEvent,
schedule: Some(SchedulerSchedule::Cron {
expression: "0 19 * * *".to_string(),
expression: "0 3 * * *".to_string(),
}),
startup_delay_secs: 0,
interval_secs: 0,
target: SchedulerJobTarget::default(),
payload: serde_json::json!({
"event": "memory_maintenance",
"time_zone": "Asia/Shanghai",
"time_zone": time.timezone,
"local_time": "03:00"
}),
}]
}
pub fn effective_jobs(&self) -> Vec<SchedulerJobConfig> {
let mut jobs = Self::builtin_jobs();
pub fn effective_jobs(&self, time: &TimeConfig) -> Vec<SchedulerJobConfig> {
let mut jobs = Self::builtin_jobs(time);
for configured in &self.jobs {
if let Some(existing) = jobs.iter_mut().find(|job| job.id == configured.id) {
@ -400,6 +428,23 @@ fn default_gateway_url() -> String {
"ws://127.0.0.1:19876/ws".to_string()
}
fn default_timezone() -> String {
detect_system_timezone().unwrap_or_else(default_beijing_timezone)
}
fn detect_system_timezone() -> Option<String> {
let detected = iana_time_zone::get_timezone().ok()?;
if detected.parse::<Tz>().is_ok() {
Some(detected)
} else {
None
}
}
fn default_beijing_timezone() -> String {
"Asia/Shanghai".to_string()
}
fn default_agent_prompt_reinject_every() -> u64 {
100
}
@ -427,7 +472,7 @@ impl Default for ClientConfig {
impl Default for SchedulerConfig {
fn default() -> Self {
Self {
enabled: false,
enabled: true,
tick_resolution_ms: default_scheduler_tick_resolution_ms(),
worker_queue_capacity: default_scheduler_worker_queue_capacity(),
misfire_policy: SchedulerMisfirePolicy::default(),
@ -486,6 +531,7 @@ impl Config {
};
let content = resolve_env_placeholders(&content);
let config: Config = serde_json::from_str(&content)?;
config.time.parse_timezone()?;
Ok(config)
}
@ -523,6 +569,7 @@ pub enum ConfigError {
ProviderNotFound(String),
ModelNotFound(String),
InvalidSchedulerJob(String),
InvalidTimezone(String),
}
impl std::fmt::Display for ConfigError {
@ -533,6 +580,7 @@ impl std::fmt::Display for ConfigError {
ConfigError::ProviderNotFound(name) => write!(f, "Provider not found: {}", name),
ConfigError::ModelNotFound(name) => write!(f, "Model not found: {}", name),
ConfigError::InvalidSchedulerJob(message) => write!(f, "Invalid scheduler job: {}", message),
ConfigError::InvalidTimezone(message) => write!(f, "Invalid timezone: {}", message),
}
}
}
@ -729,6 +777,79 @@ mod tests {
assert_eq!(config.gateway.agent_prompt_reinject_every, 100);
}
#[test]
fn test_config_loads_configured_timezone() {
let file = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
file.path(),
r#"{
"providers": {
"aliyun": {
"type": "openai",
"base_url": "https://example.invalid/v1",
"api_key": "test-key",
"extra_headers": {}
}
},
"models": {
"qwen-plus": {
"model_id": "qwen-plus"
}
},
"agents": {
"default": {
"provider": "aliyun",
"model": "qwen-plus"
}
},
"time": {
"timezone": "Asia/Shanghai"
}
}"#,
)
.unwrap();
let config = Config::load(file.path().to_str().unwrap()).unwrap();
assert_eq!(config.time.timezone, "Asia/Shanghai");
assert_eq!(config.time.parse_timezone().unwrap(), chrono_tz::Asia::Shanghai);
}
#[test]
fn test_config_rejects_invalid_timezone() {
let file = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
file.path(),
r#"{
"providers": {
"aliyun": {
"type": "openai",
"base_url": "https://example.invalid/v1",
"api_key": "test-key",
"extra_headers": {}
}
},
"models": {
"qwen-plus": {
"model_id": "qwen-plus"
}
},
"agents": {
"default": {
"provider": "aliyun",
"model": "qwen-plus"
}
},
"time": {
"timezone": "Mars/Base"
}
}"#,
)
.unwrap();
let error = Config::load(file.path().to_str().unwrap()).unwrap_err();
assert!(error.to_string().contains("Invalid timezone"));
}
#[test]
fn test_gateway_config_can_enable_tool_results() {
let file = tempfile::NamedTempFile::new().unwrap();
@ -765,25 +886,58 @@ mod tests {
assert!(config.gateway.show_tool_results);
}
#[test]
fn test_agent_config_defaults_max_tool_iterations_to_100() {
let file = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
file.path(),
r#"{
"providers": {
"aliyun": {
"type": "openai",
"base_url": "https://example.invalid/v1",
"api_key": "test-key",
"extra_headers": {}
}
},
"models": {
"qwen-plus": {
"model_id": "qwen-plus"
}
},
"agents": {
"default": {
"provider": "aliyun",
"model": "qwen-plus"
}
}
}"#,
)
.unwrap();
let config = Config::load(file.path().to_str().unwrap()).unwrap();
assert_eq!(config.agents["default"].max_tool_iterations, 100);
}
#[test]
fn test_scheduler_config_defaults() {
let file = write_test_config();
let config = Config::load(file.path().to_str().unwrap()).unwrap();
assert!(!config.scheduler.enabled);
assert!(config.scheduler.enabled);
assert_eq!(config.scheduler.tick_resolution_ms, 1_000);
assert_eq!(config.scheduler.worker_queue_capacity, 64);
assert_eq!(config.scheduler.misfire_policy, SchedulerMisfirePolicy::Skip);
assert!(config.scheduler.jobs.is_empty());
let effective_jobs = config.scheduler.effective_jobs();
let effective_jobs = config.scheduler.effective_jobs(&config.time);
assert_eq!(effective_jobs.len(), 1);
assert_eq!(effective_jobs[0].id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID);
assert_eq!(effective_jobs[0].kind, SchedulerJobKind::InternalEvent);
assert_eq!(
effective_jobs[0].resolved_schedule().unwrap(),
SchedulerSchedule::Cron {
expression: "0 19 * * *".to_string(),
expression: "0 3 * * *".to_string(),
}
);
}
@ -818,7 +972,9 @@ mod tests {
payload: serde_json::json!({"event": "custom"}),
});
let effective_jobs = scheduler.effective_jobs();
let effective_jobs = scheduler.effective_jobs(&TimeConfig {
timezone: "Asia/Shanghai".to_string(),
});
assert_eq!(effective_jobs.len(), 2);
assert_eq!(effective_jobs[0].id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID);
assert!(!effective_jobs[0].enabled);

View File

@ -20,6 +20,7 @@
- 对不确定的地方要直说,不把猜测包装成事实。
- 复杂任务先收敛重点,简单任务直接给结果。
- 避免不必要的重复、客套和冗长说明。
- 调用工具的时候需要不仅仅回复工具的json最好也简短说明你调用工具要完成什么工作
## 回复规则
@ -27,9 +28,8 @@
- 默认短而清楚,按信息密度组织内容。
- 如果任务涉及文件、命令、配置或下一步操作,优先给出最关键的那部分。
- 如果存在限制、风险或前提条件,要直接说明。
- 在信息不足时先补关键前提,在信息充分时直接执行
## 补充要求
- 你是 PicoBot。
- 回答应以帮助用户完成当前目标为中心。
- 在信息不足时先补关键前提,在信息充分时直接执行。
## PICO配置
- 默认路径为[basedir]:~/.picobot
- Skill安装在[basedir]/skills

View File

@ -24,9 +24,7 @@ pub struct GatewayState {
}
impl GatewayState {
pub fn new() -> Result<Self, Box<dyn std::error::Error>> {
let config = Config::load_default()?;
pub fn from_config(config: Config) -> Result<Self, Box<dyn std::error::Error>> {
// Get provider config for SessionManager
let provider_config = config.get_provider_config("default")?;
let mut provider_configs = HashMap::<String, LLMProviderConfig>::new();
@ -140,11 +138,14 @@ impl GatewayState {
}
pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn std::error::Error>> {
let config = Config::load_default()?;
let timezone = config.time.parse_timezone()?;
// Initialize logging
logging::init_logging();
logging::init_logging(timezone);
tracing::info!("Starting PicoBot Gateway");
let state = Arc::new(GatewayState::new()?);
let state = Arc::new(GatewayState::from_config(config)?);
// Get provider config for channels
let provider_config = state.config.get_provider_config("default")?;
@ -161,6 +162,7 @@ pub async fn run(host: Option<String>, port: Option<u16>) -> Result<(), Box<dyn
let scheduler = Scheduler::new(
state.bus.clone(),
state.config.scheduler.clone(),
timezone,
state.session_manager.store(),
state.session_manager.clone(),
);

View File

@ -262,6 +262,17 @@ fn preview_text(content: &str, max_chars: usize) -> String {
preview.replace('\n', "\\n")
}
fn enrich_user_content_with_media_refs(content: &str, media_refs: &[String]) -> Result<String, AgentError> {
if media_refs.is_empty() {
return Ok(content.to_string());
}
let media_refs_json = serde_json::to_string(media_refs)
.map_err(|err| AgentError::Other(format!("serialize media refs error: {}", err)))?;
Ok(format!("{content}\n\nmedia_refs_json: {media_refs_json}"))
}
fn combine_managed_memory_markdown(chunks: &[String]) -> String {
let normalized_chunks = chunks
.iter()
@ -1237,7 +1248,8 @@ impl SessionManager {
if !media_refs.is_empty() {
tracing::debug!(media_count = %media.len(), media_refs = ?media_refs, "Adding user message with media");
}
let user_message = session_guard.create_user_message(content, media_refs);
let enriched_content = enrich_user_content_with_media_refs(content, &media_refs)?;
let user_message = session_guard.create_user_message(&enriched_content, media_refs);
let user_message_id = user_message.id.clone();
session_guard.append_persisted_message(chat_id, user_message)?;
@ -1501,6 +1513,25 @@ mod tests {
assert_eq!(selected.model_id, "planner-model");
}
#[test]
fn test_enrich_user_content_with_media_refs_appends_tagged_json() {
let media_refs = vec!["/tmp/a.png".to_string(), "/tmp/b.pdf".to_string()];
let enriched = enrich_user_content_with_media_refs("hello", &media_refs).unwrap();
assert_eq!(
enriched,
"hello\n\nmedia_refs_json: [\"/tmp/a.png\",\"/tmp/b.pdf\"]"
);
}
#[test]
fn test_enrich_user_content_with_media_refs_keeps_plain_text_without_media() {
let enriched = enrich_user_content_with_media_refs("hello", &[]).unwrap();
assert_eq!(enriched, "hello");
}
#[tokio::test]
async fn test_latest_user_message_guard_tracks_current_turn() {
let store = Arc::new(SessionStore::in_memory().unwrap());

View File

@ -1,5 +1,6 @@
use std::path::PathBuf;
use chrono::Local;
use chrono::Utc;
use chrono_tz::Tz;
use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_subscriber::{
fmt,
@ -9,12 +10,14 @@ use tracing_subscriber::{
EnvFilter,
};
#[derive(Clone, Copy, Debug, Default)]
struct LocalTimestamp;
#[derive(Clone, Copy, Debug)]
struct ConfiguredTimestamp {
timezone: Tz,
}
impl FormatTime for LocalTimestamp {
impl FormatTime for ConfiguredTimestamp {
fn format_time(&self, writer: &mut tracing_subscriber::fmt::format::Writer<'_>) -> std::fmt::Result {
write!(writer, "{}", Local::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
write!(writer, "{}", Utc::now().with_timezone(&self.timezone).to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
}
}
@ -32,7 +35,7 @@ pub fn get_default_config_path() -> PathBuf {
/// Initialize logging with file appender
/// Logs are written to ~/.picobot/logs/ with daily rotation
pub fn init_logging() {
pub fn init_logging(timezone: Tz) {
let log_dir = get_default_log_dir();
// Create log directory if it doesn't exist
@ -55,14 +58,14 @@ pub fn init_logging() {
let file_layer = fmt::layer()
.with_writer(file_appender)
.with_timer(LocalTimestamp)
.with_timer(ConfiguredTimestamp { timezone })
.with_ansi(false)
.with_target(true)
.with_level(true)
.with_thread_ids(true);
let console_layer = fmt::layer()
.with_timer(LocalTimestamp)
.with_timer(ConfiguredTimestamp { timezone })
.with_target(true)
.with_level(true);
@ -76,12 +79,12 @@ pub fn init_logging() {
}
/// Initialize logging without file output (console only)
pub fn init_logging_console_only() {
pub fn init_logging_console_only(timezone: Tz) {
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new("info"));
let console_layer = fmt::layer()
.with_timer(LocalTimestamp)
.with_timer(ConfiguredTimestamp { timezone })
.with_target(true)
.with_level(true);

View File

@ -3,6 +3,7 @@ use std::str::FromStr;
use std::sync::Arc;
use chrono::{DateTime, Duration as ChronoDuration, TimeZone, Utc};
use chrono_tz::Tz;
use tokio::sync::watch;
use crate::bus::{MessageBus, OutboundMessage};
@ -19,6 +20,7 @@ use crate::storage::{
pub struct Scheduler {
bus: Arc<MessageBus>,
config: SchedulerConfig,
timezone: Tz,
store: Arc<SessionStore>,
session_manager: SessionManager,
}
@ -27,12 +29,14 @@ impl Scheduler {
pub fn new(
bus: Arc<MessageBus>,
config: SchedulerConfig,
timezone: Tz,
store: Arc<SessionStore>,
session_manager: SessionManager,
) -> Self {
Self {
bus,
config,
timezone,
store,
session_manager,
}
@ -72,8 +76,8 @@ impl Scheduler {
fn sync_config_jobs(&self) -> anyhow::Result<()> {
let now = Utc::now();
for job in self.config.effective_jobs() {
let runtime = RuntimeJob::from_config(&job, now, self.config.misfire_policy)?;
for job in self.config.effective_jobs(&crate::config::TimeConfig { timezone: self.timezone.name().to_string() }) {
let runtime = RuntimeJob::from_config(&job, now, self.config.misfire_policy, self.timezone)?;
self.store.upsert_scheduler_job(&runtime.to_upsert())?;
}
Ok(())
@ -84,7 +88,7 @@ impl Scheduler {
let jobs = self.store.list_scheduler_jobs(true)?;
for record in jobs {
let Some(mut job) = RuntimeJob::from_record(&record, self.config.misfire_policy)? else {
let Some(mut job) = RuntimeJob::from_record(&record, self.config.misfire_policy, self.timezone)? else {
continue;
};
@ -119,7 +123,12 @@ impl Scheduler {
)?;
let execution_result = self.execute_job(&job).await;
job.after_execution(now, execution_result.as_ref().err().map(|err| err.to_string()), self.config.misfire_policy)?;
job.after_execution(
now,
execution_result.as_ref().err().map(|err| err.to_string()),
self.config.misfire_policy,
self.timezone,
)?;
let status = if execution_result.is_ok() {
Some(SchedulerJobStatus::Ok)
@ -194,6 +203,7 @@ impl RuntimeJob {
job: &SchedulerJobConfig,
now: DateTime<Utc>,
misfire_policy: SchedulerMisfirePolicy,
timezone: Tz,
) -> anyhow::Result<Self> {
let schedule = job.resolved_schedule()?;
let initial_state = if job.enabled {
@ -202,7 +212,7 @@ impl RuntimeJob {
SchedulerJobState::Paused
};
let next_fire_at = if job.enabled {
compute_initial_next_fire_at(&schedule, now, None, misfire_policy)?
compute_initial_next_fire_at(&schedule, now, None, misfire_policy, timezone)?
} else {
None
};
@ -231,6 +241,7 @@ impl RuntimeJob {
fn from_record(
record: &SchedulerJobRecord,
misfire_policy: SchedulerMisfirePolicy,
timezone: Tz,
) -> anyhow::Result<Option<Self>> {
let kind = match record.kind.as_str() {
"internal_event" => SchedulerJobKind::InternalEvent,
@ -249,7 +260,7 @@ impl RuntimeJob {
(_, SchedulerJobState::Paused, _) => None,
(_, SchedulerJobState::Completed, _) => None,
(_, _, some_next) if some_next.is_some() => some_next,
_ => compute_initial_next_fire_at(&schedule, now, record.last_fired_at, misfire_policy)?,
_ => compute_initial_next_fire_at(&schedule, now, record.last_fired_at, misfire_policy, timezone)?,
};
Ok(Some(Self {
@ -284,6 +295,7 @@ impl RuntimeJob {
now: DateTime<Utc>,
last_error: Option<String>,
misfire_policy: SchedulerMisfirePolicy,
timezone: Tz,
) -> anyhow::Result<()> {
self.run_count += 1;
self.last_fired_at = Some(now.timestamp_millis());
@ -308,7 +320,7 @@ impl RuntimeJob {
let reference_ms = self.next_fire_at.or(self.last_fired_at);
self.state = SchedulerJobState::Scheduled;
self.completed_at = None;
self.next_fire_at = compute_next_fire_at(&self.schedule, now, reference_ms, misfire_policy)?;
self.next_fire_at = compute_next_fire_at(&self.schedule, now, reference_ms, misfire_policy, timezone)?;
Ok(())
}
@ -363,9 +375,10 @@ fn compute_initial_next_fire_at(
now: DateTime<Utc>,
last_fired_at: Option<i64>,
misfire_policy: SchedulerMisfirePolicy,
timezone: Tz,
) -> anyhow::Result<Option<i64>> {
match last_fired_at {
Some(last_fired_at) => compute_next_fire_at(schedule, now, Some(last_fired_at), misfire_policy),
Some(last_fired_at) => compute_next_fire_at(schedule, now, Some(last_fired_at), misfire_policy, timezone),
None => match schedule {
SchedulerSchedule::Delay { seconds } => Ok(Some((now + ChronoDuration::seconds(*seconds as i64)).timestamp_millis())),
SchedulerSchedule::Interval {
@ -378,7 +391,8 @@ fn compute_initial_next_fire_at(
SchedulerSchedule::At { timestamp } => Ok(Some(parse_rfc3339_to_utc(timestamp)?.timestamp_millis())),
SchedulerSchedule::Cron { expression } => {
let schedule = parse_scheduler_cron(expression)?;
Ok(schedule.after(&now).next().map(|next| next.timestamp_millis()))
let local_now = now.with_timezone(&timezone);
Ok(schedule.after(&local_now).next().map(|next| next.with_timezone(&Utc).timestamp_millis()))
}
},
}
@ -389,6 +403,7 @@ fn compute_next_fire_at(
now: DateTime<Utc>,
reference_ms: Option<i64>,
misfire_policy: SchedulerMisfirePolicy,
timezone: Tz,
) -> anyhow::Result<Option<i64>> {
match schedule {
SchedulerSchedule::Delay { .. } | SchedulerSchedule::At { .. } => Ok(None),
@ -410,12 +425,13 @@ fn compute_next_fire_at(
SchedulerSchedule::Cron { expression } => {
let schedule = parse_scheduler_cron(expression)?;
let anchor = match misfire_policy {
SchedulerMisfirePolicy::Skip => now,
SchedulerMisfirePolicy::Skip => now.with_timezone(&timezone),
SchedulerMisfirePolicy::CatchUp => reference_ms
.and_then(ts_millis_to_utc)
.unwrap_or(now),
.map(|value| value.with_timezone(&timezone))
.unwrap_or_else(|| now.with_timezone(&timezone)),
};
Ok(schedule.after(&anchor).next().map(|next| next.timestamp_millis()))
Ok(schedule.after(&anchor).next().map(|next| next.with_timezone(&Utc).timestamp_millis()))
}
}
}
@ -630,7 +646,7 @@ mod agent_task_tests {
updated_at: 1_700_000_000_000,
};
let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip)
let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip, chrono_tz::Asia::Shanghai)
.unwrap()
.unwrap();
@ -702,7 +718,7 @@ mod tests {
use super::*;
use std::collections::HashMap;
use crate::bus::MessageBus;
use crate::config::LLMProviderConfig;
use crate::config::{BUILTIN_MEMORY_MAINTENANCE_JOB_ID, LLMProviderConfig};
use crate::gateway::session::SessionManager;
use crate::skills::SkillRuntime;
use crate::storage::{SchedulerJobUpsert, SessionStore};
@ -718,6 +734,7 @@ mod tests {
now,
Some(now.timestamp_millis() - 10 * 60 * 1_000),
SchedulerMisfirePolicy::Skip,
chrono_tz::Asia::Shanghai,
)
.unwrap()
.unwrap();
@ -736,6 +753,7 @@ mod tests {
now,
Some(now.timestamp_millis() - 10 * 60 * 1_000),
SchedulerMisfirePolicy::CatchUp,
chrono_tz::Asia::Shanghai,
)
.unwrap()
.unwrap();
@ -775,7 +793,7 @@ mod tests {
updated_at: 1_700_000_000_000,
};
let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip)
let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip, chrono_tz::Asia::Shanghai)
.unwrap()
.unwrap();
@ -851,6 +869,7 @@ mod tests {
misfire_policy: SchedulerMisfirePolicy::Skip,
jobs: Vec::new(),
},
chrono_tz::Asia::Shanghai,
store.clone(),
session_manager,
);
@ -862,4 +881,80 @@ mod tests {
assert_eq!(saved.run_count, 0);
assert_eq!(saved.state, SchedulerJobState::Scheduled);
}
#[test]
fn sync_config_jobs_persists_builtin_memory_maintenance_job() {
let store = Arc::new(SessionStore::in_memory().unwrap());
let provider_config = LLMProviderConfig {
provider_type: "openai".to_string(),
name: "default".to_string(),
base_url: "http://localhost".to_string(),
api_key: "test-key".to_string(),
extra_headers: HashMap::new(),
llm_timeout_secs: 30,
model_id: "test-model".to_string(),
temperature: Some(0.0),
max_tokens: None,
model_extra: HashMap::new(),
token_limit: 4096,
max_tool_iterations: 4,
};
let session_manager = SessionManager::new(
4,
100,
false,
provider_config.clone(),
HashMap::from([("default".to_string(), provider_config)]),
Arc::new(SkillRuntime::default()),
)
.unwrap();
let scheduler = Scheduler::new(
MessageBus::new(8),
SchedulerConfig::default(),
chrono_tz::Asia::Shanghai,
store.clone(),
session_manager,
);
scheduler.sync_config_jobs().unwrap();
let saved = store
.get_scheduler_job(BUILTIN_MEMORY_MAINTENANCE_JOB_ID)
.unwrap()
.unwrap();
assert_eq!(saved.id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID);
assert_eq!(saved.kind, "internal_event");
assert!(saved.enabled);
assert_eq!(saved.state, SchedulerJobState::Scheduled);
assert_eq!(saved.payload.get("event").and_then(|value| value.as_str()), Some("memory_maintenance"));
assert_eq!(
saved.schedule,
serde_json::json!({
"type": "cron",
"expression": "0 3 * * *"
})
);
assert!(saved.next_fire_at.is_some());
}
#[test]
fn cron_schedule_uses_configured_timezone() {
let now = Utc.with_ymd_and_hms(2026, 4, 23, 18, 0, 0).single().unwrap();
let next = compute_next_fire_at(
&SchedulerSchedule::Cron {
expression: "0 3 * * *".to_string(),
},
now,
None,
SchedulerMisfirePolicy::Skip,
chrono_tz::Asia::Shanghai,
)
.unwrap()
.unwrap();
let next_utc = ts_millis_to_utc(next).unwrap();
assert_eq!(next_utc, Utc.with_ymd_and_hms(2026, 4, 23, 19, 0, 0).single().unwrap());
}
}

View File

@ -74,6 +74,15 @@ impl Tool for SchedulerManageTool {
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
self.execute_with_context(&crate::tools::ToolContext::default(), args)
.await
}
async fn execute_with_context(
&self,
context: &crate::tools::ToolContext,
args: serde_json::Value,
) -> anyhow::Result<ToolResult> {
let action = match args.get("action").and_then(|value| value.as_str()) {
Some(action) => action,
None => return Ok(error_result("Missing required parameter: action")),
@ -96,7 +105,7 @@ impl Tool for SchedulerManageTool {
}
}
"put" => {
let input = build_upsert(&args, &self.known_agents)?;
let input = build_upsert(context, &args, &self.known_agents)?;
let record = self.store.upsert_scheduler_job(&input)?;
record_to_json(&record)
}
@ -145,7 +154,11 @@ impl Tool for SchedulerManageTool {
}
}
fn build_upsert(args: &serde_json::Value, known_agents: &HashSet<String>) -> anyhow::Result<SchedulerJobUpsert> {
fn build_upsert(
context: &crate::tools::ToolContext,
args: &serde_json::Value,
known_agents: &HashSet<String>,
) -> anyhow::Result<SchedulerJobUpsert> {
let id = require_str(args, "id")?.to_string();
let kind = require_str(args, "kind")?.to_string();
let schedule_value = args
@ -164,7 +177,10 @@ fn build_upsert(args: &serde_json::Value, known_agents: &HashSet<String>) -> any
};
let payload = args.get("payload").cloned().unwrap_or_else(|| json!({}));
let target = args.get("target").cloned().unwrap_or_else(|| json!({}));
let target = enrich_target_from_context(
args.get("target").cloned().unwrap_or_else(|| json!({})),
context,
);
if kind == "agent_task" {
validate_agent_task_payload(&payload, known_agents)?;
validate_target_fields(&target, &["channel", "chat_id"], "agent_task")?;
@ -198,6 +214,38 @@ fn build_upsert(args: &serde_json::Value, known_agents: &HashSet<String>) -> any
})
}
fn enrich_target_from_context(
target: serde_json::Value,
context: &crate::tools::ToolContext,
) -> serde_json::Value {
let mut object = match target {
serde_json::Value::Object(map) => map,
_ => return target,
};
if !has_non_empty_string(&object, "channel") {
if let Some(channel_name) = context.channel_name.as_ref().filter(|value| !value.trim().is_empty()) {
object.insert("channel".to_string(), serde_json::Value::String(channel_name.clone()));
}
}
if !has_non_empty_string(&object, "chat_id") {
if let Some(chat_id) = context.chat_id.as_ref().filter(|value| !value.trim().is_empty()) {
object.insert("chat_id".to_string(), serde_json::Value::String(chat_id.clone()));
}
}
serde_json::Value::Object(object)
}
fn has_non_empty_string(object: &serde_json::Map<String, serde_json::Value>, field: &str) -> bool {
object
.get(field)
.and_then(|value| value.as_str())
.map(|value| !value.trim().is_empty())
.unwrap_or(false)
}
fn validate_agent_task_payload(payload: &serde_json::Value, known_agents: &HashSet<String>) -> anyhow::Result<()> {
let Some(prompt) = payload.get("prompt").and_then(|value| value.as_str()) else {
anyhow::bail!("agent_task payload.prompt is required and must be a string")
@ -411,6 +459,44 @@ mod tests {
assert!(error.contains("outbound_message target.channel is required"));
}
#[tokio::test]
async fn test_scheduler_manage_put_uses_tool_context_target_defaults() {
let store = Arc::new(SessionStore::in_memory().unwrap());
let tool = SchedulerManageTool::new(store.clone(), HashSet::new());
let put_result = tool
.execute_with_context(
&crate::tools::ToolContext {
channel_name: Some("feishu".to_string()),
sender_id: Some("user-1".to_string()),
chat_id: Some("oc_demo".to_string()),
session_id: Some("feishu:oc_demo".to_string()),
message_id: Some("msg-1".to_string()),
message_seq: Some(1),
},
json!({
"action": "put",
"id": "work_reminder",
"kind": "outbound_message",
"schedule": {
"type": "interval",
"seconds": 60
},
"payload": {
"content": "ping"
}
}),
)
.await
.unwrap();
assert!(put_result.success);
let saved = store.get_scheduler_job("work_reminder").unwrap().unwrap();
assert_eq!(saved.target["channel"], "feishu");
assert_eq!(saved.target["chat_id"], "oc_demo");
}
#[tokio::test]
async fn test_scheduler_manage_rejects_unknown_agent_task_agent() {
let store = Arc::new(SessionStore::in_memory().unwrap());