feat: 简化内存维护逻辑,移除不必要的时间戳参数,优化代码可读性
This commit is contained in:
parent
aa7f1d6160
commit
7c48a0f7f9
@ -61,13 +61,8 @@ impl SchedulerMaintenanceService {
|
|||||||
self.session_manager.cleanup_expired_sessions().await
|
self.session_manager.cleanup_expired_sessions().await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_memory_maintenance(
|
async fn run_memory_maintenance(&self) -> Result<Vec<MemoryMaintenanceScopeResult>, AgentError> {
|
||||||
&self,
|
self.session_manager.run_memory_maintenance_for_all_scopes().await
|
||||||
updated_since: Option<i64>,
|
|
||||||
) -> Result<Vec<MemoryMaintenanceScopeResult>, AgentError> {
|
|
||||||
self.session_manager
|
|
||||||
.run_memory_maintenance_for_all_scopes(updated_since)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -77,11 +72,8 @@ impl MaintenanceExecutor for SchedulerMaintenanceService {
|
|||||||
self.cleanup_sessions().await
|
self.cleanup_sessions().await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_memory_maintenance_for_all_scopes(
|
async fn run_memory_maintenance_for_all_scopes(&self) -> anyhow::Result<Vec<MaintenanceRunSummary>> {
|
||||||
&self,
|
self.run_memory_maintenance()
|
||||||
updated_since: Option<i64>,
|
|
||||||
) -> anyhow::Result<Vec<MaintenanceRunSummary>> {
|
|
||||||
self.run_memory_maintenance(updated_since)
|
|
||||||
.await
|
.await
|
||||||
.map(|results| {
|
.map(|results| {
|
||||||
results
|
results
|
||||||
|
|||||||
@ -220,22 +220,10 @@ impl MemoryMaintenanceService {
|
|||||||
|
|
||||||
pub(crate) async fn run_for_all_scopes(
|
pub(crate) async fn run_for_all_scopes(
|
||||||
&self,
|
&self,
|
||||||
updated_since: Option<i64>,
|
|
||||||
) -> Result<Vec<MemoryMaintenanceScopeResult>, AgentError> {
|
) -> Result<Vec<MemoryMaintenanceScopeResult>, AgentError> {
|
||||||
let scope_keys = if let Some(cutoff) = updated_since {
|
let scope_keys = self.store.list_memory_scope_keys("user").map_err(|err| {
|
||||||
self.store
|
|
||||||
.list_memory_scope_keys_updated_since("user", cutoff)
|
|
||||||
.map_err(|err| {
|
|
||||||
AgentError::Other(format!(
|
|
||||||
"list memory scope keys updated since error: {}",
|
|
||||||
err
|
|
||||||
))
|
|
||||||
})?
|
|
||||||
} else {
|
|
||||||
self.store.list_memory_scope_keys("user").map_err(|err| {
|
|
||||||
AgentError::Other(format!("list memory scope keys error: {}", err))
|
AgentError::Other(format!("list memory scope keys error: {}", err))
|
||||||
})?
|
})?;
|
||||||
};
|
|
||||||
let mut results = Vec::new();
|
let mut results = Vec::new();
|
||||||
|
|
||||||
for scope_key in scope_keys {
|
for scope_key in scope_keys {
|
||||||
|
|||||||
@ -32,9 +32,8 @@ impl MemoryMaintenanceCoordinator {
|
|||||||
|
|
||||||
pub(crate) async fn run_for_all_scopes(
|
pub(crate) async fn run_for_all_scopes(
|
||||||
&self,
|
&self,
|
||||||
updated_since: Option<i64>,
|
|
||||||
) -> Result<Vec<MemoryMaintenanceScopeResult>, AgentError> {
|
) -> Result<Vec<MemoryMaintenanceScopeResult>, AgentError> {
|
||||||
self.service()?.run_for_all_scopes(updated_since).await
|
self.service()?.run_for_all_scopes().await
|
||||||
}
|
}
|
||||||
|
|
||||||
fn service(&self) -> Result<MemoryMaintenanceService, AgentError> {
|
fn service(&self) -> Result<MemoryMaintenanceService, AgentError> {
|
||||||
|
|||||||
@ -420,11 +420,8 @@ impl SessionManager {
|
|||||||
|
|
||||||
pub(crate) async fn run_memory_maintenance_for_all_scopes(
|
pub(crate) async fn run_memory_maintenance_for_all_scopes(
|
||||||
&self,
|
&self,
|
||||||
updated_since: Option<i64>,
|
|
||||||
) -> Result<Vec<MemoryMaintenanceScopeResult>, AgentError> {
|
) -> Result<Vec<MemoryMaintenanceScopeResult>, AgentError> {
|
||||||
self.memory_maintenance
|
self.memory_maintenance.run_for_all_scopes().await
|
||||||
.run_for_all_scopes(updated_since)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// 确保 session 存在且未超时,超时则重建
|
/// 确保 session 存在且未超时,超时则重建
|
||||||
@ -1239,18 +1236,34 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn test_run_memory_maintenance_for_all_scopes_returns_empty_when_no_recent_updates() {
|
async fn test_run_memory_maintenance_for_all_scopes_scans_all_scopes_even_without_recent_updates() {
|
||||||
|
let mock_response_content = serde_json::to_string(&json!({
|
||||||
|
"user_facts": ["用户在做AI产品"],
|
||||||
|
"preferences": [],
|
||||||
|
"behavior_patterns": [],
|
||||||
|
"merges": [],
|
||||||
|
"conflicts": [],
|
||||||
|
"low_value_ids": [],
|
||||||
|
"managed_markdown": "### 用户事实\n- 用户在做AI产品"
|
||||||
|
}))
|
||||||
|
.unwrap();
|
||||||
|
let base_url =
|
||||||
|
start_mock_openai_server_with_content(Some(mock_response_content.clone())).await;
|
||||||
|
|
||||||
let provider_config = LLMProviderConfig {
|
let provider_config = LLMProviderConfig {
|
||||||
provider_type: "openai".to_string(),
|
provider_type: "openai".to_string(),
|
||||||
name: "maintenance-provider".to_string(),
|
name: "maintenance-provider".to_string(),
|
||||||
base_url: "http://localhost".to_string(),
|
base_url,
|
||||||
api_key: "test-key".to_string(),
|
api_key: "test-key".to_string(),
|
||||||
extra_headers: HashMap::new(),
|
extra_headers: HashMap::new(),
|
||||||
model_id: "maintenance-model".to_string(),
|
model_id: "maintenance-model".to_string(),
|
||||||
temperature: Some(0.0),
|
temperature: Some(0.0),
|
||||||
max_tokens: Some(256),
|
max_tokens: Some(256),
|
||||||
context_window_tokens: None,
|
context_window_tokens: None,
|
||||||
model_extra: HashMap::new(),
|
model_extra: HashMap::from([(
|
||||||
|
"mock_response_content".to_string(),
|
||||||
|
json!(mock_response_content),
|
||||||
|
)]),
|
||||||
max_tool_iterations: 1,
|
max_tool_iterations: 1,
|
||||||
llm_timeout_secs: 30,
|
llm_timeout_secs: 30,
|
||||||
tool_result_max_chars: 20_000,
|
tool_result_max_chars: 20_000,
|
||||||
@ -1268,7 +1281,7 @@ mod tests {
|
|||||||
)
|
)
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let memory = session_manager
|
session_manager
|
||||||
.store()
|
.store()
|
||||||
.put_memory(&crate::storage::MemoryUpsert {
|
.put_memory(&crate::storage::MemoryUpsert {
|
||||||
scope_kind: "user".to_string(),
|
scope_kind: "user".to_string(),
|
||||||
@ -1286,11 +1299,13 @@ mod tests {
|
|||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
let results = session_manager
|
let results = session_manager
|
||||||
.run_memory_maintenance_for_all_scopes(Some(memory.updated_at + 1))
|
.run_memory_maintenance_for_all_scopes()
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
assert!(results.is_empty());
|
assert_eq!(results.len(), 1);
|
||||||
|
assert_eq!(results[0].scope_key, "feishu:user-1");
|
||||||
|
assert!(results[0].output.user_facts.contains(&"用户在做AI产品".to_string()));
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
@ -1343,7 +1358,7 @@ mod tests {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let results = session_manager
|
let results = session_manager
|
||||||
.run_memory_maintenance_for_all_scopes(None)
|
.run_memory_maintenance_for_all_scopes()
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
|
|
||||||
|
|||||||
@ -52,10 +52,7 @@ pub trait AgentTaskExecutor: Send + Sync {
|
|||||||
pub trait MaintenanceExecutor: Send + Sync {
|
pub trait MaintenanceExecutor: Send + Sync {
|
||||||
async fn cleanup_expired_sessions(&self) -> usize;
|
async fn cleanup_expired_sessions(&self) -> usize;
|
||||||
|
|
||||||
async fn run_memory_maintenance_for_all_scopes(
|
async fn run_memory_maintenance_for_all_scopes(&self) -> anyhow::Result<Vec<MaintenanceRunSummary>>;
|
||||||
&self,
|
|
||||||
updated_since: Option<i64>,
|
|
||||||
) -> anyhow::Result<Vec<MaintenanceRunSummary>>;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Scheduler {
|
pub struct Scheduler {
|
||||||
@ -652,9 +649,7 @@ async fn execute_internal_event(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
"memory_maintenance" => {
|
"memory_maintenance" => {
|
||||||
let results = maintenance_executor
|
let results = maintenance_executor.run_memory_maintenance_for_all_scopes().await?;
|
||||||
.run_memory_maintenance_for_all_scopes(job.last_fired_at)
|
|
||||||
.await?;
|
|
||||||
for result in &results {
|
for result in &results {
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
job_id = %job.id,
|
job_id = %job.id,
|
||||||
@ -1039,10 +1034,7 @@ mod tests {
|
|||||||
0
|
0
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_memory_maintenance_for_all_scopes(
|
async fn run_memory_maintenance_for_all_scopes(&self) -> anyhow::Result<Vec<MaintenanceRunSummary>> {
|
||||||
&self,
|
|
||||||
_updated_since: Option<i64>,
|
|
||||||
) -> anyhow::Result<Vec<MaintenanceRunSummary>> {
|
|
||||||
Ok(Vec::new())
|
Ok(Vec::new())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user