fix: dedupe telegram updates and persist update offset
Signed-off-by: Bo <boironic@gmail.com>
This commit is contained in:
@@ -7,6 +7,7 @@
|
||||
#include <stdlib.h>
|
||||
#include <stdbool.h>
|
||||
#include "esp_log.h"
|
||||
#include "esp_timer.h"
|
||||
#include "esp_http_client.h"
|
||||
#include "esp_crt_bundle.h"
|
||||
#include "nvs.h"
|
||||
@@ -16,6 +17,16 @@ static const char *TAG = "telegram";
|
||||
|
||||
static char s_bot_token[128] = MIMI_SECRET_TG_TOKEN;
|
||||
static int64_t s_update_offset = 0;
|
||||
static int64_t s_last_saved_offset = -1;
|
||||
static int64_t s_last_offset_save_us = 0;
|
||||
|
||||
#define TG_OFFSET_NVS_KEY "update_offset"
|
||||
#define TG_DEDUP_CACHE_SIZE 64
|
||||
#define TG_OFFSET_SAVE_INTERVAL_US (5LL * 1000 * 1000)
|
||||
#define TG_OFFSET_SAVE_STEP 10
|
||||
|
||||
static uint64_t s_seen_msg_keys[TG_DEDUP_CACHE_SIZE] = {0};
|
||||
static size_t s_seen_msg_idx = 0;
|
||||
|
||||
/* HTTP response accumulator */
|
||||
typedef struct {
|
||||
@@ -24,6 +35,77 @@ typedef struct {
|
||||
size_t cap;
|
||||
} http_resp_t;
|
||||
|
||||
static uint64_t fnv1a64(const char *s)
|
||||
{
|
||||
uint64_t h = 1469598103934665603ULL;
|
||||
if (!s) {
|
||||
return h;
|
||||
}
|
||||
while (*s) {
|
||||
h ^= (unsigned char)(*s++);
|
||||
h *= 1099511628211ULL;
|
||||
}
|
||||
return h;
|
||||
}
|
||||
|
||||
static uint64_t make_msg_key(const char *chat_id, int msg_id)
|
||||
{
|
||||
uint64_t h = fnv1a64(chat_id);
|
||||
return (h << 16) ^ (uint64_t)(msg_id & 0xFFFF) ^ ((uint64_t)msg_id << 32);
|
||||
}
|
||||
|
||||
static bool seen_msg_contains(uint64_t key)
|
||||
{
|
||||
for (size_t i = 0; i < TG_DEDUP_CACHE_SIZE; i++) {
|
||||
if (s_seen_msg_keys[i] == key) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
static void seen_msg_insert(uint64_t key)
|
||||
{
|
||||
s_seen_msg_keys[s_seen_msg_idx] = key;
|
||||
s_seen_msg_idx = (s_seen_msg_idx + 1) % TG_DEDUP_CACHE_SIZE;
|
||||
}
|
||||
|
||||
static void save_update_offset_if_needed(bool force)
|
||||
{
|
||||
if (s_update_offset <= 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
int64_t now = esp_timer_get_time();
|
||||
bool should_save = force;
|
||||
if (!should_save && s_last_saved_offset >= 0) {
|
||||
if ((s_update_offset - s_last_saved_offset) >= TG_OFFSET_SAVE_STEP) {
|
||||
should_save = true;
|
||||
} else if ((now - s_last_offset_save_us) >= TG_OFFSET_SAVE_INTERVAL_US) {
|
||||
should_save = true;
|
||||
}
|
||||
} else if (!should_save) {
|
||||
should_save = true;
|
||||
}
|
||||
|
||||
if (!should_save) {
|
||||
return;
|
||||
}
|
||||
|
||||
nvs_handle_t nvs;
|
||||
if (nvs_open(MIMI_NVS_TG, NVS_READWRITE, &nvs) != ESP_OK) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (nvs_set_i64(nvs, TG_OFFSET_NVS_KEY, s_update_offset) == ESP_OK) {
|
||||
if (nvs_commit(nvs) == ESP_OK) {
|
||||
s_last_saved_offset = s_update_offset;
|
||||
s_last_offset_save_us = now;
|
||||
}
|
||||
}
|
||||
nvs_close(nvs);
|
||||
}
|
||||
|
||||
static esp_err_t http_event_handler(esp_http_client_event_t *evt)
|
||||
{
|
||||
http_resp_t *resp = (http_resp_t *)evt->user_data;
|
||||
@@ -229,6 +311,7 @@ static void process_updates(const char *json_str)
|
||||
continue;
|
||||
}
|
||||
s_update_offset = uid + 1;
|
||||
save_update_offset_if_needed(false);
|
||||
}
|
||||
|
||||
/* Extract message */
|
||||
@@ -244,6 +327,12 @@ static void process_updates(const char *json_str)
|
||||
cJSON *chat_id = cJSON_GetObjectItem(chat, "id");
|
||||
if (!chat_id) continue;
|
||||
|
||||
int msg_id_val = -1;
|
||||
cJSON *message_id = cJSON_GetObjectItem(message, "message_id");
|
||||
if (cJSON_IsNumber(message_id)) {
|
||||
msg_id_val = (int)message_id->valuedouble;
|
||||
}
|
||||
|
||||
char chat_id_str[32];
|
||||
if (cJSON_IsString(chat_id) && chat_id->valuestring) {
|
||||
strncpy(chat_id_str, chat_id->valuestring, sizeof(chat_id_str) - 1);
|
||||
@@ -254,7 +343,18 @@ static void process_updates(const char *json_str)
|
||||
continue;
|
||||
}
|
||||
|
||||
ESP_LOGI(TAG, "Message from chat %s: %.40s...", chat_id_str, text->valuestring);
|
||||
if (msg_id_val >= 0) {
|
||||
uint64_t msg_key = make_msg_key(chat_id_str, msg_id_val);
|
||||
if (seen_msg_contains(msg_key)) {
|
||||
ESP_LOGW(TAG, "Drop duplicate message update_id=%" PRId64 " chat=%s message_id=%d",
|
||||
uid, chat_id_str, msg_id_val);
|
||||
continue;
|
||||
}
|
||||
seen_msg_insert(msg_key);
|
||||
}
|
||||
|
||||
ESP_LOGI(TAG, "Message update_id=%" PRId64 " message_id=%d from chat %s: %.40s...",
|
||||
uid, msg_id_val, chat_id_str, text->valuestring);
|
||||
|
||||
/* Push to inbound bus */
|
||||
mimi_msg_t msg = {0};
|
||||
@@ -311,6 +411,13 @@ esp_err_t telegram_bot_init(void)
|
||||
if (nvs_get_str(nvs, MIMI_NVS_KEY_TG_TOKEN, tmp, &len) == ESP_OK && tmp[0]) {
|
||||
strncpy(s_bot_token, tmp, sizeof(s_bot_token) - 1);
|
||||
}
|
||||
|
||||
int64_t offset = 0;
|
||||
if (nvs_get_i64(nvs, TG_OFFSET_NVS_KEY, &offset) == ESP_OK && offset > 0) {
|
||||
s_update_offset = offset;
|
||||
s_last_saved_offset = offset;
|
||||
ESP_LOGI(TAG, "Loaded Telegram update offset: %" PRId64, s_update_offset);
|
||||
}
|
||||
nvs_close(nvs);
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user