| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762
27722782279228022812282228322842285228622872288228922902291229222932294229522962297229822992300230123022303230423052306230723082309231023112312231323142315231623172318231923202321232223232324232523262327232823292330233123322333233423352336233723382339234023412342234323442345234623472348234923502351235223532354235523562357235823592360236123622363236423652366236723682369237023712372237323742375237623772378237923802381238223832384238523862387238823892390239123922393239423952396239723982399240024012402240324042405240624072408240924102411241224132414241524162417241824192420242124222423242424252426242724282429243024312432243324342435243624372438243924402441244224432444244524462447244824492450245124522453245424552456245724582459246024612462246324642465246624672468246924702471247224732474247524762477247824792480248124822483248424852486248724882489249024912492249324942495249624972498249925002501250225032504250525062507250825092510251125122513251425152516251725182519252025212522252325242525252625272528252925302531253225332534253525362537253825392540254125422543254425452546254725482549255025512552255325542555255625572558255925602561256225632564256525662567256825692570257125722573257425752576257725782579258025812582258325842585258625872588258925902591259225932594259525962597259825992600260126022603260426052606260726082609261026112612261326142615261626172618261926202621262226232624262526262627262826292630263126322633263426352636263726382639264026412642264326442645264626472648264926502651265226532654265526562657265826592660266126622663266426652666266726682669267026712672267326742675267626772678267926802681268226832684268526862687268826892690269126922693269426952696269726982699270027012702270327042705270627072708270927102711271227132714271527162717271827192720272127222723272427252726272727282729273027312732273327342735273627372738273927402741274227432744274527462747274827492750275127522753275427552756275727582759276027612762276327642765276627672768276927702771277227732774277527762
77727782779278027812782278327842785278627872788278927902791279227932794279527962797279827992800280128022803280428052806280728082809281028112812281328142815281628172818281928202821282228232824282528262827282828292830283128322833283428352836283728382839284028412842284328442845284628472848284928502851285228532854285528562857285828592860286128622863286428652866286728682869287028712872287328742875287628772878287928802881288228832884288528862887288828892890289128922893289428952896289728982899290029012902290329042905290629072908290929102911291229132914291529162917291829192920292129222923292429252926292729282929293029312932293329342935293629372938293929402941294229432944294529462947294829492950295129522953295429552956295729582959296029612962296329642965296629672968296929702971297229732974297529762977297829792980298129822983298429852986298729882989299029912992299329942995299629972998299930003001300230033004300530063007300830093010301130123013301430153016301730183019302030213022302330243025302630273028302930303031303230333034303530363037303830393040304130423043304430453046304730483049305030513052305330543055305630573058305930603061306230633064306530663067306830693070307130723073307430753076307730783079308030813082308330843085308630873088308930903091309230933094309530963097309830993100310131023103310431053106310731083109311031113112311331143115311631173118311931203121312231233124312531263127312831293130313131323133313431353136313731383139314031413142314331443145314631473148314931503151315231533154315531563157315831593160316131623163316431653166316731683169317031713172317331743175317631773178317931803181318231833184318531863187318831893190319131923193319431953196319731983199320032013202320332043205320632073208320932103211321232133214321532163217321832193220322132223223322432253226322732283229323032313232323332343235323632373238323932403241324232433244324532463247324832493250325132523253325432553256325732583259326032613262326332643265326632673268326932703271327232733274327532763
27732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309331033113312331333143315331633173318331933203321332233233324332533263327332833293330333133323333333433353336333733383339334033413342334333443345334633473348334933503351335233533354335533563357335833593360336133623363336433653366336733683369337033713372337333743375337633773378337933803381338233833384338533863387338833893390339133923393339433953396339733983399340034013402340334043405340634073408340934103411341234133414341534163417341834193420342134223423342434253426342734283429343034313432343334343435343634373438343934403441344234433444344534463447344834493450345134523453345434553456345734583459346034613462346334643465346634673468346934703471347234733474347534763477347834793480348134823483348434853486348734883489349034913492349334943495349634973498349935003501350235033504350535063507350835093510351135123513351435153516351735183519352035213522352335243525352635273528352935303531353235333534353535363537353835393540354135423543354435453546354735483549355035513552355335543555355635573558355935603561356235633564356535663567356835693570357135723573357435753576357735783579358035813582358335843585358635873588358935903591359235933594359535963597359835993600360136023603360436053606360736083609361036113612361336143615361636173618361936203621362236233624362536263627362836293630363136323633363436353636363736383639364036413642364336443645364636473648364936503651365236533654365536563657365836593660366136623663366436653666366736683669367036713672367336743675367636773678367936803681368236833684368536863687368836893690369136923693369436953696369736983699370037013702370337043705370637073708370937103711371237133714371537163717371837193720372137223723372437253726372737283729373037313732373337343735373637373738373937403741374237433744374537463747374837493750375137523753375437553756375737583759376037613762376337643765376637673768376937703771377237733774377537763
7773778377937803781378237833784378537863787378837893790379137923793379437953796379737983799380038013802380338043805380638073808380938103811381238133814381538163817381838193820382138223823382438253826382738283829383038313832383338343835383638373838383938403841384238433844384538463847384838493850385138523853385438553856385738583859386038613862386338643865386638673868386938703871387238733874387538763877 |
- from fastapi import FastAPI, UploadFile, File, HTTPException, BackgroundTasks, WebSocket, WebSocketDisconnect, Request
- from fastapi.responses import JSONResponse, FileResponse, Response
- from fastapi.staticfiles import StaticFiles
- from fastapi.middleware.cors import CORSMiddleware
- from fastapi.templating import Jinja2Templates
- from pydantic import BaseModel
- from typing import List, Optional, Tuple, Dict, Any, Union
- import atexit
- import os
- import logging
- from datetime import datetime, time
- from modules.connection import connection_manager
- from modules.core import pattern_manager
- from modules.core.pattern_manager import parse_theta_rho_file, THETA_RHO_DIR
- from modules.core import playlist_manager
- from modules.update import update_manager
- from modules.core.state import state
- from modules import mqtt
- import signal
- import sys
- import asyncio
- from contextlib import asynccontextmanager
- from modules.led.led_controller import LEDController, effect_idle
- from modules.led.led_interface import LEDInterface
- from modules.led.idle_timeout_manager import idle_timeout_manager
- import math
- from modules.core.cache_manager import generate_all_image_previews, get_cache_path, generate_image_preview, get_pattern_metadata
- from modules.core.version_manager import version_manager
- from modules.core.log_handler import init_memory_handler, get_memory_handler
- import json
- import base64
- import time
- import argparse
- import subprocess
- import platform
# Resolve the log level from the LOG_LEVEL environment variable (case-insensitive),
# defaulting to INFO when it is unset or does not name a logging level.
log_level_str = os.getenv('LOG_LEVEL', 'INFO').upper()
_requested_level = getattr(logging, log_level_str, None)
# Accept only numeric levels: some upper-case attributes of the logging module
# (e.g. BASIC_FORMAT) are strings and would make basicConfig() reject the value.
log_level = _requested_level if isinstance(_requested_level, int) else logging.INFO

# Root logger: write to the stream handler, tagging each record with logger name and line.
logging.basicConfig(
    level=log_level,
    format='%(asctime)s - %(name)s:%(lineno)d - %(levelname)s - %(message)s',
    handlers=[
        logging.StreamHandler(),
    ],
)
# In-memory log handler backing the web UI log viewer.
# Capacity is 5000 entries so the UI can lazy-load older records.
init_memory_handler(max_entries=5000)

# Module-level logger used throughout this file.
logger = logging.getLogger(__name__)
async def _check_table_is_idle() -> bool:
    """Report whether the table currently counts as idle.

    The table is idle when no pattern file is playing; while a file is
    playing, the result is the value of ``state.pause_requested``.
    """
    if state.current_playing_file:
        # A pattern is loaded: idle only while a pause has been requested.
        return state.pause_requested
    return True
def _start_idle_led_timeout():
    """Arm the idle LED timeout, provided the feature is enabled.

    Does nothing when the timeout is disabled in ``state`` or when the
    configured duration is zero or negative.
    """
    if not state.dw_led_idle_timeout_enabled:
        return
    if state.dw_led_idle_timeout_minutes <= 0:
        return

    minutes = state.dw_led_idle_timeout_minutes
    logger.debug(f"Starting idle LED timeout: {minutes} minutes")
    idle_timeout_manager.start_idle_timeout(
        timeout_minutes=minutes,
        state=state,
        check_idle_callback=_check_table_is_idle,
    )
def check_homing_in_progress():
    """Guard for actions that must not run while the table is homing.

    Raises:
        HTTPException: with status 409 when ``state.is_homing`` is set.
    """
    if not state.is_homing:
        return
    raise HTTPException(
        status_code=409,
        detail="Cannot perform this action while homing is in progress",
    )
def normalize_file_path(file_path: str) -> str:
    """Return *file_path* with forward slashes and without a leading patterns directory.

    Backslashes are converted to ``/``. A single leading ``./patterns/`` or
    ``patterns/`` is then removed; occurrences of ``patterns/`` further inside
    the path are left alone. Empty or ``None`` input yields ``''``.
    """
    if not file_path:
        return ''

    unified = file_path.replace('\\', '/')

    # Strip at most one prefix, trying the longer spelling first.
    for prefix in ('./patterns/', 'patterns/'):
        if unified.startswith(prefix):
            return unified[len(prefix):]

    return unified
- @asynccontextmanager
- async def lifespan(app: FastAPI):
- # Startup: the code before the yield runs when the application starts. NOTE(review): indentation was lost in this listing, so block nesting below cannot be read from the text - confirm against the real file.
- logger.info("Starting Dune Weaver application...")
- # Register signal_handler for SIGINT and SIGTERM (signal_handler is not defined in the visible part of the file)
- signal.signal(signal.SIGINT, signal_handler)
- signal.signal(signal.SIGTERM, signal_handler)
- # Connect device in background so the web server starts immediately
- async def connect_and_home():
- """Connect to device and perform homing in background."""
- try:
- # Connect without homing first (fast); the blocking call is run in a worker thread via asyncio.to_thread
- await asyncio.to_thread(connection_manager.connect_device, False)
- # If connected, perform homing in background
- if state.conn and state.conn.is_connected():
- logger.info("Device connected, starting homing in background...")
- state.is_homing = True
- try:
- success = await asyncio.to_thread(connection_manager.home)
- if not success:
- logger.warning("Background homing failed or was skipped")
- # If sensor homing failed, close connection and wait for user action
- if state.sensor_homing_failed:
- logger.error("Sensor homing failed - closing connection. User must check sensor or switch to crash homing.")
- if state.conn:
- await asyncio.to_thread(state.conn.close)
- state.conn = None
- return # Don't proceed with auto-play
- finally:
- state.is_homing = False
- logger.info("Background homing completed")
- # After homing, check for auto_play mode; a playlist name that no longer resolves is cleared and the state saved
- if state.auto_play_enabled and state.auto_play_playlist:
- logger.info(f"Homing complete, checking auto_play playlist: {state.auto_play_playlist}")
- try:
- playlist_exists = playlist_manager.get_playlist(state.auto_play_playlist) is not None
- if not playlist_exists:
- logger.warning(f"Auto-play playlist '{state.auto_play_playlist}' not found. Clearing invalid reference.")
- state.auto_play_playlist = None
- state.save()
- elif state.conn and state.conn.is_connected():
- logger.info(f"Starting auto-play playlist: {state.auto_play_playlist}")
- asyncio.create_task(playlist_manager.run_playlist(
- state.auto_play_playlist,
- pause_time=state.auto_play_pause_time,
- clear_pattern=state.auto_play_clear_pattern,
- run_mode=state.auto_play_run_mode,
- shuffle=state.auto_play_shuffle
- ))
- except Exception as e:
- logger.error(f"Failed to auto-play playlist: {str(e)}")
- except Exception as e:
- logger.warning(f"Failed to auto-connect to serial port: {str(e)}")
- # Start connection/homing in background - doesn't block server startup. NOTE(review): the Task returned by create_task is not stored; the asyncio docs advise keeping a reference so the task cannot be garbage-collected mid-run.
- asyncio.create_task(connect_and_home())
- # Initialize LED controller based on saved configuration
- try:
- # Auto-detect provider for backward compatibility with existing installations
- if not state.led_provider or state.led_provider == "none":
- if state.wled_ip:
- state.led_provider = "wled"
- logger.info("Auto-detected WLED provider from existing configuration")
- # Initialize the appropriate controller
- if state.led_provider == "wled" and state.wled_ip:
- state.led_controller = LEDInterface("wled", state.wled_ip)
- logger.info(f"LED controller initialized: WLED at {state.wled_ip}")
- elif state.led_provider == "dw_leds":
- state.led_controller = LEDInterface(
- "dw_leds",
- num_leds=state.dw_led_num_leds,
- gpio_pin=state.dw_led_gpio_pin,
- pixel_order=state.dw_led_pixel_order,
- brightness=state.dw_led_brightness / 100.0,
- speed=state.dw_led_speed,
- intensity=state.dw_led_intensity
- )
- logger.info(f"LED controller initialized: DW LEDs ({state.dw_led_num_leds} LEDs on GPIO{state.dw_led_gpio_pin}, pixel order: {state.dw_led_pixel_order})")
- # Initialize hardware and start idle effect (matches behavior of /set_led_config)
- status = state.led_controller.check_status()
- if status.get("connected", False):
- state.led_controller.effect_idle(state.dw_led_idle_effect)
- _start_idle_led_timeout()
- logger.info("DW LEDs hardware initialized and idle effect started")
- else:
- error_msg = status.get("error", "Unknown error")
- logger.warning(f"DW LED hardware initialization failed: {error_msg}")
- else:
- state.led_controller = None
- logger.info("LED controller not configured")
- # Save if provider was auto-detected. NOTE(review): the condition below holds whenever both led_provider and wled_ip are set, not only after auto-detection.
- if state.led_provider and state.wled_ip:
- state.save()
- except Exception as e:
- logger.warning(f"Failed to initialize LED controller: {str(e)}")
- state.led_controller = None
- # Note: auto_play is now handled in connect_and_home() after homing completes
- try:
- mqtt_handler = mqtt.init_mqtt()  # NOTE(review): mqtt_handler is not used again in the visible code
- except Exception as e:
- logger.warning(f"Failed to initialize MQTT: {str(e)}")
- 
- # Schedule cache generation check for later (non-blocking startup)
- async def delayed_cache_check():
- """Check and generate cache in background."""
- try:
- logger.info("Starting cache check...")
- from modules.core.cache_manager import is_cache_generation_needed_async, generate_cache_background
- if await is_cache_generation_needed_async():
- logger.info("Cache generation needed, starting background task...")
- asyncio.create_task(generate_cache_background()) # Don't await - run in background
- else:
- logger.info("Cache is up to date, skipping generation")
- except Exception as e:
- logger.warning(f"Failed during cache generation: {str(e)}")
- # Start cache check in background immediately
- asyncio.create_task(delayed_cache_check())
- # Start idle timeout monitor
- async def idle_timeout_monitor():
- """Monitor LED idle timeout and turn off LEDs when timeout expires."""
- import time  # function-local import of the stdlib time module (time.time() is used below)
- while True:
- try:
- await asyncio.sleep(30) # Check every 30 seconds
- if not state.dw_led_idle_timeout_enabled:
- continue
- if not state.led_controller or not state.led_controller.is_configured:
- continue
- # Check if we're currently playing a pattern (a current file or a current playlist counts as playing)
- is_playing = bool(state.current_playing_file or state.current_playlist)
- if is_playing:
- # Reset activity time when playing
- state.dw_led_last_activity_time = time.time()
- continue
- # If no activity time set, initialize it
- if state.dw_led_last_activity_time is None:
- state.dw_led_last_activity_time = time.time()
- continue
- # Calculate idle duration (seconds since the last recorded activity) and the configured limit in seconds
- idle_seconds = time.time() - state.dw_led_last_activity_time
- timeout_seconds = state.dw_led_idle_timeout_minutes * 60
- # Turn off LEDs if timeout expired
- if idle_seconds >= timeout_seconds:
- status = state.led_controller.check_status()
- # Check both "power" (WLED) and "power_on" (DW LEDs) keys
- is_powered_on = status.get("power", False) or status.get("power_on", False)
- if is_powered_on: # Only turn off if currently on
- logger.info(f"Idle timeout ({state.dw_led_idle_timeout_minutes} minutes) expired, turning off LEDs")
- state.led_controller.set_power(0)
- # Reset activity time to prevent repeated turn-off attempts
- state.dw_led_last_activity_time = time.time()
- except Exception as e:
- logger.error(f"Error in idle timeout monitor: {e}")
- await asyncio.sleep(60) # Wait longer on error
- asyncio.create_task(idle_timeout_monitor())  # NOTE(review): task reference is not kept, and nothing after the yield cancels this endless loop
- yield # This separates startup from shutdown code
- # Shutdown: the visible code only logs a message here; it does not cancel the background tasks or close the device connection
- logger.info("Shutting down Dune Weaver application...")
# Application instance; startup and shutdown work is delegated to lifespan().
app = FastAPI(lifespan=lifespan)

# CORS middleware: accept cross-origin requests from other Dune Weaver frontends,
# which enables multi-table control from a single frontend.
# Note: allow_credentials must be False when allow_origins=["*"] (browser security requirement)
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allow all origins for local network access
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Template engine rooted at ./templates, and static assets served under /static.
templates = Jinja2Templates(directory="templates")
app.mount("/static", StaticFiles(directory="static"), name="static")
# Shared semaphore capping how many previews are processed concurrently,
# so that loading many previews at once cannot exhaust resources.
# Created on first use rather than at import time to avoid
# "attached to a different loop" errors.
_preview_semaphore: Optional[asyncio.Semaphore] = None


def get_preview_semaphore() -> asyncio.Semaphore:
    """Return the shared preview semaphore, creating it (5 permits) on first call."""
    global _preview_semaphore
    semaphore = _preview_semaphore
    if semaphore is None:
        semaphore = _preview_semaphore = asyncio.Semaphore(5)
    return semaphore
# Pydantic models for request/response validation

# ConnectRequest carries one optional field, `port`, which is None when omitted.
class ConnectRequest(BaseModel):
    port: Optional[str] = None
# Auto-play mode payload: `enabled` is required; the remaining fields are
# optional and fall back to the defaults shown.
class auto_playModeRequest(BaseModel):
    enabled: bool
    playlist: Optional[str] = None
    run_mode: Optional[str] = "loop"
    pause_time: Optional[float] = 5.0
    clear_pattern: Optional[str] = "adaptive"
    shuffle: Optional[bool] = False
# One time window: start/end times as strings plus a day selector.
class TimeSlot(BaseModel):
    start_time: str  # HH:MM format
    end_time: str  # HH:MM format
    days: str  # "daily", "weekdays", "weekends", or "custom"
    custom_days: Optional[List[str]] = []  # ["monday", "tuesday", etc.]
# Scheduled-pause payload: `enabled` is required; the list of TimeSlot
# entries defaults to empty.
class ScheduledPauseRequest(BaseModel):
    enabled: bool
    control_wled: Optional[bool] = False
    finish_pattern: Optional[bool] = False  # Finish current pattern before pausing
    timezone: Optional[str] = None  # IANA timezone or None for system default
    time_slots: List[TimeSlot] = []
# A (theta, rho) coordinate pair; both values are required floats.
class CoordinateRequest(BaseModel):
    theta: float
    rho: float
# Playlist payload: only `playlist_name` is required; all other fields have defaults.
class PlaylistRequest(BaseModel):
    playlist_name: str
    files: List[str] = []
    pause_time: float = 0
    clear_pattern: Optional[str] = None
    run_mode: str = "single"
    shuffle: bool = False
# Playlist run payload: `playlist_name` is required; run options and the
# optional start/end time strings default as shown.
class PlaylistRunRequest(BaseModel):
    playlist_name: str
    pause_time: Optional[float] = 0
    clear_pattern: Optional[str] = None
    run_mode: Optional[str] = "single"
    shuffle: Optional[bool] = False
    start_time: Optional[str] = None
    end_time: Optional[str] = None
# Single required float field: `speed`.
class SpeedRequest(BaseModel):
    speed: float
# Single optional string field: `wled_ip` (None when omitted).
class WLEDRequest(BaseModel):
    wled_ip: Optional[str] = None
# LED configuration payload: `provider` is required; the other fields are
# optional and default to None.
class LEDConfigRequest(BaseModel):
    provider: str  # "wled", "dw_leds", or "none"
    ip_address: Optional[str] = None  # For WLED only
    # DW LED specific fields
    num_leds: Optional[int] = None
    gpio_pin: Optional[int] = None
    pixel_order: Optional[str] = None
    brightness: Optional[int] = None
# Single required string field: `playlist_name`.
class DeletePlaylistRequest(BaseModel):
    playlist_name: str
# Two required string fields: the existing name and the replacement name.
class RenamePlaylistRequest(BaseModel):
    old_name: str
    new_name: str
