Просмотр исходного кода

Use PSRAM for MQTT publishing and make it scheduled (#2113)

* Run the Homeassistant Discovery directly after connecting to the broker. Before it was delayed 10s and happened while the first round alredy was in progress

* schedule sending HA discovery and static topics

* Allow setting QOS for MQTT topics

* .

* .

* change MQTT QOS1 expiration time from (default) 30 to 5s

* add logging of heap change on MQTT topic sendings

* wait for MQTT transmission timeout after publishing

* use QOS0 for Homeassistant Discovery topics. the messages then could possibly get lost but we save a lot of heap

* .

* use PSRAM for the MQTT outbox

* use QOS1 for HA discovery again

* .

* .

* disable delay, not needed with PSRAM

* .

* consolidated scheduledSendingOf_DiscoveryAndStaticTopics into sendingOf_DiscoveryAndStaticTopics_scheduled

* Send Homeasstsiatnt Discovery and static data in MQTT step instead of when the wifi gets connected

* "WIFI roaming" by channel scan (AP switching at low RSSI) (#2120)

* Activate 802.11kv wifi mesh roaming

* Activate roaming by scanning

* Revert stack reducation

* move Wifi, LWIP and BSSI to PSRAm

* added State Class "measurement" to rate_per_time_unit (#2116)

Co-authored-by: CaCO3 <caco@ruinelli.ch>

* use QOS0 for Homeassistant Discovery topics. the messages then could possibly get lost but we save a lot of heap

* use QOS0 for Homeassistant Discovery topics. the messages then could possibly get lost but we save a lot of heap

# Conflicts:
#	code/components/jomjol_mqtt/server_mqtt.cpp

# Conflicts:
#	code/components/jomjol_mqtt/server_mqtt.cpp

* .

* .

* move to next PR

* Update code/components/jomjol_mqtt/server_mqtt.cpp

* Update code/components/jomjol_mqtt/server_mqtt.cpp

---------

Co-authored-by: CaCO3 <caco@ruinelli.ch>
Co-authored-by: Slider0007 <115730895+Slider0007@users.noreply.github.com>
CaCO3 2 лет назад
Родитель
Сommit
806adcb4d0

+ 3 - 3
code/components/jomjol_controlGPIO/server_GPIO.cpp

@@ -85,7 +85,7 @@ void GpioPin::gpioInterrupt(int value) {
     if (_mqttTopic != "") {
         ESP_LOGD(TAG, "gpioInterrupt %s %d", _mqttTopic.c_str(), value);
 
-        MQTTPublish(_mqttTopic, value ? "true" : "false");        
+        MQTTPublish(_mqttTopic, value ? "true" : "false", 1);        
     }
 #endif //ENABLE_MQTT
     currentState = value;
@@ -142,7 +142,7 @@ void GpioPin::setValue(bool value, gpio_set_source setSource, std::string* error
 
 #ifdef ENABLE_MQTT
         if ((_mqttTopic != "") && (setSource != GPIO_SET_SOURCE_MQTT)) {
-            MQTTPublish(_mqttTopic, value ? "true" : "false");
+            MQTTPublish(_mqttTopic, value ? "true" : "false", 1);
         }
 #endif //ENABLE_MQTT
     }
@@ -153,7 +153,7 @@ void GpioPin::publishState() {
     if (newState != currentState) {
         ESP_LOGD(TAG,"publish state of GPIO %d new state %d", _gpio, newState);
 #ifdef ENABLE_MQTT
-        MQTTPublish(_mqttTopic, newState ? "true" : "false");
+        MQTTPublish(_mqttTopic, newState ? "true" : "false", 1);
 #endif //ENABLE_MQTT
         currentState = newState;
     }

+ 5 - 4
code/components/jomjol_flowcontroll/ClassFlowControll.cpp

@@ -277,7 +277,7 @@ void ClassFlowControll::InitFlow(std::string config)
     aktstatusWithTime = aktstatus;
 
     //#ifdef ENABLE_MQTT
-        //MQTTPublish(mqttServer_getMainTopic() + "/" + "status", "Initialization", false); // Right now, not possible -> MQTT Service is going to be started later
+        //MQTTPublish(mqttServer_getMainTopic() + "/" + "status", "Initialization", 1, false); // Right now, not possible -> MQTT Service is going to be started later
     //#endif //ENABLE_MQTT
     
     string line;
@@ -352,7 +352,7 @@ void ClassFlowControll::doFlowTakeImageOnly(string time)
             aktstatus = TranslateAktstatus(FlowControll[i]->name());
             aktstatusWithTime = aktstatus + " (" + zw_time + ")";
             #ifdef ENABLE_MQTT
-                MQTTPublish(mqttServer_getMainTopic() + "/" + "status", aktstatus, false);
+                MQTTPublish(mqttServer_getMainTopic() + "/" + "status", aktstatus, 1, false);
             #endif //ENABLE_MQTT
 
             FlowControll[i]->doFlow(time);
@@ -366,6 +366,7 @@ bool ClassFlowControll::doFlow(string time)
     bool result = true;
     std::string zw_time;
     int repeat = 0;
+    int qos = 1;
 
     #ifdef DEBUG_DETAIL_ON 
         LogFile.WriteHeapInfo("ClassFlowControll::doFlow - Start");
@@ -386,7 +387,7 @@ bool ClassFlowControll::doFlow(string time)
         aktstatusWithTime = aktstatus + " (" + zw_time + ")";
         //LogFile.WriteToFile(ESP_LOG_INFO, TAG, aktstatusWithTime);
         #ifdef ENABLE_MQTT
-            MQTTPublish(mqttServer_getMainTopic() + "/" + "status", aktstatus, false);
+            MQTTPublish(mqttServer_getMainTopic() + "/" + "status", aktstatus, qos, false);
         #endif //ENABLE_MQTT
 
         #ifdef DEBUG_DETAIL_ON
@@ -420,7 +421,7 @@ bool ClassFlowControll::doFlow(string time)
     aktstatusWithTime = aktstatus + " (" + zw_time + ")";
     //LogFile.WriteToFile(ESP_LOG_INFO, TAG, aktstatusWithTime);
     #ifdef ENABLE_MQTT
-        MQTTPublish(mqttServer_getMainTopic() + "/" + "status", aktstatus, false);
+        MQTTPublish(mqttServer_getMainTopic() + "/" + "status", aktstatus, qos, false);
     #endif //ENABLE_MQTT
 
     return result;

+ 15 - 11
code/components/jomjol_flowcontroll/ClassFlowMQTT.cpp

@@ -225,8 +225,12 @@ bool ClassFlowMQTT::doFlow(string zwtime)
     std::string resultchangabs = "";
     string zw = "";
     string namenumber = "";
+    int qos = 1;
 
-    success = publishSystemData();
+    /* Send the the Homeassistant Discovery and the Static Topics in case they where scheduled */
+    sendDiscovery_and_static_Topics();
+
+    success = publishSystemData(qos);
 
     if (flowpostprocessing && getMQTTisConnected())
     {
@@ -252,13 +256,13 @@ bool ClassFlowMQTT::doFlow(string zwtime)
 
 
             if (result.length() > 0)   
-                success |= MQTTPublish(namenumber + "value", result, SetRetainFlag);
+                success |= MQTTPublish(namenumber + "value", result, qos, SetRetainFlag);
 
             if (resulterror.length() > 0)  
-                success |= MQTTPublish(namenumber + "error", resulterror, SetRetainFlag);
+                success |= MQTTPublish(namenumber + "error", resulterror, qos, SetRetainFlag);
 
             if (resultrate.length() > 0) {
-                success |= MQTTPublish(namenumber + "rate", resultrate, SetRetainFlag);
+                success |= MQTTPublish(namenumber + "rate", resultrate, qos, SetRetainFlag);
                 
                 std::string resultRatePerTimeUnit;
                 if (getTimeUnit() == "h") { // Need conversion to be per hour
@@ -267,22 +271,22 @@ bool ClassFlowMQTT::doFlow(string zwtime)
                 else { // Keep per minute
                     resultRatePerTimeUnit = resultrate;
                 }
-                success |= MQTTPublish(namenumber + "rate_per_time_unit", resultRatePerTimeUnit, SetRetainFlag);
+                success |= MQTTPublish(namenumber + "rate_per_time_unit", resultRatePerTimeUnit, qos, SetRetainFlag);
             }
 
             if (resultchangabs.length() > 0) {
-                success |= MQTTPublish(namenumber + "changeabsolut", resultchangabs, SetRetainFlag); // Legacy API
-                success |= MQTTPublish(namenumber + "rate_per_digitalization_round", resultchangabs, SetRetainFlag);
+                success |= MQTTPublish(namenumber + "changeabsolut", resultchangabs, qos, SetRetainFlag); // Legacy API
+                success |= MQTTPublish(namenumber + "rate_per_digitalization_round", resultchangabs, qos, SetRetainFlag);
             }
 
             if (resultraw.length() > 0)   
-                success |= MQTTPublish(namenumber + "raw", resultraw, SetRetainFlag);
+                success |= MQTTPublish(namenumber + "raw", resultraw, qos, SetRetainFlag);
 
             if (resulttimestamp.length() > 0)
-                success |= MQTTPublish(namenumber + "timestamp", resulttimestamp, SetRetainFlag);
+                success |= MQTTPublish(namenumber + "timestamp", resulttimestamp, qos, SetRetainFlag);
 
             std::string json = flowpostprocessing->getJsonFromNumber(i, "\n");
-            success |= MQTTPublish(namenumber + "json", json, SetRetainFlag);
+            success |= MQTTPublish(namenumber + "json", json, qos, SetRetainFlag);
         }
     }
     
@@ -300,7 +304,7 @@ bool ClassFlowMQTT::doFlow(string zwtime)
     //                 result = result + "\t" + zw;
     //         }
     //     }
-    //     success |= MQTTPublish(topic, result, SetRetainFlag);
+    //     success |= MQTTPublish(topic, result, qos, SetRetainFlag);
     // }
     
     OldValue = result;

+ 5 - 5
code/components/jomjol_mqtt/interface_mqtt.cpp

@@ -31,7 +31,7 @@ bool SetRetainFlag;
 void (*callbackOnConnected)(std::string, bool) = NULL;
 
 
-bool MQTTPublish(std::string _key, std::string _content, bool retained_flag) 
+bool MQTTPublish(std::string _key, std::string _content, int qos, bool retained_flag) 
 {
     if (!mqtt_enabled) {                            // MQTT sevice not started / configured (MQTT_Init not called before)      
         return false;
@@ -51,7 +51,7 @@ bool MQTTPublish(std::string _key, std::string _content, bool retained_flag)
         #ifdef DEBUG_DETAIL_ON 
             long long int starttime = esp_timer_get_time();
         #endif
-        int msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag);
+        int msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, qos, retained_flag);
         #ifdef DEBUG_DETAIL_ON 
             ESP_LOGD(TAG, "Publish msg_id %d in %lld ms", msg_id, (esp_timer_get_time() - starttime)/1000);
         #endif
@@ -60,7 +60,7 @@ bool MQTTPublish(std::string _key, std::string _content, bool retained_flag)
             #ifdef DEBUG_DETAIL_ON 
                 starttime = esp_timer_get_time();
             #endif
-            msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag);
+            msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, qos, retained_flag);
             #ifdef DEBUG_DETAIL_ON 
                 ESP_LOGD(TAG, "Publish msg_id %d in %lld ms", msg_id, (esp_timer_get_time() - starttime)/1000);
             #endif
@@ -234,7 +234,7 @@ int MQTT_Init() {
         .buffer_size = 1536,                    // size of MQTT send/receive buffer (Default: 1024)
         .reconnect_timeout_ms = 15000,          // Try to reconnect to broker (Default: 10000ms)
         .network_timeout_ms = 20000,            // Network Timeout (Default: 10000ms)
-        .message_retransmit_timeout = 3000      // Tiem after message resent when broker not acknowledged (QoS1, QoS2)
+        .message_retransmit_timeout = 3000      // Time after message resent when broker not acknowledged (QoS1, QoS2)
 
     };
 
@@ -345,7 +345,7 @@ void MQTTconnected(){
             }
         }
 
-        vTaskDelay(10000 / portTICK_PERIOD_MS);                 // Delay execution of callback routine after connection got established   
+        /* Send Static Topics and Homeassistant Discovery */
         if (callbackOnConnected) {                              // Call onConnected callback routine --> mqtt_server
             callbackOnConnected(maintopic, SetRetainFlag);
         }

+ 1 - 1
code/components/jomjol_mqtt/interface_mqtt.h

@@ -15,7 +15,7 @@ bool MQTT_Configure(std::string _mqttURI, std::string _clientid, std::string _us
 int MQTT_Init();
 void MQTTdestroy_client(bool _disable);
 
-bool MQTTPublish(std::string _key, std::string _content, bool retained_flag = 1);            // retained Flag as Standart
+bool MQTTPublish(std::string _key, std::string _content, int qos, bool retained_flag = 1);            // retained Flag as Standart
 
 bool getMQTTisEnabled();
 bool getMQTTisConnected();

+ 301 - 0
code/components/jomjol_mqtt/mqtt_outbox.c

@@ -0,0 +1,301 @@
+/* This is a modification of https://github.com/espressif/esp-mqtt/blob/master/lib/mqtt_outbox.c
+ * to use the PSRAM instead of the internal heap.
+*/
+#include "mqtt_outbox.h"
+#include <stdlib.h>
+#include <string.h>
+#include "sys/queue.h"
+#include "esp_log.h"
+#include "esp_heap_caps.h"
+
+#define USE_PSRAM
+
+#ifdef CONFIG_MQTT_CUSTOM_OUTBOX
+static const char *TAG = "outbox";
+
+typedef struct outbox_item {
+    char *buffer;
+    int len;
+    int msg_id;
+    int msg_type;
+    int msg_qos;
+    outbox_tick_t tick;
+    pending_state_t pending;
+    STAILQ_ENTRY(outbox_item) next;
+} outbox_item_t;
+
+STAILQ_HEAD(outbox_list_t, outbox_item);
+
+
+outbox_handle_t outbox_init(void)
+{
+#ifdef USE_PSRAM
+    outbox_handle_t outbox = heap_caps_calloc(1, sizeof(struct outbox_list_t), MALLOC_CAP_8BIT | MALLOC_CAP_SPIRAM);
+#else
+    outbox_handle_t outbox = calloc(1, sizeof(struct outbox_list_t));
+#endif
+    //ESP_MEM_CHECK(TAG, outbox, return NULL);
+    STAILQ_INIT(outbox);
+    return outbox;
+}
+
+outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick)
+{
+#ifdef USE_PSRAM
+    outbox_item_handle_t item = heap_caps_calloc(1, sizeof(outbox_item_t), MALLOC_CAP_8BIT | MALLOC_CAP_SPIRAM);
+#else
+    outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t));
+#endif
+    //ESP_MEM_CHECK(TAG, item, return NULL);
+    item->msg_id = message->msg_id;
+    item->msg_type = message->msg_type;
+    item->msg_qos = message->msg_qos;
+    item->tick = tick;
+    item->len =  message->len + message->remaining_len;
+    item->pending = QUEUED;
+#ifdef USE_PSRAM
+    item->buffer = heap_caps_malloc(message->len + message->remaining_len, MALLOC_CAP_8BIT | MALLOC_CAP_SPIRAM);
+#else
+    item->buffer = malloc(message->len + message->remaining_len);
+#endif
+    /*ESP_MEM_CHECK(TAG, item->buffer, {
+        free(item);
+        return NULL;
+    });*/
+    memcpy(item->buffer, message->data, message->len);
+    if (message->remaining_data) {
+        memcpy(item->buffer + message->len, message->remaining_data, message->remaining_len);
+    }
+    STAILQ_INSERT_TAIL(outbox, item, next);
+    ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
+    return item;
+}
+
+outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
+{
+    outbox_item_handle_t item;
+    STAILQ_FOREACH(item, outbox, next) {
+        if (item->msg_id == msg_id) {
+            return item;
+        }
+    }
+    return NULL;
+}
+
+outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick)
+{
+    outbox_item_handle_t item;
+    STAILQ_FOREACH(item, outbox, next) {
+        if (item->pending == pending) {
+            if (tick) {
+                *tick = item->tick;
+            }
+            return item;
+        }
+    }
+    return NULL;
+}
+
+esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete)
+{
+    outbox_item_handle_t item;
+    STAILQ_FOREACH(item, outbox, next) {
+        if (item == item_to_delete) {
+            STAILQ_REMOVE(outbox, item, outbox_item, next);
+#ifdef USE_PSRAM
+            heap_caps_free(item->buffer);
+            heap_caps_free(item);
+#else
+            free(item->buffer);
+            free(item);
+#endif
+            return ESP_OK;
+        }
+    }
+    return ESP_FAIL;
+}
+
+uint8_t *outbox_item_get_data(outbox_item_handle_t item,  size_t *len, uint16_t *msg_id, int *msg_type, int *qos)
+{
+    if (item) {
+        *len = item->len;
+        *msg_id = item->msg_id;
+        *msg_type = item->msg_type;
+        *qos = item->msg_qos;
+        return (uint8_t *)item->buffer;
+    }
+    return NULL;
+}
+
+esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
+{
+    outbox_item_handle_t item, tmp;
+    STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
+        if (item->msg_id == msg_id && (0xFF & (item->msg_type)) == msg_type) {
+            STAILQ_REMOVE(outbox, item, outbox_item, next);
+#ifdef USE_PSRAM
+            heap_caps_free(item->buffer);
+            heap_caps_free(item);
+#else
+            free(item->buffer);
+            free(item);
+#endif
+            ESP_LOGD(TAG, "DELETED msgid=%d, msg_type=%d, remain size=%d", msg_id, msg_type, outbox_get_size(outbox));
+            return ESP_OK;
+        }
+
+    }
+    return ESP_FAIL;
+}
+esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id)
+{
+    outbox_item_handle_t item, tmp;
+    STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
+        if (item->msg_id == msg_id) {
+            STAILQ_REMOVE(outbox, item, outbox_item, next);
+#ifdef USE_PSRAM
+            heap_caps_free(item->buffer);
+            heap_caps_free(item);
+#else
+            free(item->buffer);
+            free(item);
+#endif
+        }
+
+    }
+    return ESP_OK;
+}
+esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
+{
+    outbox_item_handle_t item = outbox_get(outbox, msg_id);
+    if (item) {
+        item->pending = pending;
+        return ESP_OK;
+    }
+    return ESP_FAIL;
+}
+
+pending_state_t outbox_item_get_pending(outbox_item_handle_t item)
+{
+    if (item) {
+        return item->pending;
+    }
+    return QUEUED;
+}
+
+esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick)
+{
+    outbox_item_handle_t item = outbox_get(outbox, msg_id);
+    if (item) {
+        item->tick = tick;
+        return ESP_OK;
+    }
+    return ESP_FAIL;
+}
+
+esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type)
+{
+    outbox_item_handle_t item, tmp;
+    STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
+        if (item->msg_type == msg_type) {
+            STAILQ_REMOVE(outbox, item, outbox_item, next);
+#ifdef USE_PSRAM
+            heap_caps_free(item->buffer);
+            heap_caps_free(item);
+#else
+            free(item->buffer);
+            free(item);
+#endif
+        }
+
+    }
+    return ESP_OK;
+}
+int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
+{
+    int msg_id = -1;
+    outbox_item_handle_t item;
+    STAILQ_FOREACH(item, outbox, next) {
+        if (current_tick - item->tick > timeout) {
+            STAILQ_REMOVE(outbox, item, outbox_item, next);
+
+#ifdef USE_PSRAM
+            heap_caps_free(item->buffer);
+#else
+            free(item->buffer);
+#endif
+
+            msg_id = item->msg_id;
+
+#ifdef USE_PSRAM
+            heap_caps_free(item);
+#else
+            free(item);
+#endif
+
+            return msg_id;
+        }
+
+    }
+    return msg_id;
+}
+
+int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
+{
+    int deleted_items = 0;
+    outbox_item_handle_t item, tmp;
+    STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
+        if (current_tick - item->tick > timeout) {
+            STAILQ_REMOVE(outbox, item, outbox_item, next);
+#ifdef USE_PSRAM
+            heap_caps_free(item->buffer);
+            heap_caps_free(item);
+#else
+            free(item->buffer);
+            free(item);
+#endif
+            deleted_items ++;
+        }
+
+    }
+    return deleted_items;
+}
+
+int outbox_get_size(outbox_handle_t outbox)
+{
+    int siz = 0;
+    outbox_item_handle_t item;
+    STAILQ_FOREACH(item, outbox, next) {
+        // Suppressing "use after free" warning as this could happen only if queue is in inconsistent state
+        // which never happens if STAILQ interface used
+        siz += item->len; // NOLINT(clang-analyzer-unix.Malloc)
+    }
+    return siz;
+}
+
+void outbox_delete_all_items(outbox_handle_t outbox)
+{
+    outbox_item_handle_t item, tmp;
+    STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
+        STAILQ_REMOVE(outbox, item, outbox_item, next);
+#ifdef USE_PSRAM
+            heap_caps_free(item->buffer);
+            heap_caps_free(item);
+#else
+            free(item->buffer);
+            free(item);
+#endif
+    }
+}
+void outbox_destroy(outbox_handle_t outbox)
+{
+    outbox_delete_all_items(outbox);
+
+#ifdef USE_PSRAM
+    heap_caps_free(outbox);
+#else
+    free(outbox);
+#endif
+}
+
+#endif /* CONFIG_MQTT_CUSTOM_OUTBOX */

