app.py 12 KB


  1. from flask import Flask, request, jsonify, render_template, send_from_directory
  2. import os
  3. import threading
  4. from datetime import datetime
  5. import logging
  6. from modules.serial.serial_manager import (
  7. list_serial_ports, connect_to_serial, disconnect_serial,
  8. restart_serial, get_serial_status, get_device_info,
  9. send_coordinate_batch
  10. )
  11. from modules.firmware.firmware_manager import (
  12. get_firmware_info, flash_firmware, check_git_updates,
  13. update_software
  14. )
  15. from modules.core.pattern_manager import (
  16. THETA_RHO_DIR, parse_theta_rho_file, run_theta_rho_file,
  17. run_theta_rho_files, get_execution_status, stop_execution,
  18. pause_execution, resume_execution
  19. )
  20. from modules.core.playlist_manager import (
  21. list_all_playlists, get_playlist, create_playlist,
  22. modify_playlist, delete_playlist, add_to_playlist
  23. )
  24. app = Flask(__name__)
  25. logging.basicConfig(level=logging.INFO)
  26. # Ensure the patterns directory exists
  27. os.makedirs(THETA_RHO_DIR, exist_ok=True)
  28. # API Routes
  29. @app.route('/')
  30. def index():
  31. return render_template('index.html')
  32. # Serial Routes
  33. @app.route('/list_serial_ports', methods=['GET'])
  34. def api_list_ports():
  35. return jsonify(list_serial_ports())
  36. @app.route('/connect_serial', methods=['POST'])
  37. def api_connect_serial():
  38. port = request.json.get('port')
  39. if not port:
  40. return jsonify({'error': 'No port provided'}), 400
  41. try:
  42. success = connect_to_serial(port)
  43. return jsonify({'success': success})
  44. except Exception as e:
  45. app.logger.error(f"Error connecting to serial port: {str(e)}", exc_info=True)
  46. return jsonify({'error': str(e)}), 500
  47. @app.route('/disconnect_serial', methods=['POST'])
  48. def api_disconnect():
  49. try:
  50. disconnect_serial()
  51. return jsonify({'success': True})
  52. except Exception as e:
  53. return jsonify({'error': str(e)}), 500
  54. @app.route('/restart_serial', methods=['POST'])
  55. def api_restart():
  56. port = request.json.get('port')
  57. if not port:
  58. return jsonify({'error': 'No port provided'}), 400
  59. try:
  60. success = restart_serial(port)
  61. return jsonify({'success': success})
  62. except Exception as e:
  63. return jsonify({'error': str(e)}), 500
  64. @app.route('/serial_status', methods=['GET'])
  65. def api_serial_status():
  66. return jsonify(get_serial_status())
  67. # Pattern Routes
  68. @app.route('/list_theta_rho_files', methods=['GET'])
  69. def api_list_theta_rho_files():
  70. files = []
  71. for root, _, filenames in os.walk(THETA_RHO_DIR):
  72. for file in filenames:
  73. relative_path = os.path.relpath(os.path.join(root, file), THETA_RHO_DIR)
  74. files.append(relative_path)
  75. return jsonify(sorted(files))
  76. @app.route('/upload_theta_rho', methods=['POST'])
  77. def api_upload_theta_rho():
  78. custom_patterns_dir = os.path.join(THETA_RHO_DIR, 'custom_patterns')
  79. os.makedirs(custom_patterns_dir, exist_ok=True)
  80. file = request.files['file']
  81. if file:
  82. file.save(os.path.join(custom_patterns_dir, file.filename))
  83. return jsonify({'success': True})
  84. return jsonify({'success': False})
  85. @app.route('/run_theta_rho', methods=['POST'])
  86. def api_run_theta_rho():
  87. file_name = request.json.get('file_name')
  88. pre_execution = request.json.get('pre_execution')
  89. if not file_name:
  90. return jsonify({'error': 'No file name provided'}), 400
  91. file_path = os.path.join(THETA_RHO_DIR, file_name)
  92. if not os.path.exists(file_path):
  93. return jsonify({'error': 'File not found'}), 404
  94. try:
  95. files_to_run = []
  96. if pre_execution in ['clear_in', 'clear_out', 'clear_sideway']:
  97. files_to_run.append(f'./patterns/clear_from_{pre_execution.split("_")[1]}.thr')
  98. files_to_run.append(file_path)
  99. threading.Thread(
  100. target=run_theta_rho_files,
  101. args=(files_to_run,),
  102. kwargs={'pause_time': 0, 'clear_pattern': None}
  103. ).start()
  104. return jsonify({'success': True})
  105. except Exception as e:
  106. return jsonify({'error': str(e)}), 500
  107. @app.route('/preview_thr', methods=['POST'])
  108. def api_preview_thr():
  109. file_name = request.json.get('file_name')
  110. if not file_name:
  111. return jsonify({'error': 'No file name provided'}), 400
  112. file_path = os.path.join(THETA_RHO_DIR, file_name)
  113. if not os.path.exists(file_path):
  114. return jsonify({'error': 'File not found'}), 404
  115. try:
  116. coordinates = parse_theta_rho_file(file_path)
  117. return jsonify({'success': True, 'coordinates': coordinates})
  118. except Exception as e:
  119. return jsonify({'error': str(e)}), 500
  120. @app.route('/send_coordinate', methods=['POST'])
  121. def api_send_coordinate():
  122. try:
  123. data = request.json
  124. theta = data.get('theta')
  125. rho = data.get('rho')
  126. if theta is None or rho is None:
  127. return jsonify({"success": False, "error": "Theta and Rho are required"}), 400
  128. send_coordinate_batch([(theta, rho)])
  129. return jsonify({"success": True})
  130. except Exception as e:
  131. return jsonify({"success": False, "error": str(e)}), 500
  132. # Playlist Routes
  133. @app.route("/list_all_playlists", methods=["GET"])
  134. def api_list_all_playlists():
  135. playlist_names = list_all_playlists()
  136. return jsonify(playlist_names)
  137. @app.route("/get_playlist", methods=["GET"])
  138. def api_get_playlist():
  139. playlist_name = request.args.get("name", "")
  140. if not playlist_name:
  141. return jsonify({"error": "Missing playlist 'name' parameter"}), 400
  142. playlist = get_playlist(playlist_name)
  143. if not playlist:
  144. return jsonify({"error": f"Playlist '{playlist_name}' not found"}), 404
  145. return jsonify(playlist)
  146. @app.route("/create_playlist", methods=["POST"])
  147. def api_create_playlist():
  148. data = request.get_json()
  149. if not data or "name" not in data or "files" not in data:
  150. return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
  151. success = create_playlist(data["name"], data["files"])
  152. return jsonify({
  153. "success": success,
  154. "message": f"Playlist '{data['name']}' created/updated"
  155. })
  156. @app.route("/modify_playlist", methods=["POST"])
  157. def api_modify_playlist():
  158. data = request.get_json()
  159. if not data or "name" not in data or "files" not in data:
  160. return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
  161. success = modify_playlist(data["name"], data["files"])
  162. return jsonify({
  163. "success": success,
  164. "message": f"Playlist '{data['name']}' updated"
  165. })
  166. @app.route("/delete_playlist", methods=["DELETE"])
  167. def api_delete_playlist():
  168. data = request.get_json()
  169. if not data or "name" not in data:
  170. return jsonify({"success": False, "error": "Missing 'name' field"}), 400
  171. success = delete_playlist(data["name"])
  172. if not success:
  173. return jsonify({"success": False, "error": f"Playlist '{data['name']}' not found"}), 404
  174. return jsonify({
  175. "success": True,
  176. "message": f"Playlist '{data['name']}' deleted"
  177. })
  178. @app.route('/add_to_playlist', methods=['POST'])
  179. def api_add_to_playlist():
  180. data = request.json
  181. playlist_name = data.get('playlist_name')
  182. pattern = data.get('pattern')
  183. success = add_to_playlist(playlist_name, pattern)
  184. if success:
  185. return jsonify(success=True)
  186. else:
  187. return jsonify(success=False, error='Playlist not found'), 404
  188. @app.route("/run_playlist", methods=["POST"])
  189. def api_run_playlist():
  190. data = request.get_json()
  191. if not data or "playlist_name" not in data:
  192. return jsonify({"success": False, "error": "Missing 'playlist_name' field"}), 400
  193. playlist = get_playlist(data["playlist_name"])
  194. if not playlist:
  195. return jsonify({"success": False, "error": f"Playlist '{data['playlist_name']}' not found"}), 404
  196. schedule_hours = None
  197. start_time = data.get("start_time")
  198. end_time = data.get("end_time")
  199. if start_time and end_time:
  200. try:
  201. start_time_obj = datetime.strptime(start_time, "%H:%M").time()
  202. end_time_obj = datetime.strptime(end_time, "%H:%M").time()
  203. if start_time_obj >= end_time_obj:
  204. return jsonify({"success": False, "error": "'start_time' must be earlier than 'end_time'"}), 400
  205. schedule_hours = (start_time_obj, end_time_obj)
  206. except ValueError:
  207. return jsonify({"success": False, "error": "Invalid time format. Use HH:MM (e.g., '09:30')"}), 400
  208. file_paths = [os.path.join(THETA_RHO_DIR, file) for file in playlist["files"]]
  209. if not file_paths:
  210. return jsonify({"success": False, "error": f"Playlist '{data['playlist_name']}' is empty"}), 400
  211. try:
  212. threading.Thread(
  213. target=run_theta_rho_files,
  214. args=(file_paths,),
  215. kwargs={
  216. 'pause_time': data.get("pause_time", 0),
  217. 'clear_pattern': data.get("clear_pattern"),
  218. 'run_mode': data.get("run_mode", "single"),
  219. 'shuffle': data.get("shuffle", False),
  220. 'schedule_hours': schedule_hours
  221. },
  222. daemon=True
  223. ).start()
  224. return jsonify({"success": True, "message": f"Playlist '{data['playlist_name']}' is now running."})
  225. except Exception as e:
  226. return jsonify({"success": False, "error": str(e)}), 500
  227. # Execution Control Routes
  228. @app.route('/stop_execution', methods=['POST'])
  229. def api_stop_execution():
  230. stop_execution()
  231. return jsonify({'success': True})
  232. @app.route('/pause_execution', methods=['POST'])
  233. def api_pause_execution():
  234. pause_execution()
  235. return jsonify({'success': True, 'message': 'Execution paused'})
  236. @app.route('/resume_execution', methods=['POST'])
  237. def api_resume_execution():
  238. resume_execution()
  239. return jsonify({'success': True, 'message': 'Execution resumed'})
  240. @app.route('/status', methods=['GET'])
  241. def api_get_status():
  242. return jsonify(get_execution_status())
  243. # Firmware Routes
  244. @app.route('/get_firmware_info', methods=['GET', 'POST'])
  245. def api_get_firmware_info():
  246. device_info = get_device_info()
  247. if request.method == "POST":
  248. motor_type = request.json.get("motorType")
  249. info, error = get_firmware_info(
  250. device_info['firmware_version'],
  251. device_info['driver_type'],
  252. motor_type
  253. )
  254. else:
  255. info, error = get_firmware_info(
  256. device_info['firmware_version'],
  257. device_info['driver_type']
  258. )
  259. if error:
  260. return jsonify({"success": False, "error": error}), 500
  261. return jsonify(info)
  262. @app.route('/flash_firmware', methods=['POST'])
  263. def api_flash_firmware():
  264. status = get_serial_status()
  265. if not status['connected']:
  266. return jsonify({"success": False, "error": "No Arduino connected or connection lost"}), 400
  267. motor_type = request.json.get("motorType")
  268. success, message = flash_firmware(status['port'], motor_type)
  269. if success:
  270. return jsonify({"success": True, "message": message})
  271. app.logger.error(message)
  272. return jsonify({"success": False, "error": message}), 500
  273. @app.route('/check_software_update', methods=['GET'])
  274. def api_check_updates():
  275. update_info = check_git_updates()
  276. return jsonify(update_info)
  277. @app.route('/update_software', methods=['POST'])
  278. def api_update_software():
  279. success, message, error_log = update_software()
  280. if success:
  281. return jsonify({"success": True})
  282. return jsonify({
  283. "success": False,
  284. "error": message,
  285. "details": error_log
  286. }), 500
  287. # File Management Routes
  288. @app.route('/download/<filename>', methods=['GET'])
  289. def download_file(filename):
  290. return send_from_directory(THETA_RHO_DIR, filename)
  291. @app.route('/delete_theta_rho_file', methods=['POST'])
  292. def api_delete_theta_rho_file():
  293. data = request.json
  294. file_name = data.get('file_name')
  295. if not file_name:
  296. return jsonify({"success": False, "error": "No file name provided"}), 400
  297. file_path = os.path.join(THETA_RHO_DIR, file_name)
  298. if not os.path.exists(file_path):
  299. return jsonify({"success": False, "error": "File not found"}), 404
  300. try:
  301. os.remove(file_path)
  302. return jsonify({"success": True})
  303. except Exception as e:
  304. return jsonify({"success": False, "error": str(e)}), 500
  305. if __name__ == '__main__':
  306. # Auto-connect to serial
  307. connect_to_serial()
  308. app.run(debug=True, host='0.0.0.0', port=8080)