feat(storage): add last_compressed_message_at column to sessions table and Session struct

This commit is contained in:
xiaoxixi 2026-05-10 14:35:21 +08:00
parent cb1140e9be
commit d022e30943
3 changed files with 35 additions and 7 deletions

View File

@ -70,6 +70,7 @@ pub struct Session {
/// Timestamp (Unix ms) of the last consolidation. /// Timestamp (Unix ms) of the last consolidation.
/// Messages before this time have been compressed into memory. /// Messages before this time have been compressed into memory.
pub last_consolidated_at: Option<i64>, pub last_consolidated_at: Option<i64>,
pub last_compressed_message_at: Option<i64>,
memory_manager: Arc<crate::memory::MemoryManager>, memory_manager: Arc<crate::memory::MemoryManager>,
} }
@ -116,6 +117,7 @@ impl Session {
storage, storage,
routing_info, routing_info,
last_consolidated_at: None, last_consolidated_at: None,
last_compressed_message_at: None,
memory_manager, memory_manager,
}) })
} }
@ -185,6 +187,7 @@ impl Session {
storage: Some(storage), storage: Some(storage),
routing_info: session_meta.routing_info.unwrap_or_default(), routing_info: session_meta.routing_info.unwrap_or_default(),
last_consolidated_at: session_meta.last_consolidated_at, last_consolidated_at: session_meta.last_consolidated_at,
last_compressed_message_at: session_meta.last_compressed_message_at,
memory_manager, memory_manager,
}) })
} }
@ -313,6 +316,7 @@ impl Session {
}, },
deleted_at: None, deleted_at: None,
last_consolidated_at: self.last_consolidated_at, last_consolidated_at: self.last_consolidated_at,
last_compressed_message_at: self.last_compressed_message_at,
}; };
storage.upsert_session(&meta).await?; storage.upsert_session(&meta).await?;
} }
@ -980,6 +984,7 @@ impl SessionManager {
routing_info: if routing_info.is_empty() { None } else { Some(routing_info.clone()) }, routing_info: if routing_info.is_empty() { None } else { Some(routing_info.clone()) },
deleted_at: None, deleted_at: None,
last_consolidated_at: None, last_consolidated_at: None,
last_compressed_message_at: None,
}; };
self.storage.upsert_session(&meta).await self.storage.upsert_session(&meta).await
.map_err(|e| AgentError::Other(format!("failed to create session in storage: {}", e)))?; .map_err(|e| AgentError::Other(format!("failed to create session in storage: {}", e)))?;

View File

