diff --git a/README.md b/README.md index 832344c..47c19f7 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ PicoBot 是一个用 Rust 构建的多通道 Agent 网关。它把消息接入 - 超长上下文压缩与历史摘要 - 持久化 Agent 配置文件注入与周期性重注入 - 会话管理支持按通道查询和切换 +- MCP (Model Context Protocol) 集成,支持 Stdio 和 HTTP 两种传输方式 ## 1. 项目定位 @@ -531,6 +532,68 @@ PicoBot 的 Agent 是围绕工具调用构建的。当前默认注册的工具 - bash / shell / http_request / web_fetch 让 Agent 具备更强的外部交互能力(bash 和 shell 是同一工具在不同平台的名称) - task 允许 Agent 创建独立上下文的子代理来处理复杂多步骤任务,支持 general 和 explore 两种类型 +### 8.1 MCP 工具集成 + +PicoBot 支持通过 MCP (Model Context Protocol) 扩展工具能力,可以连接外部 MCP servers 并自动发现其提供的工具。 + +**支持的 Transport 类型:** + +| Transport | 说明 | 适用场景 | +|-----------|------|----------| +| **Stdio** | 启动子进程,通过 stdin/stdout 通信 | 本地 MCP servers(如 npm 包) | +| **HTTP** | 通过 HTTP/SSE 连接远程服务器 | 远程 MCP servers、云服务 | + +**配置示例:** + +```json +{ + "mcp": { + "enabled": true, + "servers": [ + { + "name": "filesystem", + "enabled": true, + "description": "本地文件系统操作", + "transport": { + "type": "stdio", + "command": "npx", + "args": ["-y", "@modelcontextprotocol/server-filesystem", "/home/user"], + "env": {} + } + }, + { + "name": "remote-tools", + "enabled": true, + "description": "远程 MCP server", + "transport": { + "type": "http", + "url": "http://api.example.com/mcp", + "headers": { + "Authorization": "Bearer your-token" + } + } + } + ] + } +} +``` + +**工具命名规则:** + +MCP 工具会自动注册到 ToolRegistry,命名格式为 `mcp_{server_name}_{tool_name}`: + +- `mcp_filesystem_read_file` +- `mcp_filesystem_write_file` +- `mcp_filesystem_list_directory` +- `mcp_remote-tools_custom_query` + +**架构特点:** + +- MCP 模块完全解耦,默认禁用时不影响原有系统 +- 异步初始化,不阻塞 Gateway 启动 +- 通过 Tool trait 适配器接入,无需修改核心代码 +- 连接失败不影响 Gateway 运行 + ## 9. 调度器机制 PicoBot 带有一个基于 SQLite 的调度器,而不是纯内存或 JSON 文件驱动的任务系统。 @@ -861,6 +924,7 @@ PicoBot/ │ ├── domain/ # 领域模型(消息、工具定义) │ ├── gateway/ # Gateway、Session 编排、WS/HTTP 控制面、执行服务 │ ├── logging/ # 日志配置 +│ ├── mcp/ # MCP 集成(客户端管理、工具适配器、配置) │ ├── observability/ # 可观测性支持 │ ├── platform/ # 平台抽象 │ ├── providers/ # OpenAI / Anthropic Provider @@ -896,7 +960,7 @@ PicoBot 当前已经具备一个可长期运行 Agent 系统的关键组件: - 有入口:Gateway + Channel - 有状态:SQLite + Session 恢复 -- 有能力:工具调用 + 技能系统 +- 有能力:工具调用 + 技能系统 + MCP 扩展 - 有记忆:长期记忆 + 自动维护摘要 - 有计划:Scheduler + agent_task @@ -906,3 +970,4 @@ PicoBot 当前已经具备一个可长期运行 Agent 系统的关键组件: - 增强多模态能力 - 完善记忆检索排序与冲突消解 - 为不同 Agent 提供更清晰的配置和权限隔离 +- 扩展 MCP servers 支持(更多 npm 包、云服务集成) diff --git a/src/gateway/runtime.rs b/src/gateway/runtime.rs index 1f7d51a..800997c 100644 --- a/src/gateway/runtime.rs +++ b/src/gateway/runtime.rs @@ -1,10 +1,12 @@ +//! Gateway Runtime - builds SessionManager with decoupled MCP integration + use std::collections::{HashMap, HashSet}; use std::sync::Arc; use crate::agent::AgentError; use crate::config::{LLMProviderConfig, MemoryMaintenanceConfig, TaskConfig}; use crate::gateway::tool_registry_factory::ToolRegistryFactory; -use crate::mcp::{McpClientManager, McpConfig}; +use crate::mcp::McpInitializer; use crate::skills::SkillRuntime; use crate::storage::{ ConversationRepository, MemoryRepository, PromptInjectionRepository, SchedulerJobRepository, @@ -26,6 +28,9 @@ use super::session_factory::SessionFactory; use super::session_lifecycle::SessionLifecycleService; use super::session_message_service::SessionMessageService; +/// Build SessionManager with optional MCP integration +/// +/// MCP is initialized asynchronously in background, not blocking gateway startup. pub(crate) fn build_session_manager( agent_prompt_reinject_every: u64, show_tool_results: bool, @@ -37,7 +42,7 @@ pub(crate) fn build_session_manager( task_config: TaskConfig, maintenance_config: MemoryMaintenanceConfig, session_ttl_hours: Option, - mcp_config: McpConfig, + mcp_config: crate::mcp::McpConfig, ) -> Result<(SessionManager, Arc), AgentError> { build_session_manager_with_sender( agent_prompt_reinject_every, @@ -55,6 +60,7 @@ pub(crate) fn build_session_manager( ) } +/// Build SessionManager with custom session message sender pub(crate) fn build_session_manager_with_sender( agent_prompt_reinject_every: u64, show_tool_results: bool, @@ -67,7 +73,7 @@ pub(crate) fn build_session_manager_with_sender( task_config: TaskConfig, maintenance_config: MemoryMaintenanceConfig, session_ttl_hours: Option, - mcp_config: McpConfig, + mcp_config: crate::mcp::McpConfig, ) -> Result<(SessionManager, Arc), AgentError> { let store = Arc::new( SessionStore::new() @@ -91,7 +97,7 @@ pub(crate) fn build_session_manager_with_sender( let skill_events: Arc = store.clone(); let conversations: Arc = store.clone(); - // 创建 ToolRegistryFactory + // Create ToolRegistryFactory let factory = ToolRegistryFactory::new( skills.clone(), memories, @@ -104,37 +110,18 @@ pub(crate) fn build_session_manager_with_sender( task_config.clone(), ); - // 创建 MCP Client Manager(如果启用) - let mcp_manager = if mcp_config.has_enabled_servers() { - let manager = Arc::new(McpClientManager::new()); + // Create MCP Initializer (async, non-blocking) + // MCP servers connect in background task + let mcp_initializer = McpInitializer::with_config(mcp_config); - // 在 tokio runtime 中连接 MCP servers - // 使用 block_in_place 允许在同步上下文中执行异步代码 - let servers = mcp_config.enabled_servers(); - let servers_clone: Vec<_> = servers.into_iter().cloned().collect(); - - tokio::task::block_in_place(|| { - tokio::runtime::Handle::current().block_on(async { - tracing::info!("Connecting to MCP servers..."); - if let Err(e) = manager.connect_all(&servers_clone).await { - tracing::error!(error = %e, "Failed to connect to some MCP servers"); - } - }) - }); - - Some(manager) - } else { - None - }; - - // 将 MCP manager 添加到 factory - let factory = if let Some(ref manager) = mcp_manager { - factory.with_mcp_manager(manager.clone()) + // Add MCP manager to factory (if enabled) + let factory = if let Some(manager) = mcp_initializer.manager() { + factory.with_mcp_manager(manager) } else { factory }; - // 创建 SubAgentRuntime(如果 task 工具启用) + // Create SubAgentRuntime (if task tool is enabled) let (factory, task_repository): (_, Arc) = if task_config.enabled { let task_repository = Arc::new(InMemoryTaskRepository::new()); let subagent_tools = Arc::new(factory.build_subagent_tools()); @@ -158,17 +145,18 @@ pub(crate) fn build_session_manager_with_sender( (factory.with_subagent_runtime(subagent_runtime), task_repository) } else { - // 如果 task 工具未启用,创建一个空的内存仓库 (factory, Arc::new(InMemoryTaskRepository::new())) }; + // Build base tools let mut tools = factory.build(); - // 注册 MCP tools(如果有 MCP manager) - if let Some(manager) = &mcp_manager { + // Register MCP tools (async) + // This waits briefly for connections, then registers available tools + if mcp_initializer.is_enabled() { tokio::task::block_in_place(|| { tokio::runtime::Handle::current().block_on(async { - if let Err(e) = crate::mcp::register_mcp_tools(manager.clone(), &mut tools).await { + if let Err(e) = mcp_initializer.register_tools(&mut tools).await { tracing::error!(error = %e, "Failed to register MCP tools"); } }) @@ -215,4 +203,4 @@ pub(crate) fn build_session_manager_with_sender( memory_maintenance, task_repository: task_repository.clone(), }), task_repository)) -} +} \ No newline at end of file diff --git a/src/mcp/client.rs b/src/mcp/client.rs index 72a79e3..83686c9 100644 --- a/src/mcp/client.rs +++ b/src/mcp/client.rs @@ -1,4 +1,10 @@ //! MCP Client Manager - manages connections to MCP servers +//! +//! This module provides a decoupled MCP integration that: +//! - Doesn't block gateway startup +//! - Is completely optional (disabled by default) +//! - Connects to MCP servers asynchronously +//! - Dynamically registers MCP tools via the Tool trait adapter use std::collections::HashMap; use std::sync::Arc; @@ -31,6 +37,12 @@ pub struct McpServerInfo { } /// Manager for MCP client connections +/// +/// This manager handles: +/// - Connecting to MCP servers (stdio and HTTP transports) +/// - Discovering available tools +/// - Calling tools on connected servers +/// - Connection lifecycle management pub struct McpClientManager { /// Connected clients keyed by server name clients: RwLock>>, @@ -39,7 +51,7 @@ pub struct McpClientManager { } impl McpClientManager { - /// Create a new manager + /// Create a new manager (no connections yet) pub fn new() -> Self { Self { clients: RwLock::new(HashMap::new()), @@ -47,7 +59,10 @@ impl McpClientManager { } } - /// Connect to all configured servers + /// Connect to all configured servers (async, non-blocking) + /// + /// This method is designed to be called asynchronously without + /// blocking the main gateway startup flow. pub async fn connect_all(&self, servers: &[McpServerConfig]) -> anyhow::Result<()> { for server in servers { if !server.enabled { @@ -55,15 +70,17 @@ impl McpClientManager { continue; } + // Each server connection is independent match self.connect_server(server).await { Ok(info) => { tracing::info!( - name = %server.name, + name = %info.name, tools_count = info.tools.len(), "Connected to MCP server" ); } Err(e) => { + // Log error but continue with other servers tracing::error!( name = %server.name, error = %e, @@ -77,7 +94,7 @@ impl McpClientManager { /// Connect to a single MCP server pub async fn connect_server(&self, config: &McpServerConfig) -> anyhow::Result { - tracing::info!(name = %config.name, "Connecting to MCP server"); + tracing::info!(name = %config.name, transport = ?config.transport, "Connecting to MCP server"); let client = match &config.transport { McpTransportConfig::Stdio { command, args, env } => { @@ -221,7 +238,7 @@ impl McpClientManager { let arguments = if args.is_object() { args.as_object().unwrap().clone() } else { - // If not an object, wrap it or use empty object + // If not an object, use empty object serde_json::Map::new() }; @@ -253,10 +270,100 @@ impl McpClientManager { self.server_info.write().await.clear(); Ok(()) } + + /// Check if any servers are connected + pub async fn has_connections(&self) -> bool { + !self.clients.read().await.is_empty() + } } impl Default for McpClientManager { fn default() -> Self { Self::new() } +} + +/// MCP Initializer - handles asynchronous MCP initialization +/// +/// This struct provides a decoupled way to initialize MCP: +/// - Doesn't block gateway startup +/// - Can be initialized in a background task +/// - Tools are registered after connection is established +pub struct McpInitializer { + /// The MCP client manager (None if MCP is disabled) + manager: Option>, + /// Connection task handle (for background initialization) + connection_task: Option>>, +} + +impl McpInitializer { + /// Create a disabled initializer (MCP not configured) + pub fn disabled() -> Self { + Self { + manager: None, + connection_task: None, + } + } + + /// Create an initializer with MCP configuration + /// + /// This spawns a background task to connect to MCP servers, + /// allowing the gateway to start immediately. + pub fn with_config(config: crate::mcp::McpConfig) -> Self { + if !config.has_enabled_servers() { + return Self::disabled(); + } + + let manager = Arc::new(McpClientManager::new()); + let servers: Vec<_> = config.enabled_servers().into_iter().cloned().collect(); + + // Spawn background connection task + let manager_clone = manager.clone(); + let connection_task = tokio::spawn(async move { + tracing::info!("Starting MCP connection task..."); + manager_clone.connect_all(&servers).await + }); + + Self { + manager: Some(manager), + connection_task: Some(connection_task), + } + } + + /// Get the manager (if MCP is enabled) + pub fn manager(&self) -> Option> { + self.manager.clone() + } + + /// Check if MCP is enabled + pub fn is_enabled(&self) -> bool { + self.manager.is_some() + } + + /// Wait for connections to complete (optional) + /// + /// This can be called if you want to ensure MCP servers are connected + /// before proceeding, but it's not required. + pub async fn wait_for_connections(&mut self) -> anyhow::Result<()> { + if let Some(task) = self.connection_task.take() { + // Handle JoinError and inner Result + task.await??; + } + Ok(()) + } + + /// Register MCP tools to the tool registry + /// + /// This should be called after the gateway is ready to accept tools. + /// The method handles the case where connections are still in progress. + pub async fn register_tools(&self, registry: &mut crate::tools::ToolRegistry) -> anyhow::Result<()> { + if let Some(manager) = &self.manager { + // Give a small grace period for connections if still in progress + // This allows tools to be registered even if connection task is running + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + + crate::mcp::register_mcp_tools(manager.clone(), registry).await?; + } + Ok(()) + } } \ No newline at end of file diff --git a/src/mcp/mod.rs b/src/mcp/mod.rs index e1141ea..d3d9215 100644 --- a/src/mcp/mod.rs +++ b/src/mcp/mod.rs @@ -2,11 +2,19 @@ //! //! This module provides MCP client functionality to connect to external MCP servers //! and expose their tools through PicoBot's Tool system. +//! +//! ## Architecture (Decoupled) +//! +//! - `McpInitializer`: Handles async initialization without blocking gateway startup +//! - `McpClientManager`: Manages connections to MCP servers +//! - `McpToolWrapper`: Adapts MCP tools to PicoBot's Tool trait +//! +//! MCP is completely optional and disabled by default. pub mod config; pub mod client; pub mod tool_adapter; pub use config::{McpConfig, McpServerConfig, McpTransportConfig}; -pub use client::{McpClientManager, McpClient, McpServerInfo}; +pub use client::{McpClientManager, McpClient, McpServerInfo, McpInitializer}; pub use tool_adapter::{McpToolWrapper, register_mcp_tools}; \ No newline at end of file