feat: 添加 topic_id 支持,确保消息按话题隔离

This commit is contained in:
ooodc 2026-06-07 17:49:22 +08:00
parent 3a623cc8a3
commit eeea7d44d0
7 changed files with 56 additions and 8 deletions

View File

@ -78,7 +78,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(), id: response.request_id.to_string(),
content: msg.content.clone(), content: msg.content.clone(),
role: "assistant".to_string(), role: "assistant".to_string(),
attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()),
}, },
MessageKind::Notification => { MessageKind::Notification => {
// 根据元数据判断具体类型 // 根据元数据判断具体类型
@ -98,7 +98,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(), id: response.request_id.to_string(),
content: msg.content.clone(), content: msg.content.clone(),
role: "assistant".to_string(), role: "assistant".to_string(),
attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()),
}, },
} }
} else if let Some(session_id) = response.metadata.get("session_id") { } else if let Some(session_id) = response.metadata.get("session_id") {
@ -137,7 +137,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(), id: response.request_id.to_string(),
content: msg.content.clone(), content: msg.content.clone(),
role: "assistant".to_string(), role: "assistant".to_string(),
attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()),
}, },
} }
} else if let Some(sessions_json) = response.metadata.get("sessions") { } else if let Some(sessions_json) = response.metadata.get("sessions") {
@ -155,7 +155,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(), id: response.request_id.to_string(),
content: msg.content.clone(), content: msg.content.clone(),
role: "assistant".to_string(), role: "assistant".to_string(),
attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()),
}, },
} }
} else if let Some(topics_json) = response.metadata.get("topics") { } else if let Some(topics_json) = response.metadata.get("topics") {
@ -174,7 +174,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(), id: response.request_id.to_string(),
content: msg.content.clone(), content: msg.content.clone(),
role: "assistant".to_string(), role: "assistant".to_string(),
attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()),
}, },
} }
} else { } else {
@ -183,7 +183,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(), id: response.request_id.to_string(),
content: msg.content.clone(), content: msg.content.clone(),
role: "assistant".to_string(), role: "assistant".to_string(),
attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()),
} }
} }
} }
@ -196,7 +196,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(), id: response.request_id.to_string(),
content: msg.content.clone(), content: msg.content.clone(),
role: "assistant".to_string(), role: "assistant".to_string(),
attachments: Vec::new(), subagent_task_id: None, timestamp: Some(crate::protocol::now_timestamp()), attachments: Vec::new(), subagent_task_id: None, topic_id: None, timestamp: Some(crate::protocol::now_timestamp()),
}, },
}; };
outbounds.push(outbound); outbounds.push(outbound);

View File

