#!/usr/bin/env python3
"""Cross-object audit: custom fields of ``contact`` vs ``opportunity``.

Many fields are "duplicated" between the ``contact`` and ``opportunity``
objects of the same account (brand or branch). For example:

    contact."Canal de Origen"            <-> opportunity."CANAL DE ORIGEN"
    contact."Fuente de Prospecto"        <-> opportunity."Fuente de Prospecto"
    contact."Sucursal" (SINGLE_OPTIONS)  <-> opportunity."Sucursal" (TEXT)
    contact."TIENDA"                     <-> opportunity."TIENDA"

Because the fieldKey differs per object (``contact.*`` vs ``opportunity.*``),
fields are matched by normalized name. For every homologous pair the
following findings may be emitted:

    NAME_CASE_MISMATCH    - same normalized name, different raw name
                            ("Canal de Origen" vs "CANAL DE ORIGEN").
    DATATYPE_MISMATCH     - the same concept is modelled with different data
                            types (e.g. picklist on one object and free text
                            on the other; high risk of non-comparable data).
    OPTIONS_MISSING_*     - options present on one object and absent from the
                            other, even when ignoring case.
    OPTIONS_CASE_MISMATCH - the option exists on both sides but the exact
                            text differs ("Sucursal" vs "SUCURSAL").
    OPTIONS_NOT_UPPERCASE - the option contains lowercase characters while
                            the field appears to follow an UPPERCASE
                            convention (heuristic: only emitted when most
                            options are already uppercase).
    ONLY_IN_CONTACT       - the field exists only on contact (not compared).
    ONLY_IN_OPPORTUNITY   - the field exists only on opportunity.

Output: a detailed JSON file plus an XLSX workbook for human review, by
default under generated/reports/audit_custom_fields/.
"""

import argparse
import datetime
import json
import os
import sys
import unicodedata
import warnings
from collections import Counter, defaultdict

warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")

# Make the repository root importable so the project modules below resolve
# when this file is executed directly as a script.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)

import sync_engine  # noqa: E402
import script_audit  # noqa: E402

BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
PICKLIST_TYPES = ("SINGLE_OPTIONS", "MULTIPLE_OPTIONS", "RADIO", "CHECKBOX")

# Categories that are purely informational; every other category is treated
# as critical, both in the console totals and in the "Hallazgos" sheet.
INFORMATIONAL_CATEGORIES = ("ONLY_IN_CONTACT", "ONLY_IN_OPPORTUNITY")

# Brackets and question/exclamation marks are dropped when normalizing names
# so that variants such as "[Correo_Empresa]" and "¿Cuándo necesitas el
# dinero?" compare equal to their bare forms.
_NAME_PUNCTUATION = str.maketrans("", "", "[](){}¿?¡!")


def normalize_name(value):
    """Return a comparison key for a field name.

    The key is lowercased, stripped of diacritics (NFKD decomposition with
    combining marks removed), stripped of the characters ``[](){}¿?¡!`` and
    has runs of whitespace collapsed to single spaces. Falsy input yields
    the empty string.
    """
    text = str(value or "").strip().lower()
    decomposed = unicodedata.normalize("NFKD", text)
    without_marks = "".join(ch for ch in decomposed if not unicodedata.combining(ch))
    return " ".join(without_marks.translate(_NAME_PUNCTUATION).split())


def extract_option_labels(field):
    """Return the option labels of a field schema dict as a list of strings.

    A non-empty ``options`` list takes precedence: dict entries contribute
    their ``label`` (falling back to ``key``), any other non-None entry
    contributes itself. Otherwise a ``picklistOptions`` list is used. All
    labels are coerced to ``str`` so downstream comparison never has to deal
    with mixed types. Returns ``[]`` when no options are found.
    """
    raw = field.get("options")
    if isinstance(raw, list) and raw:
        labels = []
        for opt in raw:
            if isinstance(opt, dict):
                label = opt.get("label")
                if label is None:
                    label = opt.get("key")
                if label is not None:
                    labels.append(str(label))
            elif opt is not None:
                labels.append(str(opt))
        return labels

    legacy = field.get("picklistOptions")
    if isinstance(legacy, list):
        return [str(opt) for opt in legacy if opt is not None]
    return []


def fetch_custom_fields(account, object_key):
    """Fetch the schema fields of ``object_key`` and keep only custom ones.

    ``account`` must provide ``"token"`` and ``"location_id"``. A field is
    kept when it has an ``id`` and a ``name``, is not flagged ``standard``
    and its ``dataType`` is not ``"STANDARD_FIELD"``.
    """
    raw = sync_engine.ghl_client.get_object_schema_fields(
        account["token"], account["location_id"], object_key
    )
    return [
        f for f in (raw or [])
        if f.get("id")
        and f.get("name")
        and not f.get("standard")
        and f.get("dataType") != "STANDARD_FIELD"
    ]


