# theta_rho_app.py (7.1 KB)


  1. from flask import Flask, request, jsonify, render_template
  2. import os
  3. import serial
  4. import time
  5. import threading
  6. import serial.tools.list_ports
  7. import math
  8. app = Flask(__name__)
  9. # Theta-rho directory
  10. THETA_RHO_DIR = './theta_rho_files'
  11. os.makedirs(THETA_RHO_DIR, exist_ok=True)
  12. # Serial connection (default None, will be set by user)
  13. ser = None
  14. stop_requested = False
  15. def list_serial_ports():
  16. """Return a list of available serial ports."""
  17. ports = serial.tools.list_ports.comports()
  18. return [port.device for port in ports]
  19. def connect_to_serial(port, baudrate=115200):
  20. """Connect to the specified serial port."""
  21. global ser
  22. if ser and ser.is_open:
  23. ser.close()
  24. ser = serial.Serial(port, baudrate)
  25. time.sleep(2) # Allow time for the connection to establish
  26. def disconnect_serial():
  27. """Disconnect the current serial connection."""
  28. global ser
  29. if ser and ser.is_open:
  30. ser.close()
  31. ser = None
  32. def restart_serial(port, baudrate=115200):
  33. """Restart the serial connection."""
  34. disconnect_serial()
  35. connect_to_serial(port, baudrate)
  36. def parse_theta_rho_file(file_path):
  37. """Parse a theta-rho file and return a list of (theta, rho) pairs."""
  38. coordinates = []
  39. with open(file_path, 'r') as file:
  40. for line in file:
  41. line = line.strip()
  42. # Skip header or comment lines (starting with '#' or empty lines)
  43. if not line or line.startswith("#"):
  44. print(f"Skipping invalid line: {line}")
  45. continue
  46. # Parse lines with theta and rho separated by spaces
  47. try:
  48. theta, rho = map(float, line.split())
  49. coordinates.append((theta, rho))
  50. except ValueError:
  51. print(f"Skipping invalid line: {line}")
  52. return coordinates
  53. def send_coordinate_batch(ser, coordinates):
  54. """Send a batch of theta-rho pairs to the Arduino."""
  55. print("Sending batch:", coordinates)
  56. batch_str = ";".join(f"{theta:.3f},{rho:.3f}" for theta, rho in coordinates) + ";\n"
  57. ser.write(batch_str.encode())
  58. def send_command(command):
  59. """Send a single command to the Arduino."""
  60. ser.write(f"{command}\n".encode())
  61. print(f"Sent: {command}")
  62. # Wait for "DONE" acknowledgment from Arduino
  63. while True:
  64. if ser.in_waiting > 0:
  65. response = ser.readline().decode().strip()
  66. print(f"Arduino response: {response}")
  67. if response == "DONE":
  68. print("Command execution completed.")
  69. break
  70. time.sleep(0.5) # Small delay to avoid busy waiting
  71. def run_theta_rho_file(file_path):
  72. """Run a theta-rho file by interpolating straight paths and sending data in optimized batches."""
  73. global stop_requested
  74. stop_requested = False
  75. coordinates = parse_theta_rho_file(file_path)
  76. if len(coordinates) < 2:
  77. print("Not enough coordinates for interpolation.")
  78. return
  79. # Optimize batch size for smoother execution
  80. batch_size = 10 # Smaller batches may smooth movement further
  81. for i in range(0, len(coordinates), batch_size):
  82. if stop_requested:
  83. print("Execution stopped by user.")
  84. break
  85. batch = coordinates[i:i + batch_size]
  86. while True:
  87. if ser.in_waiting > 0:
  88. response = ser.readline().decode().strip()
  89. print(f"Arduino response: {response}")
  90. if response == "READY":
  91. send_coordinate_batch(ser, batch)
  92. break
  93. reset_theta()
  94. def reset_theta():
  95. ser.write(b"RESET_THETA\n")
  96. while True:
  97. if ser.in_waiting > 0:
  98. response = ser.readline().decode().strip()
  99. print(f"Arduino response: {response}")
  100. if response == "THETA_RESET":
  101. print("Theta successfully reset.")
  102. break
  103. else:
  104. print("No response or unexpected response:", response)
  105. time.sleep(0.5) # Small delay to avoid busy waiting
  106. @app.route('/')
  107. def index():
  108. return render_template('theta_rho_controller.html')
  109. @app.route('/list_serial_ports', methods=['GET'])
  110. def list_ports():
  111. return jsonify(list_serial_ports())
  112. @app.route('/connect_serial', methods=['POST'])
  113. def connect_serial():
  114. port = request.json.get('port')
  115. if not port:
  116. return jsonify({'error': 'No port provided'}), 400
  117. try:
  118. connect_to_serial(port)
  119. return jsonify({'success': True})
  120. except Exception as e:
  121. return jsonify({'error': str(e)}), 500
  122. @app.route('/disconnect_serial', methods=['POST'])
  123. def disconnect():
  124. try:
  125. disconnect_serial()
  126. return jsonify({'success': True})
  127. except Exception as e:
  128. return jsonify({'error': str(e)}), 500
  129. @app.route('/restart_serial', methods=['POST'])
  130. def restart():
  131. port = request.json.get('port')
  132. if not port:
  133. return jsonify({'error': 'No port provided'}), 400
  134. try:
  135. restart_serial(port)
  136. return jsonify({'success': True})
  137. except Exception as e:
  138. return jsonify({'error': str(e)}), 500
  139. @app.route('/list_theta_rho_files', methods=['GET'])
  140. def list_theta_rho_files():
  141. files = os.listdir(THETA_RHO_DIR)
  142. return jsonify(files)
  143. @app.route('/upload_theta_rho', methods=['POST'])
  144. def upload_theta_rho():
  145. file = request.files['file']
  146. if file:
  147. file.save(os.path.join(THETA_RHO_DIR, file.filename))
  148. return jsonify({'success': True})
  149. return jsonify({'success': False})
  150. @app.route('/run_theta_rho', methods=['POST'])
  151. def run_theta_rho():
  152. file_name = request.json.get('file_name')
  153. if not file_name:
  154. return jsonify({'error': 'No file name provided'}), 400
  155. file_path = os.path.join(THETA_RHO_DIR, file_name)
  156. if not os.path.exists(file_path):
  157. return jsonify({'error': 'File not found'}), 404
  158. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  159. return jsonify({'success': True})
  160. @app.route('/stop_execution', methods=['POST'])
  161. def stop_execution():
  162. global stop_requested
  163. stop_requested = True
  164. reset_theta()
  165. return jsonify({'success': True})
  166. @app.route('/send_home', methods=['POST'])
  167. def send_home():
  168. """Send the HOME command to the Arduino."""
  169. try:
  170. send_command("HOME")
  171. return jsonify({'success': True})
  172. except Exception as e:
  173. return jsonify({'error': str(e)}), 500
  174. @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
  175. def run_specific_theta_rho_file(file_name):
  176. """Run a specific theta-rho file."""
  177. file_path = os.path.join(THETA_RHO_DIR, file_name)
  178. if not os.path.exists(file_path):
  179. return jsonify({'error': 'File not found'}), 404
  180. threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
  181. return jsonify({'success': True})
  182. # Expose files for download if needed
  183. @app.route('/download/<filename>', methods=['GET'])
  184. def download_file(filename):
  185. """Download a file from the theta-rho directory."""
  186. return send_from_directory(THETA_RHO_DIR, filename)
  187. if __name__ == '__main__':
  188. app.run(debug=True, host='0.0.0.0', port=8080)