#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""sync_missing_contacts_to_brand.py

Sincroniza al CRM de Marca los contactos que existen en sucursal y NO tienen
contraparte en Marca (bucket "contacts_in_branch_not_in_brand" del audit
audit_brand_vs_branches_totals).

Flujo por cada contacto de sucursal sin contraparte en Marca:
  1. Doble verificacion en Marca antes de crear, buscando por:
       a) telefono normalizado (ultimos 10 digitos)
       b) email (lowercase)
       c) nombre completo (normalizado: sin acentos, lowercase, espacios colapsados)
  2. Si match en cualquiera de los 3 criterios: skip (ya existe, falso positivo
     del audit que solo busca por phone/email).
  3. Si no existe: crea el contacto en Marca con todos los datos basicos y los
     custom fields mapeados por nombre del schema de la sucursal al de Marca.

NO crea oportunidades. Si el contacto tenia opps en sucursal, esas se
sincronizan con `sync_missing_opps_to_brand.py` (que ya gestiona contactos
faltantes ad-hoc).

Modos:
  - dry-run (default): no escribe nada en GHL, solo planea.
  - --apply: ejecuta las escrituras y registra cada cambio en script_audit
    (rollback disponible desde el dashboard).

Uso CLI:
    python scripts/sync_missing_contacts_to_brand.py
    python scripts/sync_missing_contacts_to_brand.py --apply --yes
    python scripts/sync_missing_contacts_to_brand.py --apply --run-id RUN_ID
    python scripts/sync_missing_contacts_to_brand.py --only-contact CONTACT_ID
    python scripts/sync_missing_contacts_to_brand.py --json

Para uso programatico (desde el endpoint /api/comparativa/sync-missing-contacts):
    from scripts.sync_missing_contacts_to_brand import run_sync
    result = run_sync(dry_run=True)
