scheduling.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. """Scheduling utilities for thread/process priority and CPU affinity.
  2. Provides centralized functions to configure scheduling for:
  3. - Real-time I/O threads (motion control, LED effects) - high priority, CPU 0
  4. - Background workers (preview generation, file parsing) - low priority, CPUs 1-N
  5. """
  6. import os
  7. import sys
  8. import ctypes
  9. import ctypes.util
  10. import logging
  11. logger = logging.getLogger(__name__)
  12. # Linux scheduling constants
  13. SCHED_RR = 2
  14. # Cached libc handle (lazy-loaded)
  15. _libc = None
  16. class _SchedParam(ctypes.Structure):
  17. """Linux sched_param structure for real-time scheduling."""
  18. _fields_ = [('sched_priority', ctypes.c_int)]
  19. def _get_libc():
  20. """Get cached libc handle."""
  21. global _libc
  22. if _libc is None:
  23. _libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
  24. return _libc
  25. def get_cpu_count() -> int:
  26. """Get available CPU cores."""
  27. return os.cpu_count() or 1
  28. def get_background_cpus() -> set[int] | None:
  29. """Get CPU set for background work (all except CPU 0).
  30. Returns None on single-core systems.
  31. """
  32. cpu_count = get_cpu_count()
  33. if cpu_count <= 1:
  34. return None
  35. return set(range(1, cpu_count))
  36. def elevate_priority(tid: int | None = None, realtime_priority: int = 50) -> bool:
  37. """Elevate thread/process to real-time priority.
  38. Attempts SCHED_RR (real-time round-robin) first, falls back to nice -10.
  39. Requires CAP_SYS_NICE capability for full real-time scheduling.
  40. Args:
  41. tid: Thread/process ID. If None, uses current thread (0).
  42. realtime_priority: SCHED_RR priority (1-99, default 50).
  43. Returns:
  44. True if any elevation succeeded.
  45. """
  46. if sys.platform != 'linux':
  47. logger.debug("Priority elevation only supported on Linux")
  48. return False
  49. target_id = tid if tid is not None else 0
  50. # Try SCHED_RR (real-time round-robin)
  51. try:
  52. libc = _get_libc()
  53. param = _SchedParam(realtime_priority)
  54. result = libc.sched_setscheduler(target_id, SCHED_RR, ctypes.byref(param))
  55. if result == 0:
  56. logger.info(f"Thread {target_id} set to SCHED_RR priority {realtime_priority}")
  57. return True
  58. else:
  59. errno = ctypes.get_errno()
  60. logger.debug(f"SCHED_RR failed with errno {errno}, trying nice fallback")
  61. except Exception as e:
  62. logger.debug(f"SCHED_RR setup failed: {e}, trying nice fallback")
  63. # Fallback: negative nice value
  64. try:
  65. current_nice = os.nice(0)
  66. if current_nice > -10:
  67. os.nice(-10 - current_nice)
  68. logger.info("Process priority elevated via nice(-10)")
  69. return True
  70. except PermissionError:
  71. logger.info("Priority elevation requires CAP_SYS_NICE capability - using default priority")
  72. except Exception as e:
  73. logger.debug(f"Nice priority elevation failed: {e}")
  74. return False
  75. def lower_priority(nice_value: int = 10) -> bool:
  76. """Lower current thread/process priority for background work.
  77. Args:
  78. nice_value: Target nice value (positive = lower priority).
  79. Returns:
  80. True if priority was lowered.
  81. """
  82. if sys.platform != 'linux':
  83. return False
  84. try:
  85. current_nice = os.nice(0)
  86. if current_nice < nice_value:
  87. os.nice(nice_value - current_nice)
  88. logger.debug(f"Process priority lowered to nice {nice_value}")
  89. return True
  90. except Exception as e:
  91. logger.debug(f"Could not lower priority: {e}")
  92. return False
  93. def pin_to_cpu(cpu_id: int, tid: int | None = None) -> bool:
  94. """Pin thread/process to a specific CPU core.
  95. Args:
  96. cpu_id: CPU core number (0-indexed).
  97. tid: Thread/process ID. If None, uses current (0).
  98. Returns:
  99. True if affinity was set.
  100. """
  101. return pin_to_cpus({cpu_id}, tid)
  102. def pin_to_cpus(cpu_ids: set[int], tid: int | None = None) -> bool:
  103. """Pin thread/process to multiple CPU cores.
  104. Args:
  105. cpu_ids: Set of CPU core numbers.
  106. tid: Thread/process ID. If None, uses current (0).
  107. Returns:
  108. True if affinity was set.
  109. """
  110. if sys.platform != 'linux':
  111. return False
  112. if not cpu_ids:
  113. return False
  114. target_id = tid if tid is not None else 0
  115. try:
  116. os.sched_setaffinity(target_id, cpu_ids)
  117. cpu_str = ','.join(map(str, sorted(cpu_ids)))
  118. logger.debug(f"Thread {target_id} pinned to CPU(s) {cpu_str}")
  119. return True
  120. except Exception as e:
  121. logger.debug(f"CPU affinity not set: {e}")
  122. return False
  123. def setup_realtime_thread(tid: int | None = None, priority: int = 50) -> None:
  124. """Setup for time-critical I/O threads (motion control, LED effects).
  125. Elevates priority and pins to CPU 0.
  126. Args:
  127. tid: Thread native_id. If None, uses current thread.
  128. priority: SCHED_RR priority (1-99). Higher = more important.
  129. Motion should use higher than LED (e.g., 60 vs 40).
  130. """
  131. cpu_count = get_cpu_count()
  132. # Elevate priority (logs internally on success)
  133. elevate_priority(tid, realtime_priority=priority)
  134. # Pin to CPU 0 if multi-core
  135. if cpu_count > 1:
  136. if pin_to_cpu(0, tid):
  137. logger.info(f"Real-time thread pinned to CPU 0 ({cpu_count} CPUs detected)")
  138. def setup_background_worker() -> None:
  139. """Setup for CPU-intensive background workers.
  140. Lowers priority and pins to CPUs 1-N (avoiding CPU 0).
  141. Called at worker process startup.
  142. """
  143. # Lower priority
  144. lower_priority(10)
  145. # Pin to background CPUs (1-N)
  146. worker_cpus = get_background_cpus()
  147. if worker_cpus:
  148. pin_to_cpus(worker_cpus)
  149. cpu_str = ','.join(map(str, sorted(worker_cpus)))
  150. logger.debug(f"Background worker pinned to CPUs {cpu_str}")