| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- """mDNS advertisement and discovery for multi-table support.
- This module provides:
- - Service advertisement: Allows this table to be discovered by other frontends
- - Service discovery: Finds other Dune Weaver tables on the local network
- """
- import asyncio
- import logging
- import socket
- from typing import List, Dict, Optional
- from zeroconf import ServiceInfo, Zeroconf, ServiceBrowser, ServiceListener
- from zeroconf.asyncio import AsyncZeroconf, AsyncServiceBrowser
- logger = logging.getLogger(__name__)
- # Service type for Dune Weaver tables
- SERVICE_TYPE = "_duneweaver._tcp.local."
- class DuneWeaverServiceListener(ServiceListener):
- """Listener for discovered Dune Weaver services."""
- def __init__(self):
- self.discovered_tables: Dict[str, Dict] = {}
- def add_service(self, zc: Zeroconf, type_: str, name: str) -> None:
- """Called when a new service is discovered."""
- info = zc.get_service_info(type_, name)
- if info:
- self._process_service_info(name, info)
- def update_service(self, zc: Zeroconf, type_: str, name: str) -> None:
- """Called when an existing service is updated."""
- info = zc.get_service_info(type_, name)
- if info:
- self._process_service_info(name, info)
- def remove_service(self, zc: Zeroconf, type_: str, name: str) -> None:
- """Called when a service is removed."""
- if name in self.discovered_tables:
- del self.discovered_tables[name]
- logger.debug(f"Table removed: {name}")
- def _process_service_info(self, name: str, info: ServiceInfo) -> None:
- """Extract table information from service info."""
- try:
- # Get properties
- properties = {}
- if info.properties:
- for key, value in info.properties.items():
- if isinstance(value, bytes):
- properties[key.decode() if isinstance(key, bytes) else key] = value.decode()
- else:
- properties[key if isinstance(key, str) else key.decode()] = str(value)
- # Get addresses
- addresses = info.parsed_addresses()
- host = addresses[0] if addresses else None
- port = info.port
- if host and port:
- self.discovered_tables[name] = {
- "id": properties.get("id", ""),
- "name": properties.get("name", name.replace(f".{SERVICE_TYPE}", "")),
- "host": host,
- "port": port,
- "version": properties.get("version", "unknown"),
- "url": f"http://{host}:{port}"
- }
- logger.debug(f"Discovered table: {self.discovered_tables[name]}")
- except Exception as e:
- logger.warning(f"Error processing service info for {name}: {e}")
- class MDNSManager:
- """Manages mDNS advertisement and discovery for Dune Weaver."""
- def __init__(self):
- self._zeroconf: Optional[AsyncZeroconf] = None
- self._service_info: Optional[ServiceInfo] = None
- self._browser: Optional[AsyncServiceBrowser] = None
- self._listener: Optional[DuneWeaverServiceListener] = None
- self._advertised = False
- def _get_local_ip(self) -> str:
- """Get the local IP address of this machine."""
- try:
- # Create a socket to determine our IP
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- try:
- # Doesn't need to be reachable
- s.connect(("10.255.255.255", 1))
- ip = s.getsockname()[0]
- except Exception:
- ip = "127.0.0.1"
- finally:
- s.close()
- return ip
- except Exception:
- return "127.0.0.1"
- async def start_advertisement(self, table_id: str, table_name: str, version: str, port: int = 8080) -> bool:
- """
- Start advertising this table on the network.
- Args:
- table_id: Unique identifier for this table
- table_name: Human-readable name for this table
- version: Software version
- port: HTTP port the server is running on
- Returns:
- True if advertisement started successfully
- """
- try:
- if self._advertised:
- await self.stop_advertisement()
- local_ip = self._get_local_ip()
- hostname = socket.gethostname()
- # Create service info
- # Service name must be unique on the network
- service_name = f"{table_name.replace(' ', '_')}_{table_id[:8]}.{SERVICE_TYPE}"
- self._service_info = ServiceInfo(
- SERVICE_TYPE,
- service_name,
- addresses=[socket.inet_aton(local_ip)],
- port=port,
- properties={
- "id": table_id,
- "name": table_name,
- "version": version,
- "hostname": hostname
- },
- server=f"{hostname}.local."
- )
- # Start zeroconf and register service
- self._zeroconf = AsyncZeroconf()
- await self._zeroconf.async_register_service(self._service_info)
- self._advertised = True
- logger.info(f"mDNS: Advertising table '{table_name}' at {local_ip}:{port}")
- return True
- except Exception as e:
- logger.error(f"Failed to start mDNS advertisement: {e}")
- return False
- async def stop_advertisement(self) -> None:
- """Stop advertising this table."""
- try:
- if self._service_info and self._zeroconf:
- await self._zeroconf.async_unregister_service(self._service_info)
- if self._zeroconf:
- await self._zeroconf.async_close()
- self._advertised = False
- self._service_info = None
- self._zeroconf = None
- logger.info("mDNS: Stopped advertising")
- except Exception as e:
- logger.warning(f"Error stopping mDNS advertisement: {e}")
- async def discover_tables(self, timeout: float = 3.0) -> List[Dict]:
- """
- Discover Dune Weaver tables on the local network.
- Args:
- timeout: How long to wait for discovery (seconds)
- Returns:
- List of discovered tables with their info
- """
- discovered = []
- try:
- # Create an async zeroconf instance for discovery
- async_zc = AsyncZeroconf()
- listener = DuneWeaverServiceListener()
- # Start browsing for services using async browser
- browser = AsyncServiceBrowser(async_zc.zeroconf, SERVICE_TYPE, listener)
- # Wait for discovery
- await asyncio.sleep(timeout)
- # Collect results
- discovered = list(listener.discovered_tables.values())
- # Cleanup using async methods
- browser.cancel()
- await async_zc.async_close()
- logger.info(f"mDNS: Discovered {len(discovered)} table(s)")
- except Exception as e:
- logger.error(f"Error during mDNS discovery: {e}")
- return discovered
- async def update_advertisement(self, table_name: str) -> None:
- """Update the advertised table name."""
- if self._advertised and self._service_info:
- # Get current info
- from modules.core.state import state
- from modules.core.version_manager import version_manager
- # Restart advertisement with new name
- await self.stop_advertisement()
- await self.start_advertisement(
- table_id=state.table_id,
- table_name=table_name,
- version=await version_manager.get_current_version(),
- port=state.server_port or 8080
- )
- # Singleton instance
- mdns_manager = MDNSManager()
- async def start_mdns_advertisement():
- """Start mDNS advertisement using current state."""
- from modules.core.state import state
- from modules.core.version_manager import version_manager
- await mdns_manager.start_advertisement(
- table_id=state.table_id,
- table_name=state.table_name,
- version=await version_manager.get_current_version(),
- port=state.server_port or 8080
- )
- async def stop_mdns_advertisement():
- """Stop mDNS advertisement."""
- await mdns_manager.stop_advertisement()
- async def discover_tables(timeout: float = 3.0) -> List[Dict]:
- """Discover Dune Weaver tables on the network."""
- return await mdns_manager.discover_tables(timeout)
|