489 lines
19 KiB
Python
489 lines
19 KiB
Python
import os
|
|
import csv
|
|
import pandas as pd
|
|
import concurrent.futures
|
|
from datetime import datetime
|
|
from ghl_client import GHLClient
|
|
import db
|
|
import error_logging
|
|
|
|
# Accounts catalog: the CSV file is looked up in the same directory as this module.
CSV_FILENAME = "Bucéfalo - Mesa de control - API Tokens - MP.csv"
CSV_PATH = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    CSV_FILENAME,
)

# Default: no artificial cap. The GHL rate limit is per location, so syncing
# every account in parallel does not trigger 429 responses. Thread/CPU cost is
# low because almost all of the time is spent waiting on I/O. The env var
# SYNC_ENGINE_MAX_WORKERS remains available to cap concurrency on purpose.
DEFAULT_SYNC_MAX_WORKERS = 1000

# Shared API client used by every function in this module.
ghl_client = GHLClient()

# location_id -> display name, filled lazily by resolve_location_name().
_location_name_cache = {}
|
|
|
|
|
|
# ============================================================================
# REFRESH HELPERS — single source of truth after a mutation
# ============================================================================
# After any mutating endpoint (create/update/delete of an opportunity or a
# contact) SQLite must reflect what Bucéfalo actually holds. These helpers
# fetch the freshly touched object live and upsert it locally. If that fails,
# they report the error instead of swallowing it.
|
|
|
|
def refresh_opportunity_in_db(token, opp_id, location_id):
    """Re-fetch one opportunity from GHL and mirror it into the local database.

    Calls ``ghl_client.get_opportunity`` and stores the result with
    ``db.save_single_opportunity``. If the fetch fails with an HTTP 404 (read
    from ``exc.response.status_code`` when the exception carries a response),
    the local row is deleted instead.

    Args:
        token: API token passed through to the GHL client.
        opp_id: Identifier of the opportunity to refresh.
        location_id: Location that owns the opportunity.

    Returns:
        A dict; this function reports failures instead of raising:
          * ``{"ok": True, "opportunity": opp}`` on success.
          * ``{"ok": True, "deleted_locally": True, "opportunity": None}``
            when GHL answered 404 and the local row was removed.
          * ``{"ok": False, "error": str, ...}`` on any failure.
    """
    try:
        res = ghl_client.get_opportunity(token, opp_id)
    except Exception as exc:
        # Extract the HTTP status defensively: not every exception has a response.
        status = None
        response = getattr(exc, "response", None)
        if response is not None:
            try:
                status = response.status_code
            except Exception:
                status = None

        if status != 404:
            return {"ok": False, "error": f"get_opportunity fallo: {exc}"}

        # GHL no longer knows this opportunity: drop the local copy to stay consistent.
        try:
            conn = db.get_db_connection()
            try:
                conn.execute(
                    "DELETE FROM opportunities WHERE id=? AND location_id=?",
                    (opp_id, location_id),
                )
                conn.commit()
            finally:
                conn.close()
        except Exception as db_exc:
            return {"ok": False, "error": f"GHL 404 + cleanup local fallo: {db_exc}"}
        return {"ok": True, "deleted_locally": True, "opportunity": None}

    # The payload may be wrapped as {"opportunity": {...}} or be the object itself.
    # Anything that is not a dict is treated as "no id" rather than crashing on
    # .get(), so the no-raise contract holds.
    payload = res if isinstance(res, dict) else {}
    opp = payload.get("opportunity") or payload
    if not isinstance(opp, dict) or not opp.get("id"):
        return {"ok": False, "error": f"Respuesta GHL sin id: {res}"}

    try:
        db.save_single_opportunity(location_id, opp)
    except Exception as exc:
        return {"ok": False, "error": f"save_single_opportunity fallo: {exc}", "opportunity": opp}
    return {"ok": True, "opportunity": opp}