+ 66 - 0
code/components/jomjol_mqtt/mqtt_outbox.h

@@ -0,0 +1,66 @@
+/* This is an adaption of https://github.com/espressif/esp-mqtt/blob/master/lib/include/mqtt_outbox.h
+ * This file is subject to the terms and conditions defined in
+ * file 'LICENSE', which is part of this source code package.
+ * Tuan PM <tuanpm at live dot com>
+ */
+#ifndef _MQTT_OUTOBX_H_
+#define _MQTT_OUTOBX_H_
+//#include "platform.h"
+#include "esp_err.h"
+
+#ifdef  __cplusplus
+extern "C" {
+#endif
+
+struct outbox_item;
+
+typedef struct outbox_list_t *outbox_handle_t;
+typedef struct outbox_item *outbox_item_handle_t;
+typedef struct outbox_message *outbox_message_handle_t;
+typedef long long outbox_tick_t;
+
+typedef struct outbox_message {
+    uint8_t *data;
+    int len;
+    int msg_id;
+    int msg_qos;
+    int msg_type;
+    uint8_t *remaining_data;
+    int remaining_len;
+} outbox_message_t;
+
+typedef enum pending_state {
+    QUEUED,
+    TRANSMITTED,
+    ACKNOWLEDGED,
+    CONFIRMED
+} pending_state_t;
+
+outbox_handle_t outbox_init(void);
+outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick);
+outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick);
+outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id);
+uint8_t *outbox_item_get_data(outbox_item_handle_t item,  size_t *len, uint16_t *msg_id, int *msg_type, int *qos);
+esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type);
+esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id);
+esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type);
+esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item);
+int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
+/**
+ * @brief Deletes single expired message returning it's message id
+ *
+ * @return msg id of the deleted message, -1 if no expired message in the outbox
+ */
+int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout);
+
+esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending);
+pending_state_t outbox_item_get_pending(outbox_item_handle_t item);
+esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick);
+int outbox_get_size(outbox_handle_t outbox);
+void outbox_destroy(outbox_handle_t outbox);
+void outbox_delete_all_items(outbox_handle_t outbox);
+
+#ifdef  __cplusplus
+}
+#endif
+#endif

