From efb3e09929d442b91f4eb10e878613a8cc3f6a5a Mon Sep 17 00:00:00 2001 From: crispyberry Date: Thu, 5 Feb 2026 18:55:23 +0800 Subject: [PATCH] feat: add FreeRTOS message bus (inbound + outbound queues) mimi_msg_t carries channel/chat_id/content between tasks. Decouples input channels from agent loop and output dispatch. Co-Authored-By: Claude Opus 4.5 --- main/bus/message_bus.c | 59 ++++++++++++++++++++++++++++++++++++++++++ main/bus/message_bus.h | 46 ++++++++++++++++++++++++++++++++ 2 files changed, 105 insertions(+) create mode 100644 main/bus/message_bus.c create mode 100644 main/bus/message_bus.h diff --git a/main/bus/message_bus.c b/main/bus/message_bus.c new file mode 100644 index 0000000..119b141 --- /dev/null +++ b/main/bus/message_bus.c @@ -0,0 +1,59 @@ +#include "message_bus.h" +#include "mimi_config.h" +#include "esp_log.h" +#include + +static const char *TAG = "bus"; + +static QueueHandle_t s_inbound_queue; +static QueueHandle_t s_outbound_queue; + +esp_err_t message_bus_init(void) +{ + s_inbound_queue = xQueueCreate(MIMI_BUS_QUEUE_LEN, sizeof(mimi_msg_t)); + s_outbound_queue = xQueueCreate(MIMI_BUS_QUEUE_LEN, sizeof(mimi_msg_t)); + + if (!s_inbound_queue || !s_outbound_queue) { + ESP_LOGE(TAG, "Failed to create message queues"); + return ESP_ERR_NO_MEM; + } + + ESP_LOGI(TAG, "Message bus initialized (queue depth %d)", MIMI_BUS_QUEUE_LEN); + return ESP_OK; +} + +esp_err_t message_bus_push_inbound(const mimi_msg_t *msg) +{ + if (xQueueSend(s_inbound_queue, msg, pdMS_TO_TICKS(1000)) != pdTRUE) { + ESP_LOGW(TAG, "Inbound queue full, dropping message"); + return ESP_ERR_NO_MEM; + } + return ESP_OK; +} + +esp_err_t message_bus_pop_inbound(mimi_msg_t *msg, uint32_t timeout_ms) +{ + TickType_t ticks = (timeout_ms == UINT32_MAX) ? portMAX_DELAY : pdMS_TO_TICKS(timeout_ms); + if (xQueueReceive(s_inbound_queue, msg, ticks) != pdTRUE) { + return ESP_ERR_TIMEOUT; + } + return ESP_OK; +} + +esp_err_t message_bus_push_outbound(const mimi_msg_t *msg) +{ + if (xQueueSend(s_outbound_queue, msg, pdMS_TO_TICKS(1000)) != pdTRUE) { + ESP_LOGW(TAG, "Outbound queue full, dropping message"); + return ESP_ERR_NO_MEM; + } + return ESP_OK; +} + +esp_err_t message_bus_pop_outbound(mimi_msg_t *msg, uint32_t timeout_ms) +{ + TickType_t ticks = (timeout_ms == UINT32_MAX) ? portMAX_DELAY : pdMS_TO_TICKS(timeout_ms); + if (xQueueReceive(s_outbound_queue, msg, ticks) != pdTRUE) { + return ESP_ERR_TIMEOUT; + } + return ESP_OK; +} diff --git a/main/bus/message_bus.h b/main/bus/message_bus.h new file mode 100644 index 0000000..b460d18 --- /dev/null +++ b/main/bus/message_bus.h @@ -0,0 +1,46 @@ +#pragma once + +#include "esp_err.h" +#include "freertos/FreeRTOS.h" +#include "freertos/queue.h" + +/* Channel identifiers */ +#define MIMI_CHAN_TELEGRAM "telegram" +#define MIMI_CHAN_WEBSOCKET "websocket" +#define MIMI_CHAN_CLI "cli" + +/* Message types on the bus */ +typedef struct { + char channel[16]; /* "telegram", "websocket", "cli" */ + char chat_id[32]; /* Telegram chat_id or WS client id */ + char *content; /* Heap-allocated message text (caller must free) */ +} mimi_msg_t; + +/** + * Initialize the message bus (inbound + outbound FreeRTOS queues). + */ +esp_err_t message_bus_init(void); + +/** + * Push a message to the inbound queue (towards Agent Loop). + * The bus takes ownership of msg->content. + */ +esp_err_t message_bus_push_inbound(const mimi_msg_t *msg); + +/** + * Pop a message from the inbound queue (blocking). + * Caller must free msg->content when done. + */ +esp_err_t message_bus_pop_inbound(mimi_msg_t *msg, uint32_t timeout_ms); + +/** + * Push a message to the outbound queue (towards channels). + * The bus takes ownership of msg->content. + */ +esp_err_t message_bus_push_outbound(const mimi_msg_t *msg); + +/** + * Pop a message from the outbound queue (blocking). + * Caller must free msg->content when done. + */ +esp_err_t message_bus_pop_outbound(mimi_msg_t *msg, uint32_t timeout_ms);