dw_led_controller.py 15 KB


  1. """
  2. Dune Weaver LED Controller - Embedded NeoPixel LED controller for Raspberry Pi
  3. Provides direct GPIO control of WS2812B LED strips with beautiful effects
  4. """
  5. import threading
  6. import time
  7. import logging
  8. from typing import Optional, Dict, List, Tuple
  9. from .dw_leds.segment import Segment
  10. from .dw_leds.effects.basic_effects import get_effect, get_all_effects, FRAMETIME
  11. from .dw_leds.utils.palettes import get_palette_name, PALETTE_NAMES
  12. from .dw_leds.utils.colors import rgb_to_color
  13. logger = logging.getLogger(__name__)
  14. class DWLEDController:
  15. """Dune Weaver LED Controller for NeoPixel LED strips"""
  16. def __init__(self, num_leds: int = 60, gpio_pin: int = 12, brightness: float = 0.35,
  17. pixel_order: str = "GRB"):
  18. """
  19. Initialize Dune Weaver LED controller
  20. Args:
  21. num_leds: Number of LEDs in the strip
  22. gpio_pin: GPIO pin number (BCM numbering: 12, 13, 18, or 19)
  23. brightness: Global brightness (0.0 - 1.0)
  24. pixel_order: Pixel color order (GRB, RGB, RGBW, GRBW)
  25. """
  26. self.num_leds = num_leds
  27. self.gpio_pin = gpio_pin
  28. self.brightness = brightness
  29. self.pixel_order = pixel_order
  30. # State
  31. self._powered_on = False
  32. self._current_effect_id = 0
  33. self._current_palette_id = 0
  34. self._speed = 128
  35. self._intensity = 128
  36. self._color1 = (255, 0, 0) # Red
  37. self._color2 = (0, 0, 255) # Blue
  38. self._color3 = (0, 255, 0) # Green
  39. # Threading
  40. self._pixels = None
  41. self._segment = None
  42. self._effect_thread = None
  43. self._stop_thread = threading.Event()
  44. self._lock = threading.Lock()
  45. self._initialized = False
  46. self._init_error = None # Store initialization error message
  47. def _initialize_hardware(self):
  48. """Lazy initialization of NeoPixel hardware"""
  49. if self._initialized:
  50. return True
  51. try:
  52. import board
  53. import neopixel
  54. # Map GPIO pin numbers to board pins
  55. pin_map = {
  56. 12: board.D12,
  57. 13: board.D13,
  58. 18: board.D18,
  59. 19: board.D19
  60. }
  61. if self.gpio_pin not in pin_map:
  62. error_msg = f"Invalid GPIO pin {self.gpio_pin}. Must be 12, 13, 18, or 19 (PWM-capable pins)"
  63. self._init_error = error_msg
  64. logger.error(error_msg)
  65. return False
  66. board_pin = pin_map[self.gpio_pin]
  67. # Initialize NeoPixel strip
  68. self._pixels = neopixel.NeoPixel(
  69. board_pin,
  70. self.num_leds,
  71. brightness=self.brightness,
  72. auto_write=False,
  73. pixel_order=self.pixel_order
  74. )
  75. # Create segment for the entire strip
  76. self._segment = Segment(self._pixels, 0, self.num_leds)
  77. self._segment.speed = self._speed
  78. self._segment.intensity = self._intensity
  79. self._segment.palette_id = self._current_palette_id
  80. # Set colors
  81. self._segment.colors[0] = rgb_to_color(*self._color1)
  82. self._segment.colors[1] = rgb_to_color(*self._color2)
  83. self._segment.colors[2] = rgb_to_color(*self._color3)
  84. self._initialized = True
  85. logger.info(f"DW LEDs initialized: {self.num_leds} LEDs on GPIO {self.gpio_pin}")
  86. return True
  87. except ImportError as e:
  88. error_msg = f"Failed to import NeoPixel libraries: {e}. Make sure adafruit-circuitpython-neopixel and Adafruit-Blinka are installed."
  89. self._init_error = error_msg
  90. logger.error(error_msg)
  91. return False
  92. except Exception as e:
  93. error_msg = f"Failed to initialize NeoPixel hardware: {e}"
  94. self._init_error = error_msg
  95. logger.error(error_msg)
  96. return False
  97. def _effect_loop(self):
  98. """Background thread that runs the current effect"""
  99. effect_func = get_effect(self._current_effect_id)
  100. while not self._stop_thread.is_set():
  101. try:
  102. with self._lock:
  103. if self._pixels and self._segment and self._powered_on:
  104. # Run effect and get delay
  105. delay_ms = effect_func(self._segment)
  106. # Update pixels
  107. self._pixels.show()
  108. # Increment call counter
  109. self._segment.call += 1
  110. else:
  111. delay_ms = 100 # Idle delay when off
  112. # Sleep for the effect's requested delay
  113. time.sleep(delay_ms / 1000.0)
  114. except Exception as e:
  115. logger.error(f"Error in effect loop: {e}")
  116. time.sleep(0.1)
  117. def set_power(self, state: int) -> Dict:
  118. """
  119. Set power state
  120. Args:
  121. state: 0=Off, 1=On, 2=Toggle
  122. Returns:
  123. Dict with status
  124. """
  125. if not self._initialize_hardware():
  126. return {
  127. "connected": False,
  128. "error": self._init_error or "Failed to initialize LED hardware"
  129. }
  130. with self._lock:
  131. if state == 2: # Toggle
  132. self._powered_on = not self._powered_on
  133. else:
  134. self._powered_on = bool(state)
  135. # Turn off all pixels immediately when powering off
  136. if not self._powered_on and self._pixels:
  137. self._pixels.fill((0, 0, 0))
  138. self._pixels.show()
  139. # Start effect thread if not running
  140. if self._powered_on and (self._effect_thread is None or not self._effect_thread.is_alive()):
  141. self._stop_thread.clear()
  142. self._effect_thread = threading.Thread(target=self._effect_loop, daemon=True)
  143. self._effect_thread.start()
  144. return {
  145. "connected": True,
  146. "power_on": self._powered_on,
  147. "message": f"Power {'on' if self._powered_on else 'off'}"
  148. }
  149. def set_brightness(self, value: int) -> Dict:
  150. """
  151. Set global brightness
  152. Args:
  153. value: Brightness 0-100
  154. Returns:
  155. Dict with status
  156. """
  157. if not self._initialized:
  158. if not self._initialize_hardware():
  159. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  160. brightness = max(0.0, min(1.0, value / 100.0))
  161. with self._lock:
  162. self.brightness = brightness
  163. if self._pixels:
  164. self._pixels.brightness = brightness
  165. return {
  166. "connected": True,
  167. "brightness": int(brightness * 100),
  168. "message": "Brightness updated"
  169. }
  170. def set_color(self, r: int, g: int, b: int) -> Dict:
  171. """
  172. Set solid color (sets effect to Static and color1)
  173. Args:
  174. r, g, b: RGB values 0-255
  175. Returns:
  176. Dict with status
  177. """
  178. if not self._initialized:
  179. if not self._initialize_hardware():
  180. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  181. with self._lock:
  182. self._color1 = (r, g, b)
  183. if self._segment:
  184. self._segment.colors[0] = rgb_to_color(r, g, b)
  185. # Switch to static effect
  186. self._current_effect_id = 0
  187. self._segment.reset()
  188. return {
  189. "connected": True,
  190. "color": [r, g, b],
  191. "message": "Color set"
  192. }
  193. def set_effect(self, effect_id: int, speed: Optional[int] = None,
  194. intensity: Optional[int] = None) -> Dict:
  195. """
  196. Set active effect
  197. Args:
  198. effect_id: Effect ID (0-15)
  199. speed: Optional speed override (0-255)
  200. intensity: Optional intensity override (0-255)
  201. Returns:
  202. Dict with status
  203. """
  204. if not self._initialized:
  205. if not self._initialize_hardware():
  206. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  207. # Validate effect ID
  208. effects = get_all_effects()
  209. if not any(eid == effect_id for eid, _ in effects):
  210. return {
  211. "connected": False,
  212. "message": f"Invalid effect ID: {effect_id}"
  213. }
  214. with self._lock:
  215. self._current_effect_id = effect_id
  216. if speed is not None:
  217. self._speed = max(0, min(255, speed))
  218. if self._segment:
  219. self._segment.speed = self._speed
  220. if intensity is not None:
  221. self._intensity = max(0, min(255, intensity))
  222. if self._segment:
  223. self._segment.intensity = self._intensity
  224. # Reset effect state
  225. if self._segment:
  226. self._segment.reset()
  227. effect_name = next(name for eid, name in effects if eid == effect_id)
  228. return {
  229. "connected": True,
  230. "effect_id": effect_id,
  231. "effect_name": effect_name,
  232. "message": f"Effect set to {effect_name}"
  233. }
  234. def set_palette(self, palette_id: int) -> Dict:
  235. """
  236. Set color palette
  237. Args:
  238. palette_id: Palette ID (0-58)
  239. Returns:
  240. Dict with status
  241. """
  242. if not self._initialized:
  243. if not self._initialize_hardware():
  244. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  245. if palette_id < 0 or palette_id >= len(PALETTE_NAMES):
  246. return {
  247. "connected": False,
  248. "message": f"Invalid palette ID: {palette_id}"
  249. }
  250. with self._lock:
  251. self._current_palette_id = palette_id
  252. if self._segment:
  253. self._segment.palette_id = palette_id
  254. palette_name = get_palette_name(palette_id)
  255. return {
  256. "connected": True,
  257. "palette_id": palette_id,
  258. "palette_name": palette_name,
  259. "message": f"Palette set to {palette_name}"
  260. }
  261. def set_speed(self, speed: int) -> Dict:
  262. """Set effect speed (0-255)"""
  263. if not self._initialized:
  264. if not self._initialize_hardware():
  265. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  266. speed = max(0, min(255, speed))
  267. with self._lock:
  268. self._speed = speed
  269. if self._segment:
  270. self._segment.speed = speed
  271. return {
  272. "connected": True,
  273. "speed": speed,
  274. "message": "Speed updated"
  275. }
  276. def set_intensity(self, intensity: int) -> Dict:
  277. """Set effect intensity (0-255)"""
  278. if not self._initialized:
  279. if not self._initialize_hardware():
  280. return {"connected": False, "error": self._init_error or "Hardware not initialized"}
  281. intensity = max(0, min(255, intensity))
  282. with self._lock:
  283. self._intensity = intensity
  284. if self._segment:
  285. self._segment.intensity = intensity
  286. return {
  287. "connected": True,
  288. "intensity": intensity,
  289. "message": "Intensity updated"
  290. }
  291. def get_effects(self) -> List[Tuple[int, str]]:
  292. """Get list of all available effects"""
  293. return get_all_effects()
  294. def get_palettes(self) -> List[Tuple[int, str]]:
  295. """Get list of all available palettes"""
  296. return [(i, name) for i, name in enumerate(PALETTE_NAMES)]
  297. def check_status(self) -> Dict:
  298. """Get current controller status"""
  299. status = {
  300. "connected": self._initialized,
  301. "power_on": self._powered_on,
  302. "num_leds": self.num_leds,
  303. "gpio_pin": self.gpio_pin,
  304. "brightness": int(self.brightness * 100),
  305. "current_effect": self._current_effect_id,
  306. "current_palette": self._current_palette_id,
  307. "speed": self._speed,
  308. "intensity": self._intensity,
  309. "effect_running": self._effect_thread is not None and self._effect_thread.is_alive()
  310. }
  311. # Include error message if not initialized
  312. if not self._initialized and self._init_error:
  313. status["error"] = self._init_error
  314. return status
  315. def stop(self):
  316. """Stop the effect loop and cleanup"""
  317. self._stop_thread.set()
  318. if self._effect_thread and self._effect_thread.is_alive():
  319. self._effect_thread.join(timeout=1.0)
  320. with self._lock:
  321. if self._pixels:
  322. self._pixels.fill((0, 0, 0))
  323. self._pixels.show()
  324. self._pixels.deinit()
  325. self._pixels = None
  326. self._segment = None
  327. self._initialized = False
  328. # Helper functions for pattern manager integration
  329. def effect_loading(controller: DWLEDController) -> bool:
  330. """Show loading effect (Rainbow Cycle)"""
  331. try:
  332. controller.set_power(1)
  333. controller.set_effect(8, speed=100) # Rainbow Cycle
  334. return True
  335. except Exception as e:
  336. logger.error(f"Error setting loading effect: {e}")
  337. return False
  338. def effect_idle(controller: DWLEDController, effect_name: Optional[str] = None) -> bool:
  339. """Show idle effect"""
  340. try:
  341. if effect_name and effect_name.lower() != "off":
  342. # Try to find effect by name
  343. effects = controller.get_effects()
  344. for eid, name in effects:
  345. if name.lower() == effect_name.lower():
  346. controller.set_power(1)
  347. controller.set_effect(eid)
  348. return True
  349. # Default: turn off
  350. controller.set_power(0)
  351. return True
  352. except Exception as e:
  353. logger.error(f"Error setting idle effect: {e}")
  354. return False
  355. def effect_connected(controller: DWLEDController) -> bool:
  356. """Show connected effect (green flash)"""
  357. try:
  358. controller.set_power(1)
  359. controller.set_color(0, 255, 0) # Green
  360. controller.set_effect(1, speed=200, intensity=128) # Blink effect
  361. time.sleep(1.0)
  362. return True
  363. except Exception as e:
  364. logger.error(f"Error setting connected effect: {e}")
  365. return False
  366. def effect_playing(controller: DWLEDController, effect_name: Optional[str] = None) -> bool:
  367. """Show playing effect"""
  368. try:
  369. if effect_name and effect_name.lower() != "off":
  370. # Try to find effect by name
  371. effects = controller.get_effects()
  372. for eid, name in effects:
  373. if name.lower() == effect_name.lower():
  374. controller.set_power(1)
  375. controller.set_effect(eid)
  376. return True
  377. else:
  378. # Default: turn off
  379. controller.set_power(0)
  380. return True
  381. except Exception as e:
  382. logger.error(f"Error setting playing effect: {e}")
  383. return False