theta_rho_app.py 8.7 KB


  1. from flask import Flask, request, jsonify, render_template
  2. import os
  3. import serial
  4. import time
  5. import threading
  6. import serial.tools.list_ports
  7. import math
  8. app = Flask(__name__)
  9. # Theta-rho directory
  10. THETA_RHO_DIR = './theta_rho_files'
  11. os.makedirs(THETA_RHO_DIR, exist_ok=True)
  12. # Serial connection (default None, will be set by user)
  13. ser = None
  14. stop_requested = False
  15. def list_serial_ports():
  16. """Return a list of available serial ports."""
  17. ports = serial.tools.list_ports.comports()
  18. return [port.device for port in ports]
  19. def connect_to_serial(port, baudrate=115200):
  20. """Connect to the specified serial port."""
  21. global ser
  22. if ser and ser.is_open:
  23. ser.close()
  24. ser = serial.Serial(port, baudrate)
  25. time.sleep(2) # Allow time for the connection to establish
  26. def disconnect_serial():
  27. """Disconnect the current serial connection."""
  28. global ser
  29. if ser and ser.is_open:
  30. ser.close()
  31. ser = None
  32. def restart_serial(port, baudrate=115200):
  33. """Restart the serial connection."""
  34. disconnect_serial()
  35. connect_to_serial(port, baudrate)
  36. def parse_theta_rho_file(file_path):
  37. """Parse a theta-rho file and return a list of (theta, rho) pairs."""
  38. coordinates = []
  39. with open(file_path, 'r') as file:
  40. for line in file:
  41. line = line.strip()
  42. # Skip header or comment lines (starting with '#' or empty lines)
  43. if not line or line.startswith("#"):
  44. print(f"Skipping invalid line: {line}")
  45. continue
  46. # Parse lines with theta and rho separated by spaces
  47. try:
  48. theta, rho = map(float, line.split())
  49. coordinates.append((theta, rho))
  50. except ValueError:
  51. print(f"Skipping invalid line: {line}")
  52. return coordinates
  53. def send_coordinate_batch(ser, coordinates):
  54. """Send a batch of theta-rho pairs to the Arduino."""
  55. # print("Sending batch:", coordinates)
  56. batch_str = ";".join(f"{theta:.3f},{rho:.3f}" for theta, rho in coordinates) + ";\n"
  57. ser.write(batch_str.encode())
  58. def send_command(command):
  59. """Send a single command to the Arduino."""
  60. ser.write(f"{command}\n".encode())
  61. print(f"Sent: {command}")
  62. # Wait for "DONE" acknowledgment from Arduino
  63. while True:
  64. if ser.in_waiting > 0:
  65. response = ser.readline().decode().strip()
  66. print(f"Arduino response: {response}")
  67. if response == "DONE":
  68. print("Command execution completed.")
  69. break
  70. time.sleep(0.5) # Small delay to avoid busy waiting
  71. def run_theta_rho_file(file_path):
  72. """Run a theta-rho file by interpolating straight paths and sending data in optimized batches."""
  73. global stop_requested
  74. stop_requested = False
  75. coordinates = parse_theta_rho_file(file_path)
  76. if len(coordinates) < 2:
  77. print("Not enough coordinates for interpolation.")
  78. return
  79. # Optimize batch size for smoother execution
  80. batch_size = 10 # Smaller batches may smooth movement further
  81. for i in range(0, len(coordinates), batch_size):
  82. batch = coordinates[i:i + batch_size]
  83. # Wait until Arduino is READY before sending the batch
  84. while True:
  85. if ser.in_waiting > 0:
  86. response = ser.readline().decode().strip()
  87. if response == "READY":
  88. send_coordinate_batch(ser, batch)
  89. break
  90. else:
  91. print(f"Arduino response: {response}")
  92. # Check stop_requested flag after sending the batch
  93. if stop_requested:
  94. print("Execution stopped by user after completing the current batch.")
  95. break
  96. # Reset theta after execution or stopping
  97. reset_theta()
  98. def reset_theta():
  99. ser.write(b"RESET_THETA\n")
  100. while True:
  101. if ser.in_waiting > 0:
  102. response = ser.readline().decode().strip()
  103. print(f"Arduino response: {response}")
  104. if response == "THETA_RESET":
  105. print("Theta successfully reset.")
  106. break
  107. time.sleep(0.5) # Small delay to avoid busy waiting
  108. @app.route('/')
  109. def index():
  110. return render_template('theta_rho_controller.html')
  111. @app.route('/list_serial_ports', methods=['GET'])
  112. def list_ports():
  113. return jsonify(list_serial_ports())
  114. @app.route('/connect_serial', methods=['POST'])
  115. def connect_serial():
  116. port = request.json.get('port')
  117. if not port:
  118. return jsonify({'error': 'No port provided'}), 400
  119. try:
  120. connect_to_serial(port)
  121. return jsonify({'success': True})
  122. except Exception as e:
  123. return jsonify({'error': str(e)}), 500
  124. @app.route('/disconnect_serial', methods=['POST'])
  125. def disconnect():
  126. try:
  127. disconnect_serial()
  128. return jsonify({'success': True})
  129. except Exception as e:
  130. return jsonify({'error': str(e)}), 500
  131. @app.route('/restart_serial', methods=['POST'])
  132. def restart():
  133. port = request.json.get('port')
  134. if not port:
  135. return jsonify({'error': 'No port provided'}), 400
  136. try:
  137. restart_serial(port)
  138. return jsonify({'success': True})
  139. except Exception as e:
  140. return jsonify({'error': str(e)}), 500
  141. @app.route('/list_theta_rho_files', methods=['GET'])
  142. def list_theta_rho_files():
  143. files = os.listdir(THETA_RHO_DIR)
  144. # Exclude clear_from_in.thr and clear_from_out.thr and sort
  145. files = sorted(file for file in files if file not in ['clear_from_in.thr', 'clear_from_out.thr'])
  146. return jsonify(files)
  147. @app.route('/upload_theta_rho', methods=['POST'])
  148. def upload_theta_rho():
  149. file = request.files['file']
  150. if file:
  151. file.save(os.path.join(THETA_RHO_DIR, file.filename))
  152. return jsonify({'success': True})
  153. return jsonify({'success': False})
  154. @app.route('/run_theta_rho', methods=['POST'])
  155. def run_theta_rho():
  156. file_name = request.json.get('file_name')
  157. if not file_name:
  158. return jsonify({'error': 'No file name provided'}), 400
  159. file_path = os.path.join(THETA_RHO_DIR, file_name)
  160. if not os.path.exists(file_path):
  161. return jsonify({'error': 'File not found'}), 404
  162. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  163. return jsonify({'success': True})
  164. @app.route('/stop_execution', methods=['POST'])
  165. def stop_execution():
  166. global stop_requested
  167. stop_requested = True
  168. reset_theta()
  169. return jsonify({'success': True})
  170. @app.route('/send_home', methods=['POST'])
  171. def send_home():
  172. """Send the HOME command to the Arduino."""
  173. try:
  174. send_command("HOME")
  175. return jsonify({'success': True})
  176. except Exception as e:
  177. return jsonify({'error': str(e)}), 500
  178. @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
  179. def run_specific_theta_rho_file(file_name):
  180. """Run a specific theta-rho file."""
  181. file_path = os.path.join(THETA_RHO_DIR, file_name)
  182. if not os.path.exists(file_path):
  183. return jsonify({'error': 'File not found'}), 404
  184. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  185. return jsonify({'success': True})
  186. # Expose files for download if needed
  187. @app.route('/download/<filename>', methods=['GET'])
  188. def download_file(filename):
  189. """Download a file from the theta-rho directory."""
  190. return send_from_directory(THETA_RHO_DIR, filename)
  191. @app.route('/run_theta_rho_with_action', methods=['POST'])
  192. def run_theta_rho_with_action():
  193. """Run a theta-rho file with a pre-execution action if specified."""
  194. data = request.json
  195. file_name = data.get('file_name')
  196. action = data.get('action', 'none') # Default to 'none' if no action is specified
  197. if not file_name:
  198. return jsonify({'error': 'No file name provided'}), 400
  199. # Handle pre-execution action
  200. if action != 'none':
  201. action_file_path = os.path.join(THETA_RHO_DIR, f"{action}.thr")
  202. if not os.path.exists(action_file_path):
  203. return jsonify({'error': f"Action file {action}.thr not found"}), 404
  204. try:
  205. # Run the pre-execution file synchronously
  206. run_theta_rho_file(action_file_path)
  207. except Exception as e:
  208. return jsonify({'error': f"Failed to execute pre-action {action}: {str(e)}"}), 500
  209. # Run the main Theta-Rho file
  210. file_path = os.path.join(THETA_RHO_DIR, file_name)
  211. if not os.path.exists(file_path):
  212. return jsonify({'error': 'File not found'}), 404
  213. try:
  214. # Start a thread to run the main file asynchronously
  215. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  216. return jsonify({'success': True})
  217. except Exception as e:
  218. return jsonify({'error': f"Failed to execute file {file_name}: {str(e)}"}), 500
  219. if __name__ == '__main__':
  220. app.run(debug=True, host='0.0.0.0', port=8080)