1
0

log_handler.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. """
  2. Memory-based log handler for capturing and streaming application logs.
  3. This module provides a circular buffer log handler that captures log messages
  4. in memory for display in the web UI, with support for real-time streaming
  5. via WebSocket.
  6. """
import asyncio
import logging
import threading
from collections import deque
from datetime import datetime
from typing import Any, Dict, List, Optional
  13. class MemoryLogHandler(logging.Handler):
  14. """
  15. A logging handler that stores log records in a circular buffer.
  16. Thread-safe implementation using a lock for concurrent access.
  17. Supports async iteration for WebSocket streaming.
  18. """
  19. def __init__(self, max_entries: int = 500):
  20. """
  21. Initialize the memory log handler.
  22. Args:
  23. max_entries: Maximum number of log entries to keep in memory.
  24. Older entries are automatically discarded.
  25. """
  26. super().__init__()
  27. self.max_entries = max_entries
  28. self._buffer: deque = deque(maxlen=max_entries)
  29. self._lock = threading.Lock()
  30. self._subscribers: List[asyncio.Queue] = []
  31. self._subscribers_lock = threading.Lock()
  32. def emit(self, record: logging.LogRecord) -> None:
  33. """
  34. Store a log record in the buffer and notify subscribers.
  35. Args:
  36. record: The log record to store.
  37. """
  38. try:
  39. log_entry = self._format_record(record)
  40. with self._lock:
  41. self._buffer.append(log_entry)
  42. # Notify all subscribers (for WebSocket streaming)
  43. self._notify_subscribers(log_entry)
  44. except Exception:
  45. self.handleError(record)
  46. def _format_record(self, record: logging.LogRecord) -> Dict[str, Any]:
  47. """
  48. Format a log record into a dictionary for JSON serialization.
  49. Args:
  50. record: The log record to format.
  51. Returns:
  52. Dictionary containing formatted log data.
  53. """
  54. return {
  55. "timestamp": datetime.fromtimestamp(record.created).isoformat(),
  56. "level": record.levelname,
  57. "logger": record.name,
  58. "line": record.lineno,
  59. "message": record.getMessage(),
  60. "module": record.module,
  61. }
  62. def get_logs(self, limit: int = None, level: str = None, offset: int = 0) -> List[Dict[str, Any]]:
  63. """
  64. Retrieve stored log entries with pagination support.
  65. Args:
  66. limit: Maximum number of entries to return (newest first).
  67. level: Filter by log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
  68. offset: Number of entries to skip from the newest (for pagination).
  69. Returns:
  70. List of log entries as dictionaries.
  71. """
  72. with self._lock:
  73. logs = list(self._buffer)
  74. # Filter by level if specified
  75. if level:
  76. level_upper = level.upper()
  77. logs = [log for log in logs if log["level"] == level_upper]
  78. # Return newest first
  79. logs.reverse()
  80. # Apply offset for pagination
  81. if offset > 0:
  82. logs = logs[offset:]
  83. # Apply limit
  84. if limit:
  85. logs = logs[:limit]
  86. return logs
  87. def get_total_count(self, level: str = None) -> int:
  88. """
  89. Get total count of log entries (optionally filtered by level).
  90. Args:
  91. level: Filter by log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
  92. Returns:
  93. Total count of matching log entries.
  94. """
  95. with self._lock:
  96. if not level:
  97. return len(self._buffer)
  98. level_upper = level.upper()
  99. return sum(1 for log in self._buffer if log["level"] == level_upper)
  100. def clear(self) -> None:
  101. """Clear all stored log entries."""
  102. with self._lock:
  103. self._buffer.clear()
  104. def subscribe(self) -> asyncio.Queue:
  105. """
  106. Subscribe to real-time log updates.
  107. Returns:
  108. An asyncio Queue that will receive new log entries.
  109. """
  110. queue = asyncio.Queue(maxsize=100)
  111. with self._subscribers_lock:
  112. self._subscribers.append(queue)
  113. return queue
  114. def unsubscribe(self, queue: asyncio.Queue) -> None:
  115. """
  116. Unsubscribe from real-time log updates.
  117. Args:
  118. queue: The queue returned by subscribe().
  119. """
  120. with self._subscribers_lock:
  121. if queue in self._subscribers:
  122. self._subscribers.remove(queue)
  123. def _notify_subscribers(self, log_entry: Dict[str, Any]) -> None:
  124. """
  125. Notify all subscribers of a new log entry.
  126. Args:
  127. log_entry: The formatted log entry to send.
  128. """
  129. with self._subscribers_lock:
  130. dead_subscribers = []
  131. for queue in self._subscribers:
  132. try:
  133. queue.put_nowait(log_entry)
  134. except asyncio.QueueFull:
  135. # If queue is full, skip this entry
  136. pass
  137. except Exception:
  138. dead_subscribers.append(queue)
  139. # Remove dead subscribers
  140. for queue in dead_subscribers:
  141. self._subscribers.remove(queue)
  142. # Global instance of the memory log handler
  143. memory_handler: MemoryLogHandler = None
  144. def init_memory_handler(max_entries: int = 500) -> MemoryLogHandler:
  145. """
  146. Initialize and install the memory log handler.
  147. This should be called once during application startup, after
  148. basicConfig but before any logging occurs.
  149. Args:
  150. max_entries: Maximum number of log entries to store.
  151. Returns:
  152. The initialized MemoryLogHandler instance.
  153. """
  154. global memory_handler
  155. memory_handler = MemoryLogHandler(max_entries=max_entries)
  156. memory_handler.setFormatter(
  157. logging.Formatter('%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s')
  158. )
  159. # Add to root logger to capture all logs
  160. root_logger = logging.getLogger()
  161. root_logger.addHandler(memory_handler)
  162. return memory_handler
  163. def get_memory_handler() -> MemoryLogHandler:
  164. """
  165. Get the global memory log handler instance.
  166. Returns:
  167. The MemoryLogHandler instance, or None if not initialized.
  168. """
  169. return memory_handler