"""Audit del repo para preparar la capa agentica MCP. Category: audit Mutates: no Tool-safe: yes Recorre `main.py`, `script_runner.py` y `scripts/` para producir un reporte JSON con el estado de cada endpoint y script frente a su uso como tool LLM: - endpoints FastAPI (read/write, tool-safe heurístico) - scripts: presencia en SCRIPTS_METADATA, flags soportadas (--json, --dry-run, --apply, --run-id), docstring, clasificación mutador/read-only, decisión sugerida para los huérfanos - huecos de documentación (scripts no mencionados en CLAUDE.md / AGENTS.md) Salida: `generated/agent/audit_report.json` (sobreescribe). Stdout: resumen humano. Con `--json` imprime el reporte completo a stdout en lugar del resumen. Uso: python scripts/audit_agent_readiness.py # resumen + escribe JSON python scripts/audit_agent_readiness.py --json # JSON a stdout python scripts/audit_agent_readiness.py --fail-on-issues """ from __future__ import annotations import argparse import ast import json import os import re import sys from datetime import datetime, timezone BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) SCRIPTS_DIR = os.path.join(BASE_DIR, "scripts") sys.path.insert(0, BASE_DIR) import paths # noqa: E402 # Categorías de scripts por prefijo / patrón READ_ONLY_PREFIXES = ( "audit_", "analyze_", "find_", "check_", "health_check_", "compare_", "mp_contact_search", "mp_opportunity_search", "mp_opportunities_status", "mp_branches_deep_audit", "daily_summary_", "search_", "export_", "monitor_", "full_audit_", "full_autos_", "ghl_branch_analysis", ) MUTATOR_PREFIXES = ( "fix_", "migrate_", "move_", "update_", "sync_", "cleanup_", "reconcile_", "fill_", "create_", "apply_", "align_", "backfill_", "merge_", "dedupe_", "tag_", "run_origen_", "fuente_prospecto_", "canal_origen_", ) UTILITY_FILES = {"common.py", "email_otp_reader.py"} BROWSER_PREFIX = "ghl_browser_" def parse_args(): p = argparse.ArgumentParser(description=__doc__) p.add_argument("--json", action="store_true", help="Imprime el reporte completo a stdout.") p.add_argument("--fail-on-issues", action="store_true", help="Exit 1 si hay hallazgos.") return p.parse_args() def _extract_docstring(tree: ast.AST) -> str | None: return ast.get_docstring(tree) if isinstance(tree, ast.Module) else None def _has_argparse_flag(source: str, *flags: str) -> bool: return any(re.search(rf"['\"]{re.escape(f)}['\"]", source) for f in flags) def _calls_mutating_http(source: str) -> bool: # Heurística: requests.post/put/delete o ghl_client.* con métodos mutadores return bool(re.search(r"\b(requests|session)\.(post|put|delete|patch)\b", source)) or bool( re.search(r"ghl_client\.\w*(post|put|delete|create|update|delete)", source, re.I) ) def classify_script(name: str, source: str) -> dict: stem = name[:-3] if name.endswith(".py") else name is_utility = name in UTILITY_FILES is_browser = stem.startswith(BROWSER_PREFIX) if is_utility: category = "utility" elif is_browser: category = "browser" elif any(stem.startswith(p) for p in READ_ONLY_PREFIXES): category = "audit" elif any(stem.startswith(p) for p in MUTATOR_PREFIXES): category = "mutator" else: category = "unknown" has_mut_http = _calls_mutating_http(source) # Mutador efectivo = categoría mutator OR detecta llamadas HTTP mutadoras OR escribe en GHL via ghl_client is_mutator = category == "mutator" or (category not in ("utility",) and has_mut_http) return { "category": category, "is_mutator": is_mutator, "has_dry_run_flag": _has_argparse_flag(source, "--dry-run", "--apply"), "has_apply_flag": _has_argparse_flag(source, "--apply"), "has_run_id_flag": _has_argparse_flag(source, "--run-id"), "has_json_flag": _has_argparse_flag(source, "--json"), "calls_mutating_http": has_mut_http, } def audit_scripts(registered: set[str]) -> list[dict]: out = [] for name in sorted(os.listdir(SCRIPTS_DIR)): if not name.endswith(".py"): continue path = os.path.join(SCRIPTS_DIR, name) try: source = open(path, "r", encoding="utf-8").read() except OSError: continue try: tree = ast.parse(source) docstring = _extract_docstring(tree) except SyntaxError: docstring = None info = classify_script(name, source) registered_in_metadata = name in registered # Decisión sugerida para huérfanos if info["category"] == "utility": suggestion = "keep_utility" elif registered_in_metadata: suggestion = "ok" elif info["category"] == "browser": suggestion = "register" elif info["category"] in ("audit", "mutator"): suggestion = "register" else: suggestion = "review" # Issues por script issues = [] if info["is_mutator"]: if not info["has_apply_flag"]: issues.append("mutator-without-apply-flag") if not info["has_run_id_flag"]: issues.append("mutator-without-run-id") if info["category"] in ("audit", "mutator") and not info["has_json_flag"]: issues.append("missing-json-flag") if not docstring: issues.append("missing-docstring") elif info["category"] != "utility": # docstring header esperado: Category/Mutates/Tool-safe if "Category:" not in docstring or "Mutates:" not in docstring: issues.append("docstring-missing-header") out.append({ "name": name, "registered_in_metadata": registered_in_metadata, "docstring": (docstring.splitlines()[0] if docstring else None), "suggestion": suggestion, "issues": issues, **info, }) return out # --- Endpoints FastAPI --- _DECORATOR_METHODS = {"get", "post", "put", "delete", "patch"} def audit_endpoints() -> list[dict]: main_path = os.path.join(BASE_DIR, "main.py") try: source = open(main_path, "r", encoding="utf-8").read() tree = ast.parse(source) except (OSError, SyntaxError): return [] endpoints = [] for node in ast.walk(tree): if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): continue for dec in node.decorator_list: if not isinstance(dec, ast.Call): continue func = dec.func method = None if isinstance(func, ast.Attribute) and func.attr in _DECORATOR_METHODS: method = func.attr.upper() if not method or not dec.args: continue arg0 = dec.args[0] if not isinstance(arg0, ast.Constant) or not isinstance(arg0.value, str): continue path = arg0.value # Heurística tool-safe: descartar SSE/stream/exports binarios tool_safe = not any( token in path for token in ("/stream", "/sse", "/exports/") ) endpoints.append({ "method": method, "path": path, "function": node.name, "is_write": method in {"POST", "PUT", "DELETE", "PATCH"}, "tool_safe": tool_safe, }) return endpoints # --- Cobertura de docs --- def audit_doc_coverage(scripts: list[dict]) -> dict: docs = {} for fname in ("CLAUDE.md", "AGENTS.md"): fpath = os.path.join(BASE_DIR, fname) try: docs[fname] = open(fpath, "r", encoding="utf-8").read() except OSError: docs[fname] = "" coverage = {} for s in scripts: coverage[s["name"]] = { doc: (s["name"] in content or s["name"][:-3] in content) for doc, content in docs.items() } not_documented = [name for name, hits in coverage.items() if not any(hits.values())] return {"by_script": coverage, "not_documented_anywhere": sorted(not_documented)} def build_report() -> dict: import script_runner registered = set(script_runner.SCRIPTS_METADATA.keys()) scripts = audit_scripts(registered) endpoints = audit_endpoints() doc_cov = audit_doc_coverage(scripts) total_scripts = len(scripts) orphans = [s for s in scripts if not s["registered_in_metadata"] and s["category"] != "utility"] issues_total = sum(len(s["issues"]) for s in scripts) mutators_missing_apply = [ s["name"] for s in scripts if s["is_mutator"] and "mutator-without-apply-flag" in s["issues"] ] mutators_missing_run_id = [ s["name"] for s in scripts if s["is_mutator"] and "mutator-without-run-id" in s["issues"] ] missing_json = [s["name"] for s in scripts if "missing-json-flag" in s["issues"]] missing_docstring_header = [s["name"] for s in scripts if "docstring-missing-header" in s["issues"]] return { "generated_at": datetime.now(timezone.utc).isoformat(), "summary": { "scripts_total": total_scripts, "scripts_registered": sum(1 for s in scripts if s["registered_in_metadata"]), "scripts_orphan": len(orphans), "endpoints_total": len(endpoints), "endpoints_tool_safe": sum(1 for e in endpoints if e["tool_safe"]), "issues_total": issues_total, "mutators_missing_apply": mutators_missing_apply, "mutators_missing_run_id": mutators_missing_run_id, "scripts_missing_json": missing_json, "scripts_missing_docstring_header": missing_docstring_header, "scripts_not_documented": doc_cov["not_documented_anywhere"], }, "scripts": scripts, "endpoints": endpoints, "doc_coverage": doc_cov, } def print_summary(report: dict) -> None: s = report["summary"] print("=== MP Manager — Agent Readiness ===") print(f"Scripts: {s['scripts_total']} total, {s['scripts_registered']} registrados, {s['scripts_orphan']} huérfanos") print(f"Endpoints: {s['endpoints_total']} total, {s['endpoints_tool_safe']} tool-safe") print(f"Issues: {s['issues_total']}") if s["mutators_missing_apply"]: print(f" - Mutadores sin --apply: {len(s['mutators_missing_apply'])}") if s["mutators_missing_run_id"]: print(f" - Mutadores sin --run-id: {len(s['mutators_missing_run_id'])}") if s["scripts_missing_json"]: print(f" - Sin --json: {len(s['scripts_missing_json'])}") if s["scripts_missing_docstring_header"]: print(f" - Docstring sin header: {len(s['scripts_missing_docstring_header'])}") if s["scripts_not_documented"]: print(f" - No mencionados en docs: {len(s['scripts_not_documented'])}") print(f"\nReporte completo: {paths.AGENT_AUDIT_REPORT}") def main(): args = parse_args() os.makedirs(paths.AGENT_DIR, exist_ok=True) report = build_report() with open(paths.AGENT_AUDIT_REPORT, "w", encoding="utf-8") as f: json.dump(report, f, indent=2, ensure_ascii=False) if args.json: print(json.dumps(report, indent=2, ensure_ascii=False)) else: print_summary(report) if args.fail_on_issues and report["summary"]["issues_total"] > 0: return 1 return 0 if __name__ == "__main__": sys.exit(main())