#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Reporte read-only: cuantifica que campos pierde hoy sync_contacts_branch_to_brand.
|
|
|
|
Compara, por sucursal, el esquema de contactos de la sucursal contra el esquema
|
|
de la Marca Principal, y samplea contactos vivos para contar cuantos registros
|
|
tienen valor poblado en cada categoria:
|
|
|
|
- campos estandar que el sync ya copia (firstName/lastName/email/phone)
|
|
- campos estandar que el sync NO copia (tags, source, address, dateOfBirth, ...)
|
|
- custom fields con match literal entre sucursal y Marca (sync los copia hoy)
|
|
- custom fields con match solo por alias/normalizacion (sync los PIERDE hoy)
|
|
- custom fields sin contraparte en Marca (sync no puede copiarlos)
|
|
|
|
No escribe en GHL. No depende de mp_manager.sqlite (consulta live por API).
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sys
|
|
import unicodedata
|
|
from collections import defaultdict
|
|
|
|
# ROOT_DIR is the parent of the directory that contains this script. It is
# put at the front of sys.path (once) so the project-local imports below
# resolve when the script is executed directly.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)

# These imports must come after the sys.path tweak above, hence the E402 waivers.
import sync_engine  # noqa: E402
from common import BRAND_LOCATION_ID, FIELD_ALIASES  # noqa: E402
|
|
|
|
|
|
# Standard contact fields that, per the module docstring, the sync already copies.
STANDARD_FIELDS_SYNCED_TODAY = ["firstName", "lastName", "email", "phone"]
# Standard contact fields that, per the module docstring, the sync does not copy.
STANDARD_FIELDS_DROPPED_TODAY = [
    "name", "address1", "city", "state", "country", "postalCode",
    "dateOfBirth", "companyName", "website", "timezone",
    "source", "type", "assignedTo", "dnd",
]
# Contact fields counted only when they hold a non-empty list.
LIST_FIELDS_DROPPED_TODAY = ["tags", "additionalEmails", "additionalPhones"]

# Keys probed on a custom-field entry to decide whether it carries a value;
# one non-empty hit under any of them is enough.
CUSTOM_VALUE_KEYS = (
    "value", "fieldValueString", "fieldValueDate",
    "fieldValueNumber", "fieldValueArray",
    "fieldValueOptions", "fieldValueFile",
)
|
|
|
|
|
|
def norm_field_name(name):
    """Return the comparison key used to match field names loosely.

    The input is stringified (any falsy value becomes ""), trimmed and
    lower-cased, then NFKD-decomposed so that combining marks (accents) can be
    dropped. Finally every run of whitespace collapses to a single space.
    """
    raw = str(name) if name else ""
    decomposed = unicodedata.normalize("NFKD", raw.strip().lower())
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    return " ".join("".join(base_chars).split())
|
|
|
|
|
|
def build_alias_lookup():
    """Build the ``normalized variant -> canonical name`` map from FIELD_ALIASES.

    Every variant listed under a canonical name is keyed by its
    ``norm_field_name`` form. If two canonical names share a normalized
    variant, the one iterated last wins.

    NOTE(review): the canonical name itself is only present as a key when it
    is also listed among its own variants -- confirm FIELD_ALIASES does that.
    """
    return {
        norm_field_name(variant): canonical
        for canonical, variants in FIELD_ALIASES.items()
        for variant in variants
    }
|
|
|
|
|
|
def classify_custom_fields(branch_schema, brand_schema, alias_lookup):
    """Assign every branch custom field to a sync-coverage bucket.

    Args:
        branch_schema: Iterable of branch field names (a mapping keyed by
            name works, since only its keys are iterated).
        brand_schema: Container of brand field names; it is iterated and also
            used for literal ``in`` membership checks.
        alias_lookup: Map from normalized field name to canonical alias name.

    Returns:
        Dict mapping each branch field name to ``(bucket, brand_field)``:

        * ``("exact_match", name)`` -- the literal name exists in the brand.
        * ``("alias_match_only", brand_name)`` -- the names differ literally
          but either normalize to the same key or resolve to the same
          canonical alias.
        * ``("no_match", None)`` -- no counterpart was found.
    """
    # Index the brand schema once instead of rescanning (and re-normalizing)
    # it for every aliased branch field.
    brand_by_norm = {}
    brand_by_canonical = {}
    for brand_name in brand_schema:
        brand_key = norm_field_name(brand_name)
        # When several brand names normalize alike, the last one wins.
        brand_by_norm[brand_key] = brand_name
        canonical = alias_lookup.get(brand_key)
        if canonical is not None:
            # The first brand field seen for a canonical alias is the match.
            brand_by_canonical.setdefault(canonical, brand_name)

    classification = {}
    for field_name in branch_schema:
        if field_name in brand_schema:
            classification[field_name] = ("exact_match", field_name)
            continue

        norm = norm_field_name(field_name)
        if norm in brand_by_norm:
            classification[field_name] = ("alias_match_only", brand_by_norm[norm])
            continue

        canonical = alias_lookup.get(norm)
        if canonical and canonical in brand_by_canonical:
            classification[field_name] = ("alias_match_only", brand_by_canonical[canonical])
            continue

        classification[field_name] = ("no_match", None)
    return classification
