瀏覽代碼

Refactor base

Fabio De Simone 1 年之前
父節點
當前提交
7eb8689760
共有 42 個文件被更改,包括 1126 次插入1287 次删除
  1. 6 1287
      app.py
  2. 0 0
      dune_weaver_flask/__init__.py
  3. 407 0
      dune_weaver_flask/app.py
  4. 0 0
      dune_weaver_flask/dune_weaver/__init__.py
  5. 0 0
      dune_weaver_flask/dune_weaver/core/__init__.py
  6. 0 0
      dune_weaver_flask/dune_weaver/core/pattern_manager/__init__.py
  7. 279 0
      dune_weaver_flask/dune_weaver/core/pattern_manager/manager.py
  8. 0 0
      dune_weaver_flask/dune_weaver/core/playlist_manager/__init__.py
  9. 99 0
      dune_weaver_flask/dune_weaver/core/playlist_manager/manager.py
  10. 0 0
      dune_weaver_flask/dune_weaver/firmware/__init__.py
  11. 226 0
      dune_weaver_flask/dune_weaver/firmware/manager.py
  12. 0 0
      dune_weaver_flask/dune_weaver/serial/__init__.py
  13. 109 0
      dune_weaver_flask/dune_weaver/serial/manager.py
  14. 0 0
      dune_weaver_flask/firmware/arduino_code/arduino_code.ino
  15. 0 0
      dune_weaver_flask/firmware/arduino_code/arduino_code.ino.hex
  16. 0 0
      dune_weaver_flask/firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino
  17. 0 0
      dune_weaver_flask/firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino.hex
  18. 0 0
      dune_weaver_flask/firmware/esp32/esp32.ino
  19. 0 0
      dune_weaver_flask/firmware/esp32/esp32.ino.bin
  20. 0 0
      dune_weaver_flask/firmware/esp32_TMC2209/esp32_TMC2209.ino
  21. 0 0
      dune_weaver_flask/firmware/esp32_TMC2209/esp32_TMC2209.ino.bin
  22. 0 0
      dune_weaver_flask/static/IMG_7404.gif
  23. 0 0
      dune_weaver_flask/static/IMG_9753.png
  24. 0 0
      dune_weaver_flask/static/UI.png
  25. 0 0
      dune_weaver_flask/static/UI_1.3.png
  26. 0 0
      dune_weaver_flask/static/css/all.min.css
  27. 0 0
      dune_weaver_flask/static/css/style.css
  28. 0 0
      dune_weaver_flask/static/fontawesome.min.css
  29. 0 0
      dune_weaver_flask/static/icons/chevron-down.svg
  30. 0 0
      dune_weaver_flask/static/icons/chevron-left.svg
  31. 0 0
      dune_weaver_flask/static/icons/chevron-right.svg
  32. 0 0
      dune_weaver_flask/static/icons/chevron-up.svg
  33. 0 0
      dune_weaver_flask/static/icons/pause.svg
  34. 0 0
      dune_weaver_flask/static/icons/play.svg
  35. 0 0
      dune_weaver_flask/static/js/main.js
  36. 0 0
      dune_weaver_flask/static/webfonts/Roboto-Italic-VariableFont_wdth,wght.ttf
  37. 0 0
      dune_weaver_flask/static/webfonts/Roboto-VariableFont_wdth,wght.ttf
  38. 0 0
      dune_weaver_flask/static/webfonts/fa-regular-400.ttf
  39. 0 0
      dune_weaver_flask/static/webfonts/fa-regular-400.woff2
  40. 0 0
      dune_weaver_flask/static/webfonts/fa-solid-900.ttf
  41. 0 0
      dune_weaver_flask/static/webfonts/fa-solid-900.woff2
  42. 0 0
      dune_weaver_flask/templates/index.html

+ 6 - 1287
app.py

@@ -1,1289 +1,8 @@
-from flask import Flask, request, jsonify, render_template
-import atexit
-import os
-import serial
-import time
-import random
-import threading
-import serial.tools.list_ports
-import math
-import json
-from datetime import datetime
-import subprocess
-from tqdm import tqdm
+"""
+entrypoint for the flask app
+the actual source of the app resides in dune-weaver-flask
+"""
 
-app = Flask(__name__)
+from dune_weaver_flask import app
 
