| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
277127812791280128112821283128412851286128712881289 |
import atexit
import json
import math
import os
import random
import subprocess
import threading
import time
from datetime import datetime

import serial
import serial.tools.list_ports
from flask import Flask, jsonify, render_template, request, send_from_directory
from tqdm import tqdm
# Flask application object; every @app.route below registers against it.
app = Flask(__name__)

# Configuration
# Directory that holds the theta-rho (.thr) pattern files.
THETA_RHO_DIR = './patterns'
# Serial devices that are filtered out of the port listing.
IGNORE_PORTS = ['/dev/cu.debug-console', '/dev/cu.Bluetooth-Incoming-Port']
# Named clearing patterns mapped to their .thr files.
CLEAR_PATTERNS = {
    "clear_from_in":  "./patterns/clear_from_in.thr",
    "clear_from_out": "./patterns/clear_from_out.thr",
    "clear_sideway":  "./patterns/clear_sideway.thr"
}
# Create the pattern directory on import so later listings and uploads find it.
os.makedirs(THETA_RHO_DIR, exist_ok=True)

# Serial connection (First available will be selected by default)
ser = None
ser_port = None  # Global variable to store the serial port name
# Cooperative flags read by the pattern-execution loops.
stop_requested = False
pause_requested = False
# Condition the execution loop waits on while pause_requested is set.
pause_condition = threading.Condition()

# Global variables to store device information
arduino_table_name = None
arduino_driver_type = 'Unknown'

# Table status
current_playing_file = None
execution_progress = None
firmware_version = 'Unknown'
current_playing_index = None
current_playlist = None
is_clearing = False
# Re-entrant lock guarding access to the shared serial handle.
serial_lock = threading.RLock()

# JSON file that stores all playlists, resolved against the working directory.
PLAYLISTS_FILE = os.path.join(os.getcwd(), "playlists.json")

# Motor/driver type mapped to the matching firmware sketch path.
MOTOR_TYPE_MAPPING = {
    "TMC2209": "./firmware/arduino_code_TMC2209/arduino_code_TMC2209.ino",
    "DRV8825": "./firmware/arduino_code/arduino_code.ino",
    "esp32": "./firmware/esp32/esp32.ino",
    "esp32_TMC2209": "./firmware/esp32_TMC2209/esp32_TMC2209.ino"
}

# Ensure the file exists and contains at least an empty JSON object
if not os.path.exists(PLAYLISTS_FILE):
    with open(PLAYLISTS_FILE, "w") as f:
        json.dump({}, f, indent=2)
def get_ino_firmware_details(ino_file_path):
    """
    Scan an Arduino sketch for its firmware version and motor type.

    Any line mentioning ``firmwareVersion`` or ``motorType`` contributes the
    text between its first and last double quote; later lines override
    earlier ones.

    Args:
        ino_file_path (str): Path to the .ino file.

    Returns:
        dict: ``{"version": ..., "motorType": ...}`` when at least one value
        was found, otherwise None (also None on any read error).
    """
    # (marker searched for in the line, key stored in the result)
    markers = (("firmwareVersion", "version"), ("motorType", "motorType"))
    try:
        if not ino_file_path:
            raise ValueError("Invalid path: ino_file_path is None or empty.")

        details = dict.fromkeys(("version", "motorType"))
        with open(ino_file_path, "r") as sketch:
            for text in sketch:
                for marker, field in markers:
                    if marker not in text:
                        continue
                    first = text.find('"') + 1
                    last = text.rfind('"')
                    # first < last only holds when two distinct quotes exist.
                    if first < last:
                        details[field] = text[first:last]

        if not details["version"]:
            print(f"Firmware version not found in file: {ino_file_path}")
        if not details["motorType"]:
            print(f"Motor type not found in file: {ino_file_path}")

        if any(details.values()):
            return details
        return None
    except FileNotFoundError:
        print(f"File not found: {ino_file_path}")
        return None
    except Exception as e:
        print(f"Error reading .ino file: {str(e)}")
        return None
def check_git_updates():
    """
    Compare the latest local git tag with the latest tag on ``origin/main``.

    Returns:
        dict: ``updates_available`` (bool), ``tag_behind_count`` (int),
        ``latest_remote_tag`` and ``latest_local_tag`` (str or None).
        When git fails or cannot be started, a "no updates" result with
        both tags set to None is returned.
    """
    try:
        # Fetch the latest updates from the remote repository
        subprocess.run(["git", "fetch", "--tags", "--force"], check=True)

        # Get the latest tag from the remote
        latest_remote_tag = subprocess.check_output(
            ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
        ).strip().decode()

        # Get the latest tag from the local branch
        latest_local_tag = subprocess.check_output(
            ["git", "describe", "--tags", "--abbrev=0"]
        ).strip().decode()

        # Count how many tags the local branch is behind
        tag_behind_count = 0
        if latest_local_tag != latest_remote_tag:
            tags = subprocess.check_output(
                ["git", "tag", "--merged", "origin/main"], text=True
            ).splitlines()

            found_local = False
            for tag in tags:
                if tag == latest_local_tag:
                    found_local = True
                elif found_local:
                    tag_behind_count += 1
                    if tag == latest_remote_tag:
                        break

        return {
            "updates_available": latest_remote_tag != latest_local_tag,
            "tag_behind_count": tag_behind_count,  # Tags behind
            "latest_remote_tag": latest_remote_tag,
            "latest_local_tag": latest_local_tag,
        }
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers a missing or non-executable git binary, which
        # subprocess reports as FileNotFoundError/PermissionError.
        print(f"Error checking Git updates: {e}")
        return {
            "updates_available": False,
            "tag_behind_count": 0,
            "latest_remote_tag": None,
            "latest_local_tag": None,
        }
def list_serial_ports():
    """Collect the device names of all serial ports not listed in IGNORE_PORTS."""
    visible = []
    for info in serial.tools.list_ports.comports():
        if info.device in IGNORE_PORTS:
            continue
        visible.append(info.device)
    return visible
