process_pool.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. """Shared process pool for CPU-intensive tasks.
  2. Provides a single ProcessPoolExecutor shared across modules to:
  3. - Isolate CPU-intensive work from real-time threads (separate GILs)
  4. - Manage worker count based on available CPUs
  5. - Configure CPU affinity to keep workers off CPU 0 (reserved for motion/LED)
  6. """
  7. import logging
  8. from concurrent.futures import ProcessPoolExecutor
  9. from modules.core import scheduling
  10. logger = logging.getLogger(__name__)
  11. _pool: ProcessPoolExecutor | None = None
  12. def _get_worker_count() -> int:
  13. """Calculate optimal worker count.
  14. - Reserve 1 CPU for motion control thread
  15. - Max 3 workers (diminishing returns beyond)
  16. - Min 1 worker
  17. """
  18. return min(3, max(1, scheduling.get_cpu_count() - 1))
  19. def setup_worker_process():
  20. """Configure worker process (called at worker startup).
  21. Sets CPU affinity and lowers priority.
  22. """
  23. scheduling.setup_background_worker()
  24. def init_pool() -> ProcessPoolExecutor:
  25. """Initialize the shared process pool."""
  26. global _pool
  27. if _pool is not None:
  28. return _pool
  29. worker_count = _get_worker_count()
  30. cpu_count = scheduling.get_cpu_count()
  31. _pool = ProcessPoolExecutor(
  32. max_workers=worker_count,
  33. initializer=setup_worker_process
  34. )
  35. logger.info(f"Process pool initialized: {worker_count} workers, {cpu_count} CPUs")
  36. return _pool
  37. def get_pool() -> ProcessPoolExecutor:
  38. """Get the shared process pool (must be initialized first)."""
  39. if _pool is None:
  40. raise RuntimeError("Process pool not initialized - call init_pool() first")
  41. return _pool
  42. def shutdown_pool(wait: bool = True, cancel_futures: bool = False):
  43. """Shutdown the process pool."""
  44. global _pool
  45. if _pool is not None:
  46. _pool.shutdown(wait=wait, cancel_futures=cancel_futures)
  47. _pool = None
  48. logger.info("Process pool shut down")