Compare commits

..

No commits in common. "8c17af1209c8a012612dff6112c4321d751e5163" and "52c94f274a58724cb83fc3dc2702a81b06da4214" have entirely different histories.

6 changed files with 73 additions and 556 deletions

View File

@ -375,7 +375,6 @@ PicoBot 带有一个基于 SQLite 的调度器,而不是纯内存或 JSON 文
- internal_event内部事件 - internal_event内部事件
- outbound_message直接向目标通道发消息 - outbound_message直接向目标通道发消息
- agent_task构造一次合成用户输入复用完整 Agent 流程执行 - agent_task构造一次合成用户输入复用完整 Agent 流程执行
- silent_agent_task在独立后台会话中执行 Agent 流程,成功不推送,失败回主 chat 通知
agent_task 会复用正常链路中的这些能力: agent_task 会复用正常链路中的这些能力:
@ -385,14 +384,6 @@ agent_task 会复用正常链路中的这些能力:
- 会话持久化 - 会话持久化
- 渠道消息下发 - 渠道消息下发
silent_agent_task 和 agent_task 使用同一套 Agent 执行能力,但路由语义不同:
- target.chat_id 仍表示用户通知目标
- target.session_chat_id 表示后台任务实际写入的会话;如果省略,会稳定派生为 scheduler/{job_id}
- 成功执行后不会向用户发送正常结果
- 执行失败时会向主 chat 发送一条失败通知,便于用户感知异常
- 后台任务的历史、压缩和会话内上下文会留在独立会话中,不污染主会话
### 9.3 运行时管理 ### 9.3 运行时管理
通过 scheduler_manage 可以进行: 通过 scheduler_manage 可以进行:
@ -448,43 +439,12 @@ silent_agent_task 和 agent_task 使用同一套 Agent 执行能力,但路由
"source": "scheduler" "source": "scheduler"
} }
} }
},
{
"id": "agent.weekly_report.background",
"kind": "silent_agent_task",
"schedule": {
"type": "cron",
"expression": "0 8 * * 1"
},
"target": {
"channel": "feishu",
"chat_id": "oc_xxx",
"session_chat_id": "scheduler/agent.weekly_report.background"
},
"payload": {
"prompt": "请后台整理上周项目进展,输出结构化周报草稿,重点标出风险、阻塞项和下周优先级。",
"agent": "default",
"fresh_session": false,
"system_prompt": "你是周报助手,只在后台整理,不要面向用户寒暄。",
"sender_id": "scheduler-weekly-report",
"metadata": {
"job_type": "weekly_report_background",
"source": "scheduler"
}
}
}
} }
] ]
} }
} }
``` ```
推荐场景:
- agent_task用户需要直接收到结果例如日报提醒、定时播报、定时外发通知
- silent_agent_task任务需要长期积累独立上下文或后台整理材料但不应污染主会话例如周报草稿整理、周期性资料汇总、后台分析任务
## 10. 渠道与运行方式 ## 10. 渠道与运行方式
### 10.1 当前支持的通道 ### 10.1 当前支持的通道

View File

