feat(scheduler): 添加 agent_task 类型支持,扩展任务调度功能

This commit is contained in:
ooodc 2026-04-23 08:45:32 +08:00
parent 73840c608c
commit 1ffdcab585
6 changed files with 392 additions and 4 deletions

View File

@ -90,7 +90,7 @@ Current behavior:
- Scheduler runs as a background loop inside gateway lifecycle. - Scheduler runs as a background loop inside gateway lifecycle.
- Job definitions and runtime state are persisted in SQLite instead of JSON files. - Job definitions and runtime state are persisted in SQLite instead of JSON files.
- Supported schedule types: delay, interval, at, cron. - Supported schedule types: delay, interval, at, cron.
- Supported job kinds: internal_event, outbound_message. - Supported job kinds: internal_event, outbound_message, agent_task.
- Built-in internal event: session_cleanup, used to clear expired in-memory channel sessions. - Built-in internal event: session_cleanup, used to clear expired in-memory channel sessions.
- Built-in management tool: scheduler_manage. - Built-in management tool: scheduler_manage.
@ -113,6 +113,28 @@ Config example:
"event": "session_cleanup" "event": "session_cleanup"
} }
}, },
{
"id": "agent.daily_summary",
"kind": "agent_task",
"schedule": {
"type": "cron",
"expression": "30 18 * * *"
},
"target": {
"channel": "feishu",
"chat_id": "oc_xxx"
},
"payload": {
"prompt": "请总结今天的项目进展,并列出明天的优先事项",
"fresh_session": true,
"system_prompt": "你是日报助手,输出时先给摘要,再给待办。",
"sender_id": "scheduler-daily-summary",
"metadata": {
"job_type": "daily_summary",
"source": "scheduler"
}
}
},
{ {
"id": "daily.reminder", "id": "daily.reminder",
"kind": "outbound_message", "kind": "outbound_message",
@ -136,3 +158,10 @@ Runtime management:
- Use scheduler_manage with action=list|get|put|delete|pause|resume. - Use scheduler_manage with action=list|get|put|delete|pause|resume.
- Jobs created by the tool are written into SQLite and picked up by the scheduler loop. - Jobs created by the tool are written into SQLite and picked up by the scheduler loop.
- Config-defined jobs are also synced into SQLite on startup. - Config-defined jobs are also synced into SQLite on startup.
- agent_task reuses the normal agent pipeline: it creates a synthetic user turn from payload.prompt and runs tools, persistence, and outbound rendering through SessionManager.
- agent_task payload fields:
- prompt: required, synthetic user input.
- fresh_session: optional, when true reset the active chat segment before running.
- system_prompt: optional, append a task-specific system message before the synthetic user turn.
- sender_id: optional, overrides the synthetic sender id used for tool context and memory scoping.
- metadata: optional, attached to outbound messages emitted by this task.

View File

@ -157,7 +157,7 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
| 字段 | 类型 | 含义 | | 字段 | 类型 | 含义 |
| --- | --- | --- | | --- | --- | --- |
| `id` | `TEXT PRIMARY KEY` | 任务唯一标识 | | `id` | `TEXT PRIMARY KEY` | 任务唯一标识 |
| `kind` | `TEXT NOT NULL` | 任务类型,当前支持 `internal_event``outbound_message` | | `kind` | `TEXT NOT NULL` | 任务类型,当前支持 `internal_event``outbound_message``agent_task` |
| `schedule_json` | `TEXT NOT NULL` | 统一 schedule 定义JSON 形式保存 `delay` / `interval` / `at` / `cron` | | `schedule_json` | `TEXT NOT NULL` | 统一 schedule 定义JSON 形式保存 `delay` / `interval` / `at` / `cron` |
| `interval_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 | | `interval_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 |
| `startup_delay_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 | | `startup_delay_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 |
@ -183,6 +183,11 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
- repeating job 会在每次执行后更新 `run_count``last_fired_at``next_fire_at` - repeating job 会在每次执行后更新 `run_count``last_fired_at``next_fire_at`
- one-shot job`delay` / `at`)完成后会进入 `completed` 状态,不再调度。 - one-shot job`delay` / `at`)完成后会进入 `completed` 状态,不再调度。
- 内置 `internal_event` 当前包含 `session_cleanup`,用于回收超时的内存 session 缓存。 - 内置 `internal_event` 当前包含 `session_cleanup`,用于回收超时的内存 session 缓存。
- `agent_task` 会把 `payload.prompt` 作为一次合成用户输入,交给 `SessionManager::run_scheduled_agent_task()` 执行,因此会复用持久化历史、工具调用和渠道下发链路。
- `payload.fresh_session = true` 时,会先对目标 chat 执行一次逻辑 reset再开始本次任务运行。
- `payload.system_prompt` 会作为额外 system 消息写入本次任务上下文。
- `payload.sender_id` 会覆盖默认的 `scheduler` 发送者标识。
- `payload.metadata` 会映射到 outbound metadata便于渠道侧做追踪或特殊处理。
## 7. 数据写入流程 ## 7. 数据写入流程

