Przeglądaj źródła

test(integration): expand hardware tests with movement and pattern execution

Fixes:
- Status query: Don't assume Idle state, accept any valid GRBL status
- Soft reset: Auto-detect firmware (FluidNC=$Bye, GRBL=Ctrl+X)

New tests:
- test_firmware_detection: Detect FluidNC vs GRBL firmware
- test_homing_sequence: Full homing cycle with position verification
- test_move_to_perimeter: Move ball to rho=1.0
- test_move_to_center: Move ball to rho=0.0
- test_execute_star_pattern: Run star.thr pattern end-to-end
- test_websocket_status_endpoint: Test /ws/status WebSocket
- test_position_saved_on_disconnect: Verify state persistence

Total: 13 integration tests (all skip without --run-hardware)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
tuanchris 1 tydzień temu
rodzic
commit
b609da10e9
1 zmienionych plików z 343 dodań i 31 usunięć
  1. 343 31
      tests/integration/test_hardware.py

+ 343 - 31
tests/integration/test_hardware.py

@@ -6,9 +6,18 @@ Run with: pytest tests/integration/ --run-hardware
 
 All tests in this file are marked with @pytest.mark.hardware and will
 be automatically skipped in CI environments (when CI=true).
+
+Test order matters for some tests - they build on each other:
+1. test_homing_sequence - Homes the table (required first)
+2. test_move_to_perimeter - Moves ball to edge
+3. test_move_to_center - Moves ball to center
+4. test_execute_star_pattern - Runs a full pattern
 """
 import pytest
 import time
+import os
+import json
+import asyncio
 
 
 @pytest.mark.hardware
@@ -23,10 +32,12 @@ class TestSerialConnection:
     def test_grbl_status_query(self, serial_connection):
         """Test querying GRBL status with '?' command.
 
-        GRBL should respond with a status string like:
+        GRBL responds with a status string like:
         <Idle|MPos:0.000,0.000,0.000|Bf:15,128>
-        or
-        <Idle|WPos:0.000,0.000,0.000|Bf:15,128>
+        <Run|MPos:10.000,5.000,0.000|Bf:15,128>
+        <Hold|WPos:0.000,0.000,0.000|Bf:15,128>
+
+        Note: Table may be in any state (Idle, Run, Hold, Alarm, etc.)
         """
         # Clear any stale data
         serial_connection.reset_input_buffer()
@@ -40,8 +51,10 @@ class TestSerialConnection:
         response = serial_connection.readline().decode().strip()
 
         # GRBL status starts with '<' and contains position info
-        assert response.startswith('<'), f"Expected GRBL status, got: {response}"
-        assert 'Pos:' in response, f"Expected position data in: {response}"
+        # Don't assume Idle - table could be in Run, Hold, Alarm, etc.
+        assert response.startswith('<'), f"Expected GRBL status starting with '<', got: {response}"
+        assert 'Pos:' in response, f"Expected position data (MPos or WPos) in: {response}"
+        assert '>' in response, f"Expected closing '>' in status: {response}"
 
     def test_grbl_settings_query(self, serial_connection):
         """Test querying GRBL settings with '$$' command.
@@ -124,41 +137,340 @@ class TestConnectionManager:
         finally:
             conn.close()
 
+    def test_firmware_detection(self, hardware_port, run_hardware):
+        """Test firmware type detection (FluidNC vs GRBL)."""
+        if not run_hardware:
+            pytest.skip("Hardware tests disabled")
+
+        from modules.connection.connection_manager import SerialConnection, _detect_firmware
+        from modules.core.state import state
+
+        conn = SerialConnection(hardware_port)
+        state.conn = conn
+        try:
+            firmware_type, version = _detect_firmware()
+
+            # Should detect one of the known firmware types
+            assert firmware_type in ['fluidnc', 'grbl', 'unknown'], \
+                f"Unexpected firmware type: {firmware_type}"
+
+            print(f"Detected firmware: {firmware_type} {version or ''}")
+        finally:
+            conn.close()
+            state.conn = None
+
+
+@pytest.mark.hardware
+@pytest.mark.slow
+class TestSoftReset:
+    """Tests for soft reset functionality."""
+
+    def test_soft_reset(self, hardware_port, run_hardware):
+        """Test soft reset using firmware-appropriate command.
+
+        FluidNC uses $Bye, GRBL uses Ctrl+X (0x18).
+        The test auto-detects firmware type and sends the correct command.
+        """
+        if not run_hardware:
+            pytest.skip("Hardware tests disabled")
+
+        from modules.connection.connection_manager import SerialConnection, _detect_firmware
+        from modules.core.state import state
+
+        conn = SerialConnection(hardware_port)
+        state.conn = conn
+        try:
+            # Detect firmware to determine reset command
+            firmware_type, _ = _detect_firmware()
+
+            # Clear buffer
+            conn.ser.reset_input_buffer()
+
+            # Send appropriate reset command
+            if firmware_type == 'fluidnc':
+                conn.ser.write(b'$Bye\n')
+                reset_cmd = '$Bye'
+            else:
+                conn.ser.write(b'\x18')
+                reset_cmd = 'Ctrl+X'
+
+            conn.flush()
+            print(f"Sent {reset_cmd} reset command")
+
+            # Wait for reset and startup message
+            time.sleep(1.5)
+
+            # Collect responses
+            responses = []
+            timeout = time.time() + 3
+
+            while time.time() < timeout:
+                if conn.ser.in_waiting:
+                    line = conn.ser.readline().decode().strip()
+                    if line:
+                        responses.append(line)
+                        print(f"  Response: {line}")
+                time.sleep(0.01)
+
+            # Should see GRBL/FluidNC startup message
+            all_responses = ' '.join(responses)
+            assert 'Grbl' in all_responses or 'grbl' in all_responses.lower() or 'FluidNC' in all_responses, \
+                f"Expected GRBL/FluidNC startup message, got: {responses}"
+
+        finally:
+            conn.close()
+            state.conn = None
+
 
 @pytest.mark.hardware
 @pytest.mark.slow