@ -244,12 +244,17 @@ impl InboundProcessor {
} }
// 普通消息进入 AgentLoop // 普通消息进入 AgentLoop
// 构建 emitter metadata包含 forwarded_metadata 和 topic_id用于前端消息隔离
let mut emitter_metadata = inbound.forwarded_metadata.clone();
if let Some(ref topic_id) = current_topic {
emitter_metadata.insert("topic_id".to_string(), topic_id.clone());
}
let live_emitter = Arc::new(PersistingEmittedMessageHandler::new( let live_emitter = Arc::new(PersistingEmittedMessageHandler::new(
BusToolCallEmitter::new( BusToolCallEmitter::new(
self.bus.clone(), self.bus.clone(),
inbound.channel.clone(), inbound.channel.clone(),
inbound.chat_id.clone(), inbound.chat_id.clone(),
inbound.forwarded_metadata.clone(), emitter_metadata,
), ),
self.session_manager.store(), self.session_manager.store(),
&session_id, &session_id,
@ -283,6 +288,10 @@ impl InboundProcessor {
Ok(outbound_messages) => { Ok(outbound_messages) => {
for mut outbound in outbound_messages { for mut outbound in outbound_messages {
outbound.metadata.extend(inbound.forwarded_metadata.clone()); outbound.metadata.extend(inbound.forwarded_metadata.clone());
// 注入 topic_id 到 outbound metadata用于前端按话题隔离消息
if let Some(ref topic_id) = current_topic {
outbound.metadata.insert("topic_id".to_string(), topic_id.clone());
}
if let Err(error) = self.bus.publish_outbound(outbound).await { if let Err(error) = self.bus.publish_outbound(outbound).await {
tracing::error!(error = %error, "Failed to publish outbound"); tracing::error!(error = %error, "Failed to publish outbound");
} }

View File

@ -717,6 +717,7 @@ fn chat_message_to_ws_outbound(msg: &crate::bus::ChatMessage) -> Option<WsOutbou
content: format!("{}\nargs: {}", tool_call.name, tool_call.arguments), content: format!("{}\nargs: {}", tool_call.name, tool_call.arguments),
role: msg.role.clone(), role: msg.role.clone(),
subagent_task_id: None, subagent_task_id: None,
topic_id: None,
timestamp: Some(crate::protocol::now_timestamp()), timestamp: Some(crate::protocol::now_timestamp()),
}); });
} }
@ -728,6 +729,7 @@ fn chat_message_to_ws_outbound(msg: &crate::bus::ChatMessage) -> Option<WsOutbou
role: msg.role.clone(), role: msg.role.clone(),
attachments: Vec::new(), attachments: Vec::new(),
subagent_task_id: None, subagent_task_id: None,
topic_id: None,
timestamp: Some(crate::protocol::now_timestamp()), timestamp: Some(crate::protocol::now_timestamp()),
}) })
} }
@ -741,6 +743,7 @@ fn chat_message_to_ws_outbound(msg: &crate::bus::ChatMessage) -> Option<WsOutbou
content: msg.content.clone(), content: msg.content.clone(),
role: msg.role.clone(), role: msg.role.clone(),
subagent_task_id: None, subagent_task_id: None,
topic_id: None,
duration_ms: msg.tool_duration_ms, duration_ms: msg.tool_duration_ms,
timestamp: Some(crate::protocol::now_timestamp()), timestamp: Some(crate::protocol::now_timestamp()),
}), }),
@ -752,6 +755,7 @@ fn chat_message_to_ws_outbound(msg: &crate::bus::ChatMessage) -> Option<WsOutbou
role: msg.role.clone(), role: msg.role.clone(),
resume_hint: "完成外部操作后,直接发一条继续消息即可。".to_string(), resume_hint: "完成外部操作后,直接发一条继续消息即可。".to_string(),
subagent_task_id: None, subagent_task_id: None,
topic_id: None,
timestamp: Some(crate::protocol::now_timestamp()), timestamp: Some(crate::protocol::now_timestamp()),
}), }),
} }
@ -762,6 +766,7 @@ fn chat_message_to_ws_outbound(msg: &crate::bus::ChatMessage) -> Option<WsOutbou
role: msg.role.clone(), role: msg.role.clone(),
attachments, attachments,
subagent_task_id: None, subagent_task_id: None,
topic_id: None,
timestamp: Some(crate::protocol::now_timestamp()), timestamp: Some(crate::protocol::now_timestamp()),
}), }),
_ => None, _ => None,

View File

@ -117,6 +117,8 @@ pub enum WsOutbound {
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
subagent_task_id: Option<String>, subagent_task_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
topic_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
timestamp: Option<i64>, timestamp: Option<i64>,
}, },
#[serde(rename = "tool_call")] #[serde(rename = "tool_call")]
@ -130,6 +132,8 @@ pub enum WsOutbound {
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
subagent_task_id: Option<String>, subagent_task_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
topic_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
timestamp: Option<i64>, timestamp: Option<i64>,
}, },
#[serde(rename = "tool_result")] #[serde(rename = "tool_result")]
@ -142,6 +146,8 @@ pub enum WsOutbound {
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
subagent_task_id: Option<String>, subagent_task_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
topic_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
duration_ms: Option<u64>, duration_ms: Option<u64>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
timestamp: Option<i64>, timestamp: Option<i64>,
@ -157,6 +163,8 @@ pub enum WsOutbound {
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
subagent_task_id: Option<String>, subagent_task_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")] #[serde(default, skip_serializing_if = "Option::is_none")]
topic_id: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
timestamp: Option<i64>, timestamp: Option<i64>,
}, },
#[serde(rename = "error")] #[serde(rename = "error")]

