#!/usr/bin/env python3
|
|
"""Align the ORDER (folder + position) of each branch's custom fields with Marca (the brand account).

In GHL the visual order of a custom field is hierarchical: **(folder, position
within the folder)**. Several fields may share the same numeric `position`
if they live in different folders. Therefore, for a branch to look the same
as Marca, TWO attributes must be aligned per field:
  1. `parentId` → the matching folder (resolved by NAME, not by id: folder
     ids differ between accounts even when the name matches).
  2. `position` → the same numeric position as in Marca.

Empirically verified against the GHL V2 API:
  - `PUT /locations/{loc}/customFields/{id}` with body `{"parentId": ..., "position": N}`
    applies BOTH attributes (status 200).
  - CRITICAL: sending `position` ALONE is IGNORED by the API (same as `dataType`).
    It must ALWAYS be sent together with `parentId` in the same body.
  - The change is purely organizational (settings UI). It does not affect
    stored values, workflows, public forms or integrations.

Field matching: by `fieldKey` (primary) with a fallback to the normalized name.
  - Fields that exist only in the branch (not in Marca) are NOT touched.
  - Fields that exist only in Marca do not apply because the branch does not
    have them (use create_missing_brand_fields.py to clone them first).
  - If the reference folder does not exist in the branch, the field is skipped
    and reported as `folder_missing` (it is not relocated blindly).

Usage:

    # Plan (dry-run) on a single account
    python scripts/align_custom_field_order.py --location <id>

    # Plan on every productive branch (excludes Marca and the DEMOs)
    python scripts/align_custom_field_order.py --all-productive

    # Apply on the 2 DEMO accounts first (validation)
    python scripts/align_custom_field_order.py --demos --apply --run-id <uuid>

    # Productive rollout
    python scripts/align_custom_field_order.py --all-productive --apply --run-id <uuid>

Without `--apply` the script runs in DRY-RUN mode and changes nothing in GHL
(a local plan snapshot file is still written for each account).
"""
|
|
|
|
import argparse
|
|
import datetime
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import unicodedata
|
|
import warnings
|
|
|
|
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")
|
|
import requests
|
|
|
|
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
if ROOT_DIR not in sys.path:
|
|
sys.path.insert(0, ROOT_DIR)
|
|
|
|
import sync_engine # noqa: E402
|
|
import script_audit # noqa: E402
|
|
from paths import MIGRATIONS_DIR # noqa: E402
|
|
|
|
# Host used for every relative endpoint, and the value sent in the `Version`
# request header (see ghl_request).
BASE_URL = "https://services.leadconnectorhq.com"
API_VERSION = "2021-07-28"

# Location id of Marca, the account whose field order is the reference; it is
# never selected as a target.
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"

# Location ids of the DEMO accounts selected by `--demos` and excluded by
# `--all-productive`.
DEMO_LOCATION_IDS = [
    "Vf7qQl3L9vakJ8hDtQ8e",
    "Z64WQKORPVwXb5mn68Ef",
]

# token -> timestamp of the most recent request issued with that token;
# read and written only by wait_for_rate_limit.
_last_request_by_token: dict = {}
|
|
|
|
|
|
def normalize_name(value):
    """Return a comparison key for ``value``.

    The key is the string form of ``value``, trimmed, lowercased, stripped of
    combining marks (accents) after NFKD decomposition, and with every run of
    whitespace collapsed to a single space. Any falsy input (``None``, ``""``,
    ``0``) yields the empty string.
    """
    text = str(value) if value else ""
    decomposed = unicodedata.normalize("NFKD", text.strip().lower())
    # combining() is 0 for base characters and non-zero for accents/diacritics.
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    words = "".join(base_chars).split()
    return " ".join(words)
