test_hardware.py 16 KB


  1. """
  2. Integration tests for hardware communication.
  3. These tests require real hardware to be connected and are skipped by default.
  4. Run with: pytest tests/integration/ --run-hardware
  5. All tests in this file are marked with @pytest.mark.hardware and will
  6. be automatically skipped in CI environments (when CI=true).
  7. Test order matters for some tests - they build on each other:
  8. 1. test_homing_sequence - Homes the table (required first)
  9. 2. test_move_to_perimeter - Moves ball to edge
  10. 3. test_move_to_center - Moves ball to center
  11. 4. test_execute_star_pattern - Runs a full pattern
  12. """
  13. import pytest
  14. import time
  15. import os
  16. import json
  17. import asyncio
  18. @pytest.mark.hardware
  19. class TestSerialConnection:
  20. """Tests for real serial connection to sand table hardware."""
  21. def test_serial_port_opens(self, serial_connection):
  22. """Test that we can open a serial connection to the hardware."""
  23. assert serial_connection.is_open
  24. assert serial_connection.baudrate == 115200
  25. def test_grbl_status_query(self, serial_connection):
  26. """Test querying GRBL status with '?' command.
  27. GRBL responds with a status string like:
  28. <Idle|MPos:0.000,0.000,0.000|Bf:15,128>
  29. <Run|MPos:10.000,5.000,0.000|Bf:15,128>
  30. <Hold|WPos:0.000,0.000,0.000|Bf:15,128>
  31. Note: Table may be in any state (Idle, Run, Hold, Alarm, etc.)
  32. """
  33. # Clear any stale data
  34. serial_connection.reset_input_buffer()
  35. # Send status query
  36. serial_connection.write(b'?')
  37. serial_connection.flush()
  38. # Wait for response
  39. time.sleep(0.1)
  40. response = serial_connection.readline().decode().strip()
  41. # GRBL status starts with '<' and contains position info
  42. # Don't assume Idle - table could be in Run, Hold, Alarm, etc.
  43. assert response.startswith('<'), f"Expected GRBL status starting with '<', got: {response}"
  44. assert 'Pos:' in response, f"Expected position data (MPos or WPos) in: {response}"
  45. assert '>' in response, f"Expected closing '>' in status: {response}"
  46. def test_grbl_settings_query(self, serial_connection):
  47. """Test querying GRBL settings with '$$' command.
  48. GRBL should respond with settings like:
  49. $0=10
  50. $1=25
  51. ...
  52. ok
  53. """
  54. # Clear any stale data
  55. serial_connection.reset_input_buffer()
  56. # Send settings query
  57. serial_connection.write(b'$$\n')
  58. serial_connection.flush()
  59. # Collect all response lines
  60. responses = []
  61. timeout = time.time() + 2 # 2 second timeout
  62. while time.time() < timeout:
  63. if serial_connection.in_waiting:
  64. line = serial_connection.readline().decode().strip()
  65. responses.append(line)
  66. if line == 'ok':
  67. break
  68. time.sleep(0.01)
  69. # Should have received settings
  70. assert len(responses) > 1, "Expected GRBL settings response"
  71. assert responses[-1] == 'ok', f"Expected 'ok' at end, got: {responses[-1]}"
  72. # At least some settings should start with '$'
  73. settings = [r for r in responses if r.startswith('$')]
  74. assert len(settings) > 0, "Expected at least one setting line"
  75. @pytest.mark.hardware
  76. class TestConnectionManager:
  77. """Integration tests for the connection_manager module with real hardware."""
  78. def test_list_serial_ports_finds_hardware(self, available_serial_ports, run_hardware):
  79. """Test that list_serial_ports finds the connected hardware."""
  80. if not run_hardware:
  81. pytest.skip("Hardware tests disabled")
  82. from modules.connection import connection_manager
  83. ports = connection_manager.list_serial_ports()
  84. # Should find at least one port
  85. assert len(ports) > 0, "Expected to find at least one serial port"
  86. # Should match what we found independently
  87. for port in available_serial_ports:
  88. if 'usb' in port.lower() or 'tty' in port.lower():
  89. assert port in ports or any(port in p for p in ports)
  90. def test_serial_connection_class(self, hardware_port, run_hardware):
  91. """Test SerialConnection class with real hardware.
  92. This tests the actual SerialConnection wrapper from connection_manager.
  93. """
  94. if not run_hardware:
  95. pytest.skip("Hardware tests disabled")
  96. from modules.connection.connection_manager import SerialConnection
  97. conn = SerialConnection(hardware_port)
  98. try:
  99. assert conn.is_connected()
  100. # Send status query
  101. conn.send('?')
  102. time.sleep(0.1)
  103. response = conn.readline()
  104. assert '<' in response, f"Expected GRBL status, got: {response}"
  105. finally:
  106. conn.close()
  107. def test_firmware_detection(self, hardware_port, run_hardware):
  108. """Test firmware type detection (FluidNC vs GRBL)."""
  109. if not run_hardware:
  110. pytest.skip("Hardware tests disabled")
  111. from modules.connection.connection_manager import SerialConnection, _detect_firmware
  112. from modules.core.state import state
  113. conn = SerialConnection(hardware_port)
  114. state.conn = conn
  115. try:
  116. firmware_type, version = _detect_firmware()
  117. # Should detect one of the known firmware types
  118. assert firmware_type in ['fluidnc', 'grbl', 'unknown'], \
  119. f"Unexpected firmware type: {firmware_type}"
  120. print(f"Detected firmware: {firmware_type} {version or ''}")
  121. finally:
  122. conn.close()
  123. state.conn = None
  124. @pytest.mark.hardware
  125. @pytest.mark.slow
  126. class TestSoftReset:
  127. """Tests for soft reset functionality."""
  128. def test_soft_reset(self, hardware_port, run_hardware):
  129. """Test soft reset using firmware-appropriate command.
  130. FluidNC uses $Bye, GRBL uses Ctrl+X (0x18).
  131. The test auto-detects firmware type and sends the correct command.
  132. """
  133. if not run_hardware:
  134. pytest.skip("Hardware tests disabled")
  135. from modules.connection.connection_manager import SerialConnection, _detect_firmware
  136. from modules.core.state import state
  137. conn = SerialConnection(hardware_port)
  138. state.conn = conn
  139. try:
  140. # Detect firmware to determine reset command
  141. firmware_type, _ = _detect_firmware()
  142. # Clear buffer
  143. conn.ser.reset_input_buffer()
  144. # Send appropriate reset command
  145. if firmware_type == 'fluidnc':
  146. conn.ser.write(b'$Bye\n')
  147. reset_cmd = '$Bye'
  148. else:
  149. conn.ser.write(b'\x18')
  150. reset_cmd = 'Ctrl+X'
  151. conn.flush()
  152. print(f"Sent {reset_cmd} reset command")
  153. # Wait for reset and startup message
  154. time.sleep(1.5)
  155. # Collect responses
  156. responses = []
  157. timeout = time.time() + 3
  158. while time.time() < timeout:
  159. if conn.ser.in_waiting:
  160. line = conn.ser.readline().decode().strip()
  161. if line:
  162. responses.append(line)
  163. print(f" Response: {line}")
  164. time.sleep(0.01)
  165. # Should see GRBL/FluidNC startup message
  166. all_responses = ' '.join(responses)
  167. assert 'Grbl' in all_responses or 'grbl' in all_responses.lower() or 'FluidNC' in all_responses, \
  168. f"Expected GRBL/FluidNC startup message, got: {responses}"
  169. finally:
  170. conn.close()
  171. state.conn = None
  172. @pytest.mark.hardware
  173. @pytest.mark.slow
  174. class TestTableMovement:
  175. """Tests for table movement operations.
  176. IMPORTANT: These tests physically move the table!
  177. Run in order: homing -> perimeter -> center -> pattern
  178. """
  179. def test_homing_sequence(self, hardware_port, run_hardware):
  180. """Test full homing sequence.
  181. This test:
  182. 1. Connects to hardware
  183. 2. Runs the homing procedure
  184. 3. Verifies position matches the configured homing offset
  185. """
  186. if not run_hardware:
  187. pytest.skip("Hardware tests disabled")
  188. import math
  189. from modules.connection import connection_manager
  190. from modules.core.state import state
  191. # Connect and initialize
  192. conn = connection_manager.SerialConnection(hardware_port)
  193. state.conn = conn
  194. try:
  195. # Run homing (timeout 120 seconds for crash homing)
  196. print("Starting homing sequence...")
  197. success = connection_manager.home(timeout=120)
  198. assert success, "Homing sequence failed"
  199. # After homing, theta should match the configured angular_homing_offset_degrees
  200. # (converted to radians), and rho should be near 0
  201. expected_theta = math.radians(state.angular_homing_offset_degrees)
  202. theta_diff = abs(state.current_theta - expected_theta)
  203. assert theta_diff < 0.1, \
  204. f"Expected theta near {expected_theta:.3f} rad ({state.angular_homing_offset_degrees}°), got: {state.current_theta:.3f}"
  205. assert abs(state.current_rho) < 0.1, \
  206. f"Expected rho near 0 after homing, got: {state.current_rho}"
  207. print(f"Homing complete: theta={state.current_theta:.3f} rad (offset={state.angular_homing_offset_degrees}°), rho={state.current_rho:.3f}")
  208. finally:
  209. conn.close()
  210. state.conn = None
  211. def test_move_to_perimeter(self, hardware_port, run_hardware):
  212. """Test moving ball to perimeter (rho=1.0) via API endpoint.
  213. Uses the /move_to_perimeter endpoint which waits for idle before returning.
  214. """
  215. if not run_hardware:
  216. pytest.skip("Hardware tests disabled")
  217. from httpx import Client
  218. from modules.connection import connection_manager
  219. from modules.core.state import state
  220. # Connect
  221. conn = connection_manager.SerialConnection(hardware_port)
  222. state.conn = conn
  223. try:
  224. # Use the API endpoint which waits for idle
  225. print("Moving to perimeter via API...")
  226. from main import app
  227. from fastapi.testclient import TestClient
  228. client = TestClient(app)
  229. response = client.post("/move_to_perimeter")
  230. assert response.status_code == 200, f"API returned {response.status_code}: {response.text}"
  231. assert response.json()["success"] is True
  232. # Verify we're near the perimeter
  233. assert state.current_rho > 0.9, \
  234. f"Expected rho near 1.0, got: {state.current_rho}"
  235. print(f"At perimeter: theta={state.current_theta:.3f}, rho={state.current_rho:.3f}")
  236. finally:
  237. conn.close()
  238. state.conn = None
  239. def test_move_to_center(self, hardware_port, run_hardware):
  240. """Test moving ball to center (rho=0.0) via API endpoint.
  241. Uses the /move_to_center endpoint which waits for idle before returning.
  242. """
  243. if not run_hardware:
  244. pytest.skip("Hardware tests disabled")
  245. from modules.connection import connection_manager
  246. from modules.core.state import state
  247. # Connect
  248. conn = connection_manager.SerialConnection(hardware_port)
  249. state.conn = conn
  250. try:
  251. # Use the API endpoint which waits for idle
  252. print("Moving to center via API...")
  253. from main import app
  254. from fastapi.testclient import TestClient
  255. client = TestClient(app)
  256. response = client.post("/move_to_center")
  257. assert response.status_code == 200, f"API returned {response.status_code}: {response.text}"
  258. assert response.json()["success"] is True
  259. # Verify we're near the center
  260. assert state.current_rho < 0.1, \
  261. f"Expected rho near 0.0, got: {state.current_rho}"
  262. print(f"At center: theta={state.current_theta:.3f}, rho={state.current_rho:.3f}")
  263. finally:
  264. conn.close()
  265. state.conn = None
  266. def test_execute_star_pattern(self, hardware_port, run_hardware):
  267. """Test executing the star.thr pattern.
  268. This runs a full pattern execution and verifies it completes successfully.
  269. The star pattern is relatively quick and good for testing.
  270. """
  271. if not run_hardware:
  272. pytest.skip("Hardware tests disabled")
  273. from modules.connection import connection_manager
  274. from modules.core import pattern_manager
  275. from modules.core.state import state
  276. # Connect
  277. conn = connection_manager.SerialConnection(hardware_port)
  278. state.conn = conn
  279. try:
  280. pattern_path = './patterns/star.thr'
  281. assert os.path.exists(pattern_path), f"Pattern file not found: {pattern_path}"
  282. print(f"Executing pattern: {pattern_path}")
  283. async def run_pattern():
  284. await pattern_manager.run_theta_rho_file(pattern_path)
  285. asyncio.get_event_loop().run_until_complete(run_pattern())
  286. # Pattern should have completed
  287. assert state.current_playing_file is None, \
  288. "Pattern should have completed (current_playing_file should be None)"
  289. print("Pattern execution completed successfully")
  290. finally:
  291. conn.close()
  292. state.conn = None
  293. @pytest.mark.hardware
  294. class TestWebSocketConnection:
  295. """Tests for WebSocket connection to FluidNC."""
  296. def test_websocket_status_endpoint(self, run_hardware):
  297. """Test the /ws/status WebSocket endpoint.
  298. This tests the FastAPI WebSocket endpoint, not direct FluidNC WebSocket.
  299. """
  300. if not run_hardware:
  301. pytest.skip("Hardware tests disabled")
  302. from fastapi.testclient import TestClient
  303. from main import app
  304. client = TestClient(app)
  305. # Connect to WebSocket
  306. with client.websocket_connect("/ws/status") as websocket:
  307. # Should receive initial status
  308. data = websocket.receive_json()
  309. # Verify status structure
  310. assert "is_running" in data or "current_file" in data, \
  311. f"Unexpected status format: {data}"
  312. print(f"Received WebSocket status: {data}")
  313. @pytest.mark.hardware
  314. class TestStatePersistence:
  315. """Tests for state persistence across connections."""
  316. def test_position_saved_on_disconnect(self, hardware_port, run_hardware, tmp_path):
  317. """Test that position is saved to state.json on disconnect.
  318. This verifies the state persistence mechanism works correctly.
  319. """
  320. if not run_hardware:
  321. pytest.skip("Hardware tests disabled")
  322. from modules.connection import connection_manager
  323. from modules.core.state import state
  324. # Connect
  325. conn = connection_manager.SerialConnection(hardware_port)
  326. state.conn = conn
  327. try:
  328. # Record current position
  329. initial_theta = state.current_theta
  330. initial_rho = state.current_rho
  331. # The state file path
  332. state_file = './state.json'
  333. # Disconnect (this should trigger state save)
  334. conn.close()
  335. state.conn = None
  336. # Give it a moment to save
  337. time.sleep(0.5)
  338. # Verify state was saved
  339. assert os.path.exists(state_file), "state.json should exist"
  340. with open(state_file, 'r') as f:
  341. saved_state = json.load(f)
  342. # Check that position-related fields exist
  343. # The exact field names depend on your state implementation
  344. assert 'current_theta' in saved_state or 'theta' in saved_state or 'machine_x' in saved_state, \
  345. f"Expected position data in state.json, got keys: {list(saved_state.keys())}"
  346. print(f"State saved successfully. Position before disconnect: theta={initial_theta}, rho={initial_rho}")
  347. finally:
  348. if state.conn:
  349. state.conn.close()
  350. state.conn = None