Files
MP-Manager/runtime_control.py
urieljareth 2a37a4ffbf Añade launchers de macOS/Linux con un click y hace runtime_control cross-platform
Equivalentes .command (doble-clic en Finder) de los .bat de Windows:
- setup_mac.command: bootstrap con un click (detecta Python 3.10+, crea .venv,
  instala requirements + Chromium de Playwright, copia .env.example -> .env).
- start/stop/restart/start_persistent_profile.command: espejo de los .bat,
  lanzan el server con nohup usando el python del .venv.
- mp_common.sh: helper compartido (raíz, venv, banners).

runtime_control.py ahora es cross-platform (IS_WINDOWS): lsof/ps/pgrep/kill en
POSIX, netstat/PowerShell/taskkill en Windows. _kill_tree_posix mata el árbol
padre+worker de uvicorn con SIGTERM.

.venv/ añadido a .gitignore. Docs actualizadas (CLAUDE.md, AGENTS.md,
PLAYWRIGHT_SESSION.md).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-30 15:02:47 -06:00

433 lines
15 KiB
Python
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Utilidades de control de runtime del servidor MP Manager.
Centraliza la lógica de:
* leer/validar `generated/runtime/server_info.json`,
* confirmar que un PID es realmente MP Manager (y no otro proyecto Python),
* detectar quién ocupa un puerto,
* apagar el servidor de forma segura (sin matar procesos ajenos),
* esperar a que el server termine de arrancar (server_info.json escrito).
Pensado para que los launchers (Windows: `start.bat`/`stop.bat`/`restart.bat`;
macOS/Linux: `start.command`/`stop.command`/`restart.command`) lo invoquen como
CLI sin tener que hacer parsing de JSON o validaciones complejas en shell. La
lógica de OS (puertos, PIDs, kill) es cross-platform: ver `IS_WINDOWS`.
Uso CLI:
python runtime_control.py status # imprime estado actual
python runtime_control.py stop # apaga MP Manager (sin tocar otros)
python runtime_control.py wait-ready # espera a que server_info.json aparezca, imprime el puerto
python runtime_control.py port-owner P # imprime quién ocupa el puerto P
"""
from __future__ import annotations
import json
import os
import signal
import socket
import subprocess
import sys
import time
from typing import Optional
import paths
# Indicador inequívoco de que un proceso es MP Manager (vs Transcriptor u otro
# proyecto Python). Lo buscamos en la línea de comando del proceso.
PROJECT_MARKER = os.path.normcase(paths.BASE_DIR)
# Las primitivas de OS (descubrir quién ocupa un puerto, leer la línea de comando
# de un PID, matarlo) difieren entre Windows y POSIX (macOS/Linux). El resto del
# módulo es agnóstico: dispatcha según esta bandera.
IS_WINDOWS = os.name == "nt"
def is_port_in_use(port: int, host: str = "127.0.0.1") -> bool:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind((host, port))
return False
except OSError:
return True
def get_pid_listening_on(port: int) -> Optional[int]:
"""Devuelve el PID que escucha en `port` (LISTENING), o None si nadie."""
if IS_WINDOWS:
return _get_pid_listening_on_windows(port)
return _get_pid_listening_on_posix(port)
def _get_pid_listening_on_windows(port: int) -> Optional[int]:
try:
out = subprocess.check_output(
["netstat", "-ano"], text=True, stderr=subprocess.DEVNULL
)
except Exception:
return None
needle = f":{port}"
for line in out.splitlines():
if needle in line and "LISTENING" in line:
parts = line.split()
if parts and parts[-1].isdigit():
# Asegurar que el match es del puerto exacto (no de un sufijo)
local = parts[1] if len(parts) >= 2 else ""
if local.endswith(needle):
return int(parts[-1])
return None
def _get_pid_listening_on_posix(port: int) -> Optional[int]:
"""macOS/Linux: `lsof` lista el/los PID que escuchan en el puerto.
`-sTCP:LISTEN` acota a sockets en estado LISTEN, `-t` devuelve solo PIDs.
Si hay varios (p.ej. el reloader de uvicorn y su worker comparten socket),
devolvemos el primero; `_kill_tree` se encarga del árbol completo.
"""
try:
out = subprocess.check_output(
["lsof", "-nP", f"-iTCP:{port}", "-sTCP:LISTEN", "-t"],
text=True,
stderr=subprocess.DEVNULL,
)
except Exception:
return None
for line in out.split():
if line.strip().isdigit():
return int(line.strip())
return None
def _run_powershell(ps_command: str) -> Optional[str]:
"""Ejecuta un comando PowerShell y devuelve stdout (stripped) o None si falla.
Usamos PowerShell porque `wmic` ya no viene en Windows 11. `Get-CimInstance`
es el reemplazo oficial y siempre está disponible.
"""
try:
out = subprocess.check_output(
["powershell", "-NoProfile", "-NonInteractive", "-Command", ps_command],
text=True,
stderr=subprocess.DEVNULL,
encoding="utf-8",
errors="replace",
)
except Exception:
return None
return out.strip() or None
def get_process_cmdline(pid: int) -> Optional[str]:
"""Devuelve la línea de comando completa de un PID, o None si no existe."""
if IS_WINDOWS:
return _run_powershell(
f"(Get-CimInstance Win32_Process -Filter 'ProcessId={pid}' -ErrorAction SilentlyContinue).CommandLine"
)
# POSIX (macOS/Linux): `ps -o command=` imprime la línea de comando completa
# sin encabezado. En macOS el límite por defecto es amplio; suficiente para
# contener la ruta absoluta a main.py que es lo que buscamos.
try:
out = subprocess.check_output(
["ps", "-p", str(pid), "-o", "command="],
text=True,
stderr=subprocess.DEVNULL,
)
except Exception:
return None
return out.strip() or None
def pid_is_alive(pid: int) -> bool:
"""True si el PID existe."""
if IS_WINDOWS:
result = _run_powershell(
f"if (Get-Process -Id {pid} -ErrorAction SilentlyContinue) {{ 'yes' }} else {{ 'no' }}"
)
return result == "yes"
# POSIX: señal 0 no mata; solo verifica existencia/permisos del proceso.
try:
os.kill(pid, 0)
return True
except ProcessLookupError:
return False
except PermissionError:
# Existe pero es de otro usuario; para nuestros fines, está vivo.
return True
except OSError:
return False
def pid_belongs_to_mp_manager(pid: int) -> bool:
"""True si el PID es ciertamente MP Manager. Conservador: ante la duda, False.
Fuente primaria (más fuerte): `generated/runtime/server_info.json` del proyecto
registró este PID. Como ese archivo SOLO lo escribe `main.py` de este proyecto,
el match es prueba inequívoca de identidad.
Fuente secundaria (cmdline): si los launchers lanzan `python "<ruta absoluta>/main.py"`,
el path del proyecto queda embebido en la línea de comando del proceso y podemos
reconocerlo aunque el server_info.json esté ausente o stale.
"""
info = read_server_info()
if info and info.get("pid") == pid:
return True
cmd = get_process_cmdline(pid)
if not cmd:
return False
norm = os.path.normcase(cmd)
if PROJECT_MARKER in norm:
return True
return False
def get_process_cwd(pid: int) -> Optional[str]:
"""Best-effort: obtener el directorio del ejecutable del proceso.
Win32_Process no expone el cwd real; el ExecutablePath sirve como pista
cuando el ejecutable vive dentro del proyecto. Para nuestro caso, lo más
informativo es la línea de comando completa, así que esta función es
secundaria.
"""
if IS_WINDOWS:
out = _run_powershell(
f"(Get-CimInstance Win32_Process -Filter 'ProcessId={pid}' -ErrorAction SilentlyContinue).ExecutablePath"
)
if out:
return os.path.dirname(out)
return None
# POSIX: no hay un cwd fiable sin /proc (Linux) y macOS no lo expone vía ps.
# La línea de comando completa es la pista útil para nosotros, así que esta
# función queda como best-effort y devuelve None.
return None
def read_server_info() -> Optional[dict]:
if not os.path.exists(paths.SERVER_INFO):
return None
try:
# utf-8-sig tolera BOM por si alguien escribe el archivo con PowerShell
# u otra herramienta que añada  al inicio.
with open(paths.SERVER_INFO, "r", encoding="utf-8-sig") as f:
return json.load(f)
except Exception:
return None
def clear_stale_server_info() -> bool:
"""Si server_info.json apunta a un PID muerto, lo borra. True si limpió."""
info = read_server_info()
if not info:
return False
pid = info.get("pid")
if pid and pid_is_alive(int(pid)):
return False
try:
os.remove(paths.SERVER_INFO)
return True
except OSError:
return False
def status() -> dict:
"""Estado actual: ¿hay MP Manager corriendo? ¿quién ocupa el puerto?"""
info = read_server_info()
result = {
"registered": info,
"registered_alive": False,
"registered_is_ours": False,
"port_8000_pid": None,
"port_8000_is_ours": False,
"port_8000_cmdline": None,
}
if info and info.get("pid"):
pid = int(info["pid"])
if pid_is_alive(pid):
result["registered_alive"] = True
result["registered_is_ours"] = pid_belongs_to_mp_manager(pid)
port_pid = get_pid_listening_on(8000)
if port_pid:
result["port_8000_pid"] = port_pid
result["port_8000_is_ours"] = pid_belongs_to_mp_manager(port_pid)
result["port_8000_cmdline"] = get_process_cmdline(port_pid)
return result
def stop_server(force: bool = False) -> int:
"""Apaga MP Manager si está corriendo. Devuelve exit code (0 = OK, 1 = no hay).
- Si server_info.json existe y el PID es MP Manager → mata el árbol.
- Si server_info.json existe pero el PID NO es MP Manager → limpia el archivo, no mata nada.
- Si server_info.json no existe → mira si el puerto 8000 lo tiene MP Manager (otra
instancia sin registro). Solo lo mata si pasa `force=True`.
"""
info = read_server_info()
killed_any = False
if info and info.get("pid"):
pid = int(info["pid"])
if pid_is_alive(pid):
if pid_belongs_to_mp_manager(pid):
_kill_tree(pid)
killed_any = True
print(f"[SISTEMA] Detenido MP Manager (PID {pid}, puerto {info.get('port')}).")
else:
cmd = get_process_cmdline(pid) or "(desconocido)"
print(f"[ADVERTENCIA] server_info apunta al PID {pid} pero NO es MP Manager:")
print(f" {cmd[:200]}")
print(f" No se mata. Limpiando server_info.json.")
else:
print(f"[SISTEMA] server_info apuntaba al PID {pid} pero ya estaba muerto. Limpiando registro.")
try:
os.remove(paths.SERVER_INFO)
except OSError:
pass
if killed_any:
return 0
# Sin registro: buscar instancias sin server_info.json (raro)
port_pid = get_pid_listening_on(8000)
if port_pid and pid_belongs_to_mp_manager(port_pid):
if force:
_kill_tree(port_pid)
print(f"[SISTEMA] Detenida instancia huérfana de MP Manager en puerto 8000 (PID {port_pid}).")
return 0
print(f"[SISTEMA] Hay un MP Manager huérfano en puerto 8000 (PID {port_pid}) sin server_info.json.")
print(f" Para apagarlo: python runtime_control.py stop --force")
return 1
print("[SISTEMA] No hay servidor MP Manager activo corriendo.")
return 1
def _kill_tree(pid: int) -> None:
if IS_WINDOWS:
subprocess.run(
["taskkill", "/F", "/T", "/PID", str(pid)],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return
_kill_tree_posix(pid)
def _kill_tree_posix(pid: int) -> None:
"""Mata recursivamente el proceso y sus hijos en macOS/Linux.
uvicorn arranca con `reload=True`, lo que crea un proceso reloader padre y un
worker hijo. El PID registrado en server_info.json es el del padre, así que
hay que descender por el árbol (`pgrep -P`) para no dejar al worker zombie
aferrado al puerto. Enviamos SIGTERM (apagado limpio); el padre propaga la
señal a uvicorn que cierra el socket ordenadamente.
"""
try:
out = subprocess.check_output(
["pgrep", "-P", str(pid)], text=True, stderr=subprocess.DEVNULL
)
children = [int(c) for c in out.split() if c.strip().isdigit()]
except Exception:
children = []
for child in children:
_kill_tree_posix(child)
try:
os.kill(pid, signal.SIGTERM)
except (ProcessLookupError, OSError):
pass
def wait_ready(timeout_sec: float = 30.0) -> int:
"""Espera a que server_info.json aparezca con un PID vivo. Imprime el puerto."""
deadline = time.time() + timeout_sec
while time.time() < deadline:
info = read_server_info()
if info and info.get("pid") and pid_is_alive(int(info["pid"])):
port = info.get("port", 8000)
print(port)
return 0
time.sleep(0.5)
print("[ERROR] Timeout esperando a que MP Manager arranque.", file=sys.stderr)
return 2
def preflight_check() -> int:
"""Validación pre-arranque. Devuelve:
0 = OK, puede lanzarse
1 = ya hay MP Manager corriendo (no relanzar)
2 = puerto 8000 ocupado por otro proceso (advertir pero lanzar — main.py salta al siguiente puerto libre)
"""
clear_stale_server_info()
info = read_server_info()
if info and info.get("pid") and pid_is_alive(int(info["pid"])):
port = info.get("port", 8000)
print(f"[INFO] MP Manager ya esta corriendo en puerto {port} (PID {info['pid']}).")
print(f" Abre http://127.0.0.1:{port}/ o detén primero (stop.bat / stop.command).")
return 1
port_pid = get_pid_listening_on(8000)
if port_pid:
if pid_belongs_to_mp_manager(port_pid):
print(f"[INFO] Hay un MP Manager huerfano en puerto 8000 (PID {port_pid}).")
print(f" Detén (stop.bat / stop.command) o usa: python runtime_control.py stop --force")
return 1
cmd = get_process_cmdline(port_pid) or "(desconocido)"
print(f"[ADVERTENCIA] Puerto 8000 ocupado por OTRO proyecto (PID {port_pid}):")
print(f" {cmd[:150]}")
print(f" MP Manager arrancara en el siguiente puerto libre.")
return 2
print("[OK] Puerto 8000 libre. Listo para arrancar.")
return 0
def main(argv: list) -> int:
if not argv:
print(__doc__)
return 0
cmd = argv[0]
if cmd == "status":
s = status()
print(json.dumps(s, indent=2, ensure_ascii=False))
return 0
if cmd == "stop":
force = "--force" in argv[1:]
return stop_server(force=force)
if cmd == "wait-ready":
timeout = 30.0
for arg in argv[1:]:
if arg.startswith("--timeout="):
try:
timeout = float(arg.split("=", 1)[1])
except ValueError:
pass
return wait_ready(timeout_sec=timeout)
if cmd == "preflight":
return preflight_check()
if cmd == "port-owner":
if len(argv) < 2:
print("uso: port-owner <puerto>", file=sys.stderr)
return 2
try:
p = int(argv[1])
except ValueError:
print("puerto invalido", file=sys.stderr)
return 2
pid = get_pid_listening_on(p)
if pid is None:
print(f"puerto {p} libre")
return 0
cmd_line = get_process_cmdline(pid) or "(desconocido)"
ours = pid_belongs_to_mp_manager(pid)
print(f"PID {pid} (es MP Manager: {ours})")
print(f" {cmd_line}")
return 0
print(f"comando desconocido: {cmd}", file=sys.stderr)
return 2
if __name__ == "__main__":
sys.exit(main(sys.argv[1:]))