|
|
|
|
|
|
def refresh_contact_in_db(token, contact_id, location_id):
    """Re-fetch one contact from GHL and mirror it into the local database.

    Issues ``GET /contacts/{contact_id}`` through the GHL client and stores the
    result with ``db.save_single_contact``. If the fetch fails with an HTTP 404
    (read from ``exc.response.status_code`` when available), the contact row
    and every local opportunity linked to that contact in the same location
    are deleted.

    Args:
        token: API token passed through to the GHL client.
        contact_id: Identifier of the contact to refresh.
        location_id: Location that owns the contact.

    Returns:
        A dict; this function reports failures instead of raising:
          * ``{"ok": True, "contact": contact}`` on success.
          * ``{"ok": True, "deleted_locally": True, "contact": None}`` when
            GHL answered 404 and the local rows were removed.
          * ``{"ok": False, "error": str, ...}`` on any failure.
    """
    try:
        res = ghl_client._request("GET", f"/contacts/{contact_id}", token)
    except Exception as exc:
        # Extract the HTTP status defensively: not every exception has a response.
        status = None
        response = getattr(exc, "response", None)
        if response is not None:
            try:
                status = response.status_code
            except Exception:
                status = None

        if status != 404:
            return {"ok": False, "error": f"get_contact fallo: {exc}"}

        # Contact is gone upstream: remove its opportunities first, then the contact.
        try:
            conn = db.get_db_connection()
            try:
                conn.execute(
                    "DELETE FROM opportunities WHERE contact_id=? AND location_id=?",
                    (contact_id, location_id),
                )
                conn.execute(
                    "DELETE FROM contacts WHERE id=? AND location_id=?",
                    (contact_id, location_id),
                )
                conn.commit()
            finally:
                conn.close()
        except Exception as db_exc:
            return {"ok": False, "error": f"GHL 404 + cleanup local fallo: {db_exc}"}
        return {"ok": True, "deleted_locally": True, "contact": None}

    # The payload may be wrapped as {"contact": {...}} or be the object itself.
    # Anything that is not a dict is treated as "no id" rather than crashing on
    # .get(), so the function keeps reporting errors through its return value.
    payload = res if isinstance(res, dict) else {}
    contact = payload.get("contact") or payload
    if not isinstance(contact, dict) or not contact.get("id"):
        return {"ok": False, "error": f"Respuesta GHL sin id: {res}"}

    try:
        db.save_single_contact(location_id, contact)
    except Exception as exc:
        return {"ok": False, "error": f"save_single_contact fallo: {exc}", "contact": contact}
    return {"ok": True, "contact": contact}
|
|
|
|
|
|
def refresh_opps_for_contact(token, contact_id, location_id):
    """Synchronise every opportunity of one contact within a location.

    IMPORTANT (note kept from the original authors): GHL ignores the
    ``contactId`` filter on ``POST /opportunities/search`` and returns every
    opportunity of the location. For that reason this function pages through
    all opportunities of the location and filters by ``contactId`` in Python.

    Args:
        token: API token passed through to the GHL client.
        contact_id: Contact whose opportunities should be refreshed.
        location_id: Location to search in.

    Returns:
        On search failure: ``{"ok": False, "error": str, "refreshed_count": 0}``.
        Otherwise ``{"ok": True, "refreshed_count": int, "opps_in_ghl": int,
        "truncated": bool}`` where ``refreshed_count`` is the number of
        opportunities saved locally, ``opps_in_ghl`` the number matched
        upstream, and ``truncated`` is True when the page cap was reached
        while pages were still full (more data may exist that was not read).
        Individual save failures are logged and do not abort the run.
    """
    page_size = 100
    max_pages = 100  # safety cap so a misbehaving API cannot loop forever

    matched = []
    truncated = False
    try:
        for page in range(1, max_pages + 1):
            body = {"locationId": location_id, "limit": page_size, "page": page}
            res = ghl_client._request("POST", "/opportunities/search", token, json=body)
            batch = res.get("opportunities", []) if isinstance(res, dict) else []
            if not batch:
                break
            matched.extend(opp for opp in batch if opp.get("contactId") == contact_id)
            if len(batch) < page_size:
                # A short page is the last page.
                break
        else:
            # Loop ended without a break: every page up to the cap was full.
            truncated = True
    except Exception as exc:
        return {"ok": False, "error": f"search_opportunities fallo: {exc}", "refreshed_count": 0}

    refreshed = 0
    for opp in matched:
        try:
            db.save_single_opportunity(location_id, opp)
            refreshed += 1
        except Exception as exc:
            error_logging.log_error("refresh_opps_for_contact_save_failed", exc, {
                "opp_id": opp.get("id"), "location_id": location_id, "contact_id": contact_id,
            })
    return {
        "ok": True,
        "refreshed_count": refreshed,
        "opps_in_ghl": len(matched),
        "truncated": truncated,
    }
