Merge main into feature/react-ui

Resolved conflicts:
- docker-compose.yml: Keep main's network_mode=host and SYS_NICE capability
- cache_manager.py: Combined both approaches - use ProcessPoolExecutor for
  GIL isolation AND asyncio.wait_for for timeout protection

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
tuanchris committed 3 weeks ago
commit c5fd2719b8
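
The combined pattern named in the message reduces to a few lines; a minimal sketch of the same idea, with parse standing in for parse_theta_rho_file:

    import asyncio
    from concurrent.futures import ProcessPoolExecutor

    def parse(path):
        """Stand-in for parse_theta_rho_file; runs in a worker process."""
        return []

    async def parse_isolated(pool: ProcessPoolExecutor, path: str):
        loop = asyncio.get_running_loop()
        # A separate process means a separate GIL; wait_for bounds wall-clock time
        return await asyncio.wait_for(
            loop.run_in_executor(pool, parse, path),
            timeout=30.0,
        )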

+ 2 - 0
.gitignore

@@ -18,6 +18,8 @@ dune-weaver-touch/*.json
 .venv/
 patterns/cached_svg/
 patterns/cached_images/custom_*
+# macOS files
+.DS_Store
 # Node.js and build files
 node_modules/
 *.log

+ 5 - 2
docker-compose.yml

@@ -6,8 +6,11 @@ services:
       dockerfile: Dockerfile
     image: ghcr.io/tuanchris/dune-weaver-frontend:feature-react-ui
     restart: always
-    ports:
-      - "80:80"
+    cap_add:
+      - SYS_NICE  # Enable real-time thread priority for smooth UART communication
+    # ports:
+    #   - "8080:8080" # Map port 8080 of the container to 8080 of the host (access via http://localhost:8080)
+    network_mode: "host" # Use host network for device access
     volumes:
       - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
     depends_on:
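
The SYS_NICE capability is what lets the SCHED_RR calls in modules/core/scheduling.py (added below) succeed inside the container. A quick way to probe for it from inside the container (a sketch assuming Linux; not part of this commit):

    import os

    # Probe for CAP_SYS_NICE: SCHED_RR requires it; pid 0 = the calling thread.
    # Note: on success this really does switch the caller to SCHED_RR.
    try:
        os.sched_setscheduler(0, os.SCHED_RR, os.sched_param(1))
        print("CAP_SYS_NICE present: real-time scheduling permitted")
    except PermissionError:
        print("CAP_SYS_NICE missing: scheduling will fall back to nice()")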

+ 84 - 75
main.py

@@ -30,22 +30,14 @@ import json
 import base64
 import time
 import argparse
-from concurrent.futures import ProcessPoolExecutor
-import multiprocessing
 import subprocess
 import platform
+from modules.core import process_pool as pool_module
 
 # Get log level from environment variable, default to INFO
 log_level_str = os.getenv('LOG_LEVEL', 'INFO').upper()
 log_level = getattr(logging, log_level_str, logging.INFO)
 
-# Create a process pool for CPU-intensive tasks
-# Limit to reasonable number of workers for embedded systems
-cpu_count = multiprocessing.cpu_count()
-# Maximum 3 workers (leaving 1 for motion), minimum 1
-process_pool_size = min(3, max(1, cpu_count - 1))
-process_pool = None  # Will be initialized in lifespan
-
 logging.basicConfig(
     level=log_level,
     format='%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s',
@@ -109,10 +101,17 @@ async def lifespan(app: FastAPI):
     signal.signal(signal.SIGINT, signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)
 
-    # Initialize process pool for CPU-intensive tasks
-    global process_pool
-    process_pool = ProcessPoolExecutor(max_workers=process_pool_size)
-    logger.info(f"Initialized process pool with {process_pool_size} workers (detected {cpu_count} cores total)")
+    # Initialize shared process pool for CPU-intensive tasks
+    pool_module.init_pool()
+
+    # Pin main process to CPUs 1-N to keep CPU 0 dedicated to motion/LED
+    from modules.core import scheduling
+    background_cpus = scheduling.get_background_cpus()
+    if background_cpus:
+        scheduling.pin_to_cpus(background_cpus)
+        logger.info(f"FastAPI main process pinned to CPUs {sorted(background_cpus)}")
+    else:
+        logger.info("Single-core system detected, skipping CPU pinning")
 
     try:
         connection_manager.connect_device()
@@ -255,14 +254,16 @@ async def lifespan(app: FastAPI):
     logger.info("Shutting down Dune Weaver application...")
 
     # Shutdown process pool
-    if process_pool:
-        process_pool.shutdown(wait=True)
-        logger.info("Process pool shutdown complete")
+    pool_module.shutdown_pool(wait=True)
 
 app = FastAPI(lifespan=lifespan)
 templates = Jinja2Templates(directory="templates")
 app.mount("/static", StaticFiles(directory="static"), name="static")
 
+# Global semaphore to limit concurrent preview processing
+# Prevents resource exhaustion when loading many previews simultaneously
+preview_semaphore = asyncio.Semaphore(5)
+
 # Pydantic models for request/response validation
 class ConnectRequest(BaseModel):
     port: Optional[str] = None
@@ -1198,7 +1199,7 @@ async def list_theta_rho_files_with_metadata():
         logger.error(f"Failed to load metadata cache, falling back to slow method: {e}")
         # Fallback to original method if cache loading fails
         # Create tasks only when needed
-        loop = asyncio.get_event_loop()
+        loop = asyncio.get_running_loop()
         tasks = [loop.run_in_executor(executor, process_file, file_path) for file_path in files]
 
         for task in asyncio.as_completed(tasks):
@@ -1278,8 +1279,8 @@ async def get_theta_rho_coordinates(request: GetCoordinatesRequest):
 
         # Parse the theta-rho file in a separate process for CPU-intensive work
         # This prevents blocking the motion control thread
-        loop = asyncio.get_event_loop()
-        coordinates = await loop.run_in_executor(process_pool, parse_theta_rho_file, file_path)
+        loop = asyncio.get_running_loop()
+        coordinates = await loop.run_in_executor(pool_module.get_pool(), parse_theta_rho_file, file_path)
         
         if not coordinates:
             raise HTTPException(status_code=400, detail="No valid coordinates found in file")
@@ -2343,6 +2344,16 @@ async def test_mqtt_connection(request: dict):
             status_code=500
         )
 
+def _read_and_encode_preview(cache_path: str) -> str:
+    """Read preview image from disk and encode as base64.
+    
+    Combines file I/O and base64 encoding in a single function
+    to run in an executor, reducing context switches.
+    """
+    with open(cache_path, 'rb') as f:
+        image_data = f.read()
+    return base64.b64encode(image_data).decode('utf-8')
+
 @app.post("/preview_thr_batch")
 async def preview_thr_batch(request: dict):
     start = time.time()
@@ -2361,57 +2372,59 @@ async def preview_thr_batch(request: dict):
 
     async def process_single_file(file_name):
         """Process a single file and return its preview data."""
-        t1 = time.time()
-        try:
-            # Normalize file path for cross-platform compatibility
-            normalized_file_name = normalize_file_path(file_name)
-            pattern_file_path = os.path.join(pattern_manager.THETA_RHO_DIR, normalized_file_name)
-
-            # Check file existence asynchronously
-            exists = await asyncio.to_thread(os.path.exists, pattern_file_path)
-            if not exists:
-                logger.warning(f"Pattern file not found: {pattern_file_path}")
-                return file_name, {"error": "Pattern file not found"}
-
-            cache_path = get_cache_path(normalized_file_name)
-
-            # Check cache existence asynchronously
-            cache_exists = await asyncio.to_thread(os.path.exists, cache_path)
-            if not cache_exists:
-                logger.info(f"Cache miss for {file_name}. Generating preview...")
-                success = await generate_image_preview(normalized_file_name)
-                cache_exists_after = await asyncio.to_thread(os.path.exists, cache_path)
-                if not success or not cache_exists_after:
-                    logger.error(f"Failed to generate or find preview for {file_name}")
-                    return file_name, {"error": "Failed to generate preview"}
-
-            metadata = get_pattern_metadata(normalized_file_name)
-            if metadata:
-                first_coord_obj = metadata.get('first_coordinate')
-                last_coord_obj = metadata.get('last_coordinate')
-            else:
-                logger.debug(f"Metadata cache miss for {file_name}, parsing file")
-                # Use process pool for CPU-intensive parsing
-                loop = asyncio.get_event_loop()
-                coordinates = await loop.run_in_executor(process_pool, parse_theta_rho_file, pattern_file_path)
-                first_coord = coordinates[0] if coordinates else None
-                last_coord = coordinates[-1] if coordinates else None
-                first_coord_obj = {"x": first_coord[0], "y": first_coord[1]} if first_coord else None
-                last_coord_obj = {"x": last_coord[0], "y": last_coord[1]} if last_coord else None
-
-            # Read image file asynchronously
-            image_data = await asyncio.to_thread(lambda: open(cache_path, 'rb').read())
-            image_b64 = base64.b64encode(image_data).decode('utf-8')
-            result = {
-                "image_data": f"data:image/webp;base64,{image_b64}",
-                "first_coordinate": first_coord_obj,
-                "last_coordinate": last_coord_obj
-            }
-            logger.debug(f"Processed {file_name} in {time.time() - t1:.2f}s")
-            return file_name, result
-        except Exception as e:
-            logger.error(f"Error processing {file_name}: {str(e)}")
-            return file_name, {"error": str(e)}
+        # Acquire semaphore to limit concurrent processing
+        async with preview_semaphore:
+            t1 = time.time()
+            try:
+                # Normalize file path for cross-platform compatibility
+                normalized_file_name = normalize_file_path(file_name)
+                pattern_file_path = os.path.join(pattern_manager.THETA_RHO_DIR, normalized_file_name)
+
+                # Check file existence asynchronously
+                exists = await asyncio.to_thread(os.path.exists, pattern_file_path)
+                if not exists:
+                    logger.warning(f"Pattern file not found: {pattern_file_path}")
+                    return file_name, {"error": "Pattern file not found"}
+
+                cache_path = get_cache_path(normalized_file_name)
+
+                # Check cache existence asynchronously
+                cache_exists = await asyncio.to_thread(os.path.exists, cache_path)
+                if not cache_exists:
+                    logger.info(f"Cache miss for {file_name}. Generating preview...")
+                    success = await generate_image_preview(normalized_file_name)
+                    cache_exists_after = await asyncio.to_thread(os.path.exists, cache_path)
+                    if not success or not cache_exists_after:
+                        logger.error(f"Failed to generate or find preview for {file_name}")
+                        return file_name, {"error": "Failed to generate preview"}
+
+                metadata = get_pattern_metadata(normalized_file_name)
+                if metadata:
+                    first_coord_obj = metadata.get('first_coordinate')
+                    last_coord_obj = metadata.get('last_coordinate')
+                else:
+                    logger.debug(f"Metadata cache miss for {file_name}, parsing file")
+                    # Use process pool for CPU-intensive parsing
+                    loop = asyncio.get_running_loop()
+                    coordinates = await loop.run_in_executor(pool_module.get_pool(), parse_theta_rho_file, pattern_file_path)
+                    first_coord = coordinates[0] if coordinates else None
+                    last_coord = coordinates[-1] if coordinates else None
+                    first_coord_obj = {"x": first_coord[0], "y": first_coord[1]} if first_coord else None
+                    last_coord_obj = {"x": last_coord[0], "y": last_coord[1]} if last_coord else None
+
+                # Read image file and encode in executor to avoid blocking event loop
+                loop = asyncio.get_running_loop()
+                image_b64 = await loop.run_in_executor(None, _read_and_encode_preview, cache_path)
+                result = {
+                    "image_data": f"data:image/webp;base64,{image_b64}",
+                    "first_coordinate": first_coord_obj,
+                    "last_coordinate": last_coord_obj
+                }
+                logger.debug(f"Processed {file_name} in {time.time() - t1:.2f}s")
+                return file_name, result
+            except Exception as e:
+                logger.error(f"Error processing {file_name}: {str(e)}")
+                return file_name, {"error": str(e)}
 
     # Process all files concurrently
     tasks = [process_single_file(file_name) for file_name in file_names]
@@ -2821,11 +2834,7 @@ def signal_handler(signum, frame):
             state.led_controller.set_power(0)
 
         # Shutdown process pool to prevent semaphore leaks
-        global process_pool
-        if process_pool:
-            logger.info("Shutting down process pool...")
-            process_pool.shutdown(wait=False, cancel_futures=True)
-            process_pool = None
+        pool_module.shutdown_pool(wait=False, cancel_futures=True)
 
         # Stop pattern manager motion controller
         pattern_manager.motion_controller.stop()
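
The preview_semaphore added above is a standard bounded-concurrency pattern, and the get_event_loop() → get_running_loop() swaps use the explicit modern API inside coroutines. The semaphore pattern, distilled (hypothetical names, not code from this commit):

    import asyncio

    sem = asyncio.Semaphore(5)  # at most 5 previews in flight

    async def bounded(worker, item):
        async with sem:  # tasks beyond the limit queue here
            return await worker(item)

    async def run_batch(worker, items):
        # All tasks are created up front, but only 5 make progress at a time;
        # gather raises the first exception unless return_exceptions=True
        return await asyncio.gather(*(bounded(worker, it) for it in items))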

+ 16 - 3
modules/core/cache_manager.py

@@ -5,6 +5,7 @@ import asyncio
 import logging
 from pathlib import Path
 from modules.core.pattern_manager import list_theta_rho_files, THETA_RHO_DIR, parse_theta_rho_file
+from modules.core.process_pool import get_pool as _get_process_pool
 
 logger = logging.getLogger(__name__)
 
@@ -450,9 +451,15 @@ async def generate_image_preview(pattern_file):
             pattern_path = os.path.join(THETA_RHO_DIR, pattern_file)
 
             try:
+                # Use process pool to avoid GIL contention with motion thread
                 # Add timeout protection to prevent hanging on problematic files
+                loop = asyncio.get_running_loop()
                 coordinates = await asyncio.wait_for(
-                    asyncio.to_thread(parse_theta_rho_file, pattern_path),
+                    loop.run_in_executor(
+                        _get_process_pool(),
+                        parse_theta_rho_file,
+                        pattern_path
+                    ),
                     timeout=30.0  # 30 second timeout per file
                 )
 
@@ -626,10 +633,16 @@ async def generate_metadata_cache():
                 cache_progress["current_file"] = file_name
                 
                 try:
-                    # Parse file to get metadata with timeout protection
+                    # Parse file in separate process to avoid GIL contention with motion thread
+                    # Add timeout protection to prevent hanging on problematic files
                     try:
+                        loop = asyncio.get_running_loop()
                         coordinates = await asyncio.wait_for(
-                            asyncio.to_thread(parse_theta_rho_file, pattern_path),
+                            loop.run_in_executor(
+                                _get_process_pool(),
+                                parse_theta_rho_file,
+                                pattern_path
+                            ),
                             timeout=30.0  # 30 second timeout per file
                         )
                     except asyncio.TimeoutError:
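
One standard-library caveat about the wait_for + process-pool combination above: on timeout, wait_for cancels only the awaiting asyncio side. A ProcessPoolExecutor task that has already started cannot be cancelled, so the worker finishes the parse anyway and its result is discarded; the 30-second timeout unblocks the caller, but that worker slot stays busy until the parse returns.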

+ 6 - 1
modules/core/pattern_manager.py

@@ -249,7 +249,7 @@ class MotionControlThread:
         self.paused = False
 
     def start(self):
-        """Start the motion control thread."""
+        """Start the motion control thread with elevated priority."""
         if self.thread and self.thread.is_alive():
             return
 
@@ -273,6 +273,11 @@ class MotionControlThread:
 
     def _motion_loop(self):
         """Main loop for the motion control thread."""
+        # Set up real-time priority from within the thread to avoid a native_id race
+        # Motion uses a higher priority (60) than the LED thread (40) for CNC reliability
+        from modules.core import scheduling
+        scheduling.setup_realtime_thread(priority=60)
+        
         logger.info("Motion control thread loop started")
 
         while self.running:
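
On the native_id race mentioned in the comment: a thread's native_id is only meaningful once the target thread is actually running, so doing the setup as the first action of the thread body means tid=None (the caller) is always the right target and no handoff of the child's id is needed. A sketch of the shape:

    import threading
    from modules.core import scheduling

    def _motion_loop():
        # First action inside the thread: tid=None targets the calling
        # thread itself, so the parent never needs the child's native_id
        scheduling.setup_realtime_thread(priority=60)
        ...  # motion loop body

    threading.Thread(target=_motion_loop, daemon=True).start()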

+ 58 - 16
modules/core/preview.py

@@ -1,25 +1,43 @@
-"""Preview module for generating image previews of patterns."""
+"""Preview module for generating image previews of patterns.
+
+Uses ProcessPoolExecutor to run CPU-intensive preview generation in separate
+processes, completely eliminating Python GIL contention with the motion control thread.
+"""
 import os
 import math
 import asyncio
+import logging
 from io import BytesIO
-from PIL import Image, ImageDraw
-from modules.core.pattern_manager import parse_theta_rho_file, THETA_RHO_DIR
+from modules.core import process_pool as pool_module
 
-async def generate_preview_image(pattern_file, format='WEBP'):
-    """Generate a preview for a pattern file, optimized for a 300x300 view."""
+logger = logging.getLogger(__name__)
+
+
+def _generate_preview_in_process(pattern_file, format='WEBP'):
+    """Generate preview for a pattern file, optimized for 300x300 view.
+    
+    Runs in a separate process with its own GIL, avoiding contention
+    with the motion control thread.
+    
+    All imports are done inside the function to ensure they happen
+    in the worker process, not the main process.
+    
+    Note: Worker CPU affinity/priority is configured once at pool init via initializer.
+    """
+    # Import dependencies in the worker process
+    from PIL import Image, ImageDraw
+    from modules.core.pattern_manager import parse_theta_rho_file, THETA_RHO_DIR
+    
     file_path = os.path.join(THETA_RHO_DIR, pattern_file)
-    # Use asyncio.to_thread to prevent blocking the event loop
-    coordinates = await asyncio.to_thread(parse_theta_rho_file, file_path)
+    coordinates = parse_theta_rho_file(file_path)
     
-    # Use 1000x1000 for high quality rendering
-    RENDER_SIZE = 2048
-    # Final display size
-    DISPLAY_SIZE = 512
+    # Image generation parameters
+    RENDER_SIZE = 2048  # Use 2048x2048 for high quality rendering
+    DISPLAY_SIZE = 512  # Final display size
     
     if not coordinates:
         # Create an image with "No pattern data" text
-        img = Image.new('RGBA', (DISPLAY_SIZE, DISPLAY_SIZE), (255, 255, 255, 0)) # Transparent background
+        img = Image.new('RGBA', (DISPLAY_SIZE, DISPLAY_SIZE), (255, 255, 255, 0))  # Transparent background
         draw = ImageDraw.Draw(img)
         text = "No pattern data"
         try:
@@ -38,10 +56,11 @@ async def generate_preview_image(pattern_file, format='WEBP'):
         img_byte_arr.seek(0)
         return img_byte_arr.getvalue()
 
-    # Image drawing parameters
-    img = Image.new('RGBA', (RENDER_SIZE, RENDER_SIZE), (255, 255, 255, 0)) # Transparent background
+    # Create image and draw pattern
+    img = Image.new('RGBA', (RENDER_SIZE, RENDER_SIZE), (255, 255, 255, 0))  # Transparent background
     draw = ImageDraw.Draw(img)
     
+    # Image drawing parameters
     CENTER = RENDER_SIZE / 2.0
     SCALE_FACTOR = (RENDER_SIZE / 2.0) - 10.0 
     LINE_COLOR = "black" 
@@ -62,11 +81,34 @@ async def generate_preview_image(pattern_file, format='WEBP'):
 
     # Scale down to display size with high-quality resampling
     img = img.resize((DISPLAY_SIZE, DISPLAY_SIZE), Image.Resampling.LANCZOS)
-    
     # Rotate the image 180 degrees
     img = img.rotate(180)
 
+    # Save to bytes
     img_byte_arr = BytesIO()
     img.save(img_byte_arr, format=format, lossless=False, alpha_quality=20, method=0)
     img_byte_arr.seek(0)
-    return img_byte_arr.getvalue()
+    return img_byte_arr.getvalue()
+
+
+async def generate_preview_image(pattern_file, format='WEBP'):
+    """Generate a preview for a pattern file.
+    
+    Runs in a separate process via ProcessPoolExecutor to completely
+    eliminate GIL contention with the motion control thread.
+    """
+    loop = asyncio.get_running_loop()
+    pool = pool_module.get_pool()
+    
+    try:
+        # Run preview generation in a separate process (separate GIL)
+        result = await loop.run_in_executor(
+            pool,
+            _generate_preview_in_process,
+            pattern_file,
+            format
+        )
+        return result
+    except Exception as e:
+        logger.error(f"Error generating preview for {pattern_file}: {e}")
+        return None
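
Usage from an async context, assuming init_pool() already ran in the FastAPI lifespan ("spiral.thr" is a hypothetical file name):

    # Returns WEBP bytes on success, None on failure
    data = await generate_preview_image("spiral.thr")
    if data is not None:
        with open("preview.webp", "wb") as f:
            f.write(data)

Keeping the PIL import inside the worker function also has the side effect of sparing the main process Pillow's import and memory cost, since only the workers ever render.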

+ 65 - 0
modules/core/process_pool.py

@@ -0,0 +1,65 @@
+"""Shared process pool for CPU-intensive tasks.
+
+Provides a single ProcessPoolExecutor shared across modules to:
+- Isolate CPU-intensive work from real-time threads (separate GILs)
+- Manage worker count based on available CPUs
+- Configure CPU affinity to keep workers off CPU 0 (reserved for motion/LED)
+"""
+import logging
+from concurrent.futures import ProcessPoolExecutor
+from modules.core import scheduling
+
+logger = logging.getLogger(__name__)
+
+_pool: ProcessPoolExecutor | None = None
+
+
+def _get_worker_count() -> int:
+    """Calculate optimal worker count.
+    
+    - Reserve 1 CPU for motion control thread
+    - Max 3 workers (diminishing returns beyond)
+    - Min 1 worker
+    """
+    return min(3, max(1, scheduling.get_cpu_count() - 1))
+
+
+def setup_worker_process():
+    """Configure worker process (called at worker startup).
+    
+    Sets CPU affinity and lowers priority.
+    """
+    scheduling.setup_background_worker()
+
+
+def init_pool() -> ProcessPoolExecutor:
+    """Initialize the shared process pool."""
+    global _pool
+    if _pool is not None:
+        return _pool
+    
+    worker_count = _get_worker_count()
+    cpu_count = scheduling.get_cpu_count()
+    _pool = ProcessPoolExecutor(
+        max_workers=worker_count,
+        initializer=setup_worker_process
+    )
+    logger.info(f"Process pool initialized: {worker_count} workers, {cpu_count} CPUs")
+    return _pool
+
+
+def get_pool() -> ProcessPoolExecutor:
+    """Get the shared process pool (must be initialized first)."""
+    if _pool is None:
+        raise RuntimeError("Process pool not initialized - call init_pool() first")
+    return _pool
+
+
+def shutdown_pool(wait: bool = True, cancel_futures: bool = False):
+    """Shutdown the process pool."""
+    global _pool
+    if _pool is not None:
+        _pool.shutdown(wait=wait, cancel_futures=cancel_futures)
+        _pool = None
+        logger.info("Process pool shut down")
+
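
The intended lifecycle, matching how main.py's lifespan uses the module:

    from modules.core import process_pool as pool_module

    pool_module.init_pool()               # once, at application startup
    pool = pool_module.get_pool()         # afterwards, from any module
    # ... await loop.run_in_executor(pool, fn, *args) ...
    pool_module.shutdown_pool(wait=True)  # once, at shutdown

Having get_pool() raise instead of lazily creating the pool looks deliberate: pool creation stays at one predictable point in startup rather than whichever request happens to arrive first.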

+ 199 - 0
modules/core/scheduling.py

@@ -0,0 +1,199 @@
+"""Scheduling utilities for thread/process priority and CPU affinity.
+
+Provides centralized functions to configure scheduling for:
+- Real-time I/O threads (motion control, LED effects) - high priority, CPU 0
+- Background workers (preview generation, file parsing) - low priority, CPUs 1-N
+"""
+import os
+import sys
+import ctypes
+import ctypes.util
+import logging
+
+logger = logging.getLogger(__name__)
+
+# Linux scheduling constants
+SCHED_RR = 2
+
+# Cached libc handle (lazy-loaded)
+_libc = None
+
+
+class _SchedParam(ctypes.Structure):
+    """Linux sched_param structure for real-time scheduling."""
+    _fields_ = [('sched_priority', ctypes.c_int)]
+
+
+def _get_libc():
+    """Get cached libc handle."""
+    global _libc
+    if _libc is None:
+        _libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
+    return _libc
+
+
+def get_cpu_count() -> int:
+    """Get available CPU cores."""
+    return os.cpu_count() or 1
+
+
+def get_background_cpus() -> set[int] | None:
+    """Get CPU set for background work (all except CPU 0).
+    
+    Returns None on single-core systems.
+    """
+    cpu_count = get_cpu_count()
+    if cpu_count <= 1:
+        return None
+    return set(range(1, cpu_count))
+
+
+def elevate_priority(tid: int | None = None, realtime_priority: int = 50) -> bool:
+    """Elevate thread/process to real-time priority.
+    
+    Attempts SCHED_RR (real-time round-robin) first, falls back to nice -10.
+    Requires CAP_SYS_NICE capability for full real-time scheduling.
+    
+    Args:
+        tid: Thread/process ID. If None, uses current thread (0).
+        realtime_priority: SCHED_RR priority (1-99, default 50).
+    
+    Returns:
+        True if any elevation succeeded.
+    """
+    if sys.platform != 'linux':
+        logger.debug("Priority elevation only supported on Linux")
+        return False
+    
+    target_id = tid if tid is not None else 0
+    
+    # Try SCHED_RR (real-time round-robin)
+    try:
+        libc = _get_libc()
+        param = _SchedParam(realtime_priority)
+        result = libc.sched_setscheduler(target_id, SCHED_RR, ctypes.byref(param))
+        
+        if result == 0:
+            logger.info(f"Thread {target_id} set to SCHED_RR priority {realtime_priority}")
+            return True
+        else:
+            errno = ctypes.get_errno()
+            logger.debug(f"SCHED_RR failed with errno {errno}, trying nice fallback")
+    except Exception as e:
+        logger.debug(f"SCHED_RR setup failed: {e}, trying nice fallback")
+    
+    # Fallback: negative nice value
+    try:
+        current_nice = os.nice(0)
+        if current_nice > -10:
+            os.nice(-10 - current_nice)
+            logger.info("Process priority elevated via nice(-10)")
+            return True
+    except PermissionError:
+        logger.info("Priority elevation requires CAP_SYS_NICE capability - using default priority")
+    except Exception as e:
+        logger.debug(f"Nice priority elevation failed: {e}")
+    
+    return False
+
+
+def lower_priority(nice_value: int = 10) -> bool:
+    """Lower current thread/process priority for background work.
+    
+    Args:
+        nice_value: Target nice value (positive = lower priority).
+    
+    Returns:
+        True if priority was lowered.
+    """
+    if sys.platform != 'linux':
+        return False
+    
+    try:
+        current_nice = os.nice(0)
+        if current_nice < nice_value:
+            os.nice(nice_value - current_nice)
+            logger.debug(f"Process priority lowered to nice {nice_value}")
+        return True
+    except Exception as e:
+        logger.debug(f"Could not lower priority: {e}")
+        return False
+
+
+def pin_to_cpu(cpu_id: int, tid: int | None = None) -> bool:
+    """Pin thread/process to a specific CPU core.
+    
+    Args:
+        cpu_id: CPU core number (0-indexed).
+        tid: Thread/process ID. If None, uses current (0).
+    
+    Returns:
+        True if affinity was set.
+    """
+    return pin_to_cpus({cpu_id}, tid)
+
+
+def pin_to_cpus(cpu_ids: set[int], tid: int | None = None) -> bool:
+    """Pin thread/process to multiple CPU cores.
+    
+    Args:
+        cpu_ids: Set of CPU core numbers.
+        tid: Thread/process ID. If None, uses current (0).
+    
+    Returns:
+        True if affinity was set.
+    """
+    if sys.platform != 'linux':
+        return False
+    
+    if not cpu_ids:
+        return False
+    
+    target_id = tid if tid is not None else 0
+    
+    try:
+        os.sched_setaffinity(target_id, cpu_ids)
+        cpu_str = ','.join(map(str, sorted(cpu_ids)))
+        logger.debug(f"Thread {target_id} pinned to CPU(s) {cpu_str}")
+        return True
+    except Exception as e:
+        logger.debug(f"CPU affinity not set: {e}")
+        return False
+
+
+def setup_realtime_thread(tid: int | None = None, priority: int = 50) -> None:
+    """Setup for time-critical I/O threads (motion control, LED effects).
+    
+    Elevates priority and pins to CPU 0.
+    
+    Args:
+        tid: Thread native_id. If None, uses current thread.
+        priority: SCHED_RR priority (1-99). Higher = more important.
+                  Motion should use higher than LED (e.g., 60 vs 40).
+    """
+    cpu_count = get_cpu_count()
+    
+    # Elevate priority (logs internally on success)
+    elevate_priority(tid, realtime_priority=priority)
+    
+    # Pin to CPU 0 if multi-core
+    if cpu_count > 1:
+        if pin_to_cpu(0, tid):
+            logger.info(f"Real-time thread pinned to CPU 0 ({cpu_count} CPUs detected)")
+
+
+def setup_background_worker() -> None:
+    """Setup for CPU-intensive background workers.
+    
+    Lowers priority and pins to CPUs 1-N (avoiding CPU 0).
+    Called at worker process startup.
+    """
+    # Lower priority
+    lower_priority(10)
+    
+    # Pin to background CPUs (1-N)
+    worker_cpus = get_background_cpus()
+    if worker_cpus:
+        pin_to_cpus(worker_cpus)
+        cpu_str = ','.join(map(str, sorted(worker_cpus)))
+        logger.debug(f"Background worker pinned to CPUs {cpu_str}")
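
For reference, CPython's os module wraps the same Linux syscalls, so the ctypes SCHED_RR branch has a stdlib equivalent (a sketch, not the module's code; the ctypes path may be kept for explicit errno reporting):

    import os

    def try_sched_rr(tid: int = 0, priority: int = 50) -> bool:
        """SCHED_RR via the stdlib; tid=0 targets the calling thread on Linux."""
        try:
            os.sched_setscheduler(tid, os.SCHED_RR, os.sched_param(priority))
            return True
        except (PermissionError, OSError):
            return False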

+ 5 - 0
modules/led/dw_led_controller.py

@@ -139,6 +139,11 @@ class DWLEDController:
 
     def _effect_loop(self):
         """Background thread that runs the current effect"""
+        # Elevate priority and pin to CPU 0 for consistent timing
+        # LED uses lower priority (40) than motion (60) since CNC is more critical
+        from modules.core import scheduling
+        scheduling.setup_realtime_thread(priority=40)
+        
         while not self._stop_thread.is_set():
             try:
                 with self._lock: