__init__.py 1.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. import esphome.codegen as cg
  2. import esphome.config_validation as cv
  3. from esphome.components import uart
  4. from esphome.const import CONF_ID, CONF_PORT, CONF_BUFFER_SIZE
  5. # ESPHome doesn't know the Stream abstraction yet, so hardcode to use a UART for now.
  6. AUTO_LOAD = ["socket"]
  7. DEPENDENCIES = ["uart", "network"]
  8. MULTI_CONF = True
  9. ns = cg.global_ns
  10. StreamServerComponent = ns.class_("StreamServerComponent", cg.Component)
  11. def validate_buffer_size(buffer_size):
  12. if buffer_size & (buffer_size - 1) != 0:
  13. raise cv.Invalid("Buffer size must be a power of two.")
  14. return buffer_size
  15. CONFIG_SCHEMA = cv.All(
  16. cv.require_esphome_version(2022, 3, 0),
  17. cv.Schema(
  18. {
  19. cv.GenerateID(): cv.declare_id(StreamServerComponent),
  20. cv.Optional(CONF_PORT, default=6638): cv.port,
  21. cv.Optional(CONF_BUFFER_SIZE, default=128): cv.All(
  22. cv.positive_int, validate_buffer_size
  23. ),
  24. }
  25. )
  26. .extend(cv.COMPONENT_SCHEMA)
  27. .extend(uart.UART_DEVICE_SCHEMA),
  28. )
  29. async def to_code(config):
  30. var = cg.new_Pvariable(config[CONF_ID])
  31. cg.add(var.set_port(config[CONF_PORT]))
  32. cg.add(var.set_buffer_size(config[CONF_BUFFER_SIZE]))
  33. await cg.register_component(var, config)
  34. await uart.register_uart_device(var, config)