feat: add FreeRTOS message bus (inbound + outbound queues)

mimi_msg_t carries channel/chat_id/content between tasks.
Decouples input channels from agent loop and output dispatch.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
crispyberry
2026-02-05 18:55:23 +08:00
parent 0ebd1ccb20
commit efb3e09929
2 changed files with 105 additions and 0 deletions

59
main/bus/message_bus.c Normal file
View File

@@ -0,0 +1,59 @@
#include "message_bus.h"
#include "mimi_config.h"
#include "esp_log.h"
#include <string.h>
static const char *TAG = "bus";
static QueueHandle_t s_inbound_queue;
static QueueHandle_t s_outbound_queue;
esp_err_t message_bus_init(void)
{
s_inbound_queue = xQueueCreate(MIMI_BUS_QUEUE_LEN, sizeof(mimi_msg_t));
s_outbound_queue = xQueueCreate(MIMI_BUS_QUEUE_LEN, sizeof(mimi_msg_t));
if (!s_inbound_queue || !s_outbound_queue) {
ESP_LOGE(TAG, "Failed to create message queues");
return ESP_ERR_NO_MEM;
}
ESP_LOGI(TAG, "Message bus initialized (queue depth %d)", MIMI_BUS_QUEUE_LEN);
return ESP_OK;
}
esp_err_t message_bus_push_inbound(const mimi_msg_t *msg)
{
if (xQueueSend(s_inbound_queue, msg, pdMS_TO_TICKS(1000)) != pdTRUE) {
ESP_LOGW(TAG, "Inbound queue full, dropping message");
return ESP_ERR_NO_MEM;
}
return ESP_OK;
}
esp_err_t message_bus_pop_inbound(mimi_msg_t *msg, uint32_t timeout_ms)
{
TickType_t ticks = (timeout_ms == UINT32_MAX) ? portMAX_DELAY : pdMS_TO_TICKS(timeout_ms);
if (xQueueReceive(s_inbound_queue, msg, ticks) != pdTRUE) {
return ESP_ERR_TIMEOUT;
}
return ESP_OK;
}
esp_err_t message_bus_push_outbound(const mimi_msg_t *msg)
{
if (xQueueSend(s_outbound_queue, msg, pdMS_TO_TICKS(1000)) != pdTRUE) {
ESP_LOGW(TAG, "Outbound queue full, dropping message");
return ESP_ERR_NO_MEM;
}
return ESP_OK;
}
esp_err_t message_bus_pop_outbound(mimi_msg_t *msg, uint32_t timeout_ms)
{
TickType_t ticks = (timeout_ms == UINT32_MAX) ? portMAX_DELAY : pdMS_TO_TICKS(timeout_ms);
if (xQueueReceive(s_outbound_queue, msg, ticks) != pdTRUE) {
return ESP_ERR_TIMEOUT;
}
return ESP_OK;
}

46
main/bus/message_bus.h Normal file
View File

@@ -0,0 +1,46 @@
#pragma once
#include "esp_err.h"
#include "freertos/FreeRTOS.h"
#include "freertos/queue.h"
/* Channel identifiers */
#define MIMI_CHAN_TELEGRAM "telegram"
#define MIMI_CHAN_WEBSOCKET "websocket"
#define MIMI_CHAN_CLI "cli"
/* Message types on the bus */
typedef struct {
char channel[16]; /* "telegram", "websocket", "cli" */
char chat_id[32]; /* Telegram chat_id or WS client id */
char *content; /* Heap-allocated message text (caller must free) */
} mimi_msg_t;
/**
* Initialize the message bus (inbound + outbound FreeRTOS queues).
*/
esp_err_t message_bus_init(void);
/**
* Push a message to the inbound queue (towards Agent Loop).
* The bus takes ownership of msg->content.
*/
esp_err_t message_bus_push_inbound(const mimi_msg_t *msg);
/**
* Pop a message from the inbound queue (blocking).
* Caller must free msg->content when done.
*/
esp_err_t message_bus_pop_inbound(mimi_msg_t *msg, uint32_t timeout_ms);
/**
* Push a message to the outbound queue (towards channels).
* The bus takes ownership of msg->content.
*/
esp_err_t message_bus_push_outbound(const mimi_msg_t *msg);
/**
* Pop a message from the outbound queue (blocking).
* Caller must free msg->content when done.
*/
esp_err_t message_bus_pop_outbound(mimi_msg_t *msg, uint32_t timeout_ms);