#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""audit_phone_collisions.py

Read-only, PARALLEL audit of the phone-collision cases that make
sync_missing_opps_to_brand.py fail.

For each case (an opportunity in a branch with no counterpart in Brand
(Marca) whose contact collides by phone with a Brand contact that has a
different name), it fetches IN PARALLEL:
  - Full detail of the branch contact (+ its opps).
  - Full detail of the colliding Brand contact (+ its opps).
  - Name similarity via common.match_contacts.
  - Count of how many Brand contacts share that phone.

It classifies each case:
  - SAME_PERSON: very likely the same person (same email / high similarity) ->
    the opp should be attached to the existing Brand contact.
  - DISTINCT_PERSON: couple/family sharing a number -> skip + manual review.
  - UNCERTAIN: needs a human to look at it.

Read-only against the API: nothing is written remotely. The only output is a
local JSON report plus the console summary.
"""
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
# Put this script's directory and its parent at the front of sys.path
# (presumably so the project-local `common` import below resolves when the
# file is run directly -- confirm where `common` lives).
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(SCRIPTS_DIR)
for _entry in (ROOT_DIR, SCRIPTS_DIR):
    # Inserted in this order so SCRIPTS_DIR ends up ahead of ROOT_DIR.
    if _entry not in sys.path:
        sys.path.insert(0, _entry)
del _entry
|
|
|
|
import requests
|
|
from common import match_contacts, normalize_phone
|
|
|
|
BASE_URL = "https://services.leadconnectorhq.com"
BRAND_LOC = "GbKkBpCmKu2QmloKFHy3"

# Per-location API tokens (taken from the control-desk sheet).
# SECURITY(review): these are credentials committed in source. They should be
# rotated and loaded from the environment or a secret store instead; they are
# kept here only so the script's behavior does not change in this edit.
TOKENS = {
    BRAND_LOC: "pit-4e4266f8-97ac-4150-a971-cc9158809640",  # Brand (Marca)
    "uZnMH5bO6MXTHcgHeyq9": "pit-dd42c1ce-2ab7-4bf9-8bc0-c0087a83b2e5",  # Pilares
}

# Derived from TOKENS so the Brand token is written in exactly one place and
# the two values cannot drift apart.
BRAND_TOKEN = TOKENS[BRAND_LOC]
|
|
|
|
|
|
def _load_token(loc):
    """Return the API token for a location.

    The in-module ``TOKENS`` table is consulted first; if it has no usable
    entry, the control-desk CSV under ``ROOT_DIR`` is scanned for a row whose
    ``Location_ID`` equals ``loc`` and whose ``API_token`` is non-empty.

    Args:
        loc: Location id to look up.

    Returns:
        The token string (never empty).

    Raises:
        RuntimeError: if no non-empty token is found for ``loc``.
        OSError: if the CSV has to be read and cannot be opened.
    """
    token = TOKENS.get(loc)
    if token:
        return token

    import csv  # only needed on the CSV fallback path

    path = os.path.join(ROOT_DIR, "Bucéfalo - Mesa de control - API Tokens - MP.csv")
    # newline="" is what the csv module documents for files handed to a reader.
    with open(path, encoding="utf-8-sig", newline="") as f:
        for row in csv.DictReader(f):
            if (row.get("Location_ID") or "").strip() != loc:
                continue
            token = (row.get("API_token") or "").strip()
            # A matching row with a blank token is not usable: returning ""
            # would send "Bearer " and fail later with a confusing auth error.
            if token:
                return token
    raise RuntimeError(f"sin token para {loc}")
|
|
|
|
|
|
def _headers(token):
|
|
return {"Authorization": f"Bearer {token}", "Version": "2021-07-28", "Accept": "application/json", "Content-Type": "application/json"}
|
|
|
|
|
|
def get_contact(loc, contact_id):
    """Fetch one contact by id from the given location.

    API and network problems are reported in-band rather than raised, so one
    bad lookup does not abort a whole parallel audit.

    Args:
        loc: Location id; used to pick the API token.
        contact_id: Id of the contact to fetch.

    Returns:
        The contact dict from the response (``{}`` if the payload has none),
        or ``{"error": "<description>"}`` when the id is missing, the request
        fails, the status is not 200, or the body is not valid JSON.

    Raises:
        RuntimeError: propagated from ``_load_token`` when ``loc`` has no token.
    """
    if not contact_id:
        # Without an id the URL would become ".../contacts/None".
        return {"error": "sin contact_id"}

    token = _load_token(loc)
    try:
        r = requests.get(f"{BASE_URL}/contacts/{contact_id}", headers=_headers(token), timeout=30)
    except requests.RequestException as e:
        # Timeouts / connection errors get the same shape as HTTP errors below.
        return {"error": f"request error: {e}"}

    if r.status_code != 200:
        return {"error": f"HTTP {r.status_code}: {r.text[:160]}"}

    try:
        payload = r.json()
    except ValueError as e:
        return {"error": f"invalid JSON: {e}"}
    # `or {}` also covers an explicit null "contact", so callers can always .get().
    return payload.get("contact") or {}
|
|
|
|
|
|
def get_opps_for_contact(loc, contact_id):
    """Return the opportunities attached to a contact (best effort).

    Only a single page of up to 50 opportunities is requested.

    Args:
        loc: Location id; used for the token and as the ``location_id`` filter.
        contact_id: Id of the contact whose opportunities are wanted.

    Returns:
        A list of opportunity dicts. An empty list is returned when the id is
        missing or when the lookup fails; failures are reported on stderr so
        an empty result caused by an error can be told apart from a real zero.

    Raises:
        RuntimeError: propagated from ``_load_token`` when ``loc`` has no token.
    """
    if not contact_id:
        # requests drops params whose value is None, so without this guard the
        # search would silently run for the whole location instead of one contact.
        return []

    token = _load_token(loc)
    params = {"location_id": loc, "contact_id": contact_id, "limit": 50}
    try:
        r = requests.get(f"{BASE_URL}/opportunities/search", headers=_headers(token),
                         params=params, timeout=30)
    except requests.RequestException as e:
        print(f"WARN opps {loc}/{contact_id}: {e}", file=sys.stderr)
        return []

    if r.status_code != 200:
        print(f"WARN opps {loc}/{contact_id}: HTTP {r.status_code}", file=sys.stderr)
        return []

    try:
        payload = r.json()
    except ValueError as e:
        print(f"WARN opps {loc}/{contact_id}: invalid JSON: {e}", file=sys.stderr)
        return []
    return payload.get("opportunities") or []
|
|
|
|
|
|
def count_brand_contacts_sharing_phone(phone):
    """Count the Brand contacts whose normalized phone equals ``phone``.

    The API is asked for contacts whose phone *contains* the raw value, and
    the candidates are then compared after ``normalize_phone``. Only the first
    page (20 contacts) is inspected, so the count is a lower bound if the
    search matches more than that.

    Args:
        phone: Phone number to look for, as stored on the branch contact.

    Returns:
        A ``(count, matches)`` tuple. ``matches`` is a list of dicts with
        ``id``, ``name``, ``phone`` and ``email``. ``(None, [])`` means the
        count could not be determined (no phone given, or the lookup failed).
    """
    if not phone:
        # Nothing to search for: an empty/None "contains" filter is meaningless
        # and comparing normalized blanks could match every phoneless contact.
        return None, []

    body = {"locationId": BRAND_LOC, "pageLimit": 20, "page": 1,
            "filters": [{"field": "phone", "operator": "contains", "value": phone}]}
    try:
        r = requests.post(f"{BASE_URL}/contacts/search", headers=_headers(BRAND_TOKEN),
                          json=body, timeout=30)
    except requests.RequestException:
        return None, []
    if r.status_code != 200:
        return None, []

    try:
        contacts = r.json().get("contacts") or []
    except ValueError:
        return None, []

    target = normalize_phone(phone)
    matches = [c for c in contacts if normalize_phone(c.get("phone")) == target]
    summary = [
        {
            "id": c.get("id"),
            "name": f"{c.get('firstName') or ''} {c.get('lastName') or ''}".strip(),
            "phone": c.get("phone"),
            "email": c.get("email"),
        }
        for c in matches
    ]
    return len(matches), summary
|
|
|
|
|
|
def _contact_summary(contact):
    """Reduce a raw contact payload to the fields kept in the audit report."""
    first = contact.get("firstName") or ""
    last = contact.get("lastName") or ""
    return {
        "id": contact.get("id"),
        "name": f"{first} {last}".strip(),
        "email": contact.get("email"),
        "phone": contact.get("phone"),
        "dateAdded": contact.get("dateAdded"),
        "tags": contact.get("tags"),
    }


def _match_fields(contact):
    """Map a raw contact payload to the field names passed to match_contacts."""
    return {
        "first_name": contact.get("firstName"),
        "last_name": contact.get("lastName"),
        "phone": contact.get("phone"),
        "email": contact.get("email"),
    }


def audit_one(case):
    """Audit a single phone-collision case.

    Runs the five lookups (both contacts, both opportunity lists, and the
    phone-sharing count) concurrently, then scores and classifies the pair.

    Args:
        case: dict with ``opp_id``, ``branch_name``, ``opp_name``, ``phone``,
            ``branch_location_id``, ``branch_contact_id`` and
            ``colliding_brand_contact_id``.

    Returns:
        A JSON-serializable dict with the case identifiers, a summary of both
        contacts, opportunity counts, the contacts sharing the phone, the
        match level / name similarity, ``same_email``, ``errors`` (problems
        that affected the comparison) and the final ``verdict``:
        ``SAME_PERSON``, ``UNCERTAIN`` or ``DISTINCT_PERSON``.
    """
    branch_loc = case["branch_location_id"]
    branch_cid = case["branch_contact_id"]
    brand_cid = case["colliding_brand_contact_id"]
    phone = case["phone"]

    out = {"opp_id": case["opp_id"], "branch_name": case["branch_name"],
           "opp_name": case["opp_name"], "phone": phone}

    with ThreadPoolExecutor(max_workers=5) as ex:
        futs = {
            "branch_contact": ex.submit(get_contact, branch_loc, branch_cid),
            "brand_contact": ex.submit(get_contact, BRAND_LOC, brand_cid),
            "branch_opps": ex.submit(get_opps_for_contact, branch_loc, branch_cid),
            "brand_opps": ex.submit(get_opps_for_contact, BRAND_LOC, brand_cid),
            "phone_share": ex.submit(count_brand_contacts_sharing_phone, phone),
        }
        res = {k: f.result() for k, f in futs.items()}

    bc = res["branch_contact"] or {}
    mc = res["brand_contact"] or {}

    # get_contact reports failures as {"error": ...}. Keep track of them: a
    # failed fetch leaves names/emails empty, which must not be read as
    # evidence that the two contacts are different people.
    errors = [
        f"{label}: {contact['error']}"
        for label, contact in (("branch_contact", bc), ("brand_contact", mc))
        if contact.get("error")
    ]

    out["branch_contact"] = _contact_summary(bc)
    out["brand_collider"] = _contact_summary(mc)
    out["branch_opps_count"] = len(res["branch_opps"])
    out["brand_opps_count"] = len(res["brand_opps"])
    # Only the first 10 Brand opportunities are detailed in the report.
    out["brand_opps"] = [
        {"name": o.get("name"), "status": o.get("status"),
         "value": o.get("monetaryValue"), "pipelineId": o.get("pipelineId")}
        for o in res["brand_opps"][:10]
    ]
    share_count, share_list = res["phone_share"]
    out["brand_contacts_sharing_phone"] = share_count
    out["brand_contacts_sharing_phone_list"] = share_list

    # Name similarity via match_contacts. Phone and email are passed along as
    # well; how match_contacts weighs them is not visible from this file.
    out["match_name_similarity"] = None
    try:
        m = match_contacts(_match_fields(bc), _match_fields(mc), threshold=0.80)
        out["match_level"] = m.get("level")
        out["match_name_similarity"] = round(m.get("name_score", 0) or 0, 3)
    except Exception as e:
        out["match_level"] = f"error:{e}"
        errors.append(f"match_contacts: {e}")

    # Same email? Known placeholder addresses do not count as a match.
    be = (bc.get("email") or "").strip().lower()
    me = (mc.get("email") or "").strip().lower()
    placeholder = {"sincorreo@gmail.com", "noemail@gmail.com"}
    out["same_email"] = bool(be and be == me and be not in placeholder)

    # Heuristic classification. Without positive evidence, any error above
    # downgrades the result to UNCERTAIN instead of a confident
    # DISTINCT_PERSON based on a similarity that was never really computed.
    sim = out["match_name_similarity"] or 0
    if out["same_email"] or sim >= 0.80:
        verdict = "SAME_PERSON"
    elif errors or sim >= 0.55:
        verdict = "UNCERTAIN"
    else:
        verdict = "DISTINCT_PERSON"
    out["errors"] = errors
    out["verdict"] = verdict
    return out
|
|
|
|
|
|
def main():
    """Audit every unresolved phone collision and write the report.

    Reads ``generated/reports/missing_opps_clean.json`` under ``ROOT_DIR``,
    keeps the items that carry a ``phone_collision_unresolved`` action, audits
    them in parallel with ``audit_one``, prints a per-case summary and writes
    the results to ``generated/reports/phone_collisions_audit.json``.

    A case whose audit raises is reported on stderr and left out of the
    report, so one failure does not discard the results of the others.
    """
    from collections import Counter

    # Upper bound on concurrently audited cases. Each audit_one call starts up
    # to 5 request threads of its own, so one thread per case is unbounded.
    max_case_workers = 16

    clean = os.path.join(ROOT_DIR, "generated", "reports", "missing_opps_clean.json")
    with open(clean, encoding="utf-8") as fh:
        data = json.load(fh)

    cases = []
    for it in data["items"]:
        coll = next((a for a in it.get("actions", []) if a.get("action") == "phone_collision_unresolved"), None)
        if not coll:
            continue
        bc = it.get("branch_contact") or {}
        cases.append({
            "opp_id": it["opp_id"],
            "branch_name": it.get("branch_name"),
            "branch_location_id": it.get("branch_location_id"),
            "branch_contact_id": bc.get("id"),
            "opp_name": it.get("opp_name"),
            "phone": bc.get("phone"),
            "colliding_brand_contact_id": coll.get("colliding_brand_contact_id"),
        })

    print(f"Auditando {len(cases)} casos de colision EN PARALELO...\n")
    results = []
    workers = max(1, min(len(cases), max_case_workers))
    with ThreadPoolExecutor(max_workers=workers) as ex:
        futs = {ex.submit(audit_one, c): c for c in cases}
        for f in as_completed(futs):
            try:
                results.append(f.result())
            except Exception as e:
                print(f"ERROR auditando opp {futs[f].get('opp_id')}: {e}", file=sys.stderr)

    # opp_name may be missing (None); sort those first instead of crashing on
    # a None-vs-str comparison.
    results.sort(key=lambda r: r["opp_name"] or "")
    for r in results:
        print("=" * 78)
        print(f"OPP {r['opp_id']} | {r['branch_name']} | tel {r['phone']}")
        bc = r["branch_contact"]
        mc = r["brand_collider"]
        print(f" SUCURSAL : {bc['name']!r} email={bc['email']} alta={bc['dateAdded']}")
        print(f" MARCA(col): {mc['name']!r} email={mc['email']} alta={mc['dateAdded']}")
        print(f" similitud_nombre={r.get('match_name_similarity')} match_level={r.get('match_level')} same_email={r['same_email']}")
        print(f" contactos en Marca con ese telefono: {r['brand_contacts_sharing_phone']} -> {[x['name'] for x in r['brand_contacts_sharing_phone_list']]}")
        print(f" opps del colider en Marca: {r['brand_opps_count']} -> {[(o['name'], o['status'], o['value']) for o in r['brand_opps']]}")
        print(f" >>> VEREDICTO: {r['verdict']}")
        print()

    out_path = os.path.join(ROOT_DIR, "generated", "reports", "phone_collisions_audit.json")
    with open(out_path, "w", encoding="utf-8") as f:
        json.dump(results, f, ensure_ascii=False, indent=2)
    print(f"Reporte: {out_path}")

    # Summary of verdicts.
    verdicts = Counter(r["verdict"] for r in results)
    print(f"Resumen veredictos: {dict(verdicts)}")


if __name__ == "__main__":
    main()
|