|
@@ -2,6 +2,7 @@ from flask import Flask, request, jsonify, render_template
|
|
|
import os
|
|
import os
|
|
|
import serial
|
|
import serial
|
|
|
import time
|
|
import time
|
|
|
|
|
+import random
|
|
|
import threading
|
|
import threading
|
|
|
import serial.tools.list_ports
|
|
import serial.tools.list_ports
|
|
|
import math
|
|
import math
|
|
@@ -11,6 +12,11 @@ app = Flask(__name__)
|
|
|
# Configuration
|
|
# Configuration
|
|
|
THETA_RHO_DIR = './patterns'
|
|
THETA_RHO_DIR = './patterns'
|
|
|
IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
|
|
IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
|
|
|
|
|
+CLEAR_PATTERNS = {
|
|
|
|
|
+ "clear_from_in": "./patterns/clear_from_in.thr",
|
|
|
|
|
+ "clear_from_out": "./patterns/clear_from_out.thr",
|
|
|
|
|
+ "clear_sideway": "./patterns/clear_sideway.thr"
|
|
|
|
|
+}
|
|
|
os.makedirs(THETA_RHO_DIR, exist_ok=True)
|
|
os.makedirs(THETA_RHO_DIR, exist_ok=True)
|
|
|
|
|
|
|
|
# Serial connection (First available will be selected by default)
|
|
# Serial connection (First available will be selected by default)
|
|
@@ -67,7 +73,7 @@ def restart_serial(port, baudrate=115200):
|
|
|
def parse_theta_rho_file(file_path):
|
|
def parse_theta_rho_file(file_path):
|
|
|
"""
|
|
"""
|
|
|
Parse a theta-rho file and return a list of (theta, rho) pairs.
|
|
Parse a theta-rho file and return a list of (theta, rho) pairs.
|
|
|
- Optionally apply transformations (rotation and mirroring).
|
|
|
|
|
|
|
+ Normalizes the list so the first theta is always 0.
|
|
|
"""
|
|
"""
|
|
|
coordinates = []
|
|
coordinates = []
|
|
|
try:
|
|
try:
|
|
@@ -84,8 +90,28 @@ def parse_theta_rho_file(file_path):
|
|
|
coordinates.append((theta, rho))
|
|
coordinates.append((theta, rho))
|
|
|
except ValueError:
|
|
except ValueError:
|
|
|
print(f"Skipping invalid line: {line}")
|
|
print(f"Skipping invalid line: {line}")
|
|
|
|
|
+ continue
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
print(f"Error reading file: {e}")
|
|
print(f"Error reading file: {e}")
|
|
|
|
|
+ return coordinates
|
|
|
|
|
+
|
|
|
|
|
+ # ---- Normalization Step ----
|
|
|
|
|
+ if coordinates:
|
|
|
|
|
+ # Take the first coordinate's theta
|
|
|
|
|
+ first_theta = coordinates[0][0]
|
|
|
|
|
+
|
|
|
|
|
+ # Shift all thetas so the first coordinate has theta=0
|
|
|
|
|
+ normalized = []
|
|
|
|
|
+ for (theta, rho) in coordinates:
|
|
|
|
|
+ if rho > 1:
|
|
|
|
|
+ rho = 1
|
|
|
|
|
+ elif rho < 0:
|
|
|
|
|
+ rho = 0
|
|
|
|
|
+ normalized.append((theta - first_theta, rho))
|
|
|
|
|
+
|
|
|
|
|
+ # Replace original list with normalized data
|
|
|
|
|
+ coordinates = normalized
|
|
|
|
|
+
|
|
|
return coordinates
|
|
return coordinates
|
|
|
|
|
|
|
|
def send_coordinate_batch(ser, coordinates):
|
|
def send_coordinate_batch(ser, coordinates):
|
|
@@ -144,6 +170,60 @@ def run_theta_rho_file(file_path):
|
|
|
# Reset theta after execution or stopping
|
|
# Reset theta after execution or stopping
|
|
|
reset_theta()
|
|
reset_theta()
|
|
|
ser.write("FINISHED\n".encode())
|
|
ser.write("FINISHED\n".encode())
|
|
|
|
|
+
|
|
|
|
|
+def get_clear_pattern_file(pattern_name):
|
|
|
|
|
+ """Return a .thr file path based on pattern_name."""
|
|
|
|
|
+ if pattern_name == "random":
|
|
|
|
|
+ # Randomly pick one of the three known patterns
|
|
|
|
|
+ return random.choice(list(CLEAR_PATTERNS.values()))
|
|
|
|
|
+ # If pattern_name is invalid or absent, default to 'clear_from_in'
|
|
|
|
|
+ return CLEAR_PATTERNS.get(pattern_name, CLEAR_PATTERNS["clear_from_in"])
|
|
|
|
|
+
|
|
|
|
|
+def run_theta_rho_files(
|
|
|
|
|
+ file_paths,
|
|
|
|
|
+ pause_time=0,
|
|
|
|
|
+ clear_between_files=False,
|
|
|
|
|
+ clear_pattern=None
|
|
|
|
|
+):
|
|
|
|
|
+ """
|
|
|
|
|
+ Runs multiple .thr files in sequence, optionally pausing between each pattern,
|
|
|
|
|
+ and optionally running a clear pattern between files.
|
|
|
|
|
+ Now supports:
|
|
|
|
|
+ - no_clear: skips running a clear pattern
|
|
|
|
|
+ - random: chooses any of the 3 clear patterns randomly
|
|
|
|
|
+ """
|
|
|
|
|
+ global stop_requested
|
|
|
|
|
+ stop_requested = False # reset stop flag at the start
|
|
|
|
|
+
|
|
|
|
|
+ for idx, path in enumerate(file_paths):
|
|
|
|
|
+ if stop_requested:
|
|
|
|
|
+ print("Execution stopped before starting next pattern.")
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ # Run the main pattern
|
|
|
|
|
+ print(f"Running pattern {idx + 1} of {len(file_paths)}: {path}")
|
|
|
|
|
+ run_theta_rho_file(path)
|
|
|
|
|
+
|
|
|
|
|
+ # If there are more files to run
|
|
|
|
|
+ if idx < len(file_paths) - 1:
|
|
|
|
|
+ # Pause after each pattern if requested
|
|
|
|
|
+ time.sleep(pause_time)
|
|
|
|
|
+
|
|
|
|
|
+ if stop_requested:
|
|
|
|
|
+ print("Execution stopped before running the next pattern or clear.")
|
|
|
|
|
+ break
|
|
|
|
|
+
|
|
|
|
|
+ # Conditionally run a "clear" pattern
|
|
|
|
|
+ if clear_between_files:
|
|
|
|
|
+ # If user explicitly wants no clear pattern, skip
|
|
|
|
|
+ if not clear_pattern:
|
|
|
|
|
+ continue
|
|
|
|
|
+ # Otherwise, get the file (could be random or a known pattern)
|
|
|
|
|
+ clear_file_path = get_clear_pattern_file(clear_pattern)
|
|
|
|
|
+ print(f"Running clear pattern: {clear_file_path}")
|
|
|
|
|
+ run_theta_rho_file(clear_file_path)
|
|
|
|
|
+
|
|
|
|
|
+ print("All requested patterns completed (or stopped).")
|
|
|
|
|
|
|
|
def reset_theta():
|
|
def reset_theta():
|
|
|
"""Reset theta on the Arduino."""
|
|
"""Reset theta on the Arduino."""
|
|
@@ -220,10 +300,11 @@ def upload_theta_rho():
|
|
|
return jsonify({'success': True})
|
|
return jsonify({'success': True})
|
|
|
return jsonify({'success': False})
|
|
return jsonify({'success': False})
|
|
|
|
|
|
|
|
|
|
+
|
|
|
@app.route('/run_theta_rho', methods=['POST'])
|
|
@app.route('/run_theta_rho', methods=['POST'])
|
|
|
def run_theta_rho():
|
|
def run_theta_rho():
|
|
|
file_name = request.json.get('file_name')
|
|
file_name = request.json.get('file_name')
|
|
|
- pre_execution = request.json.get('pre_execution') # New parameter for pre-execution action
|
|
|
|
|
|
|
+ pre_execution = request.json.get('pre_execution') # 'clear_in', 'clear_out', or 'none'
|
|
|
|
|
|
|
|
if not file_name:
|
|
if not file_name:
|
|
|
return jsonify({'error': 'No file name provided'}), 400
|
|
return jsonify({'error': 'No file name provided'}), 400
|
|
@@ -233,21 +314,33 @@ def run_theta_rho():
|
|
|
return jsonify({'error': 'File not found'}), 404
|
|
return jsonify({'error': 'File not found'}), 404
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
|
- # Handle pre-execution actions
|
|
|
|
|
|
|
+ # Build a list of files to run in sequence
|
|
|
|
|
+ files_to_run = []
|
|
|
|
|
+
|
|
|
if pre_execution == 'clear_in':
|
|
if pre_execution == 'clear_in':
|
|
|
- clear_in_thread = threading.Thread(target=run_theta_rho_file, args=('./patterns/clear_from_in.thr',))
|
|
|
|
|
- clear_in_thread.start()
|
|
|
|
|
- clear_in_thread.join() # Wait for completion before proceeding
|
|
|
|
|
|
|
+ files_to_run.append('./patterns/clear_from_in.thr')
|
|
|
elif pre_execution == 'clear_out':
|
|
elif pre_execution == 'clear_out':
|
|
|
- clear_out_thread = threading.Thread(target=run_theta_rho_file, args=('./patterns/clear_from_out.thr',))
|
|
|
|
|
- clear_out_thread.start()
|
|
|
|
|
- clear_out_thread.join() # Wait for completion before proceeding
|
|
|
|
|
|
|
+ files_to_run.append('./patterns/clear_from_out.thr')
|
|
|
|
|
+ elif pre_execution == 'clear_sideway':
|
|
|
|
|
+ files_to_run.append('./patterns/clear_sideway.thr')
|
|
|
elif pre_execution == 'none':
|
|
elif pre_execution == 'none':
|
|
|
pass # No pre-execution action required
|
|
pass # No pre-execution action required
|
|
|
|
|
|
|
|
- # Start the main pattern execution
|
|
|
|
|
- threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
|
|
|
|
|
|
|
+ # Finally, add the main file
|
|
|
|
|
+ files_to_run.append(file_path)
|
|
|
|
|
+
|
|
|
|
|
+ # Run them in one shot using run_theta_rho_files (blocking call)
|
|
|
|
|
+ threading.Thread(
|
|
|
|
|
+ target=run_theta_rho_files,
|
|
|
|
|
+ args=(files_to_run,),
|
|
|
|
|
+ kwargs={
|
|
|
|
|
+ 'pause_time': 0,
|
|
|
|
|
+ 'clear_between_files': False,
|
|
|
|
|
+ 'clear_pattern': None
|
|
|
|
|
+ }
|
|
|
|
|
+ ).start()
|
|
|
return jsonify({'success': True})
|
|
return jsonify({'success': True})
|
|
|
|
|
+
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
return jsonify({'error': str(e)}), 500
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
|
@@ -377,35 +470,6 @@ def serial_status():
|
|
|
'port': ser_port # Include the port name
|
|
'port': ser_port # Include the port name
|
|
|
})
|
|
})
|
|
|
|
|
|
|
|
-@app.route('/set_speed', methods=['POST'])
|
|
|
|
|
-def set_speed():
|
|
|
|
|
- """Set the speed for the Arduino."""
|
|
|
|
|
- global ser
|
|
|
|
|
- if ser is None or not ser.is_open:
|
|
|
|
|
- return jsonify({"success": False, "error": "Serial connection not established"}), 400
|
|
|
|
|
-
|
|
|
|
|
- try:
|
|
|
|
|
- # Parse the speed value from the request
|
|
|
|
|
- data = request.json
|
|
|
|
|
- speed = data.get('speed')
|
|
|
|
|
-
|
|
|
|
|
- if speed is None:
|
|
|
|
|
- return jsonify({"success": False, "error": "Speed is required"}), 400
|
|
|
|
|
-
|
|
|
|
|
- if not isinstance(speed, (int, float)) or speed <= 0:
|
|
|
|
|
- return jsonify({"success": False, "error": "Invalid speed value"}), 400
|
|
|
|
|
-
|
|
|
|
|
- # Send the SET_SPEED command to the Arduino
|
|
|
|
|
- command = f"SET_SPEED {speed}"
|
|
|
|
|
- send_command(command)
|
|
|
|
|
- return jsonify({"success": True, "speed": speed})
|
|
|
|
|
- except Exception as e:
|
|
|
|
|
- return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
-
|
|
|
|
|
if __name__ == '__main__':
|
|
if __name__ == '__main__':
|
|
|
- # Auto-connect to serial
|
|
|
|
|
- connect_to_serial()
|
|
|
|
|
-
|
|
|
|
|
- # Start the Flask app
|
|
|
|
|
app.run(debug=True, host='0.0.0.0', port=8080)
|
|
app.run(debug=True, host='0.0.0.0', port=8080)
|
|
|
|
|
|