修复及完善cron任务

This commit is contained in:
xiaoski 2026-05-07 16:24:13 +08:00
parent db609342f7
commit 61d2fe9ef0
9 changed files with 525 additions and 30 deletions

View File

@ -341,7 +341,7 @@ impl AgentLoop {
// Build and inject system prompt if not present
let has_system = messages.first().map_or(false, |m| m.role == "system");
if !has_system {
let system_prompt = build_system_prompt(&self.workspace_dir, &self.model_name, &self.tools);
let system_prompt = build_system_prompt(&self.workspace_dir, &self.model_name, &self.tools, None);
#[cfg(debug_assertions)]
tracing::debug!("System prompt injected:\n{}", system_prompt);
messages.insert(0, ChatMessage::system(system_prompt));

View File

@ -18,6 +18,7 @@ pub struct PromptContext<'a> {
pub workspace_dir: &'a Path,
pub model_name: &'a str,
pub tools: &'a ToolRegistry,
pub session_id: Option<&'a str>,
}
/// Trait for system prompt sections.
@ -222,37 +223,43 @@ impl PromptSection for CrossChannelSection {
"cross_channel"
}
fn build(&self, _ctx: &PromptContext<'_>) -> String {
r#"## 关于跨渠道消息和系统通知
fn build(&self, ctx: &PromptContext<'_>) -> String {
let session_line = if let Some(id) = ctx.session_id {
format!("当前会话的 ID 是 `{}`。\n", id)
} else {
String::new()
};
`source`
format!(
r#"## 关于会话和跨渠道消息
### source.kind = "system_notification"
- `system_name`:
- `task_id`: ID
### ID
session ID<channel>:<chat_id>:<dialog_id>
- channel: "cli_chat""feishu"
- chat_id: /
- dialog_id: chat dialog
### source.kind = "cross_channel"
- `from_channel`: "feishu"
- `from_user_id`: ID
{}###
`[message from X to Y]` assistant
send_message
- X: ID "unknown"
- Y: session ID (<channel>:<chat_id>:<dialog_id>)
### send_message
- target_chat_id: <channel>:<chat_id> <channel>:<chat_id>:<dialog_id>
- content:
使 `send_message`
- `target_chat_id`: ID
1. `<channel>:<chat_id>`
2. `<channel>:<chat_id>:<dialog_id>`
- `content`:
- `origin`: 使 session_id
`[message from X to Y]`
LLM /
###
-
- "#
.to_string()
### chat_manager
- action = "list_sessions"
- action = "list_channels"
- action = "list_messages" session session_id count"#,
session_line
)
}
}
@ -314,11 +321,12 @@ fn load_file_from_dir(dir: &Path, filename: &str, max_chars: usize) -> Option<St
}
/// Build a complete system prompt with default configuration.
pub fn build_system_prompt(workspace_dir: &Path, model_name: &str, tools: &ToolRegistry) -> String {
pub fn build_system_prompt(workspace_dir: &Path, model_name: &str, tools: &ToolRegistry, session_id: Option<&str>) -> String {
let ctx = PromptContext {
workspace_dir,
model_name,
tools,
session_id,
};
SystemPromptBuilder::with_defaults().build(&ctx)
}
@ -337,6 +345,7 @@ mod tests {
workspace_dir: &temp_dir,
model_name: "test-model",
tools: &tools,
session_id: None,
};
let prompt = SystemPromptBuilder::with_defaults().build(&ctx);
@ -366,7 +375,7 @@ mod tests {
let temp_dir = std::env::temp_dir();
let tools = ToolRegistry::new();
let prompt = build_system_prompt(&temp_dir, "test-model", &tools);
let prompt = build_system_prompt(&temp_dir, "test-model", &tools, None);
assert!(!prompt.is_empty());
assert!(prompt.contains("test-model"));

View File

@ -78,6 +78,11 @@ impl GatewayState {
let valid_channels = available_channels.clone();
session_manager.register_outbound_tool(available_channels);
// Register chat_manager tool
session_manager.tools().register(
crate::tools::ChatManagerTool::new(storage.clone(), valid_channels.clone()),
);
// Initialize scheduler if enabled in config
let scheduler_config = config.gateway.scheduler.clone().unwrap_or_default();
if scheduler_config.enabled {
@ -204,6 +209,7 @@ impl GatewayState {
let sched = Arc::new(Scheduler::new(
self.storage.clone(),
self.session_manager.clone(),
self.channel_manager.bus(),
scheduler_config,
));
tokio::spawn(async move {

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
use std::time::Instant;
use tokio::time;
use crate::bus::MessageBus;
use crate::config::SchedulerConfig;
use crate::session::session::HandleResult;
use crate::session::SessionManager;
@ -52,6 +53,7 @@ fn now_ms() -> i64 {
pub struct Scheduler {
storage: Arc<Storage>,
session_manager: Arc<SessionManager>,
bus: Arc<MessageBus>,
config: SchedulerConfig,
}
@ -59,11 +61,13 @@ impl Scheduler {
pub fn new(
storage: Arc<Storage>,
session_manager: Arc<SessionManager>,
bus: Arc<MessageBus>,
config: SchedulerConfig,
) -> Self {
Self {
storage,
session_manager,
bus,
config,
}
}
@ -132,6 +136,16 @@ impl Scheduler {
match result {
Ok(HandleResult::AgentResponse(output)) => {
let outbound = crate::bus::OutboundMessage {
channel: job.channel.clone(),
chat_id: job.chat_id.clone(),
content: output.clone(),
reply_to: None,
media: vec![],
metadata: std::collections::HashMap::new(),
};
let _ = self.bus.publish_outbound(outbound).await;
let output_truncated = if output.len() > 8000 {
format!("{}...[truncated]", &output[..8000])
} else {
@ -164,6 +178,16 @@ impl Scheduler {
);
}
Ok(HandleResult::CommandOutput(output)) => {
let outbound = crate::bus::OutboundMessage {
channel: job.channel.clone(),
chat_id: job.chat_id.clone(),
content: output.clone(),
reply_to: None,
media: vec![],
metadata: std::collections::HashMap::new(),
};
let _ = self.bus.publish_outbound(outbound).await;
let run = JobRun {
id: 0,
job_id: job.id.clone(),

View File

@ -378,6 +378,7 @@ impl Session {
&self.provider_config.workspace_dir,
&self.provider_config.model_id,
&self.tools,
Some(&self.id.to_string()),
);
if skills_prompt.trim().is_empty() {
@ -1324,7 +1325,20 @@ impl SessionManager {
let skills_prompt = self.skills_loader.build_skills_prompt();
let system_prompt = session_guard.build_system_prompt(&skills_prompt);
history.insert(0, ChatMessage::system(system_prompt));
let cron_context = format!(
"\n\n## 定时任务执行\n\n\
{}({})\n\
: {}:{}\n\n\
\n\
- \n\
- \n\
- \n\
- 使 send_message \n\
- ",
job_name, job_id, channel, chat_id
);
let full_system_prompt = format!("{}{}", system_prompt, cron_context);
history.insert(0, ChatMessage::system(full_system_prompt));
let history = session_guard.compressor
.compress_if_needed(history)
@ -1344,7 +1358,28 @@ impl SessionManager {
}
}
result.final_response.content
let raw_response = result.final_response.content;
let target_id = unified_id.to_string();
let prefix = format!(
"[message from cron:{}({}) to {}]\n",
job_name, job_id, target_id
);
let prefixed_response = format!("{}{}", prefix, raw_response);
let source = MessageSource {
kind: SourceKind::CrossChannel,
from_channel: Some("cron".to_string()),
from_session: Some(format!("{}:{}", job_name, job_id)),
from_user_id: None,
system_name: Some(job_name.to_string()),
task_id: Some(job_id.to_string()),
};
let msg = ChatMessage::assistant_with_source(prefixed_response.clone(), source);
session_guard.add_message(msg, true).await
.map_err(|e| AgentError::Other(format!("persist error: {}", e)))?;
prefixed_response
};
#[cfg(debug_assertions)]

View File

@ -465,6 +465,79 @@ impl Storage {
.collect())
}
pub async fn list_all_active_sessions(
&self,
limit: i64,
) -> Result<Vec<crate::storage::session::SessionMeta>, StorageError> {
let rows = sqlx::query(
r#"
SELECT id, channel, chat_id, dialog_id, title, created_at, last_active_at, message_count, routing_info, deleted_at
FROM sessions
WHERE deleted_at IS NULL
ORDER BY last_active_at DESC
LIMIT ?
"#,
)
.bind(limit)
.fetch_all(self.pool())
.await?;
Ok(rows
.into_iter()
.map(|row| crate::storage::session::SessionMeta {
id: row.get("id"),
channel: row.get("channel"),
chat_id: row.get("chat_id"),
dialog_id: row.get("dialog_id"),
title: row.get("title"),
created_at: row.get("created_at"),
last_active_at: row.get("last_active_at"),
message_count: row.get("message_count"),
routing_info: row.get("routing_info"),
deleted_at: row.get("deleted_at"),
})
.collect())
}
pub async fn list_recent_messages(
&self,
session_id: &str,
count: i64,
) -> Result<Vec<crate::storage::message::MessageMeta>, StorageError> {
let rows = sqlx::query(
r#"
SELECT id, session_id, seq, role, content, media_refs, tool_call_id, tool_name, tool_calls, source, created_at
FROM messages
WHERE session_id = ?
ORDER BY seq DESC
LIMIT ?
"#,
)
.bind(session_id)
.bind(count)
.fetch_all(self.pool())
.await?;
let mut messages: Vec<_> = rows
.into_iter()
.map(|row| crate::storage::message::MessageMeta {
id: row.get("id"),
session_id: row.get("session_id"),
seq: row.get("seq"),
role: row.get("role"),
content: row.get("content"),
media_refs: row.get("media_refs"),
tool_call_id: row.get("tool_call_id"),
tool_name: row.get("tool_name"),
tool_calls: row.get("tool_calls"),
source: row.get("source"),
created_at: row.get("created_at"),
})
.collect();
messages.reverse();
Ok(messages)
}
pub async fn clear_messages(&self, session_id: &str) -> Result<(), StorageError> {
sqlx::query(r#"DELETE FROM messages WHERE session_id = ?"#)
.bind(session_id)

343
src/tools/chat_manager.rs Normal file
View File

@ -0,0 +1,343 @@
use std::sync::Arc;
use async_trait::async_trait;
use serde_json::json;
use crate::storage::Storage;
use crate::tools::traits::{Tool, ToolResult};
pub struct ChatManagerTool {
storage: Arc<Storage>,
available_channels: Vec<String>,
}
impl ChatManagerTool {
pub fn new(storage: Arc<Storage>, available_channels: Vec<String>) -> Self {
Self {
storage,
available_channels,
}
}
}
#[async_trait]
impl Tool for ChatManagerTool {
fn name(&self) -> &str {
"chat_manager"
}
fn description(&self) -> &str {
"聊天管理工具。可以列出当前活跃的 session、可用的 channel、以及查看指定 session 的最近消息内容。\
action : list_sessions (), list_channels (), list_messages ()"
}
fn parameters_schema(&self) -> serde_json::Value {
json!({
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": ["list_sessions", "list_channels", "list_messages"],
"description": "操作类型: list_sessions 列出最近活跃会话, list_channels 列出可用渠道, list_messages 查看指定会话的最近消息"
},
"session_id": {
"type": "string",
"description": "会话 ID格式 channel:chat_id:dialog_id仅在 action 为 list_messages 时必填"
},
"count": {
"type": "integer",
"description": "获取最近消息的数量,仅在 action 为 list_messages 时有效,默认 20"
}
},
"required": ["action"]
})
}
fn read_only(&self) -> bool {
true
}
fn concurrency_safe(&self) -> bool {
true
}
async fn execute(&self, args: serde_json::Value) -> anyhow::Result<ToolResult> {
let action = args["action"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("missing required parameter: action"))?;
match action {
"list_channels" => self.list_channels().await,
"list_sessions" => self.list_sessions().await,
"list_messages" => self.list_messages(&args).await,
_ => Ok(ToolResult {
success: false,
output: String::new(),
error: Some(format!(
"Unknown action: {}. Supported: list_sessions, list_channels, list_messages",
action
)),
}),
}
}
}
impl ChatManagerTool {
async fn list_channels(&self) -> anyhow::Result<ToolResult> {
let channels = self.available_channels.join(", ");
Ok(ToolResult {
success: true,
output: format!("可用渠道 ({}): {}", self.available_channels.len(), channels),
error: None,
})
}
async fn list_sessions(&self) -> anyhow::Result<ToolResult> {
let sessions = self
.storage
.list_all_active_sessions(20)
.await
.map_err(|e| anyhow::anyhow!("Failed to list sessions: {}", e))?;
if sessions.is_empty() {
return Ok(ToolResult {
success: true,
output: "当前没有活跃的会话".to_string(),
error: None,
});
}
let now_ms = chrono::Utc::now().timestamp_millis();
let mut output = format!("活跃会话 (共 {} 个):\n", sessions.len());
for s in &sessions {
let ago = format_duration_ago(now_ms - s.last_active_at);
output.push_str(&format!(
"- {}\n title={} | channel={} chat_id={} | {}条消息 | 最后活动: {}前\n",
s.id, s.title, s.channel, s.chat_id, s.message_count, ago
));
}
Ok(ToolResult {
success: true,
output,
error: None,
})
}
async fn list_messages(&self, args: &serde_json::Value) -> anyhow::Result<ToolResult> {
let session_id = args["session_id"]
.as_str()
.ok_or_else(|| anyhow::anyhow!("missing required parameter: session_id"))?;
let count = args["count"].as_i64().unwrap_or(20).clamp(1, 100);
let session = self
.storage
.get_session(session_id)
.await
.map_err(|e| anyhow::anyhow!("Session not found: {}", e))?;
let messages = self
.storage
.list_recent_messages(session_id, count)
.await
.map_err(|e| anyhow::anyhow!("Failed to load messages: {}", e))?;
let mut output = format!(
"会话: {} ({})\n--- 最近 {} 条消息 (共 {} 条) ---\n",
session_id, session.title, messages.len(), session.message_count
);
if messages.is_empty() {
output.push_str("(暂无消息)\n");
} else {
for m in &messages {
let time = format_timestamp(m.created_at);
let role_tag = match m.role.as_str() {
"user" => "user ",
"assistant" => "assistant",
"tool" => "tool ",
"system" => "system ",
other => other,
};
let preview = truncate_content(&m.content, 200);
output.push_str(&format!(
"[{}] {} | {} | {}\n",
m.seq, time, role_tag, preview
));
}
}
Ok(ToolResult {
success: true,
output,
error: None,
})
}
}
fn format_duration_ago(millis: i64) -> String {
let secs = millis / 1000;
if secs < 60 {
format!("{}", secs)
} else if secs < 3600 {
format!("{}分钟", secs / 60)
} else if secs < 86400 {
format!("{}小时", secs / 3600)
} else {
format!("{}", secs / 86400)
}
}
fn format_timestamp(ms: i64) -> String {
if let Some(dt) = chrono::DateTime::from_timestamp_millis(ms) {
dt.format("%m-%d %H:%M").to_string()
} else {
ms.to_string()
}
}
fn truncate_content(content: &str, max_len: usize) -> String {
let content = content.replace('\n', " ");
if content.chars().count() > max_len {
format!("{}...", content.chars().take(max_len).collect::<String>())
} else {
content
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
async fn create_test_storage() -> (Arc<Storage>, TempDir) {
let dir = tempfile::tempdir().unwrap();
let db_path = dir.path().join("test.db");
let storage = Storage::new(&db_path).await.unwrap();
(Arc::new(storage), dir)
}
#[tokio::test]
async fn test_list_channels() {
let (storage, _dir) = create_test_storage().await;
let tool = ChatManagerTool::new(storage, vec!["cli_chat".into(), "feishu".into()]);
let result = tool
.execute(json!({ "action": "list_channels" }))
.await
.unwrap();
assert!(result.success);
assert!(result.output.contains("cli_chat"));
assert!(result.output.contains("feishu"));
}
#[tokio::test]
async fn test_list_sessions_empty() {
let (storage, _dir) = create_test_storage().await;
let tool = ChatManagerTool::new(storage, vec![]);
let result = tool
.execute(json!({ "action": "list_sessions" }))
.await
.unwrap();
assert!(result.success);
assert!(result.output.contains("没有"));
}
#[tokio::test]
async fn test_list_sessions_with_data() {
let (storage, _dir) = create_test_storage().await;
let now = chrono::Utc::now().timestamp_millis();
for i in 0..3 {
let meta = crate::storage::session::SessionMeta {
id: format!("cli_chat:sid{}:dialog{}", i, i),
channel: "cli_chat".to_string(),
chat_id: format!("sid{}", i),
dialog_id: format!("dialog{}", i),
title: format!("会话{}", i),
created_at: now - i * 3600_000,
last_active_at: now - i * 3600_000,
message_count: i * 5,
routing_info: None,
deleted_at: None,
};
storage.upsert_session(&meta).await.unwrap();
}
let tool = ChatManagerTool::new(storage, vec![]);
let result = tool
.execute(json!({ "action": "list_sessions" }))
.await
.unwrap();
assert!(result.success);
assert!(result.output.contains("会话0"));
assert!(result.output.contains("会话1"));
assert!(result.output.contains("会话2"));
}
#[tokio::test]
async fn test_list_messages() {
let (storage, _dir) = create_test_storage().await;
let now = chrono::Utc::now().timestamp_millis();
let session_id = "cli_chat:sid0:dialog0";
let meta = crate::storage::session::SessionMeta {
id: session_id.to_string(),
channel: "cli_chat".to_string(),
chat_id: "sid0".to_string(),
dialog_id: "dialog0".to_string(),
title: "测试会话".to_string(),
created_at: now,
last_active_at: now,
message_count: 3,
routing_info: None,
deleted_at: None,
};
storage.upsert_session(&meta).await.unwrap();
for i in 0..3 {
let msg = crate::storage::message::MessageMeta {
id: format!("msg{}", i),
session_id: session_id.to_string(),
seq: i as i64 + 1,
role: if i == 0 { "user".to_string() } else { "assistant".to_string() },
content: format!("消息内容 {}", i),
media_refs: None,
tool_call_id: None,
tool_name: None,
tool_calls: None,
source: None,
created_at: now + i * 1000,
};
storage.append_message(session_id, &msg).await.unwrap();
}
let tool = ChatManagerTool::new(storage, vec![]);
let result = tool
.execute(json!({ "action": "list_messages", "session_id": session_id }))
.await
.unwrap();
assert!(result.success);
assert!(result.output.contains("消息内容 0"));
assert!(result.output.contains("消息内容 2"));
assert!(result.output.contains("测试会话"));
}
#[tokio::test]
async fn test_unknown_action() {
let (storage, _dir) = create_test_storage().await;
let tool = ChatManagerTool::new(storage, vec![]);
let result = tool
.execute(json!({ "action": "unknown" }))
.await
.unwrap();
assert!(!result.success);
assert!(result.error.unwrap().contains("Unknown action"));
}
}

View File

@ -38,6 +38,9 @@ impl Tool for CronAddTool {
fn description(&self) -> &str {
"Create a new scheduled task (cron job). The task will execute an AI prompt on a schedule \
and deliver the result to the specified channel/chat. \
Important: the execution environment is a fresh session with no access to your current \
conversation history. The prompt parameter MUST include all necessary context: \
what to do, the target audience, required output format, and any background information. \
Schedule formats: \
- 'every': {\"type\":\"every\",\"every_ms\":3600000} for every hour, \
- 'at': {\"type\":\"at\",\"at\":<unix_timestamp_ms>} for one-shot, \

View File

@ -1,5 +1,6 @@
pub mod bash;
pub mod calculator;
pub mod chat_manager;
pub mod cron;
pub mod file_edit;
pub mod file_read;
@ -14,6 +15,7 @@ pub mod web_fetch;
pub use bash::BashTool;
pub use calculator::CalculatorTool;
pub use chat_manager::ChatManagerTool;
pub use file_edit::FileEditTool;
pub use file_read::FileReadTool;
pub use file_write::FileWriteTool;