|
|
@@ -37,8 +37,19 @@ class BallTrackingManager:
|
|
|
self.num_leds = num_leds
|
|
|
self.config = config
|
|
|
|
|
|
- # Coordinate history buffer (max 15 coordinates)
|
|
|
- self.position_buffer = deque(maxlen=15)
|
|
|
+ # Position storage (buffer only if lookback > 0)
|
|
|
+ lookback = config.get("lookback", 0)
|
|
|
+ if lookback > 0:
|
|
|
+ # Use buffer for lookback delay
|
|
|
+ self.position_buffer = deque(maxlen=min(15, lookback + 5))
|
|
|
+ self._use_buffer = True
|
|
|
+ logger.info(f"Using position buffer (size={lookback + 5}) for lookback={lookback}")
|
|
|
+ else:
|
|
|
+ # No lookback, just store current position
|
|
|
+ self.position_buffer = None
|
|
|
+ self._current_position = None # (theta, rho, timestamp)
|
|
|
+ self._use_buffer = False
|
|
|
+ logger.info("Direct tracking (no lookback buffer)")
|
|
|
|
|
|
# Tracking state
|
|
|
self._active = False
|
|
|
@@ -46,6 +57,7 @@ class BallTrackingManager:
|
|
|
self._last_led_index = None
|
|
|
self._lock = threading.Lock() # Thread safety for LED index updates
|
|
|
self._update_count = 0 # Counter for debug logging
|
|
|
+ self._skipped_updates = 0 # Track how many updates were skipped
|
|
|
|
|
|
logger.info(f"BallTrackingManager initialized with {num_leds} LEDs")
|
|
|
|
|
|
@@ -64,7 +76,10 @@ class BallTrackingManager:
|
|
|
return
|
|
|
|
|
|
self._active = False
|
|
|
- self.position_buffer.clear()
|
|
|
+ if self._use_buffer and self.position_buffer:
|
|
|
+ self.position_buffer.clear()
|
|
|
+ else:
|
|
|
+ self._current_position = None
|
|
|
self._last_led_index = None
|
|
|
|
|
|
logger.info("Ball tracking stopped")
|
|
|
@@ -80,16 +95,57 @@ class BallTrackingManager:
|
|
|
if not self._active:
|
|
|
return
|
|
|
|
|
|
- # Add to buffer
|
|
|
- self.position_buffer.append((theta, rho, time.time()))
|
|
|
+ # Store position
|
|
|
+ timestamp = time.time()
|
|
|
+ if self._use_buffer:
|
|
|
+ self.position_buffer.append((theta, rho, timestamp))
|
|
|
+ else:
|
|
|
+ self._current_position = (theta, rho, timestamp)
|
|
|
+
|
|
|
self._update_count += 1
|
|
|
|
|
|
- # Debug logging (every 50th update)
|
|
|
- if self._update_count % 50 == 0:
|
|
|
- logger.info(f"Position update #{self._update_count}: theta={theta:.1f}°, rho={rho:.2f}, buffer_size={len(self.position_buffer)}")
|
|
|
+ # Debug logging (every 100th update)
|
|
|
+ if self._update_count % 100 == 0:
|
|
|
+ buffer_info = f"buffer_size={len(self.position_buffer)}" if self._use_buffer else "direct"
|
|
|
+ logger.info(f"Position update #{self._update_count}: theta={theta:.1f}°, rho={rho:.2f}, {buffer_info}, skipped={self._skipped_updates}")
|
|
|
+
|
|
|
+ # Trigger LED update (with optimization)
|
|
|
+ self._update_leds_optimized(theta, rho)
|
|
|
+
|
|
|
+ def _update_leds_optimized(self, current_theta: float, current_rho: float):
|
|
|
+ """
|
|
|
+ Optimized LED update - only recalculates if LED zone changed
|
|
|
+
|
|
|
+ Args:
|
|
|
+ current_theta: Most recent theta value
|
|
|
+ current_rho: Most recent rho value
|
|
|
+ """
|
|
|
+ if not self._active:
|
|
|
+ return
|
|
|
|
|
|
- # Trigger LED update
|
|
|
- self._update_leds()
|
|
|
+ # If using lookback buffer, get the delayed position
|
|
|
+ if self._use_buffer:
|
|
|
+ position = self._get_tracked_position()
|
|
|
+ if position is None:
|
|
|
+ return
|
|
|
+ theta, rho, _ = position
|
|
|
+ else:
|
|
|
+ # Direct tracking - use current position
|
|
|
+ theta = current_theta
|
|
|
+ rho = current_rho
|
|
|
+
|
|
|
+ # Calculate new LED index
|
|
|
+ new_led_index = self._theta_to_led(theta)
|
|
|
+
|
|
|
+ # OPTIMIZATION: Only update if LED index actually changed
|
|
|
+ with self._lock:
|
|
|
+ if new_led_index == self._last_led_index:
|
|
|
+ # LED zone hasn't changed, skip update
|
|
|
+ self._skipped_updates += 1
|
|
|
+ return
|
|
|
+
|
|
|
+ # LED zone changed, update it
|
|
|
+ self._last_led_index = new_led_index
|
|
|
|
|
|
def _update_leds(self):
|
|
|
"""Update LED tracking state (rendering is done by effect loop)"""
|
|
|
@@ -117,11 +173,16 @@ class BallTrackingManager:
|
|
|
|
|
|
def _get_tracked_position(self) -> Optional[Tuple[float, float, float]]:
|
|
|
"""Get position to track (accounting for lookback delay)"""
|
|
|
- lookback = self.config.get("lookback", 0)
|
|
|
+ if not self._use_buffer:
|
|
|
+ # Direct mode - return current position
|
|
|
+ return self._current_position
|
|
|
|
|
|
- if len(self.position_buffer) == 0:
|
|
|
+ # Buffer mode - apply lookback
|
|
|
+ if not self.position_buffer or len(self.position_buffer) == 0:
|
|
|
return None
|
|
|
|
|
|
+ lookback = self.config.get("lookback", 0)
|
|
|
+
|
|
|
# Clamp lookback to buffer size
|
|
|
lookback = min(lookback, len(self.position_buffer) - 1)
|
|
|
lookback = max(0, lookback)
|