feat(session): Phase 2 - 扩展 Session 结构

- Session 新增字段: title, created_at, last_active_at,
  message_count, total_message_count, seq_counter,
  storage, routing_info
- Session::new 新增 storage/routing_info/title 参数
- 新增 Session::from_storage() 从 Storage 恢复 Session
- 新增 Session::add_message_and_persist() 持久化版本
- 新增 Session::send_system_notification() 不记历史的通知
- 新增 Session::persist_session_meta() 写回 Storage
- 新增 Session::should_generate_title() / generate_title()
- 新增 LLM title 自动生成(10 条消息后触发)
- SessionManager::create_session 新增 routing_info 参数
- WsOutbound 新增 SystemNotification variant
- Client mod.rs 处理 SystemNotification
This commit is contained in:
xiaoxixi 2026-04-28 22:25:10 +08:00
parent c17e286db1
commit 5f7ffd28ef
3 changed files with 253 additions and 8 deletions

View File

@ -130,5 +130,8 @@ async fn handle_ws_message(app: &mut App, outbound: WsOutbound) {
WsOutbound::CommandExecuted { message } => { WsOutbound::CommandExecuted { message } => {
app.add_message(MessageRole::System, message); app.add_message(MessageRole::System, message);
} }
WsOutbound::SystemNotification { content } => {
app.add_message(MessageRole::System, content);
}
} }
} }

View File

