""" Mission Control - eine schlanke Steuerzentrale fuer einen lokalen llama-swap Stack. Was sie macht: - zeigt konfigurierte + laufende Modelle und ihre Ports (liest llama-swap /running + /v1/models) - laedt neue GGUF-Modelle von HuggingFace (hf download, als Hintergrund-Job) - pflegt ein heruntergeladenes Modell automatisch in die llama-swap config.yaml ein - stoesst Updates an (Container / Toolbox refresh) - laedt Modelle aus dem Speicher (unload) und hat einen kleinen Chat-Test Bewusst KISS: ein File, In-Memory Jobs, keine Datenbank. """ import os import shlex import subprocess import threading import time import uuid from pathlib import Path import httpx from fastapi import Depends, FastAPI, Header, HTTPException from fastapi.responses import FileResponse, JSONResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel from ruamel.yaml import YAML from ruamel.yaml.scalarstring import LiteralScalarString # --------------------------------------------------------------------------- # Konfiguration (alles ueber Umgebungsvariablen ueberschreibbar) # --------------------------------------------------------------------------- LLAMA_SWAP_URL = os.environ.get("MC_LLAMA_SWAP_URL", "http://127.0.0.1:8080").rstrip("/") CONFIG_PATH = Path(os.environ.get("MC_CONFIG_PATH", "/etc/llama-swap/config.yaml")) MODELS_DIR = Path(os.environ.get("MC_MODELS_DIR", "/srv/models")) # Befehl, der zum Starten eines Modells in die config.yaml geschrieben wird. # {model} = Pfad zur GGUF-Datei, {ctx} = Kontextlaenge, ${PORT} bleibt fuer llama-swap stehen. # WICHTIG: an deinen Container-/llama-server-Aufruf anpassen (siehe README). CMD_TEMPLATE = os.environ.get( "MC_CMD_TEMPLATE", "llama-server -m {model} --host 127.0.0.1 --port ${PORT} " "-c {ctx} -ngl 999 -fa 1 --no-mmap", ) # Befehl fuer "Container/Toolbox aktualisieren". Standard: kyuz0 refresh-Skript. 
UPDATE_CMD = os.environ.get("MC_UPDATE_CMD", "")
DEFAULT_TTL = int(os.environ.get("MC_DEFAULT_TTL", "300"))
TOKEN = os.environ.get("MC_TOKEN", "")  # empty = no auth (LAN only!)

yaml = YAML()
yaml.preserve_quotes = True

app = FastAPI(title="Mission Control")

# ---------------------------------------------------------------------------
# Mini job system (background processes with live log)
# ---------------------------------------------------------------------------
# Job records keyed by job id; kept in memory only.
JOBS: dict[str, dict] = {}
# Maximum number of log lines retained per job.
_LOG_CAP = 400


def _run_job(job_id: str, args: list[str], env: dict | None = None):
    """Run ``args`` as a subprocess and stream its output into the job's log.

    Intended as a thread target (see ``start_job``). Updates the record in
    ``JOBS[job_id]`` in place: ``state`` becomes ``"running"`` and finally
    ``"done"`` (exit code 0) or ``"failed"``; ``returncode`` and
    ``finished_at`` are filled in on completion.

    Args:
        job_id: Key of an existing record in ``JOBS``.
        args: Command and arguments, executed without a shell.
        env: Extra environment variables layered over ``os.environ``.
    """
    job = JOBS[job_id]
    log = job["log"]
    job["state"] = "running"
    try:
        # The context manager closes the stdout pipe and waits for the child
        # even if reading the output raises, so no pipe or zombie is leaked.
        with subprocess.Popen(
            args,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            # Undecodable bytes must not abort the whole job.
            encoding="utf-8",
            errors="replace",
            bufsize=1,
            env={**os.environ, **(env or {})},
        ) as proc:
            for line in proc.stdout:  # type: ignore[union-attr]
                log.append(line.rstrip("\n"))
                if len(log) > _LOG_CAP:
                    # Drop the oldest lines so the log stays bounded.
                    del log[: len(log) - _LOG_CAP]
        returncode = proc.returncode
        state = "done" if returncode == 0 else "failed"
    except Exception as exc:  # noqa: BLE001
        log.append(f"[mission-control] Fehler: {exc}")
        returncode = -1
        state = "failed"
    # Publish the terminal state last so a poller that sees "done"/"failed"
    # also sees the return code and the finish time.
    job["returncode"] = returncode
    job["finished_at"] = time.time()
    job["state"] = state