# `file_name` is required; `pre_execution` defaults to the string "none".
class ThetaRhoRequest(BaseModel):
    file_name: str
    pre_execution: Optional[str] = "none"
# Single required string field: `file_name`.
class GetCoordinatesRequest(BaseModel):
    file_name: str
# ============================================================================
# Unified Settings Models
# ============================================================================

# "app" section of a settings update; every field is optional.
class AppSettingsUpdate(BaseModel):
    name: Optional[str] = None
    custom_logo: Optional[str] = None  # Filename or empty string to clear (favicon auto-generated)
# "connection" section of a settings update; the single field is optional.
class ConnectionSettingsUpdate(BaseModel):
    preferred_port: Optional[str] = None
# "patterns" section of a settings update; every field is optional.
class PatternSettingsUpdate(BaseModel):
    clear_pattern_speed: Optional[int] = None
    custom_clear_from_in: Optional[str] = None
    custom_clear_from_out: Optional[str] = None
# "auto_play" section of a settings update; every field is optional and
# defaults to None.
class AutoPlaySettingsUpdate(BaseModel):
    enabled: Optional[bool] = None
    playlist: Optional[str] = None
    run_mode: Optional[str] = None
    pause_time: Optional[float] = None
    clear_pattern: Optional[str] = None
    shuffle: Optional[bool] = None
# "scheduled_pause" section of a settings update; every field is optional and
# defaults to None, including the TimeSlot list.
class ScheduledPauseSettingsUpdate(BaseModel):
    enabled: Optional[bool] = None
    control_wled: Optional[bool] = None
    finish_pattern: Optional[bool] = None
    timezone: Optional[str] = None  # IANA timezone (e.g., "America/New_York") or None for system default
    time_slots: Optional[List[TimeSlot]] = None
# "homing" section of a settings update; every field is optional.
class HomingSettingsUpdate(BaseModel):
    mode: Optional[int] = None
    angular_offset_degrees: Optional[float] = None
    auto_home_enabled: Optional[bool] = None
    auto_home_after_patterns: Optional[int] = None
# DW LED portion of an LED settings update; every field is optional and
# defaults to None.
class DwLedSettingsUpdate(BaseModel):
    num_leds: Optional[int] = None
    gpio_pin: Optional[int] = None
    pixel_order: Optional[str] = None
    brightness: Optional[int] = None
    speed: Optional[int] = None
    intensity: Optional[int] = None
    idle_effect: Optional[dict] = None
    playing_effect: Optional[dict] = None
    idle_timeout_enabled: Optional[bool] = None
    idle_timeout_minutes: Optional[int] = None
# "led" section of a settings update: provider choice, WLED address, and a
# nested DwLedSettingsUpdate; every field is optional.
class LedSettingsUpdate(BaseModel):
    provider: Optional[str] = None  # "none", "wled", "dw_leds"
    wled_ip: Optional[str] = None
    dw_led: Optional[DwLedSettingsUpdate] = None
# "mqtt" section of a settings update; every field is optional and defaults
# to None.
class MqttSettingsUpdate(BaseModel):
    enabled: Optional[bool] = None
    broker: Optional[str] = None
    port: Optional[int] = None
    username: Optional[str] = None
    password: Optional[str] = None  # Write-only, never returned in GET
    client_id: Optional[str] = None
    discovery_prefix: Optional[str] = None
    device_id: Optional[str] = None
    device_name: Optional[str] = None
# "machine" section of a settings update; both fields are optional.
class MachineSettingsUpdate(BaseModel):
    table_type_override: Optional[str] = None  # Override detected table type, or empty string/"auto" to clear
    timezone: Optional[str] = None  # IANA timezone (e.g., "America/New_York", "UTC")
# Top-level settings payload: one optional sub-model per settings section,
# each defaulting to None.
class SettingsUpdate(BaseModel):
    """Request model for PATCH /api/settings - all fields optional for partial updates"""
    app: Optional[AppSettingsUpdate] = None
    connection: Optional[ConnectionSettingsUpdate] = None
    patterns: Optional[PatternSettingsUpdate] = None
    auto_play: Optional[AutoPlaySettingsUpdate] = None
    scheduled_pause: Optional[ScheduledPauseSettingsUpdate] = None
    homing: Optional[HomingSettingsUpdate] = None
    led: Optional[LedSettingsUpdate] = None
    mqtt: Optional[MqttSettingsUpdate] = None
    machine: Optional[MachineSettingsUpdate] = None
# Registries of currently open WebSocket connections, one set per endpoint:
# status clients and cache-progress clients.
active_status_connections = set()
active_cache_progress_connections = set()
@app.websocket("/ws/status")
async def websocket_status_endpoint(websocket: WebSocket):
    """Push a status snapshot to one client until it disconnects.

    The socket is registered in ``active_status_connections`` for the lifetime
    of the connection and removed again, then closed, on the way out.
    """
    await websocket.accept()
    active_status_connections.add(websocket)
    try:
        while True:
            payload = {
                "type": "status_update",
                "data": pattern_manager.get_status(),
            }
            try:
                await websocket.send_json(payload)
            except RuntimeError as exc:
                # Only the "already closed" error ends the loop quietly.
                if "close message has been sent" not in str(exc):
                    raise
                break
            # Use longer interval during pattern execution to reduce asyncio overhead
            # This helps prevent timing-related serial corruption on Pi 3B+
            await asyncio.sleep(2.0 if state.current_playing_file else 1.0)
    except WebSocketDisconnect:
        pass
    finally:
        active_status_connections.discard(websocket)
        try:
            await websocket.close()
        except RuntimeError:
            # Closing a socket that is already closed is not an error here.
            pass
async def broadcast_status_update(status: dict):
    """Broadcast status update to all connected clients.

    Sends ``{"type": "status_update", "data": status}`` to every socket in
    ``active_status_connections``. Sockets whose send raises
    ``WebSocketDisconnect`` or ``RuntimeError`` are removed from the registry
    afterwards.

    Args:
        status: JSON-serialisable status payload placed under the "data" key.
    """
    message = {"type": "status_update", "data": status}
    stale = set()
    # Iterate over a snapshot: every send awaits, and while this coroutine is
    # suspended other handlers may add or discard sockets. Mutating a set that
    # is being iterated raises "Set changed size during iteration".
    for websocket in tuple(active_status_connections):
        try:
            await websocket.send_json(message)
        except (WebSocketDisconnect, RuntimeError):
            stale.add(websocket)

    active_status_connections.difference_update(stale)
@app.websocket("/ws/cache-progress")
async def websocket_cache_progress_endpoint(websocket: WebSocket):
    """Stream cache progress to one client, once per second, until it goes away."""
    from modules.core.cache_manager import get_cache_progress

    await websocket.accept()
    active_cache_progress_connections.add(websocket)
    try:
        while True:
            message = {"type": "cache_progress", "data": get_cache_progress()}
            try:
                await websocket.send_json(message)
            except RuntimeError as exc:
                # Only the "already closed" RuntimeError ends the stream
                # quietly; any other RuntimeError is a real failure.
                if "close message has been sent" not in str(exc):
                    raise
                break
            # Update every 1 second (reduced frequency for better performance)
            await asyncio.sleep(1.0)
    except WebSocketDisconnect:
        pass
    finally:
        active_cache_progress_connections.discard(websocket)
        try:
            await websocket.close()
        except RuntimeError:
            # Closing a socket that is already closed is not an error here.
            pass
# WebSocket endpoint for real-time log streaming
@app.websocket("/ws/logs")
async def websocket_logs_endpoint(websocket: WebSocket):
    """Stream application logs in real-time via WebSocket.

    Each entry is sent as a "log_entry" message. When no entry arrives
    within 30 seconds a "heartbeat" message is sent instead. If no memory
    log handler is available the socket is closed immediately.
    """
    await websocket.accept()

    handler = get_memory_handler()
    if not handler:
        await websocket.close()
        return

    # Subscribe to log updates
    log_queue = handler.subscribe()
    try:
        while True:
            try:
                # Wait for new log entry with timeout
                log_entry = await asyncio.wait_for(log_queue.get(), timeout=30.0)
                message = {"type": "log_entry", "data": log_entry}
            except asyncio.TimeoutError:
                # Send heartbeat to keep connection alive
                message = {"type": "heartbeat"}

            # Both message kinds go through the same guarded send. Sending
            # the heartbeat from inside the TimeoutError handler would let
            # its "already closed" RuntimeError escape this check.
            try:
                await websocket.send_json(message)
            except RuntimeError as e:
                if "close message has been sent" in str(e):
                    break
                raise
    except WebSocketDisconnect:
        pass
    finally:
        handler.unsubscribe(log_queue)
        try:
            await websocket.close()
        except RuntimeError:
            # Closing a socket that is already closed is not an error here.
            pass
# API endpoint to retrieve logs
@app.get("/api/logs", tags=["logs"])
async def get_logs(limit: int = 100, level: str = None, offset: int = 0):
    """
    Return one page of application logs from the in-memory buffer.

    Args:
        limit: Maximum number of log entries to return (default: 100);
            values below 1 are raised to 1.
        level: Filter by log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)
        offset: Number of entries to skip from newest (for lazy loading
            older logs); negative values are treated as 0.

    Returns:
        Dict with the page of entries ("logs"), its size ("count"), the
        number of entries matching the filter ("total") and "has_more".
        When no log handler is available an empty page with an "error"
        message is returned.
    """
    handler = get_memory_handler()
    if not handler:
        return {"logs": [], "count": 0, "total": 0, "has_more": False, "error": "Log handler not initialized"}

    # Only a lower bound is enforced (no max limit for lazy loading)
    page_size = max(1, limit)
    skip = max(0, offset)

    entries = handler.get_logs(limit=page_size, level=level, offset=skip)
    total = handler.get_total_count(level=level)
    returned = len(entries)

    return {
        "logs": entries,
        "count": returned,
        "total": total,
        "has_more": skip + returned < total,
    }
@app.delete("/api/logs", tags=["logs"])
async def clear_logs():
    """Empty the in-memory log buffer; a missing handler is not an error."""
    if handler := get_memory_handler():
        handler.clear()
    return {"status": "ok", "message": "Logs cleared"}
# FastAPI routes - Redirect old frontend routes to new React frontend on port 80
def _host_without_port(host_header: str) -> str:
    """Return the host part of a Host header value, dropping any port."""
    # A bracketed IPv6 literal ("[::1]:8080") contains colons itself, so it
    # must be cut at the closing bracket instead of at the first colon.
    if host_header.startswith("["):
        closing = host_header.find("]")
        if closing != -1:
            return host_header[: closing + 1]
    return host_header.split(":")[0]


def get_redirect_response(request: Request):
    """Return redirect page pointing users to the new frontend.

    Args:
        request: Incoming request; its Host header (default "localhost")
            supplies the host name passed to the template.

    Returns:
        The rendered "redirect.html" template response.
    """
    host = _host_without_port(request.headers.get("host", "localhost"))
    return templates.TemplateResponse("redirect.html", {"request": request, "host": host})
@app.get("/")
async def index(request: Request):
    """Serve the redirect page for the legacy root route."""
    response = get_redirect_response(request)
    return response
@app.get("/settings")
async def settings_page(request: Request):
    """Serve the redirect page for the legacy settings route."""
    response = get_redirect_response(request)
    return response
# ============================================================================
# Unified Settings API
# ============================================================================
@app.get("/api/settings", tags=["settings"])
async def get_all_settings():
    """
    Get all application settings in a unified structure.

    This endpoint consolidates multiple settings endpoints into a single response.
    Individual settings endpoints are deprecated but still functional.
    The MQTT password itself is never returned, only a "has_password" flag.
    """

    def from_state(prefix, *keys):
        # Build {key: state.<prefix><key>} for categories whose response keys
        # match the state attribute names apart from a common prefix.
        return {key: getattr(state, f"{prefix}{key}") for key in keys}

    app_section = {
        "name": state.app_name,
        "custom_logo": state.custom_logo,
    }
    connection_section = from_state("", "preferred_port")
    patterns_section = from_state(
        "", "clear_pattern_speed", "custom_clear_from_in", "custom_clear_from_out"
    )
    auto_play_section = from_state(
        "auto_play_",
        "enabled", "playlist", "run_mode", "pause_time", "clear_pattern", "shuffle",
    )
    scheduled_pause_section = from_state(
        "scheduled_pause_",
        "enabled", "control_wled", "finish_pattern", "timezone", "time_slots",
    )
    homing_section = {
        "mode": state.homing,
        # True if user explicitly set, False if auto-detected
        "user_override": state.homing_user_override,
        "angular_offset_degrees": state.angular_homing_offset_degrees,
        "auto_home_enabled": state.auto_home_enabled,
        "auto_home_after_patterns": state.auto_home_after_patterns,
    }
    led_section = {
        "provider": state.led_provider,
        "wled_ip": state.wled_ip,
        "dw_led": from_state(
            "dw_led_",
            "num_leds", "gpio_pin", "pixel_order", "brightness", "speed",
            "intensity", "idle_effect", "playing_effect",
            "idle_timeout_enabled", "idle_timeout_minutes",
        ),
    }
    mqtt_section = {
        "enabled": state.mqtt_enabled,
        "broker": state.mqtt_broker,
        "port": state.mqtt_port,
        "username": state.mqtt_username,
        "has_password": bool(state.mqtt_password),
        "client_id": state.mqtt_client_id,
        "discovery_prefix": state.mqtt_discovery_prefix,
        "device_id": state.mqtt_device_id,
        "device_name": state.mqtt_device_name,
    }
    table_type_labels = (
        ("dune_weaver_mini", "Dune Weaver Mini"),
        ("dune_weaver_mini_pro", "Dune Weaver Mini Pro"),
        ("dune_weaver_mini_pro_byj", "Dune Weaver Mini Pro (BYJ)"),
        ("dune_weaver_gold", "Dune Weaver Gold"),
        ("dune_weaver", "Dune Weaver"),
        ("dune_weaver_pro", "Dune Weaver Pro"),
    )
    machine_section = {
        "detected_table_type": state.table_type,
        "table_type_override": state.table_type_override,
        # The override wins whenever one is set.
        "effective_table_type": state.table_type_override or state.table_type,
        "gear_ratio": state.gear_ratio,
        "x_steps_per_mm": state.x_steps_per_mm,
        "y_steps_per_mm": state.y_steps_per_mm,
        "timezone": state.timezone,
        "available_table_types": [
            {"value": value, "label": label} for value, label in table_type_labels
        ],
    }

    return {
        "app": app_section,
        "connection": connection_section,
        "patterns": patterns_section,
        "auto_play": auto_play_section,
        "scheduled_pause": scheduled_pause_section,
        "homing": homing_section,
        "led": led_section,
        "mqtt": mqtt_section,
        "machine": machine_section,
    }
@app.get("/api/manifest.webmanifest", tags=["settings"])
async def get_dynamic_manifest():
    """
    Get a dynamically generated web manifest.

    Returns manifest with custom icons and app name if custom branding is configured,
    otherwise returns defaults.
    """
    # Custom logos are served from /static/custom, the stock icons from /static
    icon_base = "/static/custom" if state.custom_logo else "/static"

    # Use custom app name or default
    app_name = state.app_name or "Dune Weaver"

    icons = [
        {
            "src": f"{icon_base}/android-chrome-{size}.png",
            "sizes": size,
            "type": "image/png",
            "purpose": "any",
        }
        for size in ("192x192", "512x512")
    ]

    manifest = {
        "name": app_name,
        "short_name": app_name,
        "description": "Control your kinetic sand table",
        "icons": icons,
        "start_url": "/",
        "scope": "/",
        "display": "standalone",
        "orientation": "any",
        "theme_color": "#0a0a0a",
        "background_color": "#0a0a0a",
        "categories": ["utilities", "entertainment"],
    }
    return manifest
def _copy_provided_fields(source, field_to_state_attr):
    """Copy every non-None field of ``source`` onto its mapped ``state`` attribute."""
    for field_name, state_attr in field_to_state_attr.items():
        value = getattr(source, field_name)
        if value is not None:
            setattr(state, state_attr, value)


def _reset_timezone_cache():
    """Clear pattern_manager's cached timezone so it picks up the new setting."""
    from modules.core import pattern_manager
    pattern_manager._cached_timezone = None
    pattern_manager._cached_zoneinfo = None


@app.patch("/api/settings", tags=["settings"])
async def update_settings(settings_update: SettingsUpdate):
    """
    Partially update application settings.

    Only include the categories and fields you want to update.
    All fields are optional - only provided values will be updated.

    Example: {"app": {"name": "Dune Weaver"}, "auto_play": {"enabled": true}}

    Returns:
        Dict with "success", the list of "updated_categories",
        "requires_restart" (set whenever the mqtt category is present) and
        "led_reinit_needed" (set when the LED provider changed).
    """
    updated_categories = []
    requires_restart = False
    led_reinit_needed = False
    old_led_provider = state.led_provider

    # App settings
    if settings_update.app:
        app_update = settings_update.app
        if app_update.name is not None:
            # An empty name falls back to the default
            state.app_name = app_update.name or "Dune Weaver"
        if app_update.custom_logo is not None:
            # An empty string clears the custom logo
            state.custom_logo = app_update.custom_logo or None
        updated_categories.append("app")

    # Connection settings
    if settings_update.connection:
        # Store exactly what frontend sends: "__auto__", "__none__", or specific port
        _copy_provided_fields(settings_update.connection, {"preferred_port": "preferred_port"})
        updated_categories.append("connection")

    # Pattern settings
    if settings_update.patterns:
        p = settings_update.patterns
        if p.clear_pattern_speed is not None:
            # Zero or a negative speed clears the setting
            state.clear_pattern_speed = p.clear_pattern_speed if p.clear_pattern_speed > 0 else None
        if p.custom_clear_from_in is not None:
            state.custom_clear_from_in = p.custom_clear_from_in or None
        if p.custom_clear_from_out is not None:
            state.custom_clear_from_out = p.custom_clear_from_out or None
        updated_categories.append("patterns")

    # Auto-play settings
    if settings_update.auto_play:
        ap = settings_update.auto_play
        _copy_provided_fields(ap, {"enabled": "auto_play_enabled"})
        if ap.playlist is not None:
            # An empty string clears the playlist
            state.auto_play_playlist = ap.playlist or None
        _copy_provided_fields(ap, {
            "run_mode": "auto_play_run_mode",
            "pause_time": "auto_play_pause_time",
            "clear_pattern": "auto_play_clear_pattern",
            "shuffle": "auto_play_shuffle",
        })
        updated_categories.append("auto_play")

    # Scheduled pause (Still Sands) settings
    if settings_update.scheduled_pause:
        sp = settings_update.scheduled_pause
        _copy_provided_fields(sp, {
            "enabled": "scheduled_pause_enabled",
            "control_wled": "scheduled_pause_control_wled",
            "finish_pattern": "scheduled_pause_finish_pattern",
        })
        if sp.timezone is not None:
            # Empty string means use system default (store as None)
            state.scheduled_pause_timezone = sp.timezone if sp.timezone else None
            _reset_timezone_cache()
        if sp.time_slots is not None:
            state.scheduled_pause_time_slots = [slot.model_dump() for slot in sp.time_slots]
        updated_categories.append("scheduled_pause")

    # Homing settings
    if settings_update.homing:
        h = settings_update.homing
        if h.mode is not None:
            state.homing = h.mode
            state.homing_user_override = True  # User explicitly set preference
        _copy_provided_fields(h, {
            "angular_offset_degrees": "angular_homing_offset_degrees",
            "auto_home_enabled": "auto_home_enabled",
            "auto_home_after_patterns": "auto_home_after_patterns",
        })
        updated_categories.append("homing")

    # LED settings
    if settings_update.led:
        led = settings_update.led
        if led.provider is not None:
            state.led_provider = led.provider
            if led.provider != old_led_provider:
                led_reinit_needed = True
        if led.wled_ip is not None:
            # An empty string clears the WLED address
            state.wled_ip = led.wled_ip or None
        if led.dw_led:
            # Each dw_led field maps to the state attribute "dw_led_<field>"
            _copy_provided_fields(led.dw_led, {
                name: f"dw_led_{name}"
                for name in (
                    "num_leds", "gpio_pin", "pixel_order", "brightness",
                    "speed", "intensity", "idle_effect", "playing_effect",
                    "idle_timeout_enabled", "idle_timeout_minutes",
                )
            })
        updated_categories.append("led")

    # MQTT settings
    if settings_update.mqtt:
        # Each mqtt field maps to the state attribute "mqtt_<field>"
        _copy_provided_fields(settings_update.mqtt, {
            name: f"mqtt_{name}"
            for name in (
                "enabled", "broker", "port", "username", "password",
                "client_id", "discovery_prefix", "device_id", "device_name",
            )
        })
        updated_categories.append("mqtt")
        requires_restart = True

    # Machine settings
    if settings_update.machine:
        m = settings_update.machine
        if m.table_type_override is not None:
            # Empty string or "auto" clears the override
            state.table_type_override = None if m.table_type_override in ("", "auto") else m.table_type_override
        if m.timezone is not None:
            # Validate timezone by trying to create a ZoneInfo object
            try:
                from zoneinfo import ZoneInfo
            except ImportError:
                from backports.zoneinfo import ZoneInfo
            try:
                ZoneInfo(m.timezone)  # Validate
                state.timezone = m.timezone
                # Also update scheduled_pause_timezone to keep in sync
                state.scheduled_pause_timezone = m.timezone
                _reset_timezone_cache()
                logger.info(f"Timezone updated to: {m.timezone}")
            except Exception as e:
                # Best effort: an invalid timezone is logged and ignored, the
                # rest of the update still applies.
                logger.warning(f"Invalid timezone '{m.timezone}': {e}")
        updated_categories.append("machine")

    # Save state
    state.save()

    # Handle LED reinitialization if provider changed
    if led_reinit_needed:
        logger.info(f"LED provider changed from {old_led_provider} to {state.led_provider}, reinitialization may be needed")

    logger.info(f"Settings updated: {', '.join(updated_categories)}")

    return {
        "success": True,
        "updated_categories": updated_categories,
        "requires_restart": requires_restart,
        "led_reinit_needed": led_reinit_needed
    }
