#!/usr/bin/env python3 """Merge valores de orphan customField → twin clean, después delete orphan. Para sucursales donde un campo orphan tiene records con valor y existe un twin clean en la misma sucursal: 1. Iterar todos los records del objeto. 2. Para cada record con valor en el orphan: - Si clean ya tiene el mismo valor → `already_synced` (no action). - Si clean está vacío → `migrate` (PUT clean = orphan_value). - Si clean tiene valor distinto → `conflict` (abort, decisión manual). 3. Snapshot del plan en JSON antes de cualquier escritura. 4. Si hay conflictos → ABORTA sin tocar nada. 5. En --apply: PUT a cada record con valor a migrar, después DELETE orphan field. Default DRY-RUN. Usar --apply para escritura real. """ import argparse import datetime import json import os import sys import time import warnings from urllib.parse import parse_qs, urlparse warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!") import requests ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT_DIR not in sys.path: sys.path.insert(0, ROOT_DIR) import sync_engine # noqa: E402 import script_audit # noqa: E402 BASE_URL = "https://services.leadconnectorhq.com" API_VERSION = "2021-07-28" from paths import MIGRATIONS_DIR # noqa: E402 # Lista explícita de merges a realizar. Cada entry vincula un orphan_fk con su # clean_fk en una location específica. NO incluye Cancún Modalidad (caso # especial — el clean no existe y hay que crearlo). 
MERGES = [
    # Marina Nacional — 4 orphan contact fields, 1 record each
    {"location_id": "HvDw9Eg3rjrwkbQJXqfi", "object": "contact",
     "orphan_fk": "contact.contactmarca_del_vehiculo", "clean_fk": "contact.marca_del_vehiculo"},
    {"location_id": "HvDw9Eg3rjrwkbQJXqfi", "object": "contact",
     "orphan_fk": "contact.contactano_del_vehiculo", "clean_fk": "contact.ano_del_vehiculo"},
    {"location_id": "HvDw9Eg3rjrwkbQJXqfi", "object": "contact",
     "orphan_fk": "contact.contactversion_del_vehiculo", "clean_fk": "contact.version_del_vehiculo"},
    {"location_id": "HvDw9Eg3rjrwkbQJXqfi", "object": "contact",
     "orphan_fk": "contact.contactque_modalidad_prefieres", "clean_fk": "contact.que_modalidad_prefieres"},
    # Uruapan — 1 orphan contact field with 1 record
    {"location_id": "FoQWuksh4wQjPbVVZ8ZQ", "object": "contact",
     "orphan_fk": "contact.fuente_del_prospecto", "clean_fk": "contact.fuente_de_prospecto"},
    # Cancún — orphan opportunity field "Vehículo" with 7 records
    {"location_id": "uJEn2iuUficuml9zxAnt", "object": "opportunity",
     "orphan_fk": "opportunity.opportunityvehiculo", "clean_fk": "opportunity.vehiculo"},
    # 0001 - MP - Qro DEMO — same pattern as Marina Nacional (1 record each)
    {"location_id": "Z64WQKORPVwXb5mn68Ef", "object": "contact",
     "orphan_fk": "contact.contactmarca_del_vehiculo", "clean_fk": "contact.marca_del_vehiculo"},
    {"location_id": "Z64WQKORPVwXb5mn68Ef", "object": "contact",
     "orphan_fk": "contact.contactano_del_vehiculo", "clean_fk": "contact.ano_del_vehiculo"},
    {"location_id": "Z64WQKORPVwXb5mn68Ef", "object": "contact",
     "orphan_fk": "contact.contactversion_del_vehiculo", "clean_fk": "contact.version_del_vehiculo"},
    {"location_id": "Z64WQKORPVwXb5mn68Ef", "object": "contact",
     "orphan_fk": "contact.contactque_modalidad_prefieres", "clean_fk": "contact.que_modalidad_prefieres"},
]

# time.time() of the most recent request made with each token (see wait_for_rate_limit).
_last_request_by_token: dict = {}


def wait_for_rate_limit(token):
    """Sleep until at least 110 ms have passed since the last request made with ``token``."""
    now = time.time()
    elapsed = now - _last_request_by_token.get(token, 0)
    if elapsed < 0.110:
        time.sleep(0.110 - elapsed)
    _last_request_by_token[token] = time.time()