+ 90 - 71
code/components/jomjol_mqtt/server_mqtt.cpp

@@ -32,6 +32,7 @@ float roundInterval; // Minutes
 int keepAlive = 0; // Seconds
 bool retainFlag;
 static std::string maintopic;
+bool sendingOf_DiscoveryAndStaticTopics_scheduled = true; // Set it to true to make sure it gets sent at least once after startup
 
 
 void mqttServer_setParameter(std::vector<NumberPost*>* _NUMBERS, int _keepAlive, float _roundInterval) {
@@ -48,7 +49,8 @@ void mqttServer_setMeterType(std::string _meterType, std::string _valueUnit, std
 }
 
 bool sendHomeAssistantDiscoveryTopic(std::string group, std::string field,
-    std::string name, std::string icon, std::string unit, std::string deviceClass, std::string stateClass, std::string entityCategory) {
+    std::string name, std::string icon, std::string unit, std::string deviceClass, std::string stateClass, std::string entityCategory,
+    int qos) {
     std::string version = std::string(libfive_git_version());
 
     if (version == "") {
@@ -131,10 +133,10 @@ bool sendHomeAssistantDiscoveryTopic(std::string group, std::string field,
     "}"  +
     "}";
 
-    return MQTTPublish(topicFull, payload, true);
+    return MQTTPublish(topicFull, payload, qos, true);
 }
 
-bool MQTThomeassistantDiscovery() {  
+bool MQTThomeassistantDiscovery(int qos) {  
     bool allSendsSuccessed = false;
 
     if (!getMQTTisConnected()) {
@@ -142,18 +144,20 @@ bool MQTThomeassistantDiscovery() {
         return false;
     }
 
-    LogFile.WriteToFile(ESP_LOG_INFO, TAG, "MQTT - Sending Homeassistant Discovery Topics (Meter Type: " + meterType + ", Value Unit: " + valueUnit + " , Rate Unit: " + rateUnit + ")...");
+    LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Publishing Homeassistant Discovery topics (Meter Type: '" + meterType + "', Value Unit: '" + valueUnit + "' , Rate Unit: '" + rateUnit + "') ...");
+
+	int aFreeInternalHeapSizeBefore = heap_caps_get_free_size(MALLOC_CAP_8BIT | MALLOC_CAP_INTERNAL);
 
     //                                                   Group | Field            | User Friendly Name | Icon                      | Unit | Device Class     | State Class  | Entity Category
-    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "uptime",          "Uptime",            "clock-time-eight-outline", "s",   "",                "",            "diagnostic");
-    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "MAC",             "MAC Address",       "network-outline",          "",    "",                "",            "diagnostic");
-    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "hostname",        "Hostname",          "network-outline",          "",    "",                "",            "diagnostic");
-    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "freeMem",         "Free Memory",       "memory",                   "B",   "",                "measurement", "diagnostic");
-    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "wifiRSSI",        "Wi-Fi RSSI",        "wifi",                     "dBm", "signal_strength", "",            "diagnostic");
-    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "CPUtemp",         "CPU Temperature",   "thermometer",              "°C",  "temperature",     "measurement", "diagnostic");
-    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "interval",        "Interval",          "clock-time-eight-outline", "min",  ""           ,    "measurement", "diagnostic");
-    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "IP",              "IP",                "network-outline",           "",    "",               "",            "diagnostic");
-    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "status",          "Status",            "list-status",               "",    "",               "",            "diagnostic");
+    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "uptime",          "Uptime",            "clock-time-eight-outline", "s",   "",                "",            "diagnostic", qos);
+    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "MAC",             "MAC Address",       "network-outline",          "",    "",                "",            "diagnostic", qos);
+    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "hostname",        "Hostname",          "network-outline",          "",    "",                "",            "diagnostic", qos);
+    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "freeMem",         "Free Memory",       "memory",                   "B",   "",                "measurement", "diagnostic", qos);
+    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "wifiRSSI",        "Wi-Fi RSSI",        "wifi",                     "dBm", "signal_strength", "",            "diagnostic", qos);
+    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "CPUtemp",         "CPU Temperature",   "thermometer",              "°C",  "temperature",     "measurement", "diagnostic", qos);
+    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "interval",        "Interval",          "clock-time-eight-outline", "min",  ""           ,    "measurement", "diagnostic", qos);
+    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "IP",              "IP",                "network-outline",           "",    "",               "",            "diagnostic", qos);
+    allSendsSuccessed |= sendHomeAssistantDiscoveryTopic("",     "status",          "Status",            "list-status",               "",    "",               "",            "diagnostic", qos);
 
 
     for (int i = 0; i < (*NUMBERS).size(); ++i) {
@@ -163,23 +167,31 @@ bool MQTThomeassistantDiscovery() {
         }
 
     //                                                       Group | Field                          | User Friendly Name                | Icon                   | Unit                 | Device Class | State Class       | Entity Category
-        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "value",                      "Value",                            "gauge",                 valueUnit,             meterType,      "total_increasing", "");
-        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "raw",                        "Raw Value",                        "raw",                   "",                    "",             "",                 "diagnostic");
-        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "error",                      "Error",                            "alert-circle-outline",  "",                    "",             "",                 "diagnostic");
+        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "value",                      "Value",                            "gauge",                 valueUnit,             meterType,      "total_increasing", "", qos);
+        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "raw",                        "Raw Value",                        "raw",                   "",                    "",             "",                 "diagnostic", qos);
+        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "error",                      "Error",                            "alert-circle-outline",  "",                    "",             "",                 "diagnostic", qos);
         /* Not announcing "rate" as it is better to use rate_per_time_unit resp. rate_per_digitalization_round */
         // allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "rate",               "Rate (Unit/Minute)",               "swap-vertical",         "",        "",            "",                 ""); // Legacy, always Unit per Minute