|
|
|
|
|
|
def custom_field_has_value(field):
    """Tell whether a custom-field entry carries a non-empty value.

    Each key in CUSTOM_VALUE_KEYS that is present in ``field`` is inspected in
    order. ``None``, blank/whitespace-only strings and empty lists or dicts
    are treated as empty; anything else (including ``0`` and ``False``) counts
    as populated.
    """
    def is_populated(candidate):
        if candidate is None:
            return False
        if isinstance(candidate, str):
            return bool(candidate.strip())
        if isinstance(candidate, (list, dict)):
            return bool(candidate)
        return True

    return any(is_populated(field[key]) for key in CUSTOM_VALUE_KEYS if key in field)
|
|
|
|
|
|
def standard_field_has_value(contact, field_name):
    """Tell whether ``contact[field_name]`` holds a non-empty value.

    A missing key or ``None`` is empty, as are whitespace-only strings and
    empty lists/dicts. Any other value (numbers and booleans included) counts
    as populated.
    """
    raw = contact.get(field_name)
    if isinstance(raw, str):
        return raw.strip() != ""
    if isinstance(raw, (list, dict)):
        return bool(raw)
    return raw is not None
|
|
|
|
|
|
def scan_branch(account, brand_schema, alias_lookup, max_contacts):
    """Audit one branch and print/return how much data each field bucket holds.

    The branch contact schema and its contacts are fetched through
    ``sync_engine.ghl_client``; the custom fields are classified against the
    brand schema and populated values are counted per field.

    Args:
        account: Mapping with ``location_id`` and ``token``; ``nombre`` is
            used as display name when present and non-empty.
        brand_schema: Brand contact schema, handed to ``classify_custom_fields``.
        alias_lookup: Normalized alias -> canonical name map.
        max_contacts: Forwarded to ``get_all_contacts`` as ``max_contacts``.

    Returns:
        A dict with the branch identity, the number of contacts scanned, the
        per-field hit counters and the custom-field buckets (each sorted by
        populated contacts, descending), or ``None`` when the branch schema
        comes back falsy.
    """
    location_id = account["location_id"]
    token = account["token"]
    branch_name = account.get("nombre") or location_id

    print(f"\n--- {branch_name} ({location_id}) ---")
    branch_schema = sync_engine.ghl_client.get_object_schema(token, location_id, "contact")
    if not branch_schema:
        print(" WARN: no se pudo leer el schema de contacto de la sucursal.")
        return None

    classification = classify_custom_fields(branch_schema, brand_schema, alias_lookup)
    # The schema is consumed as name -> id; invert it so the ids carried by
    # each contact's custom fields can be resolved back to names.
    name_by_field_id = {field_id: field_name for field_name, field_id in branch_schema.items()}

    contacts = sync_engine.ghl_client.get_all_contacts(token, location_id, max_contacts=max_contacts)
    total_contacts = len(contacts)
    print(f" Contactos sampleados: {total_contacts}")

    standard_synced_hits = defaultdict(int)
    standard_dropped_hits = defaultdict(int)
    list_dropped_hits = defaultdict(int)
    custom_hits = defaultdict(int)

    tracked_standard = (
        (STANDARD_FIELDS_SYNCED_TODAY, standard_synced_hits),
        (STANDARD_FIELDS_DROPPED_TODAY, standard_dropped_hits),
    )
    for contact in contacts:
        for field_names, tally in tracked_standard:
            for fname in field_names:
                if standard_field_has_value(contact, fname):
                    tally[fname] += 1

        # List fields only count when they are actual, non-empty lists.
        for fname in LIST_FIELDS_DROPPED_TODAY:
            items = contact.get(fname)
            if isinstance(items, list) and items:
                list_dropped_hits[fname] += 1

        for entry in contact.get("customFields", []) or []:
            if not isinstance(entry, dict):
                continue
            custom_name = name_by_field_id.get(entry.get("id") or entry.get("fieldId"))
            # Entries whose id is not in the branch schema are ignored.
            if custom_name and custom_field_has_value(entry):
                custom_hits[custom_name] += 1

    buckets = {"exact_match": [], "alias_match_only": [], "no_match": []}
    for field_name, (bucket, brand_name) in classification.items():
        buckets[bucket].append({
            "branch_field": field_name,
            "brand_field": brand_name,
            "contacts_with_value": custom_hits.get(field_name, 0),
        })
    for bucket_rows in buckets.values():
        bucket_rows.sort(key=lambda row: row["contacts_with_value"], reverse=True)

    exact_rows = buckets["exact_match"]
    alias_rows = buckets["alias_match_only"]
    unmatched_rows = buckets["no_match"]
    print(
        f" Custom fields: {len(exact_rows)} match exacto, "
        f"{len(alias_rows)} solo por alias (perdidos hoy), "
        f"{len(unmatched_rows)} sin contraparte en Marca."
    )

    if alias_rows:
        print(" Campos perdidos por mismatch de nombre (top 5 con datos):")
        for row in alias_rows[:5]:
            print(
                f" - {row['branch_field']!r} -> Marca {row['brand_field']!r} "
                f"({row['contacts_with_value']}/{total_contacts} contactos con valor)"
            )

    unmatched_with_data = [row for row in unmatched_rows if row["contacts_with_value"]][:5]
    if unmatched_with_data:
        print(" Campos sin contraparte en Marca (top 5 con datos):")
        for row in unmatched_with_data:
            print(
                f" - {row['branch_field']!r} "
                f"({row['contacts_with_value']}/{total_contacts} contactos con valor)"
            )

    dropped_scalars = {k: v for k, v in standard_dropped_hits.items() if v}
    dropped_lists = {k: v for k, v in list_dropped_hits.items() if v}
    if dropped_scalars or dropped_lists:
        print(" Estandar/lista no sincronizados que SI tienen datos:")
        for suffix, hits in (("", dropped_scalars), (" (lista)", dropped_lists)):
            for k, v in sorted(hits.items(), key=lambda pair: pair[1], reverse=True):
                print(f" - {k}{suffix}: {v}/{total_contacts}")

    return {
        "location_id": location_id,
        "branch_name": branch_name,
        "total_contacts": total_contacts,
        "standard_synced_hits": dict(standard_synced_hits),
        "standard_dropped_hits": dict(standard_dropped_hits),
        "list_dropped_hits": dict(list_dropped_hits),
        "custom_field_buckets": buckets,
    }
