1
0

handler.py 39 KB


  1. """Real MQTT handler implementation."""
  2. import os
  3. import threading
  4. import time
  5. import json
  6. from typing import Dict, Callable, List, Optional, Any
  7. import paho.mqtt.client as mqtt
  8. import logging
  9. import asyncio
  10. from functools import partial
  11. from .base import BaseMQTTHandler
  12. from modules.core.state import state
  13. from modules.core.pattern_manager import list_theta_rho_files
  14. from modules.core.playlist_manager import list_all_playlists
  15. logger = logging.getLogger(__name__)
  16. class MQTTHandler(BaseMQTTHandler):
  17. """Real implementation of MQTT handler."""
  def __init__(self, callback_registry: Dict[str, Callable]):
      """Load MQTT and Home Assistant settings from the environment.

      Args:
          callback_registry: Maps command names to the callables that
              ``on_message`` invokes for incoming MQTT commands.

      The paho client is only created when ``MQTT_BROKER`` is set.
      """
      env = os.environ.get

      # Broker connection settings
      self.broker = env('MQTT_BROKER')
      self.port = int(env('MQTT_PORT', '1883'))
      self.username = env('MQTT_USERNAME')
      self.password = env('MQTT_PASSWORD')
      self.client_id = env('MQTT_CLIENT_ID', 'dune_weaver')
      self.status_topic = env('MQTT_STATUS_TOPIC', 'dune_weaver/status')
      self.command_topic = env('MQTT_COMMAND_TOPIC', 'dune_weaver/command')
      self.status_interval = int(env('MQTT_STATUS_INTERVAL', '30'))

      # Commands are dispatched through this registry
      self.callback_registry = callback_registry

      # Bookkeeping for the periodic status thread
      self.running = False
      self.status_thread = None

      # Identity used for Home Assistant MQTT discovery
      self.discovery_prefix = env('MQTT_DISCOVERY_PREFIX', 'homeassistant')
      self.device_name = env('HA_DEVICE_NAME', 'Dune Weaver')
      self.device_id = env('HA_DEVICE_ID', 'dune_weaver')

      # Topics derived from the device id
      root = self.device_id
      self.running_state_topic = f"{root}/state/running"
      self.serial_state_topic = f"{root}/state/serial"
      self.completion_topic = f"{root}/state/completion"
      self.time_remaining_topic = f"{root}/state/time_remaining"
      self.pattern_select_topic = f"{root}/pattern/set"
      self.playlist_select_topic = f"{root}/playlist/set"
      self.speed_topic = f"{root}/speed/set"

      # LED command topics
      led_root = f"{root}/led"
      self.led_power_topic = f"{led_root}/power/set"
      self.led_brightness_topic = f"{led_root}/brightness/set"
      self.led_effect_topic = f"{led_root}/effect/set"
      self.led_speed_topic = f"{led_root}/speed/set"
      self.led_intensity_topic = f"{led_root}/intensity/set"
      self.led_color_topic = f"{led_root}/color/set"

      # Locally cached state
      self.current_file = ""
      self.is_running_state = False
      self.serial_state = ""
      self.patterns = []
      self.playlists = []

      # Only build a client when a broker has been configured
      if self.broker:
          mqtt_client = mqtt.Client(client_id=self.client_id)
          mqtt_client.on_connect = self.on_connect
          mqtt_client.on_message = self.on_message
          self.client = mqtt_client
          if self.username and self.password:
              mqtt_client.username_pw_set(self.username, self.password)

      # Give the shared state object a handle on this handler so that its
      # setters can push updates out over MQTT.
      self.state = state
      self.state.mqtt_handler = self

      # Remember the loop active at construction time; MQTT callbacks use it
      # to schedule coroutines.
      self.main_loop = asyncio.get_event_loop()
  def setup_ha_discovery(self):
      """Publish Home Assistant MQTT discovery configs for every entity.

      Does nothing when MQTT is disabled. The sensors, buttons, numbers and
      selects are always announced; the LED entities are announced only when
      ``state.led_provider`` is ``"dw_leds"``. Every config is sent through
      ``_publish_discovery`` in a fixed order.
      """
      if not self.is_enabled:
          return

      device = self.device_id
      label = self.device_name
      device_info = {
          "identifiers": [device],
          "name": label,
          "model": "Dune Weaver",
          "manufacturer": "DIY"
      }

      def button_availability(action):
          """Build the availability section for the pause/play buttons."""
          return {
              "topic": f"{device}/command/{action}/available",
              "payload_available": "true",
              "payload_not_available": "false"
          }

      # (component, config_type, config) in publication order
      core_entities = [
          ("sensor", "serial_state", {
              "name": f"{label} Serial State",
              "unique_id": f"{device}_serial_state",
              "state_topic": self.serial_state_topic,
              "device": device_info,
              "icon": "mdi:serial-port",
              "entity_category": "diagnostic"
          }),
          ("sensor", "running_state", {
              "name": f"{label} Running State",
              "unique_id": f"{device}_running_state",
              "state_topic": self.running_state_topic,
              "device": device_info,
              "icon": "mdi:machine",
              "entity_category": "diagnostic"
          }),
          ("button", "stop", {
              "name": "Stop pattern execution",
              "unique_id": f"{device}_stop",
              "command_topic": f"{device}/command/stop",
              "device": device_info,
              "icon": "mdi:stop",
              "entity_category": "config"
          }),
          ("button", "pause", {
              "name": "Pause pattern execution",
              "unique_id": f"{device}_pause",
              "command_topic": f"{device}/command/pause",
              "state_topic": f"{device}/command/pause/state",
              "device": device_info,
              "icon": "mdi:pause",
              "entity_category": "config",
              "enabled_by_default": True,
              "availability": button_availability("pause")
          }),
          ("button", "play", {
              "name": "Resume pattern execution",
              "unique_id": f"{device}_play",
              "command_topic": f"{device}/command/play",
              "state_topic": f"{device}/command/play/state",
              "device": device_info,
              "icon": "mdi:play",
              "entity_category": "config",
              "enabled_by_default": True,
              "availability": button_availability("play")
          }),
          ("number", "speed", {
              "name": f"{label} Speed",
              "unique_id": f"{device}_speed",
              "command_topic": self.speed_topic,
              "state_topic": f"{self.speed_topic}/state",
              "device": device_info,
              "icon": "mdi:speedometer",
              "mode": "box",
              "min": 50,
              "max": 2000,
              "step": 50
          }),
          ("select", "pattern", {
              "name": f"{label} Pattern",
              "unique_id": f"{device}_pattern",
              "command_topic": self.pattern_select_topic,
              "state_topic": f"{self.pattern_select_topic}/state",
              "options": self.patterns,
              "device": device_info,
              "icon": "mdi:draw"
          }),
          ("select", "playlist", {
              "name": f"{label} Playlist",
              "unique_id": f"{device}_playlist",
              "command_topic": self.playlist_select_topic,
              "state_topic": f"{self.playlist_select_topic}/state",
              "options": self.playlists,
              "device": device_info,
              "icon": "mdi:playlist-play"
          }),
          ("select", "playlist_mode", {
              "name": f"{label} Playlist Mode",
              "unique_id": f"{device}_playlist_mode",
              "command_topic": f"{device}/playlist/mode/set",
              "state_topic": f"{device}/playlist/mode/state",
              "options": ["single", "loop"],
              "device": device_info,
              "icon": "mdi:repeat",
              "entity_category": "config"
          }),
          ("number", "pause_time", {
              "name": f"{label} Playlist Pause Time",
              "unique_id": f"{device}_pause_time",
              "command_topic": f"{device}/playlist/pause_time/set",
              "state_topic": f"{device}/playlist/pause_time/state",
              "device": device_info,
              "icon": "mdi:timer",
              "entity_category": "config",
              "mode": "box",
              "unit_of_measurement": "seconds",
              "min": 0,
              "max": 86400,
          }),
          ("select", "clear_pattern", {
              "name": f"{label} Clear Pattern",
              "unique_id": f"{device}_clear_pattern",
              "command_topic": f"{device}/playlist/clear_pattern/set",
              "state_topic": f"{device}/playlist/clear_pattern/state",
              "options": ["none", "random", "adaptive", "clear_from_in", "clear_from_out", "clear_sideway"],
              "device": device_info,
              "icon": "mdi:eraser",
              "entity_category": "config"
          }),
          ("sensor", "completion", {
              "name": f"{label} Completion",
              "unique_id": f"{device}_completion",
              "state_topic": self.completion_topic,
              "device": device_info,
              "icon": "mdi:progress-clock",
              "unit_of_measurement": "%",
              "state_class": "measurement",
              "entity_category": "diagnostic"
          }),
          ("sensor", "time_remaining", {
              "name": f"{label} Time Remaining",
              "unique_id": f"{device}_time_remaining",
              "state_topic": self.time_remaining_topic,
              "device": device_info,
              "icon": "mdi:timer-sand",
              "unit_of_measurement": "s",
              "device_class": "duration",
              "state_class": "measurement",
              "entity_category": "diagnostic"
          }),
      ]
      for component, config_type, config in core_entities:
          self._publish_discovery(component, config_type, config)

      # LED entities are only exposed for DW LEDs; WLED has its own MQTT
      # integration.
      if state.led_provider != "dw_leds":
          return

      led_effect_options = [
          "Static", "Blink", "Breathe", "Wipe", "Fade", "Scan", "Dual Scan",
          "Rainbow Cycle", "Rainbow", "Theater Chase", "Running Lights",
          "Random Color", "Dynamic", "Twinkle", "Sparkle", "Strobe", "Fire",
          "Comet", "Chase", "Police", "Lightning", "Fireworks", "Ripple", "Flow",
          "Colorloop", "Palette Flow", "Gradient", "Multi Strobe", "Waves", "BPM",
          "Juggle", "Meteor", "Pride", "Pacifica", "Plasma", "Dissolve", "Glitter",
          "Confetti", "Sinelon", "Candle", "Aurora", "Rain", "Halloween", "Noise",
          "Funky Plank"
      ]
      led_entities = [
          ("switch", "led_power", {
              "name": f"{label} LED Power",
              "unique_id": f"{device}_led_power",
              "command_topic": self.led_power_topic,
              "state_topic": f"{device}/led/power/state",
              "payload_on": "ON",
              "payload_off": "OFF",
              "device": device_info,
              "icon": "mdi:lightbulb",
              "optimistic": False
          }),
          ("number", "led_brightness", {
              "name": f"{label} LED Brightness",
              "unique_id": f"{device}_led_brightness",
              "command_topic": self.led_brightness_topic,
              "state_topic": f"{device}/led/brightness/state",
              "device": device_info,
              "icon": "mdi:brightness-6",
              "min": 0,
              "max": 100,
              "mode": "slider"
          }),
          ("select", "led_effect", {
              "name": f"{label} LED Effect",
              "unique_id": f"{device}_led_effect",
              "command_topic": self.led_effect_topic,
              "state_topic": f"{device}/led/effect/state",
              "options": led_effect_options,
              "device": device_info,
              "icon": "mdi:palette"
          }),
          ("number", "led_speed", {
              "name": f"{label} LED Speed",
              "unique_id": f"{device}_led_speed",
              "command_topic": self.led_speed_topic,
              "state_topic": f"{device}/led/speed/state",
              "device": device_info,
              "icon": "mdi:speedometer",
              "min": 0,
              "max": 255,
              "mode": "slider"
          }),
          ("number", "led_intensity", {
              "name": f"{label} LED Intensity",
              "unique_id": f"{device}_led_intensity",
              "command_topic": self.led_intensity_topic,
              "state_topic": f"{device}/led/intensity/state",
              "device": device_info,
              "icon": "mdi:brightness-7",
              "min": 0,
              "max": 255,
              "mode": "slider"
          }),
          ("light", "led_color", {
              "name": f"{label} LED Color",
              "unique_id": f"{device}_led_color",
              "command_topic": self.led_color_topic,
              "state_topic": f"{device}/led/color/state",
              "rgb_command_topic": self.led_color_topic,
              "rgb_state_topic": f"{device}/led/color/state",
              "device": device_info,
              "icon": "mdi:palette-swatch",
              "schema": "json",
              "rgb": True
          }),
      ]
      for component, config_type, config in led_entities:
          self._publish_discovery(component, config_type, config)
  332. def _publish_discovery(self, component: str, config_type: str, config: dict):
  333. """Helper method to publish HA discovery configs."""
  334. if not self.is_enabled:
  335. return
  336. discovery_topic = f"{self.discovery_prefix}/{component}/{self.device_id}/{config_type}/config"
  337. self.client.publish(discovery_topic, json.dumps(config), retain=True)
  338. def _publish_running_state(self, running_state=None):
  339. """Helper to publish running state and button availability."""
  340. if running_state is None:
  341. if not self.state.current_playing_file:
  342. running_state = "idle"
  343. elif self.state.pause_requested:
  344. running_state = "paused"
  345. else:
  346. running_state = "running"
  347. self.client.publish(self.running_state_topic, running_state, retain=True)
  348. # Update button availability based on state
  349. self.client.publish(f"{self.device_id}/command/pause/available",
  350. "true" if running_state == "running" else "false",
  351. retain=True)
  352. self.client.publish(f"{self.device_id}/command/play/available",
  353. "true" if running_state == "paused" else "false",
  354. retain=True)
  355. def _publish_pattern_state(self, current_file=None):
  356. """Helper to publish pattern state."""
  357. if current_file is None:
  358. current_file = self.state.current_playing_file
  359. if current_file:
  360. if current_file.startswith('./patterns/'):
  361. current_file = current_file[len('./patterns/'):]
  362. else:
  363. current_file = current_file.split("/")[-1].split("\\")[-1]
  364. self.client.publish(f"{self.pattern_select_topic}/state", current_file, retain=True)
  365. else:
  366. # Clear the pattern selection
  367. self.client.publish(f"{self.pattern_select_topic}/state", "None", retain=True)
  368. def _publish_playlist_state(self, playlist_name=None):
  369. """Helper to publish playlist state."""
  370. if playlist_name is None:
  371. playlist_name = self.state.current_playlist_name
  372. if playlist_name:
  373. self.client.publish(f"{self.playlist_select_topic}/state", playlist_name, retain=True)
  374. else:
  375. # Clear the playlist selection
  376. self.client.publish(f"{self.playlist_select_topic}/state", "None", retain=True)
  def _publish_serial_state(self):
      """Publish whether the serial connection is up, and on which port.

      Publishes ``"connected to <port>"`` when ``state.conn`` exists and
      reports itself connected, otherwise ``"disconnected"``.
      """
      connection = state.conn
      if connection and connection.is_connected():
          message = f"connected to {state.port}"
      else:
          message = "disconnected"
      self.client.publish(self.serial_state_topic, message, retain=True)
  def _publish_progress_state(self):
      """Publish completion percentage and remaining time of the current run.

      ``state.execution_progress`` is unpacked as
      ``(current, total, remaining_time, elapsed_time)``. Zeros are
      published for both topics when there is no progress information.
      """
      progress = state.execution_progress
      if not progress:
          # No pattern running, publish zeros
          self.client.publish(self.completion_topic, 0, retain=True)
          self.client.publish(self.time_remaining_topic, 0, retain=True)
          return

      done, total, remaining_time, _elapsed = progress

      # Completion percentage, one decimal place; guard against a zero total
      percent = done / total * 100 if total > 0 else 0
      self.client.publish(self.completion_topic, round(percent, 1), retain=True)

      # Remaining time in whole seconds, never negative, 0 when unknown
      seconds_left = 0 if remaining_time is None else round(remaining_time)
      self.client.publish(self.time_remaining_topic, max(0, seconds_left), retain=True)
  397. def _publish_led_state(self):
  398. """Helper to publish LED state to MQTT (DW LEDs only - WLED has its own MQTT)."""
  399. if not state.led_controller or state.led_provider != "dw_leds":
  400. return
  401. try:
  402. status = state.led_controller.check_status()
  403. if not status.get("connected", False):
  404. return
  405. # Publish power state
  406. power_state = "ON" if status.get("power", False) else "OFF"
  407. self.client.publish(f"{self.device_id}/led/power/state", power_state, retain=True)
  408. # Publish brightness (convert from 0-1 to 0-100)
  409. if "brightness" in status:
  410. brightness = int(status["brightness"] * 100)
  411. self.client.publish(f"{self.device_id}/led/brightness/state", brightness, retain=True)
  412. # Publish effect
  413. if "effect_id" in status:
  414. effect_map = {
  415. 0: "Static", 1: "Blink", 2: "Breathe", 3: "Wipe", 4: "Fade",
  416. 5: "Scan", 6: "Dual Scan", 7: "Rainbow Cycle", 8: "Rainbow",
  417. 9: "Theater Chase", 10: "Running Lights", 11: "Random Color",
  418. 12: "Dynamic", 13: "Twinkle", 14: "Sparkle", 15: "Strobe",
  419. 16: "Fire", 17: "Comet", 18: "Chase", 19: "Police", 20: "Lightning",
  420. 21: "Fireworks", 22: "Ripple", 23: "Flow", 24: "Colorloop",
  421. 25: "Palette Flow", 26: "Gradient", 27: "Multi Strobe", 28: "Waves",
  422. 29: "BPM", 30: "Juggle", 31: "Meteor", 32: "Pride", 33: "Pacifica",
  423. 34: "Plasma", 35: "Dissolve", 36: "Glitter", 37: "Confetti",
  424. 38: "Sinelon", 39: "Candle", 40: "Aurora", 41: "Rain",
  425. 42: "Halloween", 43: "Noise", 44: "Funky Plank"
  426. }
  427. effect_name = effect_map.get(status["effect_id"], "Static")
  428. self.client.publish(f"{self.device_id}/led/effect/state", effect_name, retain=True)
  429. # Publish speed
  430. if "speed" in status:
  431. self.client.publish(f"{self.device_id}/led/speed/state", status["speed"], retain=True)
  432. # Publish intensity
  433. if "intensity" in status:
  434. self.client.publish(f"{self.device_id}/led/intensity/state", status["intensity"], retain=True)
  435. # Publish color (RGB)
  436. if "colors" in status and len(status["colors"]) > 0:
  437. # colors is array of hex strings like ["#ff0000", "#00ff00", "#0000ff"]
  438. # Convert first color to RGB dict
  439. color_hex = status["colors"][0]
  440. if color_hex and color_hex.startswith('#') and len(color_hex) == 7:
  441. r = int(color_hex[1:3], 16)
  442. g = int(color_hex[3:5], 16)
  443. b = int(color_hex[5:7], 16)
  444. self.client.publish(f"{self.device_id}/led/color/state",
  445. json.dumps({"r": r, "g": g, "b": b}), retain=True)
  446. except Exception as e:
  447. logger.error(f"Error publishing LED state: {e}")
  448. def update_state(self, current_file=None, is_running=None, playlist=None, playlist_name=None):
  449. """Update state in Home Assistant. Only publishes the attributes that are explicitly passed."""
  450. if not self.is_enabled:
  451. return
  452. # Update pattern state if current_file is provided
  453. if current_file is not None:
  454. self._publish_pattern_state(current_file)
  455. # Update running state and button availability if is_running is provided
  456. if is_running is not None:
  457. running_state = "running" if is_running else "paused" if self.state.current_playing_file else "idle"
  458. self._publish_running_state(running_state)
  459. # Update playlist state if playlist info is provided
  460. if playlist_name is not None:
  461. self._publish_playlist_state(playlist_name)
  def on_connect(self, client, userdata, flags, rc):
      """Handle the broker's reply to a connection attempt.

      On success (``rc == 0``) subscribes to every command topic at QoS 0
      and publishes the Home Assistant discovery configs. Any other result
      code is logged as an error.
      """
      if rc == 0:
          logger.info("MQTT Connection Accepted.")

          device = self.device_id
          command_topics = (
              self.command_topic,
              self.pattern_select_topic,
              self.playlist_select_topic,
              self.speed_topic,
              f"{device}/command/stop",
              f"{device}/command/pause",
              f"{device}/command/play",
              f"{device}/playlist/mode/set",
              f"{device}/playlist/pause_time/set",
              f"{device}/playlist/clear_pattern/set",
              self.led_power_topic,
              self.led_brightness_topic,
              self.led_effect_topic,
              self.led_speed_topic,
              self.led_intensity_topic,
              self.led_color_topic,
          )
          client.subscribe([(topic, 0) for topic in command_topics])

          # Publish discovery configurations
          self.setup_ha_discovery()
          return

      refusal_reasons = (
          (1, "MQTT Connection Refused. Protocol level not supported."),
          (2, "MQTT Connection Refused. The client-identifier is not allowed by the server."),
          (3, "MQTT Connection Refused. The MQTT service is not available."),
          (4, "MQTT Connection Refused. The data in the username or password is malformed."),
          (5, "MQTT Connection Refused. The client is not authorized to connect."),
      )
      for code, reason in refusal_reasons:
          if rc == code:
              logger.error(reason)
              break
      else:
          logger.error(f"MQTT Connection Refused. Unknown error code: {rc}")
  499. def on_message(self, client, userdata, msg):
  500. """Callback when message is received."""
  501. try:
  502. if msg.topic == self.pattern_select_topic:
  503. from modules.core.pattern_manager import THETA_RHO_DIR
  504. # Handle pattern selection
  505. pattern_name = msg.payload.decode()
  506. if pattern_name in self.patterns:
  507. # Schedule the coroutine to run in the main event loop
  508. asyncio.run_coroutine_threadsafe(
  509. self.callback_registry['run_pattern'](file_path=f"{THETA_RHO_DIR}/{pattern_name}"),
  510. self.main_loop
  511. ).add_done_callback(
  512. lambda _: self._publish_pattern_state(None) # Clear pattern after execution
  513. )
  514. self.client.publish(f"{self.pattern_select_topic}/state", pattern_name, retain=True)
  515. elif msg.topic == self.playlist_select_topic:
  516. # Handle playlist selection
  517. playlist_name = msg.payload.decode()
  518. if playlist_name in self.playlists:
  519. # Schedule the coroutine to run in the main event loop
  520. asyncio.run_coroutine_threadsafe(
  521. self.callback_registry['run_playlist'](
  522. playlist_name=playlist_name,
  523. run_mode=self.state.playlist_mode,
  524. pause_time=self.state.pause_time,
  525. clear_pattern=self.state.clear_pattern
  526. ),
  527. self.main_loop
  528. ).add_done_callback(
  529. lambda _: self._publish_playlist_state(None) # Clear playlist after execution
  530. )
  531. self.client.publish(f"{self.playlist_select_topic}/state", playlist_name, retain=True)
  532. elif msg.topic == self.speed_topic:
  533. speed = int(msg.payload.decode())
  534. self.callback_registry['set_speed'](speed)
  535. elif msg.topic == f"{self.device_id}/command/stop":
  536. # Handle stop command
  537. callback = self.callback_registry['stop']
  538. if asyncio.iscoroutinefunction(callback):
  539. asyncio.run_coroutine_threadsafe(callback(), self.main_loop)
  540. else:
  541. callback()
  542. # Clear both pattern and playlist selections
  543. self._publish_pattern_state(None)
  544. self._publish_playlist_state(None)
  545. elif msg.topic == f"{self.device_id}/command/pause":
  546. # Handle pause command - only if in running state
  547. if bool(self.state.current_playing_file) and not self.state.pause_requested:
  548. # Check if callback is async or sync
  549. callback = self.callback_registry['pause']
  550. if asyncio.iscoroutinefunction(callback):
  551. asyncio.run_coroutine_threadsafe(callback(), self.main_loop)
  552. else:
  553. callback()
  554. elif msg.topic == f"{self.device_id}/command/play":
  555. # Handle play command - only if in paused state
  556. if bool(self.state.current_playing_file) and self.state.pause_requested:
  557. # Check if callback is async or sync
  558. callback = self.callback_registry['resume']
  559. if asyncio.iscoroutinefunction(callback):
  560. asyncio.run_coroutine_threadsafe(callback(), self.main_loop)
  561. else:
  562. callback()
  563. elif msg.topic == f"{self.device_id}/playlist/mode/set":
  564. mode = msg.payload.decode()
  565. if mode in ["single", "loop"]:
  566. state.playlist_mode = mode
  567. self.client.publish(f"{self.device_id}/playlist/mode/state", mode, retain=True)
  568. elif msg.topic == f"{self.device_id}/playlist/pause_time/set":
  569. pause_time = float(msg.payload.decode())
  570. if 0 <= pause_time <= 60:
  571. state.pause_time = pause_time
  572. self.client.publish(f"{self.device_id}/playlist/pause_time/state", pause_time, retain=True)
  573. elif msg.topic == f"{self.device_id}/playlist/clear_pattern/set":
  574. clear_pattern = msg.payload.decode()
  575. if clear_pattern in ["none", "random", "adaptive", "clear_from_in", "clear_from_out", "clear_sideway"]:
  576. state.clear_pattern = clear_pattern
  577. self.client.publish(f"{self.device_id}/playlist/clear_pattern/state", clear_pattern, retain=True)
  578. elif msg.topic == self.led_power_topic:
  579. # Handle LED power command (DW LEDs only)
  580. payload = msg.payload.decode()
  581. if state.led_controller and state.led_provider == "dw_leds":
  582. power_state = 1 if payload == "ON" else 0
  583. state.led_controller.set_power(power_state)
  584. self.client.publish(f"{self.device_id}/led/power/state", payload, retain=True)
  585. elif msg.topic == self.led_brightness_topic:
  586. # Handle LED brightness command (DW LEDs only)
  587. brightness = int(msg.payload.decode())
  588. if 0 <= brightness <= 100 and state.led_controller and state.led_provider == "dw_leds":
  589. controller = state.led_controller.get_controller()
  590. if controller and hasattr(controller, 'set_brightness'):
  591. controller.set_brightness(brightness / 100.0)
  592. self.client.publish(f"{self.device_id}/led/brightness/state", brightness, retain=True)
  593. elif msg.topic == self.led_effect_topic:
  594. # Handle LED effect command (DW LEDs only)
  595. effect_name = msg.payload.decode()
  596. if state.led_controller and state.led_provider == "dw_leds":
  597. # Map effect name to ID
  598. effect_map = {
  599. "Static": 0, "Blink": 1, "Breathe": 2, "Wipe": 3, "Fade": 4,
  600. "Scan": 5, "Dual Scan": 6, "Rainbow Cycle": 7, "Rainbow": 8,
  601. "Theater Chase": 9, "Running Lights": 10, "Random Color": 11,
  602. "Dynamic": 12, "Twinkle": 13, "Sparkle": 14, "Strobe": 15,
  603. "Fire": 16, "Comet": 17, "Chase": 18, "Police": 19, "Lightning": 20,
  604. "Fireworks": 21, "Ripple": 22, "Flow": 23, "Colorloop": 24,
  605. "Palette Flow": 25, "Gradient": 26, "Multi Strobe": 27, "Waves": 28,
  606. "BPM": 29, "Juggle": 30, "Meteor": 31, "Pride": 32, "Pacifica": 33,
  607. "Plasma": 34, "Dissolve": 35, "Glitter": 36, "Confetti": 37,
  608. "Sinelon": 38, "Candle": 39, "Aurora": 40, "Rain": 41,
  609. "Halloween": 42, "Noise": 43, "Funky Plank": 44
  610. }
  611. effect_id = effect_map.get(effect_name)
  612. if effect_id is not None:
  613. controller = state.led_controller.get_controller()
  614. if controller and hasattr(controller, 'set_effect'):
  615. controller.set_effect(effect_id)
  616. self.client.publish(f"{self.device_id}/led/effect/state", effect_name, retain=True)
  617. elif msg.topic == self.led_speed_topic:
  618. # Handle LED speed command (DW LEDs only)
  619. speed = int(msg.payload.decode())
  620. if 0 <= speed <= 255 and state.led_controller and state.led_provider == "dw_leds":
  621. controller = state.led_controller.get_controller()
  622. if controller and hasattr(controller, 'set_speed'):
  623. controller.set_speed(speed)
  624. self.client.publish(f"{self.device_id}/led/speed/state", speed, retain=True)
  625. elif msg.topic == self.led_intensity_topic:
  626. # Handle LED intensity command (DW LEDs only)
  627. intensity = int(msg.payload.decode())
  628. if 0 <= intensity <= 255 and state.led_controller and state.led_provider == "dw_leds":
  629. controller = state.led_controller.get_controller()
  630. if controller and hasattr(controller, 'set_intensity'):
  631. controller.set_intensity(intensity)
  632. self.client.publish(f"{self.device_id}/led/intensity/state", intensity, retain=True)
  633. elif msg.topic == self.led_color_topic:
  634. # Handle LED color command (RGB) (DW LEDs only)
  635. try:
  636. color_data = json.loads(msg.payload.decode())
  637. if state.led_controller and state.led_provider == "dw_leds" and 'r' in color_data and 'g' in color_data and 'b' in color_data:
  638. controller = state.led_controller.get_controller()
  639. if controller and hasattr(controller, 'set_color'):
  640. r, g, b = color_data['r'], color_data['g'], color_data['b']
  641. controller.set_color(r, g, b)
  642. self.client.publish(f"{self.device_id}/led/color/state",
  643. json.dumps({"r": r, "g": g, "b": b}), retain=True)
  644. except json.JSONDecodeError:
  645. logger.error(f"Invalid JSON for color command: {msg.payload}")
  646. else:
  647. # Handle other commands
  648. payload = json.loads(msg.payload.decode())
  649. command = payload.get('command')
  650. params = payload.get('params', {})
  651. if command in self.callback_registry:
  652. self.callback_registry[command](**params)
  653. else:
  654. logger.error(f"Unknown command received: {command}")
  655. except json.JSONDecodeError:
  656. logger.error(f"Invalid JSON payload received: {msg.payload}")
  657. except Exception as e:
  658. logger.error(f"Error processing MQTT message: {e}")
  659. def publish_status(self):
  660. """Publish status updates periodically."""
  661. while self.running:
  662. try:
  663. # Update all states
  664. self._publish_running_state()
  665. self._publish_pattern_state()
  666. self._publish_playlist_state()
  667. self._publish_serial_state()
  668. self._publish_progress_state()
  669. # Update speed state
  670. self.client.publish(f"{self.speed_topic}/state", self.state.speed, retain=True)
  671. # Update LED state
  672. self._publish_led_state()
  673. # Publish keepalive status
  674. status = {
  675. "timestamp": time.time(),
  676. "client_id": self.client_id
  677. }
  678. self.client.publish(self.status_topic, json.dumps(status))
  679. # Wait for next interval
  680. time.sleep(self.status_interval)
  681. except Exception as e:
  682. logger.error(f"Error publishing status: {e}")
  683. time.sleep(5) # Wait before retry
  def start(self) -> None:
      """Connect to the broker and start publishing state.

      Does nothing when MQTT is disabled. Loads the pattern and playlist
      lists, connects, starts the paho network loop and the status thread,
      then publishes the initial states and the Home Assistant discovery
      configs. Any failure is logged rather than raised.
      """
      if not self.is_enabled:
          return

      try:
          # Load the lists before connecting: on_connect publishes the
          # discovery configs and on_message validates selections against
          # these lists, and both can run as soon as the network loop is up.
          self.patterns = list_theta_rho_files()
          self.playlists = list_all_playlists()

          self.client.connect(self.broker, self.port)
          self.client.loop_start()

          # Start status publishing thread
          self.running = True
          self.status_thread = threading.Thread(target=self.publish_status, daemon=True)
          self.status_thread.start()

          # Wait a bit for MQTT connection to establish
          time.sleep(1)

          # Publish initial states
          self._publish_running_state()
          self._publish_pattern_state()
          self._publish_playlist_state()
          self._publish_serial_state()
          self._publish_progress_state()
          self._publish_led_state()

          # Setup Home Assistant discovery
          self.setup_ha_discovery()

          logger.info("MQTT Handler started successfully")
      except Exception as e:
          logger.error(f"Failed to start MQTT Handler: {e}")
  712. def stop(self) -> None:
  713. """Stop the MQTT handler."""
  714. if not self.is_enabled:
  715. return
  716. # First stop the running flag to prevent new iterations
  717. self.running = False
  718. # Clean up status thread
  719. local_status_thread = self.status_thread # Keep a local reference
  720. if local_status_thread and local_status_thread.is_alive():
  721. try:
  722. local_status_thread.join(timeout=5)
  723. if local_status_thread.is_alive():
  724. logger.warning("MQTT status thread did not terminate cleanly")
  725. except Exception as e:
  726. logger.error(f"Error joining status thread: {e}")
  727. self.status_thread = None
  728. # Clean up MQTT client
  729. try:
  730. if hasattr(self, 'client'):
  731. self.client.loop_stop()
  732. self.client.disconnect()
  733. except Exception as e:
  734. logger.error(f"Error disconnecting MQTT client: {e}")
  735. # Clean up main loop reference
  736. self.main_loop = None
  737. logger.info("MQTT handler stopped")
  738. @property
  739. def is_enabled(self) -> bool:
  740. """Return whether MQTT functionality is enabled."""
  741. return bool(self.broker)