Files
MP-Manager/scripts/audit_custom_fields_schema.py
2026-05-30 14:31:19 -06:00

538 lines
22 KiB
Python

#!/usr/bin/env python3
"""Audit de campos personalizados Marca Principal (Monte Providencia) vs sucursales.
Compara el schema de custom fields de contact + opportunity entre la cuenta de
Marca (referencia) y cada sucursal. Emite hallazgos categorizados con una
acción sugerida (fix_action) para informar reparación posterior.
Output: JSON detallado + XLSX para revisión humana.
Categorías de hallazgo:
MISSING_IN_BRANCH — existe en Marca, no en sucursal
EXTRA_IN_BRANCH — existe en sucursal, no en Marca
MATCH_FIELDKEY_DIFF_NAME — mismo fieldKey, nombre distinto
MATCH_FIELDKEY_DIFF_OPTIONS — mismas keys, opciones distintas
MATCH_FIELDKEY_DIFF_DATATYPE — mismo fieldKey, dataType distinto (crítico)
MATCH_NAME_DIFF_FIELDKEY — mismo nombre normalizado, fieldKeys distintos
DUPLICATE_IN_BRANCH — sucursal tiene varios campos con el mismo fieldKey/nombre
DUPLICATE_IN_BRAND — Marca tiene duplicados (anomalía a revisar)
FUZZY_NAME_MATCH — nombre similar (>= threshold) sin match exacto
fix_action posibles: CREATE_IN_BRANCH, RENAME, ADD_OPTIONS, MANUAL_REVIEW,
IMPOSSIBLE (cambio de dataType requiere delete+recreate).
"""
import argparse
import datetime
import difflib
import json
import os
import sys
import unicodedata
import warnings
from collections import defaultdict
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
import sync_engine # noqa: E402
import script_audit # noqa: E402
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
DEFAULT_FUZZY_THRESHOLD = 0.85
def normalize_name(value):
s = str(value or "").strip().lower()
s = unicodedata.normalize("NFKD", s)
s = "".join(c for c in s if not unicodedata.combining(c))
return " ".join(s.split())
def fuzzy_ratio(a, b):
na, nb = normalize_name(a), normalize_name(b)
if not na or not nb:
return 0.0
return difflib.SequenceMatcher(None, na, nb).ratio()
def fetch_custom_fields(account, object_key):
"""Devuelve solo custom fields (no standard) del object schema."""
raw = sync_engine.ghl_client.get_object_schema_fields(
account["token"], account["location_id"], object_key
)
return [
f for f in (raw or [])
if f.get("id") and f.get("name")
and not f.get("standard")
and f.get("dataType") != "STANDARD_FIELD"
]
def index_by(fields, key_fn):
idx = defaultdict(list)
for f in fields:
k = key_fn(f)
if k:
idx[k].append(f)
return idx
def extract_option_labels(field):
"""Devuelve la lista de labels de un campo picklist.
El API V2 del schema expone `options: [{key, label}, ...]`. Algunos clientes
antiguos usaban `picklistOptions: [str]` — mantenemos compat por si reaparece.
"""
raw = field.get("options")
if isinstance(raw, list) and raw:
labels = []
for opt in raw:
if isinstance(opt, dict):
label = opt.get("label")
if label is None:
label = opt.get("key")
if label is not None:
labels.append(label)
elif opt is not None:
labels.append(str(opt))
return labels
legacy = field.get("picklistOptions")
if isinstance(legacy, list):
return [str(o) for o in legacy if o is not None]
return []
def options_diff(brand_opts, branch_opts):
brand_set = set(brand_opts or [])
branch_set = set(branch_opts or [])
return sorted(brand_set - branch_set), sorted(branch_set - brand_set)
def compare_matched_pair(brand_field, branch_field, object_key, fieldkey_hint=None):
"""Compara dos campos que ya sabemos son el "mismo" lógicamente.
Emite findings por diferencias atributo-a-atributo.
`fieldkey_hint` se pasa cuando matcheamos por fieldKey (Capa 1) — en ese
caso una diferencia de nombre justifica un RENAME. Si es None (Capa 2,
matched por nombre), las fieldKeys son distintas y RENAME no aplica.
"""
findings = []
brand_name = brand_field.get("name")
branch_name = branch_field.get("name")
base = {
"object": object_key,
"field_name": brand_name,
"brand_field_id": brand_field.get("id"),
"branch_field_id": branch_field.get("id"),
"fieldKey": fieldkey_hint or brand_field.get("fieldKey"),
}
if brand_field.get("dataType") != branch_field.get("dataType"):
findings.append({
**base,
"category": "MATCH_FIELDKEY_DIFF_DATATYPE",
"fix_action": "IMPOSSIBLE",
"brand_dataType": brand_field.get("dataType"),
"branch_dataType": branch_field.get("dataType"),
"details": (
f"dataType {brand_field.get('dataType')} en Marca vs "
f"{branch_field.get('dataType')} en sucursal. La API ignora "
"cambios de dataType — requiere delete+recreate (data loss)."
),
})
return findings # Si dataType difiere, no tiene sentido comparar options
if brand_name != branch_name and fieldkey_hint is not None:
# Solo emitir RENAME cuando matcheamos por fieldKey — en ese caso sí
# tiene sentido alinear el name. Si fieldkey_hint es None matcheamos
# por nombre normalizado, las fieldKeys son distintas, y el finding
# parent MATCH_NAME_DIFF_FIELDKEY ya cubre el caso.
findings.append({
**base,
"category": "MATCH_FIELDKEY_DIFF_NAME",
"fix_action": "RENAME",
"brand_name": brand_name,
"branch_name": branch_name,
"details": f"Nombres distintos. Mismo fieldKey {fieldkey_hint!r}.",
})
if brand_field.get("dataType") in ("SINGLE_OPTIONS", "MULTIPLE_OPTIONS", "RADIO", "CHECKBOX"):
missing, extra = options_diff(
extract_option_labels(brand_field),
extract_option_labels(branch_field),
)
if missing or extra:
fix = "ADD_OPTIONS" if missing and not extra else "MANUAL_REVIEW"
findings.append({
**base,
"category": "MATCH_FIELDKEY_DIFF_OPTIONS",
"fix_action": fix,
"options_missing_in_branch": missing,
"options_extra_in_branch": extra,
"details": (
f"{len(missing)} opciones faltan en sucursal, "
f"{len(extra)} extra en sucursal."
),
})
return findings
def audit_object(brand_fields, branch_fields, object_key, fuzzy_threshold):
"""Compara los dos schemas. Estrategia jerárquica de matching:
1. fieldKey exacto
2. nombre normalizado exacto
3. fuzzy match (>= threshold)
"""
findings = []
matched_brand_ids = set()
matched_branch_ids = set()
brand_by_fk = index_by(brand_fields, lambda f: f.get("fieldKey"))
branch_by_fk = index_by(branch_fields, lambda f: f.get("fieldKey"))
# --- Capa 1: match por fieldKey ---
for fk, brand_list in brand_by_fk.items():
if len(brand_list) > 1:
findings.append({
"object": object_key,
"category": "DUPLICATE_IN_BRAND",
"fix_action": "MANUAL_REVIEW",
"field_name": brand_list[0].get("name"),
"fieldKey": fk,
"brand_field_ids": [f.get("id") for f in brand_list],
"details": f"Marca tiene {len(brand_list)} campos con el mismo fieldKey.",
})
if fk not in branch_by_fk:
continue
branch_list = branch_by_fk[fk]
if len(branch_list) > 1:
findings.append({
"object": object_key,
"category": "DUPLICATE_IN_BRANCH",
"fix_action": "MANUAL_REVIEW",
"field_name": branch_list[0].get("name"),
"fieldKey": fk,
"branch_field_ids": [f.get("id") for f in branch_list],
"details": f"Sucursal tiene {len(branch_list)} campos con el mismo fieldKey.",
})
brand_f = brand_list[0]
for branch_f in branch_list:
matched_brand_ids.add(brand_f["id"])
matched_branch_ids.add(branch_f["id"])
findings.extend(compare_matched_pair(brand_f, branch_f, object_key, fk))
# --- Capa 2: match por nombre normalizado entre los aún no matcheados ---
brand_unmatched_by_norm = index_by(
[f for f in brand_fields if f["id"] not in matched_brand_ids],
lambda f: normalize_name(f.get("name")),
)
branch_unmatched_by_norm = index_by(
[f for f in branch_fields if f["id"] not in matched_branch_ids],
lambda f: normalize_name(f.get("name")),
)
for norm_name, brand_list in brand_unmatched_by_norm.items():
if norm_name not in branch_unmatched_by_norm:
continue
branch_list = branch_unmatched_by_norm[norm_name]
brand_f = brand_list[0]
for branch_f in branch_list:
matched_brand_ids.add(brand_f["id"])
matched_branch_ids.add(branch_f["id"])
findings.append({
"object": object_key,
"category": "MATCH_NAME_DIFF_FIELDKEY",
"fix_action": "MANUAL_REVIEW",
"field_name": brand_f.get("name"),
"branch_name": branch_f.get("name"),
"brand_fieldKey": brand_f.get("fieldKey"),
"branch_fieldKey": branch_f.get("fieldKey"),
"brand_field_id": brand_f["id"],
"branch_field_id": branch_f["id"],
"details": (
"Nombre normalizado idéntico pero fieldKeys distintos — "
"probablemente fueron creados independientemente en cada cuenta."
),
})
# También comparar dataType y options para emitir findings adicionales
findings.extend(compare_matched_pair(brand_f, branch_f, object_key, None))
# --- Capa 3a: MISSING_IN_BRANCH (Marca tiene, sucursal no) ---
for brand_f in brand_fields:
if brand_f["id"] in matched_brand_ids:
continue
findings.append({
"object": object_key,
"category": "MISSING_IN_BRANCH",
"fix_action": "CREATE_IN_BRANCH",
"field_name": brand_f.get("name"),
"fieldKey": brand_f.get("fieldKey"),
"dataType": brand_f.get("dataType"),
"options": extract_option_labels(brand_f),
"brand_field_id": brand_f["id"],
"details": "Campo presente en Marca, ausente en sucursal.",
})
# --- Capa 3b: EXTRA_IN_BRANCH y FUZZY_NAME_MATCH ---
for branch_f in branch_fields:
if branch_f["id"] in matched_branch_ids:
continue
# Buscar el mejor fuzzy match contra TODOS los fields de Marca (matcheados o no).
best_ratio = 0.0
best_brand = None
for brand_f in brand_fields:
r = fuzzy_ratio(branch_f.get("name"), brand_f.get("name"))
if r > best_ratio:
best_ratio = r
best_brand = brand_f
if best_brand and best_ratio >= fuzzy_threshold:
findings.append({
"object": object_key,
"category": "FUZZY_NAME_MATCH",
"fix_action": "MANUAL_REVIEW",
"field_name": branch_f.get("name"),
"brand_name": best_brand.get("name"),
"ratio": round(best_ratio, 3),
"brand_field_id": best_brand["id"],
"branch_field_id": branch_f["id"],
"brand_fieldKey": best_brand.get("fieldKey"),
"branch_fieldKey": branch_f.get("fieldKey"),
"details": (
f"Sucursal tiene campo similar al de Marca (ratio={round(best_ratio, 3)}). "
"Revisar si es el mismo campo con typo/case distinto."
),
})
else:
findings.append({
"object": object_key,
"category": "EXTRA_IN_BRANCH",
"fix_action": "MANUAL_REVIEW",
"field_name": branch_f.get("name"),
"fieldKey": branch_f.get("fieldKey"),
"dataType": branch_f.get("dataType"),
"branch_field_id": branch_f["id"],
"details": "Campo presente en sucursal, ausente en Marca.",
})
return findings
def write_json(report, path):
with open(path, "w", encoding="utf-8") as fh:
json.dump(report, fh, ensure_ascii=False, indent=2)
def write_xlsx(report, path):
import openpyxl
from openpyxl.styles import Font, PatternFill, Alignment
wb = openpyxl.Workbook()
header_font = Font(bold=True, color="FFFFFF")
header_fill = PatternFill("solid", fgColor="305496")
severity_fills = {
"IMPOSSIBLE": PatternFill("solid", fgColor="F8CBAD"),
"MANUAL_REVIEW": PatternFill("solid", fgColor="FFE699"),
"RENAME": PatternFill("solid", fgColor="C6E0B4"),
"ADD_OPTIONS": PatternFill("solid", fgColor="C6E0B4"),
"CREATE_IN_BRANCH": PatternFill("solid", fgColor="BDD7EE"),
}
def write_header(ws, headers):
ws.append(headers)
for cell in ws[1]:
cell.font = header_font
cell.fill = header_fill
cell.alignment = Alignment(horizontal="center")
ws.freeze_panes = "A2"
# Sheet 1: Summary
ws_sum = wb.active
ws_sum.title = "Resumen"
write_header(ws_sum, [
"Sucursal", "Location ID", "Objeto", "Total hallazgos",
"Missing en sucursal", "Extra en sucursal",
"Diff name", "Diff options", "Diff dataType",
"Match por nombre", "Fuzzy", "Duplicados",
])
for loc_id, loc_data in report["branches"].items():
for obj_key in ("contact", "opportunity"):
findings = [f for f in loc_data["findings"] if f["object"] == obj_key]
ws_sum.append([
loc_data["name"], loc_id, obj_key, len(findings),
sum(1 for f in findings if f["category"] == "MISSING_IN_BRANCH"),
sum(1 for f in findings if f["category"] == "EXTRA_IN_BRANCH"),
sum(1 for f in findings if f["category"] == "MATCH_FIELDKEY_DIFF_NAME"),
sum(1 for f in findings if f["category"] == "MATCH_FIELDKEY_DIFF_OPTIONS"),
sum(1 for f in findings if f["category"] == "MATCH_FIELDKEY_DIFF_DATATYPE"),
sum(1 for f in findings if f["category"] == "MATCH_NAME_DIFF_FIELDKEY"),
sum(1 for f in findings if f["category"] == "FUZZY_NAME_MATCH"),
sum(1 for f in findings if f["category"] in ("DUPLICATE_IN_BRANCH", "DUPLICATE_IN_BRAND")),
])
for col_letter in ["A", "B", "C"]:
ws_sum.column_dimensions[col_letter].width = 28
# Sheet 2: Findings detalle
ws_f = wb.create_sheet("Hallazgos")
write_header(ws_f, [
"Sucursal", "Location ID", "Objeto", "Categoría", "Fix Action",
"Field Name (Marca)", "Field Name (Sucursal)",
"Brand fieldKey", "Branch fieldKey",
"dataType", "Ratio fuzzy",
"Brand Field ID", "Branch Field ID", "Detalles",
])
for loc_id, loc_data in report["branches"].items():
for f in loc_data["findings"]:
row = [
loc_data["name"], loc_id, f.get("object"),
f.get("category"), f.get("fix_action"),
f.get("field_name") or f.get("brand_name") or "",
f.get("branch_name") or "",
f.get("fieldKey") or f.get("brand_fieldKey") or "",
f.get("branch_fieldKey") or "",
f.get("dataType") or f.get("brand_dataType") or "",
f.get("ratio") or "",
f.get("brand_field_id") or "",
f.get("branch_field_id") or "",
f.get("details") or "",
]
ws_f.append(row)
fill = severity_fills.get(f.get("fix_action"))
if fill:
for cell in ws_f[ws_f.max_row]:
cell.fill = fill
for col_letter, width in [("A", 28), ("D", 28), ("E", 18), ("F", 32), ("G", 32), ("N", 60)]:
ws_f.column_dimensions[col_letter].width = width
# Sheet 3: Detalle de opciones
ws_o = wb.create_sheet("Diff Options")
write_header(ws_o, [
"Sucursal", "Location ID", "Objeto", "Field Name",
"Opciones faltantes en sucursal", "Opciones extra en sucursal",
])
for loc_id, loc_data in report["branches"].items():
for f in loc_data["findings"]:
if f.get("category") == "MATCH_FIELDKEY_DIFF_OPTIONS":
ws_o.append([
loc_data["name"], loc_id, f.get("object"), f.get("field_name", ""),
"; ".join(f.get("options_missing_in_branch") or []),
"; ".join(f.get("options_extra_in_branch") or []),
])
for col_letter, width in [("A", 28), ("D", 32), ("E", 50), ("F", 50)]:
ws_o.column_dimensions[col_letter].width = width
wb.save(path)
def select_targets(args, accounts):
if args.location:
matches = [a for a in accounts if a["location_id"] == args.location]
if not matches:
raise SystemExit(f"Location {args.location} no encontrada en el CSV.")
return matches
if args.all:
return [a for a in accounts if a.get("type") == "branch"]
raise SystemExit("Especifica --location <id> o --all.")
def main():
parser = argparse.ArgumentParser(
description="Audit de campos personalizados: compara cada sucursal MP contra Marca Principal.",
)
parser.add_argument("--location", help="Procesar solo esta sucursal (debugging).")
parser.add_argument("--all", action="store_true",
help="Procesar todas las sucursales de MP.")
parser.add_argument("--object", choices=["contact", "opportunity", "both"],
default="both", help="Objeto a auditar. Default both.")
parser.add_argument("--fuzzy-threshold", type=float, default=DEFAULT_FUZZY_THRESHOLD,
help=f"Umbral SequenceMatcher para fuzzy match. Default {DEFAULT_FUZZY_THRESHOLD}.")
parser.add_argument("--json", help="Ruta de salida JSON (opcional, default audit_custom_fields_<ts>.json en cwd).")
parser.add_argument("--xlsx", help="Ruta de salida XLSX (opcional, default audit_custom_fields_<ts>.xlsx en cwd).")
parser.add_argument("--run-id", help="Audit run ID (dashboard).")
args = parser.parse_args()
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
accounts = sync_engine.parse_accounts_csv()
brand = next((a for a in accounts if a["location_id"] == BRAND_LOCATION_ID), None)
if not brand:
raise SystemExit(f"Cuenta de Marca {BRAND_LOCATION_ID} no encontrada en el CSV.")
targets = select_targets(args, accounts)
# Excluir la marca de los targets para no compararla consigo misma.
targets = [t for t in targets if t["location_id"] != BRAND_LOCATION_ID]
objects_to_audit = ["contact", "opportunity"] if args.object == "both" else [args.object]
print(f"[BRAND] {brand['nombre']} ({BRAND_LOCATION_ID})")
brand_fields = {}
for obj_key in objects_to_audit:
brand_fields[obj_key] = fetch_custom_fields(brand, obj_key)
print(f" {obj_key}: {len(brand_fields[obj_key])} custom fields")
report = {
"run_id": args.run_id,
"timestamp": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"fuzzy_threshold": args.fuzzy_threshold,
"brand": {
"location_id": BRAND_LOCATION_ID,
"name": brand["nombre"],
"field_counts": {k: len(v) for k, v in brand_fields.items()},
},
"branches": {},
}
for branch in targets:
if not script_audit.wait_if_paused_or_stopped(args.run_id):
print("\nDetención solicitada. Saliendo.")
break
print(f"\n[BRANCH] {branch['nombre']} ({branch['location_id']})")
all_findings = []
for obj_key in objects_to_audit:
try:
branch_cf = fetch_custom_fields(branch, obj_key)
print(f" {obj_key}: {len(branch_cf)} custom fields")
findings = audit_object(brand_fields[obj_key], branch_cf, obj_key, args.fuzzy_threshold)
all_findings.extend(findings)
except Exception as exc:
print(f" ERROR {obj_key}: {exc}")
all_findings.append({
"object": obj_key,
"category": "FETCH_ERROR",
"fix_action": "MANUAL_REVIEW",
"details": str(exc),
})
report["branches"][branch["location_id"]] = {
"name": branch["nombre"],
"findings": all_findings,
}
cats = defaultdict(int)
for f in all_findings:
cats[f["category"]] += 1
if cats:
for cat, count in sorted(cats.items()):
print(f" {cat}: {count}")
else:
print(f" sin discrepancias")
# Outputs
from common import REPORT_AUDIT_CUSTOM_FIELDS
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
json_path = args.json or os.path.join(REPORT_AUDIT_CUSTOM_FIELDS, f"audit_custom_fields_{ts}.json")
xlsx_path = args.xlsx or os.path.join(REPORT_AUDIT_CUSTOM_FIELDS, f"audit_custom_fields_{ts}.xlsx")
write_json(report, json_path)
write_xlsx(report, xlsx_path)
total_findings = sum(len(loc["findings"]) for loc in report["branches"].values())
print(f"\n{'=' * 60}")
print(f"Total hallazgos: {total_findings} en {len(report['branches'])} sucursales")
print(f"JSON: {json_path}")
print(f"XLSX: {xlsx_path}")
if __name__ == "__main__":
main()