From 3d298540793e86f92480e5af9dec1982d1b43dfe Mon Sep 17 00:00:00 2001 From: xiaoxixi Date: Sun, 10 May 2026 14:41:48 +0800 Subject: [PATCH] feat(session): incremental recovery from storage using compressed timeline --- src/session/session.rs | 98 +++++++++++++++++++++++++++++++++--------- 1 file changed, 77 insertions(+), 21 deletions(-) diff --git a/src/session/session.rs b/src/session/session.rs index 1627abd..ec68050 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -133,9 +133,6 @@ impl Session { let session_meta = storage.get_session(&id.to_string()).await .map_err(|e| AgentError::Other(format!("failed to load session from storage: {}", e)))?; - let messages = storage.load_messages(&id.to_string(), 0).await - .map_err(|e| AgentError::Other(format!("failed to load messages from storage: {}", e)))?; - let mut provider_box = create_provider(provider_config.clone()) .map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?; provider_box.set_storage(storage.clone()); @@ -149,27 +146,86 @@ impl Session { let mut compressor = ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config, memory_manager.clone()); compressor.set_session_id(Some(id.to_string())); - // Convert MessageMeta to ChatMessage, then repair damaged tool call chains - let mut chat_messages: Vec = messages.into_iter().map(|m| { - ChatMessage { - id: m.id, - role: m.role, - content: m.content, - media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(), - timestamp: m.created_at, - tool_call_id: m.tool_call_id, - tool_name: m.tool_name, - tool_calls: m.tool_calls - .and_then(|tc| serde_json::from_str::>(&tc).ok()) - .filter(|v| !v.is_empty()), - source: m.source.and_then(|s| serde_json::from_str(&s).ok()), + let mut chat_messages: Vec = Vec::new(); + + if let Some(after_ts) = session_meta.last_compressed_message_at { + // Load last 4 timelines to detect if there are more than 3 + let timelines = storage + .load_session_timelines(&id.to_string(), 4) + .await + .unwrap_or_default(); + + let has_more_timelines = timelines.len() > 3; + + if has_more_timelines { + chat_messages.push(ChatMessage::user( + "[Earlier conversation summaries exist. \ + Use `timeline_recall` to search if needed.]" + )); } - }).collect(); - repair_tool_call_chains(&mut chat_messages); + // Insert latest 3 timelines as context (reversed: oldest first) + for tl in timelines.iter().take(3).rev() { + chat_messages.push(ChatMessage::user(format!( + "[Previous Context]\n{}", tl.content + ))); + } - let seq_counter = chat_messages.len() as i64 + 1; - let total_message_count = chat_messages.len() as i64; + // Load raw messages after compressed timestamp + let tail = storage + .load_messages_after_timestamp(&id.to_string(), after_ts) + .await + .unwrap_or_default(); + + let mut tail_msgs: Vec = tail.into_iter().map(|m| { + ChatMessage { + id: m.id, + role: m.role, + content: m.content, + media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(), + timestamp: m.created_at, + tool_call_id: m.tool_call_id, + tool_name: m.tool_name, + tool_calls: m.tool_calls + .and_then(|tc| serde_json::from_str::>(&tc).ok()) + .filter(|v| !v.is_empty()), + source: m.source.and_then(|s| serde_json::from_str(&s).ok()), + } + }).collect(); + + repair_tool_call_chains(&mut tail_msgs); + chat_messages.extend(tail_msgs); + } else { + // No prior compression — load all messages (existing behavior) + let messages = storage.load_messages(&id.to_string(), 0).await + .map_err(|e| AgentError::Other(format!("failed to load messages from storage: {}", e)))?; + + chat_messages = messages.into_iter().map(|m| { + ChatMessage { + id: m.id, + role: m.role, + content: m.content, + media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(), + timestamp: m.created_at, + tool_call_id: m.tool_call_id, + tool_name: m.tool_name, + tool_calls: m.tool_calls + .and_then(|tc| serde_json::from_str::>(&tc).ok()) + .filter(|v| !v.is_empty()), + source: m.source.and_then(|s| serde_json::from_str(&s).ok()), + } + }).collect(); + + repair_tool_call_chains(&mut chat_messages); + } + + // seq_counter from actual DB max + let max_seq = storage + .get_max_message_seq(&id.to_string()) + .await + .unwrap_or(0); + let seq_counter = max_seq + 1; + let total_message_count = session_meta.message_count; Ok(Self { id: id.clone(),