refactor: 移除会话重置逻辑中的 reset_cutoff_seq 字段,优化会话管理和历史压缩逻辑

This commit is contained in:
ooodc 2026-05-23 18:38:34 +08:00
parent b4ef56803f
commit b6f2de053d
16 changed files with 34 additions and 312 deletions

View File

@ -249,13 +249,9 @@ PicoBot 会在 ~/.picobot/agent/AGENT.md 维护一份持久化 Agent 画像文
用户请求、关键标识符、文件路径、URL、工具调用、命令、结果、错误、决策、偏好和当前任务状态。
如果摘要调用失败,会退化为直接截断 transcript而不会中断主流程。
9. 摘要结果会被包装成一条新的 system 消息,并打上 SYSTEM_CONTEXT_HISTORY_COMPACTION 标记,内容前缀为 [Compressed History]。
10. 后台提交阶段不会直接修改旧消息,而是向消息表尾部追加一段“新的活动段”:
依次写入保留的关键 system 消息、压缩摘要消息、最近保留的消息,以及在压缩快照之后新产生的 delta 消息。
11. 提交成功后sessions.reset_cutoff_seq 会被推进到压缩前的最大 seq。
这样旧消息仍然留在数据库里用于审计或全量导出,但默认恢复到运行时上下文时,只会加载新的活动段。
12. 为避免并发覆盖,压缩提交前会检查快照是否过期:
如果 reset_cutoff_seq 已变化,或者压缩期间又有更新导致快照不再匹配,本次压缩会跳过,不会覆盖较新的上下文。
13. 压缩提交成功后Session 会重新加载当前 chat 的活动历史,后续轮次看到的就是“关键 system 消息 + 压缩摘要 + 最近若干完整 turn”的新上下文。
10. 后台提交阶段会删除旧消息,并追加新的活动段:
依次写入保留的关键 system 消息、压缩摘要消息、最近保留的消息。
11. 压缩提交成功后Session 会重新加载当前 chat 的活动历史,后续轮次看到的就是"关键 system 消息 + 压缩摘要 + 最近若干完整 turn"的新上下文。
这套机制的目标不是简单删历史,而是把“远端历史变成可恢复摘要”,同时保证:
@ -695,7 +691,6 @@ cargo run -- agent
CLI 中已实现的交互命令包括:
- /new [title] - 创建新会话
- /reset - 重置当前会话上下文
- /sessions - 列出当前通道的所有会话(支持跨通道隔离)
- /use <session> - 切换到指定会话
- /rename <title> - 重命名当前会话

View File