|
|
|
|
|
|
def aggregate(report_rows):
    """Fold the per-branch reports into one global summary.

    Falsy rows (branches whose scan returned nothing) are skipped. Dropped
    standard/list hits are summed per field. Alias-only custom fields are
    accumulated per branch field name regardless of data, while unmatched
    custom fields are only accumulated for branches where they hold data.

    Returns:
        Dict with ``total_contacts``, ``standard_dropped``, ``list_dropped``,
        ``alias_only`` and ``no_match``.
    """
    standard_totals = {}
    list_totals = {}
    alias_totals = {}
    unmatched_totals = {}
    contact_count = 0

    for branch_report in filter(None, report_rows):
        contact_count += branch_report["total_contacts"]

        for field, hits in branch_report["standard_dropped_hits"].items():
            standard_totals[field] = standard_totals.get(field, 0) + hits
        for field, hits in branch_report["list_dropped_hits"].items():
            list_totals[field] = list_totals.get(field, 0) + hits

        custom = branch_report["custom_field_buckets"]
        for entry in custom["alias_match_only"]:
            slot = alias_totals.setdefault(
                entry["branch_field"],
                {"contacts": 0, "branches": 0, "brand_field": None},
            )
            slot["contacts"] += entry["contacts_with_value"]
            slot["branches"] += 1
            slot["brand_field"] = entry["brand_field"]

        for entry in custom["no_match"]:
            if not entry["contacts_with_value"]:
                continue
            slot = unmatched_totals.setdefault(
                entry["branch_field"], {"contacts": 0, "branches": 0}
            )
            slot["contacts"] += entry["contacts_with_value"]
            slot["branches"] += 1

    return {
        "total_contacts": contact_count,
        "standard_dropped": standard_totals,
        "list_dropped": list_totals,
        "alias_only": alias_totals,
        "no_match": unmatched_totals,
    }
