feat(session): incremental recovery from storage using compressed timeline
This commit is contained in:
parent
e65130450e
commit
3d29854079
@ -133,9 +133,6 @@ impl Session {
|
|||||||
let session_meta = storage.get_session(&id.to_string()).await
|
let session_meta = storage.get_session(&id.to_string()).await
|
||||||
.map_err(|e| AgentError::Other(format!("failed to load session from storage: {}", e)))?;
|
.map_err(|e| AgentError::Other(format!("failed to load session from storage: {}", e)))?;
|
||||||
|
|
||||||
let messages = storage.load_messages(&id.to_string(), 0).await
|
|
||||||
.map_err(|e| AgentError::Other(format!("failed to load messages from storage: {}", e)))?;
|
|
||||||
|
|
||||||
let mut provider_box = create_provider(provider_config.clone())
|
let mut provider_box = create_provider(provider_config.clone())
|
||||||
.map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?;
|
.map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?;
|
||||||
provider_box.set_storage(storage.clone());
|
provider_box.set_storage(storage.clone());
|
||||||
@ -149,27 +146,86 @@ impl Session {
|
|||||||
let mut compressor = ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config, memory_manager.clone());
|
let mut compressor = ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config, memory_manager.clone());
|
||||||
compressor.set_session_id(Some(id.to_string()));
|
compressor.set_session_id(Some(id.to_string()));
|
||||||
|
|
||||||
// Convert MessageMeta to ChatMessage, then repair damaged tool call chains
|
let mut chat_messages: Vec<ChatMessage> = Vec::new();
|
||||||
let mut chat_messages: Vec<ChatMessage> = messages.into_iter().map(|m| {
|
|
||||||
ChatMessage {
|
if let Some(after_ts) = session_meta.last_compressed_message_at {
|
||||||
id: m.id,
|
// Load last 4 timelines to detect if there are more than 3
|
||||||
role: m.role,
|
let timelines = storage
|
||||||
content: m.content,
|
.load_session_timelines(&id.to_string(), 4)
|
||||||
media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(),
|
.await
|
||||||
timestamp: m.created_at,
|
.unwrap_or_default();
|
||||||
tool_call_id: m.tool_call_id,
|
|
||||||
tool_name: m.tool_name,
|
let has_more_timelines = timelines.len() > 3;
|
||||||
tool_calls: m.tool_calls
|
|
||||||
.and_then(|tc| serde_json::from_str::<Vec<crate::providers::ToolCall>>(&tc).ok())
|
if has_more_timelines {
|
||||||
.filter(|v| !v.is_empty()),
|
chat_messages.push(ChatMessage::user(
|
||||||
source: m.source.and_then(|s| serde_json::from_str(&s).ok()),
|
"[Earlier conversation summaries exist. \
|
||||||
|
Use `timeline_recall` to search if needed.]"
|
||||||
|
));
|
||||||
}
|
}
|
||||||
}).collect();
|
|
||||||
|
|
||||||
repair_tool_call_chains(&mut chat_messages);
|
// Insert latest 3 timelines as context (reversed: oldest first)
|
||||||
|
for tl in timelines.iter().take(3).rev() {
|
||||||
|
chat_messages.push(ChatMessage::user(format!(
|
||||||
|
"[Previous Context]\n{}", tl.content
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
let seq_counter = chat_messages.len() as i64 + 1;
|
// Load raw messages after compressed timestamp
|
||||||
let total_message_count = chat_messages.len() as i64;
|
let tail = storage
|
||||||
|
.load_messages_after_timestamp(&id.to_string(), after_ts)
|
||||||
|
.await
|
||||||
|
.unwrap_or_default();
|
||||||
|
|
||||||
|
let mut tail_msgs: Vec<ChatMessage> = tail.into_iter().map(|m| {
|
||||||
|
ChatMessage {
|
||||||
|
id: m.id,
|
||||||
|
role: m.role,
|
||||||
|
content: m.content,
|
||||||
|
media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(),
|
||||||
|
timestamp: m.created_at,
|
||||||
|
tool_call_id: m.tool_call_id,
|
||||||
|
tool_name: m.tool_name,
|
||||||
|
tool_calls: m.tool_calls
|
||||||
|
.and_then(|tc| serde_json::from_str::<Vec<crate::providers::ToolCall>>(&tc).ok())
|
||||||
|
.filter(|v| !v.is_empty()),
|
||||||
|
source: m.source.and_then(|s| serde_json::from_str(&s).ok()),
|
||||||
|
}
|
||||||
|
}).collect();
|
||||||
|
|
||||||
|
repair_tool_call_chains(&mut tail_msgs);
|
||||||
|
chat_messages.extend(tail_msgs);
|
||||||
|
} else {
|
||||||
|
// No prior compression — load all messages (existing behavior)
|
||||||
|
let messages = storage.load_messages(&id.to_string(), 0).await
|
||||||
|
.map_err(|e| AgentError::Other(format!("failed to load messages from storage: {}", e)))?;
|
||||||
|
|
||||||
|
chat_messages = messages.into_iter().map(|m| {
|
||||||
|
ChatMessage {
|
||||||
|
id: m.id,
|
||||||
|
role: m.role,
|
||||||
|
content: m.content,
|
||||||
|
media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(),
|
||||||
|
timestamp: m.created_at,
|
||||||
|
tool_call_id: m.tool_call_id,
|
||||||
|
tool_name: m.tool_name,
|
||||||
|
tool_calls: m.tool_calls
|
||||||
|
.and_then(|tc| serde_json::from_str::<Vec<crate::providers::ToolCall>>(&tc).ok())
|
||||||
|
.filter(|v| !v.is_empty()),
|
||||||
|
source: m.source.and_then(|s| serde_json::from_str(&s).ok()),
|
||||||
|
}
|
||||||
|
}).collect();
|
||||||
|
|
||||||
|
repair_tool_call_chains(&mut chat_messages);
|
||||||
|
}
|
||||||
|
|
||||||
|
// seq_counter from actual DB max
|
||||||
|
let max_seq = storage
|
||||||
|
.get_max_message_seq(&id.to_string())
|
||||||
|
.await
|
||||||
|
.unwrap_or(0);
|
||||||
|
let seq_counter = max_seq + 1;
|
||||||
|
let total_message_count = session_meta.message_count;
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
id: id.clone(),
|
id: id.clone(),
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user