Compare commits

..

4 Commits

Author SHA1 Message Date
7fefd40dca feat: 重构记忆工具提示和代理配置,增强用户指导和系统提示 2026-04-23 17:33:10 +08:00
3d241544c5 feat(memory): 添加内置记忆维护作业,增强调度功能并支持有效作业合并 2026-04-23 14:16:22 +08:00
f3f369b329 feat: add llm_timeout_secs to provider configuration and implement timeout handling
- Introduced llm_timeout_secs in ProviderConfig and LLMProviderConfig to specify timeout for LLM requests.
- Updated OpenAIProvider and AnthropicProvider to utilize the timeout setting when creating HTTP clients.
- Enhanced error handling for API responses to include timeout information.
- Modified SessionManager to support agent-specific provider configurations, allowing for more flexible agent management.
- Added tests to verify the correct behavior of timeout settings and agent task validation.
2026-04-23 09:23:15 +08:00
1ffdcab585 feat(scheduler): 添加 agent_task 类型支持,扩展任务调度功能 2026-04-23 08:45:32 +08:00
20 changed files with 2449 additions and 66 deletions

View File

@ -90,7 +90,7 @@ Current behavior:
- Scheduler runs as a background loop inside gateway lifecycle. - Scheduler runs as a background loop inside gateway lifecycle.
- Job definitions and runtime state are persisted in SQLite instead of JSON files. - Job definitions and runtime state are persisted in SQLite instead of JSON files.
- Supported schedule types: delay, interval, at, cron. - Supported schedule types: delay, interval, at, cron.
- Supported job kinds: internal_event, outbound_message. - Supported job kinds: internal_event, outbound_message, agent_task.
- Built-in internal event: session_cleanup, used to clear expired in-memory channel sessions. - Built-in internal event: session_cleanup, used to clear expired in-memory channel sessions.
- Built-in management tool: scheduler_manage. - Built-in management tool: scheduler_manage.
@ -113,6 +113,29 @@ Config example:
"event": "session_cleanup" "event": "session_cleanup"
} }
}, },
{
"id": "agent.daily_summary",
"kind": "agent_task",
"schedule": {
"type": "cron",
"expression": "30 18 * * *"
},
"target": {
"channel": "feishu",
"chat_id": "oc_xxx"
},
"payload": {
"prompt": "请总结今天的项目进展,并列出明天的优先事项",
"agent": "default",
"fresh_session": true,
"system_prompt": "你是日报助手,输出时先给摘要,再给待办。",
"sender_id": "scheduler-daily-summary",
"metadata": {
"job_type": "daily_summary",
"source": "scheduler"
}
}
},
{ {
"id": "daily.reminder", "id": "daily.reminder",
"kind": "outbound_message", "kind": "outbound_message",
@ -136,3 +159,11 @@ Runtime management:
- Use scheduler_manage with action=list|get|put|delete|pause|resume. - Use scheduler_manage with action=list|get|put|delete|pause|resume.
- Jobs created by the tool are written into SQLite and picked up by the scheduler loop. - Jobs created by the tool are written into SQLite and picked up by the scheduler loop.
- Config-defined jobs are also synced into SQLite on startup. - Config-defined jobs are also synced into SQLite on startup.
- agent_task reuses the normal agent pipeline: it creates a synthetic user turn from payload.prompt and runs tools, persistence, and outbound rendering through SessionManager.
- agent_task payload fields:
- prompt: required, synthetic user input.
- agent: optional, choose which configured agent definition to use. default or any configured agent name.
- fresh_session: optional, when true reset the active chat segment before running.
- system_prompt: optional, append a task-specific system message before the synthetic user turn.
- sender_id: optional, overrides the synthetic sender id used for tool context and memory scoping.
- metadata: optional, attached to outbound messages emitted by this task.

View File

@ -157,7 +157,7 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
| 字段 | 类型 | 含义 | | 字段 | 类型 | 含义 |
| --- | --- | --- | | --- | --- | --- |
| `id` | `TEXT PRIMARY KEY` | 任务唯一标识 | | `id` | `TEXT PRIMARY KEY` | 任务唯一标识 |
| `kind` | `TEXT NOT NULL` | 任务类型,当前支持 `internal_event``outbound_message` | | `kind` | `TEXT NOT NULL` | 任务类型,当前支持 `internal_event``outbound_message``agent_task` |
| `schedule_json` | `TEXT NOT NULL` | 统一 schedule 定义JSON 形式保存 `delay` / `interval` / `at` / `cron` | | `schedule_json` | `TEXT NOT NULL` | 统一 schedule 定义JSON 形式保存 `delay` / `interval` / `at` / `cron` |
| `interval_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 | | `interval_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 |
| `startup_delay_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 | | `startup_delay_secs` | `INTEGER NOT NULL DEFAULT 0` | 兼容首版 interval 配置的冗余字段 |
@ -183,6 +183,12 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
- repeating job 会在每次执行后更新 `run_count``last_fired_at``next_fire_at` - repeating job 会在每次执行后更新 `run_count``last_fired_at``next_fire_at`
- one-shot job`delay` / `at`)完成后会进入 `completed` 状态,不再调度。 - one-shot job`delay` / `at`)完成后会进入 `completed` 状态,不再调度。
- 内置 `internal_event` 当前包含 `session_cleanup`,用于回收超时的内存 session 缓存。 - 内置 `internal_event` 当前包含 `session_cleanup`,用于回收超时的内存 session 缓存。
- `agent_task` 会把 `payload.prompt` 作为一次合成用户输入,交给 `SessionManager::run_scheduled_agent_task()` 执行,因此会复用持久化历史、工具调用和渠道下发链路。
- `payload.fresh_session = true` 时,会先对目标 chat 执行一次逻辑 reset再开始本次任务运行。
- `payload.agent` 可指定本次任务使用哪一个已配置 agent未指定时仍使用 `default`
- `payload.system_prompt` 会作为额外 system 消息写入本次任务上下文。
- `payload.sender_id` 会覆盖默认的 `scheduler` 发送者标识。
- `payload.metadata` 会映射到 outbound metadata便于渠道侧做追踪或特殊处理。
## 7. 数据写入流程 ## 7. 数据写入流程

View File

@ -21,9 +21,11 @@ use std::time::Instant;
const MAX_TOOL_RESULT_CHARS: usize = 16_000; const MAX_TOOL_RESULT_CHARS: usize = 16_000;
/// Minimum characters to keep when truncating /// Minimum characters to keep when truncating
const TRUNCATION_SUFFIX_LEN: usize = 200; const TRUNCATION_SUFFIX_LEN: usize = 200;
const MEMORY_TOOL_USAGE_SYSTEM_PROMPT: &str = "在绝大多数请求开始时,你都应先使用长期记忆检索工具 memory_search 来召回相关上下文,然后再决定如何回答或是否需要写入记忆。默认流程是:先用 memory_search(action='search');只有在你已经明确知道 namespace 和 key 时才改用 get只有在需要浏览最近几条记忆时才用 list。即使用户没有明确提到“记忆”或“偏好”只要请求可能与用户长期偏好、稳定事实、历史决策、持续任务或项目上下文有关就应先搜记忆。仅以下少数情况可跳过记忆搜索纯寒暄、一次性简单计算、完全不依赖用户历史的直接事实问答。写入或修改记忆时再使用 memory_manage。仅在遇到高价值且未来仍有用的信息时写入记忆用户长期偏好、稳定事实、用户对你的纠正、持续任务/项目上下文、明确决策。不要保存一次性工具结果、临时列表、敏感凭证或不确定推测。写入时优先使用规范 namespacepreferences、profile、tasks、decisions并优先调用 memory_manage(action='put');同一 namespace/key 可直接覆盖更新。检索时应提供 queries 数组,尽量同时放入中文关键词、英文别名,以及可能的 snake_case memory_key 词,例如 queries=['email', '邮件', 'email_folder_preference']。如果你决定跳过记忆搜索,应先确认当前请求确实属于上述少数例外,而不是因为你忘了检索。"; const MEMORY_TOOL_USAGE_SYSTEM_PROMPT: &str =
include_str!("memory_tool_usage_system_prompt.md");
const PENDING_USER_ACTION_MARKER: &str = "__PICOBOT_PENDING_USER_ACTION__"; const PENDING_USER_ACTION_MARKER: &str = "__PICOBOT_PENDING_USER_ACTION__";
const DEFAULT_PENDING_ASSISTANT_MESSAGE: &str = "工具已经启动并进入等待用户操作的状态。请先完成外部操作,完成后直接告诉我继续。"; const DEFAULT_PENDING_ASSISTANT_MESSAGE: &str = "工具已经启动并进入等待用户操作的状态。请先完成外部操作,完成后直接告诉我继续。";
const RECOVERABLE_LLM_ERROR_MESSAGE: &str = "模型服务暂时不可用或响应超时。请稍后重试。";
/// Build content blocks from text and media paths /// Build content blocks from text and media paths
fn build_content_blocks(text: &str, media_paths: &[String]) -> Vec<ContentBlock> { fn build_content_blocks(text: &str, media_paths: &[String]) -> Vec<ContentBlock> {
@ -99,6 +101,23 @@ fn parse_pending_tool_output(output: &str) -> Option<String> {
output.strip_prefix(PENDING_USER_ACTION_MARKER).map(|rest| rest.trim().to_string()) output.strip_prefix(PENDING_USER_ACTION_MARKER).map(|rest| rest.trim().to_string())
} }
fn is_recoverable_llm_error(error: &str) -> bool {
let normalized = error.to_ascii_lowercase();
normalized.contains("504")
|| normalized.contains("gateway timeout")
|| normalized.contains("stream timeout")
|| normalized.contains("timed out")
|| normalized.contains("timeout")
}
fn recoverable_llm_message(error: &str) -> String {
if is_recoverable_llm_error(error) {
RECOVERABLE_LLM_ERROR_MESSAGE.to_string()
} else {
format!("模型请求失败:{}", error)
}
}
/// Loop detection result. /// Loop detection result.
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
enum LoopDetectionResult { enum LoopDetectionResult {
@ -386,11 +405,18 @@ impl AgentLoop {
}; };
// Call LLM // Call LLM
let response = (*self.provider).chat(request).await let response = match (*self.provider).chat(request).await {
.map_err(|e| { Ok(response) => response,
Err(e) => {
tracing::error!(error = %e, "LLM request failed"); tracing::error!(error = %e, "LLM request failed");
AgentError::LlmError(e.to_string()) let assistant_message = ChatMessage::assistant(recoverable_llm_message(&e.to_string()));
})?; emitted_messages.push(assistant_message.clone());
return Ok(AgentProcessResult {
final_response: assistant_message,
emitted_messages,
});
}
};
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
tracing::debug!( tracing::debug!(
@ -539,11 +565,8 @@ impl AgentLoop {
}) })
} }
Err(e) => { Err(e) => {
// Fallback if summary call fails
tracing::error!(error = %e, "Failed to get summary from LLM"); tracing::error!(error = %e, "Failed to get summary from LLM");
let final_message = ChatMessage::assistant( let final_message = ChatMessage::assistant(recoverable_llm_message(&e.to_string()));
format!("I reached the maximum number of tool call iterations ({}) without completing the task. The work done so far has been lost due to an error. Please try breaking the task into smaller steps.", self.max_iterations)
);
emitted_messages.push(final_message.clone()); emitted_messages.push(final_message.clone());
Ok(AgentProcessResult { Ok(AgentProcessResult {
final_response: final_message, final_response: final_message,
@ -828,6 +851,20 @@ mod tests {
assert_eq!(provider_message.tool_calls.as_ref().unwrap()[0].name, "calculator"); assert_eq!(provider_message.tool_calls.as_ref().unwrap()[0].name, "calculator");
} }
#[test]
fn test_memory_prompt_requires_proactive_memory_search() {
assert!(MEMORY_TOOL_USAGE_SYSTEM_PROMPT.contains("在绝大多数请求开始时"));
assert!(MEMORY_TOOL_USAGE_SYSTEM_PROMPT.contains("先使用长期记忆检索工具 memory_search"));
assert!(MEMORY_TOOL_USAGE_SYSTEM_PROMPT.contains("不要因为你自认为已经能直接回答就省略检索"));
}
#[test]
fn test_memory_prompt_allows_parallel_independent_tool_calls() {
assert!(MEMORY_TOOL_USAGE_SYSTEM_PROMPT.contains("同一轮同时返回多个 tool calls"));
assert!(MEMORY_TOOL_USAGE_SYSTEM_PROMPT.contains("自动并行执行"));
assert!(MEMORY_TOOL_USAGE_SYSTEM_PROMPT.contains("只有当后一个工具的参数依赖"));
}
#[test] #[test]
fn test_truncate_tool_result_handles_utf8_char_boundaries() { fn test_truncate_tool_result_handles_utf8_char_boundaries() {
let input = "".repeat(MAX_TOOL_RESULT_CHARS + 500); let input = "".repeat(MAX_TOOL_RESULT_CHARS + 500);

View File

@ -41,6 +41,7 @@ impl Default for ContextCompressionConfig {
} }
/// Context compressor that reduces message history when it exceeds token limits. /// Context compressor that reduces message history when it exceeds token limits.
#[derive(Clone)]
pub struct ContextCompressor { pub struct ContextCompressor {
config: ContextCompressionConfig, config: ContextCompressionConfig,
context_window: usize, context_window: usize,

View File

@ -0,0 +1,47 @@
# 记忆检索
在绝大多数请求开始时,你都应先使用长期记忆检索工具 memory_search 来召回相关记忆,再决定如何回答或是否需要写入记忆。先检索通常能帮助你识别用户长期偏好、稳定事实、历史决策、持续任务和上下文约束。
## 默认流程
- 先使用长期记忆检索工具 memory_search优先调用 memory_search(action='search')。
- 只有在你已经明确知道 namespace 和 key 时,才改用 get。
- 只有在需要浏览最近几条记忆时,才用 list。
- 即使用户没有明确提到“记忆”或“偏好”,也应该先搜记忆,不要因为你自认为已经能直接回答就省略检索。
## 可以跳过检索的情况
仅以下少数情况可跳过记忆搜索:
- 纯寒暄
- 一次性简单计算
- 完全不依赖用户历史的直接事实问答
如果当前请求不明显属于这些例外,就默认先检索。
## 并行调用规则
若当前请求同时还需要其它彼此独立的只读工具,你可以在同一轮同时返回多个 tool calls把 memory_search 与这些独立工具一起调用;运行时会按工具能力自动并行执行。
只有当后一个工具的参数依赖 memory_search 或其它工具结果时,才分多轮串行调用。
## 检索方式
- 检索时应提供 queries 数组。
- 尽量同时放入中文关键词、英文别名,以及可能的 snake_case memory_key 词。如果最后一轮的会话与前面的话题不一致则优先根据最后一轮用户的会话来生成关键词保留1、 2个历史会话的
- 越靠近最新会话,生成关键词的比例或者权重应该更高
- 例如queries=['email', '邮件', 'email_folder_preference']。
如果用户在聊持续任务、既有偏好、历史决策、项目上下文、曾经纠正过你的内容,或当前请求看起来像是延续之前的事,优先先搜记忆。
## 写入规则
- 写入或修改记忆时,再使用 memory_manage。
- 仅在遇到高价值且未来仍有用的信息时写入记忆:用户长期偏好、稳定事实、用户对你的纠正、持续任务或项目上下文、明确决策。
- 不要保存一次性工具结果、临时列表、敏感凭证或不确定推测。
- 写入时优先使用规范 namespacepreferences、profile、tasks、decisions。
- 优先调用 memory_manage(action='put');同一 namespace/key 可直接覆盖更新。
## 最后检查
如果你决定跳过记忆搜索,应先确认当前请求确实属于上述少数例外,而不是因为你忘了检索,或因为你误以为单凭当前消息就足够。

View File

@ -101,6 +101,8 @@ pub struct ProviderConfig {
pub api_key: String, pub api_key: String,
#[serde(default)] #[serde(default)]
pub extra_headers: HashMap<String, String>, pub extra_headers: HashMap<String, String>,
#[serde(default = "default_llm_timeout_secs")]
pub llm_timeout_secs: u64,
} }
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
@ -132,6 +134,10 @@ fn default_token_limit() -> usize {
128_000 128_000
} }
fn default_llm_timeout_secs() -> u64 {
120
}
#[derive(Debug, Clone, Deserialize, Serialize)] #[derive(Debug, Clone, Deserialize, Serialize)]
pub struct GatewayConfig { pub struct GatewayConfig {
#[serde(default = "default_gateway_host")] #[serde(default = "default_gateway_host")]
@ -166,6 +172,8 @@ pub struct SchedulerConfig {
pub jobs: Vec<SchedulerJobConfig>, pub jobs: Vec<SchedulerJobConfig>,
} }
pub const BUILTIN_MEMORY_MAINTENANCE_JOB_ID: &str = "builtin.memory_maintenance_daily";
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)] #[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum SchedulerMisfirePolicy { pub enum SchedulerMisfirePolicy {
@ -197,6 +205,7 @@ pub struct SchedulerJobConfig {
pub enum SchedulerJobKind { pub enum SchedulerJobKind {
InternalEvent, InternalEvent,
OutboundMessage, OutboundMessage,
AgentTask,
} }
#[derive(Debug, Clone, Deserialize, Serialize, Default)] #[derive(Debug, Clone, Deserialize, Serialize, Default)]
@ -249,6 +258,41 @@ impl SchedulerJobConfig {
} }
} }
impl SchedulerConfig {
pub fn builtin_jobs() -> Vec<SchedulerJobConfig> {
vec![SchedulerJobConfig {
id: BUILTIN_MEMORY_MAINTENANCE_JOB_ID.to_string(),
enabled: true,
kind: SchedulerJobKind::InternalEvent,
schedule: Some(SchedulerSchedule::Cron {
expression: "0 19 * * *".to_string(),
}),
startup_delay_secs: 0,
interval_secs: 0,
target: SchedulerJobTarget::default(),
payload: serde_json::json!({
"event": "memory_maintenance",
"time_zone": "Asia/Shanghai",
"local_time": "03:00"
}),
}]
}
pub fn effective_jobs(&self) -> Vec<SchedulerJobConfig> {
let mut jobs = Self::builtin_jobs();
for configured in &self.jobs {
if let Some(existing) = jobs.iter_mut().find(|job| job.id == configured.id) {
*existing = configured.clone();
} else {
jobs.push(configured.clone());
}
}
jobs
}
}
impl SchedulerSchedule { impl SchedulerSchedule {
pub fn validate(&self, job_id: &str) -> Result<(), ConfigError> { pub fn validate(&self, job_id: &str) -> Result<(), ConfigError> {
match self { match self {
@ -399,6 +443,7 @@ pub struct LLMProviderConfig {
pub base_url: String, pub base_url: String,
pub api_key: String, pub api_key: String,
pub extra_headers: HashMap<String, String>, pub extra_headers: HashMap<String, String>,
pub llm_timeout_secs: u64,
pub model_id: String, pub model_id: String,
pub temperature: Option<f32>, pub temperature: Option<f32>,
pub max_tokens: Option<u32>, pub max_tokens: Option<u32>,
@ -460,6 +505,7 @@ impl Config {
base_url: provider.base_url.clone(), base_url: provider.base_url.clone(),
api_key: provider.api_key.clone(), api_key: provider.api_key.clone(),
extra_headers: provider.extra_headers.clone(), extra_headers: provider.extra_headers.clone(),
llm_timeout_secs: provider.llm_timeout_secs,
model_id: model.model_id.clone(), model_id: model.model_id.clone(),
temperature: model.temperature, temperature: model.temperature,
max_tokens: model.max_tokens, max_tokens: model.max_tokens,
@ -600,6 +646,43 @@ mod tests {
assert_eq!(provider_config.name, "aliyun"); assert_eq!(provider_config.name, "aliyun");
assert_eq!(provider_config.model_id, "qwen-plus"); assert_eq!(provider_config.model_id, "qwen-plus");
assert_eq!(provider_config.temperature, Some(0.0)); assert_eq!(provider_config.temperature, Some(0.0));
assert_eq!(provider_config.llm_timeout_secs, 120);
}
#[test]
fn test_provider_config_loads_custom_llm_timeout() {
let file = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
file.path(),
r#"{
"providers": {
"aliyun": {
"type": "openai",
"base_url": "https://example.invalid/v1",
"api_key": "test-key",
"extra_headers": {},
"llm_timeout_secs": 400
}
},
"models": {
"qwen-plus": {
"model_id": "qwen-plus"
}
},
"agents": {
"default": {
"provider": "aliyun",
"model": "qwen-plus"
}
}
}"#,
)
.unwrap();
let config = Config::load(file.path().to_str().unwrap()).unwrap();
let provider_config = config.get_provider_config("default").unwrap();
assert_eq!(provider_config.llm_timeout_secs, 400);
} }
#[test] #[test]
@ -692,6 +775,60 @@ mod tests {
assert_eq!(config.scheduler.worker_queue_capacity, 64); assert_eq!(config.scheduler.worker_queue_capacity, 64);
assert_eq!(config.scheduler.misfire_policy, SchedulerMisfirePolicy::Skip); assert_eq!(config.scheduler.misfire_policy, SchedulerMisfirePolicy::Skip);
assert!(config.scheduler.jobs.is_empty()); assert!(config.scheduler.jobs.is_empty());
let effective_jobs = config.scheduler.effective_jobs();
assert_eq!(effective_jobs.len(), 1);
assert_eq!(effective_jobs[0].id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID);
assert_eq!(effective_jobs[0].kind, SchedulerJobKind::InternalEvent);
assert_eq!(
effective_jobs[0].resolved_schedule().unwrap(),
SchedulerSchedule::Cron {
expression: "0 19 * * *".to_string(),
}
);
}
#[test]
fn test_scheduler_effective_jobs_allows_builtin_override() {
let mut scheduler = SchedulerConfig::default();
scheduler.jobs.push(SchedulerJobConfig {
id: BUILTIN_MEMORY_MAINTENANCE_JOB_ID.to_string(),
enabled: false,
kind: SchedulerJobKind::InternalEvent,
schedule: Some(SchedulerSchedule::Cron {
expression: "15 2 * * *".to_string(),
}),
startup_delay_secs: 0,
interval_secs: 0,
target: SchedulerJobTarget::default(),
payload: serde_json::json!({
"event": "memory_maintenance",
"time_zone": "UTC",
"local_time": "02:15"
}),
});
scheduler.jobs.push(SchedulerJobConfig {
id: "custom.reminder".to_string(),
enabled: true,
kind: SchedulerJobKind::InternalEvent,
schedule: Some(SchedulerSchedule::Delay { seconds: 30 }),
startup_delay_secs: 0,
interval_secs: 0,
target: SchedulerJobTarget::default(),
payload: serde_json::json!({"event": "custom"}),
});
let effective_jobs = scheduler.effective_jobs();
assert_eq!(effective_jobs.len(), 2);
assert_eq!(effective_jobs[0].id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID);
assert!(!effective_jobs[0].enabled);
assert_eq!(
effective_jobs[0].resolved_schedule().unwrap(),
SchedulerSchedule::Cron {
expression: "15 2 * * *".to_string(),
}
);
assert_eq!(effective_jobs[1].id, "custom.reminder");
} }
#[test] #[test]
@ -837,18 +974,78 @@ mod tests {
config.scheduler.jobs[0].resolved_schedule().unwrap(), config.scheduler.jobs[0].resolved_schedule().unwrap(),
SchedulerSchedule::Delay { seconds: 30 } SchedulerSchedule::Delay { seconds: 30 }
); );
assert_eq!(config.scheduler.jobs[0].kind, SchedulerJobKind::InternalEvent);
assert_eq!( assert_eq!(
config.scheduler.jobs[1].resolved_schedule().unwrap(), config.scheduler.jobs[1].resolved_schedule().unwrap(),
SchedulerSchedule::At { SchedulerSchedule::At {
timestamp: "2026-04-23T09:00:00+00:00".to_string(), timestamp: "2026-04-23T09:00:00+00:00".to_string(),
} }
); );
assert_eq!(config.scheduler.jobs[1].kind, SchedulerJobKind::OutboundMessage);
assert_eq!( assert_eq!(
config.scheduler.jobs[2].resolved_schedule().unwrap(), config.scheduler.jobs[2].resolved_schedule().unwrap(),
SchedulerSchedule::Cron { SchedulerSchedule::Cron {
expression: "0 9 * * *".to_string(), expression: "0 9 * * *".to_string(),
} }
); );
assert_eq!(config.scheduler.jobs[2].kind, SchedulerJobKind::InternalEvent);
}
#[test]
fn test_scheduler_config_loads_agent_task_job() {
let file = tempfile::NamedTempFile::new().unwrap();
std::fs::write(
file.path(),
r#"{
"providers": {
"aliyun": {
"type": "openai",
"base_url": "https://example.invalid/v1",
"api_key": "test-key",
"extra_headers": {}
}
},
"models": {
"qwen-plus": {
"model_id": "qwen-plus"
}
},
"agents": {
"default": {
"provider": "aliyun",
"model": "qwen-plus"
}
},
"scheduler": {
"enabled": true,
"jobs": [
{
"id": "agent.daily_summary",
"kind": "agent_task",
"schedule": {
"type": "cron",
"expression": "0 9 * * *"
},
"target": {
"channel": "feishu",
"chat_id": "oc_demo"
},
"payload": {
"prompt": "请总结今天待办"
}
}
]
}
}"#,
)
.unwrap();
let config = Config::load(file.path().to_str().unwrap()).unwrap();
let job = &config.scheduler.jobs[0];
assert_eq!(job.kind, SchedulerJobKind::AgentTask);
assert_eq!(job.target.channel.as_deref(), Some("feishu"));
assert_eq!(job.target.chat_id.as_deref(), Some("oc_demo"));
assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办"));
} }
#[test] #[test]

View File

@ -0,0 +1,35 @@
# PicoBot 代理配置
## 身份
- 你是 PicoBot一名务实、可靠的通用助理。
- 你的目标是理解用户当下的真实需求,并给出清晰、可执行的帮助。
## 工作方式
- 优先理解意图,再给出回应或行动。
- 保持简洁、准确、自然,不故作热情,也不空泛铺陈。
- 能直接验证的内容尽量先验证,避免凭空猜测。
- 当现有工具是完成任务的最直接方式时,优先使用工具。
- 除非用户明确要求改变方向,否则保持用户原本目标不变。
## 助理原则
- 优先解决问题,而不是展示过程。
- 输出要方便用户立即使用,结论尽量明确。
- 对不确定的地方要直说,不把猜测包装成事实。
- 复杂任务先收敛重点,简单任务直接给结果。
- 避免不必要的重复、客套和冗长说明。
## 回复规则
- 除非用户另有要求,否则使用中文回复。
- 默认短而清楚,按信息密度组织内容。
- 如果任务涉及文件、命令、配置或下一步操作,优先给出最关键的那部分。
- 如果存在限制、风险或前提条件,要直接说明。
## 补充要求
- 你是 PicoBot。
- 回答应以帮助用户完成当前目标为中心。
- 在信息不足时先补关键前提,在信息充分时直接执行。

View File

@ -0,0 +1 @@
你是 PicoBot 的后台记忆整理器。你必须根据输入的候选记忆做语义整理,并严格返回 JSON不要输出 Markdown 代码块,不要输出额外解释。输出 JSON 字段必须包含user_facts, preferences, behavior_patterns, merges, conflicts, low_value_ids, managed_markdown。user_facts、preferences、behavior_patterns 是字符串数组。merges 是对象数组,每个对象必须包含 source_ids、namespace、memory_key、content。conflicts 是对象数组,每个对象必须包含 source_ids、note。low_value_ids 是需要删除的候选记忆 id 数组。只能引用输入里出现过的候选 id。managed_markdown 必须是 Markdown 文本,且只保留稳定模式,不写一次性事件。

View File

@ -2,6 +2,7 @@ pub mod http;
pub mod session; pub mod session;
pub mod ws; pub mod ws;
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use axum::{routing, Router}; use axum::{routing, Router};
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -9,6 +10,7 @@ use tokio::net::TcpListener;
use crate::bus::{MessageBus, OutboundDispatcher}; use crate::bus::{MessageBus, OutboundDispatcher};
use crate::channels::ChannelManager; use crate::channels::ChannelManager;
use crate::config::Config; use crate::config::Config;
use crate::config::LLMProviderConfig;
use crate::logging; use crate::logging;
use crate::scheduler::Scheduler; use crate::scheduler::Scheduler;
use crate::skills::SkillRuntime; use crate::skills::SkillRuntime;
@ -27,6 +29,10 @@ impl GatewayState {
// Get provider config for SessionManager // Get provider config for SessionManager
let provider_config = config.get_provider_config("default")?; let provider_config = config.get_provider_config("default")?;
let mut provider_configs = HashMap::<String, LLMProviderConfig>::new();
for agent_name in config.agents.keys() {
provider_configs.insert(agent_name.clone(), config.get_provider_config(agent_name)?);
}
// Session TTL from config (default 4 hours) // Session TTL from config (default 4 hours)
let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4); let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4);
@ -40,6 +46,7 @@ impl GatewayState {
agent_prompt_reinject_every, agent_prompt_reinject_every,
show_tool_results, show_tool_results,
provider_config, provider_config,
provider_configs,
skills, skills,
)?; )?;
let channel_manager = ChannelManager::new(); let channel_manager = ChannelManager::new();

File diff suppressed because it is too large Load Diff

View File

@ -1,12 +1,23 @@
use std::path::PathBuf; use std::path::PathBuf;
use chrono::Local;
use tracing_appender::rolling::{RollingFileAppender, Rotation}; use tracing_appender::rolling::{RollingFileAppender, Rotation};
use tracing_subscriber::{ use tracing_subscriber::{
fmt, fmt,
fmt::time::FormatTime,
layer::SubscriberExt, layer::SubscriberExt,
util::SubscriberInitExt, util::SubscriberInitExt,
EnvFilter, EnvFilter,
}; };
#[derive(Clone, Copy, Debug, Default)]
struct LocalTimestamp;
impl FormatTime for LocalTimestamp {
fn format_time(&self, writer: &mut tracing_subscriber::fmt::format::Writer<'_>) -> std::fmt::Result {
write!(writer, "{}", Local::now().to_rfc3339_opts(chrono::SecondsFormat::Millis, true))
}
}
/// Get the default log directory path: ~/.picobot/logs /// Get the default log directory path: ~/.picobot/logs
pub fn get_default_log_dir() -> PathBuf { pub fn get_default_log_dir() -> PathBuf {
let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from(".")); let home = dirs::home_dir().unwrap_or_else(|| PathBuf::from("."));
@ -44,12 +55,14 @@ pub fn init_logging() {
let file_layer = fmt::layer() let file_layer = fmt::layer()
.with_writer(file_appender) .with_writer(file_appender)
.with_timer(LocalTimestamp)
.with_ansi(false) .with_ansi(false)
.with_target(true) .with_target(true)
.with_level(true) .with_level(true)
.with_thread_ids(true); .with_thread_ids(true);
let console_layer = fmt::layer() let console_layer = fmt::layer()
.with_timer(LocalTimestamp)
.with_target(true) .with_target(true)
.with_level(true); .with_level(true);
@ -68,6 +81,7 @@ pub fn init_logging_console_only() {
.unwrap_or_else(|_| EnvFilter::new("info")); .unwrap_or_else(|_| EnvFilter::new("info"));
let console_layer = fmt::layer() let console_layer = fmt::layer()
.with_timer(LocalTimestamp)
.with_target(true) .with_target(true)
.with_level(true); .with_level(true);

View File

@ -2,6 +2,7 @@ use async_trait::async_trait;
use reqwest::Client; use reqwest::Client;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::time::Duration;
use crate::bus::message::ContentBlock; use crate::bus::message::ContentBlock;
use super::{ChatCompletionRequest, ChatCompletionResponse, LLMProvider, Tool, ToolCall}; use super::{ChatCompletionRequest, ChatCompletionResponse, LLMProvider, Tool, ToolCall};
@ -58,6 +59,7 @@ pub struct AnthropicProvider {
api_key: String, api_key: String,
base_url: String, base_url: String,
extra_headers: HashMap<String, String>, extra_headers: HashMap<String, String>,
llm_timeout_secs: u64,
model_id: String, model_id: String,
temperature: Option<f32>, temperature: Option<f32>,
max_tokens: Option<u32>, max_tokens: Option<u32>,
@ -70,17 +72,24 @@ impl AnthropicProvider {
api_key: String, api_key: String,
base_url: String, base_url: String,
extra_headers: HashMap<String, String>, extra_headers: HashMap<String, String>,
llm_timeout_secs: u64,
model_id: String, model_id: String,
temperature: Option<f32>, temperature: Option<f32>,
max_tokens: Option<u32>, max_tokens: Option<u32>,
model_extra: HashMap<String, serde_json::Value>, model_extra: HashMap<String, serde_json::Value>,
) -> Self { ) -> Self {
let client = Client::builder()
.timeout(Duration::from_secs(llm_timeout_secs))
.build()
.unwrap_or_else(|_| Client::new());
Self { Self {
client: Client::new(), client,
name, name,
api_key, api_key,
base_url, base_url,
extra_headers, extra_headers,
llm_timeout_secs,
model_id, model_id,
temperature, temperature,
max_tokens, max_tokens,
@ -190,8 +199,21 @@ impl LLMProvider for AnthropicProvider {
} }
let resp = req_builder.json(&body).send().await?; let resp = req_builder.json(&body).send().await?;
let status = resp.status();
let text = resp.text().await?;
let anthropic_resp: AnthropicResponse = resp.json().await?; if !status.is_success() {
return Err(format!("API error {}: {}", status, text).into());
}
#[cfg(debug_assertions)]
{
let resp_preview: String = text.chars().take(100).collect();
tracing::debug!(status = %status, response_preview = %resp_preview, response_len = %text.len(), timeout_secs = self.llm_timeout_secs, "Anthropic response (first 100 chars shown)");
}
let anthropic_resp: AnthropicResponse = serde_json::from_str(&text)
.map_err(|e| format!("decode error: {} | body: {}", e, &text))?;
let mut content = String::new(); let mut content = String::new();
let mut tool_calls = Vec::new(); let mut tool_calls = Vec::new();

View File

@ -15,6 +15,7 @@ pub fn create_provider(config: LLMProviderConfig) -> Result<Box<dyn LLMProvider>
config.api_key, config.api_key,
config.base_url, config.base_url,
config.extra_headers, config.extra_headers,
config.llm_timeout_secs,
config.model_id, config.model_id,
config.temperature, config.temperature,
config.max_tokens, config.max_tokens,
@ -25,6 +26,7 @@ pub fn create_provider(config: LLMProviderConfig) -> Result<Box<dyn LLMProvider>
config.api_key, config.api_key,
config.base_url, config.base_url,
config.extra_headers, config.extra_headers,
config.llm_timeout_secs,
config.model_id, config.model_id,
config.temperature, config.temperature,
config.max_tokens, config.max_tokens,

View File

@ -3,6 +3,7 @@ use reqwest::Client;
use serde::Deserialize; use serde::Deserialize;
use serde_json::{json, Value}; use serde_json::{json, Value};
use std::collections::HashMap; use std::collections::HashMap;
use std::time::Duration;
use crate::bus::message::ContentBlock; use crate::bus::message::ContentBlock;
use super::{ChatCompletionRequest, ChatCompletionResponse, LLMProvider, ToolCall}; use super::{ChatCompletionRequest, ChatCompletionResponse, LLMProvider, ToolCall};
@ -28,6 +29,7 @@ pub struct OpenAIProvider {
api_key: String, api_key: String,
base_url: String, base_url: String,
extra_headers: HashMap<String, String>, extra_headers: HashMap<String, String>,
llm_timeout_secs: u64,
model_id: String, model_id: String,
temperature: Option<f32>, temperature: Option<f32>,
max_tokens: Option<u32>, max_tokens: Option<u32>,
@ -40,17 +42,24 @@ impl OpenAIProvider {
api_key: String, api_key: String,
base_url: String, base_url: String,
extra_headers: HashMap<String, String>, extra_headers: HashMap<String, String>,
llm_timeout_secs: u64,
model_id: String, model_id: String,
temperature: Option<f32>, temperature: Option<f32>,
max_tokens: Option<u32>, max_tokens: Option<u32>,
model_extra: HashMap<String, serde_json::Value>, model_extra: HashMap<String, serde_json::Value>,
) -> Self { ) -> Self {
let client = Client::builder()
.timeout(Duration::from_secs(llm_timeout_secs))
.build()
.unwrap_or_else(|_| Client::new());
Self { Self {
client: Client::new(), client,
name, name,
api_key, api_key,
base_url, base_url,
extra_headers, extra_headers,
llm_timeout_secs,
model_id, model_id,
temperature, temperature,
max_tokens, max_tokens,
@ -209,7 +218,7 @@ impl LLMProvider for OpenAIProvider {
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
{ {
let resp_preview: String = text.chars().take(100).collect(); let resp_preview: String = text.chars().take(100).collect();
tracing::debug!(status = %status, response_preview = %resp_preview, response_len = %text.len(), "LLM response (first 100 chars shown)"); tracing::debug!(status = %status, response_preview = %resp_preview, response_len = %text.len(), timeout_secs = self.llm_timeout_secs, "LLM response (first 100 chars shown)");
} }
if !status.is_success() { if !status.is_success() {
@ -275,6 +284,7 @@ mod tests {
"key".to_string(), "key".to_string(),
"https://example.com/v1".to_string(), "https://example.com/v1".to_string(),
HashMap::new(), HashMap::new(),
120,
"gpt-test".to_string(), "gpt-test".to_string(),
None, None,
None, None,

View File

@ -11,6 +11,7 @@ use crate::config::{
SchedulerSchedule, SchedulerSchedule,
}; };
use crate::gateway::session::SessionManager; use crate::gateway::session::SessionManager;
use crate::gateway::session::ScheduledAgentTaskOptions;
use crate::storage::{ use crate::storage::{
SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, SchedulerJobUpsert, SessionStore, SchedulerJobRecord, SchedulerJobState, SchedulerJobStatus, SchedulerJobUpsert, SessionStore,
}; };
@ -71,8 +72,8 @@ impl Scheduler {
fn sync_config_jobs(&self) -> anyhow::Result<()> { fn sync_config_jobs(&self) -> anyhow::Result<()> {
let now = Utc::now(); let now = Utc::now();
for job in &self.config.jobs { for job in self.config.effective_jobs() {
let runtime = RuntimeJob::from_config(job, now, self.config.misfire_policy)?; let runtime = RuntimeJob::from_config(&job, now, self.config.misfire_policy)?;
self.store.upsert_scheduler_job(&runtime.to_upsert())?; self.store.upsert_scheduler_job(&runtime.to_upsert())?;
} }
Ok(()) Ok(())
@ -87,6 +88,20 @@ impl Scheduler {
continue; continue;
}; };
if record.next_fire_at.is_none() && job.next_fire_at.is_some() {
self.store.update_scheduler_job_runtime(
&job.id,
job.state.clone(),
job.last_status.clone(),
job.last_error.as_deref(),
job.run_count,
job.last_fired_at,
job.next_fire_at,
job.paused_at,
job.completed_at,
)?;
}
if !job.is_due(now) { if !job.is_due(now) {
continue; continue;
} }
@ -141,6 +156,12 @@ impl Scheduler {
SchedulerJobKind::InternalEvent => { SchedulerJobKind::InternalEvent => {
execute_internal_event(&self.session_manager, job).await?; execute_internal_event(&self.session_manager, job).await?;
} }
SchedulerJobKind::AgentTask => {
let outbound_messages = execute_agent_task(&self.session_manager, job).await?;
for message in outbound_messages {
self.bus.publish_outbound(message).await?;
}
}
} }
Ok(()) Ok(())
@ -214,6 +235,7 @@ impl RuntimeJob {
let kind = match record.kind.as_str() { let kind = match record.kind.as_str() {
"internal_event" => SchedulerJobKind::InternalEvent, "internal_event" => SchedulerJobKind::InternalEvent,
"outbound_message" => SchedulerJobKind::OutboundMessage, "outbound_message" => SchedulerJobKind::OutboundMessage,
"agent_task" => SchedulerJobKind::AgentTask,
other => { other => {
tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind"); tracing::warn!(job_id = %record.id, kind = %other, "Skipping unsupported scheduler job kind");
return Ok(None); return Ok(None);
@ -296,6 +318,7 @@ impl RuntimeJob {
kind: match self.kind { kind: match self.kind {
SchedulerJobKind::InternalEvent => "internal_event".to_string(), SchedulerJobKind::InternalEvent => "internal_event".to_string(),
SchedulerJobKind::OutboundMessage => "outbound_message".to_string(), SchedulerJobKind::OutboundMessage => "outbound_message".to_string(),
SchedulerJobKind::AgentTask => "agent_task".to_string(),
}, },
schedule: serde_json::to_value(&self.schedule).unwrap_or_else(|_| serde_json::json!({})), schedule: serde_json::to_value(&self.schedule).unwrap_or_else(|_| serde_json::json!({})),
interval_secs: self.interval_secs, interval_secs: self.interval_secs,
@ -461,10 +484,211 @@ async fn execute_internal_event(session_manager: &SessionManager, job: &RuntimeJ
tracing::info!(job_id = %job.id, removed, "Scheduler session cleanup completed"); tracing::info!(job_id = %job.id, removed, "Scheduler session cleanup completed");
Ok(()) Ok(())
} }
"memory_maintenance" => {
let results = session_manager.run_memory_maintenance_for_all_scopes().await?;
for result in &results {
tracing::info!(
job_id = %job.id,
scope_key = %result.scope_key,
user_facts = result.output.user_facts.len(),
preferences = result.output.preferences.len(),
behavior_patterns = result.output.behavior_patterns.len(),
merges = result.output.merges.len(),
conflicts = result.output.conflicts.len(),
low_value = result.output.low_value_ids.len(),
"Scheduler completed memory maintenance model run"
);
}
tracing::info!(job_id = %job.id, scope_count = results.len(), "Scheduler memory maintenance triggered");
Ok(())
}
other => anyhow::bail!("unsupported internal scheduler event: {}", other), other => anyhow::bail!("unsupported internal scheduler event: {}", other),
} }
} }
async fn execute_agent_task(
session_manager: &SessionManager,
job: &RuntimeJob,
) -> anyhow::Result<Vec<OutboundMessage>> {
let channel_name = job
.target
.channel
.as_deref()
.ok_or_else(|| anyhow::anyhow!("agent_task requires target.channel"))?;
let chat_id = job
.target
.chat_id
.as_deref()
.ok_or_else(|| anyhow::anyhow!("agent_task requires target.chat_id"))?;
let prompt = job
.payload
.get("prompt")
.and_then(|value| value.as_str())
.ok_or_else(|| anyhow::anyhow!("agent_task payload.prompt must be a string"))?;
let options = parse_scheduled_agent_task_options(job)?;
session_manager
.run_scheduled_agent_task(channel_name, chat_id, prompt, options)
.await
.map_err(|error| anyhow::anyhow!(error.to_string()))
}
fn parse_scheduled_agent_task_options(job: &RuntimeJob) -> anyhow::Result<ScheduledAgentTaskOptions> {
let sender_id = job
.payload
.get("sender_id")
.and_then(|value| value.as_str())
.map(ToString::to_string);
let fresh_session = job
.payload
.get("fresh_session")
.and_then(|value| value.as_bool())
.unwrap_or(false);
let system_prompt = job
.payload
.get("system_prompt")
.and_then(|value| value.as_str())
.map(ToString::to_string);
let agent = job
.payload
.get("agent")
.and_then(|value| value.as_str())
.map(ToString::to_string);
let metadata = parse_metadata_map(job.payload.get("metadata"))?;
Ok(ScheduledAgentTaskOptions {
sender_id,
fresh_session,
system_prompt,
metadata,
agent,
})
}
fn parse_metadata_map(value: Option<&serde_json::Value>) -> anyhow::Result<HashMap<String, String>> {
let Some(value) = value else {
return Ok(HashMap::new());
};
let object = value
.as_object()
.ok_or_else(|| anyhow::anyhow!("agent_task payload.metadata must be an object"))?;
let mut metadata = HashMap::with_capacity(object.len());
for (key, value) in object {
let stringified = match value {
serde_json::Value::String(inner) => inner.clone(),
serde_json::Value::Null => "null".to_string(),
serde_json::Value::Bool(inner) => inner.to_string(),
serde_json::Value::Number(inner) => inner.to_string(),
_ => {
return Err(anyhow::anyhow!(
"agent_task payload.metadata field '{}' must be a string, number, bool, or null",
key
))
}
};
metadata.insert(key.clone(), stringified);
}
Ok(metadata)
}
#[cfg(test)]
mod agent_task_tests {
use super::*;
#[test]
fn runtime_job_from_record_supports_agent_task_kind() {
let record = SchedulerJobRecord {
id: "agent.daily_summary".to_string(),
kind: "agent_task".to_string(),
schedule: serde_json::json!({
"type": "interval",
"seconds": 300
}),
interval_secs: 0,
startup_delay_secs: 0,
target: serde_json::json!({
"channel": "feishu",
"chat_id": "oc_demo"
}),
payload: serde_json::json!({
"prompt": "请总结今天待办"
}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: None,
last_fired_at: None,
next_fire_at: Some(1_700_000_010_000),
paused_at: None,
completed_at: None,
created_at: 1_700_000_000_000,
updated_at: 1_700_000_000_000,
};
let job = RuntimeJob::from_record(&record, SchedulerMisfirePolicy::Skip)
.unwrap()
.unwrap();
assert_eq!(job.kind, SchedulerJobKind::AgentTask);
assert_eq!(job.payload.get("prompt").and_then(|value| value.as_str()), Some("请总结今天待办"));
}
#[test]
fn parse_scheduled_agent_task_options_supports_fresh_session_and_metadata() {
let job = RuntimeJob {
id: "agent.daily_summary".to_string(),
kind: SchedulerJobKind::AgentTask,
schedule: SchedulerSchedule::Interval {
seconds: 300,
startup_delay_secs: 0,
},
target: SchedulerJobTarget {
channel: Some("feishu".to_string()),
chat_id: Some("oc_demo".to_string()),
reply_to: None,
},
payload: serde_json::json!({
"prompt": "请总结今天待办",
"agent": "planner",
"sender_id": "scheduler-bot",
"fresh_session": true,
"system_prompt": "你是日报助手",
"metadata": {
"job_type": "daily_summary",
"priority": 1,
"urgent": false
}
}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: None,
last_fired_at: None,
next_fire_at: None,
paused_at: None,
completed_at: None,
interval_secs: 300,
startup_delay_secs: 0,
};
let options = parse_scheduled_agent_task_options(&job).unwrap();
assert_eq!(options.agent.as_deref(), Some("planner"));
assert_eq!(options.sender_id.as_deref(), Some("scheduler-bot"));
assert!(options.fresh_session);
assert_eq!(options.system_prompt.as_deref(), Some("你是日报助手"));
assert_eq!(options.metadata.get("job_type").map(String::as_str), Some("daily_summary"));
assert_eq!(options.metadata.get("priority").map(String::as_str), Some("1"));
assert_eq!(options.metadata.get("urgent").map(String::as_str), Some("false"));
}
}
impl TryFrom<serde_json::Value> for SchedulerJobTarget { impl TryFrom<serde_json::Value> for SchedulerJobTarget {
type Error = anyhow::Error; type Error = anyhow::Error;
@ -476,6 +700,12 @@ impl TryFrom<serde_json::Value> for SchedulerJobTarget {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use std::collections::HashMap;
use crate::bus::MessageBus;
use crate::config::LLMProviderConfig;
use crate::gateway::session::SessionManager;
use crate::skills::SkillRuntime;
use crate::storage::{SchedulerJobUpsert, SessionStore};
#[test] #[test]
fn runtime_job_skip_policy_advances_from_now() { fn runtime_job_skip_policy_advances_from_now() {
@ -555,4 +785,81 @@ mod tests {
}); });
assert_eq!(job.next_fire_at, Some(1_700_000_010_000)); assert_eq!(job.next_fire_at, Some(1_700_000_010_000));
} }
#[tokio::test]
async fn process_tick_persists_initial_next_fire_at_for_db_created_jobs() {
let store = Arc::new(SessionStore::in_memory().unwrap());
store
.upsert_scheduler_job(&SchedulerJobUpsert {
id: "massage_reminder".to_string(),
kind: "outbound_message".to_string(),
schedule: serde_json::json!({
"type": "interval",
"seconds": 60
}),
interval_secs: 60,
startup_delay_secs: 0,
target: serde_json::json!({
"channel": "feishu",
"chat_id": "oc_demo"
}),
payload: serde_json::json!({
"content": "ping"
}),
enabled: true,
state: SchedulerJobState::Scheduled,
last_status: None,
last_error: None,
run_count: 0,
max_runs: Some(1),
last_fired_at: None,
next_fire_at: None,
paused_at: None,
completed_at: None,
})
.unwrap();
let provider_config = LLMProviderConfig {
provider_type: "openai".to_string(),
name: "default".to_string(),
base_url: "http://localhost".to_string(),
api_key: "test-key".to_string(),
extra_headers: HashMap::new(),
llm_timeout_secs: 30,
model_id: "test-model".to_string(),
temperature: Some(0.0),
max_tokens: None,
model_extra: HashMap::new(),
token_limit: 4096,
max_tool_iterations: 4,
};
let session_manager = SessionManager::new(
4,
100,
false,
provider_config.clone(),
HashMap::from([("default".to_string(), provider_config)]),
Arc::new(SkillRuntime::default()),
)
.unwrap();
let scheduler = Scheduler::new(
MessageBus::new(8),
SchedulerConfig {
enabled: true,
tick_resolution_ms: 1000,
worker_queue_capacity: 64,
misfire_policy: SchedulerMisfirePolicy::Skip,
jobs: Vec::new(),
},
store.clone(),
session_manager,
);
scheduler.process_tick().await.unwrap();
let saved = store.get_scheduler_job("massage_reminder").unwrap().unwrap();
assert!(saved.next_fire_at.is_some());
assert_eq!(saved.run_count, 0);
assert_eq!(saved.state, SchedulerJobState::Scheduled);
}
} }

View File

@ -813,6 +813,50 @@ impl SessionStore {
Ok(memories) Ok(memories)
} }
pub fn list_memory_scope_keys(&self, scope_kind: &str) -> Result<Vec<String>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let mut stmt = conn.prepare(
"
SELECT DISTINCT scope_key
FROM memories
WHERE scope_kind = ?1
ORDER BY scope_key ASC
",
)?;
let rows = stmt.query_map(params![scope_kind], |row| row.get::<_, String>(0))?;
let mut scope_keys = Vec::new();
for row in rows {
scope_keys.push(row?);
}
Ok(scope_keys)
}
pub fn list_memories_for_scope(
&self,
scope_kind: &str,
scope_key: &str,
) -> Result<Vec<MemoryRecord>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let mut stmt = conn.prepare(
"
SELECT id, scope_kind, scope_key, namespace, memory_key, content,
source_type, source_session_id, source_message_id, source_message_seq,
source_channel_name, source_chat_id, created_at, updated_at
FROM memories
WHERE scope_kind = ?1 AND scope_key = ?2
ORDER BY updated_at DESC, namespace ASC, memory_key ASC
",
)?;
let rows = stmt.query_map(params![scope_kind, scope_key], map_memory_record)?;
let mut memories = Vec::new();
for row in rows {
memories.push(row?);
}
Ok(memories)
}
pub fn update_memory( pub fn update_memory(
&self, &self,
input: &MemoryUpsert, input: &MemoryUpsert,
@ -1911,6 +1955,66 @@ mod tests {
assert!(hits.iter().any(|memory| memory.memory_key == "quality")); assert!(hits.iter().any(|memory| memory.memory_key == "quality"));
} }
#[test]
fn test_memory_scope_listing_and_full_scope_read() {
let store = SessionStore::in_memory().unwrap();
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-2".to_string(),
namespace: "preferences".to_string(),
memory_key: "style".to_string(),
content: "偏好简洁表达".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-2".to_string()),
source_message_id: Some("msg-2".to_string()),
source_message_seq: Some(2),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-2".to_string()),
})
.unwrap();
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "profile".to_string(),
memory_key: "work".to_string(),
content: "用户在做AI产品".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-1".to_string()),
source_message_id: Some("msg-1".to_string()),
source_message_seq: Some(1),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-1".to_string()),
})
.unwrap();
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "patterns".to_string(),
memory_key: "workflow".to_string(),
content: "习惯先问方案再要代码".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-1".to_string()),
source_message_id: Some("msg-3".to_string()),
source_message_seq: Some(3),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-1".to_string()),
})
.unwrap();
let scope_keys = store.list_memory_scope_keys("user").unwrap();
assert_eq!(scope_keys, vec!["feishu:user-1".to_string(), "feishu:user-2".to_string()]);
let full_scope = store.list_memories_for_scope("user", "feishu:user-1").unwrap();
assert_eq!(full_scope.len(), 2);
assert!(full_scope.iter().all(|memory| memory.scope_key == "feishu:user-1"));
assert!(full_scope.iter().any(|memory| memory.memory_key == "work"));
assert!(full_scope.iter().any(|memory| memory.memory_key == "workflow"));
}
#[test] #[test]
fn test_scheduler_job_roundtrip_and_runtime_update() { fn test_scheduler_job_roundtrip_and_runtime_update() {
let store = SessionStore::in_memory().unwrap(); let store = SessionStore::in_memory().unwrap();

View File

@ -23,7 +23,7 @@ impl Tool for MemorySearchTool {
} }
fn description(&self) -> &str { fn description(&self) -> &str {
"Search and read long-term user memories stored in SQLite. This is the default entry point for memory retrieval and should usually be the first memory tool you call at the start of a request, unless the request is clearly a simple greeting, a one-off calculation, or a direct fact question that does not depend on user history. Use it to recall prior preferences, stable facts, historical decisions, and ongoing task context. This tool is read-only and supports three actions: search for multi-keyword recall, get for exact namespace/key lookup, and list for browsing recent memories. Prefer this tool over memory_manage whenever you only need to retrieve memory." "Search and read long-term user memories stored in SQLite. This is the default entry point for memory retrieval and should usually be the first memory tool you call at the start of a request, unless the request is clearly a simple greeting, a one-off calculation, or a direct fact question that does not depend on user history. Use it to recall prior preferences, stable facts, historical decisions, and ongoing task context. If the request also needs other independent read-only tools, you may call memory_search in the same round alongside them. This tool is read-only and supports three actions: search for multi-keyword recall, get for exact namespace/key lookup, and list for browsing recent memories. Prefer this tool over memory_manage whenever you only need to retrieve memory."
} }
fn parameters_schema(&self) -> serde_json::Value { fn parameters_schema(&self) -> serde_json::Value {

View File

@ -1,3 +1,4 @@
use std::collections::HashSet;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
@ -11,11 +12,15 @@ use crate::tools::traits::{Tool, ToolResult};
pub struct SchedulerManageTool { pub struct SchedulerManageTool {
store: Arc<SessionStore>, store: Arc<SessionStore>,
known_agents: Arc<HashSet<String>>,
} }
impl SchedulerManageTool { impl SchedulerManageTool {
pub fn new(store: Arc<SessionStore>) -> Self { pub fn new(store: Arc<SessionStore>, known_agents: HashSet<String>) -> Self {
Self { store } Self {
store,
known_agents: Arc::new(known_agents),
}
} }
} }
@ -47,7 +52,7 @@ impl Tool for SchedulerManageTool {
}, },
"kind": { "kind": {
"type": "string", "type": "string",
"enum": ["internal_event", "outbound_message"] "enum": ["internal_event", "outbound_message", "agent_task"]
}, },
"schedule": { "schedule": {
"type": "object", "type": "object",
@ -57,7 +62,8 @@ impl Tool for SchedulerManageTool {
"type": "object" "type": "object"
}, },
"payload": { "payload": {
"type": "object" "type": "object",
"description": "Job payload. agent_task supports prompt, agent, fresh_session, system_prompt, sender_id, metadata. outbound_message expects content. internal_event expects event."
}, },
"max_runs": { "max_runs": {
"type": ["integer", "null"] "type": ["integer", "null"]
@ -90,7 +96,7 @@ impl Tool for SchedulerManageTool {
} }
} }
"put" => { "put" => {
let input = build_upsert(&args)?; let input = build_upsert(&args, &self.known_agents)?;
let record = self.store.upsert_scheduler_job(&input)?; let record = self.store.upsert_scheduler_job(&input)?;
record_to_json(&record) record_to_json(&record)
} }
@ -139,7 +145,7 @@ impl Tool for SchedulerManageTool {
} }
} }
fn build_upsert(args: &serde_json::Value) -> anyhow::Result<SchedulerJobUpsert> { fn build_upsert(args: &serde_json::Value, known_agents: &HashSet<String>) -> anyhow::Result<SchedulerJobUpsert> {
let id = require_str(args, "id")?.to_string(); let id = require_str(args, "id")?.to_string();
let kind = require_str(args, "kind")?.to_string(); let kind = require_str(args, "kind")?.to_string();
let schedule_value = args let schedule_value = args
@ -157,14 +163,24 @@ fn build_upsert(args: &serde_json::Value) -> anyhow::Result<SchedulerJobUpsert>
_ => (0, 0), _ => (0, 0),
}; };
let payload = args.get("payload").cloned().unwrap_or_else(|| json!({}));
let target = args.get("target").cloned().unwrap_or_else(|| json!({}));
if kind == "agent_task" {
validate_agent_task_payload(&payload, known_agents)?;
validate_target_fields(&target, &["channel", "chat_id"], "agent_task")?;
} else if kind == "outbound_message" {
validate_outbound_message_payload(&payload)?;
validate_target_fields(&target, &["channel", "chat_id"], "outbound_message")?;
}
Ok(SchedulerJobUpsert { Ok(SchedulerJobUpsert {
id, id,
kind, kind,
schedule: serde_json::to_value(schedule)?, schedule: serde_json::to_value(schedule)?,
interval_secs, interval_secs,
startup_delay_secs, startup_delay_secs,
target: args.get("target").cloned().unwrap_or_else(|| json!({})), target,
payload: args.get("payload").cloned().unwrap_or_else(|| json!({})), payload,
enabled: args.get("enabled").and_then(|value| value.as_bool()).unwrap_or(true), enabled: args.get("enabled").and_then(|value| value.as_bool()).unwrap_or(true),
state: if args.get("enabled").and_then(|value| value.as_bool()).unwrap_or(true) { state: if args.get("enabled").and_then(|value| value.as_bool()).unwrap_or(true) {
SchedulerJobState::Scheduled SchedulerJobState::Scheduled
@ -182,6 +198,57 @@ fn build_upsert(args: &serde_json::Value) -> anyhow::Result<SchedulerJobUpsert>
}) })
} }
fn validate_agent_task_payload(payload: &serde_json::Value, known_agents: &HashSet<String>) -> anyhow::Result<()> {
let Some(prompt) = payload.get("prompt").and_then(|value| value.as_str()) else {
anyhow::bail!("agent_task payload.prompt is required and must be a string")
};
if prompt.trim().is_empty() {
anyhow::bail!("agent_task payload.prompt cannot be empty")
}
let Some(agent_name) = payload.get("agent").and_then(|value| value.as_str()) else {
return Ok(());
};
let normalized = agent_name.trim();
if normalized.is_empty() || normalized == "default" || known_agents.contains(normalized) {
return Ok(());
}
anyhow::bail!("Unknown agent '{}' for agent_task payload.agent", normalized)
}
fn validate_outbound_message_payload(payload: &serde_json::Value) -> anyhow::Result<()> {
let Some(content) = payload.get("content").and_then(|value| value.as_str()) else {
anyhow::bail!("outbound_message payload.content is required and must be a string")
};
if content.trim().is_empty() {
anyhow::bail!("outbound_message payload.content cannot be empty")
}
Ok(())
}
fn validate_target_fields(
target: &serde_json::Value,
required_fields: &[&str],
kind: &str,
) -> anyhow::Result<()> {
let object = target
.as_object()
.ok_or_else(|| anyhow::anyhow!("{} target must be an object", kind))?;
for field in required_fields {
let Some(value) = object.get(*field).and_then(|value| value.as_str()) else {
anyhow::bail!("{} target.{} is required and must be a string", kind, field)
};
if value.trim().is_empty() {
anyhow::bail!("{} target.{} cannot be empty", kind, field)
}
}
Ok(())
}
fn record_to_json(record: &SchedulerJobRecord) -> serde_json::Value { fn record_to_json(record: &SchedulerJobRecord) -> serde_json::Value {
json!({ json!({
"id": record.id, "id": record.id,
@ -254,7 +321,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_scheduler_manage_put_and_get() { async fn test_scheduler_manage_put_and_get() {
let store = Arc::new(SessionStore::in_memory().unwrap()); let store = Arc::new(SessionStore::in_memory().unwrap());
let tool = SchedulerManageTool::new(store); let tool = SchedulerManageTool::new(store, HashSet::new());
let put_result = tool let put_result = tool
.execute(json!({ .execute(json!({
@ -288,4 +355,89 @@ mod tests {
assert!(get_result.output.contains("heartbeat")); assert!(get_result.output.contains("heartbeat"));
assert!(get_result.output.contains("outbound_message")); assert!(get_result.output.contains("outbound_message"));
} }
#[tokio::test]
async fn test_scheduler_manage_put_agent_task() {
let store = Arc::new(SessionStore::in_memory().unwrap());
let tool = SchedulerManageTool::new(store, HashSet::from(["planner".to_string()]));
let put_result = tool
.execute(json!({
"action": "put",
"id": "agent.daily_summary",
"kind": "agent_task",
"schedule": {
"type": "cron",
"expression": "0 9 * * *"
},
"target": {
"channel": "feishu",
"chat_id": "oc_demo"
},
"payload": {
"prompt": "请总结今天待办",
"agent": "planner"
}
}))
.await
.unwrap();
assert!(put_result.success);
assert!(put_result.output.contains("agent_task"));
}
#[tokio::test]
async fn test_scheduler_manage_rejects_outbound_message_without_target() {
let store = Arc::new(SessionStore::in_memory().unwrap());
let tool = SchedulerManageTool::new(store, HashSet::new());
let put_result = tool
.execute(json!({
"action": "put",
"id": "massage_reminder",
"kind": "outbound_message",
"schedule": {
"type": "interval",
"seconds": 60
},
"payload": {
"content": "⏰ 时间到了!该去按摩了!💆"
}
}))
.await;
assert!(put_result.is_err());
let error = put_result.err().unwrap().to_string();
assert!(error.contains("outbound_message target.channel is required"));
}
#[tokio::test]
async fn test_scheduler_manage_rejects_unknown_agent_task_agent() {
let store = Arc::new(SessionStore::in_memory().unwrap());
let tool = SchedulerManageTool::new(store, HashSet::from(["planner".to_string()]));
let put_result = tool
.execute(json!({
"action": "put",
"id": "agent.daily_summary",
"kind": "agent_task",
"schedule": {
"type": "cron",
"expression": "0 9 * * *"
},
"target": {
"channel": "feishu",
"chat_id": "oc_demo"
},
"payload": {
"prompt": "请总结今天待办",
"agent": "missing-agent"
}
}))
.await;
assert!(put_result.is_err());
let error = put_result.err().unwrap().to_string();
assert!(error.contains("Unknown agent 'missing-agent'"));
}
} }

View File

@ -19,6 +19,7 @@ fn load_config() -> Option<LLMProviderConfig> {
base_url: openai_base_url, base_url: openai_base_url,
api_key: openai_api_key, api_key: openai_api_key,
extra_headers: HashMap::new(), extra_headers: HashMap::new(),
llm_timeout_secs: 120,
model_id: openai_model, model_id: openai_model,
temperature: Some(0.0), temperature: Some(0.0),
max_tokens: Some(100), max_tokens: Some(100),

View File

@ -19,6 +19,7 @@ fn load_openai_config() -> Option<LLMProviderConfig> {
base_url: openai_base_url, base_url: openai_base_url,
api_key: openai_api_key, api_key: openai_api_key,
extra_headers: HashMap::new(), extra_headers: HashMap::new(),
llm_timeout_secs: 120,
model_id: openai_model, model_id: openai_model,
temperature: Some(0.0), temperature: Some(0.0),
max_tokens: Some(100), max_tokens: Some(100),