process_pool.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. """Shared process pool for CPU-intensive tasks.
  2. Provides a single ProcessPoolExecutor shared across modules to:
  3. - Isolate CPU-intensive work from real-time threads (separate GILs)
  4. - Manage worker count based on available CPUs
  5. - Configure CPU affinity to keep workers off CPU 0 (reserved for motion/LED)
  6. Environment variables:
  7. - POOL_WORKERS: Override worker count (default: 3)
  8. """
  9. import logging
  10. import os
  11. from concurrent.futures import ProcessPoolExecutor
  12. from typing import Optional
  13. from modules.core import scheduling
logger = logging.getLogger(__name__)

# Singleton executor; created lazily by init_pool(), torn down by shutdown_pool().
_pool: Optional[ProcessPoolExecutor] = None
# Guards against concurrent shutdown_pool() calls (signal handler vs lifespan).
_shutdown_in_progress: bool = False

# Default to 3 workers for parallel processing
DEFAULT_WORKERS = 3
  19. def _get_worker_count() -> int:
  20. """Calculate worker count for the process pool.
  21. Uses POOL_WORKERS env var if set, otherwise defaults to 3 workers.
  22. For memory-constrained devices (Pi Zero 2 W), set POOL_WORKERS=1.
  23. """
  24. env_workers = os.environ.get('POOL_WORKERS')
  25. if env_workers is not None:
  26. try:
  27. count = int(env_workers)
  28. if count >= 0:
  29. return count
  30. logger.warning(f"Invalid POOL_WORKERS={env_workers}, using default")
  31. except ValueError:
  32. logger.warning(f"Invalid POOL_WORKERS={env_workers}, using default")
  33. return DEFAULT_WORKERS
def setup_worker_process():
    """Configure worker process (called at worker startup).

    Runs inside each child process as the ProcessPoolExecutor initializer
    (see init_pool()). Delegates to scheduling.setup_background_worker(),
    which sets CPU affinity and lowers priority.
    """
    scheduling.setup_background_worker()
  39. def init_pool() -> ProcessPoolExecutor:
  40. """Initialize the shared process pool."""
  41. global _pool
  42. if _pool is not None:
  43. return _pool
  44. worker_count = _get_worker_count()
  45. cpu_count = scheduling.get_cpu_count()
  46. _pool = ProcessPoolExecutor(
  47. max_workers=worker_count,
  48. initializer=setup_worker_process
  49. )
  50. logger.info(f"Process pool initialized: {worker_count} workers, {cpu_count} CPUs")
  51. return _pool
  52. def get_pool() -> ProcessPoolExecutor:
  53. """Get the shared process pool (must be initialized first)."""
  54. if _pool is None:
  55. raise RuntimeError("Process pool not initialized - call init_pool() first")
  56. return _pool
  57. def shutdown_pool(wait: bool = True, cancel_futures: bool = False):
  58. """Shutdown the process pool.
  59. Args:
  60. wait: If True, wait for workers to finish current tasks before shutdown.
  61. This allows workers to properly release semaphores.
  62. cancel_futures: If True, cancel pending futures. Use with caution as this
  63. can cause semaphore leaks if wait=False.
  64. Note: Always use wait=True to prevent semaphore leaks. The wait=False option
  65. exists only for emergency shutdown scenarios.
  66. """
  67. global _pool, _shutdown_in_progress
  68. # Prevent concurrent shutdown calls (race condition between signal handler and lifespan)
  69. if _shutdown_in_progress:
  70. logger.debug("Pool shutdown already in progress, skipping")
  71. return
  72. if _pool is not None:
  73. _shutdown_in_progress = True
  74. _pool.shutdown(wait=wait, cancel_futures=cancel_futures)
  75. _pool = None
  76. logger.info("Process pool shut down")