-        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "rate_per_time_unit",         "Rate (" + rateUnit + ")",          "swap-vertical",         rateUnit,              "",             "measurement",      "");
-        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "rate_per_digitalization_round",  "Change since last digitalization round",  "arrow-expand-vertical", valueUnit,  "",             "measurement",      ""); // correctly the Unit is Uint/Interval!
-        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "timestamp",                  "Timestamp",                     "clock-time-eight-outline", "",                    "timestamp",    "",                 "diagnostic");
-        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "json",                       "JSON",                             "code-json",             "",                    "",             "",                 "diagnostic");
-        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "problem",                    "Problem",                          "alert-outline",         "",                    "problem",      "",                 ""); // Special binary sensor which is based on error topic
+        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "rate_per_time_unit",         "Rate (" + rateUnit + ")",          "swap-vertical",         rateUnit,              "",             "measurement",      "", qos);
+        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "rate_per_digitalization_round",  "Change since last digitalization round",  "arrow-expand-vertical", valueUnit,  "",             "measurement",      "", qos); // correctly the Unit is Unit/Interval!
+        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "timestamp",                  "Timestamp",                     "clock-time-eight-outline", "",                    "timestamp",    "",                 "diagnostic", qos);
+        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "json",                       "JSON",                             "code-json",             "",                    "",             "",                 "diagnostic", qos);
+        allSendsSuccessed |= sendHomeAssistantDiscoveryTopic(group,   "problem",                    "Problem",                          "alert-outline",         "",                    "problem",      "",                 "", qos); // Special binary sensor which is based on error topic
     }
 
     LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Successfully published all Homeassistant Discovery MQTT topics");
