scheduling.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. """Scheduling utilities for thread/process priority and CPU affinity.
  2. Provides centralized functions to configure scheduling for:
  3. - Real-time I/O threads (motion control, LED effects) - high priority, CPU 0
  4. - Background workers (preview generation, file parsing) - low priority, CPUs 1-N
  5. """
  6. import os
  7. import sys
  8. import ctypes
  9. import ctypes.util
  10. import logging
  11. from typing import Optional, Set
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)

# Linux scheduling constants
# Policy value handed to sched_setscheduler to request round-robin real-time scheduling.
SCHED_RR = 2

# Cached libc handle (lazy-loaded); stays None until _get_libc() first populates it.
_libc = None
  17. class _SchedParam(ctypes.Structure):
  18. """Linux sched_param structure for real-time scheduling."""
  19. _fields_ = [('sched_priority', ctypes.c_int)]
  20. def _get_libc():
  21. """Get cached libc handle."""
  22. global _libc
  23. if _libc is None:
  24. _libc = ctypes.CDLL(ctypes.util.find_library('c'), use_errno=True)
  25. return _libc
  26. def get_cpu_count() -> int:
  27. """Get available CPU cores."""
  28. return os.cpu_count() or 1
  29. def get_background_cpus() -> Optional[Set[int]]:
  30. """Get CPU set for background work (all except CPU 0).
  31. Returns None on single-core systems.
  32. """
  33. cpu_count = get_cpu_count()
  34. if cpu_count <= 1:
  35. return None
  36. return set(range(1, cpu_count))
  37. def elevate_priority(tid: Optional[int] = None, realtime_priority: int = 50) -> bool:
  38. """Elevate thread/process to real-time priority.
  39. Attempts SCHED_RR (real-time round-robin) first, falls back to nice -10.
  40. Requires CAP_SYS_NICE capability for full real-time scheduling.
  41. Args:
  42. tid: Thread/process ID. If None, uses current thread (0).
  43. realtime_priority: SCHED_RR priority (1-99, default 50).
  44. Returns:
  45. True if any elevation succeeded.
  46. """
  47. if sys.platform != 'linux':
  48. logger.debug("Priority elevation only supported on Linux")
  49. return False
  50. target_id = tid if tid is not None else 0
  51. # Try SCHED_RR (real-time round-robin)
  52. try:
  53. libc = _get_libc()
  54. param = _SchedParam(realtime_priority)
  55. result = libc.sched_setscheduler(target_id, SCHED_RR, ctypes.byref(param))
  56. if result == 0:
  57. logger.info(f"Thread {target_id} set to SCHED_RR priority {realtime_priority}")
  58. return True
  59. else:
  60. errno = ctypes.get_errno()
  61. logger.debug(f"SCHED_RR failed with errno {errno}, trying nice fallback")
  62. except Exception as e:
  63. logger.debug(f"SCHED_RR setup failed: {e}, trying nice fallback")
  64. # Fallback: negative nice value
  65. try:
  66. current_nice = os.nice(0)
  67. if current_nice > -10:
  68. os.nice(-10 - current_nice)
  69. logger.info("Process priority elevated via nice(-10)")
  70. return True
  71. except PermissionError:
  72. logger.info("Priority elevation requires CAP_SYS_NICE capability - using default priority")
  73. except Exception as e:
  74. logger.debug(f"Nice priority elevation failed: {e}")
  75. return False
  76. def lower_priority(nice_value: int = 10) -> bool:
  77. """Lower current thread/process priority for background work.
  78. Args:
  79. nice_value: Target nice value (positive = lower priority).
  80. Returns:
  81. True if priority was lowered.
  82. """
  83. if sys.platform != 'linux':
  84. return False
  85. try:
  86. current_nice = os.nice(0)
  87. if current_nice < nice_value:
  88. os.nice(nice_value - current_nice)
  89. logger.debug(f"Process priority lowered to nice {nice_value}")
  90. return True
  91. except Exception as e:
  92. logger.debug(f"Could not lower priority: {e}")
  93. return False
  94. def pin_to_cpu(cpu_id: int, tid: Optional[int] = None) -> bool:
  95. """Pin thread/process to a specific CPU core.
  96. Args:
  97. cpu_id: CPU core number (0-indexed).
  98. tid: Thread/process ID. If None, uses current (0).
  99. Returns:
  100. True if affinity was set.
  101. """
  102. return pin_to_cpus({cpu_id}, tid)
  103. def pin_to_cpus(cpu_ids: Set[int], tid: Optional[int] = None) -> bool:
  104. """Pin thread/process to multiple CPU cores.
  105. Args:
  106. cpu_ids: Set of CPU core numbers.
  107. tid: Thread/process ID. If None, uses current (0).
  108. Returns:
  109. True if affinity was set.
  110. """
  111. if sys.platform != 'linux':
  112. return False
  113. if not cpu_ids:
  114. return False
  115. target_id = tid if tid is not None else 0
  116. try:
  117. os.sched_setaffinity(target_id, cpu_ids)
  118. cpu_str = ','.join(map(str, sorted(cpu_ids)))
  119. logger.debug(f"Thread {target_id} pinned to CPU(s) {cpu_str}")
  120. return True
  121. except Exception as e:
  122. logger.debug(f"CPU affinity not set: {e}")
  123. return False
  124. def setup_realtime_thread(tid: Optional[int] = None, priority: int = 50) -> None:
  125. """Setup for time-critical I/O threads (motion control, LED effects).
  126. Elevates priority and pins to CPU 0.
  127. Args:
  128. tid: Thread native_id. If None, uses current thread.
  129. priority: SCHED_RR priority (1-99). Higher = more important.
  130. Motion should use higher than LED (e.g., 60 vs 40).
  131. """
  132. cpu_count = get_cpu_count()
  133. # Elevate priority (logs internally on success)
  134. elevate_priority(tid, realtime_priority=priority)
  135. # Pin to CPU 0 if multi-core
  136. if cpu_count > 1:
  137. if pin_to_cpu(0, tid):
  138. logger.info(f"Real-time thread pinned to CPU 0 ({cpu_count} CPUs detected)")
  139. def setup_background_worker() -> None:
  140. """Setup for CPU-intensive background workers.
  141. Lowers priority and pins to CPUs 1-N (avoiding CPU 0).
  142. Called at worker process startup.
  143. """
  144. # Lower priority
  145. lower_priority(10)
  146. # Pin to background CPUs (1-N)
  147. worker_cpus = get_background_cpus()
  148. if worker_cpus:
  149. pin_to_cpus(worker_cpus)
  150. cpu_str = ','.join(map(str, sorted(worker_cpus)))
  151. logger.debug(f"Background worker pinned to CPUs {cpu_str}")