firmware_manager.py 9.0 KB


import json
import logging
import os
import shutil
import subprocess
import tempfile
  5. MOTOR_TYPE_MAPPING = {
  6. "TMC2209": "../firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino",
  7. "DRV8825": "../firmware/arduino_code/arduino_code.ino",
  8. "esp32": "../firmware/esp32/esp32.ino"
  9. }
  10. logger=logging.getLogger(__name__)
  11. def get_ino_firmware_details(ino_file_path):
  12. """
  13. Extract firmware details, including version and motor type, from the given .ino file.
  14. """
  15. try:
  16. if not ino_file_path:
  17. raise ValueError("Invalid path: ino_file_path is None or empty.")
  18. firmware_details = {"version": None, "motorType": None}
  19. with open(ino_file_path, "r") as file:
  20. for line in file:
  21. # Extract firmware version
  22. if "firmwareVersion" in line:
  23. start = line.find('"') + 1
  24. end = line.rfind('"')
  25. if start != -1 and end != -1 and start < end:
  26. firmware_details["version"] = line[start:end]
  27. # Extract motor type
  28. if "motorType" in line:
  29. start = line.find('"') + 1
  30. end = line.rfind('"')
  31. if start != -1 and end != -1 and start < end:
  32. firmware_details["motorType"] = line[start:end]
  33. if not firmware_details["version"]:
  34. print(f"Firmware version not found in file: {ino_file_path}")
  35. if not firmware_details["motorType"]:
  36. print(f"Motor type not found in file: {ino_file_path}")
  37. return firmware_details if any(firmware_details.values()) else None
  38. except FileNotFoundError:
  39. print(f"File not found: {ino_file_path}")
  40. return None
  41. except Exception as e:
  42. print(f"Error reading .ino file: {str(e)}")
  43. return None
  44. def get_firmware_info(installed_version, installed_type, motor_type=None):
  45. """
  46. Compare installed firmware with available firmware.
  47. """
  48. if motor_type:
  49. # For POST request with specific motor type
  50. if motor_type not in MOTOR_TYPE_MAPPING:
  51. return None, "Invalid motor type"
  52. ino_path = MOTOR_TYPE_MAPPING[motor_type]
  53. firmware_details = get_ino_firmware_details(ino_path)
  54. if not firmware_details:
  55. return None, "Failed to retrieve .ino firmware details"
  56. return {
  57. "success": True,
  58. "installedVersion": 'Unknown',
  59. "installedType": motor_type,
  60. "inoVersion": firmware_details["version"],
  61. "inoType": firmware_details["motorType"],
  62. "updateAvailable": True
  63. }, None
  64. # For GET request to check current firmware
  65. if installed_version != 'Unknown' and installed_type != 'Unknown':
  66. ino_path = MOTOR_TYPE_MAPPING.get(installed_type)
  67. firmware_details = get_ino_firmware_details(ino_path)
  68. if not firmware_details:
  69. return None, "Failed to retrieve .ino firmware details"
  70. update_available = (
  71. installed_version != firmware_details["version"] or
  72. installed_type != firmware_details["motorType"]
  73. )
  74. return {
  75. "success": True,
  76. "installedVersion": installed_version,
  77. "installedType": installed_type,
  78. "inoVersion": firmware_details["version"],
  79. "inoType": firmware_details["motorType"],
  80. "updateAvailable": update_available
  81. }, None
  82. return {
  83. "success": True,
  84. "installedVersion": installed_version,
  85. "installedType": installed_type,
  86. "updateAvailable": False
  87. }, None
  88. def flash_firmware(ser_port, motor_type):
  89. """
  90. Compile and flash the firmware to the connected Arduino.
  91. """
  92. if motor_type not in MOTOR_TYPE_MAPPING:
  93. return False, "Invalid motor type"
  94. build_dir = "/tmp/arduino_build" # Temporary build directory
  95. os.makedirs(build_dir, exist_ok=True)
  96. try:
  97. # Get the .ino file path based on the motor type
  98. ino_file_path = MOTOR_TYPE_MAPPING[motor_type]
  99. ino_file_name = os.path.basename(ino_file_path)
  100. # Install required libraries
  101. required_libraries = ["AccelStepper"]
  102. for library in required_libraries:
  103. library_install_command = ["arduino-cli", "lib", "install", library]
  104. install_process = subprocess.run(library_install_command, capture_output=True, text=True)
  105. if install_process.returncode != 0:
  106. return False, f"Library installation failed for {library}: {install_process.stderr}"
  107. # Compile the .ino file
  108. compile_command = [
  109. "arduino-cli",
  110. "compile",
  111. "--fqbn", "arduino:avr:uno",
  112. "--output-dir", build_dir,
  113. ino_file_path
  114. ]
  115. compile_process = subprocess.run(compile_command, capture_output=True, text=True)
  116. if compile_process.returncode != 0:
  117. return False, compile_process.stderr
  118. # Flash the .hex file
  119. hex_file_path = os.path.join(build_dir, f"{ino_file_name}.hex")
  120. flash_command = [
  121. "avrdude",
  122. "-v",
  123. "-c", "arduino",
  124. "-p", "atmega328p",
  125. "-P", ser_port,
  126. "-b", "115200",
  127. "-D",
  128. "-U", f"flash:w:{hex_file_path}:i"
  129. ]
  130. flash_process = subprocess.run(flash_command, capture_output=True, text=True)
  131. if flash_process.returncode != 0:
  132. return False, flash_process.stderr
  133. return True, "Firmware flashed successfully"
  134. except Exception as e:
  135. logger.error(f"Error flashing firmware: {str(e)}", exc_info=True)
  136. return False, str(e)
  137. finally:
  138. # Clean up temporary files
  139. if os.path.exists(build_dir):
  140. for file in os.listdir(build_dir):
  141. os.remove(os.path.join(build_dir, file))
  142. os.rmdir(build_dir)
  143. def check_git_updates():
  144. """
  145. Check for available software updates.
  146. """
  147. try:
  148. # Fetch the latest updates from the remote repository
  149. subprocess.run(["git", "fetch", "--tags", "--force"], check=True)
  150. # Get the latest tag from the remote
  151. latest_remote_tag = subprocess.check_output(
  152. ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
  153. ).strip().decode()
  154. # Get the latest tag from the local branch
  155. latest_local_tag = subprocess.check_output(
  156. ["git", "describe", "--tags", "--abbrev=0"]
  157. ).strip().decode()
  158. # Count how many tags the local branch is behind
  159. tag_behind_count = 0
  160. if latest_local_tag != latest_remote_tag:
  161. tags = subprocess.check_output(
  162. ["git", "tag", "--merged", "origin/main"], text=True
  163. ).splitlines()
  164. found_local = False
  165. for tag in tags:
  166. if tag == latest_local_tag:
  167. found_local = True
  168. elif found_local:
  169. tag_behind_count += 1
  170. if tag == latest_remote_tag:
  171. break
  172. return {
  173. "updates_available": latest_remote_tag != latest_local_tag,
  174. "tag_behind_count": tag_behind_count,
  175. "latest_remote_tag": latest_remote_tag,
  176. "latest_local_tag": latest_local_tag,
  177. }
  178. except subprocess.CalledProcessError as e:
  179. print(f"Error checking Git updates: {e}")
  180. return {
  181. "updates_available": False,
  182. "tag_behind_count": 0,
  183. "latest_remote_tag": None,
  184. "latest_local_tag": None,
  185. }
  186. def update_software():
  187. """
  188. Update the software to the latest version.
  189. """
  190. error_log = []
  191. def run_command(command, error_message):
  192. try:
  193. subprocess.run(command, check=True)
  194. except subprocess.CalledProcessError as e:
  195. print(f"{error_message}: {e}")
  196. error_log.append(error_message)
  197. try:
  198. # Fetch the latest version tag from remote
  199. subprocess.run(["git", "fetch", "--tags"], check=True)
  200. latest_remote_tag = subprocess.check_output(
  201. ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
  202. ).strip().decode()
  203. except subprocess.CalledProcessError as e:
  204. error_log.append(f"Failed to fetch tags or get latest remote tag: {e}")
  205. return False, "Failed to fetch tags or determine the latest version.", error_log
  206. # Checkout the latest tag
  207. run_command(["git", "checkout", latest_remote_tag, '--force'],
  208. f"Failed to checkout version {latest_remote_tag}")
  209. # Restart Docker containers
  210. run_command(["docker", "compose", "up", "-d"],
  211. "Failed to restart Docker containers")
  212. # Check if the update was successful
  213. update_status = check_git_updates()
  214. if (
  215. update_status["updates_available"] is False
  216. and update_status["latest_local_tag"] == update_status["latest_remote_tag"]
  217. ):
  218. return True, "Update successful", None
  219. else:
  220. return False, "Update incomplete", error_log