# File: MP-Manager/scripts/audit_agent_readiness.py
# Snapshot: 2026-05-30 14:31:19 -06:00 · 310 lines · 11 KiB · Python
"""Audit del repo para preparar la capa agentica MCP.
Category: audit
Mutates: no
Tool-safe: yes
Recorre `main.py`, `script_runner.py` y `scripts/` para producir un reporte JSON
con el estado de cada endpoint y script frente a su uso como tool LLM:
- endpoints FastAPI (read/write, tool-safe heurístico)
- scripts: presencia en SCRIPTS_METADATA, flags soportadas (--json, --dry-run,
--apply, --run-id), docstring, clasificación mutador/read-only, decisión
sugerida para los huérfanos
- huecos de documentación (scripts no mencionados en CLAUDE.md / AGENTS.md)
Salida: `generated/agent/audit_report.json` (sobreescribe). Stdout: resumen
humano. Con `--json` imprime el reporte completo a stdout en lugar del resumen.
Uso:
python scripts/audit_agent_readiness.py # resumen + escribe JSON
python scripts/audit_agent_readiness.py --json # JSON a stdout
python scripts/audit_agent_readiness.py --fail-on-issues
"""
from __future__ import annotations
import argparse
import ast
import json
import os
import re
import sys
from datetime import datetime, timezone
# Repository root: the parent of the directory that contains this file.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Directory whose *.py files are inspected by audit_scripts().
SCRIPTS_DIR = os.path.join(BASE_DIR, "scripts")
# Put the repository root first on sys.path before the project import below.
# NOTE(review): presumably `paths` lives at the repository root — confirm.
sys.path.insert(0, BASE_DIR)
# `paths` supplies AGENT_DIR and AGENT_AUDIT_REPORT (used by main/print_summary).
import paths # noqa: E402
# Script categories, decided from the file-name stem (see classify_script).

# Stems starting with any of these are classified as "audit" (read-only).
READ_ONLY_PREFIXES = (
    "audit_",
    "analyze_",
    "find_",
    "check_",
    "health_check_",
    "compare_",
    "mp_contact_search",
    "mp_opportunity_search",
    "mp_opportunities_status",
    "mp_branches_deep_audit",
    "daily_summary_",
    "search_",
    "export_",
    "monitor_",
    "full_audit_",
    "full_autos_",
    "ghl_branch_analysis",
)

# Stems starting with any of these are classified as "mutator".
MUTATOR_PREFIXES = (
    "fix_",
    "migrate_",
    "move_",
    "update_",
    "sync_",
    "cleanup_",
    "reconcile_",
    "fill_",
    "create_",
    "apply_",
    "align_",
    "backfill_",
    "merge_",
    "dedupe_",
    "tag_",
    "run_origen_",
    "fuente_prospecto_",
    "canal_origen_",
)

# Exact file names classified as "utility"; this check wins over every prefix.
UTILITY_FILES = {
    "common.py",
    "email_otp_reader.py",
}

