dw_led_controller.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638
  1. """
  2. Dune Weaver LED Controller - Embedded NeoPixel LED controller for Raspberry Pi
  3. Provides direct GPIO control of WS2812B LED strips with beautiful effects
  4. """
  5. import threading
  6. import time
  7. import logging
  8. from typing import Optional, Dict, List, Tuple
  9. from .dw_leds.segment import Segment
  10. from .dw_leds.effects.basic_effects import get_effect, get_all_effects, FRAMETIME
  11. from .dw_leds.utils.palettes import get_palette_name, PALETTE_NAMES
  12. from .dw_leds.utils.colors import rgb_to_color
  13. logger = logging.getLogger(__name__)
  14. class DWLEDController:
  15. """Dune Weaver LED Controller for NeoPixel LED strips"""
  16. def __init__(self, num_leds: int = 60, gpio_pin: int = 12, brightness: float = 0.35,
  17. pixel_order: str = "GRB", speed: int = 128, intensity: int = 128):
  18. """
  19. Initialize Dune Weaver LED controller
  20. Args:
  21. num_leds: Number of LEDs in the strip
  22. gpio_pin: GPIO pin number (BCM numbering: 12, 13, 18, or 19)
  23. brightness: Global brightness (0.0 - 1.0)
  24. pixel_order: Pixel color order (GRB, RGB, RGBW, GRBW)
  25. speed: Effect speed 0-255 (default: 128)
  26. intensity: Effect intensity 0-255 (default: 128)
  27. """
  28. self.num_leds = num_leds
  29. self.gpio_pin = gpio_pin
  30. self.brightness = brightness
  31. self.pixel_order = pixel_order
  32. # State
  33. self._powered_on = False
  34. self._current_effect_id = 0
  35. self._current_palette_id = 0
  36. self._speed = speed
  37. self._intensity = intensity
  38. self._color1 = (255, 0, 0) # Red (primary)
  39. self._color2 = (0, 0, 0) # Black (background/off)
  40. self._color3 = (0, 0, 255) # Blue (tertiary)
  41. # Threading
  42. self._pixels = None
  43. self._segment = None
  44. self._effect_thread = None
  45. self._stop_thread = threading.Event()
  46. self._lock = threading.Lock()
  47. self._initialized = False
  48. self._init_error = None # Store initialization error message
  49. def _initialize_hardware(self):
  50. """Lazy initialization of NeoPixel hardware"""
  51. if self._initialized:
  52. return True
  53. try:
  54. import board
  55. import neopixel
  56. # Map GPIO pin numbers to board pins
  57. pin_map = {
  58. 12: board.D12,
  59. 13: board.D13,
  60. 18: board.D18,
  61. 19: board.D19
  62. }
  63. if self.gpio_pin not in pin_map:
  64. error_msg = f"Invalid GPIO pin {self.gpio_pin}. Must be 12, 13, 18, or 19 (PWM-capable pins)"
  65. self._init_error = error_msg
  66. logger.error(error_msg)
  67. return False
  68. board_pin = pin_map[self.gpio_pin]
  69. # Initialize NeoPixel strip
  70. self._pixels = neopixel.NeoPixel(
  71. board_pin,
  72. self.num_leds,
  73. brightness=self.brightness,
  74. auto_write=False,
  75. pixel_order=self.pixel_order
  76. )
  77. # Create segment for the entire strip
  78. self._segment = Segment(self._pixels, 0, self.num_leds)
  79. self._segment.speed = self._speed
  80. self._segment.intensity = self._intensity
  81. self._segment.palette_id = self._current_palette_id
  82. # Set colors
  83. self._segment.colors[0] = rgb_to_color(*self._color1)
  84. self._segment.colors[1] = rgb_to_color(*self._color2)
  85. self._segment.colors[2] = rgb_to_color(*self._color3)
  86. self._initialized = True
  87. logger.info(f"DW LEDs initialized: {self.num_leds} LEDs on GPIO {self.gpio_pin}")
  88. return True
  89. except ImportError as e:
  90. error_msg = f"Failed to import NeoPixel libraries: {e}. Make sure adafruit-circuitpython-neopixel and Adafruit-Blinka are installed."
  91. self._init_error = error_msg
  92. logger.error(error_msg)
  93. return False
  94. except Exception as e:
  95. error_msg = f"Failed to initialize NeoPixel hardware: {e}"
  96. self._init_error = error_msg
  97. logger.error(error_msg)
  98. return False
  99. def _effect_loop(self):
  100. """Background thread that runs the current effect"""
  101. while not self._stop_thread.is_set():
  102. try:
  103. with self._lock:
  104. if self._pixels and self._segment and self._powered_on:
  105. # Get current effect function (allows dynamic effect switching)
  106. effect_func = get_effect(self._current_effect_id)
  107. # Run effect and get delay
  108. delay_ms = effect_func(self._segment)
  109. # Update pixels
  110. self._pixels.show()
  111. # Increment call counter
  112. self._segment.call += 1
  113. else:
  114. delay_ms = 100 # Idle delay when off
  115. # Sleep for the effect's requested delay
  116. time.sleep(delay_ms / 1000.0)
  117. except Exception as e:
  118. logger.error(f"Error in effect loop: {e}")
  119. time.sleep(0.1)
  120. def set_power(self, state: int) -> Dict:
  121. """
  122. Set power state
  123. Args:
  124. state: 0=Off, 1=On, 2=Toggle
  125. Returns:
  126. Dict with status
  127. """
  128. # DEBUG: Log power state changes with caller info
  129. import traceback
  130. if state == 0:
  131. stack_summary = ''.join(traceback.format_stack()[-4:-1])
  132. logger.warning(f"LED POWER OFF called from:\n{stack_summary}")
  133. elif state == 1:
  134. logger.info("LED POWER ON called")
  135. if not self._initialize_hardware():
  136. return {
  137. "connected": False,
  138. "error": self._init_error or "Failed to initialize LED hardware"
  139. }
  140. with self._lock:
  141. if state == 2: # Toggle
  142. self._powered_on = not self._powered_on
  143. else:
  144. self._powered_on = bool(state)
  145. # Turn off all pixels immediately when powering off
  146. if not self._powered_on and self._pixels:
  147. self._pixels.fill((0, 0, 0))
  148. self._pixels.show()
  149. # Start effect thread if not running
  150. if self._powered_on and (self._effect_thread is None or not self._effect_thread.is_alive()):
  151. self._stop_thread.clear()
  152. self._effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
  153. self._effect_thread.start()
  154. return {
  155. "connected": True,
  156. "power_on": self._powered_on,
  157. "message": f"Power {'on' if self._powered_on else 'off'}"
  158. }
  159. def set_brightness(self, value: int) -> Dict:
  160. """
  161. Set global brightness
  162. Args:
  163. value: Brightness 0-100
  164. Returns:
  165. Dict with status
  166. """
  167. if not self._initialized:
  168. if not self._initialize_hardware():
  169. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  170. brightness = max(0.0, min(1.0, value / 100.0))
  171. with self._lock:
  172. self.brightness = brightness
  173. if self._pixels:
  174. self._pixels.brightness = brightness
  175. return {
  176. "connected": True,
  177. "brightness": int(brightness * 100),
  178. "message": "Brightness updated"
  179. }
  180. def set_color(self, r: int, g: int, b: int) -> Dict:
  181. """
  182. Set solid color (sets effect to Static and color1)
  183. Args:
  184. r, g, b: RGB values 0-255
  185. Returns:
  186. Dict with status
  187. """
  188. if not self._initialized:
  189. if not self._initialize_hardware():
  190. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  191. with self._lock:
  192. self._color1 = (r, g, b)
  193. if self._segment:
  194. self._segment.colors[0] = rgb_to_color(r, g, b)
  195. # Switch to static effect
  196. self._current_effect_id = 0
  197. self._segment.reset()
  198. # Auto power on when setting color
  199. if not self._powered_on:
  200. self._powered_on = True
  201. # Ensure effect thread is running
  202. if self._effect_thread is None or not self._effect_thread.is_alive():
  203. self._stop_thread.clear()
  204. self._effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
  205. self._effect_thread.start()
  206. return {
  207. "connected": True,
  208. "color": [r, g, b],
  209. "power_on": self._powered_on,
  210. "message": "Color set"
  211. }
  212. def set_colors(self, color1: Optional[Tuple[int, int, int]] = None,
  213. color2: Optional[Tuple[int, int, int]] = None,
  214. color3: Optional[Tuple[int, int, int]] = None) -> Dict:
  215. """
  216. Set effect colors (does not change effect or auto-power on)
  217. Args:
  218. color1: Primary color RGB tuple (0-255)
  219. color2: Secondary/background color RGB tuple (0-255)
  220. color3: Tertiary color RGB tuple (0-255)
  221. Returns:
  222. Dict with status
  223. """
  224. if not self._initialized:
  225. if not self._initialize_hardware():
  226. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  227. colors_set = []
  228. with self._lock:
  229. if color1 is not None:
  230. self._color1 = color1
  231. if self._segment:
  232. self._segment.colors[0] = rgb_to_color(*color1)
  233. colors_set.append(f"color1={color1}")
  234. if color2 is not None:
  235. self._color2 = color2
  236. if self._segment:
  237. self._segment.colors[1] = rgb_to_color(*color2)
  238. colors_set.append(f"color2={color2}")
  239. if color3 is not None:
  240. self._color3 = color3
  241. if self._segment:
  242. self._segment.colors[2] = rgb_to_color(*color3)
  243. colors_set.append(f"color3={color3}")
  244. # Reset effect to apply new colors
  245. if self._segment and colors_set:
  246. self._segment.reset()
  247. return {
  248. "connected": True,
  249. "colors": {
  250. "color1": self._color1,
  251. "color2": self._color2,
  252. "color3": self._color3
  253. },
  254. "message": f"Colors updated: {', '.join(colors_set)}"
  255. }
  256. def set_effect(self, effect_id: int, speed: Optional[int] = None,
  257. intensity: Optional[int] = None) -> Dict:
  258. """
  259. Set active effect
  260. Args:
  261. effect_id: Effect ID (0-15)
  262. speed: Optional speed override (0-255)
  263. intensity: Optional intensity override (0-255)
  264. Returns:
  265. Dict with status
  266. """
  267. if not self._initialized:
  268. if not self._initialize_hardware():
  269. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  270. # Validate effect ID
  271. effects = get_all_effects()
  272. if not any(eid == effect_id for eid, _ in effects):
  273. return {
  274. "connected": False,
  275. "message": f"Invalid effect ID: {effect_id}"
  276. }
  277. with self._lock:
  278. self._current_effect_id = effect_id
  279. if speed is not None:
  280. self._speed = max(0, min(255, speed))
  281. if self._segment:
  282. self._segment.speed = self._speed
  283. if intensity is not None:
  284. self._intensity = max(0, min(255, intensity))
  285. if self._segment:
  286. self._segment.intensity = self._intensity
  287. # Reset effect state
  288. if self._segment:
  289. self._segment.reset()
  290. # Auto power on when setting effect
  291. if not self._powered_on:
  292. self._powered_on = True
  293. # Ensure effect thread is running
  294. if self._effect_thread is None or not self._effect_thread.is_alive():
  295. self._stop_thread.clear()
  296. self._effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
  297. self._effect_thread.start()
  298. effect_name = next(name for eid, name in effects if eid == effect_id)
  299. return {
  300. "connected": True,
  301. "effect_id": effect_id,
  302. "effect_name": effect_name,
  303. "power_on": self._powered_on,
  304. "message": f"Effect set to {effect_name}"
  305. }
  306. def set_palette(self, palette_id: int) -> Dict:
  307. """
  308. Set color palette
  309. Args:
  310. palette_id: Palette ID (0-58)
  311. Returns:
  312. Dict with status
  313. """
  314. if not self._initialized:
  315. if not self._initialize_hardware():
  316. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  317. if palette_id < 0 or palette_id >= len(PALETTE_NAMES):
  318. return {
  319. "connected": False,
  320. "message": f"Invalid palette ID: {palette_id}"
  321. }
  322. with self._lock:
  323. self._current_palette_id = palette_id
  324. if self._segment:
  325. self._segment.palette_id = palette_id
  326. # Auto power on when setting palette
  327. if not self._powered_on:
  328. self._powered_on = True
  329. # Ensure effect thread is running
  330. if self._effect_thread is None or not self._effect_thread.is_alive():
  331. self._stop_thread.clear()
  332. self._effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
  333. self._effect_thread.start()
  334. palette_name = get_palette_name(palette_id)
  335. return {
  336. "connected": True,
  337. "palette_id": palette_id,
  338. "palette_name": palette_name,
  339. "power_on": self._powered_on,
  340. "message": f"Palette set to {palette_name}"
  341. }
  342. def set_speed(self, speed: int) -> Dict:
  343. """Set effect speed (0-255)"""
  344. if not self._initialized:
  345. if not self._initialize_hardware():
  346. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  347. speed = max(0, min(255, speed))
  348. with self._lock:
  349. self._speed = speed
  350. if self._segment:
  351. self._segment.speed = speed
  352. # Reset effect state so speed change takes effect immediately
  353. self._segment.reset()
  354. return {
  355. "connected": True,
  356. "speed": speed,
  357. "message": "Speed updated"
  358. }
  359. def set_intensity(self, intensity: int) -> Dict:
  360. """Set effect intensity (0-255)"""
  361. if not self._initialized:
  362. if not self._initialize_hardware():
  363. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  364. intensity = max(0, min(255, intensity))
  365. with self._lock:
  366. self._intensity = intensity
  367. if self._segment:
  368. self._segment.intensity = intensity
  369. # Reset effect state so intensity change takes effect immediately
  370. self._segment.reset()
  371. return {
  372. "connected": True,
  373. "intensity": intensity,
  374. "message": "Intensity updated"
  375. }
  376. def get_effects(self) -> List[Tuple[int, str]]:
  377. """Get list of all available effects"""
  378. return get_all_effects()
  379. def get_palettes(self) -> List[Tuple[int, str]]:
  380. """Get list of all available palettes"""
  381. return [(i, name) for i, name in enumerate(PALETTE_NAMES)]
  382. def check_status(self) -> Dict:
  383. """Get current controller status"""
  384. # Attempt initialization if not already initialized
  385. if not self._initialized:
  386. self._initialize_hardware()
  387. # Get color slots from segment if available
  388. colors = []
  389. if self._segment and hasattr(self._segment, 'colors'):
  390. for color_int in self._segment.colors[:3]: # Get up to 3 colors
  391. # Convert integer color to hex string
  392. r = (color_int >> 16) & 0xFF
  393. g = (color_int >> 8) & 0xFF
  394. b = color_int & 0xFF
  395. colors.append(f"#{r:02x}{g:02x}{b:02x}")
  396. else:
  397. colors = ["#ff0000", "#000000", "#0000ff"] # Defaults
  398. status = {
  399. "connected": self._initialized,
  400. "power_on": self._powered_on,
  401. "num_leds": self.num_leds,
  402. "gpio_pin": self.gpio_pin,
  403. "brightness": int(self.brightness * 100),
  404. "current_effect": self._current_effect_id,
  405. "current_palette": self._current_palette_id,
  406. "speed": self._speed,
  407. "intensity": self._intensity,
  408. "colors": colors,
  409. "effect_running": self._effect_thread is not None and self._effect_thread.is_alive()
  410. }
  411. # Include error message if not initialized
  412. if not self._initialized and self._init_error:
  413. status["error"] = self._init_error
  414. return status
  415. def stop(self):
  416. """Stop the effect loop and cleanup"""
  417. self._stop_thread.set()
  418. if self._effect_thread and self._effect_thread.is_alive():
  419. self._effect_thread.join(timeout=1.0)
  420. with self._lock:
  421. if self._pixels:
  422. self._pixels.fill((0, 0, 0))
  423. self._pixels.show()
  424. self._pixels.deinit()
  425. self._pixels = None
  426. self._segment = None
  427. self._initialized = False
  428. # Helper functions for pattern manager integration
  429. def effect_loading(controller: DWLEDController) -> bool:
  430. """Show loading effect (Rainbow Cycle)"""
  431. try:
  432. controller.set_power(1)
  433. controller.set_effect(8, speed=100) # Rainbow Cycle
  434. return True
  435. except Exception as e:
  436. logger.error(f"Error setting loading effect: {e}")
  437. return False
  438. def effect_idle(controller: DWLEDController, effect_settings: Optional[dict] = None) -> bool:
  439. """Show idle effect with full settings"""
  440. try:
  441. if effect_settings and isinstance(effect_settings, dict):
  442. # New format: full settings dict
  443. controller.set_power(1)
  444. # Set effect
  445. effect_id = effect_settings.get("effect_id", 0)
  446. palette_id = effect_settings.get("palette_id", 0)
  447. speed = effect_settings.get("speed", 128)
  448. intensity = effect_settings.get("intensity", 128)
  449. controller.set_effect(effect_id, speed=speed, intensity=intensity)
  450. controller.set_palette(palette_id)
  451. # Set colors if provided
  452. color1 = effect_settings.get("color1")
  453. if color1:
  454. # Convert hex to RGB
  455. r1 = int(color1[1:3], 16)
  456. g1 = int(color1[3:5], 16)
  457. b1 = int(color1[5:7], 16)
  458. color2 = effect_settings.get("color2", "#000000")
  459. r2 = int(color2[1:3], 16)
  460. g2 = int(color2[3:5], 16)
  461. b2 = int(color2[5:7], 16)
  462. color3 = effect_settings.get("color3", "#0000ff")
  463. r3 = int(color3[1:3], 16)
  464. g3 = int(color3[3:5], 16)
  465. b3 = int(color3[5:7], 16)
  466. controller.set_colors(
  467. color1=(r1, g1, b1),
  468. color2=(r2, g2, b2),
  469. color3=(r3, g3, b3)
  470. )
  471. return True
  472. # Default: do nothing (keep current LED state)
  473. return True
  474. except Exception as e:
  475. logger.error(f"Error setting idle effect: {e}")
  476. return False
  477. def effect_connected(controller: DWLEDController) -> bool:
  478. """Show connected effect (green flash)"""
  479. try:
  480. controller.set_power(1)
  481. controller.set_color(0, 255, 0) # Green
  482. controller.set_effect(1, speed=200, intensity=128) # Blink effect
  483. time.sleep(1.0)
  484. return True
  485. except Exception as e:
  486. logger.error(f"Error setting connected effect: {e}")
  487. return False
  488. def effect_playing(controller: DWLEDController, effect_settings: Optional[dict] = None) -> bool:
  489. """Show playing effect with full settings"""
  490. try:
  491. if effect_settings and isinstance(effect_settings, dict):
  492. # New format: full settings dict
  493. controller.set_power(1)
  494. # Set effect
  495. effect_id = effect_settings.get("effect_id", 0)
  496. palette_id = effect_settings.get("palette_id", 0)
  497. speed = effect_settings.get("speed", 128)
  498. intensity = effect_settings.get("intensity", 128)
  499. controller.set_effect(effect_id, speed=speed, intensity=intensity)
  500. controller.set_palette(palette_id)
  501. # Set colors if provided
  502. color1 = effect_settings.get("color1")
  503. if color1:
  504. # Convert hex to RGB
  505. r1 = int(color1[1:3], 16)
  506. g1 = int(color1[3:5], 16)
  507. b1 = int(color1[5:7], 16)
  508. color2 = effect_settings.get("color2", "#000000")
  509. r2 = int(color2[1:3], 16)
  510. g2 = int(color2[3:5], 16)
  511. b2 = int(color2[5:7], 16)
  512. color3 = effect_settings.get("color3", "#0000ff")
  513. r3 = int(color3[1:3], 16)
  514. g3 = int(color3[3:5], 16)
  515. b3 = int(color3[5:7], 16)
  516. controller.set_colors(
  517. color1=(r1, g1, b1),
  518. color2=(r2, g2, b2),
  519. color3=(r3, g3, b3)
  520. )
  521. return True
  522. # Default: do nothing (keep current LED state)
  523. return True
  524. except Exception as e:
  525. logger.error(f"Error setting playing effect: {e}")
  526. return False