def connect_to_serial(port=None, baudrate=115200):
    """
    Open the serial connection and read the device's startup banner.

    Args:
        port (str | None): Device to open. When None, the first entry
            returned by ``list_serial_ports()`` is used.
        baudrate (int): Serial baud rate.

    Returns:
        bool: True when the port was opened, False when no port is
        available or opening it raised ``serial.SerialException``.

    Side effects:
        Replaces the global ``ser``/``ser_port`` and refreshes
        ``arduino_table_name``, ``arduino_driver_type`` and
        ``firmware_version`` from "Table:", "Drivers:" and "Version:" lines.
    """
    global ser, ser_port, arduino_table_name, arduino_driver_type, firmware_version

    try:
        if port is None:
            ports = list_serial_ports()
            if not ports:
                print("No serial port connected")
                return False
            port = ports[0]  # Auto-select the first available port

        with serial_lock:
            if ser and ser.is_open:
                ser.close()
            ser = serial.Serial(port, baudrate, timeout=2)  # Set timeout to avoid infinite waits
            ser_port = port  # Store the connected port globally

        print(f"Connected to serial port: {port}")
        time.sleep(2)  # Allow time for the connection to establish

        # Read initial startup messages from Arduino
        arduino_table_name = None
        arduino_driver_type = None

        while ser.in_waiting > 0:
            # Boot output may contain bytes that are not valid UTF-8; replace
            # them instead of letting UnicodeDecodeError abort the connection.
            line = ser.readline().decode(errors="replace").strip()
            print(f"Arduino: {line}")  # Print the received message

            # Store the device details based on the expected messages
            if "Table:" in line:
                arduino_table_name = line.replace("Table: ", "").strip()
            elif "Drivers:" in line:
                arduino_driver_type = line.replace("Drivers: ", "").strip()
            elif "Version:" in line:
                firmware_version = line.replace("Version: ", "").strip()

        # Display stored values
        print(f"Detected Table: {arduino_table_name or 'Unknown'}")
        print(f"Detected Drivers: {arduino_driver_type or 'Unknown'}")

        return True  # Successfully connected
    except serial.SerialException as e:
        print(f"Failed to connect to serial port {port}: {e}")

    print("Max retries reached. Could not connect to a serial port.")
    return False
def disconnect_serial():
    """Close the active serial connection, if any, and forget the port name."""
    global ser, ser_port
    if ser:
        if ser.is_open:
            ser.close()
            ser = None
    # The remembered port name is cleared regardless of the handle's state.
    ser_port = None
def restart_serial(port, baudrate=115200):
    """Drop the current serial connection and open a new one on ``port``."""
    disconnect_serial()
    connect_to_serial(port, baudrate=baudrate)
def parse_theta_rho_file(file_path):
    """
    Read a theta-rho file into a list of ``(theta, rho)`` float pairs.

    Blank lines and lines starting with ``#`` are ignored; lines that do not
    consist of exactly two floats are reported and skipped. The result is
    shifted so that the first theta equals 0. If the file cannot be read,
    whatever was collected so far is returned without that shift.
    """
    points = []
    try:
        with open(file_path, 'r') as handle:
            for raw in handle:
                entry = raw.strip()
                if entry == "" or entry[0] == "#":
                    continue
                try:
                    theta, rho = (float(token) for token in entry.split())
                except ValueError:
                    print(f"Skipping invalid line: {entry}")
                    continue
                points.append((theta, rho))
    except Exception as e:
        print(f"Error reading file: {e}")
        return points

    if not points:
        return points

    # Shift every theta by the first one so the pattern starts at theta=0.
    offset = points[0][0]
    return [(theta - offset, rho) for theta, rho in points]
def send_coordinate_batch(ser, coordinates):
    """
    Write a batch of theta-rho pairs to the serial handle as one line.

    Each pair is formatted as ``theta,rho`` with five decimals; pairs are
    separated by ``;`` and the line ends with ``;`` plus a newline.
    """
    pairs = [f"{theta:.5f},{rho:.5f}" for theta, rho in coordinates]
    payload = (";".join(pairs) + ";\n").encode()
    with serial_lock:
        ser.write(payload)
def send_command(command):
    """
    Send one command line to the Arduino and block until it answers "R".

    Every response line read while waiting is printed. There is no timeout:
    the call returns only once a line equal to "R" arrives.
    """
    with serial_lock:
        ser.write(f"{command}\n".encode())
        print(f"Sent: {command}")

        # Wait for "R" acknowledgment from Arduino
        while True:
            with serial_lock:
                if ser.in_waiting > 0:
                    response = ser.readline().decode().strip()
                    print(f"Arduino response: {response}")
                    if response == "R":
                        print("Command execution completed.")
                        break
                    # More input may already be buffered; poll again at once.
                    continue
            # Nothing buffered: yield briefly instead of spinning a CPU core.
            time.sleep(0.05)
def wait_for_start_time(schedule_hours):
    """
    Poll every 30 seconds while paused and resume once inside the schedule.

    ``schedule_hours`` is a ``(start_time, end_time)`` pair compared against
    the current wall-clock time. On resume, ``pause_requested`` is cleared
    and waiters on ``pause_condition`` are notified.
    """
    global pause_requested
    window_open, window_close = schedule_hours

    while pause_requested:
        current = datetime.now().time()
        if not (window_open <= current < window_close):
            time.sleep(30)  # Wait for 30 seconds before checking again
            continue

        print("Resuming execution: Within schedule.")
        pause_requested = False
        with pause_condition:
            pause_condition.notify_all()
        break  # Exit the loop once resumed
- # Function to check schedule based on start and end time
def schedule_checker(schedule_hours):
    """
    Pause or resume execution depending on whether "now" is inside a window.

    Parameters:
    - schedule_hours (tuple): (start_time, end_time) as `datetime.time`
      objects; a falsy value disables scheduling entirely.

    Leaving the window sets ``pause_requested`` and starts a daemon thread
    running ``wait_for_start_time``; entering it while paused clears the flag
    and notifies ``pause_condition``.
    """
    global pause_requested

    if not schedule_hours:
        return  # No scheduling restriction

    opens_at, closes_at = schedule_hours
    current = datetime.now().time()
    inside_window = opens_at <= current < closes_at

    if inside_window and pause_requested:
        print("Starting execution: Within schedule.")
        pause_requested = False  # Resume execution
        with pause_condition:
            pause_condition.notify_all()
    elif not inside_window and not pause_requested:
        print("Pausing execution: Outside schedule.")
        pause_requested = True  # Pause execution
        # Background watcher that resumes execution when the window opens.
        watcher = threading.Thread(
            target=wait_for_start_time, args=(schedule_hours,), daemon=True
        )
        watcher.start()
