#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Borra duplicados intra-Marca creados por el workflow n8n al reprocesar contactos sin phone/email y, en su lugar, completa el campo 'ID Contacto Sucursal' en el contacto original. Origen del problema: cuando se edito el workflow n8n el 28-may, los contactos de sucursal sin phone ni email no matchearon contra el contacto Marca existente (la cascada inicia por phone) y crearon una pareja contacto+opp nueva en Marca. El contacto NUEVO trae el campo ID Contacto Sucursal correctamente poblado, pero su opp es un cascaron ($0). El contacto ORIGINAL tiene la opp real (valor monetario, posibles status won/lost), pero el campo ID Contacto Sucursal vacio. Estrategia (alineada con duplicate_resolution_rules): 1. Identifica los grupos de duplicados intra-Marca (audit_brand_vs_branches_totals). 2. Por cada par, trae ambos contactos en vivo (verifica que existan). 3. Decide cual conservar: conservar = el que tiene la opp de mayor monetaryValue (desempate: el que tenga status won/lost > open) (desempate: el mas antiguo por dateAdded) borrar = el otro Si el "borrar" es el unico con ID Contacto Sucursal poblado, se copia ese valor al "conservar" antes del DELETE. 4. DELETE del contacto a borrar (GHL hace cascade a sus opps). 5. PUT al contacto a conservar para fijar ID Contacto Sucursal. Cada cambio queda en script_audit (PUT del CF es reversible; el DELETE NO, GHL no permite undelete -- se guarda snapshot completo en migrations/). Uso: python scripts/delete_intra_brand_duplicates.py # dry-run python scripts/delete_intra_brand_duplicates.py --apply --run-id python scripts/delete_intra_brand_duplicates.py --pair # solo 1 par """ import argparse import datetime import json import os import sys import uuid import warnings from collections import defaultdict warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!") ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT_DIR not in sys.path: sys.path.insert(0, ROOT_DIR) SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__)) if SCRIPTS_DIR not in sys.path: sys.path.insert(0, SCRIPTS_DIR) import sync_engine # noqa: E402 import script_audit # noqa: E402 from paths import MIGRATIONS_DIR # noqa: E402 import audit_brand_vs_branches_totals as audit # noqa: E402 BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3" CF_ID_CONTACTO_SUCURSAL = "E6lI9ykWhqpj7Pmi7Qd3" # field_id en Marca CF_KEY_CONTACTO_SUCURSAL = "contact.id_contacto_sucursal" # Pipelines con peso "real" para empate: status won/lost > open > pending STATUS_WEIGHT = {"won": 3, "lost": 2, "abandoned": 2, "open": 1} gc = sync_engine.ghl_client def safe_print(*args, **kwargs): text = " ".join(str(a) for a in args) try: sys.stdout.write(text + "\n") sys.stdout.flush() except UnicodeEncodeError: enc = sys.stdout.encoding or "utf-8" sys.stdout.write(text.encode(enc, errors="replace").decode(enc) + "\n") sys.stdout.flush() def fetch_contact_live(contact_id, token): """GET en vivo. Devuelve None si 404. Re-raise otros errores.""" try: return gc._request("GET", f"/contacts/{contact_id}", token).get("contact") except Exception as exc: msg = str(exc) if "404" in msg or "not found" in msg.lower(): return None raise def fetch_opps_live(contact_id, token, location_id): try: data = gc._request("GET", "/opportunities/search", token, params={"location_id": location_id, "contact_id": contact_id}) return data.get("opportunities") or [] except Exception as exc: return [{"_err": str(exc)}] def cf_value(contact, field_id): for cf in (contact or {}).get("customFields") or []: if cf.get("id") == field_id or cf.get("fieldId") == field_id: return cf.get("value") or cf.get("fieldValue") or cf.get("fieldValueString") return None def score_contact(c, opps): """Score mayor = mas razon para conservar.""" total_value = sum(float(o.get("monetaryValue") or 0) for o in opps if "_err" not in o) status_score = max((STATUS_WEIGHT.get((o.get("status") or "").lower(), 0) for o in opps if "_err" not in o), default=0) # Mas antiguo gana en empate (negativo de timestamp para que mas chico=mas viejo gane) ts = c.get("dateAdded") or "" return (total_value, status_score, -_iso_to_ts(ts)) def _iso_to_ts(s): if not s: return 0 try: return datetime.datetime.fromisoformat(s.replace("Z", "+00:00")).timestamp() except Exception: return 0 def build_pair_plan(name, ids, token, *, run_log): """Recibe los 2 ids de un par. Devuelve dict con keep_id, delete_id, datos, accion.""" contacts = [] for cid in ids: c = fetch_contact_live(cid, token) if c is None: run_log(f" [!] contacto {cid} ya no existe en GHL (404)") continue opps = fetch_opps_live(cid, token, BRAND_LOCATION_ID) contacts.append({ "id": cid, "contact": c, "opps": opps, "cf_value": cf_value(c, CF_ID_CONTACTO_SUCURSAL), "score": None, "total_value": sum(float(o.get("monetaryValue") or 0) for o in opps if "_err" not in o), }) if len(contacts) < 2: return {"status": "incomplete", "name": name, "details": f"Solo {len(contacts)} de 2 contactos viven; nada que hacer."} for c in contacts: c["score"] = score_contact(c["contact"], c["opps"]) # Ordenamos descendente por score; el primero se conserva. contacts.sort(key=lambda c: c["score"], reverse=True) keep, delete = contacts[0], contacts[1] # CF a aplicar al conservado (si esta vacio y el otro lo trae). new_cf = keep["cf_value"] or delete["cf_value"] needs_cf_update = (new_cf is not None) and (keep["cf_value"] != new_cf) return { "status": "ok", "name": name, "keep_id": keep["id"], "delete_id": delete["id"], "keep_dateAdded": keep["contact"].get("dateAdded"), "delete_dateAdded": delete["contact"].get("dateAdded"), "keep_total_value": keep["total_value"], "delete_total_value": delete["total_value"], "keep_cf_before": keep["cf_value"], "delete_cf_before": delete["cf_value"], "new_cf_value": new_cf, "needs_cf_update": needs_cf_update, "keep_opps": [{"id": o.get("id"), "name": o.get("name"), "monetaryValue": o.get("monetaryValue"), "status": o.get("status")} for o in keep["opps"] if "_err" not in o], "delete_opps": [{"id": o.get("id"), "name": o.get("name"), "monetaryValue": o.get("monetaryValue"), "status": o.get("status")} for o in delete["opps"] if "_err" not in o], } def detect_pairs_from_audit(log): """Corre el audit (read-only) y devuelve grupos de duplicados intra-Marca.""" log("Detectando pares vía audit_brand_vs_branches_totals...") data = audit.run_audit(limit_missing=None) items = data["missing"]["intra_brand_duplicates"]["items"] groups = defaultdict(list) for it in items: groups[it["name_norm"]].append(it) pairs = [] for name_norm, members in groups.items(): if len(members) != 2: log(f" [skip] grupo '{name_norm}' tiene {len(members)} miembros (solo procesamos pares)") continue pairs.append({ "name": members[0]["name"], "name_norm": name_norm, "ids": [m["id"] for m in members], }) log(f" -> {len(pairs)} pares detectados.") return pairs def render_pair_plan(plan): if plan["status"] != "ok": safe_print(f" [SKIP] {plan['name']}: {plan.get('details')}") return safe_print(f" ✓ {plan['name']}") safe_print(f" CONSERVAR id={plan['keep_id']} dateAdded={plan['keep_dateAdded']} total_opps=${plan['keep_total_value']:.0f}") safe_print(f" opps: {plan['keep_opps']}") safe_print(f" BORRAR id={plan['delete_id']} dateAdded={plan['delete_dateAdded']} total_opps=${plan['delete_total_value']:.0f}") safe_print(f" opps: {plan['delete_opps']}") if plan["needs_cf_update"]: safe_print(f" ✎ se setea ID Contacto Sucursal='{plan['new_cf_value']}' en CONSERVAR (estaba='{plan['keep_cf_before']}')") else: safe_print(f" = CF no se toca (keep ya tiene '{plan['keep_cf_before']}' o ambos vacios)") def apply_pair(plan, token, *, dry_run, run_id, log): """Aplica el plan de un par. Devuelve dict de stats.""" stats = {"cf_updated": False, "deleted": False, "errors": []} if plan["status"] != "ok": return stats keep_id = plan["keep_id"] delete_id = plan["delete_id"] if dry_run: return stats # 1) Actualizar CF en conservar (si aplica) if plan["needs_cf_update"]: change_id_cf = None if run_id: change_id_cf = script_audit.record_change( run_id, BRAND_LOCATION_ID, "contact", keep_id, CF_ID_CONTACTO_SUCURSAL, "id_contacto_sucursal", {"value": plan["keep_cf_before"]}, {"value": plan["new_cf_value"]}) try: gc._request("PUT", f"/contacts/{keep_id}", token, json={ "customFields": [{"id": CF_ID_CONTACTO_SUCURSAL, "key": CF_KEY_CONTACTO_SUCURSAL, "field_value": plan["new_cf_value"]}] }) if change_id_cf: script_audit.mark_change(change_id_cf, "applied") stats["cf_updated"] = True log(f" ✓ CF actualizado en {keep_id}") except Exception as exc: if change_id_cf: script_audit.mark_change(change_id_cf, "failed", str(exc)) stats["errors"].append({"step": "cf_update", "contact_id": keep_id, "error": str(exc)}) log(f" ✗ Error al actualizar CF en {keep_id}: {exc}") return stats # no procedemos al delete si fallo el CF # 2) DELETE del duplicado en Marca (cascade a opps) change_id_del = None if run_id: # Snapshot completo del contacto a borrar para auditoria (DELETE no es reversible) change_id_del = script_audit.record_change( run_id, BRAND_LOCATION_ID, "contact", delete_id, "_delete", "delete_contact_cascade", {"contact_id": delete_id, "opps": plan["delete_opps"]}, {"deleted": True}) try: gc.delete_contact(token, delete_id, BRAND_LOCATION_ID) if change_id_del: script_audit.mark_change(change_id_del, "applied") stats["deleted"] = True log(f" ✓ Contacto borrado en GHL: {delete_id}") except Exception as exc: if change_id_del: script_audit.mark_change(change_id_del, "failed", str(exc)) stats["errors"].append({"step": "delete", "contact_id": delete_id, "error": str(exc)}) log(f" ✗ Error al borrar {delete_id}: {exc}") return stats def snapshot_run(plans, run_id, dry_run): os.makedirs(MIGRATIONS_DIR, exist_ok=True) ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S") fname = f"delete_intra_brand_duplicates_{ts}.json" path = os.path.join(MIGRATIONS_DIR, fname) payload = { "run_id": run_id, "dry_run": dry_run, "timestamp_utc": datetime.datetime.now(datetime.timezone.utc).isoformat(), "plans": plans, } with open(path, "w", encoding="utf-8") as fh: json.dump(payload, fh, ensure_ascii=False, indent=2, default=str) return path def run(apply=False, run_id=None, only_pair=None, log=None): if log is None: log = safe_print accounts = sync_engine.parse_accounts_csv() brand = next(a for a in accounts if a["location_id"] == BRAND_LOCATION_ID) token = brand["token"] pairs = detect_pairs_from_audit(log) if only_pair: pairs = [p for p in pairs if only_pair in p["ids"]] log(f"Filtro --pair: {len(pairs)} pares.") if not pairs: log("Nada que procesar.") return {"summary": {"pairs": 0}} if run_id and apply: script_audit.create_run( run_id, "delete_intra_brand_duplicates.py", arguments=f"pairs:{len(pairs)} apply", locations=[BRAND_LOCATION_ID]) log(f"\nModo: {'APPLY (irreversible para el DELETE)' if apply else 'DRY-RUN'}") log(f"Pares a procesar: {len(pairs)}\n") plans = [] for p in pairs: log(f"-- {p['name']} ({p['ids'][0][:8]}, {p['ids'][1][:8]})") plan = build_pair_plan(p["name"], p["ids"], token, run_log=log) render_pair_plan(plan) plans.append(plan) snap = snapshot_run(plans, run_id, dry_run=not apply) log(f"\nSnapshot: {snap}") summary = {"pairs": len(plans), "ok": 0, "skipped": 0, "cf_updated": 0, "deleted": 0, "errors": 0} if not apply: summary["ok"] = sum(1 for p in plans if p["status"] == "ok") summary["skipped"] = sum(1 for p in plans if p["status"] != "ok") log(f"\nDRY-RUN finalizado. ok={summary['ok']} skipped={summary['skipped']}") log("Para aplicar: --apply --run-id ") return {"summary": summary, "plans": plans, "snapshot": snap} log("\nAplicando cambios...") for plan in plans: if plan["status"] != "ok": summary["skipped"] += 1 continue log(f"-- {plan['name']}") stats = apply_pair(plan, token, dry_run=False, run_id=run_id, log=log) summary["ok"] += 1 summary["cf_updated"] += int(stats["cf_updated"]) summary["deleted"] += int(stats["deleted"]) summary["errors"] += len(stats["errors"]) if run_id: script_audit.update_run_status(run_id, "completed" if summary["errors"] == 0 else "failed", f"errors={summary['errors']}" if summary["errors"] else None) log(f"\nResumen: pairs={summary['pairs']} cf_updated={summary['cf_updated']} " f"deleted={summary['deleted']} errors={summary['errors']}") return {"summary": summary, "plans": plans, "snapshot": snap} def main(): parser = argparse.ArgumentParser( description="Borra duplicados intra-Marca creados por workflow n8n y rellena ID Contacto Sucursal.") parser.add_argument("--apply", action="store_true", help="Aplica los cambios. Sin este flag corre en DRY-RUN.") parser.add_argument("--run-id", help="ID para script_audit. Si --apply y no se da, se genera uno.") parser.add_argument("--pair", help="Filtra a un solo par; pasa el contact_id de cualquiera de los dos.") args = parser.parse_args() if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8") run_id = args.run_id if args.apply and not run_id: run_id = str(uuid.uuid4()) safe_print(f"[info] run_id autogenerado: {run_id}") run(apply=args.apply, run_id=run_id, only_pair=args.pair, log=safe_print) if __name__ == "__main__": main()