-# Configuration
-THETA_RHO_DIR = './patterns'
-IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
-CLEAR_PATTERNS = {
-    "clear_from_in":  "./patterns/clear_from_in.thr",
-    "clear_from_out": "./patterns/clear_from_out.thr",
-    "clear_sideway":  "./patterns/clear_sideway.thr"
-}
-os.makedirs(THETA_RHO_DIR, exist_ok=True)
-
-# Serial connection (First available will be selected by default)
-ser = None
-ser_port = None  # Global variable to store the serial port name
-stop_requested = False
-pause_requested = False
-pause_condition = threading.Condition()
-
-# Global variables to store device information
-arduino_table_name = None
-arduino_driver_type = 'Unknown'
-
-# Table status
-current_playing_file = None
-execution_progress = None
-firmware_version = 'Unknown'
-current_playing_index = None
-current_playlist = None
-is_clearing = False
-
-serial_lock = threading.RLock()
-
-PLAYLISTS_FILE = os.path.join(os.getcwd(), "playlists.json")
-
-MOTOR_TYPE_MAPPING = {
-    "TMC2209": "./firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino",
-    "DRV8825": "./firmware/arduino_code/arduino_code.ino",
-    "esp32": "./firmware/esp32/esp32.ino",
-    "esp32_TMC2209": "./firmware/esp32_TMC2209/esp32_TMC2209.ino"
-}
-
-# Ensure the file exists and contains at least an empty JSON object
-if not os.path.exists(PLAYLISTS_FILE):
-    with open(PLAYLISTS_FILE, "w") as f:
-        json.dump({}, f, indent=2)
-
-def get_ino_firmware_details(ino_file_path):
-    """
-    Extract firmware details, including version and motor type, from the given .ino file.
-
-    Args:
-        ino_file_path (str): Path to the .ino file.
-
-    Returns:
-        dict: Dictionary containing firmware details such as version and motor type, or None if not found.
-    """
-    try:
-        if not ino_file_path:
-            raise ValueError("Invalid path: ino_file_path is None or empty.")
-
-        firmware_details = {"version": None, "motorType": None}
-
-        with open(ino_file_path, "r") as file:
-            for line in file:
-                # Extract firmware version
-                if "firmwareVersion" in line:
-                    start = line.find('"') + 1
-                    end = line.rfind('"')
-                    if start != -1 and end != -1 and start < end:
-                        firmware_details["version"] = line[start:end]
-
-                # Extract motor type
-                if "motorType" in line:
-                    start = line.find('"') + 1
-                    end = line.rfind('"')
-                    if start != -1 and end != -1 and start < end:
-                        firmware_details["motorType"] = line[start:end]
-
-        if not firmware_details["version"]:
-            print(f"Firmware version not found in file: {ino_file_path}")
-        if not firmware_details["motorType"]:
-            print(f"Motor type not found in file: {ino_file_path}")
-
-        return firmware_details if any(firmware_details.values()) else None
-
-    except FileNotFoundError:
-        print(f"File not found: {ino_file_path}")
-        return None
-    except Exception as e:
-        print(f"Error reading .ino file: {str(e)}")
-        return None
-
-def check_git_updates():
-    try:
-        # Fetch the latest updates from the remote repository
-        subprocess.run(["git", "fetch", "--tags", "--force"], check=True)
-
-        # Get the latest tag from the remote
-        latest_remote_tag = subprocess.check_output(
-            ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
-        ).strip().decode()
-
-        # Get the latest tag from the local branch
-        latest_local_tag = subprocess.check_output(
-            ["git", "describe", "--tags", "--abbrev=0"]
-        ).strip().decode()
-
-        # Count how many tags the local branch is behind
-        tag_behind_count = 0
-        if latest_local_tag != latest_remote_tag:
-            tags = subprocess.check_output(
-                ["git", "tag", "--merged", "origin/main"], text=True
-            ).splitlines()
-
-            found_local = False
-            for tag in tags:
-                if tag == latest_local_tag:
-                    found_local = True
-                elif found_local:
-                    tag_behind_count += 1
-                    if tag == latest_remote_tag:
-                        break
-
-
-        # Check if there are new commits
-        updates_available = latest_remote_tag != latest_local_tag
-
-        return {
-            "updates_available": updates_available,
-            "tag_behind_count": tag_behind_count,  # Tags behind
-            "latest_remote_tag": latest_remote_tag,
-            "latest_local_tag": latest_local_tag,
-        }
-    except subprocess.CalledProcessError as e:
-        print(f"Error checking Git updates: {e}")
-        return {
-            "updates_available": False,
-            "tag_behind_count": 0,
-            "latest_remote_tag": None,
-            "latest_local_tag": None,
-        }
-
-
-
-def list_serial_ports():
-    """Return a list of available serial ports."""
-    ports = serial.tools.list_ports.comports()
-    return [port.device for port in ports if port.device not in IGNORE_PORTS]
-
-def connect_to_serial(port=None, baudrate=115200):
-    """Automatically connect to the first available serial port or a specified port."""
-    global ser, ser_port, arduino_table_name, arduino_driver_type, firmware_version
-
-    try:
-        if port is None:
-            ports = list_serial_ports()
-            if not ports:
-                print("No serial port connected")
-                return False
-            port = ports[0]  # Auto-select the first available port
-
-        with serial_lock:
-            if ser and ser.is_open:
-                ser.close()
-            ser = serial.Serial(port, baudrate, timeout=2)  # Set timeout to avoid infinite waits
-            ser_port = port  # Store the connected port globally
-
-        print(f"Connected to serial port: {port}")
-        time.sleep(2)  # Allow time for the connection to establish
-
-        # Read initial startup messages from Arduino
-        arduino_table_name = None
-        arduino_driver_type = None
-
-        while ser.in_waiting > 0:
-            line = ser.readline().decode().strip()
-            print(f"Arduino: {line}")  # Print the received message
-
-            # Store the device details based on the expected messages
-            if "Table:" in line:
-                arduino_table_name = line.replace("Table: ", "").strip()
-            elif "Drivers:" in line:
-                arduino_driver_type = line.replace("Drivers: ", "").strip()
-            elif "Version:" in line:
-                firmware_version = line.replace("Version: ", "").strip()
-
-        # Display stored values
-        print(f"Detected Table: {arduino_table_name or 'Unknown'}")
-        print(f"Detected Drivers: {arduino_driver_type or 'Unknown'}")
-
-        return True  # Successfully connected
-    except serial.SerialException as e:
-        print(f"Failed to connect to serial port {port}: {e}")
-        port = None  # Reset the port to try the next available one
-
-    print("Max retries reached. Could not connect to a serial port.")
-    return False
-
-def disconnect_serial():
-    """Disconnect the current serial connection."""
-    global ser, ser_port
-    if ser and ser.is_open:
-        ser.close()
-        ser = None
-    ser_port = None  # Reset the port name
-
-def restart_serial(port, baudrate=115200):
-    """Restart the serial connection."""
-    disconnect_serial()
-    connect_to_serial(port, baudrate)
-
-def parse_theta_rho_file(file_path):
-    """
-    Parse a theta-rho file and return a list of (theta, rho) pairs.
-    Normalizes the list so the first theta is always 0.
-    """
-    coordinates = []
-    try:
-        with open(file_path, 'r') as file:
-            for line in file:
-                line = line.strip()
-                # Skip header or comment lines (starting with '#' or empty lines)
-                if not line or line.startswith("#"):
-                    continue
-
-                # Parse lines with theta and rho separated by spaces
-                try:
-                    theta, rho = map(float, line.split())
-                    coordinates.append((theta, rho))
-                except ValueError:
-                    print(f"Skipping invalid line: {line}")
-                    continue
-    except Exception as e:
-        print(f"Error reading file: {e}")
-        return coordinates
-
-    # ---- Normalization Step ----
-    if coordinates:
-        # Take the first coordinate's theta
-        first_theta = coordinates[0][0]
-
-        # Shift all thetas so the first coordinate has theta=0
-        normalized = []
-        for (theta, rho) in coordinates:
-            normalized.append((theta - first_theta, rho))
-
-        # Replace original list with normalized data
-        coordinates = normalized
-
-    return coordinates
-
-def send_coordinate_batch(ser, coordinates):
-    """Send a batch of theta-rho pairs to the Arduino."""
-    # print("Sending batch:", coordinates)
-    batch_str = ";".join(f"{theta:.5f},{rho:.5f}" for theta, rho in coordinates) + ";\n"
-    with serial_lock:
-        ser.write(batch_str.encode())
-
-def send_command(command):
-    """Send a single command to the Arduino."""
-    with serial_lock:
-        ser.write(f"{command}\n".encode())
-        print(f"Sent: {command}")
-
-        # Wait for "R" acknowledgment from Arduino
-        while True:
-            with serial_lock:
-                if ser.in_waiting > 0:
-                    response = ser.readline().decode().strip()
-                    print(f"Arduino response: {response}")
-                    if response == "R":
-                        print("Command execution completed.")
-                        break
-
-def wait_for_start_time(schedule_hours):
-    """
-    Keep checking every 30 seconds if the time is within the schedule to resume execution.
-    """
-    global pause_requested
-    start_time, end_time = schedule_hours
-
-    while pause_requested:
-        now = datetime.now().time()
-        if start_time <= now < end_time:
-            print("Resuming execution: Within schedule.")
-            pause_requested = False
-            with pause_condition:
-                pause_condition.notify_all()
-            break  # Exit the loop once resumed
-        else:
-            time.sleep(30)  # Wait for 30 seconds before checking again
-
-# Function to check schedule based on start and end time
-def schedule_checker(schedule_hours):
-    """
-    Pauses/resumes execution based on a given time range.
-
-    Parameters:
-    - schedule_hours (tuple): (start_time, end_time) as `datetime.time` objects.
-    """
-    global pause_requested
-    if not schedule_hours:
-        return  # No scheduling restriction
-
-    start_time, end_time = schedule_hours
-    now = datetime.now().time()  # Get the current time as `datetime.time`
-
-    # Check if we are currently within the scheduled time
-    if start_time <= now < end_time:
-        if pause_requested:
-            print("Starting execution: Within schedule.")
-        pause_requested = False  # Resume execution
-        with pause_condition:
-            pause_condition.notify_all()
-    else:
-        if not pause_requested:
-            print("Pausing execution: Outside schedule.")
-        pause_requested = True  # Pause execution
-
-        # Start a background thread to periodically check for start time
-        threading.Thread(target=wait_for_start_time, args=(schedule_hours,), daemon=True).start()
-
-def run_theta_rho_file(file_path, schedule_hours=None):
-    """Run a theta-rho file by sending data in optimized batches with tqdm ETA tracking."""
-    global stop_requested, current_playing_file, execution_progress
-
-    coordinates = parse_theta_rho_file(file_path)
-    total_coordinates = len(coordinates)
-
-    if total_coordinates < 2:
-        print("Not enough coordinates for interpolation.")
-        current_playing_file = None  # Clear tracking if failed
-        execution_progress = None
-        return
-
-    execution_progress = (0, total_coordinates, None)  # Initialize progress with ETA as None
-    batch_size = 10  # Smaller batches may smooth movement further
-    
-    # before trying to acuire the lock we send the stop command
-    # so then we will just wait for the lock to be released so we can use the serial
-    stop_actions()
-    with serial_lock:
-        current_playing_file = file_path  # Track current playing file
-        execution_progress = (0, 0, None)  # Reset progress (ETA starts as None)
-        stop_requested = False
-        with tqdm(total=total_coordinates, unit="coords", desc=f"Executing Pattern {file_path}", dynamic_ncols=True, disable=None) as pbar:
-            for i in range(0, total_coordinates, batch_size):
-                if stop_requested:
-                    print("Execution stopped by user after completing the current batch.")
-                    break
-
-                with pause_condition:
-                    while pause_requested:
-                        print("Execution paused...")
-                        pause_condition.wait()  # This will block execution until notified
-
-                batch = coordinates[i:i + batch_size]
-
-                if i == 0:
-                    send_coordinate_batch(ser, batch)
-                    execution_progress = (i + batch_size, total_coordinates, None)  # No ETA yet
-                    pbar.update(batch_size)
-                    continue
-
-                while True:
-                    schedule_checker(schedule_hours)  # Check if within schedule
-                    if ser.in_waiting > 0:
-                        response = ser.readline().decode().strip()
-                        if response == "R":
-                            send_coordinate_batch(ser, batch)
-                            pbar.update(batch_size)  # Update tqdm progress
-
-                            # Use tqdm's built-in ETA tracking
-                            estimated_remaining_time = pbar.format_dict['elapsed'] / (i + batch_size) * (total_coordinates - (i + batch_size))
-
-                            # Update execution progress with formatted ETA
-                            execution_progress = (i + batch_size, total_coordinates, estimated_remaining_time)
-                            break
-                        elif response != "IGNORED: FINISHED" and response.startswith("IGNORE"):  # Retry the previous batch
-                            print("Received IGNORE. Resending the previous batch...")
-                            print(response)
-                            # Calculate the previous batch indices
-                            prev_start = max(0, i - batch_size)  # Ensure we don't go below 0
-                            prev_end = i  # End of the previous batch is `i`
-                            previous_batch = coordinates[prev_start:prev_end]
-
-                            # Resend the previous batch
-                            send_coordinate_batch(ser, previous_batch)
-                            break  # Exit the retry loop after resending
-                        else:
-                            print(f"Arduino response: {response}")
-
-        reset_theta()
-        ser.write("FINISHED\n".encode())
-
-    # Clear tracking variables when done
-    current_playing_file = None
-    execution_progress = None
-    print("Pattern execution completed.")
-
-def get_clear_pattern_file(clear_pattern_mode, path=None):
-    """Return a .thr file path based on pattern_name."""
-    if not clear_pattern_mode or clear_pattern_mode == 'none':
-        return
-    print("Clear pattern mode: " + clear_pattern_mode)
-    if clear_pattern_mode == "random":
-        # Randomly pick one of the three known patterns
-        return random.choice(list(CLEAR_PATTERNS.values()))
-
-    if clear_pattern_mode == 'adaptive':
-        _, first_rho = parse_theta_rho_file(path)[0]
-        if first_rho < 0.5:
-            return CLEAR_PATTERNS['clear_from_out']
-        else:
-            return random.choice([CLEAR_PATTERNS['clear_from_in'], CLEAR_PATTERNS['clear_sideway']])
-    else:
-        return CLEAR_PATTERNS[clear_pattern_mode]
-
-def run_theta_rho_files(
-    file_paths,
-    pause_time=0,
-    clear_pattern=None,
-    run_mode="single",
-    shuffle=False,
-    schedule_hours=None
-):
-    """
-    Runs multiple .thr files in sequence with options for pausing, clearing, shuffling, and looping.
-
-    Parameters:
-    - file_paths (list): List of file paths to run.
-    - pause_time (float): Seconds to pause between patterns.
-    - clear_pattern (str): Specific clear pattern to run ("clear_from_in", "clear_from_out", "clear_sideway", "adaptive", or "random").
-    - run_mode (str): "single" for one-time run or "indefinite" for looping.
-    - shuffle (bool): Whether to shuffle the playlist before running.
-    """
-    global stop_requested
-    global current_playlist
-    global current_playing_index
-    stop_requested = False  # Reset stop flag at the start
-
-    if shuffle:
-        random.shuffle(file_paths)
-        print("Playlist shuffled.")
-
-    current_playlist = file_paths
-
-    while True:
-        for idx, path in enumerate(file_paths):
-            print("Upcoming pattern: " + path)
-            current_playing_index = idx
-            schedule_checker(schedule_hours)
-            if stop_requested:
-                print("Execution stopped before starting next pattern.")
-                return
-
-            if clear_pattern:
-                if stop_requested:
-                    print("Execution stopped before running the next clear pattern.")
-                    return
-
-                # Determine the clear pattern to run
-                clear_file_path = get_clear_pattern_file(clear_pattern, path)
-                print(f"Running clear pattern: {clear_file_path}")
-                run_theta_rho_file(clear_file_path, schedule_hours)
-
-            if not stop_requested:
-                # Run the main pattern
-                print(f"Running pattern {idx + 1} of {len(file_paths)}: {path}")
-                run_theta_rho_file(path, schedule_hours)
-
-            if idx < len(file_paths) -1:
-                if stop_requested:
-                    print("Execution stopped before running the next clear pattern.")
-                    return
-                # Pause after each pattern if requested
-                if pause_time > 0:
-                    print(f"Pausing for {pause_time} seconds...")
-                    time.sleep(pause_time)
-
-        # After completing the playlist
-        if run_mode == "indefinite":
-            print("Playlist completed. Restarting as per 'indefinite' run mode.")
-            if pause_time > 0:
-                print(f"Pausing for {pause_time} seconds before restarting...")
-                time.sleep(pause_time)
-            if shuffle:
-                random.shuffle(file_paths)
-                print("Playlist reshuffled for the next loop.")
-            continue
-        else:
-            print("Playlist completed.")
-            break
-
-    # Reset theta after execution or stopping
-    reset_theta()
-    with serial_lock:
-        ser.write("FINISHED\n".encode())
-        
-    print("All requested patterns completed (or stopped).")
-
-def reset_theta():
-    """Reset theta on the Arduino."""
-    with serial_lock:
-        ser.write("RESET_THETA\n".encode())
-        while True:
-            with serial_lock:
-                if ser.in_waiting > 0:
-                    response = ser.readline().decode().strip()
-                    print(f"Arduino response: {response}")
-                    if response == "THETA_RESET":
-                        print("Theta successfully reset.")
-                        break
-            time.sleep(0.5)  # Small delay to avoid busy waiting
-
-# Flask API Endpoints
-@app.route('/')
-def index():
-    return render_template('index.html')
-
-@app.route('/list_serial_ports', methods=['GET'])
-def list_ports():
-    return jsonify(list_serial_ports())
-
-@app.route('/connect_serial', methods=['POST'])
-def connect_serial():
-    port = request.json.get('port')
-    if not port:
-        return jsonify({'error': 'No port provided'}), 400
-
-    try:
-        connect_to_serial(port)
-        return jsonify({'success': True})
-    except Exception as e:
-        return jsonify({'error': str(e)}), 500
-
-@app.route('/disconnect_serial', methods=['POST'])
-def disconnect():
-    try:
-        disconnect_serial()
-        return jsonify({'success': True})
-    except Exception as e:
-        return jsonify({'error': str(e)}), 500
-
-@app.route('/restart_serial', methods=['POST'])
-def restart():
-    port = request.json.get('port')
-    if not port:
-        return jsonify({'error': 'No port provided'}), 400
-
-    try:
-        restart_serial(port)
-        return jsonify({'success': True})
-    except Exception as e:
-        return jsonify({'error': str(e)}), 500
-
-@app.route('/list_theta_rho_files', methods=['GET'])
-def list_theta_rho_files():
-    files = []
-    for root, _, filenames in os.walk(THETA_RHO_DIR):
-        for file in filenames:
-            # Construct the relative file path
-            relative_path = os.path.relpath(os.path.join(root, file), THETA_RHO_DIR)
-            files.append(relative_path)
-    return jsonify(sorted(files))
-
-@app.route('/upload_theta_rho', methods=['POST'])
-def upload_theta_rho():
-    custom_patterns_dir = os.path.join(THETA_RHO_DIR, 'custom_patterns')
-    os.makedirs(custom_patterns_dir, exist_ok=True)  # Ensure the directory exists
-
-    file = request.files['file']
-    if file:
-        file.save(os.path.join(custom_patterns_dir, file.filename))
-        return jsonify({'success': True})
-    return jsonify({'success': False})
-
-
-@app.route('/run_theta_rho', methods=['POST'])
-def run_theta_rho():
-    file_name = request.json.get('file_name')
-    pre_execution = request.json.get('pre_execution')
-
-    if not file_name:
-        return jsonify({'error': 'No file name provided'}), 400
-
-    file_path = os.path.join(THETA_RHO_DIR, file_name)
-    if not os.path.exists(file_path):
-        return jsonify({'error': 'File not found'}), 404
-
-    try:
-        # Build a list of files to run in sequence
-        files_to_run = []
-
-        # Finally, add the main file
-        files_to_run.append(file_path)
-
-        # Run them in one shot using run_theta_rho_files (blocking call)
-        threading.Thread(
-            target=run_theta_rho_files,
-            args=(files_to_run,),
-            kwargs={
-                'pause_time': 0,
-                'clear_pattern': pre_execution
-            }
-        ).start()
-        return jsonify({'success': True})
-
-    except Exception as e:
-        return jsonify({'error': str(e)}), 500
-
-def stop_actions():
-    global pause_requested
-    with pause_condition:
-        pause_requested = False
-        pause_condition.notify_all()
-        
-    global stop_requested, current_playing_index, current_playlist, is_clearing, current_playing_file, execution_progress
-    stop_requested = True
-    current_playing_index = None
-    current_playlist = None
-    is_clearing = False
-    current_playing_file = None
-    execution_progress = None
-
-@app.route('/stop_execution', methods=['POST'])
-def stop_execution():
-    stop_actions()
-    return jsonify({'success': True})
-
-@app.route('/send_home', methods=['POST'])
-def send_home():
-    """Send the HOME command to the Arduino."""
-    try:
-        send_command("HOME")
-        return jsonify({'success': True})
-    except Exception as e:
-        return jsonify({'error': str(e)}), 500
-
-@app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
-def run_specific_theta_rho_file(file_name):
-    """Run a specific theta-rho file."""
-    file_path = os.path.join(THETA_RHO_DIR, file_name)
-    if not os.path.exists(file_path):
-        return jsonify({'error': 'File not found'}), 404
-
-    threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
-    return jsonify({'success': True})
-
-@app.route('/delete_theta_rho_file', methods=['POST'])
-def delete_theta_rho_file():
-    data = request.json
-    file_name = data.get('file_name')
-
-    if not file_name:
-        return jsonify({"success": False, "error": "No file name provided"}), 400
-
-    file_path = os.path.join(THETA_RHO_DIR, file_name)
-
-    if not os.path.exists(file_path):
-        return jsonify({"success": False, "error": "File not found"}), 404
-
-    try:
-        os.remove(file_path)
-        return jsonify({"success": True})
-    except Exception as e:
-        return jsonify({"success": False, "error": str(e)}), 500
-
-@app.route('/move_to_center', methods=['POST'])
-def move_to_center():
-    """Move the sand table to the center position."""
-    try:
-        if ser is None or not ser.is_open:
-            return jsonify({"success": False, "error": "Serial connection not established"}), 400
-
-        coordinates = [(0, 0)]  # Center position
-        send_coordinate_batch(ser, coordinates)
-        return jsonify({"success": True})
-    except Exception as e:
-        return jsonify({"success": False, "error": str(e)}), 500
-
-@app.route('/move_to_perimeter', methods=['POST'])
-def move_to_perimeter():
-    """Move the sand table to the perimeter position."""
-    try:
-        if ser is None or not ser.is_open:
-            return jsonify({"success": False, "error": "Serial connection not established"}), 400
-
-        MAX_RHO = 1
-        coordinates = [(0, MAX_RHO)]  # Perimeter position
-        send_coordinate_batch(ser, coordinates)
-        return jsonify({"success": True})
-    except Exception as e:
-        return jsonify({"success": False, "error": str(e)}), 500
-
-@app.route('/preview_thr', methods=['POST'])
-def preview_thr():
-    file_name = request.json.get('file_name')
-
-    if not file_name:
-        return jsonify({'error': 'No file name provided'}), 400
-
-    file_path = os.path.join(THETA_RHO_DIR, file_name)
-    if not os.path.exists(file_path):
-        return jsonify({'error': 'File not found'}), 404
-
-    try:
-        # Parse the .thr file with transformations
-        coordinates = parse_theta_rho_file(file_path)
-        return jsonify({'success': True, 'coordinates': coordinates})
-    except Exception as e:
-        return jsonify({'error': str(e)}), 500
-
-
-@app.route('/send_coordinate', methods=['POST'])
-def send_coordinate():
-    """Send a single (theta, rho) coordinate to the Arduino."""
-    global ser
-    if ser is None or not ser.is_open:
-        return jsonify({"success": False, "error": "Serial connection not established"}), 400
-
-    try:
-        data = request.json
-        theta = data.get('theta')
-        rho = data.get('rho')
-
-        if theta is None or rho is None:
-            return jsonify({"success": False, "error": "Theta and Rho are required"}), 400
-
-        # Send the coordinate to the Arduino
-        send_coordinate_batch(ser, [(theta, rho)])
-        return jsonify({"success": True})
-    except Exception as e:
-        return jsonify({"success": False, "error": str(e)}), 500
-
-# Expose files for download if needed
-@app.route('/download/<filename>', methods=['GET'])
-def download_file(filename):
-    """Download a file from the theta-rho directory."""
-    return send_from_directory(THETA_RHO_DIR, filename)
-
-@app.route('/serial_status', methods=['GET'])
-def serial_status():
-    global ser, ser_port
-    return jsonify({
-        'connected': ser.is_open if ser else False,
-        'port': ser_port  # Include the port name
-    })
-
-@app.route('/pause_execution', methods=['POST'])
-def pause_execution():
-    """Pause the current execution."""
-    global pause_requested
-    with pause_condition:
-        pause_requested = True
-    return jsonify({'success': True, 'message': 'Execution paused'})
-
-@app.route('/status', methods=['GET'])
-def get_status():
-    """Returns the current status of the sand table."""
-    global is_clearing
-    if current_playing_file in CLEAR_PATTERNS.values():
-        is_clearing = True
-    else:
-        is_clearing = False
-
-    return jsonify({
-        "ser_port": ser_port,
-        "stop_requested": stop_requested,
-        "pause_requested": pause_requested,
-        "current_playing_file": current_playing_file,
-        "execution_progress": execution_progress,
-        "current_playing_index": current_playing_index,
-        "current_playlist": current_playlist,
-        "is_clearing": is_clearing
-    })
-
-@app.route('/resume_execution', methods=['POST'])
-def resume_execution():
-    """Resume execution after pausing."""
-    global pause_requested
-    with pause_condition:
-        pause_requested = False
-        pause_condition.notify_all()  # Unblock the waiting thread
-    return jsonify({'success': True, 'message': 'Execution resumed'})
-
-def load_playlists():
-    """
-    Load the entire playlists dictionary from the JSON file.
-    Returns something like: {
-        "My Playlist": ["file1.thr", "file2.thr"],
-        "Another": ["x.thr"]
-    }
-    """
-    with open(PLAYLISTS_FILE, "r") as f:
-        return json.load(f)
-
-def save_playlists(playlists_dict):
-    """
-    Save the entire playlists dictionary back to the JSON file.
-    """
-    with open(PLAYLISTS_FILE, "w") as f:
-        json.dump(playlists_dict, f, indent=2)
-
-@app.route("/list_all_playlists", methods=["GET"])
-def list_all_playlists():
-    """
-    Returns a list of all playlist names.
-    Example return: ["My Playlist", "Another Playlist"]
-    """
-    playlists_dict = load_playlists()
-    playlist_names = list(playlists_dict.keys())
-    return jsonify(playlist_names)
-
-@app.route("/get_playlist", methods=["GET"])
-def get_playlist():
-    """
-    GET /get_playlist?name=My%20Playlist
-    Returns: { "name": "My Playlist", "files": [... ] }
-    """
-    playlist_name = request.args.get("name", "")
-    if not playlist_name:
-        return jsonify({"error": "Missing playlist 'name' parameter"}), 400
-
-    playlists_dict = load_playlists()
-    if playlist_name not in playlists_dict:
-        return jsonify({"error": f"Playlist '{playlist_name}' not found"}), 404
-
-    files = playlists_dict[playlist_name]  # e.g. ["file1.thr", "file2.thr"]
-    return jsonify({
-        "name": playlist_name,
-        "files": files
-    })
-
-@app.route("/create_playlist", methods=["POST"])
-def create_playlist():
-    """
-    POST /create_playlist
-    Body: { "name": "My Playlist", "files": ["file1.thr", "file2.thr"] }
-    Creates or overwrites a playlist with the given name.
-    """
-    data = request.get_json()
-    if not data or "name" not in data or "files" not in data:
-        return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
-
-    playlist_name = data["name"]
-    files = data["files"]
-
-    # Load all playlists
-    playlists_dict = load_playlists()
-
-    # Overwrite or create new
-    playlists_dict[playlist_name] = files
-
-    # Save changes
-    save_playlists(playlists_dict)
-
-    return jsonify({
-        "success": True,
-        "message": f"Playlist '{playlist_name}' created/updated"
-    })
-
-@app.route("/modify_playlist", methods=["POST"])
-def modify_playlist():
-    """
-    POST /modify_playlist
-    Body: { "name": "My Playlist", "files": ["file1.thr", "file2.thr"] }
-    Updates (or creates) the existing playlist with a new file list.
-    You can 404 if you only want to allow modifications to existing playlists.
-    """
-    data = request.get_json()
-    if not data or "name" not in data or "files" not in data:
-        return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
-
-    playlist_name = data["name"]
-    files = data["files"]
-
-    # Load all playlists
-    playlists_dict = load_playlists()
-
-    # Optional: If you want to disallow creating a new playlist here:
-    # if playlist_name not in playlists_dict:
-    #     return jsonify({"success": False, "error": f"Playlist '{playlist_name}' not found"}), 404
-
-    # Overwrite or create new
-    playlists_dict[playlist_name] = files
-
-    # Save
-    save_playlists(playlists_dict)
-
-    return jsonify({"success": True, "message": f"Playlist '{playlist_name}' updated"})
-
-@app.route("/delete_playlist", methods=["DELETE"])
-def delete_playlist():
-    """
-    DELETE /delete_playlist
-    Body: { "name": "My Playlist" }
-    Removes the playlist from the single JSON file.
-    """
-    data = request.get_json()
-    if not data or "name" not in data:
-        return jsonify({"success": False, "error": "Missing 'name' field"}), 400
-
-    playlist_name = data["name"]
-
-    playlists_dict = load_playlists()
-    if playlist_name not in playlists_dict:
-        return jsonify({"success": False, "error": f"Playlist '{playlist_name}' not found"}), 404
-
-    # Remove from dict
-    del playlists_dict[playlist_name]
-    save_playlists(playlists_dict)
-
-    return jsonify({
-        "success": True,
-        "message": f"Playlist '{playlist_name}' deleted"
-    })
-
-@app.route('/add_to_playlist', methods=['POST'])
-def add_to_playlist():
-    data = request.json
-    playlist_name = data.get('playlist_name')
-    pattern = data.get('pattern')
-
-    # Load existing playlists
-    with open('playlists.json', 'r') as f:
-        playlists = json.load(f)
-
-    # Add pattern to the selected playlist
-    if playlist_name in playlists:
-        playlists[playlist_name].append(pattern)
-        with open('playlists.json', 'w') as f:
-            json.dump(playlists, f)
-        return jsonify(success=True)
-    else:
-        return jsonify(success=False, error='Playlist not found'), 404
-
-@app.route("/run_playlist", methods=["POST"])
-def run_playlist():
-    """
-    POST /run_playlist
-    Body (JSON):
-    {
-        "playlist_name": "My Playlist",
-        "pause_time": 1.0,                # Optional: seconds to pause between patterns
-        "clear_pattern": "random",         # Optional: "clear_from_in", "clear_from_out", "clear_sideway", "adaptive" or "random"
-        "run_mode": "single",              # 'single' or 'indefinite'
-        "shuffle": True                    # true or false
-        "start_time": ""
-        "end_time": ""
-    }
-    """
-    data = request.get_json()
-
-    # Validate input
-    if not data or "playlist_name" not in data:
-        return jsonify({"success": False, "error": "Missing 'playlist_name' field"}), 400
-
-    playlist_name = data["playlist_name"]
-    pause_time = data.get("pause_time", 0)
-    clear_pattern = data.get("clear_pattern", None)
-    run_mode = data.get("run_mode", "single")  # Default to 'single' run
-    shuffle = data.get("shuffle", False)       # Default to no shuffle
-    start_time = data.get("start_time", None)
-    end_time = data.get("end_time", None)
-
-    # Validate pause_time
-    if not isinstance(pause_time, (int, float)) or pause_time < 0:
-        return jsonify({"success": False, "error": "'pause_time' must be a non-negative number"}), 400
-
-    # Validate clear_pattern
-    valid_patterns = ["clear_from_in", "clear_from_out", "clear_sideway", "random", "adaptive"]
-    if clear_pattern not in valid_patterns:
-        clear_pattern = None
-
-    # Validate run_mode
-    if run_mode not in ["single", "indefinite"]:
-        return jsonify({"success": False, "error": "'run_mode' must be 'single' or 'indefinite'"}), 400
-
-    # Validate shuffle
-    if not isinstance(shuffle, bool):
-        return jsonify({"success": False, "error": "'shuffle' must be a boolean value"}), 400
-
-    schedule_hours = None
-    if start_time and end_time:
-        try:
-            # Convert HH:MM to datetime.time objects
-            start_time_obj = datetime.strptime(start_time, "%H:%M").time()
-            end_time_obj = datetime.strptime(end_time, "%H:%M").time()
-
-            # Ensure start_time is before end_time
-            if start_time_obj >= end_time_obj:
-                return jsonify({"success": False, "error": "'start_time' must be earlier than 'end_time'"}), 400
-
-            # Create schedule tuple with full time
-            schedule_hours = (start_time_obj, end_time_obj)
-        except ValueError:
-            return jsonify({"success": False, "error": "Invalid time format. Use HH:MM (e.g., '09:30')"}), 400
-
-
-    # Load playlists
-    playlists = load_playlists()
-
-    if playlist_name not in playlists:
-        return jsonify({"success": False, "error": f"Playlist '{playlist_name}' not found"}), 404
-
-    file_paths = playlists[playlist_name]
-    file_paths = [os.path.join(THETA_RHO_DIR, file) for file in file_paths]
-
-    if not file_paths:
-        return jsonify({"success": False, "error": f"Playlist '{playlist_name}' is empty"}), 400
-
-    # Start the playlist execution in a separate thread
-    try:
-        threading.Thread(
-            target=run_theta_rho_files,
-            args=(file_paths,),
-            kwargs={
-                'pause_time': pause_time,
-                'clear_pattern': clear_pattern,
-                'run_mode': run_mode,
-                'shuffle': shuffle,
-                'schedule_hours': schedule_hours
-            },
-            daemon=True  # Daemonize thread to exit with the main program
-        ).start()
-        return jsonify({"success": True, "message": f"Playlist '{playlist_name}' is now running."})
-    except Exception as e:
-        return jsonify({"success": False, "error": str(e)}), 500
-
-@app.route('/set_speed', methods=['POST'])
-def set_speed():
-    """Set the speed for the Arduino."""
-    global ser
-    if ser is None or not ser.is_open:
-        return jsonify({"success": False, "error": "Serial connection not established"}), 400
-
-    try:
-        # Parse the speed value from the request
-        data = request.json
-        speed = data.get('speed')
-
-        if speed is None:
-            return jsonify({"success": False, "error": "Speed is required"}), 400
-
-        if not isinstance(speed, (int, float)) or speed <= 0:
-            return jsonify({"success": False, "error": "Invalid speed value"}), 400
-
-        # Send the SET_SPEED command to the Arduino
-        command = f"SET_SPEED {speed}"
-        send_command(command)
-        return jsonify({"success": True, "speed": speed})
-    except Exception as e:
-        return jsonify({"success": False, "error": str(e)}), 500
-
-@app.route('/get_firmware_info', methods=['GET', 'POST'])
-def get_firmware_info():
-    """
-    Compare the installed firmware version and motor type with the one in the .ino file.
-    """
-    global firmware_version, arduino_driver_type, ser
-
-    if ser is None or not ser.is_open:
-        return jsonify({"success": False, "error": "Arduino not connected or serial port not open"}), 400
-
-    try:
-        if request.method == "GET":
-            # Attempt to retrieve installed firmware details from the Arduino
-            time.sleep(0.5)
-
-            installed_version = firmware_version
-            installed_type = arduino_driver_type
-
-            # If Arduino provides valid details, proceed with comparison
-            if installed_version != 'Unknown' and installed_type != 'Unknown':
-                ino_path = MOTOR_TYPE_MAPPING.get(installed_type)
-                firmware_details = get_ino_firmware_details(ino_path)
-
-                if not firmware_details or not firmware_details.get("version") or not firmware_details.get("motorType"):
-                    return jsonify({"success": False, "error": "Failed to retrieve .ino firmware details"}), 500
-
-                update_available = (
-                    installed_version != firmware_details["version"] or
-                    installed_type != firmware_details["motorType"]
-                )
-
-                return jsonify({
-                    "success": True,
-                    "installedVersion": installed_version,
-                    "installedType": installed_type,
-                    "inoVersion": firmware_details["version"],
-                    "inoType": firmware_details["motorType"],
-                    "updateAvailable": update_available
-                })
-
-            # If Arduino details are unknown, indicate the need for POST
-            return jsonify({
-                "success": True,
-                "installedVersion": installed_version,
-                "installedType": installed_type,
-                "updateAvailable": False
-            })
-
-        elif request.method == "POST":
-            motor_type = request.json.get("motorType", None)
-            if not motor_type or motor_type not in MOTOR_TYPE_MAPPING:
-                return jsonify({
-                    "success": False,
-                    "error": "Invalid or missing motor type"
-                }), 400
-
-            # Fetch firmware details for the given motor type
-            ino_path = MOTOR_TYPE_MAPPING[motor_type]
-            firmware_details = get_ino_firmware_details(ino_path)
-
-            if not firmware_details:
-                return jsonify({
-                    "success": False,
-                    "error": "Failed to retrieve .ino firmware details"
-                }), 500
-
-            return jsonify({
-                "success": True,
-                "installedVersion": 'Unknown',
-                "installedType": motor_type,
-                "inoVersion": firmware_details["version"],
-                "inoType": firmware_details["motorType"],
-                "updateAvailable": True
-            })
-
-    except Exception as e:
-        return jsonify({"success": False, "error": str(e)}), 500
-
-@app.route('/flash_firmware', methods=['POST'])
-def flash_firmware():
-    """
-    Flash the pre-compiled firmware to the connected device (Arduino or ESP32).
-    """
-    global ser_port
-
-    # Ensure the device is connected
-    if ser_port is None or ser is None or not ser.is_open:
-        return jsonify({"success": False, "error": "No device connected or connection lost"}), 400
-
-    try:
-        data = request.json
-        motor_type = data.get("motorType", None)
-
-        # Validate motor type
-        if not motor_type or motor_type not in MOTOR_TYPE_MAPPING:
-            return jsonify({"success": False, "error": "Invalid or missing motor type"}), 400
-
-        # Determine the firmware file
-        ino_file_path = MOTOR_TYPE_MAPPING[motor_type]  # Path to .ino file
-        hex_file_path = f"{ino_file_path}.hex"
-        bin_file_path = f"{ino_file_path}.bin"  # For ESP32 firmware
-
-        # Check the device type
-        if motor_type.lower() in ["esp32", "esp32_tmc2209"]:
-            if not os.path.exists(bin_file_path):
-                return jsonify({"success": False, "error": f"Firmware binary not found: {bin_file_path}"}), 404
-
-            # Flash ESP32 firmware
-            flash_command = [
-                "esptool.py",
-                "--chip", "esp32",
-                "--port", ser_port,
-                "--baud", "115200",
-                "write_flash", "-z", "0x1000", bin_file_path
-            ]
-        else:
-            if not os.path.exists(hex_file_path):
-                return jsonify({"success": False, "error": f"Hex file not found: {hex_file_path}"}), 404
-
-            # Flash Arduino firmware
-            flash_command = [
-                "avrdude",
-                "-v",
-                "-c", "arduino",
-                "-p", "atmega328p",
-                "-P", ser_port,
-                "-b", "115200",
-                "-D",
-                "-U", f"flash:w:{hex_file_path}:i"
-            ]
-
-        # Execute the flash command
-        flash_process = subprocess.run(flash_command, capture_output=True, text=True)
-        if flash_process.returncode != 0:
-            return jsonify({
-                "success": False,
-                "error": flash_process.stderr
-            }), 500
-
-        return jsonify({"success": True, "message": "Firmware flashed successfully"})
-    except Exception as e:
-        return jsonify({"success": False, "error": str(e)}), 500
-
-@app.route('/check_software_update', methods=['GET'])
-def check_updates():
-    update_info = check_git_updates()
-    return jsonify(update_info)
-
-@app.route('/update_software', methods=['POST'])
-def update_software():
-    error_log = []
-
-    def run_command(command, error_message):
-        try:
-            subprocess.run(command, check=True)
-        except subprocess.CalledProcessError as e:
-            print(f"{error_message}: {e}")
-            error_log.append(error_message)
-
-    # Fetch the latest version tag from remote
-    try:
-        subprocess.run(["git", "fetch", "--tags"], check=True)
-        latest_remote_tag = subprocess.check_output(
-            ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
-        ).strip().decode()
-    except subprocess.CalledProcessError as e:
-        error_log.append(f"Failed to fetch tags or get latest remote tag: {e}")
-        return jsonify({
-            "success": False,
-            "error": "Failed to fetch tags or determine the latest version.",
-            "details": error_log
-        }), 500
-
-    # Checkout the latest tag
-    run_command(["git", "checkout", latest_remote_tag, '--force'], f"Failed to checkout version {latest_remote_tag}")
-
-    run_command(["docker", "compose", "pull"], "Failed to fetch Docker containers")
-
-    # Restart Docker containers
-    run_command(["docker", "compose", "up", "-d"], "Failed to restart Docker containers")
-
-    # Check if the update was successful
-    update_status = check_git_updates()
-
-    if (
-        update_status["updates_available"] is False
-        and update_status["latest_local_tag"] == update_status["latest_remote_tag"]
-    ):
-        # Update was successful
-        return jsonify({"success": True})
-    else:
-        # Update failed; include the errors in the response
-        return jsonify({
-            "success": False,
-            "error": "Update incomplete",
-            "details": error_log
-        }), 500
-
-
-def on_exit():
-    """Function to execute on application shutdown."""
-    print("Shutting down the application...")
-    stop_actions()
-    time.sleep(5)
-    print("Execution stopped and resources cleaned up.")
-
-# Register the on_exit function
-atexit.register(on_exit)
-        
-        
-if __name__ == '__main__':
-    # Auto-connect to serial
-    connect_to_serial()
-    try:
-        app.run(debug=False, host='0.0.0.0', port=8080)
-    except KeyboardInterrupt:
-        print("Keyboard interrupt received. Shutting down.")
-    finally:
-        on_exit()  # Ensure cleanup if app is interrupted
+app.entrypoint()

