Files
MP-Manager/scripts/move_custom_fields_between_folders.py
2026-05-30 14:31:19 -06:00

376 lines
16 KiB
Python

#!/usr/bin/env python3
"""Mueve custom fields entre folders (bloques) en una o varias cuentas GHL.
Cada custom field en GHL pertenece a un "folder" (bloque visual en la UI de
configuración). Aunque los fields heredan opciones/dataType desde Marca, la
asociación a un folder se hace por `parentId`, y los folder IDs son distintos
en cada location (aunque los nombres suelen coincidir). Este script:
1. Resuelve los folder IDs por nombre normalizado en cada cuenta.
2. Localiza el field a mover por nombre normalizado.
3. PUT del field con el nuevo `parentId`.
Verificado contra la API V2 de GHL:
- `PUT /locations/{loc}/customFields/{id}` con body `{"parentId": "..."}` →
status 200, mueve el field.
- El cambio es puramente organizacional (UI). No afecta valores guardados,
workflows, formularios públicos ni integraciones.
Uso típico:
# Plan en una cuenta:
python scripts/move_custom_fields_between_folders.py \\
--location Vf7qQl3L9vakJ8hDtQ8e
# Apply en las 2 DEMOs:
python scripts/move_custom_fields_between_folders.py \\
--demos --apply --run-id <uuid>
# Rollout final:
python scripts/move_custom_fields_between_folders.py \\
--all --apply --run-id <uuid>
Para cambiar los movimientos, editar la tabla `MOVES` al inicio del script.
"""
import argparse
import datetime
import json
import os
import sys
import time
import unicodedata
import warnings
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")
import requests
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
import sync_engine # noqa: E402
import script_audit # noqa: E402
from paths import MIGRATIONS_DIR # noqa: E402
# Base URL that ghl_request prepends to endpoints not starting with "http".
BASE_URL = "https://services.leadconnectorhq.com"
# Default value sent in the "Version" request header by ghl_request.
API_VERSION = "2021-07-28"
# Location ID selected by --brand and excluded by --all-productive.
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
# ───────────────────────────────────────────────────────────────────────────
# Move table
# ───────────────────────────────────────────────────────────────────────────
# Each entry describes one field to move. Both the field and the target folder
# are matched by normalized name (case- and accent-insensitive).
MOVES = [
    {
        "object": "contact",
        "field_name": "Sucursal",  # 48/50 accounts have it in "Form"
        "target_folder": "Contact",
    },
    {
        "object": "contact",
        "field_name": "Marca del Vehículo",  # matched by normalized name (accepts variants)
        "target_folder": "Contact",
    },
    {
        "object": "contact",
        "field_name": "Año del Vehículo",
        "target_folder": "Contact",
    },
    {
        "object": "contact",
        "field_name": "Versión del Vehículo",
        "target_folder": "Contact",
    },
]
# Time of the most recent request per API token; read and updated by
# wait_for_rate_limit to space out calls made with the same token.
_last_request_by_token: dict = {}
def normalize_name(value):
    """Return a comparison key for *value*: lowercase, accent-free, single-spaced.

    Falsy inputs (``None``, ``""``, ``0``) normalize to the empty string; any
    other value is converted with ``str()`` first.
    """
    text = str(value) if value else ""
    lowered = text.strip().lower()
    # NFKD splits accented letters into base letter + combining mark, so the
    # marks can be dropped on their own.
    decomposed = unicodedata.normalize("NFKD", lowered)
    base_chars = [ch for ch in decomposed if unicodedata.combining(ch) == 0]
    words = "".join(base_chars).split()
    return " ".join(words)
def wait_for_rate_limit(token):
    """Block until at least 110 ms have passed since the last call for *token*.

    The first call for a given token never sleeps. The moment of this call
    (after any sleep) is stored in ``_last_request_by_token`` for the next one.

    Args:
        token: API token used as the rate-limit key.
    """
    min_interval = 0.110
    previous = _last_request_by_token.get(token)
    if previous is not None:
        # Monotonic clock: a wall-clock adjustment (NTP, DST, manual change)
        # must not produce a negative elapsed time and an oversized sleep.
        remaining = min_interval - (time.monotonic() - previous)
        if remaining > 0:
            time.sleep(remaining)
    _last_request_by_token[token] = time.monotonic()
