#!/usr/bin/env python3
"""Audit script compliance with MP custom-field safety rules."""

import argparse
import os
import re

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
SCRIPTS_DIR = os.path.join(ROOT_DIR, "scripts")
# Files that are never audited (this auditor itself contains the very
# patterns it searches for).
SKIP_FILES = {"common.py", "audit_script_compliance.py"}

# Any mention of custom fields in the script text.
_CUSTOM_FIELD_USE_RE = re.compile(
    r"customFields|custom_fields_json|custom_fields|field_id|fieldId"
)
# Calls/verbs that suggest the script writes data. PUT/POST are accepted with
# either quote style, consistent with the other quote-aware patterns below.
_MUTATION_RE = re.compile(
    r"update_contact|create_contact|update_opportunity|create_opportunity"
    r"|[\"']PUT[\"']|[\"']POST[\"']"
)
# Evidence that the script resolves the schema at runtime.
_SCHEMA_CALL_RE = re.compile(r"get_object_schema|get_schemas|/objects/")
# String literals (optionally prefixed f/u/r/b) that start an /objects/ URL.
_OBJECT_URL_RE = re.compile(
    r"[furbFURB]?[\"']https://services\.leadconnectorhq\.com/objects/[^\"']+|[furbFURB]?[\"']/objects/\{[^}]+\}|[furbFURB]?[\"']/objects/\w+"
)
# `customFields` bound to a dict literal. The optional quote after the name
# lets the pattern also match a quoted dict key (`"customFields": {`), not
# only `customFields = {` / `customFields={` / `customFields: {`.
_CUSTOM_FIELDS_DICT_RE = re.compile(r"customFields[\"']?\s*[:=]\s*\{")
# `{"id": "<8+ id-like chars>"` - a literal identifier inside a payload.
_HARDCODED_ID_RE = re.compile(
    r"\{\s*[\"']id[\"']\s*:\s*[\"'][A-Za-z0-9_-]{8,}[\"']"
)

# How many characters before an /objects/ literal are searched for evidence
# of a previous catalog lookup.
_CATALOG_LOOKBACK_CHARS = 600


def read_text(path: str) -> str:
    """Return the content of *path* decoded as UTF-8.

    Undecodable bytes are replaced instead of raising, so one badly encoded
    script cannot abort the audit.
    """
    with open(path, "r", encoding="utf-8", errors="replace") as fh:
        return fh.read()


def line_number(text: str, index: int) -> int:
    """Return the 1-based line number of character offset *index* in *text*."""
    return text.count("\n", 0, index) + 1


def line_at(text: str, index: int) -> str:
    """Return the full line of *text* containing offset *index*, without newline."""
    # rfind yields -1 when there is no earlier newline, so +1 gives offset 0.
    start = text.rfind("\n", 0, index) + 1
    end = text.find("\n", index)
    if end == -1:
        end = len(text)
    return text[start:end]


def find_issues(path: str) -> list:
    """Scan one script for custom-field safety problems.

    The checks are textual heuristics over the file content:

    * custom fields are referenced but no schema lookup appears anywhere;
    * an ``/objects/`` string literal appears without ``/objects/`` or
      ``get_objects_catalog`` in the 600 characters before it;
    * ``customFields`` is bound to a dict literal (including the quoted
      dict-key form ``"customFields": {``);
    * a payload contains a literal ``{"id": "..."}`` identifier on a line
      that is not a comment;
    * the script looks like it mutates custom fields but mentions neither
      ``--dry-run`` nor ``--apply``.

    Args:
        path: Path of the script to inspect.

    Returns:
        A sorted, de-duplicated list of ``(line, message)`` tuples. File-level
        findings are reported on line 1.
    """
    text = read_text(path)
    issues = []

    uses_custom_fields = bool(_CUSTOM_FIELD_USE_RE.search(text))
    mutates_custom_fields = (
        bool(_MUTATION_RE.search(text)) and "customFields" in text
    )
    schema_calls = bool(_SCHEMA_CALL_RE.search(text))

    if uses_custom_fields and not schema_calls:
        issues.append((1, "Lee o compara custom fields sin resolver schema dinamico en el script."))

    for match in _OBJECT_URL_RE.finditer(text):
        # Only the text shortly before the literal counts as "a previous
        # catalog lookup".
        prefix = text[max(0, match.start() - _CATALOG_LOOKBACK_CHARS):match.start()]
        if "/objects/" not in prefix and "get_objects_catalog" not in prefix:
            issues.append((line_number(text, match.start()), "Posible llamada a schema de objeto sin consulta previa al catalogo /objects/."))

    for match in _CUSTOM_FIELDS_DICT_RE.finditer(text):
        issues.append((line_number(text, match.start()), "customFields parece construirse como diccionario; GHL espera lista de {id, value}."))

    for match in _HARDCODED_ID_RE.finditer(text):
        # Ignore examples that live on comment lines.
        if line_at(text, match.start()).strip().startswith("#"):
            continue
        issues.append((line_number(text, match.start()), "Posible ID hardcodeado en payload de custom field."))

    if mutates_custom_fields and "--dry-run" not in text and "--apply" not in text:
        issues.append((1, "Script mutador de custom fields sin --dry-run o --apply evidente."))

    # The same finding can be produced more than once on one line.
    return sorted(set(issues))


def audit_scripts() -> list:
    """Audit every ``.py`` file directly inside ``SCRIPTS_DIR``.

    Files listed in ``SKIP_FILES`` and non-regular entries are ignored.

    Returns:
        A list of ``(filename, issues)`` pairs ordered by filename, where
        *issues* is the result of :func:`find_issues` for that file.
    """
    results = []
    for filename in sorted(os.listdir(SCRIPTS_DIR)):
        if not filename.endswith(".py") or filename in SKIP_FILES:
            continue
        path = os.path.join(SCRIPTS_DIR, filename)
        if not os.path.isfile(path):
            continue
        issues = find_issues(path)
        results.append((filename, issues))
    return results


def main() -> None:
    """Run the audit, print a report and optionally fail on findings.

    Raises:
        SystemExit: With code 1 when ``--fail-on-issues`` is given and at
            least one finding was reported.
    """
    parser = argparse.ArgumentParser(description="Audita cumplimiento tecnico de scripts MP.")
    parser.add_argument("--fail-on-issues", action="store_true", help="Retorna codigo 1 si encuentra hallazgos.")
    args = parser.parse_args()

    results = audit_scripts()
    issue_count = sum(len(issues) for _, issues in results)

    print("=== AUDITORIA DE CUMPLIMIENTO DE SCRIPTS ===")
    print(f"Scripts auditados: {len(results)}")
    print(f"Hallazgos: {issue_count}\n")

    for filename, issues in results:
        status = "OK" if not issues else "REVISAR"
        print(f"[{status}] {filename}")
        for line, message in issues:
            print(f" L{line}: {message}")

    if issue_count and args.fail_on_issues:
        raise SystemExit(1)


if __name__ == "__main__":
    main()