diff --git a/README.md b/README.md index c224ec3..b0c918b 100644 --- a/README.md +++ b/README.md @@ -375,6 +375,7 @@ PicoBot 带有一个基于 SQLite 的调度器,而不是纯内存或 JSON 文 - internal_event:内部事件 - outbound_message:直接向目标通道发消息 - agent_task:构造一次合成用户输入,复用完整 Agent 流程执行 +- silent_agent_task:在独立后台会话中执行 Agent 流程,成功不推送,失败回主 chat 通知 agent_task 会复用正常链路中的这些能力: @@ -384,6 +385,14 @@ agent_task 会复用正常链路中的这些能力: - 会话持久化 - 渠道消息下发 +silent_agent_task 和 agent_task 使用同一套 Agent 执行能力,但路由语义不同: + +- target.chat_id 仍表示用户通知目标 +- target.session_chat_id 表示后台任务实际写入的会话;如果省略,会稳定派生为 scheduler/{job_id} +- 成功执行后不会向用户发送正常结果 +- 执行失败时会向主 chat 发送一条失败通知,便于用户感知异常 +- 后台任务的历史、压缩和会话内上下文会留在独立会话中,不污染主会话 + ### 9.3 运行时管理 通过 scheduler_manage 可以进行: @@ -439,12 +448,43 @@ agent_task 会复用正常链路中的这些能力: "source": "scheduler" } } + }, + { + "id": "agent.weekly_report.background", + "kind": "silent_agent_task", + "schedule": { + "type": "cron", + "expression": "0 8 * * 1" + }, + "target": { + "channel": "feishu", + "chat_id": "oc_xxx", + "session_chat_id": "scheduler/agent.weekly_report.background" + }, + "payload": { + "prompt": "请后台整理上周项目进展,输出结构化周报草稿,重点标出风险、阻塞项和下周优先级。", + "agent": "default", + "fresh_session": false, + "system_prompt": "你是周报助手,只在后台整理,不要面向用户寒暄。", + "sender_id": "scheduler-weekly-report", + "metadata": { + "job_type": "weekly_report_background", + "source": "scheduler" + } + } + } } ] } } + ``` +推荐场景: + +- agent_task:用户需要直接收到结果,例如日报提醒、定时播报、定时外发通知 +- silent_agent_task:任务需要长期积累独立上下文或后台整理材料,但不应污染主会话,例如周报草稿整理、周期性资料汇总、后台分析任务 + ## 10. 渠道与运行方式 ### 10.1 当前支持的通道 diff --git a/src/config/mod.rs b/src/config/mod.rs index 17403c8..0ecfe4f 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -252,6 +252,7 @@ pub enum SchedulerJobKind { InternalEvent, OutboundMessage, AgentTask, + SilentAgentTask, } #[derive(Debug, Clone, Deserialize, Serialize, Default)] @@ -261,6 +262,8 @@ pub struct SchedulerJobTarget { #[serde(default)] pub chat_id: Option, #[serde(default)] + pub session_chat_id: Option, + #[serde(default)] pub reply_to: Option, } @@ -1413,6 +1416,68 @@ mod tests { assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办")); } + #[test] + fn test_scheduler_config_loads_silent_agent_task_job() { + let file = tempfile::NamedTempFile::new().unwrap(); + std::fs::write( + file.path(), + r#"{ + "providers": { + "aliyun": { + "type": "openai", + "base_url": "https://example.invalid/v1", + "api_key": "test-key", + "extra_headers": {} + } + }, + "models": { + "qwen-plus": { + "model_id": "qwen-plus" + } + }, + "agents": { + "default": { + "provider": "aliyun", + "model": "qwen-plus" + } + }, + "scheduler": { + "enabled": true, + "jobs": [ + { + "id": "agent.daily_summary.background", + "kind": "silent_agent_task", + "schedule": { + "type": "cron", + "expression": "0 9 * * *" + }, + "target": { + "channel": "feishu", + "chat_id": "oc_demo", + "session_chat_id": "scheduler/agent.daily_summary.background" + }, + "payload": { + "prompt": "请后台总结今天待办" + } + } + ] + } +}"#, + ) + .unwrap(); + + let config = Config::load(file.path().to_str().unwrap()).unwrap(); + let job = &config.scheduler.jobs[0]; + assert_eq!(job.kind, SchedulerJobKind::SilentAgentTask); + assert_eq!(job.target.channel.as_deref(), Some("feishu")); + assert_eq!(job.target.chat_id.as_deref(), Some("oc_demo")); + assert_eq!( + job.target.session_chat_id.as_deref(), + Some("scheduler/agent.daily_summary.background") + ); + assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请后台总结今天待办")); + } + #[test] fn test_scheduler_schedule_validation_rejects_invalid_values() { assert!(SchedulerSchedule::Delay { seconds: 0 } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index b0fe725..f2e6edf 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -166,15 +166,65 @@ impl Scheduler { execute_internal_event(&self.session_manager, job).await?; } SchedulerJobKind::AgentTask => { - let outbound_messages = execute_agent_task(&self.session_manager, job).await?; + let outbound_messages = execute_agent_task( + &self.session_manager, + job, + required_notification_chat_id(job, "agent_task")?, + ) + .await?; for message in outbound_messages { self.bus.publish_outbound(message).await?; } } + SchedulerJobKind::SilentAgentTask => { + let execution_chat_id = resolve_execution_chat_id(job)?; + if let Err(error) = execute_agent_task(&self.session_manager, job, &execution_chat_id).await { + if let Err(notify_error) = self.notify_silent_agent_task_failure(job, &error).await { + tracing::error!( + job_id = %job.id, + error = %notify_error, + "Failed to publish silent scheduler failure notification" + ); + } + return Err(error); + } + } } Ok(()) } + + async fn notify_silent_agent_task_failure( + &self, + job: &RuntimeJob, + error: &anyhow::Error, + ) -> anyhow::Result<()> { + let channel = job + .target + .channel + .clone() + .ok_or_else(|| anyhow::anyhow!("silent_agent_task requires target.channel"))?; + let chat_id = required_notification_chat_id(job, "silent_agent_task")?.to_string(); + + let mut metadata = HashMap::new(); + metadata.insert("scheduler_job_id".to_string(), job.id.clone()); + metadata.insert("scheduler_job_kind".to_string(), "silent_agent_task".to_string()); + + self.bus + .publish_outbound(OutboundMessage::assistant( + channel, + chat_id, + format!( + "定时任务执行失败:{}\n{}", + job.id, + summarize_scheduler_error(error) + ), + job.target.reply_to.clone(), + metadata, + )) + .await + .map_err(|error| anyhow::anyhow!(error.to_string())) + } } #[derive(Debug, Clone)] @@ -247,6 +297,7 @@ impl RuntimeJob { "internal_event" => SchedulerJobKind::InternalEvent, "outbound_message" => SchedulerJobKind::OutboundMessage, "agent_task" => SchedulerJobKind::AgentTask, + "silent_agent_task" => SchedulerJobKind::SilentAgentTask, other => { tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind"); return Ok(None); @@ -331,6 +382,7 @@ impl RuntimeJob { SchedulerJobKind::InternalEvent => "internal_event".to_string(), SchedulerJobKind::OutboundMessage => "outbound_message".to_string(), SchedulerJobKind::AgentTask => "agent_task".to_string(), + SchedulerJobKind::SilentAgentTask => "silent_agent_task".to_string(), }, schedule: serde_json::to_value(&self.schedule).unwrap_or_else(|_| serde_json::json!({})), interval_secs: self.interval_secs, @@ -527,17 +579,13 @@ async fn execute_internal_event(session_manager: &SessionManager, job: &RuntimeJ async fn execute_agent_task( session_manager: &SessionManager, job: &RuntimeJob, + execution_chat_id: &str, ) -> anyhow::Result> { let channel_name = job .target .channel .as_deref() - .ok_or_else(|| anyhow::anyhow!("agent_task requires target.channel"))?; - let chat_id = job - .target - .chat_id - .as_deref() - .ok_or_else(|| anyhow::anyhow!("agent_task requires target.chat_id"))?; + .ok_or_else(|| anyhow::anyhow!("scheduled agent task requires target.channel"))?; let prompt = job .payload .get("prompt") @@ -546,11 +594,45 @@ async fn execute_agent_task( let options = parse_scheduled_agent_task_options(job)?; session_manager - .run_scheduled_agent_task(channel_name, chat_id, prompt, options) + .run_scheduled_agent_task(channel_name, execution_chat_id, prompt, options) .await .map_err(|error| anyhow::anyhow!(error.to_string())) } +fn required_notification_chat_id<'a>(job: &'a RuntimeJob, kind_name: &str) -> anyhow::Result<&'a str> { + job.target + .chat_id + .as_deref() + .ok_or_else(|| anyhow::anyhow!("{} requires target.chat_id", kind_name)) +} + +fn resolve_execution_chat_id(job: &RuntimeJob) -> anyhow::Result { + match job.kind { + SchedulerJobKind::AgentTask => Ok(required_notification_chat_id(job, "agent_task")?.to_string()), + SchedulerJobKind::SilentAgentTask => Ok(job + .target + .session_chat_id + .clone() + .unwrap_or_else(|| derive_silent_session_chat_id(&job.id))), + _ => anyhow::bail!("execution chat id is only supported for agent task kinds"), + } +} + +fn derive_silent_session_chat_id(job_id: &str) -> String { + format!("scheduler/{}", job_id) +} + +fn summarize_scheduler_error(error: &anyhow::Error) -> String { + let text = error.to_string().replace('\n', " "); + const MAX_LEN: usize = 240; + if text.chars().count() <= MAX_LEN { + text + } else { + let summary = text.chars().take(MAX_LEN).collect::(); + format!("{}...", summary) + } +} + fn parse_scheduled_agent_task_options(job: &RuntimeJob) -> anyhow::Result { let sender_id = job .payload @@ -656,6 +738,47 @@ mod agent_task_tests { assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办")); } + #[test] + fn runtime_job_from_record_supports_silent_agent_task_kind() { + let record = SchedulerJobRecord { + id: "agent.daily_summary.background".to_string(), + kind: "silent_agent_task".to_string(), + schedule: serde_json::json!({ + "type": "interval", + "seconds": 300 + }), + interval_secs: 0, + startup_delay_secs: 0, + target: serde_json::json!({ + "channel": "feishu", + "chat_id": "oc_demo", + "session_chat_id": "scheduler/agent.daily_summary.background" + }), + payload: serde_json::json!({ + "prompt": "请后台整理今天待办" + }), + enabled: true, + state: SchedulerJobState::Scheduled, + last_status: None, + last_error: None, + run_count: 0, + max_runs: None, + last_fired_at: None, + next_fire_at: Some(1_700_000_010_000), + paused_at: None, + completed_at: None, + created_at: 1_700_000_000_000, + updated_at: 1_700_000_000_000, + }; + + let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip, chrono_tz::Asia::Shanghai) + .unwrap() + .unwrap(); + + assert_eq!(job.kind, SchedulerJobKind::SilentAgentTask); + assert_eq!(job.target.session_chat_id.as_deref(), Some("scheduler/agent.daily_summary.background")); + } + #[test] fn parse_scheduled_agent_task_options_supports_fresh_session_and_metadata() { let job = RuntimeJob { @@ -668,6 +791,7 @@ mod agent_task_tests { target: SchedulerJobTarget { channel: Some("feishu".to_string()), chat_id: Some("oc_demo".to_string()), + session_chat_id: None, reply_to: None, }, payload: serde_json::json!({ @@ -705,6 +829,44 @@ mod agent_task_tests { assert_eq!(options.metadata.get("priority").map(String::as_str), Some("1")); assert_eq!(options.metadata.get("urgent").map(String::as_str), Some("false")); } + + #[test] + fn resolve_execution_chat_id_uses_dedicated_session_for_silent_agent_tasks() { + let job = RuntimeJob { + id: "agent.daily_summary.background".to_string(), + kind: SchedulerJobKind::SilentAgentTask, + schedule: SchedulerSchedule::Interval { + seconds: 300, + startup_delay_secs: 0, + }, + target: SchedulerJobTarget { + channel: Some("feishu".to_string()), + chat_id: Some("oc_demo".to_string()), + session_chat_id: None, + reply_to: None, + }, + payload: serde_json::json!({ + "prompt": "请后台整理今天待办" + }), + enabled: true, + state: SchedulerJobState::Scheduled, + last_status: None, + last_error: None, + run_count: 0, + max_runs: None, + last_fired_at: None, + next_fire_at: None, + paused_at: None, + completed_at: None, + interval_secs: 300, + startup_delay_secs: 0, + }; + + assert_eq!( + resolve_execution_chat_id(&job).unwrap(), + "scheduler/agent.daily_summary.background" + ); + } } impl TryFrom for SchedulerJobTarget { @@ -725,6 +887,38 @@ mod tests { use crate::skills::SkillRuntime; use crate::storage::{SchedulerJobUpsert, SessionStore}; + fn test_provider_config() -> LLMProviderConfig { + LLMProviderConfig { + provider_type: "openai".to_string(), + name: "default".to_string(), + base_url: "http://localhost".to_string(), + api_key: "test-key".to_string(), + extra_headers: HashMap::new(), + llm_timeout_secs: 30, + model_id: "test-model".to_string(), + temperature: Some(0.0), + max_tokens: None, + model_extra: HashMap::new(), + max_tool_iterations: 4, + tool_result_max_chars: 20_000, + context_tool_result_trim_chars: 20_000, + } + } + + fn test_session_manager() -> SessionManager { + let provider_config = test_provider_config(); + SessionManager::new( + 4, + 100, + false, + "Asia/Shanghai".to_string(), + provider_config.clone(), + HashMap::from([("default".to_string(), provider_config)]), + Arc::new(SkillRuntime::default()), + ) + .unwrap() + } + #[test] fn runtime_job_skip_policy_advances_from_now() { let now = Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap(); @@ -839,31 +1033,7 @@ mod tests { }) .unwrap(); - let provider_config = LLMProviderConfig { - provider_type: "openai".to_string(), - name: "default".to_string(), - base_url: "http://localhost".to_string(), - api_key: "test-key".to_string(), - extra_headers: HashMap::new(), - llm_timeout_secs: 30, - model_id: "test-model".to_string(), - temperature: Some(0.0), - max_tokens: None, - model_extra: HashMap::new(), - max_tool_iterations: 4, - tool_result_max_chars: 20_000, - context_tool_result_trim_chars: 20_000, - }; - let session_manager = SessionManager::new( - 4, - 100, - false, - "Asia/Shanghai".to_string(), - provider_config.clone(), - HashMap::from([("default".to_string(), provider_config)]), - Arc::new(SkillRuntime::default()), - ) - .unwrap(); + let session_manager = test_session_manager(); let scheduler = Scheduler::new( MessageBus::new(8), SchedulerConfig { @@ -890,31 +1060,7 @@ mod tests { fn sync_config_jobs_persists_builtin_memory_maintenance_job() { let store = Arc::new(SessionStore::in_memory().unwrap()); - let provider_config = LLMProviderConfig { - provider_type: "openai".to_string(), - name: "default".to_string(), - base_url: "http://localhost".to_string(), - api_key: "test-key".to_string(), - extra_headers: HashMap::new(), - llm_timeout_secs: 30, - model_id: "test-model".to_string(), - temperature: Some(0.0), - max_tokens: None, - model_extra: HashMap::new(), - max_tool_iterations: 4, - tool_result_max_chars: 20_000, - context_tool_result_trim_chars: 20_000, - }; - let session_manager = SessionManager::new( - 4, - 100, - false, - "Asia/Shanghai".to_string(), - provider_config.clone(), - HashMap::from([("default".to_string(), provider_config)]), - Arc::new(SkillRuntime::default()), - ) - .unwrap(); + let session_manager = test_session_manager(); let scheduler = Scheduler::new( MessageBus::new(8), SchedulerConfig::default(), @@ -946,6 +1092,67 @@ mod tests { assert!(saved.next_fire_at.is_some()); } + #[tokio::test] + async fn silent_agent_task_failure_notifies_primary_chat() { + let store = Arc::new(SessionStore::in_memory().unwrap()); + let bus = MessageBus::new(8); + let scheduler = Scheduler::new( + bus.clone(), + SchedulerConfig { + enabled: true, + tick_resolution_ms: 1000, + worker_queue_capacity: 64, + misfire_policy: SchedulerMisfirePolicy::Skip, + jobs: Vec::new(), + }, + chrono_tz::Asia::Shanghai, + store, + test_session_manager(), + ); + + let job = RuntimeJob { + id: "agent.daily_summary.background".to_string(), + kind: SchedulerJobKind::SilentAgentTask, + schedule: SchedulerSchedule::Interval { + seconds: 300, + startup_delay_secs: 0, + }, + target: SchedulerJobTarget { + channel: Some("feishu".to_string()), + chat_id: Some("oc_demo".to_string()), + session_chat_id: Some("scheduler/agent.daily_summary.background".to_string()), + reply_to: None, + }, + payload: serde_json::json!({}), + enabled: true, + state: SchedulerJobState::Scheduled, + last_status: None, + last_error: None, + run_count: 0, + max_runs: None, + last_fired_at: None, + next_fire_at: None, + paused_at: None, + completed_at: None, + interval_secs: 300, + startup_delay_secs: 0, + }; + + let error = scheduler.execute_job(&job).await.unwrap_err(); + assert!(error.to_string().contains("payload.prompt")); + + let outbound = tokio::time::timeout( + std::time::Duration::from_millis(100), + bus.consume_outbound(), + ) + .await + .unwrap(); + assert_eq!(outbound.channel, "feishu"); + assert_eq!(outbound.chat_id, "oc_demo"); + assert!(outbound.content.contains("定时任务执行失败")); + assert!(outbound.content.contains("agent.daily_summary.background")); + } + #[test] fn cron_schedule_uses_configured_timezone() { let now = Utc.with_ymd_and_hms(2026, 4, 23, 18, 0, 0).single().unwrap(); diff --git a/src/tools/scheduler_manage.rs b/src/tools/scheduler_manage.rs index d9e8dff..e5b3c10 100644 --- a/src/tools/scheduler_manage.rs +++ b/src/tools/scheduler_manage.rs @@ -31,7 +31,7 @@ impl Tool for SchedulerManageTool { } fn description(&self) -> &str { - "Manage DB-backed scheduled jobs. Supports actions: list, get, put, delete, pause, resume. Jobs persist in SQLite and are executed by the scheduler runtime. When creating agent_task jobs, keep prompt/system_prompt focused on the work to perform; do not restate execution times unless the task logic truly depends on them, because the trigger already controls timing." + "Manage DB-backed scheduled jobs. Supports actions: list, get, put, delete, pause, resume. Jobs persist in SQLite and are executed by the scheduler runtime. When creating agent_task or silent_agent_task jobs, keep prompt/system_prompt focused on the work to perform; do not restate execution times unless the task logic truly depends on them, because the trigger already controls timing." } fn parameters_schema(&self) -> serde_json::Value { @@ -67,18 +67,19 @@ impl Tool for SchedulerManageTool { }, "kind": { "type": "string", - "enum": ["internal_event", "outbound_message", "agent_task"] + "enum": ["internal_event", "outbound_message", "agent_task", "silent_agent_task"] }, "schedule": { "type": "object", "description": "Schedule object, for example {type: 'interval', seconds: 300} or {type: 'cron', expression: '0 9 * * *'}" }, "target": { - "type": "object" + "type": "object", + "description": "Target routing. agent_task and silent_agent_task require channel and chat_id. silent_agent_task may optionally set session_chat_id to choose a dedicated background session; otherwise one is derived from the job id." }, "payload": { "type": "object", - "description": format!("Job payload. agent_task supports prompt, agent, fresh_session, system_prompt, sender_id, metadata. For agent_task, write prompt/system_prompt as execution instructions and avoid repeating schedule phrases or execution times such as 每天9点 or 每小时 unless the task itself must reason about time. {} outbound_message expects content. internal_event expects event.", agent_hint) + "description": format!("Job payload. agent_task and silent_agent_task support prompt, agent, fresh_session, system_prompt, sender_id, metadata. For these kinds, write prompt/system_prompt as execution instructions and avoid repeating schedule phrases or execution times such as 每天9点 or 每小时 unless the task itself must reason about time. {} outbound_message expects content. internal_event expects event.", agent_hint) }, "max_runs": { "type": ["integer", "null"] @@ -208,9 +209,9 @@ fn build_upsert( args.get("target").cloned().unwrap_or_else(|| json!({})), context, ); - if kind == "agent_task" { + if kind == "agent_task" || kind == "silent_agent_task" { validate_agent_task_payload(&payload, known_agents)?; - validate_target_fields(&target, &["channel", "chat_id"], "agent_task")?; + validate_target_fields(&target, &["channel", "chat_id"], &kind)?; } else if kind == "outbound_message" { validate_outbound_message_payload(&payload)?; validate_target_fields(&target, &["channel", "chat_id"], "outbound_message")?; @@ -482,6 +483,43 @@ mod tests { assert!(put_result.output.contains("agent_task")); } + #[tokio::test] + async fn test_scheduler_manage_put_silent_agent_task() { + let store = Arc::new(SessionStore::in_memory().unwrap()); + let tool = SchedulerManageTool::new(store.clone(), HashSet::from(["planner".to_string()])); + + let put_result = tool + .execute(json!({ + "action": "put", + "id": "agent.daily_summary.background", + "kind": "silent_agent_task", + "schedule": { + "type": "cron", + "expression": "0 9 * * *" + }, + "target": { + "channel": "feishu", + "chat_id": "oc_demo", + "session_chat_id": "scheduler/agent.daily_summary.background" + }, + "payload": { + "prompt": "请后台总结今天待办", + "agent": "planner" + } + })) + .await + .unwrap(); + + assert!(put_result.success); + + let saved = store + .get_scheduler_job("agent.daily_summary.background") + .unwrap() + .unwrap(); + assert_eq!(saved.kind, "silent_agent_task"); + assert_eq!(saved.target["session_chat_id"], "scheduler/agent.daily_summary.background"); + } + #[tokio::test] async fn test_scheduler_manage_rejects_outbound_message_without_target() { let store = Arc::new(SessionStore::in_memory().unwrap());