def start_job(args: list[str], label: str, env: dict | None = None) -> str:
    """Register a new job and run it in a daemon thread.

    Args:
        args: Command and arguments to execute.
        label: Human-readable description stored with the job.
        env: Extra environment variables for the subprocess.

    Returns:
        The 12-character hexadecimal job id.
    """
    job_id = uuid.uuid4().hex[:12]
    JOBS[job_id] = {
        "id": job_id,
        "label": label,
        "state": "queued",
        # First log line echoes the shell-quoted command.
        "log": [f"$ {' '.join(shlex.quote(a) for a in args)}"],
        "returncode": None,
        "started_at": time.time(),
        "finished_at": None,
    }
    threading.Thread(target=_run_job, args=(job_id, args, env), daemon=True).start()
    return job_id


# ---------------------------------------------------------------------------
# Auth (optional)
# ---------------------------------------------------------------------------
def auth(x_mc_token: str = Header(default="")):
    """FastAPI dependency that checks the ``X-MC-Token`` header against ``TOKEN``.

    Does nothing when ``TOKEN`` is empty (authentication disabled).

    Raises:
        HTTPException: 401 if a token is configured and the header differs.
    """
    if not TOKEN:
        return
    import hmac  # local import: keeps this block independent of the file header

    # Constant-time comparison avoids leaking the token through timing;
    # bytes are used because compare_digest rejects non-ASCII str.
    if not hmac.compare_digest(x_mc_token.encode("utf-8"), TOKEN.encode("utf-8")):
        raise HTTPException(status_code=401, detail="Falsches oder fehlendes Token.")
# ---------------------------------------------------------------------------
# llama-swap helpers
# ---------------------------------------------------------------------------
def _swap_get(path: str):
    """GET ``path`` from llama-swap and return the decoded JSON body.

    Raises:
        httpx.HTTPError: on connection problems or a non-success status.
    """
    with httpx.Client(timeout=5.0) as c:
        r = c.get(f"{LLAMA_SWAP_URL}{path}")
        r.raise_for_status()
        return r.json()


def read_config() -> dict:
    """Load the llama-swap config, guaranteeing a ``models`` mapping.

    Returns ``{"models": {}}`` when the config file does not exist.
    """
    if not CONFIG_PATH.exists():
        return {"models": {}}
    with CONFIG_PATH.open("r", encoding="utf-8") as f:
        data = yaml.load(f) or {}
    if "models" not in data or data["models"] is None:
        data["models"] = {}
    return data


def _ensure_inside(base: Path, candidate: Path) -> None:
    """Raise HTTP 400 unless ``candidate`` resolves to ``base`` or a path below it."""
    root = base.resolve()
    resolved = candidate.resolve()
    if resolved != root and root not in resolved.parents:
        raise HTTPException(400, f"Pfad liegt ausserhalb von {base}: {candidate}")


# ---------------------------------------------------------------------------
# Request models
# ---------------------------------------------------------------------------
class DownloadReq(BaseModel):
    """Body of ``POST /api/download``."""

    repo: str
    file: str
    subdir: str | None = None


class RegisterReq(BaseModel):
    """Body of ``POST /api/register``."""

    alias: str
    model_path: str
    ctx: int = 8192
    ttl: int | None = None


class ChatReq(BaseModel):
    """Body of ``POST /api/chat``."""

    model: str
    message: str