+ 0 - 0
dune_weaver_flask/__init__.py


+ 407 - 0
dune_weaver_flask/app.py

@@ -0,0 +1,407 @@
+from flask import Flask, request, jsonify, render_template, send_from_directory
+import atexit
+import os
+from datetime import datetime
+from .dune_weaver.serial.manager import serial_manager
+from .dune_weaver.core.pattern_manager.manager import pattern_manager
+from .dune_weaver.core.playlist_manager.manager import playlist_manager
+from .dune_weaver.firmware.manager import firmware_manager
+
+app = Flask(__name__)
+
+# Flask API Endpoints
+@app.route('/')
+def index():
+    return render_template('index.html')
+
+@app.route('/list_serial_ports', methods=['GET'])
+def list_ports():
+    return jsonify(serial_manager.list_serial_ports())
+
+@app.route('/connect_serial', methods=['POST'])
+def connect_serial():
+    port = request.json.get('port')
+    if not port:
+        return jsonify({'error': 'No port provided'}), 400
+
+    try:
+        serial_manager.connect_to_serial(port)
+        return jsonify({'success': True})
+    except Exception as e:
+        return jsonify({'error': str(e)}), 500
+
+@app.route('/disconnect_serial', methods=['POST'])
+def disconnect():
+    try:
+        serial_manager.disconnect_serial()
+        return jsonify({'success': True})
+    except Exception as e:
+        return jsonify({'error': str(e)}), 500
+
+@app.route('/restart_serial', methods=['POST'])
+def restart():
+    port = request.json.get('port')
+    if not port:
+        return jsonify({'error': 'No port provided'}), 400
+
+    try:
+        serial_manager.restart_serial(port)
+        return jsonify({'success': True})
+    except Exception as e:
+        return jsonify({'error': str(e)}), 500
+
+@app.route('/list_theta_rho_files', methods=['GET'])
+def list_theta_rho_files():
+    files = []
+    for root, _, filenames in os.walk(pattern_manager.THETA_RHO_DIR):
+        for file in filenames:
+            relative_path = os.path.relpath(os.path.join(root, file), pattern_manager.THETA_RHO_DIR)
+            files.append(relative_path)
+    return jsonify(sorted(files))
+
+@app.route('/upload_theta_rho', methods=['POST'])
+def upload_theta_rho():
+    custom_patterns_dir = os.path.join(pattern_manager.THETA_RHO_DIR, 'custom_patterns')
+    os.makedirs(custom_patterns_dir, exist_ok=True)
+
+    file = request.files['file']
+    if file:
+        file.save(os.path.join(custom_patterns_dir, file.filename))
+        return jsonify({'success': True})
+    return jsonify({'success': False})
+
+@app.route('/run_theta_rho', methods=['POST'])
+def run_theta_rho():
+    file_name = request.json.get('file_name')
+    pre_execution = request.json.get('pre_execution')
+
+    if not file_name:
+        return jsonify({'error': 'No file name provided'}), 400
+
+    file_path = os.path.join(pattern_manager.THETA_RHO_DIR, file_name)
+    if not os.path.exists(file_path):
+        return jsonify({'error': 'File not found'}), 404
+
+    try:
+        files_to_run = [file_path]
+        pattern_manager.run_theta_rho_files(files_to_run, clear_pattern=pre_execution)
+        return jsonify({'success': True})
+    except Exception as e:
+        return jsonify({'error': str(e)}), 500
+
+@app.route('/stop_execution', methods=['POST'])
+def stop_execution():
+    pattern_manager.stop_actions()
+    return jsonify({'success': True})
+
+@app.route('/send_home', methods=['POST'])
+def send_home():
+    try:
+        serial_manager.send_command("HOME")
+        return jsonify({'success': True})
+    except Exception as e:
+        return jsonify({'error': str(e)}), 500
+
+@app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
+def run_specific_theta_rho_file(file_name):
+    file_path = os.path.join(pattern_manager.THETA_RHO_DIR, file_name)
+    if not os.path.exists(file_path):
+        return jsonify({'error': 'File not found'}), 404
+
+    pattern_manager.run_theta_rho_file(file_path)
+    return jsonify({'success': True})
+
+@app.route('/delete_theta_rho_file', methods=['POST'])
+def delete_theta_rho_file():
+    file_name = request.json.get('file_name')
+    if not file_name:
+        return jsonify({"success": False, "error": "No file name provided"}), 400
+
+    file_path = os.path.join(pattern_manager.THETA_RHO_DIR, file_name)
+    if not os.path.exists(file_path):
+        return jsonify({"success": False, "error": "File not found"}), 404
+
+    try:
+        os.remove(file_path)
+        return jsonify({"success": True})
+    except Exception as e:
+        return jsonify({"success": False, "error": str(e)}), 500
+
+@app.route('/move_to_center', methods=['POST'])
+def move_to_center():
+    try:
+        if not serial_manager.is_connected():
+            return jsonify({"success": False, "error": "Serial connection not established"}), 400
+
+        coordinates = [(0, 0)]
+        serial_manager.send_coordinate_batch(coordinates)
+        return jsonify({"success": True})
+    except Exception as e:
+        return jsonify({"success": False, "error": str(e)}), 500
+
+@app.route('/move_to_perimeter', methods=['POST'])
+def move_to_perimeter():
+    try:
+        if not serial_manager.is_connected():
+            return jsonify({"success": False, "error": "Serial connection not established"}), 400
+
+        MAX_RHO = 1
+        coordinates = [(0, MAX_RHO)]
+        serial_manager.send_coordinate_batch(coordinates)
+        return jsonify({"success": True})
+    except Exception as e:
+        return jsonify({"success": False, "error": str(e)}), 500
+
+@app.route('/preview_thr', methods=['POST'])
+def preview_thr():
+    file_name = request.json.get('file_name')
+    if not file_name:
+        return jsonify({'error': 'No file name provided'}), 400
+
+    file_path = os.path.join(pattern_manager.THETA_RHO_DIR, file_name)
+    if not os.path.exists(file_path):
+        return jsonify({'error': 'File not found'}), 404
+
+    try:
+        coordinates = pattern_manager.parse_theta_rho_file(file_path)
+        return jsonify({'success': True, 'coordinates': coordinates})
+    except Exception as e:
+        return jsonify({'error': str(e)}), 500
+
+@app.route('/send_coordinate', methods=['POST'])
+def send_coordinate():
+    if not serial_manager.is_connected():
+        return jsonify({"success": False, "error": "Serial connection not established"}), 400
+
+    try:
+        data = request.json
+        theta = data.get('theta')
+        rho = data.get('rho')
+
+        if theta is None or rho is None:
+            return jsonify({"success": False, "error": "Theta and Rho are required"}), 400
+
+        serial_manager.send_coordinate_batch([(theta, rho)])
+        return jsonify({"success": True})
+    except Exception as e:
+        return jsonify({"success": False, "error": str(e)}), 500
+
+@app.route('/download/<filename>', methods=['GET'])
+def download_file(filename):
+    return send_from_directory(pattern_manager.THETA_RHO_DIR, filename)
+
+@app.route('/serial_status', methods=['GET'])
+def serial_status():
+    return jsonify({
+        'connected': serial_manager.is_connected(),
+        'port': serial_manager.get_port()
+    })
+
+@app.route('/pause_execution', methods=['POST'])
+def pause_execution():
+    pattern_manager.pause_requested = True
+    return jsonify({'success': True, 'message': 'Execution paused'})
+
+@app.route('/status', methods=['GET'])
+def get_status():
+    return jsonify(pattern_manager.get_status())
+
+@app.route('/resume_execution', methods=['POST'])
+def resume_execution():
+    with pattern_manager.pause_condition:
+        pattern_manager.pause_requested = False
+        pattern_manager.pause_condition.notify_all()
+    return jsonify({'success': True, 'message': 'Execution resumed'})
+
+# Playlist endpoints
+@app.route("/list_all_playlists", methods=["GET"])
+def list_all_playlists():
+    playlist_names = playlist_manager.list_all_playlists()
+    return jsonify(playlist_names)
+
+@app.route("/get_playlist", methods=["GET"])
+def get_playlist():
+    playlist_name = request.args.get("name", "")
+    if not playlist_name:
+        return jsonify({"error": "Missing playlist 'name' parameter"}), 400
+
+    playlist = playlist_manager.get_playlist(playlist_name)
+    if not playlist:
+        return jsonify({"error": f"Playlist '{playlist_name}' not found"}), 404
+
+    return jsonify(playlist)
+
+@app.route("/create_playlist", methods=["POST"])
+def create_playlist():
+    data = request.get_json()
+    if not data or "name" not in data or "files" not in data:
+        return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
+
+    success = playlist_manager.create_playlist(data["name"], data["files"])
+    return jsonify({
+        "success": success,
+        "message": f"Playlist '{data['name']}' created/updated"
+    })
+
+@app.route("/modify_playlist", methods=["POST"])
+def modify_playlist():
+    data = request.get_json()
+    if not data or "name" not in data or "files" not in data:
+        return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
+
+    success = playlist_manager.modify_playlist(data["name"], data["files"])
+    return jsonify({"success": success, "message": f"Playlist '{data['name']}' updated"})
+
+@app.route("/delete_playlist", methods=["DELETE"])
+def delete_playlist():
+    data = request.get_json()
+    if not data or "name" not in data:
+        return jsonify({"success": False, "error": "Missing 'name' field"}), 400
+
+    success = playlist_manager.delete_playlist(data["name"])
+    if not success:
+        return jsonify({"success": False, "error": f"Playlist '{data['name']}' not found"}), 404
+
+    return jsonify({
+        "success": True,
+        "message": f"Playlist '{data['name']}' deleted"
+    })
+
+@app.route('/add_to_playlist', methods=['POST'])
+def add_to_playlist():
+    data = request.json
+    playlist_name = data.get('playlist_name')
+    pattern = data.get('pattern')
+
+    success = playlist_manager.add_to_playlist(playlist_name, pattern)
+    if not success:
+        return jsonify(success=False, error='Playlist not found'), 404
+    return jsonify(success=True)
+
+@app.route("/run_playlist", methods=["POST"])
+def run_playlist():
+    data = request.get_json()
+    if not data or "playlist_name" not in data:
+        return jsonify({"success": False, "error": "Missing 'playlist_name' field"}), 400
+
+    playlist_name = data["playlist_name"]
+    pause_time = data.get("pause_time", 0)
+    clear_pattern = data.get("clear_pattern", None)
+    run_mode = data.get("run_mode", "single")
+    shuffle = data.get("shuffle", False)
+    
+    schedule_hours = None
+    start_time = data.get("start_time")
+    end_time = data.get("end_time")
+    
+    if start_time and end_time:
+        try:
+            start_time_obj = datetime.strptime(start_time, "%H:%M").time()
+            end_time_obj = datetime.strptime(end_time, "%H:%M").time()
+            if start_time_obj >= end_time_obj:
+                return jsonify({"success": False, "error": "'start_time' must be earlier than 'end_time'"}), 400
+            schedule_hours = (start_time_obj, end_time_obj)
+        except ValueError:
+            return jsonify({"success": False, "error": "Invalid time format. Use HH:MM (e.g., '09:30')"}), 400
+
+    success, message = playlist_manager.run_playlist(
+        playlist_name,
+        pause_time=pause_time,
+        clear_pattern=clear_pattern,
+        run_mode=run_mode,
+        shuffle=shuffle,
+        schedule_hours=schedule_hours
+    )
+
+    if not success:
+        return jsonify({"success": False, "error": message}), 500
+    return jsonify({"success": True, "message": message})
+
+# Firmware endpoints
+@app.route('/set_speed', methods=['POST'])
+def set_speed():
+    if not serial_manager.is_connected():
+        return jsonify({"success": False, "error": "Serial connection not established"}), 400
+
+    try:
+        data = request.json
+        speed = data.get('speed')
+
+        if speed is None:
+            return jsonify({"success": False, "error": "Speed is required"}), 400
+
+        if not isinstance(speed, (int, float)) or speed <= 0:
+            return jsonify({"success": False, "error": "Invalid speed value"}), 400
+
+        serial_manager.send_command(f"SET_SPEED {speed}")
+        return jsonify({"success": True, "speed": speed})
+    except Exception as e:
+        return jsonify({"success": False, "error": str(e)}), 500
+
+@app.route('/get_firmware_info', methods=['GET', 'POST'])
+def get_firmware_info():
+    if not serial_manager.is_connected():
+        return jsonify({"success": False, "error": "Arduino not connected or serial port not open"}), 400
+
+    try:
+        if request.method == "POST":
+            motor_type = request.json.get("motorType", None)
+            success, result = firmware_manager.get_firmware_info(motor_type)
+        else:
+            success, result = firmware_manager.get_firmware_info()
+
+        if not success:
+            return jsonify({"success": False, "error": result}), 500
+        return jsonify({"success": True, **result})
+
+    except Exception as e:
+        return jsonify({"success": False, "error": str(e)}), 500
+
+@app.route('/flash_firmware', methods=['POST'])
+def flash_firmware():
+    try:
+        motor_type = request.json.get("motorType", None)
+        success, message = firmware_manager.flash_firmware(motor_type)
+        
+        if not success:
+            return jsonify({"success": False, "error": message}), 500
+        return jsonify({"success": True, "message": message})
+    except Exception as e:
+        return jsonify({"success": False, "error": str(e)}), 500
+
+@app.route('/check_software_update', methods=['GET'])
+def check_updates():
+    update_info = firmware_manager.check_git_updates()
+    return jsonify(update_info)
+
+@app.route('/update_software', methods=['POST'])
+def update_software():
+    success, error_message, error_log = firmware_manager.update_software()
+    
+    if success:
+        return jsonify({"success": True})
+    else:
+        return jsonify({
+            "success": False,
+            "error": error_message,
+            "details": error_log
+        }), 500
+
+def on_exit():
+    """Function to execute on application shutdown."""
+    print("Shutting down the application...")
+    pattern_manager.stop_actions()
+    print("Execution stopped and resources cleaned up.")
+
+# Register the on_exit function
+atexit.register(on_exit)
+
+def entrypoint():
+    # Auto-connect to serial
+    serial_manager.connect_to_serial()
+    try:
+        app.run(debug=False, host='0.0.0.0', port=8080)
+    except KeyboardInterrupt:
+        print("Keyboard interrupt received. Shutting down.")
+    finally:
+        on_exit()

