"""Shared process pool for CPU-intensive tasks.

Provides a single ProcessPoolExecutor shared across modules to:
- Isolate CPU-intensive work from real-time threads (separate GILs)
- Manage worker count based on available CPUs
- Configure CPU affinity to keep workers off CPU 0 (reserved for motion/LED)

Environment variables:
- POOL_WORKERS: Override worker count (default: 3)
"""
import logging
import os
from concurrent.futures import ProcessPoolExecutor
from typing import Optional

from modules.core import scheduling

logger = logging.getLogger(__name__)

# Lazily-created singleton pool; None until init_pool() is called.
_pool: Optional[ProcessPoolExecutor] = None
# Guards against overlapping shutdown calls (e.g. signal handler vs. app lifespan).
_shutdown_in_progress: bool = False

# Default to 3 workers for parallel processing
DEFAULT_WORKERS = 3
- def _get_worker_count() -> int:
- """Calculate worker count for the process pool.
- Uses POOL_WORKERS env var if set, otherwise defaults to 3 workers.
- For memory-constrained devices (Pi Zero 2 W), set POOL_WORKERS=1.
- """
- env_workers = os.environ.get('POOL_WORKERS')
- if env_workers is not None:
- try:
- count = int(env_workers)
- if count >= 0:
- return count
- logger.warning(f"Invalid POOL_WORKERS={env_workers}, using default")
- except ValueError:
- logger.warning(f"Invalid POOL_WORKERS={env_workers}, using default")
- return DEFAULT_WORKERS
def setup_worker_process():
    """Configure worker process (called at worker startup).

    Runs inside each pool worker via the ProcessPoolExecutor
    ``initializer`` hook. Delegates to
    scheduling.setup_background_worker(), which sets CPU affinity
    and lowers priority.
    """
    scheduling.setup_background_worker()
def init_pool() -> ProcessPoolExecutor:
    """Create the shared process pool on first call; return it thereafter.

    Idempotent: repeated calls hand back the same executor instance.
    """
    global _pool
    if _pool is None:
        worker_count = _get_worker_count()
        cpu_count = scheduling.get_cpu_count()
        # Each worker runs setup_worker_process() once at startup.
        _pool = ProcessPoolExecutor(
            max_workers=worker_count,
            initializer=setup_worker_process,
        )
        logger.info(f"Process pool initialized: {worker_count} workers, {cpu_count} CPUs")
    return _pool
def get_pool() -> ProcessPoolExecutor:
    """Return the shared process pool.

    Raises:
        RuntimeError: If init_pool() has not been called yet.
    """
    pool = _pool
    if pool is None:
        raise RuntimeError("Process pool not initialized - call init_pool() first")
    return pool
def shutdown_pool(wait: bool = True, cancel_futures: bool = False):
    """Shutdown the process pool.

    Args:
        wait: If True, wait for workers to finish current tasks before shutdown.
              This allows workers to properly release semaphores.
        cancel_futures: If True, cancel pending futures. Use with caution as this
                        can cause semaphore leaks if wait=False.

    Note: Always use wait=True to prevent semaphore leaks. The wait=False option
    exists only for emergency shutdown scenarios.
    """
    global _pool, _shutdown_in_progress
    # Prevent concurrent shutdown calls (race condition between signal handler and lifespan)
    if _shutdown_in_progress:
        logger.debug("Pool shutdown already in progress, skipping")
        return
    if _pool is None:
        return
    _shutdown_in_progress = True
    try:
        _pool.shutdown(wait=wait, cancel_futures=cancel_futures)
        _pool = None
        logger.info("Process pool shut down")
    finally:
        # Reset the guard so a later init_pool()/shutdown_pool() restart
        # cycle is not silently skipped (the flag was previously never
        # cleared, leaking the re-created pool on the next shutdown).
        _shutdown_in_progress = False