#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# NOTE: the module docstring below is passed to argparse as the CLI description
# (see main()), so it is user-facing runtime text. It is kept in the same
# language as the rest of the CLI help strings.
"""sync_brand_to_branch_contacts.py

Sincroniza desde Marca hacia las sucursales correspondientes los contactos del
bucket "contacts_in_brand_not_in_any_branch" (contactos que viven solo en Marca
y no aparecen en ninguna sucursal).

Reglas estrictas:
  1. Solo se procesa el contacto si tiene el campo TIENDA poblado Y esa TIENDA
     se puede resolver a un location_id real via el CSV verificador. Si no,
     se skipea con razon explicita.
  2. Antes de crear, se hace double-check en la sucursal destino con la misma
     estrategia: telefono -> email -> nombre (el match por nombre solo aplica
     si el contacto no tiene phone ni email).
  3. Si match en sucursal: skip (ya existe alli).
  4. Si no: crea el contacto en la sucursal con todos los datos basicos +
     custom fields mapeados por nombre del schema de Marca al schema de la
     sucursal.

Modos:
  - dry-run (default): no escribe nada en GHL, solo planea.
  - --apply: ejecuta las escrituras, registra cambios en script_audit y
    replica el contacto en SQLite local para mantener el snapshot sincronizado.

Uso CLI:
    python scripts/sync_brand_to_branch_contacts.py
    python scripts/sync_brand_to_branch_contacts.py --apply --yes
    python scripts/sync_brand_to_branch_contacts.py --only-contact <contact_id>
    python scripts/sync_brand_to_branch_contacts.py --json
"""

import argparse
import json
import os
import sqlite3
import sys
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPTS_DIR not in sys.path:
    sys.path.insert(0, SCRIPTS_DIR)

import script_audit  # noqa: E402
import sync_engine  # noqa: E402
from audit_brand_vs_branches_totals import (  # noqa: E402
    load_verifier,
    run_audit,
)
from sync_missing_opps_to_brand import (  # noqa: E402
    BRAND_LOCATION_ID,
    DB_PATH,
    build_contact_payload,
    find_brand_contact,
    index_brand_contacts,
    load_schemas_id_to_name,
    load_schemas_name_to_id,
    normalize_email,
    normalize_name,
    normalize_phone,
    safe_print,
    upsert_contact_in_db,
)

SCRIPT_NAME = os.path.basename(__file__)

# Hard upper bound on concurrent create requests, regardless of max_parallel.
_MAX_WORKERS_CAP = 20


def _load_branch_contact_index(conn, location_id):
    """Load one location's contacts from SQLite and index them.

    Args:
        conn: open sqlite3 connection whose row_factory yields mapping-like
            rows (each row is converted with ``dict(row)``).
        location_id: location whose contacts are loaded.

    Returns:
        Tuple ``(contacts, idx_phone, idx_email, idx_name)``: the list of
        contact dicts plus the three indices built by
        ``index_brand_contacts``.
    """
    rows = conn.execute(
        "SELECT id, first_name, last_name, phone, email FROM contacts WHERE location_id=?",
        (location_id,),
    ).fetchall()
    contacts = [dict(r) for r in rows]
    idx_phone, idx_email, idx_name = index_brand_contacts(contacts)
    return contacts, idx_phone, idx_email, idx_name


def _load_brand_contact_full(conn, contact_id):
    """Return the full SQLite row of a brand contact as a dict, or None."""
    row = conn.execute(
        "SELECT * FROM contacts WHERE location_id=? AND id=?",
        (BRAND_LOCATION_ID, contact_id),
    ).fetchone()
    return dict(row) if row else None


def _preview_payload(payload):
    """Return a copy of ``payload`` with customFields replaced by a count.

    The ``customFields_count`` key is only added when there is at least one
    custom field.
    """
    cf_count = len(payload.get("customFields", []))
    preview = {k: v for k, v in payload.items() if k != "customFields"}
    if cf_count:
        preview["customFields_count"] = cf_count
    return preview