@ -42,6 +42,7 @@ impl Storage {
routing_info TEXT, routing_info TEXT,
deleted_at INTEGER, deleted_at INTEGER,
last_consolidated_at INTEGER, last_consolidated_at INTEGER,
last_compressed_message_at INTEGER,
UNIQUE(channel, chat_id, dialog_id) UNIQUE(channel, chat_id, dialog_id)
) )
"#, "#,
@ -179,6 +180,16 @@ impl Storage {
.await .await
.ok(); .ok();
// Migration: add last_compressed_message_at column if not exists
sqlx::query(
r#"
ALTER TABLE sessions ADD COLUMN last_compressed_message_at INTEGER
"#,
)
.execute(&self.pool)
.await
.ok();
sqlx::query( sqlx::query(
r#" r#"
CREATE TABLE IF NOT EXISTS llm_calls ( CREATE TABLE IF NOT EXISTS llm_calls (
@ -314,15 +325,16 @@ impl Storage {
pub async fn upsert_session(&self, meta: &crate::storage::session::SessionMeta) -> Result<(), StorageError> { pub async fn upsert_session(&self, meta: &crate::storage::session::SessionMeta) -> Result<(), StorageError> {
sqlx::query( sqlx::query(
r#" r#"
INSERT INTO sessions (id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at) INSERT INTO sessions (id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET ON CONFLICT(id) DO UPDATE SET
title = excluded.title, title = excluded.title,
last_active_at = excluded.last_active_at, last_active_at = excluded.last_active_at,
message_count = excluded.message_count, message_count = excluded.message_count,
routing_info = excluded.routing_info, routing_info = excluded.routing_info,
deleted_at = excluded.deleted_at, deleted_at = excluded.deleted_at,
last_consolidated_at = excluded.last_consolidated_at last_consolidated_at = excluded.last_consolidated_at,
last_compressed_message_at = excluded.last_compressed_message_at
"#, "#,
) )
.bind(&meta.id) .bind(&meta.id)
@ -336,6 +348,7 @@ impl Storage {
.bind(&meta.routing_info) .bind(&meta.routing_info)
.bind(meta.deleted_at) .bind(meta.deleted_at)
.bind(meta.last_consolidated_at) .bind(meta.last_consolidated_at)
.bind(meta.last_compressed_message_at)
.execute(self.pool()) .execute(self.pool())
.await?; .await?;
@ -345,7 +358,7 @@ impl Storage {
pub async fn get_session(&self, id: &str) -> Result<crate::storage::session::SessionMeta, StorageError> { pub async fn get_session(&self, id: &str) -> Result<crate::storage::session::SessionMeta, StorageError> {
let row = sqlx::query( let row = sqlx::query(
r#" r#"
SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at
FROM sessions WHERE id = ? AND deleted_at IS NULL FROM sessions WHERE id = ? AND deleted_at IS NULL
"#, "#,
) )
@ -366,6 +379,7 @@ impl Storage {
routing_info: row.get("routing_info"), routing_info: row.get("routing_info"),
deleted_at: row.get("deleted_at"), deleted_at: row.get("deleted_at"),
last_consolidated_at: row.get("last_consolidated_at"), last_consolidated_at: row.get("last_consolidated_at"),
last_compressed_message_at: row.get("last_compressed_message_at"),
}) })
} }
@ -377,7 +391,7 @@ impl Storage {
) -> Result<Vec<crate::storage::session::SessionMeta>, StorageError> { ) -> Result<Vec<crate::storage::session::SessionMeta>, StorageError> {
let rows = sqlx::query( let rows = sqlx::query(
r#" r#"
SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at
FROM sessions FROM sessions
WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL
ORDER BY last_active_at DESC ORDER BY last_active_at DESC
@ -404,6 +418,7 @@ impl Storage {
routing_info: row.get("routing_info"), routing_info: row.get("routing_info"),
deleted_at: row.get("deleted_at"), deleted_at: row.get("deleted_at"),
last_consolidated_at: row.get("last_consolidated_at"), last_consolidated_at: row.get("last_consolidated_at"),
last_compressed_message_at: row.get("last_compressed_message_at"),
}) })
.collect()) .collect())
} }
@ -451,7 +466,7 @@ impl Storage {
let cutoff = chrono::Utc::now().timestamp_millis() - ttl_millis; let cutoff = chrono::Utc::now().timestamp_millis() - ttl_millis;
let row = sqlx::query( let row = sqlx::query(
r#" r#"
SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at
FROM sessions FROM sessions
WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL AND last_active_at > ? WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL AND last_active_at > ?
ORDER BY last_active_at DESC ORDER BY last_active_at DESC
@ -477,6 +492,7 @@ impl Storage {
routing_info: row.get("routing_info"), routing_info: row.get("routing_info"),
deleted_at: row.get("deleted_at"), deleted_at: row.get("deleted_at"),
last_consolidated_at: row.get("last_consolidated_at"), last_consolidated_at: row.get("last_consolidated_at"),
last_compressed_message_at: row.get("last_compressed_message_at"),
})), })),
None => Ok(None), None => Ok(None),
} }
@ -561,7 +577,7 @@ impl Storage {
) -> Result<Vec<crate::storage::session::SessionMeta>, StorageError> { ) -> Result<Vec<crate::storage::session::SessionMeta>, StorageError> {
let rows = sqlx::query( let rows = sqlx::query(
r#" r#"
SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at, last_consolidated_at, last_compressed_message_at
FROM sessions FROM sessions
WHERE deleted_at IS NULL WHERE deleted_at IS NULL
ORDER BY last_active_at DESC ORDER BY last_active_at DESC
@ -586,6 +602,7 @@ impl Storage {
routing_info: row.get("routing_info"), routing_info: row.get("routing_info"),
deleted_at: row.get("deleted_at"), deleted_at: row.get("deleted_at"),
last_consolidated_at: row.get("last_consolidated_at"), last_consolidated_at: row.get("last_consolidated_at"),
last_compressed_message_at: row.get("last_compressed_message_at"),
}) })
.collect()) .collect())
} }
@ -691,6 +708,7 @@ mod tests {
routing_info: Some(r#"{"type":"cli"}"#.to_string()), routing_info: Some(r#"{"type":"cli"}"#.to_string()),
deleted_at: None, deleted_at: None,
last_consolidated_at: None, last_consolidated_at: None,
last_compressed_message_at: None,
}; };
storage.upsert_session(&meta).await.unwrap(); storage.upsert_session(&meta).await.unwrap();
@ -726,6 +744,7 @@ mod tests {
routing_info: None, routing_info: None,
deleted_at: None, deleted_at: None,
last_consolidated_at: None, last_consolidated_at: None,
last_compressed_message_at: None,
}; };
storage.upsert_session(&meta).await.unwrap(); storage.upsert_session(&meta).await.unwrap();
} }
@ -752,6 +771,7 @@ mod tests {
routing_info: None, routing_info: None,
deleted_at: None, deleted_at: None,
last_consolidated_at: None, last_consolidated_at: None,
last_compressed_message_at: None,
}; };
storage.upsert_session(&meta).await.unwrap(); storage.upsert_session(&meta).await.unwrap();
@ -778,6 +798,7 @@ mod tests {
routing_info: None, routing_info: None,
deleted_at: None, deleted_at: None,
last_consolidated_at: None, last_consolidated_at: None,
last_compressed_message_at: None,
}; };
storage.upsert_session(&session_meta).await.unwrap(); storage.upsert_session(&session_meta).await.unwrap();
@ -819,6 +840,7 @@ mod tests {
routing_info: None, routing_info: None,
deleted_at: None, deleted_at: None,
last_consolidated_at: None, last_consolidated_at: None,
last_compressed_message_at: None,
}; };
storage.upsert_session(&meta).await.unwrap(); storage.upsert_session(&meta).await.unwrap();

View File

@ -13,4 +13,5 @@ pub struct SessionMeta {
pub routing_info: Option<String>, pub routing_info: Option<String>,
pub deleted_at: Option<i64>, pub deleted_at: Option<i64>,
pub last_consolidated_at: Option<i64>, pub last_consolidated_at: Option<i64>,
pub last_compressed_message_at: Option<i64>,
} }