def label_is_uppercase(label):
    """Return True when ``label`` has no lowercase form left to fix.

    Labels without any alphabetic character are considered compliant.
    """
    text = str(label)
    if not any(ch.isalpha() for ch in text):
        return True
    return text == text.upper()


def is_mostly_uppercase(labels):
    """Tell whether the options of a field follow an UPPERCASE convention.

    Only labels containing at least one letter are considered. If at least
    60% of those are entirely uppercase the convention is assumed. Returns
    False when no label contains a letter.
    """
    candidates = [str(label) for label in labels if any(ch.isalpha() for ch in str(label))]
    if not candidates:
        return False
    upper_count = sum(1 for text in candidates if text == text.upper())
    return upper_count / len(candidates) >= 0.60


def _option_key(label):
    """Case- and edge-whitespace-insensitive key used to pair option labels."""
    return str(label).strip().lower()


def compare_options(contact_labels, opp_labels, field_name, uppercase_convention):
    """Compare two option lists and return a list of finding dicts.

    Labels are coerced to ``str`` first, so non-string labels cannot break
    the comparison. Options are paired by their stripped, lowercased text.

    - OPTIONS_MISSING_IN_OPPORTUNITY / OPTIONS_MISSING_IN_CONTACT: labels
      whose key does not exist on the other side at all (not even with a
      different casing).
    - OPTIONS_CASE_MISMATCH: the key exists on both sides but the sets of
      exact texts differ. ``case_only`` is True when every variant differs
      from the key only by letter case.
    - OPTIONS_NOT_UPPERCASE: emitted only when ``uppercase_convention`` is
      truthy; lists the labels on each side that are not uppercase.
    """
    contact_labels = [str(label) for label in contact_labels]
    opp_labels = [str(label) for label in opp_labels]

    contact_by_norm = defaultdict(list)
    opp_by_norm = defaultdict(list)
    for label in contact_labels:
        contact_by_norm[_option_key(label)].append(label)
    for label in opp_labels:
        opp_by_norm[_option_key(label)].append(label)

    # Labels with no counterpart on the other side under any casing.
    truly_missing_in_opp = [
        label for label in contact_labels if _option_key(label) not in opp_by_norm
    ]
    truly_missing_in_contact = [
        label for label in opp_labels if _option_key(label) not in contact_by_norm
    ]

    # Same key on both sides, but the exact texts are not the same set.
    case_mismatches = []
    for norm, contact_variants in contact_by_norm.items():
        if norm not in opp_by_norm:
            continue
        contact_exact = set(contact_variants)
        opp_exact = set(opp_by_norm[norm])
        if contact_exact == opp_exact:
            continue
        case_mismatches.append({
            "normalized": norm,
            "contact_variants": sorted(contact_exact),
            "opportunity_variants": sorted(opp_exact),
            # False when a variant also differs by leading/trailing spaces.
            "case_only": all(v.lower() == norm for v in contact_exact | opp_exact),
        })

    findings = []
    if truly_missing_in_opp:
        findings.append({
            "category": "OPTIONS_MISSING_IN_OPPORTUNITY",
            "field_name": field_name,
            "labels": truly_missing_in_opp,
            "details": (
                f"{len(truly_missing_in_opp)} opción(es) existen en contact "
                "y NO existen (ni en otro casing) en opportunity."
            ),
        })
    if truly_missing_in_contact:
        findings.append({
            "category": "OPTIONS_MISSING_IN_CONTACT",
            "field_name": field_name,
            "labels": truly_missing_in_contact,
            "details": (
                f"{len(truly_missing_in_contact)} opción(es) existen en opportunity "
                "y NO existen (ni en otro casing) en contact."
            ),
        })
    if case_mismatches:
        findings.append({
            "category": "OPTIONS_CASE_MISMATCH",
            "field_name": field_name,
            "mismatches": case_mismatches,
            "details": (
                f"{len(case_mismatches)} opción(es) presentes en ambos objetos "
                "con casing/texto distinto."
            ),
        })
    if uppercase_convention:
        offenders_contact = [label for label in contact_labels if not label_is_uppercase(label)]
        offenders_opp = [label for label in opp_labels if not label_is_uppercase(label)]
        if offenders_contact or offenders_opp:
            findings.append({
                "category": "OPTIONS_NOT_UPPERCASE",
                "field_name": field_name,
                "contact_offenders": offenders_contact,
                "opportunity_offenders": offenders_opp,
                "details": (
                    "El campo aparenta usar convenio MAYÚSCULAS pero contiene "
                    "opciones en minúsculas/mixed-case."
                ),
            })
    return findings


