app.py 34 KB


  1. from flask import Flask, request, jsonify, render_template
  2. import os
  3. import serial
  4. import time
  5. import random
  6. import threading
  7. import serial.tools.list_ports
  8. import math
  9. import json
  10. import subprocess
  11. app = Flask(__name__)
  12. # Configuration
  13. THETA_RHO_DIR = './patterns'
  14. IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
  15. CLEAR_PATTERNS = {
  16. "clear_from_in": "./patterns/clear_from_in.thr",
  17. "clear_from_out": "./patterns/clear_from_out.thr",
  18. "clear_sideway": "./patterns/clear_sideway.thr"
  19. }
  20. os.makedirs(THETA_RHO_DIR, exist_ok=True)
  21. # Serial connection (First available will be selected by default)
  22. ser = None
  23. ser_port = None # Global variable to store the serial port name
  24. stop_requested = False
  25. serial_lock = threading.Lock()
  26. PLAYLISTS_FILE = os.path.join(os.getcwd(), "playlists.json")
  27. OPTIONS_FILE = os.path.join(os.getcwd(), "options.json")
  28. MOTOR_TYPE_MAPPING = {
  29. "TMC2209": "./arduino_code_TMC2209/arduino_code_TMC2209.ino",
  30. "DRV8825": "./arduino_code/arduino_code.ino",
  31. "esp32": "./esp32/esp32.ino"
  32. }
  33. # Ensure the file exists and contains at least an empty JSON object
  34. if not os.path.exists(PLAYLISTS_FILE):
  35. with open(PLAYLISTS_FILE, "w") as f:
  36. json.dump({}, f, indent=2)
  37. # Ensure the file exists and contains at least an empty JSON object
  38. if not os.path.exists(OPTIONS_FILE):
  39. with open(PLAYLISTS_FILE, "w") as f:
  40. json.dump({}, f, indent=2)
  41. def get_ino_firmware_details(ino_file_path):
  42. """
  43. Extract firmware details, including version and motor type, from the given .ino file.
  44. Args:
  45. ino_file_path (str): Path to the .ino file.
  46. Returns:
  47. dict: Dictionary containing firmware details such as version and motor type, or None if not found.
  48. """
  49. try:
  50. if not ino_file_path:
  51. raise ValueError("Invalid path: ino_file_path is None or empty.")
  52. firmware_details = {"version": None, "motorType": None}
  53. with open(ino_file_path, "r") as file:
  54. for line in file:
  55. # Extract firmware version
  56. if "firmwareVersion" in line:
  57. start = line.find('"') + 1
  58. end = line.rfind('"')
  59. if start != -1 and end != -1 and start < end:
  60. firmware_details["version"] = line[start:end]
  61. # Extract motor type
  62. if "motorType" in line:
  63. start = line.find('"') + 1
  64. end = line.rfind('"')
  65. if start != -1 and end != -1 and start < end:
  66. firmware_details["motorType"] = line[start:end]
  67. if not firmware_details["version"]:
  68. print(f"Firmware version not found in file: {ino_file_path}")
  69. if not firmware_details["motorType"]:
  70. print(f"Motor type not found in file: {ino_file_path}")
  71. return firmware_details if any(firmware_details.values()) else None
  72. except FileNotFoundError:
  73. print(f"File not found: {ino_file_path}")
  74. return None
  75. except Exception as e:
  76. print(f"Error reading .ino file: {str(e)}")
  77. return None
  78. def list_serial_ports():
  79. """Return a list of available serial ports."""
  80. ports = serial.tools.list_ports.comports()
  81. return [port.device for port in ports if port.device not in IGNORE_PORTS]
  82. def connect_to_serial(port=None, baudrate=115200):
  83. """Automatically connect to the first available serial port or a specified port."""
  84. global ser, ser_port
  85. try:
  86. if port is None:
  87. ports = list_serial_ports()
  88. if not ports:
  89. print("No serial port connected")
  90. return False
  91. port = ports[0] # Auto-select the first available port
  92. with serial_lock:
  93. if ser and ser.is_open:
  94. ser.close()
  95. ser = serial.Serial(port, baudrate)
  96. ser_port = port # Store the connected port globally
  97. print(f"Connected to serial port: {port}")
  98. time.sleep(2) # Allow time for the connection to establish
  99. return True # Successfully connected
  100. except serial.SerialException as e:
  101. print(f"Failed to connect to serial port {port}: {e}")
  102. port = None # Reset the port to try the next available one
  103. print("Max retries reached. Could not connect to a serial port.")
  104. return False
  105. def disconnect_serial():
  106. """Disconnect the current serial connection."""
  107. global ser, ser_port
  108. if ser and ser.is_open:
  109. ser.close()
  110. ser = None
  111. ser_port = None # Reset the port name
  112. def restart_serial(port, baudrate=115200):
  113. """Restart the serial connection."""
  114. disconnect_serial()
  115. connect_to_serial(port, baudrate)
  116. def parse_theta_rho_file(file_path):
  117. """
  118. Parse a theta-rho file and return a list of (theta, rho) pairs.
  119. Normalizes the list so the first theta is always 0.
  120. """
  121. coordinates = []
  122. try:
  123. with open(file_path, 'r') as file:
  124. for line in file:
  125. line = line.strip()
  126. # Skip header or comment lines (starting with '#' or empty lines)
  127. if not line or line.startswith("#"):
  128. continue
  129. # Parse lines with theta and rho separated by spaces
  130. try:
  131. theta, rho = map(float, line.split())
  132. coordinates.append((theta, rho))
  133. except ValueError:
  134. print(f"Skipping invalid line: {line}")
  135. continue
  136. except Exception as e:
  137. print(f"Error reading file: {e}")
  138. return coordinates
  139. # ---- Normalization Step ----
  140. if coordinates:
  141. # Take the first coordinate's theta
  142. first_theta = coordinates[0][0]
  143. # Shift all thetas so the first coordinate has theta=0
  144. normalized = []
  145. for (theta, rho) in coordinates:
  146. normalized.append((theta - first_theta, rho))
  147. # Replace original list with normalized data
  148. coordinates = normalized
  149. return coordinates
  150. def send_coordinate_batch(ser, coordinates):
  151. """Send a batch of theta-rho pairs to the Arduino."""
  152. # print("Sending batch:", coordinates)
  153. batch_str = ";".join(f"{theta:.5f},{rho:.5f}" for theta, rho in coordinates) + ";\n"
  154. ser.write(batch_str.encode())
  155. def send_command(command):
  156. """Send a single command to the Arduino."""
  157. ser.write(f"{command}\n".encode())
  158. print(f"Sent: {command}")
  159. # Wait for "R" acknowledgment from Arduino
  160. while True:
  161. with serial_lock:
  162. if ser.in_waiting > 0:
  163. response = ser.readline().decode().strip()
  164. print(f"Arduino response: {response}")
  165. if response == "R":
  166. print("Command execution completed.")
  167. break
  168. def run_theta_rho_file(file_path):
  169. """Run a theta-rho file by sending data in optimized batches."""
  170. global stop_requested
  171. stop_requested = False
  172. coordinates = parse_theta_rho_file(file_path)
  173. if len(coordinates) < 2:
  174. print("Not enough coordinates for interpolation.")
  175. return
  176. # Optimize batch size for smoother execution
  177. batch_size = 10 # Smaller batches may smooth movement further
  178. for i in range(0, len(coordinates), batch_size):
  179. # Check stop_requested flag after sending the batch
  180. if stop_requested:
  181. print("Execution stopped by user after completing the current batch.")
  182. break
  183. batch = coordinates[i:i + batch_size]
  184. if i == 0:
  185. send_coordinate_batch(ser, batch)
  186. continue
  187. # Wait until Arduino is READY before sending the batch
  188. while True:
  189. with serial_lock:
  190. if ser.in_waiting > 0:
  191. response = ser.readline().decode().strip()
  192. if response == "R":
  193. send_coordinate_batch(ser, batch)
  194. break
  195. else:
  196. print(f"Arduino response: {response}")
  197. # Reset theta after execution or stopping
  198. reset_theta()
  199. ser.write("FINISHED\n".encode())
  200. def get_clear_pattern_file(pattern_name):
  201. """Return a .thr file path based on pattern_name."""
  202. if pattern_name == "random":
  203. # Randomly pick one of the three known patterns
  204. return random.choice(list(CLEAR_PATTERNS.values()))
  205. # If pattern_name is invalid or absent, default to 'clear_from_in'
  206. return CLEAR_PATTERNS.get(pattern_name, CLEAR_PATTERNS["clear_from_in"])
  207. def run_theta_rho_files(
  208. file_paths,
  209. pause_time=0,
  210. clear_pattern=None,
  211. run_mode="single",
  212. shuffle=False
  213. ):
  214. """
  215. Runs multiple .thr files in sequence with options for pausing, clearing, shuffling, and looping.
  216. Parameters:
  217. - file_paths (list): List of file paths to run.
  218. - pause_time (float): Seconds to pause between patterns.
  219. - clear_pattern (str): Specific clear pattern to run ("clear_in", "clear_out", "clear_sideway", or "random").
  220. - run_mode (str): "single" for one-time run or "indefinite" for looping.
  221. - shuffle (bool): Whether to shuffle the playlist before running.
  222. """
  223. global stop_requested
  224. stop_requested = False # Reset stop flag at the start
  225. if shuffle:
  226. random.shuffle(file_paths)
  227. print("Playlist shuffled.")
  228. while True:
  229. for idx, path in enumerate(file_paths):
  230. if stop_requested:
  231. print("Execution stopped before starting next pattern.")
  232. return
  233. if clear_pattern:
  234. if stop_requested:
  235. print("Execution stopped before running the next clear pattern.")
  236. return
  237. # Determine the clear pattern to run
  238. clear_file_path = get_clear_pattern_file(clear_pattern)
  239. print(f"Running clear pattern: {clear_file_path}")
  240. run_theta_rho_file(clear_file_path)
  241. if not stop_requested:
  242. # Run the main pattern
  243. print(f"Running pattern {idx + 1} of {len(file_paths)}: {path}")
  244. run_theta_rho_file(path)
  245. if idx < len(file_paths) -1:
  246. if stop_requested:
  247. print("Execution stopped before running the next clear pattern.")
  248. return
  249. # Pause after each pattern if requested
  250. if pause_time > 0:
  251. print(f"Pausing for {pause_time} seconds...")
  252. time.sleep(pause_time)
  253. # After completing the playlist
  254. if run_mode == "indefinite":
  255. print("Playlist completed. Restarting as per 'indefinite' run mode.")
  256. if pause_time > 0:
  257. print(f"Pausing for {pause_time} seconds before restarting...")
  258. time.sleep(pause_time)
  259. if shuffle:
  260. random.shuffle(file_paths)
  261. print("Playlist reshuffled for the next loop.")
  262. continue
  263. else:
  264. print("Playlist completed.")
  265. break
  266. # Reset theta after execution or stopping
  267. reset_theta()
  268. ser.write("FINISHED\n".encode())
  269. print("All requested patterns completed (or stopped).")
  270. def reset_theta():
  271. """Reset theta on the Arduino."""
  272. ser.write("RESET_THETA\n".encode())
  273. while True:
  274. with serial_lock:
  275. if ser.in_waiting > 0:
  276. response = ser.readline().decode().strip()
  277. print(f"Arduino response: {response}")
  278. if response == "THETA_RESET":
  279. print("Theta successfully reset.")
  280. break
  281. time.sleep(0.5) # Small delay to avoid busy waiting
  282. # Flask API Endpoints
  283. @app.route('/')
  284. def index():
  285. return render_template('index.html')
  286. @app.route('/list_serial_ports', methods=['GET'])
  287. def list_ports():
  288. return jsonify(list_serial_ports())
  289. @app.route('/connect_serial', methods=['POST'])
  290. def connect_serial():
  291. port = request.json.get('port')
  292. if not port:
  293. return jsonify({'error': 'No port provided'}), 400
  294. try:
  295. connect_to_serial(port)
  296. return jsonify({'success': True})
  297. except Exception as e:
  298. return jsonify({'error': str(e)}), 500
  299. @app.route('/disconnect_serial', methods=['POST'])
  300. def disconnect():
  301. try:
  302. disconnect_serial()
  303. return jsonify({'success': True})
  304. except Exception as e:
  305. return jsonify({'error': str(e)}), 500
  306. @app.route('/restart_serial', methods=['POST'])
  307. def restart():
  308. port = request.json.get('port')
  309. if not port:
  310. return jsonify({'error': 'No port provided'}), 400
  311. try:
  312. restart_serial(port)
  313. return jsonify({'success': True})
  314. except Exception as e:
  315. return jsonify({'error': str(e)}), 500
  316. @app.route('/list_theta_rho_files', methods=['GET'])
  317. def list_theta_rho_files():
  318. files = []
  319. for root, _, filenames in os.walk(THETA_RHO_DIR):
  320. for file in filenames:
  321. # Construct the relative file path
  322. relative_path = os.path.relpath(os.path.join(root, file), THETA_RHO_DIR)
  323. files.append(relative_path)
  324. return jsonify(sorted(files))
  325. @app.route('/upload_theta_rho', methods=['POST'])
  326. def upload_theta_rho():
  327. custom_patterns_dir = os.path.join(THETA_RHO_DIR, 'custom_patterns')
  328. os.makedirs(custom_patterns_dir, exist_ok=True) # Ensure the directory exists
  329. file = request.files['file']
  330. if file:
  331. file.save(os.path.join(custom_patterns_dir, file.filename))
  332. return jsonify({'success': True})
  333. return jsonify({'success': False})
  334. @app.route('/run_theta_rho', methods=['POST'])
  335. def run_theta_rho():
  336. file_name = request.json.get('file_name')
  337. pre_execution = request.json.get('pre_execution') # 'clear_in', 'clear_out', 'clear_sideway', or 'none'
  338. if not file_name:
  339. return jsonify({'error': 'No file name provided'}), 400
  340. file_path = os.path.join(THETA_RHO_DIR, file_name)
  341. if not os.path.exists(file_path):
  342. return jsonify({'error': 'File not found'}), 404
  343. try:
  344. # Build a list of files to run in sequence
  345. files_to_run = []
  346. if pre_execution == 'clear_in':
  347. files_to_run.append('./patterns/clear_from_in.thr')
  348. elif pre_execution == 'clear_out':
  349. files_to_run.append('./patterns/clear_from_out.thr')
  350. elif pre_execution == 'clear_sideway':
  351. files_to_run.append('./patterns/clear_sideway.thr')
  352. elif pre_execution == 'none':
  353. pass # No pre-execution action required
  354. # Finally, add the main file
  355. files_to_run.append(file_path)
  356. # Run them in one shot using run_theta_rho_files (blocking call)
  357. threading.Thread(
  358. target=run_theta_rho_files,
  359. args=(files_to_run,),
  360. kwargs={
  361. 'pause_time': 0,
  362. 'clear_pattern': None
  363. }
  364. ).start()
  365. return jsonify({'success': True})
  366. except Exception as e:
  367. return jsonify({'error': str(e)}), 500
  368. @app.route('/stop_execution', methods=['POST'])
  369. def stop_execution():
  370. global stop_requested
  371. stop_requested = True
  372. return jsonify({'success': True})
  373. @app.route('/send_home', methods=['POST'])
  374. def send_home():
  375. """Send the HOME command to the Arduino."""
  376. try:
  377. send_command("HOME")
  378. return jsonify({'success': True})
  379. except Exception as e:
  380. return jsonify({'error': str(e)}), 500
  381. @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
  382. def run_specific_theta_rho_file(file_name):
  383. """Run a specific theta-rho file."""
  384. file_path = os.path.join(THETA_RHO_DIR, file_name)
  385. if not os.path.exists(file_path):
  386. return jsonify({'error': 'File not found'}), 404
  387. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  388. return jsonify({'success': True})
  389. @app.route('/delete_theta_rho_file', methods=['POST'])
  390. def delete_theta_rho_file():
  391. data = request.json
  392. file_name = data.get('file_name')
  393. if not file_name:
  394. return jsonify({"success": False, "error": "No file name provided"}), 400
  395. file_path = os.path.join(THETA_RHO_DIR, file_name)
  396. if not os.path.exists(file_path):
  397. return jsonify({"success": False, "error": "File not found"}), 404
  398. try:
  399. os.remove(file_path)
  400. return jsonify({"success": True})
  401. except Exception as e:
  402. return jsonify({"success": False, "error": str(e)}), 500
  403. @app.route('/move_to_center', methods=['POST'])
  404. def move_to_center():
  405. """Move the sand table to the center position."""
  406. try:
  407. if ser is None or not ser.is_open:
  408. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  409. coordinates = [(0, 0)] # Center position
  410. send_coordinate_batch(ser, coordinates)
  411. return jsonify({"success": True})
  412. except Exception as e:
  413. return jsonify({"success": False, "error": str(e)}), 500
  414. @app.route('/move_to_perimeter', methods=['POST'])
  415. def move_to_perimeter():
  416. """Move the sand table to the perimeter position."""
  417. try:
  418. if ser is None or not ser.is_open:
  419. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  420. MAX_RHO = 1
  421. coordinates = [(0, MAX_RHO)] # Perimeter position
  422. send_coordinate_batch(ser, coordinates)
  423. return jsonify({"success": True})
  424. except Exception as e:
  425. return jsonify({"success": False, "error": str(e)}), 500
  426. @app.route('/preview_thr', methods=['POST'])
  427. def preview_thr():
  428. file_name = request.json.get('file_name')
  429. if not file_name:
  430. return jsonify({'error': 'No file name provided'}), 400
  431. file_path = os.path.join(THETA_RHO_DIR, file_name)
  432. if not os.path.exists(file_path):
  433. return jsonify({'error': 'File not found'}), 404
  434. try:
  435. # Parse the .thr file with transformations
  436. coordinates = parse_theta_rho_file(file_path)
  437. return jsonify({'success': True, 'coordinates': coordinates})
  438. except Exception as e:
  439. return jsonify({'error': str(e)}), 500
  440. @app.route('/send_coordinate', methods=['POST'])
  441. def send_coordinate():
  442. """Send a single (theta, rho) coordinate to the Arduino."""
  443. global ser
  444. if ser is None or not ser.is_open:
  445. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  446. try:
  447. data = request.json
  448. theta = data.get('theta')
  449. rho = data.get('rho')
  450. if theta is None or rho is None:
  451. return jsonify({"success": False, "error": "Theta and Rho are required"}), 400
  452. # Send the coordinate to the Arduino
  453. send_coordinate_batch(ser, [(theta, rho)])
  454. reset_theta()
  455. return jsonify({"success": True})
  456. except Exception as e:
  457. return jsonify({"success": False, "error": str(e)}), 500
  458. # Expose files for download if needed
  459. @app.route('/download/<filename>', methods=['GET'])
  460. def download_file(filename):
  461. """Download a file from the theta-rho directory."""
  462. return send_from_directory(THETA_RHO_DIR, filename)
  463. @app.route('/serial_status', methods=['GET'])
  464. def serial_status():
  465. global ser, ser_port
  466. return jsonify({
  467. 'connected': ser.is_open if ser else False,
  468. 'port': ser_port # Include the port name
  469. })
  470. if not os.path.exists(PLAYLISTS_FILE):
  471. with open(PLAYLISTS_FILE, "w") as f:
  472. json.dump({}, f, indent=2)
  473. def load_playlists():
  474. """
  475. Load the entire playlists dictionary from the JSON file.
  476. Returns something like: {
  477. "My Playlist": ["file1.thr", "file2.thr"],
  478. "Another": ["x.thr"]
  479. }
  480. """
  481. with open(PLAYLISTS_FILE, "r") as f:
  482. return json.load(f)
  483. def save_playlists(playlists_dict):
  484. """
  485. Save the entire playlists dictionary back to the JSON file.
  486. """
  487. with open(PLAYLISTS_FILE, "w") as f:
  488. json.dump(playlists_dict, f, indent=2)
  489. @app.route("/list_all_playlists", methods=["GET"])
  490. def list_all_playlists():
  491. """
  492. Returns a list of all playlist names.
  493. Example return: ["My Playlist", "Another Playlist"]
  494. """
  495. playlists_dict = load_playlists()
  496. playlist_names = list(playlists_dict.keys())
  497. return jsonify(playlist_names)
  498. @app.route("/get_playlist", methods=["GET"])
  499. def get_playlist():
  500. """
  501. GET /get_playlist?name=My%20Playlist
  502. Returns: { "name": "My Playlist", "files": [... ] }
  503. """
  504. playlist_name = request.args.get("name", "")
  505. if not playlist_name:
  506. return jsonify({"error": "Missing playlist 'name' parameter"}), 400
  507. playlists_dict = load_playlists()
  508. if playlist_name not in playlists_dict:
  509. return jsonify({"error": f"Playlist '{playlist_name}' not found"}), 404
  510. files = playlists_dict[playlist_name] # e.g. ["file1.thr", "file2.thr"]
  511. return jsonify({
  512. "name": playlist_name,
  513. "files": files
  514. })
  515. @app.route("/create_playlist", methods=["POST"])
  516. def create_playlist():
  517. """
  518. POST /create_playlist
  519. Body: { "name": "My Playlist", "files": ["file1.thr", "file2.thr"] }
  520. Creates or overwrites a playlist with the given name.
  521. """
  522. data = request.get_json()
  523. if not data or "name" not in data or "files" not in data:
  524. return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
  525. playlist_name = data["name"]
  526. files = data["files"]
  527. # Load all playlists
  528. playlists_dict = load_playlists()
  529. # Overwrite or create new
  530. playlists_dict[playlist_name] = files
  531. # Save changes
  532. save_playlists(playlists_dict)
  533. return jsonify({
  534. "success": True,
  535. "message": f"Playlist '{playlist_name}' created/updated"
  536. })
  537. @app.route("/modify_playlist", methods=["POST"])
  538. def modify_playlist():
  539. """
  540. POST /modify_playlist
  541. Body: { "name": "My Playlist", "files": ["file1.thr", "file2.thr"] }
  542. Updates (or creates) the existing playlist with a new file list.
  543. You can 404 if you only want to allow modifications to existing playlists.
  544. """
  545. data = request.get_json()
  546. if not data or "name" not in data or "files" not in data:
  547. return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
  548. playlist_name = data["name"]
  549. files = data["files"]
  550. # Load all playlists
  551. playlists_dict = load_playlists()
  552. # Optional: If you want to disallow creating a new playlist here:
  553. # if playlist_name not in playlists_dict:
  554. # return jsonify({"success": False, "error": f"Playlist '{playlist_name}' not found"}), 404
  555. # Overwrite or create new
  556. playlists_dict[playlist_name] = files
  557. # Save
  558. save_playlists(playlists_dict)
  559. return jsonify({"success": True, "message": f"Playlist '{playlist_name}' updated"})
  560. @app.route("/delete_playlist", methods=["DELETE"])
  561. def delete_playlist():
  562. """
  563. DELETE /delete_playlist
  564. Body: { "name": "My Playlist" }
  565. Removes the playlist from the single JSON file.
  566. """
  567. data = request.get_json()
  568. if not data or "name" not in data:
  569. return jsonify({"success": False, "error": "Missing 'name' field"}), 400
  570. playlist_name = data["name"]
  571. playlists_dict = load_playlists()
  572. if playlist_name not in playlists_dict:
  573. return jsonify({"success": False, "error": f"Playlist '{playlist_name}' not found"}), 404
  574. # Remove from dict
  575. del playlists_dict[playlist_name]
  576. save_playlists(playlists_dict)
  577. return jsonify({
  578. "success": True,
  579. "message": f"Playlist '{playlist_name}' deleted"
  580. })
  581. @app.route('/add_to_playlist', methods=['POST'])
  582. def add_to_playlist():
  583. data = request.json
  584. playlist_name = data.get('playlist_name')
  585. pattern = data.get('pattern')
  586. # Load existing playlists
  587. with open('playlists.json', 'r') as f:
  588. playlists = json.load(f)
  589. # Add pattern to the selected playlist
  590. if playlist_name in playlists:
  591. playlists[playlist_name].append(pattern)
  592. with open('playlists.json', 'w') as f:
  593. json.dump(playlists, f)
  594. return jsonify(success=True)
  595. else:
  596. return jsonify(success=False, error='Playlist not found'), 404
  597. @app.route("/run_playlist", methods=["POST"])
  598. def run_playlist():
  599. """
  600. POST /run_playlist
  601. Body (JSON):
  602. {
  603. "playlist_name": "My Playlist",
  604. "pause_time": 1.0, # Optional: seconds to pause between patterns
  605. "clear_pattern": "random", # Optional: "clear_in", "clear_out", "clear_sideway", or "random"
  606. "run_mode": "single", # 'single' or 'indefinite'
  607. "shuffle": True # true or false
  608. }
  609. """
  610. data = request.get_json()
  611. # Validate input
  612. if not data or "playlist_name" not in data:
  613. return jsonify({"success": False, "error": "Missing 'playlist_name' field"}), 400
  614. playlist_name = data["playlist_name"]
  615. pause_time = data.get("pause_time", 0)
  616. clear_pattern = data.get("clear_pattern", None)
  617. run_mode = data.get("run_mode", "single") # Default to 'single' run
  618. shuffle = data.get("shuffle", False) # Default to no shuffle
  619. # Validate pause_time
  620. if not isinstance(pause_time, (int, float)) or pause_time < 0:
  621. return jsonify({"success": False, "error": "'pause_time' must be a non-negative number"}), 400
  622. # Validate clear_pattern
  623. valid_patterns = ["clear_in", "clear_out", "clear_sideway", "random"]
  624. if clear_pattern not in valid_patterns:
  625. clear_pattern = None
  626. # Validate run_mode
  627. if run_mode not in ["single", "indefinite"]:
  628. return jsonify({"success": False, "error": "'run_mode' must be 'single' or 'indefinite'"}), 400
  629. # Validate shuffle
  630. if not isinstance(shuffle, bool):
  631. return jsonify({"success": False, "error": "'shuffle' must be a boolean value"}), 400
  632. # Load playlists
  633. playlists = load_playlists()
  634. if playlist_name not in playlists:
  635. return jsonify({"success": False, "error": f"Playlist '{playlist_name}' not found"}), 404
  636. file_paths = playlists[playlist_name]
  637. file_paths = [os.path.join(THETA_RHO_DIR, file) for file in file_paths]
  638. if not file_paths:
  639. return jsonify({"success": False, "error": f"Playlist '{playlist_name}' is empty"}), 400
  640. # Start the playlist execution in a separate thread
  641. try:
  642. threading.Thread(
  643. target=run_theta_rho_files,
  644. args=(file_paths,),
  645. kwargs={
  646. 'pause_time': pause_time,
  647. 'clear_pattern': clear_pattern,
  648. 'run_mode': run_mode,
  649. 'shuffle': shuffle
  650. },
  651. daemon=True # Daemonize thread to exit with the main program
  652. ).start()
  653. return jsonify({"success": True, "message": f"Playlist '{playlist_name}' is now running."})
  654. except Exception as e:
  655. return jsonify({"success": False, "error": str(e)}), 500
  656. @app.route('/set_speed', methods=['POST'])
  657. def set_speed():
  658. """Set the speed for the Arduino."""
  659. global ser
  660. if ser is None or not ser.is_open:
  661. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  662. try:
  663. # Parse the speed value from the request
  664. data = request.json
  665. speed = data.get('speed')
  666. if speed is None:
  667. return jsonify({"success": False, "error": "Speed is required"}), 400
  668. if not isinstance(speed, (int, float)) or speed <= 0:
  669. return jsonify({"success": False, "error": "Invalid speed value"}), 400
  670. # Send the SET_SPEED command to the Arduino
  671. command = f"SET_SPEED {speed}"
  672. send_command(command)
  673. return jsonify({"success": True, "speed": speed})
  674. except Exception as e:
  675. return jsonify({"success": False, "error": str(e)}), 500
  676. @app.route('/get_firmware_info', methods=['GET', 'POST'])
  677. def get_firmware_info():
  678. """
  679. Compare the installed firmware version and motor type with the one in the .ino file.
  680. """
  681. global ser
  682. if ser is None or not ser.is_open:
  683. return jsonify({"success": False, "error": "Arduino not connected or serial port not open"}), 400
  684. try:
  685. if request.method == "GET":
  686. # Attempt to retrieve installed firmware details from the Arduino
  687. ser.reset_input_buffer()
  688. ser.reset_output_buffer()
  689. ser.write(b"GET_VERSION\n")
  690. time.sleep(0.5)
  691. installed_version = 'Unknown'
  692. installed_type = 'Unknown'
  693. if ser.in_waiting > 0:
  694. response = ser.readline().decode().strip()
  695. if " | " in response:
  696. installed_version, installed_type = response.split(" | ", 1)
  697. # If Arduino provides valid details, proceed with comparison
  698. if installed_version != 'Unknown' and installed_type != 'Unknown':
  699. ino_path = MOTOR_TYPE_MAPPING.get(installed_type)
  700. firmware_details = get_ino_firmware_details(ino_path)
  701. if not firmware_details or not firmware_details.get("version") or not firmware_details.get("motorType"):
  702. return jsonify({"success": False, "error": "Failed to retrieve .ino firmware details"}), 500
  703. update_available = (
  704. installed_version != firmware_details["version"] or
  705. installed_type != firmware_details["motorType"]
  706. )
  707. return jsonify({
  708. "success": True,
  709. "installedVersion": installed_version,
  710. "installedType": installed_type,
  711. "inoVersion": firmware_details["version"],
  712. "inoType": firmware_details["motorType"],
  713. "updateAvailable": update_available
  714. })
  715. # If Arduino details are unknown, indicate the need for POST
  716. return jsonify({
  717. "success": True,
  718. "installedVersion": installed_version,
  719. "installedType": installed_type,
  720. "updateAvailable": False
  721. })
  722. if request.method == "POST":
  723. motor_type = request.json.get("motorType", None)
  724. if not motor_type or motor_type not in MOTOR_TYPE_MAPPING:
  725. return jsonify({"success": False, "error": "Invalid or missing motor type"}), 400
  726. ino_path = MOTOR_TYPE_MAPPING.get(motor_type)
  727. firmware_details = get_ino_firmware_details(ino_path)
  728. if not firmware_details or not firmware_details.get("version") or not firmware_details.get("motorType"):
  729. return jsonify({"success": False, "error": "Failed to retrieve .ino firmware details"}), 500
  730. return jsonify({
  731. "success": True,
  732. "installedVersion": 'Unknown',
  733. "installedType": motor_type,
  734. "inoVersion": firmware_details["version"],
  735. "inoType": firmware_details["motorType"],
  736. "updateAvailable": True
  737. })
  738. except Exception as e:
  739. return jsonify({"success": False, "error": str(e)}), 500
  740. @app.route('/flash_firmware', methods=['POST'])
  741. def flash_firmware():
  742. """
  743. Compile and flash the firmware to the connected Arduino.
  744. """
  745. global ser_port
  746. # Ensure the Arduino is connected
  747. if ser_port is None or ser is None or not ser.is_open:
  748. return jsonify({"success": False, "error": "No Arduino connected or connection lost"}), 400
  749. build_dir = "/tmp/arduino_build" # Temporary build directory
  750. try:
  751. data = request.json
  752. motor_type = data.get("motorType", None)
  753. # Validate motor type
  754. if not motor_type or motor_type not in MOTOR_TYPE_MAPPING:
  755. return jsonify({"success": False, "error": "Invalid or missing motor type"}), 400
  756. # Get the .ino file path based on the motor type
  757. ino_file_path = MOTOR_TYPE_MAPPING[motor_type]
  758. ino_file_name = os.path.basename(ino_file_path)
  759. # Install required libraries
  760. required_libraries = ["AccelStepper"] # AccelStepper includes MultiStepper
  761. for library in required_libraries:
  762. library_install_command = ["arduino-cli", "lib", "install", library]
  763. install_process = subprocess.run(library_install_command, capture_output=True, text=True)
  764. if install_process.returncode != 0:
  765. return jsonify({
  766. "success": False,
  767. "error": f"Library installation failed for {library}: {install_process.stderr}"
  768. }), 500
  769. # Step 1: Compile the .ino file to a .hex file
  770. compile_command = [
  771. "arduino-cli",
  772. "compile",
  773. "--fqbn", "arduino:avr:uno", # Use the detected FQBN
  774. "--output-dir", build_dir,
  775. ino_file_path
  776. ]
  777. compile_process = subprocess.run(compile_command, capture_output=True, text=True)
  778. if compile_process.returncode != 0:
  779. return jsonify({
  780. "success": False,
  781. "error": compile_process.stderr
  782. }), 500
  783. # Step 2: Flash the .hex file to the Arduino
  784. hex_file_path = os.path.join(build_dir, ino_file_name+".hex")
  785. flash_command = [
  786. "avrdude",
  787. "-v",
  788. "-c", "arduino", # Programmer type
  789. "-p", "atmega328p", # Microcontroller type
  790. "-P", ser_port, # Use the dynamic serial port
  791. "-b", "115200", # Baud rate
  792. "-D",
  793. "-U", f"flash:w:{hex_file_path}:i" # Flash memory write command
  794. ]
  795. flash_process = subprocess.run(flash_command, capture_output=True, text=True)
  796. if flash_process.returncode != 0:
  797. return jsonify({
  798. "success": False,
  799. "error": flash_process.stderr
  800. }), 500
  801. return jsonify({"success": True, "message": "Firmware flashed successfully"})
  802. except Exception as e:
  803. return jsonify({"success": False, "error": str(e)}), 500
  804. finally:
  805. # Clean up temporary files
  806. if os.path.exists(build_dir):
  807. for file in os.listdir(build_dir):
  808. os.remove(os.path.join(build_dir, file))
  809. os.rmdir(build_dir)
  810. if __name__ == '__main__':
  811. # Auto-connect to serial
  812. connect_to_serial()
  813. app.run(debug=True, host='0.0.0.0', port=8080)