1
0

serial_manager.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. import serial
  2. import serial.tools.list_ports
  3. import threading
  4. import time
  5. import logging
  6. logger = logging.getLogger(__name__)
  7. # Configuration
  8. IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
  9. # Global variables
  10. ser = None
  11. ser_port = None
  12. arduino_table_name = None
  13. arduino_driver_type = 'Unknown'
  14. firmware_version = 'Unknown'
  15. serial_lock = threading.Lock()
  16. def list_serial_ports():
  17. """Return a list of available serial ports."""
  18. try:
  19. ports = serial.tools.list_ports.comports()
  20. return [port.device for port in ports if port.device not in IGNORE_PORTS]
  21. except Exception as e:
  22. logger.error(f"Error listing serial ports: {str(e)}", exc_info=True)
  23. return []
  24. def connect_to_serial(port=None, baudrate=115200):
  25. """Automatically connect to the first available serial port or a specified port."""
  26. global ser, ser_port, arduino_table_name, arduino_driver_type, firmware_version
  27. try:
  28. if port is None:
  29. ports = list_serial_ports()
  30. if not ports:
  31. logger.error("No serial port connected")
  32. return False
  33. port = ports[0] # Auto-select the first available port
  34. with serial_lock:
  35. if ser and ser.is_open:
  36. ser.close()
  37. ser = serial.Serial(port, baudrate, timeout=2) # Set timeout to avoid infinite waits
  38. ser_port = port # Store the connected port globally
  39. logger.info(f"Connected to serial port: {port}")
  40. time.sleep(2) # Allow time for the connection to establish
  41. # Read initial startup messages from Arduino
  42. arduino_table_name = None
  43. arduino_driver_type = None
  44. while ser.in_waiting > 0:
  45. try:
  46. line = ser.readline().decode().strip()
  47. logger.debug(f"Arduino: {line}") # Print the received message
  48. # Store the device details based on the expected messages
  49. if "Table:" in line:
  50. arduino_table_name = line.replace("Table: ", "").strip()
  51. elif "Drivers:" in line:
  52. arduino_driver_type = line.replace("Drivers: ", "").strip()
  53. elif "Version:" in line:
  54. firmware_version = line.replace("Version: ", "").strip()
  55. except UnicodeDecodeError as e:
  56. logger.warning(f"Failed to decode Arduino message: {str(e)}")
  57. continue
  58. logger.info(f"Detected Table: {arduino_table_name or 'Unknown'}")
  59. logger.info(f"Detected Drivers: {arduino_driver_type or 'Unknown'}")
  60. return True # Successfully connected
  61. except serial.SerialException as e:
  62. logger.error(f"Failed to connect to serial port {port}: {str(e)}", exc_info=True)
  63. return False
  64. except Exception as e:
  65. logger.error(f"Unexpected error connecting to serial port: {str(e)}", exc_info=True)
  66. return False
  67. def disconnect_serial():
  68. """Disconnect the current serial connection."""
  69. global ser, ser_port
  70. try:
  71. if ser and ser.is_open:
  72. ser.close()
  73. logger.info(f"Disconnected from serial port: {ser_port}")
  74. ser = None
  75. ser_port = None
  76. except Exception as e:
  77. logger.error(f"Error disconnecting serial port: {str(e)}", exc_info=True)
  78. def restart_serial(port, baudrate=115200):
  79. """Restart the serial connection."""
  80. try:
  81. disconnect_serial()
  82. return connect_to_serial(port, baudrate)
  83. except Exception as e:
  84. logger.error(f"Error restarting serial connection: {str(e)}", exc_info=True)
  85. return False
  86. def send_command(command):
  87. """Send a single command to the Arduino."""
  88. try:
  89. if not ser or not ser.is_open:
  90. logger.error("Cannot send command: Serial port not open")
  91. return None
  92. ser.write(f"{command}\n".encode())
  93. logger.debug(f"Sent: {command}")
  94. # Wait for "R" acknowledgment from Arduino
  95. while True:
  96. with serial_lock:
  97. if ser.in_waiting > 0:
  98. response = ser.readline().decode().strip()
  99. logger.debug(f"Arduino response: {response}")
  100. if response == "R":
  101. logger.debug("Command execution completed.")
  102. return response
  103. except serial.SerialException as e:
  104. logger.error(f"Serial communication error while sending command: {str(e)}", exc_info=True)
  105. return None
  106. except Exception as e:
  107. logger.error(f"Unexpected error sending command: {str(e)}", exc_info=True)
  108. return None
  109. def send_coordinate_batch(coordinates):
  110. """Send a batch of theta-rho pairs to the Arduino."""
  111. try:
  112. if not ser or not ser.is_open:
  113. logger.error("Cannot send coordinates: Serial port not open")
  114. return False
  115. batch_str = ";".join(f"{theta:.5f},{rho:.5f}" for theta, rho in coordinates) + ";\n"
  116. ser.write(batch_str.encode())
  117. return True
  118. except Exception as e:
  119. logger.error(f"Error sending coordinate batch: {str(e)}", exc_info=True)
  120. return False
  121. def get_serial_status():
  122. """Get the current status of the serial connection."""
  123. try:
  124. return {
  125. 'connected': ser.is_open if ser else False,
  126. 'port': ser_port
  127. }
  128. except Exception as e:
  129. logger.error(f"Error getting serial status: {str(e)}", exc_info=True)
  130. return {
  131. 'connected': False,
  132. 'port': None
  133. }
  134. def get_device_info():
  135. """Get information about the connected device."""
  136. try:
  137. return {
  138. 'table_name': arduino_table_name,
  139. 'driver_type': arduino_driver_type,
  140. 'firmware_version': firmware_version
  141. }
  142. except Exception as e:
  143. logger.error(f"Error getting device info: {str(e)}", exc_info=True)
  144. return {
  145. 'table_name': None,
  146. 'driver_type': None,
  147. 'firmware_version': None
  148. }
  149. def reset_theta():
  150. """Reset theta on the Arduino."""
  151. try:
  152. if not ser or not ser.is_open:
  153. logger.error("Cannot reset theta: Serial port not open")
  154. return False
  155. ser.write("RESET_THETA\n".encode())
  156. while True:
  157. with serial_lock:
  158. if ser.in_waiting > 0:
  159. response = ser.readline().decode().strip()
  160. logger.debug(f"Arduino response: {response}")
  161. if response == "THETA_RESET":
  162. logger.info("Theta successfully reset.")
  163. return True
  164. time.sleep(0.5) # Small delay to avoid busy waiting
  165. except Exception as e:
  166. logger.error(f"Error resetting theta: {str(e)}", exc_info=True)
  167. return False