fix: 修复话题描述更新逻辑的竞态条件和语义错误,前端自动刷新描述
- topic_description.rs: LLM 返回空字符串时返回 Err 而非 Ok(""),防止空值写回 DB 触发循环生成
- processor.rs: 添加 Arc<Mutex<HashSet>> 生成中守卫防止重复触发,改用 DB 中真正第一条用户消息生成描述
- useChat.ts: assistant_response 时检测当前话题描述为空则递增刷新信号
- App.tsx: 监听刷新信号,500ms 防抖后自动发送 list_topics 获取新描述
This commit is contained in:
parent
b5e2886068
commit
3a623cc8a3
@ -1,4 +1,5 @@
|
|||||||
use std::sync::Arc;
|
use std::collections::HashSet;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use tokio::sync::Semaphore;
|
use tokio::sync::Semaphore;
|
||||||
|
|
||||||
@ -35,6 +36,7 @@ pub struct InboundProcessor {
|
|||||||
provider_config: LLMProviderConfig,
|
provider_config: LLMProviderConfig,
|
||||||
command_router: Arc<CommandRouter>,
|
command_router: Arc<CommandRouter>,
|
||||||
cancel_manager: CancelManager,
|
cancel_manager: CancelManager,
|
||||||
|
description_generation_in_flight: Arc<Mutex<HashSet<String>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl InboundProcessor {
|
impl InboundProcessor {
|
||||||
@ -120,6 +122,7 @@ impl InboundProcessor {
|
|||||||
provider_config,
|
provider_config,
|
||||||
command_router: Arc::new(command_router),
|
command_router: Arc::new(command_router),
|
||||||
cancel_manager,
|
cancel_manager,
|
||||||
|
description_generation_in_flight: Arc::new(Mutex::new(HashSet::new())),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -285,33 +288,64 @@ impl InboundProcessor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 异步生成 topic 描述(仅第一条消息后触发一次)
|
// 异步生成 topic 描述(仅当描述为空且没有正在进行的生成任务时触发)
|
||||||
if let Some(ref topic_id) = current_topic {
|
if let Some(ref topic_id) = current_topic {
|
||||||
let store = self.session_manager.store();
|
let store = self.session_manager.store();
|
||||||
if let Ok(Some(topic)) = store.get_topic(topic_id) {
|
if let Ok(Some(topic)) = store.get_topic(topic_id) {
|
||||||
if topic.description.is_none() || topic.description.as_ref().map(|d| d.is_empty()).unwrap_or(true) {
|
if topic.description.is_none() || topic.description.as_ref().map(|d| d.is_empty()).unwrap_or(true) {
|
||||||
let provider_config = self.provider_config.clone();
|
// 检查并设置"生成中"守卫,防止竞态条件导致重复生成
|
||||||
let topic_id_clone = topic_id.clone();
|
let should_generate = {
|
||||||
let first_message = inbound.content.clone();
|
let mut in_flight = self.description_generation_in_flight.lock().unwrap();
|
||||||
let store_clone = store.clone();
|
if in_flight.contains(topic_id) {
|
||||||
|
false
|
||||||
|
} else {
|
||||||
|
in_flight.insert(topic_id.clone());
|
||||||
|
true
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
tokio::spawn(async move {
|
if should_generate {
|
||||||
let runtime_config: ProviderRuntimeConfig = provider_config.into();
|
let provider_config = self.provider_config.clone();
|
||||||
if let Ok(provider) = create_provider(runtime_config) {
|
let topic_id_clone = topic_id.clone();
|
||||||
match generate_topic_description(provider.as_ref(), &first_message).await {
|
let store_clone = store.clone();
|
||||||
Ok(description) => {
|
let in_flight = self.description_generation_in_flight.clone();
|
||||||
if let Err(e) = store_clone.update_topic_description(&topic_id_clone, &description) {
|
|
||||||
tracing::error!(error = %e, topic_id = %topic_id_clone, "Failed to update topic description");
|
tokio::spawn(async move {
|
||||||
} else {
|
// 从 DB 查询该 topic 的第一条用户消息作为描述生成的依据
|
||||||
tracing::info!(topic_id = %topic_id_clone, description = %description, "Topic description generated");
|
let first_user_message = store_clone
|
||||||
|
.load_messages_for_topic(&topic_id_clone)
|
||||||
|
.ok()
|
||||||
|
.and_then(|msgs| msgs.into_iter().find(|m| m.role == "user"))
|
||||||
|
.map(|m| m.content);
|
||||||
|
|
||||||
|
let message_content = match first_user_message {
|
||||||
|
Some(content) => content,
|
||||||
|
None => {
|
||||||
|
tracing::warn!(topic_id = %topic_id_clone, "No user message found for topic, skipping description generation");
|
||||||
|
in_flight.lock().unwrap().remove(&topic_id_clone);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let runtime_config: ProviderRuntimeConfig = provider_config.into();
|
||||||
|
if let Ok(provider) = create_provider(runtime_config) {
|
||||||
|
match generate_topic_description(provider.as_ref(), &message_content).await {
|
||||||
|
Ok(description) => {
|
||||||
|
if let Err(e) = store_clone.update_topic_description(&topic_id_clone, &description) {
|
||||||
|
tracing::error!(error = %e, topic_id = %topic_id_clone, "Failed to update topic description");
|
||||||
|
} else {
|
||||||
|
tracing::info!(topic_id = %topic_id_clone, description = %description, "Topic description generated");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(error = %e, topic_id = %topic_id_clone, "Failed to generate topic description");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(error = %e, topic_id = %topic_id_clone, "Failed to generate topic description");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
// 无论成功失败,释放生成守卫
|
||||||
});
|
in_flight.lock().unwrap().remove(&topic_id_clone);
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -17,11 +17,15 @@ pub async fn generate_topic_description(
|
|||||||
};
|
};
|
||||||
|
|
||||||
let response = provider.chat(request).await?;
|
let response = provider.chat(request).await?;
|
||||||
let description = response.content.trim();
|
let description = response.content.trim().to_string();
|
||||||
|
|
||||||
|
if description.is_empty() {
|
||||||
|
return Err("LLM returned empty description".into());
|
||||||
|
}
|
||||||
|
|
||||||
if description.len() > 50 {
|
if description.len() > 50 {
|
||||||
Ok(description.chars().take(50).collect())
|
Ok(description.chars().take(50).collect())
|
||||||
} else {
|
} else {
|
||||||
Ok(description.to_string())
|
Ok(description)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -60,6 +60,7 @@ function App() {
|
|||||||
deleteTopic,
|
deleteTopic,
|
||||||
requestSessionList,
|
requestSessionList,
|
||||||
requestTopicList,
|
requestTopicList,
|
||||||
|
topicRefreshTrigger,
|
||||||
enterSubAgentView,
|
enterSubAgentView,
|
||||||
exitSubAgentView,
|
exitSubAgentView,
|
||||||
handleStop,
|
handleStop,
|
||||||
@ -125,6 +126,21 @@ function App() {
|
|||||||
}
|
}
|
||||||
}, [sessionId, status, handleCommand, sendMessage, requestTopicList])
|
}, [sessionId, status, handleCommand, sendMessage, requestTopicList])
|
||||||
|
|
||||||
|
// 话题描述异步生成后自动刷新话题列表
|
||||||
|
useEffect(() => {
|
||||||
|
if (topicRefreshTrigger === 0) return
|
||||||
|
if (status !== 'connected') return
|
||||||
|
const topicCmd = requestTopicList()
|
||||||
|
if (!topicCmd) return
|
||||||
|
|
||||||
|
const timer = setTimeout(() => {
|
||||||
|
handleCommand(topicCmd)
|
||||||
|
sendMessage({ type: 'command', payload: JSON.stringify(topicCmd) })
|
||||||
|
}, 500)
|
||||||
|
|
||||||
|
return () => clearTimeout(timer)
|
||||||
|
}, [topicRefreshTrigger])
|
||||||
|
|
||||||
// Topics 加载后,自动选择第一个并通知后端切换,以便加载历史消息
|
// Topics 加载后,自动选择第一个并通知后端切换,以便加载历史消息
|
||||||
useEffect(() => {
|
useEffect(() => {
|
||||||
if (topics.length === 0 || status !== 'connected') {
|
if (topics.length === 0 || status !== 'connected') {
|
||||||
|
|||||||
@ -68,6 +68,7 @@ interface UseChatReturn {
|
|||||||
// 初始化方法
|
// 初始化方法
|
||||||
requestSessionList: () => Command
|
requestSessionList: () => Command
|
||||||
requestTopicList: () => Command | null
|
requestTopicList: () => Command | null
|
||||||
|
topicRefreshTrigger: number
|
||||||
requestChannelList: () => Command
|
requestChannelList: () => Command
|
||||||
selectChannel: (channelId: string) => void
|
selectChannel: (channelId: string) => void
|
||||||
selectSession: (sessionId: string) => void
|
selectSession: (sessionId: string) => void
|
||||||
@ -118,6 +119,7 @@ export function useChat(): UseChatReturn {
|
|||||||
const [connectionId, setConnectionId] = useState<string | null>(null)
|
const [connectionId, setConnectionId] = useState<string | null>(null)
|
||||||
const [topics, setTopics] = useState<Topic[]>([])
|
const [topics, setTopics] = useState<Topic[]>([])
|
||||||
const [selectedTopic, setSelectedTopic] = useState<string | null>(null)
|
const [selectedTopic, setSelectedTopic] = useState<string | null>(null)
|
||||||
|
const [topicRefreshTrigger, setTopicRefreshTrigger] = useState(0)
|
||||||
const [sessions, setSessions] = useState<SessionSummary[]>([])
|
const [sessions, setSessions] = useState<SessionSummary[]>([])
|
||||||
const [selectedSessionId, setSelectedSessionId] = useState<string | null>(null)
|
const [selectedSessionId, setSelectedSessionId] = useState<string | null>(null)
|
||||||
const [subAgentView, setSubAgentView] = useState<SubAgentView | null>(null)
|
const [subAgentView, setSubAgentView] = useState<SubAgentView | null>(null)
|
||||||
@ -137,6 +139,8 @@ export function useChat(): UseChatReturn {
|
|||||||
// Ref to track subAgentView and schedulerView for use in callbacks
|
// Ref to track subAgentView and schedulerView for use in callbacks
|
||||||
const subAgentViewRef = useRef<SubAgentView | null>(null)
|
const subAgentViewRef = useRef<SubAgentView | null>(null)
|
||||||
const schedulerViewRef = useRef<SchedulerJobView | null>(null)
|
const schedulerViewRef = useRef<SchedulerJobView | null>(null)
|
||||||
|
const topicsRef = useRef<Topic[]>([])
|
||||||
|
const selectedTopicRef = useRef<string | null>(null)
|
||||||
|
|
||||||
const isConnected = useMemo(() => connectionId !== null, [connectionId])
|
const isConnected = useMemo(() => connectionId !== null, [connectionId])
|
||||||
const selectedSession = useMemo(
|
const selectedSession = useMemo(
|
||||||
@ -401,6 +405,12 @@ export function useChat(): UseChatReturn {
|
|||||||
},
|
},
|
||||||
])
|
])
|
||||||
setIsLoading(false)
|
setIsLoading(false)
|
||||||
|
|
||||||
|
// 当前话题无描述时,可能刚触发了异步生成,标记需要刷新
|
||||||
|
const currentTopic = topicsRef.current.find(t => t.id === selectedTopicRef.current)
|
||||||
|
if (currentTopic && !currentTopic.description) {
|
||||||
|
setTopicRefreshTrigger(n => n + 1)
|
||||||
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -617,6 +627,14 @@ export function useChat(): UseChatReturn {
|
|||||||
schedulerViewRef.current = schedulerView
|
schedulerViewRef.current = schedulerView
|
||||||
}, [schedulerView])
|
}, [schedulerView])
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
topicsRef.current = topics
|
||||||
|
}, [topics])
|
||||||
|
|
||||||
|
useEffect(() => {
|
||||||
|
selectedTopicRef.current = selectedTopic
|
||||||
|
}, [selectedTopic])
|
||||||
|
|
||||||
const enterSubAgentView = useCallback((taskId: string, description: string): Command => {
|
const enterSubAgentView = useCallback((taskId: string, description: string): Command => {
|
||||||
const newView: SubAgentView = {
|
const newView: SubAgentView = {
|
||||||
taskId,
|
taskId,
|
||||||
@ -714,6 +732,7 @@ export function useChat(): UseChatReturn {
|
|||||||
deleteTopic,
|
deleteTopic,
|
||||||
requestSessionList,
|
requestSessionList,
|
||||||
requestTopicList,
|
requestTopicList,
|
||||||
|
topicRefreshTrigger,
|
||||||
requestChannelList,
|
requestChannelList,
|
||||||
selectChannel,
|
selectChannel,
|
||||||
selectSession,
|
selectSession,
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user