|
|
|
|
|
|
def wait_for_rate_limit(token):
    """Sleep as needed so requests made with ``token`` are at least 110 ms apart.

    The time of the last request per token is kept in the module-level
    ``_last_request_by_token`` dict and refreshed on every call. The first
    call for a token never sleeps.

    A monotonic clock is used instead of wall-clock time: if the system clock
    were adjusted backwards between two calls, a wall-clock difference would
    be negative and the computed sleep could be arbitrarily long.
    """
    min_interval = 0.110
    previous = _last_request_by_token.get(token)
    if previous is not None:
        remaining = min_interval - (time.monotonic() - previous)
        if remaining > 0:
            time.sleep(remaining)
    _last_request_by_token[token] = time.monotonic()
|
|
|
|
|
|
def ghl_request(method, endpoint, token, *, params=None, json_body=None,
                version=API_VERSION, retries=3):
    """Send one authenticated request to the GHL API, retrying transient failures.

    Args:
        method: HTTP verb passed to ``requests.request``.
        endpoint: Absolute URL (starts with ``http``) or a path appended to
            ``BASE_URL``.
        token: Bearer token; also the key used for rate limiting.
        params: Optional query-string parameters.
        json_body: Optional JSON-serializable request body.
        version: Value of the ``Version`` header.
        retries: Maximum number of attempts.

    Returns:
        The decoded JSON body, or ``{}`` for a 204 response.

    Raises:
        requests.HTTPError: Immediately for a non-retryable error status, or
            after the last attempt when the API kept answering 429/5xx.
        requests.RequestException: The last transport error when the final
            attempt failed before a response was received.
        RuntimeError: When no attempt was made at all (``retries`` < 1).
    """
    wait_for_rate_limit(token)
    url = endpoint if endpoint.startswith("http") else f"{BASE_URL}{endpoint}"
    headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {token}",
        "Version": version,
        "Content-Type": "application/json",
    }
    last_exc = None
    last_resp = None
    for attempt in range(retries):
        is_last_attempt = attempt == retries - 1
        try:
            resp = requests.request(method, url, headers=headers, params=params,
                                    json=json_body, timeout=30)
        except requests.RequestException as exc:
            last_exc, last_resp = exc, None
            if not is_last_attempt:
                time.sleep(0.5 * (attempt + 1))
            continue
        if resp.status_code == 429 or resp.status_code >= 500:
            # Throttled or server-side failure: back off linearly and retry.
            last_exc, last_resp = None, resp
            if not is_last_attempt:
                time.sleep(1.0 * (attempt + 1))
            continue
        resp.raise_for_status()
        if resp.status_code == 204:
            return {}
        return resp.json()

    # Every attempt failed. Previously a run of 429/5xx answers fell through
    # and returned None, which callers could not tell apart from success.
    if last_resp is not None:
        last_resp.raise_for_status()
    if last_exc is not None:
        raise last_exc
    raise RuntimeError(f"{method} {url}: no attempt succeeded (retries={retries})")
|
|
|
|
|
|
# ───────────────────────────────────────────────────────────────────────────
# Schema reading (fields + folders)
# ───────────────────────────────────────────────────────────────────────────
|
|
|
|
def list_fields_and_folders(account, model):
    """Fetch the custom fields of ``account`` for ``model`` and the folders they use.

    Args:
        account: Mapping with at least ``location_id`` and ``token``.
        model: Sent as the ``model`` query parameter of the listing request.

    Returns:
        ``(fields, folders_by_id)``: the listed items whose ``documentType``
        is not ``"folder"``, and a dict mapping each ``parentId`` referenced
        by those fields to the folder document fetched for it.

    Folders are looked up with one GET per referenced ``parentId``. A lookup
    that fails is reported on stdout and that folder is simply left out of
    the mapping; ids that do not resolve to a folder document are left out too.

    NOTE(review): only folders referenced by at least one field are
    discovered, so a folder that exists but is empty will not appear here.
    """
    location_id = account["location_id"]
    token = account["token"]

    listing = ghl_request("GET", f"/locations/{location_id}/customFields",
                          token, params={"model": model})
    fields = [
        item for item in (listing.get("customFields") or [])
        if item.get("documentType") != "folder"
    ]

    referenced_ids = {fld.get("parentId") for fld in fields if fld.get("parentId")}
    folders_by_id = {}
    for folder_id in referenced_ids:
        try:
            payload = ghl_request(
                "GET", f"/locations/{location_id}/customFields/{folder_id}", token)
            # The document may come wrapped under "customField" or bare.
            candidate = payload.get("customField") or payload
            if candidate.get("documentType") == "folder":
                folders_by_id[folder_id] = candidate
        except Exception as exc:
            print(f" ⚠ No pude leer folder {folder_id}: {exc}")
    return fields, folders_by_id