|
|
|
|
|
|
def get_sync_max_workers():
    """Return the worker cap for global syncs, always at least 1.

    Reads the ``SYNC_ENGINE_MAX_WORKERS`` environment variable and falls back
    to ``DEFAULT_SYNC_MAX_WORKERS`` when it is unset or not a valid integer.
    """
    configured = os.getenv("SYNC_ENGINE_MAX_WORKERS", str(DEFAULT_SYNC_MAX_WORKERS))
    try:
        parsed = int(configured)
    except (TypeError, ValueError):
        parsed = DEFAULT_SYNC_MAX_WORKERS
    # Clamp zero/negative values to a single worker.
    return parsed if parsed > 1 else 1
|
|
|
|
|
|
def extract_location_name(location_data):
    """Pull a human-readable name out of a GHL location response.

    The payload may be the location object itself or wrap it under a
    ``"location"`` key. Candidates are tried in order: ``name``,
    ``businessName``, ``companyName`` and finally ``business.name``. The first
    one that is truthy and non-blank after stripping is returned as a string.

    Returns:
        The stripped name, or ``None`` when the input is not a dict or no
        candidate holds a usable value.
    """
    if not isinstance(location_data, dict):
        return None

    nested = location_data.get("location")
    source = nested if isinstance(nested, dict) else location_data

    candidates = [source.get(field) for field in ("name", "businessName", "companyName")]
    business = source.get("business")
    if isinstance(business, dict):
        candidates.append(business.get("name"))

    for candidate in candidates:
        if not candidate:
            continue
        text = str(candidate).strip()
        if text:
            return text
    return None
|
|
|
|
|
|
def resolve_location_name(location_id, token, fallback=""):
    """Return a display name for a location.

    Resolution order:
      1. ``fallback`` when it is non-blank (returned stripped, not cached).
      2. The module-level name cache.
      3. A live ``ghl_client.get_location`` lookup; the location id itself is
         used when the response has no usable name or the call fails.
    The result of step 3 is stored in the cache, including the id fallback.
    """
    if fallback:
        trimmed = fallback.strip()
        if trimmed:
            return trimmed

    try:
        return _location_name_cache[location_id]
    except KeyError:
        pass

    resolved = location_id
    try:
        data = ghl_client.get_location(token, location_id)
        resolved = extract_location_name(data) or location_id
    except Exception as e:
        error_id = error_logging.log_error("sync_resolve_location_name_failed", e, {"location_id": location_id})
        print(f"Advertencia: no se pudo obtener nombre de location {location_id}: {e} | error_id={error_id}")
        resolved = location_id

    _location_name_cache[location_id] = resolved
    return resolved
|
|
|
|
# --- CARGADOR DE CSV ---
|
|
|
|
def parse_accounts_csv():
    """Parse the Bucéfalo accounts CSV into a list of account dicts.

    Rows without a ``Location_ID`` or without an ``API_token`` are skipped.
    A blank or ``"-"`` ``Company Owner`` becomes ``"Sin asignar"``. Each
    account is classified as ``"brand"`` (the main Monte Providencia account)
    or ``"branch"``. The display name comes from ``resolve_location_name``,
    which uses the CSV ``Nombre`` when present.

    Duplicated location ids are collapsed: the last row wins.

    Returns:
        list[dict] with keys ``location_id``, ``nombre``, ``token``,
        ``whatsapp_widget``, ``company_owner`` and ``type``.

    Raises:
        FileNotFoundError: if the CSV does not exist at ``CSV_PATH``.
    """
    if not os.path.exists(CSV_PATH):
        raise FileNotFoundError(f"No se encontró el archivo CSV en la ruta: {CSV_PATH}")

    # Monte Providencia (brand) is the main account; everything else is a branch.
    brand_location_id = "GbKkBpCmKu2QmloKFHy3"

    def _cell(row, column):
        # DictReader fills missing trailing cells of short rows with None, so
        # row.get(column, "") would still hand back None; coerce before stripping.
        return (row.get(column) or "").strip()

    accounts = {}

    # newline="" lets the csv module handle line endings itself, as its docs recommend.
    with open(CSV_PATH, mode="r", encoding="utf-8-sig", newline="") as f:
        for row in csv.DictReader(f):
            loc_id = _cell(row, "Location_ID")
            token = _cell(row, "API_token")

            # Skip empty rows and rows that cannot be synced.
            if not loc_id or not token:
                continue

            owner = _cell(row, "Company Owner")
            if not owner or owner == "-":
                owner = "Sin asignar"

            accounts[loc_id] = {
                "location_id": loc_id,
                "nombre": resolve_location_name(loc_id, token, _cell(row, "Nombre")),
                "token": token,
                "whatsapp_widget": _cell(row, "WhatsApp_idWidget"),
                "company_owner": owner,
                "type": "brand" if loc_id == brand_location_id else "branch",
            }

    # Keying by location id de-duplicates automatically (last row wins).
    return list(accounts.values())