def _compare_field_pair(norm, c, o):
    """Return the findings for one matched (contact, opportunity) field pair."""
    findings = []
    base = {
        "normalized_name": norm,
        "contact_field_name": c.get("name"),
        "opportunity_field_name": o.get("name"),
        "contact_field_id": c.get("id"),
        "opportunity_field_id": o.get("id"),
        "contact_fieldKey": c.get("fieldKey"),
        "opportunity_fieldKey": o.get("fieldKey"),
    }

    # Raw field names differ although their normalized forms match.
    if c.get("name") != o.get("name"):
        findings.append({
            **base,
            "category": "NAME_CASE_MISMATCH",
            "details": (
                f"Nombre del campo difiere entre objetos: "
                f"contact={c.get('name')!r}, opportunity={o.get('name')!r}."
            ),
        })

    if c.get("dataType") != o.get("dataType"):
        findings.append({
            **base,
            "category": "DATATYPE_MISMATCH",
            "contact_dataType": c.get("dataType"),
            "opportunity_dataType": o.get("dataType"),
            "details": (
                f"dataType distinto: contact={c.get('dataType')}, "
                f"opportunity={o.get('dataType')}. El mismo concepto "
                "se está modelando con tipos distintos."
            ),
        })

    # Options are only comparable when both sides are picklist-like.
    if c.get("dataType") in PICKLIST_TYPES and o.get("dataType") in PICKLIST_TYPES:
        c_labels = extract_option_labels(c)
        o_labels = extract_option_labels(o)
        if c_labels or o_labels:
            # UPPERCASE convention applies if either side is mostly uppercase.
            uppercase = is_mostly_uppercase(c_labels) or is_mostly_uppercase(o_labels)
            for option_finding in compare_options(c_labels, o_labels, c.get("name"), uppercase):
                findings.append({**base, **option_finding})
    return findings


def audit_location(location_name, location_id, contact_fields, opportunity_fields):
    """Compare contact and opportunity custom fields within one location.

    Fields are grouped by normalized name and visited in sorted key order.
    Names present on a single object yield an informational ONLY_IN_*
    finding (described by the first field with that name); names present on
    both yield the pairwise findings of every contact x opportunity
    combination. ``location_name`` and ``location_id`` are accepted for
    interface compatibility and are not used in the comparison.
    """
    findings = []
    contact_by_norm = defaultdict(list)
    opp_by_norm = defaultdict(list)
    for field in contact_fields:
        contact_by_norm[normalize_name(field.get("name"))].append(field)
    for field in opportunity_fields:
        opp_by_norm[normalize_name(field.get("name"))].append(field)

    for norm in sorted(set(contact_by_norm) | set(opp_by_norm)):
        cs = contact_by_norm.get(norm, [])
        os_ = opp_by_norm.get(norm, [])

        if cs and not os_:
            # Not necessarily a problem: many fields only apply to one of
            # the two objects. Reported as informational, not critical.
            findings.append({
                "category": "ONLY_IN_CONTACT",
                "normalized_name": norm,
                "contact_name": cs[0].get("name"),
                "contact_dataType": cs[0].get("dataType"),
                "details": "Campo presente solo en contact; sin contraparte en opportunity.",
            })
            continue
        if os_ and not cs:
            findings.append({
                "category": "ONLY_IN_OPPORTUNITY",
                "normalized_name": norm,
                "opportunity_name": os_[0].get("name"),
                "opportunity_dataType": os_[0].get("dataType"),
                "details": "Campo presente solo en opportunity; sin contraparte en contact.",
            })
            continue

        # Matched name. Duplicates may exist inside one object, so every
        # pair is compared; in practice one field per side is expected.
        for c in cs:
            for o in os_:
                findings.extend(_compare_field_pair(norm, c, o))
    return findings


def _ensure_parent_dir(path):
    """Create the directory that will contain ``path`` if it is missing."""
    parent = os.path.dirname(path)
    if parent:  # an empty dirname means the current directory
        os.makedirs(parent, exist_ok=True)


def write_json(report, path):
    """Write ``report`` to ``path`` as indented UTF-8 JSON."""
    _ensure_parent_dir(path)
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(report, fh, ensure_ascii=False, indent=2)


