From 4fb102644e81d580144c39989868e1ab7278472d Mon Sep 17 00:00:00 2001 From: ooodc <549496103@qq.com> Date: Mon, 27 Apr 2026 13:05:55 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=9B=B4=E6=96=B0=E8=B0=83=E5=BA=A6?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E4=BB=A5=E6=AF=8F4=E5=B0=8F=E6=97=B6?= =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=86=85=E5=AD=98=E7=BB=B4=E6=8A=A4=EF=BC=8C?= =?UTF-8?q?=E5=B9=B6=E6=B7=BB=E5=8A=A0=E6=A0=B9=E6=8D=AE=E6=9B=B4=E6=96=B0?= =?UTF-8?q?=E6=97=B6=E9=97=B4=E8=BF=87=E6=BB=A4=E5=86=85=E5=AD=98=E8=8C=83?= =?UTF-8?q?=E5=9B=B4=E7=9A=84=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config/mod.rs | 6 ++-- src/gateway/session.rs | 68 +++++++++++++++++++++++++++++++++++++--- src/scheduler/mod.rs | 7 +++-- src/storage/mod.rs | 70 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 142 insertions(+), 9 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 6a890e0..17403c8 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -311,7 +311,7 @@ impl SchedulerConfig { enabled: true, kind: SchedulerJobKind::InternalEvent, schedule: Some(SchedulerSchedule::Cron { - expression: "0 3 * * *".to_string(), + expression: "0 */4 * * *".to_string(), }), startup_delay_secs: 0, interval_secs: 0, @@ -319,7 +319,7 @@ impl SchedulerConfig { payload: serde_json::json!({ "event": "memory_maintenance", "time_zone": time.timezone, - "local_time": "03:00" + "local_time": "every_4_hours" }), }] } @@ -1146,7 +1146,7 @@ mod tests { assert_eq!( effective_jobs[0].resolved_schedule().unwrap(), SchedulerSchedule::Cron { - expression: "0 3 * * *".to_string(), + expression: "0 */4 * * *".to_string(), } ); } diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 6813540..ec7ed88 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -1168,11 +1168,17 @@ impl SessionManager { pub(crate) async fn run_memory_maintenance_for_all_scopes( &self, + updated_since: Option, ) -> Result, AgentError> { - let scope_keys = self - .store - .list_memory_scope_keys("user") - .map_err(|err| AgentError::Other(format!("list memory scope keys error: {}", err)))?; + let scope_keys = if let Some(cutoff) = updated_since { + self.store + .list_memory_scope_keys_updated_since("user", cutoff) + .map_err(|err| AgentError::Other(format!("list memory scope keys updated since error: {}", err)))? + } else { + self.store + .list_memory_scope_keys("user") + .map_err(|err| AgentError::Other(format!("list memory scope keys error: {}", err)))? + }; let mut results = Vec::new(); for scope_key in scope_keys { @@ -2250,6 +2256,60 @@ mod tests { assert!(output.managed_markdown.contains("### 用户事实")); } + #[tokio::test] + async fn test_run_memory_maintenance_for_all_scopes_returns_empty_when_no_recent_updates() { + let provider_config = LLMProviderConfig { + provider_type: "openai".to_string(), + name: "maintenance-provider".to_string(), + base_url: "http://localhost".to_string(), + api_key: "test-key".to_string(), + extra_headers: HashMap::new(), + model_id: "maintenance-model".to_string(), + temperature: Some(0.0), + max_tokens: Some(256), + model_extra: HashMap::new(), + max_tool_iterations: 1, + llm_timeout_secs: 30, + tool_result_max_chars: 20_000, + context_tool_result_trim_chars: 20_000, + }; + + let session_manager = SessionManager::new( + 4, + 100, + false, + "Asia/Shanghai".to_string(), + provider_config.clone(), + HashMap::from([("default".to_string(), provider_config)]), + Arc::new(SkillRuntime::default()), + ) + .unwrap(); + + let memory = session_manager + .store() + .put_memory(&crate::storage::MemoryUpsert { + scope_kind: "user".to_string(), + scope_key: "feishu:user-1".to_string(), + namespace: "profile".to_string(), + memory_key: "work".to_string(), + content: "用户在做AI产品".to_string(), + source_type: "message".to_string(), + source_session_id: None, + source_message_id: None, + source_message_seq: None, + source_channel_name: None, + source_chat_id: None, + }) + .unwrap(); + + let results = session_manager + .run_memory_maintenance_for_all_scopes(Some(memory.updated_at + 1)) + .await + .unwrap(); + + assert!(results.is_empty()); + } + #[test] fn test_apply_memory_maintenance_output_merges_and_deletes_low_value_records() { let store = SessionStore::in_memory().unwrap(); diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index e6627bf..b0fe725 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -501,7 +501,9 @@ async fn execute_internal_event(session_manager: &SessionManager, job: &RuntimeJ Ok(()) } "memory_maintenance" => { - let results = session_manager.run_memory_maintenance_for_all_scopes().await?; + let results = session_manager + .run_memory_maintenance_for_all_scopes(job.last_fired_at) + .await?; for result in &results { tracing::info!( job_id = %job.id, @@ -937,9 +939,10 @@ mod tests { saved.schedule, serde_json::json!({ "type": "cron", - "expression": "0 3 * * *" + "expression": "0 */4 * * *" }) ); + assert_eq!(saved.payload.get("local_time").and_then(|value| value.as_str()), Some("every_4_hours")); assert!(saved.next_fire_at.is_some()); } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 570db75..0a0c288 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -925,6 +925,29 @@ impl SessionStore { Ok(scope_keys) } + pub fn list_memory_scope_keys_updated_since( + &self, + scope_kind: &str, + since_timestamp: i64, + ) -> Result, StorageError> { + let conn = self.conn.lock().expect("session db mutex poisoned"); + let mut stmt = conn.prepare( + " + SELECT DISTINCT scope_key + FROM memories + WHERE scope_kind = ?1 AND updated_at > ?2 + ORDER BY scope_key ASC + ", + )?; + + let rows = stmt.query_map(params![scope_kind, since_timestamp], |row| row.get::<_, String>(0))?; + let mut scope_keys = Vec::new(); + for row in rows { + scope_keys.push(row?); + } + Ok(scope_keys) + } + pub fn list_memories_for_scope( &self, scope_kind: &str, @@ -2429,4 +2452,51 @@ mod tests { assert_eq!(fetched.run_count, 1); assert_eq!(fetched.completed_at, Some(1_700_000_000_100)); } + + #[test] + fn test_list_memory_scope_keys_updated_since_filters_recent_scopes() { + let store = SessionStore::in_memory().unwrap(); + + let first = store + .put_memory(&MemoryUpsert { + scope_kind: "user".to_string(), + scope_key: "feishu:user-1".to_string(), + namespace: "profile".to_string(), + memory_key: "work".to_string(), + content: "用户在做AI产品".to_string(), + source_type: "message".to_string(), + source_session_id: None, + source_message_id: None, + source_message_seq: None, + source_channel_name: None, + source_chat_id: None, + }) + .unwrap(); + + let cutoff = first.updated_at; + + std::thread::sleep(std::time::Duration::from_millis(2)); + + store + .put_memory(&MemoryUpsert { + scope_kind: "user".to_string(), + scope_key: "feishu:user-2".to_string(), + namespace: "preferences".to_string(), + memory_key: "style".to_string(), + content: "偏好简洁表达".to_string(), + source_type: "message".to_string(), + source_session_id: None, + source_message_id: None, + source_message_seq: None, + source_channel_name: None, + source_chat_id: None, + }) + .unwrap(); + + let recent_scope_keys = store + .list_memory_scope_keys_updated_since("user", cutoff) + .unwrap(); + + assert_eq!(recent_scope_keys, vec!["feishu:user-2".to_string()]); + } } \ No newline at end of file