Files
MP-Manager/scripts/fill_opp_id_oportunidad_sucursal.py
2026-05-30 14:31:19 -06:00

325 lines
13 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Copia el ID NATIVO de cada oportunidad a su propio custom field
"ID Oportunidad Sucursal" (fieldKey: opportunity.id_oportunidad_sucursal).
SOLO aplica a SUCURSALES (excluye la cuenta de Marca). En la sucursal, cada
oportunidad queda autoetiquetada con su propio `id`. Propósito: habilitar a
futuro la sincronización de múltiples oportunidades por contacto usando este
campo como filtro estable para identificar la opp exacta a actualizar/crear en
Marca (en vez de heurísticas por nombre/monto).
Flujo:
- Lee TODAS las oportunidades de cada sucursal EN VIVO (POST /opportunities/search
paginado), no del cache, para tener el estado actual del custom field.
- Para cada opp, si el campo ya vale exactamente su `id`, se omite (idempotente).
- Si no, PUT /opportunities/{id} con customFields=[{id,key,field_value: opp.id}].
GHL MERGEA customFields en el PUT (validado): preserva los demás campos.
Cada cambio se registra en script_audit (rollback) y se guarda un snapshot por
cuenta en generated/migrations/. Sin --apply corre en DRY-RUN.
Uso:
python scripts/fill_opp_id_oportunidad_sucursal.py --location <id> # dry-run
python scripts/fill_opp_id_oportunidad_sucursal.py --demos --apply --run-id <uuid>
python scripts/fill_opp_id_oportunidad_sucursal.py --all --apply --run-id <uuid>
"""
import argparse
import datetime
import json
import os
import sys
import uuid
import warnings
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
import sync_engine # noqa: E402
import script_audit # noqa: E402
from paths import MIGRATIONS_DIR # noqa: E402
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
DEMO_LOCATION_IDS = ["Vf7qQl3L9vakJ8hDtQ8e", "Z64WQKORPVwXb5mn68Ef"]
FIELD_FKEY = "opportunity.id_oportunidad_sucursal"
FIELD_NAME_NORM = "id oportunidad sucursal"
gc = sync_engine.ghl_client
def norm(s):
return " ".join(str(s or "").strip().lower().split())
def resolve_field(location_id, token):
"""Devuelve (field_id, field_key) del campo en una location (legacy endpoint)."""
data = gc._request("GET", f"/locations/{location_id}/customFields", token,
params={"model": "opportunity"})
for cf in data.get("customFields", []) or []:
if cf.get("fieldKey") == FIELD_FKEY or norm(cf.get("name")) == FIELD_NAME_NORM:
return cf.get("id"), cf.get("fieldKey")
return None, None
def current_field_value(opp, field_id):
"""Lee el valor actual del custom field en una opp del search. Maneja las
variantes de clave que devuelve GHL (id/fieldId, value/fieldValue/...)."""
for cf in opp.get("customFields") or []:
if cf.get("id") == field_id or cf.get("fieldId") == field_id:
return cf.get("value") or cf.get("fieldValue") or cf.get("fieldValueString")
return None
def plan_account(account):
"""Devuelve (field_id, field_key, actions). actions: dicts con status."""
loc = account["location_id"]
token = account["token"]
field_id, field_key = resolve_field(loc, token)
if not field_id:
return None, None, [{"status": "field_missing",
"details": "El campo 'ID Oportunidad Sucursal' no existe en esta cuenta."}]
opps = gc.get_all_opportunities(token, loc)
actions = []
for o in opps:
oid = o.get("id")
if not oid:
continue
cur = current_field_value(o, field_id)
if cur == oid:
actions.append({"status": "already_ok", "opp_id": oid})
else:
actions.append({"status": "to_set", "opp_id": oid,
"old": cur, "new": oid, "name": o.get("name")})
return field_id, field_key, actions
def apply_actions(account, field_id, field_key, actions, *, dry_run, run_id):
stats = {"set": 0, "skipped": 0, "errors": []}
to_set = [a for a in actions if a["status"] == "to_set"]
snap = {
"account": account["nombre"], "location_id": account["location_id"],
"field_id": field_id, "field_key": field_key,
"timestamp_utc": datetime.datetime.now(datetime.timezone.utc).isoformat(),
"dry_run": dry_run,
"total_opps": len(actions),
"to_set": [{"opp_id": a["opp_id"], "old": a.get("old"), "new": a["new"]} for a in to_set],
}
os.makedirs(MIGRATIONS_DIR, exist_ok=True)
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
safe = "".join(c if c.isalnum() else "_" for c in account["nombre"])[:40]
snap_path = os.path.join(MIGRATIONS_DIR, f"fill_oppid_{safe}_{ts}.json")
with open(snap_path, "w", encoding="utf-8") as fh:
json.dump(snap, fh, ensure_ascii=False, indent=2, default=str)
stats["skipped"] = sum(1 for a in actions if a["status"] != "to_set")
if dry_run:
return stats, snap_path
token = account["token"]
for a in to_set:
oid = a["opp_id"]
change_id = None
if run_id:
change_id = script_audit.record_change(
run_id, account["location_id"], "opportunity",
oid, field_id, FIELD_NAME_NORM, {"value": a.get("old")}, {"value": oid})
try:
gc.update_opportunity(token, oid, {"customFields": [
{"id": field_id, "key": field_key, "field_value": oid}]})
if change_id:
script_audit.mark_change(change_id, "applied")
stats["set"] += 1
except Exception as exc:
if change_id:
script_audit.mark_change(change_id, "failed", str(exc))
stats["errors"].append({"opp_id": oid, "error": str(exc)})
return stats, snap_path
def print_plan(account, actions):
to_set = [a for a in actions if a["status"] == "to_set"]
ok = sum(1 for a in actions if a["status"] == "already_ok")
miss = [a for a in actions if a["status"] == "field_missing"]
print(f"\n[Plan] {account['nombre']} ({account['location_id']})")
if miss:
print(f"{miss[0]['details']}")
return
print(f" opps totales={len(actions)} ya ok={ok} a setear={len(to_set)}")
for a in to_set[:6]:
print(f" ▸ opp {a['opp_id']} {a.get('name')!r} campo: {a.get('old')!r}{a['new']!r}")
if len(to_set) > 6:
print(f" … y {len(to_set) - 6} más")
def select_targets(args, accounts):
if args.location:
m = [a for a in accounts if a["location_id"] == args.location]
if not m:
raise SystemExit(f"Location {args.location} no encontrada.")
if args.location == BRAND_LOCATION_ID:
raise SystemExit("Este script es SOLO para sucursales; no aplica a Marca.")
return m
if args.demos:
return [a for a in accounts if a["location_id"] in DEMO_LOCATION_IDS]
if args.all_productive:
return [a for a in accounts
if a["location_id"] not in DEMO_LOCATION_IDS
and a["location_id"] != BRAND_LOCATION_ID]
if args.all:
return [a for a in accounts if a["location_id"] != BRAND_LOCATION_ID]
raise SystemExit("Especifica --location <id>, --demos, --all-productive o --all.")
def run_fill(location_ids=None, opp_ids=None, dry_run=True, run_id=None, log=None):
"""Ejecuta el llenado del campo 'ID Oportunidad Sucursal' y devuelve un dict
JSON-serializable. Pensado para uso programático (endpoint del dashboard) y CLI.
Args:
location_ids: lista de location_ids (sucursales) a procesar. None = todas
las sucursales (excluye Marca). NUNCA toca Marca (defensa).
opp_ids: iterable opcional de opp_ids para filtrar (solo se actualizan
las opps en esta lista). None = todas las opps detectadas en scope.
dry_run: True (default) = no escribe en GHL.
run_id: id para script_audit (rollback).
log: callable opcional log(line).
"""
if log is None:
log = safe_print
opp_filter = set(opp_ids) if opp_ids else None
accounts = sync_engine.parse_accounts_csv()
accounts_by_id = {a["location_id"]: a for a in accounts}
if location_ids:
targets = []
for lid in location_ids:
if lid == BRAND_LOCATION_ID:
continue # defensa: nunca tocar Marca
acc = accounts_by_id.get(lid)
if acc:
targets.append(acc)
else:
targets = [a for a in accounts if a["location_id"] != BRAND_LOCATION_ID]
if run_id:
script_audit.create_run(
run_id, "fill_opp_id_oportunidad_sucursal.py",
arguments=("opp_ids:%d" % len(opp_filter)) if opp_filter else "all-branches",
locations=[t["location_id"] for t in targets])
log(f"Modo: {'DRY-RUN' if dry_run else 'APPLY'}")
log(f"Cuentas en scope (sucursales): {len(targets)}")
if opp_filter is not None:
log(f"Filtro opp_ids: {len(opp_filter)} opps")
summary = {
"accounts_processed": 0,
"opps_reviewed": 0,
"set": 0,
"skipped": 0,
"errors": 0,
}
items = [] # detalle por cuenta
errors_global = []
for acc in targets:
if not script_audit.wait_if_paused_or_stopped(run_id):
log("Detención solicitada. Saliendo.")
break
item = {"location_id": acc["location_id"], "account": acc["nombre"]}
try:
field_id, field_key, actions = plan_account(acc)
except Exception as exc:
log(f" [{acc['nombre']}] ERROR plan: {exc}")
errors_global.append({"account": acc["nombre"], "error": str(exc)})
summary["errors"] += 1
item.update({"status": "plan_error", "error": str(exc)})
items.append(item)
continue
# Filtrar por opp_ids si se pasó (mantiene el bookkeeping de "already_ok").
if opp_filter is not None:
actions = [a for a in actions if a.get("opp_id") in opp_filter]
opps_in_scope = sum(1 for a in actions if a.get("status") in ("to_set", "already_ok"))
summary["opps_reviewed"] += opps_in_scope
print_plan(acc, actions)
if field_id is None:
item.update({"status": "field_missing", "opps_in_scope": opps_in_scope})
items.append(item)
continue
try:
stats, snap_path = apply_actions(acc, field_id, field_key, actions,
dry_run=dry_run, run_id=run_id)
summary["set"] += stats["set"]
summary["skipped"] += stats["skipped"]
errors_global.extend(stats["errors"])
summary["errors"] += len(stats["errors"])
summary["accounts_processed"] += 1
log(f" snapshot: {snap_path}")
if not dry_run:
log(f" ✓ set={stats['set']} skipped={stats['skipped']} errors={len(stats['errors'])}")
item.update({
"status": "ok",
"opps_in_scope": opps_in_scope,
"set": stats["set"],
"skipped": stats["skipped"],
"errors": len(stats["errors"]),
"snapshot": snap_path,
})
items.append(item)
except Exception as exc:
log(f" ✗ apply falló: {exc}")
errors_global.append({"account": acc["nombre"], "error": str(exc)})
summary["errors"] += 1
item.update({"status": "apply_error", "error": str(exc)})
items.append(item)
log("\n" + "=" * 60)
if dry_run:
log("DRY-RUN — no se modificó nada. Revisa el plan y vuelve a correr con --apply.")
log(f"Cuentas procesadas : {len(targets)}")
log(f"Opps revisadas : {summary['opps_reviewed']}")
log(f"Campos seteados : {summary['set']}")
log(f"Saltados (ya ok) : {summary['skipped']}")
log(f"Errores : {len(errors_global)}")
for e in errors_global[:20]:
log(f" - {e}")
return {
"dry_run": dry_run,
"summary": summary,
"items": items,
"errors": errors_global,
}
def main():
parser = argparse.ArgumentParser(
description="Copia el id nativo de cada oportunidad a su custom field 'ID Oportunidad Sucursal' (solo sucursales).")
parser.add_argument("--location", help="Procesar solo esta cuenta (no Marca).")
parser.add_argument("--demos", action="store_true", help="Solo las 2 cuentas DEMO.")
parser.add_argument("--all-productive", dest="all_productive", action="store_true",
help="Todas las sucursales productivas (excluye Marca y DEMOs).")
parser.add_argument("--all", action="store_true", help="Todas las sucursales (excluye Marca).")
parser.add_argument("--apply", action="store_true",
help="Ejecuta los cambios. Sin este flag corre en DRY-RUN.")
parser.add_argument("--run-id", help="ID para registrar en script_audit (rollback).")
args = parser.parse_args()
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
accounts = sync_engine.parse_accounts_csv()
targets = select_targets(args, accounts)
location_ids = [t["location_id"] for t in targets]
run_fill(location_ids=location_ids, opp_ids=None,
dry_run=not args.apply, run_id=args.run_id, log=safe_print)
if __name__ == "__main__":
main()