524 lines
21 KiB
Python
524 lines
21 KiB
Python
#!/usr/bin/env python3
|
|
"""Audit cross-object: campos personalizados de contact vs opportunity.
|
|
|
|
Muchos campos están "repetidos" entre los objetos `contact` y `opportunity`
|
|
dentro de una misma cuenta (Marca o sucursal). Por ejemplo:
|
|
|
|
contact."Canal de Origen" ↔ opportunity."CANAL DE ORIGEN"
|
|
contact."Fuente de Prospecto" ↔ opportunity."Fuente de Prospecto"
|
|
contact."Sucursal" (SINGLE_OPTIONS) ↔ opportunity."Sucursal" (TEXT)
|
|
contact."TIENDA" ↔ opportunity."TIENDA"
|
|
|
|
Como el fieldKey difiere por objeto (`contact.*` vs `opportunity.*`), el match
|
|
debe hacerse por nombre normalizado. Para cada par homólogo emitimos hallazgos:
|
|
|
|
NAME_CASE_MISMATCH — mismo nombre normalizado pero casing distinto
|
|
("Canal de Origen" vs "CANAL DE ORIGEN").
|
|
DATATYPE_MISMATCH — el mismo concepto se modela como picklist en un
|
|
objeto y como texto libre en el otro (riesgo
|
|
alto de datos no comparables).
|
|
OPTIONS_MISSING_* — opciones presentes en un objeto y ausentes en el
|
|
otro (comparación case-insensitive; las diferencias solo de casing se reportan como OPTIONS_CASE_MISMATCH).
|
|
OPTIONS_CASE_MISMATCH — la opción existe en ambos lados pero con casing
|
|
distinto ("Sucursal" vs "SUCURSAL").
|
|
OPTIONS_NOT_UPPERCASE — la opción contiene caracteres en minúscula y el
|
|
convenio para este campo es MAYÚSCULAS.
|
|
(Solo se emite para campos cuya mayoría de
|
|
opciones ya están en mayúsculas — heurística.)
|
|
ONLY_IN_CONTACT — el campo existe solo en contact (no se compara).
|
|
ONLY_IN_OPPORTUNITY — el campo existe solo en opportunity.
|
|
|
|
Output: JSON detallado + XLSX para revisión humana en
|
|
generated/reports/audit_custom_fields/.
|
|
"""
|
|
|
|
import argparse
|
|
import datetime
|
|
import json
|
|
import os
|
|
import sys
|
|
import unicodedata
|
|
import warnings
|
|
from collections import defaultdict
|
|
|
|
# Silence the warning whose message matches urllib3's "doesn't match a
# supported version!" text so it does not clutter the console output.
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")

# ROOT_DIR is the parent of the directory that contains this script. It is
# put at the front of sys.path (once) before the project imports that follow;
# presumably that is where sync_engine and script_audit live -- confirm.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)
|
|
|
|
import sync_engine # noqa: E402
|
|
import script_audit # noqa: E402
|
|
|
|
# Location id selected by the --brand-only flag (see select_targets).
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"

# dataType values treated as option-bearing fields; audit_location only
# compares option labels when both homologous fields use one of these types.
PICKLIST_TYPES = ("SINGLE_OPTIONS", "MULTIPLE_OPTIONS", "RADIO", "CHECKBOX")
|
|
|
|
|
|
def normalize_name(value):
    """Return the canonical key used to match field names across objects.

    The value is stringified (any falsy value becomes ``""``), trimmed and
    lower-cased, NFKD-decomposed with combining marks dropped, stripped of
    bracket and question/exclamation characters, and finally has every run
    of whitespace collapsed into a single space.
    """
    decomposed = unicodedata.normalize("NFKD", str(value or "").strip().lower())
    without_marks = "".join(
        char for char in decomposed if not unicodedata.combining(char)
    )
    # Drop brackets and question/exclamation marks so that variants such as
    # "[Correo_Empresa]" and "¿Cuándo necesitas el dinero?" match their
    # undecorated spellings.
    stripped = without_marks.translate(str.maketrans("", "", "[](){}¿?¡!"))
    return " ".join(stripped.split())
