#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ cleanup_brand_orphan_opportunities.py Investiga las oportunidades de la cuenta de Marca Principal y detecta cuáles no tienen contraparte en ninguna sucursal activa (excluye demos). Opcionalmente las elimina vía API de GHL. Uso: python scripts/cleanup_brand_orphan_opportunities.py # solo reporte python scripts/cleanup_brand_orphan_opportunities.py --dry-run # reporte + simulacion de borrado python scripts/cleanup_brand_orphan_opportunities.py --delete # reporte + borrado real python scripts/cleanup_brand_orphan_opportunities.py --dry-run --min-score 50 """ import argparse import csv import os import sys import sqlite3 import json import re import time import unicodedata ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT_DIR not in sys.path: sys.path.insert(0, ROOT_DIR) from paths import DB_PATH BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3" CSV_PATH = os.path.join(ROOT_DIR, "Bucéfalo - Mesa de control - API Tokens - MP.csv") # --------------------------------------------------------------------------- # Utilidades compartidas # --------------------------------------------------------------------------- def safe_print(*args, **kwargs): sep = kwargs.get("sep", " ") end = kwargs.get("end", "\n") text = sep.join(str(a) for a in args) encoding = sys.stdout.encoding or "utf-8" try: sys.stdout.write(text + end) sys.stdout.flush() except UnicodeEncodeError: try: sys.stdout.write(text.encode(encoding, errors="replace").decode(encoding) + end) sys.stdout.flush() except Exception: try: sys.stdout.write(text.encode("ascii", errors="replace").decode("ascii") + end) sys.stdout.flush() except Exception: pass def normalize_phone(phone): return re.sub(r"\D+", "", str(phone)) if phone else "" def phone_last_digits(phone, n=10): norm = normalize_phone(phone) return norm[-n:] if len(norm) >= n else norm def normalize_text(text): if not text: return "" nfkd = unicodedata.normalize("NFD", str(text)) clean = "".join(c for c in nfkd if unicodedata.category(c) != "Mn") return " ".join(clean.lower().split()) def opp_names_match(a, b): return bool(a and b and normalize_text(a) == normalize_text(b)) def opportunity_similarity(brand_opp, branch_opp): """Puntaje 0-100 de similitud entre dos oportunidades.""" if brand_opp["id"] == branch_opp["id"]: return 100 score = 0 bn = normalize_text(brand_opp.get("name") or "") sn = normalize_text(branch_opp.get("name") or "") if bn and sn: if bn == sn: score += 60 elif bn in sn or sn in bn: score += 40 else: overlap = set(bn.split()) & set(sn.split()) score += min(35, len(overlap) * 10) bv = brand_opp.get("monetary_value") or 0.0 sv = branch_opp.get("monetary_value") or 0.0 if abs(bv - sv) < 0.01: score += 25 elif max(bv, sv) > 0 and abs(bv - sv) / max(1.0, bv, sv) < 0.2: score += 15 if brand_opp.get("status") == branch_opp.get("status"): score += 15 return score def resolve_brand_link_field_id(conn): """field_id del campo 'ID Oportunidad Sucursal' en Marca (desde object_schemas).""" try: row = conn.execute( "SELECT field_id FROM object_schemas " "WHERE location_id=? AND object_key='opportunity' AND field_key=?", (BRAND_LOCATION_ID, "opportunity.id_oportunidad_sucursal"), ).fetchone() return row["field_id"] if row else None except Exception: return None def get_brand_opp_link_value(opp, link_field_id): """Valor de 'ID Oportunidad Sucursal' en una opp de Marca (custom_fields_json).""" if not link_field_id: return None raw = opp.get("custom_fields_json") if not raw: return None try: cfs = json.loads(raw) except Exception: return None for cf in cfs or []: if cf.get("id") == link_field_id or cf.get("fieldId") == link_field_id: return cf.get("value") or cf.get("fieldValue") or cf.get("fieldValueString") return None # --------------------------------------------------------------------------- # Carga de token # --------------------------------------------------------------------------- def load_brand_token(): if not os.path.exists(CSV_PATH): return None try: with open(CSV_PATH, mode="r", encoding="utf-8-sig") as f: reader = csv.DictReader(f) for row in reader: loc_id = (row.get("Location_ID") or row.get("location_id") or "").strip() token = (row.get("API_token") or row.get("api_token") or "").strip() if loc_id == BRAND_LOCATION_ID and token: return token except Exception as e: safe_print(f"Advertencia: no se pudo leer CSV de tokens: {e}") return None # --------------------------------------------------------------------------- # Lógica principal # --------------------------------------------------------------------------- def investigate(conn, min_score, branch_location_ids, branch_name_map): """ Recorre todas las oportunidades de Marca y las clasifica en: - synced: tienen contraparte en al menos una sucursal - orphan: su contacto existe en sucursales pero la oportunidad no - no_contact: el contacto no aparece en ninguna sucursal Retorna (synced, orphan, no_contact) — listas de dicts con metadatos extra. """ # -- Contactos y oportunidades de Marca -- brand_contacts = { row["id"]: dict(row) for row in conn.execute( "SELECT * FROM contacts WHERE location_id = ?", (BRAND_LOCATION_ID,) ).fetchall() } brand_opps = conn.execute(""" SELECT o.*, p.name AS pipeline_name FROM opportunities o LEFT JOIN pipelines p ON o.pipeline_id = p.id AND o.location_id = p.location_id WHERE o.location_id = ? """, (BRAND_LOCATION_ID,)).fetchall() # -- Contactos de sucursales (no-demo) indexados -- raw_branch_contacts = [ dict(row) for row in conn.execute( "SELECT * FROM contacts WHERE location_id != ?", (BRAND_LOCATION_ID,) ).fetchall() if row["location_id"] in branch_location_ids ] idx_phone = {} # phone_full -> [contact, ...] idx_suffix = {} # last_10 -> [contact, ...] idx_email = {} # email -> [contact, ...] idx_name = {} # norm_name -> [contact, ...] for bc in raw_branch_contacts: full_p = normalize_phone(bc["phone"]) suf = phone_last_digits(bc["phone"]) email = (bc["email"] or "").strip().lower() name = normalize_text(f"{bc['first_name'] or ''} {bc['last_name'] or ''}") if full_p: idx_phone.setdefault(full_p, []).append(bc) if suf and suf != full_p: idx_suffix.setdefault(suf, []).append(bc) if email: idx_email.setdefault(email, []).append(bc) if name: idx_name.setdefault(name, []).append(bc) # -- Oportunidades de sucursales indexadas por (location_id, contact_id) -- raw_branch_opps = [ dict(row) for row in conn.execute( "SELECT * FROM opportunities WHERE location_id != ?", (BRAND_LOCATION_ID,) ).fetchall() if row["location_id"] in branch_location_ids ] branch_opps_idx = {} # (loc_id, contact_id) -> [opp, ...] for bo in raw_branch_opps: key = (bo["location_id"], bo["contact_id"]) branch_opps_idx.setdefault(key, []).append(bo) # Set de TODOS los ids nativos de opps de sucursal (incluye demos a proposito, # para no borrar de Marca opps con link valido). Y field_id del link en Marca. all_branch_opp_ids = { r["id"] for r in conn.execute( "SELECT id FROM opportunities WHERE location_id != ?", (BRAND_LOCATION_ID,) ).fetchall() if r["id"] } link_field_id = resolve_brand_link_field_id(conn) link_protected = 0 # -- Clasificación -- synced = [] orphan = [] no_contact = [] for row in brand_opps: bo = dict(row) # SALVAGUARDA por link directo: si esta opp de Marca tiene el campo # "ID Oportunidad Sucursal" apuntando a una opp de sucursal que existe, # tiene contraparte REAL y nunca es huerfana, sin importar la heuristica # de nombre/monto. Evita borrados erroneos por nombre divergente. link_val = get_brand_opp_link_value(bo, link_field_id) if link_val and link_val in all_branch_opp_ids: link_protected += 1 synced.append({ **bo, "_match_loc": None, "_match_score": 100, "_match_name": "(link ID Oportunidad Sucursal)", "_contact": "", "_phone": "", }) continue bc = brand_contacts.get(bo["contact_id"]) if not bc: orphan.append({**bo, "_reason": "Contacto no existe en la BD local de Marca"}) continue fn = bc.get("first_name") or "" ln = bc.get("last_name") or "" phone = bc.get("phone") or "" email = (bc.get("email") or "").strip().lower() name = normalize_text(f"{fn} {ln}") full_p = normalize_phone(phone) suf = phone_last_digits(phone) # Buscar contacto en cualquier sucursal (deduplicado por (loc, id)) seen_contacts = set() candidates = [] def _add(lst): for c in lst: key = (c["location_id"], c["id"]) if key not in seen_contacts and c["location_id"] in branch_location_ids: seen_contacts.add(key) candidates.append(c) if full_p: _add(idx_phone.get(full_p, [])) if suf: _add(idx_suffix.get(suf, [])) if email: _add(idx_email.get(email, [])) if name: _add(idx_name.get(name, [])) if not candidates: no_contact.append({ **bo, "_reason": "Contacto sin coincidencia en ninguna sucursal", "_contact": f"{fn} {ln}".strip(), "_phone": phone, "_email": email, }) continue # Buscar oportunidad coincidente en las sucursales del contacto best_score = 0 best_loc_id = None found_exact = False for branch_c in candidates: loc_id = branch_c["location_id"] branch_cid = branch_c["id"] b_opps = branch_opps_idx.get((loc_id, branch_cid), []) for so in b_opps: if bo["id"] == so["id"] or opp_names_match(bo.get("name"), so.get("name")): found_exact = True best_score = 100 best_loc_id = loc_id break score = opportunity_similarity(bo, so) if score > best_score: best_score = score best_loc_id = loc_id if found_exact: break if found_exact or best_score >= min_score: synced.append({ **bo, "_match_loc": best_loc_id, "_match_score": best_score, "_match_name": branch_name_map.get(best_loc_id, best_loc_id), "_contact": f"{fn} {ln}".strip(), "_phone": phone, }) else: orphan.append({ **bo, "_reason": "Oportunidad no encontrada en ninguna sucursal", "_best_score": best_score, "_best_loc": branch_name_map.get(best_loc_id, best_loc_id) if best_loc_id else "—", "_contact": f"{fn} {ln}".strip(), "_phone": phone, "_email": email, }) if link_field_id: safe_print(f" [SALVAGUARDA] {link_protected} opp(s) de Marca protegidas por 'ID Oportunidad Sucursal' (link directo a opp de sucursal existente).") else: safe_print(" [SALVAGUARDA] Campo 'ID Oportunidad Sucursal' no resuelto en object_schemas de Marca. Corre sync de metadata para activar esta proteccion.") return synced, orphan, no_contact def print_orphan_detail(all_orphans): if not all_orphans: safe_print(" (ninguna)") return # Agrupar por pipeline groups = {} for o in all_orphans: pl = o.get("pipeline_name") or "Sin Pipeline" groups.setdefault(pl, []).append(o) W = 96 for pl, opps in sorted(groups.items()): safe_print(f"\n Pipeline: {pl} ({len(opps)} oportunidades)") safe_print(" " + "-" * W) for o in sorted(opps, key=lambda x: (x.get("name") or "")): safe_print(f" Nombre: {o.get('name') or 'Sin nombre'}") safe_print(f" ID: {o['id']}") safe_print(f" Estado: {(o.get('status') or 'open').upper():<12} " f"Valor: ${(o.get('monetary_value') or 0):,.2f}") safe_print(f" Cliente: {o.get('_contact') or 'N/A'} | Tel: {o.get('_phone') or 'N/A'}") reason = o.get("_reason", "") extra = "" if o.get("_best_score"): extra = f" (mejor score encontrado: {o['_best_score']}% en: {o.get('_best_loc', '—')})" safe_print(f" Razon: {reason}{extra}") safe_print(" " + "·" * W) def print_summary(brand_total, synced, orphan, no_contact, branch_accounts): all_orphans = orphan + no_contact W = 70 safe_print(f"\n{'=' * W}") safe_print("RESUMEN EJECUTIVO") safe_print(f"{'=' * W}") safe_print(f" Sucursales activas auditadas: {len(branch_accounts):>5}") safe_print(f" Total oportunidades en Marca: {brand_total:>5}") safe_print(f" {'-' * 50}") safe_print(f" Sincronizadas (con contraparte en sucursal): {len(synced):>5}") safe_print(f" Huerfanas TOTAL: {len(all_orphans):>5}") safe_print(f" +- Oportunidad sin par en sucursales: {len(orphan):>5}") safe_print(f" +- Contacto ausente en todas las sucs.: {len(no_contact):>5}") # Desglose por pipeline de las huérfanas if all_orphans: pipeline_count = {} for o in all_orphans: pl = o.get("pipeline_name") or "Sin Pipeline" pipeline_count[pl] = pipeline_count.get(pl, 0) + 1 safe_print(f"\n Huerfanas por Pipeline:") for pl, cnt in sorted(pipeline_count.items(), key=lambda x: -x[1]): safe_print(f" {pl:<46} {cnt:>4}") # Desglose de sincronizadas por sucursal (top matches) if synced: loc_count = {} for o in synced: loc_name = o.get("_match_name") or "Desconocida" loc_count[loc_name] = loc_count.get(loc_name, 0) + 1 safe_print(f"\n Sincronizadas por Sucursal:") for loc, cnt in sorted(loc_count.items(), key=lambda x: -x[1]): safe_print(f" {loc:<42} {cnt:>4}") safe_print(f"{'=' * W}\n") def run_deletion(all_orphans, dry_run, token): client = None if not dry_run: try: from ghl_client import GHLClient client = GHLClient() except Exception as e: safe_print(f"Error: no se pudo importar GHLClient: {e}") safe_print("Ejecuta desde el directorio raiz del proyecto.") return label = "[DRY-RUN]" if dry_run else "[BORRADO REAL]" W = 80 safe_print(f"\n{'=' * W}") safe_print(f"ELIMINACION DE OPORTUNIDADES HUERFANAS {label}") safe_print(f"{'=' * W}") safe_print(f"Total candidatas a eliminar: {len(all_orphans)}") if dry_run: safe_print("Modo DRY-RUN activo — no se realizara ningun cambio en el CRM.\n") else: safe_print("ATENCION: Borrado permanente. Esta accion no se puede deshacer.\n") deleted = 0 errors = 0 for idx, o in enumerate(all_orphans, 1): opp_id = o["id"] opp_name = o.get("name") or "Sin nombre" contact = o.get("_contact") or "N/A" pipeline = o.get("pipeline_name") or "Sin Pipeline" safe_print(f" {idx:03d}. {label} [{pipeline}] '{opp_name}'") safe_print(f" ID: {opp_id} | Cliente: {contact}") if not dry_run and client is not None: try: ok = client.delete_opportunity(token, opp_id, BRAND_LOCATION_ID) if ok: deleted += 1 safe_print(" --> ELIMINADA OK") else: errors += 1 safe_print(" --> ERROR: la API no confirmo el borrado") time.sleep(0.15) except Exception as e: errors += 1 safe_print(f" --> EXCEPCION: {e}") safe_print() if dry_run: safe_print(f" [DRY-RUN] Se habrian eliminado {len(all_orphans)} oportunidades de Marca.") else: safe_print(f" Resultado: {deleted} eliminadas | {errors} errores") safe_print("=" * W + "\n") # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser( description="Detecta oportunidades de Marca sin contraparte en sucursales y opcionalmente las elimina." ) parser.add_argument("--dry-run", action="store_true", help="Simula la eliminacion sin realizar cambios reales en el CRM.") parser.add_argument("--delete", action="store_true", help="Elimina via API las oportunidades huerfanas (requiere token en el CSV de control).") parser.add_argument("--min-score", type=int, default=35, help="Umbral de similitud (0-100) para considerar una oportunidad como sincronizada. Default: 35.") args = parser.parse_args() if not os.path.exists(DB_PATH): safe_print(f"Error: DB no encontrada en {DB_PATH}. Ejecuta la sincronizacion global primero.") sys.exit(1) mode = "[DRY-RUN]" if args.dry_run else ("[BORRADO REAL]" if args.delete else "[SOLO LECTURA]") W = 100 safe_print("\n" + "=" * W) safe_print(f"INVESTIGACION DE OPORTUNIDADES HUERFANAS EN CUENTA DE MARCA {mode}") safe_print("=" * W) safe_print(f" Umbral de similitud: {args.min_score}% | Demos: excluidas\n") conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row try: # Cargar cuentas (excluir marca y demos) all_accounts = conn.execute("SELECT * FROM accounts").fetchall() branch_accounts = [ a for a in all_accounts if a["location_id"] != BRAND_LOCATION_ID and "demo" not in (a["nombre"] or "").lower() ] branch_location_ids = {a["location_id"] for a in branch_accounts} branch_name_map = {a["location_id"]: a["nombre"] for a in branch_accounts} safe_print(f" Sucursales activas encontradas: {len(branch_accounts)}") for a in sorted(branch_accounts, key=lambda x: x["nombre"]): safe_print(f" - {a['nombre']}") safe_print() # Investigación safe_print(" Analizando oportunidades de Marca vs. sucursales...") synced, orphan, no_contact = investigate(conn, args.min_score, branch_location_ids, branch_name_map) brand_total = len(synced) + len(orphan) + len(no_contact) all_orphans = orphan + no_contact # Detalle de huérfanas safe_print(f"\n{'=' * W}") safe_print("DETALLE: OPORTUNIDADES HUERFANAS (SIN CONTRAPARTE EN SUCURSALES)") safe_print(f"{'=' * W}") print_orphan_detail(all_orphans) # Resumen print_summary(brand_total, synced, orphan, no_contact, branch_accounts) # Eliminación o dry-run (si hay huérfanas y el usuario lo pidió) if (args.delete or args.dry_run) and all_orphans: token = None if args.delete: token = load_brand_token() if not token: safe_print("Error: no se encontro el token de la cuenta Marca en el CSV de control.") sys.exit(1) run_deletion(all_orphans, dry_run=args.dry_run, token=token) elif not all_orphans: safe_print("No hay oportunidades huerfanas. No se requiere ninguna accion.") finally: conn.close() if __name__ == "__main__": main()