# ---------------------------------------------------------------------------
# API
# ---------------------------------------------------------------------------
@app.get("/api/status", dependencies=[Depends(auth)])
def status():
    """Return configured models merged with what llama-swap reports as running."""
    cfg = read_config()
    configured = {}
    for name, spec in (cfg.get("models") or {}).items():
        spec = spec or {}
        configured[name] = {
            "name": name,
            "ttl": spec.get("ttl", cfg.get("globalTTL", 0)),
            "cmd": str(spec.get("cmd", "")).strip(),
            "state": "idle",
            "port": None,
        }

    swap_ok = True
    try:
        running = _swap_get("/running")
        # Accept both {"running": [...]} and a bare list.
        items = running.get("running", running) if isinstance(running, dict) else running
        for item in items or []:
            mid = item.get("model") or item.get("id") or item.get("name")
            if mid in configured:
                configured[mid]["state"] = item.get("state", "running")
                configured[mid]["port"] = item.get("port")
            elif mid:
                # Running but absent from config.yaml: still list it.
                configured[mid] = {
                    "name": mid,
                    "ttl": None,
                    "cmd": "",
                    "state": item.get("state", "running"),
                    "port": item.get("port"),
                }
    except Exception:  # noqa: BLE001
        # Best effort: an unreachable or unexpected llama-swap response is
        # reported through swap_ok instead of failing the whole endpoint.
        swap_ok = False

    return {
        "swap_ok": swap_ok,
        "swap_url": LLAMA_SWAP_URL,
        "config_path": str(CONFIG_PATH),
        "models_dir": str(MODELS_DIR),
        "models": list(configured.values()),
    }


@app.post("/api/download", dependencies=[Depends(auth)])
def download(req: DownloadReq):
    """Start a background ``hf download`` of ``req.file`` from ``req.repo``.

    The file is stored below ``MODELS_DIR`` in ``req.subdir`` (default: last
    component of the repo name).

    Raises:
        HTTPException: 400 if repo/file are empty or look like CLI options,
            or if the target would end up outside ``MODELS_DIR``.
    """
    for value in (req.repo, req.file):
        # A leading "-" would be parsed by the hf CLI as an option.
        if not value or value.startswith("-"):
            raise HTTPException(400, f"Ungueltiger Wert: {value!r}")

    sub = req.subdir or req.repo.split("/")[-1]
    target = MODELS_DIR / sub
    result_path = target / req.file
    # Reject "..", absolute paths etc. before anything is created on disk.
    _ensure_inside(MODELS_DIR, target)
    _ensure_inside(target, result_path)

    target.mkdir(parents=True, exist_ok=True)
    args = ["hf", "download", req.repo, req.file, "--local-dir", str(target)]
    job_id = start_job(
        args,
        f"download {req.repo}/{req.file}",
        env={"HF_XET_HIGH_PERFORMANCE": "1"},
    )
    JOBS[job_id]["result_path"] = str(result_path)
    return {"job_id": job_id, "expected_path": str(result_path)}


@app.post("/api/register", dependencies=[Depends(auth)])
def register(req: RegisterReq):
    """Add or overwrite the model entry ``req.alias`` in the llama-swap config.

    Raises:
        HTTPException: 404 if ``req.model_path`` is not an existing file,
            400 if the alias is blank.
    """
    import io  # local import: keeps this block independent of the file header

    if not Path(req.model_path).is_file():
        raise HTTPException(404, f"Datei nicht gefunden: {req.model_path}")
    if not req.alias.strip():
        raise HTTPException(400, "Alias darf nicht leer sein.")

    cfg = read_config()
    # Substitute {ctx} first so a model path that happens to contain the
    # text "{ctx}" is not rewritten.
    cmd = CMD_TEMPLATE.replace("{ctx}", str(req.ctx)).replace("{model}", req.model_path)
    cfg["models"][req.alias] = {
        "cmd": LiteralScalarString(cmd + "\n"),
        "ttl": req.ttl if req.ttl is not None else DEFAULT_TTL,
    }

    # Serialize in memory first: if dumping fails, the existing config file
    # has not been truncated yet.
    buf = io.StringIO()
    yaml.dump(cfg, buf)
    CONFIG_PATH.parent.mkdir(parents=True, exist_ok=True)
    with CONFIG_PATH.open("w", encoding="utf-8") as f:
        f.write(buf.getvalue())
    return {
        "ok": True,
        "alias": req.alias,
        "note": "In config.yaml geschrieben. llama-swap mit -watch-config laedt automatisch neu.",
    }