|
|
|
|
|
|
def extract_option_labels(field):
    """Return the option labels declared on a custom-field dict.

    A non-empty ``options`` list takes precedence: dict entries contribute
    their ``label`` (falling back to ``key``) unchanged, other non-None
    entries are stringified, and entries with no usable label are skipped.
    Otherwise a ``picklistOptions`` list is used, with its non-None items
    stringified. When neither is available the result is an empty list.
    """
    options = field.get("options")
    if not (isinstance(options, list) and options):
        # Fall back to the legacy key.
        legacy_options = field.get("picklistOptions")
        if not isinstance(legacy_options, list):
            return []
        return [str(item) for item in legacy_options if item is not None]

    collected = []
    for option in options:
        if option is None:
            continue
        if not isinstance(option, dict):
            collected.append(str(option))
            continue
        text = option.get("label")
        if text is None:
            text = option.get("key")
        if text is not None:
            collected.append(text)
    return collected
|
|
|
|
|
|
def fetch_custom_fields(account, object_key):
    """Fetch the schema fields of ``object_key`` and keep the custom ones.

    Calls ``sync_engine.ghl_client.get_object_schema_fields`` with the
    account's ``token`` and ``location_id``. A falsy response is treated as
    no fields. An entry is kept when it has truthy ``id`` and ``name``, is
    not flagged ``standard`` and its ``dataType`` is not ``STANDARD_FIELD``.
    """
    schema_fields = sync_engine.ghl_client.get_object_schema_fields(
        account["token"], account["location_id"], object_key
    )
    custom_fields = []
    for entry in schema_fields or []:
        if not (entry.get("id") and entry.get("name")):
            continue
        if entry.get("standard") or entry.get("dataType") == "STANDARD_FIELD":
            continue
        custom_fields.append(entry)
    return custom_fields
|
|
|
|
|
|
def is_mostly_uppercase(labels):
    """Tell whether most of the labels are written in UPPERCASE.

    Only labels containing at least one letter are considered (digits,
    punctuation and spaces alone do not count). If at least 60% of those
    labels are entirely uppercase, the field is assumed to follow an
    UPPERCASE convention. Returns False when no label contains a letter.
    """
    texts = [str(label) for label in labels]
    with_letters = [
        text for text in texts if any(char.isalpha() for char in text)
    ]
    if not with_letters:
        return False
    fully_upper = [text for text in with_letters if text == text.upper()]
    return len(fully_upper) / len(with_letters) >= 0.60
|
|
|
|
|
|
def label_is_uppercase(label):
    """Return True when the label respects an UPPERCASE convention.

    A label without any letter is considered compliant; otherwise its
    string form must be equal to its own uppercased version.
    """
    text = str(label)
    has_letters = any(char.isalpha() for char in text)
    return (not has_letters) or text == text.upper()
