From 1ffdcab58567b3842548c0d677de409ea999a5ec Mon Sep 17 00:00:00 2001 From: ooodc <549496103@qq.com> Date: Thu, 23 Apr 2026 08:45:32 +0800 Subject: [PATCH] =?UTF-8?q?feat(scheduler):=20=E6=B7=BB=E5=8A=A0=20agent?= =?UTF-8?q?=5Ftask=20=E7=B1=BB=E5=9E=8B=E6=94=AF=E6=8C=81=EF=BC=8C?= =?UTF-8?q?=E6=89=A9=E5=B1=95=E4=BB=BB=E5=8A=A1=E8=B0=83=E5=BA=A6=E5=8A=9F?= =?UTF-8?q?=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 31 +++++- docs/PERSISTENCE.md | 7 +- src/config/mod.rs | 61 +++++++++++ src/gateway/session.rs | 79 +++++++++++++++ src/scheduler/mod.rs | 184 ++++++++++++++++++++++++++++++++++ src/tools/scheduler_manage.rs | 34 ++++++- 6 files changed, 392 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 67faeb2..77ba850 100644 --- a/README.md +++ b/README.md @@ -90,7 +90,7 @@ Current behavior: - Scheduler runs as a background loop inside gateway lifecycle. - Job definitions and runtime state are persisted in SQLite instead of JSON files. - Supported schedule types: delay, interval, at, cron. -- Supported job kinds: internal_event, outbound_message. +- Supported job kinds: internal_event, outbound_message, agent_task. - Built-in internal event: session_cleanup, used to clear expired in-memory channel sessions. - Built-in management tool: scheduler_manage. @@ -113,6 +113,28 @@ Config example: "event": "session_cleanup" } }, + { + "id": "agent.daily_summary", + "kind": "agent_task", + "schedule": { + "type": "cron", + "expression": "30 18 * * *" + }, + "target": { + "channel": "feishu", + "chat_id": "oc_xxx" + }, + "payload": { + "prompt": "请总结今天的项目进展,并列出明天的优先事项", + "fresh_session": true, + "system_prompt": "你是日报助手,输出时先给摘要,再给待办。", + "sender_id": "scheduler-daily-summary", + "metadata": { + "job_type": "daily_summary", + "source": "scheduler" + } + } + }, { "id": "daily.reminder", "kind": "outbound_message", @@ -136,3 +158,10 @@ Runtime management: - Use scheduler_manage with action=list|get|put|delete|pause|resume. - Jobs created by the tool are written into SQLite and picked up by the scheduler loop. - Config-defined jobs are also synced into SQLite on startup. +- agent_task reuses the normal agent pipeline: it creates a synthetic user turn from payload.prompt and runs tools, persistence, and outbound rendering through SessionManager. +- agent_task payload fields: + - prompt: required, synthetic user input. + - fresh_session: optional, when true reset the active chat segment before running. + - system_prompt: optional, append a task-specific system message before the synthetic user turn. + - sender_id: optional, overrides the synthetic sender id used for tool context and memory scoping. + - metadata: optional, attached to outbound messages emitted by this task. diff --git a/docs/PERSISTENCE.md b/docs/PERSISTENCE.md index 58d01e2..c1603db 100644 --- a/docs/PERSISTENCE.md +++ b/docs/PERSISTENCE.md @@ -157,7 +157,7 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 | 字段 | 类型 | 含义 | | --- | --- | --- | | `id` | `TEXT PRIMARY KEY` | 任务唯一标识 | -| `kind` | `TEXT NOT NULL` | 任务类型,当前支持 `internal_event`、`outbound_message` | +| `kind` | `TEXT NOT NULL` | 任务类型,当前支持 `internal_event`、`outbound_message`、`agent_task` | | `schedule_json` | `TEXT NOT NULL` | 统一 schedule 定义,JSON 形式保存 `delay` / `interval` / `at` / `cron` | | `interval_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 | | `startup_delay_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 | @@ -183,6 +183,11 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 - repeating job 会在每次执行后更新 `run_count`、`last_fired_at`、`next_fire_at`。 - one-shot job(`delay` / `at`)完成后会进入 `completed` 状态,不再调度。 - 内置 `internal_event` 当前包含 `session_cleanup`,用于回收超时的内存 session 缓存。 +- `agent_task` 会把 `payload.prompt` 作为一次合成用户输入,交给 `SessionManager::run_scheduled_agent_task()` 执行,因此会复用持久化历史、工具调用和渠道下发链路。 +- `payload.fresh_session = true` 时,会先对目标 chat 执行一次逻辑 reset,再开始本次任务运行。 +- `payload.system_prompt` 会作为额外 system 消息写入本次任务上下文。 +- `payload.sender_id` 会覆盖默认的 `scheduler` 发送者标识。 +- `payload.metadata` 会映射到 outbound metadata,便于渠道侧做追踪或特殊处理。 ## 7. 数据写入流程 diff --git a/src/config/mod.rs b/src/config/mod.rs index b0b37c3..74cbd8a 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -197,6 +197,7 @@ pub struct SchedulerJobConfig { pub enum SchedulerJobKind { InternalEvent, OutboundMessage, + AgentTask, } #[derive(Debug, Clone, Deserialize, Serialize, Default)] @@ -837,18 +838,78 @@ mod tests { config.scheduler.jobs[0].resolved_schedule().unwrap(), SchedulerSchedule::Delay { seconds: 30 } ); + assert_eq!(config.scheduler.jobs[0].kind, SchedulerJobKind::InternalEvent); assert_eq!( config.scheduler.jobs[1].resolved_schedule().unwrap(), SchedulerSchedule::At { timestamp: "2026-04-23T09:00:00+00:00".to_string(), } ); + assert_eq!(config.scheduler.jobs[1].kind, SchedulerJobKind::OutboundMessage); assert_eq!( config.scheduler.jobs[2].resolved_schedule().unwrap(), SchedulerSchedule::Cron { expression: "0 9 * * *".to_string(), } ); + assert_eq!(config.scheduler.jobs[2].kind, SchedulerJobKind::InternalEvent); + } + + #[test] + fn test_scheduler_config_loads_agent_task_job() { + let file = tempfile::NamedTempFile::new().unwrap(); + std::fs::write( + file.path(), + r#"{ + "providers": { + "aliyun": { + "type": "openai", + "base_url": "https://example.invalid/v1", + "api_key": "test-key", + "extra_headers": {} + } + }, + "models": { + "qwen-plus": { + "model_id": "qwen-plus" + } + }, + "agents": { + "default": { + "provider": "aliyun", + "model": "qwen-plus" + } + }, + "scheduler": { + "enabled": true, + "jobs": [ + { + "id": "agent.daily_summary", + "kind": "agent_task", + "schedule": { + "type": "cron", + "expression": "0 9 * * *" + }, + "target": { + "channel": "feishu", + "chat_id": "oc_demo" + }, + "payload": { + "prompt": "请总结今天待办" + } + } + ] + } +}"#, + ) + .unwrap(); + + let config = Config::load(file.path().to_str().unwrap()).unwrap(); + let job = &config.scheduler.jobs[0]; + assert_eq!(job.kind, SchedulerJobKind::AgentTask); + assert_eq!(job.target.channel.as_deref(), Some("feishu")); + assert_eq!(job.target.chat_id.as_deref(), Some("oc_demo")); + assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办")); } #[test] diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 0f67d43..ceffeda 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -43,6 +43,14 @@ pub struct BusToolCallEmitter { show_tool_results: bool, } +#[derive(Debug, Clone, Default)] +pub struct ScheduledAgentTaskOptions { + pub sender_id: Option, + pub fresh_session: bool, + pub system_prompt: Option, + pub metadata: HashMap, +} + impl BusToolCallEmitter { pub fn new( bus: Arc, @@ -709,6 +717,77 @@ impl SessionManager { Ok(response) } + pub async fn run_scheduled_agent_task( + &self, + channel_name: &str, + chat_id: &str, + prompt: &str, + options: ScheduledAgentTaskOptions, + ) -> Result, AgentError> { + self.ensure_session(channel_name).await?; + self.touch(channel_name).await; + + let session = self + .get(channel_name) + .await + .ok_or_else(|| AgentError::Other("Session not found".to_string()))?; + + let sender_id = options + .sender_id + .clone() + .unwrap_or_else(|| "scheduler".to_string()); + + let response = { + let mut session_guard = session.lock().await; + + session_guard.ensure_persistent_session(chat_id)?; + + if options.fresh_session { + session_guard.reset_chat_context(chat_id)?; + } + + session_guard.ensure_chat_loaded(chat_id)?; + session_guard.ensure_agent_prompt_before_user_message(chat_id)?; + + if let Some(system_prompt) = options.system_prompt.as_deref() { + session_guard.append_persisted_message(chat_id, ChatMessage::system(system_prompt))?; + } + + let user_message = session_guard.create_user_message(prompt, Vec::new()); + let user_message_id = user_message.id.clone(); + session_guard.append_persisted_message(chat_id, user_message)?; + + let history = session_guard.get_or_create_history(chat_id).clone(); + let history = session_guard + .compressor + .compress_if_needed(history, &session_guard.provider_config) + .await?; + + session_guard.record_skill_offer(chat_id)?; + + let agent = session_guard.create_agent(chat_id, Some(&sender_id), Some(&user_message_id))?; + let result = agent.process(history).await?; + session_guard.append_persisted_messages(chat_id, result.emitted_messages.clone())?; + + result + .emitted_messages + .iter() + .filter(|message| should_display_message_to_user(self.show_tool_results, message)) + .flat_map(|message| { + OutboundMessage::from_chat_message( + channel_name, + chat_id, + None, + &options.metadata, + message, + ) + }) + .collect::>() + }; + + Ok(response) + } + /// 清除指定 session 的所有历史 pub async fn clear_session_history(&self, channel_name: &str) -> Result<(), AgentError> { if let Some(session) = self.get(channel_name).await { diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 054d338..eb8e5aa 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -11,6 +11,7 @@ use crate::config::{ SchedulerSchedule, }; use crate::gateway::session::SessionManager; +use crate::gateway::session::ScheduledAgentTaskOptions; use crate::storage::{ SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, SchedulerJobUpsert, SessionStore, }; @@ -141,6 +142,12 @@ impl Scheduler { SchedulerJobKind::InternalEvent => { execute_internal_event(&self.session_manager, job).await?; } + SchedulerJobKind::AgentTask => { + let outbound_messages = execute_agent_task(&self.session_manager, job).await?; + for message in outbound_messages { + self.bus.publish_outbound(message).await?; + } + } } Ok(()) @@ -214,6 +221,7 @@ impl RuntimeJob { let kind = match record.kind.as_str() { "internal_event" => SchedulerJobKind::InternalEvent, "outbound_message" => SchedulerJobKind::OutboundMessage, + "agent_task" => SchedulerJobKind::AgentTask, other => { tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind"); return Ok(None); @@ -296,6 +304,7 @@ impl RuntimeJob { kind: match self.kind { SchedulerJobKind::InternalEvent => "internal_event".to_string(), SchedulerJobKind::OutboundMessage => "outbound_message".to_string(), + SchedulerJobKind::AgentTask => "agent_task".to_string(), }, schedule: serde_json::to_value(&self.schedule).unwrap_or_else(|_| serde_json::json!({})), interval_secs: self.interval_secs, @@ -465,6 +474,181 @@ async fn execute_internal_event(session_manager: &SessionManager, job: &RuntimeJ } } +async fn execute_agent_task( + session_manager: &SessionManager, + job: &RuntimeJob, +) -> anyhow::Result> { + let channel_name = job + .target + .channel + .as_deref() + .ok_or_else(|| anyhow::anyhow!("agent_task requires target.channel"))?; + let chat_id = job + .target + .chat_id + .as_deref() + .ok_or_else(|| anyhow::anyhow!("agent_task requires target.chat_id"))?; + let prompt = job + .payload + .get("prompt") + .and_then(|value| value.as_str()) + .ok_or_else(|| anyhow::anyhow!("agent_task payload.prompt must be a string"))?; + let options = parse_scheduled_agent_task_options(job)?; + + session_manager + .run_scheduled_agent_task(channel_name, chat_id, prompt, options) + .await + .map_err(|error| anyhow::anyhow!(error.to_string())) +} + +fn parse_scheduled_agent_task_options(job: &RuntimeJob) -> anyhow::Result { + let sender_id = job + .payload + .get("sender_id") + .and_then(|value| value.as_str()) + .map(ToString::to_string); + let fresh_session = job + .payload + .get("fresh_session") + .and_then(|value| value.as_bool()) + .unwrap_or(false); + let system_prompt = job + .payload + .get("system_prompt") + .and_then(|value| value.as_str()) + .map(ToString::to_string); + let metadata = parse_metadata_map(job.payload.get("metadata"))?; + + Ok(ScheduledAgentTaskOptions { + sender_id, + fresh_session, + system_prompt, + metadata, + }) +} + +fn parse_metadata_map(value: Option<&serde_json::Value>) -> anyhow::Result> { + let Some(value) = value else { + return Ok(HashMap::new()); + }; + + let object = value + .as_object() + .ok_or_else(|| anyhow::anyhow!("agent_task payload.metadata must be an object"))?; + let mut metadata = HashMap::with_capacity(object.len()); + + for (key, value) in object { + let stringified = match value { + serde_json::Value::String(inner) => inner.clone(), + serde_json::Value::Null => "null".to_string(), + serde_json::Value::Bool(inner) => inner.to_string(), + serde_json::Value::Number(inner) => inner.to_string(), + _ => { + return Err(anyhow::anyhow!( + "agent_task payload.metadata field '{}' must be a string, number, bool, or null", + key + )) + } + }; + metadata.insert(key.clone(), stringified); + } + + Ok(metadata) +} + +#[cfg(test)] +mod agent_task_tests { + use super::*; + + #[test] + fn runtime_job_from_record_supports_agent_task_kind() { + let record = SchedulerJobRecord { + id: "agent.daily_summary".to_string(), + kind: "agent_task".to_string(), + schedule: serde_json::json!({ + "type": "interval", + "seconds": 300 + }), + interval_secs: 0, + startup_delay_secs: 0, + target: serde_json::json!({ + "channel": "feishu", + "chat_id": "oc_demo" + }), + payload: serde_json::json!({ + "prompt": "请总结今天待办" + }), + enabled: true, + state: SchedulerJobState::Scheduled, + last_status: None, + last_error: None, + run_count: 0, + max_runs: None, + last_fired_at: None, + next_fire_at: Some(1_700_000_010_000), + paused_at: None, + completed_at: None, + created_at: 1_700_000_000_000, + updated_at: 1_700_000_000_000, + }; + + let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip) + .unwrap() + .unwrap(); + + assert_eq!(job.kind, SchedulerJobKind::AgentTask); + assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办")); + } + + #[test] + fn parse_scheduled_agent_task_options_supports_fresh_session_and_metadata() { + let job = RuntimeJob { + id: "agent.daily_summary".to_string(), + kind: SchedulerJobKind::AgentTask, + schedule: SchedulerSchedule::Interval { + seconds: 300, + startup_delay_secs: 0, + }, + target: SchedulerJobTarget { + channel: Some("feishu".to_string()), + chat_id: Some("oc_demo".to_string()), + reply_to: None, + }, + payload: serde_json::json!({ + "prompt": "请总结今天待办", + "sender_id": "scheduler-bot", + "fresh_session": true, + "system_prompt": "你是日报助手", + "metadata": { + "job_type": "daily_summary", + "priority": 1, + "urgent": false + } + }), + enabled: true, + state: SchedulerJobState::Scheduled, + last_status: None, + last_error: None, + run_count: 0, + max_runs: None, + last_fired_at: None, + next_fire_at: None, + paused_at: None, + completed_at: None, + interval_secs: 300, + startup_delay_secs: 0, + }; + + let options = parse_scheduled_agent_task_options(&job).unwrap(); + assert_eq!(options.sender_id.as_deref(), Some("scheduler-bot")); + assert!(options.fresh_session); + assert_eq!(options.system_prompt.as_deref(), Some("你是日报助手")); + assert_eq!(options.metadata.get("job_type").map(String::as_str), Some("daily_summary")); + assert_eq!(options.metadata.get("priority").map(String::as_str), Some("1")); + assert_eq!(options.metadata.get("urgent").map(String::as_str), Some("false")); + } +} + impl TryFrom for SchedulerJobTarget { type Error = anyhow::Error; diff --git a/src/tools/scheduler_manage.rs b/src/tools/scheduler_manage.rs index a73a378..144fd5d 100644 --- a/src/tools/scheduler_manage.rs +++ b/src/tools/scheduler_manage.rs @@ -47,7 +47,7 @@ impl Tool for SchedulerManageTool { }, "kind": { "type": "string", - "enum": ["internal_event", "outbound_message"] + "enum": ["internal_event", "outbound_message", "agent_task"] }, "schedule": { "type": "object", @@ -57,7 +57,8 @@ impl Tool for SchedulerManageTool { "type": "object" }, "payload": { - "type": "object" + "type": "object", + "description": "Job payload. agent_task supports prompt, fresh_session, system_prompt, sender_id, metadata. outbound_message expects content. internal_event expects event." }, "max_runs": { "type": ["integer", "null"] @@ -288,4 +289,33 @@ mod tests { assert!(get_result.output.contains("heartbeat")); assert!(get_result.output.contains("outbound_message")); } + + #[tokio::test] + async fn test_scheduler_manage_put_agent_task() { + let store = Arc::new(SessionStore::in_memory().unwrap()); + let tool = SchedulerManageTool::new(store); + + let put_result = tool + .execute(json!({ + "action": "put", + "id": "agent.daily_summary", + "kind": "agent_task", + "schedule": { + "type": "cron", + "expression": "0 9 * * *" + }, + "target": { + "channel": "feishu", + "chat_id": "oc_demo" + }, + "payload": { + "prompt": "请总结今天待办" + } + })) + .await + .unwrap(); + + assert!(put_result.success); + assert!(put_result.output.contains("agent_task")); + } } \ No newline at end of file