# handler.py
  1. """Real MQTT handler implementation."""
  2. import os
  3. import threading
  4. import time
  5. import json
  6. from typing import Dict, Callable, List, Optional, Any
  7. import paho.mqtt.client as mqtt
  8. import logging
  9. from .base import BaseMQTTHandler
  10. from dune_weaver_flask.modules.core.state import state
  11. from dune_weaver_flask.modules.core.pattern_manager import list_theta_rho_files
  12. from dune_weaver_flask.modules.core.playlist_manager import list_all_playlists
  13. logger = logging.getLogger(__name__)
  class MQTTHandler(BaseMQTTHandler):
      """MQTT handler backed by a real paho-mqtt client.

      Settings are read from environment variables in ``__init__``. When
      ``MQTT_BROKER`` is unset the handler is disabled (see ``is_enabled``):
      no client is created and ``start``, ``stop``, ``update_state`` and
      ``setup_ha_discovery`` return without doing anything.

      When enabled, the handler publishes Home Assistant MQTT discovery
      configs, mirrors the shared application ``state`` to retained topics
      from a background thread, and routes incoming messages to the
      callables in ``callback_registry``.
      """

      # Log messages for the non-zero result codes handled by on_connect.
      _CONNECT_ERRORS = {
          1: "MQTT Connection Refused. Protocol level not supported.",
          2: "MQTT Connection Refused. The client-identifier is not allowed by the server.",
          3: "MQTT Connection Refused. The MQTT service is not available.",
          4: "MQTT Connection Refused. The data in the username or password is malformed.",
          5: "MQTT Connection Refused. The client is not authorized to connect.",
      }

      def __init__(self, callback_registry: Dict[str, Callable]):
          """Read the MQTT settings from the environment and build the client.

          Args:
              callback_registry: Maps command names to callables. The keys
                  looked up directly by ``on_message`` are ``run_pattern``,
                  ``run_playlist``, ``set_speed``, ``stop``, ``pause`` and
                  ``resume``; any other key can be invoked through a JSON
                  message on the command topic.

          Raises:
              ValueError: If ``MQTT_PORT`` or ``MQTT_STATUS_INTERVAL`` is set
                  to a value that ``int()`` cannot parse.
          """
          # MQTT configuration from environment variables
          self.broker = os.getenv('MQTT_BROKER')
          self.port = int(os.getenv('MQTT_PORT', '1883'))
          self.username = os.getenv('MQTT_USERNAME')
          self.password = os.getenv('MQTT_PASSWORD')
          self.client_id = os.getenv('MQTT_CLIENT_ID', 'dune_weaver')
          self.status_topic = os.getenv('MQTT_STATUS_TOPIC', 'dune_weaver/status')
          self.command_topic = os.getenv('MQTT_COMMAND_TOPIC', 'dune_weaver/command')
          self.status_interval = int(os.getenv('MQTT_STATUS_INTERVAL', '30'))

          # Store callback registry
          self.callback_registry = callback_registry

          # Threading control
          self.running = False
          self.status_thread: Optional[threading.Thread] = None
          # Set by stop() so the status loop wakes up immediately instead of
          # sleeping out the remainder of its interval.
          self._stop_event = threading.Event()

          # Home Assistant MQTT Discovery settings
          self.discovery_prefix = os.getenv('MQTT_DISCOVERY_PREFIX', 'homeassistant')
          self.device_name = os.getenv('HA_DEVICE_NAME', 'Dune Weaver')
          self.device_id = os.getenv('HA_DEVICE_ID', 'dune_weaver')

          # Additional topics for state
          self.running_state_topic = f"{self.device_id}/state/running"
          self.serial_state_topic = f"{self.device_id}/state/serial"
          self.pattern_select_topic = f"{self.device_id}/pattern/set"
          self.playlist_select_topic = f"{self.device_id}/playlist/set"
          self.speed_topic = f"{self.device_id}/speed/set"

          # Store current state
          self.current_file = ""
          self.is_running_state = False
          self.serial_state = ""
          self.patterns = []
          self.playlists = []

          # Only build a client when a broker is configured. The attribute is
          # still defined (as None) when disabled so it always exists.
          self.client = None
          if self.broker:
              # NOTE(review): paho-mqtt 2.x added a callback_api_version
              # parameter to Client() - confirm this call matches the pinned
              # paho version.
              self.client = mqtt.Client(client_id=self.client_id)
              self.client.on_connect = self.on_connect
              self.client.on_message = self.on_message
              if self.username and self.password:
                  self.client.username_pw_set(self.username, self.password)

          self.state = state
          # Back-reference needed so that state setters can push updates
          # through this handler.
          self.state.mqtt_handler = self

      def setup_ha_discovery(self):
          """Publish the Home Assistant MQTT discovery config of every entity.

          Entities are published in this order: serial-state sensor,
          running-state sensor, stop / pause / play buttons, speed number,
          pattern select and playlist select. The select options are the
          current contents of ``self.patterns`` and ``self.playlists``.
          Does nothing when the handler is disabled.
          """
          if not self.is_enabled:
              return

          # Device description embedded in every entity config below.
          base_device = {
              "identifiers": [self.device_id],
              "name": self.device_name,
              "model": "Dune Weaver",
              "manufacturer": "DIY"
          }

          # Serial State Sensor
          serial_config = {
              "name": f"{self.device_name} Serial State",
              "unique_id": f"{self.device_id}_serial_state",
              "state_topic": self.serial_state_topic,
              "device": base_device,
              "icon": "mdi:serial-port",
              "entity_category": "diagnostic"
          }
          self._publish_discovery("sensor", "serial_state", serial_config)

          # Running State Sensor
          running_config = {
              "name": f"{self.device_name} Running State",
              "unique_id": f"{self.device_id}_running_state",
              "state_topic": self.running_state_topic,
              "device": base_device,
              "icon": "mdi:machine",
              "entity_category": "diagnostic"
          }
          self._publish_discovery("sensor", "running_state", running_config)

          # Stop Button
          stop_config = {
              "name": "Stop pattern execution",
              "unique_id": f"{self.device_id}_stop",
              "command_topic": f"{self.device_id}/command/stop",
              "device": base_device,
              "icon": "mdi:stop",
              "entity_category": "config"
          }
          self._publish_discovery("button", "stop", stop_config)

          # Pause and Play buttons share one shape; only the action name and
          # label differ. Their availability topics are driven by
          # _publish_running_state.
          for action, label in (
              ("pause", "Pause pattern execution"),
              ("play", "Resume pattern execution"),
          ):
              action_topic = f"{self.device_id}/command/{action}"
              button_config = {
                  "name": label,
                  "unique_id": f"{self.device_id}_{action}",
                  "command_topic": action_topic,
                  "state_topic": f"{action_topic}/state",
                  "device": base_device,
                  "icon": f"mdi:{action}",
                  "entity_category": "config",
                  "enabled_by_default": True,
                  "availability": {
                      "topic": f"{action_topic}/available",
                      "payload_available": "true",
                      "payload_not_available": "false"
                  }
              }
              self._publish_discovery("button", action, button_config)

          # Speed Control
          speed_config = {
              "name": f"{self.device_name} Speed",
              "unique_id": f"{self.device_id}_speed",
              "command_topic": self.speed_topic,
              "state_topic": f"{self.speed_topic}/state",
              "device": base_device,
              "icon": "mdi:speedometer",
              "min": 50,
              "max": 1000,
              "step": 50
          }
          self._publish_discovery("number", "speed", speed_config)

          # Pattern Select
          pattern_config = {
              "name": f"{self.device_name} Pattern",
              "unique_id": f"{self.device_id}_pattern",
              "command_topic": self.pattern_select_topic,
              "state_topic": f"{self.pattern_select_topic}/state",
              "options": self.patterns,
              "device": base_device,
              "icon": "mdi:draw"
          }
          self._publish_discovery("select", "pattern", pattern_config)

          # Playlist Select
          playlist_config = {
              "name": f"{self.device_name} Playlist",
              "unique_id": f"{self.device_id}_playlist",
              "command_topic": self.playlist_select_topic,
              "state_topic": f"{self.playlist_select_topic}/state",
              "options": self.playlists,
              "device": base_device,
              "icon": "mdi:playlist-play"
          }
          self._publish_discovery("select", "playlist", playlist_config)

      def _publish_discovery(self, component: str, config_type: str, config: dict):
          """Publish one retained discovery config.

          Args:
              component: Home Assistant component segment of the topic
                  (``sensor``, ``button``, ``number`` or ``select`` here).
              config_type: Entity segment of the discovery topic.
              config: Config dict; sent JSON-encoded.
          """
          if not self.is_enabled:
              return
          discovery_topic = (
              f"{self.discovery_prefix}/{component}/{self.device_id}/{config_type}/config"
          )
          self.client.publish(discovery_topic, json.dumps(config), retain=True)

      def _publish_running_state(self, running_state=None):
          """Publish the running state and the pause/play availability.

          Args:
              running_state: ``"idle"``, ``"paused"`` or ``"running"``. When
                  None it is derived from ``state.current_playing_file`` and
                  ``state.pause_requested``.
          """
          if running_state is None:
              if not self.state.current_playing_file:
                  running_state = "idle"
              elif self.state.pause_requested:
                  running_state = "paused"
              else:
                  running_state = "running"

          self.client.publish(self.running_state_topic, running_state, retain=True)

          # Pause is only offered while running, play only while paused.
          pause_available = "true" if running_state == "running" else "false"
          play_available = "true" if running_state == "paused" else "false"
          self.client.publish(
              f"{self.device_id}/command/pause/available", pause_available, retain=True
          )
          self.client.publish(
              f"{self.device_id}/command/play/available", play_available, retain=True
          )

      def _publish_pattern_state(self, current_file=None):
          """Publish the name of the current pattern, or ``"None"``.

          Args:
              current_file: Path of the pattern. When None,
                  ``state.current_playing_file`` is used. A leading
                  ``./patterns/`` is removed; any other path is reduced to its
                  last ``/``- or ``\\``-separated component.
          """
          if current_file is None:
              current_file = self.state.current_playing_file

          state_topic = f"{self.pattern_select_topic}/state"
          if not current_file:
              self.client.publish(state_topic, "None", retain=True)
              return

          if current_file.startswith('./patterns/'):
              current_file = current_file[len('./patterns/'):]
          else:
              current_file = current_file.split("/")[-1].split("\\")[-1]
          self.client.publish(state_topic, current_file, retain=True)

      def _publish_playlist_state(self, playlist=None):
          """Publish the current playlist name, or ``"None"``.

          Args:
              playlist: Playlist name. When None, ``state.current_playlist``
                  is used.
          """
          if playlist is None:
              playlist = self.state.current_playlist
          self.client.publish(
              f"{self.playlist_select_topic}/state",
              playlist if playlist else "None",
              retain=True,
          )

      def _publish_serial_state(self):
          """Publish ``"connected to <port>"`` or ``"disconnected"``."""
          # NOTE(review): assumes state.conn may be unset before a serial
          # connection has been opened; that case is reported as
          # "disconnected" instead of raising. Confirm against the state
          # module.
          conn = getattr(self.state, "conn", None)
          if conn is not None and conn.is_connected():
              serial_status = f"connected to {self.state.port}"
          else:
              serial_status = "disconnected"
          self.client.publish(self.serial_state_topic, serial_status, retain=True)

      def update_state(self, current_file=None, is_running=None, playlist=None):
          """Publish only the state attributes that are explicitly passed.

          Args:
              current_file: Pattern path to publish; skipped when None.
              is_running: When not None, publishes ``"running"`` if truthy,
                  otherwise ``"paused"`` if ``state.current_playing_file`` is
                  set, otherwise ``"idle"``.
              playlist: Playlist name to publish; skipped when None.
          """
          if not self.is_enabled:
              return

          if current_file is not None:
              self._publish_pattern_state(current_file)

          if is_running is not None:
              if is_running:
                  running_state = "running"
              elif self.state.current_playing_file:
                  running_state = "paused"
              else:
                  running_state = "idle"
              self._publish_running_state(running_state)

          if playlist is not None:
              self._publish_playlist_state(playlist)

      def on_connect(self, client, userdata, flags, rc):
          """paho ``on_connect`` callback.

          On success (``rc == 0``) subscribes to every command topic and
          publishes the discovery configs; otherwise logs the refusal reason.
          """
          if rc != 0:
              logger.error(
                  self._CONNECT_ERRORS.get(
                      rc, f"MQTT Connection Refused. Unknown error code: {rc}"
                  )
              )
              return

          logger.info("MQTT Connection Accepted.")
          # Subscribe to command topics
          client.subscribe([
              (self.command_topic, 0),
              (self.pattern_select_topic, 0),
              (self.playlist_select_topic, 0),
              (self.speed_topic, 0),
              (f"{self.device_id}/command/stop", 0),
              (f"{self.device_id}/command/pause", 0),
              (f"{self.device_id}/command/play", 0)
          ])
          # Publish discovery configurations
          self.setup_ha_discovery()

      def on_message(self, client, userdata, msg):
          """paho ``on_message`` callback: route a message by its topic.

          Pattern and playlist selections are only honoured when the payload
          is one of the known ``self.patterns`` / ``self.playlists`` entries.
          Pause and play are only forwarded when the current state allows
          them. Messages on any other topic are parsed as a JSON object of
          the form ``{"command": ..., "params": {...}}`` and dispatched
          through ``callback_registry``. Errors are logged, never raised.
          """
          try:
              topic = msg.topic
              if topic == self.pattern_select_topic:
                  from dune_weaver_flask.modules.core.pattern_manager import THETA_RHO_DIR
                  # Handle pattern selection
                  pattern_name = msg.payload.decode()
                  if pattern_name in self.patterns:
                      self.callback_registry['run_pattern'](
                          file_path=f"{THETA_RHO_DIR}/{pattern_name}"
                      )
                      self.client.publish(
                          f"{self.pattern_select_topic}/state", pattern_name, retain=True
                      )
              elif topic == self.playlist_select_topic:
                  # Handle playlist selection
                  playlist_name = msg.payload.decode()
                  if playlist_name in self.playlists:
                      self.callback_registry['run_playlist'](playlist_name=playlist_name)
                      self.client.publish(
                          f"{self.playlist_select_topic}/state", playlist_name, retain=True
                      )
              elif topic == self.speed_topic:
                  speed = int(msg.payload.decode())
                  self.callback_registry['set_speed'](speed)
              elif topic == f"{self.device_id}/command/stop":
                  # Handle stop command
                  self.callback_registry['stop']()
              elif topic == f"{self.device_id}/command/pause":
                  # Handle pause command - only if in running state
                  if self.state.current_playing_file and not self.state.pause_requested:
                      self.callback_registry['pause']()
              elif topic == f"{self.device_id}/command/play":
                  # Handle play command - only if in paused state
                  if self.state.current_playing_file and self.state.pause_requested:
                      self.callback_registry['resume']()
              else:
                  # Handle other commands
                  payload = json.loads(msg.payload.decode())
                  if not isinstance(payload, dict):
                      # Valid JSON, but .get() below needs an object.
                      logger.error(
                          "MQTT command payload must be a JSON object, got: %s",
                          msg.payload,
                      )
                      return
                  command = payload.get('command')
                  params = payload.get('params', {})
                  if command in self.callback_registry:
                      self.callback_registry[command](**params)
                  else:
                      logger.error("Unknown command received: %s", command)
          except json.JSONDecodeError:
              logger.error("Invalid JSON payload received: %s", msg.payload)
          except Exception as e:
              # Keep the traceback: this is the only place a failing callback
              # becomes visible.
              logger.exception("Error processing MQTT message: %s", e)

      def publish_status(self):
          """Status loop run by the background thread started in ``start``.

          While ``self.running`` is true, publishes running, pattern,
          playlist, serial and speed state plus a keepalive message, then
          waits ``status_interval`` seconds (5 seconds after a failure). The
          wait ends early when ``stop`` is called.
          """
          while self.running:
              try:
                  # Update all states
                  self._publish_running_state()
                  self._publish_pattern_state()
                  self._publish_playlist_state()
                  self._publish_serial_state()
                  # Update speed state
                  self.client.publish(
                      f"{self.speed_topic}/state", self.state.speed, retain=True
                  )
                  # Publish keepalive status
                  status = {
                      "timestamp": time.time(),
                      "client_id": self.client_id
                  }
                  self.client.publish(self.status_topic, json.dumps(status))
                  # Never spin faster than once per second, even if the
                  # interval is misconfigured as 0 or negative.
                  delay = max(self.status_interval, 1)
              except Exception as e:
                  logger.error("Error publishing status: %s", e)
                  delay = 5  # Wait before retry
              # Interruptible sleep: returns as soon as stop() sets the event.
              self._stop_event.wait(delay)

      def start(self) -> None:
          """Connect to the broker and start publishing.

          Loads the pattern and playlist lists, connects, starts the paho
          network loop and the status thread, then publishes the initial
          states and the discovery configs. Failures are logged, not raised.
          Does nothing when the handler is disabled.
          """
          if not self.is_enabled:
              return
          try:
              # Load the lists before connecting: on_connect publishes the
              # discovery configs, which embed them as select options.
              self.patterns = list_theta_rho_files()
              self.playlists = list_all_playlists()

              self.client.connect(self.broker, self.port)
              self.client.loop_start()

              # Start status publishing thread
              self._stop_event.clear()
              self.running = True
              self.status_thread = threading.Thread(
                  target=self.publish_status, daemon=True
              )
              self.status_thread.start()

              # Wait a bit for MQTT connection to establish
              time.sleep(1)

              # Publish initial states
              self._publish_running_state()
              self._publish_pattern_state()
              self._publish_playlist_state()
              self._publish_serial_state()

              # Setup Home Assistant discovery
              self.setup_ha_discovery()
              logger.info("MQTT Handler started successfully")
          except Exception as e:
              logger.exception("Failed to start MQTT Handler: %s", e)

      def stop(self) -> None:
          """Stop the status thread and disconnect from the broker.

          Does nothing when the handler is disabled.
          """
          if not self.is_enabled:
              return
          self.running = False
          # Wake the status loop so the join below normally succeeds at once.
          self._stop_event.set()
          if self.status_thread:
              self.status_thread.join(timeout=1)
          self.client.loop_stop()
          self.client.disconnect()

      @property
      def is_enabled(self) -> bool:
          """Return whether MQTT functionality is enabled (a broker is set)."""
          return bool(self.broker)