def run_theta_rho_file(file_path, schedule_hours=None):
    """
    Run a theta-rho file by sending data in optimized batches with tqdm ETA tracking.

    Parameters:
    - file_path: Path of the .thr file, parsed with parse_theta_rho_file.
    - schedule_hours: Optional (start_time, end_time) pair handed to
      schedule_checker while waiting for the device.

    The first batch is sent immediately; each later batch is sent only after
    the device answers "R". Progress is published through the global
    execution_progress as (sent, total, eta_seconds_or_None).
    """
    global stop_requested, current_playing_file, execution_progress

    coordinates = parse_theta_rho_file(file_path)
    total_coordinates = len(coordinates)

    if total_coordinates < 2:
        print("Not enough coordinates for interpolation.")
        current_playing_file = None  # Clear tracking if failed
        execution_progress = None
        return

    execution_progress = (0, total_coordinates, None)  # Initialize progress with ETA as None
    batch_size = 10  # Smaller batches may smooth movement further

    # Before trying to acquire the lock we send the stop command,
    # so then we will just wait for the lock to be released so we can use the serial
    stop_actions()
    with serial_lock:
        current_playing_file = file_path  # Track current playing file
        execution_progress = (0, 0, None)  # Reset progress (ETA starts as None)
        stop_requested = False  # stop_actions() above set it; clear it for this run
        with tqdm(total=total_coordinates, unit="coords", desc=f"Executing Pattern {file_path}", dynamic_ncols=True, disable=None) as pbar:
            for i in range(0, total_coordinates, batch_size):
                if stop_requested:
                    print("Execution stopped by user after completing the current batch.")
                    break

                with pause_condition:
                    while pause_requested:
                        print("Execution paused...")
                        pause_condition.wait()  # This will block execution until notified

                batch = coordinates[i:i + batch_size]

                # The very first batch is sent without waiting for an "R".
                if i == 0:
                    send_coordinate_batch(ser, batch)
                    execution_progress = (i + batch_size, total_coordinates, None)  # No ETA yet
                    pbar.update(batch_size)
                    continue

                # Poll until the device responds to the previous batch.
                while True:
                    schedule_checker(schedule_hours)  # Check if within schedule
                    if ser.in_waiting > 0:
                        response = ser.readline().decode().strip()
                        if response == "R":
                            send_coordinate_batch(ser, batch)
                            pbar.update(batch_size)  # Update tqdm progress

                            # Linear extrapolation from tqdm's elapsed time
                            estimated_remaining_time = pbar.format_dict['elapsed'] / (i + batch_size) * (total_coordinates - (i + batch_size))

                            # Update execution progress with the ETA in seconds
                            execution_progress = (i + batch_size, total_coordinates, estimated_remaining_time)
                            break
                        elif response != "IGNORED: FINISHED" and response.startswith("IGNORE"):  # Retry the previous batch
                            print("Received IGNORE. Resending the previous batch...")
                            print(response)

                            # Calculate the previous batch indices
                            prev_start = max(0, i - batch_size)  # Ensure we don't go below 0
                            prev_end = i  # End of the previous batch is `i`
                            previous_batch = coordinates[prev_start:prev_end]

                            # Resend the previous batch
                            send_coordinate_batch(ser, previous_batch)
                            # NOTE(review): this break leaves the wait loop without
                            # sending the current `batch` - confirm that is intended.
                            break  # Exit the retry loop after resending
                        else:
                            print(f"Arduino response: {response}")

        reset_theta()
        ser.write("FINISHED\n".encode())

    # Clear tracking variables when done
    current_playing_file = None
    execution_progress = None
    print("Pattern execution completed.")
def get_clear_pattern_file(clear_pattern_mode, path=None):
    """
    Resolve a clear-pattern mode to the .thr file that should be run.

    Parameters:
    - clear_pattern_mode (str): A key of CLEAR_PATTERNS, "random", "adaptive",
      or a falsy value / "none" for no clearing.
    - path (str): Pattern about to be played; only read in "adaptive" mode.

    Returns:
    - str path of the clear pattern, or None when no clearing is requested.

    Raises:
    - KeyError: when the mode is not one of the values listed above.
    """
    if not clear_pattern_mode or clear_pattern_mode == 'none':
        return None

    print("Clear pattern mode: " + clear_pattern_mode)

    if clear_pattern_mode == "random":
        # Randomly pick one of the known patterns
        return random.choice(list(CLEAR_PATTERNS.values()))

    if clear_pattern_mode == 'adaptive':
        coordinates = parse_theta_rho_file(path)
        if not coordinates:
            # An unreadable or empty pattern has no starting rho to adapt to;
            # fall back to a random clear instead of raising IndexError.
            return random.choice(list(CLEAR_PATTERNS.values()))
        _, first_rho = coordinates[0]
        if first_rho < 0.5:
            return CLEAR_PATTERNS['clear_from_out']
        return random.choice([CLEAR_PATTERNS['clear_from_in'], CLEAR_PATTERNS['clear_sideway']])

    return CLEAR_PATTERNS[clear_pattern_mode]
