"""Real MQTT handler implementation.""" import os import threading import time import json from typing import Dict, Callable, List, Optional, Any import paho.mqtt.client as mqtt import logging import asyncio from functools import partial from .base import BaseMQTTHandler from modules.core.state import state from modules.core.pattern_manager import list_theta_rho_files from modules.core.playlist_manager import list_all_playlists logger = logging.getLogger(__name__) class MQTTHandler(BaseMQTTHandler): """Real implementation of MQTT handler.""" def __init__(self, callback_registry: Dict[str, Callable]): # MQTT Configuration from environment variables self.broker = os.getenv('MQTT_BROKER') self.port = int(os.getenv('MQTT_PORT', '1883')) self.username = os.getenv('MQTT_USERNAME') self.password = os.getenv('MQTT_PASSWORD') self.client_id = os.getenv('MQTT_CLIENT_ID', 'dune_weaver') self.status_topic = os.getenv('MQTT_STATUS_TOPIC', 'dune_weaver/status') self.command_topic = os.getenv('MQTT_COMMAND_TOPIC', 'dune_weaver/command') self.status_interval = int(os.getenv('MQTT_STATUS_INTERVAL', '30')) # Store callback registry self.callback_registry = callback_registry # Threading control self.running = False self.status_thread = None # Home Assistant MQTT Discovery settings self.discovery_prefix = os.getenv('MQTT_DISCOVERY_PREFIX', 'homeassistant') self.device_name = os.getenv('HA_DEVICE_NAME', 'Dune Weaver') self.device_id = os.getenv('HA_DEVICE_ID', 'dune_weaver') # Additional topics for state self.running_state_topic = f"{self.device_id}/state/running" self.serial_state_topic = f"{self.device_id}/state/serial" self.pattern_select_topic = f"{self.device_id}/pattern/set" self.playlist_select_topic = f"{self.device_id}/playlist/set" self.speed_topic = f"{self.device_id}/speed/set" self.completion_topic = f"{self.device_id}/state/completion" self.time_remaining_topic = f"{self.device_id}/state/time_remaining" # LED control topics self.led_power_topic = f"{self.device_id}/led/power/set" self.led_brightness_topic = f"{self.device_id}/led/brightness/set" self.led_effect_topic = f"{self.device_id}/led/effect/set" self.led_speed_topic = f"{self.device_id}/led/speed/set" self.led_intensity_topic = f"{self.device_id}/led/intensity/set" self.led_color_topic = f"{self.device_id}/led/color/set" # Store current state self.current_file = "" self.is_running_state = False self.serial_state = "" self.patterns = [] self.playlists = [] # Initialize MQTT client if broker is configured if self.broker: self.client = mqtt.Client(client_id=self.client_id) self.client.on_connect = self.on_connect self.client.on_message = self.on_message if self.username and self.password: self.client.username_pw_set(self.username, self.password) self.state = state self.state.mqtt_handler = self # Set reference to self in state, needed so that state setters can update the state # Store the main event loop during initialization self.main_loop = asyncio.get_event_loop() def setup_ha_discovery(self): """Publish Home Assistant MQTT discovery configurations.""" if not self.is_enabled: return base_device = { "identifiers": [self.device_id], "name": self.device_name, "model": "Dune Weaver", "manufacturer": "DIY" } # Serial State Sensor serial_config = { "name": f"{self.device_name} Serial State", "unique_id": f"{self.device_id}_serial_state", "state_topic": self.serial_state_topic, "device": base_device, "icon": "mdi:serial-port", "entity_category": "diagnostic" } self._publish_discovery("sensor", "serial_state", serial_config) # Running State Sensor running_config = { "name": f"{self.device_name} Running State", "unique_id": f"{self.device_id}_running_state", "state_topic": self.running_state_topic, "device": base_device, "icon": "mdi:machine", "entity_category": "diagnostic" } self._publish_discovery("sensor", "running_state", running_config) # Stop Button stop_config = { "name": f"Stop pattern execution", "unique_id": f"{self.device_id}_stop", "command_topic": f"{self.device_id}/command/stop", "device": base_device, "icon": "mdi:stop", "entity_category": "config" } self._publish_discovery("button", "stop", stop_config) # Pause Button pause_config = { "name": f"Pause pattern execution", "unique_id": f"{self.device_id}_pause", "command_topic": f"{self.device_id}/command/pause", "state_topic": f"{self.device_id}/command/pause/state", "device": base_device, "icon": "mdi:pause", "entity_category": "config", "enabled_by_default": True, "availability": { "topic": f"{self.device_id}/command/pause/available", "payload_available": "true", "payload_not_available": "false" } } self._publish_discovery("button", "pause", pause_config) # Play Button play_config = { "name": f"Resume pattern execution", "unique_id": f"{self.device_id}_play", "command_topic": f"{self.device_id}/command/play", "state_topic": f"{self.device_id}/command/play/state", "device": base_device, "icon": "mdi:play", "entity_category": "config", "enabled_by_default": True, "availability": { "topic": f"{self.device_id}/command/play/available", "payload_available": "true", "payload_not_available": "false" } } self._publish_discovery("button", "play", play_config) # Speed Control speed_config = { "name": f"{self.device_name} Speed", "unique_id": f"{self.device_id}_speed", "command_topic": self.speed_topic, "state_topic": f"{self.speed_topic}/state", "device": base_device, "icon": "mdi:speedometer", "mode": "box", "min": 50, "max": 2000, "step": 50 } self._publish_discovery("number", "speed", speed_config) # Pattern Select pattern_config = { "name": f"{self.device_name} Pattern", "unique_id": f"{self.device_id}_pattern", "command_topic": self.pattern_select_topic, "state_topic": f"{self.pattern_select_topic}/state", "options": self.patterns, "device": base_device, "icon": "mdi:draw" } self._publish_discovery("select", "pattern", pattern_config) # Playlist Select playlist_config = { "name": f"{self.device_name} Playlist", "unique_id": f"{self.device_id}_playlist", "command_topic": self.playlist_select_topic, "state_topic": f"{self.playlist_select_topic}/state", "options": self.playlists, "device": base_device, "icon": "mdi:playlist-play" } self._publish_discovery("select", "playlist", playlist_config) # Playlist Run Mode Select playlist_mode_config = { "name": f"{self.device_name} Playlist Mode", "unique_id": f"{self.device_id}_playlist_mode", "command_topic": f"{self.device_id}/playlist/mode/set", "state_topic": f"{self.device_id}/playlist/mode/state", "options": ["single", "loop"], "device": base_device, "icon": "mdi:repeat", "entity_category": "config" } self._publish_discovery("select", "playlist_mode", playlist_mode_config) # Playlist Pause Time Number Input pause_time_config = { "name": f"{self.device_name} Playlist Pause Time", "unique_id": f"{self.device_id}_pause_time", "command_topic": f"{self.device_id}/playlist/pause_time/set", "state_topic": f"{self.device_id}/playlist/pause_time/state", "device": base_device, "icon": "mdi:timer", "entity_category": "config", "mode": "box", "unit_of_measurement": "seconds", "min": 0, "max": 86400, } self._publish_discovery("number", "pause_time", pause_time_config) # Clear Pattern Select clear_pattern_config = { "name": f"{self.device_name} Clear Pattern", "unique_id": f"{self.device_id}_clear_pattern", "command_topic": f"{self.device_id}/playlist/clear_pattern/set", "state_topic": f"{self.device_id}/playlist/clear_pattern/state", "options": ["none", "random", "adaptive", "clear_from_in", "clear_from_out", "clear_sideway"], "device": base_device, "icon": "mdi:eraser", "entity_category": "config" } self._publish_discovery("select", "clear_pattern", clear_pattern_config) # Completion Percentage Sensor completion_config = { "name": f"{self.device_name} Completion", "unique_id": f"{self.device_id}_completion", "state_topic": self.completion_topic, "device": base_device, "icon": "mdi:progress-clock", "unit_of_measurement": "%", "state_class": "measurement", "entity_category": "diagnostic" } self._publish_discovery("sensor", "completion", completion_config) # Time Remaining Sensor time_remaining_config = { "name": f"{self.device_name} Time Remaining", "unique_id": f"{self.device_id}_time_remaining", "state_topic": self.time_remaining_topic, "device": base_device, "icon": "mdi:timer-sand", "unit_of_measurement": "s", "device_class": "duration", "state_class": "measurement", "entity_category": "diagnostic" } self._publish_discovery("sensor", "time_remaining", time_remaining_config) # LED Control Entities (only for DW LEDs - WLED has its own MQTT integration) if state.led_provider == "dw_leds": # LED Power Switch led_power_config = { "name": f"{self.device_name} LED Power", "unique_id": f"{self.device_id}_led_power", "command_topic": self.led_power_topic, "state_topic": f"{self.device_id}/led/power/state", "payload_on": "ON", "payload_off": "OFF", "device": base_device, "icon": "mdi:lightbulb", "optimistic": False } self._publish_discovery("switch", "led_power", led_power_config) # LED Brightness Control led_brightness_config = { "name": f"{self.device_name} LED Brightness", "unique_id": f"{self.device_id}_led_brightness", "command_topic": self.led_brightness_topic, "state_topic": f"{self.device_id}/led/brightness/state", "device": base_device, "icon": "mdi:brightness-6", "min": 0, "max": 100, "mode": "slider" } self._publish_discovery("number", "led_brightness", led_brightness_config) # LED Effect Selector led_effect_options = [ "Static", "Blink", "Breathe", "Wipe", "Fade", "Scan", "Dual Scan", "Rainbow Cycle", "Rainbow", "Theater Chase", "Running Lights", "Random Color", "Dynamic", "Twinkle", "Sparkle", "Strobe", "Fire", "Comet", "Chase", "Police", "Lightning", "Fireworks", "Ripple", "Flow", "Colorloop", "Palette Flow", "Gradient", "Multi Strobe", "Waves", "BPM", "Juggle", "Meteor", "Pride", "Pacifica", "Plasma", "Dissolve", "Glitter", "Confetti", "Sinelon", "Candle", "Aurora", "Rain", "Halloween", "Noise", "Funky Plank" ] led_effect_config = { "name": f"{self.device_name} LED Effect", "unique_id": f"{self.device_id}_led_effect", "command_topic": self.led_effect_topic, "state_topic": f"{self.device_id}/led/effect/state", "options": led_effect_options, "device": base_device, "icon": "mdi:palette" } self._publish_discovery("select", "led_effect", led_effect_config) # LED Speed Control led_speed_config = { "name": f"{self.device_name} LED Speed", "unique_id": f"{self.device_id}_led_speed", "command_topic": self.led_speed_topic, "state_topic": f"{self.device_id}/led/speed/state", "device": base_device, "icon": "mdi:speedometer", "min": 0, "max": 255, "mode": "slider" } self._publish_discovery("number", "led_speed", led_speed_config) # LED Intensity Control led_intensity_config = { "name": f"{self.device_name} LED Intensity", "unique_id": f"{self.device_id}_led_intensity", "command_topic": self.led_intensity_topic, "state_topic": f"{self.device_id}/led/intensity/state", "device": base_device, "icon": "mdi:brightness-7", "min": 0, "max": 255, "mode": "slider" } self._publish_discovery("number", "led_intensity", led_intensity_config) # LED RGB Color Control led_color_config = { "name": f"{self.device_name} LED Color", "unique_id": f"{self.device_id}_led_color", "command_topic": self.led_color_topic, "state_topic": f"{self.device_id}/led/color/state", "rgb_command_topic": self.led_color_topic, "rgb_state_topic": f"{self.device_id}/led/color/state", "device": base_device, "icon": "mdi:palette-swatch", "schema": "json", "rgb": True } self._publish_discovery("light", "led_color", led_color_config) def _publish_discovery(self, component: str, config_type: str, config: dict): """Helper method to publish HA discovery configs.""" if not self.is_enabled: return discovery_topic = f"{self.discovery_prefix}/{component}/{self.device_id}/{config_type}/config" self.client.publish(discovery_topic, json.dumps(config), retain=True) def _publish_running_state(self, running_state=None): """Helper to publish running state and button availability.""" if running_state is None: if not self.state.current_playing_file: running_state = "idle" elif self.state.pause_requested: running_state = "paused" else: running_state = "running" self.client.publish(self.running_state_topic, running_state, retain=True) # Update button availability based on state self.client.publish(f"{self.device_id}/command/pause/available", "true" if running_state == "running" else "false", retain=True) self.client.publish(f"{self.device_id}/command/play/available", "true" if running_state == "paused" else "false", retain=True) def _publish_pattern_state(self, current_file=None): """Helper to publish pattern state.""" if current_file is None: current_file = self.state.current_playing_file if current_file: if current_file.startswith('./patterns/'): current_file = current_file[len('./patterns/'):] else: current_file = current_file.split("/")[-1].split("\\")[-1] self.client.publish(f"{self.pattern_select_topic}/state", current_file, retain=True) else: # Clear the pattern selection self.client.publish(f"{self.pattern_select_topic}/state", "None", retain=True) def _publish_playlist_state(self, playlist_name=None): """Helper to publish playlist state.""" if playlist_name is None: playlist_name = self.state.current_playlist_name if playlist_name: self.client.publish(f"{self.playlist_select_topic}/state", playlist_name, retain=True) else: # Clear the playlist selection self.client.publish(f"{self.playlist_select_topic}/state", "None", retain=True) def _publish_serial_state(self): """Helper to publish serial state.""" serial_connected = (state.conn.is_connected() if state.conn else False) serial_port = state.port if serial_connected else None serial_status = f"connected to {serial_port}" if serial_connected else "disconnected" self.client.publish(self.serial_state_topic, serial_status, retain=True) def _publish_progress_state(self): """Helper to publish completion percentage and time remaining.""" if state.execution_progress: current, total, remaining_time, elapsed_time = state.execution_progress completion_percentage = (current / total * 100) if total > 0 else 0 # Publish completion percentage (rounded to 1 decimal place) self.client.publish(self.completion_topic, round(completion_percentage, 1), retain=True) # Publish time remaining (rounded to nearest second, defaulting to 0 if None) time_remaining_seconds = round(remaining_time) if remaining_time is not None else 0 self.client.publish(self.time_remaining_topic, max(0, time_remaining_seconds), retain=True) else: # No pattern running, publish zeros self.client.publish(self.completion_topic, 0, retain=True) self.client.publish(self.time_remaining_topic, 0, retain=True) def _publish_led_state(self): """Helper to publish LED state to MQTT (DW LEDs only - WLED has its own MQTT).""" if not state.led_controller or state.led_provider != "dw_leds": return try: status = state.led_controller.check_status() if not status.get("connected", False): return # Publish power state power_state = "ON" if status.get("power", False) else "OFF" self.client.publish(f"{self.device_id}/led/power/state", power_state, retain=True) # Publish brightness (convert from 0-1 to 0-100) if "brightness" in status: brightness = int(status["brightness"] * 100) self.client.publish(f"{self.device_id}/led/brightness/state", brightness, retain=True) # Publish effect if "effect_id" in status: effect_map = { 0: "Static", 1: "Blink", 2: "Breathe", 3: "Wipe", 4: "Fade", 5: "Scan", 6: "Dual Scan", 7: "Rainbow Cycle", 8: "Rainbow", 9: "Theater Chase", 10: "Running Lights", 11: "Random Color", 12: "Dynamic", 13: "Twinkle", 14: "Sparkle", 15: "Strobe", 16: "Fire", 17: "Comet", 18: "Chase", 19: "Police", 20: "Lightning", 21: "Fireworks", 22: "Ripple", 23: "Flow", 24: "Colorloop", 25: "Palette Flow", 26: "Gradient", 27: "Multi Strobe", 28: "Waves", 29: "BPM", 30: "Juggle", 31: "Meteor", 32: "Pride", 33: "Pacifica", 34: "Plasma", 35: "Dissolve", 36: "Glitter", 37: "Confetti", 38: "Sinelon", 39: "Candle", 40: "Aurora", 41: "Rain", 42: "Halloween", 43: "Noise", 44: "Funky Plank" } effect_name = effect_map.get(status["effect_id"], "Static") self.client.publish(f"{self.device_id}/led/effect/state", effect_name, retain=True) # Publish speed if "speed" in status: self.client.publish(f"{self.device_id}/led/speed/state", status["speed"], retain=True) # Publish intensity if "intensity" in status: self.client.publish(f"{self.device_id}/led/intensity/state", status["intensity"], retain=True) # Publish color (RGB) if "colors" in status and len(status["colors"]) > 0: # colors is array of hex strings like ["#ff0000", "#00ff00", "#0000ff"] # Convert first color to RGB dict color_hex = status["colors"][0] if color_hex and color_hex.startswith('#') and len(color_hex) == 7: r = int(color_hex[1:3], 16) g = int(color_hex[3:5], 16) b = int(color_hex[5:7], 16) self.client.publish(f"{self.device_id}/led/color/state", json.dumps({"r": r, "g": g, "b": b}), retain=True) except Exception as e: logger.error(f"Error publishing LED state: {e}") def update_state(self, current_file=None, is_running=None, playlist=None, playlist_name=None): """Update state in Home Assistant. Only publishes the attributes that are explicitly passed.""" if not self.is_enabled: return # Update pattern state if current_file is provided if current_file is not None: self._publish_pattern_state(current_file) # Update running state and button availability if is_running is provided if is_running is not None: running_state = "running" if is_running else "paused" if self.state.current_playing_file else "idle" self._publish_running_state(running_state) # Update playlist state if playlist info is provided if playlist_name is not None: self._publish_playlist_state(playlist_name) def on_connect(self, client, userdata, flags, rc): """Callback when connected to MQTT broker.""" if rc == 0: logger.info("MQTT Connection Accepted.") # Subscribe to command topics client.subscribe([ (self.command_topic, 0), (self.pattern_select_topic, 0), (self.playlist_select_topic, 0), (self.speed_topic, 0), (f"{self.device_id}/command/stop", 0), (f"{self.device_id}/command/pause", 0), (f"{self.device_id}/command/play", 0), (f"{self.device_id}/playlist/mode/set", 0), (f"{self.device_id}/playlist/pause_time/set", 0), (f"{self.device_id}/playlist/clear_pattern/set", 0), (self.led_power_topic, 0), (self.led_brightness_topic, 0), (self.led_effect_topic, 0), (self.led_speed_topic, 0), (self.led_intensity_topic, 0), (self.led_color_topic, 0), ]) # Publish discovery configurations self.setup_ha_discovery() elif rc == 1: logger.error("MQTT Connection Refused. Protocol level not supported.") elif rc == 2: logger.error("MQTT Connection Refused. The client-identifier is not allowed by the server.") elif rc == 3: logger.error("MQTT Connection Refused. The MQTT service is not available.") elif rc == 4: logger.error("MQTT Connection Refused. The data in the username or password is malformed.") elif rc == 5: logger.error("MQTT Connection Refused. The client is not authorized to connect.") else: logger.error(f"MQTT Connection Refused. Unknown error code: {rc}") def on_message(self, client, userdata, msg): """Callback when message is received.""" try: if msg.topic == self.pattern_select_topic: from modules.core.pattern_manager import THETA_RHO_DIR # Handle pattern selection pattern_name = msg.payload.decode() if pattern_name in self.patterns: # Schedule the coroutine to run in the main event loop asyncio.run_coroutine_threadsafe( self.callback_registry['run_pattern'](file_path=f"{THETA_RHO_DIR}/{pattern_name}"), self.main_loop ).add_done_callback( lambda _: self._publish_pattern_state(None) # Clear pattern after execution ) self.client.publish(f"{self.pattern_select_topic}/state", pattern_name, retain=True) elif msg.topic == self.playlist_select_topic: # Handle playlist selection playlist_name = msg.payload.decode() if playlist_name in self.playlists: # Schedule the coroutine to run in the main event loop asyncio.run_coroutine_threadsafe( self.callback_registry['run_playlist']( playlist_name=playlist_name, run_mode=self.state.playlist_mode, pause_time=self.state.pause_time, clear_pattern=self.state.clear_pattern ), self.main_loop ).add_done_callback( lambda _: self._publish_playlist_state(None) # Clear playlist after execution ) self.client.publish(f"{self.playlist_select_topic}/state", playlist_name, retain=True) elif msg.topic == self.speed_topic: speed = int(msg.payload.decode()) self.callback_registry['set_speed'](speed) elif msg.topic == f"{self.device_id}/command/stop": # Handle stop command callback = self.callback_registry['stop'] if asyncio.iscoroutinefunction(callback): asyncio.run_coroutine_threadsafe(callback(), self.main_loop) else: callback() # Clear both pattern and playlist selections self._publish_pattern_state(None) self._publish_playlist_state(None) elif msg.topic == f"{self.device_id}/command/pause": # Handle pause command - only if in running state if bool(self.state.current_playing_file) and not self.state.pause_requested: # Check if callback is async or sync callback = self.callback_registry['pause'] if asyncio.iscoroutinefunction(callback): asyncio.run_coroutine_threadsafe(callback(), self.main_loop) else: callback() elif msg.topic == f"{self.device_id}/command/play": # Handle play command - only if in paused state if bool(self.state.current_playing_file) and self.state.pause_requested: # Check if callback is async or sync callback = self.callback_registry['resume'] if asyncio.iscoroutinefunction(callback): asyncio.run_coroutine_threadsafe(callback(), self.main_loop) else: callback() elif msg.topic == f"{self.device_id}/playlist/mode/set": mode = msg.payload.decode() if mode in ["single", "loop"]: state.playlist_mode = mode self.client.publish(f"{self.device_id}/playlist/mode/state", mode, retain=True) elif msg.topic == f"{self.device_id}/playlist/pause_time/set": pause_time = float(msg.payload.decode()) if 0 <= pause_time <= 60: state.pause_time = pause_time self.client.publish(f"{self.device_id}/playlist/pause_time/state", pause_time, retain=True) elif msg.topic == f"{self.device_id}/playlist/clear_pattern/set": clear_pattern = msg.payload.decode() if clear_pattern in ["none", "random", "adaptive", "clear_from_in", "clear_from_out", "clear_sideway"]: state.clear_pattern = clear_pattern self.client.publish(f"{self.device_id}/playlist/clear_pattern/state", clear_pattern, retain=True) elif msg.topic == self.led_power_topic: # Handle LED power command (DW LEDs only) payload = msg.payload.decode() if state.led_controller and state.led_provider == "dw_leds": power_state = 1 if payload == "ON" else 0 state.led_controller.set_power(power_state) # Reset idle timeout when LEDs are manually powered on via MQTT (only if idle timeout is enabled) if payload == "ON" and state.dw_led_idle_timeout_enabled: state.dw_led_last_activity_time = time.time() logger.debug("LED activity time reset due to MQTT power on") self.client.publish(f"{self.device_id}/led/power/state", payload, retain=True) elif msg.topic == self.led_brightness_topic: # Handle LED brightness command (DW LEDs only) brightness = int(msg.payload.decode()) if 0 <= brightness <= 100 and state.led_controller and state.led_provider == "dw_leds": controller = state.led_controller.get_controller() if controller and hasattr(controller, 'set_brightness'): controller.set_brightness(brightness / 100.0) self.client.publish(f"{self.device_id}/led/brightness/state", brightness, retain=True) elif msg.topic == self.led_effect_topic: # Handle LED effect command (DW LEDs only) effect_name = msg.payload.decode() if state.led_controller and state.led_provider == "dw_leds": # Map effect name to ID effect_map = { "Static": 0, "Blink": 1, "Breathe": 2, "Wipe": 3, "Fade": 4, "Scan": 5, "Dual Scan": 6, "Rainbow Cycle": 7, "Rainbow": 8, "Theater Chase": 9, "Running Lights": 10, "Random Color": 11, "Dynamic": 12, "Twinkle": 13, "Sparkle": 14, "Strobe": 15, "Fire": 16, "Comet": 17, "Chase": 18, "Police": 19, "Lightning": 20, "Fireworks": 21, "Ripple": 22, "Flow": 23, "Colorloop": 24, "Palette Flow": 25, "Gradient": 26, "Multi Strobe": 27, "Waves": 28, "BPM": 29, "Juggle": 30, "Meteor": 31, "Pride": 32, "Pacifica": 33, "Plasma": 34, "Dissolve": 35, "Glitter": 36, "Confetti": 37, "Sinelon": 38, "Candle": 39, "Aurora": 40, "Rain": 41, "Halloween": 42, "Noise": 43, "Funky Plank": 44 } effect_id = effect_map.get(effect_name) if effect_id is not None: controller = state.led_controller.get_controller() if controller and hasattr(controller, 'set_effect'): controller.set_effect(effect_id) self.client.publish(f"{self.device_id}/led/effect/state", effect_name, retain=True) elif msg.topic == self.led_speed_topic: # Handle LED speed command (DW LEDs only) speed = int(msg.payload.decode()) if 0 <= speed <= 255 and state.led_controller and state.led_provider == "dw_leds": controller = state.led_controller.get_controller() if controller and hasattr(controller, 'set_speed'): controller.set_speed(speed) self.client.publish(f"{self.device_id}/led/speed/state", speed, retain=True) elif msg.topic == self.led_intensity_topic: # Handle LED intensity command (DW LEDs only) intensity = int(msg.payload.decode()) if 0 <= intensity <= 255 and state.led_controller and state.led_provider == "dw_leds": controller = state.led_controller.get_controller() if controller and hasattr(controller, 'set_intensity'): controller.set_intensity(intensity) self.client.publish(f"{self.device_id}/led/intensity/state", intensity, retain=True) elif msg.topic == self.led_color_topic: # Handle LED color command (RGB) (DW LEDs only) try: color_data = json.loads(msg.payload.decode()) if state.led_controller and state.led_provider == "dw_leds" and 'r' in color_data and 'g' in color_data and 'b' in color_data: controller = state.led_controller.get_controller() if controller and hasattr(controller, 'set_color'): r, g, b = color_data['r'], color_data['g'], color_data['b'] controller.set_color(r, g, b) self.client.publish(f"{self.device_id}/led/color/state", json.dumps({"r": r, "g": g, "b": b}), retain=True) except json.JSONDecodeError: logger.error(f"Invalid JSON for color command: {msg.payload}") else: # Handle other commands payload = json.loads(msg.payload.decode()) command = payload.get('command') params = payload.get('params', {}) if command in self.callback_registry: self.callback_registry[command](**params) else: logger.error(f"Unknown command received: {command}") except json.JSONDecodeError: logger.error(f"Invalid JSON payload received: {msg.payload}") except Exception as e: logger.error(f"Error processing MQTT message: {e}") def publish_status(self): """Publish status updates periodically.""" while self.running: try: # Update all states self._publish_running_state() self._publish_pattern_state() self._publish_playlist_state() self._publish_serial_state() self._publish_progress_state() # Update speed state self.client.publish(f"{self.speed_topic}/state", self.state.speed, retain=True) # Update LED state self._publish_led_state() # Publish keepalive status status = { "timestamp": time.time(), "client_id": self.client_id } self.client.publish(self.status_topic, json.dumps(status)) # Wait for next interval time.sleep(self.status_interval) except Exception as e: logger.error(f"Error publishing status: {e}") time.sleep(5) # Wait before retry def start(self) -> None: """Start the MQTT handler.""" if not self.is_enabled: return try: self.client.connect(self.broker, self.port) self.client.loop_start() # Start status publishing thread self.running = True self.status_thread = threading.Thread(target=self.publish_status, daemon=True) self.status_thread.start() # Get initial pattern and playlist lists self.patterns = list_theta_rho_files() self.playlists = list_all_playlists() # Wait a bit for MQTT connection to establish time.sleep(1) # Publish initial states self._publish_running_state() self._publish_pattern_state() self._publish_playlist_state() self._publish_serial_state() self._publish_progress_state() self._publish_led_state() # Setup Home Assistant discovery self.setup_ha_discovery() logger.info("MQTT Handler started successfully") except Exception as e: logger.error(f"Failed to start MQTT Handler: {e}") def stop(self) -> None: """Stop the MQTT handler.""" if not self.is_enabled: return # First stop the running flag to prevent new iterations self.running = False # Clean up status thread local_status_thread = self.status_thread # Keep a local reference if local_status_thread and local_status_thread.is_alive(): try: local_status_thread.join(timeout=5) if local_status_thread.is_alive(): logger.warning("MQTT status thread did not terminate cleanly") except Exception as e: logger.error(f"Error joining status thread: {e}") self.status_thread = None # Clean up MQTT client try: if hasattr(self, 'client'): self.client.loop_stop() self.client.disconnect() except Exception as e: logger.error(f"Error disconnecting MQTT client: {e}") # Clean up main loop reference self.main_loop = None logger.info("MQTT handler stopped") @property def is_enabled(self) -> bool: """Return whether MQTT functionality is enabled.""" return bool(self.broker)