feat: 添加 max_concurrent_requests 配置项,优化并发请求控制;更新 InboundProcessor 以支持信号量控制并发
This commit is contained in:
parent
35b9c42d07
commit
d4c15e0478
@ -312,6 +312,8 @@ pub struct GatewayConfig {
|
||||
rename = "agent_prompt_reinject_every"
|
||||
)]
|
||||
pub agent_prompt_reinject_every: u64,
|
||||
#[serde(default = "default_max_concurrent_requests")]
|
||||
pub max_concurrent_requests: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
@ -589,6 +591,10 @@ fn default_agent_prompt_reinject_every() -> u64 {
|
||||
100
|
||||
}
|
||||
|
||||
fn default_max_concurrent_requests() -> usize {
|
||||
10
|
||||
}
|
||||
|
||||
impl Default for GatewayConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
@ -597,6 +603,7 @@ impl Default for GatewayConfig {
|
||||
show_tool_results: false,
|
||||
chat_history_ttl_hours: Some(4),
|
||||
agent_prompt_reinject_every: default_agent_prompt_reinject_every(),
|
||||
max_concurrent_requests: default_max_concurrent_requests(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -29,6 +29,7 @@ use axum::{Router, routing};
|
||||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
use tokio::net::TcpListener;
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use crate::bus::MessageBus;
|
||||
use crate::channels::ChannelManager;
|
||||
@ -92,8 +93,14 @@ impl GatewayState {
|
||||
/// Start the message processing loops
|
||||
pub async fn start_message_processing(&self) {
|
||||
let bus_for_outbound = self.bus.clone();
|
||||
|
||||
// Create semaphore for controlling concurrent requests
|
||||
let max_concurrent = self.config.gateway.max_concurrent_requests;
|
||||
let semaphore = Arc::new(Semaphore::new(max_concurrent));
|
||||
|
||||
// Spawn inbound processor with semaphore-controlled concurrency
|
||||
let inbound_processor =
|
||||
InboundProcessor::new(self.bus.clone(), self.session_manager.clone());
|
||||
InboundProcessor::new(self.bus.clone(), self.session_manager.clone(), semaphore);
|
||||
tokio::spawn(inbound_processor.run());
|
||||
|
||||
// Spawn outbound dispatcher
|
||||
|
||||
@ -1,6 +1,9 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::bus::{MessageBus, OutboundMessage};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use crate::agent::AgentError;
|
||||
use crate::bus::{InboundMessage, MessageBus, OutboundMessage};
|
||||
|
||||
use super::session::{BusToolCallEmitter, SessionManager};
|
||||
|
||||
@ -8,85 +11,116 @@ use super::session::{BusToolCallEmitter, SessionManager};
|
||||
pub struct InboundProcessor {
|
||||
bus: Arc<MessageBus>,
|
||||
session_manager: SessionManager,
|
||||
semaphore: Arc<Semaphore>,
|
||||
}
|
||||
|
||||
impl InboundProcessor {
|
||||
pub fn new(bus: Arc<MessageBus>, session_manager: SessionManager) -> Self {
|
||||
pub fn new(
|
||||
bus: Arc<MessageBus>,
|
||||
session_manager: SessionManager,
|
||||
semaphore: Arc<Semaphore>,
|
||||
) -> Self {
|
||||
Self {
|
||||
bus,
|
||||
session_manager,
|
||||
semaphore,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn run(self) {
|
||||
tracing::info!("Inbound processor started");
|
||||
let max_concurrent = self.semaphore.available_permits();
|
||||
tracing::info!(
|
||||
max_concurrent_requests = max_concurrent,
|
||||
"Inbound processor started"
|
||||
);
|
||||
|
||||
loop {
|
||||
// 1. 消费消息
|
||||
let inbound = self.bus.consume_inbound().await;
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
{
|
||||
tracing::debug!(
|
||||
channel = %inbound.channel,
|
||||
chat_id = %inbound.chat_id,
|
||||
sender = %inbound.sender_id,
|
||||
content = %inbound.content,
|
||||
content_len = %inbound.content.len(),
|
||||
media_count = %inbound.media.len(),
|
||||
"Processing inbound message"
|
||||
);
|
||||
if !inbound.media.is_empty() {
|
||||
for (i, media) in inbound.media.iter().enumerate() {
|
||||
tracing::debug!(media_index = i, media_type = %media.media_type, path = %media.path, "Media item");
|
||||
}
|
||||
|
||||
// 2. 获取 semaphore permit(控制并发)
|
||||
let permit = match self.semaphore.clone().acquire_owned().await {
|
||||
Ok(permit) => permit,
|
||||
Err(_) => {
|
||||
tracing::error!("Semaphore closed, stopping inbound processor");
|
||||
break;
|
||||
}
|
||||
};
|
||||
|
||||
// 3. 克隆 processor 用于新任务
|
||||
let processor = self.clone();
|
||||
|
||||
// 4. 独立任务处理(包含 permit,任务完成自动释放)
|
||||
tokio::spawn(async move {
|
||||
let _permit = permit; // 持有 permit 直到任务完成
|
||||
if let Err(e) = processor.process_one(inbound).await {
|
||||
tracing::error!(error = %e, "Message processing failed");
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
async fn process_one(&self, inbound: InboundMessage) -> Result<(), AgentError> {
|
||||
let live_emitter = Arc::new(BusToolCallEmitter::new(
|
||||
self.bus.clone(),
|
||||
inbound.channel.clone(),
|
||||
inbound.chat_id.clone(),
|
||||
inbound.forwarded_metadata.clone(),
|
||||
self.session_manager.show_tool_results(),
|
||||
));
|
||||
|
||||
match self
|
||||
.session_manager
|
||||
.handle_message(
|
||||
&inbound.channel,
|
||||
&inbound.sender_id,
|
||||
&inbound.chat_id,
|
||||
&inbound.content,
|
||||
inbound.media,
|
||||
Some(live_emitter),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(outbound_messages) => {
|
||||
for mut outbound in outbound_messages {
|
||||
outbound.metadata.extend(inbound.forwarded_metadata.clone());
|
||||
if let Err(error) = self.bus.publish_outbound(outbound).await {
|
||||
tracing::error!(error = %error, "Failed to publish outbound");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let live_emitter = Arc::new(BusToolCallEmitter::new(
|
||||
self.bus.clone(),
|
||||
inbound.channel.clone(),
|
||||
inbound.chat_id.clone(),
|
||||
inbound.forwarded_metadata.clone(),
|
||||
self.session_manager.show_tool_results(),
|
||||
));
|
||||
|
||||
match self
|
||||
.session_manager
|
||||
.handle_message(
|
||||
&inbound.channel,
|
||||
&inbound.sender_id,
|
||||
&inbound.chat_id,
|
||||
&inbound.content,
|
||||
inbound.media,
|
||||
Some(live_emitter),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(outbound_messages) => {
|
||||
for mut outbound in outbound_messages {
|
||||
outbound.metadata.extend(inbound.forwarded_metadata.clone());
|
||||
if let Err(error) = self.bus.publish_outbound(outbound).await {
|
||||
tracing::error!(error = %error, "Failed to publish outbound");
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(error) => {
|
||||
tracing::error!(error = %error, "Failed to handle message");
|
||||
let mut metadata = inbound.forwarded_metadata.clone();
|
||||
metadata.insert("error_kind".to_string(), "agent_execution".to_string());
|
||||
if let Err(publish_error) = self
|
||||
.bus
|
||||
.publish_outbound(OutboundMessage::error_notification(
|
||||
inbound.channel,
|
||||
inbound.chat_id,
|
||||
error.to_string(),
|
||||
None,
|
||||
metadata,
|
||||
))
|
||||
.await
|
||||
{
|
||||
tracing::error!(error = %publish_error, "Failed to publish execution error outbound");
|
||||
}
|
||||
Err(error) => {
|
||||
tracing::error!(error = %error, "Failed to handle message");
|
||||
let mut metadata = inbound.forwarded_metadata.clone();
|
||||
metadata.insert("error_kind".to_string(), "agent_execution".to_string());
|
||||
if let Err(publish_error) = self
|
||||
.bus
|
||||
.publish_outbound(OutboundMessage::error_notification(
|
||||
inbound.channel,
|
||||
inbound.chat_id,
|
||||
error.to_string(),
|
||||
None,
|
||||
metadata,
|
||||
))
|
||||
.await
|
||||
{
|
||||
tracing::error!(error = %publish_error, "Failed to publish execution error outbound");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user