#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""audit_phone_collisions.py

Read-only, PARALLEL audit of the phone-collision cases that make
sync_missing_opps_to_brand.py fail.

For each case (an opportunity in a branch with no counterpart in Brand whose
contact collides by phone with a Brand contact that has a different name),
the script fetches IN PARALLEL:
  - Full detail of the branch contact (+ its opportunities).
  - Full detail of the colliding Brand contact (+ its opportunities).
  - Name similarity via common.match_contacts.
  - How many Brand contacts share that phone.

Each case is classified as:
  - SAME_PERSON: very likely the same person (same email / high similarity)
    -> the opp should be attached to the existing Brand contact.
  - DISTINCT_PERSON: partner/family sharing a number -> skip + manual review.
  - UNCERTAIN: needs a human look (also used when a contact could not be
    fetched, because there is no data to classify on).

Read-only with respect to the remote API: only GET requests and a contact
search are issued. The only thing written is a local JSON report.
"""

import csv
import json
import os
import sys
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed
from functools import lru_cache

# Make both the repository root and this scripts directory importable so that
# `common` resolves regardless of the current working directory.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPTS_DIR not in sys.path:
    sys.path.insert(0, SCRIPTS_DIR)

import requests  # noqa: E402  (must come after the sys.path setup above)
from common import match_contacts, normalize_phone  # noqa: E402

BASE_URL = "https://services.leadconnectorhq.com"
BRAND_LOC = "GbKkBpCmKu2QmloKFHy3"

# SECURITY(review): API tokens are committed in source. They should be
# rotated and loaded from the environment or a secrets store instead.
# Tokens per location (from the control sheet).
TOKENS = {
    "GbKkBpCmKu2QmloKFHy3": "pit-4e4266f8-97ac-4150-a971-cc9158809640",  # Brand
    "uZnMH5bO6MXTHcgHeyq9": "pit-dd42c1ce-2ab7-4bf9-8bc0-c0087a83b2e5",  # Pilares
}
BRAND_TOKEN = TOKENS[BRAND_LOC]

# Seconds to wait for any single HTTP request.
REQUEST_TIMEOUT = 30
# Upper bound on cases audited at once; every case spawns 5 request threads.
MAX_CASE_WORKERS = 16
# Filler addresses that must never count as "same email".
PLACEHOLDER_EMAILS = frozenset({"sincorreo@gmail.com", "noemail@gmail.com", ""})


@lru_cache(maxsize=None)
def _load_token(loc):
    """Return the API token for location *loc*.

    Looks in ``TOKENS`` first and falls back to the control-sheet CSV in
    ``ROOT_DIR``. Successful lookups are memoised so the CSV is not re-read
    for every request.

    Raises:
        RuntimeError: if no token is found for *loc*.
        OSError: if the CSV fallback is needed and cannot be opened.
    """
    if loc in TOKENS:
        return TOKENS[loc]
    path = os.path.join(ROOT_DIR, "Bucéfalo - Mesa de control - API Tokens - MP.csv")
    with open(path, encoding="utf-8-sig") as f:
        for row in csv.DictReader(f):
            if (row.get("Location_ID") or "").strip() == loc:
                return (row.get("API_token") or "").strip()
    raise RuntimeError(f"sin token para {loc}")


def _headers(token):
    """Build the HTTP headers sent with every API request."""
    return {
        "Authorization": f"Bearer {token}",
        "Version": "2021-07-28",
        "Accept": "application/json",
        "Content-Type": "application/json",
    }


def _fetch_json(method, path, token, **kwargs):
    """Issue one API request and return ``(payload, error)``.

    Exactly one element of the pair is ``None``. Transport failures, non-200
    statuses and non-JSON bodies are all reported through *error* so callers
    can degrade gracefully instead of aborting the whole audit.
    """
    try:
        r = requests.request(
            method,
            f"{BASE_URL}{path}",
            headers=_headers(token),
            timeout=REQUEST_TIMEOUT,
            **kwargs,
        )
    except requests.RequestException as exc:
        return None, f"{type(exc).__name__}: {exc}"
    if r.status_code != 200:
        return None, f"HTTP {r.status_code}: {r.text[:160]}"
    try:
        return r.json(), None
    except ValueError as exc:
        return None, f"invalid JSON: {exc}"


def _full_name(contact):
    """Join a contact's first and last name, tolerating missing parts."""
    return f"{contact.get('firstName') or ''} {contact.get('lastName') or ''}".strip()


