| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447 |
- from flask import Flask, request, jsonify, render_template, send_from_directory
- import os
- import threading
- from datetime import datetime
- import logging
- from modules.serial.serial_manager import (
- list_serial_ports, connect_to_serial, disconnect_serial,
- restart_serial, get_serial_status, get_device_info,
- send_coordinate_batch, send_command
- )
- from modules.firmware.firmware_manager import (
- get_firmware_info, flash_firmware, check_git_updates,
- update_software
- )
- from modules.core.pattern_manager import (
- THETA_RHO_DIR, parse_theta_rho_file, run_theta_rho_file,
- run_theta_rho_files, get_execution_status, stop_execution,
- pause_execution, resume_execution
- )
- from modules.core.playlist_manager import (
- list_all_playlists, get_playlist, create_playlist,
- modify_playlist, delete_playlist, add_to_playlist
- )
- app = Flask(__name__)
- logging.basicConfig(level=logging.INFO)
- # Ensure the patterns directory exists
- os.makedirs(THETA_RHO_DIR, exist_ok=True)
- # API Routes
- @app.route('/')
- def index():
- return render_template('index.html')
- # Serial Routes
- @app.route('/list_serial_ports', methods=['GET'])
- def api_list_ports():
- return jsonify(list_serial_ports())
- @app.route('/connect_serial', methods=['POST'])
- def api_connect_serial():
- port = request.json.get('port')
- if not port:
- app.logger.error("No port provided in connect_serial request")
- return jsonify({'error': 'No port provided'}), 400
- try:
- success = connect_to_serial(port)
- return jsonify({'success': success})
- except Exception as e:
- app.logger.error(f"Error connecting to serial port: {str(e)}", exc_info=True)
- return jsonify({'error': str(e)}), 500
- @app.route('/disconnect_serial', methods=['POST'])
- def api_disconnect():
- try:
- disconnect_serial()
- return jsonify({'success': True})
- except Exception as e:
- app.logger.error(f"Error disconnecting serial port: {str(e)}", exc_info=True)
- return jsonify({'error': str(e)}), 500
- @app.route('/restart_serial', methods=['POST'])
- def api_restart():
- port = request.json.get('port')
- if not port:
- app.logger.error("No port provided in restart_serial request")
- return jsonify({'error': 'No port provided'}), 400
- try:
- success = restart_serial(port)
- return jsonify({'success': success})
- except Exception as e:
- app.logger.error(f"Error restarting serial port: {str(e)}", exc_info=True)
- return jsonify({'error': str(e)}), 500
- @app.route('/serial_status', methods=['GET'])
- def api_serial_status():
- return jsonify(get_serial_status())
- # Pattern Routes
- @app.route('/list_theta_rho_files', methods=['GET'])
- def api_list_theta_rho_files():
- files = []
- for root, _, filenames in os.walk(THETA_RHO_DIR):
- for file in filenames:
- relative_path = os.path.relpath(os.path.join(root, file), THETA_RHO_DIR)
- files.append(relative_path)
- return jsonify(sorted(files))
- @app.route('/upload_theta_rho', methods=['POST'])
- def api_upload_theta_rho():
- custom_patterns_dir = os.path.join(THETA_RHO_DIR, 'custom_patterns')
- os.makedirs(custom_patterns_dir, exist_ok=True)
- file = request.files['file']
- if file:
- file.save(os.path.join(custom_patterns_dir, file.filename))
- return jsonify({'success': True})
- return jsonify({'success': False})
- @app.route('/run_theta_rho', methods=['POST'])
- def api_run_theta_rho():
- file_name = request.json.get('file_name')
- pre_execution = request.json.get('pre_execution')
- if not file_name:
- app.logger.error("No file name provided in run_theta_rho request")
- return jsonify({'error': 'No file name provided'}), 400
- file_path = os.path.join(THETA_RHO_DIR, file_name)
- if not os.path.exists(file_path):
- app.logger.error(f"File not found: {file_path}")
- return jsonify({'error': 'File not found'}), 404
- try:
- files_to_run = []
- if pre_execution in ['clear_in', 'clear_out', 'clear_sideway']:
- files_to_run.append(f'./patterns/clear_from_{pre_execution.split("_")[1]}.thr')
- files_to_run.append(file_path)
- threading.Thread(
- target=run_theta_rho_files,
- args=(files_to_run,),
- kwargs={'pause_time': 0, 'clear_pattern': None}
- ).start()
- return jsonify({'success': True})
- except Exception as e:
- app.logger.error(f"Error running theta rho file: {str(e)}", exc_info=True)
- return jsonify({'error': str(e)}), 500
- @app.route('/run_theta_rho_file/<file_name>', methods=['POST'])
- def api_run_specific_theta_rho_file(file_name):
- """Run a specific theta-rho file."""
- file_path = os.path.join(THETA_RHO_DIR, file_name)
- if not os.path.exists(file_path):
- return jsonify({'error': 'File not found'}), 404
- threading.Thread(target=run_theta_rho_file, args=(file_path,)).start()
- return jsonify({'success': True})
- @app.route('/preview_thr', methods=['POST'])
- def api_preview_thr():
- file_name = request.json.get('file_name')
- if not file_name:
- app.logger.error("No file name provided in preview_thr request")
- return jsonify({'error': 'No file name provided'}), 400
-
- # sometimes the frontend sends the complete path, not just the file name
- if file_name.startswith("./patterns"):
- file_name = file_name.split('/')[-1].split('\\')[-1]
-
- file_path = os.path.join(THETA_RHO_DIR, file_name)
-
- if not os.path.exists(file_path):
- app.logger.error(f"File not found: {file_path}")
- return jsonify({'error': 'File not found'}), 404
- try:
- coordinates = parse_theta_rho_file(file_path)
- return jsonify({'success': True, 'coordinates': coordinates})
- except Exception as e:
- app.logger.error(f"Error parsing theta rho file: {str(e)}", exc_info=True)
- return jsonify({'error': str(e)}), 500
- @app.route('/send_coordinate', methods=['POST'])
- def api_send_coordinate():
- try:
- data = request.json
- theta = data.get('theta')
- rho = data.get('rho')
- if theta is None or rho is None:
- return jsonify({"success": False, "error": "Theta and Rho are required"}), 400
- send_coordinate_batch([(theta, rho)])
- return jsonify({"success": True})
- except Exception as e:
- return jsonify({"success": False, "error": str(e)}), 500
- @app.route('/send_home', methods=['POST'])
- def api_send_home():
- """Send the HOME command to the Arduino."""
- try:
- send_command("HOME")
- return jsonify({'success': True})
- except Exception as e:
- return jsonify({'error': str(e)}), 500
- @app.route('/move_to_center', methods=['POST'])
- def api_move_to_center():
- """Move the sand table to the center position."""
- try:
- coordinates = [(0, 0)] # Center position
- send_coordinate_batch(coordinates)
- return jsonify({"success": True})
- except Exception as e:
- return jsonify({"success": False, "error": str(e)}), 500
- @app.route('/move_to_perimeter', methods=['POST'])
- def api_move_to_perimeter():
- """Move the sand table to the perimeter position."""
- try:
- MAX_RHO = 1
- coordinates = [(0, MAX_RHO)] # Perimeter position
- send_coordinate_batch(coordinates)
- return jsonify({"success": True})
- except Exception as e:
- return jsonify({"success": False, "error": str(e)}), 500
- @app.route('/set_speed', methods=['POST'])
- def api_set_speed():
- """Set the speed for the Arduino."""
- try:
- data = request.json
- speed = data.get('speed')
- if speed is None:
- return jsonify({"success": False, "error": "Speed is required"}), 400
- if not isinstance(speed, (int, float)) or speed <= 0:
- return jsonify({"success": False, "error": "Invalid speed value"}), 400
- # Send the SET_SPEED command to the Arduino
- send_command(f"SET_SPEED {speed}")
- return jsonify({"success": True, "speed": speed})
- except Exception as e:
- return jsonify({"success": False, "error": str(e)}), 500
- # Playlist Routes
- @app.route("/list_all_playlists", methods=["GET"])
- def api_list_all_playlists():
- playlist_names = list_all_playlists()
- return jsonify(playlist_names)
- @app.route("/get_playlist", methods=["GET"])
- def api_get_playlist():
- playlist_name = request.args.get("name", "")
- if not playlist_name:
- return jsonify({"error": "Missing playlist 'name' parameter"}), 400
- playlist = get_playlist(playlist_name)
- if not playlist:
- return jsonify({"error": f"Playlist '{playlist_name}' not found"}), 404
- return jsonify(playlist)
- @app.route("/create_playlist", methods=["POST"])
- def api_create_playlist():
- data = request.get_json()
- if not data or "name" not in data or "files" not in data:
- app.logger.error("Missing required fields in create_playlist request")
- return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
- success = create_playlist(data["name"], data["files"])
- return jsonify({
- "success": success,
- "message": f"Playlist '{data['name']}' created/updated"
- })
- @app.route("/modify_playlist", methods=["POST"])
- def api_modify_playlist():
- data = request.get_json()
- if not data or "name" not in data or "files" not in data:
- return jsonify({"success": False, "error": "Playlist 'name' and 'files' are required"}), 400
- success = modify_playlist(data["name"], data["files"])
- return jsonify({
- "success": success,
- "message": f"Playlist '{data['name']}' updated"
- })
- @app.route("/delete_playlist", methods=["DELETE"])
- def api_delete_playlist():
- data = request.get_json()
- if not data or "name" not in data:
- return jsonify({"success": False, "error": "Missing 'name' field"}), 400
- success = delete_playlist(data["name"])
- if not success:
- return jsonify({"success": False, "error": f"Playlist '{data['name']}' not found"}), 404
- return jsonify({
- "success": True,
- "message": f"Playlist '{data['name']}' deleted"
- })
- @app.route('/add_to_playlist', methods=['POST'])
- def api_add_to_playlist():
- data = request.json
- playlist_name = data.get('playlist_name')
- pattern = data.get('pattern')
- success = add_to_playlist(playlist_name, pattern)
- if success:
- return jsonify(success=True)
- else:
- return jsonify(success=False, error='Playlist not found'), 404
- @app.route("/run_playlist", methods=["POST"])
- def api_run_playlist():
- data = request.get_json()
- if not data or "playlist_name" not in data:
- return jsonify({"success": False, "error": "Missing 'playlist_name' field"}), 400
- playlist = get_playlist(data["playlist_name"])
- if not playlist:
- return jsonify({"success": False, "error": f"Playlist '{data['playlist_name']}' not found"}), 404
- schedule_hours = None
- start_time = data.get("start_time")
- end_time = data.get("end_time")
- if start_time and end_time:
- try:
- start_time_obj = datetime.strptime(start_time, "%H:%M").time()
- end_time_obj = datetime.strptime(end_time, "%H:%M").time()
- if start_time_obj >= end_time_obj:
- return jsonify({"success": False, "error": "'start_time' must be earlier than 'end_time'"}), 400
- schedule_hours = (start_time_obj, end_time_obj)
- except ValueError:
- return jsonify({"success": False, "error": "Invalid time format. Use HH:MM (e.g., '09:30')"}), 400
- file_paths = [os.path.join(THETA_RHO_DIR, file) for file in playlist["files"]]
- if not file_paths:
- return jsonify({"success": False, "error": f"Playlist '{data['playlist_name']}' is empty"}), 400
- try:
- threading.Thread(
- target=run_theta_rho_files,
- args=(file_paths,),
- kwargs={
- 'pause_time': data.get("pause_time", 0),
- 'clear_pattern': data.get("clear_pattern"),
- 'run_mode': data.get("run_mode", "single"),
- 'shuffle': data.get("shuffle", False),
- 'schedule_hours': schedule_hours
- },
- daemon=True
- ).start()
- return jsonify({"success": True, "message": f"Playlist '{data['playlist_name']}' is now running."})
- except Exception as e:
- return jsonify({"success": False, "error": str(e)}), 500
- # Execution Control Routes
- @app.route('/stop_execution', methods=['POST'])
- def api_stop_execution():
- stop_execution()
- return jsonify({'success': True})
- @app.route('/pause_execution', methods=['POST'])
- def api_pause_execution():
- pause_execution()
- return jsonify({'success': True, 'message': 'Execution paused'})
- @app.route('/resume_execution', methods=['POST'])
- def api_resume_execution():
- resume_execution()
- return jsonify({'success': True, 'message': 'Execution resumed'})
- @app.route('/status', methods=['GET'])
- def api_get_status():
- return jsonify(get_execution_status())
- # Firmware Routes
- @app.route('/get_firmware_info', methods=['GET', 'POST'])
- def api_get_firmware_info():
- device_info = get_device_info()
- if request.method == "POST":
- motor_type = request.json.get("motorType")
- info, error = get_firmware_info(
- device_info['firmware_version'],
- device_info['driver_type'],
- motor_type
- )
- else:
- info, error = get_firmware_info(
- device_info['firmware_version'],
- device_info['driver_type']
- )
- if error:
- return jsonify({"success": False, "error": error}), 500
- return jsonify(info)
- @app.route('/flash_firmware', methods=['POST'])
- def api_flash_firmware():
- status = get_serial_status()
- if not status['connected']:
- return jsonify({"success": False, "error": "No Arduino connected or connection lost"}), 400
- motor_type = request.json.get("motorType")
- success, message = flash_firmware(status['port'], motor_type)
-
- if success:
- return jsonify({"success": True, "message": message})
-
- app.logger.error(message)
- return jsonify({"success": False, "error": message}), 500
- @app.route('/check_software_update', methods=['GET'])
- def api_check_updates():
- update_info = check_git_updates()
- return jsonify(update_info)
- @app.route('/update_software', methods=['POST'])
- def api_update_software():
- success, message, error_log = update_software()
- if success:
- return jsonify({"success": True})
- return jsonify({
- "success": False,
- "error": message,
- "details": error_log
- }), 500
- # File Management Routes
- @app.route('/download/<filename>', methods=['GET'])
- def download_file(filename):
- return send_from_directory(THETA_RHO_DIR, filename)
- @app.route('/delete_theta_rho_file', methods=['POST'])
- def api_delete_theta_rho_file():
- data = request.json
- file_name = data.get('file_name')
- if not file_name:
- app.logger.error("No file name provided in delete_theta_rho_file request")
- return jsonify({"success": False, "error": "No file name provided"}), 400
- file_path = os.path.join(THETA_RHO_DIR, file_name)
- if not os.path.exists(file_path):
- app.logger.error(f"File not found: {file_path}")
- return jsonify({"success": False, "error": "File not found"}), 404
- try:
- os.remove(file_path)
- return jsonify({"success": True})
- except Exception as e:
- app.logger.error(f"Error deleting theta rho file: {str(e)}", exc_info=True)
- return jsonify({"success": False, "error": str(e)}), 500
- if __name__ == '__main__':
- # Auto-connect to serial
- connect_to_serial()
- app.run(debug=False, host='0.0.0.0', port=8080)
|