821 lines
34 KiB
Python
821 lines
34 KiB
Python
#!/usr/bin/env python3
|
||
"""Migrate branch custom fields so their fieldKey matches the brand ("Marca") account.

GHL does not allow changing a `fieldKey` directly (it is immutable). For each
entry in `FIELD_MIGRATIONS` this migration:
1. Captures a snapshot of the current per-record values in a local JSON file.
2. DELETEs the old field in the branch.
3. POSTs a new field with an accent-free name, so GHL auto-generates the
   brand's fieldKey.
4. Verifies that the generated fieldKey matches the target. On a mismatch it
   prints a warning and continues with the restore; the result is reported as
   `completed_with_warnings`.
5. PUT-renames the field to the brand's display name (accents restored).
6. PUT-restores the values on each record.
7. Records the old-field deletion in `script_audit` (when a run id is given).

Default: DRY-RUN. Use --apply for real writes. Recommended order: first run
against a DEMO --location, then a canary branch (Cd. Carmen), then in batches
with --batch <id1,id2,...> or --all.

Snapshots are written to `generated/migrations/<branch>_<field_key>_<ts>.json`
(via `paths.MIGRATIONS_DIR`) so values can be recovered even if the restore
fails.
"""
|
||
|
||
import argparse
|
||
import datetime
|
||
import json
|
||
import os
|
||
import sys
|
||
import time
|
||
import unicodedata
|
||
import warnings
|
||
from urllib.parse import parse_qs, urlparse
|
||
|
||
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")
|
||
import requests
|
||
|
||
# ROOT_DIR is the parent of the directory that contains this file. It is put
# at the front of sys.path before the project-local imports below
# (presumably so `sync_engine`, `script_audit` and `paths` resolve — confirm).
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)
|
||
|
||
import sync_engine # noqa: E402
|
||
import script_audit # noqa: E402
|
||
|
||
# Base URL prepended by ghl_request() to relative endpoints.
BASE_URL = "https://services.leadconnectorhq.com"
# Default value of the `Version` request header sent by ghl_request().
API_VERSION = "2021-07-28"
# `Version` header used for the /objects/ endpoints (get_object_schema_fields).
OBJECT_API_VERSION = "2021-04-15"
# Location id of the brand ("Marca") account; main() reads the reference
# schema from it and never migrates it.
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
|
||
|
||
from paths import MIGRATIONS_DIR # noqa: E402
|
||
|
||
|
||
# List of field migrations (7 entries; the last one is a coverage case, see
# below). Each `target_fieldkey` must exist in the brand account with that
# exact convention (GHL does not generate that slug with its current
# slugifier; these fieldKeys are historical artifacts of the brand account).
#
# `create_name` is the name used for the POST — it is calibrated so that GHL
# produces `target_fieldkey` with its current slugifier (tested in DEMO).
# `final_name` is the display name applied afterwards with a PUT rename.
FIELD_MIGRATIONS = [
    {
        "object": "contact",
        "target_fieldkey": "contact.acepta_los_terminos_para_tu_cotizacion",
        "create_name": "Acepta los terminos para tu cotizacion",
        "final_name": "Acepta los terminos para tu cotización",
    },
    {
        "object": "contact",
        "target_fieldkey": "contact.cuando_necesitas_el_dinero",
        "create_name": "Cuando necesitas el dinero",
        "final_name": "¿Cuándo necesitas el dinero?",
    },
    {
        "object": "opportunity",
        "target_fieldkey": "opportunity.visita_sucursal",
        "create_name": "Visita Sucursal",
        "final_name": "Visita a Sucursal",
    },
    {
        "object": "opportunity",
        "target_fieldkey": "opportunity.fecha_de_ultima_visita_a_sucursal",
        "create_name": "Fecha de ultima visita a sucursal",
        "final_name": "Fecha de ultima visita a sucursal",
    },
    {
        "object": "opportunity",
        "target_fieldkey": "opportunity.persona_que_atendie_al_prospecto",
        "create_name": "Persona que atendie al prospecto",
        "final_name": "Persona que atendió al prospecto ",  # trailing space to match the brand account
    },
    {
        "object": "opportunity",
        "target_fieldkey": "opportunity.modalidad_del_empeno",
        "create_name": "Modalidad del Empeno",
        "final_name": "Modalidad de Empeño ",  # trailing space to match the brand account
    },
    # Coverage for branches without a clean twin (e.g. Sendero, with 83 records
    # in `opportunity.fuente_del_prospecto` and no `opportunity.fuente_de_prospecto`).
    # In branches that already have the clean field, the script reports
    # already_migrated.
    {
        "object": "opportunity",
        "target_fieldkey": "opportunity.fuente_de_prospecto",
        "create_name": "Fuente de Prospecto",
        "final_name": "Fuente de Prospecto",
    },
]
|
||
|
||
ORPHAN_FIELDKEYS_TO_DELETE = [
    # Pre-existing duplicate of "Fuente de Prospecto". The brand account has
    # `opportunity.fuente_de_prospecto` (no "l"); some branches additionally
    # have this one with "l".
    #
    # `force=True` + `twin_fieldkey` enables deletion even when records hold a
    # value: the workflow's alias resolver already wrote the same values to the
    # "clean" twin as well, so deleting causes no real data loss.
    {
        "object": "opportunity",
        "fieldkey": "opportunity.fuente_del_prospecto",
        "twin_fieldkey": "opportunity.fuente_de_prospecto",
        "force": True,
        "reason": "Duplicado de Fuente de Prospecto. Valores ya replicados en twin clean por el workflow.",
    },
    # Contact variant (Uruapan).
    # NO force: it is unknown whether the values are replicated in the contact
    # twin; if it has values it is reported and handled manually.
    {
        "object": "contact",
        "fieldkey": "contact.fuente_del_prospecto",
        "twin_fieldkey": "contact.fuente_de_prospecto",
        "reason": "Duplicado de Fuente de Prospecto en contact.",
    },
    # Orphans with a doubled `contactcontact*` prefix (Marina Nacional + Qro DEMO).
    # NO force: if it has values, skip and review manually.
    {
        "object": "contact",
        "fieldkey": "contact.contactmarca_del_vehiculo",
        "twin_fieldkey": "contact.marca_del_vehiculo",
        "reason": "Orphan con doble prefijo.",
    },
    {
        "object": "contact",
        "fieldkey": "contact.contactano_del_vehiculo",
        "twin_fieldkey": "contact.ano_del_vehiculo",
        "reason": "Orphan con doble prefijo.",
    },
    {
        "object": "contact",
        "fieldkey": "contact.contactversion_del_vehiculo",
        "twin_fieldkey": "contact.version_del_vehiculo",
        "reason": "Orphan con doble prefijo.",
    },
    {
        "object": "contact",
        "fieldkey": "contact.contactfuente_del_prospecto",
        "twin_fieldkey": "contact.fuente_de_prospecto",
        "reason": "Orphan con doble prefijo + diff (fuente_del vs fuente_de).",
    },
    {
        "object": "contact",
        "fieldkey": "contact.contactque_modalidad_prefieres",
        "twin_fieldkey": "contact.que_modalidad_prefieres",
        "reason": "Orphan con doble prefijo.",
    },
    # Doubled-prefix orphan on opportunity (Monte Providencia DEMO).
    {
        "object": "opportunity",
        "fieldkey": "opportunity.opportunityvehiculo",
        "twin_fieldkey": "opportunity.vehiculo",
        "reason": "Orphan con doble prefijo opportunityopportunity de Vehículo.",
    },
]
|
||
|
||
# Per-token timestamp (seconds) of the most recent throttle checkpoint; read
# and written only by wait_for_rate_limit().
_last_request_by_token: dict = {}
|
||
|
||
|
||
def normalize_name(value):
    """Return *value* as a case-, accent- and spacing-insensitive key.

    Falsy input (``None``, ``""``, ``0``) yields ``""``. The text is stripped,
    lowercased, NFKD-decomposed, has its combining marks removed, and runs of
    whitespace are collapsed to single spaces.
    """
    text = str(value or "")
    decomposed = unicodedata.normalize("NFKD", text.strip().lower())
    # Dropping combining marks turns e.g. "ó" (o + acute accent) into plain "o".
    base_chars = [ch for ch in decomposed if not unicodedata.combining(ch)]
    words = "".join(base_chars).split()
    return " ".join(words)
