serial_manager.py 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import serial
  2. import serial.tools.list_ports
  3. import threading
  4. import time
  5. class SerialManager:
  6. def __init__(self):
  7. self.ser = None
  8. self.ser_port = None
  9. self.serial_lock = threading.RLock()
  10. self.IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
  11. # Device information
  12. self.arduino_table_name = None
  13. self.arduino_driver_type = 'Unknown'
  14. self.firmware_version = 'Unknown'
  15. def list_serial_ports(self):
  16. """Return a list of available serial ports."""
  17. ports = serial.tools.list_ports.comports()
  18. return [port.device for port in ports if port.device not in self.IGNORE_PORTS]
  19. def connect_to_serial(self, port=None, baudrate=115200):
  20. """Automatically connect to the first available serial port or a specified port."""
  21. try:
  22. if port is None:
  23. ports = self.list_serial_ports()
  24. if not ports:
  25. print("No serial port connected")
  26. return False
  27. port = ports[0] # Auto-select the first available port
  28. with self.serial_lock:
  29. if self.ser and self.ser.is_open:
  30. self.ser.close()
  31. self.ser = serial.Serial(port, baudrate, timeout=2)
  32. self.ser_port = port
  33. print(f"Connected to serial port: {port}")
  34. time.sleep(2) # Allow time for the connection to establish
  35. # Read initial startup messages from Arduino
  36. while self.ser.in_waiting > 0:
  37. line = self.ser.readline().decode().strip()
  38. print(f"Arduino: {line}")
  39. # Store the device details based on the expected messages
  40. if "Table:" in line:
  41. self.arduino_table_name = line.replace("Table: ", "").strip()
  42. elif "Drivers:" in line:
  43. self.arduino_driver_type = line.replace("Drivers: ", "").strip()
  44. elif "Version:" in line:
  45. self.firmware_version = line.replace("Version: ", "").strip()
  46. print(f"Detected Table: {self.arduino_table_name or 'Unknown'}")
  47. print(f"Detected Drivers: {self.arduino_driver_type or 'Unknown'}")
  48. return True
  49. except serial.SerialException as e:
  50. print(f"Failed to connect to serial port {port}: {e}")
  51. self.ser_port = None
  52. print("Max retries reached. Could not connect to a serial port.")
  53. return False
  54. def disconnect_serial(self):
  55. """Disconnect the current serial connection."""
  56. if self.ser and self.ser.is_open:
  57. self.ser.close()
  58. self.ser = None
  59. self.ser_port = None
  60. def restart_serial(self, port, baudrate=115200):
  61. """Restart the serial connection."""
  62. self.disconnect_serial()
  63. return self.connect_to_serial(port, baudrate)
  64. def send_coordinate_batch(self, coordinates):
  65. """Send a batch of theta-rho pairs to the Arduino."""
  66. batch_str = ";".join(f"{theta:.5f},{rho:.5f}" for theta, rho in coordinates) + ";\n"
  67. with self.serial_lock:
  68. self.ser.write(batch_str.encode())
  69. def send_command(self, command):
  70. """Send a single command to the Arduino."""
  71. with self.serial_lock:
  72. self.ser.write(f"{command}\n".encode())
  73. print(f"Sent: {command}")
  74. # Wait for "R" acknowledgment from Arduino
  75. while True:
  76. with self.serial_lock:
  77. if self.ser.in_waiting > 0:
  78. response = self.ser.readline().decode().strip()
  79. print(f"Arduino response: {response}")
  80. if response == "R":
  81. print("Command execution completed.")
  82. break
  83. def is_connected(self):
  84. """Check if serial connection is established and open."""
  85. return self.ser is not None and self.ser.is_open
  86. def get_port(self):
  87. """Get the current serial port."""
  88. return self.ser_port
  89. # Create a global instance
  90. serial_manager = SerialManager()