|
@@ -5,11 +5,16 @@ from PySide6.QtNetwork import QAbstractSocket
|
|
|
import aiohttp
|
|
import aiohttp
|
|
|
import asyncio
|
|
import asyncio
|
|
|
import json
|
|
import json
|
|
|
|
|
+import logging
|
|
|
import subprocess
|
|
import subprocess
|
|
|
import threading
|
|
import threading
|
|
|
import time
|
|
import time
|
|
|
from pathlib import Path
|
|
from pathlib import Path
|
|
|
import os
|
|
import os
|
|
|
|
|
+import select
|
|
|
|
|
+
|
|
|
|
|
+# Configure logging for this module
|
|
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
QML_IMPORT_NAME = "DuneWeaver"
|
|
QML_IMPORT_NAME = "DuneWeaver"
|
|
|
QML_IMPORT_MAJOR_VERSION = 1
|
|
QML_IMPORT_MAJOR_VERSION = 1
|
|
@@ -1144,158 +1149,142 @@ class Backend(QObject):
|
|
|
|
|
|
|
|
def _screen_timeout_triggered(self):
|
|
def _screen_timeout_triggered(self):
|
|
|
"""Handle screen timeout - timer has already waited the full duration"""
|
|
"""Handle screen timeout - timer has already waited the full duration"""
|
|
|
|
|
+ logger.info("🖥️ Screen timeout triggered")
|
|
|
if self._screen_on and self._screen_timeout > 0:
|
|
if self._screen_on and self._screen_timeout > 0:
|
|
|
self._turn_screen_off()
|
|
self._turn_screen_off()
|
|
|
# Add delay before starting touch monitoring to avoid catching residual events
|
|
# Add delay before starting touch monitoring to avoid catching residual events
|
|
|
|
|
+ logger.info("🖥️ Scheduling touch monitoring in 1 second")
|
|
|
QTimer.singleShot(1000, self._start_touch_monitoring)
|
|
QTimer.singleShot(1000, self._start_touch_monitoring)
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
def _start_touch_monitoring(self):
|
|
def _start_touch_monitoring(self):
|
|
|
"""Start monitoring touch input for wake-up"""
|
|
"""Start monitoring touch input for wake-up"""
|
|
|
|
|
+ logger.info("👆 _start_touch_monitoring called")
|
|
|
if self._touch_monitor_thread is None or not self._touch_monitor_thread.is_alive():
|
|
if self._touch_monitor_thread is None or not self._touch_monitor_thread.is_alive():
|
|
|
|
|
+ logger.info("👆 Creating touch monitor thread")
|
|
|
self._touch_monitor_thread = threading.Thread(target=self._monitor_touch_input, daemon=True)
|
|
self._touch_monitor_thread = threading.Thread(target=self._monitor_touch_input, daemon=True)
|
|
|
self._touch_monitor_thread.start()
|
|
self._touch_monitor_thread.start()
|
|
|
-
|
|
|
|
|
|
|
+ logger.info("👆 Touch monitor thread started")
|
|
|
|
|
+ else:
|
|
|
|
|
+ logger.info("👆 Touch monitor thread already running")
|
|
|
|
|
+
|
|
|
def _monitor_touch_input(self):
|
|
def _monitor_touch_input(self):
|
|
|
"""Monitor touch input to wake up the screen"""
|
|
"""Monitor touch input to wake up the screen"""
|
|
|
- print("👆 Starting touch monitoring for wake-up")
|
|
|
|
|
|
|
+ logger.info("👆 Touch monitoring thread started")
|
|
|
# Add delay to let any residual touch events clear
|
|
# Add delay to let any residual touch events clear
|
|
|
time.sleep(2)
|
|
time.sleep(2)
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
# Flush touch device to clear any buffered events
|
|
# Flush touch device to clear any buffered events
|
|
|
try:
|
|
try:
|
|
|
- # Find and flush touch device
|
|
|
|
|
|
|
+ import fcntl
|
|
|
for i in range(5):
|
|
for i in range(5):
|
|
|
device = f'/dev/input/event{i}'
|
|
device = f'/dev/input/event{i}'
|
|
|
if Path(device).exists():
|
|
if Path(device).exists():
|
|
|
try:
|
|
try:
|
|
|
- # Read and discard any pending events
|
|
|
|
|
with open(device, 'rb') as f:
|
|
with open(device, 'rb') as f:
|
|
|
- import fcntl
|
|
|
|
|
- import os
|
|
|
|
|
fcntl.fcntl(f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
|
|
fcntl.fcntl(f.fileno(), fcntl.F_SETFL, os.O_NONBLOCK)
|
|
|
while True:
|
|
while True:
|
|
|
try:
|
|
try:
|
|
|
- f.read(24) # Standard input_event size
|
|
|
|
|
|
|
+ f.read(24)
|
|
|
except:
|
|
except:
|
|
|
break
|
|
break
|
|
|
- print(f"👆 Flushed touch device: {device}")
|
|
|
|
|
|
|
+ logger.debug(f"👆 Flushed touch device: {device}")
|
|
|
break
|
|
break
|
|
|
except:
|
|
except:
|
|
|
continue
|
|
continue
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- print(f"👆 Could not flush touch device: {e}")
|
|
|
|
|
-
|
|
|
|
|
- print("👆 Touch monitoring active")
|
|
|
|
|
|
|
+ logger.warning(f"👆 Could not flush touch device: {e}")
|
|
|
|
|
+
|
|
|
|
|
+ logger.info("👆 Touch monitoring active - waiting for touch events")
|
|
|
try:
|
|
try:
|
|
|
- # Use external touch monitor script if available - but only if not too sensitive
|
|
|
|
|
- touch_monitor_script = Path('/usr/local/bin/touch-monitor')
|
|
|
|
|
- use_script = touch_monitor_script.exists() and hasattr(self, '_use_touch_script') and self._use_touch_script
|
|
|
|
|
-
|
|
|
|
|
- if use_script:
|
|
|
|
|
- print("👆 Using touch-monitor script")
|
|
|
|
|
- # Add extra delay for script-based monitoring since it's more sensitive
|
|
|
|
|
- time.sleep(3)
|
|
|
|
|
- print("👆 Starting touch-monitor script after flush delay")
|
|
|
|
|
- process = subprocess.Popen(['sudo', '/usr/local/bin/touch-monitor'],
|
|
|
|
|
- stdout=subprocess.PIPE,
|
|
|
|
|
- stderr=subprocess.PIPE)
|
|
|
|
|
-
|
|
|
|
|
- # Wait for script to detect touch and wake screen
|
|
|
|
|
|
|
+ # Find touch input device
|
|
|
|
|
+ touch_device = None
|
|
|
|
|
+ for i in range(5):
|
|
|
|
|
+ device = f'/dev/input/event{i}'
|
|
|
|
|
+ if Path(device).exists():
|
|
|
|
|
+ try:
|
|
|
|
|
+ info = subprocess.run(['udevadm', 'info', '--query=all', f'--name={device}'],
|
|
|
|
|
+ capture_output=True, text=True, timeout=2)
|
|
|
|
|
+ if 'touch' in info.stdout.lower() or 'ft5406' in info.stdout.lower():
|
|
|
|
|
+ touch_device = device
|
|
|
|
|
+ break
|
|
|
|
|
+ except:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
|
|
+ if not touch_device:
|
|
|
|
|
+ touch_device = '/dev/input/event0'
|
|
|
|
|
+
|
|
|
|
|
+ logger.info(f"👆 Monitoring touch device: {touch_device}")
|
|
|
|
|
+
|
|
|
|
|
+ # Use evtest with select() for non-blocking I/O
|
|
|
|
|
+ evtest_available = subprocess.run(['which', 'evtest'],
|
|
|
|
|
+ capture_output=True).returncode == 0
|
|
|
|
|
+
|
|
|
|
|
+ if evtest_available:
|
|
|
|
|
+ logger.info("👆 Using evtest for touch detection")
|
|
|
|
|
+ process = subprocess.Popen(['sudo', 'evtest', touch_device],
|
|
|
|
|
+ stdout=subprocess.PIPE,
|
|
|
|
|
+ stderr=subprocess.DEVNULL,
|
|
|
|
|
+ text=False) # Binary mode for select()
|
|
|
|
|
+
|
|
|
|
|
+ # Use select() with timeout for CPU-efficient blocking
|
|
|
while not self._screen_on:
|
|
while not self._screen_on:
|
|
|
- if process.poll() is not None: # Script exited (touch detected)
|
|
|
|
|
- print("👆 Touch detected by monitor script")
|
|
|
|
|
- self._turn_screen_on()
|
|
|
|
|
- self._reset_activity_timer()
|
|
|
|
|
|
|
+ try:
|
|
|
|
|
+ ready, _, _ = select.select([process.stdout], [], [], 1.0)
|
|
|
|
|
+ if ready:
|
|
|
|
|
+ data = process.stdout.read(1024)
|
|
|
|
|
+ if data and b'Event:' in data:
|
|
|
|
|
+ logger.info("👆 Touch detected via evtest - waking screen")
|
|
|
|
|
+ process.terminate()
|
|
|
|
|
+ self._turn_screen_on()
|
|
|
|
|
+ self._reset_activity_timer()
|
|
|
|
|
+ break
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"👆 Error in evtest select: {e}")
|
|
|
break
|
|
break
|
|
|
- time.sleep(0.1)
|
|
|
|
|
-
|
|
|
|
|
|
|
+
|
|
|
|
|
+ if process.poll() is not None:
|
|
|
|
|
+ logger.warning("👆 evtest process died unexpectedly")
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ # Cleanup
|
|
|
if process.poll() is None:
|
|
if process.poll() is None:
|
|
|
process.terminate()
|
|
process.terminate()
|
|
|
else:
|
|
else:
|
|
|
- # Fallback: Direct monitoring
|
|
|
|
|
- # Find touch input device
|
|
|
|
|
- touch_device = None
|
|
|
|
|
- for i in range(5): # Check event0 through event4
|
|
|
|
|
- device = f'/dev/input/event{i}'
|
|
|
|
|
- if Path(device).exists():
|
|
|
|
|
- # Check if it's a touch device
|
|
|
|
|
- try:
|
|
|
|
|
- info = subprocess.run(['udevadm', 'info', '--query=all', f'--name={device}'],
|
|
|
|
|
- capture_output=True, text=True, timeout=2)
|
|
|
|
|
- if 'touch' in info.stdout.lower() or 'ft5406' in info.stdout.lower():
|
|
|
|
|
- touch_device = device
|
|
|
|
|
- break
|
|
|
|
|
- except:
|
|
|
|
|
- pass
|
|
|
|
|
-
|
|
|
|
|
- if not touch_device:
|
|
|
|
|
- touch_device = '/dev/input/event0' # Default fallback
|
|
|
|
|
-
|
|
|
|
|
- print(f"👆 Monitoring touch device: {touch_device}")
|
|
|
|
|
-
|
|
|
|
|
- # Try evtest first (more responsive to single taps)
|
|
|
|
|
- evtest_available = subprocess.run(['which', 'evtest'],
|
|
|
|
|
- capture_output=True).returncode == 0
|
|
|
|
|
-
|
|
|
|
|
- if evtest_available:
|
|
|
|
|
- # Use evtest which is more sensitive to single touches
|
|
|
|
|
- print("👆 Using evtest for touch detection")
|
|
|
|
|
- process = subprocess.Popen(['sudo', 'evtest', touch_device],
|
|
|
|
|
- stdout=subprocess.PIPE,
|
|
|
|
|
- stderr=subprocess.DEVNULL,
|
|
|
|
|
- text=True)
|
|
|
|
|
-
|
|
|
|
|
- # Wait for any event line
|
|
|
|
|
- while not self._screen_on:
|
|
|
|
|
- try:
|
|
|
|
|
- line = process.stdout.readline()
|
|
|
|
|
- if line and 'Event:' in line:
|
|
|
|
|
- print("👆 Touch detected via evtest - waking screen")
|
|
|
|
|
|
|
+ # Fallback: Use cat with select()
|
|
|
|
|
+ logger.info("👆 Using cat for touch detection (evtest not available)")
|
|
|
|
|
+ process = subprocess.Popen(['sudo', 'cat', touch_device],
|
|
|
|
|
+ stdout=subprocess.PIPE,
|
|
|
|
|
+ stderr=subprocess.DEVNULL)
|
|
|
|
|
+
|
|
|
|
|
+ while not self._screen_on:
|
|
|
|
|
+ try:
|
|
|
|
|
+ ready, _, _ = select.select([process.stdout], [], [], 1.0)
|
|
|
|
|
+ if ready:
|
|
|
|
|
+ data = process.stdout.read(1)
|
|
|
|
|
+ if data:
|
|
|
|
|
+ logger.info("👆 Touch detected via cat - waking screen")
|
|
|
process.terminate()
|
|
process.terminate()
|
|
|
self._turn_screen_on()
|
|
self._turn_screen_on()
|
|
|
self._reset_activity_timer()
|
|
self._reset_activity_timer()
|
|
|
break
|
|
break
|
|
|
- except:
|
|
|
|
|
- pass
|
|
|
|
|
-
|
|
|
|
|
- if process.poll() is not None:
|
|
|
|
|
- break
|
|
|
|
|
- time.sleep(0.01) # Small sleep to prevent CPU spinning
|
|
|
|
|
- else:
|
|
|
|
|
- # Fallback: Use cat with single byte read (more responsive)
|
|
|
|
|
- print("👆 Using cat for touch detection")
|
|
|
|
|
- process = subprocess.Popen(['sudo', 'cat', touch_device],
|
|
|
|
|
- stdout=subprocess.PIPE,
|
|
|
|
|
- stderr=subprocess.DEVNULL)
|
|
|
|
|
-
|
|
|
|
|
- # Wait for any data (even 1 byte indicates touch)
|
|
|
|
|
- while not self._screen_on:
|
|
|
|
|
- try:
|
|
|
|
|
- # Non-blocking check for data
|
|
|
|
|
- import select
|
|
|
|
|
- ready, _, _ = select.select([process.stdout], [], [], 0.1)
|
|
|
|
|
- if ready:
|
|
|
|
|
- data = process.stdout.read(1) # Read just 1 byte
|
|
|
|
|
- if data:
|
|
|
|
|
- print("👆 Touch detected - waking screen")
|
|
|
|
|
- process.terminate()
|
|
|
|
|
- self._turn_screen_on()
|
|
|
|
|
- self._reset_activity_timer()
|
|
|
|
|
- break
|
|
|
|
|
- except:
|
|
|
|
|
- pass
|
|
|
|
|
-
|
|
|
|
|
- # Check if screen was turned on by other means
|
|
|
|
|
- if self._screen_on:
|
|
|
|
|
- process.terminate()
|
|
|
|
|
- break
|
|
|
|
|
-
|
|
|
|
|
- time.sleep(0.1)
|
|
|
|
|
-
|
|
|
|
|
|
|
+ except Exception as e:
|
|
|
|
|
+ logger.error(f"👆 Error in cat select: {e}")
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ if process.poll() is not None:
|
|
|
|
|
+ logger.warning("👆 cat process died unexpectedly")
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ # Cleanup
|
|
|
|
|
+ if process.poll() is None:
|
|
|
|
|
+ process.terminate()
|
|
|
|
|
+
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
- print(f"❌ Error monitoring touch input: {e}")
|
|
|
|
|
|
|
+ logger.error(f"❌ Error monitoring touch input: {e}")
|
|
|
|
|
+ import traceback
|
|
|
|
|
+ logger.error(traceback.format_exc())
|
|
|
|
|
|
|
|
- print("👆 Touch monitoring stopped")
|
|
|
|
|
|
|
+ logger.info("👆 Touch monitoring stopped")
|
|
|
|
|
|
|
|
# ==================== LED Control Methods ====================
|
|
# ==================== LED Control Methods ====================
|
|
|
|
|
|