|
||
|
||
|
||
def wait_for_rate_limit(token):
    """Block until at least 110 ms have passed since the last call for *token*.

    The checkpoint is stored per token in ``_last_request_by_token``. A
    monotonic clock is used so that wall-clock adjustments (NTP steps, manual
    changes) can neither skip the pause nor stretch it into a very long sleep.
    """
    min_interval = 0.110
    previous = _last_request_by_token.get(token)
    # ``None`` means "first request for this token": no wait. A numeric default
    # is avoided because the monotonic clock's reference point is undefined.
    if previous is not None:
        remaining = min_interval - (time.monotonic() - previous)
        if remaining > 0:
            time.sleep(remaining)
    _last_request_by_token[token] = time.monotonic()
|
||
|
||
|
||
def ghl_request(method, endpoint, token, *, params=None, json_body=None, version=API_VERSION, retries=3):
    """Send one request to the GHL API with throttling and bounded retries.

    Args:
        method: HTTP verb passed to ``requests.request``.
        endpoint: Absolute URL, or a path appended to ``BASE_URL``.
        token: Bearer token; also the key used for per-token throttling.
        params: Optional query-string parameters.
        json_body: Optional JSON payload.
        version: Value of the ``Version`` header.
        retries: Maximum number of attempts (values below 1 count as 1).

    Returns:
        The decoded JSON body, or ``{}`` for a 204 or an empty body.

    Raises:
        requests.RequestException: the last network error when the final
            attempt failed at transport level, or an ``HTTPError`` for a
            non-retryable 4xx or for a 429/5xx that persisted through every
            attempt.

    NOTE(review): network errors are retried for every verb, including
    non-idempotent POSTs (e.g. field creation). A request that timed out after
    the server processed it would be sent again — confirm this is acceptable.
    """
    url = endpoint if endpoint.startswith("http") else f"{BASE_URL}{endpoint}"
    headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {token}",
        "Version": version,
        "Content-Type": "application/json",
    }
    attempts = max(1, retries)
    last_exc = None
    resp = None
    for attempt in range(attempts):
        is_last = attempt == attempts - 1
        # Throttle every attempt: retries hit the same per-token limit.
        wait_for_rate_limit(token)
        try:
            resp = requests.request(method, url, headers=headers, params=params, json=json_body, timeout=45)
        except requests.RequestException as exc:
            last_exc = exc
            resp = None
            if not is_last:
                time.sleep(0.5 * (attempt + 1))
            continue
        # A response arrived, so any earlier transport error is no longer the
        # most relevant failure to report.
        last_exc = None
        if resp.status_code == 429 or resp.status_code >= 500:
            # Retryable: back off linearly, but do not sleep after the final try.
            if not is_last:
                time.sleep(1.0 * (attempt + 1))
            continue
        resp.raise_for_status()
        if resp.status_code == 204 or not resp.content:
            return {}
        return resp.json()
    if last_exc is not None:
        raise last_exc
    # Every attempt ended in 429/5xx: surface the last response as an HTTPError.
    resp.raise_for_status()
