diff --git a/src/command/adapters/websocket.rs b/src/command/adapters/websocket.rs index 148d867..e089978 100644 --- a/src/command/adapters/websocket.rs +++ b/src/command/adapters/websocket.rs @@ -78,7 +78,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), + attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()), }, MessageKind::Notification => { // 根据元数据判断具体类型 @@ -98,7 +98,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), + attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()), }, } } else if let Some(session_id) = response.metadata.get("session_id") { @@ -137,7 +137,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), + attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()), }, } } else if let Some(sessions_json) = response.metadata.get("sessions") { @@ -155,7 +155,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), + attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()), }, } } else if let Some(topics_json) = response.metadata.get("topics") { @@ -174,7 +174,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), + attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()), }, } } else { @@ -183,7 +183,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), + attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()), } } } @@ -196,7 +196,7 @@ impl OutputAdapter for WebSocketOutputAdapter { id: response.request_id.to_string(), content: msg.content.clone(), role: "assistant".to_string(), - attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), + attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()), }, }; outbounds.push(outbound); diff --git a/src/gateway/processor.rs b/src/gateway/processor.rs index 036c2f8..5335132 100644 --- a/src/gateway/processor.rs +++ b/src/gateway/processor.rs @@ -244,12 +244,17 @@ impl InboundProcessor { } // 普通消息进入 AgentLoop + // 构建 emitter metadata:包含 forwarded_metadata 和 topic_id(用于前端消息隔离) + let mut emitter_metadata = inbound.forwarded_metadata.clone(); + if let Some(ref topic_id) = current_topic { + emitter_metadata.insert("topic_id".to_string(), topic_id.clone()); + } let live_emitter = Arc::new(PersistingEmittedMessageHandler::new( BusToolCallEmitter::new( self.bus.clone(), inbound.channel.clone(), inbound.chat_id.clone(), - inbound.forwarded_metadata.clone(), + emitter_metadata, ), self.session_manager.store(), &session_id, @@ -283,6 +288,10 @@ impl InboundProcessor { Ok(outbound_messages) => { for mut outbound in outbound_messages { outbound.metadata.extend(inbound.forwarded_metadata.clone()); + // 注入 topic_id 到 outbound metadata,用于前端按话题隔离消息 + if let Some(ref topic_id) = current_topic { + outbound.metadata.insert("topic_id".to_string(), topic_id.clone()); + } if let Err(error) = self.bus.publish_outbound(outbound).await { tracing::error!(error = %error, "Failed to publish outbound"); } diff --git a/src/gateway/ws.rs b/src/gateway/ws.rs index c9a9b37..c0d8e06 100644 --- a/src/gateway/ws.rs +++ b/src/gateway/ws.rs @@ -717,6 +717,7 @@ fn chat_message_to_ws_outbound(msg: &crate::bus::ChatMessage) -> Option Option Option Option Option None, diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index dca8b4a..440b28e 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -117,6 +117,8 @@ pub enum WsOutbound { #[serde(default, skip_serializing_if = "Option::is_none")] subagent_task_id: Option, #[serde(default, skip_serializing_if = "Option::is_none")] + topic_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] timestamp: Option, }, #[serde(rename = "tool_call")] @@ -130,6 +132,8 @@ pub enum WsOutbound { #[serde(default, skip_serializing_if = "Option::is_none")] subagent_task_id: Option, #[serde(default, skip_serializing_if = "Option::is_none")] + topic_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] timestamp: Option, }, #[serde(rename = "tool_result")] @@ -142,6 +146,8 @@ pub enum WsOutbound { #[serde(default, skip_serializing_if = "Option::is_none")] subagent_task_id: Option, #[serde(default, skip_serializing_if = "Option::is_none")] + topic_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] duration_ms: Option, #[serde(default, skip_serializing_if = "Option::is_none")] timestamp: Option, @@ -157,6 +163,8 @@ pub enum WsOutbound { #[serde(default, skip_serializing_if = "Option::is_none")] subagent_task_id: Option, #[serde(default, skip_serializing_if = "Option::is_none")] + topic_id: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] timestamp: Option, }, #[serde(rename = "error")] diff --git a/src/protocol/ws_adapter.rs b/src/protocol/ws_adapter.rs index 14a35cb..5a7097b 100644 --- a/src/protocol/ws_adapter.rs +++ b/src/protocol/ws_adapter.rs @@ -22,6 +22,8 @@ pub(crate) fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec Vec Vec Vec vec![WsOutbound::ToolPending { id: message.id.clone(), @@ -67,6 +75,8 @@ pub(crate) fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec Vec::new(), @@ -93,6 +103,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve role: message.role.clone(), attachments, subagent_task_id: message.metadata.get("subagent_task_id").cloned(), + topic_id: message.metadata.get("topic_id").cloned(), timestamp: Some(crate::protocol::now_timestamp()), }] } @@ -110,6 +121,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve content: message.content.clone(), role: message.role.clone(), subagent_task_id: message.metadata.get("subagent_task_id").cloned(), + topic_id: message.metadata.get("topic_id").cloned(), timestamp: Some(crate::protocol::now_timestamp()), }], OutboundEventKind::ToolResult => vec![WsOutbound::ToolResult { @@ -122,6 +134,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve content: message.content.clone(), role: message.role.clone(), subagent_task_id: message.metadata.get("subagent_task_id").cloned(), + topic_id: message.metadata.get("topic_id").cloned(), duration_ms: message .metadata .get("tool_duration_ms") @@ -139,6 +152,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve role: message.role.clone(), resume_hint: TOOL_PENDING_RESUME_HINT.to_string(), subagent_task_id: message.metadata.get("subagent_task_id").cloned(), + topic_id: message.metadata.get("topic_id").cloned(), timestamp: Some(crate::protocol::now_timestamp()), }], OutboundEventKind::ErrorNotification => vec![WsOutbound::Error { diff --git a/web/src/hooks/useChat.ts b/web/src/hooks/useChat.ts index 4a190d6..2bbec72 100644 --- a/web/src/hooks/useChat.ts +++ b/web/src/hooks/useChat.ts @@ -392,6 +392,8 @@ export function useChat(): UseChatReturn { case 'assistant_response': { const msg = message as AssistantResponse + // 按 topic_id 隔离:如果消息属于其他话题则丢弃 + if (msg.topic_id && msg.topic_id !== selectedTopicRef.current) return const role = msg.role === 'user' || msg.role === 'tool' ? msg.role : 'assistant' setMessages((prev) => [ ...prev, @@ -416,6 +418,8 @@ export function useChat(): UseChatReturn { case 'tool_call': { const msg = message as ToolCall + // 按 topic_id 隔离:如果消息属于其他话题则丢弃 + if (msg.topic_id && msg.topic_id !== selectedTopicRef.current) return setMessages((prev) => [ ...prev, { @@ -435,6 +439,8 @@ export function useChat(): UseChatReturn { case 'tool_result': { const msg = message as ToolResult + // 按 topic_id 隔离:如果消息属于其他话题则丢弃 + if (msg.topic_id && msg.topic_id !== selectedTopicRef.current) return setMessages((prev) => [ ...prev, { @@ -454,6 +460,8 @@ export function useChat(): UseChatReturn { case 'tool_pending': { const msg = message as ToolPending + // 按 topic_id 隔离:如果消息属于其他话题则丢弃 + if (msg.topic_id && msg.topic_id !== selectedTopicRef.current) return setMessages((prev) => [ ...prev, { diff --git a/web/src/types/protocol.ts b/web/src/types/protocol.ts index 6e0ba2e..4154a3e 100644 --- a/web/src/types/protocol.ts +++ b/web/src/types/protocol.ts @@ -43,6 +43,7 @@ export interface AssistantResponse { role: string attachments?: Attachment[] subagent_task_id?: string + topic_id?: string timestamp?: number } @@ -55,6 +56,7 @@ export interface ToolCall { content: string role: string subagent_task_id?: string + topic_id?: string timestamp?: number } @@ -66,6 +68,7 @@ export interface ToolResult { content: string role: string subagent_task_id?: string + topic_id?: string duration_ms?: number timestamp?: number } @@ -79,6 +82,7 @@ export interface ToolPending { role: string resume_hint: string subagent_task_id?: string + topic_id?: string timestamp?: number }