# Stems starting with this prefix are classified as "browser".
BROWSER_PREFIX = "ghl_browser_"
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the audit script.

    Args:
        argv: Optional list of argument strings to parse. When ``None``
            (the default) argparse reads ``sys.argv[1:]``, which is what
            the script entry point relies on.

    Returns:
        A namespace with two boolean attributes: ``json`` (``--json``) and
        ``fail_on_issues`` (``--fail-on-issues``); both default to ``False``.
    """
    parser = argparse.ArgumentParser(
        description=__doc__,
        # The module docstring carries a bullet list and usage examples; the
        # default formatter would re-wrap them into a single paragraph.
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "--json",
        action="store_true",
        help="Imprime el reporte completo a stdout.",
    )
    parser.add_argument(
        "--fail-on-issues",
        action="store_true",
        help="Exit 1 si hay hallazgos.",
    )
    return parser.parse_args(argv)
def _extract_docstring(tree: ast.AST) -> str | None:
    """Return the module-level docstring of a parsed tree.

    Args:
        tree: Any AST node; only ``ast.Module`` nodes are inspected.

    Returns:
        The docstring as returned by ``ast.get_docstring``, or ``None`` when
        *tree* is not a module or the module has no docstring.
    """
    if not isinstance(tree, ast.Module):
        return None
    return ast.get_docstring(tree)
def _has_argparse_flag(source: str, *flags: str) -> bool:
    """Tell whether any of *flags* appears as a quoted literal in *source*.

    The match is purely textual: a flag counts only when it is immediately
    surrounded by a single or double quote on each side, which is how it
    looks inside an ``add_argument("--flag", ...)`` call.

    Args:
        source: Full text of the script being inspected.
        *flags: Flag spellings to look for, e.g. ``"--json"``.

    Returns:
        ``True`` as soon as one flag is found; ``False`` otherwise, including
        when no flags are given.
    """
    for flag in flags:
        quoted_flag = rf"['\"]{re.escape(flag)}['\"]"
        if re.search(quoted_flag, source) is not None:
            return True
    return False
def _calls_mutating_http(source: str) -> bool:
    """Heuristically detect mutating HTTP calls in a script's source text.

    Two textual patterns are checked:

    * ``requests.<verb>`` / ``session.<verb>`` where the verb is post, put,
      delete or patch (case-sensitive, whole word).
    * ``ghl_client.<name>`` where the attribute name contains post, put,
      patch, delete, create or update (case-insensitive).

    Args:
        source: Full text of the script being inspected.

    Returns:
        ``True`` when either pattern is found, ``False`` otherwise.
    """
    if re.search(r"\b(requests|session)\.(post|put|delete|patch)\b", source):
        return True
    # The verb list used to repeat "delete" and leave out "patch", so PATCH
    # helpers on ghl_client went undetected although requests.patch was caught.
    client_call = re.search(
        r"ghl_client\.\w*(post|put|patch|delete|create|update)", source, re.I
    )
    return client_call is not None
def classify_script(name: str, source: str) -> dict:
    """Classify one script by file name and by a scan of its source text.

    The category is chosen with this precedence: exact utility file name,
    browser prefix, read-only prefix ("audit"), mutator prefix, and finally
    "unknown".

    Args:
        name: File name of the script (normally ending in ``.py``).
        source: Full text of the script.

    Returns:
        A dict with the keys ``category``, ``is_mutator``,
        ``has_dry_run_flag``, ``has_apply_flag``, ``has_run_id_flag``,
        ``has_json_flag`` and ``calls_mutating_http``.
    """
    stem = name[: -len(".py")] if name.endswith(".py") else name

    if name in UTILITY_FILES:
        category = "utility"
    elif stem.startswith(BROWSER_PREFIX):
        category = "browser"
    elif stem.startswith(READ_ONLY_PREFIXES):
        category = "audit"
    elif stem.startswith(MUTATOR_PREFIXES):
        category = "mutator"
    else:
        category = "unknown"

    mutating_http = _calls_mutating_http(source)
    # Effective mutator: the name says so, or a non-utility script contains
    # mutating HTTP calls regardless of how it is named.
    if category == "mutator":
        effective_mutator = True
    else:
        effective_mutator = category != "utility" and mutating_http

    return {
        "category": category,
        "is_mutator": effective_mutator,
        # Supporting --apply also counts as having a dry-run mechanism.
        "has_dry_run_flag": _has_argparse_flag(source, "--dry-run", "--apply"),
        "has_apply_flag": _has_argparse_flag(source, "--apply"),
        "has_run_id_flag": _has_argparse_flag(source, "--run-id"),
        "has_json_flag": _has_argparse_flag(source, "--json"),
        "calls_mutating_http": mutating_http,
    }
def audit_scripts(registered: set[str]) -> list[dict]:
    """Inspect every ``*.py`` entry in ``SCRIPTS_DIR`` and describe its readiness.

    Args:
        registered: File names considered registered; a script counts as
            registered when its file name is a member of this set.

    Returns:
        One dict per readable script, ordered by file name. Each dict holds
        ``name``, ``registered_in_metadata``, ``docstring`` (first line of the
        module docstring or ``None``), ``suggestion``, ``issues`` and every
        key produced by ``classify_script``. Entries that cannot be read or
        decoded as UTF-8 are skipped.
    """
    results = []
    for name in sorted(os.listdir(SCRIPTS_DIR)):
        if not name.endswith(".py"):
            continue
        path = os.path.join(SCRIPTS_DIR, name)
        try:
            # Context manager so the handle is closed deterministically.
            with open(path, "r", encoding="utf-8") as fh:
                source = fh.read()
        except (OSError, UnicodeDecodeError):
            # Best effort: one unreadable or non-UTF-8 file must not abort
            # the audit of the remaining scripts.
            continue
        try:
            docstring = _extract_docstring(ast.parse(source))
        except (SyntaxError, ValueError):
            # ValueError: some Python versions raise it for sources that
            # contain NUL bytes instead of SyntaxError.
            docstring = None

        info = classify_script(name, source)
        category = info["category"]
        registered_in_metadata = name in registered

        # Suggested action, mainly relevant for unregistered (orphan) scripts.
        if category == "utility":
            suggestion = "keep_utility"
        elif registered_in_metadata:
            suggestion = "ok"
        elif category in ("browser", "audit", "mutator"):
            suggestion = "register"
        else:
            suggestion = "review"

        # Per-script findings.
        issues = []
        if info["is_mutator"]:
            if not info["has_apply_flag"]:
                issues.append("mutator-without-apply-flag")
            if not info["has_run_id_flag"]:
                issues.append("mutator-without-run-id")
        if category in ("audit", "mutator") and not info["has_json_flag"]:
            issues.append("missing-json-flag")
        if not docstring:
            issues.append("missing-docstring")
        elif category != "utility":
            # Expected docstring header fields: Category / Mutates.
            if "Category:" not in docstring or "Mutates:" not in docstring:
                issues.append("docstring-missing-header")

        results.append({
            "name": name,
            "registered_in_metadata": registered_in_metadata,
            "docstring": docstring.splitlines()[0] if docstring else None,
            "suggestion": suggestion,
            "issues": issues,
            **info,
        })
    return results
# --- FastAPI endpoints ---
# Decorator attribute names treated as HTTP route registrations.
_DECORATOR_METHODS = {"get", "post", "put", "delete", "patch"}


def audit_endpoints() -> list[dict]:
    """List the route handlers declared in ``<BASE_DIR>/main.py``.

    A function counts as an endpoint when it has a decorator of the form
    ``<anything>.<http-verb>("<literal path>", ...)``. Decorators whose first
    positional argument is not a string literal are ignored.

    Returns:
        One dict per matching decorator with ``method`` (upper-case verb),
        ``path``, ``function`` (handler name), ``is_write`` (any verb other
        than GET) and ``tool_safe`` (``False`` when the path contains
        ``/stream``, ``/sse`` or ``/exports/``). An empty list is returned
        when ``main.py`` is missing, unreadable, not valid UTF-8 or not
        parseable.
    """
    main_path = os.path.join(BASE_DIR, "main.py")
    try:
        # Context manager so the handle is closed deterministically.
        with open(main_path, "r", encoding="utf-8") as fh:
            tree = ast.parse(fh.read())
    except (OSError, SyntaxError, ValueError):
        # ValueError covers undecodable bytes (UnicodeDecodeError) and the
        # NUL-byte error some Python versions raise from ast.parse.
        return []

    endpoints = []
    for node in ast.walk(tree):
        if not isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            continue
        for dec in node.decorator_list:
            if not isinstance(dec, ast.Call) or not dec.args:
                continue
            func = dec.func
            if not isinstance(func, ast.Attribute) or func.attr not in _DECORATOR_METHODS:
                continue
            route = dec.args[0]
            if not isinstance(route, ast.Constant) or not isinstance(route.value, str):
                continue
            method = func.attr.upper()
            path = route.value
            # Tool-safe heuristic: rule out SSE/stream and export routes.
            tool_safe = not any(
                token in path for token in ("/stream", "/sse", "/exports/")
            )
            endpoints.append({
                "method": method,
                "path": path,
                "function": node.name,
                "is_write": method in {"POST", "PUT", "DELETE", "PATCH"},
                "tool_safe": tool_safe,
            })
    return endpoints
# --- Documentation coverage ---
def audit_doc_coverage(scripts: list[dict]) -> dict:
    """Report which scripts are mentioned in ``CLAUDE.md`` / ``AGENTS.md``.

    Both documents are read from ``BASE_DIR``. A script counts as mentioned
    in a document when either its file name or its stem (file name without
    the ``.py`` suffix) occurs anywhere in the document text; this is a plain
    substring test.

    Args:
        scripts: Script entries; only the ``"name"`` key of each is read.

    Returns:
        A dict with ``by_script`` (script name -> {document name -> bool})
        and ``not_documented_anywhere`` (sorted names with no hit at all).
        A document that cannot be opened is treated as empty.
    """
    docs = {}
    for fname in ("CLAUDE.md", "AGENTS.md"):
        fpath = os.path.join(BASE_DIR, fname)
        try:
            # errors="replace": stray non-UTF-8 bytes in a doc must not crash
            # the audit; the names being searched for are unaffected.
            with open(fpath, "r", encoding="utf-8", errors="replace") as fh:
                docs[fname] = fh.read()
        except OSError:
            docs[fname] = ""

    coverage = {}
    for entry in scripts:
        name = entry["name"]
        # Only strip a real ".py" suffix; blindly dropping three characters
        # would turn other names into short fragments that match by accident.
        stem = name[:-3] if name.endswith(".py") else name
        coverage[name] = {
            doc: (name in content or stem in content)
            for doc, content in docs.items()
        }

    not_documented = sorted(
        name for name, hits in coverage.items() if not any(hits.values())
    )
    return {"by_script": coverage, "not_documented_anywhere": not_documented}
def build_report() -> dict:
    """Run every audit and assemble the complete report.

    Returns:
        A dict with ``generated_at`` (UTC ISO-8601 timestamp), ``summary``
        (counts plus lists of affected script names), ``scripts``,
        ``endpoints`` and ``doc_coverage``.
    """
    # NOTE(review): imported inside the function, presumably so that loading
    # this module does not require script_runner — confirm.
    import script_runner

    registered = set(script_runner.SCRIPTS_METADATA.keys())
    scripts = audit_scripts(registered)
    endpoints = audit_endpoints()
    doc_cov = audit_doc_coverage(scripts)

    def flagged(issue: str, *, mutators_only: bool = False) -> list:
        """Names of the scripts that carry *issue*, in audit order."""
        return [
            entry["name"]
            for entry in scripts
            if (entry["is_mutator"] or not mutators_only) and issue in entry["issues"]
        ]

    # Orphans: unregistered scripts, not counting utility modules.
    orphan_count = len([
        entry for entry in scripts
        if not entry["registered_in_metadata"] and entry["category"] != "utility"
    ])
    registered_count = len([e for e in scripts if e["registered_in_metadata"]])
    tool_safe_count = len([e for e in endpoints if e["tool_safe"]])

    summary = {
        "scripts_total": len(scripts),
        "scripts_registered": registered_count,
        "scripts_orphan": orphan_count,
        "endpoints_total": len(endpoints),
        "endpoints_tool_safe": tool_safe_count,
        "issues_total": sum(len(entry["issues"]) for entry in scripts),
        "mutators_missing_apply": flagged("mutator-without-apply-flag", mutators_only=True),
        "mutators_missing_run_id": flagged("mutator-without-run-id", mutators_only=True),
        "scripts_missing_json": flagged("missing-json-flag"),
        "scripts_missing_docstring_header": flagged("docstring-missing-header"),
        "scripts_not_documented": doc_cov["not_documented_anywhere"],
    }
    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "summary": summary,
        "scripts": scripts,
        "endpoints": endpoints,
        "doc_coverage": doc_cov,
    }
def print_summary(report: dict) -> None:
    """Print the human-readable summary of *report* to stdout.

    Totals are always printed; each per-issue line appears only when the
    corresponding list in ``report["summary"]`` is non-empty. The last line
    shows the path stored in ``paths.AGENT_AUDIT_REPORT``.

    Args:
        report: Report dict as returned by ``build_report``.
    """
    summary = report["summary"]
    print("=== MP Manager — Agent Readiness ===")

    total = summary["scripts_total"]
    registered = summary["scripts_registered"]
    orphan = summary["scripts_orphan"]
    print(f"Scripts: {total} total, {registered} registrados, {orphan} huérfanos")

    endpoints_total = summary["endpoints_total"]
    endpoints_safe = summary["endpoints_tool_safe"]
    print(f"Endpoints: {endpoints_total} total, {endpoints_safe} tool-safe")

    issues_total = summary["issues_total"]
    print(f"Issues: {issues_total}")

    missing_apply = summary["mutators_missing_apply"]
    if missing_apply:
        print(f" - Mutadores sin --apply: {len(missing_apply)}")

    missing_run_id = summary["mutators_missing_run_id"]
    if missing_run_id:
        print(f" - Mutadores sin --run-id: {len(missing_run_id)}")

    missing_json = summary["scripts_missing_json"]
    if missing_json:
        print(f" - Sin --json: {len(missing_json)}")

    missing_header = summary["scripts_missing_docstring_header"]
    if missing_header:
        print(f" - Docstring sin header: {len(missing_header)}")

    undocumented = summary["scripts_not_documented"]
    if undocumented:
        print(f" - No mencionados en docs: {len(undocumented)}")

    print(f"\nReporte completo: {paths.AGENT_AUDIT_REPORT}")
def main():
    """Build the report, write it to disk and print it or its summary.

    The report is always written as JSON to ``paths.AGENT_AUDIT_REPORT``
    (after making sure ``paths.AGENT_DIR`` exists). With ``--json`` the full
    report goes to stdout; otherwise the human-readable summary does.

    Returns:
        ``1`` when ``--fail-on-issues`` was given and the report counts at
        least one issue, ``0`` otherwise.
    """
    args = parse_args()
    os.makedirs(paths.AGENT_DIR, exist_ok=True)
    report = build_report()

    with open(paths.AGENT_AUDIT_REPORT, "w", encoding="utf-8") as out_file:
        json.dump(report, out_file, indent=2, ensure_ascii=False)

    if not args.json:
        print_summary(report)
    else:
        print(json.dumps(report, indent=2, ensure_ascii=False))

    has_failure = args.fail_on_issues and report["summary"]["issues_total"] > 0
    return 1 if has_failure else 0


if __name__ == "__main__":
    sys.exit(main())