firmware_manager.py 9.3 KB


  1. import os
  2. import subprocess
  3. import logging
  4. from ..serial import serial_manager
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Constant lookup table: supported motor/driver type -> path of the matching
# .ino sketch. The paths are relative ("./firmware/..."), so they resolve
# against the process's current working directory.
# Keys are case-sensitive and are used elsewhere in this module to validate
# a requested motor type.
MOTOR_TYPE_MAPPING = {
    "TMC2209": "./firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino",
    "DRV8825": "./firmware/arduino_code/arduino_code.ino",
    "esp32": "./firmware/esp32/esp32.ino",
    "esp32_TMC2209": "./firmware/esp32_TMC2209/esp32_TMC2209.ino"
}
  14. def get_ino_firmware_details(ino_file_path):
  15. """Extract firmware details from the given .ino file."""
  16. try:
  17. if not ino_file_path:
  18. raise ValueError("Invalid path: ino_file_path is None or empty.")
  19. firmware_details = {"version": None, "motorType": None}
  20. logger.debug(f"Reading firmware details from {ino_file_path}")
  21. with open(ino_file_path, "r") as file:
  22. for line in file:
  23. if "firmwareVersion" in line:
  24. start = line.find('"') + 1
  25. end = line.rfind('"')
  26. if start != -1 and end != -1 and start < end:
  27. firmware_details["version"] = line[start:end]
  28. if "motorType" in line:
  29. start = line.find('"') + 1
  30. end = line.rfind('"')
  31. if start != -1 and end != -1 and start < end:
  32. firmware_details["motorType"] = line[start:end]
  33. if not firmware_details["version"]:
  34. logger.warning(f"Firmware version not found in file: {ino_file_path}")
  35. if not firmware_details["motorType"]:
  36. logger.warning(f"Motor type not found in file: {ino_file_path}")
  37. return firmware_details if any(firmware_details.values()) else None
  38. except FileNotFoundError:
  39. logger.error(f"File not found: {ino_file_path}")
  40. return None
  41. except Exception as e:
  42. logger.error(f"Error reading .ino file: {str(e)}")
  43. return None
  44. def check_git_updates():
  45. """Check for available Git updates."""
  46. try:
  47. logger.debug("Checking for Git updates")
  48. subprocess.run(["git", "fetch", "--tags", "--force"], check=True)
  49. latest_remote_tag = subprocess.check_output(
  50. ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
  51. ).strip().decode()
  52. latest_local_tag = subprocess.check_output(
  53. ["git", "describe", "--tags", "--abbrev=0"]
  54. ).strip().decode()
  55. tag_behind_count = 0
  56. if latest_local_tag != latest_remote_tag:
  57. tags = subprocess.check_output(
  58. ["git", "tag", "--merged", "origin/main"], text=True
  59. ).splitlines()
  60. found_local = False
  61. for tag in tags:
  62. if tag == latest_local_tag:
  63. found_local = True
  64. elif found_local:
  65. tag_behind_count += 1
  66. if tag == latest_remote_tag:
  67. break
  68. updates_available = latest_remote_tag != latest_local_tag
  69. logger.info(f"Updates available: {updates_available}, {tag_behind_count} versions behind")
  70. return {
  71. "updates_available": updates_available,
  72. "tag_behind_count": tag_behind_count,
  73. "latest_remote_tag": latest_remote_tag,
  74. "latest_local_tag": latest_local_tag,
  75. }
  76. except subprocess.CalledProcessError as e:
  77. logger.error(f"Error checking Git updates: {e}")
  78. return {
  79. "updates_available": False,
  80. "tag_behind_count": 0,
  81. "latest_remote_tag": None,
  82. "latest_local_tag": None,
  83. }
  84. def update_software():
  85. """Update the software to the latest version."""
  86. error_log = []
  87. logger.info("Starting software update process")
  88. def run_command(command, error_message):
  89. try:
  90. logger.debug(f"Running command: {' '.join(command)}")
  91. subprocess.run(command, check=True)
  92. except subprocess.CalledProcessError as e:
  93. logger.error(f"{error_message}: {e}")
  94. error_log.append(error_message)
  95. try:
  96. subprocess.run(["git", "fetch", "--tags"], check=True)
  97. latest_remote_tag = subprocess.check_output(
  98. ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
  99. ).strip().decode()
  100. logger.info(f"Latest remote tag: {latest_remote_tag}")
  101. except subprocess.CalledProcessError as e:
  102. error_msg = f"Failed to fetch tags or get latest remote tag: {e}"
  103. logger.error(error_msg)
  104. error_log.append(error_msg)
  105. return False, error_msg, error_log
  106. run_command(["git", "checkout", latest_remote_tag, '--force'], f"Failed to checkout version {latest_remote_tag}")
  107. run_command(["docker", "compose", "pull"], "Failed to fetch Docker containers")
  108. run_command(["docker", "compose", "up", "-d"], "Failed to restart Docker containers")
  109. update_status = check_git_updates()
  110. if (
  111. update_status["updates_available"] is False
  112. and update_status["latest_local_tag"] == update_status["latest_remote_tag"]
  113. ):
  114. logger.info("Software update completed successfully")
  115. return True, None, None
  116. else:
  117. logger.error("Software update incomplete")
  118. return False, "Update incomplete", error_log
  119. def get_firmware_info(motor_type=None):
  120. """Get firmware information for the current or specified motor type."""
  121. if motor_type and motor_type not in MOTOR_TYPE_MAPPING:
  122. return False, "Invalid motor type"
  123. installed_version = serial_manager.firmware_version
  124. installed_type = serial_manager.arduino_driver_type
  125. if motor_type:
  126. # POST request with specified motor type
  127. ino_path = MOTOR_TYPE_MAPPING[motor_type]
  128. firmware_details = get_ino_firmware_details(ino_path)
  129. if not firmware_details:
  130. return False, "Failed to retrieve .ino firmware details"
  131. return True, {
  132. "installedVersion": 'Unknown',
  133. "installedType": motor_type,
  134. "inoVersion": firmware_details["version"],
  135. "inoType": firmware_details["motorType"],
  136. "updateAvailable": True
  137. }
  138. else:
  139. # GET request for current firmware info
  140. if installed_version != 'Unknown' and installed_type != 'Unknown':
  141. ino_path = MOTOR_TYPE_MAPPING.get(installed_type)
  142. firmware_details = get_ino_firmware_details(ino_path)
  143. if not firmware_details or not firmware_details.get("version") or not firmware_details.get("motorType"):
  144. return False, "Failed to retrieve .ino firmware details"
  145. update_available = (
  146. installed_version != firmware_details["version"] or
  147. installed_type != firmware_details["motorType"]
  148. )
  149. return True, {
  150. "installedVersion": installed_version,
  151. "installedType": installed_type,
  152. "inoVersion": firmware_details["version"],
  153. "inoType": firmware_details["motorType"],
  154. "updateAvailable": update_available
  155. }
  156. return True, {
  157. "installedVersion": installed_version,
  158. "installedType": installed_type,
  159. "updateAvailable": False
  160. }
  161. def flash_firmware(motor_type):
  162. """Flash firmware for the specified motor type."""
  163. if not motor_type or motor_type not in MOTOR_TYPE_MAPPING:
  164. logger.error(f"Invalid or missing motor type: {motor_type}")
  165. return False, "Invalid or missing motor type"
  166. if not serial_manager.is_connected():
  167. logger.error("No device connected or connection lost")
  168. return False, "No device connected or connection lost"
  169. try:
  170. ino_file_path = MOTOR_TYPE_MAPPING[motor_type]
  171. hex_file_path = f"{ino_file_path}.hex"
  172. bin_file_path = f"{ino_file_path}.bin"
  173. logger.info(f"Flashing firmware for motor type: {motor_type}")
  174. if motor_type.lower() in ["esp32", "esp32_tmc2209"]:
  175. if not os.path.exists(bin_file_path):
  176. logger.error(f"Firmware binary not found: {bin_file_path}")
  177. return False, f"Firmware binary not found: {bin_file_path}"
  178. flash_command = [
  179. "esptool.py",
  180. "--chip", "esp32",
  181. "--port", serial_manager.get_port(),
  182. "--baud", "115200",
  183. "write_flash", "-z", "0x1000", bin_file_path
  184. ]
  185. else:
  186. if not os.path.exists(hex_file_path):
  187. logger.error(f"Hex file not found: {hex_file_path}")
  188. return False, f"Hex file not found: {hex_file_path}"
  189. flash_command = [
  190. "avrdude",
  191. "-v",
  192. "-c", "arduino",
  193. "-p", "atmega328p",
  194. "-P", serial_manager.get_port(),
  195. "-b", "115200",
  196. "-D",
  197. "-U", f"flash:w:{hex_file_path}:i"
  198. ]
  199. logger.debug(f"Running flash command: {' '.join(flash_command)}")
  200. flash_process = subprocess.run(flash_command, capture_output=True, text=True)
  201. if flash_process.returncode != 0:
  202. logger.error(f"Firmware flash failed: {flash_process.stderr}")
  203. return False, flash_process.stderr
  204. logger.info("Firmware flashed successfully")
  205. return True, "Firmware flashed successfully"
  206. except Exception as e:
  207. logger.error(f"Error during firmware flash: {str(e)}")
  208. return False, str(e)