def run_theta_rho_files(
    file_paths,
    pause_time=0,
    clear_pattern=None,
    run_mode="single",
    shuffle=False,
    schedule_hours=None
):
    """
    Runs multiple .thr files in sequence with options for pausing, clearing, shuffling, and looping.

    Parameters:
    - file_paths (list): List of file paths to run. Shuffled in place when
      `shuffle` is set.
    - pause_time (float): Seconds to pause between patterns.
    - clear_pattern (str): Specific clear pattern to run ("clear_from_in", "clear_from_out", "clear_sideway", "adaptive", or "random").
      A falsy value or "none" runs no clear pattern.
    - run_mode (str): "single" for one-time run or "indefinite" for looping.
    - shuffle (bool): Whether to shuffle the playlist before running.
    - schedule_hours (tuple): Optional (start_time, end_time) pair passed to
      schedule_checker and to each pattern run.

    Returns early, without the final theta reset, when a stop is requested
    between patterns.
    """
    global stop_requested
    global current_playlist
    global current_playing_index
    stop_requested = False  # Reset stop flag at the start

    if shuffle:
        random.shuffle(file_paths)
        print("Playlist shuffled.")

    current_playlist = file_paths

    while True:
        for idx, path in enumerate(file_paths):
            print("Upcoming pattern: " + path)
            current_playing_index = idx
            schedule_checker(schedule_hours)
            if stop_requested:
                print("Execution stopped before starting next pattern.")
                return

            if clear_pattern:
                if stop_requested:
                    print("Execution stopped before running the next clear pattern.")
                    return

                # Determine the clear pattern to run
                clear_file_path = get_clear_pattern_file(clear_pattern, path)
                # "none" resolves to no file; skip instead of trying to run None.
                if clear_file_path:
                    print(f"Running clear pattern: {clear_file_path}")
                    run_theta_rho_file(clear_file_path, schedule_hours)

            if not stop_requested:
                # Run the main pattern
                print(f"Running pattern {idx + 1} of {len(file_paths)}: {path}")
                run_theta_rho_file(path, schedule_hours)

            if idx < len(file_paths) - 1:
                if stop_requested:
                    print("Execution stopped before running the next clear pattern.")
                    return
                # Pause after each pattern if requested
                if pause_time > 0:
                    print(f"Pausing for {pause_time} seconds...")
                    time.sleep(pause_time)

        # After completing the playlist
        if run_mode == "indefinite":
            print("Playlist completed. Restarting as per 'indefinite' run mode.")
            if pause_time > 0:
                print(f"Pausing for {pause_time} seconds before restarting...")
                time.sleep(pause_time)
            if shuffle:
                random.shuffle(file_paths)
                print("Playlist reshuffled for the next loop.")
            continue
        else:
            print("Playlist completed.")
            break

    # Reset theta after execution or stopping
    reset_theta()
    with serial_lock:
        ser.write("FINISHED\n".encode())

    print("All requested patterns completed (or stopped).")
def reset_theta():
    """
    Ask the Arduino to reset theta and wait for its confirmation.

    Sends "RESET_THETA" and then polls the serial input every 0.5 seconds,
    printing each response, until a line equal to "THETA_RESET" is read.
    """
    with serial_lock:
        ser.write(b"RESET_THETA\n")
        while True:
            with serial_lock:
                if ser.in_waiting > 0:
                    reply = ser.readline().decode().strip()
                    print(f"Arduino response: {reply}")
                    if reply == "THETA_RESET":
                        print("Theta successfully reset.")
                        return
            time.sleep(0.5)  # Small delay to avoid busy waiting
- # Flask API Endpoints
@app.route('/')
def index():
    """Render the main page from the ``index.html`` template."""
    page = render_template('index.html')
    return page
@app.route('/list_serial_ports', methods=['GET'])
def list_ports():
    """Return the available serial port names as a JSON array."""
    available = list_serial_ports()
    return jsonify(available)
@app.route('/connect_serial', methods=['POST'])
def connect_serial():
    """
    POST /connect_serial
    Body: { "port": "<device name>" }
    Opens the serial connection on the given port.

    Responds 400 when no port is supplied and 500 when the connection could
    not be established or an unexpected error occurred.
    """
    port = request.json.get('port')
    if not port:
        return jsonify({'error': 'No port provided'}), 400

    try:
        # connect_to_serial reports failure through its return value rather
        # than an exception, so the result must be checked explicitly.
        if not connect_to_serial(port):
            return jsonify({'error': f'Failed to connect to serial port {port}'}), 500
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/disconnect_serial', methods=['POST'])
def disconnect():
    """Close the serial connection; respond 500 with the error text on failure."""
    try:
        disconnect_serial()
        outcome = jsonify({'success': True})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    return outcome
@app.route('/restart_serial', methods=['POST'])
def restart():
    """
    POST /restart_serial
    Body: { "port": "<device name>" }
    Closes and reopens the serial connection on the given port.
    """
    port = request.json.get('port')
    if port:
        try:
            restart_serial(port)
            return jsonify({'success': True})
        except Exception as e:
            return jsonify({'error': str(e)}), 500
    return jsonify({'error': 'No port provided'}), 400
@app.route('/list_theta_rho_files', methods=['GET'])
def list_theta_rho_files():
    """Return every file under THETA_RHO_DIR, as sorted paths relative to it."""
    collected = [
        os.path.relpath(os.path.join(folder, name), THETA_RHO_DIR)
        for folder, _, names in os.walk(THETA_RHO_DIR)
        for name in names
    ]
    collected.sort()
    return jsonify(collected)
@app.route('/upload_theta_rho', methods=['POST'])
def upload_theta_rho():
    """
    POST /upload_theta_rho (multipart form, field "file")
    Stores the uploaded file in THETA_RHO_DIR/custom_patterns.

    Only the final path component of the client-supplied filename is used,
    so an upload cannot be written outside that directory.
    Returns {"success": true} when saved, {"success": false} otherwise.
    """
    custom_patterns_dir = os.path.join(THETA_RHO_DIR, 'custom_patterns')
    os.makedirs(custom_patterns_dir, exist_ok=True)  # Ensure the directory exists

    file = request.files['file']
    if file:
        # The filename is untrusted: normalise separators and drop any
        # directory part before joining it to the target directory.
        safe_name = os.path.basename((file.filename or '').replace('\\', '/'))
        if safe_name and safe_name not in ('.', '..'):
            file.save(os.path.join(custom_patterns_dir, safe_name))
            return jsonify({'success': True})
    return jsonify({'success': False})
@app.route('/run_theta_rho', methods=['POST'])
def run_theta_rho():
    """
    POST /run_theta_rho
    Body: { "file_name": "<path under THETA_RHO_DIR>", "pre_execution": "<clear pattern mode>" }
    Starts the pattern in a background thread and returns immediately.
    """
    payload = request.json
    file_name = payload.get('file_name')
    pre_execution = payload.get('pre_execution')

    if not file_name:
        return jsonify({'error': 'No file name provided'}), 400

    file_path = os.path.join(THETA_RHO_DIR, file_name)
    if not os.path.exists(file_path):
        return jsonify({'error': 'File not found'}), 404

    try:
        # A one-element playlist: the clear pattern is handled by the runner.
        files_to_run = [file_path]
        worker = threading.Thread(
            target=run_theta_rho_files,
            args=(files_to_run,),
            kwargs={'pause_time': 0, 'clear_pattern': pre_execution},
        )
        worker.start()
        return jsonify({'success': True})
    except Exception as e:
        return jsonify({'error': str(e)}), 500
