feat(memory): 添加内置记忆维护作业,增强调度功能并支持有效作业合并

This commit is contained in:
ooodc 2026-04-23 14:16:22 +08:00
parent f3f369b329
commit 3d241544c5
4 changed files with 896 additions and 5 deletions

View File

@ -172,6 +172,8 @@ pub struct SchedulerConfig {
pub jobs: Vec<SchedulerJobConfig>, pub jobs: Vec<SchedulerJobConfig>,
} }
/// Job id of the built-in memory-maintenance scheduler job.
///
/// A configured job that uses this same id replaces the built-in definition
/// when `SchedulerConfig::effective_jobs` merges the two lists.
pub const BUILTIN_MEMORY_MAINTENANCE_JOB_ID: &str = "builtin.memory_maintenance_daily";
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)] #[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")] #[serde(rename_all = "snake_case")]
pub enum SchedulerMisfirePolicy { pub enum SchedulerMisfirePolicy {
@ -256,6 +258,41 @@ impl SchedulerJobConfig {
} }
} }
impl SchedulerConfig {
    /// Returns the scheduler jobs that are always present, independent of configuration.
    ///
    /// At the moment this is one enabled `InternalEvent` job that carries the
    /// `memory_maintenance` event and fires on the cron expression `0 19 * * *`.
    ///
    /// NOTE(review): the payload advertises 03:00 in `Asia/Shanghai`, which is
    /// 19:00 UTC — presumably the cron expression is evaluated in UTC; confirm
    /// against the scheduler's cron handling.
    pub fn builtin_jobs() -> Vec<SchedulerJobConfig> {
        let payload = serde_json::json!({
            "event": "memory_maintenance",
            "time_zone": "Asia/Shanghai",
            "local_time": "03:00"
        });
        let schedule = SchedulerSchedule::Cron {
            expression: String::from("0 19 * * *"),
        };
        let memory_maintenance = SchedulerJobConfig {
            id: String::from(BUILTIN_MEMORY_MAINTENANCE_JOB_ID),
            enabled: true,
            kind: SchedulerJobKind::InternalEvent,
            schedule: Some(schedule),
            startup_delay_secs: 0,
            interval_secs: 0,
            target: SchedulerJobTarget::default(),
            payload,
        };
        vec![memory_maintenance]
    }

