1
0

app.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
import math
import os
import threading
import time

import serial
import serial.tools.list_ports
from flask import Flask, jsonify, render_template, request, send_from_directory
  8. app = Flask(__name__)
  9. # Theta-rho directory
  10. THETA_RHO_DIR = './patterns'
  11. IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
  12. os.makedirs(THETA_RHO_DIR, exist_ok=True)
  13. # Serial connection (default None, will be set by user)
  14. ser = None
  15. ser_port = None # Global variable to store the serial port name
  16. stop_requested = False
  17. def list_serial_ports():
  18. """Return a list of available serial ports."""
  19. ports = serial.tools.list_ports.comports()
  20. return [port.device for port in ports if port.device not in IGNORE_PORTS]
  21. def connect_to_serial(port, baudrate=115200):
  22. """Connect to the specified serial port."""
  23. global ser, ser_port
  24. if ser and ser.is_open:
  25. ser.close()
  26. ser = serial.Serial(port, baudrate)
  27. ser_port = port # Store the connected port globally
  28. time.sleep(2) # Allow time for the connection to establish
  29. def disconnect_serial():
  30. """Disconnect the current serial connection."""
  31. global ser, ser_port
  32. if ser and ser.is_open:
  33. ser.close()
  34. ser = None
  35. ser_port = None # Reset the port name
  36. def restart_serial(port, baudrate=115200):
  37. """Restart the serial connection."""
  38. disconnect_serial()
  39. connect_to_serial(port, baudrate)
  40. def parse_theta_rho_file(file_path):
  41. """Parse a theta-rho file and return a list of (theta, rho) pairs."""
  42. coordinates = []
  43. with open(file_path, 'r') as file:
  44. for line in file:
  45. line = line.strip()
  46. # Skip header or comment lines (starting with '#' or empty lines)
  47. if not line or line.startswith("#"):
  48. print(f"Skipping invalid line: {line}")
  49. continue
  50. # Parse lines with theta and rho separated by spaces
  51. try:
  52. theta, rho = map(float, line.split())
  53. coordinates.append((theta, rho))
  54. except ValueError:
  55. print(f"Skipping invalid line: {line}")
  56. return coordinates
  57. def send_coordinate_batch(ser, coordinates):
  58. """Send a batch of theta-rho pairs to the Arduino."""
  59. # print("Sending batch:", coordinates)
  60. batch_str = ";".join(f"{theta:.3f},{rho:.3f}" for theta, rho in coordinates) + ";\n"
  61. ser.write(batch_str.encode())
  62. def send_command(command):
  63. """Send a single command to the Arduino."""
  64. ser.write(f"{command}\n".encode())
  65. print(f"Sent: {command}")
  66. # Wait for "DONE" acknowledgment from Arduino
  67. while True:
  68. if ser.in_waiting > 0:
  69. response = ser.readline().decode().strip()
  70. print(f"Arduino response: {response}")
  71. if response == "DONE":
  72. print("Command execution completed.")
  73. break
  74. time.sleep(0.5) # Small delay to avoid busy waiting
