feat: 支持分离 session_chat_id 和 notification_chat_id,优化任务执行逻辑

This commit is contained in:
oudecheng 2026-05-13 09:39:05 +08:00
parent 9d9fa1dc4b
commit cadc5e5577
6 changed files with 148 additions and 15 deletions

View File

@ -13,7 +13,8 @@ pub(crate) struct AgentFactory {
pub(crate) struct AgentBuildRequest<'a> { pub(crate) struct AgentBuildRequest<'a> {
pub(crate) channel_name: &'a str, pub(crate) channel_name: &'a str,
pub(crate) chat_id: &'a str, pub(crate) session_chat_id: &'a str,
pub(crate) notification_chat_id: Option<&'a str>,
pub(crate) sender_id: Option<&'a str>, pub(crate) sender_id: Option<&'a str>,
pub(crate) message_id: Option<&'a str>, pub(crate) message_id: Option<&'a str>,
pub(crate) provider_config: LLMProviderConfig, pub(crate) provider_config: LLMProviderConfig,
@ -25,17 +26,21 @@ impl AgentFactory {
} }
pub(crate) fn create(&self, request: AgentBuildRequest<'_>) -> Result<AgentLoop, AgentError> { pub(crate) fn create(&self, request: AgentBuildRequest<'_>) -> Result<AgentLoop, AgentError> {
let session_id = persistent_session_id(request.channel_name, request.chat_id); let session_id = persistent_session_id(request.channel_name, request.session_chat_id);
AgentLoop::with_tools_and_skill_provider( AgentLoop::with_tools_and_skill_provider(
request.provider_config, request.provider_config,
self.tools.clone(), self.tools.clone(),
self.skills.clone(), self.skills.clone(),
) )
.map(|agent| { .map(|agent| {
// notification_chat_id 优先,否则使用 session_chat_id
let tool_chat_id = request
.notification_chat_id
.unwrap_or(request.session_chat_id);
agent.with_tool_context(ToolContext { agent.with_tool_context(ToolContext {
channel_name: Some(request.channel_name.to_string()), channel_name: Some(request.channel_name.to_string()),
sender_id: request.sender_id.map(str::to_string), sender_id: request.sender_id.map(str::to_string),
chat_id: Some(request.chat_id.to_string()), chat_id: Some(tool_chat_id.to_string()),
session_id: Some(session_id), session_id: Some(session_id),
message_id: request.message_id.map(str::to_string), message_id: request.message_id.map(str::to_string),
message_seq: None, message_seq: None,

View File

@ -30,6 +30,19 @@ impl AgentTaskExecutor {
.run_scheduled_agent_task(channel_name, chat_id, prompt, options) .run_scheduled_agent_task(channel_name, chat_id, prompt, options)
.await .await
} }
async fn execute_silent_agent_task(
&self,
channel_name: &str,
session_chat_id: &str,
notification_chat_id: Option<&str>,
prompt: &str,
options: ScheduledAgentTaskOptions,
) -> Result<Vec<OutboundMessage>, AgentError> {
self.session_manager
.run_silent_agent_task(channel_name, session_chat_id, notification_chat_id, prompt, options)
.await
}
} }
#[async_trait] #[async_trait]
@ -45,6 +58,25 @@ impl SchedulerAgentTaskExecutor for AgentTaskExecutor {
.await .await
.map_err(|error| anyhow::anyhow!(error.to_string())) .map_err(|error| anyhow::anyhow!(error.to_string()))
} }
async fn execute_silent(
&self,
channel_name: &str,
session_chat_id: &str,
notification_chat_id: Option<&str>,
prompt: &str,
options: ScheduledAgentTaskOptions,
) -> anyhow::Result<Vec<OutboundMessage>> {
self.execute_silent_agent_task(
channel_name,
session_chat_id,
notification_chat_id,
prompt,
options,
)
.await
.map_err(|error| anyhow::anyhow!(error.to_string()))
}
} }
#[derive(Clone)] #[derive(Clone)]

View File

@ -60,6 +60,7 @@ pub(crate) struct ScheduledExecutionRequest<'a> {
pub(crate) session: Arc<Mutex<Session>>, pub(crate) session: Arc<Mutex<Session>>,
pub(crate) channel_name: &'a str, pub(crate) channel_name: &'a str,
pub(crate) chat_id: &'a str, pub(crate) chat_id: &'a str,
pub(crate) notification_chat_id: Option<&'a str>,
pub(crate) prompt: &'a str, pub(crate) prompt: &'a str,
pub(crate) sender_id: &'a str, pub(crate) sender_id: &'a str,
pub(crate) provider_config: LLMProviderConfig, pub(crate) provider_config: LLMProviderConfig,
@ -232,6 +233,7 @@ impl AgentExecutionService {
let agent = session_guard.create_agent_with_provider_config( let agent = session_guard.create_agent_with_provider_config(
request.chat_id, request.chat_id,
request.notification_chat_id, // 传入真实 chat_id
Some(request.sender_id), Some(request.sender_id),
Some(&user_message.id), Some(&user_message.id),
request.provider_config.clone(), request.provider_config.clone(),

View File

@ -30,6 +30,7 @@ impl ScheduledAgentTaskService {
&self, &self,
channel_name: &str, channel_name: &str,
chat_id: &str, chat_id: &str,
notification_chat_id: Option<&str>,
prompt: &str, prompt: &str,
options: ScheduledAgentTaskOptions, options: ScheduledAgentTaskOptions,
) -> Result<Vec<OutboundMessage>, AgentError> { ) -> Result<Vec<OutboundMessage>, AgentError> {
@ -45,6 +46,7 @@ impl ScheduledAgentTaskService {
session, session,
channel_name, channel_name,
chat_id, chat_id,
notification_chat_id,
prompt, prompt,
sender_id: &sender_id, sender_id: &sender_id,
provider_config, provider_config,

View File

@ -311,6 +311,7 @@ impl Session {
) -> Result<AgentLoop, AgentError> { ) -> Result<AgentLoop, AgentError> {
self.create_agent_with_provider_config( self.create_agent_with_provider_config(
chat_id, chat_id,
None, // notification_chat_id = None使用 session_chat_id
sender_id, sender_id,
message_id, message_id,
self.provider_config.clone(), self.provider_config.clone(),
@ -319,14 +320,16 @@ impl Session {
pub fn create_agent_with_provider_config( pub fn create_agent_with_provider_config(
&self, &self,
chat_id: &str, session_chat_id: &str,
notification_chat_id: Option<&str>,
sender_id: Option<&str>, sender_id: Option<&str>,
message_id: Option<&str>, message_id: Option<&str>,
provider_config: LLMProviderConfig, provider_config: LLMProviderConfig,
) -> Result<AgentLoop, AgentError> { ) -> Result<AgentLoop, AgentError> {
self.agent_factory.create(AgentBuildRequest { self.agent_factory.create(AgentBuildRequest {
channel_name: &self.channel_name, channel_name: &self.channel_name,
chat_id, session_chat_id,
notification_chat_id,
sender_id, sender_id,
message_id, message_id,
provider_config, provider_config,
@ -480,7 +483,21 @@ impl SessionManager {
options: ScheduledAgentTaskOptions, options: ScheduledAgentTaskOptions,
) -> Result<Vec<OutboundMessage>, AgentError> { ) -> Result<Vec<OutboundMessage>, AgentError> {
self.scheduled_tasks self.scheduled_tasks
.run(channel_name, chat_id, prompt, options) .run(channel_name, chat_id, None, prompt, options)
.await
}
/// 执行 SilentAgentTask支持 notification_chat_id 分离
pub async fn run_silent_agent_task(
&self,
channel_name: &str,
session_chat_id: &str,
notification_chat_id: Option<&str>,
prompt: &str,
options: ScheduledAgentTaskOptions,
) -> Result<Vec<OutboundMessage>, AgentError> {
self.scheduled_tasks
.run(channel_name, session_chat_id, notification_chat_id, prompt, options)
.await .await
} }

View File

@ -43,6 +43,16 @@ pub trait AgentTaskExecutor: Send + Sync {
prompt: &str, prompt: &str,
options: ScheduledAgentTaskOptions, options: ScheduledAgentTaskOptions,
) -> anyhow::Result<Vec<OutboundMessage>>; ) -> anyhow::Result<Vec<OutboundMessage>>;
/// 执行 SilentAgentTask支持 session_chat_id 和 notification_chat_id 分离
async fn execute_silent(
&self,
channel_name: &str,
session_chat_id: &str,
notification_chat_id: Option<&str>,
prompt: &str,
options: ScheduledAgentTaskOptions,
) -> anyhow::Result<Vec<OutboundMessage>>;
} }
#[async_trait] #[async_trait]
@ -303,10 +313,61 @@ impl Scheduler {
} }
} }
SchedulerJobKind::SilentAgentTask => { SchedulerJobKind::SilentAgentTask => {
let execution_chat_id = resolve_execution_chat_id(job)?; // Session 使用虚拟 chat_idscheduler/job_id
if let Err(error) = let session_chat_id = resolve_execution_chat_id(job)?;
execute_agent_task(self.agent_task_executor.as_ref(), job, &execution_chat_id)
.await // ToolContext 使用真实 chat_idtarget.chat_id
let notification_chat_id = job.target.chat_id.as_deref();
tracing::info!(
job_id = %job.id,
session_chat_id = %session_chat_id,
notification_chat_id = ?notification_chat_id,
"Executing silent agent task"
);
// 先提取参数,如果失败需要手动发送错误通知
let prompt = match extract_prompt(job) {
Ok(p) => p,
Err(e) => {
if let Err(notify_error) =
self.notify_silent_agent_task_failure(job, &e).await
{
tracing::error!(
job_id = %job.id,
error = %notify_error,
"Failed to publish silent scheduler failure notification"
);
}
return Err(e);
}
};
let options = match parse_scheduled_agent_task_options(job) {
Ok(o) => o,
Err(e) => {
if let Err(notify_error) =
self.notify_silent_agent_task_failure(job, &e).await
{
tracing::error!(
job_id = %job.id,
error = %notify_error,
"Failed to publish silent scheduler failure notification"
);
}
return Err(e);
}
};
if let Err(error) = self
.agent_task_executor
.execute_silent(
job.target.channel.as_deref().unwrap_or_default(),
&session_chat_id,
notification_chat_id,
prompt,
options,
)
.await
{ {
if let Err(notify_error) = if let Err(notify_error) =
self.notify_silent_agent_task_failure(job, &error).await self.notify_silent_agent_task_failure(job, &error).await
@ -894,11 +955,7 @@ async fn execute_agent_task(
.channel .channel
.as_deref() .as_deref()
.ok_or_else(|| anyhow::anyhow!("scheduled agent task requires target.channel"))?; .ok_or_else(|| anyhow::anyhow!("scheduled agent task requires target.channel"))?;
let prompt = job let prompt = extract_prompt(job)?;
.payload
.get("prompt")
.and_then(|value| value.as_str())
.ok_or_else(|| anyhow::anyhow!("agent_task payload.prompt must be a string"))?;
let options = parse_scheduled_agent_task_options(job)?; let options = parse_scheduled_agent_task_options(job)?;
agent_task_executor agent_task_executor
@ -906,6 +963,13 @@ async fn execute_agent_task(
.await .await
} }
fn extract_prompt(job: &RuntimeJob) -> anyhow::Result<&str> {
job.payload
.get("prompt")
.and_then(|value| value.as_str())
.ok_or_else(|| anyhow::anyhow!("agent_task payload.prompt must be a string"))
}
fn required_notification_chat_id<'a>( fn required_notification_chat_id<'a>(
job: &'a RuntimeJob, job: &'a RuntimeJob,
kind_name: &str, kind_name: &str,
@ -1238,6 +1302,17 @@ mod tests {
) -> anyhow::Result<Vec<OutboundMessage>> { ) -> anyhow::Result<Vec<OutboundMessage>> {
Ok(Vec::new()) Ok(Vec::new())
} }
async fn execute_silent(
&self,
_channel_name: &str,
_session_chat_id: &str,
_notification_chat_id: Option<&str>,
_prompt: &str,
_options: ScheduledAgentTaskOptions,
) -> anyhow::Result<Vec<OutboundMessage>> {
Ok(Vec::new())
}
} }
#[derive(Clone)] #[derive(Clone)]