From 90e44950cbf7d4db64de7c53cf5075adacaee7dd Mon Sep 17 00:00:00 2001 From: ooodc <549496103@qq.com> Date: Tue, 28 Apr 2026 15:31:56 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E9=87=8D=E6=9E=84=E6=8A=80=E8=83=BD?= =?UTF-8?q?=E4=BA=8B=E4=BB=B6=E5=A4=84=E7=90=86=E9=80=BB=E8=BE=91=EF=BC=8C?= =?UTF-8?q?=E7=A7=BB=E9=99=A4=20SkillEventSink=EF=BC=8C=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=20SkillActivateTool=20=E6=A8=A1=E5=9D=97=E4=BB=A5=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E6=8A=80=E8=83=BD=E6=BF=80=E6=B4=BB=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot --- src/agent/agent_loop.rs | 158 +-------------------------- src/agent/mod.rs | 3 +- src/gateway/agent_factory.rs | 40 ++----- src/gateway/mod.rs | 1 - src/gateway/session.rs | 4 +- src/gateway/skill_event_sink.rs | 32 ------ src/gateway/tool_registry_factory.rs | 8 +- src/skills/mod.rs | 69 ------------ src/tools/mod.rs | 2 + src/tools/skill_activate.rs | 151 +++++++++++++++++++++++++ 10 files changed, 174 insertions(+), 294 deletions(-) delete mode 100644 src/gateway/skill_event_sink.rs create mode 100644 src/tools/skill_activate.rs diff --git a/src/agent/agent_loop.rs b/src/agent/agent_loop.rs index d98894a..dc2ecfe 100644 --- a/src/agent/agent_loop.rs +++ b/src/agent/agent_loop.rs @@ -2,7 +2,6 @@ use crate::bus::ChatMessage; use crate::bus::message::ToolMessageState; use crate::config::LLMProviderConfig; use crate::domain::messages::{ContentBlock, ToolCall}; -use crate::domain::tools::Tool; use crate::observability::{ Observer, ObserverEvent, ToolExecutionOutcome, ToolExecutionState, truncate_args, }; @@ -297,7 +296,6 @@ pub struct AgentLoop { provider: Box, tools: Arc, skills: Arc, - skill_event_sink: Option>, tool_context: ToolContext, observer: Option>, emitted_message_handler: Option>, @@ -315,25 +313,8 @@ pub trait EmittedMessageHandler: Send + Sync + 'static { async fn handle(&self, message: ChatMessage); } -#[derive(Debug, Clone)] -pub struct SkillEvent { - pub event_type: String, - pub skill_name: Option, - pub payload: serde_json::Value, -} - -pub trait SkillEventSink: Send + Sync + 'static { - fn record_skill_event(&self, event: SkillEvent); -} - pub trait SkillProvider: Send + Sync + 'static { fn system_index_prompt(&self) -> Option; - - fn skill_tool_definition(&self) -> Option; - - fn activation_payload(&self, name: &str) -> Result; - - fn activation_event_payload(&self, name: &str) -> Result; } #[derive(Default)] @@ -343,18 +324,6 @@ impl SkillProvider for EmptySkillProvider { fn system_index_prompt(&self) -> Option { None } - - fn skill_tool_definition(&self) -> Option { - None - } - - fn activation_payload(&self, name: &str) -> Result { - Err(format!("skill '{}' not found", name)) - } - - fn activation_event_payload(&self, name: &str) -> Result { - Err(format!("skill '{}' not found", name)) - } } impl AgentLoop { @@ -368,7 +337,6 @@ impl AgentLoop { provider, tools: Arc::new(ToolRegistry::new()), skills: Arc::new(EmptySkillProvider), - skill_event_sink: None, tool_context: ToolContext::default(), observer: None, emitted_message_handler: None, @@ -389,7 +357,6 @@ impl AgentLoop { provider, tools, skills: Arc::new(EmptySkillProvider), - skill_event_sink: None, tool_context: ToolContext::default(), observer: None, emitted_message_handler: None, @@ -411,7 +378,6 @@ impl AgentLoop { provider, tools, skills, - skill_event_sink: None, tool_context: ToolContext::default(), observer: None, emitted_message_handler: None, @@ -419,11 +385,6 @@ impl AgentLoop { }) } - pub fn with_skill_event_sink(mut self, sink: Arc) -> Self { - self.skill_event_sink = Some(sink); - self - } - pub fn with_tool_context(mut self, context: ToolContext) -> Self { self.tool_context = context; self @@ -479,10 +440,7 @@ impl AgentLoop { messages_for_llm.extend(messages.iter().map(chat_message_to_llm_message)); // Build request - let mut tool_defs = self.tools.get_definitions(); - if let Some(skill_tool) = self.skills.skill_tool_definition() { - tool_defs.push(skill_tool); - } + let tool_defs = self.tools.get_definitions(); let tools = if tool_defs.is_empty() { None } else { @@ -818,46 +776,6 @@ impl AgentLoop { async fn execute_tool_internal(&self, tool_call: &ToolCall) -> ToolExecutionOutcome { let normalized_arguments = normalize_tool_arguments(&tool_call.arguments); - if tool_call.name == "skill_activate" { - let skill_name = match normalized_arguments.get("name").and_then(|v| v.as_str()) { - Some(name) if !name.trim().is_empty() => name, - _ => { - self.record_skill_event( - "activation_failed", - None, - serde_json::json!({ - "reason": "missing_name", - "arguments": normalized_arguments, - }), - ); - return ToolExecutionOutcome::failure( - "Error: Missing required parameter: name".to_string(), - Some("Missing required parameter: name".to_string()), - ); - } - }; - - return match self.skills.activation_payload(skill_name) { - Ok(output) => { - if let Ok(payload) = self.skills.activation_event_payload(skill_name) { - self.record_skill_event("activated", Some(skill_name), payload); - } - ToolExecutionOutcome::success(output) - } - Err(err) => { - self.record_skill_event( - "activation_failed", - Some(skill_name), - serde_json::json!({ - "reason": err, - "arguments": normalized_arguments, - }), - ); - ToolExecutionOutcome::failure(format!("Error: {}", err), Some(err)) - } - }; - } - let tool = match self.tools.get(&tool_call.name) { Some(t) => t, None => { @@ -906,23 +824,6 @@ impl AgentLoop { } } } - - fn record_skill_event( - &self, - event_type: &str, - skill_name: Option<&str>, - payload: serde_json::Value, - ) { - let Some(sink) = self.skill_event_sink.as_ref() else { - return; - }; - - sink.record_skill_event(SkillEvent { - event_type: event_type.to_string(), - skill_name: skill_name.map(str::to_string), - payload, - }); - } } #[cfg(test)] @@ -931,25 +832,6 @@ mod tests { use crate::observability::{MultiObserver, Observer}; use tempfile::tempdir; - fn test_provider_config() -> LLMProviderConfig { - LLMProviderConfig { - provider_type: "openai".to_string(), - name: "test".to_string(), - base_url: "http://localhost".to_string(), - api_key: "test-key".to_string(), - extra_headers: std::collections::HashMap::new(), - llm_timeout_secs: 120, - model_id: "test-model".to_string(), - temperature: Some(0.0), - max_tokens: Some(32), - context_window_tokens: None, - model_extra: std::collections::HashMap::new(), - max_tool_iterations: 1, - tool_result_max_chars: 20_000, - context_tool_result_trim_chars: 20_000, - } - } - struct TestObserver { events: std::sync::Mutex>, } @@ -962,24 +844,6 @@ mod tests { } } - struct TestSkillEventSink { - events: std::sync::Mutex>, - } - - impl TestSkillEventSink { - fn new() -> Self { - Self { - events: std::sync::Mutex::new(Vec::new()), - } - } - } - - impl SkillEventSink for TestSkillEventSink { - fn record_skill_event(&self, event: SkillEvent) { - self.events.lock().unwrap().push(event); - } - } - impl Observer for TestObserver { fn record_event(&self, event: &ObserverEvent) { self.events.lock().unwrap().push(event.clone()); @@ -1006,26 +870,6 @@ mod tests { assert_eq!(multi.len(), 1); } - #[test] - fn test_skill_events_are_emitted_to_sink() { - let sink = Arc::new(TestSkillEventSink::new()); - let agent = AgentLoop::new(test_provider_config()) - .unwrap() - .with_skill_event_sink(sink.clone()); - - agent.record_skill_event( - "activated", - Some("demo"), - serde_json::json!({ "source": "test" }), - ); - - let events = sink.events.lock().unwrap(); - assert_eq!(events.len(), 1); - assert_eq!(events[0].event_type, "activated"); - assert_eq!(events[0].skill_name.as_deref(), Some("demo")); - assert_eq!(events[0].payload, serde_json::json!({ "source": "test" })); - } - #[test] fn test_should_execute_in_parallel_single_tool() { // Would need a proper setup with AgentLoop to test fully diff --git a/src/agent/mod.rs b/src/agent/mod.rs index 90dd9ba..d4b57ac 100644 --- a/src/agent/mod.rs +++ b/src/agent/mod.rs @@ -2,7 +2,6 @@ pub mod agent_loop; pub mod context_compressor; pub use agent_loop::{ - AgentError, AgentLoop, AgentProcessResult, EmittedMessageHandler, SkillEvent, SkillEventSink, - SkillProvider, + AgentError, AgentLoop, AgentProcessResult, EmittedMessageHandler, SkillProvider, }; pub use context_compressor::ContextCompressor; diff --git a/src/gateway/agent_factory.rs b/src/gateway/agent_factory.rs index 69cd262..9fb26fc 100644 --- a/src/gateway/agent_factory.rs +++ b/src/gateway/agent_factory.rs @@ -2,16 +2,13 @@ use std::sync::Arc; use crate::agent::{AgentError, AgentLoop, SkillProvider}; use crate::config::LLMProviderConfig; -use crate::storage::{SessionStore, persistent_session_id}; +use crate::storage::persistent_session_id; use crate::tools::{ToolContext, ToolRegistry}; -use super::skill_event_sink::PersistentSkillEventSink; - #[derive(Clone)] pub(crate) struct AgentFactory { tools: Arc, skills: Arc, - store: Arc, } pub(crate) struct AgentBuildRequest<'a> { @@ -23,16 +20,8 @@ pub(crate) struct AgentBuildRequest<'a> { } impl AgentFactory { - pub(crate) fn new( - tools: Arc, - skills: Arc, - store: Arc, - ) -> Self { - Self { - tools, - skills, - store, - } + pub(crate) fn new(tools: Arc, skills: Arc) -> Self { + Self { tools, skills } } pub(crate) fn create(&self, request: AgentBuildRequest<'_>) -> Result { @@ -43,21 +32,14 @@ impl AgentFactory { self.skills.clone(), ) .map(|agent| { - let skill_event_sink = Arc::new(PersistentSkillEventSink::new( - self.store.clone(), - session_id.clone(), - )); - - agent - .with_skill_event_sink(skill_event_sink) - .with_tool_context(ToolContext { - channel_name: Some(request.channel_name.to_string()), - sender_id: request.sender_id.map(str::to_string), - chat_id: Some(request.chat_id.to_string()), - session_id: Some(session_id), - message_id: request.message_id.map(str::to_string), - message_seq: None, - }) + agent.with_tool_context(ToolContext { + channel_name: Some(request.channel_name.to_string()), + sender_id: request.sender_id.map(str::to_string), + chat_id: Some(request.chat_id.to_string()), + session_id: Some(session_id), + message_id: request.message_id.map(str::to_string), + message_seq: None, + }) }) } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 053b8c8..9cfa501 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -20,7 +20,6 @@ pub mod session_history; pub mod session_lifecycle; pub mod session_message_service; pub mod session_pool; -pub mod skill_event_sink; pub mod tool_registry_factory; pub mod ws; diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 7e04e6a..2bfc4e3 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -104,7 +104,7 @@ impl Session { store: Arc, agent_prompt_reinject_every: u64, ) -> Result { - let agent_factory = AgentFactory::new(tools, skills.clone(), store.clone()); + let agent_factory = AgentFactory::new(tools, skills.clone()); let prompt_injector = PromptInjector::new(store.clone(), agent_prompt_reinject_every); Self::with_factories( channel_name, @@ -370,7 +370,7 @@ impl SessionManager { ) .build(), ); - let agent_factory = AgentFactory::new(tools.clone(), skills.clone(), store.clone()); + let agent_factory = AgentFactory::new(tools.clone(), skills.clone()); let prompt_injector = PromptInjector::new(store.clone(), agent_prompt_reinject_every); let session_factory = SessionFactory::new( provider_config.clone(), diff --git a/src/gateway/skill_event_sink.rs b/src/gateway/skill_event_sink.rs deleted file mode 100644 index ecabd7a..0000000 --- a/src/gateway/skill_event_sink.rs +++ /dev/null @@ -1,32 +0,0 @@ -use std::sync::Arc; - -use crate::agent::{SkillEvent, SkillEventSink}; -use crate::storage::SkillEventRepository; - -pub(crate) struct PersistentSkillEventSink { - events: Arc, - session_id: String, -} - -impl PersistentSkillEventSink { - pub(crate) fn new(events: Arc, session_id: String) -> Self { - Self { events, session_id } - } -} - -impl SkillEventSink for PersistentSkillEventSink { - fn record_skill_event(&self, event: SkillEvent) { - if let Err(err) = self.events.append_skill_event( - Some(&self.session_id), - &event.event_type, - event.skill_name.as_deref(), - &event.payload, - ) { - tracing::warn!( - error = %err, - event_type = %event.event_type, - "Failed to record skill event" - ); - } - } -} diff --git a/src/gateway/tool_registry_factory.rs b/src/gateway/tool_registry_factory.rs index fd585af..035cf3d 100644 --- a/src/gateway/tool_registry_factory.rs +++ b/src/gateway/tool_registry_factory.rs @@ -5,8 +5,8 @@ use crate::skills::SkillRuntime; use crate::storage::SessionStore; use crate::tools::{ BashTool, CalculatorTool, FileEditTool, FileReadTool, FileWriteTool, HttpRequestTool, - MemoryManageTool, MemorySearchTool, SchedulerManageTool, SkillListTool, SkillManageTool, - TimeTool, ToolRegistry, WebFetchTool, + MemoryManageTool, MemorySearchTool, SchedulerManageTool, SkillActivateTool, SkillListTool, + SkillManageTool, TimeTool, ToolRegistry, WebFetchTool, }; pub(crate) struct ToolRegistryFactory { @@ -44,6 +44,10 @@ impl ToolRegistryFactory { self.store.clone(), self.known_agents.clone(), )); + registry.register(SkillActivateTool::new( + self.skills.clone(), + self.store.clone(), + )); registry.register(SkillListTool::new(self.skills.clone())); registry.register(SkillManageTool::new(self.skills.clone())); registry.register(BashTool::new()); diff --git a/src/skills/mod.rs b/src/skills/mod.rs index 93e06fa..9fa3873 100644 --- a/src/skills/mod.rs +++ b/src/skills/mod.rs @@ -6,7 +6,6 @@ use std::path::{Path, PathBuf}; use std::sync::RwLock; use crate::config::SkillsConfig; -use crate::domain::tools::{Tool, ToolFunction}; #[derive(Debug, Clone)] pub struct Skill { @@ -120,13 +119,6 @@ impl SkillRuntime { .offered_event_payload() } - pub fn skill_tool_definition(&self) -> Option { - self.catalog - .read() - .expect("skills rwlock poisoned") - .skill_tool_definition() - } - pub fn activation_payload(&self, name: &str) -> Result { self.catalog .read() @@ -234,18 +226,6 @@ impl crate::agent::SkillProvider for SkillRuntime { fn system_index_prompt(&self) -> Option { SkillRuntime::system_index_prompt(self) } - - fn skill_tool_definition(&self) -> Option { - SkillRuntime::skill_tool_definition(self) - } - - fn activation_payload(&self, name: &str) -> Result { - SkillRuntime::activation_payload(self, name) - } - - fn activation_event_payload(&self, name: &str) -> Result { - SkillRuntime::activation_event_payload(self, name) - } } impl SkillSource { @@ -362,30 +342,6 @@ impl SkillCatalog { self.catalog_event_payload() } - pub fn skill_tool_definition(&self) -> Option { - if self.skills.is_empty() { - return None; - } - - Some(Tool { - tool_type: "function".to_string(), - function: ToolFunction { - name: "skill_activate".to_string(), - description: "Load detailed instructions for a named skill discovered from SKILL.md files. Use when a task matches a listed skill description.".to_string(), - parameters: json!({ - "type": "object", - "properties": { - "name": { - "type": "string", - "description": "Skill name from the available skills list" - } - }, - "required": ["name"] - }), - }, - }) - } - pub fn activation_payload(&self, name: &str) -> Result { let skill = self .find_skill(name) @@ -697,31 +653,6 @@ mod tests { assert!(err.contains("description")); } - #[test] - fn test_skill_tool_definition_exists_when_skills_present() { - let dir = tempfile::tempdir().unwrap(); - let root = dir.path().join(".picobot").join("skills").join("demo"); - fs::create_dir_all(&root).unwrap(); - fs::write( - root.join("SKILL.md"), - "---\ndescription: demo skill\n---\nDo demo", - ) - .unwrap(); - - let skills = load_skills_from_root( - &dir.path().join(".picobot").join("skills"), - SkillSource::Project, - ); - let catalog = SkillCatalog { - skills, - max_index_chars: 4000, - max_listed_skills: 10, - }; - - let tool = catalog.skill_tool_definition().unwrap(); - assert_eq!(tool.function.name, "skill_activate"); - } - #[test] fn test_activation_payload_contains_body() { let dir = tempfile::tempdir().unwrap(); diff --git a/src/tools/mod.rs b/src/tools/mod.rs index 3678fc7..29487ca 100644 --- a/src/tools/mod.rs +++ b/src/tools/mod.rs @@ -9,6 +9,7 @@ pub mod memory_search; pub mod registry; pub mod scheduler_manage; pub mod schema; +pub mod skill_activate; pub mod skill_manage; pub mod time; pub mod traits; @@ -25,6 +26,7 @@ pub use memory_search::MemorySearchTool; pub use registry::ToolRegistry; pub use scheduler_manage::SchedulerManageTool; pub use schema::{CleaningStrategy, SchemaCleanr}; +pub use skill_activate::SkillActivateTool; pub use skill_manage::{SkillListTool, SkillManageTool}; pub use time::TimeTool; pub use traits::{Tool, ToolContext, ToolResult}; diff --git a/src/tools/skill_activate.rs b/src/tools/skill_activate.rs new file mode 100644 index 0000000..5de84aa --- /dev/null +++ b/src/tools/skill_activate.rs @@ -0,0 +1,151 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use serde_json::json; + +use crate::skills::SkillRuntime; +use crate::storage::SkillEventRepository; +use crate::tools::traits::{Tool, ToolContext, ToolResult}; + +pub struct SkillActivateTool { + skills: Arc, + events: Arc, +} + +impl SkillActivateTool { + pub fn new(skills: Arc, events: Arc) -> Self { + Self { skills, events } + } + + fn record_event( + &self, + context: &ToolContext, + event_type: &str, + skill_name: Option<&str>, + payload: &serde_json::Value, + ) { + if let Err(err) = self.events.append_skill_event( + context.session_id.as_deref(), + event_type, + skill_name, + payload, + ) { + tracing::warn!(error = %err, event_type, skill_name, "Failed to record skill activation event"); + } + } +} + +#[async_trait] +impl Tool for SkillActivateTool { + fn name(&self) -> &str { + "skill_activate" + } + + fn description(&self) -> &str { + "Load detailed instructions for a named skill discovered from SKILL.md files. Use when a task matches a listed skill description." + } + + fn parameters_schema(&self) -> serde_json::Value { + json!({ + "type": "object", + "properties": { + "name": { + "type": "string", + "description": "Skill name from the available skills list" + } + }, + "required": ["name"] + }) + } + + async fn execute(&self, args: serde_json::Value) -> anyhow::Result { + self.execute_with_context(&ToolContext::default(), args) + .await + } + + async fn execute_with_context( + &self, + context: &ToolContext, + args: serde_json::Value, + ) -> anyhow::Result { + let skill_name = match args.get("name").and_then(|value| value.as_str()) { + Some(name) if !name.trim().is_empty() => name, + _ => { + self.record_event( + context, + "activation_failed", + None, + &json!({ + "reason": "missing_name", + "arguments": args, + }), + ); + return Ok(error_result("Missing required parameter: name")); + } + }; + + match self.skills.activation_payload(skill_name) { + Ok(output) => { + if let Ok(payload) = self.skills.activation_event_payload(skill_name) { + self.record_event(context, "activated", Some(skill_name), &payload); + } + Ok(ToolResult { + success: true, + output, + error: None, + }) + } + Err(err) => { + self.record_event( + context, + "activation_failed", + Some(skill_name), + &json!({ + "reason": err, + "arguments": args, + }), + ); + Ok(error_result(&err)) + } + } + } +} + +fn error_result(message: &str) -> ToolResult { + ToolResult { + success: false, + output: String::new(), + error: Some(message.to_string()), + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage::SessionStore; + + #[tokio::test] + async fn test_skill_activate_records_failed_activation_event() { + let skills = Arc::new(SkillRuntime::default()); + let store = Arc::new(SessionStore::in_memory().unwrap()); + store.ensure_channel_session("feishu", "chat-1").unwrap(); + let tool = SkillActivateTool::new(skills, store.clone()); + let context = ToolContext { + session_id: Some("feishu:chat-1".to_string()), + ..ToolContext::default() + }; + + let result = tool + .execute_with_context(&context, json!({ "name": "demo" })) + .await + .unwrap(); + + assert!(!result.success); + assert!(result.error.unwrap().contains("not found")); + + let events = store.list_skill_events(Some("feishu:chat-1")).unwrap(); + assert_eq!(events.len(), 1); + assert_eq!(events[0].event_type, "activation_failed"); + assert_eq!(events[0].skill_name.as_deref(), Some("demo")); + } +}