handler.py 21 KB


  1. """Real MQTT handler implementation."""
  2. import os
  3. import threading
  4. import time
  5. import json
  6. from typing import Dict, Callable, List, Optional, Any
  7. import paho.mqtt.client as mqtt
  8. import logging
  9. from functools import partial
  10. from .base import BaseMQTTHandler
  11. from modules.core.state import state
  12. from modules.core.pattern_manager import list_theta_rho_files
  13. from modules.core.playlist_manager import list_all_playlists
  14. logger = logging.getLogger(__name__)
  15. class MQTTHandler(BaseMQTTHandler):
  16. """Real implementation of MQTT handler."""
  17. def __init__(self, callback_registry: Dict[str, Callable]):
  18. # MQTT Configuration from environment variables
  19. self.broker = os.getenv('MQTT_BROKER')
  20. self.port = int(os.getenv('MQTT_PORT', '1883'))
  21. self.username = os.getenv('MQTT_USERNAME')
  22. self.password = os.getenv('MQTT_PASSWORD')
  23. self.client_id = os.getenv('MQTT_CLIENT_ID', 'dune_weaver')
  24. self.status_topic = os.getenv('MQTT_STATUS_TOPIC', 'dune_weaver/status')
  25. self.command_topic = os.getenv('MQTT_COMMAND_TOPIC', 'dune_weaver/command')
  26. self.status_interval = int(os.getenv('MQTT_STATUS_INTERVAL', '30'))
  27. # Store callback registry
  28. self.callback_registry = callback_registry
  29. # Threading control
  30. self.running = False
  31. self.status_thread = None
  32. # Home Assistant MQTT Discovery settings
  33. self.discovery_prefix = os.getenv('MQTT_DISCOVERY_PREFIX', 'homeassistant')
  34. self.device_name = os.getenv('HA_DEVICE_NAME', 'Dune Weaver')
  35. self.device_id = os.getenv('HA_DEVICE_ID', 'dune_weaver')
  36. # Additional topics for state
  37. self.running_state_topic = f"{self.device_id}/state/running"
  38. self.serial_state_topic = f"{self.device_id}/state/serial"
  39. self.pattern_select_topic = f"{self.device_id}/pattern/set"
  40. self.playlist_select_topic = f"{self.device_id}/playlist/set"
  41. self.speed_topic = f"{self.device_id}/speed/set"
  42. # Store current state
  43. self.current_file = ""
  44. self.is_running_state = False
  45. self.serial_state = ""
  46. self.patterns = []
  47. self.playlists = []
  48. # Initialize MQTT client if broker is configured
  49. if self.broker:
  50. self.client = mqtt.Client(client_id=self.client_id)
  51. self.client.on_connect = self.on_connect
  52. self.client.on_message = self.on_message
  53. if self.username and self.password:
  54. self.client.username_pw_set(self.username, self.password)
  55. self.state = state
  56. self.state.mqtt_handler = self # Set reference to self in state, needed so that state setters can update the state
  57. def setup_ha_discovery(self):
  58. """Publish Home Assistant MQTT discovery configurations."""
  59. if not self.is_enabled:
  60. return
  61. base_device = {
  62. "identifiers": [self.device_id],
  63. "name": self.device_name,
  64. "model": "Dune Weaver",
  65. "manufacturer": "DIY"
  66. }
  67. # Serial State Sensor
  68. serial_config = {
  69. "name": f"{self.device_name} Serial State",
  70. "unique_id": f"{self.device_id}_serial_state",
  71. "state_topic": self.serial_state_topic,
  72. "device": base_device,
  73. "icon": "mdi:serial-port",
  74. "entity_category": "diagnostic"
  75. }
  76. self._publish_discovery("sensor", "serial_state", serial_config)
  77. # Running State Sensor
  78. running_config = {
  79. "name": f"{self.device_name} Running State",
  80. "unique_id": f"{self.device_id}_running_state",
  81. "state_topic": self.running_state_topic,
  82. "device": base_device,
  83. "icon": "mdi:machine",
  84. "entity_category": "diagnostic"
  85. }
  86. self._publish_discovery("sensor", "running_state", running_config)
  87. # Stop Button
  88. stop_config = {
  89. "name": f"Stop pattern execution",
  90. "unique_id": f"{self.device_id}_stop",
  91. "command_topic": f"{self.device_id}/command/stop",
  92. "device": base_device,
  93. "icon": "mdi:stop",
  94. "entity_category": "config"
  95. }
  96. self._publish_discovery("button", "stop", stop_config)
  97. # Pause Button
  98. pause_config = {
  99. "name": f"Pause pattern execution",
  100. "unique_id": f"{self.device_id}_pause",
  101. "command_topic": f"{self.device_id}/command/pause",
  102. "state_topic": f"{self.device_id}/command/pause/state",
  103. "device": base_device,
  104. "icon": "mdi:pause",
  105. "entity_category": "config",
  106. "enabled_by_default": True,
  107. "availability": {
  108. "topic": f"{self.device_id}/command/pause/available",
  109. "payload_available": "true",
  110. "payload_not_available": "false"
  111. }
  112. }
  113. self._publish_discovery("button", "pause", pause_config)
  114. # Play Button
  115. play_config = {
  116. "name": f"Resume pattern execution",
  117. "unique_id": f"{self.device_id}_play",
  118. "command_topic": f"{self.device_id}/command/play",
  119. "state_topic": f"{self.device_id}/command/play/state",
  120. "device": base_device,
  121. "icon": "mdi:play",
  122. "entity_category": "config",
  123. "enabled_by_default": True,
  124. "availability": {
  125. "topic": f"{self.device_id}/command/play/available",
  126. "payload_available": "true",
  127. "payload_not_available": "false"
  128. }
  129. }
  130. self._publish_discovery("button", "play", play_config)
  131. # Speed Control
  132. speed_config = {
  133. "name": f"{self.device_name} Speed",
  134. "unique_id": f"{self.device_id}_speed",
  135. "command_topic": self.speed_topic,
  136. "state_topic": f"{self.speed_topic}/state",
  137. "device": base_device,
  138. "icon": "mdi:speedometer",
  139. "mode": "box",
  140. "min": 0,
  141. "max": 1000,
  142. "step": 10
  143. }
  144. self._publish_discovery("number", "speed", speed_config)
  145. # Pattern Select
  146. pattern_config = {
  147. "name": f"{self.device_name} Pattern",
  148. "unique_id": f"{self.device_id}_pattern",
  149. "command_topic": self.pattern_select_topic,
  150. "state_topic": f"{self.pattern_select_topic}/state",
  151. "options": self.patterns,
  152. "device": base_device,
  153. "icon": "mdi:draw"
  154. }
  155. self._publish_discovery("select", "pattern", pattern_config)
  156. # Playlist Select
  157. playlist_config = {
  158. "name": f"{self.device_name} Playlist",
  159. "unique_id": f"{self.device_id}_playlist",
  160. "command_topic": self.playlist_select_topic,
  161. "state_topic": f"{self.playlist_select_topic}/state",
  162. "options": self.playlists,
  163. "device": base_device,
  164. "icon": "mdi:playlist-play"
  165. }
  166. self._publish_discovery("select", "playlist", playlist_config)
  167. # Playlist Run Mode Select
  168. playlist_mode_config = {
  169. "name": f"{self.device_name} Playlist Mode",
  170. "unique_id": f"{self.device_id}_playlist_mode",
  171. "command_topic": f"{self.device_id}/playlist/mode/set",
  172. "state_topic": f"{self.device_id}/playlist/mode/state",
  173. "options": ["single", "loop"],
  174. "device": base_device,
  175. "icon": "mdi:repeat",
  176. "entity_category": "config"
  177. }
  178. self._publish_discovery("select", "playlist_mode", playlist_mode_config)
  179. # Playlist Pause Time Number Input
  180. pause_time_config = {
  181. "name": f"{self.device_name} Playlist Pause Time",
  182. "unique_id": f"{self.device_id}_pause_time",
  183. "command_topic": f"{self.device_id}/playlist/pause_time/set",
  184. "state_topic": f"{self.device_id}/playlist/pause_time/state",
  185. "device": base_device,
  186. "icon": "mdi:timer",
  187. "entity_category": "config",
  188. "mode": "box",
  189. "unit_of_measurement": "seconds",
  190. "min": 0,
  191. "max": 86400,
  192. }
  193. self._publish_discovery("number", "pause_time", pause_time_config)
  194. # Clear Pattern Select
  195. clear_pattern_config = {
  196. "name": f"{self.device_name} Clear Pattern",
  197. "unique_id": f"{self.device_id}_clear_pattern",
  198. "command_topic": f"{self.device_id}/playlist/clear_pattern/set",
  199. "state_topic": f"{self.device_id}/playlist/clear_pattern/state",
  200. "options": ["none", "random", "adaptive", "clear_from_in", "clear_from_out", "clear_sideway"],
  201. "device": base_device,
  202. "icon": "mdi:eraser",
  203. "entity_category": "config"
  204. }
  205. self._publish_discovery("select", "clear_pattern", clear_pattern_config)
  206. def _publish_discovery(self, component: str, config_type: str, config: dict):
  207. """Helper method to publish HA discovery configs."""
  208. if not self.is_enabled:
  209. return
  210. discovery_topic = f"{self.discovery_prefix}/{component}/{self.device_id}/{config_type}/config"
  211. self.client.publish(discovery_topic, json.dumps(config), retain=True)
  212. def _publish_running_state(self, running_state=None):
  213. """Helper to publish running state and button availability."""
  214. if running_state is None:
  215. if not self.state.current_playing_file:
  216. running_state = "idle"
  217. elif self.state.pause_requested:
  218. running_state = "paused"
  219. else:
  220. running_state = "running"
  221. self.client.publish(self.running_state_topic, running_state, retain=True)
  222. # Update button availability based on state
  223. self.client.publish(f"{self.device_id}/command/pause/available",
  224. "true" if running_state == "running" else "false",
  225. retain=True)
  226. self.client.publish(f"{self.device_id}/command/play/available",
  227. "true" if running_state == "paused" else "false",
  228. retain=True)
  229. def _publish_pattern_state(self, current_file=None):
  230. """Helper to publish pattern state."""
  231. if current_file is None:
  232. current_file = self.state.current_playing_file
  233. if current_file:
  234. if current_file.startswith('./patterns/'):
  235. current_file = current_file[len('./patterns/'):]
  236. else:
  237. current_file = current_file.split("/")[-1].split("\\")[-1]
  238. self.client.publish(f"{self.pattern_select_topic}/state", current_file, retain=True)
  239. else:
  240. # Clear the pattern selection
  241. self.client.publish(f"{self.pattern_select_topic}/state", "None", retain=True)
  242. def _publish_playlist_state(self, playlist_name=None):
  243. """Helper to publish playlist state."""
  244. if playlist_name is None:
  245. playlist_name = self.state.current_playlist_name
  246. if playlist_name:
  247. self.client.publish(f"{self.playlist_select_topic}/state", playlist_name, retain=True)
  248. else:
  249. # Clear the playlist selection
  250. self.client.publish(f"{self.playlist_select_topic}/state", "None", retain=True)
  251. def _publish_serial_state(self):
  252. """Helper to publish serial state."""
  253. serial_connected = (state.conn.is_connected() if state.conn else False)
  254. serial_port = state.port if serial_connected else None
  255. serial_status = f"connected to {serial_port}" if serial_connected else "disconnected"
  256. self.client.publish(self.serial_state_topic, serial_status, retain=True)
  257. def update_state(self, current_file=None, is_running=None, playlist=None, playlist_name=None):
  258. """Update state in Home Assistant. Only publishes the attributes that are explicitly passed."""
  259. if not self.is_enabled:
  260. return
  261. # Update pattern state if current_file is provided
  262. if current_file is not None:
  263. self._publish_pattern_state(current_file)
  264. # Update running state and button availability if is_running is provided
  265. if is_running is not None:
  266. running_state = "running" if is_running else "paused" if self.state.current_playing_file else "idle"
  267. self._publish_running_state(running_state)
  268. # Update playlist state if playlist info is provided
  269. if playlist_name is not None:
  270. self._publish_playlist_state(playlist_name)
  271. def on_connect(self, client, userdata, flags, rc):
  272. """Callback when connected to MQTT broker."""
  273. if rc == 0:
  274. logger.info("MQTT Connection Accepted.")
  275. # Subscribe to command topics
  276. client.subscribe([
  277. (self.command_topic, 0),
  278. (self.pattern_select_topic, 0),
  279. (self.playlist_select_topic, 0),
  280. (self.speed_topic, 0),
  281. (f"{self.device_id}/command/stop", 0),
  282. (f"{self.device_id}/command/pause", 0),
  283. (f"{self.device_id}/command/play", 0),
  284. (f"{self.device_id}/playlist/mode/set", 0),
  285. (f"{self.device_id}/playlist/pause_time/set", 0),
  286. (f"{self.device_id}/playlist/clear_pattern/set", 0),
  287. ])
  288. # Publish discovery configurations
  289. self.setup_ha_discovery()
  290. elif rc == 1:
  291. logger.error("MQTT Connection Refused. Protocol level not supported.")
  292. elif rc == 2:
  293. logger.error("MQTT Connection Refused. The client-identifier is not allowed by the server.")
  294. elif rc == 3:
  295. logger.error("MQTT Connection Refused. The MQTT service is not available.")
  296. elif rc == 4:
  297. logger.error("MQTT Connection Refused. The data in the username or password is malformed.")
  298. elif rc == 5:
  299. logger.error("MQTT Connection Refused. The client is not authorized to connect.")
  300. else:
  301. logger.error(f"MQTT Connection Refused. Unknown error code: {rc}")
  302. def on_message(self, client, userdata, msg):
  303. """Callback when message is received."""
  304. try:
  305. if msg.topic == self.pattern_select_topic:
  306. from modules.core.pattern_manager import THETA_RHO_DIR
  307. # Handle pattern selection
  308. pattern_name = msg.payload.decode()
  309. if pattern_name in self.patterns:
  310. self.callback_registry['run_pattern'](file_path=f"{THETA_RHO_DIR}/{pattern_name}")
  311. self.client.publish(f"{self.pattern_select_topic}/state", pattern_name, retain=True)
  312. elif msg.topic == self.playlist_select_topic:
  313. # Handle playlist selection
  314. playlist_name = msg.payload.decode()
  315. if playlist_name in self.playlists:
  316. self.callback_registry['run_playlist'](playlist_name=playlist_name,
  317. run_mode=self.state.playlist_mode,
  318. pause_time=self.state.pause_time,
  319. clear_pattern=self.state.clear_pattern)
  320. self.client.publish(f"{self.playlist_select_topic}/state", playlist_name, retain=True)
  321. elif msg.topic == self.speed_topic:
  322. speed = int(msg.payload.decode())
  323. self.callback_registry['set_speed'](speed)
  324. elif msg.topic == f"{self.device_id}/command/stop":
  325. # Handle stop command
  326. self.callback_registry['stop']()
  327. # Clear both pattern and playlist selections
  328. self._publish_pattern_state(None)
  329. self._publish_playlist_state(None)
  330. elif msg.topic == f"{self.device_id}/command/pause":
  331. # Handle pause command - only if in running state
  332. if bool(self.state.current_playing_file) and not self.state.pause_requested:
  333. self.callback_registry['pause']()
  334. elif msg.topic == f"{self.device_id}/command/play":
  335. # Handle play command - only if in paused state
  336. if bool(self.state.current_playing_file) and self.state.pause_requested:
  337. self.callback_registry['resume']()
  338. elif msg.topic == f"{self.device_id}/playlist/mode/set":
  339. mode = msg.payload.decode()
  340. if mode in ["single", "loop"]:
  341. state.playlist_mode = mode
  342. self.client.publish(f"{self.device_id}/playlist/mode/state", mode, retain=True)
  343. elif msg.topic == f"{self.device_id}/playlist/pause_time/set":
  344. pause_time = float(msg.payload.decode())
  345. if 0 <= pause_time <= 60:
  346. state.pause_time = pause_time
  347. self.client.publish(f"{self.device_id}/playlist/pause_time/state", pause_time, retain=True)
  348. elif msg.topic == f"{self.device_id}/playlist/clear_pattern/set":
  349. clear_pattern = msg.payload.decode()
  350. if clear_pattern in ["none", "random", "adaptive", "clear_from_in", "clear_from_out", "clear_sideway"]:
  351. state.clear_pattern = clear_pattern
  352. self.client.publish(f"{self.device_id}/playlist/clear_pattern/state", clear_pattern, retain=True)
  353. else:
  354. # Handle other commands
  355. payload = json.loads(msg.payload.decode())
  356. command = payload.get('command')
  357. params = payload.get('params', {})
  358. if command in self.callback_registry:
  359. self.callback_registry[command](**params)
  360. else:
  361. logger.error(f"Unknown command received: {command}")
  362. except json.JSONDecodeError:
  363. logger.error(f"Invalid JSON payload received: {msg.payload}")
  364. except Exception as e:
  365. logger.error(f"Error processing MQTT message: {e}")
  366. def publish_status(self):
  367. """Publish status updates periodically."""
  368. while self.running:
  369. try:
  370. # Update all states
  371. self._publish_running_state()
  372. self._publish_pattern_state()
  373. self._publish_playlist_state()
  374. self._publish_serial_state()
  375. # Update speed state
  376. self.client.publish(f"{self.speed_topic}/state", self.state.speed, retain=True)
  377. # Publish keepalive status
  378. status = {
  379. "timestamp": time.time(),
  380. "client_id": self.client_id
  381. }
  382. self.client.publish(self.status_topic, json.dumps(status))
  383. # Wait for next interval
  384. time.sleep(self.status_interval)
  385. except Exception as e:
  386. logger.error(f"Error publishing status: {e}")
  387. time.sleep(5) # Wait before retry
  388. def start(self) -> None:
  389. """Start the MQTT handler."""
  390. if not self.is_enabled:
  391. return
  392. try:
  393. self.client.connect(self.broker, self.port)
  394. self.client.loop_start()
  395. # Start status publishing thread
  396. self.running = True
  397. self.status_thread = threading.Thread(target=self.publish_status, daemon=True)
  398. self.status_thread.start()
  399. # Get initial pattern and playlist lists
  400. self.patterns = list_theta_rho_files()
  401. self.playlists = list_all_playlists()
  402. # Wait a bit for MQTT connection to establish
  403. time.sleep(1)
  404. # Publish initial states
  405. self._publish_running_state()
  406. self._publish_pattern_state()
  407. self._publish_playlist_state()
  408. self._publish_serial_state()
  409. # Setup Home Assistant discovery
  410. self.setup_ha_discovery()
  411. logger.info("MQTT Handler started successfully")
  412. except Exception as e:
  413. logger.error(f"Failed to start MQTT Handler: {e}")
  414. def stop(self) -> None:
  415. """Stop the MQTT handler."""
  416. if not self.is_enabled:
  417. return
  418. # First stop the running flag to prevent new iterations
  419. self.running = False
  420. # Clean up status thread
  421. local_status_thread = self.status_thread # Keep a local reference
  422. if local_status_thread and local_status_thread.is_alive():
  423. try:
  424. local_status_thread.join(timeout=5)
  425. if local_status_thread.is_alive():
  426. logger.warning("MQTT status thread did not terminate cleanly")
  427. except Exception as e:
  428. logger.error(f"Error joining status thread: {e}")
  429. self.status_thread = None
  430. # Clean up MQTT client
  431. try:
  432. if hasattr(self, 'client'):
  433. self.client.loop_stop()
  434. self.client.disconnect()
  435. except Exception as e:
  436. logger.error(f"Error disconnecting MQTT client: {e}")
  437. # Clean up main loop reference
  438. self.main_loop = None
  439. logger.info("MQTT handler stopped")
  440. @property
  441. def is_enabled(self) -> bool:
  442. """Return whether MQTT functionality is enabled."""
  443. return bool(self.broker)