|
|
|
|
|
|
def compare_options(contact_labels, opp_labels, field_name, uppercase_convention):
    """Compare the option labels of a contact field and its opportunity twin.

    Labels are matched through a folded key: the label's string form,
    trimmed and lower-cased.

    Args:
        contact_labels: option labels of the contact field.
        opp_labels: option labels of the opportunity field.
        field_name: name copied into every finding as ``field_name``.
        uppercase_convention: when truthy, labels that are not fully
            uppercase are reported on both sides.

    Returns:
        A list of finding dicts, in this order when present:

        - ``OPTIONS_MISSING_IN_OPPORTUNITY`` / ``OPTIONS_MISSING_IN_CONTACT``:
          labels whose folded key does not exist on the other side at all
          (the comparison is case-insensitive).
        - ``OPTIONS_CASE_MISMATCH``: folded keys present on both sides whose
          exact spellings differ. ``case_only`` is True when every variant
          differs from the folded key by letter case alone (so a variant
          with surrounding whitespace makes it False).
        - ``OPTIONS_NOT_UPPERCASE``: only when ``uppercase_convention`` is
          truthy; lists the offending labels of each side.
    """

    def fold(label):
        # Labels are not guaranteed to be strings (dict-based options keep
        # their raw "label" value), so always go through str().
        return str(label).strip().lower()

    contact_by_norm = defaultdict(list)
    opp_by_norm = defaultdict(list)
    for label in contact_labels:
        contact_by_norm[fold(label)].append(label)
    for label in opp_labels:
        opp_by_norm[fold(label)].append(label)

    # Labels with no counterpart on the other side, not even in another casing.
    truly_missing_in_opp = [
        label for label in contact_labels if fold(label) not in opp_by_norm
    ]
    truly_missing_in_contact = [
        label for label in opp_labels if fold(label) not in contact_by_norm
    ]

    # The "same" text exists on both sides but the exact spellings differ.
    case_mismatches = []
    for norm, contact_variants in contact_by_norm.items():
        if norm not in opp_by_norm:
            continue
        contact_exact = set(contact_variants)
        opp_exact = set(opp_by_norm[norm])
        if contact_exact == opp_exact:
            continue
        case_mismatches.append({
            "normalized": norm,
            # key=str keeps sorting well-defined when label types are mixed.
            "contact_variants": sorted(contact_exact, key=str),
            "opportunity_variants": sorted(opp_exact, key=str),
            "case_only": all(
                str(variant).lower() == norm
                for variant in contact_exact | opp_exact
            ),
        })

    findings = []
    if truly_missing_in_opp:
        findings.append({
            "category": "OPTIONS_MISSING_IN_OPPORTUNITY",
            "field_name": field_name,
            "labels": truly_missing_in_opp,
            "details": (
                f"{len(truly_missing_in_opp)} opción(es) existen en contact "
                "y NO existen (ni en otro casing) en opportunity."
            ),
        })
    if truly_missing_in_contact:
        findings.append({
            "category": "OPTIONS_MISSING_IN_CONTACT",
            "field_name": field_name,
            "labels": truly_missing_in_contact,
            "details": (
                f"{len(truly_missing_in_contact)} opción(es) existen en opportunity "
                "y NO existen (ni en otro casing) en contact."
            ),
        })
    if case_mismatches:
        findings.append({
            "category": "OPTIONS_CASE_MISMATCH",
            "field_name": field_name,
            "mismatches": case_mismatches,
            "details": (
                f"{len(case_mismatches)} opción(es) presentes en ambos objetos "
                "con casing/texto distinto."
            ),
        })

    if uppercase_convention:
        offenders_contact = [
            label for label in contact_labels if not label_is_uppercase(label)
        ]
        offenders_opp = [
            label for label in opp_labels if not label_is_uppercase(label)
        ]
        if offenders_contact or offenders_opp:
            findings.append({
                "category": "OPTIONS_NOT_UPPERCASE",
                "field_name": field_name,
                "contact_offenders": offenders_contact,
                "opportunity_offenders": offenders_opp,
                "details": (
                    "El campo aparenta usar convenio MAYÚSCULAS pero contiene "
                    "opciones en minúsculas/mixed-case."
                ),
            })
    return findings
