重构:斜杠命令通过 handle_message 处理,Channel 不再管理 Session

架构优化:
- Feishu 渠道的 handle_and_publish 简化为只发布消息到 bus
- Session 创建/复用由 SessionManager 在 handle_message 内部处理
- 斜杠命令检测移到 handle_message,execute_slash_command 直接调用
- CLI 和 Feishu 统一通过消息总线处理斜杠命令
This commit is contained in:
xiaoxixi 2026-04-28 21:10:28 +08:00
parent e787203e94
commit e61b78eaff
2 changed files with 26 additions and 90 deletions

View File

@ -12,9 +12,7 @@ use tokio::sync::{broadcast, RwLock};
use crate::bus::{MessageBus, MediaItem, OutboundMessage}; use crate::bus::{MessageBus, MediaItem, OutboundMessage};
use crate::channels::base::{Channel, ChannelError}; use crate::channels::base::{Channel, ChannelError};
use crate::channels::slash_command::parse_slash_command;
use crate::config::FeishuChannelConfig; use crate::config::FeishuChannelConfig;
use crate::session::{SessionCommand, SessionEvent};
const FEISHU_API_BASE: &str = "https://open.feishu.cn/open-apis"; const FEISHU_API_BASE: &str = "https://open.feishu.cn/open-apis";
const FEISHU_WS_BASE: &str = "https://open.feishu.cn"; const FEISHU_WS_BASE: &str = "https://open.feishu.cn";
@ -2003,94 +2001,8 @@ impl Channel for FeishuChannel {
bus: &Arc<MessageBus>, bus: &Arc<MessageBus>,
msg: &crate::bus::InboundMessage, msg: &crate::bus::InboundMessage,
) -> Result<(), ChannelError> { ) -> Result<(), ChannelError> {
use tokio::sync::mpsc; // All messages (including slash commands) go through the normal inbound flow
// SessionManager handles session creation/reuse internally
if !self.is_allowed(&msg.sender_id) {
tracing::warn!(
channel = %self.name(),
sender = %msg.sender_id,
"Access denied"
);
return Ok(());
}
// Check for slash command
if let Some((cmd_name, cmd_args)) = parse_slash_command(&msg.content) {
tracing::info!(cmd = %cmd_name, "Feishu slash command detected");
// For slash commands, we need to ensure a session exists first
// Get or create session via control plane
let (reply_tx, mut reply_rx) = mpsc::channel(1);
bus.publish_control(crate::bus::ControlMessage {
op: SessionCommand::CreateDialog {
channel: msg.channel.clone(),
chat_id: msg.chat_id.clone(),
title: None,
},
reply_tx,
}).await?;
// Wait for session creation
let session_id = match reply_rx.recv().await {
Some(Ok(SessionEvent::DialogCreated { session_id, .. })) => Some(session_id),
_ => None,
};
// Now execute the slash command with the session
let (cmd_reply_tx, mut cmd_reply_rx) = mpsc::channel(1);
bus.publish_control(crate::bus::ControlMessage {
op: SessionCommand::ExecuteSlashCommand {
command: cmd_name.to_string(),
args: if cmd_args.is_empty() { None } else { Some(cmd_args.to_string()) },
channel: msg.channel.clone(),
chat_id: msg.chat_id.clone(),
current_session_id: session_id,
},
reply_tx: cmd_reply_tx,
}).await?;
// Handle response
if let Some(result) = reply_rx.recv().await {
match result {
Ok(SessionEvent::SlashCommandExecuted { message, .. }) => {
let outbound = crate::bus::OutboundMessage {
channel: msg.channel.clone(),
chat_id: msg.chat_id.clone(),
content: message,
reply_to: None,
media: vec![],
metadata: msg.forwarded_metadata.clone(),
};
bus.publish_outbound(outbound).await?;
}
Ok(SessionEvent::Error { message, .. }) => {
let outbound = crate::bus::OutboundMessage {
channel: msg.channel.clone(),
chat_id: msg.chat_id.clone(),
content: message,
reply_to: None,
media: vec![],
metadata: msg.forwarded_metadata.clone(),
};
bus.publish_outbound(outbound).await?;
}
Err(e) => {
let outbound = crate::bus::OutboundMessage {
channel: msg.channel.clone(),
chat_id: msg.chat_id.clone(),
content: format!("Error: {}", e),
reply_to: None,
media: vec![],
metadata: msg.forwarded_metadata.clone(),
};
bus.publish_outbound(outbound).await?;
}
_ => {}
}
}
return Ok(());
}
bus.publish_inbound(msg.clone()).await?; bus.publish_inbound(msg.clone()).await?;
Ok(()) Ok(())
} }

View File

@ -6,6 +6,7 @@ use tokio::sync::{Mutex, mpsc};
use uuid::Uuid; use uuid::Uuid;
use crate::bus::ChatMessage; use crate::bus::ChatMessage;
use crate::channels::slash_command::parse_slash_command;
use crate::config::LLMProviderConfig; use crate::config::LLMProviderConfig;
use crate::agent::{AgentLoop, AgentError, ContextCompressor}; use crate::agent::{AgentLoop, AgentError, ContextCompressor};
use crate::agent::system_prompt::build_system_prompt; use crate::agent::system_prompt::build_system_prompt;
@ -517,6 +518,29 @@ impl SessionManager {
let unified_id = UnifiedSessionId::new(channel, chat_id, dialog_id); let unified_id = UnifiedSessionId::new(channel, chat_id, dialog_id);
let session = self.get_or_create_session(&unified_id).await?; let session = self.get_or_create_session(&unified_id).await?;
// Check for slash command
if let Some((cmd_name, cmd_args)) = parse_slash_command(content) {
let (new_session_id, response) = self.execute_slash_command(
cmd_name,
if cmd_args.is_empty() { None } else { Some(cmd_args) },
channel,
chat_id,
Some(&unified_id),
).await?;
// If a new session was created (e.g., /new, /delete), update the session binding
if let Some(new_id) = new_session_id {
// Update the session in the map with the new ID
let mut inner = self.inner.lock().await;
if let Some(old_session) = inner.sessions.remove(&unified_id.to_string()) {
inner.sessions.insert(new_id.to_string(), old_session);
}
}
return Ok(response);
}
// Normal message handling through LLM
let response: String = { let response: String = {
let mut session_guard = session.lock().await; let mut session_guard = session.lock().await;