Files
MP-Manager/scripts/audit_collision_submissions.py
2026-05-30 14:31:19 -06:00

200 lines
7.5 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""audit_collision_submissions.py
Profundiza la auditoria de los 4 casos de colision de telefono cruzando con
los FORM SUBMISSIONS (read-only). Como solo la cuenta de Marca tiene el
formulario web, mapeamos:
- Si el contacto (sucursal o Marca) tiene un submission en Marca.
- Que SUCURSAL eligio el lead en el formulario (campo sucursal_value).
- El `source` de cada contacto (de donde vino: formulario, manual, n8n...).
- Submissions que comparten el telefono del caso.
Todo en paralelo. No escribe nada.
"""
import json
import os
import sys
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor
# Make both the repository root and this scripts directory importable so the
# project-local modules imported right below can be resolved.
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(SCRIPTS_DIR)
for _import_dir in (ROOT_DIR, SCRIPTS_DIR):
    # Same order as before: ROOT_DIR is pushed first, SCRIPTS_DIR ends up ahead of it.
    if _import_dir not in sys.path:
        sys.path.insert(0, _import_dir)
del _import_dir
import requests
import sync_engine
from common import normalize_phone
# Base URL prepended to every API request made by this script.
BASE_URL = "https://services.leadconnectorhq.com"
# Location id of the brand ("Marca") account, whose form submissions are audited.
BRAND_LOC = "GbKkBpCmKu2QmloKFHy3"
# In-memory cache of the location -> token map; stays None until _load_token fills it.
_TOKENS_MAP = None
def _load_token(loc):
    """Return the PIT token configured for location ``loc``.

    Tokens are secrets: they are never hard-coded nor persisted by this
    script. They come from ``sync_engine.get_tokens_map()``, the same loader
    the rest of the repo uses (per the original author's note it reads the
    control-desk CSV `Bucéfalo - Mesa de control - API Tokens - MP.csv`).
    The map is kept in the module-level ``_TOKENS_MAP`` so it is only loaded
    on the first call.

    Raises:
        RuntimeError: if the map has no (non-empty) token for ``loc``.
    """
    global _TOKENS_MAP
    tokens = _TOKENS_MAP
    if tokens is None:
        # First use: load once and remember it for later calls.
        tokens = _TOKENS_MAP = sync_engine.get_tokens_map()
    found = tokens.get(loc)
    if found:
        return found
    raise RuntimeError(f"sin token {loc}")
def _h(token):
    """Build the HTTP headers sent with each API request, authenticated with ``token``."""
    headers = {"Authorization": f"Bearer {token}"}
    headers.update(Version="2021-07-28", Accept="application/json")
    return headers
def fetch_all_submissions(loc, page_size=100, max_pages=50):
    """Fetch the form submissions of location ``loc``, following pagination.

    Pages are requested from ``/forms/submissions`` starting at page 1 until a
    page comes back empty, the response carries no ``meta.nextPage``, a
    non-200 status is received, or ``max_pages`` pages have been read.

    This stays best-effort (it never raises on an HTTP error status), but a
    truncated result is no longer silent: a warning is written to stderr so an
    audit built on partial data is not mistaken for a complete one.

    Args:
        loc: location id whose submissions are fetched.
        page_size: value sent as the ``limit`` query parameter.
        max_pages: safety cap on the number of pages requested.

    Returns:
        A list with the submission dicts collected so far (possibly partial).

    Raises:
        RuntimeError: if there is no token for ``loc`` (from ``_load_token``).
    """
    token = _load_token(loc)
    headers = _h(token)  # identical for every page, so build it once
    out = []
    for page in range(1, max_pages + 1):
        r = requests.get(
            f"{BASE_URL}/forms/submissions",
            headers=headers,
            params={"locationId": loc, "limit": page_size, "page": page},
            timeout=30,
        )
        if r.status_code != 200:
            print(
                f"[warn] submissions {loc}: HTTP {r.status_code} en pagina {page}; "
                f"resultado parcial ({len(out)} submissions)",
                file=sys.stderr,
            )
            break
        d = r.json()
        subs = d.get("submissions", []) or []
        out.extend(subs)
        # "meta" may be absent or null; treat both as "no more pages".
        meta = d.get("meta") or {}
        if not subs or not meta.get("nextPage"):
            break
    else:
        # Loop ran out of pages without reaching the end of the listing.
        print(
            f"[warn] submissions {loc}: limite de {max_pages} paginas alcanzado; "
            f"resultado posiblemente truncado ({len(out)} submissions)",
            file=sys.stderr,
        )
    return out
def get_contact(loc, cid):
    """Fetch a single contact of location ``loc`` by its id.

    Args:
        loc: location id that owns the contact (selects the token).
        cid: contact id; when empty/None no request is made.

    Returns:
        The ``contact`` dict from the response, or ``{}`` when ``cid`` is
        missing, the status is not 200, or the payload has no contact.
        Callers can therefore always call ``.get`` on the result.

    Raises:
        RuntimeError: if ``cid`` is given but there is no token for ``loc``.
    """
    if not cid:
        # Nothing to look up; avoid a pointless request to /contacts/None.
        return {}
    token = _load_token(loc)
    r = requests.get(f"{BASE_URL}/contacts/{cid}", headers=_h(token), timeout=30)
    if r.status_code != 200:
        return {}
    # "contact" can be present but null; normalise that to an empty dict.
    return r.json().get("contact") or {}
def sucursal_from_submission(sub):
    """Guess which branch ("sucursal") the lead picked in a form submission.

    The branch field inside ``sub["others"]`` has no fixed key, so the values
    are scanned heuristically in their stored order. A value qualifies when it
    (or, for a non-empty list, its first element) is a string that contains a
    comma and is shorter than 60 characters, i.e. it looks like
    "City, State". No specific keys are probed.

    Args:
        sub: submission dict.

    Returns:
        The first qualifying string, or None when there is none or when
        ``others`` is missing or not a dict.
    """
    others = sub.get("others")
    if not isinstance(others, dict):
        return None
    for raw in others.values():
        # List-valued fields carry the selection as their first element.
        val = raw[0] if isinstance(raw, list) and raw else raw
        if isinstance(val, str) and "," in val and len(val) < 60:
            return val
    return None
def _full_name(contact):
    """Join a contact's firstName and lastName, tolerating missing parts."""
    return f"{contact.get('firstName') or ''} {contact.get('lastName') or ''}".strip()


