Browse Source

Fix process pool semaphore leaks and WebSocket connection warnings

Backend:
- Add shutdown flag to prevent race conditions in process pool
- Use wait=True in signal handler to allow workers to release semaphores

Frontend:
- Fix WebSocket cleanup to avoid closing CONNECTING sockets
- Add onopen handlers to gracefully close orphaned connections
- Properly handle React StrictMode double-mounting

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
tuanchris 2 weeks ago
parent
commit
d8ed3ca772

+ 12 - 1
frontend/src/components/NowPlayingBar.tsx

@@ -197,6 +197,13 @@ export function NowPlayingBar({ isLogsOpen = false, isVisible, openExpanded = fa
       // Assign to ref IMMEDIATELY so concurrent calls see it's connecting
       wsRef.current = ws
 
+      ws.onopen = () => {
+        if (!shouldReconnect) {
+          // Component unmounted while connecting - close the WebSocket now
+          ws.close()
+        }
+      }
+
       ws.onmessage = (event) => {
         if (!shouldReconnect) return
         try {
@@ -234,7 +241,11 @@ export function NowPlayingBar({ isLogsOpen = false, isVisible, openExpanded = fa
         clearTimeout(reconnectTimeout)
       }
       if (wsRef.current) {
-        wsRef.current.close()
+        // Only close if already OPEN - CONNECTING WebSockets will close in onopen
+        if (wsRef.current.readyState === WebSocket.OPEN) {
+          wsRef.current.close()
+        }
+        wsRef.current = null
       }
     }
   }, [])

+ 98 - 20
frontend/src/components/layout/Layout.tsx

@@ -185,7 +185,11 @@ export function Layout() {
       wsRef.current = ws
 
       ws.onopen = () => {
-        if (!isMounted) return
+        if (!isMounted) {
+          // Component unmounted while connecting - close the WebSocket now
+          ws.close()
+          return
+        }
         setIsBackendConnected(true)
         setConnectionAttempts(0)
         // Dispatch event so pages can refetch data
@@ -277,7 +281,10 @@ export function Layout() {
         clearTimeout(reconnectTimeout)
       }
       if (wsRef.current) {
-        wsRef.current.close()
+        // Only close if already OPEN - CONNECTING WebSockets will close in onopen
+        if (wsRef.current.readyState === WebSocket.OPEN) {
+          wsRef.current.close()
+        }
         wsRef.current = null
       }
     }
@@ -286,14 +293,16 @@ export function Layout() {
   // Connect to logs WebSocket when drawer opens
   useEffect(() => {
     if (!isLogsOpen) {
-      // Close WebSocket when drawer closes
-      if (logsWsRef.current) {
+      // Close WebSocket when drawer closes - only if OPEN (CONNECTING will close in onopen)
+      if (logsWsRef.current && logsWsRef.current.readyState === WebSocket.OPEN) {
         logsWsRef.current.close()
-        logsWsRef.current = null
       }
+      logsWsRef.current = null
       return
     }
 
+    let shouldConnect = true
+
     // Fetch initial logs
     const fetchInitialLogs = async () => {
       try {
@@ -322,9 +331,27 @@ export function Layout() {
     let reconnectTimeout: ReturnType<typeof setTimeout> | null = null
 
     const connectLogsWebSocket = () => {
+      // Don't interrupt an existing connection that's still connecting
+      if (logsWsRef.current) {
+        if (logsWsRef.current.readyState === WebSocket.CONNECTING) {
+          return // Already connecting, wait for it
+        }
+        if (logsWsRef.current.readyState === WebSocket.OPEN) {
+          logsWsRef.current.close()
+        }
+        logsWsRef.current = null
+      }
+
       const ws = new WebSocket(apiClient.getWebSocketUrl('/ws/logs'))
+      // Assign to ref IMMEDIATELY so concurrent calls see it's connecting
+      logsWsRef.current = ws
 
       ws.onopen = () => {
+        if (!shouldConnect) {
+          // Effect cleanup ran while connecting - close now
+          ws.close()
+          return
+        }
         console.log('Logs WebSocket connected')
       }
 
@@ -364,10 +391,11 @@ export function Layout() {
       }
 
       ws.onclose = () => {
+        if (!shouldConnect) return
         console.log('Logs WebSocket closed, reconnecting...')
         // Reconnect after 3 seconds if drawer is still open
         reconnectTimeout = setTimeout(() => {
-          if (logsWsRef.current === ws) {
+          if (shouldConnect && logsWsRef.current === ws) {
             connectLogsWebSocket()
           }
         }, 3000)
@@ -376,18 +404,20 @@ export function Layout() {
       ws.onerror = (error) => {
         console.error('Logs WebSocket error:', error)
       }
-
-      logsWsRef.current = ws
     }
 
     connectLogsWebSocket()
 
     return () => {
+      shouldConnect = false
       if (reconnectTimeout) {
         clearTimeout(reconnectTimeout)
       }
       if (logsWsRef.current) {
-        logsWsRef.current.close()
+        // Only close if already OPEN - CONNECTING WebSockets will close in onopen
+        if (logsWsRef.current.readyState === WebSocket.OPEN) {
+          logsWsRef.current.close()
+        }
         logsWsRef.current = null
       }
     }
@@ -513,11 +543,11 @@ export function Layout() {
 
     if (!showOverlay) {
       setConnectionLogs([])
-      // Close WebSocket if open
-      if (blockingLogsWsRef.current) {
+      // Close the WebSocket only if it is already OPEN (a CONNECTING socket will close itself in onopen)
+      if (blockingLogsWsRef.current && blockingLogsWsRef.current.readyState === WebSocket.OPEN) {
         blockingLogsWsRef.current.close()
-        blockingLogsWsRef.current = null
       }
+      blockingLogsWsRef.current = null
       return
     }
 
@@ -549,7 +579,29 @@ export function Layout() {
     if (isHoming && isBackendConnected) {
       addLog('INFO', 'Homing started...')
 
+      let shouldConnect = true
+
+      // Don't interrupt an existing connection that's still connecting
+      if (blockingLogsWsRef.current) {
+        if (blockingLogsWsRef.current.readyState === WebSocket.CONNECTING) {
+          return // Already connecting, wait for it
+        }
+        if (blockingLogsWsRef.current.readyState === WebSocket.OPEN) {
+          blockingLogsWsRef.current.close()
+        }
+        blockingLogsWsRef.current = null
+      }
+
       const ws = new WebSocket(apiClient.getWebSocketUrl('/ws/logs'))
+      // Assign to ref IMMEDIATELY so concurrent calls see it's connecting
+      blockingLogsWsRef.current = ws
+
+      ws.onopen = () => {
+        if (!shouldConnect) {
+          // Effect cleanup ran while connecting - close now
+          ws.close()
+        }
+      }
 
       ws.onmessage = (event) => {
         try {
@@ -585,10 +637,12 @@ export function Layout() {
         }
       }
 
-      blockingLogsWsRef.current = ws
-
       return () => {
-        ws.close()
+        shouldConnect = false
+        // Only close if already OPEN - CONNECTING WebSockets will close in onopen
+        if (ws.readyState === WebSocket.OPEN) {
+          ws.close()
+        }
         blockingLogsWsRef.current = null
       }
     }
@@ -618,11 +672,32 @@ export function Layout() {
     if (!isBackendConnected) return
 
     let reconnectTimeout: ReturnType<typeof setTimeout> | null = null
+    let shouldConnect = true
 
     const connectCacheWebSocket = () => {
-      if (cacheWsRef.current) return
+      if (!shouldConnect) return
+      // Don't interrupt an existing connection that's still connecting
+      if (cacheWsRef.current) {
+        if (cacheWsRef.current.readyState === WebSocket.CONNECTING) {
+          return // Already connecting, wait for it
+        }
+        if (cacheWsRef.current.readyState === WebSocket.OPEN) {
+          return // Already connected
+        }
+        // CLOSING or CLOSED state - clear the ref
+        cacheWsRef.current = null
+      }
 
       const ws = new WebSocket(apiClient.getWebSocketUrl('/ws/cache-progress'))
+      // Assign to ref IMMEDIATELY so concurrent calls see it's connecting
+      cacheWsRef.current = ws
+
+      ws.onopen = () => {
+        if (!shouldConnect) {
+          // Effect cleanup ran while connecting - close now
+          ws.close()
+        }
+      }
 
       ws.onmessage = (event) => {
         try {
@@ -656,9 +731,10 @@ export function Layout() {
       }
 
       ws.onclose = () => {
+        if (!shouldConnect) return
         cacheWsRef.current = null
         // Reconnect after 3 seconds
-        if (isBackendConnected) {
+        if (shouldConnect && isBackendConnected) {
           reconnectTimeout = setTimeout(connectCacheWebSocket, 3000)
         }
       }
@@ -666,18 +742,20 @@ export function Layout() {
       ws.onerror = () => {
         // Will trigger onclose
       }
-
-      cacheWsRef.current = ws
     }
 
     connectCacheWebSocket()
 
     return () => {
+      shouldConnect = false
       if (reconnectTimeout) {
         clearTimeout(reconnectTimeout)
       }
       if (cacheWsRef.current) {
-        cacheWsRef.current.close()
+        // Only close if already OPEN - CONNECTING WebSockets will close in onopen
+        if (cacheWsRef.current.readyState === WebSocket.OPEN) {
+          cacheWsRef.current.close()
+        }
         cacheWsRef.current = null
       }
     }

+ 30 - 2
frontend/src/pages/TableControlPage.tsx

@@ -50,11 +50,33 @@ export function TableControlPage() {
   // Connect to status WebSocket to get current speed and playback status
   useEffect(() => {
     let ws: WebSocket | null = null
+    let shouldReconnect = true
 
     const connect = () => {
+      if (!shouldReconnect) return
+
+      // Don't interrupt an existing connection that's still connecting
+      if (ws) {
+        if (ws.readyState === WebSocket.CONNECTING) {
+          return // Already connecting, wait for it
+        }
+        if (ws.readyState === WebSocket.OPEN) {
+          ws.close()
+        }
+        ws = null
+      }
+
       ws = new WebSocket(apiClient.getWebSocketUrl('/ws/status'))
 
+      ws.onopen = () => {
+        if (!shouldReconnect) {
+          // Component unmounted while connecting - close the WebSocket now
+          ws?.close()
+        }
+      }
+
       ws.onmessage = (event) => {
+        if (!shouldReconnect) return
         try {
           const message = JSON.parse(event.data)
           if (message.type === 'status_update' && message.data) {
@@ -74,13 +96,19 @@ export function TableControlPage() {
 
     // Reconnect when table changes
     const unsubscribe = apiClient.onBaseUrlChange(() => {
-      if (ws) ws.close()
       connect()
     })
 
     return () => {
+      shouldReconnect = false
       unsubscribe()
-      if (ws) ws.close()
+      if (ws) {
+        // Only close if already OPEN - CONNECTING WebSockets will close in onopen
+        if (ws.readyState === WebSocket.OPEN) {
+          ws.close()
+        }
+        ws = null
+      }
     }
   }, [])
 

+ 2 - 2
main.py

@@ -3101,8 +3101,8 @@ def signal_handler(signum, frame):
         if state.led_controller:
             state.led_controller.set_power(0)
 
-        # Shutdown process pool to prevent semaphore leaks
-        pool_module.shutdown_pool(wait=False, cancel_futures=True)
+        # Shutdown process pool - wait=True allows workers to release semaphores properly
+        pool_module.shutdown_pool(wait=True)
 
         # Stop pattern manager motion controller
         pattern_manager.motion_controller.stop()

+ 20 - 2
modules/core/process_pool.py

@@ -13,6 +13,7 @@ from modules.core import scheduling
 logger = logging.getLogger(__name__)
 
 _pool: Optional[ProcessPoolExecutor] = None
+_shutdown_in_progress: bool = False
 
 
 def _get_worker_count() -> int:
@@ -57,9 +58,26 @@ def get_pool() -> ProcessPoolExecutor:
 
 
 def shutdown_pool(wait: bool = True, cancel_futures: bool = False):
-    """Shutdown the process pool."""
-    global _pool
+    """Shutdown the process pool.
+
+    Args:
+        wait: If True, wait for workers to finish current tasks before shutdown.
+              This allows workers to properly release semaphores.
+        cancel_futures: If True, cancel pending futures. Use with caution as this
+                       can cause semaphore leaks if wait=False.
+
+    Note: Prefer wait=True to prevent semaphore leaks. The wait=False option
+    exists only for emergency shutdown scenarios.
+    """
+    global _pool, _shutdown_in_progress
+
+    # Prevent concurrent shutdown calls (signal handler vs. lifespan); NOTE(review): flag is not reset in this hunk - confirm intended
+    if _shutdown_in_progress:
+        logger.debug("Pool shutdown already in progress, skipping")
+        return
+
     if _pool is not None:
+        _shutdown_in_progress = True
         _pool.shutdown(wait=wait, cancel_futures=cancel_futures)
         _pool = None
         logger.info("Process pool shut down")