#!/usr/bin/env python3
"""Audit custom fields of the main Brand account (Monte Providencia) vs. branches.

Compares the custom-field schema of contact + opportunity between the Brand
account (the reference) and each branch. Emits categorised findings with a
suggested action (``fix_action``) to inform a later repair step.

Output: detailed JSON + XLSX for human review.

Finding categories:
    MISSING_IN_BRANCH             exists in Brand, not in the branch
    EXTRA_IN_BRANCH               exists in the branch, not in Brand
    MATCH_FIELDKEY_DIFF_NAME      same fieldKey, different name
    MATCH_FIELDKEY_DIFF_OPTIONS   same keys, different options
    MATCH_FIELDKEY_DIFF_DATATYPE  same fieldKey, different dataType (critical)
    MATCH_NAME_DIFF_FIELDKEY      same normalized name, different fieldKeys
    DUPLICATE_IN_BRANCH           branch has several fields with the same fieldKey
    DUPLICATE_IN_BRAND            Brand has duplicates (anomaly to review)
    FUZZY_NAME_MATCH              similar name (>= threshold) without exact match
    FETCH_ERROR                   an object could not be fetched/audited for a branch

Possible ``fix_action`` values: CREATE_IN_BRANCH, RENAME, ADD_OPTIONS,
MANUAL_REVIEW, IMPOSSIBLE (a dataType change requires delete + recreate).
"""
import argparse
import datetime
import difflib
import json
import os
import sys
import unicodedata
import warnings
from collections import defaultdict

warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")

# Make the parent of this script's directory importable so that the project
# modules below resolve when the script is run directly.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)

import sync_engine  # noqa: E402
import script_audit  # noqa: E402

BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
DEFAULT_FUZZY_THRESHOLD = 0.85

# Finding categories whose generic ``field_name`` / ``fieldKey`` entries
# describe the BRANCH field rather than the Brand field.
_BRANCH_SIDE_CATEGORIES = frozenset({"EXTRA_IN_BRANCH", "FUZZY_NAME_MATCH"})


def normalize_name(value):
    """Return *value* lower-cased, accent-stripped and whitespace-collapsed.

    ``None`` and other falsy values normalize to the empty string.
    """
    s = str(value or "").strip().lower()
    # NFKD splits accented characters into base + combining mark; dropping the
    # combining marks removes the accents.
    s = unicodedata.normalize("NFKD", s)
    s = "".join(c for c in s if not unicodedata.combining(c))
    return " ".join(s.split())


def fuzzy_ratio(a, b):
    """Return the SequenceMatcher ratio of the two normalized names.

    Returns 0.0 when either name normalizes to the empty string.
    """
    na, nb = normalize_name(a), normalize_name(b)
    if not na or not nb:
        return 0.0
    return difflib.SequenceMatcher(None, na, nb).ratio()


def fetch_custom_fields(account, object_key):
    """Return only the custom (non-standard) fields of the object schema.

    ``account`` must provide the ``token`` and ``location_id`` keys. Entries
    without ``id`` or ``name``, flagged ``standard``, or whose ``dataType`` is
    ``STANDARD_FIELD`` are dropped.
    """
    raw = sync_engine.ghl_client.get_object_schema_fields(
        account["token"], account["location_id"], object_key
    )
    return [
        f
        for f in (raw or [])
        if f.get("id")
        and f.get("name")
        and not f.get("standard")
        and f.get("dataType") != "STANDARD_FIELD"
    ]


def index_by(fields, key_fn):
    """Group *fields* into ``{key: [field, ...]}``; falsy keys are skipped."""
    idx = defaultdict(list)
    for f in fields:
        k = key_fn(f)
        if k:
            idx[k].append(f)
    return idx


def extract_option_labels(field):
    """Return the option labels of a picklist field as a list of strings.

    The V2 schema API exposes ``options: [{key, label}, ...]``. Some older
    clients used ``picklistOptions: [str]``; that shape is still supported in
    case it reappears.

    Every label is coerced to ``str`` on all paths so that callers can safely
    put labels in sets, sort them and join them.
    """
    raw = field.get("options")
    if isinstance(raw, list) and raw:
        labels = []
        for opt in raw:
            if isinstance(opt, dict):
                label = opt.get("label")
                if label is None:
                    label = opt.get("key")
                if label is not None:
                    labels.append(str(label))
            elif opt is not None:
                labels.append(str(opt))
        return labels
    legacy = field.get("picklistOptions")
    if isinstance(legacy, list):
        return [str(o) for o in legacy if o is not None]
    return []