def main():
    """Cross the unresolved phone-collision cases with the brand's form submissions.

    Steps:
      1. Read ``generated/reports/missing_opps_clean.json`` and keep the items
         that carry a ``phone_collision_unresolved`` action.
      2. In parallel, fetch all submissions of ``BRAND_LOC`` plus the branch
         contact and the colliding brand contact of every case.
      3. Index the submissions by contact id and by normalized phone.
      4. Print a per-case summary and write the full report to
         ``generated/reports/collision_submissions_audit.json``.

    Only GET requests are issued; the report file is the only thing written.
    """
    reports_dir = os.path.join(ROOT_DIR, "generated", "reports")
    clean = os.path.join(reports_dir, "missing_opps_clean.json")
    with open(clean, encoding="utf-8") as fh:
        data = json.load(fh)

    cases = []
    for it in data["items"]:
        coll = next(
            (a for a in (it.get("actions") or []) if a.get("action") == "phone_collision_unresolved"),
            None,
        )
        if not coll:
            continue
        bc = it.get("branch_contact") or {}
        cases.append({
            "label": (it.get("opp_name") or "")[:18],
            "branch_loc": it.get("branch_location_id"),
            "branch_cid": bc.get("id"),
            "brand_cid": coll.get("colliding_brand_contact_id"),
            "phone": bc.get("phone"),
        })

    # In parallel: the brand's submissions plus both contacts of every case.
    print("Trayendo submissions de Marca + contactos (en paralelo)...\n")
    with ThreadPoolExecutor(max_workers=10) as ex:
        f_subs = ex.submit(fetch_all_submissions, BRAND_LOC)
        # Keyed by case index: labels are truncated names and may repeat,
        # which would make one case overwrite another's contacts.
        contact_futs = {}
        for idx, c in enumerate(cases):
            contact_futs[("branch", idx)] = ex.submit(get_contact, c["branch_loc"], c["branch_cid"])
            contact_futs[("brand", idx)] = ex.submit(get_contact, BRAND_LOC, c["brand_cid"])
        brand_subs = f_subs.result()
        contacts = {k: f.result() or {} for k, f in contact_futs.items()}

    # Index submissions by contactId and by normalized phone.
    sub_by_cid = defaultdict(list)
    sub_by_phone = defaultdict(list)
    for s in brand_subs:
        if s.get("contactId"):
            sub_by_cid[s["contactId"]].append(s)
        sub_phone = normalize_phone(s.get("phone"))
        if sub_phone:
            sub_by_phone[sub_phone].append(s)
    print(f"Submissions de Marca cargados: {len(brand_subs)}\n")

    report = []
    for idx, c in enumerate(cases):
        case_phone = normalize_phone(c["phone"])
        bc = contacts[("branch", idx)]
        mc = contacts[("brand", idx)]
        entry = {
            "label": c["label"],
            "phone": c["phone"],
            "branch": {
                "id": c["branch_cid"],
                "loc": c["branch_loc"],
                "name": _full_name(bc),
                "source": bc.get("source"),
                "tags": bc.get("tags"),
                "has_submission_in_marca": bool(sub_by_cid.get(c["branch_cid"])),
            },
            "brand_collider": {
                "id": c["brand_cid"],
                "name": _full_name(mc),
                "source": mc.get("source"),
                "tags": mc.get("tags"),
                "submissions": [
                    {
                        "name": s.get("name"),
                        "formId": s.get("formId"),
                        "sucursal_elegida": sucursal_from_submission(s),
                        "createdAt": s.get("createdAt"),
                        "email": s.get("email"),
                        "phone": s.get("phone"),
                    }
                    for s in sub_by_cid.get(c["brand_cid"], [])
                ],
            },
            "submissions_sharing_phone": [
                {
                    "contactId": s.get("contactId"),
                    "name": s.get("name"),
                    "sucursal_elegida": sucursal_from_submission(s),
                    "createdAt": s.get("createdAt"),
                }
                for s in sub_by_phone.get(case_phone, [])
            ],
        }
        report.append(entry)

    for e in report:
        print("=" * 80)
        print(f"CASO {e['label']!r} tel {e['phone']}")
        b = e["branch"]
        m = e["brand_collider"]
        print(f" SUCURSAL : {b['name']!r} source={b['source']!r} submission_en_marca={b['has_submission_in_marca']}")
        print(f" MARCA(col): {m['name']!r} source={m['source']!r}")
        if m["submissions"]:
            for s in m["submissions"]:
                print(f" submission Marca: name={s['name']!r} sucursal_elegida={s['sucursal_elegida']!r} created={s['createdAt']}")
        else:
            print(" (sin submission en Marca para el colider)")
        print(f" submissions que comparten el telefono ({len(e['submissions_sharing_phone'])}):")
        for s in e["submissions_sharing_phone"]:
            print(f" cid={s['contactId']} name={s['name']!r} sucursal_elegida={s['sucursal_elegida']!r} created={s['createdAt']}")
        print()

    out = os.path.join(reports_dir, "collision_submissions_audit.json")
    with open(out, "w", encoding="utf-8") as f:
        json.dump(report, f, ensure_ascii=False, indent=2)
    print(f"Reporte: {out}")


if __name__ == "__main__":
    main()