"""Adapters: invocan funciones internas del proyecto desde el MCP server. Filosofía: las tools del MCP llaman funciones Python directamente (no HTTP). Para scripts que no exportan función pública, usan subprocess con --json y parsean stdout. Los reportes grandes se vuelcan a `generated/agent/runs/` y la tool devuelve solo el path + summary para no inflar el contexto LLM. """ from __future__ import annotations import json import os import subprocess import sys import uuid from datetime import datetime from typing import Any # Asegurar que la raíz del repo esté en el PYTHONPATH cuando se ejecuta # `python -m mcp_server` desde el cwd del proyecto. _ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if _ROOT not in sys.path: sys.path.insert(0, _ROOT) import paths # noqa: E402 SCRIPTS_DIR = os.path.join(_ROOT, "scripts") PYTHON_EXE = sys.executable LARGE_PAYLOAD_THRESHOLD = 8000 # chars; sobre esto, volcamos a archivo def new_run_id() -> str: return str(uuid.uuid4()) def _agent_run_path(tool_name: str, ext: str = "json") -> str: os.makedirs(paths.AGENT_RUNS_DIR, exist_ok=True) ts = datetime.now().strftime("%Y%m%d_%H%M%S_%f") return os.path.join(paths.AGENT_RUNS_DIR, f"{tool_name}_{ts}.{ext}") def maybe_offload(tool_name: str, payload: Any) -> dict: """Si el payload es grande, lo escribe a disco y devuelve {path, summary}. Devuelve siempre un dict con la forma {ok, summary, details?, report_path?}. """ try: serialized = json.dumps(payload, ensure_ascii=False) except (TypeError, ValueError): serialized = str(payload) if len(serialized) > LARGE_PAYLOAD_THRESHOLD: path = _agent_run_path(tool_name) with open(path, "w", encoding="utf-8") as f: f.write(serialized) summary = _summarize(payload) return {"ok": True, "summary": summary, "report_path": path} return {"ok": True, "summary": _summarize(payload), "details": payload} def _summarize(payload: Any) -> dict: """Genera un summary compacto: claves top-level, conteos, primeras filas.""" if isinstance(payload, dict): out = {"type": "dict", "keys": list(payload.keys())[:20]} if "summary" in payload: out["summary"] = payload["summary"] for k, v in payload.items(): if isinstance(v, list): out[f"{k}_count"] = len(v) return out if isinstance(payload, list): return {"type": "list", "count": len(payload), "first": payload[:3]} return {"type": type(payload).__name__, "value": str(payload)[:200]} def run_python_script( name: str, args: list[str] | None = None, timeout: int = 600, expect_json: bool = False, ) -> dict: """Lanza un script de `scripts/` por subprocess. Devuelve {ok, exit_code, stdout, stderr, parsed?}. Si expect_json=True y el script imprimió JSON, lo parsea en `parsed` y aplica maybe_offload sobre él. """ if not name.endswith(".py"): name = name + ".py" script_path = os.path.join(SCRIPTS_DIR, name) if not os.path.isfile(script_path): return {"ok": False, "error": f"script not found: {name}"} cmd = [PYTHON_EXE, script_path] + (args or []) try: proc = subprocess.run( cmd, capture_output=True, text=True, encoding="utf-8", errors="replace", timeout=timeout, cwd=_ROOT, ) except subprocess.TimeoutExpired: return {"ok": False, "error": "timeout", "timeout_sec": timeout} out: dict[str, Any] = { "ok": proc.returncode == 0, "exit_code": proc.returncode, "command": " ".join(args or []), } if expect_json and proc.stdout.strip(): try: parsed = json.loads(proc.stdout) offload = maybe_offload(name[:-3], parsed) out.update(offload) out["stderr_tail"] = proc.stderr[-500:] if proc.stderr else "" return out except json.JSONDecodeError: pass out["stdout"] = proc.stdout[-4000:] out["stderr"] = proc.stderr[-1000:] return out # --- Adapters específicos (funciones Python directas) --- def list_accounts_adapter() -> list[dict]: """Devuelve cuentas conocidas desde la DB local.""" import db return db.get_accounts() or [] def get_account_adapter(location_id: str) -> dict | None: import db return db.get_account(location_id) def search_contacts_adapter( location_id: str, query: str | None = None, limit: int = 50, offset: int = 0, without_opp: bool = False, ) -> list[dict]: import db return db.get_contacts(location_id, search_query=query, limit=limit, offset=offset, without_opp=without_opp) def get_contact_adapter(location_id: str, contact_id: str) -> dict | None: import db return db.get_contact_by_id(location_id, contact_id) def get_opportunities_adapter(location_id: str, pipeline_id: str | None = None) -> list[dict]: import db return db.get_opportunities(location_id, pipeline_id=pipeline_id) def get_pipelines_adapter(location_id: str) -> list[dict]: import db return db.get_pipelines(location_id) def get_account_metrics_adapter(location_id: str) -> dict: import db return db.get_account_metrics(location_id) def get_global_metrics_adapter() -> dict: import db return db.get_global_metrics() def get_workflows_adapter(location_id: str | None = None) -> list[dict]: import db return db.get_workflows(location_id=location_id) def get_sync_logs_adapter(limit: int = 20) -> list[dict]: import db return db.get_sync_logs(limit=limit) def get_error_logs_adapter(limit: int = 50) -> list[dict]: import db return db.get_error_logs(limit=limit) def sync_missing_contacts_adapter( contact_ids: list[str] | None = None, dry_run: bool = True, run_id: str | None = None, ) -> dict: """Sincroniza contactos faltantes de sucursal → Marca. Defaults a dry_run=True. Si dry_run=False, requiere run_id (lo genera si no se pasa) y queda registrado en script_audit para rollback. """ from scripts import sync_missing_contacts_to_brand as mod if not dry_run and not run_id: run_id = new_run_id() result = mod.run_sync(contact_ids=contact_ids, dry_run=dry_run, run_id=run_id) return {"run_id": run_id, "dry_run": dry_run, "result": result} def sync_missing_opps_adapter( opp_ids: list[str] | None = None, dry_run: bool = True, run_id: str | None = None, ) -> dict: """Sincroniza oportunidades faltantes de sucursal → Marca.""" from scripts import sync_missing_opps_to_brand as mod if not dry_run and not run_id: run_id = new_run_id() result = mod.run_sync(opp_ids=opp_ids, dry_run=dry_run, run_id=run_id) return {"run_id": run_id, "dry_run": dry_run, "result": result} # --- Catálogo de scripts disponibles --- def script_catalog() -> dict: """Devuelve el inventario completo de scripts con su estado de auditoría.""" if not os.path.exists(paths.AGENT_AUDIT_REPORT): return {"error": "audit_report.json no existe — corre scripts/audit_agent_readiness.py"} with open(paths.AGENT_AUDIT_REPORT, "r", encoding="utf-8") as f: return json.load(f)