From e65130450ea5b36c65f8620dd972890863e6bc41 Mon Sep 17 00:00:00 2001 From: xiaoxixi Date: Sun, 10 May 2026 14:40:13 +0800 Subject: [PATCH] feat(compressor): return CompressionResult with created_timelines flag, record marker in handle_message and /compact --- src/agent/context_compressor.rs | 22 +++++++++++++++------- src/session/session.rs | 27 ++++++++++++++++++++------- 2 files changed, 35 insertions(+), 14 deletions(-) diff --git a/src/agent/context_compressor.rs b/src/agent/context_compressor.rs index 081f473..f0ab650 100644 --- a/src/agent/context_compressor.rs +++ b/src/agent/context_compressor.rs @@ -70,6 +70,12 @@ pub struct ContextCompressor { session_id: Option, } +/// Result of context compression. +pub struct CompressionResult { + pub history: Vec, + pub created_timelines: bool, +} + impl ContextCompressor { /// Create a new compressor with the given provider, context window size, and memory manager. pub fn new( @@ -173,11 +179,11 @@ impl ContextCompressor { pub async fn compress_if_needed( &self, mut history: Vec, - ) -> Result, AgentError> { + ) -> Result { // Check if compression is needed let tokens = estimate_tokens(&history); if tokens <= self.threshold() { - return Ok(history); + return Ok(CompressionResult { history, created_timelines: false }); } #[cfg(debug_assertions)] @@ -200,11 +206,12 @@ impl ContextCompressor { ); } if tokens_after <= self.threshold() { - return Ok(history); + return Ok(CompressionResult { history, created_timelines: false }); } // LLM summarization pass let mut current_history = history; + let mut created_timelines = false; for pass in 0..self.config.max_passes { let tokens = estimate_tokens(¤t_history); if tokens <= self.threshold() { @@ -221,6 +228,7 @@ impl ContextCompressor { match self.compress_once(¤t_history).await { Ok(Some(compressed)) => { current_history = compressed; + created_timelines = true; } Ok(None) => { // No more compressible content @@ -270,7 +278,7 @@ impl ContextCompressor { "Context compression completed" ); - Ok(current_history) + Ok(CompressionResult { history: current_history, created_timelines }) } /// Try to extract the actual context token limit from an LLM error message. @@ -623,7 +631,7 @@ mod tests { ChatMessage::tool("call1", "bash", &"x".repeat(3000)), ]; - let result = compressor.compress_if_needed(messages).await.unwrap(); + let result = compressor.compress_if_needed(messages).await.unwrap().history; let tool_msg = result.iter().find(|m| m.role == "tool").unwrap(); assert!( @@ -677,7 +685,7 @@ mod tests { ChatMessage::user("Q4"), // 8: LAST, is user → B2B triggers ]; - let result = compressor.compress_if_needed(messages).await.unwrap(); + let result = compressor.compress_if_needed(messages).await.unwrap().history; // B2A: "Q1" must appear exactly once let q1_count = result.iter().filter(|m| m.role == "user" && m.content == "Q1").count(); @@ -721,7 +729,7 @@ mod tests { ChatMessage::tool("t3", "bash", &big), ]; - let result = compressor.compress_if_needed(messages).await.unwrap(); + let result = compressor.compress_if_needed(messages).await.unwrap().history; // After hard truncation: head (1) + trunc_note (1) + tail (2) = 4 messages assert!(result.len() < 7, "expected truncation reduction, got {} messages", result.len()); diff --git a/src/session/session.rs b/src/session/session.rs index 0cc60d2..1627abd 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -858,12 +858,16 @@ impl SessionManager { let mut session_guard = session.lock().await; let original_count = session_guard.get_history().len(); let history = session_guard.get_history().to_vec(); - let compressed = session_guard.compressor + let result = session_guard.compressor .compress_if_needed(history) .await?; - let compressed_count = compressed.len(); + let compressed_count = result.history.len(); + if result.created_timelines { + session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis()); + let _ = session_guard.persist_session_meta().await; + } session_guard.clear_history(); - for msg in compressed { + for msg in result.history { session_guard.add_message(msg, false).await .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; } @@ -1346,13 +1350,17 @@ impl SessionManager { // in context compression (system prompt is dynamic and should not be persisted). let system_prompt = session_guard.build_system_prompt(&skills_prompt, memory_context.as_deref()); - let mut history = session_guard.compressor + let result = session_guard.compressor .compress_if_needed(history) .await?; + if result.created_timelines { + session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis()); + } + let mut history = result.history; history.insert(0, ChatMessage::system(system_prompt.clone())); - // Advance consolidation pointer — future compressions skip already-processed messages + // Persist consolidation state let now = chrono::Utc::now().timestamp_millis(); session_guard.last_consolidated_at = Some(now); if let Err(e) = session_guard.persist_session_meta().await { @@ -1376,7 +1384,11 @@ impl SessionManager { ); session_guard.compressor.set_context_window(new_window); let raw = session_guard.get_history().to_vec(); - let mut retry = session_guard.compressor.compress_if_needed(raw).await?; + let retry_result = session_guard.compressor.compress_if_needed(raw).await?; + if retry_result.created_timelines { + session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis()); + } + let mut retry = retry_result.history; retry.insert(0, ChatMessage::system(system_prompt)); agent.process(retry).await? } @@ -1493,7 +1505,8 @@ impl SessionManager { // in context compression (system prompt is dynamic and should not be persisted). let mut history = session_guard.compressor .compress_if_needed(history) - .await?; + .await? + .history; history.insert(0, ChatMessage::system(full_system_prompt));