From 7c48a0f7f932762967eb711d859b2aa7900a2906 Mon Sep 17 00:00:00 2001 From: ooodc <549496103@qq.com> Date: Tue, 5 May 2026 19:40:07 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E7=AE=80=E5=8C=96=E5=86=85=E5=AD=98?= =?UTF-8?q?=E7=BB=B4=E6=8A=A4=E9=80=BB=E8=BE=91=EF=BC=8C=E7=A7=BB=E9=99=A4?= =?UTF-8?q?=E4=B8=8D=E5=BF=85=E8=A6=81=E7=9A=84=E6=97=B6=E9=97=B4=E6=88=B3?= =?UTF-8?q?=E5=8F=82=E6=95=B0=EF=BC=8C=E4=BC=98=E5=8C=96=E4=BB=A3=E7=A0=81?= =?UTF-8?q?=E5=8F=AF=E8=AF=BB=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gateway/agent_task_executor.rs | 16 ++------ src/gateway/memory_maintenance.rs | 18 ++------- src/gateway/memory_maintenance_coordinator.rs | 3 +- src/gateway/session.rs | 37 +++++++++++++------ src/scheduler/mod.rs | 14 ++----- 5 files changed, 37 insertions(+), 51 deletions(-) diff --git a/src/gateway/agent_task_executor.rs b/src/gateway/agent_task_executor.rs index 1b2e7ce..e21ba4c 100644 --- a/src/gateway/agent_task_executor.rs +++ b/src/gateway/agent_task_executor.rs @@ -61,13 +61,8 @@ impl SchedulerMaintenanceService { self.session_manager.cleanup_expired_sessions().await } - async fn run_memory_maintenance( - &self, - updated_since: Option, - ) -> Result, AgentError> { - self.session_manager - .run_memory_maintenance_for_all_scopes(updated_since) - .await + async fn run_memory_maintenance(&self) -> Result, AgentError> { + self.session_manager.run_memory_maintenance_for_all_scopes().await } } @@ -77,11 +72,8 @@ impl MaintenanceExecutor for SchedulerMaintenanceService { self.cleanup_sessions().await } - async fn run_memory_maintenance_for_all_scopes( - &self, - updated_since: Option, - ) -> anyhow::Result> { - self.run_memory_maintenance(updated_since) + async fn run_memory_maintenance_for_all_scopes(&self) -> anyhow::Result> { + self.run_memory_maintenance() .await .map(|results| { results diff --git a/src/gateway/memory_maintenance.rs b/src/gateway/memory_maintenance.rs index 5653bb5..1ba0ca8 100644 --- a/src/gateway/memory_maintenance.rs +++ b/src/gateway/memory_maintenance.rs @@ -220,22 +220,10 @@ impl MemoryMaintenanceService { pub(crate) async fn run_for_all_scopes( &self, - updated_since: Option, ) -> Result, AgentError> { - let scope_keys = if let Some(cutoff) = updated_since { - self.store - .list_memory_scope_keys_updated_since("user", cutoff) - .map_err(|err| { - AgentError::Other(format!( - "list memory scope keys updated since error: {}", - err - )) - })? - } else { - self.store.list_memory_scope_keys("user").map_err(|err| { - AgentError::Other(format!("list memory scope keys error: {}", err)) - })? - }; + let scope_keys = self.store.list_memory_scope_keys("user").map_err(|err| { + AgentError::Other(format!("list memory scope keys error: {}", err)) + })?; let mut results = Vec::new(); for scope_key in scope_keys { diff --git a/src/gateway/memory_maintenance_coordinator.rs b/src/gateway/memory_maintenance_coordinator.rs index 7e22c5f..72d34ef 100644 --- a/src/gateway/memory_maintenance_coordinator.rs +++ b/src/gateway/memory_maintenance_coordinator.rs @@ -32,9 +32,8 @@ impl MemoryMaintenanceCoordinator { pub(crate) async fn run_for_all_scopes( &self, - updated_since: Option, ) -> Result, AgentError> { - self.service()?.run_for_all_scopes(updated_since).await + self.service()?.run_for_all_scopes().await } fn service(&self) -> Result { diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 14056af..7a1babd 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -420,11 +420,8 @@ impl SessionManager { pub(crate) async fn run_memory_maintenance_for_all_scopes( &self, - updated_since: Option, ) -> Result, AgentError> { - self.memory_maintenance - .run_for_all_scopes(updated_since) - .await + self.memory_maintenance.run_for_all_scopes().await } /// 确保 session 存在且未超时,超时则重建 @@ -1239,18 +1236,34 @@ mod tests { } #[tokio::test] - async fn test_run_memory_maintenance_for_all_scopes_returns_empty_when_no_recent_updates() { + async fn test_run_memory_maintenance_for_all_scopes_scans_all_scopes_even_without_recent_updates() { + let mock_response_content = serde_json::to_string(&json!({ + "user_facts": ["用户在做AI产品"], + "preferences": [], + "behavior_patterns": [], + "merges": [], + "conflicts": [], + "low_value_ids": [], + "managed_markdown": "### 用户事实\n- 用户在做AI产品" + })) + .unwrap(); + let base_url = + start_mock_openai_server_with_content(Some(mock_response_content.clone())).await; + let provider_config = LLMProviderConfig { provider_type: "openai".to_string(), name: "maintenance-provider".to_string(), - base_url: "http://localhost".to_string(), + base_url, api_key: "test-key".to_string(), extra_headers: HashMap::new(), model_id: "maintenance-model".to_string(), temperature: Some(0.0), max_tokens: Some(256), context_window_tokens: None, - model_extra: HashMap::new(), + model_extra: HashMap::from([( + "mock_response_content".to_string(), + json!(mock_response_content), + )]), max_tool_iterations: 1, llm_timeout_secs: 30, tool_result_max_chars: 20_000, @@ -1268,7 +1281,7 @@ mod tests { ) .unwrap(); - let memory = session_manager + session_manager .store() .put_memory(&crate::storage::MemoryUpsert { scope_kind: "user".to_string(), @@ -1286,11 +1299,13 @@ mod tests { .unwrap(); let results = session_manager - .run_memory_maintenance_for_all_scopes(Some(memory.updated_at + 1)) + .run_memory_maintenance_for_all_scopes() .await .unwrap(); - assert!(results.is_empty()); + assert_eq!(results.len(), 1); + assert_eq!(results[0].scope_key, "feishu:user-1"); + assert!(results[0].output.user_facts.contains(&"用户在做AI产品".to_string())); } #[tokio::test] @@ -1343,7 +1358,7 @@ mod tests { } let results = session_manager - .run_memory_maintenance_for_all_scopes(None) + .run_memory_maintenance_for_all_scopes() .await .unwrap(); diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index fd7487d..ac4eabd 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -52,10 +52,7 @@ pub trait AgentTaskExecutor: Send + Sync { pub trait MaintenanceExecutor: Send + Sync { async fn cleanup_expired_sessions(&self) -> usize; - async fn run_memory_maintenance_for_all_scopes( - &self, - updated_since: Option, - ) -> anyhow::Result>; + async fn run_memory_maintenance_for_all_scopes(&self) -> anyhow::Result>; } pub struct Scheduler { @@ -652,9 +649,7 @@ async fn execute_internal_event( Ok(()) } "memory_maintenance" => { - let results = maintenance_executor - .run_memory_maintenance_for_all_scopes(job.last_fired_at) - .await?; + let results = maintenance_executor.run_memory_maintenance_for_all_scopes().await?; for result in &results { tracing::info!( job_id = %job.id, @@ -1039,10 +1034,7 @@ mod tests { 0 } - async fn run_memory_maintenance_for_all_scopes( - &self, - _updated_since: Option, - ) -> anyhow::Result> { + async fn run_memory_maintenance_for_all_scopes(&self) -> anyhow::Result> { Ok(Vec::new()) } }