dw_led_controller.py 17 KB


  1. """
  2. Dune Weaver LED Controller - Embedded NeoPixel LED controller for Raspberry Pi
  3. Provides direct GPIO control of WS2812B LED strips with beautiful effects
  4. """
  5. import threading
  6. import time
  7. import logging
  8. from typing import Optional, Dict, List, Tuple
  9. from .dw_leds.segment import Segment
  10. from .dw_leds.effects.basic_effects import get_effect, get_all_effects, FRAMETIME
  11. from .dw_leds.utils.palettes import get_palette_name, PALETTE_NAMES
  12. from .dw_leds.utils.colors import rgb_to_color
  13. logger = logging.getLogger(__name__)
  14. class DWLEDController:
  15. """Dune Weaver LED Controller for NeoPixel LED strips"""
  16. def __init__(self, num_leds: int = 60, gpio_pin: int = 12, brightness: float = 0.35,
  17. pixel_order: str = "GRB"):
  18. """
  19. Initialize Dune Weaver LED controller
  20. Args:
  21. num_leds: Number of LEDs in the strip
  22. gpio_pin: GPIO pin number (BCM numbering: 12, 13, 18, or 19)
  23. brightness: Global brightness (0.0 - 1.0)
  24. pixel_order: Pixel color order (GRB, RGB, RGBW, GRBW)
  25. """
  26. self.num_leds = num_leds
  27. self.gpio_pin = gpio_pin
  28. self.brightness = brightness
  29. self.pixel_order = pixel_order
  30. # State
  31. self._powered_on = False
  32. self._current_effect_id = 0
  33. self._current_palette_id = 0
  34. self._speed = 128
  35. self._intensity = 128
  36. self._color1 = (255, 0, 0) # Red
  37. self._color2 = (0, 0, 255) # Blue
  38. self._color3 = (0, 255, 0) # Green
  39. # Threading
  40. self._pixels = None
  41. self._segment = None
  42. self._effect_thread = None
  43. self._stop_thread = threading.Event()
  44. self._lock = threading.Lock()
  45. self._initialized = False
  46. self._init_error = None # Store initialization error message
  47. def _initialize_hardware(self):
  48. """Lazy initialization of NeoPixel hardware"""
  49. if self._initialized:
  50. return True
  51. try:
  52. import board
  53. import neopixel
  54. # Map GPIO pin numbers to board pins
  55. pin_map = {
  56. 12: board.D12,
  57. 13: board.D13,
  58. 18: board.D18,
  59. 19: board.D19
  60. }
  61. if self.gpio_pin not in pin_map:
  62. error_msg = f"Invalid GPIO pin {self.gpio_pin}. Must be 12, 13, 18, or 19 (PWM-capable pins)"
  63. self._init_error = error_msg
  64. logger.error(error_msg)
  65. return False
  66. board_pin = pin_map[self.gpio_pin]
  67. # Initialize NeoPixel strip
  68. self._pixels = neopixel.NeoPixel(
  69. board_pin,
  70. self.num_leds,
  71. brightness=self.brightness,
  72. auto_write=False,
  73. pixel_order=self.pixel_order
  74. )
  75. # Create segment for the entire strip
  76. self._segment = Segment(self._pixels, 0, self.num_leds)
  77. self._segment.speed = self._speed
  78. self._segment.intensity = self._intensity
  79. self._segment.palette_id = self._current_palette_id
  80. # Set colors
  81. self._segment.colors[0] = rgb_to_color(*self._color1)
  82. self._segment.colors[1] = rgb_to_color(*self._color2)
  83. self._segment.colors[2] = rgb_to_color(*self._color3)
  84. self._initialized = True
  85. logger.info(f"DW LEDs initialized: {self.num_leds} LEDs on GPIO {self.gpio_pin}")
  86. return True
  87. except ImportError as e:
  88. error_msg = f"Failed to import NeoPixel libraries: {e}. Make sure adafruit-circuitpython-neopixel and Adafruit-Blinka are installed."
  89. self._init_error = error_msg
  90. logger.error(error_msg)
  91. return False
  92. except Exception as e:
  93. error_msg = f"Failed to initialize NeoPixel hardware: {e}"
  94. self._init_error = error_msg
  95. logger.error(error_msg)
  96. return False
  97. def _effect_loop(self):
  98. """Background thread that runs the current effect"""
  99. while not self._stop_thread.is_set():
  100. try:
  101. with self._lock:
  102. if self._pixels and self._segment and self._powered_on:
  103. # Get current effect function (allows dynamic effect switching)
  104. effect_func = get_effect(self._current_effect_id)
  105. # Run effect and get delay
  106. delay_ms = effect_func(self._segment)
  107. # Update pixels
  108. self._pixels.show()
  109. # Increment call counter
  110. self._segment.call += 1
  111. else:
  112. delay_ms = 100 # Idle delay when off
  113. # Sleep for the effect's requested delay
  114. time.sleep(delay_ms / 1000.0)
  115. except Exception as e:
  116. logger.error(f"Error in effect loop: {e}")
  117. time.sleep(0.1)
  118. def set_power(self, state: int) -> Dict:
  119. """
  120. Set power state
  121. Args:
  122. state: 0=Off, 1=On, 2=Toggle
  123. Returns:
  124. Dict with status
  125. """
  126. if not self._initialize_hardware():
  127. return {
  128. "connected": False,
  129. "error": self._init_error or "Failed to initialize LED hardware"
  130. }
  131. with self._lock:
  132. if state == 2: # Toggle
  133. self._powered_on = not self._powered_on
  134. else:
  135. self._powered_on = bool(state)
  136. # Turn off all pixels immediately when powering off
  137. if not self._powered_on and self._pixels:
  138. self._pixels.fill((0, 0, 0))
  139. self._pixels.show()
  140. # Start effect thread if not running
  141. if self._powered_on and (self._effect_thread is None or not self._effect_thread.is_alive()):
  142. self._stop_thread.clear()
  143. self._effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
  144. self._effect_thread.start()
  145. return {
  146. "connected": True,
  147. "power_on": self._powered_on,
  148. "message": f"Power {'on' if self._powered_on else 'off'}"
  149. }
  150. def set_brightness(self, value: int) -> Dict:
  151. """
  152. Set global brightness
  153. Args:
  154. value: Brightness 0-100
  155. Returns:
  156. Dict with status
  157. """
  158. if not self._initialized:
  159. if not self._initialize_hardware():
  160. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  161. brightness = max(0.0, min(1.0, value / 100.0))
  162. with self._lock:
  163. self.brightness = brightness
  164. if self._pixels:
  165. self._pixels.brightness = brightness
  166. return {
  167. "connected": True,
  168. "brightness": int(brightness * 100),
  169. "message": "Brightness updated"
  170. }
  171. def set_color(self, r: int, g: int, b: int) -> Dict:
  172. """
  173. Set solid color (sets effect to Static and color1)
  174. Args:
  175. r, g, b: RGB values 0-255
  176. Returns:
  177. Dict with status
  178. """
  179. if not self._initialized:
  180. if not self._initialize_hardware():
  181. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  182. with self._lock:
  183. self._color1 = (r, g, b)
  184. if self._segment:
  185. self._segment.colors[0] = rgb_to_color(r, g, b)
  186. # Switch to static effect
  187. self._current_effect_id = 0
  188. self._segment.reset()
  189. # Auto power on when setting color
  190. if not self._powered_on:
  191. self._powered_on = True
  192. # Ensure effect thread is running
  193. if self._effect_thread is None or not self._effect_thread.is_alive():
  194. self._stop_thread.clear()
  195. self._effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
  196. self._effect_thread.start()
  197. return {
  198. "connected": True,
  199. "color": [r, g, b],
  200. "power_on": self._powered_on,
  201. "message": "Color set"
  202. }
  203. def set_effect(self, effect_id: int, speed: Optional[int] = None,
  204. intensity: Optional[int] = None) -> Dict:
  205. """
  206. Set active effect
  207. Args:
  208. effect_id: Effect ID (0-15)
  209. speed: Optional speed override (0-255)
  210. intensity: Optional intensity override (0-255)
  211. Returns:
  212. Dict with status
  213. """
  214. if not self._initialized:
  215. if not self._initialize_hardware():
  216. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  217. # Validate effect ID
  218. effects = get_all_effects()
  219. if not any(eid == effect_id for eid, _ in effects):
  220. return {
  221. "connected": False,
  222. "message": f"Invalid effect ID: {effect_id}"
  223. }
  224. with self._lock:
  225. self._current_effect_id = effect_id
  226. if speed is not None:
  227. self._speed = max(0, min(255, speed))
  228. if self._segment:
  229. self._segment.speed = self._speed
  230. if intensity is not None:
  231. self._intensity = max(0, min(255, intensity))
  232. if self._segment:
  233. self._segment.intensity = self._intensity
  234. # Reset effect state
  235. if self._segment:
  236. self._segment.reset()
  237. # Auto power on when setting effect
  238. if not self._powered_on:
  239. self._powered_on = True
  240. # Ensure effect thread is running
  241. if self._effect_thread is None or not self._effect_thread.is_alive():
  242. self._stop_thread.clear()
  243. self._effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
  244. self._effect_thread.start()
  245. effect_name = next(name for eid, name in effects if eid == effect_id)
  246. return {
  247. "connected": True,
  248. "effect_id": effect_id,
  249. "effect_name": effect_name,
  250. "power_on": self._powered_on,
  251. "message": f"Effect set to {effect_name}"
  252. }
  253. def set_palette(self, palette_id: int) -> Dict:
  254. """
  255. Set color palette
  256. Args:
  257. palette_id: Palette ID (0-58)
  258. Returns:
  259. Dict with status
  260. """
  261. if not self._initialized:
  262. if not self._initialize_hardware():
  263. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  264. if palette_id < 0 or palette_id >= len(PALETTE_NAMES):
  265. return {
  266. "connected": False,
  267. "message": f"Invalid palette ID: {palette_id}"
  268. }
  269. with self._lock:
  270. self._current_palette_id = palette_id
  271. if self._segment:
  272. self._segment.palette_id = palette_id
  273. # Auto power on when setting palette
  274. if not self._powered_on:
  275. self._powered_on = True
  276. # Ensure effect thread is running
  277. if self._effect_thread is None or not self._effect_thread.is_alive():
  278. self._stop_thread.clear()
  279. self._effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
  280. self._effect_thread.start()
  281. palette_name = get_palette_name(palette_id)
  282. return {
  283. "connected": True,
  284. "palette_id": palette_id,
  285. "palette_name": palette_name,
  286. "power_on": self._powered_on,
  287. "message": f"Palette set to {palette_name}"
  288. }
  289. def set_speed(self, speed: int) -> Dict:
  290. """Set effect speed (0-255)"""
  291. if not self._initialized:
  292. if not self._initialize_hardware():
  293. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  294. speed = max(0, min(255, speed))
  295. with self._lock:
  296. self._speed = speed
  297. if self._segment:
  298. self._segment.speed = speed
  299. return {
  300. "connected": True,
  301. "speed": speed,
  302. "message": "Speed updated"
  303. }
  304. def set_intensity(self, intensity: int) -> Dict:
  305. """Set effect intensity (0-255)"""
  306. if not self._initialized:
  307. if not self._initialize_hardware():
  308. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  309. intensity = max(0, min(255, intensity))
  310. with self._lock:
  311. self._intensity = intensity
  312. if self._segment:
  313. self._segment.intensity = intensity
  314. return {
  315. "connected": True,
  316. "intensity": intensity,
  317. "message": "Intensity updated"
  318. }
  319. def get_effects(self) -> List[Tuple[int, str]]:
  320. """Get list of all available effects"""
  321. return get_all_effects()
  322. def get_palettes(self) -> List[Tuple[int, str]]:
  323. """Get list of all available palettes"""
  324. return [(i, name) for i, name in enumerate(PALETTE_NAMES)]
  325. def check_status(self) -> Dict:
  326. """Get current controller status"""
  327. # Attempt initialization if not already initialized
  328. if not self._initialized:
  329. self._initialize_hardware()
  330. status = {
  331. "connected": self._initialized,
  332. "power_on": self._powered_on,
  333. "num_leds": self.num_leds,
  334. "gpio_pin": self.gpio_pin,
  335. "brightness": int(self.brightness * 100),
  336. "current_effect": self._current_effect_id,
  337. "current_palette": self._current_palette_id,
  338. "speed": self._speed,
  339. "intensity": self._intensity,
  340. "effect_running": self._effect_thread is not None and self._effect_thread.is_alive()
  341. }
  342. # Include error message if not initialized
  343. if not self._initialized and self._init_error:
  344. status["error"] = self._init_error
  345. return status
  346. def stop(self):
  347. """Stop the effect loop and cleanup"""
  348. self._stop_thread.set()
  349. if self._effect_thread and self._effect_thread.is_alive():
  350. self._effect_thread.join(timeout=1.0)
  351. with self._lock:
  352. if self._pixels:
  353. self._pixels.fill((0, 0, 0))
  354. self._pixels.show()
  355. self._pixels.deinit()
  356. self._pixels = None
  357. self._segment = None
  358. self._initialized = False
  359. # Helper functions for pattern manager integration
  360. def effect_loading(controller: DWLEDController) -> bool:
  361. """Show loading effect (Rainbow Cycle)"""
  362. try:
  363. controller.set_power(1)
  364. controller.set_effect(8, speed=100) # Rainbow Cycle
  365. return True
  366. except Exception as e:
  367. logger.error(f"Error setting loading effect: {e}")
  368. return False
  369. def effect_idle(controller: DWLEDController, effect_name: Optional[str] = None) -> bool:
  370. """Show idle effect"""
  371. try:
  372. if effect_name and effect_name.lower() != "off":
  373. # Try to find effect by name
  374. effects = controller.get_effects()
  375. for eid, name in effects:
  376. if name.lower() == effect_name.lower():
  377. controller.set_power(1)
  378. controller.set_effect(eid)
  379. return True
  380. # Default: turn off
  381. controller.set_power(0)
  382. return True
  383. except Exception as e:
  384. logger.error(f"Error setting idle effect: {e}")
  385. return False
  386. def effect_connected(controller: DWLEDController) -> bool:
  387. """Show connected effect (green flash)"""
  388. try:
  389. controller.set_power(1)
  390. controller.set_color(0, 255, 0) # Green
  391. controller.set_effect(1, speed=200, intensity=128) # Blink effect
  392. time.sleep(1.0)
  393. return True
  394. except Exception as e:
  395. logger.error(f"Error setting connected effect: {e}")
  396. return False
  397. def effect_playing(controller: DWLEDController, effect_name: Optional[str] = None) -> bool:
  398. """Show playing effect"""
  399. try:
  400. if effect_name and effect_name.lower() != "off":
  401. # Try to find effect by name
  402. effects = controller.get_effects()
  403. for eid, name in effects:
  404. if name.lower() == effect_name.lower():
  405. controller.set_power(1)
  406. controller.set_effect(eid)
  407. return True
  408. else:
  409. # Default: turn off
  410. controller.set_power(0)
  411. return True
  412. except Exception as e:
  413. logger.error(f"Error setting playing effect: {e}")
  414. return False