|
|
|
|
|
|
def build_brand_reference(brand, model):
    """Build the Marca lookup tables used to plan the alignment of one model.

    Returns:
        A dict with:
          - ``by_fk``: entries indexed by ``fieldKey`` (only fields that have one).
          - ``by_norm``: entries indexed by the normalized field name.
          - ``count``: number of fields read from Marca.
        Each entry holds ``name``, ``fieldKey``, ``folder_name``,
        ``folder_norm`` (normalized folder name) and ``position``.

    When two fields share a key, the one listed later replaces the earlier
    one. A field whose folder could not be resolved gets ``folder_name`` of
    ``None`` and an empty ``folder_norm``.
    """
    fields, folders_by_id = list_fields_and_folders(brand, model)
    by_field_key = {}
    by_normalized_name = {}
    for field in fields:
        folder_name = (folders_by_id.get(field.get("parentId")) or {}).get("name")
        field_key = field.get("fieldKey")
        entry = dict(
            name=field.get("name"),
            fieldKey=field_key,
            folder_name=folder_name,
            folder_norm=normalize_name(folder_name),
            position=field.get("position"),
        )
        if field_key:
            by_field_key[field_key] = entry
        by_normalized_name[normalize_name(field.get("name"))] = entry
    return {"by_fk": by_field_key, "by_norm": by_normalized_name, "count": len(fields)}
|
|
|
|
|
|
# ───────────────────────────────────────────────────────────────────────────
# Plan per (account, model)
# ───────────────────────────────────────────────────────────────────────────
|
|
|
|
def plan_account_model(account, model, brand_ref):
    """Return the list of actions needed to align ``model`` fields of ``account`` with Marca.

    Args:
        account: Mapping with ``location_id`` and ``token`` of the branch.
        model: Object type whose fields are being planned.
        brand_ref: Reference built by ``build_brand_reference`` for ``model``.

    Returns:
        One action dict per branch field, each with ``status``, ``object``,
        ``field_id`` and ``field_name``. ``status`` is one of:
          - ``extra_in_branch``: no Marca counterpart (by ``fieldKey``, then
            by normalized name); the field is left alone.
          - ``folder_missing``: the Marca folder could not be mapped to a
            branch folder; carries ``ref_folder`` and ``details``.
          - ``already_aligned``: folder and position already match.
          - ``to_align``: carries ``from`` and ``to`` with ``folder``,
            ``parentId`` and ``position``.
    """
    fields, folders_by_id = list_fields_and_folders(account, model)
    folder_id_by_norm = {
        normalize_name(folder.get("name")): folder_id
        for folder_id, folder in folders_by_id.items()
    }

    actions = []
    for field in fields:
        common = {
            "object": model,
            "field_id": field.get("id"),
            "field_name": field.get("name"),
        }
        ref = (brand_ref["by_fk"].get(field.get("fieldKey"))
               or brand_ref["by_norm"].get(normalize_name(field.get("name"))))
        if not ref:
            actions.append({
                "status": "extra_in_branch", **common,
                "details": "Campo no existe en Marca — no se toca.",
            })
            continue

        # A Marca field whose folder was not resolved has an empty
        # folder_norm. Never look that up: it would match a branch folder
        # whose own name normalizes to "" and relocate the field blindly.
        ref_folder_norm = ref["folder_norm"]
        target_pid = folder_id_by_norm.get(ref_folder_norm) if ref_folder_norm else None
        if not target_pid:
            if ref_folder_norm:
                details = f"Folder {ref['folder_name']!r} no existe en la sucursal."
            else:
                details = "El campo no tiene folder resuelto en Marca — no se reubica."
            actions.append({
                "status": "folder_missing", **common,
                "ref_folder": ref["folder_name"],
                "details": details,
            })
            continue

        target_pos = ref["position"]
        cur_pid = field.get("parentId")
        cur_pos = field.get("position")
        if cur_pid == target_pid and cur_pos == target_pos:
            actions.append({"status": "already_aligned", **common})
            continue

        cur_folder = (folders_by_id.get(cur_pid) or {}).get("name")
        actions.append({
            "status": "to_align", **common,
            "from": {"folder": cur_folder, "parentId": cur_pid, "position": cur_pos},
            "to": {"folder": ref["folder_name"], "parentId": target_pid, "position": target_pos},
        })
    return actions
