firmware_manager.py 9.0 KB


  1. import os
  2. import subprocess
  3. from ..serial.serial_manager import serial_manager
  4. class FirmwareManager:
  5. def __init__(self):
  6. self.MOTOR_TYPE_MAPPING = {
  7. "TMC2209": "./firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino",
  8. "DRV8825": "./firmware/arduino_code/arduino_code.ino",
  9. "esp32": "./firmware/esp32/esp32.ino",
  10. "esp32_TMC2209": "./firmware/esp32_TMC2209/esp32_TMC2209.ino"
  11. }
  12. def get_ino_firmware_details(self, ino_file_path):
  13. """Extract firmware details from the given .ino file."""
  14. try:
  15. if not ino_file_path:
  16. raise ValueError("Invalid path: ino_file_path is None or empty.")
  17. firmware_details = {"version": None, "motorType": None}
  18. with open(ino_file_path, "r") as file:
  19. for line in file:
  20. if "firmwareVersion" in line:
  21. start = line.find('"') + 1
  22. end = line.rfind('"')
  23. if start != -1 and end != -1 and start < end:
  24. firmware_details["version"] = line[start:end]
  25. if "motorType" in line:
  26. start = line.find('"') + 1
  27. end = line.rfind('"')
  28. if start != -1 and end != -1 and start < end:
  29. firmware_details["motorType"] = line[start:end]
  30. if not firmware_details["version"]:
  31. print(f"Firmware version not found in file: {ino_file_path}")
  32. if not firmware_details["motorType"]:
  33. print(f"Motor type not found in file: {ino_file_path}")
  34. return firmware_details if any(firmware_details.values()) else None
  35. except FileNotFoundError:
  36. print(f"File not found: {ino_file_path}")
  37. return None
  38. except Exception as e:
  39. print(f"Error reading .ino file: {str(e)}")
  40. return None
  41. def check_git_updates(self):
  42. """Check for available Git updates."""
  43. try:
  44. subprocess.run(["git", "fetch", "--tags", "--force"], check=True)
  45. latest_remote_tag = subprocess.check_output(
  46. ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
  47. ).strip().decode()
  48. latest_local_tag = subprocess.check_output(
  49. ["git", "describe", "--tags", "--abbrev=0"]
  50. ).strip().decode()
  51. tag_behind_count = 0
  52. if latest_local_tag != latest_remote_tag:
  53. tags = subprocess.check_output(
  54. ["git", "tag", "--merged", "origin/main"], text=True
  55. ).splitlines()
  56. found_local = False
  57. for tag in tags:
  58. if tag == latest_local_tag:
  59. found_local = True
  60. elif found_local:
  61. tag_behind_count += 1
  62. if tag == latest_remote_tag:
  63. break
  64. updates_available = latest_remote_tag != latest_local_tag
  65. return {
  66. "updates_available": updates_available,
  67. "tag_behind_count": tag_behind_count,
  68. "latest_remote_tag": latest_remote_tag,
  69. "latest_local_tag": latest_local_tag,
  70. }
  71. except subprocess.CalledProcessError as e:
  72. print(f"Error checking Git updates: {e}")
  73. return {
  74. "updates_available": False,
  75. "tag_behind_count": 0,
  76. "latest_remote_tag": None,
  77. "latest_local_tag": None,
  78. }
  79. def update_software(self):
  80. """Update the software to the latest version."""
  81. error_log = []
  82. def run_command(command, error_message):
  83. try:
  84. subprocess.run(command, check=True)
  85. except subprocess.CalledProcessError as e:
  86. print(f"{error_message}: {e}")
  87. error_log.append(error_message)
  88. try:
  89. subprocess.run(["git", "fetch", "--tags"], check=True)
  90. latest_remote_tag = subprocess.check_output(
  91. ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
  92. ).strip().decode()
  93. except subprocess.CalledProcessError as e:
  94. error_log.append(f"Failed to fetch tags or get latest remote tag: {e}")
  95. return False, "Failed to fetch tags or determine the latest version.", error_log
  96. run_command(["git", "checkout", latest_remote_tag, '--force'], f"Failed to checkout version {latest_remote_tag}")
  97. run_command(["docker", "compose", "pull"], "Failed to fetch Docker containers")
  98. run_command(["docker", "compose", "up", "-d"], "Failed to restart Docker containers")
  99. update_status = self.check_git_updates()
  100. if (
  101. update_status["updates_available"] is False
  102. and update_status["latest_local_tag"] == update_status["latest_remote_tag"]
  103. ):
  104. return True, None, None
  105. else:
  106. return False, "Update incomplete", error_log
  107. def get_firmware_info(self, motor_type=None):
  108. """Get firmware information for the current or specified motor type."""
  109. if motor_type and motor_type not in self.MOTOR_TYPE_MAPPING:
  110. return False, "Invalid motor type"
  111. installed_version = serial_manager.firmware_version
  112. installed_type = serial_manager.arduino_driver_type
  113. if motor_type:
  114. # POST request with specified motor type
  115. ino_path = self.MOTOR_TYPE_MAPPING[motor_type]
  116. firmware_details = self.get_ino_firmware_details(ino_path)
  117. if not firmware_details:
  118. return False, "Failed to retrieve .ino firmware details"
  119. return True, {
  120. "installedVersion": 'Unknown',
  121. "installedType": motor_type,
  122. "inoVersion": firmware_details["version"],
  123. "inoType": firmware_details["motorType"],
  124. "updateAvailable": True
  125. }
  126. else:
  127. # GET request for current firmware info
  128. if installed_version != 'Unknown' and installed_type != 'Unknown':
  129. ino_path = self.MOTOR_TYPE_MAPPING.get(installed_type)
  130. firmware_details = self.get_ino_firmware_details(ino_path)
  131. if not firmware_details or not firmware_details.get("version") or not firmware_details.get("motorType"):
  132. return False, "Failed to retrieve .ino firmware details"
  133. update_available = (
  134. installed_version != firmware_details["version"] or
  135. installed_type != firmware_details["motorType"]
  136. )
  137. return True, {
  138. "installedVersion": installed_version,
  139. "installedType": installed_type,
  140. "inoVersion": firmware_details["version"],
  141. "inoType": firmware_details["motorType"],
  142. "updateAvailable": update_available
  143. }
  144. return True, {
  145. "installedVersion": installed_version,
  146. "installedType": installed_type,
  147. "updateAvailable": False
  148. }
  149. def flash_firmware(self, motor_type):
  150. """Flash firmware for the specified motor type."""
  151. if not motor_type or motor_type not in self.MOTOR_TYPE_MAPPING:
  152. return False, "Invalid or missing motor type"
  153. if not serial_manager.is_connected():
  154. return False, "No device connected or connection lost"
  155. try:
  156. ino_file_path = self.MOTOR_TYPE_MAPPING[motor_type]
  157. hex_file_path = f"{ino_file_path}.hex"
  158. bin_file_path = f"{ino_file_path}.bin"
  159. if motor_type.lower() in ["esp32", "esp32_tmc2209"]:
  160. if not os.path.exists(bin_file_path):
  161. return False, f"Firmware binary not found: {bin_file_path}"
  162. flash_command = [
  163. "esptool.py",
  164. "--chip", "esp32",
  165. "--port", serial_manager.get_port(),
  166. "--baud", "115200",
  167. "write_flash", "-z", "0x1000", bin_file_path
  168. ]
  169. else:
  170. if not os.path.exists(hex_file_path):
  171. return False, f"Hex file not found: {hex_file_path}"
  172. flash_command = [
  173. "avrdude",
  174. "-v",
  175. "-c", "arduino",
  176. "-p", "atmega328p",
  177. "-P", serial_manager.get_port(),
  178. "-b", "115200",
  179. "-D",
  180. "-U", f"flash:w:{hex_file_path}:i"
  181. ]
  182. flash_process = subprocess.run(flash_command, capture_output=True, text=True)
  183. if flash_process.returncode != 0:
  184. return False, flash_process.stderr
  185. return True, "Firmware flashed successfully"
  186. except Exception as e:
  187. return False, str(e)
  188. # Create a global instance
  189. firmware_manager = FirmwareManager()