app.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. from flask import Flask, request, jsonify, render_template
  2. import os
  3. import serial
  4. import time
  5. import threading
  6. import serial.tools.list_ports
  7. import math
  8. app = Flask(__name__)
  9. # Theta-rho directory
  10. THETA_RHO_DIR = './patterns'
  11. IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
  12. os.makedirs(THETA_RHO_DIR, exist_ok=True)
  13. # Serial connection (default None, will be set by user)
  14. ser = None
  15. ser_port = None # Global variable to store the serial port name
  16. stop_requested = False
  17. def list_serial_ports():
  18. """Return a list of available serial ports."""
  19. ports = serial.tools.list_ports.comports()
  20. return [port.device for port in ports if port.device not in IGNORE_PORTS]
  21. def connect_to_serial(port, baudrate=115200):
  22. """Connect to the specified serial port."""
  23. global ser, ser_port
  24. if ser and ser.is_open:
  25. ser.close()
  26. ser = serial.Serial(port, baudrate)
  27. ser_port = port # Store the connected port globally
  28. time.sleep(2) # Allow time for the connection to establish
  29. def disconnect_serial():
  30. """Disconnect the current serial connection."""
  31. global ser, ser_port
  32. if ser and ser.is_open:
  33. ser.close()
  34. ser = None
  35. ser_port = None # Reset the port name
  36. def restart_serial(port, baudrate=115200):
  37. """Restart the serial connection."""
  38. disconnect_serial()
  39. connect_to_serial(port, baudrate)
  40. def parse_theta_rho_file(file_path, apply_transformations=False):
  41. """
  42. Parse a theta-rho file and return a list of (theta, rho) pairs.
  43. Optionally apply transformations (rotation and mirroring).
  44. """
  45. coordinates = []
  46. try:
  47. with open(file_path, 'r') as file:
  48. for line in file:
  49. line = line.strip()
  50. # Skip header or comment lines (starting with '#' or empty lines)
  51. if not line or line.startswith("#"):
  52. continue
  53. # Parse lines with theta and rho separated by spaces
  54. try:
  55. theta, rho = map(float, line.split())
  56. if apply_transformations:
  57. # Rotate 90 degrees clockwise
  58. theta_rotated = theta - (math.pi / 2)
  59. if theta_rotated < 0: # Ensure theta stays within [0, 2π)
  60. theta_rotated += 2 * math.pi
  61. # Apply vertical mirror (negate theta)
  62. theta_mirrored = -theta_rotated
  63. if theta_mirrored < 0: # Ensure theta stays within [0, 2π)
  64. theta_mirrored += 2 * math.pi
  65. coordinates.append((theta_mirrored, rho))
  66. else:
  67. coordinates.append((theta, rho))
  68. except ValueError:
  69. print(f"Skipping invalid line: {line}")
  70. except Exception as e:
  71. print(f"Error reading file: {e}")
  72. return coordinates
  73. def send_coordinate_batch(ser, coordinates):
  74. """Send a batch of theta-rho pairs to the Arduino."""
  75. # print("Sending batch:", coordinates)
  76. batch_str = ";".join(f"{theta:.3f},{rho:.3f}" for theta, rho in coordinates) + ";\n"
  77. ser.write(batch_str.encode())
  78. def send_command(command):
  79. """Send a single command to the Arduino."""
  80. ser.write(f"{command}\n".encode())
  81. print(f"Sent: {command}")
  82. # Wait for "DONE" acknowledgment from Arduino
  83. while True:
  84. if ser.in_waiting > 0:
  85. response = ser.readline().decode().strip()
  86. print(f"Arduino response: {response}")
  87. if response == "DONE":
  88. print("Command execution completed.")
  89. break
  90. # time.sleep(0.5) # Small delay to avoid busy waiting
  91. def run_theta_rho_file(file_path):
  92. """Run a theta-rho file by sending data in optimized batches."""
  93. global stop_requested
  94. stop_requested = False
  95. coordinates = parse_theta_rho_file(file_path, True)
  96. if len(coordinates) < 2:
  97. print("Not enough coordinates for interpolation.")
  98. return
  99. # Optimize batch size for smoother execution
  100. batch_size = 1 # Smaller batches may smooth movement further
  101. for i in range(0, len(coordinates), batch_size):
  102. # Check stop_requested flag after sending the batch
  103. if stop_requested:
  104. print("Execution stopped by user after completing the current batch.")
  105. break
  106. batch = coordinates[i:i + batch_size]
  107. if i == 0:
  108. send_coordinate_batch(ser, batch)
  109. continue
  110. # Wait until Arduino is READY before sending the batch
  111. while True:
  112. if ser.in_waiting > 0:
  113. response = ser.readline().decode().strip()
  114. if response == "R":
  115. send_coordinate_batch(ser, batch)
  116. break
  117. else:
  118. print(f"Arduino response: {response}")
  119. # Reset theta after execution or stopping
  120. reset_theta()
  121. ser.write("FINISHED\n".encode())
  122. def reset_theta():
  123. ser.write("RESET_THETA\n".encode())
  124. while True:
  125. if ser.in_waiting > 0:
  126. response = ser.readline().decode().strip()
  127. print(f"Arduino response: {response}")
  128. if response == "THETA_RESET":
  129. print("Theta successfully reset.")
  130. break
  131. time.sleep(0.5) # Small delay to avoid busy waiting
  132. def read_serial_responses():
  133. """Continuously read and print all responses from the Arduino."""
  134. global ser
  135. if ser is None or not ser.is_open:
  136. print("Serial connection not established.")
  137. return
  138. while True:
  139. try:
  140. if ser.in_waiting > 0:
  141. response = ser.readline().decode().strip()
  142. print(f"Arduino response: {response}")
  143. except Exception as e:
  144. print(f"Error reading from serial: {e}")
  145. break
  146. # Flask endpoints
  147. @app.route('/')
  148. def index():
  149. return render_template('index.html')
  150. @app.route('/list_serial_ports', methods=['GET'])
  151. def list_ports():
  152. return jsonify(list_serial_ports())
  153. @app.route('/connect_serial', methods=['POST'])
  154. def connect_serial():
  155. port = request.json.get('port')
  156. if not port:
  157. return jsonify({'error': 'No port provided'}), 400
  158. try:
  159. connect_to_serial(port)
  160. return jsonify({'success': True})
  161. except Exception as e:
  162. return jsonify({'error': str(e)}), 500
  163. @app.route('/disconnect_serial', methods=['POST'])
  164. def disconnect():
  165. try:
  166. disconnect_serial()
  167. return jsonify({'success': True})
  168. except Exception as e:
  169. return jsonify({'error': str(e)}), 500
  170. @app.route('/restart_serial', methods=['POST'])
  171. def restart():
  172. port = request.json.get('port')
  173. if not port:
  174. return jsonify({'error': 'No port provided'}), 400
  175. try:
  176. restart_serial(port)
  177. return jsonify({'success': True})
  178. except Exception as e:
  179. return jsonify({'error': str(e)}), 500
  180. @app.route('/list_theta_rho_files', methods=['GET'])
  181. def list_theta_rho_files():
  182. files = []
  183. for root, _, filenames in os.walk(THETA_RHO_DIR):
  184. for file in filenames:
  185. # Construct the relative file path
  186. relative_path = os.path.relpath(os.path.join(root, file), THETA_RHO_DIR)
  187. files.append(relative_path)
  188. return jsonify(sorted(files))
  189. @app.route('/upload_theta_rho', methods=['POST'])
  190. def upload_theta_rho():
  191. custom_patterns_dir = os.path.join(THETA_RHO_DIR, 'custom_patterns')
  192. os.makedirs(custom_patterns_dir, exist_ok=True) # Ensure the directory exists
  193. file = request.files['file']
  194. if file:
  195. file.save(os.path.join(custom_patterns_dir, file.filename))
  196. return jsonify({'success': True})
  197. return jsonify({'success': False})
  198. @app.route('/run_theta_rho', methods=['POST'])
  199. def run_theta_rho():
  200. file_name = request.json.get('file_name')
  201. pre_execution = request.json.get('pre_execution') # New parameter for pre-execution action
  202. if not file_name:
  203. return jsonify({'error': 'No file name provided'}), 400
  204. file_path = os.path.join(THETA_RHO_DIR, file_name)
  205. if not os.path.exists(file_path):
  206. return jsonify({'error': 'File not found'}), 404
  207. try:
  208. # Handle pre-execution actions
  209. if pre_execution == 'clear_in':
  210. clear_in_thread = threading.Thread(target=run_theta_rho_file, args=('./patterns/clear_from_in.thr',))
  211. clear_in_thread.start()
  212. clear_in_thread.join() # Wait for completion before proceeding
  213. elif pre_execution == 'clear_out':
  214. clear_out_thread = threading.Thread(target=run_theta_rho_file, args=('./patterns/clear_from_out.thr',))
  215. clear_out_thread.start()
  216. clear_out_thread.join() # Wait for completion before proceeding
  217. elif pre_execution == 'none':
  218. pass # No pre-execution action required
  219. # Start the main pattern execution
  220. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  221. return jsonify({'success': True})
  222. except Exception as e:
  223. return jsonify({'error': str(e)}), 500
  224. @app.route('/stop_execution', methods=['POST'])
  225. def stop_execution():
  226. global stop_requested
  227. stop_requested = True
  228. return jsonify({'success': True})
  229. @app.route('/send_home', methods=['POST'])
  230. def send_home():
  231. """Send the HOME command to the Arduino."""
  232. try:
  233. send_command("HOME")
  234. return jsonify({'success': True})
  235. except Exception as e:
  236. return jsonify({'error': str(e)}), 500
  237. @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
  238. def run_specific_theta_rho_file(file_name):
  239. """Run a specific theta-rho file."""
  240. file_path = os.path.join(THETA_RHO_DIR, file_name)
  241. if not os.path.exists(file_path):
  242. return jsonify({'error': 'File not found'}), 404
  243. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  244. return jsonify({'success': True})
  245. @app.route('/delete_theta_rho_file', methods=['POST'])
  246. def delete_theta_rho_file():
  247. data = request.json
  248. file_name = data.get('file_name')
  249. if not file_name:
  250. return jsonify({"success": False, "error": "No file name provided"}), 400
  251. file_path = os.path.join(THETA_RHO_DIR, file_name)
  252. if not os.path.exists(file_path):
  253. return jsonify({"success": False, "error": "File not found"}), 404
  254. try:
  255. os.remove(file_path)
  256. return jsonify({"success": True})
  257. except Exception as e:
  258. return jsonify({"success": False, "error": str(e)}), 500
  259. @app.route('/move_to_center', methods=['POST'])
  260. def move_to_center():
  261. """Move the sand table to the center position."""
  262. try:
  263. if ser is None or not ser.is_open:
  264. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  265. coordinates = [(0, 0)] # Center position
  266. send_coordinate_batch(ser, coordinates)
  267. return jsonify({"success": True})
  268. except Exception as e:
  269. return jsonify({"success": False, "error": str(e)}), 500
  270. @app.route('/move_to_perimeter', methods=['POST'])
  271. def move_to_perimeter():
  272. """Move the sand table to the perimeter position."""
  273. try:
  274. if ser is None or not ser.is_open:
  275. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  276. MAX_RHO = 1
  277. coordinates = [(0, MAX_RHO)] # Perimeter position
  278. send_coordinate_batch(ser, coordinates)
  279. return jsonify({"success": True})
  280. except Exception as e:
  281. return jsonify({"success": False, "error": str(e)}), 500
  282. @app.route('/preview_thr', methods=['POST'])
  283. def preview_thr():
  284. file_name = request.json.get('file_name')
  285. if not file_name:
  286. return jsonify({'error': 'No file name provided'}), 400
  287. file_path = os.path.join(THETA_RHO_DIR, file_name)
  288. if not os.path.exists(file_path):
  289. return jsonify({'error': 'File not found'}), 404
  290. try:
  291. # Parse the .thr file with transformations
  292. coordinates = parse_theta_rho_file(file_path, apply_transformations=True)
  293. return jsonify({'success': True, 'coordinates': coordinates})
  294. except Exception as e:
  295. return jsonify({'error': str(e)}), 500
  296. @app.route('/send_coordinate', methods=['POST'])
  297. def send_coordinate():
  298. """Send a single (theta, rho) coordinate to the Arduino."""
  299. global ser
  300. if ser is None or not ser.is_open:
  301. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  302. try:
  303. data = request.json
  304. theta = data.get('theta')
  305. rho = data.get('rho')
  306. if theta is None or rho is None:
  307. return jsonify({"success": False, "error": "Theta and Rho are required"}), 400
  308. # Send the coordinate to the Arduino
  309. send_coordinate_batch(ser, [(theta, rho)])
  310. reset_theta()
  311. return jsonify({"success": True})
  312. except Exception as e:
  313. return jsonify({"success": False, "error": str(e)}), 500
  314. # Expose files for download if needed
  315. @app.route('/download/<filename>', methods=['GET'])
  316. def download_file(filename):
  317. """Download a file from the theta-rho directory."""
  318. return send_from_directory(THETA_RHO_DIR, filename)
  319. @app.route('/serial_status', methods=['GET'])
  320. def serial_status():
  321. global ser, ser_port
  322. return jsonify({
  323. 'connected': ser.is_open if ser else False,
  324. 'port': ser_port # Include the port name
  325. })
  326. @app.route('/set_speed', methods=['POST'])
  327. def set_speed():
  328. """Set the speed for the Arduino."""
  329. global ser
  330. if ser is None or not ser.is_open:
  331. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  332. try:
  333. # Parse the speed value from the request
  334. data = request.json
  335. speed = data.get('speed')
  336. if speed is None:
  337. return jsonify({"success": False, "error": "Speed is required"}), 400
  338. if not isinstance(speed, (int, float)) or speed <= 0:
  339. return jsonify({"success": False, "error": "Invalid speed value"}), 400
  340. # Send the SET_SPEED command to the Arduino
  341. command = f"SET_SPEED {speed}"
  342. send_command(command)
  343. return jsonify({"success": True, "speed": speed})
  344. except Exception as e:
  345. return jsonify({"success": False, "error": str(e)}), 500
  346. if __name__ == '__main__':
  347. # Start the thread for reading Arduino responses
  348. threading.Thread(target=read_serial_responses, daemon=True).start()
  349. app.run(debug=True, host='0.0.0.0', port=8080)