Parcourir la source

Drop data only for clients that can't keep up

Oxan van Leeuwen il y a 3 ans
Parent
commit
e4ce3cacfc
1 fichiers modifiés avec 19 ajouts et 11 suppressions
  1. 19 11
      components/stream_server/stream_server.cpp

+ 19 - 11
components/stream_server/stream_server.cpp

@@ -101,23 +101,31 @@ void StreamServerComponent::cleanup() {
 }
 
 void StreamServerComponent::read() {
-    bool first_iteration = true;
+    size_t len = 0;
     int available;
     while ((available = this->stream_->available()) > 0) {
-        // Write until the tail is encountered, or wraparound of the ring buffer if that happens before.
-        size_t max = std::min(this->buf_ahead(this->buf_head_), this->buf_tail_ + this->buf_size_ - this->buf_head_);
-        if (max == 0) {
-            // Only warn on the first iteration, the finite buffer size is also used as a throttling mechanism to avoid
-            // blocking here for too long when a large amount of data comes in.
-            if (first_iteration)
-                ESP_LOGW(TAG, "Incoming bytes available in stream, but outgoing buffer is full!");
-            break;
+        size_t free = this->buf_size_ - (this->buf_head_ - this->buf_tail_);
+        if (free == 0) {
+            // Only overwrite if nothing has been added yet, otherwise give flush() a chance to empty the buffer first.
+            if (len > 0)
+                return;
+
+            ESP_LOGE(TAG, "Incoming bytes available, but outgoing buffer is full: stream will be corrupted!");
+            free = std::min<size_t>(available, this->buf_size_);
+            this->buf_tail_ += free;
+            for (Client &client : this->clients_) {
+                if (client.position < this->buf_tail_) {
+                    ESP_LOGW(TAG, "Dropped %u pending bytes for client %s", this->buf_tail_ - client.position, client.identifier.c_str());
+                    client.position = this->buf_tail_;
+                }
+            }
+
         }
 
-        size_t len = std::min<size_t>(available, max);
+        // Fill all available contiguous space in the ring buffer.
+        len = std::min<size_t>(available, std::min<size_t>(this->buf_ahead(this->buf_head_), free));
         this->stream_->read_array(&this->buf_[this->buf_index(this->buf_head_)], len);
         this->buf_head_ += len;
-        first_iteration = false;
     }
 }