-class TestHardwareOperations:
-    """Slow integration tests that perform actual hardware operations.
+class TestTableMovement:
+    """Tests for table movement operations.
 
-    These tests take longer and may move the hardware.
-    Use with caution!
+    IMPORTANT: These tests physically move the table!
+    Run in order: homing -> perimeter -> center -> pattern
     """
 
-    def test_soft_reset(self, serial_connection):
-        """Test GRBL soft reset (Ctrl+X).
+    def test_homing_sequence(self, hardware_port, run_hardware):
+        """Test full homing sequence.
 
-        This sends a soft reset command and verifies GRBL responds
-        with its startup message.
+        This test:
+        1. Connects to hardware
+        2. Runs the homing procedure
+        3. Verifies position is reset to origin
         """
-        # Send soft reset (Ctrl+X = 0x18)
-        serial_connection.write(b'\x18')
-        serial_connection.flush()
+        if not run_hardware:
+            pytest.skip("Hardware tests disabled")
 
-        # Wait for reset and startup message
-        time.sleep(1)
+        from modules.connection import connection_manager
+        from modules.core.state import state
 
-        # Collect responses
-        responses = []
-        timeout = time.time() + 3
+        # Connect and initialize
+        conn = connection_manager.SerialConnection(hardware_port)
+        state.conn = conn
+
+        try:
+            # Run homing (timeout 120 seconds for crash homing)
+            print("Starting homing sequence...")
+            success = connection_manager.home(timeout=120)
+
+            assert success, "Homing sequence failed"
+
+            # After homing, position should be at or near origin
+            # Allow small tolerance for mechanical variance
+            assert abs(state.current_theta) < 0.1, \
+                f"Expected theta near 0 after homing, got: {state.current_theta}"
+            assert abs(state.current_rho) < 0.1, \
+                f"Expected rho near 0 after homing, got: {state.current_rho}"
+
+            print(f"Homing complete: theta={state.current_theta}, rho={state.current_rho}")
+
+        finally:
+            conn.close()
+            state.conn = None
+
+    def test_move_to_perimeter(self, hardware_port, run_hardware):
+        """Test moving ball to perimeter (rho=1.0).
+
+        Moves the ball from current position to the outer edge of the table.
+        """
+        if not run_hardware:
+            pytest.skip("Hardware tests disabled")
+
+        from modules.connection import connection_manager
+        from modules.core import pattern_manager
+        from modules.core.state import state
+
+        # Connect
+        conn = connection_manager.SerialConnection(hardware_port)
+        state.conn = conn
+
+        try:
+            # Start motion controller
+            if not pattern_manager.motion_controller.running:
+                pattern_manager.motion_controller.start()
+
+            # Move to perimeter (theta=0, rho=1.0)
+            print("Moving to perimeter (rho=1.0)...")
+
+            async def do_move():
+                await pattern_manager.move_polar(theta=0, rho=1.0, speed=200)
+
+            asyncio.get_event_loop().run_until_complete(do_move())
+
+            # Wait for movement to complete
+            time.sleep(2)
+
+            # Verify we're near the perimeter
+            assert state.current_rho > 0.9, \
+                f"Expected rho near 1.0, got: {state.current_rho}"
+
+            print(f"At perimeter: theta={state.current_theta}, rho={state.current_rho}")
+
+        finally:
+            pattern_manager.motion_controller.stop()
+            conn.close()
+            state.conn = None
+
+    def test_move_to_center(self, hardware_port, run_hardware):
+        """Test moving ball to center (rho=0.0).
+
+        Moves the ball from current position to the center of the table.
+        """
+        if not run_hardware:
+            pytest.skip("Hardware tests disabled")
+
+        from modules.connection import connection_manager
+        from modules.core import pattern_manager
+        from modules.core.state import state
+
+        # Connect
+        conn = connection_manager.SerialConnection(hardware_port)
+        state.conn = conn
+
+        try:
+            # Start motion controller
+            if not pattern_manager.motion_controller.running:
+                pattern_manager.motion_controller.start()
+
+            # Move to center (theta=0, rho=0.0)
+            print("Moving to center (rho=0.0)...")
+
+            async def do_move():
+                await pattern_manager.move_polar(theta=0, rho=0.0, speed=200)
+
+            asyncio.get_event_loop().run_until_complete(do_move())
+
+            # Wait for movement to complete
+            time.sleep(2)
+
+            # Verify we're near the center
+            assert state.current_rho < 0.1, \
+                f"Expected rho near 0.0, got: {state.current_rho}"
+
+            print(f"At center: theta={state.current_theta}, rho={state.current_rho}")
+
+        finally:
+            pattern_manager.motion_controller.stop()
+            conn.close()
+            state.conn = None
+
+    def test_execute_star_pattern(self, hardware_port, run_hardware):
+        """Test executing the star.thr pattern.
+
+        This runs a full pattern execution and verifies it completes successfully.
+        The star pattern is relatively quick and good for testing.
+        """
+        if not run_hardware:
+            pytest.skip("Hardware tests disabled")
+
+        from modules.connection import connection_manager
+        from modules.core import pattern_manager
+        from modules.core.state import state
+
+        # Connect
+        conn = connection_manager.SerialConnection(hardware_port)
+        state.conn = conn
+
+        try:
+            pattern_path = './patterns/star.thr'
+            assert os.path.exists(pattern_path), f"Pattern file not found: {pattern_path}"
+
+            print(f"Executing pattern: {pattern_path}")
+
+            async def run_pattern():
+                await pattern_manager.run_theta_rho_file(pattern_path)
+
+            asyncio.get_event_loop().run_until_complete(run_pattern())
+
+            # Pattern should have completed
+            assert state.current_playing_file is None, \
+                "Pattern should have completed (current_playing_file should be None)"
+
+            print("Pattern execution completed successfully")
+
+        finally:
+            conn.close()
+            state.conn = None
 
