From e2fd8367946aa513c83bb1187034e4e48e2c0142 Mon Sep 17 00:00:00 2001 From: xiaoxixi Date: Tue, 28 Apr 2026 23:10:53 +0800 Subject: [PATCH] =?UTF-8?q?fix(session):=20=E6=B6=88=E6=81=AF=E6=8C=81?= =?UTF-8?q?=E4=B9=85=E5=8C=96=E5=92=8C=E6=81=A2=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 问题:消息只写内存,重启后丢失。from_storage 恢复了 meta 但没加载消息。 修复: - add_message 改为 async,同时写内存和 Storage - compact 场景用 persist=false 避免重复持久化 - clear_session_history 同时清内存和 Storage 消息 - 所有调用点添加 .await 和错误转换 --- src/session/session.rs | 84 ++++++++++++++++++++---------------------- 1 file changed, 40 insertions(+), 44 deletions(-) diff --git a/src/session/session.rs b/src/session/session.rs index 29f7e8f..f336ca1 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -163,27 +163,9 @@ impl Session { self.id.to_string() } - /// 添加消息到历史(仅内存,Phase 3 会扩展为持久化) - pub fn add_message(&mut self, message: ChatMessage) { - let is_user = message.role == "user"; - let now = chrono::Utc::now().timestamp_millis(); - - // Assign seq (in-memory only, persistence in Phase 3) - let _seq = self.seq_counter; - self.seq_counter += 1; - - // Update in-memory state - self.messages.push(message); - self.total_message_count += 1; - if is_user { - self.message_count += 1; - } - self.last_active_at = now; - } - - /// 添加消息到历史并持久化到 Storage(Phase 3 使用) - /// 目前 storage 为 None,此方法退化为 add_message - pub async fn add_message_and_persist(&mut self, message: ChatMessage) -> Result<(), StorageError> { + /// 添加消息到历史并持久化到 Storage + /// 如果 `persist` 为 false,只更新内存(用于 compaction 场景) + pub async fn add_message(&mut self, message: ChatMessage, persist: bool) -> Result<(), StorageError> { let is_user = message.role == "user"; let now = chrono::Utc::now().timestamp_millis(); @@ -191,25 +173,27 @@ impl Session { let seq = self.seq_counter; self.seq_counter += 1; - // Persist to Storage (currently None, wired up in Phase 3) - if let Some(ref storage) = self.storage { - let msg_meta = crate::storage::message::MessageMeta { - id: message.id.clone(), - session_id: self.id.to_string(), - seq, - role: message.role.clone(), - content: message.content.clone(), - media_refs: if message.media_refs.is_empty() { - None - } else { - Some(serde_json::to_string(&message.media_refs).unwrap_or_default()) - }, - tool_call_id: message.tool_call_id.clone(), - tool_name: message.tool_name.clone(), - tool_calls: message.tool_calls.as_ref().map(|tc| serde_json::to_string(tc).unwrap_or_default()), - created_at: now, - }; - storage.append_message_with_retry(&self.id.to_string(), &msg_meta).await?; + // Persist to Storage + if persist { + if let Some(ref storage) = self.storage { + let msg_meta = crate::storage::message::MessageMeta { + id: message.id.clone(), + session_id: self.id.to_string(), + seq, + role: message.role.clone(), + content: message.content.clone(), + media_refs: if message.media_refs.is_empty() { + None + } else { + Some(serde_json::to_string(&message.media_refs).unwrap_or_default()) + }, + tool_call_id: message.tool_call_id.clone(), + tool_name: message.tool_name.clone(), + tool_calls: message.tool_calls.as_ref().map(|tc| serde_json::to_string(tc).unwrap_or_default()), + created_at: now, + }; + storage.append_message_with_retry(&self.id.to_string(), &msg_meta).await?; + } } // Update in-memory state @@ -757,7 +741,8 @@ impl SessionManager { let compressed_count = compressed.len(); session_guard.clear_history(); for msg in compressed { - session_guard.add_message(msg); + session_guard.add_message(msg, false).await + .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; } Ok((None, format!( "Context compressed: {} → {} messages.", @@ -1125,7 +1110,8 @@ impl SessionManager { } let user_message = session_guard.create_user_message(content, media_refs); - session_guard.add_message(user_message); + session_guard.add_message(user_message, true).await + .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; let mut history = session_guard.get_history().to_vec(); @@ -1145,7 +1131,8 @@ impl SessionManager { let result = agent.process(history).await?; for msg in result.emitted_messages { - session_guard.add_message(msg); + session_guard.add_message(msg, true).await + .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; } result.final_response.content @@ -1165,7 +1152,16 @@ impl SessionManager { pub async fn clear_session_history(&self, unified_id: &UnifiedSessionId) -> Result<(), AgentError> { let session = self.get_or_create_session(unified_id).await?; let mut session_guard = session.lock().await; - session_guard.clear_history(); + // Clear in-memory + session_guard.messages.clear(); + session_guard.seq_counter = 1; + session_guard.total_message_count = 0; + session_guard.message_count = 0; + // Clear Storage + if let Some(ref storage) = session_guard.storage { + storage.clear_messages(&session_guard.id.to_string()).await + .map_err(|e| AgentError::Other(format!("failed to clear messages: {}", e)))?; + } Ok(()) } }