def ghl_request(method, endpoint, token, *, params=None, json_body=None, version=API_VERSION, retries=3):
    """Send one authenticated API request and return the decoded JSON body.

    Args:
        method: HTTP verb ("GET", "POST", "PUT", "DELETE").
        endpoint: Absolute ``http...`` URL, or a path appended to ``BASE_URL``.
        token: Bearer token used in the Authorization header.
        params: Optional query-string parameters.
        json_body: Optional JSON request body.
        version: Value sent in the ``Version`` header.
        retries: Number of attempts. Network errors, 429 and 5xx responses are
            retried with a linear back-off. Values below 1 mean one attempt.

    Returns:
        The decoded JSON response, or ``{}`` for a 204 response.

    Raises:
        requests.RequestException: the last network error, when the final
            attempt(s) produced no HTTP response.
        requests.HTTPError: immediately for other 4xx responses, or for the
            last 429/5xx response once all attempts are used.
    """
    wait_for_rate_limit(token)
    url = endpoint if endpoint.startswith("http") else f"{BASE_URL}{endpoint}"
    headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {token}",
        "Version": version,
        "Content-Type": "application/json",
    }
    attempts = max(1, retries)  # retries <= 0 previously crashed on an unbound `resp`
    last_exc = None
    resp = None
    for attempt in range(attempts):
        is_last = attempt == attempts - 1
        try:
            resp = requests.request(method, url, headers=headers, params=params,
                                    json=json_body, timeout=45)
        except requests.RequestException as exc:
            last_exc = exc
            resp = None
            if not is_last:
                time.sleep(0.5 * (attempt + 1))
            continue
        # A response arrived, so an earlier network error is no longer the
        # failure worth reporting.
        last_exc = None
        if resp.status_code == 429 or resp.status_code >= 500:
            if not is_last:
                time.sleep(1.0 * (attempt + 1))
            continue
        resp.raise_for_status()
        if resp.status_code == 204:
            return {}
        return resp.json()
    if last_exc is not None:
        raise last_exc
    # All attempts ended in 429/5xx: surface the final response as HTTPError.
    resp.raise_for_status()


def list_custom_fields(location_id, token):
    """Return the location's contact custom fields followed by its opportunity custom fields."""
    result = []
    for model in ("contact", "opportunity"):
        data = ghl_request("GET", f"/locations/{location_id}/customFields", token,
                           params={"model": model})
        result.extend(data.get("customFields") or [])
    return result


def cf_value(record, field_id):
    """Return the value ``record`` holds for custom field ``field_id``, or None.

    Uses the first entry of ``record["customFields"]`` whose ``id`` matches and
    returns the first non-None of its ``value``, ``fieldValue`` and
    ``fieldValueString`` keys.
    """
    for entry in record.get("customFields", []) or []:
        if entry.get("id") != field_id:
            continue
        # NOTE(review): only these three keys are read. A value stored under any
        # other key looks empty here, so it is not migrated and --apply still
        # deletes the orphan field — confirm against real API payloads.
        for key in ("value", "fieldValue", "fieldValueString"):
            v = entry.get(key)
            if v is not None:
                return v
        return None
    return None


def value_is_empty(v):
    """True for None, "", [] and {} — the values treated as "no data"."""
    return v is None or v == "" or v == [] or v == {}


def values_equal(a, b):
    """Compare two custom-field values.

    Values equal under ``==`` match. Two lists also match when they contain the
    same elements in any order, compared by their ``str()`` forms (duplicates count).
    """
    if a == b:
        return True
    if isinstance(a, list) and isinstance(b, list):
        return sorted(map(str, a)) == sorted(map(str, b))
    return False


def iter_contacts(location_id, token):
    """Yield every contact of a location, following the startAfter/startAfterId cursor.

    The next cursor comes from ``meta.startAfter`` / ``meta.startAfterId``. When
    ``startAfter`` is missing, it is parsed from the query string of
    ``meta.nextPageUrl``.

    Raises:
        RuntimeError: if the API returns the cursor that was just used. Without
            this check the loop would re-read the same page forever.
    """
    start_after = None
    start_after_id = None
    while True:
        params = {"locationId": location_id, "limit": 100}
        if start_after:
            params["startAfter"] = start_after
        if start_after_id:
            params["startAfterId"] = start_after_id
        data = ghl_request("GET", "/contacts/", token, params=params)
        batch = data.get("contacts", []) or []
        if not batch:
            break
        yield from batch

        meta = data.get("meta") or {}
        next_url = meta.get("nextPageUrl")
        cursor = meta.get("startAfter")
        cursor_id = meta.get("startAfterId")
        if not cursor and next_url:
            qs = parse_qs(urlparse(next_url).query)
            cursor = (qs.get("startAfter") or [None])[0]
            cursor_id = cursor_id or (qs.get("startAfterId") or [None])[0]
        if not cursor and not cursor_id:
            break
        if (cursor, cursor_id) == (start_after, start_after_id):
            raise RuntimeError(
                f"contact pagination did not advance for location {location_id} "
                f"(cursor {cursor!r}/{cursor_id!r})")
        start_after = cursor
        start_after_id = cursor_id