|
||
|
||
|
||
# --- Schemas y custom fields ---
|
||
|
||
|
||
def list_custom_fields(location_id, token):
    """Return every custom field of a location (contact + opportunity).

    Per the original author's note, the endpoint without a query only returns
    contact fields, so each model is requested separately and the results are
    concatenated (contact fields first).
    """
    endpoint = f"/locations/{location_id}/customFields"
    fields = []
    for model in ("contact", "opportunity"):
        payload = ghl_request("GET", endpoint, token, params={"model": model})
        fields += payload.get("customFields") or []
    return fields
|
||
|
||
|
||
def get_object_schema_fields(location_id, token, object_key):
    """Return the raw schema fields (standard and custom) of one object."""
    query = {"locationId": location_id}
    # The listing call's response is discarded.
    # NOTE(review): its purpose is not evident from this file — confirm it is needed.
    ghl_request("GET", "/objects/", token, params=query, version=OBJECT_API_VERSION)
    schema = ghl_request(
        "GET",
        f"/objects/{object_key}",
        token,
        params=query,
        version=OBJECT_API_VERSION,
    )
    return schema.get("fields") or []
|
||
|
||
|
||
def fieldkey_quality_score(fieldkey, object_key):
    """Score how "clean" a fieldKey looks; higher is better.

    Starts at 10 and subtracts 5 for each of:
    - the part after ``"<object_key>."`` (or the whole key when that prefix is
      absent) starts with ``object_key`` again, i.e. a doubled prefix such as
      ``contact.contact*``;
    - the key ends with ``_copy``.
    An empty or missing key scores 0. Matching is case-insensitive on the key.
    """
    if not fieldkey:
        return 0
    lowered = fieldkey.lower()
    prefix = f"{object_key}."
    remainder = lowered
    if lowered.startswith(prefix):
        remainder = lowered[len(prefix):]
    penalties = (remainder.startswith(object_key), lowered.endswith("_copy"))
    # Booleans sum as 0/1, so each triggered rule costs 5 points.
    return 10 - 5 * sum(penalties)
|
||
|
||
|
||
def count_records_with_value(branch, object_key, field_id):
    """Count the branch records whose custom field *field_id* is non-empty.

    Contacts are scanned when ``object_key == "contact"``, opportunities
    otherwise. ``None``, ``""`` and ``[]`` count as empty.
    """
    if object_key == "contact":
        records = iter_all_contacts(branch["location_id"], branch["token"])
    else:
        records = iter_all_opportunities(branch["location_id"], branch["token"])

    populated = 0
    for record in records:
        current = cf_value(record, field_id)
        if current is None or current == "" or current == []:
            continue
        populated += 1
    return populated
|
||
|
||
|
||
def resolve_ambiguous(branch, object_key, candidates):
    """Pick which of several same-named fields to keep ("prefer field with values").

    - Exactly one candidate has records with values: keep that one.
    - No candidate has values: keep the one with the cleanest fieldKey (the
      first candidate wins ties).
    - More than one has values: abort so the case is reviewed manually.

    Returns ``{"abort": True, "counts"}`` on abort, otherwise
    ``{"abort": False, "keep", "delete", "counts"}``; ``counts`` maps each
    candidate id to its number of records with a value.
    """
    counts = {
        cand["id"]: count_records_with_value(branch, object_key, cand["id"])
        for cand in candidates
    }
    populated_ids = [field_id for field_id, total in counts.items() if total > 0]

    if len(populated_ids) > 1:
        return {"abort": True, "counts": counts}

    if populated_ids:
        winner_id = populated_ids[0]
        keep = next(c for c in candidates if c["id"] == winner_id)
    else:
        # max() returns the first maximal element, like a stable descending sort.
        keep = max(
            candidates,
            key=lambda c: fieldkey_quality_score(c.get("fieldKey"), object_key),
        )

    losers = [c for c in candidates if c["id"] != keep["id"]]
    return {"abort": False, "keep": keep, "delete": losers, "counts": counts}
|
||
|
||
|
||
def find_field_by_target(branch_cfs, target_fieldkey, object_key, brand_final_name):
    """Locate the branch's old field that has to be migrated.

    Only fields whose ``model`` equals *object_key* are considered.

    Returns a dict with ``status``:
    - ``"already_migrated"`` (+ ``field``): a field already has *target_fieldkey*.
    - ``"not_found"``: no field name matches *brand_final_name*.
    - ``"found"`` (+ ``field``): exactly one name match.
    - ``"ambiguous"`` (+ ``candidates``): several name matches; the caller decides.
    Names are compared after ``normalize_name`` (case/accent insensitive).
    """
    same_model = [cf for cf in branch_cfs if cf.get("model") == object_key]

    # 1. Already migrated?
    migrated = next(
        (cf for cf in same_model if cf.get("fieldKey") == target_fieldkey), None
    )
    if migrated is not None:
        return {"status": "already_migrated", "field": migrated}

    # 2. Match on the normalized display name.
    wanted = normalize_name(brand_final_name)
    matches = [cf for cf in same_model if normalize_name(cf.get("name")) == wanted]

    if not matches:
        return {"status": "not_found"}
    if len(matches) == 1:
        return {"status": "found", "field": matches[0]}
    return {"status": "ambiguous", "candidates": matches}
