# ball_tracking_manager.py


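"""
Ball Tracking LED Manager

Tracks the ball bearing's position and keeps the index of the LED that follows
it up to date in real time. Rendering itself is done by the LED effect loop,
which reads the tracking data exposed by this module.
"""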
import asyncio
import logging
import threading
import time
from collections import deque
from typing import Any, Dict, Optional, Tuple

from .dw_led_controller import DWLEDController
  12. logger = logging.getLogger(__name__)
  13. class BallTrackingManager:
  14. """Manages real-time LED tracking of ball bearing position"""
  15. def __init__(self, led_controller: DWLEDController, num_leds: int, config: Dict):
  16. """
  17. Initialize ball tracking manager
  18. Args:
  19. led_controller: DWLEDController instance (kept for compatibility, not used for rendering)
  20. num_leds: Number of LEDs in the strip
  21. config: Configuration dict with keys:
  22. - led_offset: LED index offset (0 to num_leds-1)
  23. - reversed: Reverse LED direction (bool)
  24. - spread: Number of adjacent LEDs to light (1-10)
  25. - lookback: Number of coordinates to look back (0-15)
  26. - brightness: LED brightness 0-100
  27. - color: Hex color string (e.g., "#ffffff")
  28. - trail_enabled: Enable fade trail (bool)
  29. - trail_length: Trail length in LEDs (1-20)
  30. """
  31. self.led_controller = led_controller # Kept for backward compatibility
  32. self.num_leds = num_leds
  33. self.config = config
  34. # Position storage (buffer only if lookback > 0)
  35. lookback = config.get("lookback", 0)
  36. if lookback > 0:
  37. # Use buffer for lookback delay
  38. self.position_buffer = deque(maxlen=min(15, lookback + 5))
  39. self._use_buffer = True
  40. logger.info(f"Using position buffer (size={lookback + 5}) for lookback={lookback}")
  41. else:
  42. # No lookback, just store current position
  43. self.position_buffer = None
  44. self._current_position = None # (theta, rho, timestamp)
  45. self._use_buffer = False
  46. logger.info("Direct tracking (no lookback buffer)")
  47. # Tracking state
  48. self._active = False
  49. self._update_task = None
  50. self._last_led_index = None
  51. self._lock = threading.Lock() # Thread safety for LED index updates
  52. self._update_count = 0 # Counter for debug logging
  53. self._skipped_updates = 0 # Track how many updates were skipped
  54. # Polling timer for position updates
  55. self._poll_timer = None
  56. self._poll_interval = 0.5 # Check position every 0.5 seconds
  57. self._is_pattern_running = False # Flag to track if pattern is executing
  58. logger.info(f"BallTrackingManager initialized with {num_leds} LEDs")
  59. def start(self):
  60. """Start ball tracking"""
  61. if self._active:
  62. logger.warning("Ball tracking already active")
  63. return
  64. self._active = True
  65. logger.info("Ball tracking started")
  66. def stop(self):
  67. """Stop ball tracking"""
  68. if not self._active:
  69. return
  70. self._active = False
  71. self._stop_polling()
  72. if self._use_buffer and self.position_buffer:
  73. self.position_buffer.clear()
  74. else:
  75. self._current_position = None
  76. self._last_led_index = None
  77. logger.info("Ball tracking stopped")
  78. def update_position(self, theta: float, rho: float):
  79. """
  80. Update ball position (called from pattern execution)
  81. Args:
  82. theta: Angular position in degrees (0-360)
  83. rho: Radial distance (0.0-1.0)
  84. """
  85. if not self._active:
  86. return
  87. # Store position
  88. timestamp = time.time()
  89. if self._use_buffer:
  90. self.position_buffer.append((theta, rho, timestamp))
  91. else:
  92. self._current_position = (theta, rho, timestamp)
  93. self._update_count += 1
  94. # Debug logging (every 100th update)
  95. if self._update_count % 100 == 0:
  96. buffer_info = f"buffer_size={len(self.position_buffer)}" if self._use_buffer else "direct"
  97. logger.info(f"Position update #{self._update_count}: theta={theta:.1f}°, rho={rho:.2f}, {buffer_info}, skipped={self._skipped_updates}")
  98. # Trigger LED update (with optimization)
  99. self._update_leds_optimized(theta, rho)
  100. def set_pattern_running(self, is_running: bool):
  101. """
  102. Notify manager that pattern execution started/stopped
  103. Args:
  104. is_running: True if pattern is executing, False otherwise
  105. """
  106. logger.info(f"set_pattern_running called: is_running={is_running}, active={self._active}")
  107. self._is_pattern_running = is_running
  108. if is_running and self._active:
  109. # Pattern started, begin polling
  110. self._start_polling()
  111. logger.info(f"Pattern started - beginning position polling (interval={self._poll_interval}s)")
  112. else:
  113. # Pattern stopped, stop polling
  114. self._stop_polling()
  115. logger.info("Pattern stopped - stopping position polling")
  116. def _start_polling(self):
  117. """Start the position polling timer"""
  118. if self._poll_timer is not None:
  119. return # Already polling
  120. self._poll_position() # Do first poll immediately
  121. def _stop_polling(self):
  122. """Stop the position polling timer"""
  123. if self._poll_timer is not None:
  124. self._poll_timer.cancel()
  125. self._poll_timer = None
  126. def _poll_position(self):
  127. """Poll current position from state and update LEDs if needed"""
  128. if not self._active or not self._is_pattern_running:
  129. logger.debug(f"Polling stopped: active={self._active}, pattern_running={self._is_pattern_running}")
  130. self._poll_timer = None
  131. return
  132. try:
  133. # Import here to avoid circular dependency
  134. from modules.core.state import state
  135. # Get current position from global state
  136. theta = state.current_theta
  137. rho = state.current_rho
  138. logger.debug(f"Polling position: theta={theta:.1f}°, rho={rho:.2f}, last_led={self._last_led_index}")
  139. # Update position (this will skip if LED zone hasn't changed)
  140. self._update_leds_optimized(theta, rho)
  141. except Exception as e:
  142. logger.error(f"Error polling position: {e}", exc_info=True)
  143. # Schedule next poll
  144. if self._active and self._is_pattern_running:
  145. self._poll_timer = threading.Timer(self._poll_interval, self._poll_position)
  146. self._poll_timer.daemon = True
  147. self._poll_timer.start()
  148. else:
  149. self._poll_timer = None
  150. def _update_leds_optimized(self, current_theta: float, current_rho: float):
  151. """
  152. Optimized LED update - only recalculates if LED zone changed
  153. Args:
  154. current_theta: Most recent theta value
  155. current_rho: Most recent rho value
  156. """
  157. if not self._active:
  158. logger.debug("Update skipped: not active")
  159. return
  160. # If using lookback buffer, get the delayed position
  161. if self._use_buffer:
  162. position = self._get_tracked_position()
  163. if position is None:
  164. logger.debug("Update skipped: no position in buffer")
  165. return
  166. theta, rho, _ = position
  167. else:
  168. # Direct tracking - use current position
  169. theta = current_theta
  170. rho = current_rho
  171. # Calculate new LED index
  172. new_led_index = self._theta_to_led(theta)
  173. # OPTIMIZATION: Only update if LED index actually changed
  174. with self._lock:
  175. if new_led_index == self._last_led_index:
  176. # LED zone hasn't changed, skip update
  177. self._skipped_updates += 1
  178. logger.debug(f"LED zone unchanged: {new_led_index}")
  179. return
  180. # LED zone changed, update it
  181. logger.info(f"LED zone changed: {self._last_led_index} → {new_led_index} (theta={theta:.1f}°)")
  182. self._last_led_index = new_led_index
  183. def _update_leds(self):
  184. """Update LED tracking state (rendering is done by effect loop)"""
  185. if not self._active:
  186. return
  187. # Get position to track (with lookback)
  188. position = self._get_tracked_position()
  189. if position is None:
  190. return
  191. theta, rho, _ = position
  192. # Calculate LED index
  193. led_index = self._theta_to_led(theta)
  194. # Debug logging (every 50th update)
  195. if self._update_count % 50 == 0:
  196. lookback = self.config.get("lookback", 0)
  197. logger.info(f"LED update #{self._update_count}: lookback={lookback}, tracked_theta={theta:.1f}°, led_index={led_index}")
  198. # Store the LED index (effect will read this) - thread-safe update
  199. with self._lock:
  200. self._last_led_index = led_index
  201. def _get_tracked_position(self) -> Optional[Tuple[float, float, float]]:
  202. """Get position to track (accounting for lookback delay)"""
  203. if not self._use_buffer:
  204. # Direct mode - return current position
  205. return self._current_position
  206. # Buffer mode - apply lookback
  207. if not self.position_buffer or len(self.position_buffer) == 0:
  208. return None
  209. lookback = self.config.get("lookback", 0)
  210. # Clamp lookback to buffer size
  211. lookback = min(lookback, len(self.position_buffer) - 1)
  212. lookback = max(0, lookback)
  213. # Get position from buffer
  214. # Index -1 = most recent, -2 = one back, etc.
  215. index = -(lookback + 1)
  216. return self.position_buffer[index]
  217. def _theta_to_led(self, theta: float) -> int:
  218. """
  219. Convert theta angle to LED index
  220. Args:
  221. theta: Angle in degrees (0-360)
  222. Returns:
  223. LED index (0 to num_leds-1)
  224. """
  225. # Normalize theta to 0-360
  226. theta = theta % 360
  227. if theta < 0:
  228. theta += 360
  229. # Calculate LED index (0° = LED 0 before offset)
  230. led_index = int((theta / 360.0) * self.num_leds)
  231. original_index = led_index
  232. # Apply user-defined offset
  233. offset = self.config.get("led_offset", 0)
  234. led_index = (led_index + offset) % self.num_leds
  235. # Reverse direction if needed
  236. is_reversed = self.config.get("reversed", False)
  237. if is_reversed:
  238. led_index_before_reverse = led_index
  239. led_index = (self.num_leds - led_index) % self.num_leds
  240. logger.debug(f"Theta={theta:.1f}° -> LED {original_index} + offset {offset} = {led_index_before_reverse} -> REVERSED to {led_index}")
  241. else:
  242. logger.debug(f"Theta={theta:.1f}° -> LED {original_index} + offset {offset} = {led_index}")
  243. return led_index
  244. def get_tracking_data(self) -> Optional[Dict]:
  245. """
  246. Get current tracking data for effect rendering
  247. Returns:
  248. Dictionary with led_index, spread, brightness, color
  249. or None if no tracking data available
  250. """
  251. # Thread-safe read of LED index
  252. with self._lock:
  253. led_index = self._last_led_index
  254. if led_index is None:
  255. return None
  256. # Get configuration
  257. spread = self.config.get("spread", 3)
  258. brightness = self.config.get("brightness", 50) / 100.0
  259. color_hex = self.config.get("color", "#ffffff")
  260. # Convert hex color to RGB tuple
  261. color_hex = color_hex.lstrip('#')
  262. r = int(color_hex[0:2], 16)
  263. g = int(color_hex[2:4], 16)
  264. b = int(color_hex[4:6], 16)
  265. return {
  266. 'led_index': led_index,
  267. 'spread': spread,
  268. 'brightness': brightness,
  269. 'color': (r, g, b)
  270. }
  271. def update_config(self, config: Dict):
  272. """Update configuration at runtime"""
  273. self.config.update(config)
  274. logger.info(f"Ball tracking config updated: {config}")
  275. logger.info(f"Current reversed setting: {self.config.get('reversed', False)}")
  276. def get_status(self) -> Dict:
  277. """Get current tracking status"""
  278. return {
  279. "active": self._active,
  280. "buffer_size": len(self.position_buffer),
  281. "last_led_index": self._last_led_index,
  282. "config": self.config
  283. }