|
|
|
|
|
|
def print_global(summary):
    """Print the cross-branch summary produced by ``aggregate``.

    Sections are printed only when they have entries: dropped standard
    fields, dropped list fields, alias-only custom fields (top 20) and
    unmatched custom fields (top 20), each ordered by contact count,
    descending.
    """
    rule = "=" * 78
    print("\n" + rule)
    print("RESUMEN GLOBAL")
    print(rule)
    print(f"Contactos sampleados (suma): {summary['total_contacts']}")

    counter_sections = (
        ("standard_dropped",
         "\nCampos estandar perdidos hoy por el sync (contactos con valor, total bulk):"),
        ("list_dropped",
         "\nCampos de lista perdidos hoy (tags/emails/phones secundarios):"),
    )
    for section_key, heading in counter_sections:
        ranked = sorted(summary[section_key].items(), key=lambda pair: pair[1], reverse=True)
        if not ranked:
            continue
        print(heading)
        for name, count in ranked:
            if count:
                print(f" - {name}: {count}")

    def contacts_of(pair):
        return pair[1]["contacts"]

    alias_ranked = sorted(summary["alias_only"].items(), key=contacts_of, reverse=True)
    if alias_ranked:
        print("\nCustom fields perdidos por mismatch de nombre (arreglables con aliases):")
        for branch_field, info in alias_ranked[:20]:
            print(
                f" - {branch_field!r} -> Marca {info['brand_field']!r}: "
                f"{info['contacts']} contactos en {info['branches']} sucursal(es)"
            )

    unmatched_ranked = sorted(summary["no_match"].items(), key=contacts_of, reverse=True)
    if unmatched_ranked:
        print("\nCustom fields sin contraparte en Marca (requieren crearlos en Marca primero):")
        for branch_field, info in unmatched_ranked[:20]:
            print(f" - {branch_field!r}: {info['contacts']} contactos en {info['branches']} sucursal(es)")
|
|
|
|
|
|
def select_accounts(args, accounts):
    """Pick the accounts to audit.

    With ``args.location`` set, only accounts with that ``location_id`` are
    returned and ``SystemExit`` is raised when none exists. Otherwise every
    account is returned except the brand location and those whose ``nombre``
    contains "demo" (case-insensitive).
    """
    wanted = args.location
    if not wanted:
        return [
            account for account in accounts
            if account["location_id"] != BRAND_LOCATION_ID
            and "demo" not in (account.get("nombre") or "").lower()
        ]

    chosen = [account for account in accounts if account["location_id"] == wanted]
    if chosen:
        return chosen
    raise SystemExit(f"Location {args.location} no existe en el CSV de tokens")
|
|
|
|
|
|
def parse_args(argv=None):
    """Parse the command-line options of the audit.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) lets
            argparse read ``sys.argv[1:]``, so existing callers are unaffected.

    Returns:
        ``argparse.Namespace`` with ``location`` (str or None),
        ``max_contacts`` (int, default 500) and ``json_path`` (str or None).
    """
    # __doc__ is None when docstrings are stripped (python -OO), and an empty
    # docstring has no first line; fall back to no description instead of
    # crashing before the parser is even built.
    doc_lines = (__doc__ or "").splitlines()
    description = doc_lines[0] if doc_lines else None

    parser = argparse.ArgumentParser(description=description)
    parser.add_argument("--location", help="Auditar una sola sucursal por location_id")
    parser.add_argument(
        "--max-contacts", type=int, default=500,
        help="Contactos a samplear por sucursal. Default 500. Subir para mayor precision.",
    )
    parser.add_argument(
        "--json", dest="json_path",
        help="Ruta opcional para volcar el reporte completo en JSON.",
    )
    return parser.parse_args(argv)
|
|
|
|
|
|
def main():
    """Run the audit: load accounts, scan each branch, print and optionally dump results.

    Raises:
        SystemExit: when the brand account is missing from the accounts list
            or its contact schema cannot be read.
    """
    # Force UTF-8 output where the stream supports reconfiguration.
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")

    args = parse_args()
    accounts = sync_engine.parse_accounts_csv()
    brand_account = next(
        (acct for acct in accounts if acct["location_id"] == BRAND_LOCATION_ID),
        None,
    )
    if not brand_account:
        raise SystemExit("No se encontro la cuenta de Marca Principal en el CSV de tokens")

    branches = select_accounts(args, accounts)

    banner = "=" * 78
    print(banner)
    print("AUDIT: COBERTURA DE SINCRONIZACION DE CONTACTOS SUCURSAL -> MARCA")
    print(banner)
    print(f"Sucursales a auditar: {len(branches)}")
    print(f"Sample por sucursal: {args.max_contacts} contactos")

    print("\nCargando schema de contactos de Marca...")
    brand_schema = sync_engine.ghl_client.get_object_schema(
        brand_account["token"], BRAND_LOCATION_ID, "contact"
    )
    if not brand_schema:
        raise SystemExit("No se pudo leer el schema de Marca; abortando.")
    print(f"Schema de Marca: {len(brand_schema)} campos custom mapeados por nombre.")

    alias_lookup = build_alias_lookup()

    # A branch whose schema cannot be read contributes None; aggregate skips it.
    rows = [
        scan_branch(account, brand_schema, alias_lookup, args.max_contacts)
        for account in branches
    ]

    summary = aggregate(rows)
    print_global(summary)

    if not args.json_path:
        return
    payload = {"per_branch": rows, "summary": summary}
    with open(args.json_path, "w", encoding="utf-8") as fh:
        json.dump(payload, fh, ensure_ascii=False, indent=2)
    print(f"\nReporte JSON volcado en: {args.json_path}")
|
|
|
|
|
|
# Run the audit only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()
|