1
0

app.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. from flask import Flask, request, jsonify, render_template
  2. import os
  3. import serial
  4. import time
  5. import threading
  6. import serial.tools.list_ports
  7. import math
  8. import json
  9. app = Flask(__name__)
  10. # Theta-rho directory
  11. THETA_RHO_DIR = './patterns'
  12. IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
  13. os.makedirs(THETA_RHO_DIR, exist_ok=True)
  14. # Serial connection (default None, will be set by user)
  15. ser = None
  16. ser_port = None # Global variable to store the serial port name
  17. stop_requested = False
  18. PLAYLISTS_FILE = 'playlists.json'
  19. def list_serial_ports():
  20. """Return a list of available serial ports."""
  21. ports = serial.tools.list_ports.comports()
  22. return [port.device for port in ports if port.device not in IGNORE_PORTS]
  23. def connect_to_serial(port, baudrate=115200):
  24. """Connect to the specified serial port."""
  25. global ser, ser_port
  26. if ser and ser.is_open:
  27. ser.close()
  28. ser = serial.Serial(port, baudrate)
  29. ser_port = port # Store the connected port globally
  30. time.sleep(2) # Allow time for the connection to establish
  31. def disconnect_serial():
  32. """Disconnect the current serial connection."""
  33. global ser, ser_port
  34. if ser and ser.is_open:
  35. ser.close()
  36. ser = None
  37. ser_port = None # Reset the port name
  38. def restart_serial(port, baudrate=115200):
  39. """Restart the serial connection."""
  40. disconnect_serial()
  41. connect_to_serial(port, baudrate)
  42. def parse_theta_rho_file(file_path, apply_transformations=False):
  43. """
  44. Parse a theta-rho file and return a list of (theta, rho) pairs.
  45. Optionally apply transformations (rotation and mirroring).
  46. """
  47. coordinates = []
  48. try:
  49. with open(file_path, 'r') as file:
  50. for line in file:
  51. line = line.strip()
  52. # Skip header or comment lines (starting with '#' or empty lines)
  53. if not line or line.startswith("#"):
  54. continue
  55. # Parse lines with theta and rho separated by spaces
  56. try:
  57. theta, rho = map(float, line.split())
  58. if apply_transformations:
  59. # Rotate 90 degrees clockwise
  60. theta_rotated = theta - (math.pi / 2)
  61. if theta_rotated < 0: # Ensure theta stays within [0, 2π)
  62. theta_rotated += 2 * math.pi
  63. # Apply vertical mirror (negate theta)
  64. theta_mirrored = -theta_rotated
  65. if theta_mirrored < 0: # Ensure theta stays within [0, 2π)
  66. theta_mirrored += 2 * math.pi
  67. coordinates.append((theta_mirrored, rho))
  68. else:
  69. coordinates.append((theta, rho))
  70. except ValueError:
  71. print(f"Skipping invalid line: {line}")
  72. except Exception as e:
  73. print(f"Error reading file: {e}")
  74. return coordinates
  75. def send_coordinate_batch(ser, coordinates):
  76. """Send a batch of theta-rho pairs to the Arduino."""
  77. # print("Sending batch:", coordinates)
  78. batch_str = ";".join(f"{theta:.3f},{rho:.3f}" for theta, rho in coordinates) + ";\n"
  79. ser.write(batch_str.encode())
  80. def send_command(command):
  81. """Send a single command to the Arduino."""
  82. ser.write(f"{command}\n".encode())
  83. print(f"Sent: {command}")
  84. # Wait for "DONE" acknowledgment from Arduino
  85. while True:
  86. if ser.in_waiting > 0:
  87. response = ser.readline().decode().strip()
  88. print(f"Arduino response: {response}")
  89. if response == "DONE":
  90. print("Command execution completed.")
  91. break
  92. # time.sleep(0.5) # Small delay to avoid busy waiting
  93. def run_theta_rho_file(file_path):
  94. """Run a theta-rho file by sending data in optimized batches."""
  95. global stop_requested
  96. stop_requested = False
  97. coordinates = parse_theta_rho_file(file_path, True)
  98. if len(coordinates) < 2:
  99. print("Not enough coordinates for interpolation.")
  100. return
  101. # Optimize batch size for smoother execution
  102. batch_size = 1 # Smaller batches may smooth movement further
  103. for i in range(0, len(coordinates), batch_size):
  104. # Check stop_requested flag after sending the batch
  105. if stop_requested:
  106. print("Execution stopped by user after completing the current batch.")
  107. break
  108. batch = coordinates[i:i + batch_size]
  109. if i == 0:
  110. send_coordinate_batch(ser, batch)
  111. continue
  112. # Wait until Arduino is READY before sending the batch
  113. while True:
  114. if ser.in_waiting > 0:
  115. response = ser.readline().decode().strip()
  116. if response == "R":
  117. send_coordinate_batch(ser, batch)
  118. break
  119. else:
  120. print(f"Arduino response: {response}")
  121. # Reset theta after execution or stopping
  122. reset_theta()
  123. ser.write("FINISHED\n".encode())
  124. def reset_theta():
  125. ser.write("RESET_THETA\n".encode())
  126. while True:
  127. if ser.in_waiting > 0:
  128. response = ser.readline().decode().strip()
  129. print(f"Arduino response: {response}")
  130. if response == "THETA_RESET":
  131. print("Theta successfully reset.")
  132. break
  133. time.sleep(0.5) # Small delay to avoid busy waiting
  134. def read_serial_responses():
  135. """Continuously read and print all responses from the Arduino."""
  136. global ser
  137. if ser is None or not ser.is_open:
  138. print("Serial connection not established.")
  139. return
  140. while True:
  141. try:
  142. if ser.in_waiting > 0:
  143. response = ser.readline().decode().strip()
  144. print(f"Arduino response: {response}")
  145. except Exception as e:
  146. print(f"Error reading from serial: {e}")
  147. break
  148. def load_playlists():
  149. """Load playlists from the file."""
  150. with open(PLAYLISTS_FILE, 'r') as f:
  151. return json.load(f)
  152. def save_playlists(playlists):
  153. """Save playlists to the file."""
  154. with open(PLAYLISTS_FILE, 'w') as f:
  155. json.dump(playlists, f, indent=4)
  156. # Flask endpoints
  157. @app.route('/')
  158. def index():
  159. return render_template('index.html')
  160. @app.route('/list_serial_ports', methods=['GET'])
  161. def list_ports():
  162. return jsonify(list_serial_ports())
  163. @app.route('/connect_serial', methods=['POST'])
  164. def connect_serial():
  165. port = request.json.get('port')
  166. if not port:
  167. return jsonify({'error': 'No port provided'}), 400
  168. try:
  169. connect_to_serial(port)
  170. return jsonify({'success': True})
  171. except Exception as e:
  172. return jsonify({'error': str(e)}), 500
  173. @app.route('/disconnect_serial', methods=['POST'])
  174. def disconnect():
  175. try:
  176. disconnect_serial()
  177. return jsonify({'success': True})
  178. except Exception as e:
  179. return jsonify({'error': str(e)}), 500
  180. @app.route('/restart_serial', methods=['POST'])
  181. def restart():
  182. port = request.json.get('port')
  183. if not port:
  184. return jsonify({'error': 'No port provided'}), 400
  185. try:
  186. restart_serial(port)
  187. return jsonify({'success': True})
  188. except Exception as e:
  189. return jsonify({'error': str(e)}), 500
  190. @app.route('/list_theta_rho_files', methods=['GET'])
  191. def list_theta_rho_files():
  192. files = []
  193. for root, _, filenames in os.walk(THETA_RHO_DIR):
  194. for file in filenames:
  195. # Construct the relative file path
  196. relative_path = os.path.relpath(os.path.join(root, file), THETA_RHO_DIR)
  197. files.append(relative_path)
  198. return jsonify(sorted(files))
  199. @app.route('/upload_theta_rho', methods=['POST'])
  200. def upload_theta_rho():
  201. custom_patterns_dir = os.path.join(THETA_RHO_DIR, 'custom_patterns')
  202. os.makedirs(custom_patterns_dir, exist_ok=True) # Ensure the directory exists
  203. file = request.files['file']
  204. if file:
  205. file.save(os.path.join(custom_patterns_dir, file.filename))
  206. return jsonify({'success': True})
  207. return jsonify({'success': False})
  208. @app.route('/run_theta_rho', methods=['POST'])
  209. def run_theta_rho():
  210. file_name = request.json.get('file_name')
  211. pre_execution = request.json.get('pre_execution') # New parameter for pre-execution action
  212. if not file_name:
  213. return jsonify({'error': 'No file name provided'}), 400
  214. file_path = os.path.join(THETA_RHO_DIR, file_name)
  215. if not os.path.exists(file_path):
  216. return jsonify({'error': 'File not found'}), 404
  217. try:
  218. # Handle pre-execution actions
  219. if pre_execution == 'clear_in':
  220. clear_in_thread = threading.Thread(target=run_theta_rho_file, args=('./patterns/clear_from_in.thr',))
  221. clear_in_thread.start()
  222. clear_in_thread.join() # Wait for completion before proceeding
  223. elif pre_execution == 'clear_out':
  224. clear_out_thread = threading.Thread(target=run_theta_rho_file, args=('./patterns/clear_from_out.thr',))
  225. clear_out_thread.start()
  226. clear_out_thread.join() # Wait for completion before proceeding
  227. elif pre_execution == 'none':
  228. pass # No pre-execution action required
  229. # Start the main pattern execution
  230. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  231. return jsonify({'success': True})
  232. except Exception as e:
  233. return jsonify({'error': str(e)}), 500
  234. @app.route('/stop_execution', methods=['POST'])
  235. def stop_execution():
  236. global stop_requested
  237. stop_requested = True
  238. return jsonify({'success': True})
  239. @app.route('/send_home', methods=['POST'])
  240. def send_home():
  241. """Send the HOME command to the Arduino."""
  242. try:
  243. send_command("HOME")
  244. return jsonify({'success': True})
  245. except Exception as e:
  246. return jsonify({'error': str(e)}), 500
  247. @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
  248. def run_specific_theta_rho_file(file_name):
  249. """Run a specific theta-rho file."""
  250. file_path = os.path.join(THETA_RHO_DIR, file_name)
  251. if not os.path.exists(file_path):
  252. return jsonify({'error': 'File not found'}), 404
  253. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  254. return jsonify({'success': True})
  255. @app.route('/delete_theta_rho_file', methods=['POST'])
  256. def delete_theta_rho_file():
  257. data = request.json
  258. file_name = data.get('file_name')
  259. if not file_name:
  260. return jsonify({"success": False, "error": "No file name provided"}), 400
  261. file_path = os.path.join(THETA_RHO_DIR, file_name)
  262. if not os.path.exists(file_path):
  263. return jsonify({"success": False, "error": "File not found"}), 404
  264. try:
  265. os.remove(file_path)
  266. return jsonify({"success": True})
  267. except Exception as e:
  268. return jsonify({"success": False, "error": str(e)}), 500
  269. @app.route('/move_to_center', methods=['POST'])
  270. def move_to_center():
  271. """Move the sand table to the center position."""
  272. try:
  273. if ser is None or not ser.is_open:
  274. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  275. coordinates = [(0, 0)] # Center position
  276. send_coordinate_batch(ser, coordinates)
  277. return jsonify({"success": True})
  278. except Exception as e:
  279. return jsonify({"success": False, "error": str(e)}), 500
  280. @app.route('/move_to_perimeter', methods=['POST'])
  281. def move_to_perimeter():
  282. """Move the sand table to the perimeter position."""
  283. try:
  284. if ser is None or not ser.is_open:
  285. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  286. MAX_RHO = 1
  287. coordinates = [(0, MAX_RHO)] # Perimeter position
  288. send_coordinate_batch(ser, coordinates)
  289. return jsonify({"success": True})
  290. except Exception as e:
  291. return jsonify({"success": False, "error": str(e)}), 500
  292. @app.route('/preview_thr', methods=['POST'])
  293. def preview_thr():
  294. file_name = request.json.get('file_name')
  295. if not file_name:
  296. return jsonify({'error': 'No file name provided'}), 400
  297. file_path = os.path.join(THETA_RHO_DIR, file_name)
  298. if not os.path.exists(file_path):
  299. return jsonify({'error': 'File not found'}), 404
  300. try:
  301. # Parse the .thr file with transformations
  302. coordinates = parse_theta_rho_file(file_path, apply_transformations=True)
  303. return jsonify({'success': True, 'coordinates': coordinates})
  304. except Exception as e:
  305. return jsonify({'error': str(e)}), 500
  306. @app.route('/send_coordinate', methods=['POST'])
  307. def send_coordinate():
  308. """Send a single (theta, rho) coordinate to the Arduino."""
  309. global ser
  310. if ser is None or not ser.is_open:
  311. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  312. try:
  313. data = request.json
  314. theta = data.get('theta')
  315. rho = data.get('rho')
  316. if theta is None or rho is None:
  317. return jsonify({"success": False, "error": "Theta and Rho are required"}), 400
  318. # Send the coordinate to the Arduino
  319. send_coordinate_batch(ser, [(theta, rho)])
  320. reset_theta()
  321. return jsonify({"success": True})
  322. except Exception as e:
  323. return jsonify({"success": False, "error": str(e)}), 500
  324. # Expose files for download if needed
  325. @app.route('/download/<filename>', methods=['GET'])
  326. def download_file(filename):
  327. """Download a file from the theta-rho directory."""
  328. return send_from_directory(THETA_RHO_DIR, filename)
  329. @app.route('/serial_status', methods=['GET'])
  330. def serial_status():
  331. global ser, ser_port
  332. return jsonify({
  333. 'connected': ser.is_open if ser else False,
  334. 'port': ser_port # Include the port name
  335. })
  336. @app.route('/set_speed', methods=['POST'])
  337. def set_speed():
  338. """Set the speed for the Arduino."""
  339. global ser
  340. if ser is None or not ser.is_open:
  341. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  342. try:
  343. # Parse the speed value from the request
  344. data = request.json
  345. speed = data.get('speed')
  346. if speed is None:
  347. return jsonify({"success": False, "error": "Speed is required"}), 400
  348. if not isinstance(speed, (int, float)) or speed <= 0:
  349. return jsonify({"success": False, "error": "Invalid speed value"}), 400
  350. # Send the SET_SPEED command to the Arduino
  351. command = f"SET_SPEED {speed}"
  352. send_command(command)
  353. return jsonify({"success": True, "speed": speed})
  354. except Exception as e:
  355. return jsonify({"success": False, "error": str(e)}), 500
  356. @app.route('/playlists', methods=['GET'])
  357. def get_playlists():
  358. """Get all playlists."""
  359. return jsonify(load_playlists())
  360. @app.route('/playlists', methods=['POST'])
  361. def create_playlist():
  362. """Create a new playlist."""
  363. data = request.json
  364. name = data.get('name')
  365. tracks = data.get('tracks', [])
  366. if not name:
  367. return jsonify({'error': 'Playlist name is required'}), 400
  368. playlists = load_playlists()
  369. if name in playlists:
  370. return jsonify({'error': 'Playlist already exists'}), 400
  371. playlists[name] = {'tracks': tracks}
  372. save_playlists(playlists)
  373. return jsonify({'success': True, 'playlist': {name: playlists[name]}})
  374. @app.route('/playlists/<name>', methods=['PUT'])
  375. def modify_playlist(name):
  376. """Modify an existing playlist."""
  377. data = request.json
  378. tracks = data.get('tracks')
  379. playlists = load_playlists()
  380. if name not in playlists:
  381. return jsonify({'error': 'Playlist not found'}), 404
  382. if tracks is not None:
  383. playlists[name]['tracks'] = tracks
  384. save_playlists(playlists)
  385. return jsonify({'success': True, 'playlist': {name: playlists[name]}})
  386. @app.route('/playlists/<name>', methods=['DELETE'])
  387. def delete_playlist(name):
  388. """Delete a playlist."""
  389. playlists = load_playlists()
  390. if name not in playlists:
  391. return jsonify({'error': 'Playlist not found'}), 404
  392. del playlists[name]
  393. save_playlists(playlists)
  394. return jsonify({'success': True})
  395. @app.route('/playlists/run', methods=['POST'])
  396. def run_playlist():
  397. """Run a playlist with specified options."""
  398. data = request.json
  399. name = data.get('name')
  400. options = data.get('options', {})
  401. loop = options.get('loop', False)
  402. shuffle = options.get('shuffle', False)
  403. delay = options.get('delay', 0) # Time between patterns in seconds
  404. playlists = load_playlists()
  405. if name not in playlists:
  406. return jsonify({'error': 'Playlist not found'}), 404
  407. tracks = playlists[name]['tracks']
  408. if shuffle:
  409. import random
  410. random.shuffle(tracks)
  411. def run_tracks():
  412. while True:
  413. for track in tracks:
  414. run_theta_rho_file(os.path.join(THETA_RHO_DIR, track))
  415. time.sleep(delay)
  416. if not loop:
  417. break
  418. threading.Thread(target=run_tracks, daemon=True).start()
  419. return jsonify({'success': True, 'message': 'Playlist execution started'})
  420. if __name__ == '__main__':
  421. # Ensure the playlists file exists
  422. if not os.path.exists(PLAYLISTS_FILE):
  423. with open(PLAYLISTS_FILE, 'w') as f:
  424. json.dump({}, f)
  425. # Start the thread for reading Arduino responses
  426. threading.Thread(target=read_serial_responses, daemon=True).start()
  427. app.run(debug=True, host='0.0.0.0', port=8080)