def stop_actions():
    """
    Request a stop of the running pattern and clear all playback state.

    Clears any pause first and wakes threads waiting on ``pause_condition``
    so a paused run can observe the stop flag.
    """
    global pause_requested, stop_requested
    global current_playing_index, current_playlist
    global is_clearing, current_playing_file, execution_progress

    with pause_condition:
        pause_requested = False
        pause_condition.notify_all()

    stop_requested = True
    current_playing_index = None
    current_playlist = None
    is_clearing = False
    current_playing_file = None
    execution_progress = None
@app.route('/stop_execution', methods=['POST'])
def stop_execution():
    """Stop the current execution via ``stop_actions`` and report success."""
    stop_actions()
    result = {'success': True}
    return jsonify(result)
@app.route('/send_home', methods=['POST'])
def send_home():
    """Send "HOME" to the Arduino; respond 500 with the error text on failure."""
    try:
        send_command("HOME")
        reply = jsonify({'success': True})
        return reply
    except Exception as e:
        failure = {'error': str(e)}
        return jsonify(failure), 500
@app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
def run_specific_theta_rho_file(file_name):
    """Start the named pattern from THETA_RHO_DIR in a background thread."""
    file_path = os.path.join(THETA_RHO_DIR, file_name)
    if os.path.exists(file_path):
        worker = threading.Thread(target=run_theta_rho_file, args=(file_path,))
        worker.start()
        return jsonify({'success': True})
    return jsonify({'error': 'File not found'}), 404
@app.route('/delete_theta_rho_file', methods=['POST'])
def delete_theta_rho_file():
    """
    POST /delete_theta_rho_file
    Body: { "file_name": "<path under THETA_RHO_DIR>" }
    Deletes the named pattern file.

    Responds 400 when the name is missing or resolves outside THETA_RHO_DIR,
    404 when the file does not exist, and 500 when removal fails.
    """
    data = request.json
    file_name = data.get('file_name')

    if not file_name:
        return jsonify({"success": False, "error": "No file name provided"}), 400

    # The name is untrusted input: resolve it and refuse anything that
    # escapes the pattern directory (".." segments, absolute paths, symlinks).
    base_dir = os.path.realpath(THETA_RHO_DIR)
    file_path = os.path.realpath(os.path.join(base_dir, file_name))
    if not file_path.startswith(base_dir + os.sep):
        return jsonify({"success": False, "error": "Invalid file name"}), 400

    if not os.path.exists(file_path):
        return jsonify({"success": False, "error": "File not found"}), 404

    try:
        os.remove(file_path)
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
@app.route('/move_to_center', methods=['POST'])
def move_to_center():
    """Send the single coordinate (0, 0) to the device; 400 if not connected."""
    try:
        connected = ser is not None and ser.is_open
        if not connected:
            return jsonify({"success": False, "error": "Serial connection not established"}), 400

        center = (0, 0)
        send_coordinate_batch(ser, [center])
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
@app.route('/move_to_perimeter', methods=['POST'])
def move_to_perimeter():
    """Send the single coordinate (0, 1) to the device; 400 if not connected."""
    try:
        connected = ser is not None and ser.is_open
        if not connected:
            return jsonify({"success": False, "error": "Serial connection not established"}), 400

        MAX_RHO = 1
        edge = (0, MAX_RHO)
        send_coordinate_batch(ser, [edge])
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
@app.route('/preview_thr', methods=['POST'])
def preview_thr():
    """
    POST /preview_thr
    Body: { "file_name": "<path under THETA_RHO_DIR>" }
    Returns the parsed coordinates of the pattern for previewing.
    """
    file_name = request.json.get('file_name')
    if not file_name:
        return jsonify({'error': 'No file name provided'}), 400

    file_path = os.path.join(THETA_RHO_DIR, file_name)
    if not os.path.exists(file_path):
        return jsonify({'error': 'File not found'}), 404

    try:
        parsed = parse_theta_rho_file(file_path)
        body = {'success': True, 'coordinates': parsed}
        return jsonify(body)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
@app.route('/send_coordinate', methods=['POST'])
def send_coordinate():
    """
    POST /send_coordinate
    Body: { "theta": <number>, "rho": <number> }
    Sends one (theta, rho) coordinate to the Arduino.
    """
    connected = ser is not None and ser.is_open
    if not connected:
        return jsonify({"success": False, "error": "Serial connection not established"}), 400

    try:
        data = request.json
        theta, rho = data.get('theta'), data.get('rho')
        if theta is None or rho is None:
            return jsonify({"success": False, "error": "Theta and Rho are required"}), 400

        # Send the coordinate to the Arduino
        send_coordinate_batch(ser, [(theta, rho)])
        return jsonify({"success": True})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
- # Expose files for download if needed
@app.route('/download/<path:filename>', methods=['GET'])
def download_file(filename):
    """
    Download a file from the theta-rho directory.

    The ``path`` converter lets ``filename`` contain sub-directories, matching
    the relative paths that the file listing endpoint returns.
    ``send_from_directory`` is imported from flask at the top of the file.
    """
    return send_from_directory(THETA_RHO_DIR, filename)
@app.route('/serial_status', methods=['GET'])
def serial_status():
    """Report whether the serial connection is open and on which port."""
    connected = False
    if ser:
        connected = ser.is_open
    return jsonify({
        'connected': connected,
        'port': ser_port  # Include the port name
    })
@app.route('/pause_execution', methods=['POST'])
def pause_execution():
    """Set the pause flag that the pattern-execution loop checks."""
    global pause_requested
    with pause_condition:
        pause_requested = True
    body = {'success': True, 'message': 'Execution paused'}
    return jsonify(body)
@app.route('/status', methods=['GET'])
def get_status():
    """
    Return a JSON snapshot of the table's playback state.

    Also refreshes the global ``is_clearing``: it is true exactly when the
    file currently playing is one of the CLEAR_PATTERNS files.
    """
    global is_clearing
    is_clearing = current_playing_file in CLEAR_PATTERNS.values()

    snapshot = {
        "ser_port": ser_port,
        "stop_requested": stop_requested,
        "pause_requested": pause_requested,
        "current_playing_file": current_playing_file,
        "execution_progress": execution_progress,
        "current_playing_index": current_playing_index,
        "current_playlist": current_playlist,
        "is_clearing": is_clearing
    }
    return jsonify(snapshot)
