app.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. from flask import Flask, request, jsonify, render_template
  2. import os
  3. import serial
  4. import time
  5. import threading
  6. import serial.tools.list_ports
  7. import math
  8. app = Flask(__name__)
  9. # Theta-rho directory
  10. THETA_RHO_DIR = './patterns'
  11. os.makedirs(THETA_RHO_DIR, exist_ok=True)
  12. # Serial connection (default None, will be set by user)
  13. ser = None
  14. stop_requested = False
  15. def list_serial_ports():
  16. """Return a list of available serial ports."""
  17. ports = serial.tools.list_ports.comports()
  18. return [port.device for port in ports]
  19. def connect_to_serial(port, baudrate=115200):
  20. """Connect to the specified serial port."""
  21. global ser
  22. if ser and ser.is_open:
  23. ser.close()
  24. ser = serial.Serial(port, baudrate)
  25. time.sleep(2) # Allow time for the connection to establish
  26. def disconnect_serial():
  27. """Disconnect the current serial connection."""
  28. global ser
  29. if ser and ser.is_open:
  30. ser.close()
  31. ser = None
  32. def restart_serial(port, baudrate=115200):
  33. """Restart the serial connection."""
  34. disconnect_serial()
  35. connect_to_serial(port, baudrate)
  36. def parse_theta_rho_file(file_path):
  37. """Parse a theta-rho file and return a list of (theta, rho) pairs."""
  38. coordinates = []
  39. with open(file_path, 'r') as file:
  40. for line in file:
  41. line = line.strip()
  42. # Skip header or comment lines (starting with '#' or empty lines)
  43. if not line or line.startswith("#"):
  44. print(f"Skipping invalid line: {line}")
  45. continue
  46. # Parse lines with theta and rho separated by spaces
  47. try:
  48. theta, rho = map(float, line.split())
  49. coordinates.append((theta, rho))
  50. except ValueError:
  51. print(f"Skipping invalid line: {line}")
  52. return coordinates
  53. def send_coordinate_batch(ser, coordinates):
  54. """Send a batch of theta-rho pairs to the Arduino."""
  55. # print("Sending batch:", coordinates)
  56. batch_str = ";".join(f"{theta:.3f},{rho:.3f}" for theta, rho in coordinates) + ";\n"
  57. ser.write(batch_str.encode())
  58. def send_command(command):
  59. """Send a single command to the Arduino."""
  60. ser.write(f"{command}\n".encode())
  61. print(f"Sent: {command}")
  62. # Wait for "DONE" acknowledgment from Arduino
  63. while True:
  64. if ser.in_waiting > 0:
  65. response = ser.readline().decode().strip()
  66. print(f"Arduino response: {response}")
  67. if response == "DONE":
  68. print("Command execution completed.")
  69. break
  70. time.sleep(0.5) # Small delay to avoid busy waiting
  71. def run_theta_rho_file(file_path):
  72. """Run a theta-rho file by interpolating straight paths and sending data in optimized batches."""
  73. global stop_requested
  74. stop_requested = False
  75. coordinates = parse_theta_rho_file(file_path)
  76. if len(coordinates) < 2:
  77. print("Not enough coordinates for interpolation.")
  78. return
  79. # Optimize batch size for smoother execution
  80. batch_size = 10 # Smaller batches may smooth movement further
  81. for i in range(0, len(coordinates), batch_size):
  82. batch = coordinates[i:i + batch_size]
  83. # Wait until Arduino is READY before sending the batch
  84. while True:
  85. if ser.in_waiting > 0:
  86. response = ser.readline().decode().strip()
  87. if response == "READY":
  88. send_coordinate_batch(ser, batch)
  89. break
  90. else:
  91. print(f"Arduino response: {response}")
  92. # Check stop_requested flag after sending the batch
  93. if stop_requested:
  94. print("Execution stopped by user after completing the current batch.")
  95. break
  96. # Reset theta after execution or stopping
  97. reset_theta()
  98. def reset_theta():
  99. ser.write(b"RESET_THETA\n")
  100. while True:
  101. if ser.in_waiting > 0:
  102. response = ser.readline().decode().strip()
  103. print(f"Arduino response: {response}")
  104. if response == "THETA_RESET":
  105. print("Theta successfully reset.")
  106. break
  107. time.sleep(0.5) # Small delay to avoid busy waiting
  108. @app.route('/')
  109. def index():
  110. return render_template('index.html')
  111. @app.route('/list_serial_ports', methods=['GET'])
  112. def list_ports():
  113. return jsonify(list_serial_ports())
  114. @app.route('/connect_serial', methods=['POST'])
  115. def connect_serial():
  116. port = request.json.get('port')
  117. if not port:
  118. return jsonify({'error': 'No port provided'}), 400
  119. try:
  120. connect_to_serial(port)
  121. return jsonify({'success': True})
  122. except Exception as e:
  123. return jsonify({'error': str(e)}), 500
  124. @app.route('/disconnect_serial', methods=['POST'])
  125. def disconnect():
  126. try:
  127. disconnect_serial()
  128. return jsonify({'success': True})
  129. except Exception as e:
  130. return jsonify({'error': str(e)}), 500
  131. @app.route('/restart_serial', methods=['POST'])
  132. def restart():
  133. port = request.json.get('port')
  134. if not port:
  135. return jsonify({'error': 'No port provided'}), 400
  136. try:
  137. restart_serial(port)
  138. return jsonify({'success': True})
  139. except Exception as e:
  140. return jsonify({'error': str(e)}), 500
  141. @app.route('/list_theta_rho_files', methods=['GET'])
  142. def list_theta_rho_files():
  143. files = os.listdir(THETA_RHO_DIR)
  144. return jsonify(files)
  145. @app.route('/upload_theta_rho', methods=['POST'])
  146. def upload_theta_rho():
  147. file = request.files['file']
  148. if file:
  149. file.save(os.path.join(THETA_RHO_DIR, file.filename))
  150. return jsonify({'success': True})
  151. return jsonify({'success': False})
  152. @app.route('/run_theta_rho', methods=['POST'])
  153. def run_theta_rho():
  154. file_name = request.json.get('file_name')
  155. if not file_name:
  156. return jsonify({'error': 'No file name provided'}), 400
  157. file_path = os.path.join(THETA_RHO_DIR, file_name)
  158. if not os.path.exists(file_path):
  159. return jsonify({'error': 'File not found'}), 404
  160. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  161. return jsonify({'success': True})
  162. @app.route('/stop_execution', methods=['POST'])
  163. def stop_execution():
  164. global stop_requested
  165. stop_requested = True
  166. reset_theta()
  167. return jsonify({'success': True})
  168. @app.route('/send_home', methods=['POST'])
  169. def send_home():
  170. """Send the HOME command to the Arduino."""
  171. try:
  172. send_command("HOME")
  173. return jsonify({'success': True})
  174. except Exception as e:
  175. return jsonify({'error': str(e)}), 500
  176. @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
  177. def run_specific_theta_rho_file(file_name):
  178. """Run a specific theta-rho file."""
  179. file_path = os.path.join(THETA_RHO_DIR, file_name)
  180. if not os.path.exists(file_path):
  181. return jsonify({'error': 'File not found'}), 404
  182. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  183. return jsonify({'success': True})
  184. @app.route('/delete_theta_rho_file', methods=['POST'])
  185. def delete_theta_rho_file():
  186. data = request.json
  187. file_name = data.get('file_name')
  188. if not file_name:
  189. return jsonify({"success": False, "error": "No file name provided"}), 400
  190. file_path = os.path.join(THETA_RHO_DIR, file_name)
  191. if not os.path.exists(file_path):
  192. return jsonify({"success": False, "error": "File not found"}), 404
  193. try:
  194. os.remove(file_path)
  195. return jsonify({"success": True})
  196. except Exception as e:
  197. return jsonify({"success": False, "error": str(e)}), 500
  198. @app.route('/move_to_center', methods=['POST'])
  199. def move_to_center():
  200. """Move the sand table to the center position."""
  201. try:
  202. if ser is None or not ser.is_open:
  203. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  204. coordinates = [(0, 0)] # Center position
  205. send_coordinate_batch(ser, coordinates)
  206. return jsonify({"success": True})
  207. except Exception as e:
  208. return jsonify({"success": False, "error": str(e)}), 500
  209. @app.route('/move_to_perimeter', methods=['POST'])
  210. def move_to_perimeter():
  211. """Move the sand table to the perimeter position."""
  212. try:
  213. if ser is None or not ser.is_open:
  214. return jsonify({"success": False, "error": "Serial connection not established"}), 400
  215. MAX_RHO = 1
  216. coordinates = [(0, MAX_RHO)] # Perimeter position
  217. send_coordinate_batch(ser, coordinates)
  218. return jsonify({"success": True})
  219. except Exception as e:
  220. return jsonify({"success": False, "error": str(e)}), 500
  221. # Expose files for download if needed
  222. @app.route('/download/<filename>', methods=['GET'])
  223. def download_file(filename):
  224. """Download a file from the theta-rho directory."""
  225. return send_from_directory(THETA_RHO_DIR, filename)
  226. if __name__ == '__main__':
  227. app.run(debug=True, host='0.0.0.0', port=8080)