1
0

log_handler.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193
  1. """
  2. Memory-based log handler for capturing and streaming application logs.
  3. This module provides a circular buffer log handler that captures log messages
  4. in memory for display in the web UI, with support for real-time streaming
  5. via WebSocket.
  6. """
  7. import logging
  8. from collections import deque
  9. from datetime import datetime
  10. from typing import List, Dict, Any
  11. import threading
  12. import asyncio
  13. class MemoryLogHandler(logging.Handler):
  14. """
  15. A logging handler that stores log records in a circular buffer.
  16. Thread-safe implementation using a lock for concurrent access.
  17. Supports async iteration for WebSocket streaming.
  18. """
  19. def __init__(self, max_entries: int = 500):
  20. """
  21. Initialize the memory log handler.
  22. Args:
  23. max_entries: Maximum number of log entries to keep in memory.
  24. Older entries are automatically discarded.
  25. """
  26. super().__init__()
  27. self.max_entries = max_entries
  28. self._buffer: deque = deque(maxlen=max_entries)
  29. self._lock = threading.Lock()
  30. self._subscribers: List[asyncio.Queue] = []
  31. self._subscribers_lock = threading.Lock()
  32. def emit(self, record: logging.LogRecord) -> None:
  33. """
  34. Store a log record in the buffer and notify subscribers.
  35. Args:
  36. record: The log record to store.
  37. """
  38. try:
  39. log_entry = self._format_record(record)
  40. with self._lock:
  41. self._buffer.append(log_entry)
  42. # Notify all subscribers (for WebSocket streaming)
  43. self._notify_subscribers(log_entry)
  44. except Exception:
  45. self.handleError(record)
  46. def _format_record(self, record: logging.LogRecord) -> Dict[str, Any]:
  47. """
  48. Format a log record into a dictionary for JSON serialization.
  49. Args:
  50. record: The log record to format.
  51. Returns:
  52. Dictionary containing formatted log data.
  53. """
  54. return {
  55. "timestamp": datetime.fromtimestamp(record.created).isoformat(),
  56. "level": record.levelname,
  57. "logger": record.name,
  58. "line": record.lineno,
  59. "message": record.getMessage(),
  60. "module": record.module,
  61. }
  62. def get_logs(self, limit: int = None, level: str = None) -> List[Dict[str, Any]]:
  63. """
  64. Retrieve stored log entries.
  65. Args:
  66. limit: Maximum number of entries to return (newest first).
  67. level: Filter by log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
  68. Returns:
  69. List of log entries as dictionaries.
  70. """
  71. with self._lock:
  72. logs = list(self._buffer)
  73. # Filter by level if specified
  74. if level:
  75. level_upper = level.upper()
  76. logs = [log for log in logs if log["level"] == level_upper]
  77. # Return newest first, with optional limit
  78. logs.reverse()
  79. if limit:
  80. logs = logs[:limit]
  81. return logs
  82. def clear(self) -> None:
  83. """Clear all stored log entries."""
  84. with self._lock:
  85. self._buffer.clear()
  86. def subscribe(self) -> asyncio.Queue:
  87. """
  88. Subscribe to real-time log updates.
  89. Returns:
  90. An asyncio Queue that will receive new log entries.
  91. """
  92. queue = asyncio.Queue(maxsize=100)
  93. with self._subscribers_lock:
  94. self._subscribers.append(queue)
  95. return queue
  96. def unsubscribe(self, queue: asyncio.Queue) -> None:
  97. """
  98. Unsubscribe from real-time log updates.
  99. Args:
  100. queue: The queue returned by subscribe().
  101. """
  102. with self._subscribers_lock:
  103. if queue in self._subscribers:
  104. self._subscribers.remove(queue)
  105. def _notify_subscribers(self, log_entry: Dict[str, Any]) -> None:
  106. """
  107. Notify all subscribers of a new log entry.
  108. Args:
  109. log_entry: The formatted log entry to send.
  110. """
  111. with self._subscribers_lock:
  112. dead_subscribers = []
  113. for queue in self._subscribers:
  114. try:
  115. queue.put_nowait(log_entry)
  116. except asyncio.QueueFull:
  117. # If queue is full, skip this entry
  118. pass
  119. except Exception:
  120. dead_subscribers.append(queue)
  121. # Remove dead subscribers
  122. for queue in dead_subscribers:
  123. self._subscribers.remove(queue)
  124. # Global instance of the memory log handler
  125. memory_handler: MemoryLogHandler = None
  126. def init_memory_handler(max_entries: int = 500) -> MemoryLogHandler:
  127. """
  128. Initialize and install the memory log handler.
  129. This should be called once during application startup, after
  130. basicConfig but before any logging occurs.
  131. Args:
  132. max_entries: Maximum number of log entries to store.
  133. Returns:
  134. The initialized MemoryLogHandler instance.
  135. """
  136. global memory_handler
  137. memory_handler = MemoryLogHandler(max_entries=max_entries)
  138. memory_handler.setFormatter(
  139. logging.Formatter('%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s')
  140. )
  141. # Add to root logger to capture all logs
  142. root_logger = logging.getLogger()
  143. root_logger.addHandler(memory_handler)
  144. return memory_handler
  145. def get_memory_handler() -> MemoryLogHandler:
  146. """
  147. Get the global memory log handler instance.
  148. Returns:
  149. The MemoryLogHandler instance, or None if not initialized.
  150. """
  151. return memory_handler