test_connection_manager.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. """
  2. Unit tests for connection_manager parsing functions.
  3. Tests the pure functions that parse GRBL responses:
  4. - Machine position parsing (MPos and WPos formats)
  5. - Serial port listing/filtering
  6. """
  7. import pytest
  8. from unittest.mock import patch, MagicMock
  9. class TestParseMachinePosition:
  10. """Tests for parse_machine_position function."""
  11. def test_parse_machine_position_mpos_format(self):
  12. """Test parsing MPos format from GRBL status response."""
  13. from modules.connection.connection_manager import parse_machine_position
  14. response = "<Idle|MPos:100.500,-50.250,0.000|Bf:15,128>"
  15. result = parse_machine_position(response)
  16. assert result is not None
  17. assert result == (100.5, -50.25)
  18. def test_parse_machine_position_wpos_format(self):
  19. """Test parsing WPos format from GRBL status response."""
  20. from modules.connection.connection_manager import parse_machine_position
  21. response = "<Idle|WPos:0.000,19.000,0.000|Bf:15,128>"
  22. result = parse_machine_position(response)
  23. assert result is not None
  24. assert result == (0.0, 19.0)
  25. def test_parse_machine_position_prefers_mpos(self):
  26. """Test that MPos is preferred when both are present (rare but possible)."""
  27. from modules.connection.connection_manager import parse_machine_position
  28. # This response has both MPos and WPos - MPos should be used first
  29. response = "<Idle|MPos:10.0,20.0,0.0|WPos:5.0,10.0,0.0|Bf:15,128>"
  30. result = parse_machine_position(response)
  31. assert result is not None
  32. assert result == (10.0, 20.0)
  33. def test_parse_machine_position_invalid(self):
  34. """Test parsing returns None for invalid response."""
  35. from modules.connection.connection_manager import parse_machine_position
  36. # No position info
  37. result = parse_machine_position("ok")
  38. assert result is None
  39. # Empty string
  40. result = parse_machine_position("")
  41. assert result is None
  42. # Malformed response
  43. result = parse_machine_position("<Idle|Bf:15,128>")
  44. assert result is None
  45. def test_parse_machine_position_run_state(self):
  46. """Test parsing position during run state."""
  47. from modules.connection.connection_manager import parse_machine_position
  48. response = "<Run|MPos:-994.869,-321.861,0.000|Bf:15,127>"
  49. result = parse_machine_position(response)
  50. assert result is not None
  51. assert result[0] == pytest.approx(-994.869)
  52. assert result[1] == pytest.approx(-321.861)
  53. def test_parse_machine_position_alarm_state(self):
  54. """Test parsing position during alarm state."""
  55. from modules.connection.connection_manager import parse_machine_position
  56. response = "<Alarm|MPos:0.000,0.000,0.000|Bf:15,128|Pn:XY>"
  57. result = parse_machine_position(response)
  58. assert result is not None
  59. assert result == (0.0, 0.0)
  60. def test_parse_machine_position_with_extra_info(self):
  61. """Test parsing position with extra fields in response."""
  62. from modules.connection.connection_manager import parse_machine_position
  63. # Response with WCO (Work Coordinate Offset)
  64. response = "<Idle|MPos:5.0,10.0,0.0|FS:0,0|WCO:0,0,0>"
  65. result = parse_machine_position(response)
  66. assert result is not None
  67. assert result == (5.0, 10.0)
  68. def test_parse_machine_position_negative_coords(self):
  69. """Test parsing negative coordinates."""
  70. from modules.connection.connection_manager import parse_machine_position
  71. response = "<Idle|MPos:-100.123,-200.456,0.000|Bf:15,128>"
  72. result = parse_machine_position(response)
  73. assert result is not None
  74. assert result[0] == pytest.approx(-100.123)
  75. assert result[1] == pytest.approx(-200.456)
  76. def test_parse_machine_position_high_precision(self):
  77. """Test parsing high precision coordinates."""
  78. from modules.connection.connection_manager import parse_machine_position
  79. response = "<Idle|MPos:123.456789,987.654321,0.000000|Bf:15,128>"
  80. result = parse_machine_position(response)
  81. assert result is not None
  82. assert result[0] == pytest.approx(123.456789)
  83. assert result[1] == pytest.approx(987.654321)
  84. class TestListSerialPorts:
  85. """Tests for list_serial_ports function."""
  86. def test_list_serial_ports_filters_ignored(self):
  87. """Test that ignored ports are filtered out."""
  88. # Create mock port objects
  89. mock_port1 = MagicMock()
  90. mock_port1.device = "/dev/ttyUSB0"
  91. mock_port2 = MagicMock()
  92. mock_port2.device = "/dev/cu.debug-console" # Should be filtered
  93. mock_port3 = MagicMock()
  94. mock_port3.device = "/dev/cu.Bluetooth-Incoming-Port" # Should be filtered
  95. mock_port4 = MagicMock()
  96. mock_port4.device = "/dev/ttyACM0"
  97. with patch("serial.tools.list_ports.comports", return_value=[mock_port1, mock_port2, mock_port3, mock_port4]):
  98. from modules.connection.connection_manager import list_serial_ports
  99. ports = list_serial_ports()
  100. assert "/dev/ttyUSB0" in ports
  101. assert "/dev/ttyACM0" in ports
  102. assert "/dev/cu.debug-console" not in ports
  103. assert "/dev/cu.Bluetooth-Incoming-Port" not in ports
  104. assert len(ports) == 2
  105. def test_list_serial_ports_empty(self):
  106. """Test list_serial_ports returns empty when no ports available."""
  107. with patch("serial.tools.list_ports.comports", return_value=[]):
  108. from modules.connection.connection_manager import list_serial_ports
  109. ports = list_serial_ports()
  110. assert ports == []
  111. def test_list_serial_ports_all_ignored(self):
  112. """Test list_serial_ports when all ports are ignored."""
  113. mock_port1 = MagicMock()
  114. mock_port1.device = "/dev/cu.debug-console"
  115. mock_port2 = MagicMock()
  116. mock_port2.device = "/dev/cu.Bluetooth-Incoming-Port"
  117. with patch("serial.tools.list_ports.comports", return_value=[mock_port1, mock_port2]):
  118. from modules.connection.connection_manager import list_serial_ports
  119. ports = list_serial_ports()
  120. assert ports == []
  121. class TestConnectionClasses:
  122. """Tests for connection class structure (no hardware required)."""
  123. def test_base_connection_interface(self):
  124. """Test that BaseConnection defines required interface."""
  125. from modules.connection.connection_manager import BaseConnection
  126. # BaseConnection should have these abstract methods
  127. base = BaseConnection()
  128. with pytest.raises(NotImplementedError):
  129. base.send("test")
  130. with pytest.raises(NotImplementedError):
  131. base.flush()
  132. with pytest.raises(NotImplementedError):
  133. base.readline()
  134. with pytest.raises(NotImplementedError):
  135. base.in_waiting()
  136. with pytest.raises(NotImplementedError):
  137. base.is_connected()
  138. with pytest.raises(NotImplementedError):
  139. base.close()
  140. def test_serial_connection_inherits_base(self):
  141. """Test SerialConnection inherits from BaseConnection."""
  142. from modules.connection.connection_manager import SerialConnection, BaseConnection
  143. assert issubclass(SerialConnection, BaseConnection)
  144. def test_websocket_connection_inherits_base(self):
  145. """Test WebSocketConnection inherits from BaseConnection."""
  146. from modules.connection.connection_manager import WebSocketConnection, BaseConnection
  147. assert issubclass(WebSocketConnection, BaseConnection)
  148. class TestIgnorePorts:
  149. """Tests for IGNORE_PORTS and DEPRIORITIZED_PORTS constants."""
  150. def test_ignore_ports_defined(self):
  151. """Test that IGNORE_PORTS constant is defined."""
  152. from modules.connection.connection_manager import IGNORE_PORTS
  153. assert isinstance(IGNORE_PORTS, list)
  154. assert "/dev/cu.debug-console" in IGNORE_PORTS
  155. assert "/dev/cu.Bluetooth-Incoming-Port" in IGNORE_PORTS
  156. def test_deprioritized_ports_defined(self):
  157. """Test that DEPRIORITIZED_PORTS constant is defined."""
  158. from modules.connection.connection_manager import DEPRIORITIZED_PORTS
  159. assert isinstance(DEPRIORITIZED_PORTS, list)
  160. # ttyS0 is typically the Pi hardware UART - should be deprioritized
  161. assert "/dev/ttyS0" in DEPRIORITIZED_PORTS
  162. class TestIsMachineIdle:
  163. """Tests for is_machine_idle function."""
  164. def test_is_machine_idle_no_connection(self, mock_state):
  165. """Test is_machine_idle returns False when no connection."""
  166. mock_state.conn = None
  167. with patch("modules.connection.connection_manager.state", mock_state):
  168. from modules.connection.connection_manager import is_machine_idle
  169. result = is_machine_idle()
  170. assert result is False
  171. def test_is_machine_idle_disconnected(self, mock_state):
  172. """Test is_machine_idle returns False when disconnected."""
  173. mock_state.conn.is_connected.return_value = False
  174. with patch("modules.connection.connection_manager.state", mock_state):
  175. from modules.connection.connection_manager import is_machine_idle
  176. result = is_machine_idle()
  177. assert result is False
  178. def test_is_machine_idle_when_idle(self, mock_state):
  179. """Test is_machine_idle returns True when machine is idle."""
  180. mock_state.conn.is_connected.return_value = True
  181. mock_state.conn.send = MagicMock()
  182. mock_state.conn.readline.return_value = "<Idle|MPos:0,0,0|Bf:15,128>"
  183. with patch("modules.connection.connection_manager.state", mock_state):
  184. from modules.connection.connection_manager import is_machine_idle
  185. result = is_machine_idle()
  186. assert result is True
  187. mock_state.conn.send.assert_called_with('?')
  188. def test_is_machine_idle_when_running(self, mock_state):
  189. """Test is_machine_idle returns False when machine is running."""
  190. mock_state.conn.is_connected.return_value = True
  191. mock_state.conn.send = MagicMock()
  192. mock_state.conn.readline.return_value = "<Run|MPos:0,0,0|Bf:15,128>"
  193. with patch("modules.connection.connection_manager.state", mock_state):
  194. from modules.connection.connection_manager import is_machine_idle
  195. result = is_machine_idle()
  196. assert result is False