|
@@ -5,16 +5,11 @@ from PySide6.QtNetwork import QAbstractSocket
|
|
|
import aiohttp
|
|
import aiohttp
|
|
|
import asyncio
|
|
import asyncio
|
|
|
import json
|
|
import json
|
|
|
-import logging
|
|
|
|
|
import subprocess
|
|
import subprocess
|
|
|
import threading
|
|
import threading
|
|
|
import time
|
|
import time
|
|
|
from pathlib import Path
|
|
from pathlib import Path
|
|
|
import os
|
|
import os
|
|
|
-import select
|
|
|
|
|
-
|
|
|
|
|
-# Configure logging for this module
|
|
|
|
|
-logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
QML_IMPORT_NAME = "DuneWeaver"
|
|
QML_IMPORT_NAME = "DuneWeaver"
|
|
|
QML_IMPORT_MAJOR_VERSION = 1
|
|
QML_IMPORT_MAJOR_VERSION = 1
|
|
@@ -109,9 +104,6 @@ class Backend(QObject):
|
|
|
self._backend_connected = False
|
|
self._backend_connected = False
|
|
|
self._reconnect_status = "Connecting to backend..."
|
|
self._reconnect_status = "Connecting to backend..."
|
|
|
|
|
|
|
|
- # Preview path cache to avoid repeated filesystem lookups
|
|
|
|
|
- self._preview_cache = {} # filename -> preview_path
|
|
|
|
|
-
|
|
|
|
|
# LED control state
|
|
# LED control state
|
|
|
self._led_provider = "none" # "none", "wled", or "dw_leds"
|
|
self._led_provider = "none" # "none", "wled", or "dw_leds"
|
|
|
self._led_connected = False
|
|
self._led_connected = False
|
|
@@ -146,13 +138,11 @@ class Backend(QObject):
|
|
|
self._last_screen_change = 0 # Track last state change time
|
|
self._last_screen_change = 0 # Track last state change time
|
|
|
self._use_touch_script = False # Disable external touch-monitor script (too sensitive)
|
|
self._use_touch_script = False # Disable external touch-monitor script (too sensitive)
|
|
|
self._screen_timer = QTimer()
|
|
self._screen_timer = QTimer()
|
|
|
- self._screen_timer.setSingleShot(True) # Event-driven, fires only once per timeout
|
|
|
|
|
- self._screen_timer.timeout.connect(self._screen_timeout_triggered)
|
|
|
|
|
|
|
+ self._screen_timer.timeout.connect(self._check_screen_timeout)
|
|
|
|
|
+ self._screen_timer.start(1000) # Check every second
|
|
|
# Load local settings first
|
|
# Load local settings first
|
|
|
self._load_local_settings()
|
|
self._load_local_settings()
|
|
|
- # Start the initial timeout timer if enabled
|
|
|
|
|
- if self._screen_timeout > 0:
|
|
|
|
|
- self._screen_timer.start(self._screen_timeout * 1000)
|
|
|
|
|
|
|
+ print(f"🖥️ Screen management initialized: timeout={self._screen_timeout}s, timer started")
|
|
|
|
|
|
|
|
# HTTP session - initialize lazily
|
|
# HTTP session - initialize lazily
|
|
|
self.session = None
|
|
self.session = None
|
|
@@ -326,8 +316,10 @@ class Backend(QObject):
|
|
|
|
|
|
|
|
# Detect pattern change and emit executionStarted signal
|
|
# Detect pattern change and emit executionStarted signal
|
|
|
if new_file and new_file != self._current_file:
|
|
if new_file and new_file != self._current_file:
|
|
|
|
|
+ print(f"🎯 Pattern changed from '{self._current_file}' to '{new_file}'")
|
|
|
# Find preview for the new pattern
|
|
# Find preview for the new pattern
|
|
|
preview_path = self._find_pattern_preview(new_file)
|
|
preview_path = self._find_pattern_preview(new_file)
|
|
|
|
|
+ print(f"🖼️ Preview path for new pattern: {preview_path}")
|
|
|
# Emit signal so UI can update
|
|
# Emit signal so UI can update
|
|
|
self.executionStarted.emit(new_file, preview_path)
|
|
self.executionStarted.emit(new_file, preview_path)
|
|
|
|
|
|
|
@@ -337,12 +329,14 @@ class Backend(QObject):
|
|
|
# Handle pause state from WebSocket
|
|
# Handle pause state from WebSocket
|
|
|
new_paused = status.get("is_paused", False)
|
|
new_paused = status.get("is_paused", False)
|
|
|
if new_paused != self._is_paused:
|
|
if new_paused != self._is_paused:
|
|
|
|
|
+ print(f"⏸️ Pause state changed: {self._is_paused} -> {new_paused}")
|
|
|
self._is_paused = new_paused
|
|
self._is_paused = new_paused
|
|
|
self.pausedChanged.emit(new_paused)
|
|
self.pausedChanged.emit(new_paused)
|
|
|
|
|
|
|
|
# Handle serial connection status from WebSocket
|
|
# Handle serial connection status from WebSocket
|
|
|
ws_connection_status = status.get("connection_status", False)
|
|
ws_connection_status = status.get("connection_status", False)
|
|
|
if ws_connection_status != self._serial_connected:
|
|
if ws_connection_status != self._serial_connected:
|
|
|
|
|
+ print(f"🔌 WebSocket serial connection status changed: {ws_connection_status}")
|
|
|
self._serial_connected = ws_connection_status
|
|
self._serial_connected = ws_connection_status
|
|
|
self.serialConnectionChanged.emit(ws_connection_status)
|
|
self.serialConnectionChanged.emit(ws_connection_status)
|
|
|
|
|
|
|
@@ -357,6 +351,7 @@ class Backend(QObject):
|
|
|
# Handle speed updates from WebSocket
|
|
# Handle speed updates from WebSocket
|
|
|
ws_speed = status.get("speed", None)
|
|
ws_speed = status.get("speed", None)
|
|
|
if ws_speed and ws_speed != self._current_speed:
|
|
if ws_speed and ws_speed != self._current_speed:
|
|
|
|
|
+ print(f"⚡ WebSocket speed changed: {ws_speed}")
|
|
|
self._current_speed = ws_speed
|
|
self._current_speed = ws_speed
|
|
|
self.speedChanged.emit(ws_speed)
|
|
self.speedChanged.emit(ws_speed)
|
|
|
|
|
|
|
@@ -431,14 +426,11 @@ class Backend(QObject):
|
|
|
self.errorOccurred.emit(str(e))
|
|
self.errorOccurred.emit(str(e))
|
|
|
|
|
|
|
|
def _find_pattern_preview(self, fileName):
|
|
def _find_pattern_preview(self, fileName):
|
|
|
- """Find the preview image for a pattern (with caching)"""
|
|
|
|
|
- # Check cache first
|
|
|
|
|
- if fileName in self._preview_cache:
|
|
|
|
|
- return self._preview_cache[fileName]
|
|
|
|
|
-
|
|
|
|
|
|
|
+ """Find the preview image for a pattern"""
|
|
|
try:
|
|
try:
|
|
|
# Extract just the filename from the path (remove any directory prefixes)
|
|
# Extract just the filename from the path (remove any directory prefixes)
|
|
|
clean_filename = fileName.split('/')[-1] # Get last part of path
|
|
clean_filename = fileName.split('/')[-1] # Get last part of path
|
|
|
|
|
+ print(f"🔍 Original fileName: {fileName}, clean filename: {clean_filename}")
|
|
|
|
|
|
|
|
# Check multiple possible locations for patterns directory
|
|
# Check multiple possible locations for patterns directory
|
|
|
# Use relative paths that work across different environments
|
|
# Use relative paths that work across different environments
|
|
@@ -451,6 +443,8 @@ class Backend(QObject):
|
|
|
for patterns_dir in possible_dirs:
|
|
for patterns_dir in possible_dirs:
|
|
|
cache_dir = patterns_dir / "cached_images"
|
|
cache_dir = patterns_dir / "cached_images"
|
|
|
if cache_dir.exists():
|
|
if cache_dir.exists():
|
|
|
|
|
+ print(f"🔍 Searching for preview in cache directory: {cache_dir}")
|
|
|
|
|
+
|
|
|
# Extensions to try - PNG first for better kiosk compatibility
|
|
# Extensions to try - PNG first for better kiosk compatibility
|
|
|
extensions = [".png", ".webp", ".jpg", ".jpeg"]
|
|
extensions = [".png", ".webp", ".jpg", ".jpeg"]
|
|
|
|
|
|
|
@@ -463,11 +457,11 @@ class Backend(QObject):
|
|
|
for ext in extensions:
|
|
for ext in extensions:
|
|
|
preview_file = cache_dir / (filename + ext)
|
|
preview_file = cache_dir / (filename + ext)
|
|
|
if preview_file.exists():
|
|
if preview_file.exists():
|
|
|
- preview_path = str(preview_file.absolute())
|
|
|
|
|
- self._preview_cache[fileName] = preview_path
|
|
|
|
|
- return preview_path
|
|
|
|
|
|
|
+ print(f"✅ Found preview (direct): {preview_file}")
|
|
|
|
|
+ return str(preview_file.absolute())
|
|
|
|
|
|
|
|
# If not found directly, search recursively through subdirectories
|
|
# If not found directly, search recursively through subdirectories
|
|
|
|
|
+ print(f"🔍 Searching recursively in {cache_dir}...")
|
|
|
for filename in filenames_to_try:
|
|
for filename in filenames_to_try:
|
|
|
for ext in extensions:
|
|
for ext in extensions:
|
|
|
target_name = filename + ext
|
|
target_name = filename + ext
|
|
@@ -475,17 +469,15 @@ class Backend(QObject):
|
|
|
matches = list(cache_dir.rglob(target_name))
|
|
matches = list(cache_dir.rglob(target_name))
|
|
|
if matches:
|
|
if matches:
|
|
|
# Return the first match found
|
|
# Return the first match found
|
|
|
- preview_path = str(matches[0].absolute())
|
|
|
|
|
- self._preview_cache[fileName] = preview_path
|
|
|
|
|
- return preview_path
|
|
|
|
|
|
|
+ preview_file = matches[0]
|
|
|
|
|
+ print(f"✅ Found preview (recursive): {preview_file}")
|
|
|
|
|
+ return str(preview_file.absolute())
|
|
|
|
|
|
|
|
|
|
+ print("❌ No preview image found")
|
|
|
return ""
|
|
return ""
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
|
|
+ print(f"💥 Exception finding preview: {e}")
|
|
|
return ""
|
|
return ""
|
|
|
-
|
|
|
|
|
- def _clear_preview_cache(self):
|
|
|
|
|
- """Clear the preview path cache (call when patterns might change)"""
|
|
|
|
|
- self._preview_cache = {}
|
|
|
|
|
|
|
|
|
|
@Slot()
|
|
@Slot()
|
|
|
def stopExecution(self):
|
|
def stopExecution(self):
|
|
@@ -1136,155 +1128,175 @@ class Backend(QObject):
|
|
|
traceback.print_exc()
|
|
traceback.print_exc()
|
|
|
|
|
|
|
|
def _reset_activity_timer(self):
|
|
def _reset_activity_timer(self):
|
|
|
- """Reset the screen timeout timer (event-driven approach)"""
|
|
|
|
|
- current_time = time.time()
|
|
|
|
|
- # Throttle: only reset if more than 100ms since last reset
|
|
|
|
|
- if current_time - self._last_activity < 0.1:
|
|
|
|
|
- return
|
|
|
|
|
- self._last_activity = current_time
|
|
|
|
|
- # Stop existing timer and restart with full timeout duration
|
|
|
|
|
- self._screen_timer.stop()
|
|
|
|
|
- if self._screen_timeout > 0 and self._screen_on:
|
|
|
|
|
- self._screen_timer.start(self._screen_timeout * 1000)
|
|
|
|
|
-
|
|
|
|
|
- def _screen_timeout_triggered(self):
|
|
|
|
|
- """Handle screen timeout - timer has already waited the full duration"""
|
|
|
|
|
- logger.info("🖥️ Screen timeout triggered")
|
|
|
|
|
- if self._screen_on and self._screen_timeout > 0:
|
|
|
|
|
- self._turn_screen_off()
|
|
|
|
|
- # Add delay before starting touch monitoring to avoid catching residual events
|
|
|
|
|
- logger.info("🖥️ Scheduling touch monitoring in 1 second")
|
|
|
|
|
- QTimer.singleShot(1000, self._start_touch_monitoring)
|
|
|
|
|
-
|
|
|
|
|
|
|
+ """Reset the last activity timestamp"""
|
|
|
|
|
+ old_time = self._last_activity
|
|
|
|
|
+ self._last_activity = time.time()
|
|
|
|
|
+ time_since_last = self._last_activity - old_time
|
|
|
|
|
+ if time_since_last > 1: # Only log if it's been more than 1 second
|
|
|
|
|
+ print(f"🖥️ Activity detected - timer reset (was idle for {time_since_last:.1f}s)")
|
|
|
|
|
+
|
|
|
|
|
+ def _check_screen_timeout(self):
|
|
|
|
|
+ """Check if screen should be turned off due to inactivity"""
|
|
|
|
|
+ if self._screen_on and self._screen_timeout > 0: # Only check if timeout is enabled
|
|
|
|
|
+ idle_time = time.time() - self._last_activity
|
|
|
|
|
+ # Log every 10 seconds when getting close to timeout
|
|
|
|
|
+ if idle_time > self._screen_timeout - 10 and idle_time % 10 < 1:
|
|
|
|
|
+ print(f"🖥️ Screen idle for {idle_time:.0f}s (timeout at {self._screen_timeout}s)")
|
|
|
|
|
+
|
|
|
|
|
+ if idle_time > self._screen_timeout:
|
|
|
|
|
+ print(f"🖥️ Screen timeout reached! Idle for {idle_time:.0f}s (timeout: {self._screen_timeout}s)")
|
|
|
|
|
+ self._turn_screen_off()
|
|
|
|
|
+ # Add delay before starting touch monitoring to avoid catching residual events
|
|
|
|
|
+ QTimer.singleShot(1000, self._start_touch_monitoring) # 1 second delay
|
|
|
|
|
+ # If timeout is 0 (Never), screen stays on indefinitely
|
|
|
|
|
+
|
|
|
def _start_touch_monitoring(self):
|
|
def _start_touch_monitoring(self):
|
|
|
"""Start monitoring touch input for wake-up"""
|
|
"""Start monitoring touch input for wake-up"""
|
|
|
- logger.info("👆 _start_touch_monitoring called")
|
|
|
|
|
if self._touch_monitor_thread is None or not self._touch_monitor_thread.is_alive():
|
|
if self._touch_monitor_thread is None or not self._touch_monitor_thread.is_alive():
|
|
|
- logger.info("👆 Creating touch monitor thread")
|
|
|
|
|
self._touch_monitor_thread = threading.Thread(target=self._monitor_touch_input, daemon=True)
|
|
self._touch_monitor_thread = threading.Thread(target=self._monitor_touch_input, daemon=True)
|
|
|
self._touch_monitor_thread.start()
|
|
self._touch_monitor_thread.start()
|
|
|
- logger.info("👆 Touch monitor thread started")
|
|
|
|
|
- else:
|
|
|
|
|
- logger.info("👆 Touch monitor thread already running")
|
|
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
def _monitor_touch_input(self):
|
|
def _monitor_touch_input(self):
|
|
|
"""Monitor touch input to wake up the screen"""
|
|
"""Monitor touch input to wake up the screen"""
|
|
|
- logger.info("👆 Touch monitoring thread started")
|
|
|
|
|
|
|
+ print("👆 Starting touch monitoring for wake-up")
|
|
|
# Add delay to let any residual touch events clear
|
|
# Add delay to let any residual touch events clear
|
|
|
time.sleep(2)
|
|
time.sleep(2)
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# Flush touch device to clear any buffered events
|
|
# Flush touch device to clear any buffered events
|
|
|
try:
|
|
try:
|
|
|
- import fcntl
|
|
|
|
|
|
|
+ # Find and flush touch device
|
|
|
for i in range(5):
|
|
for i in range(5):
|
|
|
device = f'/dev/input/event{i}'
|
|
device = f'/dev/input/event{i}'
|
|
|
if Path(device).exists():
|
|
if Path(device).exists():
|
|
|
try:
|
|
try:
|
|
|
|
|
+ # Read and discard any pending events
|
|
|
with open(device, 'rb') as f:
|
|
with open(device, 'rb') as f:
|
|
|
|
|
+ import fcntl
|
|
|
|
|
+ import os
|
|
|
fcntl.fcntl(f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
|
|
fcntl.fcntl(f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
|
|
|
while True:
|
|
while True:
|
|
|
try:
|
|
try:
|
|
|
- f.read(24)
|
|
|
|
|
|
|
+ f.read(24) # Standard input_event size
|
|
|
except:
|
|
except:
|
|
|
break
|
|
break
|
|
|
- logger.debug(f"👆 Flushed touch device: {device}")
|
|
|
|
|
|
|
+ print(f"👆 Flushed touch device: {device}")
|
|
|
break
|
|
break
|
|
|
except:
|
|
except:
|
|
|
continue
|
|
continue
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- logger.warning(f"👆 Could not flush touch device: {e}")
|
|
|
|
|
-
|
|
|
|
|
- logger.info("👆 Touch monitoring active - waiting for touch events")
|
|
|
|
|
|
|
+ print(f"👆 Could not flush touch device: {e}")
|
|
|
|
|
+
|
|
|
|
|
+ print("👆 Touch monitoring active")
|
|
|
try:
|
|
try:
|
|
|
- # Find touch input device
|
|
|
|
|
- touch_device = None
|
|
|
|
|
- for i in range(5):
|
|
|
|
|
- device = f'/dev/input/event{i}'
|
|
|
|
|
- if Path(device).exists():
|
|
|
|
|
- try:
|
|
|
|
|
- info = subprocess.run(['udevadm', 'info', '--query=all', f'--name={device}'],
|
|
|
|
|
- capture_output=True, text=True, timeout=2)
|
|
|
|
|
- if 'touch' in info.stdout.lower() or 'ft5406' in info.stdout.lower():
|
|
|
|
|
- touch_device = device
|
|
|
|
|
- break
|
|
|
|
|
- except:
|
|
|
|
|
- pass
|
|
|
|
|
-
|
|
|
|
|
- if not touch_device:
|
|
|
|
|
- touch_device = '/dev/input/event0'
|
|
|
|
|
-
|
|
|
|
|
- logger.info(f"👆 Monitoring touch device: {touch_device}")
|
|
|
|
|
-
|
|
|
|
|
- # Use evtest with select() for non-blocking I/O
|
|
|
|
|
- evtest_available = subprocess.run(['which', 'evtest'],
|
|
|
|
|
- capture_output=True).returncode == 0
|
|
|
|
|
-
|
|
|
|
|
- if evtest_available:
|
|
|
|
|
- logger.info("👆 Using evtest for touch detection")
|
|
|
|
|
- process = subprocess.Popen(['sudo', 'evtest', touch_device],
|
|
|
|
|
- stdout=subprocess.PIPE,
|
|
|
|
|
- stderr=subprocess.DEVNULL,
|
|
|
|
|
- text=False) # Binary mode for select()
|
|
|
|
|
-
|
|
|
|
|
- # Use select() with timeout for CPU-efficient blocking
|
|
|
|
|
|
|
+ # Use external touch monitor script if available - but only if not too sensitive
|
|
|
|
|
+ touch_monitor_script = Path('/usr/local/bin/touch-monitor')
|
|
|
|
|
+ use_script = touch_monitor_script.exists() and hasattr(self, '_use_touch_script') and self._use_touch_script
|
|
|
|
|
+
|
|
|
|
|
+ if use_script:
|
|
|
|
|
+ print("👆 Using touch-monitor script")
|
|
|
|
|
+ # Add extra delay for script-based monitoring since it's more sensitive
|
|
|
|
|
+ time.sleep(3)
|
|
|
|
|
+ print("👆 Starting touch-monitor script after flush delay")
|
|
|
|
|
+ process = subprocess.Popen(['sudo', '/usr/local/bin/touch-monitor'],
|
|
|
|
|
+ stdout=subprocess.PIPE,
|
|
|
|
|
+ stderr=subprocess.PIPE)
|
|
|
|
|
+
|
|
|
|
|
+ # Wait for script to detect touch and wake screen
|
|
|
while not self._screen_on:
|
|
while not self._screen_on:
|
|
|
- try:
|
|
|
|
|
- ready, _, _ = select.select([process.stdout], [], [], 1.0)
|
|
|
|
|
- if ready:
|
|
|
|
|
- data = process.stdout.read(1024)
|
|
|
|
|
- if data and b'Event:' in data:
|
|
|
|
|
- logger.info("👆 Touch detected via evtest - waking screen")
|
|
|
|
|
- process.terminate()
|
|
|
|
|
- self._turn_screen_on()
|
|
|
|
|
- self._reset_activity_timer()
|
|
|
|
|
- break
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- logger.error(f"👆 Error in evtest select: {e}")
|
|
|
|
|
|
|
+ if process.poll() is not None: # Script exited (touch detected)
|
|
|
|
|
+ print("👆 Touch detected by monitor script")
|
|
|
|
|
+ self._turn_screen_on()
|
|
|
|
|
+ self._reset_activity_timer()
|
|
|
break
|
|
break
|
|
|
-
|
|
|
|
|
- if process.poll() is not None:
|
|
|
|
|
- logger.warning("👆 evtest process died unexpectedly")
|
|
|
|
|
- break
|
|
|
|
|
-
|
|
|
|
|
- # Cleanup
|
|
|
|
|
|
|
+ time.sleep(0.1)
|
|
|
|
|
+
|
|
|
if process.poll() is None:
|
|
if process.poll() is None:
|
|
|
process.terminate()
|
|
process.terminate()
|
|
|
else:
|
|
else:
|
|
|
- # Fallback: Use cat with select()
|
|
|
|
|
- logger.info("👆 Using cat for touch detection (evtest not available)")
|
|
|
|
|
- process = subprocess.Popen(['sudo', 'cat', touch_device],
|
|
|
|
|
- stdout=subprocess.PIPE,
|
|
|
|
|
- stderr=subprocess.DEVNULL)
|
|
|
|
|
-
|
|
|
|
|
- while not self._screen_on:
|
|
|
|
|
- try:
|
|
|
|
|
- ready, _, _ = select.select([process.stdout], [], [], 1.0)
|
|
|
|
|
- if ready:
|
|
|
|
|
- data = process.stdout.read(1)
|
|
|
|
|
- if data:
|
|
|
|
|
- logger.info("👆 Touch detected via cat - waking screen")
|
|
|
|
|
|
|
+ # Fallback: Direct monitoring
|
|
|
|
|
+ # Find touch input device
|
|
|
|
|
+ touch_device = None
|
|
|
|
|
+ for i in range(5): # Check event0 through event4
|
|
|
|
|
+ device = f'/dev/input/event{i}'
|
|
|
|
|
+ if Path(device).exists():
|
|
|
|
|
+ # Check if it's a touch device
|
|
|
|
|
+ try:
|
|
|
|
|
+ info = subprocess.run(['udevadm', 'info', '--query=all', f'--name={device}'],
|
|
|
|
|
+ capture_output=True, text=True, timeout=2)
|
|
|
|
|
+ if 'touch' in info.stdout.lower() or 'ft5406' in info.stdout.lower():
|
|
|
|
|
+ touch_device = device
|
|
|
|
|
+ break
|
|
|
|
|
+ except:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ if not touch_device:
|
|
|
|
|
+ touch_device = '/dev/input/event0' # Default fallback
|
|
|
|
|
+
|
|
|
|
|
+ print(f"👆 Monitoring touch device: {touch_device}")
|
|
|
|
|
+
|
|
|
|
|
+ # Try evtest first (more responsive to single taps)
|
|
|
|
|
+ evtest_available = subprocess.run(['which', 'evtest'],
|
|
|
|
|
+ capture_output=True).returncode == 0
|
|
|
|
|
+
|
|
|
|
|
+ if evtest_available:
|
|
|
|
|
+ # Use evtest which is more sensitive to single touches
|
|
|
|
|
+ print("👆 Using evtest for touch detection")
|
|
|
|
|
+ process = subprocess.Popen(['sudo', 'evtest', touch_device],
|
|
|
|
|
+ stdout=subprocess.PIPE,
|
|
|
|
|
+ stderr=subprocess.DEVNULL,
|
|
|
|
|
+ text=True)
|
|
|
|
|
+
|
|
|
|
|
+ # Wait for any event line
|
|
|
|
|
+ while not self._screen_on:
|
|
|
|
|
+ try:
|
|
|
|
|
+ line = process.stdout.readline()
|
|
|
|
|
+ if line and 'Event:' in line:
|
|
|
|
|
+ print("👆 Touch detected via evtest - waking screen")
|
|
|
process.terminate()
|
|
process.terminate()
|
|
|
self._turn_screen_on()
|
|
self._turn_screen_on()
|
|
|
self._reset_activity_timer()
|
|
self._reset_activity_timer()
|
|
|
break
|
|
break
|
|
|
- except Exception as e:
|
|
|
|
|
- logger.error(f"👆 Error in cat select: {e}")
|
|
|
|
|
- break
|
|
|
|
|
-
|
|
|
|
|
- if process.poll() is not None:
|
|
|
|
|
- logger.warning("👆 cat process died unexpectedly")
|
|
|
|
|
- break
|
|
|
|
|
-
|
|
|
|
|
- # Cleanup
|
|
|
|
|
- if process.poll() is None:
|
|
|
|
|
- process.terminate()
|
|
|
|
|
-
|
|
|
|
|
|
|
+ except:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ if process.poll() is not None:
|
|
|
|
|
+ break
|
|
|
|
|
+ time.sleep(0.01) # Small sleep to prevent CPU spinning
|
|
|
|
|
+ else:
|
|
|
|
|
+ # Fallback: Use cat with single byte read (more responsive)
|
|
|
|
|
+ print("👆 Using cat for touch detection")
|
|
|
|
|
+ process = subprocess.Popen(['sudo', 'cat', touch_device],
|
|
|
|
|
+ stdout=subprocess.PIPE,
|
|
|
|
|
+ stderr=subprocess.DEVNULL)
|
|
|
|
|
+
|
|
|
|
|
+ # Wait for any data (even 1 byte indicates touch)
|
|
|
|
|
+ while not self._screen_on:
|
|
|
|
|
+ try:
|
|
|
|
|
+ # Non-blocking check for data
|
|
|
|
|
+ import select
|
|
|
|
|
+ ready, _, _ = select.select([process.stdout], [], [], 0.1)
|
|
|
|
|
+ if ready:
|
|
|
|
|
+ data = process.stdout.read(1) # Read just 1 byte
|
|
|
|
|
+ if data:
|
|
|
|
|
+ print("👆 Touch detected - waking screen")
|
|
|
|
|
+ process.terminate()
|
|
|
|
|
+ self._turn_screen_on()
|
|
|
|
|
+ self._reset_activity_timer()
|
|
|
|
|
+ break
|
|
|
|
|
+ except:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ # Check if screen was turned on by other means
|
|
|
|
|
+ if self._screen_on:
|
|
|
|
|
+ process.terminate()
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ time.sleep(0.1)
|
|
|
|
|
+
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- logger.error(f"❌ Error monitoring touch input: {e}")
|
|
|
|
|
- import traceback
|
|
|
|
|
- logger.error(traceback.format_exc())
|
|
|
|
|
|
|
+ print(f"❌ Error monitoring touch input: {e}")
|
|
|
|
|
|
|
|
- logger.info("👆 Touch monitoring stopped")
|
|
|
|
|
|
|
+ print("👆 Touch monitoring stopped")
|
|
|
|
|
|
|
|
# ==================== LED Control Methods ====================
|
|
# ==================== LED Control Methods ====================
|
|
|
|
|
|