@ -90,9 +90,8 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
| `archived_at` | `INTEGER` | 归档时间 | 非空表示会话已归档 |
| `deleted_at` | `INTEGER` | 删除时间 | 预留字段,当前读取逻辑会过滤该字段,但当前删除实现是物理删除 |
| `message_count` | `INTEGER NOT NULL DEFAULT 0` | 消息数 | 追加消息时自增,清空历史时重置 |
| `reset_cutoff_seq` | `INTEGER NOT NULL DEFAULT 0` | 逻辑重置切点 | `/reset` 后默认只恢复 `seq > reset_cutoff_seq` 的活动段 |
| `user_turn_count` | `INTEGER NOT NULL DEFAULT 0` | 当前活动段用户轮次数 | 只在追加 `role = user` 消息时递增,清空历史和 `/reset` 时归零 |
| `agent_prompt_reinjection_count` | `INTEGER NOT NULL DEFAULT 0` | AGENT.md 周期重注入次数 | 每完成一次“达到配置阈值后的下一轮前注入”就递增,清空历史和 `/reset` 时归零 |
| `user_turn_count` | `INTEGER NOT NULL DEFAULT 0` | 当前活动段用户轮次数 | 只在追加 `role = user` 消息时递增,清空历史时归零 |
| `agent_prompt_reinjection_count` | `INTEGER NOT NULL DEFAULT 0` | AGENT.md 周期重注入次数 | 每完成一次”达到配置阈值后的下一轮前注入”就递增,清空历史时归零 |
索引:
@ -224,12 +223,9 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
### 7.3 读取历史
`load_messages(session_id)` 会按 `seq ASC` 读取当前活动段历史,并把 JSON 字段反序列化回 `ChatMessage`活动段的定义是:
`load_messages(session_id)` 会按 `seq ASC` 读取当前活动段历史,并把 JSON 字段反序列化回 `ChatMessage`
- 只返回 `seq > sessions.reset_cutoff_seq` 的消息
- 因此 `/reset` 之后,旧消息仍然保留在数据库中,但不会默认回灌到运行时上下文
如果需要审计、导出或查看完整历史,应使用全量读取接口 `load_all_messages(session_id)`
如果需要审计、导出或查看历史,可使用全量读取接口 `load_all_messages(session_id)`(当前与 load_messages 相同)。
因此运行态恢复的是“当前活动段的逻辑顺序”,而不是简单按创建时间排序。只要 `seq` 连续,重放顺序就稳定。
@ -286,30 +282,14 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
- 删除该会话在 `messages` 中的所有记录
- 将 `sessions.message_count` 重置为 0
- 将 `sessions.reset_cutoff_seq` 重置为 0
- 将 `sessions.user_turn_count` 重置为 0
- 将 `sessions.agent_prompt_reinjection_count` 重置为 0
- 更新 `updated_at``last_active_at`
- 保留会话本身
这适合保留会话入口,但丢弃聊天内容”的场景。
这适合保留会话入口,但丢弃聊天内容”的场景。
### 8.4 逻辑重置
`reset_session(session_id)`
- 不删除 `messages` 中的任何记录
- 将当前会话的 `MAX(seq)` 写入 `sessions.reset_cutoff_seq`
- 将 `sessions.user_turn_count` 重置为 0
- 将 `sessions.agent_prompt_reinjection_count` 重置为 0
- 更新 `updated_at``last_active_at`
- 后续默认恢复和发给模型的历史,只包含这次重置之后新增的消息
这适合“开始新对话,但保留完整历史以便审计或未来检索”的场景。
由于 AGENT.md 注入消息也会持久化,`/reset` 前的 Agent 设定消息仍会保留在完整历史中,但不会继续出现在新的活动段。下一次活动段首次加载时,系统会重新读取当前版本的 `~/.picobot/agent/AGENT.md`,并把它作为新的首条系统消息写入活动段。
### 8.5 删除会话
### 8.4 删除会话
`delete_session(session_id)`
@ -351,11 +331,8 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
- `sessions.deleted_at`
- 当前查询逻辑兼容软删除
- 当前删除实现仍然是物理删除
- `sessions.reset_cutoff_seq`
- 当前已用于实现 `/reset` 的非破坏性逻辑重置
- 只影响默认恢复的活动段,不影响数据库中的全量历史
这说明当前 schema 已经为“会话摘要”和“软删除”预留了演进空间,但并未完全落地。
这说明当前 schema 已经为”会话摘要”和”软删除”预留了演进空间,但并未完全落地。
## 11. 给维护者的快速判断指南
@ -363,7 +340,6 @@ PicoBot 使用 SQLite 持久化会话和消息历史,当前只有一份数据
- 会话查不到:先看 `persistent_session_id` 是否和实际 `channel_name/chat_id` 一致
- 重启后没历史:检查 `ensure_chat_loaded()` 调用链,以及数据库文件路径是否正确
- `/reset` 后重启又带回旧上下文:检查 `sessions.reset_cutoff_seq` 是否已写入,以及恢复路径是否走了活动段读取而不是全量读取
- 消息顺序不对:检查 `messages.seq`
- 工具调用上下文异常:同时检查 `tool_calls_json``tool_call_id`
- 会话列表里看不到记录:检查 `archived_at``include_archived` 参数

View File

@ -740,7 +740,6 @@ mod tests {
archived_at: None,
deleted_at: None,
message_count: 0,
reset_cutoff_seq: 0,
user_turn_count: 0,
agent_prompt_reinjection_count: 0,
}

View File

