| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485 |
- from flask import Flask, request, jsonify, render_template, send_from_directory
- import atexit
- import os
- import logging
- from datetime import datetime
- from .modules.serial import serial_manager
- from dune_weaver_flask.modules.core import pattern_manager
- from dune_weaver_flask.modules.core import playlist_manager
- from .modules.firmware import firmware_manager
- # Configure logging
- logging.basicConfig(
- level=logging.INFO,
- format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
- handlers=[
- logging.StreamHandler(),
- # disable file logging for now, to not gobble up resources
- # logging.FileHandler('dune_weaver.log')
- ]
- )
- logger = logging.getLogger(__name__)
- app = Flask(__name__)
- # Flask API Endpoints
- @app.route('/')
- def index():
- return render_template('index.html')
- @app.route('/list_serial_ports', methods=['GET'])
- def list_ports():
- logger.debug("Listing available serial ports")
- return jsonify(serial_manager.list_serial_ports())
- @app.route('/connect_serial', methods=['POST'])
- def connect_serial():
- port = request.json.get('port')
- if not port:
- logger.warning('Serial connection attempt without port specified')
- return jsonify({'error': 'No port provided'}), 400
- try:
- serial_manager.connect_to_serial(port)
- logger.info(f'Successfully connected to serial port {port}')
- return jsonify({'success': True})
- except Exception as e:
- logger.error(f'Failed to connect to serial port {port}: {str(e)}')
- return jsonify({'error': str(e)}), 500
- @app.route('/disconnect_serial', methods=['POST'])
- def disconnect():
- try:
- serial_manager.disconnect_serial()
- logger.info('Successfully disconnected from serial port')
- return jsonify({'success': True})
- except Exception as e:
- logger.error(f'Failed to disconnect serial: {str(e)}')
- return jsonify({'error': str(e)}), 500
- @app.route('/restart_serial', methods=['POST'])
- def restart():
- port = request.json.get('port')
- if not port:
- logger.warning("Restart serial request received without port")
- return jsonify({'error': 'No port provided'}), 400
- try:
- logger.info(f"Restarting serial connection on port {port}")
- serial_manager.restart_serial(port)
- return jsonify({'success': True})
- except Exception as e:
- logger.error(f"Failed to restart serial on port {port}: {str(e)}")
- return jsonify({'error': str(e)}), 500
- @app.route('/list_theta_rho_files', methods=['GET'])
- def list_theta_rho_files():
- logger.debug("Listing theta-rho files")
- files = pattern_manager.list_theta_rho_files()
- return jsonify(sorted(files))
- @app.route('/upload_theta_rho', methods=['POST'])
- def upload_theta_rho():
- custom_patterns_dir = os.path.join(pattern_manager.THETA_RHO_DIR, 'custom_patterns')
- os.makedirs(custom_patterns_dir, exist_ok=True)
- logger.debug(f'Ensuring custom patterns directory exists: {custom_patterns_dir}')
- file = request.files['file']
- if file:
- file_path = os.path.join(custom_patterns_dir, file.filename)
- file.save(file_path)
- logger.info(f'Successfully uploaded theta-rho file: {file.filename}')
- return jsonify({'success': True})
-
- logger.warning('Upload theta-rho request received without file')
- return jsonify({'success': False})
- @app.route('/run_theta_rho', methods=['POST'])
- def run_theta_rho():
- file_name = request.json.get('file_name')
- pre_execution = request.json.get('pre_execution')
- if not file_name:
- logger.warning('Run theta-rho request received without file name')
- return jsonify({'error': 'No file name provided'}), 400
- file_path = os.path.join(pattern_manager.THETA_RHO_DIR, file_name)
- if not os.path.exists(file_path):
- logger.error(f'Theta-rho file not found: {file_path}')
- return jsonify({'error': 'File not found'}), 404
- try:
- files_to_run = [file_path]
- logger.info(f'Running theta-rho file: {file_name} with pre_execution={pre_execution}')
- pattern_manager.run_theta_rho_files(files_to_run, clear_pattern=pre_execution)
- return jsonify({'success': True})
- except Exception as e:
- logger.error(f'Failed to run theta-rho file {file_name}: {str(e)}')
- return jsonify({'error': str(e)}), 500
- @app.route('/stop_execution', methods=['POST'])
- def stop_execution():
- pattern_manager.stop_actions()
- return jsonify({'success': True})
- @app.route('/send_home', methods=['POST'])
- def send_home():
- try:
- serial_manager.send_command("HOME", ack="HOMED")
- return jsonify({'success': True})
- except Exception as e:
- logger.error(f"Failed to send home command: {str(e)}")
- return jsonify({'error': str(e)}), 500
- @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
- def run_specific_theta_rho_file(file_name):
- file_path = os.path.join(pattern_manager.THETA_RHO_DIR, file_name)
- if not os.path.exists(file_path):
- return jsonify({'error': 'File not found'}), 404
- pattern_manager.run_theta_rho_file(file_path)
- return jsonify({'success': True})
- @app.route('/delete_theta_rho_file', methods=['POST'])
- def delete_theta_rho_file():
- file_name = request.json.get('file_name')
- if not file_name:
- logger.warning("Delete theta-rho file request received without filename")
- return jsonify({"success": False, "error": "No file name provided"}), 400
- file_path = os.path.join(pattern_manager.THETA_RHO_DIR, file_name)
- if not os.path.exists(file_path):
- logger.error(f"Attempted to delete non-existent file: {file_path}")
- return jsonify({"success": False, "error": "File not found"}), 404
- try:
- os.remove(file_path)
- logger.info(f"Successfully deleted theta-rho file: {file_name}")
- return jsonify({"success": True})
- except Exception as e:
- logger.error(f"Failed to delete theta-rho file {file_name}: {str(e)}")
- return jsonify({"success": False, "error": str(e)}), 500
- @app.route('/move_to_center', methods=['POST'])
- def move_to_center():
- try:
- if not serial_manager.is_connected():
- logger.warning("Attempted to move to center without serial connection")
- return jsonify({"success": False, "error": "Serial connection not established"}), 400
- logger.info("Moving device to center position")
- coordinates = [(0, 0)]
- serial_manager.send_coordinate_batch(coordinates)
- return jsonify({"success": True})
- except Exception as e:
- logger.error(f"Failed to move to center: {str(e)}")
- return jsonify({"success": False, "error": str(e)}), 500
- @app.route('/move_to_perimeter', methods=['POST'])
- def move_to_perimeter():
- try:
- if not serial_manager.is_connected():
- logger.warning("Attempted to move to perimeter without serial connection")
- return jsonify({"success": False, "error": "Serial connection not established"}), 400
- MAX_RHO = 1
- coordinates = [(0, MAX_RHO)]
- serial_manager.send_coordinate_batch(coordinates)
- return jsonify({"success": True})
- except Exception as e:
- logger.error(f"Failed to move to perimeter: {str(e)}")
- return jsonify({"success": False, "error": str(e)}), 500
- @app.route('/preview_thr', methods=['POST'])
- def preview_thr():
- file_name = request.json.get('file_name')
- if not file_name:
- logger.warning("Preview theta-rho request received without filename")
- return jsonify({'error': 'No file name provided'}), 400
- file_path = os.path.join(pattern_manager.THETA_RHO_DIR, file_name)
- if not os.path.exists(file_path):
- logger.error(f"Attempted to preview non-existent file: {file_path}")
- return jsonify({'error': 'File not found'}), 404
- try:
- coordinates = pattern_manager.parse_theta_rho_file(file_path)
- return jsonify({'success': True, 'coordinates': coordinates})
- except Exception as e:
- logger.error(f"Failed to generate preview for {file_name}: {str(e)}")
- return jsonify({'error': str(e)}), 500
- @app.route('/send_coordinate', methods=['POST'])
- def send_coordinate():
- if not serial_manager.is_connected():
- logger.warning("Attempted to send coordinate without serial connection")
- return jsonify({"success": False, "error": "Serial connection not established"}), 400
- try:
- data = request.json
- theta = data.get('theta')
- rho = data.get('rho')
- if theta is None or rho is None:
- logger.warning("Send coordinate request missing theta or rho values")
- return jsonify({"success": False, "error": "Theta and Rho are required"}), 400
- logger.debug(f"Sending coordinate: theta={theta}, rho={rho}")
- serial_manager.send_coordinate_batch([(theta, rho)])
- return jsonify({"success": True})
- except Exception as e:
- logger.error(f"Failed to send coordinate: {str(e)}")
- return jsonify({"success": False, "error": str(e)}), 500
- @app.route('/download/<filename>', methods=['GET'])
- def download_file(filename):
- return send_from_directory(pattern_manager.THETA_RHO_DIR, filename)
- @app.route('/serial_status', methods=['GET'])
- def serial_status():
- connected = serial_manager.is_connected()
- port = serial_manager.get_port()
- logger.debug(f"Serial status check - connected: {connected}, port: {port}")
- return jsonify({
- 'connected': connected,
- 'port': port
- })
- @app.route('/pause_execution', methods=['POST'])
- def pause_execution():
- logger.info("Pausing pattern execution")
- pattern_manager.pause_requested = True
- return jsonify({'success': True, 'message': 'Execution paused'})
- @app.route('/status', methods=['GET'])
- def get_status():
- return jsonify(pattern_manager.get_status())
- @app.route('/resume_execution', methods=['POST'])
- def resume_execution():
- logger.info("Resuming pattern execution")
- with pattern_manager.pause_condition:
- pattern_manager.pause_requested = False
- pattern_manager.pause_condition.notify_all()
- return jsonify({'success': True, 'message': 'Execution resumed'})
- # Playlist endpoints
- @app.route("/list_all_playlists", methods=["GET"])
- def list_all_playlists():
- playlist_names = playlist_manager.list_all_playlists()
- return jsonify(playlist_names)
- @app.route("/get_playlist", methods=["GET"])
- def get_playlist():
- playlist_name = request.args.get("name", "")
- if not playlist_name:
- return jsonify({"error": "Missing playlist 'name' parameter"}), 400
- playlist = playlist_manager.get_playlist(playlist_name)
- if not playlist:
- return jsonify({"error": f"Playlist '{playlist_name}' not found"}), 404
- return jsonify(playlist)
- @app.route("/create_playlist", methods=["POST"])
- def create_playlist():
- data = request.get_json()
- if not data or "name" not in data or "files" not in data:
- return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
- success = playlist_manager.create_playlist(data["name"], data["files"])
- return jsonify({
- "success": success,
- "message": f"Playlist '{data['name']}' created/updated"
- })
- @app.route("/modify_playlist", methods=["POST"])
- def modify_playlist():
- data = request.get_json()
- if not data or "name" not in data or "files" not in data:
- return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
- success = playlist_manager.modify_playlist(data["name"], data["files"])
- return jsonify({"success": success, "message": f"Playlist '{data['name']}' updated"})
- @app.route("/delete_playlist", methods=["DELETE"])
- def delete_playlist():
- data = request.get_json()
- if not data or "name" not in data:
- return jsonify({"success": False, "error": "Missing 'name' field"}), 400
- success = playlist_manager.delete_playlist(data["name"])
- if not success:
- return jsonify({"success": False, "error": f"Playlist '{data['name']}' not found"}), 404
- return jsonify({
- "success": True,
- "message": f"Playlist '{data['name']}' deleted"
- })
- @app.route('/add_to_playlist', methods=['POST'])
- def add_to_playlist():
- data = request.json
- playlist_name = data.get('playlist_name')
- pattern = data.get('pattern')
- success = playlist_manager.add_to_playlist(playlist_name, pattern)
- if not success:
- return jsonify(success=False, error='Playlist not found'), 404
- return jsonify(success=True)
- @app.route("/run_playlist", methods=["POST"])
- def run_playlist():
- data = request.get_json()
- if not data or "playlist_name" not in data:
- logger.warning("Run playlist request received without playlist name")
- return jsonify({"success": False, "error": "Missing 'playlist_name' field"}), 400
- playlist_name = data["playlist_name"]
- pause_time = data.get("pause_time", 0)
- clear_pattern = data.get("clear_pattern", None)
- run_mode = data.get("run_mode", "single")
- shuffle = data.get("shuffle", False)
-
- schedule_hours = None
- start_time = data.get("start_time")
- end_time = data.get("end_time")
-
- if start_time and end_time:
- try:
- start_time_obj = datetime.strptime(start_time, "%H:%M").time()
- end_time_obj = datetime.strptime(end_time, "%H:%M").time()
- if start_time_obj >= end_time_obj:
- logger.error(f"Invalid schedule times: start_time {start_time} >= end_time {end_time}")
- return jsonify({"success": False, "error": "'start_time' must be earlier than 'end_time'"}), 400
- schedule_hours = (start_time_obj, end_time_obj)
- logger.info(f"Playlist {playlist_name} scheduled to run between {start_time} and {end_time}")
- except ValueError:
- logger.error(f"Invalid time format provided: start_time={start_time}, end_time={end_time}")
- return jsonify({"success": False, "error": "Invalid time format. Use HH:MM (e.g., '09:30')"}), 400
- logger.info(f"Starting playlist '{playlist_name}' with mode={run_mode}, shuffle={shuffle}")
- success, message = playlist_manager.run_playlist(
- playlist_name,
- pause_time=pause_time,
- clear_pattern=clear_pattern,
- run_mode=run_mode,
- shuffle=shuffle,
- schedule_hours=schedule_hours
- )
- if not success:
- logger.error(f"Failed to run playlist '{playlist_name}': {message}")
- return jsonify({"success": False, "error": message}), 500
-
- return jsonify({"success": True, "message": message})
- # Firmware endpoints
- @app.route('/set_speed', methods=['POST'])
- def set_speed():
- if not serial_manager.is_connected():
- logger.warning("Attempted to set speed without serial connection")
- return jsonify({"success": False, "error": "Serial connection not established"}), 400
- try:
- data = request.json
- speed = data.get('speed')
- if speed is None:
- logger.warning("Set speed request received without speed value")
- return jsonify({"success": False, "error": "Speed is required"}), 400
- if not isinstance(speed, (int, float)) or speed <= 0:
- logger.warning(f"Invalid speed value received: {speed}")
- return jsonify({"success": False, "error": "Invalid speed value"}), 400
- serial_manager.send_command(f"SET_SPEED {speed}", ack="SPEED_SET")
- return jsonify({"success": True, "speed": speed})
- except Exception as e:
- logger.error(f"Failed to set speed: {str(e)}")
- return jsonify({"success": False, "error": str(e)}), 500
- @app.route('/get_firmware_info', methods=['GET', 'POST'])
- def get_firmware_info():
- if not serial_manager.is_connected():
- logger.warning("Attempted to get firmware info without serial connection")
- return jsonify({"success": False, "error": "Arduino not connected or serial port not open"}), 400
- try:
- if request.method == "POST":
- motor_type = request.json.get("motorType", None)
- success, result = firmware_manager.get_firmware_info(motor_type)
- else:
- success, result = firmware_manager.get_firmware_info()
- if not success:
- logger.error(f"Failed to get firmware info: {result}")
- return jsonify({"success": False, "error": result}), 500
- return jsonify({"success": True, **result})
- except Exception as e:
- logger.error(f"Unexpected error while getting firmware info: {str(e)}")
- return jsonify({"success": False, "error": str(e)}), 500
- @app.route('/flash_firmware', methods=['POST'])
- def flash_firmware():
- try:
- motor_type = request.json.get("motorType", None)
- logger.info(f"Starting firmware flash for motor type: {motor_type}")
- success, message = firmware_manager.flash_firmware(motor_type)
-
- if not success:
- logger.error(f"Firmware flash failed: {message}")
- return jsonify({"success": False, "error": message}), 500
-
- logger.info("Firmware flash completed successfully")
- return jsonify({"success": True, "message": message})
- except Exception as e:
- logger.critical(f"Unexpected error during firmware flash: {str(e)}")
- return jsonify({"success": False, "error": str(e)}), 500
- @app.route('/check_software_update', methods=['GET'])
- def check_updates():
- update_info = firmware_manager.check_git_updates()
- return jsonify(update_info)
- @app.route('/update_software', methods=['POST'])
- def update_software():
- logger.info("Starting software update process")
- success, error_message, error_log = firmware_manager.update_software()
-
- if success:
- logger.info("Software update completed successfully")
- return jsonify({"success": True})
- else:
- logger.error(f"Software update failed: {error_message}\nDetails: {error_log}")
- return jsonify({
- "success": False,
- "error": error_message,
- "details": error_log
- }), 500
- def on_exit():
- """Function to execute on application shutdown."""
- pattern_manager.stop_actions()
- # Register the on_exit function
- atexit.register(on_exit)
- def entrypoint():
- logger.info("Starting Dune Weaver application...")
- # Auto-connect to serial
- try:
- serial_manager.connect_to_serial()
- except Exception as e:
- logger.warning(f"Failed to auto-connect to serial port: {str(e)}")
- try:
- logger.info("Starting Flask server on port 8080...")
- app.run(debug=False, host='0.0.0.0', port=8080)
- except KeyboardInterrupt:
- logger.info("Keyboard interrupt received. Shutting down.")
- except Exception as e:
- logger.critical(f"Unexpected error during server startup: {str(e)}")
- finally:
- on_exit()
|