feat: 重构用户消息处理逻辑,添加最新用户消息匹配功能并优化历史记录诊断
This commit is contained in:
parent
4e1b831948
commit
137a62f1cc
@ -635,14 +635,13 @@ impl Session {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn latest_user_message_id(&self, chat_id: &str) -> Option<&str> {
|
fn latest_user_message_id(&self, chat_id: &str) -> Option<&str> {
|
||||||
|
self.latest_user_message(chat_id)
|
||||||
|
.map(|message| message.id.as_str())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn latest_user_message(&self, chat_id: &str) -> Option<&ChatMessage> {
|
||||||
self.get_history(chat_id)
|
self.get_history(chat_id)
|
||||||
.and_then(|history| {
|
.and_then(|history| history.iter().rev().find(|message| message.role == "user"))
|
||||||
history
|
|
||||||
.iter()
|
|
||||||
.rev()
|
|
||||||
.find(|message| message.role == "user")
|
|
||||||
.map(|message| message.id.as_str())
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_latest_user_message(&self, chat_id: &str, message_id: &str) -> bool {
|
fn is_latest_user_message(&self, chat_id: &str, message_id: &str) -> bool {
|
||||||
@ -651,6 +650,32 @@ impl Session {
|
|||||||
.unwrap_or(false)
|
.unwrap_or(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn matches_current_user_turn(&self, chat_id: &str, message: &ChatMessage) -> bool {
|
||||||
|
self.latest_user_message(chat_id)
|
||||||
|
.map(|current| {
|
||||||
|
current.id == message.id
|
||||||
|
|| (current.content == message.content
|
||||||
|
&& current.timestamp == message.timestamp
|
||||||
|
&& current.media_refs == message.media_refs)
|
||||||
|
})
|
||||||
|
.unwrap_or(false)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stale_result_diagnostics(&self, chat_id: &str) -> (Option<&str>, Option<String>, bool, usize) {
|
||||||
|
let latest_user = self.latest_user_message(chat_id);
|
||||||
|
let latest_user_id = latest_user.map(|message| message.id.as_str());
|
||||||
|
let latest_user_preview = latest_user.map(|message| preview_text(&message.content, 80));
|
||||||
|
let compression_in_flight = self.compression_in_flight.contains(chat_id);
|
||||||
|
let history_len = self.get_history(chat_id).map(|history| history.len()).unwrap_or(0);
|
||||||
|
|
||||||
|
(
|
||||||
|
latest_user_id,
|
||||||
|
latest_user_preview,
|
||||||
|
compression_in_flight,
|
||||||
|
history_len,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
/// 清除所有历史
|
/// 清除所有历史
|
||||||
pub fn clear_all_history(&mut self) -> Result<(), AgentError> {
|
pub fn clear_all_history(&mut self) -> Result<(), AgentError> {
|
||||||
let chat_ids: Vec<String> = self.chat_histories.keys().cloned().collect();
|
let chat_ids: Vec<String> = self.chat_histories.keys().cloned().collect();
|
||||||
@ -1370,7 +1395,7 @@ impl SessionManager {
|
|||||||
.ok_or_else(|| AgentError::Other("Session not found".to_string()))?;
|
.ok_or_else(|| AgentError::Other("Session not found".to_string()))?;
|
||||||
|
|
||||||
// 处理消息
|
// 处理消息
|
||||||
let (history, agent, user_message_id) = {
|
let (history, agent, user_message) = {
|
||||||
let mut session_guard = session.lock().await;
|
let mut session_guard = session.lock().await;
|
||||||
|
|
||||||
session_guard.ensure_persistent_session(chat_id)?;
|
session_guard.ensure_persistent_session(chat_id)?;
|
||||||
@ -1396,19 +1421,18 @@ impl SessionManager {
|
|||||||
}
|
}
|
||||||
let enriched_content = enrich_user_content_with_media_refs(content, &media_refs)?;
|
let enriched_content = enrich_user_content_with_media_refs(content, &media_refs)?;
|
||||||
let user_message = session_guard.create_user_message(&enriched_content, media_refs);
|
let user_message = session_guard.create_user_message(&enriched_content, media_refs);
|
||||||
let user_message_id = user_message.id.clone();
|
session_guard.append_persisted_message(chat_id, user_message.clone())?;
|
||||||
session_guard.append_persisted_message(chat_id, user_message)?;
|
|
||||||
|
|
||||||
let history = session_guard.get_or_create_history(chat_id).clone();
|
let history = session_guard.get_or_create_history(chat_id).clone();
|
||||||
session_guard.record_skill_offer(chat_id)?;
|
session_guard.record_skill_offer(chat_id)?;
|
||||||
|
|
||||||
// 创建 agent 并处理
|
// 创建 agent 并处理
|
||||||
let mut agent = session_guard.create_agent(chat_id, Some(sender_id), Some(&user_message_id))?;
|
let mut agent = session_guard.create_agent(chat_id, Some(sender_id), Some(&user_message.id))?;
|
||||||
if let Some(handler) = live_emitter.clone() {
|
if let Some(handler) = live_emitter.clone() {
|
||||||
agent = agent.with_emitted_message_handler(handler);
|
agent = agent.with_emitted_message_handler(handler);
|
||||||
}
|
}
|
||||||
|
|
||||||
(history, agent, user_message_id)
|
(history, agent, user_message)
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = agent.process(history).await?;
|
let result = agent.process(history).await?;
|
||||||
@ -1417,11 +1441,17 @@ impl SessionManager {
|
|||||||
let response = {
|
let response = {
|
||||||
let mut session_guard = session.lock().await;
|
let mut session_guard = session.lock().await;
|
||||||
|
|
||||||
if !session_guard.is_latest_user_message(chat_id, &user_message_id) {
|
if !session_guard.matches_current_user_turn(chat_id, &user_message) {
|
||||||
|
let (latest_user_id, latest_user_preview, compression_in_flight, history_len) =
|
||||||
|
session_guard.stale_result_diagnostics(chat_id);
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
channel = %channel_name,
|
channel = %channel_name,
|
||||||
chat_id = %chat_id,
|
chat_id = %chat_id,
|
||||||
user_message_id = %user_message_id,
|
user_message_id = %user_message.id,
|
||||||
|
latest_user_id,
|
||||||
|
latest_user_preview,
|
||||||
|
compression_in_flight,
|
||||||
|
history_len,
|
||||||
"Skipping stale agent result because a newer user message is already present"
|
"Skipping stale agent result because a newer user message is already present"
|
||||||
);
|
);
|
||||||
Vec::new()
|
Vec::new()
|
||||||
@ -1488,7 +1518,7 @@ impl SessionManager {
|
|||||||
.unwrap_or_else(|| "scheduler".to_string());
|
.unwrap_or_else(|| "scheduler".to_string());
|
||||||
let provider_config = self.provider_config_for_agent(options.agent.as_deref())?;
|
let provider_config = self.provider_config_for_agent(options.agent.as_deref())?;
|
||||||
|
|
||||||
let (history, agent, user_message_id) = {
|
let (history, agent, user_message) = {
|
||||||
let mut session_guard = session.lock().await;
|
let mut session_guard = session.lock().await;
|
||||||
|
|
||||||
session_guard.ensure_persistent_session(chat_id)?;
|
session_guard.ensure_persistent_session(chat_id)?;
|
||||||
@ -1511,8 +1541,7 @@ impl SessionManager {
|
|||||||
)?;
|
)?;
|
||||||
|
|
||||||
let user_message = session_guard.create_user_message(prompt, Vec::new());
|
let user_message = session_guard.create_user_message(prompt, Vec::new());
|
||||||
let user_message_id = user_message.id.clone();
|
session_guard.append_persisted_message(chat_id, user_message.clone())?;
|
||||||
session_guard.append_persisted_message(chat_id, user_message)?;
|
|
||||||
|
|
||||||
let history = session_guard.get_or_create_history(chat_id).clone();
|
let history = session_guard.get_or_create_history(chat_id).clone();
|
||||||
|
|
||||||
@ -1521,11 +1550,11 @@ impl SessionManager {
|
|||||||
let agent = session_guard.create_agent_with_provider_config(
|
let agent = session_guard.create_agent_with_provider_config(
|
||||||
chat_id,
|
chat_id,
|
||||||
Some(&sender_id),
|
Some(&sender_id),
|
||||||
Some(&user_message_id),
|
Some(&user_message.id),
|
||||||
provider_config.clone(),
|
provider_config.clone(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
(history, agent, user_message_id)
|
(history, agent, user_message)
|
||||||
};
|
};
|
||||||
|
|
||||||
let result = agent.process(history).await?;
|
let result = agent.process(history).await?;
|
||||||
@ -1534,11 +1563,17 @@ impl SessionManager {
|
|||||||
let response = {
|
let response = {
|
||||||
let mut session_guard = session.lock().await;
|
let mut session_guard = session.lock().await;
|
||||||
|
|
||||||
if !session_guard.is_latest_user_message(chat_id, &user_message_id) {
|
if !session_guard.matches_current_user_turn(chat_id, &user_message) {
|
||||||
|
let (latest_user_id, latest_user_preview, compression_in_flight, history_len) =
|
||||||
|
session_guard.stale_result_diagnostics(chat_id);
|
||||||
tracing::warn!(
|
tracing::warn!(
|
||||||
channel = %channel_name,
|
channel = %channel_name,
|
||||||
chat_id = %chat_id,
|
chat_id = %chat_id,
|
||||||
user_message_id = %user_message_id,
|
user_message_id = %user_message.id,
|
||||||
|
latest_user_id,
|
||||||
|
latest_user_preview,
|
||||||
|
compression_in_flight,
|
||||||
|
history_len,
|
||||||
"Skipping stale scheduled agent result because a newer user message is already present"
|
"Skipping stale scheduled agent result because a newer user message is already present"
|
||||||
);
|
);
|
||||||
Vec::new()
|
Vec::new()
|
||||||
@ -1742,6 +1777,67 @@ mod tests {
|
|||||||
assert!(session.is_latest_user_message("chat-1", &second_id));
|
assert!(session.is_latest_user_message("chat-1", &second_id));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_current_user_turn_match_survives_history_compaction_reload() {
|
||||||
|
let store = Arc::new(SessionStore::in_memory().unwrap());
|
||||||
|
let (user_tx, _user_rx) = mpsc::channel(4);
|
||||||
|
let skills = Arc::new(SkillRuntime::default());
|
||||||
|
let tools = Arc::new(default_tools(
|
||||||
|
skills.clone(),
|
||||||
|
store.clone(),
|
||||||
|
HashSet::new(),
|
||||||
|
"Asia/Shanghai".to_string(),
|
||||||
|
));
|
||||||
|
let mut session = Session::new(
|
||||||
|
"feishu".to_string(),
|
||||||
|
test_provider_config(),
|
||||||
|
user_tx,
|
||||||
|
tools,
|
||||||
|
skills,
|
||||||
|
store.clone(),
|
||||||
|
100,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
session.ensure_persistent_session("chat-1").unwrap();
|
||||||
|
session.ensure_chat_loaded("chat-1").unwrap();
|
||||||
|
|
||||||
|
let first = session.create_user_message("first", Vec::new());
|
||||||
|
let first_id = first.id.clone();
|
||||||
|
session.append_persisted_message("chat-1", first).unwrap();
|
||||||
|
session
|
||||||
|
.append_persisted_message("chat-1", ChatMessage::assistant("answer-1"))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let second = session.create_user_message("second", Vec::new());
|
||||||
|
session.append_persisted_message("chat-1", second.clone()).unwrap();
|
||||||
|
session
|
||||||
|
.append_persisted_message("chat-1", ChatMessage::assistant("answer-2"))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
let session_id = session.persistent_session_id("chat-1");
|
||||||
|
let snapshot_end_seq = store.get_session(&session_id).unwrap().unwrap().message_count;
|
||||||
|
let preserved_messages = session.get_history("chat-1").unwrap().clone();
|
||||||
|
|
||||||
|
store
|
||||||
|
.compact_active_history(
|
||||||
|
&session_id,
|
||||||
|
0,
|
||||||
|
snapshot_end_seq,
|
||||||
|
&[],
|
||||||
|
&ChatMessage::system("[Compressed History]\n\nsummary"),
|
||||||
|
&preserved_messages,
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
session.reload_chat_history("chat-1").unwrap();
|
||||||
|
|
||||||
|
assert!(!session.is_latest_user_message("chat-1", &first_id));
|
||||||
|
assert!(!session.is_latest_user_message("chat-1", &second.id));
|
||||||
|
assert!(session.matches_current_user_turn("chat-1", &second));
|
||||||
|
}
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn test_select_provider_config_falls_back_to_default() {
|
fn test_select_provider_config_falls_back_to_default() {
|
||||||
let default_provider = test_provider_config_named("default-provider", "default-model");
|
let default_provider = test_provider_config_named("default-provider", "default-model");
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user