重构: 将CLI通道统一为CliChatChannel

- 创建CliChatChannel实现Channel trait
- 简化ws.rs为纯粹的传输桥接
- CLI消息通过MessageBus走统一流程
- 重命名命令从agent改为chat
- 通道名称从cli改为cli_chat
This commit is contained in:
xiaoxixi 2026-04-26 12:04:11 +08:00
parent ef80ae27ac
commit 6a3a1b5edf
6 changed files with 355 additions and 299 deletions

310
src/channels/cli_chat.rs Normal file
View File

@ -0,0 +1,310 @@
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::{mpsc, Mutex};
use uuid::Uuid;
use crate::bus::{InboundMessage, MessageBus, OutboundMessage};
use crate::protocol::{parse_inbound, SessionSummary, WsInbound, WsOutbound};
use crate::storage::{SessionStore, SessionRecord};
use super::base::{Channel, ChannelError};
// ============================================================================
// Client - Connected CLI client
// ============================================================================
pub(crate) struct Client {
sender: mpsc::Sender<WsOutbound>,
current_session_id: Mutex<Option<String>>,
}
// ============================================================================
// CliChatChannel - Channel implementation for CLI chat
// ============================================================================
pub struct CliChatChannel {
bus: std::sync::Mutex<Option<Arc<MessageBus>>>,
session_store: Arc<SessionStore>,
clients: Mutex<Vec<Arc<Client>>>,
}
impl CliChatChannel {
pub fn new(session_store: Arc<SessionStore>) -> Self {
Self {
bus: std::sync::Mutex::new(None),
session_store,
clients: Mutex::new(Vec::new()),
}
}
/// Register a new client connection, returns (session_id, client)
pub(crate) async fn register_client(&self, sender: mpsc::Sender<WsOutbound>) -> (String, Arc<Client>) {
let client = Arc::new(Client {
sender,
current_session_id: Mutex::new(None),
});
self.clients.lock().await.push(client.clone());
// Create initial session
let session_id = match self.create_session(None) {
Ok(record) => record.id,
Err(e) => {
tracing::error!(error = %e, "Failed to create initial session");
Uuid::new_v4().to_string()
}
};
// Set current session id in client
{
let mut current = client.current_session_id.lock().await;
*current = Some(session_id.clone());
}
(session_id, client)
}
/// Handle an inbound message from a client
pub(crate) async fn handle_inbound(&self, client: Arc<Client>, raw_msg: &str) {
match parse_inbound(raw_msg) {
Ok(inbound) => {
match self.handle_ws_inbound(client.clone(), inbound).await {
Ok(()) => {}
Err(e) => {
tracing::warn!(error = %e, "Failed to handle inbound message");
let _ = client
.sender
.send(WsOutbound::Error {
code: "INTERNAL_ERROR".to_string(),
message: e.to_string(),
})
.await;
}
}
}
Err(e) => {
tracing::warn!(error = %e, "Failed to parse inbound message");
let _ = client
.sender
.send(WsOutbound::Error {
code: "PARSE_ERROR".to_string(),
message: e.to_string(),
})
.await;
}
}
}
async fn handle_ws_inbound(&self, client: Arc<Client>, inbound: WsInbound) -> Result<(), ChannelError> {
let bus = {
let guard = self.bus.lock().unwrap();
guard.clone().ok_or_else(|| ChannelError::Other("Channel not started".to_string()))?
};
let mut current_session_guard = client.current_session_id.lock().await;
match inbound {
WsInbound::UserInput { content, chat_id, .. } => {
let chat_id = chat_id.or(current_session_guard.clone()).unwrap_or_else(|| Uuid::new_v4().to_string());
// If no session, create one
let session_id = if current_session_guard.is_none() {
let record = self.create_session(None)?;
record.id
} else {
chat_id.clone()
};
// Update client's current session
*current_session_guard = Some(session_id.clone());
// Publish to bus
let msg = InboundMessage {
channel: self.name().to_string(),
sender_id: "cli".to_string(),
chat_id: session_id.clone(),
content,
timestamp: current_timestamp(),
media: Vec::new(),
metadata: Default::default(),
forwarded_metadata: Default::default(),
};
bus.publish_inbound(msg).await?;
}
WsInbound::ClearHistory { chat_id, session_id } => {
let target = session_id
.or(chat_id)
.or(current_session_guard.clone())
.ok_or_else(|| ChannelError::Other("No active session".to_string()))?;
self.session_store
.clear_messages(&target)
.map_err(|e| ChannelError::Other(e.to_string()))?;
let _ = client
.sender
.send(WsOutbound::HistoryCleared { session_id: target })
.await;
}
WsInbound::CreateSession { title } => {
let record = self.create_session(title.as_deref())?;
*current_session_guard = Some(record.id.clone());
let _ = client
.sender
.send(WsOutbound::SessionCreated {
session_id: record.id,
title: record.title,
})
.await;
}
WsInbound::ListSessions { include_archived } => {
let records = self.session_store
.list_sessions("cli_chat", include_archived)
.map_err(|e| ChannelError::Other(e.to_string()))?;
let summaries: Vec<_> = records.into_iter().map(to_session_summary).collect();
let _ = client
.sender
.send(WsOutbound::SessionList {
sessions: summaries,
current_session_id: current_session_guard.clone(),
})
.await;
}
WsInbound::LoadSession { session_id } => {
let Some(record) = self.session_store
.get_session(&session_id)
.map_err(|e| ChannelError::Other(e.to_string()))?
else {
let _ = client
.sender
.send(WsOutbound::Error {
code: "SESSION_NOT_FOUND".to_string(),
message: format!("Session not found: {}", session_id),
})
.await;
return Ok(());
};
*current_session_guard = Some(record.id.clone());
let _ = client
.sender
.send(WsOutbound::SessionLoaded {
session_id: record.id,
title: record.title,
message_count: record.message_count,
})
.await;
}
WsInbound::RenameSession { session_id, title } => {
let target = session_id.or(current_session_guard.clone()).ok_or_else(|| {
ChannelError::Other("No active session".to_string())
})?;
self.session_store
.rename_session(&target, &title)
.map_err(|e| ChannelError::Other(e.to_string()))?;
let _ = client
.sender
.send(WsOutbound::SessionRenamed {
session_id: target,
title,
})
.await;
}
WsInbound::ArchiveSession { session_id } => {
let target = session_id.or(current_session_guard.clone()).ok_or_else(|| {
ChannelError::Other("No active session".to_string())
})?;
self.session_store
.archive_session(&target)
.map_err(|e| ChannelError::Other(e.to_string()))?;
let _ = client
.sender
.send(WsOutbound::SessionArchived { session_id: target })
.await;
}
WsInbound::DeleteSession { session_id } => {
let target = session_id.or(current_session_guard.clone()).ok_or_else(|| {
ChannelError::Other("No active session".to_string())
})?;
self.session_store
.delete_session(&target)
.map_err(|e| ChannelError::Other(e.to_string()))?;
let _ = client
.sender
.send(WsOutbound::SessionDeleted { session_id: target.clone() })
.await;
// If deleting current session, create a new one
if current_session_guard.as_deref() == Some(&target) {
let new_record = self.create_session(None)?;
*current_session_guard = Some(new_record.id.clone());
let _ = client
.sender
.send(WsOutbound::SessionCreated {
session_id: new_record.id,
title: new_record.title,
})
.await;
}
}
WsInbound::Ping => {
let _ = client.sender.send(WsOutbound::Pong).await;
}
}
Ok(())
}
fn create_session(&self, title: Option<&str>) -> Result<SessionRecord, ChannelError> {
self.session_store
.create_cli_session(title)
.map_err(|e| ChannelError::Other(e.to_string()))
}
}
#[async_trait]
impl Channel for CliChatChannel {
fn name(&self) -> &str {
"cli_chat"
}
fn is_running(&self) -> bool {
self.bus.lock().unwrap().is_some()
}
async fn start(&self, bus: Arc<MessageBus>) -> Result<(), ChannelError> {
*self.bus.lock().unwrap() = Some(bus);
Ok(())
}
async fn stop(&self) -> Result<(), ChannelError> {
*self.bus.lock().unwrap() = None;
Ok(())
}
async fn send(&self, msg: OutboundMessage) -> Result<(), ChannelError> {
let clients = self.clients.lock().await.clone();
for client in clients {
let outbound = WsOutbound::AssistantResponse {
id: Uuid::new_v4().to_string(),
content: msg.content.clone(),
role: "assistant".to_string(),
};
let _ = client.sender.send(outbound).await;
}
Ok(())
}
}
fn to_session_summary(record: SessionRecord) -> SessionSummary {
SessionSummary {
session_id: record.id,
title: record.title,
channel_name: record.channel_name,
chat_id: record.chat_id,
message_count: record.message_count,
last_active_at: record.last_active_at,
archived_at: record.archived_at,
}
}
fn current_timestamp() -> i64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis() as i64
}

