From ad7fa70a026affede29b5216a05ef31a3ff57dc8 Mon Sep 17 00:00:00 2001 From: xiaoxixi Date: Tue, 16 Jun 2026 22:47:07 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96session=E7=9A=84=E9=94=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/session/session.rs | 663 +++++++++++++++++++++++++++-------------- 1 file changed, 433 insertions(+), 230 deletions(-) diff --git a/src/session/session.rs b/src/session/session.rs index 5d90819..9d68a9c 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -8,6 +8,13 @@ use crate::mcp::get_mcp_status; use crate::storage::{Storage, StorageError}; use std::sync::Arc as StdArc; +type MessagePersistSnapshot = ( + StdArc, + String, + crate::storage::message::MessageMeta, + crate::storage::session::SessionMeta, +); + tokio::task_local! { static CURRENT_SOURCE_SESSION: Option; } @@ -82,6 +89,14 @@ pub struct Session { current_cancel: Option>, /// Monotonic counter to detect stale workers worker_generation: u64, + /// Monotonic counter for in-memory session mutations. + /// + /// Slow work such as memory recall, compression, and title generation runs + /// outside the session lock. Workers capture this version before starting + /// that work and verify it before committing results, so stale snapshots do + /// not overwrite a session that was changed by a command such as /clear or + /// /delete while the slow work was in flight. + state_version: u64, } /// A task to be processed by the per-session agent worker @@ -146,6 +161,7 @@ impl Session { agent_tx: None, current_cancel: None, worker_generation: 0, + state_version: 0, }) } @@ -322,6 +338,7 @@ impl Session { agent_tx: None, current_cancel: None, worker_generation: 0, + state_version: 0, }) } @@ -337,6 +354,15 @@ impl Session { message: ChatMessage, persist: bool, ) -> Result<(), StorageError> { + let snapshot = self.add_message_in_memory(message, persist); + persist_added_message(snapshot).await + } + + fn add_message_in_memory( + &mut self, + message: ChatMessage, + persist: bool, + ) -> Option { let is_user = message.role == "user"; let now = chrono::Utc::now().timestamp_millis(); @@ -344,36 +370,37 @@ impl Session { let seq = self.seq_counter; self.seq_counter += 1; - // Persist to Storage - if persist && let Some(ref storage) = self.storage { - let msg_meta = crate::storage::message::MessageMeta { - id: message.id.clone(), - session_id: self.id.to_string(), - seq, - role: message.role.clone(), - content: message.content.clone(), - reasoning_content: message.reasoning_content.clone(), - media_refs: if message.media_refs.is_empty() { - None - } else { - Some(serde_json::to_string(&message.media_refs).unwrap_or_default()) - }, - tool_call_id: message.tool_call_id.clone(), - tool_name: message.tool_name.clone(), - tool_calls: message - .tool_calls - .as_ref() - .and_then(|tc| serde_json::to_string(tc).ok()), - source: message - .source - .as_ref() - .map(|s| serde_json::to_string(s).unwrap_or_default()), - created_at: now, - }; - storage - .append_message_with_retry(&self.id.to_string(), &msg_meta) - .await?; - } + let persist_snapshot = if persist { + self.storage.clone().map(|storage| { + let msg_meta = crate::storage::message::MessageMeta { + id: message.id.clone(), + session_id: self.id.to_string(), + seq, + role: message.role.clone(), + content: message.content.clone(), + reasoning_content: message.reasoning_content.clone(), + media_refs: if message.media_refs.is_empty() { + None + } else { + Some(serde_json::to_string(&message.media_refs).unwrap_or_default()) + }, + tool_call_id: message.tool_call_id.clone(), + tool_name: message.tool_name.clone(), + tool_calls: message + .tool_calls + .as_ref() + .and_then(|tc| serde_json::to_string(tc).ok()), + source: message + .source + .as_ref() + .map(|s| serde_json::to_string(s).unwrap_or_default()), + created_at: now, + }; + (storage, self.id.to_string(), msg_meta) + }) + } else { + None + }; // Update in-memory state self.messages.push(message); @@ -382,16 +409,30 @@ impl Session { self.message_count += 1; } self.last_active_at = now; + self.state_version = self.state_version.wrapping_add(1); - // Sync message_count to Storage - if persist { - tracing::debug!(session_id = %self.id, last_active_at = %now, message_count = %self.message_count, "Persisting session meta after add_message"); - if let Err(e) = self.persist_session_meta().await { - tracing::warn!("failed to persist session meta: {}", e); - } - } - - Ok(()) + persist_snapshot.map(|(storage, session_id, msg_meta)| { + let session_meta = crate::storage::session::SessionMeta { + id: session_id.clone(), + channel: self.id.channel.clone(), + chat_id: self.id.chat_id.clone(), + dialog_id: self.id.dialog_id.clone(), + title: self.title.clone(), + created_at: self.created_at, + last_active_at: self.last_active_at, + message_count: self.message_count, + routing_info: if self.routing_info.is_empty() { + None + } else { + Some(self.routing_info.clone()) + }, + archived_at: self.archived_at, + deleted_at: None, + last_consolidated_at: self.last_consolidated_at, + last_compressed_message_at: self.last_compressed_message_at, + }; + (storage, session_id, msg_meta, session_meta) + }) } /// 获取消息历史 @@ -406,6 +447,7 @@ impl Session { self.seq_counter = 1; self.total_message_count = 0; self.message_count = 0; + self.state_version = self.state_version.wrapping_add(1); #[cfg(debug_assertions)] tracing::debug!(session_id = %self.id, previous_len = len, "Chat history cleared"); } @@ -417,6 +459,7 @@ impl Session { self.seq_counter = 1; self.total_message_count = 0; self.message_count = 0; + self.state_version = self.state_version.wrapping_add(1); #[cfg(debug_assertions)] tracing::debug!(session_id = %self.id, previous_len = len, "Chat context reset in memory"); } @@ -444,43 +487,49 @@ impl Session { /// 将 session 元数据写回 Storage pub async fn persist_session_meta(&self) -> Result<(), StorageError> { - if let Some(ref storage) = self.storage { - let meta = crate::storage::session::SessionMeta { - id: self.id.to_string(), - channel: self.id.channel.clone(), - chat_id: self.id.chat_id.clone(), - dialog_id: self.id.dialog_id.clone(), - title: self.title.clone(), - created_at: self.created_at, - last_active_at: self.last_active_at, - message_count: self.message_count, - routing_info: if self.routing_info.is_empty() { - None - } else { - Some(self.routing_info.clone()) - }, - archived_at: self.archived_at, - deleted_at: None, - last_consolidated_at: self.last_consolidated_at, - last_compressed_message_at: self.last_compressed_message_at, - }; + if let Some((storage, meta)) = self.session_meta_snapshot() { storage.upsert_session(&meta).await?; } Ok(()) } + fn session_meta_snapshot( + &self, + ) -> Option<(StdArc, crate::storage::session::SessionMeta)> { + let storage = self.storage.clone()?; + let meta = crate::storage::session::SessionMeta { + id: self.id.to_string(), + channel: self.id.channel.clone(), + chat_id: self.id.chat_id.clone(), + dialog_id: self.id.dialog_id.clone(), + title: self.title.clone(), + created_at: self.created_at, + last_active_at: self.last_active_at, + message_count: self.message_count, + routing_info: if self.routing_info.is_empty() { + None + } else { + Some(self.routing_info.clone()) + }, + archived_at: self.archived_at, + deleted_at: None, + last_consolidated_at: self.last_consolidated_at, + last_compressed_message_at: self.last_compressed_message_at, + }; + Some((storage, meta)) + } + /// 检查是否需要自动生成 title(5 条用户消息后) pub fn should_generate_title(&self) -> bool { self.title == "新对话" && self.message_count >= 5 } - /// 生成标题(调用 LLM) - pub async fn generate_title(&mut self) -> Result<(), AgentError> { + fn title_prompt_snapshot(&self) -> Option { if !self.should_generate_title() { - return Ok(()); + return None; } - let prompt = format!( + Some(format!( r#"给定以下对话历史,生成一个简短的会话标题(5-15 个中文字符),概括这个对话的核心内容或用户的主要需求。只返回一个标题,不要解释。 历史: @@ -492,38 +541,41 @@ impl Session { .map(|m| format!("[{}]: {}", m.role, m.content)) .collect::>() .join("\n") - ); - - let title = self.call_llm_for_title(&prompt).await?; - - if !title.is_empty() { - self.title = title.clone(); - if let Err(e) = self.persist_session_meta().await { - tracing::warn!("failed to persist title: {}", e); - } - } - - Ok(()) + )) } - /// 调用 LLM 生成标题 - async fn call_llm_for_title(&self, prompt: &str) -> Result { - use crate::providers::{ChatCompletionRequest, ChatCompletionResponse, Message}; + fn apply_generated_title(&mut self, title: String) -> bool { + if title.is_empty() || !self.should_generate_title() { + return false; + } - let request = ChatCompletionRequest { - messages: vec![Message::user(prompt.to_string())], - temperature: Some(0.3), - max_tokens: Some(20), - tools: None, + self.title = title; + self.state_version = self.state_version.wrapping_add(1); + true + } + + fn fresh_context_compressor(&self) -> ContextCompressor { + let compressor_config = ContextCompressionConfig { + protect_first_n: 2, + ..Default::default() }; + let mut compressor = ContextCompressor::with_config( + self.provider.clone(), + self.provider_config.token_limit, + compressor_config, + self.memory_manager.clone(), + ); + compressor.set_session_id(Some(self.id.to_string())); + compressor + } - let response: ChatCompletionResponse = self - .provider - .chat(request) - .await - .map_err(|e| AgentError::Other(format!("LLM call failed: {}", e)))?; - - Ok(response.content.trim().to_string()) + fn replace_history_in_memory(&mut self, messages: Vec) { + self.messages = messages; + self.seq_counter = self.messages.len() as i64 + 1; + self.total_message_count = self.messages.len() as i64; + self.message_count = self.messages.iter().filter(|m| m.role == "user").count() as i64; + self.last_active_at = chrono::Utc::now().timestamp_millis(); + self.state_version = self.state_version.wrapping_add(1); } /// 获取 provider_config 引用 @@ -1075,24 +1127,39 @@ impl SessionManager { "compact" => { if let Some(sid) = current_session_id { let session = self.get_or_create_session(sid).await?; - let mut session_guard = session.lock().await; - let original_count = session_guard.get_history().len(); - let history = session_guard.get_history().to_vec(); - let result = session_guard.compressor.compress_if_needed(history).await?; + let (original_count, history, mut compressor, base_version) = { + let session_guard = session.lock().await; + ( + session_guard.get_history().len(), + session_guard.get_history().to_vec(), + session_guard.fresh_context_compressor(), + session_guard.state_version, + ) + }; + + let result = compressor.compress_if_needed(history).await?; let compressed_count = result.history.len(); - if result.created_timelines { - session_guard.last_compressed_message_at = - Some(chrono::Utc::now().timestamp_millis()); - if let Err(e) = session_guard.persist_session_meta().await { - tracing::warn!(error = %e, "Failed to persist compression marker after /compact"); + let meta_snapshot = { + let mut session_guard = session.lock().await; + if session_guard.state_version != base_version { + return Ok(( + None, + "Context changed while compacting; please run /compact again." + .to_string(), + )); } - } - session_guard.clear_history(); - for msg in result.history { - session_guard - .add_message(msg, false) - .await - .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; + if result.created_timelines { + session_guard.last_compressed_message_at = + Some(chrono::Utc::now().timestamp_millis()); + } + session_guard.replace_history_in_memory(result.history); + session_guard.session_meta_snapshot() + }; + + if let Some((storage, meta)) = meta_snapshot + && let Err(e) = storage.upsert_session(&meta).await + { + tracing::warn!(error = %e, "Failed to persist compression marker after /compact"); } Ok(( None, @@ -1304,16 +1371,22 @@ impl SessionManager { let sid = current_session_id .ok_or_else(|| AgentError::Other("no active session".to_string()))?; let session = self.get_or_create_session(sid).await?; - let mut guard = session.lock().await; - let mut msgs: Vec = Vec::new(); - if guard.current_cancel.take().is_some() { - msgs.push("当前任务已发送停止信号。".to_string()); - } - if guard.agent_tx.take().is_some() { - msgs.push("消息队列已清空。".to_string()); - } - guard.worker_generation = guard.worker_generation.wrapping_add(1); + let msgs = { + let mut guard = session.lock().await; + let mut msgs: Vec = Vec::new(); + if guard.current_cancel.take().is_some() { + msgs.push("当前任务已发送停止信号。".to_string()); + } + if guard.agent_tx.take().is_some() { + msgs.push("消息队列已清空。".to_string()); + } + guard.worker_generation = guard.worker_generation.wrapping_add(1); + guard.state_version = guard.state_version.wrapping_add(1); + msgs + }; + // Cancel all running background sub-agent tasks for this session + // after releasing the session lock. self.sub_agent_manager .cancel_by_session(&sid.to_string()) .await; @@ -1670,7 +1743,7 @@ impl SessionManager { ) -> Result<(), AgentError> { let unified_id = self.resolve_dialog_id(channel, chat_id).await?; let session = self.get_or_create_session(&unified_id).await?; - { + let persist_snapshot = { let mut guard = session.lock().await; let source = MessageSource { kind: SourceKind::SystemNotification, @@ -1681,11 +1754,11 @@ impl SessionManager { task_id: task_id.map(|s| s.to_string()), }; let msg = ChatMessage::assistant_with_source(content, source); - guard - .add_message(msg, true) - .await - .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; - } + guard.add_message_in_memory(msg, true) + }; + persist_added_message(persist_snapshot) + .await + .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; let outbound = OutboundMessage { channel: channel.to_string(), @@ -1805,6 +1878,63 @@ impl SessionManager { } } +async fn maybe_generate_title_outside_lock(session: Arc>) -> Result<(), AgentError> { + use crate::providers::{ChatCompletionRequest, ChatCompletionResponse, Message}; + + let (provider, prompt) = { + let guard = session.lock().await; + let Some(prompt) = guard.title_prompt_snapshot() else { + return Ok(()); + }; + (guard.provider.clone(), prompt) + }; + + let request = ChatCompletionRequest { + messages: vec![Message::user(prompt)], + temperature: Some(0.3), + max_tokens: Some(20), + tools: None, + }; + + let response: ChatCompletionResponse = provider + .chat(request) + .await + .map_err(|e| AgentError::Other(format!("LLM call failed: {}", e)))?; + let title = response.content.trim().to_string(); + + let meta_snapshot = { + let mut guard = session.lock().await; + if guard.apply_generated_title(title) { + guard.session_meta_snapshot() + } else { + None + } + }; + + if let Some((storage, meta)) = meta_snapshot { + storage + .upsert_session(&meta) + .await + .map_err(|e| AgentError::Other(format!("failed to persist title: {}", e)))?; + } + + Ok(()) +} + +async fn persist_added_message( + snapshot: Option, +) -> Result<(), StorageError> { + let Some((storage, session_id, msg_meta, session_meta)) = snapshot else { + return Ok(()); + }; + + storage + .append_message_with_retry(&session_id, &msg_meta) + .await?; + storage.upsert_session(&session_meta).await?; + Ok(()) +} + fn spawn_agent_worker( mut task_rx: mpsc::UnboundedReceiver, session: Arc>, @@ -1845,8 +1975,12 @@ fn spawn_agent_worker( }); } - // Phase 1: prepare data under session lock - let (agent, history_out, system_prompt_out, cancel_rx) = { + // Phase 1: capture a stable session snapshot under lock. + // Memory recall and compression happen outside this block so + // /stop and other commands are not blocked behind slow I/O or + // LLM-backed compaction. + let skills_prompt = skills_loader.build_skills_prompt(); + let (agent, history_raw, mut compressor, base_version, cancel_rx) = { let mut guard = session.lock().await; if guard.worker_generation != worker_gen { @@ -1857,7 +1991,9 @@ fn spawn_agent_worker( task.media.iter().map(|m| m.to_media_ref()).collect(); let user_message = guard.create_user_message(&task.content, media_refs); - if let Err(e) = guard.add_message(user_message, true).await { + let user_persist = guard.add_message_in_memory(user_message, true); + drop(guard); + if let Err(e) = persist_added_message(user_persist).await { tracing::error!(error = %e, "Failed to persist user message"); let err_outbound = OutboundMessage { channel: task_chan.clone(), @@ -1871,61 +2007,12 @@ fn spawn_agent_worker( let _ = bus.publish_outbound(err_outbound).await; return; } + let mut guard = session.lock().await; + if guard.worker_generation != worker_gen { + return; + } let history_raw = guard.get_history().to_vec(); - let skills_prompt = skills_loader.build_skills_prompt(); - - let memory_context = match memory_manager - .recall( - &task.content, - 5, - Some(crate::memory::MemoryCategory::Knowledge), - None, - ) - .await - { - Ok(entries) if !entries.is_empty() => Some( - entries - .iter() - .map(|e| format!("- {}: {}", e.key, e.content)) - .collect::>() - .join("\n"), - ), - Err(e) => { - tracing::warn!(error = %e, "Failed to fetch memory context"); - None - } - _ => None, - }; - - let system_prompt = guard - .build_system_prompt(&skills_prompt, memory_context.as_deref()); - - let result = guard - .compressor - .compress_if_needed(history_raw) - .await - .map(|r| { - if r.created_timelines { - guard.last_compressed_message_at = - Some(chrono::Utc::now().timestamp_millis()); - } - r.history - }) - .unwrap_or_else(|e| { - tracing::warn!( - error = %e, - "Context compression failed in worker" - ); - guard.get_history().to_vec() - }); - - let mut history = result; - history.insert(0, ChatMessage::system(system_prompt.clone())); - - let now = chrono::Utc::now().timestamp_millis(); - guard.last_consolidated_at = Some(now); - let _ = guard.persist_session_meta().await; let agent = match guard.create_agent_with_notify(notify_tx) { Ok(a) => a, @@ -1952,9 +2039,87 @@ fn spawn_agent_worker( } guard.current_cancel = Some(cancel_tx); - (agent, history, system_prompt, cancel_rx) + ( + agent, + history_raw, + guard.fresh_context_compressor(), + guard.state_version, + cancel_rx, + ) }; // lock released + let memory_context = match memory_manager + .recall( + &task.content, + 5, + Some(crate::memory::MemoryCategory::Knowledge), + None, + ) + .await + { + Ok(entries) if !entries.is_empty() => Some( + entries + .iter() + .map(|e| format!("- {}: {}", e.key, e.content)) + .collect::>() + .join("\n"), + ), + Err(e) => { + tracing::warn!(error = %e, "Failed to fetch memory context"); + None + } + _ => None, + }; + + let system_prompt_out = { + let guard = session.lock().await; + if guard.worker_generation != worker_gen { + return; + } + guard.build_system_prompt(&skills_prompt, memory_context.as_deref()) + }; + + let compression_result = compressor.compress_if_needed(history_raw).await; + let mut history_out = match compression_result { + Ok(result) => { + let meta_snapshot = { + let mut guard = session.lock().await; + if guard.worker_generation != worker_gen { + return; + } + if guard.state_version != base_version { + tracing::warn!( + session_id = %guard.id, + "Session changed while preparing agent history; dropping stale task" + ); + return; + } + if result.created_timelines { + guard.last_compressed_message_at = + Some(chrono::Utc::now().timestamp_millis()); + } + guard.last_consolidated_at = + Some(chrono::Utc::now().timestamp_millis()); + guard.session_meta_snapshot() + }; + if let Some((storage, meta)) = meta_snapshot + && let Err(e) = storage.upsert_session(&meta).await + { + tracing::warn!(error = %e, "Failed to persist session meta after compression"); + } + result.history + } + Err(e) => { + tracing::warn!(error = %e, "Context compression failed in worker"); + let guard = session.lock().await; + if guard.worker_generation != worker_gen { + return; + } + guard.get_history().to_vec() + } + }; + history_out.insert(0, ChatMessage::system(system_prompt_out.clone())); + // Phase 2 + 3: LLM call with cancellation let session2 = session.clone(); let bus2 = bus.clone(); @@ -1975,8 +2140,8 @@ fn spawn_agent_worker( Err(AgentError::LlmError(ref msg)) if is_context_overflow_error(msg) => { - let retry_history = { - let mut guard = session2.lock().await; + let (raw, mut retry_compressor, retry_base_version, new_window) = { + let guard = session2.lock().await; let new_window = crate::agent::ContextCompressor::parse_context_limit_from_error(msg) .unwrap_or(guard.compressor_threshold()); @@ -1985,31 +2150,56 @@ fn spawn_agent_worker( error = %msg, "Context overflow in worker — retrying" ); + ( + guard.get_history().to_vec(), + guard.fresh_context_compressor(), + guard.state_version, + new_window, + ) + }; + retry_compressor.set_context_window(new_window); + let retry_result = + match retry_compressor.compress_if_needed(raw).await { + Ok(r) => r, + Err(e) => { + tracing::error!(error = %e, "Retry compression failed"); + let err_outbound = OutboundMessage { + channel: chan2, + chat_id: cid2, + content: "Context overflow handling failed." + .to_string(), + reply_to: None, + media: vec![], + metadata: HashMap::new(), + }; + let _ = bus2.publish_outbound(err_outbound).await; + return; + } + }; + + let meta_snapshot = { + let mut guard = session2.lock().await; + if guard.state_version != retry_base_version { + tracing::warn!( + session_id = %guard.id, + "Session changed while retry-compressing after context overflow" + ); + return; + } guard.compressor.set_context_window(new_window); - let raw = guard.get_history().to_vec(); - let retry_result = - match guard.compressor.compress_if_needed(raw).await { - Ok(r) => r, - Err(e) => { - tracing::error!(error = %e, "Retry compression failed"); - let err_outbound = OutboundMessage { - channel: chan2, - chat_id: cid2, - content: "Context overflow handling failed." - .to_string(), - reply_to: None, - media: vec![], - metadata: HashMap::new(), - }; - let _ = bus2.publish_outbound(err_outbound).await; - return; - } - }; if retry_result.created_timelines { guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis()); - let _ = guard.persist_session_meta().await; } + guard.session_meta_snapshot() + }; + if let Some((storage, meta)) = meta_snapshot + && let Err(e) = storage.upsert_session(&meta).await + { + tracing::warn!(error = %e, "Failed to persist session meta after retry compression"); + } + + let retry_history = { let mut retry = retry_result.history; retry.insert( 0, @@ -2055,20 +2245,24 @@ fn spawn_agent_worker( let response = { let mut guard = session2.lock().await; + let mut persist_snapshots = Vec::new(); for msg in result.emitted_messages { - guard.add_message(msg, true).await.inspect_err(|e| { - tracing::error!(error = %e, "Failed to persist message") - }).ok(); - } - if guard.should_generate_title() - && let Err(e) = guard.generate_title().await - { - tracing::warn!("failed to generate title: {}", e); + persist_snapshots.push(guard.add_message_in_memory(msg, true)); } let sent_count = guard.messages.len(); guard.compressor.set_last_api_info(sent_count, result.total_tokens); - result.final_response.content + (result.final_response.content, persist_snapshots) }; + let (response, persist_snapshots) = response; + for snapshot in persist_snapshots { + if let Err(e) = persist_added_message(snapshot).await { + tracing::error!(error = %e, "Failed to persist message"); + } + } + + if let Err(e) = maybe_generate_title_outside_lock(session2.clone()).await { + tracing::warn!("failed to generate title: {}", e); + } let outbound = OutboundMessage { channel: chan2, @@ -2158,25 +2352,34 @@ impl SessionManager { unified_id: &UnifiedSessionId, ) -> Result<(), AgentError> { let session = self.get_or_create_session(unified_id).await?; - let mut session_guard = session.lock().await; - // Clear in-memory - session_guard.messages.clear(); - session_guard.seq_counter = 1; - session_guard.total_message_count = 0; - session_guard.message_count = 0; - session_guard.last_consolidated_at = None; - session_guard.last_compressed_message_at = None; - // Clear Storage - if let Some(ref storage) = session_guard.storage { + let (storage, session_id, meta_snapshot) = { + let mut session_guard = session.lock().await; + // Clear in-memory + session_guard.messages.clear(); + session_guard.seq_counter = 1; + session_guard.total_message_count = 0; + session_guard.message_count = 0; + session_guard.last_consolidated_at = None; + session_guard.last_compressed_message_at = None; + session_guard.state_version = session_guard.state_version.wrapping_add(1); + ( + session_guard.storage.clone(), + session_guard.id.to_string(), + session_guard.session_meta_snapshot(), + ) + }; + // Clear Storage outside the session lock. + if let Some(storage) = storage { storage - .clear_messages(&session_guard.id.to_string()) + .clear_messages(&session_id) .await .map_err(|e| AgentError::Other(format!("failed to clear messages: {}", e)))?; } - session_guard - .persist_session_meta() - .await - .map_err(|e| AgentError::Other(format!("failed to persist cleared session: {}", e)))?; + if let Some((storage, meta)) = meta_snapshot { + storage.upsert_session(&meta).await.map_err(|e| { + AgentError::Other(format!("failed to persist cleared session: {}", e)) + })?; + } Ok(()) } } @@ -2235,14 +2438,14 @@ impl OutboundMessenger for SessionManager { }; // Write source-tagged assistant message to target session history - { + let persist_snapshot = { let mut guard = session.lock().await; let msg = ChatMessage::assistant_with_source(marked_content.clone(), source); - guard - .add_message(msg, true) - .await - .map_err(|e| e.to_string())?; - } + guard.add_message_in_memory(msg, true) + }; + persist_added_message(persist_snapshot) + .await + .map_err(|e| e.to_string())?; // Restore active dialog if source and target share channel:chat_id but differ in dialog_id if let Some(ref origin_id) = origin_id {