refactor: 消息持久化从批量改为实时逐条,通过装饰器模式实现

- 新增 PersistingEmittedMessageHandler 装饰器,在 emitter 广播前逐条落库
- processor 和 task/runtime 使用装饰器包裹 emitter,替代 post-loop 批量写入
- 移除 session_history 中的批量 DB 写入,仅保留内存历史更新
- execution 中跳过已由 live emitter 实时广播的工具消息,避免重复
- 前端支持运行中 task 工具卡片"查看实时进度"跳转子智能体视图

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
oudecheng 2026-05-29 16:47:57 +08:00
parent 3b0b4c1f2e
commit d0051baa07
13 changed files with 128 additions and 56 deletions

View File

@ -2,6 +2,7 @@ use crate::agent::AgentRuntimeConfig;
use crate::agent::{SystemPromptContext, SystemPromptProvider};
use crate::bus::ChatMessage;
use crate::bus::message::ToolMessageState;
use crate::storage::ConversationRepository;
use crate::domain::messages::{ContentBlock, ToolCall};
use crate::observability::{
Observer, ObserverEvent, ToolExecutionOutcome, ToolExecutionState, truncate_args,
@ -657,6 +658,36 @@ pub trait EmittedMessageHandler: Send + Sync + 'static {
async fn handle(&self, message: ChatMessage);
}
/// 装饰器:在内部 emitter 广播前,先将消息持久化到 DB
pub struct PersistingEmittedMessageHandler<H: EmittedMessageHandler> {
inner: H,
conversation_repository: Arc<dyn ConversationRepository>,
session_id: String,
}
impl<H: EmittedMessageHandler> PersistingEmittedMessageHandler<H> {
pub fn new(
inner: H,
conversation_repository: Arc<dyn ConversationRepository>,
session_id: impl Into<String>,
) -> Self {
Self { inner, conversation_repository, session_id: session_id.into() }
}
}
#[async_trait]
impl<H: EmittedMessageHandler> EmittedMessageHandler for PersistingEmittedMessageHandler<H> {
async fn handle(&self, message: ChatMessage) {
if let Err(e) = self.conversation_repository
.append_message(&self.session_id, &message)
{
tracing::error!(error = %e, session_id = %self.session_id,
"Failed to persist emitted message");
}
self.inner.handle(message).await;
}
}
pub trait SkillProvider: Send + Sync + 'static {
fn system_index_prompt(&self) -> Option<String>;

View File

@ -4,7 +4,8 @@ pub mod runtime_config;
pub mod system_prompt;
pub use agent_loop::{
AgentError, AgentLoop, AgentProcessResult, EmittedMessageHandler, SkillProvider,
AgentError, AgentLoop, AgentProcessResult, EmittedMessageHandler,
PersistingEmittedMessageHandler, SkillProvider,
};
pub use context_compressor::ContextCompressor;
pub use runtime_config::AgentRuntimeConfig;

View File

@ -77,7 +77,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(),
content: msg.content.clone(),
role: "assistant".to_string(),
attachments: Vec::new(),
attachments: Vec::new(), subagent_task_id: None,
},
MessageKind::Notification => {
// 根据元数据判断具体类型
@ -97,7 +97,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(),
content: msg.content.clone(),
role: "assistant".to_string(),
attachments: Vec::new(),
attachments: Vec::new(), subagent_task_id: None,
},
}
} else if let Some(session_id) = response.metadata.get("session_id") {
@ -136,7 +136,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(),
content: msg.content.clone(),
role: "assistant".to_string(),
attachments: Vec::new(),
attachments: Vec::new(), subagent_task_id: None,
},
}
} else if let Some(sessions_json) = response.metadata.get("sessions") {
@ -154,7 +154,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(),
content: msg.content.clone(),
role: "assistant".to_string(),
attachments: Vec::new(),
attachments: Vec::new(), subagent_task_id: None,
},
}
} else if let Some(topics_json) = response.metadata.get("topics") {
@ -173,7 +173,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(),
content: msg.content.clone(),
role: "assistant".to_string(),
attachments: Vec::new(),
attachments: Vec::new(), subagent_task_id: None,
},
}
} else {
@ -182,7 +182,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(),
content: msg.content.clone(),
role: "assistant".to_string(),
attachments: Vec::new(),
attachments: Vec::new(), subagent_task_id: None,
}
}
}
@ -194,7 +194,7 @@ impl OutputAdapter for WebSocketOutputAdapter {
id: response.request_id.to_string(),
content: msg.content.clone(),
role: "assistant".to_string(),
attachments: Vec::new(),
attachments: Vec::new(), subagent_task_id: None,
},
};
outbounds.push(outbound);