@ -397,8 +397,6 @@ pub struct GatewayConfig {
pub port: u16,
#[serde(default)]
pub show_tool_results: bool,
#[serde(default, rename = "chat_history_ttl_hours")]
pub chat_history_ttl_hours: Option<u64>,
#[serde(
default = "default_agent_prompt_reinject_every",
rename = "agent_prompt_reinject_every"
@ -714,7 +712,6 @@ impl Default for GatewayConfig {
host: default_gateway_host(),
port: default_gateway_port(),
show_tool_results: false,
chat_history_ttl_hours: Some(4),
agent_prompt_reinject_every: default_agent_prompt_reinject_every(),
max_concurrent_requests: default_max_concurrent_requests(),
session_ttl_hours: Some(24),

View File

@ -30,7 +30,6 @@ pub(crate) async fn schedule_background_history_compaction(
(
session_guard.store(),
session_guard.persistent_session_id(&chat_id),
session_record.reset_cutoff_seq,
session_record.message_count,
history,
compressor,
@ -41,7 +40,6 @@ pub(crate) async fn schedule_background_history_compaction(
let (
store,
session_id,
expected_reset_cutoff_seq,
snapshot_end_seq,
history,
compressor,
@ -61,7 +59,6 @@ pub(crate) async fn schedule_background_history_compaction(
match compaction_result {
Ok(Some(plan)) => match store.compact_active_history(
&session_id,
expected_reset_cutoff_seq,
snapshot_end_seq,
&plan.preserved_system_messages,
&plan.summary_message,

View File

@ -64,7 +64,6 @@ pub(crate) struct ScheduledExecutionRequest<'a> {
pub(crate) prompt: &'a str,
pub(crate) sender_id: &'a str,
pub(crate) provider_config: LLMProviderConfig,
pub(crate) fresh_session: bool,
pub(crate) system_prompt: Option<&'a str>,
pub(crate) metadata: &'a HashMap<String, String>,
}
@ -272,10 +271,6 @@ impl AgentExecutionService {
session_guard.ensure_persistent_session(request.chat_id)?;
if request.fresh_session {
session_guard.reset_chat_context(request.chat_id)?;
}
session_guard.ensure_chat_loaded(request.chat_id)?;
session_guard.ensure_agent_prompt_before_user_message(request.chat_id)?;

View File

@ -63,8 +63,6 @@ impl GatewayState {
provider_configs.insert(agent_name.clone(), config.get_provider_config(agent_name)?);
}
// Chat history TTL from config (default 4 hours)
let chat_history_ttl_hours = config.gateway.chat_history_ttl_hours;
let agent_prompt_reinject_every = config.gateway.agent_prompt_reinject_every;
let show_tool_results = config.gateway.show_tool_results;
@ -85,7 +83,6 @@ impl GatewayState {
std::collections::HashSet::new(),
config.tools.task.clone(),
config.memory_maintenance.clone(),
chat_history_ttl_hours,
session_ttl_hours,
)?;

View File

@ -35,7 +35,6 @@ pub(crate) fn build_session_manager(
disabled_tools: HashSet<String>,
task_config: TaskConfig,
maintenance_config: MemoryMaintenanceConfig,
chat_history_ttl_hours: Option<u64>,
session_ttl_hours: Option<u64>,
) -> Result<(SessionManager, Arc<dyn TaskRepository>), AgentError> {
build_session_manager_with_sender(
@ -49,7 +48,6 @@ pub(crate) fn build_session_manager(
disabled_tools,
task_config,
maintenance_config,
chat_history_ttl_hours,
session_ttl_hours,
)
}
@ -65,7 +63,6 @@ pub(crate) fn build_session_manager_with_sender(
disabled_tools: HashSet<String>,
task_config: TaskConfig,
maintenance_config: MemoryMaintenanceConfig,
chat_history_ttl_hours: Option<u64>,
session_ttl_hours: Option<u64>,
) -> Result<(SessionManager, Arc<dyn TaskRepository>), AgentError> {
let store = Arc::new(
@ -147,7 +144,6 @@ pub(crate) fn build_session_manager_with_sender(
conversations,
skill_events,
store.clone(),
chat_history_ttl_hours,
);
let lifecycle = SessionLifecycleService::new(session_factory, session_ttl_hours);
let cli_sessions = CliSessionService::new(store.clone());

View File

@ -50,7 +50,6 @@ impl ScheduledAgentTaskService {
prompt,
sender_id: &sender_id,
provider_config,
fresh_session: options.fresh_session,
system_prompt: options.system_prompt.as_deref(),
metadata: &options.metadata,
})

View File

@ -103,7 +103,6 @@ impl Session {
skills: Arc<SkillRuntime>,
store: Arc<SessionStore>,
agent_prompt_reinject_every: u64,
chat_history_ttl_hours: Option<u64>,
) -> Result<Self, AgentError> {
let conversations: Arc<dyn ConversationRepository> = store.clone();
let skill_events: Arc<dyn SkillEventRepository> = store.clone();
@ -122,7 +121,6 @@ impl Session {
agent_factory,
conversations,
skill_events,
chat_history_ttl_hours,
store,
)
.await
@ -136,7 +134,6 @@ impl Session {
agent_factory: AgentFactory,
conversations: Arc<dyn ConversationRepository>,
skill_events: Arc<dyn SkillEventRepository>,
chat_history_ttl_hours: Option<u64>,
store: Arc<SessionStore>,
) -> Result<Self, AgentError> {
Ok(Self {
@ -151,7 +148,6 @@ impl Session {
channel_name,
conversations,
skill_events,
chat_history_ttl_hours,
),
store,
})
@ -267,10 +263,6 @@ impl Session {
self.history.clear_chat_history(chat_id)
}
pub fn reset_chat_context(&mut self, chat_id: &str) -> Result<(), AgentError> {
self.history.reset_chat_context(chat_id)
}
/// 将消息写入内存与持久化层(使用当前 topic
pub fn append_persisted_message(
&mut self,
@ -502,7 +494,6 @@ impl SessionManager {
disabled_tools: std::collections::HashSet<String>,
task_config: crate::config::TaskConfig,
maintenance_config: crate::config::MemoryMaintenanceConfig,
chat_history_ttl_hours: Option<u64>,
session_ttl_hours: Option<u64>,
) -> Result<Self, AgentError> {
super::runtime::build_session_manager(
@ -515,7 +506,6 @@ impl SessionManager {
disabled_tools,
task_config,
maintenance_config,
chat_history_ttl_hours,
session_ttl_hours,
)
.map(|(session_manager, _)| session_manager)
@ -731,7 +721,6 @@ mod tests {
skills,
store,
100,
Some(4),
)
.await
.unwrap();
@ -779,7 +768,6 @@ mod tests {
skills,
store.clone(),
100,
Some(4),
)
.await
.unwrap();
@ -813,7 +801,6 @@ mod tests {
store
.compact_active_history(
&session_id,
0,
snapshot_end_seq,
&[],
&ChatMessage::system("[Compressed History]\n\nsummary"),
@ -976,7 +963,6 @@ mod tests {
HashSet::new(),
crate::config::TaskConfig::default(),
crate::config::MemoryMaintenanceConfig::default(),
Some(4),
Some(24),
)
.unwrap();
@ -1029,7 +1015,6 @@ mod tests {
HashSet::new(),
crate::config::TaskConfig::default(),
crate::config::MemoryMaintenanceConfig::default(),
Some(4),
Some(24),
)
.unwrap();
@ -1041,7 +1026,6 @@ mod tests {
"请规划今天工作",
ScheduledAgentTaskOptions {
agent: Some("planner".to_string()),
fresh_session: true,
..Default::default()
},
)
@ -1057,7 +1041,6 @@ mod tests {
"请规划今天工作",
ScheduledAgentTaskOptions {
agent: Some("default".to_string()),
fresh_session: true,
..Default::default()
},
)
@ -1098,7 +1081,6 @@ mod tests {
HashSet::new(),
crate::config::TaskConfig::default(),
crate::config::MemoryMaintenanceConfig::default(),
Some(4),
Some(24),
)
.unwrap();
@ -1109,7 +1091,6 @@ mod tests {
"chat-guard",
"每小时执行以下流程:检查邮箱并同步待办",
ScheduledAgentTaskOptions {
fresh_session: true,
system_prompt: Some("你是邮箱待办同步助手。".to_string()),
..Default::default()
},
@ -1184,7 +1165,6 @@ mod tests {
HashSet::new(),
crate::config::TaskConfig::default(),
crate::config::MemoryMaintenanceConfig::default(),
Some(4),
Some(24),
)
.unwrap();
@ -1271,7 +1251,6 @@ mod tests {
HashSet::new(),
crate::config::TaskConfig::default(),
crate::config::MemoryMaintenanceConfig::default(),
Some(4),
Some(24),
)
.unwrap();
@ -1357,7 +1336,6 @@ mod tests {
HashSet::new(),
crate::config::TaskConfig::default(),
crate::config::MemoryMaintenanceConfig::default(),
Some(4),
Some(24),
)
.unwrap();
@ -1425,7 +1403,6 @@ mod tests {
HashSet::new(),
crate::config::TaskConfig::default(),
crate::config::MemoryMaintenanceConfig::default(),
Some(4),
Some(24),
)
.unwrap();
@ -1502,7 +1479,6 @@ mod tests {
HashSet::new(),
crate::config::TaskConfig::default(),
crate::config::MemoryMaintenanceConfig::default(),
Some(4),
Some(24),
)
.unwrap();
@ -1566,7 +1542,6 @@ mod tests {
HashSet::new(),
crate::config::TaskConfig::default(),
crate::config::MemoryMaintenanceConfig::default(),
Some(4),
Some(24),
)
.unwrap();
@ -1773,7 +1748,6 @@ mod tests {
skills,
store.clone(),
100,
Some(4),
)
.await
.unwrap();
@ -1813,7 +1787,6 @@ mod tests {
skills,
store.clone(),
100,
Some(4),
)
.await
.unwrap();
@ -1886,7 +1859,6 @@ mod tests {
skills,
store.clone(),
0,
Some(4),
)
.await
.unwrap();

View File

@ -19,7 +19,6 @@ pub(crate) struct SessionFactory {
conversations: Arc<dyn ConversationRepository>,
skill_events: Arc<dyn SkillEventRepository>,
store: Arc<SessionStore>,
chat_history_ttl_hours: Option<u64>,
}
impl SessionFactory {
@ -30,7 +29,6 @@ impl SessionFactory {
conversations: Arc<dyn ConversationRepository>,
skill_events: Arc<dyn SkillEventRepository>,
store: Arc<SessionStore>,
chat_history_ttl_hours: Option<u64>,
) -> Self {
Self {
provider_config,
@ -39,7 +37,6 @@ impl SessionFactory {
conversations,
skill_events,
store,
chat_history_ttl_hours,
}
}
@ -56,7 +53,6 @@ impl SessionFactory {
self.agent_factory.clone(),
self.conversations.clone(),
self.skill_events.clone(),
self.chat_history_ttl_hours,
self.store.clone(),
)
.await

View File

@ -15,13 +15,6 @@ fn preview_text(content: &str, max_chars: usize) -> String {
preview.replace('\n', "\\n")
}
fn current_timestamp() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs() as i64
}
pub(crate) struct SessionHistory {
channel_name: String,
chat_histories: HashMap<String, Vec<ChatMessage>>,
@ -30,7 +23,6 @@ pub(crate) struct SessionHistory {
compression_in_flight: HashSet<String>,
conversations: Arc<dyn ConversationRepository>,
skill_events: Arc<dyn SkillEventRepository>,
chat_history_ttl_hours: Option<u64>,
}
impl SessionHistory {
@ -38,7 +30,6 @@ impl SessionHistory {
channel_name: impl Into<String>,
conversations: Arc<dyn ConversationRepository>,
skill_events: Arc<dyn SkillEventRepository>,
chat_history_ttl_hours: Option<u64>,
) -> Self {
Self {
channel_name: channel_name.into(),
@ -48,7 +39,6 @@ impl SessionHistory {
compression_in_flight: HashSet::new(),
conversations,
skill_events,
chat_history_ttl_hours,
}
}
@ -70,29 +60,6 @@ impl SessionHistory {
chat_id: &str,
topic_id: Option<&str>,
) -> Result<(), AgentError> {
// 获取 session 记录(用于检查最后活跃时间)
let session_record = self.ensure_persistent_session(chat_id)?;
// 检查是否超时
if let Some(ttl_hours) = self.chat_history_ttl_hours {
if ttl_hours > 0 {
let now = current_timestamp();
let elapsed_hours = (now - session_record.last_active_at) / 3600;
if elapsed_hours >= ttl_hours as i64 {
tracing::info!(
channel = %self.channel_name,
chat_id = %chat_id,
elapsed_hours = elapsed_hours,
ttl_hours = ttl_hours,
"Chat history expired, resetting context"
);
// 重置会话上下文(清空内存历史,但保留数据库记录)
self.reset_chat_context(chat_id)?;
}
}
}
// 原有逻辑
if self.chat_histories.contains_key(chat_id) {
return Ok(());
}
@ -178,19 +145,6 @@ impl SessionHistory {
.map_err(|err| AgentError::Other(format!("clear history persistence error: {}", err)))
}
pub(crate) fn reset_chat_context(&mut self, chat_id: &str) -> Result<(), AgentError> {
if let Some(history) = self.chat_histories.get_mut(chat_id) {
let len = history.len();
history.clear();
#[cfg(debug_assertions)]
tracing::debug!(chat_id = %chat_id, previous_len = len, "Chat history reset in memory");
}
self.conversations
.reset_session(&self.persistent_session_id(chat_id))
.map_err(|err| AgentError::Other(format!("reset history persistence error: {}", err)))
}
pub(crate) fn append_persisted_message(
&mut self,
chat_id: &str,

View File

@ -20,7 +20,6 @@ use crate::storage::{
#[derive(Debug, Clone, Default)]
pub struct ScheduledAgentTaskOptions {
pub sender_id: Option<String>,
pub fresh_session: bool,
pub system_prompt: Option<String>,
pub metadata: HashMap<String, String>,
pub agent: Option<String>,
@ -1019,11 +1018,6 @@ fn parse_scheduled_agent_task_options(
.get("sender_id")
.and_then(|value| value.as_str())
.map(ToString::to_string);
let fresh_session = job
.payload
.get("fresh_session")
.and_then(|value| value.as_bool())
.unwrap_or(false);
let system_prompt = job
.payload
.get("system_prompt")
@ -1038,7 +1032,6 @@ fn parse_scheduled_agent_task_options(
Ok(ScheduledAgentTaskOptions {
sender_id,
fresh_session,
system_prompt,
metadata,
agent,
@ -1219,7 +1212,6 @@ mod agent_task_tests {
let options = parse_scheduled_agent_task_options(&job).unwrap();
assert_eq!(options.agent.as_deref(), Some("planner"));
assert_eq!(options.sender_id.as_deref(), Some("scheduler-bot"));
assert!(options.fresh_session);
assert_eq!(options.system_prompt.as_deref(), Some("你是日报助手"));
assert_eq!(
options.metadata.get("job_type").map(String::as_str),

View File

@ -66,7 +66,6 @@ impl SessionStore {
archived_at INTEGER,
deleted_at INTEGER,
message_count INTEGER NOT NULL DEFAULT 0,
reset_cutoff_seq INTEGER NOT NULL DEFAULT 0,
user_turn_count INTEGER NOT NULL DEFAULT 0,
agent_prompt_reinjection_count INTEGER NOT NULL DEFAULT 0
);
@ -248,8 +247,8 @@ impl SessionStore {
INSERT INTO sessions (
id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at, archived_at, deleted_at, message_count,
reset_cutoff_seq, user_turn_count, agent_prompt_reinjection_count
) VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?5, ?5, NULL, NULL, 0, 0, 0, 0)
user_turn_count, agent_prompt_reinjection_count
) VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?5, ?5, NULL, NULL, 0, 0, 0)
",
params![&session_id, title, channel_name, id, now],
)?;
@ -291,8 +290,8 @@ impl SessionStore {
INSERT INTO sessions (
id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at, archived_at, deleted_at, message_count,
reset_cutoff_seq, user_turn_count, agent_prompt_reinjection_count
) VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?5, ?5, NULL, NULL, 0, 0, 0, 0)
user_turn_count, agent_prompt_reinjection_count
) VALUES (?1, ?2, ?3, ?4, NULL, ?5, ?5, ?5, NULL, NULL, 0, 0, 0)
",
params![session_id, title, channel_name, chat_id, now],
)?;
@ -308,7 +307,7 @@ impl SessionStore {
"
SELECT id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at,
archived_at, deleted_at, message_count, reset_cutoff_seq,
archived_at, deleted_at, message_count,
user_turn_count, agent_prompt_reinjection_count
FROM sessions
WHERE id = ?1 AND deleted_at IS NULL
@ -330,7 +329,7 @@ impl SessionStore {
"
SELECT id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at,
archived_at, deleted_at, message_count, reset_cutoff_seq,
archived_at, deleted_at, message_count,
user_turn_count, agent_prompt_reinjection_count
FROM sessions
WHERE channel_name = ?1
@ -493,7 +492,6 @@ impl SessionStore {
SET message_count = 0,
updated_at = ?2,
last_active_at = ?2,
reset_cutoff_seq = 0,
user_turn_count = 0,
agent_prompt_reinjection_count = 0
WHERE id = ?1 AND deleted_at IS NULL
@ -503,35 +501,6 @@ impl SessionStore {
Ok(())
}
pub fn reset_session(&self, session_id: &str) -> Result<(), StorageError> {
let now = current_timestamp();
let conn = self.conn.lock().expect("session db mutex poisoned");
let tx = conn.unchecked_transaction()?;
let cutoff_seq: i64 = tx.query_row(
"SELECT COALESCE(MAX(seq), 0) FROM messages WHERE session_id = ?1",
params![session_id],
|row| row.get(0),
)?;
tx.execute(
"
UPDATE sessions
SET reset_cutoff_seq = ?2,
updated_at = ?3,
last_active_at = ?3,
archived_at = NULL,
user_turn_count = 0,
agent_prompt_reinjection_count = 0
WHERE id = ?1 AND deleted_at IS NULL
",
params![session_id, cutoff_seq, now],
)?;
tx.commit()?;
Ok(())
}
pub fn append_message(
&self,
session_id: &str,
@ -607,7 +576,6 @@ impl SessionStore {
pub fn compact_active_history(
&self,
session_id: &str,
expected_reset_cutoff_seq: i64,
snapshot_end_seq: i64,
preserved_system_messages: &[ChatMessage],
summary_message: &ChatMessage,
@ -616,18 +584,13 @@ impl SessionStore {
let conn = self.conn.lock().expect("session db mutex poisoned");
let tx = conn.unchecked_transaction()?;
let current_cutoff = active_reset_cutoff(&tx, session_id)?;
if current_cutoff != expected_reset_cutoff_seq {
return Ok(false);
}
let current_max_seq: i64 = tx.query_row(
"SELECT COALESCE(MAX(seq), 0) FROM messages WHERE session_id = ?1",
params![session_id],
|row| row.get(0),
)?;
if snapshot_end_seq <= current_cutoff || snapshot_end_seq > current_max_seq {
if snapshot_end_seq > current_max_seq {
return Ok(false);
}
@ -660,20 +623,24 @@ impl SessionStore {
inserted_count += 1;
}
// Delete all old messages (including delta messages that were just re-inserted)
tx.execute(
"DELETE FROM messages WHERE session_id = ?1 AND seq <= ?2",
params![session_id, current_max_seq],
)?;
tx.execute(
"
UPDATE sessions
SET reset_cutoff_seq = ?2,
message_count = message_count + ?3,
user_turn_count = ?4,
updated_at = ?5,
last_active_at = ?5,
SET message_count = ?2,
user_turn_count = ?3,
updated_at = ?4,
last_active_at = ?4,
archived_at = NULL
WHERE id = ?1 AND deleted_at IS NULL
",
params![
session_id,
current_max_seq,
inserted_count,
active_user_turn_count,
now,
@ -1309,8 +1276,7 @@ impl SessionStore {
pub fn load_messages(&self, session_id: &str) -> Result<Vec<ChatMessage>, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let cutoff_seq = active_reset_cutoff(&conn, session_id)?;
load_messages_after(&conn, session_id, cutoff_seq)
load_messages_after(&conn, session_id, 0)
}
pub fn load_messages_for_topic(&self, topic_id: &str) -> Result<Vec<ChatMessage>, StorageError> {
@ -1381,14 +1347,13 @@ impl SessionStore {
pub fn count_active_user_messages(&self, session_id: &str) -> Result<i64, StorageError> {
let conn = self.conn.lock().expect("session db mutex poisoned");
let cutoff_seq = active_reset_cutoff(&conn, session_id)?;
conn.query_row(
"
SELECT COUNT(*)
FROM messages
WHERE session_id = ?1 AND seq > ?2 AND role = 'user'
WHERE session_id = ?1 AND role = 'user'
",
params![session_id, cutoff_seq],
params![session_id],
|row| row.get(0),
)
.map_err(StorageError::from)
@ -1422,9 +1387,8 @@ fn map_session_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<SessionRecord
archived_at: row.get(8)?,
deleted_at: row.get(9)?,
message_count: row.get(10)?,
reset_cutoff_seq: row.get(11)?,
user_turn_count: row.get(12)?,
agent_prompt_reinjection_count: row.get(13)?,
user_turn_count: row.get(11)?,
agent_prompt_reinjection_count: row.get(12)?,
})
}
@ -1510,13 +1474,6 @@ fn map_scheduler_job_record(row: &rusqlite::Row<'_>) -> rusqlite::Result<Schedul
}
fn ensure_sessions_schema(conn: &Connection) -> Result<(), StorageError> {
if !has_column(conn, "sessions", "reset_cutoff_seq")? {
add_column_if_missing(
conn,
"ALTER TABLE sessions ADD COLUMN reset_cutoff_seq INTEGER NOT NULL DEFAULT 0",
)?;
}
if !has_column(conn, "sessions", "user_turn_count")? {
add_column_if_missing(
conn,
@ -1643,18 +1600,6 @@ fn add_column_if_missing(conn: &Connection, sql: &str) -> Result<(), StorageErro
}
}
fn active_reset_cutoff(conn: &Connection, session_id: &str) -> Result<i64, StorageError> {
let cutoff = conn
.query_row(
"SELECT reset_cutoff_seq FROM sessions WHERE id = ?1 AND deleted_at IS NULL",
params![session_id],
|row| row.get(0),
)
.optional()?;
Ok(cutoff.unwrap_or(0))
}
fn insert_message_with_seq(
conn: &rusqlite::Transaction<'_>,
session_id: &str,
@ -1875,7 +1820,6 @@ mod tests {
assert_eq!(session.channel_name, "cli");
assert_eq!(session.chat_id, session.id);
assert_eq!(session.message_count, 0);
assert_eq!(session.reset_cutoff_seq, 0);
assert_eq!(session.user_turn_count, 0);
assert_eq!(session.agent_prompt_reinjection_count, 0);
@ -1887,7 +1831,6 @@ mod tests {
let stored = store.get_session(&session.id).unwrap().unwrap();
assert_eq!(stored.message_count, 2);
assert!(stored.archived_at.is_none());
assert_eq!(stored.reset_cutoff_seq, 0);
assert_eq!(stored.user_turn_count, 1);
assert_eq!(stored.agent_prompt_reinjection_count, 0);
@ -1982,44 +1925,7 @@ mod tests {
}
#[test]
fn test_reset_session_preserves_full_history_and_hides_active_history() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("reset")).unwrap();
store
.append_message(&session.id, &ChatMessage::user("before"))
.unwrap();
store
.append_message(&session.id, &ChatMessage::assistant("context"))
.unwrap();
store.reset_session(&session.id).unwrap();
let stored = store.get_session(&session.id).unwrap().unwrap();
assert_eq!(stored.reset_cutoff_seq, 2);
assert_eq!(stored.user_turn_count, 0);
assert_eq!(stored.agent_prompt_reinjection_count, 0);
let active_messages = store.load_messages(&session.id).unwrap();
assert!(active_messages.is_empty());
let all_messages = store.load_all_messages(&session.id).unwrap();
assert_eq!(all_messages.len(), 2);
assert_eq!(all_messages[0].content, "before");
assert_eq!(all_messages[1].content, "context");
store
.append_message(&session.id, &ChatMessage::user("after"))
.unwrap();
let active_messages = store.load_messages(&session.id).unwrap();
assert_eq!(active_messages.len(), 1);
assert_eq!(active_messages[0].content, "after");
let stored = store.get_session(&session.id).unwrap().unwrap();
assert_eq!(stored.user_turn_count, 1);
}
#[test]
fn test_schema_migration_adds_reset_cutoff_column() {
fn test_schema_migration_adds_user_turn_and_reinjection_columns() {
let conn = Connection::open_in_memory().unwrap();
conn.execute_batch(
"
@ -2057,7 +1963,6 @@ mod tests {
let store = SessionStore::from_connection(conn).unwrap();
let session = store.create_cli_session(Some("migrated")).unwrap();
assert_eq!(session.reset_cutoff_seq, 0);
assert_eq!(session.user_turn_count, 0);
assert_eq!(session.agent_prompt_reinjection_count, 0);
}
@ -2105,42 +2010,6 @@ mod tests {
assert!(has_column(&conn, "messages", "reasoning_content").unwrap());
}
#[test]
fn test_count_active_user_messages_respects_reset_cutoff_seq() {
let store = SessionStore::in_memory().unwrap();
let session = store.create_cli_session(Some("count-users")).unwrap();
store
.append_message(&session.id, &ChatMessage::system("agent"))
.unwrap();
store
.append_message(&session.id, &ChatMessage::user("u1"))
.unwrap();
store
.append_message(&session.id, &ChatMessage::assistant("a1"))
.unwrap();
store
.append_message(&session.id, &ChatMessage::user("u2"))
.unwrap();
assert_eq!(store.count_active_user_messages(&session.id).unwrap(), 2);
store.reset_session(&session.id).unwrap();
assert_eq!(store.count_active_user_messages(&session.id).unwrap(), 0);
store
.append_message(&session.id, &ChatMessage::system("agent-again"))
.unwrap();
store
.append_message(&session.id, &ChatMessage::user("u3"))
.unwrap();
store
.append_message(&session.id, &ChatMessage::user("u4"))
.unwrap();
assert_eq!(store.count_active_user_messages(&session.id).unwrap(), 2);
}
#[test]
fn test_compact_active_history_rebuilds_active_segment_with_delta_messages() {
let store = SessionStore::in_memory().unwrap();
@ -2185,7 +2054,6 @@ mod tests {
let compacted = store
.compact_active_history(
&session.id,
0,
snapshot_end_seq,
&preserved_system_messages,
&summary_message,
@ -2214,11 +2082,10 @@ mod tests {
assert_eq!(active_messages[9].content, "a5");
let stored = store.get_session(&session.id).unwrap().unwrap();
assert_eq!(stored.reset_cutoff_seq, 11);
assert_eq!(stored.user_turn_count, 4);
let all_messages = store.load_all_messages(&session.id).unwrap();
assert_eq!(all_messages.len(), 21);
assert_eq!(all_messages.len(), 10);
}
#[test]

View File

@ -35,12 +35,9 @@ pub trait ConversationRepository: Send + Sync + 'static {
fn clear_messages(&self, session_id: &str) -> Result<(), StorageError>;
fn reset_session(&self, session_id: &str) -> Result<(), StorageError>;
fn compact_active_history(
&self,
session_id: &str,
expected_reset_cutoff_seq: i64,
snapshot_end_seq: i64,
preserved_system_messages: &[ChatMessage],
summary_message: &ChatMessage,
@ -185,14 +182,9 @@ impl ConversationRepository for super::SessionStore {
super::SessionStore::clear_messages(self, session_id)
}
fn reset_session(&self, session_id: &str) -> Result<(), StorageError> {
super::SessionStore::reset_session(self, session_id)
}
fn compact_active_history(
&self,
session_id: &str,
expected_reset_cutoff_seq: i64,
snapshot_end_seq: i64,
preserved_system_messages: &[ChatMessage],
summary_message: &ChatMessage,
@ -201,7 +193,6 @@ impl ConversationRepository for super::SessionStore {
super::SessionStore::compact_active_history(
self,
session_id,
expected_reset_cutoff_seq,
snapshot_end_seq,
preserved_system_messages,
summary_message,

View File

@ -23,7 +23,6 @@ pub struct SessionRecord {
pub archived_at: Option<i64>,
pub deleted_at: Option<i64>,
pub message_count: i64,
pub reset_cutoff_seq: i64,
pub user_turn_count: i64,
pub agent_prompt_reinjection_count: i64,
}