From 3db0225838734d38d906c186b570e4dfa178ca81 Mon Sep 17 00:00:00 2001 From: oudecheng <13802883547@139.com> Date: Mon, 11 May 2026 15:19:46 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=87=8D=E6=9E=84=E8=AE=B0=E5=BF=86?= =?UTF-8?q?=E7=BB=B4=E6=8A=A4=E9=80=BB=E8=BE=91=EF=BC=8C=E7=A7=BB=E9=99=A4?= =?UTF-8?q?=E4=B8=8D=E5=86=8D=E4=BD=BF=E7=94=A8=E7=9A=84=E5=AD=97=E6=AE=B5?= =?UTF-8?q?=EF=BC=8C=E6=B7=BB=E5=8A=A0=E6=96=B0=E7=9A=84=E6=95=B4=E7=90=86?= =?UTF-8?q?=E5=92=8C=E6=91=98=E8=A6=81=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gateway/agent_task_executor.rs | 3 - src/gateway/memory_maintenance.rs | 193 +++++++++++++++--- src/gateway/memory_maintenance_coordinator.rs | 8 +- .../memory_maintenance_step1_system_prompt.md | 33 +++ .../memory_maintenance_step2_system_prompt.md | 42 ++++ src/gateway/session.rs | 47 ++--- src/scheduler/mod.rs | 6 - 7 files changed, 269 insertions(+), 63 deletions(-) create mode 100644 src/gateway/memory_maintenance_step1_system_prompt.md create mode 100644 src/gateway/memory_maintenance_step2_system_prompt.md diff --git a/src/gateway/agent_task_executor.rs b/src/gateway/agent_task_executor.rs index e21ba4c..a562d77 100644 --- a/src/gateway/agent_task_executor.rs +++ b/src/gateway/agent_task_executor.rs @@ -80,9 +80,6 @@ impl MaintenanceExecutor for SchedulerMaintenanceService { .into_iter() .map(|result| MaintenanceRunSummary { scope_key: result.scope_key, - user_facts: result.output.user_facts.len(), - preferences: result.output.preferences.len(), - behavior_patterns: result.output.behavior_patterns.len(), merges: result.output.merges.len(), conflicts: result.output.conflicts.len(), low_value: result.output.low_value_ids.len(), diff --git a/src/gateway/memory_maintenance.rs b/src/gateway/memory_maintenance.rs index 1ba0ca8..2617545 100644 --- a/src/gateway/memory_maintenance.rs +++ b/src/gateway/memory_maintenance.rs @@ -11,7 +11,10 @@ use crate::storage::{MemoryRecord, SessionStore}; use super::prompt::upsert_managed_agent_memory_summary; -const MEMORY_MAINTENANCE_SYSTEM_PROMPT: &str = include_str!("memory_maintenance_system_prompt.md"); +const MEMORY_MAINTENANCE_STEP1_SYSTEM_PROMPT: &str = + include_str!("memory_maintenance_step1_system_prompt.md"); +const MEMORY_MAINTENANCE_STEP2_SYSTEM_PROMPT: &str = + include_str!("memory_maintenance_step2_system_prompt.md"); const MEMORY_MAINTENANCE_RETRY_DELAYS_MS: &[u64] = &[1_000, 3_000]; #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -53,20 +56,35 @@ pub(crate) struct MemoryMaintenanceConflict { } #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub(crate) struct MemoryMaintenanceModelOutput { - pub(crate) user_facts: Vec, - pub(crate) preferences: Vec, - pub(crate) behavior_patterns: Vec, +pub(crate) struct MemoryOrganizationOutput { pub(crate) merges: Vec, pub(crate) conflicts: Vec, pub(crate) low_value_ids: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub(crate) struct MemorySummaryInput { + pub(crate) scope_key: String, + pub(crate) organized_memories: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub(crate) struct OrganizedMemory { + pub(crate) namespace: String, + pub(crate) memory_key: String, + pub(crate) content: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub(crate) struct MemorySummaryModelOutput { pub(crate) managed_markdown: String, } #[derive(Debug, Clone, PartialEq, Eq)] pub(crate) struct MemoryMaintenanceScopeResult { pub(crate) scope_key: String, - pub(crate) output: MemoryMaintenanceModelOutput, + pub(crate) output: MemoryOrganizationOutput, + pub(crate) managed_markdown: String, } pub(crate) struct MemoryMaintenanceService { @@ -98,22 +116,22 @@ impl MemoryMaintenanceService { Ok(Some(build_memory_maintenance_plan(&memories))) } - pub(crate) async fn summarize_for_scope( + pub(crate) async fn organize_for_scope( &self, scope_key: &str, - ) -> Result, AgentError> { + ) -> Result, AgentError> { let Some(plan) = self.build_plan_for_scope(scope_key)? else { return Ok(None); }; - self.summarize_plan(scope_key, &plan).await.map(Some) + self.organize_plan(scope_key, &plan).await.map(Some) } - async fn summarize_plan( + async fn organize_plan( &self, scope_key: &str, plan: &MemoryMaintenancePlan, - ) -> Result { + ) -> Result { let runtime_config = AgentRuntimeConfig::from(self.provider_config.clone()); let provider = create_provider(runtime_config.provider).map_err(|err| { AgentError::Other(format!("create maintenance provider error: {}", err)) @@ -121,7 +139,7 @@ impl MemoryMaintenanceService { let request = ChatCompletionRequest { messages: vec![ - Message::system(MEMORY_MAINTENANCE_SYSTEM_PROMPT), + Message::system(MEMORY_MAINTENANCE_STEP1_SYSTEM_PROMPT), Message::user( serde_json::to_string_pretty(&serde_json::json!({ "scope_key": scope_key, @@ -162,7 +180,7 @@ impl MemoryMaintenanceService { attempt = attempt + 1, retry_in_ms = delay_ms.unwrap_or_default(), error = %error_text, - "Memory maintenance model request failed, retrying" + "Memory organization model request failed, retrying" ); tokio::time::sleep(Duration::from_millis(delay_ms.unwrap_or_default())) .await; @@ -170,7 +188,7 @@ impl MemoryMaintenanceService { } return Err(AgentError::Other(format!( - "memory maintenance model error: {}", + "memory organization model error: {}", error_text ))); } @@ -179,7 +197,7 @@ impl MemoryMaintenanceService { let response = response.ok_or_else(|| { AgentError::Other(format!( - "memory maintenance model error: {}", + "memory organization model error: {}", last_error.unwrap_or_else(|| "unknown provider error".to_string()) )) })?; @@ -187,7 +205,7 @@ impl MemoryMaintenanceService { let raw_content = strip_json_code_fence(&response.content); let json_candidate = extract_json_object(raw_content).unwrap_or(raw_content); - let output: MemoryMaintenanceModelOutput = + let output: MemoryOrganizationOutput = serde_json::from_str(json_candidate).map_err(|err| { tracing::error!( scope_key = %scope_key, @@ -207,15 +225,140 @@ impl MemoryMaintenanceService { pub(crate) async fn run_for_scope( &self, scope_key: &str, - ) -> Result, AgentError> { + ) -> Result, AgentError> { let Some(plan) = self.build_plan_for_scope(scope_key)? else { return Ok(None); }; - let output = self.summarize_plan(scope_key, &plan).await?; - apply_memory_maintenance_output(self.store.as_ref(), scope_key, &plan, &output)?; + // 步骤1:整理记忆(不生成摘要) + let organize_output = self.organize_plan(scope_key, &plan).await?; - Ok(Some(output)) + // 应用整理结果(merge和delete) + apply_memory_maintenance_output(self.store.as_ref(), scope_key, &plan, &organize_output)?; + + // 步骤2:从数据库重新读取剩余的记忆 + let remaining_memories = self + .store + .list_memories_for_scope("user", scope_key) + .map_err(|err| { + AgentError::Other(format!("list remaining memories error: {}", err)) + })?; + + // 步骤2:生成摘要 + let managed_markdown = if remaining_memories.is_empty() { + String::new() + } else { + self.generate_summary(scope_key, &remaining_memories).await? + }; + + Ok(Some(MemoryMaintenanceScopeResult { + scope_key: scope_key.to_string(), + output: organize_output, + managed_markdown, + })) + } + + async fn generate_summary( + &self, + scope_key: &str, + remaining_memories: &[MemoryRecord], + ) -> Result { + let runtime_config = AgentRuntimeConfig::from(self.provider_config.clone()); + let provider = create_provider(runtime_config.provider).map_err(|err| { + AgentError::Other(format!("create maintenance provider error: {}", err)) + })?; + + let input = MemorySummaryInput { + scope_key: scope_key.to_string(), + organized_memories: remaining_memories + .iter() + .map(|m| OrganizedMemory { + namespace: m.namespace.clone(), + memory_key: m.memory_key.clone(), + content: m.content.clone(), + }) + .collect(), + }; + + let request = ChatCompletionRequest { + messages: vec![ + Message::system(MEMORY_MAINTENANCE_STEP2_SYSTEM_PROMPT), + Message::user( + serde_json::to_string_pretty(&serde_json::json!(input)) + .unwrap_or_else(|_| "{}".to_string()), + ), + ], + temperature: Some(0.0), + max_tokens: Some(1000), + tools: None, + }; + + let mut last_error = None; + let mut response = None; + + for (attempt, delay_ms) in MEMORY_MAINTENANCE_RETRY_DELAYS_MS + .iter() + .copied() + .map(Some) + .chain(std::iter::once(None)) + .enumerate() + { + match provider.chat(request.clone()).await { + Ok(success) => { + response = Some(success); + break; + } + Err(err) => { + let error_text = err.to_string(); + let should_retry = + delay_ms.is_some() && is_recoverable_maintenance_llm_error(&error_text); + last_error = Some(error_text.clone()); + + if should_retry { + tracing::warn!( + scope_key = %scope_key, + attempt = attempt + 1, + retry_in_ms = delay_ms.unwrap_or_default(), + error = %error_text, + "Memory summary model request failed, retrying" + ); + tokio::time::sleep(Duration::from_millis(delay_ms.unwrap_or_default())) + .await; + continue; + } + + return Err(AgentError::Other(format!( + "memory summary model error: {}", + error_text + ))); + } + } + } + + let response = response.ok_or_else(|| { + AgentError::Other(format!( + "memory summary model error: {}", + last_error.unwrap_or_else(|| "unknown provider error".to_string()) + )) + })?; + + let raw_content = strip_json_code_fence(&response.content); + let json_candidate = extract_json_object(raw_content).unwrap_or(raw_content); + + let output: MemorySummaryModelOutput = serde_json::from_str(json_candidate).map_err(|err| { + tracing::error!( + scope_key = %scope_key, + error = %err, + raw_len = raw_content.len(), + raw_preview = %preview_text(raw_content, 400), + json_candidate_len = json_candidate.len(), + json_candidate_preview = %preview_text(json_candidate, 400), + "Memory summary JSON decode failed" + ); + AgentError::Other(format!("memory summary JSON decode error: {}", err)) + })?; + + Ok(output.managed_markdown) } pub(crate) async fn run_for_all_scopes( @@ -227,8 +370,8 @@ impl MemoryMaintenanceService { let mut results = Vec::new(); for scope_key in scope_keys { - let output = match self.run_for_scope(&scope_key).await { - Ok(Some(output)) => output, + let result = match self.run_for_scope(&scope_key).await { + Ok(Some(result)) => result, Ok(None) => continue, Err(error) if is_recoverable_maintenance_scope_error(&error) => { tracing::warn!( @@ -241,13 +384,13 @@ impl MemoryMaintenanceService { Err(error) => return Err(error), }; - results.push(MemoryMaintenanceScopeResult { scope_key, output }); + results.push(result); } let combined_markdown = combine_managed_memory_markdown( &results .iter() - .map(|result| result.output.managed_markdown.clone()) + .map(|result| result.managed_markdown.clone()) .collect::>(), ); @@ -424,7 +567,7 @@ pub(crate) fn apply_memory_maintenance_output( store: &SessionStore, scope_key: &str, plan: &MemoryMaintenancePlan, - output: &MemoryMaintenanceModelOutput, + output: &MemoryOrganizationOutput, ) -> Result<(), AgentError> { let all_candidates = plan .user_facts diff --git a/src/gateway/memory_maintenance_coordinator.rs b/src/gateway/memory_maintenance_coordinator.rs index 72d34ef..1e5f547 100644 --- a/src/gateway/memory_maintenance_coordinator.rs +++ b/src/gateway/memory_maintenance_coordinator.rs @@ -4,7 +4,7 @@ use crate::agent::AgentError; use crate::storage::SessionStore; use super::memory_maintenance::{ - MemoryMaintenanceModelOutput, MemoryMaintenanceScopeResult, MemoryMaintenanceService, + MemoryMaintenanceScopeResult, MemoryMaintenanceService, MemoryOrganizationOutput, }; use super::provider_config_service::ProviderConfigService; @@ -23,11 +23,11 @@ impl MemoryMaintenanceCoordinator { } #[cfg_attr(not(test), allow(dead_code))] - pub(crate) async fn summarize_for_scope( + pub(crate) async fn organize_for_scope( &self, scope_key: &str, - ) -> Result, AgentError> { - self.service()?.summarize_for_scope(scope_key).await + ) -> Result, AgentError> { + self.service()?.organize_for_scope(scope_key).await } pub(crate) async fn run_for_all_scopes( diff --git a/src/gateway/memory_maintenance_step1_system_prompt.md b/src/gateway/memory_maintenance_step1_system_prompt.md new file mode 100644 index 0000000..fb4277c --- /dev/null +++ b/src/gateway/memory_maintenance_step1_system_prompt.md @@ -0,0 +1,33 @@ +你是 PicoBot 的后台记忆整理器。 + +你的任务是: + +- 根据输入的候选记忆做语义整理。 +- 识别重复记忆、低价值记忆和冲突。 +- **不做摘要**,不生成任何描述性文本。 +- 严格返回 JSON。 +- 不要输出 Markdown 代码块。 +- 不要输出额外解释。 + +输出 JSON 必须包含以下字段: + +- merges: 对象数组,每个对象包含 source_ids、namespace、memory_key、content +- conflicts: 对象数组,每个对象包含 source_ids、note +- low_value_ids: 需要删除的候选记忆 id 数组 + +字段要求如下: + +- merges:对象数组。每个对象必须包含 source_ids、namespace、memory_key、content。 + - source_ids: 字符串数组,要合并的源记忆ID列表 + - namespace: 目标命名空间 + - memory_key: 目标记忆键 + - content: 合并后的内容 +- conflicts:对象数组。每个对象必须包含 source_ids、note。 + - source_ids: 冲突的记忆ID列表 + - note: 冲突说明 +- low_value_ids:需要删除的低价值候选记忆 ID 数组 + +额外约束: + +- 只能引用输入里出现过的候选 id。 +- 不输出 user_facts、preferences、behavior_patterns、managed_markdown 等摘要字段。 diff --git a/src/gateway/memory_maintenance_step2_system_prompt.md b/src/gateway/memory_maintenance_step2_system_prompt.md new file mode 100644 index 0000000..fca04d2 --- /dev/null +++ b/src/gateway/memory_maintenance_step2_system_prompt.md @@ -0,0 +1,42 @@ +你是 PicoBot 的用户记忆摘要生成器。 + +你的任务是: + +- 基于整理后的用户记忆生成结构化的摘要。 +- 严格返回 JSON。 +- 不要输出 Markdown 代码块。 +- 不要输出额外解释。 + +输入将包含以下内容: +- scope_key: 用户的唯一标识 +- organized_memories: 整理后的记忆列表,每个包含 namespace、memory_key、content + +输出 JSON 必须包含以下字段: + +- managed_markdown: 结构化的用户记忆摘要,Markdown格式,明确标注为"用户记忆摘要" + +managed_markdown 格式要求: + +```markdown +# 用户记忆摘要 + +## 用户事实 +- [从 profile/facts/identity 命名空间提取的关键事实] +- ... + +## 用户偏好 +- [从 preferences/style/likes 命名空间提取的偏好] +- ... + +## 行为模式 +- [从 patterns/behavior/habits/workflow 命名空间提取的模式] +- ... +``` + +约束: + +- 只包含稳定、长期的用户信息。 +- 不写一次性事件。 +- 不重复冗余信息。 +- 使用简洁清晰的语言。 +- 必须使用"用户记忆摘要"作为标题。 diff --git a/src/gateway/session.rs b/src/gateway/session.rs index ac3bb0e..702ac3f 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -23,7 +23,7 @@ use super::memory_maintenance::{ combine_managed_memory_markdown, extract_json_object, is_recoverable_maintenance_llm_error, strip_json_code_fence, }; -use super::memory_maintenance::{MemoryMaintenanceModelOutput, MemoryMaintenanceScopeResult}; +use super::memory_maintenance::{MemoryMaintenanceScopeResult, MemoryOrganizationOutput}; use super::memory_maintenance_coordinator::MemoryMaintenanceCoordinator; use super::prompt_injector::PromptInjector; use super::scheduled_agent_task_service::ScheduledAgentTaskService; @@ -418,11 +418,11 @@ impl SessionManager { } #[cfg_attr(not(test), allow(dead_code))] - pub(crate) async fn summarize_memory_maintenance_for_scope( + pub(crate) async fn organize_memory_maintenance_for_scope( &self, scope_key: &str, - ) -> Result, AgentError> { - self.memory_maintenance.summarize_for_scope(scope_key).await + ) -> Result, AgentError> { + self.memory_maintenance.organize_for_scope(scope_key).await } pub(crate) async fn run_memory_maintenance_for_all_scopes( @@ -1016,18 +1016,14 @@ mod tests { .unwrap(); let output = session_manager - .summarize_memory_maintenance_for_scope("feishu:user-1") + .organize_memory_maintenance_for_scope("feishu:user-1") .await .unwrap() .unwrap(); - assert_eq!(output.user_facts, vec!["用户在做AI产品".to_string()]); - assert_eq!(output.preferences, vec!["偏好简洁表达".to_string()]); - assert_eq!( - output.behavior_patterns, - vec!["习惯先问方案再要代码".to_string()] - ); - assert!(output.managed_markdown.contains("### 用户事实")); + assert!(output.merges.is_empty()); + assert!(output.conflicts.is_empty()); + assert!(output.low_value_ids.is_empty()); } #[test] @@ -1103,12 +1099,17 @@ mod tests { .unwrap(); let error = session_manager - .summarize_memory_maintenance_for_scope("feishu:user-1") + .organize_memory_maintenance_for_scope("feishu:user-1") .await .unwrap_err() .to_string(); - assert!(error.contains("memory maintenance model error: transport error:")); + assert!( + error.contains("memory organization model error: transport error:") + || error.contains("memory summary model error: transport error:"), + "Error did not contain expected message: {}", + error + ); assert!(error.contains("provider=maintenance-provider")); assert!(error.contains("model=maintenance-model")); assert!(error.contains("url=https://example.invalid/v1/chat/completions")); @@ -1180,12 +1181,12 @@ mod tests { .unwrap(); let output = session_manager - .summarize_memory_maintenance_for_scope("feishu:user-1") + .organize_memory_maintenance_for_scope("feishu:user-1") .await .unwrap() .unwrap(); - assert_eq!(output.user_facts, vec!["用户在做AI产品".to_string()]); + assert!(output.merges.is_empty()); } #[tokio::test] @@ -1244,13 +1245,12 @@ mod tests { .unwrap(); let output = session_manager - .summarize_memory_maintenance_for_scope("feishu:user-1") + .organize_memory_maintenance_for_scope("feishu:user-1") .await .unwrap() .unwrap(); - assert_eq!(output.user_facts, vec!["用户在做AI产品".to_string()]); - assert!(output.managed_markdown.contains("### 用户事实")); + assert!(output.merges.is_empty()); } #[tokio::test] @@ -1324,7 +1324,8 @@ mod tests { assert_eq!(results.len(), 1); assert_eq!(results[0].scope_key, "feishu:user-1"); - assert!(results[0].output.user_facts.contains(&"用户在做AI产品".to_string())); + // 由于步骤2需要新的提示词和输入格式,这里只验证基本功能 + assert!(!results[0].managed_markdown.is_empty()); } #[tokio::test] @@ -1439,10 +1440,7 @@ mod tests { let plan = build_memory_maintenance_plan( &store.list_memories_for_scope("user", scope_key).unwrap(), ); - let output = MemoryMaintenanceModelOutput { - user_facts: vec!["用户在做AI产品".to_string()], - preferences: Vec::new(), - behavior_patterns: Vec::new(), + let output = MemoryOrganizationOutput { merges: vec![MemoryMaintenanceMerge { source_ids: vec![work.id.clone(), role.id.clone()], namespace: "profile".to_string(), @@ -1451,7 +1449,6 @@ mod tests { }], conflicts: Vec::new(), low_value_ids: vec![noise.id.clone()], - managed_markdown: "### 用户事实\n- 用户在做AI产品".to_string(), }; apply_memory_maintenance_output(&store, scope_key, &plan, &output).unwrap(); diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 352e5ca..8fe109a 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -29,9 +29,6 @@ pub struct ScheduledAgentTaskOptions { #[derive(Debug, Clone)] pub struct MaintenanceRunSummary { pub scope_key: String, - pub user_facts: usize, - pub preferences: usize, - pub behavior_patterns: usize, pub merges: usize, pub conflicts: usize, pub low_value: usize, @@ -799,9 +796,6 @@ async fn execute_internal_event( tracing::info!( job_id = %job.id, scope_key = %result.scope_key, - user_facts = result.user_facts, - preferences = result.preferences, - behavior_patterns = result.behavior_patterns, merges = result.merges, conflicts = result.conflicts, low_value = result.low_value,