+
+    int aFreeInternalHeapSizeAfter = heap_caps_get_free_size(MALLOC_CAP_8BIT | MALLOC_CAP_INTERNAL);
+    int aMinFreeInternalHeapSize =  heap_caps_get_minimum_free_size(MALLOC_CAP_8BIT | MALLOC_CAP_INTERNAL);
+
+    LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Int. Heap Usage before Publishing Homeassistand Discovery Topics: " + 
+            to_string(aFreeInternalHeapSizeBefore) + ", after: " + to_string(aFreeInternalHeapSizeAfter) + ", delta: " + 
+            to_string(aFreeInternalHeapSizeBefore - aFreeInternalHeapSizeAfter) + ", lowest free: " + to_string(aMinFreeInternalHeapSize));
+
     return allSendsSuccessed;
 }
 
-bool publishSystemData() {
+bool publishSystemData(int qos) {
     bool allSendsSuccessed = false;
 
     if (!getMQTTisConnected()) {
@@ -189,28 +201,38 @@ bool publishSystemData() {
 
     char tmp_char[50];
 
-    LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Publishing system MQTT topics...");
+    LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Publishing System MQTT topics...");
+
+	int aFreeInternalHeapSizeBefore = heap_caps_get_free_size(MALLOC_CAP_8BIT | MALLOC_CAP_INTERNAL);
 
-    allSendsSuccessed |= MQTTPublish(maintopic + "/" + std::string(LWT_TOPIC), LWT_CONNECTED, retainFlag); // Publish "connected" to maintopic/connection
+    allSendsSuccessed |= MQTTPublish(maintopic + "/" + std::string(LWT_TOPIC), LWT_CONNECTED, qos, retainFlag); // Publish "connected" to maintopic/connection
 
     sprintf(tmp_char, "%ld", (long)getUpTime());
-    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "uptime", std::string(tmp_char), retainFlag);
+    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "uptime", std::string(tmp_char), qos, retainFlag);
     
     sprintf(tmp_char, "%lu", (long) getESPHeapSize());
-    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "freeMem", std::string(tmp_char), retainFlag);
+    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "freeMem", std::string(tmp_char), qos, retainFlag);
 
     sprintf(tmp_char, "%d", get_WIFI_RSSI());
