feat: 添加silent_agent_task类型的调度任务,支持后台执行并优化通知机制
This commit is contained in:
parent
4fb102644e
commit
8c17af1209
40
README.md
40
README.md
@ -375,6 +375,7 @@ PicoBot 带有一个基于 SQLite 的调度器,而不是纯内存或 JSON 文
|
|||||||
- internal_event:内部事件
|
- internal_event:内部事件
|
||||||
- outbound_message:直接向目标通道发消息
|
- outbound_message:直接向目标通道发消息
|
||||||
- agent_task:构造一次合成用户输入,复用完整 Agent 流程执行
|
- agent_task:构造一次合成用户输入,复用完整 Agent 流程执行
|
||||||
|
- silent_agent_task:在独立后台会话中执行 Agent 流程,成功不推送,失败回主 chat 通知
|
||||||
|
|
||||||
agent_task 会复用正常链路中的这些能力:
|
agent_task 会复用正常链路中的这些能力:
|
||||||
|
|
||||||
@ -384,6 +385,14 @@ agent_task 会复用正常链路中的这些能力:
|
|||||||
- 会话持久化
|
- 会话持久化
|
||||||
- 渠道消息下发
|
- 渠道消息下发
|
||||||
|
|
||||||
|
silent_agent_task 和 agent_task 使用同一套 Agent 执行能力,但路由语义不同:
|
||||||
|
|
||||||
|
- target.chat_id 仍表示用户通知目标
|
||||||
|
- target.session_chat_id 表示后台任务实际写入的会话;如果省略,会稳定派生为 scheduler/{job_id}
|
||||||
|
- 成功执行后不会向用户发送正常结果
|
||||||
|
- 执行失败时会向主 chat 发送一条失败通知,便于用户感知异常
|
||||||
|
- 后台任务的历史、压缩和会话内上下文会留在独立会话中,不污染主会话
|
||||||
|
|
||||||
### 9.3 运行时管理
|
### 9.3 运行时管理
|
||||||
|
|
||||||
通过 scheduler_manage 可以进行:
|
通过 scheduler_manage 可以进行:
|
||||||
@ -439,12 +448,43 @@ agent_task 会复用正常链路中的这些能力:
|
|||||||
"source": "scheduler"
|
"source": "scheduler"
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"id": "agent.weekly_report.background",
|
||||||
|
"kind": "silent_agent_task",
|
||||||
|
"schedule": {
|
||||||
|
"type": "cron",
|
||||||
|
"expression": "0 8 * * 1"
|
||||||
|
},
|
||||||
|
"target": {
|
||||||
|
"channel": "feishu",
|
||||||
|
"chat_id": "oc_xxx",
|
||||||
|
"session_chat_id": "scheduler/agent.weekly_report.background"
|
||||||
|
},
|
||||||
|
"payload": {
|
||||||
|
"prompt": "请后台整理上周项目进展,输出结构化周报草稿,重点标出风险、阻塞项和下周优先级。",
|
||||||
|
"agent": "default",
|
||||||
|
"fresh_session": false,
|
||||||
|
"system_prompt": "你是周报助手,只在后台整理,不要面向用户寒暄。",
|
||||||
|
"sender_id": "scheduler-weekly-report",
|
||||||
|
"metadata": {
|
||||||
|
"job_type": "weekly_report_background",
|
||||||
|
"source": "scheduler"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
```
|
```
|
||||||
|
|
||||||
|
推荐场景:
|
||||||
|
|
||||||
|
- agent_task:用户需要直接收到结果,例如日报提醒、定时播报、定时外发通知
|
||||||
|
- silent_agent_task:任务需要长期积累独立上下文或后台整理材料,但不应污染主会话,例如周报草稿整理、周期性资料汇总、后台分析任务
|
||||||
|
|
||||||
## 10. 渠道与运行方式
|
## 10. 渠道与运行方式
|
||||||
|
|
||||||
### 10.1 当前支持的通道
|
### 10.1 当前支持的通道
|
||||||
|
|||||||
@ -252,6 +252,7 @@ pub enum SchedulerJobKind {
|
|||||||
InternalEvent,
|
InternalEvent,
|
||||||
OutboundMessage,
|
OutboundMessage,
|
||||||
AgentTask,
|
AgentTask,
|
||||||
|
SilentAgentTask,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
|
#[derive(Debug, Clone, Deserialize, Serialize, Default)]
|
||||||
@ -261,6 +262,8 @@ pub struct SchedulerJobTarget {
|
|||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub chat_id: Option<String>,
|
pub chat_id: Option<String>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
pub session_chat_id: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
pub reply_to: Option<String>,
|
pub reply_to: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1413,6 +1416,68 @@ mod tests {
|
|||||||
assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办"));
|
assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn test_scheduler_config_loads_silent_agent_task_job() {
|
||||||
|
let file = tempfile::NamedTempFile::new().unwrap();
|
||||||
|
std::fs::write(
|
||||||
|
file.path(),
|
||||||
|
r#"{
|
||||||
|
"providers": {
|
||||||
|
"aliyun": {
|
||||||
|
"type": "openai",
|
||||||
|
"base_url": "https://example.invalid/v1",
|
||||||
|
"api_key": "test-key",
|
||||||
|
"extra_headers": {}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"models": {
|
||||||
|
"qwen-plus": {
|
||||||
|
"model_id": "qwen-plus"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"agents": {
|
||||||
|
"default": {
|
||||||
|
"provider": "aliyun",
|
||||||
|
"model": "qwen-plus"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"scheduler": {
|
||||||
|
"enabled": true,
|
||||||
|
"jobs": [
|
||||||
|
{
|
||||||
|
"id": "agent.daily_summary.background",
|
||||||
|
"kind": "silent_agent_task",
|
||||||
|
"schedule": {
|
||||||
|
"type": "cron",
|
||||||
|
"expression": "0 9 * * *"
|
||||||
|
},
|
||||||
|
"target": {
|
||||||
|
"channel": "feishu",
|
||||||
|
"chat_id": "oc_demo",
|
||||||
|
"session_chat_id": "scheduler/agent.daily_summary.background"
|
||||||
|
},
|
||||||
|
"payload": {
|
||||||
|
"prompt": "请后台总结今天待办"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}"#,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let config = Config::load(file.path().to_str().unwrap()).unwrap();
|
||||||
|
let job = &config.scheduler.jobs[0];
|
||||||
|
assert_eq!(job.kind, SchedulerJobKind::SilentAgentTask);
|
||||||
|
assert_eq!(job.target.channel.as_deref(), Some("feishu"));
|
||||||
|
assert_eq!(job.target.chat_id.as_deref(), Some("oc_demo"));
|
||||||
|
assert_eq!(
|
||||||
|
job.target.session_chat_id.as_deref(),
|
||||||
|
Some("scheduler/agent.daily_summary.background")
|
||||||
|
);
|
||||||
|
assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请后台总结今天待办"));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_scheduler_schedule_validation_rejects_invalid_values() {
|
fn test_scheduler_schedule_validation_rejects_invalid_values() {
|
||||||
assert!(SchedulerSchedule::Delay { seconds: 0 }
|
assert!(SchedulerSchedule::Delay { seconds: 0 }
|
||||||
|
|||||||
@ -166,15 +166,65 @@ impl Scheduler {
|
|||||||
execute_internal_event(&self.session_manager, job).await?;
|
execute_internal_event(&self.session_manager, job).await?;
|
||||||
}
|
}
|
||||||
SchedulerJobKind::AgentTask => {
|
SchedulerJobKind::AgentTask => {
|
||||||
let outbound_messages = execute_agent_task(&self.session_manager, job).await?;
|
let outbound_messages = execute_agent_task(
|
||||||
|
&self.session_manager,
|
||||||
|
job,
|
||||||
|
required_notification_chat_id(job, "agent_task")?,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
for message in outbound_messages {
|
for message in outbound_messages {
|
||||||
self.bus.publish_outbound(message).await?;
|
self.bus.publish_outbound(message).await?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
SchedulerJobKind::SilentAgentTask => {
|
||||||
|
let execution_chat_id = resolve_execution_chat_id(job)?;
|
||||||
|
if let Err(error) = execute_agent_task(&self.session_manager, job, &execution_chat_id).await {
|
||||||
|
if let Err(notify_error) = self.notify_silent_agent_task_failure(job, &error).await {
|
||||||
|
tracing::error!(
|
||||||
|
job_id = %job.id,
|
||||||
|
error = %notify_error,
|
||||||
|
"Failed to publish silent scheduler failure notification"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn notify_silent_agent_task_failure(
|
||||||
|
&self,
|
||||||
|
job: &RuntimeJob,
|
||||||
|
error: &anyhow::Error,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let channel = job
|
||||||
|
.target
|
||||||
|
.channel
|
||||||
|
.clone()
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("silent_agent_task requires target.channel"))?;
|
||||||
|
let chat_id = required_notification_chat_id(job, "silent_agent_task")?.to_string();
|
||||||
|
|
||||||
|
let mut metadata = HashMap::new();
|
||||||
|
metadata.insert("scheduler_job_id".to_string(), job.id.clone());
|
||||||
|
metadata.insert("scheduler_job_kind".to_string(), "silent_agent_task".to_string());
|
||||||
|
|
||||||
|
self.bus
|
||||||
|
.publish_outbound(OutboundMessage::assistant(
|
||||||
|
channel,
|
||||||
|
chat_id,
|
||||||
|
format!(
|
||||||
|
"定时任务执行失败:{}\n{}",
|
||||||
|
job.id,
|
||||||
|
summarize_scheduler_error(error)
|
||||||
|
),
|
||||||
|
job.target.reply_to.clone(),
|
||||||
|
metadata,
|
||||||
|
))
|
||||||
|
.await
|
||||||
|
.map_err(|error| anyhow::anyhow!(error.to_string()))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
@ -247,6 +297,7 @@ impl RuntimeJob {
|
|||||||
"internal_event" => SchedulerJobKind::InternalEvent,
|
"internal_event" => SchedulerJobKind::InternalEvent,
|
||||||
"outbound_message" => SchedulerJobKind::OutboundMessage,
|
"outbound_message" => SchedulerJobKind::OutboundMessage,
|
||||||
"agent_task" => SchedulerJobKind::AgentTask,
|
"agent_task" => SchedulerJobKind::AgentTask,
|
||||||
|
"silent_agent_task" => SchedulerJobKind::SilentAgentTask,
|
||||||
other => {
|
other => {
|
||||||
tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind");
|
tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind");
|
||||||
return Ok(None);
|
return Ok(None);
|
||||||
@ -331,6 +382,7 @@ impl RuntimeJob {
|
|||||||
SchedulerJobKind::InternalEvent => "internal_event".to_string(),
|
SchedulerJobKind::InternalEvent => "internal_event".to_string(),
|
||||||
SchedulerJobKind::OutboundMessage => "outbound_message".to_string(),
|
SchedulerJobKind::OutboundMessage => "outbound_message".to_string(),
|
||||||
SchedulerJobKind::AgentTask => "agent_task".to_string(),
|
SchedulerJobKind::AgentTask => "agent_task".to_string(),
|
||||||
|
SchedulerJobKind::SilentAgentTask => "silent_agent_task".to_string(),
|
||||||
},
|
},
|
||||||
schedule: serde_json::to_value(&self.schedule).unwrap_or_else(|_| serde_json::json!({})),
|
schedule: serde_json::to_value(&self.schedule).unwrap_or_else(|_| serde_json::json!({})),
|
||||||
interval_secs: self.interval_secs,
|
interval_secs: self.interval_secs,
|
||||||
@ -527,17 +579,13 @@ async fn execute_internal_event(session_manager: &SessionManager, job: &RuntimeJ
|
|||||||
async fn execute_agent_task(
|
async fn execute_agent_task(
|
||||||
session_manager: &SessionManager,
|
session_manager: &SessionManager,
|
||||||
job: &RuntimeJob,
|
job: &RuntimeJob,
|
||||||
|
execution_chat_id: &str,
|
||||||
) -> anyhow::Result<Vec<OutboundMessage>> {
|
) -> anyhow::Result<Vec<OutboundMessage>> {
|
||||||
let channel_name = job
|
let channel_name = job
|
||||||
.target
|
.target
|
||||||
.channel
|
.channel
|
||||||
.as_deref()
|
.as_deref()
|
||||||
.ok_or_else(|| anyhow::anyhow!("agent_task requires target.channel"))?;
|
.ok_or_else(|| anyhow::anyhow!("scheduled agent task requires target.channel"))?;
|
||||||
let chat_id = job
|
|
||||||
.target
|
|
||||||
.chat_id
|
|
||||||
.as_deref()
|
|
||||||
.ok_or_else(|| anyhow::anyhow!("agent_task requires target.chat_id"))?;
|
|
||||||
let prompt = job
|
let prompt = job
|
||||||
.payload
|
.payload
|
||||||
.get("prompt")
|
.get("prompt")
|
||||||
@ -546,11 +594,45 @@ async fn execute_agent_task(
|
|||||||
let options = parse_scheduled_agent_task_options(job)?;
|
let options = parse_scheduled_agent_task_options(job)?;
|
||||||
|
|
||||||
session_manager
|
session_manager
|
||||||
.run_scheduled_agent_task(channel_name, chat_id, prompt, options)
|
.run_scheduled_agent_task(channel_name, execution_chat_id, prompt, options)
|
||||||
.await
|
.await
|
||||||
.map_err(|error| anyhow::anyhow!(error.to_string()))
|
.map_err(|error| anyhow::anyhow!(error.to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn required_notification_chat_id<'a>(job: &'a RuntimeJob, kind_name: &str) -> anyhow::Result<&'a str> {
|
||||||
|
job.target
|
||||||
|
.chat_id
|
||||||
|
.as_deref()
|
||||||
|
.ok_or_else(|| anyhow::anyhow!("{} requires target.chat_id", kind_name))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn resolve_execution_chat_id(job: &RuntimeJob) -> anyhow::Result<String> {
|
||||||
|
match job.kind {
|
||||||
|
SchedulerJobKind::AgentTask => Ok(required_notification_chat_id(job, "agent_task")?.to_string()),
|
||||||
|
SchedulerJobKind::SilentAgentTask => Ok(job
|
||||||
|
.target
|
||||||
|
.session_chat_id
|
||||||
|
.clone()
|
||||||
|
.unwrap_or_else(|| derive_silent_session_chat_id(&job.id))),
|
||||||
|
_ => anyhow::bail!("execution chat id is only supported for agent task kinds"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn derive_silent_session_chat_id(job_id: &str) -> String {
|
||||||
|
format!("scheduler/{}", job_id)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn summarize_scheduler_error(error: &anyhow::Error) -> String {
|
||||||
|
let text = error.to_string().replace('\n', " ");
|
||||||
|
const MAX_LEN: usize = 240;
|
||||||
|
if text.chars().count() <= MAX_LEN {
|
||||||
|
text
|
||||||
|
} else {
|
||||||
|
let summary = text.chars().take(MAX_LEN).collect::<String>();
|
||||||
|
format!("{}...", summary)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn parse_scheduled_agent_task_options(job: &RuntimeJob) -> anyhow::Result<ScheduledAgentTaskOptions> {
|
fn parse_scheduled_agent_task_options(job: &RuntimeJob) -> anyhow::Result<ScheduledAgentTaskOptions> {
|
||||||
let sender_id = job
|
let sender_id = job
|
||||||
.payload
|
.payload
|
||||||
@ -656,6 +738,47 @@ mod agent_task_tests {
|
|||||||
assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办"));
|
assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn runtime_job_from_record_supports_silent_agent_task_kind() {
|
||||||
|
let record = SchedulerJobRecord {
|
||||||
|
id: "agent.daily_summary.background".to_string(),
|
||||||
|
kind: "silent_agent_task".to_string(),
|
||||||
|
schedule: serde_json::json!({
|
||||||
|
"type": "interval",
|
||||||
|
"seconds": 300
|
||||||
|
}),
|
||||||
|
interval_secs: 0,
|
||||||
|
startup_delay_secs: 0,
|
||||||
|
target: serde_json::json!({
|
||||||
|
"channel": "feishu",
|
||||||
|
"chat_id": "oc_demo",
|
||||||
|
"session_chat_id": "scheduler/agent.daily_summary.background"
|
||||||
|
}),
|
||||||
|
payload: serde_json::json!({
|
||||||
|
"prompt": "请后台整理今天待办"
|
||||||
|
}),
|
||||||
|
enabled: true,
|
||||||
|
state: SchedulerJobState::Scheduled,
|
||||||
|
last_status: None,
|
||||||
|
last_error: None,
|
||||||
|
run_count: 0,
|
||||||
|
max_runs: None,
|
||||||
|
last_fired_at: None,
|
||||||
|
next_fire_at: Some(1_700_000_010_000),
|
||||||
|
paused_at: None,
|
||||||
|
completed_at: None,
|
||||||
|
created_at: 1_700_000_000_000,
|
||||||
|
updated_at: 1_700_000_000_000,
|
||||||
|
};
|
||||||
|
|
||||||
|
let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip, chrono_tz::Asia::Shanghai)
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert_eq!(job.kind, SchedulerJobKind::SilentAgentTask);
|
||||||
|
assert_eq!(job.target.session_chat_id.as_deref(), Some("scheduler/agent.daily_summary.background"));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn parse_scheduled_agent_task_options_supports_fresh_session_and_metadata() {
|
fn parse_scheduled_agent_task_options_supports_fresh_session_and_metadata() {
|
||||||
let job = RuntimeJob {
|
let job = RuntimeJob {
|
||||||
@ -668,6 +791,7 @@ mod agent_task_tests {
|
|||||||
target: SchedulerJobTarget {
|
target: SchedulerJobTarget {
|
||||||
channel: Some("feishu".to_string()),
|
channel: Some("feishu".to_string()),
|
||||||
chat_id: Some("oc_demo".to_string()),
|
chat_id: Some("oc_demo".to_string()),
|
||||||
|
session_chat_id: None,
|
||||||
reply_to: None,
|
reply_to: None,
|
||||||
},
|
},
|
||||||
payload: serde_json::json!({
|
payload: serde_json::json!({
|
||||||
@ -705,6 +829,44 @@ mod agent_task_tests {
|
|||||||
assert_eq!(options.metadata.get("priority").map(String::as_str), Some("1"));
|
assert_eq!(options.metadata.get("priority").map(String::as_str), Some("1"));
|
||||||
assert_eq!(options.metadata.get("urgent").map(String::as_str), Some("false"));
|
assert_eq!(options.metadata.get("urgent").map(String::as_str), Some("false"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn resolve_execution_chat_id_uses_dedicated_session_for_silent_agent_tasks() {
|
||||||
|
let job = RuntimeJob {
|
||||||
|
id: "agent.daily_summary.background".to_string(),
|
||||||
|
kind: SchedulerJobKind::SilentAgentTask,
|
||||||
|
schedule: SchedulerSchedule::Interval {
|
||||||
|
seconds: 300,
|
||||||
|
startup_delay_secs: 0,
|
||||||
|
},
|
||||||
|
target: SchedulerJobTarget {
|
||||||
|
channel: Some("feishu".to_string()),
|
||||||
|
chat_id: Some("oc_demo".to_string()),
|
||||||
|
session_chat_id: None,
|
||||||
|
reply_to: None,
|
||||||
|
},
|
||||||
|
payload: serde_json::json!({
|
||||||
|
"prompt": "请后台整理今天待办"
|
||||||
|
}),
|
||||||
|
enabled: true,
|
||||||
|
state: SchedulerJobState::Scheduled,
|
||||||
|
last_status: None,
|
||||||
|
last_error: None,
|
||||||
|
run_count: 0,
|
||||||
|
max_runs: None,
|
||||||
|
last_fired_at: None,
|
||||||
|
next_fire_at: None,
|
||||||
|
paused_at: None,
|
||||||
|
completed_at: None,
|
||||||
|
interval_secs: 300,
|
||||||
|
startup_delay_secs: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
resolve_execution_chat_id(&job).unwrap(),
|
||||||
|
"scheduler/agent.daily_summary.background"
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TryFrom<serde_json::Value> for SchedulerJobTarget {
|
impl TryFrom<serde_json::Value> for SchedulerJobTarget {
|
||||||
@ -725,6 +887,38 @@ mod tests {
|
|||||||
use crate::skills::SkillRuntime;
|
use crate::skills::SkillRuntime;
|
||||||
use crate::storage::{SchedulerJobUpsert, SessionStore};
|
use crate::storage::{SchedulerJobUpsert, SessionStore};
|
||||||
|
|
||||||
|
fn test_provider_config() -> LLMProviderConfig {
|
||||||
|
LLMProviderConfig {
|
||||||
|
provider_type: "openai".to_string(),
|
||||||
|
name: "default".to_string(),
|
||||||
|
base_url: "http://localhost".to_string(),
|
||||||
|
api_key: "test-key".to_string(),
|
||||||
|
extra_headers: HashMap::new(),
|
||||||
|
llm_timeout_secs: 30,
|
||||||
|
model_id: "test-model".to_string(),
|
||||||
|
temperature: Some(0.0),
|
||||||
|
max_tokens: None,
|
||||||
|
model_extra: HashMap::new(),
|
||||||
|
max_tool_iterations: 4,
|
||||||
|
tool_result_max_chars: 20_000,
|
||||||
|
context_tool_result_trim_chars: 20_000,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_session_manager() -> SessionManager {
|
||||||
|
let provider_config = test_provider_config();
|
||||||
|
SessionManager::new(
|
||||||
|
4,
|
||||||
|
100,
|
||||||
|
false,
|
||||||
|
"Asia/Shanghai".to_string(),
|
||||||
|
provider_config.clone(),
|
||||||
|
HashMap::from([("default".to_string(), provider_config)]),
|
||||||
|
Arc::new(SkillRuntime::default()),
|
||||||
|
)
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn runtime_job_skip_policy_advances_from_now() {
|
fn runtime_job_skip_policy_advances_from_now() {
|
||||||
let now = Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap();
|
let now = Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap();
|
||||||
@ -839,31 +1033,7 @@ mod tests {
|
|||||||
})
|
})
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let provider_config = LLMProviderConfig {
|
let session_manager = test_session_manager();
|
||||||
provider_type: "openai".to_string(),
|
|
||||||
name: "default".to_string(),
|
|
||||||
base_url: "http://localhost".to_string(),
|
|
||||||
api_key: "test-key".to_string(),
|
|
||||||
extra_headers: HashMap::new(),
|
|
||||||
llm_timeout_secs: 30,
|
|
||||||
model_id: "test-model".to_string(),
|
|
||||||
temperature: Some(0.0),
|
|
||||||
max_tokens: None,
|
|
||||||
model_extra: HashMap::new(),
|
|
||||||
max_tool_iterations: 4,
|
|
||||||
tool_result_max_chars: 20_000,
|
|
||||||
context_tool_result_trim_chars: 20_000,
|
|
||||||
};
|
|
||||||
let session_manager = SessionManager::new(
|
|
||||||
4,
|
|
||||||
100,
|
|
||||||
false,
|
|
||||||
"Asia/Shanghai".to_string(),
|
|
||||||
provider_config.clone(),
|
|
||||||
HashMap::from([("default".to_string(), provider_config)]),
|
|
||||||
Arc::new(SkillRuntime::default()),
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
let scheduler = Scheduler::new(
|
let scheduler = Scheduler::new(
|
||||||
MessageBus::new(8),
|
MessageBus::new(8),
|
||||||
SchedulerConfig {
|
SchedulerConfig {
|
||||||
@ -890,31 +1060,7 @@ mod tests {
|
|||||||
fn sync_config_jobs_persists_builtin_memory_maintenance_job() {
|
fn sync_config_jobs_persists_builtin_memory_maintenance_job() {
|
||||||
let store = Arc::new(SessionStore::in_memory().unwrap());
|
let store = Arc::new(SessionStore::in_memory().unwrap());
|
||||||
|
|
||||||
let provider_config = LLMProviderConfig {
|
let session_manager = test_session_manager();
|
||||||
provider_type: "openai".to_string(),
|
|
||||||
name: "default".to_string(),
|
|
||||||
base_url: "http://localhost".to_string(),
|
|
||||||
api_key: "test-key".to_string(),
|
|
||||||
extra_headers: HashMap::new(),
|
|
||||||
llm_timeout_secs: 30,
|
|
||||||
model_id: "test-model".to_string(),
|
|
||||||
temperature: Some(0.0),
|
|
||||||
max_tokens: None,
|
|
||||||
model_extra: HashMap::new(),
|
|
||||||
max_tool_iterations: 4,
|
|
||||||
tool_result_max_chars: 20_000,
|
|
||||||
context_tool_result_trim_chars: 20_000,
|
|
||||||
};
|
|
||||||
let session_manager = SessionManager::new(
|
|
||||||
4,
|
|
||||||
100,
|
|
||||||
false,
|
|
||||||
"Asia/Shanghai".to_string(),
|
|
||||||
provider_config.clone(),
|
|
||||||
HashMap::from([("default".to_string(), provider_config)]),
|
|
||||||
Arc::new(SkillRuntime::default()),
|
|
||||||
)
|
|
||||||
.unwrap();
|
|
||||||
let scheduler = Scheduler::new(
|
let scheduler = Scheduler::new(
|
||||||
MessageBus::new(8),
|
MessageBus::new(8),
|
||||||
SchedulerConfig::default(),
|
SchedulerConfig::default(),
|
||||||
@ -946,6 +1092,67 @@ mod tests {
|
|||||||
assert!(saved.next_fire_at.is_some());
|
assert!(saved.next_fire_at.is_some());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn silent_agent_task_failure_notifies_primary_chat() {
|
||||||
|
let store = Arc::new(SessionStore::in_memory().unwrap());
|
||||||
|
let bus = MessageBus::new(8);
|
||||||
|
let scheduler = Scheduler::new(
|
||||||
|
bus.clone(),
|
||||||
|
SchedulerConfig {
|
||||||
|
enabled: true,
|
||||||
|
tick_resolution_ms: 1000,
|
||||||
|
worker_queue_capacity: 64,
|
||||||
|
misfire_policy: SchedulerMisfirePolicy::Skip,
|
||||||
|
jobs: Vec::new(),
|
||||||
|
},
|
||||||
|
chrono_tz::Asia::Shanghai,
|
||||||
|
store,
|
||||||
|
test_session_manager(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let job = RuntimeJob {
|
||||||
|
id: "agent.daily_summary.background".to_string(),
|
||||||
|
kind: SchedulerJobKind::SilentAgentTask,
|
||||||
|
schedule: SchedulerSchedule::Interval {
|
||||||
|
seconds: 300,
|
||||||
|
startup_delay_secs: 0,
|
||||||
|
},
|
||||||
|
target: SchedulerJobTarget {
|
||||||
|
channel: Some("feishu".to_string()),
|
||||||
|
chat_id: Some("oc_demo".to_string()),
|
||||||
|
session_chat_id: Some("scheduler/agent.daily_summary.background".to_string()),
|
||||||
|
reply_to: None,
|
||||||
|
},
|
||||||
|
payload: serde_json::json!({}),
|
||||||
|
enabled: true,
|
||||||
|
state: SchedulerJobState::Scheduled,
|
||||||
|
last_status: None,
|
||||||
|
last_error: None,
|
||||||
|
run_count: 0,
|
||||||
|
max_runs: None,
|
||||||
|
last_fired_at: None,
|
||||||
|
next_fire_at: None,
|
||||||
|
paused_at: None,
|
||||||
|
completed_at: None,
|
||||||
|
interval_secs: 300,
|
||||||
|
startup_delay_secs: 0,
|
||||||
|
};
|
||||||
|
|
||||||
|
let error = scheduler.execute_job(&job).await.unwrap_err();
|
||||||
|
assert!(error.to_string().contains("payload.prompt"));
|
||||||
|
|
||||||
|
let outbound = tokio::time::timeout(
|
||||||
|
std::time::Duration::from_millis(100),
|
||||||
|
bus.consume_outbound(),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(outbound.channel, "feishu");
|
||||||
|
assert_eq!(outbound.chat_id, "oc_demo");
|
||||||
|
assert!(outbound.content.contains("定时任务执行失败"));
|
||||||
|
assert!(outbound.content.contains("agent.daily_summary.background"));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn cron_schedule_uses_configured_timezone() {
|
fn cron_schedule_uses_configured_timezone() {
|
||||||
let now = Utc.with_ymd_and_hms(2026, 4, 23, 18, 0, 0).single().unwrap();
|
let now = Utc.with_ymd_and_hms(2026, 4, 23, 18, 0, 0).single().unwrap();
|
||||||
|
|||||||
@ -31,7 +31,7 @@ impl Tool for SchedulerManageTool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn description(&self) -> &str {
|
fn description(&self) -> &str {
|
||||||
"Manage DB-backed scheduled jobs. Supports actions: list, get, put, delete, pause, resume. Jobs persist in SQLite and are executed by the scheduler runtime. When creating agent_task jobs, keep prompt/system_prompt focused on the work to perform; do not restate execution times unless the task logic truly depends on them, because the trigger already controls timing."
|
"Manage DB-backed scheduled jobs. Supports actions: list, get, put, delete, pause, resume. Jobs persist in SQLite and are executed by the scheduler runtime. When creating agent_task or silent_agent_task jobs, keep prompt/system_prompt focused on the work to perform; do not restate execution times unless the task logic truly depends on them, because the trigger already controls timing."
|
||||||
}
|
}
|
||||||
|
|
||||||
fn parameters_schema(&self) -> serde_json::Value {
|
fn parameters_schema(&self) -> serde_json::Value {
|
||||||
@ -67,18 +67,19 @@ impl Tool for SchedulerManageTool {
|
|||||||
},
|
},
|
||||||
"kind": {
|
"kind": {
|
||||||
"type": "string",
|
"type": "string",
|
||||||
"enum": ["internal_event", "outbound_message", "agent_task"]
|
"enum": ["internal_event", "outbound_message", "agent_task", "silent_agent_task"]
|
||||||
},
|
},
|
||||||
"schedule": {
|
"schedule": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"description": "Schedule object, for example {type: 'interval', seconds: 300} or {type: 'cron', expression: '0 9 * * *'}"
|
"description": "Schedule object, for example {type: 'interval', seconds: 300} or {type: 'cron', expression: '0 9 * * *'}"
|
||||||
},
|
},
|
||||||
"target": {
|
"target": {
|
||||||
"type": "object"
|
"type": "object",
|
||||||
|
"description": "Target routing. agent_task and silent_agent_task require channel and chat_id. silent_agent_task may optionally set session_chat_id to choose a dedicated background session; otherwise one is derived from the job id."
|
||||||
},
|
},
|
||||||
"payload": {
|
"payload": {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"description": format!("Job payload. agent_task supports prompt, agent, fresh_session, system_prompt, sender_id, metadata. For agent_task, write prompt/system_prompt as execution instructions and avoid repeating schedule phrases or execution times such as 每天9点 or 每小时 unless the task itself must reason about time. {} outbound_message expects content. internal_event expects event.", agent_hint)
|
"description": format!("Job payload. agent_task and silent_agent_task support prompt, agent, fresh_session, system_prompt, sender_id, metadata. For these kinds, write prompt/system_prompt as execution instructions and avoid repeating schedule phrases or execution times such as 每天9点 or 每小时 unless the task itself must reason about time. {} outbound_message expects content. internal_event expects event.", agent_hint)
|
||||||
},
|
},
|
||||||
"max_runs": {
|
"max_runs": {
|
||||||
"type": ["integer", "null"]
|
"type": ["integer", "null"]
|
||||||
@ -208,9 +209,9 @@ fn build_upsert(
|
|||||||
args.get("target").cloned().unwrap_or_else(|| json!({})),
|
args.get("target").cloned().unwrap_or_else(|| json!({})),
|
||||||
context,
|
context,
|
||||||
);
|
);
|
||||||
if kind == "agent_task" {
|
if kind == "agent_task" || kind == "silent_agent_task" {
|
||||||
validate_agent_task_payload(&payload, known_agents)?;
|
validate_agent_task_payload(&payload, known_agents)?;
|
||||||
validate_target_fields(&target, &["channel", "chat_id"], "agent_task")?;
|
validate_target_fields(&target, &["channel", "chat_id"], &kind)?;
|
||||||
} else if kind == "outbound_message" {
|
} else if kind == "outbound_message" {
|
||||||
validate_outbound_message_payload(&payload)?;
|
validate_outbound_message_payload(&payload)?;
|
||||||
validate_target_fields(&target, &["channel", "chat_id"], "outbound_message")?;
|
validate_target_fields(&target, &["channel", "chat_id"], "outbound_message")?;
|
||||||
@ -482,6 +483,43 @@ mod tests {
|
|||||||
assert!(put_result.output.contains("agent_task"));
|
assert!(put_result.output.contains("agent_task"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_scheduler_manage_put_silent_agent_task() {
|
||||||
|
let store = Arc::new(SessionStore::in_memory().unwrap());
|
||||||
|
let tool = SchedulerManageTool::new(store.clone(), HashSet::from(["planner".to_string()]));
|
||||||
|
|
||||||
|
let put_result = tool
|
||||||
|
.execute(json!({
|
||||||
|
"action": "put",
|
||||||
|
"id": "agent.daily_summary.background",
|
||||||
|
"kind": "silent_agent_task",
|
||||||
|
"schedule": {
|
||||||
|
"type": "cron",
|
||||||
|
"expression": "0 9 * * *"
|
||||||
|
},
|
||||||
|
"target": {
|
||||||
|
"channel": "feishu",
|
||||||
|
"chat_id": "oc_demo",
|
||||||
|
"session_chat_id": "scheduler/agent.daily_summary.background"
|
||||||
|
},
|
||||||
|
"payload": {
|
||||||
|
"prompt": "请后台总结今天待办",
|
||||||
|
"agent": "planner"
|
||||||
|
}
|
||||||
|
}))
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
assert!(put_result.success);
|
||||||
|
|
||||||
|
let saved = store
|
||||||
|
.get_scheduler_job("agent.daily_summary.background")
|
||||||
|
.unwrap()
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(saved.kind, "silent_agent_task");
|
||||||
|
assert_eq!(saved.target["session_chat_id"], "scheduler/agent.daily_summary.background");
|
||||||
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_scheduler_manage_rejects_outbound_message_without_target() {
|
async fn test_scheduler_manage_rejects_outbound_message_without_target() {
|
||||||
let store = Arc::new(SessionStore::in_memory().unwrap());
|
let store = Arc::new(SessionStore::in_memory().unwrap());
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user