View File

@ -197,6 +197,7 @@ pub struct SchedulerJobConfig {
pub enum SchedulerJobKind { pub enum SchedulerJobKind {
InternalEvent, InternalEvent,
OutboundMessage, OutboundMessage,
AgentTask,
} }
#[derive(Debug, Clone, Deserialize, Serialize, Default)] #[derive(Debug, Clone, Deserialize, Serialize, Default)]
@ -837,18 +838,78 @@ mod tests {
config.scheduler.jobs[0].resolved_schedule().unwrap(), config.scheduler.jobs[0].resolved_schedule().unwrap(),
SchedulerSchedule::Delay { seconds: 30 } SchedulerSchedule::Delay { seconds: 30 }
); );
assert_eq!(config.scheduler.jobs[0].kind, SchedulerJobKind::InternalEvent);
assert_eq!( assert_eq!(
config.scheduler.jobs[1].resolved_schedule().unwrap(), config.scheduler.jobs[1].resolved_schedule().unwrap(),
SchedulerSchedule::At { SchedulerSchedule::At {
timestamp: "2026-04-23T09:00:00+00:00".to_string(), timestamp: "2026-04-23T09:00:00+00:00".to_string(),
} }
); );
assert_eq!(config.scheduler.jobs[1].kind, SchedulerJobKind::OutboundMessage);
assert_eq!( assert_eq!(
config.scheduler.jobs[2].resolved_schedule().unwrap(), config.scheduler.jobs[2].resolved_schedule().unwrap(),
SchedulerSchedule::Cron { SchedulerSchedule::Cron {
expression: "0 9 * * *".to_string(), expression: "0 9 * * *".to_string(),
} }
); );
assert_eq!(config.scheduler.jobs[2].kind, SchedulerJobKind::InternalEvent);
}
#[test]
fn test_scheduler_config_loads_agent_task_job() {
let file = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
file.path(),
r#"{
"providers": {
"aliyun": {
"type": "openai",
"base_url": "https://example.invalid/v1",
"api_key": "test-key",
"extra_headers": {}
}
},
"models": {
"qwen-plus": {
"model_id": "qwen-plus"
}
},
"agents": {
"default": {
"provider": "aliyun",
"model": "qwen-plus"
}
},
"scheduler": {
"enabled": true,
"jobs": [
{
"id": "agent.daily_summary",
"kind": "agent_task",
"schedule": {
"type": "cron",
"expression": "0 9 * * *"
},
"target": {
"channel": "feishu",
"chat_id": "oc_demo"
},
"payload": {
"prompt": "请总结今天待办"
}
}
]
}
}"#,
)
.unwrap();
let config = Config::load(file.path().to_str().unwrap()).unwrap();
let job = &config.scheduler.jobs[0];
assert_eq!(job.kind, SchedulerJobKind::AgentTask);
assert_eq!(job.target.channel.as_deref(), Some("feishu"));
assert_eq!(job.target.chat_id.as_deref(), Some("oc_demo"));
assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办"));
} }
#[test] #[test]

View File

