feat: 添加原始话题 ID 支持,优化消息保存逻辑以处理用户话题切换

This commit is contained in:
oudecheng 2026-05-20 12:05:58 +08:00
parent 018c104592
commit 8830027cbc
3 changed files with 108 additions and 35 deletions

View File

@ -38,6 +38,7 @@ pub(crate) struct FinalizeAgentResultRequest<'a> {
pub(crate) metadata: &'a HashMap<String, String>, pub(crate) metadata: &'a HashMap<String, String>,
pub(crate) suppress_live_tool_calls: bool, pub(crate) suppress_live_tool_calls: bool,
pub(crate) execution_kind: &'a str, pub(crate) execution_kind: &'a str,
pub(crate) original_topic_id: Option<String>,
} }
pub(crate) struct FinalizedAgentResult { pub(crate) struct FinalizedAgentResult {
@ -78,10 +79,14 @@ impl AgentExecutionService {
session: &mut Session, session: &mut Session,
request: FinalizeAgentResultRequest<'_>, request: FinalizeAgentResultRequest<'_>,
) -> Result<FinalizedAgentResult, AgentError> { ) -> Result<FinalizedAgentResult, AgentError> {
if !session.matches_current_user_turn(request.chat_id, request.user_message) { // 检查是否是最新的用户回合
let is_current_turn =
session.matches_current_user_turn(request.chat_id, request.user_message);
if !is_current_turn {
let (latest_user_id, latest_user_preview, compression_in_flight, history_len) = let (latest_user_id, latest_user_preview, compression_in_flight, history_len) =
session.stale_result_diagnostics(request.chat_id); session.stale_result_diagnostics(request.chat_id);
tracing::warn!( tracing::info!(
channel = %request.channel_name, channel = %request.channel_name,
chat_id = %request.chat_id, chat_id = %request.chat_id,
user_message_id = %request.user_message.id, user_message_id = %request.user_message.id,
@ -90,41 +95,66 @@ impl AgentExecutionService {
compression_in_flight, compression_in_flight,
history_len, history_len,
execution_kind = %request.execution_kind, execution_kind = %request.execution_kind,
"Skipping stale agent result because a newer user message is already present" original_topic_id = ?request.original_topic_id,
"User switched topic during agent execution - saving result to original topic"
); );
return Ok(FinalizedAgentResult {
outbound_messages: Vec::new(),
should_schedule_compaction: false,
});
} }
session // 确定保存消息的话题 ID
.append_persisted_messages(request.chat_id, request.result.emitted_messages.clone())?; // 如果是最新回合,使用当前话题;否则使用原始话题
let target_topic_id = if is_current_turn {
session.current_topic(request.chat_id)
} else {
request.original_topic_id.as_deref()
};
let outbound_messages = request // 将结果消息保存到确定的话题
.result if let Some(topic_id) = target_topic_id {
.emitted_messages if let Err(err) = session.append_messages_to_topic(
.iter() request.chat_id,
.filter(|message| { topic_id,
(!message.is_assistant_tool_call_message() || !request.suppress_live_tool_calls) &request.result.emitted_messages,
&& should_display_message_to_user(self.show_tool_results, message) ) {
}) tracing::error!(
.flat_map(|message| { error = %err,
OutboundMessage::from_chat_message( topic_id = %topic_id,
request.channel_name, "Failed to append messages to topic"
request.chat_id, );
None, // session_id }
None, }
request.metadata,
message, // 只有当是最新回合时才发送 outbound 消息给用户
) // 如果用户已经切换到其他话题,只保存结果,不发送消息(避免打扰)
}) let outbound_messages = if is_current_turn {
.collect(); request
.result
.emitted_messages
.iter()
.filter(|message| {
(!message.is_assistant_tool_call_message() || !request.suppress_live_tool_calls)
&& should_display_message_to_user(self.show_tool_results, message)
})
.flat_map(|message| {
OutboundMessage::from_chat_message(
request.channel_name,
request.chat_id,
None, // session_id
None,
request.metadata,
message,
)
})
.collect()
} else {
Vec::new()
};
// 只有当是最新回合时才触发历史压缩
let should_schedule_compaction = is_current_turn;
Ok(FinalizedAgentResult { Ok(FinalizedAgentResult {
outbound_messages, outbound_messages,
should_schedule_compaction: true, should_schedule_compaction,
}) })
} }
@ -132,7 +162,7 @@ impl AgentExecutionService {
&self, &self,
request: MessageExecutionRequest<'_>, request: MessageExecutionRequest<'_>,
) -> Result<Vec<OutboundMessage>, AgentError> { ) -> Result<Vec<OutboundMessage>, AgentError> {
let (history, agent, user_message, user_message_count) = { let (history, agent, user_message, user_message_count, original_topic_id) = {
let mut session_guard = request.session.lock().await; let mut session_guard = request.session.lock().await;
session_guard.ensure_persistent_session(request.chat_id)?; session_guard.ensure_persistent_session(request.chat_id)?;
@ -156,6 +186,11 @@ impl AgentExecutionService {
let history_before = session_guard.get_or_create_history(request.chat_id).clone(); let history_before = session_guard.get_or_create_history(request.chat_id).clone();
let user_message_count = history_before.iter().filter(|m| m.role == "user").count(); let user_message_count = history_before.iter().filter(|m| m.role == "user").count();
// 在添加用户消息前,记录当前话题 ID
let original_topic_id = session_guard
.current_topic(request.chat_id)
.map(|s| s.to_string());
let user_message = session_guard.create_user_message(&enriched_content, media_refs); let user_message = session_guard.create_user_message(&enriched_content, media_refs);
session_guard.append_persisted_message(request.chat_id, user_message.clone())?; session_guard.append_persisted_message(request.chat_id, user_message.clone())?;
@ -172,7 +207,7 @@ impl AgentExecutionService {
agent = agent.with_emitted_message_handler(handler); agent = agent.with_emitted_message_handler(handler);
} }
(history, agent, user_message, user_message_count) (history, agent, user_message, user_message_count, original_topic_id)
}; };
// 构建系统提示词上下文 // 构建系统提示词上下文
@ -195,6 +230,7 @@ impl AgentExecutionService {
metadata: &metadata, metadata: &metadata,
suppress_live_tool_calls: request.live_emitter.is_some(), suppress_live_tool_calls: request.live_emitter.is_some(),
execution_kind: "message", execution_kind: "message",
original_topic_id,
}, },
) )
.await .await
@ -204,7 +240,7 @@ impl AgentExecutionService {
&self, &self,
request: ScheduledExecutionRequest<'_>, request: ScheduledExecutionRequest<'_>,
) -> Result<Vec<OutboundMessage>, AgentError> { ) -> Result<Vec<OutboundMessage>, AgentError> {
let (history, agent, user_message, user_message_count) = { let (history, agent, user_message, user_message_count, original_topic_id) = {
let mut session_guard = request.session.lock().await; let mut session_guard = request.session.lock().await;
session_guard.ensure_persistent_session(request.chat_id)?; session_guard.ensure_persistent_session(request.chat_id)?;
@ -230,6 +266,11 @@ impl AgentExecutionService {
let history_before = session_guard.get_or_create_history(request.chat_id).clone(); let history_before = session_guard.get_or_create_history(request.chat_id).clone();
let user_message_count = history_before.iter().filter(|m| m.role == "user").count(); let user_message_count = history_before.iter().filter(|m| m.role == "user").count();
// 在添加用户消息前,记录当前话题 ID
let original_topic_id = session_guard
.current_topic(request.chat_id)
.map(|s| s.to_string());
let user_message = session_guard.create_user_message(request.prompt, Vec::new()); let user_message = session_guard.create_user_message(request.prompt, Vec::new());
session_guard.append_persisted_message(request.chat_id, user_message.clone())?; session_guard.append_persisted_message(request.chat_id, user_message.clone())?;
@ -239,13 +280,13 @@ impl AgentExecutionService {
let agent = session_guard.create_agent_with_provider_config( let agent = session_guard.create_agent_with_provider_config(
request.chat_id, request.chat_id,
request.notification_chat_id, // 传入真实 chat_id request.notification_chat_id, // 传入真实 chat_id
Some(request.sender_id), Some(request.sender_id),
Some(&user_message.id), Some(&user_message.id),
request.provider_config.clone(), request.provider_config.clone(),
)?; )?;
(history, agent, user_message, user_message_count) (history, agent, user_message, user_message_count, original_topic_id)
}; };
// 构建系统提示词上下文 // 构建系统提示词上下文
@ -267,6 +308,7 @@ impl AgentExecutionService {
metadata: request.metadata, metadata: request.metadata,
suppress_live_tool_calls: false, suppress_live_tool_calls: false,
execution_kind: "scheduled_task", execution_kind: "scheduled_task",
original_topic_id,
}, },
) )
.await .await

View File

@ -305,6 +305,16 @@ impl Session {
self.history.append_persisted_messages(chat_id, messages) self.history.append_persisted_messages(chat_id, messages)
} }
/// 将消息保存到指定话题(直接写入数据库,不更新内存历史)
pub fn append_messages_to_topic(
&self,
chat_id: &str,
topic_id: &str,
messages: &[ChatMessage],
) -> Result<(), AgentError> {
self.history.append_to_topic(chat_id, topic_id, messages)
}
pub fn create_user_message(&self, content: &str, media_refs: Vec<String>) -> ChatMessage { pub fn create_user_message(&self, content: &str, media_refs: Vec<String>) -> ChatMessage {
if media_refs.is_empty() { if media_refs.is_empty() {
ChatMessage::user(content) ChatMessage::user(content)

View File

@ -212,6 +212,27 @@ impl SessionHistory {
Ok(()) Ok(())
} }
/// 将消息保存到指定话题(直接写入数据库,不更新内存历史)
/// 用于异步执行结果保存到原始话题的场景
pub(crate) fn append_to_topic(
&self,
chat_id: &str,
topic_id: &str,
messages: &[ChatMessage],
) -> Result<(), AgentError> {
let session_id = self.persistent_session_id(chat_id);
for message in messages {
self.conversations
.append_message_with_topic(&session_id, Some(topic_id), message)
.map_err(|err| {
AgentError::Other(format!("append message to topic error: {}", err))
})?;
}
Ok(())
}
pub(crate) fn latest_user_message(&self, chat_id: &str) -> Option<&ChatMessage> { pub(crate) fn latest_user_message(&self, chat_id: &str) -> Option<&ChatMessage> {
self.get_history(chat_id) self.get_history(chat_id)
.and_then(|history| history.iter().rev().find(|message| message.role == "user")) .and_then(|history| history.iter().rev().find(|message| message.role == "user"))