From 4cb26b5b675867763bb6d10e4f6f72afdc4ea8ed Mon Sep 17 00:00:00 2001 From: oudecheng <13802883547@139.com> Date: Fri, 29 May 2026 11:15:38 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=AD=90=E6=99=BA=E8=83=BD=E4=BD=93?= =?UTF-8?q?=E4=BB=BB=E5=8A=A1=E6=B6=88=E6=81=AF=E6=9F=A5=E7=9C=8B=EF=BC=8C?= =?UTF-8?q?=E5=AE=9E=E6=97=B6=E5=B9=BF=E6=92=AD=E5=B7=A5=E5=85=B7=E8=B0=83?= =?UTF-8?q?=E7=94=A8=E4=BA=8B=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 LoadTaskMessages 命令,加载子智能体任务的历史消息 - SubAgentEmitter 通过 MessageBus 实时广播子智能体工具调用 - 前端新增子智能体视图,支持导航进入/退出子智能体会话 - 外部渠道过滤子智能体事件,避免推送到飞书/微信 - ToolCall/ToolResult 新增 subagent_task_id 字段 Co-Authored-By: Claude Opus 4.7 --- src/channels/feishu.rs | 4 +- src/channels/wechat.rs | 4 +- src/command/handlers/load_task_messages.rs | 171 ++++++++++++++++++ src/command/handlers/mod.rs | 1 + src/command/mod.rs | 3 + src/gateway/mod.rs | 1 + src/gateway/runtime.rs | 5 + src/gateway/session.rs | 1 + src/gateway/ws.rs | 50 ++++++ src/protocol/mod.rs | 13 ++ src/protocol/ws_adapter.rs | 4 + src/storage/mod.rs | 27 +++ src/tools/task/repository.rs | 22 ++- src/tools/task/runtime.rs | 87 +++++++++- web/src/App.tsx | 94 ++++++++-- web/src/components/Chat/ChatContainer.tsx | 4 +- web/src/components/Chat/MessageBubble.tsx | 191 +++++++++++++++++---- web/src/components/Chat/MessageList.tsx | 5 +- web/src/hooks/useChat.ts | 186 +++++++++++++++++++- web/src/types/protocol.ts | 27 +++ 20 files changed, 838 insertions(+), 62 deletions(-) create mode 100644 src/command/handlers/load_task_messages.rs diff --git a/src/channels/feishu.rs b/src/channels/feishu.rs index 59f8c1f..c27cd36 100644 --- a/src/channels/feishu.rs +++ b/src/channels/feishu.rs @@ -2402,7 +2402,9 @@ impl Channel for FeishuChannel { } async fn send(&self, msg: OutboundMessage) -> Result<(), ChannelError> { - if matches!(msg.event_kind, OutboundEventKind::ToolResult | OutboundEventKind::ToolPending) { + if matches!(msg.event_kind, OutboundEventKind::ToolResult | OutboundEventKind::ToolPending) + || msg.metadata.get("is_subagent_event").map(|v| v == "true").unwrap_or(false) + { return Ok(()); } diff --git a/src/channels/wechat.rs b/src/channels/wechat.rs index 226aae5..00112b1 100644 --- a/src/channels/wechat.rs +++ b/src/channels/wechat.rs @@ -287,7 +287,9 @@ impl Channel for WechatChannel { } async fn send(&self, msg: OutboundMessage) -> Result<(), ChannelError> { - if matches!(msg.event_kind, OutboundEventKind::ToolResult | OutboundEventKind::ToolPending) { + if matches!(msg.event_kind, OutboundEventKind::ToolResult | OutboundEventKind::ToolPending) + || msg.metadata.get("is_subagent_event").map(|v| v == "true").unwrap_or(false) + { return Ok(()); } diff --git a/src/command/handlers/load_task_messages.rs b/src/command/handlers/load_task_messages.rs new file mode 100644 index 0000000..9cb0470 --- /dev/null +++ b/src/command/handlers/load_task_messages.rs @@ -0,0 +1,171 @@ +use crate::command::context::CommandContext; +use crate::command::handler::{CommandHandler, CommandMetadata}; +use crate::command::response::{CommandError, CommandResponse}; +use crate::command::Command; +use crate::storage::SessionStore; +use crate::tools::task::repository::TaskRepository; +use crate::tools::task::types::{TaskSession, TaskSessionState}; +use async_trait::async_trait; +use std::sync::Arc; + +pub struct LoadTaskMessagesCommandHandler { + task_repository: Arc, + store: Arc, +} + +impl LoadTaskMessagesCommandHandler { + pub fn new( + task_repository: Arc, + store: Arc, + ) -> Self { + Self { task_repository, store } + } +} + +#[async_trait] +impl CommandHandler for LoadTaskMessagesCommandHandler { + fn can_handle(&self, cmd: &Command) -> bool { + matches!(cmd, Command::LoadTaskMessages { .. }) + } + + fn metadata(&self) -> Option { + Some(CommandMetadata { + name: "load_task_messages", + description: "加载子智能体任务的消息历史", + usage: "/load_task_messages ", + }) + } + + async fn handle( + &self, + cmd: Command, + ctx: CommandContext, + ) -> Result { + match cmd { + Command::LoadTaskMessages { task_id } => { + handle_load_task_messages(self, task_id, ctx).await + } + _ => unreachable!(), + } + } +} + +async fn handle_load_task_messages( + handler: &LoadTaskMessagesCommandHandler, + task_id: String, + ctx: CommandContext, +) -> Result { + tracing::info!( + task_id = %task_id, + request_id = %ctx.request_id, + "LoadTaskMessages: looking up task" + ); + + // 1. Try in-memory repository first + let task = match handler + .task_repository + .load_task_session(&task_id) + .await + { + Ok(Some(task)) => { + tracing::info!( + task_id = %task.id, + session_id = %task.session_id, + state = ?task.state, + "LoadTaskMessages: task found in memory" + ); + Some(task) + } + Ok(None) => { + tracing::info!( + task_id = %task_id, + "LoadTaskMessages: task not in memory, searching database" + ); + // 2. Fall back to database (survives restarts) + reconstruct_task_from_db(&handler.store, &task_id)? + } + Err(e) => { + tracing::error!( + task_id = %task_id, + error = %e, + "LoadTaskMessages: repository error during lookup" + ); + return Err(CommandError::new("LOAD_TASK_ERROR", e.to_string())); + } + }; + + let task = task.ok_or_else(|| { + tracing::warn!( + task_id = %task_id, + "LoadTaskMessages: task not found in repository or database" + ); + CommandError::new("TASK_NOT_FOUND", format!("Task not found: {}", task_id)) + })?; + + let status = format!("{:?}", task.state).to_lowercase(); + + let mut response = CommandResponse::success(ctx.request_id) + .with_metadata("task_session_id", &task.session_id) + .with_metadata("task_id", &task.id) + .with_metadata("task_description", &task.description) + .with_metadata("task_subagent_type", &task.subagent_type) + .with_metadata("task_status", &status); + + if let Some(ref summary) = task.summary { + response = response.with_metadata("task_summary", summary); + } + + Ok(response) +} + +/// Reconstruct a TaskSession from the database when it's not in the in-memory repository. +/// Task sessions have id format: sub:{parent_session_id}:task:{uuid} +fn reconstruct_task_from_db( + store: &SessionStore, + task_id: &str, +) -> Result, CommandError> { + let sessions = store + .find_sessions_by_id_suffix(&format!(":{}", task_id)) + .map_err(|e| CommandError::new("DB_ERROR", e.to_string()))?; + + if sessions.is_empty() { + return Ok(None); + } + + let record = &sessions[0]; + let session_id = record.id.clone(); + + // Extract parent_session_id from session_id: "sub:{parent}:task:{uuid}" + let parent_session_id = session_id + .strip_prefix("sub:") + .and_then(|rest| { + let pos = rest.find(":task:"); + pos.map(|p| rest[..p].to_string()) + }) + .unwrap_or_default(); + + // Extract description from title: "Subagent: {description}" + let description = record + .title + .strip_prefix("Subagent: ") + .unwrap_or(&record.title) + .to_string(); + + let now = record.updated_at; + + Ok(Some(TaskSession { + id: task_id.to_string(), + session_id, + parent_session_id, + parent_topic_id: None, + parent_chat_id: record.chat_id.clone(), + parent_channel_name: record.channel_name.clone(), + description, + subagent_type: "general".to_string(), + state: TaskSessionState::Completed, + created_at: record.created_at, + updated_at: now, + summary: None, + error: None, + })) +} diff --git a/src/command/handlers/mod.rs b/src/command/handlers/mod.rs index 63c770b..5b89bb2 100644 --- a/src/command/handlers/mod.rs +++ b/src/command/handlers/mod.rs @@ -4,6 +4,7 @@ pub mod list_channels; pub mod list_sessions; pub mod list_sessions_by_channel; pub mod list_topics; +pub mod load_task_messages; pub mod load_topic; pub mod save_session; pub mod save_topic; diff --git a/src/command/mod.rs b/src/command/mod.rs index 0d3a050..bf55c9b 100644 --- a/src/command/mod.rs +++ b/src/command/mod.rs @@ -43,6 +43,8 @@ pub enum Command { }, /// 列出 Session 的所有 Topics ListTopics { session_id: String }, + /// 加载子智能体任务的消息历史 + LoadTaskMessages { task_id: String }, } impl Command { @@ -60,6 +62,7 @@ impl Command { Command::ListChannels => "list_channels", Command::ListSessionsByChannel { .. } => "list_sessions_by_channel", Command::ListTopics { .. } => "list_topics", + Command::LoadTaskMessages { .. } => "load_task_messages", } } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 6a30535..0dd73f5 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -91,6 +91,7 @@ impl GatewayState { config.memory_maintenance.clone(), session_ttl_hours, mcp_config, + Some(bus.clone()), )?; Ok(Self { diff --git a/src/gateway/runtime.rs b/src/gateway/runtime.rs index 2da441f..bc40403 100644 --- a/src/gateway/runtime.rs +++ b/src/gateway/runtime.rs @@ -4,6 +4,7 @@ use std::collections::{HashMap, HashSet}; use std::sync::Arc; use crate::agent::AgentError; +use crate::bus::MessageBus; use crate::config::{LLMProviderConfig, MemoryMaintenanceConfig, SubagentsConfig, TaskConfig}; use crate::gateway::tool_registry_factory::ToolRegistryFactory; use crate::mcp::McpInitializer; @@ -44,6 +45,7 @@ pub(crate) fn build_session_manager( maintenance_config: MemoryMaintenanceConfig, session_ttl_hours: Option, mcp_config: crate::mcp::McpConfig, + bus: Option>, ) -> Result<(SessionManager, Arc), AgentError> { build_session_manager_with_sender( agent_prompt_reinject_every, @@ -59,6 +61,7 @@ pub(crate) fn build_session_manager( maintenance_config, session_ttl_hours, mcp_config, + bus, ) } @@ -77,6 +80,7 @@ pub(crate) fn build_session_manager_with_sender( maintenance_config: MemoryMaintenanceConfig, session_ttl_hours: Option, mcp_config: crate::mcp::McpConfig, + bus: Option>, ) -> Result<(SessionManager, Arc), AgentError> { let store = Arc::new( SessionStore::new() @@ -188,6 +192,7 @@ pub(crate) fn build_session_manager_with_sender( subagent_tools, provider_config.clone(), catalog, + bus.clone(), )); (factory.with_subagent_runtime(subagent_runtime), task_repository) diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 4d56032..bc9d4c3 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -505,6 +505,7 @@ impl SessionManager { maintenance_config, session_ttl_hours, mcp_config, + None, ) .map(|(session_manager, _)| session_manager) } diff --git a/src/gateway/ws.rs b/src/gateway/ws.rs index 937ff60..b843181 100644 --- a/src/gateway/ws.rs +++ b/src/gateway/ws.rs @@ -11,6 +11,7 @@ use crate::command::handlers::list_channels::ListChannelsCommandHandler; use crate::command::handlers::list_sessions::ListSessionsCommandHandler; use crate::command::handlers::list_sessions_by_channel::ListSessionsByChannelCommandHandler; use crate::command::handlers::list_topics::ListTopicsCommandHandler; +use crate::command::handlers::load_task_messages::LoadTaskMessagesCommandHandler; use crate::command::handlers::load_topic::LoadTopicCommandHandler; use crate::command::handlers::save_session::SaveSessionCommandHandler; use crate::command::handlers::session::SessionCommandHandler; @@ -299,6 +300,11 @@ async fn handle_inbound( router.register(Box::new(GetCurrentSessionCommandHandler::new(store.clone()))); // 注册 load_topic 处理器 router.register(Box::new(LoadTopicCommandHandler::new(store.clone()))); + // 注册 load_task_messages 处理器 + router.register(Box::new(LoadTaskMessagesCommandHandler::new( + state.task_repository.clone(), + store.clone(), + ))); router.register(Box::new(SaveSessionCommandHandler::new( store.clone(), state.task_repository.clone(), @@ -359,6 +365,28 @@ async fn handle_inbound( tracing::warn!(error = %e, topic_id = %topic_id, "Failed to send topic history"); } } + // 加载子智能体任务消息 + if let Some(task_session_id) = response.metadata.get("task_session_id") { + if let Err(e) = send_task_messages(&store, task_session_id, sender).await { + tracing::warn!(error = %e, task_session_id = %task_session_id, "Failed to send task messages"); + } + + // 发送 TaskMessagesLoaded 元数据 + let task_id = response.metadata.get("task_id").cloned().unwrap_or_default(); + let description = response.metadata.get("task_description").cloned().unwrap_or_default(); + let subagent_type = response.metadata.get("task_subagent_type").cloned().unwrap_or_default(); + let status = response.metadata.get("task_status").cloned().unwrap_or_default(); + let summary = response.metadata.get("task_summary").cloned(); + + let _ = sender.send(WsOutbound::TaskMessagesLoaded { + task_id, + description, + subagent_type, + status, + summary, + }).await; + } + if current_topic_id.is_none() { if let Some(topics_json) = response.metadata.get("topics") { match serde_json::from_str::>(topics_json) { @@ -438,6 +466,26 @@ async fn send_topic_history( Ok(()) } +/// 加载并发送子智能体任务的历史消息 +async fn send_task_messages( + store: &Arc, + session_id: &str, + sender: &mpsc::Sender, +) -> Result<(), Box> { + let messages = store.load_messages(session_id)?; + + tracing::info!(session_id = %session_id, message_count = messages.len(), "Sending task messages"); + + for msg in messages { + let outbound = chat_message_to_ws_outbound(&msg); + if let Some(outbound) = outbound { + let _ = sender.send(outbound).await; + } + } + + Ok(()) +} + /// 将 ChatMessage 转换为 WsOutbound fn chat_message_to_ws_outbound(msg: &crate::bus::ChatMessage) -> Option { use crate::bus::message::ToolMessageState; @@ -454,6 +502,7 @@ fn chat_message_to_ws_outbound(msg: &crate::bus::ChatMessage) -> Option Option Some(WsOutbound::ToolPending { id: msg.id.clone(), diff --git a/src/protocol/mod.rs b/src/protocol/mod.rs index 918f4f0..289e1e1 100644 --- a/src/protocol/mod.rs +++ b/src/protocol/mod.rs @@ -88,6 +88,8 @@ pub enum WsOutbound { arguments: serde_json::Value, content: String, role: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + subagent_task_id: Option, }, #[serde(rename = "tool_result")] ToolResult { @@ -96,6 +98,8 @@ pub enum WsOutbound { tool_name: String, content: String, role: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + subagent_task_id: Option, }, #[serde(rename = "tool_pending")] ToolPending { @@ -137,6 +141,15 @@ pub enum WsOutbound { }, #[serde(rename = "session_saved")] SessionSaved { session_id: String, filepath: String }, + #[serde(rename = "task_messages_loaded")] + TaskMessagesLoaded { + task_id: String, + description: String, + subagent_type: String, + status: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + summary: Option, + }, #[serde(rename = "pong")] Pong, } diff --git a/src/protocol/ws_adapter.rs b/src/protocol/ws_adapter.rs index f2dcd5c..9c67821 100644 --- a/src/protocol/ws_adapter.rs +++ b/src/protocol/ws_adapter.rs @@ -31,6 +31,7 @@ pub(crate) fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec Vec vec![WsOutbound::ToolPending { id: message.id.clone(), @@ -101,6 +103,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve .unwrap_or(serde_json::Value::Null), content: message.content.clone(), role: message.role.clone(), + subagent_task_id: message.metadata.get("subagent_task_id").cloned(), }], OutboundEventKind::ToolResult => vec![WsOutbound::ToolResult { id: message @@ -111,6 +114,7 @@ pub(crate) fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Ve tool_name: message.tool_name.clone().unwrap_or_default(), content: message.content.clone(), role: message.role.clone(), + subagent_task_id: message.metadata.get("subagent_task_id").cloned(), }], OutboundEventKind::ToolPending => vec![WsOutbound::ToolPending { id: message diff --git a/src/storage/mod.rs b/src/storage/mod.rs index bb8fee6..0009b8c 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -319,6 +319,33 @@ impl SessionStore { .map_err(StorageError::from) } + /// Find sessions whose id ends with the given suffix (used for task session lookup) + pub fn find_sessions_by_id_suffix( + &self, + suffix: &str, + ) -> Result, StorageError> { + let conn = self.conn.lock().expect("session db mutex poisoned"); + let pattern = format!("%{}", suffix); + let mut stmt = conn.prepare( + " + SELECT id, title, channel_name, chat_id, summary, + created_at, updated_at, last_active_at, + archived_at, deleted_at, message_count, + user_turn_count, agent_prompt_reinjection_count + FROM sessions + WHERE id LIKE ?1 AND deleted_at IS NULL + ORDER BY last_active_at DESC + ", + )?; + + let rows = stmt.query_map(params![pattern], map_session_record)?; + let mut sessions = Vec::new(); + for row in rows { + sessions.push(row?); + } + Ok(sessions) + } + pub fn list_sessions( &self, channel_name: &str, diff --git a/src/tools/task/repository.rs b/src/tools/task/repository.rs index d3a9117..20fe23f 100644 --- a/src/tools/task/repository.rs +++ b/src/tools/task/repository.rs @@ -57,15 +57,35 @@ impl Default for InMemoryTaskRepository { #[async_trait] impl TaskRepository for InMemoryTaskRepository { async fn save_task_session(&self, session: &TaskSession) -> Result<(), StorageError> { + tracing::warn!( + task_id = %session.id, + session_id = %session.session_id, + state = ?session.state, + "REPO_SAVE: Saving task session" + ); self.sessions .write() .unwrap() .insert(session.id.clone(), session.clone()); + tracing::warn!( + task_id = %session.id, + total_tasks = self.sessions.read().unwrap().len(), + "REPO_SAVE: Task session saved, current repository size" + ); Ok(()) } async fn load_task_session(&self, task_id: &str) -> Result, StorageError> { - Ok(self.sessions.read().unwrap().get(task_id).cloned()) + let sessions = self.sessions.read().unwrap(); + let total = sessions.len(); + let keys: Vec<&str> = sessions.keys().map(|k| k.as_str()).collect(); + tracing::warn!( + lookup_task_id = %task_id, + total_tasks = total, + all_keys = ?keys, + "REPO_LOOKUP: Looking up task session" + ); + Ok(sessions.get(task_id).cloned()) } async fn delete_task_session(&self, task_id: &str) -> Result { diff --git a/src/tools/task/runtime.rs b/src/tools/task/runtime.rs index 2393195..bc52531 100644 --- a/src/tools/task/runtime.rs +++ b/src/tools/task/runtime.rs @@ -1,4 +1,4 @@ -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::fs; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -7,8 +7,10 @@ use std::time::Duration; use async_trait::async_trait; use serde::Deserialize; -use crate::agent::{AgentLoop, AgentRuntimeConfig, SystemPrompt, SystemPromptContext, SystemPromptProvider}; +use crate::agent::{AgentLoop, AgentRuntimeConfig, EmittedMessageHandler, SystemPrompt, SystemPromptContext, SystemPromptProvider}; use crate::bus::ChatMessage; +use crate::bus::message::OutboundMessage; +use crate::bus::MessageBus; use crate::config::{LLMProviderConfig, SubagentsConfig}; use crate::storage::ConversationRepository; use crate::tools::{ToolContext, ToolRegistry}; @@ -97,6 +99,37 @@ impl StaticSystemPromptProvider { } } +/// 子智能体工具调用实时广播器(不依赖 gateway 层) +struct SubAgentEmitter { + bus: Arc, + channel_name: String, + chat_id: String, + metadata: HashMap, +} + +#[async_trait] +impl EmittedMessageHandler for SubAgentEmitter { + async fn handle(&self, message: ChatMessage) { + for outbound in OutboundMessage::from_chat_message( + &self.channel_name, + &self.chat_id, + None, + None, + &self.metadata, + &message, + ) { + if let Err(error) = self.bus.publish_outbound(outbound).await { + tracing::error!( + error = %error, + channel = %self.channel_name, + chat_id = %self.chat_id, + "Failed to publish live sub-agent tool call" + ); + } + } + } +} + impl SystemPromptProvider for StaticSystemPromptProvider { fn build(&self, _context: &SystemPromptContext) -> Option { Some(SystemPrompt { @@ -115,6 +148,7 @@ pub struct DefaultSubAgentRuntime { provider_config: LLMProviderConfig, /// 子代理定义目录(内置 + 自定义) catalog: Arc, + bus: Option>, } impl DefaultSubAgentRuntime { @@ -125,6 +159,7 @@ impl DefaultSubAgentRuntime { subagent_tools: Arc, provider_config: LLMProviderConfig, catalog: Arc, + bus: Option>, ) -> Self { Self { config, @@ -133,6 +168,7 @@ impl DefaultSubAgentRuntime { subagent_tools, provider_config, catalog, + bus, } } @@ -174,16 +210,34 @@ impl DefaultSubAgentRuntime { None, // 子代理不需要 skill provider ) .map(|agent| { - agent.with_tool_context(ToolContext { + let agent = agent.with_tool_context(ToolContext { channel_name: Some(session.parent_channel_name.clone()), sender_id: None, - chat_id: Some(session.parent_chat_id.clone()), // 使用父会话 chat_id - session_id: Some(session.session_id.clone()), // 子代理自己的 session_id - topic_id: session.parent_topic_id.clone(), // 继承父话题 ID + chat_id: Some(session.parent_chat_id.clone()), + session_id: Some(session.session_id.clone()), + topic_id: session.parent_topic_id.clone(), message_id: None, message_seq: None, subagent_description: Some(session.description.clone()), - }) + }); + + // 如果有 MessageBus,附加实时广播 emitter + if let Some(bus) = &self.bus { + let mut metadata = HashMap::new(); + metadata.insert("subagent_task_id".to_string(), session.id.clone()); + metadata.insert("is_subagent_event".to_string(), "true".to_string()); + + let emitter = Arc::new(SubAgentEmitter { + bus: bus.clone(), + channel_name: session.parent_channel_name.clone(), + chat_id: session.parent_chat_id.clone(), + metadata, + }); + + return agent.with_emitted_message_handler(emitter); + } + + agent }) .map_err(|e| TaskError::AgentCreationFailed(e.to_string())) } @@ -340,6 +394,13 @@ impl SubAgentRuntime for DefaultSubAgentRuntime { } // 5. 保存任务会话 + tracing::info!( + task_id = %session.id, + session_id = %session.session_id, + description = %session.description, + subagent_type = %session.subagent_type, + "Spawning sub-agent task" + ); self.task_repository.save_task_session(&session).await?; // 6. 构建子代理系统提示词 @@ -364,12 +425,24 @@ impl SubAgentRuntime for DefaultSubAgentRuntime { Ok(tool_result) => { let mut session = session; session.mark_completed(tool_result.summary.clone()); + tracing::info!( + task_id = %session.id, + session_id = %session.session_id, + "Task completed, updating session" + ); self.task_repository.save_task_session(&session).await?; Ok(tool_result) } Err(e) => { let mut session = session; let status = e.as_status(); + tracing::warn!( + task_id = %session.id, + session_id = %session.session_id, + status = %status, + error = %e, + "Task failed, updating session" + ); if status == "timeout" { session.mark_timeout(); } else { diff --git a/web/src/App.tsx b/web/src/App.tsx index e232b24..4c143ce 100644 --- a/web/src/App.tsx +++ b/web/src/App.tsx @@ -1,5 +1,5 @@ import { useCallback, useEffect, useMemo, useRef } from 'react' -import { Zap, Cpu, MessageSquare } from 'lucide-react' +import { Zap, Cpu, MessageSquare, ArrowLeft, Bot } from 'lucide-react' import { ChatContainer } from './components/Chat/ChatContainer' import { TopicList } from './components/Sidebar/TopicList' import { SessionInfo } from './components/Sidebar/SessionInfo' @@ -29,6 +29,8 @@ function App() { messages, isLoading, isReadOnly, + // 子智能体视图 + subAgentView, // 方法 handleMessage, handleCommand, @@ -38,6 +40,8 @@ function App() { switchTopic, requestSessionList, requestTopicList, + enterSubAgentView, + exitSubAgentView, } = useChat() const { status, sendMessage } = useWebSocket({ @@ -156,6 +160,25 @@ function App() { [sendMessage, handleCommand, switchTopic, selectTopic] ) + const handleNavigateToSubAgent = useCallback( + (taskId: string, description: string) => { + const cmd = enterSubAgentView(taskId, description) + handleCommand(cmd) + sendMessage({ type: 'command', payload: JSON.stringify(cmd) }) + }, + [enterSubAgentView, handleCommand, sendMessage] + ) + + const handleExitSubAgentView = useCallback(() => { + exitSubAgentView() + // Reload current topic messages + if (selectedTopic) { + const cmd: Command = { type: 'load_topic', topic_id: selectedTopic } + handleCommand(cmd) + sendMessage({ type: 'command', payload: JSON.stringify(cmd) }) + } + }, [exitSubAgentView, selectedTopic, handleCommand, sendMessage]) + const chatMessages = useMemo(() => { const result: ChatMessage[] = [] const toolCallIndex = new Map() @@ -230,18 +253,13 @@ function App() { {/* Main Content */}
- {/* Left Sidebar - 简化为 Session 信息 + Topic 列表 */} -
- {/* Session Info */} + {/* Left Sidebar */} +
- - {/* Divider */}
- - {/* Topic List */}
{/* Center - Chat */} -
- +
+ {/* Sub-agent back bar */} + {subAgentView && ( +
+ +
+
+ + 子智能体: + {subAgentView.description} +
+
+
+ 类型: + {subAgentView.subagentType || '...'} +
+
+
+ 状态: + + {subAgentView.status === 'completed' ? '已完成' : + subAgentView.status === 'failed' ? '失败' : + subAgentView.status === 'timeout' ? '超时' : + subAgentView.status === 'running' ? '执行中' : + subAgentView.status === 'loading' ? '加载中...' : + subAgentView.status} + +
+
+ )} +
+ {} : handleSendMessage} + onNavigateToSubAgent={handleNavigateToSubAgent} + /> +
{/* Right Sidebar - Tool Panel */} diff --git a/web/src/components/Chat/ChatContainer.tsx b/web/src/components/Chat/ChatContainer.tsx index b1e4aca..a1f29c6 100644 --- a/web/src/components/Chat/ChatContainer.tsx +++ b/web/src/components/Chat/ChatContainer.tsx @@ -8,6 +8,7 @@ interface ChatContainerProps { isReadOnly?: boolean channelName?: string onSendMessage: (content: string) => void + onNavigateToSubAgent?: (taskId: string, description: string) => void } export function ChatContainer({ @@ -16,11 +17,12 @@ export function ChatContainer({ isReadOnly = false, channelName, onSendMessage, + onNavigateToSubAgent, }: ChatContainerProps) { return (
- +
void } function getAttachmentIcon(mediaType: string) { @@ -110,7 +111,26 @@ function CopyButton({ text, className = '' }: { text: string; className?: string ) } -export function MessageBubble({ message }: MessageBubbleProps) { +function parseTaskResult(content: string): TaskToolResult | null { + if (!content) return null + try { + const parsed = JSON.parse(content) + if ( + parsed && + typeof parsed.status === 'string' && + typeof parsed.output === 'string' && + typeof parsed.summary === 'string' && + typeof parsed.task_id === 'string' + ) { + return parsed as TaskToolResult + } + return null + } catch { + return null + } +} + +export function MessageBubble({ message, onNavigateToSubAgent }: MessageBubbleProps) { const isUser = message.role === 'user' const isTool = message.role === 'tool' const isMergedTool = message.type === 'merged_tool' @@ -174,28 +194,66 @@ export function MessageBubble({ message }: MessageBubbleProps) { const displayContent = hasResult ? stripToolResultPrefix(message.resultContent!) : '' + const isTaskTool = message.toolName === 'task' + const taskResult = isTaskTool && hasResult ? parseTaskResult(message.resultContent!) : null + const isSubAgent = !!message.subagentTaskId + const subagentType = (message.arguments as Record | null)?.subagent_type as string || 'general' + const taskDescription = (message.arguments as Record | null)?.description as string || '' + const taskPrompt = (message.arguments as Record | null)?.prompt as string || '' + + // task tool 专用的状态配色 + const taskStatusConfig = { + success: { dot: 'bg-emerald-400', label: '成功', borderColor: 'border-emerald-500/40', labelColor: 'text-emerald-400' }, + failed: { dot: 'bg-red-400', label: '失败', borderColor: 'border-red-500/40', labelColor: 'text-red-400' }, + timeout: { dot: 'bg-amber-400', label: '超时', borderColor: 'border-amber-500/40', labelColor: 'text-amber-400' }, + } as const + return (
-
- +
+ {isTaskTool ? ( + + ) : ( + + )}
{message.toolName || 'Tool'} + {isTaskTool && ( + + 子智能体·{subagentType} + + )} + {!isTaskTool && isSubAgent && ( + + 子智能体 + + )} {formatTime(message.timestamp)}
setToolExpanded(!toolExpanded)} - className={`cursor-pointer rounded-xl border bg-[#1a1a25]/60 w-full transition-all duration-500 hover:bg-[#1a1a25]/80 group ${statusConfig.fullBorder}`} + className={`cursor-pointer rounded-xl border bg-[#1a1a25]/60 w-full transition-all duration-500 hover:bg-[#1a1a25]/80 group ${ + taskResult ? taskStatusConfig[taskResult.status].borderColor : statusConfig.fullBorder + }`} > {/* Header row */}
- - {message.toolName || 'Tool'} - - {statusConfig.label} + + + {isTaskTool ? (taskDescription || '子智能体任务') : (message.toolName || 'Tool')} - {hasResult && } + + {taskResult ? taskStatusConfig[taskResult.status].label : statusConfig.label} + + {hasResult && } {toolExpanded ? ( @@ -208,7 +266,7 @@ export function MessageBubble({ message }: MessageBubbleProps) { {/* Collapsed preview */} {!toolExpanded && ( <> - {hasArgs && argsPreview && ( + {hasArgs && argsPreview && !taskResult && (
)} - {hasResult ? ( + {taskResult && taskResult.summary && ( +
+ {taskResult.summary} +
+ )} + {taskResult ? ( + <> +
+ +
+
+ 点击查看子智能体输出 +
+ + ) : hasResult ? (
点击查看工具结果
) : ( -
等待工具执行...
+
+ {isTaskTool ? '子智能体正在执行...' : '等待工具执行...'} +
)} )} @@ -232,24 +315,70 @@ export function MessageBubble({ message }: MessageBubbleProps) { {/* Expanded */} {toolExpanded && (
- {hasArgs && ( -
-
参数
-
-                      {JSON.stringify(message.arguments, null, 2)}
-                    
-
- )} - {hasResult && ( -
-
结果
-
-                      {formatJSON(displayContent)}
-                    
-
- )} - {!hasArgs && !hasResult && ( -
等待工具执行...
+ {taskResult ? ( + <> + {taskPrompt && ( +
+
任务指令
+
+                          {taskPrompt}
+                        
+
+ )} + {taskResult.summary && ( +
+
摘要
+
{taskResult.summary}
+
+ )} +
+
输出
+
+ + {taskResult.output} + +
+
+
+ task_id: {taskResult.task_id} +
+ + + ) : ( + <> + {hasArgs && ( +
+
参数
+
+                          {JSON.stringify(message.arguments, null, 2)}
+                        
+
+ )} + {hasResult && ( +
+
结果
+
+                          {formatJSON(displayContent)}
+                        
+
+ )} + {!hasArgs && !hasResult && ( +
等待工具执行...
+ )} + )}
)} diff --git a/web/src/components/Chat/MessageList.tsx b/web/src/components/Chat/MessageList.tsx index 92bc24d..3302f0b 100644 --- a/web/src/components/Chat/MessageList.tsx +++ b/web/src/components/Chat/MessageList.tsx @@ -5,9 +5,10 @@ import { Sparkles } from 'lucide-react' interface MessageListProps { messages: ChatMessage[] + onNavigateToSubAgent?: (taskId: string, description: string) => void } -export function MessageList({ messages }: MessageListProps) { +export function MessageList({ messages, onNavigateToSubAgent }: MessageListProps) { const bottomRef = useRef(null) const containerRef = useRef(null) @@ -41,7 +42,7 @@ export function MessageList({ messages }: MessageListProps) { className="h-full overflow-y-auto p-6 space-y-6" > {messages.map((message) => ( - + ))}
diff --git a/web/src/hooks/useChat.ts b/web/src/hooks/useChat.ts index 2a7c0a2..1266afe 100644 --- a/web/src/hooks/useChat.ts +++ b/web/src/hooks/useChat.ts @@ -1,4 +1,4 @@ -import { useState, useCallback, useRef, useMemo } from 'react' +import { useState, useCallback, useEffect, useRef, useMemo } from 'react' import type { Command, ChatMessage, @@ -13,6 +13,7 @@ import type { TopicList, TopicSummary, Session, + TaskMessagesLoaded, } from '../types/protocol' // 简化后的层级状态 @@ -35,6 +36,9 @@ interface UseChatReturn { // 是否只读(WebSocket 通道始终可写) isReadOnly: boolean + // 子智能体视图 + subAgentView: SubAgentView | null + // 方法 handleMessage: (content: string) => void handleCommand: (command: Command) => void @@ -49,6 +53,19 @@ interface UseChatReturn { // 初始化方法 requestSessionList: () => Command requestTopicList: () => Command | null + + // 子智能体导航方法 + enterSubAgentView: (taskId: string, description: string) => Command + exitSubAgentView: () => void +} + +interface SubAgentView { + taskId: string + description: string + subagentType: string + status: string + summary?: string + messages: ChatMessage[] } const DEFAULT_CHANNEL = 'websocket' @@ -63,6 +80,7 @@ export function useChat(): UseChatReturn { const [session, setSession] = useState(null) const [topics, setTopics] = useState([]) const [selectedTopic, setSelectedTopic] = useState(null) + const [subAgentView, setSubAgentView] = useState(null) // Message ID generator const messageIdCounter = useRef(0) @@ -71,13 +89,137 @@ export function useChat(): UseChatReturn { return `msg_${Date.now()}_${messageIdCounter.current}` } + // Ref to track subAgentView for use in callbacks + const subAgentViewRef = useRef(null) + const isConnected = useMemo(() => connectionId !== null, [connectionId]) const sessionId = useMemo(() => session?.id ?? null, [session]) const chatId = useMemo(() => sessionId ?? DEFAULT_CHAT_ID, [sessionId]) + // Extract subagent_task_id from a message if present + const getSubagentTaskId = (message: WsOutbound): string | undefined => { + if (message.type === 'tool_call' || message.type === 'tool_result') { + return (message as ToolCall | ToolResult).subagent_task_id + } + return undefined + } + + // Convert a server message to ChatMessage (extracted from handleServerMessage logic) + const serverMessageToChatMessage = (message: WsOutbound): ChatMessage | null => { + switch (message.type) { + case 'assistant_response': { + const msg = message as AssistantResponse + const role = msg.role === 'user' || msg.role === 'tool' ? msg.role : 'assistant' + return { + id: msg.id, + role: role as ChatMessage['role'], + content: msg.content, + timestamp: Date.now(), + type: 'message', + attachments: msg.attachments, + } + } + case 'tool_call': { + const msg = message as ToolCall + return { + id: msg.id, + role: 'tool', + content: msg.content, + timestamp: Date.now(), + type: 'tool_call', + toolName: msg.tool_name, + toolCallId: msg.tool_call_id, + arguments: msg.arguments, + subagentTaskId: msg.subagent_task_id, + } + } + case 'tool_result': { + const msg = message as ToolResult + return { + id: msg.id, + role: 'tool', + content: msg.content, + timestamp: Date.now(), + type: 'tool_result', + toolName: msg.tool_name, + toolCallId: msg.tool_call_id, + subagentTaskId: msg.subagent_task_id, + } + } + case 'tool_pending': { + const msg = message as ToolPending + return { + id: msg.id, + role: 'tool', + content: `${msg.content}\n\n${msg.resume_hint}`, + timestamp: Date.now(), + type: 'tool_pending', + toolName: msg.tool_name, + toolCallId: msg.tool_call_id, + } + } + case 'error': { + return { + id: generateMessageId(), + role: 'assistant', + content: `Error: ${message.message}`, + timestamp: Date.now(), + type: 'message', + } + } + default: + return null + } + } + + // Append a server message to the sub-agent view + const appendToSubAgentViewMessage = (message: WsOutbound) => { + const chatMsg = serverMessageToChatMessage(message) + if (chatMsg) { + setSubAgentView((prev) => + prev + ? { ...prev, messages: [...prev.messages, chatMsg] } + : prev + ) + } + } + const handleServerMessage = useCallback((message: WsOutbound) => { console.log('Received message:', message) + // Route to sub-agent view if active + const currentSubAgentView = subAgentViewRef.current + if (currentSubAgentView) { + if (message.type === 'task_messages_loaded') { + const msg = message as TaskMessagesLoaded + setSubAgentView((prev) => + prev + ? { + ...prev, + subagentType: msg.subagent_type, + status: msg.status, + summary: msg.summary, + } + : prev + ) + return + } + + // Route messages to sub-agent view: + // - Messages without subagent_task_id = loaded history, always accept + // - Messages with subagent_task_id = live emitter, only accept if matching + const msgSubagentTaskId = getSubagentTaskId(message) + if (!msgSubagentTaskId || msgSubagentTaskId === currentSubAgentView.taskId) { + appendToSubAgentViewMessage(message) + } + return + } + + // In main view, skip sub-agent messages (they belong to sub-agent view) + if (getSubagentTaskId(message)) { + return + } + switch (message.type) { case 'session_established': { const msg = message as SessionEstablished @@ -168,6 +310,7 @@ export function useChat(): UseChatReturn { toolName: msg.tool_name, toolCallId: msg.tool_call_id, arguments: msg.arguments, + subagentTaskId: msg.subagent_task_id, }, ]) break @@ -185,6 +328,7 @@ export function useChat(): UseChatReturn { type: 'tool_result', toolName: msg.tool_name, toolCallId: msg.tool_call_id, + subagentTaskId: msg.subagent_task_id, }, ]) break @@ -297,6 +441,41 @@ export function useChat(): UseChatReturn { } }, [sessionId]) + // Keep ref in sync with state + useEffect(() => { + subAgentViewRef.current = subAgentView + }, [subAgentView]) + + const enterSubAgentView = useCallback((taskId: string, description: string): Command => { + const newView: SubAgentView = { + taskId, + description, + subagentType: '', + status: 'loading', + messages: [], + } + // Sync ref immediately so WebSocket response routing works correctly + subAgentViewRef.current = newView + setSubAgentView(newView) + return { + type: 'load_task_messages', + task_id: taskId, + } + }, []) + + const exitSubAgentView = useCallback(() => { + subAgentViewRef.current = null + setSubAgentView(null) + }, []) + + // Memoize messages: when in sub-agent view, return sub-agent messages + const resolvedMessages = useMemo(() => { + if (subAgentView) { + return subAgentView.messages + } + return messages + }, [subAgentView, messages]) + // WebSocket 通道始终可写 const isReadOnly = false @@ -308,9 +487,10 @@ export function useChat(): UseChatReturn { chatId, topics, selectedTopic, - messages, + messages: resolvedMessages, isLoading, isReadOnly, + subAgentView, handleMessage, handleCommand, clearMessages, @@ -320,5 +500,7 @@ export function useChat(): UseChatReturn { switchTopic, requestSessionList, requestTopicList, + enterSubAgentView, + exitSubAgentView, } } diff --git a/web/src/types/protocol.ts b/web/src/types/protocol.ts index b50b1da..5804966 100644 --- a/web/src/types/protocol.ts +++ b/web/src/types/protocol.ts @@ -51,6 +51,7 @@ export interface ToolCall { arguments: unknown content: string role: string + subagent_task_id?: string } export interface ToolResult { @@ -60,6 +61,7 @@ export interface ToolResult { tool_name: string content: string role: string + subagent_task_id?: string } export interface ToolPending { @@ -151,6 +153,15 @@ export interface Pong { type: 'pong' } +export interface TaskMessagesLoaded { + type: 'task_messages_loaded' + task_id: string + description: string + subagent_type: string + status: string + summary?: string +} + export type WsOutbound = | AssistantResponse | ToolCall @@ -164,6 +175,7 @@ export type WsOutbound = | SessionSaved | TopicList | ChannelList + | TaskMessagesLoaded | Pong // ============================================================================ @@ -226,6 +238,11 @@ export interface ListTopicsCommand { session_id: string } +export interface LoadTaskMessagesCommand { + type: 'load_task_messages' + task_id: string +} + export type Command = | CreateSessionCommand | ListSessionsCommand @@ -238,6 +255,7 @@ export type Command = | ListChannelsCommand | ListSessionsByChannelCommand | ListTopicsCommand + | LoadTaskMessagesCommand // ============================================================================ // UI Types @@ -256,6 +274,15 @@ export interface ChatMessage { status?: 'calling' | 'result' | 'pending' resultContent?: string callContent?: string + subagentTaskId?: string +} + +/** task 工具返回的 JSON 结构 */ +export interface TaskToolResult { + status: 'success' | 'failed' | 'timeout' + summary: string + output: string + task_id: string } export interface Topic {