def options_diff(brand_opts, branch_opts):
    """Return ``(missing_in_branch, extra_in_branch)`` as sorted lists."""
    brand_set = set(brand_opts or [])
    branch_set = set(branch_opts or [])
    return sorted(brand_set - branch_set), sorted(branch_set - brand_set)


def compare_matched_pair(brand_field, branch_field, object_key, fieldkey_hint=None):
    """Compare two fields already known to be the "same" logical field.

    Emits findings for attribute-by-attribute differences.

    ``fieldkey_hint`` is passed when the match was made by fieldKey (layer 1);
    in that case a name difference justifies a RENAME. When it is ``None``
    (layer 2, matched by name) the fieldKeys differ and RENAME does not apply.

    Returns a list of finding dicts (possibly empty).
    """
    findings = []
    brand_name = brand_field.get("name")
    branch_name = branch_field.get("name")
    base = {
        "object": object_key,
        "field_name": brand_name,
        "brand_field_id": brand_field.get("id"),
        "branch_field_id": branch_field.get("id"),
        "fieldKey": fieldkey_hint or brand_field.get("fieldKey"),
    }

    if brand_field.get("dataType") != branch_field.get("dataType"):
        findings.append({
            **base,
            "category": "MATCH_FIELDKEY_DIFF_DATATYPE",
            "fix_action": "IMPOSSIBLE",
            "brand_dataType": brand_field.get("dataType"),
            "branch_dataType": branch_field.get("dataType"),
            "details": (
                f"dataType {brand_field.get('dataType')} en Marca vs "
                f"{branch_field.get('dataType')} en sucursal. La API ignora "
                "cambios de dataType — requiere delete+recreate (data loss)."
            ),
        })
        # If the dataType differs there is no point in comparing options.
        return findings

    if brand_name != branch_name and fieldkey_hint is not None:
        # Only emit RENAME when the match was made by fieldKey -- then it makes
        # sense to align the name. When fieldkey_hint is None the match was by
        # normalized name, the fieldKeys differ, and the parent finding
        # MATCH_NAME_DIFF_FIELDKEY already covers the case.
        findings.append({
            **base,
            "category": "MATCH_FIELDKEY_DIFF_NAME",
            "fix_action": "RENAME",
            "brand_name": brand_name,
            "branch_name": branch_name,
            "details": f"Nombres distintos. Mismo fieldKey {fieldkey_hint!r}.",
        })

    if brand_field.get("dataType") in ("SINGLE_OPTIONS", "MULTIPLE_OPTIONS", "RADIO", "CHECKBOX"):
        missing, extra = options_diff(
            extract_option_labels(brand_field),
            extract_option_labels(branch_field),
        )
        if missing or extra:
            # Options can be added automatically only when the branch has no
            # options of its own that Brand lacks.
            fix = "ADD_OPTIONS" if missing and not extra else "MANUAL_REVIEW"
            findings.append({
                **base,
                "category": "MATCH_FIELDKEY_DIFF_OPTIONS",
                "fix_action": fix,
                "options_missing_in_branch": missing,
                "options_extra_in_branch": extra,
                "details": (
                    f"{len(missing)} opciones faltan en sucursal, "
                    f"{len(extra)} extra en sucursal."
                ),
            })
    return findings


