From 86ba3b447ebf186723bda4b230c79976e442d1a3 Mon Sep 17 00:00:00 2001 From: oudecheng <13802883547@139.com> Date: Thu, 14 May 2026 15:29:37 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E4=BC=9A=E8=AF=9D?= =?UTF-8?q?=E7=AE=A1=E7=90=86=E5=8A=9F=E8=83=BD=EF=BC=8C=E6=94=AF=E6=8C=81?= =?UTF-8?q?=E4=BC=9A=E8=AF=9D=E8=BF=87=E6=9C=9F=E6=B8=85=E7=90=86=EF=BC=9B?= =?UTF-8?q?=E5=BC=95=E5=85=A5=20session=5Fttl=5Fhours=20=E9=85=8D=E7=BD=AE?= =?UTF-8?q?=E9=A1=B9=E4=BB=A5=E6=8E=A7=E5=88=B6=E4=BC=9A=E8=AF=9D=E5=AD=98?= =?UTF-8?q?=E6=B4=BB=E6=97=B6=E9=97=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/config/mod.rs | 54 ++++++++++++++++++++++---------- src/gateway/mod.rs | 3 ++ src/gateway/runtime.rs | 5 ++- src/gateway/session.rs | 33 +++++++++++++++++++ src/gateway/session_lifecycle.rs | 4 +-- src/gateway/session_pool.rs | 40 +++++++++++++++++++++-- 6 files changed, 117 insertions(+), 22 deletions(-) diff --git a/src/config/mod.rs b/src/config/mod.rs index bc3b3ad..2a7fb86 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -314,6 +314,8 @@ pub struct GatewayConfig { pub agent_prompt_reinject_every: u64, #[serde(default = "default_max_concurrent_requests")] pub max_concurrent_requests: usize, + #[serde(default, rename = "session_ttl_hours")] + pub session_ttl_hours: Option, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -337,6 +339,7 @@ pub struct SchedulerConfig { } pub const BUILTIN_MEMORY_MAINTENANCE_JOB_ID: &str = "builtin.memory_maintenance_daily"; +pub const BUILTIN_SESSION_CLEANUP_JOB_ID: &str = "builtin.session_cleanup_hourly"; #[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)] #[serde(rename_all = "snake_case")] @@ -427,22 +430,40 @@ impl SchedulerJobConfig { impl SchedulerConfig { pub fn builtin_jobs(time: &TimeConfig) -> Vec { - vec![SchedulerJobConfig { - id: BUILTIN_MEMORY_MAINTENANCE_JOB_ID.to_string(), - enabled: true, - kind: SchedulerJobKind::InternalEvent, - schedule: Some(SchedulerSchedule::Cron { - expression: "0 */4 * * *".to_string(), - }), - startup_delay_secs: 0, - interval_secs: 0, - target: SchedulerJobTarget::default(), - payload: serde_json::json!({ - "event": "memory_maintenance", - "time_zone": time.timezone, - "local_time": "every_4_hours" - }), - }] + vec![ + SchedulerJobConfig { + id: BUILTIN_MEMORY_MAINTENANCE_JOB_ID.to_string(), + enabled: true, + kind: SchedulerJobKind::InternalEvent, + schedule: Some(SchedulerSchedule::Cron { + expression: "0 */4 * * *".to_string(), + }), + startup_delay_secs: 0, + interval_secs: 0, + target: SchedulerJobTarget::default(), + payload: serde_json::json!({ + "event": "memory_maintenance", + "time_zone": time.timezone, + "local_time": "every_4_hours" + }), + }, + SchedulerJobConfig { + id: BUILTIN_SESSION_CLEANUP_JOB_ID.to_string(), + enabled: true, + kind: SchedulerJobKind::InternalEvent, + schedule: Some(SchedulerSchedule::Cron { + expression: "0 * * * *".to_string(), + }), + startup_delay_secs: 0, + interval_secs: 0, + target: SchedulerJobTarget::default(), + payload: serde_json::json!({ + "event": "session_cleanup", + "time_zone": time.timezone, + "local_time": "every_hour" + }), + }, + ] } pub fn effective_jobs(&self, time: &TimeConfig) -> Vec { @@ -604,6 +625,7 @@ impl Default for GatewayConfig { chat_history_ttl_hours: Some(4), agent_prompt_reinject_every: default_agent_prompt_reinject_every(), max_concurrent_requests: default_max_concurrent_requests(), + session_ttl_hours: Some(24), } } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index be137de..4dcb9c1 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -66,6 +66,8 @@ impl GatewayState { let agent_prompt_reinject_every = config.gateway.agent_prompt_reinject_every; let show_tool_results = config.gateway.show_tool_results; + let session_ttl_hours = config.gateway.session_ttl_hours; + let skills = Arc::new(SkillRuntime::from_config(config.skills.clone())); let channel_manager = ChannelManager::new(); let bus = channel_manager.bus(); @@ -80,6 +82,7 @@ impl GatewayState { Arc::new(BusSessionMessageSender::new(bus.clone())), std::collections::HashSet::new(), chat_history_ttl_hours, + session_ttl_hours, )?; Ok(Self { diff --git a/src/gateway/runtime.rs b/src/gateway/runtime.rs index 446836c..5c0f81b 100644 --- a/src/gateway/runtime.rs +++ b/src/gateway/runtime.rs @@ -30,6 +30,7 @@ pub(crate) fn build_session_manager( skills: Arc, disabled_tools: HashSet, chat_history_ttl_hours: Option, + session_ttl_hours: Option, ) -> Result { build_session_manager_with_sender( agent_prompt_reinject_every, @@ -41,6 +42,7 @@ pub(crate) fn build_session_manager( Arc::new(NoopSessionMessageSender), disabled_tools, chat_history_ttl_hours, + session_ttl_hours, ) } @@ -54,6 +56,7 @@ pub(crate) fn build_session_manager_with_sender( session_message_sender: Arc, disabled_tools: HashSet, chat_history_ttl_hours: Option, + session_ttl_hours: Option, ) -> Result { let store = Arc::new( SessionStore::new() @@ -101,7 +104,7 @@ pub(crate) fn build_session_manager_with_sender( skill_events, chat_history_ttl_hours, ); - let lifecycle = SessionLifecycleService::new(session_factory); + let lifecycle = SessionLifecycleService::new(session_factory, session_ttl_hours); let cli_sessions = CliSessionService::new(store.clone()); let messages = SessionMessageService::new(lifecycle.clone(), show_tool_results); let scheduled_tasks = ScheduledAgentTaskService::new( diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 8d3d285..064b84d 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -387,6 +387,7 @@ impl SessionManager { skills: Arc, disabled_tools: std::collections::HashSet, chat_history_ttl_hours: Option, + session_ttl_hours: Option, ) -> Result { super::runtime::build_session_manager( agent_prompt_reinject_every, @@ -397,6 +398,7 @@ impl SessionManager { skills, disabled_tools, chat_history_ttl_hours, + session_ttl_hours, ) } @@ -818,6 +820,7 @@ mod tests { Arc::new(SkillRuntime::default()), HashSet::new(), Some(4), + Some(24), ) .unwrap(); @@ -868,6 +871,7 @@ mod tests { Arc::new(SkillRuntime::default()), HashSet::new(), Some(4), + Some(24), ) .unwrap(); @@ -934,6 +938,7 @@ mod tests { Arc::new(SkillRuntime::default()), HashSet::new(), Some(4), + Some(24), ) .unwrap(); @@ -1017,6 +1022,7 @@ mod tests { Arc::new(SkillRuntime::default()), HashSet::new(), Some(4), + Some(24), ) .unwrap(); @@ -1101,6 +1107,7 @@ mod tests { Arc::new(SkillRuntime::default()), HashSet::new(), Some(4), + Some(24), ) .unwrap(); @@ -1184,6 +1191,7 @@ mod tests { Arc::new(SkillRuntime::default()), HashSet::new(), Some(4), + Some(24), ) .unwrap(); @@ -1249,6 +1257,7 @@ mod tests { Arc::new(SkillRuntime::default()), HashSet::new(), Some(4), + Some(24), ) .unwrap(); @@ -1323,6 +1332,7 @@ mod tests { Arc::new(SkillRuntime::default()), HashSet::new(), Some(4), + Some(24), ) .unwrap(); @@ -1384,6 +1394,7 @@ mod tests { Arc::new(SkillRuntime::default()), HashSet::new(), Some(4), + Some(24), ) .unwrap(); @@ -1789,3 +1800,25 @@ mod tests { assert!(contents.contains(&"习惯先问方案再要代码".to_string())); } } + +#[async_trait] +impl crate::scheduler::MaintenanceExecutor for SessionManager { + async fn cleanup_expired_sessions(&self) -> usize { + self.cleanup_expired_sessions().await + } + + async fn run_memory_maintenance_for_all_scopes( + &self, + ) -> anyhow::Result> { + match self.run_memory_maintenance_for_all_scopes().await { + Ok(Some(result)) => Ok(vec![crate::scheduler::MaintenanceRunSummary { + scope_key: result.scope_key, + merges: result.output.merges.len(), + conflicts: result.output.conflicts.len(), + low_value: result.output.low_value_ids.len(), + }]), + Ok(None) => Ok(vec![]), + Err(error) => Err(anyhow::anyhow!(error.to_string())), + } + } +} diff --git a/src/gateway/session_lifecycle.rs b/src/gateway/session_lifecycle.rs index e433269..0ff3be9 100644 --- a/src/gateway/session_lifecycle.rs +++ b/src/gateway/session_lifecycle.rs @@ -14,9 +14,9 @@ pub(crate) struct SessionLifecycleService { } impl SessionLifecycleService { - pub(crate) fn new(session_factory: SessionFactory) -> Self { + pub(crate) fn new(session_factory: SessionFactory, session_ttl_hours: Option) -> Self { Self { - session_pool: SessionPool::new(session_factory), + session_pool: SessionPool::new(session_factory, session_ttl_hours), } } diff --git a/src/gateway/session_pool.rs b/src/gateway/session_pool.rs index 9aa156c..a26ada7 100644 --- a/src/gateway/session_pool.rs +++ b/src/gateway/session_pool.rs @@ -14,6 +14,7 @@ use super::session_factory::SessionFactory; pub(crate) struct SessionPool { inner: Arc>, session_factory: SessionFactory, + session_ttl_hours: Option, } struct SessionPoolInner { @@ -22,13 +23,14 @@ struct SessionPoolInner { } impl SessionPool { - pub(crate) fn new(session_factory: SessionFactory) -> Self { + pub(crate) fn new(session_factory: SessionFactory, session_ttl_hours: Option) -> Self { Self { inner: Arc::new(Mutex::new(SessionPoolInner { sessions: HashMap::new(), session_timestamps: HashMap::new(), })), session_factory, + session_ttl_hours, } } @@ -70,7 +72,39 @@ impl SessionPool { } pub(crate) async fn cleanup_expired_sessions(&self) -> usize { - // Session 级别不再自动清理,返回 0 - 0 + let ttl_hours = match self.session_ttl_hours { + Some(hours) if hours > 0 => hours, + _ => return 0, + }; + + let ttl_duration = std::time::Duration::from_secs(ttl_hours * 3600); + let mut inner = self.inner.lock().await; + let now = Instant::now(); + + let expired_channels: Vec = inner + .session_timestamps + .iter() + .filter_map(|(channel_name, last_active)| { + let elapsed = now.duration_since(*last_active); + if elapsed >= ttl_duration { + tracing::info!( + channel = %channel_name, + elapsed_hours = elapsed.as_secs() / 3600, + ttl_hours = ttl_hours, + "Session expired, removing from memory pool" + ); + Some(channel_name.clone()) + } else { + None + } + }) + .collect(); + + for channel_name in &expired_channels { + inner.sessions.remove(channel_name); + inner.session_timestamps.remove(channel_name); + } + + expired_channels.len() } }