def _contact_summary(contact):
    """Extract the report fields for one contact dict."""
    return {
        "id": contact.get("id"),
        "name": _full_name(contact),
        "email": contact.get("email"),
        "phone": contact.get("phone"),
        "dateAdded": contact.get("dateAdded"),
        "tags": contact.get("tags"),
    }


def get_contact(loc, contact_id):
    """Fetch one contact from location *loc*.

    Returns:
        The contact dict, or ``{"error": message}`` when the request fails.
    """
    payload, error = _fetch_json("GET", f"/contacts/{contact_id}", _load_token(loc))
    if error is not None:
        return {"error": error}
    # `or {}` also covers a payload carrying `"contact": null`.
    return payload.get("contact") or {}


def get_opps_for_contact(loc, contact_id):
    """Return up to 50 opportunities of a contact; ``[]`` on any failure."""
    payload, error = _fetch_json(
        "GET",
        "/opportunities/search",
        _load_token(loc),
        params={"location_id": loc, "contact_id": contact_id, "limit": 50},
    )
    if error is not None:
        return []
    return payload.get("opportunities", []) or []


def count_brand_contacts_sharing_phone(phone):
    """Count the Brand contacts that share *phone* (normalized).

    Only the first page (20 results) of the search is inspected.

    Returns:
        ``(count, contacts)`` where *contacts* is a list of summary dicts, or
        ``(None, [])`` when *phone* is empty or the search request fails.
    """
    if not phone:
        # Without a phone an exact match could pair up phone-less contacts.
        return None, []
    body = {
        "locationId": BRAND_LOC,
        "pageLimit": 20,
        "page": 1,
        "filters": [{"field": "phone", "operator": "contains", "value": phone}],
    }
    payload, error = _fetch_json("POST", "/contacts/search", BRAND_TOKEN, json=body)
    if error is not None:
        return None, []
    contacts = payload.get("contacts", []) or []
    # The API filter is a substring match; keep exact normalized matches only.
    target = normalize_phone(phone)
    matches = [c for c in contacts if normalize_phone(c.get("phone")) == target]
    return len(matches), [
        {
            "id": c.get("id"),
            "name": _full_name(c),
            "phone": c.get("phone"),
            "email": c.get("email"),
        }
        for c in matches
    ]


def _classify(same_email, similarity, has_fetch_errors):
    """Map the collected signals to a verdict string."""
    if has_fetch_errors:
        # A contact could not be loaded: there is nothing to compare, so do
        # not let empty names masquerade as DISTINCT_PERSON.
        return "UNCERTAIN"
    if same_email or similarity >= 0.80:
        return "SAME_PERSON"
    if similarity >= 0.55:
        return "UNCERTAIN"
    return "DISTINCT_PERSON"


def audit_one(case):
    """Audit one collision case, running its 5 lookups concurrently.

    Args:
        case: dict with the keys ``opp_id``, ``branch_name``, ``opp_name``,
            ``phone``, ``branch_location_id``, ``branch_contact_id`` and
            ``colliding_brand_contact_id``.

    Returns:
        A JSON-serialisable dict with both contact summaries, opportunity
        counts, phone-sharing info, name similarity, ``same_email``,
        ``fetch_errors`` and the final ``verdict``.
    """
    branch_loc = case["branch_location_id"]
    branch_cid = case["branch_contact_id"]
    brand_cid = case["colliding_brand_contact_id"]
    phone = case["phone"]
    out = {
        "opp_id": case["opp_id"],
        "branch_name": case["branch_name"],
        "opp_name": case["opp_name"],
        "phone": phone,
    }

    with ThreadPoolExecutor(max_workers=5) as ex:
        futs = {
            "branch_contact": ex.submit(get_contact, branch_loc, branch_cid),
            "brand_contact": ex.submit(get_contact, BRAND_LOC, brand_cid),
            "branch_opps": ex.submit(get_opps_for_contact, branch_loc, branch_cid),
            "brand_opps": ex.submit(get_opps_for_contact, BRAND_LOC, brand_cid),
            "phone_share": ex.submit(count_brand_contacts_sharing_phone, phone),
        }
        res = {k: f.result() for k, f in futs.items()}

    bc = res["branch_contact"]
    mc = res["brand_contact"]
    out["branch_contact"] = _contact_summary(bc)
    out["brand_collider"] = _contact_summary(mc)
    out["branch_opps_count"] = len(res["branch_opps"])
    out["brand_opps_count"] = len(res["brand_opps"])
    out["brand_opps"] = [
        {
            "name": o.get("name"),
            "status": o.get("status"),
            "value": o.get("monetaryValue"),
            "pipelineId": o.get("pipelineId"),
        }
        for o in res["brand_opps"][:10]
    ]
    share_count, share_list = res["phone_share"]
    out["brand_contacts_sharing_phone"] = share_count
    out["brand_contacts_sharing_phone_list"] = share_list

    # Name similarity. Phone and email are passed along too, but only the
    # returned name_score feeds the verdict below.
    src = {"first_name": bc.get("firstName"), "last_name": bc.get("lastName"),
           "phone": bc.get("phone"), "email": bc.get("email")}
    dst = {"first_name": mc.get("firstName"), "last_name": mc.get("lastName"),
           "phone": mc.get("phone"), "email": mc.get("email")}
    out["match_level"] = None
    out["match_name_similarity"] = None
    try:
        m = match_contacts(src, dst, threshold=0.80)
        out["match_level"] = m.get("level")
        out["match_name_similarity"] = round(m.get("name_score", 0) or 0, 3)
    except Exception as e:  # best-effort: a matcher failure must not lose the case
        out["match_level"] = f"error:{e}"

    # Same email? Placeholder addresses never count as a match.
    be = (bc.get("email") or "").strip().lower()
    me = (mc.get("email") or "").strip().lower()
    out["same_email"] = bool(be and be == me and be not in PLACEHOLDER_EMAILS)

    # Surface contact fetch failures instead of classifying on empty data.
    out["fetch_errors"] = {
        side: contact["error"]
        for side, contact in (("branch_contact", bc), ("brand_contact", mc))
        if contact.get("error")
    }

    # Heuristic classification.
    sim = out["match_name_similarity"] or 0
    out["verdict"] = _classify(out["same_email"], sim, bool(out["fetch_errors"]))
    return out