|
|
|
|
def get_tokens_map():
    """Return a plain ``location_id -> token`` mapping built from the accounts CSV."""
    tokens_by_location = {}
    for account in parse_accounts_csv():
        tokens_by_location[account["location_id"]] = account["token"]
    return tokens_by_location
|
|
|
|
# --- MOTOR DE SINCRONIZACIÓN INDIVIDUAL ---
|
|
|
|
def _synthesize_pipelines(opportunities):
    """Build placeholder pipelines from the pipeline/stage ids seen on opportunities."""
    stages_by_pipeline = {}
    for opp in opportunities:
        pipeline_id = opp.get("pipelineId")
        stage_id = opp.get("pipelineStageId")
        if pipeline_id and stage_id:
            stages_by_pipeline.setdefault(pipeline_id, set()).add(stage_id)

    return [
        {
            "id": pipeline_id,
            "name": f"Pipeline Sintético ({pipeline_id[:6]})",
            "stages": [
                # Stage ids are sorted so the synthetic order is deterministic.
                {"id": stage_id, "name": f"Etapa {stage_id[:6]}...", "order": order}
                for order, stage_id in enumerate(sorted(stage_ids), start=1)
            ],
        }
        for pipeline_id, stage_ids in stages_by_pipeline.items()
    ]


def sync_account(location_id, token):
    """Synchronise one account's schemas, pipelines, opportunities and contacts into SQLite.

    A sync-log row is created up front and updated with the outcome. Any
    exception raised during the sync is logged together with the step that
    was running, and reported through the return value.

    Args:
        location_id: GHL location to synchronise.
        token: API token for that location.

    Returns:
        ``(success, contacts_count, opportunities_count, error_message)``;
        on failure the counts are 0 and ``error_message`` is ``str(exc)``.
    """
    log_id = db.create_sync_log(location_id)
    print(f"[{datetime.now()}] Iniciando sync para cuenta: {location_id}")

    # Tracks the current phase so a failure can be attributed in the error log.
    sync_step = "metadata"
    try:
        sync_account_metadata(location_id, token)

        # 1. Fetch pipelines
        sync_step = "pipelines"
        pipelines = ghl_client.get_pipelines(token, location_id)

        # 2. Fetch opportunities
        sync_step = "opportunities"
        opportunities = ghl_client.get_all_opportunities(token, location_id)

        # 3. Fetch contacts
        sync_step = "contacts"
        contacts = ghl_client.get_all_contacts(token, location_id)

        # 4. Synthetic fallback when the API returned no pipelines but there are opportunities
        sync_step = "synthetic_pipelines"
        if not pipelines and opportunities:
            print(f"-> GHL devolvió 0 pipelines para {location_id}. Sintetizando desde oportunidades...")
            pipelines = _synthesize_pipelines(opportunities)

        # 5. Persist everything in SQLite
        sync_step = "save_to_sqlite"
        db.save_pipelines(location_id, pipelines)
        db.save_opportunities(location_id, opportunities)
        db.save_contacts(location_id, contacts)

        # 6. Mark the log entry as successful
        db.update_sync_log(log_id, "success", len(contacts), len(opportunities))
        print(f"[{datetime.now()}] Sincronización exitosa para {location_id}. Contactos: {len(contacts)}, Opps: {len(opportunities)}")
        return True, len(contacts), len(opportunities), None

    except Exception as e:
        error_msg = str(e)
        error_id = error_logging.log_error("sync_account_failed", e, {
            "location_id": location_id,
            "sync_step": sync_step,
        })
        db.update_sync_log(log_id, "failed", 0, 0, error_msg)
        print(f"[{datetime.now()}] Error sincronizando cuenta {location_id}: {error_msg} | error_id={error_id}")
        return False, 0, 0, error_msg