def audit_object(brand_fields, branch_fields, object_key, fuzzy_threshold):
    """Compare the Brand schema against a branch schema and return findings.

    Hierarchical matching strategy:
        1. exact fieldKey
        2. exact normalized name
        3. fuzzy match (>= threshold)

    Every field dict is expected to carry an ``id`` key (as guaranteed by
    ``fetch_custom_fields``).
    """
    findings = []
    matched_brand_ids = set()
    matched_branch_ids = set()

    brand_by_fk = index_by(brand_fields, lambda f: f.get("fieldKey"))
    branch_by_fk = index_by(branch_fields, lambda f: f.get("fieldKey"))

    # --- Layer 1: match by fieldKey ---
    for fk, brand_list in brand_by_fk.items():
        if len(brand_list) > 1:
            findings.append({
                "object": object_key,
                "category": "DUPLICATE_IN_BRAND",
                "fix_action": "MANUAL_REVIEW",
                "field_name": brand_list[0].get("name"),
                "fieldKey": fk,
                "brand_field_ids": [f.get("id") for f in brand_list],
                "details": f"Marca tiene {len(brand_list)} campos con el mismo fieldKey.",
            })
        if fk not in branch_by_fk:
            continue
        branch_list = branch_by_fk[fk]
        if len(branch_list) > 1:
            findings.append({
                "object": object_key,
                "category": "DUPLICATE_IN_BRANCH",
                "fix_action": "MANUAL_REVIEW",
                "field_name": branch_list[0].get("name"),
                "fieldKey": fk,
                "branch_field_ids": [f.get("id") for f in branch_list],
                "details": f"Sucursal tiene {len(branch_list)} campos con el mismo fieldKey.",
            })
        brand_f = brand_list[0]
        # The branch already has this fieldKey, so every Brand field sharing
        # it counts as matched. Otherwise Brand-side duplicates would fall
        # through and be reported as MISSING_IN_BRANCH / CREATE_IN_BRANCH,
        # although DUPLICATE_IN_BRAND above already flags the anomaly.
        matched_brand_ids.update(f["id"] for f in brand_list)
        for branch_f in branch_list:
            matched_branch_ids.add(branch_f["id"])
            findings.extend(compare_matched_pair(brand_f, branch_f, object_key, fk))

    # --- Layer 2: match by normalized name among the still-unmatched ---
    brand_unmatched_by_norm = index_by(
        [f for f in brand_fields if f["id"] not in matched_brand_ids],
        lambda f: normalize_name(f.get("name")),
    )
    branch_unmatched_by_norm = index_by(
        [f for f in branch_fields if f["id"] not in matched_branch_ids],
        lambda f: normalize_name(f.get("name")),
    )
    for norm_name, brand_list in brand_unmatched_by_norm.items():
        if norm_name not in branch_unmatched_by_norm:
            continue
        branch_list = branch_unmatched_by_norm[norm_name]
        # Only the first Brand field with this normalized name is paired; any
        # other Brand field sharing the name stays unmatched and is reported
        # by layer 3a.
        brand_f = brand_list[0]
        for branch_f in branch_list:
            matched_brand_ids.add(brand_f["id"])
            matched_branch_ids.add(branch_f["id"])
            findings.append({
                "object": object_key,
                "category": "MATCH_NAME_DIFF_FIELDKEY",
                "fix_action": "MANUAL_REVIEW",
                "field_name": brand_f.get("name"),
                "branch_name": branch_f.get("name"),
                "brand_fieldKey": brand_f.get("fieldKey"),
                "branch_fieldKey": branch_f.get("fieldKey"),
                "brand_field_id": brand_f["id"],
                "branch_field_id": branch_f["id"],
                "details": (
                    "Nombre normalizado idéntico pero fieldKeys distintos — "
                    "probablemente fueron creados independientemente en cada cuenta."
                ),
            })
            # Also compare dataType and options to emit additional findings.
            findings.extend(compare_matched_pair(brand_f, branch_f, object_key, None))

    # --- Layer 3a: MISSING_IN_BRANCH (Brand has it, branch does not) ---
    for brand_f in brand_fields:
        if brand_f["id"] in matched_brand_ids:
            continue
        findings.append({
            "object": object_key,
            "category": "MISSING_IN_BRANCH",
            "fix_action": "CREATE_IN_BRANCH",
            "field_name": brand_f.get("name"),
            "fieldKey": brand_f.get("fieldKey"),
            "dataType": brand_f.get("dataType"),
            "options": extract_option_labels(brand_f),
            "brand_field_id": brand_f["id"],
            "details": "Campo presente en Marca, ausente en sucursal.",
        })

    # --- Layer 3b: EXTRA_IN_BRANCH and FUZZY_NAME_MATCH ---
    for branch_f in branch_fields:
        if branch_f["id"] in matched_branch_ids:
            continue
        # Look for the best fuzzy match against ALL Brand fields (matched or
        # not). The strict ">" keeps the first Brand field on ties.
        best_ratio = 0.0
        best_brand = None
        for brand_f in brand_fields:
            r = fuzzy_ratio(branch_f.get("name"), brand_f.get("name"))
            if r > best_ratio:
                best_ratio = r
                best_brand = brand_f
        if best_brand and best_ratio >= fuzzy_threshold:
            rounded_ratio = round(best_ratio, 3)
            findings.append({
                "object": object_key,
                "category": "FUZZY_NAME_MATCH",
                "fix_action": "MANUAL_REVIEW",
                "field_name": branch_f.get("name"),
                "brand_name": best_brand.get("name"),
                "ratio": rounded_ratio,
                "brand_field_id": best_brand["id"],
                "branch_field_id": branch_f["id"],
                "brand_fieldKey": best_brand.get("fieldKey"),
                "branch_fieldKey": branch_f.get("fieldKey"),
                "details": (
                    f"Sucursal tiene campo similar al de Marca (ratio={rounded_ratio}). "
                    "Revisar si es el mismo campo con typo/case distinto."
                ),
            })
        else:
            findings.append({
                "object": object_key,
                "category": "EXTRA_IN_BRANCH",
                "fix_action": "MANUAL_REVIEW",
                "field_name": branch_f.get("name"),
                "fieldKey": branch_f.get("fieldKey"),
                "dataType": branch_f.get("dataType"),
                "branch_field_id": branch_f["id"],
                "details": "Campo presente en sucursal, ausente en Marca.",
            })

    return findings


