1
0

ball_tracking_manager.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. """
  2. Ball Tracking LED Manager
  3. Tracks the ball bearing's position and updates LEDs in real-time to follow its movement.
  4. """
  5. import asyncio
  6. import time
  7. import logging
  8. import threading
  9. from collections import deque
  10. from typing import Optional, Tuple, Dict
  11. from .dw_led_controller import DWLEDController
  logger = logging.getLogger(__name__)


  class BallTrackingManager:
      """Manages real-time LED tracking of ball bearing position.

      Positions reported through ``update_position`` are kept in a short
      history buffer.  The position selected by the ``lookback`` setting is
      converted to an LED index, which ``get_tracking_data`` hands out together
      with the rendering settings (spread, brightness, colour).  This class only
      stores the tracking state; it does not drive the LED strip itself.
      """

      # Largest supported ``lookback`` value.  The history buffer keeps one extra
      # sample so that looking back MAX_LOOKBACK positions is actually possible.
      MAX_LOOKBACK = 15

      # Colour used when the configured colour string cannot be parsed.
      _FALLBACK_COLOR = (255, 255, 255)

      def __init__(self, led_controller: "DWLEDController", num_leds: int, config: Dict):
          """
          Initialize ball tracking manager

          Args:
              led_controller: DWLEDController instance (kept for compatibility,
                  not used for rendering)
              num_leds: Number of LEDs in the strip
              config: Configuration dict with keys:
                  - led_offset: LED index offset (0 to num_leds-1)
                  - reversed: Reverse LED direction (bool)
                  - spread: Number of adjacent LEDs to light (1-10)
                  - lookback: Number of coordinates to look back (0-15)
                  - brightness: LED brightness 0-100
                  - color: Hex color string (e.g., "#ffffff")
                  - trail_enabled: Enable fade trail (bool)
                  - trail_length: Trail length in LEDs (1-20)
          """
          self.led_controller = led_controller  # Kept for backward compatibility
          self.num_leds = num_leds
          self.config = config

          # Coordinate history: the newest sample plus MAX_LOOKBACK older ones.
          self.position_buffer = deque(maxlen=self.MAX_LOOKBACK + 1)

          # Tracking state
          self._active = False
          self._update_task = None
          self._last_led_index = None
          self._lock = threading.Lock()  # Guards _active/_last_led_index transitions
          self._update_count = 0  # Counter for debug logging
          self._reported_bad_color = None  # Last invalid colour already warned about

          logger.info(f"BallTrackingManager initialized with {num_leds} LEDs")

      def start(self):
          """Start ball tracking"""
          if self._active:
              logger.warning("Ball tracking already active")
              return
          self._active = True
          logger.info("Ball tracking started")

      def stop(self):
          """Stop ball tracking and discard the position history and LED index."""
          # The state reset happens under the lock so that an update running on
          # another thread cannot publish an LED index after tracking stopped.
          with self._lock:
              if not self._active:
                  return
              self._active = False
              self.position_buffer.clear()
              self._last_led_index = None
          logger.info("Ball tracking stopped")

      def update_position(self, theta: float, rho: float):
          """
          Update ball position (called from pattern execution)

          Ignored while tracking is not active.

          Args:
              theta: Angular position in degrees (0-360)
              rho: Radial distance (0.0-1.0)
          """
          if not self._active:
              return

          # Each sample is stored as (theta, rho, wall-clock timestamp).
          self.position_buffer.append((theta, rho, time.time()))
          self._update_count += 1

          # Debug logging (every 50th update)
          if self._update_count % 50 == 0:
              logger.info(f"Position update #{self._update_count}: theta={theta:.1f}°, rho={rho:.2f}, buffer_size={len(self.position_buffer)}")

          # Trigger LED update
          self._update_leds()

      def _update_leds(self):
          """Update LED tracking state (rendering is done by effect loop)"""
          # Without LEDs there is no index to compute (and the modulo below
          # would divide by zero).
          if not self._active or self.num_leds <= 0:
              return

          # Get position to track (with lookback)
          position = self._get_tracked_position()
          if position is None:
              return
          theta = position[0]

          led_index = self._theta_to_led(theta)

          # Debug logging (every 50th update)
          if self._update_count % 50 == 0:
              lookback = self.config.get("lookback", 0)
              logger.info(f"LED update #{self._update_count}: lookback={lookback}, tracked_theta={theta:.1f}°, led_index={led_index}")

          # Store the LED index (effect will read this).  Re-check _active under
          # the lock so a concurrent stop() is not overwritten with a stale index.
          with self._lock:
              if self._active:
                  self._last_led_index = led_index

      def _get_tracked_position(self) -> Optional[Tuple[float, float, float]]:
          """Get position to track (accounting for lookback delay).

          Returns:
              The ``(theta, rho, timestamp)`` sample that lies ``lookback``
              entries behind the newest one (clamped to the oldest available
              sample), or None when the buffer is empty.
          """
          # Config values may arrive as floats or strings; an unusable value
          # means "no lookback" instead of a crash in the update path.
          try:
              lookback = int(self.config.get("lookback", 0))
          except (TypeError, ValueError, OverflowError):
              lookback = 0
          lookback = max(0, min(lookback, self.MAX_LOOKBACK))

          try:
              available = len(self.position_buffer)
              if available == 0:
                  return None
              # Index -1 = most recent, -2 = one back, etc.
              return self.position_buffer[-(min(lookback, available - 1) + 1)]
          except IndexError:
              # The buffer was cleared between the length check and the read.
              return None

      def _theta_to_led(self, theta: float) -> int:
          """
          Convert theta angle to LED index

          Args:
              theta: Angle in degrees (any value; wrapped into one revolution)

          Returns:
              LED index (0 to num_leds-1)
          """
          # Python's % already yields a non-negative result for a positive
          # divisor, so negative angles need no extra handling.
          theta = theta % 360

          # Calculate LED index (0° = LED 0 before offset)
          original_index = int((theta / 360.0) * self.num_leds)

          # Apply user-defined offset; the modulo also folds the rare case where
          # float rounding pushed original_index up to num_leds.
          offset = self.config.get("led_offset", 0)
          led_index = (original_index + offset) % self.num_leds

          # Only build the debug message when it will actually be emitted; this
          # method runs on every position update.
          debug_enabled = logger.isEnabledFor(logging.DEBUG)

          # Reverse direction if needed
          if self.config.get("reversed", False):
              led_index_before_reverse = led_index
              led_index = (self.num_leds - led_index) % self.num_leds
              if debug_enabled:
                  logger.debug(f"Theta={theta:.1f}° -> LED {original_index} + offset {offset} = {led_index_before_reverse} -> REVERSED to {led_index}")
          elif debug_enabled:
              logger.debug(f"Theta={theta:.1f}° -> LED {original_index} + offset {offset} = {led_index}")

          return led_index

      @staticmethod
      def _hex_to_rgb(color_hex) -> Optional[Tuple[int, int, int]]:
          """Parse a hex colour string into an (r, g, b) tuple.

          Accepts an optional leading '#', the three/four digit short form
          (each digit doubled) and six or more digits (only the first six are
          used).  Returns None when the value is not a usable colour.
          """
          if not isinstance(color_hex, str):
              return None
          digits = color_hex.strip().lstrip('#')
          if len(digits) in (3, 4):
              digits = "".join(ch * 2 for ch in digits[:3])
          digits = digits[:6]
          if len(digits) != 6 or any(ch not in "0123456789abcdefABCDEF" for ch in digits):
              return None
          return (int(digits[0:2], 16), int(digits[2:4], 16), int(digits[4:6], 16))

      def get_tracking_data(self) -> Optional[Dict]:
          """
          Get current tracking data for effect rendering

          Returns:
              Dictionary with led_index, spread, brightness (0.0-1.0) and
              color (r, g, b), or None if no tracking data available
          """
          # Read of LED index under the same lock the writer uses
          with self._lock:
              led_index = self._last_led_index
          if led_index is None:
              return None

          spread = self.config.get("spread", 3)

          # Brightness is configured as 0-100; clamp so out-of-range or unusable
          # settings cannot produce a factor outside 0.0-1.0.
          try:
              brightness = float(self.config.get("brightness", 50)) / 100.0
          except (TypeError, ValueError):
              brightness = 0.5
          brightness = min(1.0, max(0.0, brightness))

          color_hex = self.config.get("color", "#ffffff")
          color = self._hex_to_rgb(color_hex)
          if color is None:
              # Warn once per bad value; this method is polled repeatedly.
              if color_hex != self._reported_bad_color:
                  logger.warning("Invalid ball tracking color %r; falling back to white", color_hex)
                  self._reported_bad_color = color_hex
              color = self._FALLBACK_COLOR

          return {
              'led_index': led_index,
              'spread': spread,
              'brightness': brightness,
              'color': color,
          }

      def update_config(self, config: Dict):
          """Update configuration at runtime by merging ``config`` into the current one."""
          self.config.update(config)
          logger.info(f"Ball tracking config updated: {config}")
          logger.info(f"Current reversed setting: {self.config.get('reversed', False)}")

      def get_status(self) -> Dict:
          """Get current tracking status (active flag, buffer size, LED index, config)."""
          with self._lock:
              last_led_index = self._last_led_index
          return {
              "active": self._active,
              "buffer_size": len(self.position_buffer),
              "last_led_index": last_led_index,
              "config": self.config,
          }