diff --git a/Cargo.toml b/Cargo.toml index f253a9b..82c9ba6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,3 +36,4 @@ textwrap = "0.16" chrono = "0.4" hostname = "0.3" sqlx = { version = "0.8", features = ["sqlite", "macros", "chrono", "runtime-tokio"] } +jieba-rs = "0.9" diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index 9dc6d3a..6aed828 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -73,7 +73,7 @@ fn truncate_tool_result(output: &str) -> String { // Even after removing suffix, still too long - take from beginning format!( "{}...\n\n[Output truncated - {} characters removed]", - &output[..MAX_TOOL_RESULT_CHARS - 100], + &output[..output.ceil_char_boundary(MAX_TOOL_RESULT_CHARS - 100)], output.len() - MAX_TOOL_RESULT_CHARS + 100 ) } else { @@ -81,7 +81,7 @@ fn truncate_tool_result(output: &str) -> String { format!( "...\n\n[Output truncated - {} characters removed]\n\n{}", truncated_start_len, - &output[truncated_start_len..] + &output[output.floor_char_boundary(truncated_start_len)..] ) } } diff --git a/src/agent/context_compressor.rs b/src/agent/context_compressor.rs index 66a6703..0bf43ac 100644 --- a/src/agent/context_compressor.rs +++ b/src/agent/context_compressor.rs @@ -112,7 +112,7 @@ impl ContextCompressor { let removed = msg.content.len() - limit; msg.content = format!( "{}...\n\n[Output truncated - {} characters removed]", - &msg.content[..limit.min(msg.content.len())], + &msg.content[..msg.content.ceil_char_boundary(limit)], removed ); modified += 1; @@ -313,7 +313,7 @@ impl ContextCompressor { let transcript = if transcript.len() > self.config.summary_max_chars { format!( "{}...\n\n[Transcript truncated - {} characters removed]", - &transcript[..self.config.summary_max_chars], + &transcript[..transcript.ceil_char_boundary(self.config.summary_max_chars)], transcript.len() - self.config.summary_max_chars ) } else { @@ -356,7 +356,7 @@ Be concise, aim for {} characters or less. Err(e) => { // Fallback: just truncate the transcript tracing::warn!(error = %e, "LLM summarization failed, using truncated transcript"); - Ok(transcript[..transcript.len().min(2000)].to_string()) + Ok(transcript[..transcript.ceil_char_boundary(2000)].to_string()) } } } diff --git a/src/agent/system_prompt.rs b/src/agent/system_prompt.rs index 78c2a51..4b6cce9 100644 --- a/src/agent/system_prompt.rs +++ b/src/agent/system_prompt.rs @@ -243,10 +243,9 @@ impl PromptSection for CrossChannelSection { - dialog_id: 对话标识,同一 chat 下可以有多个 dialog {}### 跨会话消息 -对话历史中可能出现带有 `[message from X to Y]` 前缀的 assistant 消息, +对话历史中可能出现带有 `[message from X]` 前缀的 assistant 消息, 表示此消息由 send_message 工具从别处发送过来。 - X: 来源标识,可能是会话 ID、工具名或其他标识字符串;未指定时为 "unknown" -- Y: 目标会话的完整 session ID (::) 收到此类消息时一般不需要主动处理,只需知晓。如果用户问及相关信息, 可以尝试从来源处获取更多详情。 diff --git a/src/channels/feishu.rs b/src/channels/feishu.rs index 7329618..b008b23 100644 --- a/src/channels/feishu.rs +++ b/src/channels/feishu.rs @@ -778,7 +778,7 @@ impl FeishuChannel { let payload_content = if msg_type == "text" { let truncated = if content.len() > MAX_TEXT_LENGTH { - format!("{}...\n\n[Content truncated due to length limit]", &content[..MAX_TEXT_LENGTH]) + format!("{}...\n\n[Content truncated due to length limit]", &content[..content.ceil_char_boundary(MAX_TEXT_LENGTH)]) } else { content.to_string() }; @@ -788,7 +788,7 @@ impl FeishuChannel { // But we still need to check length if content.len() > MAX_TEXT_LENGTH { // Fallback to truncated text for post as well - serde_json::json!({ "text": format!("{}...\n\n[Content truncated due to length limit]", &content[..MAX_TEXT_LENGTH]) }).to_string() + serde_json::json!({ "text": format!("{}...\n\n[Content truncated due to length limit]", &content[..content.ceil_char_boundary(MAX_TEXT_LENGTH)]) }).to_string() } else { content.to_string() } @@ -2136,7 +2136,7 @@ impl Channel for FeishuChannel { if !msg.content.is_empty() { const MAX_TEXT_LENGTH: usize = 60_000; let truncated_text = if msg.content.len() > MAX_TEXT_LENGTH { - format!("{}...\n\n[Content truncated due to length limit]", &msg.content[..MAX_TEXT_LENGTH]) + format!("{}...\n\n[Content truncated due to length limit]", &msg.content[..msg.content.ceil_char_boundary(MAX_TEXT_LENGTH)]) } else { msg.content.clone() }; diff --git a/src/memory/mod.rs b/src/memory/mod.rs index e68883e..9699050 100644 --- a/src/memory/mod.rs +++ b/src/memory/mod.rs @@ -68,11 +68,12 @@ impl MemoryManager { &self, since: i64, until: i64, + query: Option<&str>, limit: usize, category: Option, ) -> Result, crate::storage::StorageError> { self.storage - .search_memories_by_time(since, until, category.as_ref(), limit) + .search_memories_by_time(since, until, query, category.as_ref(), limit) .await } diff --git a/src/providers/anthropic.rs b/src/providers/anthropic.rs index cfbd6c1..fd16c55 100644 --- a/src/providers/anthropic.rs +++ b/src/providers/anthropic.rs @@ -202,7 +202,7 @@ impl LLMProvider for AnthropicProvider { } else { let mut blocks = convert_content_blocks(&m.content); // Append tool_use blocks from assistant messages with tool calls - if let Some(ref tool_calls) = m.tool_calls { + if let Some(tool_calls) = m.tool_calls.as_ref().filter(|c| !c.is_empty()) { for tc in tool_calls { blocks.push(serde_json::json!({ "type": "tool_use", diff --git a/src/providers/openai.rs b/src/providers/openai.rs index 2fbdb76..f3d334a 100644 --- a/src/providers/openai.rs +++ b/src/providers/openai.rs @@ -77,7 +77,7 @@ impl OpenAIProvider { "tool_call_id": m.tool_call_id, "name": m.name, }) - } else if m.role == "assistant" && m.tool_calls.is_some() { + } else if m.role == "assistant" && m.tool_calls.as_ref().map_or(false, |c| !c.is_empty()) { json!({ "role": m.role, "content": convert_content_blocks(&m.content), diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index acf0275..10602a7 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -147,7 +147,7 @@ impl Scheduler { let _ = self.bus.publish_outbound(outbound).await; let output_truncated = if output.len() > 8000 { - format!("{}...[truncated]", &output[..8000]) + format!("{}...[truncated]", &output[..output.ceil_char_boundary(8000)]) } else { output.clone() }; diff --git a/src/session/session.rs b/src/session/session.rs index 7ca56e6..da1b790 100644 --- a/src/session/session.rs +++ b/src/session/session.rs @@ -135,22 +135,25 @@ impl Session { let mut compressor = ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config, memory_manager.clone()); compressor.set_session_id(Some(id.to_string())); - // Convert MessageMeta to ChatMessage - // Clear tool_call_id/tool_name — they're not valid across API sessions - let chat_messages: Vec = messages.into_iter().map(|m| { + // Convert MessageMeta to ChatMessage, then repair damaged tool call chains + let mut chat_messages: Vec = messages.into_iter().map(|m| { ChatMessage { id: m.id, role: m.role, content: m.content, media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(), timestamp: m.created_at, - tool_call_id: None, - tool_name: None, - tool_calls: m.tool_calls.map(|tc| serde_json::from_str(&tc).unwrap_or_default()), + tool_call_id: m.tool_call_id, + tool_name: m.tool_name, + tool_calls: m.tool_calls + .and_then(|tc| serde_json::from_str::>(&tc).ok()) + .filter(|v| !v.is_empty()), source: m.source.and_then(|s| serde_json::from_str(&s).ok()), } }).collect(); + repair_tool_call_chains(&mut chat_messages); + let seq_counter = chat_messages.len() as i64 + 1; let total_message_count = chat_messages.len() as i64; @@ -205,7 +208,7 @@ impl Session { }, tool_call_id: message.tool_call_id.clone(), tool_name: message.tool_name.clone(), - tool_calls: message.tool_calls.as_ref().map(|tc| serde_json::to_string(tc).unwrap_or_default()), + tool_calls: message.tool_calls.as_ref().and_then(|tc| serde_json::to_string(tc).ok()), source: message.source.as_ref().map(|s| serde_json::to_string(s).unwrap_or_default()), created_at: now, }; @@ -568,6 +571,67 @@ impl Session { } } +/// Repair damaged tool call chains after restoring from storage. +/// Handles cases where the gateway crashed mid-loop, leaving assistant +/// tool_calls without corresponding tool result messages. +fn repair_tool_call_chains(messages: &mut Vec) { + let mut i = 0; + while i < messages.len() { + let calls = match &messages[i].tool_calls { + Some(calls) if !calls.is_empty() => calls.clone(), + _ => { + i += 1; + continue; + } + }; + + if messages[i].role != "assistant" { + i += 1; + continue; + } + + // Collect expected tool call IDs + let expected_ids: std::collections::HashSet<&str> = calls.iter().map(|c| c.id.as_str()).collect(); + let expected_count = expected_ids.len(); + + // Check following messages for matching tool results (same tool_call_id) + let mut found = 0; + let mut j = i + 1; + while j < messages.len() && found < expected_count { + if messages[j].role == "tool" { + if let Some(ref tc_id) = messages[j].tool_call_id { + if expected_ids.contains(tc_id.as_str()) { + found += 1; + } + } + } else if messages[j].role == "user" || messages[j].role == "assistant" { + // Next user/assistant message — stop scanning, chain is broken + break; + } + j += 1; + } + + if found < expected_count { + // Incomplete chain: remove tool_calls and add interruption note + tracing::warn!( + found, + expected = expected_count, + "Repairing incomplete tool call chain — gateway restart likely interrupted execution" + ); + let old_content = std::mem::take(&mut messages[i].content); + messages[i].content = format!( + "{}\n\n[Tool calls ({}): {} — execution interrupted by gateway restart]", + old_content, + expected_count, + calls.iter().map(|c| c.name.as_str()).collect::>().join(", ") + ); + messages[i].tool_calls = None; + } + + i += 1; + } +} + /// SessionManager 管理所有 Session,按 channel_name 路由 #[derive(Clone)] pub struct SessionManager { @@ -1399,11 +1463,9 @@ impl SessionManager { } let raw_response = result.final_response.content; - - let target_id = unified_id.to_string(); let prefix = format!( - "[message from cron:{}({}) to {}]\n", - job_name, job_id, target_id + "[message from cron:{}({})]\n", + job_name, job_id ); let prefixed_response = format!("{}{}", prefix, raw_response); @@ -1481,11 +1543,10 @@ impl OutboundMessenger for SessionManager { (sid, session) }; - // Build message prefix: [message from to ] - let target_id = target_sid.to_string(); + // Build message prefix: [message from ] let origin = source.from_session.as_deref().unwrap_or("unknown"); let origin_id = source.from_session.clone(); - let prefix = format!("[message from {} to {}] ", origin, target_id); + let prefix = format!("[message from {}] ", origin); let marked_content = format!("{}\n{}", prefix, content); // Write source-tagged assistant message to target session history diff --git a/src/storage/memory.rs b/src/storage/memory.rs index 0ac41b5..4d17b2d 100644 --- a/src/storage/memory.rs +++ b/src/storage/memory.rs @@ -1,9 +1,17 @@ use sqlx::Row; +use std::sync::OnceLock; + +use jieba_rs::Jieba; use crate::memory::{MemoryCategory, MemoryEntry}; use super::StorageError; +fn jieba() -> &'static Jieba { + static INSTANCE: OnceLock = OnceLock::new(); + INSTANCE.get_or_init(Jieba::new) +} + impl super::Storage { /// Store or update a memory entry (upsert by key). pub async fn upsert_memory(&self, entry: &MemoryEntry) -> Result<(), StorageError> { @@ -50,9 +58,11 @@ impl super::Storage { category: Option<&MemoryCategory>, limit: usize, ) -> Result, StorageError> { - // Build FTS5 query: wrap each word in quotes and join with OR - let fts_query = query - .split_whitespace() + // Build FTS5 query: segment with jieba, wrap each term in quotes, join with OR + let fts_query = jieba() + .cut(query, true) + .into_iter() + .filter(|w| w.len() > 1 || w.bytes().any(|b| b > 127)) .map(|w| format!("\"{}\"", w.replace('"', ""))) .collect::>() .join(" OR "); @@ -80,39 +90,59 @@ impl super::Storage { let mut entries = parse_memory_rows(&rows)?; - // Fallback to LIKE if FTS5 returned nothing + // Fallback to term-based LIKE query if FTS5 returned nothing if entries.is_empty() { - let like_pattern = format!("%{}%", query.replace('%', "").replace('_', "")); - let rows = sqlx::query( - r#" - SELECT id, key, content, category, importance, - session_id, created_at, updated_at - FROM memories - WHERE (key LIKE ? OR content LIKE ?) - AND (? IS NULL OR category = ?) - ORDER BY importance DESC, updated_at DESC - LIMIT ? - "#, - ) - .bind(&like_pattern) - .bind(&like_pattern) - .bind(category_filter) - .bind(category_filter) - .bind(limit as i64) - .fetch_all(self.pool()) - .await?; + let terms: Vec = jieba() + .cut(query, true) + .into_iter() + .filter(|w| w.len() > 1 || w.bytes().any(|b| b > 127)) + .map(|w| w.replace('%', "").replace('_', "")) + .collect(); - entries = parse_memory_rows(&rows)?; + if !terms.is_empty() { + let like_clauses = terms + .iter() + .map(|_| "(key LIKE ? OR content LIKE ?)") + .collect::>() + .join(" OR "); + + let sql = format!( + r#" + SELECT id, key, content, category, importance, + session_id, created_at, updated_at + FROM memories + WHERE ({}) + AND (? IS NULL OR category = ?) + ORDER BY importance DESC, updated_at DESC + LIMIT ? + "#, + like_clauses + ); + + let mut query_builder = sqlx::query(&sql); + for term in &terms { + let pattern = format!("%{}%", term); + query_builder = query_builder.bind(pattern.clone()).bind(pattern); + } + query_builder = query_builder + .bind(category_filter) + .bind(category_filter) + .bind(limit as i64); + + let rows = query_builder.fetch_all(self.pool()).await?; + entries = parse_memory_rows(&rows)?; + } } Ok(entries) } - /// Retrieve memories within a time range. + /// Retrieve memories within a time range, optionally filtered by keyword query. pub async fn search_memories_by_time( &self, since: i64, until: i64, + query: Option<&str>, category: Option<&MemoryCategory>, limit: usize, ) -> Result, StorageError> { @@ -124,24 +154,71 @@ impl super::Storage { .unwrap_or_default() .to_rfc3339(); - let rows = sqlx::query( - r#" - SELECT id, key, content, category, importance, - session_id, created_at, updated_at - FROM memories - WHERE created_at >= ? AND created_at <= ? - AND (? IS NULL OR category = ?) - ORDER BY created_at DESC - LIMIT ? - "#, - ) - .bind(&since_dt) - .bind(&until_dt) - .bind(category_filter) - .bind(category_filter) - .bind(limit as i64) - .fetch_all(self.pool()) - .await?; + let rows = if let Some(q) = query { + let terms: Vec = jieba() + .cut(q, true) + .into_iter() + .filter(|w| w.len() > 1 || w.bytes().any(|b| b > 127)) + .map(|w| w.replace('%', "").replace('_', "")) + .collect(); + + if terms.is_empty() { + return Ok(Vec::new()); + } + + let like_clauses = terms + .iter() + .map(|_| "(key LIKE ? OR content LIKE ?)") + .collect::>() + .join(" OR "); + + let sql = format!( + r#" + SELECT id, key, content, category, importance, + session_id, created_at, updated_at + FROM memories + WHERE ({}) + AND created_at >= ? AND created_at <= ? + AND (? IS NULL OR category = ?) + ORDER BY created_at DESC + LIMIT ? + "#, + like_clauses + ); + + let mut query_builder = sqlx::query(&sql); + for term in &terms { + let pattern = format!("%{}%", term); + query_builder = query_builder.bind(pattern.clone()).bind(pattern); + } + query_builder = query_builder + .bind(&since_dt) + .bind(&until_dt) + .bind(category_filter) + .bind(category_filter) + .bind(limit as i64); + + query_builder.fetch_all(self.pool()).await? + } else { + sqlx::query( + r#" + SELECT id, key, content, category, importance, + session_id, created_at, updated_at + FROM memories + WHERE created_at >= ? AND created_at <= ? + AND (? IS NULL OR category = ?) + ORDER BY created_at DESC + LIMIT ? + "#, + ) + .bind(&since_dt) + .bind(&until_dt) + .bind(category_filter) + .bind(category_filter) + .bind(limit as i64) + .fetch_all(self.pool()) + .await? + }; parse_memory_rows(&rows) } diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 69f98f2..c697e74 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -127,6 +127,48 @@ impl Storage { .execute(&self.pool) .await?; + // Triggers to keep FTS5 index in sync with memories table + sqlx::query( + r#" + CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN + INSERT INTO memory_fts(rowid, key, content) VALUES (new.rowid, new.key, new.content); + END + "#, + ) + .execute(&self.pool) + .await?; + + sqlx::query( + r#" + CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN + INSERT INTO memory_fts(memory_fts, rowid, key, content) + VALUES ('delete', old.rowid, old.key, old.content); + END + "#, + ) + .execute(&self.pool) + .await?; + + sqlx::query( + r#" + CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN + INSERT INTO memory_fts(memory_fts, rowid, key, content) + VALUES ('delete', old.rowid, old.key, old.content); + INSERT INTO memory_fts(rowid, key, content) + VALUES (new.rowid, new.key, new.content); + END + "#, + ) + .execute(&self.pool) + .await?; + + // Rebuild FTS5 index for any existing records + sqlx::query( + "INSERT INTO memory_fts(memory_fts) VALUES ('rebuild')", + ) + .execute(&self.pool) + .await?; + // Migration: add last_consolidated_at column if not exists sqlx::query( r#" diff --git a/src/tools/bash.rs b/src/tools/bash.rs index d2c1e85..7f03104 100644 --- a/src/tools/bash.rs +++ b/src/tools/bash.rs @@ -68,9 +68,9 @@ impl BashTool { let half = MAX_OUTPUT_CHARS / 2; format!( "{}...\n\n(... {} chars truncated ...)\n\n{}", - &output[..half], + &output[..output.ceil_char_boundary(half)], output.len() - MAX_OUTPUT_CHARS, - &output[output.len() - half..] + &output[output.floor_char_boundary(output.len() - half)..] ) } } diff --git a/src/tools/http_request.rs b/src/tools/http_request.rs index cc70cce..d3b05d3 100644 --- a/src/tools/http_request.rs +++ b/src/tools/http_request.rs @@ -101,7 +101,7 @@ impl HttpRequestTool { if text.len() > self.max_response_size { format!( "{}\n\n... [Response truncated due to size limit] ...", - &text[..self.max_response_size] + &text[..text.ceil_char_boundary(self.max_response_size)] ) } else { text.to_string() diff --git a/src/tools/memory.rs b/src/tools/memory.rs index 795c975..c34ad22 100644 --- a/src/tools/memory.rs +++ b/src/tools/memory.rs @@ -110,9 +110,12 @@ impl Tool for MemoryRecallTool { } fn description(&self) -> &str { - "Search and retrieve entries from long-term memory. \ + "Search and retrieve entries from long-term memory using keyword matching. \ Use this to recall previously stored facts, preferences, or conversation history. \ - Supports keyword search and optional time-range filtering." + IMPORTANT: query must be a space-separated list of RELEVANT KEYWORDS (not a question or sentence). \ + Use multiple synonymous or related terms to increase recall. \ + Example: instead of 'what is the user location', use 'user location address city residence'. \ + Supports optional time-range filtering via since/until (Unix ms)." } fn read_only(&self) -> bool { @@ -125,7 +128,7 @@ impl Tool for MemoryRecallTool { "properties": { "query": { "type": "string", - "description": "Search query — keywords to match against memory keys and content." + "description": "Space-separated KEYWORDS for memory search (NOT a natural language question). Use multiple related terms for better recall, e.g. 'address city location residence'." }, "category": { "type": "string", @@ -169,7 +172,7 @@ impl Tool for MemoryRecallTool { .and_then(|v| v.as_i64()) .unwrap_or(chrono::Utc::now().timestamp_millis()); self.memory - .recall_by_time(since, until, limit, category) + .recall_by_time(since, until, Some(query), limit, category) .await? } else { self.memory.recall(query, limit, category).await? diff --git a/src/tools/web_fetch.rs b/src/tools/web_fetch.rs index 3982bde..aad0987 100644 --- a/src/tools/web_fetch.rs +++ b/src/tools/web_fetch.rs @@ -53,7 +53,7 @@ impl WebFetchTool { if text.len() > self.max_response_size { format!( "{}\n\n... [Response truncated due to size limit] ...", - &text[..self.max_response_size] + &text[..text.ceil_char_boundary(self.max_response_size)] ) } else { text.to_string()