def write_json(report, path):
    """Serialize *report* to *path* as indented UTF-8 JSON."""
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(report, fh, ensure_ascii=False, indent=2)


def _xlsx_identity_columns(finding):
    """Return ``(brand_name, branch_name, brand_fieldKey, branch_fieldKey)``.

    The generic ``field_name`` / ``fieldKey`` entries of a finding refer to the
    Brand field for most categories, but to the branch field for the
    categories in ``_BRANCH_SIDE_CATEGORIES`` and (name only) for
    DUPLICATE_IN_BRANCH. This routes each value to the spreadsheet column that
    matches its origin.
    """
    category = finding.get("category")
    field_name = finding.get("field_name") or ""
    field_key = finding.get("fieldKey") or ""
    if category in _BRANCH_SIDE_CATEGORIES:
        return (
            finding.get("brand_name") or "",
            field_name,
            finding.get("brand_fieldKey") or "",
            finding.get("branch_fieldKey") or field_key,
        )
    if category == "DUPLICATE_IN_BRANCH":
        # The name comes from the first branch duplicate; the fieldKey is the
        # one shared by Brand and the branch.
        return ("", field_name, field_key, field_key)
    return (
        field_name or finding.get("brand_name") or "",
        finding.get("branch_name") or "",
        field_key or finding.get("brand_fieldKey") or "",
        finding.get("branch_fieldKey") or "",
    )


