diff --git a/README.md b/README.md index c1696a5..5dad979 100644 --- a/README.md +++ b/README.md @@ -543,7 +543,7 @@ PicoBot 支持通过 MCP (Model Context Protocol) 扩展工具能力,可以连 | **Stdio** | `stdio` | 启动子进程,通过 stdin/stdout 通信 | 本地 MCP servers(如 npm 包) | | **HTTP** | `streamableHttp` 或 `http` | 通过 HTTP/SSE 连接远程服务器 | 远程 MCP servers、云服务 | -**配置示例(Claude Desktop 兼容格式):** +**配置示例:** ```json { @@ -562,12 +562,6 @@ PicoBot 支持通过 MCP (Model Context Protocol) 扩展工具能力,可以连 }, "isActive": true, "name": "AliyunBailianMCP_WebSearch" - }, - "disabled-server": { - "type": "stdio", - "command": "npx", - "args": ["-y", "some-server"], - "isActive": false } } } @@ -606,7 +600,6 @@ MCP 工具会自动注册到 ToolRegistry,命名格式为 `mcp_{server_key}_{t - `mcp_filesystem_read_file` - server key 为 "filesystem" - `mcp_filesystem_write_file` -- `mcp_filesystem_list_directory` - `mcp_WebSearch_search` - server key 为 "WebSearch" **架构特点:** diff --git a/src/cli/init.rs b/src/cli/init.rs index d3d42ad..d3b190d 100644 --- a/src/cli/init.rs +++ b/src/cli/init.rs @@ -76,7 +76,7 @@ impl InitWizard { skills: crate::config::SkillsConfig::default(), tools: crate::config::ToolsConfig::default(), memory_maintenance: crate::config::MemoryMaintenanceConfig::default(), - mcp: crate::mcp::McpConfig::default(), + mcp_servers: HashMap::new(), } } @@ -827,7 +827,7 @@ impl InitWizard { skills: existing.skills.clone(), tools: existing.tools.clone(), memory_maintenance: existing.memory_maintenance.clone(), - mcp: existing.mcp.clone(), + mcp_servers: existing.mcp_servers.clone(), } } diff --git a/src/config/mod.rs b/src/config/mod.rs index b2f7b6c..77e49b1 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -29,8 +29,8 @@ pub struct Config { pub tools: ToolsConfig, #[serde(default)] pub memory_maintenance: MemoryMaintenanceConfig, - #[serde(default)] - pub mcp: crate::mcp::McpConfig, + #[serde(default, rename = "mcpServers")] + pub mcp_servers: HashMap, } #[derive(Debug, Clone, Deserialize, Serialize)] @@ -813,6 +813,15 @@ impl Config { let content = resolve_env_placeholders(&content); let config: Config = serde_json::from_str(&content)?; config.time.parse_timezone()?; + + // Log MCP servers count if any + if !config.mcp_servers.is_empty() { + tracing::info!( + mcp_servers = config.mcp_servers.len(), + "MCP servers loaded from config" + ); + } + Ok(config) } @@ -2014,4 +2023,98 @@ mod tests { unsafe { env::remove_var("BRACE_VAR") }; unsafe { env::remove_var("ANGLE_VAR") }; } + + #[test] + fn test_root_level_mcp_servers_merging() { + // Test that mcpServers at root level is loaded correctly + let file = tempfile::NamedTempFile::new().unwrap(); + std::fs::write( + file.path(), + r#"{ + "providers": { + "aliyun": { + "type": "openai", + "base_url": "https://example.invalid/v1", + "api_key": "test-key", + "extra_headers": {} + } + }, + "models": { + "qwen-plus": { + "model_id": "qwen-plus" + } + }, + "agents": { + "default": { + "provider": "aliyun", + "model": "qwen-plus" + } + }, + "mcpServers": { + "WebSearch": { + "type": "streamableHttp", + "baseUrl": "https://api.example.com/mcp", + "isActive": true + }, + "filesystem": { + "type": "stdio", + "command": "npx", + "isActive": true + } + } +}"#, + ) + .unwrap(); + + let config = Config::load(file.path().to_str().unwrap()).unwrap(); + + // Should have 2 servers + assert_eq!(config.mcp_servers.len(), 2); + assert!(config.mcp_servers.contains_key("WebSearch")); + assert!(config.mcp_servers.contains_key("filesystem")); + } + + #[test] + fn test_root_level_mcp_servers_only() { + // Test that mcpServers at root level works + let file = tempfile::NamedTempFile::new().unwrap(); + std::fs::write( + file.path(), + r#"{ + "providers": { + "aliyun": { + "type": "openai", + "base_url": "https://example.invalid/v1", + "api_key": "test-key", + "extra_headers": {} + } + }, + "models": { + "qwen-plus": { + "model_id": "qwen-plus" + } + }, + "agents": { + "default": { + "provider": "aliyun", + "model": "qwen-plus" + } + }, + "mcpServers": { + "WebSearch": { + "type": "streamableHttp", + "baseUrl": "https://api.example.com/mcp", + "isActive": true + } + } +}"#, + ) + .unwrap(); + + let config = Config::load(file.path().to_str().unwrap()).unwrap(); + + // Should have 1 server from root level + assert_eq!(config.mcp_servers.len(), 1); + assert!(config.mcp_servers.contains_key("WebSearch")); + } } diff --git a/src/gateway/mod.rs b/src/gateway/mod.rs index 3d3228a..cbadafd 100644 --- a/src/gateway/mod.rs +++ b/src/gateway/mod.rs @@ -72,6 +72,10 @@ impl GatewayState { let channel_manager = ChannelManager::new(); let bus = channel_manager.bus(); + let mcp_config = crate::mcp::McpConfig { + mcp_servers: config.mcp_servers.clone(), + }; + let (session_manager, task_repository) = build_session_manager_with_sender( agent_prompt_reinject_every, show_tool_results, @@ -84,7 +88,7 @@ impl GatewayState { config.tools.task.clone(), config.memory_maintenance.clone(), session_ttl_hours, - config.mcp.clone(), + mcp_config, )?; Ok(Self { diff --git a/src/gateway/runtime.rs b/src/gateway/runtime.rs index 800997c..f279f77 100644 --- a/src/gateway/runtime.rs +++ b/src/gateway/runtime.rs @@ -112,7 +112,7 @@ pub(crate) fn build_session_manager_with_sender( // Create MCP Initializer (async, non-blocking) // MCP servers connect in background task - let mcp_initializer = McpInitializer::with_config(mcp_config); + let mut mcp_initializer = McpInitializer::with_config(mcp_config); // Add MCP manager to factory (if enabled) let factory = if let Some(manager) = mcp_initializer.manager() { diff --git a/src/mcp/client.rs b/src/mcp/client.rs index 760fb4c..9cb4aeb 100644 --- a/src/mcp/client.rs +++ b/src/mcp/client.rs @@ -21,6 +21,17 @@ use http::{HeaderName, HeaderValue}; use tokio::process::Command; use crate::mcp::config::{McpServerConfig, McpTransportConfig}; +use std::env; + +/// Resolve ${ENV_VAR} placeholders in a value string +fn resolve_env_placeholders_in_value(value: &str) -> String { + let re = regex::Regex::new(r"\$\{([A-Z_][A-Z0-9_]*)\}").expect("invalid regex"); + re.replace_all(value, |caps: ®ex::Captures| { + let var_name = &caps[1]; + env::var(var_name).unwrap_or_else(|_| caps[0].to_string()) + }) + .to_string() +} /// Type alias for the MCP client service pub type McpClient = RunningService; @@ -166,8 +177,22 @@ impl McpClientManager { url: &str, headers: &HashMap, ) -> anyhow::Result { + // Resolve env placeholders in headers + let resolved_headers: HashMap = headers + .iter() + .map(|(key, value)| { + // Resolve ${ENV_VAR} placeholders + let resolved = if value.contains("${") { + resolve_env_placeholders_in_value(value) + } else { + value.clone() + }; + (key.clone(), resolved) + }) + .collect(); + // Build custom headers - let custom_headers: HashMap = headers + let custom_headers: HashMap = resolved_headers .iter() .filter_map(|(key, value)| { // Try to parse header name and value @@ -363,14 +388,14 @@ impl McpInitializer { /// Register MCP tools to the tool registry /// /// This should be called after the gateway is ready to accept tools. - /// The method handles the case where connections are still in progress. - pub async fn register_tools(&self, registry: &mut crate::tools::ToolRegistry) -> anyhow::Result<()> { - if let Some(manager) = &self.manager { - // Give a small grace period for connections if still in progress - // This allows tools to be registered even if connection task is running - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + /// Waits for connections to complete before registering tools. + pub async fn register_tools(&mut self, registry: &mut crate::tools::ToolRegistry) -> anyhow::Result<()> { + if let Some(manager) = self.manager.clone() { + // Wait for connections to complete first + self.wait_for_connections().await?; - crate::mcp::register_mcp_tools(manager.clone(), registry).await?; + tracing::info!("Registering MCP tools after connections completed"); + crate::mcp::register_mcp_tools(manager, registry).await?; } Ok(()) }