334 lines
13 KiB
Python
334 lines
13 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Elimina REPLICAS DUPLICADAS de oportunidades en la cuenta de Marca.
|
|
|
|
Origen del problema: cuando el workflow n8n de sincronizacion de oportunidades
|
|
("Sincronizar Oportunidad - Nodos Nuevos", id Cfgwp0bOtDW8zuKW) decide
|
|
UPDATE-vs-CREATE, en algunas ejecuciones NO encuentra la opp ya existente en
|
|
Marca (latencia de indexado del search por custom field / carrera) y hace un
|
|
CREATE, dejando DOS opps de Marca que apuntan a la MISMA opp de sucursal de
|
|
origen (mismo valor de "ID Oportunidad Sucursal"). Es la causa tipica del
|
|
descuadre POSITIVO (Marca > sucursales). El bucket de huerfanas no lo ve porque
|
|
trata ese campo como salvaguarda y nunca verifica unicidad.
|
|
|
|
Estrategia (alineada con duplicate_resolution_rules):
|
|
1. Detecta los clusters via audit_brand_vs_branches_totals
|
|
(bucket "opportunities_in_brand_duplicate_link"): grupos de opps de Marca
|
|
que comparten el mismo valor de link valido (20 chars).
|
|
2. Por cada opp del cluster, la trae EN VIVO (verifica que exista; createdAt y
|
|
updatedAt NO viven en el cache de SQLite).
|
|
3. Decide cual conservar con la jerarquia completa:
|
|
conservar = mayor monetaryValue
|
|
(desempate: status won/open > lost/abandoned)
|
|
(desempate: createdAt MAS ANTIGUO = la replica original)
|
|
borrar = el resto del cluster (la(s) replica(s) sobrante(s), que en el
|
|
incidente observado son las creadas en rafaga el 2026-05-30
|
|
con updatedAt == createdAt, intactas).
|
|
4. DELETE de la(s) opp(s) sobrante(s) en Marca via API de GHL.
|
|
|
|
Cada borrado se registra en script_audit y se guarda un snapshot completo en
|
|
generated/migrations/ ANTES de mutar (el DELETE no es reversible en GHL; el
|
|
snapshot permite recrear la opp si hiciera falta).
|
|
|
|
Uso:
|
|
python scripts/cleanup_brand_duplicate_replica_opps.py # dry-run
|
|
python scripts/cleanup_brand_duplicate_replica_opps.py --apply --run-id <uuid>
|
|
python scripts/cleanup_brand_duplicate_replica_opps.py --only-link <id_opp_sucursal> # 1 cluster
|
|
python scripts/cleanup_brand_duplicate_replica_opps.py --json
|
|
"""
|
|
|
|
import argparse
|
|
import datetime
|
|
import json
|
|
import os
|
|
import sys
|
|
import uuid
|
|
import warnings
|
|
from collections import defaultdict
|
|
|
|
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")
|
|
|
|
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
if ROOT_DIR not in sys.path:
|
|
sys.path.insert(0, ROOT_DIR)
|
|
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
if SCRIPTS_DIR not in sys.path:
|
|
sys.path.insert(0, SCRIPTS_DIR)
|
|
|
|
import sync_engine # noqa: E402
|
|
import script_audit # noqa: E402
|
|
from paths import MIGRATIONS_DIR # noqa: E402
|
|
import audit_brand_vs_branches_totals as audit # noqa: E402
|
|
|
|
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
|
|
|
|
# Para conservar: mayor monetaryValue, luego status, luego mas antiguo.
|
|
# Alineado con el ranking del bucket en audit_brand_vs_branches_totals.
|
|
STATUS_RANK = {"won": 3, "open": 2, "lost": 1, "abandoned": 0}
|
|
|
|
gc = sync_engine.ghl_client
|
|
|
|
|
|
def safe_print(*args, **kwargs):
|
|
text = " ".join(str(a) for a in args)
|
|
try:
|
|
sys.stdout.write(text + "\n")
|
|
sys.stdout.flush()
|
|
except UnicodeEncodeError:
|
|
enc = sys.stdout.encoding or "utf-8"
|
|
sys.stdout.write(text.encode(enc, errors="replace").decode(enc) + "\n")
|
|
sys.stdout.flush()
|
|
|
|
|
|
def _iso_to_ts(s):
|
|
if not s:
|
|
return 0.0
|
|
try:
|
|
return datetime.datetime.fromisoformat(str(s).replace("Z", "+00:00")).timestamp()
|
|
except Exception:
|
|
return 0.0
|
|
|
|
|
|
def fetch_opp_live(opp_id, token):
|
|
"""GET en vivo. Devuelve la opp (dict) o None si ya no existe.
|
|
|
|
GHL responde **400** (no 404) para una opp borrada ("Opportunity doesn't
|
|
exist or is deleted"), ver memoria positive_opp_descuadre_double_replica.
|
|
Tratamos 400/404/"doesn't exist"/"deleted"/"not found" como inexistente.
|
|
"""
|
|
try:
|
|
data = gc.get_opportunity(token, opp_id)
|
|
return (data or {}).get("opportunity") or data
|
|
except Exception as exc:
|
|
msg = str(exc).lower()
|
|
if any(tok_ in msg for tok_ in ("400", "404", "not found", "doesn't exist", "does not exist", "deleted")):
|
|
return None
|
|
raise
|
|
|
|
|
|
def keep_score(o):
|
|
"""Score mayor = mas razon para CONSERVAR.
|
|
|
|
(monetaryValue, status_rank, -createdAt_ts): a igual valor y status, el
|
|
createdAt mas antiguo (ts menor -> -ts mayor) gana = la replica original.
|
|
"""
|
|
return (
|
|
float(o.get("monetaryValue") or 0),
|
|
STATUS_RANK.get((o.get("status") or "").lower(), 0),
|
|
-_iso_to_ts(o.get("createdAt")),
|
|
)
|
|
|
|
|
|
def _opp_brief(o):
|
|
return {
|
|
"id": o.get("id"),
|
|
"name": o.get("name"),
|
|
"status": o.get("status"),
|
|
"monetaryValue": o.get("monetaryValue"),
|
|
"createdAt": o.get("createdAt"),
|
|
"updatedAt": o.get("updatedAt"),
|
|
"contactId": o.get("contactId") or o.get("contact_id"),
|
|
"pipelineId": o.get("pipelineId"),
|
|
"pipelineStageId": o.get("pipelineStageId"),
|
|
}
|
|
|
|
|
|
def build_cluster_plan(link_value, opp_ids, token, *, log):
|
|
"""Trae cada opp en vivo, decide conservar/borrar. Devuelve dict de plan."""
|
|
live = []
|
|
for oid in opp_ids:
|
|
o = fetch_opp_live(oid, token)
|
|
if o is None:
|
|
log(f" [!] opp {oid} ya no existe en GHL (404), se ignora")
|
|
continue
|
|
live.append(o)
|
|
|
|
if len(live) < 2:
|
|
return {
|
|
"status": "incomplete",
|
|
"link_value": link_value,
|
|
"details": f"Solo {len(live)} de {len(opp_ids)} opps viven; el duplicado ya no aplica.",
|
|
}
|
|
|
|
live.sort(key=keep_score, reverse=True)
|
|
keep = live[0]
|
|
delete = live[1:]
|
|
|
|
return {
|
|
"status": "ok",
|
|
"link_value": link_value,
|
|
"name": keep.get("name") or "",
|
|
"keep": _opp_brief(keep),
|
|
"delete": [_opp_brief(o) for o in delete],
|
|
# snapshot completo de las que se borran (para recrear si fuera necesario)
|
|
"delete_full": [o for o in live if o.get("id") in {d.get("id") for d in delete}],
|
|
}
|
|
|
|
|
|
def detect_clusters_from_audit(log):
|
|
"""Corre el audit (read-only) y agrupa el bucket de duplicados por link_value."""
|
|
log("Detectando clusters via audit_brand_vs_branches_totals...")
|
|
data = audit.run_audit(limit_missing=None)
|
|
items = data["missing"]["opportunities_in_brand_duplicate_link"]["items"]
|
|
groups = defaultdict(list)
|
|
for it in items:
|
|
groups[it["link_value"]].append(it["id"])
|
|
clusters = [{"link_value": lv, "opp_ids": ids} for lv, ids in groups.items() if len(ids) >= 2]
|
|
log(f" -> {len(clusters)} cluster(s) de duplicados detectados "
|
|
f"({sum(len(c['opp_ids']) - 1 for c in clusters)} opps sobrantes).")
|
|
return clusters
|
|
|
|
|
|
def render_cluster_plan(plan):
|
|
if plan["status"] != "ok":
|
|
safe_print(f" [SKIP] link={plan['link_value']}: {plan.get('details')}")
|
|
return
|
|
k = plan["keep"]
|
|
safe_print(f" cluster link={plan['link_value']} ({plan['name']})")
|
|
safe_print(f" CONSERVAR opp={k['id']} status={k['status']} ${float(k['monetaryValue'] or 0):.0f} created={k['createdAt']}")
|
|
for d in plan["delete"]:
|
|
safe_print(f" BORRAR opp={d['id']} status={d['status']} ${float(d['monetaryValue'] or 0):.0f} created={d['createdAt']} (updated={d['updatedAt']})")
|
|
|
|
|
|
def apply_cluster(plan, token, *, run_id, log):
|
|
"""Borra la(s) opp(s) sobrante(s) del cluster. Devuelve stats."""
|
|
stats = {"deleted": 0, "errors": []}
|
|
if plan["status"] != "ok":
|
|
return stats
|
|
|
|
full_by_id = {o.get("id"): o for o in plan.get("delete_full", [])}
|
|
for d in plan["delete"]:
|
|
delete_id = d["id"]
|
|
change_id = None
|
|
if run_id:
|
|
change_id = script_audit.record_change(
|
|
run_id, BRAND_LOCATION_ID, "opportunity",
|
|
delete_id, "_delete", "delete_duplicate_replica_opp",
|
|
{"opportunity": full_by_id.get(delete_id, d), "link_value": plan["link_value"]},
|
|
{"deleted": True})
|
|
try:
|
|
gc.delete_opportunity(token, delete_id, BRAND_LOCATION_ID)
|
|
if change_id:
|
|
script_audit.mark_change(change_id, "applied")
|
|
stats["deleted"] += 1
|
|
log(f" OK opp borrada en GHL: {delete_id}")
|
|
except Exception as exc:
|
|
if change_id:
|
|
script_audit.mark_change(change_id, "failed", str(exc))
|
|
stats["errors"].append({"opp_id": delete_id, "error": str(exc)})
|
|
log(f" ERROR al borrar {delete_id}: {exc}")
|
|
return stats
|
|
|
|
|
|
def snapshot_run(plans, run_id, dry_run):
|
|
os.makedirs(MIGRATIONS_DIR, exist_ok=True)
|
|
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
path = os.path.join(MIGRATIONS_DIR, f"cleanup_brand_duplicate_replica_opps_{ts}.json")
|
|
payload = {
|
|
"run_id": run_id,
|
|
"dry_run": dry_run,
|
|
"timestamp_utc": datetime.datetime.now(datetime.timezone.utc).isoformat(),
|
|
"plans": plans,
|
|
}
|
|
with open(path, "w", encoding="utf-8") as fh:
|
|
json.dump(payload, fh, ensure_ascii=False, indent=2, default=str)
|
|
return path
|
|
|
|
|
|
def run(apply=False, run_id=None, only_link=None, log=None):
|
|
if log is None:
|
|
log = safe_print
|
|
|
|
accounts = sync_engine.parse_accounts_csv()
|
|
brand = next(a for a in accounts if a["location_id"] == BRAND_LOCATION_ID)
|
|
token = brand["token"]
|
|
|
|
clusters = detect_clusters_from_audit(log)
|
|
if only_link:
|
|
clusters = [c for c in clusters if c["link_value"] == only_link]
|
|
log(f"Filtro --only-link: {len(clusters)} cluster(s).")
|
|
|
|
if not clusters:
|
|
log("Nada que procesar. No hay replicas duplicadas en Marca.")
|
|
return {"summary": {"clusters": 0, "extra_opps": 0, "deleted": 0, "errors": 0}, "plans": []}
|
|
|
|
if run_id and apply:
|
|
script_audit.create_run(
|
|
run_id, "cleanup_brand_duplicate_replica_opps.py",
|
|
arguments=f"clusters:{len(clusters)} apply",
|
|
locations=[BRAND_LOCATION_ID])
|
|
|
|
log(f"\nModo: {'APPLY (DELETE irreversible en GHL)' if apply else 'DRY-RUN'}")
|
|
log(f"Clusters a procesar: {len(clusters)}\n")
|
|
|
|
plans = []
|
|
for c in clusters:
|
|
log(f"-- link {c['link_value']} ({len(c['opp_ids'])} opps de Marca)")
|
|
plan = build_cluster_plan(c["link_value"], c["opp_ids"], token, log=log)
|
|
render_cluster_plan(plan)
|
|
plans.append(plan)
|
|
|
|
snap = snapshot_run(plans, run_id, dry_run=not apply)
|
|
log(f"\nSnapshot: {snap}")
|
|
|
|
ok_plans = [p for p in plans if p["status"] == "ok"]
|
|
extra = sum(len(p["delete"]) for p in ok_plans)
|
|
summary = {
|
|
"clusters": len(plans),
|
|
"ok": len(ok_plans),
|
|
"skipped": len(plans) - len(ok_plans),
|
|
"extra_opps": extra,
|
|
"deleted": 0,
|
|
"errors": 0,
|
|
}
|
|
|
|
if not apply:
|
|
log(f"\nDRY-RUN finalizado. clusters_ok={summary['ok']} opps_a_borrar={extra} skipped={summary['skipped']}")
|
|
log("Para aplicar: --apply --run-id <uuid>")
|
|
return {"summary": summary, "plans": plans, "snapshot": snap}
|
|
|
|
log("\nAplicando cambios...")
|
|
for plan in ok_plans:
|
|
log(f"-- {plan['name']} (link {plan['link_value']})")
|
|
stats = apply_cluster(plan, token, run_id=run_id, log=log)
|
|
summary["deleted"] += stats["deleted"]
|
|
summary["errors"] += len(stats["errors"])
|
|
|
|
if run_id:
|
|
script_audit.update_run_status(
|
|
run_id,
|
|
"completed" if summary["errors"] == 0 else "failed",
|
|
f"errors={summary['errors']}" if summary["errors"] else None)
|
|
|
|
log(f"\nResumen: clusters={summary['clusters']} deleted={summary['deleted']} errors={summary['errors']}")
|
|
return {"summary": summary, "plans": plans, "snapshot": snap}
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Elimina replicas duplicadas de opps en Marca (mismo ID Oportunidad Sucursal).")
|
|
parser.add_argument("--apply", action="store_true",
|
|
help="Aplica los borrados. Sin este flag corre en DRY-RUN.")
|
|
parser.add_argument("--run-id", help="ID para script_audit. Si --apply y no se da, se genera uno.")
|
|
parser.add_argument("--only-link", help="Filtra a un solo cluster por su valor de ID Oportunidad Sucursal.")
|
|
parser.add_argument("--json", action="store_true", help="Imprime el resultado como JSON.")
|
|
args = parser.parse_args()
|
|
|
|
if hasattr(sys.stdout, "reconfigure"):
|
|
sys.stdout.reconfigure(encoding="utf-8")
|
|
|
|
run_id = args.run_id
|
|
if args.apply and not run_id:
|
|
run_id = str(uuid.uuid4())
|
|
safe_print(f"[info] run_id autogenerado: {run_id}")
|
|
|
|
result = run(apply=args.apply, run_id=run_id, only_link=args.only_link,
|
|
log=(lambda *a: None) if args.json else safe_print)
|
|
|
|
if args.json:
|
|
print(json.dumps(result, ensure_ascii=False, indent=2, default=str))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|