1
0

process_pool.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
"""Shared process pool for CPU-intensive tasks.

Provides a single ProcessPoolExecutor shared across modules to:
- Isolate CPU-intensive work from real-time threads (separate GILs)
- Manage worker count based on available CPUs
- Configure CPU affinity to keep workers off CPU 0 (reserved for motion/LED)
"""
  7. import logging
  8. from concurrent.futures import ProcessPoolExecutor
  9. from typing import Optional
  10. from modules.core import scheduling
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# The shared executor. None whenever no pool is currently initialized:
# init_pool() assigns it and shutdown_pool() resets it back to None.
_pool: Optional[ProcessPoolExecutor] = None
  13. def _get_worker_count() -> int:
  14. """Calculate optimal worker count.
  15. - Reserve 1 CPU for motion control thread
  16. - Max 3 workers (diminishing returns beyond)
  17. - Min 1 worker
  18. """
  19. return min(3, max(1, scheduling.get_cpu_count() - 1))
  20. def setup_worker_process():
  21. """Configure worker process (called at worker startup).
  22. Sets CPU affinity and lowers priority.
  23. """
  24. scheduling.setup_background_worker()
  25. def init_pool() -> ProcessPoolExecutor:
  26. """Initialize the shared process pool."""
  27. global _pool
  28. if _pool is not None:
  29. return _pool
  30. worker_count = _get_worker_count()
  31. cpu_count = scheduling.get_cpu_count()
  32. _pool = ProcessPoolExecutor(
  33. max_workers=worker_count,
  34. initializer=setup_worker_process
  35. )
  36. logger.info(f"Process pool initialized: {worker_count} workers, {cpu_count} CPUs")
  37. return _pool
  38. def get_pool() -> ProcessPoolExecutor:
  39. """Get the shared process pool (must be initialized first)."""
  40. if _pool is None:
  41. raise RuntimeError("Process pool not initialized - call init_pool() first")
  42. return _pool
  43. def shutdown_pool(wait: bool = True, cancel_futures: bool = False):
  44. """Shutdown the process pool."""
  45. global _pool
  46. if _pool is not None:
  47. _pool.shutdown(wait=wait, cancel_futures=cancel_futures)
  48. _pool = None
  49. logger.info("Process pool shut down")