feat: 添加 context_window_tokens 配置,调整模型温度并重构消息执行逻辑

This commit is contained in:
ooodc 2026-04-28 11:45:36 +08:00
parent fa3354db9c
commit 14476bb101
5 changed files with 300 additions and 206 deletions

View File

@ -549,7 +549,8 @@ CLI 中已实现的交互命令包括:
"models": { "models": {
"default": { "default": {
"model_id": "<OPENAI_MODEL_NAME>", "model_id": "<OPENAI_MODEL_NAME>",
"temperature": 0.2 "temperature": 0.2,
"context_window_tokens": 128000
} }
}, },
"agents": { "agents": {

View File

@ -10,7 +10,8 @@
"models": { "models": {
"default": { "default": {
"model_id": "<OPENAI_MODEL_NAME>", "model_id": "<OPENAI_MODEL_NAME>",
"temperature": 0.2 "temperature": 0.7,
"context_window_tokens": 128000
} }
}, },
"agents": { "agents": {

View File

@ -1,13 +1,16 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use crate::agent::{AgentError, AgentProcessResult}; use crate::agent::{AgentError, AgentProcessResult, EmittedMessageHandler};
use crate::bus::message::ToolMessageState; use crate::bus::message::ToolMessageState;
use crate::bus::{ChatMessage, OutboundMessage}; use crate::bus::{ChatMessage, MediaItem, OutboundMessage, SYSTEM_CONTEXT_SCHEDULED_PROMPT};
use crate::config::LLMProviderConfig; use crate::config::LLMProviderConfig;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use super::session::{Session, schedule_background_history_compaction}; use super::session::{
Session, enrich_user_content_with_media_refs, handle_in_chat_command,
schedule_background_history_compaction,
};
const SCHEDULED_TASK_EXECUTION_SYSTEM_PROMPT: &str = "系统说明当前输入来自一次已经触发的定时任务执行。你现在需要执行任务内容本身而不是创建、修改、恢复、暂停或查询新的定时任务。除非当前任务内容明确要求管理调度器否则不要调用任何定时任务管理工具像“每小时”、“每天”、“cron”、“定时”等词只应视为任务背景不应再解释为新的建任务请求。"; const SCHEDULED_TASK_EXECUTION_SYSTEM_PROMPT: &str = "系统说明当前输入来自一次已经触发的定时任务执行。你现在需要执行任务内容本身而不是创建、修改、恢复、暂停或查询新的定时任务。除非当前任务内容明确要求管理调度器否则不要调用任何定时任务管理工具像“每小时”、“每天”、“cron”、“定时”等词只应视为任务背景不应再解释为新的建任务请求。";
@ -56,6 +59,28 @@ pub(crate) struct FinalizedAgentResult {
pub(crate) should_schedule_compaction: bool, pub(crate) should_schedule_compaction: bool,
} }
pub(crate) struct MessageExecutionRequest<'a> {
pub(crate) session: Arc<Mutex<Session>>,
pub(crate) channel_name: &'a str,
pub(crate) sender_id: &'a str,
pub(crate) chat_id: &'a str,
pub(crate) content: &'a str,
pub(crate) media: Vec<MediaItem>,
pub(crate) live_emitter: Option<Arc<dyn EmittedMessageHandler>>,
}
pub(crate) struct ScheduledExecutionRequest<'a> {
pub(crate) session: Arc<Mutex<Session>>,
pub(crate) channel_name: &'a str,
pub(crate) chat_id: &'a str,
pub(crate) prompt: &'a str,
pub(crate) sender_id: &'a str,
pub(crate) provider_config: LLMProviderConfig,
pub(crate) fresh_session: bool,
pub(crate) system_prompt: Option<&'a str>,
pub(crate) metadata: &'a HashMap<String, String>,
}
impl AgentExecutionService { impl AgentExecutionService {
pub(crate) fn new(show_tool_results: bool) -> Self { pub(crate) fn new(show_tool_results: bool) -> Self {
Self { show_tool_results } Self { show_tool_results }
@ -115,6 +140,136 @@ impl AgentExecutionService {
}) })
} }
pub(crate) async fn prepare_and_execute_message(
&self,
request: MessageExecutionRequest<'_>,
) -> Result<Vec<OutboundMessage>, AgentError> {
let (history, agent, user_message) = {
let mut session_guard = request.session.lock().await;
session_guard.ensure_persistent_session(request.chat_id)?;
session_guard.ensure_chat_loaded(request.chat_id)?;
if let Some(command_response) =
handle_in_chat_command(&mut session_guard, request.chat_id, request.content)?
{
return Ok(vec![OutboundMessage::assistant(
request.channel_name.to_string(),
request.chat_id.to_string(),
command_response,
None,
HashMap::new(),
)]);
}
session_guard.ensure_agent_prompt_before_user_message(request.chat_id)?;
let media_refs: Vec<String> = request
.media
.iter()
.map(|media| media.path.clone())
.collect();
#[cfg(debug_assertions)]
if !media_refs.is_empty() {
tracing::debug!(media_count = %request.media.len(), media_refs = ?media_refs, "Adding user message with media");
}
let enriched_content =
enrich_user_content_with_media_refs(request.content, &media_refs)?;
let user_message = session_guard.create_user_message(&enriched_content, media_refs);
session_guard.append_persisted_message(request.chat_id, user_message.clone())?;
let history = session_guard.get_or_create_history(request.chat_id).clone();
session_guard.record_skill_offer(request.chat_id)?;
let mut agent = session_guard.create_agent(
request.chat_id,
Some(request.sender_id),
Some(&user_message.id),
)?;
if let Some(handler) = request.live_emitter.clone() {
agent = agent.with_emitted_message_handler(handler);
}
(history, agent, user_message)
};
let result = agent.process(history).await?;
let metadata = HashMap::new();
self.finalize_result_and_schedule_compaction(
request.session.clone(),
FinalizeAgentResultRequest {
channel_name: request.channel_name,
chat_id: request.chat_id,
user_message: &user_message,
result,
metadata: &metadata,
suppress_live_tool_calls: request.live_emitter.is_some(),
execution_kind: "message",
},
)
.await
}
pub(crate) async fn prepare_and_execute_scheduled_task(
&self,
request: ScheduledExecutionRequest<'_>,
) -> Result<Vec<OutboundMessage>, AgentError> {
let (history, agent, user_message) = {
let mut session_guard = request.session.lock().await;
session_guard.ensure_persistent_session(request.chat_id)?;
if request.fresh_session {
session_guard.reset_chat_context(request.chat_id)?;
}
session_guard.ensure_chat_loaded(request.chat_id)?;
session_guard.ensure_agent_prompt_before_user_message(request.chat_id)?;
let scheduled_system_prompt =
compose_scheduled_task_system_prompt(request.system_prompt);
session_guard.append_persisted_message(
request.chat_id,
ChatMessage::system_with_context(
&scheduled_system_prompt,
Some(SYSTEM_CONTEXT_SCHEDULED_PROMPT.to_string()),
),
)?;
let user_message = session_guard.create_user_message(request.prompt, Vec::new());
session_guard.append_persisted_message(request.chat_id, user_message.clone())?;
let history = session_guard.get_or_create_history(request.chat_id).clone();
session_guard.record_skill_offer(request.chat_id)?;
let agent = session_guard.create_agent_with_provider_config(
request.chat_id,
Some(request.sender_id),
Some(&user_message.id),
request.provider_config.clone(),
)?;
(history, agent, user_message)
};
let result = agent.process(history).await?;
self.finalize_result_and_schedule_compaction(
request.session.clone(),
FinalizeAgentResultRequest {
channel_name: request.channel_name,
chat_id: request.chat_id,
user_message: &user_message,
result,
metadata: request.metadata,
suppress_live_tool_calls: false,
execution_kind: "scheduled_task",
},
)
.await
}
pub(crate) async fn finalize_result_and_schedule_compaction( pub(crate) async fn finalize_result_and_schedule_compaction(
&self, &self,
session: Arc<Mutex<Session>>, session: Arc<Mutex<Session>>,

View File

@ -1,8 +1,7 @@
use crate::agent::{AgentError, AgentLoop, ContextCompressor, EmittedMessageHandler}; use crate::agent::{AgentError, AgentLoop, ContextCompressor, EmittedMessageHandler};
use crate::bus::{ #[cfg(test)]
ChatMessage, MessageBus, OutboundMessage, SYSTEM_CONTEXT_AGENT_PROMPT, use crate::bus::SYSTEM_CONTEXT_SCHEDULED_PROMPT;
SYSTEM_CONTEXT_SCHEDULED_PROMPT, use crate::bus::{ChatMessage, MessageBus, OutboundMessage, SYSTEM_CONTEXT_AGENT_PROMPT};
};
use crate::config::LLMProviderConfig; use crate::config::LLMProviderConfig;
use crate::protocol::WsOutbound; use crate::protocol::WsOutbound;
use crate::skills::SkillRuntime; use crate::skills::SkillRuntime;
@ -20,7 +19,7 @@ use tokio::sync::{Mutex, mpsc};
use uuid::Uuid; use uuid::Uuid;
use super::execution::{ use super::execution::{
AgentExecutionService, FinalizeAgentResultRequest, compose_scheduled_task_system_prompt, AgentExecutionService, MessageExecutionRequest, ScheduledExecutionRequest,
select_provider_config, should_display_message_to_user, select_provider_config, should_display_message_to_user,
}; };
#[cfg(test)] #[cfg(test)]
@ -42,7 +41,7 @@ fn preview_text(content: &str, max_chars: usize) -> String {
preview.replace('\n', "\\n") preview.replace('\n', "\\n")
} }
fn enrich_user_content_with_media_refs( pub(crate) fn enrich_user_content_with_media_refs(
content: &str, content: &str,
media_refs: &[String], media_refs: &[String],
) -> Result<String, AgentError> { ) -> Result<String, AgentError> {
@ -921,66 +920,16 @@ impl SessionManager {
.await .await
.ok_or_else(|| AgentError::Other("Session not found".to_string()))?; .ok_or_else(|| AgentError::Other("Session not found".to_string()))?;
// 处理消息
let (history, agent, user_message) = {
let mut session_guard = session.lock().await;
session_guard.ensure_persistent_session(chat_id)?;
session_guard.ensure_chat_loaded(chat_id)?;
if let Some(command_response) =
handle_in_chat_command(&mut session_guard, chat_id, content)?
{
return Ok(vec![OutboundMessage::assistant(
channel_name.to_string(),
chat_id.to_string(),
command_response,
None,
HashMap::new(),
)]);
}
session_guard.ensure_agent_prompt_before_user_message(chat_id)?;
// 添加用户消息到历史
let media_refs: Vec<String> = media.iter().map(|m| m.path.clone()).collect();
#[cfg(debug_assertions)]
if !media_refs.is_empty() {
tracing::debug!(media_count = %media.len(), media_refs = ?media_refs, "Adding user message with media");
}
let enriched_content = enrich_user_content_with_media_refs(content, &media_refs)?;
let user_message = session_guard.create_user_message(&enriched_content, media_refs);
session_guard.append_persisted_message(chat_id, user_message.clone())?;
let history = session_guard.get_or_create_history(chat_id).clone();
session_guard.record_skill_offer(chat_id)?;
// 创建 agent 并处理
let mut agent =
session_guard.create_agent(chat_id, Some(sender_id), Some(&user_message.id))?;
if let Some(handler) = live_emitter.clone() {
agent = agent.with_emitted_message_handler(handler);
}
(history, agent, user_message)
};
let result = agent.process(history).await?;
let metadata = HashMap::new();
let outbound_messages = AgentExecutionService::new(self.show_tool_results) let outbound_messages = AgentExecutionService::new(self.show_tool_results)
.finalize_result_and_schedule_compaction( .prepare_and_execute_message(MessageExecutionRequest {
session.clone(), session: session.clone(),
FinalizeAgentResultRequest { channel_name,
channel_name, sender_id,
chat_id, chat_id,
user_message: &user_message, content,
result, media,
metadata: &metadata, live_emitter,
suppress_live_tool_calls: live_emitter.is_some(), })
execution_kind: "message",
},
)
.await?; .await?;
#[cfg(debug_assertions)] #[cfg(debug_assertions)]
@ -1015,60 +964,18 @@ impl SessionManager {
.unwrap_or_else(|| "scheduler".to_string()); .unwrap_or_else(|| "scheduler".to_string());
let provider_config = self.provider_config_for_agent(options.agent.as_deref())?; let provider_config = self.provider_config_for_agent(options.agent.as_deref())?;
let (history, agent, user_message) = {
let mut session_guard = session.lock().await;
session_guard.ensure_persistent_session(chat_id)?;
if options.fresh_session {
session_guard.reset_chat_context(chat_id)?;
}
session_guard.ensure_chat_loaded(chat_id)?;
session_guard.ensure_agent_prompt_before_user_message(chat_id)?;
let scheduled_system_prompt =
compose_scheduled_task_system_prompt(options.system_prompt.as_deref());
session_guard.append_persisted_message(
chat_id,
ChatMessage::system_with_context(
&scheduled_system_prompt,
Some(SYSTEM_CONTEXT_SCHEDULED_PROMPT.to_string()),
),
)?;
let user_message = session_guard.create_user_message(prompt, Vec::new());
session_guard.append_persisted_message(chat_id, user_message.clone())?;
let history = session_guard.get_or_create_history(chat_id).clone();
session_guard.record_skill_offer(chat_id)?;
let agent = session_guard.create_agent_with_provider_config(
chat_id,
Some(&sender_id),
Some(&user_message.id),
provider_config.clone(),
)?;
(history, agent, user_message)
};
let result = agent.process(history).await?;
AgentExecutionService::new(self.show_tool_results) AgentExecutionService::new(self.show_tool_results)
.finalize_result_and_schedule_compaction( .prepare_and_execute_scheduled_task(ScheduledExecutionRequest {
session.clone(), session: session.clone(),
FinalizeAgentResultRequest { channel_name,
channel_name, chat_id,
chat_id, prompt,
user_message: &user_message, sender_id: &sender_id,
result, provider_config,
metadata: &options.metadata, fresh_session: options.fresh_session,
suppress_live_tool_calls: false, system_prompt: options.system_prompt.as_deref(),
execution_kind: "scheduled_task", metadata: &options.metadata,
}, })
)
.await .await
} }

View File

@ -1,10 +1,11 @@
use super::{ use super::{
GatewayState, GatewayState,
session::{Session, handle_in_chat_command, schedule_background_history_compaction}, execution::{AgentExecutionService, MessageExecutionRequest, should_display_message_to_user},
session::Session,
}; };
use crate::agent::EmittedMessageHandler; use crate::agent::{AgentError, EmittedMessageHandler};
use crate::bus::ChatMessage; use crate::bus::message::{OutboundEventKind, ToolMessageState, format_tool_call_content};
use crate::bus::message::{ToolMessageState, format_tool_call_content}; use crate::bus::{ChatMessage, OutboundMessage};
use crate::protocol::{SessionSummary, WsInbound, WsOutbound, parse_inbound, serialize_outbound}; use crate::protocol::{SessionSummary, WsInbound, WsOutbound, parse_inbound, serialize_outbound};
use async_trait::async_trait; use async_trait::async_trait;
use axum::extract::State; use axum::extract::State;
@ -14,6 +15,8 @@ use futures_util::{SinkExt, StreamExt};
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::{Mutex, mpsc}; use tokio::sync::{Mutex, mpsc};
const CLI_CHANNEL_NAME: &str = "cli";
struct WsToolCallEmitter { struct WsToolCallEmitter {
sender: mpsc::Sender<WsOutbound>, sender: mpsc::Sender<WsOutbound>,
show_tool_results: bool, show_tool_results: bool,
@ -57,7 +60,7 @@ async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) {
} }
}; };
let channel_name = "cli".to_string(); let channel_name = CLI_CHANNEL_NAME.to_string();
// 创建 CLI session // 创建 CLI session
let session = match Session::new( let session = match Session::new(
@ -229,19 +232,55 @@ fn ws_outbound_from_chat_message(message: &ChatMessage) -> Vec<WsOutbound> {
} }
} }
fn should_display_message_to_user(show_tool_results: bool, message: &ChatMessage) -> bool { fn ws_outbound_from_outbound_message(message: &OutboundMessage) -> Vec<WsOutbound> {
if message.role != "tool" { match message.event_kind {
return true; OutboundEventKind::AssistantResponse | OutboundEventKind::SchedulerNotification => {
vec![WsOutbound::AssistantResponse {
id: uuid::Uuid::new_v4().to_string(),
content: message.content.clone(),
role: message.role.clone(),
}]
}
OutboundEventKind::ToolCall => vec![WsOutbound::ToolCall {
id: message
.tool_call_id
.clone()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
tool_call_id: message.tool_call_id.clone().unwrap_or_default(),
tool_name: message.tool_name.clone().unwrap_or_default(),
arguments: message
.tool_arguments
.clone()
.unwrap_or(serde_json::Value::Null),
content: message.content.clone(),
role: message.role.clone(),
}],
OutboundEventKind::ToolResult => vec![WsOutbound::ToolResult {
id: message
.tool_call_id
.clone()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
tool_call_id: message.tool_call_id.clone().unwrap_or_default(),
tool_name: message.tool_name.clone().unwrap_or_default(),
content: message.content.clone(),
role: message.role.clone(),
}],
OutboundEventKind::ToolPending => vec![WsOutbound::ToolPending {
id: message
.tool_call_id
.clone()
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
tool_call_id: message.tool_call_id.clone().unwrap_or_default(),
tool_name: message.tool_name.clone().unwrap_or_default(),
content: message.content.clone(),
role: message.role.clone(),
resume_hint: "完成外部操作后,直接发一条继续消息即可。".to_string(),
}],
OutboundEventKind::ErrorNotification => vec![WsOutbound::Error {
code: "AGENT_ERROR".to_string(),
message: message.content.clone(),
}],
} }
show_tool_results
|| matches!(
message
.tool_state
.as_ref()
.unwrap_or(&ToolMessageState::Completed),
ToolMessageState::PendingUserAction
)
} }
async fn handle_inbound( async fn handle_inbound(
@ -260,83 +299,42 @@ async fn handle_inbound(
} => { } => {
let chat_id = chat_id.unwrap_or_else(|| current_session_id.clone()); let chat_id = chat_id.unwrap_or_else(|| current_session_id.clone());
let sender_id = resolve_ws_sender_id(sender_id.as_deref(), runtime_session_id); let sender_id = resolve_ws_sender_id(sender_id.as_deref(), runtime_session_id);
let (history, agent, user_tx) = { let user_tx = session.lock().await.user_tx.clone();
let mut session_guard = session.lock().await; let live_emitter = Arc::new(WsToolCallEmitter {
sender: user_tx.clone(),
show_tool_results: state.config.gateway.show_tool_results,
});
session_guard.ensure_persistent_session(&chat_id)?; match AgentExecutionService::new(state.config.gateway.show_tool_results)
session_guard.ensure_chat_loaded(&chat_id)?; .prepare_and_execute_message(MessageExecutionRequest {
session: session.clone(),
if let Some(command_response) = channel_name: CLI_CHANNEL_NAME,
handle_in_chat_command(&mut session_guard, &chat_id, &content)? sender_id: &sender_id,
{ chat_id: &chat_id,
let _ = session_guard content: &content,
.send(WsOutbound::AssistantResponse { media: Vec::new(),
id: uuid::Uuid::new_v4().to_string(), live_emitter: Some(live_emitter),
content: command_response, })
role: "assistant".to_string(), .await
}) {
.await; Ok(outbound_messages) => {
return Ok(()); for outbound in outbound_messages
}
session_guard.ensure_agent_prompt_before_user_message(&chat_id)?;
let user_message = session_guard.create_user_message(&content, Vec::new());
let user_message_id = user_message.id.clone();
session_guard.append_persisted_message(&chat_id, user_message)?;
let history = session_guard.get_or_create_history(&chat_id).clone();
session_guard.record_skill_offer(&chat_id)?;
let live_emitter = Arc::new(WsToolCallEmitter {
sender: session_guard.user_tx.clone(),
show_tool_results: state.config.gateway.show_tool_results,
});
let agent = session_guard
.create_agent(&chat_id, Some(&sender_id), Some(&user_message_id))?
.with_emitted_message_handler(live_emitter);
(history, agent, session_guard.user_tx.clone())
};
match agent.process(history).await {
Ok(result) => {
let mut session_guard = session.lock().await;
session_guard
.append_persisted_messages(&chat_id, result.emitted_messages.clone())?;
for outbound in result
.emitted_messages
.iter() .iter()
.filter(|message| { .flat_map(ws_outbound_from_outbound_message)
!message.is_assistant_tool_call_message()
&& should_display_message_to_user(
state.config.gateway.show_tool_results,
message,
)
})
.flat_map(ws_outbound_from_chat_message)
{ {
let _ = session_guard.send(outbound).await; let _ = user_tx.send(outbound).await;
}
drop(session_guard);
if let Err(error) =
schedule_background_history_compaction(session.clone(), chat_id.clone())
.await
{
tracing::warn!(chat_id = %chat_id, error = %error, "Failed to schedule background history compaction for CLI session");
} }
} }
Err(error) => { Err(AgentError::LlmError(error)) => {
tracing::error!(chat_id = %chat_id, error = %error, "Agent process error"); tracing::error!(chat_id = %chat_id, error = %error, "Agent process error");
let _ = user_tx let _ = user_tx
.send(WsOutbound::Error { .send(WsOutbound::Error {
code: "LLM_ERROR".to_string(), code: "LLM_ERROR".to_string(),
message: error.to_string(), message: error,
}) })
.await; .await;
} }
Err(error) => return Err(error),
} }
Ok(()) Ok(())
@ -483,8 +481,8 @@ mod tests {
ws_outbound_from_chat_message, ws_outbound_from_chat_message,
}; };
use crate::agent::EmittedMessageHandler; use crate::agent::EmittedMessageHandler;
use crate::bus::ChatMessage;
use crate::bus::message::ToolMessageState; use crate::bus::message::ToolMessageState;
use crate::bus::{ChatMessage, OutboundMessage};
use crate::protocol::WsOutbound; use crate::protocol::WsOutbound;
use crate::providers::ToolCall; use crate::providers::ToolCall;
use serde_json::json; use serde_json::json;
@ -579,6 +577,38 @@ mod tests {
assert!(should_display_message_to_user(true, &completed)); assert!(should_display_message_to_user(true, &completed));
} }
#[test]
fn test_ws_outbound_from_outbound_message_maps_tool_call() {
let message = OutboundMessage::tool_call(
"cli",
"session-1",
"call-1",
"calculator",
json!({"expression": "1 + 1"}),
None,
Default::default(),
);
let outbound = super::ws_outbound_from_outbound_message(&message);
assert_eq!(outbound.len(), 1);
match &outbound[0] {
WsOutbound::ToolCall {
tool_call_id,
tool_name,
arguments,
content,
..
} => {
assert_eq!(tool_call_id, "call-1");
assert_eq!(tool_name, "calculator");
assert_eq!(arguments["expression"], "1 + 1");
assert_eq!(content, "calculator\nargs: {\"expression\":\"1 + 1\"}");
}
other => panic!("unexpected outbound variant: {:?}", other),
}
}
#[test] #[test]
fn test_resolve_ws_sender_id_prefers_inbound_sender() { fn test_resolve_ws_sender_id_prefers_inbound_sender() {
assert_eq!( assert_eq!(