View File

@ -157,7 +157,11 @@ impl AgentExecutionService {
.emitted_messages
.iter()
.filter(|message| {
(!message.is_assistant_tool_call_message() || !request.suppress_live_tool_calls)
// 当存在 live_emitter 时,工具调用和工具结果已在 loop 中实时广播
// 只保留最终 assistant 文本通过 post-loop 路径发送
let already_emitted = request.suppress_live_tool_calls
&& (message.is_assistant_tool_call_message() || message.role == "tool");
!already_emitted
&& should_display_message_to_user(self.show_tool_results, message)
})
.flat_map(|message| {

View File

@ -2,7 +2,7 @@ use std::sync::Arc;
use tokio::sync::Semaphore;
use crate::agent::{AgentError, CompositeSystemPromptProvider};
use crate::agent::{AgentError, CompositeSystemPromptProvider, PersistingEmittedMessageHandler};
use crate::bus::{InboundMessage, MessageBus, OutboundMessage};
use crate::command::adapter::InputAdapter;
use crate::command::adapters::channel::ChannelInputAdapter;
@ -218,11 +218,15 @@ impl InboundProcessor {
}
// 普通消息进入 AgentLoop
let live_emitter = Arc::new(BusToolCallEmitter::new(
let live_emitter = Arc::new(PersistingEmittedMessageHandler::new(
BusToolCallEmitter::new(
self.bus.clone(),
inbound.channel.clone(),
inbound.chat_id.clone(),
inbound.forwarded_metadata.clone(),
),
self.session_manager.store(),
&session_id,
));
match self

View File

@ -158,39 +158,23 @@ impl SessionHistory {
return Ok(());
}
let session_id = self.persistent_session_id(chat_id);
let topic_id = self.chat_topic_ids.get(chat_id).map(|s| s.as_str());
self.conversations
.append_messages_batch(&session_id, topic_id, &messages)
.map_err(|err| {
AgentError::Other(format!("batch append messages error: {}", err))
})?;
for message in messages {
self.add_message(chat_id, message);
}
Ok(())
}
/// 将消息保存到指定话题(直接写入数据库,不更新内存历史)
/// 用于异步执行结果保存到原始话题的场景
/// 将消息保存到指定话题
/// 每条消息已通过 PersistingEmittedMessageHandler 逐条持久化,此处仅保留接口兼容
pub(crate) fn append_to_topic(
&self,
chat_id: &str,
topic_id: &str,
_chat_id: &str,
_topic_id: &str,
messages: &[ChatMessage],
) -> Result<(), AgentError> {
if messages.is_empty() {
return Ok(());
}
let session_id = self.persistent_session_id(chat_id);
self.conversations
.append_messages_batch(&session_id, Some(topic_id), messages)
.map_err(|err| {
AgentError::Other(format!("batch append to topic error: {}", err))
})?;
Ok(())
}

View File

@ -512,6 +512,7 @@ fn chat_message_to_ws_outbound(msg: &crate::bus::ChatMessage) -> Option<WsOutbou
content: msg.content.clone(),
role: msg.role.clone(),
attachments: Vec::new(),
subagent_task_id: None,
})
}
"tool" => {
@ -532,6 +533,7 @@ fn chat_message_to_ws_outbound(msg: &crate::bus::ChatMessage) -> Option<WsOutbou
content: msg.content.clone(),
role: msg.role.clone(),
resume_hint: "完成外部操作后,直接发一条继续消息即可。".to_string(),
subagent_task_id: None,
}),
}
}
@ -540,6 +542,7 @@ fn chat_message_to_ws_outbound(msg: &crate::bus::ChatMessage) -> Option<WsOutbou
content: msg.content.clone(),
role: msg.role.clone(),
attachments: Vec::new(),
subagent_task_id: None,
}),
_ => None,
}

