1327 lines
66 KiB
Python
1327 lines
66 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Reconcile and sync opportunities from Branches to Brand with 1-hour margin of past Latency and Fuzzy Matching."""
|
|
|
|
import argparse
|
|
import csv
|
|
import os
|
|
import sys
|
|
import sqlite3
|
|
import json
|
|
import re
|
|
import time
|
|
import unicodedata
|
|
from datetime import datetime
|
|
|
|
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
if ROOT_DIR not in sys.path:
|
|
sys.path.insert(0, ROOT_DIR)
|
|
|
|
import sync_engine
|
|
import db
|
|
import error_logging
|
|
import script_audit
|
|
|
|
from paths import DB_PATH
|
|
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
|
|
|
|
# Verificador CSV: fuente AUTORITATIVA de Sucursal y TIENDA por location_id.
|
|
# Misma fuente que usa scripts/fill_sucursal_tienda_from_location.py.
|
|
# Formato:
|
|
# Sucursal = "Atlacomulco, Estado de México" (texto completo)
|
|
# TIENDA = "ATLACOMULCO" (código corto)
|
|
VERIFIER_FILENAME = "Monte Providencia - Verificador de sucursales y correos - Sucursales.csv"
|
|
|
|
# Valores válidos del campo Modalidad de Empeño (doc MP).
|
|
MODALIDAD_EMPENO_VALUES = ["Sin Dejarlo (GPS)", "Tradicional (Resguardo)"]
|
|
|
|
# Acumulador global de errores de ejecución para imprimir al final del run.
|
|
EXECUTION_ERRORS = []
|
|
|
|
|
|
# Campos de oportunidad con LOGICA DE NEGOCIO DEDICADA mas abajo (fallbacks,
|
|
# verificador de Sucursal/TIENDA, construccion de Vehiculo, normalizacion de
|
|
# Modalidad, etc.). Se excluyen de la copia dinamica generica para no pisar esa
|
|
# logica; los demas custom fields (incluido "ID Oportunidad Sucursal" y cualquier
|
|
# campo nuevo) se copian tal cual sucursal -> Marca.
|
|
_SPECIAL_OVERRIDE_FIELDS_NORM = {
|
|
"canal de origen", "fuente de prospecto", "sucursal", "tienda", "vehiculo",
|
|
"modalidad de empeno", "persona que atendio al prospecto",
|
|
"visita a sucursal", "fecha de ultima visita a sucursal",
|
|
}
|
|
|
|
|
|
def _norm_field_name(name):
|
|
"""Normaliza nombre de campo: sin acentos, minusculas, espacios colapsados."""
|
|
s = unicodedata.normalize("NFKD", str(name or ""))
|
|
s = "".join(c for c in s if not unicodedata.combining(c))
|
|
return " ".join(s.lower().split())
|
|
|
|
|
|
def _record_error(category, message, context=None, exc=None):
|
|
"""Acumula errores para el resumen final + persiste en error_log si hay excepción."""
|
|
entry = {
|
|
"category": category,
|
|
"message": message,
|
|
"context": context or {},
|
|
"error_id": None,
|
|
}
|
|
if exc is not None:
|
|
try:
|
|
entry["error_id"] = error_logging.log_error(category, exc, context or {})
|
|
except Exception:
|
|
entry["error_id"] = None
|
|
EXECUTION_ERRORS.append(entry)
|
|
return entry
|
|
|
|
|
|
def load_verifier_map():
|
|
"""
|
|
Carga el verificador CSV como fuente AUTORITATIVA de Sucursal y TIENDA.
|
|
|
|
Retorna {location_id: {"sucursal": str, "tienda": str, "sc_name": str}}.
|
|
Ignora la cuenta principal (SUCURSAL == "-") y filas sin TIENDA válido.
|
|
"""
|
|
path = os.path.join(ROOT_DIR, VERIFIER_FILENAME)
|
|
if not os.path.exists(path):
|
|
raise FileNotFoundError(f"Verificador no encontrado: {path}")
|
|
|
|
result = {}
|
|
with open(path, mode="r", encoding="utf-8-sig", newline="") as fh:
|
|
for row in csv.DictReader(fh):
|
|
loc_id = (row.get("ID LOCATION BUCEFALO") or "").strip()
|
|
sucursal = (row.get("SUCURSAL") or "").strip()
|
|
tienda = (row.get("TIENDA") or "").strip()
|
|
sc_name = (row.get("SC BUCEFALO") or "").strip()
|
|
if not loc_id or sucursal == "-" or tienda in ("-", "CUENTA PRINCIPAL", ""):
|
|
continue
|
|
if loc_id not in result:
|
|
result[loc_id] = {"sucursal": sucursal, "tienda": tienda, "sc_name": sc_name}
|
|
return result
|
|
|
|
|
|
def normalize_modalidad(raw_value):
|
|
"""Devuelve el valor canónico de Modalidad de Empeño o el original si no se reconoce."""
|
|
if not raw_value:
|
|
return None
|
|
text = remove_accents(str(raw_value)).lower()
|
|
if "gps" in text or "sin dejar" in text:
|
|
return "Sin Dejarlo (GPS)"
|
|
if "resguardo" in text or "tradicional" in text or "dejar auto" in text:
|
|
return "Tradicional (Resguardo)"
|
|
return str(raw_value)
|
|
|
|
|
|
def has_meaningful_vehicle_info(marca, version, ano):
|
|
"""VehÃculo válido = al menos marca o versión. Solo año no es información útil."""
|
|
has_text = bool((marca or "").strip()) or bool((version or "").strip())
|
|
return has_text
|
|
|
|
def safe_print(*args, **kwargs):
|
|
sep = kwargs.get("sep", " ")
|
|
end = kwargs.get("end", "\n")
|
|
text = sep.join(str(arg) for arg in args)
|
|
encoding = sys.stdout.encoding or "utf-8"
|
|
try:
|
|
sys.stdout.write(text + end)
|
|
sys.stdout.flush()
|
|
except UnicodeEncodeError:
|
|
try:
|
|
safe_text = text.encode(encoding, errors="replace").decode(encoding)
|
|
sys.stdout.write(safe_text + end)
|
|
sys.stdout.flush()
|
|
except Exception:
|
|
try:
|
|
safe_text = text.encode("ascii", errors="replace").decode("ascii")
|
|
sys.stdout.write(safe_text + end)
|
|
sys.stdout.flush()
|
|
except Exception:
|
|
pass
|
|
|
|
def normalize_phone(phone):
|
|
if not phone:
|
|
return ""
|
|
return re.sub(r"\D+", "", str(phone))
|
|
|
|
def get_clean_phone_10d(phone):
|
|
norm = re.sub(r"\D+", "", str(phone or ""))
|
|
if len(norm) >= 10:
|
|
return norm[-10:]
|
|
return norm
|
|
|
|
def remove_accents(text):
|
|
if not text:
|
|
return ""
|
|
accents_map = {
|
|
'á': 'a', 'é': 'e', 'Ã': 'i', 'ó': 'o', 'ú': 'u',
|
|
'ü': 'u', 'ñ': 'n',
|
|
'Ã': 'a', 'É': 'e', 'Ã': 'i', 'Ó': 'o', 'Ú': 'u',
|
|
'Ü': 'u', 'Ñ': 'n'
|
|
}
|
|
clean_chars = []
|
|
for char in text:
|
|
clean_chars.append(accents_map.get(char, char.lower()))
|
|
cleaned = "".join(clean_chars)
|
|
cleaned = re.sub(r"[^\w\s]", "", cleaned) # quitar puntuación y caracteres corruptos de codificación
|
|
return " ".join(cleaned.split())
|
|
|
|
def extract_tienda_label(account_nombre):
|
|
if not account_nombre:
|
|
return None
|
|
parts = str(account_nombre).split(" - ")
|
|
if len(parts) < 3:
|
|
return None
|
|
raw = " - ".join(parts[2:]).strip()
|
|
return raw.upper() if raw else None
|
|
|
|
def fetch_opportunities_paginated(token, location_id):
|
|
"""
|
|
Obtiene todas las oportunidades con paginación completa, acumulando y
|
|
deduplicando por ID en cumplimiento de la guÃa de Monte Providencia.
|
|
|
|
Sin tope de cantidad: solo corta la paginación nativa de GHL
|
|
(batch vacÃo repetido, batch menor al limit o total alcanzado).
|
|
|
|
Retorna (unique_opps, pages_queried, raw_received, duplicates_discarded, errors).
|
|
"""
|
|
opportunities = []
|
|
seen_ids = set()
|
|
pages_queried = 0
|
|
raw_received = 0
|
|
duplicates_discarded = 0
|
|
errors = []
|
|
|
|
page = 1
|
|
limit = 100
|
|
empty_pages = 0
|
|
while True:
|
|
pages_queried += 1
|
|
try:
|
|
data = sync_engine.ghl_client.search_opportunities(token, location_id, limit=limit, page=page)
|
|
except Exception as e:
|
|
err_msg = str(e)
|
|
errors.append(f"Fallo en pagina {page}: {err_msg}")
|
|
_record_error(
|
|
"ghl_get_all_opportunities_page_failed",
|
|
f"Fallo paginación opps en {location_id} pagina {page}: {err_msg}",
|
|
{"location_id": location_id, "page": page, "accumulated_opps": len(opportunities)},
|
|
exc=e,
|
|
)
|
|
break
|
|
|
|
batch = data.get("opportunities", []) or []
|
|
if not batch:
|
|
empty_pages += 1
|
|
if empty_pages >= 2:
|
|
break
|
|
page += 1
|
|
continue
|
|
empty_pages = 0
|
|
|
|
raw_received += len(batch)
|
|
for opp in batch:
|
|
opp_id = opp.get("id")
|
|
if not opp_id:
|
|
continue
|
|
if opp_id in seen_ids:
|
|
duplicates_discarded += 1
|
|
continue
|
|
seen_ids.add(opp_id)
|
|
opportunities.append(opp)
|
|
|
|
total_reported = data.get("total", 0) or 0
|
|
if len(batch) < limit or (total_reported > 0 and len(opportunities) >= total_reported):
|
|
break
|
|
|
|
page += 1
|
|
|
|
return opportunities, pages_queried, raw_received, duplicates_discarded, errors
|
|
|
|
def normalize_match_text(text):
|
|
return remove_accents(str(text or "")).strip().lower()
|
|
|
|
def custom_fields_by_name(obj, id_to_name):
|
|
values = {}
|
|
for cf in obj.get("customFields", []) or []:
|
|
cf_id = cf.get("id")
|
|
cf_val = cf.get("fieldValueString") or cf.get("value")
|
|
cf_name = id_to_name.get(cf_id)
|
|
if cf_name and cf_val is not None:
|
|
values[cf_name] = cf_val
|
|
return values
|
|
|
|
def branch_affinity_score(brand_custom_fields_by_name, branch_values):
|
|
expected_values = [normalize_match_text(v) for v in branch_values if normalize_match_text(v)]
|
|
if not expected_values:
|
|
return 0
|
|
|
|
brand_values = []
|
|
for field_name in ("Sucursal", "TIENDA", "Tienda"):
|
|
value = brand_custom_fields_by_name.get(field_name)
|
|
if value:
|
|
brand_values.append(normalize_match_text(value))
|
|
|
|
if not brand_values:
|
|
return 0
|
|
|
|
for brand_value in brand_values:
|
|
for expected in expected_values:
|
|
if brand_value == expected or brand_value in expected or expected in brand_value:
|
|
return 30
|
|
|
|
return -25
|
|
|
|
def parse_date_string(date_str):
|
|
if not date_str:
|
|
return None
|
|
clean_str = date_str.replace("T", " ").replace("Z", "")
|
|
if "." in clean_str:
|
|
clean_str = clean_str.split(".")[0]
|
|
# Cortar timezone offsets si existen (+00:00, -06:00, etc.)
|
|
clean_str = re.split(r"[\+\-]\d{2}", clean_str)[0].strip()
|
|
|
|
for fmt in ("%Y-%m-%d %H:%M:%S", "%Y-%m-%d", "%d/%m/%Y %H:%M:%S"):
|
|
try:
|
|
return datetime.strptime(clean_str, fmt)
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
def are_dates_within_one_hour(date_str1, date_str2):
|
|
d1 = parse_date_string(date_str1)
|
|
d2 = parse_date_string(date_str2)
|
|
if not d1 or not d2:
|
|
return False
|
|
diff = abs((d1 - d2).total_seconds())
|
|
return diff <= 3600 # 3600 seconds = 1 hour
|
|
|
|
def calculate_opportunity_similarity(so, bo, brand_custom_fields_by_name=None, branch_values=None):
|
|
"""
|
|
Calcula un puntaje de similitud (0 a 100) entre la oportunidad de sucursal (so)
|
|
y la de marca (bo) para encontrar el par mas semejante.
|
|
"""
|
|
if so["id"] == bo["id"]:
|
|
return 100
|
|
|
|
score = 0
|
|
so_name = normalize_match_text(so.get("name"))
|
|
bo_name = normalize_match_text(bo.get("name"))
|
|
|
|
# 1. Match de Nombre Exacto
|
|
if so_name == bo_name:
|
|
score += 50
|
|
# 2. Match de Nombre Parcial
|
|
elif so_name and bo_name:
|
|
if so_name in bo_name or bo_name in so_name:
|
|
score += 30
|
|
else:
|
|
# Overlap de palabras
|
|
so_words = set(so_name.split())
|
|
bo_words = set(bo_name.split())
|
|
overlap = so_words.intersection(bo_words)
|
|
if len(overlap) > 0:
|
|
score += min(25, len(overlap) * 8)
|
|
|
|
# 3. Match de Valor Monetario
|
|
so_val = so["monetary_value"] or 0.0
|
|
bo_val = bo["monetary_value"] or 0.0
|
|
if abs(so_val - bo_val) < 0.01:
|
|
score += 15
|
|
elif max(so_val, bo_val) > 0 and abs(so_val - bo_val) / max(1.0, so_val, bo_val) < 0.2:
|
|
score += 10
|
|
|
|
# 4. Match de Estado (Status)
|
|
if so["status"] == bo["status"]:
|
|
score += 10
|
|
|
|
# 5. Proximidad temporal de creación
|
|
if are_dates_within_one_hour(so.get("date_added"), bo.get("date_added") or bo.get("createdAt")):
|
|
score += 15
|
|
|
|
# 6. Afinidad de sucursal/TIENDA cuando la oportunidad de Marca ya la tiene poblada
|
|
if brand_custom_fields_by_name is not None:
|
|
score += branch_affinity_score(brand_custom_fields_by_name, branch_values or [])
|
|
|
|
return score
|
|
|
|
def apply_brand_opportunity_update(brand_token, opportunity_id, opp_data, target_status=None):
|
|
"""
|
|
GHL expone el cambio de estado en un endpoint dedicado. El PUT general puede
|
|
aceptar otros campos sin reflejar won/lost/abandoned en los reportes.
|
|
"""
|
|
sync_engine.ghl_client.update_opportunity(brand_token, opportunity_id, opp_data)
|
|
if target_status:
|
|
sync_engine.ghl_client.update_opportunity_status(brand_token, opportunity_id, target_status)
|
|
|
|
def is_duplicate_opportunity_error(err):
|
|
response = getattr(err, "response", None)
|
|
if response is None or response.status_code != 400:
|
|
return False
|
|
return "duplicate opportunity" in (response.text or "").lower()
|
|
|
|
def _find_stage_by_name(brand_stages, target_name):
|
|
target_norm = remove_accents(target_name).strip().lower()
|
|
for bs in brand_stages:
|
|
if remove_accents(bs.get("name", "")).strip().lower() == target_norm:
|
|
return bs
|
|
return None
|
|
|
|
|
|
def map_stage_branch_to_brand(conn, branch_loc_id, branch_pid, branch_sid, brand_pipelines):
|
|
"""
|
|
Mapea dinámicamente un pipeline_stage_id de sucursal al pipeline_stage_id de Marca.
|
|
|
|
Estrategia:
|
|
1. Pipeline destino = "Standar" / "Standard" (o el primero como fallback).
|
|
2. Stage destino = match por nombre de etapa (sin acentos, case-insensitive).
|
|
3. Fallback seguro = etapa "PROSPECTO NUEVO" en Marca (alineado con doc MP:
|
|
"oportunidades huérfanas se reubican o marcan para revisión").
|
|
4. Si tampoco existe "PROSPECTO NUEVO", primera etapa del pipeline + warning.
|
|
|
|
Retorna (brand_pipeline_id, brand_stage_id, warning_str_or_None).
|
|
"""
|
|
if not brand_pipelines:
|
|
return None, None, "Marca no tiene pipelines cargados"
|
|
|
|
brand_pipe = None
|
|
for bp in brand_pipelines:
|
|
bp_name = bp.get("name", "").lower()
|
|
if "standar" in bp_name or "standard" in bp_name:
|
|
brand_pipe = bp
|
|
break
|
|
if not brand_pipe:
|
|
brand_pipe = brand_pipelines[0]
|
|
|
|
brand_pid_mapped = brand_pipe["id"]
|
|
brand_stages = brand_pipe.get("stages", [])
|
|
if not brand_stages:
|
|
return brand_pid_mapped, None, f"Pipeline de Marca '{brand_pipe.get('name')}' no tiene etapas"
|
|
|
|
# 1. Match por nombre exacto
|
|
branch_stages_list = []
|
|
try:
|
|
row = conn.execute(
|
|
"SELECT stages_json FROM pipelines WHERE location_id = ? AND id = ?",
|
|
(branch_loc_id, branch_pid)
|
|
).fetchone()
|
|
if row:
|
|
try:
|
|
branch_stages_list = json.loads(row["stages_json"])
|
|
branch_stages_list = sorted(branch_stages_list, key=lambda x: x.get("order", 0))
|
|
except Exception:
|
|
pass
|
|
except Exception:
|
|
pass
|
|
|
|
branch_stage_name = None
|
|
if branch_stages_list:
|
|
for s in branch_stages_list:
|
|
if s["id"] == branch_sid:
|
|
branch_stage_name = s.get("name")
|
|
break
|
|
|
|
if branch_stage_name:
|
|
matched = _find_stage_by_name(brand_stages, branch_stage_name)
|
|
if matched:
|
|
return brand_pid_mapped, matched["id"], None
|
|
|
|
# 2. Fallback seguro: PROSPECTO NUEVO
|
|
prospecto_nuevo = _find_stage_by_name(brand_stages, "PROSPECTO NUEVO")
|
|
if prospecto_nuevo:
|
|
warning = (
|
|
f"Stage de sucursal '{branch_stage_name or branch_sid}' sin match por nombre en Marca; "
|
|
f"se reubica en 'PROSPECTO NUEVO' (revisar)."
|
|
)
|
|
return brand_pid_mapped, prospecto_nuevo["id"], warning
|
|
|
|
# 3. Última opción: primera etapa
|
|
warning = (
|
|
f"Stage '{branch_stage_name or branch_sid}' sin match y no existe 'PROSPECTO NUEVO' en Marca; "
|
|
f"se usa la primera etapa '{brand_stages[0].get('name')}' (revisar manualmente)."
|
|
)
|
|
return brand_pid_mapped, brand_stages[0]["id"], warning
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Reconcilia y sincroniza oportunidades de sucursales a la cuenta de Marca con prioridades de busqueda.")
|
|
parser.add_argument("--dry-run", action="store_true", help="Simula los cambios sin realizar peticiones de escritura a GHL.")
|
|
parser.add_argument("--location", help="ID de ubicacion especifica para procesar (por defecto todas las sucursales)")
|
|
parser.add_argument("--run-id", help="ID de auditoria para la ejecucion del script")
|
|
parser.add_argument("--updates-only", action="store_true", help="Solo realiza actualizaciones (PUT) de oportunidades existentes, ignorando la creación de contactos y de nuevas oportunidades.")
|
|
parser.add_argument("--no-contacts", action="store_true", help="Evita crear nuevos contactos en la Marca Principal.")
|
|
parser.add_argument("--no-creations", action="store_true", help="Evita crear nuevas oportunidades (POST) en la Marca Principal.")
|
|
parser.add_argument("--no-updates", action="store_true", help="Evita realizar actualizaciones (PUT) de oportunidades existentes.")
|
|
args = parser.parse_args()
|
|
|
|
# Resolver selección de acciones permitidas
|
|
only_updates = args.updates_only
|
|
allow_contacts = not (args.no_contacts or only_updates)
|
|
allow_creations = not (args.no_creations or only_updates)
|
|
allow_updates = not args.no_updates
|
|
|
|
# Registrar el inicio en la auditorÃa si hay run_id
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "running")
|
|
|
|
safe_print("\n" + "=" * 90)
|
|
safe_print("RECONCILIACION Y SINCRONIZACION DE OPORTUNIDADES: SUCURSALES -> MARCA")
|
|
safe_print("=" * 90)
|
|
if args.dry_run:
|
|
safe_print("MODO SIMULACION (DRY-RUN) - No se realizaran escrituras en GHL\n")
|
|
|
|
if not os.path.exists(DB_PATH):
|
|
safe_print(f"Error: La base de datos local no existe en {DB_PATH}.")
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "failed", error_message="SQLite database missing")
|
|
sys.exit(1)
|
|
|
|
# 1. Cargar catálogo de cuentas y tokens desde Bucéfalo
|
|
try:
|
|
accounts = sync_engine.parse_accounts_csv()
|
|
brand_acc = next((a for a in accounts if a["location_id"] == BRAND_LOCATION_ID), None)
|
|
if not brand_acc:
|
|
raise Exception("No se encontro la cuenta de Marca Principal en el CSV de cuentas.")
|
|
brand_token = brand_acc["token"]
|
|
except Exception as e:
|
|
safe_print(f"Error crÃtico al cargar tokens: {e}")
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "failed", error_message=str(e))
|
|
sys.exit(1)
|
|
|
|
# 2. Cargar verificador CSV (fuente autoritativa de Sucursal/TIENDA por location_id)
|
|
try:
|
|
verifier_map = load_verifier_map()
|
|
safe_print(f" Verificador CSV cargado: {len(verifier_map)} sucursales con Sucursal/TIENDA canónicos.")
|
|
except Exception as e:
|
|
safe_print(f"Error crÃtico al cargar verificador CSV: {e}")
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "failed", error_message=str(e))
|
|
sys.exit(1)
|
|
|
|
# 2b. Reportar sucursales activas (no demo) del CSV de mesa de control que NO están
|
|
# en el verificador. Estas opps no recibirán corrección automática de Sucursal/TIENDA.
|
|
branch_accounts_all = [
|
|
a for a in accounts
|
|
if a["location_id"] != BRAND_LOCATION_ID and "demo" not in a["nombre"].lower()
|
|
]
|
|
missing_in_verifier = [a for a in branch_accounts_all if a["location_id"] not in verifier_map]
|
|
if missing_in_verifier:
|
|
safe_print(f" [ADVERTENCIA] {len(missing_in_verifier)} sucursales del CSV de mesa de control NO están en el verificador:")
|
|
for acc in missing_in_verifier:
|
|
safe_print(f" - {acc['location_id']}: {acc['nombre']}")
|
|
_record_error(
|
|
"sucursal_sin_entrada_en_verificador",
|
|
f"Sucursal '{acc['nombre']}' ({acc['location_id']}) no está en el verificador CSV; "
|
|
f"sus opps no recibirán corrección automática de Sucursal/TIENDA.",
|
|
{"location_id": acc["location_id"], "nombre": acc["nombre"]},
|
|
)
|
|
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
try:
|
|
# Inicializar estadÃsticas requeridas por la documentación de Monte Providencia
|
|
stats = {
|
|
"locations_processed": [],
|
|
"pages_queried": 0,
|
|
"records_received": 0,
|
|
"records_unique_processed": 0,
|
|
"duplicates_discarded": 0,
|
|
"errors": {}
|
|
}
|
|
|
|
# Cargar esquemas dinámicos para la Marca Principal
|
|
safe_print(" Obteniendo esquemas dinámicos de Marca Principal...")
|
|
brand_contact_schema = sync_engine.ghl_client.get_object_schema(brand_token, BRAND_LOCATION_ID, "contact")
|
|
brand_opp_schema = sync_engine.ghl_client.get_object_schema(brand_token, BRAND_LOCATION_ID, "opportunity")
|
|
|
|
# Cargar contactos de Marca en SQLite
|
|
brand_contacts = conn.execute("SELECT * FROM contacts WHERE location_id = ?", (BRAND_LOCATION_ID,)).fetchall()
|
|
|
|
# Cargar pipelines de Marca de SQLite para el mapeo dinámico de etapas
|
|
brand_pipelines_rows = conn.execute("SELECT * FROM pipelines WHERE location_id = ?", (BRAND_LOCATION_ID,)).fetchall()
|
|
brand_pipelines = []
|
|
for r in brand_pipelines_rows:
|
|
stages = []
|
|
try:
|
|
stages = json.loads(r["stages_json"])
|
|
stages = sorted(stages, key=lambda x: x.get("order", 0))
|
|
except Exception:
|
|
pass
|
|
brand_pipelines.append({
|
|
"id": r["id"],
|
|
"name": r["name"],
|
|
"stages": stages
|
|
})
|
|
|
|
# Cache invertido de schema de oportunidades de Marca (cf_id -> cf_name).
|
|
# Antes se reconstruÃa 4 veces por oportunidad dentro del loop principal.
|
|
brand_opp_name_by_id = {v: k for k, v in brand_opp_schema.items()}
|
|
|
|
# Cargar oportunidades vivas de Marca de GHL para tener custom fields
|
|
safe_print(f" Obteniendo oportunidades vivas de Marca Principal...")
|
|
stats["locations_processed"].append(f"Marca Principal ({BRAND_LOCATION_ID})")
|
|
try:
|
|
live_brand_opps, b_pages, b_raw, b_dups, b_errs = fetch_opportunities_paginated(
|
|
brand_token, BRAND_LOCATION_ID
|
|
)
|
|
stats["pages_queried"] += b_pages
|
|
stats["records_received"] += b_raw
|
|
stats["duplicates_discarded"] += b_dups
|
|
if b_errs:
|
|
stats["errors"][BRAND_LOCATION_ID] = b_errs
|
|
|
|
brand_opps_rows = []
|
|
for lo in live_brand_opps:
|
|
norm_opp = dict(lo)
|
|
norm_opp["pipeline_id"] = lo.get("pipelineId")
|
|
norm_opp["pipeline_stage_id"] = lo.get("pipelineStageId")
|
|
norm_opp["monetary_value"] = lo.get("monetaryValue")
|
|
norm_opp["contact_id"] = lo.get("contactId")
|
|
norm_opp["date_added"] = lo.get("createdAt") or lo.get("date_added")
|
|
brand_opps_rows.append(norm_opp)
|
|
stats["records_unique_processed"] += len(brand_opps_rows)
|
|
except Exception as e:
|
|
safe_print(f" [WARN] No se pudieron obtener las oportunidades vivas de la Marca GHL, usando base de datos local: {e}")
|
|
_record_error("ghl_get_brand_opportunities_failed", str(e), {"location_id": BRAND_LOCATION_ID}, exc=e)
|
|
stats["errors"][BRAND_LOCATION_ID] = [str(e)]
|
|
sqlite_opps = conn.execute("SELECT * FROM opportunities WHERE location_id = ?", (BRAND_LOCATION_ID,)).fetchall()
|
|
brand_opps_rows = [dict(row) for row in sqlite_opps]
|
|
stats["records_unique_processed"] += len(brand_opps_rows)
|
|
|
|
# Pre-computar custom fields por nombre para CADA brand opportunity (cache por bo id).
|
|
brand_opp_cf_cache = {}
|
|
for bo in brand_opps_rows:
|
|
brand_opp_cf_cache[bo.get("id")] = custom_fields_by_name(bo, brand_opp_name_by_id)
|
|
|
|
# Indexar contactos de la Marca Principal con prioridad estricta:
|
|
# 1. Teléfono (últimos 10 dÃgitos), 2. Email, 3. Nombre Completo Normalizado (sin acentos ni caracteres corruptos)
|
|
brand_contacts_by_phone = {}
|
|
brand_contacts_by_email = {}
|
|
brand_contacts_by_name = {}
|
|
for bc in brand_contacts:
|
|
phone_10d = get_clean_phone_10d(bc["phone"])
|
|
email = (bc["email"] or "").strip().lower()
|
|
name_norm = remove_accents(f"{bc['first_name'] or ''} {bc['last_name'] or ''}")
|
|
|
|
if phone_10d:
|
|
brand_contacts_by_phone[phone_10d] = dict(bc)
|
|
if email:
|
|
brand_contacts_by_email[email] = dict(bc)
|
|
if name_norm:
|
|
brand_contacts_by_name[name_norm] = dict(bc)
|
|
|
|
# Indexar oportunidades de la Marca por contact_id
|
|
brand_opps_by_contact_id = {}
|
|
for bo in brand_opps_rows:
|
|
contact_key = bo.get("contact_id") or bo.get("contactId")
|
|
brand_opps_by_contact_id.setdefault(contact_key, []).append(dict(bo))
|
|
used_brand_opp_ids_by_contact = {}
|
|
|
|
# Determinar qué sucursales procesar
|
|
if args.location:
|
|
branch_accounts = [a for a in accounts if a["location_id"] == args.location and a["location_id"] != BRAND_LOCATION_ID]
|
|
if not branch_accounts:
|
|
safe_print(f"Error: La sucursal {args.location} no existe en el catalogo.")
|
|
sys.exit(1)
|
|
else:
|
|
branch_accounts = [a for a in accounts if a["location_id"] != BRAND_LOCATION_ID and "demo" not in a["nombre"].lower()]
|
|
|
|
safe_print(f"Iniciando reconciliacion para {len(branch_accounts)} sucursal(es)...")
|
|
safe_print("-" * 90)
|
|
|
|
total_contacts_created = 0
|
|
total_opps_created = 0
|
|
total_opps_updated = 0
|
|
|
|
for b_acc in branch_accounts:
|
|
loc_id = b_acc["location_id"]
|
|
b_name = b_acc["nombre"]
|
|
branch_token = b_acc["token"]
|
|
|
|
# Respetar cancelación / pausa del dashboard antes de procesar cada sucursal.
|
|
if args.run_id and not script_audit.wait_if_paused_or_stopped(args.run_id):
|
|
safe_print(f"[CANCELADO] Ejecución detenida por el usuario antes de procesar {b_name}.")
|
|
break
|
|
|
|
safe_print(f"Procesando sucursal: '{b_name}' ({loc_id})...")
|
|
|
|
# Cargar esquemas dinámicos para la Sucursal (GET /objects/)
|
|
safe_print(f" Obteniendo esquemas dinámicos de Sucursal '{b_name}'...")
|
|
try:
|
|
branch_contact_schema = sync_engine.ghl_client.get_object_schema(branch_token, loc_id, "contact")
|
|
branch_opp_schema = sync_engine.ghl_client.get_object_schema(branch_token, loc_id, "opportunity")
|
|
except Exception as e:
|
|
_record_error("ghl_get_object_schema_failed", f"Fallo obteniendo schemas en {b_name}: {e}", {"location_id": loc_id}, exc=e)
|
|
stats["errors"].setdefault(loc_id, []).append(f"Schema fetch falló: {e}")
|
|
continue
|
|
branch_contact_id_to_name = {v: k for k, v in branch_contact_schema.items()}
|
|
branch_opp_id_to_name = {v: k for k, v in branch_opp_schema.items()}
|
|
|
|
# Cargar contactos de esta sucursal en SQLite
|
|
branch_contacts = conn.execute("SELECT * FROM contacts WHERE location_id = ?", (loc_id,)).fetchall()
|
|
branch_contacts_by_id = { bc["id"]: dict(bc) for bc in branch_contacts }
|
|
|
|
# Cargar oportunidades vivas de GHL para tener customFields
|
|
safe_print(f" Obteniendo oportunidades vivas de GHL...")
|
|
stats["locations_processed"].append(f"{b_name} ({loc_id})")
|
|
try:
|
|
live_branch_opps, b_pages, b_raw, b_dups, b_errs = fetch_opportunities_paginated(
|
|
branch_token, loc_id
|
|
)
|
|
stats["pages_queried"] += b_pages
|
|
stats["records_received"] += b_raw
|
|
stats["duplicates_discarded"] += b_dups
|
|
if b_errs:
|
|
stats["errors"][loc_id] = b_errs
|
|
|
|
branch_opps = []
|
|
for lo in live_branch_opps:
|
|
norm_opp = dict(lo)
|
|
norm_opp["pipeline_id"] = lo.get("pipelineId")
|
|
norm_opp["pipeline_stage_id"] = lo.get("pipelineStageId")
|
|
norm_opp["monetary_value"] = lo.get("monetaryValue")
|
|
norm_opp["contact_id"] = lo.get("contactId")
|
|
norm_opp["date_added"] = lo.get("createdAt") or lo.get("date_added")
|
|
branch_opps.append(norm_opp)
|
|
stats["records_unique_processed"] += len(branch_opps)
|
|
except Exception as e:
|
|
safe_print(f" [WARN] No se pudieron obtener las oportunidades vivas de GHL, usando base de datos local: {e}")
|
|
_record_error("ghl_get_branch_opportunities_failed", str(e), {"location_id": loc_id}, exc=e)
|
|
stats["errors"][loc_id] = [str(e)]
|
|
sqlite_opps = conn.execute("SELECT * FROM opportunities WHERE location_id = ?", (loc_id,)).fetchall()
|
|
branch_opps = [dict(row) for row in sqlite_opps]
|
|
stats["records_unique_processed"] += len(branch_opps)
|
|
|
|
if not branch_opps:
|
|
safe_print(" (No hay oportunidades para esta sucursal)")
|
|
continue
|
|
|
|
safe_print(f" Analizando {len(branch_opps)} oportunidades de sucursal...")
|
|
branch_opp_count_by_contact_id = {}
|
|
for branch_opp in branch_opps:
|
|
branch_opp_count_by_contact_id[branch_opp.get("contact_id")] = branch_opp_count_by_contact_id.get(branch_opp.get("contact_id"), 0) + 1
|
|
|
|
for so in branch_opps:
|
|
contact_id = so["contact_id"]
|
|
sc = branch_contacts_by_id.get(contact_id)
|
|
if not sc:
|
|
continue # Oportunidad huérfana de contacto en la sucursal misma
|
|
|
|
client_name = f"{sc['first_name'] or ''} {sc['last_name'] or ''}".strip() or "Sin nombre"
|
|
client_phone = sc["phone"] or ""
|
|
client_email = sc["email"] or ""
|
|
|
|
# Extraer campos personalizados del contacto de la sucursal por nombre
|
|
# (necesario antes de la creación para incluirlos en el POST)
|
|
sc_custom_fields_by_name = {}
|
|
if sc and sc.get("custom_fields_json"):
|
|
try:
|
|
cf_list = json.loads(sc["custom_fields_json"])
|
|
for cf in cf_list:
|
|
cf_id = cf.get("id")
|
|
cf_val = cf.get("value")
|
|
cf_name = branch_contact_id_to_name.get(cf_id)
|
|
if cf_name and cf_val is not None:
|
|
sc_custom_fields_by_name[cf_name] = cf_val
|
|
except Exception:
|
|
pass
|
|
|
|
# 1. Encontrar o crear el contacto en Marca Principal siguiendo la prioridad estricta:
|
|
# Prioridad 1: Teléfono (últimos 10 dÃgitos)
|
|
# Prioridad 2: Email
|
|
# Prioridad 3: Nombre Completo Normalizado (sin acentos ni caracteres corruptos)
|
|
norm_p_10d = get_clean_phone_10d(client_phone)
|
|
norm_client_name = remove_accents(client_name)
|
|
bc = None
|
|
match_method = ""
|
|
|
|
if norm_p_10d and norm_p_10d in brand_contacts_by_phone:
|
|
bc = brand_contacts_by_phone[norm_p_10d]
|
|
match_method = "TELEFONO"
|
|
elif client_email and client_email.lower() in brand_contacts_by_email:
|
|
bc = brand_contacts_by_email[client_email.lower()]
|
|
match_method = "EMAIL"
|
|
elif norm_client_name and norm_client_name in brand_contacts_by_name:
|
|
bc = brand_contacts_by_name[norm_client_name]
|
|
match_method = "NOMBRE"
|
|
|
|
bc_id = None
|
|
if not bc:
|
|
# El contacto no existe en Marca, hay que crearlo! (si está permitido)
|
|
if not allow_contacts:
|
|
safe_print(f" [SALTADO] Se omitió la creación de contacto central para: '{client_name}' en Marca Principal (Acción no permitida).")
|
|
continue
|
|
|
|
if args.dry_run:
|
|
safe_print(f" [SIMULACION] Crearia contacto central para: '{client_name}' en Marca Principal (No se encontro por Telefono, Email ni Nombre).")
|
|
bc_id = "SIMULATED_BRAND_CONTACT_ID"
|
|
total_contacts_created += 1
|
|
else:
|
|
safe_print(f" Creando contacto central para: '{client_name}' en Marca Principal...")
|
|
# Mapear custom fields del contacto de sucursal a los IDs del schema de Marca
|
|
contact_custom_fields = []
|
|
for f_name, f_val in sc_custom_fields_by_name.items():
|
|
brand_cf_id = brand_contact_schema.get(f_name)
|
|
if brand_cf_id and f_val is not None and str(f_val).strip():
|
|
contact_custom_fields.append({"id": brand_cf_id, "value": f_val})
|
|
contact_data = {
|
|
"locationId": BRAND_LOCATION_ID,
|
|
"firstName": sc["first_name"],
|
|
"lastName": sc["last_name"],
|
|
"email": sc["email"],
|
|
"phone": sc["phone"]
|
|
}
|
|
if contact_custom_fields:
|
|
contact_data["customFields"] = contact_custom_fields
|
|
try:
|
|
res = sync_engine.ghl_client.create_contact(brand_token, contact_data)
|
|
bc_id = res.get("contact", {}).get("id")
|
|
if not bc_id:
|
|
raise Exception("Respuesta de GHL sin id de contacto")
|
|
total_contacts_created += 1
|
|
if args.run_id:
|
|
cid = script_audit.record_change(
|
|
args.run_id, BRAND_LOCATION_ID, "contact", bc_id,
|
|
"", "created", None,
|
|
{"phone": sc["phone"], "email": sc["email"], "name": client_name},
|
|
)
|
|
if cid:
|
|
script_audit.mark_change(cid, "applied")
|
|
# Agregar temporalmente a nuestro indice en memoria
|
|
new_contact = {"id": bc_id, "first_name": sc["first_name"], "last_name": sc["last_name"], "email": sc["email"], "phone": sc["phone"]}
|
|
if norm_p_10d: brand_contacts_by_phone[norm_p_10d] = new_contact
|
|
if client_email: brand_contacts_by_email[client_email.lower()] = new_contact
|
|
if norm_client_name: brand_contacts_by_name[norm_client_name] = new_contact
|
|
except Exception as err:
|
|
err_entry = _record_error(
|
|
"create_brand_contact_failed",
|
|
f"[{b_name}] Fallo crear contacto '{client_name}' en Marca: {err}",
|
|
{"location_id": loc_id, "branch_contact_id": contact_id, "client_name": client_name},
|
|
exc=err,
|
|
)
|
|
safe_print(f" * Error creando contacto '{client_name}' en Marca: {err} (error_id={err_entry.get('error_id')})")
|
|
continue
|
|
else:
|
|
bc_id = bc["id"]
|
|
|
|
if not bc_id:
|
|
continue
|
|
|
|
# --- MAPEO DINÃMICO DE CAMPOS PERSONALIZADOS ---
|
|
# Extraer campos personalizados de la oportunidad de la sucursal por nombre
|
|
so_custom_fields_by_name = custom_fields_by_name(so, branch_opp_id_to_name)
|
|
# sc_custom_fields_by_name ya fue extraÃdo antes de la creación del contacto
|
|
|
|
branch_match_values = [
|
|
b_name,
|
|
loc_id,
|
|
so_custom_fields_by_name.get("Sucursal"),
|
|
so_custom_fields_by_name.get("TIENDA") or so_custom_fields_by_name.get("Tienda"),
|
|
sc_custom_fields_by_name.get("Sucursal"),
|
|
sc_custom_fields_by_name.get("TIENDA") or sc_custom_fields_by_name.get("Tienda"),
|
|
]
|
|
|
|
# 2. Buscar si la oportunidad ya existe en la Marca Principal bajo este contacto
|
|
brand_opps_for_contact = brand_opps_by_contact_id.get(bc_id, [])
|
|
used_brand_opp_ids = used_brand_opp_ids_by_contact.setdefault(bc_id, set())
|
|
available_brand_opps_for_contact = [
|
|
bo for bo in brand_opps_for_contact
|
|
if bo.get("id") not in used_brand_opp_ids
|
|
]
|
|
|
|
def _bo_cf(bo):
|
|
bo_id = bo.get("id")
|
|
cached = brand_opp_cf_cache.get(bo_id)
|
|
if cached is None:
|
|
cached = custom_fields_by_name(bo, brand_opp_name_by_id)
|
|
brand_opp_cf_cache[bo_id] = cached
|
|
return cached
|
|
|
|
matched_bo = None
|
|
# Estrategia de conciliación robusta multicapa:
|
|
# Capa 1. Mismo ID de oportunidad en GHL
|
|
for bo in available_brand_opps_for_contact:
|
|
if bo.get("id") == so.get("id"):
|
|
matched_bo = bo
|
|
break
|
|
|
|
# Capa 2. Coincidencia por Nombre Exacto de oportunidad, desempatada por sucursal/fecha/valor
|
|
if not matched_bo:
|
|
exact_name_matches = [
|
|
bo for bo in available_brand_opps_for_contact
|
|
if normalize_match_text(bo.get("name")) and normalize_match_text(bo.get("name")) == normalize_match_text(so.get("name"))
|
|
]
|
|
if exact_name_matches:
|
|
matched_bo = max(
|
|
exact_name_matches,
|
|
key=lambda bo: calculate_opportunity_similarity(so, bo, _bo_cf(bo), branch_match_values)
|
|
)
|
|
|
|
# Capa 2.5. Única oportunidad, solo si también hay una sola oportunidad en la sucursal para ese contacto.
|
|
if (
|
|
not matched_bo
|
|
and len(available_brand_opps_for_contact) == 1
|
|
and branch_opp_count_by_contact_id.get(contact_id, 0) == 1
|
|
):
|
|
candidate_bo = available_brand_opps_for_contact[0]
|
|
candidate_score = calculate_opportunity_similarity(
|
|
so, candidate_bo, _bo_cf(candidate_bo), branch_match_values
|
|
)
|
|
if candidate_score >= 25:
|
|
matched_bo = candidate_bo
|
|
|
|
# Capa 3. Proximidad de creación (rango temporal de 1 hora)
|
|
if not matched_bo:
|
|
for bo in available_brand_opps_for_contact:
|
|
if are_dates_within_one_hour(so["date_added"], bo["date_added"]):
|
|
matched_bo = bo
|
|
break
|
|
|
|
# Capa 4. Emparejamiento HeurÃstico / Similitud (Similitud >= 55%)
|
|
if not matched_bo:
|
|
best_fuzzy_bo = None
|
|
best_fuzzy_score = 0
|
|
for bo in available_brand_opps_for_contact:
|
|
score = calculate_opportunity_similarity(so, bo, _bo_cf(bo), branch_match_values)
|
|
if score > best_fuzzy_score:
|
|
best_fuzzy_score = score
|
|
best_fuzzy_bo = bo
|
|
if best_fuzzy_bo and best_fuzzy_score >= 55:
|
|
matched_bo = best_fuzzy_bo
|
|
|
|
# --- MAPEO DINÃMICO DE PIPELINE Y STAGE DE SUCURSAL A MARCA ---
|
|
mapped_brand_pid, mapped_brand_sid, stage_warning = map_stage_branch_to_brand(
|
|
conn, loc_id, so["pipeline_id"], so["pipeline_stage_id"], brand_pipelines
|
|
)
|
|
if not mapped_brand_pid:
|
|
mapped_brand_pid = so["pipeline_id"]
|
|
if not mapped_brand_sid:
|
|
mapped_brand_sid = so["pipeline_stage_id"]
|
|
if stage_warning:
|
|
_record_error(
|
|
"stage_mapping_warning",
|
|
f"[{b_name}] opp '{so.get('name')}' ({so.get('id')}): {stage_warning}",
|
|
{"location_id": loc_id, "opportunity_id": so.get("id"), "branch_stage_id": so.get("pipeline_stage_id")},
|
|
)
|
|
|
|
# Mapear los campos de la oportunidad para la Marca Principal con reglas y fallbacks de Monte Providencia
|
|
brand_opp_custom_values = {}
|
|
|
|
# CAPA BASE - mapeo dinamico de TODOS los custom fields de la opp de
|
|
# sucursal cuyo nombre NO tenga logica dedicada abajo. Garantiza que
|
|
# campos sin tratamiento especial (incluido "ID Oportunidad Sucursal"
|
|
# y cualquier campo nuevo) se propaguen sucursal -> Marca, en vez de
|
|
# quedarse solo con los 9 campos historicos. Los campos especiales se
|
|
# excluyen aqui y los calcula/valida la logica que sigue (override).
|
|
for _src_fname, _src_fval in so_custom_fields_by_name.items():
|
|
if _norm_field_name(_src_fname) in _SPECIAL_OVERRIDE_FIELDS_NORM:
|
|
continue
|
|
if _src_fval is not None and str(_src_fval).strip() != "":
|
|
brand_opp_custom_values[_src_fname] = _src_fval
|
|
|
|
# 1. Copiar Canal de Origen con fallbacks (p. ej. si está vacÃo, heredar del contacto)
|
|
canal = so_custom_fields_by_name.get("CANAL DE ORIGEN") or so_custom_fields_by_name.get("Canal de Origen")
|
|
if not canal:
|
|
canal = sc_custom_fields_by_name.get("CANAL DE ORIGEN") or sc_custom_fields_by_name.get("Canal de Origen")
|
|
if not canal:
|
|
contact_tags = [t.strip().lower() for t in (sc.get("tags") or "").split(",") if t.strip()]
|
|
if "sucursal" in contact_tags:
|
|
canal = "SUCURSAL"
|
|
elif sc.get("source"):
|
|
sc_source = str(sc.get("source") or "").upper()
|
|
if any(kw in sc_source for kw in ["FACEBOOK", "FB", "INSTAGRAM", "IG", "WHATSAPP", "WA", "FORM"]):
|
|
canal = sc_source
|
|
else:
|
|
canal = "SUCURSAL"
|
|
else:
|
|
canal = "SUCURSAL"
|
|
|
|
if canal:
|
|
brand_opp_custom_values["CANAL DE ORIGEN"] = canal
|
|
brand_opp_custom_values["Canal de Origen"] = canal
|
|
|
|
# 2. Copiar Fuente de Prospecto con validación y fallbacks
|
|
fuente = so_custom_fields_by_name.get("Fuente de Prospecto")
|
|
if not fuente:
|
|
fuente = sc_custom_fields_by_name.get("Fuente de Prospecto")
|
|
|
|
# Regla de Validación: SUCURSAL no puede tener fuente LEAD DIGITAL
|
|
if canal == "SUCURSAL" and fuente == "LEAD DIGITAL":
|
|
fuente = "SUCURSAL"
|
|
elif not fuente:
|
|
if canal in ["FORMULARIO", "FACEBOOK", "WHATSAPP", "INSTAGRAM", "LEAD DIGITAL"]:
|
|
fuente = "LEAD DIGITAL"
|
|
else:
|
|
fuente = "SUCURSAL"
|
|
|
|
if fuente:
|
|
brand_opp_custom_values["Fuente de Prospecto"] = fuente
|
|
|
|
# 3 + 4. Sucursal y TIENDA desde el VERIFICADOR CSV (fuente autoritativa).
|
|
# Doc MP: "Sucursal/TIENDA debe coincidir con verificador de sucursales".
|
|
# La opp pertenece a la sucursal `loc_id`, asà que se toma el valor del
|
|
# verificador para esa location, no lo que digan los campos de origen.
|
|
verifier_entry = verifier_map.get(loc_id)
|
|
if verifier_entry:
|
|
# Detectar caso "cross-sucursal": la opp tiene un Sucursal o TIENDA
|
|
# poblado que NO coincide con la sucursal que la aloja. Puede ser
|
|
# cliente transferido entre sucursales o dato sucio — vale la pena
|
|
# marcarlo para revisión manual.
|
|
existing_sucursal = (so_custom_fields_by_name.get("Sucursal") or "").strip()
|
|
existing_tienda = (
|
|
so_custom_fields_by_name.get("TIENDA")
|
|
or so_custom_fields_by_name.get("Tienda")
|
|
or ""
|
|
).strip()
|
|
target_sucursal = verifier_entry["sucursal"]
|
|
target_tienda = verifier_entry["tienda"]
|
|
|
|
sucursal_mismatch = (
|
|
existing_sucursal
|
|
and existing_sucursal != target_sucursal
|
|
and existing_sucursal != target_tienda
|
|
)
|
|
tienda_mismatch = (
|
|
existing_tienda
|
|
and existing_tienda != target_tienda
|
|
and existing_tienda != target_sucursal
|
|
)
|
|
if sucursal_mismatch or tienda_mismatch:
|
|
_record_error(
|
|
"cross_sucursal",
|
|
f"[{b_name}] opp '{so.get('name')}' tiene Sucursal='{existing_sucursal}' "
|
|
f"/ TIENDA='{existing_tienda}' pero está alojada en location {loc_id} "
|
|
f"(verificador → '{target_sucursal}' / '{target_tienda}'). Se sobrescribe.",
|
|
{
|
|
"location_id": loc_id,
|
|
"opportunity_id": so.get("id"),
|
|
"existing_sucursal": existing_sucursal,
|
|
"existing_tienda": existing_tienda,
|
|
"target_sucursal": target_sucursal,
|
|
"target_tienda": target_tienda,
|
|
},
|
|
)
|
|
|
|
brand_opp_custom_values["Sucursal"] = target_sucursal
|
|
brand_opp_custom_values["TIENDA"] = target_tienda
|
|
brand_opp_custom_values["Tienda"] = target_tienda
|
|
else:
|
|
_record_error(
|
|
"location_sin_verificador",
|
|
f"[{b_name}] location_id '{loc_id}' no está en el verificador CSV; no se escribe Sucursal/TIENDA.",
|
|
{"location_id": loc_id, "opportunity_id": so.get("id")},
|
|
)
|
|
|
|
# 5. Calcular y mapear el VehÃculo (según Reglas de Monte Providencia)
|
|
# Reglas:
|
|
# - LEAD DIGITAL: construir desde Marca/Versión/Año del contacto, pero SOLO si
|
|
# hay al menos marca o versión (solo año es información inútil que
|
|
# sobrescribirÃa datos válidos en Marca).
|
|
# - Otros canales: respetar el valor de la opp de sucursal; si no existe,
|
|
# intentar el constructed_vehicle siempre que sea significativo.
|
|
marca_veh = sc_custom_fields_by_name.get("Marca del VehÃculo", "").strip()
|
|
vers_veh = sc_custom_fields_by_name.get("Versión del VehÃculo", "").strip()
|
|
ano_veh = sc_custom_fields_by_name.get("Año del VehÃculo", "").strip()
|
|
vehicle_meaningful = has_meaningful_vehicle_info(marca_veh, vers_veh, ano_veh)
|
|
constructed_vehicle = " ".join(filter(None, [marca_veh, vers_veh, ano_veh])).strip()
|
|
|
|
if fuente == "LEAD DIGITAL" and constructed_vehicle and vehicle_meaningful:
|
|
brand_opp_custom_values["VehÃculo"] = constructed_vehicle
|
|
else:
|
|
vehiculo_val = so_custom_fields_by_name.get("VehÃculo")
|
|
if not vehiculo_val and vehicle_meaningful:
|
|
vehiculo_val = constructed_vehicle
|
|
if vehiculo_val:
|
|
brand_opp_custom_values["VehÃculo"] = vehiculo_val
|
|
|
|
# 6. Modalidad de Empeño (doc MP: Sin Dejarlo (GPS) / Tradicional (Resguardo))
|
|
modalidad_raw = (
|
|
so_custom_fields_by_name.get("Modalidad de Empeño")
|
|
or so_custom_fields_by_name.get("MODALIDAD DE EMPEÑO")
|
|
or sc_custom_fields_by_name.get("Modalidad de Empeño")
|
|
or sc_custom_fields_by_name.get("MODALIDAD DE EMPEÑO")
|
|
or sc_custom_fields_by_name.get("¿Qué modalidad prefieres?")
|
|
)
|
|
modalidad_val = normalize_modalidad(modalidad_raw)
|
|
if modalidad_val:
|
|
brand_opp_custom_values["Modalidad de Empeño"] = modalidad_val
|
|
|
|
# 7. Persona que atendió al prospecto (copia directa desde sucursal)
|
|
persona_val = (
|
|
so_custom_fields_by_name.get("Persona que atendió al prospecto")
|
|
or so_custom_fields_by_name.get("Persona que atendio al prospecto")
|
|
)
|
|
if persona_val:
|
|
brand_opp_custom_values["Persona que atendió al prospecto"] = persona_val
|
|
|
|
# 8. Visita a sucursal (checkbox)
|
|
visita_val = (
|
|
so_custom_fields_by_name.get("Visita a sucursal")
|
|
or so_custom_fields_by_name.get("Visita a Sucursal")
|
|
)
|
|
if visita_val is not None and str(visita_val) != "":
|
|
brand_opp_custom_values["Visita a sucursal"] = visita_val
|
|
|
|
# 9. Fecha de última visita a sucursal
|
|
fecha_visita_val = (
|
|
so_custom_fields_by_name.get("Fecha de última visita a sucursal")
|
|
or so_custom_fields_by_name.get("Fecha de ultima visita a sucursal")
|
|
)
|
|
if fecha_visita_val:
|
|
brand_opp_custom_values["Fecha de última visita a sucursal"] = fecha_visita_val
|
|
|
|
# Convertir los valores al formato de GHL [ {"id": "brand_f_id", "value": "value"} ]
|
|
brand_custom_fields_list = []
|
|
for f_name, f_val in brand_opp_custom_values.items():
|
|
brand_f_id = brand_opp_schema.get(f_name)
|
|
if brand_f_id:
|
|
brand_custom_fields_list.append({
|
|
"id": brand_f_id,
|
|
"value": f_val
|
|
})
|
|
|
|
# 3. Acción de sincronización / actualización (PUT o POST)
|
|
if matched_bo:
|
|
used_brand_opp_ids.add(matched_bo.get("id"))
|
|
# Validar si tiene discrepancias para actualizar (PUT)
|
|
has_discrepancies = False
|
|
changes = []
|
|
|
|
if (so["name"] or "").strip().lower() != (matched_bo.get("name") or "").strip().lower():
|
|
has_discrepancies = True
|
|
changes.append(f"Nombre ('{matched_bo.get('name')}' -> '{so['name']}')")
|
|
if so["status"] != matched_bo.get("status"):
|
|
has_discrepancies = True
|
|
status_changed = True
|
|
changes.append(f"Estado ({matched_bo.get('status')} -> {so['status']})")
|
|
else:
|
|
status_changed = False
|
|
if abs((so["monetary_value"] or 0) - (matched_bo.get("monetaryValue") or matched_bo.get("monetary_value") or 0)) >= 0.01:
|
|
has_discrepancies = True
|
|
changes.append(f"Valor (${matched_bo.get('monetaryValue') or matched_bo.get('monetary_value') or 0:.2f} -> ${so['monetary_value']:.2f})")
|
|
if mapped_brand_sid != (matched_bo.get("pipelineStageId") or matched_bo.get("pipeline_stage_id")):
|
|
has_discrepancies = True
|
|
changes.append(f"Etapa ({matched_bo.get('pipelineStageId') or matched_bo.get('pipeline_stage_id')} -> {mapped_brand_sid})")
|
|
|
|
# Comparar campos personalizados de la Marca con los calculados (usa cache)
|
|
matched_bo_custom_fields_by_name = dict(_bo_cf(matched_bo))
|
|
|
|
for f_name, expected_val in brand_opp_custom_values.items():
|
|
current_val = matched_bo_custom_fields_by_name.get(f_name, "")
|
|
norm_curr = str(current_val).strip().lower()
|
|
norm_exp = str(expected_val).strip().lower()
|
|
if norm_curr != norm_exp:
|
|
has_discrepancies = True
|
|
changes.append(f"Campo '{f_name}' ('{current_val}' -> '{expected_val}')")
|
|
|
|
if has_discrepancies:
|
|
if not allow_updates:
|
|
safe_print(f" [SALTADO] Se omitió la actualización de la oportunidad '{matched_bo.get('name')}' (ID: {matched_bo.get('id')}) en Marca (Acción no permitida).")
|
|
continue
|
|
|
|
# Registrar cambio planeado en audit log (un row por campo cambiado)
|
|
change_ids = []
|
|
if args.run_id:
|
|
for f_name, expected_val in brand_opp_custom_values.items():
|
|
old_val = matched_bo_custom_fields_by_name.get(f_name)
|
|
if str(old_val or "").strip().lower() != str(expected_val or "").strip().lower():
|
|
cf_id = brand_opp_schema.get(f_name)
|
|
cid = script_audit.record_change(
|
|
args.run_id, BRAND_LOCATION_ID, "opportunity",
|
|
matched_bo.get("id"), cf_id or "", f_name, old_val, expected_val,
|
|
)
|
|
if cid:
|
|
change_ids.append((cid, f_name))
|
|
|
|
if args.dry_run:
|
|
safe_print(f" [SIMULACION] Actualizaria oportunidad '{matched_bo.get('name')}' (ID: {matched_bo.get('id')}) en Marca. Cambios: {', '.join(changes)}")
|
|
for cid, _f in change_ids:
|
|
script_audit.mark_change(cid, "planned")
|
|
total_opps_updated += 1
|
|
else:
|
|
safe_print(f" Actualizando oportunidad '{matched_bo.get('name')}' (ID: {matched_bo.get('id')}) en Marca para alinear con Sucursal...")
|
|
opp_data = {
|
|
"name": so["name"],
|
|
"monetaryValue": so["monetary_value"] or 0.0,
|
|
"pipelineId": mapped_brand_pid,
|
|
"pipelineStageId": mapped_brand_sid,
|
|
"customFields": brand_custom_fields_list
|
|
}
|
|
try:
|
|
apply_brand_opportunity_update(
|
|
brand_token,
|
|
matched_bo.get("id"),
|
|
opp_data,
|
|
(so["status"] or "open") if status_changed else None,
|
|
)
|
|
safe_print(f" [OK] Actualizada correctamente.")
|
|
total_opps_updated += 1
|
|
for cid, _f in change_ids:
|
|
script_audit.mark_change(cid, "applied")
|
|
matched_bo.update({
|
|
"name": so["name"],
|
|
"status": so["status"] or "open",
|
|
"monetaryValue": so["monetary_value"] or 0.0,
|
|
"monetary_value": so["monetary_value"] or 0.0,
|
|
"pipelineId": mapped_brand_pid,
|
|
"pipeline_id": mapped_brand_pid,
|
|
"pipelineStageId": mapped_brand_sid,
|
|
"pipeline_stage_id": mapped_brand_sid,
|
|
})
|
|
# Invalidar cache de cf para re-leer en próximas iteraciones
|
|
brand_opp_cf_cache[matched_bo.get("id")] = dict(brand_opp_custom_values)
|
|
except Exception as err:
|
|
err_entry = _record_error(
|
|
"update_opportunity_failed",
|
|
f"[{b_name}] Fallo actualizar opp '{matched_bo.get('name')}' (id={matched_bo.get('id')}): {err}",
|
|
{"location_id": loc_id, "brand_opportunity_id": matched_bo.get("id")},
|
|
exc=err,
|
|
)
|
|
safe_print(f" [ERROR] Fallo actualizar en GHL: {err} (error_id={err_entry.get('error_id')})")
|
|
for cid, _f in change_ids:
|
|
script_audit.mark_change(cid, "failed", str(err))
|
|
else:
|
|
# No existe en Marca, crearla! (POST) (si está permitido)
|
|
if not allow_creations:
|
|
safe_print(f" [SALTADO] Se omitió la creación de la oportunidad nueva '{so['name']}' en Marca Principal (Acción no permitida).")
|
|
continue
|
|
|
|
if args.dry_run:
|
|
safe_print(f" [SIMULACION] Crearia oportunidad nueva '{so['name']}' en Marca Principal bajo el contacto ID '{bc_id}'.")
|
|
if brand_custom_fields_list:
|
|
cf_details = [f"{f_name}: '{f['value']}'" for f_name, f_val in brand_opp_custom_values.items() for f in brand_custom_fields_list if f['id'] == brand_opp_schema.get(f_name)]
|
|
safe_print(f" [SIMULACION] Con Campos Personalizados: {', '.join(cf_details)}")
|
|
total_opps_created += 1
|
|
else:
|
|
safe_print(f" Creando oportunidad '{so['name']}' en Marca Principal...")
|
|
opp_data = {
|
|
"locationId": BRAND_LOCATION_ID,
|
|
"name": so["name"],
|
|
"status": so["status"] or "open",
|
|
"monetaryValue": so["monetary_value"] or 0.0,
|
|
"pipelineId": mapped_brand_pid,
|
|
"pipelineStageId": mapped_brand_sid,
|
|
"contactId": bc_id,
|
|
"customFields": brand_custom_fields_list
|
|
}
|
|
try:
|
|
res = sync_engine.ghl_client.create_opportunity(brand_token, opp_data)
|
|
new_opp_id = res.get("opportunity", {}).get("id")
|
|
target_status = so["status"] or "open"
|
|
if new_opp_id and target_status != "open":
|
|
sync_engine.ghl_client.update_opportunity_status(brand_token, new_opp_id, target_status)
|
|
safe_print(f" [OK] Creada con éxito (Nuevo ID en Marca GHL: {new_opp_id}).")
|
|
total_opps_created += 1
|
|
if new_opp_id and args.run_id:
|
|
cid = script_audit.record_change(
|
|
args.run_id, BRAND_LOCATION_ID, "opportunity", new_opp_id,
|
|
"", "created", None,
|
|
{"name": so["name"], "contact_id": bc_id, "source_branch": loc_id},
|
|
)
|
|
if cid:
|
|
script_audit.mark_change(cid, "applied")
|
|
if new_opp_id:
|
|
new_bo = {
|
|
"id": new_opp_id,
|
|
"name": so["name"],
|
|
"status": target_status,
|
|
"monetaryValue": so["monetary_value"] or 0.0,
|
|
"monetary_value": so["monetary_value"] or 0.0,
|
|
"pipelineId": mapped_brand_pid,
|
|
"pipeline_id": mapped_brand_pid,
|
|
"pipelineStageId": mapped_brand_sid,
|
|
"pipeline_stage_id": mapped_brand_sid,
|
|
"contactId": bc_id,
|
|
"contact_id": bc_id,
|
|
"date_added": so.get("date_added"),
|
|
}
|
|
brand_opps_by_contact_id.setdefault(bc_id, []).append(new_bo)
|
|
brand_opp_cf_cache[new_opp_id] = dict(brand_opp_custom_values)
|
|
except Exception as err:
|
|
if is_duplicate_opportunity_error(err):
|
|
safe_print(" [SALTADO] GHL rechazó la creación porque el contacto ya tiene una oportunidad en Marca.")
|
|
_record_error(
|
|
"create_opportunity_duplicate",
|
|
f"[{b_name}] Duplicado al crear opp '{so.get('name')}' (contact {bc_id})",
|
|
{"location_id": loc_id, "contact_id": bc_id, "opportunity_name": so.get("name")},
|
|
)
|
|
else:
|
|
err_entry = _record_error(
|
|
"create_opportunity_failed",
|
|
f"[{b_name}] Fallo crear opp '{so.get('name')}': {err}",
|
|
{"location_id": loc_id, "contact_id": bc_id, "opportunity_name": so.get("name")},
|
|
exc=err,
|
|
)
|
|
safe_print(f" [ERROR] Fallo crear en GHL: {err} (error_id={err_entry.get('error_id')})")
|
|
|
|
safe_print("-" * 90)
|
|
|
|
# Imprimir resumen de la ejecucion
|
|
safe_print("\n" + "=" * 90)
|
|
safe_print("=== RESUMEN DE RECONCILIACION Y SINCRONIZACION SUCURSAL -> MARCA ===")
|
|
safe_print("=" * 90)
|
|
if args.dry_run:
|
|
safe_print(f" [SIMULACION] Contactos Nuevos a Crear en Marca: {total_contacts_created}")
|
|
safe_print(f" [SIMULACION] Oportunidades Nuevas a Replicar: {total_opps_created}")
|
|
safe_print(f" [SIMULACION] Oportunidades a Actualizar (PUT): {total_opps_updated}")
|
|
else:
|
|
safe_print(f" Contactos Nuevos Creados en Marca: {total_contacts_created}")
|
|
safe_print(f" Oportunidades Nuevas Replicadas: {total_opps_created}")
|
|
safe_print(f" Oportunidades Actualizadas (PUT): {total_opps_updated}")
|
|
|
|
safe_print("-" * 90)
|
|
safe_print("--- MÉTRICAS DE ESCANEO Y AUDITORÃA DE GHL ---")
|
|
safe_print(f" Locations procesadas: {len(stats['locations_processed'])}")
|
|
for loc in stats['locations_processed']:
|
|
safe_print(f" - {loc}")
|
|
safe_print(f" Páginas consultadas (GHL): {stats['pages_queried']}")
|
|
safe_print(f" Registros recibidos desde GHL: {stats['records_received']}")
|
|
safe_print(f" Registros únicos procesados: {stats['records_unique_processed']}")
|
|
safe_print(f" Duplicados descartados: {stats['duplicates_discarded']}")
|
|
|
|
if stats['errors']:
|
|
safe_print(f" Errores por location o endpoint:")
|
|
for loc_err, err_list in stats['errors'].items():
|
|
safe_print(f" - Location {loc_err}:")
|
|
for err in err_list:
|
|
safe_print(f" * {err}")
|
|
else:
|
|
safe_print(f" Errores por location o endpoint: Ninguno")
|
|
safe_print("=" * 90)
|
|
|
|
# --- RESUMEN DE ERRORES ACUMULADOS DURANTE LA EJECUCIÓN ---
|
|
safe_print("\n" + "=" * 90)
|
|
safe_print("=== ERRORES Y ADVERTENCIAS ACUMULADOS DURANTE LA EJECUCIÓN ===")
|
|
safe_print("=" * 90)
|
|
if not EXECUTION_ERRORS:
|
|
safe_print(" (Sin errores ni advertencias)")
|
|
else:
|
|
by_cat = {}
|
|
for e in EXECUTION_ERRORS:
|
|
by_cat.setdefault(e["category"], []).append(e)
|
|
safe_print(f" Total: {len(EXECUTION_ERRORS)} entradas en {len(by_cat)} categorÃas.\n")
|
|
for cat, items in sorted(by_cat.items(), key=lambda kv: -len(kv[1])):
|
|
safe_print(f" [{cat}] {len(items)} entrada(s):")
|
|
for it in items[:50]:
|
|
err_id_str = f" (error_id={it['error_id']})" if it.get("error_id") else ""
|
|
safe_print(f" - {it['message']}{err_id_str}")
|
|
if len(items) > 50:
|
|
safe_print(f" ... y {len(items) - 50} más (truncadas).")
|
|
safe_print("")
|
|
safe_print("=" * 90 + "\n")
|
|
|
|
# Registrar éxito en la auditorÃa si hay run_id
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "success")
|
|
|
|
except Exception as e:
|
|
safe_print(f"Error critico en la ejecucion del script de reconciliacion: {e}")
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "failed", error_message=str(e))
|
|
sys.exit(1)
|
|
finally:
|
|
conn.close()
|
|
|
|
if __name__ == "__main__":
|
|
main()
|