|
|
@@ -1,514 +1,59 @@
|
|
|
-from flask import Flask, request, jsonify, render_template
|
|
|
+from flask import Flask, request, jsonify, render_template, send_from_directory
|
|
|
import os
|
|
|
-import serial
|
|
|
-import time
|
|
|
-import random
|
|
|
import threading
|
|
|
-import serial.tools.list_ports
|
|
|
-import math
|
|
|
-import json
|
|
|
from datetime import datetime
|
|
|
-import subprocess
|
|
|
+import logging
|
|
|
+
|
|
|
+from modules.serial.serial_manager import (
|
|
|
+ list_serial_ports, connect_to_serial, disconnect_serial,
|
|
|
+ restart_serial, get_serial_status, get_device_info,
|
|
|
+ send_coordinate_batch
|
|
|
+)
|
|
|
+from modules.firmware.firmware_manager import (
|
|
|
+ get_firmware_info, flash_firmware, check_git_updates,
|
|
|
+ update_software
|
|
|
+)
|
|
|
+from modules.core.pattern_manager import (
|
|
|
+ THETA_RHO_DIR, parse_theta_rho_file, run_theta_rho_file,
|
|
|
+ run_theta_rho_files, get_execution_status, stop_execution,
|
|
|
+ pause_execution, resume_execution
|
|
|
+)
|
|
|
+from modules.core.playlist_manager import (
|
|
|
+ list_all_playlists, get_playlist, create_playlist,
|
|
|
+ modify_playlist, delete_playlist, add_to_playlist
|
|
|
+)
|
|
|
|
|
|
app = Flask(__name__)
|
|
|
+logging.basicConfig(level=logging.INFO)
|
|
|
|
|
|
-# Configuration
|
|
|
-THETA_RHO_DIR = './patterns'
|
|
|
-IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
|
|
|
-CLEAR_PATTERNS = {
|
|
|
- "clear_from_in": "./patterns/clear_from_in.thr",
|
|
|
- "clear_from_out": "./patterns/clear_from_out.thr",
|
|
|
- "clear_sideway": "./patterns/clear_sideway.thr"
|
|
|
-}
|
|
|
+# Ensure the patterns directory exists
|
|
|
os.makedirs(THETA_RHO_DIR, exist_ok=True)
|
|
|
|
|
|
-# Serial connection (First available will be selected by default)
|
|
|
-ser = None
|
|
|
-ser_port = None # Global variable to store the serial port name
|
|
|
-stop_requested = False
|
|
|
-pause_requested = False
|
|
|
-pause_condition = threading.Condition()
|
|
|
-
|
|
|
-# Global variables to store device information
|
|
|
-arduino_table_name = None
|
|
|
-arduino_driver_type = 'Unknown'
|
|
|
-
|
|
|
-# Table status
|
|
|
-current_playing_file = None
|
|
|
-execution_progress = None
|
|
|
-firmware_version = 'Unknown'
|
|
|
-current_playing_index = None
|
|
|
-current_playlist = None
|
|
|
-is_clearing = False
|
|
|
-
|
|
|
-serial_lock = threading.Lock()
|
|
|
-
|
|
|
-PLAYLISTS_FILE = os.path.join(os.getcwd(), "playlists.json")
|
|
|
-
|
|
|
-MOTOR_TYPE_MAPPING = {
|
|
|
- "TMC2209": "./firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino",
|
|
|
- "DRV8825": "./firmware/arduino_code/arduino_code.ino",
|
|
|
- "esp32": "./firmware/esp32/esp32.ino"
|
|
|
-}
|
|
|
-
|
|
|
-# Ensure the file exists and contains at least an empty JSON object
|
|
|
-if not os.path.exists(PLAYLISTS_FILE):
|
|
|
- with open(PLAYLISTS_FILE, "w") as f:
|
|
|
- json.dump({}, f, indent=2)
|
|
|
-
|
|
|
-def get_ino_firmware_details(ino_file_path):
|
|
|
- """
|
|
|
- Extract firmware details, including version and motor type, from the given .ino file.
|
|
|
-
|
|
|
- Args:
|
|
|
- ino_file_path (str): Path to the .ino file.
|
|
|
-
|
|
|
- Returns:
|
|
|
- dict: Dictionary containing firmware details such as version and motor type, or None if not found.
|
|
|
- """
|
|
|
- try:
|
|
|
- if not ino_file_path:
|
|
|
- raise ValueError("Invalid path: ino_file_path is None or empty.")
|
|
|
-
|
|
|
- firmware_details = {"version": None, "motorType": None}
|
|
|
-
|
|
|
- with open(ino_file_path, "r") as file:
|
|
|
- for line in file:
|
|
|
- # Extract firmware version
|
|
|
- if "firmwareVersion" in line:
|
|
|
- start = line.find('"') + 1
|
|
|
- end = line.rfind('"')
|
|
|
- if start != -1 and end != -1 and start < end:
|
|
|
- firmware_details["version"] = line[start:end]
|
|
|
-
|
|
|
- # Extract motor type
|
|
|
- if "motorType" in line:
|
|
|
- start = line.find('"') + 1
|
|
|
- end = line.rfind('"')
|
|
|
- if start != -1 and end != -1 and start < end:
|
|
|
- firmware_details["motorType"] = line[start:end]
|
|
|
-
|
|
|
- if not firmware_details["version"]:
|
|
|
- print(f"Firmware version not found in file: {ino_file_path}")
|
|
|
- if not firmware_details["motorType"]:
|
|
|
- print(f"Motor type not found in file: {ino_file_path}")
|
|
|
-
|
|
|
- return firmware_details if any(firmware_details.values()) else None
|
|
|
-
|
|
|
- except FileNotFoundError:
|
|
|
- print(f"File not found: {ino_file_path}")
|
|
|
- return None
|
|
|
- except Exception as e:
|
|
|
- print(f"Error reading .ino file: {str(e)}")
|
|
|
- return None
|
|
|
-
|
|
|
-def check_git_updates():
|
|
|
- try:
|
|
|
- # Fetch the latest updates from the remote repository
|
|
|
- subprocess.run(["git", "fetch", "--tags", "--force"], check=True)
|
|
|
-
|
|
|
- # Get the latest tag from the remote
|
|
|
- latest_remote_tag = subprocess.check_output(
|
|
|
- ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
|
|
|
- ).strip().decode()
|
|
|
-
|
|
|
- # Get the latest tag from the local branch
|
|
|
- latest_local_tag = subprocess.check_output(
|
|
|
- ["git", "describe", "--tags", "--abbrev=0"]
|
|
|
- ).strip().decode()
|
|
|
-
|
|
|
- # Count how many tags the local branch is behind
|
|
|
- tag_behind_count = 0
|
|
|
- if latest_local_tag != latest_remote_tag:
|
|
|
- tags = subprocess.check_output(
|
|
|
- ["git", "tag", "--merged", "origin/main"], text=True
|
|
|
- ).splitlines()
|
|
|
-
|
|
|
- found_local = False
|
|
|
- for tag in tags:
|
|
|
- if tag == latest_local_tag:
|
|
|
- found_local = True
|
|
|
- elif found_local:
|
|
|
- tag_behind_count += 1
|
|
|
- if tag == latest_remote_tag:
|
|
|
- break
|
|
|
-
|
|
|
-
|
|
|
- # Check if there are new commits
|
|
|
- updates_available = latest_remote_tag != latest_local_tag
|
|
|
-
|
|
|
- return {
|
|
|
- "updates_available": updates_available,
|
|
|
- "tag_behind_count": tag_behind_count, # Tags behind
|
|
|
- "latest_remote_tag": latest_remote_tag,
|
|
|
- "latest_local_tag": latest_local_tag,
|
|
|
- }
|
|
|
- except subprocess.CalledProcessError as e:
|
|
|
- print(f"Error checking Git updates: {e}")
|
|
|
- return {
|
|
|
- "updates_available": False,
|
|
|
- "tag_behind_count": 0,
|
|
|
- "latest_remote_tag": None,
|
|
|
- "latest_local_tag": None,
|
|
|
- }
|
|
|
-
|
|
|
-
|
|
|
-
|
|
|
-def list_serial_ports():
|
|
|
- """Return a list of available serial ports."""
|
|
|
- ports = serial.tools.list_ports.comports()
|
|
|
- return [port.device for port in ports if port.device not in IGNORE_PORTS]
|
|
|
-
|
|
|
-def connect_to_serial(port=None, baudrate=115200):
|
|
|
- """Automatically connect to the first available serial port or a specified port."""
|
|
|
- global ser, ser_port, arduino_table_name, arduino_driver_type, firmware_version
|
|
|
-
|
|
|
- try:
|
|
|
- if port is None:
|
|
|
- ports = list_serial_ports()
|
|
|
- if not ports:
|
|
|
- print("No serial port connected")
|
|
|
- return False
|
|
|
- port = ports[0] # Auto-select the first available port
|
|
|
-
|
|
|
- with serial_lock:
|
|
|
- if ser and ser.is_open:
|
|
|
- ser.close()
|
|
|
- ser = serial.Serial(port, baudrate, timeout=2) # Set timeout to avoid infinite waits
|
|
|
- ser_port = port # Store the connected port globally
|
|
|
-
|
|
|
- print(f"Connected to serial port: {port}")
|
|
|
- time.sleep(2) # Allow time for the connection to establish
|
|
|
-
|
|
|
- # Read initial startup messages from Arduino
|
|
|
- arduino_table_name = None
|
|
|
- arduino_driver_type = None
|
|
|
-
|
|
|
- while ser.in_waiting > 0:
|
|
|
- line = ser.readline().decode().strip()
|
|
|
- print(f"Arduino: {line}") # Print the received message
|
|
|
-
|
|
|
- # Store the device details based on the expected messages
|
|
|
- if "Table:" in line:
|
|
|
- arduino_table_name = line.replace("Table: ", "").strip()
|
|
|
- elif "Drivers:" in line:
|
|
|
- arduino_driver_type = line.replace("Drivers: ", "").strip()
|
|
|
- elif "Version:" in line:
|
|
|
- firmware_version = line.replace("Version: ", "").strip()
|
|
|
-
|
|
|
- # Display stored values
|
|
|
- print(f"Detected Table: {arduino_table_name or 'Unknown'}")
|
|
|
- print(f"Detected Drivers: {arduino_driver_type or 'Unknown'}")
|
|
|
-
|
|
|
- return True # Successfully connected
|
|
|
- except serial.SerialException as e:
|
|
|
- print(f"Failed to connect to serial port {port}: {e}")
|
|
|
- port = None # Reset the port to try the next available one
|
|
|
-
|
|
|
- print("Max retries reached. Could not connect to a serial port.")
|
|
|
- return False
|
|
|
-
|
|
|
-def disconnect_serial():
|
|
|
- """Disconnect the current serial connection."""
|
|
|
- global ser, ser_port
|
|
|
- if ser and ser.is_open:
|
|
|
- ser.close()
|
|
|
- ser = None
|
|
|
- ser_port = None # Reset the port name
|
|
|
-
|
|
|
-def restart_serial(port, baudrate=115200):
|
|
|
- """Restart the serial connection."""
|
|
|
- disconnect_serial()
|
|
|
- connect_to_serial(port, baudrate)
|
|
|
-
|
|
|
-def parse_theta_rho_file(file_path):
|
|
|
- """
|
|
|
- Parse a theta-rho file and return a list of (theta, rho) pairs.
|
|
|
- Normalizes the list so the first theta is always 0.
|
|
|
- """
|
|
|
- coordinates = []
|
|
|
- try:
|
|
|
- with open(file_path, 'r') as file:
|
|
|
- for line in file:
|
|
|
- line = line.strip()
|
|
|
- # Skip header or comment lines (starting with '#' or empty lines)
|
|
|
- if not line or line.startswith("#"):
|
|
|
- continue
|
|
|
-
|
|
|
- # Parse lines with theta and rho separated by spaces
|
|
|
- try:
|
|
|
- theta, rho = map(float, line.split())
|
|
|
- coordinates.append((theta, rho))
|
|
|
- except ValueError:
|
|
|
- print(f"Skipping invalid line: {line}")
|
|
|
- continue
|
|
|
- except Exception as e:
|
|
|
- print(f"Error reading file: {e}")
|
|
|
- return coordinates
|
|
|
-
|
|
|
- # ---- Normalization Step ----
|
|
|
- if coordinates:
|
|
|
- # Take the first coordinate's theta
|
|
|
- first_theta = coordinates[0][0]
|
|
|
-
|
|
|
- # Shift all thetas so the first coordinate has theta=0
|
|
|
- normalized = []
|
|
|
- for (theta, rho) in coordinates:
|
|
|
- normalized.append((theta - first_theta, rho))
|
|
|
-
|
|
|
- # Replace original list with normalized data
|
|
|
- coordinates = normalized
|
|
|
-
|
|
|
- return coordinates
|
|
|
-
|
|
|
-def send_coordinate_batch(ser, coordinates):
|
|
|
- """Send a batch of theta-rho pairs to the Arduino."""
|
|
|
- # print("Sending batch:", coordinates)
|
|
|
- batch_str = ";".join(f"{theta:.5f},{rho:.5f}" for theta, rho in coordinates) + ";\n"
|
|
|
- ser.write(batch_str.encode())
|
|
|
-
|
|
|
-def send_command(command):
|
|
|
- """Send a single command to the Arduino."""
|
|
|
- ser.write(f"{command}\n".encode())
|
|
|
- print(f"Sent: {command}")
|
|
|
-
|
|
|
- # Wait for "R" acknowledgment from Arduino
|
|
|
- while True:
|
|
|
- with serial_lock:
|
|
|
- if ser.in_waiting > 0:
|
|
|
- response = ser.readline().decode().strip()
|
|
|
- print(f"Arduino response: {response}")
|
|
|
- if response == "R":
|
|
|
- print("Command execution completed.")
|
|
|
- break
|
|
|
-
|
|
|
-def wait_for_start_time(schedule_hours):
|
|
|
- """
|
|
|
- Keep checking every 30 seconds if the time is within the schedule to resume execution.
|
|
|
- """
|
|
|
- global pause_requested
|
|
|
- start_time, end_time = schedule_hours
|
|
|
-
|
|
|
- while pause_requested:
|
|
|
- now = datetime.now().time()
|
|
|
- if start_time <= now < end_time:
|
|
|
- print("Resuming execution: Within schedule.")
|
|
|
- pause_requested = False
|
|
|
- with pause_condition:
|
|
|
- pause_condition.notify_all()
|
|
|
- break # Exit the loop once resumed
|
|
|
- else:
|
|
|
- time.sleep(30) # Wait for 30 seconds before checking again
|
|
|
-
|
|
|
-# Function to check schedule based on start and end time
|
|
|
-def schedule_checker(schedule_hours):
|
|
|
- """
|
|
|
- Pauses/resumes execution based on a given time range.
|
|
|
-
|
|
|
- Parameters:
|
|
|
- - schedule_hours (tuple): (start_time, end_time) as `datetime.time` objects.
|
|
|
- """
|
|
|
- global pause_requested
|
|
|
- if not schedule_hours:
|
|
|
- return # No scheduling restriction
|
|
|
-
|
|
|
- start_time, end_time = schedule_hours
|
|
|
- now = datetime.now().time() # Get the current time as `datetime.time`
|
|
|
-
|
|
|
- # Check if we are currently within the scheduled time
|
|
|
- if start_time <= now < end_time:
|
|
|
- if pause_requested:
|
|
|
- print("Starting execution: Within schedule.")
|
|
|
- pause_requested = False # Resume execution
|
|
|
- with pause_condition:
|
|
|
- pause_condition.notify_all()
|
|
|
- else:
|
|
|
- if not pause_requested:
|
|
|
- print("Pausing execution: Outside schedule.")
|
|
|
- pause_requested = True # Pause execution
|
|
|
-
|
|
|
- # Start a background thread to periodically check for start time
|
|
|
- threading.Thread(target=wait_for_start_time, args=(schedule_hours,), daemon=True).start()
|
|
|
-
|
|
|
-def run_theta_rho_file(file_path, schedule_hours=None):
|
|
|
- """Run a theta-rho file by sending data in optimized batches."""
|
|
|
- global stop_requested, current_playing_file, execution_progress
|
|
|
- stop_requested = False
|
|
|
- current_playing_file = file_path # Track current playing file
|
|
|
- execution_progress = (0, 0) # Reset progress
|
|
|
-
|
|
|
- coordinates = parse_theta_rho_file(file_path)
|
|
|
- total_coordinates = len(coordinates)
|
|
|
-
|
|
|
- if total_coordinates < 2:
|
|
|
- print("Not enough coordinates for interpolation.")
|
|
|
- current_playing_file = None # Clear tracking if failed
|
|
|
- execution_progress = None
|
|
|
- return
|
|
|
-
|
|
|
- execution_progress = (0, total_coordinates) # Update total coordinates
|
|
|
- batch_size = 10 # Smaller batches may smooth movement further
|
|
|
-
|
|
|
- for i in range(0, total_coordinates, batch_size):
|
|
|
- if stop_requested:
|
|
|
- print("Execution stopped by user after completing the current batch.")
|
|
|
- break
|
|
|
-
|
|
|
- with pause_condition:
|
|
|
- while pause_requested:
|
|
|
- print("Execution paused...")
|
|
|
- pause_condition.wait() # This will block execution until notified
|
|
|
-
|
|
|
- batch = coordinates[i:i + batch_size]
|
|
|
- if i == 0:
|
|
|
- send_coordinate_batch(ser, batch)
|
|
|
- execution_progress = (i + batch_size, total_coordinates) # Update progress
|
|
|
- continue
|
|
|
-
|
|
|
- while True:
|
|
|
- schedule_checker(schedule_hours) # Check if within schedule
|
|
|
- with serial_lock:
|
|
|
- if ser.in_waiting > 0:
|
|
|
- response = ser.readline().decode().strip()
|
|
|
- if response == "R":
|
|
|
- send_coordinate_batch(ser, batch)
|
|
|
- execution_progress = (i + batch_size, total_coordinates) # Update progress
|
|
|
- break
|
|
|
- else:
|
|
|
- print(f"Arduino response: {response}")
|
|
|
-
|
|
|
- reset_theta()
|
|
|
- ser.write("FINISHED\n".encode())
|
|
|
-
|
|
|
- # Clear tracking variables when done
|
|
|
- current_playing_file = None
|
|
|
- execution_progress = None
|
|
|
- print("Pattern execution completed.")
|
|
|
-
|
|
|
-def get_clear_pattern_file(pattern_name):
|
|
|
- """Return a .thr file path based on pattern_name."""
|
|
|
- if pattern_name == "random":
|
|
|
- # Randomly pick one of the three known patterns
|
|
|
- return random.choice(list(CLEAR_PATTERNS.values()))
|
|
|
- # If pattern_name is invalid or absent, default to 'clear_from_in'
|
|
|
- return CLEAR_PATTERNS.get(pattern_name, CLEAR_PATTERNS["clear_from_in"])
|
|
|
-
|
|
|
-def run_theta_rho_files(
|
|
|
- file_paths,
|
|
|
- pause_time=0,
|
|
|
- clear_pattern=None,
|
|
|
- run_mode="single",
|
|
|
- shuffle=False,
|
|
|
- schedule_hours=None
|
|
|
-):
|
|
|
- """
|
|
|
- Runs multiple .thr files in sequence with options for pausing, clearing, shuffling, and looping.
|
|
|
-
|
|
|
- Parameters:
|
|
|
- - file_paths (list): List of file paths to run.
|
|
|
- - pause_time (float): Seconds to pause between patterns.
|
|
|
- - clear_pattern (str): Specific clear pattern to run ("clear_in", "clear_out", "clear_sideway", or "random").
|
|
|
- - run_mode (str): "single" for one-time run or "indefinite" for looping.
|
|
|
- - shuffle (bool): Whether to shuffle the playlist before running.
|
|
|
- """
|
|
|
- global stop_requested
|
|
|
- global current_playlist
|
|
|
- global current_playing_index
|
|
|
- stop_requested = False # Reset stop flag at the start
|
|
|
-
|
|
|
- if shuffle:
|
|
|
- random.shuffle(file_paths)
|
|
|
- print("Playlist shuffled.")
|
|
|
-
|
|
|
- current_playlist = file_paths
|
|
|
-
|
|
|
- while True:
|
|
|
- for idx, path in enumerate(file_paths):
|
|
|
- current_playing_index = idx
|
|
|
- schedule_checker(schedule_hours)
|
|
|
- if stop_requested:
|
|
|
- print("Execution stopped before starting next pattern.")
|
|
|
- return
|
|
|
-
|
|
|
- if clear_pattern:
|
|
|
- if stop_requested:
|
|
|
- print("Execution stopped before running the next clear pattern.")
|
|
|
- return
|
|
|
-
|
|
|
- # Determine the clear pattern to run
|
|
|
- clear_file_path = get_clear_pattern_file(clear_pattern)
|
|
|
- print(f"Running clear pattern: {clear_file_path}")
|
|
|
- run_theta_rho_file(clear_file_path, schedule_hours)
|
|
|
-
|
|
|
- if not stop_requested:
|
|
|
- # Run the main pattern
|
|
|
- print(f"Running pattern {idx + 1} of {len(file_paths)}: {path}")
|
|
|
- run_theta_rho_file(path, schedule_hours)
|
|
|
-
|
|
|
- if idx < len(file_paths) -1:
|
|
|
- if stop_requested:
|
|
|
- print("Execution stopped before running the next clear pattern.")
|
|
|
- return
|
|
|
- # Pause after each pattern if requested
|
|
|
- if pause_time > 0:
|
|
|
- print(f"Pausing for {pause_time} seconds...")
|
|
|
- time.sleep(pause_time)
|
|
|
-
|
|
|
- # After completing the playlist
|
|
|
- if run_mode == "indefinite":
|
|
|
- print("Playlist completed. Restarting as per 'indefinite' run mode.")
|
|
|
- if pause_time > 0:
|
|
|
- print(f"Pausing for {pause_time} seconds before restarting...")
|
|
|
- time.sleep(pause_time)
|
|
|
- if shuffle:
|
|
|
- random.shuffle(file_paths)
|
|
|
- print("Playlist reshuffled for the next loop.")
|
|
|
- continue
|
|
|
- else:
|
|
|
- print("Playlist completed.")
|
|
|
- break
|
|
|
-
|
|
|
- # Reset theta after execution or stopping
|
|
|
- reset_theta()
|
|
|
- ser.write("FINISHED\n".encode())
|
|
|
- print("All requested patterns completed (or stopped).")
|
|
|
-
|
|
|
-def reset_theta():
|
|
|
- """Reset theta on the Arduino."""
|
|
|
- ser.write("RESET_THETA\n".encode())
|
|
|
- while True:
|
|
|
- with serial_lock:
|
|
|
- if ser.in_waiting > 0:
|
|
|
- response = ser.readline().decode().strip()
|
|
|
- print(f"Arduino response: {response}")
|
|
|
- if response == "THETA_RESET":
|
|
|
- print("Theta successfully reset.")
|
|
|
- break
|
|
|
- time.sleep(0.5) # Small delay to avoid busy waiting
|
|
|
-
|
|
|
-# Flask API Endpoints
|
|
|
+# API Routes
|
|
|
@app.route('/')
|
|
|
def index():
|
|
|
return render_template('index.html')
|
|
|
|
|
|
+# Serial Routes
|
|
|
@app.route('/list_serial_ports', methods=['GET'])
|
|
|
-def list_ports():
|
|
|
+def api_list_ports():
|
|
|
return jsonify(list_serial_ports())
|
|
|
|
|
|
@app.route('/connect_serial', methods=['POST'])
|
|
|
-def connect_serial():
|
|
|
+def api_connect_serial():
|
|
|
port = request.json.get('port')
|
|
|
if not port:
|
|
|
return jsonify({'error': 'No port provided'}), 400
|
|
|
|
|
|
try:
|
|
|
- connect_to_serial(port)
|
|
|
- return jsonify({'success': True})
|
|
|
+ success = connect_to_serial(port)
|
|
|
+ return jsonify({'success': success})
|
|
|
except Exception as e:
|
|
|
+ app.logger.error(f"Error connecting to serial port: {str(e)}", exc_info=True)
|
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/disconnect_serial', methods=['POST'])
|
|
|
-def disconnect():
|
|
|
+def api_disconnect():
|
|
|
try:
|
|
|
disconnect_serial()
|
|
|
return jsonify({'success': True})
|
|
|
@@ -516,31 +61,35 @@ def disconnect():
|
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
@app.route('/restart_serial', methods=['POST'])
|
|
|
-def restart():
|
|
|
+def api_restart():
|
|
|
port = request.json.get('port')
|
|
|
if not port:
|
|
|
return jsonify({'error': 'No port provided'}), 400
|
|
|
|
|
|
try:
|
|
|
- restart_serial(port)
|
|
|
- return jsonify({'success': True})
|
|
|
+ success = restart_serial(port)
|
|
|
+ return jsonify({'success': success})
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
+@app.route('/serial_status', methods=['GET'])
|
|
|
+def api_serial_status():
|
|
|
+ return jsonify(get_serial_status())
|
|
|
+
|
|
|
+# Pattern Routes
|
|
|
@app.route('/list_theta_rho_files', methods=['GET'])
|
|
|
-def list_theta_rho_files():
|
|
|
+def api_list_theta_rho_files():
|
|
|
files = []
|
|
|
for root, _, filenames in os.walk(THETA_RHO_DIR):
|
|
|
for file in filenames:
|
|
|
- # Construct the relative file path
|
|
|
relative_path = os.path.relpath(os.path.join(root, file), THETA_RHO_DIR)
|
|
|
files.append(relative_path)
|
|
|
return jsonify(sorted(files))
|
|
|
|
|
|
@app.route('/upload_theta_rho', methods=['POST'])
|
|
|
-def upload_theta_rho():
|
|
|
+def api_upload_theta_rho():
|
|
|
custom_patterns_dir = os.path.join(THETA_RHO_DIR, 'custom_patterns')
|
|
|
- os.makedirs(custom_patterns_dir, exist_ok=True) # Ensure the directory exists
|
|
|
+ os.makedirs(custom_patterns_dir, exist_ok=True)
|
|
|
|
|
|
file = request.files['file']
|
|
|
if file:
|
|
|
@@ -548,11 +97,10 @@ def upload_theta_rho():
|
|
|
return jsonify({'success': True})
|
|
|
return jsonify({'success': False})
|
|
|
|
|
|
-
|
|
|
@app.route('/run_theta_rho', methods=['POST'])
|
|
|
-def run_theta_rho():
|
|
|
+def api_run_theta_rho():
|
|
|
file_name = request.json.get('file_name')
|
|
|
- pre_execution = request.json.get('pre_execution') # 'clear_in', 'clear_out', 'clear_sideway', or 'none'
|
|
|
+ pre_execution = request.json.get('pre_execution')
|
|
|
|
|
|
if not file_name:
|
|
|
return jsonify({'error': 'No file name provided'}), 400
|
|
|
@@ -562,121 +110,23 @@ def run_theta_rho():
|
|
|
return jsonify({'error': 'File not found'}), 404
|
|
|
|
|
|
try:
|
|
|
- # Build a list of files to run in sequence
|
|
|
files_to_run = []
|
|
|
-
|
|
|
- if pre_execution == 'clear_in':
|
|
|
- files_to_run.append('./patterns/clear_from_in.thr')
|
|
|
- elif pre_execution == 'clear_out':
|
|
|
- files_to_run.append('./patterns/clear_from_out.thr')
|
|
|
- elif pre_execution == 'clear_sideway':
|
|
|
- files_to_run.append('./patterns/clear_sideway.thr')
|
|
|
- elif pre_execution == 'none':
|
|
|
- pass # No pre-execution action required
|
|
|
-
|
|
|
- # Finally, add the main file
|
|
|
+ if pre_execution in ['clear_in', 'clear_out', 'clear_sideway']:
|
|
|
+ files_to_run.append(f'./patterns/clear_from_{pre_execution.split("_")[1]}.thr')
|
|
|
files_to_run.append(file_path)
|
|
|
|
|
|
- # Run them in one shot using run_theta_rho_files (blocking call)
|
|
|
threading.Thread(
|
|
|
target=run_theta_rho_files,
|
|
|
args=(files_to_run,),
|
|
|
- kwargs={
|
|
|
- 'pause_time': 0,
|
|
|
- 'clear_pattern': None
|
|
|
- }
|
|
|
+ kwargs={'pause_time': 0, 'clear_pattern': None}
|
|
|
).start()
|
|
|
return jsonify({'success': True})
|
|
|
-
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
-@app.route('/stop_execution', methods=['POST'])
|
|
|
-def stop_execution():
|
|
|
- global pause_requested
|
|
|
- with pause_condition:
|
|
|
- pause_requested = False
|
|
|
- pause_condition.notify_all()
|
|
|
-
|
|
|
- global stop_requested, current_playing_index, current_playlist, is_clearing, current_playing_file, execution_progress
|
|
|
- stop_requested = True
|
|
|
- current_playing_index = None
|
|
|
- current_playlist = None
|
|
|
- is_clearing = False
|
|
|
- current_playing_file = None
|
|
|
- execution_progress = None
|
|
|
-
|
|
|
- return jsonify({'success': True})
|
|
|
-
|
|
|
-@app.route('/send_home', methods=['POST'])
|
|
|
-def send_home():
|
|
|
- """Send the HOME command to the Arduino."""
|
|
|
- try:
|
|
|
- send_command("HOME")
|
|
|
- return jsonify({'success': True})
|
|
|
- except Exception as e:
|
|
|
- return jsonify({'error': str(e)}), 500
|
|
|
-
|
|
|
-@app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
|
|
|
-def run_specific_theta_rho_file(file_name):
|
|
|
- """Run a specific theta-rho file."""
|
|
|
- file_path = os.path.join(THETA_RHO_DIR, file_name)
|
|
|
- if not os.path.exists(file_path):
|
|
|
- return jsonify({'error': 'File not found'}), 404
|
|
|
-
|
|
|
- threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
|
|
|
- return jsonify({'success': True})
|
|
|
-
|
|
|
-@app.route('/delete_theta_rho_file', methods=['POST'])
|
|
|
-def delete_theta_rho_file():
|
|
|
- data = request.json
|
|
|
- file_name = data.get('file_name')
|
|
|
-
|
|
|
- if not file_name:
|
|
|
- return jsonify({"success": False, "error": "No file name provided"}), 400
|
|
|
-
|
|
|
- file_path = os.path.join(THETA_RHO_DIR, file_name)
|
|
|
-
|
|
|
- if not os.path.exists(file_path):
|
|
|
- return jsonify({"success": False, "error": "File not found"}), 404
|
|
|
-
|
|
|
- try:
|
|
|
- os.remove(file_path)
|
|
|
- return jsonify({"success": True})
|
|
|
- except Exception as e:
|
|
|
- return jsonify({"success": False, "error": str(e)}), 500
|
|
|
-
|
|
|
-@app.route('/move_to_center', methods=['POST'])
|
|
|
-def move_to_center():
|
|
|
- """Move the sand table to the center position."""
|
|
|
- try:
|
|
|
- if ser is None or not ser.is_open:
|
|
|
- return jsonify({"success": False, "error": "Serial connection not established"}), 400
|
|
|
-
|
|
|
- coordinates = [(0, 0)] # Center position
|
|
|
- send_coordinate_batch(ser, coordinates)
|
|
|
- return jsonify({"success": True})
|
|
|
- except Exception as e:
|
|
|
- return jsonify({"success": False, "error": str(e)}), 500
|
|
|
-
|
|
|
-@app.route('/move_to_perimeter', methods=['POST'])
|
|
|
-def move_to_perimeter():
|
|
|
- """Move the sand table to the perimeter position."""
|
|
|
- try:
|
|
|
- if ser is None or not ser.is_open:
|
|
|
- return jsonify({"success": False, "error": "Serial connection not established"}), 400
|
|
|
-
|
|
|
- MAX_RHO = 1
|
|
|
- coordinates = [(0, MAX_RHO)] # Perimeter position
|
|
|
- send_coordinate_batch(ser, coordinates)
|
|
|
- return jsonify({"success": True})
|
|
|
- except Exception as e:
|
|
|
- return jsonify({"success": False, "error": str(e)}), 500
|
|
|
-
|
|
|
@app.route('/preview_thr', methods=['POST'])
|
|
|
-def preview_thr():
|
|
|
+def api_preview_thr():
|
|
|
file_name = request.json.get('file_name')
|
|
|
-
|
|
|
if not file_name:
|
|
|
return jsonify({'error': 'No file name provided'}), 400
|
|
|
|
|
|
@@ -685,20 +135,13 @@ def preview_thr():
|
|
|
return jsonify({'error': 'File not found'}), 404
|
|
|
|
|
|
try:
|
|
|
- # Parse the .thr file with transformations
|
|
|
coordinates = parse_theta_rho_file(file_path)
|
|
|
return jsonify({'success': True, 'coordinates': coordinates})
|
|
|
except Exception as e:
|
|
|
return jsonify({'error': str(e)}), 500
|
|
|
|
|
|
-
|
|
|
@app.route('/send_coordinate', methods=['POST'])
|
|
|
-def send_coordinate():
|
|
|
- """Send a single (theta, rho) coordinate to the Arduino."""
|
|
|
- global ser
|
|
|
- if ser is None or not ser.is_open:
|
|
|
- return jsonify({"success": False, "error": "Serial connection not established"}), 400
|
|
|
-
|
|
|
+def api_send_coordinate():
|
|
|
try:
|
|
|
data = request.json
|
|
|
theta = data.get('theta')
|
|
|
@@ -707,549 +150,220 @@ def send_coordinate():
|
|
|
if theta is None or rho is None:
|
|
|
return jsonify({"success": False, "error": "Theta and Rho are required"}), 400
|
|
|
|
|
|
- # Send the coordinate to the Arduino
|
|
|
- send_coordinate_batch(ser, [(theta, rho)])
|
|
|
- reset_theta()
|
|
|
+ send_coordinate_batch([(theta, rho)])
|
|
|
return jsonify({"success": True})
|
|
|
except Exception as e:
|
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
-# Expose files for download if needed
|
|
|
-@app.route('/download/<filename>', methods=['GET'])
|
|
|
-def download_file(filename):
|
|
|
- """Download a file from the theta-rho directory."""
|
|
|
- return send_from_directory(THETA_RHO_DIR, filename)
|
|
|
-
|
|
|
-@app.route('/serial_status', methods=['GET'])
|
|
|
-def serial_status():
|
|
|
- global ser, ser_port
|
|
|
- return jsonify({
|
|
|
- 'connected': ser.is_open if ser else False,
|
|
|
- 'port': ser_port # Include the port name
|
|
|
- })
|
|
|
-
|
|
|
-@app.route('/pause_execution', methods=['POST'])
|
|
|
-def pause_execution():
|
|
|
- """Pause the current execution."""
|
|
|
- global pause_requested
|
|
|
- with pause_condition:
|
|
|
- pause_requested = True
|
|
|
- return jsonify({'success': True, 'message': 'Execution paused'})
|
|
|
-
|
|
|
-@app.route('/status', methods=['GET'])
|
|
|
-def get_status():
|
|
|
- """Returns the current status of the sand table."""
|
|
|
- global is_clearing
|
|
|
- if current_playing_file in CLEAR_PATTERNS.values():
|
|
|
- is_clearing = True
|
|
|
- else:
|
|
|
- is_clearing = False
|
|
|
-
|
|
|
- return jsonify({
|
|
|
- "ser_port": ser_port,
|
|
|
- "stop_requested": stop_requested,
|
|
|
- "pause_requested": pause_requested,
|
|
|
- "current_playing_file": current_playing_file,
|
|
|
- "execution_progress": execution_progress,
|
|
|
- "current_playing_index": current_playing_index,
|
|
|
- "current_playlist": current_playlist,
|
|
|
- "is_clearing": is_clearing
|
|
|
- })
|
|
|
-
|
|
|
-@app.route('/resume_execution', methods=['POST'])
|
|
|
-def resume_execution():
|
|
|
- """Resume execution after pausing."""
|
|
|
- global pause_requested
|
|
|
- with pause_condition:
|
|
|
- pause_requested = False
|
|
|
- pause_condition.notify_all() # Unblock the waiting thread
|
|
|
- return jsonify({'success': True, 'message': 'Execution resumed'})
|
|
|
-
|
|
|
-def load_playlists():
|
|
|
- """
|
|
|
- Load the entire playlists dictionary from the JSON file.
|
|
|
- Returns something like: {
|
|
|
- "My Playlist": ["file1.thr", "file2.thr"],
|
|
|
- "Another": ["x.thr"]
|
|
|
- }
|
|
|
- """
|
|
|
- with open(PLAYLISTS_FILE, "r") as f:
|
|
|
- return json.load(f)
|
|
|
-
|
|
|
-def save_playlists(playlists_dict):
|
|
|
- """
|
|
|
- Save the entire playlists dictionary back to the JSON file.
|
|
|
- """
|
|
|
- with open(PLAYLISTS_FILE, "w") as f:
|
|
|
- json.dump(playlists_dict, f, indent=2)
|
|
|
-
|
|
|
+# Playlist Routes
|
|
|
@app.route("/list_all_playlists", methods=["GET"])
|
|
|
-def list_all_playlists():
|
|
|
- """
|
|
|
- Returns a list of all playlist names.
|
|
|
- Example return: ["My Playlist", "Another Playlist"]
|
|
|
- """
|
|
|
- playlists_dict = load_playlists()
|
|
|
- playlist_names = list(playlists_dict.keys())
|
|
|
+def api_list_all_playlists():
|
|
|
+ playlist_names = list_all_playlists()
|
|
|
return jsonify(playlist_names)
|
|
|
|
|
|
@app.route("/get_playlist", methods=["GET"])
|
|
|
-def get_playlist():
|
|
|
- """
|
|
|
- GET /get_playlist?name=My%20Playlist
|
|
|
- Returns: { "name": "My Playlist", "files": [... ] }
|
|
|
- """
|
|
|
+def api_get_playlist():
|
|
|
playlist_name = request.args.get("name", "")
|
|
|
if not playlist_name:
|
|
|
return jsonify({"error": "Missing playlist 'name' parameter"}), 400
|
|
|
|
|
|
- playlists_dict = load_playlists()
|
|
|
- if playlist_name not in playlists_dict:
|
|
|
+ playlist = get_playlist(playlist_name)
|
|
|
+ if not playlist:
|
|
|
return jsonify({"error": f"Playlist '{playlist_name}' not found"}), 404
|
|
|
|
|
|
- files = playlists_dict[playlist_name] # e.g. ["file1.thr", "file2.thr"]
|
|
|
- return jsonify({
|
|
|
- "name": playlist_name,
|
|
|
- "files": files
|
|
|
- })
|
|
|
+ return jsonify(playlist)
|
|
|
|
|
|
@app.route("/create_playlist", methods=["POST"])
|
|
|
-def create_playlist():
|
|
|
- """
|
|
|
- POST /create_playlist
|
|
|
- Body: { "name": "My Playlist", "files": ["file1.thr", "file2.thr"] }
|
|
|
- Creates or overwrites a playlist with the given name.
|
|
|
- """
|
|
|
+def api_create_playlist():
|
|
|
data = request.get_json()
|
|
|
if not data or "name" not in data or "files" not in data:
|
|
|
return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
|
|
|
|
|
|
- playlist_name = data["name"]
|
|
|
- files = data["files"]
|
|
|
-
|
|
|
- # Load all playlists
|
|
|
- playlists_dict = load_playlists()
|
|
|
-
|
|
|
- # Overwrite or create new
|
|
|
- playlists_dict[playlist_name] = files
|
|
|
-
|
|
|
- # Save changes
|
|
|
- save_playlists(playlists_dict)
|
|
|
-
|
|
|
+ success = create_playlist(data["name"], data["files"])
|
|
|
return jsonify({
|
|
|
- "success": True,
|
|
|
- "message": f"Playlist '{playlist_name}' created/updated"
|
|
|
+ "success": success,
|
|
|
+ "message": f"Playlist '{data['name']}' created/updated"
|
|
|
})
|
|
|
|
|
|
@app.route("/modify_playlist", methods=["POST"])
|
|
|
-def modify_playlist():
|
|
|
- """
|
|
|
- POST /modify_playlist
|
|
|
- Body: { "name": "My Playlist", "files": ["file1.thr", "file2.thr"] }
|
|
|
- Updates (or creates) the existing playlist with a new file list.
|
|
|
- You can 404 if you only want to allow modifications to existing playlists.
|
|
|
- """
|
|
|
+def api_modify_playlist():
|
|
|
data = request.get_json()
|
|
|
if not data or "name" not in data or "files" not in data:
|
|
|
return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
|
|
|
|
|
|
- playlist_name = data["name"]
|
|
|
- files = data["files"]
|
|
|
-
|
|
|
- # Load all playlists
|
|
|
- playlists_dict = load_playlists()
|
|
|
-
|
|
|
- # Optional: If you want to disallow creating a new playlist here:
|
|
|
- # if playlist_name not in playlists_dict:
|
|
|
- # return jsonify({"success": False, "error": f"Playlist '{playlist_name}' not found"}), 404
|
|
|
-
|
|
|
- # Overwrite or create new
|
|
|
- playlists_dict[playlist_name] = files
|
|
|
-
|
|
|
- # Save
|
|
|
- save_playlists(playlists_dict)
|
|
|
-
|
|
|
- return jsonify({"success": True, "message": f"Playlist '{playlist_name}' updated"})
|
|
|
+ success = modify_playlist(data["name"], data["files"])
|
|
|
+ return jsonify({
|
|
|
+ "success": success,
|
|
|
+ "message": f"Playlist '{data['name']}' updated"
|
|
|
+ })
|
|
|
|
|
|
@app.route("/delete_playlist", methods=["DELETE"])
|
|
|
-def delete_playlist():
|
|
|
- """
|
|
|
- DELETE /delete_playlist
|
|
|
- Body: { "name": "My Playlist" }
|
|
|
- Removes the playlist from the single JSON file.
|
|
|
- """
|
|
|
+def api_delete_playlist():
|
|
|
data = request.get_json()
|
|
|
if not data or "name" not in data:
|
|
|
return jsonify({"success": False, "error": "Missing 'name' field"}), 400
|
|
|
|
|
|
- playlist_name = data["name"]
|
|
|
-
|
|
|
- playlists_dict = load_playlists()
|
|
|
- if playlist_name not in playlists_dict:
|
|
|
- return jsonify({"success": False, "error": f"Playlist '{playlist_name}' not found"}), 404
|
|
|
-
|
|
|
- # Remove from dict
|
|
|
- del playlists_dict[playlist_name]
|
|
|
- save_playlists(playlists_dict)
|
|
|
+ success = delete_playlist(data["name"])
|
|
|
+ if not success:
|
|
|
+ return jsonify({"success": False, "error": f"Playlist '{data['name']}' not found"}), 404
|
|
|
|
|
|
return jsonify({
|
|
|
"success": True,
|
|
|
- "message": f"Playlist '{playlist_name}' deleted"
|
|
|
+ "message": f"Playlist '{data['name']}' deleted"
|
|
|
})
|
|
|
|
|
|
@app.route('/add_to_playlist', methods=['POST'])
|
|
|
-def add_to_playlist():
|
|
|
+def api_add_to_playlist():
|
|
|
data = request.json
|
|
|
playlist_name = data.get('playlist_name')
|
|
|
pattern = data.get('pattern')
|
|
|
|
|
|
- # Load existing playlists
|
|
|
- with open('playlists.json', 'r') as f:
|
|
|
- playlists = json.load(f)
|
|
|
-
|
|
|
- # Add pattern to the selected playlist
|
|
|
- if playlist_name in playlists:
|
|
|
- playlists[playlist_name].append(pattern)
|
|
|
- with open('playlists.json', 'w') as f:
|
|
|
- json.dump(playlists, f)
|
|
|
+ success = add_to_playlist(playlist_name, pattern)
|
|
|
+ if success:
|
|
|
return jsonify(success=True)
|
|
|
else:
|
|
|
return jsonify(success=False, error='Playlist not found'), 404
|
|
|
|
|
|
@app.route("/run_playlist", methods=["POST"])
|
|
|
-def run_playlist():
|
|
|
- """
|
|
|
- POST /run_playlist
|
|
|
- Body (JSON):
|
|
|
- {
|
|
|
- "playlist_name": "My Playlist",
|
|
|
- "pause_time": 1.0, # Optional: seconds to pause between patterns
|
|
|
- "clear_pattern": "random", # Optional: "clear_in", "clear_out", "clear_sideway", or "random"
|
|
|
- "run_mode": "single", # 'single' or 'indefinite'
|
|
|
- "shuffle": True # true or false
|
|
|
- "start_time": ""
|
|
|
- "end_time": ""
|
|
|
- }
|
|
|
- """
|
|
|
+def api_run_playlist():
|
|
|
data = request.get_json()
|
|
|
-
|
|
|
- # Validate input
|
|
|
if not data or "playlist_name" not in data:
|
|
|
return jsonify({"success": False, "error": "Missing 'playlist_name' field"}), 400
|
|
|
|
|
|
- playlist_name = data["playlist_name"]
|
|
|
- pause_time = data.get("pause_time", 0)
|
|
|
- clear_pattern = data.get("clear_pattern", None)
|
|
|
- run_mode = data.get("run_mode", "single") # Default to 'single' run
|
|
|
- shuffle = data.get("shuffle", False) # Default to no shuffle
|
|
|
- start_time = data.get("start_time", None)
|
|
|
- end_time = data.get("end_time", None)
|
|
|
-
|
|
|
- # Validate pause_time
|
|
|
- if not isinstance(pause_time, (int, float)) or pause_time < 0:
|
|
|
- return jsonify({"success": False, "error": "'pause_time' must be a non-negative number"}), 400
|
|
|
-
|
|
|
- # Validate clear_pattern
|
|
|
- valid_patterns = ["clear_in", "clear_out", "clear_sideway", "random"]
|
|
|
- if clear_pattern not in valid_patterns:
|
|
|
- clear_pattern = None
|
|
|
-
|
|
|
- # Validate run_mode
|
|
|
- if run_mode not in ["single", "indefinite"]:
|
|
|
- return jsonify({"success": False, "error": "'run_mode' must be 'single' or 'indefinite'"}), 400
|
|
|
-
|
|
|
- # Validate shuffle
|
|
|
- if not isinstance(shuffle, bool):
|
|
|
- return jsonify({"success": False, "error": "'shuffle' must be a boolean value"}), 400
|
|
|
+ playlist = get_playlist(data["playlist_name"])
|
|
|
+ if not playlist:
|
|
|
+ return jsonify({"success": False, "error": f"Playlist '{data['playlist_name']}' not found"}), 404
|
|
|
|
|
|
schedule_hours = None
|
|
|
+ start_time = data.get("start_time")
|
|
|
+ end_time = data.get("end_time")
|
|
|
if start_time and end_time:
|
|
|
try:
|
|
|
- # Convert HH:MM to datetime.time objects
|
|
|
start_time_obj = datetime.strptime(start_time, "%H:%M").time()
|
|
|
end_time_obj = datetime.strptime(end_time, "%H:%M").time()
|
|
|
-
|
|
|
- # Ensure start_time is before end_time
|
|
|
if start_time_obj >= end_time_obj:
|
|
|
return jsonify({"success": False, "error": "'start_time' must be earlier than 'end_time'"}), 400
|
|
|
-
|
|
|
- # Create schedule tuple with full time
|
|
|
schedule_hours = (start_time_obj, end_time_obj)
|
|
|
except ValueError:
|
|
|
return jsonify({"success": False, "error": "Invalid time format. Use HH:MM (e.g., '09:30')"}), 400
|
|
|
|
|
|
-
|
|
|
- # Load playlists
|
|
|
- playlists = load_playlists()
|
|
|
-
|
|
|
- if playlist_name not in playlists:
|
|
|
- return jsonify({"success": False, "error": f"Playlist '{playlist_name}' not found"}), 404
|
|
|
-
|
|
|
- file_paths = playlists[playlist_name]
|
|
|
- file_paths = [os.path.join(THETA_RHO_DIR, file) for file in file_paths]
|
|
|
-
|
|
|
+ file_paths = [os.path.join(THETA_RHO_DIR, file) for file in playlist["files"]]
|
|
|
if not file_paths:
|
|
|
- return jsonify({"success": False, "error": f"Playlist '{playlist_name}' is empty"}), 400
|
|
|
+ return jsonify({"success": False, "error": f"Playlist '{data['playlist_name']}' is empty"}), 400
|
|
|
|
|
|
- # Start the playlist execution in a separate thread
|
|
|
try:
|
|
|
threading.Thread(
|
|
|
target=run_theta_rho_files,
|
|
|
args=(file_paths,),
|
|
|
kwargs={
|
|
|
- 'pause_time': pause_time,
|
|
|
- 'clear_pattern': clear_pattern,
|
|
|
- 'run_mode': run_mode,
|
|
|
- 'shuffle': shuffle,
|
|
|
+ 'pause_time': data.get("pause_time", 0),
|
|
|
+ 'clear_pattern': data.get("clear_pattern"),
|
|
|
+ 'run_mode': data.get("run_mode", "single"),
|
|
|
+ 'shuffle': data.get("shuffle", False),
|
|
|
'schedule_hours': schedule_hours
|
|
|
},
|
|
|
- daemon=True # Daemonize thread to exit with the main program
|
|
|
+ daemon=True
|
|
|
).start()
|
|
|
- return jsonify({"success": True, "message": f"Playlist '{playlist_name}' is now running."})
|
|
|
+ return jsonify({"success": True, "message": f"Playlist '{data['playlist_name']}' is now running."})
|
|
|
except Exception as e:
|
|
|
return jsonify({"success": False, "error": str(e)}), 500
|
|
|
|
|
|
-@app.route('/set_speed', methods=['POST'])
|
|
|
-def set_speed():
|
|
|
- """Set the speed for the Arduino."""
|
|
|
- global ser
|
|
|
- if ser is None or not ser.is_open:
|
|
|
- return jsonify({"success": False, "error": "Serial connection not established"}), 400
|
|
|
-
|
|
|
- try:
|
|
|
- # Parse the speed value from the request
|
|
|
- data = request.json
|
|
|
- speed = data.get('speed')
|
|
|
+# Execution Control Routes
|
|
|
+@app.route('/stop_execution', methods=['POST'])
|
|
|
+def api_stop_execution():
|
|
|
+ stop_execution()
|
|
|
+ return jsonify({'success': True})
|
|
|
|
|
|
- if speed is None:
|
|
|
- return jsonify({"success": False, "error": "Speed is required"}), 400
|
|
|
+@app.route('/pause_execution', methods=['POST'])
|
|
|
+def api_pause_execution():
|
|
|
+ pause_execution()
|
|
|
+ return jsonify({'success': True, 'message': 'Execution paused'})
|
|
|
|
|
|
- if not isinstance(speed, (int, float)) or speed <= 0:
|
|
|
- return jsonify({"success": False, "error": "Invalid speed value"}), 400
|
|
|
+@app.route('/resume_execution', methods=['POST'])
|
|
|
+def api_resume_execution():
|
|
|
+ resume_execution()
|
|
|
+ return jsonify({'success': True, 'message': 'Execution resumed'})
|
|
|
|
|
|
- # Send the SET_SPEED command to the Arduino
|
|
|
- command = f"SET_SPEEzD {speed}"
|
|
|
- send_command(command)
|
|
|
- return jsonify({"success": True, "speed": speed})
|
|
|
- except Exception as e:
|
|
|
- return jsonify({"success": False, "error": str(e)}), 500
|
|
|
+@app.route('/status', methods=['GET'])
|
|
|
+def api_get_status():
|
|
|
+ return jsonify(get_execution_status())
|
|
|
|
|
|
+# Firmware Routes
|
|
|
@app.route('/get_firmware_info', methods=['GET', 'POST'])
|
|
|
-def get_firmware_info():
|
|
|
- """
|
|
|
- Compare the installed firmware version and motor type with the one in the .ino file.
|
|
|
- """
|
|
|
- global firmware_version, arduino_driver_type, ser
|
|
|
-
|
|
|
- if ser is None or not ser.is_open:
|
|
|
- return jsonify({"success": False, "error": "Arduino not connected or serial port not open"}), 400
|
|
|
-
|
|
|
- try:
|
|
|
- if request.method == "GET":
|
|
|
- # Attempt to retrieve installed firmware details from the Arduino
|
|
|
- ser.reset_input_buffer()
|
|
|
- ser.reset_output_buffer()
|
|
|
- ser.write(b"GET_VERSION\n")
|
|
|
- time.sleep(0.5)
|
|
|
-
|
|
|
- installed_version = firmware_version
|
|
|
- installed_type = arduino_driver_type
|
|
|
-
|
|
|
- # If Arduino provides valid details, proceed with comparison
|
|
|
- if installed_version != 'Unknown' and installed_type != 'Unknown':
|
|
|
- ino_path = MOTOR_TYPE_MAPPING.get(installed_type)
|
|
|
- firmware_details = get_ino_firmware_details(ino_path)
|
|
|
-
|
|
|
- if not firmware_details or not firmware_details.get("version") or not firmware_details.get("motorType"):
|
|
|
- return jsonify({"success": False, "error": "Failed to retrieve .ino firmware details"}), 500
|
|
|
-
|
|
|
- update_available = (
|
|
|
- installed_version != firmware_details["version"] or
|
|
|
- installed_type != firmware_details["motorType"]
|
|
|
- )
|
|
|
-
|
|
|
- return jsonify({
|
|
|
- "success": True,
|
|
|
- "installedVersion": installed_version,
|
|
|
- "installedType": installed_type,
|
|
|
- "inoVersion": firmware_details["version"],
|
|
|
- "inoType": firmware_details["motorType"],
|
|
|
- "updateAvailable": update_available
|
|
|
- })
|
|
|
-
|
|
|
- # If Arduino details are unknown, indicate the need for POST
|
|
|
- return jsonify({
|
|
|
- "success": True,
|
|
|
- "installedVersion": installed_version,
|
|
|
- "installedType": installed_type,
|
|
|
- "updateAvailable": False
|
|
|
- })
|
|
|
-
|
|
|
- elif request.method == "POST":
|
|
|
- motor_type = request.json.get("motorType", None)
|
|
|
- if not motor_type or motor_type not in MOTOR_TYPE_MAPPING:
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": "Invalid or missing motor type"
|
|
|
- }), 400
|
|
|
-
|
|
|
- # Fetch firmware details for the given motor type
|
|
|
- ino_path = MOTOR_TYPE_MAPPING[motor_type]
|
|
|
- firmware_details = get_ino_firmware_details(ino_path)
|
|
|
-
|
|
|
- if not firmware_details:
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": "Failed to retrieve .ino firmware details"
|
|
|
- }), 500
|
|
|
-
|
|
|
- return jsonify({
|
|
|
- "success": True,
|
|
|
- "installedVersion": 'Unknown',
|
|
|
- "installedType": motor_type,
|
|
|
- "inoVersion": firmware_details["version"],
|
|
|
- "inoType": firmware_details["motorType"],
|
|
|
- "updateAvailable": True
|
|
|
- })
|
|
|
+def api_get_firmware_info():
|
|
|
+ device_info = get_device_info()
|
|
|
+ if request.method == "POST":
|
|
|
+ motor_type = request.json.get("motorType")
|
|
|
+ info, error = get_firmware_info(
|
|
|
+ device_info['firmware_version'],
|
|
|
+ device_info['driver_type'],
|
|
|
+ motor_type
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ info, error = get_firmware_info(
|
|
|
+ device_info['firmware_version'],
|
|
|
+ device_info['driver_type']
|
|
|
+ )
|
|
|
|
|
|
- except Exception as e:
|
|
|
- return jsonify({"success": False, "error": str(e)}), 500
|
|
|
+ if error:
|
|
|
+ return jsonify({"success": False, "error": error}), 500
|
|
|
+ return jsonify(info)
|
|
|
|
|
|
@app.route('/flash_firmware', methods=['POST'])
|
|
|
-def flash_firmware():
|
|
|
- """
|
|
|
- Compile and flash the firmware to the connected Arduino.
|
|
|
- """
|
|
|
- global ser_port
|
|
|
-
|
|
|
- # Ensure the Arduino is connected
|
|
|
- if ser_port is None or ser is None or not ser.is_open:
|
|
|
+def api_flash_firmware():
|
|
|
+ status = get_serial_status()
|
|
|
+ if not status['connected']:
|
|
|
return jsonify({"success": False, "error": "No Arduino connected or connection lost"}), 400
|
|
|
|
|
|
- build_dir = "/tmp/arduino_build" # Temporary build directory
|
|
|
-
|
|
|
- try:
|
|
|
- data = request.json
|
|
|
- motor_type = data.get("motorType", None)
|
|
|
-
|
|
|
- # Validate motor type
|
|
|
- if not motor_type or motor_type not in MOTOR_TYPE_MAPPING:
|
|
|
- return jsonify({"success": False, "error": "Invalid or missing motor type"}), 400
|
|
|
-
|
|
|
- # Get the .ino file path based on the motor type
|
|
|
- ino_file_path = MOTOR_TYPE_MAPPING[motor_type]
|
|
|
- ino_file_name = os.path.basename(ino_file_path)
|
|
|
-
|
|
|
- # Install required libraries
|
|
|
- required_libraries = ["AccelStepper"] # AccelStepper includes MultiStepper
|
|
|
- for library in required_libraries:
|
|
|
- library_install_command = ["arduino-cli", "lib", "install", library]
|
|
|
- install_process = subprocess.run(library_install_command, capture_output=True, text=True)
|
|
|
- if install_process.returncode != 0:
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": f"Library installation failed for {library}: {install_process.stderr}"
|
|
|
- }), 500
|
|
|
-
|
|
|
- # Step 1: Compile the .ino file to a .hex file
|
|
|
- compile_command = [
|
|
|
- "arduino-cli",
|
|
|
- "compile",
|
|
|
- "--fqbn", "arduino:avr:uno", # Use the detected FQBN
|
|
|
- "--output-dir", build_dir,
|
|
|
- ino_file_path
|
|
|
- ]
|
|
|
-
|
|
|
- compile_process = subprocess.run(compile_command, capture_output=True, text=True)
|
|
|
- if compile_process.returncode != 0:
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": compile_process.stderr
|
|
|
- }), 500
|
|
|
-
|
|
|
- # Step 2: Flash the .hex file to the Arduino
|
|
|
- hex_file_path = os.path.join(build_dir, ino_file_name+".hex")
|
|
|
- flash_command = [
|
|
|
- "avrdude",
|
|
|
- "-v",
|
|
|
- "-c", "arduino", # Programmer type
|
|
|
- "-p", "atmega328p", # Microcontroller type
|
|
|
- "-P", ser_port, # Use the dynamic serial port
|
|
|
- "-b", "115200", # Baud rate
|
|
|
- "-D",
|
|
|
- "-U", f"flash:w:{hex_file_path}:i" # Flash memory write command
|
|
|
- ]
|
|
|
-
|
|
|
- flash_process = subprocess.run(flash_command, capture_output=True, text=True)
|
|
|
- if flash_process.returncode != 0:
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": flash_process.stderr
|
|
|
- }), 500
|
|
|
-
|
|
|
- return jsonify({"success": True, "message": "Firmware flashed successfully"})
|
|
|
- except Exception as e:
|
|
|
- return jsonify({"success": False, "error": str(e)}), 500
|
|
|
- finally:
|
|
|
- # Clean up temporary files
|
|
|
- if os.path.exists(build_dir):
|
|
|
- for file in os.listdir(build_dir):
|
|
|
- os.remove(os.path.join(build_dir, file))
|
|
|
- os.rmdir(build_dir)
|
|
|
+ motor_type = request.json.get("motorType")
|
|
|
+ success, message = flash_firmware(status['port'], motor_type)
|
|
|
+
|
|
|
+ if success:
|
|
|
+ return jsonify({"success": True, "message": message})
|
|
|
+
|
|
|
+ app.logger.error(message)
|
|
|
+ return jsonify({"success": False, "error": message}), 500
|
|
|
|
|
|
@app.route('/check_software_update', methods=['GET'])
|
|
|
-def check_updates():
|
|
|
+def api_check_updates():
|
|
|
update_info = check_git_updates()
|
|
|
return jsonify(update_info)
|
|
|
|
|
|
@app.route('/update_software', methods=['POST'])
|
|
|
-def update_software():
|
|
|
- error_log = []
|
|
|
-
|
|
|
- def run_command(command, error_message):
|
|
|
- try:
|
|
|
- subprocess.run(command, check=True)
|
|
|
- except subprocess.CalledProcessError as e:
|
|
|
- print(f"{error_message}: {e}")
|
|
|
- error_log.append(error_message)
|
|
|
-
|
|
|
- # Fetch the latest version tag from remote
|
|
|
- try:
|
|
|
- subprocess.run(["git", "fetch", "--tags"], check=True)
|
|
|
- latest_remote_tag = subprocess.check_output(
|
|
|
- ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
|
|
|
- ).strip().decode()
|
|
|
- except subprocess.CalledProcessError as e:
|
|
|
- error_log.append(f"Failed to fetch tags or get latest remote tag: {e}")
|
|
|
+def api_update_software():
|
|
|
+ success, message, error_log = update_software()
|
|
|
+ if success:
|
|
|
+ return jsonify({"success": True})
|
|
|
return jsonify({
|
|
|
"success": False,
|
|
|
- "error": "Failed to fetch tags or determine the latest version.",
|
|
|
+ "error": message,
|
|
|
"details": error_log
|
|
|
}), 500
|
|
|
|
|
|
- # Checkout the latest tag
|
|
|
- run_command(["git", "checkout", latest_remote_tag, '--force'], f"Failed to checkout version {latest_remote_tag}")
|
|
|
+# File Management Routes
|
|
|
+@app.route('/download/<filename>', methods=['GET'])
|
|
|
+def download_file(filename):
|
|
|
+ return send_from_directory(THETA_RHO_DIR, filename)
|
|
|
+
|
|
|
+@app.route('/delete_theta_rho_file', methods=['POST'])
|
|
|
+def api_delete_theta_rho_file():
|
|
|
+ data = request.json
|
|
|
+ file_name = data.get('file_name')
|
|
|
|
|
|
- # Restart Docker containers
|
|
|
- run_command(["docker", "compose", "up", "-d"], "Failed to restart Docker containers")
|
|
|
+ if not file_name:
|
|
|
+ return jsonify({"success": False, "error": "No file name provided"}), 400
|
|
|
|
|
|
- # Check if the update was successful
|
|
|
- update_status = check_git_updates()
|
|
|
+ file_path = os.path.join(THETA_RHO_DIR, file_name)
|
|
|
+ if not os.path.exists(file_path):
|
|
|
+ return jsonify({"success": False, "error": "File not found"}), 404
|
|
|
|
|
|
- if (
|
|
|
- update_status["updates_available"] is False
|
|
|
- and update_status["latest_local_tag"] == update_status["latest_remote_tag"]
|
|
|
- ):
|
|
|
- # Update was successful
|
|
|
+ try:
|
|
|
+ os.remove(file_path)
|
|
|
return jsonify({"success": True})
|
|
|
- else:
|
|
|
- # Update failed; include the errors in the response
|
|
|
- return jsonify({
|
|
|
- "success": False,
|
|
|
- "error": "Update incomplete",
|
|
|
- "details": error_log
|
|
|
- }), 500
|
|
|
+ except Exception as e:
|
|
|
+ return jsonify({"success": False, "error": str(e)}), 500
|
|
|
+
|
|
|
if __name__ == '__main__':
|
|
|
# Auto-connect to serial
|
|
|
connect_to_serial()
|
|
|
- app.run(debug=False, host='0.0.0.0', port=8080)
|
|
|
+ app.run(debug=True, host='0.0.0.0', port=8080)
|