Files
MP-Manager/scripts/cleanup_brand_duplicate_replica_opps.py
2026-05-30 14:31:19 -06:00

334 lines
13 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Elimina REPLICAS DUPLICADAS de oportunidades en la cuenta de Marca.
Origen del problema: cuando el workflow n8n de sincronizacion de oportunidades
("Sincronizar Oportunidad - Nodos Nuevos", id Cfgwp0bOtDW8zuKW) decide
UPDATE-vs-CREATE, en algunas ejecuciones NO encuentra la opp ya existente en
Marca (latencia de indexado del search por custom field / carrera) y hace un
CREATE, dejando DOS opps de Marca que apuntan a la MISMA opp de sucursal de
origen (mismo valor de "ID Oportunidad Sucursal"). Es la causa tipica del
descuadre POSITIVO (Marca > sucursales). El bucket de huerfanas no lo ve porque
trata ese campo como salvaguarda y nunca verifica unicidad.
Estrategia (alineada con duplicate_resolution_rules):
1. Detecta los clusters via audit_brand_vs_branches_totals
(bucket "opportunities_in_brand_duplicate_link"): grupos de opps de Marca
que comparten el mismo valor de link valido (20 chars).
2. Por cada opp del cluster, la trae EN VIVO (verifica que exista; createdAt y
updatedAt NO viven en el cache de SQLite).
3. Decide cual conservar con la jerarquia completa:
conservar = mayor monetaryValue
(desempate: status won/open > lost/abandoned)
(desempate: createdAt MAS ANTIGUO = la replica original)
borrar = el resto del cluster (la(s) replica(s) sobrante(s), que en el
incidente observado son las creadas en rafaga el 2026-05-30
con updatedAt == createdAt, intactas).
4. DELETE de la(s) opp(s) sobrante(s) en Marca via API de GHL.
Cada borrado se registra en script_audit y se guarda un snapshot completo en
generated/migrations/ ANTES de mutar (el DELETE no es reversible en GHL; el
snapshot permite recrear la opp si hiciera falta).
Uso:
python scripts/cleanup_brand_duplicate_replica_opps.py # dry-run
python scripts/cleanup_brand_duplicate_replica_opps.py --apply --run-id <uuid>
python scripts/cleanup_brand_duplicate_replica_opps.py --only-link <id_opp_sucursal> # 1 cluster
python scripts/cleanup_brand_duplicate_replica_opps.py --json
"""
import argparse
import datetime
import json
import os
import sys
import uuid
import warnings
from collections import defaultdict
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPTS_DIR not in sys.path:
sys.path.insert(0, SCRIPTS_DIR)
import sync_engine # noqa: E402
import script_audit # noqa: E402
from paths import MIGRATIONS_DIR # noqa: E402
import audit_brand_vs_branches_totals as audit # noqa: E402
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
# Para conservar: mayor monetaryValue, luego status, luego mas antiguo.
# Alineado con el ranking del bucket en audit_brand_vs_branches_totals.
STATUS_RANK = {"won": 3, "open": 2, "lost": 1, "abandoned": 0}
gc = sync_engine.ghl_client
def safe_print(*args, **kwargs):
text = " ".join(str(a) for a in args)
try:
sys.stdout.write(text + "\n")
sys.stdout.flush()
except UnicodeEncodeError:
enc = sys.stdout.encoding or "utf-8"
sys.stdout.write(text.encode(enc, errors="replace").decode(enc) + "\n")
sys.stdout.flush()
def _iso_to_ts(s):
if not s:
return 0.0
try:
return datetime.datetime.fromisoformat(str(s).replace("Z", "+00:00")).timestamp()
except Exception:
return 0.0
def fetch_opp_live(opp_id, token):
"""GET en vivo. Devuelve la opp (dict) o None si ya no existe.
GHL responde **400** (no 404) para una opp borrada ("Opportunity doesn't
exist or is deleted"), ver memoria positive_opp_descuadre_double_replica.
Tratamos 400/404/"doesn't exist"/"deleted"/"not found" como inexistente.
"""
try:
data = gc.get_opportunity(token, opp_id)
return (data or {}).get("opportunity") or data
except Exception as exc:
msg = str(exc).lower()
if any(tok_ in msg for tok_ in ("400", "404", "not found", "doesn't exist", "does not exist", "deleted")):
return None
raise
def keep_score(o):
"""Score mayor = mas razon para CONSERVAR.
(monetaryValue, status_rank, -createdAt_ts): a igual valor y status, el
createdAt mas antiguo (ts menor -> -ts mayor) gana = la replica original.
"""
return (
float(o.get("monetaryValue") or 0),
STATUS_RANK.get((o.get("status") or "").lower(), 0),
-_iso_to_ts(o.get("createdAt")),
)
def _opp_brief(o):
return {
"id": o.get("id"),
"name": o.get("name"),
"status": o.get("status"),
"monetaryValue": o.get("monetaryValue"),
"createdAt": o.get("createdAt"),
"updatedAt": o.get("updatedAt"),
"contactId": o.get("contactId") or o.get("contact_id"),
"pipelineId": o.get("pipelineId"),
"pipelineStageId": o.get("pipelineStageId"),
}
def build_cluster_plan(link_value, opp_ids, token, *, log):
"""Trae cada opp en vivo, decide conservar/borrar. Devuelve dict de plan."""
live = []
for oid in opp_ids:
o = fetch_opp_live(oid, token)
if o is None:
log(f" [!] opp {oid} ya no existe en GHL (404), se ignora")
continue
live.append(o)
if len(live) < 2:
return {
"status": "incomplete",
"link_value": link_value,
"details": f"Solo {len(live)} de {len(opp_ids)} opps viven; el duplicado ya no aplica.",
}
live.sort(key=keep_score, reverse=True)
keep = live[0]
delete = live[1:]
return {
"status": "ok",
"link_value": link_value,
"name": keep.get("name") or "",
"keep": _opp_brief(keep),
"delete": [_opp_brief(o) for o in delete],
# snapshot completo de las que se borran (para recrear si fuera necesario)
"delete_full": [o for o in live if o.get("id") in {d.get("id") for d in delete}],
}
def detect_clusters_from_audit(log):
"""Corre el audit (read-only) y agrupa el bucket de duplicados por link_value."""
log("Detectando clusters via audit_brand_vs_branches_totals...")
data = audit.run_audit(limit_missing=None)
items = data["missing"]["opportunities_in_brand_duplicate_link"]["items"]
groups = defaultdict(list)
for it in items:
groups[it["link_value"]].append(it["id"])
clusters = [{"link_value": lv, "opp_ids": ids} for lv, ids in groups.items() if len(ids) >= 2]
log(f" -> {len(clusters)} cluster(s) de duplicados detectados "
f"({sum(len(c['opp_ids']) - 1 for c in clusters)} opps sobrantes).")
return clusters
def render_cluster_plan(plan):
if plan["status"] != "ok":
safe_print(f" [SKIP] link={plan['link_value']}: {plan.get('details')}")
return
k = plan["keep"]
safe_print(f" cluster link={plan['link_value']} ({plan['name']})")
safe_print(f" CONSERVAR opp={k['id']} status={k['status']} ${float(k['monetaryValue'] or 0):.0f} created={k['createdAt']}")
for d in plan["delete"]:
safe_print(f" BORRAR opp={d['id']} status={d['status']} ${float(d['monetaryValue'] or 0):.0f} created={d['createdAt']} (updated={d['updatedAt']})")
def apply_cluster(plan, token, *, run_id, log):
"""Borra la(s) opp(s) sobrante(s) del cluster. Devuelve stats."""
stats = {"deleted": 0, "errors": []}
if plan["status"] != "ok":
return stats
full_by_id = {o.get("id"): o for o in plan.get("delete_full", [])}
for d in plan["delete"]:
delete_id = d["id"]
change_id = None
if run_id:
change_id = script_audit.record_change(
run_id, BRAND_LOCATION_ID, "opportunity",
delete_id, "_delete", "delete_duplicate_replica_opp",
{"opportunity": full_by_id.get(delete_id, d), "link_value": plan["link_value"]},
{"deleted": True})
try:
gc.delete_opportunity(token, delete_id, BRAND_LOCATION_ID)
if change_id:
script_audit.mark_change(change_id, "applied")
stats["deleted"] += 1
log(f" OK opp borrada en GHL: {delete_id}")
except Exception as exc:
if change_id:
script_audit.mark_change(change_id, "failed", str(exc))
stats["errors"].append({"opp_id": delete_id, "error": str(exc)})
log(f" ERROR al borrar {delete_id}: {exc}")
return stats
def snapshot_run(plans, run_id, dry_run):
os.makedirs(MIGRATIONS_DIR, exist_ok=True)
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
path = os.path.join(MIGRATIONS_DIR, f"cleanup_brand_duplicate_replica_opps_{ts}.json")
payload = {
"run_id": run_id,
"dry_run": dry_run,
"timestamp_utc": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"plans": plans,
}
with open(path, "w", encoding="utf-8") as fh:
json.dump(payload, fh, ensure_ascii=False, indent=2, default=str)
return path
def run(apply=False, run_id=None, only_link=None, log=None):
if log is None:
log = safe_print
accounts = sync_engine.parse_accounts_csv()
brand = next(a for a in accounts if a["location_id"] == BRAND_LOCATION_ID)
token = brand["token"]
clusters = detect_clusters_from_audit(log)
if only_link:
clusters = [c for c in clusters if c["link_value"] == only_link]
log(f"Filtro --only-link: {len(clusters)} cluster(s).")
if not clusters:
log("Nada que procesar. No hay replicas duplicadas en Marca.")
return {"summary": {"clusters": 0, "extra_opps": 0, "deleted": 0, "errors": 0}, "plans": []}
if run_id and apply:
script_audit.create_run(
run_id, "cleanup_brand_duplicate_replica_opps.py",
arguments=f"clusters:{len(clusters)} apply",
locations=[BRAND_LOCATION_ID])
log(f"\nModo: {'APPLY (DELETE irreversible en GHL)' if apply else 'DRY-RUN'}")
log(f"Clusters a procesar: {len(clusters)}\n")
plans = []
for c in clusters:
log(f"-- link {c['link_value']} ({len(c['opp_ids'])} opps de Marca)")
plan = build_cluster_plan(c["link_value"], c["opp_ids"], token, log=log)
render_cluster_plan(plan)
plans.append(plan)
snap = snapshot_run(plans, run_id, dry_run=not apply)
log(f"\nSnapshot: {snap}")
ok_plans = [p for p in plans if p["status"] == "ok"]
extra = sum(len(p["delete"]) for p in ok_plans)
summary = {
"clusters": len(plans),
"ok": len(ok_plans),
"skipped": len(plans) - len(ok_plans),
"extra_opps": extra,
"deleted": 0,
"errors": 0,
}
if not apply:
log(f"\nDRY-RUN finalizado. clusters_ok={summary['ok']} opps_a_borrar={extra} skipped={summary['skipped']}")
log("Para aplicar: --apply --run-id <uuid>")
return {"summary": summary, "plans": plans, "snapshot": snap}
log("\nAplicando cambios...")
for plan in ok_plans:
log(f"-- {plan['name']} (link {plan['link_value']})")
stats = apply_cluster(plan, token, run_id=run_id, log=log)
summary["deleted"] += stats["deleted"]
summary["errors"] += len(stats["errors"])
if run_id:
script_audit.update_run_status(
run_id,
"completed" if summary["errors"] == 0 else "failed",
f"errors={summary['errors']}" if summary["errors"] else None)
log(f"\nResumen: clusters={summary['clusters']} deleted={summary['deleted']} errors={summary['errors']}")
return {"summary": summary, "plans": plans, "snapshot": snap}
def main():
parser = argparse.ArgumentParser(
description="Elimina replicas duplicadas de opps en Marca (mismo ID Oportunidad Sucursal).")
parser.add_argument("--apply", action="store_true",
help="Aplica los borrados. Sin este flag corre en DRY-RUN.")
parser.add_argument("--run-id", help="ID para script_audit. Si --apply y no se da, se genera uno.")
parser.add_argument("--only-link", help="Filtra a un solo cluster por su valor de ID Oportunidad Sucursal.")
parser.add_argument("--json", action="store_true", help="Imprime el resultado como JSON.")
args = parser.parse_args()
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
run_id = args.run_id
if args.apply and not run_id:
run_id = str(uuid.uuid4())
safe_print(f"[info] run_id autogenerado: {run_id}")
result = run(apply=args.apply, run_id=run_id, only_link=args.only_link,
log=(lambda *a: None) if args.json else safe_print)
if args.json:
print(json.dumps(result, ensure_ascii=False, indent=2, default=str))
if __name__ == "__main__":
main()