def ghl_request(method, endpoint, token, *, params=None, json_body=None,
                version=API_VERSION, retries=3):
    """Send one request to the GHL API, retrying transient failures.

    Transport errors, HTTP 429 and HTTP 5xx are retried with a linearly
    growing pause. Any other error status is raised immediately.

    Args:
        method: HTTP method name.
        endpoint: Absolute URL, or a path that is appended to ``BASE_URL``.
        token: Bearer token; also the key used for rate limiting.
        params: Optional query-string parameters.
        json_body: Optional JSON request body.
        version: Value for the ``Version`` header.
        retries: Maximum number of attempts (values below 1 are treated as 1).

    Returns:
        The decoded JSON body, or ``{}`` for a 204 response.

    Raises:
        requests.HTTPError: On a non-retryable error status, or when 429/5xx
            persists through every attempt.
        requests.RequestException: When the transport keeps failing.
    """
    url = endpoint if endpoint.startswith("http") else f"{BASE_URL}{endpoint}"
    headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {token}",
        "Version": version,
        "Content-Type": "application/json",
    }
    attempts = max(1, retries)
    failure = None
    for attempt in range(attempts):
        # Throttle every attempt, not just the first, so retries also update
        # the per-token timestamp.
        wait_for_rate_limit(token)
        try:
            resp = requests.request(method, url, headers=headers, params=params,
                                    json=json_body, timeout=30)
        except requests.RequestException as exc:
            failure = exc
            backoff = 0.5
        else:
            if resp.status_code != 429 and resp.status_code < 500:
                resp.raise_for_status()
                return {} if resp.status_code == 204 else resp.json()
            failure = requests.HTTPError(
                f"{resp.status_code} persistente tras {attempt + 1} intento(s): "
                f"{method} {url}",
                response=resp,
            )
            backoff = 1.0
        # No point sleeping after the final attempt.
        if attempt + 1 < attempts:
            time.sleep(backoff * (attempt + 1))
    # Never fall through returning None: callers would treat that as success.
    raise failure
# ───────────────────────────────────────────────────────────────────────────
# Lectura del schema (campos + folders)
# ───────────────────────────────────────────────────────────────────────────
def list_custom_fields_and_folders(account, model):
    """Fetch the custom fields of one account plus the folders they live in.

    Folder documents present in the list response are kept. Any ``parentId``
    referenced by a field but not already known is resolved with an individual
    GET. A folder that cannot be read is reported on stdout and skipped.

    Args:
        account: Mapping with at least ``location_id`` and ``token``.
        model: Object model passed as the ``model`` query parameter.

    Returns:
        Tuple ``(fields, folders)``: two lists of dicts as returned by the API.

    Raises:
        RuntimeError: If the list request produced no response body.
        Exception: Whatever ``ghl_request`` raises for the list request.
    """
    location_id = account["location_id"]
    token = account["token"]
    data = ghl_request("GET", f"/locations/{location_id}/customFields",
                       token, params={"model": model})
    if data is None:
        # Explicit failure instead of an opaque AttributeError on .get below.
        raise RuntimeError("La API no devolvió respuesta al listar custom fields.")
    items = data.get("customFields") or []

    fields = []
    folders_by_id = {}
    for item in items:
        if item.get("documentType") == "folder":
            # Keep folders the list already returned: no extra GET needed and
            # folders without children remain resolvable.
            if item.get("id"):
                folders_by_id.setdefault(item["id"], item)
        else:
            fields.append(item)

    referenced = {f.get("parentId") for f in fields if f.get("parentId")}
    # Sorted so the folder order (and therefore which folder wins when two
    # share a normalized name) is the same on every run.
    for pid in sorted(referenced - set(folders_by_id), key=str):
        try:
            d = ghl_request("GET", f"/locations/{location_id}/customFields/{pid}",
                            token)
            cf = d.get("customField") or d
            if cf.get("documentType") == "folder":
                folders_by_id[pid] = cf
        except Exception as exc:  # best effort: one unreadable folder must not abort the account
            print(f" ⚠ No pude leer folder {pid}: {exc}")
    return fields, list(folders_by_id.values())
# ───────────────────────────────────────────────────────────────────────────
# Núcleo: plan + apply por (cuenta, move)
# ───────────────────────────────────────────────────────────────────────────
def plan_account(account):
    """Build the list of planned actions for one account, one per ``MOVES`` entry.

    Each action is a dict whose ``status`` is one of ``field_not_found``,
    ``folder_not_found``, ``already_in_target`` or ``to_move``. The schema of
    each object type is fetched at most once per call.
    """
    schema_by_object = {}
    planned = []
    for move in MOVES:
        object_key = move["object"]
        if object_key not in schema_by_object:
            schema_by_object[object_key] = list_custom_fields_and_folders(account, object_key)
        fields, folders = schema_by_object[object_key]

        wanted_field = normalize_name(move["field_name"])
        wanted_folder = normalize_name(move["target_folder"])

        # First field / folder whose normalized name matches wins.
        field = None
        for candidate in fields:
            if normalize_name(candidate.get("name")) == wanted_field:
                field = candidate
                break
        folder = None
        for candidate in folders:
            if normalize_name(candidate.get("name")) == wanted_folder:
                folder = candidate
                break

        if not field:
            planned.append({
                "status": "field_not_found",
                "move": move,
                "details": f"No existe field {move['field_name']!r} en {object_key}.",
            })
            continue

        located = {"field_id": field.get("id"), "field_name": field.get("name")}
        if not folder:
            planned.append({
                "status": "folder_not_found",
                "move": move,
                **located,
                "details": f"No existe folder {move['target_folder']!r} en esta cuenta.",
            })
            continue

        if field.get("parentId") == folder.get("id"):
            status = "already_in_target"
        else:
            status = "to_move"
        planned.append({
            "status": status,
            "move": move,
            **located,
            "current_parent_id": field.get("parentId"),
            "target_folder_id": folder.get("id"),
            "target_folder_name": folder.get("name"),
        })
    return planned