-    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "wifiRSSI", std::string(tmp_char), retainFlag);
+    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "wifiRSSI", std::string(tmp_char), qos, retainFlag);
 
     sprintf(tmp_char, "%d", (int)temperatureRead());
-    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "CPUtemp", std::string(tmp_char), retainFlag);
+    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "CPUtemp", std::string(tmp_char), qos, retainFlag);
 
     LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Successfully published all System MQTT topics");
+
+	int aFreeInternalHeapSizeAfter = heap_caps_get_free_size(MALLOC_CAP_8BIT | MALLOC_CAP_INTERNAL);
+	int aMinFreeInternalHeapSize =  heap_caps_get_minimum_free_size(MALLOC_CAP_8BIT | MALLOC_CAP_INTERNAL);
+
+    LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Int. Heap Usage before publishing System Topics: " + 
+            to_string(aFreeInternalHeapSizeBefore) + ", after: " + to_string(aFreeInternalHeapSizeAfter) + ", delta: " + 
+            to_string(aFreeInternalHeapSizeBefore - aFreeInternalHeapSizeAfter) + ", lowest free: " + to_string(aMinFreeInternalHeapSize));
+
     return allSendsSuccessed;
 }
 
 
-bool publishStaticData() {
+bool publishStaticData(int qos) {
     bool allSendsSuccessed = false;
 
     if (!getMQTTisConnected()) {
@@ -219,69 +241,66 @@ bool publishStaticData() {
     }
 
     LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Publishing static MQTT topics...");
-    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "MAC", getMac(), retainFlag);
-    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "IP", *getIPAddress(), retainFlag);
-    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "hostname", wlan_config.hostname, retainFlag);
+
+	int aFreeInternalHeapSizeBefore = heap_caps_get_free_size(MALLOC_CAP_8BIT | MALLOC_CAP_INTERNAL);
+
+    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "MAC", getMac(), qos, retainFlag);
+    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "IP", *getIPAddress(), qos, retainFlag);
+    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "hostname", wlan_config.hostname, qos, retainFlag);
 
     std::stringstream stream;
     stream << std::fixed << std::setprecision(1) << roundInterval; // minutes
