led_controller.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. import requests
  2. import json
  3. from typing import Dict, Optional
  4. import time
  5. import logging
  6. logger = logging.getLogger(__name__)
  7. class LEDController:
  8. def __init__(self, ip_address: Optional[str] = None):
  9. self.ip_address = ip_address
  10. def _get_base_url(self) -> str:
  11. """Get base URL for WLED JSON API"""
  12. if not self.ip_address:
  13. raise ValueError("No WLED IP configured")
  14. return f"http://{self.ip_address}/json"
  15. def set_ip(self, ip_address: str) -> None:
  16. """Update the WLED IP address"""
  17. self.ip_address = ip_address
  18. def _send_command(self, state_params: Dict = None) -> Dict:
  19. """Send command to WLED and return status"""
  20. try:
  21. url = self._get_base_url()
  22. # First check current state
  23. response = requests.get(f"{url}/state", timeout=2)
  24. response.raise_for_status()
  25. current_state = response.json()
  26. # If WLED is off and we're trying to set something, turn it on first
  27. if not current_state.get('on', False) and state_params and 'on' not in state_params:
  28. # Turn on power first
  29. requests.post(f"{url}/state", json={"on": True}, timeout=2)
  30. # Now send the actual command if there are parameters
  31. if state_params:
  32. response = requests.post(f"{url}/state", json=state_params, timeout=2)
  33. response.raise_for_status()
  34. response = requests.get(f"{url}/state", timeout=2)
  35. response.raise_for_status()
  36. current_state = response.json()
  37. preset_id = current_state.get('ps', -1)
  38. playlist_id = current_state.get('pl', -1)
  39. # Use True as default since WLED is typically on when responding
  40. is_on = current_state.get('on', True)
  41. return {
  42. "connected": True,
  43. "is_on": is_on,
  44. "preset_id": preset_id,
  45. "playlist_id": playlist_id,
  46. "brightness": current_state.get('bri', 0),
  47. "message": "WLED is ON" if is_on else "WLED is OFF"
  48. }
  49. except ValueError as e:
  50. return {"connected": False, "message": str(e)}
  51. except requests.RequestException as e:
  52. return {"connected": False, "message": f"Cannot connect to WLED: {str(e)}"}
  53. except json.JSONDecodeError as e:
  54. return {"connected": False, "message": f"Error parsing WLED response: {str(e)}"}
  55. def check_wled_status(self) -> Dict:
  56. """Check WLED connection status and brightness"""
  57. return self._send_command()
  58. def set_brightness(self, value: int) -> Dict:
  59. """Set WLED brightness (0-255)"""
  60. if not 0 <= value <= 255:
  61. return {"connected": False, "message": "Brightness must be between 0 and 255"}
  62. return self._send_command({"bri": value})
  63. def set_power(self, state: int) -> Dict:
  64. """Set WLED power state (0=Off, 1=On, 2=Toggle)"""
  65. if state not in [0, 1, 2]:
  66. return {"connected": False, "message": "Power state must be 0 (Off), 1 (On), or 2 (Toggle)"}
  67. if state == 2:
  68. return self._send_command({"on": "t"}) # Toggle
  69. return self._send_command({"on": bool(state)})
  70. def _hex_to_rgb(self, hex_color: str) -> tuple:
  71. """Convert hex color string to RGB tuple"""
  72. hex_color = hex_color.lstrip('#')
  73. if len(hex_color) != 6:
  74. raise ValueError("Hex color must be 6 characters long (without #)")
  75. return tuple(int(hex_color[i:i+2], 16) for i in (0, 2, 4))
  76. def set_color(self, r: int = None, g: int = None, b: int = None, w: int = None, hex: str = None) -> Dict:
  77. """Set WLED color using RGB(W) values or hex color code"""
  78. if hex is not None:
  79. try:
  80. r, g, b = self._hex_to_rgb(hex)
  81. except ValueError as e:
  82. return {"connected": False, "message": str(e)}
  83. # Prepare segment with color
  84. seg = {"col": [[r or 0, g or 0, b or 0]]}
  85. if w is not None:
  86. if not 0 <= w <= 255:
  87. return {"connected": False, "message": "White value must be between 0 and 255"}
  88. seg["col"][0].append(w)
  89. return self._send_command({"seg": [seg]})
  90. def set_effect(self, effect_index: int, speed: int = None, intensity: int = None,
  91. brightness: int = None, palette: int = None,
  92. # Primary color
  93. r: int = None, g: int = None, b: int = None, w: int = None, hex: str = None,
  94. # Secondary color
  95. r2: int = None, g2: int = None, b2: int = None, w2: int = None, hex2: str = None,
  96. # Transition
  97. transition: int = 0) -> Dict:
  98. """
  99. Set WLED effect with optional parameters
  100. Args:
  101. effect_index: Effect index (0-101)
  102. speed: Effect speed (0-255)
  103. intensity: Effect intensity (0-255)
  104. brightness: LED brightness (0-255)
  105. palette: FastLED palette index (0-46)
  106. r, g, b: Primary RGB color values (0-255)
  107. w: Primary White value for RGBW (0-255)
  108. hex: Primary hex color code (e.g., '#ff0000' or 'ff0000')
  109. r2, g2, b2: Secondary RGB color values (0-255)
  110. w2: Secondary White value for RGBW (0-255)
  111. hex2: Secondary hex color code
  112. transition: Duration of crossfade in 100ms units (e.g. 7 = 700ms). Default 0 for instant change.
  113. """
  114. try:
  115. effect_index = int(effect_index)
  116. except (ValueError, TypeError):
  117. return {"connected": False, "message": "Effect index must be a valid integer between 0 and 101"}
  118. if not 0 <= effect_index <= 101:
  119. return {"connected": False, "message": "Effect index must be between 0 and 101"}
  120. # Convert primary hex to RGB if provided
  121. if hex is not None:
  122. try:
  123. r, g, b = self._hex_to_rgb(hex)
  124. except ValueError as e:
  125. return {"connected": False, "message": f"Primary color: {str(e)}"}
  126. # Convert secondary hex to RGB if provided
  127. if hex2 is not None:
  128. try:
  129. r2, g2, b2 = self._hex_to_rgb(hex2)
  130. except ValueError as e:
  131. return {"connected": False, "message": f"Secondary color: {str(e)}"}
  132. # Build segment parameters
  133. seg = {"fx": effect_index}
  134. if speed is not None:
  135. if not 0 <= speed <= 255:
  136. return {"connected": False, "message": "Speed must be between 0 and 255"}
  137. seg["sx"] = speed
  138. if intensity is not None:
  139. if not 0 <= intensity <= 255:
  140. return {"connected": False, "message": "Intensity must be between 0 and 255"}
  141. seg["ix"] = intensity
  142. # Prepare colors array
  143. colors = []
  144. # Add primary color
  145. primary = [r or 0, g or 0, b or 0]
  146. if w is not None:
  147. if not 0 <= w <= 255:
  148. return {"connected": False, "message": "Primary white value must be between 0 and 255"}
  149. primary.append(w)
  150. colors.append(primary)
  151. # Add secondary color if any secondary color parameter is provided
  152. if any(x is not None for x in [r2, g2, b2, w2, hex2]):
  153. secondary = [r2 or 0, g2 or 0, b2 or 0]
  154. if w2 is not None:
  155. if not 0 <= w2 <= 255:
  156. return {"connected": False, "message": "Secondary white value must be between 0 and 255"}
  157. secondary.append(w2)
  158. colors.append(secondary)
  159. if colors:
  160. seg["col"] = colors
  161. if palette is not None:
  162. if not 0 <= palette <= 46:
  163. return {"connected": False, "message": "Palette index must be between 0 and 46"}
  164. seg["pal"] = palette
  165. # Combine with global parameters
  166. state = {"seg": [seg], "transition": transition}
  167. if brightness is not None:
  168. if not 0 <= brightness <= 255:
  169. return {"connected": False, "message": "Brightness must be between 0 and 255"}
  170. state["bri"] = brightness
  171. return self._send_command(state)
  172. def set_preset(self, preset_id: int) -> bool:
  173. preset_id = int(preset_id)
  174. # Send the command and get response
  175. response = self._send_command({"ps": preset_id})
  176. logger.debug(response)
  177. return response
  178. def effect_loading(led_controller: LEDController):
  179. res = led_controller.set_effect(47, hex='#ffa000', hex2='#000000', palette=0, speed=150, intensity=150)
  180. if res.get('is_on', False):
  181. return True
  182. else:
  183. return False
  184. def effect_idle(led_controller: LEDController):
  185. led_controller.set_preset(1)
  186. def effect_connected(led_controller: LEDController):
  187. res = led_controller.set_effect(0, hex='#08ff00', brightness=100)
  188. time.sleep(1)
  189. led_controller.set_effect(0, brightness=0) # Turn off
  190. time.sleep(0.5)
  191. res = led_controller.set_effect(0, hex='#08ff00', brightness=100)
  192. time.sleep(1)
  193. effect_idle(led_controller)
  194. if res.get('is_on', False):
  195. return True
  196. else:
  197. return False
  198. def effect_playing(led_controller: LEDController):
  199. led_controller.set_preset(2)