From c547b88a12546265a78bda0f63748a10e1cdffd9 Mon Sep 17 00:00:00 2001 From: ooodc <549496103@qq.com> Date: Tue, 28 Apr 2026 14:49:31 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E6=B7=BB=E5=8A=A0=E6=8C=81=E4=B9=85?= =?UTF-8?q?=E5=8C=96=E6=8A=80=E8=83=BD=E4=BA=8B=E4=BB=B6=E5=A4=84=E7=90=86?= =?UTF-8?q?=E9=80=BB=E8=BE=91=EF=BC=8C=E9=87=8D=E6=9E=84=E6=8A=80=E8=83=BD?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E8=AE=B0=E5=BD=95=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot --- src/agent/agent_loop.rs | 101 +++++++++++++++++++++++++------- src/agent/mod.rs | 4 +- src/gateway/agent_factory.rs | 9 ++- src/gateway/mod.rs | 1 + src/gateway/skill_event_sink.rs | 32 ++++++++++ 5 files changed, 124 insertions(+), 23 deletions(-) create mode 100644 src/gateway/skill_event_sink.rs diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index 3af02e4..2962ccd 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -7,7 +7,6 @@ use crate::observability::{ }; use crate::providers::{ChatCompletionRequest, LLMProvider, Message, create_provider}; use crate::skills::SkillRuntime; -use crate::storage::SessionStore; use crate::text::{char_count, take_prefix_chars, take_suffix_chars}; use crate::tools::{ToolContext, ToolRegistry}; use async_trait::async_trait; @@ -298,8 +297,7 @@ pub struct AgentLoop { provider: Box, tools: Arc, skills: Arc, - skill_event_store: Option>, - skill_event_session_id: Option, + skill_event_sink: Option>, tool_context: ToolContext, observer: Option>, emitted_message_handler: Option>, @@ -317,6 +315,17 @@ pub trait EmittedMessageHandler: Send + Sync + 'static { async fn handle(&self, message: ChatMessage); } +#[derive(Debug, Clone)] +pub struct SkillEvent { + pub event_type: String, + pub skill_name: Option, + pub payload: serde_json::Value, +} + +pub trait SkillEventSink: Send + Sync + 'static { + fn record_skill_event(&self, event: SkillEvent); +} + impl AgentLoop { pub fn new(provider_config: LLMProviderConfig) -> Result { let max_iterations = provider_config.max_tool_iterations; @@ -328,8 +337,7 @@ impl AgentLoop { provider, tools: Arc::new(ToolRegistry::new()), skills: Arc::new(SkillRuntime::default()), - skill_event_store: None, - skill_event_session_id: None, + skill_event_sink: None, tool_context: ToolContext::default(), observer: None, emitted_message_handler: None, @@ -350,8 +358,7 @@ impl AgentLoop { provider, tools, skills: Arc::new(SkillRuntime::default()), - skill_event_store: None, - skill_event_session_id: None, + skill_event_sink: None, tool_context: ToolContext::default(), observer: None, emitted_message_handler: None, @@ -373,8 +380,7 @@ impl AgentLoop { provider, tools, skills, - skill_event_store: None, - skill_event_session_id: None, + skill_event_sink: None, tool_context: ToolContext::default(), observer: None, emitted_message_handler: None, @@ -382,9 +388,8 @@ impl AgentLoop { }) } - pub fn with_skill_event_store(mut self, store: Arc, session_id: String) -> Self { - self.skill_event_store = Some(store); - self.skill_event_session_id = Some(session_id); + pub fn with_skill_event_sink(mut self, sink: Arc) -> Self { + self.skill_event_sink = Some(sink); self } @@ -877,18 +882,15 @@ impl AgentLoop { skill_name: Option<&str>, payload: serde_json::Value, ) { - let (Some(store), Some(session_id)) = ( - self.skill_event_store.as_ref(), - self.skill_event_session_id.as_ref(), - ) else { + let Some(sink) = self.skill_event_sink.as_ref() else { return; }; - if let Err(err) = - store.append_skill_event(Some(session_id), event_type, skill_name, &payload) - { - tracing::warn!(error = %err, event_type = %event_type, "Failed to record skill event"); - } + sink.record_skill_event(SkillEvent { + event_type: event_type.to_string(), + skill_name: skill_name.map(str::to_string), + payload, + }); } } @@ -898,6 +900,25 @@ mod tests { use crate::observability::{MultiObserver, Observer}; use tempfile::tempdir; + fn test_provider_config() -> LLMProviderConfig { + LLMProviderConfig { + provider_type: "openai".to_string(), + name: "test".to_string(), + base_url: "http://localhost".to_string(), + api_key: "test-key".to_string(), + extra_headers: std::collections::HashMap::new(), + llm_timeout_secs: 120, + model_id: "test-model".to_string(), + temperature: Some(0.0), + max_tokens: Some(32), + context_window_tokens: None, + model_extra: std::collections::HashMap::new(), + max_tool_iterations: 1, + tool_result_max_chars: 20_000, + context_tool_result_trim_chars: 20_000, + } + } + struct TestObserver { events: std::sync::Mutex>, } @@ -910,6 +931,24 @@ mod tests { } } + struct TestSkillEventSink { + events: std::sync::Mutex>, + } + + impl TestSkillEventSink { + fn new() -> Self { + Self { + events: std::sync::Mutex::new(Vec::new()), + } + } + } + + impl SkillEventSink for TestSkillEventSink { + fn record_skill_event(&self, event: SkillEvent) { + self.events.lock().unwrap().push(event); + } + } + impl Observer for TestObserver { fn record_event(&self, event: &ObserverEvent) { self.events.lock().unwrap().push(event.clone()); @@ -936,6 +975,26 @@ mod tests { assert_eq!(multi.len(), 1); } + #[test] + fn test_skill_events_are_emitted_to_sink() { + let sink = Arc::new(TestSkillEventSink::new()); + let agent = AgentLoop::new(test_provider_config()) + .unwrap() + .with_skill_event_sink(sink.clone()); + + agent.record_skill_event( + "activated", + Some("demo"), + serde_json::json!({ "source": "test" }), + ); + + let events = sink.events.lock().unwrap(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].event_type, "activated"); + assert_eq!(events[0].skill_name.as_deref(), Some("demo")); + assert_eq!(events[0].payload, serde_json::json!({ "source": "test" })); + } + #[test] fn test_should_execute_in_parallel_single_tool() { // Would need a proper setup with AgentLoop to test fully diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 1e91055..743e7eb 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -1,5 +1,7 @@ pub mod agent_loop; pub mod context_compressor; -pub use agent_loop::{AgentError, AgentLoop, AgentProcessResult, EmittedMessageHandler}; +pub use agent_loop::{ + AgentError, AgentLoop, AgentProcessResult, EmittedMessageHandler, SkillEvent, SkillEventSink, +}; pub use context_compressor::ContextCompressor; diff --git a/src/gateway/agent_factory.rs b/src/gateway/agent_factory.rs index c82ac41..df031d5 100644 --- a/src/gateway/agent_factory.rs +++ b/src/gateway/agent_factory.rs @@ -6,6 +6,8 @@ use crate::skills::SkillRuntime; use crate::storage::{SessionStore, persistent_session_id}; use crate::tools::{ToolContext, ToolRegistry}; +use super::skill_event_sink::PersistentSkillEventSink; + #[derive(Clone)] pub(crate) struct AgentFactory { tools: Arc, @@ -42,8 +44,13 @@ impl AgentFactory { self.skills.clone(), ) .map(|agent| { + let skill_event_sink = Arc::new(PersistentSkillEventSink::new( + self.store.clone(), + session_id.clone(), + )); + agent - .with_skill_event_store(self.store.clone(), session_id.clone()) + .with_skill_event_sink(skill_event_sink) .with_tool_context(ToolContext { channel_name: Some(request.channel_name.to_string()), sender_id: request.sender_id.map(str::to_string), diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 1d55299..235cf66 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -18,6 +18,7 @@ pub mod session_factory; pub mod session_lifecycle; pub mod session_message_service; pub mod session_pool; +pub mod skill_event_sink; pub mod tool_registry_factory; pub mod ws; diff --git a/src/gateway/skill_event_sink.rs b/src/gateway/skill_event_sink.rs new file mode 100644 index 0000000..8899a0d --- /dev/null +++ b/src/gateway/skill_event_sink.rs @@ -0,0 +1,32 @@ +use std::sync::Arc; + +use crate::agent::{SkillEvent, SkillEventSink}; +use crate::storage::SessionStore; + +pub(crate) struct PersistentSkillEventSink { + store: Arc, + session_id: String, +} + +impl PersistentSkillEventSink { + pub(crate) fn new(store: Arc, session_id: String) -> Self { + Self { store, session_id } + } +} + +impl SkillEventSink for PersistentSkillEventSink { + fn record_skill_event(&self, event: SkillEvent) { + if let Err(err) = self.store.append_skill_event( + Some(&self.session_id), + &event.event_type, + event.skill_name.as_deref(), + &event.payload, + ) { + tracing::warn!( + error = %err, + event_type = %event.event_type, + "Failed to record skill event" + ); + } + } +}