|
|
|
|
|
|
def audit_location(location_name, location_id, contact_fields, opportunity_fields):
    """Compare contact and opportunity custom fields of a single location.

    Fields are paired through ``normalize_name`` of their ``name``. Names
    found on one object only yield an informational ``ONLY_IN_CONTACT`` or
    ``ONLY_IN_OPPORTUNITY`` finding describing the first field of the group.
    Every contact/opportunity pair sharing a normalized name may yield
    ``NAME_CASE_MISMATCH`` (exact names differ), ``DATATYPE_MISMATCH`` and,
    when both dataTypes are in ``PICKLIST_TYPES`` and at least one side has
    labels, the findings produced by ``compare_options``.

    ``location_name`` and ``location_id`` are accepted but not used here.
    Returns the findings as a list of dicts, ordered by normalized name.
    """

    def group_by_normalized_name(fields):
        grouped = defaultdict(list)
        for field in fields:
            grouped[normalize_name(field.get("name"))].append(field)
        return grouped

    contact_groups = group_by_normalized_name(contact_fields)
    opportunity_groups = group_by_normalized_name(opportunity_fields)

    results = []
    for key in sorted(set(contact_groups) | set(opportunity_groups)):
        contact_matches = contact_groups.get(key, [])
        opportunity_matches = opportunity_groups.get(key, [])

        # A one-sided field is not necessarily a problem: many fields only
        # make sense on one of the objects, so this is informational only.
        if contact_matches and not opportunity_matches:
            first = contact_matches[0]
            results.append({
                "category": "ONLY_IN_CONTACT",
                "normalized_name": key,
                "contact_name": first.get("name"),
                "contact_dataType": first.get("dataType"),
                "details": "Campo presente solo en contact; sin contraparte en opportunity.",
            })
            continue
        if opportunity_matches and not contact_matches:
            first = opportunity_matches[0]
            results.append({
                "category": "ONLY_IN_OPPORTUNITY",
                "normalized_name": key,
                "opportunity_name": first.get("name"),
                "opportunity_dataType": first.get("dataType"),
                "details": "Campo presente solo en opportunity; sin contraparte en contact.",
            })
            continue

        # Both sides matched. Duplicates may exist inside one object, so all
        # pairs are compared; normally each side holds a single field.
        pairs = (
            (contact, opportunity)
            for contact in contact_matches
            for opportunity in opportunity_matches
        )
        for contact, opportunity in pairs:
            contact_name = contact.get("name")
            opportunity_name = opportunity.get("name")
            contact_type = contact.get("dataType")
            opportunity_type = opportunity.get("dataType")
            shared = {
                "normalized_name": key,
                "contact_field_name": contact_name,
                "opportunity_field_name": opportunity_name,
                "contact_field_id": contact.get("id"),
                "opportunity_field_id": opportunity.get("id"),
                "contact_fieldKey": contact.get("fieldKey"),
                "opportunity_fieldKey": opportunity.get("fieldKey"),
            }

            # Exact field names differ although they normalize to the same key.
            if contact_name != opportunity_name:
                results.append({
                    **shared,
                    "category": "NAME_CASE_MISMATCH",
                    "details": (
                        "Nombre del campo difiere entre objetos: "
                        f"contact={contact_name!r}, opportunity={opportunity_name!r}."
                    ),
                })

            if contact_type != opportunity_type:
                results.append({
                    **shared,
                    "category": "DATATYPE_MISMATCH",
                    "contact_dataType": contact_type,
                    "opportunity_dataType": opportunity_type,
                    "details": (
                        f"dataType distinto: contact={contact_type}, "
                        f"opportunity={opportunity_type}. El mismo concepto "
                        "se está modelando con tipos distintos."
                    ),
                })

            # Options are only comparable when both sides are picklists.
            if contact_type not in PICKLIST_TYPES or opportunity_type not in PICKLIST_TYPES:
                continue
            contact_labels = extract_option_labels(contact)
            opportunity_labels = extract_option_labels(opportunity)
            if not (contact_labels or opportunity_labels):
                continue
            # UPPERCASE convention applies if either side is already mostly
            # uppercase.
            wants_uppercase = (
                is_mostly_uppercase(contact_labels)
                or is_mostly_uppercase(opportunity_labels)
            )
            results.extend(
                {**shared, **option_finding}
                for option_finding in compare_options(
                    contact_labels, opportunity_labels, contact_name, wants_uppercase
                )
            )

    return results