|
|
|
|
|
|
def sync_account_metadata(location_id, token):
    """Synchronise the dynamic ``contact`` and ``opportunity`` schemas into SQLite.

    For each object type the field list is fetched from GHL and saved. An
    empty result only prints a warning and leaves the existing local metadata
    untouched. Always returns ``True``; errors from the client or the database
    propagate to the caller.
    """
    for object_key in ("contact", "opportunity"):
        fields = ghl_client.get_object_schema_fields(token, location_id, object_key)
        if fields:
            db.save_object_schema(location_id, object_key, fields)
        else:
            print(f"Advertencia: schema vacio para {location_id}/{object_key}; se conserva metadata local existente.")
    return True
|
|
|
|
# --- MOTOR DE SINCRONIZACIÓN GLOBAL ---
|
|
|
|
# Shared, mutable progress record for the global sync jobs. It is updated in
# place by the background threads and returned by get_sync_progress().
_sync_status = dict(
    is_running=False,    # True while a global sync job is in flight
    total=0,             # number of accounts in the current job
    done=0,              # accounts finished so far (success or failure)
    success_count=0,
    failed_count=0,
    max_workers=0,       # concurrency actually used by the current job
    current_branch="",   # name of the last finished account / status text
)
|
|
|
|
def get_sync_progress():
    """Return the shared progress dict of the global sync (the live object, not a copy)."""
    # Reading a module-level name needs no ``global`` declaration.
    return _sync_status
|
|
|
|
def sync_all_accounts():
    """Start a background full sync of every account listed in the CSV.

    The accounts catalog is parsed and saved synchronously; the per-account
    syncs then run on a thread pool inside a daemon thread so the caller is
    not blocked. Progress is published through the module-level
    ``_sync_status`` dict.

    Returns:
        ``{"message": ...}`` when the job was started or one is already
        running, or ``{"error": ...}`` when the accounts catalog could not be
        loaded.
    """
    if _sync_status["is_running"]:
        return {"message": "Sincronización global ya se encuentra en ejecución."}

    # Make sure the database schema exists before touching it.
    db.init_db()

    try:
        accounts = parse_accounts_csv()
        # Save / refresh the accounts catalog in SQLite.
        db.save_accounts(accounts)
    except Exception as e:
        error_id = error_logging.log_error("sync_all_parse_accounts_failed", e)
        print(f"No se pudo cargar el catálogo de cuentas: {e} | error_id={error_id}")
        return {"error": f"No se pudo cargar el catálogo de cuentas: {str(e)}"}

    total_accounts = len(accounts)
    _sync_status.update({
        "is_running": True,
        "total": total_accounts,
        "done": 0,
        "success_count": 0,
        "failed_count": 0,
        "max_workers": 0,
        "current_branch": "Inicializando...",
    })

    def run_in_background():
        try:
            # The GHL burst limit applies per location, so several accounts
            # can be processed in parallel without interfering with each other.
            concurrency = min(total_accounts, get_sync_max_workers()) if total_accounts else 1
            _sync_status["max_workers"] = concurrency
            print(f"Sync global usando hasta {concurrency} cuentas en paralelo.")

            with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
                future_to_acc = {
                    executor.submit(sync_account, acc['location_id'], acc['token']): acc
                    for acc in accounts
                }

                for future in concurrent.futures.as_completed(future_to_acc):
                    acc = future_to_acc[future]
                    _sync_status["current_branch"] = acc['nombre']
                    try:
                        success, _contacts, _opps, _err = future.result()
                        if success:
                            _sync_status["success_count"] += 1
                        else:
                            _sync_status["failed_count"] += 1
                    except Exception as e:
                        _sync_status["failed_count"] += 1
                        error_id = error_logging.log_error("sync_thread_fatal", e, {
                            "location_id": acc.get("location_id"),
                            "account_name": acc.get("nombre"),
                        })
                        print(f"Excepción fatal en hilo de sync para {acc['nombre']}: {e} | error_id={error_id}")
                    finally:
                        _sync_status["done"] += 1
        except Exception as e:
            # Anything escaping the loop (e.g. workers that cannot be started)
            # is logged here instead of silently killing the thread.
            error_id = error_logging.log_error("sync_all_background_failed", e)
            print(f"Excepción fatal en sync global: {e} | error_id={error_id}")
        finally:
            # Always release the running flag; otherwise one unexpected error
            # would block every future sync until the process restarts.
            _sync_status["is_running"] = False
            _sync_status["current_branch"] = "Completado"

    # Run on a secondary daemon thread so the web request returns immediately.
    import threading
    t = threading.Thread(target=run_in_background)
    t.daemon = True
    t.start()

    return {"message": "Sincronización global iniciada en segundo plano."}
