process_pool.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. """Shared process pool for CPU-intensive tasks.
  2. Provides a single ProcessPoolExecutor shared across modules to:
  3. - Isolate CPU-intensive work from real-time threads (separate GILs)
  4. - Manage worker count based on available CPUs
  5. - Configure CPU affinity to keep workers off CPU 0 (reserved for motion/LED)
  6. """
  7. import logging
  8. from concurrent.futures import ProcessPoolExecutor
  9. from typing import Optional
  10. from modules.core import scheduling
  11. logger = logging.getLogger(__name__)
  12. _pool: Optional[ProcessPoolExecutor] = None
  13. _shutdown_in_progress: bool = False
  14. def _get_worker_count() -> int:
  15. """Calculate optimal worker count.
  16. - Reserve 1 CPU for motion control thread
  17. - Max 3 workers (diminishing returns beyond)
  18. - Min 1 worker
  19. """
  20. return min(3, max(1, scheduling.get_cpu_count() - 1))
  21. def setup_worker_process():
  22. """Configure worker process (called at worker startup).
  23. Sets CPU affinity and lowers priority.
  24. """
  25. scheduling.setup_background_worker()
  26. def init_pool() -> ProcessPoolExecutor:
  27. """Initialize the shared process pool."""
  28. global _pool
  29. if _pool is not None:
  30. return _pool
  31. worker_count = _get_worker_count()
  32. cpu_count = scheduling.get_cpu_count()
  33. _pool = ProcessPoolExecutor(
  34. max_workers=worker_count,
  35. initializer=setup_worker_process
  36. )
  37. logger.info(f"Process pool initialized: {worker_count} workers, {cpu_count} CPUs")
  38. return _pool
  39. def get_pool() -> ProcessPoolExecutor:
  40. """Get the shared process pool (must be initialized first)."""
  41. if _pool is None:
  42. raise RuntimeError("Process pool not initialized - call init_pool() first")
  43. return _pool
  44. def shutdown_pool(wait: bool = True, cancel_futures: bool = False):
  45. """Shutdown the process pool.
  46. Args:
  47. wait: If True, wait for workers to finish current tasks before shutdown.
  48. This allows workers to properly release semaphores.
  49. cancel_futures: If True, cancel pending futures. Use with caution as this
  50. can cause semaphore leaks if wait=False.
  51. Note: Always use wait=True to prevent semaphore leaks. The wait=False option
  52. exists only for emergency shutdown scenarios.
  53. """
  54. global _pool, _shutdown_in_progress
  55. # Prevent concurrent shutdown calls (race condition between signal handler and lifespan)
  56. if _shutdown_in_progress:
  57. logger.debug("Pool shutdown already in progress, skipping")
  58. return
  59. if _pool is not None:
  60. _shutdown_in_progress = True
  61. _pool.shutdown(wait=wait, cancel_futures=cancel_futures)
  62. _pool = None
  63. logger.info("Process pool shut down")