feat: 更新 StopExecution 命令和 CancelManager,支持按话题取消 Agent 执行

This commit is contained in:
oudecheng 2026-06-03 18:13:15 +08:00
parent 1b571e943f
commit 1d4ebb27a7
3 changed files with 38 additions and 30 deletions

View File

@ -6,7 +6,7 @@ use crate::command::response::{CommandError, CommandResponse, MessageKind};
use crate::command::Command; use crate::command::Command;
use crate::gateway::cancel_manager::CancelManager; use crate::gateway::cancel_manager::CancelManager;
/// 处理 StopExecution 命令:取消当前正在执行的 Agent。 /// 处理 StopExecution 命令:按话题取消当前正在执行的 Agent。
pub struct StopExecutionCommandHandler { pub struct StopExecutionCommandHandler {
cancel_manager: CancelManager, cancel_manager: CancelManager,
} }
@ -26,7 +26,7 @@ impl CommandHandler for StopExecutionCommandHandler {
fn metadata(&self) -> Option<CommandMetadata> { fn metadata(&self) -> Option<CommandMetadata> {
Some(CommandMetadata { Some(CommandMetadata {
name: "stop", name: "stop",
description: "停止当前正在执行的 Agent", description: "停止当前话题正在执行的 Agent",
usage: "/stop", usage: "/stop",
}) })
} }
@ -36,10 +36,15 @@ impl CommandHandler for StopExecutionCommandHandler {
_cmd: Command, _cmd: Command,
ctx: CommandContext, ctx: CommandContext,
) -> Result<CommandResponse, CommandError> { ) -> Result<CommandResponse, CommandError> {
let channel = &ctx.channel_name; let topic_id = match ctx.topic_id.as_deref() {
let chat_id = ctx.chat_id.as_deref().unwrap_or("default"); Some(id) => id,
None => {
return Ok(CommandResponse::success(ctx.request_id)
.with_message(MessageKind::Notification, "当前没有活跃的话题,无法停止"));
}
};
let cancelled = self.cancel_manager.cancel(channel, chat_id).await; let cancelled = self.cancel_manager.cancel_by_topic(topic_id).await;
if cancelled { if cancelled {
Ok(CommandResponse::success(ctx.request_id) Ok(CommandResponse::success(ctx.request_id)

View File

@ -4,12 +4,14 @@ use tokio::sync::{Mutex, watch};
/// 共享的 Agent 取消注册表。 /// 共享的 Agent 取消注册表。
/// ///
/// 每个正在执行的 Agent 在启动前注册一个 watch::Sender /// 每个正在执行的 Agent 在启动前按 topic_id 注册一个 watch::Sender
/// 外部(如 /stop 命令)通过 cancel() 发送取消信号。 /// 外部(如 /stop 命令)通过 cancel_by_topic() 发送取消信号。
/// Agent 循环内部通过 watch::Receiver::has_changed() 检测取消。 /// Agent 循环内部通过 watch::Receiver::has_changed() 检测取消。
///
/// key 使用 topic_idUUID全局唯一精确到话题级别。
#[derive(Clone)] #[derive(Clone)]
pub struct CancelManager { pub struct CancelManager {
tokens: Arc<Mutex<HashMap<(String, String), watch::Sender<()>>>>, tokens: Arc<Mutex<HashMap<String, watch::Sender<()>>>>,
} }
impl CancelManager { impl CancelManager {
@ -19,24 +21,22 @@ impl CancelManager {
} }
} }
/// 注册一个取消通道,返回 receiver 供 Agent 持有。 /// 按 topic_id 注册一个取消通道,返回 receiver 供 Agent 持有。
/// ///
/// 如果同 (channel, chat_id) 已有注册,旧 sender 被覆盖并 drop /// 如果同 topic_id 已有注册,旧 sender 被覆盖并 drop
/// 旧 receiver 将收到通道关闭信号。 /// 旧 receiver 将收到通道关闭信号。
pub async fn register(&self, channel: &str, chat_id: &str) -> watch::Receiver<()> { pub async fn register(&self, topic_id: &str) -> watch::Receiver<()> {
let (tx, rx) = watch::channel(()); let (tx, rx) = watch::channel(());
let key = (channel.to_string(), chat_id.to_string()); self.tokens.lock().await.insert(topic_id.to_string(), tx);
self.tokens.lock().await.insert(key, tx);
rx rx
} }
/// 发送取消信号并移除注册条目。 /// 按 topic_id 发送取消信号并移除注册条目。
/// ///
/// 返回 `true` 表示找到了对应的任务并发送了取消信号, /// 返回 `true` 表示找到了对应的任务并发送了取消信号,
/// 返回 `false` 表示没有找到对应的任务(可能已经完成或从未注册)。 /// 返回 `false` 表示没有找到对应的任务(可能已经完成或从未注册)。
pub async fn cancel(&self, channel: &str, chat_id: &str) -> bool { pub async fn cancel_by_topic(&self, topic_id: &str) -> bool {
let key = (channel.to_string(), chat_id.to_string()); if let Some(tx) = self.tokens.lock().await.remove(topic_id) {
if let Some(tx) = self.tokens.lock().await.remove(&key) {
// send 可能失败receiver 已被 drop这不影响语义 // send 可能失败receiver 已被 drop这不影响语义
let _ = tx.send(()); let _ = tx.send(());
true true
@ -45,13 +45,12 @@ impl CancelManager {
} }
} }
/// 正常完成后清理注册条目(幂等)。 /// 按 topic_id 正常完成后清理注册条目(幂等)。
/// ///
/// 与 cancel() 不同,此方法不发送取消信号,仅移除条目。 /// 与 cancel_by_topic() 不同,此方法不发送取消信号,仅移除条目。
/// 如果条目已被 cancel() 移除,此调用为 no-op。 /// 如果条目已被 cancel_by_topic() 移除,此调用为 no-op。
pub async fn remove(&self, channel: &str, chat_id: &str) { pub async fn remove_by_topic(&self, topic_id: &str) {
let key = (channel.to_string(), chat_id.to_string()); self.tokens.lock().await.remove(topic_id);
self.tokens.lock().await.remove(&key);
} }
} }

View File

@ -250,11 +250,13 @@ impl InboundProcessor {
let channel = inbound.channel.clone(); let channel = inbound.channel.clone();
let chat_id = inbound.chat_id.clone(); let chat_id = inbound.chat_id.clone();
// 注册取消信号Agent 构建时通过 Session 消费该 receiver // 按 topic_id 注册取消信号Agent 构建时通过 Session 消费该 receiver
let cancel_rx = self.cancel_manager.register(&channel, &chat_id).await; if let Some(ref topic_id) = current_topic {
self.session_manager let cancel_rx = self.cancel_manager.register(topic_id).await;
.set_agent_cancel_token(&channel, &chat_id, cancel_rx) self.session_manager
.await; .set_agent_cancel_token(&channel, &chat_id, cancel_rx)
.await;
}
match self match self
.session_manager .session_manager
@ -328,8 +330,10 @@ impl InboundProcessor {
} }
} }
// 清理取消信号注册(幂等:如果已被 cancel() 移除则为 no-op // 清理取消信号注册(幂等:如果已被 cancel_by_topic() 移除则为 no-op
self.cancel_manager.remove(&channel, &chat_id).await; if let Some(ref topic_id) = current_topic {
self.cancel_manager.remove_by_topic(topic_id).await;
}
Ok(()) Ok(())
} }