# ============================================================================
# Multi-Table Identity Endpoints
# ============================================================================
class TableInfoUpdate(BaseModel):
    # Body of PATCH /api/table-info; None leaves the table name unchanged.
    name: Optional[str] = None
class KnownTableAdd(BaseModel):
    # Body of POST /api/known-tables.

    # Required identity of the remote table.
    id: str
    name: str
    url: str
    # Optional extras.
    host: Optional[str] = None
    port: Optional[int] = None
    version: Optional[str] = None
class KnownTableUpdate(BaseModel):
    # Body of PATCH /api/known-tables/{table_id}; None leaves the name unchanged.
    name: Optional[str] = None
@app.get("/api/table-info", tags=["multi-table"])
async def get_table_info():
    """
    Get table identity information for multi-table discovery.

    Returns the table's unique ID, name, and version.
    """
    current_version = await version_manager.get_current_version()
    return {
        "id": state.table_id,
        "name": state.table_name,
        "version": current_version,
    }
@app.patch("/api/table-info", tags=["multi-table"])
async def update_table_info(update: TableInfoUpdate):
    """
    Update table identity information.

    Currently only the table name can be updated.
    The table ID is immutable after generation.
    A name that is empty after stripping whitespace becomes "Dune Weaver".
    """
    requested_name = update.name
    if requested_name is not None:
        cleaned = requested_name.strip()
        state.table_name = cleaned if cleaned else "Dune Weaver"
        state.save()
        logger.info(f"Table name updated to: {state.table_name}")

    return {"success": True, "id": state.table_id, "name": state.table_name}
@app.get("/api/known-tables", tags=["multi-table"])
async def get_known_tables():
    """
    Get list of known remote tables.

    These are tables that have been manually added and are persisted
    for multi-table management.
    """
    known = state.known_tables
    return {"tables": known}
@app.post("/api/known-tables", tags=["multi-table"])
async def add_known_table(table: KnownTableAdd):
    """
    Add a known remote table.

    This persists the table information so it's available across
    browser sessions and devices.

    Raises:
        HTTPException: 400 when a table with the same ID or the same URL
            is already known.
    """
    # Reject duplicates: the ID is checked first, then the URL
    if any(known.get("id") == table.id for known in state.known_tables):
        raise HTTPException(status_code=400, detail="Table with this ID already exists")
    if any(known.get("url") == table.url for known in state.known_tables):
        raise HTTPException(status_code=400, detail="Table with this URL already exists")

    new_table = {"id": table.id, "name": table.name, "url": table.url}
    # Optional attributes are only stored when they carry a truthy value
    for optional_key in ("host", "port", "version"):
        value = getattr(table, optional_key)
        if value:
            new_table[optional_key] = value

    state.known_tables.append(new_table)
    state.save()
    logger.info(f"Added known table: {table.name} ({table.url})")

    return {"success": True, "table": new_table}
@app.delete("/api/known-tables/{table_id}", tags=["multi-table"])
async def remove_known_table(table_id: str):
    """
    Remove a known remote table by ID.

    Raises:
        HTTPException: 404 when no known table has this ID.
    """
    count_before = len(state.known_tables)
    state.known_tables = [
        entry for entry in state.known_tables if entry.get("id") != table_id
    ]

    # An unchanged length means the filter matched nothing
    nothing_removed = len(state.known_tables) == count_before
    if nothing_removed:
        raise HTTPException(status_code=404, detail="Table not found")

    state.save()
    logger.info(f"Removed known table: {table_id}")
    return {"success": True}
@app.patch("/api/known-tables/{table_id}", tags=["multi-table"])
async def update_known_table(table_id: str, update: KnownTableUpdate):
    """
    Update a known remote table's name.

    Raises:
        HTTPException: 404 when no known table has this ID.
    """
    # First entry with a matching ID, or None when there is no match
    target = next(
        (entry for entry in state.known_tables if entry.get("id") == table_id),
        None,
    )
    if target is None:
        raise HTTPException(status_code=404, detail="Table not found")

    if update.name is not None:
        target["name"] = update.name.strip()
    state.save()
    logger.info(f"Updated known table {table_id}: name={update.name}")
    return {"success": True, "table": target}
# ============================================================================
# Individual Settings Endpoints (Deprecated - use /api/settings instead)
# ============================================================================
@app.get("/api/auto_play-mode", deprecated=True, tags=["settings-deprecated"])
async def get_auto_play_mode():
    """DEPRECATED: Use GET /api/settings instead. Get current auto_play mode settings."""
    # Every response key maps to the state attribute "auto_play_<key>"
    response_keys = ("enabled", "playlist", "run_mode", "pause_time", "clear_pattern", "shuffle")
    return {key: getattr(state, f"auto_play_{key}") for key in response_keys}
@app.post("/api/auto_play-mode", deprecated=True, tags=["settings-deprecated"])
async def set_auto_play_mode(request: auto_playModeRequest):
    """DEPRECATED: Use PATCH /api/settings instead. Update auto_play mode settings.

    ``enabled`` is always applied; the other fields only when they are
    not None.
    """
    state.auto_play_enabled = request.enabled

    # Each optional field maps to the state attribute "auto_play_<field>"
    for optional_field in ("playlist", "run_mode", "pause_time", "clear_pattern", "shuffle"):
        provided = getattr(request, optional_field)
        if provided is not None:
            setattr(state, f"auto_play_{optional_field}", provided)

    state.save()

    logger.info(f"auto_play mode {'enabled' if request.enabled else 'disabled'}, playlist: {request.playlist}")
    return {"success": True, "message": "auto_play mode settings updated"}
@app.get("/api/scheduled-pause", deprecated=True, tags=["settings-deprecated"])
async def get_scheduled_pause():
    """DEPRECATED: Use GET /api/settings instead. Get current Still Sands settings."""
    # Every response key maps to the state attribute "scheduled_pause_<key>"
    response_keys = ("enabled", "control_wled", "finish_pattern", "timezone", "time_slots")
    return {key: getattr(state, f"scheduled_pause_{key}") for key in response_keys}
@app.post("/api/scheduled-pause", deprecated=True, tags=["settings-deprecated"])
async def set_scheduled_pause(request: ScheduledPauseRequest):
    """DEPRECATED: Use PATCH /api/settings instead. Update Still Sands settings.

    All time slots are validated before any state is changed.

    Raises:
        HTTPException: 400 for a malformed time, an unknown ``days`` value
            or missing/invalid custom days; 500 for any unexpected error.
    """
    # Validation constants, built once rather than per slot
    allowed_day_modes = ("daily", "weekdays", "weekends", "custom")
    valid_days = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]

    try:
        # Validate time slots
        for slot_number, slot in enumerate(request.time_slots, start=1):
            # Validate time format (HH:MM); the parsed values are not needed
            try:
                datetime.strptime(slot.start_time, "%H:%M")
                datetime.strptime(slot.end_time, "%H:%M")
            except ValueError:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid time format in slot {slot_number}. Use HH:MM format."
                )

            # Validate days setting
            if slot.days not in allowed_day_modes:
                raise HTTPException(
                    status_code=400,
                    detail=f"Invalid days setting in slot {slot_number}. Must be 'daily', 'weekdays', 'weekends', or 'custom'."
                )

            # Validate custom days if applicable
            if slot.days == "custom":
                if not slot.custom_days:
                    raise HTTPException(
                        status_code=400,
                        detail=f"Custom days must be specified for slot {slot_number} when days is set to 'custom'."
                    )

                for day in slot.custom_days:
                    if day not in valid_days:
                        raise HTTPException(
                            status_code=400,
                            detail=f"Invalid day '{day}' in slot {slot_number}. Valid days are: {', '.join(valid_days)}"
                        )

        # Update state
        state.scheduled_pause_enabled = request.enabled
        state.scheduled_pause_control_wled = request.control_wled
        state.scheduled_pause_finish_pattern = request.finish_pattern
        # An empty timezone is stored as None
        state.scheduled_pause_timezone = request.timezone if request.timezone else None
        state.scheduled_pause_time_slots = [slot.model_dump() for slot in request.time_slots]
        state.save()

        # Clear cached timezone so it picks up the new setting
        from modules.core import pattern_manager
        pattern_manager._cached_timezone = None
        pattern_manager._cached_zoneinfo = None

        wled_msg = " (with WLED control)" if request.control_wled else ""
        finish_msg = " (finish pattern first)" if request.finish_pattern else ""
        tz_msg = f" (timezone: {request.timezone})" if request.timezone else ""
        logger.info(f"Still Sands {'enabled' if request.enabled else 'disabled'} with {len(request.time_slots)} time slots{wled_msg}{finish_msg}{tz_msg}")
        return {"success": True, "message": "Still Sands settings updated"}

    except HTTPException:
        # Validation errors keep their own status code
        raise
    except Exception as e:
        logger.error(f"Error updating Still Sands settings: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to update Still Sands settings: {str(e)}")
@app.get("/api/homing-config", deprecated=True, tags=["settings-deprecated"])
async def get_homing_config():
    """Get homing configuration (mode, compass offset, and auto-home settings)."""
    config = {"homing_mode": state.homing}
    # The remaining response keys are named exactly like their state attributes
    for key in ("angular_homing_offset_degrees", "auto_home_enabled", "auto_home_after_patterns"):
        config[key] = getattr(state, key)
    return config
class HomingConfigRequest(BaseModel):
    # Body of POST /api/homing-config.

    # 0 = crash, 1 = sensor
    homing_mode: int = 0
    angular_homing_offset_degrees: float = 0.0
    # Auto-home settings are only applied when provided.
    auto_home_enabled: Optional[bool] = None
    auto_home_after_patterns: Optional[int] = None
@app.post("/api/homing-config", deprecated=True, tags=["settings-deprecated"])
async def set_homing_config(request: HomingConfigRequest):
    """Set homing configuration (mode, compass offset, and auto-home settings).

    The whole request is validated before any state is changed, so a
    rejected request leaves the in-memory state untouched.

    Raises:
        HTTPException: 400 when ``homing_mode`` is not 0 or 1, or when
            ``auto_home_after_patterns`` is below 1; 500 for any
            unexpected error.
    """
    try:
        # Validate everything up front
        if request.homing_mode not in [0, 1]:
            raise HTTPException(status_code=400, detail="Homing mode must be 0 (crash) or 1 (sensor)")
        if request.auto_home_after_patterns is not None and request.auto_home_after_patterns < 1:
            raise HTTPException(status_code=400, detail="Auto-home after patterns must be at least 1")

        state.homing = request.homing_mode
        state.homing_user_override = True  # User explicitly set preference
        state.angular_homing_offset_degrees = request.angular_homing_offset_degrees

        # Update auto-home settings if provided
        if request.auto_home_enabled is not None:
            state.auto_home_enabled = request.auto_home_enabled
        if request.auto_home_after_patterns is not None:
            state.auto_home_after_patterns = request.auto_home_after_patterns

        state.save()

        mode_name = "crash" if request.homing_mode == 0 else "sensor"
        logger.info(f"Homing mode set to {mode_name}, compass offset set to {request.angular_homing_offset_degrees}°")
        if request.auto_home_enabled is not None:
            logger.info(f"Auto-home enabled: {state.auto_home_enabled}, after {state.auto_home_after_patterns} patterns")
        return {"success": True, "message": "Homing configuration updated"}
    except HTTPException:
        # Validation errors keep their own status code
        raise
    except Exception as e:
        logger.error(f"Error updating homing configuration: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to update homing configuration: {str(e)}")
@app.get("/list_serial_ports")
async def list_ports():
    """Return the available serial ports, listing them in a worker thread."""
    logger.debug("Listing available serial ports")
    ports = await asyncio.to_thread(connection_manager.list_serial_ports)
    return ports
@app.post("/connect")
async def connect(request: ConnectRequest):
    """Connect to the device and initialize it.

    With a port in the request a serial connection is opened on it;
    without one a WebSocket connection to ws://fluidnc.local:81 is used.
    Both paths share the same error handling.

    Raises:
        HTTPException: 500 when the connection cannot be created or the
            device initialization fails.
    """
    websocket_url = 'ws://fluidnc.local:81'
    if request.port:
        target = f'serial port {request.port}'
    else:
        target = f'websocket {websocket_url}'

    try:
        if request.port:
            state.conn = connection_manager.SerialConnection(request.port)
        else:
            state.conn = connection_manager.WebSocketConnection(websocket_url)

        if not connection_manager.device_init():
            raise HTTPException(status_code=500, detail="Failed to initialize device - could not get machine parameters")

        logger.info(f'Successfully connected to {target}')
        return {"success": True}
    except HTTPException:
        # Already carries the right status and detail
        raise
    except Exception as e:
        logger.error(f'Failed to connect to {target}: {str(e)}')
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/disconnect")
async def disconnect():
    """Close the current device connection.

    Raises:
        HTTPException: 500 when closing the connection fails.
    """
    try:
        state.conn.close()
    except Exception as exc:
        logger.error(f'Failed to disconnect serial: {str(exc)}')
        raise HTTPException(status_code=500, detail=str(exc))
    else:
        logger.info('Successfully disconnected from serial port')
        return {"success": True}
@app.post("/restart_connection")
async def restart(request: ConnectRequest):
    """Restart the device connection.

    The requested port must be present, but it is only used for logging;
    ``connection_manager.restart_connection()`` is called without it.

    Raises:
        HTTPException: 400 when no port is provided; 500 when the restart
            fails.
    """
    port = request.port
    if not port:
        logger.warning("Restart serial request received without port")
        raise HTTPException(status_code=400, detail="No port provided")

    try:
        logger.info(f"Restarting connection on port {port}")
        connection_manager.restart_connection()
        return {"success": True}
    except Exception as exc:
        logger.error(f"Failed to restart serial on port {port}: {str(exc)}")
        raise HTTPException(status_code=500, detail=str(exc))
###############################################################################
# Debug Serial Terminal - Independent raw serial communication
###############################################################################
# Open debug serial connections keyed by port name (separate from main connection)
_debug_serial_connections: dict = {}
# Created lazily by get_debug_serial_lock() rather than at import time
_debug_serial_lock: Optional[asyncio.Lock] = None


def get_debug_serial_lock() -> asyncio.Lock:
    """Return the shared debug serial lock, creating it on first use."""
    global _debug_serial_lock
    lock = _debug_serial_lock
    if lock is None:
        lock = _debug_serial_lock = asyncio.Lock()
    return lock
class DebugSerialRequest(BaseModel):
    # Body of POST /api/debug-serial/open.

    # Serial device to open.
    port: str
    baudrate: int = 115200
    # Passed to serial.Serial as its timeout.
    timeout: float = 2.0
class DebugSerialCommand(BaseModel):
    # Body of POST /api/debug-serial/send.

    # Port of a previously opened debug connection.
    port: str
    # Command text; sent stripped with a trailing newline.
    command: str
    # Longest time, in seconds, to wait for the response.
    timeout: float = 2.0
@app.post("/api/debug-serial/open", tags=["debug-serial"])
async def debug_serial_open(request: DebugSerialRequest):
    """Open a debug serial connection (independent of main connection).

    A debug connection already registered for the same port is closed and
    replaced.

    Raises:
        HTTPException: 500 when the port cannot be opened.
    """
    import serial

    async with get_debug_serial_lock():
        # Close existing connection on this port if any
        previous = _debug_serial_connections.pop(request.port, None)
        if previous is not None:
            try:
                previous.close()
            except Exception as close_error:
                # Best effort: a stale handle that fails to close must not
                # block reopening. ``Exception`` (not a bare except) keeps
                # task cancellation and interpreter exit propagating.
                logger.debug(f"Ignoring error closing previous debug serial on {request.port}: {close_error}")

        try:
            ser = serial.Serial(
                request.port,
                baudrate=request.baudrate,
                timeout=request.timeout
            )
            _debug_serial_connections[request.port] = ser
            logger.info(f"Debug serial opened on {request.port}")
            return {"success": True, "port": request.port, "baudrate": request.baudrate}
        except Exception as e:
            logger.error(f"Failed to open debug serial on {request.port}: {e}")
            raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/debug-serial/close", tags=["debug-serial"])
async def debug_serial_close(request: ConnectRequest):
    """Close a debug serial connection.

    Closing a port that is not open succeeds with an explanatory message.

    Raises:
        HTTPException: 500 when closing the port fails. The connection is
            removed from the registry even in that case.
    """
    async with get_debug_serial_lock():
        # Unregister first so a failing close() cannot leave a stale handle
        # behind in the registry.
        ser = _debug_serial_connections.pop(request.port, None)
        if ser is None:
            return {"success": True, "message": "Port not open"}

        try:
            ser.close()
            logger.info(f"Debug serial closed on {request.port}")
            return {"success": True}
        except Exception as e:
            logger.error(f"Failed to close debug serial on {request.port}: {e}")
            raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/debug-serial/send", tags=["debug-serial"])
async def debug_serial_send(request: DebugSerialCommand):
    """Send a command and receive response on debug serial connection.

    The stripped command is written with a trailing newline, then the port
    is polled until a line equal to "ok"/"error" or starting with "error:"
    arrives (case-insensitive), or until ``request.timeout`` seconds pass.

    Returns:
        Dict with "success", the stripped "command", the list of non-empty
        response lines ("responses") and the same lines joined by newlines
        ("raw").

    Raises:
        HTTPException: 400 when the port has not been opened; 500 on a
            write failure or other unexpected error.
    """
    async with get_debug_serial_lock():
        if request.port not in _debug_serial_connections:
            raise HTTPException(status_code=400, detail="Port not open. Open it first.")

        ser = _debug_serial_connections[request.port]

        try:
            # Clear input buffer
            ser.reset_input_buffer()

            # Send command with newline
            command = request.command.strip() + '\n'
            await asyncio.to_thread(ser.write, command.encode())
            await asyncio.to_thread(ser.flush)

            # Read response with timeout - use read() for more reliable data capture
            responses = []
            buffer = ""
            command_done = False
            # A monotonic clock keeps the timeout immune to wall-clock jumps
            deadline = time.monotonic() + request.timeout

            # Small delay to let response arrive
            await asyncio.sleep(0.05)

            while not command_done and time.monotonic() < deadline:
                try:
                    # Read all available bytes
                    waiting = ser.in_waiting
                    if waiting <= 0:
                        # No data waiting, small delay
                        await asyncio.sleep(0.02)
                        continue

                    data = await asyncio.to_thread(ser.read, waiting)
                    if not data:
                        continue
                    buffer += data.decode('utf-8', errors='replace')

                    # Process complete lines from buffer
                    while '\n' in buffer:
                        line, buffer = buffer.split('\n', 1)
                        line = line.strip()
                        if not line:
                            continue
                        responses.append(line)

                        # Check for ok/error to know command completed
                        lowered = line.lower()
                        if lowered in ('ok', 'error') or lowered.startswith('error:'):
                            # The flag ends the outer polling loop as well; a
                            # bare break here would only leave this inner
                            # loop and polling would go on until the timeout.
                            command_done = True
                            # Give a tiny bit more time for any trailing data
                            await asyncio.sleep(0.02)
                            # Queue trailing data behind what is already
                            # buffered so the response order is preserved
                            if ser.in_waiting > 0:
                                extra = await asyncio.to_thread(ser.read, ser.in_waiting)
                                if extra:
                                    buffer += extra.decode('utf-8', errors='replace')
                            break
                except Exception as read_error:
                    logger.warning(f"Read error: {read_error}")
                    break

            # Add any remaining buffer content, one entry per non-empty line
            for leftover in buffer.split('\n'):
                leftover = leftover.strip()
                if leftover:
                    responses.append(leftover)

            return {
                "success": True,
                "command": request.command.strip(),
                "responses": responses,
                "raw": '\n'.join(responses)
            }
        except Exception as e:
            logger.error(f"Debug serial send error: {e}")
            raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/debug-serial/status", tags=["debug-serial"])
