340 lines
12 KiB
Python
340 lines
12 KiB
Python
"""Utilidades de control de runtime del servidor MP Manager.
|
||
|
||
Centraliza la lógica de:
|
||
* leer/validar `generated/runtime/server_info.json`,
|
||
* confirmar que un PID es realmente MP Manager (y no otro proyecto Python),
|
||
* detectar quién ocupa un puerto,
|
||
* apagar el servidor de forma segura (sin matar procesos ajenos),
|
||
* esperar a que el server termine de arrancar (server_info.json escrito).
|
||
|
||
Pensado para que los batch files (`start.bat`, `stop.bat`, `restart.bat`) lo
|
||
invoquen como CLI sin tener que hacer parsing de JSON o validaciones complejas
|
||
en lenguaje batch.
|
||
|
||
Uso CLI:
|
||
python runtime_control.py status # imprime estado actual
|
||
python runtime_control.py stop # apaga MP Manager (sin tocar otros)
|
||
python runtime_control.py wait-ready # espera a que server_info.json aparezca, imprime el puerto
|
||
python runtime_control.py port-owner P # imprime quién ocupa el puerto P
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import json
|
||
import os
|
||
import socket
|
||
import subprocess
|
||
import sys
|
||
import time
|
||
from typing import Optional
|
||
|
||
import paths
|
||
|
||
|
||
# Indicador inequívoco de que un proceso es MP Manager (vs Transcriptor u otro
|
||
# proyecto Python). Lo buscamos en la línea de comando del proceso.
|
||
PROJECT_MARKER = os.path.normcase(paths.BASE_DIR)
|
||
|
||
|
||
def is_port_in_use(port: int, host: str = "127.0.0.1") -> bool:
|
||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||
try:
|
||
s.bind((host, port))
|
||
return False
|
||
except OSError:
|
||
return True
|
||
|
||
|
||
def get_pid_listening_on(port: int) -> Optional[int]:
|
||
"""Devuelve el PID que escucha en `port` (LISTENING), o None si nadie."""
|
||
try:
|
||
out = subprocess.check_output(
|
||
["netstat", "-ano"], text=True, stderr=subprocess.DEVNULL
|
||
)
|
||
except Exception:
|
||
return None
|
||
needle = f":{port}"
|
||
for line in out.splitlines():
|
||
if needle in line and "LISTENING" in line:
|
||
parts = line.split()
|
||
if parts and parts[-1].isdigit():
|
||
# Asegurar que el match es del puerto exacto (no de un sufijo)
|
||
local = parts[1] if len(parts) >= 2 else ""
|
||
if local.endswith(needle):
|
||
return int(parts[-1])
|
||
return None
|
||
|
||
|
||
def _run_powershell(ps_command: str) -> Optional[str]:
|
||
"""Ejecuta un comando PowerShell y devuelve stdout (stripped) o None si falla.
|
||
|
||
Usamos PowerShell porque `wmic` ya no viene en Windows 11. `Get-CimInstance`
|
||
es el reemplazo oficial y siempre está disponible.
|
||
"""
|
||
try:
|
||
out = subprocess.check_output(
|
||
["powershell", "-NoProfile", "-NonInteractive", "-Command", ps_command],
|
||
text=True,
|
||
stderr=subprocess.DEVNULL,
|
||
encoding="utf-8",
|
||
errors="replace",
|
||
)
|
||
except Exception:
|
||
return None
|
||
return out.strip() or None
|
||
|
||
|
||
def get_process_cmdline(pid: int) -> Optional[str]:
|
||
"""Devuelve la línea de comando de un PID en Windows, o None si no existe."""
|
||
return _run_powershell(
|
||
f"(Get-CimInstance Win32_Process -Filter 'ProcessId={pid}' -ErrorAction SilentlyContinue).CommandLine"
|
||
)
|
||
|
||
|
||
def pid_is_alive(pid: int) -> bool:
|
||
"""True si el PID existe. Más confiable que parsear tasklist."""
|
||
result = _run_powershell(
|
||
f"if (Get-Process -Id {pid} -ErrorAction SilentlyContinue) {{ 'yes' }} else {{ 'no' }}"
|
||
)
|
||
return result == "yes"
|
||
|
||
|
||
def pid_belongs_to_mp_manager(pid: int) -> bool:
|
||
"""True si el PID es ciertamente MP Manager. Conservador: ante la duda, False.
|
||
|
||
Fuente primaria (más fuerte): `generated/runtime/server_info.json` del proyecto
|
||
registró este PID. Como ese archivo SOLO lo escribe `main.py` de este proyecto,
|
||
el match es prueba inequívoca de identidad.
|
||
|
||
Fuente secundaria (cmdline): si los .bat lanzan `python "<ruta absoluta>/main.py"`,
|
||
el path del proyecto queda embebido en la línea de comando del proceso y podemos
|
||
reconocerlo aunque el server_info.json esté ausente o stale.
|
||
"""
|
||
info = read_server_info()
|
||
if info and info.get("pid") == pid:
|
||
return True
|
||
cmd = get_process_cmdline(pid)
|
||
if not cmd:
|
||
return False
|
||
norm = os.path.normcase(cmd)
|
||
if PROJECT_MARKER in norm:
|
||
return True
|
||
return False
|
||
|
||
|
||
def get_process_cwd(pid: int) -> Optional[str]:
|
||
"""Best-effort: obtener el directorio del ejecutable del proceso.
|
||
|
||
Win32_Process no expone el cwd real; el ExecutablePath sirve como pista
|
||
cuando el ejecutable vive dentro del proyecto. Para nuestro caso, lo más
|
||
informativo es la línea de comando completa, así que esta función es
|
||
secundaria.
|
||
"""
|
||
out = _run_powershell(
|
||
f"(Get-CimInstance Win32_Process -Filter 'ProcessId={pid}' -ErrorAction SilentlyContinue).ExecutablePath"
|
||
)
|
||
if out:
|
||
return os.path.dirname(out)
|
||
return None
|
||
|
||
|
||
def read_server_info() -> Optional[dict]:
|
||
if not os.path.exists(paths.SERVER_INFO):
|
||
return None
|
||
try:
|
||
# utf-8-sig tolera BOM por si alguien escribe el archivo con PowerShell
|
||
# u otra herramienta que añada al inicio.
|
||
with open(paths.SERVER_INFO, "r", encoding="utf-8-sig") as f:
|
||
return json.load(f)
|
||
except Exception:
|
||
return None
|
||
|
||
|
||
def clear_stale_server_info() -> bool:
|
||
"""Si server_info.json apunta a un PID muerto, lo borra. True si limpió."""
|
||
info = read_server_info()
|
||
if not info:
|
||
return False
|
||
pid = info.get("pid")
|
||
if pid and pid_is_alive(int(pid)):
|
||
return False
|
||
try:
|
||
os.remove(paths.SERVER_INFO)
|
||
return True
|
||
except OSError:
|
||
return False
|
||
|
||
|
||
def status() -> dict:
|
||
"""Estado actual: ¿hay MP Manager corriendo? ¿quién ocupa el puerto?"""
|
||
info = read_server_info()
|
||
result = {
|
||
"registered": info,
|
||
"registered_alive": False,
|
||
"registered_is_ours": False,
|
||
"port_8000_pid": None,
|
||
"port_8000_is_ours": False,
|
||
"port_8000_cmdline": None,
|
||
}
|
||
if info and info.get("pid"):
|
||
pid = int(info["pid"])
|
||
if pid_is_alive(pid):
|
||
result["registered_alive"] = True
|
||
result["registered_is_ours"] = pid_belongs_to_mp_manager(pid)
|
||
|
||
port_pid = get_pid_listening_on(8000)
|
||
if port_pid:
|
||
result["port_8000_pid"] = port_pid
|
||
result["port_8000_is_ours"] = pid_belongs_to_mp_manager(port_pid)
|
||
result["port_8000_cmdline"] = get_process_cmdline(port_pid)
|
||
return result
|
||
|
||
|
||
def stop_server(force: bool = False) -> int:
|
||
"""Apaga MP Manager si está corriendo. Devuelve exit code (0 = OK, 1 = no hay).
|
||
|
||
- Si server_info.json existe y el PID es MP Manager → mata el árbol.
|
||
- Si server_info.json existe pero el PID NO es MP Manager → limpia el archivo, no mata nada.
|
||
- Si server_info.json no existe → mira si el puerto 8000 lo tiene MP Manager (otra
|
||
instancia sin registro). Solo lo mata si pasa `force=True`.
|
||
"""
|
||
info = read_server_info()
|
||
killed_any = False
|
||
|
||
if info and info.get("pid"):
|
||
pid = int(info["pid"])
|
||
if pid_is_alive(pid):
|
||
if pid_belongs_to_mp_manager(pid):
|
||
_kill_tree(pid)
|
||
killed_any = True
|
||
print(f"[SISTEMA] Detenido MP Manager (PID {pid}, puerto {info.get('port')}).")
|
||
else:
|
||
cmd = get_process_cmdline(pid) or "(desconocido)"
|
||
print(f"[ADVERTENCIA] server_info apunta al PID {pid} pero NO es MP Manager:")
|
||
print(f" {cmd[:200]}")
|
||
print(f" No se mata. Limpiando server_info.json.")
|
||
else:
|
||
print(f"[SISTEMA] server_info apuntaba al PID {pid} pero ya estaba muerto. Limpiando registro.")
|
||
try:
|
||
os.remove(paths.SERVER_INFO)
|
||
except OSError:
|
||
pass
|
||
|
||
if killed_any:
|
||
return 0
|
||
|
||
# Sin registro: buscar instancias sin server_info.json (raro)
|
||
port_pid = get_pid_listening_on(8000)
|
||
if port_pid and pid_belongs_to_mp_manager(port_pid):
|
||
if force:
|
||
_kill_tree(port_pid)
|
||
print(f"[SISTEMA] Detenida instancia huérfana de MP Manager en puerto 8000 (PID {port_pid}).")
|
||
return 0
|
||
print(f"[SISTEMA] Hay un MP Manager huérfano en puerto 8000 (PID {port_pid}) sin server_info.json.")
|
||
print(f" Para apagarlo: python runtime_control.py stop --force")
|
||
return 1
|
||
|
||
print("[SISTEMA] No hay servidor MP Manager activo corriendo.")
|
||
return 1
|
||
|
||
|
||
def _kill_tree(pid: int) -> None:
|
||
subprocess.run(
|
||
["taskkill", "/F", "/T", "/PID", str(pid)],
|
||
stdout=subprocess.DEVNULL,
|
||
stderr=subprocess.DEVNULL,
|
||
)
|
||
|
||
|
||
def wait_ready(timeout_sec: float = 30.0) -> int:
|
||
"""Espera a que server_info.json aparezca con un PID vivo. Imprime el puerto."""
|
||
deadline = time.time() + timeout_sec
|
||
while time.time() < deadline:
|
||
info = read_server_info()
|
||
if info and info.get("pid") and pid_is_alive(int(info["pid"])):
|
||
port = info.get("port", 8000)
|
||
print(port)
|
||
return 0
|
||
time.sleep(0.5)
|
||
print("[ERROR] Timeout esperando a que MP Manager arranque.", file=sys.stderr)
|
||
return 2
|
||
|
||
|
||
def preflight_check() -> int:
|
||
"""Validación pre-arranque. Devuelve:
|
||
0 = OK, puede lanzarse
|
||
1 = ya hay MP Manager corriendo (no relanzar)
|
||
2 = puerto 8000 ocupado por otro proceso (advertir pero lanzar — main.py salta al siguiente puerto libre)
|
||
"""
|
||
clear_stale_server_info()
|
||
info = read_server_info()
|
||
if info and info.get("pid") and pid_is_alive(int(info["pid"])):
|
||
port = info.get("port", 8000)
|
||
print(f"[INFO] MP Manager ya esta corriendo en puerto {port} (PID {info['pid']}).")
|
||
print(f" Abre http://127.0.0.1:{port}/ o ejecuta stop.bat primero.")
|
||
return 1
|
||
|
||
port_pid = get_pid_listening_on(8000)
|
||
if port_pid:
|
||
if pid_belongs_to_mp_manager(port_pid):
|
||
print(f"[INFO] Hay un MP Manager huerfano en puerto 8000 (PID {port_pid}).")
|
||
print(f" Ejecuta stop.bat o usa: python runtime_control.py stop --force")
|
||
return 1
|
||
cmd = get_process_cmdline(port_pid) or "(desconocido)"
|
||
print(f"[ADVERTENCIA] Puerto 8000 ocupado por OTRO proyecto (PID {port_pid}):")
|
||
print(f" {cmd[:150]}")
|
||
print(f" MP Manager arrancara en el siguiente puerto libre.")
|
||
return 2
|
||
|
||
print("[OK] Puerto 8000 libre. Listo para arrancar.")
|
||
return 0
|
||
|
||
|
||
def main(argv: list) -> int:
|
||
if not argv:
|
||
print(__doc__)
|
||
return 0
|
||
cmd = argv[0]
|
||
if cmd == "status":
|
||
s = status()
|
||
print(json.dumps(s, indent=2, ensure_ascii=False))
|
||
return 0
|
||
if cmd == "stop":
|
||
force = "--force" in argv[1:]
|
||
return stop_server(force=force)
|
||
if cmd == "wait-ready":
|
||
timeout = 30.0
|
||
for arg in argv[1:]:
|
||
if arg.startswith("--timeout="):
|
||
try:
|
||
timeout = float(arg.split("=", 1)[1])
|
||
except ValueError:
|
||
pass
|
||
return wait_ready(timeout_sec=timeout)
|
||
if cmd == "preflight":
|
||
return preflight_check()
|
||
if cmd == "port-owner":
|
||
if len(argv) < 2:
|
||
print("uso: port-owner <puerto>", file=sys.stderr)
|
||
return 2
|
||
try:
|
||
p = int(argv[1])
|
||
except ValueError:
|
||
print("puerto invalido", file=sys.stderr)
|
||
return 2
|
||
pid = get_pid_listening_on(p)
|
||
if pid is None:
|
||
print(f"puerto {p} libre")
|
||
return 0
|
||
cmd_line = get_process_cmdline(pid) or "(desconocido)"
|
||
ours = pid_belongs_to_mp_manager(pid)
|
||
print(f"PID {pid} (es MP Manager: {ours})")
|
||
print(f" {cmd_line}")
|
||
return 0
|
||
print(f"comando desconocido: {cmd}", file=sys.stderr)
|
||
return 2
|
||
|
||
|
||
if __name__ == "__main__":
|
||
sys.exit(main(sys.argv[1:]))
|