ball_tracking_manager.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203
  1. """
  2. Ball Tracking LED Manager
  3. Tracks the ball bearing's position and updates LEDs in real-time to follow its movement.
  4. """
  5. import asyncio
  6. import time
  7. import logging
  8. import threading
  9. from collections import deque
  10. from typing import Optional, Tuple, Dict
  11. from .dw_led_controller import DWLEDController
  12. logger = logging.getLogger(__name__)
  13. class BallTrackingManager:
  14. """Manages real-time LED tracking of ball bearing position"""
  15. def __init__(self, led_controller: DWLEDController, num_leds: int, config: Dict):
  16. """
  17. Initialize ball tracking manager
  18. Args:
  19. led_controller: DWLEDController instance (kept for compatibility, not used for rendering)
  20. num_leds: Number of LEDs in the strip
  21. config: Configuration dict with keys:
  22. - led_offset: LED index offset (0 to num_leds-1)
  23. - reversed: Reverse LED direction (bool)
  24. - spread: Number of adjacent LEDs to light (1-10)
  25. - lookback: Number of coordinates to look back (0-15)
  26. - brightness: LED brightness 0-100
  27. - color: Hex color string (e.g., "#ffffff")
  28. - trail_enabled: Enable fade trail (bool)
  29. - trail_length: Trail length in LEDs (1-20)
  30. """
  31. self.led_controller = led_controller # Kept for backward compatibility
  32. self.num_leds = num_leds
  33. self.config = config
  34. # Coordinate history buffer (max 15 coordinates)
  35. self.position_buffer = deque(maxlen=15)
  36. # Tracking state
  37. self._active = False
  38. self._update_task = None
  39. self._last_led_index = None
  40. self._lock = threading.Lock() # Thread safety for LED index updates
  41. logger.info(f"BallTrackingManager initialized with {num_leds} LEDs")
  42. def start(self):
  43. """Start ball tracking"""
  44. if self._active:
  45. logger.warning("Ball tracking already active")
  46. return
  47. self._active = True
  48. logger.info("Ball tracking started")
  49. def stop(self):
  50. """Stop ball tracking"""
  51. if not self._active:
  52. return
  53. self._active = False
  54. self.position_buffer.clear()
  55. self._last_led_index = None
  56. logger.info("Ball tracking stopped")
  57. def update_position(self, theta: float, rho: float):
  58. """
  59. Update ball position (called from pattern execution)
  60. Args:
  61. theta: Angular position in degrees (0-360)
  62. rho: Radial distance (0.0-1.0)
  63. """
  64. if not self._active:
  65. return
  66. # Add to buffer
  67. self.position_buffer.append((theta, rho, time.time()))
  68. # Trigger LED update
  69. self._update_leds()
  70. def _update_leds(self):
  71. """Update LED tracking state (rendering is done by effect loop)"""
  72. if not self._active:
  73. return
  74. # Get position to track (with lookback)
  75. position = self._get_tracked_position()
  76. if position is None:
  77. return
  78. theta, rho, _ = position
  79. # Calculate LED index
  80. led_index = self._theta_to_led(theta)
  81. # Store the LED index (effect will read this) - thread-safe update
  82. with self._lock:
  83. self._last_led_index = led_index
  84. def _get_tracked_position(self) -> Optional[Tuple[float, float, float]]:
  85. """Get position to track (accounting for lookback delay)"""
  86. lookback = self.config.get("lookback", 0)
  87. if len(self.position_buffer) == 0:
  88. return None
  89. # Clamp lookback to buffer size
  90. lookback = min(lookback, len(self.position_buffer) - 1)
  91. lookback = max(0, lookback)
  92. # Get position from buffer
  93. # Index -1 = most recent, -2 = one back, etc.
  94. index = -(lookback + 1)
  95. return self.position_buffer[index]
  96. def _theta_to_led(self, theta: float) -> int:
  97. """
  98. Convert theta angle to LED index
  99. Args:
  100. theta: Angle in degrees (0-360)
  101. Returns:
  102. LED index (0 to num_leds-1)
  103. """
  104. # Normalize theta to 0-360
  105. theta = theta % 360
  106. if theta < 0:
  107. theta += 360
  108. # Calculate LED index (0° = LED 0 before offset)
  109. led_index = int((theta / 360.0) * self.num_leds)
  110. original_index = led_index
  111. # Apply user-defined offset
  112. offset = self.config.get("led_offset", 0)
  113. led_index = (led_index + offset) % self.num_leds
  114. # Reverse direction if needed
  115. is_reversed = self.config.get("reversed", False)
  116. if is_reversed:
  117. led_index_before_reverse = led_index
  118. led_index = (self.num_leds - led_index) % self.num_leds
  119. logger.debug(f"Theta={theta:.1f}° -> LED {original_index} + offset {offset} = {led_index_before_reverse} -> REVERSED to {led_index}")
  120. else:
  121. logger.debug(f"Theta={theta:.1f}° -> LED {original_index} + offset {offset} = {led_index}")
  122. return led_index
  123. def get_tracking_data(self) -> Optional[Dict]:
  124. """
  125. Get current tracking data for effect rendering
  126. Returns:
  127. Dictionary with led_index, spread, brightness, color
  128. or None if no tracking data available
  129. """
  130. # Thread-safe read of LED index
  131. with self._lock:
  132. led_index = self._last_led_index
  133. if led_index is None:
  134. return None
  135. # Get configuration
  136. spread = self.config.get("spread", 3)
  137. brightness = self.config.get("brightness", 50) / 100.0
  138. color_hex = self.config.get("color", "#ffffff")
  139. # Convert hex color to RGB tuple
  140. color_hex = color_hex.lstrip('#')
  141. r = int(color_hex[0:2], 16)
  142. g = int(color_hex[2:4], 16)
  143. b = int(color_hex[4:6], 16)
  144. return {
  145. 'led_index': led_index,
  146. 'spread': spread,
  147. 'brightness': brightness,
  148. 'color': (r, g, b)
  149. }
  150. def update_config(self, config: Dict):
  151. """Update configuration at runtime"""
  152. self.config.update(config)
  153. logger.info(f"Ball tracking config updated: {config}")
  154. logger.info(f"Current reversed setting: {self.config.get('reversed', False)}")
  155. def get_status(self) -> Dict:
  156. """Get current tracking status"""
  157. return {
  158. "active": self._active,
  159. "buffer_size": len(self.position_buffer),
  160. "last_led_index": self._last_led_index,
  161. "config": self.config
  162. }