From 0757638c6f363c8c27568cdd4358a6a4c12d1ebb Mon Sep 17 00:00:00 2001 From: xiaoski Date: Tue, 5 May 2026 00:13:07 +0800 Subject: [PATCH] feat: add SessionManager::handle_cron_message for scheduled task execution --- src/session/session.rs | 102 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/src/session/session.rs b/src/session/session.rs index f724c2a..ebd7330 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -1259,6 +1259,108 @@ impl SessionManager { Ok(HandleResult::AgentResponse(response)) } + /// Handle a message triggered by a scheduled cron job. + /// + /// This is similar to `handle_message`, but the user message is created with + /// `SourceKind::ExternalTrigger` source metadata so that the cron job identity + /// is preserved in the conversation history and database. + pub async fn handle_cron_message( + &self, + channel: &str, + chat_id: &str, + prompt: &str, + job_id: &str, + job_name: &str, + ) -> Result { + use crate::bus::{MessageSource, SourceKind}; + + let unified_id = self.resolve_dialog_id(channel, chat_id).await?; + *self.current_source_session.lock().await = Some(unified_id.to_string()); + tracing::debug!(unified_id = %unified_id, job_id = %job_id, "handle_cron_message resolved"); + + let session = self.get_or_create_session(&unified_id).await?; + + let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel(); + + { + use std::collections::HashMap; + use crate::bus::OutboundMessage; + let bus = self.bus.clone(); + let ch = channel.to_string(); + let cid = chat_id.to_string(); + tokio::spawn(async move { + while let Some(notif) = notify_rx.recv().await { + let mut metadata = HashMap::new(); + metadata.insert("_type".to_string(), "notification".to_string()); + let outbound = OutboundMessage { + channel: ch.clone(), + chat_id: cid.clone(), + content: notif, + reply_to: None, + media: vec![], + metadata, + }; + let _ = bus.publish_outbound(outbound).await; + } + }); + } + + let response: String = { + let mut session_guard = session.lock().await; + + let source = MessageSource { + kind: SourceKind::ExternalTrigger, + from_channel: Some(channel.to_string()), + from_session: None, + from_user_id: None, + system_name: Some(job_name.to_string()), + task_id: Some(job_id.to_string()), + }; + let user_message = session_guard.create_user_message_with_source(prompt, vec![], source); + session_guard.add_message(user_message, true).await + .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; + + let mut history = session_guard.get_history().to_vec(); + + let skills_prompt = self.skills_loader.build_skills_prompt(); + let system_prompt = session_guard.build_system_prompt(&skills_prompt); + history.insert(0, ChatMessage::system(system_prompt)); + + let history = session_guard.compressor + .compress_if_needed(history) + .await?; + + let agent = session_guard.create_agent_with_notify(notify_tx)?; + let result = agent.process(history).await?; + + for msg in result.emitted_messages { + session_guard.add_message(msg, true).await + .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; + } + + if session_guard.should_generate_title() { + if let Err(e) = session_guard.generate_title().await { + tracing::warn!("failed to generate title: {}", e); + } + } + + result.final_response.content + }; + + #[cfg(debug_assertions)] + tracing::debug!( + channel = %channel, + chat_id = %chat_id, + job_id = %job_id, + response_len = %response.len(), + "Cron agent response received" + ); + + *self.current_source_session.lock().await = None; + + Ok(HandleResult::AgentResponse(response)) + } + pub async fn clear_session_history(&self, unified_id: &UnifiedSessionId) -> Result<(), AgentError> { let session = self.get_or_create_session(unified_id).await?; let mut session_guard = session.lock().await;