#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ audit_brand_contact_opportunity_map.py Mapea la relación entre contactos y oportunidades en la cuenta de Marca Principal e identifica discrepancias: - Contactos sin ninguna oportunidad - Contactos con más de una oportunidad (duplicados potenciales) - Oportunidades huérfanas (contact_id no existe en la BD) - Breakdown por estado de oportunidad Uso: python scripts/audit_brand_contact_opportunity_map.py python scripts/audit_brand_contact_opportunity_map.py --show-no-opp # lista contactos sin opp python scripts/audit_brand_contact_opportunity_map.py --show-multi-opp # lista contactos con 2+ opps python scripts/audit_brand_contact_opportunity_map.py --show-orphan-opp # lista opps sin contacto python scripts/audit_brand_contact_opportunity_map.py --all # muestra todo """ import argparse import os import sys import sqlite3 import re import unicodedata ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT_DIR not in sys.path: sys.path.insert(0, ROOT_DIR) from paths import DB_PATH BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3" # --------------------------------------------------------------------------- # Utilidades # --------------------------------------------------------------------------- def safe_print(*args, **kwargs): sep = kwargs.get("sep", " ") end = kwargs.get("end", "\n") text = sep.join(str(a) for a in args) encoding = sys.stdout.encoding or "utf-8" try: sys.stdout.write(text + end) sys.stdout.flush() except UnicodeEncodeError: sys.stdout.write(text.encode(encoding, errors="replace").decode(encoding) + end) sys.stdout.flush() def normalize_text(text): if not text: return "" nfkd = unicodedata.normalize("NFD", str(text)) clean = "".join(c for c in nfkd if unicodedata.category(c) != "Mn") return " ".join(clean.lower().split()) def normalize_phone(phone): return re.sub(r"\D+", "", str(phone)) if phone else "" def fmt_contact(c): name = f"{c.get('first_name') or ''} {c.get('last_name') or ''}".strip() or "Sin nombre" phone = c.get("phone") or "—" email = c.get("email") or "—" return name, phone, email def status_label(s): return (s or "open").upper() # --------------------------------------------------------------------------- # Carga de datos # --------------------------------------------------------------------------- def load_data(conn): contacts = { row["id"]: dict(row) for row in conn.execute( "SELECT * FROM contacts WHERE location_id = ?", (BRAND_LOCATION_ID,) ).fetchall() } opps = conn.execute(""" SELECT o.*, p.name AS pipeline_name FROM opportunities o LEFT JOIN pipelines p ON o.pipeline_id = p.id AND o.location_id = p.location_id WHERE o.location_id = ? """, (BRAND_LOCATION_ID,)).fetchall() opps = [dict(row) for row in opps] return contacts, opps # --------------------------------------------------------------------------- # Análisis # --------------------------------------------------------------------------- def analyze(contacts, opps): # Agrupar oportunidades por contact_id opps_by_contact = {} orphan_opps = [] # opp cuyo contact_id no existe en la BD de Marca for o in opps: cid = o.get("contact_id") if not cid or cid not in contacts: orphan_opps.append(o) else: opps_by_contact.setdefault(cid, []).append(o) # Clasificar contactos no_opp = [] # 0 oportunidades one_opp = [] # exactamente 1 multi_opp = [] # 2 o más for cid, c in contacts.items(): c_opps = opps_by_contact.get(cid, []) if len(c_opps) == 0: no_opp.append(c) elif len(c_opps) == 1: one_opp.append((c, c_opps[0])) else: multi_opp.append((c, c_opps)) # Conteo de estados entre todas las opps válidas status_count = {} pipeline_count = {} for o in opps: if o not in orphan_opps: s = status_label(o.get("status")) status_count[s] = status_count.get(s, 0) + 1 pl = o.get("pipeline_name") or "Sin Pipeline" pipeline_count[pl] = pipeline_count.get(pl, 0) + 1 return { "contacts_total": len(contacts), "opps_total": len(opps), "no_opp": no_opp, "one_opp": one_opp, "multi_opp": multi_opp, "orphan_opps": orphan_opps, "status_count": status_count, "pipeline_count": pipeline_count, "opps_by_contact": opps_by_contact, } # --------------------------------------------------------------------------- # Impresión # --------------------------------------------------------------------------- def print_summary(r): W = 72 total_c = r["contacts_total"] total_o = r["opps_total"] no_opp = r["no_opp"] one_opp = r["one_opp"] multi_opp = r["multi_opp"] orphans = r["orphan_opps"] gap = total_c - total_o safe_print("\n" + "=" * W) safe_print("MAPA CONTACTO <-> OPORTUNIDAD | CUENTA DE MARCA PRINCIPAL") safe_print("=" * W) # Números brutos safe_print(f"\n Contactos en Marca: {total_c:>5}") safe_print(f" Oportunidades en Marca: {total_o:>5}") safe_print(f" Diferencia (contactos - opps): {gap:>+5}") # Relación 1:N safe_print(f"\n {'─' * 60}") safe_print(f" Contactos con 0 oportunidades: {len(no_opp):>5} {'<-- SIN OPP' if no_opp else ''}") safe_print(f" Contactos con 1 oportunidad: {len(one_opp):>5} (estado normal)") safe_print(f" Contactos con 2+ oportunidades: {len(multi_opp):>5} {'<-- REVISAR' if multi_opp else ''}") safe_print(f" Oportunidades sin contacto (BD): {len(orphans):>5} {'<-- HUERFANAS' if orphans else ''}") # Desglose de opps válidas por estado if r["status_count"]: safe_print(f"\n {'─' * 60}") safe_print(" Oportunidades por Estado:") for s, cnt in sorted(r["status_count"].items(), key=lambda x: -x[1]): bar = "#" * min(cnt, 40) safe_print(f" {s:<14} {cnt:>4} {bar}") # Desglose por pipeline if r["pipeline_count"]: safe_print(f"\n {'─' * 60}") safe_print(" Oportunidades por Pipeline:") for pl, cnt in sorted(r["pipeline_count"].items(), key=lambda x: -x[1]): safe_print(f" {pl:<46} {cnt:>4}") # Multi-opp: resumen rápido de cuántas opps tienen if multi_opp: from collections import Counter dist = Counter(len(opps) for _, opps in multi_opp) safe_print(f"\n {'─' * 60}") safe_print(" Distribucion de contactos con 2+ opps:") for n, cnt in sorted(dist.items()): safe_print(f" {n} oportunidades: {cnt} contactos") safe_print("\n" + "=" * W + "\n") def print_no_opp(no_opp, limit=200): W = 90 safe_print(f"\n{'=' * W}") safe_print(f"CONTACTOS SIN OPORTUNIDAD ({len(no_opp)} total)") safe_print(f"{'=' * W}") for i, c in enumerate(sorted(no_opp, key=lambda x: normalize_text(f"{x.get('first_name','')} {x.get('last_name','')}")), 1): name, phone, email = fmt_contact(c) safe_print(f" {i:03d}. {name:<40} Tel: {phone:<18} Email: {email}") if i >= limit: safe_print(f" ... [{len(no_opp) - limit} mas omitidos]") break safe_print("=" * W + "\n") def print_multi_opp(multi_opp, limit=100): W = 90 safe_print(f"\n{'=' * W}") safe_print(f"CONTACTOS CON 2+ OPORTUNIDADES ({len(multi_opp)} total)") safe_print(f"{'=' * W}") shown = 0 for c, opps in sorted(multi_opp, key=lambda x: -len(x[1])): name, phone, email = fmt_contact(c) safe_print(f"\n {name} | Tel: {phone} | Email: {email}") safe_print(f" ID Contacto: {c['id']} | Oportunidades: {len(opps)}") for o in sorted(opps, key=lambda x: x.get("date_added") or ""): val = f"${(o.get('monetary_value') or 0):,.0f}" safe_print(f" - [{status_label(o.get('status')):<10}] {(o.get('name') or 'Sin nombre'):<40} {val:>10} (ID: {o['id']})") shown += 1 if shown >= limit: safe_print(f"\n ... [{len(multi_opp) - limit} contactos mas omitidos]") break safe_print("\n" + "=" * W + "\n") def print_orphan_opps(orphan_opps, limit=100): W = 90 safe_print(f"\n{'=' * W}") safe_print(f"OPORTUNIDADES SIN CONTACTO EN BD ({len(orphan_opps)} total)") safe_print(f"{'=' * W}") for i, o in enumerate(orphan_opps, 1): val = f"${(o.get('monetary_value') or 0):,.0f}" safe_print(f" {i:03d}. [{status_label(o.get('status')):<10}] {(o.get('name') or 'Sin nombre'):<40} {val:>10}") safe_print(f" Contact ID: {o.get('contact_id') or '—'} | Opp ID: {o['id']}") if i >= limit: safe_print(f" ... [{len(orphan_opps) - limit} mas omitidos]") break safe_print("=" * W + "\n") # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser( description="Mapea contactos vs oportunidades en Marca y detecta discrepancias." ) parser.add_argument("--show-no-opp", action="store_true", help="Lista contactos sin oportunidad.") parser.add_argument("--show-multi-opp", action="store_true", help="Lista contactos con 2+ oportunidades.") parser.add_argument("--show-orphan-opp", action="store_true", help="Lista oportunidades sin contacto en la BD.") parser.add_argument("--all", action="store_true", help="Muestra todos los detalles.") parser.add_argument("--limit", type=int, default=200, help="Max de filas en listados detallados. Default: 200.") args = parser.parse_args() if not os.path.exists(DB_PATH): safe_print(f"Error: DB no encontrada en {DB_PATH}.") sys.exit(1) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row try: contacts, opps = load_data(conn) r = analyze(contacts, opps) print_summary(r) show_no_opp = args.all or args.show_no_opp show_multi_opp = args.all or args.show_multi_opp show_orphan_opp = args.all or args.show_orphan_opp if show_no_opp and r["no_opp"]: print_no_opp(r["no_opp"], limit=args.limit) if show_multi_opp and r["multi_opp"]: print_multi_opp(r["multi_opp"], limit=args.limit) if show_orphan_opp and r["orphan_opps"]: print_orphan_opps(r["orphan_opps"], limit=args.limit) if not (show_no_opp or show_multi_opp or show_orphan_opp): safe_print(" Tip: agrega --show-no-opp, --show-multi-opp, --show-orphan-opp o --all para ver el detalle.\n") finally: conn.close() if __name__ == "__main__": main()