@ -252,7 +252,6 @@ pub enum SchedulerJobKind {
InternalEvent, InternalEvent,
OutboundMessage, OutboundMessage,
AgentTask, AgentTask,
SilentAgentTask,
} }
#[derive(Debug, Clone, Deserialize, Serialize, Default)] #[derive(Debug, Clone, Deserialize, Serialize, Default)]
@ -262,8 +261,6 @@ pub struct SchedulerJobTarget {
#[serde(default)] #[serde(default)]
pub chat_id: Option<String>, pub chat_id: Option<String>,
#[serde(default)] #[serde(default)]
pub session_chat_id: Option<String>,
#[serde(default)]
pub reply_to: Option<String>, pub reply_to: Option<String>,
} }
@ -314,7 +311,7 @@ impl SchedulerConfig {
enabled: true, enabled: true,
kind: SchedulerJobKind::InternalEvent, kind: SchedulerJobKind::InternalEvent,
schedule: Some(SchedulerSchedule::Cron { schedule: Some(SchedulerSchedule::Cron {
expression: "0 */4 * * *".to_string(), expression: "0 3 * * *".to_string(),
}), }),
startup_delay_secs: 0, startup_delay_secs: 0,
interval_secs: 0, interval_secs: 0,
@ -322,7 +319,7 @@ impl SchedulerConfig {
payload: serde_json::json!({ payload: serde_json::json!({
"event": "memory_maintenance", "event": "memory_maintenance",
"time_zone": time.timezone, "time_zone": time.timezone,
"local_time": "every_4_hours" "local_time": "03:00"
}), }),
}] }]
} }
@ -1149,7 +1146,7 @@ mod tests {
assert_eq!( assert_eq!(
effective_jobs[0].resolved_schedule().unwrap(), effective_jobs[0].resolved_schedule().unwrap(),
SchedulerSchedule::Cron { SchedulerSchedule::Cron {
expression: "0 */4 * * *".to_string(), expression: "0 3 * * *".to_string(),
} }
); );
} }
@ -1416,68 +1413,6 @@ mod tests {
assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办")); assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办"));
} }
#[test]
fn test_scheduler_config_loads_silent_agent_task_job() {
let file = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
file.path(),
r#"{
"providers": {
"aliyun": {
"type": "openai",
"base_url": "https://example.invalid/v1",
"api_key": "test-key",
"extra_headers": {}
}
},
"models": {
"qwen-plus": {
"model_id": "qwen-plus"
}
},
"agents": {
"default": {
"provider": "aliyun",
"model": "qwen-plus"
}
},
"scheduler": {
"enabled": true,
"jobs": [
{
"id": "agent.daily_summary.background",
"kind": "silent_agent_task",
"schedule": {
"type": "cron",
"expression": "0 9 * * *"
},
"target": {
"channel": "feishu",
"chat_id": "oc_demo",
"session_chat_id": "scheduler/agent.daily_summary.background"
},
"payload": {
"prompt": "请后台总结今天待办"
}
}
]
}
}"#,
)
.unwrap();
let config = Config::load(file.path().to_str().unwrap()).unwrap();
let job = &config.scheduler.jobs[0];
assert_eq!(job.kind, SchedulerJobKind::SilentAgentTask);
assert_eq!(job.target.channel.as_deref(), Some("feishu"));
assert_eq!(job.target.chat_id.as_deref(), Some("oc_demo"));
assert_eq!(
job.target.session_chat_id.as_deref(),
Some("scheduler/agent.daily_summary.background")
);
assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请后台总结今天待办"));
}
#[test] #[test]
fn test_scheduler_schedule_validation_rejects_invalid_values() { fn test_scheduler_schedule_validation_rejects_invalid_values() {
assert!(SchedulerSchedule::Delay { seconds: 0 } assert!(SchedulerSchedule::Delay { seconds: 0 }

View File

@ -1168,17 +1168,11 @@ impl SessionManager {
pub(crate) async fn run_memory_maintenance_for_all_scopes( pub(crate) async fn run_memory_maintenance_for_all_scopes(
&self, &self,
updated_since: Option<i64>,
) -> Result<Vec<MemoryMaintenanceScopeResult>, AgentError> { ) -> Result<Vec<MemoryMaintenanceScopeResult>, AgentError> {
let scope_keys = if let Some(cutoff) = updated_since { let scope_keys = self
self.store .store
.list_memory_scope_keys_updated_since("user", cutoff) .list_memory_scope_keys("user")
.map_err(|err| AgentError::Other(format!("list memory scope keys updated since error: {}", err)))? .map_err(|err| AgentError::Other(format!("list memory scope keys error: {}", err)))?;
} else {
self.store
.list_memory_scope_keys("user")
.map_err(|err| AgentError::Other(format!("list memory scope keys error: {}", err)))?
};
let mut results = Vec::new(); let mut results = Vec::new();
for scope_key in scope_keys { for scope_key in scope_keys {
@ -2256,60 +2250,6 @@ mod tests {
assert!(output.managed_markdown.contains("### 用户事实")); assert!(output.managed_markdown.contains("### 用户事实"));
} }
#[tokio::test]
async fn test_run_memory_maintenance_for_all_scopes_returns_empty_when_no_recent_updates() {
let provider_config = LLMProviderConfig {
provider_type: "openai".to_string(),
name: "maintenance-provider".to_string(),
base_url: "http://localhost".to_string(),
api_key: "test-key".to_string(),
extra_headers: HashMap::new(),
model_id: "maintenance-model".to_string(),
temperature: Some(0.0),
max_tokens: Some(256),
model_extra: HashMap::new(),
max_tool_iterations: 1,
llm_timeout_secs: 30,
tool_result_max_chars: 20_000,
context_tool_result_trim_chars: 20_000,
};
let session_manager = SessionManager::new(
4,
100,
false,
"Asia/Shanghai".to_string(),
provider_config.clone(),
HashMap::from([("default".to_string(), provider_config)]),
Arc::new(SkillRuntime::default()),
)
.unwrap();
let memory = session_manager
.store()
.put_memory(&crate::storage::MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "profile".to_string(),
memory_key: "work".to_string(),
content: "用户在做AI产品".to_string(),
source_type: "message".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
})
.unwrap();
let results = session_manager
.run_memory_maintenance_for_all_scopes(Some(memory.updated_at + 1))
.await
.unwrap();
assert!(results.is_empty());
}
#[test] #[test]
fn test_apply_memory_maintenance_output_merges_and_deletes_low_value_records() { fn test_apply_memory_maintenance_output_merges_and_deletes_low_value_records() {
let store = SessionStore::in_memory().unwrap(); let store = SessionStore::in_memory().unwrap();

View File

@ -166,65 +166,15 @@ impl Scheduler {
execute_internal_event(&self.session_manager, job).await?; execute_internal_event(&self.session_manager, job).await?;
} }
SchedulerJobKind::AgentTask => { SchedulerJobKind::AgentTask => {
let outbound_messages = execute_agent_task( let outbound_messages = execute_agent_task(&self.session_manager, job).await?;
&self.session_manager,
job,
required_notification_chat_id(job, "agent_task")?,
)
.await?;
for message in outbound_messages { for message in outbound_messages {
self.bus.publish_outbound(message).await?; self.bus.publish_outbound(message).await?;
} }
} }
SchedulerJobKind::SilentAgentTask => {
let execution_chat_id = resolve_execution_chat_id(job)?;
if let Err(error) = execute_agent_task(&self.session_manager, job, &execution_chat_id).await {
if let Err(notify_error) = self.notify_silent_agent_task_failure(job, &error).await {
tracing::error!(
job_id = %job.id,
error = %notify_error,
"Failed to publish silent scheduler failure notification"
);
}
return Err(error);
}
}
} }
Ok(()) Ok(())
} }
async fn notify_silent_agent_task_failure(
&self,
job: &RuntimeJob,
error: &anyhow::Error,
) -> anyhow::Result<()> {
let channel = job
.target
.channel
.clone()
.ok_or_else(|| anyhow::anyhow!("silent_agent_task requires target.channel"))?;
let chat_id = required_notification_chat_id(job, "silent_agent_task")?.to_string();
let mut metadata = HashMap::new();
metadata.insert("scheduler_job_id".to_string(), job.id.clone());
metadata.insert("scheduler_job_kind".to_string(), "silent_agent_task".to_string());
self.bus
.publish_outbound(OutboundMessage::assistant(
channel,
chat_id,
format!(
"定时任务执行失败:{}\n{}",
job.id,
summarize_scheduler_error(error)
),
job.target.reply_to.clone(),
metadata,
))
.await
.map_err(|error| anyhow::anyhow!(error.to_string()))
}
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -297,7 +247,6 @@ impl RuntimeJob {
"internal_event" => SchedulerJobKind::InternalEvent, "internal_event" => SchedulerJobKind::InternalEvent,
"outbound_message" => SchedulerJobKind::OutboundMessage, "outbound_message" => SchedulerJobKind::OutboundMessage,
"agent_task" => SchedulerJobKind::AgentTask, "agent_task" => SchedulerJobKind::AgentTask,
"silent_agent_task" => SchedulerJobKind::SilentAgentTask,
other => { other => {
tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind"); tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind");
return Ok(None); return Ok(None);
@ -382,7 +331,6 @@ impl RuntimeJob {
SchedulerJobKind::InternalEvent => "internal_event".to_string(), SchedulerJobKind::InternalEvent => "internal_event".to_string(),
SchedulerJobKind::OutboundMessage => "outbound_message".to_string(), SchedulerJobKind::OutboundMessage => "outbound_message".to_string(),
SchedulerJobKind::AgentTask => "agent_task".to_string(), SchedulerJobKind::AgentTask => "agent_task".to_string(),
SchedulerJobKind::SilentAgentTask => "silent_agent_task".to_string(),
}, },
schedule: serde_json::to_value(&self.schedule).unwrap_or_else(|_| serde_json::json!({})), schedule: serde_json::to_value(&self.schedule).unwrap_or_else(|_| serde_json::json!({})),
interval_secs: self.interval_secs, interval_secs: self.interval_secs,
@ -553,9 +501,7 @@ async fn execute_internal_event(session_manager: &SessionManager, job: &RuntimeJ
Ok(()) Ok(())
} }
"memory_maintenance" => { "memory_maintenance" => {
let results = session_manager let results = session_manager.run_memory_maintenance_for_all_scopes().await?;
.run_memory_maintenance_for_all_scopes(job.last_fired_at)
.await?;
for result in &results { for result in &results {
tracing::info!( tracing::info!(
job_id = %job.id, job_id = %job.id,
@ -579,13 +525,17 @@ async fn execute_internal_event(session_manager: &SessionManager, job: &RuntimeJ
async fn execute_agent_task( async fn execute_agent_task(
session_manager: &SessionManager, session_manager: &SessionManager,
job: &RuntimeJob, job: &RuntimeJob,
execution_chat_id: &str,
) -> anyhow::Result<Vec<OutboundMessage>> { ) -> anyhow::Result<Vec<OutboundMessage>> {
let channel_name = job let channel_name = job
.target .target
.channel .channel
.as_deref() .as_deref()
.ok_or_else(|| anyhow::anyhow!("scheduled agent task requires target.channel"))?; .ok_or_else(|| anyhow::anyhow!("agent_task requires target.channel"))?;
let chat_id = job
.target
.chat_id
.as_deref()
.ok_or_else(|| anyhow::anyhow!("agent_task requires target.chat_id"))?;
let prompt = job let prompt = job
.payload .payload
.get("prompt") .get("prompt")
@ -594,45 +544,11 @@ async fn execute_agent_task(
let options = parse_scheduled_agent_task_options(job)?; let options = parse_scheduled_agent_task_options(job)?;
session_manager session_manager
.run_scheduled_agent_task(channel_name, execution_chat_id, prompt, options) .run_scheduled_agent_task(channel_name, chat_id, prompt, options)
.await .await
.map_err(|error| anyhow::anyhow!(error.to_string())) .map_err(|error| anyhow::anyhow!(error.to_string()))
} }
fn required_notification_chat_id<'a>(job: &'a RuntimeJob, kind_name: &str) -> anyhow::Result<&'a str> {
job.target
.chat_id
.as_deref()
.ok_or_else(|| anyhow::anyhow!("{} requires target.chat_id", kind_name))
}
fn resolve_execution_chat_id(job: &RuntimeJob) -> anyhow::Result<String> {
match job.kind {
SchedulerJobKind::AgentTask => Ok(required_notification_chat_id(job, "agent_task")?.to_string()),
SchedulerJobKind::SilentAgentTask => Ok(job
.target
.session_chat_id
.clone()
.unwrap_or_else(|| derive_silent_session_chat_id(&job.id))),
_ => anyhow::bail!("execution chat id is only supported for agent task kinds"),
}
}
fn derive_silent_session_chat_id(job_id: &str) -> String {
format!("scheduler/{}", job_id)
}
fn summarize_scheduler_error(error: &anyhow::Error) -> String {
let text = error.to_string().replace('\n', " ");
const MAX_LEN: usize = 240;
if text.chars().count() <= MAX_LEN {
text
} else {
let summary = text.chars().take(MAX_LEN).collect::<String>();
format!("{}...", summary)
}
}
fn parse_scheduled_agent_task_options(job: &RuntimeJob) -> anyhow::Result<ScheduledAgentTaskOptions> { fn parse_scheduled_agent_task_options(job: &RuntimeJob) -> anyhow::Result<ScheduledAgentTaskOptions> {
let sender_id = job let sender_id = job
.payload .payload
@ -738,47 +654,6 @@ mod agent_task_tests {
assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办")); assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办"));
} }
#[test]
fn runtime_job_from_record_supports_silent_agent_task_kind() {
let record = SchedulerJobRecord {
id: "agent.daily_summary.background".to_string(),
kind: "silent_agent_task".to_string(),
schedule: serde_json::json!({
"type": "interval",
"seconds": 300
}),
interval_secs: 0,
startup_delay_secs: 0,
target: serde_json::json!({
"channel": "feishu",
"chat_id": "oc_demo",
"session_chat_id": "scheduler/agent.daily_summary.background"
}),
payload: serde_json::json!({
"prompt": "请后台整理今天待办"
}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: None,
last_fired_at: None,
next_fire_at: Some(1_700_000_010_000),
paused_at: None,
completed_at: None,
created_at: 1_700_000_000_000,
updated_at: 1_700_000_000_000,
};
let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip, chrono_tz::Asia::Shanghai)
.unwrap()
.unwrap();
assert_eq!(job.kind, SchedulerJobKind::SilentAgentTask);
assert_eq!(job.target.session_chat_id.as_deref(), Some("scheduler/agent.daily_summary.background"));
}
#[test] #[test]
fn parse_scheduled_agent_task_options_supports_fresh_session_and_metadata() { fn parse_scheduled_agent_task_options_supports_fresh_session_and_metadata() {
let job = RuntimeJob { let job = RuntimeJob {
@ -791,7 +666,6 @@ mod agent_task_tests {
target: SchedulerJobTarget { target: SchedulerJobTarget {
channel: Some("feishu".to_string()), channel: Some("feishu".to_string()),
chat_id: Some("oc_demo".to_string()), chat_id: Some("oc_demo".to_string()),
session_chat_id: None,
reply_to: None, reply_to: None,
}, },
payload: serde_json::json!({ payload: serde_json::json!({
@ -829,44 +703,6 @@ mod agent_task_tests {
assert_eq!(options.metadata.get("priority").map(String::as_str), Some("1")); assert_eq!(options.metadata.get("priority").map(String::as_str), Some("1"));
assert_eq!(options.metadata.get("urgent").map(String::as_str), Some("false")); assert_eq!(options.metadata.get("urgent").map(String::as_str), Some("false"));
} }
#[test]
fn resolve_execution_chat_id_uses_dedicated_session_for_silent_agent_tasks() {
let job = RuntimeJob {
id: "agent.daily_summary.background".to_string(),
kind: SchedulerJobKind::SilentAgentTask,
schedule: SchedulerSchedule::Interval {
seconds: 300,
startup_delay_secs: 0,
},
target: SchedulerJobTarget {
channel: Some("feishu".to_string()),
chat_id: Some("oc_demo".to_string()),
session_chat_id: None,
reply_to: None,
},
payload: serde_json::json!({
"prompt": "请后台整理今天待办"
}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: None,
last_fired_at: None,
next_fire_at: None,
paused_at: None,
completed_at: None,
interval_secs: 300,
startup_delay_secs: 0,
};
assert_eq!(
resolve_execution_chat_id(&job).unwrap(),
"scheduler/agent.daily_summary.background"
);
}
} }
impl TryFrom<serde_json::Value> for SchedulerJobTarget { impl TryFrom<serde_json::Value> for SchedulerJobTarget {
@ -887,38 +723,6 @@ mod tests {
use crate::skills::SkillRuntime; use crate::skills::SkillRuntime;
use crate::storage::{SchedulerJobUpsert, SessionStore}; use crate::storage::{SchedulerJobUpsert, SessionStore};
fn test_provider_config() -> LLMProviderConfig {
LLMProviderConfig {
provider_type: "openai".to_string(),
name: "default".to_string(),
base_url: "http://localhost".to_string(),
api_key: "test-key".to_string(),
extra_headers: HashMap::new(),
llm_timeout_secs: 30,
model_id: "test-model".to_string(),
temperature: Some(0.0),
max_tokens: None,
model_extra: HashMap::new(),
max_tool_iterations: 4,
tool_result_max_chars: 20_000,
context_tool_result_trim_chars: 20_000,
}
}
fn test_session_manager() -> SessionManager {
let provider_config = test_provider_config();
SessionManager::new(
4,
100,
false,
"Asia/Shanghai".to_string(),
provider_config.clone(),
HashMap::from([("default".to_string(), provider_config)]),
Arc::new(SkillRuntime::default()),
)
.unwrap()
}
#[test] #[test]
fn runtime_job_skip_policy_advances_from_now() { fn runtime_job_skip_policy_advances_from_now() {
let now = Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap(); let now = Utc.timestamp_millis_opt(1_700_000_000_000).single().unwrap();
@ -1033,7 +837,31 @@ mod tests {
}) })
.unwrap(); .unwrap();
let session_manager = test_session_manager(); let provider_config = LLMProviderConfig {
provider_type: "openai".to_string(),
name: "default".to_string(),
base_url: "http://localhost".to_string(),
api_key: "test-key".to_string(),
extra_headers: HashMap::new(),
llm_timeout_secs: 30,
model_id: "test-model".to_string(),
temperature: Some(0.0),
max_tokens: None,
model_extra: HashMap::new(),
max_tool_iterations: 4,
tool_result_max_chars: 20_000,
context_tool_result_trim_chars: 20_000,
};
let session_manager = SessionManager::new(
4,
100,
false,
"Asia/Shanghai".to_string(),
provider_config.clone(),
HashMap::from([("default".to_string(), provider_config)]),
Arc::new(SkillRuntime::default()),
)
.unwrap();
let scheduler = Scheduler::new( let scheduler = Scheduler::new(
MessageBus::new(8), MessageBus::new(8),
SchedulerConfig { SchedulerConfig {
@ -1060,7 +888,31 @@ mod tests {
fn sync_config_jobs_persists_builtin_memory_maintenance_job() { fn sync_config_jobs_persists_builtin_memory_maintenance_job() {
let store = Arc::new(SessionStore::in_memory().unwrap()); let store = Arc::new(SessionStore::in_memory().unwrap());
let session_manager = test_session_manager(); let provider_config = LLMProviderConfig {
provider_type: "openai".to_string(),
name: "default".to_string(),
base_url: "http://localhost".to_string(),
api_key: "test-key".to_string(),
extra_headers: HashMap::new(),
llm_timeout_secs: 30,
model_id: "test-model".to_string(),
temperature: Some(0.0),
max_tokens: None,
model_extra: HashMap::new(),
max_tool_iterations: 4,
tool_result_max_chars: 20_000,
context_tool_result_trim_chars: 20_000,
};
let session_manager = SessionManager::new(
4,
100,
false,
"Asia/Shanghai".to_string(),
provider_config.clone(),
HashMap::from([("default".to_string(), provider_config)]),
Arc::new(SkillRuntime::default()),
)
.unwrap();
let scheduler = Scheduler::new( let scheduler = Scheduler::new(
MessageBus::new(8), MessageBus::new(8),
SchedulerConfig::default(), SchedulerConfig::default(),
@ -1085,74 +937,12 @@ mod tests {
saved.schedule, saved.schedule,
serde_json::json!({ serde_json::json!({
"type": "cron", "type": "cron",
"expression": "0 */4 * * *" "expression": "0 3 * * *"
}) })
); );
assert_eq!(saved.payload.get("local_time").and_then(|value| value.as_str()), Some("every_4_hours"));
assert!(saved.next_fire_at.is_some()); assert!(saved.next_fire_at.is_some());
} }
#[tokio::test]
async fn silent_agent_task_failure_notifies_primary_chat() {
let store = Arc::new(SessionStore::in_memory().unwrap());
let bus = MessageBus::new(8);
let scheduler = Scheduler::new(
bus.clone(),
SchedulerConfig {
enabled: true,
tick_resolution_ms: 1000,
worker_queue_capacity: 64,
misfire_policy: SchedulerMisfirePolicy::Skip,
jobs: Vec::new(),
},
chrono_tz::Asia::Shanghai,
store,
test_session_manager(),
);
let job = RuntimeJob {
id: "agent.daily_summary.background".to_string(),
kind: SchedulerJobKind::SilentAgentTask,
schedule: SchedulerSchedule::Interval {
seconds: 300,
startup_delay_secs: 0,
},
target: SchedulerJobTarget {
channel: Some("feishu".to_string()),
chat_id: Some("oc_demo".to_string()),
session_chat_id: Some("scheduler/agent.daily_summary.background".to_string()),
reply_to: None,
},
payload: serde_json::json!({}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: None,
last_fired_at: None,
next_fire_at: None,
paused_at: None,
completed_at: None,
interval_secs: 300,
startup_delay_secs: 0,
};
let error = scheduler.execute_job(&job).await.unwrap_err();
assert!(error.to_string().contains("payload.prompt"));
let outbound = tokio::time::timeout(
std::time::Duration::from_millis(100),
bus.consume_outbound(),
)
.await
.unwrap();
assert_eq!(outbound.channel, "feishu");
assert_eq!(outbound.chat_id, "oc_demo");
assert!(outbound.content.contains("定时任务执行失败"));
assert!(outbound.content.contains("agent.daily_summary.background"));
}
#[test] #[test]
fn cron_schedule_uses_configured_timezone() { fn cron_schedule_uses_configured_timezone() {
let now = Utc.with_ymd_and_hms(2026, 4, 23, 18, 0, 0).single().unwrap(); let now = Utc.with_ymd_and_hms(2026, 4, 23, 18, 0, 0).single().unwrap();

View File

@ -925,29 +925,6 @@ impl SessionStore {
Ok(scope_keys) Ok(scope_keys)
} }
pub fn list_memory_scope_keys_updated_since(
&self,
scope_kind: &str,
since_timestamp: i64,
) -> Result<Vec<String>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let mut stmt = conn.prepare(
"
SELECT DISTINCT scope_key
FROM memories
WHERE scope_kind = ?1 AND updated_at > ?2
ORDER BY scope_key ASC
",
)?;
let rows = stmt.query_map(params![scope_kind, since_timestamp], |row| row.get::<_, String>(0))?;
let mut scope_keys = Vec::new();
for row in rows {
scope_keys.push(row?);
}
Ok(scope_keys)
}
pub fn list_memories_for_scope( pub fn list_memories_for_scope(
&self, &self,
scope_kind: &str, scope_kind: &str,
@ -2452,51 +2429,4 @@ mod tests {
assert_eq!(fetched.run_count, 1); assert_eq!(fetched.run_count, 1);
assert_eq!(fetched.completed_at, Some(1_700_000_000_100)); assert_eq!(fetched.completed_at, Some(1_700_000_000_100));
} }
#[test]
fn test_list_memory_scope_keys_updated_since_filters_recent_scopes() {
let store = SessionStore::in_memory().unwrap();
let first = store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "profile".to_string(),
memory_key: "work".to_string(),
content: "用户在做AI产品".to_string(),
source_type: "message".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
})
.unwrap();
let cutoff = first.updated_at;
std::thread::sleep(std::time::Duration::from_millis(2));
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-2".to_string(),
namespace: "preferences".to_string(),
memory_key: "style".to_string(),
content: "偏好简洁表达".to_string(),
source_type: "message".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
})
.unwrap();
let recent_scope_keys = store
.list_memory_scope_keys_updated_since("user", cutoff)
.unwrap();
assert_eq!(recent_scope_keys, vec!["feishu:user-2".to_string()]);
}
} }

View File

@ -31,7 +31,7 @@ impl Tool for SchedulerManageTool {
} }
fn description(&self) -> &str { fn description(&self) -> &str {
"Manage DB-backed scheduled jobs. Supports actions: list, get, put, delete, pause, resume. Jobs persist in SQLite and are executed by the scheduler runtime. When creating agent_task or silent_agent_task jobs, keep prompt/system_prompt focused on the work to perform; do not restate execution times unless the task logic truly depends on them, because the trigger already controls timing." "Manage DB-backed scheduled jobs. Supports actions: list, get, put, delete, pause, resume. Jobs persist in SQLite and are executed by the scheduler runtime. When creating agent_task jobs, keep prompt/system_prompt focused on the work to perform; do not restate execution times unless the task logic truly depends on them, because the trigger already controls timing."
} }
fn parameters_schema(&self) -> serde_json::Value { fn parameters_schema(&self) -> serde_json::Value {
@ -67,19 +67,18 @@ impl Tool for SchedulerManageTool {
}, },
"kind": { "kind": {
"type": "string", "type": "string",
"enum": ["internal_event", "outbound_message", "agent_task", "silent_agent_task"] "enum": ["internal_event", "outbound_message", "agent_task"]
}, },
"schedule": { "schedule": {
"type": "object", "type": "object",
"description": "Schedule object, for example {type: 'interval', seconds: 300} or {type: 'cron', expression: '0 9 * * *'}" "description": "Schedule object, for example {type: 'interval', seconds: 300} or {type: 'cron', expression: '0 9 * * *'}"
}, },
"target": { "target": {
"type": "object", "type": "object"
"description": "Target routing. agent_task and silent_agent_task require channel and chat_id. silent_agent_task may optionally set session_chat_id to choose a dedicated background session; otherwise one is derived from the job id."
}, },
"payload": { "payload": {
"type": "object", "type": "object",
"description": format!("Job payload. agent_task and silent_agent_task support prompt, agent, fresh_session, system_prompt, sender_id, metadata. For these kinds, write prompt/system_prompt as execution instructions and avoid repeating schedule phrases or execution times such as 每天9点 or 每小时 unless the task itself must reason about time. {} outbound_message expects content. internal_event expects event.", agent_hint) "description": format!("Job payload. agent_task supports prompt, agent, fresh_session, system_prompt, sender_id, metadata. For agent_task, write prompt/system_prompt as execution instructions and avoid repeating schedule phrases or execution times such as 每天9点 or 每小时 unless the task itself must reason about time. {} outbound_message expects content. internal_event expects event.", agent_hint)
}, },
"max_runs": { "max_runs": {
"type": ["integer", "null"] "type": ["integer", "null"]
@ -209,9 +208,9 @@ fn build_upsert(
args.get("target").cloned().unwrap_or_else(|| json!({})), args.get("target").cloned().unwrap_or_else(|| json!({})),
context, context,
); );
if kind == "agent_task" || kind == "silent_agent_task" { if kind == "agent_task" {
validate_agent_task_payload(&payload, known_agents)?; validate_agent_task_payload(&payload, known_agents)?;
validate_target_fields(&target, &["channel", "chat_id"], &kind)?; validate_target_fields(&target, &["channel", "chat_id"], "agent_task")?;
} else if kind == "outbound_message" { } else if kind == "outbound_message" {
validate_outbound_message_payload(&payload)?; validate_outbound_message_payload(&payload)?;
validate_target_fields(&target, &["channel", "chat_id"], "outbound_message")?; validate_target_fields(&target, &["channel", "chat_id"], "outbound_message")?;
@ -483,43 +482,6 @@ mod tests {
assert!(put_result.output.contains("agent_task")); assert!(put_result.output.contains("agent_task"));
} }
#[tokio::test]
async fn test_scheduler_manage_put_silent_agent_task() {
let store = Arc::new(SessionStore::in_memory().unwrap());
let tool = SchedulerManageTool::new(store.clone(), HashSet::from(["planner".to_string()]));
let put_result = tool
.execute(json!({
"action": "put",
"id": "agent.daily_summary.background",
"kind": "silent_agent_task",
"schedule": {
"type": "cron",
"expression": "0 9 * * *"
},
"target": {
"channel": "feishu",
"chat_id": "oc_demo",
"session_chat_id": "scheduler/agent.daily_summary.background"
},
"payload": {
"prompt": "请后台总结今天待办",
"agent": "planner"
}
}))
.await
.unwrap();
assert!(put_result.success);
let saved = store
.get_scheduler_job("agent.daily_summary.background")
.unwrap()
.unwrap();
assert_eq!(saved.kind, "silent_agent_task");
assert_eq!(saved.target["session_chat_id"], "scheduler/agent.daily_summary.background");
}
#[tokio::test] #[tokio::test]
async fn test_scheduler_manage_rejects_outbound_message_without_target() { async fn test_scheduler_manage_rejects_outbound_message_without_target() {
let store = Arc::new(SessionStore::in_memory().unwrap()); let store = Arc::new(SessionStore::in_memory().unwrap());