pytest_mdns.py 8.5 KB


  1. # SPDX-FileCopyrightText: 2022-2023 Espressif Systems (Shanghai) CO LTD
  2. # SPDX-License-Identifier: Unlicense OR CC0-1.0
  3. import re
  4. import select
  5. import socket
  6. import struct
  7. import subprocess
  8. import time
  9. from threading import Event, Thread
  10. try:
  11. import dpkt
  12. import dpkt.dns
  13. except ImportError:
  14. pass
  15. def get_dns_query_for_esp(esp_host):
  16. dns = dpkt.dns.DNS(
  17. b'\x00\x00\x01\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x01'
  18. )
  19. dns.qd[0].name = esp_host + u'.local'
  20. print('Created query for esp host: {} '.format(dns.__repr__()))
  21. return dns.pack()
  22. def get_dns_answer_to_mdns(tester_host):
  23. dns = dpkt.dns.DNS(
  24. b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'
  25. )
  26. dns.op = dpkt.dns.DNS_QR | dpkt.dns.DNS_AA
  27. dns.rcode = dpkt.dns.DNS_RCODE_NOERR
  28. arr = dpkt.dns.DNS.RR()
  29. arr.cls = dpkt.dns.DNS_IN
  30. arr.type = dpkt.dns.DNS_A
  31. arr.name = tester_host
  32. arr.ip = socket.inet_aton('127.0.0.1')
  33. dns.an.append(arr)
  34. print('Created answer to mdns query: {} '.format(dns.__repr__()))
  35. return dns.pack()
  36. def get_dns_answer_to_mdns_lwip(tester_host, id):
  37. dns = dpkt.dns.DNS(
  38. b'\x5e\x39\x84\x00\x00\x01\x00\x01\x00\x00\x00\x00\x0a\x64\x61\x76\x69\x64'
  39. b'\x2d\x63\x6f\x6d\x70\x05\x6c\x6f\x63\x61\x6c\x00\x00\x01\x00\x01\xc0\x0c'
  40. b'\x00\x01\x00\x01\x00\x00\x00\x0a\x00\x04\xc0\xa8\x0a\x6c')
  41. dns.qd[0].name = tester_host
  42. dns.an[0].name = tester_host
  43. dns.an[0].ip = socket.inet_aton('127.0.0.1')
  44. dns.an[0].rdata = socket.inet_aton('127.0.0.1')
  45. dns.id = id
  46. print('Created answer to mdns (lwip) query: {} '.format(dns.__repr__()))
  47. return dns.pack()
  48. def mdns_server(esp_host, events):
  49. UDP_IP = '0.0.0.0'
  50. UDP_PORT = 5353
  51. MCAST_GRP = '224.0.0.251'
  52. TESTER_NAME = u'tinytester.local'
  53. TESTER_NAME_LWIP = u'tinytester-lwip.local'
  54. QUERY_TIMEOUT = 0.2
  55. sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  56. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  57. sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
  58. sock.setblocking(False)
  59. sock.bind((UDP_IP, UDP_PORT))
  60. mreq = struct.pack('4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY)
  61. sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
  62. last_query_timepoint = time.time()
  63. while not events['stop'].is_set():
  64. try:
  65. current_time = time.time()
  66. if current_time - last_query_timepoint > QUERY_TIMEOUT:
  67. last_query_timepoint = current_time
  68. if not events['esp_answered'].is_set():
  69. sock.sendto(get_dns_query_for_esp(esp_host),
  70. (MCAST_GRP, UDP_PORT))
  71. if not events['esp_delegated_answered'].is_set():
  72. sock.sendto(get_dns_query_for_esp(esp_host + '-delegated'),
  73. (MCAST_GRP, UDP_PORT))
  74. timeout = max(
  75. 0, QUERY_TIMEOUT - (current_time - last_query_timepoint))
  76. read_socks, _, _ = select.select([sock], [], [], timeout)
  77. if not read_socks:
  78. continue
  79. data, addr = sock.recvfrom(1024)
  80. dns = dpkt.dns.DNS(data)
  81. if len(dns.qd) > 0:
  82. for dns_query in dns.qd:
  83. if dns_query.type == dpkt.dns.DNS_A:
  84. if dns_query.name == TESTER_NAME:
  85. print('Received query: {} '.format(dns.__repr__()))
  86. sock.sendto(get_dns_answer_to_mdns(TESTER_NAME),
  87. (MCAST_GRP, UDP_PORT))
  88. elif dns_query.name == TESTER_NAME_LWIP:
  89. print('Received query: {} '.format(dns.__repr__()))
  90. sock.sendto(
  91. get_dns_answer_to_mdns_lwip(TESTER_NAME_LWIP, dns.id),
  92. addr)
  93. if len(dns.an) > 0:
  94. for dns_answer in dns.an:
  95. if dns_answer.type == dpkt.dns.DNS_A:
  96. print('Received answer from {}'.format(dns_answer.name))
  97. if dns_answer.name == esp_host + u'.local':
  98. print('Received answer to esp32-mdns query: {}'.format(
  99. dns.__repr__()))
  100. events['esp_answered'].set()
  101. if dns_answer.name == esp_host + u'-delegated.local':
  102. print('Received answer to esp32-mdns-delegate query: {}'.format(
  103. dns.__repr__()))
  104. events['esp_delegated_answered'].set()
  105. except socket.timeout:
  106. break
  107. except dpkt.UnpackError:
  108. continue
  109. def test_examples_protocol_mdns(dut):
  110. """
  111. steps: |
  112. 1. obtain IP address + init mdns example
  113. 2. get the dut host name (and IP address)
  114. 3. check the mdns name is accessible
  115. 4. check DUT output if mdns advertized host is resolved
  116. 5. check if DUT responds to dig
  117. 6. check the DUT is searchable via reverse IP lookup
  118. """
  119. specific_host = dut.expect(r'mdns hostname set to: \[(.*?)\]')[1].decode()
  120. mdns_server_events = {
  121. 'stop': Event(),
  122. 'esp_answered': Event(),
  123. 'esp_delegated_answered': Event()
  124. }
  125. mdns_responder = Thread(target=mdns_server,
  126. args=(str(specific_host), mdns_server_events))
  127. ip_addresses = []
  128. if dut.app.sdkconfig.get('LWIP_IPV4') is True:
  129. ipv4 = dut.expect(r'IPv4 address: (\d+\.\d+\.\d+\.\d+)[^\d]',
  130. timeout=30)[1].decode()
  131. ip_addresses.append(ipv4)
  132. if dut.app.sdkconfig.get('LWIP_IPV6') is True:
  133. ipv6_r = r':'.join((r'[0-9a-fA-F]{4}', ) * 8)
  134. ipv6 = dut.expect(ipv6_r, timeout=30)[0].decode()
  135. ip_addresses.append(ipv6)
  136. print('Connected with IP addresses: {}'.format(','.join(ip_addresses)))
  137. try:
  138. # TODO: Add test for example disabling IPV4
  139. mdns_responder.start()
  140. if dut.app.sdkconfig.get('LWIP_IPV4') is True:
  141. # 3. check the mdns name is accessible.
  142. if not mdns_server_events['esp_answered'].wait(timeout=30):
  143. raise ValueError(
  144. 'Test has failed: did not receive mdns answer within timeout')
  145. if not mdns_server_events['esp_delegated_answered'].wait(timeout=30):
  146. raise ValueError(
  147. 'Test has failed: did not receive mdns answer for delegated host within timeout'
  148. )
  149. # 4. check DUT output if mdns advertized host is resolved
  150. dut.expect(
  151. re.compile(
  152. b'mdns-test: Query A: tinytester.local resolved to: 127.0.0.1')
  153. )
  154. dut.expect(
  155. re.compile(
  156. b'mdns-test: gethostbyname: tinytester-lwip.local resolved to: 127.0.0.1'
  157. ))
  158. dut.expect(
  159. re.compile(
  160. b'mdns-test: getaddrinfo: tinytester-lwip.local resolved to: 127.0.0.1'
  161. ))
  162. # 5. check the DUT answers to `dig` command
  163. dig_output = subprocess.check_output([
  164. 'dig', '+short', '-p', '5353', '@224.0.0.251',
  165. '{}.local'.format(specific_host)
  166. ])
  167. print('Resolving {} using "dig" succeeded with:\n{}'.format(
  168. specific_host, dig_output))
  169. if not ipv4.encode('utf-8') in dig_output:
  170. raise ValueError(
  171. 'Test has failed: Incorrectly resolved DUT hostname using dig'
  172. "Output should've contained DUT's IP address:{}".format(ipv4))
  173. # 6. check the DUT reverse lookup
  174. if dut.app.sdkconfig.get('MDNS_RESPOND_REVERSE_QUERIES') is True:
  175. for ip_address in ip_addresses:
  176. dig_output = subprocess.check_output([
  177. 'dig', '+short', '-p', '5353', '@224.0.0.251', '-x',
  178. '{}'.format(ip_address)
  179. ])
  180. print('Reverse lookup for {} using "dig" succeeded with:\n{}'.
  181. format(ip_address, dig_output))
  182. if specific_host not in dig_output.decode():
  183. raise ValueError(
  184. 'Test has failed: Incorrectly resolved DUT IP address using dig'
  185. "Output should've contained DUT's name:{}".format(
  186. specific_host))
  187. finally:
  188. mdns_server_events['stop'].set()
  189. mdns_responder.join()