mqtt_outbox.c 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305
  1. /* This is a modification of https://github.com/espressif/esp-mqtt/blob/master/lib/mqtt_outbox.c
  2. * to use the PSRAM instead of the internal heap.
  3. */
  4. #include "mqtt_outbox.h"
  5. #include <stdlib.h>
  6. #include <string.h>
  7. #include "sys/queue.h"
  8. #include "esp_log.h"
  9. #include "esp_heap_caps.h"
  /* Enable this to use the PSRAM for MQTT publishing.
   * This saves 10 kBytes of RAM, see https://github.com/jomjol/AI-on-the-edge-device/pull/2113
   * However, it can lead to PSRAM fragmentation, leaving no free block large enough to load the model.
   * See https://github.com/jomjol/AI-on-the-edge-device/issues/2200 */
  14. #define USE_PSRAM
  15. #ifdef CONFIG_MQTT_CUSTOM_OUTBOX
  /* Tag passed to the ESP_LOGx macros by every log statement in this file. */
  static const char *TAG = "outbox";
  17. typedef struct outbox_item {
  18. char *buffer;
  19. int len;
  20. int msg_id;
  21. int msg_type;
  22. int msg_qos;
  23. outbox_tick_t tick;
  24. pending_state_t pending;
  25. STAILQ_ENTRY(outbox_item) next;
  26. } outbox_item_t;
  27. STAILQ_HEAD(outbox_list_t, outbox_item);
  28. outbox_handle_t outbox_init(void)
  29. {
  30. #ifdef USE_PSRAM
  31. outbox_handle_t outbox = heap_caps_calloc(1, sizeof(struct outbox_list_t), MALLOC_CAP_8BIT | MALLOC_CAP_SPIRAM);
  32. #else
  33. outbox_handle_t outbox = calloc(1, sizeof(struct outbox_list_t));
  34. #endif
  35. //ESP_MEM_CHECK(TAG, outbox, return NULL);
  36. STAILQ_INIT(outbox);
  37. return outbox;
  38. }
  39. outbox_item_handle_t outbox_enqueue(outbox_handle_t outbox, outbox_message_handle_t message, outbox_tick_t tick)
  40. {
  41. #ifdef USE_PSRAM
  42. outbox_item_handle_t item = heap_caps_calloc(1, sizeof(outbox_item_t), MALLOC_CAP_8BIT | MALLOC_CAP_SPIRAM);
  43. #else
  44. outbox_item_handle_t item = calloc(1, sizeof(outbox_item_t));
  45. #endif
  46. //ESP_MEM_CHECK(TAG, item, return NULL);
  47. item->msg_id = message->msg_id;
  48. item->msg_type = message->msg_type;
  49. item->msg_qos = message->msg_qos;
  50. item->tick = tick;
  51. item->len = message->len + message->remaining_len;
  52. item->pending = QUEUED;
  53. #ifdef USE_PSRAM
  54. item->buffer = heap_caps_malloc(message->len + message->remaining_len, MALLOC_CAP_8BIT | MALLOC_CAP_SPIRAM);
  55. #else
  56. item->buffer = malloc(message->len + message->remaining_len);
  57. #endif
  58. /*ESP_MEM_CHECK(TAG, item->buffer, {
  59. free(item);
  60. return NULL;
  61. });*/
  62. memcpy(item->buffer, message->data, message->len);
  63. if (message->remaining_data) {
  64. memcpy(item->buffer + message->len, message->remaining_data, message->remaining_len);
  65. }
  66. STAILQ_INSERT_TAIL(outbox, item, next);
  67. ESP_LOGD(TAG, "ENQUEUE msgid=%d, msg_type=%d, len=%d, size=%d", message->msg_id, message->msg_type, message->len + message->remaining_len, outbox_get_size(outbox));
  68. return item;
  69. }
  70. outbox_item_handle_t outbox_get(outbox_handle_t outbox, int msg_id)
  71. {
  72. outbox_item_handle_t item;
  73. STAILQ_FOREACH(item, outbox, next) {
  74. if (item->msg_id == msg_id) {
  75. return item;
  76. }
  77. }
  78. return NULL;
  79. }
  80. outbox_item_handle_t outbox_dequeue(outbox_handle_t outbox, pending_state_t pending, outbox_tick_t *tick)
  81. {
  82. outbox_item_handle_t item;
  83. STAILQ_FOREACH(item, outbox, next) {
  84. if (item->pending == pending) {
  85. if (tick) {
  86. *tick = item->tick;
  87. }
  88. return item;
  89. }
  90. }
  91. return NULL;
  92. }
  93. esp_err_t outbox_delete_item(outbox_handle_t outbox, outbox_item_handle_t item_to_delete)
  94. {
  95. outbox_item_handle_t item;
  96. STAILQ_FOREACH(item, outbox, next) {
  97. if (item == item_to_delete) {
  98. STAILQ_REMOVE(outbox, item, outbox_item, next);
  99. #ifdef USE_PSRAM
  100. heap_caps_free(item->buffer);
  101. heap_caps_free(item);
  102. #else
  103. free(item->buffer);
  104. free(item);
  105. #endif
  106. return ESP_OK;
  107. }
  108. }
  109. return ESP_FAIL;
  110. }
  111. uint8_t *outbox_item_get_data(outbox_item_handle_t item, size_t *len, uint16_t *msg_id, int *msg_type, int *qos)
  112. {
  113. if (item) {
  114. *len = item->len;
  115. *msg_id = item->msg_id;
  116. *msg_type = item->msg_type;
  117. *qos = item->msg_qos;
  118. return (uint8_t *)item->buffer;
  119. }
  120. return NULL;
  121. }
  122. esp_err_t outbox_delete(outbox_handle_t outbox, int msg_id, int msg_type)
  123. {
  124. outbox_item_handle_t item, tmp;
  125. STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
  126. if (item->msg_id == msg_id && (0xFF & (item->msg_type)) == msg_type) {
  127. STAILQ_REMOVE(outbox, item, outbox_item, next);
  128. #ifdef USE_PSRAM
  129. heap_caps_free(item->buffer);
  130. heap_caps_free(item);
  131. #else
  132. free(item->buffer);
  133. free(item);
  134. #endif
  135. ESP_LOGD(TAG, "DELETED msgid=%d, msg_type=%d, remain size=%d", msg_id, msg_type, outbox_get_size(outbox));
  136. return ESP_OK;
  137. }
  138. }
  139. return ESP_FAIL;
  140. }
  141. esp_err_t outbox_delete_msgid(outbox_handle_t outbox, int msg_id)
  142. {
  143. outbox_item_handle_t item, tmp;
  144. STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
  145. if (item->msg_id == msg_id) {
  146. STAILQ_REMOVE(outbox, item, outbox_item, next);
  147. #ifdef USE_PSRAM
  148. heap_caps_free(item->buffer);
  149. heap_caps_free(item);
  150. #else
  151. free(item->buffer);
  152. free(item);
  153. #endif
  154. }
  155. }
  156. return ESP_OK;
  157. }
  158. esp_err_t outbox_set_pending(outbox_handle_t outbox, int msg_id, pending_state_t pending)
  159. {
  160. outbox_item_handle_t item = outbox_get(outbox, msg_id);
  161. if (item) {
  162. item->pending = pending;
  163. return ESP_OK;
  164. }
  165. return ESP_FAIL;
  166. }
  167. pending_state_t outbox_item_get_pending(outbox_item_handle_t item)
  168. {
  169. if (item) {
  170. return item->pending;
  171. }
  172. return QUEUED;
  173. }
  174. esp_err_t outbox_set_tick(outbox_handle_t outbox, int msg_id, outbox_tick_t tick)
  175. {
  176. outbox_item_handle_t item = outbox_get(outbox, msg_id);
  177. if (item) {
  178. item->tick = tick;
  179. return ESP_OK;
  180. }
  181. return ESP_FAIL;
  182. }
  183. esp_err_t outbox_delete_msgtype(outbox_handle_t outbox, int msg_type)
  184. {
  185. outbox_item_handle_t item, tmp;
  186. STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
  187. if (item->msg_type == msg_type) {
  188. STAILQ_REMOVE(outbox, item, outbox_item, next);
  189. #ifdef USE_PSRAM
  190. heap_caps_free(item->buffer);
  191. heap_caps_free(item);
  192. #else
  193. free(item->buffer);
  194. free(item);
  195. #endif
  196. }
  197. }
  198. return ESP_OK;
  199. }
  200. int outbox_delete_single_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
  201. {
  202. int msg_id = -1;
  203. outbox_item_handle_t item;
  204. STAILQ_FOREACH(item, outbox, next) {
  205. if (current_tick - item->tick > timeout) {
  206. STAILQ_REMOVE(outbox, item, outbox_item, next);
  207. #ifdef USE_PSRAM
  208. heap_caps_free(item->buffer);
  209. #else
  210. free(item->buffer);
  211. #endif
  212. msg_id = item->msg_id;
  213. #ifdef USE_PSRAM
  214. heap_caps_free(item);
  215. #else
  216. free(item);
  217. #endif
  218. return msg_id;
  219. }
  220. }
  221. return msg_id;
  222. }
  223. int outbox_delete_expired(outbox_handle_t outbox, outbox_tick_t current_tick, outbox_tick_t timeout)
  224. {
  225. int deleted_items = 0;
  226. outbox_item_handle_t item, tmp;
  227. STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
  228. if (current_tick - item->tick > timeout) {
  229. STAILQ_REMOVE(outbox, item, outbox_item, next);
  230. #ifdef USE_PSRAM
  231. heap_caps_free(item->buffer);
  232. heap_caps_free(item);
  233. #else
  234. free(item->buffer);
  235. free(item);
  236. #endif
  237. deleted_items ++;
  238. }
  239. }
  240. return deleted_items;
  241. }
  242. int outbox_get_size(outbox_handle_t outbox)
  243. {
  244. int siz = 0;
  245. outbox_item_handle_t item;
  246. STAILQ_FOREACH(item, outbox, next) {
  247. // Suppressing "use after free" warning as this could happen only if queue is in inconsistent state
  248. // which never happens if STAILQ interface used
  249. siz += item->len; // NOLINT(clang-analyzer-unix.Malloc)
  250. }
  251. return siz;
  252. }
  253. void outbox_delete_all_items(outbox_handle_t outbox)
  254. {
  255. outbox_item_handle_t item, tmp;
  256. STAILQ_FOREACH_SAFE(item, outbox, next, tmp) {
  257. STAILQ_REMOVE(outbox, item, outbox_item, next);
  258. #ifdef USE_PSRAM
  259. heap_caps_free(item->buffer);
  260. heap_caps_free(item);
  261. #else
  262. free(item->buffer);
  263. free(item);
  264. #endif
  265. }
  266. }
  267. void outbox_destroy(outbox_handle_t outbox)
  268. {
  269. outbox_delete_all_items(outbox);
  270. #ifdef USE_PSRAM
  271. heap_caps_free(outbox);
  272. #else
  273. free(outbox);
  274. #endif
  275. }
  276. #endif /* CONFIG_MQTT_CUSTOM_OUTBOX */