pattern_manager.py 20 KB


  1. import os
  2. import threading
  3. import time
  4. import random
  5. import logging
  6. from datetime import datetime
  7. from tqdm import tqdm
  8. from modules.connection import connection_manager
  9. from modules.core.state import state
  10. from math import pi
  11. import asyncio
  12. import json
  13. # Configure logging
  14. logger = logging.getLogger(__name__)
  15. # Global state
  16. THETA_RHO_DIR = './patterns'
  17. CLEAR_PATTERNS = {
  18. "clear_from_in": "./patterns/clear_from_in.thr",
  19. "clear_from_out": "./patterns/clear_from_out.thr",
  20. "clear_sideway": "./patterns/clear_sideway.thr"
  21. }
  22. os.makedirs(THETA_RHO_DIR, exist_ok=True)
  23. # Create an asyncio Event for pause/resume
  24. pause_event = asyncio.Event()
  25. pause_event.set() # Initially not paused
  26. # Create an asyncio Lock for pattern execution
  27. pattern_lock = asyncio.Lock()
  28. # Progress update task
  29. progress_update_task = None
  30. async def cleanup_pattern_manager():
  31. """Clean up pattern manager resources."""
  32. global progress_update_task, pattern_lock, pause_event
  33. try:
  34. # Cancel progress update task if running
  35. if progress_update_task and not progress_update_task.done():
  36. progress_update_task.cancel()
  37. try:
  38. # Use shield to prevent cancellation of the cleanup itself
  39. await asyncio.shield(progress_update_task)
  40. except asyncio.CancelledError:
  41. pass
  42. except Exception as e:
  43. logger.error(f"Error cancelling progress update task: {e}")
  44. finally:
  45. progress_update_task = None
  46. # Clean up pattern lock
  47. if pattern_lock:
  48. try:
  49. if pattern_lock.locked():
  50. # Release the lock directly instead of manipulating internal state
  51. pattern_lock._locked = False
  52. for waiter in pattern_lock._waiters:
  53. if not waiter.done():
  54. waiter.set_result(True)
  55. except Exception as e:
  56. logger.error(f"Error cleaning up pattern lock: {e}")
  57. pattern_lock = None
  58. # Clean up pause event
  59. if pause_event:
  60. try:
  61. # Set the event and wake up any waiters
  62. pause_event.set()
  63. for waiter in pause_event._waiters:
  64. if not waiter.done():
  65. waiter.set()
  66. except Exception as e:
  67. logger.error(f"Error cleaning up pause event: {e}")
  68. pause_event = None
  69. # Clean up pause condition from state
  70. if state.pause_condition:
  71. try:
  72. with state.pause_condition:
  73. # Wake up all waiting threads
  74. state.pause_condition.notify_all()
  75. # Create a new condition to ensure clean state
  76. state.pause_condition = threading.Condition()
  77. except Exception as e:
  78. logger.error(f"Error cleaning up pause condition: {e}")
  79. # Clear all state variables
  80. state.current_playing_file = None
  81. state.execution_progress = None
  82. state.current_playlist = None
  83. state.current_playlist_index = None
  84. state.playlist_mode = None
  85. state.pause_requested = False
  86. state.stop_requested = True
  87. state.is_clearing = False
  88. # Reset machine state
  89. connection_manager.update_machine_position()
  90. logger.info("Pattern manager resources cleaned up")
  91. except Exception as e:
  92. logger.error(f"Error during pattern manager cleanup: {e}")
  93. raise
  94. def list_theta_rho_files():
  95. files = []
  96. for root, _, filenames in os.walk(THETA_RHO_DIR):
  97. for file in filenames:
  98. relative_path = os.path.relpath(os.path.join(root, file), THETA_RHO_DIR)
  99. files.append(relative_path)
  100. logger.debug(f"Found {len(files)} theta-rho files")
  101. return files
  102. def parse_theta_rho_file(file_path):
  103. """Parse a theta-rho file and return a list of (theta, rho) pairs."""
  104. coordinates = []
  105. try:
  106. logger.debug(f"Parsing theta-rho file: {file_path}")
  107. with open(file_path, 'r') as file:
  108. for line in file:
  109. line = line.strip()
  110. if not line or line.startswith("#"):
  111. continue
  112. try:
  113. theta, rho = map(float, line.split())
  114. coordinates.append((theta, rho))
  115. except ValueError:
  116. logger.warning(f"Skipping invalid line: {line}")
  117. continue
  118. except Exception as e:
  119. logger.error(f"Error reading file: {e}")
  120. return coordinates
  121. # Normalization Step
  122. if coordinates:
  123. first_theta = coordinates[0][0]
  124. normalized = [(theta - first_theta, rho) for theta, rho in coordinates]
  125. coordinates = normalized
  126. logger.debug(f"Parsed {len(coordinates)} coordinates from {file_path}")
  127. return coordinates
  128. def get_clear_pattern_file(clear_pattern_mode, path=None):
  129. """Return a .thr file path based on pattern_name."""
  130. if not clear_pattern_mode or clear_pattern_mode == 'none':
  131. return
  132. logger.info("Clear pattern mode: " + clear_pattern_mode)
  133. if clear_pattern_mode == "random":
  134. return random.choice(list(CLEAR_PATTERNS.values()))
  135. if clear_pattern_mode == 'adaptive':
  136. if not path:
  137. logger.warning("No path provided for adaptive clear pattern")
  138. return random.choice(list(CLEAR_PATTERNS.values()))
  139. coordinates = parse_theta_rho_file(path)
  140. if not coordinates:
  141. logger.warning("No valid coordinates found in file for adaptive clear pattern")
  142. return random.choice(list(CLEAR_PATTERNS.values()))
  143. first_rho = coordinates[0][1]
  144. if first_rho < 0.5:
  145. return CLEAR_PATTERNS['clear_from_out']
  146. else:
  147. return random.choice([CLEAR_PATTERNS['clear_from_in'], CLEAR_PATTERNS['clear_sideway']])
  148. else:
  149. if clear_pattern_mode not in CLEAR_PATTERNS:
  150. logger.warning(f"Invalid clear pattern mode: {clear_pattern_mode}")
  151. return random.choice(list(CLEAR_PATTERNS.values()))
  152. return CLEAR_PATTERNS[clear_pattern_mode]
  153. async def run_theta_rho_file(file_path, is_playlist=False):
  154. """Run a theta-rho file by sending data in optimized batches with tqdm ETA tracking."""
  155. if pattern_lock.locked():
  156. logger.warning("Another pattern is already running. Cannot start a new one.")
  157. return
  158. async with pattern_lock: # This ensures only one pattern can run at a time
  159. # Start progress update task only if not part of a playlist
  160. global progress_update_task
  161. if not is_playlist and not progress_update_task:
  162. progress_update_task = asyncio.create_task(broadcast_progress())
  163. coordinates = parse_theta_rho_file(file_path)
  164. total_coordinates = len(coordinates)
  165. if total_coordinates < 2:
  166. logger.warning("Not enough coordinates for interpolation")
  167. if not is_playlist:
  168. state.current_playing_file = None
  169. state.execution_progress = None
  170. return
  171. state.execution_progress = (0, total_coordinates, None, 0)
  172. # stop actions without resetting the playlist
  173. stop_actions(clear_playlist=False)
  174. state.current_playing_file = file_path
  175. state.stop_requested = False
  176. logger.info(f"Starting pattern execution: {file_path}")
  177. logger.info(f"t: {state.current_theta}, r: {state.current_rho}")
  178. reset_theta()
  179. start_time = time.time()
  180. with tqdm(
  181. total=total_coordinates,
  182. unit="coords",
  183. desc=f"Executing Pattern {file_path}",
  184. dynamic_ncols=True,
  185. disable=False,
  186. mininterval=1.0
  187. ) as pbar:
  188. for i, coordinate in enumerate(coordinates):
  189. theta, rho = coordinate
  190. if state.stop_requested:
  191. logger.info("Execution stopped by user")
  192. break
  193. # Wait for resume if paused
  194. if state.pause_requested:
  195. logger.info("Execution paused...")
  196. await pause_event.wait()
  197. logger.info("Execution resumed...")
  198. move_polar(theta, rho)
  199. # Update progress for all coordinates including the first one
  200. pbar.update(1)
  201. elapsed_time = time.time() - start_time
  202. estimated_remaining_time = (total_coordinates - (i + 1)) / pbar.format_dict['rate'] if pbar.format_dict['rate'] and total_coordinates else 0
  203. state.execution_progress = (i + 1, total_coordinates, estimated_remaining_time, elapsed_time)
  204. # Add a small delay to allow other async operations
  205. await asyncio.sleep(0.001)
  206. # Update progress one last time to show 100%
  207. elapsed_time = time.time() - start_time
  208. state.execution_progress = (total_coordinates, total_coordinates, 0, elapsed_time)
  209. # Give WebSocket a chance to send the final update
  210. await asyncio.sleep(0.1)
  211. connection_manager.check_idle()
  212. # Only clear state if not part of a playlist
  213. if not is_playlist:
  214. state.current_playing_file = None
  215. state.execution_progress = None
  216. logger.info("Pattern execution completed and state cleared")
  217. else:
  218. logger.info("Pattern execution completed, maintaining state for playlist")
  219. # Only cancel progress update task if not part of a playlist
  220. if not is_playlist and progress_update_task:
  221. progress_update_task.cancel()
  222. try:
  223. await progress_update_task
  224. except asyncio.CancelledError:
  225. pass
  226. progress_update_task = None
  227. async def run_theta_rho_files(file_paths, pause_time=0, clear_pattern=None, run_mode="single", shuffle=False):
  228. """Run multiple .thr files in sequence with options."""
  229. state.stop_requested = False
  230. # Set initial playlist state
  231. state.playlist_mode = run_mode
  232. state.current_playlist_index = 0
  233. state.current_playlist = file_paths
  234. # Start progress update task for the playlist
  235. global progress_update_task
  236. if not progress_update_task:
  237. progress_update_task = asyncio.create_task(broadcast_progress())
  238. if shuffle:
  239. random.shuffle(file_paths)
  240. logger.info("Playlist shuffled")
  241. try:
  242. while True:
  243. for idx, path in enumerate(file_paths):
  244. logger.info(f"Upcoming pattern: {path}")
  245. state.current_playlist_index = idx
  246. if state.stop_requested:
  247. logger.info("Execution stopped before starting next pattern")
  248. return
  249. if clear_pattern and clear_pattern != 'none':
  250. if state.stop_requested:
  251. logger.info("Execution stopped before running the next clear pattern")
  252. return
  253. clear_file_path = get_clear_pattern_file(clear_pattern, path)
  254. if clear_file_path: # Only run clear pattern if we got a valid file path
  255. logger.info(f"Running clear pattern: {clear_file_path}")
  256. await run_theta_rho_file(clear_file_path, is_playlist=True)
  257. else:
  258. logger.info("Skipping clear pattern - no valid clear pattern file")
  259. if not state.stop_requested:
  260. logger.info(f"Running pattern {idx + 1} of {len(file_paths)}: {path}")
  261. await run_theta_rho_file(path, is_playlist=True)
  262. if idx < len(file_paths) - 1:
  263. if state.stop_requested:
  264. logger.info("Execution stopped before running the next clear pattern")
  265. return
  266. if pause_time > 0:
  267. logger.info(f"Pausing for {pause_time} seconds")
  268. await asyncio.sleep(pause_time)
  269. if run_mode == "indefinite":
  270. logger.info("Playlist completed. Restarting as per 'indefinite' run mode")
  271. if pause_time > 0:
  272. logger.debug(f"Pausing for {pause_time} seconds before restarting")
  273. await asyncio.sleep(pause_time)
  274. if shuffle:
  275. random.shuffle(file_paths)
  276. logger.info("Playlist reshuffled for the next loop")
  277. continue
  278. else:
  279. logger.info("Playlist completed")
  280. break
  281. finally:
  282. # Clean up progress update task at the end of the playlist
  283. if progress_update_task:
  284. progress_update_task.cancel()
  285. try:
  286. await progress_update_task
  287. except asyncio.CancelledError:
  288. pass
  289. progress_update_task = None
  290. # Clear all state variables
  291. state.current_playing_file = None
  292. state.execution_progress = None
  293. state.current_playlist = None
  294. state.current_playlist_index = None
  295. state.playlist_mode = None
  296. logger.info("All requested patterns completed (or stopped) and state cleared")
  297. def stop_actions(clear_playlist = True):
  298. """Stop all current actions."""
  299. try:
  300. with state.pause_condition:
  301. state.pause_requested = False
  302. state.stop_requested = True
  303. state.current_playing_file = None
  304. state.execution_progress = None
  305. state.is_clearing = False
  306. if clear_playlist:
  307. # Clear playlist state
  308. state.current_playlist = None
  309. state.current_playlist_index = None
  310. state.playlist_mode = None
  311. # Cancel progress update task if we're clearing the playlist
  312. global progress_update_task
  313. if progress_update_task and not progress_update_task.done():
  314. progress_update_task.cancel()
  315. state.pause_condition.notify_all()
  316. connection_manager.update_machine_position()
  317. except Exception as e:
  318. logger.error(f"Error during stop_actions: {e}")
  319. # Ensure we still update machine position even if there's an error
  320. connection_manager.update_machine_position()
  321. def move_polar(theta, rho):
  322. """
  323. This functions take in a pair of theta rho coordinate, compute the distance to travel based on current theta, rho,
  324. and translate the motion to gcode jog command and sent to grbl.
  325. Since having similar steps_per_mm will make x and y axis moves at around the same speed, we have to scale the
  326. x_steps_per_mm and y_steps_per_mm so that they are roughly the same. Here's the range of motion:
  327. X axis (angular): 50mm = 1 revolution
  328. Y axis (radial): 0 => 20mm = theta 0 (center) => 1 (perimeter)
  329. Args:
  330. theta (_type_): _description_
  331. rho (_type_): _description_
  332. """
  333. # Adding soft limit to reduce hardware sound
  334. soft_limit_inner = 0.01
  335. if rho < soft_limit_inner:
  336. rho = soft_limit_inner
  337. soft_limit_outter = 0.015
  338. if rho > (1-soft_limit_outter):
  339. rho = (1-soft_limit_outter)
  340. if state.gear_ratio == 6.25:
  341. x_scaling_factor = 2
  342. y_scaling_factor = 3.7
  343. else:
  344. x_scaling_factor = 2
  345. y_scaling_factor = 5
  346. delta_theta = theta - state.current_theta
  347. delta_rho = rho - state.current_rho
  348. x_increment = delta_theta * 100 / (2 * pi * x_scaling_factor) # Added -1 to reverse direction
  349. y_increment = delta_rho * 100 / y_scaling_factor
  350. x_total_steps = state.x_steps_per_mm * (100/x_scaling_factor)
  351. y_total_steps = state.y_steps_per_mm * (100/y_scaling_factor)
  352. offset = x_increment * (x_total_steps * x_scaling_factor / (state.gear_ratio * y_total_steps * y_scaling_factor))
  353. if state.gear_ratio == 6.25:
  354. y_increment -= offset
  355. else:
  356. y_increment += offset
  357. new_x_abs = state.machine_x + x_increment
  358. new_y_abs = state.machine_y + y_increment
  359. # dynamic_speed = compute_dynamic_speed(rho, max_speed=state.speed)
  360. connection_manager.send_grbl_coordinates(round(new_x_abs, 3), round(new_y_abs,3), state.speed)
  361. state.current_theta = theta
  362. state.current_rho = rho
  363. state.machine_x = new_x_abs
  364. state.machine_y = new_y_abs
  365. def pause_execution():
  366. """Pause pattern execution using asyncio Event."""
  367. logger.info("Pausing pattern execution")
  368. state.pause_requested = True
  369. pause_event.clear() # Clear the event to pause execution
  370. return True
  371. def resume_execution():
  372. """Resume pattern execution using asyncio Event."""
  373. logger.info("Resuming pattern execution")
  374. state.pause_requested = False
  375. pause_event.set() # Set the event to resume execution
  376. return True
  377. def reset_theta():
  378. logger.info('Resetting Theta')
  379. state.current_theta = 0
  380. connection_manager.update_machine_position()
  381. def set_speed(new_speed):
  382. state.speed = new_speed
  383. logger.info(f'Set new state.speed {new_speed}')
  384. def get_status():
  385. """Get the current status of pattern execution."""
  386. status = {
  387. "current_file": state.current_playing_file,
  388. "is_paused": state.pause_requested,
  389. "is_running": bool(state.current_playing_file and not state.stop_requested),
  390. "progress": None,
  391. "playlist": None,
  392. "speed": state.speed
  393. }
  394. # Add playlist information if available
  395. if state.current_playlist and state.current_playlist_index is not None:
  396. next_index = state.current_playlist_index + 1
  397. status["playlist"] = {
  398. "current_index": state.current_playlist_index,
  399. "total_files": len(state.current_playlist),
  400. "mode": state.playlist_mode,
  401. "next_file": state.current_playlist[next_index] if next_index < len(state.current_playlist) else (state.current_playlist[0] if state.playlist_mode == "loop" else None)
  402. }
  403. if state.execution_progress:
  404. current, total, remaining_time, elapsed_time = state.execution_progress
  405. status["progress"] = {
  406. "current": current,
  407. "total": total,
  408. "remaining_time": remaining_time,
  409. "elapsed_time": elapsed_time,
  410. "percentage": (current / total * 100) if total > 0 else 0
  411. }
  412. return status
  413. async def broadcast_progress():
  414. """Background task to broadcast progress updates."""
  415. from app import active_status_connections
  416. while True:
  417. # Send status updates regardless of pattern_lock state
  418. status = get_status()
  419. disconnected = set()
  420. # Create a copy of the set for iteration
  421. active_connections = active_status_connections.copy()
  422. for websocket in active_connections:
  423. try:
  424. await websocket.send_json(status)
  425. except Exception as e:
  426. logger.warning(f"Failed to send status update: {e}")
  427. disconnected.add(websocket)
  428. # Clean up disconnected clients
  429. if disconnected:
  430. active_status_connections.difference_update(disconnected)
  431. # Check if we should stop broadcasting
  432. if not state.current_playlist:
  433. # If no playlist, only stop if no pattern is being executed
  434. if not pattern_lock.locked():
  435. logger.info("No playlist or pattern running, stopping broadcast")
  436. break
  437. # Wait before next update
  438. await asyncio.sleep(1)