feat: 集成 MCP (Model Context Protocol),支持 Stdio 和 HTTP 传输方式,优化工具注册和连接管理

This commit is contained in:
ooodc 2026-05-23 23:22:36 +08:00
parent b3fa0bb978
commit ef7e899584
4 changed files with 210 additions and 42 deletions

View File

@ -13,6 +13,7 @@ PicoBot 是一个用 Rust 构建的多通道 Agent 网关。它把消息接入
- 超长上下文压缩与历史摘要 - 超长上下文压缩与历史摘要
- 持久化 Agent 配置文件注入与周期性重注入 - 持久化 Agent 配置文件注入与周期性重注入
- 会话管理支持按通道查询和切换 - 会话管理支持按通道查询和切换
- MCP (Model Context Protocol) 集成,支持 Stdio 和 HTTP 两种传输方式
## 1. 项目定位 ## 1. 项目定位
@ -531,6 +532,68 @@ PicoBot 的 Agent 是围绕工具调用构建的。当前默认注册的工具
- bash / shell / http_request / web_fetch 让 Agent 具备更强的外部交互能力bash 和 shell 是同一工具在不同平台的名称) - bash / shell / http_request / web_fetch 让 Agent 具备更强的外部交互能力bash 和 shell 是同一工具在不同平台的名称)
- task 允许 Agent 创建独立上下文的子代理来处理复杂多步骤任务,支持 general 和 explore 两种类型 - task 允许 Agent 创建独立上下文的子代理来处理复杂多步骤任务,支持 general 和 explore 两种类型
### 8.1 MCP 工具集成
PicoBot 支持通过 MCP (Model Context Protocol) 扩展工具能力,可以连接外部 MCP servers 并自动发现其提供的工具。
**支持的 Transport 类型:**
| Transport | 说明 | 适用场景 |
|-----------|------|----------|
| **Stdio** | 启动子进程,通过 stdin/stdout 通信 | 本地 MCP servers如 npm 包) |
| **HTTP** | 通过 HTTP/SSE 连接远程服务器 | 远程 MCP servers、云服务 |
**配置示例:**
```json
{
"mcp": {
"enabled": true,
"servers": [
{
"name": "filesystem",
"enabled": true,
"description": "本地文件系统操作",
"transport": {
"type": "stdio",
"command": "npx",
"args": ["-y", "@modelcontextprotocol/server-filesystem", "/home/user"],
"env": {}
}
},
{
"name": "remote-tools",
"enabled": true,
"description": "远程 MCP server",
"transport": {
"type": "http",
"url": "http://api.example.com/mcp",
"headers": {
"Authorization": "Bearer your-token"
}
}
}
]
}
}
```
**工具命名规则:**
MCP 工具会自动注册到 ToolRegistry命名格式为 `mcp_{server_name}_{tool_name}`
- `mcp_filesystem_read_file`
- `mcp_filesystem_write_file`
- `mcp_filesystem_list_directory`
- `mcp_remote-tools_custom_query`
**架构特点:**
- MCP 模块完全解耦,默认禁用时不影响原有系统
- 异步初始化,不阻塞 Gateway 启动
- 通过 Tool trait 适配器接入,无需修改核心代码
- 连接失败不影响 Gateway 运行
## 9. 调度器机制 ## 9. 调度器机制
PicoBot 带有一个基于 SQLite 的调度器,而不是纯内存或 JSON 文件驱动的任务系统。 PicoBot 带有一个基于 SQLite 的调度器,而不是纯内存或 JSON 文件驱动的任务系统。
@ -861,6 +924,7 @@ PicoBot/
│ ├── domain/ # 领域模型(消息、工具定义) │ ├── domain/ # 领域模型(消息、工具定义)
│ ├── gateway/ # Gateway、Session 编排、WS/HTTP 控制面、执行服务 │ ├── gateway/ # Gateway、Session 编排、WS/HTTP 控制面、执行服务
│ ├── logging/ # 日志配置 │ ├── logging/ # 日志配置
│ ├── mcp/ # MCP 集成(客户端管理、工具适配器、配置)
│ ├── observability/ # 可观测性支持 │ ├── observability/ # 可观测性支持
│ ├── platform/ # 平台抽象 │ ├── platform/ # 平台抽象
│ ├── providers/ # OpenAI / Anthropic Provider │ ├── providers/ # OpenAI / Anthropic Provider
@ -896,7 +960,7 @@ PicoBot 当前已经具备一个可长期运行 Agent 系统的关键组件:
- 有入口Gateway + Channel - 有入口Gateway + Channel
- 有状态SQLite + Session 恢复 - 有状态SQLite + Session 恢复
- 有能力:工具调用 + 技能系统 - 有能力:工具调用 + 技能系统 + MCP 扩展
- 有记忆:长期记忆 + 自动维护摘要 - 有记忆:长期记忆 + 自动维护摘要
- 有计划Scheduler + agent_task - 有计划Scheduler + agent_task
@ -906,3 +970,4 @@ PicoBot 当前已经具备一个可长期运行 Agent 系统的关键组件:
- 增强多模态能力 - 增强多模态能力
- 完善记忆检索排序与冲突消解 - 完善记忆检索排序与冲突消解
- 为不同 Agent 提供更清晰的配置和权限隔离 - 为不同 Agent 提供更清晰的配置和权限隔离
- 扩展 MCP servers 支持(更多 npm 包、云服务集成)