def _full_name(first_name, last_name):
    """Join the non-empty name parts with a space.

    Avoids rendering missing parts as the literal string "None", which is
    what plain f-string interpolation of ``None`` values would produce.
    """
    return " ".join(str(part).strip() for part in (first_name, last_name) if part).strip()


def _load_global_snapshot(conn, per_branch_summary):
    """Load every branch listed in ``per_branch_summary`` with its indices.

    Returns:
        Tuple ``(branch_data, global_contacts_aug)``. ``branch_data`` maps
        location_id -> {name, contacts, idx_phone, idx_email, idx_name}.
        ``global_contacts_aug`` is the flat list of all branch contacts, each
        copied and annotated with ``_loc`` and ``_loc_name``.
    """
    branch_data = {}
    global_contacts_aug = []
    for b_info in per_branch_summary:
        loc = b_info["location_id"]
        contacts, idx_phone, idx_email, idx_name = _load_branch_contact_index(conn, loc)
        branch_data[loc] = {
            "name": b_info["name"],
            "contacts": contacts,
            "idx_phone": idx_phone,
            "idx_email": idx_email,
            "idx_name": idx_name,
        }
        for contact in contacts:
            aug = dict(contact)
            aug["_loc"] = loc
            aug["_loc_name"] = b_info["name"]
            global_contacts_aug.append(aug)
    return branch_data, global_contacts_aug


def _index_created_contact(new_c, bdata, global_indices):
    """Register a freshly created contact in the in-memory indices.

    Name indexing only happens when the contact has neither phone nor email,
    mirroring the matching rule described in the module docstring.

    NOTE(review): this runs after the validation phase has finished, so the
    entries are never consulted again during the same run. Two candidates in
    one batch that share a phone/email are therefore both created. Closing
    that gap would require registering planned creates during validation.
    """
    global_idx_phone, global_idx_email, global_idx_name = global_indices
    phone_key = normalize_phone(new_c.get("phone"))
    email_key = normalize_email(new_c.get("email"))
    name_key = normalize_name(new_c.get("first_name"), new_c.get("last_name"))
    if phone_key:
        bdata["idx_phone"].setdefault(phone_key, new_c)
        global_idx_phone.setdefault(phone_key, new_c)
    if email_key:
        bdata["idx_email"].setdefault(email_key, new_c)
        global_idx_email.setdefault(email_key, new_c)
    if name_key and not phone_key and not email_key:
        bdata["idx_name"].setdefault(name_key, new_c)
        global_idx_name.setdefault(name_key, new_c)


def _record_created_change(run_id, plan, new_c):
    """Record the created contact in script_audit and mark it as applied."""
    change_id = script_audit.record_change(
        run_id, plan["expected_loc"], "contact", new_c["id"], "",
        "created", None,
        {
            "phone": new_c["phone"],
            "email": new_c["email"],
            "name": _full_name(new_c["first_name"], new_c["last_name"]),
            "source": "brand",
            "source_contact_id": plan["brand_contact_id"],
        },
    )
    if change_id:
        script_audit.mark_change(change_id, "applied")