@ -114,6 +114,8 @@ pub enum WsOutbound {
Pong, Pong,
#[serde(rename = "command_executed")] #[serde(rename = "command_executed")]
CommandExecuted { message: String }, CommandExecuted { message: String },
#[serde(rename = "system_notification")]
SystemNotification { content: String },
} }
pub fn parse_inbound(raw: &str) -> Result<WsInbound, serde_json::Error> { pub fn parse_inbound(raw: &str) -> Result<WsInbound, serde_json::Error> {

View File

@ -6,6 +6,8 @@ use tokio::sync::{Mutex, mpsc};
use uuid::Uuid; use uuid::Uuid;
use crate::bus::ChatMessage; use crate::bus::ChatMessage;
use crate::storage::{Storage, StorageError};
use std::sync::Arc as StdArc;
/// Result of handling a message - either an AI response or a command output /// Result of handling a message - either an AI response or a command output
pub enum HandleResult { pub enum HandleResult {
@ -38,12 +40,23 @@ fn short_id() -> String {
/// 每个 Session 对应一个 UnifiedSessionId有独立的 messages history /// 每个 Session 对应一个 UnifiedSessionId有独立的 messages history
pub struct Session { pub struct Session {
pub id: UnifiedSessionId, pub id: UnifiedSessionId,
pub title: String,
pub created_at: i64,
pub last_active_at: i64,
pub message_count: i64,
pub total_message_count: i64,
messages: Vec<ChatMessage>, messages: Vec<ChatMessage>,
seq_counter: i64,
pub user_tx: mpsc::Sender<WsOutbound>, pub user_tx: mpsc::Sender<WsOutbound>,
provider_config: LLMProviderConfig, provider_config: LLMProviderConfig,
provider: Arc<dyn LLMProvider>, provider: Arc<dyn LLMProvider>,
tools: Arc<ToolRegistry>, tools: Arc<ToolRegistry>,
compressor: ContextCompressor, compressor: ContextCompressor,
storage: Option<StdArc<Storage>>,
routing_info: String,
} }
impl Session { impl Session {
@ -52,6 +65,9 @@ impl Session {
provider_config: LLMProviderConfig, provider_config: LLMProviderConfig,
user_tx: mpsc::Sender<WsOutbound>, user_tx: mpsc::Sender<WsOutbound>,
tools: Arc<ToolRegistry>, tools: Arc<ToolRegistry>,
storage: Option<StdArc<Storage>>,
routing_info: String,
title: String,
) -> Result<Self, AgentError> { ) -> Result<Self, AgentError> {
let provider_box = create_provider(provider_config.clone()) let provider_box = create_provider(provider_config.clone())
.map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?; .map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?;
@ -62,14 +78,83 @@ impl Session {
..Default::default() ..Default::default()
}; };
let now = chrono::Utc::now().timestamp_millis();
Ok(Self { Ok(Self {
id, id: id.clone(),
title,
created_at: now,
last_active_at: now,
message_count: 0,
total_message_count: 0,
messages: Vec::new(), messages: Vec::new(),
seq_counter: 1,
user_tx, user_tx,
provider_config: provider_config.clone(), provider_config: provider_config.clone(),
provider: provider.clone(), provider: provider.clone(),
tools, tools,
compressor: ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config), compressor: ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config),
storage,
routing_info,
})
}
/// 从 Storage 恢复 Session
pub async fn from_storage(
id: UnifiedSessionId,
provider_config: LLMProviderConfig,
user_tx: mpsc::Sender<WsOutbound>,
tools: Arc<ToolRegistry>,
storage: StdArc<Storage>,
) -> Result<Self, AgentError> {
let session_meta = storage.get_session(&id.to_string()).await
.map_err(|e| AgentError::Other(format!("failed to load session from storage: {}", e)))?;
let messages = storage.load_messages(&id.to_string(), 0).await
.map_err(|e| AgentError::Other(format!("failed to load messages from storage: {}", e)))?;
let provider_box = create_provider(provider_config.clone())
.map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?;
let provider: Arc<dyn LLMProvider> = Arc::from(provider_box);
let compressor_config = ContextCompressionConfig {
protect_first_n: 2,
..Default::default()
};
// Convert MessageMeta to ChatMessage
let chat_messages: Vec<ChatMessage> = messages.into_iter().map(|m| {
ChatMessage {
id: m.id,
role: m.role,
content: m.content,
media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(),
timestamp: m.created_at,
tool_call_id: m.tool_call_id,
tool_name: m.tool_name,
tool_calls: m.tool_calls.map(|tc| serde_json::from_str(&tc).unwrap_or_default()),
}
}).collect();
let seq_counter = chat_messages.len() as i64 + 1;
let total_message_count = chat_messages.len() as i64;
Ok(Self {
id: id.clone(),
title: session_meta.title,
created_at: session_meta.created_at,
last_active_at: session_meta.last_active_at,
message_count: session_meta.message_count,
total_message_count,
messages: chat_messages,
seq_counter,
user_tx,
provider_config: provider_config.clone(),
provider: provider.clone(),
tools,
compressor: ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config),
storage: Some(storage),
routing_info: session_meta.routing_info.unwrap_or_default(),
}) })
} }
@ -78,9 +163,64 @@ impl Session {
self.id.to_string() self.id.to_string()
} }
/// 添加消息到历史 /// 添加消息到历史仅内存Phase 3 会扩展为持久化)
pub fn add_message(&mut self, message: ChatMessage) { pub fn add_message(&mut self, message: ChatMessage) {
let is_user = message.role == "user";
let now = chrono::Utc::now().timestamp_millis();
// Assign seq (in-memory only, persistence in Phase 3)
let _seq = self.seq_counter;
self.seq_counter += 1;
// Update in-memory state
self.messages.push(message); self.messages.push(message);
self.total_message_count += 1;
if is_user {
self.message_count += 1;
}
self.last_active_at = now;
}
/// 添加消息到历史并持久化到 StoragePhase 3 使用)
/// 目前 storage 为 None此方法退化为 add_message
pub async fn add_message_and_persist(&mut self, message: ChatMessage) -> Result<(), StorageError> {
let is_user = message.role == "user";
let now = chrono::Utc::now().timestamp_millis();
// Assign seq
let seq = self.seq_counter;
self.seq_counter += 1;
// Persist to Storage (currently None, wired up in Phase 3)
if let Some(ref storage) = self.storage {
let msg_meta = crate::storage::message::MessageMeta {
id: message.id.clone(),
session_id: self.id.to_string(),
seq,
role: message.role.clone(),
content: message.content.clone(),
media_refs: if message.media_refs.is_empty() {
None
} else {
Some(serde_json::to_string(&message.media_refs).unwrap_or_default())
},
tool_call_id: message.tool_call_id.clone(),
tool_name: message.tool_name.clone(),
tool_calls: message.tool_calls.as_ref().map(|tc| serde_json::to_string(tc).unwrap_or_default()),
created_at: now,
};
storage.append_message_with_retry(&self.id.to_string(), &msg_meta).await?;
}
// Update in-memory state
self.messages.push(message);
self.total_message_count += 1;
if is_user {
self.message_count += 1;
}
self.last_active_at = now;
Ok(())
} }
/// 获取消息历史 /// 获取消息历史
@ -92,6 +232,9 @@ impl Session {
pub fn clear_history(&mut self) { pub fn clear_history(&mut self) {
let len = self.messages.len(); let len = self.messages.len();
self.messages.clear(); self.messages.clear();
self.seq_counter = 1;
self.total_message_count = 0;
self.message_count = 0;
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
tracing::debug!(session_id = %self.id, previous_len = len, "Chat history cleared"); tracing::debug!(session_id = %self.id, previous_len = len, "Chat history cleared");
} }
@ -100,6 +243,9 @@ impl Session {
pub fn reset_context(&mut self) { pub fn reset_context(&mut self) {
let len = self.messages.len(); let len = self.messages.len();
self.messages.clear(); self.messages.clear();
self.seq_counter = 1;
self.total_message_count = 0;
self.message_count = 0;
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
tracing::debug!(session_id = %self.id, previous_len = len, "Chat context reset in memory"); tracing::debug!(session_id = %self.id, previous_len = len, "Chat context reset in memory");
} }
@ -116,6 +262,93 @@ impl Session {
let _ = self.user_tx.send(msg).await; let _ = self.user_tx.send(msg).await;
} }
/// 发送系统通知(不记录进 session 历史)
pub async fn send_system_notification(&self, content: &str) {
let msg = WsOutbound::SystemNotification {
content: content.to_string(),
};
let _ = self.user_tx.send(msg).await;
}
/// 将 session 元数据写回 Storage
pub async fn persist_session_meta(&self) -> Result<(), StorageError> {
if let Some(ref storage) = self.storage {
let meta = crate::storage::session::SessionMeta {
id: self.id.to_string(),
channel: self.id.channel.clone(),
chat_id: self.id.chat_id.clone(),
dialog_id: self.id.dialog_id.clone(),
title: self.title.clone(),
created_at: self.created_at,
last_active_at: self.last_active_at,
message_count: self.message_count,
routing_info: if self.routing_info.is_empty() {
None
} else {
Some(self.routing_info.clone())
},
deleted_at: None,
};
storage.upsert_session(&meta).await?;
}
Ok(())
}
/// 检查是否需要自动生成 title10 条用户消息后)
pub fn should_generate_title(&self) -> bool {
self.title == "新对话" && self.message_count >= 10
}
/// 生成标题(调用 LLM
pub async fn generate_title(&mut self) -> Result<(), AgentError> {
if !self.should_generate_title() {
return Ok(());
}
let prompt = format!(
r#"给定以下对话历史生成一个简短的会话标题5-15 个中文字符),概括这个对话的核心内容或用户的主要需求。只返回一个标题,不要解释。
{}"#,
self.messages.iter()
.filter(|m| m.role == "user" || m.role == "assistant")
.take(20)
.map(|m| format!("[{}]: {}", m.role, m.content))
.collect::<Vec<_>>()
.join("\n")
);
let title = self.call_llm_for_title(&prompt).await?;
if !title.is_empty() {
self.title = title.clone();
if let Err(e) = self.persist_session_meta().await {
tracing::warn!("failed to persist title: {}", e);
}
}
Ok(())
}
/// 调用 LLM 生成标题
async fn call_llm_for_title(&self, prompt: &str) -> Result<String, AgentError> {
use crate::providers::{ChatCompletionRequest, ChatCompletionResponse, Message};
let request = ChatCompletionRequest {
messages: vec![
Message::user(prompt.to_string())
],
temperature: Some(0.3),
max_tokens: Some(20),
tools: None,
};
let response: ChatCompletionResponse = self.provider.chat(request).await
.map_err(|e| AgentError::Other(format!("LLM call failed: {}", e)))?;
Ok(response.content.trim().to_string())
}
/// 获取 provider_config 引用 /// 获取 provider_config 引用
pub fn provider_config(&self) -> &LLMProviderConfig { pub fn provider_config(&self) -> &LLMProviderConfig {
&self.provider_config &self.provider_config
@ -445,12 +678,12 @@ impl SessionManager {
match cmd.name { match cmd.name {
"new" => { "new" => {
let title = args.map(|s| s.to_string()); let title = args.map(|s| s.to_string());
let (new_id, title) = self.create_session(channel, chat_id, title.as_deref()).await?; let (new_id, title) = self.create_session(channel, chat_id, title.as_deref(), String::new()).await?;
Ok((Some(new_id), format!("New conversation '{}' created.", title))) Ok((Some(new_id), format!("新对话 '{}' 已创建。", title)))
} }
"delete" => { "delete" => {
let (new_id, _title) = self.create_session(channel, chat_id, None).await?; let (new_id, _title) = self.create_session(channel, chat_id, None, String::new()).await?;
Ok((Some(new_id), "Conversation deleted. New conversation created.".to_string())) Ok((Some(new_id), "对话已删除。新对话已创建。".to_string()))
} }
"compact" => { "compact" => {
if let Some(sid) = current_session_id { if let Some(sid) = current_session_id {
@ -513,6 +746,7 @@ impl SessionManager {
channel: &str, channel: &str,
chat_id: &str, chat_id: &str,
title: Option<&str>, title: Option<&str>,
routing_info: String,
) -> Result<(UnifiedSessionId, String), AgentError> { ) -> Result<(UnifiedSessionId, String), AgentError> {
let dialog_id = short_id(); let dialog_id = short_id();
let unified_id = UnifiedSessionId::new(channel, chat_id, &dialog_id); let unified_id = UnifiedSessionId::new(channel, chat_id, &dialog_id);
@ -522,7 +756,7 @@ impl SessionManager {
.map(str::trim) .map(str::trim)
.filter(|value| !value.is_empty()) .filter(|value| !value.is_empty())
.map(ToOwned::to_owned) .map(ToOwned::to_owned)
.unwrap_or_else(|| format!("Dialog {}", &dialog_id)); .unwrap_or_else(|| "新对话".to_string());
let (user_tx, _rx) = mpsc::channel::<WsOutbound>(100); let (user_tx, _rx) = mpsc::channel::<WsOutbound>(100);
let session = Session::new( let session = Session::new(
@ -530,6 +764,9 @@ impl SessionManager {
self.provider_config.clone(), self.provider_config.clone(),
user_tx, user_tx,
self.tools.clone(), self.tools.clone(),
None, // storage injected in Phase 3
routing_info,
title.clone(),
).await?; ).await?;
let arc = Arc::new(Mutex::new(session)); let arc = Arc::new(Mutex::new(session));
@ -556,6 +793,9 @@ impl SessionManager {
self.provider_config.clone(), self.provider_config.clone(),
user_tx, user_tx,
self.tools.clone(), self.tools.clone(),
None, // storage injected in Phase 3
String::new(), // routing_info - set by channel layer in Phase 3
format!("Dialog {}", &unified_id.dialog_id),
).await?; ).await?;
let arc = Arc::new(Mutex::new(session)); let arc = Arc::new(Mutex::new(session));
@ -570,7 +810,7 @@ impl SessionManager {
chat_id: &str, chat_id: &str,
title: Option<&str>, title: Option<&str>,
) -> Result<(UnifiedSessionId, String), AgentError> { ) -> Result<(UnifiedSessionId, String), AgentError> {
self.create_session(channel, chat_id, title).await self.create_session(channel, chat_id, title, String::new()).await
} }
pub async fn get_current_dialog( pub async fn get_current_dialog(