PicoBot/src/gateway/runtime.rs
oudecheng 7626ba2d2f feat(gateway): 添加待办事项读取功能
- 引入 TodoReadTool 工具支持读取当前对话的待办事项列表
- 实现从内存或SQLite数据库读取待办事项的功能
- 添加内存回填机制确保数据一致性
- 在ToolRegistryFactory中注册新的待办事项读取工具
- 更新会话初始化逻辑以传递待办事项存储依赖
- 添加完整的单元测试验证各种读取场景
2026-06-15 15:33:43 +08:00

272 lines
9.8 KiB
Rust

//! Gateway Runtime - builds SessionManager with decoupled MCP integration
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::RwLock;
use crate::agent::AgentError;
use crate::bus::MessageBus;
use crate::config::{LLMProviderConfig, MemoryMaintenanceConfig, SubagentsConfig, TaskConfig};
use crate::gateway::tool_registry_factory::ToolRegistryFactory;
use crate::mcp::McpInitializer;
use crate::skills::SkillRuntime;
use crate::storage::{
ConversationRepository, MemoryRepository, PromptInjectionRepository, SchedulerJobRepository,
SessionStore, SkillEventRepository, TodoRepository,
};
use crate::tools::{
DefaultSubAgentRuntime, InMemoryTaskRepository, NoopSessionMessageSender,
SessionMessageSender, SubAgentRuntimeConfig, SubagentCatalog, ToolRegistry,
};
use crate::tools::task::repository::TaskRepository;
use crate::tools::todo_write::TodoItem;
use super::agent_factory::AgentFactory;
use super::cli_session::CliSessionService;
use super::memory_maintenance_coordinator::MemoryMaintenanceCoordinator;
use super::provider_config_service::ProviderConfigService;
use super::scheduled_agent_task_service::ScheduledAgentTaskService;
use super::session::{SessionManager, SessionManagerServices};
use super::session_factory::SessionFactory;
use super::session_lifecycle::SessionLifecycleService;
use super::session_message_service::SessionMessageService;
/// Build SessionManager with optional MCP integration
///
/// MCP is initialized asynchronously in background, not blocking gateway startup.
pub(crate) fn build_session_manager(
agent_prompt_reinject_every: u64,
show_tool_results: bool,
default_timezone: String,
provider_config: LLMProviderConfig,
provider_configs: HashMap<String, LLMProviderConfig>,
skills: Arc<SkillRuntime>,
disabled_tools: HashSet<String>,
task_config: TaskConfig,
subagents_config: SubagentsConfig,
maintenance_config: MemoryMaintenanceConfig,
session_ttl_hours: Option<u64>,
mcp_config: crate::mcp::McpConfig,
bus: Option<Arc<MessageBus>>,
) -> Result<(SessionManager, Arc<dyn TaskRepository>), AgentError> {
build_session_manager_with_sender(
agent_prompt_reinject_every,
show_tool_results,
default_timezone,
provider_config,
provider_configs,
skills,
Arc::new(NoopSessionMessageSender),
disabled_tools,
task_config,
subagents_config,
maintenance_config,
session_ttl_hours,
mcp_config,
bus,
)
}
/// Build SessionManager with custom session message sender
pub(crate) fn build_session_manager_with_sender(
agent_prompt_reinject_every: u64,
show_tool_results: bool,
default_timezone: String,
provider_config: LLMProviderConfig,
provider_configs: HashMap<String, LLMProviderConfig>,
skills: Arc<SkillRuntime>,
session_message_sender: Arc<dyn SessionMessageSender>,
disabled_tools: HashSet<String>,
task_config: TaskConfig,
subagents_config: SubagentsConfig,
maintenance_config: MemoryMaintenanceConfig,
session_ttl_hours: Option<u64>,
mcp_config: crate::mcp::McpConfig,
bus: Option<Arc<MessageBus>>,
) -> Result<(SessionManager, Arc<dyn TaskRepository>), AgentError> {
let store = Arc::new(
SessionStore::new()
.map_err(|err| AgentError::Other(format!("session store init error: {}", err)))?,
);
let known_agents = provider_configs.keys().cloned().collect::<HashSet<_>>();
let provider_configs = ProviderConfigService::new(
provider_config.clone(),
provider_configs,
maintenance_config,
);
if let Err(err) =
store.append_skill_event(None, "discovered", None, &skills.discovery_event_payload())
{
tracing::warn!(error = %err, "Failed to record skill discovery event");
}
let memories: Arc<dyn MemoryRepository> = store.clone();
let scheduler_jobs: Arc<dyn SchedulerJobRepository> = store.clone();
let skill_events: Arc<dyn SkillEventRepository> = store.clone();
let conversations: Arc<dyn ConversationRepository> = store.clone();
let todo_repository: Arc<dyn TodoRepository> = store.clone();
// Create ToolRegistryFactory
let factory = ToolRegistryFactory::new(
skills.clone(),
memories,
scheduler_jobs,
skill_events.clone(),
todo_repository,
session_message_sender.clone(),
known_agents,
default_timezone,
disabled_tools,
task_config.clone(),
);
// Create shared todo state for TodoWriteTool
let todo_state: Arc<RwLock<HashMap<String, Vec<TodoItem>>>> =
Arc::new(RwLock::new(HashMap::new()));
let factory = factory.with_todo_state(todo_state);
// Create MCP Initializer (async, non-blocking)
// MCP servers connect in background task
let mut mcp_initializer = McpInitializer::with_config(mcp_config);
// Add MCP manager to factory (if enabled)
let factory = if let Some(manager) = mcp_initializer.manager() {
factory.with_mcp_manager(manager)
} else {
factory
};
// Wait for MCP connections and collect MCP tools for subagents
// This needs to happen before building subagent tools
let mut mcp_tools_for_subagents: Vec<crate::mcp::tool_adapter::McpToolWrapper> = Vec::new();
if mcp_initializer.is_enabled() {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
// Wait for connections to complete
if let Err(e) = mcp_initializer.wait_for_connections().await {
tracing::error!(error = %e, "Failed to wait for MCP connections");
return;
}
// Collect MCP tools for subagents
if let Some(manager) = mcp_initializer.manager() {
let all_tools = manager.all_tools().await;
for (server_key, tool_info) in all_tools {
let wrapper = crate::mcp::tool_adapter::McpToolWrapper::new(
manager.clone(),
server_key.clone(),
tool_info,
);
mcp_tools_for_subagents.push(wrapper);
}
tracing::info!(
tool_count = mcp_tools_for_subagents.len(),
"Collected MCP tools for subagents"
);
}
})
});
}
// Create SubAgentRuntime (if task tool is enabled)
let (factory, task_repository): (_, Arc<dyn TaskRepository>) = if task_config.enabled {
let task_repository = Arc::new(InMemoryTaskRepository::new());
// Build subagent tools with MCP tools
let subagent_tools = Arc::new(
factory.build_subagent_tools(
if mcp_tools_for_subagents.is_empty() {
None
} else {
Some(mcp_tools_for_subagents.clone())
}
)
);
// Create subagent catalog with discovery
let catalog = Arc::new(SubagentCatalog::discover(&subagents_config));
let runtime_config = SubAgentRuntimeConfig {
default_allowed_tools: task_config.allowed_tools.iter().cloned().collect(),
default_max_execution_secs: task_config.max_execution_secs,
explore_max_execution_secs: task_config.explore_max_execution_secs,
ttl_hours: task_config.ttl_hours,
skills_index: skills.system_index_prompt(),
};
let subagent_runtime = Arc::new(DefaultSubAgentRuntime::new(
runtime_config,
task_repository.clone(),
conversations.clone(),
subagent_tools,
provider_config.clone(),
catalog,
bus.clone(),
store.clone(),
));
(factory.with_subagent_runtime(subagent_runtime), task_repository)
} else {
(factory, Arc::new(InMemoryTaskRepository::new()))
};
// Build base tools
let mut tools = factory.build();
// Register MCP tools to main agent (async)
// Note: MCP tools for subagents are already collected above
if mcp_initializer.is_enabled() {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
// Register pre-collected MCP tools
for tool in mcp_tools_for_subagents {
tools.register(tool);
}
tracing::info!("Registered MCP tools to main agent");
})
});
}
let tools = Arc::new(tools);
let prompt_repository: Arc<dyn PromptInjectionRepository> = store.clone();
let agent_factory = AgentFactory::new(
tools.clone(),
skills.clone(),
agent_prompt_reinject_every as usize,
prompt_repository.clone(),
);
let session_factory = SessionFactory::new(
provider_config.clone(),
skills.clone(),
agent_factory,
conversations,
skill_events,
store.clone(),
);
let lifecycle = SessionLifecycleService::new(session_factory, session_ttl_hours);
let cli_sessions = CliSessionService::new(store.clone());
let messages = SessionMessageService::new(lifecycle.clone(), show_tool_results);
let scheduled_tasks = ScheduledAgentTaskService::new(
lifecycle.clone(),
provider_configs.clone(),
show_tool_results,
);
let memory_maintenance =
MemoryMaintenanceCoordinator::new(store.clone(), provider_configs.clone());
Ok((SessionManager::from_services(SessionManagerServices {
tools: tools as Arc<ToolRegistry>,
skills,
store,
show_tool_results,
lifecycle,
cli_sessions,
messages,
scheduled_tasks,
memory_maintenance,
task_repository: task_repository.clone(),
}), task_repository))
}