#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ audit_brand_vs_branches_discrepancy.py Auditoría comparativa de contactos y oportunidades entre la cuenta de Marca Principal y todas las sucursales. Identifica las causas reales del gap entre contactos y oportunidades en Marca, separando: - Contactos en Marca sin opp que SI tienen opp en una sucursal (gap real de sync Sucursal -> Marca). - Contactos en Marca sin opp y sin contraparte en ninguna sucursal (leads digitales / imports directos, no es bug de sync). - Contactos en sucursal que NO existen en Marca (gap de sync de contactos). - Casos multi-opp donde la sucursal tiene 2+ opps del mismo contacto pero Marca solo replicó una (patrón de bug de sync). - Sucursales con anomalía de gap local (más contactos que opps). Es read-only: solo consulta `mp_manager.sqlite`. No toca GHL. Uso: python scripts/audit_brand_vs_branches_discrepancy.py python scripts/audit_brand_vs_branches_discrepancy.py --show-sync-gaps python scripts/audit_brand_vs_branches_discrepancy.py --show-multi-opp-gaps python scripts/audit_brand_vs_branches_discrepancy.py --show-unsynced-contacts python scripts/audit_brand_vs_branches_discrepancy.py --show-brand-only python scripts/audit_brand_vs_branches_discrepancy.py --all """ import argparse import os import re import sqlite3 import sys import unicodedata from collections import Counter, defaultdict ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT_DIR not in sys.path: sys.path.insert(0, ROOT_DIR) from paths import DB_PATH BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3" # --------------------------------------------------------------------------- # Utilidades # --------------------------------------------------------------------------- def safe_print(*args, **kwargs): sep = kwargs.get("sep", " ") end = kwargs.get("end", "\n") text = sep.join(str(a) for a in args) encoding = sys.stdout.encoding or "utf-8" try: sys.stdout.write(text + end) sys.stdout.flush() except UnicodeEncodeError: sys.stdout.write(text.encode(encoding, errors="replace").decode(encoding) + end) sys.stdout.flush() def normalize_phone(phone, last_n=10): digits = re.sub(r"\D+", "", str(phone or "")) return digits[-last_n:] if len(digits) >= last_n else digits def normalize_email(email): return (str(email or "")).strip().lower() def normalize_text(text): if not text: return "" nfkd = unicodedata.normalize("NFD", str(text)) clean = "".join(c for c in nfkd if unicodedata.category(c) != "Mn") return " ".join(clean.lower().split()) def ascii_safe(text, width=None): out = str(text or "").encode("ascii", errors="replace").decode("ascii") if width is not None: out = out[:width] return out def fmt_contact_name(c): name = f"{c.get('first_name') or ''} {c.get('last_name') or ''}".strip() return name or "Sin nombre" def status_label(s): return (s or "open").upper() # --------------------------------------------------------------------------- # Carga # --------------------------------------------------------------------------- def load_accounts(conn): return { r["location_id"]: r["nombre"] for r in conn.execute("SELECT location_id, nombre FROM accounts").fetchall() } def load_brand_data(conn): contacts = [ dict(r) for r in conn.execute( "SELECT id, first_name, last_name, phone, email, source, tags FROM contacts WHERE location_id = ?", (BRAND_LOCATION_ID,), ).fetchall() ] opps = [ dict(r) for r in conn.execute( "SELECT id, contact_id, status, name, monetary_value FROM opportunities WHERE location_id = ?", (BRAND_LOCATION_ID,), ).fetchall() ] return contacts, opps def load_branch_data(conn, branch_ids): """Devuelve dict[loc_id] -> {'contacts': [...], 'opps': [...]}.""" out = {} for loc in branch_ids: contacts = [ dict(r) for r in conn.execute( "SELECT id, first_name, last_name, phone, email FROM contacts WHERE location_id = ?", (loc,), ).fetchall() ] opps = [ dict(r) for r in conn.execute( "SELECT id, contact_id, status, name FROM opportunities WHERE location_id = ?", (loc,), ).fetchall() ] out[loc] = {"contacts": contacts, "opps": opps} return out # --------------------------------------------------------------------------- # Indexado y match # --------------------------------------------------------------------------- def build_indexes(contacts): by_phone, by_email = defaultdict(list), defaultdict(list) for c in contacts: p = normalize_phone(c.get("phone")) e = normalize_email(c.get("email")) if p: by_phone[p].append(c) if e: by_email[e].append(c) return by_phone, by_email def find_matches(contact, idx_phone, idx_email): """Busca contactos en el indice que matcheen por telefono o email.""" p = normalize_phone(contact.get("phone")) e = normalize_email(contact.get("email")) seen, matches = set(), [] if p and p in idx_phone: for m in idx_phone[p]: if m["id"] not in seen: matches.append(m) seen.add(m["id"]) if e and e in idx_email: for m in idx_email[e]: if m["id"] not in seen: matches.append(m) seen.add(m["id"]) return matches # --------------------------------------------------------------------------- # Análisis principal # --------------------------------------------------------------------------- def analyze(conn): accounts = load_accounts(conn) branch_ids = [loc for loc in accounts if loc != BRAND_LOCATION_ID] brand_contacts, brand_opps = load_brand_data(conn) branch_data = load_branch_data(conn, branch_ids) # Indices de Marca brand_idx_phone, brand_idx_email = build_indexes(brand_contacts) # Opps Marca agrupadas por contact_id brand_opps_by_cid = defaultdict(list) for o in brand_opps: brand_opps_by_cid[o["contact_id"]].append(o) # ---- Análisis 1: clasificar contactos Marca sin opp ---- brand_no_opp_contacts = [ c for c in brand_contacts if not brand_opps_by_cid.get(c["id"]) ] sync_gaps = [] # contacto Marca sin opp, sucursal SI tiene opp legit_no_opp = [] # contacto Marca sin opp, sucursal tampoco tiene brand_only = [] # contacto Marca sin opp, sin contraparte en sucursal # Para clasificar contra sucursales, armamos indices globales de sucursal all_branch_contacts = [] for loc, data in branch_data.items(): for c in data["contacts"]: c_aug = dict(c) c_aug["_loc"] = loc opp_count = sum(1 for o in data["opps"] if o.get("contact_id") == c["id"]) c_aug["_opp_count"] = opp_count all_branch_contacts.append(c_aug) branch_idx_phone, branch_idx_email = build_indexes(all_branch_contacts) for c in brand_no_opp_contacts: matches = find_matches(c, branch_idx_phone, branch_idx_email) if not matches: brand_only.append(c) else: total_opps = sum(m.get("_opp_count", 0) for m in matches) if total_opps > 0: sync_gaps.append((c, matches)) else: legit_no_opp.append((c, matches)) # ---- Análisis 2: por sucursal, métricas y gaps de sync hacia Marca ---- per_branch = {} unsynced_contacts = [] # contactos en sucursal sin contraparte en Marca multi_opp_gaps = [] # contactos con multi-opp solo parcialmente replicados for loc, data in branch_data.items(): bc = data["contacts"] bo = data["opps"] bo_by_cid = defaultdict(list) for o in bo: bo_by_cid[o.get("contact_id")].append(o) n_c = len(bc) n_o = len(bo) n_no_opp_local = sum(1 for c in bc if not bo_by_cid.get(c["id"])) n_c_no_sync = 0 n_o_no_sync_here = 0 for c in bc: matches_marca = find_matches(c, brand_idx_phone, brand_idx_email) opps_in_branch = len(bo_by_cid.get(c["id"], [])) opps_in_brand = sum( len(brand_opps_by_cid.get(m["id"], [])) for m in matches_marca ) if not matches_marca: n_c_no_sync += 1 unsynced_contacts.append({ "loc": loc, "contact": c, "opps_in_branch": opps_in_branch, }) n_o_no_sync_here += opps_in_branch else: if opps_in_branch > opps_in_brand: diff = opps_in_branch - opps_in_brand n_o_no_sync_here += diff multi_opp_gaps.append({ "loc": loc, "branch_contact": c, "branch_opps": bo_by_cid.get(c["id"], []), "brand_opp_count": opps_in_brand, "diff": diff, }) per_branch[loc] = { "name": accounts.get(loc, loc), "n_contacts": n_c, "n_opps": n_o, "gap": n_c - n_o, "n_no_opp_local": n_no_opp_local, "n_unsynced_contacts": n_c_no_sync, "n_unsynced_opps": n_o_no_sync_here, } # ---- Conteos de opps Marca por estado y pipelines ---- status_count = Counter(status_label(o.get("status")) for o in brand_opps) return { "accounts": accounts, "brand_contacts_total": len(brand_contacts), "brand_opps_total": len(brand_opps), "brand_no_opp_total": len(brand_no_opp_contacts), "sync_gaps": sync_gaps, "legit_no_opp": legit_no_opp, "brand_only": brand_only, "per_branch": per_branch, "unsynced_contacts": unsynced_contacts, "multi_opp_gaps": multi_opp_gaps, "branch_totals": { "contacts": sum(b["n_contacts"] for b in per_branch.values()), "opps": sum(b["n_opps"] for b in per_branch.values()), }, "status_count": status_count, } # --------------------------------------------------------------------------- # Impresión # --------------------------------------------------------------------------- def print_overview(r): W = 72 safe_print("\n" + "=" * W) safe_print("DISCREPANCIA MARCA vs SUCURSALES | CUENTA MONTE PROVIDENCIA") safe_print("=" * W) bt = r["branch_totals"] safe_print(f"\n Contactos Marca: {r['brand_contacts_total']:>6}") safe_print(f" Opps Marca: {r['brand_opps_total']:>6}") safe_print(f" Gap (contactos - opps): {r['brand_contacts_total'] - r['brand_opps_total']:>+6}") safe_print("") safe_print(f" Contactos suma Sucursales: {bt['contacts']:>6}") safe_print(f" Opps suma Sucursales: {bt['opps']:>6}") safe_print(f" Diferencia Marca vs Sucursales: contactos {r['brand_contacts_total'] - bt['contacts']:>+5} / opps {r['brand_opps_total'] - bt['opps']:>+5}") safe_print(f"\n {'-' * 60}") safe_print(" CLASIFICACION de los contactos Marca sin opp:") safe_print(f" Total contactos Marca SIN opp: {r['brand_no_opp_total']:>5}") safe_print(f" a) Gap real de sync (opp en sucursal): {len(r['sync_gaps']):>5} <- corregible con reconcile_and_sync_opportunities") safe_print(f" b) Tampoco tiene opp en sucursal: {len(r['legit_no_opp']):>5} (problema operativo de sucursal)") safe_print(f" c) Sin contraparte en sucursal: {len(r['brand_only']):>5} (leads digitales / imports directos)") safe_print(f"\n {'-' * 60}") safe_print(" GAPS de sync desde sucursales hacia Marca:") safe_print(f" Contactos en sucursal NO en Marca: {len(r['unsynced_contacts']):>5}") safe_print(f" Casos multi-opp parcialmente replicados: {len(r['multi_opp_gaps']):>5} <- posible bug de sync") if r["status_count"]: safe_print(f"\n {'-' * 60}") safe_print(" Opps Marca por Estado:") for s, cnt in sorted(r["status_count"].items(), key=lambda x: -x[1]): bar = "#" * min(cnt, 40) safe_print(f" {s:<14} {cnt:>4} {bar}") safe_print("\n" + "=" * W + "\n") def print_branch_table(r): safe_print("=" * 120) safe_print("MÉTRICAS POR SUCURSAL") safe_print("=" * 120) safe_print( f"{'Sucursal':<38} {'C_suc':>6} {'O_suc':>6} {'gap':>5} {'noOpp':>6} {'C_noSync':>9} {'O_noSync':>9}" ) safe_print("-" * 120) branches = sorted(r["per_branch"].items(), key=lambda x: (x[1]["gap"] * -1, -x[1]["n_no_opp_local"])) for loc, b in branches: if b["n_contacts"] == 0 and b["n_opps"] == 0: continue short = ascii_safe(b["name"], 37) safe_print( f"{short:<38} {b['n_contacts']:>6} {b['n_opps']:>6} {b['gap']:>+5} " f"{b['n_no_opp_local']:>6} {b['n_unsynced_contacts']:>9} {b['n_unsynced_opps']:>9}" ) safe_print("-" * 120) bt = r["branch_totals"] n_unsync_c = sum(b["n_unsynced_contacts"] for b in r["per_branch"].values()) n_unsync_o = sum(b["n_unsynced_opps"] for b in r["per_branch"].values()) n_no_opp = sum(b["n_no_opp_local"] for b in r["per_branch"].values()) safe_print( f"{'TOTAL SUCURSALES':<38} {bt['contacts']:>6} {bt['opps']:>6} {bt['contacts']-bt['opps']:>+5} " f"{n_no_opp:>6} {n_unsync_c:>9} {n_unsync_o:>9}" ) safe_print("") safe_print(" Leyenda:") safe_print(" C_suc/O_suc = Contactos/Opps locales en la sucursal") safe_print(" gap = C_suc - O_suc (diferencia local)") safe_print(" noOpp = contactos en sucursal SIN opp local") safe_print(" C_noSync = contactos de sucursal que NO están en Marca") safe_print(" O_noSync = opps de sucursal que NO llegaron a Marca") safe_print("") def print_sync_gaps(sync_gaps, accounts, limit=200): safe_print("=" * 110) safe_print(f"GAPS REALES DE SYNC SUC->MARCA ({len(sync_gaps)} contactos)") safe_print(" Contacto está en Marca SIN opp, pero su sucursal SÍ tiene opp asociada.") safe_print("=" * 110) for i, (c, matches) in enumerate(sync_gaps, 1): name = fmt_contact_name(c) safe_print( f"\n {i:03d}. {name} | Tel: {c.get('phone') or '-'} | Email: {c.get('email') or '-'}" ) safe_print(f" ID Marca: {c['id']}") for m in matches: if m.get("_opp_count", 0) > 0: branch_name = ascii_safe(accounts.get(m["_loc"], m["_loc"])) safe_print( f" -> {branch_name}: opps={m['_opp_count']} (contact_id sucursal: {m['id']})" ) if i >= limit: safe_print(f"\n ... [{len(sync_gaps) - limit} casos más omitidos]") break safe_print("=" * 110 + "\n") def print_multi_opp_gaps(multi_opp_gaps, accounts, limit=200): safe_print("=" * 110) safe_print(f"MULTI-OPP PARCIALMENTE REPLICADOS ({len(multi_opp_gaps)} casos)") safe_print(" Sucursal tiene más opps del contacto que las que aparecen en Marca.") safe_print(" Patrón candidato a bug en sync_engine al replicar contactos con varias opps.") safe_print("=" * 110) for i, g in enumerate(multi_opp_gaps, 1): c = g["branch_contact"] name = fmt_contact_name(c) branch = ascii_safe(accounts.get(g["loc"], g["loc"])) safe_print( f"\n {i:03d}. [{branch}] {name} | suc_opps={len(g['branch_opps'])} marca_opps={g['brand_opp_count']} (faltan {g['diff']})" ) for o in g["branch_opps"]: nm = ascii_safe(o.get("name") or "Sin nombre", 50) safe_print(f" - [{status_label(o.get('status')):<10}] {nm:<50} (id: {o['id']})") if i >= limit: safe_print(f"\n ... [{len(multi_opp_gaps) - limit} casos más omitidos]") break safe_print("=" * 110 + "\n") def print_unsynced_contacts(unsynced_contacts, accounts, limit=200): safe_print("=" * 110) safe_print(f"CONTACTOS EN SUCURSAL QUE NO ESTÁN EN MARCA ({len(unsynced_contacts)} contactos)") safe_print("=" * 110) # Ordenar por con_opps primero (más críticos) y dentro por sucursal items = sorted(unsynced_contacts, key=lambda x: (-x["opps_in_branch"], x["loc"])) for i, item in enumerate(items, 1): c = item["contact"] name = fmt_contact_name(c) branch = ascii_safe(accounts.get(item["loc"], item["loc"])) warn = " <- TIENE OPPS" if item["opps_in_branch"] > 0 else "" safe_print( f" {i:03d}. [{branch:<28}] {name:<35} Tel: {c.get('phone') or '-':<18} opps={item['opps_in_branch']}{warn}" ) if i >= limit: safe_print(f" ... [{len(unsynced_contacts) - limit} más omitidos]") break safe_print("=" * 110 + "\n") def print_brand_only(brand_only, limit=200): safe_print("=" * 110) safe_print(f"CONTACTOS SOLO EN MARCA (sin opp, sin contraparte en sucursal) ({len(brand_only)})") safe_print(" Mayormente leads digitales (Formulario, Facebook) o imports manuales.") safe_print("=" * 110) src_counter = Counter() for c in brand_only: src = (c.get("source") or "SIN_SOURCE").strip() or "SIN_SOURCE" src_counter[src] += 1 safe_print("\n Distribución por source:") for src, cnt in src_counter.most_common(): safe_print(f" {src:<35} {cnt:>4}") safe_print(f"\n Listado (primeros {min(limit, len(brand_only))}):") for i, c in enumerate(sorted(brand_only, key=lambda x: normalize_text(fmt_contact_name(x))), 1): name = fmt_contact_name(c) safe_print( f" {i:03d}. {name:<40} Tel: {c.get('phone') or '-':<18} Email: {c.get('email') or '-'}" ) if i >= limit: safe_print(f" ... [{len(brand_only) - limit} más omitidos]") break safe_print("=" * 110 + "\n") # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser( description="Audita la discrepancia de contactos y opps entre Marca y sucursales." ) parser.add_argument("--show-sync-gaps", action="store_true", help="Detalla los contactos Marca sin opp que SÍ tienen opp en sucursal.") parser.add_argument("--show-multi-opp-gaps", action="store_true", help="Detalla casos donde la sucursal tiene multi-opp y Marca solo replicó parte.") parser.add_argument("--show-unsynced-contacts", action="store_true", help="Detalla contactos en sucursal que no están en Marca.") parser.add_argument("--show-brand-only", action="store_true", help="Detalla contactos solo en Marca (leads digitales / imports).") parser.add_argument("--all", action="store_true", help="Muestra todos los detalles.") parser.add_argument("--limit", type=int, default=200, help="Máximo de filas por listado. Default: 200.") args = parser.parse_args() if not os.path.exists(DB_PATH): safe_print(f"Error: DB no encontrada en {DB_PATH}.") sys.exit(1) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row try: r = analyze(conn) print_overview(r) print_branch_table(r) show_sync = args.all or args.show_sync_gaps show_multi = args.all or args.show_multi_opp_gaps show_unsynced = args.all or args.show_unsynced_contacts show_only = args.all or args.show_brand_only if show_sync and r["sync_gaps"]: print_sync_gaps(r["sync_gaps"], r["accounts"], limit=args.limit) if show_multi and r["multi_opp_gaps"]: print_multi_opp_gaps(r["multi_opp_gaps"], r["accounts"], limit=args.limit) if show_unsynced and r["unsynced_contacts"]: print_unsynced_contacts(r["unsynced_contacts"], r["accounts"], limit=args.limit) if show_only and r["brand_only"]: print_brand_only(r["brand_only"], limit=args.limit) if not (show_sync or show_multi or show_unsynced or show_only): safe_print( " Tip: agrega --show-sync-gaps, --show-multi-opp-gaps, " "--show-unsynced-contacts, --show-brand-only o --all para ver el detalle.\n" ) finally: conn.close() if __name__ == "__main__": main()