async def debug_serial_status():
    """Get status of all debug serial connections.

    Returns:
        ``{"connections": {port: {"open": bool, "baudrate": ...}}}``. A port
        whose attributes cannot be read is reported as ``{"open": False}``.
    """
    async with get_debug_serial_lock():
        status = {}
        for port, ser in _debug_serial_connections.items():
            try:
                status[port] = {
                    "open": ser.is_open,
                    "baudrate": ser.baudrate
                }
            except Exception:
                # Narrowed from a bare ``except`` so task cancellation and
                # interpreter-exit signals are no longer swallowed here.
                status[port] = {"open": False}
        return {"connections": status}
@app.get("/list_theta_rho_files")
async def list_theta_rho_files():
    """Return the sorted list of theta-rho files reported by the pattern manager."""
    logger.debug("Listing theta-rho files")
    # The directory scan blocks, so it is pushed onto a worker thread
    pattern_files = await asyncio.to_thread(pattern_manager.list_theta_rho_files)
    ordered = sorted(pattern_files)
    return ordered
@app.get("/list_theta_rho_files_with_metadata")
async def list_theta_rho_files_with_metadata():
    """Get list of theta-rho files with metadata for sorting and filtering.

    Fast path: ``metadata_cache.json`` is loaded once and every file is
    described from that cache. If the cache cannot be loaded, each file is
    instead processed on a small thread pool (stat + per-file metadata).

    Returns:
        A list of dicts with ``path``, ``name``, ``category``,
        ``date_modified`` and ``coordinates_count``. Files whose metadata is
        unavailable are still listed, with the last two fields set to 0.
    """
    from modules.core.cache_manager import get_pattern_metadata
    from concurrent.futures import ThreadPoolExecutor
    import json

    def category_of(file_path):
        """Return the folder part of *file_path*, or 'root' if it has none."""
        folder, separator, _ = file_path.rpartition('/')
        return folder if separator else 'root'

    def build_entry(file_path, date_modified=0, coordinates_count=0):
        """Build one response record for *file_path*."""
        return {
            'path': file_path,
            'name': os.path.splitext(os.path.basename(file_path))[0],
            'category': category_of(file_path),
            'date_modified': date_modified,
            'coordinates_count': coordinates_count
        }

    def process_file(file_path):
        """Slow path: stat the file and look up its metadata individually."""
        try:
            full_path = os.path.join(pattern_manager.THETA_RHO_DIR, file_path)
            file_stat = os.stat(full_path)
            # Get cached metadata (this should be fast if cached)
            metadata = get_pattern_metadata(file_path)
            coords = metadata.get('total_coordinates', 0) if metadata else 0
            # Use modification time (mtime) for "date modified"
            return build_entry(file_path, file_stat.st_mtime, coords)
        except Exception as e:
            logger.warning(f"Error getting metadata for {file_path}: {str(e)}")
            # Include file with minimal info if metadata fails
            return build_entry(file_path)

    def load_metadata_cache():
        """Read the whole metadata cache; the file is closed on return."""
        with open("metadata_cache.json", 'r') as cache_file:
            return json.load(cache_file)

    # Run the blocking file listing in a thread
    files = await asyncio.to_thread(pattern_manager.list_theta_rho_files)
    files_with_metadata = []

    try:
        # Load the entire metadata cache at once, off the event loop.
        # This is much faster than 1000+ individual metadata lookups.
        cache_data = await asyncio.to_thread(load_metadata_cache)
        cache_dict = cache_data.get('data', {})
        logger.debug(f"Loaded metadata cache with {len(cache_dict)} entries")

        for file_path in files:
            try:
                cached_entry = cache_dict.get(file_path, {})
                if isinstance(cached_entry, dict) and 'metadata' in cached_entry:
                    coords_count = cached_entry['metadata'].get('total_coordinates', 0)
                    date_modified = cached_entry.get('mtime', 0)
                else:
                    coords_count = 0
                    date_modified = 0
                files_with_metadata.append(
                    build_entry(file_path, date_modified, coords_count)
                )
            except Exception as e:
                logger.warning(f"Error processing {file_path}: {e}")
                # Include file with minimal info if processing fails
                files_with_metadata.append(build_entry(file_path))
    except Exception as e:
        logger.error(f"Failed to load metadata cache, falling back to slow method: {e}")
        # Start from a clean list so a partial fast-path run cannot leave
        # duplicate entries behind.
        files_with_metadata = []
        loop = asyncio.get_running_loop()
        # The pool is only created when the fallback is actually needed
        executor = ThreadPoolExecutor(max_workers=4)
        try:
            tasks = [loop.run_in_executor(executor, process_file, file_path) for file_path in files]
            for task in asyncio.as_completed(tasks):
                try:
                    files_with_metadata.append(await task)
                except Exception as task_error:
                    logger.error(f"Error processing file: {str(task_error)}")
        finally:
            # Release the worker threads without blocking the event loop
            executor.shutdown(wait=False)

    return files_with_metadata
@app.post("/upload_theta_rho")
async def upload_theta_rho(file: UploadFile = File(...)):
    """Upload a theta-rho file into the ``custom_patterns`` folder.

    The client-supplied file name is reduced to its final path component
    before use, so an upload cannot write outside ``custom_patterns``.
    After saving, preview generation is attempted up to three times; a
    preview failure does not fail the upload.

    Raises:
        HTTPException: 400 if the file name is empty or not a usable name,
            500 if saving the file fails.
    """
    try:
        # file.filename is untrusted input: drop any directory components
        # (both separator styles) to prevent path traversal.
        raw_name = (file.filename or "").replace("\\", "/")
        safe_name = os.path.basename(raw_name)
        if safe_name in ("", ".", ".."):
            raise HTTPException(status_code=400, detail="Invalid file name")

        # Ensure custom_patterns directory exists
        custom_patterns_dir = os.path.join(pattern_manager.THETA_RHO_DIR, "custom_patterns")
        os.makedirs(custom_patterns_dir, exist_ok=True)

        # Use forward slashes for internal path representation to maintain consistency
        file_path_in_patterns_dir = f"custom_patterns/{safe_name}"
        full_file_path = os.path.join(pattern_manager.THETA_RHO_DIR, file_path_in_patterns_dir)

        # Save the uploaded file with proper encoding for Windows compatibility
        file_content = await file.read()
        try:
            # First try to decode as UTF-8 and re-encode to ensure proper encoding
            text_content = file_content.decode('utf-8')
        except UnicodeDecodeError:
            # If UTF-8 decoding fails, save as binary (fallback)
            with open(full_file_path, "wb") as f:
                f.write(file_content)
        else:
            with open(full_file_path, "w", encoding='utf-8') as f:
                f.write(text_content)

        logger.info(f"File {safe_name} saved successfully")

        # Generate image preview for the new file with retry logic
        max_retries = 3
        for attempt in range(max_retries):
            try:
                logger.info(f"Generating preview for {file_path_in_patterns_dir} (attempt {attempt + 1}/{max_retries})")
                if await generate_image_preview(file_path_in_patterns_dir):
                    logger.info(f"Preview generated successfully for {file_path_in_patterns_dir}")
                    break
                logger.warning(f"Preview generation failed for {file_path_in_patterns_dir} (attempt {attempt + 1})")
            except Exception as e:
                logger.error(f"Error generating preview for {file_path_in_patterns_dir} (attempt {attempt + 1}): {str(e)}")
            if attempt < max_retries - 1:
                await asyncio.sleep(0.5)  # Small delay before retry

        return {"success": True, "message": f"File {safe_name} uploaded successfully"}
    except HTTPException:
        # Keep the intended status code (e.g. 400) instead of wrapping in 500
        raise
    except Exception as e:
        logger.error(f"Error uploading file: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/get_theta_rho_coordinates")
async def get_theta_rho_coordinates(request: GetCoordinatesRequest):
    """Get theta-rho coordinates for animated preview.

    If the requested file is the one currently playing and its coordinates
    are already held in ``state``, those are returned without re-parsing.

    Returns:
        dict with ``success``, ``coordinates`` and ``total_points``.

    Raises:
        HTTPException: 404 if the file does not exist, 400 if it contains no
            valid coordinates, 500 on any other failure.
    """
    try:
        # Normalize file path for cross-platform compatibility and remove prefixes
        file_name = normalize_file_path(request.file_name)
        file_path = os.path.join(THETA_RHO_DIR, file_name)

        # Check if we can use cached coordinates (already loaded for current playback).
        # This avoids re-parsing large files (2MB+) which can cause issues on Pi Zero 2W.
        current_file = state.current_playing_file
        if current_file and state._current_coordinates:
            if normalize_file_path(current_file) == file_name:
                logger.debug(f"Using cached coordinates for {file_name}")
                return {
                    "success": True,
                    "coordinates": state._current_coordinates,
                    "total_points": len(state._current_coordinates)
                }

        # Check file existence off the event loop
        exists = await asyncio.to_thread(os.path.exists, file_path)
        if not exists:
            raise HTTPException(status_code=404, detail=f"File {file_name} not found")

        # Parse the theta-rho file in a thread (not process) to avoid memory
        # pressure on resource-constrained devices like Pi Zero 2W
        coordinates = await asyncio.to_thread(parse_theta_rho_file, file_path)
        if not coordinates:
            raise HTTPException(status_code=400, detail="No valid coordinates found in file")

        return {
            "success": True,
            "coordinates": coordinates,
            "total_points": len(coordinates)
        }
    except HTTPException:
        # Let the deliberate 404/400 responses through; without this they
        # were caught below and reported as 500.
        raise
    except Exception as e:
        logger.error(f"Error getting coordinates for {request.file_name}: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/run_theta_rho")
async def run_theta_rho(request: ThetaRhoRequest, background_tasks: BackgroundTasks):
    """Queue a theta-rho pattern for execution as a background task.

    Names containing ``clear`` are first resolved through the pattern
    manager's clear-pattern lookup; anything else (or an unresolved clear
    name) is looked up under ``THETA_RHO_DIR``. A pattern that is already
    running is stopped before the new one is scheduled.

    Raises:
        HTTPException: 400 for a missing file name or missing connection,
            404 if the file does not exist, 500 on unexpected errors.
    """
    if not request.file_name:
        logger.warning('Run theta-rho request received without file name')
        raise HTTPException(status_code=400, detail="No file name provided")

    resolved_path = None
    if 'clear' in request.file_name:
        logger.info(f'Clear pattern file: {request.file_name.split(".")[0]}')
        resolved_path = pattern_manager.get_clear_pattern_file(request.file_name.split('.')[0])
        logger.info(f'Clear pattern file: {resolved_path}')

    if not resolved_path:
        # Normalize file path for cross-platform compatibility
        resolved_path = os.path.join(
            pattern_manager.THETA_RHO_DIR,
            normalize_file_path(request.file_name),
        )

    if not os.path.exists(resolved_path):
        logger.error(f'Theta-rho file not found: {resolved_path}')
        raise HTTPException(status_code=404, detail="File not found")

    try:
        if not state.conn or not state.conn.is_connected():
            logger.warning("Attempted to run a pattern without a connection")
            raise HTTPException(status_code=400, detail="Connection not established")

        check_homing_in_progress()

        if pattern_manager.get_pattern_lock().locked():
            logger.info("Another pattern is running, stopping it first...")
            await pattern_manager.stop_actions()

        logger.info(f'Running theta-rho file: {request.file_name} with pre_execution={request.pre_execution}')

        # clear_pattern is only forwarded when a pre-execution step was requested
        run_options = {}
        if request.pre_execution != "none":
            run_options['clear_pattern'] = request.pre_execution

        background_tasks.add_task(
            pattern_manager.run_theta_rho_files,
            [resolved_path],
            **run_options
        )
        return {"success": True}
    except HTTPException as http_exc:
        logger.error(f'Failed to run theta-rho file {request.file_name}: {http_exc.detail}')
        raise
    except Exception as e:
        logger.error(f'Failed to run theta-rho file {request.file_name}: {str(e)}')
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/stop_execution")
async def stop_execution():
    """Ask the pattern manager to stop the current actions.

    Raises:
        HTTPException: 400 without a connection, 500 when ``stop_actions``
            reports failure.
    """
    connected = state.conn.is_connected() if state.conn else False
    if not connected:
        logger.warning("Attempted to stop without a connection")
        raise HTTPException(status_code=400, detail="Connection not established")

    stopped = await pattern_manager.stop_actions()
    if stopped:
        return {"success": True}
    raise HTTPException(status_code=500, detail="Stop timed out - use force_stop")
@app.post("/force_stop")
async def force_stop():
    """Force stop all pattern execution and clear all state. Use when normal stop doesn't work.

    Sets the stop flag, resets the playback/playlist fields on ``state``,
    wakes tasks waiting on the pause event, sends a stop command to the
    motion controller if it is running, and discards the pattern lock.
    """
    logger.info("Force stop requested - clearing all pattern state")

    # Set stop flag first
    state.stop_requested = True
    state.pause_requested = False

    # Clear all pattern-related state
    state.current_playing_file = None
    state.execution_progress = None
    state.is_running = False
    state.is_clearing = False
    state.is_homing = False
    state.current_playlist = None
    state.current_playlist_index = None
    state.playlist_mode = None
    state.pause_time_remaining = 0

    # Wake up any waiting tasks. This stays best-effort, but the failure is
    # now logged instead of silently discarded, and a bare ``except`` no
    # longer hides task cancellation.
    try:
        pattern_manager.get_pause_event().set()
    except Exception as e:
        logger.warning(f"Force stop could not set pause event: {e}")

    # Stop motion controller and clear its queue
    if pattern_manager.motion_controller.running:
        pattern_manager.motion_controller.command_queue.put(
            pattern_manager.MotionCommand('stop')
        )

    # Force release pattern lock by recreating it
    pattern_manager.pattern_lock = None  # Will be recreated on next use

    logger.info("Force stop completed - all pattern state cleared")
    return {"success": True, "message": "Force stop completed"}
@app.post("/soft_reset")
async def soft_reset():
    """Send $Bye soft reset to FluidNC controller. Resets position counters to 0.

    Running patterns are stopped first, then the shared soft-reset routine
    of the connection manager is invoked.

    Raises:
        HTTPException: 400 without a connection, 500 if the reset fails.
    """
    if not state.conn or not state.conn.is_connected():
        logger.warning("Attempted to soft reset without a connection")
        raise HTTPException(status_code=400, detail="Connection not established")

    try:
        # Patterns must be halted before the controller is reset
        await pattern_manager.stop_actions()
        await connection_manager.perform_soft_reset()
    except Exception as e:
        logger.error(f"Error sending soft reset: {e}")
        raise HTTPException(status_code=500, detail=str(e))
    else:
        return {"success": True, "message": "Soft reset sent. Position reset to 0."}
@app.post("/controller_restart")
async def controller_restart():
    """Send $System/Control=RESTART to restart the FluidNC controller.

    Running patterns are stopped first. For a ``SerialConnection`` with an
    open ``ser`` handle the command is written to it directly; otherwise it
    goes through ``state.conn.send``. The homed flag is cleared afterwards.

    Raises:
        HTTPException: 400 without a connection, 500 if sending fails.
    """
    if not state.conn or not state.conn.is_connected():
        logger.warning("Attempted to restart controller without a connection")
        raise HTTPException(status_code=400, detail="Connection not established")

    try:
        # Stop any running patterns first
        await pattern_manager.stop_actions()

        from modules.connection.connection_manager import SerialConnection

        restart_cmd = "$System/Control=RESTART\n"
        uses_raw_serial = isinstance(state.conn, SerialConnection) and state.conn.ser
        if uses_raw_serial:
            serial_handle = state.conn.ser
            serial_handle.write(restart_cmd.encode())
            serial_handle.flush()
            logger.info(f"Controller restart command sent via serial to {state.port}")
        else:
            state.conn.send(restart_cmd)
            logger.info("Controller restart command sent via connection abstraction")

        # Position is unknown after a restart, so homing is required again
        state.is_homed = False
        return {"success": True, "message": "Controller restart command sent. Homing required."}
    except Exception as e:
        logger.error(f"Error sending controller restart: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/send_home")
async def send_home():
    """Home the device, blocking other movement while homing runs.

    ``state.is_homing`` is set for the duration of the call and always
    cleared afterwards, whether homing succeeded or not.

    Raises:
        HTTPException: 400 without a connection, 409 if homing is already
            in progress, 500 if homing fails or an unexpected error occurs.
    """
    try:
        connected = state.conn.is_connected() if state.conn else False
        if not connected:
            logger.warning("Attempted to move to home without a connection")
            raise HTTPException(status_code=400, detail="Connection not established")
        if state.is_homing:
            raise HTTPException(status_code=409, detail="Homing already in progress")

        # Flag blocks other movement operations until homing is finished
        state.is_homing = True
        logger.info("Homing started - blocking other movement operations")
        try:
            # Blocking homing routine runs on a worker thread
            homed = await asyncio.to_thread(connection_manager.home)
            if homed:
                return {"success": True}
            logger.error("Homing failed or timed out")
            raise HTTPException(status_code=500, detail="Homing failed or timed out after 15 seconds")
        finally:
            # Cleared on success and on failure alike
            state.is_homing = False
            logger.info("Homing completed - movement operations unblocked")
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to send home command: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
class SensorHomingRecoveryRequest(BaseModel):
    """Request body for the sensor homing recovery endpoint."""

    # When True, recovery switches the homing mode to crash homing
    switch_to_crash_homing: bool = False
@app.post("/recover_sensor_homing")
async def recover_sensor_homing(request: SensorHomingRecoveryRequest):
    """
    Retry homing after a sensor homing failure.

    The failure flag is cleared first. If ``switch_to_crash_homing`` is True
    the homing mode is set to crash homing (mode 0), marked as a user
    override and saved. Homing is then performed directly when the device
    is connected; otherwise the device is reconnected, which includes
    homing. With ``switch_to_crash_homing`` False the retry uses the
    current homing mode.

    Returns a dict with ``success`` and ``message``; ``sensor_homing_failed``
    is included when the sensor failure flag was raised again.
    """
    try:
        # Reset the failure flag before any new attempt
        state.sensor_homing_failed = False

        if request.switch_to_crash_homing:
            logger.info("Switching to crash homing mode per user request")
            state.homing = 0
            state.homing_user_override = True
            state.save()

        already_connected = state.conn and state.conn.is_connected()
        if not already_connected:
            # Need to reconnect
            logger.info("Reconnecting device and performing homing...")
            state.is_homing = True
            try:
                # connect_device includes homing
                await asyncio.to_thread(connection_manager.connect_device, True)
                if state.sensor_homing_failed:
                    return {
                        "success": False,
                        "sensor_homing_failed": True,
                        "message": "Sensor homing failed. Please check sensor position or switch to crash homing."
                    }
                if not (state.conn and state.conn.is_connected()):
                    return {"success": False, "message": "Failed to establish connection"}
                return {"success": True, "message": "Connected and homed successfully"}
            finally:
                state.is_homing = False

        logger.info("Device already connected, performing homing...")
        state.is_homing = True
        try:
            homed = await asyncio.to_thread(connection_manager.home)
            if homed:
                return {"success": True, "message": "Homing completed successfully"}
            # Distinguish a repeated sensor failure from a generic failure
            if state.sensor_homing_failed:
                return {
                    "success": False,
                    "sensor_homing_failed": True,
                    "message": "Sensor homing failed again. Please check sensor position or switch to crash homing."
                }
            return {"success": False, "message": "Homing failed"}
        finally:
            state.is_homing = False
    except Exception as e:
        logger.error(f"Error during sensor homing recovery: {e}")
        state.is_homing = False
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/run_theta_rho_file/{file_name}")
async def run_specific_theta_rho_file(file_name: str):
    """Run the named theta-rho file from ``THETA_RHO_DIR``.

    Raises:
        HTTPException: 404 if the file does not exist, 400 without a
            connection.
    """
    target_path = os.path.join(pattern_manager.THETA_RHO_DIR, file_name)
    if not os.path.exists(target_path):
        raise HTTPException(status_code=404, detail="File not found")

    connected = state.conn.is_connected() if state.conn else False
    if not connected:
        logger.warning("Attempted to run a pattern without a connection")
        raise HTTPException(status_code=400, detail="Connection not established")

    check_homing_in_progress()
    # NOTE(review): the result is not awaited; if run_theta_rho_file is a
    # coroutine function this call would not actually run it - confirm.
    pattern_manager.run_theta_rho_file(target_path)
    return {"success": True}
class DeleteFileRequest(BaseModel):
    """Request body carrying the name of a single pattern file."""

    # Pattern file name as sent by the client
    file_name: str
