diff --git a/src/session/session.rs b/src/session/session.rs index e4ff787..0cc60d2 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -70,6 +70,7 @@ pub struct Session { /// Timestamp (Unix ms) of the last consolidation. /// Messages before this time have been compressed into memory. pub last_consolidated_at: Option, + pub last_compressed_message_at: Option, memory_manager: Arc, } @@ -116,6 +117,7 @@ impl Session { storage, routing_info, last_consolidated_at: None, + last_compressed_message_at: None, memory_manager, }) } @@ -185,6 +187,7 @@ impl Session { storage: Some(storage), routing_info: session_meta.routing_info.unwrap_or_default(), last_consolidated_at: session_meta.last_consolidated_at, + last_compressed_message_at: session_meta.last_compressed_message_at, memory_manager, }) } @@ -313,6 +316,7 @@ impl Session { }, deleted_at: None, last_consolidated_at: self.last_consolidated_at, + last_compressed_message_at: self.last_compressed_message_at, }; storage.upsert_session(&meta).await?; } @@ -980,6 +984,7 @@ impl SessionManager { routing_info: if routing_info.is_empty() { None } else { Some(routing_info.clone()) }, deleted_at: None, last_consolidated_at: None, + last_compressed_message_at: None, }; self.storage.upsert_session(&meta).await .map_err(|e| AgentError::Other(format!("failed to create session in storage: {}", e)))?; diff --git a/src/storage/mod.rs b/src/storage/mod.rs index c697e74..ce97c34 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -42,6 +42,7 @@ impl Storage { routing_info TEXT, deleted_at INTEGER, last_consolidated_at INTEGER, + last_compressed_message_at INTEGER, UNIQUE(channel, chat_id, dialog_id) ) "#, @@ -179,6 +180,16 @@ impl Storage { .await .ok(); + // Migration: add last_compressed_message_at column if not exists + sqlx::query( + r#" + ALTER TABLE sessions ADD COLUMN last_compressed_message_at INTEGER + "#, + ) + .execute(&self.pool) + .await + .ok(); + sqlx::query( r#" CREATE TABLE IF NOT EXISTS llm_calls ( @@ -314,15 +325,16 @@ impl Storage { pub async fn upsert_session(&self, meta: &crate::storage::session::SessionMeta) -> Result<(), StorageError> { sqlx::query( r#" - INSERT INTO sessions (id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + INSERT INTO sessions (id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET title = excluded.title, last_active_at = excluded.last_active_at, message_count = excluded.message_count, routing_info = excluded.routing_info, deleted_at = excluded.deleted_at, - last_consolidated_at = excluded.last_consolidated_at + last_consolidated_at = excluded.last_consolidated_at, + last_compressed_message_at = excluded.last_compressed_message_at "#, ) .bind(&meta.id) @@ -336,6 +348,7 @@ impl Storage { .bind(&meta.routing_info) .bind(meta.deleted_at) .bind(meta.last_consolidated_at) + .bind(meta.last_compressed_message_at) .execute(self.pool()) .await?; @@ -345,7 +358,7 @@ impl Storage { pub async fn get_session(&self, id: &str) -> Result { let row = sqlx::query( r#" - SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at + SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at FROM sessions WHERE id = ? AND deleted_at IS NULL "#, ) @@ -366,6 +379,7 @@ impl Storage { routing_info: row.get("routing_info"), deleted_at: row.get("deleted_at"), last_consolidated_at: row.get("last_consolidated_at"), + last_compressed_message_at: row.get("last_compressed_message_at"), }) } @@ -377,7 +391,7 @@ impl Storage { ) -> Result, StorageError> { let rows = sqlx::query( r#" - SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at + SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at FROM sessions WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL ORDER BY last_active_at DESC @@ -404,6 +418,7 @@ impl Storage { routing_info: row.get("routing_info"), deleted_at: row.get("deleted_at"), last_consolidated_at: row.get("last_consolidated_at"), + last_compressed_message_at: row.get("last_compressed_message_at"), }) .collect()) } @@ -451,7 +466,7 @@ impl Storage { let cutoff = chrono::Utc::now().timestamp_millis() - ttl_millis; let row = sqlx::query( r#" - SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at + SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at FROM sessions WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL AND last_active_at > ? ORDER BY last_active_at DESC @@ -477,6 +492,7 @@ impl Storage { routing_info: row.get("routing_info"), deleted_at: row.get("deleted_at"), last_consolidated_at: row.get("last_consolidated_at"), + last_compressed_message_at: row.get("last_compressed_message_at"), })), None => Ok(None), } @@ -561,7 +577,7 @@ impl Storage { ) -> Result, StorageError> { let rows = sqlx::query( r#" - SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at + SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at FROM sessions WHERE deleted_at IS NULL ORDER BY last_active_at DESC @@ -586,6 +602,7 @@ impl Storage { routing_info: row.get("routing_info"), deleted_at: row.get("deleted_at"), last_consolidated_at: row.get("last_consolidated_at"), + last_compressed_message_at: row.get("last_compressed_message_at"), }) .collect()) } @@ -691,6 +708,7 @@ mod tests { routing_info: Some(r#"{"type":"cli"}"#.to_string()), deleted_at: None, last_consolidated_at: None, + last_compressed_message_at: None, }; storage.upsert_session(&meta).await.unwrap(); @@ -726,6 +744,7 @@ mod tests { routing_info: None, deleted_at: None, last_consolidated_at: None, + last_compressed_message_at: None, }; storage.upsert_session(&meta).await.unwrap(); } @@ -752,6 +771,7 @@ mod tests { routing_info: None, deleted_at: None, last_consolidated_at: None, + last_compressed_message_at: None, }; storage.upsert_session(&meta).await.unwrap(); @@ -778,6 +798,7 @@ mod tests { routing_info: None, deleted_at: None, last_consolidated_at: None, + last_compressed_message_at: None, }; storage.upsert_session(&session_meta).await.unwrap(); @@ -819,6 +840,7 @@ mod tests { routing_info: None, deleted_at: None, last_consolidated_at: None, + last_compressed_message_at: None, }; storage.upsert_session(&meta).await.unwrap(); diff --git a/src/storage/session.rs b/src/storage/session.rs index 72ae6d2..b9a1408 100644 --- a/src/storage/session.rs +++ b/src/storage/session.rs @@ -13,4 +13,5 @@ pub struct SessionMeta { pub routing_info: Option, pub deleted_at: Option, pub last_consolidated_at: Option, + pub last_compressed_message_at: Option, }