From d4c15e04787bda20dabb843cd6f6218e477f8332 Mon Sep 17 00:00:00 2001 From: ooodc <549496103@qq.com> Date: Wed, 13 May 2026 21:56:01 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=20max=5Fconcurrent?= =?UTF-8?q?=5Frequests=20=E9=85=8D=E7=BD=AE=E9=A1=B9=EF=BC=8C=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E5=B9=B6=E5=8F=91=E8=AF=B7=E6=B1=82=E6=8E=A7=E5=88=B6?= =?UTF-8?q?=EF=BC=9B=E6=9B=B4=E6=96=B0=20InboundProcessor=20=E4=BB=A5?= =?UTF-8?q?=E6=94=AF=E6=8C=81=E4=BF=A1=E5=8F=B7=E9=87=8F=E6=8E=A7=E5=88=B6?= =?UTF-8?q?=E5=B9=B6=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config/mod.rs | 7 ++ src/gateway/mod.rs | 9 ++- src/gateway/processor.rs | 140 ++++++++++++++++++++++++--------------- 3 files changed, 102 insertions(+), 54 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index 09dbaf7..bc3b3ad 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -312,6 +312,8 @@ pub struct GatewayConfig { rename = "agent_prompt_reinject_every" )] pub agent_prompt_reinject_every: u64, + #[serde(default = "default_max_concurrent_requests")] + pub max_concurrent_requests: usize, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -589,6 +591,10 @@ fn default_agent_prompt_reinject_every() -> u64 { 100 } +fn default_max_concurrent_requests() -> usize { + 10 +} + impl Default for GatewayConfig { fn default() -> Self { Self { @@ -597,6 +603,7 @@ impl Default for GatewayConfig { show_tool_results: false, chat_history_ttl_hours: Some(4), agent_prompt_reinject_every: default_agent_prompt_reinject_every(), + max_concurrent_requests: default_max_concurrent_requests(), } } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 41a12bc..99a13f5 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -29,6 +29,7 @@ use axum::{Router, routing}; use std::collections::HashMap; use std::sync::Arc; use tokio::net::TcpListener; +use tokio::sync::Semaphore; use crate::bus::MessageBus; use crate::channels::ChannelManager; @@ -92,8 +93,14 @@ impl GatewayState { /// Start the message processing loops pub async fn start_message_processing(&self) { let bus_for_outbound = self.bus.clone(); + + // Create semaphore for controlling concurrent requests + let max_concurrent = self.config.gateway.max_concurrent_requests; + let semaphore = Arc::new(Semaphore::new(max_concurrent)); + + // Spawn inbound processor with semaphore-controlled concurrency let inbound_processor = - InboundProcessor::new(self.bus.clone(), self.session_manager.clone()); + InboundProcessor::new(self.bus.clone(), self.session_manager.clone(), semaphore); tokio::spawn(inbound_processor.run()); // Spawn outbound dispatcher diff --git a/src/gateway/processor.rs b/src/gateway/processor.rs index eda2812..b468194 100644 --- a/src/gateway/processor.rs +++ b/src/gateway/processor.rs @@ -1,6 +1,9 @@ use std::sync::Arc; -use crate::bus::{MessageBus, OutboundMessage}; +use tokio::sync::Semaphore; + +use crate::agent::AgentError; +use crate::bus::{InboundMessage, MessageBus, OutboundMessage}; use super::session::{BusToolCallEmitter, SessionManager}; @@ -8,85 +11,116 @@ use super::session::{BusToolCallEmitter, SessionManager}; pub struct InboundProcessor { bus: Arc, session_manager: SessionManager, + semaphore: Arc, } impl InboundProcessor { - pub fn new(bus: Arc, session_manager: SessionManager) -> Self { + pub fn new( + bus: Arc, + session_manager: SessionManager, + semaphore: Arc, + ) -> Self { Self { bus, session_manager, + semaphore, } } pub async fn run(self) { - tracing::info!("Inbound processor started"); + let max_concurrent = self.semaphore.available_permits(); + tracing::info!( + max_concurrent_requests = max_concurrent, + "Inbound processor started" + ); loop { + // 1. 消费消息 let inbound = self.bus.consume_inbound().await; + #[cfg(debug_assertions)] { tracing::debug!( channel = %inbound.channel, chat_id = %inbound.chat_id, sender = %inbound.sender_id, - content = %inbound.content, + content_len = %inbound.content.len(), media_count = %inbound.media.len(), "Processing inbound message" ); - if !inbound.media.is_empty() { - for (i, media) in inbound.media.iter().enumerate() { - tracing::debug!(media_index = i, media_type = %media.media_type, path = %media.path, "Media item"); + } + + // 2. 获取 semaphore permit(控制并发) + let permit = match self.semaphore.clone().acquire_owned().await { + Ok(permit) => permit, + Err(_) => { + tracing::error!("Semaphore closed, stopping inbound processor"); + break; + } + }; + + // 3. 克隆 processor 用于新任务 + let processor = self.clone(); + + // 4. 独立任务处理(包含 permit,任务完成自动释放) + tokio::spawn(async move { + let _permit = permit; // 持有 permit 直到任务完成 + if let Err(e) = processor.process_one(inbound).await { + tracing::error!(error = %e, "Message processing failed"); + } + }); + } + } + + async fn process_one(&self, inbound: InboundMessage) -> Result<(), AgentError> { + let live_emitter = Arc::new(BusToolCallEmitter::new( + self.bus.clone(), + inbound.channel.clone(), + inbound.chat_id.clone(), + inbound.forwarded_metadata.clone(), + self.session_manager.show_tool_results(), + )); + + match self + .session_manager + .handle_message( + &inbound.channel, + &inbound.sender_id, + &inbound.chat_id, + &inbound.content, + inbound.media, + Some(live_emitter), + ) + .await + { + Ok(outbound_messages) => { + for mut outbound in outbound_messages { + outbound.metadata.extend(inbound.forwarded_metadata.clone()); + if let Err(error) = self.bus.publish_outbound(outbound).await { + tracing::error!(error = %error, "Failed to publish outbound"); } } } - - let live_emitter = Arc::new(BusToolCallEmitter::new( - self.bus.clone(), - inbound.channel.clone(), - inbound.chat_id.clone(), - inbound.forwarded_metadata.clone(), - self.session_manager.show_tool_results(), - )); - - match self - .session_manager - .handle_message( - &inbound.channel, - &inbound.sender_id, - &inbound.chat_id, - &inbound.content, - inbound.media, - Some(live_emitter), - ) - .await - { - Ok(outbound_messages) => { - for mut outbound in outbound_messages { - outbound.metadata.extend(inbound.forwarded_metadata.clone()); - if let Err(error) = self.bus.publish_outbound(outbound).await { - tracing::error!(error = %error, "Failed to publish outbound"); - } - } - } - Err(error) => { - tracing::error!(error = %error, "Failed to handle message"); - let mut metadata = inbound.forwarded_metadata.clone(); - metadata.insert("error_kind".to_string(), "agent_execution".to_string()); - if let Err(publish_error) = self - .bus - .publish_outbound(OutboundMessage::error_notification( - inbound.channel, - inbound.chat_id, - error.to_string(), - None, - metadata, - )) - .await - { - tracing::error!(error = %publish_error, "Failed to publish execution error outbound"); - } + Err(error) => { + tracing::error!(error = %error, "Failed to handle message"); + let mut metadata = inbound.forwarded_metadata.clone(); + metadata.insert("error_kind".to_string(), "agent_execution".to_string()); + if let Err(publish_error) = self + .bus + .publish_outbound(OutboundMessage::error_notification( + inbound.channel, + inbound.chat_id, + error.to_string(), + None, + metadata, + )) + .await + { + tracing::error!(error = %publish_error, "Failed to publish execution error outbound"); } } } + + Ok(()) } }