firmware_manager.py 8.1 KB


  1. import os
  2. import subprocess
  3. from ..serial import serial_manager
  4. # Global state
  5. MOTOR_TYPE_MAPPING = {
  6. "TMC2209": "./firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino",
  7. "DRV8825": "./firmware/arduino_code/arduino_code.ino",
  8. "esp32": "./firmware/esp32/esp32.ino",
  9. "esp32_TMC2209": "./firmware/esp32_TMC2209/esp32_TMC2209.ino"
  10. }
  11. def get_ino_firmware_details(ino_file_path):
  12. """Extract firmware details from the given .ino file."""
  13. try:
  14. if not ino_file_path:
  15. raise ValueError("Invalid path: ino_file_path is None or empty.")
  16. firmware_details = {"version": None, "motorType": None}
  17. with open(ino_file_path, "r") as file:
  18. for line in file:
  19. if "firmwareVersion" in line:
  20. start = line.find('"') + 1
  21. end = line.rfind('"')
  22. if start != -1 and end != -1 and start < end:
  23. firmware_details["version"] = line[start:end]
  24. if "motorType" in line:
  25. start = line.find('"') + 1
  26. end = line.rfind('"')
  27. if start != -1 and end != -1 and start < end:
  28. firmware_details["motorType"] = line[start:end]
  29. if not firmware_details["version"]:
  30. print(f"Firmware version not found in file: {ino_file_path}")
  31. if not firmware_details["motorType"]:
  32. print(f"Motor type not found in file: {ino_file_path}")
  33. return firmware_details if any(firmware_details.values()) else None
  34. except FileNotFoundError:
  35. print(f"File not found: {ino_file_path}")
  36. return None
  37. except Exception as e:
  38. print(f"Error reading .ino file: {str(e)}")
  39. return None
  40. def check_git_updates():
  41. """Check for available Git updates."""
  42. try:
  43. subprocess.run(["git", "fetch", "--tags", "--force"], check=True)
  44. latest_remote_tag = subprocess.check_output(
  45. ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
  46. ).strip().decode()
  47. latest_local_tag = subprocess.check_output(
  48. ["git", "describe", "--tags", "--abbrev=0"]
  49. ).strip().decode()
  50. tag_behind_count = 0
  51. if latest_local_tag != latest_remote_tag:
  52. tags = subprocess.check_output(
  53. ["git", "tag", "--merged", "origin/main"], text=True
  54. ).splitlines()
  55. found_local = False
  56. for tag in tags:
  57. if tag == latest_local_tag:
  58. found_local = True
  59. elif found_local:
  60. tag_behind_count += 1
  61. if tag == latest_remote_tag:
  62. break
  63. updates_available = latest_remote_tag != latest_local_tag
  64. return {
  65. "updates_available": updates_available,
  66. "tag_behind_count": tag_behind_count,
  67. "latest_remote_tag": latest_remote_tag,
  68. "latest_local_tag": latest_local_tag,
  69. }
  70. except subprocess.CalledProcessError as e:
  71. print(f"Error checking Git updates: {e}")
  72. return {
  73. "updates_available": False,
  74. "tag_behind_count": 0,
  75. "latest_remote_tag": None,
  76. "latest_local_tag": None,
  77. }
  78. def update_software():
  79. """Update the software to the latest version."""
  80. error_log = []
  81. def run_command(command, error_message):
  82. try:
  83. subprocess.run(command, check=True)
  84. except subprocess.CalledProcessError as e:
  85. print(f"{error_message}: {e}")
  86. error_log.append(error_message)
  87. try:
  88. subprocess.run(["git", "fetch", "--tags"], check=True)
  89. latest_remote_tag = subprocess.check_output(
  90. ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
  91. ).strip().decode()
  92. except subprocess.CalledProcessError as e:
  93. error_log.append(f"Failed to fetch tags or get latest remote tag: {e}")
  94. return False, "Failed to fetch tags or determine the latest version.", error_log
  95. run_command(["git", "checkout", latest_remote_tag, '--force'], f"Failed to checkout version {latest_remote_tag}")
  96. run_command(["docker", "compose", "pull"], "Failed to fetch Docker containers")
  97. run_command(["docker", "compose", "up", "-d"], "Failed to restart Docker containers")
  98. update_status = check_git_updates()
  99. if (
  100. update_status["updates_available"] is False
  101. and update_status["latest_local_tag"] == update_status["latest_remote_tag"]
  102. ):
  103. return True, None, None
  104. else:
  105. return False, "Update incomplete", error_log
  106. def get_firmware_info(motor_type=None):
  107. """Get firmware information for the current or specified motor type."""
  108. if motor_type and motor_type not in MOTOR_TYPE_MAPPING:
  109. return False, "Invalid motor type"
  110. installed_version = serial_manager.firmware_version
  111. installed_type = serial_manager.arduino_driver_type
  112. if motor_type:
  113. # POST request with specified motor type
  114. ino_path = MOTOR_TYPE_MAPPING[motor_type]
  115. firmware_details = get_ino_firmware_details(ino_path)
  116. if not firmware_details:
  117. return False, "Failed to retrieve .ino firmware details"
  118. return True, {
  119. "installedVersion": 'Unknown',
  120. "installedType": motor_type,
  121. "inoVersion": firmware_details["version"],
  122. "inoType": firmware_details["motorType"],
  123. "updateAvailable": True
  124. }
  125. else:
  126. # GET request for current firmware info
  127. if installed_version != 'Unknown' and installed_type != 'Unknown':
  128. ino_path = MOTOR_TYPE_MAPPING.get(installed_type)
  129. firmware_details = get_ino_firmware_details(ino_path)
  130. if not firmware_details or not firmware_details.get("version") or not firmware_details.get("motorType"):
  131. return False, "Failed to retrieve .ino firmware details"
  132. update_available = (
  133. installed_version != firmware_details["version"] or
  134. installed_type != firmware_details["motorType"]
  135. )
  136. return True, {
  137. "installedVersion": installed_version,
  138. "installedType": installed_type,
  139. "inoVersion": firmware_details["version"],
  140. "inoType": firmware_details["motorType"],
  141. "updateAvailable": update_available
  142. }
  143. return True, {
  144. "installedVersion": installed_version,
  145. "installedType": installed_type,
  146. "updateAvailable": False
  147. }
  148. def flash_firmware(motor_type):
  149. """Flash firmware for the specified motor type."""
  150. if not motor_type or motor_type not in MOTOR_TYPE_MAPPING:
  151. return False, "Invalid or missing motor type"
  152. if not serial_manager.is_connected():
  153. return False, "No device connected or connection lost"
  154. try:
  155. ino_file_path = MOTOR_TYPE_MAPPING[motor_type]
  156. hex_file_path = f"{ino_file_path}.hex"
  157. bin_file_path = f"{ino_file_path}.bin"
  158. if motor_type.lower() in ["esp32", "esp32_tmc2209"]:
  159. if not os.path.exists(bin_file_path):
  160. return False, f"Firmware binary not found: {bin_file_path}"
  161. flash_command = [
  162. "esptool.py",
  163. "--chip", "esp32",
  164. "--port", serial_manager.get_port(),
  165. "--baud", "115200",
  166. "write_flash", "-z", "0x1000", bin_file_path
  167. ]
  168. else:
  169. if not os.path.exists(hex_file_path):
  170. return False, f"Hex file not found: {hex_file_path}"
  171. flash_command = [
  172. "avrdude",
  173. "-v",
  174. "-c", "arduino",
  175. "-p", "atmega328p",
  176. "-P", serial_manager.get_port(),
  177. "-b", "115200",
  178. "-D",
  179. "-U", f"flash:w:{hex_file_path}:i"
  180. ]
  181. flash_process = subprocess.run(flash_command, capture_output=True, text=True)
  182. if flash_process.returncode != 0:
  183. return False, flash_process.stderr
  184. return True, "Firmware flashed successfully"
  185. except Exception as e:
  186. return False, str(e)