""" System-Router: Liefert Live-Auslastung (CPU, RAM, Disk, GPU, Temp). Greift lokal auf psutil und sysfs zu. """ from pathlib import Path import asyncio import psutil from fastapi import APIRouter, Depends, WebSocket, WebSocketDisconnect from auth import auth router = APIRouter(prefix="/api/system", dependencies=[Depends(auth)]) def _read_sysfs_int(path: Path) -> int | None: try: if path.exists(): return int(path.read_text().strip()) except Exception: pass return None def _get_gpu_mem(): # Suche in /sys/class/drm/ nach card*, die mem_info_* hat drm_dir = Path("/sys/class/drm") if not drm_dir.exists(): return None for card in drm_dir.glob("card*"): dev_dir = card / "device" vram_total = _read_sysfs_int(dev_dir / "mem_info_vram_total") if vram_total is not None: return { "vram": { "used": _read_sysfs_int(dev_dir / "mem_info_vram_used") or 0, "total": vram_total, }, "gtt": { "used": _read_sysfs_int(dev_dir / "mem_info_gtt_used") or 0, "total": _read_sysfs_int(dev_dir / "mem_info_gtt_total") or 0, } } return None def _get_temperatures(): temps = {"cpu": None, "gpu": None} hwmon_dir = Path("/sys/class/hwmon") if not hwmon_dir.exists(): return temps for hw in hwmon_dir.glob("hwmon*"): try: name = (hw / "name").read_text().strip() temp_val = _read_sysfs_int(hw / "temp1_input") if temp_val is not None: if name == "k10temp": temps["cpu"] = temp_val / 1000.0 elif name == "amdgpu": temps["gpu"] = temp_val / 1000.0 except Exception: continue return temps @router.get("/status") def system_status(): temps = _get_temperatures() cpu_percent = psutil.cpu_percent(interval=None) ram = psutil.virtual_memory() disk = psutil.disk_usage("/") return { "cpu": { "percent": cpu_percent, "temp": temps["cpu"] }, "ram": { "used": ram.used, "total": ram.total, "percent": ram.percent }, "disk": { "used": disk.used, "total": disk.total, "percent": disk.percent }, "gpu": _get_gpu_mem() or { "vram": {"used": 0, "total": 0}, "gtt": {"used": 0, "total": 0} }, "gpu_temp": temps["gpu"] } 
@router.websocket("/stream")
async def system_stream(websocket: WebSocket):
    """Stream status snapshots over the websocket until the client leaves.

    After accepting the connection, the handler sends the result of
    ``system_status()`` as JSON and then sleeps for 0.5 seconds, repeating
    until a ``WebSocketDisconnect`` is raised.
    """
    push_interval = 0.5  # seconds between two snapshots

    await websocket.accept()
    try:
        while True:
            await websocket.send_json(system_status())
            await asyncio.sleep(push_interval)
    except WebSocketDisconnect:
        # The peer closed the connection; nothing left to do.
        return