@app.post("/delete_theta_rho_file")
async def delete_theta_rho_file(request: DeleteFileRequest):
    """Delete a theta-rho file and clean up its cached preview/metadata.

    Returns:
        dict with ``success`` and ``cache_cleanup`` (the cache cleanup
        result). A failed cache cleanup does not fail the request.

    Raises:
        HTTPException: 400 for a missing name, 404 if the file does not
            exist, 500 if deletion fails.
    """
    if not request.file_name:
        logger.warning("Delete theta-rho file request received without filename")
        raise HTTPException(status_code=400, detail="No file name provided")

    # Normalize file path for cross-platform compatibility
    relative_name = normalize_file_path(request.file_name)
    target_path = os.path.join(pattern_manager.THETA_RHO_DIR, relative_name)

    # Filesystem checks run on a worker thread
    if not await asyncio.to_thread(os.path.exists, target_path):
        logger.error(f"Attempted to delete non-existent file: {target_path}")
        raise HTTPException(status_code=404, detail="File not found")

    try:
        await asyncio.to_thread(os.remove, target_path)
        logger.info(f"Successfully deleted theta-rho file: {request.file_name}")

        # Remove the cached preview image and metadata as well
        from modules.core.cache_manager import delete_pattern_cache
        cache_cleanup_success = await asyncio.to_thread(delete_pattern_cache, relative_name)
        if not cache_cleanup_success:
            logger.warning(f"Cache cleanup failed for {request.file_name}, but pattern was deleted")
        else:
            logger.info(f"Successfully cleaned up cache for {request.file_name}")

        return {"success": True, "cache_cleanup": cache_cleanup_success}
    except Exception as e:
        logger.error(f"Failed to delete theta-rho file {request.file_name}: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/move_to_center")
async def move_to_center():
    """Move the device to polar position (0, 0) and wait for it to go idle.

    Raises:
        HTTPException: 400 without a connection, 500 on unexpected errors.
    """
    try:
        connected = state.conn.is_connected() if state.conn else False
        if not connected:
            logger.warning("Attempted to move to center without a connection")
            raise HTTPException(status_code=400, detail="Connection not established")

        check_homing_in_progress()

        # A leftover stop request from a stopped pattern would block the move
        state.stop_requested = False

        logger.info("Moving device to center position")
        await pattern_manager.reset_theta()
        await pattern_manager.move_polar(0, 0)

        # Do not return until the machine is idle (or the wait times out)
        reached_idle = await connection_manager.check_idle_async(timeout=60)
        if not reached_idle:
            logger.warning("Machine did not reach idle after move to center")
        return {"success": True}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to move to center: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/move_to_perimeter")