-    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "interval", stream.str(), retainFlag);
+    allSendsSuccessed |= MQTTPublish(maintopic + "/" + "interval", stream.str(), qos, retainFlag);
 
     LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Successfully published all Static MQTT topics");
+
+	int aFreeInternalHeapSizeAfter = heap_caps_get_free_size(MALLOC_CAP_8BIT | MALLOC_CAP_INTERNAL);
+	int aMinFreeInternalHeapSize =  heap_caps_get_minimum_free_size(MALLOC_CAP_8BIT | MALLOC_CAP_INTERNAL);
+
+    LogFile.WriteToFile(ESP_LOG_DEBUG, TAG, "Int. Heap Usage before Publishing Static Topics: " + 
+            to_string(aFreeInternalHeapSizeBefore) + ", after: " + to_string(aFreeInternalHeapSizeAfter) + ", delta: " + 
+            to_string(aFreeInternalHeapSizeBefore - aFreeInternalHeapSizeAfter) + ", lowest free: " + to_string(aMinFreeInternalHeapSize));
+
     return allSendsSuccessed;
 }
 
-esp_err_t sendDiscovery_and_static_Topics(httpd_req_t *req) {
+
+esp_err_t scheduleSendingDiscovery_and_static_Topics(httpd_req_t *req) {
+    sendingOf_DiscoveryAndStaticTopics_scheduled = true;
+    char msg[] = "MQTT Homeassistant Discovery and Static Topics scheduled";
+    httpd_resp_send(req, msg, strlen(msg));  
+    return ESP_OK;
+}
+
+
+esp_err_t sendDiscovery_and_static_Topics(void) {
     bool success = false;
 
+    if (!sendingOf_DiscoveryAndStaticTopics_scheduled) {
+        // Flag not set, nothing to do
+        return ESP_OK;
+    }
+
     if (HomeassistantDiscovery) {
-        success = MQTThomeassistantDiscovery();
+        success = MQTThomeassistantDiscovery(1);
     }
 
-    success |= publishStaticData();
+    success |= publishStaticData(1);
 
-    if (success) {
-        char msg[] = "MQTT Homeassistant Discovery and Static Topics sent!";
-        httpd_resp_send(req, msg, strlen(msg));  
+    if (success) { // Success, clear the flag
+        sendingOf_DiscoveryAndStaticTopics_scheduled = false;
         return ESP_OK;
     }
     else {
-        LogFile.WriteToFile(ESP_LOG_WARN, TAG, "One or more MQTT topics failed to be published!");
-        char msg[] = "Failed to send MQTT topics!";
-        httpd_resp_send(req, msg, strlen(msg)); 
+        LogFile.WriteToFile(ESP_LOG_WARN, TAG, "One or more MQTT topics failed to be published, will try sending them in the next round!");
+        /* Keep sendingOf_DiscoveryAndStaticTopics_scheduled set so we can retry after the next round */
         return ESP_FAIL;
     }
 }
 
-void GotConnected(std::string maintopic, bool retainFlag) {
-    static bool initialStaticOrHomeassistantDiscoveryTopicsGotSent = false;
-    bool success = false;
-
-    /* Only send Homeassistant Discovery and Static topics on the first time connecting */
-    if (!initialStaticOrHomeassistantDiscoveryTopicsGotSent) {
-        if (HomeassistantDiscovery) {
-            success = MQTThomeassistantDiscovery();
-        }
-
-        success |= publishStaticData();
 
-        if (success) {
-            /* Sending of all Homeassistant Discovery and Static Topics was successfull.
-             * Will no no longer send it on a re-connect!
-             * (But it is still possible to trigger sending through the REST API). */
-            initialStaticOrHomeassistantDiscoveryTopicsGotSent = true;
-        }
-        else {
-        LogFile.WriteToFile(ESP_LOG_WARN, TAG, "One or more static or Homeassistant Discovery MQTT topics failed to be published! Will try again on the next round.");
-        }
-    }
-
-    /* The System Data changes at runtime, therefore we always send it after a re-connect */
-    success |= publishSystemData();
-
-    if (!success) {
-        LogFile.WriteToFile(ESP_LOG_WARN, TAG, "One or more MQTT topics failed to be published!");
-    }
+void GotConnected(std::string maintopic, bool retainFlag) {
+    // Nothing to do
 }
 
 void register_server_mqtt_uri(httpd_handle_t server) {
@@ -289,7 +308,7 @@ void register_server_mqtt_uri(httpd_handle_t server) {
     uri.method    = HTTP_GET;
 
     uri.uri       = "/mqtt_publish_discovery";
-    uri.handler   = sendDiscovery_and_static_Topics;
+    uri.handler   = scheduleSendingDiscovery_and_static_Topics;
     uri.user_ctx  = (void*) "";    
     httpd_register_uri_handler(server, &uri); 
 }

+ 2 - 1
code/components/jomjol_mqtt/server_mqtt.h

@@ -16,10 +16,11 @@ std::string mqttServer_getMainTopic();
 
 void register_server_mqtt_uri(httpd_handle_t server);
 
-bool publishSystemData();
+bool publishSystemData(int qos);
 
 std::string getTimeUnit(void);
 void GotConnected(std::string maintopic, bool SetRetainFlag);
+esp_err_t sendDiscovery_and_static_Topics(void);
 
 
 #endif //SERVERMQTT_H

+ 3 - 0
code/sdkconfig.defaults

@@ -118,6 +118,9 @@ CONFIG_MQTT_MSG_ID_INCREMENTAL=y
 CONFIG_MQTT_SKIP_PUBLISH_IF_DISCONNECTED=y
 CONFIG_MQTT_TASK_CORE_SELECTION_ENABLED=y
 CONFIG_MQTT_USE_CORE_0=y
+CONFIG_MQTT_USE_CUSTOM_CONFIG=y
+#CONFIG_MQTT_OUTBOX_EXPIRED_TIMEOUT_MS=5000
+CONFIG_MQTT_CUSTOM_OUTBOX=y
 
 CONFIG_FREERTOS_TASK_FUNCTION_WRAPPER=n