Files
MP-Manager/scripts/reconcile_and_sync_opportunities.py
2026-05-30 14:31:19 -06:00

1327 lines
66 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Reconcile and sync opportunities from Branches to Brand with 1-hour margin of past Latency and Fuzzy Matching."""
import argparse
import csv
import os
import sys
import sqlite3
import json
import re
import time
import unicodedata
from datetime import datetime
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
import sync_engine
import db
import error_logging
import script_audit
from paths import DB_PATH
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
# Verificador CSV: fuente AUTORITATIVA de Sucursal y TIENDA por location_id.
# Misma fuente que usa scripts/fill_sucursal_tienda_from_location.py.
# Formato:
# Sucursal = "Atlacomulco, Estado de México" (texto completo)
# TIENDA = "ATLACOMULCO" (código corto)
VERIFIER_FILENAME = "Monte Providencia - Verificador de sucursales y correos - Sucursales.csv"
# Valores válidos del campo Modalidad de Empeño (doc MP).
MODALIDAD_EMPENO_VALUES = ["Sin Dejarlo (GPS)", "Tradicional (Resguardo)"]
# Acumulador global de errores de ejecución para imprimir al final del run.
EXECUTION_ERRORS = []
# Campos de oportunidad con LOGICA DE NEGOCIO DEDICADA mas abajo (fallbacks,
# verificador de Sucursal/TIENDA, construccion de Vehiculo, normalizacion de
# Modalidad, etc.). Se excluyen de la copia dinamica generica para no pisar esa
# logica; los demas custom fields (incluido "ID Oportunidad Sucursal" y cualquier
# campo nuevo) se copian tal cual sucursal -> Marca.
_SPECIAL_OVERRIDE_FIELDS_NORM = {
"canal de origen", "fuente de prospecto", "sucursal", "tienda", "vehiculo",
"modalidad de empeno", "persona que atendio al prospecto",
"visita a sucursal", "fecha de ultima visita a sucursal",
}
def _norm_field_name(name):
"""Normaliza nombre de campo: sin acentos, minusculas, espacios colapsados."""
s = unicodedata.normalize("NFKD", str(name or ""))
s = "".join(c for c in s if not unicodedata.combining(c))
return " ".join(s.lower().split())
def _record_error(category, message, context=None, exc=None):
"""Acumula errores para el resumen final + persiste en error_log si hay excepción."""
entry = {
"category": category,
"message": message,
"context": context or {},
"error_id": None,
}
if exc is not None:
try:
entry["error_id"] = error_logging.log_error(category, exc, context or {})
except Exception:
entry["error_id"] = None
EXECUTION_ERRORS.append(entry)
return entry
def load_verifier_map():
"""
Carga el verificador CSV como fuente AUTORITATIVA de Sucursal y TIENDA.
Retorna {location_id: {"sucursal": str, "tienda": str, "sc_name": str}}.
Ignora la cuenta principal (SUCURSAL == "-") y filas sin TIENDA válido.
"""
path = os.path.join(ROOT_DIR, VERIFIER_FILENAME)
if not os.path.exists(path):
raise FileNotFoundError(f"Verificador no encontrado: {path}")
result = {}
with open(path, mode="r", encoding="utf-8-sig", newline="") as fh:
for row in csv.DictReader(fh):
loc_id = (row.get("ID LOCATION BUCEFALO") or "").strip()
sucursal = (row.get("SUCURSAL") or "").strip()
tienda = (row.get("TIENDA") or "").strip()
sc_name = (row.get("SC BUCEFALO") or "").strip()
if not loc_id or sucursal == "-" or tienda in ("-", "CUENTA PRINCIPAL", ""):
continue
if loc_id not in result:
result[loc_id] = {"sucursal": sucursal, "tienda": tienda, "sc_name": sc_name}
return result
def normalize_modalidad(raw_value):
"""Devuelve el valor canónico de Modalidad de Empeño o el original si no se reconoce."""
if not raw_value:
return None
text = remove_accents(str(raw_value)).lower()
if "gps" in text or "sin dejar" in text:
return "Sin Dejarlo (GPS)"
if "resguardo" in text or "tradicional" in text or "dejar auto" in text:
return "Tradicional (Resguardo)"
return str(raw_value)
def has_meaningful_vehicle_info(marca, version, ano):
"""Vehículo válido = al menos marca o versión. Solo año no es información útil."""
has_text = bool((marca or "").strip()) or bool((version or "").strip())
return has_text
def safe_print(*args, **kwargs):
sep = kwargs.get("sep", " ")
end = kwargs.get("end", "\n")
text = sep.join(str(arg) for arg in args)
encoding = sys.stdout.encoding or "utf-8"
try:
sys.stdout.write(text + end)
sys.stdout.flush()
except UnicodeEncodeError:
try:
safe_text = text.encode(encoding, errors="replace").decode(encoding)
sys.stdout.write(safe_text + end)
sys.stdout.flush()
except Exception:
try:
safe_text = text.encode("ascii", errors="replace").decode("ascii")
sys.stdout.write(safe_text + end)
sys.stdout.flush()
except Exception:
pass
def normalize_phone(phone):
if not phone:
return ""
return re.sub(r"\D+", "", str(phone))
def get_clean_phone_10d(phone):
norm = re.sub(r"\D+", "", str(phone or ""))
if len(norm) >= 10:
return norm[-10:]
return norm
def remove_accents(text):
if not text:
return ""
accents_map = {
'á': 'a', 'é': 'e', 'í': 'i', 'ó': 'o', 'ú': 'u',
'ü': 'u', 'ñ': 'n',
'Á': 'a', 'É': 'e', 'Í': 'i', 'Ó': 'o', 'Ú': 'u',
'Ü': 'u', 'Ñ': 'n'
}
clean_chars = []
for char in text:
clean_chars.append(accents_map.get(char, char.lower()))
cleaned = "".join(clean_chars)
cleaned = re.sub(r"[^\w\s]", "", cleaned) # quitar puntuación y caracteres corruptos de codificación
return " ".join(cleaned.split())
def extract_tienda_label(account_nombre):
if not account_nombre:
return None
parts = str(account_nombre).split(" - ")
if len(parts) < 3:
return None
raw = " - ".join(parts[2:]).strip()
return raw.upper() if raw else None
def fetch_opportunities_paginated(token, location_id):
"""
Obtiene todas las oportunidades con paginación completa, acumulando y
deduplicando por ID en cumplimiento de la guía de Monte Providencia.
Sin tope de cantidad: solo corta la paginación nativa de GHL
(batch vacío repetido, batch menor al limit o total alcanzado).
Retorna (unique_opps, pages_queried, raw_received, duplicates_discarded, errors).
"""
opportunities = []
seen_ids = set()
pages_queried = 0
raw_received = 0
duplicates_discarded = 0
errors = []
page = 1
limit = 100
empty_pages = 0
while True:
pages_queried += 1
try:
data = sync_engine.ghl_client.search_opportunities(token, location_id, limit=limit, page=page)
except Exception as e:
err_msg = str(e)
errors.append(f"Fallo en pagina {page}: {err_msg}")
_record_error(
"ghl_get_all_opportunities_page_failed",
f"Fallo paginación opps en {location_id} pagina {page}: {err_msg}",
{"location_id": location_id, "page": page, "accumulated_opps": len(opportunities)},
exc=e,
)
break
batch = data.get("opportunities", []) or []
if not batch:
empty_pages += 1
if empty_pages >= 2:
break
page += 1
continue
empty_pages = 0
raw_received += len(batch)
for opp in batch:
opp_id = opp.get("id")
if not opp_id:
continue
if opp_id in seen_ids:
duplicates_discarded += 1
continue
seen_ids.add(opp_id)
opportunities.append(opp)
total_reported = data.get("total", 0) or 0
if len(batch) < limit or (total_reported > 0 and len(opportunities) >= total_reported):
break
page += 1
return opportunities, pages_queried, raw_received, duplicates_discarded, errors
def normalize_match_text(text):
return remove_accents(str(text or "")).strip().lower()
def custom_fields_by_name(obj, id_to_name):
values = {}
for cf in obj.get("customFields", []) or []:
cf_id = cf.get("id")
cf_val = cf.get("fieldValueString") or cf.get("value")
cf_name = id_to_name.get(cf_id)
if cf_name and cf_val is not None:
values[cf_name] = cf_val
return values
def branch_affinity_score(brand_custom_fields_by_name, branch_values):
expected_values = [normalize_match_text(v) for v in branch_values if normalize_match_text(v)]
if not expected_values:
return 0
brand_values = []
for field_name in ("Sucursal", "TIENDA", "Tienda"):
value = brand_custom_fields_by_name.get(field_name)
if value:
brand_values.append(normalize_match_text(value))
if not brand_values:
return 0
for brand_value in brand_values:
for expected in expected_values:
if brand_value == expected or brand_value in expected or expected in brand_value:
return 30
return -25
def parse_date_string(date_str):
if not date_str:
return None
clean_str = date_str.replace("T", " ").replace("Z", "")
if "." in clean_str:
clean_str = clean_str.split(".")[0]
# Cortar timezone offsets si existen (+00:00, -06:00, etc.)
clean_str = re.split(r"[\+\-]\d{2}", clean_str)[0].strip()
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%d/%m/%Y %H:%M:%S"):
try:
return datetime.strptime(clean_str, fmt)
except Exception:
pass
return None
def are_dates_within_one_hour(date_str1, date_str2):
d1 = parse_date_string(date_str1)
d2 = parse_date_string(date_str2)
if not d1 or not d2:
return False
diff = abs((d1 - d2).total_seconds())
return diff <= 3600 # 3600 seconds = 1 hour
def calculate_opportunity_similarity(so, bo, brand_custom_fields_by_name=None, branch_values=None):
"""
Calcula un puntaje de similitud (0 a 100) entre la oportunidad de sucursal (so)
y la de marca (bo) para encontrar el par mas semejante.
"""
if so["id"] == bo["id"]:
return 100
score = 0
so_name = normalize_match_text(so.get("name"))
bo_name = normalize_match_text(bo.get("name"))
# 1. Match de Nombre Exacto
if so_name == bo_name:
score += 50
# 2. Match de Nombre Parcial
elif so_name and bo_name:
if so_name in bo_name or bo_name in so_name:
score += 30
else:
# Overlap de palabras
so_words = set(so_name.split())
bo_words = set(bo_name.split())
overlap = so_words.intersection(bo_words)
if len(overlap) > 0:
score += min(25, len(overlap) * 8)
# 3. Match de Valor Monetario
so_val = so["monetary_value"] or 0.0
bo_val = bo["monetary_value"] or 0.0
if abs(so_val - bo_val) < 0.01:
score += 15
elif max(so_val, bo_val) > 0 and abs(so_val - bo_val) / max(1.0, so_val, bo_val) < 0.2:
score += 10
# 4. Match de Estado (Status)
if so["status"] == bo["status"]:
score += 10
# 5. Proximidad temporal de creación
if are_dates_within_one_hour(so.get("date_added"), bo.get("date_added") or bo.get("createdAt")):
score += 15
# 6. Afinidad de sucursal/TIENDA cuando la oportunidad de Marca ya la tiene poblada
if brand_custom_fields_by_name is not None:
score += branch_affinity_score(brand_custom_fields_by_name, branch_values or [])
return score
def apply_brand_opportunity_update(brand_token, opportunity_id, opp_data, target_status=None):
"""
GHL expone el cambio de estado en un endpoint dedicado. El PUT general puede
aceptar otros campos sin reflejar won/lost/abandoned en los reportes.
"""
sync_engine.ghl_client.update_opportunity(brand_token, opportunity_id, opp_data)
if target_status:
sync_engine.ghl_client.update_opportunity_status(brand_token, opportunity_id, target_status)
def is_duplicate_opportunity_error(err):
response = getattr(err, "response", None)
if response is None or response.status_code != 400:
return False
return "duplicate opportunity" in (response.text or "").lower()
def _find_stage_by_name(brand_stages, target_name):
target_norm = remove_accents(target_name).strip().lower()
for bs in brand_stages:
if remove_accents(bs.get("name", "")).strip().lower() == target_norm:
return bs
return None
def map_stage_branch_to_brand(conn, branch_loc_id, branch_pid, branch_sid, brand_pipelines):
"""
Mapea dinámicamente un pipeline_stage_id de sucursal al pipeline_stage_id de Marca.
Estrategia:
1. Pipeline destino = "Standar" / "Standard" (o el primero como fallback).
2. Stage destino = match por nombre de etapa (sin acentos, case-insensitive).
3. Fallback seguro = etapa "PROSPECTO NUEVO" en Marca (alineado con doc MP:
"oportunidades huérfanas se reubican o marcan para revisión").
4. Si tampoco existe "PROSPECTO NUEVO", primera etapa del pipeline + warning.
Retorna (brand_pipeline_id, brand_stage_id, warning_str_or_None).
"""
if not brand_pipelines:
return None, None, "Marca no tiene pipelines cargados"
brand_pipe = None
for bp in brand_pipelines:
bp_name = bp.get("name", "").lower()
if "standar" in bp_name or "standard" in bp_name:
brand_pipe = bp
break
if not brand_pipe:
brand_pipe = brand_pipelines[0]
brand_pid_mapped = brand_pipe["id"]
brand_stages = brand_pipe.get("stages", [])
if not brand_stages:
return brand_pid_mapped, None, f"Pipeline de Marca '{brand_pipe.get('name')}' no tiene etapas"
# 1. Match por nombre exacto
branch_stages_list = []
try:
row = conn.execute(
"SELECT stages_json FROM pipelines WHERE location_id = ? AND id = ?",
(branch_loc_id, branch_pid)
).fetchone()
if row:
try:
branch_stages_list = json.loads(row["stages_json"])
branch_stages_list = sorted(branch_stages_list, key=lambda x: x.get("order", 0))
except Exception:
pass
except Exception:
pass
branch_stage_name = None
if branch_stages_list:
for s in branch_stages_list:
if s["id"] == branch_sid:
branch_stage_name = s.get("name")
break
if branch_stage_name:
matched = _find_stage_by_name(brand_stages, branch_stage_name)
if matched:
return brand_pid_mapped, matched["id"], None
# 2. Fallback seguro: PROSPECTO NUEVO
prospecto_nuevo = _find_stage_by_name(brand_stages, "PROSPECTO NUEVO")
if prospecto_nuevo:
warning = (
f"Stage de sucursal '{branch_stage_name or branch_sid}' sin match por nombre en Marca; "
f"se reubica en 'PROSPECTO NUEVO' (revisar)."
)
return brand_pid_mapped, prospecto_nuevo["id"], warning
# 3. Última opción: primera etapa
warning = (
f"Stage '{branch_stage_name or branch_sid}' sin match y no existe 'PROSPECTO NUEVO' en Marca; "
f"se usa la primera etapa '{brand_stages[0].get('name')}' (revisar manualmente)."
)
return brand_pid_mapped, brand_stages[0]["id"], warning
def main():
parser = argparse.ArgumentParser(description="Reconcilia y sincroniza oportunidades de sucursales a la cuenta de Marca con prioridades de busqueda.")
parser.add_argument("--dry-run", action="store_true", help="Simula los cambios sin realizar peticiones de escritura a GHL.")
parser.add_argument("--location", help="ID de ubicacion especifica para procesar (por defecto todas las sucursales)")
parser.add_argument("--run-id", help="ID de auditoria para la ejecucion del script")
parser.add_argument("--updates-only", action="store_true", help="Solo realiza actualizaciones (PUT) de oportunidades existentes, ignorando la creación de contactos y de nuevas oportunidades.")
parser.add_argument("--no-contacts", action="store_true", help="Evita crear nuevos contactos en la Marca Principal.")
parser.add_argument("--no-creations", action="store_true", help="Evita crear nuevas oportunidades (POST) en la Marca Principal.")
parser.add_argument("--no-updates", action="store_true", help="Evita realizar actualizaciones (PUT) de oportunidades existentes.")
args = parser.parse_args()
# Resolver selección de acciones permitidas
only_updates = args.updates_only
allow_contacts = not (args.no_contacts or only_updates)
allow_creations = not (args.no_creations or only_updates)
allow_updates = not args.no_updates
# Registrar el inicio en la auditoría si hay run_id
if args.run_id:
script_audit.update_run_status(args.run_id, "running")
safe_print("\n" + "=" * 90)
safe_print("RECONCILIACION Y SINCRONIZACION DE OPORTUNIDADES: SUCURSALES -> MARCA")
safe_print("=" * 90)
if args.dry_run:
safe_print("MODO SIMULACION (DRY-RUN) - No se realizaran escrituras en GHL\n")
if not os.path.exists(DB_PATH):
safe_print(f"Error: La base de datos local no existe en {DB_PATH}.")
if args.run_id:
script_audit.update_run_status(args.run_id, "failed", error_message="SQLite database missing")
sys.exit(1)
# 1. Cargar catálogo de cuentas y tokens desde Bucéfalo
try:
accounts = sync_engine.parse_accounts_csv()
brand_acc = next((a for a in accounts if a["location_id"] == BRAND_LOCATION_ID), None)
if not brand_acc:
raise Exception("No se encontro la cuenta de Marca Principal en el CSV de cuentas.")
brand_token = brand_acc["token"]
except Exception as e:
safe_print(f"Error crítico al cargar tokens: {e}")
if args.run_id:
script_audit.update_run_status(args.run_id, "failed", error_message=str(e))
sys.exit(1)
# 2. Cargar verificador CSV (fuente autoritativa de Sucursal/TIENDA por location_id)
try:
verifier_map = load_verifier_map()
safe_print(f" Verificador CSV cargado: {len(verifier_map)} sucursales con Sucursal/TIENDA canónicos.")
except Exception as e:
safe_print(f"Error crítico al cargar verificador CSV: {e}")
if args.run_id:
script_audit.update_run_status(args.run_id, "failed", error_message=str(e))
sys.exit(1)
# 2b. Reportar sucursales activas (no demo) del CSV de mesa de control que NO están
# en el verificador. Estas opps no recibirán corrección automática de Sucursal/TIENDA.
branch_accounts_all = [
a for a in accounts
if a["location_id"] != BRAND_LOCATION_ID and "demo" not in a["nombre"].lower()
]
missing_in_verifier = [a for a in branch_accounts_all if a["location_id"] not in verifier_map]
if missing_in_verifier:
safe_print(f" [ADVERTENCIA] {len(missing_in_verifier)} sucursales del CSV de mesa de control NO están en el verificador:")
for acc in missing_in_verifier:
safe_print(f" - {acc['location_id']}: {acc['nombre']}")
_record_error(
"sucursal_sin_entrada_en_verificador",
f"Sucursal '{acc['nombre']}' ({acc['location_id']}) no está en el verificador CSV; "
f"sus opps no recibirán corrección automática de Sucursal/TIENDA.",
{"location_id": acc["location_id"], "nombre": acc["nombre"]},
)
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
try:
# Inicializar estadísticas requeridas por la documentación de Monte Providencia
stats = {
"locations_processed": [],
"pages_queried": 0,
"records_received": 0,
"records_unique_processed": 0,
"duplicates_discarded": 0,
"errors": {}
}
# Cargar esquemas dinámicos para la Marca Principal
safe_print(" Obteniendo esquemas dinámicos de Marca Principal...")
brand_contact_schema = sync_engine.ghl_client.get_object_schema(brand_token, BRAND_LOCATION_ID, "contact")
brand_opp_schema = sync_engine.ghl_client.get_object_schema(brand_token, BRAND_LOCATION_ID, "opportunity")
# Cargar contactos de Marca en SQLite
brand_contacts = conn.execute("SELECT * FROM contacts WHERE location_id = ?", (BRAND_LOCATION_ID,)).fetchall()
# Cargar pipelines de Marca de SQLite para el mapeo dinámico de etapas
brand_pipelines_rows = conn.execute("SELECT * FROM pipelines WHERE location_id = ?", (BRAND_LOCATION_ID,)).fetchall()
brand_pipelines = []
for r in brand_pipelines_rows:
stages = []
try:
stages = json.loads(r["stages_json"])
stages = sorted(stages, key=lambda x: x.get("order", 0))
except Exception:
pass
brand_pipelines.append({
"id": r["id"],
"name": r["name"],
"stages": stages
})
# Cache invertido de schema de oportunidades de Marca (cf_id -> cf_name).
# Antes se reconstruía 4 veces por oportunidad dentro del loop principal.
brand_opp_name_by_id = {v: k for k, v in brand_opp_schema.items()}
# Cargar oportunidades vivas de Marca de GHL para tener custom fields
safe_print(f" Obteniendo oportunidades vivas de Marca Principal...")
stats["locations_processed"].append(f"Marca Principal ({BRAND_LOCATION_ID})")
try:
live_brand_opps, b_pages, b_raw, b_dups, b_errs = fetch_opportunities_paginated(
brand_token, BRAND_LOCATION_ID
)
stats["pages_queried"] += b_pages
stats["records_received"] += b_raw
stats["duplicates_discarded"] += b_dups
if b_errs:
stats["errors"][BRAND_LOCATION_ID] = b_errs
brand_opps_rows = []
for lo in live_brand_opps:
norm_opp = dict(lo)
norm_opp["pipeline_id"] = lo.get("pipelineId")
norm_opp["pipeline_stage_id"] = lo.get("pipelineStageId")
norm_opp["monetary_value"] = lo.get("monetaryValue")
norm_opp["contact_id"] = lo.get("contactId")
norm_opp["date_added"] = lo.get("createdAt") or lo.get("date_added")
brand_opps_rows.append(norm_opp)
stats["records_unique_processed"] += len(brand_opps_rows)
except Exception as e:
safe_print(f" [WARN] No se pudieron obtener las oportunidades vivas de la Marca GHL, usando base de datos local: {e}")
_record_error("ghl_get_brand_opportunities_failed", str(e), {"location_id": BRAND_LOCATION_ID}, exc=e)
stats["errors"][BRAND_LOCATION_ID] = [str(e)]
sqlite_opps = conn.execute("SELECT * FROM opportunities WHERE location_id = ?", (BRAND_LOCATION_ID,)).fetchall()
brand_opps_rows = [dict(row) for row in sqlite_opps]
stats["records_unique_processed"] += len(brand_opps_rows)
# Pre-computar custom fields por nombre para CADA brand opportunity (cache por bo id).
brand_opp_cf_cache = {}
for bo in brand_opps_rows:
brand_opp_cf_cache[bo.get("id")] = custom_fields_by_name(bo, brand_opp_name_by_id)
# Indexar contactos de la Marca Principal con prioridad estricta:
# 1. Teléfono (últimos 10 dígitos), 2. Email, 3. Nombre Completo Normalizado (sin acentos ni caracteres corruptos)
brand_contacts_by_phone = {}
brand_contacts_by_email = {}
brand_contacts_by_name = {}
for bc in brand_contacts:
phone_10d = get_clean_phone_10d(bc["phone"])
email = (bc["email"] or "").strip().lower()
name_norm = remove_accents(f"{bc['first_name'] or ''} {bc['last_name'] or ''}")
if phone_10d:
brand_contacts_by_phone[phone_10d] = dict(bc)
if email:
brand_contacts_by_email[email] = dict(bc)
if name_norm:
brand_contacts_by_name[name_norm] = dict(bc)
# Indexar oportunidades de la Marca por contact_id
brand_opps_by_contact_id = {}
for bo in brand_opps_rows:
contact_key = bo.get("contact_id") or bo.get("contactId")
brand_opps_by_contact_id.setdefault(contact_key, []).append(dict(bo))
used_brand_opp_ids_by_contact = {}
# Determinar qué sucursales procesar
if args.location:
branch_accounts = [a for a in accounts if a["location_id"] == args.location and a["location_id"] != BRAND_LOCATION_ID]
if not branch_accounts:
safe_print(f"Error: La sucursal {args.location} no existe en el catalogo.")
sys.exit(1)
else:
branch_accounts = [a for a in accounts if a["location_id"] != BRAND_LOCATION_ID and "demo" not in a["nombre"].lower()]
safe_print(f"Iniciando reconciliacion para {len(branch_accounts)} sucursal(es)...")
safe_print("-" * 90)
total_contacts_created = 0
total_opps_created = 0
total_opps_updated = 0
for b_acc in branch_accounts:
loc_id = b_acc["location_id"]
b_name = b_acc["nombre"]
branch_token = b_acc["token"]
# Respetar cancelación / pausa del dashboard antes de procesar cada sucursal.
if args.run_id and not script_audit.wait_if_paused_or_stopped(args.run_id):
safe_print(f"[CANCELADO] Ejecución detenida por el usuario antes de procesar {b_name}.")
break
safe_print(f"Procesando sucursal: '{b_name}' ({loc_id})...")
# Cargar esquemas dinámicos para la Sucursal (GET /objects/)
safe_print(f" Obteniendo esquemas dinámicos de Sucursal '{b_name}'...")
try:
branch_contact_schema = sync_engine.ghl_client.get_object_schema(branch_token, loc_id, "contact")
branch_opp_schema = sync_engine.ghl_client.get_object_schema(branch_token, loc_id, "opportunity")
except Exception as e:
_record_error("ghl_get_object_schema_failed", f"Fallo obteniendo schemas en {b_name}: {e}", {"location_id": loc_id}, exc=e)
stats["errors"].setdefault(loc_id, []).append(f"Schema fetch falló: {e}")
continue
branch_contact_id_to_name = {v: k for k, v in branch_contact_schema.items()}
branch_opp_id_to_name = {v: k for k, v in branch_opp_schema.items()}
# Cargar contactos de esta sucursal en SQLite
branch_contacts = conn.execute("SELECT * FROM contacts WHERE location_id = ?", (loc_id,)).fetchall()
branch_contacts_by_id = { bc["id"]: dict(bc) for bc in branch_contacts }
# Cargar oportunidades vivas de GHL para tener customFields
safe_print(f" Obteniendo oportunidades vivas de GHL...")
stats["locations_processed"].append(f"{b_name} ({loc_id})")
try:
live_branch_opps, b_pages, b_raw, b_dups, b_errs = fetch_opportunities_paginated(
branch_token, loc_id
)
stats["pages_queried"] += b_pages
stats["records_received"] += b_raw
stats["duplicates_discarded"] += b_dups
if b_errs:
stats["errors"][loc_id] = b_errs
branch_opps = []
for lo in live_branch_opps:
norm_opp = dict(lo)
norm_opp["pipeline_id"] = lo.get("pipelineId")
norm_opp["pipeline_stage_id"] = lo.get("pipelineStageId")
norm_opp["monetary_value"] = lo.get("monetaryValue")
norm_opp["contact_id"] = lo.get("contactId")
norm_opp["date_added"] = lo.get("createdAt") or lo.get("date_added")
branch_opps.append(norm_opp)
stats["records_unique_processed"] += len(branch_opps)
except Exception as e:
safe_print(f" [WARN] No se pudieron obtener las oportunidades vivas de GHL, usando base de datos local: {e}")
_record_error("ghl_get_branch_opportunities_failed", str(e), {"location_id": loc_id}, exc=e)
stats["errors"][loc_id] = [str(e)]
sqlite_opps = conn.execute("SELECT * FROM opportunities WHERE location_id = ?", (loc_id,)).fetchall()
branch_opps = [dict(row) for row in sqlite_opps]
stats["records_unique_processed"] += len(branch_opps)
if not branch_opps:
safe_print(" (No hay oportunidades para esta sucursal)")
continue
safe_print(f" Analizando {len(branch_opps)} oportunidades de sucursal...")
branch_opp_count_by_contact_id = {}
for branch_opp in branch_opps:
branch_opp_count_by_contact_id[branch_opp.get("contact_id")] = branch_opp_count_by_contact_id.get(branch_opp.get("contact_id"), 0) + 1
for so in branch_opps:
contact_id = so["contact_id"]
sc = branch_contacts_by_id.get(contact_id)
if not sc:
continue # Oportunidad huérfana de contacto en la sucursal misma
client_name = f"{sc['first_name'] or ''} {sc['last_name'] or ''}".strip() or "Sin nombre"
client_phone = sc["phone"] or ""
client_email = sc["email"] or ""
# Extraer campos personalizados del contacto de la sucursal por nombre
# (necesario antes de la creación para incluirlos en el POST)
sc_custom_fields_by_name = {}
if sc and sc.get("custom_fields_json"):
try:
cf_list = json.loads(sc["custom_fields_json"])
for cf in cf_list:
cf_id = cf.get("id")
cf_val = cf.get("value")
cf_name = branch_contact_id_to_name.get(cf_id)
if cf_name and cf_val is not None:
sc_custom_fields_by_name[cf_name] = cf_val
except Exception:
pass
# 1. Encontrar o crear el contacto en Marca Principal siguiendo la prioridad estricta:
# Prioridad 1: Teléfono (últimos 10 dígitos)
# Prioridad 2: Email
# Prioridad 3: Nombre Completo Normalizado (sin acentos ni caracteres corruptos)
norm_p_10d = get_clean_phone_10d(client_phone)
norm_client_name = remove_accents(client_name)
bc = None
match_method = ""
if norm_p_10d and norm_p_10d in brand_contacts_by_phone:
bc = brand_contacts_by_phone[norm_p_10d]
match_method = "TELEFONO"
elif client_email and client_email.lower() in brand_contacts_by_email:
bc = brand_contacts_by_email[client_email.lower()]
match_method = "EMAIL"
elif norm_client_name and norm_client_name in brand_contacts_by_name:
bc = brand_contacts_by_name[norm_client_name]
match_method = "NOMBRE"
bc_id = None
if not bc:
# El contacto no existe en Marca, hay que crearlo! (si está permitido)
if not allow_contacts:
safe_print(f" [SALTADO] Se omitió la creación de contacto central para: '{client_name}' en Marca Principal (Acción no permitida).")
continue
if args.dry_run:
safe_print(f" [SIMULACION] Crearia contacto central para: '{client_name}' en Marca Principal (No se encontro por Telefono, Email ni Nombre).")
bc_id = "SIMULATED_BRAND_CONTACT_ID"
total_contacts_created += 1
else:
safe_print(f" Creando contacto central para: '{client_name}' en Marca Principal...")
# Mapear custom fields del contacto de sucursal a los IDs del schema de Marca
contact_custom_fields = []
for f_name, f_val in sc_custom_fields_by_name.items():
brand_cf_id = brand_contact_schema.get(f_name)
if brand_cf_id and f_val is not None and str(f_val).strip():
contact_custom_fields.append({"id": brand_cf_id, "value": f_val})
contact_data = {
"locationId": BRAND_LOCATION_ID,
"firstName": sc["first_name"],
"lastName": sc["last_name"],
"email": sc["email"],
"phone": sc["phone"]
}
if contact_custom_fields:
contact_data["customFields"] = contact_custom_fields
try:
res = sync_engine.ghl_client.create_contact(brand_token, contact_data)
bc_id = res.get("contact", {}).get("id")
if not bc_id:
raise Exception("Respuesta de GHL sin id de contacto")
total_contacts_created += 1
if args.run_id:
cid = script_audit.record_change(
args.run_id, BRAND_LOCATION_ID, "contact", bc_id,
"", "created", None,
{"phone": sc["phone"], "email": sc["email"], "name": client_name},
)
if cid:
script_audit.mark_change(cid, "applied")
# Agregar temporalmente a nuestro indice en memoria
new_contact = {"id": bc_id, "first_name": sc["first_name"], "last_name": sc["last_name"], "email": sc["email"], "phone": sc["phone"]}
if norm_p_10d: brand_contacts_by_phone[norm_p_10d] = new_contact
if client_email: brand_contacts_by_email[client_email.lower()] = new_contact
if norm_client_name: brand_contacts_by_name[norm_client_name] = new_contact
except Exception as err:
err_entry = _record_error(
"create_brand_contact_failed",
f"[{b_name}] Fallo crear contacto '{client_name}' en Marca: {err}",
{"location_id": loc_id, "branch_contact_id": contact_id, "client_name": client_name},
exc=err,
)
safe_print(f" * Error creando contacto '{client_name}' en Marca: {err} (error_id={err_entry.get('error_id')})")
continue
else:
bc_id = bc["id"]
if not bc_id:
continue
# --- MAPEO DINÁMICO DE CAMPOS PERSONALIZADOS ---
# Extraer campos personalizados de la oportunidad de la sucursal por nombre
so_custom_fields_by_name = custom_fields_by_name(so, branch_opp_id_to_name)
# sc_custom_fields_by_name ya fue extraído antes de la creación del contacto
branch_match_values = [
b_name,
loc_id,
so_custom_fields_by_name.get("Sucursal"),
so_custom_fields_by_name.get("TIENDA") or so_custom_fields_by_name.get("Tienda"),
sc_custom_fields_by_name.get("Sucursal"),
sc_custom_fields_by_name.get("TIENDA") or sc_custom_fields_by_name.get("Tienda"),
]
# 2. Buscar si la oportunidad ya existe en la Marca Principal bajo este contacto
brand_opps_for_contact = brand_opps_by_contact_id.get(bc_id, [])
used_brand_opp_ids = used_brand_opp_ids_by_contact.setdefault(bc_id, set())
available_brand_opps_for_contact = [
bo for bo in brand_opps_for_contact
if bo.get("id") not in used_brand_opp_ids
]
def _bo_cf(bo):
bo_id = bo.get("id")
cached = brand_opp_cf_cache.get(bo_id)
if cached is None:
cached = custom_fields_by_name(bo, brand_opp_name_by_id)
brand_opp_cf_cache[bo_id] = cached
return cached
matched_bo = None
# Estrategia de conciliación robusta multicapa:
# Capa 1. Mismo ID de oportunidad en GHL
for bo in available_brand_opps_for_contact:
if bo.get("id") == so.get("id"):
matched_bo = bo
break
# Capa 2. Coincidencia por Nombre Exacto de oportunidad, desempatada por sucursal/fecha/valor
if not matched_bo:
exact_name_matches = [
bo for bo in available_brand_opps_for_contact
if normalize_match_text(bo.get("name")) and normalize_match_text(bo.get("name")) == normalize_match_text(so.get("name"))
]
if exact_name_matches:
matched_bo = max(
exact_name_matches,
key=lambda bo: calculate_opportunity_similarity(so, bo, _bo_cf(bo), branch_match_values)
)
# Capa 2.5. Única oportunidad, solo si también hay una sola oportunidad en la sucursal para ese contacto.
if (
not matched_bo
and len(available_brand_opps_for_contact) == 1
and branch_opp_count_by_contact_id.get(contact_id, 0) == 1
):
candidate_bo = available_brand_opps_for_contact[0]
candidate_score = calculate_opportunity_similarity(
so, candidate_bo, _bo_cf(candidate_bo), branch_match_values
)
if candidate_score >= 25:
matched_bo = candidate_bo
# Capa 3. Proximidad de creación (rango temporal de 1 hora)
if not matched_bo:
for bo in available_brand_opps_for_contact:
if are_dates_within_one_hour(so["date_added"], bo["date_added"]):
matched_bo = bo
break
# Capa 4. Emparejamiento Heurístico / Similitud (Similitud >= 55%)
if not matched_bo:
best_fuzzy_bo = None
best_fuzzy_score = 0
for bo in available_brand_opps_for_contact:
score = calculate_opportunity_similarity(so, bo, _bo_cf(bo), branch_match_values)
if score > best_fuzzy_score:
best_fuzzy_score = score
best_fuzzy_bo = bo
if best_fuzzy_bo and best_fuzzy_score >= 55:
matched_bo = best_fuzzy_bo
# --- MAPEO DINÁMICO DE PIPELINE Y STAGE DE SUCURSAL A MARCA ---
mapped_brand_pid, mapped_brand_sid, stage_warning = map_stage_branch_to_brand(
conn, loc_id, so["pipeline_id"], so["pipeline_stage_id"], brand_pipelines
)
if not mapped_brand_pid:
mapped_brand_pid = so["pipeline_id"]
if not mapped_brand_sid:
mapped_brand_sid = so["pipeline_stage_id"]
if stage_warning:
_record_error(
"stage_mapping_warning",
f"[{b_name}] opp '{so.get('name')}' ({so.get('id')}): {stage_warning}",
{"location_id": loc_id, "opportunity_id": so.get("id"), "branch_stage_id": so.get("pipeline_stage_id")},
)
# Mapear los campos de la oportunidad para la Marca Principal con reglas y fallbacks de Monte Providencia
brand_opp_custom_values = {}
# CAPA BASE - mapeo dinamico de TODOS los custom fields de la opp de
# sucursal cuyo nombre NO tenga logica dedicada abajo. Garantiza que
# campos sin tratamiento especial (incluido "ID Oportunidad Sucursal"
# y cualquier campo nuevo) se propaguen sucursal -> Marca, en vez de
# quedarse solo con los 9 campos historicos. Los campos especiales se
# excluyen aqui y los calcula/valida la logica que sigue (override).
for _src_fname, _src_fval in so_custom_fields_by_name.items():
if _norm_field_name(_src_fname) in _SPECIAL_OVERRIDE_FIELDS_NORM:
continue
if _src_fval is not None and str(_src_fval).strip() != "":
brand_opp_custom_values[_src_fname] = _src_fval
# 1. Copiar Canal de Origen con fallbacks (p. ej. si está vacío, heredar del contacto)
canal = so_custom_fields_by_name.get("CANAL DE ORIGEN") or so_custom_fields_by_name.get("Canal de Origen")
if not canal:
canal = sc_custom_fields_by_name.get("CANAL DE ORIGEN") or sc_custom_fields_by_name.get("Canal de Origen")
if not canal:
contact_tags = [t.strip().lower() for t in (sc.get("tags") or "").split(",") if t.strip()]
if "sucursal" in contact_tags:
canal = "SUCURSAL"
elif sc.get("source"):
sc_source = str(sc.get("source") or "").upper()
if any(kw in sc_source for kw in ["FACEBOOK", "FB", "INSTAGRAM", "IG", "WHATSAPP", "WA", "FORM"]):
canal = sc_source
else:
canal = "SUCURSAL"
else:
canal = "SUCURSAL"
if canal:
brand_opp_custom_values["CANAL DE ORIGEN"] = canal
brand_opp_custom_values["Canal de Origen"] = canal
# 2. Copiar Fuente de Prospecto con validación y fallbacks
fuente = so_custom_fields_by_name.get("Fuente de Prospecto")
if not fuente:
fuente = sc_custom_fields_by_name.get("Fuente de Prospecto")
# Regla de Validación: SUCURSAL no puede tener fuente LEAD DIGITAL
if canal == "SUCURSAL" and fuente == "LEAD DIGITAL":
fuente = "SUCURSAL"
elif not fuente:
if canal in ["FORMULARIO", "FACEBOOK", "WHATSAPP", "INSTAGRAM", "LEAD DIGITAL"]:
fuente = "LEAD DIGITAL"
else:
fuente = "SUCURSAL"
if fuente:
brand_opp_custom_values["Fuente de Prospecto"] = fuente
# 3 + 4. Sucursal y TIENDA desde el VERIFICADOR CSV (fuente autoritativa).
# Doc MP: "Sucursal/TIENDA debe coincidir con verificador de sucursales".
# La opp pertenece a la sucursal `loc_id`, así que se toma el valor del
# verificador para esa location, no lo que digan los campos de origen.
verifier_entry = verifier_map.get(loc_id)
if verifier_entry:
# Detectar caso "cross-sucursal": la opp tiene un Sucursal o TIENDA
# poblado que NO coincide con la sucursal que la aloja. Puede ser
# cliente transferido entre sucursales o dato sucio — vale la pena
# marcarlo para revisión manual.
existing_sucursal = (so_custom_fields_by_name.get("Sucursal") or "").strip()
existing_tienda = (
so_custom_fields_by_name.get("TIENDA")
or so_custom_fields_by_name.get("Tienda")
or ""
).strip()
target_sucursal = verifier_entry["sucursal"]
target_tienda = verifier_entry["tienda"]
sucursal_mismatch = (
existing_sucursal
and existing_sucursal != target_sucursal
and existing_sucursal != target_tienda
)
tienda_mismatch = (
existing_tienda
and existing_tienda != target_tienda
and existing_tienda != target_sucursal
)
if sucursal_mismatch or tienda_mismatch:
_record_error(
"cross_sucursal",
f"[{b_name}] opp '{so.get('name')}' tiene Sucursal='{existing_sucursal}' "
f"/ TIENDA='{existing_tienda}' pero está alojada en location {loc_id} "
f"(verificador → '{target_sucursal}' / '{target_tienda}'). Se sobrescribe.",
{
"location_id": loc_id,
"opportunity_id": so.get("id"),
"existing_sucursal": existing_sucursal,
"existing_tienda": existing_tienda,
"target_sucursal": target_sucursal,
"target_tienda": target_tienda,
},
)
brand_opp_custom_values["Sucursal"] = target_sucursal
brand_opp_custom_values["TIENDA"] = target_tienda
brand_opp_custom_values["Tienda"] = target_tienda
else:
_record_error(
"location_sin_verificador",
f"[{b_name}] location_id '{loc_id}' no está en el verificador CSV; no se escribe Sucursal/TIENDA.",
{"location_id": loc_id, "opportunity_id": so.get("id")},
)
# 5. Calcular y mapear el Vehículo (según Reglas de Monte Providencia)
# Reglas:
# - LEAD DIGITAL: construir desde Marca/Versión/Año del contacto, pero SOLO si
# hay al menos marca o versión (solo año es información inútil que
# sobrescribiría datos válidos en Marca).
# - Otros canales: respetar el valor de la opp de sucursal; si no existe,
# intentar el constructed_vehicle siempre que sea significativo.
marca_veh = sc_custom_fields_by_name.get("Marca del Vehículo", "").strip()
vers_veh = sc_custom_fields_by_name.get("Versión del Vehículo", "").strip()
ano_veh = sc_custom_fields_by_name.get("Año del Vehículo", "").strip()
vehicle_meaningful = has_meaningful_vehicle_info(marca_veh, vers_veh, ano_veh)
constructed_vehicle = " ".join(filter(None, [marca_veh, vers_veh, ano_veh])).strip()
if fuente == "LEAD DIGITAL" and constructed_vehicle and vehicle_meaningful:
brand_opp_custom_values["Vehículo"] = constructed_vehicle
else:
vehiculo_val = so_custom_fields_by_name.get("Vehículo")
if not vehiculo_val and vehicle_meaningful:
vehiculo_val = constructed_vehicle
if vehiculo_val:
brand_opp_custom_values["Vehículo"] = vehiculo_val
# 6. Modalidad de Empeño (doc MP: Sin Dejarlo (GPS) / Tradicional (Resguardo))
modalidad_raw = (
so_custom_fields_by_name.get("Modalidad de Empeño")
or so_custom_fields_by_name.get("MODALIDAD DE EMPEÑO")
or sc_custom_fields_by_name.get("Modalidad de Empeño")
or sc_custom_fields_by_name.get("MODALIDAD DE EMPEÑO")
or sc_custom_fields_by_name.get("¿Qué modalidad prefieres?")
)
modalidad_val = normalize_modalidad(modalidad_raw)
if modalidad_val:
brand_opp_custom_values["Modalidad de Empeño"] = modalidad_val
# 7. Persona que atendió al prospecto (copia directa desde sucursal)
persona_val = (
so_custom_fields_by_name.get("Persona que atendió al prospecto")
or so_custom_fields_by_name.get("Persona que atendio al prospecto")
)
if persona_val:
brand_opp_custom_values["Persona que atendió al prospecto"] = persona_val
# 8. Visita a sucursal (checkbox)
visita_val = (
so_custom_fields_by_name.get("Visita a sucursal")
or so_custom_fields_by_name.get("Visita a Sucursal")
)
if visita_val is not None and str(visita_val) != "":
brand_opp_custom_values["Visita a sucursal"] = visita_val
# 9. Fecha de última visita a sucursal
fecha_visita_val = (
so_custom_fields_by_name.get("Fecha de última visita a sucursal")
or so_custom_fields_by_name.get("Fecha de ultima visita a sucursal")
)
if fecha_visita_val:
brand_opp_custom_values["Fecha de última visita a sucursal"] = fecha_visita_val
# Convertir los valores al formato de GHL [ {"id": "brand_f_id", "value": "value"} ]
brand_custom_fields_list = []
for f_name, f_val in brand_opp_custom_values.items():
brand_f_id = brand_opp_schema.get(f_name)
if brand_f_id:
brand_custom_fields_list.append({
"id": brand_f_id,
"value": f_val
})
# 3. Acción de sincronización / actualización (PUT o POST)
if matched_bo:
used_brand_opp_ids.add(matched_bo.get("id"))
# Validar si tiene discrepancias para actualizar (PUT)
has_discrepancies = False
changes = []
if (so["name"] or "").strip().lower() != (matched_bo.get("name") or "").strip().lower():
has_discrepancies = True
changes.append(f"Nombre ('{matched_bo.get('name')}' -> '{so['name']}')")
if so["status"] != matched_bo.get("status"):
has_discrepancies = True
status_changed = True
changes.append(f"Estado ({matched_bo.get('status')} -> {so['status']})")
else:
status_changed = False
if abs((so["monetary_value"] or 0) - (matched_bo.get("monetaryValue") or matched_bo.get("monetary_value") or 0)) >= 0.01:
has_discrepancies = True
changes.append(f"Valor (${matched_bo.get('monetaryValue') or matched_bo.get('monetary_value') or 0:.2f} -> ${so['monetary_value']:.2f})")
if mapped_brand_sid != (matched_bo.get("pipelineStageId") or matched_bo.get("pipeline_stage_id")):
has_discrepancies = True
changes.append(f"Etapa ({matched_bo.get('pipelineStageId') or matched_bo.get('pipeline_stage_id')} -> {mapped_brand_sid})")
# Comparar campos personalizados de la Marca con los calculados (usa cache)
matched_bo_custom_fields_by_name = dict(_bo_cf(matched_bo))
for f_name, expected_val in brand_opp_custom_values.items():
current_val = matched_bo_custom_fields_by_name.get(f_name, "")
norm_curr = str(current_val).strip().lower()
norm_exp = str(expected_val).strip().lower()
if norm_curr != norm_exp:
has_discrepancies = True
changes.append(f"Campo '{f_name}' ('{current_val}' -> '{expected_val}')")
if has_discrepancies:
if not allow_updates:
safe_print(f" [SALTADO] Se omitió la actualización de la oportunidad '{matched_bo.get('name')}' (ID: {matched_bo.get('id')}) en Marca (Acción no permitida).")
continue
# Registrar cambio planeado en audit log (un row por campo cambiado)
change_ids = []
if args.run_id:
for f_name, expected_val in brand_opp_custom_values.items():
old_val = matched_bo_custom_fields_by_name.get(f_name)
if str(old_val or "").strip().lower() != str(expected_val or "").strip().lower():
cf_id = brand_opp_schema.get(f_name)
cid = script_audit.record_change(
args.run_id, BRAND_LOCATION_ID, "opportunity",
matched_bo.get("id"), cf_id or "", f_name, old_val, expected_val,
)
if cid:
change_ids.append((cid, f_name))
if args.dry_run:
safe_print(f" [SIMULACION] Actualizaria oportunidad '{matched_bo.get('name')}' (ID: {matched_bo.get('id')}) en Marca. Cambios: {', '.join(changes)}")
for cid, _f in change_ids:
script_audit.mark_change(cid, "planned")
total_opps_updated += 1
else:
safe_print(f" Actualizando oportunidad '{matched_bo.get('name')}' (ID: {matched_bo.get('id')}) en Marca para alinear con Sucursal...")
opp_data = {
"name": so["name"],
"monetaryValue": so["monetary_value"] or 0.0,
"pipelineId": mapped_brand_pid,
"pipelineStageId": mapped_brand_sid,
"customFields": brand_custom_fields_list
}
try:
apply_brand_opportunity_update(
brand_token,
matched_bo.get("id"),
opp_data,
(so["status"] or "open") if status_changed else None,
)
safe_print(f" [OK] Actualizada correctamente.")
total_opps_updated += 1
for cid, _f in change_ids:
script_audit.mark_change(cid, "applied")
matched_bo.update({
"name": so["name"],
"status": so["status"] or "open",
"monetaryValue": so["monetary_value"] or 0.0,
"monetary_value": so["monetary_value"] or 0.0,
"pipelineId": mapped_brand_pid,
"pipeline_id": mapped_brand_pid,
"pipelineStageId": mapped_brand_sid,
"pipeline_stage_id": mapped_brand_sid,
})
# Invalidar cache de cf para re-leer en próximas iteraciones
brand_opp_cf_cache[matched_bo.get("id")] = dict(brand_opp_custom_values)
except Exception as err:
err_entry = _record_error(
"update_opportunity_failed",
f"[{b_name}] Fallo actualizar opp '{matched_bo.get('name')}' (id={matched_bo.get('id')}): {err}",
{"location_id": loc_id, "brand_opportunity_id": matched_bo.get("id")},
exc=err,
)
safe_print(f" [ERROR] Fallo actualizar en GHL: {err} (error_id={err_entry.get('error_id')})")
for cid, _f in change_ids:
script_audit.mark_change(cid, "failed", str(err))
else:
# No existe en Marca, crearla! (POST) (si está permitido)
if not allow_creations:
safe_print(f" [SALTADO] Se omitió la creación de la oportunidad nueva '{so['name']}' en Marca Principal (Acción no permitida).")
continue
if args.dry_run:
safe_print(f" [SIMULACION] Crearia oportunidad nueva '{so['name']}' en Marca Principal bajo el contacto ID '{bc_id}'.")
if brand_custom_fields_list:
cf_details = [f"{f_name}: '{f['value']}'" for f_name, f_val in brand_opp_custom_values.items() for f in brand_custom_fields_list if f['id'] == brand_opp_schema.get(f_name)]
safe_print(f" [SIMULACION] Con Campos Personalizados: {', '.join(cf_details)}")
total_opps_created += 1
else:
safe_print(f" Creando oportunidad '{so['name']}' en Marca Principal...")
opp_data = {
"locationId": BRAND_LOCATION_ID,
"name": so["name"],
"status": so["status"] or "open",
"monetaryValue": so["monetary_value"] or 0.0,
"pipelineId": mapped_brand_pid,
"pipelineStageId": mapped_brand_sid,
"contactId": bc_id,
"customFields": brand_custom_fields_list
}
try:
res = sync_engine.ghl_client.create_opportunity(brand_token, opp_data)
new_opp_id = res.get("opportunity", {}).get("id")
target_status = so["status"] or "open"
if new_opp_id and target_status != "open":
sync_engine.ghl_client.update_opportunity_status(brand_token, new_opp_id, target_status)
safe_print(f" [OK] Creada con éxito (Nuevo ID en Marca GHL: {new_opp_id}).")
total_opps_created += 1
if new_opp_id and args.run_id:
cid = script_audit.record_change(
args.run_id, BRAND_LOCATION_ID, "opportunity", new_opp_id,
"", "created", None,
{"name": so["name"], "contact_id": bc_id, "source_branch": loc_id},
)
if cid:
script_audit.mark_change(cid, "applied")
if new_opp_id:
new_bo = {
"id": new_opp_id,
"name": so["name"],
"status": target_status,
"monetaryValue": so["monetary_value"] or 0.0,
"monetary_value": so["monetary_value"] or 0.0,
"pipelineId": mapped_brand_pid,
"pipeline_id": mapped_brand_pid,
"pipelineStageId": mapped_brand_sid,
"pipeline_stage_id": mapped_brand_sid,
"contactId": bc_id,
"contact_id": bc_id,
"date_added": so.get("date_added"),
}
brand_opps_by_contact_id.setdefault(bc_id, []).append(new_bo)
brand_opp_cf_cache[new_opp_id] = dict(brand_opp_custom_values)
except Exception as err:
if is_duplicate_opportunity_error(err):
safe_print(" [SALTADO] GHL rechazó la creación porque el contacto ya tiene una oportunidad en Marca.")
_record_error(
"create_opportunity_duplicate",
f"[{b_name}] Duplicado al crear opp '{so.get('name')}' (contact {bc_id})",
{"location_id": loc_id, "contact_id": bc_id, "opportunity_name": so.get("name")},
)
else:
err_entry = _record_error(
"create_opportunity_failed",
f"[{b_name}] Fallo crear opp '{so.get('name')}': {err}",
{"location_id": loc_id, "contact_id": bc_id, "opportunity_name": so.get("name")},
exc=err,
)
safe_print(f" [ERROR] Fallo crear en GHL: {err} (error_id={err_entry.get('error_id')})")
safe_print("-" * 90)
# Imprimir resumen de la ejecucion
safe_print("\n" + "=" * 90)
safe_print("=== RESUMEN DE RECONCILIACION Y SINCRONIZACION SUCURSAL -> MARCA ===")
safe_print("=" * 90)
if args.dry_run:
safe_print(f" [SIMULACION] Contactos Nuevos a Crear en Marca: {total_contacts_created}")
safe_print(f" [SIMULACION] Oportunidades Nuevas a Replicar: {total_opps_created}")
safe_print(f" [SIMULACION] Oportunidades a Actualizar (PUT): {total_opps_updated}")
else:
safe_print(f" Contactos Nuevos Creados en Marca: {total_contacts_created}")
safe_print(f" Oportunidades Nuevas Replicadas: {total_opps_created}")
safe_print(f" Oportunidades Actualizadas (PUT): {total_opps_updated}")
safe_print("-" * 90)
safe_print("--- MÉTRICAS DE ESCANEO Y AUDITORÍA DE GHL ---")
safe_print(f" Locations procesadas: {len(stats['locations_processed'])}")
for loc in stats['locations_processed']:
safe_print(f" - {loc}")
safe_print(f" Páginas consultadas (GHL): {stats['pages_queried']}")
safe_print(f" Registros recibidos desde GHL: {stats['records_received']}")
safe_print(f" Registros únicos procesados: {stats['records_unique_processed']}")
safe_print(f" Duplicados descartados: {stats['duplicates_discarded']}")
if stats['errors']:
safe_print(f" Errores por location o endpoint:")
for loc_err, err_list in stats['errors'].items():
safe_print(f" - Location {loc_err}:")
for err in err_list:
safe_print(f" * {err}")
else:
safe_print(f" Errores por location o endpoint: Ninguno")
safe_print("=" * 90)
# --- RESUMEN DE ERRORES ACUMULADOS DURANTE LA EJECUCIÓN ---
safe_print("\n" + "=" * 90)
safe_print("=== ERRORES Y ADVERTENCIAS ACUMULADOS DURANTE LA EJECUCIÓN ===")
safe_print("=" * 90)
if not EXECUTION_ERRORS:
safe_print(" (Sin errores ni advertencias)")
else:
by_cat = {}
for e in EXECUTION_ERRORS:
by_cat.setdefault(e["category"], []).append(e)
safe_print(f" Total: {len(EXECUTION_ERRORS)} entradas en {len(by_cat)} categorías.\n")
for cat, items in sorted(by_cat.items(), key=lambda kv: -len(kv[1])):
safe_print(f" [{cat}] {len(items)} entrada(s):")
for it in items[:50]:
err_id_str = f" (error_id={it['error_id']})" if it.get("error_id") else ""
safe_print(f" - {it['message']}{err_id_str}")
if len(items) > 50:
safe_print(f" ... y {len(items) - 50} más (truncadas).")
safe_print("")
safe_print("=" * 90 + "\n")
# Registrar éxito en la auditoría si hay run_id
if args.run_id:
script_audit.update_run_status(args.run_id, "success")
except Exception as e:
safe_print(f"Error critico en la ejecucion del script de reconciliacion: {e}")
if args.run_id:
script_audit.update_run_status(args.run_id, "failed", error_message=str(e))
sys.exit(1)
finally:
conn.close()
if __name__ == "__main__":
main()