Files
MP-Manager/scripts/backfill_brand_contact_id_sucursal.py
2026-05-30 14:31:19 -06:00

405 lines
15 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Backfill del campo 'ID Contacto Sucursal' (contact.id_contacto_sucursal)
en los contactos de Marca que aun lo tienen vacio.
Se ejecuta despues de haber poblado el mismo campo en todas las sucursales con
`fill_contact_id_sucursal.py`. Para cada contacto Marca sin el campo, busca su
contraparte en sucursal usando la heuristica clasica (phone + email + name) y,
si encuentra match unico, pone el id del contacto sucursal en el CF de Marca.
Solo escribe en Marca. Las sucursales no se tocan.
Estrategia de match (`scripts.common.match_contacts`):
- 'strong' : phone, email y name (>=0.80 similitud) coinciden.
- 'medium' : phone y name coinciden (email puede faltar).
- cualquier otro se descarta.
Cuando varios contactos de sucursal matchean al mismo contacto Marca (caso
cross-branch duplicates) el desempate elige el de `dateAdded` mas antiguo.
Modos:
- dry-run (default): no escribe nada en GHL.
- --apply --run-id <uuid>: aplica, registra cada cambio en script_audit.
Uso:
python scripts/backfill_brand_contact_id_sucursal.py
python scripts/backfill_brand_contact_id_sucursal.py --apply --run-id <uuid>
python scripts/backfill_brand_contact_id_sucursal.py --only-contact <brand_id>
python scripts/backfill_brand_contact_id_sucursal.py --export-unmatched
"""
import argparse
import csv
import datetime
import json
import os
import sqlite3
import sys
import uuid
import warnings
from collections import defaultdict
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPTS_DIR not in sys.path:
sys.path.insert(0, SCRIPTS_DIR)
import sync_engine # noqa: E402
import script_audit # noqa: E402
from paths import DB_PATH, MIGRATIONS_DIR, EXPORTS_DIR # noqa: E402
from common import match_contacts, normalize_phone, normalize_email # noqa: E402
from audit_brand_vs_branches_totals import ( # noqa: E402
resolve_contact_link_field_id,
extract_contact_link_value,
)
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
DEMO_LOCATION_IDS = {"Vf7qQl3L9vakJ8hDtQ8e", "Z64WQKORPVwXb5mn68Ef"}
CF_KEY = "contact.id_contacto_sucursal"
gc = sync_engine.ghl_client
def safe_print(*args, **kwargs):
text = " ".join(str(a) for a in args)
try:
sys.stdout.write(text + "\n")
sys.stdout.flush()
except UnicodeEncodeError:
enc = sys.stdout.encoding or "utf-8"
sys.stdout.write(text.encode(enc, errors="replace").decode(enc) + "\n")
sys.stdout.flush()
def row_to_contact(row):
"""SQLite row -> dict con shape compatible con match_contacts."""
return {
"id": row["id"],
"location_id": row["location_id"],
"phone": row["phone"],
"email": row["email"],
"firstName": row["first_name"],
"lastName": row["last_name"],
"dateAdded": row["date_added"],
}
def load_brand_unfilled(conn, brand_field_id):
"""Contactos Marca con CF id_contacto_sucursal vacio."""
rows = conn.execute(
"SELECT id, location_id, first_name, last_name, email, phone, date_added, custom_fields_json "
"FROM contacts WHERE location_id=?",
(BRAND_LOCATION_ID,),
).fetchall()
out = []
for r in rows:
val = extract_contact_link_value(r["custom_fields_json"], brand_field_id)
if not val:
out.append(r)
return out
def load_branch_contacts(conn):
"""Todos los contactos de sucursales (sin demos)."""
placeholders = ",".join("?" for _ in DEMO_LOCATION_IDS)
rows = conn.execute(
f"SELECT id, location_id, first_name, last_name, email, phone, date_added "
f"FROM contacts WHERE location_id != ? AND location_id NOT IN ({placeholders})",
(BRAND_LOCATION_ID, *DEMO_LOCATION_IDS),
).fetchall()
return rows
def index_branches(rows):
"""Indices por phone normalizado y por email normalizado para evitar O(n*m)."""
by_phone = defaultdict(list)
by_email = defaultdict(list)
for r in rows:
c = row_to_contact(r)
ph = normalize_phone(c.get("phone") or "")
em = normalize_email(c.get("email") or "")
if ph:
by_phone[ph].append(c)
if em:
by_email[em].append(c)
return by_phone, by_email
def pick_oldest(candidates):
"""Tie-break por dateAdded mas antiguo (sucursal con mas historial)."""
def ts(c):
d = c.get("dateAdded") or ""
try:
return datetime.datetime.fromisoformat(d.replace("Z", "+00:00")).timestamp()
except Exception:
return float("inf")
return min(candidates, key=ts)
def plan_match(brand_row, by_phone, by_email):
"""Decide el match para un contacto Marca.
Devuelve dict con status y datos. Status:
- 'match_unique' : 1 sucursal candidata o varias que coinciden en id.
- 'match_multi' : varias sucursales con ids distintos. Se elige la mas antigua.
- 'no_data' : el contacto Marca no tiene phone ni email -> no se puede matchear.
- 'phone_collision' : phone coincide pero name no >=0.80 con ninguno (riesgo de pareja).
- 'no_match' : sin candidatos validos (probable Marca-only legitimo).
"""
brand_c = row_to_contact(brand_row)
ph = normalize_phone(brand_c.get("phone") or "")
em = normalize_email(brand_c.get("email") or "")
if not ph and not em:
return {"status": "no_data", "candidates": []}
candidates_raw = []
if ph:
candidates_raw.extend(by_phone.get(ph, []))
if em:
for c in by_email.get(em, []):
if c not in candidates_raw:
candidates_raw.append(c)
if not candidates_raw:
return {"status": "no_match", "candidates": []}
matches = []
phone_collisions = []
for cand in candidates_raw:
r = match_contacts(brand_c, cand)
if r["level"] in ("strong", "medium"):
matches.append((cand, r))
elif "phone_collision_unresolved" in r["reasons"]:
phone_collisions.append((cand, r))
if not matches:
if phone_collisions:
return {"status": "phone_collision", "candidates": [c for c, _ in phone_collisions]}
return {"status": "no_match", "candidates": []}
ids = {c["id"] for c, _ in matches}
if len(ids) == 1:
chosen = matches[0][0]
return {"status": "match_unique",
"chosen": chosen,
"level": matches[0][1]["level"],
"all_matches": matches}
# Multi-match: tie-break por dateAdded mas antiguo.
chosen = pick_oldest([c for c, _ in matches])
return {"status": "match_multi",
"chosen": chosen,
"all_matches": matches}
def render_summary(plans, log):
counts = defaultdict(int)
for p in plans:
counts[p["plan"]["status"]] += 1
log("\n=== Resumen del plan ===")
log(f" match_unique : {counts['match_unique']}")
log(f" match_multi : {counts['match_multi']}")
log(f" phone_collision : {counts['phone_collision']}")
log(f" no_match : {counts['no_match']}")
log(f" no_data : {counts['no_data']}")
def render_examples(plans, log, n=6):
by_status = defaultdict(list)
for p in plans:
by_status[p["plan"]["status"]].append(p)
for status in ("match_unique", "match_multi", "phone_collision", "no_match", "no_data"):
items = by_status.get(status, [])
if not items:
continue
log(f"\n [{status}] ejemplos:")
for item in items[:n]:
brand = item["brand"]
name = f"{brand['first_name'] or ''} {brand['last_name'] or ''}".strip() or "(sin nombre)"
line = f" - {name!r:40} brand={brand['id']} phone={brand['phone']!r} email={brand['email']!r}"
if status in ("match_unique", "match_multi"):
chosen = item["plan"]["chosen"]
line += f" -> sucursal={chosen['id']} ({chosen['location_id']})"
log(line)
if len(items) > n:
log(f" ... y {len(items)-n} más")
def export_csv(plans, status_filter, filename, log):
path = os.path.join(EXPORTS_DIR, filename)
os.makedirs(EXPORTS_DIR, exist_ok=True)
with open(path, "w", encoding="utf-8", newline="") as fh:
w = csv.writer(fh)
w.writerow(["status", "brand_id", "brand_name", "brand_phone", "brand_email",
"branch_id", "branch_location_id", "branch_name", "branch_date_added"])
for p in plans:
if p["plan"]["status"] not in status_filter:
continue
b = p["brand"]
name = f"{b['first_name'] or ''} {b['last_name'] or ''}".strip()
chosen = p["plan"].get("chosen") or {}
cname = f"{chosen.get('firstName') or ''} {chosen.get('lastName') or ''}".strip()
w.writerow([p["plan"]["status"], b["id"], name, b["phone"] or "", b["email"] or "",
chosen.get("id", ""), chosen.get("location_id", ""), cname,
chosen.get("dateAdded", "")])
log(f" Export CSV: {path}")
return path
def apply_match(plan, brand_field_id, brand_token, run_id, log):
"""Aplica el match: PUT al contacto Marca con el CF poblado."""
p = plan["plan"]
brand = plan["brand"]
if p["status"] not in ("match_unique", "match_multi"):
return {"applied": False, "reason": p["status"]}
chosen_id = p["chosen"]["id"]
change_id = None
if run_id:
change_id = script_audit.record_change(
run_id, BRAND_LOCATION_ID, "contact",
brand["id"], brand_field_id, "id_contacto_sucursal",
{"value": None}, {"value": chosen_id})
try:
gc._request("PUT", f"/contacts/{brand['id']}", brand_token, json={
"customFields": [{"id": brand_field_id, "key": CF_KEY, "field_value": chosen_id}]
})
if change_id:
script_audit.mark_change(change_id, "applied")
return {"applied": True}
except Exception as exc:
if change_id:
script_audit.mark_change(change_id, "failed", str(exc))
log(f" ✗ Error en {brand['id']}: {exc}")
return {"applied": False, "error": str(exc)}
def snapshot_run(plans, run_id, dry_run):
os.makedirs(MIGRATIONS_DIR, exist_ok=True)
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
path = os.path.join(MIGRATIONS_DIR, f"backfill_brand_contact_id_sucursal_{ts}.json")
serial = []
for item in plans:
b = dict(item["brand"])
b.pop("custom_fields_json", None) # ruido
p = dict(item["plan"])
if "all_matches" in p:
p["all_matches"] = [{"id": c["id"], "location_id": c["location_id"]} for c, _ in p["all_matches"]]
if "candidates" in p:
p["candidates"] = [{"id": c["id"], "location_id": c["location_id"]} for c in p["candidates"]]
if "chosen" in p:
p["chosen"] = {k: v for k, v in p["chosen"].items() if k in ("id", "location_id", "phone", "email", "firstName", "lastName", "dateAdded")}
serial.append({"brand": b, "plan": p})
with open(path, "w", encoding="utf-8") as fh:
json.dump({"run_id": run_id, "dry_run": dry_run,
"timestamp_utc": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"count": len(plans), "plans": serial},
fh, ensure_ascii=False, indent=2, default=str)
return path
def run(apply=False, run_id=None, only_contact=None, export_unmatched=False, log=None):
if log is None:
log = safe_print
accounts = sync_engine.parse_accounts_csv()
brand = next(a for a in accounts if a["location_id"] == BRAND_LOCATION_ID)
brand_token = brand["token"]
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
brand_field_id = resolve_contact_link_field_id(conn, BRAND_LOCATION_ID)
if not brand_field_id:
raise SystemExit("No se encontro el field_id de ID Contacto Sucursal en Marca. Resync schemas o crea el campo.")
log(f"CF id_contacto_sucursal en Marca: field_id={brand_field_id}")
unfilled = load_brand_unfilled(conn, brand_field_id)
if only_contact:
unfilled = [r for r in unfilled if r["id"] == only_contact]
log(f"Contactos Marca con CF vacio: {len(unfilled)}")
branch_rows = load_branch_contacts(conn)
log(f"Contactos sucursales indexados: {len(branch_rows)}")
by_phone, by_email = index_branches(branch_rows)
plans = []
for r in unfilled:
plan = plan_match(r, by_phone, by_email)
plans.append({"brand": dict(r), "plan": plan})
render_summary(plans, log)
render_examples(plans, log)
snap = snapshot_run(plans, run_id, dry_run=not apply)
log(f"\nSnapshot: {snap}")
if export_unmatched:
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
export_csv(plans, {"no_match", "no_data", "phone_collision", "match_multi"},
f"brand_unmatched_{ts}.csv", log)
if not apply:
log("\nDRY-RUN. Para aplicar: --apply --run-id <uuid>")
return {"plans": plans, "snapshot": snap}
matchables = [p for p in plans if p["plan"]["status"] in ("match_unique", "match_multi")]
log(f"\nAplicando {len(matchables)} matches en Marca...")
if run_id:
script_audit.create_run(
run_id, "backfill_brand_contact_id_sucursal.py",
arguments=f"matches:{len(matchables)} apply",
locations=[BRAND_LOCATION_ID])
stats = {"applied": 0, "errors": 0, "skipped": 0}
for item in plans:
if item["plan"]["status"] not in ("match_unique", "match_multi"):
stats["skipped"] += 1
continue
if run_id and not script_audit.wait_if_paused_or_stopped(run_id):
log("Detencion solicitada. Saliendo.")
break
r = apply_match(item, brand_field_id, brand_token, run_id, log)
if r["applied"]:
stats["applied"] += 1
else:
stats["errors"] += 1
if run_id:
script_audit.update_run_status(
run_id,
"completed" if stats["errors"] == 0 else "failed",
f"errors={stats['errors']}" if stats["errors"] else None)
log(f"\nResumen: applied={stats['applied']} errors={stats['errors']} skipped={stats['skipped']}")
return {"plans": plans, "snapshot": snap, "stats": stats}
def main():
parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
parser.add_argument("--apply", action="store_true", help="Aplica. Sin esto: dry-run.")
parser.add_argument("--run-id", help="ID para script_audit.")
parser.add_argument("--only-contact", help="Filtrar a un solo contact_id Marca.")
parser.add_argument("--export-unmatched", action="store_true",
help="Exportar CSV con los que no se pudieron resolver.")
args = parser.parse_args()
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
run_id = args.run_id
if args.apply and not run_id:
run_id = str(uuid.uuid4())
safe_print(f"[info] run_id autogenerado: {run_id}")
run(apply=args.apply, run_id=run_id, only_contact=args.only_contact,
export_unmatched=args.export_unmatched, log=safe_print)
if __name__ == "__main__":
main()