From 22be6e404b6bec221610690357dbfdf966f79740 Mon Sep 17 00:00:00 2001 From: xiaoski Date: Sun, 24 May 2026 18:12:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E6=96=9C=E6=9D=A0=E5=91=BD?= =?UTF-8?q?=E4=BB=A4=E5=A4=84=E7=90=86=E6=9C=BA=E5=88=B6=EF=BC=8C=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0/stop=E5=91=BD=E4=BB=A4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/gateway/mod.rs | 6 + src/scheduler/mod.rs | 3 + src/session/session.rs | 532 +++++++++++++++++++++++++++++------------ 3 files changed, 393 insertions(+), 148 deletions(-) diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index af5d1dc..541860e 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -223,6 +223,12 @@ impl GatewayState { tracing::error!(error = %e, "Failed to publish outbound"); } } + Ok(crate::session::session::HandleResult::AgentProcessing) => { + // Agent is processing in background; response will be + // sent via bus directly from the spawned task. + // The select loop remains free to handle subsequent + // messages (including slash commands). + } Err(e) => { tracing::error!(error = %e, "Failed to handle message"); } diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 53df8dd..8cbf82f 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -179,6 +179,9 @@ impl Scheduler { let _ = self.storage.record_scheduled_job_run(&run).await; } + Ok(HandleResult::AgentProcessing) => { + tracing::warn!(job_id = %job.id, "scheduler: unexpected AgentProcessing from cron — response sent via bus"); + } Err(e) => { let error_str = e.to_string(); let run = JobRun { diff --git a/src/session/session.rs b/src/session/session.rs index 511c4e6..d677581 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -1,7 +1,7 @@ use std::collections::HashMap; use std::sync::Arc; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, mpsc, oneshot}; use crate::bus::{ChatMessage, MediaItem, MessageSource, OutboundMessage, SourceKind}; use crate::mcp::get_mcp_status; @@ -18,6 +18,8 @@ pub enum HandleResult { AgentResponse(String), /// Command output to be sent as CommandExecuted CommandOutput(String), + /// Agent processing spawned in background; response will be sent via bus + AgentProcessing, } use crate::channels::slash_command::parse_slash_command; use crate::config::LLMProviderConfig; @@ -72,6 +74,21 @@ pub struct Session { pub last_compressed_message_at: Option, #[allow(dead_code)] memory_manager: Arc, + + /// Task queue for per-session serial agent processing + agent_tx: Option>, + /// Cancel signal for the currently executing agent task + current_cancel: Option>, + /// Monotonic counter to detect stale workers + worker_generation: u64, +} + +/// A task to be processed by the per-session agent worker +struct AgentTask { + channel: String, + chat_id: String, + content: String, + media: Vec, } impl Session { @@ -119,6 +136,9 @@ impl Session { last_consolidated_at: None, last_compressed_message_at: None, memory_manager, + agent_tx: None, + current_cancel: None, + worker_generation: 0, }) } @@ -251,6 +271,9 @@ impl Session { last_consolidated_at: session_meta.last_consolidated_at, last_compressed_message_at: session_meta.last_compressed_message_at, memory_manager, + agent_tx: None, + current_cancel: None, + worker_generation: 0, }) } @@ -806,6 +829,11 @@ pub static SLASH_COMMANDS: &[SlashCommand] = &[ description: "显示 MCP 服务状态和工具列表", aliases: &["/mcp"], }, + SlashCommand { + name: "stop", + description: "停止当前正在执行的任务并清空消息队列", + aliases: &["/stop"], + }, ]; impl SessionManager { @@ -1027,6 +1055,26 @@ impl SessionManager { }).collect(); Ok((None, format!("MCP 服务:\n\n{}", lines.join("\n\n")))) } + "stop" => { + let sid = current_session_id + .ok_or_else(|| AgentError::Other("no active session".to_string()))?; + let session = self.get_or_create_session(sid).await?; + let mut guard = session.lock().await; + let mut msgs: Vec = Vec::new(); + if guard.current_cancel.take().is_some() { + msgs.push("当前任务已发送停止信号。".to_string()); + } + if guard.agent_tx.take().is_some() { + msgs.push("消息队列已清空。".to_string()); + } + guard.worker_generation = guard.worker_generation.wrapping_add(1); + let resp = if msgs.is_empty() { + "没有正在执行的任务或队列。".to_string() + } else { + msgs.join(" ") + }; + Ok((None, resp)) + } _ => Err(AgentError::Other(format!("未知命令:/{}。输入 /? 获取帮助。", cmd.name))), } } @@ -1348,158 +1396,346 @@ impl SessionManager { }; } - // Normal message handling through LLM - let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel(); - - // Spawn notification publisher — sends immediately when tools are detected + // Normal message: enqueue to per-session worker for serial processing. + let task = AgentTask { + channel: channel.to_string(), + chat_id: chat_id.to_string(), + content: content.to_string(), + media, + }; + let session_clone = session.clone(); + let unified_str = unified_id.to_string(); { - let bus = self.bus.clone(); - let ch = channel.to_string(); - let cid = chat_id.to_string(); - tokio::spawn(async move { - while let Some(notif) = notify_rx.recv().await { - let mut metadata = HashMap::new(); - metadata.insert("_type".to_string(), "notification".to_string()); - let outbound = OutboundMessage { - channel: ch.clone(), - chat_id: cid.clone(), - content: notif, - reply_to: None, - media: vec![], - metadata, - }; - let _ = bus.publish_outbound(outbound).await; - } - }); + let mut guard = session_clone.lock().await; + let needs_spawn = + guard.agent_tx.is_none() || guard.agent_tx.as_ref().is_some_and(|tx| tx.is_closed()); + if needs_spawn { + guard.agent_tx = None; + guard.current_cancel = None; + guard.worker_generation = guard.worker_generation.wrapping_add(1); + let generation = guard.worker_generation; + let (tx, rx) = mpsc::unbounded_channel(); + guard.agent_tx = Some(tx); + spawn_agent_worker( + rx, + session_clone.clone(), + self.bus.clone(), + self.memory_manager.clone(), + self.skills_loader.clone(), + generation, + unified_str.clone(), + ); + } + if let Err(e) = guard.agent_tx.as_ref().unwrap().send(task) { + // Worker died after we just spawned it — respawn with the recovered task + let task = e.0; + guard.agent_tx = None; + guard.current_cancel = None; + guard.worker_generation = guard.worker_generation.wrapping_add(1); + let generation = guard.worker_generation; + let (tx, rx) = mpsc::unbounded_channel(); + guard.agent_tx = Some(tx); + spawn_agent_worker( + rx, + session_clone.clone(), + self.bus.clone(), + self.memory_manager.clone(), + self.skills_loader.clone(), + generation, + unified_str.clone(), + ); + guard.agent_tx.as_ref().unwrap().send(task).unwrap_or_else(|_| { + tracing::error!("Agent worker spawn+send failed irrecoverably"); + }); + } } - - // Phase 1: prepare data under session lock - let (agent, history, system_prompt) = { - let mut session_guard = session.lock().await; - - let media_refs: Vec = media.iter().map(|m| m.path.clone()).collect(); - #[cfg(debug_assertions)] - if !media_refs.is_empty() { - tracing::debug!(media_count = %media.len(), media_refs = ?media_refs, "Adding user message with media"); - } - - let user_message = session_guard.create_user_message(content, media_refs); - session_guard.add_message(user_message, true).await - .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; - - let history = session_guard.get_history().to_vec(); - - let skills_prompt = self.skills_loader.build_skills_prompt(); - - let memory_context = match self.memory_manager.recall(content, 5, Some(crate::memory::MemoryCategory::Knowledge), None).await { - Ok(entries) if !entries.is_empty() => { - Some(entries.iter() - .map(|e| format!("- {}: {}", e.key, e.content)) - .collect::>() - .join("\n")) - } - Err(e) => { - tracing::warn!(error = %e, "Failed to fetch memory context"); - None - } - _ => None, - }; - - let system_prompt = session_guard.build_system_prompt(&skills_prompt, memory_context.as_deref()); - - let result = session_guard.compressor - .compress_if_needed(history) - .await - .inspect_err(|e| { - tracing::warn!(error = %e, "Context compression failed in handle_message"); - })?; - if result.created_timelines { - session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis()); - } - let mut history = result.history; - - history.insert(0, ChatMessage::system(system_prompt.clone())); - - let now = chrono::Utc::now().timestamp_millis(); - session_guard.last_consolidated_at = Some(now); - if let Err(e) = session_guard.persist_session_meta().await { - tracing::warn!(error = %e, "Failed to persist consolidation timestamp"); - } - - let agent = session_guard.create_agent_with_notify(notify_tx)?; - (agent, history, system_prompt) - }; // session lock released — send_message can now lock freely - - // Phase 2: LLM call (no session lock held) - let result = match agent.process(history).await { - Ok(r) => r, - Err(AgentError::LlmError(ref msg)) - if is_context_overflow_error(msg) => - { - let retry_history = { - let mut session_guard = session.lock().await; - let new_window = crate::agent::ContextCompressor::parse_context_limit_from_error(msg) - .unwrap_or(session_guard.compressor_threshold()); - tracing::warn!( - new_window, - error = %msg, - "Context overflow in handle_message — retrying with tighter window" - ); - session_guard.compressor.set_context_window(new_window); - let raw = session_guard.get_history().to_vec(); - let retry_result = session_guard.compressor.compress_if_needed(raw).await?; - if retry_result.created_timelines { - session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis()); - if let Err(e) = session_guard.persist_session_meta().await { - tracing::warn!(error = %e, "Failed to persist compression marker on retry"); - } - } - let mut retry = retry_result.history; - retry.insert(0, ChatMessage::system(system_prompt.clone())); - retry - }; // lock released again for retry - - agent.process(retry_history).await - .inspect_err(|e| { - tracing::error!(error = %e, "Agent retry after context compression failed"); - })? - } - Err(e) => { - tracing::error!(error = %e, "Agent processing error — propagating to caller"); - return Err(e); - }, - }; - - // Phase 3: persist results under session lock - let response: String = { - let mut session_guard = session.lock().await; - - for msg in result.emitted_messages { - session_guard.add_message(msg, true).await - .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; - } - - if session_guard.should_generate_title() - && let Err(e) = session_guard.generate_title().await { - tracing::warn!("failed to generate title: {}", e); - } - - result.final_response.content - }; - - #[cfg(debug_assertions)] - tracing::debug!( - channel = %channel, - chat_id = %chat_id, - response_len = %response.len(), - "Agent response received" - ); - - Ok(HandleResult::AgentResponse(response)) + Ok(HandleResult::AgentProcessing) }).await } +} - /// Handle a message triggered by a scheduled cron job. +fn spawn_agent_worker( + mut task_rx: mpsc::UnboundedReceiver, + session: Arc>, + bus: Arc, + memory_manager: Arc, + skills_loader: Arc, + worker_gen: u64, + unified_str: String, +) { + tokio::spawn(async move { + let _scope = CURRENT_SOURCE_SESSION.scope(Some(unified_str), async { + while let Some(task) = task_rx.recv().await { + let task_chan = task.channel.clone(); + let task_cid = task.chat_id.clone(); + + let (notify_tx, mut notify_rx) = mpsc::unbounded_channel(); + + // Spawn notification publisher + { + let bus = bus.clone(); + let ch = task_chan.clone(); + let cid = task_cid.clone(); + tokio::spawn(async move { + while let Some(notif) = notify_rx.recv().await { + let mut metadata = HashMap::new(); + metadata.insert("_type".to_string(), "notification".to_string()); + let outbound = OutboundMessage { + channel: ch.clone(), + chat_id: cid.clone(), + content: notif, + reply_to: None, + media: vec![], + metadata, + }; + let _ = bus.publish_outbound(outbound).await; + } + }); + } + + // Phase 1: prepare data under session lock + let (agent, history_out, system_prompt_out, cancel_rx) = { + let mut guard = session.lock().await; + + if guard.worker_generation != worker_gen { + return; // stale worker + } + + let media_refs: Vec = + task.media.iter().map(|m| m.path.clone()).collect(); + let user_message = + guard.create_user_message(&task.content, media_refs); + if let Err(e) = guard.add_message(user_message, true).await { + tracing::error!(error = %e, "Failed to persist user message"); + let err_outbound = OutboundMessage { + channel: task_chan.clone(), + chat_id: task_cid.clone(), + content: "Failed to save your message, please try again." + .to_string(), + reply_to: None, + media: vec![], + metadata: HashMap::new(), + }; + let _ = bus.publish_outbound(err_outbound).await; + return; + } + + let history_raw = guard.get_history().to_vec(); + let skills_prompt = skills_loader.build_skills_prompt(); + + let memory_context = match memory_manager + .recall( + &task.content, + 5, + Some(crate::memory::MemoryCategory::Knowledge), + None, + ) + .await + { + Ok(entries) if !entries.is_empty() => Some( + entries + .iter() + .map(|e| format!("- {}: {}", e.key, e.content)) + .collect::>() + .join("\n"), + ), + Err(e) => { + tracing::warn!(error = %e, "Failed to fetch memory context"); + None + } + _ => None, + }; + + let system_prompt = guard + .build_system_prompt(&skills_prompt, memory_context.as_deref()); + + let result = guard + .compressor + .compress_if_needed(history_raw) + .await + .map(|r| { + if r.created_timelines { + guard.last_compressed_message_at = + Some(chrono::Utc::now().timestamp_millis()); + } + r.history + }) + .unwrap_or_else(|e| { + tracing::warn!( + error = %e, + "Context compression failed in worker" + ); + guard.get_history().to_vec() + }); + + let mut history = result; + history.insert(0, ChatMessage::system(system_prompt.clone())); + + let now = chrono::Utc::now().timestamp_millis(); + guard.last_consolidated_at = Some(now); + let _ = guard.persist_session_meta().await; + + let agent = match guard.create_agent_with_notify(notify_tx) { + Ok(a) => a, + Err(e) => { + tracing::error!(error = %e, "Failed to create agent"); + let err_outbound = OutboundMessage { + channel: task_chan.clone(), + chat_id: task_cid.clone(), + content: "Agent creation failed, please try again." + .to_string(), + reply_to: None, + media: vec![], + metadata: HashMap::new(), + }; + let _ = bus.publish_outbound(err_outbound).await; + return; + } + }; + + let (cancel_tx, cancel_rx) = oneshot::channel(); + + if guard.worker_generation != worker_gen { + return; // /stop replaced us + } + guard.current_cancel = Some(cancel_tx); + + (agent, history, system_prompt, cancel_rx) + }; // lock released + + // Phase 2 + 3: LLM call with cancellation + let session2 = session.clone(); + let bus2 = bus.clone(); + let chan2 = task_chan.clone(); + let cid2 = task_cid.clone(); + let process_future = async move { + let result = match agent.process(history_out.clone()).await { + Ok(r) => r, + Err(AgentError::LlmError(ref msg)) + if is_context_overflow_error(msg) => + { + let retry_history = { + let mut guard = session2.lock().await; + let new_window = + crate::agent::ContextCompressor::parse_context_limit_from_error(msg) + .unwrap_or(guard.compressor_threshold()); + tracing::warn!( + new_window, + error = %msg, + "Context overflow in worker — retrying" + ); + guard.compressor.set_context_window(new_window); + let raw = guard.get_history().to_vec(); + let retry_result = + match guard.compressor.compress_if_needed(raw).await { + Ok(r) => r, + Err(e) => { + tracing::error!(error = %e, "Retry compression failed"); + let err_outbound = OutboundMessage { + channel: chan2, + chat_id: cid2, + content: "Context overflow handling failed." + .to_string(), + reply_to: None, + media: vec![], + metadata: HashMap::new(), + }; + let _ = bus2.publish_outbound(err_outbound).await; + return; + } + }; + if retry_result.created_timelines { + guard.last_compressed_message_at = + Some(chrono::Utc::now().timestamp_millis()); + let _ = guard.persist_session_meta().await; + } + let mut retry = retry_result.history; + retry.insert( + 0, + ChatMessage::system(system_prompt_out.clone()), + ); + retry + }; + + match agent.process(retry_history).await { + Ok(r) => r, + Err(e) => { + tracing::error!( + error = %e, + "Agent retry after overflow failed" + ); + let err_outbound = OutboundMessage { + channel: chan2, + chat_id: cid2, + content: format!("Processing error: {}", e), + reply_to: None, + media: vec![], + metadata: HashMap::new(), + }; + let _ = bus2.publish_outbound(err_outbound).await; + return; + } + } + } + Err(e) => { + tracing::error!(error = %e, "Agent processing error"); + let err_outbound = OutboundMessage { + channel: chan2, + chat_id: cid2, + content: format!("Processing error: {}", e), + reply_to: None, + media: vec![], + metadata: HashMap::new(), + }; + let _ = bus2.publish_outbound(err_outbound).await; + return; + } + }; + + let response = { + let mut guard = session2.lock().await; + for msg in result.emitted_messages { + guard.add_message(msg, true).await.inspect_err(|e| { + tracing::error!(error = %e, "Failed to persist message") + }).ok(); + } + if guard.should_generate_title() + && let Err(e) = guard.generate_title().await + { + tracing::warn!("failed to generate title: {}", e); + } + result.final_response.content + }; + + let outbound = OutboundMessage { + channel: chan2, + chat_id: cid2, + content: response, + reply_to: None, + media: vec![], + metadata: HashMap::new(), + }; + let _ = bus2.publish_outbound(outbound).await; + }; + + tokio::select! { + () = process_future => {} + _ = cancel_rx => { + // cancelled — current_cancel already taken by /stop + } + } + + // Clean up + let mut guard = session.lock().await; + if guard.worker_generation == worker_gen { + guard.current_cancel = None; + } + } + }).await; + }); +} + +impl SessionManager { /// /// Runs in a stateless manner: no session creation, no history persistence. /// The cron system prompt instructs the LLM to deliver results via the