Browse Source

fix mqtt integration

tuanchris 5 months ago
parent
commit
e0ce8f8ac0
2 changed files with 28 additions and 15 deletions
  1. 17 3
      modules/mqtt/handler.py
  2. 11 12
      modules/mqtt/utils.py

+ 17 - 3
modules/mqtt/handler.py

@@ -382,18 +382,32 @@ class MQTTHandler(BaseMQTTHandler):
                 self.callback_registry['set_speed'](speed)
             elif msg.topic == f"{self.device_id}/command/stop":
                 # Handle stop command
-                self.callback_registry['stop']()
+                callback = self.callback_registry['stop']
+                if asyncio.iscoroutinefunction(callback):
+                    asyncio.run_coroutine_threadsafe(callback(), self.main_loop)
+                else:
+                    callback()
                 # Clear both pattern and playlist selections
                 self._publish_pattern_state(None)
                 self._publish_playlist_state(None)
             elif msg.topic == f"{self.device_id}/command/pause":
                 # Handle pause command - only if in running state
                 if bool(self.state.current_playing_file) and not self.state.pause_requested:
-                    self.callback_registry['pause']()
+                    # Check if callback is async or sync
+                    callback = self.callback_registry['pause']
+                    if asyncio.iscoroutinefunction(callback):
+                        asyncio.run_coroutine_threadsafe(callback(), self.main_loop)
+                    else:
+                        callback()
             elif msg.topic == f"{self.device_id}/command/play":
                 # Handle play command - only if in paused state
                 if bool(self.state.current_playing_file) and self.state.pause_requested:
-                    self.callback_registry['resume']()
+                    # Check if callback is async or sync
+                    callback = self.callback_registry['resume']
+                    if asyncio.iscoroutinefunction(callback):
+                        asyncio.run_coroutine_threadsafe(callback(), self.main_loop)
+                    else:
+                        callback()
             elif msg.topic == f"{self.device_id}/playlist/mode/set":
                 mode = msg.payload.decode()
                 if mode in ["single", "loop"]:

+ 11 - 12
modules/mqtt/utils.py

@@ -11,22 +11,21 @@ from modules.connection.connection_manager import home
 from modules.core.state import state
 
 def create_mqtt_callbacks() -> Dict[str, Callable]:
-    """Create and return the MQTT callback registry."""
+    """Create and return the MQTT callback registry.
+    
+    Note: run_theta_rho_file and run_playlist are async functions,
+    while pause_execution, resume_execution, and stop_actions are sync functions.
+    The MQTT handler will check and handle both async and sync appropriately.
+    """
     def set_speed(speed):
         state.speed = speed
 
     return {
-        'run_pattern': lambda file_path: run_theta_rho_file(file_path),
-        'run_playlist': lambda playlist_name, run_mode="loop", pause_time=0, clear_pattern=None, shuffle=False: run_playlist(
-            playlist_name,
-            run_mode=run_mode,
-            pause_time=pause_time,
-            clear_pattern=clear_pattern,
-            shuffle=shuffle
-        ),
-        'stop': stop_actions,
-        'pause': pause_execution,
-        'resume': resume_execution,
+        'run_pattern': run_theta_rho_file,  # async function
+        'run_playlist': run_playlist,  # async function
+        'stop': stop_actions,  # sync function
+        'pause': pause_execution,  # sync function
+        'resume': resume_execution,  # sync function
         'home': home,
         'set_speed': set_speed
     }