firmware_manager.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. import os
  2. import subprocess
  3. import logging
  4. from ..serial import serial_manager
  5. # Configure logging
  6. logger = logging.getLogger(__name__)
  7. def check_git_updates():
  8. """Check for available Git updates."""
  9. try:
  10. logger.debug("Checking for Git updates")
  11. subprocess.run(["git", "fetch", "--tags", "--force"], check=True)
  12. latest_remote_tag = subprocess.check_output(
  13. ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
  14. ).strip().decode()
  15. latest_local_tag = subprocess.check_output(
  16. ["git", "describe", "--tags", "--abbrev=0"]
  17. ).strip().decode()
  18. tag_behind_count = 0
  19. if latest_local_tag != latest_remote_tag:
  20. tags = subprocess.check_output(
  21. ["git", "tag", "--merged", "origin/main"], text=True
  22. ).splitlines()
  23. found_local = False
  24. for tag in tags:
  25. if tag == latest_local_tag:
  26. found_local = True
  27. elif found_local:
  28. tag_behind_count += 1
  29. if tag == latest_remote_tag:
  30. break
  31. updates_available = latest_remote_tag != latest_local_tag
  32. logger.info(f"Updates available: {updates_available}, {tag_behind_count} versions behind")
  33. return {
  34. "updates_available": updates_available,
  35. "tag_behind_count": tag_behind_count,
  36. "latest_remote_tag": latest_remote_tag,
  37. "latest_local_tag": latest_local_tag,
  38. }
  39. except subprocess.CalledProcessError as e:
  40. logger.error(f"Error checking Git updates: {e}")
  41. return {
  42. "updates_available": False,
  43. "tag_behind_count": 0,
  44. "latest_remote_tag": None,
  45. "latest_local_tag": None,
  46. }
  47. def update_software():
  48. """Update the software to the latest version."""
  49. error_log = []
  50. logger.info("Starting software update process")
  51. def run_command(command, error_message):
  52. try:
  53. logger.debug(f"Running command: {' '.join(command)}")
  54. subprocess.run(command, check=True)
  55. except subprocess.CalledProcessError as e:
  56. logger.error(f"{error_message}: {e}")
  57. error_log.append(error_message)
  58. try:
  59. subprocess.run(["git", "fetch", "--tags"], check=True)
  60. latest_remote_tag = subprocess.check_output(
  61. ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
  62. ).strip().decode()
  63. logger.info(f"Latest remote tag: {latest_remote_tag}")
  64. except subprocess.CalledProcessError as e:
  65. error_msg = f"Failed to fetch tags or get latest remote tag: {e}"
  66. logger.error(error_msg)
  67. error_log.append(error_msg)
  68. return False, error_msg, error_log
  69. run_command(["git", "checkout", latest_remote_tag, '--force'], f"Failed to checkout version {latest_remote_tag}")
  70. run_command(["docker", "compose", "pull"], "Failed to fetch Docker containers")
  71. run_command(["docker", "compose", "up", "-d"], "Failed to restart Docker containers")
  72. update_status = check_git_updates()
  73. if (
  74. update_status["updates_available"] is False
  75. and update_status["latest_local_tag"] == update_status["latest_remote_tag"]
  76. ):
  77. logger.info("Software update completed successfully")
  78. return True, None, None
  79. else:
  80. logger.error("Software update incomplete")
  81. return False, "Update incomplete", error_log