def _apply_planned_creates(conn, creates_planned, summary, global_indices, log, run_id, max_parallel):
    """Execute the planned creates in parallel and do the local bookkeeping.

    Worker threads only perform the HTTP create call. Everything else
    (SQLite upsert/refresh, in-memory indexing, script_audit) runs in the
    calling thread as each future completes.

    A failure of the create call marks the item as ``error``. Failures in the
    post-create bookkeeping are logged as warnings and counted, but do not
    flip the item to ``error``: the contact already exists in GHL.
    """
    # Clamp to at least 1 so a non-positive max_parallel cannot make
    # ThreadPoolExecutor raise ValueError.
    workers = max(
        1,
        min(max_parallel or len(creates_planned), len(creates_planned), _MAX_WORKERS_CAP),
    )
    log(f"\nAplicando {len(creates_planned)} creates en paralelo (max_workers={workers})...")

    def _do_create(plan):
        # HTTP call only. Does NOT touch SQLite or the in-memory indices.
        return sync_engine.ghl_client.create_contact(plan["target_token"], plan["payload"])

    t0 = datetime.now()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        future_to_plan = {executor.submit(_do_create, plan): plan for plan in creates_planned}
        for future in as_completed(future_to_plan):
            plan = future_to_plan[future]
            item = plan["item"]

            # --- The create itself: any failure here is a real error. ---
            try:
                res = future.result()
                new_contact_id = (res.get("contact") or {}).get("id") or res.get("id")
                if not new_contact_id:
                    raise RuntimeError(f"GHL no devolvio id de contacto creado. Respuesta: {res}")
            except Exception as exc:
                summary["errors"] += 1
                item["status"] = "error"
                item["error"] = str(exc)
                log(f" [ERROR] {item['name']} -> {plan['expected_name']}: {exc}")
                continue

            summary["contacts_created"] += 1
            item["actions"][-1]["result"] = {"branch_contact_id": new_contact_id}
            log(f" [OK] {item['name']} -> {plan['expected_name']}: {new_contact_id}")

            # --- Post-create bookkeeping: best effort from here on. ---
            try:
                upsert_contact_in_db(conn, new_contact_id, plan["payload"], plan["expected_loc"])
            except Exception as db_exc:
                log(f" WARN: no se pudo upsert contacto en SQLite: {db_exc}")

            # Authoritative refresh of the local row through sync_engine
            # (the original note says "desde Bucéfalo").
            try:
                ref = sync_engine.refresh_contact_in_db(
                    plan["target_token"], new_contact_id, plan["expected_loc"]
                )
                if not ref.get("ok"):
                    log(f" WARN: refresh_contact_in_db fallo: {ref.get('error')}")
                    summary["local_refresh_errors"] = summary.get("local_refresh_errors", 0) + 1
            except Exception as ref_exc:
                log(f" WARN: refresh_contact_in_db excepcion: {ref_exc}")
                # Count raised failures the same way as reported ones.
                summary["local_refresh_errors"] = summary.get("local_refresh_errors", 0) + 1

            brand_contact = plan["brand_contact"]
            new_c = {
                "id": new_contact_id,
                "first_name": brand_contact.get("first_name"),
                "last_name": brand_contact.get("last_name"),
                "phone": brand_contact.get("phone"),
                "email": brand_contact.get("email"),
                "_loc": plan["expected_loc"],
                "_loc_name": plan["expected_name"],
            }
            try:
                _index_created_contact(new_c, plan["bdata"], global_indices)
            except Exception as idx_exc:
                log(f" WARN: no se pudo indexar el contacto creado en memoria: {idx_exc}")

            if run_id:
                try:
                    _record_created_change(run_id, plan, new_c)
                except Exception as audit_exc:
                    log(f" WARN: no se pudo registrar el cambio en script_audit: {audit_exc}")
                    summary["audit_errors"] = summary.get("audit_errors", 0) + 1

    elapsed = (datetime.now() - t0).total_seconds()
    log(f"Creates paralelos terminados en {elapsed:.1f}s ({len(creates_planned)} requests).")


def _log_summary(log, summary, dry_run):
    """Write the end-of-run summary through ``log``."""
    log("\n=== RESUMEN ===")
    log(f" Candidatos : {summary['candidates']}")
    log(f" Contactos {'a crear' if dry_run else 'creados'} : {summary['contacts_created']}")
    log(f" Ya existian en destino : {summary['skipped_already_in_branch']}")
    log(f" Ya existian en OTRA suc : {summary['skipped_in_other_branch']}")
    log(f" Skip sin TIENDA : {summary['skipped_no_tienda']}")
    log(f" Skip TIENDA desconocida : {summary['skipped_unknown_tienda']}")
    if summary.get("phone_collisions_unresolved"):
        log(f" Colisiones telefono sin match (revision manual): {summary['phone_collisions_unresolved']}")
    log(f" Errores : {summary['errors']}")


