test_playback_controls.py 20 KB


  1. """
  2. Integration tests for playback controls.
  3. These tests verify pause, resume, stop, skip, and speed control functionality
  4. with real hardware connected.
  5. Run with: pytest tests/integration/test_playback_controls.py --run-hardware -v
  6. """
  7. import pytest
  8. import time
  9. import threading
  10. import os
  11. def start_pattern_async(client, file_name="star.thr"):
  12. """Helper to start a pattern in a background thread.
  13. Returns the thread so caller can join() it after stopping.
  14. """
  15. def run():
  16. client.post("/run_theta_rho", json={"file_name": file_name})
  17. thread = threading.Thread(target=run)
  18. thread.start()
  19. return thread
  20. def stop_pattern(client):
  21. """Helper to stop pattern execution.
  22. Uses force_stop which doesn't wait for locks (avoids event loop issues in tests).
  23. """
  24. response = client.post("/force_stop")
  25. return response
  26. @pytest.mark.hardware
  27. @pytest.mark.slow
  28. class TestPauseResume:
  29. """Tests for pause and resume functionality."""
  30. def test_pause_during_pattern(self, hardware_port, run_hardware):
  31. """Test pausing execution mid-pattern.
  32. Verifies:
  33. 1. Pattern starts executing
  34. 2. Pause request is acknowledged
  35. 3. Ball actually stops moving
  36. """
  37. if not run_hardware:
  38. pytest.skip("Hardware tests disabled")
  39. from modules.connection import connection_manager
  40. from modules.core.state import state
  41. from fastapi.testclient import TestClient
  42. from main import app
  43. conn = connection_manager.SerialConnection(hardware_port)
  44. state.conn = conn
  45. try:
  46. client = TestClient(app)
  47. # Start pattern in background
  48. pattern_thread = start_pattern_async(client, "star.thr")
  49. # Wait for pattern to start
  50. time.sleep(3)
  51. assert state.current_playing_file is not None, "Pattern should be running"
  52. print(f"Pattern running: {state.current_playing_file}")
  53. # Record position before pause
  54. pos_before = (state.current_theta, state.current_rho)
  55. # Pause execution
  56. response = client.post("/pause_execution")
  57. assert response.status_code == 200, f"Pause failed: {response.text}"
  58. assert state.pause_requested, "pause_requested should be True"
  59. # Wait and check ball stopped
  60. time.sleep(1)
  61. pos_after = (state.current_theta, state.current_rho)
  62. theta_diff = abs(pos_after[0] - pos_before[0])
  63. rho_diff = abs(pos_after[1] - pos_before[1])
  64. print(f"Position change during pause: theta={theta_diff:.4f}, rho={rho_diff:.4f}")
  65. # Allow small tolerance for deceleration
  66. assert theta_diff < 0.5, f"Theta changed too much while paused: {theta_diff}"
  67. assert rho_diff < 0.1, f"Rho changed too much while paused: {rho_diff}"
  68. # Clean up
  69. stop_pattern(client)
  70. pattern_thread.join(timeout=5)
  71. finally:
  72. conn.close()
  73. state.conn = None
  74. def test_resume_after_pause(self, hardware_port, run_hardware):
  75. """Test resuming execution after pause.
  76. Verifies:
  77. 1. Pattern can be paused
  78. 2. Resume causes movement to continue
  79. 3. Position changes after resume
  80. """
  81. if not run_hardware:
  82. pytest.skip("Hardware tests disabled")
  83. from modules.connection import connection_manager
  84. from modules.core.state import state
  85. from fastapi.testclient import TestClient
  86. from main import app
  87. conn = connection_manager.SerialConnection(hardware_port)
  88. state.conn = conn
  89. try:
  90. client = TestClient(app)
  91. # Start pattern
  92. pattern_thread = start_pattern_async(client, "star.thr")
  93. # Wait for pattern to actually start executing (not just queued)
  94. # Check that position has changed from initial, indicating movement
  95. initial_pos = (state.current_theta, state.current_rho)
  96. max_wait = 10 # seconds
  97. started = False
  98. for _ in range(max_wait * 2): # Check every 0.5s
  99. time.sleep(0.5)
  100. if state.current_playing_file is not None:
  101. current_pos = (state.current_theta, state.current_rho)
  102. # Check if position changed (pattern actually moving)
  103. if (abs(current_pos[0] - initial_pos[0]) > 0.01 or
  104. abs(current_pos[1] - initial_pos[1]) > 0.01):
  105. started = True
  106. print(f"Pattern started moving: theta={current_pos[0]:.3f}, rho={current_pos[1]:.3f}")
  107. break
  108. assert started, "Pattern should start moving within timeout"
  109. # Pause
  110. client.post("/pause_execution")
  111. time.sleep(1) # Wait for pause to take effect
  112. pos_paused = (state.current_theta, state.current_rho)
  113. print(f"Position when paused: theta={pos_paused[0]:.4f}, rho={pos_paused[1]:.4f}")
  114. # Resume
  115. response = client.post("/resume_execution")
  116. assert response.status_code == 200, f"Resume failed: {response.text}"
  117. assert not state.pause_requested, "pause_requested should be False after resume"
  118. # Wait for movement after resume
  119. time.sleep(3)
  120. pos_resumed = (state.current_theta, state.current_rho)
  121. theta_diff = abs(pos_resumed[0] - pos_paused[0])
  122. rho_diff = abs(pos_resumed[1] - pos_paused[1])
  123. print(f"Position after resume: theta={pos_resumed[0]:.4f}, rho={pos_resumed[1]:.4f}")
  124. print(f"Position change after resume: theta={theta_diff:.4f}, rho={rho_diff:.4f}")
  125. assert theta_diff > 0.1 or rho_diff > 0.05, "Position should change after resume"
  126. # Clean up
  127. stop_pattern(client)
  128. pattern_thread.join(timeout=5)
  129. finally:
  130. conn.close()
  131. state.conn = None
  132. @pytest.mark.hardware
  133. @pytest.mark.slow
  134. class TestStop:
  135. """Tests for stop functionality."""
  136. def test_stop_during_pattern(self, hardware_port, run_hardware):
  137. """Test stopping execution mid-pattern.
  138. Verifies:
  139. 1. Stop clears current_playing_file
  140. 2. Pattern execution actually stops
  141. Note: Uses force_stop in test environment because regular stop_execution
  142. has asyncio lock issues with TestClient's event loop handling.
  143. """
  144. if not run_hardware:
  145. pytest.skip("Hardware tests disabled")
  146. from modules.connection import connection_manager
  147. from modules.core.state import state
  148. from fastapi.testclient import TestClient
  149. from main import app
  150. conn = connection_manager.SerialConnection(hardware_port)
  151. state.conn = conn
  152. try:
  153. client = TestClient(app)
  154. # Start pattern
  155. pattern_thread = start_pattern_async(client, "star.thr")
  156. time.sleep(3)
  157. assert state.current_playing_file is not None, "Pattern should be running"
  158. # Stop execution (use force_stop for test reliability)
  159. response = stop_pattern(client)
  160. assert response.status_code == 200, f"Stop failed: {response.text}"
  161. # Verify stopped
  162. time.sleep(0.5)
  163. assert state.current_playing_file is None, "current_playing_file should be None"
  164. print("Stop completed successfully")
  165. pattern_thread.join(timeout=5)
  166. finally:
  167. conn.close()
  168. state.conn = None
  169. def test_force_stop(self, hardware_port, run_hardware):
  170. """Test force stop clears all state."""
  171. if not run_hardware:
  172. pytest.skip("Hardware tests disabled")
  173. from modules.connection import connection_manager
  174. from modules.core.state import state
  175. from fastapi.testclient import TestClient
  176. from main import app
  177. conn = connection_manager.SerialConnection(hardware_port)
  178. state.conn = conn
  179. try:
  180. client = TestClient(app)
  181. # Start pattern
  182. pattern_thread = start_pattern_async(client, "star.thr")
  183. time.sleep(3)
  184. # Force stop via API
  185. response = client.post("/force_stop")
  186. assert response.status_code == 200, f"Force stop failed: {response.text}"
  187. time.sleep(0.5)
  188. # Verify all state cleared
  189. assert state.current_playing_file is None
  190. assert state.current_playlist is None
  191. print("Force stop completed successfully")
  192. pattern_thread.join(timeout=5)
  193. finally:
  194. conn.close()
  195. state.conn = None
  196. def test_pause_then_stop(self, hardware_port, run_hardware):
  197. """Test that stop works while paused."""
  198. if not run_hardware:
  199. pytest.skip("Hardware tests disabled")
  200. from modules.connection import connection_manager
  201. from modules.core.state import state
  202. from fastapi.testclient import TestClient
  203. from main import app
  204. conn = connection_manager.SerialConnection(hardware_port)
  205. state.conn = conn
  206. try:
  207. client = TestClient(app)
  208. # Start pattern
  209. pattern_thread = start_pattern_async(client, "star.thr")
  210. time.sleep(3)
  211. # Pause first
  212. client.post("/pause_execution")
  213. time.sleep(0.5)
  214. assert state.pause_requested, "Should be paused"
  215. # Now stop while paused
  216. response = stop_pattern(client)
  217. assert response.status_code == 200, f"Stop while paused failed: {response.text}"
  218. assert state.current_playing_file is None, "Pattern should be stopped"
  219. print("Stop while paused completed successfully")
  220. pattern_thread.join(timeout=5)
  221. finally:
  222. conn.close()
  223. state.conn = None
  224. @pytest.mark.hardware
  225. @pytest.mark.slow
  226. class TestSpeedControl:
  227. """Tests for speed control functionality."""
  228. def test_set_speed_during_playback(self, hardware_port, run_hardware):
  229. """Test changing speed during pattern execution."""
  230. if not run_hardware:
  231. pytest.skip("Hardware tests disabled")
  232. from modules.connection import connection_manager
  233. from modules.core.state import state
  234. from fastapi.testclient import TestClient
  235. from main import app
  236. conn = connection_manager.SerialConnection(hardware_port)
  237. state.conn = conn
  238. try:
  239. client = TestClient(app)
  240. original_speed = state.speed
  241. # Start pattern
  242. pattern_thread = start_pattern_async(client, "star.thr")
  243. time.sleep(3)
  244. # Change speed via API
  245. new_speed = 150
  246. response = client.post("/set_speed", json={"speed": new_speed})
  247. assert response.status_code == 200, f"Set speed failed: {response.text}"
  248. assert state.speed == new_speed, "Speed should be updated"
  249. print(f"Speed changed from {original_speed} to {new_speed}")
  250. # Let it run at new speed briefly
  251. time.sleep(2)
  252. # Clean up
  253. stop_pattern(client)
  254. pattern_thread.join(timeout=5)
  255. finally:
  256. conn.close()
  257. state.conn = None
  258. def test_speed_bounds(self, hardware_port, run_hardware):
  259. """Test that invalid speed values are rejected."""
  260. if not run_hardware:
  261. pytest.skip("Hardware tests disabled")
  262. from modules.connection import connection_manager
  263. from modules.core.state import state
  264. from fastapi.testclient import TestClient
  265. from main import app
  266. conn = connection_manager.SerialConnection(hardware_port)
  267. state.conn = conn
  268. try:
  269. client = TestClient(app)
  270. original_speed = state.speed
  271. # Valid speeds should work
  272. response = client.post("/set_speed", json={"speed": 50})
  273. assert response.status_code == 200
  274. response = client.post("/set_speed", json={"speed": 200})
  275. assert response.status_code == 200
  276. # Invalid speed (0 or negative) should fail
  277. response = client.post("/set_speed", json={"speed": 0})
  278. assert response.status_code == 400, "Speed 0 should be rejected"
  279. response = client.post("/set_speed", json={"speed": -10})
  280. assert response.status_code == 400, "Negative speed should be rejected"
  281. # Restore
  282. client.post("/set_speed", json={"speed": original_speed})
  283. finally:
  284. conn.close()
  285. state.conn = None
  286. def test_change_speed_while_paused(self, hardware_port, run_hardware):
  287. """Test changing speed while paused, then resuming."""
  288. if not run_hardware:
  289. pytest.skip("Hardware tests disabled")
  290. from modules.connection import connection_manager
  291. from modules.core.state import state
  292. from fastapi.testclient import TestClient
  293. from main import app
  294. conn = connection_manager.SerialConnection(hardware_port)
  295. state.conn = conn
  296. try:
  297. client = TestClient(app)
  298. original_speed = state.speed
  299. # Start pattern
  300. pattern_thread = start_pattern_async(client, "star.thr")
  301. time.sleep(3)
  302. # Pause
  303. client.post("/pause_execution")
  304. time.sleep(0.5)
  305. # Change speed while paused
  306. new_speed = 180
  307. response = client.post("/set_speed", json={"speed": new_speed})
  308. assert response.status_code == 200
  309. print(f"Speed changed to {new_speed} while paused")
  310. # Resume
  311. client.post("/resume_execution")
  312. time.sleep(2)
  313. # Verify speed persisted
  314. assert state.speed == new_speed, "Speed should persist after resume"
  315. # Clean up
  316. stop_pattern(client)
  317. pattern_thread.join(timeout=5)
  318. # Restore original speed
  319. state.speed = original_speed
  320. finally:
  321. conn.close()
  322. state.conn = None
  323. @pytest.mark.hardware
  324. @pytest.mark.slow
  325. class TestSkip:
  326. """Tests for skip pattern functionality."""
  327. def test_skip_pattern_in_playlist(self, hardware_port, run_hardware):
  328. """Test skipping to next pattern in playlist."""
  329. if not run_hardware:
  330. pytest.skip("Hardware tests disabled")
  331. from modules.connection import connection_manager
  332. from modules.core import playlist_manager
  333. from modules.core.state import state
  334. from fastapi.testclient import TestClient
  335. from main import app
  336. conn = connection_manager.SerialConnection(hardware_port)
  337. state.conn = conn
  338. try:
  339. client = TestClient(app)
  340. # Create test playlist with 2 patterns
  341. test_playlist_name = "_test_skip_playlist"
  342. patterns = ["star.thr", "circle_normalized.thr"]
  343. existing_patterns = [p for p in patterns if os.path.exists(f"./patterns/{p}")]
  344. if len(existing_patterns) < 2:
  345. pytest.skip("Need at least 2 patterns for skip test")
  346. playlist_manager.create_playlist(test_playlist_name, existing_patterns)
  347. try:
  348. # Run playlist in background
  349. def run_playlist():
  350. client.post("/run_playlist", json={
  351. "playlist_name": test_playlist_name,
  352. "pause_time": 0,
  353. "run_mode": "single"
  354. })
  355. playlist_thread = threading.Thread(target=run_playlist)
  356. playlist_thread.start()
  357. # Wait for first pattern to start
  358. time.sleep(3)
  359. first_pattern = state.current_playing_file
  360. print(f"First pattern: {first_pattern}")
  361. assert first_pattern is not None
  362. # Skip to next pattern
  363. response = client.post("/skip_pattern")
  364. assert response.status_code == 200, f"Skip failed: {response.text}"
  365. # Wait for skip to process
  366. time.sleep(3)
  367. second_pattern = state.current_playing_file
  368. print(f"After skip: {second_pattern}")
  369. # Pattern should have changed (or playlist ended)
  370. if second_pattern is not None:
  371. assert second_pattern != first_pattern or state.current_playlist_index > 0
  372. # Clean up
  373. stop_pattern(client)
  374. playlist_thread.join(timeout=5)
  375. finally:
  376. playlist_manager.delete_playlist(test_playlist_name)
  377. finally:
  378. conn.close()
  379. state.conn = None
  380. def test_skip_while_paused(self, hardware_port, run_hardware):
  381. """Test that skip works while paused."""
  382. if not run_hardware:
  383. pytest.skip("Hardware tests disabled")
  384. from modules.connection import connection_manager
  385. from modules.core import playlist_manager
  386. from modules.core.state import state
  387. from fastapi.testclient import TestClient
  388. from main import app
  389. conn = connection_manager.SerialConnection(hardware_port)
  390. state.conn = conn
  391. try:
  392. client = TestClient(app)
  393. # Create test playlist
  394. test_playlist_name = "_test_skip_paused"
  395. patterns = ["star.thr", "circle_normalized.thr"]
  396. existing_patterns = [p for p in patterns if os.path.exists(f"./patterns/{p}")]
  397. if len(existing_patterns) < 2:
  398. pytest.skip("Need at least 2 patterns")
  399. playlist_manager.create_playlist(test_playlist_name, existing_patterns)
  400. try:
  401. # Run playlist
  402. def run_playlist():
  403. client.post("/run_playlist", json={
  404. "playlist_name": test_playlist_name,
  405. "run_mode": "single"
  406. })
  407. playlist_thread = threading.Thread(target=run_playlist)
  408. playlist_thread.start()
  409. time.sleep(3)
  410. # Pause
  411. client.post("/pause_execution")
  412. time.sleep(0.5)
  413. assert state.pause_requested
  414. first_pattern = state.current_playing_file
  415. # Skip while paused
  416. response = client.post("/skip_pattern")
  417. assert response.status_code == 200
  418. # Resume to allow skip to process
  419. client.post("/resume_execution")
  420. time.sleep(3)
  421. print(f"Skipped from {first_pattern} while paused")
  422. # Clean up
  423. stop_pattern(client)
  424. playlist_thread.join(timeout=5)
  425. finally:
  426. playlist_manager.delete_playlist(test_playlist_name)
  427. finally:
  428. conn.close()
  429. state.conn = None