Compare commits

..

No commits in common. "ebbf7e403617fb33568ac1465973f29edcf33f54" and "1e69fa3bd1839111d41e0464d979556f68c3fb76" have entirely different histories.

5 changed files with 300 additions and 265 deletions

View File

@ -228,6 +228,7 @@ impl GatewayState {
let sched = Arc::new(Scheduler::new( let sched = Arc::new(Scheduler::new(
self.storage.clone(), self.storage.clone(),
self.session_manager.clone(), self.session_manager.clone(),
self.channel_manager.bus(),
scheduler_config, scheduler_config,
)); ));
tokio::spawn(async move { tokio::spawn(async move {

View File

@ -2,7 +2,6 @@ use async_trait::async_trait;
use reqwest::Client; use reqwest::Client;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::HashMap; use std::collections::HashMap;
use std::time::Duration;
use crate::bus::message::ContentBlock; use crate::bus::message::ContentBlock;
use super::{ChatCompletionRequest, ChatCompletionResponse, LLMProvider, Tool, ToolCall}; use super::{ChatCompletionRequest, ChatCompletionResponse, LLMProvider, Tool, ToolCall};
@ -10,8 +9,6 @@ use super::traits::Usage;
use std::sync::Arc; use std::sync::Arc;
use crate::storage::Storage; use crate::storage::Storage;
const LLM_REQUEST_TIMEOUT_SECS: u64 = 300;
fn convert_content_blocks(blocks: &[ContentBlock]) -> Vec<serde_json::Value> { fn convert_content_blocks(blocks: &[ContentBlock]) -> Vec<serde_json::Value> {
blocks.iter().map(|b| match b { blocks.iter().map(|b| match b {
ContentBlock::Text { text } => { ContentBlock::Text { text } => {
@ -75,10 +72,7 @@ impl AnthropicProvider {
model_extra: HashMap<String, serde_json::Value>, model_extra: HashMap<String, serde_json::Value>,
) -> Self { ) -> Self {
Self { Self {
client: Client::builder() client: Client::new(),
.timeout(Duration::from_secs(LLM_REQUEST_TIMEOUT_SECS))
.build()
.unwrap_or_else(|_| Client::new()),
name, name,
api_key, api_key,
base_url, base_url,
@ -244,19 +238,7 @@ impl LLMProvider for AnthropicProvider {
let req_body_str = serde_json::to_string_pretty(&body).unwrap_or_default(); let req_body_str = serde_json::to_string_pretty(&body).unwrap_or_default();
tracing::debug!(req_body = %req_body_str, "LLM request"); tracing::debug!(req_body = %req_body_str, "LLM request");
let resp = req_builder.json(&body).send().await let resp = req_builder.json(&body).send().await?;
.inspect_err(|e| {
let is_timeout = e.is_timeout();
tracing::error!(
provider = %self.name,
model = %self.model_id,
url = %url,
timeout = is_timeout,
error = %e,
elapsed_ms = %start.elapsed().as_millis(),
"LLM API request failed"
);
})?;
let status = resp.status(); let status = resp.status();
let body_text = resp.text().await?; let body_text = resp.text().await?;
@ -272,14 +254,6 @@ impl LLMProvider for AnthropicProvider {
.map(|s| s.to_string()) .map(|s| s.to_string())
}) })
.unwrap_or_else(|| body_text.clone()); .unwrap_or_else(|| body_text.clone());
tracing::error!(
provider = %self.name,
model = %self.model_id,
http_status = %status,
error = %error_msg,
elapsed_ms = %start.elapsed().as_millis(),
"LLM API returned error"
);
if let Some(ref storage) = self.storage { if let Some(ref storage) = self.storage {
let _ = storage.append_llm_call( let _ = storage.append_llm_call(
&self.name, &self.model_id, &req_body_str, &self.name, &self.model_id, &req_body_str,

View File

@ -3,7 +3,6 @@ use reqwest::Client;
use serde::Deserialize; use serde::Deserialize;
use serde_json::{json, Value}; use serde_json::{json, Value};
use std::collections::HashMap; use std::collections::HashMap;
use std::time::Duration;
use crate::bus::message::ContentBlock; use crate::bus::message::ContentBlock;
use super::{ChatCompletionRequest, ChatCompletionResponse, LLMProvider, ToolCall}; use super::{ChatCompletionRequest, ChatCompletionResponse, LLMProvider, ToolCall};
@ -11,8 +10,6 @@ use super::traits::Usage;
use std::sync::Arc; use std::sync::Arc;
use crate::storage::Storage; use crate::storage::Storage;
const LLM_REQUEST_TIMEOUT_SECS: u64 = 300;
fn convert_content_blocks(blocks: &[ContentBlock]) -> Value { fn convert_content_blocks(blocks: &[ContentBlock]) -> Value {
if blocks.len() == 1 if blocks.len() == 1
&& let ContentBlock::Text { text } = &blocks[0] { && let ContentBlock::Text { text } = &blocks[0] {
@ -51,10 +48,7 @@ impl OpenAIProvider {
model_extra: HashMap<String, serde_json::Value>, model_extra: HashMap<String, serde_json::Value>,
) -> Self { ) -> Self {
Self { Self {
client: Client::builder() client: Client::new(),
.timeout(Duration::from_secs(LLM_REQUEST_TIMEOUT_SECS))
.build()
.unwrap_or_else(|_| Client::new()),
name, name,
api_key, api_key,
base_url, base_url,
@ -212,19 +206,7 @@ impl LLMProvider for OpenAIProvider {
let req_body_str = serde_json::to_string_pretty(&body).unwrap_or_default(); let req_body_str = serde_json::to_string_pretty(&body).unwrap_or_default();
tracing::debug!(req_body = %req_body_str, "LLM request"); tracing::debug!(req_body = %req_body_str, "LLM request");
let resp = req_builder.json(&body).send().await let resp = req_builder.json(&body).send().await?;
.inspect_err(|e| {
let is_timeout = e.is_timeout();
tracing::error!(
provider = %self.name,
model = %self.model_id,
url = %url,
timeout = is_timeout,
error = %e,
elapsed_ms = %start.elapsed().as_millis(),
"LLM API request failed"
);
})?;
let status = resp.status(); let status = resp.status();
let text = resp.text().await?; let text = resp.text().await?;
@ -232,14 +214,6 @@ impl LLMProvider for OpenAIProvider {
if !status.is_success() { if !status.is_success() {
let error = format!("API error {}: {}", status, text); let error = format!("API error {}: {}", status, text);
tracing::error!(
provider = %self.name,
model = %self.model_id,
http_status = %status,
error = %error,
elapsed_ms = %start.elapsed().as_millis(),
"LLM API returned error"
);
if let Some(ref storage) = self.storage if let Some(ref storage) = self.storage
&& let Err(e) = storage.append_llm_call( && let Err(e) = storage.append_llm_call(
&self.name, &self.model_id, &req_body_str, &self.name, &self.model_id, &req_body_str,

View File

@ -4,12 +4,11 @@ use std::sync::Arc;
use std::time::Instant; use std::time::Instant;
use tokio::time; use tokio::time;
use crate::bus::MessageBus;
use crate::config::SchedulerConfig; use crate::config::SchedulerConfig;
use crate::session::session::HandleResult; use crate::session::session::HandleResult;
use crate::session::SessionManager; use crate::session::SessionManager;
use crate::storage::ScheduledJob; use crate::storage::{JobRun, ScheduledJob, Storage};
use crate::storage::Storage;
use crate::storage::JobRun;
pub use types::Schedule; pub use types::Schedule;
@ -54,6 +53,7 @@ fn now_ms() -> i64 {
pub struct Scheduler { pub struct Scheduler {
storage: Arc<Storage>, storage: Arc<Storage>,
session_manager: Arc<SessionManager>, session_manager: Arc<SessionManager>,
bus: Arc<MessageBus>,
config: SchedulerConfig, config: SchedulerConfig,
} }
@ -61,11 +61,13 @@ impl Scheduler {
pub fn new( pub fn new(
storage: Arc<Storage>, storage: Arc<Storage>,
session_manager: Arc<SessionManager>, session_manager: Arc<SessionManager>,
bus: Arc<MessageBus>,
config: SchedulerConfig, config: SchedulerConfig,
) -> Self { ) -> Self {
Self { Self {
storage, storage,
session_manager, session_manager,
bus,
config, config,
} }
} }
@ -134,6 +136,18 @@ impl Scheduler {
match result { match result {
Ok(HandleResult::AgentResponse(output)) => { Ok(HandleResult::AgentResponse(output)) => {
let outbound = crate::bus::OutboundMessage {
channel: job.channel.clone(),
chat_id: job.chat_id.clone(),
content: output.clone(),
reply_to: None,
media: vec![],
metadata: std::collections::HashMap::new(),
};
if let Err(e) = self.bus.publish_outbound(outbound).await {
tracing::warn!(job_id = %job.id, "scheduler: failed to publish outbound: {}", e);
}
let output_truncated = if output.len() > 8000 { let output_truncated = if output.len() > 8000 {
format!("{}...[truncated]", &output[..output.ceil_char_boundary(8000)]) format!("{}...[truncated]", &output[..output.ceil_char_boundary(8000)])
} else { } else {
@ -166,6 +180,18 @@ impl Scheduler {
); );
} }
Ok(HandleResult::CommandOutput(output)) => { Ok(HandleResult::CommandOutput(output)) => {
let outbound = crate::bus::OutboundMessage {
channel: job.channel.clone(),
chat_id: job.chat_id.clone(),
content: output.clone(),
reply_to: None,
media: vec![],
metadata: std::collections::HashMap::new(),
};
if let Err(e) = self.bus.publish_outbound(outbound).await {
tracing::warn!(job_id = %job.id, "scheduler: failed to publish outbound: {}", e);
}
let run = JobRun { let run = JobRun {
id: 0, id: 0,
job_id: job.id.clone(), job_id: job.id.clone(),

View File

@ -7,10 +7,6 @@ use crate::bus::{ChatMessage, MediaItem, MessageSource, OutboundMessage, SourceK
use crate::storage::{Storage, StorageError}; use crate::storage::{Storage, StorageError};
use std::sync::Arc as StdArc; use std::sync::Arc as StdArc;
tokio::task_local! {
static CURRENT_SOURCE_SESSION: Option<String>;
}
/// Result of handling a message - either an AI response or a command output /// Result of handling a message - either an AI response or a command output
pub enum HandleResult { pub enum HandleResult {
/// AI response to be sent as AssistantResponse /// AI response to be sent as AssistantResponse
@ -721,6 +717,7 @@ pub struct SessionManager {
skills_loader: Arc<SkillsLoader>, skills_loader: Arc<SkillsLoader>,
storage: Arc<Storage>, storage: Arc<Storage>,
bus: Arc<MessageBus>, bus: Arc<MessageBus>,
current_source_session: Arc<Mutex<Option<String>>>,
memory_manager: Arc<crate::memory::MemoryManager>, memory_manager: Arc<crate::memory::MemoryManager>,
} }
@ -825,6 +822,7 @@ impl SessionManager {
skills_loader, skills_loader,
storage, storage,
bus, bus,
current_source_session: Arc::new(Mutex::new(None)),
memory_manager, memory_manager,
}) })
} }
@ -839,19 +837,6 @@ impl SessionManager {
self.tools.clone() self.tools.clone()
} }
/// 为定时任务创建一个无 session 绑定的 AgentLoop
pub fn create_cron_agent(&self) -> Result<AgentLoop, AgentError> {
let provider = create_provider(self.provider_config.clone())
.map_err(|e| AgentError::Other(format!("failed to create cron provider: {}", e)))?;
Ok(AgentLoop::with_provider_and_tools(
Arc::from(provider),
self.tools.clone(),
self.provider_config.max_tool_iterations,
self.provider_config.model_id.clone(),
self.provider_config.workspace_dir.clone(),
).with_context_window(self.provider_config.token_limit))
}
/// 获取所有可用的斜杠命令 /// 获取所有可用的斜杠命令
pub fn get_slash_commands(&self) -> &[SlashCommand] { pub fn get_slash_commands(&self) -> &[SlashCommand] {
SLASH_COMMANDS SLASH_COMMANDS
@ -1288,183 +1273,175 @@ impl SessionManager {
media: Vec<MediaItem>, media: Vec<MediaItem>,
) -> Result<HandleResult, AgentError> { ) -> Result<HandleResult, AgentError> {
let unified_id = self.resolve_dialog_id(channel, chat_id).await?; let unified_id = self.resolve_dialog_id(channel, chat_id).await?;
*self.current_source_session.lock().await = Some(unified_id.to_string());
tracing::debug!(unified_id = %unified_id, "handle_message resolved unified_id"); tracing::debug!(unified_id = %unified_id, "handle_message resolved unified_id");
let session = self.get_or_create_session(&unified_id).await?; let session = self.get_or_create_session(&unified_id).await?;
CURRENT_SOURCE_SESSION.scope(Some(unified_id.to_string()), async { // Check for slash command
// Check for slash command if let Some((cmd_name, cmd_args)) = parse_slash_command(content) {
if let Some((cmd_name, cmd_args)) = parse_slash_command(content) { let result = self.execute_slash_command(
let result = self.execute_slash_command( cmd_name,
cmd_name, if cmd_args.is_empty() { None } else { Some(cmd_args) },
if cmd_args.is_empty() { None } else { Some(cmd_args) }, channel,
channel, chat_id,
chat_id, Some(&unified_id),
Some(&unified_id), ).await;
).await;
return match result { match result {
Ok((_new_session_id, response)) => Ok(HandleResult::CommandOutput(response)), Ok((_new_session_id, response)) => {
Err(e) => Ok(HandleResult::CommandOutput(e.to_string())), *self.current_source_session.lock().await = None;
}; return Ok(HandleResult::CommandOutput(response));
}
Err(e) => {
*self.current_source_session.lock().await = None;
return Ok(HandleResult::CommandOutput(e.to_string()));
}
}
}
// Normal message handling through LLM
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel();
// Spawn notification publisher — sends immediately when tools are detected
{
let bus = self.bus.clone();
let ch = channel.to_string();
let cid = chat_id.to_string();
tokio::spawn(async move {
while let Some(notif) = notify_rx.recv().await {
let mut metadata = HashMap::new();
metadata.insert("_type".to_string(), "notification".to_string());
let outbound = OutboundMessage {
channel: ch.clone(),
chat_id: cid.clone(),
content: notif,
reply_to: None,
media: vec![],
metadata,
};
let _ = bus.publish_outbound(outbound).await;
}
});
}
let response: String = {
let mut session_guard = session.lock().await;
let media_refs: Vec<String> = media.iter().map(|m| m.path.clone()).collect();
#[cfg(debug_assertions)]
if !media_refs.is_empty() {
tracing::debug!(media_count = %media.len(), media_refs = ?media_refs, "Adding user message with media");
} }
// Normal message handling through LLM let user_message = session_guard.create_user_message(content, media_refs);
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel(); session_guard.add_message(user_message, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
// Spawn notification publisher — sends immediately when tools are detected let history = session_guard.get_history().to_vec();
{
let bus = self.bus.clone(); // Build skills prompt
let ch = channel.to_string(); let skills_prompt = self.skills_loader.build_skills_prompt();
let cid = chat_id.to_string();
tokio::spawn(async move { // Fetch memory context
while let Some(notif) = notify_rx.recv().await { let memory_context = match self.memory_manager.recall(content, 5, Some(crate::memory::MemoryCategory::Knowledge), None).await {
let mut metadata = HashMap::new(); Ok(entries) if !entries.is_empty() => {
metadata.insert("_type".to_string(), "notification".to_string()); Some(entries.iter()
let outbound = OutboundMessage { .map(|e| format!("- {}: {}", e.key, e.content))
channel: ch.clone(), .collect::<Vec<_>>()
chat_id: cid.clone(), .join("\n"))
content: notif, }
reply_to: None, Err(e) => {
media: vec![], tracing::warn!(error = %e, "Failed to fetch memory context");
metadata, None
}; }
let _ = bus.publish_outbound(outbound).await; _ => None,
} };
});
// Build combined system prompt and inject at position 0 AFTER compression.
// This ensures AgentLoop.process() sees a system message without it participating
// in context compression (system prompt is dynamic and should not be persisted).
let system_prompt = session_guard.build_system_prompt(&skills_prompt, memory_context.as_deref());
let result = session_guard.compressor
.compress_if_needed(history)
.await?;
if result.created_timelines {
session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
}
let mut history = result.history;
history.insert(0, ChatMessage::system(system_prompt.clone()));
// Persist consolidation state
let now = chrono::Utc::now().timestamp_millis();
session_guard.last_consolidated_at = Some(now);
if let Err(e) = session_guard.persist_session_meta().await {
tracing::warn!(error = %e, "Failed to persist consolidation timestamp");
} }
// Phase 1: prepare data under session lock let agent = session_guard.create_agent_with_notify(notify_tx)?;
let (agent, history, system_prompt) = {
let mut session_guard = session.lock().await;
let media_refs: Vec<String> = media.iter().map(|m| m.path.clone()).collect(); // Try LLM call; on context overflow, re-compress with tighter window and retry once.
#[cfg(debug_assertions)]
if !media_refs.is_empty() {
tracing::debug!(media_count = %media.len(), media_refs = ?media_refs, "Adding user message with media");
}
let user_message = session_guard.create_user_message(content, media_refs);
session_guard.add_message(user_message, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
let history = session_guard.get_history().to_vec();
let skills_prompt = self.skills_loader.build_skills_prompt();
let memory_context = match self.memory_manager.recall(content, 5, Some(crate::memory::MemoryCategory::Knowledge), None).await {
Ok(entries) if !entries.is_empty() => {
Some(entries.iter()
.map(|e| format!("- {}: {}", e.key, e.content))
.collect::<Vec<_>>()
.join("\n"))
}
Err(e) => {
tracing::warn!(error = %e, "Failed to fetch memory context");
None
}
_ => None,
};
let system_prompt = session_guard.build_system_prompt(&skills_prompt, memory_context.as_deref());
let result = session_guard.compressor
.compress_if_needed(history)
.await
.inspect_err(|e| {
tracing::warn!(error = %e, "Context compression failed in handle_message");
})?;
if result.created_timelines {
session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
}
let mut history = result.history;
history.insert(0, ChatMessage::system(system_prompt.clone()));
let now = chrono::Utc::now().timestamp_millis();
session_guard.last_consolidated_at = Some(now);
if let Err(e) = session_guard.persist_session_meta().await {
tracing::warn!(error = %e, "Failed to persist consolidation timestamp");
}
let agent = session_guard.create_agent_with_notify(notify_tx)?;
(agent, history, system_prompt)
}; // session lock released — send_message can now lock freely
// Phase 2: LLM call (no session lock held)
let result = match agent.process(history).await { let result = match agent.process(history).await {
Ok(r) => r, Ok(r) => r,
Err(AgentError::LlmError(ref msg)) Err(AgentError::LlmError(ref msg))
if is_context_overflow_error(msg) => if is_context_overflow_error(msg) =>
{ {
let retry_history = { let new_window = crate::agent::ContextCompressor::parse_context_limit_from_error(msg)
let mut session_guard = session.lock().await; .unwrap_or(session_guard.compressor_threshold());
let new_window = crate::agent::ContextCompressor::parse_context_limit_from_error(msg) tracing::warn!(
.unwrap_or(session_guard.compressor_threshold()); new_window,
tracing::warn!( error = %msg,
new_window, "Context overflow in handle_message — retrying with tighter window"
error = %msg, );
"Context overflow in handle_message — retrying with tighter window" session_guard.compressor.set_context_window(new_window);
); let raw = session_guard.get_history().to_vec();
session_guard.compressor.set_context_window(new_window); let retry_result = session_guard.compressor.compress_if_needed(raw).await?;
let raw = session_guard.get_history().to_vec(); if retry_result.created_timelines {
let retry_result = session_guard.compressor.compress_if_needed(raw).await?; session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
if retry_result.created_timelines { if let Err(e) = session_guard.persist_session_meta().await {
session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis()); tracing::warn!(error = %e, "Failed to persist compression marker on retry");
if let Err(e) = session_guard.persist_session_meta().await {
tracing::warn!(error = %e, "Failed to persist compression marker on retry");
}
} }
let mut retry = retry_result.history;
retry.insert(0, ChatMessage::system(system_prompt.clone()));
retry
}; // lock released again for retry
agent.process(retry_history).await
.inspect_err(|e| {
tracing::error!(error = %e, "Agent retry after context compression failed");
})?
}
Err(e) => {
tracing::error!(error = %e, "Agent processing error — propagating to caller");
return Err(e);
},
};
// Phase 3: persist results under session lock
let response: String = {
let mut session_guard = session.lock().await;
for msg in result.emitted_messages {
session_guard.add_message(msg, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
}
if session_guard.should_generate_title()
&& let Err(e) = session_guard.generate_title().await {
tracing::warn!("failed to generate title: {}", e);
} }
let mut retry = retry_result.history;
result.final_response.content retry.insert(0, ChatMessage::system(system_prompt));
agent.process(retry).await?
}
Err(e) => return Err(e),
}; };
#[cfg(debug_assertions)] for msg in result.emitted_messages {
tracing::debug!( session_guard.add_message(msg, true).await
channel = %channel, .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
chat_id = %chat_id, }
response_len = %response.len(),
"Agent response received"
);
Ok(HandleResult::AgentResponse(response)) // Check if we need to generate a title (after 10 user messages)
}).await if session_guard.should_generate_title()
&& let Err(e) = session_guard.generate_title().await {
tracing::warn!("failed to generate title: {}", e);
}
result.final_response.content
};
#[cfg(debug_assertions)]
tracing::debug!(
channel = %channel,
chat_id = %chat_id,
response_len = %response.len(),
"Agent response received"
);
*self.current_source_session.lock().await = None;
Ok(HandleResult::AgentResponse(response))
} }
/// Handle a message triggered by a scheduled cron job. /// Handle a message triggered by a scheduled cron job.
/// ///
/// Runs in a stateless manner: no session creation, no history persistence. /// This is similar to `handle_message`, but the user message is created with
/// The cron system prompt instructs the LLM to deliver results via the /// `SourceKind::ExternalTrigger` source metadata so that the cron job identity
/// `send_message` tool, which handles both delivery and history writing /// is preserved in the conversation history and database.
/// on the target session.
pub async fn handle_cron_message( pub async fn handle_cron_message(
&self, &self,
channel: &str, channel: &str,
@ -1473,45 +1450,128 @@ impl SessionManager {
job_id: &str, job_id: &str,
job_name: &str, job_name: &str,
) -> Result<HandleResult, AgentError> { ) -> Result<HandleResult, AgentError> {
let skills_prompt = self.skills_loader.build_skills_prompt(); use crate::bus::{MessageSource, SourceKind};
let base_prompt = build_system_prompt( let unified_id = self.resolve_dialog_id(channel, chat_id).await?;
&self.provider_config.workspace_dir, *self.current_source_session.lock().await = Some(unified_id.to_string());
&self.provider_config.model_id, tracing::debug!(unified_id = %unified_id, job_id = %job_id, "handle_cron_message resolved");
&self.tools,
Some(&format!("cron:{}:{}", job_name, job_id)), let session = self.get_or_create_session(&unified_id).await?;
None,
false, let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel();
{
use std::collections::HashMap;
use crate::bus::OutboundMessage;
let bus = self.bus.clone();
let ch = channel.to_string();
let cid = chat_id.to_string();
tokio::spawn(async move {
while let Some(notif) = notify_rx.recv().await {
let mut metadata = HashMap::new();
metadata.insert("_type".to_string(), "notification".to_string());
let outbound = OutboundMessage {
channel: ch.clone(),
chat_id: cid.clone(),
content: notif,
reply_to: None,
media: vec![],
metadata,
};
let _ = bus.publish_outbound(outbound).await;
}
});
}
let response: String = {
let mut session_guard = session.lock().await;
let source = MessageSource {
kind: SourceKind::ExternalTrigger,
from_channel: Some(channel.to_string()),
from_session: None,
from_user_id: None,
system_name: Some(job_name.to_string()),
task_id: Some(job_id.to_string()),
};
let user_message = session_guard.create_user_message_with_source(prompt, vec![], source);
session_guard.add_message(user_message, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
let history = session_guard.get_history().to_vec();
let skills_prompt = self.skills_loader.build_skills_prompt();
let system_prompt = session_guard.build_system_prompt(&skills_prompt, None);
let cron_context = format!(
"\n\n## 定时任务执行\n\n\
{}({})\n\
: {}:{}\n\n\
\n\
- \n\
- \n\
- \n\
- 使 send_message \n\
- ",
job_name, job_id, channel, chat_id
);
let full_system_prompt = format!("{}{}", system_prompt, cron_context);
// Inject system prompt AFTER compression so it doesn't participate
// in context compression (system prompt is dynamic and should not be persisted).
let mut history = session_guard.compressor
.compress_if_needed(history)
.await?
.history;
history.insert(0, ChatMessage::system(full_system_prompt));
let agent = session_guard.create_agent_with_notify(notify_tx)?;
let result = agent.process(history).await?;
for msg in result.emitted_messages {
session_guard.add_message(msg, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
}
if session_guard.should_generate_title()
&& let Err(e) = session_guard.generate_title().await {
tracing::warn!("failed to generate title: {}", e);
}
let raw_response = result.final_response.content;
let prefix = format!(
"[message from cron:{}({})]\n",
job_name, job_id
);
let prefixed_response = format!("{}{}", prefix, raw_response);
let source = MessageSource {
kind: SourceKind::CrossChannel,
from_channel: Some("cron".to_string()),
from_session: Some(format!("{}:{}", job_name, job_id)),
from_user_id: None,
system_name: Some(job_name.to_string()),
task_id: Some(job_id.to_string()),
};
let msg = ChatMessage::assistant_with_source(prefixed_response.clone(), source);
session_guard.add_message(msg, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
prefixed_response
};
#[cfg(debug_assertions)]
tracing::debug!(
channel = %channel,
chat_id = %chat_id,
job_id = %job_id,
response_len = %response.len(),
"Cron agent response received"
); );
let cron_context = format!(
"## 定时任务执行\n\n\
{job_name}({job_id})\n\
: {channel}:{chat_id}\n\n\
:\n\
- \n\
- 使 send_message \n\
- send_message : target_chat_id=\"{channel}:{chat_id}\", content=\"消息内容\"\n\
- send_message \n\
- "
);
let full_system_prompt = format!("{}\n\n{}\n\n{}", base_prompt, skills_prompt, cron_context);
let history = vec![ *self.current_source_session.lock().await = None;
ChatMessage::system(full_system_prompt),
ChatMessage::user(prompt),
];
let agent = self.create_cron_agent()?; Ok(HandleResult::AgentResponse(response))
let source_session = format!("cron:{}", job_name);
let result = CURRENT_SOURCE_SESSION.scope(Some(source_session), async {
agent.process(history).await
})
.await
.inspect_err(|e| {
tracing::error!(error = %e, job_id = %job_id, "Cron agent processing error");
})?;
Ok(HandleResult::AgentResponse(result.final_response.content))
} }
pub async fn clear_session_history(&self, unified_id: &UnifiedSessionId) -> Result<(), AgentError> { pub async fn clear_session_history(&self, unified_id: &UnifiedSessionId) -> Result<(), AgentError> {
@ -1543,7 +1603,7 @@ impl OutboundMessenger for SessionManager {
) -> Result<(), String> { ) -> Result<(), String> {
// Fill origin from current source session if not provided // Fill origin from current source session if not provided
if source.from_session.is_none() { if source.from_session.is_none() {
source.from_session = CURRENT_SOURCE_SESSION.try_with(|v| v.clone()).ok().flatten(); source.from_session = self.current_source_session.lock().await.clone();
} }
let (target_sid, session) = if let Some(did) = dialog_id { let (target_sid, session) = if let Some(did) = dialog_id {