def run_sync(contact_ids=None, dry_run=True, log=None, run_id=None, max_parallel=None):
    """Run the brand -> branch contact sync and return a serializable dict.

    Args:
        contact_ids: optional list of brand contact ids to restrict the run to.
        dry_run: True (default) means nothing is written to GHL.
        log: optional ``log(line)`` callable; defaults to ``safe_print``.
        run_id: script_audit run id, intended for apply runs.
        max_parallel: maximum number of parallel creates. None (or 0) means
            auto: the number of planned creates, capped at 20. Non-positive
            values are clamped to 1.

    Returns:
        ``{"dry_run": bool, "summary": dict, "items": list}``.

    Raises:
        FileNotFoundError: if the SQLite snapshot at ``DB_PATH`` is missing.

    Flow:
        1. Preload all branches reported by the audit (global snapshot with
           indices).
        2. Sequential validation loop: TIENDA, match in the target branch,
           match in any other branch. Produces skip results and a list of
           planned creates.
        3. If applying: run the creates in parallel. Each worker performs
           only the HTTP call; SQLite writes and audit records happen in the
           calling thread after each future completes.
    """
    if log is None:
        log = safe_print
    log_lock = threading.Lock()

    def tlog(msg):
        # Serialises log output; kept so the log callback is never re-entered
        # concurrently even if callers log from other threads.
        with log_lock:
            log(msg)

    if not os.path.exists(DB_PATH):
        raise FileNotFoundError(f"No existe {DB_PATH}. Corre una sincronizacion global primero.")

    log(f"[{datetime.now().strftime('%H:%M:%S')}] === sync_brand_to_branch_contacts ===")
    log(f"Modo: {'DRY-RUN (no escribe)' if dry_run else 'APPLY (escribe en GHL)'}")
    log("Calculando bucket contacts_in_brand_not_in_any_branch desde audit...")

    audit_data = run_audit(limit_missing=None)
    missing = audit_data["missing"]["contacts_in_brand_not_in_any_branch"]
    targets = missing["items"]
    if contact_ids:
        wanted = set(contact_ids)
        targets = [t for t in targets if t["id"] in wanted]
    log(f"Contactos candidatos: {len(targets)} (total en bucket: {missing['total']})")

    if not targets:
        return {
            "dry_run": dry_run,
            "summary": _empty_summary(),
            "items": [],
        }

    tokens = sync_engine.get_tokens_map()
    # The returned maps are not used here. NOTE(review): the call is kept in
    # case it validates the verifier CSV as a side effect -- confirm.
    _verifier_by_loc, _verifier_by_tienda = load_verifier()

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        brand_schema_id_to_name = load_schemas_id_to_name(conn, BRAND_LOCATION_ID, "contact")

        # ---- Global preload: every audited branch with its indices ----
        log("Pre-cargando snapshot de TODAS las sucursales para double-check global...")
        # Per the original note, per_branch already excludes the brand and
        # demo locations (filtered by audit_brand_vs_branches_totals).
        per_branch_summary = audit_data.get("per_branch") or []
        branch_data, global_contacts_aug = _load_global_snapshot(conn, per_branch_summary)
        branch_schema_cache = {}  # loc_id -> name->id schema
        global_indices = index_brand_contacts(global_contacts_aug)
        global_idx_phone, global_idx_email, global_idx_name = global_indices
        log(f"Snapshot global: {len(branch_data)} sucursales, {len(global_contacts_aug)} contactos indexados.")

        summary = _empty_summary()
        summary["candidates"] = len(targets)
        results = []
        creates_planned = []  # one dict per planned create (item, payload, token, ...)

        # ---- Phase 1: sequential validation ----
        for idx, target in enumerate(targets, 1):
            brand_contact_id = target["id"]
            display_name = target.get("name") or "(sin nombre)"
            tienda_value = target.get("tienda")
            expected_loc = target.get("expected_location_id")
            expected_name = target.get("expected_branch_name")

            log(f"[{idx}/{len(targets)}] {display_name} ({brand_contact_id}) tienda='{tienda_value}'")

            item = {
                "brand_contact_id": brand_contact_id,
                "name": display_name,
                "phone": target.get("phone"),
                "email": target.get("email"),
                "tienda": tienda_value,
                "target_location_id": expected_loc,
                "target_branch_name": expected_name,
                "actions": [],
                "status": "pending",
                "error": None,
            }
            # Appended exactly once, up front; the dict is mutated in place
            # below, so every outcome (skip, plan, error) is reflected.
            results.append(item)

            try:
                if not tienda_value:
                    item["status"] = "skipped_no_tienda"
                    item["actions"].append({"action": "skip_no_tienda"})
                    summary["skipped_no_tienda"] += 1
                    log(" Skip: contacto no tiene campo TIENDA poblado.")
                    continue

                if not expected_loc:
                    item["status"] = "skipped_unknown_tienda"
                    item["actions"].append({"action": "skip_unknown_tienda", "tienda": tienda_value})
                    summary["skipped_unknown_tienda"] += 1
                    log(f" Skip: TIENDA='{tienda_value}' no matchea ninguna fila del verificador.")
                    continue

                target_token = tokens.get(expected_loc)
                if not target_token:
                    raise RuntimeError(f"No hay token para la location {expected_loc} ({expected_name})")

                brand_contact = _load_brand_contact_full(conn, brand_contact_id)
                if not brand_contact:
                    raise RuntimeError(f"No se encontro el contacto {brand_contact_id} en Marca SQLite")

                # Double-check #1: target branch.
                bdata = branch_data.get(expected_loc)
                if not bdata:
                    # Target branch is not in the snapshot (e.g. it was a
                    # demo location or was filtered out by the audit).
                    raise RuntimeError(f"Sucursal destino {expected_loc} no esta en el snapshot")

                match, strategy, collision = find_brand_contact(
                    brand_contact, bdata["idx_phone"], bdata["idx_email"], bdata["idx_name"]
                )
                if collision and not match:
                    # Same phone as a contact in the target branch but the
                    # names diverge. Do NOT create blindly: these are
                    # probably different people sharing a number.
                    log(
                        f" COLISION TELEFONO en sucursal destino {expected_name}: "
                        f"contacto local {collision.get('id')} comparte numero pero "
                        f"el nombre diverge. Skip para revision manual."
                    )
                    item["status"] = "skipped_phone_collision"
                    item["actions"].append({
                        "action": "skip_phone_collision_in_branch",
                        "branch_contact_id": collision.get("id"),
                        "branch_location_id": expected_loc,
                        "branch_name": expected_name,
                    })
                    summary["phone_collisions_unresolved"] += 1
                    if run_id:
                        try:
                            script_audit.record_change(
                                run_id, expected_loc, "contact", brand_contact_id, "",
                                "skipped_phone_collision", None,
                                {
                                    "source_brand_contact_id": brand_contact_id,
                                    "colliding_branch_contact_id": collision.get("id"),
                                    "phone": brand_contact.get("phone"),
                                },
                            )
                        except Exception as audit_exc:
                            log(f" WARN: no se pudo registrar la colision en script_audit: {audit_exc}")
                    continue

                if match:
                    log(f" YA EXISTE en sucursal destino {expected_name} por {strategy}: {match['id']}. Skip.")
                    item["status"] = "skipped_already_in_branch"
                    item["actions"].append({
                        "action": "skip_already_in_branch",
                        "strategy": strategy,
                        "branch_contact_id": match["id"],
                    })
                    summary["skipped_already_in_branch"] += 1
                    continue

                # Double-check #2: any OTHER branch (global indices).
                global_match, global_strategy, _global_collision = find_brand_contact(
                    brand_contact, global_idx_phone, global_idx_email, global_idx_name
                )
                if global_match and global_match.get("_loc") and global_match["_loc"] != expected_loc:
                    other_loc = global_match["_loc"]
                    other_name = global_match.get("_loc_name") or other_loc
                    log(f" YA EXISTE en OTRA sucursal: {other_name} ({other_loc}) por {global_strategy}: {global_match['id']}. Skip.")
                    item["status"] = "skipped_in_other_branch"
                    item["actions"].append({
                        "action": "skip_in_other_branch",
                        "strategy": global_strategy,
                        "branch_contact_id": global_match["id"],
                        "located_in_location_id": other_loc,
                        "located_in_branch_name": other_name,
                    })
                    summary["skipped_in_other_branch"] += 1
                    continue

                # Build the payload, mapping custom fields from the brand
                # schema to the target branch schema by name.
                if expected_loc not in branch_schema_cache:
                    branch_schema_cache[expected_loc] = load_schemas_name_to_id(conn, expected_loc, "contact")
                dst_schema = branch_schema_cache[expected_loc]
                if not dst_schema:
                    log(f" WARN: schema de contactos vacio para {expected_name}. Los custom fields no se mapearan.")

                payload = build_contact_payload(
                    brand_contact, brand_schema_id_to_name, dst_schema,
                    target_location_id=expected_loc,
                )

                log(f" Plan: CREAR en sucursal {expected_name} (cf_count={len(payload.get('customFields', []))})")
                item["actions"].append({
                    "action": "create_contact_in_branch",
                    "target_location_id": expected_loc,
                    "target_branch_name": expected_name,
                    "payload_preview": _preview_payload(payload),
                })
                # Tentative; switched to "error" if the create fails on apply.
                item["status"] = "created"

                if dry_run:
                    summary["contacts_created"] += 1
                else:
                    # Queue for phase 2 (parallel).
                    creates_planned.append({
                        "item": item,
                        "payload": payload,
                        "target_token": target_token,
                        "expected_loc": expected_loc,
                        "expected_name": expected_name,
                        "brand_contact": brand_contact,
                        "brand_contact_id": brand_contact_id,
                        "bdata": bdata,
                    })

            except Exception as exc:
                summary["errors"] += 1
                item["status"] = "error"
                item["error"] = str(exc)
                log(f" ERROR: {exc}")

        # ---- Phase 2: parallel creates ----
        if not dry_run and creates_planned:
            _apply_planned_creates(
                conn, creates_planned, summary, global_indices, tlog, run_id, max_parallel
            )

        # ---- Summary ----
        _log_summary(log, summary, dry_run)

        return {
            "dry_run": dry_run,
            "summary": summary,
            "items": results,
        }
    finally:
        conn.close()


