|
|
@@ -2,6 +2,9 @@ import serial
|
|
|
import serial.tools.list_ports
|
|
|
import threading
|
|
|
import time
|
|
|
+import logging
|
|
|
+
|
|
|
+logger = logging.getLogger(__name__)
|
|
|
|
|
|
# Configuration
|
|
|
IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
|
|
|
@@ -16,8 +19,12 @@ serial_lock = threading.Lock()
|
|
|
|
|
|
def list_serial_ports():
|
|
|
"""Return a list of available serial ports."""
|
|
|
- ports = serial.tools.list_ports.comports()
|
|
|
- return [port.device for port in ports if port.device not in IGNORE_PORTS]
|
|
|
+ try:
|
|
|
+ ports = serial.tools.list_ports.comports()
|
|
|
+ return [port.device for port in ports if port.device not in IGNORE_PORTS]
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error listing serial ports: {str(e)}", exc_info=True)
|
|
|
+ return []
|
|
|
|
|
|
def connect_to_serial(port=None, baudrate=115200):
|
|
|
"""Automatically connect to the first available serial port or a specified port."""
|
|
|
@@ -27,7 +34,7 @@ def connect_to_serial(port=None, baudrate=115200):
|
|
|
if port is None:
|
|
|
ports = list_serial_ports()
|
|
|
if not ports:
|
|
|
- print("No serial port connected")
|
|
|
+ logger.error("No serial port connected")
|
|
|
return False
|
|
|
port = ports[0] # Auto-select the first available port
|
|
|
|
|
|
@@ -37,7 +44,7 @@ def connect_to_serial(port=None, baudrate=115200):
|
|
|
ser = serial.Serial(port, baudrate, timeout=2) # Set timeout to avoid infinite waits
|
|
|
ser_port = port # Store the connected port globally
|
|
|
|
|
|
- print(f"Connected to serial port: {port}")
|
|
|
+ logger.info(f"Connected to serial port: {port}")
|
|
|
time.sleep(2) # Allow time for the connection to establish
|
|
|
|
|
|
# Read initial startup messages from Arduino
|
|
|
@@ -45,86 +52,140 @@ def connect_to_serial(port=None, baudrate=115200):
|
|
|
arduino_driver_type = None
|
|
|
|
|
|
while ser.in_waiting > 0:
|
|
|
- line = ser.readline().decode().strip()
|
|
|
- print(f"Arduino: {line}") # Print the received message
|
|
|
-
|
|
|
- # Store the device details based on the expected messages
|
|
|
- if "Table:" in line:
|
|
|
- arduino_table_name = line.replace("Table: ", "").strip()
|
|
|
- elif "Drivers:" in line:
|
|
|
- arduino_driver_type = line.replace("Drivers: ", "").strip()
|
|
|
- elif "Version:" in line:
|
|
|
- firmware_version = line.replace("Version: ", "").strip()
|
|
|
-
|
|
|
- # Display stored values
|
|
|
- print(f"Detected Table: {arduino_table_name or 'Unknown'}")
|
|
|
- print(f"Detected Drivers: {arduino_driver_type or 'Unknown'}")
|
|
|
+ try:
|
|
|
+ line = ser.readline().decode().strip()
|
|
|
+ logger.debug(f"Arduino: {line}") # Print the received message
|
|
|
+
|
|
|
+ # Store the device details based on the expected messages
|
|
|
+ if "Table:" in line:
|
|
|
+ arduino_table_name = line.replace("Table: ", "").strip()
|
|
|
+ elif "Drivers:" in line:
|
|
|
+ arduino_driver_type = line.replace("Drivers: ", "").strip()
|
|
|
+ elif "Version:" in line:
|
|
|
+ firmware_version = line.replace("Version: ", "").strip()
|
|
|
+ except UnicodeDecodeError as e:
|
|
|
+ logger.warning(f"Failed to decode Arduino message: {str(e)}")
|
|
|
+ continue
|
|
|
+
|
|
|
+ logger.info(f"Detected Table: {arduino_table_name or 'Unknown'}")
|
|
|
+ logger.info(f"Detected Drivers: {arduino_driver_type or 'Unknown'}")
|
|
|
|
|
|
return True # Successfully connected
|
|
|
except serial.SerialException as e:
|
|
|
- print(f"Failed to connect to serial port {port}: {e}")
|
|
|
- port = None # Reset the port to try the next available one
|
|
|
-
|
|
|
- print("Max retries reached. Could not connect to a serial port.")
|
|
|
- return False
|
|
|
+ logger.error(f"Failed to connect to serial port {port}: {str(e)}", exc_info=True)
|
|
|
+ return False
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Unexpected error connecting to serial port: {str(e)}", exc_info=True)
|
|
|
+ return False
|
|
|
|
|
|
def disconnect_serial():
|
|
|
"""Disconnect the current serial connection."""
|
|
|
global ser, ser_port
|
|
|
- if ser and ser.is_open:
|
|
|
- ser.close()
|
|
|
+ try:
|
|
|
+ if ser and ser.is_open:
|
|
|
+ ser.close()
|
|
|
+ logger.info(f"Disconnected from serial port: {ser_port}")
|
|
|
ser = None
|
|
|
- ser_port = None # Reset the port name
|
|
|
+ ser_port = None
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error disconnecting serial port: {str(e)}", exc_info=True)
|
|
|
|
|
|
def restart_serial(port, baudrate=115200):
|
|
|
"""Restart the serial connection."""
|
|
|
- disconnect_serial()
|
|
|
- return connect_to_serial(port, baudrate)
|
|
|
+ try:
|
|
|
+ disconnect_serial()
|
|
|
+ return connect_to_serial(port, baudrate)
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error restarting serial connection: {str(e)}", exc_info=True)
|
|
|
+ return False
|
|
|
|
|
|
def send_command(command):
|
|
|
"""Send a single command to the Arduino."""
|
|
|
- ser.write(f"{command}\n".encode())
|
|
|
- print(f"Sent: {command}")
|
|
|
-
|
|
|
- # Wait for "R" acknowledgment from Arduino
|
|
|
- while True:
|
|
|
- with serial_lock:
|
|
|
- if ser.in_waiting > 0:
|
|
|
- response = ser.readline().decode().strip()
|
|
|
- print(f"Arduino response: {response}")
|
|
|
- if response == "R":
|
|
|
- print("Command execution completed.")
|
|
|
- break
|
|
|
+ try:
|
|
|
+ if not ser or not ser.is_open:
|
|
|
+ logger.error("Cannot send command: Serial port not open")
|
|
|
+ return None
|
|
|
+
|
|
|
+ ser.write(f"{command}\n".encode())
|
|
|
+ logger.debug(f"Sent: {command}")
|
|
|
+
|
|
|
+ # Wait for "R" acknowledgment from Arduino
|
|
|
+ while True:
|
|
|
+ with serial_lock:
|
|
|
+ if ser.in_waiting > 0:
|
|
|
+ response = ser.readline().decode().strip()
|
|
|
+ logger.debug(f"Arduino response: {response}")
|
|
|
+ if response == "R":
|
|
|
+ logger.debug("Command execution completed.")
|
|
|
+ return response
|
|
|
+ except serial.SerialException as e:
|
|
|
+ logger.error(f"Serial communication error while sending command: {str(e)}", exc_info=True)
|
|
|
+ return None
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Unexpected error sending command: {str(e)}", exc_info=True)
|
|
|
+ return None
|
|
|
|
|
|
def send_coordinate_batch(coordinates):
|
|
|
"""Send a batch of theta-rho pairs to the Arduino."""
|
|
|
- batch_str = ";".join(f"{theta:.5f},{rho:.5f}" for theta, rho in coordinates) + ";\n"
|
|
|
- ser.write(batch_str.encode())
|
|
|
+ try:
|
|
|
+ if not ser or not ser.is_open:
|
|
|
+ logger.error("Cannot send coordinates: Serial port not open")
|
|
|
+ return False
|
|
|
+
|
|
|
+ batch_str = ";".join(f"{theta:.5f},{rho:.5f}" for theta, rho in coordinates) + ";\n"
|
|
|
+ ser.write(batch_str.encode())
|
|
|
+ return True
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error sending coordinate batch: {str(e)}", exc_info=True)
|
|
|
+ return False
|
|
|
|
|
|
def get_serial_status():
|
|
|
"""Get the current status of the serial connection."""
|
|
|
- return {
|
|
|
- 'connected': ser.is_open if ser else False,
|
|
|
- 'port': ser_port
|
|
|
- }
|
|
|
+ try:
|
|
|
+ return {
|
|
|
+ 'connected': ser.is_open if ser else False,
|
|
|
+ 'port': ser_port
|
|
|
+ }
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error getting serial status: {str(e)}", exc_info=True)
|
|
|
+ return {
|
|
|
+ 'connected': False,
|
|
|
+ 'port': None
|
|
|
+ }
|
|
|
|
|
|
def get_device_info():
|
|
|
"""Get information about the connected device."""
|
|
|
- return {
|
|
|
- 'table_name': arduino_table_name,
|
|
|
- 'driver_type': arduino_driver_type,
|
|
|
- 'firmware_version': firmware_version
|
|
|
- }
|
|
|
+ try:
|
|
|
+ return {
|
|
|
+ 'table_name': arduino_table_name,
|
|
|
+ 'driver_type': arduino_driver_type,
|
|
|
+ 'firmware_version': firmware_version
|
|
|
+ }
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error getting device info: {str(e)}", exc_info=True)
|
|
|
+ return {
|
|
|
+ 'table_name': None,
|
|
|
+ 'driver_type': None,
|
|
|
+ 'firmware_version': None
|
|
|
+ }
|
|
|
|
|
|
def reset_theta():
|
|
|
"""Reset theta on the Arduino."""
|
|
|
- ser.write("RESET_THETA\n".encode())
|
|
|
- while True:
|
|
|
- with serial_lock:
|
|
|
- if ser.in_waiting > 0:
|
|
|
- response = ser.readline().decode().strip()
|
|
|
- print(f"Arduino response: {response}")
|
|
|
- if response == "THETA_RESET":
|
|
|
- print("Theta successfully reset.")
|
|
|
- break
|
|
|
- time.sleep(0.5) # Small delay to avoid busy waiting
|
|
|
+ try:
|
|
|
+ if not ser or not ser.is_open:
|
|
|
+ logger.error("Cannot reset theta: Serial port not open")
|
|
|
+ return False
|
|
|
+
|
|
|
+ ser.write("RESET_THETA\n".encode())
|
|
|
+ while True:
|
|
|
+ with serial_lock:
|
|
|
+ if ser.in_waiting > 0:
|
|
|
+ response = ser.readline().decode().strip()
|
|
|
+ logger.debug(f"Arduino response: {response}")
|
|
|
+ if response == "THETA_RESET":
|
|
|
+ logger.info("Theta successfully reset.")
|
|
|
+ return True
|
|
|
+ time.sleep(0.5) # Small delay to avoid busy waiting
|
|
|
+ except Exception as e:
|
|
|
+ logger.error(f"Error resetting theta: {str(e)}", exc_info=True)
|
|
|
+ return False
|