# app.py
  1. from flask import Flask, request, jsonify, render_template
  2. import os
  3. import serial
  4. import time
  5. import threading
  6. import serial.tools.list_ports
  7. import math
  8. app = Flask(__name__)
  9. # Configuration
  10. THETA_RHO_DIR = './patterns'
  11. IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
  12. os.makedirs(THETA_RHO_DIR, exist_ok=True)
  13. # Serial connection (First available will be selected by default)
  14. ser = None
  15. ser_port = None # Global variable to store the serial port name
  16. stop_requested = False
  17. serial_lock = threading.Lock()
  18. def list_serial_ports():
  19. """Return a list of available serial ports."""
  20. ports = serial.tools.list_ports.comports()
  21. return [port.device for port in ports if port.device not in IGNORE_PORTS]
  22. def connect_to_serial(port=None, baudrate=115200):
  23. """Automatically connect to the first available serial port or a specified port."""
  24. global ser, ser_port
  25. try:
  26. if port is None:
  27. ports = list_serial_ports()
  28. if not ports:
  29. print("No serial port connected")
  30. return False
  31. port = ports[0] # Auto-select the first available port
  32. with serial_lock:
  33. if ser and ser.is_open:
  34. ser.close()
  35. ser = serial.Serial(port, baudrate)
  36. ser_port = port # Store the connected port globally
  37. print(f"Connected to serial port: {port}")
  38. time.sleep(2) # Allow time for the connection to establish
  39. return True # Successfully connected
  40. except serial.SerialException as e:
  41. print(f"Failed to connect to serial port {port}: {e}")
  42. port = None # Reset the port to try the next available one
  43. print("Max retries reached. Could not connect to a serial port.")
  44. return False
  45. def disconnect_serial():
  46. """Disconnect the current serial connection."""
  47. global ser, ser_port
  48. if ser and ser.is_open:
  49. ser.close()
  50. ser = None
  51. ser_port = None # Reset the port name
  52. def restart_serial(port, baudrate=115200):
  53. """Restart the serial connection."""
  54. disconnect_serial()
  55. connect_to_serial(port, baudrate)
  56. def parse_theta_rho_file(file_path):
  57. """
  58. Parse a theta-rho file and return a list of (theta, rho) pairs.
  59. Optionally apply transformations (rotation and mirroring).
  60. """
  61. coordinates = []
  62. try:
  63. with open(file_path, 'r') as file:
  64. for line in file:
  65. line = line.strip()
  66. # Skip header or comment lines (starting with '#' or empty lines)
  67. if not line or line.startswith("#"):
  68. continue
  69. # Parse lines with theta and rho separated by spaces
  70. try:
  71. theta, rho = map(float, line.split())
  72. coordinates.append((theta, rho))
  73. except ValueError:
  74. print(f"Skipping invalid line: {line}")
  75. except Exception as e:
  76. print(f"Error reading file: {e}")
  77. return coordinates
  78. def send_coordinate_batch(ser, coordinates):
  79. """Send a batch of theta-rho pairs to the Arduino."""
  80. # print("Sending batch:", coordinates)
  81. batch_str = ";".join(f"{theta:.5f},{rho:.5f}" for theta, rho in coordinates) + ";\n"
  82. ser.write(batch_str.encode())
  83. def send_command(command):
  84. """Send a single command to the Arduino."""
  85. ser.write(f"{command}\n".encode())
  86. print(f"Sent: {command}")
  87. # Wait for "DONE" acknowledgment from Arduino
  88. while True:
  89. with serial_lock:
  90. if ser.in_waiting > 0:
  91. response = ser.readline().decode().strip()
  92. print(f"Arduino response: {response}")
  93. if response == "DONE":
  94. print("Command execution completed.")
  95. break
  96. def run_theta_rho_file(file_path):
  97. """Run a theta-rho file by sending data in optimized batches."""
  98. global stop_requested
  99. stop_requested = False
  100. coordinates = parse_theta_rho_file(file_path)
  101. if len(coordinates) < 2:
  102. print("Not enough coordinates for interpolation.")
  103. return
  104. # Optimize batch size for smoother execution
  105. batch_size = 10 # Smaller batches may smooth movement further
  106. for i in range(0, len(coordinates), batch_size):
  107. # Check stop_requested flag after sending the batch
  108. if stop_requested:
  109. print("Execution stopped by user after completing the current batch.")
  110. break
  111. batch = coordinates[i:i + batch_size]
  112. if i == 0:
  113. send_coordinate_batch(ser, batch)
  114. continue
  115. # Wait until Arduino is READY before sending the batch
  116. while True:
  117. with serial_lock:
  118. if ser.in_waiting > 0:
  119. response = ser.readline().decode().strip()
  120. if response == "R":
  121. send_coordinate_batch(ser, batch)
  122. break
  123. else:
  124. print(f"Arduino response: {response}")
  125. # Reset theta after execution or stopping
  126. reset_theta()
  127. ser.write("FINISHED\n".encode())
  128. def reset_theta():
  129. """Reset theta on the Arduino."""
  130. ser.write("RESET_THETA\n".encode())
  131. while True:
  132. with serial_lock:
  133. if ser.in_waiting > 0:
  134. response = ser.readline().decode().strip()
  135. print(f"Arduino response: {response}")
  136. if response == "THETA_RESET":
  137. print("Theta successfully reset.")
  138. break
  139. time.sleep(0.5) # Small delay to avoid busy waiting
  140. # Flask API Endpoints
  141. @app.route('/')
  142. def index():
  143. return render_template('index.html')
  144. @app.route('/list_serial_ports', methods=['GET'])
  145. def list_ports():
  146. return jsonify(list_serial_ports())
  147. @app.route('/connect_serial', methods=['POST'])
  148. def connect_serial():
  149. port = request.json.get('port')
  150. if not port:
  151. return jsonify({'error': 'No port provided'}), 400
  152. try:
  153. connect_to_serial(port)
  154. return jsonify({'success': True})
  155. except Exception as e:
  156. return jsonify({'error': str(e)}), 500
  157. @app.route('/disconnect_serial', methods=['POST'])
  158. def disconnect():
  159. try:
  160. disconnect_serial()
  161. return jsonify({'success': True})
  162. except Exception as e:
  163. return jsonify({'error': str(e)}), 500
  164. @app.route('/restart_serial', methods=['POST'])
  165. def restart():
  166. port = request.json.get('port')
  167. if not port:
  168. return jsonify({'error': 'No port provided'}), 400
  169. try:
  170. restart_serial(port)
  171. return jsonify({'success': True})
  172. except Exception as e:
  173. return jsonify({'error': str(e)}), 500
  174. @app.route('/list_theta_rho_files', methods=['GET'])
  175. def list_theta_rho_files():
  176. files = []
  177. for root, _, filenames in os.walk(THETA_RHO_DIR):
  178. for file in filenames:
  179. # Construct the relative file path
  180. relative_path = os.path.relpath(os.path.join(root, file), THETA_RHO_DIR)
  181. files.append(relative_path)
  182. return jsonify(sorted(files))
  183. @app.route('/upload_theta_rho', methods=['POST'])
  184. def upload_theta_rho():
  185. custom_patterns_dir = os.path.join(THETA_RHO_DIR, 'custom_patterns')
  186. os.makedirs(custom_patterns_dir, exist_ok=True) # Ensure the directory exists
  187. file = request.files['file']
  188. if file:
  189. file.save(os.path.join(custom_patterns_dir, file.filename))
  190. return jsonify({'success': True})
  191. return jsonify({'success': False})
  192. @app.route('/run_theta_rho', methods=['POST'])
  193. def run_theta_rho():
  194. file_name = request.json.get('file_name')
  195. pre_execution = request.json.get('pre_execution') # New parameter for pre-execution action
  196. if not file_name:
  197. return jsonify({'error': 'No file name provided'}), 400
  198. file_path = os.path.join(THETA_RHO_DIR, file_name)
  199. if not os.path.exists(file_path):
  200. return jsonify({'error': 'File not found'}), 404
  201. try:
  202. # Handle pre-execution actions
  203. if pre_execution == 'clear_in':
  204. clear_in_thread = threading.Thread(target=run_theta_rho_file, args=('./patterns/clear_from_in.thr',))
  205. clear_in_thread.start()
  206. clear_in_thread.join() # Wait for completion before proceeding
  207. elif pre_execution == 'clear_out':
  208. clear_out_thread = threading.Thread(target=run_theta_rho_file, args=('./patterns/clear_from_out.thr',))
  209. clear_out_thread.start()
  210. clear_out_thread.join() # Wait for completion before proceeding
  211. elif pre_execution == 'none':
  212. pass # No pre-execution action required
  213. # Start the main pattern execution
  214. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  215. return jsonify({'success': True})
  216. except Exception as e:
  217. return jsonify({'error': str(e)}), 500
  218. @app.route('/stop_execution', methods=['POST'])
  219. def stop_execution():
  220. global stop_requested
  221. stop_requested = True
  222. return jsonify({'success': True})
  223. @app.route('/send_home', methods=['POST'])
  224. def send_home():
  225. """Send the HOME command to the Arduino."""
  226. try:
  227. send_command("HOME")
  228. return jsonify({'success': True})
  229. except Exception as e:
  230. return jsonify({'error': str(e)}), 500
  231. @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
  232. def run_specific_theta_rho_file(file_name):
  233. """Run a specific theta-rho file."""
  234. file_path = os.path.join(THETA_RHO_DIR, file_name)
  235. if not os.path.exists(file_path):
  236. return jsonify({'error': 'File not found'}), 404
  237. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  238. return jsonify({'success': True})
  239. @app.route('/delete_theta_rho_file', methods=['POST'])
  240. def delete_theta_rho_file():
  241. data = request.json
  242. file_name = data.get('file_name')
  243. if not file_name:
  244. return jsonify({"success": False, "error": "No file name provided"}), 400
  245. file_path = os.path.join(THETA_RHO_DIR, file_name)
  246. if not os.path.exists(file_path):
  247. return jsonify({"success": False, "error": "File not found"}), 404
  248. try:
  249. os.remove(file_path)
  250. return jsonify({"success": True})
  251. except Exception as e:
  252. return jsonify({"success": False, "error": str(e)}), 500
  253. @app.route('/move_to_center', methods=['POST'])
  254. def move_to_center():
  255. """Move the sand table to the center position."""
  256. try:
  257. if ser is None or not ser.is_open:
  258. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  259. coordinates = [(0, 0)] # Center position
  260. send_coordinate_batch(ser, coordinates)
  261. return jsonify({"success": True})
  262. except Exception as e:
  263. return jsonify({"success": False, "error": str(e)}), 500
  264. @app.route('/move_to_perimeter', methods=['POST'])
  265. def move_to_perimeter():
  266. """Move the sand table to the perimeter position."""
  267. try:
  268. if ser is None or not ser.is_open:
  269. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  270. MAX_RHO = 1
  271. coordinates = [(0, MAX_RHO)] # Perimeter position
  272. send_coordinate_batch(ser, coordinates)
  273. return jsonify({"success": True})
  274. except Exception as e:
  275. return jsonify({"success": False, "error": str(e)}), 500
  276. @app.route('/preview_thr', methods=['POST'])
  277. def preview_thr():
  278. file_name = request.json.get('file_name')
  279. if not file_name:
  280. return jsonify({'error': 'No file name provided'}), 400
  281. file_path = os.path.join(THETA_RHO_DIR, file_name)
  282. if not os.path.exists(file_path):
  283. return jsonify({'error': 'File not found'}), 404
  284. try:
  285. # Parse the .thr file with transformations
  286. coordinates = parse_theta_rho_file(file_path)
  287. return jsonify({'success': True, 'coordinates': coordinates})
  288. except Exception as e:
  289. return jsonify({'error': str(e)}), 500
  290. @app.route('/send_coordinate', methods=['POST'])
  291. def send_coordinate():
  292. """Send a single (theta, rho) coordinate to the Arduino."""
  293. global ser
  294. if ser is None or not ser.is_open:
  295. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  296. try:
  297. data = request.json
  298. theta = data.get('theta')
  299. rho = data.get('rho')
  300. if theta is None or rho is None:
  301. return jsonify({"success": False, "error": "Theta and Rho are required"}), 400
  302. # Send the coordinate to the Arduino
  303. send_coordinate_batch(ser, [(theta, rho)])
  304. reset_theta()
  305. return jsonify({"success": True})
  306. except Exception as e:
  307. return jsonify({"success": False, "error": str(e)}), 500
  308. # Expose files for download if needed
  309. @app.route('/download/<filename>', methods=['GET'])
  310. def download_file(filename):
  311. """Download a file from the theta-rho directory."""
  312. return send_from_directory(THETA_RHO_DIR, filename)
  313. @app.route('/serial_status', methods=['GET'])
  314. def serial_status():
  315. global ser, ser_port
  316. return jsonify({
  317. 'connected': ser.is_open if ser else False,
  318. 'port': ser_port # Include the port name
  319. })
  320. @app.route('/set_speed', methods=['POST'])
  321. def set_speed():
  322. """Set the speed for the Arduino."""
  323. global ser
  324. if ser is None or not ser.is_open:
  325. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  326. try:
  327. # Parse the speed value from the request
  328. data = request.json
  329. speed = data.get('speed')
  330. if speed is None:
  331. return jsonify({"success": False, "error": "Speed is required"}), 400
  332. if not isinstance(speed, (int, float)) or speed <= 0:
  333. return jsonify({"success": False, "error": "Invalid speed value"}), 400
  334. # Send the SET_SPEED command to the Arduino
  335. command = f"SET_SPEED {speed}"
  336. send_command(command)
  337. return jsonify({"success": True, "speed": speed})
  338. except Exception as e:
  339. return jsonify({"success": False, "error": str(e)}), 500
  340. if __name__ == '__main__':
  341. # Auto-connect to serial
  342. connect_to_serial()
  343. # Start the Flask app
  344. app.run(debug=True, host='0.0.0.0', port=8080)