+ 0 - 0
dune_weaver_flask/dune_weaver/__init__.py


+ 0 - 0
dune_weaver_flask/dune_weaver/core/__init__.py


+ 0 - 0
dune_weaver_flask/dune_weaver/core/pattern_manager/__init__.py


+ 279 - 0
dune_weaver_flask/dune_weaver/core/pattern_manager/manager.py

@@ -0,0 +1,279 @@
+import os
+import threading
+import time
+import random
+from datetime import datetime
+from tqdm import tqdm
+from ...serial.manager import serial_manager
+
+class PatternManager:
+    def __init__(self):
+        self.THETA_RHO_DIR = './patterns'
+        self.CLEAR_PATTERNS = {
+            "clear_from_in":  "./patterns/clear_from_in.thr",
+            "clear_from_out": "./patterns/clear_from_out.thr",
+            "clear_sideway":  "./patterns/clear_sideway.thr"
+        }
+        os.makedirs(self.THETA_RHO_DIR, exist_ok=True)
+
+        # Execution state
+        self.stop_requested = False
+        self.pause_requested = False
+        self.pause_condition = threading.Condition()
+        self.current_playing_file = None
+        self.execution_progress = None
+        self.current_playing_index = None
+        self.current_playlist = None
+        self.is_clearing = False
+
+    def parse_theta_rho_file(self, file_path):
+        """Parse a theta-rho file and return a list of (theta, rho) pairs."""
+        coordinates = []
+        try:
+            with open(file_path, 'r') as file:
+                for line in file:
+                    line = line.strip()
+                    if not line or line.startswith("#"):
+                        continue
+                    try:
+                        theta, rho = map(float, line.split())
+                        coordinates.append((theta, rho))
+                    except ValueError:
+                        print(f"Skipping invalid line: {line}")
+                        continue
+        except Exception as e:
+            print(f"Error reading file: {e}")
+            return coordinates
+
+        # Normalization Step
+        if coordinates:
+            first_theta = coordinates[0][0]
+            normalized = [(theta - first_theta, rho) for theta, rho in coordinates]
+            coordinates = normalized
+
+        return coordinates
+
+    def get_clear_pattern_file(self, clear_pattern_mode, path=None):
+        """Return a .thr file path based on pattern_name."""
+        if not clear_pattern_mode or clear_pattern_mode == 'none':
+            return
+        print("Clear pattern mode: " + clear_pattern_mode)
+        if clear_pattern_mode == "random":
+            return random.choice(list(self.CLEAR_PATTERNS.values()))
+
+        if clear_pattern_mode == 'adaptive':
+            _, first_rho = self.parse_theta_rho_file(path)[0]
+            if first_rho < 0.5:
+                return self.CLEAR_PATTERNS['clear_from_out']
+            else:
+                return random.choice([self.CLEAR_PATTERNS['clear_from_in'], self.CLEAR_PATTERNS['clear_sideway']])
+        else:
+            return self.CLEAR_PATTERNS[clear_pattern_mode]
+
+    def schedule_checker(self, schedule_hours):
+        """Pauses/resumes execution based on a given time range."""
+        if not schedule_hours:
+            return
+
+        start_time, end_time = schedule_hours
+        now = datetime.now().time()
+
+        if start_time <= now < end_time:
+            if self.pause_requested:
+                print("Starting execution: Within schedule.")
+            self.pause_requested = False
+            with self.pause_condition:
+                self.pause_condition.notify_all()
+        else:
+            if not self.pause_requested:
+                print("Pausing execution: Outside schedule.")
+            self.pause_requested = True
+            threading.Thread(target=self.wait_for_start_time, args=(schedule_hours,), daemon=True).start()
+
+    def wait_for_start_time(self, schedule_hours):
+        """Keep checking every 30 seconds if the time is within the schedule to resume execution."""
+        start_time, end_time = schedule_hours
+
+        while self.pause_requested:
+            now = datetime.now().time()
+            if start_time <= now < end_time:
+                print("Resuming execution: Within schedule.")
+                self.pause_requested = False
+                with self.pause_condition:
+                    self.pause_condition.notify_all()
+                break
+            else:
+                time.sleep(30)
+
+    def run_theta_rho_file(self, file_path, schedule_hours=None):
+        """Run a theta-rho file by sending data in optimized batches with tqdm ETA tracking."""
+        coordinates = self.parse_theta_rho_file(file_path)
+        total_coordinates = len(coordinates)
+
+        if total_coordinates < 2:
+            print("Not enough coordinates for interpolation.")
+            self.current_playing_file = None
+            self.execution_progress = None
+            return
+
+        self.execution_progress = (0, total_coordinates, None)
+        batch_size = 10
+
+        self.stop_actions()
+        with serial_manager.serial_lock:
+            self.current_playing_file = file_path
+            self.execution_progress = (0, 0, None)
+            self.stop_requested = False
+            with tqdm(total=total_coordinates, unit="coords", desc=f"Executing Pattern {file_path}", dynamic_ncols=True, disable=None) as pbar:
+                for i in range(0, total_coordinates, batch_size):
+                    if self.stop_requested:
+                        print("Execution stopped by user after completing the current batch.")
+                        break
+
+                    with self.pause_condition:
+                        while self.pause_requested:
+                            print("Execution paused...")
+                            self.pause_condition.wait()
+
+                    batch = coordinates[i:i + batch_size]
+
+                    if i == 0:
+                        serial_manager.send_coordinate_batch(batch)
+                        self.execution_progress = (i + batch_size, total_coordinates, None)
+                        pbar.update(batch_size)
+                        continue
+
+                    while True:
+                        self.schedule_checker(schedule_hours)
+                        if serial_manager.ser.in_waiting > 0:
+                            response = serial_manager.ser.readline().decode().strip()
+                            if response == "R":
+                                serial_manager.send_coordinate_batch(batch)
+                                pbar.update(batch_size)
+                                estimated_remaining_time = pbar.format_dict['elapsed'] / (i + batch_size) * (total_coordinates - (i + batch_size))
+                                self.execution_progress = (i + batch_size, total_coordinates, estimated_remaining_time)
+                                break
+                            elif response != "IGNORED: FINISHED" and response.startswith("IGNORE"):
+                                print("Received IGNORE. Resending the previous batch...")
+                                print(response)
+                                prev_start = max(0, i - batch_size)
+                                prev_end = i
+                                previous_batch = coordinates[prev_start:prev_end]
+                                serial_manager.send_coordinate_batch(previous_batch)
+                                break
+                            else:
+                                print(f"Arduino response: {response}")
+
+            self.reset_theta()
+            serial_manager.ser.write("FINISHED\n".encode())
+
+        self.current_playing_file = None
+        self.execution_progress = None
+        print("Pattern execution completed.")
+
+    def run_theta_rho_files(self, file_paths, pause_time=0, clear_pattern=None, run_mode="single", shuffle=False, schedule_hours=None):
+        """Run multiple .thr files in sequence with options."""
+        self.stop_requested = False
+        
+        if shuffle:
+            random.shuffle(file_paths)
+            print("Playlist shuffled.")
+
+        self.current_playlist = file_paths
+
+        while True:
+            for idx, path in enumerate(file_paths):
+                print("Upcoming pattern: " + path)
+                self.current_playing_index = idx
+                self.schedule_checker(schedule_hours)
+                if self.stop_requested:
+                    print("Execution stopped before starting next pattern.")
+                    return
+
+                if clear_pattern:
+                    if self.stop_requested:
+                        print("Execution stopped before running the next clear pattern.")
+                        return
+
+                    clear_file_path = self.get_clear_pattern_file(clear_pattern, path)
+                    print(f"Running clear pattern: {clear_file_path}")
+                    self.run_theta_rho_file(clear_file_path, schedule_hours)
+
+                if not self.stop_requested:
+                    print(f"Running pattern {idx + 1} of {len(file_paths)}: {path}")
+                    self.run_theta_rho_file(path, schedule_hours)
+
+                if idx < len(file_paths) - 1:
+                    if self.stop_requested:
+                        print("Execution stopped before running the next clear pattern.")
+                        return
+                    if pause_time > 0:
+                        print(f"Pausing for {pause_time} seconds...")
+                        time.sleep(pause_time)
+
+            if run_mode == "indefinite":
+                print("Playlist completed. Restarting as per 'indefinite' run mode.")
+                if pause_time > 0:
+                    print(f"Pausing for {pause_time} seconds before restarting...")
+                    time.sleep(pause_time)
+                if shuffle:
+                    random.shuffle(file_paths)
+                    print("Playlist reshuffled for the next loop.")
+                continue
+            else:
+                print("Playlist completed.")
+                break
+
+        self.reset_theta()
+        with serial_manager.serial_lock:
+            serial_manager.ser.write("FINISHED\n".encode())
+            
+        print("All requested patterns completed (or stopped).")
+
+    def reset_theta(self):
+        """Reset theta on the Arduino."""
+        with serial_manager.serial_lock:
+            serial_manager.ser.write("RESET_THETA\n".encode())
+            while True:
+                with serial_manager.serial_lock:
+                    if serial_manager.ser.in_waiting > 0:
+                        response = serial_manager.ser.readline().decode().strip()
+                        print(f"Arduino response: {response}")
+                        if response == "THETA_RESET":
+                            print("Theta successfully reset.")
+                            break
+                time.sleep(0.5)
+
+    def stop_actions(self):
+        """Stop all current pattern execution."""
+        with self.pause_condition:
+            self.pause_requested = False
+            self.pause_condition.notify_all()
+            
+        self.stop_requested = True
+        self.current_playing_index = None
+        self.current_playlist = None
+        self.is_clearing = False
+        self.current_playing_file = None
+        self.execution_progress = None
+
+    def get_status(self):
+        """Get the current status of pattern execution."""
+        if self.current_playing_file in self.CLEAR_PATTERNS.values():
+            self.is_clearing = True
+        else:
+            self.is_clearing = False
+
+        return {
+            "ser_port": serial_manager.get_port(),
+            "stop_requested": self.stop_requested,
+            "pause_requested": self.pause_requested,
+            "current_playing_file": self.current_playing_file,
+            "execution_progress": self.execution_progress,
+            "current_playing_index": self.current_playing_index,
+            "current_playlist": self.current_playlist,
+            "is_clearing": self.is_clearing
+        }
+
+# Create a global instance
+pattern_manager = PatternManager()

