diff --git a/src/gateway/session.rs b/src/gateway/session.rs index ec7ed88..0763fd9 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -635,14 +635,13 @@ impl Session { } fn latest_user_message_id(&self, chat_id: &str) -> Option<&str> { + self.latest_user_message(chat_id) + .map(|message| message.id.as_str()) + } + + fn latest_user_message(&self, chat_id: &str) -> Option<&ChatMessage> { self.get_history(chat_id) - .and_then(|history| { - history - .iter() - .rev() - .find(|message| message.role == "user") - .map(|message| message.id.as_str()) - }) + .and_then(|history| history.iter().rev().find(|message| message.role == "user")) } fn is_latest_user_message(&self, chat_id: &str, message_id: &str) -> bool { @@ -651,6 +650,32 @@ impl Session { .unwrap_or(false) } + fn matches_current_user_turn(&self, chat_id: &str, message: &ChatMessage) -> bool { + self.latest_user_message(chat_id) + .map(|current| { + current.id == message.id + || (current.content == message.content + && current.timestamp == message.timestamp + && current.media_refs == message.media_refs) + }) + .unwrap_or(false) + } + + fn stale_result_diagnostics(&self, chat_id: &str) -> (Option<&str>, Option, bool, usize) { + let latest_user = self.latest_user_message(chat_id); + let latest_user_id = latest_user.map(|message| message.id.as_str()); + let latest_user_preview = latest_user.map(|message| preview_text(&message.content, 80)); + let compression_in_flight = self.compression_in_flight.contains(chat_id); + let history_len = self.get_history(chat_id).map(|history| history.len()).unwrap_or(0); + + ( + latest_user_id, + latest_user_preview, + compression_in_flight, + history_len, + ) + } + /// 清除所有历史 pub fn clear_all_history(&mut self) -> Result<(), AgentError> { let chat_ids: Vec = self.chat_histories.keys().cloned().collect(); @@ -1370,7 +1395,7 @@ impl SessionManager { .ok_or_else(|| AgentError::Other("Session not found".to_string()))?; // 处理消息 - let (history, agent, user_message_id) = { + let (history, agent, user_message) = { let mut session_guard = session.lock().await; session_guard.ensure_persistent_session(chat_id)?; @@ -1396,19 +1421,18 @@ impl SessionManager { } let enriched_content = enrich_user_content_with_media_refs(content, &media_refs)?; let user_message = session_guard.create_user_message(&enriched_content, media_refs); - let user_message_id = user_message.id.clone(); - session_guard.append_persisted_message(chat_id, user_message)?; + session_guard.append_persisted_message(chat_id, user_message.clone())?; let history = session_guard.get_or_create_history(chat_id).clone(); session_guard.record_skill_offer(chat_id)?; // 创建 agent 并处理 - let mut agent = session_guard.create_agent(chat_id, Some(sender_id), Some(&user_message_id))?; + let mut agent = session_guard.create_agent(chat_id, Some(sender_id), Some(&user_message.id))?; if let Some(handler) = live_emitter.clone() { agent = agent.with_emitted_message_handler(handler); } - (history, agent, user_message_id) + (history, agent, user_message) }; let result = agent.process(history).await?; @@ -1417,11 +1441,17 @@ impl SessionManager { let response = { let mut session_guard = session.lock().await; - if !session_guard.is_latest_user_message(chat_id, &user_message_id) { + if !session_guard.matches_current_user_turn(chat_id, &user_message) { + let (latest_user_id, latest_user_preview, compression_in_flight, history_len) = + session_guard.stale_result_diagnostics(chat_id); tracing::warn!( channel = %channel_name, chat_id = %chat_id, - user_message_id = %user_message_id, + user_message_id = %user_message.id, + latest_user_id, + latest_user_preview, + compression_in_flight, + history_len, "Skipping stale agent result because a newer user message is already present" ); Vec::new() @@ -1488,7 +1518,7 @@ impl SessionManager { .unwrap_or_else(|| "scheduler".to_string()); let provider_config = self.provider_config_for_agent(options.agent.as_deref())?; - let (history, agent, user_message_id) = { + let (history, agent, user_message) = { let mut session_guard = session.lock().await; session_guard.ensure_persistent_session(chat_id)?; @@ -1511,8 +1541,7 @@ impl SessionManager { )?; let user_message = session_guard.create_user_message(prompt, Vec::new()); - let user_message_id = user_message.id.clone(); - session_guard.append_persisted_message(chat_id, user_message)?; + session_guard.append_persisted_message(chat_id, user_message.clone())?; let history = session_guard.get_or_create_history(chat_id).clone(); @@ -1521,11 +1550,11 @@ impl SessionManager { let agent = session_guard.create_agent_with_provider_config( chat_id, Some(&sender_id), - Some(&user_message_id), + Some(&user_message.id), provider_config.clone(), )?; - (history, agent, user_message_id) + (history, agent, user_message) }; let result = agent.process(history).await?; @@ -1534,11 +1563,17 @@ impl SessionManager { let response = { let mut session_guard = session.lock().await; - if !session_guard.is_latest_user_message(chat_id, &user_message_id) { + if !session_guard.matches_current_user_turn(chat_id, &user_message) { + let (latest_user_id, latest_user_preview, compression_in_flight, history_len) = + session_guard.stale_result_diagnostics(chat_id); tracing::warn!( channel = %channel_name, chat_id = %chat_id, - user_message_id = %user_message_id, + user_message_id = %user_message.id, + latest_user_id, + latest_user_preview, + compression_in_flight, + history_len, "Skipping stale scheduled agent result because a newer user message is already present" ); Vec::new() @@ -1742,6 +1777,67 @@ mod tests { assert!(session.is_latest_user_message("chat-1", &second_id)); } + #[tokio::test] + async fn test_current_user_turn_match_survives_history_compaction_reload() { + let store = Arc::new(SessionStore::in_memory().unwrap()); + let (user_tx, _user_rx) = mpsc::channel(4); + let skills = Arc::new(SkillRuntime::default()); + let tools = Arc::new(default_tools( + skills.clone(), + store.clone(), + HashSet::new(), + "Asia/Shanghai".to_string(), + )); + let mut session = Session::new( + "feishu".to_string(), + test_provider_config(), + user_tx, + tools, + skills, + store.clone(), + 100, + ) + .await + .unwrap(); + + session.ensure_persistent_session("chat-1").unwrap(); + session.ensure_chat_loaded("chat-1").unwrap(); + + let first = session.create_user_message("first", Vec::new()); + let first_id = first.id.clone(); + session.append_persisted_message("chat-1", first).unwrap(); + session + .append_persisted_message("chat-1", ChatMessage::assistant("answer-1")) + .unwrap(); + + let second = session.create_user_message("second", Vec::new()); + session.append_persisted_message("chat-1", second.clone()).unwrap(); + session + .append_persisted_message("chat-1", ChatMessage::assistant("answer-2")) + .unwrap(); + + let session_id = session.persistent_session_id("chat-1"); + let snapshot_end_seq = store.get_session(&session_id).unwrap().unwrap().message_count; + let preserved_messages = session.get_history("chat-1").unwrap().clone(); + + store + .compact_active_history( + &session_id, + 0, + snapshot_end_seq, + &[], + &ChatMessage::system("[Compressed History]\n\nsummary"), + &preserved_messages, + ) + .unwrap(); + + session.reload_chat_history("chat-1").unwrap(); + + assert!(!session.is_latest_user_message("chat-1", &first_id)); + assert!(!session.is_latest_user_message("chat-1", &second.id)); + assert!(session.matches_current_user_turn("chat-1", &second)); + } + #[test] fn test_select_provider_config_falls_back_to_default() { let default_provider = test_provider_config_named("default-provider", "default-model");