-        while time.time() < timeout:
-            if serial_connection.in_waiting:
-                line = serial_connection.readline().decode().strip()
-                if line:
-                    responses.append(line)
-            time.sleep(0.01)
 
-        # GRBL should output startup message containing "Grbl"
-        all_responses = ' '.join(responses)
-        assert 'Grbl' in all_responses or 'grbl' in all_responses.lower(), \
-            f"Expected GRBL startup message, got: {responses}"
+@pytest.mark.hardware
+class TestWebSocketConnection:
+    """Tests for WebSocket connection to FluidNC."""
+
+    def test_websocket_status_endpoint(self, run_hardware):
+        """Test the /ws/status WebSocket endpoint.
+
+        This tests the FastAPI WebSocket endpoint, not direct FluidNC WebSocket.
+        """
+        if not run_hardware:
+            pytest.skip("Hardware tests disabled")
+
+        from fastapi.testclient import TestClient
+        from main import app
+
+        client = TestClient(app)
+
+        # Connect to WebSocket
+        with client.websocket_connect("/ws/status") as websocket:
+            # Should receive initial status
+            data = websocket.receive_json(timeout=5)
+
+            # Verify status structure
+            assert "is_running" in data or "current_file" in data, \
+                f"Unexpected status format: {data}"
+
+            print(f"Received WebSocket status: {data}")
+
+
+@pytest.mark.hardware
+class TestStatePersistence:
+    """Tests for state persistence across connections."""
+
+    def test_position_saved_on_disconnect(self, hardware_port, run_hardware, tmp_path):
+        """Test that position is saved to state.json on disconnect.
+
+        This verifies the state persistence mechanism works correctly.
+        """
+        if not run_hardware:
+            pytest.skip("Hardware tests disabled")
+
+        from modules.connection import connection_manager
+        from modules.core.state import state
+
+        # Connect
+        conn = connection_manager.SerialConnection(hardware_port)
+        state.conn = conn
+
+        try:
+            # Record current position
+            initial_theta = state.current_theta
+            initial_rho = state.current_rho
+
+            # The state file path
+            state_file = './state.json'
+
+            # Disconnect (this should trigger state save)
+            conn.close()
+            state.conn = None
+
+            # Give it a moment to save
+            time.sleep(0.5)
+
+            # Verify state was saved
+            assert os.path.exists(state_file), "state.json should exist"
+
+            with open(state_file, 'r') as f:
+                saved_state = json.load(f)
+
+            # Check that position-related fields exist
+            # The exact field names depend on your state implementation
+            assert 'current_theta' in saved_state or 'theta' in saved_state or 'machine_x' in saved_state, \
+                f"Expected position data in state.json, got keys: {list(saved_state.keys())}"
+
+            print(f"State saved successfully. Position before disconnect: theta={initial_theta}, rho={initial_rho}")
+
+        finally:
+            if state.conn:
+                state.conn.close()
+                state.conn = None