diff --git a/README.md b/README.md index 2fe5e6a..e699793 100644 --- a/README.md +++ b/README.md @@ -549,7 +549,8 @@ CLI 中已实现的交互命令包括: "models": { "default": { "model_id": "", - "temperature": 0.2 + "temperature": 0.2, + "context_window_tokens": 128000 } }, "agents": { diff --git a/config.json b/config.json index 580b0c9..3f9f0ed 100644 --- a/config.json +++ b/config.json @@ -10,7 +10,8 @@ "models": { "default": { "model_id": "", - "temperature": 0.2 + "temperature": 0.7, + "context_window_tokens": 128000 } }, "agents": { diff --git a/src/gateway/execution.rs b/src/gateway/execution.rs index a7c1964..4953aa8 100644 --- a/src/gateway/execution.rs +++ b/src/gateway/execution.rs @@ -1,13 +1,16 @@ use std::collections::HashMap; use std::sync::Arc; -use crate::agent::{AgentError, AgentProcessResult}; +use crate::agent::{AgentError, AgentProcessResult, EmittedMessageHandler}; use crate::bus::message::ToolMessageState; -use crate::bus::{ChatMessage, OutboundMessage}; +use crate::bus::{ChatMessage, MediaItem, OutboundMessage, SYSTEM_CONTEXT_SCHEDULED_PROMPT}; use crate::config::LLMProviderConfig; use tokio::sync::Mutex; -use super::session::{Session, schedule_background_history_compaction}; +use super::session::{ + Session, enrich_user_content_with_media_refs, handle_in_chat_command, + schedule_background_history_compaction, +}; const SCHEDULED_TASK_EXECUTION_SYSTEM_PROMPT: &str = "系统说明:当前输入来自一次已经触发的定时任务执行。你现在需要执行任务内容本身,而不是创建、修改、恢复、暂停或查询新的定时任务。除非当前任务内容明确要求管理调度器,否则不要调用任何定时任务管理工具;像“每小时”、“每天”、“cron”、“定时”等词,只应视为任务背景,不应再解释为新的建任务请求。"; @@ -56,6 +59,28 @@ pub(crate) struct FinalizedAgentResult { pub(crate) should_schedule_compaction: bool, } +pub(crate) struct MessageExecutionRequest<'a> { + pub(crate) session: Arc>, + pub(crate) channel_name: &'a str, + pub(crate) sender_id: &'a str, + pub(crate) chat_id: &'a str, + pub(crate) content: &'a str, + pub(crate) media: Vec, + pub(crate) live_emitter: Option>, +} + +pub(crate) struct ScheduledExecutionRequest<'a> { + pub(crate) session: Arc>, + pub(crate) channel_name: &'a str, + pub(crate) chat_id: &'a str, + pub(crate) prompt: &'a str, + pub(crate) sender_id: &'a str, + pub(crate) provider_config: LLMProviderConfig, + pub(crate) fresh_session: bool, + pub(crate) system_prompt: Option<&'a str>, + pub(crate) metadata: &'a HashMap, +} + impl AgentExecutionService { pub(crate) fn new(show_tool_results: bool) -> Self { Self { show_tool_results } @@ -115,6 +140,136 @@ impl AgentExecutionService { }) } + pub(crate) async fn prepare_and_execute_message( + &self, + request: MessageExecutionRequest<'_>, + ) -> Result, AgentError> { + let (history, agent, user_message) = { + let mut session_guard = request.session.lock().await; + + session_guard.ensure_persistent_session(request.chat_id)?; + session_guard.ensure_chat_loaded(request.chat_id)?; + + if let Some(command_response) = + handle_in_chat_command(&mut session_guard, request.chat_id, request.content)? + { + return Ok(vec![OutboundMessage::assistant( + request.channel_name.to_string(), + request.chat_id.to_string(), + command_response, + None, + HashMap::new(), + )]); + } + + session_guard.ensure_agent_prompt_before_user_message(request.chat_id)?; + + let media_refs: Vec = request + .media + .iter() + .map(|media| media.path.clone()) + .collect(); + #[cfg(debug_assertions)] + if !media_refs.is_empty() { + tracing::debug!(media_count = %request.media.len(), media_refs = ?media_refs, "Adding user message with media"); + } + let enriched_content = + enrich_user_content_with_media_refs(request.content, &media_refs)?; + let user_message = session_guard.create_user_message(&enriched_content, media_refs); + session_guard.append_persisted_message(request.chat_id, user_message.clone())?; + + let history = session_guard.get_or_create_history(request.chat_id).clone(); + session_guard.record_skill_offer(request.chat_id)?; + + let mut agent = session_guard.create_agent( + request.chat_id, + Some(request.sender_id), + Some(&user_message.id), + )?; + if let Some(handler) = request.live_emitter.clone() { + agent = agent.with_emitted_message_handler(handler); + } + + (history, agent, user_message) + }; + + let result = agent.process(history).await?; + let metadata = HashMap::new(); + + self.finalize_result_and_schedule_compaction( + request.session.clone(), + FinalizeAgentResultRequest { + channel_name: request.channel_name, + chat_id: request.chat_id, + user_message: &user_message, + result, + metadata: &metadata, + suppress_live_tool_calls: request.live_emitter.is_some(), + execution_kind: "message", + }, + ) + .await + } + + pub(crate) async fn prepare_and_execute_scheduled_task( + &self, + request: ScheduledExecutionRequest<'_>, + ) -> Result, AgentError> { + let (history, agent, user_message) = { + let mut session_guard = request.session.lock().await; + + session_guard.ensure_persistent_session(request.chat_id)?; + + if request.fresh_session { + session_guard.reset_chat_context(request.chat_id)?; + } + + session_guard.ensure_chat_loaded(request.chat_id)?; + session_guard.ensure_agent_prompt_before_user_message(request.chat_id)?; + + let scheduled_system_prompt = + compose_scheduled_task_system_prompt(request.system_prompt); + session_guard.append_persisted_message( + request.chat_id, + ChatMessage::system_with_context( + &scheduled_system_prompt, + Some(SYSTEM_CONTEXT_SCHEDULED_PROMPT.to_string()), + ), + )?; + + let user_message = session_guard.create_user_message(request.prompt, Vec::new()); + session_guard.append_persisted_message(request.chat_id, user_message.clone())?; + + let history = session_guard.get_or_create_history(request.chat_id).clone(); + session_guard.record_skill_offer(request.chat_id)?; + + let agent = session_guard.create_agent_with_provider_config( + request.chat_id, + Some(request.sender_id), + Some(&user_message.id), + request.provider_config.clone(), + )?; + + (history, agent, user_message) + }; + + let result = agent.process(history).await?; + + self.finalize_result_and_schedule_compaction( + request.session.clone(), + FinalizeAgentResultRequest { + channel_name: request.channel_name, + chat_id: request.chat_id, + user_message: &user_message, + result, + metadata: request.metadata, + suppress_live_tool_calls: false, + execution_kind: "scheduled_task", + }, + ) + .await + } + pub(crate) async fn finalize_result_and_schedule_compaction( &self, session: Arc>, diff --git a/src/gateway/session.rs b/src/gateway/session.rs index 3f8828a..dd2a8c4 100644 --- a/src/gateway/session.rs +++ b/src/gateway/session.rs @@ -1,8 +1,7 @@ use crate::agent::{AgentError, AgentLoop, ContextCompressor, EmittedMessageHandler}; -use crate::bus::{ - ChatMessage, MessageBus, OutboundMessage, SYSTEM_CONTEXT_AGENT_PROMPT, - SYSTEM_CONTEXT_SCHEDULED_PROMPT, -}; +#[cfg(test)] +use crate::bus::SYSTEM_CONTEXT_SCHEDULED_PROMPT; +use crate::bus::{ChatMessage, MessageBus, OutboundMessage, SYSTEM_CONTEXT_AGENT_PROMPT}; use crate::config::LLMProviderConfig; use crate::protocol::WsOutbound; use crate::skills::SkillRuntime; @@ -20,7 +19,7 @@ use tokio::sync::{Mutex, mpsc}; use uuid::Uuid; use super::execution::{ - AgentExecutionService, FinalizeAgentResultRequest, compose_scheduled_task_system_prompt, + AgentExecutionService, MessageExecutionRequest, ScheduledExecutionRequest, select_provider_config, should_display_message_to_user, }; #[cfg(test)] @@ -42,7 +41,7 @@ fn preview_text(content: &str, max_chars: usize) -> String { preview.replace('\n', "\\n") } -fn enrich_user_content_with_media_refs( +pub(crate) fn enrich_user_content_with_media_refs( content: &str, media_refs: &[String], ) -> Result { @@ -921,66 +920,16 @@ impl SessionManager { .await .ok_or_else(|| AgentError::Other("Session not found".to_string()))?; - // 处理消息 - let (history, agent, user_message) = { - let mut session_guard = session.lock().await; - - session_guard.ensure_persistent_session(chat_id)?; - session_guard.ensure_chat_loaded(chat_id)?; - - if let Some(command_response) = - handle_in_chat_command(&mut session_guard, chat_id, content)? - { - return Ok(vec![OutboundMessage::assistant( - channel_name.to_string(), - chat_id.to_string(), - command_response, - None, - HashMap::new(), - )]); - } - - session_guard.ensure_agent_prompt_before_user_message(chat_id)?; - - // 添加用户消息到历史 - let media_refs: Vec = media.iter().map(|m| m.path.clone()).collect(); - #[cfg(debug_assertions)] - if !media_refs.is_empty() { - tracing::debug!(media_count = %media.len(), media_refs = ?media_refs, "Adding user message with media"); - } - let enriched_content = enrich_user_content_with_media_refs(content, &media_refs)?; - let user_message = session_guard.create_user_message(&enriched_content, media_refs); - session_guard.append_persisted_message(chat_id, user_message.clone())?; - - let history = session_guard.get_or_create_history(chat_id).clone(); - session_guard.record_skill_offer(chat_id)?; - - // 创建 agent 并处理 - let mut agent = - session_guard.create_agent(chat_id, Some(sender_id), Some(&user_message.id))?; - if let Some(handler) = live_emitter.clone() { - agent = agent.with_emitted_message_handler(handler); - } - - (history, agent, user_message) - }; - - let result = agent.process(history).await?; - - let metadata = HashMap::new(); let outbound_messages = AgentExecutionService::new(self.show_tool_results) - .finalize_result_and_schedule_compaction( - session.clone(), - FinalizeAgentResultRequest { - channel_name, - chat_id, - user_message: &user_message, - result, - metadata: &metadata, - suppress_live_tool_calls: live_emitter.is_some(), - execution_kind: "message", - }, - ) + .prepare_and_execute_message(MessageExecutionRequest { + session: session.clone(), + channel_name, + sender_id, + chat_id, + content, + media, + live_emitter, + }) .await?; #[cfg(debug_assertions)] @@ -1015,60 +964,18 @@ impl SessionManager { .unwrap_or_else(|| "scheduler".to_string()); let provider_config = self.provider_config_for_agent(options.agent.as_deref())?; - let (history, agent, user_message) = { - let mut session_guard = session.lock().await; - - session_guard.ensure_persistent_session(chat_id)?; - - if options.fresh_session { - session_guard.reset_chat_context(chat_id)?; - } - - session_guard.ensure_chat_loaded(chat_id)?; - session_guard.ensure_agent_prompt_before_user_message(chat_id)?; - - let scheduled_system_prompt = - compose_scheduled_task_system_prompt(options.system_prompt.as_deref()); - session_guard.append_persisted_message( - chat_id, - ChatMessage::system_with_context( - &scheduled_system_prompt, - Some(SYSTEM_CONTEXT_SCHEDULED_PROMPT.to_string()), - ), - )?; - - let user_message = session_guard.create_user_message(prompt, Vec::new()); - session_guard.append_persisted_message(chat_id, user_message.clone())?; - - let history = session_guard.get_or_create_history(chat_id).clone(); - - session_guard.record_skill_offer(chat_id)?; - - let agent = session_guard.create_agent_with_provider_config( - chat_id, - Some(&sender_id), - Some(&user_message.id), - provider_config.clone(), - )?; - - (history, agent, user_message) - }; - - let result = agent.process(history).await?; - AgentExecutionService::new(self.show_tool_results) - .finalize_result_and_schedule_compaction( - session.clone(), - FinalizeAgentResultRequest { - channel_name, - chat_id, - user_message: &user_message, - result, - metadata: &options.metadata, - suppress_live_tool_calls: false, - execution_kind: "scheduled_task", - }, - ) + .prepare_and_execute_scheduled_task(ScheduledExecutionRequest { + session: session.clone(), + channel_name, + chat_id, + prompt, + sender_id: &sender_id, + provider_config, + fresh_session: options.fresh_session, + system_prompt: options.system_prompt.as_deref(), + metadata: &options.metadata, + }) .await } diff --git a/src/gateway/ws.rs b/src/gateway/ws.rs index 7de9b50..816e8f3 100644 --- a/src/gateway/ws.rs +++ b/src/gateway/ws.rs @@ -1,10 +1,11 @@ use super::{ GatewayState, - session::{Session, handle_in_chat_command, schedule_background_history_compaction}, + execution::{AgentExecutionService, MessageExecutionRequest, should_display_message_to_user}, + session::Session, }; -use crate::agent::EmittedMessageHandler; -use crate::bus::ChatMessage; -use crate::bus::message::{ToolMessageState, format_tool_call_content}; +use crate::agent::{AgentError, EmittedMessageHandler}; +use crate::bus::message::{OutboundEventKind, ToolMessageState, format_tool_call_content}; +use crate::bus::{ChatMessage, OutboundMessage}; use crate::protocol::{SessionSummary, WsInbound, WsOutbound, parse_inbound, serialize_outbound}; use async_trait::async_trait; use axum::extract::State; @@ -14,6 +15,8 @@ use futures_util::{SinkExt, StreamExt}; use std::sync::Arc; use tokio::sync::{Mutex, mpsc}; +const CLI_CHANNEL_NAME: &str = "cli"; + struct WsToolCallEmitter { sender: mpsc::Sender, show_tool_results: bool, @@ -57,7 +60,7 @@ async fn handle_socket(ws: WebSocket, state: Arc) { } }; - let channel_name = "cli".to_string(); + let channel_name = CLI_CHANNEL_NAME.to_string(); // 创建 CLI session let session = match Session::new( @@ -229,19 +232,55 @@ fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec { } } -fn should_display_message_to_user(show_tool_results: bool, message: &ChatMessage) -> bool { - if message.role != "tool" { - return true; +fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Vec { + match message.event_kind { + OutboundEventKind::AssistantResponse | OutboundEventKind::SchedulerNotification => { + vec![WsOutbound::AssistantResponse { + id: uuid::Uuid::new_v4().to_string(), + content: message.content.clone(), + role: message.role.clone(), + }] + } + OutboundEventKind::ToolCall => vec![WsOutbound::ToolCall { + id: message + .tool_call_id + .clone() + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()), + tool_call_id: message.tool_call_id.clone().unwrap_or_default(), + tool_name: message.tool_name.clone().unwrap_or_default(), + arguments: message + .tool_arguments + .clone() + .unwrap_or(serde_json::Value::Null), + content: message.content.clone(), + role: message.role.clone(), + }], + OutboundEventKind::ToolResult => vec![WsOutbound::ToolResult { + id: message + .tool_call_id + .clone() + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()), + tool_call_id: message.tool_call_id.clone().unwrap_or_default(), + tool_name: message.tool_name.clone().unwrap_or_default(), + content: message.content.clone(), + role: message.role.clone(), + }], + OutboundEventKind::ToolPending => vec![WsOutbound::ToolPending { + id: message + .tool_call_id + .clone() + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()), + tool_call_id: message.tool_call_id.clone().unwrap_or_default(), + tool_name: message.tool_name.clone().unwrap_or_default(), + content: message.content.clone(), + role: message.role.clone(), + resume_hint: "完成外部操作后,直接发一条继续消息即可。".to_string(), + }], + OutboundEventKind::ErrorNotification => vec![WsOutbound::Error { + code: "AGENT_ERROR".to_string(), + message: message.content.clone(), + }], } - - show_tool_results - || matches!( - message - .tool_state - .as_ref() - .unwrap_or(&ToolMessageState::Completed), - ToolMessageState::PendingUserAction - ) } async fn handle_inbound( @@ -260,83 +299,42 @@ async fn handle_inbound( } => { let chat_id = chat_id.unwrap_or_else(|| current_session_id.clone()); let sender_id = resolve_ws_sender_id(sender_id.as_deref(), runtime_session_id); - let (history, agent, user_tx) = { - let mut session_guard = session.lock().await; + let user_tx = session.lock().await.user_tx.clone(); + let live_emitter = Arc::new(WsToolCallEmitter { + sender: user_tx.clone(), + show_tool_results: state.config.gateway.show_tool_results, + }); - session_guard.ensure_persistent_session(&chat_id)?; - session_guard.ensure_chat_loaded(&chat_id)?; - - if let Some(command_response) = - handle_in_chat_command(&mut session_guard, &chat_id, &content)? - { - let _ = session_guard - .send(WsOutbound::AssistantResponse { - id: uuid::Uuid::new_v4().to_string(), - content: command_response, - role: "assistant".to_string(), - }) - .await; - return Ok(()); - } - - session_guard.ensure_agent_prompt_before_user_message(&chat_id)?; - - let user_message = session_guard.create_user_message(&content, Vec::new()); - let user_message_id = user_message.id.clone(); - session_guard.append_persisted_message(&chat_id, user_message)?; - - let history = session_guard.get_or_create_history(&chat_id).clone(); - session_guard.record_skill_offer(&chat_id)?; - - let live_emitter = Arc::new(WsToolCallEmitter { - sender: session_guard.user_tx.clone(), - show_tool_results: state.config.gateway.show_tool_results, - }); - let agent = session_guard - .create_agent(&chat_id, Some(&sender_id), Some(&user_message_id))? - .with_emitted_message_handler(live_emitter); - - (history, agent, session_guard.user_tx.clone()) - }; - - match agent.process(history).await { - Ok(result) => { - let mut session_guard = session.lock().await; - session_guard - .append_persisted_messages(&chat_id, result.emitted_messages.clone())?; - for outbound in result - .emitted_messages + match AgentExecutionService::new(state.config.gateway.show_tool_results) + .prepare_and_execute_message(MessageExecutionRequest { + session: session.clone(), + channel_name: CLI_CHANNEL_NAME, + sender_id: &sender_id, + chat_id: &chat_id, + content: &content, + media: Vec::new(), + live_emitter: Some(live_emitter), + }) + .await + { + Ok(outbound_messages) => { + for outbound in outbound_messages .iter() - .filter(|message| { - !message.is_assistant_tool_call_message() - && should_display_message_to_user( - state.config.gateway.show_tool_results, - message, - ) - }) - .flat_map(ws_outbound_from_chat_message) + .flat_map(ws_outbound_from_outbound_message) { - let _ = session_guard.send(outbound).await; - } - - drop(session_guard); - - if let Err(error) = - schedule_background_history_compaction(session.clone(), chat_id.clone()) - .await - { - tracing::warn!(chat_id = %chat_id, error = %error, "Failed to schedule background history compaction for CLI session"); + let _ = user_tx.send(outbound).await; } } - Err(error) => { + Err(AgentError::LlmError(error)) => { tracing::error!(chat_id = %chat_id, error = %error, "Agent process error"); let _ = user_tx .send(WsOutbound::Error { code: "LLM_ERROR".to_string(), - message: error.to_string(), + message: error, }) .await; } + Err(error) => return Err(error), } Ok(()) @@ -483,8 +481,8 @@ mod tests { ws_outbound_from_chat_message, }; use crate::agent::EmittedMessageHandler; - use crate::bus::ChatMessage; use crate::bus::message::ToolMessageState; + use crate::bus::{ChatMessage, OutboundMessage}; use crate::protocol::WsOutbound; use crate::providers::ToolCall; use serde_json::json; @@ -579,6 +577,38 @@ mod tests { assert!(should_display_message_to_user(true, &completed)); } + #[test] + fn test_ws_outbound_from_outbound_message_maps_tool_call() { + let message = OutboundMessage::tool_call( + "cli", + "session-1", + "call-1", + "calculator", + json!({"expression": "1 + 1"}), + None, + Default::default(), + ); + + let outbound = super::ws_outbound_from_outbound_message(&message); + + assert_eq!(outbound.len(), 1); + match &outbound[0] { + WsOutbound::ToolCall { + tool_call_id, + tool_name, + arguments, + content, + .. + } => { + assert_eq!(tool_call_id, "call-1"); + assert_eq!(tool_name, "calculator"); + assert_eq!(arguments["expression"], "1 + 1"); + assert_eq!(content, "calculator\nargs: {\"expression\":\"1 + 1\"}"); + } + other => panic!("unexpected outbound variant: {:?}", other), + } + } + #[test] fn test_resolve_ws_sender_id_prefers_inbound_sender() { assert_eq!(