PicoBot/src/session/session.rs

1668 lines
64 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::bus::{ChatMessage, MediaItem, MessageSource, OutboundMessage, SourceKind};
use crate::storage::{Storage, StorageError};
use std::sync::Arc as StdArc;
/// Result of handling a message: either an AI response or a command output.
/// The variant tells the caller which kind of outbound event the text belongs to.
pub enum HandleResult {
/// AI response to be sent as AssistantResponse
AgentResponse(String),
/// Command output to be sent as CommandExecuted
CommandOutput(String),
}
use crate::channels::slash_command::parse_slash_command;
use crate::config::LLMProviderConfig;
use crate::agent::{AgentLoop, AgentError, ContextCompressor};
use crate::agent::system_prompt::build_system_prompt;
use crate::agent::context_compressor::ContextCompressionConfig;
/// Report whether an LLM error message looks like a context-window overflow.
///
/// The check is a case-insensitive substring search against a fixed list of
/// phrases; it returns `true` as soon as any phrase is found in `msg`.
fn is_context_overflow_error(msg: &str) -> bool {
    const OVERFLOW_MARKERS: [&str; 7] = [
        "context length",
        "context window",
        "maximum context",
        "too many tokens",
        "token limit exceeded",
        "prompt is too long",
        "input is too long",
    ];
    let normalized = msg.to_lowercase();
    OVERFLOW_MARKERS
        .iter()
        .any(|marker| normalized.contains(*marker))
}
use crate::providers::{create_provider, LLMProvider};
use crate::session::session_id::UnifiedSessionId;
use crate::session::events::DialogInfo;
use crate::skills::SkillsLoader;
use crate::tools::{ToolRegistry, create_default_tools};
use crate::bus::MessageBus;
use crate::tools::OutboundMessenger;
use crate::tools::SendMessageTool;
/// Produce a short identifier: the first 8 characters of a freshly generated
/// v4 UUID rendered as a string.
fn short_id() -> String {
    let full = Uuid::new_v4().to_string();
    full[..8].to_owned()
}
/// A Session represents one dialog.
/// Each Session corresponds to one UnifiedSessionId and keeps its own independent message history.
pub struct Session {
pub id: UnifiedSessionId,
pub title: String,
/// Creation time as Unix milliseconds.
pub created_at: i64,
/// Time of the most recent `add_message` call, as Unix milliseconds.
pub last_active_at: i64,
/// Count of user-role messages; `add_message` increments it only when the role is "user".
pub message_count: i64,
/// Count of messages of any role; `add_message` increments it for every message.
pub total_message_count: i64,
messages: Vec<ChatMessage>,
/// Sequence number that `add_message` will assign to the next message.
seq_counter: i64,
provider_config: LLMProviderConfig,
provider: Arc<dyn LLMProvider>,
tools: Arc<ToolRegistry>,
compressor: ContextCompressor,
/// Optional persistence backend; when `None`, nothing is written to storage.
storage: Option<StdArc<Storage>>,
routing_info: String,
/// Timestamp (Unix ms) of the last consolidation.
/// Messages before this time have been compressed into memory.
pub last_consolidated_at: Option<i64>,
pub last_compressed_message_at: Option<i64>,
memory_manager: Arc<crate::memory::MemoryManager>,
}
impl Session {
pub async fn new(
id: UnifiedSessionId,
provider_config: LLMProviderConfig,
tools: Arc<ToolRegistry>,
storage: Option<StdArc<Storage>>,
routing_info: String,
title: String,
memory_manager: Arc<crate::memory::MemoryManager>,
) -> Result<Self, AgentError> {
let mut provider_box = create_provider(provider_config.clone())
.map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?;
if let Some(ref s) = storage {
provider_box.set_storage(s.clone());
}
let provider: Arc<dyn LLMProvider> = Arc::from(provider_box);
let compressor_config = ContextCompressionConfig {
protect_first_n: 2,
..Default::default()
};
let mut compressor = ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config, memory_manager.clone());
compressor.set_session_id(Some(id.to_string()));
let now = chrono::Utc::now().timestamp_millis();
Ok(Self {
id: id.clone(),
title,
created_at: now,
last_active_at: now,
message_count: 0,
total_message_count: 0,
messages: Vec::new(),
seq_counter: 1,
provider_config: provider_config.clone(),
provider: provider.clone(),
tools,
compressor,
storage,
routing_info,
last_consolidated_at: None,
last_compressed_message_at: None,
memory_manager,
})
}
/// Restore a Session from Storage.
///
/// Loads the session metadata and its stored messages, rebuilds the provider
/// and compressor, converts the stored rows into `ChatMessage`s and repairs
/// any tool-call chains that were left incomplete.
///
/// The next sequence number is derived from the highest persisted `seq`
/// (never lower than the message count), so that a history whose stored
/// sequence numbers are not contiguous does not hand out an already-used one.
///
/// # Errors
/// Returns `AgentError::Other` when the session or its messages cannot be
/// loaded, or when the provider cannot be created.
pub async fn from_storage(
    id: UnifiedSessionId,
    provider_config: LLMProviderConfig,
    tools: Arc<ToolRegistry>,
    storage: StdArc<Storage>,
    memory_manager: Arc<crate::memory::MemoryManager>,
) -> Result<Self, AgentError> {
    let session_key = id.to_string();
    let session_meta = storage.get_session(&session_key).await
        .map_err(|e| AgentError::Other(format!("failed to load session from storage: {}", e)))?;
    let messages = storage.load_messages(&session_key, 0).await
        .map_err(|e| AgentError::Other(format!("failed to load messages from storage: {}", e)))?;
    let mut provider_box = create_provider(provider_config.clone())
        .map_err(|e| AgentError::Other(format!("provider creation error: {}", e)))?;
    provider_box.set_storage(storage.clone());
    let provider: Arc<dyn LLMProvider> = Arc::from(provider_box);
    let compressor_config = ContextCompressionConfig {
        protect_first_n: 2,
        ..Default::default()
    };
    let mut compressor = ContextCompressor::with_config(
        provider.clone(),
        provider_config.token_limit,
        compressor_config,
        memory_manager.clone(),
    );
    compressor.set_session_id(Some(session_key));

    // Convert MessageMeta to ChatMessage, tracking the highest stored seq on the way.
    let mut highest_seq: i64 = 0;
    let mut chat_messages: Vec<ChatMessage> = messages.into_iter().map(|m| {
        highest_seq = highest_seq.max(m.seq);
        ChatMessage {
            id: m.id,
            role: m.role,
            content: m.content,
            // Unparseable media JSON falls back to the default (empty) value.
            media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(),
            timestamp: m.created_at,
            tool_call_id: m.tool_call_id,
            tool_name: m.tool_name,
            // An unparseable or empty tool-call list is treated as "no tool calls".
            tool_calls: m.tool_calls
                .and_then(|tc| serde_json::from_str::<Vec<crate::providers::ToolCall>>(&tc).ok())
                .filter(|v| !v.is_empty()),
            source: m.source.and_then(|s| serde_json::from_str(&s).ok()),
        }
    }).collect();
    // Repair tool-call chains damaged by an interrupted run.
    repair_tool_call_chains(&mut chat_messages);

    let total_message_count = chat_messages.len() as i64;
    let seq_counter = highest_seq.max(total_message_count) + 1;
    Ok(Self {
        id,
        title: session_meta.title,
        created_at: session_meta.created_at,
        last_active_at: session_meta.last_active_at,
        message_count: session_meta.message_count,
        total_message_count,
        messages: chat_messages,
        seq_counter,
        provider_config,
        provider,
        tools,
        compressor,
        storage: Some(storage),
        routing_info: session_meta.routing_info.unwrap_or_default(),
        last_consolidated_at: session_meta.last_consolidated_at,
        last_compressed_message_at: session_meta.last_compressed_message_at,
        memory_manager,
    })
}
/// Return the session ID rendered as a string.
pub fn session_id(&self) -> String {
    let id = &self.id;
    id.to_string()
}
/// 添加消息到历史并持久化到 Storage
/// 如果 `persist` 为 false只更新内存用于 compaction 场景)
pub async fn add_message(&mut self, message: ChatMessage, persist: bool) -> Result<(), StorageError> {
let is_user = message.role == "user";
let now = chrono::Utc::now().timestamp_millis();
// Assign seq
let seq = self.seq_counter;
self.seq_counter += 1;
// Persist to Storage
if persist
&& let Some(ref storage) = self.storage {
let msg_meta = crate::storage::message::MessageMeta {
id: message.id.clone(),
session_id: self.id.to_string(),
seq,
role: message.role.clone(),
content: message.content.clone(),
media_refs: if message.media_refs.is_empty() {
None
} else {
Some(serde_json::to_string(&message.media_refs).unwrap_or_default())
},
tool_call_id: message.tool_call_id.clone(),
tool_name: message.tool_name.clone(),
tool_calls: message.tool_calls.as_ref().and_then(|tc| serde_json::to_string(tc).ok()),
source: message.source.as_ref().map(|s| serde_json::to_string(s).unwrap_or_default()),
created_at: now,
};
storage.append_message_with_retry(&self.id.to_string(), &msg_meta).await?;
}
// Update in-memory state
self.messages.push(message);
self.total_message_count += 1;
if is_user {
self.message_count += 1;
}
self.last_active_at = now;
// Sync message_count to Storage
if persist {
tracing::debug!(session_id = %self.id, last_active_at = %now, message_count = %self.message_count, "Persisting session meta after add_message");
if let Err(e) = self.persist_session_meta().await {
tracing::warn!("failed to persist session meta: {}", e);
}
}
Ok(())
}
/// Borrow the in-memory message history.
pub fn get_history(&self) -> &[ChatMessage] {
    self.messages.as_slice()
}
/// Clear the in-memory message history and reset the message counters and
/// the sequence counter. Storage is not touched by this method.
pub fn clear_history(&mut self) {
    // Only needed for the debug log below; gating it avoids an unused-variable
    // warning in builds without debug assertions.
    #[cfg(debug_assertions)]
    let previous_len = self.messages.len();
    self.messages.clear();
    self.seq_counter = 1;
    self.total_message_count = 0;
    self.message_count = 0;
    #[cfg(debug_assertions)]
    tracing::debug!(session_id = %self.id, previous_len, "Chat history cleared");
}
/// Reset the conversation context: drop the in-memory messages and reset the
/// message counters and the sequence counter. Storage is not touched.
pub fn reset_context(&mut self) {
    // Only needed for the debug log below; gating it avoids an unused-variable
    // warning in builds without debug assertions.
    #[cfg(debug_assertions)]
    let previous_len = self.messages.len();
    self.messages.clear();
    self.seq_counter = 1;
    self.total_message_count = 0;
    self.message_count = 0;
    #[cfg(debug_assertions)]
    tracing::debug!(session_id = %self.id, previous_len, "Chat context reset in memory");
}
/// Build a user message, attaching `media_refs` when any are supplied.
pub fn create_user_message(&self, content: &str, media_refs: Vec<String>) -> ChatMessage {
    if !media_refs.is_empty() {
        return ChatMessage::user_with_media(content, media_refs);
    }
    ChatMessage::user(content)
}
/// Build a user message that carries its `source`, attaching `media_refs`
/// when any are supplied.
///
/// Previously both branches built the same message, so supplied media
/// references were silently dropped.
pub fn create_user_message_with_source(
    &self,
    content: &str,
    media_refs: Vec<String>,
    source: MessageSource,
) -> ChatMessage {
    let mut message = ChatMessage::user_with_source(content, source);
    if !media_refs.is_empty() {
        // NOTE(review): assumes `ChatMessage::media_refs` is `Vec<String>`,
        // matching what `user_with_media` accepts — confirm against the bus module.
        message.media_refs = media_refs;
    }
    message
}
/// 将 session 元数据写回 Storage
pub async fn persist_session_meta(&self) -> Result<(), StorageError> {
if let Some(ref storage) = self.storage {
let meta = crate::storage::session::SessionMeta {
id: self.id.to_string(),
channel: self.id.channel.clone(),
chat_id: self.id.chat_id.clone(),
dialog_id: self.id.dialog_id.clone(),
title: self.title.clone(),
created_at: self.created_at,
last_active_at: self.last_active_at,
message_count: self.message_count,
routing_info: if self.routing_info.is_empty() {
None
} else {
Some(self.routing_info.clone())
},
deleted_at: None,
last_consolidated_at: self.last_consolidated_at,
last_compressed_message_at: self.last_compressed_message_at,
};
storage.upsert_session(&meta).await?;
}
Ok(())
}
/// Check whether a title should be generated automatically: the session still
/// has the default title and at least 5 user messages have been recorded.
pub fn should_generate_title(&self) -> bool {
    let has_default_title = self.title == "新对话";
    has_default_title && self.message_count >= 5
}
/// Generate a title for the session by calling the LLM.
///
/// Does nothing unless `should_generate_title` holds. The prompt is built from
/// the first 20 user/assistant messages. A non-empty result replaces the title
/// and is persisted; a persistence failure is only logged.
///
/// # Errors
/// Propagates the error from the LLM call.
pub async fn generate_title(&mut self) -> Result<(), AgentError> {
    if !self.should_generate_title() {
        return Ok(());
    }
    let transcript = self.messages.iter()
        .filter(|m| matches!(m.role.as_str(), "user" | "assistant"))
        .take(20)
        .map(|m| format!("[{}]: {}", m.role, m.content))
        .collect::<Vec<_>>()
        .join("\n");
    let prompt = format!(
        r#"给定以下对话历史生成一个简短的会话标题5-15 个中文字符),概括这个对话的核心内容或用户的主要需求。只返回一个标题,不要解释。
历史:
{}"#,
        transcript
    );
    let generated = self.call_llm_for_title(&prompt).await?;
    if generated.is_empty() {
        // Keep the current title when the model returned nothing usable.
        return Ok(());
    }
    self.title = generated;
    if let Err(e) = self.persist_session_meta().await {
        tracing::warn!("failed to persist title: {}", e);
    }
    Ok(())
}
/// Call the LLM with a single user message and return its trimmed reply.
///
/// The request uses temperature 0.3, at most 20 tokens, and no tools.
///
/// # Errors
/// Returns `AgentError::Other` when the provider call fails.
async fn call_llm_for_title(&self, prompt: &str) -> Result<String, AgentError> {
    use crate::providers::{ChatCompletionRequest, ChatCompletionResponse, Message};
    let request = ChatCompletionRequest {
        messages: vec![Message::user(prompt.to_string())],
        temperature: Some(0.3),
        max_tokens: Some(20),
        tools: None,
    };
    let reply: ChatCompletionResponse = match self.provider.chat(request).await {
        Ok(reply) => reply,
        Err(e) => return Err(AgentError::Other(format!("LLM call failed: {}", e))),
    };
    Ok(reply.content.trim().to_string())
}
/// 获取 provider_config 引用
pub fn provider_config(&self) -> &LLMProviderConfig {
&self.provider_config
}
/// 获取 compressor 引用
pub fn compressor(&self) -> &ContextCompressor {
&self.compressor
}
/// Return the compressor's current threshold (for diagnostics or fallback logic).
pub fn compressor_threshold(&self) -> usize {
    let compressor = &self.compressor;
    compressor.threshold()
}
/// Create a temporary `AgentLoop` for processing messages, sharing this
/// session's provider and tools and using the limits from its provider config.
pub fn create_agent(&self) -> Result<AgentLoop, AgentError> {
    let config = &self.provider_config;
    let agent = AgentLoop::with_provider_and_tools(
        self.provider.clone(),
        self.tools.clone(),
        config.max_tool_iterations,
        config.model_id.clone(),
        config.workspace_dir.clone(),
    );
    Ok(agent.with_context_window(config.token_limit))
}
/// Create an `AgentLoop` like `create_agent`, with a notification channel attached.
pub fn create_agent_with_notify(
    &self,
    notify_tx: tokio::sync::mpsc::UnboundedSender<String>,
) -> Result<AgentLoop, AgentError> {
    let agent = self.create_agent()?;
    Ok(agent.with_notify(notify_tx))
}
/// Build the system prompt: the base agent prompt (with optional memory
/// context) followed by a Skills section when `skills_prompt` is non-blank.
pub fn build_system_prompt(&self, skills_prompt: &str, memory_context: Option<&str>) -> String {
    let session_key = self.id.to_string();
    let base_prompt = build_system_prompt(
        &self.provider_config.workspace_dir,
        &self.provider_config.model_id,
        &self.tools,
        Some(&session_key),
        memory_context,
    );
    if skills_prompt.trim().is_empty() {
        return base_prompt;
    }
    format!("{}\n\n## Skills\n\n{}\n\nUse the `get_skill` tool to load a skill's full content when needed.", base_prompt, skills_prompt)
}
/// 将当前 session 导出为 markdown 文档并保存到文件
pub fn dump_to_file(&self, system_prompt: &str) -> std::io::Result<String> {
use chrono::Local;
use std::fs;
use std::io::Write;
let md = self.dump_as_markdown_with_system_prompt(system_prompt);
// Create dumps directory under workspace
let dumps_dir = self.provider_config.workspace_dir.join("dumps");
fs::create_dir_all(&dumps_dir)?;
// Generate filename based on session info
let timestamp = Local::now().format("%Y%m%d_%H%M%S");
let filename = format!("{}_{}_{}.md", self.id.channel, self.id.chat_id, timestamp);
let filepath = dumps_dir.join(&filename);
// Write to file
let mut file = fs::File::create(&filepath)?;
file.write_all(md.as_bytes())?;
Ok(filepath.to_string_lossy().to_string())
}
/// Export the current session as a markdown document (in-memory version).
///
/// The document has a header with session identifiers, message count, model
/// and export time, followed by one fenced section per message containing
/// its tool-call information (if any) and content.
pub fn dump_as_markdown(&self) -> String {
    use chrono::{DateTime, Local};
    let now = Local::now().format("%Y-%m-%d %H:%M:%S");
    let mut md = String::new();
    // String literals are pushed directly; no temporary `String` is needed.
    md.push_str("# Session Dump\n\n");
    md.push_str(&format!("- **Session ID**: `{}`\n", self.id));
    md.push_str(&format!("- **Channel**: `{}`\n", self.id.channel));
    md.push_str(&format!("- **Chat ID**: `{}`\n", self.id.chat_id));
    md.push_str(&format!("- **Dialog ID**: `{}`\n", self.id.dialog_id));
    md.push_str(&format!("- **Message Count**: {}\n", self.messages.len()));
    md.push_str(&format!("- **Model**: `{}`\n", self.provider_config.model_id));
    md.push_str(&format!("- **Exported At**: {}\n", now));
    md.push_str("\n---\n\n");
    md.push_str("## Conversation History\n\n");
    for (i, msg) in self.messages.iter().enumerate() {
        // Known roles get a capitalised label; anything else is shown verbatim.
        let role = match msg.role.as_str() {
            "system" => "System",
            "user" => "User",
            "assistant" => "Assistant",
            "tool" => "Tool",
            r => r,
        };
        // Non-positive or unrepresentable timestamps render as an empty string.
        let timestamp = if msg.timestamp > 0 {
            DateTime::from_timestamp_millis(msg.timestamp)
                .map(|dt| dt.with_timezone(&Local).format("%Y-%m-%d %H:%M:%S").to_string())
                .unwrap_or_default()
        } else {
            String::new()
        };
        md.push_str(&format!("### [{:03}] {} {}\n\n", i + 1, role, timestamp));
        md.push_str("```\n");
        if let Some(ref tool_calls) = msg.tool_calls {
            md.push_str("[Tool Calls]\n");
            for tc in tool_calls {
                md.push_str(&format!("- {}: {:?}\n", tc.name, tc.arguments));
            }
        }
        if let Some(ref tool_name) = msg.tool_name {
            md.push_str(&format!("[Tool: {}]\n", tool_name));
        }
        if let Some(ref tool_call_id) = msg.tool_call_id {
            md.push_str(&format!("[Tool Call ID: {}]\n", tool_call_id));
        }
        md.push_str(&msg.content);
        md.push_str("\n```\n\n");
        if !msg.media_refs.is_empty() {
            md.push_str(&format!("**Media**: {:?}\n\n", msg.media_refs));
        }
    }
    md
}
/// Export the current session as a markdown document that also includes the
/// given system prompt.
///
/// Layout: header with session identifiers, a fenced "System Prompt" section,
/// then one fenced section per message in the conversation history.
pub fn dump_as_markdown_with_system_prompt(&self, system_prompt: &str) -> String {
    use chrono::{DateTime, Local};
    const STAMP_FORMAT: &str = "%Y-%m-%d %H:%M:%S";

    let exported_at = Local::now().format(STAMP_FORMAT);
    let mut out = format!(
        "# Session Dump\n\n- **Session ID**: `{}`\n- **Channel**: `{}`\n- **Chat ID**: `{}`\n- **Dialog ID**: `{}`\n- **Message Count**: {}\n- **Model**: `{}`\n- **Exported At**: {}\n\n---\n\n",
        self.id,
        self.id.channel,
        self.id.chat_id,
        self.id.dialog_id,
        self.messages.len(),
        self.provider_config.model_id,
        exported_at
    );

    // System prompt section.
    out.push_str("## System Prompt (Injected to Model)\n\n```\n");
    out.push_str(system_prompt);
    out.push_str("\n```\n\n---\n\n## Conversation History\n\n");

    for (index, message) in self.messages.iter().enumerate() {
        // Known roles get a capitalised label; anything else is shown verbatim.
        let role_label = match message.role.as_str() {
            "system" => "System",
            "user" => "User",
            "assistant" => "Assistant",
            "tool" => "Tool",
            other => other,
        };
        // Non-positive or unrepresentable timestamps render as an empty string.
        let sent_at = Some(message.timestamp)
            .filter(|ms| *ms > 0)
            .and_then(|ms| DateTime::from_timestamp_millis(ms))
            .map(|utc| utc.with_timezone(&Local).format(STAMP_FORMAT).to_string())
            .unwrap_or_default();
        out.push_str(&format!("### [{:03}] {} {}\n\n```\n", index + 1, role_label, sent_at));

        if let Some(calls) = message.tool_calls.as_ref() {
            out.push_str("[Tool Calls]\n");
            for call in calls {
                out.push_str(&format!("- {}: {:?}\n", call.name, call.arguments));
            }
        }
        if let Some(name) = message.tool_name.as_ref() {
            out.push_str(&format!("[Tool: {}]\n", name));
        }
        if let Some(call_id) = message.tool_call_id.as_ref() {
            out.push_str(&format!("[Tool Call ID: {}]\n", call_id));
        }
        out.push_str(&message.content);
        out.push_str("\n```\n\n");

        if !message.media_refs.is_empty() {
            out.push_str(&format!("**Media**: {:?}\n\n", message.media_refs));
        }
    }
    out
}
}
/// Repair damaged tool call chains after restoring from storage.
/// Handles cases where the gateway crashed mid-loop, leaving assistant
/// tool_calls without corresponding tool result messages.
fn repair_tool_call_chains(messages: &mut Vec<ChatMessage>) {
let mut i = 0;
while i < messages.len() {
let calls = match &messages[i].tool_calls {
Some(calls) if !calls.is_empty() => calls.clone(),
_ => {
i += 1;
continue;
}
};
if messages[i].role != "assistant" {
i += 1;
continue;
}
// Collect expected tool call IDs
let expected_ids: std::collections::HashSet<&str> = calls.iter().map(|c| c.id.as_str()).collect();
let expected_count = expected_ids.len();
// Check following messages for matching tool results (same tool_call_id)
let mut found = 0;
let mut j = i + 1;
while j < messages.len() && found < expected_count {
if messages[j].role == "tool" {
if let Some(ref tc_id) = messages[j].tool_call_id
&& expected_ids.contains(tc_id.as_str()) {
found += 1;
}
} else if messages[j].role == "user" || messages[j].role == "assistant" {
// Next user/assistant message — stop scanning, chain is broken
break;
}
j += 1;
}
if found < expected_count {
// Incomplete chain: remove tool_calls and add interruption note
tracing::warn!(
found,
expected = expected_count,
"Repairing incomplete tool call chain — gateway restart likely interrupted execution"
);
let old_content = std::mem::take(&mut messages[i].content);
messages[i].content = format!(
"{}\n\n[Tool calls ({}): {} — execution interrupted by gateway restart]",
old_content,
expected_count,
calls.iter().map(|c| c.name.as_str()).collect::<Vec<_>>().join(", ")
);
messages[i].tool_calls = None;
}
i += 1;
}
}
/// SessionManager manages all Sessions, routed by channel name.
/// Cloning is cheap-ish: shared state lives behind `Arc`s; only `provider_config` is a plain value.
#[derive(Clone)]
pub struct SessionManager {
/// Mutable bookkeeping (loaded sessions, timestamps, current-session map) behind one async mutex.
inner: Arc<Mutex<SessionManagerInner>>,
/// Provider configuration handed to every Session this manager creates or restores.
provider_config: LLMProviderConfig,
/// Tool registry shared by all sessions.
tools: Arc<ToolRegistry>,
skills_loader: Arc<SkillsLoader>,
storage: Arc<Storage>,
bus: Arc<MessageBus>,
current_source_session: Arc<Mutex<Option<String>>>,
memory_manager: Arc<crate::memory::MemoryManager>,
}
/// Mutable state of the SessionManager, guarded by its `inner` mutex.
struct SessionManagerInner {
/// Sessions keyed by UnifiedSessionId.to_string()
sessions: HashMap<String, Arc<Mutex<Session>>>,
/// Last time each session was created or fetched; compared against `session_ttl` during cleanup.
session_timestamps: HashMap<String, Instant>,
/// How long an untouched session stays in memory before cleanup removes it.
session_ttl: Duration,
/// Current active session per channel:chat_id
current_sessions: HashMap<String, String>,
}
/// Definition of a slash command.
#[derive(Debug, Clone)]
pub struct SlashCommand {
    /// Command name.
    pub name: &'static str,
    /// Human-readable description of the command.
    pub description: &'static str,
    /// Aliases (trigger words) that invoke the command.
    pub aliases: &'static [&'static str],
}
impl SlashCommand {
    /// Check whether `content` invokes this command.
    ///
    /// After trimming, the content matches when it equals one of the aliases
    /// or starts with an alias immediately followed by a space.
    pub fn matches(&self, content: &str) -> bool {
        let candidate = content.trim();
        self.aliases.iter().any(|alias| {
            candidate
                .strip_prefix(*alias)
                .is_some_and(|rest| rest.is_empty() || rest.starts_with(' '))
        })
    }
}
/// List of slash commands supported by sessions.
/// Each entry's `name` is the key that `execute_slash_command` dispatches on;
/// `aliases` are the literal trigger words checked by `SlashCommand::matches`.
pub static SLASH_COMMANDS: &[SlashCommand] = &[
SlashCommand {
name: "new",
description: "创建新对话",
aliases: &["/new"],
},
SlashCommand {
name: "sessions",
description: "列出最近对话",
aliases: &["/sessions"],
},
SlashCommand {
name: "switch",
description: "切换到指定对话",
aliases: &["/switch"],
},
SlashCommand {
name: "rename",
description: "重命名当前对话",
aliases: &["/rename"],
},
SlashCommand {
name: "delete",
description: "删除当前对话",
aliases: &["/delete"],
},
SlashCommand {
name: "compact",
description: "手动触发上下文压缩",
aliases: &["/compact"],
},
SlashCommand {
name: "info",
description: "显示当前对话信息",
aliases: &["/info"],
},
SlashCommand {
name: "dump",
description: "保存当前对话为 markdown 文档",
aliases: &["/dump"],
},
SlashCommand {
name: "?",
description: "显示帮助",
aliases: &["/?", "/help"],
},
];
impl SessionManager {
/// Create a SessionManager with no sessions loaded.
///
/// Loads skills, builds the default tool registry from them and the memory
/// manager, and records the session TTL (given in hours).
pub fn new(
    session_ttl_hours: u64,
    provider_config: LLMProviderConfig,
    storage: Arc<Storage>,
    bus: Arc<MessageBus>,
    memory_manager: Arc<crate::memory::MemoryManager>,
) -> Result<Self, AgentError> {
    let skills_loader = {
        let loader = SkillsLoader::new();
        loader.load_skills();
        Arc::new(loader)
    };
    let tools = Arc::new(create_default_tools(skills_loader.clone(), memory_manager.clone()));

    let state = SessionManagerInner {
        sessions: HashMap::new(),
        session_timestamps: HashMap::new(),
        session_ttl: Duration::from_secs(session_ttl_hours * 3600),
        current_sessions: HashMap::new(),
    };
    Ok(Self {
        inner: Arc::new(Mutex::new(state)),
        provider_config,
        tools,
        skills_loader,
        storage,
        bus,
        current_source_session: Arc::new(Mutex::new(None)),
        memory_manager,
    })
}
/// Register the send_message tool. The manager itself acts as the outbound
/// messenger, which is why this needs `self` inside an `Arc`.
pub fn register_outbound_tool(self: &Arc<Self>, available_channels: Vec<String>) {
    let messenger: Arc<dyn OutboundMessenger> = self.clone();
    let send_tool = SendMessageTool::new(messenger, available_channels);
    self.tools.register(send_tool);
}
/// Return a shared handle to the tool registry.
pub fn tools(&self) -> Arc<ToolRegistry> {
    let registry = &self.tools;
    registry.clone()
}
/// Start the background TTL cleanup task.
///
/// Spawns a task that sleeps for `interval_mins` minutes, runs one cleanup
/// pass, and repeats indefinitely.
pub fn start_cleanup_task(self: Arc<Self>, interval_mins: u64) {
    let period = Duration::from_secs(interval_mins * 60);
    let manager = self;
    tokio::spawn(async move {
        loop {
            tokio::time::sleep(period).await;
            manager.run_cleanup().await;
        }
    });
}
/// Run one TTL cleanup pass: release expired sessions from memory. Records
/// in Storage are kept.
///
/// Expiry is decided and applied under a single lock acquisition. Previously
/// the lock was released between selecting the expired ids and removing them,
/// so a session touched in that gap could still be evicted.
async fn run_cleanup(&self) {
    let mut guard = self.inner.lock().await;
    // Reborrow so the two maps can be borrowed independently below.
    let inner = &mut *guard;
    let now = Instant::now();
    let ttl = inner.session_ttl;
    let sessions = &mut inner.sessions;

    let tracked_before = inner.session_timestamps.len();
    inner.session_timestamps.retain(|id, last_touch| {
        let alive = now.duration_since(*last_touch) <= ttl;
        if !alive {
            sessions.remove(id);
        }
        alive
    });
    let removed = tracked_before - inner.session_timestamps.len();

    if removed > 0 {
        tracing::debug!(count = removed, "Cleaned up expired sessions");
    }
}
/// Return all available slash commands.
pub fn get_slash_commands(&self) -> &[SlashCommand] {
    let commands: &'static [SlashCommand] = SLASH_COMMANDS;
    commands
}
/// Execute a slash command.
///
/// `command` is the command name (as listed in `SLASH_COMMANDS`), `args` the
/// optional remainder of the user's input. Returns `(new session id, response
/// message)`; the session id is `Some` only for commands that create a new
/// session (`new`, `delete`).
///
/// # Errors
/// Returns `AgentError::Other` for unknown commands or missing arguments, and
/// propagates errors from the session operations the command performs.
pub async fn execute_slash_command(
    &self,
    command: &str,
    args: Option<&str>,
    channel: &str,
    chat_id: &str,
    current_session_id: Option<&UnifiedSessionId>,
) -> Result<(Option<UnifiedSessionId>, String), AgentError> {
    let cmd = SLASH_COMMANDS
        .iter()
        .find(|c| c.name == command)
        .ok_or_else(|| AgentError::Other(format!("Unknown command: {}", command)))?;
    tracing::info!(cmd = %cmd.name, args = ?args, "Executing slash command");
    match cmd.name {
        "new" => {
            let (new_id, title) = self.create_session(channel, chat_id, args, String::new()).await?;
            Ok((Some(new_id), format!("新对话 '{}' 已创建。", title)))
        }
        "delete" => {
            // Actually delete the current dialog before replacing it; previously
            // only a new session was created and the old one was left in place.
            if let Some(sid) = current_session_id {
                self.delete_dialog(sid).await?;
            }
            let (new_id, _title) = self.create_session(channel, chat_id, None, String::new()).await?;
            Ok((Some(new_id), "对话已删除。新对话已创建。".to_string()))
        }
        "compact" => {
            if let Some(sid) = current_session_id {
                let session = self.get_or_create_session(sid).await?;
                let mut session_guard = session.lock().await;
                let original_count = session_guard.get_history().len();
                let history = session_guard.get_history().to_vec();
                let result = session_guard.compressor
                    .compress_if_needed(history)
                    .await?;
                let compressed_count = result.history.len();
                if result.created_timelines {
                    session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
                    // Best effort, but no longer silent on failure.
                    if let Err(e) = session_guard.persist_session_meta().await {
                        tracing::warn!("failed to persist session meta after compaction: {}", e);
                    }
                }
                // Replace the in-memory history with the compressed one; the
                // compressed messages are not written to storage (persist = false).
                session_guard.clear_history();
                for msg in result.history {
                    session_guard.add_message(msg, false).await
                        .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
                }
                // The two counts are separated so they no longer run together.
                Ok((None, format!(
                    "Context compressed: {} -> {} messages.",
                    original_count, compressed_count
                )))
            } else {
                Ok((None, "No active conversation to compress.".to_string()))
            }
        }
        "info" => {
            if let Some(sid) = current_session_id {
                let session = self.get_or_create_session(sid).await?;
                let session_guard = session.lock().await;
                let message_count = session_guard.get_history().len();
                let session_id_str = session_guard.session_id();
                let title = &session_guard.title;
                let model_name = &session_guard.provider_config.name;
                let created_at = chrono::DateTime::from_timestamp_millis(session_guard.created_at)
                    .map(|dt| dt.with_timezone(&chrono::Local).format("%Y-%m-%d %H:%M:%S").to_string())
                    .unwrap_or_default();
                let last_active_at = chrono::DateTime::from_timestamp_millis(session_guard.last_active_at)
                    .map(|dt| dt.with_timezone(&chrono::Local).format("%Y-%m-%d %H:%M:%S").to_string())
                    .unwrap_or_default();
                Ok((None, format!(
                    "对话标题: {}\nSession ID: {}\n模型: {}\n用户消息: {} / 总消息: {}\n创建时间: {}\n最后活跃: {}",
                    title, session_id_str, model_name, session_guard.message_count, message_count, created_at, last_active_at
                )))
            } else {
                Ok((None, "No active session.".to_string()))
            }
        }
        "dump" => {
            if let Some(sid) = current_session_id {
                let session = self.get_or_create_session(sid).await?;
                let session_guard = session.lock().await;
                // Build the same system prompt that would be injected to the model
                let skills_prompt = self.skills_loader.build_skills_prompt();
                let system_prompt = session_guard.build_system_prompt(&skills_prompt, None);
                let filepath = session_guard.dump_to_file(&system_prompt)
                    .map_err(|e| AgentError::Other(format!("Failed to save dump: {}", e)))?;
                Ok((None, format!("Session dump saved to: {}", filepath)))
            } else {
                Ok((None, "No active session.".to_string()))
            }
        }
        "sessions" => {
            let (dialogs, _current) = self.list_dialogs(channel, chat_id, false).await?;
            if dialogs.is_empty() {
                Ok((None, "暂无对话记录。".to_string()))
            } else {
                let lines: Vec<String> = dialogs.iter().map(|d| {
                    let is_current = current_session_id
                        .is_some_and(|s| s.dialog_id == d.session_id.dialog_id);
                    let current = if is_current { " [当前]" } else { "" };
                    let last_active = chrono::DateTime::from_timestamp_millis(d.last_active_at)
                        .map(|dt| dt.with_timezone(&chrono::Local).format("%m-%d %H:%M").to_string())
                        .unwrap_or_default();
                    // A space now separates the timestamp from the preceding text.
                    format!("- {} ({}){} {}", d.session_id.dialog_id, d.title, current, last_active)
                }).collect();
                Ok((None, format!("最近对话:\n{}", lines.join("\n"))))
            }
        }
        "switch" => {
            let dialog_id = args.ok_or_else(|| AgentError::Other("Usage: /switch <dialog_id>".to_string()))?;
            let new_id = self.switch_dialog(channel, chat_id, dialog_id).await?;
            // NOTE(review): unlike "new", this returns `None` for the session id even
            // though the active dialog changed — confirm callers do not need it.
            Ok((None, format!("已切换到对话:{}", new_id.dialog_id)))
        }
        "rename" => {
            let title = args.ok_or_else(|| AgentError::Other("Usage: /rename <新标题>".to_string()))?;
            if let Some(sid) = current_session_id {
                self.rename_dialog(sid, title).await?;
                Ok((None, format!("对话已重命名为:{}", title)))
            } else {
                Ok((None, "No active session.".to_string()))
            }
        }
        "?" | "help" => {
            let lines: Vec<String> = SLASH_COMMANDS.iter().map(|c| {
                format!(" {} - {}", c.aliases.join(", "), c.description)
            }).collect();
            Ok((None, format!("可用命令:\n{}", lines.join("\n"))))
        }
        _ => Err(AgentError::Other(format!("未知命令:/{}。输入 /? 获取帮助。", cmd.name))),
    }
}
/// Create a new session (dialog) for `channel`/`chat_id` and make it the
/// current session for that chat.
///
/// A blank or missing `title` falls back to the default title. The session
/// metadata is written to Storage before the in-memory session is built.
/// Returns the new session id together with the title that was used.
///
/// # Errors
/// Returns `AgentError::Other` when the storage write fails, and propagates
/// errors from `Session::new`.
pub async fn create_session(
    &self,
    channel: &str,
    chat_id: &str,
    title: Option<&str>,
    routing_info: String,
) -> Result<(UnifiedSessionId, String), AgentError> {
    let dialog_id = short_id();
    let unified_id = UnifiedSessionId::new(channel, chat_id, &dialog_id);
    let session_key = unified_id.to_string();
    let title = match title.map(str::trim) {
        Some(given) if !given.is_empty() => given.to_owned(),
        _ => "新对话".to_string(),
    };

    // Storage is written first so the session exists there before it is used.
    let created = chrono::Utc::now().timestamp_millis();
    let stored_routing = if routing_info.is_empty() {
        None
    } else {
        Some(routing_info.clone())
    };
    let record = crate::storage::session::SessionMeta {
        id: session_key.clone(),
        channel: channel.to_string(),
        chat_id: chat_id.to_string(),
        dialog_id: dialog_id.clone(),
        title: title.clone(),
        created_at: created,
        last_active_at: created,
        message_count: 0,
        routing_info: stored_routing,
        deleted_at: None,
        last_consolidated_at: None,
        last_compressed_message_at: None,
    };
    if let Err(e) = self.storage.upsert_session(&record).await {
        return Err(AgentError::Other(format!("failed to create session in storage: {}", e)));
    }

    let session = Session::new(
        unified_id.clone(),
        self.provider_config.clone(),
        self.tools.clone(),
        Some(self.storage.clone()),
        routing_info,
        title.clone(),
        self.memory_manager.clone(),
    ).await?;
    let handle = Arc::new(Mutex::new(session));

    // Register in memory and mark as the current session for this channel:chat_id.
    let mut state = self.inner.lock().await;
    state.sessions.insert(session_key.clone(), handle.clone());
    state.session_timestamps.insert(session_key.clone(), Instant::now());
    state.current_sessions.insert(format!("{}:{}", channel, chat_id), session_key);
    Ok((unified_id, title))
}
pub async fn get_or_create_session(&self, unified_id: &UnifiedSessionId) -> Result<Arc<Mutex<Session>>, AgentError> {
let session_id_str = unified_id.to_string();
let inner = &mut *self.inner.lock().await;
if let Some(session) = inner.sessions.get(&session_id_str) {
inner.session_timestamps.insert(session_id_str, Instant::now());
return Ok(session.clone());
}
// Try to restore from Storage
match self.storage.get_session(&session_id_str).await {
Ok(meta) => {
tracing::debug!(session_id = %session_id_str, last_active_at = %meta.last_active_at, message_count = %meta.message_count, "Restoring session from Storage");
let session = Session::from_storage(
unified_id.clone(),
self.provider_config.clone(),
self.tools.clone(),
self.storage.clone(),
self.memory_manager.clone(),
).await?;
let arc = Arc::new(Mutex::new(session));
inner.sessions.insert(session_id_str.clone(), arc.clone());
inner.session_timestamps.insert(session_id_str.clone(), Instant::now());
// Set as current session
let chat_scope = format!("{}:{}", unified_id.channel, unified_id.chat_id);
inner.current_sessions.insert(chat_scope, session_id_str);
return Ok(arc);
}
Err(_) => {
// Session not in Storage, create new
}
}
// Create new session
let session = Session::new(
unified_id.clone(),
self.provider_config.clone(),
self.tools.clone(),
Some(self.storage.clone()),
String::new(),
"新对话".to_string(),
self.memory_manager.clone(),
).await?;
let arc = Arc::new(Mutex::new(session));
inner.sessions.insert(session_id_str.clone(), arc.clone());
inner.session_timestamps.insert(session_id_str.clone(), Instant::now());
// Set as current session
let chat_scope = format!("{}:{}", unified_id.channel, unified_id.chat_id);
inner.current_sessions.insert(chat_scope, session_id_str);
Ok(arc)
}
/// Create a new dialog; a thin wrapper over `create_session` with empty
/// routing info.
pub async fn create_dialog(
    &self,
    channel: &str,
    chat_id: &str,
    title: Option<&str>,
) -> Result<(UnifiedSessionId, String), AgentError> {
    let no_routing = String::new();
    self.create_session(channel, chat_id, title, no_routing).await
}
/// Look up the current dialog for a chat. Currently a stub: both arguments
/// are ignored and the result is always `Ok(None)`.
// NOTE(review): `current_sessions` tracks the active session per chat but is
// not consulted here — confirm whether that is intentional.
pub async fn get_current_dialog(
    &self,
    _channel: &str,
    _chat_id: &str,
) -> Result<Option<UnifiedSessionId>, AgentError> {
    let current: Option<UnifiedSessionId> = None;
    Ok(current)
}
/// Make `dialog_id` the current dialog for `channel`/`chat_id`.
///
/// The session is first loaded into memory through `get_or_create_session`,
/// then the current-session pointer for the chat is updated. Returns the
/// resulting session id.
// NOTE(review): because `get_or_create_session` is used, an unknown
// `dialog_id` results in a new empty session rather than an error.
pub async fn switch_dialog(
    &self,
    channel: &str,
    chat_id: &str,
    dialog_id: &str,
) -> Result<UnifiedSessionId, AgentError> {
    let target = UnifiedSessionId::new(channel, chat_id, dialog_id);
    // Make sure the session is resident in memory.
    self.get_or_create_session(&target).await?;
    // Point the chat at the target session.
    let scope_key = format!("{}:{}", channel, chat_id);
    let mut state = self.inner.lock().await;
    state.current_sessions.insert(scope_key, target.to_string());
    drop(state);
    Ok(target)
}
/// List dialogs for `channel`/`chat_id` from Storage (requesting 10 entries).
///
/// `_include_archived` is ignored, every entry has `archived_at: None`, and
/// the second tuple element is always `None`.
///
/// # Errors
/// Returns `AgentError::Other` when the storage query fails.
pub async fn list_dialogs(
    &self,
    channel: &str,
    chat_id: &str,
    _include_archived: bool,
) -> Result<(Vec<DialogInfo>, Option<String>), AgentError> {
    let metas = match self.storage.list_sessions(channel, chat_id, 10).await {
        Ok(metas) => metas,
        Err(e) => return Err(AgentError::Other(format!("failed to list dialogs: {}", e))),
    };
    let mut dialogs: Vec<DialogInfo> = Vec::new();
    for meta in metas {
        dialogs.push(DialogInfo {
            session_id: UnifiedSessionId::new(channel, chat_id, &meta.dialog_id),
            title: meta.title,
            created_at: meta.created_at,
            last_active_at: meta.last_active_at,
            message_count: meta.message_count,
            archived_at: None,
        });
    }
    Ok((dialogs, None))
}
/// Set the title of a dialog and persist the session metadata.
///
/// The session is obtained through `get_or_create_session`; its in-memory
/// title is updated before `persist_session_meta` is called.
///
/// # Errors
/// Returns `AgentError::Other` when persisting the metadata fails.
pub async fn rename_dialog(&self, session_id: &UnifiedSessionId, title: &str) -> Result<(), AgentError> {
    let handle = self.get_or_create_session(session_id).await?;
    let mut guard = handle.lock().await;
    guard.title = title.to_owned();
    if let Err(e) = guard.persist_session_meta().await {
        return Err(AgentError::Other(format!("failed to rename dialog: {}", e)));
    }
    Ok(())
}
/// Soft-delete a dialog and drop it from the in-memory caches.
///
/// The session is first soft-deleted in storage; only if that succeeds are
/// the cached session and its timestamp removed. The chat's current-session
/// pointer is cleared only when it refers to the dialog being deleted, so
/// deleting a non-active dialog does not disturb the active one.
///
/// # Errors
/// Returns `AgentError::Other` when the storage soft delete fails.
pub async fn delete_dialog(&self, session_id: &UnifiedSessionId) -> Result<(), AgentError> {
    let session_id_str = session_id.to_string();

    // Soft delete from storage.
    self.storage.soft_delete_session(&session_id_str).await
        .map_err(|e| AgentError::Other(format!("failed to delete dialog: {}", e)))?;

    // Remove from the in-memory caches.
    let mut inner = self.inner.lock().await;
    inner.sessions.remove(&session_id_str);
    inner.session_timestamps.remove(&session_id_str);

    // Only forget the chat's current session if it is the one just deleted;
    // otherwise the user's active dialog would be lost as a side effect.
    let chat_scope = format!("{}:{}", session_id.channel, session_id.chat_id);
    if inner.current_sessions.get(&chat_scope) == Some(&session_id_str) {
        inner.current_sessions.remove(&chat_scope);
    }
    Ok(())
}
/// No-op retained for callers of the former archive feature.
///
/// The session id is ignored and the call always returns `Ok(())`.
pub fn archive_dialog(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> {
// The archive concept was removed, so there is nothing to do; report success.
Ok(())
}
/// Unsupported operation: always fails with `AgentError::Other`.
///
/// The session id is ignored.
pub fn clear_dialog_history(&self, _session_id: &UnifiedSessionId) -> Result<(), AgentError> {
    let reason = String::from("clear_dialog_history not available");
    Err(AgentError::Other(reason))
}
/// Fetch a session by its full `UnifiedSessionId`, requiring that it exists
/// in storage.
///
/// Storage is queried first. When the lookup succeeds the session is obtained
/// through `get_or_create_session`, which restores it into memory if needed.
///
/// # Errors
/// - `AgentError::Other("session not found: …")` when storage reports
///   `StorageError::NotFound`.
/// - `AgentError::Other("storage error: …")` for any other storage failure.
pub async fn get_or_activate_session(
    &self,
    unified_id: &UnifiedSessionId,
) -> Result<Arc<Mutex<Session>>, AgentError> {
    let session_id_str = unified_id.to_string();

    // Guard clause: refuse to fall through to creation for unknown sessions.
    if let Err(err) = self.storage.get_session(&session_id_str).await {
        let message = match err {
            StorageError::NotFound(_) => format!("session not found: {}", unified_id),
            other => format!("storage error: {}", other),
        };
        return Err(AgentError::Other(message));
    }

    self.get_or_create_session(unified_id).await
}
async fn resolve_dialog_id(
&self,
channel: &str,
chat_id: &str,
) -> Result<UnifiedSessionId, AgentError> {
let chat_scope = format!("{}:{}", channel, chat_id);
let current_id = {
self.inner.lock().await.current_sessions.get(&chat_scope).cloned()
};
if let Some(ref current_id) = current_id {
if let Ok(_) = self.storage.get_session(current_id).await {
let parts: Vec<&str> = current_id.split(':').collect();
if parts.len() == 3 {
return Ok(UnifiedSessionId::new(channel, chat_id, parts[2]));
}
}
}
let ttl_millis = self.inner.lock().await.session_ttl.as_millis() as i64;
match self.storage.find_active_session(channel, chat_id, ttl_millis).await {
Ok(Some(meta)) => Ok(UnifiedSessionId::new(channel, chat_id, &meta.dialog_id)),
_ => {
let (new_id, _) = self.create_session(channel, chat_id, None, String::new()).await?;
Ok(new_id)
}
}
}
/// Send a system notification (no LLM triggered).
///
/// Flow:
/// 1. Resolve target session (resolve_dialog_id)
/// 2. Write assistant message with source tag to history
/// 3. Publish OutboundMessage via bus to target channel
pub async fn send_notification(
&self,
channel: &str,
chat_id: &str,
content: &str,
system_name: &str,
task_id: Option<&str>,
) -> Result<(), AgentError> {
let unified_id = self.resolve_dialog_id(channel, chat_id).await?;
let session = self.get_or_create_session(&unified_id).await?;
{
let mut guard = session.lock().await;
let source = MessageSource {
kind: SourceKind::SystemNotification,
from_channel: None,
from_session: None,
from_user_id: None,
system_name: Some(system_name.to_string()),
task_id: task_id.map(|s| s.to_string()),
};
let msg = ChatMessage::assistant_with_source(content, source);
guard.add_message(msg, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
}
let outbound = OutboundMessage {
channel: channel.to_string(),
chat_id: chat_id.to_string(),
content: content.to_string(),
reply_to: None,
media: vec![],
metadata: HashMap::new(),
};
self.bus.publish_outbound(outbound).await
.map_err(|e| AgentError::Other(format!("bus publish error: {}", e)))?;
Ok(())
}
pub async fn handle_message(
&self,
channel: &str,
_sender_id: &str,
chat_id: &str,
content: &str,
media: Vec<MediaItem>,
) -> Result<HandleResult, AgentError> {
let unified_id = self.resolve_dialog_id(channel, chat_id).await?;
*self.current_source_session.lock().await = Some(unified_id.to_string());
tracing::debug!(unified_id = %unified_id, "handle_message resolved unified_id");
let session = self.get_or_create_session(&unified_id).await?;
// Check for slash command
if let Some((cmd_name, cmd_args)) = parse_slash_command(content) {
let result = self.execute_slash_command(
cmd_name,
if cmd_args.is_empty() { None } else { Some(cmd_args) },
channel,
chat_id,
Some(&unified_id),
).await;
match result {
Ok((_new_session_id, response)) => {
*self.current_source_session.lock().await = None;
return Ok(HandleResult::CommandOutput(response));
}
Err(e) => {
*self.current_source_session.lock().await = None;
return Ok(HandleResult::CommandOutput(e.to_string()));
}
}
}
// Normal message handling through LLM
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel();
// Spawn notification publisher — sends immediately when tools are detected
{
let bus = self.bus.clone();
let ch = channel.to_string();
let cid = chat_id.to_string();
tokio::spawn(async move {
while let Some(notif) = notify_rx.recv().await {
let mut metadata = HashMap::new();
metadata.insert("_type".to_string(), "notification".to_string());
let outbound = OutboundMessage {
channel: ch.clone(),
chat_id: cid.clone(),
content: notif,
reply_to: None,
media: vec![],
metadata,
};
let _ = bus.publish_outbound(outbound).await;
}
});
}
let response: String = {
let mut session_guard = session.lock().await;
let media_refs: Vec<String> = media.iter().map(|m| m.path.clone()).collect();
#[cfg(debug_assertions)]
if !media_refs.is_empty() {
tracing::debug!(media_count = %media.len(), media_refs = ?media_refs, "Adding user message with media");
}
let user_message = session_guard.create_user_message(content, media_refs);
session_guard.add_message(user_message, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
let mut history = session_guard.get_history().to_vec();
// Build skills prompt
let skills_prompt = self.skills_loader.build_skills_prompt();
// Fetch memory context
let memory_context = match self.memory_manager.recall(content, 5, Some(crate::memory::MemoryCategory::Knowledge), None).await {
Ok(entries) if !entries.is_empty() => {
Some(entries.iter()
.map(|e| format!("- {}: {}", e.key, e.content))
.collect::<Vec<_>>()
.join("\n"))
}
Err(e) => {
tracing::warn!(error = %e, "Failed to fetch memory context");
None
}
_ => None,
};
// Build combined system prompt and inject at position 0 AFTER compression.
// This ensures AgentLoop.process() sees a system message without it participating
// in context compression (system prompt is dynamic and should not be persisted).
let system_prompt = session_guard.build_system_prompt(&skills_prompt, memory_context.as_deref());
let result = session_guard.compressor
.compress_if_needed(history)
.await?;
if result.created_timelines {
session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
}
let mut history = result.history;
history.insert(0, ChatMessage::system(system_prompt.clone()));
// Persist consolidation state
let now = chrono::Utc::now().timestamp_millis();
session_guard.last_consolidated_at = Some(now);
if let Err(e) = session_guard.persist_session_meta().await {
tracing::warn!(error = %e, "Failed to persist consolidation timestamp");
}
let agent = session_guard.create_agent_with_notify(notify_tx)?;
// Try LLM call; on context overflow, re-compress with tighter window and retry once.
let result = match agent.process(history).await {
Ok(r) => r,
Err(AgentError::LlmError(ref msg))
if is_context_overflow_error(msg) =>
{
let new_window = crate::agent::ContextCompressor::parse_context_limit_from_error(msg)
.unwrap_or(session_guard.compressor_threshold());
tracing::warn!(
new_window,
error = %msg,
"Context overflow in handle_message — retrying with tighter window"
);
session_guard.compressor.set_context_window(new_window);
let raw = session_guard.get_history().to_vec();
let retry_result = session_guard.compressor.compress_if_needed(raw).await?;
if retry_result.created_timelines {
session_guard.last_compressed_message_at = Some(chrono::Utc::now().timestamp_millis());
}
let mut retry = retry_result.history;
retry.insert(0, ChatMessage::system(system_prompt));
agent.process(retry).await?
}
Err(e) => return Err(e),
};
for msg in result.emitted_messages {
session_guard.add_message(msg, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
}
// Check if we need to generate a title (after 10 user messages)
if session_guard.should_generate_title()
&& let Err(e) = session_guard.generate_title().await {
tracing::warn!("failed to generate title: {}", e);
}
result.final_response.content
};
#[cfg(debug_assertions)]
tracing::debug!(
channel = %channel,
chat_id = %chat_id,
response_len = %response.len(),
"Agent response received"
);
*self.current_source_session.lock().await = None;
Ok(HandleResult::AgentResponse(response))
}
/// Handle a message triggered by a scheduled cron job.
///
/// This is similar to `handle_message`, but the user message is created with
/// `SourceKind::ExternalTrigger` source metadata so that the cron job identity
/// is preserved in the conversation history and database.
pub async fn handle_cron_message(
&self,
channel: &str,
chat_id: &str,
prompt: &str,
job_id: &str,
job_name: &str,
) -> Result<HandleResult, AgentError> {
use crate::bus::{MessageSource, SourceKind};
let unified_id = self.resolve_dialog_id(channel, chat_id).await?;
*self.current_source_session.lock().await = Some(unified_id.to_string());
tracing::debug!(unified_id = %unified_id, job_id = %job_id, "handle_cron_message resolved");
let session = self.get_or_create_session(&unified_id).await?;
let (notify_tx, mut notify_rx) = tokio::sync::mpsc::unbounded_channel();
{
use std::collections::HashMap;
use crate::bus::OutboundMessage;
let bus = self.bus.clone();
let ch = channel.to_string();
let cid = chat_id.to_string();
tokio::spawn(async move {
while let Some(notif) = notify_rx.recv().await {
let mut metadata = HashMap::new();
metadata.insert("_type".to_string(), "notification".to_string());
let outbound = OutboundMessage {
channel: ch.clone(),
chat_id: cid.clone(),
content: notif,
reply_to: None,
media: vec![],
metadata,
};
let _ = bus.publish_outbound(outbound).await;
}
});
}
let response: String = {
let mut session_guard = session.lock().await;
let source = MessageSource {
kind: SourceKind::ExternalTrigger,
from_channel: Some(channel.to_string()),
from_session: None,
from_user_id: None,
system_name: Some(job_name.to_string()),
task_id: Some(job_id.to_string()),
};
let user_message = session_guard.create_user_message_with_source(prompt, vec![], source);
session_guard.add_message(user_message, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
let mut history = session_guard.get_history().to_vec();
let skills_prompt = self.skills_loader.build_skills_prompt();
let system_prompt = session_guard.build_system_prompt(&skills_prompt, None);
let cron_context = format!(
"\n\n## 定时任务执行\n\n\
你正在执行定时任务「{}」({})。\n\
目标渠道: {}:{}\n\n\
定时任务执行规则:\n\
- 这不是聊天对话,没有人会回复你,不要等待用户输入\n\
- 你的职责是根据任务指令直接生成要发送的消息内容\n\
- 只输出最终消息,不要输出中间思考过程或分析\n\
- 系统会自动将你的回复推送到目标渠道,不要使用 send_message 工具\n\
- 你的最终回复就是推送给用户的消息原文",
job_name, job_id, channel, chat_id
);
let full_system_prompt = format!("{}{}", system_prompt, cron_context);
// Inject system prompt AFTER compression so it doesn't participate
// in context compression (system prompt is dynamic and should not be persisted).
let mut history = session_guard.compressor
.compress_if_needed(history)
.await?
.history;
history.insert(0, ChatMessage::system(full_system_prompt));
let agent = session_guard.create_agent_with_notify(notify_tx)?;
let result = agent.process(history).await?;
for msg in result.emitted_messages {
session_guard.add_message(msg, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
}
if session_guard.should_generate_title()
&& let Err(e) = session_guard.generate_title().await {
tracing::warn!("failed to generate title: {}", e);
}
let raw_response = result.final_response.content;
let prefix = format!(
"[message from cron:{}({})]\n",
job_name, job_id
);
let prefixed_response = format!("{}{}", prefix, raw_response);
let source = MessageSource {
kind: SourceKind::CrossChannel,
from_channel: Some("cron".to_string()),
from_session: Some(format!("{}:{}", job_name, job_id)),
from_user_id: None,
system_name: Some(job_name.to_string()),
task_id: Some(job_id.to_string()),
};
let msg = ChatMessage::assistant_with_source(prefixed_response.clone(), source);
session_guard.add_message(msg, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
prefixed_response
};
#[cfg(debug_assertions)]
tracing::debug!(
channel = %channel,
chat_id = %chat_id,
job_id = %job_id,
response_len = %response.len(),
"Cron agent response received"
);
*self.current_source_session.lock().await = None;
Ok(HandleResult::AgentResponse(response))
}
/// Remove all messages of a session, both in storage and in memory.
///
/// Storage is cleared first; the in-memory history and counters are reset
/// only after that succeeds. A storage failure therefore leaves the session
/// untouched instead of half-cleared (empty in memory, with the sequence
/// counter reset, while the stored messages remain).
///
/// # Errors
/// Returns `AgentError::Other` when clearing the stored messages fails;
/// session-loading errors are propagated.
pub async fn clear_session_history(&self, unified_id: &UnifiedSessionId) -> Result<(), AgentError> {
    let session = self.get_or_create_session(unified_id).await?;
    let mut session_guard = session.lock().await;

    // Clear storage first so an error cannot desynchronise memory and storage.
    if let Some(ref storage) = session_guard.storage {
        storage.clear_messages(&session_guard.id.to_string()).await
            .map_err(|e| AgentError::Other(format!("failed to clear messages: {}", e)))?;
    }

    // Storage is clean (or absent): reset the in-memory state.
    session_guard.messages.clear();
    session_guard.seq_counter = 1;
    session_guard.total_message_count = 0;
    session_guard.message_count = 0;
    Ok(())
}
}
#[async_trait::async_trait]
impl OutboundMessenger for SessionManager {
    /// Deliver `content` to another session and publish it on the bus.
    ///
    /// - When `source.from_session` is unset it is filled from the manager's
    ///   current source session.
    /// - With a `dialog_id`, the target must already exist in storage
    ///   (`get_or_activate_session`); without one, the target is resolved via
    ///   `resolve_dialog_id`.
    /// - The content is prefixed with `[message from <origin>] ` and stored as
    ///   a source-tagged assistant message in the target session.
    /// - If the origin session shares `channel`/`chat_id` with the target but
    ///   is a different dialog, the origin is re-recorded as the chat's
    ///   current session.
    ///
    /// Errors are returned as their string representation.
    async fn send_message(
        &self,
        channel: &str,
        chat_id: &str,
        dialog_id: Option<&str>,
        content: &str,
        mut source: MessageSource,
    ) -> Result<(), String> {
        // Fill the origin from the current source session if not provided.
        if source.from_session.is_none() {
            let active_origin = self.current_source_session.lock().await.clone();
            source.from_session = active_origin;
        }

        // Work out the target session id, then load the session itself.
        let target_sid = match dialog_id {
            Some(did) => UnifiedSessionId::new(channel, chat_id, did),
            None => self
                .resolve_dialog_id(channel, chat_id)
                .await
                .map_err(|e| e.to_string())?,
        };
        let loaded = match dialog_id {
            Some(_) => self.get_or_activate_session(&target_sid).await,
            None => self.get_or_create_session(&target_sid).await,
        };
        let session = loaded.map_err(|e| e.to_string())?;

        // Build the message prefix: [message from <origin>]
        let origin_id = source.from_session.clone();
        let origin_label = origin_id.as_deref().unwrap_or("unknown");
        let prefix = format!("[message from {}] ", origin_label);
        let marked_content = format!("{}\n{}", prefix, content);

        // Write the source-tagged assistant message to the target history.
        {
            let mut guard = session.lock().await;
            let tagged = ChatMessage::assistant_with_source(marked_content.clone(), source);
            if let Err(e) = guard.add_message(tagged, true).await {
                return Err(e.to_string());
            }
        }

        // Restore the active dialog when source and target share
        // channel:chat_id but differ in dialog id.
        if let Some(origin) = origin_id.as_ref() {
            let segments: Vec<&str> = origin.split(':').collect();
            if let [src_channel, src_chat, src_dialog] = segments[..] {
                if src_channel == channel && src_chat == chat_id && src_dialog != target_sid.dialog_id {
                    let scope = format!("{}:{}", channel, chat_id);
                    self.inner.lock().await.current_sessions.insert(scope, origin.clone());
                }
            }
        }

        // Publish the outbound message on the bus.
        let outbound = OutboundMessage {
            channel: channel.to_owned(),
            chat_id: chat_id.to_owned(),
            content: marked_content,
            reply_to: None,
            media: vec![],
            metadata: HashMap::new(),
        };
        if let Err(e) = self.bus.publish_outbound(outbound).await {
            return Err(e.to_string());
        }
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// Build a fixed `LLMProviderConfig` fixture: an "openai"-typed provider
    /// pointing at `http://localhost` with a dummy key, a 4096 token limit and
    /// a single tool iteration.
    fn test_provider_config() -> LLMProviderConfig {
        let workspace_dir = std::path::PathBuf::from("/tmp/test-workspace");
        LLMProviderConfig {
            provider_type: String::from("openai"),
            name: String::from("test"),
            base_url: String::from("http://localhost"),
            api_key: String::from("test-key"),
            extra_headers: HashMap::new(),
            model_id: String::from("test-model"),
            temperature: Some(0.0),
            max_tokens: Some(32),
            model_extra: HashMap::new(),
            max_tool_iterations: 1,
            token_limit: 4096,
            workspace_dir,
        }
    }
}