优化session的锁

This commit is contained in:
xiaoxixi 2026-06-16 22:47:07 +08:00
parent 48c8a51d9a
commit ad7fa70a02

View File

@ -8,6 +8,13 @@ use crate::mcp::get_mcp_status;
use crate::storage::{Storage, StorageError}; use crate::storage::{Storage, StorageError};
use std::sync::Arc as StdArc; use std::sync::Arc as StdArc;
type MessagePersistSnapshot = (
StdArc<Storage>,
String,
crate::storage::message::MessageMeta,
crate::storage::session::SessionMeta,
);
tokio::task_local! { tokio::task_local! {
static CURRENT_SOURCE_SESSION: Option<String>; static CURRENT_SOURCE_SESSION: Option<String>;
} }
@ -82,6 +89,14 @@ pub struct Session {
current_cancel: Option<oneshot::Sender<()>>, current_cancel: Option<oneshot::Sender<()>>,
/// Monotonic counter to detect stale workers /// Monotonic counter to detect stale workers
worker_generation: u64, worker_generation: u64,
/// Monotonic counter for in-memory session mutations.
///
/// Slow work such as memory recall, compression, and title generation runs
/// outside the session lock. Workers capture this version before starting
/// that work and verify it before committing results, so stale snapshots do
/// not overwrite a session that was changed by a command such as /clear or
/// /delete while the slow work was in flight.
state_version: u64,
} }
/// A task to be processed by the per-session agent worker /// A task to be processed by the per-session agent worker
@ -146,6 +161,7 @@ impl Session {
agent_tx: None, agent_tx: None,
current_cancel: None, current_cancel: None,
worker_generation: 0, worker_generation: 0,
state_version: 0,
}) })
} }
@ -322,6 +338,7 @@ impl Session {
agent_tx: None, agent_tx: None,
current_cancel: None, current_cancel: None,
worker_generation: 0, worker_generation: 0,
state_version: 0,
}) })
} }
@ -337,6 +354,15 @@ impl Session {
message: ChatMessage, message: ChatMessage,
persist: bool, persist: bool,
) -> Result<(), StorageError> { ) -> Result<(), StorageError> {
let snapshot = self.add_message_in_memory(message, persist);
persist_added_message(snapshot).await
}
fn add_message_in_memory(
&mut self,
message: ChatMessage,
persist: bool,
) -> Option<MessagePersistSnapshot> {
let is_user = message.role == "user"; let is_user = message.role == "user";
let now = chrono::Utc::now().timestamp_millis(); let now = chrono::Utc::now().timestamp_millis();
@ -344,36 +370,37 @@ impl Session {
let seq = self.seq_counter; let seq = self.seq_counter;
self.seq_counter += 1; self.seq_counter += 1;
// Persist to Storage let persist_snapshot = if persist {
if persist && let Some(ref storage) = self.storage { self.storage.clone().map(|storage| {
let msg_meta = crate::storage::message::MessageMeta { let msg_meta = crate::storage::message::MessageMeta {
id: message.id.clone(), id: message.id.clone(),
session_id: self.id.to_string(), session_id: self.id.to_string(),
seq, seq,
role: message.role.clone(), role: message.role.clone(),
content: message.content.clone(), content: message.content.clone(),
reasoning_content: message.reasoning_content.clone(), reasoning_content: message.reasoning_content.clone(),
media_refs: if message.media_refs.is_empty() { media_refs: if message.media_refs.is_empty() {
None None
} else { } else {
Some(serde_json::to_string(&message.media_refs).unwrap_or_default()) Some(serde_json::to_string(&message.media_refs).unwrap_or_default())
}, },
tool_call_id: message.tool_call_id.clone(), tool_call_id: message.tool_call_id.clone(),
tool_name: message.tool_name.clone(), tool_name: message.tool_name.clone(),
tool_calls: message tool_calls: message
.tool_calls .tool_calls
.as_ref() .as_ref()
.and_then(|tc| serde_json::to_string(tc).ok()), .and_then(|tc| serde_json::to_string(tc).ok()),
source: message source: message
.source .source
.as_ref() .as_ref()
.map(|s| serde_json::to_string(s).unwrap_or_default()), .map(|s| serde_json::to_string(s).unwrap_or_default()),
created_at: now, created_at: now,
}; };
storage (storage, self.id.to_string(), msg_meta)
.append_message_with_retry(&self.id.to_string(), &msg_meta) })
.await?; } else {
} None
};
// Update in-memory state // Update in-memory state
self.messages.push(message); self.messages.push(message);
@ -382,16 +409,30 @@ impl Session {
self.message_count += 1; self.message_count += 1;
} }
self.last_active_at = now; self.last_active_at = now;
self.state_version = self.state_version.wrapping_add(1);
// Sync message_count to Storage persist_snapshot.map(|(storage, session_id, msg_meta)| {
if persist { let session_meta = crate::storage::session::SessionMeta {
tracing::debug!(session_id = %self.id, last_active_at = %now, message_count = %self.message_count, "Persisting session meta after add_message"); id: session_id.clone(),
if let Err(e) = self.persist_session_meta().await { channel: self.id.channel.clone(),
tracing::warn!("failed to persist session meta: {}", e); chat_id: self.id.chat_id.clone(),
} dialog_id: self.id.dialog_id.clone(),
} title: self.title.clone(),
created_at: self.created_at,
Ok(()) last_active_at: self.last_active_at,
message_count: self.message_count,
routing_info: if self.routing_info.is_empty() {
None
} else {
Some(self.routing_info.clone())
},
archived_at: self.archived_at,
deleted_at: None,
last_consolidated_at: self.last_consolidated_at,
last_compressed_message_at: self.last_compressed_message_at,
};
(storage, session_id, msg_meta, session_meta)
})
} }
/// 获取消息历史 /// 获取消息历史
@ -406,6 +447,7 @@ impl Session {
self.seq_counter = 1; self.seq_counter = 1;
self.total_message_count = 0; self.total_message_count = 0;
self.message_count = 0; self.message_count = 0;
self.state_version = self.state_version.wrapping_add(1);
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
tracing::debug!(session_id = %self.id, previous_len = len, "Chat history cleared"); tracing::debug!(session_id = %self.id, previous_len = len, "Chat history cleared");
} }
@ -417,6 +459,7 @@ impl Session {
self.seq_counter = 1; self.seq_counter = 1;
self.total_message_count = 0; self.total_message_count = 0;
self.message_count = 0; self.message_count = 0;
self.state_version = self.state_version.wrapping_add(1);
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
tracing::debug!(session_id = %self.id, previous_len = len, "Chat context reset in memory"); tracing::debug!(session_id = %self.id, previous_len = len, "Chat context reset in memory");
} }
@ -444,43 +487,49 @@ impl Session {
/// 将 session 元数据写回 Storage /// 将 session 元数据写回 Storage
pub async fn persist_session_meta(&self) -> Result<(), StorageError> { pub async fn persist_session_meta(&self) -> Result<(), StorageError> {
if let Some(ref storage) = self.storage { if let Some((storage, meta)) = self.session_meta_snapshot() {
let meta = crate::storage::session::SessionMeta {
id: self.id.to_string(),
channel: self.id.channel.clone(),
chat_id: self.id.chat_id.clone(),
dialog_id: self.id.dialog_id.clone(),
title: self.title.clone(),
created_at: self.created_at,
last_active_at: self.last_active_at,
message_count: self.message_count,
routing_info: if self.routing_info.is_empty() {
None
} else {
Some(self.routing_info.clone())
},
archived_at: self.archived_at,
deleted_at: None,
last_consolidated_at: self.last_consolidated_at,
last_compressed_message_at: self.last_compressed_message_at,
};
storage.upsert_session(&meta).await?; storage.upsert_session(&meta).await?;
} }
Ok(()) Ok(())
} }
fn session_meta_snapshot(
&self,
) -> Option<(StdArc<Storage>, crate::storage::session::SessionMeta)> {
let storage = self.storage.clone()?;
let meta = crate::storage::session::SessionMeta {
id: self.id.to_string(),
channel: self.id.channel.clone(),
chat_id: self.id.chat_id.clone(),
dialog_id: self.id.dialog_id.clone(),
title: self.title.clone(),
created_at: self.created_at,
last_active_at: self.last_active_at,
message_count: self.message_count,
routing_info: if self.routing_info.is_empty() {
None
} else {
Some(self.routing_info.clone())
},
archived_at: self.archived_at,
deleted_at: None,
last_consolidated_at: self.last_consolidated_at,
last_compressed_message_at: self.last_compressed_message_at,
};
Some((storage, meta))
}
/// 检查是否需要自动生成 title5 条用户消息后) /// 检查是否需要自动生成 title5 条用户消息后)
pub fn should_generate_title(&self) -> bool { pub fn should_generate_title(&self) -> bool {
self.title == "新对话" && self.message_count >= 5 self.title == "新对话" && self.message_count >= 5
} }
/// 生成标题(调用 LLM fn title_prompt_snapshot(&self) -> Option<String> {
pub async fn generate_title(&mut self) -> Result<(), AgentError> {
if !self.should_generate_title() { if !self.should_generate_title() {
return Ok(()); return None;
} }
let prompt = format!( Some(format!(
r#"给定以下对话历史生成一个简短的会话标题5-15 个中文字符),概括这个对话的核心内容或用户的主要需求。只返回一个标题,不要解释。 r#"给定以下对话历史生成一个简短的会话标题5-15 个中文字符),概括这个对话的核心内容或用户的主要需求。只返回一个标题,不要解释。
@ -492,38 +541,41 @@ impl Session {
.map(|m| format!("[{}]: {}", m.role, m.content)) .map(|m| format!("[{}]: {}", m.role, m.content))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join("\n") .join("\n")
); ))
let title = self.call_llm_for_title(&prompt).await?;
if !title.is_empty() {
self.title = title.clone();
if let Err(e) = self.persist_session_meta().await {
tracing::warn!("failed to persist title: {}", e);
}
}
Ok(())
} }
/// 调用 LLM 生成标题 fn apply_generated_title(&mut self, title: String) -> bool {
async fn call_llm_for_title(&self, prompt: &str) -> Result<String, AgentError> { if title.is_empty() || !self.should_generate_title() {
use crate::providers::{ChatCompletionRequest, ChatCompletionResponse, Message}; return false;
}
let request = ChatCompletionRequest { self.title = title;
messages: vec![Message::user(prompt.to_string())], self.state_version = self.state_version.wrapping_add(1);
temperature: Some(0.3), true
max_tokens: Some(20), }
tools: None,
fn fresh_context_compressor(&self) -> ContextCompressor {
let compressor_config = ContextCompressionConfig {
protect_first_n: 2,
..Default::default()
}; };
let mut compressor = ContextCompressor::with_config(
self.provider.clone(),
self.provider_config.token_limit,
compressor_config,
self.memory_manager.clone(),
);
compressor.set_session_id(Some(self.id.to_string()));
compressor
}
let response: ChatCompletionResponse = self fn replace_history_in_memory(&mut self, messages: Vec<ChatMessage>) {
.provider self.messages = messages;
.chat(request) self.seq_counter = self.messages.len() as i64 + 1;
.await self.total_message_count = self.messages.len() as i64;
.map_err(|e| AgentError::Other(format!("LLM call failed: {}", e)))?; self.message_count = self.messages.iter().filter(|m| m.role == "user").count() as i64;
self.last_active_at = chrono::Utc::now().timestamp_millis();
Ok(response.content.trim().to_string()) self.state_version = self.state_version.wrapping_add(1);
} }
/// 获取 provider_config 引用 /// 获取 provider_config 引用
@ -1075,24 +1127,39 @@ impl SessionManager {
"compact" => { "compact" => {
if let Some(sid) = current_session_id { if let Some(sid) = current_session_id {
let session = self.get_or_create_session(sid).await?; let session = self.get_or_create_session(sid).await?;
let mut session_guard = session.lock().await; let (original_count, history, mut compressor, base_version) = {
let original_count = session_guard.get_history().len(); let session_guard = session.lock().await;
let history = session_guard.get_history().to_vec(); (
let result = session_guard.compressor.compress_if_needed(history).await?; session_guard.get_history().len(),
session_guard.get_history().to_vec(),
session_guard.fresh_context_compressor(),
session_guard.state_version,
)
};
let result = compressor.compress_if_needed(history).await?;
let compressed_count = result.history.len(); let compressed_count = result.history.len();
if result.created_timelines { let meta_snapshot = {
session_guard.last_compressed_message_at = let mut session_guard = session.lock().await;
Some(chrono::Utc::now().timestamp_millis()); if session_guard.state_version != base_version {
if let Err(e) = session_guard.persist_session_meta().await { return Ok((
tracing::warn!(error = %e, "Failed to persist compression marker after /compact"); None,
"Context changed while compacting; please run /compact again."
.to_string(),
));
} }
} if result.created_timelines {
session_guard.clear_history(); session_guard.last_compressed_message_at =
for msg in result.history { Some(chrono::Utc::now().timestamp_millis());
session_guard }
.add_message(msg, false) session_guard.replace_history_in_memory(result.history);
.await session_guard.session_meta_snapshot()
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; };
if let Some((storage, meta)) = meta_snapshot
&& let Err(e) = storage.upsert_session(&meta).await
{
tracing::warn!(error = %e, "Failed to persist compression marker after /compact");
} }
Ok(( Ok((
None, None,
@ -1304,16 +1371,22 @@ impl SessionManager {
let sid = current_session_id let sid = current_session_id
.ok_or_else(|| AgentError::Other("no active session".to_string()))?; .ok_or_else(|| AgentError::Other("no active session".to_string()))?;
let session = self.get_or_create_session(sid).await?; let session = self.get_or_create_session(sid).await?;
let mut guard = session.lock().await; let msgs = {
let mut msgs: Vec<String> = Vec::new(); let mut guard = session.lock().await;
if guard.current_cancel.take().is_some() { let mut msgs: Vec<String> = Vec::new();
msgs.push("当前任务已发送停止信号。".to_string()); if guard.current_cancel.take().is_some() {
} msgs.push("当前任务已发送停止信号。".to_string());
if guard.agent_tx.take().is_some() { }
msgs.push("消息队列已清空。".to_string()); if guard.agent_tx.take().is_some() {
} msgs.push("消息队列已清空。".to_string());
guard.worker_generation = guard.worker_generation.wrapping_add(1); }
guard.worker_generation = guard.worker_generation.wrapping_add(1);
guard.state_version = guard.state_version.wrapping_add(1);
msgs
};
// Cancel all running background sub-agent tasks for this session // Cancel all running background sub-agent tasks for this session
// after releasing the session lock.
self.sub_agent_manager self.sub_agent_manager
.cancel_by_session(&sid.to_string()) .cancel_by_session(&sid.to_string())
.await; .await;
@ -1670,7 +1743,7 @@ impl SessionManager {
) -> Result<(), AgentError> { ) -> Result<(), AgentError> {
let unified_id = self.resolve_dialog_id(channel, chat_id).await?; let unified_id = self.resolve_dialog_id(channel, chat_id).await?;
let session = self.get_or_create_session(&unified_id).await?; let session = self.get_or_create_session(&unified_id).await?;
{ let persist_snapshot = {
let mut guard = session.lock().await; let mut guard = session.lock().await;
let source = MessageSource { let source = MessageSource {
kind: SourceKind::SystemNotification, kind: SourceKind::SystemNotification,
@ -1681,11 +1754,11 @@ impl SessionManager {
task_id: task_id.map(|s| s.to_string()), task_id: task_id.map(|s| s.to_string()),
}; };
let msg = ChatMessage::assistant_with_source(content, source); let msg = ChatMessage::assistant_with_source(content, source);
guard guard.add_message_in_memory(msg, true)
.add_message(msg, true) };
.await persist_added_message(persist_snapshot)
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; .await
} .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
let outbound = OutboundMessage { let outbound = OutboundMessage {
channel: channel.to_string(), channel: channel.to_string(),
@ -1805,6 +1878,63 @@ impl SessionManager {
} }
} }
async fn maybe_generate_title_outside_lock(session: Arc<Mutex<Session>>) -> Result<(), AgentError> {
use crate::providers::{ChatCompletionRequest, ChatCompletionResponse, Message};
let (provider, prompt) = {
let guard = session.lock().await;
let Some(prompt) = guard.title_prompt_snapshot() else {
return Ok(());
};
(guard.provider.clone(), prompt)
};
let request = ChatCompletionRequest {
messages: vec![Message::user(prompt)],
temperature: Some(0.3),
max_tokens: Some(20),
tools: None,
};
let response: ChatCompletionResponse = provider
.chat(request)
.await
.map_err(|e| AgentError::Other(format!("LLM call failed: {}", e)))?;
let title = response.content.trim().to_string();
let meta_snapshot = {
let mut guard = session.lock().await;
if guard.apply_generated_title(title) {
guard.session_meta_snapshot()
} else {
None
}
};
if let Some((storage, meta)) = meta_snapshot {
storage
.upsert_session(&meta)
.await
.map_err(|e| AgentError::Other(format!("failed to persist title: {}", e)))?;
}
Ok(())
}
async fn persist_added_message(
snapshot: Option<MessagePersistSnapshot>,
) -> Result<(), StorageError> {
let Some((storage, session_id, msg_meta, session_meta)) = snapshot else {
return Ok(());
};
storage
.append_message_with_retry(&session_id, &msg_meta)
.await?;
storage.upsert_session(&session_meta).await?;
Ok(())
}
fn spawn_agent_worker( fn spawn_agent_worker(
mut task_rx: mpsc::UnboundedReceiver<AgentTask>, mut task_rx: mpsc::UnboundedReceiver<AgentTask>,
session: Arc<Mutex<Session>>, session: Arc<Mutex<Session>>,
@ -1845,8 +1975,12 @@ fn spawn_agent_worker(
}); });
} }
// Phase 1: prepare data under session lock // Phase 1: capture a stable session snapshot under lock.
let (agent, history_out, system_prompt_out, cancel_rx) = { // Memory recall and compression happen outside this block so
// /stop and other commands are not blocked behind slow I/O or
// LLM-backed compaction.
let skills_prompt = skills_loader.build_skills_prompt();
let (agent, history_raw, mut compressor, base_version, cancel_rx) = {
let mut guard = session.lock().await; let mut guard = session.lock().await;
if guard.worker_generation != worker_gen { if guard.worker_generation != worker_gen {
@ -1857,7 +1991,9 @@ fn spawn_agent_worker(
task.media.iter().map(|m| m.to_media_ref()).collect(); task.media.iter().map(|m| m.to_media_ref()).collect();
let user_message = let user_message =
guard.create_user_message(&task.content, media_refs); guard.create_user_message(&task.content, media_refs);
if let Err(e) = guard.add_message(user_message, true).await { let user_persist = guard.add_message_in_memory(user_message, true);
drop(guard);
if let Err(e) = persist_added_message(user_persist).await {
tracing::error!(error = %e, "Failed to persist user message"); tracing::error!(error = %e, "Failed to persist user message");
let err_outbound = OutboundMessage { let err_outbound = OutboundMessage {
channel: task_chan.clone(), channel: task_chan.clone(),
@ -1871,61 +2007,12 @@ fn spawn_agent_worker(
let _ = bus.publish_outbound(err_outbound).await; let _ = bus.publish_outbound(err_outbound).await;
return; return;
} }
let mut guard = session.lock().await;
if guard.worker_generation != worker_gen {
return;
}
let history_raw = guard.get_history().to_vec(); let history_raw = guard.get_history().to_vec();
let skills_prompt = skills_loader.build_skills_prompt();
let memory_context = match memory_manager
.recall(
&task.content,
5,
Some(crate::memory::MemoryCategory::Knowledge),
None,
)
.await
{
Ok(entries) if !entries.is_empty() => Some(
entries
.iter()
.map(|e| format!("- {}: {}", e.key, e.content))
.collect::<Vec<_>>()
.join("\n"),
),
Err(e) => {
tracing::warn!(error = %e, "Failed to fetch memory context");
None
}
_ => None,
};
let system_prompt = guard
.build_system_prompt(&skills_prompt, memory_context.as_deref());
let result = guard
.compressor
.compress_if_needed(history_raw)
.await
.map(|r| {
if r.created_timelines {
guard.last_compressed_message_at =
Some(chrono::Utc::now().timestamp_millis());
}
r.history
})
.unwrap_or_else(|e| {
tracing::warn!(
error = %e,
"Context compression failed in worker"
);
guard.get_history().to_vec()
});
let mut history = result;
history.insert(0, ChatMessage::system(system_prompt.clone()));
let now = chrono::Utc::now().timestamp_millis();
guard.last_consolidated_at = Some(now);
let _ = guard.persist_session_meta().await;
let agent = match guard.create_agent_with_notify(notify_tx) { let agent = match guard.create_agent_with_notify(notify_tx) {
Ok(a) => a, Ok(a) => a,
@ -1952,9 +2039,87 @@ fn spawn_agent_worker(
} }
guard.current_cancel = Some(cancel_tx); guard.current_cancel = Some(cancel_tx);
(agent, history, system_prompt, cancel_rx) (
agent,
history_raw,
guard.fresh_context_compressor(),
guard.state_version,
cancel_rx,
)
}; // lock released }; // lock released
let memory_context = match memory_manager
.recall(
&task.content,
5,
Some(crate::memory::MemoryCategory::Knowledge),
None,
)
.await
{
Ok(entries) if !entries.is_empty() => Some(
entries
.iter()
.map(|e| format!("- {}: {}", e.key, e.content))
.collect::<Vec<_>>()
.join("\n"),
),
Err(e) => {
tracing::warn!(error = %e, "Failed to fetch memory context");
None
}
_ => None,
};
let system_prompt_out = {
let guard = session.lock().await;
if guard.worker_generation != worker_gen {
return;
}
guard.build_system_prompt(&skills_prompt, memory_context.as_deref())
};
let compression_result = compressor.compress_if_needed(history_raw).await;
let mut history_out = match compression_result {
Ok(result) => {
let meta_snapshot = {
let mut guard = session.lock().await;
if guard.worker_generation != worker_gen {
return;
}
if guard.state_version != base_version {
tracing::warn!(
session_id = %guard.id,
"Session changed while preparing agent history; dropping stale task"
);
return;
}
if result.created_timelines {
guard.last_compressed_message_at =
Some(chrono::Utc::now().timestamp_millis());
}
guard.last_consolidated_at =
Some(chrono::Utc::now().timestamp_millis());
guard.session_meta_snapshot()
};
if let Some((storage, meta)) = meta_snapshot
&& let Err(e) = storage.upsert_session(&meta).await
{
tracing::warn!(error = %e, "Failed to persist session meta after compression");
}
result.history
}
Err(e) => {
tracing::warn!(error = %e, "Context compression failed in worker");
let guard = session.lock().await;
if guard.worker_generation != worker_gen {
return;
}
guard.get_history().to_vec()
}
};
history_out.insert(0, ChatMessage::system(system_prompt_out.clone()));
// Phase 2 + 3: LLM call with cancellation // Phase 2 + 3: LLM call with cancellation
let session2 = session.clone(); let session2 = session.clone();
let bus2 = bus.clone(); let bus2 = bus.clone();
@ -1975,8 +2140,8 @@ fn spawn_agent_worker(
Err(AgentError::LlmError(ref msg)) Err(AgentError::LlmError(ref msg))
if is_context_overflow_error(msg) => if is_context_overflow_error(msg) =>
{ {
let retry_history = { let (raw, mut retry_compressor, retry_base_version, new_window) = {
let mut guard = session2.lock().await; let guard = session2.lock().await;
let new_window = let new_window =
crate::agent::ContextCompressor::parse_context_limit_from_error(msg) crate::agent::ContextCompressor::parse_context_limit_from_error(msg)
.unwrap_or(guard.compressor_threshold()); .unwrap_or(guard.compressor_threshold());
@ -1985,31 +2150,56 @@ fn spawn_agent_worker(
error = %msg, error = %msg,
"Context overflow in worker — retrying" "Context overflow in worker — retrying"
); );
(
guard.get_history().to_vec(),
guard.fresh_context_compressor(),
guard.state_version,
new_window,
)
};
retry_compressor.set_context_window(new_window);
let retry_result =
match retry_compressor.compress_if_needed(raw).await {
Ok(r) => r,
Err(e) => {
tracing::error!(error = %e, "Retry compression failed");
let err_outbound = OutboundMessage {
channel: chan2,
chat_id: cid2,
content: "Context overflow handling failed."
.to_string(),
reply_to: None,
media: vec![],
metadata: HashMap::new(),
};
let _ = bus2.publish_outbound(err_outbound).await;
return;
}
};
let meta_snapshot = {
let mut guard = session2.lock().await;
if guard.state_version != retry_base_version {
tracing::warn!(
session_id = %guard.id,
"Session changed while retry-compressing after context overflow"
);
return;
}
guard.compressor.set_context_window(new_window); guard.compressor.set_context_window(new_window);
let raw = guard.get_history().to_vec();
let retry_result =
match guard.compressor.compress_if_needed(raw).await {
Ok(r) => r,
Err(e) => {
tracing::error!(error = %e, "Retry compression failed");
let err_outbound = OutboundMessage {
channel: chan2,
chat_id: cid2,
content: "Context overflow handling failed."
.to_string(),
reply_to: None,
media: vec![],
metadata: HashMap::new(),
};
let _ = bus2.publish_outbound(err_outbound).await;
return;
}
};
if retry_result.created_timelines { if retry_result.created_timelines {
guard.last_compressed_message_at = guard.last_compressed_message_at =
Some(chrono::Utc::now().timestamp_millis()); Some(chrono::Utc::now().timestamp_millis());
let _ = guard.persist_session_meta().await;
} }
guard.session_meta_snapshot()
};
if let Some((storage, meta)) = meta_snapshot
&& let Err(e) = storage.upsert_session(&meta).await
{
tracing::warn!(error = %e, "Failed to persist session meta after retry compression");
}
let retry_history = {
let mut retry = retry_result.history; let mut retry = retry_result.history;
retry.insert( retry.insert(
0, 0,
@ -2055,20 +2245,24 @@ fn spawn_agent_worker(
let response = { let response = {
let mut guard = session2.lock().await; let mut guard = session2.lock().await;
let mut persist_snapshots = Vec::new();
for msg in result.emitted_messages { for msg in result.emitted_messages {
guard.add_message(msg, true).await.inspect_err(|e| { persist_snapshots.push(guard.add_message_in_memory(msg, true));
tracing::error!(error = %e, "Failed to persist message")
}).ok();
}
if guard.should_generate_title()
&& let Err(e) = guard.generate_title().await
{
tracing::warn!("failed to generate title: {}", e);
} }
let sent_count = guard.messages.len(); let sent_count = guard.messages.len();
guard.compressor.set_last_api_info(sent_count, result.total_tokens); guard.compressor.set_last_api_info(sent_count, result.total_tokens);
result.final_response.content (result.final_response.content, persist_snapshots)
}; };
let (response, persist_snapshots) = response;
for snapshot in persist_snapshots {
if let Err(e) = persist_added_message(snapshot).await {
tracing::error!(error = %e, "Failed to persist message");
}
}
if let Err(e) = maybe_generate_title_outside_lock(session2.clone()).await {
tracing::warn!("failed to generate title: {}", e);
}
let outbound = OutboundMessage { let outbound = OutboundMessage {
channel: chan2, channel: chan2,
@ -2158,25 +2352,34 @@ impl SessionManager {
unified_id: &UnifiedSessionId, unified_id: &UnifiedSessionId,
) -> Result<(), AgentError> { ) -> Result<(), AgentError> {
let session = self.get_or_create_session(unified_id).await?; let session = self.get_or_create_session(unified_id).await?;
let mut session_guard = session.lock().await; let (storage, session_id, meta_snapshot) = {
// Clear in-memory let mut session_guard = session.lock().await;
session_guard.messages.clear(); // Clear in-memory
session_guard.seq_counter = 1; session_guard.messages.clear();
session_guard.total_message_count = 0; session_guard.seq_counter = 1;
session_guard.message_count = 0; session_guard.total_message_count = 0;
session_guard.last_consolidated_at = None; session_guard.message_count = 0;
session_guard.last_compressed_message_at = None; session_guard.last_consolidated_at = None;
// Clear Storage session_guard.last_compressed_message_at = None;
if let Some(ref storage) = session_guard.storage { session_guard.state_version = session_guard.state_version.wrapping_add(1);
(
session_guard.storage.clone(),
session_guard.id.to_string(),
session_guard.session_meta_snapshot(),
)
};
// Clear Storage outside the session lock.
if let Some(storage) = storage {
storage storage
.clear_messages(&session_guard.id.to_string()) .clear_messages(&session_id)
.await .await
.map_err(|e| AgentError::Other(format!("failed to clear messages: {}", e)))?; .map_err(|e| AgentError::Other(format!("failed to clear messages: {}", e)))?;
} }
session_guard if let Some((storage, meta)) = meta_snapshot {
.persist_session_meta() storage.upsert_session(&meta).await.map_err(|e| {
.await AgentError::Other(format!("failed to persist cleared session: {}", e))
.map_err(|e| AgentError::Other(format!("failed to persist cleared session: {}", e)))?; })?;
}
Ok(()) Ok(())
} }
} }
@ -2235,14 +2438,14 @@ impl OutboundMessenger for SessionManager {
}; };
// Write source-tagged assistant message to target session history // Write source-tagged assistant message to target session history
{ let persist_snapshot = {
let mut guard = session.lock().await; let mut guard = session.lock().await;
let msg = ChatMessage::assistant_with_source(marked_content.clone(), source); let msg = ChatMessage::assistant_with_source(marked_content.clone(), source);
guard guard.add_message_in_memory(msg, true)
.add_message(msg, true) };
.await persist_added_message(persist_snapshot)
.map_err(|e| e.to_string())?; .await
} .map_err(|e| e.to_string())?;
// Restore active dialog if source and target share channel:chat_id but differ in dialog_id // Restore active dialog if source and target share channel:chat_id but differ in dialog_id
if let Some(ref origin_id) = origin_id { if let Some(ref origin_id) = origin_id {