"""Utilidades de control de runtime del servidor MP Manager. Centraliza la lógica de: * leer/validar `generated/runtime/server_info.json`, * confirmar que un PID es realmente MP Manager (y no otro proyecto Python), * detectar quién ocupa un puerto, * apagar el servidor de forma segura (sin matar procesos ajenos), * esperar a que el server termine de arrancar (server_info.json escrito). Pensado para que los launchers (Windows: `start.bat`/`stop.bat`/`restart.bat`; macOS/Linux: `start.command`/`stop.command`/`restart.command`) lo invoquen como CLI sin tener que hacer parsing de JSON o validaciones complejas en shell. La lógica de OS (puertos, PIDs, kill) es cross-platform: ver `IS_WINDOWS`. Uso CLI: python runtime_control.py status # imprime estado actual python runtime_control.py stop # apaga MP Manager (sin tocar otros) python runtime_control.py wait-ready # espera a que server_info.json aparezca, imprime el puerto python runtime_control.py port-owner P # imprime quién ocupa el puerto P """ from __future__ import annotations import json import os import signal import socket import subprocess import sys import time from typing import Optional import paths # Indicador inequívoco de que un proceso es MP Manager (vs Transcriptor u otro # proyecto Python). Lo buscamos en la línea de comando del proceso. PROJECT_MARKER = os.path.normcase(paths.BASE_DIR) # Las primitivas de OS (descubrir quién ocupa un puerto, leer la línea de comando # de un PID, matarlo) difieren entre Windows y POSIX (macOS/Linux). El resto del # módulo es agnóstico: dispatcha según esta bandera. IS_WINDOWS = os.name == "nt" def is_port_in_use(port: int, host: str = "127.0.0.1") -> bool: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: try: s.bind((host, port)) return False except OSError: return True def get_pid_listening_on(port: int) -> Optional[int]: """Devuelve el PID que escucha en `port` (LISTENING), o None si nadie.""" if IS_WINDOWS: return _get_pid_listening_on_windows(port) return _get_pid_listening_on_posix(port) def _get_pid_listening_on_windows(port: int) -> Optional[int]: try: out = subprocess.check_output( ["netstat", "-ano"], text=True, stderr=subprocess.DEVNULL ) except Exception: return None needle = f":{port}" for line in out.splitlines(): if needle in line and "LISTENING" in line: parts = line.split() if parts and parts[-1].isdigit(): # Asegurar que el match es del puerto exacto (no de un sufijo) local = parts[1] if len(parts) >= 2 else "" if local.endswith(needle): return int(parts[-1]) return None def _get_pid_listening_on_posix(port: int) -> Optional[int]: """macOS/Linux: `lsof` lista el/los PID que escuchan en el puerto. `-sTCP:LISTEN` acota a sockets en estado LISTEN, `-t` devuelve solo PIDs. Si hay varios (p.ej. el reloader de uvicorn y su worker comparten socket), devolvemos el primero; `_kill_tree` se encarga del árbol completo. """ try: out = subprocess.check_output( ["lsof", "-nP", f"-iTCP:{port}", "-sTCP:LISTEN", "-t"], text=True, stderr=subprocess.DEVNULL, ) except Exception: return None for line in out.split(): if line.strip().isdigit(): return int(line.strip()) return None def _run_powershell(ps_command: str) -> Optional[str]: """Ejecuta un comando PowerShell y devuelve stdout (stripped) o None si falla. Usamos PowerShell porque `wmic` ya no viene en Windows 11. `Get-CimInstance` es el reemplazo oficial y siempre está disponible. """ try: out = subprocess.check_output( ["powershell", "-NoProfile", "-NonInteractive", "-Command", ps_command], text=True, stderr=subprocess.DEVNULL, encoding="utf-8", errors="replace", ) except Exception: return None return out.strip() or None def get_process_cmdline(pid: int) -> Optional[str]: """Devuelve la línea de comando completa de un PID, o None si no existe.""" if IS_WINDOWS: return _run_powershell( f"(Get-CimInstance Win32_Process -Filter 'ProcessId={pid}' -ErrorAction SilentlyContinue).CommandLine" ) # POSIX (macOS/Linux): `ps -o command=` imprime la línea de comando completa # sin encabezado. En macOS el límite por defecto es amplio; suficiente para # contener la ruta absoluta a main.py que es lo que buscamos. try: out = subprocess.check_output( ["ps", "-p", str(pid), "-o", "command="], text=True, stderr=subprocess.DEVNULL, ) except Exception: return None return out.strip() or None def pid_is_alive(pid: int) -> bool: """True si el PID existe.""" if IS_WINDOWS: result = _run_powershell( f"if (Get-Process -Id {pid} -ErrorAction SilentlyContinue) {{ 'yes' }} else {{ 'no' }}" ) return result == "yes" # POSIX: señal 0 no mata; solo verifica existencia/permisos del proceso. try: os.kill(pid, 0) return True except ProcessLookupError: return False except PermissionError: # Existe pero es de otro usuario; para nuestros fines, está vivo. return True except OSError: return False def pid_belongs_to_mp_manager(pid: int) -> bool: """True si el PID es ciertamente MP Manager. Conservador: ante la duda, False. Fuente primaria (más fuerte): `generated/runtime/server_info.json` del proyecto registró este PID. Como ese archivo SOLO lo escribe `main.py` de este proyecto, el match es prueba inequívoca de identidad. Fuente secundaria (cmdline): si los launchers lanzan `python "/main.py"`, el path del proyecto queda embebido en la línea de comando del proceso y podemos reconocerlo aunque el server_info.json esté ausente o stale. """ info = read_server_info() if info and info.get("pid") == pid: return True cmd = get_process_cmdline(pid) if not cmd: return False norm = os.path.normcase(cmd) if PROJECT_MARKER in norm: return True return False def get_process_cwd(pid: int) -> Optional[str]: """Best-effort: obtener el directorio del ejecutable del proceso. Win32_Process no expone el cwd real; el ExecutablePath sirve como pista cuando el ejecutable vive dentro del proyecto. Para nuestro caso, lo más informativo es la línea de comando completa, así que esta función es secundaria. """ if IS_WINDOWS: out = _run_powershell( f"(Get-CimInstance Win32_Process -Filter 'ProcessId={pid}' -ErrorAction SilentlyContinue).ExecutablePath" ) if out: return os.path.dirname(out) return None # POSIX: no hay un cwd fiable sin /proc (Linux) y macOS no lo expone vía ps. # La línea de comando completa es la pista útil para nosotros, así que esta # función queda como best-effort y devuelve None. return None def read_server_info() -> Optional[dict]: if not os.path.exists(paths.SERVER_INFO): return None try: # utf-8-sig tolera BOM por si alguien escribe el archivo con PowerShell # u otra herramienta que añada  al inicio. with open(paths.SERVER_INFO, "r", encoding="utf-8-sig") as f: return json.load(f) except Exception: return None def clear_stale_server_info() -> bool: """Si server_info.json apunta a un PID muerto, lo borra. True si limpió.""" info = read_server_info() if not info: return False pid = info.get("pid") if pid and pid_is_alive(int(pid)): return False try: os.remove(paths.SERVER_INFO) return True except OSError: return False def status() -> dict: """Estado actual: ¿hay MP Manager corriendo? ¿quién ocupa el puerto?""" info = read_server_info() result = { "registered": info, "registered_alive": False, "registered_is_ours": False, "port_8000_pid": None, "port_8000_is_ours": False, "port_8000_cmdline": None, } if info and info.get("pid"): pid = int(info["pid"]) if pid_is_alive(pid): result["registered_alive"] = True result["registered_is_ours"] = pid_belongs_to_mp_manager(pid) port_pid = get_pid_listening_on(8000) if port_pid: result["port_8000_pid"] = port_pid result["port_8000_is_ours"] = pid_belongs_to_mp_manager(port_pid) result["port_8000_cmdline"] = get_process_cmdline(port_pid) return result def stop_server(force: bool = False) -> int: """Apaga MP Manager si está corriendo. Devuelve exit code (0 = OK, 1 = no hay). - Si server_info.json existe y el PID es MP Manager → mata el árbol. - Si server_info.json existe pero el PID NO es MP Manager → limpia el archivo, no mata nada. - Si server_info.json no existe → mira si el puerto 8000 lo tiene MP Manager (otra instancia sin registro). Solo lo mata si pasa `force=True`. """ info = read_server_info() killed_any = False if info and info.get("pid"): pid = int(info["pid"]) if pid_is_alive(pid): if pid_belongs_to_mp_manager(pid): _kill_tree(pid) killed_any = True print(f"[SISTEMA] Detenido MP Manager (PID {pid}, puerto {info.get('port')}).") else: cmd = get_process_cmdline(pid) or "(desconocido)" print(f"[ADVERTENCIA] server_info apunta al PID {pid} pero NO es MP Manager:") print(f" {cmd[:200]}") print(f" No se mata. Limpiando server_info.json.") else: print(f"[SISTEMA] server_info apuntaba al PID {pid} pero ya estaba muerto. Limpiando registro.") try: os.remove(paths.SERVER_INFO) except OSError: pass if killed_any: return 0 # Sin registro: buscar instancias sin server_info.json (raro) port_pid = get_pid_listening_on(8000) if port_pid and pid_belongs_to_mp_manager(port_pid): if force: _kill_tree(port_pid) print(f"[SISTEMA] Detenida instancia huérfana de MP Manager en puerto 8000 (PID {port_pid}).") return 0 print(f"[SISTEMA] Hay un MP Manager huérfano en puerto 8000 (PID {port_pid}) sin server_info.json.") print(f" Para apagarlo: python runtime_control.py stop --force") return 1 print("[SISTEMA] No hay servidor MP Manager activo corriendo.") return 1 def _kill_tree(pid: int) -> None: if IS_WINDOWS: subprocess.run( ["taskkill", "/F", "/T", "/PID", str(pid)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) return _kill_tree_posix(pid) def _kill_tree_posix(pid: int) -> None: """Mata recursivamente el proceso y sus hijos en macOS/Linux. uvicorn arranca con `reload=True`, lo que crea un proceso reloader padre y un worker hijo. El PID registrado en server_info.json es el del padre, así que hay que descender por el árbol (`pgrep -P`) para no dejar al worker zombie aferrado al puerto. Enviamos SIGTERM (apagado limpio); el padre propaga la señal a uvicorn que cierra el socket ordenadamente. """ try: out = subprocess.check_output( ["pgrep", "-P", str(pid)], text=True, stderr=subprocess.DEVNULL ) children = [int(c) for c in out.split() if c.strip().isdigit()] except Exception: children = [] for child in children: _kill_tree_posix(child) try: os.kill(pid, signal.SIGTERM) except (ProcessLookupError, OSError): pass def wait_ready(timeout_sec: float = 30.0) -> int: """Espera a que server_info.json aparezca con un PID vivo. Imprime el puerto.""" deadline = time.time() + timeout_sec while time.time() < deadline: info = read_server_info() if info and info.get("pid") and pid_is_alive(int(info["pid"])): port = info.get("port", 8000) print(port) return 0 time.sleep(0.5) print("[ERROR] Timeout esperando a que MP Manager arranque.", file=sys.stderr) return 2 def preflight_check() -> int: """Validación pre-arranque. Devuelve: 0 = OK, puede lanzarse 1 = ya hay MP Manager corriendo (no relanzar) 2 = puerto 8000 ocupado por otro proceso (advertir pero lanzar — main.py salta al siguiente puerto libre) """ clear_stale_server_info() info = read_server_info() if info and info.get("pid") and pid_is_alive(int(info["pid"])): port = info.get("port", 8000) print(f"[INFO] MP Manager ya esta corriendo en puerto {port} (PID {info['pid']}).") print(f" Abre http://127.0.0.1:{port}/ o detén primero (stop.bat / stop.command).") return 1 port_pid = get_pid_listening_on(8000) if port_pid: if pid_belongs_to_mp_manager(port_pid): print(f"[INFO] Hay un MP Manager huerfano en puerto 8000 (PID {port_pid}).") print(f" Detén (stop.bat / stop.command) o usa: python runtime_control.py stop --force") return 1 cmd = get_process_cmdline(port_pid) or "(desconocido)" print(f"[ADVERTENCIA] Puerto 8000 ocupado por OTRO proyecto (PID {port_pid}):") print(f" {cmd[:150]}") print(f" MP Manager arrancara en el siguiente puerto libre.") return 2 print("[OK] Puerto 8000 libre. Listo para arrancar.") return 0 def main(argv: list) -> int: if not argv: print(__doc__) return 0 cmd = argv[0] if cmd == "status": s = status() print(json.dumps(s, indent=2, ensure_ascii=False)) return 0 if cmd == "stop": force = "--force" in argv[1:] return stop_server(force=force) if cmd == "wait-ready": timeout = 30.0 for arg in argv[1:]: if arg.startswith("--timeout="): try: timeout = float(arg.split("=", 1)[1]) except ValueError: pass return wait_ready(timeout_sec=timeout) if cmd == "preflight": return preflight_check() if cmd == "port-owner": if len(argv) < 2: print("uso: port-owner ", file=sys.stderr) return 2 try: p = int(argv[1]) except ValueError: print("puerto invalido", file=sys.stderr) return 2 pid = get_pid_listening_on(p) if pid is None: print(f"puerto {p} libre") return 0 cmd_line = get_process_cmdline(pid) or "(desconocido)" ours = pid_belongs_to_mp_manager(pid) print(f"PID {pid} (es MP Manager: {ours})") print(f" {cmd_line}") return 0 print(f"comando desconocido: {cmd}", file=sys.stderr) return 2 if __name__ == "__main__": sys.exit(main(sys.argv[1:]))