PicoBot/src/gateway/compaction.rs
ooodc 008aba91ac feat: 重构调度器以使用 AgentTaskExecutor 和 SchedulerMaintenanceService
- 更新调度器,将 SessionManager 替换为 AgentTaskExecutor 和 SchedulerMaintenanceService。
- 修改作业执行逻辑,使用新服务处理代理任务和内部事件。
- 添加新的 CliChannel 以处理 CLI 连接,并包括适当的注册和注销逻辑。
- 引入 AgentTaskExecutor 和 SchedulerMaintenanceService,用于管理代理任务和会话维护。
- 实现聊天命令处理,用于重置会话上下文。
- 添加后台历史压缩功能,以优化会话存储。
- 创建实用函数,用于准备通过 WebSocket 通信的出站消息。
- 为新功能添加测试,并确保现有测试通过。

Co-authored-by: Copilot <copilot@github.com>
2026-04-28 12:55:30 +08:00

106 lines
3.6 KiB
Rust

use std::sync::Arc;
use tokio::sync::Mutex;
use crate::agent::AgentError;
use super::session::Session;
pub(crate) async fn schedule_background_history_compaction(
session: Arc<Mutex<Session>>,
chat_id: impl Into<String>,
) -> Result<(), AgentError> {
let chat_id = chat_id.into();
let snapshot = {
let mut session_guard = session.lock().await;
let session_record = session_guard.ensure_persistent_session(&chat_id)?;
session_guard.ensure_chat_loaded(&chat_id)?;
let history = session_guard.get_or_create_history(&chat_id).clone();
let compressor = session_guard.compressor().clone();
if !compressor.should_compress(&history) {
return Ok(());
}
if !session_guard.try_start_background_compaction(&chat_id) {
return Ok(());
}
(
session_guard.store(),
session_guard.persistent_session_id(&chat_id),
session_record.reset_cutoff_seq,
session_record.message_count,
history,
compressor,
session_guard.provider_config().clone(),
)
};
let (
store,
session_id,
expected_reset_cutoff_seq,
snapshot_end_seq,
history,
compressor,
provider_config,
) = snapshot;
let session_for_task = session.clone();
let chat_id_for_task = chat_id.clone();
tokio::spawn(async move {
tracing::info!(chat_id = %chat_id_for_task, snapshot_end_seq, "Starting background history compaction");
let compaction_result = compressor
.build_compaction_plan(&history, &provider_config)
.await;
let mut committed = false;
match compaction_result {
Ok(Some(plan)) => match store.compact_active_history(
&session_id,
expected_reset_cutoff_seq,
snapshot_end_seq,
&plan.preserved_system_messages,
&plan.summary_message,
&plan.preserved_messages,
) {
Ok(true) => {
committed = true;
tracing::info!(
chat_id = %chat_id_for_task,
snapshot_end_seq,
compressed_turns = plan.compressed_turns,
preserved_turns = plan.preserved_turns,
"Background history compaction committed"
);
}
Ok(false) => {
tracing::info!(chat_id = %chat_id_for_task, snapshot_end_seq, "Background history compaction skipped due to stale snapshot");
}
Err(error) => {
tracing::warn!(chat_id = %chat_id_for_task, error = %error, "Background history compaction commit failed");
}
},
Ok(None) => {
tracing::debug!(chat_id = %chat_id_for_task, "Background history compaction not needed after snapshot analysis");
}
Err(error) => {
tracing::warn!(chat_id = %chat_id_for_task, error = %error, "Background history compaction build failed");
}
}
let mut session_guard = session_for_task.lock().await;
if committed {
if let Err(error) = session_guard.reload_chat_history(&chat_id_for_task) {
tracing::warn!(chat_id = %chat_id_for_task, error = %error, "Failed to reload history after background compaction");
}
}
session_guard.finish_background_compaction(&chat_id_for_task);
});
Ok(())
}