def apply_actions(account, actions, *, dry_run, run_id):
    """Write a snapshot of the plan and execute its ``to_move`` actions.

    The snapshot JSON is written to ``MIGRATIONS_DIR`` in every mode. In dry-run
    mode no request is sent and ``to_move`` actions are counted neither as
    moved nor as skipped.

    Args:
        account: Mapping with ``nombre``, ``location_id`` and ``token``.
        actions: Plan as produced by ``plan_account``.
        dry_run: When true, only the snapshot is written.
        run_id: Audit run ID; when falsy no audit records are written.

    Returns:
        Tuple ``(stats, snap_path)`` where ``stats`` has the keys ``moved``,
        ``skipped`` and ``errors`` (list of ``{"field_id", "error"}`` dicts).
    """
    stats = {"moved": 0, "skipped": 0, "errors": []}
    # Snapshot of the plan, written before anything is changed.
    snap = {
        "account": account["nombre"],
        "location_id": account["location_id"],
        "timestamp_utc": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "dry_run": dry_run,
        "actions": actions,
    }
    os.makedirs(MIGRATIONS_DIR, exist_ok=True)
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    safe = "".join(c if c.isalnum() else "_" for c in account["nombre"])[:40]
    snap_path = os.path.join(MIGRATIONS_DIR, f"move_folders_{safe}_{ts}.json")
    with open(snap_path, "w", encoding="utf-8") as fh:
        json.dump(snap, fh, ensure_ascii=False, indent=2, default=str)

    for a in actions:
        if a["status"] != "to_move":
            stats["skipped"] += 1
            continue
        if dry_run:
            continue
        change_id = None
        if run_id:
            change_id = script_audit.record_change(
                run_id, account["location_id"], "__schema__",
                a["field_id"], a["field_id"], a["field_name"],
                {"parentId": a["current_parent_id"]},
                {"parentId": a["target_folder_id"]},
            )
        try:
            result = ghl_request(
                "PUT",
                f"/locations/{account['location_id']}/customFields/{a['field_id']}",
                account["token"], json_body={"parentId": a["target_folder_id"]},
            )
            if result is None:
                # No response body means the request never succeeded (e.g.
                # retries exhausted); it must not be recorded as applied.
                raise RuntimeError("La API no devolvió respuesta tras agotar los reintentos.")
            if change_id:
                script_audit.mark_change(change_id, "applied")
            stats["moved"] += 1
        except Exception as exc:  # one failed move must not stop the remaining ones
            if change_id:
                script_audit.mark_change(change_id, "failed", str(exc))
            stats["errors"].append({"field_id": a["field_id"], "error": str(exc)})
    return stats, snap_path
# ───────────────────────────────────────────────────────────────────────────
# Reporte
# ───────────────────────────────────────────────────────────────────────────
def print_plan(account, actions):
    """Print a human-readable summary of the planned actions for one account."""
    print(f"\n[Plan] {account['nombre']} ({account['location_id']})")
    for action in actions:
        move = action["move"]
        status = action["status"]
        if status not in ("to_move", "already_in_target"):
            # Not-found statuses: show the status and whatever detail exists.
            print(f"{status} {move['field_name']!r}: {action.get('details','')}")
            continue
        label = f"{move['object']}.{action['field_name']!r}"
        if status == "already_in_target":
            print(f" ✓ ya está {label} en {action['target_folder_name']!r}")
            continue
        print(f" ▸ MOVER {label}")
        origin = action["current_parent_id"]
        target_name = action["target_folder_name"]
        target_id = action["target_folder_id"]
        print(f" from parentId={origin} → to {target_name!r} ({target_id})")
