Files
MP-Manager/scripts/sync_brand_to_branch_contacts.py
2026-05-30 14:31:19 -06:00

509 lines
23 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""sync_brand_to_branch_contacts.py
Sincroniza desde Marca hacia las sucursales correspondientes los contactos del
bucket "contacts_in_brand_not_in_any_branch" (contactos que viven solo en Marca
y no aparecen en ninguna sucursal).
Reglas estrictas:
1. Solo se procesa el contacto si tiene el campo TIENDA poblado Y esa TIENDA
se puede resolver a un location_id real via el CSV verificador. Si no, se
skipea con razon explicita.
2. Antes de crear, se hace double-check en la sucursal destino con la misma
estrategia: telefono -> email -> nombre (el match por nombre solo aplica si
el contacto no tiene phone ni email).
3. Si match en sucursal: skip (ya existe alli).
4. Si no: crea el contacto en la sucursal con todos los datos basicos + custom
fields mapeados por nombre del schema de Marca al schema de la sucursal.
Modos:
- dry-run (default): no escribe nada en GHL, solo planea.
- --apply: ejecuta las escrituras, registra cambios en script_audit y replica
el contacto en SQLite local para mantener el snapshot sincronizado.
Uso CLI:
python scripts/sync_brand_to_branch_contacts.py
python scripts/sync_brand_to_branch_contacts.py --apply --yes
python scripts/sync_brand_to_branch_contacts.py --only-contact <id>
python scripts/sync_brand_to_branch_contacts.py --json
"""
import argparse
import json
import os
import sqlite3
import sys
import threading
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
# Parent of the directory that contains this file. It is prepended to sys.path
# (once) before the project imports that follow -- presumably so root-level
# modules resolve when the file is executed directly; confirm which of the
# imports below actually live there.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)
# Directory that contains this file, also prepended to sys.path (once) so that
# sibling modules in the same directory can be imported by bare name.
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPTS_DIR not in sys.path:
    sys.path.insert(0, SCRIPTS_DIR)
import script_audit # noqa: E402
import sync_engine # noqa: E402
from audit_brand_vs_branches_totals import ( # noqa: E402
load_verifier,
run_audit,
)
from sync_missing_opps_to_brand import ( # noqa: E402
BRAND_LOCATION_ID,
DB_PATH,
build_contact_payload,
find_brand_contact,
index_brand_contacts,
load_schemas_id_to_name,
load_schemas_name_to_id,
normalize_email,
normalize_name,
normalize_phone,
safe_print,
upsert_contact_in_db,
)
# File name (basename) of this script.
# NOTE(review): not referenced anywhere in the visible code -- confirm whether
# an external caller reads it before removing.
SCRIPT_NAME = os.path.basename(__file__)
def _load_branch_contact_index(conn, location_id):
    """Read one location's contacts from SQLite and build its match indexes.

    Args:
        conn: open SQLite connection whose rows can be converted with
            ``dict(row)`` (the caller sets ``sqlite3.Row`` as row factory).
        location_id: value matched against ``contacts.location_id``.

    Returns:
        A 4-tuple ``(contacts, idx_phone, idx_email, idx_name)``: the contacts
        as a list of dicts, followed by the three indexes produced by
        ``index_brand_contacts`` for that list.
    """
    query = "SELECT id, first_name, last_name, phone, email FROM contacts WHERE location_id=?"
    cursor = conn.execute(query, (location_id,))
    branch_contacts = list(map(dict, cursor.fetchall()))
    by_phone, by_email, by_name = index_brand_contacts(branch_contacts)
    return branch_contacts, by_phone, by_email, by_name
def _load_brand_contact_full(conn, contact_id):
    """Fetch every column of a single contact stored under the brand location.

    Args:
        conn: open SQLite connection whose rows can be converted with
            ``dict(row)``.
        contact_id: value matched against ``contacts.id``.

    Returns:
        The row as a dict, or ``None`` when no row matches both
        ``BRAND_LOCATION_ID`` and ``contact_id``.
    """
    cursor = conn.execute(
        "SELECT * FROM contacts WHERE location_id=? AND id=?",
        (BRAND_LOCATION_ID, contact_id),
    )
    record = cursor.fetchone()
    if not record:
        return None
    return dict(record)
def _preview_payload(payload):
cf_count = len(payload.get("customFields", []))
p = {k: v for k, v in payload.items() if k != "customFields"}
if cf_count:
p["customFields_count"] = cf_count
return p
def run_sync(contact_ids=None, dry_run=True, log=None, run_id=None, max_parallel=None):
    """Run the Marca -> branch contact sync and return a serializable dict.

    Args:
        contact_ids: optional iterable of Marca contact ids; when given, only
            candidates whose id is in it are processed.
        dry_run: True (default) only plans the creates; nothing is sent to GHL.
        log: optional callable ``log(line)``. Defaults to ``safe_print``.
        run_id: script_audit run id. When set, phone collisions and created
            contacts are recorded through ``script_audit``.
        max_parallel: upper bound on concurrent create requests. ``None`` (or
            0) means one worker per planned create. The effective value is
            always clamped to the range 1..20.

    Returns:
        ``{"dry_run": bool, "summary": dict, "items": list}``. ``summary``
        holds the counters from ``_empty_summary`` (plus
        ``local_refresh_errors`` when a post-create refresh failed) and
        ``items`` holds one dict per processed candidate.

    Raises:
        FileNotFoundError: when ``DB_PATH`` does not exist.

    Flow:
        1. Pre-load every branch listed in the audit's ``per_branch`` section
           and build per-branch and global match indexes.
        2. Sequential validation loop: TIENDA present, destination resolvable,
           match in destination branch, match in any other branch. Produces
           skip results and a list of planned creates.
        3. When not dry-run: execute the planned creates with a
           ThreadPoolExecutor. Each worker performs only the HTTP call; the
           SQLite upsert, the in-memory index update and the audit record
           happen in the calling thread as each future completes.
    """
    if log is None:
        log = safe_print
    log_lock = threading.Lock()

    def tlog(msg):
        # Serialized wrapper around ``log``.
        with log_lock:
            log(msg)

    if not os.path.exists(DB_PATH):
        raise FileNotFoundError(f"No existe {DB_PATH}. Corre una sincronizacion global primero.")
    log(f"[{datetime.now().strftime('%H:%M:%S')}] === sync_brand_to_branch_contacts ===")
    log(f"Modo: {'DRY-RUN (no escribe)' if dry_run else 'APPLY (escribe en GHL)'}")
    log("Calculando bucket contacts_in_brand_not_in_any_branch desde audit...")
    audit_data = run_audit(limit_missing=None)
    missing = audit_data["missing"]["contacts_in_brand_not_in_any_branch"]
    targets = missing["items"]
    if contact_ids:
        wanted = set(contact_ids)
        targets = [t for t in targets if t["id"] in wanted]
    log(f"Contactos candidatos: {len(targets)} (total en bucket: {missing['total']})")
    if not targets:
        return {
            "dry_run": dry_run,
            "summary": _empty_summary(),
            "items": [],
        }
    tokens = sync_engine.get_tokens_map()
    # NOTE(review): the two verifier maps are not used below; the call is kept
    # so that any failure while loading the verifier still surfaces here.
    _verifier_by_loc, _verifier_by_tienda = load_verifier()
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        brand_schema_id_to_name = load_schemas_id_to_name(conn, BRAND_LOCATION_ID, "contact")
        # ---- GLOBAL pre-load: every branch from the audit, with its indexes ----
        log("Pre-cargando snapshot de TODAS las sucursales para double-check global...")
        per_branch_summary = audit_data.get("per_branch") or []
        # Per the original author's note, per_branch already excludes Marca and
        # demo locations (filtered by audit_brand_vs_branches_totals).
        branch_data = {}  # loc_id -> {name, contacts, idx_phone, idx_email, idx_name}
        branch_schema_cache = {}  # loc_id -> name->id schema
        global_contacts_aug = []  # contacts of every branch, annotated with _loc
        for b_info in per_branch_summary:
            loc = b_info["location_id"]
            _bc, ip, ie, inom = _load_branch_contact_index(conn, loc)
            branch_data[loc] = {
                "name": b_info["name"],
                "contacts": _bc,
                "idx_phone": ip,
                "idx_email": ie,
                "idx_name": inom,
            }
            for c in _bc:
                aug = dict(c)
                aug["_loc"] = loc
                aug["_loc_name"] = b_info["name"]
                global_contacts_aug.append(aug)
        global_idx_phone, global_idx_email, global_idx_name = index_brand_contacts(global_contacts_aug)
        log(f"Snapshot global: {len(branch_data)} sucursales, {len(global_contacts_aug)} contactos indexados.")
        summary = _empty_summary()
        summary["candidates"] = len(targets)
        results = []
        creates_planned = []  # one dict per planned create (item, payload, token, ...)

        # ---- Phase 1: sequential validation ----
        for idx, target in enumerate(targets, 1):
            brand_contact_id = target["id"]
            display_name = target.get("name") or "(sin nombre)"
            tienda_value = target.get("tienda")
            expected_loc = target.get("expected_location_id")
            expected_name = target.get("expected_branch_name")
            log(f"[{idx}/{len(targets)}] {display_name} ({brand_contact_id}) tienda='{tienda_value}'")
            item = {
                "brand_contact_id": brand_contact_id,
                "name": display_name,
                "phone": target.get("phone"),
                "email": target.get("email"),
                "tienda": tienda_value,
                "target_location_id": expected_loc,
                "target_branch_name": expected_name,
                "actions": [],
                "status": "pending",
                "error": None,
            }
            try:
                if not tienda_value:
                    item["status"] = "skipped_no_tienda"
                    item["actions"].append({"action": "skip_no_tienda"})
                    summary["skipped_no_tienda"] += 1
                    log(" Skip: contacto no tiene campo TIENDA poblado.")
                    results.append(item)
                    continue
                if not expected_loc:
                    item["status"] = "skipped_unknown_tienda"
                    item["actions"].append({"action": "skip_unknown_tienda", "tienda": tienda_value})
                    summary["skipped_unknown_tienda"] += 1
                    log(f" Skip: TIENDA='{tienda_value}' no matchea ninguna fila del verificador.")
                    results.append(item)
                    continue
                target_token = tokens.get(expected_loc)
                if not target_token:
                    raise RuntimeError(f"No hay token para la location {expected_loc} ({expected_name})")
                brand_contact = _load_brand_contact_full(conn, brand_contact_id)
                if not brand_contact:
                    raise RuntimeError(f"No se encontro el contacto {brand_contact_id} en Marca SQLite")
                # Double-check #1: destination branch
                bdata = branch_data.get(expected_loc)
                if not bdata:
                    # Destination branch is not in the pre-loaded snapshot
                    # (e.g. it was a demo or was filtered out by the audit).
                    raise RuntimeError(f"Sucursal destino {expected_loc} no esta en el snapshot")
                match, strategy, collision = find_brand_contact(
                    brand_contact, bdata["idx_phone"], bdata["idx_email"], bdata["idx_name"]
                )
                if collision and not match:
                    # Same phone as a contact already in the destination branch
                    # but with a diverging name. Do NOT create blindly: these
                    # are probably different people sharing a number.
                    log(
                        f" COLISION TELEFONO en sucursal destino {expected_name}: "
                        f"contacto local {collision.get('id')} comparte numero pero "
                        f"el nombre diverge. Skip para revision manual."
                    )
                    item["status"] = "skipped_phone_collision"
                    item["actions"].append({
                        "action": "skip_phone_collision_in_branch",
                        "branch_contact_id": collision.get("id"),
                        "branch_location_id": expected_loc,
                        "branch_name": expected_name,
                    })
                    summary.setdefault("phone_collisions_unresolved", 0)
                    summary["phone_collisions_unresolved"] += 1
                    if run_id:
                        # Best effort: an audit failure must not abort the skip.
                        try:
                            script_audit.record_change(
                                run_id, expected_loc, "contact", brand_contact_id,
                                "", "skipped_phone_collision", None,
                                {
                                    "source_brand_contact_id": brand_contact_id,
                                    "colliding_branch_contact_id": collision.get("id"),
                                    "phone": brand_contact.get("phone"),
                                },
                            )
                        except Exception as audit_exc:
                            log(f" WARN: no se pudo registrar la colision en script_audit: {audit_exc}")
                    results.append(item)
                    continue
                if match:
                    log(f" YA EXISTE en sucursal destino {expected_name} por {strategy}: {match['id']}. Skip.")
                    item["status"] = "skipped_already_in_branch"
                    item["actions"].append({
                        "action": "skip_already_in_branch",
                        "strategy": strategy,
                        "branch_contact_id": match["id"],
                    })
                    summary["skipped_already_in_branch"] += 1
                    results.append(item)
                    continue
                # Double-check #2: any OTHER branch (global indexes)
                global_match, global_strategy, _global_collision = find_brand_contact(
                    brand_contact, global_idx_phone, global_idx_email, global_idx_name
                )
                if global_match and global_match.get("_loc") and global_match["_loc"] != expected_loc:
                    other_loc = global_match["_loc"]
                    other_name = global_match.get("_loc_name") or other_loc
                    log(f" YA EXISTE en OTRA sucursal: {other_name} ({other_loc}) por {global_strategy}: {global_match['id']}. Skip.")
                    item["status"] = "skipped_in_other_branch"
                    item["actions"].append({
                        "action": "skip_in_other_branch",
                        "strategy": global_strategy,
                        "branch_contact_id": global_match["id"],
                        "located_in_location_id": other_loc,
                        "located_in_branch_name": other_name,
                    })
                    summary["skipped_in_other_branch"] += 1
                    results.append(item)
                    continue
                # Build the payload, mapping custom fields Marca -> destination
                # branch. The destination schema is loaded once per branch.
                if expected_loc not in branch_schema_cache:
                    branch_schema_cache[expected_loc] = load_schemas_name_to_id(conn, expected_loc, "contact")
                dst_schema = branch_schema_cache[expected_loc]
                if not dst_schema:
                    log(f" WARN: schema de contactos vacio para {expected_name}. Los custom fields no se mapearan.")
                payload = build_contact_payload(
                    brand_contact,
                    brand_schema_id_to_name,
                    dst_schema,
                    target_location_id=expected_loc,
                )
                log(f" Plan: CREAR en sucursal {expected_name} (cf_count={len(payload.get('customFields', []))})")
                item["actions"].append({
                    "action": "create_contact_in_branch",
                    "target_location_id": expected_loc,
                    "target_branch_name": expected_name,
                    "payload_preview": _preview_payload(payload),
                })
                item["status"] = "created"  # tentative; switched to "error" if the apply fails
                results.append(item)
                if dry_run:
                    summary["contacts_created"] += 1
                else:
                    # Queue for phase 2 (parallel).
                    creates_planned.append({
                        "item": item,
                        "payload": payload,
                        "target_token": target_token,
                        "expected_loc": expected_loc,
                        "expected_name": expected_name,
                        "brand_contact": brand_contact,
                        "brand_contact_id": brand_contact_id,
                        "bdata": bdata,
                    })
            except Exception as exc:
                summary["errors"] += 1
                item["status"] = "error"
                item["error"] = str(exc)
                log(f" ERROR: {exc}")
                if item not in results:
                    results.append(item)

        # ---- Phase 2: parallel creates ----
        if not dry_run and creates_planned:
            # Clamp to at least 1: ThreadPoolExecutor rejects max_workers <= 0,
            # which a negative max_parallel would otherwise produce.
            workers = max(1, min(max_parallel or len(creates_planned), len(creates_planned), 20))
            log(f"\nAplicando {len(creates_planned)} creates en paralelo (max_workers={workers})...")

            def _do_create(plan):
                # HTTP call only. Does NOT touch SQLite or the in-memory indexes.
                return sync_engine.ghl_client.create_contact(plan["target_token"], plan["payload"])

            t0 = datetime.now()
            with ThreadPoolExecutor(max_workers=workers) as executor:
                future_to_plan = {executor.submit(_do_create, plan): plan for plan in creates_planned}
                for future in as_completed(future_to_plan):
                    plan = future_to_plan[future]
                    item = plan["item"]
                    try:
                        res = future.result()
                        new_contact_id = (res.get("contact") or {}).get("id") or res.get("id")
                        if not new_contact_id:
                            raise RuntimeError(f"GHL no devolvio id de contacto creado. Respuesta: {res}")
                        summary["contacts_created"] += 1
                        item["actions"][-1]["result"] = {"branch_contact_id": new_contact_id}
                        tlog(f" [OK] {item['name']} -> {plan['expected_name']}: {new_contact_id}")
                        # SQLite upsert happens here, in the consuming thread.
                        try:
                            upsert_contact_in_db(conn, new_contact_id, plan["payload"], plan["expected_loc"])
                        except Exception as db_exc:
                            tlog(f" WARN: no se pudo upsert contacto en SQLite: {db_exc}")
                        # Authoritative refresh of the local row via sync_engine.
                        # Both a not-ok result and a raised exception count as
                        # a local refresh error.
                        try:
                            ref = sync_engine.refresh_contact_in_db(plan["target_token"], new_contact_id, plan["expected_loc"])
                            if not ref.get("ok"):
                                tlog(f" WARN: refresh_contact_in_db fallo: {ref.get('error')}")
                                summary.setdefault("local_refresh_errors", 0)
                                summary["local_refresh_errors"] += 1
                        except Exception as ref_exc:
                            tlog(f" WARN: refresh_contact_in_db excepcion: {ref_exc}")
                            summary.setdefault("local_refresh_errors", 0)
                            summary["local_refresh_errors"] += 1
                        # Register the new contact in the in-memory indexes.
                        # NOTE(review): phase 1 has already finished when these
                        # entries are added, so they do not affect validation of
                        # the current batch; two candidates in the same run that
                        # share a phone/email are both planned. Confirm whether
                        # intra-batch de-duplication is required.
                        new_c = {
                            "id": new_contact_id,
                            "first_name": plan["brand_contact"].get("first_name"),
                            "last_name": plan["brand_contact"].get("last_name"),
                            "phone": plan["brand_contact"].get("phone"),
                            "email": plan["brand_contact"].get("email"),
                            "_loc": plan["expected_loc"],
                            "_loc_name": plan["expected_name"],
                        }
                        norm_phone = normalize_phone(new_c.get("phone"))
                        norm_email = normalize_email(new_c.get("email"))
                        norm_name = normalize_name(new_c.get("first_name"), new_c.get("last_name"))
                        if norm_phone:
                            plan["bdata"]["idx_phone"].setdefault(norm_phone, new_c)
                            global_idx_phone.setdefault(norm_phone, new_c)
                        if norm_email:
                            plan["bdata"]["idx_email"].setdefault(norm_email, new_c)
                            global_idx_email.setdefault(norm_email, new_c)
                        # Name index only for contacts with neither phone nor email.
                        if norm_name and not norm_phone and not norm_email:
                            plan["bdata"]["idx_name"].setdefault(norm_name, new_c)
                            global_idx_name.setdefault(norm_name, new_c)
                        if run_id:
                            # The contact already exists in GHL at this point.
                            # An audit failure is reported as a warning (same
                            # policy as the phase-1 collision audit) instead of
                            # relabelling the item as "error" and counting it
                            # both as created and as failed.
                            try:
                                cid = script_audit.record_change(
                                    run_id, plan["expected_loc"], "contact", new_contact_id,
                                    "", "created", None,
                                    {"phone": new_c["phone"], "email": new_c["email"],
                                     "name": f"{new_c['first_name']} {new_c['last_name']}".strip(),
                                     "source": "brand", "source_contact_id": plan["brand_contact_id"]},
                                )
                                if cid:
                                    script_audit.mark_change(cid, "applied")
                            except Exception as audit_exc:
                                item["actions"][-1]["audit_error"] = str(audit_exc)
                                tlog(f" WARN: no se pudo registrar el create en script_audit: {audit_exc}")
                    except Exception as exc:
                        summary["errors"] += 1
                        item["status"] = "error"
                        item["error"] = str(exc)
                        tlog(f" [ERROR] {item['name']} -> {plan['expected_name']}: {exc}")
            elapsed = (datetime.now() - t0).total_seconds()
            log(f"Creates paralelos terminados en {elapsed:.1f}s ({len(creates_planned)} requests).")

        # ---- Summary ----
        log("\n=== RESUMEN ===")
        log(f" Candidatos : {summary['candidates']}")
        log(f" Contactos {'a crear' if dry_run else 'creados'} : {summary['contacts_created']}")
        log(f" Ya existian en destino : {summary['skipped_already_in_branch']}")
        log(f" Ya existian en OTRA suc : {summary['skipped_in_other_branch']}")
        log(f" Skip sin TIENDA : {summary['skipped_no_tienda']}")
        log(f" Skip TIENDA desconocida : {summary['skipped_unknown_tienda']}")
        if summary.get("phone_collisions_unresolved"):
            log(f" Colisiones telefono sin match (revision manual): {summary['phone_collisions_unresolved']}")
        log(f" Errores : {summary['errors']}")
        return {
            "dry_run": dry_run,
            "summary": summary,
            "items": results,
        }
    finally:
        conn.close()
def _empty_summary():
return {
"candidates": 0,
"contacts_created": 0,
"skipped_already_in_branch": 0,
"skipped_in_other_branch": 0,
"skipped_no_tienda": 0,
"skipped_unknown_tienda": 0,
"phone_collisions_unresolved": 0,
"errors": 0,
}
def main():
    """CLI entry point: parse flags, confirm write runs, execute the sync.

    Without ``--apply`` the sync runs as a dry-run. With ``--apply`` and
    without ``--yes`` the user is asked for confirmation; only an explicit
    affirmative answer (``y``, ``yes``, ``s``, ``si``) proceeds. An empty
    answer or a closed stdin cancels, matching the ``(y/N)`` default shown in
    the prompt.

    Exit codes: 0 when the user cancels, 2 when ``run_sync`` raises
    ``FileNotFoundError``, 3 when it raises ``RuntimeError``. With ``--json``
    the result dict is printed as JSON after the run.
    """
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--apply", action="store_true", help="Ejecuta las escrituras en GHL. Default dry-run.")
    parser.add_argument("--yes", action="store_true", help="Skip confirmacion interactiva.")
    parser.add_argument("--only-contact", action="append", default=[], help="Procesa solo el contact_id dado (repetible).")
    parser.add_argument("--run-id", type=str, default=None, help="Id de script_audit. Solo aplica con --apply.")
    parser.add_argument("--json", action="store_true", help="Imprime el resultado como JSON.")
    args = parser.parse_args()
    dry_run = not args.apply
    if not dry_run and not args.yes:
        try:
            confirm = input("Esto escribira en GHL. Continuar? (y/N): ").strip().lower()
        except EOFError:
            # No interactive stdin available: treat as "no answer" -> cancel.
            confirm = ""
        # The empty answer is deliberately NOT accepted: the prompt advertises
        # "N" as the default, so pressing Enter must not trigger writes.
        if confirm not in ("y", "yes", "s", "si"):
            print("Cancelado.")
            sys.exit(0)
    try:
        result = run_sync(
            contact_ids=args.only_contact or None,
            dry_run=dry_run,
            log=safe_print,
            run_id=args.run_id,
        )
    except FileNotFoundError as e:
        safe_print(f"ERROR: {e}")
        sys.exit(2)
    except RuntimeError as e:
        safe_print(f"ERROR: {e}")
        sys.exit(3)
    if args.json:
        safe_print(json.dumps(result, ensure_ascii=False, indent=2, default=str))
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()