Files
haibao-tts-cli/src/daemon.rs
Z.To ceeb6a3c31 fix: 修复守护进程日志中文截取 panic
- daemon.rs: 使用字符索引替代字节索引截取日志文本
- agents.md: 添加踩坑记录
2026-04-25 07:38:26 +08:00

556 lines
17 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/// 守护进程模块
///
/// 实现 TCP Socket 服务器,接收客户端请求并执行 TTS 语音合成和播放
/// 类似 Docker daemon 模式,作为后台服务运行
use anyhow::{Context, Result};
use serde::{Deserialize, Serialize};
use std::fs;
use std::io::Write;
use std::path::PathBuf;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tiny_http;
/// 客户端请求结构
#[derive(Debug, Deserialize)]
pub struct DaemonRequest {
/// 要合成的文本
pub text: String,
/// 音色名称(可选,默认 mimo_default
#[serde(skip_serializing_if = "Option::is_none")]
pub voice: Option<String>,
/// 音频格式(可选,默认 wav
#[serde(skip_serializing_if = "Option::is_none")]
pub format: Option<String>,
/// 风格描述(可选,宏观场景风格)
#[serde(skip_serializing_if = "Option::is_none")]
pub style: Option<String>,
}
/// 服务端响应结构
#[derive(Debug, Serialize)]
pub struct DaemonResponse {
/// 状态ok 或 error
pub status: String,
/// 消息
pub message: String,
}
/// 获取配置目录(所有平台统一使用 ~/.config/tts/
fn get_config_dir() -> Result<PathBuf> {
let home = dirs::home_dir().context("无法获取用户家目录")?;
let config_dir = home.join(".config").join("tts");
// 确保目录存在
if !config_dir.exists() {
fs::create_dir_all(&config_dir)
.with_context(|| format!("无法创建配置目录: {:?}", config_dir))?;
}
Ok(config_dir)
}
/// 获取 PID 文件路径
fn get_pid_file_path() -> Result<PathBuf> {
Ok(get_config_dir()?.join("ttsd.pid"))
}
/// 获取日志文件路径
fn get_log_file_path() -> Result<PathBuf> {
Ok(get_config_dir()?.join("ttsd.log"))
}
/// 获取 socket 文件路径(预留,未来可选 Unix Socket
#[allow(dead_code)]
fn get_socket_path() -> Result<PathBuf> {
Ok(get_config_dir()?.join("ttsd.sock"))
}
/// 日志级别
#[derive(Debug, PartialEq)]
enum LogLevel {
Info,
Warn,
Error,
}
impl LogLevel {
fn from_message(msg: &str) -> Self {
let lower = msg.to_lowercase();
if lower.contains("error") || lower.contains("失败") || lower.contains("无法") {
LogLevel::Error
} else if lower.contains("warning") || lower.contains("警告") || lower.contains("注意") {
LogLevel::Warn
} else {
LogLevel::Info
}
}
fn as_str(&self) -> &str {
match self {
LogLevel::Info => "INFO",
LogLevel::Warn => "WARN",
LogLevel::Error => "ERROR",
}
}
}
/// 写入日志
fn write_log(message: &str) -> Result<()> {
let log_path = get_log_file_path()?;
let now = std::time::SystemTime::now();
let datetime: chrono::DateTime<chrono::Local> = now.into();
let timestamp = datetime.format("%Y-%m-%d %H:%M:%S");
// 自动检测日志级别
let level = LogLevel::from_message(message);
// 获取当前 PID
let pid = std::process::id();
// 新格式:[时间戳] [级别] [PID] 消息
let log_line = format!("[{}] [{}] [{}] {}\n", timestamp, level.as_str(), pid, message);
let mut file = fs::OpenOptions::new()
.create(true)
.append(true)
.open(&log_path)
.with_context(|| format!("无法打开日志文件: {:?}", log_path))?;
file.write_all(log_line.as_bytes())
.with_context(|| format!("无法写入日志文件: {:?}", log_path))?;
// 简化的 stdout 输出
if level == LogLevel::Error {
eprintln!("[Daemon {}] {}", pid, message);
} else {
println!("[Daemon {}] {}", pid, message);
}
Ok(())
}
/// 检查守护进程是否正在运行
pub fn is_running() -> bool {
let pid_path = match get_pid_file_path() {
Ok(p) => p,
Err(_) => return false,
};
if !pid_path.exists() {
return false;
}
let pid_content = match fs::read_to_string(&pid_path) {
Ok(s) => s,
Err(_) => return false,
};
let pid: u32 = match pid_content.trim().parse() {
Ok(p) => p,
Err(_) => {
// PID 文件内容无效,删除它
let _ = fs::remove_file(&pid_path);
return false;
}
};
// 检查进程是否存在(跨平台)
#[cfg(unix)]
{
use std::process::Command;
let output = Command::new("kill")
.args(["-0", &pid.to_string()])
.output();
match output {
Ok(o) => o.status.success(),
Err(_) => false,
}
}
#[cfg(windows)]
{
use std::process::Command;
let output = Command::new("tasklist")
.args(["/FI", &format!("PID eq {}", pid), "/NH"])
.output();
match output {
Ok(o) => {
let stdout = String::from_utf8_lossy(&o.stdout);
stdout.contains(&pid.to_string())
}
Err(_) => false,
}
}
}
/// 启动守护进程(纯循环,由调用者决定是否后台运行)
pub async fn start_daemon(port: u16) -> Result<()> {
// 检查是否已经在运行
if is_running() {
return Err(anyhow::anyhow!("守护进程已经在运行中"));
}
let addr = format!("127.0.0.1:{}", port);
write_log(&format!("正在启动守护进程,监听地址: {}", addr))?;
// 创建 TCP 监听器
let listener = TcpListener::bind(&addr)
.await
.with_context(|| format!("无法绑定地址: {}", addr))?;
write_log(&format!("守护进程已启动,监听: {}", addr))?;
// 写入 PID 文件
let pid = std::process::id();
let pid_path = get_pid_file_path()?;
fs::write(&pid_path, pid.to_string())
.with_context(|| format!("无法写入 PID 文件: {:?}", pid_path))?;
write_log(&format!("PID: {}, PID 文件: {:?}", pid, pid_path))?;
// 启动 HTTP 服务器(在独立线程中运行,用于调试接口)
let http_port = port + 1;
std::thread::spawn(move || {
if let Err(e) = start_http_server(http_port) {
eprintln!("HTTP 服务器启动失败: {:#}", e);
}
});
// 创建通道用于停止信号
let (tx, mut rx) = mpsc::channel::<()>(1);
// 处理 Ctrl+C
tokio::spawn(async move {
if let Ok(()) = tokio::signal::ctrl_c().await {
write_log("收到停止信号,正在关闭...").ok();
let _ = tx.send(()).await;
}
});
// 主循环:接受连接
loop {
tokio::select! {
// 接受新连接
accept_result = listener.accept() => {
match accept_result {
Ok((stream, addr)) => {
write_log(&format!("新连接来自: {}", addr))?;
// 为每个连接创建新任务
tokio::spawn(async move {
if let Err(e) = handle_client(stream).await {
let _ = write_log(&format!("处理连接失败: {:#}", e));
}
});
}
Err(e) => {
write_log(&format!("接受连接失败: {}", e)).ok();
}
}
}
// 收到停止信号
_ = rx.recv() => {
write_log("正在停止守护进程...").ok();
break;
}
}
}
// 清理 PID 文件
let _ = fs::remove_file(&pid_path);
write_log("守护进程已停止").ok();
Ok(())
}
/// 处理单个客户端连接
async fn handle_client(stream: tokio::net::TcpStream) -> Result<()> {
let (reader, mut writer) = stream.into_split();
let mut reader = BufReader::new(reader);
let mut line = String::new();
// 读取一行 JSON 请求
let bytes_read = reader.read_line(&mut line).await?;
if bytes_read == 0 {
return Err(anyhow::anyhow!("连接已关闭"));
}
write_log(&format!("收到请求: {}", line.trim()))?;
// 解析请求
let response = match serde_json::from_str::<DaemonRequest>(&line) {
Ok(request) => {
// 处理 TTS 请求
match process_tts_request(request).await {
Ok(msg) => DaemonResponse {
status: "ok".to_string(),
message: msg,
},
Err(e) => {
write_log(&format!("TTS 处理失败: {:#}", e)).ok();
DaemonResponse {
status: "error".to_string(),
message: format!("{:#}", e),
}
}
}
}
Err(e) => {
write_log(&format!("解析请求失败: {}", e)).ok();
DaemonResponse {
status: "error".to_string(),
message: format!("无效的请求格式: {}", e),
}
}
};
// 发送响应
let response_json = serde_json::to_string(&response)?;
writer.write_all(response_json.as_bytes()).await?;
writer.write_all(b"\n").await?;
writer.flush().await?;
write_log(&format!("响应: {}", response_json))?;
Ok(())
}
/// 处理 TTS 请求
async fn process_tts_request(request: DaemonRequest) -> Result<String> {
let text = request.text;
let voice = request.voice.unwrap_or_else(|| "mimo_default".to_string());
let format = request.format.unwrap_or_else(|| "wav".to_string());
let style = request.style;
let text_preview: String = text.chars().take(50).collect();
write_log(&format!("处理 TTS: text={}, voice={}, format={}, style={:?}",
text_preview, voice, format, style))?;
// 处理风格标签
let mut final_text = text.clone();
if let Some(style_value) = &style {
// 按官方文档,使用 <style>...</style> 标签
final_text = format!("<style>{}</style>{}", style_value, final_text);
write_log(&format!("添加风格标签: {}", style_value))?;
}
// 加载配置
let config_manager = crate::config::ConfigManager::new()
.context("无法加载配置")?;
let config = config_manager.get_config();
// 检查 API Key
if config.api_key.is_empty() {
return Err(anyhow::anyhow!(
"API Key 未设置,请使用: mimo-tts config set --api-key <YOUR_API_KEY>"
));
}
// 创建 TTS 客户端
let client = crate::api::TtsClient::builder()
.base_url(config.base_url.clone())
.api_key(config.api_key.clone())
.build()
.context("无法创建 TTS 客户端")?;
// 构建请求
let mut builder = crate::api::TtsRequest::builder()
.audio(crate::api::AudioConfig {
format: format.clone(),
voice: voice.clone(),
});
// 添加 assistant 消息(实际要合成的文本)
builder = builder.add_message(crate::api::Message {
role: "assistant".to_string(),
content: final_text.clone(),
});
let tts_request = builder.build();
// 调用 API 合成语音
write_log("正在调用 TTS API...")?;
let audio_data = client
.synthesize_with_request(&tts_request)
.await
.context("语音合成失败")?;
write_log(&format!("TTS 成功,音频数据大小: {} 字节", audio_data.len()))?;
// 播放音频
write_log("正在播放音频...")?;
crate::play_audio(&audio_data)?;
Ok("播放完成".to_string())
}
/// 停止守护进程
pub fn stop_daemon() -> Result<()> {
if !is_running() {
return Err(anyhow::anyhow!("守护进程未运行"));
}
let pid_path = get_pid_file_path()?;
let pid_content = fs::read_to_string(&pid_path)
.with_context(|| format!("无法读取 PID 文件: {:?}", pid_path))?;
let pid: u32 = pid_content.trim().parse()
.with_context(|| format!("无效的 PID: {}", pid_content))?;
write_log(&format!("正在停止守护进程PID: {}", pid))?;
// 发送终止信号(跨平台)
#[cfg(unix)]
{
use std::process::Command;
Command::new("kill")
.arg(pid.to_string())
.output()
.with_context(|| format!("无法停止进程: {}", pid))?;
}
#[cfg(windows)]
{
use std::process::Command;
Command::new("taskkill")
.args(["/PID", &pid.to_string(), "/F"])
.output()
.with_context(|| format!("无法停止进程: {}", pid))?;
}
// 等待进程停止
std::thread::sleep(std::time::Duration::from_secs(1));
// 清理 PID 文件
if pid_path.exists() {
fs::remove_file(&pid_path)
.with_context(|| format!("无法删除 PID 文件: {:?}", pid_path))?;
}
write_log("守护进程已停止")?;
Ok(())
}
/// 显示守护进程状态
pub fn show_status() -> Result<()> {
if is_running() {
let pid_path = get_pid_file_path()?;
let pid_content = fs::read_to_string(&pid_path)
.with_context(|| format!("无法读取 PID 文件: {:?}", pid_path))?;
println!("守护进程状态: 运行中");
println!("PID: {}", pid_content.trim());
println!("日志文件: {:?}", get_log_file_path()?);
} else {
println!("守护进程状态: 未运行");
}
Ok(())
}
/// 显示守护进程日志
pub fn show_logs(lines: u32) -> Result<()> {
let log_path = get_log_file_path()?;
if !log_path.exists() {
println!("日志文件不存在: {:?}", log_path);
return Ok(());
}
let content = fs::read_to_string(&log_path)
.with_context(|| format!("无法读取日志文件: {:?}", log_path))?;
let all_lines: Vec<&str> = content.lines().collect();
let total_lines = all_lines.len();
let start = if total_lines > lines as usize {
total_lines - lines as usize
} else {
0
};
println!("===== 守护进程日志 (最近 {} 行) =====", lines);
println!("日志文件: {:?}\n", log_path);
for line in &all_lines[start..] {
println!("{}", line);
}
Ok(())
}
/// 启动 HTTP 服务器用于调试接口Postman 测试)
fn start_http_server(port: u16) -> Result<()> {
let addr = format!("127.0.0.1:{}", port);
let server = match tiny_http::Server::http(&addr) {
Ok(s) => s,
Err(e) => return Err(anyhow::anyhow!("无法启动 HTTP 服务器 {}: {}", addr, e)),
};
println!("[HTTP] 服务器已启动: http://{}", addr);
for mut request in server.incoming_requests() {
let path = request.url().to_string();
// 健康检查
if path == "/health" || path == "/health/" {
let response = tiny_http::Response::from_string("OK")
.with_header(tiny_http::Header::from_bytes(&b"Content-Type"[..], &b"text/plain"[..]).unwrap());
request.respond(response).ok();
continue;
}
// 合成接口
if path == "/synthesize" || path == "/synthesize/" {
let body = request.as_reader();
let mut body_str = String::new();
body.read_to_string(&mut body_str).ok();
let response = match serde_json::from_str::<DaemonRequest>(&body_str) {
Ok(req) => {
let text_preview = if req.text.len() > 30 { format!("{}...", &req.text[..30]) } else { req.text.clone() };
println!("[HTTP] 收到请求: text={}", text_preview);
let resp = DaemonResponse {
status: "ok".to_string(),
message: "请求已接收".to_string(),
};
tiny_http::Response::from_string(serde_json::to_string(&resp).unwrap())
}
Err(e) => {
let resp = DaemonResponse {
status: "error".to_string(),
message: format!("无效的请求: {}", e),
};
tiny_http::Response::from_string(serde_json::to_string(&resp).unwrap())
.with_status_code(400)
}
};
let response = response
.with_header(tiny_http::Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap());
request.respond(response).ok();
} else {
let resp = DaemonResponse {
status: "error".to_string(),
message: format!("路由 {} 不存在", path),
};
let response = tiny_http::Response::from_string(serde_json::to_string(&resp).unwrap())
.with_status_code(404)
.with_header(tiny_http::Header::from_bytes(&b"Content-Type"[..], &b"application/json"[..]).unwrap());
request.respond(response).ok();
}
}
Ok(())
}