| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- from flask import Flask, request, jsonify, render_template
- import os
- import serial
- import time
- import threading
- import serial.tools.list_ports
- import math
- app = Flask(__name__)
- # Theta-rho directory
- THETA_RHO_DIR = './theta_rho_files'
- os.makedirs(THETA_RHO_DIR, exist_ok=True)
- # Serial connection (default None, will be set by user)
- ser = None
- stop_requested = False
- def list_serial_ports():
- """Return a list of available serial ports."""
- ports = serial.tools.list_ports.comports()
- return [port.device for port in ports]
- def connect_to_serial(port, baudrate=115200):
- """Connect to the specified serial port."""
- global ser
- if ser and ser.is_open:
- ser.close()
- ser = serial.Serial(port, baudrate)
- time.sleep(2) # Allow time for the connection to establish
- def disconnect_serial():
- """Disconnect the current serial connection."""
- global ser
- if ser and ser.is_open:
- ser.close()
- ser = None
- def restart_serial(port, baudrate=115200):
- """Restart the serial connection."""
- disconnect_serial()
- connect_to_serial(port, baudrate)
- def parse_theta_rho_file(file_path):
- """Parse a theta-rho file and return a list of (theta, rho) pairs."""
- coordinates = []
- with open(file_path, 'r') as file:
- for line in file:
- line = line.strip()
-
- # Skip header or comment lines (starting with '#' or empty lines)
- if not line or line.startswith("#"):
- print(f"Skipping invalid line: {line}")
- continue
- # Parse lines with theta and rho separated by spaces
- try:
- theta, rho = map(float, line.split())
- coordinates.append((theta, rho))
- except ValueError:
- print(f"Skipping invalid line: {line}")
- return coordinates
- def send_coordinate_batch(ser, coordinates):
- """Send a batch of theta-rho pairs to the Arduino."""
- # print("Sending batch:", coordinates)
- batch_str = ";".join(f"{theta:.3f},{rho:.3f}" for theta, rho in coordinates) + ";\n"
- ser.write(batch_str.encode())
- def send_command(command):
- """Send a single command to the Arduino."""
- ser.write(f"{command}\n".encode())
- print(f"Sent: {command}")
-
- # Wait for "DONE" acknowledgment from Arduino
- while True:
- if ser.in_waiting > 0:
- response = ser.readline().decode().strip()
- print(f"Arduino response: {response}")
- if response == "DONE":
- print("Command execution completed.")
- break
- time.sleep(0.5) # Small delay to avoid busy waiting
- def run_theta_rho_file(file_path):
- """Run a theta-rho file by interpolating straight paths and sending data in optimized batches."""
- global stop_requested
- stop_requested = False
- coordinates = parse_theta_rho_file(file_path)
- if len(coordinates) < 2:
- print("Not enough coordinates for interpolation.")
- return
- # Optimize batch size for smoother execution
- batch_size = 10 # Smaller batches may smooth movement further
- for i in range(0, len(coordinates), batch_size):
- batch = coordinates[i:i + batch_size]
- # Wait until Arduino is READY before sending the batch
- while True:
- if ser.in_waiting > 0:
- response = ser.readline().decode().strip()
- if response == "READY":
- send_coordinate_batch(ser, batch)
- break
- else:
- print(f"Arduino response: {response}")
-
- # Check stop_requested flag after sending the batch
- if stop_requested:
- print("Execution stopped by user after completing the current batch.")
- break
- # Reset theta after execution or stopping
- reset_theta()
-
- def reset_theta():
- ser.write(b"RESET_THETA\n")
- while True:
- if ser.in_waiting > 0:
- response = ser.readline().decode().strip()
- print(f"Arduino response: {response}")
- if response == "THETA_RESET":
- print("Theta successfully reset.")
- break
- time.sleep(0.5) # Small delay to avoid busy waiting
- @app.route('/')
- def index():
- return render_template('theta_rho_controller.html')
- @app.route('/list_serial_ports', methods=['GET'])
- def list_ports():
- return jsonify(list_serial_ports())
- @app.route('/connect_serial', methods=['POST'])
- def connect_serial():
- port = request.json.get('port')
- if not port:
- return jsonify({'error': 'No port provided'}), 400
- try:
- connect_to_serial(port)
- return jsonify({'success': True})
- except Exception as e:
- return jsonify({'error': str(e)}), 500
- @app.route('/disconnect_serial', methods=['POST'])
- def disconnect():
- try:
- disconnect_serial()
- return jsonify({'success': True})
- except Exception as e:
- return jsonify({'error': str(e)}), 500
- @app.route('/restart_serial', methods=['POST'])
- def restart():
- port = request.json.get('port')
- if not port:
- return jsonify({'error': 'No port provided'}), 400
- try:
- restart_serial(port)
- return jsonify({'success': True})
- except Exception as e:
- return jsonify({'error': str(e)}), 500
- @app.route('/list_theta_rho_files', methods=['GET'])
- def list_theta_rho_files():
- files = os.listdir(THETA_RHO_DIR)
- # Exclude clear_from_in.thr and clear_from_out.thr and sort
- files = sorted(file for file in files if file not in ['clear_from_in.thr', 'clear_from_out.thr'])
- return jsonify(files)
- @app.route('/upload_theta_rho', methods=['POST'])
- def upload_theta_rho():
- file = request.files['file']
- if file:
- file.save(os.path.join(THETA_RHO_DIR, file.filename))
- return jsonify({'success': True})
- return jsonify({'success': False})
- @app.route('/run_theta_rho', methods=['POST'])
- def run_theta_rho():
- file_name = request.json.get('file_name')
- if not file_name:
- return jsonify({'error': 'No file name provided'}), 400
- file_path = os.path.join(THETA_RHO_DIR, file_name)
- if not os.path.exists(file_path):
- return jsonify({'error': 'File not found'}), 404
- threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
- return jsonify({'success': True})
- @app.route('/stop_execution', methods=['POST'])
- def stop_execution():
- global stop_requested
- stop_requested = True
- reset_theta()
- return jsonify({'success': True})
- @app.route('/send_home', methods=['POST'])
- def send_home():
- """Send the HOME command to the Arduino."""
- try:
- send_command("HOME")
- return jsonify({'success': True})
- except Exception as e:
- return jsonify({'error': str(e)}), 500
- @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
- def run_specific_theta_rho_file(file_name):
- """Run a specific theta-rho file."""
- file_path = os.path.join(THETA_RHO_DIR, file_name)
- if not os.path.exists(file_path):
- return jsonify({'error': 'File not found'}), 404
- threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
- return jsonify({'success': True})
- # Expose files for download if needed
- @app.route('/download/<filename>', methods=['GET'])
- def download_file(filename):
- """Download a file from the theta-rho directory."""
- return send_from_directory(THETA_RHO_DIR, filename)
- @app.route('/run_theta_rho_with_action', methods=['POST'])
- def run_theta_rho_with_action():
- """Run a theta-rho file with a pre-execution action if specified."""
- data = request.json
- file_name = data.get('file_name')
- action = data.get('action', 'none') # Default to 'none' if no action is specified
- if not file_name:
- return jsonify({'error': 'No file name provided'}), 400
- # Handle pre-execution action
- if action != 'none':
- action_file_path = os.path.join(THETA_RHO_DIR, f"{action}.thr")
- if not os.path.exists(action_file_path):
- return jsonify({'error': f"Action file {action}.thr not found"}), 404
- try:
- # Run the pre-execution file synchronously
- run_theta_rho_file(action_file_path)
- except Exception as e:
- return jsonify({'error': f"Failed to execute pre-action {action}: {str(e)}"}), 500
- # Run the main Theta-Rho file
- file_path = os.path.join(THETA_RHO_DIR, file_name)
- if not os.path.exists(file_path):
- return jsonify({'error': 'File not found'}), 404
- try:
- # Start a thread to run the main file asynchronously
- threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
- return jsonify({'success': True})
- except Exception as e:
- return jsonify({'error': f"Failed to execute file {file_name}: {str(e)}"}), 500
- if __name__ == '__main__':
- app.run(debug=True, host='0.0.0.0', port=8080)
|