|
|
|
|
|
|
def apply_actions(account, actions, *, dry_run, run_id):
    """Snapshot the plan for ``account`` and execute its ``to_align`` actions.

    A JSON snapshot of the full plan is always written under
    ``MIGRATIONS_DIR`` first, in dry-run mode too. Then, for each action:
      - any status other than ``to_align`` is counted as ``skipped``;
      - in dry-run mode ``to_align`` actions are left untouched and uncounted;
      - otherwise the change is recorded in ``script_audit`` (only when
        ``run_id`` is given), sent as a single PUT carrying both ``parentId``
        and ``position``, and the audit record is marked ``applied`` or
        ``failed``.

    Args:
        account: Mapping with ``nombre``, ``location_id`` and ``token``.
        actions: Plan produced by ``plan_account_model``.
        dry_run: When true nothing is sent to the API.
        run_id: Audit run id, or a falsy value to skip audit records.

    Returns:
        ``(stats, snap_path)`` where ``stats`` has ``aligned``, ``skipped``
        and ``errors`` (a list of ``{"field_id", "error"}`` dicts).
    """
    stats = {"aligned": 0, "skipped": 0, "errors": []}
    snap = {
        "account": account["nombre"],
        "location_id": account["location_id"],
        "timestamp_utc": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "dry_run": dry_run,
        "actions": actions,
    }
    os.makedirs(MIGRATIONS_DIR, exist_ok=True)
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    safe = "".join(c if c.isalnum() else "_" for c in account["nombre"])[:40]
    snap_path = os.path.join(MIGRATIONS_DIR, f"align_order_{safe}_{ts}.json")
    with open(snap_path, "w", encoding="utf-8") as fh:
        json.dump(snap, fh, ensure_ascii=False, indent=2, default=str)

    for action in actions:
        if action["status"] != "to_align":
            stats["skipped"] += 1
            continue
        if dry_run:
            continue

        before = {"parentId": action["from"]["parentId"],
                  "position": action["from"]["position"]}
        # `position` is sent together with `parentId` in one body on purpose
        # (see the module docstring).
        target = {"parentId": action["to"]["parentId"],
                  "position": action["to"]["position"]}

        change_id = None
        if run_id:
            change_id = script_audit.record_change(
                run_id, account["location_id"], "__schema__",
                action["field_id"], action["field_id"], action["field_name"],
                before, dict(target),
            )
        try:
            response = ghl_request(
                "PUT",
                f"/locations/{account['location_id']}/customFields/{action['field_id']}",
                account["token"],
                json_body=target,
            )
            # ghl_request can come back with None when all its retries ended
            # in 429/5xx. That is not a confirmed write, so it must not be
            # audited or counted as applied.
            if response is None:
                raise RuntimeError("La API no confirmó el cambio (reintentos agotados).")
            if change_id:
                script_audit.mark_change(change_id, "applied")
            stats["aligned"] += 1
        except Exception as exc:
            if change_id:
                script_audit.mark_change(change_id, "failed", str(exc))
            stats["errors"].append({"field_id": action["field_id"], "error": str(exc)})

    return stats, snap_path
