feat: 添加会话消息发送工具,支持文本和附件的发送,优化消息发送逻辑

This commit is contained in:
ooodc 2026-05-02 09:15:36 +08:00
parent 531e72d24f
commit 260266b90f
8 changed files with 569 additions and 129 deletions

View File

@ -523,7 +523,7 @@ impl FeishuChannel {
let resp = self let resp = self
.http_client .http_client
.post(format!("{}/im/v1/images/upload", FEISHU_API_BASE)) .post(format!("{}/im/v1/images", FEISHU_API_BASE))
.header("Authorization", format!("Bearer {}", token)) .header("Authorization", format!("Bearer {}", token))
.multipart(form) .multipart(form)
.send() .send()
@ -543,10 +543,18 @@ impl FeishuChannel {
image_key: String, image_key: String,
} }
let result: UploadResp = resp let status = resp.status();
.json() let body = resp.text().await.map_err(|e| {
.await ChannelError::Other(format!("Read upload image response error: {}", e))
.map_err(|e| ChannelError::Other(format!("Parse upload response error: {}", e)))?; })?;
let result: UploadResp = serde_json::from_str(&body).map_err(|e| {
ChannelError::Other(format!(
"Parse upload image response error: {} (status={}, body={})",
e,
status,
truncate_with_ellipsis(&body, 500)
))
})?;
if result.code != 0 { if result.code != 0 {
return Err(ChannelError::Other(format!( return Err(ChannelError::Other(format!(
@ -578,10 +586,10 @@ impl FeishuChannel {
.to_lowercase(); .to_lowercase();
let file_type = match extension.as_str() { let file_type = match extension.as_str() {
"mp3" | "m4a" | "wav" | "ogg" => "audio", "mp3" | "m4a" | "wav" | "ogg" | "opus" => "opus",
"mp4" | "mov" | "avi" | "mkv" => "video", "mp4" | "mov" | "avi" | "mkv" => "video",
"pdf" | "doc" | "docx" | "xls" | "xlsx" => "doc", "pdf" | "doc" | "docx" | "xls" | "xlsx" => "doc",
_ => "file", _ => "stream",
}; };
let file_data = tokio::fs::read(file_path) let file_data = tokio::fs::read(file_path)
@ -618,10 +626,18 @@ impl FeishuChannel {
file_key: String, file_key: String,
} }
let result: UploadResp = resp let status = resp.status();
.json() let body = resp.text().await.map_err(|e| {
.await ChannelError::Other(format!("Read upload file response error: {}", e))
.map_err(|e| ChannelError::Other(format!("Parse upload response error: {}", e)))?; })?;
let result: UploadResp = serde_json::from_str(&body).map_err(|e| {
ChannelError::Other(format!(
"Parse upload file response error: {} (status={}, body={})",
e,
status,
truncate_with_ellipsis(&body, 500)
))
})?;
if result.code != 0 { if result.code != 0 {
return Err(ChannelError::Other(format!( return Err(ChannelError::Other(format!(
@ -2348,13 +2364,17 @@ impl Channel for FeishuChannel {
"open_id" "open_id"
}; };
let remove_reaction = async {
self.remove_reaction_from_metadata(&msg.metadata).await;
};
// If no media, use smart format detection // If no media, use smart format detection
if msg.media.is_empty() { if msg.media.is_empty() {
let content = msg.content.trim(); let content = msg.content.trim();
// Empty content // Empty content
if content.is_empty() { if content.is_empty() {
self.remove_reaction_from_metadata(&msg.metadata).await; remove_reaction.await;
return Ok(()); return Ok(());
} }
@ -2366,7 +2386,7 @@ impl Channel for FeishuChannel {
let result = self let result = self
.send_message_to_feishu(receive_id, receive_id_type, "text", content) .send_message_to_feishu(receive_id, receive_id_type, "text", content)
.await; .await;
self.remove_reaction_from_metadata(&msg.metadata).await; remove_reaction.await;
return result; return result;
} }
MsgFormat::Post => { MsgFormat::Post => {
@ -2375,7 +2395,7 @@ impl Channel for FeishuChannel {
let result = self let result = self
.send_message_to_feishu(receive_id, receive_id_type, "post", &post_body) .send_message_to_feishu(receive_id, receive_id_type, "post", &post_body)
.await; .await;
self.remove_reaction_from_metadata(&msg.metadata).await; remove_reaction.await;
return result; return result;
} }
MsgFormat::Interactive => { MsgFormat::Interactive => {
@ -2400,134 +2420,68 @@ impl Channel for FeishuChannel {
content, content,
) )
.await; .await;
self.remove_reaction_from_metadata(&msg.metadata).await; remove_reaction.await;
return result; return result;
} }
} }
self.remove_reaction_from_metadata(&msg.metadata).await; remove_reaction.await;
return Ok(()); return Ok(());
} }
} }
} }
// Handle multimodal message - send with media if !msg.content.trim().is_empty() {
let token = self.get_tenant_access_token().await?; self.send_message_to_feishu(receive_id, receive_id_type, "text", msg.content.trim())
.await?;
// Build content with media references
let mut content_parts = Vec::new();
// Add text content if present (truncate if too long for Feishu)
if !msg.content.is_empty() {
const MAX_TEXT_LENGTH: usize = 60_000;
let truncated_text = if msg.content.len() > MAX_TEXT_LENGTH {
format!(
"{}...\n\n[Content truncated due to length limit]",
&msg.content[..MAX_TEXT_LENGTH]
)
} else {
msg.content.clone()
};
content_parts.push(serde_json::json!({
"tag": "text",
"text": truncated_text
}));
} }
// Upload and add media let mut sent_media = 0usize;
for media_item in &msg.media { for media_item in &msg.media {
let path = &media_item.path; let path = &media_item.path;
match media_item.media_type.as_str() { let result = match media_item.media_type.as_str() {
"image" => match self.upload_image(path).await { "image" => {
Ok(image_key) => { let image_key = self.upload_image(path).await?;
content_parts.push(serde_json::json!({ self.send_message_to_feishu(
"tag": "image", receive_id,
"image_key": image_key receive_id_type,
})); "image",
} &serde_json::json!({ "image_key": image_key }).to_string(),
Err(e) => { )
tracing::warn!(error = %e, path = %path, "Failed to upload image"); .await
} }
}, "audio" | "file" | "video" => {
"audio" | "file" | "video" => match self.upload_file(path).await { let file_key = self.upload_file(path).await?;
Ok(file_key) => { self.send_message_to_feishu(
content_parts.push(serde_json::json!({ receive_id,
"tag": "file", receive_id_type,
"file_key": file_key "file",
})); &serde_json::json!({ "file_key": file_key }).to_string(),
} )
Err(e) => { .await
tracing::warn!(error = %e, path = %path, "Failed to upload file"); }
}
},
_ => { _ => {
tracing::warn!(media_type = %media_item.media_type, "Unsupported media type for sending"); tracing::warn!(media_type = %media_item.media_type, "Unsupported media type for sending");
continue;
}
};
match result {
Ok(()) => sent_media += 1,
Err(error) => {
tracing::warn!(error = %error, path = %path, media_type = %media_item.media_type, "Failed to send media message to Feishu");
return Err(error);
} }
} }
} }
// If no content parts after processing, just send empty text if msg.content.trim().is_empty() && sent_media == 0 {
if content_parts.is_empty() { remove_reaction.await;
let result = self return Err(ChannelError::Other(
.send_message_to_feishu(receive_id, receive_id_type, "text", "") "No supported media items were sent to Feishu".to_string(),
.await; ));
// Remove pending reaction after sending (using metadata propagated from inbound)
self.remove_reaction_from_metadata(&msg.metadata).await;
return result;
} }
// Determine message type remove_reaction.await;
let has_image = msg.media.iter().any(|m| m.media_type == "image");
let msg_type = if has_image && msg.content.is_empty() {
"image"
} else {
"post"
};
let content = serde_json::json!({
"content": content_parts
})
.to_string();
let resp = self
.http_client
.post(format!(
"{}/im/v1/messages?receive_id_type={}",
FEISHU_API_BASE, receive_id_type
))
.header("Content-Type", "application/json")
.header("Authorization", format!("Bearer {}", token))
.json(&serde_json::json!({
"receive_id": receive_id,
"msg_type": msg_type,
"content": content
}))
.send()
.await
.map_err(|e| {
ChannelError::ConnectionError(format!("Send multimodal message HTTP error: {}", e))
})?;
#[derive(Deserialize)]
struct SendResp {
code: i32,
msg: String,
}
let send_resp: SendResp = resp
.json()
.await
.map_err(|e| ChannelError::Other(format!("Parse send response error: {}", e)))?;
if send_resp.code != 0 {
return Err(ChannelError::Other(format!(
"Send multimodal message failed: code={} msg={}",
send_resp.code, send_resp.msg
)));
}
// Remove pending reaction after successfully sending
self.remove_reaction_from_metadata(&msg.metadata).await;
Ok(()) Ok(())
} }
} }

View File

@ -19,6 +19,7 @@ pub mod session;
pub mod session_factory; pub mod session_factory;
pub mod session_history; pub mod session_history;
pub mod session_lifecycle; pub mod session_lifecycle;
pub mod session_message_sender;
pub mod session_message_service; pub mod session_message_service;
pub mod session_pool; pub mod session_pool;
pub mod tool_registry_factory; pub mod tool_registry_factory;
@ -39,7 +40,8 @@ use crate::skills::SkillRuntime;
use agent_task_executor::{AgentTaskExecutor, SchedulerMaintenanceService}; use agent_task_executor::{AgentTaskExecutor, SchedulerMaintenanceService};
use outbound_dispatcher::OutboundDispatcher; use outbound_dispatcher::OutboundDispatcher;
use processor::InboundProcessor; use processor::InboundProcessor;
use runtime::build_session_manager; use runtime::build_session_manager_with_sender;
use session_message_sender::BusSessionMessageSender;
use session::SessionManager; use session::SessionManager;
pub struct GatewayState { pub struct GatewayState {
@ -64,8 +66,10 @@ impl GatewayState {
let show_tool_results = config.gateway.show_tool_results; let show_tool_results = config.gateway.show_tool_results;
let skills = Arc::new(SkillRuntime::from_config(config.skills.clone())); let skills = Arc::new(SkillRuntime::from_config(config.skills.clone()));
let channel_manager = ChannelManager::new();
let bus = channel_manager.bus();
let session_manager = build_session_manager( let session_manager = build_session_manager_with_sender(
session_ttl_hours, session_ttl_hours,
agent_prompt_reinject_every, agent_prompt_reinject_every,
show_tool_results, show_tool_results,
@ -73,9 +77,8 @@ impl GatewayState {
provider_config, provider_config,
provider_configs, provider_configs,
skills, skills,
Arc::new(BusSessionMessageSender::new(bus.clone())),
)?; )?;
let channel_manager = ChannelManager::new();
let bus = channel_manager.bus();
Ok(Self { Ok(Self {
config, config,

View File

@ -8,7 +8,7 @@ use crate::storage::{
ConversationRepository, MemoryRepository, PromptInjectionRepository, SchedulerJobRepository, ConversationRepository, MemoryRepository, PromptInjectionRepository, SchedulerJobRepository,
SessionStore, SkillEventRepository, SessionStore, SkillEventRepository,
}; };
use crate::tools::ToolRegistry; use crate::tools::{NoopSessionMessageSender, SessionMessageSender, ToolRegistry};
use super::agent_factory::AgentFactory; use super::agent_factory::AgentFactory;
use super::cli_session::CliSessionService; use super::cli_session::CliSessionService;
@ -30,6 +30,28 @@ pub(crate) fn build_session_manager(
provider_config: LLMProviderConfig, provider_config: LLMProviderConfig,
provider_configs: HashMap<String, LLMProviderConfig>, provider_configs: HashMap<String, LLMProviderConfig>,
skills: Arc<SkillRuntime>, skills: Arc<SkillRuntime>,
) -> Result<SessionManager, AgentError> {
build_session_manager_with_sender(
session_ttl_hours,
agent_prompt_reinject_every,
show_tool_results,
default_timezone,
provider_config,
provider_configs,
skills,
Arc::new(NoopSessionMessageSender),
)
}
pub(crate) fn build_session_manager_with_sender(
session_ttl_hours: u64,
agent_prompt_reinject_every: u64,
show_tool_results: bool,
default_timezone: String,
provider_config: LLMProviderConfig,
provider_configs: HashMap<String, LLMProviderConfig>,
skills: Arc<SkillRuntime>,
session_message_sender: Arc<dyn SessionMessageSender>,
) -> Result<SessionManager, AgentError> { ) -> Result<SessionManager, AgentError> {
let store = Arc::new( let store = Arc::new(
SessionStore::new() SessionStore::new()
@ -53,6 +75,7 @@ pub(crate) fn build_session_manager(
memories, memories,
scheduler_jobs, scheduler_jobs,
skill_events.clone(), skill_events.clone(),
session_message_sender,
known_agents, known_agents,
default_timezone, default_timezone,
) )

View File

@ -496,6 +496,7 @@ mod tests {
use crate::bus::MessageBus; use crate::bus::MessageBus;
use crate::gateway::tool_registry_factory::ToolRegistryFactory; use crate::gateway::tool_registry_factory::ToolRegistryFactory;
use crate::storage::MemoryRecord; use crate::storage::MemoryRecord;
use crate::tools::NoopSessionMessageSender;
use axum::http::StatusCode; use axum::http::StatusCode;
use axum::{Json, Router, routing::post}; use axum::{Json, Router, routing::post};
use serde_json::{Value, json}; use serde_json::{Value, json};
@ -537,6 +538,7 @@ mod tests {
store.clone(), store.clone(),
store.clone(), store.clone(),
store.clone(), store.clone(),
Arc::new(NoopSessionMessageSender),
HashSet::new(), HashSet::new(),
"Asia/Shanghai".to_string(), "Asia/Shanghai".to_string(),
) )
@ -581,6 +583,7 @@ mod tests {
store.clone(), store.clone(),
store.clone(), store.clone(),
store.clone(), store.clone(),
Arc::new(NoopSessionMessageSender),
HashSet::new(), HashSet::new(),
"Asia/Shanghai".to_string(), "Asia/Shanghai".to_string(),
) )
@ -1483,6 +1486,7 @@ mod tests {
store.clone(), store.clone(),
store.clone(), store.clone(),
store.clone(), store.clone(),
Arc::new(NoopSessionMessageSender),
HashSet::new(), HashSet::new(),
"Asia/Shanghai".to_string(), "Asia/Shanghai".to_string(),
) )
@ -1520,6 +1524,7 @@ mod tests {
store.clone(), store.clone(),
store.clone(), store.clone(),
store.clone(), store.clone(),
Arc::new(NoopSessionMessageSender),
HashSet::new(), HashSet::new(),
"Asia/Shanghai".to_string(), "Asia/Shanghai".to_string(),
) )
@ -1585,6 +1590,7 @@ mod tests {
store.clone(), store.clone(),
store.clone(), store.clone(),
store.clone(), store.clone(),
Arc::new(NoopSessionMessageSender),
HashSet::new(), HashSet::new(),
"Asia/Shanghai".to_string(), "Asia/Shanghai".to_string(),
) )
@ -1632,6 +1638,7 @@ mod tests {
store.clone(), store.clone(),
store.clone(), store.clone(),
store, store,
Arc::new(NoopSessionMessageSender),
HashSet::new(), HashSet::new(),
"Asia/Shanghai".to_string(), "Asia/Shanghai".to_string(),
) )

View File

@ -0,0 +1,123 @@
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use crate::bus::{MessageBus, OutboundMessage};
use crate::tools::{SessionMessageSender, SessionSendOutcome, SessionSendRequest, ToolContext};
pub(crate) struct BusSessionMessageSender {
bus: Arc<MessageBus>,
}
impl BusSessionMessageSender {
pub(crate) fn new(bus: Arc<MessageBus>) -> Self {
Self { bus }
}
}
#[async_trait]
impl SessionMessageSender for BusSessionMessageSender {
async fn send_to_current_session(
&self,
context: &ToolContext,
request: SessionSendRequest,
) -> anyhow::Result<SessionSendOutcome> {
let channel_name = context
.channel_name
.as_deref()
.ok_or_else(|| anyhow::anyhow!("missing channel_name in tool context"))?;
let chat_id = context
.chat_id
.as_deref()
.ok_or_else(|| anyhow::anyhow!("missing chat_id in tool context"))?;
let metadata = HashMap::new();
let mut published_messages = 0;
let text_sent = request
.text
.as_deref()
.map(str::trim)
.filter(|text| !text.is_empty())
.is_some();
if let Some(text) = request.text.filter(|value| !value.trim().is_empty()) {
self.bus
.publish_outbound(OutboundMessage::assistant(
channel_name.to_string(),
chat_id.to_string(),
text,
None,
metadata.clone(),
))
.await?;
published_messages += 1;
}
let attachment_count = request.attachments.len();
for attachment in request.attachments {
let mut outbound = OutboundMessage::assistant(
channel_name.to_string(),
chat_id.to_string(),
String::new(),
None,
metadata.clone(),
);
outbound.media = vec![attachment];
self.bus.publish_outbound(outbound).await?;
published_messages += 1;
}
Ok(SessionSendOutcome {
published_messages,
text_sent,
attachment_count,
})
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::bus::MediaItem;
#[tokio::test]
async fn bus_sender_publishes_text_then_attachment() {
let bus = MessageBus::new(8);
let sender = BusSessionMessageSender::new(bus.clone());
let context = ToolContext {
channel_name: Some("feishu".to_string()),
chat_id: Some("chat-1".to_string()),
..ToolContext::default()
};
let outcome = sender
.send_to_current_session(
&context,
SessionSendRequest {
text: Some("hello".to_string()),
attachments: vec![MediaItem::new("/tmp/demo.png", "image")],
},
)
.await
.unwrap();
assert_eq!(
outcome,
SessionSendOutcome {
published_messages: 2,
text_sent: true,
attachment_count: 1,
}
);
let first = bus.consume_outbound().await;
assert_eq!(first.content, "hello");
assert!(first.media.is_empty());
let second = bus.consume_outbound().await;
assert_eq!(second.content, "");
assert_eq!(second.media.len(), 1);
assert_eq!(second.media[0].media_type, "image");
}
}

View File

@ -5,8 +5,9 @@ use crate::skills::SkillRuntime;
use crate::storage::{MemoryRepository, SchedulerJobRepository, SkillEventRepository}; use crate::storage::{MemoryRepository, SchedulerJobRepository, SkillEventRepository};
use crate::tools::{ use crate::tools::{
BashTool, CalculatorTool, FileEditTool, FileReadTool, FileWriteTool, HttpRequestTool, BashTool, CalculatorTool, FileEditTool, FileReadTool, FileWriteTool, HttpRequestTool,
MemoryManageTool, MemorySearchTool, SchedulerManageTool, SkillActivateTool, SkillListTool, MemoryManageTool, MemorySearchTool, SchedulerManageTool, SessionMessageSender,
SkillManageTool, TimeTool, ToolRegistry, WebFetchTool, SessionSendTool, SkillActivateTool, SkillListTool, SkillManageTool, TimeTool, ToolRegistry,
WebFetchTool,
}; };
pub(crate) struct ToolRegistryFactory { pub(crate) struct ToolRegistryFactory {
@ -14,6 +15,7 @@ pub(crate) struct ToolRegistryFactory {
memories: Arc<dyn MemoryRepository>, memories: Arc<dyn MemoryRepository>,
scheduler_jobs: Arc<dyn SchedulerJobRepository>, scheduler_jobs: Arc<dyn SchedulerJobRepository>,
skill_events: Arc<dyn SkillEventRepository>, skill_events: Arc<dyn SkillEventRepository>,
session_message_sender: Arc<dyn SessionMessageSender>,
known_agents: HashSet<String>, known_agents: HashSet<String>,
default_timezone: String, default_timezone: String,
} }
@ -24,6 +26,7 @@ impl ToolRegistryFactory {
memories: Arc<dyn MemoryRepository>, memories: Arc<dyn MemoryRepository>,
scheduler_jobs: Arc<dyn SchedulerJobRepository>, scheduler_jobs: Arc<dyn SchedulerJobRepository>,
skill_events: Arc<dyn SkillEventRepository>, skill_events: Arc<dyn SkillEventRepository>,
session_message_sender: Arc<dyn SessionMessageSender>,
known_agents: HashSet<String>, known_agents: HashSet<String>,
default_timezone: String, default_timezone: String,
) -> Self { ) -> Self {
@ -32,6 +35,7 @@ impl ToolRegistryFactory {
memories, memories,
scheduler_jobs, scheduler_jobs,
skill_events, skill_events,
session_message_sender,
known_agents, known_agents,
default_timezone, default_timezone,
} }
@ -46,6 +50,7 @@ impl ToolRegistryFactory {
registry.register(FileEditTool::new()); registry.register(FileEditTool::new());
registry.register(MemorySearchTool::new(self.memories.clone())); registry.register(MemorySearchTool::new(self.memories.clone()));
registry.register(MemoryManageTool::new(self.memories.clone())); registry.register(MemoryManageTool::new(self.memories.clone()));
registry.register(SessionSendTool::new(self.session_message_sender.clone()));
registry.register(SchedulerManageTool::new( registry.register(SchedulerManageTool::new(
self.scheduler_jobs.clone(), self.scheduler_jobs.clone(),
self.known_agents.clone(), self.known_agents.clone(),

View File

@ -8,6 +8,7 @@ pub mod memory_manage;
pub mod memory_search; pub mod memory_search;
pub mod registry; pub mod registry;
pub mod scheduler_manage; pub mod scheduler_manage;
pub mod session_send;
pub mod schema; pub mod schema;
pub mod skill_activate; pub mod skill_activate;
pub mod skill_manage; pub mod skill_manage;
@ -25,6 +26,10 @@ pub use memory_manage::MemoryManageTool;
pub use memory_search::MemorySearchTool; pub use memory_search::MemorySearchTool;
pub use registry::ToolRegistry; pub use registry::ToolRegistry;
pub use scheduler_manage::SchedulerManageTool; pub use scheduler_manage::SchedulerManageTool;
pub use session_send::{
NoopSessionMessageSender, SessionMessageSender, SessionSendOutcome, SessionSendRequest,
SessionSendTool,
};
pub use schema::{CleaningStrategy, SchemaCleanr}; pub use schema::{CleaningStrategy, SchemaCleanr};
pub use skill_activate::SkillActivateTool; pub use skill_activate::SkillActivateTool;
pub use skill_manage::{SkillListTool, SkillManageTool}; pub use skill_manage::{SkillListTool, SkillManageTool};

320
src/tools/session_send.rs Normal file
View File

@ -0,0 +1,320 @@
use std::path::Path;
use std::sync::Arc;
use anyhow::anyhow;
use async_trait::async_trait;
use serde_json::json;
use crate::bus::MediaItem;
use super::traits::{Tool, ToolContext, ToolResult};
#[derive(Debug, Clone)]
pub struct SessionSendRequest {
pub text: Option<String>,
pub attachments: Vec<MediaItem>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SessionSendOutcome {
pub published_messages: usize,
pub text_sent: bool,
pub attachment_count: usize,
}
#[async_trait]
pub trait SessionMessageSender: Send + Sync + 'static {
async fn send_to_current_session(
&self,
context: &ToolContext,
request: SessionSendRequest,
) -> anyhow::Result<SessionSendOutcome>;
}
pub struct NoopSessionMessageSender;
#[async_trait]
impl SessionMessageSender for NoopSessionMessageSender {
async fn send_to_current_session(
&self,
_context: &ToolContext,
_request: SessionSendRequest,
) -> anyhow::Result<SessionSendOutcome> {
Err(anyhow!(
"session send tool is not configured with an outbound sender"
))
}
}
pub struct SessionSendTool {
sender: Arc<dyn SessionMessageSender>,
}
impl SessionSendTool {
pub fn new(sender: Arc<dyn SessionMessageSender>) -> Self {
Self { sender }
}
}
#[async_trait]
impl Tool for SessionSendTool {
fn name(&self) -> &str {
"send_session_message"
}
fn description(&self) -> &str {
"Send a message to the current conversation through the normal channel reply path. You can send a text-only message, one or more local file attachments, or a text message followed by attachments. Use this when you need to proactively deliver content back to the current user."
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"text": {
"type": "string",
"description": "Optional text to send to the current conversation. This tool can send a text-only message even when no attachments are provided."
},
"attachments": {
"type": "array",
"description": "Optional list of local file paths to send to the current conversation.",
"items": {
"type": "string",
"description": "Absolute or workspace-relative local file path"
}
}
},
"anyOf": [
{ "required": ["text"] },
{ "required": ["attachments"] }
]
})
}
async fn execute(&self, _args: serde_json::Value) -> anyhow::Result<ToolResult> {
Ok(error_result(
"send_session_message requires tool context for the current conversation",
))
}
async fn execute_with_context(
&self,
context: &ToolContext,
args: serde_json::Value,
) -> anyhow::Result<ToolResult> {
if let Err(err) = validate_context(context) {
return Ok(error_result(&err.to_string()));
}
let text = args
.get("text")
.and_then(|value| value.as_str())
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToOwned::to_owned);
let attachments = match args.get("attachments") {
Some(value) => match parse_attachments(value) {
Ok(attachments) => attachments,
Err(err) => return Ok(error_result(&err.to_string())),
},
None => Vec::new(),
};
if text.is_none() && attachments.is_empty() {
return Ok(error_result(
"send_session_message requires non-empty text, attachments, or both",
));
}
let outcome = match self
.sender
.send_to_current_session(
context,
SessionSendRequest {
text,
attachments,
},
)
.await
{
Ok(outcome) => outcome,
Err(err) => return Ok(error_result(&err.to_string())),
};
Ok(ToolResult {
success: true,
output: format_success(outcome),
error: None,
})
}
}
fn validate_context(context: &ToolContext) -> anyhow::Result<()> {
if context.channel_name.as_deref().unwrap_or_default().is_empty() {
return Err(anyhow!(
"send_session_message requires channel_name in tool context"
));
}
if context.chat_id.as_deref().unwrap_or_default().is_empty() {
return Err(anyhow!(
"send_session_message requires chat_id in tool context"
));
}
Ok(())
}
fn parse_attachments(value: &serde_json::Value) -> anyhow::Result<Vec<MediaItem>> {
let attachment_paths = value
.as_array()
.ok_or_else(|| anyhow!("attachments must be an array of local file paths"))?;
let mut attachments = Vec::with_capacity(attachment_paths.len());
for path_value in attachment_paths {
let raw_path = path_value
.as_str()
.ok_or_else(|| anyhow!("attachments entries must be strings"))?
.trim();
if raw_path.is_empty() {
return Err(anyhow!("attachment paths must not be empty"));
}
let metadata = std::fs::metadata(raw_path)
.map_err(|err| anyhow!("failed to access attachment '{}': {}", raw_path, err))?;
if !metadata.is_file() {
return Err(anyhow!("attachment path is not a file: {}", raw_path));
}
if metadata.len() == 0 {
return Err(anyhow!("attachment file is empty: {}", raw_path));
}
let media_type = infer_media_type(raw_path);
let mut item = MediaItem::new(raw_path.to_string(), media_type);
item.mime_type = mime_guess::from_path(raw_path)
.first_raw()
.map(ToOwned::to_owned);
attachments.push(item);
}
Ok(attachments)
}
fn infer_media_type(path: &str) -> &'static str {
let mime = mime_guess::from_path(path).first_or_octet_stream();
if mime.essence_str().starts_with("image/") {
return "image";
}
match Path::new(path)
.extension()
.and_then(|ext| ext.to_str())
.map(|ext| ext.to_ascii_lowercase())
.as_deref()
{
Some("mp3") | Some("wav") | Some("ogg") | Some("m4a") | Some("opus") => "audio",
Some("mp4") | Some("mov") | Some("avi") | Some("mkv") => "video",
_ => "file",
}
}
fn format_success(outcome: SessionSendOutcome) -> String {
match (outcome.text_sent, outcome.attachment_count) {
(true, 0) => "Sent 1 text message to the current conversation.".to_string(),
(false, count) => format!("Sent {} attachment(s) to the current conversation.", count),
(true, count) => format!(
"Sent 1 text message and {} attachment(s) to the current conversation.",
count
),
}
}
fn error_result(message: &str) -> ToolResult {
ToolResult {
success: false,
output: String::new(),
error: Some(message.to_string()),
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::NamedTempFile;
struct MockSender {
outcome: SessionSendOutcome,
}
#[async_trait]
impl SessionMessageSender for MockSender {
async fn send_to_current_session(
&self,
_context: &ToolContext,
_request: SessionSendRequest,
) -> anyhow::Result<SessionSendOutcome> {
Ok(self.outcome)
}
}
fn context() -> ToolContext {
ToolContext {
channel_name: Some("feishu".to_string()),
chat_id: Some("chat-1".to_string()),
..ToolContext::default()
}
}
#[tokio::test]
async fn send_session_message_supports_text_only() {
let tool = SessionSendTool::new(Arc::new(MockSender {
outcome: SessionSendOutcome {
published_messages: 1,
text_sent: true,
attachment_count: 0,
},
}));
let result = tool
.execute_with_context(&context(), json!({ "text": "hello" }))
.await
.unwrap();
assert!(result.success);
assert_eq!(result.output, "Sent 1 text message to the current conversation.");
}
#[tokio::test]
async fn send_session_message_rejects_empty_request() {
let tool = SessionSendTool::new(Arc::new(MockSender {
outcome: SessionSendOutcome {
published_messages: 0,
text_sent: false,
attachment_count: 0,
},
}));
let result = tool
.execute_with_context(&context(), json!({}))
.await
.unwrap();
assert!(!result.success);
assert_eq!(
result.error.as_deref(),
Some("send_session_message requires non-empty text, attachments, or both")
);
}
#[test]
fn parse_attachments_infers_image_media_type() {
let file = NamedTempFile::new().unwrap();
std::fs::write(file.path(), b"demo").unwrap();
let image_path = file.path().with_extension("png");
std::fs::rename(file.path(), &image_path).unwrap();
let attachments = parse_attachments(&json!([image_path.to_string_lossy().to_string()]))
.unwrap();
assert_eq!(attachments.len(), 1);
assert_eq!(attachments[0].media_type, "image");
}
}