/// 守护进程模块 /// /// 实现 TCP Socket 服务器,接收客户端请求并执行 TTS 语音合成和播放 /// 类似 Docker daemon 模式,作为后台服务运行 use anyhow::{Context, Result}; use serde::{Deserialize, Serialize}; use std::fs; use std::io::Write; use std::path::PathBuf; use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader}; use tokio::net::TcpListener; use tokio::sync::mpsc; use tiny_http; /// 客户端请求结构 #[derive(Debug, Deserialize)] pub struct DaemonRequest { /// 要合成的文本 pub text: String, /// 音色名称(可选,默认 mimo_default) #[serde(skip_serializing_if = "Option::is_none")] pub voice: Option, /// 音频格式(可选,默认 wav) #[serde(skip_serializing_if = "Option::is_none")] pub format: Option, /// 风格描述(可选,宏观场景风格) #[serde(skip_serializing_if = "Option::is_none")] pub style: Option, } /// 服务端响应结构 #[derive(Debug, Serialize)] pub struct DaemonResponse { /// 状态:ok 或 error pub status: String, /// 消息 pub message: String, } /// 获取配置目录(所有平台统一使用 ~/.config/tts/) fn get_config_dir() -> Result { let home = dirs::home_dir().context("无法获取用户家目录")?; let config_dir = home.join(".config").join("tts"); // 确保目录存在 if !config_dir.exists() { fs::create_dir_all(&config_dir) .with_context(|| format!("无法创建配置目录: {:?}", config_dir))?; } Ok(config_dir) } /// 获取 PID 文件路径 fn get_pid_file_path() -> Result { Ok(get_config_dir()?.join("ttsd.pid")) } /// 获取日志文件路径 fn get_log_file_path() -> Result { Ok(get_config_dir()?.join("ttsd.log")) } /// 获取 socket 文件路径(预留,未来可选 Unix Socket) #[allow(dead_code)] fn get_socket_path() -> Result { Ok(get_config_dir()?.join("ttsd.sock")) } /// 日志级别 #[derive(Debug, PartialEq)] enum LogLevel { Info, Warn, Error, } impl LogLevel { fn from_message(msg: &str) -> Self { let lower = msg.to_lowercase(); if lower.contains("error") || lower.contains("失败") || lower.contains("无法") { LogLevel::Error } else if lower.contains("warning") || lower.contains("警告") || lower.contains("注意") { LogLevel::Warn } else { LogLevel::Info } } fn as_str(&self) -> &str { match self { LogLevel::Info => "INFO", LogLevel::Warn => "WARN", LogLevel::Error => "ERROR", } } } /// 写入日志 fn write_log(message: &str) -> Result<()> { let log_path = get_log_file_path()?; let now = std::time::SystemTime::now(); let datetime: chrono::DateTime = now.into(); let timestamp = datetime.format("%Y-%m-%d %H:%M:%S"); // 自动检测日志级别 let level = LogLevel::from_message(message); // 获取当前 PID let pid = std::process::id(); // 新格式:[时间戳] [级别] [PID] 消息 let log_line = format!("[{}] [{}] [{}] {}\n", timestamp, level.as_str(), pid, message); let mut file = fs::OpenOptions::new() .create(true) .append(true) .open(&log_path) .with_context(|| format!("无法打开日志文件: {:?}", log_path))?; file.write_all(log_line.as_bytes()) .with_context(|| format!("无法写入日志文件: {:?}", log_path))?; // 简化的 stdout 输出 if level == LogLevel::Error { eprintln!("[Daemon {}] {}", pid, message); } else { println!("[Daemon {}] {}", pid, message); } Ok(()) } /// 检查守护进程是否正在运行 pub fn is_running() -> bool { let pid_path = match get_pid_file_path() { Ok(p) => p, Err(_) => return false, }; if !pid_path.exists() { return false; } let pid_content = match fs::read_to_string(&pid_path) { Ok(s) => s, Err(_) => return false, }; let pid: u32 = match pid_content.trim().parse() { Ok(p) => p, Err(_) => { // PID 文件内容无效,删除它 let _ = fs::remove_file(&pid_path); return false; } }; // 检查进程是否存在(跨平台) #[cfg(unix)] { use std::process::Command; let output = Command::new("kill") .args(["-0", &pid.to_string()]) .output(); match output { Ok(o) => o.status.success(), Err(_) => false, } } #[cfg(windows)] { use std::process::Command; let output = Command::new("tasklist") .args(["/FI", &format!("PID eq {}", pid), "/NH"]) .output(); match output { Ok(o) => { let stdout = String::from_utf8_lossy(&o.stdout); stdout.contains(&pid.to_string()) } Err(_) => false, } } } /// 启动守护进程(纯循环,由调用者决定是否后台运行) pub async fn start_daemon(port: u16) -> Result<()> { // 检查是否已经在运行 if is_running() { return Err(anyhow::anyhow!("守护进程已经在运行中")); } let addr = format!("127.0.0.1:{}", port); write_log(&format!("正在启动守护进程,监听地址: {}", addr))?; // 创建 TCP 监听器 let listener = TcpListener::bind(&addr) .await .with_context(|| format!("无法绑定地址: {}", addr))?; write_log(&format!("守护进程已启动,监听: {}", addr))?; // 写入 PID 文件 let pid = std::process::id(); let pid_path = get_pid_file_path()?; fs::write(&pid_path, pid.to_string()) .with_context(|| format!("无法写入 PID 文件: {:?}", pid_path))?; write_log(&format!("PID: {}, PID 文件: {:?}", pid, pid_path))?; // 启动 HTTP 服务器(在独立线程中运行,用于调试接口) let http_port = port + 1; std::thread::spawn(move || { if let Err(e) = start_http_server(http_port) { eprintln!("HTTP 服务器启动失败: {:#}", e); } }); // 创建通道用于停止信号 let (tx, mut rx) = mpsc::channel::<()>(1); // 处理 Ctrl+C tokio::spawn(async move { if let Ok(()) = tokio::signal::ctrl_c().await { write_log("收到停止信号,正在关闭...").ok(); let _ = tx.send(()).await; } }); // 主循环:接受连接 loop { tokio::select! { // 接受新连接 accept_result = listener.accept() => { match accept_result { Ok((stream, addr)) => { write_log(&format!("新连接来自: {}", addr))?; // 为每个连接创建新任务 tokio::spawn(async move { if let Err(e) = handle_client(stream).await { let _ = write_log(&format!("处理连接失败: {:#}", e)); } }); } Err(e) => { write_log(&format!("接受连接失败: {}", e)).ok(); } } } // 收到停止信号 _ = rx.recv() => { write_log("正在停止守护进程...").ok(); break; } } } // 清理 PID 文件 let _ = fs::remove_file(&pid_path); write_log("守护进程已停止").ok(); Ok(()) } /// 处理单个客户端连接 async fn handle_client(stream: tokio::net::TcpStream) -> Result<()> { let (reader, mut writer) = stream.into_split(); let mut reader = BufReader::new(reader); let mut line = String::new(); // 读取一行 JSON 请求 let bytes_read = reader.read_line(&mut line).await?; if bytes_read == 0 { return Err(anyhow::anyhow!("连接已关闭")); } write_log(&format!("收到请求: {}", line.trim()))?; // 解析请求 let response = match serde_json::from_str::(&line) { Ok(request) => { // 处理 TTS 请求 match process_tts_request(request).await { Ok(msg) => DaemonResponse { status: "ok".to_string(), message: msg, }, Err(e) => { write_log(&format!("TTS 处理失败: {:#}", e)).ok(); DaemonResponse { status: "error".to_string(), message: format!("{:#}", e), } } } } Err(e) => { write_log(&format!("解析请求失败: {}", e)).ok(); DaemonResponse { status: "error".to_string(), message: format!("无效的请求格式: {}", e), } } }; // 发送响应 let response_json = serde_json::to_string(&response)?; writer.write_all(response_json.as_bytes()).await?; writer.write_all(b"\n").await?; writer.flush().await?; write_log(&format!("响应: {}", response_json))?; Ok(()) } /// 处理 TTS 请求 async fn process_tts_request(request: DaemonRequest) -> Result { let text = request.text; let voice = request.voice.unwrap_or_else(|| "mimo_default".to_string()); let format = request.format.unwrap_or_else(|| "wav".to_string()); let style = request.style; write_log(&format!("处理 TTS: text={}, voice={}, format={}, style={:?}", &text[..text.len().min(50)], voice, format, style))?; // 处理风格标签 let mut final_text = text.clone(); if let Some(style_value) = &style { // 按官方文档,使用 标签 final_text = format!("{}", style_value, final_text); write_log(&format!("添加风格标签: {}", style_value))?; } // 加载配置 let config_manager = crate::config::ConfigManager::new() .context("无法加载配置")?; let config = config_manager.get_config(); // 检查 API Key if config.api_key.is_empty() { return Err(anyhow::anyhow!( "API Key 未设置,请使用: mimo-tts config set --api-key " )); } // 创建 TTS 客户端 let client = crate::api::TtsClient::builder() .base_url(config.base_url.clone()) .api_key(config.api_key.clone()) .build() .context("无法创建 TTS 客户端")?; // 构建请求 let mut builder = crate::api::TtsRequest::builder() .audio(crate::api::AudioConfig { format: format.clone(), voice: voice.clone(), }); // 添加 assistant 消息(实际要合成的文本) builder = builder.add_message(crate::api::Message { role: "assistant".to_string(), content: final_text.clone(), }); let tts_request = builder.build(); // 调用 API 合成语音 write_log("正在调用 TTS API...")?; let audio_data = client .synthesize_with_request(&tts_request) .await .context("语音合成失败")?; write_log(&format!("TTS 成功,音频数据大小: {} 字节", audio_data.len()))?; // 播放音频 write_log("正在播放音频...")?; crate::play_audio(&audio_data)?; Ok("播放完成".to_string()) } /// 停止守护进程 pub fn stop_daemon() -> Result<()> { if !is_running() { return Err(anyhow::anyhow!("守护进程未运行")); } let pid_path = get_pid_file_path()?; let pid_content = fs::read_to_string(&pid_path) .with_context(|| format!("无法读取 PID 文件: {:?}", pid_path))?; let pid: u32 = pid_content.trim().parse() .with_context(|| format!("无效的 PID: {}", pid_content))?; write_log(&format!("正在停止守护进程,PID: {}", pid))?; // 发送终止信号(跨平台) #[cfg(unix)] { use std::process::Command; Command::new("kill") .arg(pid.to_string()) .output() .with_context(|| format!("无法停止进程: {}", pid))?; } #[cfg(windows)] { use std::process::Command; Command::new("taskkill") .args(["/PID", &pid.to_string(), "/F"]) .output() .with_context(|| format!("无法停止进程: {}", pid))?; } // 等待进程停止 std::thread::sleep(std::time::Duration::from_secs(1)); // 清理 PID 文件 if pid_path.exists() { fs::remove_file(&pid_path) .with_context(|| format!("无法删除 PID 文件: {:?}", pid_path))?; } write_log("守护进程已停止")?; Ok(()) } /// 显示守护进程状态 pub fn show_status() -> Result<()> { if is_running() { let pid_path = get_pid_file_path()?; let pid_content = fs::read_to_string(&pid_path) .with_context(|| format!("无法读取 PID 文件: {:?}", pid_path))?; println!("守护进程状态: 运行中"); println!("PID: {}", pid_content.trim()); println!("日志文件: {:?}", get_log_file_path()?); } else { println!("守护进程状态: 未运行"); } Ok(()) } /// 显示守护进程日志 pub fn show_logs(lines: u32) -> Result<()> { let log_path = get_log_file_path()?; if !log_path.exists() { println!("日志文件不存在: {:?}", log_path); return Ok(()); } let content = fs::read_to_string(&log_path) .with_context(|| format!("无法读取日志文件: {:?}", log_path))?; let all_lines: Vec<&str> = content.lines().collect(); let total_lines = all_lines.len(); let start = if total_lines > lines as usize { total_lines - lines as usize } else { 0 }; println!("===== 守护进程日志 (最近 {} 行) =====", lines); println!("日志文件: {:?}\n", log_path); for line in &all_lines[start..] { println!("{}", line); } Ok(()) } /// 启动 HTTP 服务器(用于调试接口,Postman 测试) fn start_http_server(port: u16) -> Result<()> { let addr = format!("127.0.0.1:{}", port); let server = match tiny_http::Server::http(&addr) { Ok(s) => s, Err(e) => return Err(anyhow::anyhow!("无法启动 HTTP 服务器 {}: {}", addr, e)), }; println!("[HTTP] 服务器已启动: http://{}", addr); for mut request in server.incoming_requests() { let path = request.url().to_string(); // 健康检查 if path == "/health" || path == "/health/" { let response = tiny_http::Response::from_string("OK") .with_header(tiny_http::Header::from_bytes(&b"Content-Type"[..], &b"text/plain"[..]).unwrap()); request.respond(response).ok(); continue; } // 合成接口 if path == "/synthesize" || path == "/synthesize/" { let body = request.as_reader(); let mut body_str = String::new(); body.read_to_string(&mut body_str).ok(); let response = match serde_json::from_str::(&body_str) { Ok(req) => { let text_preview = if req.text.len() > 30 { format!("{}...", &req.text[..30]) } else { req.text.clone() }; println!("[HTTP] 收到请求: text={}", text_preview); let resp = DaemonResponse { status: "ok".to_string(), message: "请求已接收".to_string(), }; tiny_http::Response::from_string(serde_json::to_string(&resp).unwrap()) } Err(e) => { let resp = DaemonResponse { status: "error".to_string(), message: format!("无效的请求: {}", e), }; tiny_http::Response::from_string(serde_json::to_string(&resp).unwrap()) .with_status_code(400) } }; let response = response .with_header(tiny_http::Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap()); request.respond(response).ok(); } else { let resp = DaemonResponse { status: "error".to_string(), message: format!("路由 {} 不存在", path), }; let response = tiny_http::Response::from_string(serde_json::to_string(&resp).unwrap()) .with_status_code(404) .with_header(tiny_http::Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap()); request.respond(response).ok(); } } Ok(()) }