From b6f2de053dbd6b7463b3ad42850664644a9184c9 Mon Sep 17 00:00:00 2001 From: ooodc <549496103@qq.com> Date: Sat, 23 May 2026 18:38:34 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E7=A7=BB=E9=99=A4=E4=BC=9A?= =?UTF-8?q?=E8=AF=9D=E9=87=8D=E7=BD=AE=E9=80=BB=E8=BE=91=E4=B8=AD=E7=9A=84?= =?UTF-8?q?=20reset=5Fcutoff=5Fseq=20=E5=AD=97=E6=AE=B5=EF=BC=8C=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E4=BC=9A=E8=AF=9D=E7=AE=A1=E7=90=86=E5=92=8C=E5=8E=86?= =?UTF-8?q?=E5=8F=B2=E5=8E=8B=E7=BC=A9=E9=80=BB=E8=BE=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 11 +- docs/PERSISTENCE.md | 38 +--- src/command/handlers/save_session.rs | 1 - src/config/mod.rs | 3 - src/gateway/compaction.rs | 3 - src/gateway/execution.rs | 5 - src/gateway/mod.rs | 3 - src/gateway/runtime.rs | 4 - src/gateway/scheduled_agent_task_service.rs | 1 - src/gateway/session.rs | 28 --- src/gateway/session_factory.rs | 4 - src/gateway/session_history.rs | 46 ----- src/scheduler/mod.rs | 8 - src/storage/mod.rs | 181 +++----------------- src/storage/ports.rs | 9 - src/storage/records.rs | 1 - 16 files changed, 34 insertions(+), 312 deletions(-) diff --git a/README.md b/README.md index 415374c..832344c 100644 --- a/README.md +++ b/README.md @@ -249,13 +249,9 @@ PicoBot 会在 ~/.picobot/agent/AGENT.md 维护一份持久化 Agent 画像文 用户请求、关键标识符、文件路径、URL、工具调用、命令、结果、错误、决策、偏好和当前任务状态。 如果摘要调用失败,会退化为直接截断 transcript,而不会中断主流程。 9. 摘要结果会被包装成一条新的 system 消息,并打上 SYSTEM_CONTEXT_HISTORY_COMPACTION 标记,内容前缀为 [Compressed History]。 -10. 后台提交阶段不会直接修改旧消息,而是向消息表尾部追加一段“新的活动段”: - 依次写入保留的关键 system 消息、压缩摘要消息、最近保留的消息,以及在压缩快照之后新产生的 delta 消息。 -11. 提交成功后,sessions.reset_cutoff_seq 会被推进到压缩前的最大 seq。 - 这样旧消息仍然留在数据库里用于审计或全量导出,但默认恢复到运行时上下文时,只会加载新的活动段。 -12. 为避免并发覆盖,压缩提交前会检查快照是否过期: - 如果 reset_cutoff_seq 已变化,或者压缩期间又有更新导致快照不再匹配,本次压缩会跳过,不会覆盖较新的上下文。 -13. 压缩提交成功后,Session 会重新加载当前 chat 的活动历史,后续轮次看到的就是“关键 system 消息 + 压缩摘要 + 最近若干完整 turn”的新上下文。 +10. 后台提交阶段会删除旧消息,并追加新的活动段: + 依次写入保留的关键 system 消息、压缩摘要消息、最近保留的消息。 +11. 压缩提交成功后,Session 会重新加载当前 chat 的活动历史,后续轮次看到的就是"关键 system 消息 + 压缩摘要 + 最近若干完整 turn"的新上下文。 这套机制的目标不是简单删历史,而是把“远端历史变成可恢复摘要”,同时保证: @@ -695,7 +691,6 @@ cargo run -- agent CLI 中已实现的交互命令包括: - /new [title] - 创建新会话 -- /reset - 重置当前会话上下文 - /sessions - 列出当前通道的所有会话(支持跨通道隔离) - /use - 切换到指定会话 - /rename - 重命名当前会话 diff --git a/docs/PERSISTENCE.md b/docs/PERSISTENCE.md index 61bbc9f..2a23456 100644 --- a/docs/PERSISTENCE.md +++ b/docs/PERSISTENCE.md @@ -90,9 +90,8 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 | `archived_at` | `INTEGER` | 归档时间 | 非空表示会话已归档 | | `deleted_at` | `INTEGER` | 删除时间 | 预留字段,当前读取逻辑会过滤该字段,但当前删除实现是物理删除 | | `message_count` | `INTEGER NOT NULL DEFAULT 0` | 消息数 | 追加消息时自增,清空历史时重置 | -| `reset_cutoff_seq` | `INTEGER NOT NULL DEFAULT 0` | 逻辑重置切点 | `/reset` 后默认只恢复 `seq > reset_cutoff_seq` 的活动段 | -| `user_turn_count` | `INTEGER NOT NULL DEFAULT 0` | 当前活动段用户轮次数 | 只在追加 `role = user` 消息时递增,清空历史和 `/reset` 时归零 | -| `agent_prompt_reinjection_count` | `INTEGER NOT NULL DEFAULT 0` | AGENT.md 周期重注入次数 | 每完成一次“达到配置阈值后的下一轮前注入”就递增,清空历史和 `/reset` 时归零 | +| `user_turn_count` | `INTEGER NOT NULL DEFAULT 0` | 当前活动段用户轮次数 | 只在追加 `role = user` 消息时递增,清空历史时归零 | +| `agent_prompt_reinjection_count` | `INTEGER NOT NULL DEFAULT 0` | AGENT.md 周期重注入次数 | 每完成一次”达到配置阈值后的下一轮前注入”就递增,清空历史时归零 | 索引: @@ -224,12 +223,9 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 ### 7.3 读取历史 -`load_messages(session_id)` 会按 `seq ASC` 读取当前活动段历史,并把 JSON 字段反序列化回 `ChatMessage`。活动段的定义是: +`load_messages(session_id)` 会按 `seq ASC` 读取当前活动段历史,并把 JSON 字段反序列化回 `ChatMessage`。 -- 只返回 `seq > sessions.reset_cutoff_seq` 的消息 -- 因此 `/reset` 之后,旧消息仍然保留在数据库中,但不会默认回灌到运行时上下文 - -如果需要审计、导出或查看完整历史,应使用全量读取接口 `load_all_messages(session_id)`。 +如果需要审计、导出或查看历史,可使用全量读取接口 `load_all_messages(session_id)`(当前与 load_messages 相同)。 因此运行态恢复的是“当前活动段的逻辑顺序”,而不是简单按创建时间排序。只要 `seq` 连续,重放顺序就稳定。 @@ -286,30 +282,14 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 - 删除该会话在 `messages` 中的所有记录 - 将 `sessions.message_count` 重置为 0 -- 将 `sessions.reset_cutoff_seq` 重置为 0 - 将 `sessions.user_turn_count` 重置为 0 - 将 `sessions.agent_prompt_reinjection_count` 重置为 0 - 更新 `updated_at` 和 `last_active_at` - 保留会话本身 -这适合“保留会话入口,但丢弃聊天内容”的场景。 +这适合”保留会话入口,但丢弃聊天内容”的场景。 -### 8.4 逻辑重置 - -`reset_session(session_id)`: - -- 不删除 `messages` 中的任何记录 -- 将当前会话的 `MAX(seq)` 写入 `sessions.reset_cutoff_seq` -- 将 `sessions.user_turn_count` 重置为 0 -- 将 `sessions.agent_prompt_reinjection_count` 重置为 0 -- 更新 `updated_at` 和 `last_active_at` -- 后续默认恢复和发给模型的历史,只包含这次重置之后新增的消息 - -这适合“开始新对话,但保留完整历史以便审计或未来检索”的场景。 - -由于 AGENT.md 注入消息也会持久化,`/reset` 前的 Agent 设定消息仍会保留在完整历史中,但不会继续出现在新的活动段。下一次活动段首次加载时,系统会重新读取当前版本的 `~/.picobot/agent/AGENT.md`,并把它作为新的首条系统消息写入活动段。 - -### 8.5 删除会话 +### 8.4 删除会话 `delete_session(session_id)`: @@ -351,11 +331,8 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 - `sessions.deleted_at` - 当前查询逻辑兼容软删除 - 当前删除实现仍然是物理删除 -- `sessions.reset_cutoff_seq` - - 当前已用于实现 `/reset` 的非破坏性逻辑重置 - - 只影响默认恢复的活动段,不影响数据库中的全量历史 -这说明当前 schema 已经为“会话摘要”和“软删除”预留了演进空间,但并未完全落地。 +这说明当前 schema 已经为”会话摘要”和”软删除”预留了演进空间,但并未完全落地。 ## 11. 给维护者的快速判断指南 @@ -363,7 +340,6 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据 - 会话查不到:先看 `persistent_session_id` 是否和实际 `channel_name/chat_id` 一致 - 重启后没历史:检查 `ensure_chat_loaded()` 调用链,以及数据库文件路径是否正确 -- `/reset` 后重启又带回旧上下文:检查 `sessions.reset_cutoff_seq` 是否已写入,以及恢复路径是否走了活动段读取而不是全量读取 - 消息顺序不对:检查 `messages.seq` - 工具调用上下文异常:同时检查 `tool_calls_json` 和 `tool_call_id` - 会话列表里看不到记录:检查 `archived_at` 和 `include_archived` 参数 diff --git a/src/command/handlers/save_session.rs b/src/command/handlers/save_session.rs index 6180914..03e3224 100644 --- a/src/command/handlers/save_session.rs +++ b/src/command/handlers/save_session.rs @@ -740,7 +740,6 @@ mod tests { archived_at: None, deleted_at: None, message_count: 0, - reset_cutoff_seq: 0, user_turn_count: 0, agent_prompt_reinjection_count: 0, } diff --git a/src/config/mod.rs b/src/config/mod.rs index d8656fd..2423cc0 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -397,8 +397,6 @@ pub struct GatewayConfig { pub port: u16, #[serde(default)] pub show_tool_results: bool, - #[serde(default, rename = "chat_history_ttl_hours")] - pub chat_history_ttl_hours: Option<u64>, #[serde( default = "default_agent_prompt_reinject_every", rename = "agent_prompt_reinject_every" @@ -714,7 +712,6 @@ impl Default for GatewayConfig { host: default_gateway_host(), port: default_gateway_port(), show_tool_results: false, - chat_history_ttl_hours: Some(4), agent_prompt_reinject_every: default_agent_prompt_reinject_every(), max_concurrent_requests: default_max_concurrent_requests(), session_ttl_hours: Some(24), diff --git a/src/gateway/compaction.rs b/src/gateway/compaction.rs index ffe8e56..3f753c3 100644 --- a/src/gateway/compaction.rs +++ b/src/gateway/compaction.rs @@ -30,7 +30,6 @@ pub(crate) async fn schedule_background_history_compaction( ( session_guard.store(), session_guard.persistent_session_id(&chat_id), - session_record.reset_cutoff_seq, session_record.message_count, history, compressor, @@ -41,7 +40,6 @@ pub(crate) async fn schedule_background_history_compaction( let ( store, session_id, - expected_reset_cutoff_seq, snapshot_end_seq, history, compressor, @@ -61,7 +59,6 @@ pub(crate) async fn schedule_background_history_compaction( match compaction_result { Ok(Some(plan)) => match store.compact_active_history( &session_id, - expected_reset_cutoff_seq, snapshot_end_seq, &plan.preserved_system_messages, &plan.summary_message, diff --git a/src/gateway/execution.rs b/src/gateway/execution.rs index 24e8bcc..ecac20f 100644 --- a/src/gateway/execution.rs +++ b/src/gateway/execution.rs @@ -64,7 +64,6 @@ pub(crate) struct ScheduledExecutionRequest<'a> { pub(crate) prompt: &'a str, pub(crate) sender_id: &'a str, pub(crate) provider_config: LLMProviderConfig, - pub(crate) fresh_session: bool, pub(crate) system_prompt: Option<&'a str>, pub(crate) metadata: &'a HashMap<String, String>, } @@ -272,10 +271,6 @@ impl AgentExecutionService { session_guard.ensure_persistent_session(request.chat_id)?; - if request.fresh_session { - session_guard.reset_chat_context(request.chat_id)?; - } - session_guard.ensure_chat_loaded(request.chat_id)?; session_guard.ensure_agent_prompt_before_user_message(request.chat_id)?; diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index e84624a..0b13adb 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -63,8 +63,6 @@ impl GatewayState { provider_configs.insert(agent_name.clone(), config.get_provider_config(agent_name)?); } - // Chat history TTL from config (default 4 hours) - let chat_history_ttl_hours = config.gateway.chat_history_ttl_hours; let agent_prompt_reinject_every = config.gateway.agent_prompt_reinject_every; let show_tool_results = config.gateway.show_tool_results; @@ -85,7 +83,6 @@ impl GatewayState { std::collections::HashSet::new(), config.tools.task.clone(), config.memory_maintenance.clone(), - chat_history_ttl_hours, session_ttl_hours, )?; diff --git a/src/gateway/runtime.rs b/src/gateway/runtime.rs index cc3f2cf..e2576d1 100644 --- a/src/gateway/runtime.rs +++ b/src/gateway/runtime.rs @@ -35,7 +35,6 @@ pub(crate) fn build_session_manager( disabled_tools: HashSet<String>, task_config: TaskConfig, maintenance_config: MemoryMaintenanceConfig, - chat_history_ttl_hours: Option<u64>, session_ttl_hours: Option<u64>, ) -> Result<(SessionManager, Arc<dyn TaskRepository>), AgentError> { build_session_manager_with_sender( @@ -49,7 +48,6 @@ pub(crate) fn build_session_manager( disabled_tools, task_config, maintenance_config, - chat_history_ttl_hours, session_ttl_hours, ) } @@ -65,7 +63,6 @@ pub(crate) fn build_session_manager_with_sender( disabled_tools: HashSet<String>, task_config: TaskConfig, maintenance_config: MemoryMaintenanceConfig, - chat_history_ttl_hours: Option<u64>, session_ttl_hours: Option<u64>, ) -> Result<(SessionManager, Arc<dyn TaskRepository>), AgentError> { let store = Arc::new( @@ -147,7 +144,6 @@ pub(crate) fn build_session_manager_with_sender( conversations, skill_events, store.clone(), - chat_history_ttl_hours, ); let lifecycle = SessionLifecycleService::new(session_factory, session_ttl_hours); let cli_sessions = CliSessionService::new(store.clone()); diff --git a/src/gateway/scheduled_agent_task_service.rs b/src/gateway/scheduled_agent_task_service.rs index deb63d8..063d447 100644 --- a/src/gateway/scheduled_agent_task_service.rs +++ b/src/gateway/scheduled_agent_task_service.rs @@ -50,7 +50,6 @@ impl ScheduledAgentTaskService { prompt, sender_id: &sender_id, provider_config, - fresh_session: options.fresh_session, system_prompt: options.system_prompt.as_deref(), metadata: &options.metadata, }) diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 517a6e3..cbb027d 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -103,7 +103,6 @@ impl Session { skills: Arc<SkillRuntime>, store: Arc<SessionStore>, agent_prompt_reinject_every: u64, - chat_history_ttl_hours: Option<u64>, ) -> Result<Self, AgentError> { let conversations: Arc<dyn ConversationRepository> = store.clone(); let skill_events: Arc<dyn SkillEventRepository> = store.clone(); @@ -122,7 +121,6 @@ impl Session { agent_factory, conversations, skill_events, - chat_history_ttl_hours, store, ) .await @@ -136,7 +134,6 @@ impl Session { agent_factory: AgentFactory, conversations: Arc<dyn ConversationRepository>, skill_events: Arc<dyn SkillEventRepository>, - chat_history_ttl_hours: Option<u64>, store: Arc<SessionStore>, ) -> Result<Self, AgentError> { Ok(Self { @@ -151,7 +148,6 @@ impl Session { channel_name, conversations, skill_events, - chat_history_ttl_hours, ), store, }) @@ -267,10 +263,6 @@ impl Session { self.history.clear_chat_history(chat_id) } - pub fn reset_chat_context(&mut self, chat_id: &str) -> Result<(), AgentError> { - self.history.reset_chat_context(chat_id) - } - /// 将消息写入内存与持久化层(使用当前 topic) pub fn append_persisted_message( &mut self, @@ -502,7 +494,6 @@ impl SessionManager { disabled_tools: std::collections::HashSet<String>, task_config: crate::config::TaskConfig, maintenance_config: crate::config::MemoryMaintenanceConfig, - chat_history_ttl_hours: Option<u64>, session_ttl_hours: Option<u64>, ) -> Result<Self, AgentError> { super::runtime::build_session_manager( @@ -515,7 +506,6 @@ impl SessionManager { disabled_tools, task_config, maintenance_config, - chat_history_ttl_hours, session_ttl_hours, ) .map(|(session_manager, _)| session_manager) @@ -731,7 +721,6 @@ mod tests { skills, store, 100, - Some(4), ) .await .unwrap(); @@ -779,7 +768,6 @@ mod tests { skills, store.clone(), 100, - Some(4), ) .await .unwrap(); @@ -813,7 +801,6 @@ mod tests { store .compact_active_history( &session_id, - 0, snapshot_end_seq, &[], &ChatMessage::system("[Compressed History]\n\nsummary"), @@ -976,7 +963,6 @@ mod tests { HashSet::new(), crate::config::TaskConfig::default(), crate::config::MemoryMaintenanceConfig::default(), - Some(4), Some(24), ) .unwrap(); @@ -1029,7 +1015,6 @@ mod tests { HashSet::new(), crate::config::TaskConfig::default(), crate::config::MemoryMaintenanceConfig::default(), - Some(4), Some(24), ) .unwrap(); @@ -1041,7 +1026,6 @@ mod tests { "请规划今天工作", ScheduledAgentTaskOptions { agent: Some("planner".to_string()), - fresh_session: true, ..Default::default() }, ) @@ -1057,7 +1041,6 @@ mod tests { "请规划今天工作", ScheduledAgentTaskOptions { agent: Some("default".to_string()), - fresh_session: true, ..Default::default() }, ) @@ -1098,7 +1081,6 @@ mod tests { HashSet::new(), crate::config::TaskConfig::default(), crate::config::MemoryMaintenanceConfig::default(), - Some(4), Some(24), ) .unwrap(); @@ -1109,7 +1091,6 @@ mod tests { "chat-guard", "每小时执行以下流程:检查邮箱并同步待办", ScheduledAgentTaskOptions { - fresh_session: true, system_prompt: Some("你是邮箱待办同步助手。".to_string()), ..Default::default() }, @@ -1184,7 +1165,6 @@ mod tests { HashSet::new(), crate::config::TaskConfig::default(), crate::config::MemoryMaintenanceConfig::default(), - Some(4), Some(24), ) .unwrap(); @@ -1271,7 +1251,6 @@ mod tests { HashSet::new(), crate::config::TaskConfig::default(), crate::config::MemoryMaintenanceConfig::default(), - Some(4), Some(24), ) .unwrap(); @@ -1357,7 +1336,6 @@ mod tests { HashSet::new(), crate::config::TaskConfig::default(), crate::config::MemoryMaintenanceConfig::default(), - Some(4), Some(24), ) .unwrap(); @@ -1425,7 +1403,6 @@ mod tests { HashSet::new(), crate::config::TaskConfig::default(), crate::config::MemoryMaintenanceConfig::default(), - Some(4), Some(24), ) .unwrap(); @@ -1502,7 +1479,6 @@ mod tests { HashSet::new(), crate::config::TaskConfig::default(), crate::config::MemoryMaintenanceConfig::default(), - Some(4), Some(24), ) .unwrap(); @@ -1566,7 +1542,6 @@ mod tests { HashSet::new(), crate::config::TaskConfig::default(), crate::config::MemoryMaintenanceConfig::default(), - Some(4), Some(24), ) .unwrap(); @@ -1773,7 +1748,6 @@ mod tests { skills, store.clone(), 100, - Some(4), ) .await .unwrap(); @@ -1813,7 +1787,6 @@ mod tests { skills, store.clone(), 100, - Some(4), ) .await .unwrap(); @@ -1886,7 +1859,6 @@ mod tests { skills, store.clone(), 0, - Some(4), ) .await .unwrap(); diff --git a/src/gateway/session_factory.rs b/src/gateway/session_factory.rs index c03eeb5..0993c4e 100644 --- a/src/gateway/session_factory.rs +++ b/src/gateway/session_factory.rs @@ -19,7 +19,6 @@ pub(crate) struct SessionFactory { conversations: Arc<dyn ConversationRepository>, skill_events: Arc<dyn SkillEventRepository>, store: Arc<SessionStore>, - chat_history_ttl_hours: Option<u64>, } impl SessionFactory { @@ -30,7 +29,6 @@ impl SessionFactory { conversations: Arc<dyn ConversationRepository>, skill_events: Arc<dyn SkillEventRepository>, store: Arc<SessionStore>, - chat_history_ttl_hours: Option<u64>, ) -> Self { Self { provider_config, @@ -39,7 +37,6 @@ impl SessionFactory { conversations, skill_events, store, - chat_history_ttl_hours, } } @@ -56,7 +53,6 @@ impl SessionFactory { self.agent_factory.clone(), self.conversations.clone(), self.skill_events.clone(), - self.chat_history_ttl_hours, self.store.clone(), ) .await diff --git a/src/gateway/session_history.rs b/src/gateway/session_history.rs index 2a8b61e..f6c4dc5 100644 --- a/src/gateway/session_history.rs +++ b/src/gateway/session_history.rs @@ -15,13 +15,6 @@ fn preview_text(content: &str, max_chars: usize) -> String { preview.replace('\n', "\\n") } -fn current_timestamp() -> i64 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs() as i64 -} - pub(crate) struct SessionHistory { channel_name: String, chat_histories: HashMap<String, Vec<ChatMessage>>, @@ -30,7 +23,6 @@ pub(crate) struct SessionHistory { compression_in_flight: HashSet<String>, conversations: Arc<dyn ConversationRepository>, skill_events: Arc<dyn SkillEventRepository>, - chat_history_ttl_hours: Option<u64>, } impl SessionHistory { @@ -38,7 +30,6 @@ impl SessionHistory { channel_name: impl Into<String>, conversations: Arc<dyn ConversationRepository>, skill_events: Arc<dyn SkillEventRepository>, - chat_history_ttl_hours: Option<u64>, ) -> Self { Self { channel_name: channel_name.into(), @@ -48,7 +39,6 @@ impl SessionHistory { compression_in_flight: HashSet::new(), conversations, skill_events, - chat_history_ttl_hours, } } @@ -70,29 +60,6 @@ impl SessionHistory { chat_id: &str, topic_id: Option<&str>, ) -> Result<(), AgentError> { - // 获取 session 记录(用于检查最后活跃时间) - let session_record = self.ensure_persistent_session(chat_id)?; - - // 检查是否超时 - if let Some(ttl_hours) = self.chat_history_ttl_hours { - if ttl_hours > 0 { - let now = current_timestamp(); - let elapsed_hours = (now - session_record.last_active_at) / 3600; - if elapsed_hours >= ttl_hours as i64 { - tracing::info!( - channel = %self.channel_name, - chat_id = %chat_id, - elapsed_hours = elapsed_hours, - ttl_hours = ttl_hours, - "Chat history expired, resetting context" - ); - // 重置会话上下文(清空内存历史,但保留数据库记录) - self.reset_chat_context(chat_id)?; - } - } - } - - // 原有逻辑 if self.chat_histories.contains_key(chat_id) { return Ok(()); } @@ -178,19 +145,6 @@ impl SessionHistory { .map_err(|err| AgentError::Other(format!("clear history persistence error: {}", err))) } - pub(crate) fn reset_chat_context(&mut self, chat_id: &str) -> Result<(), AgentError> { - if let Some(history) = self.chat_histories.get_mut(chat_id) { - let len = history.len(); - history.clear(); - #[cfg(debug_assertions)] - tracing::debug!(chat_id = %chat_id, previous_len = len, "Chat history reset in memory"); - } - - self.conversations - .reset_session(&self.persistent_session_id(chat_id)) - .map_err(|err| AgentError::Other(format!("reset history persistence error: {}", err))) - } - pub(crate) fn append_persisted_message( &mut self, chat_id: &str, diff --git a/src/scheduler/mod.rs b/src/scheduler/mod.rs index 12675bf..5593bef 100644 --- a/src/scheduler/mod.rs +++ b/src/scheduler/mod.rs @@ -20,7 +20,6 @@ use crate::storage::{ #[derive(Debug, Clone, Default)] pub struct ScheduledAgentTaskOptions { pub sender_id: Option<String>, - pub fresh_session: bool, pub system_prompt: Option<String>, pub metadata: HashMap<String, String>, pub agent: Option<String>, @@ -1019,11 +1018,6 @@ fn parse_scheduled_agent_task_options( .get("sender_id") .and_then(|value| value.as_str()) .map(ToString::to_string); - let fresh_session = job - .payload - .get("fresh_session") - .and_then(|value| value.as_bool()) - .unwrap_or(false); let system_prompt = job .payload .get("system_prompt") @@ -1038,7 +1032,6 @@ fn parse_scheduled_agent_task_options( Ok(ScheduledAgentTaskOptions { sender_id, - fresh_session, system_prompt, metadata, agent, @@ -1219,7 +1212,6 @@ mod agent_task_tests { let options = parse_scheduled_agent_task_options(&job).unwrap(); assert_eq!(options.agent.as_deref(), Some("planner")); assert_eq!(options.sender_id.as_deref(), Some("scheduler-bot")); - assert!(options.fresh_session); assert_eq!(options.system_prompt.as_deref(), Some("你是日报助手")); assert_eq!( options.metadata.get("job_type").map(String::as_str), diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 2474a17..b22a0b2 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -66,7 +66,6 @@ impl SessionStore { archived_at INTEGER, deleted_at INTEGER, message_count INTEGER NOT NULL DEFAULT 0, - reset_cutoff_seq INTEGER NOT NULL DEFAULT 0, user_turn_count INTEGER NOT NULL DEFAULT 0, agent_prompt_reinjection_count INTEGER NOT NULL DEFAULT 0 ); @@ -248,8 +247,8 @@ impl SessionStore { INSERT INTO sessions ( id, title, channel_name, chat_id, summary, created_at, updated_at, last_active_at, archived_at, deleted_at, message_count, - reset_cutoff_seq, user_turn_count, agent_prompt_reinjection_count - ) VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?5, ?5, NULL, NULL, 0, 0, 0, 0) + user_turn_count, agent_prompt_reinjection_count + ) VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?5, ?5, NULL, NULL, 0, 0, 0) ", params![&session_id, title, channel_name, id, now], )?; @@ -291,8 +290,8 @@ impl SessionStore { INSERT INTO sessions ( id, title, channel_name, chat_id, summary, created_at, updated_at, last_active_at, archived_at, deleted_at, message_count, - reset_cutoff_seq, user_turn_count, agent_prompt_reinjection_count - ) VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?5, ?5, NULL, NULL, 0, 0, 0, 0) + user_turn_count, agent_prompt_reinjection_count + ) VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?5, ?5, NULL, NULL, 0, 0, 0) ", params![session_id, title, channel_name, chat_id, now], )?; @@ -308,7 +307,7 @@ impl SessionStore { " SELECT id, title, channel_name, chat_id, summary, created_at, updated_at, last_active_at, - archived_at, deleted_at, message_count, reset_cutoff_seq, + archived_at, deleted_at, message_count, user_turn_count, agent_prompt_reinjection_count FROM sessions WHERE id = ?1 AND deleted_at IS NULL @@ -330,7 +329,7 @@ impl SessionStore { " SELECT id, title, channel_name, chat_id, summary, created_at, updated_at, last_active_at, - archived_at, deleted_at, message_count, reset_cutoff_seq, + archived_at, deleted_at, message_count, user_turn_count, agent_prompt_reinjection_count FROM sessions WHERE channel_name = ?1 @@ -493,7 +492,6 @@ impl SessionStore { SET message_count = 0, updated_at = ?2, last_active_at = ?2, - reset_cutoff_seq = 0, user_turn_count = 0, agent_prompt_reinjection_count = 0 WHERE id = ?1 AND deleted_at IS NULL @@ -503,35 +501,6 @@ impl SessionStore { Ok(()) } - pub fn reset_session(&self, session_id: &str) -> Result<(), StorageError> { - let now = current_timestamp(); - let conn = self.conn.lock().expect("session db mutex poisoned"); - let tx = conn.unchecked_transaction()?; - - let cutoff_seq: i64 = tx.query_row( - "SELECT COALESCE(MAX(seq), 0) FROM messages WHERE session_id = ?1", - params![session_id], - |row| row.get(0), - )?; - - tx.execute( - " - UPDATE sessions - SET reset_cutoff_seq = ?2, - updated_at = ?3, - last_active_at = ?3, - archived_at = NULL, - user_turn_count = 0, - agent_prompt_reinjection_count = 0 - WHERE id = ?1 AND deleted_at IS NULL - ", - params![session_id, cutoff_seq, now], - )?; - - tx.commit()?; - Ok(()) - } - pub fn append_message( &self, session_id: &str, @@ -607,7 +576,6 @@ impl SessionStore { pub fn compact_active_history( &self, session_id: &str, - expected_reset_cutoff_seq: i64, snapshot_end_seq: i64, preserved_system_messages: &[ChatMessage], summary_message: &ChatMessage, @@ -616,18 +584,13 @@ impl SessionStore { let conn = self.conn.lock().expect("session db mutex poisoned"); let tx = conn.unchecked_transaction()?; - let current_cutoff = active_reset_cutoff(&tx, session_id)?; - if current_cutoff != expected_reset_cutoff_seq { - return Ok(false); - } - let current_max_seq: i64 = tx.query_row( "SELECT COALESCE(MAX(seq), 0) FROM messages WHERE session_id = ?1", params![session_id], |row| row.get(0), )?; - if snapshot_end_seq <= current_cutoff || snapshot_end_seq > current_max_seq { + if snapshot_end_seq > current_max_seq { return Ok(false); } @@ -660,20 +623,24 @@ impl SessionStore { inserted_count += 1; } + // Delete all old messages (including delta messages that were just re-inserted) + tx.execute( + "DELETE FROM messages WHERE session_id = ?1 AND seq <= ?2", + params![session_id, current_max_seq], + )?; + tx.execute( " UPDATE sessions - SET reset_cutoff_seq = ?2, - message_count = message_count + ?3, - user_turn_count = ?4, - updated_at = ?5, - last_active_at = ?5, + SET message_count = ?2, + user_turn_count = ?3, + updated_at = ?4, + last_active_at = ?4, archived_at = NULL WHERE id = ?1 AND deleted_at IS NULL ", params![ session_id, - current_max_seq, inserted_count, active_user_turn_count, now, @@ -1309,8 +1276,7 @@ impl SessionStore { pub fn load_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> { let conn = self.conn.lock().expect("session db mutex poisoned"); - let cutoff_seq = active_reset_cutoff(&conn, session_id)?; - load_messages_after(&conn, session_id, cutoff_seq) + load_messages_after(&conn, session_id, 0) } pub fn load_messages_for_topic(&self, topic_id: &str) -> Result<Vec<ChatMessage>, StorageError> { @@ -1381,14 +1347,13 @@ impl SessionStore { pub fn count_active_user_messages(&self, session_id: &str) -> Result<i64, StorageError> { let conn = self.conn.lock().expect("session db mutex poisoned"); - let cutoff_seq = active_reset_cutoff(&conn, session_id)?; conn.query_row( " SELECT COUNT(*) FROM messages - WHERE session_id = ?1 AND seq > ?2 AND role = 'user' + WHERE session_id = ?1 AND role = 'user' ", - params![session_id, cutoff_seq], + params![session_id], |row| row.get(0), ) .map_err(StorageError::from) @@ -1422,9 +1387,8 @@ fn map_session_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord archived_at: row.get(8)?, deleted_at: row.get(9)?, message_count: row.get(10)?, - reset_cutoff_seq: row.get(11)?, - user_turn_count: row.get(12)?, - agent_prompt_reinjection_count: row.get(13)?, + user_turn_count: row.get(11)?, + agent_prompt_reinjection_count: row.get(12)?, }) } @@ -1510,13 +1474,6 @@ fn map_scheduler_job_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<Schedul } fn ensure_sessions_schema(conn: &Connection) -> Result<(), StorageError> { - if !has_column(conn, "sessions", "reset_cutoff_seq")? { - add_column_if_missing( - conn, - "ALTER TABLE sessions ADD COLUMN reset_cutoff_seq INTEGER NOT NULL DEFAULT 0", - )?; - } - if !has_column(conn, "sessions", "user_turn_count")? { add_column_if_missing( conn, @@ -1643,18 +1600,6 @@ fn add_column_if_missing(conn: &Connection, sql: &str) -> Result<(), StorageErro } } -fn active_reset_cutoff(conn: &Connection, session_id: &str) -> Result<i64, StorageError> { - let cutoff = conn - .query_row( - "SELECT reset_cutoff_seq FROM sessions WHERE id = ?1 AND deleted_at IS NULL", - params![session_id], - |row| row.get(0), - ) - .optional()?; - - Ok(cutoff.unwrap_or(0)) -} - fn insert_message_with_seq( conn: &rusqlite::Transaction<'_>, session_id: &str, @@ -1875,7 +1820,6 @@ mod tests { assert_eq!(session.channel_name, "cli"); assert_eq!(session.chat_id, session.id); assert_eq!(session.message_count, 0); - assert_eq!(session.reset_cutoff_seq, 0); assert_eq!(session.user_turn_count, 0); assert_eq!(session.agent_prompt_reinjection_count, 0); @@ -1887,7 +1831,6 @@ mod tests { let stored = store.get_session(&session.id).unwrap().unwrap(); assert_eq!(stored.message_count, 2); assert!(stored.archived_at.is_none()); - assert_eq!(stored.reset_cutoff_seq, 0); assert_eq!(stored.user_turn_count, 1); assert_eq!(stored.agent_prompt_reinjection_count, 0); @@ -1982,44 +1925,7 @@ mod tests { } #[test] - fn test_reset_session_preserves_full_history_and_hides_active_history() { - let store = SessionStore::in_memory().unwrap(); - let session = store.create_cli_session(Some("reset")).unwrap(); - - store - .append_message(&session.id, &ChatMessage::user("before")) - .unwrap(); - store - .append_message(&session.id, &ChatMessage::assistant("context")) - .unwrap(); - store.reset_session(&session.id).unwrap(); - - let stored = store.get_session(&session.id).unwrap().unwrap(); - assert_eq!(stored.reset_cutoff_seq, 2); - assert_eq!(stored.user_turn_count, 0); - assert_eq!(stored.agent_prompt_reinjection_count, 0); - - let active_messages = store.load_messages(&session.id).unwrap(); - assert!(active_messages.is_empty()); - - let all_messages = store.load_all_messages(&session.id).unwrap(); - assert_eq!(all_messages.len(), 2); - assert_eq!(all_messages[0].content, "before"); - assert_eq!(all_messages[1].content, "context"); - - store - .append_message(&session.id, &ChatMessage::user("after")) - .unwrap(); - let active_messages = store.load_messages(&session.id).unwrap(); - assert_eq!(active_messages.len(), 1); - assert_eq!(active_messages[0].content, "after"); - - let stored = store.get_session(&session.id).unwrap().unwrap(); - assert_eq!(stored.user_turn_count, 1); - } - - #[test] - fn test_schema_migration_adds_reset_cutoff_column() { + fn test_schema_migration_adds_user_turn_and_reinjection_columns() { let conn = Connection::open_in_memory().unwrap(); conn.execute_batch( " @@ -2057,7 +1963,6 @@ mod tests { let store = SessionStore::from_connection(conn).unwrap(); let session = store.create_cli_session(Some("migrated")).unwrap(); - assert_eq!(session.reset_cutoff_seq, 0); assert_eq!(session.user_turn_count, 0); assert_eq!(session.agent_prompt_reinjection_count, 0); } @@ -2105,42 +2010,6 @@ mod tests { assert!(has_column(&conn, "messages", "reasoning_content").unwrap()); } - #[test] - fn test_count_active_user_messages_respects_reset_cutoff_seq() { - let store = SessionStore::in_memory().unwrap(); - let session = store.create_cli_session(Some("count-users")).unwrap(); - - store - .append_message(&session.id, &ChatMessage::system("agent")) - .unwrap(); - store - .append_message(&session.id, &ChatMessage::user("u1")) - .unwrap(); - store - .append_message(&session.id, &ChatMessage::assistant("a1")) - .unwrap(); - store - .append_message(&session.id, &ChatMessage::user("u2")) - .unwrap(); - - assert_eq!(store.count_active_user_messages(&session.id).unwrap(), 2); - - store.reset_session(&session.id).unwrap(); - assert_eq!(store.count_active_user_messages(&session.id).unwrap(), 0); - - store - .append_message(&session.id, &ChatMessage::system("agent-again")) - .unwrap(); - store - .append_message(&session.id, &ChatMessage::user("u3")) - .unwrap(); - store - .append_message(&session.id, &ChatMessage::user("u4")) - .unwrap(); - - assert_eq!(store.count_active_user_messages(&session.id).unwrap(), 2); - } - #[test] fn test_compact_active_history_rebuilds_active_segment_with_delta_messages() { let store = SessionStore::in_memory().unwrap(); @@ -2185,7 +2054,6 @@ mod tests { let compacted = store .compact_active_history( &session.id, - 0, snapshot_end_seq, &preserved_system_messages, &summary_message, @@ -2214,11 +2082,10 @@ mod tests { assert_eq!(active_messages[9].content, "a5"); let stored = store.get_session(&session.id).unwrap().unwrap(); - assert_eq!(stored.reset_cutoff_seq, 11); assert_eq!(stored.user_turn_count, 4); let all_messages = store.load_all_messages(&session.id).unwrap(); - assert_eq!(all_messages.len(), 21); + assert_eq!(all_messages.len(), 10); } #[test] diff --git a/src/storage/ports.rs b/src/storage/ports.rs index d27cb2c..acced9a 100644 --- a/src/storage/ports.rs +++ b/src/storage/ports.rs @@ -35,12 +35,9 @@ pub trait ConversationRepository: Send + Sync + 'static { fn clear_messages(&self, session_id: &str) -> Result<(), StorageError>; - fn reset_session(&self, session_id: &str) -> Result<(), StorageError>; - fn compact_active_history( &self, session_id: &str, - expected_reset_cutoff_seq: i64, snapshot_end_seq: i64, preserved_system_messages: &[ChatMessage], summary_message: &ChatMessage, @@ -185,14 +182,9 @@ impl ConversationRepository for super::SessionStore { super::SessionStore::clear_messages(self, session_id) } - fn reset_session(&self, session_id: &str) -> Result<(), StorageError> { - super::SessionStore::reset_session(self, session_id) - } - fn compact_active_history( &self, session_id: &str, - expected_reset_cutoff_seq: i64, snapshot_end_seq: i64, preserved_system_messages: &[ChatMessage], summary_message: &ChatMessage, @@ -201,7 +193,6 @@ impl ConversationRepository for super::SessionStore { super::SessionStore::compact_active_history( self, session_id, - expected_reset_cutoff_seq, snapshot_end_seq, preserved_system_messages, summary_message, diff --git a/src/storage/records.rs b/src/storage/records.rs index 9fa852e..dce7f4c 100644 --- a/src/storage/records.rs +++ b/src/storage/records.rs @@ -23,7 +23,6 @@ pub struct SessionRecord { pub archived_at: Option<i64>, pub deleted_at: Option<i64>, pub message_count: i64, - pub reset_cutoff_seq: i64, pub user_turn_count: i64, pub agent_prompt_reinjection_count: i64, }