#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Reconcile and sync opportunities from Branches to Brand with 1-hour margin of past Latency and Fuzzy Matching.""" import argparse import csv import os import sys import sqlite3 import json import re import time import unicodedata from datetime import datetime ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT_DIR not in sys.path: sys.path.insert(0, ROOT_DIR) import sync_engine import db import error_logging import script_audit from paths import DB_PATH BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3" # Verificador CSV: fuente AUTORITATIVA de Sucursal y TIENDA por location_id. # Misma fuente que usa scripts/fill_sucursal_tienda_from_location.py. # Formato: # Sucursal = "Atlacomulco, Estado de México" (texto completo) # TIENDA = "ATLACOMULCO" (código corto) VERIFIER_FILENAME = "Monte Providencia - Verificador de sucursales y correos - Sucursales.csv" # Valores válidos del campo Modalidad de Empeño (doc MP). MODALIDAD_EMPENO_VALUES = ["Sin Dejarlo (GPS)", "Tradicional (Resguardo)"] # Acumulador global de errores de ejecución para imprimir al final del run. EXECUTION_ERRORS = [] # Campos de oportunidad con LOGICA DE NEGOCIO DEDICADA mas abajo (fallbacks, # verificador de Sucursal/TIENDA, construccion de Vehiculo, normalizacion de # Modalidad, etc.). Se excluyen de la copia dinamica generica para no pisar esa # logica; los demas custom fields (incluido "ID Oportunidad Sucursal" y cualquier # campo nuevo) se copian tal cual sucursal -> Marca. _SPECIAL_OVERRIDE_FIELDS_NORM = { "canal de origen", "fuente de prospecto", "sucursal", "tienda", "vehiculo", "modalidad de empeno", "persona que atendio al prospecto", "visita a sucursal", "fecha de ultima visita a sucursal", } def _norm_field_name(name): """Normaliza nombre de campo: sin acentos, minusculas, espacios colapsados.""" s = unicodedata.normalize("NFKD", str(name or "")) s = "".join(c for c in s if not unicodedata.combining(c)) return " ".join(s.lower().split()) def _record_error(category, message, context=None, exc=None): """Acumula errores para el resumen final + persiste en error_log si hay excepción.""" entry = { "category": category, "message": message, "context": context or {}, "error_id": None, } if exc is not None: try: entry["error_id"] = error_logging.log_error(category, exc, context or {}) except Exception: entry["error_id"] = None EXECUTION_ERRORS.append(entry) return entry def load_verifier_map(): """ Carga el verificador CSV como fuente AUTORITATIVA de Sucursal y TIENDA. Retorna {location_id: {"sucursal": str, "tienda": str, "sc_name": str}}. Ignora la cuenta principal (SUCURSAL == "-") y filas sin TIENDA válido. """ path = os.path.join(ROOT_DIR, VERIFIER_FILENAME) if not os.path.exists(path): raise FileNotFoundError(f"Verificador no encontrado: {path}") result = {} with open(path, mode="r", encoding="utf-8-sig", newline="") as fh: for row in csv.DictReader(fh): loc_id = (row.get("ID LOCATION BUCEFALO") or "").strip() sucursal = (row.get("SUCURSAL") or "").strip() tienda = (row.get("TIENDA") or "").strip() sc_name = (row.get("SC BUCEFALO") or "").strip() if not loc_id or sucursal == "-" or tienda in ("-", "CUENTA PRINCIPAL", ""): continue if loc_id not in result: result[loc_id] = {"sucursal": sucursal, "tienda": tienda, "sc_name": sc_name} return result def normalize_modalidad(raw_value): """Devuelve el valor canónico de Modalidad de Empeño o el original si no se reconoce.""" if not raw_value: return None text = remove_accents(str(raw_value)).lower() if "gps" in text or "sin dejar" in text: return "Sin Dejarlo (GPS)" if "resguardo" in text or "tradicional" in text or "dejar auto" in text: return "Tradicional (Resguardo)" return str(raw_value) def has_meaningful_vehicle_info(marca, version, ano): """Vehículo válido = al menos marca o versión. Solo año no es información útil.""" has_text = bool((marca or "").strip()) or bool((version or "").strip()) return has_text def safe_print(*args, **kwargs): sep = kwargs.get("sep", " ") end = kwargs.get("end", "\n") text = sep.join(str(arg) for arg in args) encoding = sys.stdout.encoding or "utf-8" try: sys.stdout.write(text + end) sys.stdout.flush() except UnicodeEncodeError: try: safe_text = text.encode(encoding, errors="replace").decode(encoding) sys.stdout.write(safe_text + end) sys.stdout.flush() except Exception: try: safe_text = text.encode("ascii", errors="replace").decode("ascii") sys.stdout.write(safe_text + end) sys.stdout.flush() except Exception: pass def normalize_phone(phone): if not phone: return "" return re.sub(r"\D+", "", str(phone)) def get_clean_phone_10d(phone): norm = re.sub(r"\D+", "", str(phone or "")) if len(norm) >= 10: return norm[-10:] return norm def remove_accents(text): if not text: return "" accents_map = { 'á': 'a', 'é': 'e', 'í': 'i', 'ó': 'o', 'ú': 'u', 'ü': 'u', 'ñ': 'n', 'Á': 'a', 'É': 'e', 'Í': 'i', 'Ó': 'o', 'Ú': 'u', 'Ü': 'u', 'Ñ': 'n' } clean_chars = [] for char in text: clean_chars.append(accents_map.get(char, char.lower())) cleaned = "".join(clean_chars) cleaned = re.sub(r"[^\w\s]", "", cleaned) # quitar puntuación y caracteres corruptos de codificación return " ".join(cleaned.split()) def extract_tienda_label(account_nombre): if not account_nombre: return None parts = str(account_nombre).split(" - ") if len(parts) < 3: return None raw = " - ".join(parts[2:]).strip() return raw.upper() if raw else None def fetch_opportunities_paginated(token, location_id): """ Obtiene todas las oportunidades con paginación completa, acumulando y deduplicando por ID en cumplimiento de la guía de Monte Providencia. Sin tope de cantidad: solo corta la paginación nativa de GHL (batch vacío repetido, batch menor al limit o total alcanzado). Retorna (unique_opps, pages_queried, raw_received, duplicates_discarded, errors). """ opportunities = [] seen_ids = set() pages_queried = 0 raw_received = 0 duplicates_discarded = 0 errors = [] page = 1 limit = 100 empty_pages = 0 while True: pages_queried += 1 try: data = sync_engine.ghl_client.search_opportunities(token, location_id, limit=limit, page=page) except Exception as e: err_msg = str(e) errors.append(f"Fallo en pagina {page}: {err_msg}") _record_error( "ghl_get_all_opportunities_page_failed", f"Fallo paginación opps en {location_id} pagina {page}: {err_msg}", {"location_id": location_id, "page": page, "accumulated_opps": len(opportunities)}, exc=e, ) break batch = data.get("opportunities", []) or [] if not batch: empty_pages += 1 if empty_pages >= 2: break page += 1 continue empty_pages = 0 raw_received += len(batch) for opp in batch: opp_id = opp.get("id") if not opp_id: continue if opp_id in seen_ids: duplicates_discarded += 1 continue seen_ids.add(opp_id) opportunities.append(opp) total_reported = data.get("total", 0) or 0 if len(batch) < limit or (total_reported > 0 and len(opportunities) >= total_reported): break page += 1 return opportunities, pages_queried, raw_received, duplicates_discarded, errors def normalize_match_text(text): return remove_accents(str(text or "")).strip().lower() def custom_fields_by_name(obj, id_to_name): values = {} for cf in obj.get("customFields", []) or []: cf_id = cf.get("id") cf_val = cf.get("fieldValueString") or cf.get("value") cf_name = id_to_name.get(cf_id) if cf_name and cf_val is not None: values[cf_name] = cf_val return values def branch_affinity_score(brand_custom_fields_by_name, branch_values): expected_values = [normalize_match_text(v) for v in branch_values if normalize_match_text(v)] if not expected_values: return 0 brand_values = [] for field_name in ("Sucursal", "TIENDA", "Tienda"): value = brand_custom_fields_by_name.get(field_name) if value: brand_values.append(normalize_match_text(value)) if not brand_values: return 0 for brand_value in brand_values: for expected in expected_values: if brand_value == expected or brand_value in expected or expected in brand_value: return 30 return -25 def parse_date_string(date_str): if not date_str: return None clean_str = date_str.replace("T", " ").replace("Z", "") if "." in clean_str: clean_str = clean_str.split(".")[0] # Cortar timezone offsets si existen (+00:00, -06:00, etc.) clean_str = re.split(r"[\+\-]\d{2}", clean_str)[0].strip() for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%d/%m/%Y %H:%M:%S"): try: return datetime.strptime(clean_str, fmt) except Exception: pass return None def are_dates_within_one_hour(date_str1, date_str2): d1 = parse_date_string(date_str1) d2 = parse_date_string(date_str2) if not d1 or not d2: return False diff = abs((d1 - d2).total_seconds()) return diff <= 3600 # 3600 seconds = 1 hour def calculate_opportunity_similarity(so, bo, brand_custom_fields_by_name=None, branch_values=None): """ Calcula un puntaje de similitud (0 a 100) entre la oportunidad de sucursal (so) y la de marca (bo) para encontrar el par mas semejante. """ if so["id"] == bo["id"]: return 100 score = 0 so_name = normalize_match_text(so.get("name")) bo_name = normalize_match_text(bo.get("name")) # 1. Match de Nombre Exacto if so_name == bo_name: score += 50 # 2. Match de Nombre Parcial elif so_name and bo_name: if so_name in bo_name or bo_name in so_name: score += 30 else: # Overlap de palabras so_words = set(so_name.split()) bo_words = set(bo_name.split()) overlap = so_words.intersection(bo_words) if len(overlap) > 0: score += min(25, len(overlap) * 8) # 3. Match de Valor Monetario so_val = so["monetary_value"] or 0.0 bo_val = bo["monetary_value"] or 0.0 if abs(so_val - bo_val) < 0.01: score += 15 elif max(so_val, bo_val) > 0 and abs(so_val - bo_val) / max(1.0, so_val, bo_val) < 0.2: score += 10 # 4. Match de Estado (Status) if so["status"] == bo["status"]: score += 10 # 5. Proximidad temporal de creación if are_dates_within_one_hour(so.get("date_added"), bo.get("date_added") or bo.get("createdAt")): score += 15 # 6. Afinidad de sucursal/TIENDA cuando la oportunidad de Marca ya la tiene poblada if brand_custom_fields_by_name is not None: score += branch_affinity_score(brand_custom_fields_by_name, branch_values or []) return score def apply_brand_opportunity_update(brand_token, opportunity_id, opp_data, target_status=None): """ GHL expone el cambio de estado en un endpoint dedicado. El PUT general puede aceptar otros campos sin reflejar won/lost/abandoned en los reportes. """ sync_engine.ghl_client.update_opportunity(brand_token, opportunity_id, opp_data) if target_status: sync_engine.ghl_client.update_opportunity_status(brand_token, opportunity_id, target_status) def is_duplicate_opportunity_error(err): response = getattr(err, "response", None) if response is None or response.status_code != 400: return False return "duplicate opportunity" in (response.text or "").lower() def _find_stage_by_name(brand_stages, target_name): target_norm = remove_accents(target_name).strip().lower() for bs in brand_stages: if remove_accents(bs.get("name", "")).strip().lower() == target_norm: return bs return None def map_stage_branch_to_brand(conn, branch_loc_id, branch_pid, branch_sid, brand_pipelines): """ Mapea dinámicamente un pipeline_stage_id de sucursal al pipeline_stage_id de Marca. Estrategia: 1. Pipeline destino = "Standar" / "Standard" (o el primero como fallback). 2. Stage destino = match por nombre de etapa (sin acentos, case-insensitive). 3. Fallback seguro = etapa "PROSPECTO NUEVO" en Marca (alineado con doc MP: "oportunidades huérfanas se reubican o marcan para revisión"). 4. Si tampoco existe "PROSPECTO NUEVO", primera etapa del pipeline + warning. Retorna (brand_pipeline_id, brand_stage_id, warning_str_or_None). """ if not brand_pipelines: return None, None, "Marca no tiene pipelines cargados" brand_pipe = None for bp in brand_pipelines: bp_name = bp.get("name", "").lower() if "standar" in bp_name or "standard" in bp_name: brand_pipe = bp break if not brand_pipe: brand_pipe = brand_pipelines[0] brand_pid_mapped = brand_pipe["id"] brand_stages = brand_pipe.get("stages", []) if not brand_stages: return brand_pid_mapped, None, f"Pipeline de Marca '{brand_pipe.get('name')}' no tiene etapas" # 1. Match por nombre exacto branch_stages_list = [] try: row = conn.execute( "SELECT stages_json FROM pipelines WHERE location_id = ? AND id = ?", (branch_loc_id, branch_pid) ).fetchone() if row: try: branch_stages_list = json.loads(row["stages_json"]) branch_stages_list = sorted(branch_stages_list, key=lambda x: x.get("order", 0)) except Exception: pass except Exception: pass branch_stage_name = None if branch_stages_list: for s in branch_stages_list: if s["id"] == branch_sid: branch_stage_name = s.get("name") break if branch_stage_name: matched = _find_stage_by_name(brand_stages, branch_stage_name) if matched: return brand_pid_mapped, matched["id"], None # 2. Fallback seguro: PROSPECTO NUEVO prospecto_nuevo = _find_stage_by_name(brand_stages, "PROSPECTO NUEVO") if prospecto_nuevo: warning = ( f"Stage de sucursal '{branch_stage_name or branch_sid}' sin match por nombre en Marca; " f"se reubica en 'PROSPECTO NUEVO' (revisar)." ) return brand_pid_mapped, prospecto_nuevo["id"], warning # 3. Última opción: primera etapa warning = ( f"Stage '{branch_stage_name or branch_sid}' sin match y no existe 'PROSPECTO NUEVO' en Marca; " f"se usa la primera etapa '{brand_stages[0].get('name')}' (revisar manualmente)." ) return brand_pid_mapped, brand_stages[0]["id"], warning def main(): parser = argparse.ArgumentParser(description="Reconcilia y sincroniza oportunidades de sucursales a la cuenta de Marca con prioridades de busqueda.") parser.add_argument("--dry-run", action="store_true", help="Simula los cambios sin realizar peticiones de escritura a GHL.") parser.add_argument("--location", help="ID de ubicacion especifica para procesar (por defecto todas las sucursales)") parser.add_argument("--run-id", help="ID de auditoria para la ejecucion del script") parser.add_argument("--updates-only", action="store_true", help="Solo realiza actualizaciones (PUT) de oportunidades existentes, ignorando la creación de contactos y de nuevas oportunidades.") parser.add_argument("--no-contacts", action="store_true", help="Evita crear nuevos contactos en la Marca Principal.") parser.add_argument("--no-creations", action="store_true", help="Evita crear nuevas oportunidades (POST) en la Marca Principal.") parser.add_argument("--no-updates", action="store_true", help="Evita realizar actualizaciones (PUT) de oportunidades existentes.") args = parser.parse_args() # Resolver selección de acciones permitidas only_updates = args.updates_only allow_contacts = not (args.no_contacts or only_updates) allow_creations = not (args.no_creations or only_updates) allow_updates = not args.no_updates # Registrar el inicio en la auditoría si hay run_id if args.run_id: script_audit.update_run_status(args.run_id, "running") safe_print("\n" + "=" * 90) safe_print("RECONCILIACION Y SINCRONIZACION DE OPORTUNIDADES: SUCURSALES -> MARCA") safe_print("=" * 90) if args.dry_run: safe_print("MODO SIMULACION (DRY-RUN) - No se realizaran escrituras en GHL\n") if not os.path.exists(DB_PATH): safe_print(f"Error: La base de datos local no existe en {DB_PATH}.") if args.run_id: script_audit.update_run_status(args.run_id, "failed", error_message="SQLite database missing") sys.exit(1) # 1. Cargar catálogo de cuentas y tokens desde Bucéfalo try: accounts = sync_engine.parse_accounts_csv() brand_acc = next((a for a in accounts if a["location_id"] == BRAND_LOCATION_ID), None) if not brand_acc: raise Exception("No se encontro la cuenta de Marca Principal en el CSV de cuentas.") brand_token = brand_acc["token"] except Exception as e: safe_print(f"Error crítico al cargar tokens: {e}") if args.run_id: script_audit.update_run_status(args.run_id, "failed", error_message=str(e)) sys.exit(1) # 2. Cargar verificador CSV (fuente autoritativa de Sucursal/TIENDA por location_id) try: verifier_map = load_verifier_map() safe_print(f" Verificador CSV cargado: {len(verifier_map)} sucursales con Sucursal/TIENDA canónicos.") except Exception as e: safe_print(f"Error crítico al cargar verificador CSV: {e}") if args.run_id: script_audit.update_run_status(args.run_id, "failed", error_message=str(e)) sys.exit(1) # 2b. Reportar sucursales activas (no demo) del CSV de mesa de control que NO están # en el verificador. Estas opps no recibirán corrección automática de Sucursal/TIENDA. branch_accounts_all = [ a for a in accounts if a["location_id"] != BRAND_LOCATION_ID and "demo" not in a["nombre"].lower() ] missing_in_verifier = [a for a in branch_accounts_all if a["location_id"] not in verifier_map] if missing_in_verifier: safe_print(f" [ADVERTENCIA] {len(missing_in_verifier)} sucursales del CSV de mesa de control NO están en el verificador:") for acc in missing_in_verifier: safe_print(f" - {acc['location_id']}: {acc['nombre']}") _record_error( "sucursal_sin_entrada_en_verificador", f"Sucursal '{acc['nombre']}' ({acc['location_id']}) no está en el verificador CSV; " f"sus opps no recibirán corrección automática de Sucursal/TIENDA.", {"location_id": acc["location_id"], "nombre": acc["nombre"]}, ) conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row try: # Inicializar estadísticas requeridas por la documentación de Monte Providencia stats = { "locations_processed": [], "pages_queried": 0, "records_received": 0, "records_unique_processed": 0, "duplicates_discarded": 0, "errors": {} } # Cargar esquemas dinámicos para la Marca Principal safe_print(" Obteniendo esquemas dinámicos de Marca Principal...") brand_contact_schema = sync_engine.ghl_client.get_object_schema(brand_token, BRAND_LOCATION_ID, "contact") brand_opp_schema = sync_engine.ghl_client.get_object_schema(brand_token, BRAND_LOCATION_ID, "opportunity") # Cargar contactos de Marca en SQLite brand_contacts = conn.execute("SELECT * FROM contacts WHERE location_id = ?", (BRAND_LOCATION_ID,)).fetchall() # Cargar pipelines de Marca de SQLite para el mapeo dinámico de etapas brand_pipelines_rows = conn.execute("SELECT * FROM pipelines WHERE location_id = ?", (BRAND_LOCATION_ID,)).fetchall() brand_pipelines = [] for r in brand_pipelines_rows: stages = [] try: stages = json.loads(r["stages_json"]) stages = sorted(stages, key=lambda x: x.get("order", 0)) except Exception: pass brand_pipelines.append({ "id": r["id"], "name": r["name"], "stages": stages }) # Cache invertido de schema de oportunidades de Marca (cf_id -> cf_name). # Antes se reconstruía 4 veces por oportunidad dentro del loop principal. brand_opp_name_by_id = {v: k for k, v in brand_opp_schema.items()} # Cargar oportunidades vivas de Marca de GHL para tener custom fields safe_print(f" Obteniendo oportunidades vivas de Marca Principal...") stats["locations_processed"].append(f"Marca Principal ({BRAND_LOCATION_ID})") try: live_brand_opps, b_pages, b_raw, b_dups, b_errs = fetch_opportunities_paginated( brand_token, BRAND_LOCATION_ID ) stats["pages_queried"] += b_pages stats["records_received"] += b_raw stats["duplicates_discarded"] += b_dups if b_errs: stats["errors"][BRAND_LOCATION_ID] = b_errs brand_opps_rows = [] for lo in live_brand_opps: norm_opp = dict(lo) norm_opp["pipeline_id"] = lo.get("pipelineId") norm_opp["pipeline_stage_id"] = lo.get("pipelineStageId") norm_opp["monetary_value"] = lo.get("monetaryValue") norm_opp["contact_id"] = lo.get("contactId") norm_opp["date_added"] = lo.get("createdAt") or lo.get("date_added") brand_opps_rows.append(norm_opp) stats["records_unique_processed"] += len(brand_opps_rows) except Exception as e: safe_print(f" [WARN] No se pudieron obtener las oportunidades vivas de la Marca GHL, usando base de datos local: {e}") _record_error("ghl_get_brand_opportunities_failed", str(e), {"location_id": BRAND_LOCATION_ID}, exc=e) stats["errors"][BRAND_LOCATION_ID] = [str(e)] sqlite_opps = conn.execute("SELECT * FROM opportunities WHERE location_id = ?", (BRAND_LOCATION_ID,)).fetchall() brand_opps_rows = [dict(row) for row in sqlite_opps] stats["records_unique_processed"] += len(brand_opps_rows) # Pre-computar custom fields por nombre para CADA brand opportunity (cache por bo id). brand_opp_cf_cache = {} for bo in brand_opps_rows: brand_opp_cf_cache[bo.get("id")] = custom_fields_by_name(bo, brand_opp_name_by_id) # Indexar contactos de la Marca Principal con prioridad estricta: # 1. Teléfono (últimos 10 dígitos), 2. Email, 3. Nombre Completo Normalizado (sin acentos ni caracteres corruptos) brand_contacts_by_phone = {} brand_contacts_by_email = {} brand_contacts_by_name = {} for bc in brand_contacts: phone_10d = get_clean_phone_10d(bc["phone"]) email = (bc["email"] or "").strip().lower() name_norm = remove_accents(f"{bc['first_name'] or ''} {bc['last_name'] or ''}") if phone_10d: brand_contacts_by_phone[phone_10d] = dict(bc) if email: brand_contacts_by_email[email] = dict(bc) if name_norm: brand_contacts_by_name[name_norm] = dict(bc) # Indexar oportunidades de la Marca por contact_id brand_opps_by_contact_id = {} for bo in brand_opps_rows: contact_key = bo.get("contact_id") or bo.get("contactId") brand_opps_by_contact_id.setdefault(contact_key, []).append(dict(bo)) used_brand_opp_ids_by_contact = {} # Determinar qué sucursales procesar if args.location: branch_accounts = [a for a in accounts if a["location_id"] == args.location and a["location_id"] != BRAND_LOCATION_ID] if not branch_accounts: safe_print(f"Error: La sucursal {args.location} no existe en el catalogo.") sys.exit(1) else: branch_accounts = [a for a in accounts if a["location_id"] != BRAND_LOCATION_ID and "demo" not in a["nombre"].lower()] safe_print(f"Iniciando reconciliacion para {len(branch_accounts)} sucursal(es)...") safe_print("-" * 90) total_contacts_created = 0 total_opps_created = 0 total_opps_updated = 0 for b_acc in branch_accounts: loc_id = b_acc["location_id"] b_name = b_acc["nombre"] branch_token = b_acc["token"] # Respetar cancelación / pausa del dashboard antes de procesar cada sucursal. if args.run_id and not script_audit.wait_if_paused_or_stopped(args.run_id): safe_print(f"[CANCELADO] Ejecución detenida por el usuario antes de procesar {b_name}.") break safe_print(f"Procesando sucursal: '{b_name}' ({loc_id})...") # Cargar esquemas dinámicos para la Sucursal (GET /objects/) safe_print(f" Obteniendo esquemas dinámicos de Sucursal '{b_name}'...") try: branch_contact_schema = sync_engine.ghl_client.get_object_schema(branch_token, loc_id, "contact") branch_opp_schema = sync_engine.ghl_client.get_object_schema(branch_token, loc_id, "opportunity") except Exception as e: _record_error("ghl_get_object_schema_failed", f"Fallo obteniendo schemas en {b_name}: {e}", {"location_id": loc_id}, exc=e) stats["errors"].setdefault(loc_id, []).append(f"Schema fetch falló: {e}") continue branch_contact_id_to_name = {v: k for k, v in branch_contact_schema.items()} branch_opp_id_to_name = {v: k for k, v in branch_opp_schema.items()} # Cargar contactos de esta sucursal en SQLite branch_contacts = conn.execute("SELECT * FROM contacts WHERE location_id = ?", (loc_id,)).fetchall() branch_contacts_by_id = { bc["id"]: dict(bc) for bc in branch_contacts } # Cargar oportunidades vivas de GHL para tener customFields safe_print(f" Obteniendo oportunidades vivas de GHL...") stats["locations_processed"].append(f"{b_name} ({loc_id})") try: live_branch_opps, b_pages, b_raw, b_dups, b_errs = fetch_opportunities_paginated( branch_token, loc_id ) stats["pages_queried"] += b_pages stats["records_received"] += b_raw stats["duplicates_discarded"] += b_dups if b_errs: stats["errors"][loc_id] = b_errs branch_opps = [] for lo in live_branch_opps: norm_opp = dict(lo) norm_opp["pipeline_id"] = lo.get("pipelineId") norm_opp["pipeline_stage_id"] = lo.get("pipelineStageId") norm_opp["monetary_value"] = lo.get("monetaryValue") norm_opp["contact_id"] = lo.get("contactId") norm_opp["date_added"] = lo.get("createdAt") or lo.get("date_added") branch_opps.append(norm_opp) stats["records_unique_processed"] += len(branch_opps) except Exception as e: safe_print(f" [WARN] No se pudieron obtener las oportunidades vivas de GHL, usando base de datos local: {e}") _record_error("ghl_get_branch_opportunities_failed", str(e), {"location_id": loc_id}, exc=e) stats["errors"][loc_id] = [str(e)] sqlite_opps = conn.execute("SELECT * FROM opportunities WHERE location_id = ?", (loc_id,)).fetchall() branch_opps = [dict(row) for row in sqlite_opps] stats["records_unique_processed"] += len(branch_opps) if not branch_opps: safe_print(" (No hay oportunidades para esta sucursal)") continue safe_print(f" Analizando {len(branch_opps)} oportunidades de sucursal...") branch_opp_count_by_contact_id = {} for branch_opp in branch_opps: branch_opp_count_by_contact_id[branch_opp.get("contact_id")] = branch_opp_count_by_contact_id.get(branch_opp.get("contact_id"), 0) + 1 for so in branch_opps: contact_id = so["contact_id"] sc = branch_contacts_by_id.get(contact_id) if not sc: continue # Oportunidad huérfana de contacto en la sucursal misma client_name = f"{sc['first_name'] or ''} {sc['last_name'] or ''}".strip() or "Sin nombre" client_phone = sc["phone"] or "" client_email = sc["email"] or "" # Extraer campos personalizados del contacto de la sucursal por nombre # (necesario antes de la creación para incluirlos en el POST) sc_custom_fields_by_name = {} if sc and sc.get("custom_fields_json"): try: cf_list = json.loads(sc["custom_fields_json"]) for cf in cf_list: cf_id = cf.get("id") cf_val = cf.get("value") cf_name = branch_contact_id_to_name.get(cf_id) if cf_name and cf_val is not None: sc_custom_fields_by_name[cf_name] = cf_val except Exception: pass # 1. Encontrar o crear el contacto en Marca Principal siguiendo la prioridad estricta: # Prioridad 1: Teléfono (últimos 10 dígitos) # Prioridad 2: Email # Prioridad 3: Nombre Completo Normalizado (sin acentos ni caracteres corruptos) norm_p_10d = get_clean_phone_10d(client_phone) norm_client_name = remove_accents(client_name) bc = None match_method = "" if norm_p_10d and norm_p_10d in brand_contacts_by_phone: bc = brand_contacts_by_phone[norm_p_10d] match_method = "TELEFONO" elif client_email and client_email.lower() in brand_contacts_by_email: bc = brand_contacts_by_email[client_email.lower()] match_method = "EMAIL" elif norm_client_name and norm_client_name in brand_contacts_by_name: bc = brand_contacts_by_name[norm_client_name] match_method = "NOMBRE" bc_id = None if not bc: # El contacto no existe en Marca, hay que crearlo! (si está permitido) if not allow_contacts: safe_print(f" [SALTADO] Se omitió la creación de contacto central para: '{client_name}' en Marca Principal (Acción no permitida).") continue if args.dry_run: safe_print(f" [SIMULACION] Crearia contacto central para: '{client_name}' en Marca Principal (No se encontro por Telefono, Email ni Nombre).") bc_id = "SIMULATED_BRAND_CONTACT_ID" total_contacts_created += 1 else: safe_print(f" Creando contacto central para: '{client_name}' en Marca Principal...") # Mapear custom fields del contacto de sucursal a los IDs del schema de Marca contact_custom_fields = [] for f_name, f_val in sc_custom_fields_by_name.items(): brand_cf_id = brand_contact_schema.get(f_name) if brand_cf_id and f_val is not None and str(f_val).strip(): contact_custom_fields.append({"id": brand_cf_id, "value": f_val}) contact_data = { "locationId": BRAND_LOCATION_ID, "firstName": sc["first_name"], "lastName": sc["last_name"], "email": sc["email"], "phone": sc["phone"] } if contact_custom_fields: contact_data["customFields"] = contact_custom_fields try: res = sync_engine.ghl_client.create_contact(brand_token, contact_data) bc_id = res.get("contact", {}).get("id") if not bc_id: raise Exception("Respuesta de GHL sin id de contacto") total_contacts_created += 1 if args.run_id: cid = script_audit.record_change( args.run_id, BRAND_LOCATION_ID, "contact", bc_id, "", "created", None, {"phone": sc["phone"], "email": sc["email"], "name": client_name}, ) if cid: script_audit.mark_change(cid, "applied") # Agregar temporalmente a nuestro indice en memoria new_contact = {"id": bc_id, "first_name": sc["first_name"], "last_name": sc["last_name"], "email": sc["email"], "phone": sc["phone"]} if norm_p_10d: brand_contacts_by_phone[norm_p_10d] = new_contact if client_email: brand_contacts_by_email[client_email.lower()] = new_contact if norm_client_name: brand_contacts_by_name[norm_client_name] = new_contact except Exception as err: err_entry = _record_error( "create_brand_contact_failed", f"[{b_name}] Fallo crear contacto '{client_name}' en Marca: {err}", {"location_id": loc_id, "branch_contact_id": contact_id, "client_name": client_name}, exc=err, ) safe_print(f" * Error creando contacto '{client_name}' en Marca: {err} (error_id={err_entry.get('error_id')})") continue else: bc_id = bc["id"] if not bc_id: continue # --- MAPEO DINÁMICO DE CAMPOS PERSONALIZADOS --- # Extraer campos personalizados de la oportunidad de la sucursal por nombre so_custom_fields_by_name = custom_fields_by_name(so, branch_opp_id_to_name) # sc_custom_fields_by_name ya fue extraído antes de la creación del contacto branch_match_values = [ b_name, loc_id, so_custom_fields_by_name.get("Sucursal"), so_custom_fields_by_name.get("TIENDA") or so_custom_fields_by_name.get("Tienda"), sc_custom_fields_by_name.get("Sucursal"), sc_custom_fields_by_name.get("TIENDA") or sc_custom_fields_by_name.get("Tienda"), ] # 2. Buscar si la oportunidad ya existe en la Marca Principal bajo este contacto brand_opps_for_contact = brand_opps_by_contact_id.get(bc_id, []) used_brand_opp_ids = used_brand_opp_ids_by_contact.setdefault(bc_id, set()) available_brand_opps_for_contact = [ bo for bo in brand_opps_for_contact if bo.get("id") not in used_brand_opp_ids ] def _bo_cf(bo): bo_id = bo.get("id") cached = brand_opp_cf_cache.get(bo_id) if cached is None: cached = custom_fields_by_name(bo, brand_opp_name_by_id) brand_opp_cf_cache[bo_id] = cached return cached matched_bo = None # Estrategia de conciliación robusta multicapa: # Capa 1. Mismo ID de oportunidad en GHL for bo in available_brand_opps_for_contact: if bo.get("id") == so.get("id"): matched_bo = bo break # Capa 2. Coincidencia por Nombre Exacto de oportunidad, desempatada por sucursal/fecha/valor if not matched_bo: exact_name_matches = [ bo for bo in available_brand_opps_for_contact if normalize_match_text(bo.get("name")) and normalize_match_text(bo.get("name")) == normalize_match_text(so.get("name")) ] if exact_name_matches: matched_bo = max( exact_name_matches, key=lambda bo: calculate_opportunity_similarity(so, bo, _bo_cf(bo), branch_match_values) ) # Capa 2.5. Única oportunidad, solo si también hay una sola oportunidad en la sucursal para ese contacto. if ( not matched_bo and len(available_brand_opps_for_contact) == 1 and branch_opp_count_by_contact_id.get(contact_id, 0) == 1 ): candidate_bo = available_brand_opps_for_contact[0] candidate_score = calculate_opportunity_similarity( so, candidate_bo, _bo_cf(candidate_bo), branch_match_values ) if candidate_score >= 25: matched_bo = candidate_bo # Capa 3. Proximidad de creación (rango temporal de 1 hora) if not matched_bo: for bo in available_brand_opps_for_contact: if are_dates_within_one_hour(so["date_added"], bo["date_added"]): matched_bo = bo break # Capa 4. Emparejamiento Heurístico / Similitud (Similitud >= 55%) if not matched_bo: best_fuzzy_bo = None best_fuzzy_score = 0 for bo in available_brand_opps_for_contact: score = calculate_opportunity_similarity(so, bo, _bo_cf(bo), branch_match_values) if score > best_fuzzy_score: best_fuzzy_score = score best_fuzzy_bo = bo if best_fuzzy_bo and best_fuzzy_score >= 55: matched_bo = best_fuzzy_bo # --- MAPEO DINÁMICO DE PIPELINE Y STAGE DE SUCURSAL A MARCA --- mapped_brand_pid, mapped_brand_sid, stage_warning = map_stage_branch_to_brand( conn, loc_id, so["pipeline_id"], so["pipeline_stage_id"], brand_pipelines ) if not mapped_brand_pid: mapped_brand_pid = so["pipeline_id"] if not mapped_brand_sid: mapped_brand_sid = so["pipeline_stage_id"] if stage_warning: _record_error( "stage_mapping_warning", f"[{b_name}] opp '{so.get('name')}' ({so.get('id')}): {stage_warning}", {"location_id": loc_id, "opportunity_id": so.get("id"), "branch_stage_id": so.get("pipeline_stage_id")}, ) # Mapear los campos de la oportunidad para la Marca Principal con reglas y fallbacks de Monte Providencia brand_opp_custom_values = {} # CAPA BASE - mapeo dinamico de TODOS los custom fields de la opp de # sucursal cuyo nombre NO tenga logica dedicada abajo. Garantiza que # campos sin tratamiento especial (incluido "ID Oportunidad Sucursal" # y cualquier campo nuevo) se propaguen sucursal -> Marca, en vez de # quedarse solo con los 9 campos historicos. Los campos especiales se # excluyen aqui y los calcula/valida la logica que sigue (override). for _src_fname, _src_fval in so_custom_fields_by_name.items(): if _norm_field_name(_src_fname) in _SPECIAL_OVERRIDE_FIELDS_NORM: continue if _src_fval is not None and str(_src_fval).strip() != "": brand_opp_custom_values[_src_fname] = _src_fval # 1. Copiar Canal de Origen con fallbacks (p. ej. si está vacío, heredar del contacto) canal = so_custom_fields_by_name.get("CANAL DE ORIGEN") or so_custom_fields_by_name.get("Canal de Origen") if not canal: canal = sc_custom_fields_by_name.get("CANAL DE ORIGEN") or sc_custom_fields_by_name.get("Canal de Origen") if not canal: contact_tags = [t.strip().lower() for t in (sc.get("tags") or "").split(",") if t.strip()] if "sucursal" in contact_tags: canal = "SUCURSAL" elif sc.get("source"): sc_source = str(sc.get("source") or "").upper() if any(kw in sc_source for kw in ["FACEBOOK", "FB", "INSTAGRAM", "IG", "WHATSAPP", "WA", "FORM"]): canal = sc_source else: canal = "SUCURSAL" else: canal = "SUCURSAL" if canal: brand_opp_custom_values["CANAL DE ORIGEN"] = canal brand_opp_custom_values["Canal de Origen"] = canal # 2. Copiar Fuente de Prospecto con validación y fallbacks fuente = so_custom_fields_by_name.get("Fuente de Prospecto") if not fuente: fuente = sc_custom_fields_by_name.get("Fuente de Prospecto") # Regla de Validación: SUCURSAL no puede tener fuente LEAD DIGITAL if canal == "SUCURSAL" and fuente == "LEAD DIGITAL": fuente = "SUCURSAL" elif not fuente: if canal in ["FORMULARIO", "FACEBOOK", "WHATSAPP", "INSTAGRAM", "LEAD DIGITAL"]: fuente = "LEAD DIGITAL" else: fuente = "SUCURSAL" if fuente: brand_opp_custom_values["Fuente de Prospecto"] = fuente # 3 + 4. Sucursal y TIENDA desde el VERIFICADOR CSV (fuente autoritativa). # Doc MP: "Sucursal/TIENDA debe coincidir con verificador de sucursales". # La opp pertenece a la sucursal `loc_id`, así que se toma el valor del # verificador para esa location, no lo que digan los campos de origen. verifier_entry = verifier_map.get(loc_id) if verifier_entry: # Detectar caso "cross-sucursal": la opp tiene un Sucursal o TIENDA # poblado que NO coincide con la sucursal que la aloja. Puede ser # cliente transferido entre sucursales o dato sucio — vale la pena # marcarlo para revisión manual. existing_sucursal = (so_custom_fields_by_name.get("Sucursal") or "").strip() existing_tienda = ( so_custom_fields_by_name.get("TIENDA") or so_custom_fields_by_name.get("Tienda") or "" ).strip() target_sucursal = verifier_entry["sucursal"] target_tienda = verifier_entry["tienda"] sucursal_mismatch = ( existing_sucursal and existing_sucursal != target_sucursal and existing_sucursal != target_tienda ) tienda_mismatch = ( existing_tienda and existing_tienda != target_tienda and existing_tienda != target_sucursal ) if sucursal_mismatch or tienda_mismatch: _record_error( "cross_sucursal", f"[{b_name}] opp '{so.get('name')}' tiene Sucursal='{existing_sucursal}' " f"/ TIENDA='{existing_tienda}' pero está alojada en location {loc_id} " f"(verificador → '{target_sucursal}' / '{target_tienda}'). Se sobrescribe.", { "location_id": loc_id, "opportunity_id": so.get("id"), "existing_sucursal": existing_sucursal, "existing_tienda": existing_tienda, "target_sucursal": target_sucursal, "target_tienda": target_tienda, }, ) brand_opp_custom_values["Sucursal"] = target_sucursal brand_opp_custom_values["TIENDA"] = target_tienda brand_opp_custom_values["Tienda"] = target_tienda else: _record_error( "location_sin_verificador", f"[{b_name}] location_id '{loc_id}' no está en el verificador CSV; no se escribe Sucursal/TIENDA.", {"location_id": loc_id, "opportunity_id": so.get("id")}, ) # 5. Calcular y mapear el Vehículo (según Reglas de Monte Providencia) # Reglas: # - LEAD DIGITAL: construir desde Marca/Versión/Año del contacto, pero SOLO si # hay al menos marca o versión (solo año es información inútil que # sobrescribiría datos válidos en Marca). # - Otros canales: respetar el valor de la opp de sucursal; si no existe, # intentar el constructed_vehicle siempre que sea significativo. marca_veh = sc_custom_fields_by_name.get("Marca del Vehículo", "").strip() vers_veh = sc_custom_fields_by_name.get("Versión del Vehículo", "").strip() ano_veh = sc_custom_fields_by_name.get("Año del Vehículo", "").strip() vehicle_meaningful = has_meaningful_vehicle_info(marca_veh, vers_veh, ano_veh) constructed_vehicle = " ".join(filter(None, [marca_veh, vers_veh, ano_veh])).strip() if fuente == "LEAD DIGITAL" and constructed_vehicle and vehicle_meaningful: brand_opp_custom_values["Vehículo"] = constructed_vehicle else: vehiculo_val = so_custom_fields_by_name.get("Vehículo") if not vehiculo_val and vehicle_meaningful: vehiculo_val = constructed_vehicle if vehiculo_val: brand_opp_custom_values["Vehículo"] = vehiculo_val # 6. Modalidad de Empeño (doc MP: Sin Dejarlo (GPS) / Tradicional (Resguardo)) modalidad_raw = ( so_custom_fields_by_name.get("Modalidad de Empeño") or so_custom_fields_by_name.get("MODALIDAD DE EMPEÑO") or sc_custom_fields_by_name.get("Modalidad de Empeño") or sc_custom_fields_by_name.get("MODALIDAD DE EMPEÑO") or sc_custom_fields_by_name.get("¿Qué modalidad prefieres?") ) modalidad_val = normalize_modalidad(modalidad_raw) if modalidad_val: brand_opp_custom_values["Modalidad de Empeño"] = modalidad_val # 7. Persona que atendió al prospecto (copia directa desde sucursal) persona_val = ( so_custom_fields_by_name.get("Persona que atendió al prospecto") or so_custom_fields_by_name.get("Persona que atendio al prospecto") ) if persona_val: brand_opp_custom_values["Persona que atendió al prospecto"] = persona_val # 8. Visita a sucursal (checkbox) visita_val = ( so_custom_fields_by_name.get("Visita a sucursal") or so_custom_fields_by_name.get("Visita a Sucursal") ) if visita_val is not None and str(visita_val) != "": brand_opp_custom_values["Visita a sucursal"] = visita_val # 9. Fecha de última visita a sucursal fecha_visita_val = ( so_custom_fields_by_name.get("Fecha de última visita a sucursal") or so_custom_fields_by_name.get("Fecha de ultima visita a sucursal") ) if fecha_visita_val: brand_opp_custom_values["Fecha de última visita a sucursal"] = fecha_visita_val # Convertir los valores al formato de GHL [ {"id": "brand_f_id", "value": "value"} ] brand_custom_fields_list = [] for f_name, f_val in brand_opp_custom_values.items(): brand_f_id = brand_opp_schema.get(f_name) if brand_f_id: brand_custom_fields_list.append({ "id": brand_f_id, "value": f_val }) # 3. Acción de sincronización / actualización (PUT o POST) if matched_bo: used_brand_opp_ids.add(matched_bo.get("id")) # Validar si tiene discrepancias para actualizar (PUT) has_discrepancies = False changes = [] if (so["name"] or "").strip().lower() != (matched_bo.get("name") or "").strip().lower(): has_discrepancies = True changes.append(f"Nombre ('{matched_bo.get('name')}' -> '{so['name']}')") if so["status"] != matched_bo.get("status"): has_discrepancies = True status_changed = True changes.append(f"Estado ({matched_bo.get('status')} -> {so['status']})") else: status_changed = False if abs((so["monetary_value"] or 0) - (matched_bo.get("monetaryValue") or matched_bo.get("monetary_value") or 0)) >= 0.01: has_discrepancies = True changes.append(f"Valor (${matched_bo.get('monetaryValue') or matched_bo.get('monetary_value') or 0:.2f} -> ${so['monetary_value']:.2f})") if mapped_brand_sid != (matched_bo.get("pipelineStageId") or matched_bo.get("pipeline_stage_id")): has_discrepancies = True changes.append(f"Etapa ({matched_bo.get('pipelineStageId') or matched_bo.get('pipeline_stage_id')} -> {mapped_brand_sid})") # Comparar campos personalizados de la Marca con los calculados (usa cache) matched_bo_custom_fields_by_name = dict(_bo_cf(matched_bo)) for f_name, expected_val in brand_opp_custom_values.items(): current_val = matched_bo_custom_fields_by_name.get(f_name, "") norm_curr = str(current_val).strip().lower() norm_exp = str(expected_val).strip().lower() if norm_curr != norm_exp: has_discrepancies = True changes.append(f"Campo '{f_name}' ('{current_val}' -> '{expected_val}')") if has_discrepancies: if not allow_updates: safe_print(f" [SALTADO] Se omitió la actualización de la oportunidad '{matched_bo.get('name')}' (ID: {matched_bo.get('id')}) en Marca (Acción no permitida).") continue # Registrar cambio planeado en audit log (un row por campo cambiado) change_ids = [] if args.run_id: for f_name, expected_val in brand_opp_custom_values.items(): old_val = matched_bo_custom_fields_by_name.get(f_name) if str(old_val or "").strip().lower() != str(expected_val or "").strip().lower(): cf_id = brand_opp_schema.get(f_name) cid = script_audit.record_change( args.run_id, BRAND_LOCATION_ID, "opportunity", matched_bo.get("id"), cf_id or "", f_name, old_val, expected_val, ) if cid: change_ids.append((cid, f_name)) if args.dry_run: safe_print(f" [SIMULACION] Actualizaria oportunidad '{matched_bo.get('name')}' (ID: {matched_bo.get('id')}) en Marca. Cambios: {', '.join(changes)}") for cid, _f in change_ids: script_audit.mark_change(cid, "planned") total_opps_updated += 1 else: safe_print(f" Actualizando oportunidad '{matched_bo.get('name')}' (ID: {matched_bo.get('id')}) en Marca para alinear con Sucursal...") opp_data = { "name": so["name"], "monetaryValue": so["monetary_value"] or 0.0, "pipelineId": mapped_brand_pid, "pipelineStageId": mapped_brand_sid, "customFields": brand_custom_fields_list } try: apply_brand_opportunity_update( brand_token, matched_bo.get("id"), opp_data, (so["status"] or "open") if status_changed else None, ) safe_print(f" [OK] Actualizada correctamente.") total_opps_updated += 1 for cid, _f in change_ids: script_audit.mark_change(cid, "applied") matched_bo.update({ "name": so["name"], "status": so["status"] or "open", "monetaryValue": so["monetary_value"] or 0.0, "monetary_value": so["monetary_value"] or 0.0, "pipelineId": mapped_brand_pid, "pipeline_id": mapped_brand_pid, "pipelineStageId": mapped_brand_sid, "pipeline_stage_id": mapped_brand_sid, }) # Invalidar cache de cf para re-leer en próximas iteraciones brand_opp_cf_cache[matched_bo.get("id")] = dict(brand_opp_custom_values) except Exception as err: err_entry = _record_error( "update_opportunity_failed", f"[{b_name}] Fallo actualizar opp '{matched_bo.get('name')}' (id={matched_bo.get('id')}): {err}", {"location_id": loc_id, "brand_opportunity_id": matched_bo.get("id")}, exc=err, ) safe_print(f" [ERROR] Fallo actualizar en GHL: {err} (error_id={err_entry.get('error_id')})") for cid, _f in change_ids: script_audit.mark_change(cid, "failed", str(err)) else: # No existe en Marca, crearla! (POST) (si está permitido) if not allow_creations: safe_print(f" [SALTADO] Se omitió la creación de la oportunidad nueva '{so['name']}' en Marca Principal (Acción no permitida).") continue if args.dry_run: safe_print(f" [SIMULACION] Crearia oportunidad nueva '{so['name']}' en Marca Principal bajo el contacto ID '{bc_id}'.") if brand_custom_fields_list: cf_details = [f"{f_name}: '{f['value']}'" for f_name, f_val in brand_opp_custom_values.items() for f in brand_custom_fields_list if f['id'] == brand_opp_schema.get(f_name)] safe_print(f" [SIMULACION] Con Campos Personalizados: {', '.join(cf_details)}") total_opps_created += 1 else: safe_print(f" Creando oportunidad '{so['name']}' en Marca Principal...") opp_data = { "locationId": BRAND_LOCATION_ID, "name": so["name"], "status": so["status"] or "open", "monetaryValue": so["monetary_value"] or 0.0, "pipelineId": mapped_brand_pid, "pipelineStageId": mapped_brand_sid, "contactId": bc_id, "customFields": brand_custom_fields_list } try: res = sync_engine.ghl_client.create_opportunity(brand_token, opp_data) new_opp_id = res.get("opportunity", {}).get("id") target_status = so["status"] or "open" if new_opp_id and target_status != "open": sync_engine.ghl_client.update_opportunity_status(brand_token, new_opp_id, target_status) safe_print(f" [OK] Creada con éxito (Nuevo ID en Marca GHL: {new_opp_id}).") total_opps_created += 1 if new_opp_id and args.run_id: cid = script_audit.record_change( args.run_id, BRAND_LOCATION_ID, "opportunity", new_opp_id, "", "created", None, {"name": so["name"], "contact_id": bc_id, "source_branch": loc_id}, ) if cid: script_audit.mark_change(cid, "applied") if new_opp_id: new_bo = { "id": new_opp_id, "name": so["name"], "status": target_status, "monetaryValue": so["monetary_value"] or 0.0, "monetary_value": so["monetary_value"] or 0.0, "pipelineId": mapped_brand_pid, "pipeline_id": mapped_brand_pid, "pipelineStageId": mapped_brand_sid, "pipeline_stage_id": mapped_brand_sid, "contactId": bc_id, "contact_id": bc_id, "date_added": so.get("date_added"), } brand_opps_by_contact_id.setdefault(bc_id, []).append(new_bo) brand_opp_cf_cache[new_opp_id] = dict(brand_opp_custom_values) except Exception as err: if is_duplicate_opportunity_error(err): safe_print(" [SALTADO] GHL rechazó la creación porque el contacto ya tiene una oportunidad en Marca.") _record_error( "create_opportunity_duplicate", f"[{b_name}] Duplicado al crear opp '{so.get('name')}' (contact {bc_id})", {"location_id": loc_id, "contact_id": bc_id, "opportunity_name": so.get("name")}, ) else: err_entry = _record_error( "create_opportunity_failed", f"[{b_name}] Fallo crear opp '{so.get('name')}': {err}", {"location_id": loc_id, "contact_id": bc_id, "opportunity_name": so.get("name")}, exc=err, ) safe_print(f" [ERROR] Fallo crear en GHL: {err} (error_id={err_entry.get('error_id')})") safe_print("-" * 90) # Imprimir resumen de la ejecucion safe_print("\n" + "=" * 90) safe_print("=== RESUMEN DE RECONCILIACION Y SINCRONIZACION SUCURSAL -> MARCA ===") safe_print("=" * 90) if args.dry_run: safe_print(f" [SIMULACION] Contactos Nuevos a Crear en Marca: {total_contacts_created}") safe_print(f" [SIMULACION] Oportunidades Nuevas a Replicar: {total_opps_created}") safe_print(f" [SIMULACION] Oportunidades a Actualizar (PUT): {total_opps_updated}") else: safe_print(f" Contactos Nuevos Creados en Marca: {total_contacts_created}") safe_print(f" Oportunidades Nuevas Replicadas: {total_opps_created}") safe_print(f" Oportunidades Actualizadas (PUT): {total_opps_updated}") safe_print("-" * 90) safe_print("--- MÉTRICAS DE ESCANEO Y AUDITORÍA DE GHL ---") safe_print(f" Locations procesadas: {len(stats['locations_processed'])}") for loc in stats['locations_processed']: safe_print(f" - {loc}") safe_print(f" Páginas consultadas (GHL): {stats['pages_queried']}") safe_print(f" Registros recibidos desde GHL: {stats['records_received']}") safe_print(f" Registros únicos procesados: {stats['records_unique_processed']}") safe_print(f" Duplicados descartados: {stats['duplicates_discarded']}") if stats['errors']: safe_print(f" Errores por location o endpoint:") for loc_err, err_list in stats['errors'].items(): safe_print(f" - Location {loc_err}:") for err in err_list: safe_print(f" * {err}") else: safe_print(f" Errores por location o endpoint: Ninguno") safe_print("=" * 90) # --- RESUMEN DE ERRORES ACUMULADOS DURANTE LA EJECUCIÓN --- safe_print("\n" + "=" * 90) safe_print("=== ERRORES Y ADVERTENCIAS ACUMULADOS DURANTE LA EJECUCIÓN ===") safe_print("=" * 90) if not EXECUTION_ERRORS: safe_print(" (Sin errores ni advertencias)") else: by_cat = {} for e in EXECUTION_ERRORS: by_cat.setdefault(e["category"], []).append(e) safe_print(f" Total: {len(EXECUTION_ERRORS)} entradas en {len(by_cat)} categorías.\n") for cat, items in sorted(by_cat.items(), key=lambda kv: -len(kv[1])): safe_print(f" [{cat}] {len(items)} entrada(s):") for it in items[:50]: err_id_str = f" (error_id={it['error_id']})" if it.get("error_id") else "" safe_print(f" - {it['message']}{err_id_str}") if len(items) > 50: safe_print(f" ... y {len(items) - 50} más (truncadas).") safe_print("") safe_print("=" * 90 + "\n") # Registrar éxito en la auditoría si hay run_id if args.run_id: script_audit.update_run_status(args.run_id, "success") except Exception as e: safe_print(f"Error critico en la ejecucion del script de reconciliacion: {e}") if args.run_id: script_audit.update_run_status(args.run_id, "failed", error_message=str(e)) sys.exit(1) finally: conn.close() if __name__ == "__main__": main()