Files
MP-Manager/scripts/delete_intra_brand_duplicates.py
2026-05-30 14:31:19 -06:00

379 lines
15 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Borra duplicados intra-Marca creados por el workflow n8n al reprocesar
contactos sin phone/email y, en su lugar, completa el campo
'ID Contacto Sucursal' en el contacto original.
Origen del problema: cuando se edito el workflow n8n el 28-may, los contactos
de sucursal sin phone ni email no matchearon contra el contacto Marca
existente (la cascada inicia por phone) y crearon una pareja contacto+opp
nueva en Marca. El contacto NUEVO trae el campo ID Contacto Sucursal
correctamente poblado, pero su opp es un cascaron ($0). El contacto ORIGINAL
tiene la opp real (valor monetario, posibles status won/lost), pero el campo
ID Contacto Sucursal vacio.
Estrategia (alineada con duplicate_resolution_rules):
1. Identifica los grupos de duplicados intra-Marca (audit_brand_vs_branches_totals).
2. Por cada par, trae ambos contactos en vivo (verifica que existan).
3. Decide cual conservar:
     conservar = el que suma mayor monetaryValue entre sus opps
                 (desempate: el que tenga status won/lost > open)
                 (desempate: el mas antiguo por dateAdded)
     borrar    = el otro
   Si el "borrar" es el unico con ID Contacto Sucursal poblado, se copia
   ese valor al "conservar" antes del DELETE.
4. PUT al contacto a conservar para fijar ID Contacto Sucursal (si aplica).
   Si el PUT falla, ese par NO se borra.