|
||
|
||
|
||
# --- Lectura de records ---
|
||
|
||
|
||
def iter_all_contacts(location_id, token):
    """Yield every contact of a location, 100 per request, following cursors.

    The next-page cursor is read from ``meta.startAfter`` /
    ``meta.startAfterId``; when ``startAfter`` is missing it is parsed from
    the query string of ``meta.nextPageUrl``. Iteration stops on an empty page
    or when no cursor is available.
    """
    cursor = None
    cursor_id = None
    while True:
        query = {"locationId": location_id, "limit": 100}
        if cursor:
            query["startAfter"] = cursor
        if cursor_id:
            query["startAfterId"] = cursor_id

        page = ghl_request("GET", "/contacts/", token, params=query)
        contacts = page.get("contacts") or []
        if not contacts:
            return
        yield from contacts

        meta = page.get("meta") or {}
        next_cursor = meta.get("startAfter")
        next_cursor_id = meta.get("startAfterId")
        next_url = meta.get("nextPageUrl")
        if not next_cursor and next_url:
            # Fall back to the cursor values embedded in the next-page URL.
            url_args = parse_qs(urlparse(next_url).query)
            next_cursor = (url_args.get("startAfter") or [None])[0]
            if not next_cursor_id:
                next_cursor_id = (url_args.get("startAfterId") or [None])[0]

        if not (next_cursor or next_cursor_id):
            return
        cursor, cursor_id = next_cursor, next_cursor_id
|
||
|
||
|
||
def iter_all_opportunities(location_id, token):
    """Yield every opportunity of a location via paged POST searches.

    Pages are requested from 1 upwards with 100 items each. Iteration stops on
    an empty page, or after a page shorter than 100 items when the response
    reports a non-zero ``meta.total``.
    """
    page_number = 0
    while True:
        page_number += 1
        search = {"locationId": location_id, "limit": 100, "page": page_number}
        response = ghl_request("POST", "/opportunities/search", token, json_body=search)
        opportunities = response.get("opportunities") or []
        if not opportunities:
            return
        yield from opportunities

        reported_total = (response.get("meta") or {}).get("total") or 0
        if reported_total and len(opportunities) < 100:
            return
|
||
|
||
|
||
def cf_value(record, field_id):
    """Return the value stored for custom field *field_id* in *record*.

    Only the first ``customFields`` entry whose ``id`` matches is inspected;
    within it the keys ``value``, ``fieldValue`` and ``fieldValueString`` are
    tried in that order and the first non-``None`` one wins. Returns ``None``
    when the field is absent or all three keys are missing/``None``.
    """
    entries = record.get("customFields") or []
    entry = next((item for item in entries if item.get("id") == field_id), None)
    if entry is None:
        return None
    stored = (entry.get(key) for key in ("value", "fieldValue", "fieldValueString"))
    return next((item for item in stored if item is not None), None)
|
||
|
||
|
||
# --- Operaciones mutadoras ---
|
||
|
||
|
||
def delete_field(location_id, token, field_id):
    """DELETE one custom field of a location and return the API response."""
    endpoint = f"/locations/{location_id}/customFields/{field_id}"
    return ghl_request("DELETE", endpoint, token)
|
||
|
||
|
||
def create_field(location_id, token, *, name, data_type, model, options=None, placeholder=None, position=None, parent_id=None):
    """POST a new custom field and return it.

    ``options`` is only sent for option-based data types; ``placeholder`` and
    ``position`` are sent when not ``None``; ``parent_id`` when truthy.
    Returns the response's ``customField`` object, or the whole response when
    that key is missing or empty.
    """
    option_types = ("SINGLE_OPTIONS", "MULTIPLE_OPTIONS", "RADIO", "CHECKBOX")
    payload = {"name": name, "dataType": data_type, "model": model}
    if data_type in option_types and options is not None:
        payload["options"] = options
    for key, given in (("placeholder", placeholder), ("position", position)):
        if given is not None:
            payload[key] = given
    if parent_id:
        payload["parentId"] = parent_id

    endpoint = f"/locations/{location_id}/customFields"
    response = ghl_request("POST", endpoint, token, json_body=payload)
    return response.get("customField") or response
|
||
|
||
|
||
def rename_field(location_id, token, field_id, new_name):
    """PUT a new display name on a custom field and return the updated field.

    Returns the response's ``customField`` object, or the whole response when
    that key is missing or empty.
    """
    endpoint = f"/locations/{location_id}/customFields/{field_id}"
    response = ghl_request("PUT", endpoint, token, json_body={"name": new_name})
    return response.get("customField") or response
|
||
|
||
|
||
def put_record_field(location_id, token, object_key, record_id, field_id, value):
    """PUT a single custom-field value on one contact or opportunity.

    ``object_key == "contact"`` targets ``/contacts/<id>``; anything else
    targets ``/opportunities/<id>``. ``location_id`` is accepted but not used
    by the request.
    """
    if object_key == "contact":
        collection = "/contacts/"
    else:
        collection = "/opportunities/"
    update = {"customFields": [{"id": field_id, "value": value}]}
    return ghl_request("PUT", f"{collection}{record_id}", token, json_body=update)
