From 4866ea9538b7ff63a1b39352481bbf0dc11971cf Mon Sep 17 00:00:00 2001 From: oudecheng <13802883547@139.com> Date: Fri, 12 Jun 2026 16:58:05 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20todo=5Fwrite=20=E6=8C=81=E4=B9=85?= =?UTF-8?q?=E5=8C=96=E4=BB=8E=20finalize=5Fresult=20=E7=A7=BB=E5=88=B0=20B?= =?UTF-8?q?usToolCallEmitter=EF=BC=8C=E5=8D=B3=E6=97=B6=E8=A7=A6=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 之前持久化挂在 agent 全部完成后,长任务永远等不到。 现在每个 todo_write 工具调用完成时立即持久化到 SQLite。 Co-Authored-By: Claude Opus 4.8 --- src/gateway/execution.rs | 6 ----- src/gateway/processor.rs | 1 + src/gateway/session.rs | 54 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 6 deletions(-) diff --git a/src/gateway/execution.rs b/src/gateway/execution.rs index c2ad6c4..4ab20be 100644 --- a/src/gateway/execution.rs +++ b/src/gateway/execution.rs @@ -190,12 +190,6 @@ impl AgentExecutionService { // 只有当是最新回合时才触发历史压缩 let should_schedule_compaction = is_current_turn; - // 拦截 todo_write 结果:持久化 + 前端推送(不受 is_current_turn 限制) - session.intercept_todo_write_results( - &request.result.emitted_messages, - request.chat_id, - ); - Ok(FinalizedAgentResult { outbound_messages, should_schedule_compaction, diff --git a/src/gateway/processor.rs b/src/gateway/processor.rs index 5335132..5194321 100644 --- a/src/gateway/processor.rs +++ b/src/gateway/processor.rs @@ -255,6 +255,7 @@ impl InboundProcessor { inbound.channel.clone(), inbound.chat_id.clone(), emitter_metadata, + self.session_manager.store(), ), self.session_manager.store(), &session_id, diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 133ab79..8da0ed1 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -55,6 +55,7 @@ pub struct BusToolCallEmitter { channel_name: String, chat_id: String, metadata: HashMap, + store: Arc, } impl BusToolCallEmitter { @@ -63,12 +64,14 @@ impl BusToolCallEmitter { channel_name: impl Into, chat_id: impl Into, metadata: HashMap, + store: Arc, ) -> Self { Self { bus, channel_name: channel_name.into(), chat_id: chat_id.into(), metadata, + store, } } } @@ -107,6 +110,55 @@ impl EmittedMessageHandler for BusToolCallEmitter { tracing::error!(error = %error, channel = %self.channel_name, chat_id = %self.chat_id, "Failed to publish live outbound tool call"); } } + + // 拦截 todo_write 结果:即时持久化到 SQLite + if message.tool_name.as_deref() == Some("todo_write") { + self.persist_todo_write_result(&message); + } + } +} + +impl BusToolCallEmitter { + /// 从 todo_write 工具结果中提取 todos 并持久化 + fn persist_todo_write_result(&self, message: &ChatMessage) { + let parsed: serde_json::Value = match serde_json::from_str(&message.content) { + Ok(v) => v, + Err(_) => return, + }; + + let Some(todos_array) = parsed.get("current_todos").and_then(|v| v.as_array()) else { + return; + }; + + let session_id = crate::storage::persistent_session_id(&self.channel_name, &self.chat_id); + let scope_key = &session_id; + + let records: Vec = todos_array + .iter() + .filter_map(|item| { + Some(crate::storage::TodoRecord { + id: item.get("id")?.as_str()?.to_string(), + scope_key: scope_key.clone(), + session_id: session_id.clone(), + topic_id: None, + content: item.get("content")?.as_str()?.to_string(), + status: item.get("status")?.as_str()?.to_string(), + priority: item.get("priority")?.as_str()?.to_string(), + created_at: item.get("created_at")?.as_i64()?, + updated_at: item.get("updated_at")?.as_i64()?, + }) + }) + .collect(); + + tracing::info!( + scope_key = %scope_key, + todo_count = records.len(), + "BusToolCallEmitter: persisting todo_write result" + ); + + if let Err(e) = self.store.replace_todos(scope_key, &records) { + tracing::warn!(error = %e, %scope_key, "Failed to persist todo list from BusToolCallEmitter"); + } } } @@ -1907,6 +1959,7 @@ mod tests { #[tokio::test] async fn test_bus_tool_call_emitter_emits_completed_tool_results() { + let store = Arc::new(SessionStore::in_memory().unwrap()); let bus = MessageBus::new(4); let emitter = BusToolCallEmitter::new( @@ -1914,6 +1967,7 @@ mod tests { "test-channel", "chat-1", HashMap::new(), + store, ); emitter