|
@@ -64,15 +64,26 @@ def log_execution_time(pattern_name: str, table_type: str, speed: int, actual_ti
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
logger.error(f"Failed to log execution time: {e}")
|
|
logger.error(f"Failed to log execution time: {e}")
|
|
|
|
|
|
|
|
-# Create an asyncio Event for pause/resume
|
|
|
|
|
-pause_event = asyncio.Event()
|
|
|
|
|
-pause_event.set() # Initially not paused
|
|
|
|
|
|
|
+# Asyncio primitives - initialized lazily to avoid event loop issues
|
|
|
|
|
+# These must be created in the context of the running event loop
|
|
|
|
|
+pause_event: Optional[asyncio.Event] = None
|
|
|
|
|
+pattern_lock: Optional[asyncio.Lock] = None
|
|
|
|
|
+progress_update_task = None
|
|
|
|
|
|
|
|
-# Create an asyncio Lock for pattern execution
|
|
|
|
|
-pattern_lock = asyncio.Lock()
|
|
|
|
|
|
|
+def get_pause_event() -> asyncio.Event:
|
|
|
|
|
+ """Get or create the pause event in the current event loop."""
|
|
|
|
|
+ global pause_event
|
|
|
|
|
+ if pause_event is None:
|
|
|
|
|
+ pause_event = asyncio.Event()
|
|
|
|
|
+ pause_event.set() # Initially not paused
|
|
|
|
|
+ return pause_event
|
|
|
|
|
|
|
|
-# Progress update task
|
|
|
|
|
-progress_update_task = None
|
|
|
|
|
|
|
+def get_pattern_lock() -> asyncio.Lock:
|
|
|
|
|
+ """Get or create the pattern lock in the current event loop."""
|
|
|
|
|
+ global pattern_lock
|
|
|
|
|
+ if pattern_lock is None:
|
|
|
|
|
+ pattern_lock = asyncio.Lock()
|
|
|
|
|
+ return pattern_lock
|
|
|
|
|
|
|
|
# Cache timezone at module level - read once per session (cleared when user changes timezone)
|
|
# Cache timezone at module level - read once per session (cleared when user changes timezone)
|
|
|
_cached_timezone = None
|
|
_cached_timezone = None
|
|
@@ -430,12 +441,13 @@ async def cleanup_pattern_manager():
|
|
|
|
|
|
|
|
# Clean up pattern lock - wait for it to be released naturally, don't force release
|
|
# Clean up pattern lock - wait for it to be released naturally, don't force release
|
|
|
# Force releasing an asyncio.Lock can corrupt internal state if held by another coroutine
|
|
# Force releasing an asyncio.Lock can corrupt internal state if held by another coroutine
|
|
|
- if pattern_lock and pattern_lock.locked():
|
|
|
|
|
|
|
+ current_lock = pattern_lock
|
|
|
|
|
+ if current_lock and current_lock.locked():
|
|
|
logger.info("Pattern lock is held, waiting for release (max 5s)...")
|
|
logger.info("Pattern lock is held, waiting for release (max 5s)...")
|
|
|
try:
|
|
try:
|
|
|
# Wait with timeout for the lock to become available
|
|
# Wait with timeout for the lock to become available
|
|
|
async with asyncio.timeout(5.0):
|
|
async with asyncio.timeout(5.0):
|
|
|
- async with pattern_lock:
|
|
|
|
|
|
|
+ async with current_lock:
|
|
|
pass # Lock acquired means previous holder released it
|
|
pass # Lock acquired means previous holder released it
|
|
|
logger.info("Pattern lock released normally")
|
|
logger.info("Pattern lock released normally")
|
|
|
except asyncio.TimeoutError:
|
|
except asyncio.TimeoutError:
|
|
@@ -444,9 +456,10 @@ async def cleanup_pattern_manager():
|
|
|
logger.error(f"Error waiting for pattern lock: {e}")
|
|
logger.error(f"Error waiting for pattern lock: {e}")
|
|
|
|
|
|
|
|
# Clean up pause event - wake up any waiting tasks, then create fresh event
|
|
# Clean up pause event - wake up any waiting tasks, then create fresh event
|
|
|
- if pause_event:
|
|
|
|
|
|
|
+ current_event = pause_event
|
|
|
|
|
+ if current_event:
|
|
|
try:
|
|
try:
|
|
|
- pause_event.set() # Wake up any waiting tasks
|
|
|
|
|
|
|
+ current_event.set() # Wake up any waiting tasks
|
|
|
except Exception as e:
|
|
except Exception as e:
|
|
|
logger.error(f"Error setting pause event: {e}")
|
|
logger.error(f"Error setting pause event: {e}")
|
|
|
|
|
|
|
@@ -688,11 +701,12 @@ def is_clear_pattern(file_path):
|
|
|
|
|
|
|
|
async def run_theta_rho_file(file_path, is_playlist=False):
|
|
async def run_theta_rho_file(file_path, is_playlist=False):
|
|
|
"""Run a theta-rho file by sending data in optimized batches with tqdm ETA tracking."""
|
|
"""Run a theta-rho file by sending data in optimized batches with tqdm ETA tracking."""
|
|
|
- if pattern_lock.locked():
|
|
|
|
|
|
|
+ lock = get_pattern_lock()
|
|
|
|
|
+ if lock.locked():
|
|
|
logger.warning("Another pattern is already running. Cannot start a new one.")
|
|
logger.warning("Another pattern is already running. Cannot start a new one.")
|
|
|
return
|
|
return
|
|
|
|
|
|
|
|
- async with pattern_lock: # This ensures only one pattern can run at a time
|
|
|
|
|
|
|
+ async with lock: # This ensures only one pattern can run at a time
|
|
|
# Start progress update task only if not part of a playlist
|
|
# Start progress update task only if not part of a playlist
|
|
|
global progress_update_task
|
|
global progress_update_task
|
|
|
if not is_playlist and not progress_update_task:
|
|
if not is_playlist and not progress_update_task:
|
|
@@ -798,7 +812,7 @@ async def run_theta_rho_file(file_path, is_playlist=False):
|
|
|
if state.pause_requested:
|
|
if state.pause_requested:
|
|
|
# For manual pause, wait directly on the event for immediate response
|
|
# For manual pause, wait directly on the event for immediate response
|
|
|
# The while loop re-checks state after wake to handle rapid pause/resume
|
|
# The while loop re-checks state after wake to handle rapid pause/resume
|
|
|
- await pause_event.wait()
|
|
|
|
|
|
|
+ await get_pause_event().wait()
|
|
|
else:
|
|
else:
|
|
|
# For scheduled pause only, check periodically
|
|
# For scheduled pause only, check periodically
|
|
|
await asyncio.sleep(1)
|
|
await asyncio.sleep(1)
|
|
@@ -1102,10 +1116,11 @@ async def stop_actions(clear_playlist = True, wait_for_lock = True):
|
|
|
# Wait for the pattern lock to be released before continuing
|
|
# Wait for the pattern lock to be released before continuing
|
|
|
# This ensures that when stop_actions completes, the pattern has fully stopped
|
|
# This ensures that when stop_actions completes, the pattern has fully stopped
|
|
|
# Skip this if called from within pattern execution to avoid deadlock
|
|
# Skip this if called from within pattern execution to avoid deadlock
|
|
|
- if wait_for_lock and pattern_lock.locked():
|
|
|
|
|
|
|
+ lock = get_pattern_lock()
|
|
|
|
|
+ if wait_for_lock and lock.locked():
|
|
|
logger.info("Waiting for pattern to fully stop...")
|
|
logger.info("Waiting for pattern to fully stop...")
|
|
|
# Acquire and immediately release the lock to ensure the pattern has exited
|
|
# Acquire and immediately release the lock to ensure the pattern has exited
|
|
|
- async with pattern_lock:
|
|
|
|
|
|
|
+ async with lock:
|
|
|
logger.info("Pattern lock acquired - pattern has fully stopped")
|
|
logger.info("Pattern lock acquired - pattern has fully stopped")
|
|
|
|
|
|
|
|
# Call async function directly since we're in async context
|
|
# Call async function directly since we're in async context
|
|
@@ -1155,14 +1170,14 @@ def pause_execution():
|
|
|
"""Pause pattern execution using asyncio Event."""
|
|
"""Pause pattern execution using asyncio Event."""
|
|
|
logger.info("Pausing pattern execution")
|
|
logger.info("Pausing pattern execution")
|
|
|
state.pause_requested = True
|
|
state.pause_requested = True
|
|
|
- pause_event.clear() # Clear the event to pause execution
|
|
|
|
|
|
|
+ get_pause_event().clear() # Clear the event to pause execution
|
|
|
return True
|
|
return True
|
|
|
|
|
|
|
|
def resume_execution():
|
|
def resume_execution():
|
|
|
"""Resume pattern execution using asyncio Event."""
|
|
"""Resume pattern execution using asyncio Event."""
|
|
|
logger.info("Resuming pattern execution")
|
|
logger.info("Resuming pattern execution")
|
|
|
state.pause_requested = False
|
|
state.pause_requested = False
|
|
|
- pause_event.set() # Set the event to resume execution
|
|
|
|
|
|
|
+ get_pause_event().set() # Set the event to resume execution
|
|
|
return True
|
|
return True
|
|
|
|
|
|
|
|
async def reset_theta():
|
|
async def reset_theta():
|
|
@@ -1228,7 +1243,7 @@ async def broadcast_progress():
|
|
|
# Check if we should stop broadcasting
|
|
# Check if we should stop broadcasting
|
|
|
if not state.current_playlist:
|
|
if not state.current_playlist:
|
|
|
# If no playlist, only stop if no pattern is being executed
|
|
# If no playlist, only stop if no pattern is being executed
|
|
|
- if not pattern_lock.locked():
|
|
|
|
|
|
|
+ if not get_pattern_lock().locked():
|
|
|
logger.info("No playlist or pattern running, stopping broadcast")
|
|
logger.info("No playlist or pattern running, stopping broadcast")
|
|
|
break
|
|
break
|
|
|
|
|
|