Files
2026-05-30 14:31:19 -06:00

228 lines
7.2 KiB
Python

"""Adapters: invocan funciones internas del proyecto desde el MCP server.
Filosofía: las tools del MCP llaman funciones Python directamente (no HTTP).
Para scripts que no exportan función pública, usan subprocess con --json y
parsean stdout. Los reportes grandes se vuelcan a `generated/agent/runs/`
y la tool devuelve solo el path + summary para no inflar el contexto LLM.
"""
from __future__ import annotations
import json
import os
import subprocess
import sys
import uuid
from datetime import datetime
from typing import Any
# Make sure the repository root is on sys.path when the server is started
# with `python -m mcp_server` from the project's working directory.
# _ROOT is the parent of the directory that contains this file.
_ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if _ROOT not in sys.path:
    sys.path.insert(0, _ROOT)
# Imported after the sys.path tweak on purpose (hence the E402 suppression);
# presumably `paths` lives at the repository root — verify against the layout.
import paths  # noqa: E402
# Directory joined with script names by run_python_script.
SCRIPTS_DIR = os.path.join(_ROOT, "scripts")
# Interpreter used for child scripts: the one running this module.
PYTHON_EXE = sys.executable
LARGE_PAYLOAD_THRESHOLD = 8000  # chars; above this, payloads are dumped to a file
def new_run_id() -> str:
    """Return a fresh random (version 4) UUID rendered as a string."""
    fresh = uuid.uuid4()
    return str(fresh)
def _agent_run_path(tool_name: str, ext: str = "json") -> str:
    """Build a timestamped report path inside `paths.AGENT_RUNS_DIR`.

    The directory is created when missing. The file name has the form
    `<tool_name>_<YYYYmmdd_HHMMSS_microseconds>.<ext>`, using local time.
    """
    runs_dir = paths.AGENT_RUNS_DIR
    os.makedirs(runs_dir, exist_ok=True)
    stamp = f"{datetime.now():%Y%m%d_%H%M%S_%f}"
    file_name = f"{tool_name}_{stamp}.{ext}"
    return os.path.join(runs_dir, file_name)
def maybe_offload(tool_name: str, payload: Any) -> dict:
    """Return the payload inline when small, or spill it to disk when large.

    The payload is serialized with `json.dumps` (falling back to `str()` when
    it is not JSON-serializable) only to measure it and, if needed, to write it.

    Returns:
        `{ok, summary, details}` when the serialized text fits within
        LARGE_PAYLOAD_THRESHOLD characters, otherwise `{ok, summary,
        report_path}` where `report_path` is the file the text was written to.
    """
    try:
        text = json.dumps(payload, ensure_ascii=False)
    except (TypeError, ValueError):
        # Not JSON-serializable: measure/write its plain string form instead.
        text = str(payload)

    # Small enough: hand the original object back untouched.
    if len(text) <= LARGE_PAYLOAD_THRESHOLD:
        return {"ok": True, "summary": _summarize(payload), "details": payload}

    report_path = _agent_run_path(tool_name)
    with open(report_path, "w", encoding="utf-8") as handle:
        handle.write(text)
    return {"ok": True, "summary": _summarize(payload), "report_path": report_path}
def _summarize(payload: Any) -> dict:
    """Produce a compact description of *payload*.

    * list: its length plus the first three items.
    * dict: up to 20 top-level keys, the `summary` entry if present, and a
      `<key>_count` entry for every value that is a list.
    * anything else: the type name and the first 200 characters of `str()`.
    """
    if isinstance(payload, list):
        return {"type": "list", "count": len(payload), "first": payload[:3]}

    if not isinstance(payload, dict):
        return {"type": type(payload).__name__, "value": str(payload)[:200]}

    digest = {"type": "dict", "keys": list(payload)[:20]}
    if "summary" in payload:
        digest["summary"] = payload["summary"]
    # One length entry per list-valued key, in the payload's own key order.
    digest.update(
        {f"{key}_count": len(value) for key, value in payload.items() if isinstance(value, list)}
    )
    return digest
