From eeb8a77ac8c55955ced84537191b3136168ce2fd Mon Sep 17 00:00:00 2001 From: xiaoxixi Date: Tue, 28 Apr 2026 22:13:10 +0800 Subject: [PATCH] =?UTF-8?q?feat(storage):=20=E5=AE=9E=E7=8E=B0=20Session?= =?UTF-8?q?=20CRUD=20=E6=93=8D=E4=BD=9C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/storage/mod.rs | 168 ++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 167 insertions(+), 1 deletion(-) diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 21035d6..c844177 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -4,7 +4,7 @@ pub mod message; pub use error::StorageError; -use sqlx::{Pool, Sqlite, SqlitePool}; +use sqlx::{Pool, Row, Sqlite, SqlitePool}; use std::path::Path; pub struct Storage { @@ -89,4 +89,170 @@ impl Storage { pub(crate) fn pool(&self) -> &Pool { &self.pool } + + pub async fn upsert_session(&self, meta: &crate::storage::session::SessionMeta) -> Result<(), StorageError> { + sqlx::query( + r#" + INSERT INTO sessions (id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(id) DO UPDATE SET + title = excluded.title, + last_active_at = excluded.last_active_at, + message_count = excluded.message_count, + routing_info = excluded.routing_info, + deleted_at = excluded.deleted_at + "#, + ) + .bind(&meta.id) + .bind(&meta.channel) + .bind(&meta.chat_id) + .bind(&meta.dialog_id) + .bind(&meta.title) + .bind(meta.created_at) + .bind(meta.last_active_at) + .bind(meta.message_count) + .bind(&meta.routing_info) + .bind(meta.deleted_at) + .execute(self.pool()) + .await?; + + Ok(()) + } + + pub async fn get_session(&self, id: &str) -> Result { + let row = sqlx::query( + r#" + SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at + FROM sessions WHERE id = ? AND deleted_at IS NULL + "#, + ) + .bind(id) + .fetch_optional(self.pool()) + .await? + .ok_or_else(|| StorageError::NotFound(id.to_string()))?; + + Ok(crate::storage::session::SessionMeta { + id: row.get("id"), + channel: row.get("channel"), + chat_id: row.get("chat_id"), + dialog_id: row.get("dialog_id"), + title: row.get("title"), + created_at: row.get("created_at"), + last_active_at: row.get("last_active_at"), + message_count: row.get("message_count"), + routing_info: row.get("routing_info"), + deleted_at: row.get("deleted_at"), + }) + } + + pub async fn list_sessions( + &self, + channel: &str, + chat_id: &str, + limit: i64, + ) -> Result, StorageError> { + let rows = sqlx::query( + r#" + SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at + FROM sessions + WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL + ORDER BY last_active_at DESC + LIMIT ? + "#, + ) + .bind(channel) + .bind(chat_id) + .bind(limit) + .fetch_all(self.pool()) + .await?; + + Ok(rows + .into_iter() + .map(|row| crate::storage::session::SessionMeta { + id: row.get("id"), + channel: row.get("channel"), + chat_id: row.get("chat_id"), + dialog_id: row.get("dialog_id"), + title: row.get("title"), + created_at: row.get("created_at"), + last_active_at: row.get("last_active_at"), + message_count: row.get("message_count"), + routing_info: row.get("routing_info"), + deleted_at: row.get("deleted_at"), + }) + .collect()) + } + + pub async fn touch_session( + &self, + id: &str, + message_count: i64, + last_active_at: i64, + ) -> Result<(), StorageError> { + sqlx::query( + r#" + UPDATE sessions SET message_count = ?, last_active_at = ? + WHERE id = ? + "#, + ) + .bind(message_count) + .bind(last_active_at) + .bind(id) + .execute(self.pool()) + .await?; + + Ok(()) + } + + pub async fn soft_delete_session(&self, id: &str) -> Result<(), StorageError> { + let now = chrono::Utc::now().timestamp_millis(); + sqlx::query( + r#"UPDATE sessions SET deleted_at = ? WHERE id = ?"#, + ) + .bind(now) + .bind(id) + .execute(self.pool()) + .await?; + + Ok(()) + } + + pub async fn find_active_session( + &self, + channel: &str, + chat_id: &str, + ttl_millis: i64, + ) -> Result, StorageError> { + let cutoff = chrono::Utc::now().timestamp_millis() - ttl_millis; + let row = sqlx::query( + r#" + SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at + FROM sessions + WHERE channel = ? AND chat_id = ? AND deleted_at IS NULL AND last_active_at > ? + ORDER BY last_active_at DESC + LIMIT 1 + "#, + ) + .bind(channel) + .bind(chat_id) + .bind(cutoff) + .fetch_optional(self.pool()) + .await?; + + match row { + Some(row) => Ok(Some(crate::storage::session::SessionMeta { + id: row.get("id"), + channel: row.get("channel"), + chat_id: row.get("chat_id"), + dialog_id: row.get("dialog_id"), + title: row.get("title"), + created_at: row.get("created_at"), + last_active_at: row.get("last_active_at"), + message_count: row.get("message_count"), + routing_info: row.get("routing_info"), + deleted_at: row.get("deleted_at"), + })), + None => Ok(None), + } + } }