From 5f7ffd28ef85193c75387844d4a5c71854bff8e7 Mon Sep 17 00:00:00 2001 From: xiaoxixi Date: Tue, 28 Apr 2026 22:25:10 +0800 Subject: [PATCH] =?UTF-8?q?feat(session):=20Phase=202=20-=20=E6=89=A9?= =?UTF-8?q?=E5=B1=95=20Session=20=E7=BB=93=E6=9E=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Session 新增字段: title, created_at, last_active_at, message_count, total_message_count, seq_counter, storage, routing_info - Session::new 新增 storage/routing_info/title 参数 - 新增 Session::from_storage() 从 Storage 恢复 Session - 新增 Session::add_message_and_persist() 持久化版本 - 新增 Session::send_system_notification() 不记历史的通知 - 新增 Session::persist_session_meta() 写回 Storage - 新增 Session::should_generate_title() / generate_title() - 新增 LLM title 自动生成(10 条消息后触发) - SessionManager::create_session 新增 routing_info 参数 - WsOutbound 新增 SystemNotification variant - Client mod.rs 处理 SystemNotification --- src/client/mod.rs | 3 + src/protocol.rs | 2 + src/session/session.rs | 256 +++++++++++++++++++++++++++++++++++++++-- 3 files changed, 253 insertions(+), 8 deletions(-) diff --git a/src/client/mod.rs b/src/client/mod.rs index 62879d7..f3e4f69 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -130,5 +130,8 @@ async fn handle_ws_message(app: &mut App, outbound: WsOutbound) { WsOutbound::CommandExecuted { message } => { app.add_message(MessageRole::System, message); } + WsOutbound::SystemNotification { content } => { + app.add_message(MessageRole::System, content); + } } } diff --git a/src/protocol.rs b/src/protocol.rs index b4aa2e7..6b1c5b5 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -114,6 +114,8 @@ pub enum WsOutbound { Pong, #[serde(rename = "command_executed")] CommandExecuted { message: String }, + #[serde(rename = "system_notification")] + SystemNotification { content: String }, } pub fn parse_inbound(raw: &str) -> Result { diff --git a/src/session/session.rs b/src/session/session.rs index d445b79..8481339 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -6,6 +6,8 @@ use tokio::sync::{Mutex, mpsc}; use uuid::Uuid; use crate::bus::ChatMessage; +use crate::storage::{Storage, StorageError}; +use std::sync::Arc as StdArc; /// Result of handling a message - either an AI response or a command output pub enum HandleResult { @@ -38,12 +40,23 @@ fn short_id() -> String { /// 每个 Session 对应一个 UnifiedSessionId,有独立的 messages history pub struct Session { pub id: UnifiedSessionId, + pub title: String, + pub created_at: i64, + pub last_active_at: i64, + pub message_count: i64, + pub total_message_count: i64, + messages: Vec, + seq_counter: i64, + pub user_tx: mpsc::Sender, provider_config: LLMProviderConfig, provider: Arc, tools: Arc, compressor: ContextCompressor, + + storage: Option>, + routing_info: String, } impl Session { @@ -52,6 +65,9 @@ impl Session { provider_config: LLMProviderConfig, user_tx: mpsc::Sender, tools: Arc, + storage: Option>, + routing_info: String, + title: String, ) -> Result { let provider_box = create_provider(provider_config.clone()) .map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?; @@ -62,14 +78,83 @@ impl Session { ..Default::default() }; + let now = chrono::Utc::now().timestamp_millis(); + Ok(Self { - id, + id: id.clone(), + title, + created_at: now, + last_active_at: now, + message_count: 0, + total_message_count: 0, messages: Vec::new(), + seq_counter: 1, user_tx, provider_config: provider_config.clone(), provider: provider.clone(), tools, compressor: ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config), + storage, + routing_info, + }) + } + + /// 从 Storage 恢复 Session + pub async fn from_storage( + id: UnifiedSessionId, + provider_config: LLMProviderConfig, + user_tx: mpsc::Sender, + tools: Arc, + storage: StdArc, + ) -> Result { + let session_meta = storage.get_session(&id.to_string()).await + .map_err(|e| AgentError::Other(format!("failed to load session from storage: {}", e)))?; + + let messages = storage.load_messages(&id.to_string(), 0).await + .map_err(|e| AgentError::Other(format!("failed to load messages from storage: {}", e)))?; + + let provider_box = create_provider(provider_config.clone()) + .map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?; + let provider: Arc = Arc::from(provider_box); + + let compressor_config = ContextCompressionConfig { + protect_first_n: 2, + ..Default::default() + }; + + // Convert MessageMeta to ChatMessage + let chat_messages: Vec = messages.into_iter().map(|m| { + ChatMessage { + id: m.id, + role: m.role, + content: m.content, + media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(), + timestamp: m.created_at, + tool_call_id: m.tool_call_id, + tool_name: m.tool_name, + tool_calls: m.tool_calls.map(|tc| serde_json::from_str(&tc).unwrap_or_default()), + } + }).collect(); + + let seq_counter = chat_messages.len() as i64 + 1; + let total_message_count = chat_messages.len() as i64; + + Ok(Self { + id: id.clone(), + title: session_meta.title, + created_at: session_meta.created_at, + last_active_at: session_meta.last_active_at, + message_count: session_meta.message_count, + total_message_count, + messages: chat_messages, + seq_counter, + user_tx, + provider_config: provider_config.clone(), + provider: provider.clone(), + tools, + compressor: ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config), + storage: Some(storage), + routing_info: session_meta.routing_info.unwrap_or_default(), }) } @@ -78,9 +163,64 @@ impl Session { self.id.to_string() } - /// 添加消息到历史 + /// 添加消息到历史(仅内存,Phase 3 会扩展为持久化) pub fn add_message(&mut self, message: ChatMessage) { + let is_user = message.role == "user"; + let now = chrono::Utc::now().timestamp_millis(); + + // Assign seq (in-memory only, persistence in Phase 3) + let _seq = self.seq_counter; + self.seq_counter += 1; + + // Update in-memory state self.messages.push(message); + self.total_message_count += 1; + if is_user { + self.message_count += 1; + } + self.last_active_at = now; + } + + /// 添加消息到历史并持久化到 Storage(Phase 3 使用) + /// 目前 storage 为 None,此方法退化为 add_message + pub async fn add_message_and_persist(&mut self, message: ChatMessage) -> Result<(), StorageError> { + let is_user = message.role == "user"; + let now = chrono::Utc::now().timestamp_millis(); + + // Assign seq + let seq = self.seq_counter; + self.seq_counter += 1; + + // Persist to Storage (currently None, wired up in Phase 3) + if let Some(ref storage) = self.storage { + let msg_meta = crate::storage::message::MessageMeta { + id: message.id.clone(), + session_id: self.id.to_string(), + seq, + role: message.role.clone(), + content: message.content.clone(), + media_refs: if message.media_refs.is_empty() { + None + } else { + Some(serde_json::to_string(&message.media_refs).unwrap_or_default()) + }, + tool_call_id: message.tool_call_id.clone(), + tool_name: message.tool_name.clone(), + tool_calls: message.tool_calls.as_ref().map(|tc| serde_json::to_string(tc).unwrap_or_default()), + created_at: now, + }; + storage.append_message_with_retry(&self.id.to_string(), &msg_meta).await?; + } + + // Update in-memory state + self.messages.push(message); + self.total_message_count += 1; + if is_user { + self.message_count += 1; + } + self.last_active_at = now; + + Ok(()) } /// 获取消息历史 @@ -92,6 +232,9 @@ impl Session { pub fn clear_history(&mut self) { let len = self.messages.len(); self.messages.clear(); + self.seq_counter = 1; + self.total_message_count = 0; + self.message_count = 0; #[cfg(debug_assertions)] tracing::debug!(session_id = %self.id, previous_len = len, "Chat history cleared"); } @@ -100,6 +243,9 @@ impl Session { pub fn reset_context(&mut self) { let len = self.messages.len(); self.messages.clear(); + self.seq_counter = 1; + self.total_message_count = 0; + self.message_count = 0; #[cfg(debug_assertions)] tracing::debug!(session_id = %self.id, previous_len = len, "Chat context reset in memory"); } @@ -116,6 +262,93 @@ impl Session { let _ = self.user_tx.send(msg).await; } + /// 发送系统通知(不记录进 session 历史) + pub async fn send_system_notification(&self, content: &str) { + let msg = WsOutbound::SystemNotification { + content: content.to_string(), + }; + let _ = self.user_tx.send(msg).await; + } + + /// 将 session 元数据写回 Storage + pub async fn persist_session_meta(&self) -> Result<(), StorageError> { + if let Some(ref storage) = self.storage { + let meta = crate::storage::session::SessionMeta { + id: self.id.to_string(), + channel: self.id.channel.clone(), + chat_id: self.id.chat_id.clone(), + dialog_id: self.id.dialog_id.clone(), + title: self.title.clone(), + created_at: self.created_at, + last_active_at: self.last_active_at, + message_count: self.message_count, + routing_info: if self.routing_info.is_empty() { + None + } else { + Some(self.routing_info.clone()) + }, + deleted_at: None, + }; + storage.upsert_session(&meta).await?; + } + Ok(()) + } + + /// 检查是否需要自动生成 title(10 条用户消息后) + pub fn should_generate_title(&self) -> bool { + self.title == "新对话" && self.message_count >= 10 + } + + /// 生成标题(调用 LLM) + pub async fn generate_title(&mut self) -> Result<(), AgentError> { + if !self.should_generate_title() { + return Ok(()); + } + + let prompt = format!( + r#"给定以下对话历史,生成一个简短的会话标题(5-15 个中文字符),概括这个对话的核心内容或用户的主要需求。只返回一个标题,不要解释。 + +历史: +{}"#, + self.messages.iter() + .filter(|m| m.role == "user" || m.role == "assistant") + .take(20) + .map(|m| format!("[{}]: {}", m.role, m.content)) + .collect::>() + .join("\n") + ); + + let title = self.call_llm_for_title(&prompt).await?; + + if !title.is_empty() { + self.title = title.clone(); + if let Err(e) = self.persist_session_meta().await { + tracing::warn!("failed to persist title: {}", e); + } + } + + Ok(()) + } + + /// 调用 LLM 生成标题 + async fn call_llm_for_title(&self, prompt: &str) -> Result { + use crate::providers::{ChatCompletionRequest, ChatCompletionResponse, Message}; + + let request = ChatCompletionRequest { + messages: vec![ + Message::user(prompt.to_string()) + ], + temperature: Some(0.3), + max_tokens: Some(20), + tools: None, + }; + + let response: ChatCompletionResponse = self.provider.chat(request).await + .map_err(|e| AgentError::Other(format!("LLM call failed: {}", e)))?; + + Ok(response.content.trim().to_string()) + } + /// 获取 provider_config 引用 pub fn provider_config(&self) -> &LLMProviderConfig { &self.provider_config @@ -445,12 +678,12 @@ impl SessionManager { match cmd.name { "new" => { let title = args.map(|s| s.to_string()); - let (new_id, title) = self.create_session(channel, chat_id, title.as_deref()).await?; - Ok((Some(new_id), format!("New conversation '{}' created.", title))) + let (new_id, title) = self.create_session(channel, chat_id, title.as_deref(), String::new()).await?; + Ok((Some(new_id), format!("新对话 '{}' 已创建。", title))) } "delete" => { - let (new_id, _title) = self.create_session(channel, chat_id, None).await?; - Ok((Some(new_id), "Conversation deleted. New conversation created.".to_string())) + let (new_id, _title) = self.create_session(channel, chat_id, None, String::new()).await?; + Ok((Some(new_id), "对话已删除。新对话已创建。".to_string())) } "compact" => { if let Some(sid) = current_session_id { @@ -513,6 +746,7 @@ impl SessionManager { channel: &str, chat_id: &str, title: Option<&str>, + routing_info: String, ) -> Result<(UnifiedSessionId, String), AgentError> { let dialog_id = short_id(); let unified_id = UnifiedSessionId::new(channel, chat_id, &dialog_id); @@ -522,7 +756,7 @@ impl SessionManager { .map(str::trim) .filter(|value| !value.is_empty()) .map(ToOwned::to_owned) - .unwrap_or_else(|| format!("Dialog {}", &dialog_id)); + .unwrap_or_else(|| "新对话".to_string()); let (user_tx, _rx) = mpsc::channel::(100); let session = Session::new( @@ -530,6 +764,9 @@ impl SessionManager { self.provider_config.clone(), user_tx, self.tools.clone(), + None, // storage injected in Phase 3 + routing_info, + title.clone(), ).await?; let arc = Arc::new(Mutex::new(session)); @@ -556,6 +793,9 @@ impl SessionManager { self.provider_config.clone(), user_tx, self.tools.clone(), + None, // storage injected in Phase 3 + String::new(), // routing_info - set by channel layer in Phase 3 + format!("Dialog {}", &unified_id.dialog_id), ).await?; let arc = Arc::new(Mutex::new(session)); @@ -570,7 +810,7 @@ impl SessionManager { chat_id: &str, title: Option<&str>, ) -> Result<(UnifiedSessionId, String), AgentError> { - self.create_session(channel, chat_id, title).await + self.create_session(channel, chat_id, title, String::new()).await } pub async fn get_current_dialog(