ball_tracking_manager.py 9.6 KB


  1. """
  2. Ball Tracking LED Manager
  3. Tracks the ball bearing's position and updates LEDs in real-time to follow its movement.
  4. """
  5. import asyncio
  6. import time
  7. import logging
  8. import threading
  9. from collections import deque
  10. from typing import Optional, Tuple, Dict
  11. from .dw_led_controller import DWLEDController
  12. logger = logging.getLogger(__name__)
  13. class BallTrackingManager:
  14. """Manages real-time LED tracking of ball bearing position"""
  15. def __init__(self, led_controller: DWLEDController, num_leds: int, config: Dict):
  16. """
  17. Initialize ball tracking manager
  18. Args:
  19. led_controller: DWLEDController instance (kept for compatibility, not used for rendering)
  20. num_leds: Number of LEDs in the strip
  21. config: Configuration dict with keys:
  22. - led_offset: LED index offset (0 to num_leds-1)
  23. - reversed: Reverse LED direction (bool)
  24. - spread: Number of adjacent LEDs to light (1-10)
  25. - lookback: Number of coordinates to look back (0-15)
  26. - brightness: LED brightness 0-100
  27. - color: Hex color string (e.g., "#ffffff")
  28. - trail_enabled: Enable fade trail (bool)
  29. - trail_length: Trail length in LEDs (1-20)
  30. """
  31. self.led_controller = led_controller # Kept for backward compatibility
  32. self.num_leds = num_leds
  33. self.config = config
  34. # Position storage (buffer only if lookback > 0)
  35. lookback = config.get("lookback", 0)
  36. if lookback > 0:
  37. # Use buffer for lookback delay
  38. self.position_buffer = deque(maxlen=min(15, lookback + 5))
  39. self._use_buffer = True
  40. logger.info(f"Using position buffer (size={lookback + 5}) for lookback={lookback}")
  41. else:
  42. # No lookback, just store current position
  43. self.position_buffer = None
  44. self._current_position = None # (theta, rho, timestamp)
  45. self._use_buffer = False
  46. logger.info("Direct tracking (no lookback buffer)")
  47. # Tracking state
  48. self._active = False
  49. self._update_task = None
  50. self._last_led_index = None
  51. self._lock = threading.Lock() # Thread safety for LED index updates
  52. self._update_count = 0 # Counter for debug logging
  53. self._skipped_updates = 0 # Track how many updates were skipped
  54. logger.info(f"BallTrackingManager initialized with {num_leds} LEDs")
  55. def start(self):
  56. """Start ball tracking"""
  57. if self._active:
  58. logger.warning("Ball tracking already active")
  59. return
  60. self._active = True
  61. logger.info("Ball tracking started")
  62. def stop(self):
  63. """Stop ball tracking"""
  64. if not self._active:
  65. return
  66. self._active = False
  67. if self._use_buffer and self.position_buffer:
  68. self.position_buffer.clear()
  69. else:
  70. self._current_position = None
  71. self._last_led_index = None
  72. logger.info("Ball tracking stopped")
  73. def update_position(self, theta: float, rho: float):
  74. """
  75. Update ball position (called from pattern execution)
  76. Args:
  77. theta: Angular position in degrees (0-360)
  78. rho: Radial distance (0.0-1.0)
  79. """
  80. if not self._active:
  81. return
  82. # Store position
  83. timestamp = time.time()
  84. if self._use_buffer:
  85. self.position_buffer.append((theta, rho, timestamp))
  86. else:
  87. self._current_position = (theta, rho, timestamp)
  88. self._update_count += 1
  89. # Debug logging (every 100th update)
  90. if self._update_count % 100 == 0:
  91. buffer_info = f"buffer_size={len(self.position_buffer)}" if self._use_buffer else "direct"
  92. logger.info(f"Position update #{self._update_count}: theta={theta:.1f}°, rho={rho:.2f}, {buffer_info}, skipped={self._skipped_updates}")
  93. # Trigger LED update (with optimization)
  94. self._update_leds_optimized(theta, rho)
  95. def _update_leds_optimized(self, current_theta: float, current_rho: float):
  96. """
  97. Optimized LED update - only recalculates if LED zone changed
  98. Args:
  99. current_theta: Most recent theta value
  100. current_rho: Most recent rho value
  101. """
  102. if not self._active:
  103. return
  104. # If using lookback buffer, get the delayed position
  105. if self._use_buffer:
  106. position = self._get_tracked_position()
  107. if position is None:
  108. return
  109. theta, rho, _ = position
  110. else:
  111. # Direct tracking - use current position
  112. theta = current_theta
  113. rho = current_rho
  114. # Calculate new LED index
  115. new_led_index = self._theta_to_led(theta)
  116. # OPTIMIZATION: Only update if LED index actually changed
  117. with self._lock:
  118. if new_led_index == self._last_led_index:
  119. # LED zone hasn't changed, skip update
  120. self._skipped_updates += 1
  121. return
  122. # LED zone changed, update it
  123. self._last_led_index = new_led_index
  124. def _update_leds(self):
  125. """Update LED tracking state (rendering is done by effect loop)"""
  126. if not self._active:
  127. return
  128. # Get position to track (with lookback)
  129. position = self._get_tracked_position()
  130. if position is None:
  131. return
  132. theta, rho, _ = position
  133. # Calculate LED index
  134. led_index = self._theta_to_led(theta)
  135. # Debug logging (every 50th update)
  136. if self._update_count % 50 == 0:
  137. lookback = self.config.get("lookback", 0)
  138. logger.info(f"LED update #{self._update_count}: lookback={lookback}, tracked_theta={theta:.1f}°, led_index={led_index}")
  139. # Store the LED index (effect will read this) - thread-safe update
  140. with self._lock:
  141. self._last_led_index = led_index
  142. def _get_tracked_position(self) -> Optional[Tuple[float, float, float]]:
  143. """Get position to track (accounting for lookback delay)"""
  144. if not self._use_buffer:
  145. # Direct mode - return current position
  146. return self._current_position
  147. # Buffer mode - apply lookback
  148. if not self.position_buffer or len(self.position_buffer) == 0:
  149. return None
  150. lookback = self.config.get("lookback", 0)
  151. # Clamp lookback to buffer size
  152. lookback = min(lookback, len(self.position_buffer) - 1)
  153. lookback = max(0, lookback)
  154. # Get position from buffer
  155. # Index -1 = most recent, -2 = one back, etc.
  156. index = -(lookback + 1)
  157. return self.position_buffer[index]
  158. def _theta_to_led(self, theta: float) -> int:
  159. """
  160. Convert theta angle to LED index
  161. Args:
  162. theta: Angle in degrees (0-360)
  163. Returns:
  164. LED index (0 to num_leds-1)
  165. """
  166. # Normalize theta to 0-360
  167. theta = theta % 360
  168. if theta < 0:
  169. theta += 360
  170. # Calculate LED index (0° = LED 0 before offset)
  171. led_index = int((theta / 360.0) * self.num_leds)
  172. original_index = led_index
  173. # Apply user-defined offset
  174. offset = self.config.get("led_offset", 0)
  175. led_index = (led_index + offset) % self.num_leds
  176. # Reverse direction if needed
  177. is_reversed = self.config.get("reversed", False)
  178. if is_reversed:
  179. led_index_before_reverse = led_index
  180. led_index = (self.num_leds - led_index) % self.num_leds
  181. logger.debug(f"Theta={theta:.1f}° -> LED {original_index} + offset {offset} = {led_index_before_reverse} -> REVERSED to {led_index}")
  182. else:
  183. logger.debug(f"Theta={theta:.1f}° -> LED {original_index} + offset {offset} = {led_index}")
  184. return led_index
  185. def get_tracking_data(self) -> Optional[Dict]:
  186. """
  187. Get current tracking data for effect rendering
  188. Returns:
  189. Dictionary with led_index, spread, brightness, color
  190. or None if no tracking data available
  191. """
  192. # Thread-safe read of LED index
  193. with self._lock:
  194. led_index = self._last_led_index
  195. if led_index is None:
  196. return None
  197. # Get configuration
  198. spread = self.config.get("spread", 3)
  199. brightness = self.config.get("brightness", 50) / 100.0
  200. color_hex = self.config.get("color", "#ffffff")
  201. # Convert hex color to RGB tuple
  202. color_hex = color_hex.lstrip('#')
  203. r = int(color_hex[0:2], 16)
  204. g = int(color_hex[2:4], 16)
  205. b = int(color_hex[4:6], 16)
  206. return {
  207. 'led_index': led_index,
  208. 'spread': spread,
  209. 'brightness': brightness,
  210. 'color': (r, g, b)
  211. }
  212. def update_config(self, config: Dict):
  213. """Update configuration at runtime"""
  214. self.config.update(config)
  215. logger.info(f"Ball tracking config updated: {config}")
  216. logger.info(f"Current reversed setting: {self.config.get('reversed', False)}")
  217. def get_status(self) -> Dict:
  218. """Get current tracking status"""
  219. return {
  220. "active": self._active,
  221. "buffer_size": len(self.position_buffer),
  222. "last_led_index": self._last_led_index,
  223. "config": self.config
  224. }