diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index 4e9c856..42a0d2c 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -341,7 +341,7 @@ impl AgentLoop { // Build and inject system prompt if not present let has_system = messages.first().map_or(false, |m| m.role == "system"); if !has_system { - let system_prompt = build_system_prompt(&self.workspace_dir, &self.model_name, &self.tools); + let system_prompt = build_system_prompt(&self.workspace_dir, &self.model_name, &self.tools, None); #[cfg(debug_assertions)] tracing::debug!("System prompt injected:\n{}", system_prompt); messages.insert(0, ChatMessage::system(system_prompt)); diff --git a/src/agent/system_prompt.rs b/src/agent/system_prompt.rs index 4a2b976..de63f93 100644 --- a/src/agent/system_prompt.rs +++ b/src/agent/system_prompt.rs @@ -18,6 +18,7 @@ pub struct PromptContext<'a> { pub workspace_dir: &'a Path, pub model_name: &'a str, pub tools: &'a ToolRegistry, + pub session_id: Option<&'a str>, } /// Trait for system prompt sections. @@ -222,37 +223,43 @@ impl PromptSection for CrossChannelSection { "cross_channel" } - fn build(&self, _ctx: &PromptContext<'_>) -> String { - r#"## 关于跨渠道消息和系统通知 + fn build(&self, ctx: &PromptContext<'_>) -> String { + let session_line = if let Some(id) = ctx.session_id { + format!("当前会话的 ID 是 `{}`。\n", id) + } else { + String::new() + }; -当前对话中可能出现带有 `source` 标记的消息,这些消息不是用户直接输入: + format!( + r#"## 关于会话和跨渠道消息 -### 系统通知(source.kind = "system_notification") -来自机器人内部系统(如定时任务、后台任务)的通知。 -- `system_name`: 发出通知的系统名称 -- `task_id`: 关联的任务 ID +### 会话 ID 格式 +每个会话都有唯一的 session ID,由三部分组成::: +- channel: 消息渠道(如 "cli_chat"、"feishu") +- chat_id: 聊天/群组标识 +- dialog_id: 对话标识,同一 chat 下可以有多个 dialog -### 跨渠道消息(source.kind = "cross_channel") -来自其他渠道的消息被写入当前对话。 -- `from_channel`: 来源渠道(如 "feishu") -- `from_user_id`: 来源用户 ID +{}### 跨会话消息 +对话历史中可能出现带有 `[message from X to Y]` 前缀的 assistant 消息, +表示此消息由 send_message 工具从别处发送过来。 +- X: 来源标识,可能是会话 ID、工具名或其他标识字符串;未指定时为 "unknown" +- Y: 目标会话的完整 session ID (::) + +收到此类消息时一般不需要主动处理,只需知晓。如果用户问及相关信息, +可以尝试从来源处获取更多详情。 ### send_message 工具 +向指定会话发送消息。参数: +- target_chat_id: 格式 ::: +- content: 消息内容 -使用 `send_message` 向其他渠道发送消息。参数: -- `target_chat_id`: 目标会话ID,支持两种格式: - 1. `:` — 发送到该聊天下最新活跃的会话,若没有活跃会话则自动创建 - 2. `::` — 发送到指定会话,若会话已过期则自动激活 -- `content`: 要发送的消息内容 -- `origin`(可选): 消息来源标识,不填则自动使用当前会话的完整 session_id - -跨渠道消息到达目标会话时,内容前会带有 `[message from X to Y]` 标记, -表示该消息的来源和目标。目标会话的 LLM 应将此理解为来自其他渠道/会话的消息。 - -### 处理建议 -- 系统通知:可以提及但不建议以此为由改变对话主题 -- 跨渠道消息:当用户提及相关事务时可关联这些消息"# - .to_string() +### chat_manager 工具 +管理会话和查看消息。参数: +- action = "list_sessions" — 列出最近活跃的会话 +- action = "list_channels" — 列出所有可用渠道 +- action = "list_messages" — 查看指定 session 的最近消息,需提供 session_id 和 count"#, + session_line + ) } } @@ -314,11 +321,12 @@ fn load_file_from_dir(dir: &Path, filename: &str, max_chars: usize) -> Option String { +pub fn build_system_prompt(workspace_dir: &Path, model_name: &str, tools: &ToolRegistry, session_id: Option<&str>) -> String { let ctx = PromptContext { workspace_dir, model_name, tools, + session_id, }; SystemPromptBuilder::with_defaults().build(&ctx) } @@ -337,6 +345,7 @@ mod tests { workspace_dir: &temp_dir, model_name: "test-model", tools: &tools, + session_id: None, }; let prompt = SystemPromptBuilder::with_defaults().build(&ctx); @@ -366,7 +375,7 @@ mod tests { let temp_dir = std::env::temp_dir(); let tools = ToolRegistry::new(); - let prompt = build_system_prompt(&temp_dir, "test-model", &tools); + let prompt = build_system_prompt(&temp_dir, "test-model", &tools, None); assert!(!prompt.is_empty()); assert!(prompt.contains("test-model")); diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 78f3f55..e4ea877 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -78,6 +78,11 @@ impl GatewayState { let valid_channels = available_channels.clone(); session_manager.register_outbound_tool(available_channels); + // Register chat_manager tool + session_manager.tools().register( + crate::tools::ChatManagerTool::new(storage.clone(), valid_channels.clone()), + ); + // Initialize scheduler if enabled in config let scheduler_config = config.gateway.scheduler.clone().unwrap_or_default(); if scheduler_config.enabled { @@ -204,6 +209,7 @@ impl GatewayState { let sched = Arc::new(Scheduler::new( self.storage.clone(), self.session_manager.clone(), + self.channel_manager.bus(), scheduler_config, )); tokio::spawn(async move { diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index f70c4d1..acf0275 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use std::time::Instant; use tokio::time; +use crate::bus::MessageBus; use crate::config::SchedulerConfig; use crate::session::session::HandleResult; use crate::session::SessionManager; @@ -52,6 +53,7 @@ fn now_ms() -> i64 { pub struct Scheduler { storage: Arc, session_manager: Arc, + bus: Arc, config: SchedulerConfig, } @@ -59,11 +61,13 @@ impl Scheduler { pub fn new( storage: Arc, session_manager: Arc, + bus: Arc, config: SchedulerConfig, ) -> Self { Self { storage, session_manager, + bus, config, } } @@ -132,6 +136,16 @@ impl Scheduler { match result { Ok(HandleResult::AgentResponse(output)) => { + let outbound = crate::bus::OutboundMessage { + channel: job.channel.clone(), + chat_id: job.chat_id.clone(), + content: output.clone(), + reply_to: None, + media: vec![], + metadata: std::collections::HashMap::new(), + }; + let _ = self.bus.publish_outbound(outbound).await; + let output_truncated = if output.len() > 8000 { format!("{}...[truncated]", &output[..8000]) } else { @@ -164,6 +178,16 @@ impl Scheduler { ); } Ok(HandleResult::CommandOutput(output)) => { + let outbound = crate::bus::OutboundMessage { + channel: job.channel.clone(), + chat_id: job.chat_id.clone(), + content: output.clone(), + reply_to: None, + media: vec![], + metadata: std::collections::HashMap::new(), + }; + let _ = self.bus.publish_outbound(outbound).await; + let run = JobRun { id: 0, job_id: job.id.clone(), diff --git a/src/session/session.rs b/src/session/session.rs index ebd7330..fa88f32 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -378,6 +378,7 @@ impl Session { &self.provider_config.workspace_dir, &self.provider_config.model_id, &self.tools, + Some(&self.id.to_string()), ); if skills_prompt.trim().is_empty() { @@ -1324,7 +1325,20 @@ impl SessionManager { let skills_prompt = self.skills_loader.build_skills_prompt(); let system_prompt = session_guard.build_system_prompt(&skills_prompt); - history.insert(0, ChatMessage::system(system_prompt)); + let cron_context = format!( + "\n\n## 定时任务执行\n\n\ + 你正在执行定时任务「{}」({})。\n\ + 目标渠道: {}:{}\n\n\ + 定时任务执行规则:\n\ + - 这不是聊天对话,没有人会回复你,不要等待用户输入\n\ + - 你的职责是根据任务指令直接生成要发送的消息内容\n\ + - 只输出最终消息,不要输出中间思考过程或分析\n\ + - 系统会自动将你的回复推送到目标渠道,不要使用 send_message 工具\n\ + - 你的最终回复就是推送给用户的消息原文", + job_name, job_id, channel, chat_id + ); + let full_system_prompt = format!("{}{}", system_prompt, cron_context); + history.insert(0, ChatMessage::system(full_system_prompt)); let history = session_guard.compressor .compress_if_needed(history) @@ -1344,7 +1358,28 @@ impl SessionManager { } } - result.final_response.content + let raw_response = result.final_response.content; + + let target_id = unified_id.to_string(); + let prefix = format!( + "[message from cron:{}({}) to {}]\n", + job_name, job_id, target_id + ); + let prefixed_response = format!("{}{}", prefix, raw_response); + + let source = MessageSource { + kind: SourceKind::CrossChannel, + from_channel: Some("cron".to_string()), + from_session: Some(format!("{}:{}", job_name, job_id)), + from_user_id: None, + system_name: Some(job_name.to_string()), + task_id: Some(job_id.to_string()), + }; + let msg = ChatMessage::assistant_with_source(prefixed_response.clone(), source); + session_guard.add_message(msg, true).await + .map_err(|e| AgentError::Other(format!("persist error: {}", e)))?; + + prefixed_response }; #[cfg(debug_assertions)] diff --git a/src/storage/mod.rs b/src/storage/mod.rs index df77744..e581bf8 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -465,6 +465,79 @@ impl Storage { .collect()) } + pub async fn list_all_active_sessions( + &self, + limit: i64, + ) -> Result, StorageError> { + let rows = sqlx::query( + r#" + SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at + FROM sessions + WHERE deleted_at IS NULL + ORDER BY last_active_at DESC + LIMIT ? + "#, + ) + .bind(limit) + .fetch_all(self.pool()) + .await?; + + Ok(rows + .into_iter() + .map(|row| crate::storage::session::SessionMeta { + id: row.get("id"), + channel: row.get("channel"), + chat_id: row.get("chat_id"), + dialog_id: row.get("dialog_id"), + title: row.get("title"), + created_at: row.get("created_at"), + last_active_at: row.get("last_active_at"), + message_count: row.get("message_count"), + routing_info: row.get("routing_info"), + deleted_at: row.get("deleted_at"), + }) + .collect()) + } + + pub async fn list_recent_messages( + &self, + session_id: &str, + count: i64, + ) -> Result, StorageError> { + let rows = sqlx::query( + r#" + SELECT id, session_id, seq, role, content, media_refs, tool_call_id, tool_name, tool_calls, source, created_at + FROM messages + WHERE session_id = ? + ORDER BY seq DESC + LIMIT ? + "#, + ) + .bind(session_id) + .bind(count) + .fetch_all(self.pool()) + .await?; + + let mut messages: Vec<_> = rows + .into_iter() + .map(|row| crate::storage::message::MessageMeta { + id: row.get("id"), + session_id: row.get("session_id"), + seq: row.get("seq"), + role: row.get("role"), + content: row.get("content"), + media_refs: row.get("media_refs"), + tool_call_id: row.get("tool_call_id"), + tool_name: row.get("tool_name"), + tool_calls: row.get("tool_calls"), + source: row.get("source"), + created_at: row.get("created_at"), + }) + .collect(); + messages.reverse(); + Ok(messages) + } + pub async fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> { sqlx::query(r#"DELETE FROM messages WHERE session_id = ?"#) .bind(session_id) diff --git a/src/tools/chat_manager.rs b/src/tools/chat_manager.rs new file mode 100644 index 0000000..4a5bf28 --- /dev/null +++ b/src/tools/chat_manager.rs @@ -0,0 +1,343 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use serde_json::json; + +use crate::storage::Storage; +use crate::tools::traits::{Tool, ToolResult}; + +pub struct ChatManagerTool { + storage: Arc, + available_channels: Vec, +} + +impl ChatManagerTool { + pub fn new(storage: Arc, available_channels: Vec) -> Self { + Self { + storage, + available_channels, + } + } +} + +#[async_trait] +impl Tool for ChatManagerTool { + fn name(&self) -> &str { + "chat_manager" + } + + fn description(&self) -> &str { + "聊天管理工具。可以列出当前活跃的 session、可用的 channel、以及查看指定 session 的最近消息内容。\ +action 可选值: list_sessions (列出最近活跃会话), list_channels (列出可用渠道), list_messages (查看最近消息)" + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ + "type": "object", + "properties": { + "action": { + "type": "string", + "enum": ["list_sessions", "list_channels", "list_messages"], + "description": "操作类型: list_sessions 列出最近活跃会话, list_channels 列出可用渠道, list_messages 查看指定会话的最近消息" + }, + "session_id": { + "type": "string", + "description": "会话 ID,格式 channel:chat_id:dialog_id,仅在 action 为 list_messages 时必填" + }, + "count": { + "type": "integer", + "description": "获取最近消息的数量,仅在 action 为 list_messages 时有效,默认 20" + } + }, + "required": ["action"] + }) + } + + fn read_only(&self) -> bool { + true + } + + fn concurrency_safe(&self) -> bool { + true + } + + async fn execute(&self, args: serde_json::Value) -> anyhow::Result { + let action = args["action"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("missing required parameter: action"))?; + + match action { + "list_channels" => self.list_channels().await, + "list_sessions" => self.list_sessions().await, + "list_messages" => self.list_messages(&args).await, + _ => Ok(ToolResult { + success: false, + output: String::new(), + error: Some(format!( + "Unknown action: {}. Supported: list_sessions, list_channels, list_messages", + action + )), + }), + } + } +} + +impl ChatManagerTool { + async fn list_channels(&self) -> anyhow::Result { + let channels = self.available_channels.join(", "); + Ok(ToolResult { + success: true, + output: format!("可用渠道 ({}): {}", self.available_channels.len(), channels), + error: None, + }) + } + + async fn list_sessions(&self) -> anyhow::Result { + let sessions = self + .storage + .list_all_active_sessions(20) + .await + .map_err(|e| anyhow::anyhow!("Failed to list sessions: {}", e))?; + + if sessions.is_empty() { + return Ok(ToolResult { + success: true, + output: "当前没有活跃的会话".to_string(), + error: None, + }); + } + + let now_ms = chrono::Utc::now().timestamp_millis(); + let mut output = format!("活跃会话 (共 {} 个):\n", sessions.len()); + + for s in &sessions { + let ago = format_duration_ago(now_ms - s.last_active_at); + output.push_str(&format!( + "- {}\n title={} | channel={} chat_id={} | {}条消息 | 最后活动: {}前\n", + s.id, s.title, s.channel, s.chat_id, s.message_count, ago + )); + } + + Ok(ToolResult { + success: true, + output, + error: None, + }) + } + + async fn list_messages(&self, args: &serde_json::Value) -> anyhow::Result { + let session_id = args["session_id"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("missing required parameter: session_id"))?; + + let count = args["count"].as_i64().unwrap_or(20).clamp(1, 100); + + let session = self + .storage + .get_session(session_id) + .await + .map_err(|e| anyhow::anyhow!("Session not found: {}", e))?; + + let messages = self + .storage + .list_recent_messages(session_id, count) + .await + .map_err(|e| anyhow::anyhow!("Failed to load messages: {}", e))?; + + let mut output = format!( + "会话: {} ({})\n--- 最近 {} 条消息 (共 {} 条) ---\n", + session_id, session.title, messages.len(), session.message_count + ); + + if messages.is_empty() { + output.push_str("(暂无消息)\n"); + } else { + for m in &messages { + let time = format_timestamp(m.created_at); + let role_tag = match m.role.as_str() { + "user" => "user ", + "assistant" => "assistant", + "tool" => "tool ", + "system" => "system ", + other => other, + }; + let preview = truncate_content(&m.content, 200); + output.push_str(&format!( + "[{}] {} | {} | {}\n", + m.seq, time, role_tag, preview + )); + } + } + + Ok(ToolResult { + success: true, + output, + error: None, + }) + } +} + +fn format_duration_ago(millis: i64) -> String { + let secs = millis / 1000; + if secs < 60 { + format!("{}秒", secs) + } else if secs < 3600 { + format!("{}分钟", secs / 60) + } else if secs < 86400 { + format!("{}小时", secs / 3600) + } else { + format!("{}天", secs / 86400) + } +} + +fn format_timestamp(ms: i64) -> String { + if let Some(dt) = chrono::DateTime::from_timestamp_millis(ms) { + dt.format("%m-%d %H:%M").to_string() + } else { + ms.to_string() + } +} + +fn truncate_content(content: &str, max_len: usize) -> String { + let content = content.replace('\n', " "); + if content.chars().count() > max_len { + format!("{}...", content.chars().take(max_len).collect::()) + } else { + content + } +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + async fn create_test_storage() -> (Arc, TempDir) { + let dir = tempfile::tempdir().unwrap(); + let db_path = dir.path().join("test.db"); + let storage = Storage::new(&db_path).await.unwrap(); + (Arc::new(storage), dir) + } + + #[tokio::test] + async fn test_list_channels() { + let (storage, _dir) = create_test_storage().await; + let tool = ChatManagerTool::new(storage, vec!["cli_chat".into(), "feishu".into()]); + + let result = tool + .execute(json!({ "action": "list_channels" })) + .await + .unwrap(); + assert!(result.success); + assert!(result.output.contains("cli_chat")); + assert!(result.output.contains("feishu")); + } + + #[tokio::test] + async fn test_list_sessions_empty() { + let (storage, _dir) = create_test_storage().await; + let tool = ChatManagerTool::new(storage, vec![]); + + let result = tool + .execute(json!({ "action": "list_sessions" })) + .await + .unwrap(); + assert!(result.success); + assert!(result.output.contains("没有")); + } + + #[tokio::test] + async fn test_list_sessions_with_data() { + let (storage, _dir) = create_test_storage().await; + + let now = chrono::Utc::now().timestamp_millis(); + for i in 0..3 { + let meta = crate::storage::session::SessionMeta { + id: format!("cli_chat:sid{}:dialog{}", i, i), + channel: "cli_chat".to_string(), + chat_id: format!("sid{}", i), + dialog_id: format!("dialog{}", i), + title: format!("会话{}", i), + created_at: now - i * 3600_000, + last_active_at: now - i * 3600_000, + message_count: i * 5, + routing_info: None, + deleted_at: None, + }; + storage.upsert_session(&meta).await.unwrap(); + } + + let tool = ChatManagerTool::new(storage, vec![]); + + let result = tool + .execute(json!({ "action": "list_sessions" })) + .await + .unwrap(); + assert!(result.success); + assert!(result.output.contains("会话0")); + assert!(result.output.contains("会话1")); + assert!(result.output.contains("会话2")); + } + + #[tokio::test] + async fn test_list_messages() { + let (storage, _dir) = create_test_storage().await; + + let now = chrono::Utc::now().timestamp_millis(); + let session_id = "cli_chat:sid0:dialog0"; + let meta = crate::storage::session::SessionMeta { + id: session_id.to_string(), + channel: "cli_chat".to_string(), + chat_id: "sid0".to_string(), + dialog_id: "dialog0".to_string(), + title: "测试会话".to_string(), + created_at: now, + last_active_at: now, + message_count: 3, + routing_info: None, + deleted_at: None, + }; + storage.upsert_session(&meta).await.unwrap(); + + for i in 0..3 { + let msg = crate::storage::message::MessageMeta { + id: format!("msg{}", i), + session_id: session_id.to_string(), + seq: i as i64 + 1, + role: if i == 0 { "user".to_string() } else { "assistant".to_string() }, + content: format!("消息内容 {}", i), + media_refs: None, + tool_call_id: None, + tool_name: None, + tool_calls: None, + source: None, + created_at: now + i * 1000, + }; + storage.append_message(session_id, &msg).await.unwrap(); + } + + let tool = ChatManagerTool::new(storage, vec![]); + + let result = tool + .execute(json!({ "action": "list_messages", "session_id": session_id })) + .await + .unwrap(); + assert!(result.success); + assert!(result.output.contains("消息内容 0")); + assert!(result.output.contains("消息内容 2")); + assert!(result.output.contains("测试会话")); + } + + #[tokio::test] + async fn test_unknown_action() { + let (storage, _dir) = create_test_storage().await; + let tool = ChatManagerTool::new(storage, vec![]); + + let result = tool + .execute(json!({ "action": "unknown" })) + .await + .unwrap(); + assert!(!result.success); + assert!(result.error.unwrap().contains("Unknown action")); + } +} diff --git a/src/tools/cron.rs b/src/tools/cron.rs index e180870..f8d83de 100644 --- a/src/tools/cron.rs +++ b/src/tools/cron.rs @@ -38,6 +38,9 @@ impl Tool for CronAddTool { fn description(&self) -> &str { "Create a new scheduled task (cron job). The task will execute an AI prompt on a schedule \ and deliver the result to the specified channel/chat. \ + Important: the execution environment is a fresh session with no access to your current \ + conversation history. The prompt parameter MUST include all necessary context: \ + what to do, the target audience, required output format, and any background information. \ Schedule formats: \ - 'every': {\"type\":\"every\",\"every_ms\":3600000} for every hour, \ - 'at': {\"type\":\"at\",\"at\":} for one-shot, \ diff --git a/src/tools/mod.rs b/src/tools/mod.rs index a007f5b..b08adf9 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -1,5 +1,6 @@ pub mod bash; pub mod calculator; +pub mod chat_manager; pub mod cron; pub mod file_edit; pub mod file_read; @@ -14,6 +15,7 @@ pub mod web_fetch; pub use bash::BashTool; pub use calculator::CalculatorTool; +pub use chat_manager::ChatManagerTool; pub use file_edit::FileEditTool; pub use file_read::FileReadTool; pub use file_write::FileWriteTool;