From 025c0b5d7fec03f1a4811ad9e5137e24a652bd4d Mon Sep 17 00:00:00 2001 From: oudecheng <13802883547@139.com> Date: Fri, 15 May 2026 10:00:17 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=20session=5Fid=20?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E5=88=B0=20OutboundMessage=EF=BC=8C=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E4=BC=9A=E8=AF=9D=E7=AE=A1=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/bus/message.rs | 29 +++++++++++++++++++++++++-- src/channels/cli.rs | 2 ++ src/gateway/execution.rs | 1 + src/gateway/processor.rs | 23 ++++++++------------- src/gateway/session.rs | 1 + src/gateway/session_message_sender.rs | 2 ++ src/protocol/ws_adapter.rs | 1 + src/scheduler/mod.rs | 2 ++ 8 files changed, 44 insertions(+), 17 deletions(-) diff --git a/src/bus/message.rs b/src/bus/message.rs index 652b55f..03175f1 100644 --- a/src/bus/message.rs +++ b/src/bus/message.rs @@ -251,7 +251,12 @@ impl InboundMessage { #[derive(Debug, Clone)] pub struct OutboundMessage { pub channel: String, + /// 消息发送目标 ID(如飞书 open_id、微信 chat_id) + /// 注意:这始终是原始入站消息的 chat_id,不会被修改为会话 ID pub chat_id: String, + /// 内部会话 ID(对应 sessions.id) + /// 用于会话管理和消息持久化,与消息发送目标无关 + pub session_id: Option, pub content: String, pub reply_to: Option, pub media: Vec, @@ -281,6 +286,7 @@ impl OutboundMessage { pub fn assistant( channel: impl Into, chat_id: impl Into, + session_id: Option, content: impl Into, reply_to: Option, metadata: HashMap, @@ -288,6 +294,7 @@ impl OutboundMessage { Self { channel: channel.into(), chat_id: chat_id.into(), + session_id, content: content.into(), reply_to, media: Vec::new(), @@ -303,11 +310,12 @@ impl OutboundMessage { pub fn scheduler_notification( channel: impl Into, chat_id: impl Into, + session_id: Option, content: impl Into, reply_to: Option, metadata: HashMap, ) -> Self { - let mut message = Self::assistant(channel, chat_id, content, reply_to, metadata); + let mut message = Self::assistant(channel, chat_id, session_id, content, reply_to, metadata); message.event_kind = OutboundEventKind::SchedulerNotification; message } @@ -315,11 +323,12 @@ impl OutboundMessage { pub fn error_notification( channel: impl Into, chat_id: impl Into, + session_id: Option, content: impl Into, reply_to: Option, metadata: HashMap, ) -> Self { - let mut message = Self::assistant(channel, chat_id, content, reply_to, metadata); + let mut message = Self::assistant(channel, chat_id, session_id, content, reply_to, metadata); message.event_kind = OutboundEventKind::ErrorNotification; message } @@ -327,6 +336,7 @@ impl OutboundMessage { pub fn tool_call( channel: impl Into, chat_id: impl Into, + session_id: Option, message_id: impl Into, tool_name: impl Into, tool_arguments: serde_json::Value, @@ -338,6 +348,7 @@ impl OutboundMessage { Self { channel: channel.into(), chat_id: chat_id.into(), + session_id, content, reply_to, media: Vec::new(), @@ -353,6 +364,7 @@ impl OutboundMessage { pub fn tool_result( channel: impl Into, chat_id: impl Into, + session_id: Option, tool_call_id: impl Into, tool_name: impl Into, content: impl Into, @@ -365,6 +377,7 @@ impl OutboundMessage { Self { channel: channel.into(), chat_id: chat_id.into(), + session_id, content, reply_to, media: Vec::new(), @@ -380,6 +393,7 @@ impl OutboundMessage { pub fn tool_pending( channel: impl Into, chat_id: impl Into, + session_id: Option, tool_call_id: impl Into, tool_name: impl Into, content: impl Into, @@ -392,6 +406,7 @@ impl OutboundMessage { Self { channel: channel.into(), chat_id: chat_id.into(), + session_id, content, reply_to, media: Vec::new(), @@ -407,6 +422,7 @@ impl OutboundMessage { pub fn from_chat_message( channel: &str, chat_id: &str, + session_id: Option, reply_to: Option, metadata: &HashMap, message: &ChatMessage, @@ -419,6 +435,7 @@ impl OutboundMessage { outbound.push(Self::assistant( channel.to_string(), chat_id.to_string(), + session_id.clone(), message.content.clone(), reply_to.clone(), metadata.clone(), @@ -429,6 +446,7 @@ impl OutboundMessage { Self::tool_call( channel.to_string(), chat_id.to_string(), + session_id.clone(), tool_call.id.clone(), tool_call.name.clone(), tool_call.arguments.clone(), @@ -441,6 +459,7 @@ impl OutboundMessage { vec![Self::assistant( channel.to_string(), chat_id.to_string(), + session_id, message.content.clone(), reply_to, metadata.clone(), @@ -455,6 +474,7 @@ impl OutboundMessage { ToolMessageState::Completed => vec![Self::tool_result( channel.to_string(), chat_id.to_string(), + session_id, message.tool_call_id.clone().unwrap_or_default(), message.tool_name.clone().unwrap_or_default(), message.content.clone(), @@ -464,6 +484,7 @@ impl OutboundMessage { ToolMessageState::PendingUserAction => vec![Self::tool_pending( channel.to_string(), chat_id.to_string(), + session_id, message.tool_call_id.clone().unwrap_or_default(), message.tool_name.clone().unwrap_or_default(), message.content.clone(), @@ -562,6 +583,7 @@ mod tests { TEST_CHANNEL, "chat-1", None, + None, &HashMap::new(), &message, ); @@ -599,6 +621,7 @@ mod tests { TEST_CHANNEL, "chat-1", None, + None, &HashMap::new(), &message, ); @@ -618,6 +641,7 @@ mod tests { TEST_CHANNEL, "chat-1", None, + None, &HashMap::new(), &message, ); @@ -639,6 +663,7 @@ mod tests { TEST_CHANNEL, "chat-1", None, + None, &HashMap::new(), &message, ); diff --git a/src/channels/cli.rs b/src/channels/cli.rs index 8e0e82f..0ce9656 100644 --- a/src/channels/cli.rs +++ b/src/channels/cli.rs @@ -119,6 +119,7 @@ mod tests { .send(OutboundMessage::assistant( "cli", "session-1", + None, // session_id "hello", None, HashMap::new(), @@ -143,6 +144,7 @@ mod tests { .send(OutboundMessage::assistant( "cli", "session-1", + None, // session_id "hello", None, HashMap::new(), diff --git a/src/gateway/execution.rs b/src/gateway/execution.rs index 265527a..e6430ee 100644 --- a/src/gateway/execution.rs +++ b/src/gateway/execution.rs @@ -114,6 +114,7 @@ impl AgentExecutionService { OutboundMessage::from_chat_message( request.channel_name, request.chat_id, + None, // session_id None, request.metadata, message, diff --git a/src/gateway/processor.rs b/src/gateway/processor.rs index b945977..9710153 100644 --- a/src/gateway/processor.rs +++ b/src/gateway/processor.rs @@ -125,32 +125,23 @@ impl InboundProcessor { let cmd_ctx = crate::command::context::CommandContext::new(&inbound.channel, &inbound.channel) .with_session_id(&inbound.chat_id); - // 记录是否是创建会话命令(用于后续自动切换) - let is_create_session = matches!(cmd, Command::CreateSession { .. }); + // 记录是否是创建会话命令(用于后续处理) + let _is_create_session = matches!(cmd, Command::CreateSession { .. }); let response = self.command_router.dispatch_with_response(cmd, cmd_ctx).await; // 发送响应给用户 if response.success { - // 如果是创建会话,更新 chat_id 到新会话 - let target_chat_id = if let Some(session_id) = response.metadata.get("session_id") { - if is_create_session { - // 自动切换到新会话 - session_id.clone() - } else { - inbound.chat_id.clone() - } - } else { - inbound.chat_id.clone() - }; - // 提取响应消息 + // chat_id 保持为 inbound.chat_id(飞书 open_id) + // session_id 放入 metadata 用于会话管理 for msg in &response.messages { if let Err(error) = self .bus .publish_outbound(OutboundMessage::assistant( inbound.channel.clone(), - target_chat_id.clone(), + inbound.chat_id.clone(), + response.metadata.get("session_id").cloned(), msg.content.clone(), None, inbound.forwarded_metadata.clone(), @@ -166,6 +157,7 @@ impl InboundProcessor { .publish_outbound(OutboundMessage::assistant( inbound.channel.clone(), inbound.chat_id.clone(), + response.metadata.get("session_id").cloned(), format!("Error [{}]: {}", error.code, error.message), None, inbound.forwarded_metadata.clone(), @@ -216,6 +208,7 @@ impl InboundProcessor { .publish_outbound(OutboundMessage::error_notification( inbound.channel, inbound.chat_id, + None, // session_id error.to_string(), None, metadata, diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 064b84d..60426de 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -79,6 +79,7 @@ impl EmittedMessageHandler for BusToolCallEmitter { for outbound in OutboundMessage::from_chat_message( &self.channel_name, &self.chat_id, + None, // session_id None, &self.metadata, &message, diff --git a/src/gateway/session_message_sender.rs b/src/gateway/session_message_sender.rs index 94be56e..805b905 100644 --- a/src/gateway/session_message_sender.rs +++ b/src/gateway/session_message_sender.rs @@ -47,6 +47,7 @@ impl SessionMessageSender for BusSessionMessageSender { .publish_outbound(OutboundMessage::assistant( channel_name.to_string(), chat_id.to_string(), + None, // session_id text, None, metadata.clone(), @@ -68,6 +69,7 @@ impl SessionMessageSender for BusSessionMessageSender { let mut outbound = OutboundMessage::assistant( channel_name.to_string(), chat_id.to_string(), + None, // session_id String::new(), None, metadata.clone(), diff --git a/src/protocol/ws_adapter.rs b/src/protocol/ws_adapter.rs index 9ae99dd..766829f 100644 --- a/src/protocol/ws_adapter.rs +++ b/src/protocol/ws_adapter.rs @@ -201,6 +201,7 @@ mod tests { let message = OutboundMessage::tool_call( "cli", "session-1", + None, // session_id "call-1", "calculator", json!({"expression": "1 + 1"}), diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 445a5df..12675bf 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -409,6 +409,7 @@ impl Scheduler { .publish_outbound(OutboundMessage::error_notification( channel, chat_id, + None, // session_id format!( "定时任务执行失败:{}\n{}", job.id, @@ -904,6 +905,7 @@ fn build_outbound_message(job: &RuntimeJob) -> anyhow::Result { Ok(OutboundMessage::scheduler_notification( channel, chat_id, + None, // session_id content.to_string(), job.target.reply_to.clone(), metadata,