feat: 添加会话管理功能,支持会话过期清理;引入 session_ttl_hours 配置项以控制会话存活时间
This commit is contained in:
parent
e712fd7645
commit
86ba3b447e
@ -314,6 +314,8 @@ pub struct GatewayConfig {
|
||||
pub agent_prompt_reinject_every: u64,
|
||||
#[serde(default = "default_max_concurrent_requests")]
|
||||
pub max_concurrent_requests: usize,
|
||||
#[serde(default, rename = "session_ttl_hours")]
|
||||
pub session_ttl_hours: Option<u64>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize, Serialize)]
|
||||
@ -337,6 +339,7 @@ pub struct SchedulerConfig {
|
||||
}
|
||||
|
||||
pub const BUILTIN_MEMORY_MAINTENANCE_JOB_ID: &str = "builtin.memory_maintenance_daily";
|
||||
pub const BUILTIN_SESSION_CLEANUP_JOB_ID: &str = "builtin.session_cleanup_hourly";
|
||||
|
||||
#[derive(Debug, Clone, Copy, Deserialize, Serialize, PartialEq, Eq, Default)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
@ -427,7 +430,8 @@ impl SchedulerJobConfig {
|
||||
|
||||
impl SchedulerConfig {
|
||||
pub fn builtin_jobs(time: &TimeConfig) -> Vec<SchedulerJobConfig> {
|
||||
vec![SchedulerJobConfig {
|
||||
vec![
|
||||
SchedulerJobConfig {
|
||||
id: BUILTIN_MEMORY_MAINTENANCE_JOB_ID.to_string(),
|
||||
enabled: true,
|
||||
kind: SchedulerJobKind::InternalEvent,
|
||||
@ -442,7 +446,24 @@ impl SchedulerConfig {
|
||||
"time_zone": time.timezone,
|
||||
"local_time": "every_4_hours"
|
||||
}),
|
||||
}]
|
||||
},
|
||||
SchedulerJobConfig {
|
||||
id: BUILTIN_SESSION_CLEANUP_JOB_ID.to_string(),
|
||||
enabled: true,
|
||||
kind: SchedulerJobKind::InternalEvent,
|
||||
schedule: Some(SchedulerSchedule::Cron {
|
||||
expression: "0 * * * *".to_string(),
|
||||
}),
|
||||
startup_delay_secs: 0,
|
||||
interval_secs: 0,
|
||||
target: SchedulerJobTarget::default(),
|
||||
payload: serde_json::json!({
|
||||
"event": "session_cleanup",
|
||||
"time_zone": time.timezone,
|
||||
"local_time": "every_hour"
|
||||
}),
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
pub fn effective_jobs(&self, time: &TimeConfig) -> Vec<SchedulerJobConfig> {
|
||||
@ -604,6 +625,7 @@ impl Default for GatewayConfig {
|
||||
chat_history_ttl_hours: Some(4),
|
||||
agent_prompt_reinject_every: default_agent_prompt_reinject_every(),
|
||||
max_concurrent_requests: default_max_concurrent_requests(),
|
||||
session_ttl_hours: Some(24),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -66,6 +66,8 @@ impl GatewayState {
|
||||
let agent_prompt_reinject_every = config.gateway.agent_prompt_reinject_every;
|
||||
let show_tool_results = config.gateway.show_tool_results;
|
||||
|
||||
let session_ttl_hours = config.gateway.session_ttl_hours;
|
||||
|
||||
let skills = Arc::new(SkillRuntime::from_config(config.skills.clone()));
|
||||
let channel_manager = ChannelManager::new();
|
||||
let bus = channel_manager.bus();
|
||||
@ -80,6 +82,7 @@ impl GatewayState {
|
||||
Arc::new(BusSessionMessageSender::new(bus.clone())),
|
||||
std::collections::HashSet::new(),
|
||||
chat_history_ttl_hours,
|
||||
session_ttl_hours,
|
||||
)?;
|
||||
|
||||
Ok(Self {
|
||||
|
||||
@ -30,6 +30,7 @@ pub(crate) fn build_session_manager(
|
||||
skills: Arc<SkillRuntime>,
|
||||
disabled_tools: HashSet<String>,
|
||||
chat_history_ttl_hours: Option<u64>,
|
||||
session_ttl_hours: Option<u64>,
|
||||
) -> Result<SessionManager, AgentError> {
|
||||
build_session_manager_with_sender(
|
||||
agent_prompt_reinject_every,
|
||||
@ -41,6 +42,7 @@ pub(crate) fn build_session_manager(
|
||||
Arc::new(NoopSessionMessageSender),
|
||||
disabled_tools,
|
||||
chat_history_ttl_hours,
|
||||
session_ttl_hours,
|
||||
)
|
||||
}
|
||||
|
||||
@ -54,6 +56,7 @@ pub(crate) fn build_session_manager_with_sender(
|
||||
session_message_sender: Arc<dyn SessionMessageSender>,
|
||||
disabled_tools: HashSet<String>,
|
||||
chat_history_ttl_hours: Option<u64>,
|
||||
session_ttl_hours: Option<u64>,
|
||||
) -> Result<SessionManager, AgentError> {
|
||||
let store = Arc::new(
|
||||
SessionStore::new()
|
||||
@ -101,7 +104,7 @@ pub(crate) fn build_session_manager_with_sender(
|
||||
skill_events,
|
||||
chat_history_ttl_hours,
|
||||
);
|
||||
let lifecycle = SessionLifecycleService::new(session_factory);
|
||||
let lifecycle = SessionLifecycleService::new(session_factory, session_ttl_hours);
|
||||
let cli_sessions = CliSessionService::new(store.clone());
|
||||
let messages = SessionMessageService::new(lifecycle.clone(), show_tool_results);
|
||||
let scheduled_tasks = ScheduledAgentTaskService::new(
|
||||
|
||||
@ -387,6 +387,7 @@ impl SessionManager {
|
||||
skills: Arc<SkillRuntime>,
|
||||
disabled_tools: std::collections::HashSet<String>,
|
||||
chat_history_ttl_hours: Option<u64>,
|
||||
session_ttl_hours: Option<u64>,
|
||||
) -> Result<Self, AgentError> {
|
||||
super::runtime::build_session_manager(
|
||||
agent_prompt_reinject_every,
|
||||
@ -397,6 +398,7 @@ impl SessionManager {
|
||||
skills,
|
||||
disabled_tools,
|
||||
chat_history_ttl_hours,
|
||||
session_ttl_hours,
|
||||
)
|
||||
}
|
||||
|
||||
@ -818,6 +820,7 @@ mod tests {
|
||||
Arc::new(SkillRuntime::default()),
|
||||
HashSet::new(),
|
||||
Some(4),
|
||||
Some(24),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -868,6 +871,7 @@ mod tests {
|
||||
Arc::new(SkillRuntime::default()),
|
||||
HashSet::new(),
|
||||
Some(4),
|
||||
Some(24),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -934,6 +938,7 @@ mod tests {
|
||||
Arc::new(SkillRuntime::default()),
|
||||
HashSet::new(),
|
||||
Some(4),
|
||||
Some(24),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1017,6 +1022,7 @@ mod tests {
|
||||
Arc::new(SkillRuntime::default()),
|
||||
HashSet::new(),
|
||||
Some(4),
|
||||
Some(24),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1101,6 +1107,7 @@ mod tests {
|
||||
Arc::new(SkillRuntime::default()),
|
||||
HashSet::new(),
|
||||
Some(4),
|
||||
Some(24),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1184,6 +1191,7 @@ mod tests {
|
||||
Arc::new(SkillRuntime::default()),
|
||||
HashSet::new(),
|
||||
Some(4),
|
||||
Some(24),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1249,6 +1257,7 @@ mod tests {
|
||||
Arc::new(SkillRuntime::default()),
|
||||
HashSet::new(),
|
||||
Some(4),
|
||||
Some(24),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1323,6 +1332,7 @@ mod tests {
|
||||
Arc::new(SkillRuntime::default()),
|
||||
HashSet::new(),
|
||||
Some(4),
|
||||
Some(24),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1384,6 +1394,7 @@ mod tests {
|
||||
Arc::new(SkillRuntime::default()),
|
||||
HashSet::new(),
|
||||
Some(4),
|
||||
Some(24),
|
||||
)
|
||||
.unwrap();
|
||||
|
||||
@ -1789,3 +1800,25 @@ mod tests {
|
||||
assert!(contents.contains(&"习惯先问方案再要代码".to_string()));
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl crate::scheduler::MaintenanceExecutor for SessionManager {
|
||||
async fn cleanup_expired_sessions(&self) -> usize {
|
||||
self.cleanup_expired_sessions().await
|
||||
}
|
||||
|
||||
async fn run_memory_maintenance_for_all_scopes(
|
||||
&self,
|
||||
) -> anyhow::Result<Vec<crate::scheduler::MaintenanceRunSummary>> {
|
||||
match self.run_memory_maintenance_for_all_scopes().await {
|
||||
Ok(Some(result)) => Ok(vec![crate::scheduler::MaintenanceRunSummary {
|
||||
scope_key: result.scope_key,
|
||||
merges: result.output.merges.len(),
|
||||
conflicts: result.output.conflicts.len(),
|
||||
low_value: result.output.low_value_ids.len(),
|
||||
}]),
|
||||
Ok(None) => Ok(vec![]),
|
||||
Err(error) => Err(anyhow::anyhow!(error.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -14,9 +14,9 @@ pub(crate) struct SessionLifecycleService {
|
||||
}
|
||||
|
||||
impl SessionLifecycleService {
|
||||
pub(crate) fn new(session_factory: SessionFactory) -> Self {
|
||||
pub(crate) fn new(session_factory: SessionFactory, session_ttl_hours: Option<u64>) -> Self {
|
||||
Self {
|
||||
session_pool: SessionPool::new(session_factory),
|
||||
session_pool: SessionPool::new(session_factory, session_ttl_hours),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -14,6 +14,7 @@ use super::session_factory::SessionFactory;
|
||||
pub(crate) struct SessionPool {
|
||||
inner: Arc<Mutex<SessionPoolInner>>,
|
||||
session_factory: SessionFactory,
|
||||
session_ttl_hours: Option<u64>,
|
||||
}
|
||||
|
||||
struct SessionPoolInner {
|
||||
@ -22,13 +23,14 @@ struct SessionPoolInner {
|
||||
}
|
||||
|
||||
impl SessionPool {
|
||||
pub(crate) fn new(session_factory: SessionFactory) -> Self {
|
||||
pub(crate) fn new(session_factory: SessionFactory, session_ttl_hours: Option<u64>) -> Self {
|
||||
Self {
|
||||
inner: Arc::new(Mutex::new(SessionPoolInner {
|
||||
sessions: HashMap::new(),
|
||||
session_timestamps: HashMap::new(),
|
||||
})),
|
||||
session_factory,
|
||||
session_ttl_hours,
|
||||
}
|
||||
}
|
||||
|
||||
@ -70,7 +72,39 @@ impl SessionPool {
|
||||
}
|
||||
|
||||
pub(crate) async fn cleanup_expired_sessions(&self) -> usize {
|
||||
// Session 级别不再自动清理,返回 0
|
||||
0
|
||||
let ttl_hours = match self.session_ttl_hours {
|
||||
Some(hours) if hours > 0 => hours,
|
||||
_ => return 0,
|
||||
};
|
||||
|
||||
let ttl_duration = std::time::Duration::from_secs(ttl_hours * 3600);
|
||||
let mut inner = self.inner.lock().await;
|
||||
let now = Instant::now();
|
||||
|
||||
let expired_channels: Vec<String> = inner
|
||||
.session_timestamps
|
||||
.iter()
|
||||
.filter_map(|(channel_name, last_active)| {
|
||||
let elapsed = now.duration_since(*last_active);
|
||||
if elapsed >= ttl_duration {
|
||||
tracing::info!(
|
||||
channel = %channel_name,
|
||||
elapsed_hours = elapsed.as_secs() / 3600,
|
||||
ttl_hours = ttl_hours,
|
||||
"Session expired, removing from memory pool"
|
||||
);
|
||||
Some(channel_name.clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
for channel_name in &expired_channels {
|
||||
inner.sessions.remove(channel_name);
|
||||
inner.session_timestamps.remove(channel_name);
|
||||
}
|
||||
|
||||
expired_channels.len()
|
||||
}
|
||||
}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user