View File

@ -1,10 +1,12 @@
//! Gateway Runtime - builds SessionManager with decoupled MCP integration
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::sync::Arc; use std::sync::Arc;
use crate::agent::AgentError; use crate::agent::AgentError;
use crate::config::{LLMProviderConfig, MemoryMaintenanceConfig, TaskConfig}; use crate::config::{LLMProviderConfig, MemoryMaintenanceConfig, TaskConfig};
use crate::gateway::tool_registry_factory::ToolRegistryFactory; use crate::gateway::tool_registry_factory::ToolRegistryFactory;
use crate::mcp::{McpClientManager, McpConfig}; use crate::mcp::McpInitializer;
use crate::skills::SkillRuntime; use crate::skills::SkillRuntime;
use crate::storage::{ use crate::storage::{
ConversationRepository, MemoryRepository, PromptInjectionRepository, SchedulerJobRepository, ConversationRepository, MemoryRepository, PromptInjectionRepository, SchedulerJobRepository,
@ -26,6 +28,9 @@ use super::session_factory::SessionFactory;
use super::session_lifecycle::SessionLifecycleService; use super::session_lifecycle::SessionLifecycleService;
use super::session_message_service::SessionMessageService; use super::session_message_service::SessionMessageService;
/// Build SessionManager with optional MCP integration
///
/// MCP is initialized asynchronously in background, not blocking gateway startup.
pub(crate) fn build_session_manager( pub(crate) fn build_session_manager(
agent_prompt_reinject_every: u64, agent_prompt_reinject_every: u64,
show_tool_results: bool, show_tool_results: bool,
@ -37,7 +42,7 @@ pub(crate) fn build_session_manager(
task_config: TaskConfig, task_config: TaskConfig,
maintenance_config: MemoryMaintenanceConfig, maintenance_config: MemoryMaintenanceConfig,
session_ttl_hours: Option<u64>, session_ttl_hours: Option<u64>,
mcp_config: McpConfig, mcp_config: crate::mcp::McpConfig,
) -> Result<(SessionManager, Arc<dyn TaskRepository>), AgentError> { ) -> Result<(SessionManager, Arc<dyn TaskRepository>), AgentError> {
build_session_manager_with_sender( build_session_manager_with_sender(
agent_prompt_reinject_every, agent_prompt_reinject_every,
@ -55,6 +60,7 @@ pub(crate) fn build_session_manager(
) )
} }
/// Build SessionManager with custom session message sender
pub(crate) fn build_session_manager_with_sender( pub(crate) fn build_session_manager_with_sender(
agent_prompt_reinject_every: u64, agent_prompt_reinject_every: u64,
show_tool_results: bool, show_tool_results: bool,
@ -67,7 +73,7 @@ pub(crate) fn build_session_manager_with_sender(
task_config: TaskConfig, task_config: TaskConfig,
maintenance_config: MemoryMaintenanceConfig, maintenance_config: MemoryMaintenanceConfig,
session_ttl_hours: Option<u64>, session_ttl_hours: Option<u64>,
mcp_config: McpConfig, mcp_config: crate::mcp::McpConfig,
) -> Result<(SessionManager, Arc<dyn TaskRepository>), AgentError> { ) -> Result<(SessionManager, Arc<dyn TaskRepository>), AgentError> {
let store = Arc::new( let store = Arc::new(
SessionStore::new() SessionStore::new()
@ -91,7 +97,7 @@ pub(crate) fn build_session_manager_with_sender(
let skill_events: Arc<dyn SkillEventRepository> = store.clone(); let skill_events: Arc<dyn SkillEventRepository> = store.clone();
let conversations: Arc<dyn ConversationRepository> = store.clone(); let conversations: Arc<dyn ConversationRepository> = store.clone();
// 创建 ToolRegistryFactory // Create ToolRegistryFactory
let factory = ToolRegistryFactory::new( let factory = ToolRegistryFactory::new(
skills.clone(), skills.clone(),
memories, memories,
@ -104,37 +110,18 @@ pub(crate) fn build_session_manager_with_sender(
task_config.clone(), task_config.clone(),
); );
// 创建 MCP Client Manager如果启用 // Create MCP Initializer (async, non-blocking)
let mcp_manager = if mcp_config.has_enabled_servers() { // MCP servers connect in background task
let manager = Arc::new(McpClientManager::new()); let mcp_initializer = McpInitializer::with_config(mcp_config);
// 在 tokio runtime 中连接 MCP servers // Add MCP manager to factory (if enabled)
// 使用 block_in_place 允许在同步上下文中执行异步代码 let factory = if let Some(manager) = mcp_initializer.manager() {
let servers = mcp_config.enabled_servers(); factory.with_mcp_manager(manager)
let servers_clone: Vec<_> = servers.into_iter().cloned().collect();
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
tracing::info!("Connecting to MCP servers...");
if let Err(e) = manager.connect_all(&servers_clone).await {
tracing::error!(error = %e, "Failed to connect to some MCP servers");
}
})
});
Some(manager)
} else {
None
};
// 将 MCP manager 添加到 factory
let factory = if let Some(ref manager) = mcp_manager {
factory.with_mcp_manager(manager.clone())
} else { } else {
factory factory
}; };
// 创建 SubAgentRuntime如果 task 工具启用) // Create SubAgentRuntime (if task tool is enabled)
let (factory, task_repository): (_, Arc<dyn TaskRepository>) = if task_config.enabled { let (factory, task_repository): (_, Arc<dyn TaskRepository>) = if task_config.enabled {
let task_repository = Arc::new(InMemoryTaskRepository::new()); let task_repository = Arc::new(InMemoryTaskRepository::new());
let subagent_tools = Arc::new(factory.build_subagent_tools()); let subagent_tools = Arc::new(factory.build_subagent_tools());
@ -158,17 +145,18 @@ pub(crate) fn build_session_manager_with_sender(
(factory.with_subagent_runtime(subagent_runtime), task_repository) (factory.with_subagent_runtime(subagent_runtime), task_repository)
} else { } else {
// 如果 task 工具未启用,创建一个空的内存仓库
(factory, Arc::new(InMemoryTaskRepository::new())) (factory, Arc::new(InMemoryTaskRepository::new()))
}; };
// Build base tools
let mut tools = factory.build(); let mut tools = factory.build();
// 注册 MCP tools如果有 MCP manager // Register MCP tools (async)
if let Some(manager) = &mcp_manager { // This waits briefly for connections, then registers available tools
if mcp_initializer.is_enabled() {
tokio::task::block_in_place(|| { tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async { tokio::runtime::Handle::current().block_on(async {
if let Err(e) = crate::mcp::register_mcp_tools(manager.clone(), &mut tools).await { if let Err(e) = mcp_initializer.register_tools(&mut tools).await {
tracing::error!(error = %e, "Failed to register MCP tools"); tracing::error!(error = %e, "Failed to register MCP tools");
} }
}) })

View File

@ -1,4 +1,10 @@
//! MCP Client Manager - manages connections to MCP servers //! MCP Client Manager - manages connections to MCP servers
//!
//! This module provides a decoupled MCP integration that:
//! - Doesn't block gateway startup
//! - Is completely optional (disabled by default)
//! - Connects to MCP servers asynchronously
//! - Dynamically registers MCP tools via the Tool trait adapter
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
@ -31,6 +37,12 @@ pub struct McpServerInfo {
} }
/// Manager for MCP client connections /// Manager for MCP client connections
///
/// This manager handles:
/// - Connecting to MCP servers (stdio and HTTP transports)
/// - Discovering available tools
/// - Calling tools on connected servers
/// - Connection lifecycle management
pub struct McpClientManager { pub struct McpClientManager {
/// Connected clients keyed by server name /// Connected clients keyed by server name
clients: RwLock<HashMap<String, Arc<McpClient>>>, clients: RwLock<HashMap<String, Arc<McpClient>>>,
@ -39,7 +51,7 @@ pub struct McpClientManager {
} }
impl McpClientManager { impl McpClientManager {
/// Create a new manager /// Create a new manager (no connections yet)
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
clients: RwLock::new(HashMap::new()), clients: RwLock::new(HashMap::new()),
@ -47,7 +59,10 @@ impl McpClientManager {
} }
} }
/// Connect to all configured servers /// Connect to all configured servers (async, non-blocking)
///
/// This method is designed to be called asynchronously without
/// blocking the main gateway startup flow.
pub async fn connect_all(&self, servers: &[McpServerConfig]) -> anyhow::Result<()> { pub async fn connect_all(&self, servers: &[McpServerConfig]) -> anyhow::Result<()> {
for server in servers { for server in servers {
if !server.enabled { if !server.enabled {
@ -55,15 +70,17 @@ impl McpClientManager {
continue; continue;
} }
// Each server connection is independent
match self.connect_server(server).await { match self.connect_server(server).await {
Ok(info) => { Ok(info) => {
tracing::info!( tracing::info!(
name = %server.name, name = %info.name,
tools_count = info.tools.len(), tools_count = info.tools.len(),
"Connected to MCP server" "Connected to MCP server"
); );
} }
Err(e) => { Err(e) => {
// Log error but continue with other servers
tracing::error!( tracing::error!(
name = %server.name, name = %server.name,
error = %e, error = %e,
@ -77,7 +94,7 @@ impl McpClientManager {
/// Connect to a single MCP server /// Connect to a single MCP server
pub async fn connect_server(&self, config: &McpServerConfig) -> anyhow::Result<McpServerInfo> { pub async fn connect_server(&self, config: &McpServerConfig) -> anyhow::Result<McpServerInfo> {
tracing::info!(name = %config.name, "Connecting to MCP server"); tracing::info!(name = %config.name, transport = ?config.transport, "Connecting to MCP server");
let client = match &config.transport { let client = match &config.transport {
McpTransportConfig::Stdio { command, args, env } => { McpTransportConfig::Stdio { command, args, env } => {
@ -221,7 +238,7 @@ impl McpClientManager {
let arguments = if args.is_object() { let arguments = if args.is_object() {
args.as_object().unwrap().clone() args.as_object().unwrap().clone()
} else { } else {
// If not an object, wrap it or use empty object // If not an object, use empty object
serde_json::Map::new() serde_json::Map::new()
}; };
@ -253,6 +270,11 @@ impl McpClientManager {
self.server_info.write().await.clear(); self.server_info.write().await.clear();
Ok(()) Ok(())
} }
/// Check if any servers are connected
pub async fn has_connections(&self) -> bool {
!self.clients.read().await.is_empty()
}
} }
impl Default for McpClientManager { impl Default for McpClientManager {
@ -260,3 +282,88 @@ impl Default for McpClientManager {
Self::new() Self::new()
} }
} }
/// MCP Initializer - handles asynchronous MCP initialization
///
/// This struct provides a decoupled way to initialize MCP:
/// - Doesn't block gateway startup
/// - Can be initialized in a background task
/// - Tools are registered after connection is established
pub struct McpInitializer {
/// The MCP client manager (None if MCP is disabled)
manager: Option<Arc<McpClientManager>>,
/// Connection task handle (for background initialization)
connection_task: Option<tokio::task::JoinHandle<anyhow::Result<()>>>,
}
impl McpInitializer {
/// Create a disabled initializer (MCP not configured)
pub fn disabled() -> Self {
Self {
manager: None,
connection_task: None,
}
}
/// Create an initializer with MCP configuration
///
/// This spawns a background task to connect to MCP servers,
/// allowing the gateway to start immediately.
pub fn with_config(config: crate::mcp::McpConfig) -> Self {
if !config.has_enabled_servers() {
return Self::disabled();
}
let manager = Arc::new(McpClientManager::new());
let servers: Vec<_> = config.enabled_servers().into_iter().cloned().collect();
// Spawn background connection task
let manager_clone = manager.clone();
let connection_task = tokio::spawn(async move {
tracing::info!("Starting MCP connection task...");
manager_clone.connect_all(&servers).await
});
Self {
manager: Some(manager),
connection_task: Some(connection_task),
}
}
/// Get the manager (if MCP is enabled)
pub fn manager(&self) -> Option<Arc<McpClientManager>> {
self.manager.clone()
}
/// Check if MCP is enabled
pub fn is_enabled(&self) -> bool {
self.manager.is_some()
}
/// Wait for connections to complete (optional)
///
/// This can be called if you want to ensure MCP servers are connected
/// before proceeding, but it's not required.
pub async fn wait_for_connections(&mut self) -> anyhow::Result<()> {
if let Some(task) = self.connection_task.take() {
// Handle JoinError and inner Result
task.await??;
}
Ok(())
}
/// Register MCP tools to the tool registry
///
/// This should be called after the gateway is ready to accept tools.
/// The method handles the case where connections are still in progress.
pub async fn register_tools(&self, registry: &mut crate::tools::ToolRegistry) -> anyhow::Result<()> {
if let Some(manager) = &self.manager {
// Give a small grace period for connections if still in progress
// This allows tools to be registered even if connection task is running
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
crate::mcp::register_mcp_tools(manager.clone(), registry).await?;
}
Ok(())
}
}

View File

@ -2,11 +2,19 @@
//! //!
//! This module provides MCP client functionality to connect to external MCP servers //! This module provides MCP client functionality to connect to external MCP servers
//! and expose their tools through PicoBot's Tool system. //! and expose their tools through PicoBot's Tool system.
//!
//! ## Architecture (Decoupled)
//!
//! - `McpInitializer`: Handles async initialization without blocking gateway startup
//! - `McpClientManager`: Manages connections to MCP servers
//! - `McpToolWrapper`: Adapts MCP tools to PicoBot's Tool trait
//!
//! MCP is completely optional and disabled by default.
pub mod config; pub mod config;
pub mod client; pub mod client;
pub mod tool_adapter; pub mod tool_adapter;
pub use config::{McpConfig, McpServerConfig, McpTransportConfig}; pub use config::{McpConfig, McpServerConfig, McpTransportConfig};
pub use client::{McpClientManager, McpClient, McpServerInfo}; pub use client::{McpClientManager, McpClient, McpServerInfo, McpInitializer};
pub use tool_adapter::{McpToolWrapper, register_mcp_tools}; pub use tool_adapter::{McpToolWrapper, register_mcp_tools};