+ 0 - 0
dune_weaver_flask/dune_weaver/core/playlist_manager/__init__.py


+ 99 - 0
dune_weaver_flask/dune_weaver/core/playlist_manager/manager.py

@@ -0,0 +1,99 @@
+import json
+import os
+import threading
+from ...core.pattern_manager.manager import pattern_manager
+
+class PlaylistManager:
+    def __init__(self):
+        self.PLAYLISTS_FILE = os.path.join(os.getcwd(), "playlists.json")
+        
+        # Ensure the file exists and contains at least an empty JSON object
+        if not os.path.exists(self.PLAYLISTS_FILE):
+            with open(self.PLAYLISTS_FILE, "w") as f:
+                json.dump({}, f, indent=2)
+
+    def load_playlists(self):
+        """Load the entire playlists dictionary from the JSON file."""
+        with open(self.PLAYLISTS_FILE, "r") as f:
+            return json.load(f)
+
+    def save_playlists(self, playlists_dict):
+        """Save the entire playlists dictionary back to the JSON file."""
+        with open(self.PLAYLISTS_FILE, "w") as f:
+            json.dump(playlists_dict, f, indent=2)
+
+    def list_all_playlists(self):
+        """Returns a list of all playlist names."""
+        playlists_dict = self.load_playlists()
+        return list(playlists_dict.keys())
+
+    def get_playlist(self, playlist_name):
+        """Get a specific playlist by name."""
+        playlists_dict = self.load_playlists()
+        if playlist_name not in playlists_dict:
+            return None
+        return {
+            "name": playlist_name,
+            "files": playlists_dict[playlist_name]
+        }
+
+    def create_playlist(self, playlist_name, files):
+        """Create or update a playlist."""
+        playlists_dict = self.load_playlists()
+        playlists_dict[playlist_name] = files
+        self.save_playlists(playlists_dict)
+        return True
+
+    def modify_playlist(self, playlist_name, files):
+        """Modify an existing playlist."""
+        return self.create_playlist(playlist_name, files)
+
+    def delete_playlist(self, playlist_name):
+        """Delete a playlist."""
+        playlists_dict = self.load_playlists()
+        if playlist_name not in playlists_dict:
+            return False
+        del playlists_dict[playlist_name]
+        self.save_playlists(playlists_dict)
+        return True
+
+    def add_to_playlist(self, playlist_name, pattern):
+        """Add a pattern to an existing playlist."""
+        playlists_dict = self.load_playlists()
+        if playlist_name not in playlists_dict:
+            return False
+        playlists_dict[playlist_name].append(pattern)
+        self.save_playlists(playlists_dict)
+        return True
+
+    def run_playlist(self, playlist_name, pause_time=0, clear_pattern=None, run_mode="single", shuffle=False, schedule_hours=None):
+        """Run a playlist with the given options."""
+        playlists = self.load_playlists()
+        if playlist_name not in playlists:
+            return False, "Playlist not found"
+
+        file_paths = playlists[playlist_name]
+        file_paths = [os.path.join(pattern_manager.THETA_RHO_DIR, file) for file in file_paths]
+
+        if not file_paths:
+            return False, "Playlist is empty"
+
+        try:
+            threading.Thread(
+                target=pattern_manager.run_theta_rho_files,
+                args=(file_paths,),
+                kwargs={
+                    'pause_time': pause_time,
+                    'clear_pattern': clear_pattern,
+                    'run_mode': run_mode,
+                    'shuffle': shuffle,
+                    'schedule_hours': schedule_hours
+                },
+                daemon=True
+            ).start()
+            return True, f"Playlist '{playlist_name}' is now running."
+        except Exception as e:
+            return False, str(e)
+
+# Create a global instance
+playlist_manager = PlaylistManager()

