# process_pool.py
"""Shared process pool for CPU-intensive tasks.

Provides a single ProcessPoolExecutor shared across modules to:
- Isolate CPU-intensive work from real-time threads (separate GILs)
- Manage worker count based on available CPUs
- Configure CPU affinity to keep workers off CPU 0 (reserved for motion/LED)

Environment variables:
- POOL_WORKERS: Override worker count (default: 1 for RAM conservation)
"""
import logging
import os
from concurrent.futures import ProcessPoolExecutor
from typing import Optional

from modules.core import scheduling

logger = logging.getLogger(__name__)

# Module-level singleton: created by init_pool(), read by get_pool(),
# torn down (and set back to None) by shutdown_pool().
_pool: Optional[ProcessPoolExecutor] = None
# Guard against re-entrant shutdown (e.g. signal handler racing app lifespan).
_shutdown_in_progress: bool = False

# Default to 1 worker to conserve RAM on low-memory devices (Pi Zero 2 W has only 512MB)
DEFAULT_WORKERS = 1
  19. def _get_worker_count() -> int:
  20. """Calculate worker count for the process pool.
  21. Uses POOL_WORKERS env var if set, otherwise defaults to 1 worker
  22. to conserve RAM on memory-constrained devices like Pi Zero 2 W.
  23. For systems with more RAM, set POOL_WORKERS=2 or POOL_WORKERS=3.
  24. """
  25. env_workers = os.environ.get('POOL_WORKERS')
  26. if env_workers is not None:
  27. try:
  28. count = int(env_workers)
  29. if count >= 0:
  30. return count
  31. logger.warning(f"Invalid POOL_WORKERS={env_workers}, using default")
  32. except ValueError:
  33. logger.warning(f"Invalid POOL_WORKERS={env_workers}, using default")
  34. return DEFAULT_WORKERS
def setup_worker_process():
    """Configure worker process (called at worker startup).

    Passed as the ProcessPoolExecutor ``initializer``, so it runs once
    inside each spawned worker. Sets CPU affinity and lowers priority
    (both delegated to scheduling.setup_background_worker).
    """
    scheduling.setup_background_worker()
  40. def init_pool() -> ProcessPoolExecutor:
  41. """Initialize the shared process pool."""
  42. global _pool
  43. if _pool is not None:
  44. return _pool
  45. worker_count = _get_worker_count()
  46. cpu_count = scheduling.get_cpu_count()
  47. _pool = ProcessPoolExecutor(
  48. max_workers=worker_count,
  49. initializer=setup_worker_process
  50. )
  51. logger.info(f"Process pool initialized: {worker_count} workers, {cpu_count} CPUs")
  52. return _pool
  53. def get_pool() -> ProcessPoolExecutor:
  54. """Get the shared process pool (must be initialized first)."""
  55. if _pool is None:
  56. raise RuntimeError("Process pool not initialized - call init_pool() first")
  57. return _pool
  58. def shutdown_pool(wait: bool = True, cancel_futures: bool = False):
  59. """Shutdown the process pool.
  60. Args:
  61. wait: If True, wait for workers to finish current tasks before shutdown.
  62. This allows workers to properly release semaphores.
  63. cancel_futures: If True, cancel pending futures. Use with caution as this
  64. can cause semaphore leaks if wait=False.
  65. Note: Always use wait=True to prevent semaphore leaks. The wait=False option
  66. exists only for emergency shutdown scenarios.
  67. """
  68. global _pool, _shutdown_in_progress
  69. # Prevent concurrent shutdown calls (race condition between signal handler and lifespan)
  70. if _shutdown_in_progress:
  71. logger.debug("Pool shutdown already in progress, skipping")
  72. return
  73. if _pool is not None:
  74. _shutdown_in_progress = True
  75. _pool.shutdown(wait=wait, cancel_futures=cancel_futures)
  76. _pool = None
  77. logger.info("Process pool shut down")