|
||
|
||
|
||
# --- Snapshot ---
|
||
|
||
|
||
def write_snapshot(payload, branch_name, target_fieldkey):
    """Write *payload* as JSON under ``MIGRATIONS_DIR`` and return the path.

    The file is named ``<branch>_<fieldkey>_<YYYYmmdd_HHMMSS>.json``: in the
    branch name, non-alphanumeric characters become ``_`` and it is cut to 40
    characters; dots in the fieldKey become ``_``. The timestamp is local time.
    """
    os.makedirs(MIGRATIONS_DIR, exist_ok=True)
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    branch_part = "".join(ch if ch.isalnum() else "_" for ch in branch_name)[:40]
    field_part = target_fieldkey.replace(".", "_")
    filename = f"{branch_part}_{field_part}_{stamp}.json"
    path = os.path.join(MIGRATIONS_DIR, filename)
    with open(path, "w", encoding="utf-8") as out:
        json.dump(payload, out, ensure_ascii=False, indent=2, default=str)
    return path
|
||
|
||
|
||
# --- Procedimiento por field × branch ---
|
||
|
||
|
||
def migrate_field_in_branch(branch, brand_field, migration, dry_run, run_id, snapshot_only=False):
    """Migrate one custom field of one branch to the brand's fieldKey.

    Flow: discover the old field -> (resolve duplicates) -> check dataType ->
    snapshot values to JSON -> DELETE old field -> POST new field -> PUT rename
    -> PUT restore of every snapshotted value.

    Args:
        branch: Account dict; reads ``location_id``, ``token`` and ``nombre``.
        brand_field: The brand account's custom field for the target fieldKey;
            supplies ``dataType`` and ``picklistOptions``.
        migration: One ``FIELD_MIGRATIONS`` entry.
        dry_run: When true nothing is written to GHL (the snapshot file is
            still written).
        run_id: Audit run id for ``script_audit``; falsy disables the audit
            record for the deletion.
        snapshot_only: When true, stop right after writing the snapshot.

    Returns:
        A dict whose ``status`` is one of: ``rename_pending``, ``renamed``,
        ``rename_failed``, ``already_migrated``, ``not_found``,
        ``ambiguous_multi_values``, ``datatype_mismatch``, ``snapshot_only``,
        ``dry_run``, ``delete_failed``, ``create_failed``, ``completed`` or
        ``completed_with_warnings``. ``completed`` is only reported when the
        generated fieldKey matches, the rename succeeded and every snapshotted
        value was restored.
    """
    location_id = branch["location_id"]
    token = branch["token"]
    name = branch["nombre"]
    object_key = migration["object"]
    target_fieldkey = migration["target_fieldkey"]
    create_name = migration["create_name"]
    final_name = migration["final_name"]

    print(f"\n ▶ {target_fieldkey} ({object_key})")

    branch_cfs = list_custom_fields(location_id, token)
    discovery = find_field_by_target(branch_cfs, target_fieldkey, object_key, final_name)

    if discovery["status"] == "already_migrated":
        existing = discovery["field"]
        if existing.get("name") != final_name:
            print(f" ✓ fieldKey ya migrado, pero name='{existing.get('name')}' ≠ Marca='{final_name}'")
            if dry_run or snapshot_only:
                print(f" [DRY] PUT rename → '{final_name}'")
                return {"status": "rename_pending", "branch_location_id": location_id}
            try:
                rename_field(location_id, token, existing["id"], final_name)
                print(f" ✓ RENAME → '{final_name}'")
                return {"status": "renamed", "branch_location_id": location_id}
            except Exception as exc:
                print(f" ⚠ rename falló: {exc}")
                return {"status": "rename_failed", "error": str(exc),
                        "branch_location_id": location_id}
        print(f" ✓ ya migrado (fieldKey={existing['fieldKey']})")
        return {"status": "already_migrated", "branch_location_id": location_id}

    if discovery["status"] == "not_found":
        print(f" → no se encontró campo viejo. SKIP.")
        return {"status": "not_found", "branch_location_id": location_id}

    if discovery["status"] == "ambiguous":
        candidates = discovery["candidates"]
        print(f" ⚠ AMBIGUO ({len(candidates)} candidatos). Aplicando 'prefer-field-with-values'...")
        resolution = resolve_ambiguous(branch, object_key, candidates)
        for cid, n in resolution["counts"].items():
            cand = next(c for c in candidates if c["id"] == cid)
            print(f" cand id={cid} fk={cand.get('fieldKey')!r} → {n} records con valor")
        if resolution["abort"]:
            print(f" ✗ ABORT: múltiples candidatos con valores. Migración manual requerida.")
            return {
                "status": "ambiguous_multi_values",
                "candidates": [{"id": c["id"], "fieldKey": c.get("fieldKey"),
                                "records_with_values": resolution["counts"][c["id"]]}
                               for c in candidates],
            }
        keep = resolution["keep"]
        delete_cands = resolution["delete"]
        print(f" → Keep: {keep['id']} (fk={keep.get('fieldKey')})")
        for orphan in delete_cands:
            n = resolution["counts"][orphan["id"]]
            print(f" → Delete orphan: {orphan['id']} (fk={orphan.get('fieldKey')}, {n} records con valor)")
        if dry_run or snapshot_only:
            print(f" [DRY] Orphans NO eliminados (dry-run/snapshot-only mode).")
        else:
            for orphan in delete_cands:
                try:
                    delete_field(location_id, token, orphan["id"])
                    print(f" ✓ DELETE orphan {orphan['id']}")
                except Exception as exc:
                    print(f" ⚠ falló DELETE orphan {orphan['id']}: {exc}")
        # Fall through to the normal flow with the preserved candidate.
        discovery = {"status": "found", "field": keep}

    old_field = discovery["field"]
    old_field_id = old_field["id"]
    old_field_key = old_field.get("fieldKey")
    old_data_type = old_field.get("dataType")
    old_options = old_field.get("picklistOptions") or []
    old_placeholder = old_field.get("placeholder")
    old_position = old_field.get("position")
    old_parent_id = old_field.get("parentId")

    print(f" old: id={old_field_id} fieldKey={old_field_key} dataType={old_data_type} options={len(old_options)}")
    print(f" brand: dataType={brand_field.get('dataType')} options={len(brand_field.get('picklistOptions') or [])}")

    if old_data_type != brand_field.get("dataType"):
        print(f" ✗ ABORT: dataType difiere entre sucursal ({old_data_type}) y Marca ({brand_field.get('dataType')}). Migración manual requerida.")
        return {"status": "datatype_mismatch", "branch_location_id": location_id}

    # === SNAPSHOT ===
    print(f" Snapshot...")
    if object_key == "contact":
        iterator = iter_all_contacts(location_id, token)
    else:
        iterator = iter_all_opportunities(location_id, token)
    records_snapshot = []
    total_scanned = 0
    for rec in iterator:
        total_scanned += 1
        val = cf_value(rec, old_field_id)
        if val is not None and val != "" and val != []:
            records_snapshot.append({"id": rec["id"], "value": val})
    print(f" {total_scanned} records escaneados, {len(records_snapshot)} con valor en el campo viejo")

    snapshot_payload = {
        "branch_name": name,
        "branch_location_id": location_id,
        "object": object_key,
        "old_field_id": old_field_id,
        "old_field_key": old_field_key,
        "old_field_name": old_field.get("name"),
        "old_data_type": old_data_type,
        "old_options": old_options,
        "target_fieldkey": target_fieldkey,
        "create_name": create_name,
        "final_name": final_name,
        "dry_run": dry_run,
        "timestamp_utc": datetime.datetime.now(datetime.timezone.utc).isoformat(),
        "records": records_snapshot,
    }
    snapshot_path = write_snapshot(snapshot_payload, name, target_fieldkey)
    print(f" Snapshot → {snapshot_path}")

    if snapshot_only:
        print(f" (snapshot-only mode — no se borra nada)")
        return {"status": "snapshot_only", "snapshot": snapshot_path, "records": len(records_snapshot)}

    if dry_run:
        print(f" [DRY] DELETE old field, POST create '{create_name}', PUT rename → '{final_name}', restore {len(records_snapshot)} valores")
        return {"status": "dry_run", "snapshot": snapshot_path, "records_to_restore": len(records_snapshot)}

    # === DELETE old field ===
    change_id = None
    if run_id:
        change_id = script_audit.record_change(run_id, location_id, object_key, old_field_id,
                                               "__schema__", "delete_field", old_field_key, None)
    # Only the DELETE itself is guarded here. If the audit write were inside
    # the same try, an audit failure after a successful delete would be
    # reported as delete_failed and stop before the field is recreated.
    try:
        delete_field(location_id, token, old_field_id)
    except Exception as exc:
        if change_id:
            script_audit.mark_change(change_id, "failed", str(exc))
        print(f" ✗ DELETE falló: {exc}. SNAPSHOT preservado en {snapshot_path}.")
        return {"status": "delete_failed", "error": str(exc), "snapshot": snapshot_path}
    if change_id:
        # Best effort: the field is gone, so the migration must continue.
        try:
            script_audit.mark_change(change_id, "applied")
        except Exception as exc:
            print(f" ⚠ audit mark_change falló: {exc}")
    print(f" ✓ DELETE old field {old_field_id}")

    # === POST create new field ===
    options_for_create = brand_field.get("picklistOptions") or old_options or None
    try:
        new_field = create_field(location_id, token,
                                 name=create_name,
                                 data_type=brand_field.get("dataType"),
                                 model=object_key,
                                 options=options_for_create,
                                 placeholder=old_placeholder,
                                 position=old_position,
                                 parent_id=old_parent_id)
        new_field_id = new_field.get("id")
        new_field_key = new_field.get("fieldKey")
        if not new_field_id:
            # Without an id the rename and restore would target ".../None".
            raise ValueError(f"respuesta de create sin id: {new_field!r}")
        print(f" ✓ CREATE new field id={new_field_id} fieldKey={new_field_key}")
    except Exception as exc:
        print(f" ✗ CREATE falló: {exc}. SNAPSHOT en {snapshot_path}. Campo viejo ya eliminado — recuperación manual desde Marca o JSON.")
        return {"status": "create_failed", "error": str(exc), "snapshot": snapshot_path}

    # === Verify the generated fieldKey ===
    fieldkey_ok = new_field_key == target_fieldkey
    if not fieldkey_ok:
        # Deliberately continue with the restore: the new field exists, so no
        # data is lost, but this branch stays reported as misaligned.
        print(f" ⚠ FieldKey generado ({new_field_key}) NO coincide con objetivo ({target_fieldkey}). Snapshot preservado.")

    # === PUT rename ===
    rename_error = None
    if create_name != final_name:
        try:
            rename_field(location_id, token, new_field_id, final_name)
            print(f" ✓ RENAME new field → '{final_name}'")
        except Exception as exc:
            rename_error = str(exc)
            print(f" ⚠ RENAME falló: {exc}. El campo está creado con name='{create_name}', podés renombrarlo manualmente.")

    # === Restore values ===
    restore_errors = []
    restored = 0
    interrupted = False
    for rec_snap in records_snapshot:
        if not script_audit.wait_if_paused_or_stopped(run_id):
            print(" Detención solicitada. Snapshot preservado para resumir manualmente.")
            interrupted = True
            break
        try:
            put_record_field(location_id, token, object_key, rec_snap["id"], new_field_id, rec_snap["value"])
            restored += 1
        except Exception as exc:
            restore_errors.append({"record_id": rec_snap["id"], "error": str(exc)})
    print(f" ✓ Restore: {restored}/{len(records_snapshot)} records. Errores: {len(restore_errors)}")

    # A stop request leaves values unrestored without adding restore_errors,
    # so the restored count is compared against the snapshot size as well.
    fully_restored = restored == len(records_snapshot) and not restore_errors
    clean = fieldkey_ok and fully_restored and rename_error is None
    return {
        "status": "completed" if clean else "completed_with_warnings",
        "snapshot": snapshot_path,
        "new_field_id": new_field_id,
        "new_field_key": new_field_key,
        "fieldkey_matches_target": fieldkey_ok,
        "records_restored": restored,
        "restore_errors": restore_errors,
        "restore_interrupted": interrupted,
        "rename_error": rename_error,
    }
