use std::path::Path; use std::sync::Arc; use async_trait::async_trait; use tokio::sync::{broadcast, RwLock}; use serde::Deserialize; use futures_util::{SinkExt, StreamExt}; use prost::{Message as ProstMessage, bytes::Bytes}; use crate::bus::{MessageBus, MediaItem, OutboundMessage}; use crate::channels::base::{Channel, ChannelError}; use crate::config::{FeishuChannelConfig, LLMProviderConfig}; const FEISHU_API_BASE: &str = "https://open.feishu.cn/open-apis"; const FEISHU_WS_BASE: &str = "https://open.feishu.cn"; // ───────────────────────────────────────────────────────────────────────────── // Protobuf types for Feishu WebSocket protocol (pbbp2.proto) // ───────────────────────────────────────────────────────────────────────────── #[derive(Clone, PartialEq, prost::Message)] struct PbHeader { #[prost(string, tag = "1")] pub key: String, #[prost(string, tag = "2")] pub value: String, } /// Feishu WS frame. /// method=0 → CONTROL (ping/pong) method=1 → DATA (events) #[derive(Clone, PartialEq, prost::Message)] struct PbFrame { #[prost(uint64, tag = "1")] pub seq_id: u64, #[prost(uint64, tag = "2")] pub log_id: u64, #[prost(int32, tag = "3")] pub service: i32, #[prost(int32, tag = "4")] pub method: i32, #[prost(message, repeated, tag = "5")] pub headers: Vec, #[prost(bytes = "vec", optional, tag = "8")] pub payload: Option>, } /// POST /callback/ws/endpoint response #[derive(Deserialize)] struct WsEndpointResp { code: i32, msg: Option, data: Option, } #[derive(Deserialize)] struct WsEndpoint { #[serde(rename = "URL")] url: String, #[serde(default)] client_config: Option, } #[derive(Deserialize, Default)] struct WsClientConfig { #[serde(rename = "PingInterval")] ping_interval: Option, } /// Lark event envelope (method=1 / type=event payload) #[derive(Deserialize)] struct LarkEvent { header: LarkEventHeader, event: serde_json::Value, } #[derive(Deserialize)] struct LarkEventHeader { event_type: String, #[allow(dead_code)] event_id: String, } impl std::fmt::Debug for LarkEventHeader { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("LarkEventHeader") .field("event_type", &self.event_type) .field("event_id", &self.event_id) .finish() } } #[derive(Deserialize)] struct MsgReceivePayload { sender: LarkSender, message: LarkMessage, } #[derive(Deserialize)] struct LarkSender { sender_id: LarkSenderId, #[serde(default)] sender_type: String, } #[derive(Deserialize, Default)] struct LarkSenderId { open_id: Option, } #[derive(Deserialize)] #[allow(dead_code)] struct LarkMessage { message_id: String, chat_id: String, chat_type: String, message_type: String, #[serde(default)] content: String, #[serde(default)] mentions: Vec, #[serde(default)] root_id: Option, #[serde(default)] parent_id: Option, } // ───────────────────────────────────────────────────────────────────────────── #[derive(Clone)] pub struct FeishuChannel { config: FeishuChannelConfig, http_client: reqwest::Client, running: Arc>, shutdown_tx: Arc>>>, connected: Arc>, } /// Parsed message data from a Feishu frame struct ParsedMessage { open_id: String, chat_id: String, content: String, media: Option, } impl FeishuChannel { pub fn new( config: FeishuChannelConfig, _provider_config: LLMProviderConfig, ) -> Result { Ok(Self { config, http_client: reqwest::Client::new(), running: Arc::new(RwLock::new(false)), shutdown_tx: Arc::new(RwLock::new(None)), connected: Arc::new(RwLock::new(false)), }) } /// Get WebSocket endpoint URL from Feishu API async fn get_ws_endpoint(&self, client: &reqwest::Client) -> Result<(String, WsClientConfig), ChannelError> { let resp = client .post(format!("{}/callback/ws/endpoint", FEISHU_WS_BASE)) .header("locale", "zh") .json(&serde_json::json!({ "AppID": self.config.app_id, "AppSecret": self.config.app_secret, })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("HTTP error: {}", e)))?; let endpoint_resp: WsEndpointResp = resp .json() .await .map_err(|e| ChannelError::ConnectionError(format!("Failed to parse endpoint response: {}", e)))?; if endpoint_resp.code != 0 { return Err(ChannelError::ConnectionError(format!( "WS endpoint failed: code={} msg={}", endpoint_resp.code, endpoint_resp.msg.as_deref().unwrap_or("unknown") ))); } let ep = endpoint_resp.data .ok_or_else(|| ChannelError::ConnectionError("Empty endpoint data".to_string()))?; let client_config = ep.client_config.unwrap_or_default(); Ok((ep.url, client_config)) } /// Get tenant access token async fn get_tenant_token(&self) -> Result { let resp = self.http_client .post(format!("{}/auth/v3/tenant_access_token/internal", FEISHU_API_BASE)) .header("Content-Type", "application/json") .json(&serde_json::json!({ "app_id": self.config.app_id, "app_secret": self.config.app_secret, })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("HTTP error: {}", e)))?; #[derive(Deserialize)] struct TokenResponse { code: i32, tenant_access_token: Option, } let token_resp: TokenResponse = resp .json() .await .map_err(|e| ChannelError::Other(format!("Failed to parse token response: {}", e)))?; if token_resp.code != 0 { return Err(ChannelError::Other("Auth failed".to_string())); } token_resp.tenant_access_token .ok_or_else(|| ChannelError::Other("No token in response".to_string())) } /// Download media and save locally, return (description, media_item) async fn download_media( &self, msg_type: &str, content_json: &serde_json::Value, message_id: &str, ) -> Result<(String, Option), ChannelError> { let media_dir = Path::new(&self.config.media_dir); tokio::fs::create_dir_all(media_dir).await .map_err(|e| ChannelError::Other(format!("Failed to create media dir: {}", e)))?; match msg_type { "image" => self.download_image(content_json, message_id, media_dir).await, "audio" | "file" | "media" => self.download_file(content_json, message_id, media_dir, msg_type).await, _ => Ok((format!("[unsupported media type: {}]", msg_type), None)), } } /// Download image from Feishu async fn download_image( &self, content_json: &serde_json::Value, message_id: &str, media_dir: &Path, ) -> Result<(String, Option), ChannelError> { let image_key = content_json.get("image_key") .and_then(|v| v.as_str()) .ok_or_else(|| ChannelError::Other("No image_key in message".to_string()))?; let token = self.get_tenant_token().await?; // Use message resource API for downloading message images let url = format!("{}/im/v1/messages/{}/resources/{}?type=image", FEISHU_API_BASE, message_id, image_key); #[cfg(debug_assertions)] tracing::debug!(url = %url, image_key = %image_key, message_id = %message_id, "Downloading image from Feishu via message resource API"); let resp = self.http_client .get(&url) .header("Authorization", format!("Bearer {}", token)) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Download image HTTP error: {}", e)))?; let status = resp.status(); #[cfg(debug_assertions)] tracing::debug!(status = %status, "Image download response status"); if !status.is_success() { let error_text = resp.text().await.unwrap_or_default(); return Err(ChannelError::Other(format!("Image download failed {}: {}", status, error_text))); } let data = resp.bytes().await .map_err(|e| ChannelError::Other(format!("Failed to read image data: {}", e)))? .to_vec(); #[cfg(debug_assertions)] tracing::debug!(data_len = %data.len(), "Downloaded image data"); let filename = format!("{}_{}.jpg", message_id, &image_key[..8.min(image_key.len())]); let file_path = media_dir.join(&filename); tokio::fs::write(&file_path, &data).await .map_err(|e| ChannelError::Other(format!("Failed to write image: {}", e)))?; let media_item = MediaItem::new( file_path.to_string_lossy().to_string(), "image", ); tracing::info!(message_id = %message_id, filename = %filename, "Downloaded image"); Ok((format!("[image: {}]", filename), Some(media_item))) } /// Download file/audio from Feishu async fn download_file( &self, content_json: &serde_json::Value, message_id: &str, media_dir: &Path, file_type: &str, ) -> Result<(String, Option), ChannelError> { let file_key = content_json.get("file_key") .and_then(|v| v.as_str()) .ok_or_else(|| ChannelError::Other("No file_key in message".to_string()))?; let token = self.get_tenant_token().await?; // Use message resource API for downloading message files let url = format!("{}/im/v1/messages/{}/resources/{}?type=file", FEISHU_API_BASE, message_id, file_key); #[cfg(debug_assertions)] tracing::debug!(url = %url, file_key = %file_key, message_id = %message_id, "Downloading file from Feishu via message resource API"); let resp = self.http_client .get(&url) .header("Authorization", format!("Bearer {}", token)) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Download file HTTP error: {}", e)))?; let status = resp.status(); if !status.is_success() { let error_text = resp.text().await.unwrap_or_default(); return Err(ChannelError::Other(format!("File download failed {}: {}", status, error_text))); } let data = resp.bytes().await .map_err(|e| ChannelError::Other(format!("Failed to read file data: {}", e)))? .to_vec(); let extension = match file_type { "audio" => "mp3", "video" => "mp4", _ => "bin", }; let filename = format!("{}_{}.{}", message_id, &file_key[..8.min(file_key.len())], extension); let file_path = media_dir.join(&filename); tokio::fs::write(&file_path, &data).await .map_err(|e| ChannelError::Other(format!("Failed to write file: {}", e)))?; let media_item = MediaItem::new( file_path.to_string_lossy().to_string(), file_type, ); tracing::info!(message_id = %message_id, filename = %filename, file_type = %file_type, "Downloaded file"); Ok((format!("[{}: {}]", file_type, filename), Some(media_item))) } /// Upload image to Feishu and return the image_key async fn upload_image(&self, file_path: &str) -> Result { let token = self.get_tenant_token().await?; let mime = mime_guess::from_path(file_path) .first_or_octet_stream() .to_string(); let file_name = std::path::Path::new(file_path) .file_name() .and_then(|n| n.to_str()) .unwrap_or("image.jpg"); let file_data = tokio::fs::read(file_path).await .map_err(|e| ChannelError::Other(format!("Failed to read file: {}", e)))?; let part = reqwest::multipart::Part::bytes(file_data) .file_name(file_name.to_string()) .mime_str(&mime) .map_err(|e| ChannelError::Other(format!("Invalid mime type: {}", e)))?; let form = reqwest::multipart::Form::new() .text("image_type", "message".to_string()) .part("image", part); let resp = self.http_client .post(format!("{}/im/v1/images/upload", FEISHU_API_BASE)) .header("Authorization", format!("Bearer {}", token)) .multipart(form) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Upload image HTTP error: {}", e)))?; #[derive(Deserialize)] struct UploadResp { code: i32, msg: Option, data: Option, } #[derive(Deserialize)] struct UploadData { image_key: String, } let result: UploadResp = resp.json().await .map_err(|e| ChannelError::Other(format!("Parse upload response error: {}", e)))?; if result.code != 0 { return Err(ChannelError::Other(format!( "Upload image failed: code={} msg={}", result.code, result.msg.as_deref().unwrap_or("unknown") ))); } result.data .map(|d| d.image_key) .ok_or_else(|| ChannelError::Other("No image_key in response".to_string())) } /// Upload file to Feishu and return the file_key async fn upload_file(&self, file_path: &str) -> Result { let token = self.get_tenant_token().await?; let file_name = std::path::Path::new(file_path) .file_name() .and_then(|n| n.to_str()) .unwrap_or("file.bin"); let extension = std::path::Path::new(file_path) .extension() .and_then(|e| e.to_str()) .unwrap_or("") .to_lowercase(); let file_type = match extension.as_str() { "mp3" | "m4a" | "wav" | "ogg" => "audio", "mp4" | "mov" | "avi" | "mkv" => "video", "pdf" | "doc" | "docx" | "xls" | "xlsx" => "doc", _ => "file", }; let file_data = tokio::fs::read(file_path).await .map_err(|e| ChannelError::Other(format!("Failed to read file: {}", e)))?; let part = reqwest::multipart::Part::bytes(file_data) .file_name(file_name.to_string()) .mime_str("application/octet-stream") .map_err(|e| ChannelError::Other(format!("Invalid mime type: {}", e)))?; let form = reqwest::multipart::Form::new() .text("file_type", file_type.to_string()) .text("file_name", file_name.to_string()) .part("file", part); let resp = self.http_client .post(format!("{}/im/v1/files", FEISHU_API_BASE)) .header("Authorization", format!("Bearer {}", token)) .multipart(form) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Upload file HTTP error: {}", e)))?; #[derive(Deserialize)] struct UploadResp { code: i32, msg: Option, data: Option, } #[derive(Deserialize)] struct UploadData { file_key: String, } let result: UploadResp = resp.json().await .map_err(|e| ChannelError::Other(format!("Parse upload response error: {}", e)))?; if result.code != 0 { return Err(ChannelError::Other(format!( "Upload file failed: code={} msg={}", result.code, result.msg.as_deref().unwrap_or("unknown") ))); } result.data .map(|d| d.file_key) .ok_or_else(|| ChannelError::Other("No file_key in response".to_string())) } /// Send a text message to Feishu chat (implements Channel trait) async fn send_message_to_feishu(&self, receive_id: &str, receive_id_type: &str, content: &str) -> Result<(), ChannelError> { let token = self.get_tenant_token().await?; let text_content = serde_json::json!({ "text": content }).to_string(); let resp = self.http_client .post(format!("{}/im/v1/messages?receive_id_type={}", FEISHU_API_BASE, receive_id_type)) .header("Content-Type", "application/json") .header("Authorization", format!("Bearer {}", token)) .json(&serde_json::json!({ "receive_id": receive_id, "msg_type": "text", "content": text_content })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Send message HTTP error: {}", e)))?; #[derive(Deserialize)] struct SendResp { code: i32, msg: String, } let send_resp: SendResp = resp .json() .await .map_err(|e| ChannelError::Other(format!("Parse send response error: {}", e)))?; if send_resp.code != 0 { return Err(ChannelError::Other(format!("Send message failed: code={} msg={}", send_resp.code, send_resp.msg))); } Ok(()) } /// Extract service_id from WebSocket URL query params fn extract_service_id(url: &str) -> i32 { url.split('?') .nth(1) .and_then(|qs| { qs.split('&') .find(|kv| kv.starts_with("service_id=")) .and_then(|kv| kv.split('=').nth(1)) .and_then(|v| v.parse::().ok()) }) .unwrap_or(0) } /// Handle incoming binary PbFrame - returns Some(ParsedMessage) if we need to ack async fn handle_frame(&self, frame: &PbFrame) -> Result, ChannelError> { // method 0 = CONTROL (ping/pong) if frame.method == 0 { return Ok(None); } // method 1 = DATA (events) if frame.method != 1 { return Ok(None); } let payload = frame.payload.as_deref() .ok_or_else(|| ChannelError::Other("No payload in frame".to_string()))?; #[cfg(debug_assertions)] tracing::debug!(payload_len = %payload.len(), "Received frame payload"); let event: LarkEvent = serde_json::from_slice(payload) .map_err(|e| ChannelError::Other(format!("Parse event error: {}", e)))?; let event_type = event.header.event_type.as_str(); #[cfg(debug_assertions)] tracing::debug!(event_type = %event_type, "Received event type"); if event_type != "im.message.receive_v1" { return Ok(None); } let payload_data: MsgReceivePayload = serde_json::from_value(event.event.clone()) .map_err(|e| ChannelError::Other(format!("Parse payload error: {}", e)))?; // Skip bot messages if payload_data.sender.sender_type == "bot" { return Ok(None); } let message_id = payload_data.message.message_id.clone(); #[cfg(debug_assertions)] tracing::debug!(message_id = %message_id, "Received Feishu message"); let open_id = payload_data.sender.sender_id.open_id .ok_or_else(|| ChannelError::Other("No open_id".to_string()))?; let msg = payload_data.message; let chat_id = msg.chat_id.clone(); let msg_type = msg.message_type.as_str(); let raw_content = msg.content.clone(); #[cfg(debug_assertions)] tracing::debug!(msg_type = %msg_type, chat_id = %chat_id, open_id = %open_id, "Parsing message content"); let (content, media) = self.parse_and_download_message(msg_type, &raw_content, &message_id).await?; #[cfg(debug_assertions)] if let Some(ref m) = media { tracing::debug!(media_type = %m.media_type, media_path = %m.path, "Media downloaded successfully"); } Ok(Some(ParsedMessage { open_id, chat_id, content, media, })) } /// Parse message content and download media if needed async fn parse_and_download_message( &self, msg_type: &str, content: &str, message_id: &str, ) -> Result<(String, Option), ChannelError> { match msg_type { "text" => { let text = if let Ok(parsed) = serde_json::from_str::(content) { parsed.get("text").and_then(|v| v.as_str()).unwrap_or(content).to_string() } else { content.to_string() }; Ok((text, None)) } "post" => { let text = parse_post_content(content); Ok((text, None)) } "image" | "audio" | "file" | "media" => { if let Ok(content_json) = serde_json::from_str::(content) { self.download_media(msg_type, &content_json, message_id).await } else { Ok((format!("[{}: content unavailable]", msg_type), None)) } } _ => Ok((content.to_string(), None)), } } /// Send acknowledgment for a message async fn send_ack(frame: &PbFrame, write: &mut futures_util::stream::SplitSink>, tokio_tungstenite::tungstenite::Message>) -> Result<(), ChannelError> { let mut ack = frame.clone(); ack.payload = Some(br#"{"code":200,"headers":{},"data":[]}"#.to_vec()); ack.headers.push(PbHeader { key: "biz_rt".into(), value: "0".into(), }); write.send(tokio_tungstenite::tungstenite::Message::Binary(ack.encode_to_vec().into())) .await .map_err(|e| ChannelError::Other(format!("Failed to send ack: {}", e)))?; Ok(()) } async fn run_ws_loop(&self, bus: Arc, mut shutdown_rx: broadcast::Receiver<()>) -> Result<(), ChannelError> { let (wss_url, client_config) = self.get_ws_endpoint(&self.http_client).await?; let service_id = Self::extract_service_id(&wss_url); tracing::info!(url = %wss_url, "Connecting to Feishu WebSocket"); let (ws_stream, _) = tokio_tungstenite::connect_async(&wss_url) .await .map_err(|e| ChannelError::ConnectionError(format!("WebSocket connection failed: {}", e)))?; *self.connected.write().await = true; tracing::info!("Feishu WebSocket connected"); let (mut write, mut read) = ws_stream.split(); // Send initial ping let ping_frame = PbFrame { seq_id: 1, log_id: 0, service: service_id, method: 0, headers: vec![PbHeader { key: "type".into(), value: "ping".into(), }], payload: None, }; write.send(tokio_tungstenite::tungstenite::Message::Binary(ping_frame.encode_to_vec().into())) .await .map_err(|e| ChannelError::ConnectionError(format!("Failed to send initial ping: {}", e)))?; let ping_interval = client_config.ping_interval.unwrap_or(120).max(10); let mut ping_interval_tok = tokio::time::interval(tokio::time::Duration::from_secs(ping_interval)); let mut seq: u64 = 1; // Consume the immediate tick ping_interval_tok.tick().await; loop { tokio::select! { msg = read.next() => { match msg { Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(data))) => { let bytes: Bytes = data; if let Ok(frame) = PbFrame::decode(bytes.as_ref()) { match self.handle_frame(&frame).await { Ok(Some(parsed)) => { // Send ACK immediately (Feishu requires within 3 s) if let Err(e) = Self::send_ack(&frame, &mut write).await { tracing::error!(error = %e, "Failed to send ACK to Feishu"); } // Publish to bus asynchronously let channel = self.clone(); let bus = bus.clone(); tokio::spawn(async move { let media_count = if parsed.media.is_some() { 1 } else { 0 }; #[cfg(debug_assertions)] tracing::debug!(open_id = %parsed.open_id, chat_id = %parsed.chat_id, content_len = %parsed.content.len(), media_count = %media_count, "Publishing message to bus"); let msg = crate::bus::InboundMessage { channel: "feishu".to_string(), sender_id: parsed.open_id.clone(), chat_id: parsed.chat_id.clone(), content: parsed.content.clone(), timestamp: std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_millis() as i64, media: parsed.media.map(|m| vec![m]).unwrap_or_default(), metadata: std::collections::HashMap::new(), }; if let Err(e) = channel.handle_and_publish(&bus, &msg).await { tracing::error!(error = %e, open_id = %parsed.open_id, chat_id = %parsed.chat_id, "Failed to publish Feishu message to bus"); } else { #[cfg(debug_assertions)] tracing::debug!(open_id = %parsed.open_id, chat_id = %parsed.chat_id, "Message published to bus successfully"); } }); } Ok(None) => {} Err(e) => { tracing::warn!(error = %e, "Failed to parse Feishu frame"); } } } } Some(Ok(tokio_tungstenite::tungstenite::Message::Ping(data))) => { let pong = PbFrame { seq_id: seq.wrapping_add(1), log_id: 0, service: service_id, method: 0, headers: vec![PbHeader { key: "type".into(), value: "pong".into(), }], payload: Some(data.to_vec()), }; let _ = write.send(tokio_tungstenite::tungstenite::Message::Binary(pong.encode_to_vec().into())).await; } Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => { #[cfg(debug_assertions)] tracing::debug!("Feishu WebSocket closed"); break; } Some(Err(e)) => { tracing::warn!(error = %e, "Feishu WebSocket error"); break; } _ => {} } } _ = ping_interval_tok.tick() => { seq = seq.wrapping_add(1); let ping = PbFrame { seq_id: seq, log_id: 0, service: service_id, method: 0, headers: vec![PbHeader { key: "type".into(), value: "ping".into(), }], payload: None, }; if write.send(tokio_tungstenite::tungstenite::Message::Binary(ping.encode_to_vec().into())).await.is_err() { tracing::warn!("Feishu ping failed, reconnecting"); break; } } _ = shutdown_rx.recv() => { tracing::info!("Feishu channel shutdown signal received"); break; } } } *self.connected.write().await = false; Ok(()) } } fn parse_post_content(content: &str) -> String { if let Ok(parsed) = serde_json::from_str::(content) { let mut texts = vec![]; if let Some(post) = parsed.get("post") { if let Some(content_arr) = post.get("content") { if let Some(arr) = content_arr.as_array() { for item in arr { if let Some(arr2) = item.as_array() { for inner in arr2 { if let Some(text) = inner.get("text").and_then(|v| v.as_str()) { texts.push(text.to_string()); } } } } } } } if texts.is_empty() { content.to_string() } else { texts.join("") } } else { content.to_string() } } #[async_trait] impl Channel for FeishuChannel { fn name(&self) -> &str { "feishu" } async fn start(&self, bus: Arc) -> Result<(), ChannelError> { if self.config.app_id.is_empty() || self.config.app_secret.is_empty() { return Err(ChannelError::ConfigError( "Feishu app_id or app_secret is not configured".to_string() )); } *self.running.write().await = true; let (shutdown_tx, _) = broadcast::channel(1); *self.shutdown_tx.write().await = Some(shutdown_tx.clone()); let channel = self.clone(); let bus = bus.clone(); tokio::spawn(async move { let mut consecutive_failures = 0; let max_failures = 3; loop { if !*channel.running.read().await { break; } let shutdown_rx = shutdown_tx.subscribe(); match channel.run_ws_loop(bus.clone(), shutdown_rx).await { Ok(_) => { tracing::info!("Feishu WebSocket disconnected"); } Err(e) => { consecutive_failures += 1; tracing::error!(attempt = consecutive_failures, error = %e, "Feishu WebSocket error"); if consecutive_failures >= max_failures { tracing::error!("Feishu channel: max failures reached, stopping"); break; } } } if !*channel.running.read().await { break; } tracing::info!("Feishu channel retrying in 5s..."); tokio::time::sleep(tokio::time::Duration::from_secs(5)).await; } *channel.running.write().await = false; tracing::info!("Feishu channel stopped"); }); tracing::info!("Feishu channel started"); Ok(()) } async fn stop(&self) -> Result<(), ChannelError> { *self.running.write().await = false; if let Some(tx) = self.shutdown_tx.write().await.take() { let _ = tx.send(()); } Ok(()) } fn is_running(&self) -> bool { self.running.try_read().map(|r| *r).unwrap_or(false) } async fn send(&self, msg: OutboundMessage) -> Result<(), ChannelError> { let receive_id = if msg.chat_id.starts_with("oc_") { &msg.chat_id } else { &msg.reply_to.as_ref().unwrap_or(&msg.chat_id) }; let receive_id_type = if msg.chat_id.starts_with("oc_") { "chat_id" } else { "open_id" }; // If no media, send text only if msg.media.is_empty() { return self.send_message_to_feishu(receive_id, receive_id_type, &msg.content).await; } // Handle multimodal message - send with media let token = self.get_tenant_token().await?; // Build content with media references let mut content_parts = Vec::new(); // Add text content if present if !msg.content.is_empty() { content_parts.push(serde_json::json!({ "tag": "text", "text": msg.content })); } // Upload and add media for media_item in &msg.media { let path = &media_item.path; match media_item.media_type.as_str() { "image" => { match self.upload_image(path).await { Ok(image_key) => { content_parts.push(serde_json::json!({ "tag": "image", "image_key": image_key })); } Err(e) => { tracing::warn!(error = %e, path = %path, "Failed to upload image"); } } } "audio" | "file" | "video" => { match self.upload_file(path).await { Ok(file_key) => { content_parts.push(serde_json::json!({ "tag": "file", "file_key": file_key })); } Err(e) => { tracing::warn!(error = %e, path = %path, "Failed to upload file"); } } } _ => { tracing::warn!(media_type = %media_item.media_type, "Unsupported media type for sending"); } } } // If no content parts after processing, just send empty text if content_parts.is_empty() { return self.send_message_to_feishu(receive_id, receive_id_type, "").await; } // Determine message type let has_image = msg.media.iter().any(|m| m.media_type == "image"); let msg_type = if has_image && msg.content.is_empty() { "image" } else { "post" }; let content = serde_json::json!({ "content": content_parts }).to_string(); let resp = self.http_client .post(format!("{}/im/v1/messages?receive_id_type={}", FEISHU_API_BASE, receive_id_type)) .header("Content-Type", "application/json") .header("Authorization", format!("Bearer {}", token)) .json(&serde_json::json!({ "receive_id": receive_id, "msg_type": msg_type, "content": content })) .send() .await .map_err(|e| ChannelError::ConnectionError(format!("Send multimodal message HTTP error: {}", e)))?; #[derive(Deserialize)] struct SendResp { code: i32, msg: String, } let send_resp: SendResp = resp.json().await .map_err(|e| ChannelError::Other(format!("Parse send response error: {}", e)))?; if send_resp.code != 0 { return Err(ChannelError::Other(format!("Send multimodal message failed: code={} msg={}", send_resp.code, send_resp.msg))); } Ok(()) } }