Просмотр исходного кода

Improve MQTT connection handling (#1462)

* modify mqtt init at startup + after disconnection

* mqtt_init only when not initialized

* Minor udapte

* Apply suggestions from code review

Co-authored-by: CaCO3 <caco3@ruinelli.ch>

* Correct typo

Co-authored-by: CaCO3 <caco3@ruinelli.ch>
Slider0007 3 лет назад
Родитель
Сommit
85905a7045

+ 12 - 7
code/components/jomjol_flowcontroll/ClassFlowControll.cpp

@@ -140,6 +140,15 @@ string ClassFlowControll::GetMQTTMainTopic()
     return "";
     return "";
 }
 }
 
 
+bool ClassFlowControll::StartMQTTService() {
+    /* Start the MQTT service */
+        for (int i = 0; i < FlowControll.size(); ++i) {
+            if (FlowControll[i]->name().compare("ClassFlowMQTT") == 0) {
+                return ((ClassFlowMQTT*) (FlowControll[i]))->Start(AutoIntervall);
+            }  
+        }
+    return false;
+}
 
 
 
 
 void ClassFlowControll::SetInitialParameter(void)
 void ClassFlowControll::SetInitialParameter(void)
@@ -311,7 +320,9 @@ bool ClassFlowControll::doFlow(string time)
         MQTTPublish(mqttServer_getMainTopic() + "/" + "status", flowStatus, false);
         MQTTPublish(mqttServer_getMainTopic() + "/" + "status", flowStatus, false);
 
 
         string zw = "FlowControll.doFlow - " + FlowControll[i]->name();
         string zw = "FlowControll.doFlow - " + FlowControll[i]->name();
-        LogFile.WriteHeapInfo(zw);
+        #ifdef DEBUG_DETAIL_ON 
+            LogFile.WriteHeapInfo(zw);
+        #endif
 
 
         if (!FlowControll[i]->doFlow(time)){
         if (!FlowControll[i]->doFlow(time)){
             repeat++;
             repeat++;
@@ -551,12 +562,6 @@ bool ClassFlowControll::ReadParameter(FILE* pfile, string& aktparamgraph)
             }        
             }        
         }
         }
     }
     }
-
-    /* Start the MQTT service */
-    for (int i = 0; i < FlowControll.size(); ++i)
-        if (FlowControll[i]->name().compare("ClassFlowMQTT") == 0)
-            return ((ClassFlowMQTT*) (FlowControll[i]))->Start(AutoIntervall);
-
     return true;
     return true;
 }
 }
 
 

+ 1 - 0
code/components/jomjol_flowcontroll/ClassFlowControll.h

@@ -71,6 +71,7 @@ public:
 
 
 	t_CNNType GetTypeDigital();
 	t_CNNType GetTypeDigital();
 	t_CNNType GetTypeAnalog();
 	t_CNNType GetTypeAnalog();
+	bool StartMQTTService();
 
 
 	int CleanTempFolder();
 	int CleanTempFolder();
 
 

+ 2 - 3
code/components/jomjol_flowcontroll/ClassFlowMQTT.cpp

@@ -203,9 +203,8 @@ bool ClassFlowMQTT::Start(float AutoIntervall) {
             keepAlive, SetRetainFlag, (void *)&GotConnected);
             keepAlive, SetRetainFlag, (void *)&GotConnected);
 
 
     if (!MQTT_Init()) {
     if (!MQTT_Init()) {
-        if (!MQTT_Init()) { // Retry
-            return false;
-        }
+        LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Init at startup failed! Retry with next publish call");
+        return false;
     }
     }
 
 
     return true;
     return true;

+ 28 - 20
code/components/jomjol_mqtt/interface_mqtt.cpp

@@ -17,10 +17,11 @@ std::map<std::string, std::function<bool(std::string, char*, int)>>* subscribeFu
 
 
 
 
 int failedOnRound = -1;
 int failedOnRound = -1;
-
+ 
 esp_mqtt_event_id_t esp_mmqtt_ID = MQTT_EVENT_ANY;
 esp_mqtt_event_id_t esp_mmqtt_ID = MQTT_EVENT_ANY;
 // ESP_EVENT_ANY_ID
 // ESP_EVENT_ANY_ID
 
 
