Browse Source

Revert process isolation and real-time scheduling (commit 5a8643c)

Remove ProcessPoolExecutor and real-time scheduling features that were
causing issues on some systems:

- Remove modules/core/process_pool.py
- Remove modules/core/scheduling.py
- Revert cache_manager.py to use asyncio.to_thread
- Revert preview.py to use asyncio.to_thread
- Remove CPU pinning and priority elevation from motion/LED threads

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
tuanchris 1 tuần trước
mục cha
commit
be0690acc4

+ 0 - 19
main.py

@@ -33,7 +33,6 @@ import time
 import argparse
 import subprocess
 import platform
-from modules.core import process_pool as pool_module
 
 # Get log level from environment variable, default to INFO
 log_level_str = os.getenv('LOG_LEVEL', 'INFO').upper()
@@ -102,18 +101,6 @@ async def lifespan(app: FastAPI):
     signal.signal(signal.SIGINT, signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
 
-    # Initialize shared process pool for CPU-intensive tasks
-    pool_module.init_pool()
-
-    # Pin main process to CPUs 1-N to keep CPU 0 dedicated to motion/LED
-    from modules.core import scheduling
-    background_cpus = scheduling.get_background_cpus()
-    if background_cpus:
-        scheduling.pin_to_cpus(background_cpus)
-        logger.info(f"FastAPI main process pinned to CPUs {sorted(background_cpus)}")
-    else:
-        logger.info("Single-core system detected, skipping CPU pinning")
-
     # Connect device in background so the web server starts immediately
     async def connect_and_home():
         """Connect to device and perform homing in background."""
@@ -281,9 +268,6 @@ async def lifespan(app: FastAPI):
     # Shutdown
     logger.info("Shutting down Dune Weaver application...")
 
-    # Shutdown process pool
-    pool_module.shutdown_pool(wait=True)
-
 app = FastAPI(lifespan=lifespan)
 
 # Add CORS middleware to allow cross-origin requests from other Dune Weaver frontends
@@ -3591,9 +3575,6 @@ def signal_handler(signum, frame):
         if state.led_controller:
             state.led_controller.set_power(0)
 
-        # Shutdown process pool - wait=True allows workers to release semaphores properly
-        pool_module.shutdown_pool(wait=True)
-
         # Stop pattern manager motion controller
         pattern_manager.motion_controller.stop()
 

+ 3 - 29
modules/core/cache_manager.py

@@ -5,7 +5,6 @@ import asyncio
 import logging
 from pathlib import Path
 from modules.core.pattern_manager import list_theta_rho_files, THETA_RHO_DIR, parse_theta_rho_file
-from modules.core.process_pool import get_pool as _get_process_pool
 
 logger = logging.getLogger(__name__)
 
@@ -468,17 +467,7 @@ async def generate_image_preview(pattern_file):
             pattern_path = os.path.join(THETA_RHO_DIR, pattern_file)
 
             try:
-                # Use process pool to avoid GIL contention with motion thread
-                # Add timeout protection to prevent hanging on problematic files
-                loop = asyncio.get_running_loop()
-                coordinates = await asyncio.wait_for(
-                    loop.run_in_executor(
-                        _get_process_pool(),
-                        parse_theta_rho_file,
-                        pattern_path
-                    ),
-                    timeout=30.0  # 30 second timeout per file
-                )
+                coordinates = await asyncio.to_thread(parse_theta_rho_file, pattern_path)
 
                 if coordinates:
                     first_coord = {"x": coordinates[0][0], "y": coordinates[0][1]}
@@ -490,8 +479,6 @@ async def generate_image_preview(pattern_file):
                     logger.debug(f"Metadata cached for {pattern_file}: {total_coords} coordinates")
                 else:
                     logger.warning(f"No coordinates found in {pattern_file}")
-            except asyncio.TimeoutError:
-                logger.error(f"Timeout parsing {pattern_file} for metadata - skipping")
             except Exception as e:
                 logger.error(f"Failed to parse {pattern_file} for metadata: {str(e)}")
                 # Continue with image generation even if metadata fails
@@ -650,21 +637,8 @@ async def generate_metadata_cache():
                 cache_progress["current_file"] = file_name
                 
                 try:
-                    # Parse file in separate process to avoid GIL contention with motion thread
-                    # Add timeout protection to prevent hanging on problematic files
-                    try:
-                        loop = asyncio.get_running_loop()
-                        coordinates = await asyncio.wait_for(
-                            loop.run_in_executor(
-                                _get_process_pool(),
-                                parse_theta_rho_file,
-                                pattern_path
-                            ),
-                            timeout=30.0  # 30 second timeout per file
-                        )
-                    except asyncio.TimeoutError:
-                        logger.error(f"Timeout parsing {file_name} - skipping (file may be too large or corrupted)")
-                        continue
+                    # Parse file to get metadata
+                    coordinates = await asyncio.to_thread(parse_theta_rho_file, pattern_path)
 
                     if coordinates:
                         first_coord = {"x": coordinates[0][0], "y": coordinates[0][1]}

+ 0 - 5
modules/core/pattern_manager.py

@@ -407,11 +407,6 @@ class MotionControlThread:
 
     def _motion_loop(self):
         """Main loop for the motion control thread."""
-        # Setup realtime priority from within thread to avoid native_id race
-        # Motion uses higher priority (60) than LED (40) for CNC reliability
-        from modules.core import scheduling
-        scheduling.setup_realtime_thread(priority=60)
-        
         logger.info("Motion control thread loop started")
 
         while self.running:

+ 5 - 32
modules/core/preview.py

@@ -1,29 +1,15 @@
-"""Preview module for generating image previews of patterns.
-
-Uses ProcessPoolExecutor to run CPU-intensive preview generation in separate
-processes, completely eliminating Python GIL contention with the motion control thread.
-"""
+"""Preview module for generating image previews of patterns."""
 import os
 import math
 import asyncio
 import logging
 from io import BytesIO
-from modules.core import process_pool as pool_module
 
 logger = logging.getLogger(__name__)
 
 
-def _generate_preview_in_process(pattern_file, format='WEBP'):
-    """Generate preview for a pattern file, optimized for 300x300 view.
-    
-    Runs in a separate process with its own GIL, avoiding contention
-    with the motion control thread.
-    
-    All imports are done inside the function to ensure they happen
-    in the worker process, not the main process.
-    
-    Note: Worker CPU affinity/priority is configured once at pool init via initializer.
-    """
+def _generate_preview(pattern_file, format='WEBP'):
+    """Generate preview for a pattern file, optimized for 300x300 view."""
     # Import dependencies in the worker process
     from PIL import Image, ImageDraw
     from modules.core.pattern_manager import parse_theta_rho_file, THETA_RHO_DIR
@@ -92,22 +78,9 @@ def _generate_preview_in_process(pattern_file, format='WEBP'):
 
 
 async def generate_preview_image(pattern_file, format='WEBP'):
-    """Generate a preview for a pattern file.
-    
-    Runs in a separate process via ProcessPoolExecutor to completely
-    eliminate GIL contention with the motion control thread.
-    """
-    loop = asyncio.get_running_loop()
-    pool = pool_module.get_pool()
-    
+    """Generate a preview for a pattern file."""
     try:
-        # Run preview generation in a separate process (separate GIL)
-        result = await loop.run_in_executor(
-            pool,
-            _generate_preview_in_process,
-            pattern_file,
-            format
-        )
+        result = await asyncio.to_thread(_generate_preview, pattern_file, format)
         return result
     except Exception as e:
         logger.error(f"Error generating preview for {pattern_file}: {e}")

+ 0 - 100
modules/core/process_pool.py

@@ -1,100 +0,0 @@
-"""Shared process pool for CPU-intensive tasks.
-
-Provides a single ProcessPoolExecutor shared across modules to:
-- Isolate CPU-intensive work from real-time threads (separate GILs)
-- Manage worker count based on available CPUs
-- Configure CPU affinity to keep workers off CPU 0 (reserved for motion/LED)
-
-Environment variables:
-- POOL_WORKERS: Override worker count (default: 3)
-"""
-import logging
-import os
-from concurrent.futures import ProcessPoolExecutor
-from typing import Optional
-from modules.core import scheduling
-
-logger = logging.getLogger(__name__)
-
-_pool: Optional[ProcessPoolExecutor] = None
-_shutdown_in_progress: bool = False
-
-# Default to 3 workers for parallel processing
-DEFAULT_WORKERS = 3
-
-
-def _get_worker_count() -> int:
-    """Calculate worker count for the process pool.
-
-    Uses POOL_WORKERS env var if set, otherwise defaults to 3 workers.
-    For memory-constrained devices (Pi Zero 2 W), set POOL_WORKERS=1.
-    """
-    env_workers = os.environ.get('POOL_WORKERS')
-    if env_workers is not None:
-        try:
-            count = int(env_workers)
-            if count >= 0:
-                return count
-            logger.warning(f"Invalid POOL_WORKERS={env_workers}, using default")
-        except ValueError:
-            logger.warning(f"Invalid POOL_WORKERS={env_workers}, using default")
-
-    return DEFAULT_WORKERS
-
-
-def setup_worker_process():
-    """Configure worker process (called at worker startup).
-    
-    Sets CPU affinity and lowers priority.
-    """
-    scheduling.setup_background_worker()
-
-
-def init_pool() -> ProcessPoolExecutor:
-    """Initialize the shared process pool."""
-    global _pool
-    if _pool is not None:
-        return _pool
-    
-    worker_count = _get_worker_count()
-    cpu_count = scheduling.get_cpu_count()
-    _pool = ProcessPoolExecutor(
-        max_workers=worker_count,
-        initializer=setup_worker_process
-    )
-    logger.info(f"Process pool initialized: {worker_count} workers, {cpu_count} CPUs")
-    return _pool
-
-
-def get_pool() -> ProcessPoolExecutor:
-    """Get the shared process pool (must be initialized first)."""
-    if _pool is None:
-        raise RuntimeError("Process pool not initialized - call init_pool() first")
-    return _pool
-
-
-def shutdown_pool(wait: bool = True, cancel_futures: bool = False):
-    """Shutdown the process pool.
-
-    Args:
-        wait: If True, wait for workers to finish current tasks before shutdown.
-              This allows workers to properly release semaphores.
-        cancel_futures: If True, cancel pending futures. Use with caution as this
-                       can cause semaphore leaks if wait=False.
-
-    Note: Always use wait=True to prevent semaphore leaks. The wait=False option
-    exists only for emergency shutdown scenarios.
-    """
-    global _pool, _shutdown_in_progress
-
-    # Prevent concurrent shutdown calls (race condition between signal handler and lifespan)
-    if _shutdown_in_progress:
-        logger.debug("Pool shutdown already in progress, skipping")
-        return
-
-    if _pool is not None:
-        _shutdown_in_progress = True
-        _pool.shutdown(wait=wait, cancel_futures=cancel_futures)
-        _pool = None
-        logger.info("Process pool shut down")
-

+ 0 - 207
modules/core/scheduling.py

@@ -1,207 +0,0 @@
-"""Scheduling utilities for thread/process priority and CPU affinity.
-
-Provides centralized functions to configure scheduling for:
-- Real-time I/O threads (motion control, LED effects) - high priority, CPU 0
-- Background workers (preview generation, file parsing) - low priority, CPUs 1-N
-"""
-import os
-import sys
-import ctypes
-import ctypes.util
-import logging
-from typing import Optional, Set
-
-logger = logging.getLogger(__name__)
-
-# Linux scheduling constants
-SCHED_RR = 2
-
-# Cached libc handle (lazy-loaded)
-_libc = None
-
-
-class _SchedParam(ctypes.Structure):
-    """Linux sched_param structure for real-time scheduling."""
-    _fields_ = [('sched_priority', ctypes.c_int)]
-
-
-def _get_libc():
-    """Get cached libc handle."""
-    global _libc
-    if _libc is None:
-        _libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
-    return _libc
-
-
-def get_cpu_count() -> int:
-    """Get available CPU cores."""
-    return os.cpu_count() or 1
-
-
-def get_background_cpus() -> Optional[Set[int]]:
-    """Get CPU set for background work (all except CPU 0).
-    
-    Returns None on single-core systems.
-    """
-    cpu_count = get_cpu_count()
-    if cpu_count <= 1:
-        return None
-    return set(range(1, cpu_count))
-
-
-def elevate_priority(tid: Optional[int] = None, realtime_priority: int = 50) -> bool:
-    """Elevate thread/process to real-time priority.
-    
-    Attempts SCHED_RR (real-time round-robin) first, falls back to nice -10.
-    Requires CAP_SYS_NICE capability for full real-time scheduling.
-    
-    Args:
-        tid: Thread/process ID. If None, uses current thread (0).
-        realtime_priority: SCHED_RR priority (1-99, default 50).
-    
-    Returns:
-        True if any elevation succeeded.
-    """
-    if sys.platform != 'linux':
-        logger.debug("Priority elevation only supported on Linux")
-        return False
-    
-    target_id = tid if tid is not None else 0
-    
-    # Try SCHED_RR (real-time round-robin)
-    try:
-        libc = _get_libc()
-        param = _SchedParam(realtime_priority)
-        result = libc.sched_setscheduler(target_id, SCHED_RR, ctypes.byref(param))
-        
-        if result == 0:
-            logger.info(f"Thread {target_id} set to SCHED_RR priority {realtime_priority}")
-            return True
-        else:
-            errno = ctypes.get_errno()
-            logger.debug(f"SCHED_RR failed with errno {errno}, trying nice fallback")
-    except Exception as e:
-        logger.debug(f"SCHED_RR setup failed: {e}, trying nice fallback")
-    
-    # Fallback: negative nice value
-    try:
-        current_nice = os.nice(0)
-        if current_nice > -10:
-            os.nice(-10 - current_nice)
-            logger.info("Process priority elevated via nice(-10)")
-            return True
-    except PermissionError:
-        logger.info("Priority elevation requires CAP_SYS_NICE capability - using default priority")
-    except Exception as e:
-        logger.debug(f"Nice priority elevation failed: {e}")
-    
-    return False
-
-
-def lower_priority(nice_value: int = 10) -> bool:
-    """Lower current thread/process priority for background work.
-    
-    Args:
-        nice_value: Target nice value (positive = lower priority).
-    
-    Returns:
-        True if priority was lowered.
-    """
-    if sys.platform != 'linux':
-        return False
-    
-    try:
-        current_nice = os.nice(0)
-        if current_nice < nice_value:
-            os.nice(nice_value - current_nice)
-            logger.debug(f"Process priority lowered to nice {nice_value}")
-        return True
-    except Exception as e:
-        logger.debug(f"Could not lower priority: {e}")
-        return False
-
-
-def pin_to_cpu(cpu_id: int, tid: Optional[int] = None) -> bool:
-    """Pin thread/process to a specific CPU core.
-    
-    Args:
-        cpu_id: CPU core number (0-indexed).
-        tid: Thread/process ID. If None, uses current (0).
-    
-    Returns:
-        True if affinity was set.
-    """
-    return pin_to_cpus({cpu_id}, tid)
-
-
-def pin_to_cpus(cpu_ids: Set[int], tid: Optional[int] = None) -> bool:
-    """Pin thread/process to multiple CPU cores.
-    
-    Args:
-        cpu_ids: Set of CPU core numbers.
-        tid: Thread/process ID. If None, uses current (0).
-    
-    Returns:
-        True if affinity was set.
-    """
-    if sys.platform != 'linux':
-        return False
-    
-    if not cpu_ids:
-        return False
-    
-    target_id = tid if tid is not None else 0
-    
-    try:
-        os.sched_setaffinity(target_id, cpu_ids)
-        cpu_str = ','.join(map(str, sorted(cpu_ids)))
-        logger.debug(f"Thread {target_id} pinned to CPU(s) {cpu_str}")
-        return True
-    except Exception as e:
-        logger.debug(f"CPU affinity not set: {e}")
-        return False
-
-
-def setup_realtime_thread(tid: Optional[int] = None, priority: int = 50) -> None:
-    """Setup for time-critical I/O threads (motion control, LED effects).
-
-    Elevates priority and pins to CPU 0.
-
-    Args:
-        tid: Thread native_id. If None, uses current thread.
-        priority: SCHED_RR priority (1-99). Higher = more important.
-                  Motion should use higher than LED (e.g., 60 vs 40).
-    """
-    # DISABLED: SCHED_RR + CPU pinning causes serial buffer corruption on Pi 3B+
-    # The real-time scheduling appears to interfere with serial I/O timing,
-    # causing commands to be merged/corrupted (e.g., "G1 G53" -> "G10G53").
-    # This needs further investigation - may need to pin to a different CPU
-    # or use a different scheduling policy.
-    return
-
-    cpu_count = get_cpu_count()
-
-    # Elevate priority (logs internally on success)
-    elevate_priority(tid, realtime_priority=priority)
-
-    # Pin to CPU 0 if multi-core
-    if cpu_count > 1:
-        if pin_to_cpu(0, tid):
-            logger.info(f"Real-time thread pinned to CPU 0 ({cpu_count} CPUs detected)")
-
-
-def setup_background_worker() -> None:
-    """Setup for CPU-intensive background workers.
-    
-    Lowers priority and pins to CPUs 1-N (avoiding CPU 0).
-    Called at worker process startup.
-    """
-    # Lower priority
-    lower_priority(10)
-    
-    # Pin to background CPUs (1-N)
-    worker_cpus = get_background_cpus()
-    if worker_cpus:
-        pin_to_cpus(worker_cpus)
-        cpu_str = ','.join(map(str, sorted(worker_cpus)))
-        logger.debug(f"Background worker pinned to CPUs {cpu_str}")

+ 0 - 5
modules/led/dw_led_controller.py

@@ -139,11 +139,6 @@ class DWLEDController:
 
     def _effect_loop(self):
         """Background thread that runs the current effect"""
-        # Elevate priority and pin to CPU 0 for consistent timing
-        # LED uses lower priority (40) than motion (60) since CNC is more critical
-        from modules.core import scheduling
-        scheduling.setup_realtime_thread(priority=40)
-        
         while not self._stop_thread.is_set():
             try:
                 with self._lock: