serial_manager.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. import serial
  2. import serial.tools.list_ports
  3. import threading
  4. import time
  5. # Configuration
  6. IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
  7. # Global variables
  8. ser = None
  9. ser_port = None
  10. arduino_table_name = None
  11. arduino_driver_type = 'Unknown'
  12. firmware_version = 'Unknown'
  13. serial_lock = threading.Lock()
  14. def list_serial_ports():
  15. """Return a list of available serial ports."""
  16. ports = serial.tools.list_ports.comports()
  17. return [port.device for port in ports if port.device not in IGNORE_PORTS]
  18. def connect_to_serial(port=None, baudrate=115200):
  19. """Automatically connect to the first available serial port or a specified port."""
  20. global ser, ser_port, arduino_table_name, arduino_driver_type, firmware_version
  21. try:
  22. if port is None:
  23. ports = list_serial_ports()
  24. if not ports:
  25. print("No serial port connected")
  26. return False
  27. port = ports[0] # Auto-select the first available port
  28. with serial_lock:
  29. if ser and ser.is_open:
  30. ser.close()
  31. ser = serial.Serial(port, baudrate, timeout=2) # Set timeout to avoid infinite waits
  32. ser_port = port # Store the connected port globally
  33. print(f"Connected to serial port: {port}")
  34. time.sleep(2) # Allow time for the connection to establish
  35. # Read initial startup messages from Arduino
  36. arduino_table_name = None
  37. arduino_driver_type = None
  38. while ser.in_waiting > 0:
  39. line = ser.readline().decode().strip()
  40. print(f"Arduino: {line}") # Print the received message
  41. # Store the device details based on the expected messages
  42. if "Table:" in line:
  43. arduino_table_name = line.replace("Table: ", "").strip()
  44. elif "Drivers:" in line:
  45. arduino_driver_type = line.replace("Drivers: ", "").strip()
  46. elif "Version:" in line:
  47. firmware_version = line.replace("Version: ", "").strip()
  48. # Display stored values
  49. print(f"Detected Table: {arduino_table_name or 'Unknown'}")
  50. print(f"Detected Drivers: {arduino_driver_type or 'Unknown'}")
  51. return True # Successfully connected
  52. except serial.SerialException as e:
  53. print(f"Failed to connect to serial port {port}: {e}")
  54. port = None # Reset the port to try the next available one
  55. print("Max retries reached. Could not connect to a serial port.")
  56. return False
  57. def disconnect_serial():
  58. """Disconnect the current serial connection."""
  59. global ser, ser_port
  60. if ser and ser.is_open:
  61. ser.close()
  62. ser = None
  63. ser_port = None # Reset the port name
  64. def restart_serial(port, baudrate=115200):
  65. """Restart the serial connection."""
  66. disconnect_serial()
  67. return connect_to_serial(port, baudrate)
  68. def send_command(command):
  69. """Send a single command to the Arduino."""
  70. ser.write(f"{command}\n".encode())
  71. print(f"Sent: {command}")
  72. # Wait for "R" acknowledgment from Arduino
  73. while True:
  74. with serial_lock:
  75. if ser.in_waiting > 0:
  76. response = ser.readline().decode().strip()
  77. print(f"Arduino response: {response}")
  78. if response == "R":
  79. print("Command execution completed.")
  80. break
  81. def send_coordinate_batch(coordinates):
  82. """Send a batch of theta-rho pairs to the Arduino."""
  83. batch_str = ";".join(f"{theta:.5f},{rho:.5f}" for theta, rho in coordinates) + ";\n"
  84. ser.write(batch_str.encode())
  85. def get_serial_status():
  86. """Get the current status of the serial connection."""
  87. return {
  88. 'connected': ser.is_open if ser else False,
  89. 'port': ser_port
  90. }
  91. def get_device_info():
  92. """Get information about the connected device."""
  93. return {
  94. 'table_name': arduino_table_name,
  95. 'driver_type': arduino_driver_type,
  96. 'firmware_version': firmware_version
  97. }
  98. def reset_theta():
  99. """Reset theta on the Arduino."""
  100. ser.write("RESET_THETA\n".encode())
  101. while True:
  102. with serial_lock:
  103. if ser.in_waiting > 0:
  104. response = ser.readline().decode().strip()
  105. print(f"Arduino response: {response}")
  106. if response == "THETA_RESET":
  107. print("Theta successfully reset.")
  108. break
  109. time.sleep(0.5) # Small delay to avoid busy waiting