@app.post("/api/unload", dependencies=[Depends(auth)])
def unload(model: str | None = None):
    """Ask llama-swap to unload ``model``, or all models when none is given.

    Raises:
        HTTPException: 502 if llama-swap cannot be reached.
    """
    from urllib.parse import quote  # local import, see register()

    # Percent-encode so characters such as "?" or "#" in the name cannot
    # change the request URL; "/" is kept (default safe set).
    path = f"/api/models/unload/{quote(model)}" if model else "/api/models/unload"
    try:
        with httpx.Client(timeout=10.0) as c:
            r = c.post(f"{LLAMA_SWAP_URL}{path}")
        return {"ok": r.status_code < 400, "status": r.status_code}
    except Exception as exc:  # noqa: BLE001
        raise HTTPException(502, f"llama-swap nicht erreichbar: {exc}") from exc


@app.post("/api/update", dependencies=[Depends(auth)])
def update():
    """Run ``UPDATE_CMD`` as a background job.

    Raises:
        HTTPException: 400 if no update command is configured.
    """
    if not UPDATE_CMD:
        raise HTTPException(400, "Kein Update-Befehl gesetzt (MC_UPDATE_CMD).")
    job_id = start_job(shlex.split(UPDATE_CMD), "update containers")
    return {"job_id": job_id}


@app.post("/api/chat", dependencies=[Depends(auth)])
def chat(req: ChatReq):
    """Send a single user message to ``req.model`` and return the reply text.

    Raises:
        HTTPException: 502 if the request fails or the response has an
            unexpected shape.
    """
    payload = {"model": req.model, "messages": [{"role": "user", "content": req.message}]}
    try:
        with httpx.Client(timeout=120.0) as c:
            r = c.post(f"{LLAMA_SWAP_URL}/v1/chat/completions", json=payload)
            r.raise_for_status()
            data = r.json()
        return {"reply": data["choices"][0]["message"]["content"]}
    except Exception as exc:  # noqa: BLE001
        raise HTTPException(502, f"Anfrage fehlgeschlagen: {exc}") from exc


@app.get("/api/jobs/{job_id}", dependencies=[Depends(auth)])
def job_status(job_id: str):
    """Return the record of one job.

    Raises:
        HTTPException: 404 if the job id is unknown.
    """
    job = JOBS.get(job_id)
    if not job:
        raise HTTPException(404, "Job nicht gefunden.")
    return job


@app.get("/api/jobs", dependencies=[Depends(auth)])
def jobs_list():
    """Return the 20 most recently started jobs, newest first."""
    return sorted(JOBS.values(), key=lambda j: j["started_at"], reverse=True)[:20]


# ---------------------------------------------------------------------------
# Static UI
# ---------------------------------------------------------------------------
@app.get("/")
def index():
    """Serve the single-page UI."""
    return FileResponse(Path(__file__).parent / "static" / "index.html")


app.mount("/static", StaticFiles(directory=Path(__file__).parent / "static"), name="static")


@app.exception_handler(HTTPException)
def _http_exc(_req, exc: HTTPException):
    """Render HTTPException as ``{"error": detail}``, keeping its headers."""
    return JSONResponse(
        status_code=exc.status_code,
        content={"error": exc.detail},
        headers=exc.headers,
    )