5. DELETE del contacto a borrar (GHL hace cascade a sus opps).
Cada cambio queda en script_audit (PUT del CF es reversible; el DELETE NO,
GHL no permite undelete -- antes de aplicar se guarda un snapshot JSON de
los planes en migrations/).
Uso:
python scripts/delete_intra_brand_duplicates.py # dry-run
python scripts/delete_intra_brand_duplicates.py --apply --run-id <uuid>
python scripts/delete_intra_brand_duplicates.py --pair <new_id> # solo 1 par
"""
import argparse
import datetime
import json
import os
import sys
import uuid
import warnings
from collections import defaultdict
# Silence the "urllib3 ... doesn't match a supported version!" warning before
# the project modules (and their HTTP stack) are imported below.
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")

# Put the project root and this scripts/ directory on sys.path (root first,
# then scripts/, each only if absent) so the sibling imports that follow
# resolve no matter where the script is launched from.
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(SCRIPTS_DIR)
for _import_dir in (ROOT_DIR, SCRIPTS_DIR):
    if _import_dir not in sys.path:
        sys.path.insert(0, _import_dir)
del _import_dir
import sync_engine # noqa: E402
import script_audit # noqa: E402
from paths import MIGRATIONS_DIR # noqa: E402
import audit_brand_vs_branches_totals as audit # noqa: E402
# GHL location id of the brand ("Marca") account; run() also uses it to pick
# that account's token from the accounts CSV.
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
CF_ID_CONTACTO_SUCURSAL = "E6lI9ykWhqpj7Pmi7Qd3"  # custom-field id of "ID Contacto Sucursal" in the brand location
CF_KEY_CONTACTO_SUCURSAL = "contact.id_contacto_sucursal"  # same field's key; sent next to the id in the contact PUT
# Tie-break weight per opportunity status (matched lowercased in score_contact):
# won > lost == abandoned > open. Any status not listed here (e.g. "pending")
# weighs 0.
STATUS_WEIGHT = {"won": 3, "lost": 2, "abandoned": 2, "open": 1}
# Shared API client from sync_engine; this script calls its _request() and
# delete_contact() methods.
gc = sync_engine.ghl_client
def safe_print(*args, **kwargs):
    """Write the space-joined ``str()`` of *args* plus a newline to stdout and flush.

    If the console encoding cannot represent some character, the text is
    re-encoded with ``errors="replace"`` instead of raising
    ``UnicodeEncodeError``. Keyword arguments are accepted for
    ``print``-compatibility but ignored.
    """
    line = " ".join(map(str, args))
    out = sys.stdout
    try:
        out.write(f"{line}\n")
        out.flush()
    except UnicodeEncodeError:
        encoding = out.encoding or "utf-8"
        out.write(line.encode(encoding, errors="replace").decode(encoding) + "\n")
        out.flush()
def fetch_contact_live(contact_id, token):
    """Fetch a contact from GHL right now and return its ``contact`` object.

    Returns None when the request fails with an error whose message contains
    "404" or "not found" (case-insensitive); any other error is re-raised.
    """
    path = f"/contacts/{contact_id}"
    try:
        payload = gc._request("GET", path, token)
        return payload.get("contact")
    except Exception as exc:
        message = str(exc)
        # NOTE(review): "missing" is detected by substring on the message, not by
        # status code; a false positive only makes build_pair_plan skip the pair.
        if "404" not in message and "not found" not in message.lower():
            raise
        return None
def fetch_opps_live(contact_id, token, location_id):
    """Search the opportunities of *contact_id* in *location_id* via GHL.

    Returns the list under ``opportunities`` (``[]`` when absent or empty).
    Never raises: on any error it returns the sentinel ``[{"_err": <message>}]``,
    so callers must check for ``"_err"`` entries.
    """
    query = {"location_id": location_id, "contact_id": contact_id}
    try:
        found = gc._request("GET", "/opportunities/search", token, params=query)
        return found.get("opportunities") or []
    except Exception as exc:
        return [{"_err": str(exc)}]
def cf_value(contact, field_id):
    """Return the value of custom field *field_id* on *contact*, or None if absent.

    The first entry of ``customFields`` whose ``id`` or ``fieldId`` equals
    *field_id* is used; its value is taken from ``value``, then ``fieldValue``,
    then ``fieldValueString`` (first truthy wins, otherwise the last lookup's
    result is returned).
    """
    fields = (contact or {}).get("customFields") or []
    entry = next((f for f in fields if field_id in (f.get("id"), f.get("fieldId"))), None)
    if entry is None:
        return None
    return entry.get("value") or entry.get("fieldValue") or entry.get("fieldValueString")
def score_contact(c, opps):
    """Return a sortable score for contact *c*; a higher score means "keep it".

    The score is a tuple of:
      1. the sum of ``monetaryValue`` over its opps,
      2. the best STATUS_WEIGHT among its opps (0 if none),
      3. the negated ``dateAdded`` timestamp, so the older contact wins ties.
    Opp entries carrying an ``_err`` key are ignored.
    """
    money = sum(float(opp.get("monetaryValue") or 0)
                for opp in opps if "_err" not in opp)
    best_status = max(
        (STATUS_WEIGHT.get((opp.get("status") or "").lower(), 0)
         for opp in opps if "_err" not in opp),
        default=0,
    )
    added = c.get("dateAdded") or ""
    # Negated so a smaller (older) timestamp sorts higher.
    return (money, best_status, -_iso_to_ts(added))
def _iso_to_ts(s):
if not s:
return 0
try:
return datetime.datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp()
except Exception:
return 0
def _opp_summaries(opps):
    """Reduce live opp payloads to id/name/monetaryValue/status, dropping ``_err`` sentinels."""
    return [{"id": o.get("id"), "name": o.get("name"),
             "monetaryValue": o.get("monetaryValue"), "status": o.get("status")}
            for o in opps if "_err" not in o]


def build_pair_plan(name, ids, token, *, run_log):
    """Fetch both contacts of a duplicate pair live and decide which to keep.

    Args:
        name: display name of the pair (only used in the plan/logs).
        ids: contact ids of the pair in the brand location (expected: two).
        token: GHL token of the brand location.
        run_log: logging callable.

    Returns:
        A plan dict. With ``status == "ok"`` it carries ``keep_id`` /
        ``delete_id``, the CF decision, opp summaries of both sides and the
        full live payloads of the side to delete (``delete_contact_snapshot``,
        ``delete_opps_raw``) so the migrations/ snapshot can rebuild it.
        Any other status (``"incomplete"``, ``"error"``, ``"conflict"``) means
        "skip this pair" and includes a human-readable ``details``.
    """
    distinct_ids = list(dict.fromkeys(ids))
    if len(distinct_ids) < 2:
        # The same id twice would make keep_id == delete_id and delete the survivor.
        return {"status": "incomplete", "name": name,
                "details": f"El par no tiene 2 ids distintos ({list(ids)}); nada que hacer."}

    contacts = []
    for cid in distinct_ids:
        c = fetch_contact_live(cid, token)
        if c is None:
            run_log(f" [!] contacto {cid} ya no existe en GHL (404)")
            continue
        opps = fetch_opps_live(cid, token, BRAND_LOCATION_ID)
        opp_errors = [o["_err"] for o in opps if "_err" in o]
        if opp_errors:
            # If we could not read this contact's opps it would score $0 and likely
            # be chosen for the (cascading, irreversible) DELETE. Never guess.
            return {"status": "error", "name": name,
                    "details": f"No se pudieron leer las opps de {cid}: {opp_errors[0]}"}
        contacts.append({
            "id": cid,
            "contact": c,
            "opps": opps,
            "cf_value": cf_value(c, CF_ID_CONTACTO_SUCURSAL),
            "score": None,
            "total_value": sum(float(o.get("monetaryValue") or 0) for o in opps),
        })

    if len(contacts) < 2:
        return {"status": "incomplete", "name": name,
                "details": f"Solo {len(contacts)} de 2 contactos viven; nada que hacer."}

    for entry in contacts:
        entry["score"] = score_contact(entry["contact"], entry["opps"])
    # Highest score first; that one is kept.
    contacts.sort(key=lambda entry: entry["score"], reverse=True)
    keep, delete = contacts[0], contacts[1]

    # Two different non-empty branch links suggest two distinct branch contacts
    # that merely share a name, not an n8n re-processing duplicate.
    if keep["cf_value"] and delete["cf_value"] and keep["cf_value"] != delete["cf_value"]:
        return {"status": "conflict", "name": name,
                "details": (f"Ambos contactos tienen ID Contacto Sucursal distinto "
                            f"({keep['id']}={keep['cf_value']!r}, "
                            f"{delete['id']}={delete['cf_value']!r}); revisar manualmente.")}

    # CF to set on the kept contact (only if it is empty and the other has it).
    new_cf = keep["cf_value"] or delete["cf_value"]
    needs_cf_update = (new_cf is not None) and (keep["cf_value"] != new_cf)
    return {
        "status": "ok",
        "name": name,
        "keep_id": keep["id"],
        "delete_id": delete["id"],
        "keep_dateAdded": keep["contact"].get("dateAdded"),
        "delete_dateAdded": delete["contact"].get("dateAdded"),
        "keep_total_value": keep["total_value"],
        "delete_total_value": delete["total_value"],
        "keep_cf_before": keep["cf_value"],
        "delete_cf_before": delete["cf_value"],
        "new_cf_value": new_cf,
        "needs_cf_update": needs_cf_update,
        "keep_opps": _opp_summaries(keep["opps"]),
        "delete_opps": _opp_summaries(delete["opps"]),
        # Full live payloads of the contact to delete: the DELETE is irreversible.
        "delete_contact_snapshot": delete["contact"],
        "delete_opps_raw": delete["opps"],
    }
def detect_pairs_from_audit(log):
    """Run the read-only audit and return its intra-brand duplicates as pairs.

    Audit items are grouped by ``name_norm``. Only groups with exactly two
    members become pairs ``{"name", "name_norm", "ids"}``; any other group size
    is logged and skipped.
    """
    log("Detectando pares vía audit_brand_vs_branches_totals...")
    report = audit.run_audit(limit_missing=None)
    duplicates = report["missing"]["intra_brand_duplicates"]["items"]

    by_name = defaultdict(list)
    for item in duplicates:
        by_name[item["name_norm"]].append(item)

    pairs = []
    for name_norm, members in by_name.items():
        if len(members) == 2:
            pairs.append({
                "name": members[0]["name"],
                "name_norm": name_norm,
                "ids": [member["id"] for member in members],
            })
        else:
            log(f" [skip] grupo '{name_norm}' tiene {len(members)} miembros (solo procesamos pares)")
    log(f" -> {len(pairs)} pares detectados.")
    return pairs
def render_pair_plan(plan):
    """Print a human-readable summary of one pair plan.

    Non-"ok" plans print a single ``[SKIP]`` line with their ``details``.
    "ok" plans print the name, then the CONSERVAR and BORRAR sides (id,
    dateAdded, summed opp value, opp list), then what happens to the CF.
    """
    if plan["status"] != "ok":
        safe_print(f" [SKIP] {plan['name']}: {plan.get('details')}")
        return

    safe_print(f"{plan['name']}")
    for label, side in (("CONSERVAR", "keep"), ("BORRAR", "delete")):
        safe_print(f" {label} id={plan[side + '_id']} dateAdded={plan[side + '_dateAdded']} "
                   f"total_opps=${plan[side + '_total_value']:.0f}")
        safe_print(f" opps: {plan[side + '_opps']}")

    before = plan["keep_cf_before"]
    if not plan["needs_cf_update"]:
        safe_print(f" = CF no se toca (keep ya tiene '{before}' o ambos vacios)")
    else:
        safe_print(f" ✎ se setea ID Contacto Sucursal='{plan['new_cf_value']}' en CONSERVAR (estaba='{before}')")
def apply_pair(plan, token, *, dry_run, run_id, log):
    """Execute one pair plan: optional CF PUT on the kept contact, then DELETE.

    Nothing happens for non-"ok" plans or when *dry_run* is true. The CF is
    written first; if that PUT fails the DELETE is skipped, so the branch link
    that only the duplicate holds is not lost. With *run_id*, every mutation
    is recorded in script_audit and marked applied/failed.

    Returns a stats dict: ``cf_updated`` (bool), ``deleted`` (bool) and
    ``errors`` (list of ``{"step", "contact_id", "error"}``).
    """
    stats = {"cf_updated": False, "deleted": False, "errors": []}
    if plan["status"] != "ok":
        return stats
    keep_id = plan["keep_id"]
    delete_id = plan["delete_id"]
    if dry_run:
        return stats

    def _fail(change_id, step, contact_id, exc, message):
        # Mark the audit row failed (if any), collect the error and log it.
        if change_id:
            script_audit.mark_change(change_id, "failed", str(exc))
        stats["errors"].append({"step": step, "contact_id": contact_id, "error": str(exc)})
        log(message)

    # Step 1: set ID Contacto Sucursal on the survivor (when the plan says so).
    if plan["needs_cf_update"]:
        cf_change = None
        if run_id:
            cf_change = script_audit.record_change(
                run_id, BRAND_LOCATION_ID, "contact",
                keep_id, CF_ID_CONTACTO_SUCURSAL, "id_contacto_sucursal",
                {"value": plan["keep_cf_before"]}, {"value": plan["new_cf_value"]})
        try:
            gc._request("PUT", f"/contacts/{keep_id}", token, json={
                "customFields": [{
                    "id": CF_ID_CONTACTO_SUCURSAL,
                    "key": CF_KEY_CONTACTO_SUCURSAL,
                    "field_value": plan["new_cf_value"],
                }],
            })
            if cf_change:
                script_audit.mark_change(cf_change, "applied")
            stats["cf_updated"] = True
            log(f" ✓ CF actualizado en {keep_id}")
        except Exception as exc:
            _fail(cf_change, "cf_update", keep_id, exc,
                  f" ✗ Error al actualizar CF en {keep_id}: {exc}")
            # Do not delete the duplicate if the survivor did not get the CF.
            return stats

    # Step 2: DELETE the duplicate (GHL cascades to its opps; not reversible).
    # The audit row stores the contact id plus the opp summaries from the plan.
    delete_change = None
    if run_id:
        delete_change = script_audit.record_change(
            run_id, BRAND_LOCATION_ID, "contact",
            delete_id, "_delete", "delete_contact_cascade",
            {"contact_id": delete_id, "opps": plan["delete_opps"]},
            {"deleted": True})
    try:
        gc.delete_contact(token, delete_id, BRAND_LOCATION_ID)
        if delete_change:
            script_audit.mark_change(delete_change, "applied")
        stats["deleted"] = True
        log(f" ✓ Contacto borrado en GHL: {delete_id}")
    except Exception as exc:
        _fail(delete_change, "delete", delete_id, exc,
              f" ✗ Error al borrar {delete_id}: {exc}")
    return stats
def snapshot_run(plans, run_id, dry_run):
    """Dump this run's plans to a timestamped JSON file under MIGRATIONS_DIR.

    The filename uses local time
    (``delete_intra_brand_duplicates_YYYYmmdd_HHMMSS.json``); the payload also
    records ``timestamp_utc``. Non-JSON values are written via ``str()``.
    Returns the path of the written file.

    NOTE(review): two runs within the same second share a filename, and the
    later one overwrites the earlier snapshot.
    """
    os.makedirs(MIGRATIONS_DIR, exist_ok=True)
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    path = os.path.join(MIGRATIONS_DIR, f"delete_intra_brand_duplicates_{stamp}.json")
    payload = dict(
        run_id=run_id,
        dry_run=dry_run,
        timestamp_utc=datetime.datetime.now(datetime.timezone.utc).isoformat(),
        plans=plans,
    )
    with open(path, "w", encoding="utf-8") as out:
        json.dump(payload, out, ensure_ascii=False, indent=2, default=str)
    return path
def run(apply=False, run_id=None, only_pair=None, log=None):
    """Detect intra-brand duplicate pairs, plan each one and optionally apply.

    Args:
        apply: False (default) only plans, renders and snapshots (dry-run).
        run_id: script_audit run id. With ``apply=True`` a run is created and
            every mutation is recorded under it.
        only_pair: optional contact id; keeps only the pair(s) containing it.
        log: logging callable; defaults to ``safe_print``.

    Returns:
        ``{"summary": {...}, "plans": [...], "snapshot": <path or None>}``.

    Raises:
        RuntimeError: if the brand location is not in the accounts CSV.
        Any exception raised while planning/applying is re-raised after the
        script_audit run (if one was created) is marked "failed".
    """
    if log is None:
        log = safe_print

    accounts = sync_engine.parse_accounts_csv()
    brand = next((a for a in accounts if a["location_id"] == BRAND_LOCATION_ID), None)
    if brand is None:
        raise RuntimeError(
            f"La location Marca {BRAND_LOCATION_ID} no esta en el CSV de cuentas; sin token.")
    token = brand["token"]

    pairs = detect_pairs_from_audit(log)
    if only_pair:
        pairs = [p for p in pairs if only_pair in p["ids"]]
        log(f"Filtro --pair: {len(pairs)} pares.")
    if not pairs:
        log("Nada que procesar.")
        return {"summary": {"pairs": 0}, "plans": [], "snapshot": None}

    run_created = bool(run_id and apply)
    if run_created:
        script_audit.create_run(
            run_id, "delete_intra_brand_duplicates.py",
            arguments=f"pairs:{len(pairs)} apply",
            locations=[BRAND_LOCATION_ID])

    try:
        log(f"\nModo: {'APPLY (irreversible para el DELETE)' if apply else 'DRY-RUN'}")
        log(f"Pares a procesar: {len(pairs)}\n")
        plans = []
        for p in pairs:
            log(f"-- {p['name']} ({p['ids'][0][:8]}, {p['ids'][1][:8]})")
            plan = build_pair_plan(p["name"], p["ids"], token, run_log=log)
            render_pair_plan(plan)
            plans.append(plan)

        # Written before any mutation so the plan survives a crash mid-apply.
        snap = snapshot_run(plans, run_id, dry_run=not apply)
        log(f"\nSnapshot: {snap}")

        summary = {"pairs": len(plans), "ok": 0, "skipped": 0,
                   "cf_updated": 0, "deleted": 0, "errors": 0}
        if not apply:
            summary["ok"] = sum(1 for p in plans if p["status"] == "ok")
            summary["skipped"] = sum(1 for p in plans if p["status"] != "ok")
            log(f"\nDRY-RUN finalizado. ok={summary['ok']} skipped={summary['skipped']}")
            log("Para aplicar: --apply --run-id <uuid>")
            return {"summary": summary, "plans": plans, "snapshot": snap}

        log("\nAplicando cambios...")
        for plan in plans:
            if plan["status"] != "ok":
                summary["skipped"] += 1
                continue
            log(f"-- {plan['name']}")
            stats = apply_pair(plan, token, dry_run=False, run_id=run_id, log=log)
            summary["ok"] += 1
            summary["cf_updated"] += int(stats["cf_updated"])
            summary["deleted"] += int(stats["deleted"])
            summary["errors"] += len(stats["errors"])
    except BaseException as exc:
        # Without this the audit run would stay in its initial state forever.
        if run_created:
            script_audit.update_run_status(run_id, "failed", f"abortado: {exc!r}")
        raise

    if run_id:
        script_audit.update_run_status(run_id,
                                       "completed" if summary["errors"] == 0 else "failed",
                                       f"errors={summary['errors']}" if summary["errors"] else None)
    log(f"\nResumen: pairs={summary['pairs']} cf_updated={summary['cf_updated']} "
        f"deleted={summary['deleted']} errors={summary['errors']}")
    return {"summary": summary, "plans": plans, "snapshot": snap}
def _build_parser():
    """Build the CLI argument parser (flags: --apply, --run-id, --pair)."""
    parser = argparse.ArgumentParser(
        description="Borra duplicados intra-Marca creados por workflow n8n y rellena ID Contacto Sucursal.")
    parser.add_argument("--apply", action="store_true",
                        help="Aplica los cambios. Sin este flag corre en DRY-RUN.")
    parser.add_argument("--run-id",
                        help="ID para script_audit. Si --apply y no se da, se genera uno.")
    parser.add_argument("--pair",
                        help="Filtra a un solo par; pasa el contact_id de cualquiera de los dos.")
    return parser


def main():
    """CLI entry point: parse flags, force UTF-8 stdout, then delegate to run().

    When --apply is given without --run-id, a random UUID4 run id is generated
    and printed.
    """
    args = _build_parser().parse_args()
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")

    run_id = args.run_id
    must_generate_id = args.apply and not run_id
    if must_generate_id:
        run_id = str(uuid.uuid4())
        safe_print(f"[info] run_id autogenerado: {run_id}")
    run(apply=args.apply, run_id=run_id, only_pair=args.pair, log=safe_print)


if __name__ == "__main__":
    main()