    /// Returns the built-in jobs merged with the configured jobs.
    ///
    /// Built-in jobs come first. A configured job whose id matches a job
    /// already in the list replaces it in place (keeping its position); any
    /// other configured job is appended in configuration order.
    pub fn effective_jobs(&self) -> Vec<SchedulerJobConfig> {
        let mut merged = Self::builtin_jobs();
        for job in &self.jobs {
            match merged.iter().position(|known| known.id == job.id) {
                Some(slot) => merged[slot] = job.clone(),
                None => merged.push(job.clone()),
            }
        }
        merged
    }
}
impl SchedulerSchedule { impl SchedulerSchedule {
pub fn validate(&self, job_id: &str) -> Result<(), ConfigError> { pub fn validate(&self, job_id: &str) -> Result<(), ConfigError> {
match self { match self {
@ -738,6 +775,60 @@ mod tests {
assert_eq!(config.scheduler.worker_queue_capacity, 64); assert_eq!(config.scheduler.worker_queue_capacity, 64);
assert_eq!(config.scheduler.misfire_policy, SchedulerMisfirePolicy::Skip); assert_eq!(config.scheduler.misfire_policy, SchedulerMisfirePolicy::Skip);
assert!(config.scheduler.jobs.is_empty()); assert!(config.scheduler.jobs.is_empty());
let effective_jobs = config.scheduler.effective_jobs();
assert_eq!(effective_jobs.len(), 1);
assert_eq!(effective_jobs[0].id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID);
assert_eq!(effective_jobs[0].kind, SchedulerJobKind::InternalEvent);
assert_eq!(
effective_jobs[0].resolved_schedule().unwrap(),
SchedulerSchedule::Cron {
expression: "0 19 * * *".to_string(),
}
);
}
#[test]
fn test_scheduler_effective_jobs_allows_builtin_override() {
let mut scheduler = SchedulerConfig::default();
scheduler.jobs.push(SchedulerJobConfig {
id: BUILTIN_MEMORY_MAINTENANCE_JOB_ID.to_string(),
enabled: false,
kind: SchedulerJobKind::InternalEvent,
schedule: Some(SchedulerSchedule::Cron {
expression: "15 2 * * *".to_string(),
}),
startup_delay_secs: 0,
interval_secs: 0,
target: SchedulerJobTarget::default(),
payload: serde_json::json!({
"event": "memory_maintenance",
"time_zone": "UTC",
"local_time": "02:15"
}),
});
scheduler.jobs.push(SchedulerJobConfig {
id: "custom.reminder".to_string(),
enabled: true,
kind: SchedulerJobKind::InternalEvent,
schedule: Some(SchedulerSchedule::Delay { seconds: 30 }),
startup_delay_secs: 0,
interval_secs: 0,
target: SchedulerJobTarget::default(),
payload: serde_json::json!({"event": "custom"}),
});
let effective_jobs = scheduler.effective_jobs();
assert_eq!(effective_jobs.len(), 2);
assert_eq!(effective_jobs[0].id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID);
assert!(!effective_jobs[0].enabled);
assert_eq!(
effective_jobs[0].resolved_schedule().unwrap(),
SchedulerSchedule::Cron {
expression: "15 2 * * *".to_string(),
}
);
assert_eq!(effective_jobs[1].id, "custom.reminder");
} }
#[test] #[test]

View File

@ -1,13 +1,16 @@
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::fs; use std::fs;
use std::path::Path;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::{Mutex, mpsc}; use tokio::sync::{Mutex, mpsc};
use uuid::Uuid; use uuid::Uuid;
use crate::bus::{ChatMessage, MessageBus, OutboundMessage}; use crate::bus::{ChatMessage, MessageBus, OutboundMessage};
use crate::config::LLMProviderConfig; use crate::config::LLMProviderConfig;
use crate::agent::{AgentLoop, AgentError, ContextCompressor, EmittedMessageHandler}; use crate::agent::{AgentLoop, AgentError, ContextCompressor, EmittedMessageHandler};
use crate::providers::{create_provider, ChatCompletionRequest, Message};
use crate::protocol::WsOutbound; use crate::protocol::WsOutbound;
use crate::skills::SkillRuntime; use crate::skills::SkillRuntime;
use crate::storage::{SessionRecord, SessionStore, persistent_session_id}; use crate::storage::{SessionRecord, SessionStore, persistent_session_id};
@ -18,6 +21,299 @@ use crate::tools::{
}; };
const DEFAULT_AGENT_PROMPT: &str = "# PicoBot 代理配置\n\n## 身份\n- 你是 PicoBot一名务实、可靠的通用助理。\n- 你的目标是理解用户当下的真实需求,并给出清晰、可执行的帮助。\n\n## 工作方式\n- 优先理解意图,再给出回应或行动。\n- 保持简洁、准确、自然,不故作热情,也不空泛铺陈。\n- 能直接验证的内容尽量先验证,避免凭空猜测。\n- 当现有工具是完成任务的最直接方式时,优先使用工具。\n- 除非用户明确要求改变方向,否则保持用户原本目标不变。\n\n## 助理原则\n- 优先解决问题,而不是展示过程。\n- 输出要方便用户立即使用,结论尽量明确。\n- 对不确定的地方要直说,不把猜测包装成事实。\n- 复杂任务先收敛重点,简单任务直接给结果。\n- 避免不必要的重复、客套和冗长说明。\n\n## 回复规则\n- 除非用户另有要求,否则使用中文回复。\n- 默认短而清楚,按信息密度组织内容。\n- 如果任务涉及文件、命令、配置或下一步操作,优先给出最关键的那部分。\n- 如果存在限制、风险或前提条件,要直接说明。\n\n## 补充要求\n- 你是 PicoBot。\n- 回答应以帮助用户完成当前目标为中心。\n- 在信息不足时先补关键前提,在信息充分时直接执行。\n"; const DEFAULT_AGENT_PROMPT: &str = "# PicoBot 代理配置\n\n## 身份\n- 你是 PicoBot一名务实、可靠的通用助理。\n- 你的目标是理解用户当下的真实需求,并给出清晰、可执行的帮助。\n\n## 工作方式\n- 优先理解意图,再给出回应或行动。\n- 保持简洁、准确、自然,不故作热情,也不空泛铺陈。\n- 能直接验证的内容尽量先验证,避免凭空猜测。\n- 当现有工具是完成任务的最直接方式时,优先使用工具。\n- 除非用户明确要求改变方向,否则保持用户原本目标不变。\n\n## 助理原则\n- 优先解决问题,而不是展示过程。\n- 输出要方便用户立即使用,结论尽量明确。\n- 对不确定的地方要直说,不把猜测包装成事实。\n- 复杂任务先收敛重点,简单任务直接给结果。\n- 避免不必要的重复、客套和冗长说明。\n\n## 回复规则\n- 除非用户另有要求,否则使用中文回复。\n- 默认短而清楚,按信息密度组织内容。\n- 如果任务涉及文件、命令、配置或下一步操作,优先给出最关键的那部分。\n- 如果存在限制、风险或前提条件,要直接说明。\n\n## 补充要求\n- 你是 PicoBot。\n- 回答应以帮助用户完成当前目标为中心。\n- 在信息不足时先补关键前提,在信息充分时直接执行。\n";
// Markers delimiting the machine-managed memory section inside the agent prompt
// file. `upsert_managed_agent_memory_block` locates an existing section by these
// two markers and replaces everything from the start marker through the end marker.
const MANAGED_AGENT_MEMORY_BLOCK_START: &str = "<!-- PICOBOT_MANAGED_MEMORY:START -->";
const MANAGED_AGENT_MEMORY_BLOCK_END: &str = "<!-- PICOBOT_MANAGED_MEMORY:END -->";
// Heading emitted as the first line inside the managed section.
const MANAGED_AGENT_MEMORY_TITLE: &str = "## 用户记忆摘要";
// System prompt sent to the model for memory maintenance. It asks for a bare JSON
// object whose fields mirror `MemoryMaintenanceModelOutput` (user_facts, preferences,
// behavior_patterns, merges, conflicts, low_value_ids, managed_markdown).
// This is a runtime string: changing it changes what the model is asked to produce.
const MEMORY_MAINTENANCE_SYSTEM_PROMPT: &str = "你是 PicoBot 的后台记忆整理器。你必须根据输入的候选记忆做语义整理,并严格返回 JSON不要输出 Markdown 代码块,不要输出额外解释。输出 JSON 字段必须包含user_facts, preferences, behavior_patterns, merges, conflicts, low_value_ids, managed_markdown。user_facts、preferences、behavior_patterns 是字符串数组。merges 是对象数组,每个对象必须包含 source_ids、namespace、memory_key、content。conflicts 是对象数组,每个对象必须包含 source_ids、note。low_value_ids 是需要删除的候选记忆 id 数组。只能引用输入里出现过的候选 id。managed_markdown 必须是 Markdown 文本,且只保留稳定模式,不写一次性事件。";
/// Bucket a memory is sorted into while building a maintenance plan.
///
/// The bucket is derived from the memory's namespace by
/// `memory_maintenance_category`; namespaces it does not recognise map to `Other`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MemoryMaintenanceCategory {
UserFacts,
Preferences,
BehaviorPatterns,
Other,
}
/// One stored memory offered to the maintenance model as a candidate.
///
/// Built by `build_memory_maintenance_plan` from a `MemoryRecord`: `id`,
/// `namespace` and `key` are copied from the record (`key` from `memory_key`),
/// while `content` is the record's content with surrounding whitespace trimmed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct MemoryMaintenanceCandidate {
id: String,
namespace: String,
key: String,
content: String,
}
/// Candidate memories of one scope, grouped by maintenance category.
///
/// The plan is serialized as the `candidates` value of the request sent to the
/// maintenance model, and is later used to resolve the candidate ids the model
/// refers to in its output.
#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct MemoryMaintenancePlan {
user_facts: Vec<MemoryMaintenanceCandidate>,
preferences: Vec<MemoryMaintenanceCandidate>,
behavior_patterns: Vec<MemoryMaintenanceCandidate>,
others: Vec<MemoryMaintenanceCandidate>,
}
/// A merge instruction returned by the maintenance model.
///
/// `source_ids` names the candidate memories being merged; `namespace`,
/// `memory_key` and `content` describe the record that is written in their place
/// by `apply_memory_maintenance_output`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct MemoryMaintenanceMerge {
pub(crate) source_ids: Vec<String>,
pub(crate) namespace: String,
pub(crate) memory_key: String,
pub(crate) content: String,
}
/// A conflict between candidate memories reported by the maintenance model.
///
/// `source_ids` lists the candidates involved and `note` is the model's free-text
/// remark. `apply_memory_maintenance_output` does not read conflicts, so they
/// cause no store changes there.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct MemoryMaintenanceConflict {
pub(crate) source_ids: Vec<String>,
pub(crate) note: String,
}
/// Structured result of one maintenance model call, decoded from the JSON text
/// of the model's reply.
///
/// No field has a serde default, so every field must be present in the reply for
/// decoding to succeed.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub(crate) struct MemoryMaintenanceModelOutput {
pub(crate) user_facts: Vec<String>,
pub(crate) preferences: Vec<String>,
pub(crate) behavior_patterns: Vec<String>,
// Merge instructions applied to the store by `apply_memory_maintenance_output`.
pub(crate) merges: Vec<MemoryMaintenanceMerge>,
pub(crate) conflicts: Vec<MemoryMaintenanceConflict>,
// Candidate ids whose memories are deleted by `apply_memory_maintenance_output`.
pub(crate) low_value_ids: Vec<String>,
// Markdown body that ends up in the managed section of the agent prompt file.
pub(crate) managed_markdown: String,
}
/// Maintenance outcome for a single scope: the scope key together with the model
/// output that was produced (and applied) for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct MemoryMaintenanceScopeResult {
pub(crate) scope_key: String,
pub(crate) output: MemoryMaintenanceModelOutput,
}
/// Turns stored memory records into a categorized plan of maintenance candidates.
///
/// Records whose content is blank after trimming are ignored. Records are
/// de-duplicated on (namespace, memory key, trimmed content), where namespace and
/// key are compared trimmed and ASCII-lowercased; the first record of a duplicate
/// group wins. Each surviving record becomes a candidate in the bucket chosen by
/// `memory_maintenance_category` for its namespace.
fn build_memory_maintenance_plan(memories: &[crate::storage::MemoryRecord]) -> MemoryMaintenancePlan {
    let mut plan = MemoryMaintenancePlan::default();
    let mut fingerprints: HashSet<String> = HashSet::new();

    for record in memories {
        let content = record.content.trim();
        if content.is_empty() {
            continue;
        }

        // U+001F (unit separator) keeps the three parts of the fingerprint apart.
        let namespace_part = record.namespace.trim().to_ascii_lowercase();
        let key_part = record.memory_key.trim().to_ascii_lowercase();
        let fingerprint = format!("{}\u{1f}{}\u{1f}{}", namespace_part, key_part, content);
        let first_occurrence = fingerprints.insert(fingerprint);
        if !first_occurrence {
            continue;
        }

        let bucket = match memory_maintenance_category(&record.namespace) {
            MemoryMaintenanceCategory::UserFacts => &mut plan.user_facts,
            MemoryMaintenanceCategory::Preferences => &mut plan.preferences,
            MemoryMaintenanceCategory::BehaviorPatterns => &mut plan.behavior_patterns,
            MemoryMaintenanceCategory::Other => &mut plan.others,
        };
        bucket.push(MemoryMaintenanceCandidate {
            id: record.id.clone(),
            namespace: record.namespace.clone(),
            key: record.memory_key.clone(),
            content: content.to_string(),
        });
    }

    plan
}
/// Maps a memory namespace to its maintenance category.
///
/// The namespace is trimmed and ASCII-lowercased before matching; anything that
/// is not one of the known namespaces falls into `Other`.
fn memory_maintenance_category(namespace: &str) -> MemoryMaintenanceCategory {
    const USER_FACT_NAMESPACES: [&str; 3] = ["profile", "facts", "identity"];
    const PREFERENCE_NAMESPACES: [&str; 3] = ["preferences", "style", "likes"];
    const BEHAVIOR_NAMESPACES: [&str; 4] = ["patterns", "behavior", "habits", "workflow"];

    let normalized = namespace.trim().to_ascii_lowercase();
    let name = normalized.as_str();

    if USER_FACT_NAMESPACES.contains(&name) {
        MemoryMaintenanceCategory::UserFacts
    } else if PREFERENCE_NAMESPACES.contains(&name) {
        MemoryMaintenanceCategory::Preferences
    } else if BEHAVIOR_NAMESPACES.contains(&name) {
        MemoryMaintenanceCategory::BehaviorPatterns
    } else {
        MemoryMaintenanceCategory::Other
    }
}
/// Renders the managed memory section: start marker, title, a blank line, the
/// trimmed markdown body, and the end marker. The result has no trailing newline.
fn render_managed_agent_memory_block(markdown_body: &str) -> String {
    let body = markdown_body.trim();
    [
        MANAGED_AGENT_MEMORY_BLOCK_START,
        "\n",
        MANAGED_AGENT_MEMORY_TITLE,
        "\n\n",
        body,
        "\n",
        MANAGED_AGENT_MEMORY_BLOCK_END,
    ]
    .concat()
}
/// Inserts or replaces the managed memory section inside an agent prompt document.
///
/// Placement rules, in order:
/// 1. If `existing` contains a start marker followed by an end marker, that span
///    is replaced by the freshly rendered section.
/// 2. Otherwise, if `existing` contains the heading `## 回复规则`, the section is
///    inserted directly before that heading.
/// 3. Otherwise the section is appended at the end.
///
/// The end marker is searched for *after* the start marker. A stray end marker
/// that appears earlier in the document is therefore ignored, instead of being
/// paired with a later start marker (which would duplicate the text between them
/// and keep the stale section).
///
/// The returned document always ends with exactly one trailing newline.
fn upsert_managed_agent_memory_block(existing: &str, markdown_body: &str) -> String {
    let managed_block = render_managed_agent_memory_block(markdown_body);

    // Byte range `(head_end, tail_start)` of the text the new section replaces.
    let managed_span = existing
        .find(MANAGED_AGENT_MEMORY_BLOCK_START)
        .and_then(|start| {
            let search_from = start + MANAGED_AGENT_MEMORY_BLOCK_START.len();
            existing[search_from..]
                .find(MANAGED_AGENT_MEMORY_BLOCK_END)
                .map(|offset| (start, search_from + offset + MANAGED_AGENT_MEMORY_BLOCK_END.len()))
        });
    // Inserting before the reply-rules heading is a replacement of an empty span.
    let splice_span = managed_span.or_else(|| existing.find("## 回复规则").map(|index| (index, index)));

    if let Some((head_end, tail_start)) = splice_span {
        let mut updated = String::with_capacity(existing.len() + managed_block.len() + 4);
        updated.push_str(existing[..head_end].trim_end());
        updated.push_str("\n\n");
        updated.push_str(&managed_block);
        updated.push_str("\n\n");
        updated.push_str(existing[tail_start..].trim_start());
        return updated.trim().to_string() + "\n";
    }

    let mut updated = existing.trim_end().to_string();
    if !updated.is_empty() {
        updated.push_str("\n\n");
    }
    updated.push_str(&managed_block);
    updated.push('\n');
    updated
}
fn write_agent_prompt(path: &Path, content: &str) -> Result<(), AgentError> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)
.map_err(|err| AgentError::Other(format!("create agent prompt dir error: {}", err)))?;
}
let temp_path = path.with_extension("md.tmp");
fs::write(&temp_path, content)
.map_err(|err| AgentError::Other(format!("write agent prompt temp file error: {}", err)))?;
fs::rename(&temp_path, path)
.map_err(|err| AgentError::Other(format!("replace agent prompt file error: {}", err)))?;
Ok(())
}
/// Removes a surrounding Markdown code fence from a model reply, if present.
///
/// Handles an opening ``` fence with an optional `json` info string (matched
/// case-insensitively) and an optional closing ``` fence. A missing closing
/// fence — e.g. when the reply was cut off — still has the opening fence
/// removed, so the caller never receives text that starts with back-ticks.
/// Input without an opening fence is returned trimmed.
fn strip_json_code_fence(content: &str) -> &str {
    let trimmed = content.trim();
    let Some(after_fence) = trimmed.strip_prefix("```") else {
        return trimmed;
    };

    // `get(..4)` yields `None` for short input or a non-char-boundary split, so
    // this cannot panic on multi-byte text directly after the fence.
    let body = match after_fence.get(..4) {
        Some(tag) if tag.eq_ignore_ascii_case("json") => &after_fence[4..],
        _ => after_fence,
    };

    body.strip_suffix("```").unwrap_or(body).trim()
}
/// Joins per-scope markdown summaries into one document, dropping redundancy.
///
/// Chunks are trimmed and blank chunks discarded. A chunk is dropped when its set
/// of non-blank (trimmed) lines is a strict subset of another chunk's line set,
/// or when an identical chunk was already kept. Surviving chunks keep their input
/// order and are separated by a blank line.
fn combine_managed_memory_markdown(chunks: &[String]) -> String {
    // Each usable chunk paired with the set of its non-blank, trimmed lines.
    let entries: Vec<(&str, HashSet<&str>)> = chunks
        .iter()
        .map(|raw| raw.trim())
        .filter(|text| !text.is_empty())
        .map(|text| {
            let lines = text
                .lines()
                .map(str::trim)
                .filter(|line| !line.is_empty())
                .collect();
            (text, lines)
        })
        .collect();

    let mut kept: Vec<&str> = Vec::new();
    for (position, (text, lines)) in entries.iter().enumerate() {
        let covered_by_richer_chunk = entries
            .iter()
            .enumerate()
            .any(|(other_position, (_, other_lines))| {
                other_position != position
                    && lines.len() < other_lines.len()
                    && lines.is_subset(other_lines)
            });
        if covered_by_richer_chunk || kept.contains(text) {
            continue;
        }
        kept.push(*text);
    }

    kept.join("\n\n")
}
fn apply_memory_maintenance_output(
store: &SessionStore,
scope_key: &str,
plan: &MemoryMaintenancePlan,
output: &MemoryMaintenanceModelOutput,
) -> Result<(), AgentError> {
let all_candidates = plan
.user_facts
.iter()
.chain(plan.preferences.iter())
.chain(plan.behavior_patterns.iter())
.chain(plan.others.iter())
.cloned()
.collect::<Vec<_>>();
let candidates_by_id = all_candidates
.iter()
.map(|candidate| (candidate.id.as_str(), candidate))
.collect::<HashMap<_, _>>();
let mut deleted_ids = HashSet::new();
for merge in &output.merges {
if merge.source_ids.is_empty() {
continue;
}
let source_candidates = merge
.source_ids
.iter()
.filter_map(|id| candidates_by_id.get(id.as_str()).copied())
.collect::<Vec<_>>();
if source_candidates.is_empty() {
continue;
}
let existing_target_id = source_candidates
.iter()
.find(|candidate| candidate.namespace == merge.namespace && candidate.key == merge.memory_key)
.map(|candidate| candidate.id.clone());
store
.put_memory(&crate::storage::MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: scope_key.to_string(),
namespace: merge.namespace.trim().to_string(),
memory_key: merge.memory_key.trim().to_string(),
content: merge.content.trim().to_string(),
source_type: "memory_maintenance".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
})
.map_err(|err| AgentError::Other(format!("upsert merged memory error: {}", err)))?;
for candidate in source_candidates {
if existing_target_id.as_ref().is_some_and(|target_id| target_id == &candidate.id) {
continue;
}
if deleted_ids.insert(candidate.id.clone()) {
store
.delete_memory("user", scope_key, &candidate.namespace, &candidate.key)
.map_err(|err| AgentError::Other(format!("delete merged source memory error: {}", err)))?;
}
}
}
for memory_id in &output.low_value_ids {
if let Some(candidate) = candidates_by_id.get(memory_id.as_str()) {
if deleted_ids.insert(candidate.id.clone()) {
store
.delete_memory("user", scope_key, &candidate.namespace, &candidate.key)
.map_err(|err| AgentError::Other(format!("delete low value memory error: {}", err)))?;
}
}
}
Ok(())
}
/// Session 按 channel 隔离,每个 channel 一个 Session /// Session 按 channel 隔离,每个 channel 一个 Session
/// History 按 chat_id 隔离,由 Session 统一管理 /// History 按 chat_id 隔离,由 Session 统一管理
@ -371,8 +667,7 @@ fn load_agent_prompt() -> Result<Option<String>, AgentError> {
} }
if !path.exists() { if !path.exists() {
fs::write(&path, DEFAULT_AGENT_PROMPT) write_agent_prompt(&path, DEFAULT_AGENT_PROMPT)?;
.map_err(|err| AgentError::Other(format!("create agent prompt file error: {}", err)))?;
} }
let content = fs::read_to_string(&path) let content = fs::read_to_string(&path)
@ -509,6 +804,128 @@ impl SessionManager {
self.skills.clone() self.skills.clone()
} }
/// Loads all `user` memories of `scope_key` and builds their maintenance plan.
///
/// Returns `Ok(None)` when there is nothing to maintain: either the scope has
/// no memories, or every memory was filtered out while building the plan (for
/// example because its content is blank). This keeps callers from invoking the
/// model with an empty candidate list.
///
/// # Errors
/// Returns `AgentError::Other` when the store cannot list the memories.
pub(crate) fn build_memory_maintenance_plan_for_scope(
    &self,
    scope_key: &str,
) -> Result<Option<MemoryMaintenancePlan>, AgentError> {
    let memories = self
        .store
        .list_memories_for_scope("user", scope_key)
        .map_err(|err| AgentError::Other(format!("list memories for scope error: {}", err)))?;

    let plan = build_memory_maintenance_plan(&memories);
    let has_candidates = !(plan.user_facts.is_empty()
        && plan.preferences.is_empty()
        && plan.behavior_patterns.is_empty()
        && plan.others.is_empty());

    Ok(has_candidates.then_some(plan))
}
/// Writes `markdown_body` into the managed memory section of the agent prompt file.
///
/// The current file content is used as the base; when the file does not exist
/// the default agent prompt is used instead. The updated document is then
/// written back through `write_agent_prompt`.
pub(crate) fn upsert_managed_agent_memory_summary(&self, markdown_body: &str) -> Result<(), AgentError> {
    let path = agent_prompt_path()?;

    let current = if !path.exists() {
        String::from(DEFAULT_AGENT_PROMPT)
    } else {
        fs::read_to_string(&path)
            .map_err(|err| AgentError::Other(format!("read agent prompt file error: {}", err)))?
    };

    let rewritten = upsert_managed_agent_memory_block(&current, markdown_body);
    write_agent_prompt(&path, &rewritten)
}
/// Asks the model for a maintenance summary of one scope without applying it.
///
/// Returns `Ok(None)` when no plan could be built for the scope.
// Only exercised from tests at the moment, hence the conditional dead-code allowance.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) async fn summarize_memory_maintenance_for_scope(
    &self,
    scope_key: &str,
) -> Result<Option<MemoryMaintenanceModelOutput>, AgentError> {
    match self.build_memory_maintenance_plan_for_scope(scope_key)? {
        Some(plan) => {
            let output = self.summarize_memory_maintenance_plan(scope_key, &plan).await?;
            Ok(Some(output))
        }
        None => Ok(None),
    }
}
async fn summarize_memory_maintenance_plan(
&self,
scope_key: &str,
plan: &MemoryMaintenancePlan,
) -> Result<MemoryMaintenanceModelOutput, AgentError> {
let provider_config = self.provider_config_for_agent(None)?;
let provider = create_provider(provider_config)
.map_err(|err| AgentError::Other(format!("create maintenance provider error: {}", err)))?;
let request = ChatCompletionRequest {
messages: vec![
Message::system(MEMORY_MAINTENANCE_SYSTEM_PROMPT),
Message::user(
serde_json::to_string_pretty(&serde_json::json!({
"scope_key": scope_key,
"candidates": plan,
}))
.unwrap_or_else(|_| "{}".to_string()),
),
],
temperature: Some(0.0),
max_tokens: Some(1200),
tools: None,
};
let response = provider
.chat(request)
.await
.map_err(|err| AgentError::Other(format!("memory maintenance model error: {}", err)))?;
let output: MemoryMaintenanceModelOutput = serde_json::from_str(strip_json_code_fence(&response.content))
.map_err(|err| AgentError::Other(format!("memory maintenance JSON decode error: {}", err)))?;
Ok(output)
}
/// Runs memory maintenance for one scope: build the plan, ask the model, and
/// apply the model's output to the store.
///
/// Returns `Ok(None)` when no plan could be built for the scope; otherwise the
/// model output that was applied.
pub(crate) async fn run_memory_maintenance_for_scope(
    &self,
    scope_key: &str,
) -> Result<Option<MemoryMaintenanceModelOutput>, AgentError> {
    let plan = match self.build_memory_maintenance_plan_for_scope(scope_key)? {
        Some(plan) => plan,
        None => return Ok(None),
    };

    let model_output = self.summarize_memory_maintenance_plan(scope_key, &plan).await?;
    apply_memory_maintenance_output(self.store.as_ref(), scope_key, &plan, &model_output)?;
    Ok(Some(model_output))
}
/// Runs memory maintenance for every `user` scope that has stored memories and
/// refreshes the managed memory summary in the agent prompt file.
///
/// Scopes are processed independently: a failure in one scope (for example a
/// reply that is not valid JSON) no longer prevents the remaining scopes from
/// being maintained. Because scopes are listed in a fixed order, aborting on
/// the first error would otherwise starve every later scope on each run for as
/// long as one scope keeps failing.
///
/// # Errors
/// Returns `AgentError::Other` when the scope list cannot be read, when writing
/// the summary fails, or — after all scopes were attempted — when at least one
/// scope failed. In the last case the message lists each failed scope, and the
/// managed summary is left untouched, as it was before when a scope failed.
pub(crate) async fn run_memory_maintenance_for_all_scopes(
    &self,
) -> Result<Vec<MemoryMaintenanceScopeResult>, AgentError> {
    let scope_keys = self
        .store
        .list_memory_scope_keys("user")
        .map_err(|err| AgentError::Other(format!("list memory scope keys error: {}", err)))?;

    let mut results = Vec::new();
    let mut failures: Vec<String> = Vec::new();
    for scope_key in scope_keys {
        match self.run_memory_maintenance_for_scope(&scope_key).await {
            Ok(Some(output)) => results.push(MemoryMaintenanceScopeResult { scope_key, output }),
            Ok(None) => {}
            Err(err) => failures.push(format!("{}: {}", scope_key, err)),
        }
    }

    if !failures.is_empty() {
        return Err(AgentError::Other(format!(
            "memory maintenance failed for {} scope(s): {}",
            failures.len(),
            failures.join("; ")
        )));
    }

    let managed_chunks: Vec<String> = results
        .iter()
        .map(|result| result.output.managed_markdown.clone())
        .collect();
    let combined_markdown = combine_managed_memory_markdown(&managed_chunks);
    if !combined_markdown.is_empty() {
        self.upsert_managed_agent_memory_summary(&combined_markdown)?;
    }

    Ok(results)
}
pub fn provider_config_for_agent(&self, agent_name: Option<&str>) -> Result<LLMProviderConfig, AgentError> { pub fn provider_config_for_agent(&self, agent_name: Option<&str>) -> Result<LLMProviderConfig, AgentError> {
select_provider_config(&self.provider_config, &self.provider_configs, agent_name) select_provider_config(&self.provider_config, &self.provider_configs, agent_name)
} }
@ -907,6 +1324,7 @@ mod tests {
use super::*; use super::*;
use axum::{Json, Router, routing::post}; use axum::{Json, Router, routing::post};
use crate::bus::MessageBus; use crate::bus::MessageBus;
use crate::storage::MemoryRecord;
use serde_json::{Value, json}; use serde_json::{Value, json};
use std::collections::HashMap; use std::collections::HashMap;
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -1009,6 +1427,11 @@ mod tests {
.get("model") .get("model")
.and_then(|value| value.as_str()) .and_then(|value| value.as_str())
.unwrap_or("unknown-model"); .unwrap_or("unknown-model");
let content = body
.get("mock_response_content")
.and_then(|value| value.as_str())
.map(ToString::to_string)
.unwrap_or_else(|| format!("reply from {}", model));
Json(json!({ Json(json!({
"id": "mock-response", "id": "mock-response",
@ -1016,7 +1439,7 @@ mod tests {
"choices": [ "choices": [
{ {
"message": { "message": {
"content": format!("reply from {}", model), "content": content,
"tool_calls": [] "tool_calls": []
} }
} }
@ -1158,6 +1581,166 @@ mod tests {
assert!(default_outbound[0].content.contains("default-model")); assert!(default_outbound[0].content.contains("default-model"));
} }
// Verifies that summarizing a scope returns exactly what the model replied with.
// The mock server echoes the `mock_response_content` request field back as the
// reply content, so the canned JSON below is what gets decoded.
#[tokio::test]
async fn test_summarize_memory_maintenance_for_scope_uses_model_output() {
let base_url = start_mock_openai_server().await;
let mock_response_content = serde_json::to_string(&json!({
"user_facts": ["用户在做AI产品"],
"preferences": ["偏好简洁表达"],
"behavior_patterns": ["习惯先问方案再要代码"],
"merges": [],
"conflicts": [],
"low_value_ids": [],
"managed_markdown": "### 用户事实\n- 用户在做AI产品\n\n### 用户偏好\n- 偏好简洁表达\n\n### 行为模式\n- 习惯先问方案再要代码"
}))
.unwrap();
let provider_config = LLMProviderConfig {
provider_type: "openai".to_string(),
name: "maintenance-provider".to_string(),
base_url,
api_key: "test-key".to_string(),
extra_headers: HashMap::new(),
model_id: "maintenance-model".to_string(),
temperature: Some(0.0),
max_tokens: Some(256),
// NOTE(review): presumably `model_extra` entries are forwarded into the request
// body, which is how the mock server sees `mock_response_content` — confirm.
model_extra: HashMap::from([(
"mock_response_content".to_string(),
json!(mock_response_content),
)]),
max_tool_iterations: 1,
token_limit: 4096,
llm_timeout_secs: 30,
};
let session_manager = SessionManager::new(
4,
100,
false,
provider_config.clone(),
HashMap::from([("default".to_string(), provider_config)]),
Arc::new(SkillRuntime::default()),
)
.unwrap();
// Seed one memory so a non-empty plan exists for the scope.
session_manager
.store()
.put_memory(&crate::storage::MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "profile".to_string(),
memory_key: "work".to_string(),
content: "用户在做AI产品".to_string(),
source_type: "message".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
})
.unwrap();
let output = session_manager
.summarize_memory_maintenance_for_scope("feishu:user-1")
.await
.unwrap()
.unwrap();
assert_eq!(output.user_facts, vec!["用户在做AI产品".to_string()]);
assert_eq!(output.preferences, vec!["偏好简洁表达".to_string()]);
assert_eq!(output.behavior_patterns, vec!["习惯先问方案再要代码".to_string()]);
assert!(output.managed_markdown.contains("### 用户事实"));
}
// Verifies that applying a model output merges two source memories into one new
// record, removes both sources, and deletes the memory flagged as low value.
#[test]
fn test_apply_memory_maintenance_output_merges_and_deletes_low_value_records() {
let store = SessionStore::in_memory().unwrap();
let scope_key = "feishu:user-1";
// First merge source.
let work = store
.put_memory(&crate::storage::MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: scope_key.to_string(),
namespace: "profile".to_string(),
memory_key: "work_short".to_string(),
content: "用户在做AI产品".to_string(),
source_type: "message".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
})
.unwrap();
// Second merge source.
let role = store
.put_memory(&crate::storage::MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: scope_key.to_string(),
namespace: "profile".to_string(),
memory_key: "work_detail".to_string(),
content: "用户主要在做AI产品设计和实现".to_string(),
source_type: "message".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
})
.unwrap();
// Memory the model output marks as low value.
let noise = store
.put_memory(&crate::storage::MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: scope_key.to_string(),
namespace: "notes".to_string(),
memory_key: "temporary".to_string(),
content: "今天临时提到过一个无后续的小细节".to_string(),
source_type: "message".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
})
.unwrap();
let plan = build_memory_maintenance_plan(&store.list_memories_for_scope("user", scope_key).unwrap());
// The merge targets a key ("work") that none of the sources uses, so both
// sources are expected to disappear.
let output = MemoryMaintenanceModelOutput {
user_facts: vec!["用户在做AI产品".to_string()],
preferences: Vec::new(),
behavior_patterns: Vec::new(),
merges: vec![MemoryMaintenanceMerge {
source_ids: vec![work.id.clone(), role.id.clone()],
namespace: "profile".to_string(),
memory_key: "work".to_string(),
content: "用户主要在做AI产品设计与实现".to_string(),
}],
conflicts: Vec::new(),
low_value_ids: vec![noise.id.clone()],
managed_markdown: "### 用户事实\n- 用户在做AI产品".to_string(),
};
apply_memory_maintenance_output(&store, scope_key, &plan, &output).unwrap();
// Only the merged record remains.
let all_memories = store.list_memories_for_scope("user", scope_key).unwrap();
assert_eq!(all_memories.len(), 1);
assert_eq!(all_memories[0].namespace, "profile");
assert_eq!(all_memories[0].memory_key, "work");
assert_eq!(all_memories[0].content, "用户主要在做AI产品设计与实现");
}
// Verifies that a chunk whose lines are all contained in a richer chunk is
// dropped, while chunks with lines of their own are kept.
#[test]
fn test_combine_managed_memory_markdown_prefers_richer_summary_over_subset() {
let combined = combine_managed_memory_markdown(&[
"### 用户事实\n- 用户在做AI产品\n\n### 用户偏好\n- 偏好简洁表达".to_string(),
// Strict subset of the first chunk's lines; expected to be dropped.
"- 用户在做AI产品".to_string(),
"### 用户事实\n- 用户名为区德成昵称DC。".to_string(),
]);
assert!(combined.contains("### 用户事实\n- 用户在做AI产品\n\n### 用户偏好\n- 偏好简洁表达"));
assert!(combined.contains("### 用户事实\n- 用户名为区德成昵称DC。"));
assert_eq!(combined.matches("- 用户在做AI产品").count(), 1);
}
#[test] #[test]
fn test_should_display_message_to_user_hides_completed_tool_results_by_default() { fn test_should_display_message_to_user_hides_completed_tool_results_by_default() {
let completed = ChatMessage::tool("call-1", "calculator", "2"); let completed = ChatMessage::tool("call-1", "calculator", "2");
@ -1387,4 +1970,99 @@ mod tests {
assert_eq!(history[0].role, "system"); assert_eq!(history[0].role, "system");
} }
// Verifies that plan building drops duplicate records and sorts the rest into
// the category that matches their namespace.
#[test]
fn test_build_memory_maintenance_plan_deduplicates_and_categorizes() {
let memories = vec![
MemoryRecord {
id: "1".to_string(),
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "profile".to_string(),
memory_key: "work".to_string(),
content: "用户在做AI产品".to_string(),
source_type: "message".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
created_at: 1,
updated_at: 1,
},
// Same namespace, key and content as record "1": expected to be dropped.
MemoryRecord {
id: "2".to_string(),
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "profile".to_string(),
memory_key: "work".to_string(),
content: "用户在做AI产品".to_string(),
source_type: "message".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
created_at: 2,
updated_at: 2,
},
MemoryRecord {
id: "3".to_string(),
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "preferences".to_string(),
memory_key: "style".to_string(),
content: "偏好简洁表达".to_string(),
source_type: "message".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
created_at: 3,
updated_at: 3,
},
MemoryRecord {
id: "4".to_string(),
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "patterns".to_string(),
memory_key: "workflow".to_string(),
content: "习惯先问方案再要代码".to_string(),
source_type: "message".to_string(),
source_session_id: None,
source_message_id: None,
source_message_seq: None,
source_channel_name: None,
source_chat_id: None,
created_at: 4,
updated_at: 4,
},
];
let plan = build_memory_maintenance_plan(&memories);
// One candidate per category; nothing lands in `others`.
assert_eq!(plan.user_facts.len(), 1);
assert_eq!(plan.preferences.len(), 1);
assert_eq!(plan.behavior_patterns.len(), 1);
assert!(plan.others.is_empty());
assert_eq!(plan.user_facts[0].content, "用户在做AI产品");
assert_eq!(plan.preferences[0].content, "偏好简洁表达");
assert_eq!(plan.behavior_patterns[0].content, "习惯先问方案再要代码");
}
// Verifies that, for a prompt without a managed section, the section is inserted
// ahead of the "## 回复规则" heading and carries the title and body.
#[test]
fn test_upsert_managed_agent_memory_block_inserts_before_reply_rules() {
let original = "# PicoBot 代理配置\n\n## 身份\n- 你是 PicoBot。\n\n## 回复规则\n- 使用中文回复。\n";
let updated = upsert_managed_agent_memory_block(
original,
"### 用户事实\n- 用户在做AI产品\n\n### 用户偏好\n- 偏好简洁表达",
);
let managed_pos = updated.find(MANAGED_AGENT_MEMORY_BLOCK_START).unwrap();
let reply_rules_pos = updated.find("## 回复规则").unwrap();
assert!(managed_pos < reply_rules_pos);
assert!(updated.contains(MANAGED_AGENT_MEMORY_TITLE));
assert!(updated.contains("用户在做AI产品"));
assert!(updated.contains("偏好简洁表达"));
}
} }

View File

@ -72,8 +72,8 @@ impl Scheduler {
fn sync_config_jobs(&self) -> anyhow::Result<()> { fn sync_config_jobs(&self) -> anyhow::Result<()> {
let now = Utc::now(); let now = Utc::now();
for job in &self.config.jobs { for job in self.config.effective_jobs() {
let runtime = RuntimeJob::from_config(job, now, self.config.misfire_policy)?; let runtime = RuntimeJob::from_config(&job, now, self.config.misfire_policy)?;
self.store.upsert_scheduler_job(&runtime.to_upsert())?; self.store.upsert_scheduler_job(&runtime.to_upsert())?;
} }
Ok(()) Ok(())
@ -484,6 +484,24 @@ async fn execute_internal_event(session_manager: &SessionManager, job: &RuntimeJ
tracing::info!(job_id = %job.id, removed, "Scheduler session cleanup completed"); tracing::info!(job_id = %job.id, removed, "Scheduler session cleanup completed");
Ok(()) Ok(())
} }
"memory_maintenance" => {
let results = session_manager.run_memory_maintenance_for_all_scopes().await?;
for result in &results {
tracing::info!(
job_id = %job.id,
scope_key = %result.scope_key,
user_facts = result.output.user_facts.len(),
preferences = result.output.preferences.len(),
behavior_patterns = result.output.behavior_patterns.len(),
merges = result.output.merges.len(),
conflicts = result.output.conflicts.len(),
low_value = result.output.low_value_ids.len(),
"Scheduler completed memory maintenance model run"
);
}
tracing::info!(job_id = %job.id, scope_count = results.len(), "Scheduler memory maintenance triggered");
Ok(())
}
other => anyhow::bail!("unsupported internal scheduler event: {}", other), other => anyhow::bail!("unsupported internal scheduler event: {}", other),
} }
} }

View File

@ -813,6 +813,50 @@ impl SessionStore {
Ok(memories) Ok(memories)
} }
/// Lists the distinct scope keys that have at least one memory of the given
/// scope kind, sorted ascending by scope key.
///
/// # Errors
/// Returns a `StorageError` when the query cannot be prepared or a row cannot
/// be read.
///
/// # Panics
/// Panics if the connection mutex is poisoned.
pub fn list_memory_scope_keys(&self, scope_kind: &str) -> Result<Vec<String>, StorageError> {
    let conn = self.conn.lock().expect("session db mutex poisoned");
    let mut stmt = conn.prepare(
        "
        SELECT DISTINCT scope_key
        FROM memories
        WHERE scope_kind = ?1
        ORDER BY scope_key ASC
        ",
    )?;

    // Collecting into `Result` stops at the first failing row, like a manual loop.
    let scope_keys = stmt
        .query_map(params![scope_kind], |row| row.get::<_, String>(0))?
        .collect::<Result<Vec<_>, _>>()?;
    Ok(scope_keys)
}
/// Returns every memory stored for one scope kind and scope key.
///
/// Rows are ordered by `updated_at` descending, then by namespace and memory key
/// ascending.
///
/// # Errors
/// Returns a `StorageError` when the query cannot be prepared or a row cannot
/// be mapped.
///
/// # Panics
/// Panics if the connection mutex is poisoned.
pub fn list_memories_for_scope(
    &self,
    scope_kind: &str,
    scope_key: &str,
) -> Result<Vec<MemoryRecord>, StorageError> {
    let conn = self.conn.lock().expect("session db mutex poisoned");
    let mut stmt = conn.prepare(
        "
        SELECT id, scope_kind, scope_key, namespace, memory_key, content,
        source_type, source_session_id, source_message_id, source_message_seq,
        source_channel_name, source_chat_id, created_at, updated_at
        FROM memories
        WHERE scope_kind = ?1 AND scope_key = ?2
        ORDER BY updated_at DESC, namespace ASC, memory_key ASC
        ",
    )?;

    // Collecting into `Result` stops at the first failing row, like a manual loop.
    let memories = stmt
        .query_map(params![scope_kind, scope_key], map_memory_record)?
        .collect::<Result<Vec<_>, _>>()?;
    Ok(memories)
}
pub fn update_memory( pub fn update_memory(
&self, &self,
input: &MemoryUpsert, input: &MemoryUpsert,
@ -1911,6 +1955,66 @@ mod tests {
assert!(hits.iter().any(|memory| memory.memory_key == "quality")); assert!(hits.iter().any(|memory| memory.memory_key == "quality"));
} }
// Verifies that scope keys are listed distinct and sorted, and that reading a
// scope returns only that scope's memories.
#[test]
fn test_memory_scope_listing_and_full_scope_read() {
let store = SessionStore::in_memory().unwrap();
// Inserted first on purpose: "feishu:user-2" must still sort after "feishu:user-1".
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-2".to_string(),
namespace: "preferences".to_string(),
memory_key: "style".to_string(),
content: "偏好简洁表达".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-2".to_string()),
source_message_id: Some("msg-2".to_string()),
source_message_seq: Some(2),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-2".to_string()),
})
.unwrap();
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "profile".to_string(),
memory_key: "work".to_string(),
content: "用户在做AI产品".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-1".to_string()),
source_message_id: Some("msg-1".to_string()),
source_message_seq: Some(1),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-1".to_string()),
})
.unwrap();
// Second memory in the same scope: the scope key must be reported only once.
store
.put_memory(&MemoryUpsert {
scope_kind: "user".to_string(),
scope_key: "feishu:user-1".to_string(),
namespace: "patterns".to_string(),
memory_key: "workflow".to_string(),
content: "习惯先问方案再要代码".to_string(),
source_type: "message".to_string(),
source_session_id: Some("feishu:chat-1".to_string()),
source_message_id: Some("msg-3".to_string()),
source_message_seq: Some(3),
source_channel_name: Some("feishu".to_string()),
source_chat_id: Some("chat-1".to_string()),
})
.unwrap();
let scope_keys = store.list_memory_scope_keys("user").unwrap();
assert_eq!(scope_keys, vec!["feishu:user-1".to_string(), "feishu:user-2".to_string()]);
let full_scope = store.list_memories_for_scope("user", "feishu:user-1").unwrap();
assert_eq!(full_scope.len(), 2);
assert!(full_scope.iter().all(|memory| memory.scope_key == "feishu:user-1"));
assert!(full_scope.iter().any(|memory| memory.memory_key == "work"));
assert!(full_scope.iter().any(|memory| memory.memory_key == "workflow"));
}
#[test] #[test]
fn test_scheduler_job_roundtrip_and_runtime_update() { fn test_scheduler_job_roundtrip_and_runtime_update() {
let store = SessionStore::in_memory().unwrap(); let store = SessionStore::in_memory().unwrap();