|
||
|
||
|
||
# --- Entrada ---
|
||
|
||
|
||
def cleanup_orphans(branch, dry_run, run_id, snapshot_only=False):
    """Delete the known orphan fields (``ORPHAN_FIELDKEYS_TO_DELETE``) of a branch.

    An orphan without values is deleted. An orphan with values is skipped
    with a warning, unless its spec has ``force`` set and its
    ``twin_fieldkey`` exists in the branch, in which case it is deleted too.
    With ``dry_run`` or ``snapshot_only`` the deletion is only printed.
    ``run_id`` is accepted but not used here.

    Returns a list with one ``{"fieldkey", "status", ...}`` dict per orphan
    found in the branch; ``status`` is one of ``skipped_no_twin``,
    ``skipped_has_values``, ``dry_run``, ``deleted`` or ``delete_failed``.
    """
    location_id = branch["location_id"]
    token = branch["token"]
    fields_by_key = {
        field.get("fieldKey"): field
        for field in list_custom_fields(location_id, token)
    }
    read_only = dry_run or snapshot_only
    outcomes = []

    for spec in ORPHAN_FIELDKEYS_TO_DELETE:
        orphan_key = spec["fieldkey"]
        field = fields_by_key.get(orphan_key)
        if not field:
            continue  # this branch does not have the orphan
        print(f"\n ⚙ Orphan detectado: {orphan_key} (id={field['id']})")
        n_values = count_records_with_value(branch, spec["object"], field["id"])
        print(f" records con valor: {n_values}")

        if n_values > 0 and not spec.get("force"):
            print(f" ⚠ SKIP — tiene valores. Migración manual requerida.")
            outcomes.append({"fieldkey": orphan_key, "status": "skipped_has_values", "records_with_values": n_values})
            continue
        if n_values > 0:
            # Forced deletion is only allowed when the clean twin exists.
            twin_fk = spec.get("twin_fieldkey")
            twin = fields_by_key.get(twin_fk) if twin_fk else None
            if not twin:
                print(f" ⚠ SKIP — orphan force=True pero twin_fieldkey {twin_fk!r} NO existe en la sucursal. No es seguro borrar.")
                outcomes.append({"fieldkey": orphan_key, "status": "skipped_no_twin", "records_with_values": n_values})
                continue
            print(f" ⚙ force=True + twin presente ({twin_fk}, id={twin['id']}). Valores replicados — safe to delete.")

        if read_only:
            print(f" [DRY] DELETE orphan {field['id']}")
            outcomes.append({"fieldkey": orphan_key, "status": "dry_run"})
            continue

        try:
            delete_field(location_id, token, field["id"])
            print(f" ✓ DELETE orphan {field['id']}")
            outcomes.append({"fieldkey": orphan_key, "status": "deleted", "id": field["id"]})
        except Exception as exc:
            print(f" ⚠ DELETE falló: {exc}")
            outcomes.append({"fieldkey": orphan_key, "status": "delete_failed", "error": str(exc)})
    return outcomes