+ 0 - 0
dune_weaver_flask/dune_weaver/firmware/__init__.py


+ 226 - 0
dune_weaver_flask/dune_weaver/firmware/manager.py

@@ -0,0 +1,226 @@
+import os
+import subprocess
+from ..serial.manager import serial_manager
+
+class FirmwareManager:
+    def __init__(self):
+        self.MOTOR_TYPE_MAPPING = {
+            "TMC2209": "./firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino",
+            "DRV8825": "./firmware/arduino_code/arduino_code.ino",
+            "esp32": "./firmware/esp32/esp32.ino",
+            "esp32_TMC2209": "./firmware/esp32_TMC2209/esp32_TMC2209.ino"
+        }
+
+    def get_ino_firmware_details(self, ino_file_path):
+        """Extract firmware details from the given .ino file."""
+        try:
+            if not ino_file_path:
+                raise ValueError("Invalid path: ino_file_path is None or empty.")
+
+            firmware_details = {"version": None, "motorType": None}
+
+            with open(ino_file_path, "r") as file:
+                for line in file:
+                    if "firmwareVersion" in line:
+                        start = line.find('"') + 1
+                        end = line.rfind('"')
+                        if start != -1 and end != -1 and start < end:
+                            firmware_details["version"] = line[start:end]
+
+                    if "motorType" in line:
+                        start = line.find('"') + 1
+                        end = line.rfind('"')
+                        if start != -1 and end != -1 and start < end:
+                            firmware_details["motorType"] = line[start:end]
+
+            if not firmware_details["version"]:
+                print(f"Firmware version not found in file: {ino_file_path}")
+            if not firmware_details["motorType"]:
+                print(f"Motor type not found in file: {ino_file_path}")
+
+            return firmware_details if any(firmware_details.values()) else None
+
+        except FileNotFoundError:
+            print(f"File not found: {ino_file_path}")
+            return None
+        except Exception as e:
+            print(f"Error reading .ino file: {str(e)}")
+            return None
+
+    def check_git_updates(self):
+        """Check for available Git updates."""
+        try:
+            subprocess.run(["git", "fetch", "--tags", "--force"], check=True)
+            latest_remote_tag = subprocess.check_output(
+                ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
+            ).strip().decode()
+            latest_local_tag = subprocess.check_output(
+                ["git", "describe", "--tags", "--abbrev=0"]
+            ).strip().decode()
+
+            tag_behind_count = 0
+            if latest_local_tag != latest_remote_tag:
+                tags = subprocess.check_output(
+                    ["git", "tag", "--merged", "origin/main"], text=True
+                ).splitlines()
+
+                found_local = False
+                for tag in tags:
+                    if tag == latest_local_tag:
+                        found_local = True
+                    elif found_local:
+                        tag_behind_count += 1
+                        if tag == latest_remote_tag:
+                            break
+
+            updates_available = latest_remote_tag != latest_local_tag
+
+            return {
+                "updates_available": updates_available,
+                "tag_behind_count": tag_behind_count,
+                "latest_remote_tag": latest_remote_tag,
+                "latest_local_tag": latest_local_tag,
+            }
+        except subprocess.CalledProcessError as e:
+            print(f"Error checking Git updates: {e}")
+            return {
+                "updates_available": False,
+                "tag_behind_count": 0,
+                "latest_remote_tag": None,
+                "latest_local_tag": None,
+            }
+
+    def update_software(self):
+        """Update the software to the latest version."""
+        error_log = []
+
+        def run_command(command, error_message):
+            try:
+                subprocess.run(command, check=True)
+            except subprocess.CalledProcessError as e:
+                print(f"{error_message}: {e}")
+                error_log.append(error_message)
+
+        try:
+            subprocess.run(["git", "fetch", "--tags"], check=True)
+            latest_remote_tag = subprocess.check_output(
+                ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
+            ).strip().decode()
+        except subprocess.CalledProcessError as e:
+            error_log.append(f"Failed to fetch tags or get latest remote tag: {e}")
+            return False, "Failed to fetch tags or determine the latest version.", error_log
+
+        run_command(["git", "checkout", latest_remote_tag, '--force'], f"Failed to checkout version {latest_remote_tag}")
+        run_command(["docker", "compose", "pull"], "Failed to fetch Docker containers")
+        run_command(["docker", "compose", "up", "-d"], "Failed to restart Docker containers")
+
+        update_status = self.check_git_updates()
+
+        if (
+            update_status["updates_available"] is False
+            and update_status["latest_local_tag"] == update_status["latest_remote_tag"]
+        ):
+            return True, None, None
+        else:
+            return False, "Update incomplete", error_log
+
+    def get_firmware_info(self, motor_type=None):
+        """Get firmware information for the current or specified motor type."""
+        if motor_type and motor_type not in self.MOTOR_TYPE_MAPPING:
+            return False, "Invalid motor type"
+
+        installed_version = serial_manager.firmware_version
+        installed_type = serial_manager.arduino_driver_type
+
+        if motor_type:
+            # POST request with specified motor type
+            ino_path = self.MOTOR_TYPE_MAPPING[motor_type]
+            firmware_details = self.get_ino_firmware_details(ino_path)
+
+            if not firmware_details:
+                return False, "Failed to retrieve .ino firmware details"
+
+            return True, {
+                "installedVersion": 'Unknown',
+                "installedType": motor_type,
+                "inoVersion": firmware_details["version"],
+                "inoType": firmware_details["motorType"],
+                "updateAvailable": True
+            }
+        else:
+            # GET request for current firmware info
+            if installed_version != 'Unknown' and installed_type != 'Unknown':
+                ino_path = self.MOTOR_TYPE_MAPPING.get(installed_type)
+                firmware_details = self.get_ino_firmware_details(ino_path)
+
+                if not firmware_details or not firmware_details.get("version") or not firmware_details.get("motorType"):
+                    return False, "Failed to retrieve .ino firmware details"
+
+                update_available = (
+                    installed_version != firmware_details["version"] or
+                    installed_type != firmware_details["motorType"]
+                )
+
+                return True, {
+                    "installedVersion": installed_version,
+                    "installedType": installed_type,
+                    "inoVersion": firmware_details["version"],
+                    "inoType": firmware_details["motorType"],
+                    "updateAvailable": update_available
+                }
+
+            return True, {
+                "installedVersion": installed_version,
+                "installedType": installed_type,
+                "updateAvailable": False
+            }
+
+    def flash_firmware(self, motor_type):
+        """Flash firmware for the specified motor type."""
+        if not motor_type or motor_type not in self.MOTOR_TYPE_MAPPING:
+            return False, "Invalid or missing motor type"
+
+        if not serial_manager.is_connected():
+            return False, "No device connected or connection lost"
+
+        try:
+            ino_file_path = self.MOTOR_TYPE_MAPPING[motor_type]
+            hex_file_path = f"{ino_file_path}.hex"
+            bin_file_path = f"{ino_file_path}.bin"
+
+            if motor_type.lower() in ["esp32", "esp32_tmc2209"]:
+                if not os.path.exists(bin_file_path):
+                    return False, f"Firmware binary not found: {bin_file_path}"
+
+                flash_command = [
+                    "esptool.py",
+                    "--chip", "esp32",
+                    "--port", serial_manager.get_port(),
+                    "--baud", "115200",
+                    "write_flash", "-z", "0x1000", bin_file_path
+                ]
+            else:
+                if not os.path.exists(hex_file_path):
+                    return False, f"Hex file not found: {hex_file_path}"
+
+                flash_command = [
+                    "avrdude",
+                    "-v",
+                    "-c", "arduino",
+                    "-p", "atmega328p",
+                    "-P", serial_manager.get_port(),
+                    "-b", "115200",
+                    "-D",
+                    "-U", f"flash:w:{hex_file_path}:i"
+                ]
+
+            flash_process = subprocess.run(flash_command, capture_output=True, text=True)
+            if flash_process.returncode != 0:
+                return False, flash_process.stderr
+
+            return True, "Firmware flashed successfully"
+        except Exception as e:
+            return False, str(e)
+
+# Create a global instance
+firmware_manager = FirmwareManager()

