1、修复和提升记忆系统 2、简化消息标记

This commit is contained in:
xiaoxixi 2026-05-08 15:09:27 +08:00
parent 2617558a27
commit 5d62141658
16 changed files with 263 additions and 79 deletions

View File

@ -36,3 +36,4 @@ textwrap = "0.16"
chrono = "0.4" chrono = "0.4"
hostname = "0.3" hostname = "0.3"
sqlx = { version = "0.8", features = ["sqlite", "macros", "chrono", "runtime-tokio"] } sqlx = { version = "0.8", features = ["sqlite", "macros", "chrono", "runtime-tokio"] }
jieba-rs = "0.9"

View File

@ -73,7 +73,7 @@ fn truncate_tool_result(output: &str) -> String {
// Even after removing suffix, still too long - take from beginning // Even after removing suffix, still too long - take from beginning
format!( format!(
"{}...\n\n[Output truncated - {} characters removed]", "{}...\n\n[Output truncated - {} characters removed]",
&output[..MAX_TOOL_RESULT_CHARS - 100], &output[..output.ceil_char_boundary(MAX_TOOL_RESULT_CHARS - 100)],
output.len() - MAX_TOOL_RESULT_CHARS + 100 output.len() - MAX_TOOL_RESULT_CHARS + 100
) )
} else { } else {
@ -81,7 +81,7 @@ fn truncate_tool_result(output: &str) -> String {
format!( format!(
"...\n\n[Output truncated - {} characters removed]\n\n{}", "...\n\n[Output truncated - {} characters removed]\n\n{}",
truncated_start_len, truncated_start_len,
&output[truncated_start_len..] &output[output.floor_char_boundary(truncated_start_len)..]
) )
} }
} }

View File

@ -112,7 +112,7 @@ impl ContextCompressor {
let removed = msg.content.len() - limit; let removed = msg.content.len() - limit;
msg.content = format!( msg.content = format!(
"{}...\n\n[Output truncated - {} characters removed]", "{}...\n\n[Output truncated - {} characters removed]",
&msg.content[..limit.min(msg.content.len())], &msg.content[..msg.content.ceil_char_boundary(limit)],
removed removed
); );
modified += 1; modified += 1;
@ -313,7 +313,7 @@ impl ContextCompressor {
let transcript = if transcript.len() > self.config.summary_max_chars { let transcript = if transcript.len() > self.config.summary_max_chars {
format!( format!(
"{}...\n\n[Transcript truncated - {} characters removed]", "{}...\n\n[Transcript truncated - {} characters removed]",
&transcript[..self.config.summary_max_chars], &transcript[..transcript.ceil_char_boundary(self.config.summary_max_chars)],
transcript.len() - self.config.summary_max_chars transcript.len() - self.config.summary_max_chars
) )
} else { } else {
@ -356,7 +356,7 @@ Be concise, aim for {} characters or less.
Err(e) => { Err(e) => {
// Fallback: just truncate the transcript // Fallback: just truncate the transcript
tracing::warn!(error = %e, "LLM summarization failed, using truncated transcript"); tracing::warn!(error = %e, "LLM summarization failed, using truncated transcript");
Ok(transcript[..transcript.len().min(2000)].to_string()) Ok(transcript[..transcript.ceil_char_boundary(2000)].to_string())
} }
} }
} }

View File

