use std::collections::HashMap; use std::path::Path; use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; use futures_util::{SinkExt, StreamExt}; use prost::{Message as ProstMessage, bytes::Bytes}; use serde::Deserialize; use tokio::sync::{broadcast, RwLock}; use crate::bus::{MessageBus, MediaItem, OutboundMessage}; use crate::channels::base::{Channel, ChannelError}; use crate::config::{FeishuChannelConfig, LLMProviderConfig}; const FEISHU_API_BASE: &str = "https://open.feishu.cn/open-apis"; const FEISHU_WS_BASE: &str = "https://open.feishu.cn"; /// Heartbeat timeout for WS connection — must be larger than ping_interval (default 120 s). /// If no binary frame (pong or event) is received within this window, reconnect. const WS_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(300); /// Refresh tenant token this many seconds before the announced expiry. const TOKEN_REFRESH_SKEW: Duration = Duration::from_secs(120); /// Default tenant token TTL when `expire`/`expires_in` is absent. const DEFAULT_TOKEN_TTL: Duration = Duration::from_secs(7200); /// Feishu API business code for expired/invalid tenant access token. const INVALID_ACCESS_TOKEN_CODE: i32 = 99991663; /// Dedup cache TTL (30 minutes). const DEDUP_CACHE_TTL: Duration = Duration::from_secs(30 * 60); // ───────────────────────────────────────────────────────────────────────────── // Protobuf types for Feishu WebSocket protocol (pbbp2.proto) // ───────────────────────────────────────────────────────────────────────────── #[derive(Clone, PartialEq, prost::Message)] struct PbHeader { #[prost(string, tag = "1")] pub key: String, #[prost(string, tag = "2")] pub value: String, } /// Feishu WS frame. /// method=0 → CONTROL (ping/pong) method=1 → DATA (events) #[derive(Clone, PartialEq, prost::Message)] struct PbFrame { #[prost(uint64, tag = "1")] pub seq_id: u64, #[prost(uint64, tag = "2")] pub log_id: u64, #[prost(int32, tag = "3")] pub service: i32, #[prost(int32, tag = "4")] pub method: i32, #[prost(message, repeated, tag = "5")] pub headers: Vec, #[prost(bytes = "vec", optional, tag = "8")] pub payload: Option>, } /// POST /callback/ws/endpoint response #[derive(Deserialize)] struct WsEndpointResp { code: i32, msg: Option, data: Option, } #[derive(Deserialize)] struct WsEndpoint { #[serde(rename = "URL")] url: String, #[serde(default)] client_config: Option, } #[derive(Deserialize, Default)] struct WsClientConfig { #[serde(rename = "PingInterval")] ping_interval: Option, } /// Lark event envelope (method=1 / type=event payload) #[derive(Deserialize)] struct LarkEvent { header: LarkEventHeader, event: serde_json::Value, } #[derive(Deserialize)] struct LarkEventHeader { event_type: String, #[allow(dead_code)] event_id: String, } impl std::fmt::Debug for LarkEventHeader { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("LarkEventHeader") .field("event_type", &self.event_type) .field("event_id", &self.event_id) .finish() } } #[derive(Deserialize)] struct MsgReceivePayload { sender: LarkSender, message: LarkMessage, } #[derive(Deserialize)] struct LarkSender { sender_id: LarkSenderId, #[serde(default)] sender_type: String, } #[derive(Deserialize, Default)] struct LarkSenderId { open_id: Option, } #[derive(Deserialize)] #[allow(dead_code)] struct LarkMessage { message_id: String, chat_id: String, chat_type: String, message_type: String, #[serde(default)] content: String, #[serde(default)] mentions: Vec, #[serde(default)] root_id: Option, #[serde(default)] parent_id: Option, } // ───────────────────────────────────────────────────────────────────────────── /// Cached tenant token with proactive refresh metadata. #[derive(Clone)] struct CachedTenantToken { value: String, refresh_after: Instant, } #[derive(Clone)] pub struct FeishuChannel { config: FeishuChannelConfig, http_client: reqwest::Client, running: Arc>, shutdown_tx: Arc>>>, connected: Arc>, /// Cached tenant access token with proactive refresh. tenant_token: Arc>>, /// Dedup cache: WS message_ids seen in the last ~30 min. seen_message_ids: Arc>>, } /// Parsed message data from a Feishu frame struct ParsedMessage { message_id: String, open_id: String, chat_id: String, content: String, media: Option, } impl FeishuChannel { pub fn new( config: FeishuChannelConfig, _provider_config: LLMProviderConfig, ) -> Result { Ok(Self { config, http_client: reqwest::Client::new(), running: Arc::new(RwLock::new(false)), shutdown_tx: Arc::new(RwLock::new(None)), connected: Arc::new(RwLock::new(false)), tenant_token: Arc::new(RwLock::new(None)), seen_message_ids: Arc::new(RwLock::new(HashMap::new())), }) } /// Get WebSocket endpoint URL from Feishu API async fn get_ws_endpoint(&self, client: &reqwest::Client) -> Result<(String, WsClientConfig), ChannelError> { let resp = client .post(format!("{}/callback/ws/endpoint", FEISHU_WS_BASE)) .header("locale", "zh") .json(&serde_json::json!({ "AppID": self.config.app_id, "AppSecret": self.config.app_secret, })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("HTTP error: {}", e)))?; let endpoint_resp: WsEndpointResp = resp .json() .await .map_err(|e| ChannelError::ConnectionError(format!("Failed to parse endpoint response: {}", e)))?; if endpoint_resp.code != 0 { return Err(ChannelError::ConnectionError(format!( "WS endpoint failed: code={} msg={}", endpoint_resp.code, endpoint_resp.msg.as_deref().unwrap_or("unknown") ))); } let ep = endpoint_resp.data .ok_or_else(|| ChannelError::ConnectionError("Empty endpoint data".to_string()))?; let client_config = ep.client_config.unwrap_or_default(); Ok((ep.url, client_config)) } /// Get tenant access token (cached with proactive refresh). async fn get_tenant_access_token(&self) -> Result { // 1. Check cache { let cached = self.tenant_token.read().await; if let Some(ref token) = *cached { if Instant::now() < token.refresh_after { return Ok(token.value.clone()); } } } // 2. Fetch new token let (token, ttl) = self.fetch_new_token().await?; // 3. Cache with proactive refresh time (提前 120 秒) let refresh_after = Instant::now() + ttl.saturating_sub(TOKEN_REFRESH_SKEW); { let mut cached = self.tenant_token.write().await; *cached = Some(CachedTenantToken { value: token.clone(), refresh_after, }); } Ok(token) } /// Invalidate cached token (called when API reports expired tenant token). async fn invalidate_token(&self) { let mut cached = self.tenant_token.write().await; *cached = None; } /// Fetch a new tenant access token from Feishu. async fn fetch_new_token(&self) -> Result<(String, Duration), ChannelError> { let resp = self.http_client .post(format!("{}/auth/v3/tenant_access_token/internal", FEISHU_API_BASE)) .header("Content-Type", "application/json") .json(&serde_json::json!({ "app_id": self.config.app_id, "app_secret": self.config.app_secret, })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("HTTP error: {}", e)))?; #[derive(Deserialize)] struct TokenResponse { code: i32, tenant_access_token: Option, expire: Option, } let token_resp: TokenResponse = resp .json() .await .map_err(|e| ChannelError::Other(format!("Failed to parse token response: {}", e)))?; if token_resp.code != 0 { return Err(ChannelError::Other("Auth failed".to_string())); } let token = token_resp.tenant_access_token .ok_or_else(|| ChannelError::Other("No token in response".to_string()))?; let ttl = token_resp.expire .and_then(|v| u64::try_from(v).ok()) .map(Duration::from_secs) .unwrap_or(DEFAULT_TOKEN_TTL); Ok((token, ttl)) } /// Check if message_id has been seen (dedup), and mark it as seen if not. /// Returns true if the message was already processed. /// Note: GC of stale entries is handled in the heartbeat timeout_check loop. async fn is_message_seen(&self, message_id: &str) -> bool { let mut seen = self.seen_message_ids.write().await; let now = Instant::now(); if seen.contains_key(message_id) { true } else { seen.insert(message_id.to_string(), now); false } } /// Download media and save locally, return (description, media_item) async fn download_media( &self, msg_type: &str, content_json: &serde_json::Value, message_id: &str, ) -> Result<(String, Option), ChannelError> { let media_dir = Path::new(&self.config.media_dir); tokio::fs::create_dir_all(media_dir).await .map_err(|e| ChannelError::Other(format!("Failed to create media dir: {}", e)))?; match msg_type { "image" => self.download_image(content_json, message_id, media_dir).await, "audio" | "file" | "media" => self.download_file(content_json, message_id, media_dir, msg_type).await, _ => Ok((format!("[unsupported media type: {}]", msg_type), None)), } } /// Download image from Feishu async fn download_image( &self, content_json: &serde_json::Value, message_id: &str, media_dir: &Path, ) -> Result<(String, Option), ChannelError> { let image_key = content_json.get("image_key") .and_then(|v| v.as_str()) .ok_or_else(|| ChannelError::Other("No image_key in message".to_string()))?; let token = self.get_tenant_access_token().await?; // Use message resource API for downloading message images let url = format!("{}/im/v1/messages/{}/resources/{}?type=image", FEISHU_API_BASE, message_id, image_key); #[cfg(debug_assertions)] tracing::debug!(url = %url, image_key = %image_key, message_id = %message_id, "Downloading image from Feishu via message resource API"); let resp = self.http_client .get(&url) .header("Authorization", format!("Bearer {}", token)) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Download image HTTP error: {}", e)))?; let status = resp.status(); #[cfg(debug_assertions)] tracing::debug!(status = %status, "Image download response status"); if !status.is_success() { let error_text = resp.text().await.unwrap_or_default(); return Err(ChannelError::Other(format!("Image download failed {}: {}", status, error_text))); } let data = resp.bytes().await .map_err(|e| ChannelError::Other(format!("Failed to read image data: {}", e)))? .to_vec(); #[cfg(debug_assertions)] tracing::debug!(data_len = %data.len(), "Downloaded image data"); let filename = format!("{}_{}.jpg", message_id, &image_key[..8.min(image_key.len())]); let file_path = media_dir.join(&filename); tokio::fs::write(&file_path, &data).await .map_err(|e| ChannelError::Other(format!("Failed to write image: {}", e)))?; let media_item = MediaItem::new( file_path.to_string_lossy().to_string(), "image", ); tracing::info!(message_id = %message_id, filename = %filename, "Downloaded image"); Ok((format!("[image: {}]", filename), Some(media_item))) } /// Download file/audio from Feishu async fn download_file( &self, content_json: &serde_json::Value, message_id: &str, media_dir: &Path, file_type: &str, ) -> Result<(String, Option), ChannelError> { let file_key = content_json.get("file_key") .and_then(|v| v.as_str()) .ok_or_else(|| ChannelError::Other("No file_key in message".to_string()))?; let token = self.get_tenant_access_token().await?; // Use message resource API for downloading message files let url = format!("{}/im/v1/messages/{}/resources/{}?type=file", FEISHU_API_BASE, message_id, file_key); #[cfg(debug_assertions)] tracing::debug!(url = %url, file_key = %file_key, message_id = %message_id, "Downloading file from Feishu via message resource API"); let resp = self.http_client .get(&url) .header("Authorization", format!("Bearer {}", token)) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Download file HTTP error: {}", e)))?; let status = resp.status(); if !status.is_success() { let error_text = resp.text().await.unwrap_or_default(); return Err(ChannelError::Other(format!("File download failed {}: {}", status, error_text))); } let data = resp.bytes().await .map_err(|e| ChannelError::Other(format!("Failed to read file data: {}", e)))? .to_vec(); let extension = match file_type { "audio" => "mp3", "video" => "mp4", _ => "bin", }; let filename = format!("{}_{}.{}", message_id, &file_key[..8.min(file_key.len())], extension); let file_path = media_dir.join(&filename); tokio::fs::write(&file_path, &data).await .map_err(|e| ChannelError::Other(format!("Failed to write file: {}", e)))?; let media_item = MediaItem::new( file_path.to_string_lossy().to_string(), file_type, ); tracing::info!(message_id = %message_id, filename = %filename, file_type = %file_type, "Downloaded file"); Ok((format!("[{}: {}]", file_type, filename), Some(media_item))) } /// Upload image to Feishu and return the image_key async fn upload_image(&self, file_path: &str) -> Result { let token = self.get_tenant_access_token().await?; let mime = mime_guess::from_path(file_path) .first_or_octet_stream() .to_string(); let file_name = std::path::Path::new(file_path) .file_name() .and_then(|n| n.to_str()) .unwrap_or("image.jpg"); let file_data = tokio::fs::read(file_path).await .map_err(|e| ChannelError::Other(format!("Failed to read file: {}", e)))?; let part = reqwest::multipart::Part::bytes(file_data) .file_name(file_name.to_string()) .mime_str(&mime) .map_err(|e| ChannelError::Other(format!("Invalid mime type: {}", e)))?; let form = reqwest::multipart::Form::new() .text("image_type", "message".to_string()) .part("image", part); let resp = self.http_client .post(format!("{}/im/v1/images/upload", FEISHU_API_BASE)) .header("Authorization", format!("Bearer {}", token)) .multipart(form) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Upload image HTTP error: {}", e)))?; #[derive(Deserialize)] struct UploadResp { code: i32, msg: Option, data: Option, } #[derive(Deserialize)] struct UploadData { image_key: String, } let result: UploadResp = resp.json().await .map_err(|e| ChannelError::Other(format!("Parse upload response error: {}", e)))?; if result.code != 0 { return Err(ChannelError::Other(format!( "Upload image failed: code={} msg={}", result.code, result.msg.as_deref().unwrap_or("unknown") ))); } result.data .map(|d| d.image_key) .ok_or_else(|| ChannelError::Other("No image_key in response".to_string())) } /// Upload file to Feishu and return the file_key async fn upload_file(&self, file_path: &str) -> Result { let token = self.get_tenant_access_token().await?; let file_name = std::path::Path::new(file_path) .file_name() .and_then(|n| n.to_str()) .unwrap_or("file.bin"); let extension = std::path::Path::new(file_path) .extension() .and_then(|e| e.to_str()) .unwrap_or("") .to_lowercase(); let file_type = match extension.as_str() { "mp3" | "m4a" | "wav" | "ogg" => "audio", "mp4" | "mov" | "avi" | "mkv" => "video", "pdf" | "doc" | "docx" | "xls" | "xlsx" => "doc", _ => "file", }; let file_data = tokio::fs::read(file_path).await .map_err(|e| ChannelError::Other(format!("Failed to read file: {}", e)))?; let part = reqwest::multipart::Part::bytes(file_data) .file_name(file_name.to_string()) .mime_str("application/octet-stream") .map_err(|e| ChannelError::Other(format!("Invalid mime type: {}", e)))?; let form = reqwest::multipart::Form::new() .text("file_type", file_type.to_string()) .text("file_name", file_name.to_string()) .part("file", part); let resp = self.http_client .post(format!("{}/im/v1/files", FEISHU_API_BASE)) .header("Authorization", format!("Bearer {}", token)) .multipart(form) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Upload file HTTP error: {}", e)))?; #[derive(Deserialize)] struct UploadResp { code: i32, msg: Option, data: Option, } #[derive(Deserialize)] struct UploadData { file_key: String, } let result: UploadResp = resp.json().await .map_err(|e| ChannelError::Other(format!("Parse upload response error: {}", e)))?; if result.code != 0 { return Err(ChannelError::Other(format!( "Upload file failed: code={} msg={}", result.code, result.msg.as_deref().unwrap_or("unknown") ))); } result.data .map(|d| d.file_key) .ok_or_else(|| ChannelError::Other("No file_key in response".to_string())) } /// Add a reaction emoji to a message and store the reaction_id for later removal. /// Returns the reaction_id if successful, None otherwise. async fn add_reaction(&self, message_id: &str) -> Result, ChannelError> { let emoji = self.config.reaction_emoji.as_str(); let token = self.get_tenant_access_token().await?; let resp = self.http_client .post(format!("{}/im/v1/messages/{}/reactions", FEISHU_API_BASE, message_id)) .header("Authorization", format!("Bearer {}", token)) .json(&serde_json::json!({ "reaction_type": { "emoji_type": emoji } })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Add reaction HTTP error: {}", e)))?; #[derive(Deserialize)] struct ReactionResp { code: i32, msg: Option, data: Option, } #[derive(Deserialize)] struct ReactionData { reaction_id: Option, } let result: ReactionResp = resp.json().await .map_err(|e| ChannelError::Other(format!("Parse reaction response error: {}", e)))?; if result.code != 0 { tracing::warn!( "Failed to add reaction to message {}: code={} msg={}", message_id, result.code, result.msg.as_deref().unwrap_or("unknown") ); return Ok(None); } let reaction_id = result.data.and_then(|d| d.reaction_id); Ok(reaction_id) } /// Remove reaction using feishu metadata propagated through OutboundMessage. /// Reads feishu.message_id and feishu.reaction_id from metadata. async fn remove_reaction_from_metadata(&self, metadata: &std::collections::HashMap) { let (message_id, reaction_id) = match ( metadata.get("feishu.message_id"), metadata.get("feishu.reaction_id"), ) { (Some(msg_id), Some(rid)) => (msg_id.clone(), rid.clone()), _ => return, }; if let Err(e) = self.remove_reaction(&message_id, &reaction_id).await { tracing::debug!(error = %e, message_id = %message_id, "Failed to remove reaction"); } } /// Remove a reaction emoji from a message. async fn remove_reaction(&self, message_id: &str, reaction_id: &str) -> Result<(), ChannelError> { let token = self.get_tenant_access_token().await?; let resp = self.http_client .delete(format!("{}/im/v1/messages/{}/reactions/{}", FEISHU_API_BASE, message_id, reaction_id)) .header("Authorization", format!("Bearer {}", token)) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Remove reaction HTTP error: {}", e)))?; #[derive(Deserialize)] struct ReactionResp { code: i32, msg: Option, } let result: ReactionResp = resp.json().await .map_err(|e| ChannelError::Other(format!("Parse remove reaction response error: {}", e)))?; if result.code != 0 { tracing::debug!( "Failed to remove reaction {} from message {}: code={} msg={}", reaction_id, message_id, result.code, result.msg.as_deref().unwrap_or("unknown") ); } Ok(()) } /// Send a text message to Feishu chat (implements Channel trait) async fn send_message_to_feishu(&self, receive_id: &str, receive_id_type: &str, content: &str) -> Result<(), ChannelError> { let token = self.get_tenant_access_token().await?; // Feishu text messages have content limits (~64KB). // Truncate if content is too long to avoid API error 230001. const MAX_TEXT_LENGTH: usize = 60_000; let truncated = if content.len() > MAX_TEXT_LENGTH { format!("{}...\n\n[Content truncated due to length limit]", &content[..MAX_TEXT_LENGTH]) } else { content.to_string() }; let text_content = serde_json::json!({ "text": truncated }).to_string(); let resp = self.http_client .post(format!("{}/im/v1/messages?receive_id_type={}", FEISHU_API_BASE, receive_id_type)) .header("Content-Type", "application/json") .header("Authorization", format!("Bearer {}", token)) .json(&serde_json::json!({ "receive_id": receive_id, "msg_type": "text", "content": text_content })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Send message HTTP error: {}", e)))?; #[derive(Deserialize)] struct SendResp { code: i32, msg: String, } let send_resp: SendResp = resp .json() .await .map_err(|e| ChannelError::Other(format!("Parse send response error: {}", e)))?; if send_resp.code != 0 { return Err(ChannelError::Other(format!("Send message failed: code={} msg={}", send_resp.code, send_resp.msg))); } Ok(()) } /// Extract service_id from WebSocket URL query params fn extract_service_id(url: &str) -> i32 { url.split('?') .nth(1) .and_then(|qs| { qs.split('&') .find(|kv| kv.starts_with("service_id=")) .and_then(|kv| kv.split('=').nth(1)) .and_then(|v| v.parse::().ok()) }) .unwrap_or(0) } /// Handle incoming binary PbFrame - returns Some(ParsedMessage) if we need to ack async fn handle_frame(&self, frame: &PbFrame) -> Result, ChannelError> { // method 0 = CONTROL (ping/pong) if frame.method == 0 { return Ok(None); } // method 1 = DATA (events) if frame.method != 1 { return Ok(None); } let payload = frame.payload.as_deref() .ok_or_else(|| ChannelError::Other("No payload in frame".to_string()))?; #[cfg(debug_assertions)] tracing::debug!(payload_len = %payload.len(), "Received frame payload"); let event: LarkEvent = serde_json::from_slice(payload) .map_err(|e| ChannelError::Other(format!("Parse event error: {}", e)))?; let event_type = event.header.event_type.as_str(); #[cfg(debug_assertions)] tracing::debug!(event_type = %event_type, "Received event type"); if event_type != "im.message.receive_v1" { return Ok(None); } let payload_data: MsgReceivePayload = serde_json::from_value(event.event.clone()) .map_err(|e| ChannelError::Other(format!("Parse payload error: {}", e)))?; // Skip bot messages if payload_data.sender.sender_type == "bot" { return Ok(None); } let message_id = payload_data.message.message_id.clone(); // Deduplication check if self.is_message_seen(&message_id).await { #[cfg(debug_assertions)] tracing::debug!(message_id = %message_id, "Duplicate message, skipping"); return Ok(None); } #[cfg(debug_assertions)] tracing::debug!(message_id = %message_id, "Received Feishu message"); let open_id = payload_data.sender.sender_id.open_id .ok_or_else(|| ChannelError::Other("No open_id".to_string()))?; let msg = payload_data.message; let chat_id = msg.chat_id.clone(); let msg_type = msg.message_type.as_str(); let raw_content = msg.content.clone(); #[cfg(debug_assertions)] tracing::debug!(msg_type = %msg_type, chat_id = %chat_id, open_id = %open_id, "Parsing message content"); let (content, media) = self.parse_and_download_message(msg_type, &raw_content, &message_id).await?; #[cfg(debug_assertions)] if let Some(ref m) = media { tracing::debug!(media_type = %m.media_type, media_path = %m.path, "Media downloaded successfully"); } Ok(Some(ParsedMessage { message_id, open_id, chat_id, content, media, })) } /// Parse message content and download media if needed async fn parse_and_download_message( &self, msg_type: &str, content: &str, message_id: &str, ) -> Result<(String, Option), ChannelError> { match msg_type { "text" => { let text = if let Ok(parsed) = serde_json::from_str::(content) { parsed.get("text").and_then(|v| v.as_str()).unwrap_or(content).to_string() } else { content.to_string() }; Ok((text, None)) } "post" => { let text = parse_post_content(content); Ok((text, None)) } "image" | "audio" | "file" | "media" => { if let Ok(content_json) = serde_json::from_str::(content) { self.download_media(msg_type, &content_json, message_id).await } else { Ok((format!("[{}: content unavailable]", msg_type), None)) } } _ => Ok((content.to_string(), None)), } } /// Send acknowledgment for a message async fn send_ack(frame: &PbFrame, write: &mut futures_util::stream::SplitSink>, tokio_tungstenite::tungstenite::Message>) -> Result<(), ChannelError> { let mut ack = frame.clone(); ack.payload = Some(br#"{"code":200,"headers":{},"data":[]}"#.to_vec()); ack.headers.push(PbHeader { key: "biz_rt".into(), value: "0".into(), }); write.send(tokio_tungstenite::tungstenite::Message::Binary(ack.encode_to_vec().into())) .await .map_err(|e| ChannelError::Other(format!("Failed to send ack: {}", e)))?; Ok(()) } async fn run_ws_loop(&self, bus: Arc, mut shutdown_rx: broadcast::Receiver<()>) -> Result<(), ChannelError> { let (wss_url, client_config) = self.get_ws_endpoint(&self.http_client).await?; let service_id = Self::extract_service_id(&wss_url); tracing::info!(url = %wss_url, "Connecting to Feishu WebSocket"); let (ws_stream, _) = tokio_tungstenite::connect_async(&wss_url) .await .map_err(|e| ChannelError::ConnectionError(format!("WebSocket connection failed: {}", e)))?; *self.connected.write().await = true; tracing::info!("Feishu WebSocket connected"); let (mut write, mut read) = ws_stream.split(); // Send initial ping let ping_frame = PbFrame { seq_id: 1, log_id: 0, service: service_id, method: 0, headers: vec![PbHeader { key: "type".into(), value: "ping".into(), }], payload: None, }; write.send(tokio_tungstenite::tungstenite::Message::Binary(ping_frame.encode_to_vec().into())) .await .map_err(|e| ChannelError::ConnectionError(format!("Failed to send initial ping: {}", e)))?; let ping_interval = client_config.ping_interval.unwrap_or(120).max(10); let mut ping_interval_tok = tokio::time::interval(tokio::time::Duration::from_secs(ping_interval)); let mut timeout_check = tokio::time::interval(tokio::time::Duration::from_secs(10)); let mut seq: u64 = 1; let mut last_recv = Instant::now(); // Consume the immediate tick ping_interval_tok.tick().await; timeout_check.tick().await; loop { tokio::select! { msg = read.next() => { match msg { Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(data))) => { last_recv = Instant::now(); let bytes: Bytes = data; if let Ok(frame) = PbFrame::decode(bytes.as_ref()) { match self.handle_frame(&frame).await { Ok(Some(parsed)) => { // Send ACK immediately (Feishu requires within 3 s) if let Err(e) = Self::send_ack(&frame, &mut write).await { tracing::error!(error = %e, "Failed to send ACK to Feishu"); } // Add reaction emoji (await so we get the reaction_id for later removal) let message_id = parsed.message_id.clone(); let reaction_id = match self.add_reaction(&message_id).await { Ok(Some(rid)) => Some(rid), Ok(None) => None, Err(e) => { tracing::debug!(error = %e, message_id = %message_id, "Failed to add reaction"); None } }; // forwarded_metadata is copied to OutboundMessage.metadata by the gateway. let mut forwarded_metadata = std::collections::HashMap::new(); forwarded_metadata.insert("feishu.message_id".to_string(), message_id.clone()); if let Some(ref rid) = reaction_id { forwarded_metadata.insert("feishu.reaction_id".to_string(), rid.clone()); } // Publish to bus asynchronously let channel = self.clone(); let bus = bus.clone(); tokio::spawn(async move { let media_count = if parsed.media.is_some() { 1 } else { 0 }; #[cfg(debug_assertions)] tracing::debug!(open_id = %parsed.open_id, chat_id = %parsed.chat_id, content_len = %parsed.content.len(), media_count = %media_count, "Publishing message to bus"); let msg = crate::bus::InboundMessage { channel: "feishu".to_string(), sender_id: parsed.open_id.clone(), chat_id: parsed.chat_id.clone(), content: parsed.content.clone(), timestamp: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_millis() as i64, media: parsed.media.map(|m| vec![m]).unwrap_or_default(), metadata: std::collections::HashMap::new(), forwarded_metadata, }; if let Err(e) = channel.handle_and_publish(&bus, &msg).await { tracing::error!(error = %e, open_id = %parsed.open_id, chat_id = %parsed.chat_id, "Failed to publish Feishu message to bus"); } else { #[cfg(debug_assertions)] tracing::debug!(open_id = %parsed.open_id, chat_id = %parsed.chat_id, "Message published to bus successfully"); } }); } Ok(None) => {} Err(e) => { tracing::warn!(error = %e, "Failed to parse Feishu frame"); } } } } Some(Ok(tokio_tungstenite::tungstenite::Message::Ping(data))) => { last_recv = Instant::now(); let pong = PbFrame { seq_id: seq.wrapping_add(1), log_id: 0, service: service_id, method: 0, headers: vec![PbHeader { key: "type".into(), value: "pong".into(), }], payload: Some(data.to_vec()), }; let _ = write.send(tokio_tungstenite::tungstenite::Message::Binary(pong.encode_to_vec().into())).await; } Some(Ok(tokio_tungstenite::tungstenite::Message::Pong(_))) => { last_recv = Instant::now(); } Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => { #[cfg(debug_assertions)] tracing::debug!("Feishu WebSocket closed"); break; } Some(Err(e)) => { tracing::warn!(error = %e, "Feishu WebSocket error"); break; } _ => {} } } _ = ping_interval_tok.tick() => { seq = seq.wrapping_add(1); let ping = PbFrame { seq_id: seq, log_id: 0, service: service_id, method: 0, headers: vec![PbHeader { key: "type".into(), value: "ping".into(), }], payload: None, }; if write.send(tokio_tungstenite::tungstenite::Message::Binary(ping.encode_to_vec().into())).await.is_err() { tracing::warn!("Feishu ping failed, reconnecting"); break; } } _ = timeout_check.tick() => { if last_recv.elapsed() > WS_HEARTBEAT_TIMEOUT { tracing::warn!("Feishu WebSocket heartbeat timeout, reconnecting"); break; } // GC dedup cache: remove entries older than TTL (matches zeroclaw pattern) let now = Instant::now(); let mut seen = self.seen_message_ids.write().await; seen.retain(|_, ts| now.duration_since(*ts) < DEDUP_CACHE_TTL); } _ = shutdown_rx.recv() => { tracing::info!("Feishu channel shutdown signal received"); break; } } } *self.connected.write().await = false; Ok(()) } } fn parse_post_content(content: &str) -> String { if let Ok(parsed) = serde_json::from_str::(content) { let mut texts = vec![]; if let Some(post) = parsed.get("post") { if let Some(content_arr) = post.get("content") { if let Some(arr) = content_arr.as_array() { for item in arr { if let Some(arr2) = item.as_array() { for inner in arr2 { if let Some(text) = inner.get("text").and_then(|v| v.as_str()) { texts.push(text.to_string()); } } } } } } } if texts.is_empty() { content.to_string() } else { texts.join("") } } else { content.to_string() } } #[async_trait] impl Channel for FeishuChannel { fn name(&self) -> &str { "feishu" } async fn start(&self, bus: Arc) -> Result<(), ChannelError> { if self.config.app_id.is_empty() || self.config.app_secret.is_empty() { return Err(ChannelError::ConfigError( "Feishu app_id or app_secret is not configured".to_string() )); } *self.running.write().await = true; let (shutdown_tx, _) = broadcast::channel(1); *self.shutdown_tx.write().await = Some(shutdown_tx.clone()); let channel = self.clone(); let bus = bus.clone(); tokio::spawn(async move { let mut consecutive_failures = 0; let max_failures = 3; loop { if !*channel.running.read().await { break; } let shutdown_rx = shutdown_tx.subscribe(); match channel.run_ws_loop(bus.clone(), shutdown_rx).await { Ok(_) => { tracing::info!("Feishu WebSocket disconnected"); } Err(e) => { consecutive_failures += 1; tracing::error!(attempt = consecutive_failures, error = %e, "Feishu WebSocket error"); if consecutive_failures >= max_failures { tracing::error!("Feishu channel: max failures reached, stopping"); break; } } } if !*channel.running.read().await { break; } tracing::info!("Feishu channel retrying in 5s..."); tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } *channel.running.write().await = false; tracing::info!("Feishu channel stopped"); }); tracing::info!("Feishu channel started"); Ok(()) } async fn stop(&self) -> Result<(), ChannelError> { *self.running.write().await = false; if let Some(tx) = self.shutdown_tx.write().await.take() { let _ = tx.send(()); } Ok(()) } fn is_running(&self) -> bool { self.running.try_read().map(|r| *r).unwrap_or(false) } async fn send(&self, msg: OutboundMessage) -> Result<(), ChannelError> { let receive_id = if msg.chat_id.starts_with("oc_") { &msg.chat_id } else { &msg.reply_to.as_ref().unwrap_or(&msg.chat_id) }; let receive_id_type = if msg.chat_id.starts_with("oc_") { "chat_id" } else { "open_id" }; // If no media, send text only if msg.media.is_empty() { let result = self.send_message_to_feishu(receive_id, receive_id_type, &msg.content).await; // Remove pending reaction after sending (using metadata propagated from inbound) self.remove_reaction_from_metadata(&msg.metadata).await; return result; } // Handle multimodal message - send with media let token = self.get_tenant_access_token().await?; // Build content with media references let mut content_parts = Vec::new(); // Add text content if present (truncate if too long for Feishu) if !msg.content.is_empty() { const MAX_TEXT_LENGTH: usize = 60_000; let truncated_text = if msg.content.len() > MAX_TEXT_LENGTH { format!("{}...\n\n[Content truncated due to length limit]", &msg.content[..MAX_TEXT_LENGTH]) } else { msg.content.clone() }; content_parts.push(serde_json::json!({ "tag": "text", "text": truncated_text })); } // Upload and add media for media_item in &msg.media { let path = &media_item.path; match media_item.media_type.as_str() { "image" => { match self.upload_image(path).await { Ok(image_key) => { content_parts.push(serde_json::json!({ "tag": "image", "image_key": image_key })); } Err(e) => { tracing::warn!(error = %e, path = %path, "Failed to upload image"); } } } "audio" | "file" | "video" => { match self.upload_file(path).await { Ok(file_key) => { content_parts.push(serde_json::json!({ "tag": "file", "file_key": file_key })); } Err(e) => { tracing::warn!(error = %e, path = %path, "Failed to upload file"); } } } _ => { tracing::warn!(media_type = %media_item.media_type, "Unsupported media type for sending"); } } } // If no content parts after processing, just send empty text if content_parts.is_empty() { let result = self.send_message_to_feishu(receive_id, receive_id_type, "").await; // Remove pending reaction after sending (using metadata propagated from inbound) self.remove_reaction_from_metadata(&msg.metadata).await; return result; } // Determine message type let has_image = msg.media.iter().any(|m| m.media_type == "image"); let msg_type = if has_image && msg.content.is_empty() { "image" } else { "post" }; let content = serde_json::json!({ "content": content_parts }).to_string(); let resp = self.http_client .post(format!("{}/im/v1/messages?receive_id_type={}", FEISHU_API_BASE, receive_id_type)) .header("Content-Type", "application/json") .header("Authorization", format!("Bearer {}", token)) .json(&serde_json::json!({ "receive_id": receive_id, "msg_type": msg_type, "content": content })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Send multimodal message HTTP error: {}", e)))?; #[derive(Deserialize)] struct SendResp { code: i32, msg: String, } let send_resp: SendResp = resp.json().await .map_err(|e| ChannelError::Other(format!("Parse send response error: {}", e)))?; if send_resp.code != 0 { return Err(ChannelError::Other(format!("Send multimodal message failed: code={} msg={}", send_resp.code, send_resp.msg))); } // Remove pending reaction after successfully sending self.remove_reaction_from_metadata(&msg.metadata).await; Ok(()) } }