diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index d4ce0ad..a3ff44e 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -228,7 +228,6 @@ impl GatewayState { let sched = Arc::new(Scheduler::new( self.storage.clone(), self.session_manager.clone(), - self.channel_manager.bus(), scheduler_config, )); tokio::spawn(async move { diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index f189641..53df8dd 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -4,11 +4,12 @@ use std::sync::Arc; use std::time::Instant; use tokio::time; -use crate::bus::MessageBus; use crate::config::SchedulerConfig; use crate::session::session::HandleResult; use crate::session::SessionManager; -use crate::storage::{JobRun, ScheduledJob, Storage}; +use crate::storage::ScheduledJob; +use crate::storage::Storage; +use crate::storage::JobRun; pub use types::Schedule; @@ -53,7 +54,6 @@ fn now_ms() -> i64 { pub struct Scheduler { storage: Arc, session_manager: Arc, - bus: Arc, config: SchedulerConfig, } @@ -61,13 +61,11 @@ impl Scheduler { pub fn new( storage: Arc, session_manager: Arc, - bus: Arc, config: SchedulerConfig, ) -> Self { Self { storage, session_manager, - bus, config, } } @@ -136,18 +134,6 @@ impl Scheduler { match result { Ok(HandleResult::AgentResponse(output)) => { - let outbound = crate::bus::OutboundMessage { - channel: job.channel.clone(), - chat_id: job.chat_id.clone(), - content: output.clone(), - reply_to: None, - media: vec![], - metadata: std::collections::HashMap::new(), - }; - if let Err(e) = self.bus.publish_outbound(outbound).await { - tracing::warn!(job_id = %job.id, "scheduler: failed to publish outbound: {}", e); - } - let output_truncated = if output.len() > 8000 { format!("{}...[truncated]", &output[..output.ceil_char_boundary(8000)]) } else { @@ -180,18 +166,6 @@ impl Scheduler { ); } Ok(HandleResult::CommandOutput(output)) => { - let outbound = crate::bus::OutboundMessage { - channel: job.channel.clone(), - chat_id: job.chat_id.clone(), - content: output.clone(), - reply_to: None, - media: vec![], - metadata: std::collections::HashMap::new(), - }; - if let Err(e) = self.bus.publish_outbound(outbound).await { - tracing::warn!(job_id = %job.id, "scheduler: failed to publish outbound: {}", e); - } - let run = JobRun { id: 0, job_id: job.id.clone(), diff --git a/src/session/session.rs b/src/session/session.rs index a7f0c02..b144cd1 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -839,6 +839,19 @@ impl SessionManager { self.tools.clone() } + /// 为定时任务创建一个无 session 绑定的 AgentLoop + pub fn create_cron_agent(&self) -> Result { + let provider = create_provider(self.provider_config.clone()) + .map_err(|e| AgentError::Other(format!("failed to create cron provider: {}", e)))?; + Ok(AgentLoop::with_provider_and_tools( + Arc::from(provider), + self.tools.clone(), + self.provider_config.max_tool_iterations, + self.provider_config.model_id.clone(), + self.provider_config.workspace_dir.clone(), + ).with_context_window(self.provider_config.token_limit)) + } + /// 获取所有可用的斜杠命令 pub fn get_slash_commands(&self) -> &[SlashCommand] { SLASH_COMMANDS @@ -1320,7 +1333,8 @@ impl SessionManager { }); } - let response: String = { + // Phase 1: prepare data under session lock + let (agent, history, system_prompt) = { let mut session_guard = session.lock().await; let media_refs: Vec = media.iter().map(|m| m.path.clone()).collect(); @@ -1335,10 +1349,8 @@ impl SessionManager { let history = session_guard.get_history().to_vec(); - // Build skills prompt let skills_prompt = self.skills_loader.build_skills_prompt(); - // Fetch memory context let memory_context = match self.memory_manager.recall(content, 5, Some(crate::memory::MemoryCategory::Knowledge), None).await { Ok(entries) if !entries.is_empty() => { Some(entries.iter() @@ -1353,9 +1365,6 @@ impl SessionManager { _ => None, }; - // Build combined system prompt and inject at position 0 AFTER compression. - // This ensures AgentLoop.process() sees a system message without it participating - // in context compression (system prompt is dynamic and should not be persisted). let system_prompt = session_guard.build_system_prompt(&skills_prompt, memory_context.as_deref()); let result = session_guard.compressor @@ -1371,7 +1380,6 @@ impl SessionManager { history.insert(0, ChatMessage::system(system_prompt.clone())); - // Persist consolidation state let now = chrono::Utc::now().timestamp_millis(); session_guard.last_consolidated_at = Some(now); if let Err(e) = session_guard.persist_session_meta().await { @@ -1379,13 +1387,17 @@ impl SessionManager { } let agent = session_guard.create_agent_with_notify(notify_tx)?; + (agent, history, system_prompt) + }; // session lock released — send_message can now lock freely - // Try LLM call; on context overflow, re-compress with tighter window and retry once. - let result = match agent.process(history).await { - Ok(r) => r, - Err(AgentError::LlmError(ref msg)) - if is_context_overflow_error(msg) => - { + // Phase 2: LLM call (no session lock held) + let result = match agent.process(history).await { + Ok(r) => r, + Err(AgentError::LlmError(ref msg)) + if is_context_overflow_error(msg) => + { + let retry_history = { + let mut session_guard = session.lock().await; let new_window = crate::agent::ContextCompressor::parse_context_limit_from_error(msg) .unwrap_or(session_guard.compressor_threshold()); tracing::warn!( @@ -1403,28 +1415,30 @@ impl SessionManager { } } let mut retry = retry_result.history; - retry.insert(0, ChatMessage::system(system_prompt)); - agent.process(retry).await - .inspect_err(|e| { - tracing::error!(error = %e, "Agent retry after context compression failed"); - })? - } - Err(e) => { - tracing::error!( - error = %e, - elapsed = ?"LLM call in handle_message failed", - "Agent processing error — propagating to caller" - ); - return Err(e); - }, - }; + retry.insert(0, ChatMessage::system(system_prompt.clone())); + retry + }; // lock released again for retry + + agent.process(retry_history).await + .inspect_err(|e| { + tracing::error!(error = %e, "Agent retry after context compression failed"); + })? + } + Err(e) => { + tracing::error!(error = %e, "Agent processing error — propagating to caller"); + return Err(e); + }, + }; + + // Phase 3: persist results under session lock + let response: String = { + let mut session_guard = session.lock().await; for msg in result.emitted_messages { session_guard.add_message(msg, true).await .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; } - // Check if we need to generate a title (after 10 user messages) if session_guard.should_generate_title() && let Err(e) = session_guard.generate_title().await { tracing::warn!("failed to generate title: {}", e); @@ -1447,9 +1461,10 @@ impl SessionManager { /// Handle a message triggered by a scheduled cron job. /// - /// This is similar to `handle_message`, but the user message is created with - /// `SourceKind::ExternalTrigger` source metadata so that the cron job identity - /// is preserved in the conversation history and database. + /// Runs in a stateless manner: no session creation, no history persistence. + /// The cron system prompt instructs the LLM to deliver results via the + /// `send_message` tool, which handles both delivery and history writing + /// on the target session. pub async fn handle_cron_message( &self, channel: &str, @@ -1458,133 +1473,45 @@ impl SessionManager { job_id: &str, job_name: &str, ) -> Result { - use crate::bus::{MessageSource, SourceKind}; + let skills_prompt = self.skills_loader.build_skills_prompt(); - let unified_id = self.resolve_dialog_id(channel, chat_id).await?; - tracing::debug!(unified_id = %unified_id, job_id = %job_id, "handle_cron_message resolved"); + let base_prompt = build_system_prompt( + &self.provider_config.workspace_dir, + &self.provider_config.model_id, + &self.tools, + Some(&format!("cron:{}:{}", job_name, job_id)), + None, + false, + ); + let cron_context = format!( + "## 定时任务执行\n\n\ + 你正在执行定时任务「{job_name}」({job_id})。\n\ + 目标渠道: {channel}:{chat_id}\n\n\ + 规则:\n\ + - 这不是聊天对话,没有用户会直接看到你的输出\n\ + - 你必须使用 send_message 工具将最终结果发送到目标渠道\n\ + - send_message 格式: target_chat_id=\"{channel}:{chat_id}\", content=\"消息内容\"\n\ + - 可以调用其他工具收集信息、处理任务,但最终消息必须通过 send_message 发送\n\ + - 只输出最终消息内容,不要输出中间思考过程或分析!" + ); + let full_system_prompt = format!("{}\n\n{}\n\n{}", base_prompt, skills_prompt, cron_context); - let session = self.get_or_create_session(&unified_id).await?; + let history = vec![ + ChatMessage::system(full_system_prompt), + ChatMessage::user(prompt), + ]; - CURRENT_SOURCE_SESSION.scope(Some(unified_id.to_string()), async { - let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel(); + let agent = self.create_cron_agent()?; + let source_session = format!("cron:{}", job_name); + let result = CURRENT_SOURCE_SESSION.scope(Some(source_session), async { + agent.process(history).await + }) + .await + .inspect_err(|e| { + tracing::error!(error = %e, job_id = %job_id, "Cron agent processing error"); + })?; - { - use std::collections::HashMap; - use crate::bus::OutboundMessage; - let bus = self.bus.clone(); - let ch = channel.to_string(); - let cid = chat_id.to_string(); - tokio::spawn(async move { - while let Some(notif) = notify_rx.recv().await { - let mut metadata = HashMap::new(); - metadata.insert("_type".to_string(), "notification".to_string()); - let outbound = OutboundMessage { - channel: ch.clone(), - chat_id: cid.clone(), - content: notif, - reply_to: None, - media: vec![], - metadata, - }; - let _ = bus.publish_outbound(outbound).await; - } - }); - } - - let response: String = { - let mut session_guard = session.lock().await; - - let source = MessageSource { - kind: SourceKind::ExternalTrigger, - from_channel: Some(channel.to_string()), - from_session: None, - from_user_id: None, - system_name: Some(job_name.to_string()), - task_id: Some(job_id.to_string()), - }; - let user_message = session_guard.create_user_message_with_source(prompt, vec![], source); - session_guard.add_message(user_message, true).await - .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; - - let history = session_guard.get_history().to_vec(); - - let skills_prompt = self.skills_loader.build_skills_prompt(); - let system_prompt = session_guard.build_system_prompt(&skills_prompt, None); - let cron_context = format!( - "\n\n## 定时任务执行\n\n\ - 你正在执行定时任务「{}」({})。\n\ - 目标渠道: {}:{}\n\n\ - 定时任务执行规则:\n\ - - 这不是聊天对话,没有人会回复你,不要等待用户输入\n\ - - 你的职责是根据任务指令直接生成要发送的消息内容\n\ - - 只输出最终消息,不要输出中间思考过程或分析\n\ - - 系统会自动将你的回复推送到目标渠道,不要使用 send_message 工具\n\ - - 你的最终回复就是推送给用户的消息原文", - job_name, job_id, channel, chat_id - ); - let full_system_prompt = format!("{}{}", system_prompt, cron_context); - - // Inject system prompt AFTER compression so it doesn't participate - // in context compression (system prompt is dynamic and should not be persisted). - let mut history = session_guard.compressor - .compress_if_needed(history) - .await - .inspect_err(|e| { - tracing::warn!(error = %e, "Context compression failed in handle_cron_message"); - })? - .history; - - history.insert(0, ChatMessage::system(full_system_prompt)); - - let agent = session_guard.create_agent_with_notify(notify_tx)?; - let result = agent.process(history).await - .inspect_err(|e| { - tracing::error!(error = %e, "Agent processing error in handle_cron_message"); - })?; - - for msg in result.emitted_messages { - session_guard.add_message(msg, true).await - .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; - } - - if session_guard.should_generate_title() - && let Err(e) = session_guard.generate_title().await { - tracing::warn!("failed to generate title: {}", e); - } - - let raw_response = result.final_response.content; - let prefix = format!( - "[message from cron:{}({})]\n", - job_name, job_id - ); - let prefixed_response = format!("{}{}", prefix, raw_response); - - let source = MessageSource { - kind: SourceKind::CrossChannel, - from_channel: Some("cron".to_string()), - from_session: Some(format!("{}:{}", job_name, job_id)), - from_user_id: None, - system_name: Some(job_name.to_string()), - task_id: Some(job_id.to_string()), - }; - let msg = ChatMessage::assistant_with_source(prefixed_response.clone(), source); - session_guard.add_message(msg, true).await - .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; - - prefixed_response - }; - - #[cfg(debug_assertions)] - tracing::debug!( - channel = %channel, - chat_id = %chat_id, - job_id = %job_id, - response_len = %response.len(), - "Cron agent response received" - ); - - Ok(HandleResult::AgentResponse(response)) - }).await + Ok(HandleResult::AgentResponse(result.final_response.content)) } pub async fn clear_session_history(&self, unified_id: &UnifiedSessionId) -> Result<(), AgentError> {