log_handler.py 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218
  1. """
  2. Memory-based log handler for capturing and streaming application logs.
  3. This module provides a circular buffer log handler that captures log messages
  4. in memory for display in the web UI, with support for real-time streaming
  5. via WebSocket.
  6. """
  7. import logging
  8. from collections import deque
  9. from datetime import datetime, timezone as dt_timezone
  10. from typing import List, Dict, Any
  11. import threading
  12. import asyncio
  13. try:
  14. from zoneinfo import ZoneInfo
  15. except ImportError:
  16. from backports.zoneinfo import ZoneInfo
  17. def _get_configured_timezone() -> ZoneInfo:
  18. """
  19. Get the configured timezone from state.
  20. Returns UTC if state is not available or timezone is invalid.
  21. """
  22. try:
  23. # Import here to avoid circular import at module load time
  24. from modules.core.state import state
  25. tz_name = getattr(state, 'timezone', 'UTC') or 'UTC'
  26. return ZoneInfo(tz_name)
  27. except Exception:
  28. return ZoneInfo('UTC')
  29. class MemoryLogHandler(logging.Handler):
  30. """
  31. A logging handler that stores log records in a circular buffer.
  32. Thread-safe implementation using a lock for concurrent access.
  33. Supports async iteration for WebSocket streaming.
  34. """
  35. def __init__(self, max_entries: int = 500):
  36. """
  37. Initialize the memory log handler.
  38. Args:
  39. max_entries: Maximum number of log entries to keep in memory.
  40. Older entries are automatically discarded.
  41. """
  42. super().__init__()
  43. self.max_entries = max_entries
  44. self._buffer: deque = deque(maxlen=max_entries)
  45. self._lock = threading.Lock()
  46. self._subscribers: List[asyncio.Queue] = []
  47. self._subscribers_lock = threading.Lock()
  48. def emit(self, record: logging.LogRecord) -> None:
  49. """
  50. Store a log record in the buffer and notify subscribers.
  51. Args:
  52. record: The log record to store.
  53. """
  54. try:
  55. log_entry = self._format_record(record)
  56. with self._lock:
  57. self._buffer.append(log_entry)
  58. # Notify all subscribers (for WebSocket streaming)
  59. self._notify_subscribers(log_entry)
  60. except Exception:
  61. self.handleError(record)
  62. def _format_record(self, record: logging.LogRecord) -> Dict[str, Any]:
  63. """
  64. Format a log record into a dictionary for JSON serialization.
  65. Args:
  66. record: The log record to format.
  67. Returns:
  68. Dictionary containing formatted log data.
  69. """
  70. # Convert timestamp to configured timezone
  71. tz = _get_configured_timezone()
  72. utc_dt = datetime.fromtimestamp(record.created, tz=dt_timezone.utc)
  73. local_dt = utc_dt.astimezone(tz)
  74. return {
  75. "timestamp": local_dt.isoformat(),
  76. "level": record.levelname,
  77. "logger": record.name,
  78. "line": record.lineno,
  79. "message": record.getMessage(),
  80. "module": record.module,
  81. }
  82. def get_logs(self, limit: int = None, level: str = None) -> List[Dict[str, Any]]:
  83. """
  84. Retrieve stored log entries.
  85. Args:
  86. limit: Maximum number of entries to return (newest first).
  87. level: Filter by log level (DEBUG, INFO, WARNING, ERROR, CRITICAL).
  88. Returns:
  89. List of log entries as dictionaries.
  90. """
  91. with self._lock:
  92. logs = list(self._buffer)
  93. # Filter by level if specified
  94. if level:
  95. level_upper = level.upper()
  96. logs = [log for log in logs if log["level"] == level_upper]
  97. # Return newest first, with optional limit
  98. logs.reverse()
  99. if limit:
  100. logs = logs[:limit]
  101. return logs
  102. def clear(self) -> None:
  103. """Clear all stored log entries."""
  104. with self._lock:
  105. self._buffer.clear()
  106. def subscribe(self) -> asyncio.Queue:
  107. """
  108. Subscribe to real-time log updates.
  109. Returns:
  110. An asyncio Queue that will receive new log entries.
  111. """
  112. queue = asyncio.Queue(maxsize=100)
  113. with self._subscribers_lock:
  114. self._subscribers.append(queue)
  115. return queue
  116. def unsubscribe(self, queue: asyncio.Queue) -> None:
  117. """
  118. Unsubscribe from real-time log updates.
  119. Args:
  120. queue: The queue returned by subscribe().
  121. """
  122. with self._subscribers_lock:
  123. if queue in self._subscribers:
  124. self._subscribers.remove(queue)
  125. def _notify_subscribers(self, log_entry: Dict[str, Any]) -> None:
  126. """
  127. Notify all subscribers of a new log entry.
  128. Args:
  129. log_entry: The formatted log entry to send.
  130. """
  131. with self._subscribers_lock:
  132. dead_subscribers = []
  133. for queue in self._subscribers:
  134. try:
  135. queue.put_nowait(log_entry)
  136. except asyncio.QueueFull:
  137. # If queue is full, skip this entry
  138. pass
  139. except Exception:
  140. dead_subscribers.append(queue)
  141. # Remove dead subscribers
  142. for queue in dead_subscribers:
  143. self._subscribers.remove(queue)
  144. # Global instance of the memory log handler
  145. memory_handler: MemoryLogHandler = None
  146. def init_memory_handler(max_entries: int = 500) -> MemoryLogHandler:
  147. """
  148. Initialize and install the memory log handler.
  149. This should be called once during application startup, after
  150. basicConfig but before any logging occurs.
  151. Args:
  152. max_entries: Maximum number of log entries to store.
  153. Returns:
  154. The initialized MemoryLogHandler instance.
  155. """
  156. global memory_handler
  157. memory_handler = MemoryLogHandler(max_entries=max_entries)
  158. memory_handler.setFormatter(
  159. logging.Formatter('%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s')
  160. )
  161. # Add to root logger to capture all logs
  162. root_logger = logging.getLogger()
  163. root_logger.addHandler(memory_handler)
  164. return memory_handler
  165. def get_memory_handler() -> MemoryLogHandler:
  166. """
  167. Get the global memory log handler instance.
  168. Returns:
  169. The MemoryLogHandler instance, or None if not initialized.
  170. """
  171. return memory_handler