PicoBot/src/channels/feishu.rs

2547 lines
90 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures_util::{SinkExt, StreamExt};
use prost::{Message as ProstMessage, bytes::Bytes};
use regex::Regex;
use serde::Deserialize;
use tokio::sync::{RwLock, broadcast};
use crate::bus::{MediaItem, MessageBus, OutboundMessage};
use crate::bus::message::OutboundEventKind;
use crate::channels::base::{Channel, ChannelError};
use crate::config::{FeishuChannelConfig, LLMProviderConfig};
use crate::text::{char_count, truncate_with_ellipsis};
/// Base URL for the Feishu Open Platform REST endpoints used in this file
/// (tenant token, messages, reactions, image and file upload/download).
const FEISHU_API_BASE: &str = "https://open.feishu.cn/open-apis";
/// Host that serves the `/callback/ws/endpoint` request used to obtain the WebSocket URL.
const FEISHU_WS_BASE: &str = "https://open.feishu.cn";
/// Heartbeat timeout for WS connection — must be larger than ping_interval (default 120 s).
/// If no binary frame (pong or event) is received within this window, reconnect.
const WS_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(300);
/// Refresh tenant token this many seconds before the announced expiry.
const TOKEN_REFRESH_SKEW: Duration = Duration::from_secs(120);
/// Default tenant token TTL (2 hours), used when the token response carries no usable `expire` value.
const DEFAULT_TOKEN_TTL: Duration = Duration::from_secs(7200);
/// Dedup cache TTL (30 minutes).
const DEDUP_CACHE_TTL: Duration = Duration::from_secs(30 * 60);
// ─────────────────────────────────────────────────────────────────────────────
// Protobuf types for Feishu WebSocket protocol (pbbp2.proto)
// ─────────────────────────────────────────────────────────────────────────────
/// Key/value header entry carried inside a [`PbFrame`].
#[derive(Clone, PartialEq, prost::Message)]
struct PbHeader {
/// Header name (protobuf field 1). This file writes the keys `"type"` and `"biz_rt"`.
#[prost(string, tag = "1")]
pub key: String,
/// Header value (protobuf field 2), e.g. `"ping"` / `"pong"` for the `"type"` key.
#[prost(string, tag = "2")]
pub value: String,
}
/// Feishu WS frame.
/// method=0 → CONTROL (ping/pong) method=1 → DATA (events)
#[derive(Clone, PartialEq, prost::Message)]
struct PbFrame {
/// Sequence number of the frame (protobuf field 1).
#[prost(uint64, tag = "1")]
pub seq_id: u64,
/// Log/trace identifier (protobuf field 2); this file always sends 0.
#[prost(uint64, tag = "2")]
pub log_id: u64,
/// Service id; outgoing frames use the `service_id` query parameter of the WebSocket URL.
#[prost(int32, tag = "3")]
pub service: i32,
/// Frame kind: 0 = control (ping/pong), 1 = data (events).
#[prost(int32, tag = "4")]
pub method: i32,
/// Header list, e.g. `type=ping`; an ack additionally carries `biz_rt`.
#[prost(message, repeated, tag = "5")]
pub headers: Vec<PbHeader>,
/// Optional body (protobuf field 8). For data frames it is parsed as a JSON event envelope.
#[prost(bytes = "vec", optional, tag = "8")]
pub payload: Option<Vec<u8>>,
}
/// POST /callback/ws/endpoint response
#[derive(Deserialize)]
struct WsEndpointResp {
/// Status code; any non-zero value is treated as a failure by the caller.
code: i32,
/// Optional human-readable message, included in the error when `code` is non-zero.
msg: Option<String>,
/// Endpoint details; the caller reports an error when this is missing.
data: Option<WsEndpoint>,
}
/// `data` object of the WS endpoint response.
#[derive(Deserialize)]
struct WsEndpoint {
    /// WebSocket URL to connect to (JSON key `URL`).
    #[serde(rename = "URL")]
    url: String,
    /// Optional client tuning parameters.
    ///
    /// The endpoint response uses PascalCase keys (`URL`, `PingInterval`), so the
    /// object is read from `ClientConfig`. Without the rename the field never
    /// matched and the server-provided ping interval was silently ignored.
    /// The former snake_case key is still accepted as an alias.
    #[serde(rename = "ClientConfig", alias = "client_config", default)]
    client_config: Option<WsClientConfig>,
}
/// Client tuning parameters returned together with the WebSocket URL.
#[derive(Deserialize, Default)]
struct WsClientConfig {
/// Ping interval in seconds (JSON key `PingInterval`); the WS loop falls back to 120
/// when absent and never goes below 10.
#[serde(rename = "PingInterval")]
ping_interval: Option<u64>,
}
/// Lark event envelope (method=1 / type=event payload)
#[derive(Deserialize)]
struct LarkEvent {
/// Event metadata; `event_type` decides whether the event is processed.
header: LarkEventHeader,
/// Event body, kept as raw JSON until the event type is known.
event: serde_json::Value,
}
/// Header of a Lark event envelope.
///
/// `Debug` is derived; the output is the same as the previous hand-written
/// implementation (a struct with `event_type` followed by `event_id`).
#[derive(Debug, Deserialize)]
struct LarkEventHeader {
    /// Event type string, e.g. `im.message.receive_v1`.
    event_type: String,
    /// Event identifier; only read through `Debug` formatting in this file.
    #[allow(dead_code)]
    event_id: String,
}
/// Body of an `im.message.receive_v1` event.
#[derive(Deserialize)]
struct MsgReceivePayload {
/// Who sent the message.
sender: LarkSender,
/// The message itself.
message: LarkMessage,
}
/// Sender section of a received-message event.
#[derive(Deserialize)]
struct LarkSender {
/// Identifiers of the sender.
sender_id: LarkSenderId,
/// Sender category; messages whose value is `"bot"` are skipped. Empty when absent.
#[serde(default)]
sender_type: String,
}
/// Identifier set of a sender; only `open_id` is read here.
#[derive(Deserialize, Default)]
struct LarkSenderId {
/// Open ID of the sender; frame handling fails with "No open_id" when it is missing.
open_id: Option<String>,
}
/// Message section of a received-message event.
#[derive(Deserialize)]
#[allow(dead_code)]
struct LarkMessage {
/// Message identifier; used for dedup, resource download and reactions.
message_id: String,
/// Chat the message was posted in.
chat_id: String,
/// Chat kind as reported by Feishu (presumably "p2p" / "group" — TODO confirm).
chat_type: String,
/// Message type (`text`, `post`, `image`, ...); selects how `content` is parsed.
message_type: String,
/// Message content as a JSON-encoded string; empty when absent.
#[serde(default)]
content: String,
/// Raw mention entries; not interpreted in the code visible here.
#[serde(default)]
mentions: Vec<serde_json::Value>,
/// Root message id of the thread, if any.
#[serde(default)]
root_id: Option<String>,
/// Id of the message being replied to, if any; used to fetch reply context.
#[serde(default)]
parent_id: Option<String>,
}
// ─────────────────────────────────────────────────────────────────────────────
/// Cached tenant token with proactive refresh metadata.
#[derive(Clone)]
struct CachedTenantToken {
/// The tenant access token string sent as a Bearer credential.
value: String,
/// Instant after which the cached token is no longer served and a new one is fetched.
refresh_after: Instant,
}
/// Feishu channel handle.
///
/// Cloning is cheap for the shared state: every mutable member lives behind an `Arc`,
/// so clones observe the same token cache, dedup cache and flags.
#[derive(Clone)]
pub struct FeishuChannel {
/// Channel name, reported as the `channel` of inbound messages.
name: String,
/// Channel configuration (app credentials, media directory, limits, reaction emoji).
config: FeishuChannelConfig,
/// HTTP client used for all REST calls.
http_client: reqwest::Client,
/// Running flag; starts as `false` (it is managed outside the code shown here).
running: Arc<RwLock<bool>>,
/// Shutdown broadcast sender; starts as `None` (presumably set on start — verify against `start`).
shutdown_tx: Arc<RwLock<Option<broadcast::Sender<()>>>>,
/// Set to `true` once the WebSocket connection is established.
connected: Arc<RwLock<bool>>,
/// Cached tenant access token with proactive refresh.
tenant_token: Arc<RwLock<Option<CachedTenantToken>>>,
/// Dedup cache: WS message_ids seen in the last ~30 min.
seen_message_ids: Arc<RwLock<HashMap<String, Instant>>>,
}
/// Parsed message data from a Feishu frame
struct ParsedMessage {
/// Feishu message id of the received message.
message_id: String,
/// Open ID of the sender.
open_id: String,
/// Chat the message belongs to.
chat_id: String,
/// Text content, with reply context prepended when the message is a reply.
content: String,
/// Downloaded attachment, if the message carried one.
media: Option<MediaItem>,
/// ID of the message this message is replying to (if any).
/// Used to fetch quoted message content for display.
parent_id: Option<String>,
}
impl FeishuChannel {
/// Build a channel in its initial state: not running, not connected, with
/// empty token and dedup caches.
///
/// `_provider_config` is accepted but not used by this constructor. The
/// implementation has no failure path and always returns `Ok`.
pub fn new(
    name: String,
    config: FeishuChannelConfig,
    _provider_config: LLMProviderConfig,
) -> Result<Self, ChannelError> {
    let http_client = reqwest::Client::new();
    // Shared, lock-protected state; all of it starts empty / false.
    let running = Arc::new(RwLock::new(false));
    let shutdown_tx = Arc::new(RwLock::new(None));
    let connected = Arc::new(RwLock::new(false));
    let tenant_token = Arc::new(RwLock::new(None));
    let seen_message_ids = Arc::new(RwLock::new(HashMap::new()));
    let channel = Self {
        name,
        config,
        http_client,
        running,
        shutdown_tx,
        connected,
        tenant_token,
        seen_message_ids,
    };
    Ok(channel)
}
/// Get WebSocket endpoint URL from Feishu API
async fn get_ws_endpoint(
&self,
client: &reqwest::Client,
) -> Result<(String, WsClientConfig), ChannelError> {
let resp = client
.post(format!("{}/callback/ws/endpoint", FEISHU_WS_BASE))
.header("locale", "zh")
.json(&serde_json::json!({
"AppID": self.config.app_id,
"AppSecret": self.config.app_secret,
}))
.send()
.await
.map_err(|e| ChannelError::ConnectionError(format!("HTTP error: {}", e)))?;
let endpoint_resp: WsEndpointResp = resp.json().await.map_err(|e| {
ChannelError::ConnectionError(format!("Failed to parse endpoint response: {}", e))
})?;
if endpoint_resp.code != 0 {
return Err(ChannelError::ConnectionError(format!(
"WS endpoint failed: code={} msg={}",
endpoint_resp.code,
endpoint_resp.msg.as_deref().unwrap_or("unknown")
)));
}
let ep = endpoint_resp
.data
.ok_or_else(|| ChannelError::ConnectionError("Empty endpoint data".to_string()))?;
let client_config = ep.client_config.unwrap_or_default();
Ok((ep.url, client_config))
}
/// Return a tenant access token, serving it from the cache while it is fresh.
///
/// A cached token is used as long as the current time is before its
/// `refresh_after` instant. Otherwise a new token is fetched and cached with
/// a refresh time that lies `TOKEN_REFRESH_SKEW` before its expiry.
///
/// # Errors
/// Propagates any error from `fetch_new_token`.
async fn get_tenant_access_token(&self) -> Result<String, ChannelError> {
    // Fast path: look at the cache under a read lock that is released
    // before any network call.
    let still_fresh = {
        let guard = self.tenant_token.read().await;
        guard
            .as_ref()
            .filter(|entry| Instant::now() < entry.refresh_after)
            .map(|entry| entry.value.clone())
    };
    if let Some(value) = still_fresh {
        return Ok(value);
    }

    // Slow path: fetch, then store with the refresh moved 120 seconds early.
    let (value, lifetime) = self.fetch_new_token().await?;
    let refresh_after = Instant::now() + lifetime.saturating_sub(TOKEN_REFRESH_SKEW);
    let entry = CachedTenantToken {
        value: value.clone(),
        refresh_after,
    };
    *self.tenant_token.write().await = Some(entry);
    Ok(value)
}
/// Fetch a new tenant access token from Feishu.
async fn fetch_new_token(&self) -> Result<(String, Duration), ChannelError> {
let resp = self
.http_client
.post(format!(
"{}/auth/v3/tenant_access_token/internal",
FEISHU_API_BASE
))
.header("Content-Type", "application/json")
.json(&serde_json::json!({
"app_id": self.config.app_id,
"app_secret": self.config.app_secret,
}))
.send()
.await
.map_err(|e| ChannelError::ConnectionError(format!("HTTP error: {}", e)))?;
#[derive(Deserialize)]
struct TokenResponse {
code: i32,
tenant_access_token: Option<String>,
expire: Option<i64>,
}
let token_resp: TokenResponse = resp
.json()
.await
.map_err(|e| ChannelError::Other(format!("Failed to parse token response: {}", e)))?;
if token_resp.code != 0 {
return Err(ChannelError::Other("Auth failed".to_string()));
}
let token = token_resp
.tenant_access_token
.ok_or_else(|| ChannelError::Other("No token in response".to_string()))?;
let ttl = token_resp
.expire
.and_then(|v| u64::try_from(v).ok())
.map(Duration::from_secs)
.unwrap_or(DEFAULT_TOKEN_TTL);
Ok((token, ttl))
}
/// Report whether `message_id` was already recorded, recording it if not.
///
/// Returns `true` for a duplicate. A new id is stored together with the
/// current instant. Stale entries are not removed here; per the original
/// note, that happens in the heartbeat `timeout_check` loop.
async fn is_message_seen(&self, message_id: &str) -> bool {
    let mut cache = self.seen_message_ids.write().await;
    let first_seen_at = Instant::now();
    let already_known = cache.contains_key(message_id);
    if !already_known {
        cache.insert(message_id.to_string(), first_seen_at);
    }
    already_known
}
/// Download the attachment of a message into the configured media directory.
///
/// Creates the directory if needed, then dispatches on `msg_type`:
/// `image` goes to `download_image`, `audio` / `file` / `media` go to
/// `download_file`. Any other type yields a placeholder description and no
/// media item. Returns `(description, media_item)`.
///
/// # Errors
/// `ChannelError::Other` when the directory cannot be created; otherwise
/// whatever the selected downloader returns.
async fn download_media(
    &self,
    msg_type: &str,
    content_json: &serde_json::Value,
    message_id: &str,
) -> Result<(String, Option<MediaItem>), ChannelError> {
    let media_dir = Path::new(&self.config.media_dir);
    if let Err(e) = tokio::fs::create_dir_all(media_dir).await {
        return Err(ChannelError::Other(format!(
            "Failed to create media dir: {}",
            e
        )));
    }
    if msg_type == "image" {
        return self
            .download_image(content_json, message_id, media_dir)
            .await;
    }
    if matches!(msg_type, "audio" | "file" | "media") {
        return self
            .download_file(content_json, message_id, media_dir, msg_type)
            .await;
    }
    Ok((format!("[unsupported media type: {}]", msg_type), None))
}
/// Download the image attached to a message and store it in `media_dir`.
///
/// Reads `image_key` from `content_json`, fetches the bytes through the
/// message resource API and writes them to
/// `<message_id>_<first 8 chars of image_key>.jpg`.
/// Returns a `[image: <filename>]` description and the media item.
///
/// # Errors
/// * `ChannelError::Other` – missing `image_key`, non-success HTTP status,
///   unreadable body, or the file could not be written.
/// * `ChannelError::ConnectionError` – the HTTP request failed.
/// * Any error from obtaining the tenant access token.
async fn download_image(
    &self,
    content_json: &serde_json::Value,
    message_id: &str,
    media_dir: &Path,
) -> Result<(String, Option<MediaItem>), ChannelError> {
    let image_key = content_json
        .get("image_key")
        .and_then(|v| v.as_str())
        .ok_or_else(|| ChannelError::Other("No image_key in message".to_string()))?;
    let token = self.get_tenant_access_token().await?;
    // Use message resource API for downloading message images
    let url = format!(
        "{}/im/v1/messages/{}/resources/{}?type=image",
        FEISHU_API_BASE, message_id, image_key
    );
    #[cfg(debug_assertions)]
    tracing::debug!(url = %url, image_key = %image_key, message_id = %message_id, "Downloading image from Feishu via message resource API");
    let resp = self
        .http_client
        .get(&url)
        .header("Authorization", format!("Bearer {}", token))
        .send()
        .await
        .map_err(|e| {
            ChannelError::ConnectionError(format!("Download image HTTP error: {}", e))
        })?;
    let status = resp.status();
    #[cfg(debug_assertions)]
    tracing::debug!(status = %status, "Image download response status");
    if !status.is_success() {
        let error_text = resp.text().await.unwrap_or_default();
        return Err(ChannelError::Other(format!(
            "Image download failed {}: {}",
            status, error_text
        )));
    }
    let data = resp
        .bytes()
        .await
        .map_err(|e| ChannelError::Other(format!("Failed to read image data: {}", e)))?
        .to_vec();
    #[cfg(debug_assertions)]
    tracing::debug!(data_len = %data.len(), "Downloaded image data");
    // Take the prefix by characters, not bytes: slicing `[..8]` panics when
    // byte 8 falls inside a multi-byte character. Identical for ASCII keys.
    let key_prefix: String = image_key.chars().take(8).collect();
    let filename = format!("{}_{}.jpg", message_id, key_prefix);
    let file_path = media_dir.join(&filename);
    tokio::fs::write(&file_path, &data)
        .await
        .map_err(|e| ChannelError::Other(format!("Failed to write image: {}", e)))?;
    let media_item = MediaItem::new(file_path.to_string_lossy().to_string(), "image");
    tracing::info!(message_id = %message_id, filename = %filename, "Downloaded image");
    Ok((format!("[image: {}]", filename), Some(media_item)))
}
/// Download a file-like attachment (file, audio, media) and store it in `media_dir`.
///
/// Reads `file_key` from `content_json`, fetches the bytes through the
/// message resource API, picks a file name with `infer_download_filename`
/// and writes the data. Returns a `[<file_type>: <filename>]` description
/// and the media item.
///
/// # Errors
/// * `ChannelError::Other` – missing `file_key`, non-success HTTP status,
///   unreadable body, or the file could not be written.
/// * `ChannelError::ConnectionError` – the HTTP request failed.
/// * Any error from obtaining the tenant access token.
async fn download_file(
    &self,
    content_json: &serde_json::Value,
    message_id: &str,
    media_dir: &Path,
    file_type: &str,
) -> Result<(String, Option<MediaItem>), ChannelError> {
    let file_key = match content_json.get("file_key").and_then(|v| v.as_str()) {
        Some(key) => key,
        None => return Err(ChannelError::Other("No file_key in message".to_string())),
    };
    let token = self.get_tenant_access_token().await?;
    // The message resource API serves files attached to a message.
    let url = format!(
        "{}/im/v1/messages/{}/resources/{}?type=file",
        FEISHU_API_BASE, message_id, file_key
    );
    #[cfg(debug_assertions)]
    tracing::debug!(url = %url, file_key = %file_key, message_id = %message_id, "Downloading file from Feishu via message resource API");
    let bearer = format!("Bearer {}", token);
    let resp = match self
        .http_client
        .get(&url)
        .header("Authorization", bearer)
        .send()
        .await
    {
        Ok(r) => r,
        Err(e) => {
            return Err(ChannelError::ConnectionError(format!(
                "Download file HTTP error: {}",
                e
            )))
        }
    };
    let status = resp.status();
    if !status.is_success() {
        let error_text = resp.text().await.unwrap_or_default();
        return Err(ChannelError::Other(format!(
            "File download failed {}: {}",
            status, error_text
        )));
    }
    // Headers must be copied before the body is consumed.
    let response_headers = resp.headers().clone();
    let data = match resp.bytes().await {
        Ok(raw) => raw.to_vec(),
        Err(e) => {
            return Err(ChannelError::Other(format!(
                "Failed to read file data: {}",
                e
            )))
        }
    };
    let filename = infer_download_filename(
        content_json,
        &response_headers,
        message_id,
        file_key,
        file_type,
    );
    let file_path = media_dir.join(&filename);
    if let Err(e) = tokio::fs::write(&file_path, &data).await {
        return Err(ChannelError::Other(format!("Failed to write file: {}", e)));
    }
    let media_item = MediaItem::new(file_path.to_string_lossy().to_string(), file_type);
    tracing::info!(message_id = %message_id, filename = %filename, file_type = %file_type, "Downloaded file");
    let description = format!("[{}: {}]", file_type, filename);
    Ok((description, Some(media_item)))
}
/// Build a generic file name for a download whose real name is unknown.
///
/// The result is `<message_id>_<first 8 chars of file_key>.<ext>`, where the
/// extension is `mp3` for `audio`, `mp4` for `video` / `media` (Feishu's
/// video message type), and `bin` for everything else.
///
/// The key prefix is taken by characters rather than bytes, so a key with
/// multi-byte characters cannot cause a slicing panic. ASCII keys produce
/// the same prefix as a byte slice would.
fn fallback_download_filename(message_id: &str, file_key: &str, file_type: &str) -> String {
    let extension = match file_type {
        "audio" => "mp3",
        "video" | "media" => "mp4",
        _ => "bin",
    };
    let key_prefix: String = file_key.chars().take(8).collect();
    format!("{}_{}.{}", message_id, key_prefix, extension)
}
/// Upload a local image to Feishu and return its `image_key`.
///
/// The MIME type is guessed from the path (falling back to octet-stream)
/// and the file is sent as a multipart form with `image_type=message`.
///
/// # Errors
/// * `ChannelError::Other` – the file cannot be read, the MIME type is
///   rejected, the response cannot be read or parsed, `code` is non-zero,
///   or the response carries no `image_key`.
/// * `ChannelError::ConnectionError` – the HTTP request failed.
/// * Any error from obtaining the tenant access token.
async fn upload_image(&self, file_path: &str) -> Result<String, ChannelError> {
    #[derive(Deserialize)]
    struct UploadData {
        image_key: String,
    }
    #[derive(Deserialize)]
    struct UploadResp {
        code: i32,
        msg: Option<String>,
        data: Option<UploadData>,
    }

    let token = self.get_tenant_access_token().await?;
    let mime = mime_guess::from_path(file_path)
        .first_or_octet_stream()
        .to_string();
    let source = std::path::Path::new(file_path);
    let file_name = source
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or("image.jpg");
    let file_data = match tokio::fs::read(file_path).await {
        Ok(bytes) => bytes,
        Err(e) => return Err(ChannelError::Other(format!("Failed to read file: {}", e))),
    };
    let image_part = reqwest::multipart::Part::bytes(file_data)
        .file_name(file_name.to_string())
        .mime_str(&mime)
        .map_err(|e| ChannelError::Other(format!("Invalid mime type: {}", e)))?;
    let form = reqwest::multipart::Form::new()
        .text("image_type", "message".to_string())
        .part("image", image_part);
    let endpoint = format!("{}/im/v1/images", FEISHU_API_BASE);
    let resp = match self
        .http_client
        .post(endpoint)
        .header("Authorization", format!("Bearer {}", token))
        .multipart(form)
        .send()
        .await
    {
        Ok(r) => r,
        Err(e) => {
            return Err(ChannelError::ConnectionError(format!(
                "Upload image HTTP error: {}",
                e
            )))
        }
    };
    // Read the body as text first so a parse failure can quote it.
    let status = resp.status();
    let body = match resp.text().await {
        Ok(text) => text,
        Err(e) => {
            return Err(ChannelError::Other(format!(
                "Read upload image response error: {}",
                e
            )))
        }
    };
    let result: UploadResp = match serde_json::from_str(&body) {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(ChannelError::Other(format!(
                "Parse upload image response error: {} (status={}, body={})",
                e,
                status,
                truncate_with_ellipsis(&body, 500)
            )))
        }
    };
    if result.code != 0 {
        return Err(ChannelError::Other(format!(
            "Upload image failed: code={} msg={}",
            result.code,
            result.msg.as_deref().unwrap_or("unknown")
        )));
    }
    match result.data {
        Some(payload) => Ok(payload.image_key),
        None => Err(ChannelError::Other("No image_key in response".to_string())),
    }
}
/// Upload a local file to Feishu and return its `file_key`.
///
/// The `file_type` form field is derived from the file extension and limited
/// to the values the upload API documents: `opus`, `mp4`, `pdf`, `doc`,
/// `xls`, `ppt` and `stream` (generic). Previously video files were sent as
/// `video` and all office/PDF files as `doc`, which are not matching values
/// for those formats. Non-MP4 video containers now use `stream`.
///
/// # Errors
/// * `ChannelError::Other` – the file cannot be read, the response cannot be
///   read or parsed, `code` is non-zero, or the response has no `file_key`.
/// * `ChannelError::ConnectionError` – the HTTP request failed.
/// * Any error from obtaining the tenant access token.
async fn upload_file(&self, file_path: &str) -> Result<String, ChannelError> {
    let token = self.get_tenant_access_token().await?;
    let file_name = std::path::Path::new(file_path)
        .file_name()
        .and_then(|n| n.to_str())
        .unwrap_or("file.bin");
    let extension = std::path::Path::new(file_path)
        .extension()
        .and_then(|e| e.to_str())
        .unwrap_or("")
        .to_lowercase();
    let file_type = match extension.as_str() {
        // NOTE(review): non-Opus audio is still labelled "opus" as before;
        // confirm whether such files need transcoding before upload.
        "mp3" | "m4a" | "wav" | "ogg" | "opus" => "opus",
        "mp4" => "mp4",
        "pdf" => "pdf",
        "doc" | "docx" => "doc",
        "xls" | "xlsx" => "xls",
        "ppt" | "pptx" => "ppt",
        // Everything else, including mov/avi/mkv, goes up as a generic stream.
        _ => "stream",
    };
    let file_data = tokio::fs::read(file_path)
        .await
        .map_err(|e| ChannelError::Other(format!("Failed to read file: {}", e)))?;
    let part = reqwest::multipart::Part::bytes(file_data)
        .file_name(file_name.to_string())
        .mime_str("application/octet-stream")
        .map_err(|e| ChannelError::Other(format!("Invalid mime type: {}", e)))?;
    let form = reqwest::multipart::Form::new()
        .text("file_type", file_type.to_string())
        .text("file_name", file_name.to_string())
        .part("file", part);
    let resp = self
        .http_client
        .post(format!("{}/im/v1/files", FEISHU_API_BASE))
        .header("Authorization", format!("Bearer {}", token))
        .multipart(form)
        .send()
        .await
        .map_err(|e| ChannelError::ConnectionError(format!("Upload file HTTP error: {}", e)))?;
    #[derive(Deserialize)]
    struct UploadResp {
        code: i32,
        msg: Option<String>,
        data: Option<UploadData>,
    }
    #[derive(Deserialize)]
    struct UploadData {
        file_key: String,
    }
    // Read the body as text first so a parse failure can quote it.
    let status = resp.status();
    let body = resp.text().await.map_err(|e| {
        ChannelError::Other(format!("Read upload file response error: {}", e))
    })?;
    let result: UploadResp = serde_json::from_str(&body).map_err(|e| {
        ChannelError::Other(format!(
            "Parse upload file response error: {} (status={}, body={})",
            e,
            status,
            truncate_with_ellipsis(&body, 500)
        ))
    })?;
    if result.code != 0 {
        return Err(ChannelError::Other(format!(
            "Upload file failed: code={} msg={}",
            result.code,
            result.msg.as_deref().unwrap_or("unknown")
        )));
    }
    result
        .data
        .map(|d| d.file_key)
        .ok_or_else(|| ChannelError::Other("No file_key in response".to_string()))
}
/// Add the configured reaction emoji to a message.
///
/// Returns `Ok(Some(reaction_id))` on success. When Feishu answers with a
/// non-zero `code` the failure is logged as a warning and `Ok(None)` is
/// returned; `Ok(None)` is also returned when the response has no id.
///
/// # Errors
/// * `ChannelError::ConnectionError` – the HTTP request failed.
/// * `ChannelError::Other` – the response could not be parsed.
/// * Any error from obtaining the tenant access token.
async fn add_reaction(&self, message_id: &str) -> Result<Option<String>, ChannelError> {
    #[derive(Deserialize)]
    struct ReactionData {
        reaction_id: Option<String>,
    }
    #[derive(Deserialize)]
    struct ReactionResp {
        code: i32,
        msg: Option<String>,
        data: Option<ReactionData>,
    }

    let emoji = self.config.reaction_emoji.as_str();
    let token = self.get_tenant_access_token().await?;
    let endpoint = format!(
        "{}/im/v1/messages/{}/reactions",
        FEISHU_API_BASE, message_id
    );
    let request_body = serde_json::json!({
        "reaction_type": { "emoji_type": emoji }
    });
    let resp = match self
        .http_client
        .post(endpoint)
        .header("Authorization", format!("Bearer {}", token))
        .json(&request_body)
        .send()
        .await
    {
        Ok(r) => r,
        Err(e) => {
            return Err(ChannelError::ConnectionError(format!(
                "Add reaction HTTP error: {}",
                e
            )))
        }
    };
    let result: ReactionResp = match resp.json().await {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(ChannelError::Other(format!(
                "Parse reaction response error: {}",
                e
            )))
        }
    };
    if result.code == 0 {
        return Ok(result.data.and_then(|d| d.reaction_id));
    }
    // An API-level rejection is not fatal for message handling.
    tracing::warn!(
        "Failed to add reaction to message {}: code={} msg={}",
        message_id,
        result.code,
        result.msg.as_deref().unwrap_or("unknown")
    );
    Ok(None)
}
/// Remove the reaction described by outbound-message metadata.
///
/// Looks up `feishu.message_id` and `feishu.reaction_id`; when either is
/// missing nothing happens. A failed removal is only logged at debug level.
async fn remove_reaction_from_metadata(
    &self,
    metadata: &std::collections::HashMap<String, String>,
) {
    let target_message = metadata.get("feishu.message_id");
    let target_reaction = metadata.get("feishu.reaction_id");
    if let (Some(message_id), Some(reaction_id)) = (target_message, target_reaction) {
        if let Err(e) = self.remove_reaction(message_id, reaction_id).await {
            tracing::debug!(error = %e, message_id = %message_id, "Failed to remove reaction");
        }
    }
}
/// Delete a reaction from a message.
///
/// A non-zero `code` in the response is logged at debug level and still
/// reported as `Ok(())`.
///
/// # Errors
/// * `ChannelError::ConnectionError` – the HTTP request failed.
/// * `ChannelError::Other` – the response could not be parsed.
/// * Any error from obtaining the tenant access token.
async fn remove_reaction(
    &self,
    message_id: &str,
    reaction_id: &str,
) -> Result<(), ChannelError> {
    #[derive(Deserialize)]
    struct ReactionResp {
        code: i32,
        msg: Option<String>,
    }

    let token = self.get_tenant_access_token().await?;
    let endpoint = format!(
        "{}/im/v1/messages/{}/reactions/{}",
        FEISHU_API_BASE, message_id, reaction_id
    );
    let resp = match self
        .http_client
        .delete(endpoint)
        .header("Authorization", format!("Bearer {}", token))
        .send()
        .await
    {
        Ok(r) => r,
        Err(e) => {
            return Err(ChannelError::ConnectionError(format!(
                "Remove reaction HTTP error: {}",
                e
            )))
        }
    };
    let result: ReactionResp = match resp.json().await {
        Ok(parsed) => parsed,
        Err(e) => {
            return Err(ChannelError::Other(format!(
                "Parse remove reaction response error: {}",
                e
            )))
        }
    };
    let rejected = result.code != 0;
    if rejected {
        tracing::debug!(
            "Failed to remove reaction {} from message {}: code={} msg={}",
            reaction_id,
            message_id,
            result.code,
            result.msg.as_deref().unwrap_or("unknown")
        );
    }
    Ok(())
}
/// Fetch a message by id and turn its text into reply context.
///
/// Returns `Some("[Reply to: <text>]")` for `text` and `post` messages with
/// non-empty content, truncated to `reply_context_max_chars`. Returns `None`
/// on any failure (token, HTTP, parsing, non-zero `code`), for other message
/// types, and when the extracted text is empty.
async fn get_message_content(&self, message_id: &str) -> Option<String> {
    #[derive(Deserialize)]
    struct MessageBody {
        content: Option<String>,
    }
    #[derive(Deserialize)]
    struct MessageItem {
        msg_type: String,
        body: Option<MessageBody>,
    }
    #[derive(Deserialize)]
    struct MessageData {
        items: Option<Vec<MessageItem>>,
    }
    #[derive(Deserialize)]
    struct MessageResp {
        code: i32,
        msg: Option<String>,
        data: Option<MessageData>,
    }

    let token = match self.get_tenant_access_token().await {
        Ok(t) => t,
        Err(e) => {
            tracing::debug!(error = %e, message_id = %message_id, "Feishu: failed to get token for fetching parent message");
            return None;
        }
    };
    let endpoint = format!("{}/im/v1/messages/{}", FEISHU_API_BASE, message_id);
    // A transport failure is dropped without logging.
    let resp = match self
        .http_client
        .get(endpoint)
        .header("Authorization", format!("Bearer {}", token))
        .send()
        .await
    {
        Ok(r) => r,
        Err(_) => return None,
    };
    let result: MessageResp = match resp.json().await {
        Ok(r) => r,
        Err(e) => {
            tracing::debug!(error = %e, message_id = %message_id, "Feishu: failed to parse parent message response");
            return None;
        }
    };
    if result.code != 0 {
        tracing::debug!(
            message_id = %message_id,
            code = %result.code,
            msg = ?result.msg,
            "Feishu: failed to fetch parent message"
        );
        return None;
    }

    // Only the first returned item is considered.
    let items = result.data?.items?;
    let msg_obj = items.first()?;
    let raw_content = msg_obj.body.as_ref()?.content.as_ref()?;
    let msg_type = msg_obj.msg_type.as_str();
    let text = if msg_type == "text" {
        let parsed = serde_json::from_str::<serde_json::Value>(raw_content).ok()?;
        parsed.get("text")?.as_str()?.to_string()
    } else if msg_type == "post" {
        parse_post_content(raw_content)
    } else {
        String::new()
    };
    if text.is_empty() {
        return None;
    }
    let text = if char_count(&text) > self.config.reply_context_max_chars {
        truncate_with_ellipsis(&text, self.config.reply_context_max_chars)
    } else {
        text
    };
    Some(format!("[Reply to: {}]", text))
}
/// Send a message to Feishu chat with specified message type
async fn send_message_to_feishu(
&self,
receive_id: &str,
receive_id_type: &str,
msg_type: &str,
content: &str,
) -> Result<(), ChannelError> {
let token = self.get_tenant_access_token().await?;
let payload_content = if msg_type == "text" {
let truncated = if char_count(content) > self.config.max_message_chars {
format!(
"{}\n\n[Content truncated due to length limit]",
truncate_with_ellipsis(content, self.config.max_message_chars)
)
} else {
content.to_string()
};
serde_json::json!({ "text": truncated }).to_string()
} else {
// For post messages, content is already JSON (from markdown_to_post)
// But we still need to check length
if char_count(content) > self.config.max_message_chars {
// Fallback to truncated text for post as well
serde_json::json!({
"text": format!(
"{}\n\n[Content truncated due to length limit]",
truncate_with_ellipsis(content, self.config.max_message_chars)
)
})
.to_string()
} else {
content.to_string()
}
};
let resp = self
.http_client
.post(format!(
"{}/im/v1/messages?receive_id_type={}",
FEISHU_API_BASE, receive_id_type
))
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", token))
.json(&serde_json::json!({
"receive_id": receive_id,
"msg_type": msg_type,
"content": payload_content
}))
.send()
.await
.map_err(|e| {
ChannelError::ConnectionError(format!("Send message HTTP error: {}", e))
})?;
#[derive(Deserialize)]
struct SendResp {
code: i32,
msg: String,
}
let send_resp: SendResp = resp
.json()
.await
.map_err(|e| ChannelError::Other(format!("Parse send response error: {}", e)))?;
if send_resp.code != 0 {
return Err(ChannelError::Other(format!(
"Send message failed: code={} msg={}",
send_resp.code, send_resp.msg
)));
}
Ok(())
}
/// Read the `service_id` query parameter from a WebSocket URL.
///
/// Only the text between the first and second `?` is treated as the query
/// string, and only the first `service_id=` pair is considered; its value is
/// the text up to the next `=`. Returns 0 when the parameter is missing or
/// is not a valid `i32`.
fn extract_service_id(url: &str) -> i32 {
    let mut sections = url.split('?');
    // Skip everything before the first '?'.
    let _before_query = sections.next();
    let query = match sections.next() {
        Some(q) => q,
        None => return 0,
    };
    for pair in query.split('&') {
        if !pair.starts_with("service_id=") {
            continue;
        }
        // The first matching pair decides the result, valid or not.
        let mut pieces = pair.split('=');
        let _key = pieces.next();
        return match pieces.next() {
            Some(raw) => raw.parse::<i32>().unwrap_or(0),
            None => 0,
        };
    }
    0
}
/// Handle an incoming binary PbFrame.
///
/// Returns `Ok(Some(ParsedMessage))` for a new, non-bot `im.message.receive_v1` event;
/// the caller acks the frame and forwards the message in that case.
/// Returns `Ok(None)` for control frames, unknown methods, other event types,
/// bot messages and duplicates.
///
/// # Errors
/// `ChannelError::Other` when a data frame has no payload, the event or its body cannot be
/// parsed, or the sender has no `open_id`; also propagates errors from
/// `parse_and_download_message`.
async fn handle_frame(&self, frame: &PbFrame) -> Result<Option<ParsedMessage>, ChannelError> {
// method 0 = CONTROL (ping/pong)
if frame.method == 0 {
return Ok(None);
}
// method 1 = DATA (events); anything else is ignored
if frame.method != 1 {
return Ok(None);
}
let payload = frame
.payload
.as_deref()
.ok_or_else(|| ChannelError::Other("No payload in frame".to_string()))?;
#[cfg(debug_assertions)]
tracing::debug!(payload_len = %payload.len(), "Received frame payload");
// The payload is a JSON event envelope.
let event: LarkEvent = serde_json::from_slice(payload)
.map_err(|e| ChannelError::Other(format!("Parse event error: {}", e)))?;
let event_type = event.header.event_type.as_str();
#[cfg(debug_assertions)]
tracing::debug!(event_type = %event_type, "Received event type");
// Only received-message events are handled.
if event_type != "im.message.receive_v1" {
return Ok(None);
}
let payload_data: MsgReceivePayload = serde_json::from_value(event.event.clone())
.map_err(|e| ChannelError::Other(format!("Parse payload error: {}", e)))?;
// Skip bot messages
if payload_data.sender.sender_type == "bot" {
return Ok(None);
}
let message_id = payload_data.message.message_id.clone();
// Deduplication check; this also records the id, so a later failure in this
// function leaves the message marked as seen.
if self.is_message_seen(&message_id).await {
#[cfg(debug_assertions)]
tracing::debug!(message_id = %message_id, "Duplicate message, skipping");
return Ok(None);
}
#[cfg(debug_assertions)]
tracing::debug!(message_id = %message_id, "Received Feishu message");
let open_id = payload_data
.sender
.sender_id
.open_id
.ok_or_else(|| ChannelError::Other("No open_id".to_string()))?;
let msg = payload_data.message;
let chat_id = msg.chat_id.clone();
let msg_type = msg.message_type.as_str();
let raw_content = msg.content.clone();
let parent_id = msg.parent_id.clone();
#[cfg(debug_assertions)]
tracing::debug!(msg_type = %msg_type, chat_id = %chat_id, open_id = %open_id, "Parsing message content");
// Turns the raw content into text and downloads any attachment.
let (mut content, media) = self
.parse_and_download_message(msg_type, &raw_content, &message_id)
.await?;
// Fetch and prepend quoted message content if this is a reply
if let Some(ref pid) = parent_id {
if let Some(reply_ctx) = self.get_message_content(pid).await {
content = format!("{}\n{}", reply_ctx, content);
}
}
#[cfg(debug_assertions)]
if let Some(ref m) = media {
tracing::debug!(media_type = %m.media_type, media_path = %m.path, "Media downloaded successfully");
}
Ok(Some(ParsedMessage {
message_id,
open_id,
chat_id,
content,
media,
parent_id,
}))
}
/// Turn the raw content of a message into display text, downloading media when needed.
///
/// Behaviour per `msg_type`:
/// * `text` – the `text` field of the JSON content, or the raw content when
///   it is not JSON or has no such field.
/// * `post` – the result of `parse_post_content`.
/// * `image` / `audio` / `file` / `media` – downloaded via `download_media`;
///   any failure yields a `[<type>: content unavailable]` placeholder.
/// * `share_chat` / `share_user` / `share_calendar_event` – a placeholder
///   naming the shared id, or a generic placeholder when the JSON is invalid.
/// * `interactive` / `list` – extracted by the dedicated helpers, falling
///   back to the raw content on error.
/// * `merge_forward` / `system` – fixed placeholders.
/// * anything else – the raw content.
///
/// `@_user_N` placeholders are stripped from the final text. The visible
/// implementation always returns `Ok`.
async fn parse_and_download_message(
    &self,
    msg_type: &str,
    content: &str,
    message_id: &str,
) -> Result<(String, Option<MediaItem>), ChannelError> {
    let (text, media): (String, Option<MediaItem>) = match msg_type {
        "text" => {
            let body = match serde_json::from_str::<serde_json::Value>(content) {
                Ok(json) => json
                    .get("text")
                    .and_then(|v| v.as_str())
                    .unwrap_or(content)
                    .to_string(),
                Err(_) => content.to_string(),
            };
            (body, None)
        }
        "post" => (parse_post_content(content), None),
        "image" | "audio" | "file" | "media" => {
            // Invalid JSON and download errors end in the same placeholder.
            let unavailable = || -> (String, Option<MediaItem>) {
                (format!("[{}: content unavailable]", msg_type), None)
            };
            match serde_json::from_str::<serde_json::Value>(content) {
                Ok(content_json) => {
                    match self
                        .download_media(msg_type, &content_json, message_id)
                        .await
                    {
                        Ok(downloaded) => downloaded,
                        Err(_) => unavailable(),
                    }
                }
                Err(_) => unavailable(),
            }
        }
        "share_chat" => {
            // A shared chat / channel card.
            match serde_json::from_str::<serde_json::Value>(content) {
                Ok(json) => {
                    let chat_id = json
                        .get("chat_id")
                        .and_then(|v| v.as_str())
                        .unwrap_or("unknown");
                    (format!("[shared chat: {}]", chat_id), None)
                }
                Err(_) => ("[shared chat]".to_string(), None),
            }
        }
        "share_user" => {
            // A shared user card.
            match serde_json::from_str::<serde_json::Value>(content) {
                Ok(json) => {
                    let user_id = json
                        .get("user_id")
                        .and_then(|v| v.as_str())
                        .unwrap_or("unknown");
                    (format!("[shared user: {}]", user_id), None)
                }
                Err(_) => ("[shared user]".to_string(), None),
            }
        }
        "interactive" => {
            // Interactive cards: pull out their text content.
            extract_interactive_content(content).unwrap_or_else(|e| {
                tracing::warn!(error = %e, "Failed to extract interactive content");
                (content.to_string(), None)
            })
        }
        "list" => {
            // List / bullet messages.
            parse_list_content(content).unwrap_or_else(|_| (content.to_string(), None))
        }
        "merge_forward" => ("[merged forward messages]".to_string(), None),
        "share_calendar_event" => {
            match serde_json::from_str::<serde_json::Value>(content) {
                Ok(json) => {
                    let event_key = json
                        .get("event_key")
                        .and_then(|v| v.as_str())
                        .unwrap_or("unknown");
                    (format!("[shared calendar event: {}]", event_key), None)
                }
                Err(_) => ("[shared calendar event]".to_string(), None),
            }
        }
        "system" => ("[system message]".to_string(), None),
        _ => (content.to_string(), None),
    };
    // Group-chat @mentions arrive as @_user_N placeholders; remove them.
    Ok((strip_at_placeholders(&text), media))
}
/// Send acknowledgment for a message
async fn send_ack(
frame: &PbFrame,
write: &mut futures_util::stream::SplitSink<
tokio_tungstenite::WebSocketStream<
tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>,
>,
tokio_tungstenite::tungstenite::Message,
>,
) -> Result<(), ChannelError> {
let mut ack = frame.clone();
ack.payload = Some(br#"{"code":200,"headers":{},"data":[]}"#.to_vec());
ack.headers.push(PbHeader {
key: "biz_rt".into(),
value: "0".into(),
});
write
.send(tokio_tungstenite::tungstenite::Message::Binary(
ack.encode_to_vec().into(),
))
.await
.map_err(|e| ChannelError::Other(format!("Failed to send ack: {}", e)))?;
Ok(())
}
async fn run_ws_loop(
&self,
bus: Arc<MessageBus>,
mut shutdown_rx: broadcast::Receiver<()>,
) -> Result<(), ChannelError> {
let (wss_url, client_config) = self.get_ws_endpoint(&self.http_client).await?;
let service_id = Self::extract_service_id(&wss_url);
tracing::info!(url = %wss_url, "Connecting to Feishu WebSocket");
let (ws_stream, _) = tokio_tungstenite::connect_async(&wss_url)
.await
.map_err(|e| {
ChannelError::ConnectionError(format!("WebSocket connection failed: {}", e))
})?;
*self.connected.write().await = true;
tracing::info!("Feishu WebSocket connected");
let (mut write, mut read) = ws_stream.split();
// Send initial ping
let ping_frame = PbFrame {
seq_id: 1,
log_id: 0,
service: service_id,
method: 0,
headers: vec![PbHeader {
key: "type".into(),
value: "ping".into(),
}],
payload: None,
};
write
.send(tokio_tungstenite::tungstenite::Message::Binary(
ping_frame.encode_to_vec().into(),
))
.await
.map_err(|e| {
ChannelError::ConnectionError(format!("Failed to send initial ping: {}", e))
})?;
let ping_interval = client_config.ping_interval.unwrap_or(120).max(10);
let mut ping_interval_tok =
tokio::time::interval(tokio::time::Duration::from_secs(ping_interval));
let mut timeout_check = tokio::time::interval(tokio::time::Duration::from_secs(10));
let mut seq: u64 = 1;
let mut last_recv = Instant::now();
// Consume the immediate tick
ping_interval_tok.tick().await;
timeout_check.tick().await;
loop {
tokio::select! {
msg = read.next() => {
match msg {
Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(data))) => {
last_recv = Instant::now();
let bytes: Bytes = data;
if let Ok(frame) = PbFrame::decode(bytes.as_ref()) {
match self.handle_frame(&frame).await {
Ok(Some(parsed)) => {
// Send ACK immediately (Feishu requires within 3 s)
if let Err(e) = Self::send_ack(&frame, &mut write).await {
tracing::error!(error = %e, "Failed to send ACK to Feishu");
}
// Add reaction emoji (await so we get the reaction_id for later removal)
let message_id = parsed.message_id.clone();
let reaction_id = match self.add_reaction(&message_id).await {
Ok(Some(rid)) => Some(rid),
Ok(None) => None,
Err(e) => {
tracing::debug!(error = %e, message_id = %message_id, "Failed to add reaction");
None
}
};
// forwarded_metadata is copied to OutboundMessage.metadata by the gateway.
let mut forwarded_metadata = std::collections::HashMap::new();
forwarded_metadata.insert("feishu.message_id".to_string(), message_id.clone());
if let Some(ref rid) = reaction_id {
forwarded_metadata.insert("feishu.reaction_id".to_string(), rid.clone());
}
if let Some(ref pid) = parsed.parent_id {
forwarded_metadata.insert("feishu.parent_id".to_string(), pid.clone());
}
// Publish to bus asynchronously
let channel = self.clone();
let bus = bus.clone();
tokio::spawn(async move {
let media_count = if parsed.media.is_some() { 1 } else { 0 };
#[cfg(debug_assertions)]
tracing::debug!(open_id = %parsed.open_id, chat_id = %parsed.chat_id, content_len = %parsed.content.len(), media_count = %media_count, "Publishing message to bus");
let msg = crate::bus::InboundMessage {
channel: channel.name().to_string(),
sender_id: parsed.open_id.clone(),
chat_id: parsed.chat_id.clone(),
content: parsed.content.clone(),
timestamp: std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64,
media: parsed.media.map(|m| vec![m]).unwrap_or_default(),
metadata: std::collections::HashMap::new(),
forwarded_metadata,
};
if let Err(e) = channel.handle_and_publish(&bus, &msg).await {
tracing::error!(error = %e, open_id = %parsed.open_id, chat_id = %parsed.chat_id, "Failed to publish Feishu message to bus");
} else {
#[cfg(debug_assertions)]
tracing::debug!(open_id = %parsed.open_id, chat_id = %parsed.chat_id, "Message published to bus successfully");
}
});
}
Ok(None) => {}
Err(e) => {
tracing::warn!(error = %e, "Failed to parse Feishu frame");
}
}
}
}
Some(Ok(tokio_tungstenite::tungstenite::Message::Ping(data))) => {
last_recv = Instant::now();
let pong = PbFrame {
seq_id: seq.wrapping_add(1),
log_id: 0,
service: service_id,
method: 0,
headers: vec![PbHeader {
key: "type".into(),
value: "pong".into(),
}],
payload: Some(data.to_vec()),
};
let _ = write.send(tokio_tungstenite::tungstenite::Message::Binary(pong.encode_to_vec().into())).await;
}
Some(Ok(tokio_tungstenite::tungstenite::Message::Pong(_))) => {
last_recv = Instant::now();
}
Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => {
#[cfg(debug_assertions)]
tracing::debug!("Feishu WebSocket closed");
break;
}
Some(Err(e)) => {
tracing::warn!(error = %e, "Feishu WebSocket error");
break;
}
_ => {}
}
}
_ = ping_interval_tok.tick() => {
seq = seq.wrapping_add(1);
let ping = PbFrame {
seq_id: seq,
log_id: 0,
service: service_id,
method: 0,
headers: vec![PbHeader {
key: "type".into(),
value: "ping".into(),
}],
payload: None,
};
if write.send(tokio_tungstenite::tungstenite::Message::Binary(ping.encode_to_vec().into())).await.is_err() {
tracing::warn!("Feishu ping failed, reconnecting");
break;
}
}
_ = timeout_check.tick() => {
if last_recv.elapsed() > WS_HEARTBEAT_TIMEOUT {
tracing::warn!("Feishu WebSocket heartbeat timeout, reconnecting");
break;
}
// GC dedup cache: remove entries older than TTL (matches zeroclaw pattern)
let now = Instant::now();
let mut seen = self.seen_message_ids.write().await;
seen.retain(|_, ts| now.duration_since(*ts) < DEDUP_CACHE_TTL);
}
_ = shutdown_rx.recv() => {
tracing::info!("Feishu channel shutdown signal received");
break;
}
}
}
*self.connected.write().await = false;
Ok(())
}
}
/// Flatten a Feishu `post` (rich text) payload into plain text.
///
/// `content` is the JSON string carried by the message. After unwrapping an optional
/// `{"post": {...}}` envelope, three layouts are tried in order:
///
/// 1. direct: `{"title": ..., "content": [[...]]}`
/// 2. localized: an object under `zh_cn`, `en_us` or `ja_jp`
/// 3. any other object-valued child that carries a `content` array
///
/// The first layout that produces non-blank text wins and is returned trimmed. When
/// `content` is not valid JSON, or no layout produces text, the input is returned unchanged.
fn parse_post_content(content: &str) -> String {
    /// Read `el[key]` as a string slice, if present and a string.
    fn str_field<'a>(el: &'a serde_json::Value, key: &str) -> Option<&'a str> {
        el.get(key).and_then(|v| v.as_str())
    }

    /// Append the text of a single post element (text, link, at-mention, code block).
    fn extract_element(el: &serde_json::Value, out: &mut Vec<String>) {
        match str_field(el, "tag").unwrap_or("") {
            "a" => {
                // Prefer the visible label; fall back to the URL when the label is empty.
                let label = str_field(el, "text")
                    .filter(|s| !s.is_empty())
                    .or_else(|| str_field(el, "href"))
                    .unwrap_or("");
                out.push(label.to_string());
            }
            "at" => {
                let name = str_field(el, "user_name")
                    .or_else(|| str_field(el, "user_id"))
                    .unwrap_or("user");
                out.push(format!("@{}", name));
            }
            "code_block" => {
                let lang = str_field(el, "language").unwrap_or("");
                let code = match el.get("content").and_then(|c| c.as_array()) {
                    Some(parts) => parts
                        .iter()
                        .filter_map(|part| str_field(part, "text"))
                        .collect::<String>(),
                    // Fallback to the `text` field for backwards compatibility.
                    None => str_field(el, "text").unwrap_or("").to_string(),
                };
                out.push(format!("\n```{}\n{}\n```\n", lang, code));
            }
            // "md", "text" and unknown tags all contribute their `text` field, if any.
            _ => {
                if let Some(text) = str_field(el, "text") {
                    out.push(text.to_string());
                }
            }
        }
    }

    /// Parse a single block `{title, content: [[...]]}` and append its text to `out`.
    fn parse_block(block: &serde_json::Value, out: &mut Vec<String>) {
        if let Some(title) = str_field(block, "title").filter(|s| !s.is_empty()) {
            out.push(title.to_string());
            out.push("\n\n".to_string());
        }
        let rows = block.get("content").and_then(|c| c.as_array());
        // Rows that are not arrays are skipped entirely (no newline is emitted for them).
        for row in rows.into_iter().flatten().filter_map(|row| row.as_array()) {
            for el in row {
                extract_element(el, out);
            }
            out.push("\n".to_string());
        }
    }

    /// Render one block, returning `None` when it yields only whitespace.
    fn render_block(block: &serde_json::Value) -> Option<String> {
        let mut parts = Vec::new();
        parse_block(block, &mut parts);
        let joined = parts.concat();
        let trimmed = joined.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        }
    }

    /// True when `value["content"]` exists and is an array.
    fn has_content_array(value: &serde_json::Value) -> bool {
        matches!(value.get("content"), Some(serde_json::Value::Array(_)))
    }

    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(content) else {
        return content.to_string();
    };

    // Unwrap optional {"post": ...} envelope (nanobot: root = root["post"]).
    let root = parsed
        .get("post")
        .filter(|post| post.is_object())
        .unwrap_or(&parsed);

    // Direct format: {"title": ..., "content": [[...]]}
    if has_content_array(root) {
        if let Some(text) = render_block(root) {
            return text;
        }
    }

    // Localized: {"zh_cn": {"title": ..., "content": [...]}}
    // The locale object is borrowed as-is; no copy of the payload is made.
    for key in ["zh_cn", "en_us", "ja_jp"] {
        let locale = root.get(key).filter(|l| l.is_object());
        if let Some(text) = locale.and_then(render_block) {
            return text;
        }
    }

    // Fall back: try any object child that looks like a block.
    if let Some(root_obj) = root.as_object() {
        for child in root_obj.values() {
            if child.is_object() && has_content_array(child) {
                if let Some(text) = render_block(child) {
                    return text;
                }
            }
        }
    }

    content.to_string()
}
/// Extract readable text from an interactive card message.
///
/// Text is gathered, in this order, from the top-level `elements` array, from
/// `card.elements`, and finally from `header.title.content` (emitted as `title: ...`).
/// The media slot of the returned tuple is always `None`. When `content` is not valid JSON
/// or nothing could be extracted, the raw `content` is returned instead.
fn extract_interactive_content(content: &str) -> Result<(String, Option<MediaItem>), ChannelError> {
    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(content) else {
        return Ok((content.to_string(), None));
    };

    let mut texts = Vec::new();

    // Element lists can live at the top level and/or inside a `card` object.
    let top_level = parsed.get("elements");
    let inside_card = parsed
        .get("card")
        .filter(|card| card.is_object())
        .and_then(|card| card.get("elements"));
    for container in [top_level, inside_card] {
        if let Some(elements) = container.and_then(|e| e.as_array()) {
            for el in elements {
                extract_element_content(el, &mut texts);
            }
        }
    }

    // The header title is appended after the body text.
    let title = parsed
        .get("header")
        .and_then(|h| h.as_object())
        .and_then(|h| h.get("title"))
        .and_then(|t| t.as_object())
        .and_then(|t| t.get("content"))
        .and_then(|c| c.as_str());
    if let Some(text) = title {
        texts.push(format!("title: {}\n", text));
    }

    let joined = texts.concat();
    let extracted = joined.trim();
    let body = if extracted.is_empty() { content } else { extracted };
    Ok((body.to_string(), None))
}
/// Extract content from a single card element
fn extract_element_content(element: &serde_json::Value, texts: &mut Vec<String>) {
let tag = element.get("tag").and_then(|t| t.as_str()).unwrap_or("");
match tag {
"markdown" | "lark_md" => {
if let Some(content) = element.get("content").and_then(|c| c.as_str()) {
texts.push(content.to_string());
texts.push("\n".to_string());
}
}
"div" => {
if let Some(text_obj) = element.get("text").and_then(|t| t.as_object()) {
let content = text_obj
.get("content")
.and_then(|c| c.as_str())
.unwrap_or("");
texts.push(content.to_string());
} else if let Some(content) = element.get("text").and_then(|t| t.as_str()) {
texts.push(content.to_string());
}
texts.push("\n".to_string());
}
"a" => {
let href = element.get("href").and_then(|h| h.as_str()).unwrap_or("");
let text = element.get("text").and_then(|t| t.as_str()).unwrap_or("");
if !text.is_empty() {
texts.push(text.to_string());
} else if !href.is_empty() {
texts.push(format!("link: {}", href));
}
}
"img" => {
let alt = element.get("alt");
let alt_text = alt
.and_then(|a| a.as_str())
.or_else(|| {
alt.and_then(|a| a.as_object())
.and_then(|o| o.get("content"))
.and_then(|c| c.as_str())
})
.unwrap_or("[image]");
texts.push(format!("{}\n", alt_text));
}
"note" => {
if let Some(elements) = element.get("elements").and_then(|e| e.as_array()) {
for el in elements {
extract_element_content(el, texts);
}
}
}
"column_set" => {
if let Some(columns) = element.get("columns").and_then(|c| c.as_array()) {
for col in columns {
if let Some(elements) = col.get("elements").and_then(|e| e.as_array()) {
for el in elements {
extract_element_content(el, texts);
}
}
}
}
}
"table" => {
// Tables are complex, just indicate presence
texts.push("[table]\n".to_string());
}
_ => {
// Recursively check for nested elements
if let Some(elements) = element.get("elements").and_then(|e| e.as_array()) {
for el in elements {
extract_element_content(el, texts);
}
}
}
}
}
/// Convert a Feishu list/bullet message into plain text, one `- item` line per entry.
///
/// The item array is read from `items`, or from `content` when `items` is absent or not
/// an array. The media slot of the returned tuple is always `None`. The raw `content` is
/// returned when it is not valid JSON, has no item array, or produces no text.
fn parse_list_content(content: &str) -> Result<(String, Option<MediaItem>), ChannelError> {
    let Ok(parsed) = serde_json::from_str::<serde_json::Value>(content) else {
        return Ok((content.to_string(), None));
    };

    let items = match parsed.get("items").and_then(|i| i.as_array()) {
        Some(items) => items,
        None => match parsed.get("content").and_then(|c| c.as_array()) {
            Some(items) => items,
            None => return Ok((content.to_string(), None)),
        },
    };

    let mut lines = Vec::new();
    collect_list_items(items, &mut lines, 0);

    let joined = lines.join("\n");
    let text = match joined.trim() {
        "" => content,
        rendered => rendered,
    };
    Ok((text.to_string(), None))
}
/// Recursively render list items as `- text` lines, indenting one step per nesting level.
///
/// Each item is either an array of inline elements or an object with a `content` array.
/// Nested items are taken from the object's `children` field, or, for array items, from
/// the first inline object that has a `children` key. Items of any other JSON type are
/// ignored.
fn collect_list_items(items: &[serde_json::Value], lines: &mut Vec<String>, depth: usize) {
    let indent = " ".repeat(depth);

    for item in items {
        // Resolve the inline elements and the (optional) nested-children value up front.
        let (inline_elements, children): (&[serde_json::Value], Option<&serde_json::Value>) =
            match item {
                serde_json::Value::Array(arr) => (
                    arr.as_slice(),
                    arr.iter()
                        .find_map(|child| child.as_object().and_then(|o| o.get("children"))),
                ),
                serde_json::Value::Object(obj) => (
                    obj.get("content")
                        .and_then(|c| c.as_array())
                        .map(|a| a.as_slice())
                        .unwrap_or(&[]),
                    obj.get("children"),
                ),
                _ => continue,
            };

        let mut text = String::new();
        for el in inline_elements {
            extract_inline_text(el, &mut text);
        }
        let label = text.trim();
        if !label.is_empty() {
            lines.push(format!("{}- {}", indent, label));
        }

        // Descend only when `children` is actually an array.
        if let Some(nested) = children.and_then(|c| c.as_array()) {
            collect_list_items(nested, lines, depth + 1);
        }
    }
}
/// Extract text from inline elements (text, link, at-mention)
fn extract_inline_text(el: &serde_json::Value, out: &mut String) {
match el.get("tag").and_then(|t| t.as_str()).unwrap_or("") {
"text" => {
if let Some(text) = el.get("text").and_then(|t| t.as_str()) {
out.push_str(text);
}
}
"a" => {
let text = el
.get("text")
.and_then(|t| t.as_str())
.filter(|s| !s.is_empty())
.or_else(|| el.get("href").and_then(|h| h.as_str()))
.unwrap_or("");
out.push_str(text);
}
"at" => {
let name = el
.get("user_name")
.and_then(|n| n.as_str())
.or_else(|| el.get("user_id").and_then(|i| i.as_str()))
.unwrap_or("user");
out.push_str(&format!("@{}", name));
}
_ => {}
}
}
/// Remove `@_user_N` placeholder tokens injected by Feishu in group chats.
///
/// A placeholder is the literal `@_user_` followed by a run of ASCII letters/digits and an
/// optional single trailing `_`. Everything else — including ordinary `@` characters and
/// the whitespace around a placeholder — is copied through unchanged, so callers that want
/// tidy output should trim the result.
///
/// The scan works on byte offsets returned by `str::find`, which always fall on character
/// boundaries, so multi-byte text before or after a placeholder is preserved intact.
fn strip_at_placeholders(text: &str) -> String {
    const MARKER: &str = "@_user_";

    let mut result = String::with_capacity(text.len());
    let mut rest = text;

    while let Some(pos) = rest.find(MARKER) {
        // Keep everything in front of the placeholder.
        result.push_str(&rest[..pos]);

        // Skip the marker itself, then the id that follows it. The id is limited to ASCII
        // alphanumerics so that CJK text glued to the mention is not swallowed.
        let after_marker = &rest[pos + MARKER.len()..];
        let id_len = after_marker
            .find(|c: char| !c.is_ascii_alphanumeric())
            .unwrap_or(after_marker.len());
        let after_id = &after_marker[id_len..];

        // Also skip the underscore after user_N if present.
        rest = after_id.strip_prefix('_').unwrap_or(after_id);
    }

    result.push_str(rest);
    result
}
// ─────────────────────────────────────────────────────────────────────────────
// Markdown parsing and Feishu card element building
// ─────────────────────────────────────────────────────────────────────────────
/// Outbound message formats, as chosen by `FeishuChannel::detect_msg_format`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum MsgFormat {
/// Plain `text` message: short content in which no markdown was detected.
Text,
/// `post` message with a single `md` node: content using markdown that is not a table
/// or heading (bold, italic, strikethrough, code blocks, lists, links), or plain text
/// that is too long for `Text`.
Post,
/// Interactive card: content containing tables or headings, or long content.
Interactive,
}
/// Compiled regular expressions used to detect and rewrite markdown constructs.
struct MdPatterns {
/// Matches a whole markdown table: header row, separator row, then one or more data rows.
table_re: Regex,
/// Matches an ATX heading line (`#` to `######` followed by text).
heading_re: Regex,
/// Matches a fenced code block (```...```), spanning newlines, non-greedy.
code_block_re: Regex,
/// Matches bold `**text**` (capture group 1) or `__text__` (capture group 2).
bold_re: Regex,
/// Matches italic `*text*` or `_text_`. Has no capture groups, and can also match
/// inside bold markers or snake_case identifiers.
italic_re: Regex,
/// Matches strikethrough `~~text~~`. Has no capture groups.
strike_re: Regex,
/// Matches markdown links `[text](url)` whose URL starts with `http://` or `https://`.
link_re: Regex,
/// Matches the start of an unordered list item (`- item`, `* item` or `+ item`).
list_re: Regex,
/// Matches the start of an ordered list item (`1. item`).
olist_re: Regex,
}
impl MdPatterns {
fn new() -> Self {
Self {
table_re: Regex::new(r"(?m)((?:^[ \t]*\|.+\|[ \t]*\n)(?:^[ \t]*\|[-:\s|]+\|[ \t]*\n)(?:^[ \t]*\|.+\|[ \t]*\n?)+)").unwrap(),
heading_re: Regex::new(r"(?m)^(#{1,6})\s+(.+)$").unwrap(),
code_block_re: Regex::new(r"(```[\s\S]*?```)").unwrap(),
bold_re: Regex::new(r"\*\*(.+?)\*\*|__(.+?)__").unwrap(),
// Simple pattern: detect *text* or _text_ markers (may overlap with bold, but that's ok for detection)
// We use a conservative approach: detect potential italic markers
italic_re: Regex::new(r"\*[^*]+\*|_[^_]+_").unwrap(),
strike_re: Regex::new(r"~~.+?~~").unwrap(),
link_re: Regex::new(r"\[([^\]]+)\]\((https?://[^\)]+)\)").unwrap(),
list_re: Regex::new(r"(?m)^[\s]*[-*+]\s+").unwrap(),
olist_re: Regex::new(r"(?m)^[\s]*\d+\.\s+").unwrap(),
}
}
}
/// `Default` simply delegates to `MdPatterns::new`.
impl Default for MdPatterns {
fn default() -> Self {
Self::new()
}
}
impl FeishuChannel {
/// Determine the optimal Feishu message format for `content`.
///
/// Decision order (first match wins), evaluated on the trimmed content:
///
/// 1. contains a markdown table or heading → [`MsgFormat::Interactive`]
/// 2. longer than 2000 characters → [`MsgFormat::Interactive`]
/// 3. contains bold, italic, strikethrough, a fenced code block, a list or a link
///    → [`MsgFormat::Post`]
/// 4. at most 200 characters → [`MsgFormat::Text`]
/// 5. otherwise → [`MsgFormat::Post`]
///
/// Lengths are measured in Unicode scalar values rather than UTF-8 bytes, so CJK text
/// (three bytes per character) is not classified as "long" at a third of the threshold.
fn detect_msg_format(content: &str) -> MsgFormat {
    /// Content longer than this many characters is always sent as a card.
    const CARD_MIN_CHARS: usize = 2000;
    /// Plain content up to this many characters is sent as a simple text message.
    const TEXT_MAX_CHARS: usize = 200;

    let patterns = MdPatterns::new();
    let stripped = content.trim();

    // Tables and headings are not supported by post `md` nodes, so use cards.
    if patterns.table_re.is_match(stripped) || patterns.heading_re.is_match(stripped) {
        return MsgFormat::Interactive;
    }

    // Long content → interactive card (better readability)
    let char_len = stripped.chars().count();
    if char_len > CARD_MIN_CHARS {
        return MsgFormat::Interactive;
    }

    // Markdown that is supported by Feishu post `md` should be sent as post.
    let post_markdown = [
        &patterns.bold_re,
        &patterns.italic_re,
        &patterns.strike_re,
        &patterns.code_block_re,
        &patterns.list_re,
        &patterns.olist_re,
        &patterns.link_re,
    ];
    if post_markdown.iter().any(|re| re.is_match(stripped)) {
        return MsgFormat::Post;
    }

    // Short plain text → text format; medium plain text → post format.
    if char_len <= TEXT_MAX_CHARS {
        MsgFormat::Text
    } else {
        MsgFormat::Post
    }
}
/// Strip markdown formatting markers from text for plain display.
fn strip_md_formatting(text: &str) -> String {
let patterns = MdPatterns::new();
let mut result = text.to_string();
// Remove bold markers
result = patterns.bold_re.replace_all(&result, "$1$2").to_string();
// Remove italic markers
result = patterns.italic_re.replace_all(&result, "$1$2").to_string();
// Remove strikethrough markers
result = patterns.strike_re.replace_all(&result, "$1").to_string();
result
}
/// Parse a markdown table into a Feishu card `table` element.
///
/// `table_text` must contain a header row, a separator row and at least one data row;
/// otherwise `None` is returned. Columns are named `c0`, `c1`, … and every row is emitted
/// as a map from column name to cell text. Rows with fewer cells than the header are
/// padded with empty strings; surplus cells are dropped. Emphasis markers are stripped
/// from headers and cells.
fn parse_md_table(table_text: &str) -> Option<serde_json::Value> {
    /// Split one (already trimmed) row into trimmed cells.
    ///
    /// Only the single outer pipe on each side is removed, so an empty first or last
    /// cell (`|| b |`) keeps its position instead of shifting the row.
    fn split(line: &str) -> Vec<String> {
        let inner = line.strip_prefix('|').unwrap_or(line);
        let inner = inner.strip_suffix('|').unwrap_or(inner);
        inner.split('|').map(|cell| cell.trim().to_string()).collect()
    }

    let lines: Vec<&str> = table_text
        .trim()
        .split('\n')
        .map(str::trim)
        .filter(|line| !line.is_empty())
        .collect();
    if lines.len() < 3 {
        return None;
    }

    let headers = split(lines[0]);

    // lines[1] is the `|---|---|` separator row and carries no data.
    let rows: Vec<std::collections::HashMap<String, String>> = lines[2..]
        .iter()
        .map(|line| {
            let cells = split(line);
            (0..headers.len())
                .map(|i| {
                    let value = cells
                        .get(i)
                        .map(|cell| Self::strip_md_formatting(cell))
                        .unwrap_or_default();
                    (format!("c{}", i), value)
                })
                .collect()
        })
        .collect();

    let columns: Vec<serde_json::Value> = headers
        .iter()
        .enumerate()
        .map(|(i, header)| {
            serde_json::json!({
                "tag": "column",
                "name": format!("c{}", i),
                "display_name": Self::strip_md_formatting(header),
                "width": "auto"
            })
        })
        .collect();

    Some(serde_json::json!({
        "tag": "table",
        "page_size": rows.len() + 1,
        "columns": columns,
        "rows": rows,
    }))
}
/// Split `content` at markdown headings into card elements.
///
/// Text between headings becomes `markdown` elements; each heading becomes a `div` whose
/// `lark_md` text is the heading rendered in bold. Fenced code blocks are swapped for
/// placeholders while headings are located, so `#` lines inside code are not mistaken for
/// headings, and are restored in the emitted markdown. If nothing is produced, the whole
/// `content` is returned as a single `markdown` element.
fn split_headings(content: &str) -> Vec<serde_json::Value> {
    /// Placeholder token standing in for the code block with the given index.
    fn placeholder(index: usize) -> String {
        format!("\x00CODE{}\x00", index)
    }

    /// Restore all code block placeholders in the given text.
    fn restore_code_blocks(text: &str, code_blocks: &[&str]) -> String {
        code_blocks
            .iter()
            .enumerate()
            .fold(text.to_string(), |acc, (i, block)| {
                acc.replace(&placeholder(i), *block)
            })
    }

    /// Wrap restored text in a card `markdown` element.
    fn markdown_element(text: String) -> serde_json::Value {
        serde_json::json!({
            "tag": "markdown",
            "content": text
        })
    }

    let patterns = MdPatterns::new();

    // Protect code blocks by replacing them with placeholders.
    let code_blocks: Vec<&str> = patterns
        .code_block_re
        .find_iter(content)
        .map(|m| m.as_str())
        .collect();
    let mut protected = content.to_string();
    for (i, block) in code_blocks.iter().enumerate() {
        protected = protected.replace(*block, &placeholder(i));
    }

    let mut elements: Vec<serde_json::Value> = Vec::new();
    let mut cursor = 0;
    for heading in patterns.heading_re.find_iter(&protected) {
        let preceding = protected[cursor..heading.start()].trim();
        if !preceding.is_empty() {
            elements.push(markdown_element(restore_code_blocks(preceding, &code_blocks)));
        }

        let title = Self::strip_md_formatting(heading.as_str().trim_start_matches('#').trim());
        let display_text = match title.is_empty() {
            true => String::new(),
            false => format!("**{}**", title),
        };
        elements.push(serde_json::json!({
            "tag": "div",
            "text": {
                "tag": "lark_md",
                "content": display_text
            }
        }));

        cursor = heading.end();
    }

    let tail = protected[cursor..].trim();
    if !tail.is_empty() {
        elements.push(markdown_element(restore_code_blocks(tail, &code_blocks)));
    }

    if elements.is_empty() {
        elements.push(markdown_element(content.to_string()));
    }
    elements
}
/// Partition card elements into groups that each contain at most one `table` element.
///
/// Feishu cards have a hard limit of one table per card (API error 11310). Element order
/// is preserved; a new group is started whenever a second table would land in the current
/// one. An empty input yields a single empty group.
fn split_elements_by_table_limit(
    elements: &[serde_json::Value],
) -> Vec<Vec<serde_json::Value>> {
    if elements.is_empty() {
        return vec![vec![]];
    }

    let mut groups = Vec::new();
    let mut current = Vec::new();
    let mut current_has_table = false;

    for el in elements {
        let is_table = el.get("tag").and_then(|t| t.as_str()) == Some("table");
        if is_table && current_has_table {
            // Close the group before admitting a second table.
            groups.push(std::mem::take(&mut current));
            current_has_table = false;
        }
        current_has_table |= is_table;
        current.push(el.clone());
    }

    if !current.is_empty() {
        groups.push(current);
    }
    groups
}
/// Convert markdown `content` into an ordered list of card elements.
///
/// Markdown tables become `table` elements (or a raw `markdown` element when a table
/// cannot be parsed); the text around them is passed through `split_headings`. When
/// nothing is produced, the whole `content` is returned as a single `markdown` element.
fn build_card_elements(content: &str) -> Vec<serde_json::Value> {
    let patterns = MdPatterns::new();
    let mut elements: Vec<serde_json::Value> = Vec::new();
    let mut cursor = 0;

    // Walk the tables in order, emitting the text in front of each one first.
    for table_match in patterns.table_re.find_iter(content) {
        let preceding = &content[cursor..table_match.start()];
        if !preceding.trim().is_empty() {
            elements.extend(Self::split_headings(preceding));
        }

        let table_src = table_match.as_str();
        let element = match Self::parse_md_table(table_src) {
            Some(table) => table,
            None => serde_json::json!({
                "tag": "markdown",
                "content": table_src
            }),
        };
        elements.push(element);

        cursor = table_match.end();
    }

    let tail = &content[cursor..];
    if !tail.trim().is_empty() {
        elements.extend(Self::split_headings(tail));
    }

    if elements.is_empty() {
        elements.push(serde_json::json!({
            "tag": "markdown",
            "content": content
        }));
    }
    elements
}
/// Convert markdown content to Feishu post message JSON.
/// Feishu Markdown is sent via a post message with an `md` node.
fn markdown_to_post(content: &str) -> String {
let post_body = serde_json::json!({
"zh_cn": {
"content": [[{
"tag": "md",
"text": content.trim()
}]]
}
});
post_body.to_string()
}
/// Send an interactive card message to Feishu.
///
/// `card_content` is the serialized card JSON; `receive_id_type` is passed through as the
/// `receive_id_type` query parameter.
///
/// # Errors
///
/// Propagates the error from obtaining the tenant access token. Returns
/// `ChannelError::ConnectionError` when the HTTP request fails, and `ChannelError::Other`
/// when the response body cannot be parsed or carries a non-zero `code`.
async fn send_interactive_card(
    &self,
    receive_id: &str,
    receive_id_type: &str,
    card_content: &str,
) -> Result<(), ChannelError> {
    /// Minimal view of the response envelope.
    #[derive(Deserialize)]
    struct SendResp {
        code: i32,
        msg: String,
    }

    let token = self.get_tenant_access_token().await?;

    let url = format!(
        "{}/im/v1/messages?receive_id_type={}",
        FEISHU_API_BASE, receive_id_type
    );
    let payload = serde_json::json!({
        "receive_id": receive_id,
        "msg_type": "interactive",
        "content": card_content
    });

    let resp = self
        .http_client
        .post(url)
        .header("Content-Type", "application/json")
        .header("Authorization", format!("Bearer {}", token))
        .json(&payload)
        .send()
        .await
        .map_err(|e| {
            ChannelError::ConnectionError(format!("Send interactive card HTTP error: {}", e))
        })?;

    let send_resp: SendResp = resp.json().await.map_err(|e| {
        ChannelError::Other(format!("Parse send interactive card response error: {}", e))
    })?;

    match send_resp.code {
        0 => Ok(()),
        code => Err(ChannelError::Other(format!(
            "Send interactive card failed: code={} msg={}",
            code, send_resp.msg
        ))),
    }
}
}
/// Choose the local file name for a downloaded attachment.
///
/// When an original name can be found (message content first, then the response headers)
/// and is non-empty after sanitizing, the result is `{message_id}_{sanitized_name}`.
/// Otherwise the name comes from `FeishuChannel::fallback_download_filename`.
fn infer_download_filename(
    content_json: &serde_json::Value,
    headers: &reqwest::header::HeaderMap,
    message_id: &str,
    file_key: &str,
    file_type: &str,
) -> String {
    let original = extract_original_file_name(content_json, headers)
        .map(|name| sanitize_download_file_name(&name))
        .filter(|name| !name.is_empty());

    match original {
        Some(name) => format!("{}_{}", message_id, name),
        None => FeishuChannel::fallback_download_filename(message_id, file_key, file_type),
    }
}
/// Find the attachment's original file name.
///
/// The message content is consulted first, trying the keys `file_name`, `filename` and
/// `name` in that order and taking the first one whose value is a non-blank string (a
/// blank value does not stop the search). If none qualifies, the `Content-Disposition`
/// response header is used. Returns `None` when neither source provides a name.
fn extract_original_file_name(
    content_json: &serde_json::Value,
    headers: &reqwest::header::HeaderMap,
) -> Option<String> {
    ["file_name", "filename", "name"]
        .into_iter()
        .find_map(|key| {
            content_json
                .get(key)
                .and_then(|value| value.as_str())
                .map(str::trim)
                .filter(|value| !value.is_empty())
        })
        .map(ToOwned::to_owned)
        .or_else(|| extract_file_name_from_content_disposition(headers))
}
/// Extract a file name from the `Content-Disposition` response header.
///
/// Both parameter forms are understood, with names matched case-insensitively:
///
/// * `filename*=charset''percent-encoded` — the extended form; the value is
///   percent-decoded and interpreted as UTF-8 (invalid sequences are replaced).
/// * `filename=value` / `filename="value"` — the plain form.
///
/// When both are present the extended form wins regardless of order, as RFC 6266
/// recommends. Returns `None` when the header is missing, is not visible ASCII, or
/// carries no non-empty name.
fn extract_file_name_from_content_disposition(
    headers: &reqwest::header::HeaderMap,
) -> Option<String> {
    /// Decode `%XX` escapes; malformed escapes are copied through verbatim.
    fn percent_decode(input: &str) -> String {
        fn hex(byte: u8) -> Option<u8> {
            (byte as char).to_digit(16).map(|digit| digit as u8)
        }

        let bytes = input.as_bytes();
        let mut out = Vec::with_capacity(bytes.len());
        let mut i = 0;
        while i < bytes.len() {
            let escaped = match (bytes[i], bytes.get(i + 1), bytes.get(i + 2)) {
                (b'%', Some(&hi), Some(&lo)) => hex(hi).zip(hex(lo)),
                _ => None,
            };
            match escaped {
                Some((hi, lo)) => {
                    out.push((hi << 4) | lo);
                    i += 3;
                }
                None => {
                    out.push(bytes[i]);
                    i += 1;
                }
            }
        }
        String::from_utf8_lossy(&out).into_owned()
    }

    let header = headers
        .get(reqwest::header::CONTENT_DISPOSITION)
        .and_then(|value| value.to_str().ok())?;

    // Remember the first plain `filename=` but keep scanning for a `filename*=`.
    let mut plain_name: Option<String> = None;
    for segment in header.split(';') {
        let Some((key, value)) = segment.split_once('=') else {
            continue;
        };
        let key = key.trim();
        let value = value.trim();

        if key.eq_ignore_ascii_case("filename*") {
            // Drop the `charset'lang'` prefix, then undo the percent-encoding.
            let encoded = value.split("''").last().unwrap_or(value).trim_matches('"');
            let decoded = percent_decode(encoded);
            if !decoded.is_empty() {
                return Some(decoded);
            }
        } else if key.eq_ignore_ascii_case("filename") && plain_name.is_none() {
            let cleaned = value.trim_matches('"').trim();
            if !cleaned.is_empty() {
                plain_name = Some(cleaned.to_string());
            }
        }
    }
    plain_name
}
/// Make `file_name` safe to use as a single path component.
///
/// Path separators and the characters `: * ? " < > |` are each replaced with `_`. Leading
/// and trailing dots are then removed, followed by surrounding whitespace. The result may
/// be empty.
fn sanitize_download_file_name(file_name: &str) -> String {
    const FORBIDDEN: [char; 9] = ['/', '\\', ':', '*', '?', '"', '<', '>', '|'];

    let mut replaced = String::with_capacity(file_name.len());
    for ch in file_name.chars() {
        replaced.push(if FORBIDDEN.contains(&ch) { '_' } else { ch });
    }

    // Order matters: dots are stripped first, whitespace second.
    let without_dots = replaced.trim_matches('.');
    without_dots.trim().to_string()
}
// Unit tests for the pure helpers in this file: outbound format detection, post body
// construction, download file-name inference and inbound post parsing.
#[cfg(test)]
mod tests {
use super::{
FeishuChannel, MsgFormat, extract_file_name_from_content_disposition,
infer_download_filename, parse_post_content, sanitize_download_file_name,
};
// The post body must carry the trimmed markdown verbatim inside a single `md` node.
#[test]
fn markdown_post_uses_md_tag() {
let content = "**bold**\n1. item1\n2. item2\n[link](https://open.feishu.cn)";
let post = FeishuChannel::markdown_to_post(content);
let parsed: serde_json::Value = serde_json::from_str(&post).unwrap();
assert_eq!(parsed["zh_cn"]["content"][0][0]["tag"], "md");
assert_eq!(parsed["zh_cn"]["content"][0][0]["text"], content);
}
// An ordered list is post-compatible markdown, so short content still becomes a post.
#[test]
fn multiline_markdown_is_not_misclassified_as_plain_post() {
let content = "intro\n1. item1\n2. item2";
assert_eq!(FeishuChannel::detect_msg_format(content), MsgFormat::Post);
}
// Headings force the interactive card format.
#[test]
fn headings_still_use_interactive() {
let content = "intro\n## heading";
assert_eq!(
FeishuChannel::detect_msg_format(content),
MsgFormat::Interactive
);
}
// A name in the message content takes priority and is prefixed with the message id.
#[test]
fn infer_download_filename_prefers_original_file_name() {
let content = serde_json::json!({
"file_key": "file_key_123",
"file_name": "demo-archive.zip"
});
let headers = reqwest::header::HeaderMap::new();
let filename =
infer_download_filename(&content, &headers, "om_123", "file_key_123", "file");
assert_eq!(filename, "om_123_demo-archive.zip");
}
// Without a name in the content, the Content-Disposition header is used.
#[test]
fn infer_download_filename_uses_content_disposition_when_message_lacks_name() {
let content = serde_json::json!({
"file_key": "file_key_123"
});
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::CONTENT_DISPOSITION,
reqwest::header::HeaderValue::from_static("attachment; filename=meeting-notes.zip"),
);
let filename =
infer_download_filename(&content, &headers, "om_123", "file_key_123", "file");
assert_eq!(filename, "om_123_meeting-notes.zip");
}
// With no name available anywhere, the generated fallback name is used.
#[test]
fn infer_download_filename_falls_back_to_bin_without_name() {
let content = serde_json::json!({
"file_key": "file_key_123"
});
let headers = reqwest::header::HeaderMap::new();
let filename =
infer_download_filename(&content, &headers, "om_123", "file_key_123", "file");
assert_eq!(filename, "om_123_file_key.bin");
}
// Separators become `_`; only the leading/trailing dots are trimmed.
#[test]
fn sanitize_download_file_name_replaces_path_separators() {
let sanitized = sanitize_download_file_name("../../demo/archive.zip");
assert_eq!(sanitized, "_.._demo_archive.zip");
}
// The extended `filename*=charset''value` form is understood.
#[test]
fn extract_file_name_from_content_disposition_supports_filename_star() {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
reqwest::header::CONTENT_DISPOSITION,
reqwest::header::HeaderValue::from_static("attachment; filename*=UTF-8''archive.zip"),
);
let file_name = extract_file_name_from_content_disposition(&headers);
assert_eq!(file_name.as_deref(), Some("archive.zip"));
}
#[test]
fn parse_post_content_handles_code_block_with_content_array() {
// Test parsing code_block with content array (standard Feishu format)
let post_json = r#"{"post":{"zh_cn":{"content":[[{"tag":"code_block","language":"python","content":[{"tag":"text","text":"def hello():"},{"tag":"text","text":" print('world')"}]}]]}}}"#;
let result = parse_post_content(post_json);
assert!(result.contains("```python"));
assert!(result.contains("def hello():"));
assert!(result.contains("print('world')"));
}
#[test]
fn parse_post_content_handles_code_block_with_fallback_text() {
// Backwards compatibility: some formats might use text field directly
let post_json = r#"{"post":{"zh_cn":{"content":[[{"tag":"code_block","language":"rust","text":"fn main() {}"}]]}}}"#;
let result = parse_post_content(post_json);
assert!(result.contains("```rust"));
assert!(result.contains("fn main() {}"));
}
#[test]
fn parse_post_content_handles_code_block_without_language() {
// Test code_block without language field
let post_json = r#"{"post":{"zh_cn":{"content":[[{"tag":"code_block","content":[{"tag":"text","text":"plain text"}]}]]}}}"#;
let result = parse_post_content(post_json);
assert!(result.contains("```"));
assert!(result.contains("plain text"));
}
#[test]
fn parse_post_content_handles_empty_code_block() {
// Test code_block with empty content
let post_json = r#"{"post":{"zh_cn":{"content":[[{"tag":"code_block","language":"go"}]]}}}"#;
let result = parse_post_content(post_json);
assert!(result.contains("```go"));
}
}
#[async_trait]
impl Channel for FeishuChannel {
/// Returns this channel's name, borrowed from `self.name`.
fn name(&self) -> &str {
&self.name
}
/// Start the channel: spawn a background task that runs the WebSocket loop and
/// reconnects after it ends.
///
/// The task keeps calling `run_ws_loop` until the channel is stopped or the loop fails
/// three times in a row; it waits 5 seconds between attempts. A session that ends
/// without an error resets the failure counter, so only genuinely consecutive failures
/// count towards the limit. When the task exits it clears the `running` flag.
///
/// # Errors
///
/// Returns `ChannelError::ConfigError` when `app_id` or `app_secret` is empty. Failures
/// inside the background task are logged, not returned.
async fn start(&self, bus: Arc<MessageBus>) -> Result<(), ChannelError> {
    if self.config.app_id.is_empty() || self.config.app_secret.is_empty() {
        return Err(ChannelError::ConfigError(
            "Feishu app_id or app_secret is not configured".to_string(),
        ));
    }

    *self.running.write().await = true;

    // Keep one sender on `self` for `stop()`, and one in the task to mint receivers.
    let (shutdown_tx, _) = broadcast::channel(1);
    *self.shutdown_tx.write().await = Some(shutdown_tx.clone());

    let channel = self.clone();
    let bus = bus.clone();
    tokio::spawn(async move {
        let mut consecutive_failures = 0;
        let max_failures = 3;

        loop {
            if !*channel.running.read().await {
                break;
            }

            // Each attempt gets a fresh receiver subscribed before the loop starts.
            let shutdown_rx = shutdown_tx.subscribe();
            match channel.run_ws_loop(bus.clone(), shutdown_rx).await {
                Ok(_) => {
                    // The session ended without an error: earlier failures are no
                    // longer "consecutive".
                    consecutive_failures = 0;
                    tracing::info!("Feishu WebSocket disconnected");
                }
                Err(e) => {
                    consecutive_failures += 1;
                    tracing::error!(attempt = consecutive_failures, error = %e, "Feishu WebSocket error");
                    if consecutive_failures >= max_failures {
                        tracing::error!("Feishu channel: max failures reached, stopping");
                        break;
                    }
                }
            }

            if !*channel.running.read().await {
                break;
            }
            tracing::info!("Feishu channel retrying in 5s...");
            tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
        }

        *channel.running.write().await = false;
        tracing::info!("Feishu channel stopped");
    });

    tracing::info!("Feishu channel started");
    Ok(())
}
/// Stop the channel.
///
/// Clears the `running` flag first, then takes the stored shutdown sender (if any) and
/// broadcasts on it. A failed broadcast (no live receiver) is ignored. Always returns
/// `Ok(())`.
async fn stop(&self) -> Result<(), ChannelError> {
    *self.running.write().await = false;

    let sender = self.shutdown_tx.write().await.take();
    if let Some(tx) = sender {
        // Ignored on purpose: there may be no receiver subscribed right now.
        let _ = tx.send(());
    }
    Ok(())
}
/// Report whether the channel is running, without waiting for the lock.
///
/// Returns `false` when the `running` lock cannot be acquired immediately, so a
/// concurrent writer can make a running channel briefly report `false`.
fn is_running(&self) -> bool {
    match self.running.try_read() {
        Ok(flag) => *flag,
        Err(_) => false,
    }
}
async fn send(&self, msg: OutboundMessage) -> Result<(), ChannelError> {
if matches!(msg.event_kind, OutboundEventKind::ToolResult | OutboundEventKind::ToolPending)
|| msg.metadata.get("is_subagent_event").map(|v| v == "true").unwrap_or(false)
{
return Ok(());
}
let receive_id = if msg.chat_id.starts_with("oc_") {
&msg.chat_id
} else {
&msg.reply_to.as_ref().unwrap_or(&msg.chat_id)
};
let receive_id_type = if msg.chat_id.starts_with("oc_") {
"chat_id"
} else {
"open_id"
};
let remove_reaction = async {
self.remove_reaction_from_metadata(&msg.metadata).await;
};
// If no media, use smart format detection
if msg.media.is_empty() {
let content = msg.content.trim();
// Empty content
if content.is_empty() {
remove_reaction.await;
return Ok(());
}
let fmt = Self::detect_msg_format(content);
match fmt {
MsgFormat::Text => {
// Short plain text send as simple text message
let result = self
.send_message_to_feishu(receive_id, receive_id_type, "text", content)
.await;
remove_reaction.await;
return result;
}
MsgFormat::Post => {
// Medium content with links send as rich-text post
let post_body = Self::markdown_to_post(content);
let result = self
.send_message_to_feishu(receive_id, receive_id_type, "post", &post_body)
.await;
remove_reaction.await;
return result;
}
MsgFormat::Interactive => {
// Complex / long content send as interactive card
let elements = Self::build_card_elements(content);
for chunk in Self::split_elements_by_table_limit(&elements) {
let card = serde_json::json!({
"config": { "wide_screen_mode": true },
"elements": chunk
});
if let Err(e) = self
.send_interactive_card(receive_id, receive_id_type, &card.to_string())
.await
{
tracing::warn!(error = %e, "Failed to send interactive card, falling back to text");
// Fallback to plain text
let result = self
.send_message_to_feishu(
receive_id,
receive_id_type,
"text",
content,
)
.await;
remove_reaction.await;
return result;
}
}
remove_reaction.await;
return Ok(());
}
}
}
if !msg.content.trim().is_empty() {
self.send_message_to_feishu(receive_id, receive_id_type, "text", msg.content.trim())
.await?;
}
let mut sent_media = 0usize;
for media_item in &msg.media {
let path = &media_item.path;
let result = match media_item.media_type.as_str() {
"image" => {
let image_key = self.upload_image(path).await?;
self.send_message_to_feishu(
receive_id,
receive_id_type,
"image",
&serde_json::json!({ "image_key": image_key }).to_string(),
)
.await
}
"audio" | "file" | "video" => {
let file_key = self.upload_file(path).await?;
self.send_message_to_feishu(
receive_id,
receive_id_type,
"file",
&serde_json::json!({ "file_key": file_key }).to_string(),
)
.await
}
_ => {
tracing::warn!(media_type = %media_item.media_type, "Unsupported media type for sending");
continue;
}
};
match result {
Ok(()) => sent_media += 1,
Err(error) => {
tracing::warn!(error = %error, path = %path, media_type = %media_item.media_type, "Failed to send media message to Feishu");
return Err(error);
}
}
}
if msg.content.trim().is_empty() && sent_media == 0 {
remove_reaction.await;
return Err(ChannelError::Other(
"No supported media items were sent to Feishu".to_string(),
));
}
remove_reaction.await;
Ok(())
}
}