@ -43,6 +43,14 @@ pub struct BusToolCallEmitter {
show_tool_results: bool, show_tool_results: bool,
} }
#[derive(Debug, Clone, Default)]
pub struct ScheduledAgentTaskOptions {
pub sender_id: Option<String>,
pub fresh_session: bool,
pub system_prompt: Option<String>,
pub metadata: HashMap<String, String>,
}
impl BusToolCallEmitter { impl BusToolCallEmitter {
pub fn new( pub fn new(
bus: Arc<MessageBus>, bus: Arc<MessageBus>,
@ -709,6 +717,77 @@ impl SessionManager {
Ok(response) Ok(response)
} }
pub async fn run_scheduled_agent_task(
&self,
channel_name: &str,
chat_id: &str,
prompt: &str,
options: ScheduledAgentTaskOptions,
) -> Result<Vec<OutboundMessage>, AgentError> {
self.ensure_session(channel_name).await?;
self.touch(channel_name).await;
let session = self
.get(channel_name)
.await
.ok_or_else(|| AgentError::Other("Session not found".to_string()))?;
let sender_id = options
.sender_id
.clone()
.unwrap_or_else(|| "scheduler".to_string());
let response = {
let mut session_guard = session.lock().await;
session_guard.ensure_persistent_session(chat_id)?;
if options.fresh_session {
session_guard.reset_chat_context(chat_id)?;
}
session_guard.ensure_chat_loaded(chat_id)?;
session_guard.ensure_agent_prompt_before_user_message(chat_id)?;
if let Some(system_prompt) = options.system_prompt.as_deref() {
session_guard.append_persisted_message(chat_id, ChatMessage::system(system_prompt))?;
}
let user_message = session_guard.create_user_message(prompt, Vec::new());
let user_message_id = user_message.id.clone();
session_guard.append_persisted_message(chat_id, user_message)?;
let history = session_guard.get_or_create_history(chat_id).clone();
let history = session_guard
.compressor
.compress_if_needed(history, &session_guard.provider_config)
.await?;
session_guard.record_skill_offer(chat_id)?;
let agent = session_guard.create_agent(chat_id, Some(&sender_id), Some(&user_message_id))?;
let result = agent.process(history).await?;
session_guard.append_persisted_messages(chat_id, result.emitted_messages.clone())?;
result
.emitted_messages
.iter()
.filter(|message| should_display_message_to_user(self.show_tool_results, message))
.flat_map(|message| {
OutboundMessage::from_chat_message(
channel_name,
chat_id,
None,
&options.metadata,
message,
)
})
.collect::<Vec<_>>()
};
Ok(response)
}
/// 清除指定 session 的所有历史 /// 清除指定 session 的所有历史
pub async fn clear_session_history(&self, channel_name: &str) -> Result<(), AgentError> { pub async fn clear_session_history(&self, channel_name: &str) -> Result<(), AgentError> {
if let Some(session) = self.get(channel_name).await { if let Some(session) = self.get(channel_name).await {

View File

@ -11,6 +11,7 @@ use crate::config::{
SchedulerSchedule, SchedulerSchedule,
}; };
use crate::gateway::session::SessionManager; use crate::gateway::session::SessionManager;
use crate::gateway::session::ScheduledAgentTaskOptions;
use crate::storage::{ use crate::storage::{
SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, SchedulerJobUpsert, SessionStore, SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, SchedulerJobUpsert, SessionStore,
}; };
@ -141,6 +142,12 @@ impl Scheduler {
SchedulerJobKind::InternalEvent => { SchedulerJobKind::InternalEvent => {
execute_internal_event(&self.session_manager, job).await?; execute_internal_event(&self.session_manager, job).await?;
} }
SchedulerJobKind::AgentTask => {
let outbound_messages = execute_agent_task(&self.session_manager, job).await?;
for message in outbound_messages {
self.bus.publish_outbound(message).await?;
}
}
} }
Ok(()) Ok(())
@ -214,6 +221,7 @@ impl RuntimeJob {
let kind = match record.kind.as_str() { let kind = match record.kind.as_str() {
"internal_event" => SchedulerJobKind::InternalEvent, "internal_event" => SchedulerJobKind::InternalEvent,
"outbound_message" => SchedulerJobKind::OutboundMessage, "outbound_message" => SchedulerJobKind::OutboundMessage,
"agent_task" => SchedulerJobKind::AgentTask,
other => { other => {
tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind"); tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind");
return Ok(None); return Ok(None);
@ -296,6 +304,7 @@ impl RuntimeJob {
kind: match self.kind { kind: match self.kind {
SchedulerJobKind::InternalEvent => "internal_event".to_string(), SchedulerJobKind::InternalEvent => "internal_event".to_string(),
SchedulerJobKind::OutboundMessage => "outbound_message".to_string(), SchedulerJobKind::OutboundMessage => "outbound_message".to_string(),
SchedulerJobKind::AgentTask => "agent_task".to_string(),
}, },
schedule: serde_json::to_value(&self.schedule).unwrap_or_else(|_| serde_json::json!({})), schedule: serde_json::to_value(&self.schedule).unwrap_or_else(|_| serde_json::json!({})),
interval_secs: self.interval_secs, interval_secs: self.interval_secs,
@ -465,6 +474,181 @@ async fn execute_internal_event(session_manager: &SessionManager, job: &RuntimeJ
} }
} }
async fn execute_agent_task(
session_manager: &SessionManager,
job: &RuntimeJob,
) -> anyhow::Result<Vec<OutboundMessage>> {
let channel_name = job
.target
.channel
.as_deref()
.ok_or_else(|| anyhow::anyhow!("agent_task requires target.channel"))?;
let chat_id = job
.target
.chat_id
.as_deref()
.ok_or_else(|| anyhow::anyhow!("agent_task requires target.chat_id"))?;
let prompt = job
.payload
.get("prompt")
.and_then(|value| value.as_str())
.ok_or_else(|| anyhow::anyhow!("agent_task payload.prompt must be a string"))?;
let options = parse_scheduled_agent_task_options(job)?;
session_manager
.run_scheduled_agent_task(channel_name, chat_id, prompt, options)
.await
.map_err(|error| anyhow::anyhow!(error.to_string()))
}
fn parse_scheduled_agent_task_options(job: &RuntimeJob) -> anyhow::Result<ScheduledAgentTaskOptions> {
let sender_id = job
.payload
.get("sender_id")
.and_then(|value| value.as_str())
.map(ToString::to_string);
let fresh_session = job
.payload
.get("fresh_session")
.and_then(|value| value.as_bool())
.unwrap_or(false);
let system_prompt = job
.payload
.get("system_prompt")
.and_then(|value| value.as_str())
.map(ToString::to_string);
let metadata = parse_metadata_map(job.payload.get("metadata"))?;
Ok(ScheduledAgentTaskOptions {
sender_id,
fresh_session,
system_prompt,
metadata,
})
}
fn parse_metadata_map(value: Option<&serde_json::Value>) -> anyhow::Result<HashMap<String, String>> {
let Some(value) = value else {
return Ok(HashMap::new());
};
let object = value
.as_object()
.ok_or_else(|| anyhow::anyhow!("agent_task payload.metadata must be an object"))?;
let mut metadata = HashMap::with_capacity(object.len());
for (key, value) in object {
let stringified = match value {
serde_json::Value::String(inner) => inner.clone(),
serde_json::Value::Null => "null".to_string(),
serde_json::Value::Bool(inner) => inner.to_string(),
serde_json::Value::Number(inner) => inner.to_string(),
_ => {
return Err(anyhow::anyhow!(
"agent_task payload.metadata field '{}' must be a string, number, bool, or null",
key
))
}
};
metadata.insert(key.clone(), stringified);
}
Ok(metadata)
}
#[cfg(test)]
mod agent_task_tests {
use super::*;
#[test]
fn runtime_job_from_record_supports_agent_task_kind() {
let record = SchedulerJobRecord {
id: "agent.daily_summary".to_string(),
kind: "agent_task".to_string(),
schedule: serde_json::json!({
"type": "interval",
"seconds": 300
}),
interval_secs: 0,
startup_delay_secs: 0,
target: serde_json::json!({
"channel": "feishu",
"chat_id": "oc_demo"
}),
payload: serde_json::json!({
"prompt": "请总结今天待办"
}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: None,
last_fired_at: None,
next_fire_at: Some(1_700_000_010_000),
paused_at: None,
completed_at: None,
created_at: 1_700_000_000_000,
updated_at: 1_700_000_000_000,
};
let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip)
.unwrap()
.unwrap();
assert_eq!(job.kind, SchedulerJobKind::AgentTask);
assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办"));
}
#[test]
fn parse_scheduled_agent_task_options_supports_fresh_session_and_metadata() {
let job = RuntimeJob {
id: "agent.daily_summary".to_string(),
kind: SchedulerJobKind::AgentTask,
schedule: SchedulerSchedule::Interval {
seconds: 300,
startup_delay_secs: 0,
},
target: SchedulerJobTarget {
channel: Some("feishu".to_string()),
chat_id: Some("oc_demo".to_string()),
reply_to: None,
},
payload: serde_json::json!({
"prompt": "请总结今天待办",
"sender_id": "scheduler-bot",
"fresh_session": true,
"system_prompt": "你是日报助手",
"metadata": {
"job_type": "daily_summary",
"priority": 1,
"urgent": false
}
}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: None,
last_fired_at: None,
next_fire_at: None,
paused_at: None,
completed_at: None,
interval_secs: 300,
startup_delay_secs: 0,
};
let options = parse_scheduled_agent_task_options(&job).unwrap();
assert_eq!(options.sender_id.as_deref(), Some("scheduler-bot"));
assert!(options.fresh_session);
assert_eq!(options.system_prompt.as_deref(), Some("你是日报助手"));
assert_eq!(options.metadata.get("job_type").map(String::as_str), Some("daily_summary"));
assert_eq!(options.metadata.get("priority").map(String::as_str), Some("1"));
assert_eq!(options.metadata.get("urgent").map(String::as_str), Some("false"));
}
}
impl TryFrom<serde_json::Value> for SchedulerJobTarget { impl TryFrom<serde_json::Value> for SchedulerJobTarget {
type Error = anyhow::Error; type Error = anyhow::Error;

View File

@ -47,7 +47,7 @@ impl Tool for SchedulerManageTool {
}, },
"kind": { "kind": {
"type": "string", "type": "string",
"enum": ["internal_event", "outbound_message"] "enum": ["internal_event", "outbound_message", "agent_task"]
}, },
"schedule": { "schedule": {
"type": "object", "type": "object",
@ -57,7 +57,8 @@ impl Tool for SchedulerManageTool {
"type": "object" "type": "object"
}, },
"payload": { "payload": {
"type": "object" "type": "object",
"description": "Job payload. agent_task supports prompt, fresh_session, system_prompt, sender_id, metadata. outbound_message expects content. internal_event expects event."
}, },
"max_runs": { "max_runs": {
"type": ["integer", "null"] "type": ["integer", "null"]
@ -288,4 +289,33 @@ mod tests {
assert!(get_result.output.contains("heartbeat")); assert!(get_result.output.contains("heartbeat"));
assert!(get_result.output.contains("outbound_message")); assert!(get_result.output.contains("outbound_message"));
} }
#[tokio::test]
async fn test_scheduler_manage_put_agent_task() {
let store = Arc::new(SessionStore::in_memory().unwrap());
let tool = SchedulerManageTool::new(store);
let put_result = tool
.execute(json!({
"action": "put",
"id": "agent.daily_summary",
"kind": "agent_task",
"schedule": {
"type": "cron",
"expression": "0 9 * * *"
},
"target": {
"channel": "feishu",
"chat_id": "oc_demo"
},
"payload": {
"prompt": "请总结今天待办"
}
}))
.await
.unwrap();
assert!(put_result.success);
assert!(put_result.output.contains("agent_task"));
}
} }