| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871 |
- from PySide6.QtCore import QObject, Signal, Property, Slot, QTimer
- from PySide6.QtQml import QmlElement
- from PySide6.QtWebSockets import QWebSocket
- from PySide6.QtNetwork import QAbstractSocket
- import aiohttp
- import asyncio
- import json
- import subprocess
- import threading
- import time
- from pathlib import Path
- import os
- QML_IMPORT_NAME = "DuneWeaver"
- QML_IMPORT_MAJOR_VERSION = 1
- @QmlElement
- class Backend(QObject):
- """Backend controller for API and WebSocket communication"""
-
- # Constants
- SETTINGS_FILE = "touch_settings.json"
- DEFAULT_SCREEN_TIMEOUT = 300 # 5 minutes in seconds
-
- # Predefined timeout options (in seconds)
- TIMEOUT_OPTIONS = {
- "30 seconds": 30,
- "1 minute": 60,
- "5 minutes": 300,
- "10 minutes": 600,
- "Never": 0 # 0 means never timeout
- }
-
- # Predefined speed options
- SPEED_OPTIONS = {
- "50": 50,
- "100": 100,
- "150": 150,
- "200": 200,
- "300": 300,
- "500": 500
- }
-
- # Predefined pause between patterns options (in seconds)
- PAUSE_OPTIONS = {
- "0s": 0, # No pause
- "1 min": 60, # 1 minute
- "5 min": 300, # 5 minutes
- "15 min": 900, # 15 minutes
- "30 min": 1800, # 30 minutes
- "1 hour": 3600, # 1 hour
- "2 hour": 7200, # 2 hours
- "3 hour": 10800, # 3 hours
- "4 hour": 14400, # 4 hours
- "5 hour": 18000, # 5 hours
- "6 hour": 21600, # 6 hours
- "12 hour": 43200 # 12 hours
- }
-
- # Signals
- statusChanged = Signal()
- progressChanged = Signal()
- connectionChanged = Signal()
- executionStarted = Signal(str, str) # patternName, patternPreview
- executionStopped = Signal()
- errorOccurred = Signal(str)
- serialPortsUpdated = Signal(list)
- serialConnectionChanged = Signal(bool)
- currentPortChanged = Signal(str)
- speedChanged = Signal(int)
- settingsLoaded = Signal()
- screenStateChanged = Signal(bool) # True = on, False = off
- screenTimeoutChanged = Signal(int) # New signal for timeout changes
- pauseBetweenPatternsChanged = Signal(int) # New signal for pause changes
- pausedChanged = Signal(bool) # Signal when pause state changes
- patternsRefreshCompleted = Signal(bool, str) # (success, message) for pattern refresh
- # Playlist management signals
- playlistCreated = Signal(bool, str) # (success, message)
- playlistDeleted = Signal(bool, str) # (success, message)
- patternAddedToPlaylist = Signal(bool, str) # (success, message)
- playlistModified = Signal(bool, str) # (success, message)
- # Backend connection status signals
- backendConnectionChanged = Signal(bool) # True = backend reachable, False = unreachable
- reconnectStatusChanged = Signal(str) # Current reconnection status message
- # LED control signals
- ledStatusChanged = Signal()
- ledEffectsLoaded = Signal(list) # List of available effects
- ledPalettesLoaded = Signal(list) # List of available palettes
-
- def __init__(self):
- super().__init__()
- # Load base URL from environment variable, default to localhost
- self.base_url = os.environ.get("DUNE_WEAVER_URL", "http://localhost:8080")
-
- # Initialize all status properties first
- self._current_file = ""
- self._progress = 0
- self._is_running = False
- self._is_paused = False # Track pause state separately
- self._is_connected = False
- self._serial_ports = []
- self._serial_connected = False
- self._current_port = ""
- self._current_speed = 130
- self._auto_play_on_boot = False
- self._pause_between_patterns = 0 # Default: no pause (0 seconds)
-
- # Backend connection status
- self._backend_connected = False
- self._reconnect_status = "Connecting to backend..."
- # LED control state
- self._led_provider = "none" # "none", "wled", or "dw_leds"
- self._led_connected = False
- self._led_power_on = False
- self._led_brightness = 100
- self._led_effects = []
- self._led_palettes = []
- self._led_current_effect = 0
- self._led_current_palette = 0
- self._led_color = "#ffffff"
-
- # WebSocket for status with reconnection
- self.ws = QWebSocket()
- self.ws.connected.connect(self._on_ws_connected)
- self.ws.disconnected.connect(self._on_ws_disconnected)
- self.ws.errorOccurred.connect(self._on_ws_error)
- self.ws.textMessageReceived.connect(self._on_ws_message)
-
- # WebSocket reconnection management
- self._reconnect_timer = QTimer()
- self._reconnect_timer.timeout.connect(self._attempt_ws_reconnect)
- self._reconnect_timer.setSingleShot(True)
- self._reconnect_attempts = 0
- self._reconnect_delay = 1000 # Fixed 1 second delay between retries
-
- # Screen management
- self._screen_on = True
- self._screen_timeout = self.DEFAULT_SCREEN_TIMEOUT # Will be loaded from settings
- self._last_activity = time.time()
- self._touch_monitor_thread = None
- self._stop_touch_monitor = threading.Event() # Signal to stop touch monitoring thread
- self._screen_transition_lock = threading.Lock() # Prevent rapid state changes
- self._last_screen_change = 0 # Track last state change time
- self._use_touch_script = False # Disable external touch-monitor script (too sensitive)
- self._screen_timer = QTimer()
- self._screen_timer.timeout.connect(self._check_screen_timeout)
- self._screen_timer.start(1000) # Check every second
- # Load local settings first
- self._load_local_settings()
- print(f"🖥️ Screen management initialized: timeout={self._screen_timeout}s, timer started")
-
- # HTTP session - initialize lazily
- self.session = None
- self._session_initialized = False
-
- # Use QTimer to defer session initialization until event loop is running
- QTimer.singleShot(100, self._delayed_init)
-
- # Start initial WebSocket connection (after all attributes are initialized)
- # Use QTimer to ensure it happens after constructor completes
- QTimer.singleShot(200, self._attempt_ws_reconnect)
-
- @Slot()
- def _delayed_init(self):
- """Initialize session after Qt event loop is running"""
- if not self._session_initialized:
- try:
- loop = asyncio.get_event_loop()
- if loop.is_running():
- asyncio.create_task(self._init_session())
- else:
- # If no loop is running, try again later
- QTimer.singleShot(500, self._delayed_init)
- except RuntimeError:
- # No event loop yet, try again
- QTimer.singleShot(500, self._delayed_init)
-
- async def _init_session(self):
- """Initialize aiohttp session"""
- if not self._session_initialized:
- # Create connector with SSL disabled for localhost
- connector = aiohttp.TCPConnector(ssl=False)
- self.session = aiohttp.ClientSession(connector=connector)
- self._session_initialized = True
-
- # Properties
- @Property(str, notify=statusChanged)
- def currentFile(self):
- return self._current_file
-
- @Property(float, notify=progressChanged)
- def progress(self):
- return self._progress
-
- @Property(bool, notify=statusChanged)
- def isRunning(self):
- return self._is_running
- @Property(bool, notify=pausedChanged)
- def isPaused(self):
- return self._is_paused
- @Property(bool, notify=connectionChanged)
- def isConnected(self):
- return self._is_connected
-
- @Property(list, notify=serialPortsUpdated)
- def serialPorts(self):
- return self._serial_ports
-
- @Property(bool, notify=serialConnectionChanged)
- def serialConnected(self):
- return self._serial_connected
-
- @Property(str, notify=currentPortChanged)
- def currentPort(self):
- return self._current_port
-
- @Property(int, notify=speedChanged)
- def currentSpeed(self):
- return self._current_speed
-
- @Property(bool, notify=settingsLoaded)
- def autoPlayOnBoot(self):
- return self._auto_play_on_boot
-
- @Property(bool, notify=backendConnectionChanged)
- def backendConnected(self):
- return self._backend_connected
-
- @Property(str, notify=reconnectStatusChanged)
- def reconnectStatus(self):
- return self._reconnect_status
-
- # WebSocket handlers
- @Slot()
- def _on_ws_connected(self):
- print("✅ WebSocket connected successfully")
- self._is_connected = True
- self._backend_connected = True
- self._reconnect_attempts = 0 # Reset reconnection counter
- self._reconnect_status = "Connected to backend"
- self.connectionChanged.emit()
- self.backendConnectionChanged.emit(True)
- self.reconnectStatusChanged.emit("Connected to backend")
- # Load initial settings when we connect
- self.loadControlSettings()
- # Also load LED config automatically
- self.loadLedConfig()
-
- @Slot()
- def _on_ws_disconnected(self):
- print("❌ WebSocket disconnected")
- self._is_connected = False
- self._backend_connected = False
- self._reconnect_status = "Backend connection lost..."
- self.connectionChanged.emit()
- self.backendConnectionChanged.emit(False)
- self.reconnectStatusChanged.emit("Backend connection lost...")
- # Start reconnection attempts
- self._schedule_reconnect()
-
- @Slot()
- def _on_ws_error(self, error):
- print(f"❌ WebSocket error: {error}")
- self._is_connected = False
- self._backend_connected = False
- self._reconnect_status = f"Backend error: {error}"
- self.connectionChanged.emit()
- self.backendConnectionChanged.emit(False)
- self.reconnectStatusChanged.emit(f"Backend error: {error}")
- # Start reconnection attempts
- self._schedule_reconnect()
-
- def _schedule_reconnect(self):
- """Schedule a reconnection attempt with fixed 1-second delay."""
- # Always retry - no maximum attempts for touch interface
- status_msg = f"Reconnecting in 1s... (attempt {self._reconnect_attempts + 1})"
- print(f"🔄 {status_msg}")
- self._reconnect_status = status_msg
- self.reconnectStatusChanged.emit(status_msg)
- self._reconnect_timer.start(self._reconnect_delay) # Always 1 second
-
- @Slot()
- def _attempt_ws_reconnect(self):
- """Attempt to reconnect WebSocket."""
- if self.ws.state() == QAbstractSocket.SocketState.ConnectedState:
- print("✅ WebSocket already connected")
- return
-
- self._reconnect_attempts += 1
- status_msg = f"Connecting to backend... (attempt {self._reconnect_attempts})"
- print(f"🔄 {status_msg}")
- self._reconnect_status = status_msg
- self.reconnectStatusChanged.emit(status_msg)
-
- # Close existing connection if any
- if self.ws.state() != QAbstractSocket.SocketState.UnconnectedState:
- self.ws.close()
-
- # Attempt new connection - derive WebSocket URL from base URL
- ws_url = self.base_url.replace("http://", "ws://").replace("https://", "wss://") + "/ws/status"
- self.ws.open(ws_url)
-
- @Slot()
- def retryConnection(self):
- """Manually retry connection (reset attempts and try again)."""
- print("🔄 Manual connection retry requested")
- self._reconnect_attempts = 0
- self._reconnect_timer.stop() # Stop any scheduled reconnect
- self._attempt_ws_reconnect()
-
- @Slot(str)
- def _on_ws_message(self, message):
- try:
- data = json.loads(message)
- if data.get("type") == "status_update":
- status = data.get("data", {})
- new_file = status.get("current_file", "")
- # Detect pattern change and emit executionStarted signal
- if new_file and new_file != self._current_file:
- print(f"🎯 Pattern changed from '{self._current_file}' to '{new_file}'")
- # Find preview for the new pattern
- preview_path = self._find_pattern_preview(new_file)
- print(f"🖼️ Preview path for new pattern: {preview_path}")
- # Emit signal so UI can update
- self.executionStarted.emit(new_file, preview_path)
- self._current_file = new_file
- self._is_running = status.get("is_running", False)
- # Handle pause state from WebSocket
- new_paused = status.get("is_paused", False)
- if new_paused != self._is_paused:
- print(f"⏸️ Pause state changed: {self._is_paused} -> {new_paused}")
- self._is_paused = new_paused
- self.pausedChanged.emit(new_paused)
- # Handle serial connection status from WebSocket
- ws_connection_status = status.get("connection_status", False)
- if ws_connection_status != self._serial_connected:
- print(f"🔌 WebSocket serial connection status changed: {ws_connection_status}")
- self._serial_connected = ws_connection_status
- self.serialConnectionChanged.emit(ws_connection_status)
- # If we're connected, we need to get the current port
- if ws_connection_status:
- # We'll need to fetch the current port via HTTP since WS doesn't include port info
- asyncio.create_task(self._get_current_port())
- else:
- self._current_port = ""
- self.currentPortChanged.emit("")
- # Handle speed updates from WebSocket
- ws_speed = status.get("speed", None)
- if ws_speed and ws_speed != self._current_speed:
- print(f"⚡ WebSocket speed changed: {ws_speed}")
- self._current_speed = ws_speed
- self.speedChanged.emit(ws_speed)
- if status.get("progress"):
- self._progress = status["progress"].get("percentage", 0)
- self.statusChanged.emit()
- self.progressChanged.emit()
- except json.JSONDecodeError:
- pass
-
- async def _get_current_port(self):
- """Fetch the current port when we detect a connection via WebSocket"""
- if not self.session:
- return
-
- try:
- async with self.session.get(f"{self.base_url}/serial_status") as resp:
- if resp.status == 200:
- data = await resp.json()
- current_port = data.get("port", "")
- if current_port:
- self._current_port = current_port
- self.currentPortChanged.emit(current_port)
- print(f"🔌 Updated current port from WebSocket trigger: {current_port}")
- except Exception as e:
- print(f"💥 Exception getting current port: {e}")
-
- # API Methods
- @Slot(str, str)
- def executePattern(self, fileName, preExecution="adaptive"):
- print(f"🎯 ExecutePattern called: fileName='{fileName}', preExecution='{preExecution}'")
- asyncio.create_task(self._execute_pattern(fileName, preExecution))
-
- async def _execute_pattern(self, fileName, preExecution):
- if not self.session:
- print("❌ Backend session not ready")
- self.errorOccurred.emit("Backend not ready, please try again")
- return
-
- try:
- request_data = {"file_name": fileName, "pre_execution": preExecution}
- print(f"🔄 Making HTTP POST to: {self.base_url}/run_theta_rho")
- print(f"📝 Request payload: {request_data}")
-
- async with self.session.post(
- f"{self.base_url}/run_theta_rho",
- json=request_data
- ) as resp:
- print(f"📡 Response status: {resp.status}")
- print(f"📋 Response headers: {dict(resp.headers)}")
-
- response_text = await resp.text()
- print(f"📄 Response body: {response_text}")
-
- if resp.status == 200:
- print("✅ Pattern execution request successful")
- # Find preview image for the pattern
- preview_path = self._find_pattern_preview(fileName)
- print(f"🖼️ Pattern preview path: {preview_path}")
- print(f"📡 About to emit executionStarted signal with: fileName='{fileName}', preview='{preview_path}'")
- try:
- self.executionStarted.emit(fileName, preview_path)
- print("✅ ExecutionStarted signal emitted successfully")
- except Exception as e:
- print(f"❌ Error emitting executionStarted signal: {e}")
- else:
- print(f"❌ Pattern execution failed with status {resp.status}")
- self.errorOccurred.emit(f"Failed to execute: {resp.status} - {response_text}")
- except Exception as e:
- print(f"💥 Exception in _execute_pattern: {e}")
- self.errorOccurred.emit(str(e))
-
- def _find_pattern_preview(self, fileName):
- """Find the preview image for a pattern"""
- try:
- # Extract just the filename from the path (remove any directory prefixes)
- clean_filename = fileName.split('/')[-1] # Get last part of path
- print(f"🔍 Original fileName: {fileName}, clean filename: {clean_filename}")
- # Check multiple possible locations for patterns directory
- # Use relative paths that work across different environments
- possible_dirs = [
- Path("../patterns"), # One level up (for when running from touch subdirectory)
- Path("patterns"), # Same level (for when running from main directory)
- Path(__file__).parent.parent / "patterns" # Dynamic path relative to backend.py
- ]
- for patterns_dir in possible_dirs:
- cache_dir = patterns_dir / "cached_images"
- if cache_dir.exists():
- print(f"🔍 Searching for preview in cache directory: {cache_dir}")
- # Extensions to try - PNG first for better kiosk compatibility
- extensions = [".png", ".webp", ".jpg", ".jpeg"]
- # Filenames to try - with and without .thr suffix
- base_name = clean_filename.replace(".thr", "")
- filenames_to_try = [clean_filename, base_name]
- # Try direct path in cache_dir first (fastest)
- for filename in filenames_to_try:
- for ext in extensions:
- preview_file = cache_dir / (filename + ext)
- if preview_file.exists():
- print(f"✅ Found preview (direct): {preview_file}")
- return str(preview_file.absolute())
- # If not found directly, search recursively through subdirectories
- print(f"🔍 Searching recursively in {cache_dir}...")
- for filename in filenames_to_try:
- for ext in extensions:
- target_name = filename + ext
- # Use rglob to search recursively
- matches = list(cache_dir.rglob(target_name))
- if matches:
- # Return the first match found
- preview_file = matches[0]
- print(f"✅ Found preview (recursive): {preview_file}")
- return str(preview_file.absolute())
- print("❌ No preview image found")
- return ""
- except Exception as e:
- print(f"💥 Exception finding preview: {e}")
- return ""
-
- @Slot()
- def stopExecution(self):
- asyncio.create_task(self._stop_execution())
-
- async def _stop_execution(self):
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
-
- try:
- print("🛑 Calling stop_execution endpoint...")
- # Add timeout to prevent hanging
- timeout = aiohttp.ClientTimeout(total=10) # 10 second timeout
- async with self.session.post(f"{self.base_url}/stop_execution", timeout=timeout) as resp:
- print(f"🛑 Stop execution response status: {resp.status}")
- if resp.status == 200:
- response_data = await resp.json()
- print(f"🛑 Stop execution response: {response_data}")
- self.executionStopped.emit()
- else:
- print(f"❌ Stop execution failed with status: {resp.status}")
- response_text = await resp.text()
- self.errorOccurred.emit(f"Stop failed: {resp.status} - {response_text}")
- except asyncio.TimeoutError:
- print("⏰ Stop execution request timed out")
- self.errorOccurred.emit("Stop execution request timed out")
- except Exception as e:
- print(f"💥 Exception in _stop_execution: {e}")
- self.errorOccurred.emit(str(e))
-
- @Slot()
- def pauseExecution(self):
- print("⏸️ Pausing execution...")
- asyncio.create_task(self._api_call("/pause_execution"))
-
- @Slot()
- def resumeExecution(self):
- print("▶️ Resuming execution...")
- asyncio.create_task(self._api_call("/resume_execution"))
-
- @Slot()
- def skipPattern(self):
- print("⏭️ Skipping pattern...")
- asyncio.create_task(self._api_call("/skip_pattern"))
-
- @Slot(str, float, str, str, bool)
- def executePlaylist(self, playlistName, pauseTime=0.0, clearPattern="adaptive", runMode="single", shuffle=False):
- print(f"🎵 ExecutePlaylist called: playlist='{playlistName}', pauseTime={pauseTime}, clearPattern='{clearPattern}', runMode='{runMode}', shuffle={shuffle}")
- asyncio.create_task(self._execute_playlist(playlistName, pauseTime, clearPattern, runMode, shuffle))
-
- async def _execute_playlist(self, playlistName, pauseTime, clearPattern, runMode, shuffle):
- if not self.session:
- print("❌ Backend session not ready")
- self.errorOccurred.emit("Backend not ready, please try again")
- return
-
- try:
- request_data = {
- "playlist_name": playlistName,
- "pause_time": pauseTime,
- "clear_pattern": clearPattern,
- "run_mode": runMode,
- "shuffle": shuffle
- }
- print(f"🔄 Making HTTP POST to: {self.base_url}/run_playlist")
- print(f"📝 Request payload: {request_data}")
-
- async with self.session.post(
- f"{self.base_url}/run_playlist",
- json=request_data
- ) as resp:
- print(f"📡 Response status: {resp.status}")
-
- response_text = await resp.text()
- print(f"📄 Response body: {response_text}")
-
- if resp.status == 200:
- print(f"✅ Playlist execution request successful: {playlistName}")
- # The playlist will start executing patterns automatically
- # Status updates will come through WebSocket
- else:
- print(f"❌ Playlist execution failed with status {resp.status}")
- self.errorOccurred.emit(f"Failed to execute playlist: {resp.status} - {response_text}")
- except Exception as e:
- print(f"💥 Exception in _execute_playlist: {e}")
- self.errorOccurred.emit(str(e))
-
- async def _api_call(self, endpoint):
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
-
- try:
- print(f"📡 Calling API endpoint: {endpoint}")
- # Add timeout to prevent hanging
- timeout = aiohttp.ClientTimeout(total=10) # 10 second timeout
- async with self.session.post(f"{self.base_url}{endpoint}", timeout=timeout) as resp:
- print(f"📡 API response status for {endpoint}: {resp.status}")
- if resp.status == 200:
- response_data = await resp.json()
- print(f"📡 API response for {endpoint}: {response_data}")
- else:
- print(f"❌ API call {endpoint} failed with status: {resp.status}")
- response_text = await resp.text()
- self.errorOccurred.emit(f"API call failed: {endpoint} - {resp.status} - {response_text}")
- except asyncio.TimeoutError:
- print(f"⏰ API call {endpoint} timed out")
- self.errorOccurred.emit(f"API call {endpoint} timed out")
- except Exception as e:
- print(f"💥 Exception in API call {endpoint}: {e}")
- self.errorOccurred.emit(str(e))
-
- # Serial Port Management
- @Slot()
- def refreshSerialPorts(self):
- print("🔌 Refreshing serial ports...")
- asyncio.create_task(self._refresh_serial_ports())
-
- async def _refresh_serial_ports(self):
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
-
- try:
- async with self.session.get(f"{self.base_url}/list_serial_ports") as resp:
- if resp.status == 200:
- # The endpoint returns a list directly, not a dictionary
- ports = await resp.json()
- self._serial_ports = ports if isinstance(ports, list) else []
- print(f"📡 Found serial ports: {self._serial_ports}")
- self.serialPortsUpdated.emit(self._serial_ports)
- else:
- print(f"❌ Failed to get serial ports: {resp.status}")
- except Exception as e:
- print(f"💥 Exception refreshing serial ports: {e}")
- self.errorOccurred.emit(str(e))
-
- @Slot(str)
- def connectSerial(self, port):
- print(f"🔗 Connecting to serial port: {port}")
- asyncio.create_task(self._connect_serial(port))
-
- async def _connect_serial(self, port):
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
-
- try:
- async with self.session.post(f"{self.base_url}/connect", json={"port": port}) as resp:
- if resp.status == 200:
- print(f"✅ Connected to {port}")
- self._serial_connected = True
- self._current_port = port
- self.serialConnectionChanged.emit(True)
- self.currentPortChanged.emit(port)
- else:
- response_text = await resp.text()
- print(f"❌ Failed to connect to {port}: {resp.status} - {response_text}")
- self.errorOccurred.emit(f"Failed to connect: {response_text}")
- except Exception as e:
- print(f"💥 Exception connecting to serial: {e}")
- self.errorOccurred.emit(str(e))
-
- @Slot()
- def disconnectSerial(self):
- print("🔌 Disconnecting serial...")
- asyncio.create_task(self._disconnect_serial())
-
- async def _disconnect_serial(self):
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
-
- try:
- async with self.session.post(f"{self.base_url}/disconnect") as resp:
- if resp.status == 200:
- print("✅ Disconnected from serial")
- self._serial_connected = False
- self._current_port = ""
- self.serialConnectionChanged.emit(False)
- self.currentPortChanged.emit("")
- else:
- response_text = await resp.text()
- print(f"❌ Failed to disconnect: {resp.status} - {response_text}")
- except Exception as e:
- print(f"💥 Exception disconnecting serial: {e}")
- self.errorOccurred.emit(str(e))
-
- # Hardware Movement Controls
- @Slot()
- def sendHome(self):
- print("🏠 Sending home command...")
- asyncio.create_task(self._send_home())
- async def _send_home(self):
- """Send home command without timeout - homing can take up to 90 seconds."""
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
- try:
- print("🏠 Calling /send_home (no timeout - homing can take up to 90s)...")
- async with self.session.post(f"{self.base_url}/send_home") as resp:
- print(f"🏠 Home command response status: {resp.status}")
- if resp.status == 200:
- response_data = await resp.json()
- print(f"✅ Home command successful: {response_data}")
- else:
- print(f"❌ Home command failed with status: {resp.status}")
- response_text = await resp.text()
- self.errorOccurred.emit(f"Home failed: {resp.status} - {response_text}")
- except Exception as e:
- print(f"💥 Exception in home command: {e}")
- self.errorOccurred.emit(str(e))
-
- @Slot()
- def moveToCenter(self):
- print("🎯 Moving to center...")
- asyncio.create_task(self._api_call("/move_to_center"))
-
- @Slot()
- def moveToPerimeter(self):
- print("⭕ Moving to perimeter...")
- asyncio.create_task(self._api_call("/move_to_perimeter"))
-
- # Speed Control
- @Slot(int)
- def setSpeed(self, speed):
- print(f"⚡ Setting speed to: {speed}")
- asyncio.create_task(self._set_speed(speed))
-
- async def _set_speed(self, speed):
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
-
- try:
- async with self.session.post(f"{self.base_url}/set_speed", json={"speed": speed}) as resp:
- if resp.status == 200:
- print(f"✅ Speed set to {speed}")
- self._current_speed = speed
- self.speedChanged.emit(speed)
- else:
- response_text = await resp.text()
- print(f"❌ Failed to set speed: {resp.status} - {response_text}")
- except Exception as e:
- print(f"💥 Exception setting speed: {e}")
- self.errorOccurred.emit(str(e))
-
- # Auto Play on Boot Setting
- @Slot(bool)
- def setAutoPlayOnBoot(self, enabled):
- print(f"🚀 Setting auto play on boot: {enabled}")
- asyncio.create_task(self._set_auto_play_on_boot(enabled))
-
- async def _set_auto_play_on_boot(self, enabled):
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
-
- try:
- # Use the kiosk mode API endpoint for auto-play on boot
- async with self.session.post(f"{self.base_url}/api/kiosk-mode", json={"enabled": enabled}) as resp:
- if resp.status == 200:
- print(f"✅ Auto play on boot set to {enabled}")
- self._auto_play_on_boot = enabled
- else:
- response_text = await resp.text()
- print(f"❌ Failed to set auto play: {resp.status} - {response_text}")
- except Exception as e:
- print(f"💥 Exception setting auto play: {e}")
- self.errorOccurred.emit(str(e))
-
- # Note: Screen timeout is now managed locally in touch_settings.json
- # The main application doesn't have a kiosk-mode endpoint, so we manage this locally
-
- # Load Settings
- def _load_local_settings(self):
- """Load settings from local JSON file"""
- try:
- if os.path.exists(self.SETTINGS_FILE):
- with open(self.SETTINGS_FILE, 'r') as f:
- settings = json.load(f)
-
- screen_timeout = settings.get('screen_timeout', self.DEFAULT_SCREEN_TIMEOUT)
- if isinstance(screen_timeout, (int, float)) and screen_timeout >= 0:
- self._screen_timeout = int(screen_timeout)
- if screen_timeout == 0:
- print(f"🖥️ Loaded screen timeout from local settings: Never (0s)")
- else:
- print(f"🖥️ Loaded screen timeout from local settings: {self._screen_timeout}s")
- else:
- print(f"⚠️ Invalid screen timeout in settings, using default: {self.DEFAULT_SCREEN_TIMEOUT}s")
- else:
- print(f"📄 No local settings file found, creating with defaults")
- self._save_local_settings()
- except Exception as e:
- print(f"❌ Error loading local settings: {e}, using defaults")
- self._screen_timeout = self.DEFAULT_SCREEN_TIMEOUT
-
- def _save_local_settings(self):
- """Save settings to local JSON file"""
- try:
- settings = {
- 'screen_timeout': self._screen_timeout,
- 'version': '1.0'
- }
- with open(self.SETTINGS_FILE, 'w') as f:
- json.dump(settings, f, indent=2)
- print(f"💾 Saved local settings: screen_timeout={self._screen_timeout}s")
- except Exception as e:
- print(f"❌ Error saving local settings: {e}")
- @Slot()
- def loadControlSettings(self):
- print("📋 Loading control settings...")
- asyncio.create_task(self._load_settings())
-
- async def _load_settings(self):
- if not self.session:
- print("⚠️ Session not ready for loading settings")
- return
-
- try:
- # Load auto play setting from the working endpoint
- timeout = aiohttp.ClientTimeout(total=5) # 5 second timeout
- async with self.session.get(f"{self.base_url}/api/auto_play-mode", timeout=timeout) as resp:
- if resp.status == 200:
- data = await resp.json()
- self._auto_play_on_boot = data.get("enabled", False)
- print(f"🚀 Loaded auto play setting: {self._auto_play_on_boot}")
- # Note: Screen timeout is managed locally, not from server
-
- # Serial status will be handled by WebSocket updates automatically
- # But we still load the initial port info if connected
- async with self.session.get(f"{self.base_url}/serial_status", timeout=timeout) as resp:
- if resp.status == 200:
- data = await resp.json()
- initial_connected = data.get("connected", False)
- current_port = data.get("port", "")
- print(f"🔌 Initial serial status: connected={initial_connected}, port={current_port}")
-
- # Only update if WebSocket hasn't already set this
- if initial_connected and current_port and not self._current_port:
- self._current_port = current_port
- self.currentPortChanged.emit(current_port)
-
- # Set initial connection status (WebSocket will take over from here)
- if self._serial_connected != initial_connected:
- self._serial_connected = initial_connected
- self.serialConnectionChanged.emit(initial_connected)
-
- print("✅ Settings loaded - WebSocket will handle real-time updates")
- self.settingsLoaded.emit()
-
- except aiohttp.ClientConnectorError as e:
- print(f"⚠️ Cannot connect to backend at {self.base_url}: {e}")
- # Don't emit error - this is expected when backend is down
- # WebSocket will handle reconnection
- except asyncio.TimeoutError:
- print(f"⏰ Timeout loading settings from {self.base_url}")
- # Don't emit error - expected when backend is slow/down
- except Exception as e:
- print(f"💥 Unexpected error loading settings: {e}")
- # Only emit error for unexpected issues
- if "ssl" not in str(e).lower():
- self.errorOccurred.emit(str(e))
-
- # Screen Management Properties
- @Property(bool, notify=screenStateChanged)
- def screenOn(self):
- return self._screen_on
-
- @Property(int, notify=screenTimeoutChanged)
- def screenTimeout(self):
- return self._screen_timeout
-
- @screenTimeout.setter
- def setScreenTimeout(self, timeout):
- if self._screen_timeout != timeout:
- old_timeout = self._screen_timeout
- self._screen_timeout = timeout
- print(f"🖥️ Screen timeout changed from {old_timeout}s to {timeout}s")
-
- # Save to local settings
- self._save_local_settings()
-
- # Emit change signal for QML
- self.screenTimeoutChanged.emit(timeout)
-
- @Slot(result='QStringList')
- def getScreenTimeoutOptions(self):
- """Get list of screen timeout options for QML"""
- return list(self.TIMEOUT_OPTIONS.keys())
-
- @Slot(result=str)
- def getCurrentScreenTimeoutOption(self):
- """Get current screen timeout as option string"""
- current_timeout = self._screen_timeout
- for option, value in self.TIMEOUT_OPTIONS.items():
- if value == current_timeout:
- return option
- # If custom value, return closest match or custom description
- if current_timeout == 0:
- return "Never"
- elif current_timeout < 60:
- return f"{current_timeout} seconds"
- elif current_timeout < 3600:
- minutes = current_timeout // 60
- return f"{minutes} minute{'s' if minutes != 1 else ''}"
- else:
- hours = current_timeout // 3600
- return f"{hours} hour{'s' if hours != 1 else ''}"
-
- @Slot(str)
- def setScreenTimeoutByOption(self, option):
- """Set screen timeout by option string"""
- if option in self.TIMEOUT_OPTIONS:
- timeout_value = self.TIMEOUT_OPTIONS[option]
- # Don't call the setter method, just assign to trigger the property setter
- if self._screen_timeout != timeout_value:
- old_timeout = self._screen_timeout
- self._screen_timeout = timeout_value
- print(f"🖥️ Screen timeout changed from {old_timeout}s to {timeout_value}s ({option})")
-
- # Save to local settings
- self._save_local_settings()
-
- # Emit change signal for QML
- self.screenTimeoutChanged.emit(timeout_value)
- else:
- print(f"⚠️ Unknown timeout option: {option}")
-
- @Slot(result='QStringList')
- def getSpeedOptions(self):
- """Get list of speed options for QML"""
- return list(self.SPEED_OPTIONS.keys())
-
- @Slot(result=str)
- def getCurrentSpeedOption(self):
- """Get current speed as option string"""
- current_speed = self._current_speed
- for option, value in self.SPEED_OPTIONS.items():
- if value == current_speed:
- return option
- # If custom value, return as string
- return str(current_speed)
-
- @Slot(str)
- def setSpeedByOption(self, option):
- """Set speed by option string"""
- if option in self.SPEED_OPTIONS:
- speed_value = self.SPEED_OPTIONS[option]
- # Don't call setter method, just assign directly
- if self._current_speed != speed_value:
- old_speed = self._current_speed
- self._current_speed = speed_value
- print(f"⚡ Speed changed from {old_speed} to {speed_value} ({option})")
-
- # Send to main application
- asyncio.create_task(self._set_speed_async(speed_value))
-
- # Emit change signal for QML
- self.speedChanged.emit(speed_value)
- else:
- print(f"⚠️ Unknown speed option: {option}")
-
- async def _set_speed_async(self, speed):
- """Send speed to main application asynchronously"""
- if not self.session:
- return
- try:
- async with self.session.post(f"{self.base_url}/set_speed", json={"speed": speed}) as resp:
- if resp.status == 200:
- print(f"✅ Speed set successfully: {speed}")
- else:
- print(f"❌ Failed to set speed: {resp.status}")
- except Exception as e:
- print(f"💥 Exception setting speed: {e}")
-
- # Pause Between Patterns Methods
- @Slot(result='QStringList')
- def getPauseOptions(self):
- """Get list of pause between patterns options for QML"""
- return list(self.PAUSE_OPTIONS.keys())
-
- @Slot(result=str)
- def getCurrentPauseOption(self):
- """Get current pause between patterns as option string"""
- current_pause = self._pause_between_patterns
- for option, value in self.PAUSE_OPTIONS.items():
- if value == current_pause:
- return option
- # If custom value, return descriptive string
- if current_pause == 0:
- return "0s"
- elif current_pause < 60:
- return f"{current_pause}s"
- elif current_pause < 3600:
- minutes = current_pause // 60
- return f"{minutes} min"
- else:
- hours = current_pause // 3600
- return f"{hours} hour"
-
- @Slot(str)
- def setPauseByOption(self, option):
- """Set pause between patterns by option string"""
- if option in self.PAUSE_OPTIONS:
- pause_value = self.PAUSE_OPTIONS[option]
- if self._pause_between_patterns != pause_value:
- old_pause = self._pause_between_patterns
- self._pause_between_patterns = pause_value
- print(f"⏸️ Pause between patterns changed from {old_pause}s to {pause_value}s ({option})")
-
- # Emit change signal for QML
- self.pauseBetweenPatternsChanged.emit(pause_value)
- else:
- print(f"⚠️ Unknown pause option: {option}")
-
- # Property for pause between patterns
- @Property(int, notify=pauseBetweenPatternsChanged)
- def pauseBetweenPatterns(self):
- """Get current pause between patterns in seconds"""
- return self._pause_between_patterns
-
- # Screen Control Methods
- @Slot()
- def turnScreenOn(self):
- """Turn the screen on and reset activity timer"""
- if not self._screen_on:
- self._turn_screen_on()
- self._reset_activity_timer()
-
- @Slot()
- def turnScreenOff(self):
- """Turn the screen off"""
- self._turn_screen_off()
- # Start touch monitoring after manual screen off
- QTimer.singleShot(1000, self._start_touch_monitoring) # 1 second delay
-
- @Slot()
- def resetActivityTimer(self):
- """Reset the activity timer (call on user interaction)"""
- self._reset_activity_timer()
- if not self._screen_on:
- self._turn_screen_on()
-
- def _turn_screen_on(self):
- """Internal method to turn screen on"""
- with self._screen_transition_lock:
- # Debounce: Don't turn on if we just changed state
- time_since_change = time.time() - self._last_screen_change
- if time_since_change < 2.0: # 2 second debounce
- print(f"🖥️ Screen state change blocked (debounce: {time_since_change:.1f}s < 2s)")
- return
-
- if self._screen_on:
- print("🖥️ Screen already ON, skipping")
- return
-
- try:
- # Use the working screen-on script if available
- screen_on_script = Path('/usr/local/bin/screen-on')
- if screen_on_script.exists():
- result = subprocess.run(['sudo', '/usr/local/bin/screen-on'],
- capture_output=True, text=True, timeout=5)
- if result.returncode == 0:
- print("🖥️ Screen turned ON (screen-on script)")
- else:
- print(f"⚠️ screen-on script failed: {result.stderr}")
- else:
- # Fallback: Manual control matching the script
- # Unblank framebuffer and restore backlight
- max_brightness = 255
- try:
- result = subprocess.run(['cat', '/sys/class/backlight/*/max_brightness'],
- shell=True, capture_output=True, text=True, timeout=2)
- if result.returncode == 0 and result.stdout.strip():
- max_brightness = int(result.stdout.strip())
- except:
- pass
-
- subprocess.run(['sudo', 'sh', '-c',
- f'echo 0 > /sys/class/graphics/fb0/blank && echo {max_brightness} > /sys/class/backlight/*/brightness'],
- check=False, timeout=5)
- print(f"🖥️ Screen turned ON (manual, brightness: {max_brightness})")
-
- self._screen_on = True
- self._last_screen_change = time.time()
- self.screenStateChanged.emit(True)
- # Signal touch monitor thread to stop
- self._stop_touch_monitor.set()
- # Give thread time to exit gracefully
- if self._touch_monitor_thread and self._touch_monitor_thread.is_alive():
- print("🖥️ Waiting for touch monitor thread to stop...")
- self._touch_monitor_thread.join(timeout=1.0)
- if self._touch_monitor_thread.is_alive():
- print("⚠️ Touch monitor thread still running (subprocess will be killed)")
- except Exception as e:
- print(f"❌ Failed to turn screen on: {e}")
-
- def _turn_screen_off(self):
- """Internal method to turn screen off"""
- print("🖥️ _turn_screen_off() called")
- with self._screen_transition_lock:
- # Debounce: Don't turn off if we just changed state
- time_since_change = time.time() - self._last_screen_change
- if time_since_change < 2.0: # 2 second debounce
- print(f"🖥️ Screen state change blocked (debounce: {time_since_change:.1f}s < 2s)")
- return
-
- if not self._screen_on:
- print("🖥️ Screen already OFF, skipping")
- return
-
- try:
- # Use the working screen-off script if available
- screen_off_script = Path('/usr/local/bin/screen-off')
- print(f"🖥️ Checking for screen-off script at: {screen_off_script}")
- print(f"🖥️ Script exists: {screen_off_script.exists()}")
-
- if screen_off_script.exists():
- print("🖥️ Executing screen-off script...")
- result = subprocess.run(['sudo', '/usr/local/bin/screen-off'],
- capture_output=True, text=True, timeout=10)
- print(f"🖥️ Script return code: {result.returncode}")
- if result.stdout:
- print(f"🖥️ Script stdout: {result.stdout}")
- if result.stderr:
- print(f"🖥️ Script stderr: {result.stderr}")
-
- if result.returncode == 0:
- print("✅ Screen turned OFF (screen-off script)")
- else:
- print(f"⚠️ screen-off script failed: return code {result.returncode}")
- else:
- print("🖥️ Using manual screen control...")
- # Fallback: Manual control matching the script
- # Blank framebuffer and turn off backlight
- subprocess.run(['sudo', 'sh', '-c',
- 'echo 0 > /sys/class/backlight/*/brightness && echo 1 > /sys/class/graphics/fb0/blank'],
- check=False, timeout=5)
- print("🖥️ Screen turned OFF (manual)")
-
- self._screen_on = False
- self._last_screen_change = time.time()
- self.screenStateChanged.emit(False)
- print("🖥️ Screen state set to OFF, signal emitted")
-
- except Exception as e:
- print(f"❌ Failed to turn screen off: {e}")
- import traceback
- traceback.print_exc()
-
- def _reset_activity_timer(self):
- """Reset the last activity timestamp"""
- old_time = self._last_activity
- self._last_activity = time.time()
- time_since_last = self._last_activity - old_time
- if time_since_last > 1: # Only log if it's been more than 1 second
- print(f"🖥️ Activity detected - timer reset (was idle for {time_since_last:.1f}s)")
-
- def _check_screen_timeout(self):
- """Check if screen should be turned off due to inactivity"""
- if self._screen_on and self._screen_timeout > 0: # Only check if timeout is enabled
- idle_time = time.time() - self._last_activity
- # Log every 10 seconds when getting close to timeout
- if idle_time > self._screen_timeout - 10 and idle_time % 10 < 1:
- print(f"🖥️ Screen idle for {idle_time:.0f}s (timeout at {self._screen_timeout}s)")
-
- if idle_time > self._screen_timeout:
- print(f"🖥️ Screen timeout reached! Idle for {idle_time:.0f}s (timeout: {self._screen_timeout}s)")
- self._turn_screen_off()
- # DISABLED FOR TESTING - verify touch monitoring causes 100% CPU
- # Add delay before starting touch monitoring to avoid catching residual events
- # QTimer.singleShot(1000, self._start_touch_monitoring) # 1 second delay
- # If timeout is 0 (Never), screen stays on indefinitely
-
- def _start_touch_monitoring(self):
- """Start monitoring touch input for wake-up"""
- if self._touch_monitor_thread is None or not self._touch_monitor_thread.is_alive():
- # Reset stop flag before starting new thread
- self._stop_touch_monitor.clear()
- self._touch_monitor_thread = threading.Thread(target=self._monitor_touch_input, daemon=True)
- self._touch_monitor_thread.start()
-
- def _monitor_touch_input(self):
- """Monitor touch input to wake up the screen"""
- import select
- print("👆 Starting touch monitoring for wake-up")
- process = None
- try:
- # Add delay to let any residual touch events clear
- # Check stop flag during delay to allow quick exit
- for _ in range(20): # 2 seconds in 100ms increments
- if self._stop_touch_monitor.is_set() or self._screen_on:
- print("👆 Touch monitoring cancelled during startup delay")
- return
- time.sleep(0.1)
- # Flush touch device to clear any buffered events
- try:
- import fcntl
- # Find and flush touch device
- for i in range(5):
- device = f'/dev/input/event{i}'
- if Path(device).exists():
- try:
- # Read and discard any pending events
- with open(device, 'rb') as f:
- fcntl.fcntl(f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
- while True:
- try:
- f.read(24) # Standard input_event size
- except:
- break
- print(f"👆 Flushed touch device: {device}")
- break
- except:
- continue
- except Exception as e:
- print(f"👆 Could not flush touch device: {e}")
- # Check again before starting monitoring
- if self._stop_touch_monitor.is_set() or self._screen_on:
- print("👆 Touch monitoring cancelled before starting")
- return
- print("👆 Touch monitoring active")
- # Use external touch monitor script if available - but only if not too sensitive
- touch_monitor_script = Path('/usr/local/bin/touch-monitor')
- use_script = touch_monitor_script.exists() and hasattr(self, '_use_touch_script') and self._use_touch_script
- if use_script:
- print("👆 Using touch-monitor script")
- # Add extra delay for script-based monitoring since it's more sensitive
- for _ in range(30): # 3 seconds in 100ms increments
- if self._stop_touch_monitor.is_set() or self._screen_on:
- print("👆 Touch monitoring cancelled during script delay")
- return
- time.sleep(0.1)
- print("👆 Starting touch-monitor script after flush delay")
- process = subprocess.Popen(['sudo', '/usr/local/bin/touch-monitor'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- # Wait for script to detect touch and wake screen
- while not self._screen_on and not self._stop_touch_monitor.is_set():
- if process.poll() is not None: # Script exited (touch detected)
- print("👆 Touch detected by monitor script")
- self._turn_screen_on()
- self._reset_activity_timer()
- break
- time.sleep(0.1)
- else:
- # Fallback: Direct monitoring
- # Find touch input device
- touch_device = None
- for i in range(5): # Check event0 through event4
- device = f'/dev/input/event{i}'
- if Path(device).exists():
- # Check if it's a touch device
- try:
- info = subprocess.run(['udevadm', 'info', '--query=all', f'--name={device}'],
- capture_output=True, text=True, timeout=2)
- if 'touch' in info.stdout.lower() or 'ft5406' in info.stdout.lower():
- touch_device = device
- break
- except:
- pass
- if not touch_device:
- touch_device = '/dev/input/event0' # Default fallback
- print(f"👆 Monitoring touch device: {touch_device}")
- # Try evtest first (more responsive to single taps)
- evtest_available = subprocess.run(['which', 'evtest'],
- capture_output=True).returncode == 0
- if evtest_available:
- # Use evtest which is more sensitive to single touches
- # Run in binary mode to avoid Python text buffering issues with select()
- print("👆 Using evtest for touch detection (binary mode)")
- process = subprocess.Popen(['sudo', 'evtest', touch_device],
- stdout=subprocess.PIPE,
- stderr=subprocess.DEVNULL)
- # Skip initial device info output (wait for "Testing ..." line)
- # This prevents tight looping during evtest startup
- print("👆 Waiting for evtest initialization...")
- init_timeout = time.time() + 5 # 5 second timeout for init
- while time.time() < init_timeout:
- if self._stop_touch_monitor.is_set() or self._screen_on:
- break
- ready, _, _ = select.select([process.stdout], [], [], 0.5)
- if ready:
- line = process.stdout.readline()
- if line and b'Testing' in line:
- print("👆 evtest initialized, now monitoring for touch events")
- break
- # Now monitor for actual touch events
- while not self._screen_on and not self._stop_touch_monitor.is_set():
- # Check if process exited
- if process.poll() is not None:
- print("👆 evtest process exited unexpectedly")
- break
- # Use select with timeout for non-blocking read
- try:
- ready, _, _ = select.select([process.stdout], [], [], 0.5)
- if ready:
- line = process.stdout.readline()
- if line and b'Event:' in line:
- print("👆 Touch detected via evtest - waking screen")
- self._turn_screen_on()
- self._reset_activity_timer()
- break
- except Exception as e:
- print(f"👆 Error reading evtest output: {e}")
- break
- else:
- # Fallback: Use cat with single byte read (more responsive)
- print("👆 Using cat for touch detection")
- process = subprocess.Popen(['sudo', 'cat', touch_device],
- stdout=subprocess.PIPE,
- stderr=subprocess.DEVNULL)
- # Wait for any data (even 1 byte indicates touch)
- while not self._screen_on and not self._stop_touch_monitor.is_set():
- # Check if process exited
- if process.poll() is not None:
- print("👆 cat process exited unexpectedly")
- break
- try:
- # Non-blocking check for data with timeout
- ready, _, _ = select.select([process.stdout], [], [], 0.5)
- if ready:
- data = process.stdout.read(1) # Read just 1 byte
- if data:
- print("👆 Touch detected - waking screen")
- self._turn_screen_on()
- self._reset_activity_timer()
- break
- except Exception as e:
- print(f"👆 Error reading touch input: {e}")
- break
- except Exception as e:
- print(f"❌ Error monitoring touch input: {e}")
- finally:
- # Always clean up subprocess
- if process is not None and process.poll() is None:
- print("👆 Terminating touch monitoring subprocess")
- process.terminate()
- try:
- process.wait(timeout=2)
- except subprocess.TimeoutExpired:
- print("👆 Force killing touch monitoring subprocess")
- process.kill()
- process.wait()
- print("👆 Touch monitoring stopped")
- # ==================== LED Control Methods ====================
- # LED Properties
- @Property(str, notify=ledStatusChanged)
- def ledProvider(self):
- return self._led_provider
- @Property(bool, notify=ledStatusChanged)
- def ledConnected(self):
- return self._led_connected
- @Property(bool, notify=ledStatusChanged)
- def ledPowerOn(self):
- return self._led_power_on
- @Property(int, notify=ledStatusChanged)
- def ledBrightness(self):
- return self._led_brightness
- @Property(list, notify=ledEffectsLoaded)
- def ledEffects(self):
- return self._led_effects
- @Property(list, notify=ledPalettesLoaded)
- def ledPalettes(self):
- return self._led_palettes
- @Property(int, notify=ledStatusChanged)
- def ledCurrentEffect(self):
- return self._led_current_effect
- @Property(int, notify=ledStatusChanged)
- def ledCurrentPalette(self):
- return self._led_current_palette
- @Property(str, notify=ledStatusChanged)
- def ledColor(self):
- return self._led_color
- @Slot()
- def loadLedConfig(self):
- """Load LED configuration from the server"""
- print("💡 Loading LED configuration...")
- asyncio.create_task(self._load_led_config())
- async def _load_led_config(self):
- if not self.session:
- print("⚠️ Session not ready for LED config")
- return
- try:
- timeout = aiohttp.ClientTimeout(total=5)
- async with self.session.get(f"{self.base_url}/get_led_config", timeout=timeout) as resp:
- if resp.status == 200:
- data = await resp.json()
- self._led_provider = data.get("provider", "none")
- print(f"💡 LED provider: {self._led_provider}")
- if self._led_provider == "dw_leds":
- # Load DW LEDs status
- await self._load_led_status()
- await self._load_led_effects()
- await self._load_led_palettes()
- self.ledStatusChanged.emit()
- else:
- print(f"❌ Failed to get LED config: {resp.status}")
- except Exception as e:
- print(f"💥 Exception loading LED config: {e}")
- async def _load_led_status(self):
- """Load current LED status"""
- if not self.session:
- return
- try:
- timeout = aiohttp.ClientTimeout(total=5)
- async with self.session.get(f"{self.base_url}/api/dw_leds/status", timeout=timeout) as resp:
- if resp.status == 200:
- data = await resp.json()
- self._led_connected = data.get("connected", False)
- self._led_power_on = data.get("power_on", False)
- self._led_brightness = data.get("brightness", 100)
- self._led_current_effect = data.get("current_effect", 0)
- self._led_current_palette = data.get("current_palette", 0)
- print(f"💡 LED status: connected={self._led_connected}, power={self._led_power_on}, brightness={self._led_brightness}")
- self.ledStatusChanged.emit()
- except Exception as e:
- print(f"💥 Exception loading LED status: {e}")
- async def _load_led_effects(self):
- """Load available LED effects"""
- if not self.session:
- return
- try:
- timeout = aiohttp.ClientTimeout(total=5)
- async with self.session.get(f"{self.base_url}/api/dw_leds/effects", timeout=timeout) as resp:
- if resp.status == 200:
- data = await resp.json()
- # API returns effects as [[id, name], ...] arrays
- raw_effects = data.get("effects", [])
- # Convert to list of dicts for easier use in QML
- self._led_effects = [{"id": e[0], "name": e[1]} for e in raw_effects if len(e) >= 2]
- print(f"💡 Loaded {len(self._led_effects)} LED effects")
- self.ledEffectsLoaded.emit(self._led_effects)
- except Exception as e:
- print(f"💥 Exception loading LED effects: {e}")
- async def _load_led_palettes(self):
- """Load available LED palettes"""
- if not self.session:
- return
- try:
- timeout = aiohttp.ClientTimeout(total=5)
- async with self.session.get(f"{self.base_url}/api/dw_leds/palettes", timeout=timeout) as resp:
- if resp.status == 200:
- data = await resp.json()
- # API returns palettes as [[id, name], ...] arrays
- raw_palettes = data.get("palettes", [])
- # Convert to list of dicts for easier use in QML
- self._led_palettes = [{"id": p[0], "name": p[1]} for p in raw_palettes if len(p) >= 2]
- print(f"💡 Loaded {len(self._led_palettes)} LED palettes")
- self.ledPalettesLoaded.emit(self._led_palettes)
- except Exception as e:
- print(f"💥 Exception loading LED palettes: {e}")
- @Slot()
- def refreshLedStatus(self):
- """Refresh LED status from server"""
- print("💡 Refreshing LED status...")
- asyncio.create_task(self._load_led_status())
- @Slot()
- def toggleLedPower(self):
- """Toggle LED power on/off"""
- print("💡 Toggling LED power...")
- asyncio.create_task(self._toggle_led_power())
- async def _toggle_led_power(self):
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
- try:
- async with self.session.post(
- f"{self.base_url}/api/dw_leds/power",
- json={"state": 2} # Toggle
- ) as resp:
- if resp.status == 200:
- data = await resp.json()
- self._led_power_on = data.get("power_on", False)
- self._led_connected = data.get("connected", False)
- print(f"💡 LED power toggled: {self._led_power_on}")
- self.ledStatusChanged.emit()
- else:
- self.errorOccurred.emit(f"Failed to toggle LED power: {resp.status}")
- except Exception as e:
- print(f"💥 Exception toggling LED power: {e}")
- self.errorOccurred.emit(str(e))
- @Slot(bool)
- def setLedPower(self, on):
- """Set LED power state (True=on, False=off)"""
- print(f"💡 Setting LED power: {on}")
- asyncio.create_task(self._set_led_power(on))
- async def _set_led_power(self, on):
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
- try:
- async with self.session.post(
- f"{self.base_url}/api/dw_leds/power",
- json={"state": 1 if on else 0}
- ) as resp:
- if resp.status == 200:
- data = await resp.json()
- self._led_power_on = data.get("power_on", False)
- self._led_connected = data.get("connected", False)
- print(f"💡 LED power set: {self._led_power_on}")
- self.ledStatusChanged.emit()
- else:
- self.errorOccurred.emit(f"Failed to set LED power: {resp.status}")
- except Exception as e:
- print(f"💥 Exception setting LED power: {e}")
- self.errorOccurred.emit(str(e))
- @Slot(int)
- def setLedBrightness(self, value):
- """Set LED brightness (0-100)"""
- print(f"💡 Setting LED brightness: {value}")
- asyncio.create_task(self._set_led_brightness(value))
- async def _set_led_brightness(self, value):
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
- try:
- async with self.session.post(
- f"{self.base_url}/api/dw_leds/brightness",
- json={"value": value}
- ) as resp:
- if resp.status == 200:
- self._led_brightness = value
- print(f"💡 LED brightness set: {value}")
- self.ledStatusChanged.emit()
- else:
- self.errorOccurred.emit(f"Failed to set brightness: {resp.status}")
- except Exception as e:
- print(f"💥 Exception setting LED brightness: {e}")
- self.errorOccurred.emit(str(e))
- @Slot(int, int, int)
- def setLedColor(self, r, g, b):
- """Set LED color using RGB values"""
- print(f"💡 Setting LED color: RGB({r}, {g}, {b})")
- asyncio.create_task(self._set_led_color(r, g, b))
- async def _set_led_color(self, r, g, b):
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
- try:
- async with self.session.post(
- f"{self.base_url}/api/dw_leds/color",
- json={"color": [r, g, b]}
- ) as resp:
- if resp.status == 200:
- self._led_color = f"#{r:02x}{g:02x}{b:02x}"
- print(f"💡 LED color set: {self._led_color}")
- self.ledStatusChanged.emit()
- else:
- self.errorOccurred.emit(f"Failed to set color: {resp.status}")
- except Exception as e:
- print(f"💥 Exception setting LED color: {e}")
- self.errorOccurred.emit(str(e))
- @Slot(str)
- def setLedColorHex(self, hexColor):
- """Set LED color using hex string (e.g., '#ff0000')"""
- # Parse hex color
- hexColor = hexColor.lstrip('#')
- if len(hexColor) == 6:
- r = int(hexColor[0:2], 16)
- g = int(hexColor[2:4], 16)
- b = int(hexColor[4:6], 16)
- self.setLedColor(r, g, b)
- else:
- print(f"⚠️ Invalid hex color: {hexColor}")
- @Slot(int)
- def setLedEffect(self, effectId):
- """Set LED effect by ID"""
- print(f"💡 Setting LED effect: {effectId}")
- asyncio.create_task(self._set_led_effect(effectId))
- async def _set_led_effect(self, effectId):
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
- try:
- async with self.session.post(
- f"{self.base_url}/api/dw_leds/effect",
- json={"effect_id": effectId}
- ) as resp:
- if resp.status == 200:
- self._led_current_effect = effectId
- print(f"💡 LED effect set: {effectId}")
- self.ledStatusChanged.emit()
- else:
- self.errorOccurred.emit(f"Failed to set effect: {resp.status}")
- except Exception as e:
- print(f"💥 Exception setting LED effect: {e}")
- self.errorOccurred.emit(str(e))
- @Slot(int)
- def setLedPalette(self, paletteId):
- """Set LED palette by ID"""
- print(f"💡 Setting LED palette: {paletteId}")
- asyncio.create_task(self._set_led_palette(paletteId))
- async def _set_led_palette(self, paletteId):
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
- try:
- async with self.session.post(
- f"{self.base_url}/api/dw_leds/palette",
- json={"palette_id": paletteId}
- ) as resp:
- if resp.status == 200:
- self._led_current_palette = paletteId
- print(f"💡 LED palette set: {paletteId}")
- self.ledStatusChanged.emit()
- else:
- self.errorOccurred.emit(f"Failed to set palette: {resp.status}")
- except Exception as e:
- print(f"💥 Exception setting LED palette: {e}")
- self.errorOccurred.emit(str(e))
- # ==================== Pattern Refresh Methods ====================
- @Slot()
- def refreshPatterns(self):
- """Refresh pattern cache - converts new WebPs to PNG and rescans patterns"""
- print("🔄 Refreshing patterns...")
- asyncio.create_task(self._refresh_patterns())
- async def _refresh_patterns(self):
- """Async implementation of pattern refresh"""
- try:
- from png_cache_manager import PngCacheManager
- cache_manager = PngCacheManager()
- success = await cache_manager.ensure_png_cache_available()
- message = "Patterns refreshed" if success else "Refreshed with warnings"
- print(f"✅ Pattern refresh completed: {message}")
- self.patternsRefreshCompleted.emit(True, message)
- except Exception as e:
- print(f"❌ Pattern refresh failed: {e}")
- self.patternsRefreshCompleted.emit(False, str(e))
- # ==================== System Control Methods ====================
- @Slot()
- def restartBackend(self):
- """Restart the dune-weaver backend via API"""
- print("🔄 Requesting backend restart via API...")
- asyncio.create_task(self._restart_backend())
- async def _restart_backend(self):
- """Async implementation of backend restart"""
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
- try:
- async with self.session.post(f"{self.base_url}/api/system/restart") as resp:
- if resp.status == 200:
- print("✅ Backend restart initiated via API")
- else:
- response_text = await resp.text()
- print(f"❌ Failed to restart backend: {resp.status} - {response_text}")
- self.errorOccurred.emit(f"Failed to restart: {response_text}")
- except Exception as e:
- print(f"💥 Exception restarting backend: {e}")
- self.errorOccurred.emit(str(e))
- @Slot()
- def shutdownPi(self):
- """Shutdown the Raspberry Pi via API"""
- print("⏻ Requesting Pi shutdown via API...")
- asyncio.create_task(self._shutdown_pi())
- async def _shutdown_pi(self):
- """Async implementation of Pi shutdown"""
- if not self.session:
- self.errorOccurred.emit("Backend not ready")
- return
- try:
- async with self.session.post(f"{self.base_url}/api/system/shutdown") as resp:
- if resp.status == 200:
- print("✅ Shutdown initiated via API")
- else:
- response_text = await resp.text()
- print(f"❌ Failed to shutdown: {resp.status} - {response_text}")
- self.errorOccurred.emit(f"Failed to shutdown: {response_text}")
- except Exception as e:
- print(f"💥 Exception during shutdown: {e}")
- self.errorOccurred.emit(str(e))
- # ==================== Playlist Management Methods ====================
- @Slot(str)
- def createPlaylist(self, playlistName):
- """Create a new empty playlist"""
- print(f"📋 Creating playlist: {playlistName}")
- asyncio.create_task(self._create_playlist(playlistName))
- async def _create_playlist(self, playlistName):
- """Async implementation of playlist creation"""
- if not self.session:
- self.playlistCreated.emit(False, "Backend not ready")
- return
- try:
- async with self.session.post(
- f"{self.base_url}/create_playlist",
- json={"playlist_name": playlistName, "files": []}
- ) as resp:
- if resp.status == 200:
- print(f"✅ Playlist created: {playlistName}")
- self.playlistCreated.emit(True, f"Created: {playlistName}")
- else:
- response_text = await resp.text()
- print(f"❌ Failed to create playlist: {resp.status} - {response_text}")
- self.playlistCreated.emit(False, f"Failed: {response_text}")
- except Exception as e:
- print(f"💥 Exception creating playlist: {e}")
- self.playlistCreated.emit(False, str(e))
- @Slot(str)
- def deletePlaylist(self, playlistName):
- """Delete a playlist"""
- print(f"🗑️ Deleting playlist: {playlistName}")
- asyncio.create_task(self._delete_playlist(playlistName))
- async def _delete_playlist(self, playlistName):
- """Async implementation of playlist deletion"""
- if not self.session:
- self.playlistDeleted.emit(False, "Backend not ready")
- return
- try:
- async with self.session.request(
- "DELETE",
- f"{self.base_url}/delete_playlist",
- json={"playlist_name": playlistName}
- ) as resp:
- if resp.status == 200:
- print(f"✅ Playlist deleted: {playlistName}")
- self.playlistDeleted.emit(True, f"Deleted: {playlistName}")
- else:
- response_text = await resp.text()
- print(f"❌ Failed to delete playlist: {resp.status} - {response_text}")
- self.playlistDeleted.emit(False, f"Failed: {response_text}")
- except Exception as e:
- print(f"💥 Exception deleting playlist: {e}")
- self.playlistDeleted.emit(False, str(e))
- @Slot(str, str)
- def addPatternToPlaylist(self, playlistName, patternPath):
- """Add a pattern to an existing playlist"""
- print(f"➕ Adding pattern to playlist: {patternPath} -> {playlistName}")
- asyncio.create_task(self._add_pattern_to_playlist(playlistName, patternPath))
- async def _add_pattern_to_playlist(self, playlistName, patternPath):
- """Async implementation of adding pattern to playlist"""
- if not self.session:
- self.patternAddedToPlaylist.emit(False, "Backend not ready")
- return
- try:
- async with self.session.post(
- f"{self.base_url}/add_to_playlist",
- json={"playlist_name": playlistName, "pattern": patternPath}
- ) as resp:
- if resp.status == 200:
- print(f"✅ Pattern added to {playlistName}")
- self.patternAddedToPlaylist.emit(True, f"Added to {playlistName}")
- else:
- response_text = await resp.text()
- print(f"❌ Failed to add pattern: {resp.status} - {response_text}")
- self.patternAddedToPlaylist.emit(False, f"Failed: {response_text}")
- except Exception as e:
- print(f"💥 Exception adding pattern: {e}")
- self.patternAddedToPlaylist.emit(False, str(e))
- @Slot(str, list)
- def updatePlaylistPatterns(self, playlistName, patterns):
- """Update a playlist with a new list of patterns (used for removing patterns)"""
- print(f"📝 Updating playlist patterns: {playlistName} -> {len(patterns)} patterns")
- asyncio.create_task(self._update_playlist_patterns(playlistName, patterns))
- async def _update_playlist_patterns(self, playlistName, patterns):
- """Async implementation of playlist pattern update"""
- if not self.session:
- self.playlistModified.emit(False, "Backend not ready")
- return
- try:
- async with self.session.post(
- f"{self.base_url}/modify_playlist",
- json={"playlist_name": playlistName, "files": patterns}
- ) as resp:
- if resp.status == 200:
- print(f"✅ Playlist updated: {playlistName}")
- self.playlistModified.emit(True, f"Updated: {playlistName}")
- else:
- response_text = await resp.text()
- print(f"❌ Failed to update playlist: {resp.status} - {response_text}")
- self.playlistModified.emit(False, f"Failed: {response_text}")
- except Exception as e:
- print(f"💥 Exception updating playlist: {e}")
- self.playlistModified.emit(False, str(e))
- @Slot(result=list)
- def getPlaylistNames(self):
- """Get list of all playlist names (synchronous, reads from local file)"""
- try:
- playlists_file = Path("../playlists.json")
- if playlists_file.exists():
- with open(playlists_file, 'r') as f:
- data = json.load(f)
- return sorted(list(data.keys()))
- except Exception as e:
- print(f"💥 Error reading playlists: {e}")
- return []
|