From 1d4ebb27a7501e682b72e8d733d44eb3c08b6816 Mon Sep 17 00:00:00 2001 From: oudecheng <13802883547@139.com> Date: Wed, 3 Jun 2026 18:13:15 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=9B=B4=E6=96=B0=20StopExecution=20?= =?UTF-8?q?=E5=91=BD=E4=BB=A4=E5=92=8C=20CancelManager=EF=BC=8C=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E6=8C=89=E8=AF=9D=E9=A2=98=E5=8F=96=E6=B6=88=20Agent?= =?UTF-8?q?=20=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/command/handlers/stop_execution.rs | 15 +++++++---- src/gateway/cancel_manager.rs | 35 +++++++++++++------------- src/gateway/processor.rs | 18 +++++++------ 3 files changed, 38 insertions(+), 30 deletions(-) diff --git a/src/command/handlers/stop_execution.rs b/src/command/handlers/stop_execution.rs index 9e8b0da..e754db8 100644 --- a/src/command/handlers/stop_execution.rs +++ b/src/command/handlers/stop_execution.rs @@ -6,7 +6,7 @@ use crate::command::response::{CommandError, CommandResponse, MessageKind}; use crate::command::Command; use crate::gateway::cancel_manager::CancelManager; -/// 处理 StopExecution 命令:取消当前正在执行的 Agent。 +/// 处理 StopExecution 命令:按话题取消当前正在执行的 Agent。 pub struct StopExecutionCommandHandler { cancel_manager: CancelManager, } @@ -26,7 +26,7 @@ impl CommandHandler for StopExecutionCommandHandler { fn metadata(&self) -> Option { Some(CommandMetadata { name: "stop", - description: "停止当前正在执行的 Agent", + description: "停止当前话题正在执行的 Agent", usage: "/stop", }) } @@ -36,10 +36,15 @@ impl CommandHandler for StopExecutionCommandHandler { _cmd: Command, ctx: CommandContext, ) -> Result { - let channel = &ctx.channel_name; - let chat_id = ctx.chat_id.as_deref().unwrap_or("default"); + let topic_id = match ctx.topic_id.as_deref() { + Some(id) => id, + None => { + return Ok(CommandResponse::success(ctx.request_id) + .with_message(MessageKind::Notification, "当前没有活跃的话题,无法停止")); + } + }; - let cancelled = self.cancel_manager.cancel(channel, chat_id).await; + let cancelled = self.cancel_manager.cancel_by_topic(topic_id).await; if cancelled { Ok(CommandResponse::success(ctx.request_id) diff --git a/src/gateway/cancel_manager.rs b/src/gateway/cancel_manager.rs index a541a7a..e056bbe 100644 --- a/src/gateway/cancel_manager.rs +++ b/src/gateway/cancel_manager.rs @@ -4,12 +4,14 @@ use tokio::sync::{Mutex, watch}; /// 共享的 Agent 取消注册表。 /// -/// 每个正在执行的 Agent 在启动前注册一个 watch::Sender, -/// 外部(如 /stop 命令)通过 cancel() 发送取消信号。 +/// 每个正在执行的 Agent 在启动前按 topic_id 注册一个 watch::Sender, +/// 外部(如 /stop 命令)通过 cancel_by_topic() 发送取消信号。 /// Agent 循环内部通过 watch::Receiver::has_changed() 检测取消。 +/// +/// key 使用 topic_id(UUID),全局唯一,精确到话题级别。 #[derive(Clone)] pub struct CancelManager { - tokens: Arc>>>, + tokens: Arc>>>, } impl CancelManager { @@ -19,24 +21,22 @@ impl CancelManager { } } - /// 注册一个取消通道,返回 receiver 供 Agent 持有。 + /// 按 topic_id 注册一个取消通道,返回 receiver 供 Agent 持有。 /// - /// 如果同 (channel, chat_id) 已有注册,旧 sender 被覆盖并 drop, + /// 如果同 topic_id 已有注册,旧 sender 被覆盖并 drop, /// 旧 receiver 将收到通道关闭信号。 - pub async fn register(&self, channel: &str, chat_id: &str) -> watch::Receiver<()> { + pub async fn register(&self, topic_id: &str) -> watch::Receiver<()> { let (tx, rx) = watch::channel(()); - let key = (channel.to_string(), chat_id.to_string()); - self.tokens.lock().await.insert(key, tx); + self.tokens.lock().await.insert(topic_id.to_string(), tx); rx } - /// 发送取消信号并移除注册条目。 + /// 按 topic_id 发送取消信号并移除注册条目。 /// /// 返回 `true` 表示找到了对应的任务并发送了取消信号, /// 返回 `false` 表示没有找到对应的任务(可能已经完成或从未注册)。 - pub async fn cancel(&self, channel: &str, chat_id: &str) -> bool { - let key = (channel.to_string(), chat_id.to_string()); - if let Some(tx) = self.tokens.lock().await.remove(&key) { + pub async fn cancel_by_topic(&self, topic_id: &str) -> bool { + if let Some(tx) = self.tokens.lock().await.remove(topic_id) { // send 可能失败(receiver 已被 drop),这不影响语义 let _ = tx.send(()); true @@ -45,13 +45,12 @@ impl CancelManager { } } - /// 正常完成后清理注册条目(幂等)。 + /// 按 topic_id 正常完成后清理注册条目(幂等)。 /// - /// 与 cancel() 不同,此方法不发送取消信号,仅移除条目。 - /// 如果条目已被 cancel() 移除,此调用为 no-op。 - pub async fn remove(&self, channel: &str, chat_id: &str) { - let key = (channel.to_string(), chat_id.to_string()); - self.tokens.lock().await.remove(&key); + /// 与 cancel_by_topic() 不同,此方法不发送取消信号,仅移除条目。 + /// 如果条目已被 cancel_by_topic() 移除,此调用为 no-op。 + pub async fn remove_by_topic(&self, topic_id: &str) { + self.tokens.lock().await.remove(topic_id); } } diff --git a/src/gateway/processor.rs b/src/gateway/processor.rs index 6cf934d..7fb7b09 100644 --- a/src/gateway/processor.rs +++ b/src/gateway/processor.rs @@ -250,11 +250,13 @@ impl InboundProcessor { let channel = inbound.channel.clone(); let chat_id = inbound.chat_id.clone(); - // 注册取消信号:Agent 构建时通过 Session 消费该 receiver - let cancel_rx = self.cancel_manager.register(&channel, &chat_id).await; - self.session_manager - .set_agent_cancel_token(&channel, &chat_id, cancel_rx) - .await; + // 按 topic_id 注册取消信号:Agent 构建时通过 Session 消费该 receiver + if let Some(ref topic_id) = current_topic { + let cancel_rx = self.cancel_manager.register(topic_id).await; + self.session_manager + .set_agent_cancel_token(&channel, &chat_id, cancel_rx) + .await; + } match self .session_manager @@ -328,8 +330,10 @@ impl InboundProcessor { } } - // 清理取消信号注册(幂等:如果已被 cancel() 移除则为 no-op) - self.cancel_manager.remove(&channel, &chat_id).await; + // 清理取消信号注册(幂等:如果已被 cancel_by_topic() 移除则为 no-op) + if let Some(ref topic_id) = current_topic { + self.cancel_manager.remove_by_topic(topic_id).await; + } Ok(()) }