def iter_opportunities(location_id, token):
    """Yield every opportunity of a location via page-numbered POST /opportunities/search.

    Pagination stops on an empty page. A short page also stops it once the
    running count reaches ``meta.total``. A short page alone is not trusted:
    skipping records here would let --apply delete an orphan that still holds
    their values.

    Raises:
        RuntimeError: if a page returns exactly the same ids as the previous
            one, meaning the ``page`` parameter is not advancing.
    """
    limit = 100
    page = 1
    fetched = 0
    prev_ids = None
    while True:
        data = ghl_request("POST", "/opportunities/search", token,
                           json_body={"locationId": location_id, "limit": limit, "page": page})
        batch = data.get("opportunities", []) or []
        if not batch:
            break
        ids = [o.get("id") for o in batch]
        if ids == prev_ids and any(ids):
            raise RuntimeError(
                f"opportunity pagination returned page {page} twice for location {location_id}")
        prev_ids = ids
        yield from batch

        fetched += len(batch)
        total = (data.get("meta") or {}).get("total") or 0
        if total and fetched >= total and len(batch) < limit:
            break
        page += 1


def put_record_field(location_id, token, object_key, record_id, field_id, value):
    """Set one custom field on a contact (``object_key == "contact"``) or an opportunity.

    ``location_id`` is not used by the request.
    """
    body = {"customFields": [{"id": field_id, "value": value}]}
    endpoint = "/contacts/" if object_key == "contact" else "/opportunities/"
    return ghl_request("PUT", f"{endpoint}{record_id}", token, json_body=body)


def delete_field(location_id, token, field_id):
    """Delete custom field ``field_id`` from the location."""
    return ghl_request("DELETE", f"/locations/{location_id}/customFields/{field_id}", token)


def record_label(record):
    """Human-readable label: contactName, name, "first last", email, phone, then id."""
    return (record.get("contactName")
            or record.get("name")
            or " ".join(filter(None, [record.get("firstName"), record.get("lastName")]))
            or record.get("email")
            or record.get("phone")
            or record.get("id"))


def plan_merge(branch, merge_spec):
    """Build the merge plan for one MERGES entry without writing anything.

    Looks up both fields by fieldKey and refuses to plan when either is missing
    or their dataTypes differ. It then scans every record of
    ``merge_spec["object"]`` and classifies each record with a non-empty orphan
    value as ``already_synced``, ``migrate`` or ``conflict``.

    Returns:
        ``{"plan": [...], "orphan_field": {...}, "clean_field": {...},
        "total_scanned": int}``, or ``{"error": "..."}`` when a field is
        missing or the dataTypes differ.
    """
    location_id = branch["location_id"]
    token = branch["token"]

    by_fk = {cf.get("fieldKey"): cf for cf in list_custom_fields(location_id, token)}
    orphan = by_fk.get(merge_spec["orphan_fk"])
    clean = by_fk.get(merge_spec["clean_fk"])
    if not orphan:
        return {"error": f"orphan field {merge_spec['orphan_fk']} no existe en sucursal"}
    if not clean:
        return {"error": f"clean field {merge_spec['clean_fk']} no existe en sucursal"}
    if orphan.get("dataType") != clean.get("dataType"):
        return {"error": f"dataType mismatch: orphan={orphan.get('dataType')} clean={clean.get('dataType')}"}

    if merge_spec["object"] == "contact":
        records = iter_contacts(location_id, token)
    else:
        records = iter_opportunities(location_id, token)

    plan = []
    total = 0
    for rec in records:
        total += 1
        ov = cf_value(rec, orphan["id"])
        if value_is_empty(ov):
            continue  # nothing to carry over from this record
        cv = cf_value(rec, clean["id"])
        if values_equal(ov, cv):
            status = "already_synced"
        elif value_is_empty(cv):
            status = "migrate"
        else:
            status = "conflict"
        # Key order matches the snapshot format: record_id, label, status, orphan_value[, clean_value].
        entry = {"record_id": rec.get("id"), "label": record_label(rec),
                 "status": status, "orphan_value": ov}
        if status == "conflict":
            entry["clean_value"] = cv
        plan.append(entry)
    return {"plan": plan, "orphan_field": orphan, "clean_field": clean, "total_scanned": total}