"""
# NOTE: the module docstring is intentionally kept in Spanish. main() hands it
# to argparse as the --help description, so it is user-facing runtime text.

import argparse
import json
import os
import sqlite3
import sys
from datetime import datetime

# Make both the repository root and the scripts directory importable so the
# sibling modules below resolve whether this file is run as a script or
# imported as ``scripts.sync_missing_contacts_to_brand``.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPTS_DIR not in sys.path:
    sys.path.insert(0, SCRIPTS_DIR)

import script_audit  # noqa: E402
import sync_engine  # noqa: E402
from audit_brand_vs_branches_totals import (  # noqa: E402
    run_audit,
    normalize_phone,
    normalize_email,
    resolve_contact_link_field_id,
    extract_contact_link_value,
)
from sync_missing_opps_to_brand import (  # noqa: E402
    BRAND_LOCATION_ID,
    DB_PATH,
    build_brand_contact_payload,
    find_brand_contact,
    index_brand_contacts,
    load_branch_contact,
    load_brand_contacts,
    load_schemas_id_to_name,
    load_schemas_name_to_id,
    normalize_name,
    safe_print,
    upsert_brand_contact_in_db,
)

SCRIPT_NAME = os.path.basename(__file__)


def _full_name(first_name, last_name):
    """Join first and last name into one string, treating ``None`` as empty."""
    return f"{first_name or ''} {last_name or ''}".strip()


def _new_summary(candidates):
    """Return the base counters dict reported under the ``summary`` key."""
    return {
        "candidates": candidates,
        "contacts_created": 0,
        "skipped_already_in_brand": 0,
        "errors": 0,
    }


def run_sync(contact_ids=None, dry_run=True, log=None, run_id=None):
    """Create in Marca the branch contacts that have no counterpart there.

    Candidates come from the ``contacts_in_branch_not_in_brand`` bucket of
    ``run_audit``. Each candidate is re-checked against the Marca contacts
    loaded from SQLite (first by the "ID Contacto Sucursal" link field, then
    through ``find_brand_contact``) and is only created when no match and no
    phone collision is found.

    Args:
        contact_ids: optional list of branch contact ids to process.
            ``None`` (or empty) processes the whole bucket.
        dry_run: when True (default) nothing is written to GHL, to the local
            SQLite snapshot or to script_audit; the plan is only reported.
        log: optional ``log(line)`` callable used for streaming progress.
            Defaults to ``safe_print``.
        run_id: script_audit run id. Only used when ``dry_run`` is False.

    Returns:
        A JSON-serializable dict with ``dry_run``, ``summary`` (counters) and
        ``items`` (one entry per processed candidate).

    Raises:
        FileNotFoundError: if the SQLite database at ``DB_PATH`` is missing.
        RuntimeError: if no token is configured for the Marca location.
    """
    if log is None:
        log = safe_print

    if not os.path.exists(DB_PATH):
        raise FileNotFoundError(f"No existe {DB_PATH}. Corre una sincronizacion global primero.")

    # Audit records are documented as apply-only; never write them in dry-run.
    audit_run_id = None if dry_run else run_id

    log(f"[{datetime.now().strftime('%H:%M:%S')}] === sync_missing_contacts_to_brand ===")
    log(f"Modo: {'DRY-RUN (no escribe)' if dry_run else 'APPLY (escribe en GHL)'}")

    log("Calculando bucket contacts_in_branch_not_in_brand desde audit...")
    audit_data = run_audit(limit_missing=None)
    missing = audit_data["missing"]["contacts_in_branch_not_in_brand"]
    targets = missing["items"]
    if contact_ids:
        wanted = set(contact_ids)
        targets = [t for t in targets if t["id"] in wanted]

    log(f"Contactos candidatos: {len(targets)} (total en bucket: {missing['total']})")
    if not targets:
        return {
            "dry_run": dry_run,
            "summary": _new_summary(0),
            "items": [],
        }

    tokens = sync_engine.get_tokens_map()
    brand_token = tokens.get(BRAND_LOCATION_ID)
    if not brand_token:
        raise RuntimeError("No se encontro token para la cuenta de Marca en el CSV de tokens.")

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        brand_contact_schema_name_to_id = load_schemas_name_to_id(conn, BRAND_LOCATION_ID, "contact")
        if not brand_contact_schema_name_to_id:
            log("ADVERTENCIA: schema de contactos de Marca vacio. Corre sync de metadata para que los custom fields se mapeen.")

        brand_contacts = load_brand_contacts(conn)
        idx_phone, idx_email, idx_name = index_brand_contacts(brand_contacts)
        log(f"Indice Marca: {len(brand_contacts)} contactos.")

        # Deterministic index keyed by the "ID Contacto Sucursal" link field.
        # It is the primary match, tried before the phone/email/name cascade.
        brand_contact_link_field_id = resolve_contact_link_field_id(conn, BRAND_LOCATION_ID)
        brand_contacts_by_link = {}
        if brand_contact_link_field_id:
            for bc in brand_contacts:
                link_value = extract_contact_link_value(
                    bc.get("custom_fields_json"), brand_contact_link_field_id
                )
                if link_value:
                    # setdefault: the first Marca contact seen for a link wins.
                    brand_contacts_by_link.setdefault(link_value, bc)
            log(f"Índice por 'ID Contacto Sucursal': {len(brand_contacts_by_link)} contactos Marca con link poblado.")

        results = []
        summary = _new_summary(len(targets))

        for idx, target in enumerate(targets, 1):
            branch_loc_id = target["branch_location_id"]
            branch_name = target.get("branch_name", "")
            branch_contact_id = target["id"]
            display_name = target.get("name") or "(sin nombre)"
            log(f"\n[{idx}/{len(targets)}] Contacto {branch_contact_id} | {display_name} | sucursal: {branch_name}")

            item = {
                "branch_contact_id": branch_contact_id,
                "branch_location_id": branch_loc_id,
                "branch_name": branch_name,
                "name": display_name,
                "phone": target.get("phone"),
                "email": target.get("email"),
                "opps_in_branch": target.get("opps_in_branch", 0),
                "actions": [],
                "status": "pending",
                "error": None,
            }

            try:
                branch_contact = load_branch_contact(conn, branch_loc_id, branch_contact_id)
                if not branch_contact:
                    raise RuntimeError(f"No se encontro el contacto {branch_contact_id} en SQLite")

                # PRIMARY deterministic match by "ID Contacto Sucursal": if a
                # Marca contact's link field holds this branch contact id, the
                # contact already exists in Marca (the audit may have listed
                # it as missing because of a stale SQLite sync).
                linked = brand_contacts_by_link.get(branch_contact_id)
                if linked:
                    log(f" Match por 'ID Contacto Sucursal' -> contacto Marca {linked.get('id')}. Skip (ya existe).")
                    item["status"] = "skipped_already_in_brand"
                    item["actions"].append({
                        "action": "skip_already_in_brand",
                        "strategy": "id_contacto_sucursal",
                        "brand_contact_id": linked.get("id"),
                    })
                    summary["skipped_already_in_brand"] += 1
                    results.append(item)
                    continue

                # Double-check: the audit matches by phone/email only; this
                # lookup also considers the normalized name.
                match, strategy, collision = find_brand_contact(branch_contact, idx_phone, idx_email, idx_name)

                if collision and not match:
                    # Same phone as a Marca contact but a different name (for
                    # example a couple sharing one number). Creating it is not
                    # safe because some integrations would treat it as a
                    # duplicate and merge/delete it, so it is reported and
                    # left for manual review.
                    log(
                        f" COLISION TELEFONO con contacto Marca {collision.get('id')} "
                        f"({(collision.get('first_name') or '') + ' ' + (collision.get('last_name') or '')!r}). "
                        "Skip para revision manual."
                    )
                    item["status"] = "skipped_phone_collision"
                    item["actions"].append({
                        "action": "skip_phone_collision",
                        "colliding_brand_contact_id": collision.get("id"),
                        "colliding_brand_name": _full_name(
                            collision.get("first_name"), collision.get("last_name")
                        ),
                    })
                    summary.setdefault("phone_collisions_unresolved", 0)
                    summary["phone_collisions_unresolved"] += 1
                    if audit_run_id:
                        # Best effort: an audit failure must not turn a
                        # reported collision into an error.
                        try:
                            script_audit.record_change(
                                audit_run_id, BRAND_LOCATION_ID, "contact",
                                branch_contact_id, "", "skipped_phone_collision",
                                None,
                                {
                                    "branch_contact_id": branch_contact_id,
                                    "branch_location_id": branch_loc_id,
                                    "colliding_brand_contact_id": collision.get("id"),
                                    "phone": branch_contact.get("phone"),
                                },
                            )
                        except Exception as audit_exc:
                            log(f" WARN: no se pudo registrar la colision en script_audit: {audit_exc}")
                    results.append(item)
                    continue

                if match:
                    log(f" YA EXISTE en Marca por {strategy}: {match['id']}. Skip.")
                    # NOTE(review): this status is "skipped" while the link
                    # match above uses "skipped_already_in_brand". Kept as-is
                    # because consumers may depend on either value - confirm
                    # before unifying.
                    item["status"] = "skipped"
                    item["actions"].append({
                        "action": "skip_already_in_brand",
                        "strategy": strategy,
                        "brand_contact_id": match["id"],
                    })
                    summary["skipped_already_in_brand"] += 1
                    results.append(item)
                    continue

                # Build the payload and create the contact.
                branch_contact_schema_id_to_name = load_schemas_id_to_name(conn, branch_loc_id, "contact")
                payload = build_brand_contact_payload(
                    branch_contact,
                    branch_contact_schema_id_to_name,
                    brand_contact_schema_name_to_id,
                )

                # "or []" tolerates a payload that carries customFields=None.
                log(f" Plan: CREAR en Marca (cf_count={len(payload.get('customFields') or [])})")
                item["actions"].append({
                    "action": "create_contact",
                    "payload_preview": _preview_payload(payload),
                })

                if not dry_run:
                    res = sync_engine.ghl_client.create_contact(brand_token, payload)
                    brand_contact_id = (res.get("contact") or {}).get("id") or res.get("id")
                    if not brand_contact_id:
                        raise RuntimeError(f"GHL no devolvio id de contacto creado. Respuesta: {res}")
                    summary["contacts_created"] += 1
                    log(f" Contacto creado en Marca: {brand_contact_id}")
                    item["actions"][-1]["result"] = {"brand_contact_id": brand_contact_id}

                    # Mirror the new contact into local SQLite so the snapshot
                    # stays in sync. Best effort: the GHL write already
                    # succeeded.
                    try:
                        upsert_brand_contact_in_db(conn, brand_contact_id, payload)
                    except Exception as db_exc:
                        log(f" WARN: no se pudo upsert contacto en SQLite: {db_exc}")

                    # Authoritative refresh from Bucéfalo so the local row is
                    # 1:1 (CFs auto-populated by GHL, tags, real dateAdded).
                    # Both failure modes are counted in the summary.
                    try:
                        ref = sync_engine.refresh_contact_in_db(brand_token, brand_contact_id, BRAND_LOCATION_ID)
                        if not ref.get("ok"):
                            log(f" WARN: refresh_contact_in_db fallo: {ref.get('error')}")
                            summary.setdefault("local_refresh_errors", 0)
                            summary["local_refresh_errors"] += 1
                    except Exception as ref_exc:
                        log(f" WARN: refresh_contact_in_db excepcion: {ref_exc}")
                        summary.setdefault("local_refresh_errors", 0)
                        summary["local_refresh_errors"] += 1

                    # Index the new contact in memory so duplicates later in
                    # this same batch are detected.
                    new_c = {
                        "id": brand_contact_id,
                        "first_name": branch_contact.get("first_name"),
                        "last_name": branch_contact.get("last_name"),
                        "phone": branch_contact.get("phone"),
                        "email": branch_contact.get("email"),
                    }
                    phone_key = normalize_phone(new_c.get("phone"))
                    email_key = normalize_email(new_c.get("email"))
                    name_key = normalize_name(new_c.get("first_name"), new_c.get("last_name"))
                    if phone_key:
                        idx_phone.setdefault(phone_key, new_c)
                    if email_key:
                        idx_email.setdefault(email_key, new_c)
                    if name_key:
                        idx_name.setdefault(name_key, new_c)

                    if audit_run_id:
                        # The contact already exists in GHL at this point, so
                        # an audit failure is surfaced as a warning and a
                        # counter instead of flipping the item to "error".
                        try:
                            change_id = script_audit.record_change(
                                audit_run_id, BRAND_LOCATION_ID, "contact",
                                brand_contact_id, "", "created",
                                None,
                                {
                                    "phone": new_c["phone"],
                                    "email": new_c["email"],
                                    # _full_name avoids a literal "None" when
                                    # a name part is missing.
                                    "name": _full_name(new_c["first_name"], new_c["last_name"]),
                                    "source_branch": branch_loc_id,
                                },
                            )
                            if change_id:
                                script_audit.mark_change(change_id, "applied")
                        except Exception as audit_exc:
                            log(f" WARN: no se pudo registrar la creacion en script_audit: {audit_exc}")
                            summary.setdefault("audit_errors", 0)
                            summary["audit_errors"] += 1
                else:
                    # Dry-run: count the contact as "to be created".
                    summary["contacts_created"] += 1

                item["status"] = "created"
                results.append(item)

            except Exception as exc:
                # Per-item boundary: one failing contact must not abort the
                # rest of the batch.
                summary["errors"] += 1
                item["status"] = "error"
                item["error"] = str(exc)
                log(f" ERROR: {exc}")
                results.append(item)

        log("\n=== RESUMEN ===")
        log(f" Candidatos : {summary['candidates']}")
        log(f" Contactos {'a crear' if dry_run else 'creados'}: {summary['contacts_created']}")
        log(f" Ya existian en Marca: {summary['skipped_already_in_brand']}")
        if summary.get("phone_collisions_unresolved"):
            log(f" Colisiones telefono sin match (revision manual): {summary['phone_collisions_unresolved']}")
        log(f" Errores : {summary['errors']}")

        return {
            "dry_run": dry_run,
            "summary": summary,
            "items": results,
        }
    finally:
        conn.close()


def _preview_payload(payload):
    """Return a copy of ``payload`` with ``customFields`` replaced by a count.

    The ``customFields_count`` key is only added when there is at least one
    custom field. A missing or ``None`` ``customFields`` counts as zero.
    """
    cf_count = len(payload.get("customFields") or [])
    preview = {k: v for k, v in payload.items() if k != "customFields"}
    if cf_count:
        preview["customFields_count"] = cf_count
    return preview


def main():
    """CLI entry point: parse arguments, confirm if applying, run the sync.

    Exit codes: 0 when the user cancels the confirmation prompt, 2 on
    ``FileNotFoundError`` and 3 on ``RuntimeError`` raised by ``run_sync``.
    """
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--apply", action="store_true", help="Ejecuta las escrituras en GHL. Por default es dry-run.")
    parser.add_argument("--yes", action="store_true", help="Skip confirmacion interactiva.")
    parser.add_argument("--only-contact", action="append", default=[], help="Procesa solo el contact_id dado (puede repetirse).")
    parser.add_argument("--run-id", type=str, default=None, help="Id de script_audit. Solo aplica con --apply.")
    parser.add_argument("--json", action="store_true", help="Imprime el resultado como JSON al final.")
    args = parser.parse_args()

    dry_run = not args.apply

    if not dry_run and not args.yes:
        try:
            confirm = input("Esto escribira en GHL. Continuar? (y/N): ").strip().lower()
        except EOFError:
            # Closed or non-interactive stdin: treat as "not confirmed".
            confirm = ""
        if confirm not in ("y", "yes", "s", "si", "sí"):
            print("Cancelado.")
            sys.exit(0)

    try:
        result = run_sync(
            contact_ids=args.only_contact or None,
            dry_run=dry_run,
            log=safe_print,
            run_id=args.run_id,
        )
    except FileNotFoundError as e:
        safe_print(f"ERROR: {e}")
        sys.exit(2)
    except RuntimeError as e:
        safe_print(f"ERROR: {e}")
        sys.exit(3)

    if args.json:
        safe_print(json.dumps(result, ensure_ascii=False, indent=2, default=str))


if __name__ == "__main__":
    main()