app.py 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296
  1. from flask import Flask, request, jsonify, render_template
  2. import os
  3. import serial
  4. import time
  5. import threading
  6. import serial.tools.list_ports
  7. import math
  8. app = Flask(__name__)
  9. # Theta-rho directory
  10. THETA_RHO_DIR = './patterns'
  11. IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
  12. os.makedirs(THETA_RHO_DIR, exist_ok=True)
  13. # Serial connection (default None, will be set by user)
  14. ser = None
  15. stop_requested = False
  16. def list_serial_ports():
  17. """Return a list of available serial ports."""
  18. ports = serial.tools.list_ports.comports()
  19. return [port.device for port in ports if port.device not in IGNORE_PORTS]
  20. def connect_to_serial(port, baudrate=115200):
  21. """Connect to the specified serial port."""
  22. global ser
  23. if ser and ser.is_open:
  24. ser.close()
  25. ser = serial.Serial(port, baudrate)
  26. time.sleep(2) # Allow time for the connection to establish
  27. def disconnect_serial():
  28. """Disconnect the current serial connection."""
  29. global ser
  30. if ser and ser.is_open:
  31. ser.close()
  32. ser = None
  33. def restart_serial(port, baudrate=115200):
  34. """Restart the serial connection."""
  35. disconnect_serial()
  36. connect_to_serial(port, baudrate)
  37. def parse_theta_rho_file(file_path):
  38. """Parse a theta-rho file and return a list of (theta, rho) pairs."""
  39. coordinates = []
  40. with open(file_path, 'r') as file:
  41. for line in file:
  42. line = line.strip()
  43. # Skip header or comment lines (starting with '#' or empty lines)
  44. if not line or line.startswith("#"):
  45. print(f"Skipping invalid line: {line}")
  46. continue
  47. # Parse lines with theta and rho separated by spaces
  48. try:
  49. theta, rho = map(float, line.split())
  50. coordinates.append((theta, rho))
  51. except ValueError:
  52. print(f"Skipping invalid line: {line}")
  53. return coordinates
  54. def send_coordinate_batch(ser, coordinates):
  55. """Send a batch of theta-rho pairs to the Arduino."""
  56. # print("Sending batch:", coordinates)
  57. batch_str = ";".join(f"{theta:.3f},{rho:.3f}" for theta, rho in coordinates) + ";\n"
  58. ser.write(batch_str.encode())
  59. def send_command(command):
  60. """Send a single command to the Arduino."""
  61. ser.write(f"{command}\n".encode())
  62. print(f"Sent: {command}")
  63. # Wait for "DONE" acknowledgment from Arduino
  64. while True:
  65. if ser.in_waiting > 0:
  66. response = ser.readline().decode().strip()
  67. print(f"Arduino response: {response}")
  68. if response == "DONE":
  69. print("Command execution completed.")
  70. break
  71. time.sleep(0.5) # Small delay to avoid busy waiting
  72. def run_theta_rho_file(file_path):
  73. """Run a theta-rho file by sending data in optimized batches."""
  74. global stop_requested
  75. stop_requested = False
  76. coordinates = parse_theta_rho_file(file_path)
  77. if len(coordinates) < 2:
  78. print("Not enough coordinates for interpolation.")
  79. return
  80. # Optimize batch size for smoother execution
  81. batch_size = 10 # Smaller batches may smooth movement further
  82. for i in range(0, len(coordinates), batch_size):
  83. batch = coordinates[i:i + batch_size]
  84. # Wait until Arduino is READY before sending the batch
  85. while True:
  86. if ser.in_waiting > 0:
  87. response = ser.readline().decode().strip()
  88. if response == "READY":
  89. send_coordinate_batch(ser, batch)
  90. break
  91. else:
  92. print(f"Arduino response: {response}")
  93. # Check stop_requested flag after sending the batch
  94. if stop_requested:
  95. print("Execution stopped by user after completing the current batch.")
  96. break
  97. # Reset theta after execution or stopping
  98. reset_theta()
  99. ser.write("FINISHED\n".encode())
  100. def reset_theta():
  101. ser.write("RESET_THETA\n".encode())
  102. while True:
  103. if ser.in_waiting > 0:
  104. response = ser.readline().decode().strip()
  105. print(f"Arduino response: {response}")
  106. if response == "THETA_RESET":
  107. print("Theta successfully reset.")
  108. break
  109. time.sleep(0.5) # Small delay to avoid busy waiting
  110. def read_serial_responses():
  111. """Continuously read and print all responses from the Arduino."""
  112. global ser
  113. if ser is None or not ser.is_open:
  114. print("Serial connection not established.")
  115. return
  116. while True:
  117. try:
  118. if ser.in_waiting > 0:
  119. response = ser.readline().decode().strip()
  120. print(f"Arduino response: {response}")
  121. except Exception as e:
  122. print(f"Error reading from serial: {e}")
  123. break
  124. @app.route('/')
  125. def index():
  126. return render_template('index.html')
  127. @app.route('/list_serial_ports', methods=['GET'])
  128. def list_ports():
  129. return jsonify(list_serial_ports())
  130. @app.route('/connect_serial', methods=['POST'])
  131. def connect_serial():
  132. port = request.json.get('port')
  133. if not port:
  134. return jsonify({'error': 'No port provided'}), 400
  135. try:
  136. connect_to_serial(port)
  137. return jsonify({'success': True})
  138. except Exception as e:
  139. return jsonify({'error': str(e)}), 500
  140. @app.route('/disconnect_serial', methods=['POST'])
  141. def disconnect():
  142. try:
  143. disconnect_serial()
  144. return jsonify({'success': True})
  145. except Exception as e:
  146. return jsonify({'error': str(e)}), 500
  147. @app.route('/restart_serial', methods=['POST'])
  148. def restart():
  149. port = request.json.get('port')
  150. if not port:
  151. return jsonify({'error': 'No port provided'}), 400
  152. try:
  153. restart_serial(port)
  154. return jsonify({'success': True})
  155. except Exception as e:
  156. return jsonify({'error': str(e)}), 500
  157. @app.route('/list_theta_rho_files', methods=['GET'])
  158. def list_theta_rho_files():
  159. files = os.listdir(THETA_RHO_DIR)
  160. return jsonify(files)
  161. @app.route('/upload_theta_rho', methods=['POST'])
  162. def upload_theta_rho():
  163. file = request.files['file']
  164. if file:
  165. file.save(os.path.join(THETA_RHO_DIR, file.filename))
  166. return jsonify({'success': True})
  167. return jsonify({'success': False})
  168. @app.route('/run_theta_rho', methods=['POST'])
  169. def run_theta_rho():
  170. file_name = request.json.get('file_name')
  171. if not file_name:
  172. return jsonify({'error': 'No file name provided'}), 400
  173. file_path = os.path.join(THETA_RHO_DIR, file_name)
  174. if not os.path.exists(file_path):
  175. return jsonify({'error': 'File not found'}), 404
  176. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  177. return jsonify({'success': True})
  178. @app.route('/stop_execution', methods=['POST'])
  179. def stop_execution():
  180. global stop_requested
  181. stop_requested = True
  182. reset_theta()
  183. return jsonify({'success': True})
  184. @app.route('/send_home', methods=['POST'])
  185. def send_home():
  186. """Send the HOME command to the Arduino."""
  187. try:
  188. send_command("HOME")
  189. return jsonify({'success': True})
  190. except Exception as e:
  191. return jsonify({'error': str(e)}), 500
  192. @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
  193. def run_specific_theta_rho_file(file_name):
  194. """Run a specific theta-rho file."""
  195. file_path = os.path.join(THETA_RHO_DIR, file_name)
  196. if not os.path.exists(file_path):
  197. return jsonify({'error': 'File not found'}), 404
  198. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  199. return jsonify({'success': True})
  200. @app.route('/delete_theta_rho_file', methods=['POST'])
  201. def delete_theta_rho_file():
  202. data = request.json
  203. file_name = data.get('file_name')
  204. if not file_name:
  205. return jsonify({"success": False, "error": "No file name provided"}), 400
  206. file_path = os.path.join(THETA_RHO_DIR, file_name)
  207. if not os.path.exists(file_path):
  208. return jsonify({"success": False, "error": "File not found"}), 404
  209. try:
  210. os.remove(file_path)
  211. return jsonify({"success": True})
  212. except Exception as e:
  213. return jsonify({"success": False, "error": str(e)}), 500
  214. @app.route('/move_to_center', methods=['POST'])
  215. def move_to_center():
  216. """Move the sand table to the center position."""
  217. try:
  218. if ser is None or not ser.is_open:
  219. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  220. coordinates = [(0, 0)] # Center position
  221. send_coordinate_batch(ser, coordinates)
  222. return jsonify({"success": True})
  223. except Exception as e:
  224. return jsonify({"success": False, "error": str(e)}), 500
  225. @app.route('/move_to_perimeter', methods=['POST'])
  226. def move_to_perimeter():
  227. """Move the sand table to the perimeter position."""
  228. try:
  229. if ser is None or not ser.is_open:
  230. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  231. MAX_RHO = 1
  232. coordinates = [(0, MAX_RHO)] # Perimeter position
  233. send_coordinate_batch(ser, coordinates)
  234. return jsonify({"success": True})
  235. except Exception as e:
  236. return jsonify({"success": False, "error": str(e)}), 500
  237. # Expose files for download if needed
  238. @app.route('/download/<filename>', methods=['GET'])
  239. def download_file(filename):
  240. """Download a file from the theta-rho directory."""
  241. return send_from_directory(THETA_RHO_DIR, filename)
  242. if __name__ == '__main__':
  243. # Start the thread for reading Arduino responses
  244. threading.Thread(target=read_serial_responses, daemon=True).start()
  245. app.run(debug=True, host='0.0.0.0', port=8080)