Compare commits

..

2 Commits

Author SHA1 Message Date
oudecheng
ec5ddf644a fix: 优先使用 topic_id 更新 todo 列表,确保与工具内存状态一致 2026-06-12 17:25:50 +08:00
oudecheng
4866ea9538 fix: todo_write 持久化从 finalize_result 移到 BusToolCallEmitter,即时触发
之前持久化挂在 agent 全部完成后,长任务永远等不到。
现在每个 todo_write 工具调用完成时立即持久化到 SQLite。

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-12 16:58:05 +08:00
4 changed files with 76 additions and 10 deletions

View File

@ -190,12 +190,6 @@ impl AgentExecutionService {
// 只有当是最新回合时才触发历史压缩 // 只有当是最新回合时才触发历史压缩
let should_schedule_compaction = is_current_turn; let should_schedule_compaction = is_current_turn;
// 拦截 todo_write 结果:持久化 + 前端推送(不受 is_current_turn 限制)
session.intercept_todo_write_results(
&request.result.emitted_messages,
request.chat_id,
);
Ok(FinalizedAgentResult { Ok(FinalizedAgentResult {
outbound_messages, outbound_messages,
should_schedule_compaction, should_schedule_compaction,

View File

@ -255,6 +255,7 @@ impl InboundProcessor {
inbound.channel.clone(), inbound.channel.clone(),
inbound.chat_id.clone(), inbound.chat_id.clone(),
emitter_metadata, emitter_metadata,
self.session_manager.store(),
), ),
self.session_manager.store(), self.session_manager.store(),
&session_id, &session_id,

View File

@ -55,6 +55,7 @@ pub struct BusToolCallEmitter {
channel_name: String, channel_name: String,
chat_id: String, chat_id: String,
metadata: HashMap<String, String>, metadata: HashMap<String, String>,
store: Arc<SessionStore>,
} }
impl BusToolCallEmitter { impl BusToolCallEmitter {
@ -63,12 +64,14 @@ impl BusToolCallEmitter {
channel_name: impl Into<String>, channel_name: impl Into<String>,
chat_id: impl Into<String>, chat_id: impl Into<String>,
metadata: HashMap<String, String>, metadata: HashMap<String, String>,
store: Arc<SessionStore>,
) -> Self { ) -> Self {
Self { Self {
bus, bus,
channel_name: channel_name.into(), channel_name: channel_name.into(),
chat_id: chat_id.into(), chat_id: chat_id.into(),
metadata, metadata,
store,
} }
} }
} }
@ -107,6 +110,63 @@ impl EmittedMessageHandler for BusToolCallEmitter {
tracing::error!(error = %error, channel = %self.channel_name, chat_id = %self.chat_id, "Failed to publish live outbound tool call"); tracing::error!(error = %error, channel = %self.channel_name, chat_id = %self.chat_id, "Failed to publish live outbound tool call");
} }
} }
// 拦截 todo_write 结果:即时持久化到 SQLite
if message.tool_name.as_deref() == Some("todo_write") {
self.persist_todo_write_result(&message);
}
}
}
impl BusToolCallEmitter {
/// 从 todo_write 工具结果中提取 todos 并持久化
fn persist_todo_write_result(&self, message: &ChatMessage) {
let parsed: serde_json::Value = match serde_json::from_str(&message.content) {
Ok(v) => v,
Err(_) => return,
};
let Some(todos_array) = parsed.get("current_todos").and_then(|v| v.as_array()) else {
return;
};
let session_id = crate::storage::persistent_session_id(&self.channel_name, &self.chat_id);
// 优先用 topic_id与 list_todos handler 和 tool 内存状态保持一致)
let scope_key = self
.metadata
.get("topic_id")
.filter(|t| !t.is_empty())
.cloned()
.unwrap_or_else(|| session_id.clone());
let topic_id = self.metadata.get("topic_id").filter(|t| !t.is_empty()).cloned();
let records: Vec<crate::storage::TodoRecord> = todos_array
.iter()
.filter_map(|item| {
Some(crate::storage::TodoRecord {
id: item.get("id")?.as_str()?.to_string(),
scope_key: scope_key.clone(),
session_id: session_id.clone(),
topic_id: topic_id.clone(),
content: item.get("content")?.as_str()?.to_string(),
status: item.get("status")?.as_str()?.to_string(),
priority: item.get("priority")?.as_str()?.to_string(),
created_at: item.get("created_at")?.as_i64()?,
updated_at: item.get("updated_at")?.as_i64()?,
})
})
.collect();
tracing::info!(
scope_key = %scope_key,
todo_count = records.len(),
"BusToolCallEmitter: persisting todo_write result"
);
if let Err(e) = self.store.replace_todos(&scope_key, &records) {
tracing::warn!(error = %e, %scope_key, "Failed to persist todo list from BusToolCallEmitter");
}
} }
} }
@ -1907,6 +1967,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_bus_tool_call_emitter_emits_completed_tool_results() { async fn test_bus_tool_call_emitter_emits_completed_tool_results() {
let store = Arc::new(SessionStore::in_memory().unwrap());
let bus = MessageBus::new(4); let bus = MessageBus::new(4);
let emitter = let emitter =
BusToolCallEmitter::new( BusToolCallEmitter::new(
@ -1914,6 +1975,7 @@ mod tests {
"test-channel", "test-channel",
"chat-1", "chat-1",
HashMap::new(), HashMap::new(),
store,
); );
emitter emitter

View File

@ -332,11 +332,20 @@ function App() {
const skillCmd = requestSkillList() const skillCmd = requestSkillList()
handleCommand(skillCmd) handleCommand(skillCmd)
sendMessage({ type: 'command', payload: JSON.stringify(skillCmd) }) sendMessage({ type: 'command', payload: JSON.stringify(skillCmd) })
const todoCmd = requestTodoList()
handleCommand(todoCmd)
sendMessage({ type: 'command', payload: JSON.stringify(todoCmd) })
} }
}, [status, handleCommand, sendMessage, requestMemoryList, requestSkillList, requestTodoList]) }, [status, handleCommand, sendMessage, requestMemoryList, requestSkillList])
// 连接就绪、切换 topic、或进出子代理视图时刷新 todo 列表
const prevTodoTriggerRef = useRef<string>('')
useEffect(() => {
if (status !== 'connected') return
const key = `${selectedTopic ?? ''}|${subAgentView?.taskId ?? ''}`
if (key === prevTodoTriggerRef.current) return
prevTodoTriggerRef.current = key
const todoCmd = requestTodoList()
handleCommand(todoCmd)
sendMessage({ type: 'command', payload: JSON.stringify(todoCmd) })
}, [status, selectedTopic, subAgentView, handleCommand, sendMessage, requestTodoList])
const handleRefreshMemories = useCallback(() => { const handleRefreshMemories = useCallback(() => {
const cmd = requestMemoryList() const cmd = requestMemoryList()