+bool mqtt_initialized = false;
 bool mqtt_connected = false;
 bool mqtt_connected = false;
 esp_mqtt_client_handle_t client = NULL;
 esp_mqtt_client_handle_t client = NULL;
 std::string uri, client_id, lwt_topic, lwt_connected, lwt_disconnected, user, password, maintopic;
 std::string uri, client_id, lwt_topic, lwt_connected, lwt_disconnected, user, password, maintopic;
@@ -36,29 +37,27 @@ bool MQTTPublish(std::string _key, std::string _content, int retained_flag) {
         return true; // Fail quietly
         return true; // Fail quietly
     }
     }
 
 
+
     #ifdef DEBUG_DETAIL_ON  
     #ifdef DEBUG_DETAIL_ON  
         LogFile.WriteHeapInfo("MQTT Publish");
         LogFile.WriteHeapInfo("MQTT Publish");
     #endif
     #endif
 
 
-    if (!mqtt_connected) {
-        LogFile.WriteToFile(ESP_LOG_WARN, TAG, "Not connected, trying to re-connect...");
+    if (!mqtt_initialized) {
         if (!MQTT_Init()) {
         if (!MQTT_Init()) {
-            if (!MQTT_Init()) { // Retry
-                LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Failed to init, skipping all MQTT publishings in this round!");
-                failedOnRound = getCountFlowRounds();
-                return false;
-            }
+            LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Init failed, skipping all MQTT publishings in this round!");
+            failedOnRound = getCountFlowRounds();
+            return false;
         }
         }
-    }    
+    }
 
 
     msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag);
     msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag);
     if (msg_id < 0) {
     if (msg_id < 0) {
         LogFile.WriteToFile(ESP_LOG_WARN, TAG, "Failed to publish topic '" + _key + "', re-trying...");
         LogFile.WriteToFile(ESP_LOG_WARN, TAG, "Failed to publish topic '" + _key + "', re-trying...");
-
+        esp_mqtt_client_reconnect(client);
+        
         msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag);
         msg_id = esp_mqtt_client_publish(client, _key.c_str(), _content.c_str(), 0, 1, retained_flag);
         if (msg_id < 0) {
         if (msg_id < 0) {
             LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Failed to publish topic '" + _key + "', skipping all MQTT publishings in this round!");
             LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Failed to publish topic '" + _key + "', skipping all MQTT publishings in this round!");
-            mqtt_connected = false; // Force re-init on next call
             failedOnRound = getCountFlowRounds();
             failedOnRound = getCountFlowRounds();
             return false;
             return false;
         }
         }
@@ -91,9 +90,7 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
             break;
             break;
         case MQTT_EVENT_DISCONNECTED:
         case MQTT_EVENT_DISCONNECTED:
             ESP_LOGD(TAG, "MQTT_EVENT_DISCONNECTED");
             ESP_LOGD(TAG, "MQTT_EVENT_DISCONNECTED");
-            LogFile.WriteToFile(ESP_LOG_WARN, TAG, "Disconnected! Going to re-connect...");
-            mqtt_connected = false; // Force re-init on next call
-            esp_mqtt_client_reconnect(client);
+            mqtt_connected = false;
             break;
             break;
         case MQTT_EVENT_SUBSCRIBED:
         case MQTT_EVENT_SUBSCRIBED:
             ESP_LOGD(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
             ESP_LOGD(TAG, "MQTT_EVENT_SUBSCRIBED, msg_id=%d", event->msg_id);
@@ -122,7 +119,8 @@ static esp_err_t mqtt_event_handler_cb(esp_mqtt_event_handle_t event)
             break;
             break;
         case MQTT_EVENT_ERROR:
         case MQTT_EVENT_ERROR:
             ESP_LOGD(TAG, "MQTT_EVENT_ERROR");
             ESP_LOGD(TAG, "MQTT_EVENT_ERROR");
-            mqtt_connected = false; // Force re-init on next call
+            mqtt_initialized = false; // Force re-init on next publish call
+            mqtt_connected = false;
             break;
             break;
         default:
         default:
             ESP_LOGD(TAG, "Other event id:%d", event->event_id);
             ESP_LOGD(TAG, "Other event id:%d", event->event_id);
@@ -179,7 +177,9 @@ bool MQTT_Init() {
         .lwt_msg = lw.c_str(),
         .lwt_msg = lw.c_str(),
         .lwt_retain = 1,
         .lwt_retain = 1,
         .lwt_msg_len = (int)(lw.length()),
         .lwt_msg_len = (int)(lw.length()),
-        .keepalive = keepalive
+        .keepalive = keepalive,
+        .disable_auto_reconnect = false,        // Reconnection routine active
+        .reconnect_timeout_ms = 10000           // Try to reconnect to broker every 10s
     };
     };
 
 
     if (user.length() && password.length()){
     if (user.length() && password.length()){
@@ -190,6 +190,7 @@ bool MQTT_Init() {
     #ifdef DEBUG_DETAIL_ON  
     #ifdef DEBUG_DETAIL_ON  
         LogFile.WriteHeapInfo("MQTT Client Init");
         LogFile.WriteHeapInfo("MQTT Client Init");
     #endif
     #endif
+
     client = esp_mqtt_client_init(&mqtt_cfg);
     client = esp_mqtt_client_init(&mqtt_cfg);
     if (client)
     if (client)
     {
     {
@@ -197,6 +198,7 @@ bool MQTT_Init() {
         if (ret != ESP_OK)
         if (ret != ESP_OK)
         {
         {
             LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Could not register event (ret=" + std::to_string(ret) + ")!");
             LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Could not register event (ret=" + std::to_string(ret) + ")!");
+            mqtt_initialized = false;
             return false;
             return false;
         }
         }
 
 
@@ -211,25 +213,31 @@ bool MQTT_Init() {
             if (ret != ESP_OK)
             if (ret != ESP_OK)
             {
             {
                 LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Could not start client (ret=" + std::to_string(ret) + ")!");
                 LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Could not start client (ret=" + std::to_string(ret) + ")!");
+                mqtt_initialized = false;
                 return false;
                 return false;
             }
             }
         }
         }
     }
     }
     else
     else
     {
     {
-        LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Could not init client!");
+        LogFile.WriteToFile(ESP_LOG_ERROR, TAG, "Init failed, no handle created!");
+        mqtt_initialized = false;
         return false;
         return false;
     }
     }
 
 
-    LogFile.WriteToFile(ESP_LOG_INFO, TAG, "Init successful");
+    LogFile.WriteToFile(ESP_LOG_INFO, TAG, "Client started, waiting for established connection...");
+    mqtt_initialized = true;
     return true;
     return true;
 }
 }
 
 
 
 
 void MQTTdestroy_client() {
 void MQTTdestroy_client() {
-    if (client != NULL) {
+    if (client) {
         esp_mqtt_client_stop(client);
         esp_mqtt_client_stop(client);
         esp_mqtt_client_destroy(client);
         esp_mqtt_client_destroy(client);
+        client = NULL;
+        mqtt_initialized = false;
+        mqtt_connected = false;
     }
     }
 }
 }
 
 
@@ -283,7 +291,7 @@ void MQTTregisterSubscribeFunction(std::string topic, std::function<bool(std::st
 
 
 void MQTTconnected(){
 void MQTTconnected(){
     if (mqtt_connected) {
     if (mqtt_connected) {
-        LogFile.WriteToFile(ESP_LOG_INFO, TAG, "Connected");
+        LogFile.WriteToFile(ESP_LOG_INFO, TAG, "Connected to broker");
 
 
         MQTTPublish(lwt_topic, lwt_connected, true);
         MQTTPublish(lwt_topic, lwt_connected, true);
 
 

+ 2 - 0
code/components/jomjol_tfliteclass/server_tflite.cpp

@@ -103,6 +103,8 @@ void doInit(void)
 #ifdef DEBUG_DETAIL_ON      
 #ifdef DEBUG_DETAIL_ON      
     ESP_LOGD(TAG, "Finished tfliteflow.InitFlow(config);");
     ESP_LOGD(TAG, "Finished tfliteflow.InitFlow(config);");
 #endif
 #endif
+    
+    tfliteflow.StartMQTTService();
 }
 }