mdns.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. """mDNS advertisement and discovery for multi-table support.
  2. This module provides:
  3. - Service advertisement: Allows this table to be discovered by other frontends
  4. - Service discovery: Finds other Dune Weaver tables on the local network
  5. """
  6. import asyncio
  7. import logging
  8. import socket
  9. from typing import List, Dict, Optional
  10. from zeroconf import ServiceInfo, Zeroconf, ServiceBrowser, ServiceListener
  11. from zeroconf.asyncio import AsyncZeroconf, AsyncServiceBrowser
  12. logger = logging.getLogger(__name__)
  13. # Service type for Dune Weaver tables
  14. SERVICE_TYPE = "_duneweaver._tcp.local."
  15. class DuneWeaverServiceListener(ServiceListener):
  16. """Listener for discovered Dune Weaver services."""
  17. def __init__(self):
  18. self.discovered_tables: Dict[str, Dict] = {}
  19. def add_service(self, zc: Zeroconf, type_: str, name: str) -> None:
  20. """Called when a new service is discovered."""
  21. info = zc.get_service_info(type_, name)
  22. if info:
  23. self._process_service_info(name, info)
  24. def update_service(self, zc: Zeroconf, type_: str, name: str) -> None:
  25. """Called when an existing service is updated."""
  26. info = zc.get_service_info(type_, name)
  27. if info:
  28. self._process_service_info(name, info)
  29. def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None:
  30. """Called when a service is removed."""
  31. if name in self.discovered_tables:
  32. del self.discovered_tables[name]
  33. logger.debug(f"Table removed: {name}")
  34. def _process_service_info(self, name: str, info: ServiceInfo) -> None:
  35. """Extract table information from service info."""
  36. try:
  37. # Get properties
  38. properties = {}
  39. if info.properties:
  40. for key, value in info.properties.items():
  41. if isinstance(value, bytes):
  42. properties[key.decode() if isinstance(key, bytes) else key] = value.decode()
  43. else:
  44. properties[key if isinstance(key, str) else key.decode()] = str(value)
  45. # Get addresses
  46. addresses = info.parsed_addresses()
  47. host = addresses[0] if addresses else None
  48. port = info.port
  49. if host and port:
  50. self.discovered_tables[name] = {
  51. "id": properties.get("id", ""),
  52. "name": properties.get("name", name.replace(f".{SERVICE_TYPE}", "")),
  53. "host": host,
  54. "port": port,
  55. "version": properties.get("version", "unknown"),
  56. "url": f"http://{host}:{port}"
  57. }
  58. logger.debug(f"Discovered table: {self.discovered_tables[name]}")
  59. except Exception as e:
  60. logger.warning(f"Error processing service info for {name}: {e}")
  61. class MDNSManager:
  62. """Manages mDNS advertisement and discovery for Dune Weaver."""
  63. def __init__(self):
  64. self._zeroconf: Optional[AsyncZeroconf] = None
  65. self._service_info: Optional[ServiceInfo] = None
  66. self._browser: Optional[AsyncServiceBrowser] = None
  67. self._listener: Optional[DuneWeaverServiceListener] = None
  68. self._advertised = False
  69. def _get_local_ip(self) -> str:
  70. """Get the local IP address of this machine."""
  71. try:
  72. # Create a socket to determine our IP
  73. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  74. try:
  75. # Doesn't need to be reachable
  76. s.connect(("10.255.255.255", 1))
  77. ip = s.getsockname()[0]
  78. except Exception:
  79. ip = "127.0.0.1"
  80. finally:
  81. s.close()
  82. return ip
  83. except Exception:
  84. return "127.0.0.1"
  85. async def start_advertisement(self, table_id: str, table_name: str, version: str, port: int = 8080) -> bool:
  86. """
  87. Start advertising this table on the network.
  88. Args:
  89. table_id: Unique identifier for this table
  90. table_name: Human-readable name for this table
  91. version: Software version
  92. port: HTTP port the server is running on
  93. Returns:
  94. True if advertisement started successfully
  95. """
  96. try:
  97. if self._advertised:
  98. await self.stop_advertisement()
  99. local_ip = self._get_local_ip()
  100. hostname = socket.gethostname()
  101. # Create service info
  102. # Service name must be unique on the network
  103. service_name = f"{table_name.replace(' ', '_')}_{table_id[:8]}.{SERVICE_TYPE}"
  104. self._service_info = ServiceInfo(
  105. SERVICE_TYPE,
  106. service_name,
  107. addresses=[socket.inet_aton(local_ip)],
  108. port=port,
  109. properties={
  110. "id": table_id,
  111. "name": table_name,
  112. "version": version,
  113. "hostname": hostname
  114. },
  115. server=f"{hostname}.local."
  116. )
  117. # Start zeroconf and register service
  118. self._zeroconf = AsyncZeroconf()
  119. await self._zeroconf.async_register_service(self._service_info)
  120. self._advertised = True
  121. logger.info(f"mDNS: Advertising table '{table_name}' at {local_ip}:{port}")
  122. return True
  123. except Exception as e:
  124. logger.error(f"Failed to start mDNS advertisement: {e}")
  125. return False
  126. async def stop_advertisement(self) -> None:
  127. """Stop advertising this table."""
  128. try:
  129. if self._service_info and self._zeroconf:
  130. await self._zeroconf.async_unregister_service(self._service_info)
  131. if self._zeroconf:
  132. await self._zeroconf.async_close()
  133. self._advertised = False
  134. self._service_info = None
  135. self._zeroconf = None
  136. logger.info("mDNS: Stopped advertising")
  137. except Exception as e:
  138. logger.warning(f"Error stopping mDNS advertisement: {e}")
  139. async def discover_tables(self, timeout: float = 3.0) -> List[Dict]:
  140. """
  141. Discover Dune Weaver tables on the local network.
  142. Args:
  143. timeout: How long to wait for discovery (seconds)
  144. Returns:
  145. List of discovered tables with their info
  146. """
  147. discovered = []
  148. try:
  149. # Create an async zeroconf instance for discovery
  150. async_zc = AsyncZeroconf()
  151. listener = DuneWeaverServiceListener()
  152. # Start browsing for services using async browser
  153. browser = AsyncServiceBrowser(async_zc.zeroconf, SERVICE_TYPE, listener)
  154. # Wait for discovery
  155. await asyncio.sleep(timeout)
  156. # Collect results
  157. discovered = list(listener.discovered_tables.values())
  158. # Cleanup using async methods
  159. browser.cancel()
  160. await async_zc.async_close()
  161. logger.info(f"mDNS: Discovered {len(discovered)} table(s)")
  162. except Exception as e:
  163. logger.error(f"Error during mDNS discovery: {e}")
  164. return discovered
  165. async def update_advertisement(self, table_name: str) -> None:
  166. """Update the advertised table name."""
  167. if self._advertised and self._service_info:
  168. # Get current info
  169. from modules.core.state import state
  170. from modules.core.version_manager import version_manager
  171. # Restart advertisement with new name
  172. await self.stop_advertisement()
  173. await self.start_advertisement(
  174. table_id=state.table_id,
  175. table_name=table_name,
  176. version=await version_manager.get_current_version(),
  177. port=state.server_port or 8080
  178. )
  179. # Singleton instance
  180. mdns_manager = MDNSManager()
  181. async def start_mdns_advertisement():
  182. """Start mDNS advertisement using current state."""
  183. from modules.core.state import state
  184. from modules.core.version_manager import version_manager
  185. await mdns_manager.start_advertisement(
  186. table_id=state.table_id,
  187. table_name=state.table_name,
  188. version=await version_manager.get_current_version(),
  189. port=state.server_port or 8080
  190. )
  191. async def stop_mdns_advertisement():
  192. """Stop mDNS advertisement."""
  193. await mdns_manager.stop_advertisement()
  194. async def discover_tables(timeout: float = 3.0) -> List[Dict]:
  195. """Discover Dune Weaver tables on the network."""
  196. return await mdns_manager.discover_tables(timeout)