@ -243,10 +243,9 @@ impl PromptSection for CrossChannelSection {
- dialog_id: chat dialog - dialog_id: chat dialog
{}### {}###
`[message from X to Y]` assistant `[message from X]` assistant
send_message send_message
- X: ID "unknown" - X: ID "unknown"
- Y: session ID (<channel>:<chat_id>:<dialog_id>)

View File

@ -778,7 +778,7 @@ impl FeishuChannel {
let payload_content = if msg_type == "text" { let payload_content = if msg_type == "text" {
let truncated = if content.len() > MAX_TEXT_LENGTH { let truncated = if content.len() > MAX_TEXT_LENGTH {
format!("{}...\n\n[Content truncated due to length limit]", &content[..MAX_TEXT_LENGTH]) format!("{}...\n\n[Content truncated due to length limit]", &content[..content.ceil_char_boundary(MAX_TEXT_LENGTH)])
} else { } else {
content.to_string() content.to_string()
}; };
@ -788,7 +788,7 @@ impl FeishuChannel {
// But we still need to check length // But we still need to check length
if content.len() > MAX_TEXT_LENGTH { if content.len() > MAX_TEXT_LENGTH {
// Fallback to truncated text for post as well // Fallback to truncated text for post as well
serde_json::json!({ "text": format!("{}...\n\n[Content truncated due to length limit]", &content[..MAX_TEXT_LENGTH]) }).to_string() serde_json::json!({ "text": format!("{}...\n\n[Content truncated due to length limit]", &content[..content.ceil_char_boundary(MAX_TEXT_LENGTH)]) }).to_string()
} else { } else {
content.to_string() content.to_string()
} }
@ -2136,7 +2136,7 @@ impl Channel for FeishuChannel {
if !msg.content.is_empty() { if !msg.content.is_empty() {
const MAX_TEXT_LENGTH: usize = 60_000; const MAX_TEXT_LENGTH: usize = 60_000;
let truncated_text = if msg.content.len() > MAX_TEXT_LENGTH { let truncated_text = if msg.content.len() > MAX_TEXT_LENGTH {
format!("{}...\n\n[Content truncated due to length limit]", &msg.content[..MAX_TEXT_LENGTH]) format!("{}...\n\n[Content truncated due to length limit]", &msg.content[..msg.content.ceil_char_boundary(MAX_TEXT_LENGTH)])
} else { } else {
msg.content.clone() msg.content.clone()
}; };

View File

@ -68,11 +68,12 @@ impl MemoryManager {
&self, &self,
since: i64, since: i64,
until: i64, until: i64,
query: Option<&str>,
limit: usize, limit: usize,
category: Option<MemoryCategory>, category: Option<MemoryCategory>,
) -> Result<Vec<MemoryEntry>, crate::storage::StorageError> { ) -> Result<Vec<MemoryEntry>, crate::storage::StorageError> {
self.storage self.storage
.search_memories_by_time(since, until, category.as_ref(), limit) .search_memories_by_time(since, until, query, category.as_ref(), limit)
.await .await
} }

View File

@ -202,7 +202,7 @@ impl LLMProvider for AnthropicProvider {
} else { } else {
let mut blocks = convert_content_blocks(&m.content); let mut blocks = convert_content_blocks(&m.content);
// Append tool_use blocks from assistant messages with tool calls // Append tool_use blocks from assistant messages with tool calls
if let Some(ref tool_calls) = m.tool_calls { if let Some(tool_calls) = m.tool_calls.as_ref().filter(|c| !c.is_empty()) {
for tc in tool_calls { for tc in tool_calls {
blocks.push(serde_json::json!({ blocks.push(serde_json::json!({
"type": "tool_use", "type": "tool_use",

View File

@ -77,7 +77,7 @@ impl OpenAIProvider {
"tool_call_id": m.tool_call_id, "tool_call_id": m.tool_call_id,
"name": m.name, "name": m.name,
}) })
} else if m.role == "assistant" && m.tool_calls.is_some() { } else if m.role == "assistant" && m.tool_calls.as_ref().map_or(false, |c| !c.is_empty()) {
json!({ json!({
"role": m.role, "role": m.role,
"content": convert_content_blocks(&m.content), "content": convert_content_blocks(&m.content),

View File

@ -147,7 +147,7 @@ impl Scheduler {
let _ = self.bus.publish_outbound(outbound).await; let _ = self.bus.publish_outbound(outbound).await;
let output_truncated = if output.len() > 8000 { let output_truncated = if output.len() > 8000 {
format!("{}...[truncated]", &output[..8000]) format!("{}...[truncated]", &output[..output.ceil_char_boundary(8000)])
} else { } else {
output.clone() output.clone()
}; };

View File

@ -135,22 +135,25 @@ impl Session {
let mut compressor = ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config, memory_manager.clone()); let mut compressor = ContextCompressor::with_config(provider.clone(), provider_config.token_limit, compressor_config, memory_manager.clone());
compressor.set_session_id(Some(id.to_string())); compressor.set_session_id(Some(id.to_string()));
// Convert MessageMeta to ChatMessage // Convert MessageMeta to ChatMessage, then repair damaged tool call chains
// Clear tool_call_id/tool_name — they're not valid across API sessions let mut chat_messages: Vec<ChatMessage> = messages.into_iter().map(|m| {
let chat_messages: Vec<ChatMessage> = messages.into_iter().map(|m| {
ChatMessage { ChatMessage {
id: m.id, id: m.id,
role: m.role, role: m.role,
content: m.content, content: m.content,
media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(), media_refs: m.media_refs.map(|refs| serde_json::from_str(&refs).unwrap_or_default()).unwrap_or_default(),
timestamp: m.created_at, timestamp: m.created_at,
tool_call_id: None, tool_call_id: m.tool_call_id,
tool_name: None, tool_name: m.tool_name,
tool_calls: m.tool_calls.map(|tc| serde_json::from_str(&tc).unwrap_or_default()), tool_calls: m.tool_calls
.and_then(|tc| serde_json::from_str::<Vec<crate::providers::ToolCall>>(&tc).ok())
.filter(|v| !v.is_empty()),
source: m.source.and_then(|s| serde_json::from_str(&s).ok()), source: m.source.and_then(|s| serde_json::from_str(&s).ok()),
} }
}).collect(); }).collect();
repair_tool_call_chains(&mut chat_messages);
let seq_counter = chat_messages.len() as i64 + 1; let seq_counter = chat_messages.len() as i64 + 1;
let total_message_count = chat_messages.len() as i64; let total_message_count = chat_messages.len() as i64;
@ -205,7 +208,7 @@ impl Session {
}, },
tool_call_id: message.tool_call_id.clone(), tool_call_id: message.tool_call_id.clone(),
tool_name: message.tool_name.clone(), tool_name: message.tool_name.clone(),
tool_calls: message.tool_calls.as_ref().map(|tc| serde_json::to_string(tc).unwrap_or_default()), tool_calls: message.tool_calls.as_ref().and_then(|tc| serde_json::to_string(tc).ok()),
source: message.source.as_ref().map(|s| serde_json::to_string(s).unwrap_or_default()), source: message.source.as_ref().map(|s| serde_json::to_string(s).unwrap_or_default()),
created_at: now, created_at: now,
}; };
@ -568,6 +571,67 @@ impl Session {
} }
} }
/// Repair damaged tool call chains after restoring from storage.
/// Handles cases where the gateway crashed mid-loop, leaving assistant
/// tool_calls without corresponding tool result messages.
fn repair_tool_call_chains(messages: &mut Vec<ChatMessage>) {
let mut i = 0;
while i < messages.len() {
let calls = match &messages[i].tool_calls {
Some(calls) if !calls.is_empty() => calls.clone(),
_ => {
i += 1;
continue;
}
};
if messages[i].role != "assistant" {
i += 1;
continue;
}
// Collect expected tool call IDs
let expected_ids: std::collections::HashSet<&str> = calls.iter().map(|c| c.id.as_str()).collect();
let expected_count = expected_ids.len();
// Check following messages for matching tool results (same tool_call_id)
let mut found = 0;
let mut j = i + 1;
while j < messages.len() && found < expected_count {
if messages[j].role == "tool" {
if let Some(ref tc_id) = messages[j].tool_call_id {
if expected_ids.contains(tc_id.as_str()) {
found += 1;
}
}
} else if messages[j].role == "user" || messages[j].role == "assistant" {
// Next user/assistant message — stop scanning, chain is broken
break;
}
j += 1;
}
if found < expected_count {
// Incomplete chain: remove tool_calls and add interruption note
tracing::warn!(
found,
expected = expected_count,
"Repairing incomplete tool call chain — gateway restart likely interrupted execution"
);
let old_content = std::mem::take(&mut messages[i].content);
messages[i].content = format!(
"{}\n\n[Tool calls ({}): {} — execution interrupted by gateway restart]",
old_content,
expected_count,
calls.iter().map(|c| c.name.as_str()).collect::<Vec<_>>().join(", ")
);
messages[i].tool_calls = None;
}
i += 1;
}
}
/// SessionManager 管理所有 Session按 channel_name 路由 /// SessionManager 管理所有 Session按 channel_name 路由
#[derive(Clone)] #[derive(Clone)]
pub struct SessionManager { pub struct SessionManager {
@ -1399,11 +1463,9 @@ impl SessionManager {
} }
let raw_response = result.final_response.content; let raw_response = result.final_response.content;
let target_id = unified_id.to_string();
let prefix = format!( let prefix = format!(
"[message from cron:{}({}) to {}]\n", "[message from cron:{}({})]\n",
job_name, job_id, target_id job_name, job_id
); );
let prefixed_response = format!("{}{}", prefix, raw_response); let prefixed_response = format!("{}{}", prefix, raw_response);
@ -1481,11 +1543,10 @@ impl OutboundMessenger for SessionManager {
(sid, session) (sid, session)
}; };
// Build message prefix: [message from <origin> to <channel:chat_id:dialog_id>] // Build message prefix: [message from <origin>]
let target_id = target_sid.to_string();
let origin = source.from_session.as_deref().unwrap_or("unknown"); let origin = source.from_session.as_deref().unwrap_or("unknown");
let origin_id = source.from_session.clone(); let origin_id = source.from_session.clone();
let prefix = format!("[message from {} to {}] ", origin, target_id); let prefix = format!("[message from {}] ", origin);
let marked_content = format!("{}\n{}", prefix, content); let marked_content = format!("{}\n{}", prefix, content);
// Write source-tagged assistant message to target session history // Write source-tagged assistant message to target session history

View File

@ -1,9 +1,17 @@
use sqlx::Row; use sqlx::Row;
use std::sync::OnceLock;
use jieba_rs::Jieba;
use crate::memory::{MemoryCategory, MemoryEntry}; use crate::memory::{MemoryCategory, MemoryEntry};
use super::StorageError; use super::StorageError;
/// Returns the process-wide [`Jieba`] segmenter, constructing it on first use.
///
/// The instance is stored in a `OnceLock`, so `Jieba::new` runs at most once
/// and every caller shares the same `'static` reference afterwards.
fn jieba() -> &'static Jieba {
    static SEGMENTER: OnceLock<Jieba> = OnceLock::new();
    let segmenter: &'static Jieba = SEGMENTER.get_or_init(Jieba::new);
    segmenter
}
impl super::Storage { impl super::Storage {
/// Store or update a memory entry (upsert by key). /// Store or update a memory entry (upsert by key).
pub async fn upsert_memory(&self, entry: &MemoryEntry) -> Result<(), StorageError> { pub async fn upsert_memory(&self, entry: &MemoryEntry) -> Result<(), StorageError> {
@ -50,9 +58,11 @@ impl super::Storage {
category: Option<&MemoryCategory>, category: Option<&MemoryCategory>,
limit: usize, limit: usize,
) -> Result<Vec<MemoryEntry>, StorageError> { ) -> Result<Vec<MemoryEntry>, StorageError> {
// Build FTS5 query: wrap each word in quotes and join with OR // Build FTS5 query: segment with jieba, wrap each term in quotes, join with OR
let fts_query = query let fts_query = jieba()
.split_whitespace() .cut(query, true)
.into_iter()
.filter(|w| w.len() > 1 || w.bytes().any(|b| b > 127))
.map(|w| format!("\"{}\"", w.replace('"', ""))) .map(|w| format!("\"{}\"", w.replace('"', "")))
.collect::<Vec<_>>() .collect::<Vec<_>>()
.join(" OR "); .join(" OR ");
@ -80,39 +90,59 @@ impl super::Storage {
let mut entries = parse_memory_rows(&rows)?; let mut entries = parse_memory_rows(&rows)?;
// Fallback to LIKE if FTS5 returned nothing // Fallback to term-based LIKE query if FTS5 returned nothing
if entries.is_empty() { if entries.is_empty() {
let like_pattern = format!("%{}%", query.replace('%', "").replace('_', "")); let terms: Vec<String> = jieba()
let rows = sqlx::query( .cut(query, true)
.into_iter()
.filter(|w| w.len() > 1 || w.bytes().any(|b| b > 127))
.map(|w| w.replace('%', "").replace('_', ""))
.collect();
if !terms.is_empty() {
let like_clauses = terms
.iter()
.map(|_| "(key LIKE ? OR content LIKE ?)")
.collect::<Vec<_>>()
.join(" OR ");
let sql = format!(
r#" r#"
SELECT id, key, content, category, importance, SELECT id, key, content, category, importance,
session_id, created_at, updated_at session_id, created_at, updated_at
FROM memories FROM memories
WHERE (key LIKE ? OR content LIKE ?) WHERE ({})
AND (? IS NULL OR category = ?) AND (? IS NULL OR category = ?)
ORDER BY importance DESC, updated_at DESC ORDER BY importance DESC, updated_at DESC
LIMIT ? LIMIT ?
"#, "#,
) like_clauses
.bind(&like_pattern) );
.bind(&like_pattern)
.bind(category_filter)
.bind(category_filter)
.bind(limit as i64)
.fetch_all(self.pool())
.await?;
let mut query_builder = sqlx::query(&sql);
for term in &terms {
let pattern = format!("%{}%", term);
query_builder = query_builder.bind(pattern.clone()).bind(pattern);
}
query_builder = query_builder
.bind(category_filter)
.bind(category_filter)
.bind(limit as i64);
let rows = query_builder.fetch_all(self.pool()).await?;
entries = parse_memory_rows(&rows)?; entries = parse_memory_rows(&rows)?;
} }
}
Ok(entries) Ok(entries)
} }
/// Retrieve memories within a time range. /// Retrieve memories within a time range, optionally filtered by keyword query.
pub async fn search_memories_by_time( pub async fn search_memories_by_time(
&self, &self,
since: i64, since: i64,
until: i64, until: i64,
query: Option<&str>,
category: Option<&MemoryCategory>, category: Option<&MemoryCategory>,
limit: usize, limit: usize,
) -> Result<Vec<MemoryEntry>, StorageError> { ) -> Result<Vec<MemoryEntry>, StorageError> {
@ -124,7 +154,53 @@ impl super::Storage {
.unwrap_or_default() .unwrap_or_default()
.to_rfc3339(); .to_rfc3339();
let rows = sqlx::query( let rows = if let Some(q) = query {
let terms: Vec<String> = jieba()
.cut(q, true)
.into_iter()
.filter(|w| w.len() > 1 || w.bytes().any(|b| b > 127))
.map(|w| w.replace('%', "").replace('_', ""))
.collect();
if terms.is_empty() {
return Ok(Vec::new());
}
let like_clauses = terms
.iter()
.map(|_| "(key LIKE ? OR content LIKE ?)")
.collect::<Vec<_>>()
.join(" OR ");
let sql = format!(
r#"
SELECT id, key, content, category, importance,
session_id, created_at, updated_at
FROM memories
WHERE ({})
AND created_at >= ? AND created_at <= ?
AND (? IS NULL OR category = ?)
ORDER BY created_at DESC
LIMIT ?
"#,
like_clauses
);
let mut query_builder = sqlx::query(&sql);
for term in &terms {
let pattern = format!("%{}%", term);
query_builder = query_builder.bind(pattern.clone()).bind(pattern);
}
query_builder = query_builder
.bind(&since_dt)
.bind(&until_dt)
.bind(category_filter)
.bind(category_filter)
.bind(limit as i64);
query_builder.fetch_all(self.pool()).await?
} else {
sqlx::query(
r#" r#"
SELECT id, key, content, category, importance, SELECT id, key, content, category, importance,
session_id, created_at, updated_at session_id, created_at, updated_at
@ -141,7 +217,8 @@ impl super::Storage {
.bind(category_filter) .bind(category_filter)
.bind(limit as i64) .bind(limit as i64)
.fetch_all(self.pool()) .fetch_all(self.pool())
.await?; .await?
};
parse_memory_rows(&rows) parse_memory_rows(&rows)
} }

View File

@ -127,6 +127,48 @@ impl Storage {
.execute(&self.pool) .execute(&self.pool)
.await?; .await?;
// Triggers to keep FTS5 index in sync with memories table
sqlx::query(
r#"
CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
INSERT INTO memory_fts(rowid, key, content) VALUES (new.rowid, new.key, new.content);
END
"#,
)
.execute(&self.pool)
.await?;
sqlx::query(
r#"
CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
INSERT INTO memory_fts(memory_fts, rowid, key, content)
VALUES ('delete', old.rowid, old.key, old.content);
END
"#,
)
.execute(&self.pool)
.await?;
sqlx::query(
r#"
CREATE TRIGGER IF NOT EXISTS memories_au AFTER UPDATE ON memories BEGIN
INSERT INTO memory_fts(memory_fts, rowid, key, content)
VALUES ('delete', old.rowid, old.key, old.content);
INSERT INTO memory_fts(rowid, key, content)
VALUES (new.rowid, new.key, new.content);
END
"#,
)
.execute(&self.pool)
.await?;
// Rebuild FTS5 index for any existing records
sqlx::query(
"INSERT INTO memory_fts(memory_fts) VALUES ('rebuild')",
)
.execute(&self.pool)
.await?;
// Migration: add last_consolidated_at column if not exists // Migration: add last_consolidated_at column if not exists
sqlx::query( sqlx::query(
r#" r#"

View File

@ -68,9 +68,9 @@ impl BashTool {
let half = MAX_OUTPUT_CHARS / 2; let half = MAX_OUTPUT_CHARS / 2;
format!( format!(
"{}...\n\n(... {} chars truncated ...)\n\n{}", "{}...\n\n(... {} chars truncated ...)\n\n{}",
&output[..half], &output[..output.ceil_char_boundary(half)],
output.len() - MAX_OUTPUT_CHARS, output.len() - MAX_OUTPUT_CHARS,
&output[output.len() - half..] &output[output.floor_char_boundary(output.len() - half)..]
) )
} }
} }

View File

@ -101,7 +101,7 @@ impl HttpRequestTool {
if text.len() > self.max_response_size { if text.len() > self.max_response_size {
format!( format!(
"{}\n\n... [Response truncated due to size limit] ...", "{}\n\n... [Response truncated due to size limit] ...",
&text[..self.max_response_size] &text[..text.ceil_char_boundary(self.max_response_size)]
) )
} else { } else {
text.to_string() text.to_string()

View File

@ -110,9 +110,12 @@ impl Tool for MemoryRecallTool {
} }
fn description(&self) -> &str { fn description(&self) -> &str {
"Search and retrieve entries from long-term memory. \ "Search and retrieve entries from long-term memory using keyword matching. \
Use this to recall previously stored facts, preferences, or conversation history. \ Use this to recall previously stored facts, preferences, or conversation history. \
Supports keyword search and optional time-range filtering." IMPORTANT: query must be a space-separated list of RELEVANT KEYWORDS (not a question or sentence). \
Use multiple synonymous or related terms to increase recall. \
Example: instead of 'what is the user location', use 'user location address city residence'. \
Supports optional time-range filtering via since/until (Unix ms)."
} }
fn read_only(&self) -> bool { fn read_only(&self) -> bool {
@ -125,7 +128,7 @@ impl Tool for MemoryRecallTool {
"properties": { "properties": {
"query": { "query": {
"type": "string", "type": "string",
"description": "Search query — keywords to match against memory keys and content." "description": "Space-separated KEYWORDS for memory search (NOT a natural language question). Use multiple related terms for better recall, e.g. 'address city location residence'."
}, },
"category": { "category": {
"type": "string", "type": "string",
@ -169,7 +172,7 @@ impl Tool for MemoryRecallTool {
.and_then(|v| v.as_i64()) .and_then(|v| v.as_i64())
.unwrap_or(chrono::Utc::now().timestamp_millis()); .unwrap_or(chrono::Utc::now().timestamp_millis());
self.memory self.memory
.recall_by_time(since, until, limit, category) .recall_by_time(since, until, Some(query), limit, category)
.await? .await?
} else { } else {
self.memory.recall(query, limit, category).await? self.memory.recall(query, limit, category).await?

View File

@ -53,7 +53,7 @@ impl WebFetchTool {
if text.len() > self.max_response_size { if text.len() > self.max_response_size {
format!( format!(
"{}\n\n... [Response truncated due to size limit] ...", "{}\n\n... [Response truncated due to size limit] ...",
&text[..self.max_response_size] &text[..text.ceil_char_boundary(self.max_response_size)]
) )
} else { } else {
text.to_string() text.to_string()