""" Mini Job-System: Hintergrund-Prozesse mit Live-Log. Bewusst KISS: ein In-Memory-Dict, ein Daemon-Thread je Job, Subprocess mit zeilenweisem Log-Capture. Keine Persistenz, kein Broker. Genutzt von allen Routern, die laenger laufende Shell-Befehle anstossen (Download, Update, ...). """ import os import shlex import subprocess import threading import time import uuid JOBS: dict[str, dict] = {} _LOG_CAP = 400 def _run_job(job_id: str, args: list[str], env: dict | None = None): job = JOBS[job_id] job["state"] = "running" try: proc = subprocess.Popen( args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, bufsize=1, env={**os.environ, **(env or {})}, ) for line in proc.stdout: # type: ignore[union-attr] job["log"].append(line.rstrip("\n")) if len(job["log"]) > _LOG_CAP: del job["log"][0] proc.wait() job["returncode"] = proc.returncode job["state"] = "done" if proc.returncode == 0 else "failed" except Exception as exc: # noqa: BLE001 job["log"].append(f"[mission-control] Fehler: {exc}") job["state"] = "failed" job["returncode"] = -1 job["finished_at"] = time.time() def start_job(args: list[str], label: str, env: dict | None = None) -> str: job_id = uuid.uuid4().hex[:12] JOBS[job_id] = { "id": job_id, "label": label, "state": "queued", "log": [f"$ {' '.join(shlex.quote(a) for a in args)}"], "returncode": None, "started_at": time.time(), "finished_at": None, } threading.Thread(target=_run_job, args=(job_id, args, env), daemon=True).start() return job_id