async def move_to_perimeter():
    """Move the device to polar position (0, 1) and wait for it to go idle.

    Raises:
        HTTPException: 400 without a connection, 500 on unexpected errors.
    """
    try:
        connected = state.conn.is_connected() if state.conn else False
        if not connected:
            logger.warning("Attempted to move to perimeter without a connection")
            raise HTTPException(status_code=400, detail="Connection not established")

        check_homing_in_progress()

        # A leftover stop request from a stopped pattern would block the move
        state.stop_requested = False

        logger.info("Moving device to perimeter position")
        await pattern_manager.reset_theta()
        await pattern_manager.move_polar(0, 1)

        # Do not return until the machine is idle (or the wait times out)
        reached_idle = await connection_manager.check_idle_async(timeout=60)
        if not reached_idle:
            logger.warning("Machine did not reach idle after move to perimeter")
        return {"success": True}
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to move to perimeter: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/preview_thr")
async def preview_thr(request: DeleteFileRequest):
    """Return the preview URL and first/last coordinates for a pattern.

    A missing preview image is generated on demand. Coordinates come from
    the metadata cache when available, otherwise the pattern file is parsed.

    Raises:
        HTTPException: 400 for a missing name, 404 if the pattern file does
            not exist, 500 if the preview cannot be generated or served.
    """
    if not request.file_name:
        logger.warning("Preview theta-rho request received without filename")
        raise HTTPException(status_code=400, detail="No file name provided")

    # Normalize file path for cross-platform compatibility
    normalized_file_name = normalize_file_path(request.file_name)
    pattern_file_path = os.path.join(pattern_manager.THETA_RHO_DIR, normalized_file_name)

    # Existence checks are done on a worker thread
    if not await asyncio.to_thread(os.path.exists, pattern_file_path):
        logger.error(f"Attempted to preview non-existent pattern file: {pattern_file_path}")
        raise HTTPException(status_code=404, detail="Pattern file not found")

    def as_point(coord):
        """Convert an indexable coordinate into an x/y object, or None."""
        return {"x": coord[0], "y": coord[1]} if coord else None

    try:
        cache_path = get_cache_path(normalized_file_name)
        if not await asyncio.to_thread(os.path.exists, cache_path):
            logger.info(f"Cache miss for {request.file_name}. Generating preview...")
            # Attempt to generate the preview if it's missing
            generated = await generate_image_preview(normalized_file_name)
            present_now = await asyncio.to_thread(os.path.exists, cache_path)
            if not (generated and present_now):
                logger.error(f"Failed to generate or find preview for {request.file_name} after attempting generation.")
                raise HTTPException(status_code=500, detail="Failed to generate preview image.")

        # Prefer coordinates stored in the metadata cache
        metadata = get_pattern_metadata(normalized_file_name)
        if metadata:
            first_coord_obj = metadata.get('first_coordinate')
            last_coord_obj = metadata.get('last_coordinate')
        else:
            # Fallback to parsing file if metadata not cached
            logger.debug(f"Metadata cache miss for {request.file_name}, parsing file")
            coordinates = await asyncio.to_thread(parse_theta_rho_file, pattern_file_path)
            first_coord_obj = as_point(coordinates[0] if coordinates else None)
            last_coord_obj = as_point(coordinates[-1] if coordinates else None)

        # Path separators (both styles) are encoded as '--' in the preview URL
        encoded_filename = normalized_file_name.replace('\\', '--').replace('/', '--')
        return {
            "preview_url": f"/preview/{encoded_filename}",
            "first_coordinate": first_coord_obj,
            "last_coordinate": last_coord_obj
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Failed to generate or serve preview for {request.file_name}: {str(e)}")
        raise HTTPException(status_code=500, detail=f"Failed to serve preview image: {str(e)}")
@app.get("/api/pattern_history/{pattern_name:path}")
async def get_pattern_history(pattern_name: str):
    """Get the most recent execution history for a pattern.

    Only the base file name is used for the lookup, and ``.thr`` is appended
    when missing. When no history exists, a dict with all fields set to
    None is returned.
    """
    from modules.core.pattern_manager import get_pattern_execution_history

    # Reduce a full path to its file name and make sure it ends in .thr
    lookup_name = os.path.basename(pattern_name)
    if not lookup_name.endswith('.thr'):
        lookup_name = f"{lookup_name}.thr"

    no_history = {"actual_time_seconds": None, "actual_time_formatted": None, "speed": None, "timestamp": None}
    return get_pattern_execution_history(lookup_name) or no_history
@app.get("/api/pattern_history_all")
async def get_all_pattern_history():
    """Get execution history for all patterns in a single request.

    Reads the execution log line by line (one JSON document per line) and
    keeps, for each pattern name, the last entry marked ``completed``.
    Blank lines, lines that are not valid JSON, and JSON values that are
    not objects are skipped.

    Returns:
        dict mapping pattern names to ``actual_time_seconds``,
        ``actual_time_formatted``, ``speed`` and ``timestamp``; an empty
        dict if the log is missing or cannot be read.
    """
    from modules.core.pattern_manager import EXECUTION_LOG_FILE
    import json

    if not os.path.exists(EXECUTION_LOG_FILE):
        return {}

    try:
        history_map = {}
        with open(EXECUTION_LOG_FILE, 'r') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    entry = json.loads(line)
                except json.JSONDecodeError:
                    continue
                # A valid-JSON line that is not an object (e.g. a list or a
                # number) used to raise AttributeError and discard the whole
                # result; skip just that line instead.
                if not isinstance(entry, dict):
                    continue
                # Only consider fully completed patterns
                if not entry.get('completed', False):
                    continue
                pattern_name = entry.get('pattern_name')
                if pattern_name:
                    # Keep the most recent match (last one in file wins)
                    history_map[pattern_name] = {
                        "actual_time_seconds": entry.get('actual_time_seconds'),
                        "actual_time_formatted": entry.get('actual_time_formatted'),
                        "speed": entry.get('speed'),
                        "timestamp": entry.get('timestamp')
                    }
        return history_map
    except Exception as e:
        logger.error(f"Failed to read execution time log: {e}")
        return {}
@app.get("/preview/{encoded_filename}")
async def serve_preview(encoded_filename: str):
    """Serve a preview image for a pattern file.

    ``--`` in the encoded name is decoded to ``/`` first; if no cached
    image exists for that name, decoding to a backslash is tried instead.

    Raises:
        HTTPException: 404 if no cached preview image is found.
    """
    # Forward-slash decoding is the common case
    file_name = normalize_file_path(encoded_filename.replace('--', '/'))
    cache_path = get_cache_path(file_name)

    if not os.path.exists(cache_path):
        # Try with backslash for Windows paths
        alt_name = normalize_file_path(encoded_filename.replace('--', '\\'))
        alt_cache_path = get_cache_path(alt_name)
        if os.path.exists(alt_cache_path):
            file_name, cache_path = alt_name, alt_cache_path

    if not os.path.exists(cache_path):
        logger.error(f"Preview image not found for {file_name}")
        raise HTTPException(status_code=404, detail="Preview image not found")

    # Caching headers: max-age is one year in seconds
    response_headers = {
        "Cache-Control": "public, max-age=31536000",
        "Content-Type": "image/webp",
        "Accept-Ranges": "bytes"
    }
    return FileResponse(cache_path, media_type="image/webp", headers=response_headers)
@app.post("/send_coordinate")
async def send_coordinate(request: CoordinateRequest):
    """Move to the requested theta/rho position and wait for the machine to go idle.

    Raises:
        HTTPException: 400 without a connection, 500 if the move fails.
    """
    connected = state.conn.is_connected() if state.conn else False
    if not connected:
        logger.warning("Attempted to send coordinate without a connection")
        raise HTTPException(status_code=400, detail="Connection not established")

    check_homing_in_progress()

    # A leftover stop request from a stopped pattern would block the move
    state.stop_requested = False

    try:
        logger.debug(f"Sending coordinate: theta={request.theta}, rho={request.rho}")
        await pattern_manager.move_polar(request.theta, request.rho)

        # Do not return until the machine is idle (or the wait times out)
        reached_idle = await connection_manager.check_idle_async(timeout=60)
        if not reached_idle:
            logger.warning("Machine did not reach idle after send_coordinate")
        return {"success": True}
    except Exception as e:
        logger.error(f"Failed to send coordinate: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/download/{filename}")
async def download_file(filename: str):
    """Send a pattern file from ``THETA_RHO_DIR`` as a download.

    Raises:
        HTTPException: 404 if the name contains a path separator or does
            not refer to an existing regular file.
    """
    # The name comes from the URL: refuse anything that could address a
    # location other than a file directly inside THETA_RHO_DIR.
    if "/" in filename or "\\" in filename:
        raise HTTPException(status_code=404, detail="File not found")

    file_path = os.path.join(pattern_manager.THETA_RHO_DIR, filename)
    # Checking up front turns a missing file (or "."/"..") into a clean 404
    # rather than an error raised while the response is being sent.
    if not os.path.isfile(file_path):
        raise HTTPException(status_code=404, detail="File not found")

    return FileResponse(file_path, filename=filename)
@app.get("/serial_status")
async def serial_status():
    """Report whether a connection is active, plus the current and preferred port."""
    if state.conn:
        connected = state.conn.is_connected()
    else:
        connected = False
    port = state.port
    logger.debug(f"Serial status check - connected: {connected}, port: {port}")
    return {"connected": connected, "port": port, "preferred_port": state.preferred_port}
@app.get("/api/preferred-port", deprecated=True, tags=["settings-deprecated"])
async def get_preferred_port():
    """Return the preferred auto-connect port stored in ``state``."""
    current_preference = state.preferred_port
    return {"preferred_port": current_preference}
@app.post("/api/preferred-port", deprecated=True, tags=["settings-deprecated"])
async def set_preferred_port(request: Request):
    """Store the preferred auto-connect port from the JSON request body.

    An empty string or ``"none"`` clears the preference (stored as None).
    The state is saved after the update.
    """
    payload = await request.json()
    requested_port = payload.get("preferred_port")

    # "" and "none" both mean: clear the preference
    if requested_port in ("", "none"):
        requested_port = None

    state.preferred_port = requested_port
    state.save()
    logger.info(f"Preferred port set to: {requested_port}")
    return {"success": True, "preferred_port": state.preferred_port}
@app.post("/pause_execution")
async def pause_execution():
    """Pause execution via the pattern manager; respond 500 if it reports failure."""
    paused = pattern_manager.pause_execution()
    if not paused:
        raise HTTPException(status_code=500, detail="Failed to pause execution")
    return {"success": True, "message": "Execution paused"}
@app.post("/resume_execution")
async def resume_execution():
    """Resume execution via the pattern manager; respond 500 if it reports failure."""
    resumed = pattern_manager.resume_execution()
    if not resumed:
        raise HTTPException(status_code=500, detail="Failed to resume execution")
    return {"success": True, "message": "Execution resumed"}
- # Playlist endpoints
@app.get("/list_all_playlists")
async def list_all_playlists():
    """Return whatever the playlist manager lists as the available playlists."""
    return playlist_manager.list_all_playlists()
@app.get("/get_playlist")
async def get_playlist(name: str):
    """Return the named playlist, creating an empty one if it does not exist.

    Raises:
        HTTPException: 400 if ``name`` is empty.
    """
    if not name:
        raise HTTPException(status_code=400, detail="Missing playlist name parameter")

    found = playlist_manager.get_playlist(name)
    if found:
        return found

    # Unknown playlists are auto-created as empty
    logger.info(f"Playlist '{name}' not found, creating empty playlist")
    playlist_manager.create_playlist(name, [])
    return {"name": name, "files": []}
@app.post("/create_playlist")
async def create_playlist(request: PlaylistRequest):
    """Create (or overwrite) a playlist and report the manager's result."""
    created = playlist_manager.create_playlist(request.playlist_name, request.files)
    message = f"Playlist '{request.playlist_name}' created/updated"
    return {"success": created, "message": message}
@app.post("/modify_playlist")
async def modify_playlist(request: PlaylistRequest):
    """Replace the file list of a playlist.

    The ``success`` field carries the value returned by
    ``playlist_manager.modify_playlist``.
    """
    name = request.playlist_name
    modified = playlist_manager.modify_playlist(name, request.files)
    message = f"Playlist '{request.playlist_name}' updated"
    return {"success": modified, "message": message}
@app.delete("/delete_playlist")
async def delete_playlist(request: DeletePlaylistRequest):
    """Delete the playlist named in the request.

    Raises:
        HTTPException: 404 when ``playlist_manager.delete_playlist`` returns
            a falsy value.
    """
    deleted = playlist_manager.delete_playlist(request.playlist_name)
    if deleted:
        return {
            "success": True,
            "message": f"Playlist '{request.playlist_name}' deleted",
        }
    raise HTTPException(
        status_code=404,
        detail=f"Playlist '{request.playlist_name}' not found",
    )
@app.post("/rename_playlist")
async def rename_playlist(request: RenamePlaylistRequest):
    """Rename a playlist from ``old_name`` to ``new_name``.

    ``playlist_manager.rename_playlist`` returns a ``(success, message)``
    pair; the message is relayed to the client in both outcomes.

    Raises:
        HTTPException: 400 with the manager's message when the rename fails.
    """
    renamed, message = playlist_manager.rename_playlist(request.old_name, request.new_name)
    if renamed:
        return {
            "success": True,
            "message": message,
            "new_name": request.new_name,
        }
    raise HTTPException(status_code=400, detail=message)
class AddToPlaylistRequest(BaseModel):
    """Request body for ``POST /add_to_playlist``."""

    # Name of the playlist that receives the pattern.
    playlist_name: str
    # Pattern identifier handed to playlist_manager.add_to_playlist.
    pattern: str
@app.post("/add_to_playlist")
async def add_to_playlist(request: AddToPlaylistRequest):
    """Append a pattern to an existing playlist.

    Raises:
        HTTPException: 404 when ``playlist_manager.add_to_playlist`` returns
            a falsy value.
    """
    added = playlist_manager.add_to_playlist(request.playlist_name, request.pattern)
    if added:
        return {"success": True}
    raise HTTPException(status_code=404, detail="Playlist not found")
@app.post("/run_playlist")
async def run_playlist_endpoint(request: PlaylistRequest):
    """Run a playlist with specified parameters.

    The request's ``pause_time``, ``clear_pattern``, ``run_mode`` and
    ``shuffle`` values are forwarded to ``playlist_manager.run_playlist``.

    Raises:
        HTTPException: 400 when there is no active connection, 404 when the
            playlists file does not exist, 409 when the playlist manager
            declines to start the playlist, 500 for any unexpected error.
            ``check_homing_in_progress()`` is also called first; it
            presumably raises when homing is active - confirm against its
            definition.
    """
    try:
        if not (state.conn.is_connected() if state.conn else False):
            logger.warning("Attempted to run a playlist without a connection")
            raise HTTPException(status_code=400, detail="Connection not established")
        check_homing_in_progress()
        if not os.path.exists(playlist_manager.PLAYLISTS_FILE):
            raise HTTPException(status_code=404, detail=f"Playlist '{request.playlist_name}' not found")
        # Start the playlist execution
        success, message = await playlist_manager.run_playlist(
            request.playlist_name,
            pause_time=request.pause_time,
            clear_pattern=request.clear_pattern,
            run_mode=request.run_mode,
            shuffle=request.shuffle,
        )
        if not success:
            raise HTTPException(status_code=409, detail=message)
        return {"message": f"Started playlist: {request.playlist_name}"}
    except HTTPException:
        # Keep the deliberate 4xx statuses raised above; without this clause
        # the generic handler below would rewrite every one of them as 500.
        raise
    except Exception as e:
        logger.error(f"Error running playlist: {e}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/set_speed")
async def set_speed(request: SpeedRequest):
    """Store a new speed in ``state.speed``.

    Raises:
        HTTPException: 400 when there is no active connection or the speed
            is not strictly positive; 500 for any unexpected error.
    """
    try:
        if not state.conn or not state.conn.is_connected():
            logger.warning("Attempted to change speed without a connection")
            raise HTTPException(status_code=400, detail="Connection not established")

        requested_speed = request.speed
        if requested_speed <= 0:
            logger.warning(f"Invalid speed value received: {request.speed}")
            raise HTTPException(status_code=400, detail="Invalid speed value")

        state.speed = requested_speed
        return {"success": True, "speed": request.speed}
    except HTTPException:
        # Validation failures keep their own status codes.
        raise
    except Exception as error:
        logger.error(f"Failed to set speed: {str(error)}")
        raise HTTPException(status_code=500, detail=str(error))
@app.get("/check_software_update")
async def check_updates():
    """Return the result of ``update_manager.check_git_updates()``."""
    return update_manager.check_git_updates()
@app.post("/update_software")
async def update_software():
    """Run the software update via ``update_manager.update_software()``.

    The manager returns ``(success, error_message, error_log)``.

    Raises:
        HTTPException: 500 carrying ``error`` and ``details`` when the update
            reports failure.
    """
    logger.info("Starting software update process")
    succeeded, error_message, error_log = update_manager.update_software()

    if not succeeded:
        logger.error(f"Software update failed: {error_message}\nDetails: {error_log}")
        failure_detail = {"error": error_message, "details": error_log}
        raise HTTPException(status_code=500, detail=failure_detail)

    logger.info("Software update completed successfully")
    return {"success": True}
@app.post("/set_wled_ip")
async def set_wled_ip(request: WLEDRequest):
    """Legacy endpoint: select WLED as LED provider, or disable LEDs.

    A truthy ``wled_ip`` switches the provider to ``"wled"`` and builds a new
    ``LEDInterface``; a falsy one sets the provider to ``"none"`` and drops
    the controller. The state is saved in both cases.
    """
    state.wled_ip = request.wled_ip
    if request.wled_ip:
        state.led_provider = "wled"
        state.led_controller = LEDInterface("wled", request.wled_ip)
    else:
        state.led_provider = "none"
        state.led_controller = None

    # Show the idle effect right away when a controller is available.
    if state.led_controller:
        state.led_controller.effect_idle()
        _start_idle_led_timeout()

    state.save()
    logger.info(f"WLED IP updated: {request.wled_ip}")
    return {"success": True, "wled_ip": state.wled_ip}
@app.get("/get_wled_ip")
async def get_wled_ip():
    """Legacy endpoint: return the stored WLED IP.

    Raises:
        HTTPException: 404 when ``state.wled_ip`` is unset or empty.
    """
    if state.wled_ip:
        return {"success": True, "wled_ip": state.wled_ip}
    raise HTTPException(status_code=404, detail="No WLED IP set")
@app.post("/set_led_config", deprecated=True, tags=["settings-deprecated"])
async def set_led_config(request: LEDConfigRequest):
    """DEPRECATED: Use PATCH /api/settings instead. Configure LED provider (WLED, DW LEDs, or none)

    All request validation runs before any shared state is modified, so a
    rejected request leaves ``state`` untouched.

    Returns:
        A dict with ``success`` and the resulting LED settings. When the DW
        LED controller reports an initialization error the configuration is
        still saved and the response carries ``warning`` and
        ``hardware_available: False``.

    Raises:
        HTTPException: 400 for an unknown provider, or for ``"wled"`` without
            an ``ip_address``.
    """
    provider = request.provider
    if provider not in ["wled", "dw_leds", "none"]:
        raise HTTPException(status_code=400, detail="Invalid provider. Must be 'wled', 'dw_leds', or 'none'")
    if provider == "wled" and not request.ip_address:
        raise HTTPException(status_code=400, detail="IP address required for WLED")

    # Capture the provider that was active *before* this request. It decides
    # whether the existing controller is a DW LED controller to be stopped.
    previous_provider = state.led_provider
    state.led_provider = provider

    if provider == "wled":
        state.wled_ip = request.ip_address
        state.led_controller = LEDInterface("wled", request.ip_address)
        logger.info(f"LED provider set to WLED at {request.ip_address}")
    elif provider == "dw_leds":
        # Check if hardware settings changed (requires restart)
        new_gpio_pin = request.gpio_pin or 18
        new_pixel_order = request.pixel_order or "RGB"
        hardware_changed = (
            state.dw_led_gpio_pin != new_gpio_pin
            or state.dw_led_pixel_order != new_pixel_order
        )
        # Stop existing DW LED controller if hardware settings changed
        if hardware_changed and state.led_controller and previous_provider == "dw_leds":
            logger.info("Hardware settings changed, stopping existing LED controller...")
            controller = state.led_controller.get_controller()
            if controller and hasattr(controller, 'stop'):
                try:
                    controller.stop()
                    logger.info("LED controller stopped successfully")
                except Exception as e:
                    logger.error(f"Error stopping LED controller: {e}")
            # Clear the reference and give hardware time to release
            state.led_controller = None
            await asyncio.sleep(0.5)

        # NOTE(review): `or` also replaces explicit zeros with the defaults
        # (e.g. brightness 0 becomes 35) - confirm that is intended.
        state.dw_led_num_leds = request.num_leds or 60
        state.dw_led_gpio_pin = new_gpio_pin
        state.dw_led_pixel_order = new_pixel_order
        state.dw_led_brightness = request.brightness or 35
        state.wled_ip = None

        # Create new LED controller with updated settings
        state.led_controller = LEDInterface(
            "dw_leds",
            num_leds=state.dw_led_num_leds,
            gpio_pin=state.dw_led_gpio_pin,
            pixel_order=state.dw_led_pixel_order,
            brightness=state.dw_led_brightness / 100.0,
            speed=state.dw_led_speed,
            intensity=state.dw_led_intensity,
        )
        restart_msg = " (restarted)" if hardware_changed else ""
        logger.info(f"DW LEDs configured{restart_msg}: {state.dw_led_num_leds} LEDs on GPIO{state.dw_led_gpio_pin}, pixel order: {state.dw_led_pixel_order}")

        # Check if initialization succeeded by checking status
        status = state.led_controller.check_status()
        if not status.get("connected", False) and status.get("error"):
            error_msg = status["error"]
            logger.warning(f"DW LED initialization failed: {error_msg}, but configuration saved for testing")
            state.led_controller = None
            # state.led_provider stays "dw_leds" so the settings can still be
            # saved and tested; persist them even though init failed.
            state.save()
            # Return success with warning instead of error
            return {
                "success": True,
                "warning": error_msg,
                "hardware_available": False,
                "provider": state.led_provider,
                "dw_led_num_leds": state.dw_led_num_leds,
                "dw_led_gpio_pin": state.dw_led_gpio_pin,
                "dw_led_pixel_order": state.dw_led_pixel_order,
                "dw_led_brightness": state.dw_led_brightness,
            }
    else:  # none
        state.wled_ip = None
        state.led_controller = None
        logger.info("LED provider disabled")

    # Show idle effect if controller is configured
    if state.led_controller:
        state.led_controller.effect_idle()
        _start_idle_led_timeout()

    state.save()
    return {
        "success": True,
        "provider": state.led_provider,
        "wled_ip": state.wled_ip,
        "dw_led_num_leds": state.dw_led_num_leds,
        "dw_led_gpio_pin": state.dw_led_gpio_pin,
        "dw_led_brightness": state.dw_led_brightness,
    }
@app.get("/get_led_config", deprecated=True, tags=["settings-deprecated"])
async def get_led_config():
    """DEPRECATED: Use GET /api/settings instead. Report the LED configuration.

    When no provider is recorded (or it is ``"none"``) but a WLED IP is
    stored, the provider is switched to ``"wled"`` and the state is saved, so
    that older installations keep working.
    """
    provider = state.led_provider
    provider_unset = not provider or provider == "none"

    if provider_unset and state.wled_ip:
        # A stored WLED address implies the WLED provider.
        provider = "wled"
        state.led_provider = "wled"
        state.save()
        logger.info("Auto-detected WLED provider from existing configuration")
    elif provider_unset:
        provider = "none"

    return {
        "success": True,
        "provider": provider,
        "wled_ip": state.wled_ip,
        "dw_led_num_leds": state.dw_led_num_leds,
        "dw_led_gpio_pin": state.dw_led_gpio_pin,
        "dw_led_pixel_order": state.dw_led_pixel_order,
        "dw_led_brightness": state.dw_led_brightness,
        "dw_led_idle_effect": state.dw_led_idle_effect,
        "dw_led_playing_effect": state.dw_led_playing_effect,
    }
@app.post("/skip_pattern")
async def skip_pattern():
    """Request that the running playlist skip to its next pattern.

    Sets ``state.skip_requested``. If no playlist task is alive, the playlist
    index and current file are advanced here; otherwise the running task is
    left to handle the skip so the two do not race on the index.

    Raises:
        HTTPException: 400 when no playlist is currently loaded.
    """
    if not state.current_playlist:
        raise HTTPException(status_code=400, detail="No playlist is currently running")

    state.skip_requested = True

    from modules.core import playlist_manager

    playlist_task = playlist_manager._current_playlist_task
    task_idle = playlist_task is None or playlist_task.done()

    # Only advance by hand when nothing else is managing the index
    # (e.g. the task was cancelled by TestClient).
    if task_idle and state.current_playlist_index is not None:
        upcoming = state.current_playlist_index + 1
        if upcoming < len(state.current_playlist):
            state.current_playlist_index = upcoming
            state.current_playing_file = state.current_playlist[upcoming]

    return {"success": True}
@app.post("/reorder_playlist")
async def reorder_playlist(request: dict):
    """Reorder a pattern in the current playlist queue.

    Since the playlist now contains only main patterns (clear patterns are executed
    dynamically at runtime), this simply moves the pattern from one position to another.

    Args:
        request: JSON body with integer ``from_index`` and ``to_index``.
            When moving forward (``to_index > from_index``) the pattern is
            placed directly before the entry that was at ``to_index``; when
            moving backward it lands at ``to_index``. Equal indices are a
            no-op.

    Raises:
        HTTPException: 400 when no playlist is running, an index is missing,
            not an integer, out of range, or refers to an already played
            position.
    """
    if not state.current_playlist:
        raise HTTPException(status_code=400, detail="No playlist is currently running")

    from_index = request.get("from_index")
    to_index = request.get("to_index")
    if from_index is None or to_index is None:
        raise HTTPException(status_code=400, detail="from_index and to_index are required")
    # Reject strings, floats and booleans up front; otherwise the comparisons
    # below raise TypeError and surface as an opaque 500.
    for index_value in (from_index, to_index):
        if isinstance(index_value, bool) or not isinstance(index_value, int):
            raise HTTPException(status_code=400, detail="from_index and to_index must be integers")

    playlist = list(state.current_playlist)  # Make a copy to work with
    # The index may be unset while a playlist is loaded; then nothing counts
    # as "already played".
    current_index = state.current_playlist_index
    if current_index is None:
        current_index = 0

    # Validate indices
    if from_index < 0 or from_index >= len(playlist):
        raise HTTPException(status_code=400, detail="from_index out of range")
    if to_index < 0 or to_index >= len(playlist):
        raise HTTPException(status_code=400, detail="to_index out of range")

    # Can't move patterns that have already played (before current_index)
    # But CAN move the current pattern or swap with it (allows live reordering)
    if from_index < current_index:
        raise HTTPException(status_code=400, detail="Cannot move completed pattern")
    if to_index < current_index:
        raise HTTPException(status_code=400, detail="Cannot move to completed position")

    # Moving an item onto itself must not change anything. Without this guard
    # the forward adjustment below would shift it one slot backwards (or
    # insert at index -1 when both indices are 0).
    if from_index == to_index:
        return {"success": True}

    # Perform the reorder
    item = playlist.pop(from_index)
    # Adjust to_index if moving forward (since we removed an item before it)
    adjusted_to_index = to_index if to_index < from_index else to_index - 1
    playlist.insert(adjusted_to_index, item)

    # Update state (this triggers the property setter)
    state.current_playlist = playlist
    return {"success": True}
@app.post("/add_to_queue")
async def add_to_queue(request: dict):
    """Add a pattern to the current playlist queue.

    Args:
        pattern: The pattern file path to add (e.g., 'circle.thr' or 'subdirectory/pattern.thr')
        position: 'next' to play after current pattern, 'end' to add to end of queue

    Returns:
        ``{"success": True, "position": <index the pattern was inserted at>}``

    Raises:
        HTTPException: 400 when no playlist is running or ``pattern`` is
            missing or not a string; 404 when the pattern does not resolve
            to a file inside the patterns directory.
    """
    if not state.current_playlist:
        raise HTTPException(status_code=400, detail="No playlist is currently running")

    pattern = request.get("pattern")
    position = request.get("position", "end")  # 'next' or 'end'
    if not pattern:
        raise HTTPException(status_code=400, detail="pattern is required")
    if not isinstance(pattern, str):
        raise HTTPException(status_code=400, detail="pattern must be a string")

    # Verify the pattern file exists. The path comes from the client, so it
    # is also confined to the patterns directory: absolute paths and ".."
    # segments must not reach files elsewhere on disk.
    patterns_root = os.path.abspath(pattern_manager.THETA_RHO_DIR)
    pattern_path = os.path.abspath(os.path.join(patterns_root, pattern))
    inside_root = pattern_path.startswith(patterns_root + os.sep)
    if not inside_root or not os.path.isfile(pattern_path):
        raise HTTPException(status_code=404, detail="Pattern file not found")

    playlist = list(state.current_playlist)
    current_index = state.current_playlist_index

    if position == "next":
        # Insert right after the current pattern; with no current index the
        # pattern goes to the front of the queue.
        insert_index = 0 if current_index is None else current_index + 1
    else:
        # Add to end
        insert_index = len(playlist)

    playlist.insert(insert_index, pattern)
    state.current_playlist = playlist
    return {"success": True, "position": insert_index}
@app.get("/api/custom_clear_patterns", deprecated=True, tags=["settings-deprecated"])
async def get_custom_clear_patterns():
    """Report the custom clear patterns stored in ``state`` (deprecated endpoint)."""
    response = {"success": True}
    response["custom_clear_from_in"] = state.custom_clear_from_in
    response["custom_clear_from_out"] = state.custom_clear_from_out
    return response
@app.post("/api/custom_clear_patterns", deprecated=True, tags=["settings-deprecated"])
async def set_custom_clear_patterns(request: dict):
    """Set custom clear patterns for clear_from_in and clear_from_out.

    Only keys present in the body are changed. A truthy value must name an
    existing file under ``pattern_manager.THETA_RHO_DIR``; a falsy value
    clears the setting. Both values are validated before either is applied,
    so a rejected request changes nothing.

    Raises:
        HTTPException: 400 when a named pattern file does not exist; 500 for
            any unexpected error.
    """
    try:
        pending = {}
        for setting in ("custom_clear_from_in", "custom_clear_from_out"):
            if setting not in request:
                continue
            pattern = request[setting]
            if pattern:
                # Validate that the pattern exists before accepting it
                pattern_path = os.path.join(pattern_manager.THETA_RHO_DIR, pattern)
                if not os.path.exists(pattern_path):
                    raise HTTPException(status_code=400, detail=f"Pattern file not found: {pattern}")
                pending[setting] = pattern
            else:
                pending[setting] = None

        for setting, value in pending.items():
            setattr(state, setting, value)

        state.save()
        logger.info(f"Custom clear patterns updated - in: {state.custom_clear_from_in}, out: {state.custom_clear_from_out}")
        return {
            "success": True,
            "custom_clear_from_in": state.custom_clear_from_in,
            "custom_clear_from_out": state.custom_clear_from_out,
        }
    except HTTPException:
        # Keep the 400 raised above instead of letting the generic handler
        # turn it into a 500.
        raise
    except Exception as e:
        logger.error(f"Failed to set custom clear patterns: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/clear_pattern_speed", deprecated=True, tags=["settings-deprecated"])
async def get_clear_pattern_speed():
    """Report the clearing-pattern speed (deprecated endpoint).

    ``effective_speed`` falls back to ``state.speed`` when no dedicated
    clearing speed is stored.
    """
    if state.clear_pattern_speed is not None:
        effective_speed = state.clear_pattern_speed
    else:
        effective_speed = state.speed

    return {
        "success": True,
        "clear_pattern_speed": state.clear_pattern_speed,
        "effective_speed": effective_speed,
    }
@app.post("/api/clear_pattern_speed", deprecated=True, tags=["settings-deprecated"])
async def set_clear_pattern_speed(request: dict):
    """DEPRECATED: Use PATCH /api/settings instead. Set the clearing pattern speed.

    A missing value, ``None``, ``"none"`` or ``""`` clears the dedicated
    speed so that ``state.speed`` is used instead. Any other value is
    converted with ``int()`` and must lie in the range 50..2000.

    Raises:
        HTTPException: 400 when the value cannot be converted to an integer
            or is out of range; 500 for any unexpected error.
    """
    try:
        # If speed is None or "none", use default behavior (state.speed)
        speed_value = request.get("clear_pattern_speed")
        if speed_value is None or speed_value == "none" or speed_value == "":
            speed = None
        else:
            speed = int(speed_value)

        # Validate speed range (same as regular speed limits) only if speed is not None
        if speed is not None and not (50 <= speed <= 2000):
            raise HTTPException(status_code=400, detail="Speed must be between 50 and 2000")

        state.clear_pattern_speed = speed
        state.save()

        logger.info(f"Clear pattern speed set to {speed if speed is not None else 'default (state.speed)'}")
        return {
            "success": True,
            "clear_pattern_speed": state.clear_pattern_speed,
            "effective_speed": state.clear_pattern_speed if state.clear_pattern_speed is not None else state.speed,
        }
    except HTTPException:
        # The range error above must stay a 400; the generic handler below
        # would otherwise report it as a 500.
        raise
    except (TypeError, ValueError):
        # int() raises ValueError for bad strings and TypeError for JSON
        # arrays/objects; both are client errors.
        raise HTTPException(status_code=400, detail="Invalid speed value")
    except Exception as e:
        logger.error(f"Failed to set clear pattern speed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/app-name", deprecated=True, tags=["settings-deprecated"])
async def get_app_name():
    """DEPRECATED: Use GET /api/settings instead. Return ``state.app_name``."""
    current_name = state.app_name
    return {"app_name": current_name}
@app.post("/api/app-name", deprecated=True, tags=["settings-deprecated"])
async def set_app_name(request: dict):
    """DEPRECATED: Use PATCH /api/settings instead. Update application name.

    The name is stripped of surrounding whitespace; a missing, ``null`` or
    blank name resets it to the default ``"Dune Weaver"``.

    Raises:
        HTTPException: 400 when ``app_name`` is present but not a string.
    """
    raw_name = request.get("app_name")
    if raw_name is not None and not isinstance(raw_name, str):
        raise HTTPException(status_code=400, detail="app_name must be a string")

    # `or ""` also covers an explicit JSON null, which `.get(key, "")` would
    # pass through as None and crash on `.strip()`.
    app_name = (raw_name or "").strip()
    if not app_name:
        app_name = "Dune Weaver"  # Reset to default if empty

    state.app_name = app_name
    state.save()
    logger.info(f"Application name updated to: {app_name}")
    return {"success": True, "app_name": app_name}
# ============================================================================
# Custom Branding Upload Endpoints
# ============================================================================

# Directory (relative to the working directory) that holds the uploaded logo
# and the icons generated from it.
CUSTOM_BRANDING_DIR = os.path.join("static", "custom")

# File extensions accepted by the logo upload endpoint.
ALLOWED_IMAGE_EXTENSIONS = {
    ".png",
    ".jpg",
    ".jpeg",
    ".gif",
    ".webp",
    ".svg",
}

# Upload size limit in bytes (10MB).
MAX_LOGO_SIZE = 10 * 1024 ** 2

# Largest width/height, in pixels, kept for an optimized logo.
MAX_LOGO_DIMENSION = 512
def optimize_logo_image(content: bytes, original_ext: str) -> tuple[bytes, str]:
    """Optimize logo image by resizing and converting to WebP.

    Args:
        content: Original image bytes
        original_ext: Original file extension (e.g., '.png', '.jpg')

    Returns:
        Tuple of (optimized_bytes, new_extension)
        For SVG files, returns the original content unchanged.
        For raster images, resizes to MAX_LOGO_DIMENSION and converts to WebP.
        If optimization fails for any reason, the original content and
        extension are returned and a warning is logged.
    """
    # SVG files are already lightweight vectors - keep as-is
    if original_ext.lower() == ".svg":
        return content, original_ext

    try:
        from PIL import Image
        import io

        with Image.open(io.BytesIO(content)) as img:
            # Convert to RGBA for transparency support
            if img.mode in ('P', 'LA') or (img.mode == 'RGBA' and 'transparency' in img.info):
                img = img.convert('RGBA')
            elif img.mode != 'RGBA':
                img = img.convert('RGB')

            # Resize if larger than max dimension (maintain aspect ratio)
            width, height = img.size
            if width > MAX_LOGO_DIMENSION or height > MAX_LOGO_DIMENSION:
                ratio = min(MAX_LOGO_DIMENSION / width, MAX_LOGO_DIMENSION / height)
                # Clamp to at least one pixel: for very elongated images the
                # shorter side would otherwise truncate to 0, which makes the
                # resize fail and throws the whole optimization away.
                target_dims = (max(1, int(width * ratio)), max(1, int(height * ratio)))
                img = img.resize(target_dims, Image.Resampling.LANCZOS)
                logger.info(f"Logo resized from {width}x{height} to {target_dims[0]}x{target_dims[1]}")

            # Save as WebP with good quality/size balance
            output = io.BytesIO()
            img.save(output, format='WEBP', quality=85, method=6)
            optimized_bytes = output.getvalue()

        original_bytes_len = len(content)
        optimized_bytes_len = len(optimized_bytes)
        # Guard the percentage against an empty input.
        reduction = (
            ((original_bytes_len - optimized_bytes_len) / original_bytes_len) * 100
            if original_bytes_len
            else 0.0
        )
        logger.info(f"Logo optimized: {original_bytes_len:,} bytes -> {optimized_bytes_len:,} bytes ({reduction:.1f}% reduction)")
        return optimized_bytes, ".webp"
    except Exception as e:
        # Best effort: an image that cannot be processed is stored as uploaded.
        logger.warning(f"Logo optimization failed, using original: {str(e)}")
        return content, original_ext
def generate_favicon_from_logo(logo_path: str, output_dir: str) -> bool:
    """Write circular, transparent-background favicons derived from a logo.

    The logo is converted to RGBA, center-cropped to a square and rendered
    into ``output_dir`` as:
    - favicon-16x16.png, favicon-32x32.png, favicon-96x96.png, favicon-128x128.png
    - favicon.ico (multi-size: 256, 128, 64, 48, 32, 16)

    Returns True on success, False on failure (the error is logged).
    """
    try:
        from PIL import Image, ImageDraw

        def create_circular_transparent(img, size):
            """Scale ``img`` to ``size`` x ``size`` and mask it to a circle."""
            canvas = Image.new('RGBA', (size, size), (0, 0, 0, 0))
            circle_mask = Image.new('L', (size, size), 0)
            ImageDraw.Draw(circle_mask).ellipse((0, 0, size - 1, size - 1), fill=255)
            scaled = img.resize((size, size), Image.Resampling.LANCZOS)
            canvas.paste(scaled, (0, 0), circle_mask)
            return canvas

        with Image.open(logo_path) as img:
            # The mask/paste step above needs an alpha channel.
            if img.mode != 'RGBA':
                img = img.convert('RGBA')

            # Center-crop to the largest square that fits.
            width, height = img.size
            side = min(width, height)
            x_offset = (width - side) // 2
            y_offset = (height - side) // 2
            img = img.crop((x_offset, y_offset, x_offset + side, y_offset + side))

            # Individual PNG favicons.
            png_targets = (
                ("favicon-16x16.png", 16),
                ("favicon-32x32.png", 32),
                ("favicon-96x96.png", 96),
                ("favicon-128x128.png", 128),
            )
            for filename, size in png_targets:
                png_path = os.path.join(output_dir, filename)
                create_circular_transparent(img, size).save(png_path, format='PNG')

            # Multi-resolution favicon.ico: the largest image is the base and
            # the remaining ones are appended.
            ico_sizes = [256, 128, 64, 48, 32, 16]
            largest, *smaller = [create_circular_transparent(img, s) for s in ico_sizes]
            largest.save(
                os.path.join(output_dir, "favicon.ico"),
                format='ICO',
                append_images=smaller,
                sizes=[(s, s) for s in ico_sizes],
            )
            return True
    except Exception as error:
        logger.error(f"Failed to generate favicon: {str(error)}")
        return False
def generate_pwa_icons_from_logo(logo_path: str, output_dir: str) -> bool:
    """Write square PWA app icons derived from a logo.

    The icons are left square (no circular crop) - the OS will apply its own
    mask. The logo is converted to RGBA, center-cropped to a square and saved
    into ``output_dir`` as:
    - apple-touch-icon.png (180x180)
    - android-chrome-192x192.png (192x192)
    - android-chrome-512x512.png (512x512)

    Returns True on success, False on failure (the error is logged).
    """
    try:
        from PIL import Image

        with Image.open(logo_path) as img:
            if img.mode != 'RGBA':
                img = img.convert('RGBA')

            # Center-crop to the largest square that fits.
            width, height = img.size
            side = min(width, height)
            x_offset = (width - side) // 2
            y_offset = (height - side) // 2
            img = img.crop((x_offset, y_offset, x_offset + side, y_offset + side))

            icon_targets = (
                ("apple-touch-icon.png", 180),
                ("android-chrome-192x192.png", 192),
                ("android-chrome-512x512.png", 512),
            )
            for filename, size in icon_targets:
                scaled = img.resize((size, size), Image.Resampling.LANCZOS)
                scaled.save(os.path.join(output_dir, filename), format='PNG')
                logger.info(f"Generated PWA icon: {filename}")
            return True
    except Exception as error:
        logger.error(f"Failed to generate PWA icons: {str(error)}")
        return False
@app.post("/api/upload-logo", tags=["settings"])
async def upload_logo(file: UploadFile = File(...)):
    """Upload a custom logo image.

    Supported formats: PNG, JPG, JPEG, GIF, WebP, SVG
    Maximum upload size: 10MB

    Images are automatically optimized:
    - Resized to max 512x512 pixels
    - Converted to WebP format for smaller file size
    - SVG files are kept as-is (already lightweight)

    A favicon and PWA icons will be automatically generated from the logo.

    The new logo is written and recorded in ``state`` before the previous
    logo file is removed, so a failed upload never leaves the application
    pointing at a deleted file.

    Raises:
        HTTPException: 400 for an unsupported extension or an oversized
            file; 500 for any unexpected error.
    """
    try:
        # Validate file extension. The filename is optional in multipart
        # uploads, so fall back to "" (which fails the extension check with a
        # clean 400 instead of a TypeError).
        file_ext = os.path.splitext(file.filename or "")[1].lower()
        if file_ext not in ALLOWED_IMAGE_EXTENSIONS:
            raise HTTPException(
                status_code=400,
                # sorted() keeps the message stable; set order is arbitrary.
                detail=f"Invalid file type. Allowed: {', '.join(sorted(ALLOWED_IMAGE_EXTENSIONS))}"
            )

        # Read and validate file size
        content = await file.read()
        if len(content) > MAX_LOGO_SIZE:
            raise HTTPException(
                status_code=400,
                detail=f"File too large. Maximum size: {MAX_LOGO_SIZE // (1024*1024)}MB"
            )

        # Ensure custom branding directory exists
        os.makedirs(CUSTOM_BRANDING_DIR, exist_ok=True)

        # Optimize the image (resize + convert to WebP for smaller file size)
        optimized_content, optimized_ext = optimize_logo_image(content, file_ext)

        # Generate a unique filename to prevent caching issues
        import uuid
        filename = f"logo-{uuid.uuid4().hex[:8]}{optimized_ext}"
        file_path = os.path.join(CUSTOM_BRANDING_DIR, filename)

        # Save the optimized logo file
        with open(file_path, "wb") as f:
            f.write(optimized_content)

        previous_logo = state.custom_logo
        if previous_logo:
            # Remove the old favicon before regenerating so a stale icon does
            # not survive when the new logo yields none (e.g. SVG).
            old_favicon_path = os.path.join(CUSTOM_BRANDING_DIR, "favicon.ico")
            if os.path.exists(old_favicon_path):
                os.remove(old_favicon_path)

        # Generate favicon and PWA icons from logo (for non-SVG files)
        favicon_generated = False
        pwa_icons_generated = False
        if optimized_ext != ".svg":
            favicon_generated = generate_favicon_from_logo(file_path, CUSTOM_BRANDING_DIR)
            pwa_icons_generated = generate_pwa_icons_from_logo(file_path, CUSTOM_BRANDING_DIR)

        # Update state
        state.custom_logo = filename
        state.save()

        # Only now drop the previous logo file. Failure to delete it is not
        # fatal: the upload itself has already succeeded.
        if previous_logo and previous_logo != filename:
            old_logo_path = os.path.join(CUSTOM_BRANDING_DIR, previous_logo)
            try:
                if os.path.exists(old_logo_path):
                    os.remove(old_logo_path)
            except OSError as cleanup_error:
                logger.warning(f"Could not remove previous logo {previous_logo}: {cleanup_error}")

        logger.info(f"Custom logo uploaded: {filename}, favicon generated: {favicon_generated}, PWA icons generated: {pwa_icons_generated}")
        return {
            "success": True,
            "filename": filename,
            "url": f"/static/custom/{filename}",
            "favicon_generated": favicon_generated,
            "pwa_icons_generated": pwa_icons_generated
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error uploading logo: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.delete("/api/custom-logo", tags=["settings"])
async def delete_custom_logo():
    """Delete the custom logo plus every generated favicon and PWA icon.

    Nothing is touched when no custom logo is recorded. Files that are
    already missing are skipped.

    Raises:
        HTTPException: 500 when removal or saving the state fails.
    """
    try:
        if state.custom_logo:
            # The logo first, then the generated favicons, then the PWA icons.
            generated_assets = (
                "favicon.ico",
                "favicon-16x16.png",
                "favicon-32x32.png",
                "favicon-96x96.png",
                "favicon-128x128.png",
                "apple-touch-icon.png",
                "android-chrome-192x192.png",
                "android-chrome-512x512.png",
            )
            for asset_name in (state.custom_logo, *generated_assets):
                asset_path = os.path.join(CUSTOM_BRANDING_DIR, asset_name)
                if os.path.exists(asset_path):
                    os.remove(asset_path)

            state.custom_logo = None
            state.save()
            logger.info("Custom logo, favicon, and PWA icons removed")
        return {"success": True}
    except Exception as error:
        logger.error(f"Error removing logo: {str(error)}")
        raise HTTPException(status_code=500, detail=str(error))
@app.get("/api/mqtt-config", deprecated=True, tags=["settings-deprecated"])
async def get_mqtt_config():
    """DEPRECATED: Use GET /api/settings instead. Report the MQTT settings.

    The password itself is never returned; ``has_password`` only says whether
    one is stored. ``connected`` and ``is_mock`` are read from the current
    MQTT handler.
    """
    from modules.mqtt import get_mqtt_handler

    handler = get_mqtt_handler()
    is_connected = getattr(handler, 'is_connected', False)
    is_mock = handler.__class__.__name__ == 'MockMQTTHandler'

    return {
        "enabled": state.mqtt_enabled,
        "broker": state.mqtt_broker,
        "port": state.mqtt_port,
        "username": state.mqtt_username,
        # Password is intentionally omitted for security
        "has_password": bool(state.mqtt_password),
        "client_id": state.mqtt_client_id,
        "discovery_prefix": state.mqtt_discovery_prefix,
        "device_id": state.mqtt_device_id,
        "device_name": state.mqtt_device_name,
        "connected": is_connected,
        "is_mock": is_mock,
    }
@app.post("/api/mqtt-config", deprecated=True, tags=["settings-deprecated"])
async def set_mqtt_config(request: dict):
    """DEPRECATED: Use PATCH /api/settings instead. Update MQTT configuration. Requires restart to take effect.

    All values are parsed and validated first; ``state`` is only modified
    once the request has been accepted, so a 400 response leaves the
    in-memory configuration unchanged.

    Returns:
        A success dict with ``requires_restart: True``, or a ``JSONResponse``
        with ``success: False`` and status 400 (invalid input) or 500
        (unexpected error).
    """
    try:
        # Parse everything into locals before touching shared state
        enabled = request.get("enabled", False)
        broker = (request.get("broker") or "").strip()
        port = int(request.get("port") or 1883)
        username = (request.get("username") or "").strip()
        password = (request.get("password") or "").strip()
        client_id = (request.get("client_id") or "dune_weaver").strip()
        discovery_prefix = (request.get("discovery_prefix") or "homeassistant").strip()
        device_id = (request.get("device_id") or "dune_weaver").strip()
        device_name = (request.get("device_name") or "Dune Weaver").strip()

        # Validate required fields when enabled
        if enabled and not broker:
            return JSONResponse(
                content={"success": False, "message": "Broker address is required when MQTT is enabled"},
                status_code=400
            )

        # Update state with new values
        state.mqtt_enabled = enabled
        state.mqtt_broker = broker
        state.mqtt_port = port
        state.mqtt_username = username
        state.mqtt_password = password
        state.mqtt_client_id = client_id
        state.mqtt_discovery_prefix = discovery_prefix
        state.mqtt_device_id = device_id
        state.mqtt_device_name = device_name
        state.save()

        logger.info(f"MQTT configuration updated. Enabled: {state.mqtt_enabled}, Broker: {state.mqtt_broker}")
        return {
            "success": True,
            "message": "MQTT configuration saved. Restart the application for changes to take effect.",
            "requires_restart": True
        }
    except (TypeError, ValueError) as e:
        # int() raises ValueError for bad strings and TypeError for JSON
        # arrays/objects; both are client errors.
        return JSONResponse(
            content={"success": False, "message": f"Invalid value: {str(e)}"},
            status_code=400
        )
    except Exception as e:
        logger.error(f"Failed to update MQTT config: {str(e)}")
        return JSONResponse(
            content={"success": False, "message": str(e)},
            status_code=500
        )
@app.post("/api/mqtt-test")
async def test_mqtt_connection(request: dict):
    """Test MQTT connection with provided settings.

    A throwaway client connects asynchronously to the given broker and the
    handler waits up to 5 seconds for the connect callback. The client's
    network loop is always stopped afterwards, including when the wait is
    interrupted.

    Returns:
        ``{"success": True, ...}`` on a successful connection, otherwise a
        ``JSONResponse`` with ``success: False`` and status 400 (missing
        broker, invalid port, refused connection, timeout) or 500
        (unexpected error).
    """
    import time
    import paho.mqtt.client as mqtt_client

    broker = (request.get("broker") or "").strip()
    username = (request.get("username") or "").strip()
    password = (request.get("password") or "").strip()
    client_id = (request.get("client_id") or "dune_weaver_test").strip()

    if not broker:
        return JSONResponse(
            content={"success": False, "message": "Broker address is required"},
            status_code=400
        )

    # A malformed port is a client error, not a server failure.
    try:
        port = int(request.get("port") or 1883)
    except (TypeError, ValueError):
        return JSONResponse(
            content={"success": False, "message": "Invalid port value"},
            status_code=400
        )

    # Filled in by the connect callback, which paho invokes from its own
    # network loop.
    connection_result = {"connected": False, "error": None}

    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            connection_result["connected"] = True
        else:
            error_messages = {
                1: "Incorrect protocol version",
                2: "Invalid client identifier",
                3: "Server unavailable",
                4: "Bad username or password",
                5: "Not authorized"
            }
            connection_result["error"] = error_messages.get(rc, f"Connection failed with code {rc}")

    client = None
    try:
        # Create a test client
        client = mqtt_client.Client(client_id=client_id + "_test")
        if username:
            client.username_pw_set(username, password)
        client.on_connect = on_connect

        # Try to connect with timeout
        client.connect_async(broker, port, keepalive=10)
        client.loop_start()

        # Wait for connection result (max 5 seconds). A monotonic clock keeps
        # the timeout immune to wall-clock adjustments.
        deadline = time.monotonic() + 5
        while time.monotonic() < deadline:
            if connection_result["connected"] or connection_result["error"]:
                break
            await asyncio.sleep(0.1)
    except Exception as e:
        logger.error(f"MQTT test connection failed: {str(e)}")
        return JSONResponse(
            content={"success": False, "message": str(e)},
            status_code=500
        )
    finally:
        # Always tear the test client down so its loop does not outlive the
        # request, even on errors or cancellation.
        if client is not None:
            try:
                client.loop_stop()
                client.disconnect()
            except Exception as cleanup_error:
                logger.warning(f"MQTT test client cleanup failed: {cleanup_error}")

    if connection_result["connected"]:
        return {"success": True, "message": "Successfully connected to MQTT broker"}
    if connection_result["error"]:
        return JSONResponse(
            content={"success": False, "message": connection_result["error"]},
            status_code=400
        )
    return JSONResponse(
        content={"success": False, "message": "Connection timed out. Check broker address and port."},
        status_code=400
    )
def _read_and_encode_preview(cache_path: str) -> str:
    """Load the file at ``cache_path`` and return its bytes as base64 text.

    Reading and encoding are kept in one function so that both can be handed
    to an executor as a single unit of work.
    """
    with open(cache_path, 'rb') as preview_file:
        encoded = base64.b64encode(preview_file.read())
    return encoded.decode('utf-8')
@app.post("/preview_thr_batch")
async def preview_thr_batch(request: dict):
    """Return preview data for a batch of pattern files in one response.

    The request body must contain ``file_names``, a non-empty list of pattern
    file names. Each file is processed concurrently; the JSON response maps
    every requested name to either a dict with ``image_data`` (a base64
    ``data:image/webp`` URI), ``first_coordinate`` and ``last_coordinate``,
    or a dict with a single ``error`` message.

    Raises:
        HTTPException: 400 when ``file_names`` is missing/empty or not a list.
    """
    start = time.time()
    if not request.get("file_names"):
        logger.warning("Batch preview request received without filenames")
        raise HTTPException(status_code=400, detail="No file names provided")
    file_names = request["file_names"]
    if not isinstance(file_names, list):
        raise HTTPException(status_code=400, detail="file_names must be a list")
    headers = {
        "Cache-Control": "public, max-age=3600",  # Cache for 1 hour
        "Content-Type": "application/json"
    }

    async def process_single_file(file_name):
        """Process a single file and return its preview data.

        Returns a ``(file_name, payload)`` pair so the results can be turned
        into a dict keyed by the name exactly as the client sent it.
        """
        # Check in-memory cache first (for current and next playing patterns)
        normalized_for_cache = normalize_file_path(file_name)
        if state._current_preview and state._current_preview[0] == normalized_for_cache:
            logger.debug(f"Using cached preview for current: {file_name}")
            return file_name, state._current_preview[1]
        if state._next_preview and state._next_preview[0] == normalized_for_cache:
            logger.debug(f"Using cached preview for next: {file_name}")
            return file_name, state._next_preview[1]
        # Acquire semaphore to limit concurrent processing
        async with get_preview_semaphore():
            t1 = time.time()
            try:
                # Normalize file path for cross-platform compatibility
                normalized_file_name = normalize_file_path(file_name)
                pattern_file_path = os.path.join(pattern_manager.THETA_RHO_DIR, normalized_file_name)
                # Check file existence asynchronously
                exists = await asyncio.to_thread(os.path.exists, pattern_file_path)
                if not exists:
                    logger.warning(f"Pattern file not found: {pattern_file_path}")
                    return file_name, {"error": "Pattern file not found"}
                cache_path = get_cache_path(normalized_file_name)
                # Check cache existence asynchronously
                cache_exists = await asyncio.to_thread(os.path.exists, cache_path)
                if not cache_exists:
                    logger.info(f"Cache miss for {file_name}. Generating preview...")
                    success = await generate_image_preview(normalized_file_name)
                    # Re-check the disk: a reported success without a file is still a failure
                    cache_exists_after = await asyncio.to_thread(os.path.exists, cache_path)
                    if not success or not cache_exists_after:
                        logger.error(f"Failed to generate or find preview for {file_name}")
                        return file_name, {"error": "Failed to generate preview"}
                # Prefer stored metadata; parse the pattern file only when none is available
                metadata = get_pattern_metadata(normalized_file_name)
                if metadata:
                    first_coord_obj = metadata.get('first_coordinate')
                    last_coord_obj = metadata.get('last_coordinate')
                else:
                    logger.debug(f"Metadata cache miss for {file_name}, parsing file")
                    # Use thread pool to avoid memory pressure on resource-constrained devices
                    coordinates = await asyncio.to_thread(parse_theta_rho_file, pattern_file_path)
                    first_coord = coordinates[0] if coordinates else None
                    last_coord = coordinates[-1] if coordinates else None
                    first_coord_obj = {"x": first_coord[0], "y": first_coord[1]} if first_coord else None
                    last_coord_obj = {"x": last_coord[0], "y": last_coord[1]} if last_coord else None
                # Read image file and encode in executor to avoid blocking event loop
                loop = asyncio.get_running_loop()
                image_b64 = await loop.run_in_executor(None, _read_and_encode_preview, cache_path)
                result = {
                    "image_data": f"data:image/webp;base64,{image_b64}",
                    "first_coordinate": first_coord_obj,
                    "last_coordinate": last_coord_obj
                }
                # Cache preview for current/next pattern to speed up subsequent requests
                current_file = state.current_playing_file
                if current_file:
                    current_normalized = normalize_file_path(current_file)
                    if normalized_file_name == current_normalized:
                        state._current_preview = (normalized_file_name, result)
                        logger.debug(f"Cached preview for current: {file_name}")
                    elif state.current_playlist:
                        # Check if this is the next pattern in playlist
                        playlist = state.current_playlist
                        idx = state.current_playlist_index
                        if idx is not None and idx + 1 < len(playlist):
                            next_file = normalize_file_path(playlist[idx + 1])
                            if normalized_file_name == next_file:
                                state._next_preview = (normalized_file_name, result)
                                logger.debug(f"Cached preview for next: {file_name}")
                logger.debug(f"Processed {file_name} in {time.time() - t1:.2f}s")
                return file_name, result
            except Exception as e:
                # A failure on one file is reported in its own entry instead of failing the batch
                logger.error(f"Error processing {file_name}: {str(e)}")
                return file_name, {"error": str(e)}

    # Process all files concurrently
    tasks = [process_single_file(file_name) for file_name in file_names]
    file_results = await asyncio.gather(*tasks)
    # Convert results to dictionary
    results = dict(file_results)
    logger.debug(f"Total batch processing time: {time.time() - start:.2f}s for {len(file_names)} files")
    return JSONResponse(content=results, headers=headers)
@app.get("/playlists")
async def playlists_page(request: Request):
    """Answer the playlists page route with the shared redirect response."""
    redirect = get_redirect_response(request)
    return redirect
@app.get("/image2sand")
async def image2sand_page(request: Request):
    """Answer the image2sand page route with the shared redirect response."""
    redirect = get_redirect_response(request)
    return redirect
@app.get("/led")
async def led_control_page(request: Request):
    """Answer the LED control page route with the shared redirect response."""
    redirect = get_redirect_response(request)
    return redirect
- # DW LED control endpoints
@app.get("/api/dw_leds/status")
async def dw_leds_status():
    """Report the status of the DW LED controller.

    Returns a "not configured" payload when no controller is set or the
    active provider is not ``dw_leds``. Errors raised by the status check are
    logged and reported as a disconnected payload rather than propagated.
    """
    dw_leds_active = state.led_controller and state.led_provider == "dw_leds"
    if not dw_leds_active:
        return {"connected": False, "message": "DW LEDs not configured"}

    try:
        status = state.led_controller.check_status()
    except Exception as e:
        logger.error(f"Failed to check DW LED status: {str(e)}")
        return {"connected": False, "message": str(e)}
    return status
@app.post("/api/dw_leds/power")
async def dw_leds_power(request: dict):
    """Switch DW LED power: 0 turns off, 1 turns on, 2 toggles.

    ``state`` defaults to 1 when absent from the request body.

    Raises:
        HTTPException: 400 when DW LEDs are not configured or ``state`` is
            not 0, 1 or 2; 500 when the controller call fails.
    """
    dw_leds_active = state.led_controller and state.led_provider == "dw_leds"
    if not dw_leds_active:
        raise HTTPException(status_code=400, detail="DW LEDs not configured")

    requested = request.get("state", 1)
    if requested not in (0, 1, 2):
        raise HTTPException(status_code=400, detail="State must be 0 (off), 1 (on), or 2 (toggle)")

    try:
        outcome = state.led_controller.set_power(requested)
        # A manual power-on (or toggle) counts as activity; without this reset
        # an enabled idle timeout could switch the LEDs straight back off.
        counts_as_activity = requested in (1, 2)
        if counts_as_activity and state.dw_led_idle_timeout_enabled:
            state.dw_led_last_activity_time = time.time()
            logger.debug("LED activity time reset due to manual power on")
        return outcome
    except Exception as e:
        logger.error(f"Failed to set DW LED power: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/dw_leds/brightness")
async def dw_leds_brightness(request: dict):
    """Set DW LED brightness (0-100).

    ``value`` defaults to 50 when absent. The new brightness is persisted to
    state only when the controller result reports ``connected``.

    Raises:
        HTTPException: 400 when DW LEDs are not configured or ``value`` is
            not a number between 0 and 100; 500 when the controller call
            fails.
    """
    if not state.led_controller or state.led_provider != "dw_leds":
        raise HTTPException(status_code=400, detail="DW LEDs not configured")
    value = request.get("value", 50)
    # Check the type first: comparing a string/None/list against ints raises
    # TypeError, which would surface as a 500 instead of a client error.
    if not isinstance(value, (int, float)) or not 0 <= value <= 100:
        raise HTTPException(status_code=400, detail="Brightness must be between 0 and 100")
    try:
        controller = state.led_controller.get_controller()
        result = controller.set_brightness(value)
        # Update state if successful
        if result.get("connected"):
            state.dw_led_brightness = value
            state.save()
        return result
    except Exception as e:
        logger.error(f"Failed to set DW LED brightness: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/dw_leds/color")
async def dw_leds_color(request: dict):
    """Set a solid color; as a manual UI action it always powers the LEDs on.

    Two body shapes are accepted: ``{"r": 255, "g": 0, "b": 0}`` or
    ``{"color": [255, 0, 0]}``. When both are present, ``color`` wins.

    Raises:
        HTTPException: 400 when DW LEDs are not configured or no valid color
            is supplied; 500 when the controller call fails.
    """
    dw_leds_active = state.led_controller and state.led_provider == "dw_leds"
    if not dw_leds_active:
        raise HTTPException(status_code=400, detail="DW LEDs not configured")

    if "color" in request:
        rgb = request["color"]
        if not (isinstance(rgb, list) and len(rgb) == 3):
            raise HTTPException(status_code=400, detail="Color must be [R, G, B] array")
        r, g, b = rgb
    elif all(channel in request for channel in ("r", "g", "b")):
        r, g, b = request["r"], request["g"], request["b"]
    else:
        raise HTTPException(status_code=400, detail="Color must include r, g, b fields or color array")

    try:
        led = state.led_controller.get_controller()
        # Choosing a color by hand implies the user wants the LEDs lit
        led.set_power(1)
        # Manual interaction restarts the idle countdown (only when enabled)
        if state.dw_led_idle_timeout_enabled:
            state.dw_led_last_activity_time = time.time()
        return led.set_color(r, g, b)
    except Exception as e:
        logger.error(f"Failed to set DW LED color: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/dw_leds/colors")
async def dw_leds_colors(request: dict):
    """Set the effect colors; as a manual UI action it always powers the LEDs on.

    Any of ``color1``, ``color2`` and ``color3`` may be given, each as a
    three-element ``[R, G, B]`` list. Colors that are not supplied are passed
    to the controller as ``None``.

    Raises:
        HTTPException: 400 when DW LEDs are not configured, a supplied color
            is malformed, or no color is supplied; 500 when the controller
            call fails.
    """
    dw_leds_active = state.led_controller and state.led_provider == "dw_leds"
    if not dw_leds_active:
        raise HTTPException(status_code=400, detail="DW LEDs not configured")

    # Validate the slots in order so the first malformed one is the one reported
    parsed = {}
    for slot in ("color1", "color2", "color3"):
        if slot not in request:
            parsed[slot] = None
            continue
        raw = request[slot]
        if not (isinstance(raw, list) and len(raw) == 3):
            raise HTTPException(status_code=400, detail=f"{slot} must be [R, G, B] array")
        parsed[slot] = tuple(raw)

    if not any(parsed.values()):
        raise HTTPException(status_code=400, detail="Must provide at least one color")

    try:
        led = state.led_controller.get_controller()
        # Choosing colors by hand implies the user wants the LEDs lit
        led.set_power(1)
        # Manual interaction restarts the idle countdown (only when enabled)
        if state.dw_led_idle_timeout_enabled:
            state.dw_led_last_activity_time = time.time()
        return led.set_colors(**parsed)
    except Exception as e:
        logger.error(f"Failed to set DW LED colors: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/dw_leds/effects")
async def dw_leds_effects():
    """List the effects offered by the DW LED controller.

    Each effect is returned as a two-element ``[id, name]`` list.

    Raises:
        HTTPException: 400 when DW LEDs are not configured; 500 when the
            controller call fails.
    """
    dw_leds_active = state.led_controller and state.led_provider == "dw_leds"
    if not dw_leds_active:
        raise HTTPException(status_code=400, detail="DW LEDs not configured")

    try:
        led = state.led_controller.get_controller()
        # JSON has no tuple type, so emit each (id, name) pair as a list
        serializable = [[effect_id, label] for effect_id, label in led.get_effects()]
        return {"success": True, "effects": serializable}
    except Exception as e:
        logger.error(f"Failed to get DW LED effects: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/dw_leds/palettes")
async def dw_leds_palettes():
    """List the palettes offered by the DW LED controller.

    Each palette is returned as a two-element ``[id, name]`` list.

    Raises:
        HTTPException: 400 when DW LEDs are not configured; 500 when the
            controller call fails.
    """
    dw_leds_active = state.led_controller and state.led_provider == "dw_leds"
    if not dw_leds_active:
        raise HTTPException(status_code=400, detail="DW LEDs not configured")

    try:
        led = state.led_controller.get_controller()
        # JSON has no tuple type, so emit each (id, name) pair as a list
        serializable = [[palette_id, label] for palette_id, label in led.get_palettes()]
        return {"success": True, "palettes": serializable}
    except Exception as e:
        logger.error(f"Failed to get DW LED palettes: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/dw_leds/effect")
async def dw_leds_effect(request: dict):
    """Select an effect by ID; as a manual UI action it always powers the LEDs on.

    ``effect_id`` defaults to 0. ``speed`` and ``intensity`` are optional and
    are forwarded to the controller as ``None`` when absent.

    Raises:
        HTTPException: 400 when DW LEDs are not configured; 500 when the
            controller call fails.
    """
    dw_leds_active = state.led_controller and state.led_provider == "dw_leds"
    if not dw_leds_active:
        raise HTTPException(status_code=400, detail="DW LEDs not configured")

    chosen_effect = request.get("effect_id", 0)
    effect_options = {
        "speed": request.get("speed"),
        "intensity": request.get("intensity"),
    }
    try:
        led = state.led_controller.get_controller()
        # Choosing an effect by hand implies the user wants the LEDs lit
        led.set_power(1)
        # Manual interaction restarts the idle countdown (only when enabled)
        if state.dw_led_idle_timeout_enabled:
            state.dw_led_last_activity_time = time.time()
        return led.set_effect(chosen_effect, **effect_options)
    except Exception as e:
        logger.error(f"Failed to set DW LED effect: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/dw_leds/palette")
async def dw_leds_palette(request: dict):
    """Select a palette by ID; as a manual UI action it always powers the LEDs on.

    ``palette_id`` defaults to 0 when absent from the request body.

    Raises:
        HTTPException: 400 when DW LEDs are not configured; 500 when the
            controller call fails.
    """
    dw_leds_active = state.led_controller and state.led_provider == "dw_leds"
    if not dw_leds_active:
        raise HTTPException(status_code=400, detail="DW LEDs not configured")

    chosen_palette = request.get("palette_id", 0)
    try:
        led = state.led_controller.get_controller()
        # Choosing a palette by hand implies the user wants the LEDs lit
        led.set_power(1)
        # Manual interaction restarts the idle countdown (only when enabled)
        if state.dw_led_idle_timeout_enabled:
            state.dw_led_last_activity_time = time.time()
        return led.set_palette(chosen_palette)
    except Exception as e:
        logger.error(f"Failed to set DW LED palette: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/dw_leds/speed")
async def dw_leds_speed(request: dict):
    """Set effect speed (0-255).

    ``speed`` defaults to 128 when absent. The value is persisted to state
    after the controller call returns.

    Raises:
        HTTPException: 400 when DW LEDs are not configured or ``speed`` is
            not a number between 0 and 255; 500 when the controller call
            fails.
    """
    if not state.led_controller or state.led_provider != "dw_leds":
        raise HTTPException(status_code=400, detail="DW LEDs not configured")
    value = request.get("speed", 128)
    # Check the type first: comparing a string/None/list against ints raises
    # TypeError, which would surface as a 500 instead of a client error.
    if not isinstance(value, (int, float)) or not 0 <= value <= 255:
        raise HTTPException(status_code=400, detail="Speed must be between 0 and 255")
    try:
        controller = state.led_controller.get_controller()
        result = controller.set_speed(value)
        # Save speed to state
        state.dw_led_speed = value
        state.save()
        return result
    except Exception as e:
        logger.error(f"Failed to set DW LED speed: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/dw_leds/intensity")
async def dw_leds_intensity(request: dict):
    """Set effect intensity (0-255).

    ``intensity`` defaults to 128 when absent. The value is persisted to
    state after the controller call returns.

    Raises:
        HTTPException: 400 when DW LEDs are not configured or ``intensity``
            is not a number between 0 and 255; 500 when the controller call
            fails.
    """
    if not state.led_controller or state.led_provider != "dw_leds":
        raise HTTPException(status_code=400, detail="DW LEDs not configured")
    value = request.get("intensity", 128)
    # Check the type first: comparing a string/None/list against ints raises
    # TypeError, which would surface as a 500 instead of a client error.
    if not isinstance(value, (int, float)) or not 0 <= value <= 255:
        raise HTTPException(status_code=400, detail="Intensity must be between 0 and 255")
    try:
        controller = state.led_controller.get_controller()
        result = controller.set_intensity(value)
        # Save intensity to state
        state.dw_led_intensity = value
        state.save()
        return result
    except Exception as e:
        logger.error(f"Failed to set DW LED intensity: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
@app.post("/api/dw_leds/save_effect_settings")
async def dw_leds_save_effect_settings(request: dict):
    """Persist the supplied LED settings as the idle or the playing effect.

    ``type`` selects the slot (``'idle'`` or ``'playing'``). The saved
    settings are the request's ``effect_id``, ``palette_id``, ``speed``,
    ``intensity``, ``color1``, ``color2`` and ``color3``; missing keys are
    stored as ``None``.

    Raises:
        HTTPException: 400 when ``type`` is neither ``'idle'`` nor
            ``'playing'``.
    """
    effect_type = request.get("type")
    if effect_type not in ("idle", "playing"):
        raise HTTPException(status_code=400, detail="Invalid effect type. Must be 'idle' or 'playing'")

    setting_keys = (
        "effect_id",
        "palette_id",
        "speed",
        "intensity",
        "color1",
        "color2",
        "color3",
    )
    settings = {key: request.get(key) for key in setting_keys}

    if effect_type == "idle":
        state.dw_led_idle_effect = settings
    else:
        state.dw_led_playing_effect = settings
    state.save()

    logger.info(f"DW LED {effect_type} effect settings saved: {settings}")
    return {"success": True, "type": effect_type, "settings": settings}
@app.post("/api/dw_leds/clear_effect_settings")
async def dw_leds_clear_effect_settings(request: dict):
    """Reset the saved idle or playing effect settings to ``None``.

    ``type`` selects the slot (``'idle'`` or ``'playing'``).

    Raises:
        HTTPException: 400 when ``type`` is neither ``'idle'`` nor
            ``'playing'``.
    """
    effect_type = request.get("type")
    if effect_type not in ("idle", "playing"):
        raise HTTPException(status_code=400, detail="Invalid effect type. Must be 'idle' or 'playing'")

    if effect_type == "idle":
        state.dw_led_idle_effect = None
    else:
        state.dw_led_playing_effect = None
    state.save()

    logger.info(f"DW LED {effect_type} effect settings cleared")
    return {"success": True, "type": effect_type}
@app.get("/api/dw_leds/get_effect_settings")
async def dw_leds_get_effect_settings():
    """Return the saved idle and playing effect settings as held in state."""
    saved = {
        "idle_effect": state.dw_led_idle_effect,
        "playing_effect": state.dw_led_playing_effect,
    }
    return saved
@app.post("/api/dw_leds/idle_timeout")
async def dw_leds_set_idle_timeout(request: dict):
    """Configure LED idle timeout settings.

    ``enabled`` defaults to False and ``minutes`` to 30. Changing the
    settings also resets the last-activity timestamp to the current time.

    Raises:
        HTTPException: 400 when ``minutes`` is not a number between 1 and
            1440 (24 hours).
    """
    import time

    enabled = request.get("enabled", False)
    minutes = request.get("minutes", 30)
    # Validate minutes (between 1 and 1440 - 24 hours). Check the type first:
    # comparing a string/None/list against ints raises TypeError, which would
    # surface as a 500 instead of a client error.
    if not isinstance(minutes, (int, float)) or minutes < 1 or minutes > 1440:
        raise HTTPException(status_code=400, detail="Timeout must be between 1 and 1440 minutes")
    state.dw_led_idle_timeout_enabled = enabled
    state.dw_led_idle_timeout_minutes = minutes
    # Reset activity time when settings change
    state.dw_led_last_activity_time = time.time()
    state.save()
    logger.info(f"DW LED idle timeout configured: enabled={enabled}, minutes={minutes}")
    return {
        "success": True,
        "enabled": enabled,
        "minutes": minutes
    }
@app.get("/api/dw_leds/idle_timeout")
async def dw_leds_get_idle_timeout():
    """Return the LED idle timeout settings and the time left before it fires.

    ``remaining_minutes`` is ``None`` unless the timeout is enabled and a
    last-activity timestamp is recorded; otherwise it is the remaining time
    in minutes, floored at zero and rounded to one decimal place.
    """
    import time

    timeout_enabled = state.dw_led_idle_timeout_enabled
    last_activity = state.dw_led_last_activity_time

    remaining_minutes = None
    if timeout_enabled and last_activity:
        idle_for = time.time() - last_activity
        budget = state.dw_led_idle_timeout_minutes * 60
        seconds_left = max(0, budget - idle_for)
        remaining_minutes = round(seconds_left / 60, 1)

    return {
        "enabled": state.dw_led_idle_timeout_enabled,
        "minutes": state.dw_led_idle_timeout_minutes,
        "remaining_minutes": remaining_minutes,
    }
@app.get("/table_control")
async def table_control_page(request: Request):
    """Answer the table control page route with the shared redirect response."""
    redirect = get_redirect_response(request)
    return redirect
@app.get("/cache-progress")
async def get_cache_progress_endpoint():
    """Return the current cache generation progress from the cache manager."""
    # Imported at call time, as in the rest of this module's cache endpoints
    from modules.core import cache_manager
    return cache_manager.get_cache_progress()
@app.post("/rebuild_cache")
async def rebuild_cache_endpoint():
    """Rebuild the pattern cache and report completion.

    The response is sent only after the rebuild has finished.

    Raises:
        HTTPException: 500 when importing or running the rebuild fails.
    """
    try:
        # Kept inside the try so an import failure is also reported as a 500
        from modules.core.cache_manager import rebuild_cache
        await rebuild_cache()
    except Exception as e:
        logger.error(f"Failed to rebuild cache: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e))
    return {"success": True, "message": "Cache rebuild completed successfully"}
def signal_handler(signum, frame):
    """Handle shutdown signals gracefully.

    Turns the LEDs off, stops the motion controller, records a stop request
    in the saved state and then terminates the process. Cleanup errors are
    logged but never prevent the exit.

    Args:
        signum: Signal number passed by the signal machinery (unused).
        frame: Current stack frame passed by the signal machinery (unused).
    """
    logger.info("Received shutdown signal, cleaning up...")
    try:
        # Turn off all LEDs on shutdown
        if state.led_controller:
            state.led_controller.set_power(0)
        # Stop pattern manager motion controller
        pattern_manager.motion_controller.stop()
        # Set stop flags to halt any running patterns
        state.stop_requested = True
        state.pause_requested = False
        state.save()
        logger.info("Cleanup completed")
    except Exception as e:
        logger.error(f"Error during cleanup: {str(e)}")
    finally:
        # Runs whether or not cleanup succeeded, so the process always exits
        logger.info("Exiting application...")
        # Use os._exit after cleanup is complete to avoid async stack tracebacks
        # This is safe because we've already: shut down process pool, stopped motion controller, saved state
        os._exit(0)
@app.get("/api/version")
async def get_version_info(force_refresh: bool = False):
    """Get current and latest version information.

    Args:
        force_refresh: If true, bypass cache and fetch fresh data from GitHub

    Returns:
        A JSON response with the version manager's data. If that lookup
        fails, a 200 response is still returned, reporting the current
        version as the latest one, ``update_available`` False and an
        ``error`` message.
    """
    try:
        version_info = await version_manager.get_version_info(force_refresh=force_refresh)
        return JSONResponse(content=version_info)
    except Exception as e:
        logger.error(f"Error getting version info: {e}")
        # Look the installed version up once and report it for both fields,
        # rather than awaiting the same lookup twice.
        current_version = await version_manager.get_current_version()
        return JSONResponse(
            content={
                "current": current_version,
                "latest": current_version,
                "update_available": False,
                "error": "Unable to check for updates"
            },
            status_code=200
        )
@app.post("/api/update")
async def trigger_update():
    """Start a software update: pull the latest Docker images and recreate containers.

    The JSON body always carries ``success`` and ``message``; a failed update
    additionally carries ``errors``. An unexpected exception is reported with
    status 500.
    """
    try:
        logger.info("Update triggered via API")
        success, error_message, error_log = update_manager.update_software()
        if not success:
            failure = {
                "success": False,
                "message": error_message or "Update failed",
                "errors": error_log,
            }
            return JSONResponse(content=failure)
        started = {
            "success": True,
            "message": "Update started. Containers are being recreated with the latest images. The page will reload shortly.",
        }
        return JSONResponse(content=started)
    except Exception as e:
        logger.error(f"Error triggering update: {e}")
        return JSONResponse(
            content={"success": False, "message": f"Failed to trigger update: {str(e)}"},
            status_code=500,
        )
@app.post("/api/system/shutdown")
async def shutdown_system():
    """Power off the host shortly after acknowledging the request.

    ``systemctl poweroff`` is run from a background thread after a two-second
    delay so this response can be sent first. Failures of that command are
    only logged; a failure to start the thread yields a 500 response.
    """
    try:
        logger.warning("Shutdown initiated via API")
        import threading

        def _poweroff_after_delay():
            # Leave time for the HTTP response to reach the client
            time.sleep(2)
            try:
                # systemctl reaches the host through the mounted systemd socket
                subprocess.run(["systemctl", "poweroff"], check=True)
                logger.info("Host shutdown command executed successfully via systemctl")
            except FileNotFoundError:
                logger.error("systemctl command not found - ensure systemd volumes are mounted")
            except Exception as e:
                logger.error(f"Error executing host shutdown command: {e}")

        threading.Thread(target=_poweroff_after_delay).start()
        return {"success": True, "message": "System shutdown initiated"}
    except Exception as e:
        logger.error(f"Error initiating shutdown: {e}")
        return JSONResponse(
            content={"success": False, "message": str(e)},
            status_code=500,
        )
@app.post("/api/system/restart")
async def restart_system():
    """Restart the backend Docker container shortly after acknowledging the request.

    ``docker restart dune-weaver-backend`` is run from a background thread
    after a two-second delay so this response can be sent first. Failures of
    that command are only logged; a failure to start the thread yields a 500
    response.
    """
    try:
        logger.warning("Restart initiated via API")
        import threading

        def _restart_after_delay():
            # Leave time for the HTTP response to reach the client
            time.sleep(2)
            try:
                # Restarting by container name avoids needing the compose file path
                subprocess.run(["docker", "restart", "dune-weaver-backend"], check=True)
                logger.info("Docker restart command executed successfully")
            except FileNotFoundError:
                logger.error("docker command not found")
            except Exception as e:
                logger.error(f"Error executing docker restart: {e}")

        threading.Thread(target=_restart_after_delay).start()
        return {"success": True, "message": "System restart initiated"}
    except Exception as e:
        logger.error(f"Error initiating restart: {e}")
        return JSONResponse(
            content={"success": False, "message": str(e)},
            status_code=500,
        )
def entrypoint():
    """Run the FastAPI app under uvicorn on all interfaces, port 8080."""
    import uvicorn

    logger.info("Starting FastAPI server on port 8080...")
    # A single worker avoids installing multiple signal handlers
    server_options = {"host": "0.0.0.0", "port": 8080, "workers": 1}
    uvicorn.run(app, **server_options)
# Start the server only when this file is executed as a script, not on import.
if __name__ == "__main__":
    entrypoint()
|