# test_websocket.py

  1. import socketio
  2. # Create a Socket.IO client instance
  3. sio = socketio.Client()
  4. # Event handler for a successful connection on the /ws/status namespace
  5. @sio.event(namespace='/ws/status')
  6. def connect():
  7. print("Connected to /ws/status namespace")
  8. # Event handler for connection errors on the /ws/status namespace
  9. @sio.event(namespace='/ws/status')
  10. def connect_error(data):
  11. print("Connection failed:", data)
  12. # Event handler for disconnection from the /ws/status namespace
  13. @sio.event(namespace='/ws/status')
  14. def disconnect():
  15. print("Disconnected from /ws/status namespace")
  16. # Listen for 'status_update' events in the /ws/status namespace
  17. @sio.on('status_update', namespace='/ws/status')
  18. def on_status_update(data):
  19. print("Received status update:", data)
  20. if __name__ == "__main__":
  21. # Replace with your server address if different
  22. server_url = "http://192.168.2.131:8080"
  23. try:
  24. sio.connect(server_url, namespaces=['/ws/status'])
  25. print("Waiting for status updates...")
  26. # Keep the client running to listen for events
  27. sio.wait()
  28. except Exception as e:
  29. print("An error occurred:", e)