|
|
|
|
|
|
def sync_all_metadata():
    """Start a background sync of schemas/metadata only, for every account.

    The accounts catalog is parsed and saved synchronously; then
    ``sync_account_metadata`` runs per account on a thread pool inside a
    daemon thread. Progress is published through the module-level
    ``_sync_status`` dict, which is shared with the full sync, so only one of
    the two jobs can run at a time.

    Returns:
        ``{"message": ...}`` when the job was started or one is already
        running, or ``{"error": ...}`` when the accounts catalog could not be
        loaded.
    """
    if _sync_status["is_running"]:
        return {"message": "Ya hay una sincronización global en ejecución."}

    db.init_db()

    try:
        accounts = parse_accounts_csv()
        db.save_accounts(accounts)
    except Exception as e:
        error_id = error_logging.log_error("sync_metadata_parse_accounts_failed", e)
        print(f"No se pudo cargar el catálogo de cuentas para metadata: {e} | error_id={error_id}")
        return {"error": f"No se pudo cargar el catálogo de cuentas: {str(e)}"}

    total_accounts = len(accounts)
    _sync_status.update({
        "is_running": True,
        "total": total_accounts,
        "done": 0,
        "success_count": 0,
        "failed_count": 0,
        "max_workers": 0,
        "current_branch": "Inicializando metadata...",
    })

    def run_in_background():
        try:
            concurrency = min(total_accounts, get_sync_max_workers()) if total_accounts else 1
            _sync_status["max_workers"] = concurrency
            print(f"Sync metadata global usando hasta {concurrency} cuentas en paralelo.")

            with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
                future_to_acc = {
                    executor.submit(sync_account_metadata, acc['location_id'], acc['token']): acc
                    for acc in accounts
                }

                for future in concurrent.futures.as_completed(future_to_acc):
                    acc = future_to_acc[future]
                    _sync_status["current_branch"] = acc['nombre']
                    try:
                        future.result()
                        _sync_status["success_count"] += 1
                    except Exception as e:
                        _sync_status["failed_count"] += 1
                        error_id = error_logging.log_error("sync_account_metadata_failed", e, {
                            "location_id": acc.get("location_id"),
                            "account_name": acc.get("nombre"),
                        })
                        print(f"Error sincronizando metadata para {acc['nombre']}: {e} | error_id={error_id}")
                    finally:
                        _sync_status["done"] += 1
        except Exception as e:
            # Anything escaping the loop (e.g. workers that cannot be started)
            # is logged here instead of silently killing the thread.
            error_id = error_logging.log_error("sync_metadata_background_failed", e)
            print(f"Excepción fatal en sync global de metadata: {e} | error_id={error_id}")
        finally:
            # Always release the running flag; otherwise one unexpected error
            # would block every future sync until the process restarts.
            _sync_status["is_running"] = False
            _sync_status["current_branch"] = "Metadata completada"

    # Run on a secondary daemon thread so the caller returns immediately.
    import threading
    t = threading.Thread(target=run_in_background)
    t.daemon = True
    t.start()

    return {"message": "Sincronización de metadata iniciada en segundo plano."}
|