handler.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. """Real MQTT handler implementation."""
  2. import os
  3. import threading
  4. import time
  5. import json
  6. from typing import Dict, Callable, List, Optional, Any
  7. import paho.mqtt.client as mqtt
  8. import logging
  9. from .base import BaseMQTTHandler
  10. from dune_weaver_flask.modules.core.state import state
  11. from dune_weaver_flask.modules.core.pattern_manager import list_theta_rho_files
  12. from dune_weaver_flask.modules.core.playlist_manager import list_all_playlists
  13. from dune_weaver_flask.modules.serial.serial_manager import is_connected
  14. logger = logging.getLogger(__name__)
  15. class MQTTHandler(BaseMQTTHandler):
  16. """Real implementation of MQTT handler."""
  17. def __init__(self, callback_registry: Dict[str, Callable]):
  18. # MQTT Configuration from environment variables
  19. self.broker = os.getenv('MQTT_BROKER')
  20. self.port = int(os.getenv('MQTT_PORT', '1883'))
  21. self.username = os.getenv('MQTT_USERNAME')
  22. self.password = os.getenv('MQTT_PASSWORD')
  23. self.client_id = os.getenv('MQTT_CLIENT_ID', 'dune_weaver')
  24. self.status_topic = os.getenv('MQTT_STATUS_TOPIC', 'dune_weaver/status')
  25. self.command_topic = os.getenv('MQTT_COMMAND_TOPIC', 'dune_weaver/command')
  26. self.status_interval = int(os.getenv('MQTT_STATUS_INTERVAL', '30'))
  27. # Store callback registry
  28. self.callback_registry = callback_registry
  29. # Threading control
  30. self.running = False
  31. self.status_thread = None
  32. # Home Assistant MQTT Discovery settings
  33. self.discovery_prefix = os.getenv('MQTT_DISCOVERY_PREFIX', 'homeassistant')
  34. self.device_name = os.getenv('HA_DEVICE_NAME', 'Sand Table')
  35. self.device_id = os.getenv('HA_DEVICE_ID', 'dune_weaver')
  36. # Additional topics for state
  37. self.current_file_topic = f"{self.device_id}/state/current_file"
  38. self.running_state_topic = f"{self.device_id}/state/running"
  39. self.serial_state_topic = f"{self.device_id}/state/serial"
  40. self.pattern_select_topic = f"{self.device_id}/pattern/set"
  41. self.playlist_select_topic = f"{self.device_id}/playlist/set"
  42. self.speed_topic = f"{self.device_id}/speed/set"
  43. # Store current state
  44. self.current_file = ""
  45. self.is_running_state = False
  46. self.serial_state = ""
  47. self.patterns = []
  48. self.playlists = []
  49. # Initialize MQTT client if broker is configured
  50. if self.broker:
  51. self.client = mqtt.Client(client_id=self.client_id)
  52. self.client.on_connect = self.on_connect
  53. self.client.on_message = self.on_message
  54. if self.username and self.password:
  55. self.client.username_pw_set(self.username, self.password)
  56. def setup_ha_discovery(self):
  57. """Publish Home Assistant MQTT discovery configurations."""
  58. if not self.is_enabled:
  59. return
  60. base_device = {
  61. "identifiers": [self.device_id],
  62. "name": self.device_name,
  63. "model": "Dune Weaver",
  64. "manufacturer": "DIY"
  65. }
  66. # Serial State Sensor
  67. serial_config = {
  68. "name": f"{self.device_name} Serial State",
  69. "unique_id": f"{self.device_id}_serial_state",
  70. "state_topic": self.serial_state_topic,
  71. "device": base_device,
  72. "icon": "mdi:serial-port",
  73. "entity_category": "diagnostic"
  74. }
  75. self._publish_discovery("sensor", "serial_state", serial_config)
  76. # Running State Sensor
  77. running_config = {
  78. "name": f"{self.device_name} Running State",
  79. "unique_id": f"{self.device_id}_running_state",
  80. "state_topic": self.running_state_topic,
  81. "device": base_device,
  82. "icon": "mdi:machine",
  83. "entity_category": "diagnostic"
  84. }
  85. self._publish_discovery("binary_sensor", "running_state", running_config)
  86. # Speed Control
  87. speed_config = {
  88. "name": f"{self.device_name} Speed",
  89. "unique_id": f"{self.device_id}_speed",
  90. "command_topic": self.speed_topic,
  91. "state_topic": f"{self.speed_topic}/state",
  92. "device": base_device,
  93. "icon": "mdi:speedometer",
  94. "min": 50,
  95. "max": 1000,
  96. "step": 50
  97. }
  98. self._publish_discovery("number", "speed", speed_config)
  99. # Pattern Select
  100. pattern_config = {
  101. "name": f"{self.device_name} Pattern",
  102. "unique_id": f"{self.device_id}_pattern",
  103. "command_topic": self.pattern_select_topic,
  104. "state_topic": f"{self.pattern_select_topic}/state",
  105. "options": self.patterns,
  106. "device": base_device,
  107. "icon": "mdi:draw"
  108. }
  109. self._publish_discovery("select", "pattern", pattern_config)
  110. # Playlist Select
  111. playlist_config = {
  112. "name": f"{self.device_name} Playlist",
  113. "unique_id": f"{self.device_id}_playlist",
  114. "command_topic": self.playlist_select_topic,
  115. "state_topic": f"{self.playlist_select_topic}/state",
  116. "options": self.playlists,
  117. "device": base_device,
  118. "icon": "mdi:playlist-play"
  119. }
  120. self._publish_discovery("select", "playlist", playlist_config)
  121. # Playlist Active Sensor
  122. playlist_active_config = {
  123. "name": f"{self.device_name} Playlist Active",
  124. "unique_id": f"{self.device_id}_playlist_active",
  125. "state_topic": f"{self.device_id}/state/playlist",
  126. "value_template": "{{ value_json.active }}",
  127. "device": base_device,
  128. "icon": "mdi:playlist-play",
  129. "entity_category": "diagnostic"
  130. }
  131. self._publish_discovery("binary_sensor", "playlist_active", playlist_active_config)
  132. def _publish_discovery(self, component: str, config_type: str, config: dict):
  133. """Helper method to publish HA discovery configs."""
  134. if not self.is_enabled:
  135. return
  136. discovery_topic = f"{self.discovery_prefix}/{component}/{self.device_id}/{config_type}/config"
  137. self.client.publish(discovery_topic, json.dumps(config), retain=True)
  138. def update_state(self, is_running: Optional[bool] = None,
  139. current_file: Optional[str] = None,
  140. patterns: Optional[List[str]] = None,
  141. serial: Optional[str] = None,
  142. playlist: Optional[Dict[str, Any]] = None) -> None:
  143. """Update the state of the sand table and publish to MQTT."""
  144. if not self.is_enabled:
  145. return
  146. if is_running is not None:
  147. self.is_running_state = is_running
  148. self.client.publish(self.running_state_topic, "ON" if is_running else "OFF", retain=True)
  149. if current_file is not None:
  150. self.current_file = current_file
  151. self.client.publish(self.current_file_topic, current_file, retain=True)
  152. self.client.publish(f"{self.pattern_select_topic}/state", current_file, retain=True)
  153. if patterns is not None and set(patterns) != set(self.patterns):
  154. self.patterns = patterns
  155. self.setup_ha_discovery()
  156. if serial is not None:
  157. self.serial_state = serial
  158. self.client.publish(self.serial_state_topic, serial, retain=True)
  159. # Update speed state
  160. self.client.publish(f"{self.speed_topic}/state", state.speed, retain=True)
  161. def on_connect(self, client, userdata, flags, rc):
  162. """Callback when connected to MQTT broker."""
  163. logger.info(f"Connected to MQTT broker with result code {rc}")
  164. # Subscribe to command topics
  165. client.subscribe([
  166. (self.command_topic, 0),
  167. (self.pattern_select_topic, 0),
  168. (self.playlist_select_topic, 0),
  169. (self.speed_topic, 0)
  170. ])
  171. # Publish discovery configurations
  172. self.setup_ha_discovery()
  173. def on_message(self, client, userdata, msg):
  174. """Callback when message is received."""
  175. try:
  176. if msg.topic == self.pattern_select_topic:
  177. # Handle pattern selection
  178. pattern_name = msg.payload.decode()
  179. if pattern_name in self.patterns:
  180. self.callback_registry['run_pattern'](file_path=f"{pattern_name}")
  181. self.client.publish(f"{self.pattern_select_topic}/state", pattern_name, retain=True)
  182. elif msg.topic == self.playlist_select_topic:
  183. # Handle playlist selection
  184. playlist_name = msg.payload.decode()
  185. if playlist_name in self.playlists:
  186. self.callback_registry['run_playlist'](playlist_name=playlist_name)
  187. self.client.publish(f"{self.playlist_select_topic}/state", playlist_name, retain=True)
  188. elif msg.topic == self.speed_topic:
  189. speed = int(msg.payload.decode())
  190. self.callback_registry['set_speed'](speed)
  191. else:
  192. # Handle other commands
  193. payload = json.loads(msg.payload.decode())
  194. command = payload.get('command')
  195. params = payload.get('params', {})
  196. if command in self.callback_registry:
  197. self.callback_registry[command](**params)
  198. else:
  199. logger.error(f"Unknown command received: {command}")
  200. except json.JSONDecodeError:
  201. logger.error(f"Invalid JSON payload received: {msg.payload}")
  202. except Exception as e:
  203. logger.error(f"Error processing MQTT message: {e}")
  204. def publish_status(self):
  205. """Publish status updates periodically."""
  206. while self.running:
  207. try:
  208. # Create status message
  209. status = {
  210. "status": "running" if not state.stop_requested else "idle",
  211. "timestamp": time.time(),
  212. "client_id": self.client_id,
  213. "current_file": state.current_playing_file,
  214. "speed": state.speed,
  215. "position": {
  216. "theta": state.current_theta,
  217. "rho": state.current_rho,
  218. "x": state.machine_x,
  219. "y": state.machine_y
  220. }
  221. }
  222. logger.info(f"publishing status: {status}" )
  223. self.client.publish(self.status_topic, json.dumps(status))
  224. # Wait for next interval
  225. time.sleep(self.status_interval)
  226. except Exception as e:
  227. logger.error(f"Error publishing status: {e}")
  228. time.sleep(5) # Wait before retry
  229. def start(self) -> None:
  230. """Start the MQTT handler."""
  231. if not self.is_enabled:
  232. return
  233. try:
  234. self.client.connect(self.broker, self.port)
  235. self.client.loop_start()
  236. # Start status publishing thread
  237. self.running = True
  238. self.status_thread = threading.Thread(target=self.publish_status, daemon=True)
  239. self.status_thread.start()
  240. # Get initial states
  241. patterns = list_theta_rho_files()
  242. playlists = list_all_playlists()
  243. serial_status = is_connected()
  244. logger.info(patterns, playlists, serial_status)
  245. # Wait for connection
  246. time.sleep(1)
  247. # Publish initial states
  248. self.update_state(
  249. is_running=not state.stop_requested,
  250. current_file=state.current_playing_file,
  251. patterns=patterns,
  252. serial=serial_status.get('status', '')
  253. )
  254. logger.info("MQTT Handler started successfully")
  255. except Exception as e:
  256. logger.error(f"Failed to start MQTT Handler: {e}")
  257. def stop(self) -> None:
  258. """Stop the MQTT handler."""
  259. if not self.is_enabled:
  260. return
  261. self.running = False
  262. if self.status_thread:
  263. self.status_thread.join(timeout=1)
  264. self.client.loop_stop()
  265. self.client.disconnect()
  266. @property
  267. def is_enabled(self) -> bool:
  268. """Return whether MQTT functionality is enabled."""
  269. return bool(self.broker)