PicoBot/src/gateway/session_history.rs
oudecheng d0051baa07 refactor: 消息持久化从批量改为实时逐条,通过装饰器模式实现
- 新增 PersistingEmittedMessageHandler 装饰器,在 emitter 广播前逐条落库
- processor 和 task/runtime 使用装饰器包裹 emitter,替代 post-loop 批量写入
- 移除 session_history 中的批量 DB 写入,仅保留内存历史更新
- execution 中跳过已由 live emitter 实时广播的工具消息,避免重复
- 前端支持运行中 task 工具卡片"查看实时进度"跳转子智能体视图

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-29 16:47:57 +08:00

275 lines
9.5 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use crate::agent::AgentError;
use crate::bus::ChatMessage;
use crate::storage::{
ConversationRepository, SessionRecord, SkillEventRepository, persistent_session_id,
};
fn preview_text(content: &str, max_chars: usize) -> String {
let mut preview = content.chars().take(max_chars).collect::<String>();
if content.chars().count() > max_chars {
preview.push_str("...");
}
preview.replace('\n', "\\n")
}
pub(crate) struct SessionHistory {
channel_name: String,
chat_histories: HashMap<String, Vec<ChatMessage>>,
chat_topic_ids: HashMap<String, String>, // 每个 chat 的当前 topic
history_topic_ids: HashMap<String, String>, // 每个 chat 的历史所对应的话题
compression_in_flight: HashSet<String>,
conversations: Arc<dyn ConversationRepository>,
skill_events: Arc<dyn SkillEventRepository>,
}
impl SessionHistory {
pub(crate) fn new(
channel_name: impl Into<String>,
conversations: Arc<dyn ConversationRepository>,
skill_events: Arc<dyn SkillEventRepository>,
) -> Self {
Self {
channel_name: channel_name.into(),
chat_histories: HashMap::new(),
chat_topic_ids: HashMap::new(),
history_topic_ids: HashMap::new(),
compression_in_flight: HashSet::new(),
conversations,
skill_events,
}
}
pub(crate) fn persistent_session_id(&self, chat_id: &str) -> String {
persistent_session_id(&self.channel_name, chat_id)
}
pub(crate) fn ensure_persistent_session(
&self,
chat_id: &str,
) -> Result<SessionRecord, AgentError> {
self.conversations
.ensure_channel_session(&self.channel_name, chat_id)
.map_err(|err| AgentError::Other(format!("session persistence error: {}", err)))
}
pub(crate) fn ensure_chat_loaded(
&mut self,
chat_id: &str,
topic_id: Option<&str>,
) -> Result<(), AgentError> {
if self.chat_histories.contains_key(chat_id) {
return Ok(());
}
// 如果提供了 topic_id按 topic 加载;否则按 session 加载
let history = if let Some(tid) = topic_id {
self.conversations
.load_messages_for_topic(tid)
.map_err(|err| AgentError::Other(format!("session history load error: {}", err)))?
} else {
self.conversations
.load_messages(&self.persistent_session_id(chat_id))
.map_err(|err| AgentError::Other(format!("session history load error: {}", err)))?
};
self.chat_histories.insert(chat_id.to_string(), history);
Ok(())
}
pub(crate) fn ensure_agent_prompt_before_user_message(
&mut self,
_chat_id: &str,
) -> Result<(), AgentError> {
// 提示词现在由 AgentPromptProvider 统一处理,不需要在此处注入
Ok(())
}
pub(crate) fn get_or_create_history(&mut self, chat_id: &str) -> &mut Vec<ChatMessage> {
self.chat_histories.entry(chat_id.to_string()).or_default()
}
pub(crate) fn get_history(&self, chat_id: &str) -> Option<&Vec<ChatMessage>> {
self.chat_histories.get(chat_id)
}
pub(crate) fn set_history(&mut self, chat_id: &str, history: Vec<ChatMessage>) {
self.chat_histories.insert(chat_id.to_string(), history);
// 记录历史对应的话题(当前设置的话题)
if let Some(topic_id) = self.chat_topic_ids.get(chat_id) {
self.history_topic_ids.insert(chat_id.to_string(), topic_id.clone());
}
}
/// 获取指定 chat 的历史所对应的话题
pub(crate) fn history_topic(&self, chat_id: &str) -> Option<&str> {
self.history_topic_ids.get(chat_id).map(|s| s.as_str())
}
/// 设置指定 chat 的当前 topic
pub(crate) fn set_chat_topic(&mut self, chat_id: &str, topic_id: String) {
self.chat_topic_ids.insert(chat_id.to_string(), topic_id);
}
/// 获取指定 chat 的当前 topic
pub(crate) fn chat_topic(&self, chat_id: &str) -> Option<&str> {
self.chat_topic_ids.get(chat_id).map(|s| s.as_str())
}
/// 清除指定 chat 的 topic
pub(crate) fn clear_chat_topic(&mut self, chat_id: &str) {
self.chat_topic_ids.remove(chat_id);
}
pub(crate) fn add_message(&mut self, chat_id: &str, message: ChatMessage) {
self.get_or_create_history(chat_id).push(message);
}
pub(crate) fn remove_history(&mut self, chat_id: &str) {
self.chat_histories.remove(chat_id);
self.compression_in_flight.remove(chat_id);
self.history_topic_ids.remove(chat_id);
}
pub(crate) fn clear_chat_history(&mut self, chat_id: &str) -> Result<(), AgentError> {
if let Some(history) = self.chat_histories.get_mut(chat_id) {
let len = history.len();
history.clear();
#[cfg(debug_assertions)]
tracing::debug!(chat_id = %chat_id, previous_len = len, "Chat history cleared");
}
self.conversations
.clear_messages(&self.persistent_session_id(chat_id))
.map_err(|err| AgentError::Other(format!("clear history persistence error: {}", err)))
}
pub(crate) fn append_persisted_messages<I>(
&mut self,
chat_id: &str,
messages: I,
) -> Result<(), AgentError>
where
I: IntoIterator<Item = ChatMessage>,
{
let messages: Vec<ChatMessage> = messages.into_iter().collect();
if messages.is_empty() {
return Ok(());
}
for message in messages {
self.add_message(chat_id, message);
}
Ok(())
}
/// 将消息保存到指定话题
/// 每条消息已通过 PersistingEmittedMessageHandler 逐条持久化,此处仅保留接口兼容
pub(crate) fn append_to_topic(
&self,
_chat_id: &str,
_topic_id: &str,
messages: &[ChatMessage],
) -> Result<(), AgentError> {
if messages.is_empty() {
return Ok(());
}
Ok(())
}
pub(crate) fn latest_user_message(&self, chat_id: &str) -> Option<&ChatMessage> {
self.get_history(chat_id)
.and_then(|history| history.iter().rev().find(|message| message.role == "user"))
}
pub(crate) fn matches_current_user_turn(&self, chat_id: &str, message: &ChatMessage) -> bool {
self.latest_user_message(chat_id)
.map(|current| {
current.id == message.id
|| (current.content == message.content
&& current.timestamp == message.timestamp
&& current.media_refs == message.media_refs)
})
.unwrap_or(false)
}
pub(crate) fn stale_result_diagnostics(
&self,
chat_id: &str,
) -> (Option<&str>, Option<String>, bool, usize) {
let latest_user = self.latest_user_message(chat_id);
let latest_user_id = latest_user.map(|message| message.id.as_str());
let latest_user_preview = latest_user.map(|message| preview_text(&message.content, 80));
let compression_in_flight = self.compression_in_flight.contains(chat_id);
let history_len = self
.get_history(chat_id)
.map(|history| history.len())
.unwrap_or(0);
(
latest_user_id,
latest_user_preview,
compression_in_flight,
history_len,
)
}
pub(crate) fn clear_all_history(&mut self) -> Result<(), AgentError> {
let chat_ids: Vec<String> = self.chat_histories.keys().cloned().collect();
let total: usize = self.chat_histories.values().map(|h| h.len()).sum();
self.chat_histories.clear();
self.compression_in_flight.clear();
#[cfg(debug_assertions)]
tracing::debug!(previous_total = total, "All chat histories cleared");
for chat_id in chat_ids {
self.conversations
.clear_messages(&self.persistent_session_id(&chat_id))
.map_err(|err| {
AgentError::Other(format!("clear history persistence error: {}", err))
})?;
}
Ok(())
}
pub(crate) fn try_start_background_compaction(&mut self, chat_id: &str) -> bool {
self.compression_in_flight.insert(chat_id.to_string())
}
pub(crate) fn finish_background_compaction(&mut self, chat_id: &str) {
self.compression_in_flight.remove(chat_id);
}
pub(crate) fn reload_chat_history(&mut self, chat_id: &str) -> Result<(), AgentError> {
let history = self
.conversations
.load_messages(&self.persistent_session_id(chat_id))
.map_err(|err| AgentError::Other(format!("session history reload error: {}", err)))?;
self.chat_histories.insert(chat_id.to_string(), history);
Ok(())
}
pub(crate) fn conversations(&self) -> Arc<dyn ConversationRepository> {
self.conversations.clone()
}
pub(crate) fn append_skill_event(
&self,
chat_id: &str,
event_type: &str,
skill_name: Option<&str>,
payload: &serde_json::Value,
) -> Result<(), AgentError> {
self.skill_events
.append_skill_event(
Some(&self.persistent_session_id(chat_id)),
event_type,
skill_name,
payload,
)
.map_err(|err| AgentError::Other(format!("append skill event error: {}", err)))
}
}