+ 0 - 0
dune_weaver_flask/dune_weaver/serial/__init__.py


+ 109 - 0
dune_weaver_flask/dune_weaver/serial/manager.py

@@ -0,0 +1,109 @@
+import serial
+import serial.tools.list_ports
+import threading
+import time
+
+class SerialManager:
+    def __init__(self):
+        self.ser = None
+        self.ser_port = None
+        self.serial_lock = threading.RLock()
+        self.IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
+        
+        # Device information
+        self.arduino_table_name = None
+        self.arduino_driver_type = 'Unknown'
+        self.firmware_version = 'Unknown'
+
+    def list_serial_ports(self):
+        """Return a list of available serial ports."""
+        ports = serial.tools.list_ports.comports()
+        return [port.device for port in ports if port.device not in self.IGNORE_PORTS]
+
+    def connect_to_serial(self, port=None, baudrate=115200):
+        """Automatically connect to the first available serial port or a specified port."""
+        try:
+            if port is None:
+                ports = self.list_serial_ports()
+                if not ports:
+                    print("No serial port connected")
+                    return False
+                port = ports[0]  # Auto-select the first available port
+
+            with self.serial_lock:
+                if self.ser and self.ser.is_open:
+                    self.ser.close()
+                self.ser = serial.Serial(port, baudrate, timeout=2)
+                self.ser_port = port
+
+            print(f"Connected to serial port: {port}")
+            time.sleep(2)  # Allow time for the connection to establish
+
+            # Read initial startup messages from Arduino
+            while self.ser.in_waiting > 0:
+                line = self.ser.readline().decode().strip()
+                print(f"Arduino: {line}")
+
+                # Store the device details based on the expected messages
+                if "Table:" in line:
+                    self.arduino_table_name = line.replace("Table: ", "").strip()
+                elif "Drivers:" in line:
+                    self.arduino_driver_type = line.replace("Drivers: ", "").strip()
+                elif "Version:" in line:
+                    self.firmware_version = line.replace("Version: ", "").strip()
+
+            print(f"Detected Table: {self.arduino_table_name or 'Unknown'}")
+            print(f"Detected Drivers: {self.arduino_driver_type or 'Unknown'}")
+
+            return True
+        except serial.SerialException as e:
+            print(f"Failed to connect to serial port {port}: {e}")
+            self.ser_port = None
+
+        print("Max retries reached. Could not connect to a serial port.")
+        return False
+
+    def disconnect_serial(self):
+        """Disconnect the current serial connection."""
+        if self.ser and self.ser.is_open:
+            self.ser.close()
+            self.ser = None
+        self.ser_port = None
+
+    def restart_serial(self, port, baudrate=115200):
+        """Restart the serial connection."""
+        self.disconnect_serial()
+        return self.connect_to_serial(port, baudrate)
+
+    def send_coordinate_batch(self, coordinates):
+        """Send a batch of theta-rho pairs to the Arduino."""
+        batch_str = ";".join(f"{theta:.5f},{rho:.5f}" for theta, rho in coordinates) + ";\n"
+        with self.serial_lock:
+            self.ser.write(batch_str.encode())
+
+    def send_command(self, command):
+        """Send a single command to the Arduino."""
+        with self.serial_lock:
+            self.ser.write(f"{command}\n".encode())
+            print(f"Sent: {command}")
+
+            # Wait for "R" acknowledgment from Arduino
+            while True:
+                with self.serial_lock:
+                    if self.ser.in_waiting > 0:
+                        response = self.ser.readline().decode().strip()
+                        print(f"Arduino response: {response}")
+                        if response == "R":
+                            print("Command execution completed.")
+                            break
+
+    def is_connected(self):
+        """Check if serial connection is established and open."""
+        return self.ser is not None and self.ser.is_open
+
+    def get_port(self):
+        """Get the current serial port."""
+        return self.ser_port
+
+# Create a global instance
+serial_manager = SerialManager()