|
|
|
|
|
|
# ───────────────────────────────────────────────────────────────────────────
# Report
# ───────────────────────────────────────────────────────────────────────────
|
|
|
|
def print_plan(account, actions):
    """Print a human-readable summary of the plan for ``account`` to stdout.

    Output is a header with the account name and location id, one line of
    per-status counts, two lines for each ``to_align`` action (field, then
    source → target folder and position), and one line for each
    ``folder_missing`` action.
    """
    # Bucket once by status; order inside each bucket follows `actions`.
    by_status = {}
    for action in actions:
        by_status.setdefault(action["status"], []).append(action)

    pending = by_status.get("to_align", [])
    unresolved = by_status.get("folder_missing", [])
    aligned_count = len(by_status.get("already_aligned", []))
    extra_count = len(by_status.get("extra_in_branch", []))

    print(f"\n[Plan] {account['nombre']} ({account['location_id']})")
    print(f" ya alineados={aligned_count} a alinear={len(pending)} extra(sin tocar)={extra_count} folder_faltante={len(unresolved)}")
    for action in pending:
        source = action["from"]
        target = action["to"]
        print(f" ▸ {action['object']}.{action['field_name']!r}")
        print(f" folder {source['folder']!r} pos={source['position']} → {target['folder']!r} pos={target['position']}")
    for action in unresolved:
        print(f" ✗ folder_missing {action['object']}.{action['field_name']!r}: {action['details']}")
|
|
|
|
|
|
# ───────────────────────────────────────────────────────────────────────────
# CLI
# ───────────────────────────────────────────────────────────────────────────
|
|
|
|
def select_targets(args, accounts):
    """Pick the accounts to process according to the CLI selection flags.

    Flags are honored in this order, and the first one set wins:
    ``--location`` (exactly that account), ``--demos`` (only the DEMO
    accounts), ``--all-productive`` (everything except the DEMOs and Marca),
    ``--all`` (everything except Marca).

    Raises:
        SystemExit: If ``--location`` names an account that is not in
            ``accounts``, or if no selection flag was given.
    """
    if args.location:
        matches = [acc for acc in accounts if acc["location_id"] == args.location]
        if matches:
            return matches
        raise SystemExit(f"Location {args.location} no encontrada.")

    if args.demos:
        return [acc for acc in accounts if acc["location_id"] in DEMO_LOCATION_IDS]

    if args.all_productive:
        excluded = [*DEMO_LOCATION_IDS, BRAND_LOCATION_ID]
    elif args.all:
        excluded = [BRAND_LOCATION_ID]
    else:
        raise SystemExit("Especifica --location <id>, --demos, --all-productive o --all.")
    return [acc for acc in accounts if acc["location_id"] not in excluded]