|
|
|
|
|
|
def write_json(report, path):
    """Serialize ``report`` to ``path`` as UTF-8 JSON.

    Non-ASCII characters are written as-is (not escaped) and the output is
    indented with two spaces. An existing file is overwritten.
    """
    with open(path, "w", encoding="utf-8") as handle:
        json.dump(report, handle, ensure_ascii=False, indent=2)
|
|
|
|
|
|
def write_xlsx(report, path):
    """Write the audit report to ``path`` as a three-sheet XLSX workbook.

    Sheets:
        - "Resumen": one row per location with the total number of findings
          and a count per known category.
        - "Hallazgos": one row per finding whose category is not
          ``ONLY_IN_CONTACT`` / ``ONLY_IN_OPPORTUNITY``. This includes
          ``FETCH_ERROR`` entries, so a location whose audit failed is
          visible in the detail sheet instead of only inflating the total.
        - "Solo en un objeto": the informational ``ONLY_IN_*`` findings.

    Args:
        report: dict with a ``locations`` mapping of location id to
            ``{"name": ..., "findings": [...]}``.
        path: destination file path handed to ``Workbook.save``.

    openpyxl is imported lazily so the rest of the module can be imported
    without it.
    """
    import openpyxl
    from openpyxl.styles import Font, PatternFill, Alignment

    informational = ("ONLY_IN_CONTACT", "ONLY_IN_OPPORTUNITY")
    summary_categories = (
        "NAME_CASE_MISMATCH",
        "DATATYPE_MISMATCH",
        "OPTIONS_MISSING_IN_OPPORTUNITY",
        "OPTIONS_MISSING_IN_CONTACT",
        "OPTIONS_CASE_MISMATCH",
        "OPTIONS_NOT_UPPERCASE",
        "ONLY_IN_CONTACT",
        "ONLY_IN_OPPORTUNITY",
    )

    wb = openpyxl.Workbook()
    header_font = Font(bold=True, color="FFFFFF")
    header_fill = PatternFill("solid", fgColor="305496")
    severity_fills = {
        "FETCH_ERROR": PatternFill("solid", fgColor="FF9999"),
        "DATATYPE_MISMATCH": PatternFill("solid", fgColor="F8CBAD"),
        "OPTIONS_MISSING_IN_OPPORTUNITY": PatternFill("solid", fgColor="FFE699"),
        "OPTIONS_MISSING_IN_CONTACT": PatternFill("solid", fgColor="FFE699"),
        "OPTIONS_CASE_MISMATCH": PatternFill("solid", fgColor="FFD966"),
        "OPTIONS_NOT_UPPERCASE": PatternFill("solid", fgColor="C6E0B4"),
        "NAME_CASE_MISMATCH": PatternFill("solid", fgColor="BDD7EE"),
    }

    def write_header(ws, headers):
        # Styled header row, kept visible while scrolling.
        ws.append(headers)
        for cell in ws[1]:
            cell.font = header_font
            cell.fill = header_fill
            cell.alignment = Alignment(horizontal="center")
        ws.freeze_panes = "A2"

    # Sheet 1: summary
    ws_sum = wb.active
    ws_sum.title = "Resumen"
    write_header(ws_sum, [
        "Cuenta", "Location ID", "Total hallazgos",
        "Name case", "DataType", "Opts missing→opp", "Opts missing→contact",
        "Opts case", "Opts not UPPER",
        "Solo en contact", "Solo en opportunity",
    ])
    for loc_id, loc_data in report["locations"].items():
        findings = loc_data["findings"]
        counts = {}
        for finding in findings:
            counts[finding["category"]] = counts.get(finding["category"], 0) + 1
        ws_sum.append(
            [loc_data["name"], loc_id, len(findings)]
            + [counts.get(category, 0) for category in summary_categories]
        )
    for col_letter in ["A", "B"]:
        ws_sum.column_dimensions[col_letter].width = 32

    # Sheet 2: every finding except the informational ONLY_IN_* ones
    ws_f = wb.create_sheet("Hallazgos")
    write_header(ws_f, [
        "Cuenta", "Location ID", "Categoría", "Campo (normalizado)",
        "Nombre en contact", "Nombre en opportunity",
        "dataType contact", "dataType opp",
        "Labels / detalles",
    ])
    for loc_id, loc_data in report["locations"].items():
        for f in loc_data["findings"]:
            if f["category"] in informational:
                continue
            payload_bits = []
            if f.get("labels"):
                payload_bits.append("labels: " + "; ".join(map(str, f["labels"])))
            if f.get("mismatches"):
                for m in f["mismatches"]:
                    payload_bits.append(
                        f" {m['contact_variants']} ↔ {m['opportunity_variants']}"
                        + (" (case-only)" if m.get("case_only") else "")
                    )
            if f.get("contact_offenders") or f.get("opportunity_offenders"):
                payload_bits.append("contact: " + "; ".join(map(str, f.get("contact_offenders") or [])))
                payload_bits.append("opportunity: " + "; ".join(map(str, f.get("opportunity_offenders") or [])))
            payload_bits.append(f.get("details", ""))
            ws_f.append([
                loc_data["name"], loc_id, f["category"],
                f.get("normalized_name", ""),
                f.get("contact_field_name", ""),
                f.get("opportunity_field_name", ""),
                f.get("contact_dataType", ""),
                f.get("opportunity_dataType", ""),
                " | ".join(b for b in payload_bits if b),
            ])
            fill = severity_fills.get(f["category"])
            if fill:
                # Colour the row that was just appended.
                for cell in ws_f[ws_f.max_row]:
                    cell.fill = fill
    for col_letter, width in [("A", 32), ("C", 28), ("D", 28), ("E", 30), ("F", 30), ("I", 80)]:
        ws_f.column_dimensions[col_letter].width = width

    # Sheet 3: fields present on one object only (informational)
    ws_o = wb.create_sheet("Solo en un objeto")
    write_header(ws_o, ["Cuenta", "Location ID", "Categoría", "Nombre", "dataType"])
    for loc_id, loc_data in report["locations"].items():
        for f in loc_data["findings"]:
            if f["category"] in informational:
                ws_o.append([
                    loc_data["name"], loc_id, f["category"],
                    f.get("contact_name") or f.get("opportunity_name"),
                    f.get("contact_dataType") or f.get("opportunity_dataType"),
                ])
    for col_letter, width in [("A", 32), ("D", 36)]:
        ws_o.column_dimensions[col_letter].width = width

    wb.save(path)
