reed_switch.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. """
  2. Reed switch monitoring module for Raspberry Pi GPIO.
  3. Used for angular homing to detect home position.
  4. """
  5. import logging
  6. import platform
  7. logger = logging.getLogger(__name__)
  8. class ReedSwitchMonitor:
  9. """Monitor a reed switch connected to a Raspberry Pi GPIO pin."""
  10. def __init__(self, gpio_pin=18):
  11. """
  12. Initialize the reed switch monitor.
  13. Args:
  14. gpio_pin: GPIO pin number (BCM numbering) for the reed switch
  15. """
  16. self.gpio_pin = gpio_pin
  17. self.gpio = None
  18. self.is_raspberry_pi = self._check_raspberry_pi()
  19. if self.is_raspberry_pi:
  20. try:
  21. import RPi.GPIO as GPIO
  22. self.gpio = GPIO
  23. # Set up GPIO mode (BCM numbering)
  24. self.gpio.setmode(GPIO.BCM)
  25. # Set up the pin as input with pull-up resistor
  26. # Reed switch should connect pin to ground when triggered
  27. self.gpio.setup(self.gpio_pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
  28. logger.info(f"Reed switch initialized on GPIO pin {self.gpio_pin}")
  29. except ImportError:
  30. logger.warning("RPi.GPIO not available. Reed switch monitoring disabled.")
  31. self.is_raspberry_pi = False
  32. except Exception as e:
  33. logger.error(f"Error initializing reed switch: {e}")
  34. self.is_raspberry_pi = False
  35. else:
  36. logger.info("Not running on Raspberry Pi. Reed switch monitoring disabled.")
  37. def _check_raspberry_pi(self):
  38. """Check if we're running on a Raspberry Pi."""
  39. try:
  40. # Check if we're on Linux first
  41. if platform.system() != 'Linux':
  42. return False
  43. # Check for Raspberry Pi specific identifiers
  44. with open('/proc/cpuinfo', 'r') as f:
  45. cpuinfo = f.read()
  46. if 'Raspberry Pi' in cpuinfo or 'BCM' in cpuinfo:
  47. return True
  48. # Alternative check using device tree
  49. try:
  50. with open('/proc/device-tree/model', 'r') as f:
  51. model = f.read()
  52. if 'Raspberry Pi' in model:
  53. return True
  54. except FileNotFoundError:
  55. pass
  56. return False
  57. except Exception as e:
  58. logger.debug(f"Error checking for Raspberry Pi: {e}")
  59. return False
  60. def is_triggered(self):
  61. """
  62. Check if the reed switch is currently triggered.
  63. Returns:
  64. bool: True if reed switch is triggered (pin is HIGH), False otherwise
  65. """
  66. if not self.is_raspberry_pi or not self.gpio:
  67. return False
  68. try:
  69. # Pin is HIGH (1) when reed switch is closed (triggered)
  70. # This assumes the switch connects the pin to 3.3V when closed
  71. return self.gpio.input(self.gpio_pin) == 1
  72. except Exception as e:
  73. logger.error(f"Error reading reed switch: {e}")
  74. return False
  75. def wait_for_trigger(self, timeout=None):
  76. """
  77. Wait for the reed switch to be triggered.
  78. Args:
  79. timeout: Maximum time to wait in seconds (None = wait indefinitely)
  80. Returns:
  81. bool: True if triggered, False if timeout occurred
  82. """
  83. if not self.is_raspberry_pi or not self.gpio:
  84. logger.warning("Reed switch not available, cannot wait for trigger")
  85. return False
  86. try:
  87. # Wait for rising edge (pin goes from LOW to HIGH)
  88. channel = self.gpio.wait_for_edge(
  89. self.gpio_pin,
  90. self.gpio.RISING,
  91. timeout=int(timeout * 1000) if timeout else None
  92. )
  93. return channel is not None
  94. except Exception as e:
  95. logger.error(f"Error waiting for reed switch trigger: {e}")
  96. return False
  97. def cleanup(self):
  98. """Clean up GPIO resources."""
  99. if self.is_raspberry_pi and self.gpio:
  100. try:
  101. self.gpio.cleanup(self.gpio_pin)
  102. logger.info(f"Reed switch GPIO pin {self.gpio_pin} cleaned up")
  103. except Exception as e:
  104. logger.error(f"Error cleaning up reed switch GPIO: {e}")
  105. def __del__(self):
  106. """Destructor to ensure GPIO cleanup."""
  107. self.cleanup()