@@ -27,11 +27,21 @@ import json
import base64
import time
import argparse
+from concurrent.futures import ProcessPoolExecutor
+import multiprocessing

# Get log level from environment variable, default to INFO
log_level_str = os.getenv('LOG_LEVEL', 'INFO').upper()
log_level = getattr(logging, log_level_str, logging.INFO)

+# Create a process pool for CPU-intensive tasks.
+# Limit to a reasonable number of workers for embedded systems:
+# at most 3 (leaving one core for motion control), at least 1.
+cpu_count = multiprocessing.cpu_count()
+process_pool_size = min(3, max(1, cpu_count - 1))
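+# e.g. 1 core -> 1 worker, 2 cores -> 1 worker, 4 or more cores -> 3 workers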
+process_pool = None  # Will be initialized in lifespan
+
logging.basicConfig(
    level=log_level,
    format='%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s',
@@ -65,6 +74,11 @@ async def lifespan(app: FastAPI):
    # Register signal handlers
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)
+
+    # Initialize process pool for CPU-intensive tasks
+    global process_pool
+    process_pool = ProcessPoolExecutor(max_workers=process_pool_size)
+    logger.info(f"Initialized process pool with {process_pool_size} workers (detected {cpu_count} cores total)")

    try:
        connection_manager.connect_device()
@@ -116,6 +130,14 @@ async def lifespan(app: FastAPI):

    yield  # This separates startup from shutdown code

+    # Shutdown
+    logger.info("Shutting down Dune Weaver application...")
+
+    # Shut down the process pool
+    if process_pool:
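+        # wait=True blocks until any in-flight parse jobs have finished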
+        process_pool.shutdown(wait=True)
+        logger.info("Process pool shutdown complete")

app = FastAPI(lifespan=lifespan)
templates = Jinja2Templates(directory="templates")
@@ -133,6 +154,19 @@ class auto_playModeRequest(BaseModel):
    clear_pattern: Optional[str] = "adaptive"
    shuffle: Optional[bool] = False

+class TimeSlot(BaseModel):
+    start_time: str  # HH:MM format
+    end_time: str  # HH:MM format
+    days: str  # "daily", "weekdays", "weekends", or "custom"
+    custom_days: Optional[List[str]] = []  # ["monday", "tuesday", etc.]
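+    # Example slot: {"start_time": "22:00", "end_time": "07:00",
+    #                "days": "custom", "custom_days": ["saturday", "sunday"]}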
+
+class ScheduledPauseRequest(BaseModel):
+    enabled: bool
+    time_slots: List[TimeSlot] = []
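+    # Example body: {"enabled": true, "time_slots": [<TimeSlot>, ...]}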
+
class CoordinateRequest(BaseModel):
    theta: float
    rho: float
@@ -218,11 +249,12 @@ async def broadcast_status_update(status: dict):

@app.websocket("/ws/cache-progress")
async def websocket_cache_progress_endpoint(websocket: WebSocket):
+    from modules.core.cache_manager import get_cache_progress
+
    await websocket.accept()
    active_cache_progress_connections.add(websocket)
    try:
        while True:
-            from modules.core.cache_manager import get_cache_progress
            progress = get_cache_progress()
            try:
                await websocket.send_json({
@@ -233,7 +265,7 @@ async def websocket_cache_progress_endpoint(websocket: WebSocket):
                if "close message has been sent" in str(e):
                    break
                raise
-            await asyncio.sleep(0.5)  # Update every 500ms
+            await asyncio.sleep(1.0)  # Update every second (halved frequency to cut overhead)
    except WebSocketDisconnect:
        pass
    finally:
@@ -283,10 +315,71 @@ async def set_auto_play_mode(request: auto_playModeRequest):
    logger.info(f"auto_play mode {'enabled' if request.enabled else 'disabled'}, playlist: {request.playlist}")
    return {"success": True, "message": "auto_play mode settings updated"}

+@app.get("/api/scheduled-pause")
+async def get_scheduled_pause():
+    """Get current Still Sands settings."""
+    return {
+        "enabled": state.scheduled_pause_enabled,
+        "time_slots": state.scheduled_pause_time_slots
+    }
+
+@app.post("/api/scheduled-pause")
+async def set_scheduled_pause(request: ScheduledPauseRequest):
+    """Update Still Sands settings."""
+    try:
+        # Validate time slots
+        for i, slot in enumerate(request.time_slots):
+            # Validate time format (HH:MM); strptime raises ValueError on bad input
+            try:
+                datetime.strptime(slot.start_time, "%H:%M")
+                datetime.strptime(slot.end_time, "%H:%M")
+            except ValueError:
+                raise HTTPException(
+                    status_code=400,
+                    detail=f"Invalid time format in slot {i+1}. Use HH:MM format."
+                )
+
+            # Validate days setting
+            if slot.days not in ["daily", "weekdays", "weekends", "custom"]:
+                raise HTTPException(
+                    status_code=400,
+                    detail=f"Invalid days setting in slot {i+1}. Must be 'daily', 'weekdays', 'weekends', or 'custom'."
+                )
+
+            # Validate custom days if applicable
+            if slot.days == "custom":
+                if not slot.custom_days or len(slot.custom_days) == 0:
+                    raise HTTPException(
+                        status_code=400,
+                        detail=f"Custom days must be specified for slot {i+1} when days is set to 'custom'."
+                    )
+
+                valid_days = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]
+                for day in slot.custom_days:
+                    if day not in valid_days:
+                        raise HTTPException(
+                            status_code=400,
+                            detail=f"Invalid day '{day}' in slot {i+1}. Valid days are: {', '.join(valid_days)}"
+                        )
+
+        # Update state
+        state.scheduled_pause_enabled = request.enabled
+        state.scheduled_pause_time_slots = [slot.model_dump() for slot in request.time_slots]
+        state.save()
+
+        logger.info(f"Still Sands {'enabled' if request.enabled else 'disabled'} with {len(request.time_slots)} time slots")
+        return {"success": True, "message": "Still Sands settings updated"}
+
+    except HTTPException:
+        raise
+    except Exception as e:
+        logger.error(f"Error updating Still Sands settings: {str(e)}")
+        raise HTTPException(status_code=500, detail=f"Failed to update Still Sands settings: {str(e)}")
+
@app.get("/list_serial_ports")
async def list_ports():
    logger.debug("Listing available serial ports")
-    return connection_manager.list_serial_ports()
+    return await asyncio.to_thread(connection_manager.list_serial_ports)

@app.post("/connect")
async def connect(request: ConnectRequest):
@@ -332,7 +425,8 @@ async def restart(request: ConnectRequest):
@app.get("/list_theta_rho_files")
async def list_theta_rho_files():
    logger.debug("Listing theta-rho files")
-    files = pattern_manager.list_theta_rho_files()
+    # Run the blocking file system operation in a thread pool
+    files = await asyncio.to_thread(pattern_manager.list_theta_rho_files)
    return sorted(files)

@app.get("/list_theta_rho_files_with_metadata")
@@ -345,9 +439,10 @@ async def list_theta_rho_files_with_metadata():
    import asyncio
    from concurrent.futures import ThreadPoolExecutor

-    files = pattern_manager.list_theta_rho_files()
+    # Run the blocking file listing in a thread
+    files = await asyncio.to_thread(pattern_manager.list_theta_rho_files)
    files_with_metadata = []
-
+
    # Use ThreadPoolExecutor for I/O-bound operations
    executor = ThreadPoolExecutor(max_workers=4)

@@ -400,18 +495,78 @@ async def list_theta_rho_files_with_metadata():
            'coordinates_count': 0
        }

-    # Process files in parallel using asyncio
-    loop = asyncio.get_event_loop()
-    tasks = [loop.run_in_executor(executor, process_file, file_path) for file_path in files]
-
-    # Process results as they complete
-    for task in asyncio.as_completed(tasks):
-        try:
-            result = await task
-            files_with_metadata.append(result)
-        except Exception as e:
-            logger.error(f"Error processing file: {str(e)}")
-
+    # Load the entire metadata cache at once (async)
+    # This is much faster than 1000+ individual metadata lookups
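+    # Expected cache shape: {"data": {"<file_path>": {"mtime": ..., "metadata": {...}}}}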
+    try:
+        metadata_cache_path = "metadata_cache.json"
+        # Read the cache in a worker thread to avoid blocking the event loop
+        def _read_metadata_cache():
+            with open(metadata_cache_path, 'r') as f:
+                return json.load(f)
+        cache_data = await asyncio.to_thread(_read_metadata_cache)
+        cache_dict = cache_data.get('data', {})
+        logger.debug(f"Loaded metadata cache with {len(cache_dict)} entries")
+
+        # Process all files using cached data only
+        for file_path in files:
+            try:
+                # Extract category from path
+                path_parts = file_path.split('/')
+                category = '/'.join(path_parts[:-1]) if len(path_parts) > 1 else 'root'
+
+                # Get file name without extension
+                file_name = os.path.splitext(os.path.basename(file_path))[0]
+
+                # Get metadata from cache
+                cached_entry = cache_dict.get(file_path, {})
+                if isinstance(cached_entry, dict) and 'metadata' in cached_entry:
+                    metadata = cached_entry['metadata']
+                    coords_count = metadata.get('total_coordinates', 0)
+                    date_modified = cached_entry.get('mtime', 0)
+                else:
+                    coords_count = 0
+                    date_modified = 0
+
+                files_with_metadata.append({
+                    'path': file_path,
+                    'name': file_name,
+                    'category': category,
+                    'date_modified': date_modified,
+                    'coordinates_count': coords_count
+                })
+
+            except Exception as e:
+                logger.warning(f"Error processing {file_path}: {e}")
+                # Include the file with minimal info if processing fails
+                path_parts = file_path.split('/')
+                category = '/'.join(path_parts[:-1]) if len(path_parts) > 1 else 'root'
+                files_with_metadata.append({
+                    'path': file_path,
+                    'name': os.path.splitext(os.path.basename(file_path))[0],
+                    'category': category,
+                    'date_modified': 0,
+                    'coordinates_count': 0
+                })
+
+    except Exception as e:
+        logger.error(f"Failed to load metadata cache, falling back to slow method: {e}")
+        # Fall back to the original per-file method if cache loading fails
+        # Create executor tasks only in this fallback path
+        loop = asyncio.get_running_loop()
+        tasks = [loop.run_in_executor(executor, process_file, file_path) for file_path in files]
+
+        for task in asyncio.as_completed(tasks):
+            try:
+                result = await task
+                files_with_metadata.append(result)
+            except Exception as task_error:
+                logger.error(f"Error processing file: {str(task_error)}")
+
+    # Clean up executor
+    executor.shutdown(wait=False)
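+    # wait=False: worker threads wind down in the background rather than delaying the response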
+
    return files_with_metadata

@app.post("/upload_theta_rho")
@@ -472,11 +623,16 @@ async def get_theta_rho_coordinates(request: GetCoordinatesRequest):
    file_name = normalize_file_path(request.file_name)
    file_path = os.path.join(THETA_RHO_DIR, file_name)

-    if not os.path.exists(file_path):
+    # Check file existence asynchronously
+    exists = await asyncio.to_thread(os.path.exists, file_path)
+    if not exists:
        raise HTTPException(status_code=404, detail=f"File {file_name} not found")
-
-    # Parse the theta-rho file
-    coordinates = parse_theta_rho_file(file_path)
+
+    # Parse the theta-rho file in a separate process for CPU-intensive work
+    # This prevents blocking the motion control thread
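+    # (If process_pool were still None here, run_in_executor would fall back to asyncio's default thread pool)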
+    loop = asyncio.get_running_loop()
+    coordinates = await loop.run_in_executor(process_pool, parse_theta_rho_file, file_path)

    if not coordinates:
        raise HTTPException(status_code=400, detail="No valid coordinates found in file")
@@ -546,7 +701,7 @@ async def stop_execution():
    if not (state.conn.is_connected() if state.conn else False):
        logger.warning("Attempted to stop without a connection")
        raise HTTPException(status_code=400, detail="Connection not established")
-    pattern_manager.stop_actions()
+    await pattern_manager.stop_actions()
    return {"success": True}

@app.post("/send_home")
@@ -594,18 +749,21 @@ async def delete_theta_rho_file(request: DeleteFileRequest):
    # Normalize file path for cross-platform compatibility
    normalized_file_name = normalize_file_path(request.file_name)
    file_path = os.path.join(pattern_manager.THETA_RHO_DIR, normalized_file_name)
-    if not os.path.exists(file_path):
+
+    # Check file existence asynchronously
+    exists = await asyncio.to_thread(os.path.exists, file_path)
+    if not exists:
        logger.error(f"Attempted to delete non-existent file: {file_path}")
        raise HTTPException(status_code=404, detail="File not found")

    try:
-        # Delete the pattern file
-        os.remove(file_path)
+        # Delete the pattern file asynchronously
+        await asyncio.to_thread(os.remove, file_path)
        logger.info(f"Successfully deleted theta-rho file: {request.file_name}")

-        # Clean up cached preview image and metadata
+        # Clean up cached preview image and metadata asynchronously
        from modules.core.cache_manager import delete_pattern_cache
-        cache_cleanup_success = delete_pattern_cache(normalized_file_name)
+        cache_cleanup_success = await asyncio.to_thread(delete_pattern_cache, normalized_file_name)
        if cache_cleanup_success:
            logger.info(f"Successfully cleaned up cache for {request.file_name}")
        else:
@@ -624,8 +782,8 @@ async def move_to_center():
            raise HTTPException(status_code=400, detail="Connection not established")

        logger.info("Moving device to center position")
-        pattern_manager.reset_theta()
-        pattern_manager.move_polar(0, 0)
+        await pattern_manager.reset_theta()
+        await pattern_manager.move_polar(0, 0)
        return {"success": True}
    except Exception as e:
        logger.error(f"Failed to move to center: {str(e)}")
@@ -637,8 +795,8 @@ async def move_to_perimeter():
        if not (state.conn.is_connected() if state.conn else False):
            logger.warning("Attempted to move to perimeter without a connection")
            raise HTTPException(status_code=400, detail="Connection not established")
-        pattern_manager.reset_theta()
-        pattern_manager.move_polar(0, 1)
+        await pattern_manager.reset_theta()
+        await pattern_manager.move_polar(0, 1)
        return {"success": True}
    except Exception as e:
        logger.error(f"Failed to move to perimeter: {str(e)}")
@@ -654,18 +812,24 @@ async def preview_thr(request: DeleteFileRequest):
    normalized_file_name = normalize_file_path(request.file_name)
    # Construct the full path to the pattern file to check existence
    pattern_file_path = os.path.join(pattern_manager.THETA_RHO_DIR, normalized_file_name)
-    if not os.path.exists(pattern_file_path):
+
+    # Check file existence asynchronously
+    exists = await asyncio.to_thread(os.path.exists, pattern_file_path)
+    if not exists:
        logger.error(f"Attempted to preview non-existent pattern file: {pattern_file_path}")
        raise HTTPException(status_code=404, detail="Pattern file not found")

    try:
        cache_path = get_cache_path(normalized_file_name)
-
-        if not os.path.exists(cache_path):
+
+        # Check cache existence asynchronously
+        cache_exists = await asyncio.to_thread(os.path.exists, cache_path)
+        if not cache_exists:
            logger.info(f"Cache miss for {request.file_name}. Generating preview...")
            # Attempt to generate the preview if it's missing
            success = await generate_image_preview(normalized_file_name)
-            if not success or not os.path.exists(cache_path):
+            cache_exists_after = await asyncio.to_thread(os.path.exists, cache_path)
+            if not success or not cache_exists_after:
                logger.error(f"Failed to generate or find preview for {request.file_name} after attempting generation.")
                raise HTTPException(status_code=500, detail="Failed to generate preview image.")

@@ -747,7 +911,7 @@ async def send_coordinate(request: CoordinateRequest):

    try:
        logger.debug(f"Sending coordinate: theta={request.theta}, rho={request.rho}")
-        pattern_manager.move_polar(request.theta, request.rho)
+        await pattern_manager.move_polar(request.theta, request.rho)
        return {"success": True}
    except Exception as e:
        logger.error(f"Failed to send coordinate: {str(e)}")
@@ -1043,27 +1207,31 @@ async def preview_thr_batch(request: dict):
        "Content-Type": "application/json"
    }

-    results = {}
-    for file_name in file_names:
+    async def process_single_file(file_name):
+        """Process a single file and return its preview data."""
        t1 = time.time()
        try:
            # Normalize file path for cross-platform compatibility
            normalized_file_name = normalize_file_path(file_name)
            pattern_file_path = os.path.join(pattern_manager.THETA_RHO_DIR, normalized_file_name)
-            if not os.path.exists(pattern_file_path):
+
+            # Check file existence asynchronously
+            exists = await asyncio.to_thread(os.path.exists, pattern_file_path)
+            if not exists:
                logger.warning(f"Pattern file not found: {pattern_file_path}")
-                results[file_name] = {"error": "Pattern file not found"}
-                continue
+                return file_name, {"error": "Pattern file not found"}

            cache_path = get_cache_path(normalized_file_name)
-
-            if not os.path.exists(cache_path):
+
+            # Check cache existence asynchronously
+            cache_exists = await asyncio.to_thread(os.path.exists, cache_path)
+            if not cache_exists:
                logger.info(f"Cache miss for {file_name}. Generating preview...")
                success = await generate_image_preview(normalized_file_name)
-                if not success or not os.path.exists(cache_path):
+                cache_exists_after = await asyncio.to_thread(os.path.exists, cache_path)
+                if not success or not cache_exists_after:
                    logger.error(f"Failed to generate or find preview for {file_name}")
-                    results[file_name] = {"error": "Failed to generate preview"}
-                    continue
+                    return file_name, {"error": "Failed to generate preview"}

            metadata = get_pattern_metadata(normalized_file_name)
            if metadata:
@@ -1071,25 +1239,38 @@ async def preview_thr_batch(request: dict):
                last_coord_obj = metadata.get('last_coordinate')
            else:
                logger.debug(f"Metadata cache miss for {file_name}, parsing file")
-                coordinates = await asyncio.to_thread(parse_theta_rho_file, pattern_file_path)
+                # Use the process pool for CPU-intensive parsing
+                loop = asyncio.get_running_loop()
+                coordinates = await loop.run_in_executor(process_pool, parse_theta_rho_file, pattern_file_path)
                first_coord = coordinates[0] if coordinates else None
                last_coord = coordinates[-1] if coordinates else None
                first_coord_obj = {"x": first_coord[0], "y": first_coord[1]} if first_coord else None
                last_coord_obj = {"x": last_coord[0], "y": last_coord[1]} if last_coord else None

-            with open(cache_path, 'rb') as f:
-                image_data = f.read()
+            # Read the cached image in a worker thread
+            def _read_preview():
+                with open(cache_path, 'rb') as f:
+                    return f.read()
+            image_data = await asyncio.to_thread(_read_preview)
            image_b64 = base64.b64encode(image_data).decode('utf-8')
-            results[file_name] = {
+            result = {
                "image_data": f"data:image/webp;base64,{image_b64}",
                "first_coordinate": first_coord_obj,
                "last_coordinate": last_coord_obj
            }
+            logger.debug(f"Processed {file_name} in {time.time() - t1:.2f}s")
+            return file_name, result
        except Exception as e:
            logger.error(f"Error processing {file_name}: {str(e)}")
-            results[file_name] = {"error": str(e)}
-        finally:
-            logger.debug(f"Processed {file_name} in {time.time() - t1:.2f}s")
+            return file_name, {"error": str(e)}
+
+    # Process all files concurrently
+    tasks = [process_single_file(file_name) for file_name in file_names]
+    file_results = await asyncio.gather(*tasks)
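+    # Each task returns a (file_name, data) tuple, so the dict build below is order-independent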
+
+    # Convert results to dictionary
+    results = dict(file_results)

    logger.info(f"Total batch processing time: {time.time() - start:.2f}s for {len(file_names)} files")
    return JSONResponse(content=results, headers=headers)
@@ -1134,10 +1311,23 @@ def signal_handler(signum, frame):
    try:
        if state.led_controller:
            state.led_controller.set_power(0)
-        # Run cleanup operations synchronously to ensure completion
-        pattern_manager.stop_actions()
+        # Run cleanup operations; stop_actions() is async but signal handlers are synchronous
+        try:
+            # get_running_loop() raises RuntimeError if no loop is running in this thread
+            asyncio.get_running_loop()
+            # A loop is running, so asyncio.run() cannot be called here directly;
+            # run the coroutine to completion on a separate thread instead
+            import concurrent.futures
+            with concurrent.futures.ThreadPoolExecutor() as executor:
+                future = executor.submit(asyncio.run, pattern_manager.stop_actions())
+                future.result(timeout=5.0)  # Wait up to 5 seconds
+        except RuntimeError:
+            # No running loop, safe to create a new one
+            asyncio.run(pattern_manager.stop_actions())
+        except Exception as cleanup_err:
+            logger.error(f"Error in async cleanup: {cleanup_err}")
+
        state.save()
-
        logger.info("Cleanup completed")
    except Exception as e:
        logger.error(f"Error during cleanup: {str(e)}")