@app.route('/resume_execution', methods=['POST'])
def resume_execution():
    """Clear the pause flag and wake any thread waiting on ``pause_condition``."""
    global pause_requested
    with pause_condition:
        pause_requested = False
        pause_condition.notify_all()  # Unblock the waiting thread
    body = {'success': True, 'message': 'Execution resumed'}
    return jsonify(body)
def load_playlists():
    """
    Read PLAYLISTS_FILE and return its parsed JSON content.

    The file holds a mapping of playlist name to list of pattern files, e.g.
    {"My Playlist": ["file1.thr", "file2.thr"], "Another": ["x.thr"]}.
    """
    with open(PLAYLISTS_FILE, "r") as handle:
        stored = json.load(handle)
    return stored
def save_playlists(playlists_dict):
    """Overwrite PLAYLISTS_FILE with ``playlists_dict`` as 2-space indented JSON."""
    with open(PLAYLISTS_FILE, "w") as handle:
        json.dump(playlists_dict, handle, indent=2)
@app.route("/list_all_playlists", methods=["GET"])
def list_all_playlists():
    """
    GET /list_all_playlists
    Returns the names of all stored playlists,
    e.g. ["My Playlist", "Another Playlist"].
    """
    names = list(load_playlists())
    return jsonify(names)
@app.route("/get_playlist", methods=["GET"])
def get_playlist():
    """
    GET /get_playlist?name=My%20Playlist
    Returns: { "name": "My Playlist", "files": [... ] }
    Responds 400 without a name and 404 for an unknown playlist.
    """
    playlist_name = request.args.get("name", "")
    if not playlist_name:
        return jsonify({"error": "Missing playlist 'name' parameter"}), 400

    stored = load_playlists()
    if playlist_name in stored:
        return jsonify({
            "name": playlist_name,
            "files": stored[playlist_name]  # e.g. ["file1.thr", "file2.thr"]
        })
    return jsonify({"error": f"Playlist '{playlist_name}' not found"}), 404
@app.route("/create_playlist", methods=["POST"])
def create_playlist():
    """
    POST /create_playlist
    Body: { "name": "My Playlist", "files": ["file1.thr", "file2.thr"] }
    Creates the playlist, replacing any existing one with the same name.
    """
    data = request.get_json()
    if not data or any(key not in data for key in ("name", "files")):
        return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400

    playlist_name, files = data["name"], data["files"]

    # Read-modify-write of the whole playlists file.
    stored = load_playlists()
    stored[playlist_name] = files
    save_playlists(stored)

    return jsonify({
        "success": True,
        "message": f"Playlist '{playlist_name}' created/updated"
    })
@app.route("/modify_playlist", methods=["POST"])
def modify_playlist():
    """
    POST /modify_playlist
    Body: { "name": "My Playlist", "files": ["file1.thr", "file2.thr"] }
    Replaces the file list of the named playlist; a playlist that does not
    exist yet is created rather than rejected.
    """
    data = request.get_json()
    if not data or any(key not in data for key in ("name", "files")):
        return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400

    playlist_name, files = data["name"], data["files"]

    # Read-modify-write of the whole playlists file.
    stored = load_playlists()
    stored[playlist_name] = files
    save_playlists(stored)

    return jsonify({"success": True, "message": f"Playlist '{playlist_name}' updated"})
@app.route("/delete_playlist", methods=["DELETE"])
def delete_playlist():
    """
    DELETE /delete_playlist
    Body: { "name": "My Playlist" }
    Removes the playlist from the playlists file; 404 if it does not exist.
    """
    data = request.get_json()
    if not data or "name" not in data:
        return jsonify({"success": False, "error": "Missing 'name' field"}), 400

    playlist_name = data["name"]
    stored = load_playlists()
    if playlist_name not in stored:
        return jsonify({"success": False, "error": f"Playlist '{playlist_name}' not found"}), 404

    stored.pop(playlist_name)
    save_playlists(stored)

    return jsonify({
        "success": True,
        "message": f"Playlist '{playlist_name}' deleted"
    })
@app.route('/add_to_playlist', methods=['POST'])
def add_to_playlist():
    """
    Append a single pattern to an existing playlist.

    POST /add_to_playlist
    Body: { "playlist_name": "My Playlist", "pattern": "file1.thr" }

    Returns:
        200 with {"success": true} once the pattern is appended and saved.
        400 when the body is not a JSON object or lacks 'playlist_name'
            or 'pattern'.
        404 when no playlist with that name exists.
    """
    # silent=True yields None for a missing or malformed body, so the
    # request is rejected below with a JSON error instead of an
    # AttributeError on None.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify(success=False, error="Missing 'playlist_name' or 'pattern' field"), 400

    playlist_name = data.get('playlist_name')
    pattern = data.get('pattern')
    if not playlist_name or not pattern:
        # Without this check a missing pattern would be stored as null.
        return jsonify(success=False, error="Missing 'playlist_name' or 'pattern' field"), 400

    # Go through the same load/save helpers as the other playlist routes
    # rather than opening the JSON file directly.
    playlists = load_playlists()
    if playlist_name not in playlists:
        return jsonify(success=False, error='Playlist not found'), 404

    playlists[playlist_name].append(pattern)
    save_playlists(playlists)
    return jsonify(success=True)
