use std::collections::HashMap; use std::path::Path; use std::sync::Arc; use std::time::{Duration, Instant}; use async_trait::async_trait; use futures_util::{SinkExt, StreamExt}; use prost::{Message as ProstMessage, bytes::Bytes}; use regex::Regex; use serde::Deserialize; use tokio::sync::{broadcast, RwLock}; use crate::bus::{MessageBus, MediaItem, OutboundMessage}; use crate::channels::base::{Channel, ChannelError}; use crate::config::FeishuChannelConfig; const FEISHU_API_BASE: &str = "https://open.feishu.cn/open-apis"; const FEISHU_WS_BASE: &str = "https://open.feishu.cn"; /// Heartbeat timeout for WS connection — must be larger than ping_interval (default 120 s). /// If no binary frame (pong or event) is received within this window, reconnect. const WS_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(300); /// Refresh tenant token this many seconds before the announced expiry. const TOKEN_REFRESH_SKEW: Duration = Duration::from_secs(120); /// Default tenant token TTL when `expire`/`expires_in` is absent. const DEFAULT_TOKEN_TTL: Duration = Duration::from_secs(7200); /// Dedup cache TTL (30 minutes). const DEDUP_CACHE_TTL: Duration = Duration::from_secs(30 * 60); // ───────────────────────────────────────────────────────────────────────────── // Protobuf types for Feishu WebSocket protocol (pbbp2.proto) // ───────────────────────────────────────────────────────────────────────────── #[derive(Clone, PartialEq, prost::Message)] struct PbHeader { #[prost(string, tag = "1")] pub key: String, #[prost(string, tag = "2")] pub value: String, } /// Feishu WS frame. /// method=0 → CONTROL (ping/pong) method=1 → DATA (events) #[derive(Clone, PartialEq, prost::Message)] struct PbFrame { #[prost(uint64, tag = "1")] pub seq_id: u64, #[prost(uint64, tag = "2")] pub log_id: u64, #[prost(int32, tag = "3")] pub service: i32, #[prost(int32, tag = "4")] pub method: i32, #[prost(message, repeated, tag = "5")] pub headers: Vec, #[prost(bytes = "vec", optional, tag = "8")] pub payload: Option>, } /// POST /callback/ws/endpoint response #[derive(Deserialize)] struct WsEndpointResp { code: i32, msg: Option, data: Option, } #[derive(Deserialize)] struct WsEndpoint { #[serde(rename = "URL")] url: String, #[serde(default)] client_config: Option, } #[derive(Deserialize, Default)] struct WsClientConfig { #[serde(rename = "PingInterval")] ping_interval: Option, } /// Lark event envelope (method=1 / type=event payload) #[derive(Deserialize)] struct LarkEvent { header: LarkEventHeader, event: serde_json::Value, } #[derive(Deserialize)] struct LarkEventHeader { event_type: String, #[allow(dead_code)] event_id: String, } impl std::fmt::Debug for LarkEventHeader { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("LarkEventHeader") .field("event_type", &self.event_type) .field("event_id", &self.event_id) .finish() } } #[derive(Deserialize)] struct MsgReceivePayload { sender: LarkSender, message: LarkMessage, } #[derive(Deserialize)] struct LarkSender { sender_id: LarkSenderId, #[serde(default)] sender_type: String, } #[derive(Deserialize, Default)] struct LarkSenderId { open_id: Option, } #[derive(Deserialize)] #[allow(dead_code)] struct LarkMessage { message_id: String, chat_id: String, chat_type: String, message_type: String, #[serde(default)] content: String, #[serde(default)] mentions: Vec, #[serde(default)] root_id: Option, #[serde(default)] parent_id: Option, } // ───────────────────────────────────────────────────────────────────────────── /// Cached tenant token with proactive refresh metadata. #[derive(Clone)] struct CachedTenantToken { value: String, refresh_after: Instant, } #[derive(Clone)] pub struct FeishuChannel { config: FeishuChannelConfig, http_client: reqwest::Client, running: Arc>, shutdown_tx: Arc>>>, connected: Arc>, /// Cached tenant access token with proactive refresh. tenant_token: Arc>>, /// Dedup cache: WS message_ids seen in the last ~30 min. seen_message_ids: Arc>>, } /// Parsed message data from a Feishu frame struct ParsedMessage { message_id: String, open_id: String, chat_id: String, content: String, media: Option, /// ID of the message this message is replying to (if any). /// Used to fetch quoted message content for display. parent_id: Option, } impl FeishuChannel { pub fn new( mut config: FeishuChannelConfig, workspace_dir: &Path, ) -> Result { // Override media_dir to use workspace_dir/media/feishu let media_dir = workspace_dir.join("media").join("feishu"); config.media_dir = media_dir.to_string_lossy().to_string(); Ok(Self { config, http_client: reqwest::Client::new(), running: Arc::new(RwLock::new(false)), shutdown_tx: Arc::new(RwLock::new(None)), connected: Arc::new(RwLock::new(false)), tenant_token: Arc::new(RwLock::new(None)), seen_message_ids: Arc::new(RwLock::new(HashMap::new())), }) } /// Get WebSocket endpoint URL from Feishu API async fn get_ws_endpoint(&self, client: &reqwest::Client) -> Result<(String, WsClientConfig), ChannelError> { let resp = client .post(format!("{}/callback/ws/endpoint", FEISHU_WS_BASE)) .header("locale", "zh") .json(&serde_json::json!({ "AppID": self.config.app_id, "AppSecret": self.config.app_secret, })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("HTTP error: {}", e)))?; let endpoint_resp: WsEndpointResp = resp .json() .await .map_err(|e| ChannelError::ConnectionError(format!("Failed to parse endpoint response: {}", e)))?; if endpoint_resp.code != 0 { return Err(ChannelError::ConnectionError(format!( "WS endpoint failed: code={} msg={}", endpoint_resp.code, endpoint_resp.msg.as_deref().unwrap_or("unknown") ))); } let ep = endpoint_resp.data .ok_or_else(|| ChannelError::ConnectionError("Empty endpoint data".to_string()))?; let client_config = ep.client_config.unwrap_or_default(); Ok((ep.url, client_config)) } /// Get tenant access token (cached with proactive refresh). async fn get_tenant_access_token(&self) -> Result { // 1. Check cache { let cached = self.tenant_token.read().await; if let Some(ref token) = *cached { if Instant::now() < token.refresh_after { return Ok(token.value.clone()); } } } // 2. Fetch new token let (token, ttl) = self.fetch_new_token().await?; // 3. Cache with proactive refresh time (提前 120 秒) let refresh_after = Instant::now() + ttl.saturating_sub(TOKEN_REFRESH_SKEW); { let mut cached = self.tenant_token.write().await; *cached = Some(CachedTenantToken { value: token.clone(), refresh_after, }); } Ok(token) } /// Fetch a new tenant access token from Feishu. async fn fetch_new_token(&self) -> Result<(String, Duration), ChannelError> { let resp = self.http_client .post(format!("{}/auth/v3/tenant_access_token/internal", FEISHU_API_BASE)) .header("Content-Type", "application/json") .json(&serde_json::json!({ "app_id": self.config.app_id, "app_secret": self.config.app_secret, })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("HTTP error: {}", e)))?; #[derive(Deserialize)] struct TokenResponse { code: i32, tenant_access_token: Option, expire: Option, } let token_resp: TokenResponse = resp .json() .await .map_err(|e| ChannelError::Other(format!("Failed to parse token response: {}", e)))?; if token_resp.code != 0 { return Err(ChannelError::Other("Auth failed".to_string())); } let token = token_resp.tenant_access_token .ok_or_else(|| ChannelError::Other("No token in response".to_string()))?; let ttl = token_resp.expire .and_then(|v| u64::try_from(v).ok()) .map(Duration::from_secs) .unwrap_or(DEFAULT_TOKEN_TTL); Ok((token, ttl)) } /// Check if message_id has been seen (dedup), and mark it as seen if not. /// Returns true if the message was already processed. /// Note: GC of stale entries is handled in the heartbeat timeout_check loop. async fn is_message_seen(&self, message_id: &str) -> bool { let mut seen = self.seen_message_ids.write().await; let now = Instant::now(); if seen.contains_key(message_id) { true } else { seen.insert(message_id.to_string(), now); false } } /// Download media and save locally, return (description, media_item) async fn download_media( &self, msg_type: &str, content_json: &serde_json::Value, message_id: &str, ) -> Result<(String, Option), ChannelError> { let media_dir = Path::new(&self.config.media_dir); tokio::fs::create_dir_all(media_dir).await .map_err(|e| ChannelError::Other(format!("Failed to create media dir: {}", e)))?; match msg_type { "image" => self.download_image(content_json, message_id, media_dir).await, "audio" | "file" | "media" => self.download_file(content_json, message_id, media_dir, msg_type).await, _ => Ok((format!("[unsupported media type: {}]", msg_type), None)), } } /// Download image from Feishu async fn download_image( &self, content_json: &serde_json::Value, message_id: &str, media_dir: &Path, ) -> Result<(String, Option), ChannelError> { let image_key = content_json.get("image_key") .and_then(|v| v.as_str()) .ok_or_else(|| ChannelError::Other("No image_key in message".to_string()))?; let token = self.get_tenant_access_token().await?; // Use message resource API for downloading message images let url = format!("{}/im/v1/messages/{}/resources/{}?type=image", FEISHU_API_BASE, message_id, image_key); #[cfg(debug_assertions)] tracing::debug!(url = %url, image_key = %image_key, message_id = %message_id, "Downloading image from Feishu via message resource API"); let resp = self.http_client .get(&url) .header("Authorization", format!("Bearer {}", token)) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Download image HTTP error: {}", e)))?; let status = resp.status(); #[cfg(debug_assertions)] tracing::debug!(status = %status, "Image download response status"); if !status.is_success() { let error_text = resp.text().await.unwrap_or_default(); return Err(ChannelError::Other(format!("Image download failed {}: {}", status, error_text))); } let content_type = resp .headers() .get("content-type") .and_then(|v| v.to_str().ok()) .unwrap_or("image/jpeg") .to_string(); let ext = match content_type.as_str() { "image/png" => "png", "image/gif" => "gif", "image/webp" => "webp", "image/bmp" => "bmp", _ => "jpg", }; let data = resp.bytes().await .map_err(|e| ChannelError::Other(format!("Failed to read image data: {}", e)))? .to_vec(); #[cfg(debug_assertions)] tracing::debug!(data_len = %data.len(), content_type = %content_type, "Downloaded image data"); let filename = format!("{}_{}.{}", message_id, &image_key[..8.min(image_key.len())], ext); let file_path = media_dir.join(&filename); tokio::fs::write(&file_path, &data).await .map_err(|e| ChannelError::Other(format!("Failed to write image: {}", e)))?; let media_item = MediaItem::new( file_path.to_string_lossy().to_string(), "image", ); tracing::info!(message_id = %message_id, filename = %filename, "Downloaded image"); Ok((format!("[image: {}]", filename), Some(media_item))) } /// Download file/audio from Feishu async fn download_file( &self, content_json: &serde_json::Value, message_id: &str, media_dir: &Path, file_type: &str, ) -> Result<(String, Option), ChannelError> { let file_key = content_json.get("file_key") .and_then(|v| v.as_str()) .ok_or_else(|| ChannelError::Other("No file_key in message".to_string()))?; let token = self.get_tenant_access_token().await?; // Use message resource API for downloading message files let url = format!("{}/im/v1/messages/{}/resources/{}?type=file", FEISHU_API_BASE, message_id, file_key); #[cfg(debug_assertions)] tracing::debug!(url = %url, file_key = %file_key, message_id = %message_id, "Downloading file from Feishu via message resource API"); let resp = self.http_client .get(&url) .header("Authorization", format!("Bearer {}", token)) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Download file HTTP error: {}", e)))?; let status = resp.status(); if !status.is_success() { let error_text = resp.text().await.unwrap_or_default(); return Err(ChannelError::Other(format!("File download failed {}: {}", status, error_text))); } let data = resp.bytes().await .map_err(|e| ChannelError::Other(format!("Failed to read file data: {}", e)))? .to_vec(); let extension = match file_type { "audio" => "mp3", "video" => "mp4", _ => "bin", }; let filename = format!("{}_{}.{}", message_id, &file_key[..8.min(file_key.len())], extension); let file_path = media_dir.join(&filename); tokio::fs::write(&file_path, &data).await .map_err(|e| ChannelError::Other(format!("Failed to write file: {}", e)))?; let media_item = MediaItem::new( file_path.to_string_lossy().to_string(), file_type, ); tracing::info!(message_id = %message_id, filename = %filename, file_type = %file_type, "Downloaded file"); Ok((format!("[{}: {}]", file_type, filename), Some(media_item))) } /// Upload image to Feishu and return the image_key async fn upload_image(&self, file_path: &str) -> Result { let token = self.get_tenant_access_token().await?; let mime = mime_guess::from_path(file_path) .first_or_octet_stream() .to_string(); let file_name = std::path::Path::new(file_path) .file_name() .and_then(|n| n.to_str()) .unwrap_or("image.jpg"); let file_data = tokio::fs::read(file_path).await .map_err(|e| ChannelError::Other(format!("Failed to read file: {}", e)))?; let part = reqwest::multipart::Part::bytes(file_data) .file_name(file_name.to_string()) .mime_str(&mime) .map_err(|e| ChannelError::Other(format!("Invalid mime type: {}", e)))?; let form = reqwest::multipart::Form::new() .text("image_type", "message".to_string()) .part("image", part); let resp = self.http_client .post(format!("{}/im/v1/images/upload", FEISHU_API_BASE)) .header("Authorization", format!("Bearer {}", token)) .multipart(form) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Upload image HTTP error: {}", e)))?; #[derive(Deserialize)] struct UploadResp { code: i32, msg: Option, data: Option, } #[derive(Deserialize)] struct UploadData { image_key: String, } let result: UploadResp = resp.json().await .map_err(|e| ChannelError::Other(format!("Parse upload response error: {}", e)))?; if result.code != 0 { return Err(ChannelError::Other(format!( "Upload image failed: code={} msg={}", result.code, result.msg.as_deref().unwrap_or("unknown") ))); } result.data .map(|d| d.image_key) .ok_or_else(|| ChannelError::Other("No image_key in response".to_string())) } /// Upload file to Feishu and return the file_key async fn upload_file(&self, file_path: &str) -> Result { let token = self.get_tenant_access_token().await?; let file_name = std::path::Path::new(file_path) .file_name() .and_then(|n| n.to_str()) .unwrap_or("file.bin"); let extension = std::path::Path::new(file_path) .extension() .and_then(|e| e.to_str()) .unwrap_or("") .to_lowercase(); let file_type = match extension.as_str() { "mp3" | "m4a" | "wav" | "ogg" => "audio", "mp4" | "mov" | "avi" | "mkv" => "video", "pdf" | "doc" | "docx" | "xls" | "xlsx" => "doc", _ => "file", }; let file_data = tokio::fs::read(file_path).await .map_err(|e| ChannelError::Other(format!("Failed to read file: {}", e)))?; let part = reqwest::multipart::Part::bytes(file_data) .file_name(file_name.to_string()) .mime_str("application/octet-stream") .map_err(|e| ChannelError::Other(format!("Invalid mime type: {}", e)))?; let form = reqwest::multipart::Form::new() .text("file_type", file_type.to_string()) .text("file_name", file_name.to_string()) .part("file", part); let resp = self.http_client .post(format!("{}/im/v1/files", FEISHU_API_BASE)) .header("Authorization", format!("Bearer {}", token)) .multipart(form) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Upload file HTTP error: {}", e)))?; #[derive(Deserialize)] struct UploadResp { code: i32, msg: Option, data: Option, } #[derive(Deserialize)] struct UploadData { file_key: String, } let result: UploadResp = resp.json().await .map_err(|e| ChannelError::Other(format!("Parse upload response error: {}", e)))?; if result.code != 0 { return Err(ChannelError::Other(format!( "Upload file failed: code={} msg={}", result.code, result.msg.as_deref().unwrap_or("unknown") ))); } result.data .map(|d| d.file_key) .ok_or_else(|| ChannelError::Other("No file_key in response".to_string())) } /// Add a reaction emoji to a message and store the reaction_id for later removal. /// Returns the reaction_id if successful, None otherwise. async fn add_reaction(&self, message_id: &str) -> Result, ChannelError> { let emoji = self.config.reaction_emoji.as_str(); let token = self.get_tenant_access_token().await?; let resp = self.http_client .post(format!("{}/im/v1/messages/{}/reactions", FEISHU_API_BASE, message_id)) .header("Authorization", format!("Bearer {}", token)) .json(&serde_json::json!({ "reaction_type": { "emoji_type": emoji } })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Add reaction HTTP error: {}", e)))?; #[derive(Deserialize)] struct ReactionResp { code: i32, msg: Option, data: Option, } #[derive(Deserialize)] struct ReactionData { reaction_id: Option, } let result: ReactionResp = resp.json().await .map_err(|e| ChannelError::Other(format!("Parse reaction response error: {}", e)))?; if result.code != 0 { tracing::warn!( "Failed to add reaction to message {}: code={} msg={}", message_id, result.code, result.msg.as_deref().unwrap_or("unknown") ); return Ok(None); } let reaction_id = result.data.and_then(|d| d.reaction_id); Ok(reaction_id) } /// Remove reaction using feishu metadata propagated through OutboundMessage. /// Reads feishu.message_id and feishu.reaction_id from metadata. async fn remove_reaction_from_metadata(&self, metadata: &std::collections::HashMap) { let (message_id, reaction_id) = match ( metadata.get("feishu.message_id"), metadata.get("feishu.reaction_id"), ) { (Some(msg_id), Some(rid)) => (msg_id.clone(), rid.clone()), _ => return, }; if let Err(e) = self.remove_reaction(&message_id, &reaction_id).await { tracing::debug!(error = %e, message_id = %message_id, "Failed to remove reaction"); } } /// Remove a reaction emoji from a message. async fn remove_reaction(&self, message_id: &str, reaction_id: &str) -> Result<(), ChannelError> { let token = self.get_tenant_access_token().await?; let resp = self.http_client .delete(format!("{}/im/v1/messages/{}/reactions/{}", FEISHU_API_BASE, message_id, reaction_id)) .header("Authorization", format!("Bearer {}", token)) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Remove reaction HTTP error: {}", e)))?; #[derive(Deserialize)] struct ReactionResp { code: i32, msg: Option, } let result: ReactionResp = resp.json().await .map_err(|e| ChannelError::Other(format!("Parse remove reaction response error: {}", e)))?; if result.code != 0 { tracing::debug!( "Failed to remove reaction {} from message {}: code={} msg={}", reaction_id, message_id, result.code, result.msg.as_deref().unwrap_or("unknown") ); } Ok(()) } const REPLY_CONTEXT_MAX_LEN: usize = 500; /// Fetch the text content of a Feishu message by ID. /// Returns a "[Reply to: ...]" context string, or None on failure. async fn get_message_content(&self, message_id: &str) -> Option { let token = match self.get_tenant_access_token().await { Ok(t) => t, Err(e) => { tracing::debug!(error = %e, message_id = %message_id, "Feishu: failed to get token for fetching parent message"); return None; } }; let resp = self.http_client .get(format!("{}/im/v1/messages/{}", FEISHU_API_BASE, message_id)) .header("Authorization", format!("Bearer {}", token)) .send() .await .ok()?; #[derive(Deserialize)] struct MessageResp { code: i32, msg: Option, data: Option, } #[derive(Deserialize)] struct MessageData { items: Option>, } #[derive(Deserialize)] struct MessageItem { msg_type: String, body: Option, } #[derive(Deserialize)] struct MessageBody { content: Option, } let result: MessageResp = match resp.json().await { Ok(r) => r, Err(e) => { tracing::debug!(error = %e, message_id = %message_id, "Feishu: failed to parse parent message response"); return None; } }; if result.code != 0 { tracing::debug!( message_id = %message_id, code = %result.code, msg = ?result.msg, "Feishu: failed to fetch parent message" ); return None; } let items = result.data?.items?; let msg_obj = items.first()?; let raw_content = msg_obj.body.as_ref()?.content.as_ref()?; let msg_type = msg_obj.msg_type.as_str(); let text = match msg_type { "text" => { serde_json::from_str::(raw_content) .ok()? .get("text")? .as_str()? .to_string() } "post" => { parse_post_content(raw_content) } _ => String::new(), }; if text.is_empty() { return None; } let text = if text.len() > Self::REPLY_CONTEXT_MAX_LEN { format!("{}...", &text[..Self::REPLY_CONTEXT_MAX_LEN]) } else { text }; Some(format!("[Reply to: {}]", text)) } /// Send a message to Feishu chat with specified message type async fn send_message_to_feishu(&self, receive_id: &str, receive_id_type: &str, msg_type: &str, content: &str) -> Result<(), ChannelError> { let token = self.get_tenant_access_token().await?; // Feishu text messages have content limits (~64KB). // Truncate if content is too long to avoid API error 230001. const MAX_TEXT_LENGTH: usize = 60_000; let payload_content = if msg_type == "text" { let truncated = if content.len() > MAX_TEXT_LENGTH { format!("{}...\n\n[Content truncated due to length limit]", &content[..MAX_TEXT_LENGTH]) } else { content.to_string() }; serde_json::json!({ "text": truncated }).to_string() } else { // For post messages, content is already JSON (from markdown_to_post) // But we still need to check length if content.len() > MAX_TEXT_LENGTH { // Fallback to truncated text for post as well serde_json::json!({ "text": format!("{}...\n\n[Content truncated due to length limit]", &content[..MAX_TEXT_LENGTH]) }).to_string() } else { content.to_string() } }; let resp = self.http_client .post(format!("{}/im/v1/messages?receive_id_type={}", FEISHU_API_BASE, receive_id_type)) .header("Content-Type", "application/json") .header("Authorization", format!("Bearer {}", token)) .json(&serde_json::json!({ "receive_id": receive_id, "msg_type": msg_type, "content": payload_content })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Send message HTTP error: {}", e)))?; #[derive(Deserialize)] struct SendResp { code: i32, msg: String, } let send_resp: SendResp = resp .json() .await .map_err(|e| ChannelError::Other(format!("Parse send response error: {}", e)))?; if send_resp.code != 0 { return Err(ChannelError::Other(format!("Send message failed: code={} msg={}", send_resp.code, send_resp.msg))); } Ok(()) } /// Extract service_id from WebSocket URL query params fn extract_service_id(url: &str) -> i32 { url.split('?') .nth(1) .and_then(|qs| { qs.split('&') .find(|kv| kv.starts_with("service_id=")) .and_then(|kv| kv.split('=').nth(1)) .and_then(|v| v.parse::().ok()) }) .unwrap_or(0) } /// Handle incoming binary PbFrame - returns Some(ParsedMessage) if we need to ack async fn handle_frame(&self, frame: &PbFrame) -> Result, ChannelError> { // method 0 = CONTROL (ping/pong) if frame.method == 0 { return Ok(None); } // method 1 = DATA (events) if frame.method != 1 { return Ok(None); } let payload = frame.payload.as_deref() .ok_or_else(|| ChannelError::Other("No payload in frame".to_string()))?; #[cfg(debug_assertions)] tracing::debug!(payload_len = %payload.len(), "Received frame payload"); let event: LarkEvent = serde_json::from_slice(payload) .map_err(|e| ChannelError::Other(format!("Parse event error: {}", e)))?; let event_type = event.header.event_type.as_str(); #[cfg(debug_assertions)] tracing::debug!(event_type = %event_type, "Received event type"); if event_type != "im.message.receive_v1" { return Ok(None); } let payload_data: MsgReceivePayload = serde_json::from_value(event.event.clone()) .map_err(|e| ChannelError::Other(format!("Parse payload error: {}", e)))?; // Skip bot messages if payload_data.sender.sender_type == "bot" { return Ok(None); } let message_id = payload_data.message.message_id.clone(); // Deduplication check if self.is_message_seen(&message_id).await { #[cfg(debug_assertions)] tracing::debug!(message_id = %message_id, "Duplicate message, skipping"); return Ok(None); } #[cfg(debug_assertions)] tracing::debug!(message_id = %message_id, "Received Feishu message"); let open_id = payload_data.sender.sender_id.open_id .ok_or_else(|| ChannelError::Other("No open_id".to_string()))?; let msg = payload_data.message; let chat_id = msg.chat_id.clone(); let msg_type = msg.message_type.as_str(); let raw_content = msg.content.clone(); let parent_id = msg.parent_id.clone(); #[cfg(debug_assertions)] tracing::debug!(msg_type = %msg_type, chat_id = %chat_id, open_id = %open_id, "Parsing message content"); let (mut content, media) = self.parse_and_download_message(msg_type, &raw_content, &message_id).await?; // Fetch and prepend quoted message content if this is a reply if let Some(ref pid) = parent_id { if let Some(reply_ctx) = self.get_message_content(pid).await { content = format!("{}\n{}", reply_ctx, content); } } #[cfg(debug_assertions)] if let Some(ref m) = media { tracing::debug!(media_type = %m.media_type, media_path = %m.path, "Media downloaded successfully"); } Ok(Some(ParsedMessage { message_id, open_id, chat_id, content, media, parent_id, })) } /// Parse message content and download media if needed async fn parse_and_download_message( &self, msg_type: &str, content: &str, message_id: &str, ) -> Result<(String, Option), ChannelError> { let (text, media) = match msg_type { "text" => { let text = if let Ok(parsed) = serde_json::from_str::(content) { parsed.get("text").and_then(|v| v.as_str()).unwrap_or(content).to_string() } else { content.to_string() }; (text, None) } "post" => { (parse_post_content(content), None) } "image" | "audio" | "file" | "media" => { if let Ok(content_json) = serde_json::from_str::(content) { match self.download_media(msg_type, &content_json, message_id).await { Ok((text, media)) => (text, media), Err(_) => (format!("[{}: content unavailable]", msg_type), None), } } else { (format!("[{}: content unavailable]", msg_type), None) } } "share_chat" => { // Shared chat/cannel messages if let Ok(parsed) = serde_json::from_str::(content) { let chat_id = parsed.get("chat_id").and_then(|v| v.as_str()).unwrap_or("unknown"); (format!("[shared chat: {}]", chat_id), None) } else { ("[shared chat]".to_string(), None) } } "share_user" => { // Shared user messages if let Ok(parsed) = serde_json::from_str::(content) { let user_id = parsed.get("user_id").and_then(|v| v.as_str()).unwrap_or("unknown"); (format!("[shared user: {}]", user_id), None) } else { ("[shared user]".to_string(), None) } } "interactive" => { // Interactive card messages - extract text content match extract_interactive_content(content) { Ok((text, media)) => (text, media), Err(e) => { tracing::warn!(error = %e, "Failed to extract interactive content"); (content.to_string(), None) } } } "list" => { // List/bullet messages match parse_list_content(content) { Ok((text, media)) => (text, media), Err(_) => (content.to_string(), None), } } "merge_forward" => { ("[merged forward messages]".to_string(), None) } "share_calendar_event" => { if let Ok(parsed) = serde_json::from_str::(content) { let event_key = parsed.get("event_key").and_then(|v| v.as_str()).unwrap_or("unknown"); (format!("[shared calendar event: {}]", event_key), None) } else { ("[shared calendar event]".to_string(), None) } } "system" => { ("[system message]".to_string(), None) } _ => (content.to_string(), None), }; // Strip @_user_N placeholders from group chat @mentions let clean_text = strip_at_placeholders(&text); Ok((clean_text, media)) } /// Send acknowledgment for a message async fn send_ack(frame: &PbFrame, write: &mut futures_util::stream::SplitSink>, tokio_tungstenite::tungstenite::Message>) -> Result<(), ChannelError> { let mut ack = frame.clone(); ack.payload = Some(br#"{"code":200,"headers":{},"data":[]}"#.to_vec()); ack.headers.push(PbHeader { key: "biz_rt".into(), value: "0".into(), }); write.send(tokio_tungstenite::tungstenite::Message::Binary(ack.encode_to_vec().into())) .await .map_err(|e| ChannelError::Other(format!("Failed to send ack: {}", e)))?; Ok(()) } async fn run_ws_loop(&self, bus: Arc, mut shutdown_rx: broadcast::Receiver<()>) -> Result<(), ChannelError> { let (wss_url, client_config) = self.get_ws_endpoint(&self.http_client).await?; let service_id = Self::extract_service_id(&wss_url); tracing::info!(url = %wss_url, "Connecting to Feishu WebSocket"); let (ws_stream, _) = tokio_tungstenite::connect_async(&wss_url) .await .map_err(|e| ChannelError::ConnectionError(format!("WebSocket connection failed: {}", e)))?; *self.connected.write().await = true; tracing::info!("Feishu WebSocket connected"); let (mut write, mut read) = ws_stream.split(); // Send initial ping let ping_frame = PbFrame { seq_id: 1, log_id: 0, service: service_id, method: 0, headers: vec![PbHeader { key: "type".into(), value: "ping".into(), }], payload: None, }; write.send(tokio_tungstenite::tungstenite::Message::Binary(ping_frame.encode_to_vec().into())) .await .map_err(|e| ChannelError::ConnectionError(format!("Failed to send initial ping: {}", e)))?; let ping_interval = client_config.ping_interval.unwrap_or(120).max(10); let mut ping_interval_tok = tokio::time::interval(tokio::time::Duration::from_secs(ping_interval)); let mut timeout_check = tokio::time::interval(tokio::time::Duration::from_secs(10)); let mut seq: u64 = 1; let mut last_recv = Instant::now(); // Consume the immediate tick ping_interval_tok.tick().await; timeout_check.tick().await; loop { tokio::select! { msg = read.next() => { match msg { Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(data))) => { last_recv = Instant::now(); let bytes: Bytes = data; if let Ok(frame) = PbFrame::decode(bytes.as_ref()) { match self.handle_frame(&frame).await { Ok(Some(parsed)) => { // Send ACK immediately (Feishu requires within 3 s) if let Err(e) = Self::send_ack(&frame, &mut write).await { tracing::error!(error = %e, "Failed to send ACK to Feishu"); } // Add reaction emoji (await so we get the reaction_id for later removal) let message_id = parsed.message_id.clone(); let reaction_id = match self.add_reaction(&message_id).await { Ok(Some(rid)) => Some(rid), Ok(None) => None, Err(e) => { tracing::debug!(error = %e, message_id = %message_id, "Failed to add reaction"); None } }; // forwarded_metadata is copied to OutboundMessage.metadata by the gateway. let mut forwarded_metadata = std::collections::HashMap::new(); forwarded_metadata.insert("feishu.message_id".to_string(), message_id.clone()); if let Some(ref rid) = reaction_id { forwarded_metadata.insert("feishu.reaction_id".to_string(), rid.clone()); } if let Some(ref pid) = parsed.parent_id { forwarded_metadata.insert("feishu.parent_id".to_string(), pid.clone()); } // Publish to bus asynchronously let channel = self.clone(); let bus = bus.clone(); tokio::spawn(async move { let media_count = if parsed.media.is_some() { 1 } else { 0 }; #[cfg(debug_assertions)] tracing::debug!(open_id = %parsed.open_id, chat_id = %parsed.chat_id, content_len = %parsed.content.len(), media_count = %media_count, "Publishing message to bus"); let msg = crate::bus::InboundMessage { channel: "feishu".to_string(), sender_id: parsed.open_id.clone(), chat_id: parsed.chat_id.clone(), content: parsed.content.clone(), timestamp: crate::bus::message::current_timestamp(), media: parsed.media.map(|m| vec![m]).unwrap_or_default(), metadata: std::collections::HashMap::new(), forwarded_metadata, }; if let Err(e) = channel.handle_and_publish(&bus, &msg).await { tracing::error!(error = %e, open_id = %parsed.open_id, chat_id = %parsed.chat_id, "Failed to publish Feishu message to bus"); } else { #[cfg(debug_assertions)] tracing::debug!(open_id = %parsed.open_id, chat_id = %parsed.chat_id, "Message published to bus successfully"); } }); } Ok(None) => {} Err(e) => { tracing::warn!(error = %e, "Failed to parse Feishu frame"); } } } } Some(Ok(tokio_tungstenite::tungstenite::Message::Ping(data))) => { last_recv = Instant::now(); let pong = PbFrame { seq_id: seq.wrapping_add(1), log_id: 0, service: service_id, method: 0, headers: vec![PbHeader { key: "type".into(), value: "pong".into(), }], payload: Some(data.to_vec()), }; let _ = write.send(tokio_tungstenite::tungstenite::Message::Binary(pong.encode_to_vec().into())).await; } Some(Ok(tokio_tungstenite::tungstenite::Message::Pong(_))) => { last_recv = Instant::now(); } Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => { #[cfg(debug_assertions)] tracing::debug!("Feishu WebSocket closed"); break; } Some(Err(e)) => { tracing::warn!(error = %e, "Feishu WebSocket error"); break; } _ => {} } } _ = ping_interval_tok.tick() => { seq = seq.wrapping_add(1); let ping = PbFrame { seq_id: seq, log_id: 0, service: service_id, method: 0, headers: vec![PbHeader { key: "type".into(), value: "ping".into(), }], payload: None, }; if write.send(tokio_tungstenite::tungstenite::Message::Binary(ping.encode_to_vec().into())).await.is_err() { tracing::warn!("Feishu ping failed, reconnecting"); break; } } _ = timeout_check.tick() => { if last_recv.elapsed() > WS_HEARTBEAT_TIMEOUT { tracing::warn!("Feishu WebSocket heartbeat timeout, reconnecting"); break; } // GC dedup cache: remove entries older than TTL (matches zeroclaw pattern) let now = Instant::now(); let mut seen = self.seen_message_ids.write().await; seen.retain(|_, ts| now.duration_since(*ts) < DEDUP_CACHE_TTL); } _ = shutdown_rx.recv() => { tracing::info!("Feishu channel shutdown signal received"); break; } } } *self.connected.write().await = false; Ok(()) } } fn parse_post_content(content: &str) -> String { /// Extract text from a single post element (text, link, at-mention). fn extract_element(el: &serde_json::Value, out: &mut Vec) { match el.get("tag").and_then(|t| t.as_str()).unwrap_or("") { "text" => { if let Some(text) = el.get("text").and_then(|t| t.as_str()) { out.push(text.to_string()); } } "a" => { let link_text = el.get("text") .and_then(|t| t.as_str()) .filter(|s| !s.is_empty()) .or_else(|| el.get("href").and_then(|h| h.as_str())) .unwrap_or(""); out.push(link_text.to_string()); } "at" => { let name = el.get("user_name") .and_then(|n| n.as_str()) .or_else(|| el.get("user_id").and_then(|i| i.as_str())) .unwrap_or("user"); out.push(format!("@{}", name)); } "code_block" => { let lang = el.get("language").and_then(|l| l.as_str()).unwrap_or(""); let code_text = el.get("text").and_then(|t| t.as_str()).unwrap_or(""); out.push(format!("\n```{}\n{}\n```\n", lang, code_text)); } _ => { if let Some(text) = el.get("text").and_then(|t| t.as_str()) { out.push(text.to_string()); } } } } /// Parse a single block {title, content: [[...]]} and append text to out. fn parse_block(block: &serde_json::Value, out: &mut Vec) { let title = block.get("title").and_then(|t| t.as_str()).filter(|s| !s.is_empty()); if let Some(t) = title { out.push(t.to_string()); out.push("\n\n".to_string()); } if let Some(content_arr) = block.get("content").and_then(|c| c.as_array()) { for row in content_arr { if let Some(row_arr) = row.as_array() { for el in row_arr { extract_element(el, out); } out.push("\n".to_string()); } } } } let Ok(parsed) = serde_json::from_str::(content) else { return content.to_string(); }; let mut texts = Vec::new(); // Unwrap optional {"post": ...} envelope (nanobot: root = root["post"]) let root = if parsed.get("post").and_then(|p| p.as_object()).is_some() { parsed.get("post").unwrap() } else { &parsed }; // Try direct format: {"title": ..., "content": [[...]]} if root.get("content").and_then(|c| c.as_array()).is_some() { parse_block(root, &mut texts); let result = texts.join(""); if !result.trim().is_empty() { return result.trim().to_string(); } texts.clear(); } // Try localized: {"zh_cn": {"title": ..., "content": [...]}} for key in ["zh_cn", "en_us", "ja_jp"] { if let Some(locale_data) = root.get(key).and_then(|l| l.as_object()) { parse_block(&serde_json::json!(locale_data), &mut texts); let result = texts.join(""); if !result.trim().is_empty() { return result.trim().to_string(); } texts.clear(); } } // Fall back: try any dict child if let Some(root_obj) = root.as_object() { for (_key, val) in root_obj { if let Some(obj) = val.as_object() { if obj.get("content").and_then(|c| c.as_array()).is_some() { parse_block(val, &mut texts); let result = texts.join(""); if !result.trim().is_empty() { return result.trim().to_string(); } texts.clear(); } } } } content.to_string() } /// Extract text content from interactive card messages fn extract_interactive_content(content: &str) -> Result<(String, Option), ChannelError> { let parsed = match serde_json::from_str::(content) { Ok(p) => p, Err(_) => return Ok((content.to_string(), None)), }; let mut texts = Vec::new(); // Extract from elements array if let Some(elements) = parsed.get("elements").and_then(|e| e.as_array()) { for el in elements { extract_element_content(el, &mut texts); } } // Extract from card object if let Some(card) = parsed.get("card").and_then(|c| c.as_object()) { if let Some(elements) = card.get("elements").and_then(|e| e.as_array()) { for el in elements { extract_element_content(el, &mut texts); } } } // Extract from header if let Some(header) = parsed.get("header").and_then(|h| h.as_object()) { if let Some(title) = header.get("title").and_then(|t| t.as_object()) { if let Some(text) = title.get("content").and_then(|c| c.as_str()) { texts.push(format!("title: {}\n", text)); } } } let result = texts.join("").trim().to_string(); if result.is_empty() { Ok((content.to_string(), None)) } else { Ok((result, None)) } } /// Extract content from a single card element fn extract_element_content(element: &serde_json::Value, texts: &mut Vec) { let tag = element.get("tag").and_then(|t| t.as_str()).unwrap_or(""); match tag { "markdown" | "lark_md" => { if let Some(content) = element.get("content").and_then(|c| c.as_str()) { texts.push(content.to_string()); texts.push("\n".to_string()); } } "div" => { if let Some(text_obj) = element.get("text").and_then(|t| t.as_object()) { let content = text_obj.get("content") .and_then(|c| c.as_str()) .unwrap_or(""); texts.push(content.to_string()); } else if let Some(content) = element.get("text").and_then(|t| t.as_str()) { texts.push(content.to_string()); } texts.push("\n".to_string()); } "a" => { let href = element.get("href") .and_then(|h| h.as_str()) .unwrap_or(""); let text = element.get("text") .and_then(|t| t.as_str()) .unwrap_or(""); if !text.is_empty() { texts.push(text.to_string()); } else if !href.is_empty() { texts.push(format!("link: {}", href)); } } "img" => { let alt = element.get("alt"); let alt_text = alt.and_then(|a| a.as_str()) .or_else(|| alt.and_then(|a| a.as_object()).and_then(|o| o.get("content")).and_then(|c| c.as_str())) .unwrap_or("[image]"); texts.push(format!("{}\n", alt_text)); } "note" => { if let Some(elements) = element.get("elements").and_then(|e| e.as_array()) { for el in elements { extract_element_content(el, texts); } } } "column_set" => { if let Some(columns) = element.get("columns").and_then(|c| c.as_array()) { for col in columns { if let Some(elements) = col.get("elements").and_then(|e| e.as_array()) { for el in elements { extract_element_content(el, texts); } } } } } "table" => { // Tables are complex, just indicate presence texts.push("[table]\n".to_string()); } _ => { // Recursively check for nested elements if let Some(elements) = element.get("elements").and_then(|e| e.as_array()) { for el in elements { extract_element_content(el, texts); } } } } } /// Parse Feishu list/bullet message content into plain text fn parse_list_content(content: &str) -> Result<(String, Option), ChannelError> { let parsed = match serde_json::from_str::(content) { Ok(p) => p, Err(_) => return Ok((content.to_string(), None)), }; let items = parsed.get("items") .and_then(|i| i.as_array()) .or_else(|| parsed.get("content").and_then(|c| c.as_array())); let Some(items) = items else { return Ok((content.to_string(), None)); }; let mut lines = Vec::new(); collect_list_items(items, &mut lines, 0); let result = lines.join("\n").trim().to_string(); if result.is_empty() { Ok((content.to_string(), None)) } else { Ok((result, None)) } } /// Recursively collect list item text with indentation fn collect_list_items(items: &[serde_json::Value], lines: &mut Vec, depth: usize) { let indent = " ".repeat(depth); for item in items { // Items can be arrays of inline elements or objects with content/children let inline_elements = if let Some(arr) = item.as_array() { arr.as_slice() } else if let Some(obj) = item.as_object() { obj.get("content") .and_then(|c| c.as_array()) .map(|a| a.as_slice()) .unwrap_or(&[]) } else { continue; }; let mut text = String::new(); for el in inline_elements { extract_inline_text(el, &mut text); } let trimmed = text.trim(); if !trimmed.is_empty() { lines.push(format!("{}- {}", indent, trimmed)); } // Handle nested children if let Some(obj) = item.as_object() { if let Some(children) = obj.get("children").and_then(|c| c.as_array()) { collect_list_items(children, lines, depth + 1); } } else if let Some(children_arr) = item.as_array().and_then(|arr| { arr.iter().find_map(|child| { if child.as_object().and_then(|o| o.get("children")).is_some() { Some(child) } else { None } }) }) { if let Some(children) = children_arr.as_object().and_then(|o| o.get("children")).and_then(|c| c.as_array()) { collect_list_items(children, lines, depth + 1); } } } } /// Extract text from inline elements (text, link, at-mention) fn extract_inline_text(el: &serde_json::Value, out: &mut String) { match el.get("tag").and_then(|t| t.as_str()).unwrap_or("") { "text" => { if let Some(text) = el.get("text").and_then(|t| t.as_str()) { out.push_str(text); } } "a" => { let text = el.get("text") .and_then(|t| t.as_str()) .filter(|s| !s.is_empty()) .or_else(|| el.get("href").and_then(|h| h.as_str())) .unwrap_or(""); out.push_str(text); } "at" => { let name = el.get("user_name") .and_then(|n| n.as_str()) .or_else(|| el.get("user_id").and_then(|i| i.as_str())) .unwrap_or("user"); out.push_str(&format!("@{}", name)); } _ => {} } } /// Remove @_user_N placeholder tokens injected by Feishu in group chats fn strip_at_placeholders(text: &str) -> String { let mut result = String::with_capacity(text.len()); let mut chars = text.chars().peekable(); while let Some(ch) = chars.next() { if ch == '@' { let rest: String = chars.clone().collect(); if let Some(after) = rest.strip_prefix("_user_") { // Skip until we hit a non-alphanumeric character let placeholder_len = after.find(|c: char| !c.is_alphanumeric()).unwrap_or(after.len()); // Skip the placeholder for _ in 0..placeholder_len { chars.next(); } // Also skip the underscore after user_N if present if chars.peek() == Some(&'_') { chars.next(); } continue; } } result.push(ch); } result } // ───────────────────────────────────────────────────────────────────────────── // Markdown parsing and Feishu card element building // ───────────────────────────────────────────────────────────────────────────── /// Message format types for Feishu #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum MsgFormat { /// Plain text, short and no markdown Text, /// Rich text (links only, moderate length) Post, /// Interactive card with full markdown rendering Interactive, } /// Regex patterns for markdown detection struct MdPatterns { /// Regex to match markdown tables (header + separator + data rows) table_re: Regex, /// Regex to match headings (# heading) heading_re: Regex, /// Regex to match code blocks (```...```) code_block_re: Regex, /// Regex to match bold **text** or __text__ bold_re: Regex, /// Regex to match italic *text* or _text_ italic_re: Regex, /// Regex to match strikethrough ~~text~~ strike_re: Regex, /// Regex to match markdown links [text](url) link_re: Regex, /// Regex to match unordered list items (- item or * item) list_re: Regex, /// Regex to match ordered list items (1. item) olist_re: Regex, } impl MdPatterns { fn new() -> Self { Self { table_re: Regex::new(r"((?:^[ \t]*\|.+\|[ \t]*\n)(?:^[ \t]*\|[-:\s|]+\|[ \t]*\n)(?:^[ \t]*\|.+\|[ \t]*\n?)+)").unwrap(), heading_re: Regex::new(r"^(#{1,6})\s+(.+)$").unwrap(), code_block_re: Regex::new(r"(```[\s\S]*?```)").unwrap(), bold_re: Regex::new(r"\*\*(.+?)\*\*|__(.+?)__").unwrap(), // Simple pattern: detect *text* or _text_ markers (may overlap with bold, but that's ok for detection) // We use a conservative approach: detect potential italic markers italic_re: Regex::new(r"\*[^*]+\*|_[^_]+_").unwrap(), strike_re: Regex::new(r"~~.+?~~").unwrap(), link_re: Regex::new(r"\[([^\]]+)\]\((https?://[^\)]+)\)").unwrap(), list_re: Regex::new(r"^[\s]*[-*+]\s+").unwrap(), olist_re: Regex::new(r"^[\s]*\d+\.\s+").unwrap(), } } } impl Default for MdPatterns { fn default() -> Self { Self::new() } } impl FeishuChannel { /// Determine the optimal Feishu message format for content. fn detect_msg_format(content: &str) -> MsgFormat { let patterns = MdPatterns::new(); let stripped = content.trim(); // Complex markdown (code blocks, tables, headings) → always interactive card if patterns.table_re.is_match(stripped) || patterns.heading_re.is_match(stripped) || patterns.code_block_re.is_match(stripped) { return MsgFormat::Interactive; } // Long content → interactive card (better readability) if stripped.len() > 2000 { return MsgFormat::Interactive; } // Has bold/italic/strikethrough → interactive card (post format can't render these) if patterns.bold_re.is_match(stripped) || patterns.italic_re.is_match(stripped) || patterns.strike_re.is_match(stripped) { return MsgFormat::Interactive; } // Has list items → interactive card (post format can't render list bullets well) if patterns.list_re.is_match(stripped) || patterns.olist_re.is_match(stripped) { return MsgFormat::Interactive; } // Has links → post format (supports tags) if patterns.link_re.is_match(stripped) { return MsgFormat::Post; } // Short plain text → text format if stripped.len() <= 200 { return MsgFormat::Text; } // Medium plain text without formatting → post format MsgFormat::Post } /// Strip markdown formatting markers from text for plain display. fn strip_md_formatting(text: &str) -> String { let patterns = MdPatterns::new(); let mut result = text.to_string(); // Remove bold markers result = patterns.bold_re.replace_all(&result, "$1$2").to_string(); // Remove italic markers result = patterns.italic_re.replace_all(&result, "$1$2").to_string(); // Remove strikethrough markers result = patterns.strike_re.replace_all(&result, "$1").to_string(); result } /// Parse a markdown table into a Feishu table element. fn parse_md_table(table_text: &str) -> Option { let lines: Vec<&str> = table_text .trim() .split('\n') .map(|l| l.trim()) .filter(|l| !l.is_empty()) .collect(); if lines.len() < 3 { return None; } fn split(line: &str) -> Vec { line.trim_start_matches('|') .trim_end_matches('|') .split('|') .map(|c| c.trim().to_string()) .collect() } let headers = split(lines[0]); let rows: Vec> = lines[2..] .iter() .map(|line| { let cells: Vec = split(line); headers .iter() .enumerate() .map(|(i, _h)| { ( format!("c{}", i), if i < cells.len() { Self::strip_md_formatting(&cells[i]) } else { String::new() }, ) }) .collect() }) .collect(); let columns: Vec = headers .iter() .enumerate() .map(|(i, h)| { serde_json::json!({ "tag": "column", "name": format!("c{}", i), "display_name": Self::strip_md_formatting(h), "width": "auto" }) }) .collect(); Some(serde_json::json!({ "tag": "table", "page_size": rows.len() + 1, "columns": columns, "rows": rows, })) } /// Split content by headings, converting headings to div elements. fn split_headings(content: &str) -> Vec { let patterns = MdPatterns::new(); let mut protected = content.to_string(); // Protect code blocks by replacing them with placeholders let mut code_blocks: Vec = Vec::new(); for m in patterns.code_block_re.find_iter(content) { code_blocks.push(m.as_str().to_string()); protected = protected.replace(m.as_str(), &format!("\x00CODE{}\x00", code_blocks.len() - 1)); } let mut elements: Vec = Vec::new(); let mut last_end = 0; for m in patterns.heading_re.find_iter(&protected) { let before = &protected[last_end..m.start()].trim(); if !before.is_empty() { elements.push(serde_json::json!({ "tag": "markdown", "content": before })); } let heading_text = Self::strip_md_formatting(m.as_str().trim_start_matches('#').trim()); let display_text = if heading_text.is_empty() { String::new() } else { format!("**{}**", heading_text) }; elements.push(serde_json::json!({ "tag": "div", "text": { "tag": "lark_md", "content": display_text } })); last_end = m.end(); } let remaining = protected[last_end..].trim(); if !remaining.is_empty() { // Restore code blocks let mut final_content = remaining.to_string(); for (i, cb) in code_blocks.iter().enumerate() { final_content = final_content.replace(&format!("\x00CODE{}\x00", i), cb); } elements.push(serde_json::json!({ "tag": "markdown", "content": final_content })); } if elements.is_empty() { elements.push(serde_json::json!({ "tag": "markdown", "content": content })); } elements } /// Split card elements into groups with at most one table element each. /// Feishu cards have a hard limit of one table per card (API error 11310). fn split_elements_by_table_limit(elements: &[serde_json::Value]) -> Vec> { if elements.is_empty() { return vec![vec![]]; } let mut groups: Vec> = Vec::new(); let mut current: Vec = Vec::new(); let mut table_count = 0; for el in elements { if el.get("tag").and_then(|t| t.as_str()) == Some("table") { if table_count >= 1 { groups.push(current); current = Vec::new(); table_count = 0; } current.push(el.clone()); table_count += 1; } else { current.push(el.clone()); } } if !current.is_empty() { groups.push(current); } groups } /// Build content into card elements (div/markdown + table). fn build_card_elements(content: &str) -> Vec { let patterns = MdPatterns::new(); let mut elements: Vec = Vec::new(); let mut last_end = 0; // Find all tables in content for m in patterns.table_re.find_iter(content) { let before = &content[last_end..m.start()]; if !before.trim().is_empty() { elements.extend(Self::split_headings(before)); } if let Some(table) = Self::parse_md_table(m.as_str()) { elements.push(table); } else { elements.push(serde_json::json!({ "tag": "markdown", "content": m.as_str() })); } last_end = m.end(); } let remaining = &content[last_end..]; if !remaining.trim().is_empty() { elements.extend(Self::split_headings(remaining)); } if elements.is_empty() { elements.push(serde_json::json!({ "tag": "markdown", "content": content })); } elements } /// Convert markdown content to Feishu post message JSON. /// Handles links [text](url) as tags; everything else as text tags. fn markdown_to_post(content: &str) -> String { let patterns = MdPatterns::new(); let lines = content.trim().split('\n'); let mut paragraphs: Vec> = Vec::new(); for line in lines { let mut elements: Vec = Vec::new(); let mut last_end = 0; for cap in patterns.link_re.captures_iter(line) { // Text before this link let m = cap.get(0).unwrap(); let before = &line[last_end..m.start()]; if !before.is_empty() { elements.push(serde_json::json!({ "tag": "text", "text": before })); } let link_text = cap.get(1).map(|g| g.as_str()).unwrap_or(""); let link_href = cap.get(2).map(|g| g.as_str()).unwrap_or(""); elements.push(serde_json::json!({ "tag": "a", "text": link_text, "href": link_href })); last_end = m.end(); } // Remaining text after last link let remaining = &line[last_end..]; if !remaining.is_empty() { elements.push(serde_json::json!({ "tag": "text", "text": remaining })); } // Empty line → empty paragraph for spacing if elements.is_empty() { elements.push(serde_json::json!({ "tag": "text", "text": "" })); } paragraphs.push(elements); } let post_body = serde_json::json!({ "zh_cn": { "content": paragraphs } }); post_body.to_string() } /// Send an interactive card message to Feishu. async fn send_interactive_card( &self, receive_id: &str, receive_id_type: &str, card_content: &str, ) -> Result<(), ChannelError> { let token = self.get_tenant_access_token().await?; let resp = self.http_client .post(format!("{}/im/v1/messages?receive_id_type={}", FEISHU_API_BASE, receive_id_type)) .header("Content-Type", "application/json") .header("Authorization", format!("Bearer {}", token)) .json(&serde_json::json!({ "receive_id": receive_id, "msg_type": "interactive", "content": card_content })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Send interactive card HTTP error: {}", e)))?; #[derive(Deserialize)] struct SendResp { code: i32, msg: String, } let send_resp: SendResp = resp .json() .await .map_err(|e| ChannelError::Other(format!("Parse send interactive card response error: {}", e)))?; if send_resp.code != 0 { return Err(ChannelError::Other(format!( "Send interactive card failed: code={} msg={}", send_resp.code, send_resp.msg ))); } Ok(()) } } #[async_trait] impl Channel for FeishuChannel { fn name(&self) -> &str { "feishu" } /// Handle an inbound message: check for slash commands first, then publish to bus async fn handle_and_publish( &self, bus: &Arc, msg: &crate::bus::InboundMessage, ) -> Result<(), ChannelError> { // All messages (including slash commands) go through the normal inbound flow // SessionManager handles session creation/reuse internally bus.publish_inbound(msg.clone()).await?; Ok(()) } async fn start(&self, bus: Arc) -> Result<(), ChannelError> { if self.config.app_id.is_empty() || self.config.app_secret.is_empty() { return Err(ChannelError::ConfigError( "Feishu app_id or app_secret is not configured".to_string() )); } *self.running.write().await = true; let (shutdown_tx, _) = broadcast::channel(1); *self.shutdown_tx.write().await = Some(shutdown_tx.clone()); let channel = self.clone(); let bus = bus.clone(); tokio::spawn(async move { let mut consecutive_failures = 0; let max_failures = 3; loop { if !*channel.running.read().await { break; } let shutdown_rx = shutdown_tx.subscribe(); match channel.run_ws_loop(bus.clone(), shutdown_rx).await { Ok(_) => { tracing::info!("Feishu WebSocket disconnected"); } Err(e) => { consecutive_failures += 1; tracing::error!(attempt = consecutive_failures, error = %e, "Feishu WebSocket error"); if consecutive_failures >= max_failures { tracing::error!("Feishu channel: max failures reached, stopping"); break; } } } if !*channel.running.read().await { break; } tracing::info!("Feishu channel retrying in 5s..."); tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } *channel.running.write().await = false; tracing::info!("Feishu channel stopped"); }); tracing::info!("Feishu channel started"); Ok(()) } async fn stop(&self) -> Result<(), ChannelError> { *self.running.write().await = false; if let Some(tx) = self.shutdown_tx.write().await.take() { let _ = tx.send(()); } Ok(()) } fn is_running(&self) -> bool { self.running.try_read().map(|r| *r).unwrap_or(false) } async fn send(&self, msg: OutboundMessage) -> Result<(), ChannelError> { let receive_id = if msg.chat_id.starts_with("oc_") { &msg.chat_id } else { &msg.reply_to.as_ref().unwrap_or(&msg.chat_id) }; let receive_id_type = if msg.chat_id.starts_with("oc_") { "chat_id" } else { "open_id" }; // If no media, use smart format detection if msg.media.is_empty() { let content = msg.content.trim(); // Empty content if content.is_empty() { self.remove_reaction_from_metadata(&msg.metadata).await; return Ok(()); } let fmt = Self::detect_msg_format(content); match fmt { MsgFormat::Text => { // Short plain text – send as simple text message let result = self.send_message_to_feishu(receive_id, receive_id_type, "text", content).await; self.remove_reaction_from_metadata(&msg.metadata).await; return result; } MsgFormat::Post => { // Medium content with links – send as rich-text post let post_body = Self::markdown_to_post(content); let result = self.send_message_to_feishu(receive_id, receive_id_type, "post", &post_body).await; self.remove_reaction_from_metadata(&msg.metadata).await; return result; } MsgFormat::Interactive => { // Complex / long content – send as interactive card let elements = Self::build_card_elements(content); for chunk in Self::split_elements_by_table_limit(&elements) { let card = serde_json::json!({ "config": { "wide_screen_mode": true }, "elements": chunk }); if let Err(e) = self.send_interactive_card(receive_id, receive_id_type, &card.to_string()).await { tracing::warn!(error = %e, "Failed to send interactive card, falling back to text"); // Fallback to plain text let result = self.send_message_to_feishu(receive_id, receive_id_type, "text", content).await; self.remove_reaction_from_metadata(&msg.metadata).await; return result; } } self.remove_reaction_from_metadata(&msg.metadata).await; return Ok(()); } } } // Handle multimodal message - send with media let token = self.get_tenant_access_token().await?; // Build content with media references let mut content_parts = Vec::new(); // Add text content if present (truncate if too long for Feishu) if !msg.content.is_empty() { const MAX_TEXT_LENGTH: usize = 60_000; let truncated_text = if msg.content.len() > MAX_TEXT_LENGTH { format!("{}...\n\n[Content truncated due to length limit]", &msg.content[..MAX_TEXT_LENGTH]) } else { msg.content.clone() }; content_parts.push(serde_json::json!({ "tag": "text", "text": truncated_text })); } // Upload and add media for media_item in &msg.media { let path = &media_item.path; match media_item.media_type.as_str() { "image" => { match self.upload_image(path).await { Ok(image_key) => { content_parts.push(serde_json::json!({ "tag": "image", "image_key": image_key })); } Err(e) => { tracing::warn!(error = %e, path = %path, "Failed to upload image"); } } } "audio" | "file" | "video" => { match self.upload_file(path).await { Ok(file_key) => { content_parts.push(serde_json::json!({ "tag": "file", "file_key": file_key })); } Err(e) => { tracing::warn!(error = %e, path = %path, "Failed to upload file"); } } } _ => { tracing::warn!(media_type = %media_item.media_type, "Unsupported media type for sending"); } } } // If no content parts after processing, just send empty text if content_parts.is_empty() { let result = self.send_message_to_feishu(receive_id, receive_id_type, "text", "").await; // Remove pending reaction after sending (using metadata propagated from inbound) self.remove_reaction_from_metadata(&msg.metadata).await; return result; } // Determine message type let has_image = msg.media.iter().any(|m| m.media_type == "image"); let msg_type = if has_image && msg.content.is_empty() { "image" } else { "post" }; let content = serde_json::json!({ "content": content_parts }).to_string(); let resp = self.http_client .post(format!("{}/im/v1/messages?receive_id_type={}", FEISHU_API_BASE, receive_id_type)) .header("Content-Type", "application/json") .header("Authorization", format!("Bearer {}", token)) .json(&serde_json::json!({ "receive_id": receive_id, "msg_type": msg_type, "content": content })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Send multimodal message HTTP error: {}", e)))?; #[derive(Deserialize)] struct SendResp { code: i32, msg: String, } let send_resp: SendResp = resp.json().await .map_err(|e| ChannelError::Other(format!("Parse send response error: {}", e)))?; if send_resp.code != 0 { return Err(ChannelError::Other(format!("Send multimodal message failed: code={} msg={}", send_resp.code, send_resp.msg))); } // Remove pending reaction after successfully sending self.remove_reaction_from_metadata(&msg.metadata).await; Ok(()) } }