def write_xlsx(report, path):
    """Write *report* to *path* as a three-sheet XLSX workbook.

    Sheets: "Resumen" (per-branch/object counts), "Hallazgos" (one row per
    finding, coloured by ``fix_action``) and "Diff Options" (option-level
    detail for MATCH_FIELDKEY_DIFF_OPTIONS findings).
    """
    # Imported lazily so that openpyxl is only needed when an XLSX is written.
    import openpyxl
    from openpyxl.styles import Font, PatternFill, Alignment

    wb = openpyxl.Workbook()
    header_font = Font(bold=True, color="FFFFFF")
    header_fill = PatternFill("solid", fgColor="305496")
    severity_fills = {
        "IMPOSSIBLE": PatternFill("solid", fgColor="F8CBAD"),
        "MANUAL_REVIEW": PatternFill("solid", fgColor="FFE699"),
        "RENAME": PatternFill("solid", fgColor="C6E0B4"),
        "ADD_OPTIONS": PatternFill("solid", fgColor="C6E0B4"),
        "CREATE_IN_BRANCH": PatternFill("solid", fgColor="BDD7EE"),
    }

    def write_header(ws, headers):
        """Append a styled header row and freeze it."""
        ws.append(headers)
        for cell in ws[1]:
            cell.font = header_font
            cell.fill = header_fill
            cell.alignment = Alignment(horizontal="center")
        ws.freeze_panes = "A2"

    # Sheet 1: summary
    ws_sum = wb.active
    ws_sum.title = "Resumen"
    write_header(ws_sum, [
        "Sucursal", "Location ID", "Objeto", "Total hallazgos",
        "Missing en sucursal", "Extra en sucursal",
        "Diff name", "Diff options", "Diff dataType",
        "Match por nombre", "Fuzzy", "Duplicados",
    ])
    for loc_id, loc_data in report["branches"].items():
        for obj_key in ("contact", "opportunity"):
            findings = [f for f in loc_data["findings"] if f["object"] == obj_key]
            # Count once per category instead of re-scanning per column.
            by_category = defaultdict(int)
            for f in findings:
                by_category[f["category"]] += 1
            ws_sum.append([
                loc_data["name"],
                loc_id,
                obj_key,
                len(findings),
                by_category["MISSING_IN_BRANCH"],
                by_category["EXTRA_IN_BRANCH"],
                by_category["MATCH_FIELDKEY_DIFF_NAME"],
                by_category["MATCH_FIELDKEY_DIFF_OPTIONS"],
                by_category["MATCH_FIELDKEY_DIFF_DATATYPE"],
                by_category["MATCH_NAME_DIFF_FIELDKEY"],
                by_category["FUZZY_NAME_MATCH"],
                by_category["DUPLICATE_IN_BRANCH"] + by_category["DUPLICATE_IN_BRAND"],
            ])
    for col_letter in ["A", "B", "C"]:
        ws_sum.column_dimensions[col_letter].width = 28

    # Sheet 2: findings detail
    ws_f = wb.create_sheet("Hallazgos")
    write_header(ws_f, [
        "Sucursal", "Location ID", "Objeto", "Categoría", "Fix Action",
        "Field Name (Marca)", "Field Name (Sucursal)",
        "Brand fieldKey", "Branch fieldKey", "dataType",
        "Ratio fuzzy", "Brand Field ID", "Branch Field ID", "Detalles",
    ])
    for loc_id, loc_data in report["branches"].items():
        for f in loc_data["findings"]:
            brand_name, branch_name, brand_key, branch_key = _xlsx_identity_columns(f)
            row = [
                loc_data["name"],
                loc_id,
                f.get("object"),
                f.get("category"),
                f.get("fix_action"),
                brand_name,
                branch_name,
                brand_key,
                branch_key,
                f.get("dataType") or f.get("brand_dataType") or "",
                f.get("ratio") or "",
                f.get("brand_field_id") or "",
                f.get("branch_field_id") or "",
                f.get("details") or "",
            ]
            ws_f.append(row)
            fill = severity_fills.get(f.get("fix_action"))
            if fill:
                # Colour the row that was just appended.
                for cell in ws_f[ws_f.max_row]:
                    cell.fill = fill
    for col_letter, width in [("A", 28), ("D", 28), ("E", 18), ("F", 32), ("G", 32), ("N", 60)]:
        ws_f.column_dimensions[col_letter].width = width

    # Sheet 3: option-level detail
    ws_o = wb.create_sheet("Diff Options")
    write_header(ws_o, [
        "Sucursal", "Location ID", "Objeto", "Field Name",
        "Opciones faltantes en sucursal", "Opciones extra en sucursal",
    ])
    for loc_id, loc_data in report["branches"].items():
        for f in loc_data["findings"]:
            if f.get("category") == "MATCH_FIELDKEY_DIFF_OPTIONS":
                ws_o.append([
                    loc_data["name"],
                    loc_id,
                    f.get("object"),
                    f.get("field_name", ""),
                    # str() guards against non-string labels in a report that
                    # was not produced by audit_object.
                    "; ".join(str(o) for o in (f.get("options_missing_in_branch") or [])),
                    "; ".join(str(o) for o in (f.get("options_extra_in_branch") or [])),
                ])
    for col_letter, width in [("A", 28), ("D", 32), ("E", 50), ("F", 50)]:
        ws_o.column_dimensions[col_letter].width = width

    wb.save(path)


def select_targets(args, accounts):
    """Return the accounts to audit according to ``--location`` / ``--all``.

    Raises:
        SystemExit: if the requested location is not among *accounts*, or if
            neither ``--location`` nor ``--all`` was given.
    """
    if args.location:
        matches = [a for a in accounts if a["location_id"] == args.location]
        if not matches:
            raise SystemExit(f"Location {args.location} no encontrada en el CSV.")
        return matches
    if args.all:
        return [a for a in accounts if a.get("type") == "branch"]
    raise SystemExit("Especifica --location o --all.")


def _ensure_parent_dir(path):
    """Create the parent directory of *path* if it does not exist yet."""
    os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)