def main():
    """Load the pending cases, audit them in parallel, print and save a report.

    Reads ``generated/reports/missing_opps_clean.json`` and writes
    ``generated/reports/phone_collisions_audit.json`` under ``ROOT_DIR``.
    """
    clean = os.path.join(ROOT_DIR, "generated", "reports", "missing_opps_clean.json")
    with open(clean, encoding="utf-8") as f:
        data = json.load(f)

    cases = []
    for it in data["items"]:
        coll = next(
            (a for a in (it.get("actions") or [])
             if a.get("action") == "phone_collision_unresolved"),
            None,
        )
        if not coll:
            continue
        bc = it.get("branch_contact") or {}
        cases.append({
            "opp_id": it["opp_id"],
            "branch_name": it.get("branch_name"),
            "branch_location_id": it.get("branch_location_id"),
            "branch_contact_id": bc.get("id"),
            "opp_name": it.get("opp_name"),
            "phone": bc.get("phone"),
            "colliding_brand_contact_id": coll.get("colliding_brand_contact_id"),
        })

    print(f"Auditando {len(cases)} casos de colision EN PARALELO...\n")
    results = []
    # Bounded pool: each case already fans out into 5 request threads.
    with ThreadPoolExecutor(max_workers=min(len(cases), MAX_CASE_WORKERS) or 1) as ex:
        futs = [ex.submit(audit_one, c) for c in cases]
        for f in as_completed(futs):
            results.append(f.result())

    # opp_name may be None; coerce so mixed None/str values stay sortable.
    results.sort(key=lambda r: r["opp_name"] or "")
    for r in results:
        print("=" * 78)
        print(f"OPP {r['opp_id']} | {r['branch_name']} | tel {r['phone']}")
        bc = r["branch_contact"]
        mc = r["brand_collider"]
        print(f" SUCURSAL : {bc['name']!r} email={bc['email']} alta={bc['dateAdded']}")
        print(f" MARCA(col): {mc['name']!r} email={mc['email']} alta={mc['dateAdded']}")
        print(f" similitud_nombre={r.get('match_name_similarity')} match_level={r.get('match_level')} same_email={r['same_email']}")
        print(f" contactos en Marca con ese telefono: {r['brand_contacts_sharing_phone']} -> {[x['name'] for x in r['brand_contacts_sharing_phone_list']]}")
        print(f" opps del colider en Marca: {r['brand_opps_count']} -> {[(o['name'], o['status'], o['value']) for o in r['brand_opps']]}")
        if r.get("fetch_errors"):
            print(f" [!] errores de consulta: {r['fetch_errors']}")
        print(f" >>> VEREDICTO: {r['verdict']}")
        print()

    out_path = os.path.join(ROOT_DIR, "generated", "reports", "phone_collisions_audit.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    print(f"Reporte: {out_path}")

    # Summary
    verdicts = Counter(r["verdict"] for r in results)
    print(f"Resumen veredictos: {dict(verdicts)}")


if __name__ == "__main__":
    main()