def write_xlsx(report, path):
    """Write ``report`` to ``path`` as a three-sheet XLSX workbook.

    Sheets: "Resumen" (per-location counts by category), "Hallazgos" (every
    finding that is not an ONLY_IN_* one, including FETCH_ERROR entries) and
    "Solo en un objeto" (the informational ONLY_IN_* findings).
    """
    import openpyxl
    from openpyxl.styles import Font, PatternFill, Alignment

    wb = openpyxl.Workbook()
    header_font = Font(bold=True, color="FFFFFF")
    header_fill = PatternFill("solid", fgColor="305496")
    severity_fills = {
        "FETCH_ERROR": PatternFill("solid", fgColor="FF7C80"),
        "DATATYPE_MISMATCH": PatternFill("solid", fgColor="F8CBAD"),
        "OPTIONS_MISSING_IN_OPPORTUNITY": PatternFill("solid", fgColor="FFE699"),
        "OPTIONS_MISSING_IN_CONTACT": PatternFill("solid", fgColor="FFE699"),
        "OPTIONS_CASE_MISMATCH": PatternFill("solid", fgColor="FFD966"),
        "OPTIONS_NOT_UPPERCASE": PatternFill("solid", fgColor="C6E0B4"),
        "NAME_CASE_MISMATCH": PatternFill("solid", fgColor="BDD7EE"),
    }

    def write_header(ws, headers):
        ws.append(headers)
        for cell in ws[1]:
            cell.font = header_font
            cell.fill = header_fill
            cell.alignment = Alignment(horizontal="center")
        ws.freeze_panes = "A2"

    # Sheet 1: summary, one row per location.
    ws_sum = wb.active
    ws_sum.title = "Resumen"
    write_header(ws_sum, [
        "Cuenta", "Location ID", "Total hallazgos",
        "Name case", "DataType",
        "Opts missing→opp", "Opts missing→contact",
        "Opts case", "Opts not UPPER",
        "Solo en contact", "Solo en opportunity",
    ])
    summary_categories = (
        "NAME_CASE_MISMATCH",
        "DATATYPE_MISMATCH",
        "OPTIONS_MISSING_IN_OPPORTUNITY",
        "OPTIONS_MISSING_IN_CONTACT",
        "OPTIONS_CASE_MISMATCH",
        "OPTIONS_NOT_UPPERCASE",
        "ONLY_IN_CONTACT",
        "ONLY_IN_OPPORTUNITY",
    )
    for loc_id, loc_data in report["locations"].items():
        findings = loc_data["findings"]
        counts = Counter(f["category"] for f in findings)
        ws_sum.append(
            [loc_data["name"], loc_id, len(findings)]
            + [counts[cat] for cat in summary_categories]
        )
    for col_letter in ["A", "B"]:
        ws_sum.column_dimensions[col_letter].width = 32

    # Sheet 2: critical findings, i.e. everything except ONLY_IN_*.
    ws_f = wb.create_sheet("Hallazgos")
    write_header(ws_f, [
        "Cuenta", "Location ID", "Categoría", "Campo (normalizado)",
        "Nombre en contact", "Nombre en opportunity",
        "dataType contact", "dataType opp",
        "Labels / detalles",
    ])
    for loc_id, loc_data in report["locations"].items():
        for f in loc_data["findings"]:
            if f["category"] in INFORMATIONAL_CATEGORIES:
                continue
            payload_bits = []
            if f.get("labels"):
                payload_bits.append("labels: " + "; ".join(map(str, f["labels"])))
            if f.get("mismatches"):
                for m in f["mismatches"]:
                    payload_bits.append(
                        f" {m['contact_variants']} ↔ {m['opportunity_variants']}"
                        + (" (case-only)" if m.get("case_only") else "")
                    )
            if f.get("contact_offenders") or f.get("opportunity_offenders"):
                payload_bits.append(
                    "contact: " + "; ".join(map(str, f.get("contact_offenders") or []))
                )
                payload_bits.append(
                    "opportunity: " + "; ".join(map(str, f.get("opportunity_offenders") or []))
                )
            payload_bits.append(f.get("details", ""))
            ws_f.append([
                loc_data["name"], loc_id, f["category"],
                f.get("normalized_name", ""),
                f.get("contact_field_name", ""),
                f.get("opportunity_field_name", ""),
                f.get("contact_dataType", ""),
                f.get("opportunity_dataType", ""),
                " | ".join(bit for bit in payload_bits if bit),
            ])
            fill = severity_fills.get(f["category"])
            if fill:
                # Colour the row that was just appended.
                for cell in ws_f[ws_f.max_row]:
                    cell.fill = fill
    for col_letter, width in [("A", 32), ("C", 28), ("D", 28), ("E", 30), ("F", 30), ("I", 80)]:
        ws_f.column_dimensions[col_letter].width = width

    # Sheet 3: fields present on a single object (informational).
    ws_o = wb.create_sheet("Solo en un objeto")
    write_header(ws_o, ["Cuenta", "Location ID", "Categoría", "Nombre", "dataType"])
    for loc_id, loc_data in report["locations"].items():
        for f in loc_data["findings"]:
            if f["category"] in INFORMATIONAL_CATEGORIES:
                ws_o.append([
                    loc_data["name"], loc_id, f["category"],
                    f.get("contact_name") or f.get("opportunity_name"),
                    f.get("contact_dataType") or f.get("opportunity_dataType"),
                ])
    for col_letter, width in [("A", 32), ("D", 36)]:
        ws_o.column_dimensions[col_letter].width = width

    _ensure_parent_dir(path)
    wb.save(path)