@app.route("/run_playlist", methods=["POST"])
def run_playlist():
    """
    Validate the request and start a stored playlist on a background thread.

    POST /run_playlist
    Body (JSON):
    {
        "playlist_name": "My Playlist",
        "pause_time": 1.0,          # Optional: seconds to pause between patterns
        "clear_pattern": "random",  # Optional: "clear_from_in", "clear_from_out",
                                    # "clear_sideway", "adaptive" or "random"
        "run_mode": "single",       # "single" or "indefinite"
        "shuffle": true,            # true or false
        "start_time": "09:30",      # Optional, HH:MM
        "end_time": "17:00"         # Optional, HH:MM
    }

    Responds with HTTP 400 for invalid input or an empty playlist, 404 for
    an unknown playlist and 500 if the worker thread cannot be started.
    An unrecognised 'clear_pattern' is not an error; it is treated as None.
    A schedule is only applied when both 'start_time' and 'end_time' are given.
    """

    def reject(message, status=400):
        # Every failure response of this route shares the same JSON shape.
        return jsonify({"success": False, "error": message}), status

    body = request.get_json()
    if not body or "playlist_name" not in body:
        return reject("Missing 'playlist_name' field")

    playlist_name = body["playlist_name"]
    pause_time = body.get("pause_time", 0)
    requested_clear = body.get("clear_pattern", None)
    run_mode = body.get("run_mode", "single")  # defaults to a single pass
    shuffle = body.get("shuffle", False)  # defaults to playlist order
    start_time = body.get("start_time", None)
    end_time = body.get("end_time", None)

    if not isinstance(pause_time, (int, float)) or pause_time < 0:
        return reject("'pause_time' must be a non-negative number")

    known_clear_patterns = ("clear_from_in", "clear_from_out", "clear_sideway", "random", "adaptive")
    clear_pattern = requested_clear if requested_clear in known_clear_patterns else None

    if run_mode not in ("single", "indefinite"):
        return reject("'run_mode' must be 'single' or 'indefinite'")

    if not isinstance(shuffle, bool):
        return reject("'shuffle' must be a boolean value")

    schedule_hours = None
    if start_time and end_time:
        try:
            # Convert the HH:MM strings into datetime.time objects.
            window_start = datetime.strptime(start_time, "%H:%M").time()
            window_end = datetime.strptime(end_time, "%H:%M").time()
        except ValueError:
            return reject("Invalid time format. Use HH:MM (e.g., '09:30')")
        if window_start >= window_end:
            return reject("'start_time' must be earlier than 'end_time'")
        schedule_hours = (window_start, window_end)

    playlists = load_playlists()
    if playlist_name not in playlists:
        return reject(f"Playlist '{playlist_name}' not found", 404)

    file_paths = [os.path.join(THETA_RHO_DIR, entry) for entry in playlists[playlist_name]]
    if not file_paths:
        return reject(f"Playlist '{playlist_name}' is empty")

    run_options = {
        'pause_time': pause_time,
        'clear_pattern': clear_pattern,
        'run_mode': run_mode,
        'shuffle': shuffle,
        'schedule_hours': schedule_hours,
    }
    try:
        # Daemon thread, so it does not keep the process alive on exit.
        worker = threading.Thread(
            target=run_theta_rho_files,
            args=(file_paths,),
            kwargs=run_options,
            daemon=True,
        )
        worker.start()
        return jsonify({"success": True, "message": f"Playlist '{playlist_name}' is now running."})
    except Exception as e:
        return reject(str(e), 500)
@app.route('/set_speed', methods=['POST'])
def set_speed():
    """
    Set the speed for the Arduino.

    POST /set_speed
    Body: { "speed": <positive number> }

    Returns:
        200 with {"success": true, "speed": <speed>} after the
            "SET_SPEED <speed>" command has been sent.
        400 when the serial connection is not open, when 'speed' is
            missing, or when it is not a positive, finite number.
        500 when sending the command raises.
    """
    import math  # function-scope import: only needed for the finiteness check

    if ser is None or not ser.is_open:
        return jsonify({"success": False, "error": "Serial connection not established"}), 400

    try:
        # silent=True returns None for a missing or malformed body, so it is
        # reported as a 400 below instead of surfacing as a 500.
        data = request.get_json(silent=True)
        speed = data.get('speed') if isinstance(data, dict) else None

        if speed is None:
            return jsonify({"success": False, "error": "Speed is required"}), 400

        # bool is a subclass of int, so it must be excluded explicitly;
        # otherwise `true` would be sent as "SET_SPEED True".
        is_number = isinstance(speed, (int, float)) and not isinstance(speed, bool)
        # Python's JSON parser accepts NaN/Infinity, and neither is <= 0.
        is_finite = is_number and not (isinstance(speed, float) and not math.isfinite(speed))
        if not is_finite or speed <= 0:
            return jsonify({"success": False, "error": "Invalid speed value"}), 400

        # Send the SET_SPEED command to the Arduino
        send_command(f"SET_SPEED {speed}")
        return jsonify({"success": True, "speed": speed})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