+ 0 - 0
firmware/arduino_code/arduino_code.ino → dune_weaver_flask/firmware/arduino_code/arduino_code.ino


+ 0 - 0
firmware/arduino_code/arduino_code.ino.hex → dune_weaver_flask/firmware/arduino_code/arduino_code.ino.hex


+ 0 - 0
firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino → dune_weaver_flask/firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino


+ 0 - 0
firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino.hex → dune_weaver_flask/firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino.hex


+ 0 - 0
firmware/esp32/esp32.ino → dune_weaver_flask/firmware/esp32/esp32.ino


+ 0 - 0
firmware/esp32/esp32.ino.bin → dune_weaver_flask/firmware/esp32/esp32.ino.bin


+ 0 - 0
firmware/esp32_TMC2209/esp32_TMC2209.ino → dune_weaver_flask/firmware/esp32_TMC2209/esp32_TMC2209.ino


+ 0 - 0
firmware/esp32_TMC2209/esp32_TMC2209.ino.bin → dune_weaver_flask/firmware/esp32_TMC2209/esp32_TMC2209.ino.bin


+ 0 - 0
static/IMG_7404.gif → dune_weaver_flask/static/IMG_7404.gif


+ 0 - 0
static/IMG_9753.png → dune_weaver_flask/static/IMG_9753.png


+ 0 - 0
static/UI.png → dune_weaver_flask/static/UI.png


+ 0 - 0
static/UI_1.3.png → dune_weaver_flask/static/UI_1.3.png


+ 0 - 0
static/css/all.min.css → dune_weaver_flask/static/css/all.min.css


+ 0 - 0
static/css/style.css → dune_weaver_flask/static/css/style.css


+ 0 - 0
static/fontawesome.min.css → dune_weaver_flask/static/fontawesome.min.css


+ 0 - 0
static/icons/chevron-down.svg → dune_weaver_flask/static/icons/chevron-down.svg


+ 0 - 0
static/icons/chevron-left.svg → dune_weaver_flask/static/icons/chevron-left.svg


+ 0 - 0
static/icons/chevron-right.svg → dune_weaver_flask/static/icons/chevron-right.svg


+ 0 - 0
static/icons/chevron-up.svg → dune_weaver_flask/static/icons/chevron-up.svg


+ 0 - 0
static/icons/pause.svg → dune_weaver_flask/static/icons/pause.svg


+ 0 - 0
static/icons/play.svg → dune_weaver_flask/static/icons/play.svg


+ 0 - 0
static/js/main.js → dune_weaver_flask/static/js/main.js


+ 0 - 0
static/webfonts/Roboto-Italic-VariableFont_wdth,wght.ttf → dune_weaver_flask/static/webfonts/Roboto-Italic-VariableFont_wdth,wght.ttf


+ 0 - 0
static/webfonts/Roboto-VariableFont_wdth,wght.ttf → dune_weaver_flask/static/webfonts/Roboto-VariableFont_wdth,wght.ttf


+ 0 - 0
static/webfonts/fa-regular-400.ttf → dune_weaver_flask/static/webfonts/fa-regular-400.ttf


+ 0 - 0
static/webfonts/fa-regular-400.woff2 → dune_weaver_flask/static/webfonts/fa-regular-400.woff2


+ 0 - 0
static/webfonts/fa-solid-900.ttf → dune_weaver_flask/static/webfonts/fa-solid-900.ttf


+ 0 - 0
static/webfonts/fa-solid-900.woff2 → dune_weaver_flask/static/webfonts/fa-solid-900.woff2


+ 0 - 0
templates/index.html → dune_weaver_flask/templates/index.html