app.py 29 KB


  1. from flask import Flask, request, jsonify, render_template
  2. import os
  3. import serial
  4. import time
  5. import random
  6. import threading
  7. import serial.tools.list_ports
  8. import math
  9. import json
  10. from datetime import datetime
  11. app = Flask(__name__)
  12. # Configuration
  13. THETA_RHO_DIR = './patterns'
  14. IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
  15. CLEAR_PATTERNS = {
  16. "clear_from_in": "./patterns/clear_from_in.thr",
  17. "clear_from_out": "./patterns/clear_from_out.thr",
  18. "clear_sideway": "./patterns/clear_sideway.thr"
  19. }
  20. os.makedirs(THETA_RHO_DIR, exist_ok=True)
  21. # Serial connection (First available will be selected by default)
  22. ser = None
  23. ser_port = None # Global variable to store the serial port name
  24. stop_requested = False
  25. pause_requested = False
  26. pause_condition = threading.Condition()
  27. serial_lock = threading.Lock()
  28. PLAYLISTS_FILE = os.path.join(os.getcwd(), "playlists.json")
  29. # Ensure the file exists and contains at least an empty JSON object
  30. if not os.path.exists(PLAYLISTS_FILE):
  31. with open(PLAYLISTS_FILE, "w") as f:
  32. json.dump({}, f, indent=2)
  33. def list_serial_ports():
  34. """Return a list of available serial ports."""
  35. ports = serial.tools.list_ports.comports()
  36. return [port.device for port in ports if port.device not in IGNORE_PORTS]
  37. def connect_to_serial(port=None, baudrate=115200):
  38. """Automatically connect to the first available serial port or a specified port."""
  39. global ser, ser_port
  40. try:
  41. if port is None:
  42. ports = list_serial_ports()
  43. if not ports:
  44. print("No serial port connected")
  45. return False
  46. port = ports[0] # Auto-select the first available port
  47. with serial_lock:
  48. if ser and ser.is_open:
  49. ser.close()
  50. ser = serial.Serial(port, baudrate)
  51. ser_port = port # Store the connected port globally
  52. print(f"Connected to serial port: {port}")
  53. time.sleep(2) # Allow time for the connection to establish
  54. return True # Successfully connected
  55. except serial.SerialException as e:
  56. print(f"Failed to connect to serial port {port}: {e}")
  57. port = None # Reset the port to try the next available one
  58. print("Max retries reached. Could not connect to a serial port.")
  59. return False
  60. def disconnect_serial():
  61. """Disconnect the current serial connection."""
  62. global ser, ser_port
  63. if ser and ser.is_open:
  64. ser.close()
  65. ser = None
  66. ser_port = None # Reset the port name
  67. def restart_serial(port, baudrate=115200):
  68. """Restart the serial connection."""
  69. disconnect_serial()
  70. connect_to_serial(port, baudrate)
  71. def parse_theta_rho_file(file_path):
  72. """
  73. Parse a theta-rho file and return a list of (theta, rho) pairs.
  74. Normalizes the list so the first theta is always 0.
  75. """
  76. coordinates = []
  77. try:
  78. with open(file_path, 'r') as file:
  79. for line in file:
  80. line = line.strip()
  81. # Skip header or comment lines (starting with '#' or empty lines)
  82. if not line or line.startswith("#"):
  83. continue
  84. # Parse lines with theta and rho separated by spaces
  85. try:
  86. theta, rho = map(float, line.split())
  87. coordinates.append((theta, rho))
  88. except ValueError:
  89. print(f"Skipping invalid line: {line}")
  90. continue
  91. except Exception as e:
  92. print(f"Error reading file: {e}")
  93. return coordinates
  94. # ---- Normalization Step ----
  95. if coordinates:
  96. # Take the first coordinate's theta
  97. first_theta = coordinates[0][0]
  98. # Shift all thetas so the first coordinate has theta=0
  99. normalized = []
  100. for (theta, rho) in coordinates:
  101. normalized.append((theta - first_theta, rho))
  102. # Replace original list with normalized data
  103. coordinates = normalized
  104. return coordinates
  105. def send_coordinate_batch(ser, coordinates):
  106. """Send a batch of theta-rho pairs to the Arduino."""
  107. # print("Sending batch:", coordinates)
  108. batch_str = ";".join(f"{theta:.5f},{rho:.5f}" for theta, rho in coordinates) + ";\n"
  109. ser.write(batch_str.encode())
  110. def send_command(command):
  111. """Send a single command to the Arduino."""
  112. ser.write(f"{command}\n".encode())
  113. print(f"Sent: {command}")
  114. # Wait for "R" acknowledgment from Arduino
  115. while True:
  116. with serial_lock:
  117. if ser.in_waiting > 0:
  118. response = ser.readline().decode().strip()
  119. print(f"Arduino response: {response}")
  120. if response == "R":
  121. print("Command execution completed.")
  122. break
  123. def wait_for_start_time(schedule_hours):
  124. """
  125. Keep checking every 30 seconds if the time is within the schedule to resume execution.
  126. """
  127. global pause_requested
  128. start_time, end_time = schedule_hours
  129. while pause_requested:
  130. now = datetime.now().time()
  131. if start_time <= now < end_time:
  132. print("Resuming execution: Within schedule.")
  133. pause_requested = False
  134. with pause_condition:
  135. pause_condition.notify_all()
  136. break # Exit the loop once resumed
  137. else:
  138. time.sleep(30) # Wait for 30 seconds before checking again
  139. # Function to check schedule based on start and end time
  140. def schedule_checker(schedule_hours):
  141. """
  142. Pauses/resumes execution based on a given time range.
  143. Parameters:
  144. - schedule_hours (tuple): (start_time, end_time) as `datetime.time` objects.
  145. """
  146. global pause_requested
  147. if not schedule_hours:
  148. return # No scheduling restriction
  149. start_time, end_time = schedule_hours
  150. now = datetime.now().time() # Get the current time as `datetime.time`
  151. # Check if we are currently within the scheduled time
  152. if start_time <= now < end_time:
  153. if pause_requested:
  154. print("Starting execution: Within schedule.")
  155. pause_requested = False # Resume execution
  156. with pause_condition:
  157. pause_condition.notify_all()
  158. else:
  159. if not pause_requested:
  160. print("Pausing execution: Outside schedule.")
  161. pause_requested = True # Pause execution
  162. # Start a background thread to periodically check for start time
  163. threading.Thread(target=wait_for_start_time, args=(schedule_hours,), daemon=True).start()
  164. def run_theta_rho_file(file_path, schedule_hours=None):
  165. """Run a theta-rho file by sending data in optimized batches."""
  166. global stop_requested
  167. stop_requested = False
  168. coordinates = parse_theta_rho_file(file_path)
  169. if len(coordinates) < 2:
  170. print("Not enough coordinates for interpolation.")
  171. return
  172. # Optimize batch size for smoother execution
  173. batch_size = 10 # Smaller batches may smooth movement further
  174. for i in range(0, len(coordinates), batch_size):
  175. # Check stop_requested flag after sending the batch
  176. if stop_requested:
  177. print("Execution stopped by user after completing the current batch.")
  178. break
  179. with pause_condition:
  180. while pause_requested:
  181. print("Execution paused...")
  182. pause_condition.wait() # This will block execution until notified
  183. batch = coordinates[i:i + batch_size]
  184. if i == 0:
  185. send_coordinate_batch(ser, batch)
  186. continue
  187. # Wait until Arduino is READY before sending the batch
  188. while True:
  189. schedule_checker(schedule_hours)
  190. with serial_lock:
  191. if ser.in_waiting > 0:
  192. response = ser.readline().decode().strip()
  193. if response == "R":
  194. send_coordinate_batch(ser, batch)
  195. break
  196. else:
  197. print(f"Arduino response: {response}")
  198. # Reset theta after execution or stopping
  199. reset_theta()
  200. ser.write("FINISHED\n".encode())
  201. def get_clear_pattern_file(pattern_name):
  202. """Return a .thr file path based on pattern_name."""
  203. if pattern_name == "random":
  204. # Randomly pick one of the three known patterns
  205. return random.choice(list(CLEAR_PATTERNS.values()))
  206. # If pattern_name is invalid or absent, default to 'clear_from_in'
  207. return CLEAR_PATTERNS.get(pattern_name, CLEAR_PATTERNS["clear_from_in"])
  208. def run_theta_rho_files(
  209. file_paths,
  210. pause_time=0,
  211. clear_pattern=None,
  212. run_mode="single",
  213. shuffle=False,
  214. schedule_hours=None
  215. ):
  216. """
  217. Runs multiple .thr files in sequence with options for pausing, clearing, shuffling, and looping.
  218. Parameters:
  219. - file_paths (list): List of file paths to run.
  220. - pause_time (float): Seconds to pause between patterns.
  221. - clear_pattern (str): Specific clear pattern to run ("clear_in", "clear_out", "clear_sideway", or "random").
  222. - run_mode (str): "single" for one-time run or "indefinite" for looping.
  223. - shuffle (bool): Whether to shuffle the playlist before running.
  224. """
  225. global stop_requested
  226. stop_requested = False # Reset stop flag at the start
  227. if shuffle:
  228. random.shuffle(file_paths)
  229. print("Playlist shuffled.")
  230. while True:
  231. for idx, path in enumerate(file_paths):
  232. schedule_checker(schedule_hours)
  233. if stop_requested:
  234. print("Execution stopped before starting next pattern.")
  235. return
  236. if clear_pattern:
  237. if stop_requested:
  238. print("Execution stopped before running the next clear pattern.")
  239. return
  240. # Determine the clear pattern to run
  241. clear_file_path = get_clear_pattern_file(clear_pattern)
  242. print(f"Running clear pattern: {clear_file_path}")
  243. run_theta_rho_file(clear_file_path, schedule_hours)
  244. if not stop_requested:
  245. # Run the main pattern
  246. print(f"Running pattern {idx + 1} of {len(file_paths)}: {path}")
  247. run_theta_rho_file(path, schedule_hours)
  248. if idx < len(file_paths) -1:
  249. if stop_requested:
  250. print("Execution stopped before running the next clear pattern.")
  251. return
  252. # Pause after each pattern if requested
  253. if pause_time > 0:
  254. print(f"Pausing for {pause_time} seconds...")
  255. time.sleep(pause_time)
  256. # After completing the playlist
  257. if run_mode == "indefinite":
  258. print("Playlist completed. Restarting as per 'indefinite' run mode.")
  259. if pause_time > 0:
  260. print(f"Pausing for {pause_time} seconds before restarting...")
  261. time.sleep(pause_time)
  262. if shuffle:
  263. random.shuffle(file_paths)
  264. print("Playlist reshuffled for the next loop.")
  265. continue
  266. else:
  267. print("Playlist completed.")
  268. break
  269. # Reset theta after execution or stopping
  270. reset_theta()
  271. ser.write("FINISHED\n".encode())
  272. print("All requested patterns completed (or stopped).")
  273. def reset_theta():
  274. """Reset theta on the Arduino."""
  275. ser.write("RESET_THETA\n".encode())
  276. while True:
  277. with serial_lock:
  278. if ser.in_waiting > 0:
  279. response = ser.readline().decode().strip()
  280. print(f"Arduino response: {response}")
  281. if response == "THETA_RESET":
  282. print("Theta successfully reset.")
  283. break
  284. time.sleep(0.5) # Small delay to avoid busy waiting
  285. # Flask API Endpoints
  286. @app.route('/')
  287. def index():
  288. return render_template('index.html')
  289. @app.route('/list_serial_ports', methods=['GET'])
  290. def list_ports():
  291. return jsonify(list_serial_ports())
  292. @app.route('/connect_serial', methods=['POST'])
  293. def connect_serial():
  294. port = request.json.get('port')
  295. if not port:
  296. return jsonify({'error': 'No port provided'}), 400
  297. try:
  298. connect_to_serial(port)
  299. return jsonify({'success': True})
  300. except Exception as e:
  301. return jsonify({'error': str(e)}), 500
  302. @app.route('/disconnect_serial', methods=['POST'])
  303. def disconnect():
  304. try:
  305. disconnect_serial()
  306. return jsonify({'success': True})
  307. except Exception as e:
  308. return jsonify({'error': str(e)}), 500
  309. @app.route('/restart_serial', methods=['POST'])
  310. def restart():
  311. port = request.json.get('port')
  312. if not port:
  313. return jsonify({'error': 'No port provided'}), 400
  314. try:
  315. restart_serial(port)
  316. return jsonify({'success': True})
  317. except Exception as e:
  318. return jsonify({'error': str(e)}), 500
  319. @app.route('/list_theta_rho_files', methods=['GET'])
  320. def list_theta_rho_files():
  321. files = []
  322. for root, _, filenames in os.walk(THETA_RHO_DIR):
  323. for file in filenames:
  324. # Construct the relative file path
  325. relative_path = os.path.relpath(os.path.join(root, file), THETA_RHO_DIR)
  326. files.append(relative_path)
  327. return jsonify(sorted(files))
  328. @app.route('/upload_theta_rho', methods=['POST'])
  329. def upload_theta_rho():
  330. custom_patterns_dir = os.path.join(THETA_RHO_DIR, 'custom_patterns')
  331. os.makedirs(custom_patterns_dir, exist_ok=True) # Ensure the directory exists
  332. file = request.files['file']
  333. if file:
  334. file.save(os.path.join(custom_patterns_dir, file.filename))
  335. return jsonify({'success': True})
  336. return jsonify({'success': False})
  337. @app.route('/run_theta_rho', methods=['POST'])
  338. def run_theta_rho():
  339. file_name = request.json.get('file_name')
  340. pre_execution = request.json.get('pre_execution') # 'clear_in', 'clear_out', 'clear_sideway', or 'none'
  341. if not file_name:
  342. return jsonify({'error': 'No file name provided'}), 400
  343. file_path = os.path.join(THETA_RHO_DIR, file_name)
  344. if not os.path.exists(file_path):
  345. return jsonify({'error': 'File not found'}), 404
  346. try:
  347. # Build a list of files to run in sequence
  348. files_to_run = []
  349. if pre_execution == 'clear_in':
  350. files_to_run.append('./patterns/clear_from_in.thr')
  351. elif pre_execution == 'clear_out':
  352. files_to_run.append('./patterns/clear_from_out.thr')
  353. elif pre_execution == 'clear_sideway':
  354. files_to_run.append('./patterns/clear_sideway.thr')
  355. elif pre_execution == 'none':
  356. pass # No pre-execution action required
  357. # Finally, add the main file
  358. files_to_run.append(file_path)
  359. # Run them in one shot using run_theta_rho_files (blocking call)
  360. threading.Thread(
  361. target=run_theta_rho_files,
  362. args=(files_to_run,),
  363. kwargs={
  364. 'pause_time': 0,
  365. 'clear_pattern': None
  366. }
  367. ).start()
  368. return jsonify({'success': True})
  369. except Exception as e:
  370. return jsonify({'error': str(e)}), 500
  371. @app.route('/stop_execution', methods=['POST'])
  372. def stop_execution():
  373. global stop_requested
  374. stop_requested = True
  375. return jsonify({'success': True})
  376. @app.route('/send_home', methods=['POST'])
  377. def send_home():
  378. """Send the HOME command to the Arduino."""
  379. try:
  380. send_command("HOME")
  381. return jsonify({'success': True})
  382. except Exception as e:
  383. return jsonify({'error': str(e)}), 500
  384. @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
  385. def run_specific_theta_rho_file(file_name):
  386. """Run a specific theta-rho file."""
  387. file_path = os.path.join(THETA_RHO_DIR, file_name)
  388. if not os.path.exists(file_path):
  389. return jsonify({'error': 'File not found'}), 404
  390. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  391. return jsonify({'success': True})
  392. @app.route('/delete_theta_rho_file', methods=['POST'])
  393. def delete_theta_rho_file():
  394. data = request.json
  395. file_name = data.get('file_name')
  396. if not file_name:
  397. return jsonify({"success": False, "error": "No file name provided"}), 400
  398. file_path = os.path.join(THETA_RHO_DIR, file_name)
  399. if not os.path.exists(file_path):
  400. return jsonify({"success": False, "error": "File not found"}), 404
  401. try:
  402. os.remove(file_path)
  403. return jsonify({"success": True})
  404. except Exception as e:
  405. return jsonify({"success": False, "error": str(e)}), 500
  406. @app.route('/move_to_center', methods=['POST'])
  407. def move_to_center():
  408. """Move the sand table to the center position."""
  409. try:
  410. if ser is None or not ser.is_open:
  411. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  412. coordinates = [(0, 0)] # Center position
  413. send_coordinate_batch(ser, coordinates)
  414. return jsonify({"success": True})
  415. except Exception as e:
  416. return jsonify({"success": False, "error": str(e)}), 500
  417. @app.route('/move_to_perimeter', methods=['POST'])
  418. def move_to_perimeter():
  419. """Move the sand table to the perimeter position."""
  420. try:
  421. if ser is None or not ser.is_open:
  422. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  423. MAX_RHO = 1
  424. coordinates = [(0, MAX_RHO)] # Perimeter position
  425. send_coordinate_batch(ser, coordinates)
  426. return jsonify({"success": True})
  427. except Exception as e:
  428. return jsonify({"success": False, "error": str(e)}), 500
  429. @app.route('/preview_thr', methods=['POST'])
  430. def preview_thr():
  431. file_name = request.json.get('file_name')
  432. if not file_name:
  433. return jsonify({'error': 'No file name provided'}), 400
  434. file_path = os.path.join(THETA_RHO_DIR, file_name)
  435. if not os.path.exists(file_path):
  436. return jsonify({'error': 'File not found'}), 404
  437. try:
  438. # Parse the .thr file with transformations
  439. coordinates = parse_theta_rho_file(file_path)
  440. return jsonify({'success': True, 'coordinates': coordinates})
  441. except Exception as e:
  442. return jsonify({'error': str(e)}), 500
  443. @app.route('/send_coordinate', methods=['POST'])
  444. def send_coordinate():
  445. """Send a single (theta, rho) coordinate to the Arduino."""
  446. global ser
  447. if ser is None or not ser.is_open:
  448. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  449. try:
  450. data = request.json
  451. theta = data.get('theta')
  452. rho = data.get('rho')
  453. if theta is None or rho is None:
  454. return jsonify({"success": False, "error": "Theta and Rho are required"}), 400
  455. # Send the coordinate to the Arduino
  456. send_coordinate_batch(ser, [(theta, rho)])
  457. return jsonify({"success": True})
  458. except Exception as e:
  459. return jsonify({"success": False, "error": str(e)}), 500
  460. # Expose files for download if needed
  461. @app.route('/download/<filename>', methods=['GET'])
  462. def download_file(filename):
  463. """Download a file from the theta-rho directory."""
  464. return send_from_directory(THETA_RHO_DIR, filename)
  465. @app.route('/serial_status', methods=['GET'])
  466. def serial_status():
  467. global ser, ser_port
  468. return jsonify({
  469. 'connected': ser.is_open if ser else False,
  470. 'port': ser_port # Include the port name
  471. })
  472. @app.route('/pause_execution', methods=['POST'])
  473. def pause_execution():
  474. """Pause the current execution."""
  475. global pause_requested
  476. with pause_condition:
  477. pause_requested = True
  478. return jsonify({'success': True, 'message': 'Execution paused'})
  479. @app.route('/resume_execution', methods=['POST'])
  480. def resume_execution():
  481. """Resume execution after pausing."""
  482. global pause_requested
  483. with pause_condition:
  484. pause_requested = False
  485. pause_condition.notify_all() # Unblock the waiting thread
  486. return jsonify({'success': True, 'message': 'Execution resumed'})
  487. def load_playlists():
  488. """
  489. Load the entire playlists dictionary from the JSON file.
  490. Returns something like: {
  491. "My Playlist": ["file1.thr", "file2.thr"],
  492. "Another": ["x.thr"]
  493. }
  494. """
  495. with open(PLAYLISTS_FILE, "r") as f:
  496. return json.load(f)
  497. def save_playlists(playlists_dict):
  498. """
  499. Save the entire playlists dictionary back to the JSON file.
  500. """
  501. with open(PLAYLISTS_FILE, "w") as f:
  502. json.dump(playlists_dict, f, indent=2)
  503. @app.route("/list_all_playlists", methods=["GET"])
  504. def list_all_playlists():
  505. """
  506. Returns a list of all playlist names.
  507. Example return: ["My Playlist", "Another Playlist"]
  508. """
  509. playlists_dict = load_playlists()
  510. playlist_names = list(playlists_dict.keys())
  511. return jsonify(playlist_names)
  512. @app.route("/get_playlist", methods=["GET"])
  513. def get_playlist():
  514. """
  515. GET /get_playlist?name=My%20Playlist
  516. Returns: { "name": "My Playlist", "files": [... ] }
  517. """
  518. playlist_name = request.args.get("name", "")
  519. if not playlist_name:
  520. return jsonify({"error": "Missing playlist 'name' parameter"}), 400
  521. playlists_dict = load_playlists()
  522. if playlist_name not in playlists_dict:
  523. return jsonify({"error": f"Playlist '{playlist_name}' not found"}), 404
  524. files = playlists_dict[playlist_name] # e.g. ["file1.thr", "file2.thr"]
  525. return jsonify({
  526. "name": playlist_name,
  527. "files": files
  528. })
  529. @app.route("/create_playlist", methods=["POST"])
  530. def create_playlist():
  531. """
  532. POST /create_playlist
  533. Body: { "name": "My Playlist", "files": ["file1.thr", "file2.thr"] }
  534. Creates or overwrites a playlist with the given name.
  535. """
  536. data = request.get_json()
  537. if not data or "name" not in data or "files" not in data:
  538. return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
  539. playlist_name = data["name"]
  540. files = data["files"]
  541. # Load all playlists
  542. playlists_dict = load_playlists()
  543. # Overwrite or create new
  544. playlists_dict[playlist_name] = files
  545. # Save changes
  546. save_playlists(playlists_dict)
  547. return jsonify({
  548. "success": True,
  549. "message": f"Playlist '{playlist_name}' created/updated"
  550. })
  551. @app.route("/modify_playlist", methods=["POST"])
  552. def modify_playlist():
  553. """
  554. POST /modify_playlist
  555. Body: { "name": "My Playlist", "files": ["file1.thr", "file2.thr"] }
  556. Updates (or creates) the existing playlist with a new file list.
  557. You can 404 if you only want to allow modifications to existing playlists.
  558. """
  559. data = request.get_json()
  560. if not data or "name" not in data or "files" not in data:
  561. return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
  562. playlist_name = data["name"]
  563. files = data["files"]
  564. # Load all playlists
  565. playlists_dict = load_playlists()
  566. # Optional: If you want to disallow creating a new playlist here:
  567. # if playlist_name not in playlists_dict:
  568. # return jsonify({"success": False, "error": f"Playlist '{playlist_name}' not found"}), 404
  569. # Overwrite or create new
  570. playlists_dict[playlist_name] = files
  571. # Save
  572. save_playlists(playlists_dict)
  573. return jsonify({"success": True, "message": f"Playlist '{playlist_name}' updated"})
  574. @app.route("/delete_playlist", methods=["DELETE"])
  575. def delete_playlist():
  576. """
  577. DELETE /delete_playlist
  578. Body: { "name": "My Playlist" }
  579. Removes the playlist from the single JSON file.
  580. """
  581. data = request.get_json()
  582. if not data or "name" not in data:
  583. return jsonify({"success": False, "error": "Missing 'name' field"}), 400
  584. playlist_name = data["name"]
  585. playlists_dict = load_playlists()
  586. if playlist_name not in playlists_dict:
  587. return jsonify({"success": False, "error": f"Playlist '{playlist_name}' not found"}), 404
  588. # Remove from dict
  589. del playlists_dict[playlist_name]
  590. save_playlists(playlists_dict)
  591. return jsonify({
  592. "success": True,
  593. "message": f"Playlist '{playlist_name}' deleted"
  594. })
  595. @app.route('/add_to_playlist', methods=['POST'])
  596. def add_to_playlist():
  597. data = request.json
  598. playlist_name = data.get('playlist_name')
  599. pattern = data.get('pattern')
  600. # Load existing playlists
  601. with open('playlists.json', 'r') as f:
  602. playlists = json.load(f)
  603. # Add pattern to the selected playlist
  604. if playlist_name in playlists:
  605. playlists[playlist_name].append(pattern)
  606. with open('playlists.json', 'w') as f:
  607. json.dump(playlists, f)
  608. return jsonify(success=True)
  609. else:
  610. return jsonify(success=False, error='Playlist not found'), 404
  611. @app.route("/run_playlist", methods=["POST"])
  612. def run_playlist():
  613. """
  614. POST /run_playlist
  615. Body (JSON):
  616. {
  617. "playlist_name": "My Playlist",
  618. "pause_time": 1.0, # Optional: seconds to pause between patterns
  619. "clear_pattern": "random", # Optional: "clear_in", "clear_out", "clear_sideway", or "random"
  620. "run_mode": "single", # 'single' or 'indefinite'
  621. "shuffle": True # true or false
  622. "start_time": ""
  623. "end_time": ""
  624. }
  625. """
  626. data = request.get_json()
  627. # Validate input
  628. if not data or "playlist_name" not in data:
  629. return jsonify({"success": False, "error": "Missing 'playlist_name' field"}), 400
  630. playlist_name = data["playlist_name"]
  631. pause_time = data.get("pause_time", 0)
  632. clear_pattern = data.get("clear_pattern", None)
  633. run_mode = data.get("run_mode", "single") # Default to 'single' run
  634. shuffle = data.get("shuffle", False) # Default to no shuffle
  635. start_time = data.get("start_time", None)
  636. end_time = data.get("end_time", None)
  637. # Validate pause_time
  638. if not isinstance(pause_time, (int, float)) or pause_time < 0:
  639. return jsonify({"success": False, "error": "'pause_time' must be a non-negative number"}), 400
  640. # Validate clear_pattern
  641. valid_patterns = ["clear_in", "clear_out", "clear_sideway", "random"]
  642. if clear_pattern not in valid_patterns:
  643. clear_pattern = None
  644. # Validate run_mode
  645. if run_mode not in ["single", "indefinite"]:
  646. return jsonify({"success": False, "error": "'run_mode' must be 'single' or 'indefinite'"}), 400
  647. # Validate shuffle
  648. if not isinstance(shuffle, bool):
  649. return jsonify({"success": False, "error": "'shuffle' must be a boolean value"}), 400
  650. schedule_hours = None
  651. if start_time and end_time:
  652. try:
  653. # Convert HH:MM to datetime.time objects
  654. start_time_obj = datetime.strptime(start_time, "%H:%M").time()
  655. end_time_obj = datetime.strptime(end_time, "%H:%M").time()
  656. # Ensure start_time is before end_time
  657. if start_time_obj >= end_time_obj:
  658. return jsonify({"success": False, "error": "'start_time' must be earlier than 'end_time'"}), 400
  659. # Create schedule tuple with full time
  660. schedule_hours = (start_time_obj, end_time_obj)
  661. except ValueError:
  662. return jsonify({"success": False, "error": "Invalid time format. Use HH:MM (e.g., '09:30')"}), 400
  663. # Load playlists
  664. playlists = load_playlists()
  665. if playlist_name not in playlists:
  666. return jsonify({"success": False, "error": f"Playlist '{playlist_name}' not found"}), 404
  667. file_paths = playlists[playlist_name]
  668. file_paths = [os.path.join(THETA_RHO_DIR, file) for file in file_paths]
  669. if not file_paths:
  670. return jsonify({"success": False, "error": f"Playlist '{playlist_name}' is empty"}), 400
  671. # Start the playlist execution in a separate thread
  672. try:
  673. threading.Thread(
  674. target=run_theta_rho_files,
  675. args=(file_paths,),
  676. kwargs={
  677. 'pause_time': pause_time,
  678. 'clear_pattern': clear_pattern,
  679. 'run_mode': run_mode,
  680. 'shuffle': shuffle,
  681. 'schedule_hours': schedule_hours
  682. },
  683. daemon=True # Daemonize thread to exit with the main program
  684. ).start()
  685. return jsonify({"success": True, "message": f"Playlist '{playlist_name}' is now running."})
  686. except Exception as e:
  687. return jsonify({"success": False, "error": str(e)}), 500
  688. @app.route('/set_speed', methods=['POST'])
  689. def set_speed():
  690. """Set the speed for the Arduino."""
  691. global ser
  692. if ser is None or not ser.is_open:
  693. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  694. try:
  695. # Parse the speed value from the request
  696. data = request.json
  697. speed = data.get('speed')
  698. if speed is None:
  699. return jsonify({"success": False, "error": "Speed is required"}), 400
  700. if not isinstance(speed, (int, float)) or speed <= 0:
  701. return jsonify({"success": False, "error": "Invalid speed value"}), 400
  702. # Send the SET_SPEED command to the Arduino
  703. command = f"SET_SPEED {speed}"
  704. send_command(command)
  705. return jsonify({"success": True, "speed": speed})
  706. except Exception as e:
  707. return jsonify({"success": False, "error": str(e)}), 500
  708. if __name__ == '__main__':
  709. # Auto-connect to serial
  710. connect_to_serial()
  711. app.run(debug=True, host='0.0.0.0', port=8080)