|
@@ -27,11 +27,20 @@ import json
|
|
|
import base64
|
|
import base64
|
|
|
import time
|
|
import time
|
|
|
import argparse
|
|
import argparse
|
|
|
|
|
+from concurrent.futures import ProcessPoolExecutor
|
|
|
|
|
+import multiprocessing
|
|
|
|
|
|
|
|
# Get log level from environment variable, default to INFO
|
|
# Get log level from environment variable, default to INFO
|
|
|
log_level_str = os.getenv('LOG_LEVEL', 'INFO').upper()
|
|
log_level_str = os.getenv('LOG_LEVEL', 'INFO').upper()
|
|
|
log_level = getattr(logging, log_level_str, logging.INFO)
|
|
log_level = getattr(logging, log_level_str, logging.INFO)
|
|
|
|
|
|
|
|
|
|
+# Create a process pool for CPU-intensive tasks
|
|
|
|
|
+# Limit to reasonable number of workers for embedded systems
|
|
|
|
|
+cpu_count = multiprocessing.cpu_count()
|
|
|
|
|
+# Maximum 3 workers (leaving 1 for motion), minimum 1
|
|
|
|
|
+process_pool_size = min(3, max(1, cpu_count - 1))
|
|
|
|
|
+process_pool = None # Will be initialized in lifespan
|
|
|
|
|
+
|
|
|
logging.basicConfig(
|
|
logging.basicConfig(
|
|
|
level=log_level,
|
|
level=log_level,
|
|
|
format='%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s',
|
|
format='%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s',
|
|
@@ -65,6 +74,11 @@ async def lifespan(app: FastAPI):
|
|
|
# Register signal handlers
|
|
# Register signal handlers
|
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
signal.signal(signal.SIGINT, signal_handler)
|
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
signal.signal(signal.SIGTERM, signal_handler)
|
|
|
|
|
+
|
|
|
|
|
+ # Initialize process pool for CPU-intensive tasks
|
|
|
|
|
+ global process_pool
|
|
|
|
|
+ process_pool = ProcessPoolExecutor(max_workers=process_pool_size)
|
|
|
|
|
+ logger.info(f"Initialized process pool with {process_pool_size} workers (detected {cpu_count} cores total)")
|
|
|
|
|
|
|
|
try:
|
|
try:
|
|
|
connection_manager.connect_device()
|
|
connection_manager.connect_device()
|
|
@@ -116,6 +130,13 @@ async def lifespan(app: FastAPI):
|
|
|
|
|
|
|
|
yield # This separates startup from shutdown code
|
|
yield # This separates startup from shutdown code
|
|
|
|
|
|
|
|
|
|
+ # Shutdown
|
|
|
|
|
+ logger.info("Shutting down Dune Weaver application...")
|
|
|
|
|
+
|
|
|
|
|
+ # Shutdown process pool
|
|
|
|
|
+ if process_pool:
|
|
|
|
|
+ process_pool.shutdown(wait=True)
|
|
|
|
|
+ logger.info("Process pool shutdown complete")
|
|
|
|
|
|
|
|
app = FastAPI(lifespan=lifespan)
|
|
app = FastAPI(lifespan=lifespan)
|
|
|
templates = Jinja2Templates(directory="templates")
|
|
templates = Jinja2Templates(directory="templates")
|
|
@@ -480,8 +501,10 @@ async def get_theta_rho_coordinates(request: GetCoordinatesRequest):
|
|
|
if not exists:
|
|
if not exists:
|
|
|
raise HTTPException(status_code=404, detail=f"File {file_name} not found")
|
|
raise HTTPException(status_code=404, detail=f"File {file_name} not found")
|
|
|
|
|
|
|
|
- # Parse the theta-rho file asynchronously
|
|
|
|
|
- coordinates = await asyncio.to_thread(parse_theta_rho_file, file_path)
|
|
|
|
|
|
|
+ # Parse the theta-rho file in a separate process for CPU-intensive work
|
|
|
|
|
+ # This prevents blocking the motion control thread
|
|
|
|
|
+ loop = asyncio.get_event_loop()
|
|
|
|
|
+ coordinates = await loop.run_in_executor(process_pool, parse_theta_rho_file, file_path)
|
|
|
|
|
|
|
|
if not coordinates:
|
|
if not coordinates:
|
|
|
raise HTTPException(status_code=400, detail="No valid coordinates found in file")
|
|
raise HTTPException(status_code=400, detail="No valid coordinates found in file")
|
|
@@ -1089,7 +1112,9 @@ async def preview_thr_batch(request: dict):
|
|
|
last_coord_obj = metadata.get('last_coordinate')
|
|
last_coord_obj = metadata.get('last_coordinate')
|
|
|
else:
|
|
else:
|
|
|
logger.debug(f"Metadata cache miss for {file_name}, parsing file")
|
|
logger.debug(f"Metadata cache miss for {file_name}, parsing file")
|
|
|
- coordinates = await asyncio.to_thread(parse_theta_rho_file, pattern_file_path)
|
|
|
|
|
|
|
+ # Use process pool for CPU-intensive parsing
|
|
|
|
|
+ loop = asyncio.get_event_loop()
|
|
|
|
|
+ coordinates = await loop.run_in_executor(process_pool, parse_theta_rho_file, pattern_file_path)
|
|
|
first_coord = coordinates[0] if coordinates else None
|
|
first_coord = coordinates[0] if coordinates else None
|
|
|
last_coord = coordinates[-1] if coordinates else None
|
|
last_coord = coordinates[-1] if coordinates else None
|
|
|
first_coord_obj = {"x": first_coord[0], "y": first_coord[1]} if first_coord else None
|
|
first_coord_obj = {"x": first_coord[0], "y": first_coord[1]} if first_coord else None
|