@@ -30,22 +30,14 @@ import json
 import base64
 import time
 import argparse
-from concurrent.futures import ProcessPoolExecutor
-import multiprocessing
 import subprocess
 import platform
+from modules.core import process_pool as pool_module

 # Get log level from environment variable, default to INFO
 log_level_str = os.getenv('LOG_LEVEL', 'INFO').upper()
 log_level = getattr(logging, log_level_str, logging.INFO)

-# Create a process pool for CPU-intensive tasks
-# Limit to reasonable number of workers for embedded systems
-cpu_count = multiprocessing.cpu_count()
-# Maximum 3 workers (leaving 1 for motion), minimum 1
-process_pool_size = min(3, max(1, cpu_count - 1))
-process_pool = None  # Will be initialized in lifespan
-
 logging.basicConfig(
     level=log_level,
     format='%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s',
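The pool bookkeeping that used to live at module level now comes from `modules.core.process_pool`, which the diff imports but does not show. Below is a minimal sketch of what that module plausibly contains, reconstructed from the deleted lines above and the `init_pool()` / `get_pool()` / `shutdown_pool()` call sites later in the diff; everything beyond those three names is an assumption, not the project's actual code.

```python
# Hypothetical modules/core/process_pool.py -- a sketch, not the real module.
# Worker sizing mirrors the deleted code: at most 3 workers, keeping one
# core free for motion control on embedded systems.
import logging
import multiprocessing
from concurrent.futures import ProcessPoolExecutor
from typing import Optional

logger = logging.getLogger(__name__)

_pool: Optional[ProcessPoolExecutor] = None


def init_pool() -> None:
    """Create the shared pool once; later calls are no-ops."""
    global _pool
    if _pool is None:
        cpu_count = multiprocessing.cpu_count()
        size = min(3, max(1, cpu_count - 1))
        _pool = ProcessPoolExecutor(max_workers=size)
        logger.info(f"Initialized process pool with {size} workers "
                    f"(detected {cpu_count} cores total)")


def get_pool() -> ProcessPoolExecutor:
    """Return the shared pool, creating it lazily if init_pool() wasn't called."""
    if _pool is None:
        init_pool()
    return _pool


def shutdown_pool(wait: bool = True, cancel_futures: bool = False) -> None:
    """Tear the pool down; safe to call from both lifespan and signal handlers."""
    global _pool
    if _pool is not None:
        _pool.shutdown(wait=wait, cancel_futures=cancel_futures)
        _pool = None
```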
@@ -103,10 +95,17 @@ async def lifespan(app: FastAPI):
     signal.signal(signal.SIGINT, signal_handler)
     signal.signal(signal.SIGTERM, signal_handler)

-    # Initialize process pool for CPU-intensive tasks
-    global process_pool
-    process_pool = ProcessPoolExecutor(max_workers=process_pool_size)
-    logger.info(f"Initialized process pool with {process_pool_size} workers (detected {cpu_count} cores total)")
+    # Initialize shared process pool for CPU-intensive tasks
+    pool_module.init_pool()
+
+    # Pin main process to CPUs 1-N to keep CPU 0 dedicated to motion/LED
+    from modules.core import scheduling
+    background_cpus = scheduling.get_background_cpus()
+    if background_cpus:
+        scheduling.pin_to_cpus(background_cpus)
+        logger.info(f"FastAPI main process pinned to CPUs {sorted(background_cpus)}")
+    else:
+        logger.info("Single-core system detected, skipping CPU pinning")

     try:
         connection_manager.connect_device()
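`modules.core.scheduling` is likewise not part of this diff. Going only by the two call sites and the pinning comment, a sketch of what it might look like follows; the function bodies are assumptions, and `os.sched_getaffinity` / `os.sched_setaffinity` exist only on Linux, which fits the embedded targets the worker-count comments imply.

```python
# Hypothetical modules/core/scheduling.py -- a sketch under the assumption
# that CPU 0 is reserved for motion/LED work and the rest are "background".
# os.sched_getaffinity / os.sched_setaffinity are Linux-only.
import os


def get_background_cpus() -> set:
    """All CPUs except CPU 0; empty set on a single-core system."""
    available = os.sched_getaffinity(0)  # CPUs this process may run on
    return available - {0}


def pin_to_cpus(cpus) -> None:
    """Restrict the calling process (and any children it forks) to `cpus`."""
    os.sched_setaffinity(0, cpus)
```

One nice property of the ordering in the lifespan hook: on CPython, `ProcessPoolExecutor` starts its worker processes lazily on first submit, so pinning the main process right after `init_pool()` should mean the pool's workers inherit the background-CPU mask as well, keeping CPU 0 clear for motion.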
@@ -243,14 +242,16 @@ async def lifespan(app: FastAPI):
     logger.info("Shutting down Dune Weaver application...")

     # Shutdown process pool
-    if process_pool:
-        process_pool.shutdown(wait=True)
-        logger.info("Process pool shutdown complete")
+    pool_module.shutdown_pool(wait=True)

 app = FastAPI(lifespan=lifespan)
 templates = Jinja2Templates(directory="templates")
 app.mount("/static", StaticFiles(directory="static"), name="static")

+# Global semaphore to limit concurrent preview processing
+# Prevents resource exhaustion when loading many previews simultaneously
+preview_semaphore = asyncio.Semaphore(5)
+
 # Pydantic models for request/response validation
 class ConnectRequest(BaseModel):
     port: Optional[str] = None
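The new `preview_semaphore` is consumed further down in `preview_thr_batch`. For illustration, this self-contained toy (not project code) shows the pattern it implements: `gather` still creates every task up front, but only five coroutine bodies make progress at any moment.

```python
import asyncio

sem = asyncio.Semaphore(5)

async def job(i: int) -> int:
    async with sem:               # at most 5 holders; the rest queue here
        await asyncio.sleep(0.1)  # stand-in for preview generation / file I/O
        return i

async def main() -> None:
    # 20 jobs, 5 at a time, 0.1s each -> roughly 0.4s total, not 2s serial
    print(await asyncio.gather(*(job(i) for i in range(20))))

asyncio.run(main())
```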
@@ -1155,7 +1156,7 @@ async def list_theta_rho_files_with_metadata():
         logger.error(f"Failed to load metadata cache, falling back to slow method: {e}")
         # Fallback to original method if cache loading fails
         # Create tasks only when needed
-        loop = asyncio.get_event_loop()
+        loop = asyncio.get_running_loop()
         tasks = [loop.run_in_executor(executor, process_file, file_path) for file_path in files]

         for task in asyncio.as_completed(tasks):
@@ -1235,8 +1236,8 @@ async def get_theta_rho_coordinates(request: GetCoordinatesRequest):

     # Parse the theta-rho file in a separate process for CPU-intensive work
     # This prevents blocking the motion control thread
-    loop = asyncio.get_event_loop()
-    coordinates = await loop.run_in_executor(process_pool, parse_theta_rho_file, file_path)
+    loop = asyncio.get_running_loop()
+    coordinates = await loop.run_in_executor(pool_module.get_pool(), parse_theta_rho_file, file_path)

     if not coordinates:
         raise HTTPException(status_code=400, detail="No valid coordinates found in file")
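The `get_event_loop()` to `get_running_loop()` swaps here and in the previous hunk are more than style. Inside a coroutine both return the loop that is currently running, but `asyncio.get_event_loop()` carries deprecated implicit-creation behavior that varies by Python version, while `get_running_loop()` (3.7+) fails loudly when no loop is running. A quick illustration:

```python
import asyncio

async def handler() -> None:
    loop = asyncio.get_running_loop()  # always the loop driving this coroutine
    await loop.run_in_executor(None, print, "offloaded")

asyncio.run(handler())

# With no loop running, get_running_loop() raises immediately instead of
# silently creating a new loop the way get_event_loop() historically did:
# asyncio.get_running_loop()  -> RuntimeError: no running event loop
```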
@@ -2271,6 +2272,16 @@ async def test_mqtt_connection(request: dict):
             status_code=500
         )

+def _read_and_encode_preview(cache_path: str) -> str:
+    """Read preview image from disk and encode as base64.
+
+    Combines file I/O and base64 encoding in a single function
+    to be run in executor, reducing context switches.
+    """
+    with open(cache_path, 'rb') as f:
+        image_data = f.read()
+    return base64.b64encode(image_data).decode('utf-8')
+
 @app.post("/preview_thr_batch")
 async def preview_thr_batch(request: dict):
     start = time.time()
@@ -2289,57 +2300,59 @@ async def preview_thr_batch(request: dict):

     async def process_single_file(file_name):
         """Process a single file and return its preview data."""
-        t1 = time.time()
-        try:
-            # Normalize file path for cross-platform compatibility
-            normalized_file_name = normalize_file_path(file_name)
-            pattern_file_path = os.path.join(pattern_manager.THETA_RHO_DIR, normalized_file_name)
-
-            # Check file existence asynchronously
-            exists = await asyncio.to_thread(os.path.exists, pattern_file_path)
-            if not exists:
-                logger.warning(f"Pattern file not found: {pattern_file_path}")
-                return file_name, {"error": "Pattern file not found"}
-
-            cache_path = get_cache_path(normalized_file_name)
-
-            # Check cache existence asynchronously
-            cache_exists = await asyncio.to_thread(os.path.exists, cache_path)
-            if not cache_exists:
-                logger.info(f"Cache miss for {file_name}. Generating preview...")
-                success = await generate_image_preview(normalized_file_name)
-                cache_exists_after = await asyncio.to_thread(os.path.exists, cache_path)
-                if not success or not cache_exists_after:
-                    logger.error(f"Failed to generate or find preview for {file_name}")
-                    return file_name, {"error": "Failed to generate preview"}
-
-            metadata = get_pattern_metadata(normalized_file_name)
-            if metadata:
-                first_coord_obj = metadata.get('first_coordinate')
-                last_coord_obj = metadata.get('last_coordinate')
-            else:
-                logger.debug(f"Metadata cache miss for {file_name}, parsing file")
-                # Use process pool for CPU-intensive parsing
-                loop = asyncio.get_event_loop()
-                coordinates = await loop.run_in_executor(process_pool, parse_theta_rho_file, pattern_file_path)
-                first_coord = coordinates[0] if coordinates else None
-                last_coord = coordinates[-1] if coordinates else None
-                first_coord_obj = {"x": first_coord[0], "y": first_coord[1]} if first_coord else None
-                last_coord_obj = {"x": last_coord[0], "y": last_coord[1]} if last_coord else None
-
-            # Read image file asynchronously
-            image_data = await asyncio.to_thread(lambda: open(cache_path, 'rb').read())
-            image_b64 = base64.b64encode(image_data).decode('utf-8')
-            result = {
-                "image_data": f"data:image/webp;base64,{image_b64}",
-                "first_coordinate": first_coord_obj,
-                "last_coordinate": last_coord_obj
-            }
-            logger.debug(f"Processed {file_name} in {time.time() - t1:.2f}s")
-            return file_name, result
-        except Exception as e:
-            logger.error(f"Error processing {file_name}: {str(e)}")
-            return file_name, {"error": str(e)}
+        # Acquire semaphore to limit concurrent processing
+        async with preview_semaphore:
+            t1 = time.time()
+            try:
+                # Normalize file path for cross-platform compatibility
+                normalized_file_name = normalize_file_path(file_name)
+                pattern_file_path = os.path.join(pattern_manager.THETA_RHO_DIR, normalized_file_name)
+
+                # Check file existence asynchronously
+                exists = await asyncio.to_thread(os.path.exists, pattern_file_path)
+                if not exists:
+                    logger.warning(f"Pattern file not found: {pattern_file_path}")
+                    return file_name, {"error": "Pattern file not found"}
+
+                cache_path = get_cache_path(normalized_file_name)
+
+                # Check cache existence asynchronously
+                cache_exists = await asyncio.to_thread(os.path.exists, cache_path)
+                if not cache_exists:
+                    logger.info(f"Cache miss for {file_name}. Generating preview...")
+                    success = await generate_image_preview(normalized_file_name)
+                    cache_exists_after = await asyncio.to_thread(os.path.exists, cache_path)
+                    if not success or not cache_exists_after:
+                        logger.error(f"Failed to generate or find preview for {file_name}")
+                        return file_name, {"error": "Failed to generate preview"}
+
+                metadata = get_pattern_metadata(normalized_file_name)
+                if metadata:
+                    first_coord_obj = metadata.get('first_coordinate')
+                    last_coord_obj = metadata.get('last_coordinate')
+                else:
+                    logger.debug(f"Metadata cache miss for {file_name}, parsing file")
+                    # Use process pool for CPU-intensive parsing
+                    loop = asyncio.get_running_loop()
+                    coordinates = await loop.run_in_executor(pool_module.get_pool(), parse_theta_rho_file, pattern_file_path)
+                    first_coord = coordinates[0] if coordinates else None
+                    last_coord = coordinates[-1] if coordinates else None
+                    first_coord_obj = {"x": first_coord[0], "y": first_coord[1]} if first_coord else None
+                    last_coord_obj = {"x": last_coord[0], "y": last_coord[1]} if last_coord else None
+
+                # Read image file and encode in executor to avoid blocking event loop
+                loop = asyncio.get_running_loop()
+                image_b64 = await loop.run_in_executor(None, _read_and_encode_preview, cache_path)
+                result = {
+                    "image_data": f"data:image/webp;base64,{image_b64}",
+                    "first_coordinate": first_coord_obj,
+                    "last_coordinate": last_coord_obj
+                }
+                logger.debug(f"Processed {file_name} in {time.time() - t1:.2f}s")
+                return file_name, result
+            except Exception as e:
+                logger.error(f"Error processing {file_name}: {str(e)}")
+                return file_name, {"error": str(e)}

     # Process all files concurrently
     tasks = [process_single_file(file_name) for file_name in file_names]
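Worth noting in this hunk: the two executors are deliberately different. Theta-rho parsing goes to `pool_module.get_pool()`, the process pool, because pure-Python coordinate parsing is CPU-bound and would otherwise contend with the motion thread for the GIL. The read-and-encode helper instead goes to `run_in_executor(None, ...)`, which uses the loop's default `ThreadPoolExecutor`; file reads release the GIL anyway, and handing the image bytes to a worker process would cost an extra pickle round-trip for no gain.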
@@ -2750,11 +2763,7 @@ def signal_handler(signum, frame):
         state.led_controller.set_power(0)

     # Shutdown process pool to prevent semaphore leaks
-    global process_pool
-    if process_pool:
-        logger.info("Shutting down process pool...")
-        process_pool.shutdown(wait=False, cancel_futures=True)
-        process_pool = None
+    pool_module.shutdown_pool(wait=False, cancel_futures=True)

     # Stop pattern manager motion controller
     pattern_manager.motion_controller.stop()
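In the signal-handler path the pool is shut down with `wait=False, cancel_futures=True`: the handler returns immediately, and any queued-but-unstarted jobs are cancelled rather than drained, letting the executor release its multiprocessing primitives on the way out, which is presumably the semaphore leak the comment is guarding against.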