View File

@ -22,6 +22,8 @@ pub(crate) fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec<WsOutb
role: message.role.clone(), role: message.role.clone(),
attachments: Vec::new(), attachments: Vec::new(),
subagent_task_id: None, subagent_task_id: None,
topic_id: None,
timestamp: None,
}); });
} }
@ -33,6 +35,8 @@ pub(crate) fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec<WsOutb
content: format_tool_call_content(&tool_call.name, &tool_call.arguments), content: format_tool_call_content(&tool_call.name, &tool_call.arguments),
role: message.role.clone(), role: message.role.clone(),
subagent_task_id: None, subagent_task_id: None,
topic_id: None,
timestamp: None,
})); }));
outbound outbound
} else { } else {
@ -42,6 +46,8 @@ pub(crate) fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec<WsOutb
role: message.role.clone(), role: message.role.clone(),
attachments: Vec::new(), attachments: Vec::new(),
subagent_task_id: None, subagent_task_id: None,
topic_id: None,
timestamp: None,
}] }]
} }
} }
@ -57,7 +63,9 @@ pub(crate) fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec<WsOutb
content: message.content.clone(), content: message.content.clone(),
role: message.role.clone(), role: message.role.clone(),
subagent_task_id: None, subagent_task_id: None,
topic_id: None,
duration_ms: None, duration_ms: None,
timestamp: None,
}], }],
ToolMessageState::PendingUserAction => vec![WsOutbound::ToolPending { ToolMessageState::PendingUserAction => vec![WsOutbound::ToolPending {
id: message.id.clone(), id: message.id.clone(),
@ -67,6 +75,8 @@ pub(crate) fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec<WsOutb
role: message.role.clone(), role: message.role.clone(),
resume_hint: TOOL_PENDING_RESUME_HINT.to_string(), resume_hint: TOOL_PENDING_RESUME_HINT.to_string(),
subagent_task_id: None, subagent_task_id: None,
topic_id: None,
timestamp: None,
}], }],
}, },
_ => Vec::new(), _ => Vec::new(),
@ -93,6 +103,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve
role: message.role.clone(), role: message.role.clone(),
attachments, attachments,
subagent_task_id: message.metadata.get("subagent_task_id").cloned(), subagent_task_id: message.metadata.get("subagent_task_id").cloned(),
topic_id: message.metadata.get("topic_id").cloned(),
timestamp: Some(crate::protocol::now_timestamp()), timestamp: Some(crate::protocol::now_timestamp()),
}] }]
} }
@ -110,6 +121,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve
content: message.content.clone(), content: message.content.clone(),
role: message.role.clone(), role: message.role.clone(),
subagent_task_id: message.metadata.get("subagent_task_id").cloned(), subagent_task_id: message.metadata.get("subagent_task_id").cloned(),
topic_id: message.metadata.get("topic_id").cloned(),
timestamp: Some(crate::protocol::now_timestamp()), timestamp: Some(crate::protocol::now_timestamp()),
}], }],
OutboundEventKind::ToolResult => vec![WsOutbound::ToolResult { OutboundEventKind::ToolResult => vec![WsOutbound::ToolResult {
@ -122,6 +134,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve
content: message.content.clone(), content: message.content.clone(),
role: message.role.clone(), role: message.role.clone(),
subagent_task_id: message.metadata.get("subagent_task_id").cloned(), subagent_task_id: message.metadata.get("subagent_task_id").cloned(),
topic_id: message.metadata.get("topic_id").cloned(),
duration_ms: message duration_ms: message
.metadata .metadata
.get("tool_duration_ms") .get("tool_duration_ms")
@ -139,6 +152,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve
role: message.role.clone(), role: message.role.clone(),
resume_hint: TOOL_PENDING_RESUME_HINT.to_string(), resume_hint: TOOL_PENDING_RESUME_HINT.to_string(),
subagent_task_id: message.metadata.get("subagent_task_id").cloned(), subagent_task_id: message.metadata.get("subagent_task_id").cloned(),
topic_id: message.metadata.get("topic_id").cloned(),
timestamp: Some(crate::protocol::now_timestamp()), timestamp: Some(crate::protocol::now_timestamp()),
}], }],
OutboundEventKind::ErrorNotification => vec![WsOutbound::Error { OutboundEventKind::ErrorNotification => vec![WsOutbound::Error {

View File

@ -392,6 +392,8 @@ export function useChat(): UseChatReturn {
case 'assistant_response': { case 'assistant_response': {
const msg = message as AssistantResponse const msg = message as AssistantResponse
// 按 topic_id 隔离:如果消息属于其他话题则丢弃
if (msg.topic_id && msg.topic_id !== selectedTopicRef.current) return
const role = msg.role === 'user' || msg.role === 'tool' ? msg.role : 'assistant' const role = msg.role === 'user' || msg.role === 'tool' ? msg.role : 'assistant'
setMessages((prev) => [ setMessages((prev) => [
...prev, ...prev,
@ -416,6 +418,8 @@ export function useChat(): UseChatReturn {
case 'tool_call': { case 'tool_call': {
const msg = message as ToolCall const msg = message as ToolCall
// 按 topic_id 隔离:如果消息属于其他话题则丢弃
if (msg.topic_id && msg.topic_id !== selectedTopicRef.current) return
setMessages((prev) => [ setMessages((prev) => [
...prev, ...prev,
{ {
@ -435,6 +439,8 @@ export function useChat(): UseChatReturn {
case 'tool_result': { case 'tool_result': {
const msg = message as ToolResult const msg = message as ToolResult
// 按 topic_id 隔离:如果消息属于其他话题则丢弃
if (msg.topic_id && msg.topic_id !== selectedTopicRef.current) return
setMessages((prev) => [ setMessages((prev) => [
...prev, ...prev,
{ {
@ -454,6 +460,8 @@ export function useChat(): UseChatReturn {
case 'tool_pending': { case 'tool_pending': {
const msg = message as ToolPending const msg = message as ToolPending
// 按 topic_id 隔离:如果消息属于其他话题则丢弃
if (msg.topic_id && msg.topic_id !== selectedTopicRef.current) return
setMessages((prev) => [ setMessages((prev) => [
...prev, ...prev,
{ {

View File

@ -43,6 +43,7 @@ export interface AssistantResponse {
role: string role: string
attachments?: Attachment[] attachments?: Attachment[]
subagent_task_id?: string subagent_task_id?: string
topic_id?: string
timestamp?: number timestamp?: number
} }
@ -55,6 +56,7 @@ export interface ToolCall {
content: string content: string
role: string role: string
subagent_task_id?: string subagent_task_id?: string
topic_id?: string
timestamp?: number timestamp?: number
} }
@ -66,6 +68,7 @@ export interface ToolResult {
content: string content: string
role: string role: string
subagent_task_id?: string subagent_task_id?: string
topic_id?: string
duration_ms?: number duration_ms?: number
timestamp?: number timestamp?: number
} }
@ -79,6 +82,7 @@ export interface ToolPending {
role: string role: string
resume_hint: string resume_hint: string
subagent_task_id?: string subagent_task_id?: string
topic_id?: string
timestamp?: number timestamp?: number
} }