feat: 添加持久化技能事件处理逻辑,重构技能事件记录机制

Co-authored-by: Copilot <copilot@github.com>
This commit is contained in:
ooodc 2026-04-28 14:49:31 +08:00
parent e5e2b37246
commit c547b88a12
5 changed files with 124 additions and 23 deletions

View File

@ -7,7 +7,6 @@ use crate::observability::{
}; };
use crate::providers::{ChatCompletionRequest, LLMProvider, Message, create_provider}; use crate::providers::{ChatCompletionRequest, LLMProvider, Message, create_provider};
use crate::skills::SkillRuntime; use crate::skills::SkillRuntime;
use crate::storage::SessionStore;
use crate::text::{char_count, take_prefix_chars, take_suffix_chars}; use crate::text::{char_count, take_prefix_chars, take_suffix_chars};
use crate::tools::{ToolContext, ToolRegistry}; use crate::tools::{ToolContext, ToolRegistry};
use async_trait::async_trait; use async_trait::async_trait;
@ -298,8 +297,7 @@ pub struct AgentLoop {
provider: Box<dyn LLMProvider>, provider: Box<dyn LLMProvider>,
tools: Arc<ToolRegistry>, tools: Arc<ToolRegistry>,
skills: Arc<SkillRuntime>, skills: Arc<SkillRuntime>,
skill_event_store: Option<Arc<SessionStore>>, skill_event_sink: Option<Arc<dyn SkillEventSink>>,
skill_event_session_id: Option<String>,
tool_context: ToolContext, tool_context: ToolContext,
observer: Option<Arc<dyn Observer>>, observer: Option<Arc<dyn Observer>>,
emitted_message_handler: Option<Arc<dyn EmittedMessageHandler>>, emitted_message_handler: Option<Arc<dyn EmittedMessageHandler>>,
@ -317,6 +315,17 @@ pub trait EmittedMessageHandler: Send + Sync + 'static {
async fn handle(&self, message: ChatMessage); async fn handle(&self, message: ChatMessage);
} }
#[derive(Debug, Clone)]
pub struct SkillEvent {
pub event_type: String,
pub skill_name: Option<String>,
pub payload: serde_json::Value,
}
pub trait SkillEventSink: Send + Sync + 'static {
fn record_skill_event(&self, event: SkillEvent);
}
impl AgentLoop { impl AgentLoop {
pub fn new(provider_config: LLMProviderConfig) -> Result<Self, AgentError> { pub fn new(provider_config: LLMProviderConfig) -> Result<Self, AgentError> {
let max_iterations = provider_config.max_tool_iterations; let max_iterations = provider_config.max_tool_iterations;
@ -328,8 +337,7 @@ impl AgentLoop {
provider, provider,
tools: Arc::new(ToolRegistry::new()), tools: Arc::new(ToolRegistry::new()),
skills: Arc::new(SkillRuntime::default()), skills: Arc::new(SkillRuntime::default()),
skill_event_store: None, skill_event_sink: None,
skill_event_session_id: None,
tool_context: ToolContext::default(), tool_context: ToolContext::default(),
observer: None, observer: None,
emitted_message_handler: None, emitted_message_handler: None,
@ -350,8 +358,7 @@ impl AgentLoop {
provider, provider,
tools, tools,
skills: Arc::new(SkillRuntime::default()), skills: Arc::new(SkillRuntime::default()),
skill_event_store: None, skill_event_sink: None,
skill_event_session_id: None,
tool_context: ToolContext::default(), tool_context: ToolContext::default(),
observer: None, observer: None,
emitted_message_handler: None, emitted_message_handler: None,
@ -373,8 +380,7 @@ impl AgentLoop {
provider, provider,
tools, tools,
skills, skills,
skill_event_store: None, skill_event_sink: None,
skill_event_session_id: None,
tool_context: ToolContext::default(), tool_context: ToolContext::default(),
observer: None, observer: None,
emitted_message_handler: None, emitted_message_handler: None,
@ -382,9 +388,8 @@ impl AgentLoop {
}) })
} }
pub fn with_skill_event_store(mut self, store: Arc<SessionStore>, session_id: String) -> Self { pub fn with_skill_event_sink(mut self, sink: Arc<dyn SkillEventSink>) -> Self {
self.skill_event_store = Some(store); self.skill_event_sink = Some(sink);
self.skill_event_session_id = Some(session_id);
self self
} }
@ -877,18 +882,15 @@ impl AgentLoop {
skill_name: Option<&str>, skill_name: Option<&str>,
payload: serde_json::Value, payload: serde_json::Value,
) { ) {
let (Some(store), Some(session_id)) = ( let Some(sink) = self.skill_event_sink.as_ref() else {
self.skill_event_store.as_ref(),
self.skill_event_session_id.as_ref(),
) else {
return; return;
}; };
if let Err(err) = sink.record_skill_event(SkillEvent {
store.append_skill_event(Some(session_id), event_type, skill_name, &payload) event_type: event_type.to_string(),
{ skill_name: skill_name.map(str::to_string),
tracing::warn!(error = %err, event_type = %event_type, "Failed to record skill event"); payload,
} });
} }
} }
@ -898,6 +900,25 @@ mod tests {
use crate::observability::{MultiObserver, Observer}; use crate::observability::{MultiObserver, Observer};
use tempfile::tempdir; use tempfile::tempdir;
fn test_provider_config() -> LLMProviderConfig {
LLMProviderConfig {
provider_type: "openai".to_string(),
name: "test".to_string(),
base_url: "http://localhost".to_string(),
api_key: "test-key".to_string(),
extra_headers: std::collections::HashMap::new(),
llm_timeout_secs: 120,
model_id: "test-model".to_string(),
temperature: Some(0.0),
max_tokens: Some(32),
context_window_tokens: None,
model_extra: std::collections::HashMap::new(),
max_tool_iterations: 1,
tool_result_max_chars: 20_000,
context_tool_result_trim_chars: 20_000,
}
}
struct TestObserver { struct TestObserver {
events: std::sync::Mutex<Vec<ObserverEvent>>, events: std::sync::Mutex<Vec<ObserverEvent>>,
} }
@ -910,6 +931,24 @@ mod tests {
} }
} }
struct TestSkillEventSink {
events: std::sync::Mutex<Vec<SkillEvent>>,
}
impl TestSkillEventSink {
fn new() -> Self {
Self {
events: std::sync::Mutex::new(Vec::new()),
}
}
}
impl SkillEventSink for TestSkillEventSink {
fn record_skill_event(&self, event: SkillEvent) {
self.events.lock().unwrap().push(event);
}
}
impl Observer for TestObserver { impl Observer for TestObserver {
fn record_event(&self, event: &ObserverEvent) { fn record_event(&self, event: &ObserverEvent) {
self.events.lock().unwrap().push(event.clone()); self.events.lock().unwrap().push(event.clone());
@ -936,6 +975,26 @@ mod tests {
assert_eq!(multi.len(), 1); assert_eq!(multi.len(), 1);
} }
#[test]
fn test_skill_events_are_emitted_to_sink() {
let sink = Arc::new(TestSkillEventSink::new());
let agent = AgentLoop::new(test_provider_config())
.unwrap()
.with_skill_event_sink(sink.clone());
agent.record_skill_event(
"activated",
Some("demo"),
serde_json::json!({ "source": "test" }),
);
let events = sink.events.lock().unwrap();
assert_eq!(events.len(), 1);
assert_eq!(events[0].event_type, "activated");
assert_eq!(events[0].skill_name.as_deref(), Some("demo"));
assert_eq!(events[0].payload, serde_json::json!({ "source": "test" }));
}
#[test] #[test]
fn test_should_execute_in_parallel_single_tool() { fn test_should_execute_in_parallel_single_tool() {
// Would need a proper setup with AgentLoop to test fully // Would need a proper setup with AgentLoop to test fully

View File

@ -1,5 +1,7 @@
pub mod agent_loop; pub mod agent_loop;
pub mod context_compressor; pub mod context_compressor;
pub use agent_loop::{AgentError, AgentLoop, AgentProcessResult, EmittedMessageHandler}; pub use agent_loop::{
AgentError, AgentLoop, AgentProcessResult, EmittedMessageHandler, SkillEvent, SkillEventSink,
};
pub use context_compressor::ContextCompressor; pub use context_compressor::ContextCompressor;

View File

@ -6,6 +6,8 @@ use crate::skills::SkillRuntime;
use crate::storage::{SessionStore, persistent_session_id}; use crate::storage::{SessionStore, persistent_session_id};
use crate::tools::{ToolContext, ToolRegistry}; use crate::tools::{ToolContext, ToolRegistry};
use super::skill_event_sink::PersistentSkillEventSink;
#[derive(Clone)] #[derive(Clone)]
pub(crate) struct AgentFactory { pub(crate) struct AgentFactory {
tools: Arc<ToolRegistry>, tools: Arc<ToolRegistry>,
@ -42,8 +44,13 @@ impl AgentFactory {
self.skills.clone(), self.skills.clone(),
) )
.map(|agent| { .map(|agent| {
let skill_event_sink = Arc::new(PersistentSkillEventSink::new(
self.store.clone(),
session_id.clone(),
));
agent agent
.with_skill_event_store(self.store.clone(), session_id.clone()) .with_skill_event_sink(skill_event_sink)
.with_tool_context(ToolContext { .with_tool_context(ToolContext {
channel_name: Some(request.channel_name.to_string()), channel_name: Some(request.channel_name.to_string()),
sender_id: request.sender_id.map(str::to_string), sender_id: request.sender_id.map(str::to_string),

View File

@ -18,6 +18,7 @@ pub mod session_factory;
pub mod session_lifecycle; pub mod session_lifecycle;
pub mod session_message_service; pub mod session_message_service;
pub mod session_pool; pub mod session_pool;
pub mod skill_event_sink;
pub mod tool_registry_factory; pub mod tool_registry_factory;
pub mod ws; pub mod ws;

View File

@ -0,0 +1,32 @@
use std::sync::Arc;
use crate::agent::{SkillEvent, SkillEventSink};
use crate::storage::SessionStore;
pub(crate) struct PersistentSkillEventSink {
store: Arc<SessionStore>,
session_id: String,
}
impl PersistentSkillEventSink {
pub(crate) fn new(store: Arc<SessionStore>, session_id: String) -> Self {
Self { store, session_id }
}
}
impl SkillEventSink for PersistentSkillEventSink {
fn record_skill_event(&self, event: SkillEvent) {
if let Err(err) = self.store.append_skill_event(
Some(&self.session_id),
&event.event_type,
event.skill_name.as_deref(),
&event.payload,
) {
tracing::warn!(
error = %err,
event_type = %event.event_type,
"Failed to record skill event"
);
}
}
}