"""
Memory-based log handler for capturing and streaming application logs.

This module provides a circular buffer log handler that captures log messages
in memory for display in the web UI, with support for real-time streaming
via WebSocket.
"""
import asyncio
import logging
import threading
from collections import deque
from datetime import datetime
from typing import Any, Dict, List, Optional
class MemoryLogHandler(logging.Handler):
    """
    A logging handler that stores log records in a circular buffer.

    Thread-safe implementation using a lock for concurrent access.
    Supports async iteration for WebSocket streaming via subscribe().
    """

    def __init__(self, max_entries: int = 500):
        """
        Initialize the memory log handler.

        Args:
            max_entries: Maximum number of log entries to keep in memory.
                Older entries are automatically discarded.
        """
        super().__init__()
        self.max_entries = max_entries
        # deque(maxlen=...) evicts the oldest entry automatically once full,
        # giving O(1) appends with a bounded memory footprint.
        self._buffer: deque = deque(maxlen=max_entries)
        self._lock = threading.Lock()
        self._subscribers: List[asyncio.Queue] = []
        self._subscribers_lock = threading.Lock()

    def emit(self, record: logging.LogRecord) -> None:
        """
        Store a log record in the buffer and notify subscribers.

        Args:
            record: The log record to store.
        """
        try:
            log_entry = self._format_record(record)
            with self._lock:
                self._buffer.append(log_entry)
            # Notify outside the buffer lock: subscriber delivery takes its
            # own lock and must not extend the buffer's critical section.
            self._notify_subscribers(log_entry)
        except Exception:
            # Standard logging.Handler contract: never let a logging failure
            # propagate into the application.
            self.handleError(record)

    def _format_record(self, record: logging.LogRecord) -> Dict[str, Any]:
        """
        Format a log record into a dictionary for JSON serialization.

        Args:
            record: The log record to format.

        Returns:
            Dictionary containing formatted log data.
        """
        return {
            "timestamp": datetime.fromtimestamp(record.created).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "line": record.lineno,
            "message": record.getMessage(),
            "module": record.module,
        }

    def get_logs(
        self,
        limit: Optional[int] = None,
        level: Optional[str] = None,
        offset: int = 0,
    ) -> List[Dict[str, Any]]:
        """
        Retrieve stored log entries with pagination support.

        Args:
            limit: Maximum number of entries to return (newest first).
                None means no limit; 0 returns an empty list.
            level: Filter by log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
            offset: Number of entries to skip from the newest (for pagination).

        Returns:
            List of log entries as dictionaries, newest first.
        """
        # Snapshot under the lock, then filter/slice on the copy so the
        # buffer is never held while doing O(n) work.
        with self._lock:
            logs = list(self._buffer)
        # Filter by level if specified (case-insensitive).
        if level:
            level_upper = level.upper()
            logs = [entry for entry in logs if entry["level"] == level_upper]
        # Return newest first.
        logs.reverse()
        # Apply offset for pagination.
        if offset > 0:
            logs = logs[offset:]
        # FIX: compare against None rather than truthiness, so that
        # limit=0 returns no entries instead of all of them.
        if limit is not None:
            logs = logs[:limit]
        return logs

    def get_total_count(self, level: Optional[str] = None) -> int:
        """
        Get total count of log entries (optionally filtered by level).

        Args:
            level: Filter by log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).

        Returns:
            Total count of matching log entries.
        """
        with self._lock:
            if not level:
                return len(self._buffer)
            level_upper = level.upper()
            return sum(1 for entry in self._buffer if entry["level"] == level_upper)

    def clear(self) -> None:
        """Clear all stored log entries."""
        with self._lock:
            self._buffer.clear()

    def subscribe(self) -> asyncio.Queue:
        """
        Subscribe to real-time log updates.

        Returns:
            An asyncio Queue (maxsize 100) that will receive new log entries.
        """
        queue: asyncio.Queue = asyncio.Queue(maxsize=100)
        with self._subscribers_lock:
            self._subscribers.append(queue)
        return queue

    def unsubscribe(self, queue: asyncio.Queue) -> None:
        """
        Unsubscribe from real-time log updates.

        Args:
            queue: The queue returned by subscribe().
        """
        with self._subscribers_lock:
            if queue in self._subscribers:
                self._subscribers.remove(queue)

    def _notify_subscribers(self, log_entry: Dict[str, Any]) -> None:
        """
        Notify all subscribers of a new log entry.

        NOTE(review): put_nowait is called from whatever thread is logging;
        asyncio.Queue is not formally thread-safe — confirm consumers tolerate
        this, or marshal via loop.call_soon_threadsafe.

        Args:
            log_entry: The formatted log entry to send.
        """
        with self._subscribers_lock:
            dead_subscribers = []
            for queue in self._subscribers:
                try:
                    queue.put_nowait(log_entry)
                except asyncio.QueueFull:
                    # Slow consumer: drop this entry rather than block logging.
                    pass
                except Exception:
                    # Queue is unusable (e.g. its loop is gone) — prune it.
                    dead_subscribers.append(queue)
            # Remove dead subscribers.
            for queue in dead_subscribers:
                self._subscribers.remove(queue)
# Global instance of the memory log handler.
# FIX: the annotation must admit None (the pre-init value); the forward
# reference keeps this line valid regardless of definition order.
memory_handler: Optional["MemoryLogHandler"] = None
def init_memory_handler(max_entries: int = 500) -> MemoryLogHandler:
    """
    Initialize and install the memory log handler.

    This should be called once during application startup, after
    basicConfig but before any logging occurs.

    Args:
        max_entries: Maximum number of log entries to store.

    Returns:
        The initialized MemoryLogHandler instance.
    """
    global memory_handler
    handler = MemoryLogHandler(max_entries=max_entries)
    fmt = '%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s'
    handler.setFormatter(logging.Formatter(fmt))
    # Attach to the root logger so output from every logger is captured.
    logging.getLogger().addHandler(handler)
    memory_handler = handler
    return handler
def get_memory_handler() -> Optional["MemoryLogHandler"]:
    """
    Get the global memory log handler instance.

    Returns:
        The MemoryLogHandler instance, or None if init_memory_handler()
        has not been called yet.
    """
    # FIX: return annotation now matches the documented contract — the
    # module-level global starts as None, so None is a possible result.
    return memory_handler
|