def write_snapshot(payload, branch_name, orphan_fk):
    """Write ``payload`` as JSON under MIGRATIONS_DIR and return the file path.

    The filename combines a sanitized branch name (max 40 chars), the orphan
    fieldKey with dots replaced, and a local timestamp with second resolution.
    """
    os.makedirs(MIGRATIONS_DIR, exist_ok=True)
    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_branch = "".join(c if c.isalnum() else "_" for c in branch_name)[:40]
    safe_fk = orphan_fk.replace(".", "_")
    path = os.path.join(MIGRATIONS_DIR, f"merge_{safe_branch}_{safe_fk}_{ts}.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(payload, fh, ensure_ascii=False, indent=2, default=str)
    return path


def execute_merge(branch, merge_spec, planning, dry_run, run_id):
    """Print the plan summary, snapshot it, and apply it unless this is a dry run.

    Args:
        branch: account row with "location_id", "token" and "nombre".
        merge_spec: one MERGES entry.
        planning: successful result of plan_merge.
        dry_run: when True, only print what would be written.
        run_id: passed to script_audit.wait_if_paused_or_stopped before each
            PUT; a falsy result stops the run.

    Returns:
        A dict with "status" set to one of "abort_conflicts", "dry_run",
        "partial_failure", "stopped", "completed" or "delete_failed", plus
        counts and the "snapshot" path.

    The orphan field is deleted only when every planned PUT was attempted and
    succeeded. A stop request or any failed PUT leaves it in place.
    """
    location_id = branch["location_id"]
    token = branch["token"]
    plan = planning["plan"]
    orphan = planning["orphan_field"]
    clean = planning["clean_field"]

    conflicts = [p for p in plan if p["status"] == "conflict"]
    migrates = [p for p in plan if p["status"] == "migrate"]
    synced = [p for p in plan if p["status"] == "already_synced"]

    print(f" Records escaneados: {planning['total_scanned']}")
    print(f" Migrate: {len(migrates)}")
    print(f" Already synced: {len(synced)}")
    print(f" Conflict: {len(conflicts)}")

    snapshot_payload = {
        "branch": branch["nombre"],
        "branch_location_id": location_id,
        "object": merge_spec["object"],
        "orphan_field": {"id": orphan["id"], "fieldKey": orphan.get("fieldKey"),
                         "name": orphan.get("name"), "dataType": orphan.get("dataType")},
        "clean_field": {"id": clean["id"], "fieldKey": clean.get("fieldKey"),
                        "name": clean.get("name"), "dataType": clean.get("dataType")},
        "plan": plan,
        "dry_run": dry_run,
        "timestamp_utc": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    }
    # Snapshot before any decision so every run (dry, aborted or applied) is recorded.
    snapshot_path = write_snapshot(snapshot_payload, branch["nombre"], merge_spec["orphan_fk"])
    print(f" Snapshot → {snapshot_path}")

    if conflicts:
        print(f" ✗ ABORT: {len(conflicts)} conflicts. Decisión manual requerida.")
        for c in conflicts[:5]:
            print(f" - record {c['record_id']} ({c['label']}):")
            print(f" orphan={c['orphan_value']!r}")
            print(f" clean ={c['clean_value']!r}")
        return {"status": "abort_conflicts", "conflicts": len(conflicts), "snapshot": snapshot_path}

    if dry_run:
        print(f" [DRY] Sería: PUT {len(migrates)} records, DELETE orphan {orphan['id']}.")
        for m in migrates:
            print(f" - PUT record {m['record_id']} ({m['label']}) <- {m['orphan_value']!r}")
        return {"status": "dry_run", "migrate_count": len(migrates), "snapshot": snapshot_path}

    # === EXECUTION ===
    errors = []
    migrated = 0
    stopped = False
    for m in migrates:
        if not script_audit.wait_if_paused_or_stopped(run_id):
            print(" Detención solicitada.")
            stopped = True
            break
        try:
            put_record_field(location_id, token, merge_spec["object"], m["record_id"],
                             clean["id"], m["orphan_value"])
            migrated += 1
            print(f" ✓ PUT record {m['record_id']} ({m['label']})")
        except Exception as exc:
            errors.append({"record_id": m["record_id"], "error": str(exc)})
            print(f" ⚠ PUT falló record {m['record_id']}: {exc}")

    if errors:
        print(f" ✗ {len(errors)} PUTs fallaron. NO se elimina el orphan field. Snapshot preservado.")
        return {"status": "partial_failure", "migrated": migrated, "errors": errors, "snapshot": snapshot_path}

    # A stop mid-loop leaves some orphan values un-migrated. Deleting the orphan
    # now would destroy them, so bail out and let a re-run finish the job.
    if stopped:
        print(f" ✗ Detenido tras {migrated}/{len(migrates)} PUTs. NO se elimina el orphan field. Snapshot preservado.")
        return {"status": "stopped", "migrated": migrated, "pending": len(migrates) - migrated,
                "snapshot": snapshot_path}

    try:
        delete_field(location_id, token, orphan["id"])
        print(f" ✓ DELETE orphan field {orphan['id']}")
        return {"status": "completed", "migrated": migrated, "snapshot": snapshot_path}
    except Exception as exc:
        print(f" ⚠ DELETE orphan falló: {exc}")
        return {"status": "delete_failed", "migrated": migrated, "error": str(exc), "snapshot": snapshot_path}


def select_merges(args):
    """Return the MERGES entries matching the CLI filters.

    ``--location`` and ``--orphan-fk`` are combined (both must match) when both
    are given. An unset or empty filter matches everything.
    """
    return [
        m for m in MERGES
        if (not args.location or m["location_id"] == args.location)
        and (not args.orphan_fk or m["orphan_fk"] == args.orphan_fk)
    ]


def main():
    """CLI entry point: plan, and with --apply execute, each selected merge, then write a JSON report."""
    parser = argparse.ArgumentParser(
        description="Merge valores de orphan a clean twin, después delete orphan field.",
    )
    parser.add_argument("--apply", action="store_true",
                        help="Ejecuta cambios reales. Sin esto corre en DRY-RUN.")
    parser.add_argument("--location", help="Filtrar a una location ID.")
    parser.add_argument("--orphan-fk", help="Filtrar a un orphan_fk específico.")
    parser.add_argument("--run-id", help="Audit run ID del dashboard.")
    args = parser.parse_args()

    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")

    accounts = sync_engine.parse_accounts_csv()
    accounts_by_id = {a["location_id"]: a for a in accounts}

    merges_to_run = select_merges(args)
    print("=" * 70)
    print(f"Merges a procesar: {len(merges_to_run)}")
    print(f"Modo: {'APPLY' if args.apply else 'DRY-RUN'}")
    print("=" * 70)

    summary = {"completed": 0, "dry_run": 0, "abort_conflicts": 0, "partial_failure": 0,
               "stopped": 0, "delete_failed": 0, "error": 0}
    results = []

    for merge_spec in merges_to_run:
        if not script_audit.wait_if_paused_or_stopped(args.run_id):
            print("\nDetención solicitada.")
            break
        branch = accounts_by_id.get(merge_spec["location_id"])
        if not branch:
            print(f"\n⚠ Location {merge_spec['location_id']} no en CSV. Skip.")
            continue
        print(f"\n[{branch['nombre']}] {merge_spec['object']}: {merge_spec['orphan_fk']} → {merge_spec['clean_fk']}")
        try:
            planning = plan_merge(branch, merge_spec)
            if "error" in planning:
                print(f" ✗ {planning['error']}")
                summary["error"] += 1
                results.append({"merge": merge_spec, "status": "error", "error": planning["error"]})
                continue
            result = execute_merge(branch, merge_spec, planning,
                                   dry_run=not args.apply, run_id=args.run_id)
            summary[result["status"]] = summary.get(result["status"], 0) + 1
            results.append({"merge": merge_spec, **result})
        except Exception as exc:
            # Top-level boundary: record the failure and continue with the next merge.
            print(f" ✗ EXCEPCIÓN: {exc}")
            summary["error"] += 1
            results.append({"merge": merge_spec, "status": "exception", "error": str(exc)})

    print(f"\n{'=' * 70}")
    print("RESUMEN")
    print("=" * 70)
    for k, v in summary.items():
        if v:
            print(f" {k:25s}: {v}")

    ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    report_path = os.path.join(MIGRATIONS_DIR, f"merge_report_{ts}.json")
    os.makedirs(MIGRATIONS_DIR, exist_ok=True)
    with open(report_path, "w", encoding="utf-8") as fh:
        json.dump({
            "timestamp_utc": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "apply_mode": args.apply,
            "summary": summary,
            "results": results,
        }, fh, ensure_ascii=False, indent=2, default=str)
    print(f"\nReporte: {report_path}")


if __name__ == "__main__":
    main()