def main():
    """CLI entry point: audit the selected branches and write JSON + XLSX reports."""
    parser = argparse.ArgumentParser(
        description="Audit de campos personalizados: compara cada sucursal MP contra Marca Principal.",
    )
    parser.add_argument("--location", help="Procesar solo esta sucursal (debugging).")
    parser.add_argument("--all", action="store_true", help="Procesar todas las sucursales de MP.")
    parser.add_argument("--object", choices=["contact", "opportunity", "both"], default="both",
                        help="Objeto a auditar. Default both.")
    parser.add_argument("--fuzzy-threshold", type=float, default=DEFAULT_FUZZY_THRESHOLD,
                        help=f"Umbral SequenceMatcher para fuzzy match. Default {DEFAULT_FUZZY_THRESHOLD}.")
    parser.add_argument("--json", help="Ruta de salida JSON (opcional, default audit_custom_fields_.json en cwd).")
    parser.add_argument("--xlsx", help="Ruta de salida XLSX (opcional, default audit_custom_fields_.xlsx en cwd).")
    parser.add_argument("--run-id", help="Audit run ID (dashboard).")
    args = parser.parse_args()

    # Account names and messages contain non-ASCII characters.
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")

    accounts = sync_engine.parse_accounts_csv()
    brand = next((a for a in accounts if a["location_id"] == BRAND_LOCATION_ID), None)
    if not brand:
        raise SystemExit(f"Cuenta de Marca {BRAND_LOCATION_ID} no encontrada en el CSV.")

    targets = select_targets(args, accounts)
    # Exclude the Brand account from the targets so it is not compared with itself.
    targets = [t for t in targets if t["location_id"] != BRAND_LOCATION_ID]

    objects_to_audit = ["contact", "opportunity"] if args.object == "both" else [args.object]

    print(f"[BRAND] {brand['nombre']} ({BRAND_LOCATION_ID})")
    brand_fields = {}
    for obj_key in objects_to_audit:
        brand_fields[obj_key] = fetch_custom_fields(brand, obj_key)
        print(f" {obj_key}: {len(brand_fields[obj_key])} custom fields")

    report = {
        "run_id": args.run_id,
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "fuzzy_threshold": args.fuzzy_threshold,
        "brand": {
            "location_id": BRAND_LOCATION_ID,
            "name": brand["nombre"],
            "field_counts": {k: len(v) for k, v in brand_fields.items()},
        },
        "branches": {},
    }

    for branch in targets:
        # A falsy return value is treated as a stop request; the branches
        # processed so far are still written to the report below.
        if not script_audit.wait_if_paused_or_stopped(args.run_id):
            print("\nDetención solicitada. Saliendo.")
            break
        print(f"\n[BRANCH] {branch['nombre']} ({branch['location_id']})")
        all_findings = []
        for obj_key in objects_to_audit:
            try:
                branch_cf = fetch_custom_fields(branch, obj_key)
                print(f" {obj_key}: {len(branch_cf)} custom fields")
                findings = audit_object(brand_fields[obj_key], branch_cf, obj_key, args.fuzzy_threshold)
                all_findings.extend(findings)
            except Exception as exc:
                # Deliberate best-effort boundary: one failing branch/object is
                # recorded as a finding and must not abort the whole run.
                print(f" ERROR {obj_key}: {exc}")
                all_findings.append({
                    "object": obj_key,
                    "category": "FETCH_ERROR",
                    "fix_action": "MANUAL_REVIEW",
                    "details": str(exc),
                })
        report["branches"][branch["location_id"]] = {
            "name": branch["nombre"],
            "findings": all_findings,
        }
        cats = defaultdict(int)
        for f in all_findings:
            cats[f["category"]] += 1
        if cats:
            for cat, count in sorted(cats.items()):
                print(f" {cat}: {count}")
        else:
            print(" sin discrepancias")

    # Outputs
    from common import REPORT_AUDIT_CUSTOM_FIELDS
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    json_path = args.json or os.path.join(REPORT_AUDIT_CUSTOM_FIELDS, f"audit_custom_fields_{ts}.json")
    xlsx_path = args.xlsx or os.path.join(REPORT_AUDIT_CUSTOM_FIELDS, f"audit_custom_fields_{ts}.xlsx")
    # Create the target directories first so a missing folder cannot discard
    # the results of a completed audit.
    _ensure_parent_dir(json_path)
    _ensure_parent_dir(xlsx_path)
    write_json(report, json_path)
    write_xlsx(report, xlsx_path)

    total_findings = sum(len(loc["findings"]) for loc in report["branches"].values())
    print(f"\n{'=' * 60}")
    print(f"Total hallazgos: {total_findings} en {len(report['branches'])} sucursales")
    print(f"JSON: {json_path}")
    print(f"XLSX: {xlsx_path}")


if __name__ == "__main__":
    main()