From 3d241544c5aba85daeb6609c1809dd7ea5df4282 Mon Sep 17 00:00:00 2001 From: ooodc <549496103@qq.com> Date: Thu, 23 Apr 2026 14:16:22 +0800 Subject: [PATCH] =?UTF-8?q?feat(memory):=20=E6=B7=BB=E5=8A=A0=E5=86=85?= =?UTF-8?q?=E7=BD=AE=E8=AE=B0=E5=BF=86=E7=BB=B4=E6=8A=A4=E4=BD=9C=E4=B8=9A?= =?UTF-8?q?=EF=BC=8C=E5=A2=9E=E5=BC=BA=E8=B0=83=E5=BA=A6=E5=8A=9F=E8=83=BD?= =?UTF-8?q?=E5=B9=B6=E6=94=AF=E6=8C=81=E6=9C=89=E6=95=88=E4=BD=9C=E4=B8=9A?= =?UTF-8?q?=E5=90=88=E5=B9=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config/mod.rs | 91 ++++++ src/gateway/session.rs | 684 ++++++++++++++++++++++++++++++++++++++++- src/scheduler/mod.rs | 22 +- src/storage/mod.rs | 104 +++++++ 4 files changed, 896 insertions(+), 5 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index aaed652..14c84db 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -172,6 +172,8 @@ pub struct SchedulerConfig { pub jobs: Vec, } +pub const BUILTIN_MEMORY_MAINTENANCE_JOB_ID: &str = "builtin.memory_maintenance_daily"; + #[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)] #[serde(rename_all = "snake_case")] pub enum SchedulerMisfirePolicy { @@ -256,6 +258,41 @@ impl SchedulerJobConfig { } } +impl SchedulerConfig { + pub fn builtin_jobs() -> Vec { + vec![SchedulerJobConfig { + id: BUILTIN_MEMORY_MAINTENANCE_JOB_ID.to_string(), + enabled: true, + kind: SchedulerJobKind::InternalEvent, + schedule: Some(SchedulerSchedule::Cron { + expression: "0 19 * * *".to_string(), + }), + startup_delay_secs: 0, + interval_secs: 0, + target: SchedulerJobTarget::default(), + payload: serde_json::json!({ + "event": "memory_maintenance", + "time_zone": "Asia/Shanghai", + "local_time": "03:00" + }), + }] + } + + pub fn effective_jobs(&self) -> Vec { + let mut jobs = Self::builtin_jobs(); + + for configured in &self.jobs { + if let Some(existing) = jobs.iter_mut().find(|job| job.id == configured.id) { + *existing = configured.clone(); + } else { + jobs.push(configured.clone()); + } + } + + jobs + } +} + impl SchedulerSchedule { pub fn validate(&self, job_id: &str) -> Result<(), ConfigError> { match self { @@ -738,6 +775,60 @@ mod tests { assert_eq!(config.scheduler.worker_queue_capacity, 64); assert_eq!(config.scheduler.misfire_policy, SchedulerMisfirePolicy::Skip); assert!(config.scheduler.jobs.is_empty()); + + let effective_jobs = config.scheduler.effective_jobs(); + assert_eq!(effective_jobs.len(), 1); + assert_eq!(effective_jobs[0].id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID); + assert_eq!(effective_jobs[0].kind, SchedulerJobKind::InternalEvent); + assert_eq!( + effective_jobs[0].resolved_schedule().unwrap(), + SchedulerSchedule::Cron { + expression: "0 19 * * *".to_string(), + } + ); + } + + #[test] + fn test_scheduler_effective_jobs_allows_builtin_override() { + let mut scheduler = SchedulerConfig::default(); + scheduler.jobs.push(SchedulerJobConfig { + id: BUILTIN_MEMORY_MAINTENANCE_JOB_ID.to_string(), + enabled: false, + kind: SchedulerJobKind::InternalEvent, + schedule: Some(SchedulerSchedule::Cron { + expression: "15 2 * * *".to_string(), + }), + startup_delay_secs: 0, + interval_secs: 0, + target: SchedulerJobTarget::default(), + payload: serde_json::json!({ + "event": "memory_maintenance", + "time_zone": "UTC", + "local_time": "02:15" + }), + }); + scheduler.jobs.push(SchedulerJobConfig { + id: "custom.reminder".to_string(), + enabled: true, + kind: SchedulerJobKind::InternalEvent, + schedule: Some(SchedulerSchedule::Delay { seconds: 30 }), + startup_delay_secs: 0, + interval_secs: 0, + target: SchedulerJobTarget::default(), + payload: serde_json::json!({"event": "custom"}), + }); + + let effective_jobs = scheduler.effective_jobs(); + assert_eq!(effective_jobs.len(), 2); + assert_eq!(effective_jobs[0].id, BUILTIN_MEMORY_MAINTENANCE_JOB_ID); + assert!(!effective_jobs[0].enabled); + assert_eq!( + effective_jobs[0].resolved_schedule().unwrap(), + SchedulerSchedule::Cron { + expression: "15 2 * * *".to_string(), + } + ); + assert_eq!(effective_jobs[1].id, "custom.reminder"); } #[test] diff --git a/src/gateway/session.rs b/src/gateway/session.rs index a7f009a..fe3cd23 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -1,13 +1,16 @@ use std::collections::{HashMap, HashSet}; use std::fs; +use std::path::Path; use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; +use serde::{Deserialize, Serialize}; use tokio::sync::{Mutex, mpsc}; use uuid::Uuid; use crate::bus::{ChatMessage, MessageBus, OutboundMessage}; use crate::config::LLMProviderConfig; use crate::agent::{AgentLoop, AgentError, ContextCompressor, EmittedMessageHandler}; +use crate::providers::{create_provider, ChatCompletionRequest, Message}; use crate::protocol::WsOutbound; use crate::skills::SkillRuntime; use crate::storage::{SessionRecord, SessionStore, persistent_session_id}; @@ -18,6 +21,299 @@ use crate::tools::{ }; const DEFAULT_AGENT_PROMPT: &str = "# PicoBot 代理配置\n\n## 身份\n- 你是 PicoBot,一名务实、可靠的通用助理。\n- 你的目标是理解用户当下的真实需求,并给出清晰、可执行的帮助。\n\n## 工作方式\n- 优先理解意图,再给出回应或行动。\n- 保持简洁、准确、自然,不故作热情,也不空泛铺陈。\n- 能直接验证的内容尽量先验证,避免凭空猜测。\n- 当现有工具是完成任务的最直接方式时,优先使用工具。\n- 除非用户明确要求改变方向,否则保持用户原本目标不变。\n\n## 助理原则\n- 优先解决问题,而不是展示过程。\n- 输出要方便用户立即使用,结论尽量明确。\n- 对不确定的地方要直说,不把猜测包装成事实。\n- 复杂任务先收敛重点,简单任务直接给结果。\n- 避免不必要的重复、客套和冗长说明。\n\n## 回复规则\n- 除非用户另有要求,否则使用中文回复。\n- 默认短而清楚,按信息密度组织内容。\n- 如果任务涉及文件、命令、配置或下一步操作,优先给出最关键的那部分。\n- 如果存在限制、风险或前提条件,要直接说明。\n\n## 补充要求\n- 你是 PicoBot。\n- 回答应以帮助用户完成当前目标为中心。\n- 在信息不足时先补关键前提,在信息充分时直接执行。\n"; +const MANAGED_AGENT_MEMORY_BLOCK_START: &str = ""; +const MANAGED_AGENT_MEMORY_BLOCK_END: &str = ""; +const MANAGED_AGENT_MEMORY_TITLE: &str = "## 用户记忆摘要"; +const MEMORY_MAINTENANCE_SYSTEM_PROMPT: &str = "你是 PicoBot 的后台记忆整理器。你必须根据输入的候选记忆做语义整理,并严格返回 JSON,不要输出 Markdown 代码块,不要输出额外解释。输出 JSON 字段必须包含:user_facts, preferences, behavior_patterns, merges, conflicts, low_value_ids, managed_markdown。user_facts、preferences、behavior_patterns 是字符串数组。merges 是对象数组,每个对象必须包含 source_ids、namespace、memory_key、content。conflicts 是对象数组,每个对象必须包含 source_ids、note。low_value_ids 是需要删除的候选记忆 id 数组。只能引用输入里出现过的候选 id。managed_markdown 必须是 Markdown 文本,且只保留稳定模式,不写一次性事件。"; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +enum MemoryMaintenanceCategory { + UserFacts, + Preferences, + BehaviorPatterns, + Other, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub(crate) struct MemoryMaintenanceCandidate { + id: String, + namespace: String, + key: String, + content: String, +} + +#[derive(Debug, Clone, Default, Serialize, Deserialize, PartialEq, Eq)] +pub(crate) struct MemoryMaintenancePlan { + user_facts: Vec, + preferences: Vec, + behavior_patterns: Vec, + others: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub(crate) struct MemoryMaintenanceMerge { + pub(crate) source_ids: Vec, + pub(crate) namespace: String, + pub(crate) memory_key: String, + pub(crate) content: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub(crate) struct MemoryMaintenanceConflict { + pub(crate) source_ids: Vec, + pub(crate) note: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub(crate) struct MemoryMaintenanceModelOutput { + pub(crate) user_facts: Vec, + pub(crate) preferences: Vec, + pub(crate) behavior_patterns: Vec, + pub(crate) merges: Vec, + pub(crate) conflicts: Vec, + pub(crate) low_value_ids: Vec, + pub(crate) managed_markdown: String, +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub(crate) struct MemoryMaintenanceScopeResult { + pub(crate) scope_key: String, + pub(crate) output: MemoryMaintenanceModelOutput, +} + +fn build_memory_maintenance_plan(memories: &[crate::storage::MemoryRecord]) -> MemoryMaintenancePlan { + let mut plan = MemoryMaintenancePlan::default(); + let mut seen = HashSet::new(); + + for memory in memories { + let normalized_content = memory.content.trim(); + if normalized_content.is_empty() { + continue; + } + + let dedupe_key = format!( + "{}\u{1f}{}\u{1f}{}", + memory.namespace.trim().to_ascii_lowercase(), + memory.memory_key.trim().to_ascii_lowercase(), + normalized_content + ); + if !seen.insert(dedupe_key) { + continue; + } + + let candidate = MemoryMaintenanceCandidate { + id: memory.id.clone(), + namespace: memory.namespace.clone(), + key: memory.memory_key.clone(), + content: normalized_content.to_string(), + }; + + match memory_maintenance_category(&memory.namespace) { + MemoryMaintenanceCategory::UserFacts => plan.user_facts.push(candidate), + MemoryMaintenanceCategory::Preferences => plan.preferences.push(candidate), + MemoryMaintenanceCategory::BehaviorPatterns => plan.behavior_patterns.push(candidate), + MemoryMaintenanceCategory::Other => plan.others.push(candidate), + } + } + + plan +} + +fn memory_maintenance_category(namespace: &str) -> MemoryMaintenanceCategory { + match namespace.trim().to_ascii_lowercase().as_str() { + "profile" | "facts" | "identity" => MemoryMaintenanceCategory::UserFacts, + "preferences" | "style" | "likes" => MemoryMaintenanceCategory::Preferences, + "patterns" | "behavior" | "habits" | "workflow" => MemoryMaintenanceCategory::BehaviorPatterns, + _ => MemoryMaintenanceCategory::Other, + } +} + +fn render_managed_agent_memory_block(markdown_body: &str) -> String { + format!( + "{MANAGED_AGENT_MEMORY_BLOCK_START}\n{MANAGED_AGENT_MEMORY_TITLE}\n\n{}\n{MANAGED_AGENT_MEMORY_BLOCK_END}", + markdown_body.trim() + ) +} + +fn upsert_managed_agent_memory_block(existing: &str, markdown_body: &str) -> String { + let managed_block = render_managed_agent_memory_block(markdown_body); + + if let (Some(start), Some(end)) = ( + existing.find(MANAGED_AGENT_MEMORY_BLOCK_START), + existing.find(MANAGED_AGENT_MEMORY_BLOCK_END), + ) { + let end = end + MANAGED_AGENT_MEMORY_BLOCK_END.len(); + let mut updated = String::new(); + updated.push_str(existing[..start].trim_end()); + updated.push_str("\n\n"); + updated.push_str(&managed_block); + updated.push_str("\n\n"); + updated.push_str(existing[end..].trim_start()); + return updated.trim().to_string() + "\n"; + } + + if let Some(reply_rules_index) = existing.find("## 回复规则") { + let mut updated = String::new(); + updated.push_str(existing[..reply_rules_index].trim_end()); + updated.push_str("\n\n"); + updated.push_str(&managed_block); + updated.push_str("\n\n"); + updated.push_str(existing[reply_rules_index..].trim_start()); + return updated.trim().to_string() + "\n"; + } + + let mut updated = existing.trim_end().to_string(); + if !updated.is_empty() { + updated.push_str("\n\n"); + } + updated.push_str(&managed_block); + updated.push('\n'); + updated +} + +fn write_agent_prompt(path: &Path, content: &str) -> Result<(), AgentError> { + if let Some(parent) = path.parent() { + fs::create_dir_all(parent) + .map_err(|err| AgentError::Other(format!("create agent prompt dir error: {}", err)))?; + } + + let temp_path = path.with_extension("md.tmp"); + fs::write(&temp_path, content) + .map_err(|err| AgentError::Other(format!("write agent prompt temp file error: {}", err)))?; + fs::rename(&temp_path, path) + .map_err(|err| AgentError::Other(format!("replace agent prompt file error: {}", err)))?; + Ok(()) +} + +fn strip_json_code_fence(content: &str) -> &str { + let trimmed = content.trim(); + if let Some(rest) = trimmed.strip_prefix("```json") { + return rest.strip_suffix("```").map(str::trim).unwrap_or(trimmed); + } + if let Some(rest) = trimmed.strip_prefix("```") { + return rest.strip_suffix("```").map(str::trim).unwrap_or(trimmed); + } + trimmed +} + +fn combine_managed_memory_markdown(chunks: &[String]) -> String { + let normalized_chunks = chunks + .iter() + .map(|chunk| chunk.trim()) + .filter(|chunk| !chunk.is_empty()) + .collect::>(); + + let mut combined = Vec::new(); + for (index, chunk) in normalized_chunks.iter().enumerate() { + let chunk_lines = chunk + .lines() + .map(str::trim) + .filter(|line| !line.is_empty()) + .collect::>(); + + let is_subset_of_other = normalized_chunks.iter().enumerate().any(|(other_index, other)| { + if index == other_index { + return false; + } + + let other_lines = other + .lines() + .map(str::trim) + .filter(|line| !line.is_empty()) + .collect::>(); + + chunk_lines.len() < other_lines.len() && chunk_lines.is_subset(&other_lines) + }); + + if !is_subset_of_other && !combined.iter().any(|existing: &String| existing == chunk) { + combined.push((*chunk).to_string()); + } + } + + combined.join("\n\n") +} + +fn apply_memory_maintenance_output( + store: &SessionStore, + scope_key: &str, + plan: &MemoryMaintenancePlan, + output: &MemoryMaintenanceModelOutput, +) -> Result<(), AgentError> { + let all_candidates = plan + .user_facts + .iter() + .chain(plan.preferences.iter()) + .chain(plan.behavior_patterns.iter()) + .chain(plan.others.iter()) + .cloned() + .collect::>(); + + let candidates_by_id = all_candidates + .iter() + .map(|candidate| (candidate.id.as_str(), candidate)) + .collect::>(); + + let mut deleted_ids = HashSet::new(); + + for merge in &output.merges { + if merge.source_ids.is_empty() { + continue; + } + + let source_candidates = merge + .source_ids + .iter() + .filter_map(|id| candidates_by_id.get(id.as_str()).copied()) + .collect::>(); + if source_candidates.is_empty() { + continue; + } + + let existing_target_id = source_candidates + .iter() + .find(|candidate| candidate.namespace == merge.namespace && candidate.key == merge.memory_key) + .map(|candidate| candidate.id.clone()); + + store + .put_memory(&crate::storage::MemoryUpsert { + scope_kind: "user".to_string(), + scope_key: scope_key.to_string(), + namespace: merge.namespace.trim().to_string(), + memory_key: merge.memory_key.trim().to_string(), + content: merge.content.trim().to_string(), + source_type: "memory_maintenance".to_string(), + source_session_id: None, + source_message_id: None, + source_message_seq: None, + source_channel_name: None, + source_chat_id: None, + }) + .map_err(|err| AgentError::Other(format!("upsert merged memory error: {}", err)))?; + + for candidate in source_candidates { + if existing_target_id.as_ref().is_some_and(|target_id| target_id == &candidate.id) { + continue; + } + if deleted_ids.insert(candidate.id.clone()) { + store + .delete_memory("user", scope_key, &candidate.namespace, &candidate.key) + .map_err(|err| AgentError::Other(format!("delete merged source memory error: {}", err)))?; + } + } + } + + for memory_id in &output.low_value_ids { + if let Some(candidate) = candidates_by_id.get(memory_id.as_str()) { + if deleted_ids.insert(candidate.id.clone()) { + store + .delete_memory("user", scope_key, &candidate.namespace, &candidate.key) + .map_err(|err| AgentError::Other(format!("delete low value memory error: {}", err)))?; + } + } + } + + Ok(()) +} /// Session 按 channel 隔离,每个 channel 一个 Session /// History 按 chat_id 隔离,由 Session 统一管理 @@ -371,8 +667,7 @@ fn load_agent_prompt() -> Result, AgentError> { } if !path.exists() { - fs::write(&path, DEFAULT_AGENT_PROMPT) - .map_err(|err| AgentError::Other(format!("create agent prompt file error: {}", err)))?; + write_agent_prompt(&path, DEFAULT_AGENT_PROMPT)?; } let content = fs::read_to_string(&path) @@ -509,6 +804,128 @@ impl SessionManager { self.skills.clone() } + pub(crate) fn build_memory_maintenance_plan_for_scope( + &self, + scope_key: &str, + ) -> Result, AgentError> { + let memories = self + .store + .list_memories_for_scope("user", scope_key) + .map_err(|err| AgentError::Other(format!("list memories for scope error: {}", err)))?; + + if memories.is_empty() { + return Ok(None); + } + + Ok(Some(build_memory_maintenance_plan(&memories))) + } + + pub(crate) fn upsert_managed_agent_memory_summary(&self, markdown_body: &str) -> Result<(), AgentError> { + let path = agent_prompt_path()?; + let existing = if path.exists() { + fs::read_to_string(&path) + .map_err(|err| AgentError::Other(format!("read agent prompt file error: {}", err)))? + } else { + DEFAULT_AGENT_PROMPT.to_string() + }; + let updated = upsert_managed_agent_memory_block(&existing, markdown_body); + write_agent_prompt(&path, &updated) + } + + #[cfg_attr(not(test), allow(dead_code))] + pub(crate) async fn summarize_memory_maintenance_for_scope( + &self, + scope_key: &str, + ) -> Result, AgentError> { + let Some(plan) = self.build_memory_maintenance_plan_for_scope(scope_key)? else { + return Ok(None); + }; + + self.summarize_memory_maintenance_plan(scope_key, &plan).await.map(Some) + } + + async fn summarize_memory_maintenance_plan( + &self, + scope_key: &str, + plan: &MemoryMaintenancePlan, + ) -> Result { + + let provider_config = self.provider_config_for_agent(None)?; + let provider = create_provider(provider_config) + .map_err(|err| AgentError::Other(format!("create maintenance provider error: {}", err)))?; + + let request = ChatCompletionRequest { + messages: vec![ + Message::system(MEMORY_MAINTENANCE_SYSTEM_PROMPT), + Message::user( + serde_json::to_string_pretty(&serde_json::json!({ + "scope_key": scope_key, + "candidates": plan, + })) + .unwrap_or_else(|_| "{}".to_string()), + ), + ], + temperature: Some(0.0), + max_tokens: Some(1200), + tools: None, + }; + + let response = provider + .chat(request) + .await + .map_err(|err| AgentError::Other(format!("memory maintenance model error: {}", err)))?; + + let output: MemoryMaintenanceModelOutput = serde_json::from_str(strip_json_code_fence(&response.content)) + .map_err(|err| AgentError::Other(format!("memory maintenance JSON decode error: {}", err)))?; + + Ok(output) + } + + pub(crate) async fn run_memory_maintenance_for_scope( + &self, + scope_key: &str, + ) -> Result, AgentError> { + let Some(plan) = self.build_memory_maintenance_plan_for_scope(scope_key)? else { + return Ok(None); + }; + + let output = self.summarize_memory_maintenance_plan(scope_key, &plan).await?; + apply_memory_maintenance_output(self.store.as_ref(), scope_key, &plan, &output)?; + + Ok(Some(output)) + } + + pub(crate) async fn run_memory_maintenance_for_all_scopes( + &self, + ) -> Result, AgentError> { + let scope_keys = self + .store + .list_memory_scope_keys("user") + .map_err(|err| AgentError::Other(format!("list memory scope keys error: {}", err)))?; + let mut results = Vec::new(); + + for scope_key in scope_keys { + let Some(output) = self.run_memory_maintenance_for_scope(&scope_key).await? else { + continue; + }; + + results.push(MemoryMaintenanceScopeResult { scope_key, output }); + } + + let combined_markdown = combine_managed_memory_markdown( + &results + .iter() + .map(|result| result.output.managed_markdown.clone()) + .collect::>(), + ); + + if !combined_markdown.is_empty() { + self.upsert_managed_agent_memory_summary(&combined_markdown)?; + } + + Ok(results) + } + pub fn provider_config_for_agent(&self, agent_name: Option<&str>) -> Result { select_provider_config(&self.provider_config, &self.provider_configs, agent_name) } @@ -907,6 +1324,7 @@ mod tests { use super::*; use axum::{Json, Router, routing::post}; use crate::bus::MessageBus; + use crate::storage::MemoryRecord; use serde_json::{Value, json}; use std::collections::HashMap; use tokio::net::TcpListener; @@ -1009,6 +1427,11 @@ mod tests { .get("model") .and_then(|value| value.as_str()) .unwrap_or("unknown-model"); + let content = body + .get("mock_response_content") + .and_then(|value| value.as_str()) + .map(ToString::to_string) + .unwrap_or_else(|| format!("reply from {}", model)); Json(json!({ "id": "mock-response", @@ -1016,7 +1439,7 @@ mod tests { "choices": [ { "message": { - "content": format!("reply from {}", model), + "content": content, "tool_calls": [] } } @@ -1158,6 +1581,166 @@ mod tests { assert!(default_outbound[0].content.contains("default-model")); } + #[tokio::test] + async fn test_summarize_memory_maintenance_for_scope_uses_model_output() { + let base_url = start_mock_openai_server().await; + let mock_response_content = serde_json::to_string(&json!({ + "user_facts": ["用户在做AI产品"], + "preferences": ["偏好简洁表达"], + "behavior_patterns": ["习惯先问方案再要代码"], + "merges": [], + "conflicts": [], + "low_value_ids": [], + "managed_markdown": "### 用户事实\n- 用户在做AI产品\n\n### 用户偏好\n- 偏好简洁表达\n\n### 行为模式\n- 习惯先问方案再要代码" + })) + .unwrap(); + + let provider_config = LLMProviderConfig { + provider_type: "openai".to_string(), + name: "maintenance-provider".to_string(), + base_url, + api_key: "test-key".to_string(), + extra_headers: HashMap::new(), + model_id: "maintenance-model".to_string(), + temperature: Some(0.0), + max_tokens: Some(256), + model_extra: HashMap::from([( + "mock_response_content".to_string(), + json!(mock_response_content), + )]), + max_tool_iterations: 1, + token_limit: 4096, + llm_timeout_secs: 30, + }; + + let session_manager = SessionManager::new( + 4, + 100, + false, + provider_config.clone(), + HashMap::from([("default".to_string(), provider_config)]), + Arc::new(SkillRuntime::default()), + ) + .unwrap(); + + session_manager + .store() + .put_memory(&crate::storage::MemoryUpsert { + scope_kind: "user".to_string(), + scope_key: "feishu:user-1".to_string(), + namespace: "profile".to_string(), + memory_key: "work".to_string(), + content: "用户在做AI产品".to_string(), + source_type: "message".to_string(), + source_session_id: None, + source_message_id: None, + source_message_seq: None, + source_channel_name: None, + source_chat_id: None, + }) + .unwrap(); + + let output = session_manager + .summarize_memory_maintenance_for_scope("feishu:user-1") + .await + .unwrap() + .unwrap(); + + assert_eq!(output.user_facts, vec!["用户在做AI产品".to_string()]); + assert_eq!(output.preferences, vec!["偏好简洁表达".to_string()]); + assert_eq!(output.behavior_patterns, vec!["习惯先问方案再要代码".to_string()]); + assert!(output.managed_markdown.contains("### 用户事实")); + } + + #[test] + fn test_apply_memory_maintenance_output_merges_and_deletes_low_value_records() { + let store = SessionStore::in_memory().unwrap(); + let scope_key = "feishu:user-1"; + + let work = store + .put_memory(&crate::storage::MemoryUpsert { + scope_kind: "user".to_string(), + scope_key: scope_key.to_string(), + namespace: "profile".to_string(), + memory_key: "work_short".to_string(), + content: "用户在做AI产品".to_string(), + source_type: "message".to_string(), + source_session_id: None, + source_message_id: None, + source_message_seq: None, + source_channel_name: None, + source_chat_id: None, + }) + .unwrap(); + let role = store + .put_memory(&crate::storage::MemoryUpsert { + scope_kind: "user".to_string(), + scope_key: scope_key.to_string(), + namespace: "profile".to_string(), + memory_key: "work_detail".to_string(), + content: "用户主要在做AI产品设计和实现".to_string(), + source_type: "message".to_string(), + source_session_id: None, + source_message_id: None, + source_message_seq: None, + source_channel_name: None, + source_chat_id: None, + }) + .unwrap(); + let noise = store + .put_memory(&crate::storage::MemoryUpsert { + scope_kind: "user".to_string(), + scope_key: scope_key.to_string(), + namespace: "notes".to_string(), + memory_key: "temporary".to_string(), + content: "今天临时提到过一个无后续的小细节".to_string(), + source_type: "message".to_string(), + source_session_id: None, + source_message_id: None, + source_message_seq: None, + source_channel_name: None, + source_chat_id: None, + }) + .unwrap(); + + let plan = build_memory_maintenance_plan(&store.list_memories_for_scope("user", scope_key).unwrap()); + let output = MemoryMaintenanceModelOutput { + user_facts: vec!["用户在做AI产品".to_string()], + preferences: Vec::new(), + behavior_patterns: Vec::new(), + merges: vec![MemoryMaintenanceMerge { + source_ids: vec![work.id.clone(), role.id.clone()], + namespace: "profile".to_string(), + memory_key: "work".to_string(), + content: "用户主要在做AI产品设计与实现".to_string(), + }], + conflicts: Vec::new(), + low_value_ids: vec![noise.id.clone()], + managed_markdown: "### 用户事实\n- 用户在做AI产品".to_string(), + }; + + apply_memory_maintenance_output(&store, scope_key, &plan, &output).unwrap(); + + let all_memories = store.list_memories_for_scope("user", scope_key).unwrap(); + assert_eq!(all_memories.len(), 1); + assert_eq!(all_memories[0].namespace, "profile"); + assert_eq!(all_memories[0].memory_key, "work"); + assert_eq!(all_memories[0].content, "用户主要在做AI产品设计与实现"); + } + + #[test] + fn test_combine_managed_memory_markdown_prefers_richer_summary_over_subset() { + let combined = combine_managed_memory_markdown(&[ + "### 用户事实\n- 用户在做AI产品\n\n### 用户偏好\n- 偏好简洁表达".to_string(), + "- 用户在做AI产品".to_string(), + "### 用户事实\n- 用户名为区德成,昵称DC。".to_string(), + ]); + + assert!(combined.contains("### 用户事实\n- 用户在做AI产品\n\n### 用户偏好\n- 偏好简洁表达")); + assert!(combined.contains("### 用户事实\n- 用户名为区德成,昵称DC。")); + assert_eq!(combined.matches("- 用户在做AI产品").count(), 1); + } + #[test] fn test_should_display_message_to_user_hides_completed_tool_results_by_default() { let completed = ChatMessage::tool("call-1", "calculator", "2"); @@ -1387,4 +1970,99 @@ mod tests { assert_eq!(history[0].role, "system"); } + #[test] + fn test_build_memory_maintenance_plan_deduplicates_and_categorizes() { + let memories = vec![ + MemoryRecord { + id: "1".to_string(), + scope_kind: "user".to_string(), + scope_key: "feishu:user-1".to_string(), + namespace: "profile".to_string(), + memory_key: "work".to_string(), + content: "用户在做AI产品".to_string(), + source_type: "message".to_string(), + source_session_id: None, + source_message_id: None, + source_message_seq: None, + source_channel_name: None, + source_chat_id: None, + created_at: 1, + updated_at: 1, + }, + MemoryRecord { + id: "2".to_string(), + scope_kind: "user".to_string(), + scope_key: "feishu:user-1".to_string(), + namespace: "profile".to_string(), + memory_key: "work".to_string(), + content: "用户在做AI产品".to_string(), + source_type: "message".to_string(), + source_session_id: None, + source_message_id: None, + source_message_seq: None, + source_channel_name: None, + source_chat_id: None, + created_at: 2, + updated_at: 2, + }, + MemoryRecord { + id: "3".to_string(), + scope_kind: "user".to_string(), + scope_key: "feishu:user-1".to_string(), + namespace: "preferences".to_string(), + memory_key: "style".to_string(), + content: "偏好简洁表达".to_string(), + source_type: "message".to_string(), + source_session_id: None, + source_message_id: None, + source_message_seq: None, + source_channel_name: None, + source_chat_id: None, + created_at: 3, + updated_at: 3, + }, + MemoryRecord { + id: "4".to_string(), + scope_kind: "user".to_string(), + scope_key: "feishu:user-1".to_string(), + namespace: "patterns".to_string(), + memory_key: "workflow".to_string(), + content: "习惯先问方案再要代码".to_string(), + source_type: "message".to_string(), + source_session_id: None, + source_message_id: None, + source_message_seq: None, + source_channel_name: None, + source_chat_id: None, + created_at: 4, + updated_at: 4, + }, + ]; + + let plan = build_memory_maintenance_plan(&memories); + assert_eq!(plan.user_facts.len(), 1); + assert_eq!(plan.preferences.len(), 1); + assert_eq!(plan.behavior_patterns.len(), 1); + assert!(plan.others.is_empty()); + assert_eq!(plan.user_facts[0].content, "用户在做AI产品"); + assert_eq!(plan.preferences[0].content, "偏好简洁表达"); + assert_eq!(plan.behavior_patterns[0].content, "习惯先问方案再要代码"); + } + + #[test] + fn test_upsert_managed_agent_memory_block_inserts_before_reply_rules() { + let original = "# PicoBot 代理配置\n\n## 身份\n- 你是 PicoBot。\n\n## 回复规则\n- 使用中文回复。\n"; + let updated = upsert_managed_agent_memory_block( + original, + "### 用户事实\n- 用户在做AI产品\n\n### 用户偏好\n- 偏好简洁表达", + ); + + let managed_pos = updated.find(MANAGED_AGENT_MEMORY_BLOCK_START).unwrap(); + let reply_rules_pos = updated.find("## 回复规则").unwrap(); + assert!(managed_pos < reply_rules_pos); + assert!(updated.contains(MANAGED_AGENT_MEMORY_TITLE)); + assert!(updated.contains("用户在做AI产品")); + assert!(updated.contains("偏好简洁表达")); + } + } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 5292751..16091f8 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -72,8 +72,8 @@ impl Scheduler { fn sync_config_jobs(&self) -> anyhow::Result<()> { let now = Utc::now(); - for job in &self.config.jobs { - let runtime = RuntimeJob::from_config(job, now, self.config.misfire_policy)?; + for job in self.config.effective_jobs() { + let runtime = RuntimeJob::from_config(&job, now, self.config.misfire_policy)?; self.store.upsert_scheduler_job(&runtime.to_upsert())?; } Ok(()) @@ -484,6 +484,24 @@ async fn execute_internal_event(session_manager: &SessionManager, job: &RuntimeJ tracing::info!(job_id = %job.id, removed, "Scheduler session cleanup completed"); Ok(()) } + "memory_maintenance" => { + let results = session_manager.run_memory_maintenance_for_all_scopes().await?; + for result in &results { + tracing::info!( + job_id = %job.id, + scope_key = %result.scope_key, + user_facts = result.output.user_facts.len(), + preferences = result.output.preferences.len(), + behavior_patterns = result.output.behavior_patterns.len(), + merges = result.output.merges.len(), + conflicts = result.output.conflicts.len(), + low_value = result.output.low_value_ids.len(), + "Scheduler completed memory maintenance model run" + ); + } + tracing::info!(job_id = %job.id, scope_count = results.len(), "Scheduler memory maintenance triggered"); + Ok(()) + } other => anyhow::bail!("unsupported internal scheduler event: {}", other), } } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 95c3bee..56776a0 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -813,6 +813,50 @@ impl SessionStore { Ok(memories) } + pub fn list_memory_scope_keys(&self, scope_kind: &str) -> Result, StorageError> { + let conn = self.conn.lock().expect("session db mutex poisoned"); + let mut stmt = conn.prepare( + " + SELECT DISTINCT scope_key + FROM memories + WHERE scope_kind = ?1 + ORDER BY scope_key ASC + ", + )?; + + let rows = stmt.query_map(params![scope_kind], |row| row.get::<_, String>(0))?; + let mut scope_keys = Vec::new(); + for row in rows { + scope_keys.push(row?); + } + Ok(scope_keys) + } + + pub fn list_memories_for_scope( + &self, + scope_kind: &str, + scope_key: &str, + ) -> Result, StorageError> { + let conn = self.conn.lock().expect("session db mutex poisoned"); + let mut stmt = conn.prepare( + " + SELECT id, scope_kind, scope_key, namespace, memory_key, content, + source_type, source_session_id, source_message_id, source_message_seq, + source_channel_name, source_chat_id, created_at, updated_at + FROM memories + WHERE scope_kind = ?1 AND scope_key = ?2 + ORDER BY updated_at DESC, namespace ASC, memory_key ASC + ", + )?; + + let rows = stmt.query_map(params![scope_kind, scope_key], map_memory_record)?; + let mut memories = Vec::new(); + for row in rows { + memories.push(row?); + } + Ok(memories) + } + pub fn update_memory( &self, input: &MemoryUpsert, @@ -1911,6 +1955,66 @@ mod tests { assert!(hits.iter().any(|memory| memory.memory_key == "quality")); } + #[test] + fn test_memory_scope_listing_and_full_scope_read() { + let store = SessionStore::in_memory().unwrap(); + + store + .put_memory(&MemoryUpsert { + scope_kind: "user".to_string(), + scope_key: "feishu:user-2".to_string(), + namespace: "preferences".to_string(), + memory_key: "style".to_string(), + content: "偏好简洁表达".to_string(), + source_type: "message".to_string(), + source_session_id: Some("feishu:chat-2".to_string()), + source_message_id: Some("msg-2".to_string()), + source_message_seq: Some(2), + source_channel_name: Some("feishu".to_string()), + source_chat_id: Some("chat-2".to_string()), + }) + .unwrap(); + store + .put_memory(&MemoryUpsert { + scope_kind: "user".to_string(), + scope_key: "feishu:user-1".to_string(), + namespace: "profile".to_string(), + memory_key: "work".to_string(), + content: "用户在做AI产品".to_string(), + source_type: "message".to_string(), + source_session_id: Some("feishu:chat-1".to_string()), + source_message_id: Some("msg-1".to_string()), + source_message_seq: Some(1), + source_channel_name: Some("feishu".to_string()), + source_chat_id: Some("chat-1".to_string()), + }) + .unwrap(); + store + .put_memory(&MemoryUpsert { + scope_kind: "user".to_string(), + scope_key: "feishu:user-1".to_string(), + namespace: "patterns".to_string(), + memory_key: "workflow".to_string(), + content: "习惯先问方案再要代码".to_string(), + source_type: "message".to_string(), + source_session_id: Some("feishu:chat-1".to_string()), + source_message_id: Some("msg-3".to_string()), + source_message_seq: Some(3), + source_channel_name: Some("feishu".to_string()), + source_chat_id: Some("chat-1".to_string()), + }) + .unwrap(); + + let scope_keys = store.list_memory_scope_keys("user").unwrap(); + assert_eq!(scope_keys, vec!["feishu:user-1".to_string(), "feishu:user-2".to_string()]); + + let full_scope = store.list_memories_for_scope("user", "feishu:user-1").unwrap(); + assert_eq!(full_scope.len(), 2); + assert!(full_scope.iter().all(|memory| memory.scope_key == "feishu:user-1")); + assert!(full_scope.iter().any(|memory| memory.memory_key == "work")); + assert!(full_scope.iter().any(|memory| memory.memory_key == "workflow")); + } + #[test] fn test_scheduler_job_roundtrip_and_runtime_update() { let store = SessionStore::in_memory().unwrap();