"""Construcción del servidor FastMCP y registro de tools. Tools expuestas (ver docs/AGENT_TOOLS.md para el catálogo completo): Cuentas / metadatos: list_accounts, get_account, get_global_metrics, get_account_metrics Datos cacheados (SQLite local — refresca con sync_all_accounts): search_contacts, get_contact, get_opportunities, get_pipelines, get_workflows Operaciones (con dry_run por defecto, requieren confirm_token para aplicar): sync_missing_contacts, sync_missing_opps Operacional / observabilidad: sync_logs, error_logs, agent_audit_report, script_catalog Genéricas (cola larga — uso avanzado): run_script(name, args, expect_json) """ from __future__ import annotations import json import os from typing import Any from mcp.server.fastmcp import FastMCP from . import adapters from .manifest import write_manifest # Token literal que el LLM debe pasar como `confirm_token` para aplicar # cambios reales. Documentado en docs/AGENT_TOOLS.md y en la descripción # de cada tool mutadora. La idea es que el LLM lo solicite explícitamente # al usuario antes de invocar con apply=True. APPLY_CONFIRM_TOKEN = "I-HAVE-USER-CONFIRMATION" def _require_confirm(apply: bool, confirm_token: str | None) -> str | None: if not apply: return None if confirm_token != APPLY_CONFIRM_TOKEN: return ( f"apply=True requiere confirm_token='{APPLY_CONFIRM_TOKEN}'. " "Pide confirmación al usuario antes de pasarlo. Default es dry-run." ) return None def build_server() -> FastMCP: mcp = FastMCP("mp-manager") # Regenerar manifest al arrancar para que siempre refleje el estado actual. try: write_manifest() except Exception: # No bloqueamos el arranque si el manifest falla; el server sigue siendo útil. pass # ---------- Cuentas / metadatos ---------- @mcp.tool(description="Lista todas las cuentas (Marca + 49 sucursales) desde SQLite local. Read-only.") def list_accounts() -> dict: accounts = adapters.list_accounts_adapter() return adapters.maybe_offload("list_accounts", accounts) @mcp.tool(description="Detalle de una cuenta por location_id desde SQLite local.") def get_account(location_id: str) -> dict: acc = adapters.get_account_adapter(location_id) return {"ok": acc is not None, "account": acc} @mcp.tool(description="Métricas globales: total contactos, opps, sucursales activas. Read-only desde SQLite.") def get_global_metrics() -> dict: return {"ok": True, "metrics": adapters.get_global_metrics_adapter()} @mcp.tool(description="Métricas de una sucursal (contactos, opps por status, pipelines). Read-only desde SQLite.") def get_account_metrics(location_id: str) -> dict: return {"ok": True, "metrics": adapters.get_account_metrics_adapter(location_id)} # ---------- Datos cacheados ---------- @mcp.tool( description=( "Busca contactos en una location (cache SQLite). " "query matchea nombre/email/teléfono. " "without_opp=True filtra contactos sin oportunidad." ) ) def search_contacts( location_id: str, query: str | None = None, limit: int = 50, offset: int = 0, without_opp: bool = False, ) -> dict: rows = adapters.search_contacts_adapter( location_id, query=query, limit=limit, offset=offset, without_opp=without_opp ) return adapters.maybe_offload("search_contacts", rows) @mcp.tool(description="Detalle completo de un contacto por (location_id, contact_id) desde SQLite.") def get_contact(location_id: str, contact_id: str) -> dict: c = adapters.get_contact_adapter(location_id, contact_id) return {"ok": c is not None, "contact": c} @mcp.tool(description="Oportunidades de una location (opcionalmente filtra por pipeline_id). Cache SQLite.") def get_opportunities(location_id: str, pipeline_id: str | None = None) -> dict: rows = adapters.get_opportunities_adapter(location_id, pipeline_id=pipeline_id) return adapters.maybe_offload("get_opportunities", rows) @mcp.tool(description="Pipelines y etapas de una location. Cache SQLite.") def get_pipelines(location_id: str) -> dict: rows = adapters.get_pipelines_adapter(location_id) return {"ok": True, "pipelines": rows} @mcp.tool(description="Workflows de una location (o de todas si se omite location_id). Cache SQLite.") def get_workflows(location_id: str | None = None) -> dict: rows = adapters.get_workflows_adapter(location_id=location_id) return adapters.maybe_offload("get_workflows", rows) # ---------- Operaciones (mutadoras con guard) ---------- @mcp.tool( description=( "Sincroniza contactos faltantes de sucursales hacia Marca. " "Default dry_run=True (planifica, no aplica). " "Para apply=True debes pasar confirm_token='I-HAVE-USER-CONFIRMATION' tras pedir confirmación al usuario. " "Genera run_id automático y registra en script_audit para rollback." ) ) def sync_missing_contacts( contact_ids: list[str] | None = None, apply: bool = False, confirm_token: str | None = None, ) -> dict: err = _require_confirm(apply, confirm_token) if err: return {"ok": False, "error": err} result = adapters.sync_missing_contacts_adapter( contact_ids=contact_ids, dry_run=not apply ) return adapters.maybe_offload("sync_missing_contacts", result) @mcp.tool( description=( "Sincroniza oportunidades faltantes de sucursales hacia Marca. " "Default dry_run=True. Para apply=True requiere confirm_token. " "Genera run_id y registra en script_audit para rollback." ) ) def sync_missing_opps( opp_ids: list[str] | None = None, apply: bool = False, confirm_token: str | None = None, ) -> dict: err = _require_confirm(apply, confirm_token) if err: return {"ok": False, "error": err} result = adapters.sync_missing_opps_adapter( opp_ids=opp_ids, dry_run=not apply ) return adapters.maybe_offload("sync_missing_opps", result) # ---------- Observabilidad ---------- @mcp.tool(description="Logs recientes de sincronización (última corrida por sucursal). Read-only.") def sync_logs(limit: int = 20) -> dict: return {"ok": True, "logs": adapters.get_sync_logs_adapter(limit=limit)} @mcp.tool(description="Errores recientes (tabla error_log + errors.jsonl). Read-only.") def error_logs(limit: int = 50) -> dict: return adapters.maybe_offload("error_logs", adapters.get_error_logs_adapter(limit=limit)) @mcp.tool( description=( "Reporte de salud agentica: scripts, endpoints, issues. " "Regenera si stale_seconds>0 o si no existe. Lectura del JSON generado por scripts/audit_agent_readiness.py." ) ) def agent_audit_report() -> dict: report = adapters.script_catalog() return adapters.maybe_offload("agent_audit_report", report) @mcp.tool( description=( "Catálogo de tools y scripts disponibles (lectura del tools_manifest.json). " "Útil al inicio de una sesión para saber qué herramientas existen." ) ) def script_catalog() -> dict: path = os.path.join(os.path.dirname(__file__), "..", "generated", "agent", "tools_manifest.json") path = os.path.abspath(path) try: with open(path, "r", encoding="utf-8") as f: manifest = json.load(f) except (OSError, json.JSONDecodeError) as e: return {"ok": False, "error": str(e)} return adapters.maybe_offload("script_catalog", manifest) # ---------- Genérica (cola larga) ---------- @mcp.tool( description=( "Ejecuta cualquier script de scripts/ por subprocess. " "Para scripts mutadores debes pasar apply=True + confirm_token, y los args correctos del script (típicamente --apply --run-id ...). " "Si expect_json=True intenta parsear stdout como JSON y vuelca a generated/agent/runs/ si es grande. " "Lista de scripts disponibles en `script_catalog` / `agent_audit_report`." ) ) def run_script( name: str, args: list[str] | None = None, expect_json: bool = False, apply: bool = False, confirm_token: str | None = None, timeout_sec: int = 600, ) -> dict: err = _require_confirm(apply, confirm_token) if err: return {"ok": False, "error": err} return adapters.run_python_script( name, args=args, timeout=timeout_sec, expect_json=expect_json ) return mcp