def run_python_script(
    name: str,
    args: list[str] | None = None,
    timeout: int = 600,
    expect_json: bool = False,
) -> dict:
    """Run a script from `scripts/` in a subprocess and report the outcome.

    Args:
        name: Script file name relative to SCRIPTS_DIR; ".py" is appended
            when missing. Names that resolve outside SCRIPTS_DIR are rejected.
        args: Extra command-line arguments handed to the script.
        timeout: Seconds to wait before the run is abandoned.
        expect_json: When true and stdout parses as JSON, the parsed value is
            passed through maybe_offload and its keys are merged into the
            result instead of the raw stdout/stderr tails.

    Returns:
        On a completed run: `{ok, exit_code, command, ...}` where `ok`
        always mirrors `exit_code == 0`. With parsed JSON the extra keys are
        `summary`, `details` or `report_path`, and `stderr_tail`; otherwise
        they are `stdout` (last 4000 chars) and `stderr` (last 1000 chars).
        On failure to start: `{ok: False, error, ...}` for a missing script,
        a script outside SCRIPTS_DIR, or a timeout.
    """
    if not name.endswith(".py"):
        name += ".py"
    script_path = os.path.join(SCRIPTS_DIR, name)

    # Refuse names such as "../x" or "/abs/x" that would execute a file
    # outside the scripts directory. Lexical check only, so symlinks placed
    # inside scripts/ keep working.
    scripts_root = os.path.abspath(SCRIPTS_DIR)
    try:
        common = os.path.commonpath([scripts_root, os.path.abspath(script_path)])
    except ValueError:
        # Raised e.g. for paths on different drives on Windows.
        common = ""
    if os.path.normcase(common) != os.path.normcase(scripts_root):
        return {"ok": False, "error": f"script outside scripts dir: {name}"}

    if not os.path.isfile(script_path):
        return {"ok": False, "error": f"script not found: {name}"}

    cmd = [PYTHON_EXE, script_path] + (args or [])
    try:
        proc = subprocess.run(
            cmd,
            capture_output=True,
            text=True,
            encoding="utf-8",
            errors="replace",
            timeout=timeout,
            cwd=_ROOT,
        )
    except subprocess.TimeoutExpired:
        return {"ok": False, "error": "timeout", "timeout_sec": timeout}

    succeeded = proc.returncode == 0
    out: dict[str, Any] = {
        "ok": succeeded,
        "exit_code": proc.returncode,
        "command": " ".join(args or []),
    }

    if expect_json and proc.stdout.strip():
        try:
            parsed = json.loads(proc.stdout)
        except json.JSONDecodeError:
            # stdout was not JSON after all: fall through to the raw tails.
            pass
        else:
            # Use only the file stem so a sub-directory in `name` cannot leak
            # into the offload file name.
            tool_name = os.path.splitext(os.path.basename(name))[0]
            out.update(maybe_offload(tool_name, parsed))
            # maybe_offload always reports ok=True; the exit code is the
            # authority on whether the script itself succeeded.
            out["ok"] = succeeded
            out["stderr_tail"] = proc.stderr[-500:] if proc.stderr else ""
            return out

    out["stdout"] = proc.stdout[-4000:]
    out["stderr"] = proc.stderr[-1000:]
    return out
# --- Specific adapters (direct Python function calls) ---
def list_accounts_adapter() -> list[dict]:
    """Return whatever `db.get_accounts()` yields, or `[]` when it is falsy."""
    import db

    accounts = db.get_accounts()
    return accounts if accounts else []
def get_account_adapter(location_id: str) -> dict | None:
    """Return the result of `db.get_account` for the given location id."""
    import db

    account = db.get_account(location_id)
    return account
def search_contacts_adapter(
    location_id: str,
    query: str | None = None,
    limit: int = 50,
    offset: int = 0,
    without_opp: bool = False,
) -> list[dict]:
    """Query contacts of one location through `db.get_contacts`.

    `query` is forwarded under the keyword `search_query`; `limit`, `offset`
    and `without_opp` are forwarded under their own names.
    """
    import db

    filters = {
        "search_query": query,
        "limit": limit,
        "offset": offset,
        "without_opp": without_opp,
    }
    return db.get_contacts(location_id, **filters)
def get_contact_adapter(location_id: str, contact_id: str) -> dict | None:
    """Return the result of `db.get_contact_by_id` for a location/contact pair."""
    import db

    contact = db.get_contact_by_id(location_id, contact_id)
    return contact