def select_targets(args, accounts):
    """Return the accounts to audit according to the CLI flags.

    Precedence is ``--location``, then ``--all``, then ``--brand-only``.
    Raises SystemExit when the requested location is unknown or when no
    selection flag was given.
    """
    if args.location:
        matches = [a for a in accounts if a["location_id"] == args.location]
        if not matches:
            raise SystemExit(f"Location {args.location} no encontrada en el CSV.")
        return matches
    if args.all:
        return accounts
    if args.brand_only:
        return [a for a in accounts if a["location_id"] == BRAND_LOCATION_ID]
    raise SystemExit("Especifica --location , --brand-only o --all.")


def main():
    """CLI entry point: audit the selected accounts and write both reports."""
    parser = argparse.ArgumentParser(
        description="Audit cross-object: contact↔opportunity custom fields dentro de cada cuenta.",
    )
    parser.add_argument("--location", help="Procesar solo esta location (debugging).")
    parser.add_argument("--brand-only", action="store_true", help="Procesar solo la Marca Principal.")
    parser.add_argument("--all", action="store_true", help="Procesar Marca + todas las sucursales.")
    parser.add_argument("--json", help="Ruta de salida JSON (opcional).")
    parser.add_argument("--xlsx", help="Ruta de salida XLSX (opcional).")
    parser.add_argument("--run-id", help="Audit run ID (dashboard).")
    args = parser.parse_args()

    # Console output contains non-ASCII text; force UTF-8 where supported.
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")

    accounts = sync_engine.parse_accounts_csv()
    targets = select_targets(args, accounts)

    report = {
        "run_id": args.run_id,
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "locations": {},
    }

    for acc in targets:
        # A falsy return means a stop was requested; whatever was collected
        # so far is still written out below.
        if not script_audit.wait_if_paused_or_stopped(args.run_id):
            print("\nDetención solicitada. Saliendo.")
            break
        loc_id = acc["location_id"]
        loc_name = acc["nombre"]
        print(f"\n[LOCATION] {loc_name} ({loc_id})")
        try:
            c_fields = fetch_custom_fields(acc, "contact")
            o_fields = fetch_custom_fields(acc, "opportunity")
            print(f" contact={len(c_fields)} cf, opportunity={len(o_fields)} cf")
            findings = audit_location(loc_name, loc_id, c_fields, o_fields)
        except Exception as exc:  # per-location boundary: record and move on
            print(f" ERROR: {exc}")
            findings = [{"category": "FETCH_ERROR", "details": str(exc)}]
        report["locations"][loc_id] = {"name": loc_name, "findings": findings}

        cats = Counter(f["category"] for f in findings)
        if cats:
            for cat, count in sorted(cats.items()):
                print(f" {cat}: {count}")
        else:
            print(" sin discrepancias")

    from common import REPORT_AUDIT_CUSTOM_FIELDS

    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    json_path = args.json or os.path.join(REPORT_AUDIT_CUSTOM_FIELDS, f"audit_cross_object_{ts}.json")
    xlsx_path = args.xlsx or os.path.join(REPORT_AUDIT_CUSTOM_FIELDS, f"audit_cross_object_{ts}.xlsx")
    write_json(report, json_path)
    write_xlsx(report, xlsx_path)

    total = sum(len(loc["findings"]) for loc in report["locations"].values())
    critical_total = sum(
        1
        for loc in report["locations"].values()
        for f in loc["findings"]
        if f["category"] not in INFORMATIONAL_CATEGORIES
    )
    print(f"\n{'=' * 60}")
    print(f"Total hallazgos: {total} ({critical_total} críticos) en {len(report['locations'])} cuentas")
    print(f"JSON: {json_path}")
    print(f"XLSX: {xlsx_path}")


if __name__ == "__main__":
    main()