优化斜杠命令处理机制,增加/stop命令
This commit is contained in:
parent
b84c6f85db
commit
22be6e404b
@ -223,6 +223,12 @@ impl GatewayState {
|
|||||||
tracing::error!(error = %e, "Failed to publish outbound");
|
tracing::error!(error = %e, "Failed to publish outbound");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Ok(crate::session::session::HandleResult::AgentProcessing) => {
|
||||||
|
// Agent is processing in background; response will be
|
||||||
|
// sent via bus directly from the spawned task.
|
||||||
|
// The select loop remains free to handle subsequent
|
||||||
|
// messages (including slash commands).
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!(error = %e, "Failed to handle message");
|
tracing::error!(error = %e, "Failed to handle message");
|
||||||
}
|
}
|
||||||
|
|||||||
@ -179,6 +179,9 @@ impl Scheduler {
|
|||||||
|
|
||||||
let _ = self.storage.record_scheduled_job_run(&run).await;
|
let _ = self.storage.record_scheduled_job_run(&run).await;
|
||||||
}
|
}
|
||||||
|
Ok(HandleResult::AgentProcessing) => {
|
||||||
|
tracing::warn!(job_id = %job.id, "scheduler: unexpected AgentProcessing from cron — response sent via bus");
|
||||||
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
let error_str = e.to_string();
|
let error_str = e.to_string();
|
||||||
let run = JobRun {
|
let run = JobRun {
|
||||||
|
|||||||
@ -1,7 +1,7 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::{Mutex, mpsc, oneshot};
|
||||||
|
|
||||||
use crate::bus::{ChatMessage, MediaItem, MessageSource, OutboundMessage, SourceKind};
|
use crate::bus::{ChatMessage, MediaItem, MessageSource, OutboundMessage, SourceKind};
|
||||||
use crate::mcp::get_mcp_status;
|
use crate::mcp::get_mcp_status;
|
||||||
@ -18,6 +18,8 @@ pub enum HandleResult {
|
|||||||
AgentResponse(String),
|
AgentResponse(String),
|
||||||
/// Command output to be sent as CommandExecuted
|
/// Command output to be sent as CommandExecuted
|
||||||
CommandOutput(String),
|
CommandOutput(String),
|
||||||
|
/// Agent processing spawned in background; response will be sent via bus
|
||||||
|
AgentProcessing,
|
||||||
}
|
}
|
||||||
use crate::channels::slash_command::parse_slash_command;
|
use crate::channels::slash_command::parse_slash_command;
|
||||||
use crate::config::LLMProviderConfig;
|
use crate::config::LLMProviderConfig;
|
||||||
@ -72,6 +74,21 @@ pub struct Session {
|
|||||||
pub last_compressed_message_at: Option<i64>,
|
pub last_compressed_message_at: Option<i64>,
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
memory_manager: Arc<crate::memory::MemoryManager>,
|
memory_manager: Arc<crate::memory::MemoryManager>,
|
||||||
|
|
||||||
|
/// Task queue for per-session serial agent processing
|
||||||
|
agent_tx: Option<mpsc::UnboundedSender<AgentTask>>,
|
||||||
|
/// Cancel signal for the currently executing agent task
|
||||||
|
current_cancel: Option<oneshot::Sender<()>>,
|
||||||
|
/// Monotonic counter to detect stale workers
|
||||||
|
worker_generation: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A task to be processed by the per-session agent worker
|
||||||
|
struct AgentTask {
|
||||||
|
channel: String,
|
||||||
|
chat_id: String,
|
||||||
|
content: String,
|
||||||
|
media: Vec<MediaItem>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Session {
|
impl Session {
|
||||||
@ -119,6 +136,9 @@ impl Session {
|
|||||||
last_consolidated_at: None,
|
last_consolidated_at: None,
|
||||||
last_compressed_message_at: None,
|
last_compressed_message_at: None,
|
||||||
memory_manager,
|
memory_manager,
|
||||||
|
agent_tx: None,
|
||||||
|
current_cancel: None,
|
||||||
|
worker_generation: 0,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -251,6 +271,9 @@ impl Session {
|
|||||||
last_consolidated_at: session_meta.last_consolidated_at,
|
last_consolidated_at: session_meta.last_consolidated_at,
|
||||||
last_compressed_message_at: session_meta.last_compressed_message_at,
|
last_compressed_message_at: session_meta.last_compressed_message_at,
|
||||||
memory_manager,
|
memory_manager,
|
||||||
|
agent_tx: None,
|
||||||
|
current_cancel: None,
|
||||||
|
worker_generation: 0,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -806,6 +829,11 @@ pub static SLASH_COMMANDS: &[SlashCommand] = &[
|
|||||||
description: "显示 MCP 服务状态和工具列表",
|
description: "显示 MCP 服务状态和工具列表",
|
||||||
aliases: &["/mcp"],
|
aliases: &["/mcp"],
|
||||||
},
|
},
|
||||||
|
SlashCommand {
|
||||||
|
name: "stop",
|
||||||
|
description: "停止当前正在执行的任务并清空消息队列",
|
||||||
|
aliases: &["/stop"],
|
||||||
|
},
|
||||||
];
|
];
|
||||||
|
|
||||||
impl SessionManager {
|
impl SessionManager {
|
||||||
@ -1027,6 +1055,26 @@ impl SessionManager {
|
|||||||
}).collect();
|
}).collect();
|
||||||
Ok((None, format!("MCP 服务:\n\n{}", lines.join("\n\n"))))
|
Ok((None, format!("MCP 服务:\n\n{}", lines.join("\n\n"))))
|
||||||
}
|
}
|
||||||
|
"stop" => {
|
||||||
|
let sid = current_session_id
|
||||||
|
.ok_or_else(|| AgentError::Other("no active session".to_string()))?;
|
||||||
|
let session = self.get_or_create_session(sid).await?;
|
||||||
|
let mut guard = session.lock().await;
|
||||||
|
let mut msgs: Vec<String> = Vec::new();
|
||||||
|
if guard.current_cancel.take().is_some() {
|
||||||
|
msgs.push("当前任务已发送停止信号。".to_string());
|
||||||
|
}
|
||||||
|
if guard.agent_tx.take().is_some() {
|
||||||
|
msgs.push("消息队列已清空。".to_string());
|
||||||
|
}
|
||||||
|
guard.worker_generation = guard.worker_generation.wrapping_add(1);
|
||||||
|
let resp = if msgs.is_empty() {
|
||||||
|
"没有正在执行的任务或队列。".to_string()
|
||||||
|
} else {
|
||||||
|
msgs.join(" ")
|
||||||
|
};
|
||||||
|
Ok((None, resp))
|
||||||
|
}
|
||||||
_ => Err(AgentError::Other(format!("未知命令:/{}。输入 /? 获取帮助。", cmd.name))),
|
_ => Err(AgentError::Other(format!("未知命令:/{}。输入 /? 获取帮助。", cmd.name))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1348,158 +1396,346 @@ impl SessionManager {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// Normal message handling through LLM
|
// Normal message: enqueue to per-session worker for serial processing.
|
||||||
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel();
|
let task = AgentTask {
|
||||||
|
channel: channel.to_string(),
|
||||||
// Spawn notification publisher — sends immediately when tools are detected
|
chat_id: chat_id.to_string(),
|
||||||
|
content: content.to_string(),
|
||||||
|
media,
|
||||||
|
};
|
||||||
|
let session_clone = session.clone();
|
||||||
|
let unified_str = unified_id.to_string();
|
||||||
{
|
{
|
||||||
let bus = self.bus.clone();
|
let mut guard = session_clone.lock().await;
|
||||||
let ch = channel.to_string();
|
let needs_spawn =
|
||||||
let cid = chat_id.to_string();
|
guard.agent_tx.is_none() || guard.agent_tx.as_ref().is_some_and(|tx| tx.is_closed());
|
||||||
tokio::spawn(async move {
|
if needs_spawn {
|
||||||
while let Some(notif) = notify_rx.recv().await {
|
guard.agent_tx = None;
|
||||||
let mut metadata = HashMap::new();
|
guard.current_cancel = None;
|
||||||
metadata.insert("_type".to_string(), "notification".to_string());
|
guard.worker_generation = guard.worker_generation.wrapping_add(1);
|
||||||
let outbound = OutboundMessage {
|
let generation = guard.worker_generation;
|
||||||
channel: ch.clone(),
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
chat_id: cid.clone(),
|
guard.agent_tx = Some(tx);
|
||||||
content: notif,
|
spawn_agent_worker(
|
||||||
reply_to: None,
|
rx,
|
||||||
media: vec![],
|
session_clone.clone(),
|
||||||
metadata,
|
self.bus.clone(),
|
||||||
};
|
self.memory_manager.clone(),
|
||||||
let _ = bus.publish_outbound(outbound).await;
|
self.skills_loader.clone(),
|
||||||
}
|
generation,
|
||||||
});
|
unified_str.clone(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
if let Err(e) = guard.agent_tx.as_ref().unwrap().send(task) {
|
||||||
|
// Worker died after we just spawned it — respawn with the recovered task
|
||||||
|
let task = e.0;
|
||||||
|
guard.agent_tx = None;
|
||||||
|
guard.current_cancel = None;
|
||||||
|
guard.worker_generation = guard.worker_generation.wrapping_add(1);
|
||||||
|
let generation = guard.worker_generation;
|
||||||
|
let (tx, rx) = mpsc::unbounded_channel();
|
||||||
|
guard.agent_tx = Some(tx);
|
||||||
|
spawn_agent_worker(
|
||||||
|
rx,
|
||||||
|
session_clone.clone(),
|
||||||
|
self.bus.clone(),
|
||||||
|
self.memory_manager.clone(),
|
||||||
|
self.skills_loader.clone(),
|
||||||
|
generation,
|
||||||
|
unified_str.clone(),
|
||||||
|
);
|
||||||
|
guard.agent_tx.as_ref().unwrap().send(task).unwrap_or_else(|_| {
|
||||||
|
tracing::error!("Agent worker spawn+send failed irrecoverably");
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Ok(HandleResult::AgentProcessing)
|
||||||
// Phase 1: prepare data under session lock
|
|
||||||
let (agent, history, system_prompt) = {
|
|
||||||
let mut session_guard = session.lock().await;
|
|
||||||
|
|
||||||
let media_refs: Vec<String> = media.iter().map(|m| m.path.clone()).collect();
|
|
||||||
#[cfg(debug_assertions)]
|
|
||||||
if !media_refs.is_empty() {
|
|
||||||
tracing::debug!(media_count = %media.len(), media_refs = ?media_refs, "Adding user message with media");
|
|
||||||
}
|
|
||||||
|
|
||||||
let user_message = session_guard.create_user_message(content, media_refs);
|
|
||||||
session_guard.add_message(user_message, true).await
|
|
||||||
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
|
|
||||||
|
|
||||||
let history = session_guard.get_history().to_vec();
|
|
||||||
|
|
||||||
let skills_prompt = self.skills_loader.build_skills_prompt();
|
|
||||||
|
|
||||||
let memory_context = match self.memory_manager.recall(content, 5, Some(crate::memory::MemoryCategory::Knowledge), None).await {
|
|
||||||
Ok(entries) if !entries.is_empty() => {
|
|
||||||
Some(entries.iter()
|
|
||||||
.map(|e| format!("- {}: {}", e.key, e.content))
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.join("\n"))
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!(error = %e, "Failed to fetch memory context");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let system_prompt = session_guard.build_system_prompt(&skills_prompt, memory_context.as_deref());
|
|
||||||
|
|
||||||
let result = session_guard.compressor
|
|
||||||
.compress_if_needed(history)
|
|
||||||
.await
|
|
||||||
.inspect_err(|e| {
|
|
||||||
tracing::warn!(error = %e, "Context compression failed in handle_message");
|
|
||||||
})?;
|
|
||||||
if result.created_timelines {
|
|
||||||
session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
|
|
||||||
}
|
|
||||||
let mut history = result.history;
|
|
||||||
|
|
||||||
history.insert(0, ChatMessage::system(system_prompt.clone()));
|
|
||||||
|
|
||||||
let now = chrono::Utc::now().timestamp_millis();
|
|
||||||
session_guard.last_consolidated_at = Some(now);
|
|
||||||
if let Err(e) = session_guard.persist_session_meta().await {
|
|
||||||
tracing::warn!(error = %e, "Failed to persist consolidation timestamp");
|
|
||||||
}
|
|
||||||
|
|
||||||
let agent = session_guard.create_agent_with_notify(notify_tx)?;
|
|
||||||
(agent, history, system_prompt)
|
|
||||||
}; // session lock released — send_message can now lock freely
|
|
||||||
|
|
||||||
// Phase 2: LLM call (no session lock held)
|
|
||||||
let result = match agent.process(history).await {
|
|
||||||
Ok(r) => r,
|
|
||||||
Err(AgentError::LlmError(ref msg))
|
|
||||||
if is_context_overflow_error(msg) =>
|
|
||||||
{
|
|
||||||
let retry_history = {
|
|
||||||
let mut session_guard = session.lock().await;
|
|
||||||
let new_window = crate::agent::ContextCompressor::parse_context_limit_from_error(msg)
|
|
||||||
.unwrap_or(session_guard.compressor_threshold());
|
|
||||||
tracing::warn!(
|
|
||||||
new_window,
|
|
||||||
error = %msg,
|
|
||||||
"Context overflow in handle_message — retrying with tighter window"
|
|
||||||
);
|
|
||||||
session_guard.compressor.set_context_window(new_window);
|
|
||||||
let raw = session_guard.get_history().to_vec();
|
|
||||||
let retry_result = session_guard.compressor.compress_if_needed(raw).await?;
|
|
||||||
if retry_result.created_timelines {
|
|
||||||
session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
|
|
||||||
if let Err(e) = session_guard.persist_session_meta().await {
|
|
||||||
tracing::warn!(error = %e, "Failed to persist compression marker on retry");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
let mut retry = retry_result.history;
|
|
||||||
retry.insert(0, ChatMessage::system(system_prompt.clone()));
|
|
||||||
retry
|
|
||||||
}; // lock released again for retry
|
|
||||||
|
|
||||||
agent.process(retry_history).await
|
|
||||||
.inspect_err(|e| {
|
|
||||||
tracing::error!(error = %e, "Agent retry after context compression failed");
|
|
||||||
})?
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(error = %e, "Agent processing error — propagating to caller");
|
|
||||||
return Err(e);
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
// Phase 3: persist results under session lock
|
|
||||||
let response: String = {
|
|
||||||
let mut session_guard = session.lock().await;
|
|
||||||
|
|
||||||
for msg in result.emitted_messages {
|
|
||||||
session_guard.add_message(msg, true).await
|
|
||||||
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if session_guard.should_generate_title()
|
|
||||||
&& let Err(e) = session_guard.generate_title().await {
|
|
||||||
tracing::warn!("failed to generate title: {}", e);
|
|
||||||
}
|
|
||||||
|
|
||||||
result.final_response.content
|
|
||||||
};
|
|
||||||
|
|
||||||
#[cfg(debug_assertions)]
|
|
||||||
tracing::debug!(
|
|
||||||
channel = %channel,
|
|
||||||
chat_id = %chat_id,
|
|
||||||
response_len = %response.len(),
|
|
||||||
"Agent response received"
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(HandleResult::AgentResponse(response))
|
|
||||||
}).await
|
}).await
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle a message triggered by a scheduled cron job.
|
fn spawn_agent_worker(
|
||||||
|
mut task_rx: mpsc::UnboundedReceiver<AgentTask>,
|
||||||
|
session: Arc<Mutex<Session>>,
|
||||||
|
bus: Arc<MessageBus>,
|
||||||
|
memory_manager: Arc<crate::memory::MemoryManager>,
|
||||||
|
skills_loader: Arc<SkillsLoader>,
|
||||||
|
worker_gen: u64,
|
||||||
|
unified_str: String,
|
||||||
|
) {
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let _scope = CURRENT_SOURCE_SESSION.scope(Some(unified_str), async {
|
||||||
|
while let Some(task) = task_rx.recv().await {
|
||||||
|
let task_chan = task.channel.clone();
|
||||||
|
let task_cid = task.chat_id.clone();
|
||||||
|
|
||||||
|
let (notify_tx, mut notify_rx) = mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
// Spawn notification publisher
|
||||||
|
{
|
||||||
|
let bus = bus.clone();
|
||||||
|
let ch = task_chan.clone();
|
||||||
|
let cid = task_cid.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Some(notif) = notify_rx.recv().await {
|
||||||
|
let mut metadata = HashMap::new();
|
||||||
|
metadata.insert("_type".to_string(), "notification".to_string());
|
||||||
|
let outbound = OutboundMessage {
|
||||||
|
channel: ch.clone(),
|
||||||
|
chat_id: cid.clone(),
|
||||||
|
content: notif,
|
||||||
|
reply_to: None,
|
||||||
|
media: vec![],
|
||||||
|
metadata,
|
||||||
|
};
|
||||||
|
let _ = bus.publish_outbound(outbound).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Phase 1: prepare data under session lock
|
||||||
|
let (agent, history_out, system_prompt_out, cancel_rx) = {
|
||||||
|
let mut guard = session.lock().await;
|
||||||
|
|
||||||
|
if guard.worker_generation != worker_gen {
|
||||||
|
return; // stale worker
|
||||||
|
}
|
||||||
|
|
||||||
|
let media_refs: Vec<String> =
|
||||||
|
task.media.iter().map(|m| m.path.clone()).collect();
|
||||||
|
let user_message =
|
||||||
|
guard.create_user_message(&task.content, media_refs);
|
||||||
|
if let Err(e) = guard.add_message(user_message, true).await {
|
||||||
|
tracing::error!(error = %e, "Failed to persist user message");
|
||||||
|
let err_outbound = OutboundMessage {
|
||||||
|
channel: task_chan.clone(),
|
||||||
|
chat_id: task_cid.clone(),
|
||||||
|
content: "Failed to save your message, please try again."
|
||||||
|
.to_string(),
|
||||||
|
reply_to: None,
|
||||||
|
media: vec![],
|
||||||
|
metadata: HashMap::new(),
|
||||||
|
};
|
||||||
|
let _ = bus.publish_outbound(err_outbound).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
let history_raw = guard.get_history().to_vec();
|
||||||
|
let skills_prompt = skills_loader.build_skills_prompt();
|
||||||
|
|
||||||
|
let memory_context = match memory_manager
|
||||||
|
.recall(
|
||||||
|
&task.content,
|
||||||
|
5,
|
||||||
|
Some(crate::memory::MemoryCategory::Knowledge),
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(entries) if !entries.is_empty() => Some(
|
||||||
|
entries
|
||||||
|
.iter()
|
||||||
|
.map(|e| format!("- {}: {}", e.key, e.content))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join("\n"),
|
||||||
|
),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "Failed to fetch memory context");
|
||||||
|
None
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
|
let system_prompt = guard
|
||||||
|
.build_system_prompt(&skills_prompt, memory_context.as_deref());
|
||||||
|
|
||||||
|
let result = guard
|
||||||
|
.compressor
|
||||||
|
.compress_if_needed(history_raw)
|
||||||
|
.await
|
||||||
|
.map(|r| {
|
||||||
|
if r.created_timelines {
|
||||||
|
guard.last_compressed_message_at =
|
||||||
|
Some(chrono::Utc::now().timestamp_millis());
|
||||||
|
}
|
||||||
|
r.history
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|e| {
|
||||||
|
tracing::warn!(
|
||||||
|
error = %e,
|
||||||
|
"Context compression failed in worker"
|
||||||
|
);
|
||||||
|
guard.get_history().to_vec()
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut history = result;
|
||||||
|
history.insert(0, ChatMessage::system(system_prompt.clone()));
|
||||||
|
|
||||||
|
let now = chrono::Utc::now().timestamp_millis();
|
||||||
|
guard.last_consolidated_at = Some(now);
|
||||||
|
let _ = guard.persist_session_meta().await;
|
||||||
|
|
||||||
|
let agent = match guard.create_agent_with_notify(notify_tx) {
|
||||||
|
Ok(a) => a,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(error = %e, "Failed to create agent");
|
||||||
|
let err_outbound = OutboundMessage {
|
||||||
|
channel: task_chan.clone(),
|
||||||
|
chat_id: task_cid.clone(),
|
||||||
|
content: "Agent creation failed, please try again."
|
||||||
|
.to_string(),
|
||||||
|
reply_to: None,
|
||||||
|
media: vec![],
|
||||||
|
metadata: HashMap::new(),
|
||||||
|
};
|
||||||
|
let _ = bus.publish_outbound(err_outbound).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let (cancel_tx, cancel_rx) = oneshot::channel();
|
||||||
|
|
||||||
|
if guard.worker_generation != worker_gen {
|
||||||
|
return; // /stop replaced us
|
||||||
|
}
|
||||||
|
guard.current_cancel = Some(cancel_tx);
|
||||||
|
|
||||||
|
(agent, history, system_prompt, cancel_rx)
|
||||||
|
}; // lock released
|
||||||
|
|
||||||
|
// Phase 2 + 3: LLM call with cancellation
|
||||||
|
let session2 = session.clone();
|
||||||
|
let bus2 = bus.clone();
|
||||||
|
let chan2 = task_chan.clone();
|
||||||
|
let cid2 = task_cid.clone();
|
||||||
|
let process_future = async move {
|
||||||
|
let result = match agent.process(history_out.clone()).await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(AgentError::LlmError(ref msg))
|
||||||
|
if is_context_overflow_error(msg) =>
|
||||||
|
{
|
||||||
|
let retry_history = {
|
||||||
|
let mut guard = session2.lock().await;
|
||||||
|
let new_window =
|
||||||
|
crate::agent::ContextCompressor::parse_context_limit_from_error(msg)
|
||||||
|
.unwrap_or(guard.compressor_threshold());
|
||||||
|
tracing::warn!(
|
||||||
|
new_window,
|
||||||
|
error = %msg,
|
||||||
|
"Context overflow in worker — retrying"
|
||||||
|
);
|
||||||
|
guard.compressor.set_context_window(new_window);
|
||||||
|
let raw = guard.get_history().to_vec();
|
||||||
|
let retry_result =
|
||||||
|
match guard.compressor.compress_if_needed(raw).await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(error = %e, "Retry compression failed");
|
||||||
|
let err_outbound = OutboundMessage {
|
||||||
|
channel: chan2,
|
||||||
|
chat_id: cid2,
|
||||||
|
content: "Context overflow handling failed."
|
||||||
|
.to_string(),
|
||||||
|
reply_to: None,
|
||||||
|
media: vec![],
|
||||||
|
metadata: HashMap::new(),
|
||||||
|
};
|
||||||
|
let _ = bus2.publish_outbound(err_outbound).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if retry_result.created_timelines {
|
||||||
|
guard.last_compressed_message_at =
|
||||||
|
Some(chrono::Utc::now().timestamp_millis());
|
||||||
|
let _ = guard.persist_session_meta().await;
|
||||||
|
}
|
||||||
|
let mut retry = retry_result.history;
|
||||||
|
retry.insert(
|
||||||
|
0,
|
||||||
|
ChatMessage::system(system_prompt_out.clone()),
|
||||||
|
);
|
||||||
|
retry
|
||||||
|
};
|
||||||
|
|
||||||
|
match agent.process(retry_history).await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(
|
||||||
|
error = %e,
|
||||||
|
"Agent retry after overflow failed"
|
||||||
|
);
|
||||||
|
let err_outbound = OutboundMessage {
|
||||||
|
channel: chan2,
|
||||||
|
chat_id: cid2,
|
||||||
|
content: format!("Processing error: {}", e),
|
||||||
|
reply_to: None,
|
||||||
|
media: vec![],
|
||||||
|
metadata: HashMap::new(),
|
||||||
|
};
|
||||||
|
let _ = bus2.publish_outbound(err_outbound).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(error = %e, "Agent processing error");
|
||||||
|
let err_outbound = OutboundMessage {
|
||||||
|
channel: chan2,
|
||||||
|
chat_id: cid2,
|
||||||
|
content: format!("Processing error: {}", e),
|
||||||
|
reply_to: None,
|
||||||
|
media: vec![],
|
||||||
|
metadata: HashMap::new(),
|
||||||
|
};
|
||||||
|
let _ = bus2.publish_outbound(err_outbound).await;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let response = {
|
||||||
|
let mut guard = session2.lock().await;
|
||||||
|
for msg in result.emitted_messages {
|
||||||
|
guard.add_message(msg, true).await.inspect_err(|e| {
|
||||||
|
tracing::error!(error = %e, "Failed to persist message")
|
||||||
|
}).ok();
|
||||||
|
}
|
||||||
|
if guard.should_generate_title()
|
||||||
|
&& let Err(e) = guard.generate_title().await
|
||||||
|
{
|
||||||
|
tracing::warn!("failed to generate title: {}", e);
|
||||||
|
}
|
||||||
|
result.final_response.content
|
||||||
|
};
|
||||||
|
|
||||||
|
let outbound = OutboundMessage {
|
||||||
|
channel: chan2,
|
||||||
|
chat_id: cid2,
|
||||||
|
content: response,
|
||||||
|
reply_to: None,
|
||||||
|
media: vec![],
|
||||||
|
metadata: HashMap::new(),
|
||||||
|
};
|
||||||
|
let _ = bus2.publish_outbound(outbound).await;
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
() = process_future => {}
|
||||||
|
_ = cancel_rx => {
|
||||||
|
// cancelled — current_cancel already taken by /stop
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up
|
||||||
|
let mut guard = session.lock().await;
|
||||||
|
if guard.worker_generation == worker_gen {
|
||||||
|
guard.current_cancel = None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SessionManager {
|
||||||
///
|
///
|
||||||
/// Runs in a stateless manner: no session creation, no history persistence.
|
/// Runs in a stateless manner: no session creation, no history persistence.
|
||||||
/// The cron system prompt instructs the LLM to deliver results via the
|
/// The cron system prompt instructs the LLM to deliver results via the
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user