|
||
|
||
|
||
def select_branches(args, accounts):
    """Pick the accounts to process from the parsed CLI arguments.

    Precedence: ``--location`` (exactly that location; exits if unknown), then
    ``--batch`` (comma-separated ids; unknown ids only produce a warning),
    then ``--all`` (accounts whose ``type`` is ``"branch"``).

    Raises:
        SystemExit: when ``--location`` matches nothing or no selector is given.
    """
    if args.location:
        selected = [acct for acct in accounts if acct["location_id"] == args.location]
        if selected:
            return selected
        raise SystemExit(f"Location {args.location} no encontrada.")

    if args.batch:
        wanted = set()
        for piece in args.batch.split(","):
            piece = piece.strip()
            if piece:
                wanted.add(piece)
        selected = [acct for acct in accounts if acct["location_id"] in wanted]
        missing = wanted - {acct["location_id"] for acct in selected}
        if missing:
            print(f"WARN: location_ids no encontrados: {missing}")
        return selected

    if not args.all:
        raise SystemExit("Especifica --location <id>, --batch <id1,id2,...> o --all.")
    return [acct for acct in accounts if acct.get("type") == "branch"]
|
||
|
||
|
||
def main():
    """CLI entry point: migrate the selected fields in the selected branches.

    Parses the arguments, loads the accounts via ``sync_engine``, checks that
    every target fieldKey exists in the brand account, then for each branch
    runs the orphan cleanup followed by each field migration. A summary is
    printed and a JSON report is written to ``MIGRATIONS_DIR``.

    Raises:
        SystemExit: brand account missing, no branch selector given, ``--field``
            matching nothing, or a target fieldKey missing in the brand account.
    """
    parser = argparse.ArgumentParser(
        description="Migra 6 custom fields de sucursales para alinearlos al fieldKey de Marca.",
    )
    parser.add_argument("--location", help="Una sola location.")
    parser.add_argument("--batch", help="Lista comma-separated de location IDs.")
    parser.add_argument("--all", action="store_true", help="Todas las sucursales (NO incluye marca).")
    parser.add_argument("--apply", action="store_true", help="Ejecuta cambios reales. Sin esto corre en DRY-RUN.")
    parser.add_argument("--snapshot-only", action="store_true",
                        help="Solo genera snapshots de valores actuales, sin tocar GHL.")
    parser.add_argument("--field", action="append",
                        help="Migra solo este target_fieldkey. Repetible. Default: las 6.")
    parser.add_argument("--run-id", help="Audit run ID del dashboard.")
    args = parser.parse_args()

    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")

    accounts = sync_engine.parse_accounts_csv()
    brand = next((a for a in accounts if a["location_id"] == BRAND_LOCATION_ID), None)
    if not brand:
        raise SystemExit("Cuenta de Marca no encontrada en CSV.")

    # The brand account is the reference schema and is never migrated itself.
    branches = [b for b in select_branches(args, accounts)
                if b["location_id"] != BRAND_LOCATION_ID]

    if args.field:
        requested = set(args.field)
        migrations = [m for m in FIELD_MIGRATIONS if m["target_fieldkey"] in requested]
        if not migrations:
            raise SystemExit(f"--field no matchea ningún migration. Disponibles: {[m['target_fieldkey'] for m in FIELD_MIGRATIONS]}")
        unknown = requested - {m["target_fieldkey"] for m in migrations}
        if unknown:
            # Surface typos instead of silently migrating fewer fields.
            print(f"WARN: --field desconocidos (ignorados): {sorted(unknown)}")
    else:
        migrations = FIELD_MIGRATIONS

    # --snapshot-only wins over --apply downstream (nothing is written), so
    # the banner has to check it first.
    if args.snapshot_only:
        mode_label = "SNAPSHOT-ONLY"
    elif args.apply:
        mode_label = "APPLY (escritura real)"
    else:
        mode_label = "DRY-RUN"

    print("=" * 70)
    print(f"Marca: {brand['nombre']} ({BRAND_LOCATION_ID})")
    print(f"Sucursales a procesar: {len(branches)}")
    print(f"Campos a migrar: {len(migrations)}")
    print(f"Modo: {mode_label}")
    print("=" * 70)

    # Pre-load the brand fields, indexed by fieldKey.
    print("\nFetch schema de Marca...")
    brand_cfs = list_custom_fields(BRAND_LOCATION_ID, brand["token"])
    brand_by_fieldkey = {cf.get("fieldKey"): cf for cf in brand_cfs}
    for m in migrations:
        if m["target_fieldkey"] not in brand_by_fieldkey:
            raise SystemExit(
                f"target_fieldkey {m['target_fieldkey']} NO existe en Marca. Migración requiere que Marca lo tenga.")

    summary = {"completed": 0, "already_migrated": 0, "not_found": 0,
               "ambiguous": 0, "datatype_mismatch": 0,
               "dry_run": 0, "snapshot_only": 0,
               "delete_failed": 0, "create_failed": 0,
               "completed_with_warnings": 0}
    per_branch_results = {}

    for branch in branches:
        if not script_audit.wait_if_paused_or_stopped(args.run_id):
            print("\nDetención solicitada.")
            break
        print(f"\n{'=' * 70}")
        print(f"[BRANCH] {branch['nombre']} ({branch['location_id']})")
        branch_results = []

        # Pre-step: clean up the known orphans. The results are kept for the
        # report, not only counted.
        orphan_results = []
        try:
            orphan_results = cleanup_orphans(branch, dry_run=not args.apply, run_id=args.run_id,
                                             snapshot_only=args.snapshot_only)
            for orphan_res in orphan_results:
                key = f"orphan_{orphan_res.get('status', 'unknown')}"
                summary[key] = summary.get(key, 0) + 1
        except Exception as exc:
            print(f" ⚠ Cleanup orphans falló: {exc}")
            orphan_results = [{"status": "exception", "error": str(exc)}]

        for migration in migrations:
            brand_field = brand_by_fieldkey[migration["target_fieldkey"]]
            try:
                result = migrate_field_in_branch(branch, brand_field, migration,
                                                 dry_run=not args.apply,
                                                 run_id=args.run_id,
                                                 snapshot_only=args.snapshot_only)
            except Exception as exc:
                result = {"status": "exception", "error": str(exc)}
                print(f" ✗ EXCEPCIÓN: {exc}")
            branch_results.append({"migration": migration["target_fieldkey"], **result})
            status = result.get("status", "exception")
            summary[status] = summary.get(status, 0) + 1

        per_branch_results[branch["location_id"]] = {
            "name": branch["nombre"],
            "results": branch_results,
            "orphans": orphan_results,
        }

    # Final report
    print(f"\n{'=' * 70}")
    print("RESUMEN")
    print("=" * 70)
    for k, v in summary.items():
        if v:
            print(f" {k:30s}: {v}")

    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    os.makedirs(MIGRATIONS_DIR, exist_ok=True)
    report_path = os.path.join(MIGRATIONS_DIR, f"migration_report_{ts}.json")
    with open(report_path, "w", encoding="utf-8") as fh:
        json.dump({
            "timestamp_utc": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "apply_mode": args.apply,
            "snapshot_only": args.snapshot_only,
            "branches_processed": len(per_branch_results),
            "summary": summary,
            "details": per_branch_results,
        }, fh, ensure_ascii=False, indent=2, default=str)
    print(f"\nReporte: {report_path}")
|
||
|
||
|
||
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|