def run_theta_rho_file(file_path):
    """Stream the coordinates of a theta-rho file to the serial port in batches.

    The file is parsed with parse_theta_rho_file(). Coordinates are sent ten
    at a time; each batch is written only after a "READY" line has been read
    from the serial port. When all batches are sent, or a stop was requested,
    reset_theta() is called and "FINISHED" is written to the port.

    Args:
        file_path: Path of the theta-rho file to run.

    Returns:
        None. Returns early, without touching the serial port, when the file
        yields fewer than two coordinates.
    """
    global stop_requested
    # Clear any stop request left over from a previous run.
    stop_requested = False
    coordinates = parse_theta_rho_file(file_path)
    if len(coordinates) < 2:
        print("Not enough coordinates for interpolation.")
        return
    # Optimize batch size for smoother execution
    batch_size = 10  # Smaller batches may smooth movement further
    for i in range(0, len(coordinates), batch_size):
        batch = coordinates[i:i + batch_size]
        # Wait until Arduino is READY before sending the batch
        # NOTE(review): this loop polls without sleeping and does not look at
        # stop_requested, so it spins until a "READY" line arrives.
        while True:
            if ser.in_waiting > 0:
                response = ser.readline().decode().strip()
                if response == "READY":
                    send_coordinate_batch(ser, batch)
                    break
                else:
                    # Any other line is only logged; keep waiting for READY.
                    print(f"Arduino response: {response}")
        # Check stop_requested flag after sending the batch
        if stop_requested:
            print("Execution stopped by user after completing the current batch.")
            break
    # Reset theta after execution or stopping
    reset_theta()
    ser.write("FINISHED\n".encode())
  103. def reset_theta():
  104. ser.write("RESET_THETA\n".encode())
  105. while True:
  106. if ser.in_waiting > 0:
  107. response = ser.readline().decode().strip()
  108. print(f"Arduino response: {response}")
  109. if response == "THETA_RESET":
  110. print("Theta successfully reset.")
  111. break
  112. time.sleep(0.5) # Small delay to avoid busy waiting
  113. def read_serial_responses():
  114. """Continuously read and print all responses from the Arduino."""
  115. global ser
  116. if ser is None or not ser.is_open:
  117. print("Serial connection not established.")
  118. return
  119. while True:
  120. try:
  121. if ser.in_waiting > 0:
  122. response = ser.readline().decode().strip()
  123. print(f"Arduino response: {response}")
  124. except Exception as e:
  125. print(f"Error reading from serial: {e}")
  126. break
  127. @app.route('/')
  128. def index():
  129. return render_template('index.html')
  130. @app.route('/list_serial_ports', methods=['GET'])
  131. def list_ports():
  132. return jsonify(list_serial_ports())
  133. @app.route('/connect_serial', methods=['POST'])
  134. def connect_serial():
  135. port = request.json.get('port')
  136. if not port:
  137. return jsonify({'error': 'No port provided'}), 400
  138. try:
  139. connect_to_serial(port)
  140. return jsonify({'success': True})
  141. except Exception as e:
  142. return jsonify({'error': str(e)}), 500
  143. @app.route('/disconnect_serial', methods=['POST'])
  144. def disconnect():
  145. try:
  146. disconnect_serial()
  147. return jsonify({'success': True})
  148. except Exception as e:
  149. return jsonify({'error': str(e)}), 500
  150. @app.route('/restart_serial', methods=['POST'])
  151. def restart():
  152. port = request.json.get('port')
  153. if not port:
  154. return jsonify({'error': 'No port provided'}), 400
  155. try:
  156. restart_serial(port)
  157. return jsonify({'success': True})
  158. except Exception as e:
  159. return jsonify({'error': str(e)}), 500
  160. @app.route('/list_theta_rho_files', methods=['GET'])
  161. def list_theta_rho_files():
  162. files = os.listdir(THETA_RHO_DIR)
  163. files = sorted([file for file in files if files not in ['clear_from_in.thr', 'clear_from_out.thr']])
  164. return jsonify(sorted(files))
  165. @app.route('/upload_theta_rho', methods=['POST'])
  166. def upload_theta_rho():
  167. file = request.files['file']
  168. if file:
  169. file.save(os.path.join(THETA_RHO_DIR, file.filename))
  170. return jsonify({'success': True})
  171. return jsonify({'success': False})
  172. @app.route('/run_theta_rho', methods=['POST'])
  173. def run_theta_rho():
  174. file_name = request.json.get('file_name')
  175. pre_execution = request.json.get('pre_execution') # New parameter for pre-execution action
  176. if not file_name:
  177. return jsonify({'error': 'No file name provided'}), 400
  178. file_path = os.path.join(THETA_RHO_DIR, file_name)
  179. if not os.path.exists(file_path):
  180. return jsonify({'error': 'File not found'}), 404
  181. try:
  182. # Handle pre-execution actions
  183. if pre_execution == 'clear_in':
  184. clear_in_thread = threading.Thread(target=run_theta_rho_file, args=('./patterns/clear_from_in.thr',))
  185. clear_in_thread.start()
  186. clear_in_thread.join() # Wait for completion before proceeding
  187. elif pre_execution == 'clear_out':
  188. clear_out_thread = threading.Thread(target=run_theta_rho_file, args=('./patterns/clear_from_out.thr',))
  189. clear_out_thread.start()
  190. clear_out_thread.join() # Wait for completion before proceeding
  191. elif pre_execution == 'none':
  192. pass # No pre-execution action required
  193. # Start the main pattern execution
  194. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  195. return jsonify({'success': True})
  196. except Exception as e:
  197. return jsonify({'error': str(e)}), 500
  198. @app.route('/stop_execution', methods=['POST'])
  199. def stop_execution():
  200. global stop_requested
  201. stop_requested = True
  202. reset_theta()
  203. return jsonify({'success': True})
  204. @app.route('/send_home', methods=['POST'])
  205. def send_home():
  206. """Send the HOME command to the Arduino."""
  207. try:
  208. send_command("HOME")
  209. return jsonify({'success': True})
  210. except Exception as e:
  211. return jsonify({'error': str(e)}), 500
  212. @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
  213. def run_specific_theta_rho_file(file_name):
  214. """Run a specific theta-rho file."""
  215. file_path = os.path.join(THETA_RHO_DIR, file_name)
  216. if not os.path.exists(file_path):
  217. return jsonify({'error': 'File not found'}), 404
  218. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  219. return jsonify({'success': True})
  220. @app.route('/delete_theta_rho_file', methods=['POST'])
  221. def delete_theta_rho_file():
  222. data = request.json
  223. file_name = data.get('file_name')
  224. if not file_name:
  225. return jsonify({"success": False, "error": "No file name provided"}), 400
  226. file_path = os.path.join(THETA_RHO_DIR, file_name)
  227. if not os.path.exists(file_path):
  228. return jsonify({"success": False, "error": "File not found"}), 404
  229. try:
  230. os.remove(file_path)
  231. return jsonify({"success": True})
  232. except Exception as e:
  233. return jsonify({"success": False, "error": str(e)}), 500
  234. @app.route('/move_to_center', methods=['POST'])
  235. def move_to_center():
  236. """Move the sand table to the center position."""
  237. try:
  238. if ser is None or not ser.is_open:
  239. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  240. coordinates = [(0, 0)] # Center position
  241. send_coordinate_batch(ser, coordinates)
  242. return jsonify({"success": True})
  243. except Exception as e:
  244. return jsonify({"success": False, "error": str(e)}), 500
  245. @app.route('/move_to_perimeter', methods=['POST'])
  246. def move_to_perimeter():
  247. """Move the sand table to the perimeter position."""
  248. try:
  249. if ser is None or not ser.is_open:
  250. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  251. MAX_RHO = 1
  252. coordinates = [(0, MAX_RHO)] # Perimeter position
  253. send_coordinate_batch(ser, coordinates)
  254. return jsonify({"success": True})
  255. except Exception as e:
  256. return jsonify({"success": False, "error": str(e)}), 500
  257. @app.route('/preview_thr', methods=['POST'])
  258. def preview_thr():
  259. file_name = request.json.get('file_name')
  260. if not file_name:
  261. return jsonify({'error': 'No file name provided'}), 400
  262. file_path = os.path.join(THETA_RHO_DIR, file_name)
  263. if not os.path.exists(file_path):
  264. return jsonify({'error': 'File not found'}), 404
  265. try:
  266. # Read the .thr file and parse the coordinates
  267. with open(file_path, 'r') as file:
  268. lines = file.readlines()
  269. coordinates = []
  270. for line in lines:
  271. # Ignore comments or blank lines
  272. if line.strip().startswith('#') or not line.strip():
  273. continue
  274. theta, rho = map(float, line.split())
  275. coordinates.append((theta, rho))
  276. return jsonify({'success': True, 'coordinates': coordinates})
  277. except Exception as e:
  278. return jsonify({'error': str(e)}), 500
  279. @app.route('/send_coordinate', methods=['POST'])
  280. def send_coordinate():
  281. """Send a single (theta, rho) coordinate to the Arduino."""
  282. global ser
  283. if ser is None or not ser.is_open:
  284. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  285. try:
  286. data = request.json
  287. theta = data.get('theta')
  288. rho = data.get('rho')
  289. if theta is None or rho is None:
  290. return jsonify({"success": False, "error": "Theta and Rho are required"}), 400
  291. # Send the coordinate to the Arduino
  292. send_coordinate_batch(ser, [(theta, rho)])
  293. reset_theta()
  294. return jsonify({"success": True})
  295. except Exception as e:
  296. return jsonify({"success": False, "error": str(e)}), 500
  297. # Expose files for download if needed
  298. @app.route('/download/<filename>', methods=['GET'])
  299. def download_file(filename):
  300. """Download a file from the theta-rho directory."""
  301. return send_from_directory(THETA_RHO_DIR, filename)
  302. @app.route('/serial_status', methods=['GET'])
  303. def serial_status():
  304. global ser, ser_port
  305. return jsonify({
  306. 'connected': ser.is_open if ser else False,
  307. 'port': ser_port # Include the port name
  308. })
  309. if __name__ == '__main__':
  310. # Start the thread for reading Arduino responses
  311. threading.Thread(target=read_serial_responses, daemon=True).start()
  312. app.run(debug=True, host='0.0.0.0', port=8080)