ball_tracking_manager.py 13 KB


  1. """
  2. Ball Tracking LED Manager
  3. Tracks the ball bearing's position and updates LEDs in real-time to follow its movement.
  4. """
  5. import asyncio
  6. import time
  7. import logging
  8. import threading
  9. from collections import deque
  10. from typing import Optional, Tuple, Dict
  11. from .dw_led_controller import DWLEDController
  12. logger = logging.getLogger(__name__)
  13. class BallTrackingManager:
  14. """Manages real-time LED tracking of ball bearing position"""
  15. def __init__(self, led_controller: DWLEDController, num_leds: int, config: Dict):
  16. """
  17. Initialize ball tracking manager
  18. Args:
  19. led_controller: DWLEDController instance (kept for compatibility, not used for rendering)
  20. num_leds: Number of LEDs in the strip
  21. config: Configuration dict with keys:
  22. - led_offset: LED index offset (0 to num_leds-1)
  23. - reversed: Reverse LED direction (bool)
  24. - spread: Number of adjacent LEDs to light (1-10)
  25. - lookback: Number of coordinates to look back (0-15)
  26. - brightness: LED brightness 0-100
  27. - color: Hex color string (e.g., "#ffffff")
  28. - trail_enabled: Enable fade trail (bool)
  29. - trail_length: Trail length in LEDs (1-20)
  30. """
  31. self.led_controller = led_controller # Kept for backward compatibility
  32. self.num_leds = num_leds
  33. self.config = config
  34. # Position storage (buffer only if lookback > 0)
  35. lookback = config.get("lookback", 0)
  36. if lookback > 0:
  37. # Use buffer for lookback delay
  38. self.position_buffer = deque(maxlen=min(15, lookback + 5))
  39. self._use_buffer = True
  40. logger.info(f"Using position buffer (size={lookback + 5}) for lookback={lookback}")
  41. else:
  42. # No lookback, just store current position
  43. self.position_buffer = None
  44. self._current_position = None # (theta, rho, timestamp)
  45. self._use_buffer = False
  46. logger.info("Direct tracking (no lookback buffer)")
  47. # Tracking state
  48. self._active = False
  49. self._update_task = None
  50. self._last_led_index = None
  51. self._lock = threading.Lock() # Thread safety for LED index updates
  52. self._update_count = 0 # Counter for debug logging
  53. self._skipped_updates = 0 # Track how many updates were skipped
  54. # Polling timer for position updates
  55. self._poll_timer = None
  56. self._poll_interval = 0.2 # Check position every 0.2 seconds (5 Hz)
  57. self._is_pattern_running = False # Flag to track if pattern is executing
  58. logger.info(f"BallTrackingManager initialized with {num_leds} LEDs")
  59. def start(self):
  60. """Start ball tracking"""
  61. if self._active:
  62. logger.warning("Ball tracking already active")
  63. return
  64. self._active = True
  65. logger.info("Ball tracking started")
  66. def stop(self):
  67. """Stop ball tracking"""
  68. if not self._active:
  69. return
  70. self._active = False
  71. self._stop_polling()
  72. if self._use_buffer and self.position_buffer:
  73. self.position_buffer.clear()
  74. else:
  75. self._current_position = None
  76. self._last_led_index = None
  77. logger.info("Ball tracking stopped")
  78. def update_position(self, theta: float, rho: float):
  79. """
  80. Update ball position (called from pattern execution)
  81. Args:
  82. theta: Angular position in degrees (0-360)
  83. rho: Radial distance (0.0-1.0)
  84. """
  85. if not self._active:
  86. return
  87. # Store position
  88. timestamp = time.time()
  89. if self._use_buffer:
  90. self.position_buffer.append((theta, rho, timestamp))
  91. else:
  92. self._current_position = (theta, rho, timestamp)
  93. self._update_count += 1
  94. # Debug logging (every 100th update)
  95. if self._update_count % 100 == 0:
  96. buffer_info = f"buffer_size={len(self.position_buffer)}" if self._use_buffer else "direct"
  97. logger.info(f"Position update #{self._update_count}: theta={theta:.1f}°, rho={rho:.2f}, {buffer_info}, skipped={self._skipped_updates}")
  98. # Trigger LED update (with optimization)
  99. self._update_leds_optimized(theta, rho)
  100. def set_pattern_running(self, is_running: bool):
  101. """
  102. Notify manager that pattern execution started/stopped
  103. Args:
  104. is_running: True if pattern is executing, False otherwise
  105. """
  106. reversed_status = self.config.get("reversed", False)
  107. logger.info(f"set_pattern_running called: is_running={is_running}, active={self._active}, reversed={reversed_status}")
  108. self._is_pattern_running = is_running
  109. if is_running and self._active:
  110. # Pattern started, begin polling
  111. self._start_polling()
  112. logger.info(f"Pattern started - beginning position polling (interval={self._poll_interval}s)")
  113. else:
  114. # Pattern stopped, stop polling
  115. self._stop_polling()
  116. logger.info("Pattern stopped - stopping position polling")
  117. def _start_polling(self):
  118. """Start the position polling timer"""
  119. if self._poll_timer is not None:
  120. return # Already polling
  121. self._poll_position() # Do first poll immediately
  122. def _stop_polling(self):
  123. """Stop the position polling timer"""
  124. if self._poll_timer is not None:
  125. self._poll_timer.cancel()
  126. self._poll_timer = None
  127. def _poll_position(self):
  128. """Poll current position from state and update LEDs if needed"""
  129. if not self._active or not self._is_pattern_running:
  130. logger.debug(f"Polling stopped: active={self._active}, pattern_running={self._is_pattern_running}")
  131. self._poll_timer = None
  132. return
  133. try:
  134. # Import here to avoid circular dependency
  135. from modules.core.state import state
  136. # Get current position from global state
  137. theta = state.current_theta
  138. rho = state.current_rho
  139. logger.info(f"Polling position: theta={theta:.1f}°, rho={rho:.2f}, last_led={self._last_led_index}")
  140. # Update position (this will skip if LED zone hasn't changed)
  141. self._update_leds_optimized(theta, rho)
  142. except Exception as e:
  143. logger.error(f"Error polling position: {e}", exc_info=True)
  144. # Schedule next poll
  145. if self._active and self._is_pattern_running:
  146. self._poll_timer = threading.Timer(self._poll_interval, self._poll_position)
  147. self._poll_timer.daemon = True
  148. self._poll_timer.start()
  149. else:
  150. self._poll_timer = None
  151. def _update_leds_optimized(self, current_theta: float, current_rho: float):
  152. """
  153. Optimized LED update - only recalculates if LED zone changed
  154. Args:
  155. current_theta: Most recent theta value
  156. current_rho: Most recent rho value
  157. """
  158. if not self._active:
  159. logger.debug("Update skipped: not active")
  160. return
  161. # If using lookback buffer, get the delayed position
  162. if self._use_buffer:
  163. position = self._get_tracked_position()
  164. if position is None:
  165. logger.debug("Update skipped: no position in buffer")
  166. return
  167. theta, rho, _ = position
  168. else:
  169. # Direct tracking - use current position
  170. theta = current_theta
  171. rho = current_rho
  172. # Calculate new LED index
  173. new_led_index = self._theta_to_led(theta)
  174. # OPTIMIZATION: Only update if LED index actually changed
  175. with self._lock:
  176. if new_led_index == self._last_led_index:
  177. # LED zone hasn't changed, skip update
  178. self._skipped_updates += 1
  179. logger.debug(f"LED zone unchanged: {new_led_index}")
  180. return
  181. # LED zone changed, update it
  182. logger.info(f"LED zone changed: {self._last_led_index} → {new_led_index} (theta={theta:.1f}°)")
  183. self._last_led_index = new_led_index
  184. def _update_leds(self):
  185. """Update LED tracking state (rendering is done by effect loop)"""
  186. if not self._active:
  187. return
  188. # Get position to track (with lookback)
  189. position = self._get_tracked_position()
  190. if position is None:
  191. return
  192. theta, rho, _ = position
  193. # Calculate LED index
  194. led_index = self._theta_to_led(theta)
  195. # Debug logging (every 50th update)
  196. if self._update_count % 50 == 0:
  197. lookback = self.config.get("lookback", 0)
  198. logger.info(f"LED update #{self._update_count}: lookback={lookback}, tracked_theta={theta:.1f}°, led_index={led_index}")
  199. # Store the LED index (effect will read this) - thread-safe update
  200. with self._lock:
  201. self._last_led_index = led_index
  202. def _get_tracked_position(self) -> Optional[Tuple[float, float, float]]:
  203. """Get position to track (accounting for lookback delay)"""
  204. if not self._use_buffer:
  205. # Direct mode - return current position
  206. return self._current_position
  207. # Buffer mode - apply lookback
  208. if not self.position_buffer or len(self.position_buffer) == 0:
  209. return None
  210. lookback = self.config.get("lookback", 0)
  211. # Clamp lookback to buffer size
  212. lookback = min(lookback, len(self.position_buffer) - 1)
  213. lookback = max(0, lookback)
  214. # Get position from buffer
  215. # Index -1 = most recent, -2 = one back, etc.
  216. index = -(lookback + 1)
  217. return self.position_buffer[index]
  218. def _theta_to_led(self, theta: float) -> int:
  219. """
  220. Convert theta angle to LED index
  221. Args:
  222. theta: Angle in radians (0 to 2π for one revolution)
  223. Returns:
  224. LED index (0 to num_leds-1)
  225. """
  226. import math
  227. # Normalize theta to 0-2π
  228. TWO_PI = 2 * math.pi
  229. theta = theta % TWO_PI
  230. if theta < 0:
  231. theta += TWO_PI
  232. # Calculate LED index (0 rad = LED 0 before offset)
  233. led_index = int((theta / TWO_PI) * self.num_leds)
  234. original_index = led_index
  235. # Apply user-defined offset
  236. offset = self.config.get("led_offset", 0)
  237. led_index = (led_index + offset) % self.num_leds
  238. # Reverse direction if needed
  239. is_reversed = self.config.get("reversed", False)
  240. if is_reversed:
  241. led_index_before_reverse = led_index
  242. led_index = (self.num_leds - led_index) % self.num_leds
  243. logger.debug(f"Theta={theta:.3f} rad ({math.degrees(theta):.1f}°) -> LED {original_index} + offset {offset} = {led_index_before_reverse} -> REVERSED to {led_index}")
  244. else:
  245. logger.debug(f"Theta={theta:.3f} rad ({math.degrees(theta):.1f}°) -> LED {original_index} + offset {offset} = {led_index}")
  246. return led_index
  247. def get_tracking_data(self) -> Optional[Dict]:
  248. """
  249. Get current tracking data for effect rendering
  250. Returns:
  251. Dictionary with led_index, spread, brightness, color
  252. or None if no tracking data available
  253. """
  254. # Thread-safe read of LED index
  255. with self._lock:
  256. led_index = self._last_led_index
  257. if led_index is None:
  258. return None
  259. # Get configuration
  260. spread = self.config.get("spread", 3)
  261. brightness = self.config.get("brightness", 50) / 100.0
  262. color_hex = self.config.get("color", "#ffffff")
  263. # Convert hex color to RGB tuple
  264. color_hex = color_hex.lstrip('#')
  265. r = int(color_hex[0:2], 16)
  266. g = int(color_hex[2:4], 16)
  267. b = int(color_hex[4:6], 16)
  268. return {
  269. 'led_index': led_index,
  270. 'spread': spread,
  271. 'brightness': brightness,
  272. 'color': (r, g, b)
  273. }
  274. def update_config(self, config: Dict):
  275. """Update configuration at runtime"""
  276. self.config.update(config)
  277. logger.info(f"Ball tracking config updated: {config}")
  278. logger.info(f"Current reversed setting: {self.config.get('reversed', False)}")
  279. def get_status(self) -> Dict:
  280. """Get current tracking status"""
  281. return {
  282. "active": self._active,
  283. "buffer_size": len(self.position_buffer),
  284. "last_led_index": self._last_led_index,
  285. "config": self.config
  286. }