diff --git a/src/bus/message.rs b/src/bus/message.rs index 02b38b8..3a55053 100644 --- a/src/bus/message.rs +++ b/src/bus/message.rs @@ -145,7 +145,10 @@ pub struct InboundMessage { pub content: String, pub timestamp: i64, pub media: Vec, + /// Channel-specific data used internally by the channel (not forwarded). pub metadata: HashMap, + /// Data forwarded from inbound to outbound (copied to OutboundMessage.metadata by gateway). + pub forwarded_metadata: HashMap, } impl InboundMessage { diff --git a/src/channels/feishu.rs b/src/channels/feishu.rs index 99119dc..5835dbe 100644 --- a/src/channels/feishu.rs +++ b/src/channels/feishu.rs @@ -1,10 +1,13 @@ +use std::collections::HashMap; use std::path::Path; use std::sync::Arc; +use std::time::{Duration, Instant}; + use async_trait::async_trait; -use tokio::sync::{broadcast, RwLock}; -use serde::Deserialize; use futures_util::{SinkExt, StreamExt}; use prost::{Message as ProstMessage, bytes::Bytes}; +use serde::Deserialize; +use tokio::sync::{broadcast, RwLock}; use crate::bus::{MessageBus, MediaItem, OutboundMessage}; use crate::channels::base::{Channel, ChannelError}; @@ -13,6 +16,18 @@ use crate::config::{FeishuChannelConfig, LLMProviderConfig}; const FEISHU_API_BASE: &str = "https://open.feishu.cn/open-apis"; const FEISHU_WS_BASE: &str = "https://open.feishu.cn"; +/// Heartbeat timeout for WS connection — must be larger than ping_interval (default 120 s). +/// If no binary frame (pong or event) is received within this window, reconnect. +const WS_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(300); +/// Refresh tenant token this many seconds before the announced expiry. +const TOKEN_REFRESH_SKEW: Duration = Duration::from_secs(120); +/// Default tenant token TTL when `expire`/`expires_in` is absent. +const DEFAULT_TOKEN_TTL: Duration = Duration::from_secs(7200); +/// Feishu API business code for expired/invalid tenant access token. +const INVALID_ACCESS_TOKEN_CODE: i32 = 99991663; +/// Dedup cache TTL (30 minutes). +const DEDUP_CACHE_TTL: Duration = Duration::from_secs(30 * 60); + // ───────────────────────────────────────────────────────────────────────────── // Protobuf types for Feishu WebSocket protocol (pbbp2.proto) // ───────────────────────────────────────────────────────────────────────────── @@ -125,6 +140,13 @@ struct LarkMessage { // ───────────────────────────────────────────────────────────────────────────── +/// Cached tenant token with proactive refresh metadata. +#[derive(Clone)] +struct CachedTenantToken { + value: String, + refresh_after: Instant, +} + #[derive(Clone)] pub struct FeishuChannel { config: FeishuChannelConfig, @@ -132,10 +154,15 @@ pub struct FeishuChannel { running: Arc>, shutdown_tx: Arc>>>, connected: Arc>, + /// Cached tenant access token with proactive refresh. + tenant_token: Arc>>, + /// Dedup cache: WS message_ids seen in the last ~30 min. + seen_message_ids: Arc>>, } /// Parsed message data from a Feishu frame struct ParsedMessage { + message_id: String, open_id: String, chat_id: String, content: String, @@ -153,6 +180,8 @@ impl FeishuChannel { running: Arc::new(RwLock::new(false)), shutdown_tx: Arc::new(RwLock::new(None)), connected: Arc::new(RwLock::new(false)), + tenant_token: Arc::new(RwLock::new(None)), + seen_message_ids: Arc::new(RwLock::new(HashMap::new())), }) } @@ -189,8 +218,42 @@ impl FeishuChannel { Ok((ep.url, client_config)) } - /// Get tenant access token - async fn get_tenant_token(&self) -> Result { + /// Get tenant access token (cached with proactive refresh). + async fn get_tenant_access_token(&self) -> Result { + // 1. Check cache + { + let cached = self.tenant_token.read().await; + if let Some(ref token) = *cached { + if Instant::now() < token.refresh_after { + return Ok(token.value.clone()); + } + } + } + + // 2. Fetch new token + let (token, ttl) = self.fetch_new_token().await?; + + // 3. Cache with proactive refresh time (提前 120 秒) + let refresh_after = Instant::now() + ttl.saturating_sub(TOKEN_REFRESH_SKEW); + { + let mut cached = self.tenant_token.write().await; + *cached = Some(CachedTenantToken { + value: token.clone(), + refresh_after, + }); + } + + Ok(token) + } + + /// Invalidate cached token (called when API reports expired tenant token). + async fn invalidate_token(&self) { + let mut cached = self.tenant_token.write().await; + *cached = None; + } + + /// Fetch a new tenant access token from Feishu. + async fn fetch_new_token(&self) -> Result<(String, Duration), ChannelError> { let resp = self.http_client .post(format!("{}/auth/v3/tenant_access_token/internal", FEISHU_API_BASE)) .header("Content-Type", "application/json") @@ -206,6 +269,7 @@ impl FeishuChannel { struct TokenResponse { code: i32, tenant_access_token: Option, + expire: Option, } let token_resp: TokenResponse = resp @@ -217,8 +281,30 @@ impl FeishuChannel { return Err(ChannelError::Other("Auth failed".to_string())); } - token_resp.tenant_access_token - .ok_or_else(|| ChannelError::Other("No token in response".to_string())) + let token = token_resp.tenant_access_token + .ok_or_else(|| ChannelError::Other("No token in response".to_string()))?; + + let ttl = token_resp.expire + .and_then(|v| u64::try_from(v).ok()) + .map(Duration::from_secs) + .unwrap_or(DEFAULT_TOKEN_TTL); + + Ok((token, ttl)) + } + + /// Check if message_id has been seen (dedup), and mark it as seen if not. + /// Returns true if the message was already processed. + /// Note: GC of stale entries is handled in the heartbeat timeout_check loop. + async fn is_message_seen(&self, message_id: &str) -> bool { + let mut seen = self.seen_message_ids.write().await; + let now = Instant::now(); + + if seen.contains_key(message_id) { + true + } else { + seen.insert(message_id.to_string(), now); + false + } } /// Download media and save locally, return (description, media_item) @@ -250,7 +336,7 @@ impl FeishuChannel { .and_then(|v| v.as_str()) .ok_or_else(|| ChannelError::Other("No image_key in message".to_string()))?; - let token = self.get_tenant_token().await?; + let token = self.get_tenant_access_token().await?; // Use message resource API for downloading message images let url = format!("{}/im/v1/messages/{}/resources/{}?type=image", FEISHU_API_BASE, message_id, image_key); @@ -309,7 +395,7 @@ impl FeishuChannel { .and_then(|v| v.as_str()) .ok_or_else(|| ChannelError::Other("No file_key in message".to_string()))?; - let token = self.get_tenant_token().await?; + let token = self.get_tenant_access_token().await?; // Use message resource API for downloading message files let url = format!("{}/im/v1/messages/{}/resources/{}?type=file", FEISHU_API_BASE, message_id, file_key); @@ -357,7 +443,7 @@ impl FeishuChannel { /// Upload image to Feishu and return the image_key async fn upload_image(&self, file_path: &str) -> Result { - let token = self.get_tenant_token().await?; + let token = self.get_tenant_access_token().await?; let mime = mime_guess::from_path(file_path) .first_or_octet_stream() @@ -417,7 +503,7 @@ impl FeishuChannel { /// Upload file to Feishu and return the file_key async fn upload_file(&self, file_path: &str) -> Result { - let token = self.get_tenant_token().await?; + let token = self.get_tenant_access_token().await?; let file_name = std::path::Path::new(file_path) .file_name() @@ -485,9 +571,100 @@ impl FeishuChannel { .ok_or_else(|| ChannelError::Other("No file_key in response".to_string())) } + /// Add a reaction emoji to a message and store the reaction_id for later removal. + /// Returns the reaction_id if successful, None otherwise. + async fn add_reaction(&self, message_id: &str) -> Result, ChannelError> { + let emoji = self.config.reaction_emoji.as_str(); + let token = self.get_tenant_access_token().await?; + + let resp = self.http_client + .post(format!("{}/im/v1/messages/{}/reactions", FEISHU_API_BASE, message_id)) + .header("Authorization", format!("Bearer {}", token)) + .json(&serde_json::json!({ + "reaction_type": { "emoji_type": emoji } + })) + .send() + .await + .map_err(|e| ChannelError::ConnectionError(format!("Add reaction HTTP error: {}", e)))?; + + #[derive(Deserialize)] + struct ReactionResp { + code: i32, + msg: Option, + data: Option, + } + #[derive(Deserialize)] + struct ReactionData { + reaction_id: Option, + } + + let result: ReactionResp = resp.json().await + .map_err(|e| ChannelError::Other(format!("Parse reaction response error: {}", e)))?; + + if result.code != 0 { + tracing::warn!( + "Failed to add reaction to message {}: code={} msg={}", + message_id, + result.code, + result.msg.as_deref().unwrap_or("unknown") + ); + return Ok(None); + } + + let reaction_id = result.data.and_then(|d| d.reaction_id); + Ok(reaction_id) + } + + /// Remove reaction using feishu metadata propagated through OutboundMessage. + /// Reads feishu.message_id and feishu.reaction_id from metadata. + async fn remove_reaction_from_metadata(&self, metadata: &std::collections::HashMap) { + let (message_id, reaction_id) = match ( + metadata.get("feishu.message_id"), + metadata.get("feishu.reaction_id"), + ) { + (Some(msg_id), Some(rid)) => (msg_id.clone(), rid.clone()), + _ => return, + }; + if let Err(e) = self.remove_reaction(&message_id, &reaction_id).await { + tracing::debug!(error = %e, message_id = %message_id, "Failed to remove reaction"); + } + } + + /// Remove a reaction emoji from a message. + async fn remove_reaction(&self, message_id: &str, reaction_id: &str) -> Result<(), ChannelError> { + let token = self.get_tenant_access_token().await?; + + let resp = self.http_client + .delete(format!("{}/im/v1/messages/{}/reactions/{}", FEISHU_API_BASE, message_id, reaction_id)) + .header("Authorization", format!("Bearer {}", token)) + .send() + .await + .map_err(|e| ChannelError::ConnectionError(format!("Remove reaction HTTP error: {}", e)))?; + + #[derive(Deserialize)] + struct ReactionResp { + code: i32, + msg: Option, + } + + let result: ReactionResp = resp.json().await + .map_err(|e| ChannelError::Other(format!("Parse remove reaction response error: {}", e)))?; + + if result.code != 0 { + tracing::debug!( + "Failed to remove reaction {} from message {}: code={} msg={}", + reaction_id, + message_id, + result.code, + result.msg.as_deref().unwrap_or("unknown") + ); + } + Ok(()) + } + /// Send a text message to Feishu chat (implements Channel trait) async fn send_message_to_feishu(&self, receive_id: &str, receive_id_type: &str, content: &str) -> Result<(), ChannelError> { - let token = self.get_tenant_token().await?; + let token = self.get_tenant_access_token().await?; // Feishu text messages have content limits (~64KB). // Truncate if content is too long to avoid API error 230001. @@ -582,6 +759,13 @@ impl FeishuChannel { let message_id = payload_data.message.message_id.clone(); + // Deduplication check + if self.is_message_seen(&message_id).await { + #[cfg(debug_assertions)] + tracing::debug!(message_id = %message_id, "Duplicate message, skipping"); + return Ok(None); + } + #[cfg(debug_assertions)] tracing::debug!(message_id = %message_id, "Received Feishu message"); @@ -604,6 +788,7 @@ impl FeishuChannel { } Ok(Some(ParsedMessage { + message_id, open_id, chat_id, content, @@ -689,16 +874,20 @@ impl FeishuChannel { let ping_interval = client_config.ping_interval.unwrap_or(120).max(10); let mut ping_interval_tok = tokio::time::interval(tokio::time::Duration::from_secs(ping_interval)); + let mut timeout_check = tokio::time::interval(tokio::time::Duration::from_secs(10)); let mut seq: u64 = 1; + let mut last_recv = Instant::now(); // Consume the immediate tick ping_interval_tok.tick().await; + timeout_check.tick().await; loop { tokio::select! { msg = read.next() => { match msg { Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(data))) => { + last_recv = Instant::now(); let bytes: Bytes = data; if let Ok(frame) = PbFrame::decode(bytes.as_ref()) { match self.handle_frame(&frame).await { @@ -708,6 +897,24 @@ impl FeishuChannel { tracing::error!(error = %e, "Failed to send ACK to Feishu"); } + // Add reaction emoji (await so we get the reaction_id for later removal) + let message_id = parsed.message_id.clone(); + let reaction_id = match self.add_reaction(&message_id).await { + Ok(Some(rid)) => Some(rid), + Ok(None) => None, + Err(e) => { + tracing::debug!(error = %e, message_id = %message_id, "Failed to add reaction"); + None + } + }; + + // forwarded_metadata is copied to OutboundMessage.metadata by the gateway. + let mut forwarded_metadata = std::collections::HashMap::new(); + forwarded_metadata.insert("feishu.message_id".to_string(), message_id.clone()); + if let Some(ref rid) = reaction_id { + forwarded_metadata.insert("feishu.reaction_id".to_string(), rid.clone()); + } + // Publish to bus asynchronously let channel = self.clone(); let bus = bus.clone(); @@ -726,6 +933,7 @@ impl FeishuChannel { .as_millis() as i64, media: parsed.media.map(|m| vec![m]).unwrap_or_default(), metadata: std::collections::HashMap::new(), + forwarded_metadata, }; if let Err(e) = channel.handle_and_publish(&bus, &msg).await { tracing::error!(error = %e, open_id = %parsed.open_id, chat_id = %parsed.chat_id, "Failed to publish Feishu message to bus"); @@ -743,6 +951,7 @@ impl FeishuChannel { } } Some(Ok(tokio_tungstenite::tungstenite::Message::Ping(data))) => { + last_recv = Instant::now(); let pong = PbFrame { seq_id: seq.wrapping_add(1), log_id: 0, @@ -756,6 +965,9 @@ impl FeishuChannel { }; let _ = write.send(tokio_tungstenite::tungstenite::Message::Binary(pong.encode_to_vec().into())).await; } + Some(Ok(tokio_tungstenite::tungstenite::Message::Pong(_))) => { + last_recv = Instant::now(); + } Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => { #[cfg(debug_assertions)] tracing::debug!("Feishu WebSocket closed"); @@ -786,6 +998,16 @@ impl FeishuChannel { break; } } + _ = timeout_check.tick() => { + if last_recv.elapsed() > WS_HEARTBEAT_TIMEOUT { + tracing::warn!("Feishu WebSocket heartbeat timeout, reconnecting"); + break; + } + // GC dedup cache: remove entries older than TTL (matches zeroclaw pattern) + let now = Instant::now(); + let mut seen = self.seen_message_ids.write().await; + seen.retain(|_, ts| now.duration_since(*ts) < DEDUP_CACHE_TTL); + } _ = shutdown_rx.recv() => { tracing::info!("Feishu channel shutdown signal received"); break; @@ -906,11 +1128,14 @@ impl Channel for FeishuChannel { // If no media, send text only if msg.media.is_empty() { - return self.send_message_to_feishu(receive_id, receive_id_type, &msg.content).await; + let result = self.send_message_to_feishu(receive_id, receive_id_type, &msg.content).await; + // Remove pending reaction after sending (using metadata propagated from inbound) + self.remove_reaction_from_metadata(&msg.metadata).await; + return result; } // Handle multimodal message - send with media - let token = self.get_tenant_token().await?; + let token = self.get_tenant_access_token().await?; // Build content with media references let mut content_parts = Vec::new(); @@ -967,7 +1192,10 @@ impl Channel for FeishuChannel { // If no content parts after processing, just send empty text if content_parts.is_empty() { - return self.send_message_to_feishu(receive_id, receive_id_type, "").await; + let result = self.send_message_to_feishu(receive_id, receive_id_type, "").await; + // Remove pending reaction after sending (using metadata propagated from inbound) + self.remove_reaction_from_metadata(&msg.metadata).await; + return result; } // Determine message type @@ -1008,6 +1236,9 @@ impl Channel for FeishuChannel { return Err(ChannelError::Other(format!("Send multimodal message failed: code={} msg={}", send_resp.code, send_resp.msg))); } + // Remove pending reaction after successfully sending + self.remove_reaction_from_metadata(&msg.metadata).await; + Ok(()) } } diff --git a/src/config/mod.rs b/src/config/mod.rs index b462304..a44e64e 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -30,6 +30,9 @@ pub struct FeishuChannelConfig { pub agent: String, #[serde(default = "default_media_dir")] pub media_dir: String, + /// Emoji type for message reactions (e.g. "THUMBSUP", "OK", "EYES"). + #[serde(default = "default_reaction_emoji")] + pub reaction_emoji: String, } fn default_allow_from() -> Vec { @@ -41,6 +44,10 @@ fn default_media_dir() -> String { home.join(".picobot/media/feishu").to_string_lossy().to_string() } +fn default_reaction_emoji() -> String { + "THUMBSUP".to_string() +} + #[derive(Debug, Clone, Deserialize, Serialize)] pub struct ProviderConfig { #[serde(rename = "type")] diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 611919b..5ab521e 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -79,13 +79,16 @@ impl GatewayState { inbound.media, ).await { Ok(response_content) => { + // Forward channel-specific metadata from inbound to outbound. + // This allows channels to propagate context (e.g. feishu message_id for reaction cleanup) + // without gateway needing channel-specific code. let outbound = crate::bus::OutboundMessage { - channel: inbound.channel, - chat_id: inbound.chat_id, + channel: inbound.channel.clone(), + chat_id: inbound.chat_id.clone(), content: response_content, reply_to: None, media: vec![], - metadata: std::collections::HashMap::new(), + metadata: inbound.forwarded_metadata, }; if let Err(e) = bus_for_inbound.publish_outbound(outbound).await { tracing::error!(error = %e, "Failed to publish outbound");