# ───────────────────────────────────────────────────────────────────────────
# CLI
# ───────────────────────────────────────────────────────────────────────────
DEMO_LOCATION_IDS = ["Vf7qQl3L9vakJ8hDtQ8e", "Z64WQKORPVwXb5mn68Ef"]


def select_targets(args, accounts):
    """Pick the accounts to process according to the CLI selection flags.

    Flags are checked in a fixed order and the first one set wins:
    ``--location``, ``--demos``, ``--brand``, ``--all-productive``, ``--all``.

    Raises:
        SystemExit: If ``--location`` matches no account, or no flag is set.
    """
    if args.location:
        chosen = [acc for acc in accounts if acc["location_id"] == args.location]
        if chosen:
            return chosen
        raise SystemExit(f"Location {args.location} no encontrada.")

    if args.demos:
        return [acc for acc in accounts if acc["location_id"] in DEMO_LOCATION_IDS]

    if args.brand:
        return [acc for acc in accounts if acc["location_id"] == BRAND_LOCATION_ID]

    if args.all_productive:
        productive = []
        for acc in accounts:
            is_demo = acc["location_id"] in DEMO_LOCATION_IDS
            if not (is_demo or acc["location_id"] == BRAND_LOCATION_ID):
                productive.append(acc)
        return productive

    if args.all:
        return accounts

    raise SystemExit("Especifica --location, --demos, --brand, --all-productive o --all.")
def main():
    """CLI entry point: plan, and with ``--apply`` execute, the configured moves.

    For each selected account the plan is built and printed, then handed to
    ``apply_actions`` (which only writes a snapshot unless ``--apply`` is
    given). A failure in one account is recorded and the loop continues. A
    stop request reported by ``script_audit`` ends the loop early. A summary
    is printed at the end.
    """
    parser = argparse.ArgumentParser(description="Mueve custom fields entre folders en GHL.")
    parser.add_argument("--location", help="Procesar solo esta cuenta.")
    parser.add_argument("--demos", action="store_true",
                        help="Procesar solo las 2 cuentas DEMO.")
    parser.add_argument("--brand", action="store_true",
                        help="Procesar solo Marca.")
    parser.add_argument("--all-productive", action="store_true",
                        help="Procesar todas las sucursales productivas (excluye Marca y DEMOs).")
    parser.add_argument("--all", action="store_true", help="Procesar todas las cuentas.")
    parser.add_argument("--apply", action="store_true",
                        help="Ejecuta los cambios. Sin este flag corre en --dry-run.")
    parser.add_argument("--run-id", help="ID para registrar en script_audit.")
    args = parser.parse_args()

    # The output contains non-ASCII symbols; force UTF-8 where supported.
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
    if args.run_id:
        script_audit.init_audit_db()

    dry_run = not args.apply
    accounts = sync_engine.parse_accounts_csv()
    targets = select_targets(args, accounts)

    print(f"Modo: {'DRY-RUN' if dry_run else 'APPLY'}")
    print(f"Movimientos configurados: {len(MOVES)}")
    for m in MOVES:
        print(f" - {m['object']}.{m['field_name']!r} → folder {m['target_folder']!r}")
    print(f"Cuentas en scope: {len(targets)}\n")

    total_moved = total_skipped = 0
    errors_global = []
    # Accounts actually reached; differs from len(targets) after a stop request.
    processed = 0
    for acc in targets:
        if not script_audit.wait_if_paused_or_stopped(args.run_id):
            print("Detención solicitada. Saliendo.")
            break
        processed += 1
        try:
            actions = plan_account(acc)
        except Exception as exc:  # keep going with the remaining accounts
            print(f" [{acc['nombre']}] ERROR plan: {exc}")
            errors_global.append({"account": acc["nombre"], "error": str(exc)})
            continue
        print_plan(acc, actions)
        try:
            stats, snap_path = apply_actions(acc, actions, dry_run=dry_run, run_id=args.run_id)
            total_moved += stats["moved"]
            total_skipped += stats["skipped"]
            errors_global.extend(stats["errors"])
            print(f" snapshot: {snap_path}")
            if not dry_run:
                print(f" ✓ moved={stats['moved']} skipped={stats['skipped']} errors={len(stats['errors'])}")
        except Exception as exc:  # keep going with the remaining accounts
            print(f" ✗ apply falló: {exc}")
            errors_global.append({"account": acc["nombre"], "error": str(exc)})

    print("\n" + "=" * 60)
    if dry_run:
        print("DRY-RUN — no se modificó nada. Revisa el plan y vuelve a correr con --apply.")
    print(f"Cuentas procesadas : {processed}")
    print(f"Movimientos hechos : {total_moved}")
    print(f"Acciones saltadas : {total_skipped}")
    print(f"Errores : {len(errors_global)}")


if __name__ == "__main__":
    main()