feat: 实现消息发送的速率限制,确保同一用户之间的最小间隔

This commit is contained in:
ooodc 2026-06-06 10:57:40 +08:00
parent 988e77123c
commit b5a1635a05

View File

@ -2,7 +2,7 @@ use std::collections::HashMap;
use std::path::Path;
use std::path::PathBuf;
use std::sync::{
Arc,
Arc, Mutex,
atomic::{AtomicBool, Ordering},
};
use std::time::{Duration, UNIX_EPOCH};
@ -18,6 +18,44 @@ use crate::bus::message::OutboundEventKind;
use crate::channels::base::{Channel, ChannelError};
use crate::config::{LLMProviderConfig, WechatChannelConfig};
/// Rate limiter: ensures minimum interval between messages to the same chat.
static LAST_SEND: std::sync::LazyLock<Mutex<HashMap<String, tokio::time::Instant>>> =
std::sync::LazyLock::new(|| Mutex::new(HashMap::new()));
/// Minimum interval between consecutive messages to the same WeChat user.
const MIN_MSG_INTERVAL_MS: u64 = 1500;
/// Wait if needed to respect the rate limit, then record the send time.
async fn throttle(chat_id: &str) {
let now = tokio::time::Instant::now();
let sleep_ms = {
let mut map = LAST_SEND.lock().unwrap();
let delay = map.get(chat_id).and_then(|last| {
let elapsed = now.duration_since(*last);
if elapsed < Duration::from_millis(MIN_MSG_INTERVAL_MS) {
Some(Duration::from_millis(MIN_MSG_INTERVAL_MS) - elapsed)
} else {
None
}
});
// Record the expected send time now (before sleep), so concurrent
// callers also see the updated time and don't race.
map.insert(chat_id.to_string(), now);
delay
};
if let Some(ms) = sleep_ms {
tokio::time::sleep(ms).await;
}
}
/// Update rate-limit timestamp (without waiting) — used after each chunk.
fn touch_rate_limit(chat_id: &str) {
LAST_SEND
.lock()
.unwrap()
.insert(chat_id.to_string(), tokio::time::Instant::now());
}
#[derive(Clone)]
pub struct WechatChannel {
name: String,
@ -313,6 +351,9 @@ impl Channel for WechatChannel {
let mut text_sent = false;
if !text.is_empty() {
// Rate limit: ensure minimum interval between messages to the same user
throttle(&msg.chat_id).await;
let chunks = split_text(&text, MAX_WECHAT_CHUNK_CHARS);
if chunks.len() > 1 {
tracing::info!(
@ -335,6 +376,8 @@ impl Channel for WechatChannel {
error
))
})?;
// Update rate-limit timestamp so the next message or chunk respects the gap
touch_rate_limit(&msg.chat_id);
tracing::info!(
channel = %self.name,
chat_id = %msg.chat_id,