"""Shared process pool for CPU-intensive tasks. Provides a single ProcessPoolExecutor shared across modules to: - Isolate CPU-intensive work from real-time threads (separate GILs) - Manage worker count based on available CPUs - Configure CPU affinity to keep workers off CPU 0 (reserved for motion/LED) Environment variables: - POOL_WORKERS: Override worker count (default: 1 for RAM conservation) """ import logging import os from concurrent.futures import ProcessPoolExecutor from typing import Optional from modules.core import scheduling logger = logging.getLogger(__name__) _pool: Optional[ProcessPoolExecutor] = None _shutdown_in_progress: bool = False # Default to 1 worker to conserve RAM on low-memory devices (Pi Zero 2 W has only 512MB) DEFAULT_WORKERS = 1 def _get_worker_count() -> int: """Calculate worker count for the process pool. Uses POOL_WORKERS env var if set, otherwise defaults to 1 worker to conserve RAM on memory-constrained devices like Pi Zero 2 W. For systems with more RAM, set POOL_WORKERS=2 or POOL_WORKERS=3. """ env_workers = os.environ.get('POOL_WORKERS') if env_workers is not None: try: count = int(env_workers) if count >= 0: return count logger.warning(f"Invalid POOL_WORKERS={env_workers}, using default") except ValueError: logger.warning(f"Invalid POOL_WORKERS={env_workers}, using default") return DEFAULT_WORKERS def setup_worker_process(): """Configure worker process (called at worker startup). Sets CPU affinity and lowers priority. """ scheduling.setup_background_worker() def init_pool() -> ProcessPoolExecutor: """Initialize the shared process pool.""" global _pool if _pool is not None: return _pool worker_count = _get_worker_count() cpu_count = scheduling.get_cpu_count() _pool = ProcessPoolExecutor( max_workers=worker_count, initializer=setup_worker_process ) logger.info(f"Process pool initialized: {worker_count} workers, {cpu_count} CPUs") return _pool def get_pool() -> ProcessPoolExecutor: """Get the shared process pool (must be initialized first).""" if _pool is None: raise RuntimeError("Process pool not initialized - call init_pool() first") return _pool def shutdown_pool(wait: bool = True, cancel_futures: bool = False): """Shutdown the process pool. Args: wait: If True, wait for workers to finish current tasks before shutdown. This allows workers to properly release semaphores. cancel_futures: If True, cancel pending futures. Use with caution as this can cause semaphore leaks if wait=False. Note: Always use wait=True to prevent semaphore leaks. The wait=False option exists only for emergency shutdown scenarios. """ global _pool, _shutdown_in_progress # Prevent concurrent shutdown calls (race condition between signal handler and lifespan) if _shutdown_in_progress: logger.debug("Pool shutdown already in progress, skipping") return if _pool is not None: _shutdown_in_progress = True _pool.shutdown(wait=wait, cancel_futures=cancel_futures) _pool = None logger.info("Process pool shut down")