1
0

app.py 16 KB


  1. from flask import Flask, request, jsonify, render_template, send_from_directory
  2. import os
  3. import threading
  4. from datetime import datetime
  5. import logging
  6. from modules.serial.serial_manager import (
  7. list_serial_ports, connect_to_serial, disconnect_serial,
  8. restart_serial, get_serial_status, get_device_info,
  9. send_coordinate_batch, send_command
  10. )
  11. from modules.firmware.firmware_manager import (
  12. get_firmware_info, flash_firmware, check_git_updates,
  13. update_software
  14. )
  15. from modules.core.pattern_manager import (
  16. THETA_RHO_DIR, parse_theta_rho_file, run_theta_rho_file,
  17. run_theta_rho_files, get_execution_status, stop_execution,
  18. pause_execution, resume_execution
  19. )
  20. from modules.core.playlist_manager import (
  21. list_all_playlists, get_playlist, create_playlist,
  22. modify_playlist, delete_playlist, add_to_playlist
  23. )
  24. app = Flask(__name__)
  25. logging.basicConfig(level=logging.INFO)
  26. # Ensure the patterns directory exists
  27. os.makedirs(THETA_RHO_DIR, exist_ok=True)
  28. # API Routes
  29. @app.route('/')
  30. def index():
  31. return render_template('index.html')
  32. # Serial Routes
  33. @app.route('/list_serial_ports', methods=['GET'])
  34. def api_list_ports():
  35. return jsonify(list_serial_ports())
  36. @app.route('/connect_serial', methods=['POST'])
  37. def api_connect_serial():
  38. port = request.json.get('port')
  39. if not port:
  40. app.logger.error("No port provided in connect_serial request")
  41. return jsonify({'error': 'No port provided'}), 400
  42. try:
  43. success = connect_to_serial(port)
  44. return jsonify({'success': success})
  45. except Exception as e:
  46. app.logger.error(f"Error connecting to serial port: {str(e)}", exc_info=True)
  47. return jsonify({'error': str(e)}), 500
  48. @app.route('/disconnect_serial', methods=['POST'])
  49. def api_disconnect():
  50. try:
  51. disconnect_serial()
  52. return jsonify({'success': True})
  53. except Exception as e:
  54. app.logger.error(f"Error disconnecting serial port: {str(e)}", exc_info=True)
  55. return jsonify({'error': str(e)}), 500
  56. @app.route('/restart_serial', methods=['POST'])
  57. def api_restart():
  58. port = request.json.get('port')
  59. if not port:
  60. app.logger.error("No port provided in restart_serial request")
  61. return jsonify({'error': 'No port provided'}), 400
  62. try:
  63. success = restart_serial(port)
  64. return jsonify({'success': success})
  65. except Exception as e:
  66. app.logger.error(f"Error restarting serial port: {str(e)}", exc_info=True)
  67. return jsonify({'error': str(e)}), 500
  68. @app.route('/serial_status', methods=['GET'])
  69. def api_serial_status():
  70. return jsonify(get_serial_status())
  71. # Pattern Routes
  72. @app.route('/list_theta_rho_files', methods=['GET'])
  73. def api_list_theta_rho_files():
  74. files = []
  75. for root, _, filenames in os.walk(THETA_RHO_DIR):
  76. for file in filenames:
  77. relative_path = os.path.relpath(os.path.join(root, file), THETA_RHO_DIR)
  78. files.append(relative_path)
  79. return jsonify(sorted(files))
  80. @app.route('/upload_theta_rho', methods=['POST'])
  81. def api_upload_theta_rho():
  82. custom_patterns_dir = os.path.join(THETA_RHO_DIR, 'custom_patterns')
  83. os.makedirs(custom_patterns_dir, exist_ok=True)
  84. file = request.files['file']
  85. if file:
  86. file.save(os.path.join(custom_patterns_dir, file.filename))
  87. return jsonify({'success': True})
  88. return jsonify({'success': False})
  89. @app.route('/run_theta_rho', methods=['POST'])
  90. def api_run_theta_rho():
  91. file_name = request.json.get('file_name')
  92. pre_execution = request.json.get('pre_execution')
  93. if not file_name:
  94. app.logger.error("No file name provided in run_theta_rho request")
  95. return jsonify({'error': 'No file name provided'}), 400
  96. file_path = os.path.join(THETA_RHO_DIR, file_name)
  97. if not os.path.exists(file_path):
  98. app.logger.error(f"File not found: {file_path}")
  99. return jsonify({'error': 'File not found'}), 404
  100. try:
  101. files_to_run = []
  102. if pre_execution in ['clear_in', 'clear_out', 'clear_sideway']:
  103. files_to_run.append(f'./patterns/clear_from_{pre_execution.split("_")[1]}.thr')
  104. files_to_run.append(file_path)
  105. threading.Thread(
  106. target=run_theta_rho_files,
  107. args=(files_to_run,),
  108. kwargs={'pause_time': 0, 'clear_pattern': None}
  109. ).start()
  110. return jsonify({'success': True})
  111. except Exception as e:
  112. app.logger.error(f"Error running theta rho file: {str(e)}", exc_info=True)
  113. return jsonify({'error': str(e)}), 500
  114. @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
  115. def api_run_specific_theta_rho_file(file_name):
  116. """Run a specific theta-rho file."""
  117. file_path = os.path.join(THETA_RHO_DIR, file_name)
  118. if not os.path.exists(file_path):
  119. return jsonify({'error': 'File not found'}), 404
  120. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  121. return jsonify({'success': True})
  122. @app.route('/preview_thr', methods=['POST'])
  123. def api_preview_thr():
  124. file_name = request.json.get('file_name')
  125. if not file_name:
  126. app.logger.error("No file name provided in preview_thr request")
  127. return jsonify({'error': 'No file name provided'}), 400
  128. # sometimes the frontend sends the complete path, not just the file name
  129. if file_name.startswith("./patterns"):
  130. file_name = file_name.split('/')[-1].split('\\')[-1]
  131. file_path = os.path.join(THETA_RHO_DIR, file_name)
  132. if not os.path.exists(file_path):
  133. app.logger.error(f"File not found: {file_path}")
  134. return jsonify({'error': 'File not found'}), 404
  135. try:
  136. coordinates = parse_theta_rho_file(file_path)
  137. return jsonify({'success': True, 'coordinates': coordinates})
  138. except Exception as e:
  139. app.logger.error(f"Error parsing theta rho file: {str(e)}", exc_info=True)
  140. return jsonify({'error': str(e)}), 500
  141. @app.route('/send_coordinate', methods=['POST'])
  142. def api_send_coordinate():
  143. try:
  144. data = request.json
  145. theta = data.get('theta')
  146. rho = data.get('rho')
  147. if theta is None or rho is None:
  148. return jsonify({"success": False, "error": "Theta and Rho are required"}), 400
  149. send_coordinate_batch([(theta, rho)])
  150. return jsonify({"success": True})
  151. except Exception as e:
  152. return jsonify({"success": False, "error": str(e)}), 500
  153. @app.route('/send_home', methods=['POST'])
  154. def api_send_home():
  155. """Send the HOME command to the Arduino."""
  156. try:
  157. send_command("HOME")
  158. return jsonify({'success': True})
  159. except Exception as e:
  160. return jsonify({'error': str(e)}), 500
  161. @app.route('/move_to_center', methods=['POST'])
  162. def api_move_to_center():
  163. """Move the sand table to the center position."""
  164. try:
  165. coordinates = [(0, 0)] # Center position
  166. send_coordinate_batch(coordinates)
  167. return jsonify({"success": True})
  168. except Exception as e:
  169. return jsonify({"success": False, "error": str(e)}), 500
  170. @app.route('/move_to_perimeter', methods=['POST'])
  171. def api_move_to_perimeter():
  172. """Move the sand table to the perimeter position."""
  173. try:
  174. MAX_RHO = 1
  175. coordinates = [(0, MAX_RHO)] # Perimeter position
  176. send_coordinate_batch(coordinates)
  177. return jsonify({"success": True})
  178. except Exception as e:
  179. return jsonify({"success": False, "error": str(e)}), 500
  180. @app.route('/set_speed', methods=['POST'])
  181. def api_set_speed():
  182. """Set the speed for the Arduino."""
  183. try:
  184. data = request.json
  185. speed = data.get('speed')
  186. if speed is None:
  187. return jsonify({"success": False, "error": "Speed is required"}), 400
  188. if not isinstance(speed, (int, float)) or speed <= 0:
  189. return jsonify({"success": False, "error": "Invalid speed value"}), 400
  190. # Send the SET_SPEED command to the Arduino
  191. send_command(f"SET_SPEED {speed}")
  192. return jsonify({"success": True, "speed": speed})
  193. except Exception as e:
  194. return jsonify({"success": False, "error": str(e)}), 500
  195. # Playlist Routes
  196. @app.route("/list_all_playlists", methods=["GET"])
  197. def api_list_all_playlists():
  198. playlist_names = list_all_playlists()
  199. return jsonify(playlist_names)
  200. @app.route("/get_playlist", methods=["GET"])
  201. def api_get_playlist():
  202. playlist_name = request.args.get("name", "")
  203. if not playlist_name:
  204. return jsonify({"error": "Missing playlist 'name' parameter"}), 400
  205. playlist = get_playlist(playlist_name)
  206. if not playlist:
  207. return jsonify({"error": f"Playlist '{playlist_name}' not found"}), 404
  208. return jsonify(playlist)
  209. @app.route("/create_playlist", methods=["POST"])
  210. def api_create_playlist():
  211. data = request.get_json()
  212. if not data or "name" not in data or "files" not in data:
  213. app.logger.error("Missing required fields in create_playlist request")
  214. return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
  215. success = create_playlist(data["name"], data["files"])
  216. return jsonify({
  217. "success": success,
  218. "message": f"Playlist '{data['name']}' created/updated"
  219. })
  220. @app.route("/modify_playlist", methods=["POST"])
  221. def api_modify_playlist():
  222. data = request.get_json()
  223. if not data or "name" not in data or "files" not in data:
  224. return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
  225. success = modify_playlist(data["name"], data["files"])
  226. return jsonify({
  227. "success": success,
  228. "message": f"Playlist '{data['name']}' updated"
  229. })
  230. @app.route("/delete_playlist", methods=["DELETE"])
  231. def api_delete_playlist():
  232. data = request.get_json()
  233. if not data or "name" not in data:
  234. return jsonify({"success": False, "error": "Missing 'name' field"}), 400
  235. success = delete_playlist(data["name"])
  236. if not success:
  237. return jsonify({"success": False, "error": f"Playlist '{data['name']}' not found"}), 404
  238. return jsonify({
  239. "success": True,
  240. "message": f"Playlist '{data['name']}' deleted"
  241. })
  242. @app.route('/add_to_playlist', methods=['POST'])
  243. def api_add_to_playlist():
  244. data = request.json
  245. playlist_name = data.get('playlist_name')
  246. pattern = data.get('pattern')
  247. success = add_to_playlist(playlist_name, pattern)
  248. if success:
  249. return jsonify(success=True)
  250. else:
  251. return jsonify(success=False, error='Playlist not found'), 404
  252. @app.route("/run_playlist", methods=["POST"])
  253. def api_run_playlist():
  254. data = request.get_json()
  255. if not data or "playlist_name" not in data:
  256. return jsonify({"success": False, "error": "Missing 'playlist_name' field"}), 400
  257. playlist = get_playlist(data["playlist_name"])
  258. if not playlist:
  259. return jsonify({"success": False, "error": f"Playlist '{data['playlist_name']}' not found"}), 404
  260. schedule_hours = None
  261. start_time = data.get("start_time")
  262. end_time = data.get("end_time")
  263. if start_time and end_time:
  264. try:
  265. start_time_obj = datetime.strptime(start_time, "%H:%M").time()
  266. end_time_obj = datetime.strptime(end_time, "%H:%M").time()
  267. if start_time_obj >= end_time_obj:
  268. return jsonify({"success": False, "error": "'start_time' must be earlier than 'end_time'"}), 400
  269. schedule_hours = (start_time_obj, end_time_obj)
  270. except ValueError:
  271. return jsonify({"success": False, "error": "Invalid time format. Use HH:MM (e.g., '09:30')"}), 400
  272. file_paths = [os.path.join(THETA_RHO_DIR, file) for file in playlist["files"]]
  273. if not file_paths:
  274. return jsonify({"success": False, "error": f"Playlist '{data['playlist_name']}' is empty"}), 400
  275. try:
  276. threading.Thread(
  277. target=run_theta_rho_files,
  278. args=(file_paths,),
  279. kwargs={
  280. 'pause_time': data.get("pause_time", 0),
  281. 'clear_pattern': data.get("clear_pattern"),
  282. 'run_mode': data.get("run_mode", "single"),
  283. 'shuffle': data.get("shuffle", False),
  284. 'schedule_hours': schedule_hours
  285. },
  286. daemon=True
  287. ).start()
  288. return jsonify({"success": True, "message": f"Playlist '{data['playlist_name']}' is now running."})
  289. except Exception as e:
  290. return jsonify({"success": False, "error": str(e)}), 500
  291. # Execution Control Routes
  292. @app.route('/stop_execution', methods=['POST'])
  293. def api_stop_execution():
  294. stop_execution()
  295. return jsonify({'success': True})
  296. @app.route('/pause_execution', methods=['POST'])
  297. def api_pause_execution():
  298. pause_execution()
  299. return jsonify({'success': True, 'message': 'Execution paused'})
  300. @app.route('/resume_execution', methods=['POST'])
  301. def api_resume_execution():
  302. resume_execution()
  303. return jsonify({'success': True, 'message': 'Execution resumed'})
  304. @app.route('/status', methods=['GET'])
  305. def api_get_status():
  306. return jsonify(get_execution_status())
  307. # Firmware Routes
  308. @app.route('/get_firmware_info', methods=['GET', 'POST'])
  309. def api_get_firmware_info():
  310. device_info = get_device_info()
  311. if request.method == "POST":
  312. motor_type = request.json.get("motorType")
  313. info, error = get_firmware_info(
  314. device_info['firmware_version'],
  315. device_info['driver_type'],
  316. motor_type
  317. )
  318. else:
  319. info, error = get_firmware_info(
  320. device_info['firmware_version'],
  321. device_info['driver_type']
  322. )
  323. if error:
  324. return jsonify({"success": False, "error": error}), 500
  325. return jsonify(info)
  326. @app.route('/flash_firmware', methods=['POST'])
  327. def api_flash_firmware():
  328. status = get_serial_status()
  329. if not status['connected']:
  330. return jsonify({"success": False, "error": "No Arduino connected or connection lost"}), 400
  331. motor_type = request.json.get("motorType")
  332. success, message = flash_firmware(status['port'], motor_type)
  333. if success:
  334. return jsonify({"success": True, "message": message})
  335. app.logger.error(message)
  336. return jsonify({"success": False, "error": message}), 500
  337. @app.route('/check_software_update', methods=['GET'])
  338. def api_check_updates():
  339. update_info = check_git_updates()
  340. return jsonify(update_info)
  341. @app.route('/update_software', methods=['POST'])
  342. def api_update_software():
  343. success, message, error_log = update_software()
  344. if success:
  345. return jsonify({"success": True})
  346. return jsonify({
  347. "success": False,
  348. "error": message,
  349. "details": error_log
  350. }), 500
  351. # File Management Routes
  352. @app.route('/download/<filename>', methods=['GET'])
  353. def download_file(filename):
  354. return send_from_directory(THETA_RHO_DIR, filename)
  355. @app.route('/delete_theta_rho_file', methods=['POST'])
  356. def api_delete_theta_rho_file():
  357. data = request.json
  358. file_name = data.get('file_name')
  359. if not file_name:
  360. app.logger.error("No file name provided in delete_theta_rho_file request")
  361. return jsonify({"success": False, "error": "No file name provided"}), 400
  362. file_path = os.path.join(THETA_RHO_DIR, file_name)
  363. if not os.path.exists(file_path):
  364. app.logger.error(f"File not found: {file_path}")
  365. return jsonify({"success": False, "error": "File not found"}), 404
  366. try:
  367. os.remove(file_path)
  368. return jsonify({"success": True})
  369. except Exception as e:
  370. app.logger.error(f"Error deleting theta rho file: {str(e)}", exc_info=True)
  371. return jsonify({"success": False, "error": str(e)}), 500
  372. if __name__ == '__main__':
  373. # Auto-connect to serial
  374. connect_to_serial()
  375. app.run(debug=False, host='0.0.0.0', port=8080)