Compare commits
No commits in common. "ebbf7e403617fb33568ac1465973f29edcf33f54" and "1e69fa3bd1839111d41e0464d979556f68c3fb76" have entirely different histories.
ebbf7e4036
...
1e69fa3bd1
@ -228,6 +228,7 @@ impl GatewayState {
|
|||||||
let sched = Arc::new(Scheduler::new(
|
let sched = Arc::new(Scheduler::new(
|
||||||
self.storage.clone(),
|
self.storage.clone(),
|
||||||
self.session_manager.clone(),
|
self.session_manager.clone(),
|
||||||
|
self.channel_manager.bus(),
|
||||||
scheduler_config,
|
scheduler_config,
|
||||||
));
|
));
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|||||||
@ -2,7 +2,6 @@ use async_trait::async_trait;
|
|||||||
use reqwest::Client;
|
use reqwest::Client;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use crate::bus::message::ContentBlock;
|
use crate::bus::message::ContentBlock;
|
||||||
use super::{ChatCompletionRequest, ChatCompletionResponse, LLMProvider, Tool, ToolCall};
|
use super::{ChatCompletionRequest, ChatCompletionResponse, LLMProvider, Tool, ToolCall};
|
||||||
@ -10,8 +9,6 @@ use super::traits::Usage;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use crate::storage::Storage;
|
use crate::storage::Storage;
|
||||||
|
|
||||||
const LLM_REQUEST_TIMEOUT_SECS: u64 = 300;
|
|
||||||
|
|
||||||
fn convert_content_blocks(blocks: &[ContentBlock]) -> Vec<serde_json::Value> {
|
fn convert_content_blocks(blocks: &[ContentBlock]) -> Vec<serde_json::Value> {
|
||||||
blocks.iter().map(|b| match b {
|
blocks.iter().map(|b| match b {
|
||||||
ContentBlock::Text { text } => {
|
ContentBlock::Text { text } => {
|
||||||
@ -75,10 +72,7 @@ impl AnthropicProvider {
|
|||||||
model_extra: HashMap<String, serde_json::Value>,
|
model_extra: HashMap<String, serde_json::Value>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
client: Client::builder()
|
client: Client::new(),
|
||||||
.timeout(Duration::from_secs(LLM_REQUEST_TIMEOUT_SECS))
|
|
||||||
.build()
|
|
||||||
.unwrap_or_else(|_| Client::new()),
|
|
||||||
name,
|
name,
|
||||||
api_key,
|
api_key,
|
||||||
base_url,
|
base_url,
|
||||||
@ -244,19 +238,7 @@ impl LLMProvider for AnthropicProvider {
|
|||||||
let req_body_str = serde_json::to_string_pretty(&body).unwrap_or_default();
|
let req_body_str = serde_json::to_string_pretty(&body).unwrap_or_default();
|
||||||
tracing::debug!(req_body = %req_body_str, "LLM request");
|
tracing::debug!(req_body = %req_body_str, "LLM request");
|
||||||
|
|
||||||
let resp = req_builder.json(&body).send().await
|
let resp = req_builder.json(&body).send().await?;
|
||||||
.inspect_err(|e| {
|
|
||||||
let is_timeout = e.is_timeout();
|
|
||||||
tracing::error!(
|
|
||||||
provider = %self.name,
|
|
||||||
model = %self.model_id,
|
|
||||||
url = %url,
|
|
||||||
timeout = is_timeout,
|
|
||||||
error = %e,
|
|
||||||
elapsed_ms = %start.elapsed().as_millis(),
|
|
||||||
"LLM API request failed"
|
|
||||||
);
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let status = resp.status();
|
let status = resp.status();
|
||||||
let body_text = resp.text().await?;
|
let body_text = resp.text().await?;
|
||||||
@ -272,14 +254,6 @@ impl LLMProvider for AnthropicProvider {
|
|||||||
.map(|s| s.to_string())
|
.map(|s| s.to_string())
|
||||||
})
|
})
|
||||||
.unwrap_or_else(|| body_text.clone());
|
.unwrap_or_else(|| body_text.clone());
|
||||||
tracing::error!(
|
|
||||||
provider = %self.name,
|
|
||||||
model = %self.model_id,
|
|
||||||
http_status = %status,
|
|
||||||
error = %error_msg,
|
|
||||||
elapsed_ms = %start.elapsed().as_millis(),
|
|
||||||
"LLM API returned error"
|
|
||||||
);
|
|
||||||
if let Some(ref storage) = self.storage {
|
if let Some(ref storage) = self.storage {
|
||||||
let _ = storage.append_llm_call(
|
let _ = storage.append_llm_call(
|
||||||
&self.name, &self.model_id, &req_body_str,
|
&self.name, &self.model_id, &req_body_str,
|
||||||
|
|||||||
@ -3,7 +3,6 @@ use reqwest::Client;
|
|||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde_json::{json, Value};
|
use serde_json::{json, Value};
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use crate::bus::message::ContentBlock;
|
use crate::bus::message::ContentBlock;
|
||||||
use super::{ChatCompletionRequest, ChatCompletionResponse, LLMProvider, ToolCall};
|
use super::{ChatCompletionRequest, ChatCompletionResponse, LLMProvider, ToolCall};
|
||||||
@ -11,8 +10,6 @@ use super::traits::Usage;
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use crate::storage::Storage;
|
use crate::storage::Storage;
|
||||||
|
|
||||||
const LLM_REQUEST_TIMEOUT_SECS: u64 = 300;
|
|
||||||
|
|
||||||
fn convert_content_blocks(blocks: &[ContentBlock]) -> Value {
|
fn convert_content_blocks(blocks: &[ContentBlock]) -> Value {
|
||||||
if blocks.len() == 1
|
if blocks.len() == 1
|
||||||
&& let ContentBlock::Text { text } = &blocks[0] {
|
&& let ContentBlock::Text { text } = &blocks[0] {
|
||||||
@ -51,10 +48,7 @@ impl OpenAIProvider {
|
|||||||
model_extra: HashMap<String, serde_json::Value>,
|
model_extra: HashMap<String, serde_json::Value>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
client: Client::builder()
|
client: Client::new(),
|
||||||
.timeout(Duration::from_secs(LLM_REQUEST_TIMEOUT_SECS))
|
|
||||||
.build()
|
|
||||||
.unwrap_or_else(|_| Client::new()),
|
|
||||||
name,
|
name,
|
||||||
api_key,
|
api_key,
|
||||||
base_url,
|
base_url,
|
||||||
@ -212,19 +206,7 @@ impl LLMProvider for OpenAIProvider {
|
|||||||
let req_body_str = serde_json::to_string_pretty(&body).unwrap_or_default();
|
let req_body_str = serde_json::to_string_pretty(&body).unwrap_or_default();
|
||||||
tracing::debug!(req_body = %req_body_str, "LLM request");
|
tracing::debug!(req_body = %req_body_str, "LLM request");
|
||||||
|
|
||||||
let resp = req_builder.json(&body).send().await
|
let resp = req_builder.json(&body).send().await?;
|
||||||
.inspect_err(|e| {
|
|
||||||
let is_timeout = e.is_timeout();
|
|
||||||
tracing::error!(
|
|
||||||
provider = %self.name,
|
|
||||||
model = %self.model_id,
|
|
||||||
url = %url,
|
|
||||||
timeout = is_timeout,
|
|
||||||
error = %e,
|
|
||||||
elapsed_ms = %start.elapsed().as_millis(),
|
|
||||||
"LLM API request failed"
|
|
||||||
);
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let status = resp.status();
|
let status = resp.status();
|
||||||
let text = resp.text().await?;
|
let text = resp.text().await?;
|
||||||
@ -232,14 +214,6 @@ impl LLMProvider for OpenAIProvider {
|
|||||||
|
|
||||||
if !status.is_success() {
|
if !status.is_success() {
|
||||||
let error = format!("API error {}: {}", status, text);
|
let error = format!("API error {}: {}", status, text);
|
||||||
tracing::error!(
|
|
||||||
provider = %self.name,
|
|
||||||
model = %self.model_id,
|
|
||||||
http_status = %status,
|
|
||||||
error = %error,
|
|
||||||
elapsed_ms = %start.elapsed().as_millis(),
|
|
||||||
"LLM API returned error"
|
|
||||||
);
|
|
||||||
if let Some(ref storage) = self.storage
|
if let Some(ref storage) = self.storage
|
||||||
&& let Err(e) = storage.append_llm_call(
|
&& let Err(e) = storage.append_llm_call(
|
||||||
&self.name, &self.model_id, &req_body_str,
|
&self.name, &self.model_id, &req_body_str,
|
||||||
|
|||||||
@ -4,12 +4,11 @@ use std::sync::Arc;
|
|||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
|
|
||||||
|
use crate::bus::MessageBus;
|
||||||
use crate::config::SchedulerConfig;
|
use crate::config::SchedulerConfig;
|
||||||
use crate::session::session::HandleResult;
|
use crate::session::session::HandleResult;
|
||||||
use crate::session::SessionManager;
|
use crate::session::SessionManager;
|
||||||
use crate::storage::ScheduledJob;
|
use crate::storage::{JobRun, ScheduledJob, Storage};
|
||||||
use crate::storage::Storage;
|
|
||||||
use crate::storage::JobRun;
|
|
||||||
|
|
||||||
pub use types::Schedule;
|
pub use types::Schedule;
|
||||||
|
|
||||||
@ -54,6 +53,7 @@ fn now_ms() -> i64 {
|
|||||||
pub struct Scheduler {
|
pub struct Scheduler {
|
||||||
storage: Arc<Storage>,
|
storage: Arc<Storage>,
|
||||||
session_manager: Arc<SessionManager>,
|
session_manager: Arc<SessionManager>,
|
||||||
|
bus: Arc<MessageBus>,
|
||||||
config: SchedulerConfig,
|
config: SchedulerConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -61,11 +61,13 @@ impl Scheduler {
|
|||||||
pub fn new(
|
pub fn new(
|
||||||
storage: Arc<Storage>,
|
storage: Arc<Storage>,
|
||||||
session_manager: Arc<SessionManager>,
|
session_manager: Arc<SessionManager>,
|
||||||
|
bus: Arc<MessageBus>,
|
||||||
config: SchedulerConfig,
|
config: SchedulerConfig,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
storage,
|
storage,
|
||||||
session_manager,
|
session_manager,
|
||||||
|
bus,
|
||||||
config,
|
config,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -134,6 +136,18 @@ impl Scheduler {
|
|||||||
|
|
||||||
match result {
|
match result {
|
||||||
Ok(HandleResult::AgentResponse(output)) => {
|
Ok(HandleResult::AgentResponse(output)) => {
|
||||||
|
let outbound = crate::bus::OutboundMessage {
|
||||||
|
channel: job.channel.clone(),
|
||||||
|
chat_id: job.chat_id.clone(),
|
||||||
|
content: output.clone(),
|
||||||
|
reply_to: None,
|
||||||
|
media: vec![],
|
||||||
|
metadata: std::collections::HashMap::new(),
|
||||||
|
};
|
||||||
|
if let Err(e) = self.bus.publish_outbound(outbound).await {
|
||||||
|
tracing::warn!(job_id = %job.id, "scheduler: failed to publish outbound: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
let output_truncated = if output.len() > 8000 {
|
let output_truncated = if output.len() > 8000 {
|
||||||
format!("{}...[truncated]", &output[..output.ceil_char_boundary(8000)])
|
format!("{}...[truncated]", &output[..output.ceil_char_boundary(8000)])
|
||||||
} else {
|
} else {
|
||||||
@ -166,6 +180,18 @@ impl Scheduler {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
Ok(HandleResult::CommandOutput(output)) => {
|
Ok(HandleResult::CommandOutput(output)) => {
|
||||||
|
let outbound = crate::bus::OutboundMessage {
|
||||||
|
channel: job.channel.clone(),
|
||||||
|
chat_id: job.chat_id.clone(),
|
||||||
|
content: output.clone(),
|
||||||
|
reply_to: None,
|
||||||
|
media: vec![],
|
||||||
|
metadata: std::collections::HashMap::new(),
|
||||||
|
};
|
||||||
|
if let Err(e) = self.bus.publish_outbound(outbound).await {
|
||||||
|
tracing::warn!(job_id = %job.id, "scheduler: failed to publish outbound: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
let run = JobRun {
|
let run = JobRun {
|
||||||
id: 0,
|
id: 0,
|
||||||
job_id: job.id.clone(),
|
job_id: job.id.clone(),
|
||||||
|
|||||||
@ -7,10 +7,6 @@ use crate::bus::{ChatMessage, MediaItem, MessageSource, OutboundMessage, SourceK
|
|||||||
use crate::storage::{Storage, StorageError};
|
use crate::storage::{Storage, StorageError};
|
||||||
use std::sync::Arc as StdArc;
|
use std::sync::Arc as StdArc;
|
||||||
|
|
||||||
tokio::task_local! {
|
|
||||||
static CURRENT_SOURCE_SESSION: Option<String>;
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Result of handling a message - either an AI response or a command output
|
/// Result of handling a message - either an AI response or a command output
|
||||||
pub enum HandleResult {
|
pub enum HandleResult {
|
||||||
/// AI response to be sent as AssistantResponse
|
/// AI response to be sent as AssistantResponse
|
||||||
@ -721,6 +717,7 @@ pub struct SessionManager {
|
|||||||
skills_loader: Arc<SkillsLoader>,
|
skills_loader: Arc<SkillsLoader>,
|
||||||
storage: Arc<Storage>,
|
storage: Arc<Storage>,
|
||||||
bus: Arc<MessageBus>,
|
bus: Arc<MessageBus>,
|
||||||
|
current_source_session: Arc<Mutex<Option<String>>>,
|
||||||
memory_manager: Arc<crate::memory::MemoryManager>,
|
memory_manager: Arc<crate::memory::MemoryManager>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -825,6 +822,7 @@ impl SessionManager {
|
|||||||
skills_loader,
|
skills_loader,
|
||||||
storage,
|
storage,
|
||||||
bus,
|
bus,
|
||||||
|
current_source_session: Arc::new(Mutex::new(None)),
|
||||||
memory_manager,
|
memory_manager,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -839,19 +837,6 @@ impl SessionManager {
|
|||||||
self.tools.clone()
|
self.tools.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// 为定时任务创建一个无 session 绑定的 AgentLoop
|
|
||||||
pub fn create_cron_agent(&self) -> Result<AgentLoop, AgentError> {
|
|
||||||
let provider = create_provider(self.provider_config.clone())
|
|
||||||
.map_err(|e| AgentError::Other(format!("failed to create cron provider: {}", e)))?;
|
|
||||||
Ok(AgentLoop::with_provider_and_tools(
|
|
||||||
Arc::from(provider),
|
|
||||||
self.tools.clone(),
|
|
||||||
self.provider_config.max_tool_iterations,
|
|
||||||
self.provider_config.model_id.clone(),
|
|
||||||
self.provider_config.workspace_dir.clone(),
|
|
||||||
).with_context_window(self.provider_config.token_limit))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// 获取所有可用的斜杠命令
|
/// 获取所有可用的斜杠命令
|
||||||
pub fn get_slash_commands(&self) -> &[SlashCommand] {
|
pub fn get_slash_commands(&self) -> &[SlashCommand] {
|
||||||
SLASH_COMMANDS
|
SLASH_COMMANDS
|
||||||
@ -1288,183 +1273,175 @@ impl SessionManager {
|
|||||||
media: Vec<MediaItem>,
|
media: Vec<MediaItem>,
|
||||||
) -> Result<HandleResult, AgentError> {
|
) -> Result<HandleResult, AgentError> {
|
||||||
let unified_id = self.resolve_dialog_id(channel, chat_id).await?;
|
let unified_id = self.resolve_dialog_id(channel, chat_id).await?;
|
||||||
|
*self.current_source_session.lock().await = Some(unified_id.to_string());
|
||||||
tracing::debug!(unified_id = %unified_id, "handle_message resolved unified_id");
|
tracing::debug!(unified_id = %unified_id, "handle_message resolved unified_id");
|
||||||
let session = self.get_or_create_session(&unified_id).await?;
|
let session = self.get_or_create_session(&unified_id).await?;
|
||||||
|
|
||||||
CURRENT_SOURCE_SESSION.scope(Some(unified_id.to_string()), async {
|
// Check for slash command
|
||||||
// Check for slash command
|
if let Some((cmd_name, cmd_args)) = parse_slash_command(content) {
|
||||||
if let Some((cmd_name, cmd_args)) = parse_slash_command(content) {
|
let result = self.execute_slash_command(
|
||||||
let result = self.execute_slash_command(
|
cmd_name,
|
||||||
cmd_name,
|
if cmd_args.is_empty() { None } else { Some(cmd_args) },
|
||||||
if cmd_args.is_empty() { None } else { Some(cmd_args) },
|
channel,
|
||||||
channel,
|
chat_id,
|
||||||
chat_id,
|
Some(&unified_id),
|
||||||
Some(&unified_id),
|
).await;
|
||||||
).await;
|
|
||||||
|
|
||||||
return match result {
|
match result {
|
||||||
Ok((_new_session_id, response)) => Ok(HandleResult::CommandOutput(response)),
|
Ok((_new_session_id, response)) => {
|
||||||
Err(e) => Ok(HandleResult::CommandOutput(e.to_string())),
|
*self.current_source_session.lock().await = None;
|
||||||
};
|
return Ok(HandleResult::CommandOutput(response));
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
*self.current_source_session.lock().await = None;
|
||||||
|
return Ok(HandleResult::CommandOutput(e.to_string()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Normal message handling through LLM
|
||||||
|
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
// Spawn notification publisher — sends immediately when tools are detected
|
||||||
|
{
|
||||||
|
let bus = self.bus.clone();
|
||||||
|
let ch = channel.to_string();
|
||||||
|
let cid = chat_id.to_string();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Some(notif) = notify_rx.recv().await {
|
||||||
|
let mut metadata = HashMap::new();
|
||||||
|
metadata.insert("_type".to_string(), "notification".to_string());
|
||||||
|
let outbound = OutboundMessage {
|
||||||
|
channel: ch.clone(),
|
||||||
|
chat_id: cid.clone(),
|
||||||
|
content: notif,
|
||||||
|
reply_to: None,
|
||||||
|
media: vec![],
|
||||||
|
metadata,
|
||||||
|
};
|
||||||
|
let _ = bus.publish_outbound(outbound).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let response: String = {
|
||||||
|
let mut session_guard = session.lock().await;
|
||||||
|
|
||||||
|
let media_refs: Vec<String> = media.iter().map(|m| m.path.clone()).collect();
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
if !media_refs.is_empty() {
|
||||||
|
tracing::debug!(media_count = %media.len(), media_refs = ?media_refs, "Adding user message with media");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Normal message handling through LLM
|
let user_message = session_guard.create_user_message(content, media_refs);
|
||||||
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel();
|
session_guard.add_message(user_message, true).await
|
||||||
|
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
|
||||||
|
|
||||||
// Spawn notification publisher — sends immediately when tools are detected
|
let history = session_guard.get_history().to_vec();
|
||||||
{
|
|
||||||
let bus = self.bus.clone();
|
// Build skills prompt
|
||||||
let ch = channel.to_string();
|
let skills_prompt = self.skills_loader.build_skills_prompt();
|
||||||
let cid = chat_id.to_string();
|
|
||||||
tokio::spawn(async move {
|
// Fetch memory context
|
||||||
while let Some(notif) = notify_rx.recv().await {
|
let memory_context = match self.memory_manager.recall(content, 5, Some(crate::memory::MemoryCategory::Knowledge), None).await {
|
||||||
let mut metadata = HashMap::new();
|
Ok(entries) if !entries.is_empty() => {
|
||||||
metadata.insert("_type".to_string(), "notification".to_string());
|
Some(entries.iter()
|
||||||
let outbound = OutboundMessage {
|
.map(|e| format!("- {}: {}", e.key, e.content))
|
||||||
channel: ch.clone(),
|
.collect::<Vec<_>>()
|
||||||
chat_id: cid.clone(),
|
.join("\n"))
|
||||||
content: notif,
|
}
|
||||||
reply_to: None,
|
Err(e) => {
|
||||||
media: vec![],
|
tracing::warn!(error = %e, "Failed to fetch memory context");
|
||||||
metadata,
|
None
|
||||||
};
|
}
|
||||||
let _ = bus.publish_outbound(outbound).await;
|
_ => None,
|
||||||
}
|
};
|
||||||
});
|
|
||||||
|
// Build combined system prompt and inject at position 0 AFTER compression.
|
||||||
|
// This ensures AgentLoop.process() sees a system message without it participating
|
||||||
|
// in context compression (system prompt is dynamic and should not be persisted).
|
||||||
|
let system_prompt = session_guard.build_system_prompt(&skills_prompt, memory_context.as_deref());
|
||||||
|
|
||||||
|
let result = session_guard.compressor
|
||||||
|
.compress_if_needed(history)
|
||||||
|
.await?;
|
||||||
|
if result.created_timelines {
|
||||||
|
session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
|
||||||
|
}
|
||||||
|
let mut history = result.history;
|
||||||
|
|
||||||
|
history.insert(0, ChatMessage::system(system_prompt.clone()));
|
||||||
|
|
||||||
|
// Persist consolidation state
|
||||||
|
let now = chrono::Utc::now().timestamp_millis();
|
||||||
|
session_guard.last_consolidated_at = Some(now);
|
||||||
|
if let Err(e) = session_guard.persist_session_meta().await {
|
||||||
|
tracing::warn!(error = %e, "Failed to persist consolidation timestamp");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Phase 1: prepare data under session lock
|
let agent = session_guard.create_agent_with_notify(notify_tx)?;
|
||||||
let (agent, history, system_prompt) = {
|
|
||||||
let mut session_guard = session.lock().await;
|
|
||||||
|
|
||||||
let media_refs: Vec<String> = media.iter().map(|m| m.path.clone()).collect();
|
// Try LLM call; on context overflow, re-compress with tighter window and retry once.
|
||||||
#[cfg(debug_assertions)]
|
|
||||||
if !media_refs.is_empty() {
|
|
||||||
tracing::debug!(media_count = %media.len(), media_refs = ?media_refs, "Adding user message with media");
|
|
||||||
}
|
|
||||||
|
|
||||||
let user_message = session_guard.create_user_message(content, media_refs);
|
|
||||||
session_guard.add_message(user_message, true).await
|
|
||||||
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
|
|
||||||
|
|
||||||
let history = session_guard.get_history().to_vec();
|
|
||||||
|
|
||||||
let skills_prompt = self.skills_loader.build_skills_prompt();
|
|
||||||
|
|
||||||
let memory_context = match self.memory_manager.recall(content, 5, Some(crate::memory::MemoryCategory::Knowledge), None).await {
|
|
||||||
Ok(entries) if !entries.is_empty() => {
|
|
||||||
Some(entries.iter()
|
|
||||||
.map(|e| format!("- {}: {}", e.key, e.content))
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.join("\n"))
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::warn!(error = %e, "Failed to fetch memory context");
|
|
||||||
None
|
|
||||||
}
|
|
||||||
_ => None,
|
|
||||||
};
|
|
||||||
|
|
||||||
let system_prompt = session_guard.build_system_prompt(&skills_prompt, memory_context.as_deref());
|
|
||||||
|
|
||||||
let result = session_guard.compressor
|
|
||||||
.compress_if_needed(history)
|
|
||||||
.await
|
|
||||||
.inspect_err(|e| {
|
|
||||||
tracing::warn!(error = %e, "Context compression failed in handle_message");
|
|
||||||
})?;
|
|
||||||
if result.created_timelines {
|
|
||||||
session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
|
|
||||||
}
|
|
||||||
let mut history = result.history;
|
|
||||||
|
|
||||||
history.insert(0, ChatMessage::system(system_prompt.clone()));
|
|
||||||
|
|
||||||
let now = chrono::Utc::now().timestamp_millis();
|
|
||||||
session_guard.last_consolidated_at = Some(now);
|
|
||||||
if let Err(e) = session_guard.persist_session_meta().await {
|
|
||||||
tracing::warn!(error = %e, "Failed to persist consolidation timestamp");
|
|
||||||
}
|
|
||||||
|
|
||||||
let agent = session_guard.create_agent_with_notify(notify_tx)?;
|
|
||||||
(agent, history, system_prompt)
|
|
||||||
}; // session lock released — send_message can now lock freely
|
|
||||||
|
|
||||||
// Phase 2: LLM call (no session lock held)
|
|
||||||
let result = match agent.process(history).await {
|
let result = match agent.process(history).await {
|
||||||
Ok(r) => r,
|
Ok(r) => r,
|
||||||
Err(AgentError::LlmError(ref msg))
|
Err(AgentError::LlmError(ref msg))
|
||||||
if is_context_overflow_error(msg) =>
|
if is_context_overflow_error(msg) =>
|
||||||
{
|
{
|
||||||
let retry_history = {
|
let new_window = crate::agent::ContextCompressor::parse_context_limit_from_error(msg)
|
||||||
let mut session_guard = session.lock().await;
|
.unwrap_or(session_guard.compressor_threshold());
|
||||||
let new_window = crate::agent::ContextCompressor::parse_context_limit_from_error(msg)
|
tracing::warn!(
|
||||||
.unwrap_or(session_guard.compressor_threshold());
|
new_window,
|
||||||
tracing::warn!(
|
error = %msg,
|
||||||
new_window,
|
"Context overflow in handle_message — retrying with tighter window"
|
||||||
error = %msg,
|
);
|
||||||
"Context overflow in handle_message — retrying with tighter window"
|
session_guard.compressor.set_context_window(new_window);
|
||||||
);
|
let raw = session_guard.get_history().to_vec();
|
||||||
session_guard.compressor.set_context_window(new_window);
|
let retry_result = session_guard.compressor.compress_if_needed(raw).await?;
|
||||||
let raw = session_guard.get_history().to_vec();
|
if retry_result.created_timelines {
|
||||||
let retry_result = session_guard.compressor.compress_if_needed(raw).await?;
|
session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
|
||||||
if retry_result.created_timelines {
|
if let Err(e) = session_guard.persist_session_meta().await {
|
||||||
session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
|
tracing::warn!(error = %e, "Failed to persist compression marker on retry");
|
||||||
if let Err(e) = session_guard.persist_session_meta().await {
|
|
||||||
tracing::warn!(error = %e, "Failed to persist compression marker on retry");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
let mut retry = retry_result.history;
|
|
||||||
retry.insert(0, ChatMessage::system(system_prompt.clone()));
|
|
||||||
retry
|
|
||||||
}; // lock released again for retry
|
|
||||||
|
|
||||||
agent.process(retry_history).await
|
|
||||||
.inspect_err(|e| {
|
|
||||||
tracing::error!(error = %e, "Agent retry after context compression failed");
|
|
||||||
})?
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
tracing::error!(error = %e, "Agent processing error — propagating to caller");
|
|
||||||
return Err(e);
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
// Phase 3: persist results under session lock
|
|
||||||
let response: String = {
|
|
||||||
let mut session_guard = session.lock().await;
|
|
||||||
|
|
||||||
for msg in result.emitted_messages {
|
|
||||||
session_guard.add_message(msg, true).await
|
|
||||||
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if session_guard.should_generate_title()
|
|
||||||
&& let Err(e) = session_guard.generate_title().await {
|
|
||||||
tracing::warn!("failed to generate title: {}", e);
|
|
||||||
}
|
}
|
||||||
|
let mut retry = retry_result.history;
|
||||||
result.final_response.content
|
retry.insert(0, ChatMessage::system(system_prompt));
|
||||||
|
agent.process(retry).await?
|
||||||
|
}
|
||||||
|
Err(e) => return Err(e),
|
||||||
};
|
};
|
||||||
|
|
||||||
#[cfg(debug_assertions)]
|
for msg in result.emitted_messages {
|
||||||
tracing::debug!(
|
session_guard.add_message(msg, true).await
|
||||||
channel = %channel,
|
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
|
||||||
chat_id = %chat_id,
|
}
|
||||||
response_len = %response.len(),
|
|
||||||
"Agent response received"
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(HandleResult::AgentResponse(response))
|
// Check if we need to generate a title (after 10 user messages)
|
||||||
}).await
|
if session_guard.should_generate_title()
|
||||||
|
&& let Err(e) = session_guard.generate_title().await {
|
||||||
|
tracing::warn!("failed to generate title: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
result.final_response.content
|
||||||
|
};
|
||||||
|
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
tracing::debug!(
|
||||||
|
channel = %channel,
|
||||||
|
chat_id = %chat_id,
|
||||||
|
response_len = %response.len(),
|
||||||
|
"Agent response received"
|
||||||
|
);
|
||||||
|
|
||||||
|
*self.current_source_session.lock().await = None;
|
||||||
|
|
||||||
|
Ok(HandleResult::AgentResponse(response))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle a message triggered by a scheduled cron job.
|
/// Handle a message triggered by a scheduled cron job.
|
||||||
///
|
///
|
||||||
/// Runs in a stateless manner: no session creation, no history persistence.
|
/// This is similar to `handle_message`, but the user message is created with
|
||||||
/// The cron system prompt instructs the LLM to deliver results via the
|
/// `SourceKind::ExternalTrigger` source metadata so that the cron job identity
|
||||||
/// `send_message` tool, which handles both delivery and history writing
|
/// is preserved in the conversation history and database.
|
||||||
/// on the target session.
|
|
||||||
pub async fn handle_cron_message(
|
pub async fn handle_cron_message(
|
||||||
&self,
|
&self,
|
||||||
channel: &str,
|
channel: &str,
|
||||||
@ -1473,45 +1450,128 @@ impl SessionManager {
|
|||||||
job_id: &str,
|
job_id: &str,
|
||||||
job_name: &str,
|
job_name: &str,
|
||||||
) -> Result<HandleResult, AgentError> {
|
) -> Result<HandleResult, AgentError> {
|
||||||
let skills_prompt = self.skills_loader.build_skills_prompt();
|
use crate::bus::{MessageSource, SourceKind};
|
||||||
|
|
||||||
let base_prompt = build_system_prompt(
|
let unified_id = self.resolve_dialog_id(channel, chat_id).await?;
|
||||||
&self.provider_config.workspace_dir,
|
*self.current_source_session.lock().await = Some(unified_id.to_string());
|
||||||
&self.provider_config.model_id,
|
tracing::debug!(unified_id = %unified_id, job_id = %job_id, "handle_cron_message resolved");
|
||||||
&self.tools,
|
|
||||||
Some(&format!("cron:{}:{}", job_name, job_id)),
|
let session = self.get_or_create_session(&unified_id).await?;
|
||||||
None,
|
|
||||||
false,
|
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel();
|
||||||
|
|
||||||
|
{
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use crate::bus::OutboundMessage;
|
||||||
|
let bus = self.bus.clone();
|
||||||
|
let ch = channel.to_string();
|
||||||
|
let cid = chat_id.to_string();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
while let Some(notif) = notify_rx.recv().await {
|
||||||
|
let mut metadata = HashMap::new();
|
||||||
|
metadata.insert("_type".to_string(), "notification".to_string());
|
||||||
|
let outbound = OutboundMessage {
|
||||||
|
channel: ch.clone(),
|
||||||
|
chat_id: cid.clone(),
|
||||||
|
content: notif,
|
||||||
|
reply_to: None,
|
||||||
|
media: vec![],
|
||||||
|
metadata,
|
||||||
|
};
|
||||||
|
let _ = bus.publish_outbound(outbound).await;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let response: String = {
|
||||||
|
let mut session_guard = session.lock().await;
|
||||||
|
|
||||||
|
let source = MessageSource {
|
||||||
|
kind: SourceKind::ExternalTrigger,
|
||||||
|
from_channel: Some(channel.to_string()),
|
||||||
|
from_session: None,
|
||||||
|
from_user_id: None,
|
||||||
|
system_name: Some(job_name.to_string()),
|
||||||
|
task_id: Some(job_id.to_string()),
|
||||||
|
};
|
||||||
|
let user_message = session_guard.create_user_message_with_source(prompt, vec![], source);
|
||||||
|
session_guard.add_message(user_message, true).await
|
||||||
|
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
|
||||||
|
|
||||||
|
let history = session_guard.get_history().to_vec();
|
||||||
|
|
||||||
|
let skills_prompt = self.skills_loader.build_skills_prompt();
|
||||||
|
let system_prompt = session_guard.build_system_prompt(&skills_prompt, None);
|
||||||
|
let cron_context = format!(
|
||||||
|
"\n\n## 定时任务执行\n\n\
|
||||||
|
你正在执行定时任务「{}」({})。\n\
|
||||||
|
目标渠道: {}:{}\n\n\
|
||||||
|
定时任务执行规则:\n\
|
||||||
|
- 这不是聊天对话,没有人会回复你,不要等待用户输入\n\
|
||||||
|
- 你的职责是根据任务指令直接生成要发送的消息内容\n\
|
||||||
|
- 只输出最终消息,不要输出中间思考过程或分析\n\
|
||||||
|
- 系统会自动将你的回复推送到目标渠道,不要使用 send_message 工具\n\
|
||||||
|
- 你的最终回复就是推送给用户的消息原文",
|
||||||
|
job_name, job_id, channel, chat_id
|
||||||
|
);
|
||||||
|
let full_system_prompt = format!("{}{}", system_prompt, cron_context);
|
||||||
|
|
||||||
|
// Inject system prompt AFTER compression so it doesn't participate
|
||||||
|
// in context compression (system prompt is dynamic and should not be persisted).
|
||||||
|
let mut history = session_guard.compressor
|
||||||
|
.compress_if_needed(history)
|
||||||
|
.await?
|
||||||
|
.history;
|
||||||
|
|
||||||
|
history.insert(0, ChatMessage::system(full_system_prompt));
|
||||||
|
|
||||||
|
let agent = session_guard.create_agent_with_notify(notify_tx)?;
|
||||||
|
let result = agent.process(history).await?;
|
||||||
|
|
||||||
|
for msg in result.emitted_messages {
|
||||||
|
session_guard.add_message(msg, true).await
|
||||||
|
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if session_guard.should_generate_title()
|
||||||
|
&& let Err(e) = session_guard.generate_title().await {
|
||||||
|
tracing::warn!("failed to generate title: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
let raw_response = result.final_response.content;
|
||||||
|
let prefix = format!(
|
||||||
|
"[message from cron:{}({})]\n",
|
||||||
|
job_name, job_id
|
||||||
|
);
|
||||||
|
let prefixed_response = format!("{}{}", prefix, raw_response);
|
||||||
|
|
||||||
|
let source = MessageSource {
|
||||||
|
kind: SourceKind::CrossChannel,
|
||||||
|
from_channel: Some("cron".to_string()),
|
||||||
|
from_session: Some(format!("{}:{}", job_name, job_id)),
|
||||||
|
from_user_id: None,
|
||||||
|
system_name: Some(job_name.to_string()),
|
||||||
|
task_id: Some(job_id.to_string()),
|
||||||
|
};
|
||||||
|
let msg = ChatMessage::assistant_with_source(prefixed_response.clone(), source);
|
||||||
|
session_guard.add_message(msg, true).await
|
||||||
|
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
|
||||||
|
|
||||||
|
prefixed_response
|
||||||
|
};
|
||||||
|
|
||||||
|
#[cfg(debug_assertions)]
|
||||||
|
tracing::debug!(
|
||||||
|
channel = %channel,
|
||||||
|
chat_id = %chat_id,
|
||||||
|
job_id = %job_id,
|
||||||
|
response_len = %response.len(),
|
||||||
|
"Cron agent response received"
|
||||||
);
|
);
|
||||||
let cron_context = format!(
|
|
||||||
"## 定时任务执行\n\n\
|
|
||||||
你正在执行定时任务「{job_name}」({job_id})。\n\
|
|
||||||
目标渠道: {channel}:{chat_id}\n\n\
|
|
||||||
规则:\n\
|
|
||||||
- 这不是聊天对话,没有用户会直接看到你的输出\n\
|
|
||||||
- 你必须使用 send_message 工具将最终结果发送到目标渠道\n\
|
|
||||||
- send_message 格式: target_chat_id=\"{channel}:{chat_id}\", content=\"消息内容\"\n\
|
|
||||||
- 可以调用其他工具收集信息、处理任务,但最终消息必须通过 send_message 发送\n\
|
|
||||||
- 只输出最终消息内容,不要输出中间思考过程或分析!"
|
|
||||||
);
|
|
||||||
let full_system_prompt = format!("{}\n\n{}\n\n{}", base_prompt, skills_prompt, cron_context);
|
|
||||||
|
|
||||||
let history = vec![
|
*self.current_source_session.lock().await = None;
|
||||||
ChatMessage::system(full_system_prompt),
|
|
||||||
ChatMessage::user(prompt),
|
|
||||||
];
|
|
||||||
|
|
||||||
let agent = self.create_cron_agent()?;
|
Ok(HandleResult::AgentResponse(response))
|
||||||
let source_session = format!("cron:{}", job_name);
|
|
||||||
let result = CURRENT_SOURCE_SESSION.scope(Some(source_session), async {
|
|
||||||
agent.process(history).await
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.inspect_err(|e| {
|
|
||||||
tracing::error!(error = %e, job_id = %job_id, "Cron agent processing error");
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Ok(HandleResult::AgentResponse(result.final_response.content))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn clear_session_history(&self, unified_id: &UnifiedSessionId) -> Result<(), AgentError> {
|
pub async fn clear_session_history(&self, unified_id: &UnifiedSessionId) -> Result<(), AgentError> {
|
||||||
@ -1543,7 +1603,7 @@ impl OutboundMessenger for SessionManager {
|
|||||||
) -> Result<(), String> {
|
) -> Result<(), String> {
|
||||||
// Fill origin from current source session if not provided
|
// Fill origin from current source session if not provided
|
||||||
if source.from_session.is_none() {
|
if source.from_session.is_none() {
|
||||||
source.from_session = CURRENT_SOURCE_SESSION.try_with(|v| v.clone()).ok().flatten();
|
source.from_session = self.current_source_session.lock().await.clone();
|
||||||
}
|
}
|
||||||
|
|
||||||
let (target_sid, session) = if let Some(did) = dialog_id {
|
let (target_sid, session) = if let Some(did) = dialog_id {
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user