||
- import os
- import subprocess
- import logging
- from ..serial import serial_manager
- # Configure logging
- logger = logging.getLogger(__name__)
- # Global state
- MOTOR_TYPE_MAPPING = {
- "TMC2209": "./firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino",
- "DRV8825": "./firmware/arduino_code/arduino_code.ino",
- "esp32": "./firmware/esp32/esp32.ino",
- "esp32_TMC2209": "./firmware/esp32_TMC2209/esp32_TMC2209.ino"
- }
- def get_ino_firmware_details(ino_file_path):
- """Extract firmware details from the given .ino file."""
- try:
- if not ino_file_path:
- raise ValueError("Invalid path: ino_file_path is None or empty.")
- firmware_details = {"version": None, "motorType": None}
- logger.debug(f"Reading firmware details from {ino_file_path}")
- with open(ino_file_path, "r") as file:
- for line in file:
- if "firmwareVersion" in line:
- start = line.find('"') + 1
- end = line.rfind('"')
- if start != -1 and end != -1 and start < end:
- firmware_details["version"] = line[start:end]
- if "motorType" in line:
- start = line.find('"') + 1
- end = line.rfind('"')
- if start != -1 and end != -1 and start < end:
- firmware_details["motorType"] = line[start:end]
- if not firmware_details["version"]:
- logger.warning(f"Firmware version not found in file: {ino_file_path}")
- if not firmware_details["motorType"]:
- logger.warning(f"Motor type not found in file: {ino_file_path}")
- return firmware_details if any(firmware_details.values()) else None
- except FileNotFoundError:
- logger.error(f"File not found: {ino_file_path}")
- return None
- except Exception as e:
- logger.error(f"Error reading .ino file: {str(e)}")
- return None
- def check_git_updates():
- """Check for available Git updates."""
- try:
- logger.debug("Checking for Git updates")
- subprocess.run(["git", "fetch", "--tags", "--force"], check=True)
- latest_remote_tag = subprocess.check_output(
- ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
- ).strip().decode()
- latest_local_tag = subprocess.check_output(
- ["git", "describe", "--tags", "--abbrev=0"]
- ).strip().decode()
- tag_behind_count = 0
- if latest_local_tag != latest_remote_tag:
- tags = subprocess.check_output(
- ["git", "tag", "--merged", "origin/main"], text=True
- ).splitlines()
- found_local = False
- for tag in tags:
- if tag == latest_local_tag:
- found_local = True
- elif found_local:
- tag_behind_count += 1
- if tag == latest_remote_tag:
- break
- updates_available = latest_remote_tag != latest_local_tag
- logger.info(f"Updates available: {updates_available}, {tag_behind_count} versions behind")
- return {
- "updates_available": updates_available,
- "tag_behind_count": tag_behind_count,
- "latest_remote_tag": latest_remote_tag,
- "latest_local_tag": latest_local_tag,
- }
- except subprocess.CalledProcessError as e:
- logger.error(f"Error checking Git updates: {e}")
- return {
- "updates_available": False,
- "tag_behind_count": 0,
- "latest_remote_tag": None,
- "latest_local_tag": None,
- }
- def update_software():
- """Update the software to the latest version."""
- error_log = []
- logger.info("Starting software update process")
- def run_command(command, error_message):
- try:
- logger.debug(f"Running command: {' '.join(command)}")
- subprocess.run(command, check=True)
- except subprocess.CalledProcessError as e:
- logger.error(f"{error_message}: {e}")
- error_log.append(error_message)
- try:
- subprocess.run(["git", "fetch", "--tags"], check=True)
- latest_remote_tag = subprocess.check_output(
- ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
- ).strip().decode()
- logger.info(f"Latest remote tag: {latest_remote_tag}")
- except subprocess.CalledProcessError as e:
- error_msg = f"Failed to fetch tags or get latest remote tag: {e}"
- logger.error(error_msg)
- error_log.append(error_msg)
- return False, error_msg, error_log
- run_command(["git", "checkout", latest_remote_tag, '--force'], f"Failed to checkout version {latest_remote_tag}")
- run_command(["docker", "compose", "pull"], "Failed to fetch Docker containers")
- run_command(["docker", "compose", "up", "-d"], "Failed to restart Docker containers")
- update_status = check_git_updates()
- if (
- update_status["updates_available"] is False
- and update_status["latest_local_tag"] == update_status["latest_remote_tag"]
- ):
- logger.info("Software update completed successfully")
- return True, None, None
- else:
- logger.error("Software update incomplete")
- return False, "Update incomplete", error_log
- def get_firmware_info(motor_type=None):
- """Get firmware information for the current or specified motor type."""
- if motor_type and motor_type not in MOTOR_TYPE_MAPPING:
- return False, "Invalid motor type"
- installed_version = serial_manager.firmware_version
- installed_type = serial_manager.arduino_driver_type
- if motor_type:
- # POST request with specified motor type
- ino_path = MOTOR_TYPE_MAPPING[motor_type]
- firmware_details = get_ino_firmware_details(ino_path)
- if not firmware_details:
- return False, "Failed to retrieve .ino firmware details"
- return True, {
- "installedVersion": 'Unknown',
- "installedType": motor_type,
- "inoVersion": firmware_details["version"],
- "inoType": firmware_details["motorType"],
- "updateAvailable": True
- }
- else:
- # GET request for current firmware info
- if installed_version != 'Unknown' and installed_type != 'Unknown':
- ino_path = MOTOR_TYPE_MAPPING.get(installed_type)
- firmware_details = get_ino_firmware_details(ino_path)
- if not firmware_details or not firmware_details.get("version") or not firmware_details.get("motorType"):
- return False, "Failed to retrieve .ino firmware details"
- update_available = (
- installed_version != firmware_details["version"] or
- installed_type != firmware_details["motorType"]
- )
- return True, {
- "installedVersion": installed_version,
- "installedType": installed_type,
- "inoVersion": firmware_details["version"],
- "inoType": firmware_details["motorType"],
- "updateAvailable": update_available
- }
- return True, {
- "installedVersion": installed_version,
- "installedType": installed_type,
- "updateAvailable": False
- }
- def flash_firmware(motor_type):
- """Flash firmware for the specified motor type."""
- if not motor_type or motor_type not in MOTOR_TYPE_MAPPING:
- logger.error(f"Invalid or missing motor type: {motor_type}")
- return False, "Invalid or missing motor type"
- if not serial_manager.is_connected():
- logger.error("No device connected or connection lost")
- return False, "No device connected or connection lost"
- try:
- ino_file_path = MOTOR_TYPE_MAPPING[motor_type]
- hex_file_path = f"{ino_file_path}.hex"
- bin_file_path = f"{ino_file_path}.bin"
- logger.info(f"Flashing firmware for motor type: {motor_type}")
- if motor_type.lower() in ["esp32", "esp32_tmc2209"]:
- if not os.path.exists(bin_file_path):
- logger.error(f"Firmware binary not found: {bin_file_path}")
- return False, f"Firmware binary not found: {bin_file_path}"
- flash_command = [
- "esptool.py",
- "--chip", "esp32",
- "--port", serial_manager.get_port(),
- "--baud", "115200",
- "write_flash", "-z", "0x1000", bin_file_path
- ]
- else:
- if not os.path.exists(hex_file_path):
- logger.error(f"Hex file not found: {hex_file_path}")
- return False, f"Hex file not found: {hex_file_path}"
- flash_command = [
- "avrdude",
- "-v",
- "-c", "arduino",
- "-p", "atmega328p",
- "-P", serial_manager.get_port(),
- "-b", "115200",
- "-D",
- "-U", f"flash:w:{hex_file_path}:i"
- ]
- logger.debug(f"Running flash command: {' '.join(flash_command)}")
- flash_process = subprocess.run(flash_command, capture_output=True, text=True)
- if flash_process.returncode != 0:
- logger.error(f"Firmware flash failed: {flash_process.stderr}")
- return False, flash_process.stderr
- logger.info("Firmware flashed successfully")
- return True, "Firmware flashed successfully"
- except Exception as e:
- logger.error(f"Error during firmware flash: {str(e)}")
- return False, str(e)
|