@app.route('/get_firmware_info', methods=['GET', 'POST'])
def get_firmware_info():
    """
    Report installed firmware details alongside those of the matching .ino file.

    GET:  compares the module-level firmware version and driver type with
          the details read from the .ino file mapped to that driver type.
          If either installed value is 'Unknown', no comparison is made
          and 'updateAvailable' is False.
    POST: body { "motorType": <key of MOTOR_TYPE_MAPPING> }; returns the
          .ino details for that motor type with 'updateAvailable' True.

    Responds with HTTP 400 when the serial port is not open or the motor
    type is invalid, and HTTP 500 when the .ino details cannot be read or
    any other exception occurs.
    """
    if ser is None or not ser.is_open:
        return jsonify({"success": False, "error": "Arduino not connected or serial port not open"}), 400

    try:
        if request.method == "GET":
            time.sleep(0.5)
            current_version = firmware_version
            current_type = arduino_driver_type

            if current_version == 'Unknown' or current_type == 'Unknown':
                # Installed details unknown: the client has to POST a motor type.
                return jsonify({
                    "success": True,
                    "installedVersion": current_version,
                    "installedType": current_type,
                    "updateAvailable": False,
                })

            details = get_ino_firmware_details(MOTOR_TYPE_MAPPING.get(current_type))
            if not details or not details.get("version") or not details.get("motorType"):
                return jsonify({"success": False, "error": "Failed to retrieve .ino firmware details"}), 500

            # An update is offered when either the version or the type differs.
            differs = (current_version, current_type) != (details["version"], details["motorType"])
            return jsonify({
                "success": True,
                "installedVersion": current_version,
                "installedType": current_type,
                "inoVersion": details["version"],
                "inoType": details["motorType"],
                "updateAvailable": differs,
            })

        elif request.method == "POST":
            motor_type = request.json.get("motorType", None)
            if not motor_type or motor_type not in MOTOR_TYPE_MAPPING:
                return jsonify({"success": False, "error": "Invalid or missing motor type"}), 400

            # Look up the .ino details for the requested motor type.
            details = get_ino_firmware_details(MOTOR_TYPE_MAPPING[motor_type])
            if not details:
                return jsonify({"success": False, "error": "Failed to retrieve .ino firmware details"}), 500

            return jsonify({
                "success": True,
                "installedVersion": 'Unknown',
                "installedType": motor_type,
                "inoVersion": details["version"],
                "inoType": details["motorType"],
                "updateAvailable": True,
            })
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
@app.route('/flash_firmware', methods=['POST'])
def flash_firmware():
    """
    Flash the pre-compiled firmware to the connected device (Arduino or ESP32).

    POST /flash_firmware
    Body: { "motorType": <key of MOTOR_TYPE_MAPPING> }

    Motor types "esp32" and "esp32_tmc2209" (case-insensitive) are flashed
    with esptool.py from "<ino path>.bin"; every other type is flashed with
    avrdude from "<ino path>.hex".

    Responds with HTTP 400 when no device is connected or the motor type is
    invalid, 404 when the firmware file is missing, and 500 when the flash
    tool exits non-zero (its stderr is returned) or an exception occurs.
    """
    connected = ser_port is not None and ser is not None and ser.is_open
    if not connected:
        return jsonify({"success": False, "error": "No device connected or connection lost"}), 400

    try:
        payload = request.json
        motor_type = payload.get("motorType", None)
        if not motor_type or motor_type not in MOTOR_TYPE_MAPPING:
            return jsonify({"success": False, "error": "Invalid or missing motor type"}), 400

        # The firmware images sit next to the .ino file, named by extension.
        ino_file_path = MOTOR_TYPE_MAPPING[motor_type]

        if motor_type.lower() in ("esp32", "esp32_tmc2209"):
            bin_file_path = f"{ino_file_path}.bin"
            if not os.path.exists(bin_file_path):
                return jsonify({"success": False, "error": f"Firmware binary not found: {bin_file_path}"}), 404
            flash_command = [
                "esptool.py",
                "--chip", "esp32",
                "--port", ser_port,
                "--baud", "115200",
                "write_flash", "-z", "0x1000", bin_file_path,
            ]
        else:
            hex_file_path = f"{ino_file_path}.hex"
            if not os.path.exists(hex_file_path):
                return jsonify({"success": False, "error": f"Hex file not found: {hex_file_path}"}), 404
            flash_command = [
                "avrdude",
                "-v",
                "-c", "arduino",
                "-p", "atmega328p",
                "-P", ser_port,
                "-b", "115200",
                "-D",
                "-U", f"flash:w:{hex_file_path}:i",
            ]

        # Run the tool and hand its stderr back to the caller on failure.
        result = subprocess.run(flash_command, capture_output=True, text=True)
        if result.returncode != 0:
            return jsonify({"success": False, "error": result.stderr}), 500

        return jsonify({"success": True, "message": "Firmware flashed successfully"})
    except Exception as e:
        return jsonify({"success": False, "error": str(e)}), 500
@app.route('/check_software_update', methods=['GET'])
def check_updates():
    """Return the result of check_git_updates() as a JSON response."""
    return jsonify(check_git_updates())
@app.route('/update_software', methods=['POST'])
def update_software():
    """
    Update the application to the latest tagged release and restart it.

    Steps: fetch tags, resolve the latest tag reachable from origin/main,
    force-checkout that tag, then `docker compose pull` and
    `docker compose up -d`. Failures of the checkout/docker steps are
    collected rather than aborting, and the final result is decided by
    re-running check_git_updates().

    Returns:
        200 with {"success": true} when no update remains and the local
            and remote tags match.
        500 with "error" and "details" (the collected step failures) when
            the latest tag cannot be determined or the update is incomplete.
    """
    error_log = []

    def run_command(command, error_message):
        """Run one update step, recording a failure instead of raising."""
        try:
            subprocess.run(command, check=True)
        # OSError covers a missing executable (FileNotFoundError), which
        # would otherwise escape as an unhandled error.
        except (subprocess.CalledProcessError, OSError) as e:
            print(f"{error_message}: {e}")
            error_log.append(error_message)

    # Fetch the latest version tag from remote
    try:
        subprocess.run(["git", "fetch", "--tags"], check=True)
        latest_remote_tag = subprocess.check_output(
            ["git", "describe", "--tags", "--abbrev=0", "origin/main"]
        ).strip().decode()
    except (subprocess.CalledProcessError, OSError) as e:
        error_log.append(f"Failed to fetch tags or get latest remote tag: {e}")
        return jsonify({
            "success": False,
            "error": "Failed to fetch tags or determine the latest version.",
            "details": error_log
        }), 500

    # Checkout the latest tag
    run_command(["git", "checkout", latest_remote_tag, '--force'], f"Failed to checkout version {latest_remote_tag}")
    run_command(["docker", "compose", "pull"], "Failed to fetch Docker containers")
    # Restart Docker containers
    run_command(["docker", "compose", "up", "-d"], "Failed to restart Docker containers")

    # Check if the update was successful
    update_status = check_git_updates()
    if (
        update_status["updates_available"] is False
        and update_status["latest_local_tag"] == update_status["latest_remote_tag"]
    ):
        return jsonify({"success": True})

    # Update failed; include the collected step errors in the response
    return jsonify({
        "success": False,
        "error": "Update incomplete",
        "details": error_log
    }), 500
# Set once the shutdown cleanup has run, so it is never repeated.
_exit_cleanup_done = False


def on_exit():
    """
    Stop running actions and clean up on application shutdown.

    Safe to call more than once: only the first call does any work. This
    matters because the function is registered with atexit and is also
    called explicitly from the `__main__` block's `finally` clause, which
    would otherwise run the cleanup and its 5 second wait twice.
    """
    global _exit_cleanup_done
    if _exit_cleanup_done:
        return
    _exit_cleanup_done = True

    print("Shutting down the application...")
    stop_actions()
    time.sleep(5)
    print("Execution stopped and resources cleaned up.")


# Register the on_exit function
atexit.register(on_exit)
-
-
def _run_server():
    """Connect to the serial device, then serve the app until interrupted."""
    # Auto-connect to serial before accepting requests.
    connect_to_serial()
    server_options = {"debug": False, "host": "0.0.0.0", "port": 8080}
    try:
        app.run(**server_options)
    except KeyboardInterrupt:
        print("Keyboard interrupt received. Shutting down.")
    finally:
        # Clean up however the server loop ended.
        on_exit()


if __name__ == '__main__':
    _run_server()
|