# serial_manager.py (4.2 KB)
  1. import serial
  2. import serial.tools.list_ports
  3. import threading
  4. import time
  5. import logging
  6. # Configure logging
  7. logger = logging.getLogger(__name__)
  8. # Global state
  9. ser = None
  10. ser_port = None
  11. serial_lock = threading.RLock()
  12. IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
  13. # Device information
  14. arduino_table_name = None
  15. arduino_driver_type = 'Unknown'
  16. firmware_version = 'Unknown'
  17. def list_serial_ports():
  18. """Return a list of available serial ports."""
  19. ports = serial.tools.list_ports.comports()
  20. available_ports = [port.device for port in ports if port.device not in IGNORE_PORTS]
  21. logger.debug(f"Available serial ports: {available_ports}")
  22. return available_ports
  23. def connect_to_serial(port=None, baudrate=115200):
  24. """Automatically connect to the first available serial port or a specified port."""
  25. global ser, ser_port, arduino_table_name, arduino_driver_type, firmware_version
  26. try:
  27. if port is None:
  28. ports = list_serial_ports()
  29. if not ports:
  30. logger.warning("No serial port connected")
  31. return False
  32. port = ports[0] # Auto-select the first available port
  33. with serial_lock:
  34. if ser and ser.is_open:
  35. ser.close()
  36. ser = serial.Serial(port, baudrate, timeout=2)
  37. ser_port = port
  38. logger.info(f"Connected to serial port: {port}")
  39. time.sleep(2) # Allow time for the connection to establish
  40. # Read initial startup messages from Arduino
  41. while ser.in_waiting > 0:
  42. line = ser.readline().decode().strip()
  43. logger.debug(f"Arduino: {line}")
  44. # Store the device details based on the expected messages
  45. if "Table:" in line:
  46. arduino_table_name = line.replace("Table: ", "").strip()
  47. elif "Drivers:" in line:
  48. arduino_driver_type = line.replace("Drivers: ", "").strip()
  49. elif "Version:" in line:
  50. firmware_version = line.replace("Version: ", "").strip()
  51. logger.info(f"Detected Table: {arduino_table_name or 'Unknown'}")
  52. logger.info(f"Detected Drivers: {arduino_driver_type or 'Unknown'}")
  53. return True
  54. except serial.SerialException as e:
  55. logger.error(f"Failed to connect to serial port {port}: {e}")
  56. ser_port = None
  57. logger.error("Max retries reached. Could not connect to a serial port.")
  58. return False
  59. def disconnect_serial():
  60. """Disconnect the current serial connection."""
  61. global ser, ser_port
  62. if ser and ser.is_open:
  63. logger.info("Disconnecting serial connection")
  64. ser.close()
  65. ser = None
  66. ser_port = None
  67. def restart_serial(port, baudrate=115200):
  68. """Restart the serial connection."""
  69. logger.info(f"Restarting serial connection on port {port}")
  70. disconnect_serial()
  71. return connect_to_serial(port, baudrate)
  72. def send_coordinate_batch(coordinates):
  73. """Send a batch of theta-rho pairs to the Arduino."""
  74. batch_str = ";".join(f"{theta:.5f},{rho:.5f}" for theta, rho in coordinates) + ";\n"
  75. with serial_lock:
  76. ser.write(batch_str.encode())
  77. logger.debug(f"Sent coordinate batch: {batch_str.strip()}")
  78. def send_command(command, ack=None):
  79. """Send a single command to the Arduino."""
  80. timeout = 10 # Timeout in seconds
  81. start_time = time.time()
  82. with serial_lock:
  83. ser.write(f"{command}\n".encode())
  84. logger.debug(f"Sent command: {command}")
  85. # Wait for "R" acknowledgment from Arduino
  86. while True:
  87. if time.time() - start_time > timeout:
  88. logger.error(f"Timeout: No acknowledgment received within {timeout} seconds")
  89. break # Exit loop after timeout
  90. with serial_lock:
  91. if ser.in_waiting > 0:
  92. response = ser.readline().decode().strip()
  93. logger.debug(f"Arduino response: {response}")
  94. if response == ack:
  95. logger.debug("Command execution completed")
  96. break
  97. def is_connected():
  98. """Check if serial connection is established and open."""
  99. return ser is not None and ser.is_open
  100. def get_port():
  101. """Get the current serial port."""
  102. return ser_port