"""Utilidades de control de runtime del servidor MP Manager. Centraliza la lógica de: * leer/validar `generated/runtime/server_info.json`, * confirmar que un PID es realmente MP Manager (y no otro proyecto Python), * detectar quién ocupa un puerto, * apagar el servidor de forma segura (sin matar procesos ajenos), * esperar a que el server termine de arrancar (server_info.json escrito). Pensado para que los batch files (`start.bat`, `stop.bat`, `restart.bat`) lo invoquen como CLI sin tener que hacer parsing de JSON o validaciones complejas en lenguaje batch. Uso CLI: python runtime_control.py status # imprime estado actual python runtime_control.py stop # apaga MP Manager (sin tocar otros) python runtime_control.py wait-ready # espera a que server_info.json aparezca, imprime el puerto python runtime_control.py port-owner P # imprime quién ocupa el puerto P """ from __future__ import annotations import json import os import socket import subprocess import sys import time from typing import Optional import paths # Indicador inequívoco de que un proceso es MP Manager (vs Transcriptor u otro # proyecto Python). Lo buscamos en la línea de comando del proceso. PROJECT_MARKER = os.path.normcase(paths.BASE_DIR) def is_port_in_use(port: int, host: str = "127.0.0.1") -> bool: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: try: s.bind((host, port)) return False except OSError: return True def get_pid_listening_on(port: int) -> Optional[int]: """Devuelve el PID que escucha en `port` (LISTENING), o None si nadie.""" try: out = subprocess.check_output( ["netstat", "-ano"], text=True, stderr=subprocess.DEVNULL ) except Exception: return None needle = f":{port}" for line in out.splitlines(): if needle in line and "LISTENING" in line: parts = line.split() if parts and parts[-1].isdigit(): # Asegurar que el match es del puerto exacto (no de un sufijo) local = parts[1] if len(parts) >= 2 else "" if local.endswith(needle): return int(parts[-1]) return None def _run_powershell(ps_command: str) -> Optional[str]: """Ejecuta un comando PowerShell y devuelve stdout (stripped) o None si falla. Usamos PowerShell porque `wmic` ya no viene en Windows 11. `Get-CimInstance` es el reemplazo oficial y siempre está disponible. """ try: out = subprocess.check_output( ["powershell", "-NoProfile", "-NonInteractive", "-Command", ps_command], text=True, stderr=subprocess.DEVNULL, encoding="utf-8", errors="replace", ) except Exception: return None return out.strip() or None def get_process_cmdline(pid: int) -> Optional[str]: """Devuelve la línea de comando de un PID en Windows, o None si no existe.""" return _run_powershell( f"(Get-CimInstance Win32_Process -Filter 'ProcessId={pid}' -ErrorAction SilentlyContinue).CommandLine" ) def pid_is_alive(pid: int) -> bool: """True si el PID existe. Más confiable que parsear tasklist.""" result = _run_powershell( f"if (Get-Process -Id {pid} -ErrorAction SilentlyContinue) {{ 'yes' }} else {{ 'no' }}" ) return result == "yes" def pid_belongs_to_mp_manager(pid: int) -> bool: """True si el PID es ciertamente MP Manager. Conservador: ante la duda, False. Fuente primaria (más fuerte): `generated/runtime/server_info.json` del proyecto registró este PID. Como ese archivo SOLO lo escribe `main.py` de este proyecto, el match es prueba inequívoca de identidad. Fuente secundaria (cmdline): si los .bat lanzan `python "/main.py"`, el path del proyecto queda embebido en la línea de comando del proceso y podemos reconocerlo aunque el server_info.json esté ausente o stale. """ info = read_server_info() if info and info.get("pid") == pid: return True cmd = get_process_cmdline(pid) if not cmd: return False norm = os.path.normcase(cmd) if PROJECT_MARKER in norm: return True return False def get_process_cwd(pid: int) -> Optional[str]: """Best-effort: obtener el directorio del ejecutable del proceso. Win32_Process no expone el cwd real; el ExecutablePath sirve como pista cuando el ejecutable vive dentro del proyecto. Para nuestro caso, lo más informativo es la línea de comando completa, así que esta función es secundaria. """ out = _run_powershell( f"(Get-CimInstance Win32_Process -Filter 'ProcessId={pid}' -ErrorAction SilentlyContinue).ExecutablePath" ) if out: return os.path.dirname(out) return None def read_server_info() -> Optional[dict]: if not os.path.exists(paths.SERVER_INFO): return None try: # utf-8-sig tolera BOM por si alguien escribe el archivo con PowerShell # u otra herramienta que añada  al inicio. with open(paths.SERVER_INFO, "r", encoding="utf-8-sig") as f: return json.load(f) except Exception: return None def clear_stale_server_info() -> bool: """Si server_info.json apunta a un PID muerto, lo borra. True si limpió.""" info = read_server_info() if not info: return False pid = info.get("pid") if pid and pid_is_alive(int(pid)): return False try: os.remove(paths.SERVER_INFO) return True except OSError: return False def status() -> dict: """Estado actual: ¿hay MP Manager corriendo? ¿quién ocupa el puerto?""" info = read_server_info() result = { "registered": info, "registered_alive": False, "registered_is_ours": False, "port_8000_pid": None, "port_8000_is_ours": False, "port_8000_cmdline": None, } if info and info.get("pid"): pid = int(info["pid"]) if pid_is_alive(pid): result["registered_alive"] = True result["registered_is_ours"] = pid_belongs_to_mp_manager(pid) port_pid = get_pid_listening_on(8000) if port_pid: result["port_8000_pid"] = port_pid result["port_8000_is_ours"] = pid_belongs_to_mp_manager(port_pid) result["port_8000_cmdline"] = get_process_cmdline(port_pid) return result def stop_server(force: bool = False) -> int: """Apaga MP Manager si está corriendo. Devuelve exit code (0 = OK, 1 = no hay). - Si server_info.json existe y el PID es MP Manager → mata el árbol. - Si server_info.json existe pero el PID NO es MP Manager → limpia el archivo, no mata nada. - Si server_info.json no existe → mira si el puerto 8000 lo tiene MP Manager (otra instancia sin registro). Solo lo mata si pasa `force=True`. """ info = read_server_info() killed_any = False if info and info.get("pid"): pid = int(info["pid"]) if pid_is_alive(pid): if pid_belongs_to_mp_manager(pid): _kill_tree(pid) killed_any = True print(f"[SISTEMA] Detenido MP Manager (PID {pid}, puerto {info.get('port')}).") else: cmd = get_process_cmdline(pid) or "(desconocido)" print(f"[ADVERTENCIA] server_info apunta al PID {pid} pero NO es MP Manager:") print(f" {cmd[:200]}") print(f" No se mata. Limpiando server_info.json.") else: print(f"[SISTEMA] server_info apuntaba al PID {pid} pero ya estaba muerto. Limpiando registro.") try: os.remove(paths.SERVER_INFO) except OSError: pass if killed_any: return 0 # Sin registro: buscar instancias sin server_info.json (raro) port_pid = get_pid_listening_on(8000) if port_pid and pid_belongs_to_mp_manager(port_pid): if force: _kill_tree(port_pid) print(f"[SISTEMA] Detenida instancia huérfana de MP Manager en puerto 8000 (PID {port_pid}).") return 0 print(f"[SISTEMA] Hay un MP Manager huérfano en puerto 8000 (PID {port_pid}) sin server_info.json.") print(f" Para apagarlo: python runtime_control.py stop --force") return 1 print("[SISTEMA] No hay servidor MP Manager activo corriendo.") return 1 def _kill_tree(pid: int) -> None: subprocess.run( ["taskkill", "/F", "/T", "/PID", str(pid)], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) def wait_ready(timeout_sec: float = 30.0) -> int: """Espera a que server_info.json aparezca con un PID vivo. Imprime el puerto.""" deadline = time.time() + timeout_sec while time.time() < deadline: info = read_server_info() if info and info.get("pid") and pid_is_alive(int(info["pid"])): port = info.get("port", 8000) print(port) return 0 time.sleep(0.5) print("[ERROR] Timeout esperando a que MP Manager arranque.", file=sys.stderr) return 2 def preflight_check() -> int: """Validación pre-arranque. Devuelve: 0 = OK, puede lanzarse 1 = ya hay MP Manager corriendo (no relanzar) 2 = puerto 8000 ocupado por otro proceso (advertir pero lanzar — main.py salta al siguiente puerto libre) """ clear_stale_server_info() info = read_server_info() if info and info.get("pid") and pid_is_alive(int(info["pid"])): port = info.get("port", 8000) print(f"[INFO] MP Manager ya esta corriendo en puerto {port} (PID {info['pid']}).") print(f" Abre http://127.0.0.1:{port}/ o ejecuta stop.bat primero.") return 1 port_pid = get_pid_listening_on(8000) if port_pid: if pid_belongs_to_mp_manager(port_pid): print(f"[INFO] Hay un MP Manager huerfano en puerto 8000 (PID {port_pid}).") print(f" Ejecuta stop.bat o usa: python runtime_control.py stop --force") return 1 cmd = get_process_cmdline(port_pid) or "(desconocido)" print(f"[ADVERTENCIA] Puerto 8000 ocupado por OTRO proyecto (PID {port_pid}):") print(f" {cmd[:150]}") print(f" MP Manager arrancara en el siguiente puerto libre.") return 2 print("[OK] Puerto 8000 libre. Listo para arrancar.") return 0 def main(argv: list) -> int: if not argv: print(__doc__) return 0 cmd = argv[0] if cmd == "status": s = status() print(json.dumps(s, indent=2, ensure_ascii=False)) return 0 if cmd == "stop": force = "--force" in argv[1:] return stop_server(force=force) if cmd == "wait-ready": timeout = 30.0 for arg in argv[1:]: if arg.startswith("--timeout="): try: timeout = float(arg.split("=", 1)[1]) except ValueError: pass return wait_ready(timeout_sec=timeout) if cmd == "preflight": return preflight_check() if cmd == "port-owner": if len(argv) < 2: print("uso: port-owner ", file=sys.stderr) return 2 try: p = int(argv[1]) except ValueError: print("puerto invalido", file=sys.stderr) return 2 pid = get_pid_listening_on(p) if pid is None: print(f"puerto {p} libre") return 0 cmd_line = get_process_cmdline(pid) or "(desconocido)" ours = pid_belongs_to_mp_manager(pid) print(f"PID {pid} (es MP Manager: {ours})") print(f" {cmd_line}") return 0 print(f"comando desconocido: {cmd}", file=sys.stderr) return 2 if __name__ == "__main__": sys.exit(main(sys.argv[1:]))