import os import threading import time import random import logging from datetime import datetime from tqdm import tqdm from dune_weaver_flask.modules.serial import serial_manager from dune_weaver_flask.modules.core.state import state from math import pi # Configure logging logger = logging.getLogger(__name__) # Global state THETA_RHO_DIR = './patterns' CLEAR_PATTERNS = { "clear_from_in": "./patterns/clear_from_in.thr", "clear_from_out": "./patterns/clear_from_out.thr", "clear_sideway": "./patterns/clear_sideway.thr" } os.makedirs(THETA_RHO_DIR, exist_ok=True) current_playlist = [] current_playing_index = None def list_theta_rho_files(): files = [] for root, _, filenames in os.walk(THETA_RHO_DIR): for file in filenames: relative_path = os.path.relpath(os.path.join(root, file), THETA_RHO_DIR) files.append(relative_path) logger.debug(f"Found {len(files)} theta-rho files") return files def parse_theta_rho_file(file_path): """Parse a theta-rho file and return a list of (theta, rho) pairs.""" coordinates = [] try: logger.debug(f"Parsing theta-rho file: {file_path}") with open(file_path, 'r') as file: for line in file: line = line.strip() if not line or line.startswith("#"): continue try: theta, rho = map(float, line.split()) coordinates.append((theta, rho)) except ValueError: logger.warning(f"Skipping invalid line: {line}") continue except Exception as e: logger.error(f"Error reading file: {e}") return coordinates # Normalization Step if coordinates: first_theta = coordinates[0][0] normalized = [(theta - first_theta, rho) for theta, rho in coordinates] coordinates = normalized logger.debug(f"Parsed {len(coordinates)} coordinates from {file_path}") return coordinates def get_clear_pattern_file(clear_pattern_mode, path=None): """Return a .thr file path based on pattern_name.""" if not clear_pattern_mode or clear_pattern_mode == 'none': return logger.info("Clear pattern mode: " + clear_pattern_mode) if clear_pattern_mode == "random": return random.choice(list(CLEAR_PATTERNS.values())) if clear_pattern_mode == 'adaptive': _, first_rho = parse_theta_rho_file(path)[0] if first_rho < 0.5: return CLEAR_PATTERNS['clear_from_out'] else: return random.choice([CLEAR_PATTERNS['clear_from_in'], CLEAR_PATTERNS['clear_sideway']]) else: return CLEAR_PATTERNS[clear_pattern_mode] def schedule_checker(schedule_hours): """Pauses/resumes execution based on a given time range.""" if not schedule_hours: return start_time, end_time = schedule_hours now = datetime.now().time() if start_time <= now < end_time: if state.pause_requested: logger.info("Starting execution: Within schedule") serial_manager.update_machine_position() state.pause_requested = False with state.pause_condition: state.pause_condition.notify_all() else: if not state.pause_requested: logger.info("Pausing execution: Outside schedule") state.pause_requested = True serial_manager.update_machine_position() threading.Thread(target=wait_for_start_time, args=(schedule_hours,), daemon=True).start() def wait_for_start_time(schedule_hours): """Keep checking every 30 seconds if the time is within the schedule to resume execution.""" start_time, end_time = schedule_hours while state.pause_requested: now = datetime.now().time() if start_time <= now < end_time: logger.info("Resuming execution: Within schedule") state.pause_requested = False with state.pause_condition: state.pause_condition.notify_all() break else: time.sleep(30) def interpolate_path(theta, rho): # Adding soft limit to reduce hardware sound soft_limit_threshold = 0.01 if rho < soft_limit_threshold: rho = soft_limit_threshold elif rho > (1-soft_limit_threshold): rho = (1-soft_limit_threshold) delta_theta = theta - state.current_theta delta_rho = rho - state.current_rho # x_increment = delta_theta / (2 * pi) * 100 x_increment = delta_theta / pi * 100 y_increment = delta_rho * 100/5 offset = x_increment * (1600/5750/5) # Total angular steps = 16000 / gear ratio = 10 / angular steps = 5750 y_increment += offset new_x_abs = state.machine_x + x_increment new_y_abs = state.machine_y + y_increment # dynamic_speed = compute_dynamic_speed(rho, max_speed=state.speed) serial_manager.send_grbl_coordinates(round(new_x_abs, 3), round(new_y_abs,3), state.speed) state.current_theta = theta state.current_rho = rho state.machine_x = new_x_abs state.machine_y = new_y_abs def reset_theta(): logger.info('Resetting Theta') state.current_theta = 0 serial_manager.update_machine_position() def set_speed(new_speed): state.speed = new_speed logger.info(f'Set new state.speed {new_speed}') def run_theta_rho_file(file_path, schedule_hours=None): """Run a theta-rho file by sending data in optimized batches with tqdm ETA tracking.""" if not file_path: return coordinates = parse_theta_rho_file(file_path) total_coordinates = len(coordinates) if total_coordinates < 2: logger.warning("Not enough coordinates for interpolation") state.current_playing_file = None state.execution_progress = None return state.execution_progress = (0, total_coordinates, None) stop_actions() with serial_manager.serial_lock: state.current_playing_file = file_path state.execution_progress = (0, 0, None) state.stop_requested = False logger.info(f"Starting pattern execution: {file_path}") logger.info(f"t: {state.current_theta}, r: {state.current_rho}") reset_theta() with tqdm(total=total_coordinates, unit="coords", desc=f"Executing Pattern {file_path}", dynamic_ncols=True, disable=None) as pbar: for i, coordinate in enumerate(coordinates): theta, rho = coordinate if state.stop_requested: logger.info("Execution stopped by user after completing the current batch") break with state.pause_condition: while state.pause_requested: logger.info("Execution paused...") state.pause_condition.wait() schedule_checker(schedule_hours) interpolate_path(theta, rho) if i != 0: pbar.update(1) estimated_remaining_time = pbar.format_dict['elapsed'] / i * total_coordinates state.execution_progress = (i, total_coordinates, estimated_remaining_time) serial_manager.check_idle() state.current_playing_file = None state.execution_progress = None logger.info("Pattern execution completed") def run_theta_rho_files(file_paths, pause_time=0, clear_pattern=None, run_mode="single", shuffle=False, schedule_hours=None): """Run multiple .thr files in sequence with options.""" global current_playing_index, current_playlist state.stop_requested = False if shuffle: random.shuffle(file_paths) logger.info("Playlist shuffled") current_playlist = file_paths while True: for idx, path in enumerate(file_paths): logger.info(f"Upcoming pattern: {path}") logger.info(idx) current_playing_index = idx schedule_checker(schedule_hours) if state.stop_requested: logger.info("Execution stopped before starting next pattern") return if clear_pattern: if state.stop_requested: logger.info("Execution stopped before running the next clear pattern") return clear_file_path = get_clear_pattern_file(clear_pattern, path) logger.info(f"Running clear pattern: {clear_file_path}") run_theta_rho_file(clear_file_path, schedule_hours) if not state.stop_requested: logger.info(f"Running pattern {idx + 1} of {len(file_paths)}: {path}") run_theta_rho_file(path, schedule_hours) if idx < len(file_paths) - 1: if state.stop_requested: logger.info("Execution stopped before running the next clear pattern") return if pause_time > 0: logger.debug(f"Pausing for {pause_time} seconds") time.sleep(pause_time) if run_mode == "indefinite": logger.info("Playlist completed. Restarting as per 'indefinite' run mode") if pause_time > 0: logger.debug(f"Pausing for {pause_time} seconds before restarting") time.sleep(pause_time) if shuffle: random.shuffle(file_paths) logger.info("Playlist reshuffled for the next loop") continue else: logger.info("Playlist completed") break logger.info("All requested patterns completed (or stopped)") def stop_actions(): """Stop all current pattern execution.""" with state.pause_condition: state.pause_requested = False state.stop_requested = True current_playing_index = None current_playlist = None state.is_clearing = False state.current_playing_file = None state.execution_progress = None serial_manager.update_machine_position() def get_status(): """Get the current execution status.""" # Update state.is_clearing based on current file if state.current_playing_file in CLEAR_PATTERNS.values(): state.is_clearing = True else: state.is_clearing = False return { "ser_port": serial_manager.get_port(), "stop_requested": state.stop_requested, "pause_requested": state.pause_requested, "current_playing_file": state.current_playing_file, "execution_progress": state.execution_progress, "current_playing_index": current_playing_index, "current_playlist": current_playlist, "is_clearing": state.is_clearing }