View File

@ -79,6 +79,8 @@ pub enum WsOutbound {
role: String,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
attachments: Vec<MediaSummary>,
#[serde(default, skip_serializing_if = "Option::is_none")]
subagent_task_id: Option<String>,
},
#[serde(rename = "tool_call")]
ToolCall {
@ -109,6 +111,8 @@ pub enum WsOutbound {
content: String,
role: String,
resume_hint: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
subagent_task_id: Option<String>,
},
#[serde(rename = "error")]
Error { code: String, message: String },

View File

@ -21,6 +21,7 @@ pub(crate) fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec<WsOutb
content: message.content.clone(),
role: message.role.clone(),
attachments: Vec::new(),
subagent_task_id: None,
});
}
@ -40,6 +41,7 @@ pub(crate) fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec<WsOutb
content: message.content.clone(),
role: message.role.clone(),
attachments: Vec::new(),
subagent_task_id: None,
}]
}
}
@ -63,6 +65,7 @@ pub(crate) fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec<WsOutb
content: message.content.clone(),
role: message.role.clone(),
resume_hint: TOOL_PENDING_RESUME_HINT.to_string(),
subagent_task_id: None,
}],
},
_ => Vec::new(),
@ -88,6 +91,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve
content: message.content.clone(),
role: message.role.clone(),
attachments,
subagent_task_id: message.metadata.get("subagent_task_id").cloned(),
}]
}
OutboundEventKind::ToolCall => vec![WsOutbound::ToolCall {
@ -126,6 +130,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve
content: message.content.clone(),
role: message.role.clone(),
resume_hint: TOOL_PENDING_RESUME_HINT.to_string(),
subagent_task_id: message.metadata.get("subagent_task_id").cloned(),
}],
OutboundEventKind::ErrorNotification => vec![WsOutbound::Error {
code: "AGENT_ERROR".to_string(),

View File

@ -7,7 +7,7 @@ use std::time::Duration;
use async_trait::async_trait;
use serde::Deserialize;
use crate::agent::{AgentLoop, AgentRuntimeConfig, EmittedMessageHandler, SystemPrompt, SystemPromptContext, SystemPromptProvider};
use crate::agent::{AgentLoop, AgentRuntimeConfig, EmittedMessageHandler, PersistingEmittedMessageHandler, SystemPrompt, SystemPromptContext, SystemPromptProvider};
use crate::bus::ChatMessage;
use crate::bus::message::OutboundMessage;
use crate::bus::MessageBus;
@ -227,12 +227,16 @@ impl DefaultSubAgentRuntime {
metadata.insert("subagent_task_id".to_string(), session.id.clone());
metadata.insert("is_subagent_event".to_string(), "true".to_string());
let emitter = Arc::new(SubAgentEmitter {
let emitter = Arc::new(PersistingEmittedMessageHandler::new(
SubAgentEmitter {
bus: bus.clone(),
channel_name: session.parent_channel_name.clone(),
chat_id: session.parent_chat_id.clone(),
metadata,
});
},
self.conversation_repository.clone(),
session.session_id.clone(),
));
return agent.with_emitted_message_handler(emitter);
}
@ -274,11 +278,6 @@ impl DefaultSubAgentRuntime {
match result {
Ok(Ok(process_result)) => {
// 保存子智能体产生的所有消息到数据库(批量单事务)
self.conversation_repository
.append_messages_batch(&session.session_id, None, &process_result.emitted_messages)
.map_err(TaskError::RepositoryError)?;
let final_message = process_result.final_response;
Ok(TaskToolResult {
status: "success".to_string(),
@ -324,11 +323,6 @@ impl DefaultSubAgentRuntime {
match result {
Ok(Ok(process_result)) => {
// 保存子智能体产生的所有消息到数据库(批量单事务)
self.conversation_repository
.append_messages_batch(&session.session_id, None, &process_result.emitted_messages)
.map_err(TaskError::RepositoryError)?;
let final_message = process_result.final_response;
Ok(TaskToolResult {
status: "success".to_string(),

View File

@ -300,6 +300,19 @@ export function MessageBubble({ message, onNavigateToSubAgent }: MessageBubblePr
<span></span>
</div>
</>
) : isTaskTool && message.subagentTaskId ? (
<div className="px-3 pb-1">
<button
onClick={(e) => {
e.stopPropagation()
onNavigateToSubAgent?.(message.subagentTaskId!, taskDescription || '子智能体任务')
}}
className="text-xs text-[#00f0ff] hover:text-[#00f0ff]/80 hover:underline transition-colors flex items-center gap-1"
>
<span></span>
<span></span>
</button>
</div>
) : hasResult ? (
<div className="px-3 pb-2 text-xs text-[#00f0ff]/50 flex items-center gap-1 select-none">
<span></span>
@ -378,6 +391,18 @@ export function MessageBubble({ message, onNavigateToSubAgent }: MessageBubblePr
{!hasArgs && !hasResult && (
<div className="text-xs text-zinc-500">...</div>
)}
{isTaskTool && message.subagentTaskId && (
<button
onClick={(e) => {
e.stopPropagation()
onNavigateToSubAgent?.(message.subagentTaskId!, taskDescription || '子智能体任务')
}}
className="text-xs text-[#00f0ff] hover:text-[#00f0ff]/80 hover:underline transition-colors flex items-center gap-1"
>
<span></span>
<span></span>
</button>
)}
</>
)}
</div>

View File

@ -98,8 +98,9 @@ export function useChat(): UseChatReturn {
// Extract subagent_task_id from a message if present
const getSubagentTaskId = (message: WsOutbound): string | undefined => {
if (message.type === 'tool_call' || message.type === 'tool_result') {
return (message as ToolCall | ToolResult).subagent_task_id
if (message.type === 'tool_call' || message.type === 'tool_result'
|| message.type === 'tool_pending' || message.type === 'assistant_response') {
return (message as ToolCall | ToolResult | ToolPending | AssistantResponse).subagent_task_id
}
return undefined
}
@ -117,6 +118,7 @@ export function useChat(): UseChatReturn {
timestamp: Date.now(),
type: 'message',
attachments: msg.attachments,
subagentTaskId: msg.subagent_task_id,
}
}
case 'tool_call': {
@ -156,6 +158,7 @@ export function useChat(): UseChatReturn {
type: 'tool_pending',
toolName: msg.tool_name,
toolCallId: msg.tool_call_id,
subagentTaskId: msg.subagent_task_id,
}
}
case 'error': {
@ -215,8 +218,20 @@ export function useChat(): UseChatReturn {
return
}
// In main view, skip sub-agent messages (they belong to sub-agent view)
if (getSubagentTaskId(message)) {
// In main view, skip sub-agent messages (they belong to sub-agent view).
// But use the task_id to associate with the running task tool card.
const msgSubagentTaskId = getSubagentTaskId(message)
if (msgSubagentTaskId) {
setMessages((prev) => {
for (let i = prev.length - 1; i >= 0; i--) {
if (prev[i].type === 'tool_call' && prev[i].toolName === 'task' && !prev[i].subagentTaskId) {
const updated = [...prev]
updated[i] = { ...updated[i], subagentTaskId: msgSubagentTaskId }
return updated
}
}
return prev
})
return
}

View File

@ -41,6 +41,7 @@ export interface AssistantResponse {
content: string
role: string
attachments?: Attachment[]
subagent_task_id?: string
}
export interface ToolCall {
@ -72,6 +73,7 @@ export interface ToolPending {
content: string
role: string
resume_hint: string
subagent_task_id?: string
}
export interface WsError {