Files
MP-Manager/scripts/reconcile_brand_deadlink_opps.py
2026-05-30 14:31:19 -06:00

497 lines
22 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Reconcilia REPLICAS HUERFANAS de opps en Marca con link MUERTO (dead-link).
CAUSA RAIZ que ataca (ver docs/casos/2026-05-30-descuadre-opp-deadlink.md y
memoria positive_opp_descuadre_double_replica, variante DEAD-LINK UNICO):
El workflow n8n de sync de opps (Cfgwp0bOtDW8zuKW) crea una replica en Marca por
cada opp de sucursal y guarda el id nativo de la sucursal en el CF
"ID Oportunidad Sucursal". Cuando una opp de sucursal se BORRA (su id deja de
existir), GHL NO dispara webhook de borrado -> la replica de Marca queda
HUERFANA con un link muerto y NADIE la limpia. Es la causa #1 del descuadre
positivo (Marca > sucursales). Ningun bucket del audit ni los otros cleanup la
atrapan (link unico no-compartido + campo poblado + opp viva emparejada por
contacto).
Este reconciliador es el backstop DETERMINISTA: converge los dead-links a 0 sin
importar como surgieron. Replica el procedimiento manual ya validado:
1. DETECTA (cache): opps de Marca cuyo link NO esta en el set de ids nativos de
ninguna sucursal (excluye demos).
2. VERIFICA EN VIVO: GET /opportunities/{link} con el token de la sucursal del
contacto. GHL devuelve 400 "doesn't exist or is deleted" si esta borrada
(NO 404). Si responde 200 -> el link esta VIVO (cache stale) -> SKIP.
3. CLASIFICA (deterministico, funcion pura `classify`):
- sibling con link VALIDO -> DELETE (replica obsoleta duplicada)
- 0 opps vivas en sucursal -> DELETE (huerfana real)
- 1 opp viva no enlazada -> RELINK (id rotado; re-apuntar al id vivo)
- ambiguo (multi-opp / sin resolver branch) -> SKIP (revision humana)
4. APLICA (--apply): snapshot live + script_audit run_id (reversible). DELETE
irreversible en GHL -> el snapshot permite recrear.
Uso:
python scripts/reconcile_brand_deadlink_opps.py # dry-run
python scripts/reconcile_brand_deadlink_opps.py --json
python scripts/reconcile_brand_deadlink_opps.py --apply --run-id <uuid>
python scripts/reconcile_brand_deadlink_opps.py --only-opp <marca_opp_id>
python scripts/reconcile_brand_deadlink_opps.py --resync-first # re-sync Marca antes de detectar
python scripts/reconcile_brand_deadlink_opps.py --self-test # valida classify() con fixtures (sin API)
"""
import argparse
import datetime
import json
import os
import sqlite3
import sys
import uuid
import warnings
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPTS_DIR not in sys.path:
sys.path.insert(0, SCRIPTS_DIR)
import requests # noqa: E402
import sync_engine # noqa: E402
import script_audit # noqa: E402
from paths import DB_PATH, MIGRATIONS_DIR # noqa: E402
# Location id of the Marca (brand) account whose replica opps are reconciled.
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
# Demo location ids; excluded (together with the brand) when collecting
# branch opportunities and contacts from the cache.
DEMO_LOCATION_IDS = {"Vf7qQl3L9vakJ8hDtQ8e", "Z64WQKORPVwXb5mn68Ef"}
# field_key values looked up in object_schemas to resolve the ids of the
# custom fields that hold the branch opportunity / branch contact link.
OPP_LINK_FIELD_KEY = "opportunity.id_oportunidad_sucursal"
CONTACT_LINK_FIELD_KEY = "contact.id_contacto_sucursal"
# Base URL for the REST calls issued directly with `requests`.
GHL_BASE = "https://services.leadconnectorhq.com"
# Short alias for the client exposed by sync_engine (used for get/update/delete
# opportunity calls).
gc = sync_engine.ghl_client
# --------------------------------------------------------------------------- #
# utils
# --------------------------------------------------------------------------- #
def safe_print(*args, **kwargs):
    """Write the space-joined arguments plus a newline to stdout and flush.

    When stdout cannot encode some character, the text is round-tripped
    through the stream encoding (falling back to utf-8 if the stream reports
    none) with ``errors="replace"`` and written again, so the line is never
    lost. Keyword arguments are accepted but ignored.
    """
    message = " ".join(map(str, args))
    out = sys.stdout
    try:
        out.write(message + "\n")
        out.flush()
        return
    except UnicodeEncodeError:
        codec = out.encoding or "utf-8"
    # Degraded path: unencodable characters become the codec's replacement mark.
    printable = message.encode(codec, errors="replace").decode(codec)
    out.write(printable + "\n")
    out.flush()
def extract_cf(cf_json, field_id):
    """Return the value of one custom field from a serialized custom-fields payload.

    Args:
        cf_json: JSON text holding either a list of field entries or an object
            that wraps that list under ``customFields`` / ``custom_fields``.
        field_id: id to look for; compared against each entry's ``id`` and
            ``fieldId`` keys.

    Returns:
        For the first matching entry, the first truthy value among
        ``fieldValueString``, ``fieldValue`` and ``value`` (in that order).
        None when either argument is empty, the text is not valid JSON, the
        payload is not a list of entries, or no entry matches.
    """
    if not cf_json or not field_id:
        return None
    try:
        payload = json.loads(cf_json)
    except (TypeError, ValueError, RecursionError):
        return None
    if isinstance(payload, dict):
        payload = payload.get("customFields") or payload.get("custom_fields") or []
    # Valid JSON can still be a scalar (number, bool, string); iterating it
    # would raise TypeError, so anything that is not a list is "no fields".
    if not isinstance(payload, list):
        return None
    for entry in payload:
        if not isinstance(entry, dict):
            continue
        if field_id in (entry.get("id"), entry.get("fieldId")):
            # The value may live under any of these keys; the string variant
            # is checked first (see the original "gotcha" note).
            return entry.get("fieldValueString") or entry.get("fieldValue") or entry.get("value")
    return None
def resolve_field_id(conn, location_id, object_key, field_key):
    """Look up a custom field's id in the ``object_schemas`` cache table.

    Args:
        conn: open DB connection whose rows support access by column name.
        location_id: location that owns the field.
        object_key: object kind the field belongs to (e.g. "opportunity").
        field_key: key of the field to resolve.

    Returns:
        The ``field_id`` of the first matching row, or None if there is none.
    """
    query = (
        "SELECT field_id FROM object_schemas "
        "WHERE location_id=? AND object_key=? AND field_key=?"
    )
    match = conn.execute(query, (location_id, object_key, field_key)).fetchone()
    if not match:
        return None
    return match["field_id"]
# --------------------------------------------------------------------------- #
# nucleo de decision (PURO, sin side-effects -> unit-testable via --self-test)
# --------------------------------------------------------------------------- #
def classify(*, dead_link_confirmed, sibling_valid_links, branch_live_opp_ids,
             branch_resolved, branch_opp_already_linked):
    """Decide what to do with a Marca replica whose link is presumed dead.

    Pure function: no I/O and no mutation of its arguments.

    Args:
        dead_link_confirmed: True when the live check confirmed the link is
            gone, False when the link turned out to be alive, None when the
            check could not be performed.
        sibling_valid_links: number of OTHER Marca opps of the same contact
            whose link points to a live branch opp.
        branch_live_opp_ids: ids of the contact's live opps in its branch.
        branch_resolved: whether the contact's branch could be resolved.
        branch_opp_already_linked: whether the live branch opp(s) are already
            linked from another Marca opp (so not available for a relink).

    Returns:
        dict with ``action`` (DELETE | RELINK | SKIP), ``reason`` (str) and
        ``relink_to`` (target branch opp id for RELINK, otherwise None).
    """
    def verdict(action, reason, relink_to=None):
        return {"action": action, "reason": reason, "relink_to": relink_to}

    # Without a confirmed dead link nothing destructive is ever proposed.
    if dead_link_confirmed is False:
        return verdict("SKIP", "link VIVO en GHL (cache stale); re-sync y re-evaluar")
    if dead_link_confirmed is None:
        return verdict("SKIP", "no se pudo verificar el link en vivo (sucursal no resuelta)")

    # From here on the link is confirmed dead.
    if sibling_valid_links >= 1:
        return verdict("DELETE", "replica obsoleta: el contacto ya tiene otra opp de Marca con link valido")

    if branch_resolved:
        live_count = len(branch_live_opp_ids)
        if live_count == 0:
            return verdict("DELETE", "huerfana real: el contacto no tiene opps vivas en su sucursal")
        if live_count == 1 and not branch_opp_already_linked:
            return verdict("RELINK", "id rotado: re-apuntar al unico id de opp vivo de la sucursal",
                           branch_live_opp_ids[0])

    # Anything else is ambiguous and left for a human.
    return verdict("SKIP", "ambiguo (multi-opp en sucursal o ya enlazadas); revision humana")
# --------------------------------------------------------------------------- #
# verificacion en vivo
# --------------------------------------------------------------------------- #
def _headers(token):
return {"Authorization": f"Bearer {token}", "Version": "2021-07-28", "Accept": "application/json"}
def opp_alive_live(opp_id, token):
    """Check against the live API whether an opportunity still exists.

    Args:
        opp_id: id of the opportunity to fetch.
        token: API token used for the request.

    Returns:
        True on HTTP 200, False on HTTP 400 or 404 (treated as deleted), and
        None for any other status code or when the request raises.
    """
    outcome_by_status = {200: True, 400: False, 404: False}
    try:
        response = requests.get(
            f"{GHL_BASE}/opportunities/{opp_id}",
            headers=_headers(token),
            timeout=30,
        )
        # Unlisted status codes map to None ("could not verify").
        return outcome_by_status.get(response.status_code)
    except Exception:
        return None
def branch_live_opps_for_contact(branch_contact_id, location_id, token):
    """List the ids of a contact's opportunities in its branch via the search endpoint.

    Args:
        branch_contact_id: contact id in the branch location.
        location_id: branch location id.
        token: API token of that branch.

    Returns:
        List of opportunity ids from the response's ``opportunities`` array
        (empty list when the key is absent), or None when the status is not
        200 or anything raises. Only a single request is made.
        NOTE(review): if the endpoint paginates, later pages are not read — confirm.
    """
    query = {"location_id": location_id, "contact_id": branch_contact_id}
    try:
        response = requests.get(
            f"{GHL_BASE}/opportunities/search",
            headers=_headers(token),
            params=query,
            timeout=30,
        )
        if response.status_code == 200:
            found = response.json().get("opportunities", [])
            return [opp["id"] for opp in found]
    except Exception:
        return None
    return None
# --------------------------------------------------------------------------- #
# carga de datos
# --------------------------------------------------------------------------- #
def load_state(conn):
    """Load from the cache DB everything needed to detect dead-link candidates.

    Args:
        conn: open DB connection whose rows support access by column name.

    Returns:
        dict with:
          - ``brand_opp_link`` / ``brand_contact_link``: field ids of the
            link custom fields in Marca (None when not found);
          - ``branch_opp_ids``: set of opportunity ids outside Marca and the
            demo locations;
          - ``branch_contact_loc``: contact id -> location_id for contacts
            outside Marca and the demo locations;
          - ``brand_opps``: list of Marca opportunity rows as dicts;
          - ``brand_contacts``: Marca contact id -> row dict.
    """
    brand_opp_link = resolve_field_id(conn, BRAND_LOCATION_ID, "opportunity", OPP_LINK_FIELD_KEY)
    brand_contact_link = resolve_field_id(conn, BRAND_LOCATION_ID, "contact", CONTACT_LINK_FIELD_KEY)

    excluded = (BRAND_LOCATION_ID, *DEMO_LOCATION_IDS)
    # One "?" per excluded id so the queries stay valid if DEMO_LOCATION_IDS
    # grows or shrinks. Only placeholders are interpolated; values stay bound.
    marks = ",".join("?" * len(excluded))

    branch_opp_ids = {
        r["id"] for r in conn.execute(
            f"SELECT id FROM opportunities WHERE location_id NOT IN ({marks})",
            excluded,
        )
    }
    # Branch contact id -> location_id (used to resolve a Marca contact's branch).
    branch_contact_loc = {
        r["id"]: r["location_id"] for r in conn.execute(
            f"SELECT id, location_id FROM contacts WHERE location_id NOT IN ({marks})",
            excluded,
        )
    }
    brand_opps = [dict(r) for r in conn.execute(
        "SELECT id, name, status, monetary_value, contact_id, custom_fields_json "
        "FROM opportunities WHERE location_id=?", (BRAND_LOCATION_ID,))]
    brand_contacts = {r["id"]: dict(r) for r in conn.execute(
        "SELECT id, custom_fields_json FROM contacts WHERE location_id=?", (BRAND_LOCATION_ID,))}
    return {
        "brand_opp_link": brand_opp_link,
        "brand_contact_link": brand_contact_link,
        "branch_opp_ids": branch_opp_ids,
        "branch_contact_loc": branch_contact_loc,
        "brand_opps": brand_opps,
        "brand_contacts": brand_contacts,
    }
def build_plans(conn, tokens, *, only_opp=None, log=safe_print):
    """Detect dead-link candidates in the cache, verify them live and classify each.

    Args:
        conn: open cache DB connection (rows accessed by column name).
        tokens: dict location_id -> API token; a contact's branch counts as
            resolved only when its location has a token here.
        only_opp: when given, restrict the candidates to this Marca opp id.
        log: callable used for progress lines.

    Returns:
        One plan dict per candidate with the evidence gathered
        (``dead_link``, ``branch_live_opp_ids``, ...) merged with the
        ``action`` / ``reason`` / ``relink_to`` returned by ``classify``.
        ``branch_search_ok`` tells whether the branch search succeeded.

    Raises:
        RuntimeError: if the link field id cannot be resolved for Marca.
    """
    st = load_state(conn)
    opp_link_fid = st["brand_opp_link"]
    contact_link_fid = st["brand_contact_link"]
    if not opp_link_fid:
        raise RuntimeError("No se resolvio el field_id de 'ID Oportunidad Sucursal' en Marca (corre sync de metadata).")

    # Group Marca opps by contact (sibling check) and index which Marca opps
    # hold each link value (to see whether a relink target is already taken).
    opps_by_contact = {}
    link_owners = {}
    for o in st["brand_opps"]:
        # str() guards against a non-string custom-field value.
        lv = str(extract_cf(o["custom_fields_json"], opp_link_fid) or "").strip()
        o["_link"] = lv
        o["_link_valid"] = bool(lv) and lv in st["branch_opp_ids"]
        opps_by_contact.setdefault(o["contact_id"], []).append(o)
        if lv:
            link_owners.setdefault(lv, set()).add(o["id"])

    # Candidates: non-empty link that is NOT a known branch opp id.
    candidates = [o for o in st["brand_opps"] if o["_link"] and not o["_link_valid"]]
    if only_opp:
        candidates = [o for o in candidates if o["id"] == only_opp]
    log(f"Candidatos dead-link (cache): {len(candidates)}")

    plans = []
    for o in candidates:
        cid = o["contact_id"]
        sibling_valid = sum(
            1 for s in opps_by_contact.get(cid, [])
            if s["id"] != o["id"] and s["_link_valid"]
        )

        # Resolve the contact's branch through its branch-contact link field.
        bc = st["brand_contacts"].get(cid, {})
        branch_contact_id = ""
        if contact_link_fid:
            branch_contact_id = str(
                extract_cf(bc.get("custom_fields_json"), contact_link_fid) or ""
            ).strip()
        branch_loc = st["branch_contact_loc"].get(branch_contact_id)
        branch_resolved = bool(branch_loc and branch_loc in tokens)

        dead_confirmed = None
        branch_live = []
        branch_search_ok = False
        already_linked = False
        if branch_resolved:
            tok = tokens[branch_loc]
            alive = opp_alive_live(o["_link"], tok)
            dead_confirmed = None if alive is None else not alive
            live = branch_live_opps_for_contact(branch_contact_id, branch_loc, tok)
            branch_search_ok = live is not None
            branch_live = live or []
            # Is any live branch opp already linked from ANOTHER Marca opp?
            # Checked across all Marca opps, not only this contact's siblings:
            # a sibling with a valid link already forces DELETE, so a
            # sibling-only check could never block a RELINK.
            already_linked = any(
                link_owners.get(bid, set()) - {o["id"]} for bid in branch_live
            )

        decision = classify(
            dead_link_confirmed=dead_confirmed,
            sibling_valid_links=sibling_valid,
            branch_live_opp_ids=branch_live,
            # A failed search must not be read as "zero live opps": that would
            # propose an irreversible DELETE on missing evidence.
            branch_resolved=branch_resolved and branch_search_ok,
            branch_opp_already_linked=already_linked,
        )
        if (dead_confirmed and branch_resolved and not branch_search_ok
                and decision["action"] == "SKIP"):
            decision["reason"] = ("no se pudieron listar las opps vivas de la sucursal "
                                  "(search fallo); revision humana")

        plans.append({
            "marca_opp_id": o["id"],
            "name": o["name"],
            "status": o["status"],
            "monetary_value": o["monetary_value"],
            "contact_id": cid,
            "dead_link": o["_link"],
            "branch_contact_id": branch_contact_id,
            "branch_location_id": branch_loc,
            "branch_resolved": branch_resolved,
            "branch_search_ok": branch_search_ok,
            "dead_link_confirmed": dead_confirmed,
            "sibling_valid_links": sibling_valid,
            "branch_live_opp_ids": branch_live,
            "branch_opp_already_linked": already_linked,
            **decision,
        })
    return plans
# --------------------------------------------------------------------------- #
# aplicacion
# --------------------------------------------------------------------------- #
def snapshot(plans, run_id, dry_run):
    """Write the computed plans to a timestamped JSON file under MIGRATIONS_DIR.

    Args:
        plans: list of plan dicts to persist.
        run_id: audit run id stored alongside the plans (may be None).
        dry_run: whether this snapshot belongs to a dry run.

    Returns:
        Path of the file written. The name has second resolution, so two
        snapshots taken within the same second share a path.
    """
    os.makedirs(MIGRATIONS_DIR, exist_ok=True)
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    path = os.path.join(MIGRATIONS_DIR, f"reconcile_brand_deadlink_opps_{ts}.json")
    payload = {"run_id": run_id, "dry_run": dry_run, "plans": plans}
    # Context manager guarantees the snapshot is flushed and closed before any
    # destructive step that relies on it.
    with open(path, "w", encoding="utf-8") as fh:
        # default=str stringifies any value json cannot serialize natively.
        json.dump(payload, fh, ensure_ascii=False, indent=2, default=str)
    return path
def apply_plans(plans, tokens, brand_link_field_id, *, run_id, log=safe_print):
    """Execute the DELETE / RELINK plans against Marca, auditing each change.

    Args:
        plans: plan dicts as produced by ``build_plans``.
        tokens: dict location_id -> API token; must contain the Marca token.
        brand_link_field_id: id of the link custom field in Marca, written on RELINK.
        run_id: audit run id; when falsy no audit records are written.
        log: callable used for progress lines.

    Returns:
        dict with the counters ``deleted``, ``relinked``, ``skipped`` and
        ``errors``. Plans whose action is none of SKIP/DELETE/RELINK are
        ignored and not counted.

    Raises:
        KeyError: if ``tokens`` has no entry for the Marca location.
    """
    btok = tokens[BRAND_LOCATION_ID]
    stats = {"deleted": 0, "relinked": 0, "skipped": 0, "errors": 0}
    for p in plans:
        action = p["action"]
        oid = p["marca_opp_id"]

        if action == "SKIP":
            stats["skipped"] += 1
            continue

        if action == "DELETE":
            # Best-effort live snapshot stored in the audit record so the opp
            # can be recreated. A failure no longer goes unnoticed: it is
            # logged, and the delete proceeds as before.
            snap = None
            try:
                snap = (gc.get_opportunity(btok, oid) or {}).get("opportunity")
            except Exception as e:
                log(f" DELETE WARN {oid}: no se pudo tomar snapshot live ({e}); se borra sin snapshot")
            change_id = None
            if run_id:
                change_id = script_audit.record_change(
                    run_id, BRAND_LOCATION_ID, "opportunity", oid, "_delete",
                    "reconcile_deadlink_delete",
                    {"opportunity": snap, "dead_link": p["dead_link"]}, {"deleted": True})
            try:
                gc.delete_opportunity(btok, oid, BRAND_LOCATION_ID)
                if change_id:
                    script_audit.mark_change(change_id, "applied")
                stats["deleted"] += 1
                log(f" DELETE ok {oid} ({p['name']})")
            except Exception as e:
                if change_id:
                    script_audit.mark_change(change_id, "failed", str(e))
                stats["errors"] += 1
                log(f" DELETE ERROR {oid}: {e}")

        elif action == "RELINK":
            new = p["relink_to"]
            change_id = None
            if run_id:
                change_id = script_audit.record_change(
                    run_id, BRAND_LOCATION_ID, "opportunity", oid, brand_link_field_id,
                    "reconcile_deadlink_relink",
                    {"value": p["dead_link"]}, {"value": new})
            try:
                gc.update_opportunity(btok, oid, {"customFields": [
                    {"id": brand_link_field_id, "key": OPP_LINK_FIELD_KEY, "field_value": new}]})
                if change_id:
                    script_audit.mark_change(change_id, "applied")
                stats["relinked"] += 1
                log(f" RELINK ok {oid}: {p['dead_link']} -> {new}")
            except Exception as e:
                if change_id:
                    script_audit.mark_change(change_id, "failed", str(e))
                stats["errors"] += 1
                log(f" RELINK ERROR {oid}: {e}")
    return stats
# --------------------------------------------------------------------------- #
# run
# --------------------------------------------------------------------------- #
def run(*, apply=False, run_id=None, only_opp=None, resync_first=False, log=safe_print):
    """Run the reconciliation end to end: detect, plan, snapshot and optionally apply.

    Args:
        apply: when False only the plan is computed and snapshotted (dry run).
        run_id: audit run id; audit records are written only when it is set.
        only_opp: restrict the run to a single Marca opp id.
        resync_first: re-sync the Marca account before reading the cache.
        log: callable used for progress lines.

    Returns:
        dict with ``summary`` (planned and applied counters), ``plans`` and
        ``snapshot`` (path of the snapshot file).
    """
    tokens = {acct["location_id"]: acct["token"] for acct in sync_engine.parse_accounts_csv()}
    if resync_first:
        log("Re-sync de Marca antes de detectar...")
        sync_engine.sync_account(BRAND_LOCATION_ID, tokens[BRAND_LOCATION_ID])

    db = sqlite3.connect(str(DB_PATH))
    db.row_factory = sqlite3.Row
    try:
        link_field_id = resolve_field_id(db, BRAND_LOCATION_ID, "opportunity", OPP_LINK_FIELD_KEY)
        plans = build_plans(db, tokens, only_opp=only_opp, log=log)
    finally:
        db.close()

    grouped = {name: [] for name in ("DELETE", "RELINK", "SKIP")}
    for plan in plans:
        grouped[plan["action"]].append(plan)
    to_delete, to_relink, to_skip = grouped["DELETE"], grouped["RELINK"], grouped["SKIP"]

    log(f"\nPlan: DELETE={len(to_delete)} RELINK={len(to_relink)} SKIP={len(to_skip)}")
    for plan in plans:
        suffix = f" relink_to={plan['relink_to']}" if plan.get('relink_to') else ""
        log(f" [{plan['action']}] {plan['marca_opp_id']} '{plan['name']}' (${plan['monetary_value']}) "
            f"dead={plan['dead_link']} -> {plan['reason']}"
            + suffix)

    snap_path = snapshot(plans, run_id, dry_run=not apply)
    log(f"Snapshot: {snap_path}")

    summary = {
        "candidates": len(plans),
        "delete": len(to_delete),
        "relink": len(to_relink),
        "skip": len(to_skip),
        "deleted": 0,
        "relinked": 0,
        "errors": 0,
    }
    # `summary` is shared by reference, so later updates show up in `result`.
    result = {"summary": summary, "plans": plans, "snapshot": snap_path}

    if not apply:
        log("\nDRY-RUN. Para aplicar: --apply --run-id <uuid>")
        return result
    if not (to_delete + to_relink):
        log("\nNada accionable. Dead-links = 0 (o todo SKIP).")
        return result

    if run_id:
        script_audit.create_run(run_id, "reconcile_brand_deadlink_opps.py",
                                arguments=f"delete:{summary['delete']} relink:{summary['relink']}",
                                locations=[BRAND_LOCATION_ID])
    log("\nAplicando...")
    stats = apply_plans(plans, tokens, link_field_id, run_id=run_id, log=log)
    summary.update(deleted=stats["deleted"], relinked=stats["relinked"], errors=stats["errors"])

    if run_id:
        had_errors = stats["errors"] != 0
        final_status = "failed" if had_errors else "completed"
        detail = f"errors={stats['errors']}" if had_errors else None
        script_audit.update_run_status(run_id, final_status, detail)
    log(f"\nResumen: deleted={summary['deleted']} relinked={summary['relinked']} errors={summary['errors']}")
    return result
# --------------------------------------------------------------------------- #
# self-test (valida classify() sin tocar API/DB) — repetible, deterministico
# --------------------------------------------------------------------------- #
def self_test():
    """Run ``classify`` against fixed fixtures and report each result.

    Touches neither the API nor the DB. Prints one line per case plus a
    final tally.

    Returns:
        True when every case produced the expected action, else False.
    """
    def fixture(dead, siblings, live, resolved, linked):
        # Positional shorthand for classify()'s keyword-only arguments.
        return {
            "dead_link_confirmed": dead,
            "sibling_valid_links": siblings,
            "branch_live_opp_ids": live,
            "branch_resolved": resolved,
            "branch_opp_already_linked": linked,
        }

    # (description, classify kwargs, expected action)
    table = [
        ("sibling valido -> DELETE (Ernesto/Gerardo)",
         fixture(True, 1, ["B1"], True, True), "DELETE"),
        ("0 opps vivas -> DELETE (huerfana real)",
         fixture(True, 0, [], True, False), "DELETE"),
        ("1 opp viva no enlazada -> RELINK (Patricia/Lizeth)",
         fixture(True, 0, ["B9"], True, False), "RELINK"),
        ("multi-opp viva -> SKIP (ambiguo)",
         fixture(True, 0, ["B1", "B2"], True, False), "SKIP"),
        ("link VIVO en GHL -> SKIP (cache stale)",
         fixture(False, 0, ["B1"], True, False), "SKIP"),
        ("no verificable -> SKIP",
         fixture(None, 0, [], False, False), "SKIP"),
        ("1 opp viva PERO ya enlazada por otra -> SKIP",
         fixture(True, 0, ["B9"], True, True), "SKIP"),
        ("sibling valido gana aunque haya 1 viva libre -> DELETE",
         fixture(True, 2, ["B9"], True, False), "DELETE"),
    ]

    passed = 0
    for desc, kw, expected in table:
        got = classify(**kw)["action"]
        if got == expected:
            passed += 1
            status = "OK "
        else:
            status = "FAIL"
        safe_print(f" [{status}] {desc}: esperado={expected} got={got}")
    safe_print(f"\nself-test: {passed}/{len(table)} pasaron")
    return passed == len(table)
def main():
    """Command-line entry point: parse the flags and dispatch.

    ``--self-test`` runs the classify fixtures and exits with 0 on success or
    1 on failure. Otherwise a reconciliation run is executed; with ``--apply``
    and no ``--run-id`` a run id is generated. With ``--json`` progress
    logging is muted and the result is printed as JSON.
    """
    parser = argparse.ArgumentParser(description="Reconcilia replicas huerfanas de opps en Marca con link muerto.")
    parser.add_argument("--apply", action="store_true")
    parser.add_argument("--run-id")
    parser.add_argument("--only-opp", help="Filtra a una sola opp de Marca por id.")
    parser.add_argument("--resync-first", action="store_true", help="Re-sync de Marca antes de detectar.")
    parser.add_argument("--json", action="store_true")
    parser.add_argument("--self-test", action="store_true", help="Valida classify() con fixtures (sin API).")
    opts = parser.parse_args()

    # Force UTF-8 output where the stream supports reconfiguration.
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")

    if opts.self_test:
        sys.exit(int(not self_test()))

    run_id = opts.run_id
    if not run_id and opts.apply:
        run_id = str(uuid.uuid4())
        safe_print(f"[info] run_id autogenerado: {run_id}")

    # In JSON mode stdout must carry only the JSON document.
    logger = (lambda *a: None) if opts.json else safe_print
    result = run(
        apply=opts.apply,
        run_id=run_id,
        only_opp=opts.only_opp,
        resync_first=opts.resync_first,
        log=logger,
    )
    if opts.json:
        print(json.dumps(result, ensure_ascii=False, indent=2, default=str))


if __name__ == "__main__":
    main()