|
|
|
|
|
|
def select_targets(args, accounts):
    """Pick the accounts to audit according to the CLI flags.

    Precedence is ``--location``, then ``--all``, then ``--brand-only``.

    Returns:
        The accounts whose ``location_id`` equals ``args.location``; the
        ``accounts`` list itself for ``--all``; or the accounts matching
        ``BRAND_LOCATION_ID`` for ``--brand-only`` (possibly empty).

    Raises:
        SystemExit: when the requested location is not among ``accounts``
            or when none of the three flags was given.
    """
    requested = args.location
    if requested:
        selected = [acc for acc in accounts if acc["location_id"] == requested]
        if selected:
            return selected
        raise SystemExit(f"Location {requested} no encontrada en el CSV.")
    if args.all:
        return accounts
    if not args.brand_only:
        raise SystemExit("Especifica --location <id>, --brand-only o --all.")
    return [acc for acc in accounts if acc["location_id"] == BRAND_LOCATION_ID]
|
|
|
|
|
|
def main():
    """Command-line entry point: audit the selected accounts and write reports.

    Parses the CLI flags, loads the accounts through
    ``sync_engine.parse_accounts_csv`` and selects the targets with
    ``select_targets``. For each target it fetches the contact and
    opportunity custom fields and audits them; any exception raised while
    doing so is recorded as a single ``FETCH_ERROR`` finding for that
    location so the run continues with the next account. The loop ends early
    when ``script_audit.wait_if_paused_or_stopped`` returns a falsy value.

    The report is then written as JSON and XLSX, either to the paths given
    with ``--json`` / ``--xlsx`` or to timestamped files under
    ``common.REPORT_AUDIT_CUSTOM_FIELDS``. Missing output directories are
    created first, so a long run does not fail at the very last step.
    """
    parser = argparse.ArgumentParser(
        description="Audit cross-object: contact↔opportunity custom fields dentro de cada cuenta.",
    )
    parser.add_argument("--location", help="Procesar solo esta location (debugging).")
    parser.add_argument("--brand-only", action="store_true",
                        help="Procesar solo la Marca Principal.")
    parser.add_argument("--all", action="store_true",
                        help="Procesar Marca + todas las sucursales.")
    parser.add_argument("--json", help="Ruta de salida JSON (opcional).")
    parser.add_argument("--xlsx", help="Ruta de salida XLSX (opcional).")
    parser.add_argument("--run-id", help="Audit run ID (dashboard).")
    args = parser.parse_args()

    # Force UTF-8 output where the stream supports it (arrows and accents
    # are printed below).
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")

    accounts = sync_engine.parse_accounts_csv()
    targets = select_targets(args, accounts)

    report = {
        "run_id": args.run_id,
        "timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "locations": {},
    }

    for acc in targets:
        if not script_audit.wait_if_paused_or_stopped(args.run_id):
            print("\nDetención solicitada. Saliendo.")
            break
        loc_id = acc["location_id"]
        loc_name = acc["nombre"]
        print(f"\n[LOCATION] {loc_name} ({loc_id})")
        try:
            c_fields = fetch_custom_fields(acc, "contact")
            o_fields = fetch_custom_fields(acc, "opportunity")
            print(f" contact={len(c_fields)} cf, opportunity={len(o_fields)} cf")
            findings = audit_location(loc_name, loc_id, c_fields, o_fields)
        except Exception as exc:
            # Deliberate per-account boundary: one failing account must not
            # abort the whole run; the error is kept in the report instead.
            print(f" ERROR: {exc}")
            findings = [{"category": "FETCH_ERROR", "details": str(exc)}]

        report["locations"][loc_id] = {"name": loc_name, "findings": findings}
        cats = defaultdict(int)
        for f in findings:
            cats[f["category"]] += 1
        if cats:
            for cat, count in sorted(cats.items()):
                print(f" {cat}: {count}")
        else:
            print(" sin discrepancias")

    from common import REPORT_AUDIT_CUSTOM_FIELDS
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    json_path = args.json or os.path.join(REPORT_AUDIT_CUSTOM_FIELDS, f"audit_cross_object_{ts}.json")
    xlsx_path = args.xlsx or os.path.join(REPORT_AUDIT_CUSTOM_FIELDS, f"audit_cross_object_{ts}.xlsx")

    # Create the destination directories up front. A bare file name has an
    # empty dirname and needs nothing.
    for out_path in (json_path, xlsx_path):
        out_dir = os.path.dirname(out_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)

    write_json(report, json_path)
    write_xlsx(report, xlsx_path)

    total = sum(len(loc["findings"]) for loc in report["locations"].values())
    critical_total = sum(
        1 for loc in report["locations"].values() for f in loc["findings"]
        if f["category"] not in ("ONLY_IN_CONTACT", "ONLY_IN_OPPORTUNITY")
    )
    print(f"\n{'=' * 60}")
    print(f"Total hallazgos: {total} ({critical_total} críticos) en {len(report['locations'])} cuentas")
    print(f"JSON: {json_path}")
    print(f"XLSX: {xlsx_path}")
|
|
|
|
|
|
# Run the audit only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|