feat(feishu): add reaction handling and metadata forwarding in messages

This commit is contained in:
xiaoski 2026-04-08 10:24:15 +08:00
parent a4399037ac
commit 21b4e60c44
4 changed files with 261 additions and 17 deletions

View File

@ -145,7 +145,10 @@ pub struct InboundMessage {
pub content: String,
pub timestamp: i64,
pub media: Vec<MediaItem>,
/// Channel-specific data used internally by the channel (not forwarded).
pub metadata: HashMap<String, String>,
/// Data forwarded from inbound to outbound (copied to OutboundMessage.metadata by gateway).
pub forwarded_metadata: HashMap<String, String>,
}
impl InboundMessage {

View File

@ -1,10 +1,13 @@
use std::collections::HashMap;
use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use tokio::sync::{broadcast, RwLock};
use serde::Deserialize;
use futures_util::{SinkExt, StreamExt};
use prost::{Message as ProstMessage, bytes::Bytes};
use serde::Deserialize;
use tokio::sync::{broadcast, RwLock};
use crate::bus::{MessageBus, MediaItem, OutboundMessage};
use crate::channels::base::{Channel, ChannelError};
@ -13,6 +16,18 @@ use crate::config::{FeishuChannelConfig, LLMProviderConfig};
const FEISHU_API_BASE: &str = "https://open.feishu.cn/open-apis";
const FEISHU_WS_BASE: &str = "https://open.feishu.cn";
/// Heartbeat timeout for WS connection — must be larger than ping_interval (default 120 s).
/// If no binary frame (pong or event) is received within this window, reconnect.
const WS_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(300);
/// Refresh tenant token this many seconds before the announced expiry.
const TOKEN_REFRESH_SKEW: Duration = Duration::from_secs(120);
/// Default tenant token TTL when `expire`/`expires_in` is absent.
const DEFAULT_TOKEN_TTL: Duration = Duration::from_secs(7200);
/// Feishu API business code for expired/invalid tenant access token.
const INVALID_ACCESS_TOKEN_CODE: i32 = 99991663;
/// Dedup cache TTL (30 minutes).
const DEDUP_CACHE_TTL: Duration = Duration::from_secs(30 * 60);
// ─────────────────────────────────────────────────────────────────────────────
// Protobuf types for Feishu WebSocket protocol (pbbp2.proto)
// ─────────────────────────────────────────────────────────────────────────────
@ -125,6 +140,13 @@ struct LarkMessage {
// ─────────────────────────────────────────────────────────────────────────────
/// Cached tenant token with proactive refresh metadata.
#[derive(Clone)]
struct CachedTenantToken {
value: String,
refresh_after: Instant,
}
#[derive(Clone)]
pub struct FeishuChannel {
config: FeishuChannelConfig,
@ -132,10 +154,15 @@ pub struct FeishuChannel {
running: Arc<RwLock<bool>>,
shutdown_tx: Arc<RwLock<Option<broadcast::Sender<()>>>>,
connected: Arc<RwLock<bool>>,
/// Cached tenant access token with proactive refresh.
tenant_token: Arc<RwLock<Option<CachedTenantToken>>>,
/// Dedup cache: WS message_ids seen in the last ~30 min.
seen_message_ids: Arc<RwLock<HashMap<String, Instant>>>,
}
/// Parsed message data from a Feishu frame
struct ParsedMessage {
message_id: String,
open_id: String,
chat_id: String,
content: String,
@ -153,6 +180,8 @@ impl FeishuChannel {
running: Arc::new(RwLock::new(false)),
shutdown_tx: Arc::new(RwLock::new(None)),
connected: Arc::new(RwLock::new(false)),
tenant_token: Arc::new(RwLock::new(None)),
seen_message_ids: Arc::new(RwLock::new(HashMap::new())),
})
}
@ -189,8 +218,42 @@ impl FeishuChannel {
Ok((ep.url, client_config))
}
/// Get tenant access token
async fn get_tenant_token(&self) -> Result<String, ChannelError> {
/// Get tenant access token (cached with proactive refresh).
async fn get_tenant_access_token(&self) -> Result<String, ChannelError> {
// 1. Check cache
{
let cached = self.tenant_token.read().await;
if let Some(ref token) = *cached {
if Instant::now() < token.refresh_after {
return Ok(token.value.clone());
}
}
}
// 2. Fetch new token
let (token, ttl) = self.fetch_new_token().await?;
// 3. Cache with proactive refresh time (提前 120 秒)
let refresh_after = Instant::now() + ttl.saturating_sub(TOKEN_REFRESH_SKEW);
{
let mut cached = self.tenant_token.write().await;
*cached = Some(CachedTenantToken {
value: token.clone(),
refresh_after,
});
}
Ok(token)
}
/// Invalidate cached token (called when API reports expired tenant token).
async fn invalidate_token(&self) {
let mut cached = self.tenant_token.write().await;
*cached = None;
}
/// Fetch a new tenant access token from Feishu.
async fn fetch_new_token(&self) -> Result<(String, Duration), ChannelError> {
let resp = self.http_client
.post(format!("{}/auth/v3/tenant_access_token/internal", FEISHU_API_BASE))
.header("Content-Type", "application/json")
@ -206,6 +269,7 @@ impl FeishuChannel {
struct TokenResponse {
code: i32,
tenant_access_token: Option<String>,
expire: Option<i64>,
}
let token_resp: TokenResponse = resp
@ -217,8 +281,30 @@ impl FeishuChannel {
return Err(ChannelError::Other("Auth failed".to_string()));
}
token_resp.tenant_access_token
.ok_or_else(|| ChannelError::Other("No token in response".to_string()))
let token = token_resp.tenant_access_token
.ok_or_else(|| ChannelError::Other("No token in response".to_string()))?;
let ttl = token_resp.expire
.and_then(|v| u64::try_from(v).ok())
.map(Duration::from_secs)
.unwrap_or(DEFAULT_TOKEN_TTL);
Ok((token, ttl))
}
/// Check if message_id has been seen (dedup), and mark it as seen if not.
/// Returns true if the message was already processed.
/// Note: GC of stale entries is handled in the heartbeat timeout_check loop.
async fn is_message_seen(&self, message_id: &str) -> bool {
let mut seen = self.seen_message_ids.write().await;
let now = Instant::now();
if seen.contains_key(message_id) {
true
} else {
seen.insert(message_id.to_string(), now);
false
}
}
/// Download media and save locally, return (description, media_item)
@ -250,7 +336,7 @@ impl FeishuChannel {
.and_then(|v| v.as_str())
.ok_or_else(|| ChannelError::Other("No image_key in message".to_string()))?;
let token = self.get_tenant_token().await?;
let token = self.get_tenant_access_token().await?;
// Use message resource API for downloading message images
let url = format!("{}/im/v1/messages/{}/resources/{}?type=image", FEISHU_API_BASE, message_id, image_key);
@ -309,7 +395,7 @@ impl FeishuChannel {
.and_then(|v| v.as_str())
.ok_or_else(|| ChannelError::Other("No file_key in message".to_string()))?;
let token = self.get_tenant_token().await?;
let token = self.get_tenant_access_token().await?;
// Use message resource API for downloading message files
let url = format!("{}/im/v1/messages/{}/resources/{}?type=file", FEISHU_API_BASE, message_id, file_key);
@ -357,7 +443,7 @@ impl FeishuChannel {
/// Upload image to Feishu and return the image_key
async fn upload_image(&self, file_path: &str) -> Result<String, ChannelError> {
let token = self.get_tenant_token().await?;
let token = self.get_tenant_access_token().await?;
let mime = mime_guess::from_path(file_path)
.first_or_octet_stream()
@ -417,7 +503,7 @@ impl FeishuChannel {
/// Upload file to Feishu and return the file_key
async fn upload_file(&self, file_path: &str) -> Result<String, ChannelError> {
let token = self.get_tenant_token().await?;
let token = self.get_tenant_access_token().await?;
let file_name = std::path::Path::new(file_path)
.file_name()
@ -485,9 +571,100 @@ impl FeishuChannel {
.ok_or_else(|| ChannelError::Other("No file_key in response".to_string()))
}
/// Add a reaction emoji to a message and store the reaction_id for later removal.
/// Returns the reaction_id if successful, None otherwise.
async fn add_reaction(&self, message_id: &str) -> Result<Option<String>, ChannelError> {
let emoji = self.config.reaction_emoji.as_str();
let token = self.get_tenant_access_token().await?;
let resp = self.http_client
.post(format!("{}/im/v1/messages/{}/reactions", FEISHU_API_BASE, message_id))
.header("Authorization", format!("Bearer {}", token))
.json(&serde_json::json!({
"reaction_type": { "emoji_type": emoji }
}))
.send()
.await
.map_err(|e| ChannelError::ConnectionError(format!("Add reaction HTTP error: {}", e)))?;
#[derive(Deserialize)]
struct ReactionResp {
code: i32,
msg: Option<String>,
data: Option<ReactionData>,
}
#[derive(Deserialize)]
struct ReactionData {
reaction_id: Option<String>,
}
let result: ReactionResp = resp.json().await
.map_err(|e| ChannelError::Other(format!("Parse reaction response error: {}", e)))?;
if result.code != 0 {
tracing::warn!(
"Failed to add reaction to message {}: code={} msg={}",
message_id,
result.code,
result.msg.as_deref().unwrap_or("unknown")
);
return Ok(None);
}
let reaction_id = result.data.and_then(|d| d.reaction_id);
Ok(reaction_id)
}
/// Remove reaction using feishu metadata propagated through OutboundMessage.
/// Reads feishu.message_id and feishu.reaction_id from metadata.
async fn remove_reaction_from_metadata(&self, metadata: &std::collections::HashMap<String, String>) {
let (message_id, reaction_id) = match (
metadata.get("feishu.message_id"),
metadata.get("feishu.reaction_id"),
) {
(Some(msg_id), Some(rid)) => (msg_id.clone(), rid.clone()),
_ => return,
};
if let Err(e) = self.remove_reaction(&message_id, &reaction_id).await {
tracing::debug!(error = %e, message_id = %message_id, "Failed to remove reaction");
}
}
/// Remove a reaction emoji from a message.
async fn remove_reaction(&self, message_id: &str, reaction_id: &str) -> Result<(), ChannelError> {
let token = self.get_tenant_access_token().await?;
let resp = self.http_client
.delete(format!("{}/im/v1/messages/{}/reactions/{}", FEISHU_API_BASE, message_id, reaction_id))
.header("Authorization", format!("Bearer {}", token))
.send()
.await
.map_err(|e| ChannelError::ConnectionError(format!("Remove reaction HTTP error: {}", e)))?;
#[derive(Deserialize)]
struct ReactionResp {
code: i32,
msg: Option<String>,
}
let result: ReactionResp = resp.json().await
.map_err(|e| ChannelError::Other(format!("Parse remove reaction response error: {}", e)))?;
if result.code != 0 {
tracing::debug!(
"Failed to remove reaction {} from message {}: code={} msg={}",
reaction_id,
message_id,
result.code,
result.msg.as_deref().unwrap_or("unknown")
);
}
Ok(())
}
/// Send a text message to Feishu chat (implements Channel trait)
async fn send_message_to_feishu(&self, receive_id: &str, receive_id_type: &str, content: &str) -> Result<(), ChannelError> {
let token = self.get_tenant_token().await?;
let token = self.get_tenant_access_token().await?;
// Feishu text messages have content limits (~64KB).
// Truncate if content is too long to avoid API error 230001.
@ -582,6 +759,13 @@ impl FeishuChannel {
let message_id = payload_data.message.message_id.clone();
// Deduplication check
if self.is_message_seen(&message_id).await {
#[cfg(debug_assertions)]
tracing::debug!(message_id = %message_id, "Duplicate message, skipping");
return Ok(None);
}
#[cfg(debug_assertions)]
tracing::debug!(message_id = %message_id, "Received Feishu message");
@ -604,6 +788,7 @@ impl FeishuChannel {
}
Ok(Some(ParsedMessage {
message_id,
open_id,
chat_id,
content,
@ -689,16 +874,20 @@ impl FeishuChannel {
let ping_interval = client_config.ping_interval.unwrap_or(120).max(10);
let mut ping_interval_tok = tokio::time::interval(tokio::time::Duration::from_secs(ping_interval));
let mut timeout_check = tokio::time::interval(tokio::time::Duration::from_secs(10));
let mut seq: u64 = 1;
let mut last_recv = Instant::now();
// Consume the immediate tick
ping_interval_tok.tick().await;
timeout_check.tick().await;
loop {
tokio::select! {
msg = read.next() => {
match msg {
Some(Ok(tokio_tungstenite::tungstenite::Message::Binary(data))) => {
last_recv = Instant::now();
let bytes: Bytes = data;
if let Ok(frame) = PbFrame::decode(bytes.as_ref()) {
match self.handle_frame(&frame).await {
@ -708,6 +897,24 @@ impl FeishuChannel {
tracing::error!(error = %e, "Failed to send ACK to Feishu");
}
// Add reaction emoji (await so we get the reaction_id for later removal)
let message_id = parsed.message_id.clone();
let reaction_id = match self.add_reaction(&message_id).await {
Ok(Some(rid)) => Some(rid),
Ok(None) => None,
Err(e) => {
tracing::debug!(error = %e, message_id = %message_id, "Failed to add reaction");
None
}
};
// forwarded_metadata is copied to OutboundMessage.metadata by the gateway.
let mut forwarded_metadata = std::collections::HashMap::new();
forwarded_metadata.insert("feishu.message_id".to_string(), message_id.clone());
if let Some(ref rid) = reaction_id {
forwarded_metadata.insert("feishu.reaction_id".to_string(), rid.clone());
}
// Publish to bus asynchronously
let channel = self.clone();
let bus = bus.clone();
@ -726,6 +933,7 @@ impl FeishuChannel {
.as_millis() as i64,
media: parsed.media.map(|m| vec![m]).unwrap_or_default(),
metadata: std::collections::HashMap::new(),
forwarded_metadata,
};
if let Err(e) = channel.handle_and_publish(&bus, &msg).await {
tracing::error!(error = %e, open_id = %parsed.open_id, chat_id = %parsed.chat_id, "Failed to publish Feishu message to bus");
@ -743,6 +951,7 @@ impl FeishuChannel {
}
}
Some(Ok(tokio_tungstenite::tungstenite::Message::Ping(data))) => {
last_recv = Instant::now();
let pong = PbFrame {
seq_id: seq.wrapping_add(1),
log_id: 0,
@ -756,6 +965,9 @@ impl FeishuChannel {
};
let _ = write.send(tokio_tungstenite::tungstenite::Message::Binary(pong.encode_to_vec().into())).await;
}
Some(Ok(tokio_tungstenite::tungstenite::Message::Pong(_))) => {
last_recv = Instant::now();
}
Some(Ok(tokio_tungstenite::tungstenite::Message::Close(_))) | None => {
#[cfg(debug_assertions)]
tracing::debug!("Feishu WebSocket closed");
@ -786,6 +998,16 @@ impl FeishuChannel {
break;
}
}
_ = timeout_check.tick() => {
if last_recv.elapsed() > WS_HEARTBEAT_TIMEOUT {
tracing::warn!("Feishu WebSocket heartbeat timeout, reconnecting");
break;
}
// GC dedup cache: remove entries older than TTL (matches zeroclaw pattern)
let now = Instant::now();
let mut seen = self.seen_message_ids.write().await;
seen.retain(|_, ts| now.duration_since(*ts) < DEDUP_CACHE_TTL);
}
_ = shutdown_rx.recv() => {
tracing::info!("Feishu channel shutdown signal received");
break;
@ -906,11 +1128,14 @@ impl Channel for FeishuChannel {
// If no media, send text only
if msg.media.is_empty() {
return self.send_message_to_feishu(receive_id, receive_id_type, &msg.content).await;
let result = self.send_message_to_feishu(receive_id, receive_id_type, &msg.content).await;
// Remove pending reaction after sending (using metadata propagated from inbound)
self.remove_reaction_from_metadata(&msg.metadata).await;
return result;
}
// Handle multimodal message - send with media
let token = self.get_tenant_token().await?;
let token = self.get_tenant_access_token().await?;
// Build content with media references
let mut content_parts = Vec::new();
@ -967,7 +1192,10 @@ impl Channel for FeishuChannel {
// If no content parts after processing, just send empty text
if content_parts.is_empty() {
return self.send_message_to_feishu(receive_id, receive_id_type, "").await;
let result = self.send_message_to_feishu(receive_id, receive_id_type, "").await;
// Remove pending reaction after sending (using metadata propagated from inbound)
self.remove_reaction_from_metadata(&msg.metadata).await;
return result;
}
// Determine message type
@ -1008,6 +1236,9 @@ impl Channel for FeishuChannel {
return Err(ChannelError::Other(format!("Send multimodal message failed: code={} msg={}", send_resp.code, send_resp.msg)));
}
// Remove pending reaction after successfully sending
self.remove_reaction_from_metadata(&msg.metadata).await;
Ok(())
}
}

View File

@ -30,6 +30,9 @@ pub struct FeishuChannelConfig {
pub agent: String,
#[serde(default = "default_media_dir")]
pub media_dir: String,
/// Emoji type for message reactions (e.g. "THUMBSUP", "OK", "EYES").
#[serde(default = "default_reaction_emoji")]
pub reaction_emoji: String,
}
fn default_allow_from() -> Vec<String> {
@ -41,6 +44,10 @@ fn default_media_dir() -> String {
home.join(".picobot/media/feishu").to_string_lossy().to_string()
}
fn default_reaction_emoji() -> String {
"THUMBSUP".to_string()
}
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct ProviderConfig {
#[serde(rename = "type")]

View File

@ -79,13 +79,16 @@ impl GatewayState {
inbound.media,
).await {
Ok(response_content) => {
// Forward channel-specific metadata from inbound to outbound.
// This allows channels to propagate context (e.g. feishu message_id for reaction cleanup)
// without gateway needing channel-specific code.
let outbound = crate::bus::OutboundMessage {
channel: inbound.channel,
chat_id: inbound.chat_id,
channel: inbound.channel.clone(),
chat_id: inbound.chat_id.clone(),
content: response_content,
reply_to: None,
media: vec![],
metadata: std::collections::HashMap::new(),
metadata: inbound.forwarded_metadata,
};
if let Err(e) = bus_for_inbound.publish_outbound(outbound).await {
tracing::error!(error = %e, "Failed to publish outbound");