def get_opportunities_adapter(location_id: str, pipeline_id: str | None = None) -> list[dict]:
    """Return `db.get_opportunities` for a location, passing `pipeline_id` through."""
    import db

    opportunities = db.get_opportunities(location_id, pipeline_id=pipeline_id)
    return opportunities
def get_pipelines_adapter(location_id: str) -> list[dict]:
    """Return the result of `db.get_pipelines` for the given location id."""
    import db

    pipelines = db.get_pipelines(location_id)
    return pipelines
def get_account_metrics_adapter(location_id: str) -> dict:
    """Return the result of `db.get_account_metrics` for the given location id."""
    import db

    metrics = db.get_account_metrics(location_id)
    return metrics
def get_global_metrics_adapter() -> dict:
    """Return the result of `db.get_global_metrics()`."""
    import db

    metrics = db.get_global_metrics()
    return metrics
def get_workflows_adapter(location_id: str | None = None) -> list[dict]:
    """Return `db.get_workflows`, passing `location_id` (possibly None) by keyword."""
    import db

    workflows = db.get_workflows(location_id=location_id)
    return workflows
def get_sync_logs_adapter(limit: int = 20) -> list[dict]:
    """Return `db.get_sync_logs`, passing `limit` by keyword (default 20)."""
    import db

    entries = db.get_sync_logs(limit=limit)
    return entries
def get_error_logs_adapter(limit: int = 50) -> list[dict]:
    """Return `db.get_error_logs`, passing `limit` by keyword (default 50)."""
    import db

    entries = db.get_error_logs(limit=limit)
    return entries
def sync_missing_contacts_adapter(
    contact_ids: list[str] | None = None,
    dry_run: bool = True,
    run_id: str | None = None,
) -> dict:
    """Run the missing-contacts sync (branch -> Brand, per the original note).

    Defaults to `dry_run=True`. For a real run (`dry_run=False`) without a
    `run_id`, a new id is generated before delegating to
    `scripts.sync_missing_contacts_to_brand.run_sync`.
    NOTE(review): the original docstring says real runs are recorded in
    script_audit for rollback; that is not visible in this function — confirm.

    Returns:
        `{run_id, dry_run, result}` where `result` is what `run_sync` returned.
    """
    from scripts import sync_missing_contacts_to_brand as contacts_sync

    # Only a real run lacking an id gets one minted here.
    if not (dry_run or run_id):
        run_id = new_run_id()
    outcome = contacts_sync.run_sync(
        contact_ids=contact_ids,
        dry_run=dry_run,
        run_id=run_id,
    )
    return {"run_id": run_id, "dry_run": dry_run, "result": outcome}
def sync_missing_opps_adapter(
    opp_ids: list[str] | None = None,
    dry_run: bool = True,
    run_id: str | None = None,
) -> dict:
    """Run the missing-opportunities sync (branch -> Brand, per the original note).

    Defaults to `dry_run=True`. For a real run (`dry_run=False`) without a
    `run_id`, a new id is generated before delegating to
    `scripts.sync_missing_opps_to_brand.run_sync`.

    Returns:
        `{run_id, dry_run, result}` where `result` is what `run_sync` returned.
    """
    from scripts import sync_missing_opps_to_brand as opps_sync

    # Only a real run lacking an id gets one minted here.
    if not (dry_run or run_id):
        run_id = new_run_id()
    outcome = opps_sync.run_sync(
        opp_ids=opp_ids,
        dry_run=dry_run,
        run_id=run_id,
    )
    return {"run_id": run_id, "dry_run": dry_run, "result": outcome}
# --- Catalog of available scripts ---
def script_catalog() -> dict:
    """Return the parsed JSON stored at `paths.AGENT_AUDIT_REPORT`.

    Per the original note this is the script inventory with its audit status;
    the schema is not visible here.

    Returns:
        The decoded JSON document, or an `{"error": ...}` dict when the report
        file does not exist.

    Raises:
        json.JSONDecodeError: If the file exists but is not valid JSON.
        OSError: For failures other than a missing file (e.g. permissions).
    """
    # Open first and react to a missing file, rather than checking existence
    # beforehand: the file could vanish between the check and the open.
    try:
        report = open(paths.AGENT_AUDIT_REPORT, "r", encoding="utf-8")
    except FileNotFoundError:
        return {"error": "audit_report.json no existe — corre scripts/audit_agent_readiness.py"}
    with report:
        return json.load(report)