dw_led_controller.py 24 KB


  1. """
  2. Dune Weaver LED Controller - Embedded NeoPixel LED controller for Raspberry Pi
  3. Provides direct GPIO control of WS2812B LED strips with beautiful effects
  4. """
  5. import threading
  6. import time
  7. import logging
  8. from typing import Optional, Dict, List, Tuple
  9. from .dw_leds.segment import Segment
  10. from .dw_leds.effects.basic_effects import get_effect, get_all_effects, FRAMETIME
  11. from .dw_leds.utils.palettes import get_palette_name, PALETTE_NAMES
  12. from .dw_leds.utils.colors import rgb_to_color
  13. logger = logging.getLogger(__name__)
  14. class DWLEDController:
  15. """Dune Weaver LED Controller for NeoPixel LED strips"""
  16. def __init__(self, num_leds: int = 60, gpio_pin: int = 18, brightness: float = 0.35,
  17. pixel_order: str = "GRB", speed: int = 128, intensity: int = 128):
  18. """
  19. Initialize Dune Weaver LED controller
  20. Args:
  21. num_leds: Number of LEDs in the strip
  22. gpio_pin: GPIO pin number (BCM numbering: 12, 13, 18, or 19)
  23. brightness: Global brightness (0.0 - 1.0)
  24. pixel_order: Pixel color order (GRB, RGB, RGBW, GRBW)
  25. speed: Effect speed 0-255 (default: 128)
  26. intensity: Effect intensity 0-255 (default: 128)
  27. """
  28. self.num_leds = num_leds
  29. self.gpio_pin = gpio_pin
  30. self.brightness = brightness
  31. self.pixel_order = pixel_order
  32. # State
  33. self._powered_on = False
  34. self._current_effect_id = 8
  35. self._current_palette_id = 0
  36. self._speed = speed
  37. self._intensity = intensity
  38. self._color1 = (255, 0, 0) # Red (primary)
  39. self._color2 = (0, 0, 0) # Black (background/off)
  40. self._color3 = (0, 0, 255) # Blue (tertiary)
  41. # Threading
  42. self._pixels = None
  43. self._segment = None
  44. self._effect_thread = None
  45. self._stop_thread = threading.Event()
  46. self._lock = threading.Lock()
  47. self._initialized = False
  48. self._init_error = None # Store initialization error message
  49. def _initialize_hardware(self):
  50. """Lazy initialization of NeoPixel hardware"""
  51. if self._initialized:
  52. return True
  53. # Try standard NeoPixel library first (works on Pi 4 and earlier)
  54. # If that fails, fall back to Pi 5-specific library
  55. neopixel_module = None
  56. using_pi5_library = False
  57. try:
  58. import board
  59. import neopixel
  60. neopixel_module = neopixel
  61. logger.info("Using standard NeoPixel library")
  62. except (ImportError, RuntimeError) as e:
  63. logger.warning(f"Standard NeoPixel library failed: {e}. Trying Pi 5 library...")
  64. try:
  65. import board
  66. from adafruit_blinka_raspberry_pi5_neopixel import neopixel as neopixel_pi5
  67. neopixel_module = neopixel_pi5
  68. using_pi5_library = True
  69. logger.info("Using Adafruit Pi 5 NeoPixel library (PIO-based)")
  70. except ImportError as e2:
  71. error_msg = (
  72. f"Failed to import NeoPixel libraries. "
  73. f"Standard library error: {e}. "
  74. f"Pi 5 library error: {e2}. "
  75. f"For Pi 4 and earlier: pip install adafruit-circuitpython-neopixel adafruit-blinka. "
  76. f"For Pi 5: pip install Adafruit-Blinka-Raspberry-Pi5-Neopixel"
  77. )
  78. self._init_error = error_msg
  79. logger.error(error_msg)
  80. return False
  81. try:
  82. # Map GPIO pin numbers to board pins
  83. pin_map = {
  84. 12: board.D12,
  85. 13: board.D13,
  86. 18: board.D18,
  87. 19: board.D19
  88. }
  89. if self.gpio_pin not in pin_map:
  90. error_msg = f"Invalid GPIO pin {self.gpio_pin}. Must be 12, 13, 18, or 19 (PWM-capable pins)"
  91. self._init_error = error_msg
  92. logger.error(error_msg)
  93. return False
  94. board_pin = pin_map[self.gpio_pin]
  95. # Initialize NeoPixel strip
  96. self._pixels = neopixel_module.NeoPixel(
  97. board_pin,
  98. self.num_leds,
  99. brightness=self.brightness,
  100. auto_write=False,
  101. pixel_order=self.pixel_order
  102. )
  103. # Create segment for the entire strip
  104. self._segment = Segment(self._pixels, 0, self.num_leds)
  105. self._segment.speed = self._speed
  106. self._segment.intensity = self._intensity
  107. self._segment.palette_id = self._current_palette_id
  108. # Set colors
  109. self._segment.colors[0] = rgb_to_color(*self._color1)
  110. self._segment.colors[1] = rgb_to_color(*self._color2)
  111. self._segment.colors[2] = rgb_to_color(*self._color3)
  112. self._initialized = True
  113. library_type = "Pi 5 (PIO)" if using_pi5_library else "standard"
  114. logger.info(f"DW LEDs initialized: {self.num_leds} LEDs on GPIO {self.gpio_pin} using {library_type} library")
  115. return True
  116. except Exception as e:
  117. error_msg = f"Failed to initialize NeoPixel hardware: {e}"
  118. self._init_error = error_msg
  119. logger.error(error_msg)
  120. return False
  121. def _effect_loop(self):
  122. """Background thread that runs the current effect"""
  123. # Elevate priority and pin to CPU 0 for consistent timing
  124. # LED uses lower priority (40) than motion (60) since CNC is more critical
  125. from modules.core import scheduling
  126. scheduling.setup_realtime_thread(priority=40)
  127. while not self._stop_thread.is_set():
  128. try:
  129. with self._lock:
  130. if self._pixels and self._segment and self._powered_on:
  131. # Get current effect function (allows dynamic effect switching)
  132. effect_func = get_effect(self._current_effect_id)
  133. # Run effect and get delay
  134. delay_ms = effect_func(self._segment)
  135. # Update pixels
  136. self._pixels.show()
  137. # Increment call counter
  138. self._segment.call += 1
  139. else:
  140. delay_ms = 100 # Idle delay when off
  141. # Sleep for the effect's requested delay
  142. time.sleep(delay_ms / 1000.0)
  143. except Exception as e:
  144. logger.error(f"Error in effect loop: {e}")
  145. time.sleep(0.1)
  146. def set_power(self, state: int) -> Dict:
  147. """
  148. Set power state
  149. Args:
  150. state: 0=Off, 1=On, 2=Toggle
  151. Returns:
  152. Dict with status
  153. """
  154. if not self._initialize_hardware():
  155. return {
  156. "connected": False,
  157. "error": self._init_error or "Failed to initialize LED hardware"
  158. }
  159. with self._lock:
  160. if state == 2: # Toggle
  161. self._powered_on = not self._powered_on
  162. else:
  163. self._powered_on = bool(state)
  164. # Turn off all pixels immediately when powering off
  165. if not self._powered_on and self._pixels:
  166. self._pixels.fill((0, 0, 0))
  167. self._pixels.show()
  168. # Start effect thread if not running
  169. if self._powered_on and (self._effect_thread is None or not self._effect_thread.is_alive()):
  170. self._stop_thread.clear()
  171. self._effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
  172. self._effect_thread.start()
  173. return {
  174. "connected": True,
  175. "power_on": self._powered_on,
  176. "message": f"Power {'on' if self._powered_on else 'off'}"
  177. }
  178. def set_brightness(self, value: int) -> Dict:
  179. """
  180. Set global brightness
  181. Args:
  182. value: Brightness 0-100
  183. Returns:
  184. Dict with status
  185. """
  186. if not self._initialized:
  187. if not self._initialize_hardware():
  188. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  189. brightness = max(0.0, min(1.0, value / 100.0))
  190. with self._lock:
  191. self.brightness = brightness
  192. if self._pixels:
  193. self._pixels.brightness = brightness
  194. return {
  195. "connected": True,
  196. "brightness": int(brightness * 100),
  197. "message": "Brightness updated"
  198. }
  199. def set_color(self, r: int, g: int, b: int) -> Dict:
  200. """
  201. Set solid color (sets effect to Static and color1)
  202. Args:
  203. r, g, b: RGB values 0-255
  204. Returns:
  205. Dict with status
  206. """
  207. if not self._initialized:
  208. if not self._initialize_hardware():
  209. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  210. with self._lock:
  211. self._color1 = (r, g, b)
  212. if self._segment:
  213. self._segment.colors[0] = rgb_to_color(r, g, b)
  214. # Switch to static effect
  215. self._current_effect_id = 0
  216. self._segment.reset()
  217. # Auto power on when setting color
  218. if not self._powered_on:
  219. self._powered_on = True
  220. # Ensure effect thread is running
  221. if self._effect_thread is None or not self._effect_thread.is_alive():
  222. self._stop_thread.clear()
  223. self._effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
  224. self._effect_thread.start()
  225. return {
  226. "connected": True,
  227. "color": [r, g, b],
  228. "power_on": self._powered_on,
  229. "message": "Color set"
  230. }
  231. def set_colors(self, color1: Optional[Tuple[int, int, int]] = None,
  232. color2: Optional[Tuple[int, int, int]] = None,
  233. color3: Optional[Tuple[int, int, int]] = None) -> Dict:
  234. """
  235. Set effect colors (does not change effect or auto-power on)
  236. Args:
  237. color1: Primary color RGB tuple (0-255)
  238. color2: Secondary/background color RGB tuple (0-255)
  239. color3: Tertiary color RGB tuple (0-255)
  240. Returns:
  241. Dict with status
  242. """
  243. if not self._initialized:
  244. if not self._initialize_hardware():
  245. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  246. colors_set = []
  247. with self._lock:
  248. if color1 is not None:
  249. self._color1 = color1
  250. if self._segment:
  251. self._segment.colors[0] = rgb_to_color(*color1)
  252. colors_set.append(f"color1={color1}")
  253. if color2 is not None:
  254. self._color2 = color2
  255. if self._segment:
  256. self._segment.colors[1] = rgb_to_color(*color2)
  257. colors_set.append(f"color2={color2}")
  258. if color3 is not None:
  259. self._color3 = color3
  260. if self._segment:
  261. self._segment.colors[2] = rgb_to_color(*color3)
  262. colors_set.append(f"color3={color3}")
  263. # Reset effect to apply new colors
  264. if self._segment and colors_set:
  265. self._segment.reset()
  266. return {
  267. "connected": True,
  268. "colors": {
  269. "color1": self._color1,
  270. "color2": self._color2,
  271. "color3": self._color3
  272. },
  273. "message": f"Colors updated: {', '.join(colors_set)}"
  274. }
  275. def set_effect(self, effect_id: int, speed: Optional[int] = None,
  276. intensity: Optional[int] = None) -> Dict:
  277. """
  278. Set active effect
  279. Args:
  280. effect_id: Effect ID (0-15)
  281. speed: Optional speed override (0-255)
  282. intensity: Optional intensity override (0-255)
  283. Returns:
  284. Dict with status
  285. """
  286. if not self._initialized:
  287. if not self._initialize_hardware():
  288. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  289. # Validate effect ID
  290. effects = get_all_effects()
  291. if not any(eid == effect_id for eid, _ in effects):
  292. return {
  293. "connected": False,
  294. "message": f"Invalid effect ID: {effect_id}"
  295. }
  296. with self._lock:
  297. self._current_effect_id = effect_id
  298. if speed is not None:
  299. self._speed = max(0, min(255, speed))
  300. if self._segment:
  301. self._segment.speed = self._speed
  302. if intensity is not None:
  303. self._intensity = max(0, min(255, intensity))
  304. if self._segment:
  305. self._segment.intensity = self._intensity
  306. # Reset effect state
  307. if self._segment:
  308. self._segment.reset()
  309. # Auto power on when setting effect
  310. if not self._powered_on:
  311. self._powered_on = True
  312. # Ensure effect thread is running
  313. if self._effect_thread is None or not self._effect_thread.is_alive():
  314. self._stop_thread.clear()
  315. self._effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
  316. self._effect_thread.start()
  317. effect_name = next(name for eid, name in effects if eid == effect_id)
  318. return {
  319. "connected": True,
  320. "effect_id": effect_id,
  321. "effect_name": effect_name,
  322. "power_on": self._powered_on,
  323. "message": f"Effect set to {effect_name}"
  324. }
  325. def set_palette(self, palette_id: int) -> Dict:
  326. """
  327. Set color palette
  328. Args:
  329. palette_id: Palette ID (0-58)
  330. Returns:
  331. Dict with status
  332. """
  333. if not self._initialized:
  334. if not self._initialize_hardware():
  335. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  336. if palette_id < 0 or palette_id >= len(PALETTE_NAMES):
  337. return {
  338. "connected": False,
  339. "message": f"Invalid palette ID: {palette_id}"
  340. }
  341. with self._lock:
  342. self._current_palette_id = palette_id
  343. if self._segment:
  344. self._segment.palette_id = palette_id
  345. # Auto power on when setting palette
  346. if not self._powered_on:
  347. self._powered_on = True
  348. # Ensure effect thread is running
  349. if self._effect_thread is None or not self._effect_thread.is_alive():
  350. self._stop_thread.clear()
  351. self._effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
  352. self._effect_thread.start()
  353. palette_name = get_palette_name(palette_id)
  354. return {
  355. "connected": True,
  356. "palette_id": palette_id,
  357. "palette_name": palette_name,
  358. "power_on": self._powered_on,
  359. "message": f"Palette set to {palette_name}"
  360. }
  361. def set_speed(self, speed: int) -> Dict:
  362. """Set effect speed (0-255)"""
  363. if not self._initialized:
  364. if not self._initialize_hardware():
  365. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  366. speed = max(0, min(255, speed))
  367. with self._lock:
  368. self._speed = speed
  369. if self._segment:
  370. self._segment.speed = speed
  371. # Reset effect state so speed change takes effect immediately
  372. self._segment.reset()
  373. return {
  374. "connected": True,
  375. "speed": speed,
  376. "message": "Speed updated"
  377. }
  378. def set_intensity(self, intensity: int) -> Dict:
  379. """Set effect intensity (0-255)"""
  380. if not self._initialized:
  381. if not self._initialize_hardware():
  382. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  383. intensity = max(0, min(255, intensity))
  384. with self._lock:
  385. self._intensity = intensity
  386. if self._segment:
  387. self._segment.intensity = intensity
  388. # Reset effect state so intensity change takes effect immediately
  389. self._segment.reset()
  390. return {
  391. "connected": True,
  392. "intensity": intensity,
  393. "message": "Intensity updated"
  394. }
  395. def get_effects(self) -> List[Tuple[int, str]]:
  396. """Get list of all available effects"""
  397. return get_all_effects()
  398. def get_palettes(self) -> List[Tuple[int, str]]:
  399. """Get list of all available palettes"""
  400. return [(i, name) for i, name in enumerate(PALETTE_NAMES)]
  401. def check_status(self) -> Dict:
  402. """Get current controller status"""
  403. # Attempt initialization if not already initialized
  404. if not self._initialized:
  405. self._initialize_hardware()
  406. # Get color slots from segment if available
  407. colors = []
  408. if self._segment and hasattr(self._segment, 'colors'):
  409. for color_int in self._segment.colors[:3]: # Get up to 3 colors
  410. # Convert integer color to hex string
  411. r = (color_int >> 16) & 0xFF
  412. g = (color_int >> 8) & 0xFF
  413. b = color_int & 0xFF
  414. colors.append(f"#{r:02x}{g:02x}{b:02x}")
  415. else:
  416. colors = ["#ff0000", "#000000", "#0000ff"] # Defaults
  417. status = {
  418. "connected": self._initialized,
  419. "power_on": self._powered_on,
  420. "num_leds": self.num_leds,
  421. "gpio_pin": self.gpio_pin,
  422. "brightness": int(self.brightness * 100),
  423. "current_effect": self._current_effect_id,
  424. "current_palette": self._current_palette_id,
  425. "speed": self._speed,
  426. "intensity": self._intensity,
  427. "colors": colors,
  428. "effect_running": self._effect_thread is not None and self._effect_thread.is_alive()
  429. }
  430. # Include error message if not initialized
  431. if not self._initialized and self._init_error:
  432. status["error"] = self._init_error
  433. return status
  434. def stop(self):
  435. """Stop the effect loop and cleanup"""
  436. self._stop_thread.set()
  437. if self._effect_thread and self._effect_thread.is_alive():
  438. self._effect_thread.join(timeout=1.0)
  439. with self._lock:
  440. if self._pixels:
  441. self._pixels.fill((0, 0, 0))
  442. self._pixels.show()
  443. self._pixels.deinit()
  444. self._pixels = None
  445. self._segment = None
  446. self._initialized = False
  447. # Helper functions for pattern manager integration
  448. def effect_loading(controller: DWLEDController) -> bool:
  449. """Show loading effect (Rainbow Cycle)"""
  450. try:
  451. controller.set_power(1)
  452. controller.set_effect(8, speed=100) # Rainbow Cycle
  453. return True
  454. except Exception as e:
  455. logger.error(f"Error setting loading effect: {e}")
  456. return False
  457. def effect_idle(controller: DWLEDController, effect_settings: Optional[dict] = None) -> bool:
  458. """Show idle effect with full settings. If no effect configured, plays Rainbow with current parameters."""
  459. try:
  460. controller.set_power(1)
  461. if effect_settings and isinstance(effect_settings, dict):
  462. # Configured idle effect: apply full settings
  463. effect_id = effect_settings.get("effect_id", 0)
  464. palette_id = effect_settings.get("palette_id", 0)
  465. speed = effect_settings.get("speed", 128)
  466. intensity = effect_settings.get("intensity", 128)
  467. controller.set_effect(effect_id, speed=speed, intensity=intensity)
  468. controller.set_palette(palette_id)
  469. # Set colors if provided
  470. color1 = effect_settings.get("color1")
  471. if color1:
  472. # Convert hex to RGB
  473. r1 = int(color1[1:3], 16)
  474. g1 = int(color1[3:5], 16)
  475. b1 = int(color1[5:7], 16)
  476. color2 = effect_settings.get("color2", "#000000")
  477. r2 = int(color2[1:3], 16)
  478. g2 = int(color2[3:5], 16)
  479. b2 = int(color2[5:7], 16)
  480. color3 = effect_settings.get("color3", "#0000ff")
  481. r3 = int(color3[1:3], 16)
  482. g3 = int(color3[3:5], 16)
  483. b3 = int(color3[5:7], 16)
  484. controller.set_colors(
  485. color1=(r1, g1, b1),
  486. color2=(r2, g2, b2),
  487. color3=(r3, g3, b3)
  488. )
  489. else:
  490. # Default: Rainbow effect with current controller parameters
  491. controller.set_effect(8) # Rainbow - uses controller's current speed/intensity
  492. return True
  493. except Exception as e:
  494. logger.error(f"Error setting idle effect: {e}")
  495. return False
  496. def effect_connected(controller: DWLEDController) -> bool:
  497. """Show connected effect (green flash)"""
  498. try:
  499. controller.set_power(1)
  500. controller.set_color(0, 255, 0) # Green
  501. controller.set_effect(1, speed=200, intensity=128) # Blink effect
  502. time.sleep(1.0)
  503. return True
  504. except Exception as e:
  505. logger.error(f"Error setting connected effect: {e}")
  506. return False
  507. def effect_playing(controller: DWLEDController, effect_settings: Optional[dict] = None) -> bool:
  508. """Show playing effect with full settings"""
  509. try:
  510. if effect_settings and isinstance(effect_settings, dict):
  511. # New format: full settings dict
  512. controller.set_power(1)
  513. # Set effect
  514. effect_id = effect_settings.get("effect_id", 0)
  515. palette_id = effect_settings.get("palette_id", 0)
  516. speed = effect_settings.get("speed", 128)
  517. intensity = effect_settings.get("intensity", 128)
  518. controller.set_effect(effect_id, speed=speed, intensity=intensity)
  519. controller.set_palette(palette_id)
  520. # Set colors if provided
  521. color1 = effect_settings.get("color1")
  522. if color1:
  523. # Convert hex to RGB
  524. r1 = int(color1[1:3], 16)
  525. g1 = int(color1[3:5], 16)
  526. b1 = int(color1[5:7], 16)
  527. color2 = effect_settings.get("color2", "#000000")
  528. r2 = int(color2[1:3], 16)
  529. g2 = int(color2[3:5], 16)
  530. b2 = int(color2[5:7], 16)
  531. color3 = effect_settings.get("color3", "#0000ff")
  532. r3 = int(color3[1:3], 16)
  533. g3 = int(color3[3:5], 16)
  534. b3 = int(color3[5:7], 16)
  535. controller.set_colors(
  536. color1=(r1, g1, b1),
  537. color2=(r2, g2, b2),
  538. color3=(r3, g3, b3)
  539. )
  540. return True
  541. # Default: do nothing (keep current LED state)
  542. return True
  543. except Exception as e:
  544. logger.error(f"Error setting playing effect: {e}")
  545. return False