feat: 添加持久化消息处理程序,增强定时任务消息的持久化能力

This commit is contained in:
oudecheng 2026-06-02 18:42:50 +08:00
parent 025c355c7d
commit ea6fabe41d
2 changed files with 38 additions and 4 deletions

View File

@ -1,16 +1,26 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use crate::agent::{AgentError, AgentProcessResult, EmittedMessageHandler, SystemPromptContext}; use async_trait::async_trait;
use crate::agent::{AgentError, AgentProcessResult, EmittedMessageHandler, PersistingEmittedMessageHandler, SystemPromptContext};
use crate::bus::message::ToolMessageState; use crate::bus::message::ToolMessageState;
use crate::bus::{ChatMessage, MediaItem, OutboundMessage, SYSTEM_CONTEXT_SCHEDULED_PROMPT}; use crate::bus::{ChatMessage, MediaItem, OutboundMessage, SYSTEM_CONTEXT_SCHEDULED_PROMPT};
use crate::config::LLMProviderConfig; use crate::config::LLMProviderConfig;
use crate::storage::ConversationRepository;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use super::compaction::schedule_background_history_compaction; use super::compaction::schedule_background_history_compaction;
use super::message_prepare::enrich_user_content_with_media_refs; use super::message_prepare::enrich_user_content_with_media_refs;
use super::session::Session; use super::session::Session;
/// 空的 EmittedMessageHandler不转发消息仅配合 PersistingEmittedMessageHandler 做持久化。
struct NoOpEmittedMessageHandler;
#[async_trait]
impl EmittedMessageHandler for NoOpEmittedMessageHandler {
async fn handle(&self, _message: ChatMessage) {}
}
const SCHEDULED_TASK_EXECUTION_SYSTEM_PROMPT: &str = "系统说明当前输入来自一次已经触发的定时任务执行。你现在需要执行任务内容本身而不是创建、修改、恢复、暂停或查询新的定时任务。除非当前任务内容明确要求管理调度器否则不要调用任何定时任务管理工具像“每小时”、“每天”、“cron”、“定时”等词只应视为任务背景不应再解释为新的建任务请求。"; const SCHEDULED_TASK_EXECUTION_SYSTEM_PROMPT: &str = "系统说明当前输入来自一次已经触发的定时任务执行。你现在需要执行任务内容本身而不是创建、修改、恢复、暂停或查询新的定时任务。除非当前任务内容明确要求管理调度器否则不要调用任何定时任务管理工具像“每小时”、“每天”、“cron”、“定时”等词只应视为任务背景不应再解释为新的建任务请求。";
pub(crate) fn compose_scheduled_task_system_prompt(system_prompt: Option<&str>) -> String { pub(crate) fn compose_scheduled_task_system_prompt(system_prompt: Option<&str>) -> String {
@ -269,7 +279,7 @@ impl AgentExecutionService {
&self, &self,
request: ScheduledExecutionRequest<'_>, request: ScheduledExecutionRequest<'_>,
) -> Result<Vec<OutboundMessage>, AgentError> { ) -> Result<Vec<OutboundMessage>, AgentError> {
let (history, agent, user_message, user_message_count, original_topic_id) = { let (history, mut agent, user_message, user_message_count, original_topic_id, store, session_id) = {
let mut session_guard = request.session.lock().await; let mut session_guard = request.session.lock().await;
session_guard.ensure_persistent_session(request.chat_id)?; session_guard.ensure_persistent_session(request.chat_id)?;
@ -311,9 +321,27 @@ impl AgentExecutionService {
request.provider_config.clone(), request.provider_config.clone(),
)?; )?;
(history, agent, user_message, user_message_count, original_topic_id) // 获取 store 和 session_id用于构造消息持久化 handler
let store = session_guard.store();
let session_id = crate::storage::persistent_session_id(
request.channel_name,
request.chat_id,
);
(history, agent, user_message, user_message_count, original_topic_id, store, session_id)
}; };
// 定时任务没有 live_emitter需要 PersistingEmittedMessageHandler 来持久化消息
{
let persisting_handler = PersistingEmittedMessageHandler::new(
NoOpEmittedMessageHandler,
store as Arc<dyn ConversationRepository>,
&session_id,
None,
);
agent = agent.with_emitted_message_handler(Arc::new(persisting_handler));
}
// 构建系统提示词上下文 // 构建系统提示词上下文
let system_prompt_context = SystemPromptContext { let system_prompt_context = SystemPromptContext {
session_id: Some(format!("{}:{}", request.channel_name, request.chat_id)), session_id: Some(format!("{}:{}", request.channel_name, request.chat_id)),

View File

@ -491,11 +491,17 @@ async fn handle_inbound(
let load_chat_channel = response.metadata.get("load_chat_channel") let load_chat_channel = response.metadata.get("load_chat_channel")
.cloned() .cloned()
.unwrap_or_default(); .unwrap_or_default();
if let Err(e) = send_task_messages(&store, load_chat_id, sender).await { // session_id = "{channel}:{chat_id}" (cli channel 例外)
let session_id = crate::storage::persistent_session_id(
&load_chat_channel,
load_chat_id,
);
if let Err(e) = send_task_messages(&store, &session_id, sender).await {
tracing::warn!( tracing::warn!(
error = %e, error = %e,
channel = %load_chat_channel, channel = %load_chat_channel,
chat_id = %load_chat_id, chat_id = %load_chat_id,
session_id = %session_id,
"Failed to send chat messages" "Failed to send chat messages"
); );
} }