View File

@ -1,7 +1,9 @@
pub mod base; pub mod base;
pub mod feishu; pub mod feishu;
pub mod cli_chat;
pub mod manager; pub mod manager;
pub use base::{Channel, ChannelError}; pub use base::{Channel, ChannelError};
pub use manager::ChannelManager; pub use manager::ChannelManager;
pub use feishu::FeishuChannel; pub use feishu::FeishuChannel;
pub use cli_chat::CliChatChannel;

View File

@ -7,7 +7,7 @@ use axum::{routing, Router};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use crate::bus::{MessageBus, OutboundDispatcher}; use crate::bus::{MessageBus, OutboundDispatcher};
use crate::channels::ChannelManager; use crate::channels::{Channel, ChannelManager, CliChatChannel};
use crate::config::Config; use crate::config::Config;
use crate::logging; use crate::logging;
use session::SessionManager; use session::SessionManager;
@ -17,6 +17,7 @@ pub struct GatewayState {
pub session_manager: SessionManager, pub session_manager: SessionManager,
pub channel_manager: ChannelManager, pub channel_manager: ChannelManager,
pub bus: Arc<MessageBus>, pub bus: Arc<MessageBus>,
pub cli_chat_channel: Arc<CliChatChannel>,
} }
impl GatewayState { impl GatewayState {
@ -29,15 +30,21 @@ impl GatewayState {
// Session TTL from config (default 4 hours) // Session TTL from config (default 4 hours)
let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4); let session_ttl_hours = config.gateway.session_ttl_hours.unwrap_or(4);
let session_manager = SessionManager::new(session_ttl_hours, provider_config)?; let session_manager = SessionManager::new(session_ttl_hours, provider_config.clone())?;
let channel_manager = ChannelManager::new(); let channel_manager = ChannelManager::new();
let bus = channel_manager.bus(); let bus = channel_manager.bus();
// Create CLI Chat Channel
let cli_chat_channel = Arc::new(CliChatChannel::new(
session_manager.store().clone(),
));
Ok(Self { Ok(Self {
config, config,
session_manager, session_manager,
channel_manager, channel_manager,
bus, bus,
cli_chat_channel,
}) })
} }
@ -47,6 +54,11 @@ impl GatewayState {
let bus_for_outbound = self.bus.clone(); let bus_for_outbound = self.bus.clone();
let session_manager = self.session_manager.clone(); let session_manager = self.session_manager.clone();
// Start CLI Chat Channel
if let Err(e) = self.cli_chat_channel.start(self.bus.clone()).await {
tracing::error!(error = %e, "Failed to start CLI chat channel");
}
// Spawn inbound message processor // Spawn inbound message processor
// This consumes from bus.inbound, processes via SessionManager, publishes to bus.outbound // This consumes from bus.inbound, processes via SessionManager, publishes to bus.outbound
tokio::spawn(async move { tokio::spawn(async move {
@ -104,11 +116,13 @@ impl GatewayState {
// Spawn outbound dispatcher // Spawn outbound dispatcher
let dispatcher = OutboundDispatcher::new(bus_for_outbound); let dispatcher = OutboundDispatcher::new(bus_for_outbound);
let channel_manager = self.channel_manager.clone(); let channel_manager = self.channel_manager.clone();
let cli_chat_channel = self.cli_chat_channel.clone();
// Register channels with dispatcher // Register channels with dispatcher
if let Some(channel) = channel_manager.get_channel("feishu").await { if let Some(channel) = channel_manager.get_channel("feishu").await {
dispatcher.register_channel("feishu", channel).await; dispatcher.register_channel("feishu", channel).await;
} }
dispatcher.register_channel("cli_chat", cli_chat_channel).await;
tokio::spawn(async move { tokio::spawn(async move {
tracing::info!("Outbound dispatcher started"); tracing::info!("Outbound dispatcher started");

View File

@ -3,328 +3,57 @@ use axum::extract::ws::{WebSocket, WebSocketUpgrade, Message as WsMessage};
use axum::extract::State; use axum::extract::State;
use axum::response::Response; use axum::response::Response;
use futures_util::{SinkExt, StreamExt}; use futures_util::{SinkExt, StreamExt};
use tokio::sync::{mpsc, Mutex}; use tokio::sync::mpsc;
use crate::protocol::{parse_inbound, serialize_outbound, SessionSummary, WsInbound, WsOutbound}; use crate::protocol::serialize_outbound;
use super::{GatewayState, session::{Session, handle_in_chat_command}}; use crate::protocol::WsOutbound;
use super::GatewayState;
pub async fn ws_handler(ws: WebSocketUpgrade, State(state): State<Arc<GatewayState>>) -> Response { pub async fn ws_handler(ws: WebSocketUpgrade, State(state): State<Arc<GatewayState>>) -> Response {
ws.on_upgrade(|socket| async { ws.on_upgrade(|socket| async move {
handle_socket(socket, state).await; handle_socket(socket, state).await;
}) })
} }
async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) { async fn handle_socket(ws: WebSocket, state: Arc<GatewayState>) {
let (sender, receiver) = mpsc::channel::<WsOutbound>(100); // Create channel for sending outbound messages to this client
let (sender, mut receiver) = mpsc::channel::<WsOutbound>(100);
let provider_config = match state.config.get_provider_config("default") { // Register client with CliChatChannel and get initial session id
Ok(cfg) => cfg, let (session_id, client) = state.cli_chat_channel.register_client(sender.clone()).await;
Err(e) => {
tracing::error!(error = %e, "Failed to get provider config");
return;
}
};
let initial_record = match state.session_manager.create_cli_session(None) { // Send session established message
Ok(record) => record, let _ = sender.send(WsOutbound::SessionEstablished {
Err(e) => { session_id: session_id.clone(),
tracing::error!(error = %e, "Failed to create initial CLI session"); }).await;
return;
}
};
let channel_name = "cli".to_string(); tracing::info!(session_id = %session_id, "CLI session established");
// 创建 CLI session
let session = match Session::new(
channel_name.clone(),
provider_config,
sender,
state.session_manager.tools(),
state.session_manager.store(),
)
.await
{
Ok(s) => Arc::new(Mutex::new(s)),
Err(e) => {
tracing::error!(error = %e, "Failed to create session");
return;
}
};
if let Err(e) = session.lock().await.ensure_chat_loaded(&initial_record.id) {
tracing::error!(error = %e, session_id = %initial_record.id, "Failed to load initial CLI session history");
return;
}
let runtime_session_id = session.lock().await.id;
let mut current_session_id = initial_record.id.clone();
tracing::info!(runtime_session_id = %runtime_session_id, session_id = %current_session_id, "CLI session established");
let _ = session
.lock()
.await
.send(WsOutbound::SessionEstablished {
session_id: current_session_id.clone(),
})
.await;
let (mut ws_sender, mut ws_receiver) = ws.split(); let (mut ws_sender, mut ws_receiver) = ws.split();
let mut receiver = receiver; // Task: forward from receiver to WebSocket
let session_id_for_sender = runtime_session_id;
tokio::spawn(async move { tokio::spawn(async move {
while let Some(msg) = receiver.recv().await { while let Some(msg) = receiver.recv().await {
if let Ok(text) = serialize_outbound(&msg) { if let Ok(text) = serialize_outbound(&msg) {
if ws_sender.send(WsMessage::Text(text.into())).await.is_err() { if ws_sender.send(WsMessage::Text(text.into())).await.is_err() {
#[cfg(debug_assertions)]
tracing::debug!(session_id = %session_id_for_sender, "WebSocket send error");
break; break;
} }
} }
} }
}); });
// Main loop: receive WebSocket messages and forward to CliChatChannel
while let Some(msg) = ws_receiver.next().await { while let Some(msg) = ws_receiver.next().await {
match msg { match msg {
Ok(WsMessage::Text(text)) => { Ok(WsMessage::Text(text)) => {
let text = text.to_string(); state.cli_chat_channel.handle_inbound(client.clone(), &text).await;
match parse_inbound(&text) {
Ok(inbound) => {
if let Err(e) = handle_inbound(&state, &session, &mut current_session_id, inbound).await {
tracing::warn!(error = %e, session_id = %current_session_id, "Failed to handle inbound message");
let _ = session
.lock()
.await
.send(WsOutbound::Error {
code: "SESSION_ERROR".to_string(),
message: e.to_string(),
})
.await;
}
}
Err(e) => {
tracing::warn!(error = %e, "Failed to parse inbound message");
let _ = session
.lock()
.await
.send(WsOutbound::Error {
code: "PARSE_ERROR".to_string(),
message: e.to_string(),
})
.await;
}
}
} }
Ok(WsMessage::Close(_)) | Err(_) => { Ok(WsMessage::Close(_)) | Err(_) => {
#[cfg(debug_assertions)] tracing::debug!(session_id = %session_id, "WebSocket closed");
tracing::debug!(session_id = %runtime_session_id, "WebSocket closed");
break; break;
} }
_ => {} _ => {}
} }
} }
tracing::info!(session_id = %runtime_session_id, current_session_id = %current_session_id, "CLI session ended"); tracing::info!(session_id = %session_id, "CLI session ended");
}
fn to_session_summary(record: crate::storage::SessionRecord) -> SessionSummary {
SessionSummary {
session_id: record.id,
title: record.title,
channel_name: record.channel_name,
chat_id: record.chat_id,
message_count: record.message_count,
last_active_at: record.last_active_at,
archived_at: record.archived_at,
}
}
async fn handle_inbound(
state: &Arc<GatewayState>,
session: &Arc<Mutex<Session>>,
current_session_id: &mut String,
inbound: WsInbound,
) -> Result<(), crate::agent::AgentError> {
match inbound {
WsInbound::UserInput { content, chat_id, .. } => {
let chat_id = chat_id.unwrap_or_else(|| current_session_id.clone());
let mut session_guard = session.lock().await;
session_guard.ensure_persistent_session(&chat_id)?;
session_guard.ensure_chat_loaded(&chat_id)?;
if let Some(command_response) = handle_in_chat_command(&mut session_guard, &chat_id, &content)? {
let _ = session_guard
.send(WsOutbound::AssistantResponse {
id: uuid::Uuid::new_v4().to_string(),
content: command_response,
role: "assistant".to_string(),
})
.await;
return Ok(());
}
let user_message = session_guard.create_user_message(&content, Vec::new());
session_guard.append_persisted_message(&chat_id, user_message)?;
let raw_history = session_guard.get_or_create_history(&chat_id).clone();
let history = match session_guard
.compressor()
.compress_if_needed(raw_history)
.await
{
Ok(history) => history,
Err(error) => {
tracing::warn!(chat_id = %chat_id, error = %error, "Compression failed, using original history");
session_guard.get_or_create_history(&chat_id).clone()
}
};
let agent = session_guard.create_agent()?;
match agent.process(history).await {
Ok(result) => {
session_guard.append_persisted_messages(&chat_id, result.emitted_messages.clone())?;
let _ = session_guard
.send(WsOutbound::AssistantResponse {
id: result.final_response.id,
content: result.final_response.content,
role: result.final_response.role,
})
.await;
}
Err(error) => {
tracing::error!(chat_id = %chat_id, error = %error, "Agent process error");
let _ = session_guard
.send(WsOutbound::Error {
code: "LLM_ERROR".to_string(),
message: error.to_string(),
})
.await;
}
}
Ok(())
}
WsInbound::ClearHistory { session_id, chat_id } => {
let target = session_id.or(chat_id).unwrap_or_else(|| current_session_id.clone());
state.session_manager.clear_session_messages(&target)?;
let mut session_guard = session.lock().await;
session_guard.remove_history(&target);
let _ = session_guard
.send(WsOutbound::HistoryCleared {
session_id: target,
})
.await;
Ok(())
}
WsInbound::CreateSession { title } => {
let record = state.session_manager.create_cli_session(title.as_deref())?;
*current_session_id = record.id.clone();
let mut session_guard = session.lock().await;
session_guard.ensure_chat_loaded(&record.id)?;
let _ = session_guard
.send(WsOutbound::SessionCreated {
session_id: record.id,
title: record.title,
})
.await;
Ok(())
}
WsInbound::ListSessions { include_archived } => {
let records = state.session_manager.list_cli_sessions(include_archived)?;
let summaries = records.into_iter().map(to_session_summary).collect();
let session_guard = session.lock().await;
let _ = session_guard
.send(WsOutbound::SessionList {
sessions: summaries,
current_session_id: Some(current_session_id.clone()),
})
.await;
Ok(())
}
WsInbound::LoadSession { session_id } => {
let Some(record) = state.session_manager.get_session_record(&session_id)? else {
let session_guard = session.lock().await;
let _ = session_guard
.send(WsOutbound::Error {
code: "SESSION_NOT_FOUND".to_string(),
message: format!("Session not found: {}", session_id),
})
.await;
return Ok(());
};
*current_session_id = record.id.clone();
let mut session_guard = session.lock().await;
session_guard.ensure_chat_loaded(&record.id)?;
let _ = session_guard
.send(WsOutbound::SessionLoaded {
session_id: record.id,
title: record.title,
message_count: record.message_count,
})
.await;
Ok(())
}
WsInbound::RenameSession { session_id, title } => {
let target = session_id.unwrap_or_else(|| current_session_id.clone());
state.session_manager.rename_session(&target, &title)?;
let session_guard = session.lock().await;
let _ = session_guard
.send(WsOutbound::SessionRenamed {
session_id: target,
title,
})
.await;
Ok(())
}
WsInbound::ArchiveSession { session_id } => {
let target = session_id.unwrap_or_else(|| current_session_id.clone());
state.session_manager.archive_session(&target)?;
let session_guard = session.lock().await;
let _ = session_guard
.send(WsOutbound::SessionArchived { session_id: target })
.await;
Ok(())
}
WsInbound::DeleteSession { session_id } => {
let target = session_id.unwrap_or_else(|| current_session_id.clone());
state.session_manager.delete_session(&target)?;
let replacement = if target == *current_session_id {
Some(state.session_manager.create_cli_session(None)?)
} else {
None
};
let mut session_guard = session.lock().await;
session_guard.remove_history(&target);
let _ = session_guard
.send(WsOutbound::SessionDeleted {
session_id: target.clone(),
})
.await;
if let Some(record) = replacement {
*current_session_id = record.id.clone();
session_guard.ensure_chat_loaded(&record.id)?;
let _ = session_guard
.send(WsOutbound::SessionCreated {
session_id: record.id,
title: record.title,
})
.await;
}
Ok(())
}
WsInbound::Ping => {
let session_guard = session.lock().await;
let _ = session_guard.send(WsOutbound::Pong).await;
Ok(())
}
}
} }

View File

@ -5,7 +5,7 @@ use clap::{Parser, CommandFactory};
#[command(about = "A CLI chatbot", long_about = None)] #[command(about = "A CLI chatbot", long_about = None)]
enum Command { enum Command {
/// Connect to gateway /// Connect to gateway
Agent { Chat {
/// Gateway WebSocket URL (e.g., ws://127.0.0.1:19876/ws) /// Gateway WebSocket URL (e.g., ws://127.0.0.1:19876/ws)
#[arg(long)] #[arg(long)]
gateway_url: Option<String>, gateway_url: Option<String>,
@ -33,7 +33,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
} }
match Command::parse() { match Command::parse() {
Command::Agent { gateway_url } => { Command::Chat { gateway_url } => {
let config = picobot::config::Config::load_default().ok(); let config = picobot::config::Config::load_default().ok();
let url = gateway_url let url = gateway_url
.or_else(|| config.as_ref().map(|c| c.client.gateway_url.clone())) .or_else(|| config.as_ref().map(|c| c.client.gateway_url.clone()))

View File

@ -127,7 +127,7 @@ impl SessionStore {
INSERT INTO sessions ( INSERT INTO sessions (
id, title, channel_name, chat_id, summary, id, title, channel_name, chat_id, summary,
created_at, updated_at, last_active_at, archived_at, deleted_at, message_count created_at, updated_at, last_active_at, archived_at, deleted_at, message_count
) VALUES (?1, ?2, 'cli', ?3, NULL, ?4, ?4, ?4, NULL, NULL, 0) ) VALUES (?1, ?2, 'cli_chat', ?3, NULL, ?4, ?4, ?4, NULL, NULL, 0)
", ",
params![id, title, id, now], params![id, title, id, now],
)?; )?;
@ -344,7 +344,7 @@ impl SessionStore {
} }
pub fn persistent_session_id(channel_name: &str, chat_id: &str) -> String { pub fn persistent_session_id(channel_name: &str, chat_id: &str) -> String {
if channel_name == "cli" { if channel_name == "cli" || channel_name == "cli_chat" {
chat_id.to_string() chat_id.to_string()
} else { } else {
format!("{}:{}", channel_name, chat_id) format!("{}:{}", channel_name, chat_id)
@ -482,6 +482,7 @@ mod tests {
#[test] #[test]
fn test_persistent_session_id_for_cli_and_channel() { fn test_persistent_session_id_for_cli_and_channel() {
assert_eq!(persistent_session_id("cli", "abc"), "abc"); assert_eq!(persistent_session_id("cli", "abc"), "abc");
assert_eq!(persistent_session_id("cli_chat", "abc"), "abc");
assert_eq!(persistent_session_id("feishu", "abc"), "feishu:abc"); assert_eq!(persistent_session_id("feishu", "abc"), "feishu:abc");
} }
@ -491,7 +492,7 @@ mod tests {
let session = store.create_cli_session(Some("demo")).unwrap(); let session = store.create_cli_session(Some("demo")).unwrap();
assert_eq!(session.title, "demo"); assert_eq!(session.title, "demo");
assert_eq!(session.channel_name, "cli"); assert_eq!(session.channel_name, "cli_chat");
assert_eq!(session.chat_id, session.id); assert_eq!(session.chat_id, session.id);
assert_eq!(session.message_count, 0); assert_eq!(session.message_count, 0);
assert_eq!(session.reset_cutoff_seq, 0); assert_eq!(session.reset_cutoff_seq, 0);
@ -521,10 +522,10 @@ mod tests {
let archived = store.get_session(&session.id).unwrap().unwrap(); let archived = store.get_session(&session.id).unwrap().unwrap();
assert!(archived.archived_at.is_some()); assert!(archived.archived_at.is_some());
let active_only = store.list_sessions("cli", false).unwrap(); let active_only = store.list_sessions("cli_chat", false).unwrap();
assert!(active_only.is_empty()); assert!(active_only.is_empty());
let including_archived = store.list_sessions("cli", true).unwrap(); let including_archived = store.list_sessions("cli_chat", true).unwrap();
assert_eq!(including_archived.len(), 1); assert_eq!(including_archived.len(), 1);
store.clear_messages(&session.id).unwrap(); store.clear_messages(&session.id).unwrap();