|
|
|
|
|
|
def main():
    """CLI entry point: plan the alignment for the selected accounts and, with ``--apply``, execute it.

    Steps: parse arguments, load the accounts, locate Marca, select targets,
    register the audit run when ``--run-id`` is given, build the Marca
    reference once per model, then plan/print/apply account by account and
    print a final summary. A failure while planning or applying one account
    is recorded and the loop moves on to the next account.

    Raises:
        SystemExit: If Marca is not among the loaded accounts, or if target
            selection fails.
    """
    parser = argparse.ArgumentParser(
        description="Alinea el orden (folder + posición) de custom fields de cada sucursal al de Marca.")
    parser.add_argument("--location", help="Procesar solo esta cuenta.")
    parser.add_argument("--demos", action="store_true", help="Procesar solo las 2 cuentas DEMO.")
    parser.add_argument("--all-productive", dest="all_productive", action="store_true",
                        help="Procesar todas las sucursales productivas (excluye Marca y DEMOs).")
    parser.add_argument("--all", action="store_true", help="Procesar todas las sucursales (excluye Marca).")
    parser.add_argument("--object", choices=["contact", "opportunity", "both"], default="both",
                        help="Objeto a alinear. Default both.")
    parser.add_argument("--apply", action="store_true",
                        help="Ejecuta los cambios. Sin este flag corre en DRY-RUN.")
    parser.add_argument("--run-id", help="ID para registrar en script_audit (rollback).")
    args = parser.parse_args()

    # The report prints non-ASCII symbols; force UTF-8 where the stream allows it.
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")

    dry_run = not args.apply
    models = ["contact", "opportunity"] if args.object == "both" else [args.object]
    accounts = sync_engine.parse_accounts_csv()
    brand = next((a for a in accounts if a["location_id"] == BRAND_LOCATION_ID), None)
    if not brand:
        raise SystemExit(f"Cuenta de Marca {BRAND_LOCATION_ID} no encontrada en el CSV.")
    targets = select_targets(args, accounts)

    # Create the run in script_audit BEFORE recording changes: record_change
    # inserts into script_change_log with run_id as a FK to script_runs(id).
    # Without that row the INSERT fails with a FOREIGN KEY constraint.
    # create_run is idempotent (INSERT OR IGNORE) and calls init_audit_db
    # internally.
    if args.run_id:
        script_audit.create_run(
            args.run_id, "align_custom_field_order.py",
            arguments=" ".join(sys.argv[1:]),
            locations=[t["location_id"] for t in targets],
        )

    print(f"Modo: {'DRY-RUN' if dry_run else 'APPLY'}")
    print(f"Objetos: {', '.join(models)}")
    print(f"Cuentas en scope: {len(targets)}")

    # Marca reference, fetched once per model and reused for every account.
    print(f"\n[BRAND] {brand['nombre']} ({BRAND_LOCATION_ID})")
    brand_ref = {}
    for model in models:
        brand_ref[model] = build_brand_reference(brand, model)
        print(f" {model}: {brand_ref[model]['count']} custom fields de referencia")

    total_aligned = total_skipped = 0
    # Accounts actually entered by the loop. This differs from len(targets)
    # when a stop request ends the run early, so the summary must not report
    # the scope size as the processed count.
    processed = 0
    errors_global = []
    for acc in targets:
        if not script_audit.wait_if_paused_or_stopped(args.run_id):
            print("Detención solicitada. Saliendo.")
            break
        processed += 1
        all_actions = []
        try:
            for model in models:
                all_actions.extend(plan_account_model(acc, model, brand_ref[model]))
        except Exception as exc:
            print(f" [{acc['nombre']}] ERROR plan: {exc}")
            errors_global.append({"account": acc["nombre"], "error": str(exc)})
            continue
        print_plan(acc, all_actions)
        try:
            stats, snap_path = apply_actions(acc, all_actions, dry_run=dry_run, run_id=args.run_id)
            total_aligned += stats["aligned"]
            total_skipped += stats["skipped"]
            errors_global.extend(stats["errors"])
            print(f" snapshot: {snap_path}")
            if not dry_run:
                print(f" ✓ aligned={stats['aligned']} skipped={stats['skipped']} errors={len(stats['errors'])}")
        except Exception as exc:
            print(f" ✗ apply falló: {exc}")
            errors_global.append({"account": acc["nombre"], "error": str(exc)})

    print("\n" + "=" * 60)
    if dry_run:
        print("DRY-RUN — no se modificó nada. Revisa el plan y vuelve a correr con --apply.")
    print(f"Cuentas procesadas : {processed}")
    print(f"Campos alineados : {total_aligned}")
    print(f"Acciones saltadas : {total_skipped}")
    print(f"Errores : {len(errors_global)}")
    # Only the first 20 errors are listed to keep the summary readable.
    for e in errors_global[:20]:
        print(f" - {e}")
|
|
|
|
|
|
# Run the CLI only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|