def _empty_summary():
    """Return a fresh summary dict with every base counter set to zero."""
    return {
        "candidates": 0,
        "contacts_created": 0,
        "skipped_already_in_branch": 0,
        "skipped_in_other_branch": 0,
        "skipped_no_tienda": 0,
        "skipped_unknown_tienda": 0,
        "phone_collisions_unresolved": 0,
        "errors": 0,
    }


def main():
    """CLI entry point: parse arguments, confirm if applying, run the sync.

    Exit codes: 0 when the user cancels, 2 when the SQLite snapshot is
    missing, 3 on a RuntimeError raised by the sync.
    """
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--apply", action="store_true", help="Ejecuta las escrituras en GHL. Default dry-run.")
    parser.add_argument("--yes", action="store_true", help="Skip confirmacion interactiva.")
    parser.add_argument("--only-contact", action="append", default=[], help="Procesa solo el contact_id dado (repetible).")
    parser.add_argument("--run-id", type=str, default=None, help="Id de script_audit. Solo aplica con --apply.")
    parser.add_argument("--json", action="store_true", help="Imprime el resultado como JSON.")
    args = parser.parse_args()

    dry_run = not args.apply
    if not dry_run and not args.yes:
        try:
            confirm = input("Esto escribira en GHL. Continuar? (y/N): ").strip().lower()
        except EOFError:
            # No interactive stdin (pipe/cron): treat as "not confirmed".
            confirm = ""
        if confirm not in ("y", "yes", "s", "si", "sí"):
            print("Cancelado.")
            sys.exit(0)

    try:
        result = run_sync(
            contact_ids=args.only_contact or None,
            dry_run=dry_run,
            log=safe_print,
            # --run-id is documented as apply-only; never forward it on a
            # dry run so planning cannot write script_audit records.
            run_id=args.run_id if args.apply else None,
        )
    except FileNotFoundError as e:
        safe_print(f"ERROR: {e}")
        sys.exit(2)
    except RuntimeError as e:
        safe_print(f"ERROR: {e}")
        sys.exit(3)

    if args.json:
        safe_print(json.dumps(result, ensure_ascii=False, indent=2, default=str))


if __name__ == "__main__":
    main()