diff --git a/src/gateway/execution.rs b/src/gateway/execution.rs index 7597b75..408e636 100644 --- a/src/gateway/execution.rs +++ b/src/gateway/execution.rs @@ -1,16 +1,26 @@ use std::collections::HashMap; use std::sync::Arc; -use crate::agent::{AgentError, AgentProcessResult, EmittedMessageHandler, SystemPromptContext}; +use async_trait::async_trait; +use crate::agent::{AgentError, AgentProcessResult, EmittedMessageHandler, PersistingEmittedMessageHandler, SystemPromptContext}; use crate::bus::message::ToolMessageState; use crate::bus::{ChatMessage, MediaItem, OutboundMessage, SYSTEM_CONTEXT_SCHEDULED_PROMPT}; use crate::config::LLMProviderConfig; +use crate::storage::ConversationRepository; use tokio::sync::Mutex; use super::compaction::schedule_background_history_compaction; use super::message_prepare::enrich_user_content_with_media_refs; use super::session::Session; +/// 空的 EmittedMessageHandler,不转发消息,仅配合 PersistingEmittedMessageHandler 做持久化。 +struct NoOpEmittedMessageHandler; + +#[async_trait] +impl EmittedMessageHandler for NoOpEmittedMessageHandler { + async fn handle(&self, _message: ChatMessage) {} +} + const SCHEDULED_TASK_EXECUTION_SYSTEM_PROMPT: &str = "系统说明:当前输入来自一次已经触发的定时任务执行。你现在需要执行任务内容本身,而不是创建、修改、恢复、暂停或查询新的定时任务。除非当前任务内容明确要求管理调度器,否则不要调用任何定时任务管理工具;像“每小时”、“每天”、“cron”、“定时”等词,只应视为任务背景,不应再解释为新的建任务请求。"; pub(crate) fn compose_scheduled_task_system_prompt(system_prompt: Option<&str>) -> String { @@ -269,7 +279,7 @@ impl AgentExecutionService { &self, request: ScheduledExecutionRequest<'_>, ) -> Result, AgentError> { - let (history, agent, user_message, user_message_count, original_topic_id) = { + let (history, mut agent, user_message, user_message_count, original_topic_id, store, session_id) = { let mut session_guard = request.session.lock().await; session_guard.ensure_persistent_session(request.chat_id)?; @@ -311,9 +321,27 @@ impl AgentExecutionService { request.provider_config.clone(), )?; - (history, agent, user_message, user_message_count, original_topic_id) + // 获取 store 和 session_id,用于构造消息持久化 handler + let store = session_guard.store(); + let session_id = crate::storage::persistent_session_id( + request.channel_name, + request.chat_id, + ); + + (history, agent, user_message, user_message_count, original_topic_id, store, session_id) }; + // 定时任务没有 live_emitter,需要 PersistingEmittedMessageHandler 来持久化消息 + { + let persisting_handler = PersistingEmittedMessageHandler::new( + NoOpEmittedMessageHandler, + store as Arc, + &session_id, + None, + ); + agent = agent.with_emitted_message_handler(Arc::new(persisting_handler)); + } + // 构建系统提示词上下文 let system_prompt_context = SystemPromptContext { session_id: Some(format!("{}:{}", request.channel_name, request.chat_id)), diff --git a/src/gateway/ws.rs b/src/gateway/ws.rs index fcc4465..73234bf 100644 --- a/src/gateway/ws.rs +++ b/src/gateway/ws.rs @@ -491,11 +491,17 @@ async fn handle_inbound( let load_chat_channel = response.metadata.get("load_chat_channel") .cloned() .unwrap_or_default(); - if let Err(e) = send_task_messages(&store, load_chat_id, sender).await { + // session_id = "{channel}:{chat_id}" (cli channel 例外) + let session_id = crate::storage::persistent_session_id( + &load_chat_channel, + load_chat_id, + ); + if let Err(e) = send_task_messages(&store, &session_id, sender).await { tracing::warn!( error = %e, channel = %load_chat_channel, chat_id = %load_chat_id, + session_id = %session_id, "Failed to send chat messages" ); }