import os
import csv
import threading
import concurrent.futures
from datetime import datetime

import pandas as pd

from ghl_client import GHLClient
import db
import error_logging

CSV_FILENAME = "Bucéfalo - Mesa de control - API Tokens - MP.csv"
CSV_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), CSV_FILENAME)

# Default: no artificial cap. GHL's rate limit is per location, so running all
# accounts in parallel does not cause 429s. Thread/CPU usage stays low because
# almost all of the time is spent waiting on I/O. The SYNC_ENGINE_MAX_WORKERS
# env var is still available to cap concurrency deliberately.
DEFAULT_SYNC_MAX_WORKERS = 1000

# Page size used for POST /opportunities/search and the safety cap on how many
# pages refresh_opps_for_contact walks (100 pages * 100 opps = 10,000 opps).
OPPORTUNITY_SEARCH_PAGE_SIZE = 100
OPPORTUNITY_SEARCH_MAX_PAGES = 100

# Location id of the Monte Providencia brand (main) account; every other
# account in the CSV is classified as a branch.
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"

ghl_client = GHLClient()
_location_name_cache = {}


# ============================================================================
# REFRESH HELPERS — single source of truth after a mutation
# ============================================================================
# After any mutating endpoint (create/update/delete opp/contact), SQLite must
# reflect what Bucéfalo holds. These helpers fetch the just-touched object live
# and upsert it locally. On failure they report the error in the returned dict
# instead of swallowing it.

def _http_status_from_exception(exc):
    """Best-effort HTTP status code of the response attached to ``exc``, or None."""
    response = getattr(exc, "response", None)
    if response is None:
        return None
    try:
        return response.status_code
    except Exception:
        return None


def _delete_rows_locally(statements):
    """Run ``(sql, params)`` DELETE statements, commit once and always close the connection."""
    conn = db.get_db_connection()
    try:
        for sql, params in statements:
            conn.execute(sql, params)
        conn.commit()
    finally:
        conn.close()


def _unwrap_entity(res, key):
    """Return ``res[key]`` (or ``res`` itself) as a dict; ``{}`` for unexpected payloads.

    GHL may wrap the object (``{"opportunity": {...}}``) or return it bare;
    anything that is not a dict yields ``{}`` so callers report "sin id"
    instead of raising.
    """
    if not isinstance(res, dict):
        return {}
    entity = res.get(key) or res
    return entity if isinstance(entity, dict) else {}


def refresh_opportunity_in_db(token, opp_id, location_id):
    """GET /opportunities/{id} -> save_single_opportunity.

    Returns a dict with ``ok`` plus ``opportunity`` and/or ``error`` (and
    ``deleted_locally`` when GHL answered 404). Does not raise: the caller
    decides what to do with a failure. If the opportunity no longer exists in
    GHL (404), the local row is deleted to keep SQLite consistent.
    """
    try:
        res = ghl_client.get_opportunity(token, opp_id)
    except Exception as exc:
        if _http_status_from_exception(exc) == 404:
            try:
                _delete_rows_locally([
                    ("DELETE FROM opportunities WHERE id=? AND location_id=?", (opp_id, location_id)),
                ])
            except Exception as db_exc:
                return {"ok": False, "error": f"GHL 404 + cleanup local fallo: {db_exc}"}
            return {"ok": True, "deleted_locally": True, "opportunity": None}
        return {"ok": False, "error": f"get_opportunity fallo: {exc}"}

    opp = _unwrap_entity(res, "opportunity")
    if not opp.get("id"):
        return {"ok": False, "error": f"Respuesta GHL sin id: {res}"}
    try:
        db.save_single_opportunity(location_id, opp)
    except Exception as exc:
        return {"ok": False, "error": f"save_single_opportunity fallo: {exc}", "opportunity": opp}
    return {"ok": True, "opportunity": opp}


def refresh_contact_in_db(token, contact_id, location_id):
    """GET /contacts/{id} -> save_single_contact.

    Same contract as refresh_opportunity_in_db. If the contact was deleted in
    Bucéfalo (404), its local opportunities are deleted as well (cascade),
    followed by the contact row, in a single commit.
    """
    try:
        res = ghl_client._request("GET", f"/contacts/{contact_id}", token)
    except Exception as exc:
        if _http_status_from_exception(exc) == 404:
            try:
                _delete_rows_locally([
                    ("DELETE FROM opportunities WHERE contact_id=? AND location_id=?", (contact_id, location_id)),
                    ("DELETE FROM contacts WHERE id=? AND location_id=?", (contact_id, location_id)),
                ])
            except Exception as db_exc:
                return {"ok": False, "error": f"GHL 404 + cleanup local fallo: {db_exc}"}
            return {"ok": True, "deleted_locally": True, "contact": None}
        return {"ok": False, "error": f"get_contact fallo: {exc}"}

    contact = _unwrap_entity(res, "contact")
    if not contact.get("id"):
        return {"ok": False, "error": f"Respuesta GHL sin id: {res}"}
    try:
        db.save_single_contact(location_id, contact)
    except Exception as exc:
        return {"ok": False, "error": f"save_single_contact fallo: {exc}", "contact": contact}
    return {"ok": True, "contact": contact}


def refresh_opps_for_contact(token, contact_id, location_id):
    """Sync ALL opportunities of a contact in one location into SQLite.

    IMPORTANT: GHL ignores the ``contactId`` filter on POST
    /opportunities/search (confirmed bug: it returns every opp in the
    location), so this pages through all opportunities and filters by the real
    ``contactId`` in Python.

    Returns ``{ok, refreshed_count, opps_in_ghl, truncated}`` on success, or
    ``{ok: False, error, refreshed_count: 0}`` if a search request fails.
    ``truncated`` is True when the page cap was reached with a full last page,
    i.e. some of the contact's opps may not have been seen.
    """
    matched = []
    truncated = False
    try:
        page = 1
        while True:
            body = {"locationId": location_id, "limit": OPPORTUNITY_SEARCH_PAGE_SIZE, "page": page}
            res = ghl_client._request("POST", "/opportunities/search", token, json=body)
            batch = res.get("opportunities", []) if isinstance(res, dict) else []
            if not batch:
                break
            matched.extend(opp for opp in batch if opp.get("contactId") == contact_id)
            if len(batch) < OPPORTUNITY_SEARCH_PAGE_SIZE:
                break
            if page >= OPPORTUNITY_SEARCH_MAX_PAGES:
                # Full page at the cap: there may be more pages that were never fetched.
                truncated = True
                print(
                    f"Advertencia: refresh_opps_for_contact alcanzó el tope de {OPPORTUNITY_SEARCH_MAX_PAGES} "
                    f"páginas para {location_id}; pueden faltar opps del contacto {contact_id}."
                )
                break
            page += 1
    except Exception as exc:
        return {"ok": False, "error": f"search_opportunities fallo: {exc}", "refreshed_count": 0}

    refreshed = 0
    for opp in matched:
        try:
            db.save_single_opportunity(location_id, opp)
            refreshed += 1
        except Exception as exc:
            # Best-effort per opp: log and keep refreshing the rest.
            error_logging.log_error("refresh_opps_for_contact_save_failed", exc, {
                "opp_id": opp.get("id"),
                "location_id": location_id,
                "contact_id": contact_id,
            })
    return {"ok": True, "refreshed_count": refreshed, "opps_in_ghl": len(matched), "truncated": truncated}


def get_sync_max_workers():
    """Worker count from SYNC_ENGINE_MAX_WORKERS (default DEFAULT_SYNC_MAX_WORKERS), at least 1.

    Non-integer values fall back to the default.
    """
    raw_value = os.getenv("SYNC_ENGINE_MAX_WORKERS", str(DEFAULT_SYNC_MAX_WORKERS))
    try:
        workers = int(raw_value)
    except (TypeError, ValueError):
        workers = DEFAULT_SYNC_MAX_WORKERS
    return max(1, workers)


def extract_location_name(location_data):
    """Extract a human-readable name from the common GHL location response shapes.

    Accepts either ``{"location": {...}}`` or the bare location dict and tries
    ``name``, ``businessName``, ``companyName`` and finally ``business.name``.
    Returns None when no non-blank candidate is found or the input is not a dict.
    """
    if not isinstance(location_data, dict):
        return None
    nested = location_data.get("location")
    payload = nested if isinstance(nested, dict) else location_data

    for key in ("name", "businessName", "companyName"):
        value = payload.get(key)
        if value and str(value).strip():
            return str(value).strip()

    business = payload.get("business")
    if isinstance(business, dict):
        value = business.get("name")
        if value and str(value).strip():
            return str(value).strip()
    return None


def resolve_location_name(location_id, token, fallback=""):
    """Display name for a location: the non-blank ``fallback``, else a (cached) GHL lookup.

    If the GHL lookup fails or yields no name, ``location_id`` is used. That
    result is cached too, so later calls in this process do not retry.
    """
    if fallback and fallback.strip():
        return fallback.strip()
    if location_id in _location_name_cache:
        return _location_name_cache[location_id]
    try:
        data = ghl_client.get_location(token, location_id)
        name = extract_location_name(data) or location_id
    except Exception as e:
        error_id = error_logging.log_error("sync_resolve_location_name_failed", e, {"location_id": location_id})
        print(f"Advertencia: no se pudo obtener nombre de location {location_id}: {e} | error_id={error_id}")
        name = location_id
    _location_name_cache[location_id] = name
    return name


# --- CSV LOADER ---

def _csv_field(row, column):
    """Stripped value of ``column`` in a csv.DictReader row; '' when missing.

    DictReader fills missing trailing cells with None (its default restval),
    so ``row.get(column, '')`` alone would still hand back None.
    """
    return (row.get(column) or "").strip()


def parse_accounts_csv():
    """Parse the Bucéfalo accounts CSV into a list of account dicts.

    Skips rows without Location_ID or API_token, normalizes Company Owner
    (blank or "-" -> "Sin asignar"), resolves a display name and classifies
    each account as 'brand' or 'branch'. Rows are de-duplicated by
    Location_ID (the last row wins).

    Raises:
        FileNotFoundError: if the CSV file does not exist.
    """
    if not os.path.exists(CSV_PATH):
        raise FileNotFoundError(f"No se encontró el archivo CSV en la ruta: {CSV_PATH}")

    # Keyed by location id: de-duplicates automatically, last row wins.
    accounts = {}
    # newline='' is what the csv module expects so quoted fields with embedded
    # newlines are parsed correctly.
    with open(CSV_PATH, mode='r', encoding='utf-8-sig', newline='') as f:
        for row in csv.DictReader(f):
            loc_id = _csv_field(row, 'Location_ID')
            token = _csv_field(row, 'API_token')
            if not loc_id or not token:
                continue

            owner = _csv_field(row, 'Company Owner')
            if not owner or owner == "-":
                owner = "Sin asignar"

            accounts[loc_id] = {
                "location_id": loc_id,
                "nombre": resolve_location_name(loc_id, token, _csv_field(row, 'Nombre')),
                "token": token,
                "whatsapp_widget": _csv_field(row, 'WhatsApp_idWidget'),
                "company_owner": owner,
                "type": "brand" if loc_id == BRAND_LOCATION_ID else "branch",
            }

    return list(accounts.values())


def get_tokens_map():
    """Return a simple ``location_id -> token`` mapping from the accounts CSV."""
    accounts = parse_accounts_csv()
    return {acc['location_id']: acc['token'] for acc in accounts}


# --- PER-ACCOUNT SYNC ENGINE ---

def _synthesize_pipelines(opportunities):
    """Build placeholder pipelines from the pipeline/stage ids referenced by opportunities.

    Fallback for when GHL returns no pipelines for a location that does have
    opportunities. Stages are ordered by sorted stage id.
    """
    stages_by_pipeline = {}
    for opp in opportunities:
        pipeline_id = opp.get("pipelineId")
        stage_id = opp.get("pipelineStageId")
        if pipeline_id and stage_id:
            stages_by_pipeline.setdefault(pipeline_id, set()).add(stage_id)

    return [
        {
            "id": pipeline_id,
            "name": f"Pipeline Sintético ({pipeline_id[:6]})",
            "stages": [
                {"id": stage_id, "name": f"Etapa {stage_id[:6]}...", "order": idx + 1}
                for idx, stage_id in enumerate(sorted(stage_ids))
            ],
        }
        for pipeline_id, stage_ids in stages_by_pipeline.items()
    ]


def sync_account(location_id, token):
    """Sync metadata, pipelines, opportunities and contacts of one location into SQLite.

    Returns ``(success, contacts_count, opportunities_count, error_message)``.
    Failures during the sync are logged, recorded in the sync log and returned
    rather than raised. Errors from ``db.create_sync_log`` or from the
    failure-path ``db.update_sync_log`` still propagate.
    """
    log_id = db.create_sync_log(location_id)
    print(f"[{datetime.now()}] Iniciando sync para cuenta: {location_id}")
    # Tracks the step in progress so a failure can be attributed in the error log.
    sync_step = "unknown"

    try:
        sync_step = "metadata"
        sync_account_metadata(location_id, token)

        sync_step = "pipelines"
        pipelines = ghl_client.get_pipelines(token, location_id)

        sync_step = "opportunities"
        opportunities = ghl_client.get_all_opportunities(token, location_id)

        sync_step = "contacts"
        contacts = ghl_client.get_all_contacts(token, location_id)

        sync_step = "synthetic_pipelines"
        # Fallback: GHL returned [] pipelines but opportunities reference some.
        if not pipelines and opportunities:
            print(f"-> GHL devolvió 0 pipelines para {location_id}. Sintetizando desde oportunidades...")
            pipelines = _synthesize_pipelines(opportunities)

        sync_step = "save_to_sqlite"
        # Three separate save calls; whether they share one transaction depends on db.* (not visible here).
        db.save_pipelines(location_id, pipelines)
        db.save_opportunities(location_id, opportunities)
        db.save_contacts(location_id, contacts)

        db.update_sync_log(log_id, "success", len(contacts), len(opportunities))
        print(f"[{datetime.now()}] Sincronización exitosa para {location_id}. Contactos: {len(contacts)}, Opps: {len(opportunities)}")
        return True, len(contacts), len(opportunities), None

    except Exception as e:
        error_msg = str(e)
        error_id = error_logging.log_error("sync_account_failed", e, {
            "location_id": location_id,
            "sync_step": sync_step,
        })
        db.update_sync_log(log_id, "failed", 0, 0, error_msg)
        print(f"[{datetime.now()}] Error sincronizando cuenta {location_id}: {error_msg} | error_id={error_id}")
        return False, 0, 0, error_msg


def sync_account_metadata(location_id, token):
    """Sync the dynamic contact and opportunity schemas of one location into SQLite.

    An empty schema from GHL is skipped, keeping whatever metadata is stored
    locally. Returns True; GHL/DB errors propagate to the caller.
    """
    for object_key in ("contact", "opportunity"):
        fields = ghl_client.get_object_schema_fields(token, location_id, object_key)
        if not fields:
            print(f"Advertencia: schema vacio para {location_id}/{object_key}; se conserva metadata local existente.")
            continue
        db.save_object_schema(location_id, object_key, fields)
    return True


# --- GLOBAL SYNC ENGINE ---

_sync_status = {
    "is_running": False,
    "total": 0,
    "done": 0,
    "success_count": 0,
    "failed_count": 0,
    "max_workers": 0,
    "current_branch": ""
}

# Held only while a global sync is being *started* (is_running check + CSV
# parse + status update), so two concurrent requests cannot both pass the
# is_running check and launch overlapping syncs.
_sync_start_lock = threading.Lock()


def get_sync_progress():
    """Return a snapshot (shallow copy) of the global sync progress."""
    return dict(_sync_status)


def _prepare_global_sync(parse_error_event, parse_error_prefix, initial_branch):
    """Init the DB, load and persist the account catalog, and mark the sync as running.

    Must be called with ``_sync_start_lock`` held. Returns ``(accounts, None)``
    on success or ``(None, error_response)`` if the catalog could not be loaded
    (status is left untouched in that case). ``db.init_db`` errors propagate.
    """
    db.init_db()
    try:
        accounts = parse_accounts_csv()
        # Save / update the account catalog in SQLite.
        db.save_accounts(accounts)
    except Exception as e:
        error_id = error_logging.log_error(parse_error_event, e)
        print(f"{parse_error_prefix}: {e} | error_id={error_id}")
        return None, {"error": f"No se pudo cargar el catálogo de cuentas: {str(e)}"}

    _sync_status.update({
        "is_running": True,
        "total": len(accounts),
        "done": 0,
        "success_count": 0,
        "failed_count": 0,
        "max_workers": 0,
        "current_branch": initial_branch,
    })
    return accounts, None


def _run_parallel_sync(accounts, worker, record_result, log_label, done_branch):
    """Run ``worker(location_id, token)`` for every account in a thread pool, updating _sync_status.

    ``record_result(future, account)`` updates success/failure counters for
    each finished future. ``is_running`` is always reset at the end, even if
    something in this function raises, so the engine never stays locked.
    """
    try:
        total_accounts = len(accounts)
        # GHL's burst limit is per individual location, so branches can be
        # processed in parallel without interfering with each other.
        concurrency = min(total_accounts, get_sync_max_workers()) if total_accounts else 1
        _sync_status["max_workers"] = concurrency
        print(f"{log_label} usando hasta {concurrency} cuentas en paralelo.")

        with concurrent.futures.ThreadPoolExecutor(max_workers=concurrency) as executor:
            future_to_acc = {
                executor.submit(worker, acc['location_id'], acc['token']): acc
                for acc in accounts
            }
            for future in concurrent.futures.as_completed(future_to_acc):
                acc = future_to_acc[future]
                _sync_status["current_branch"] = acc['nombre']
                try:
                    record_result(future, acc)
                finally:
                    _sync_status["done"] += 1

        _sync_status["current_branch"] = done_branch
    finally:
        _sync_status["is_running"] = False


def _start_background_thread(target):
    """Start ``target`` on a daemon thread; reset ``is_running`` if the thread cannot start."""
    thread = threading.Thread(target=target, daemon=True)
    try:
        thread.start()
    except Exception:
        # The runner never executed, so nothing else would clear the flag.
        _sync_status["is_running"] = False
        raise


def _record_account_result(future, acc):
    """Count a finished sync_account future as success/failure; log unexpected exceptions."""
    try:
        success, _contacts_count, _opps_count, _error = future.result()
    except Exception as e:
        _sync_status["failed_count"] += 1
        error_id = error_logging.log_error("sync_thread_fatal", e, {
            "location_id": acc.get("location_id"),
            "account_name": acc.get("nombre"),
        })
        print(f"Excepción fatal en hilo de sync para {acc['nombre']}: {e} | error_id={error_id}")
        return
    if success:
        _sync_status["success_count"] += 1
    else:
        _sync_status["failed_count"] += 1


def _record_metadata_result(future, acc):
    """Count a finished sync_account_metadata future: success unless it raised."""
    try:
        future.result()
    except Exception as e:
        _sync_status["failed_count"] += 1
        error_id = error_logging.log_error("sync_account_metadata_failed", e, {
            "location_id": acc.get("location_id"),
            "account_name": acc.get("nombre"),
        })
        print(f"Error sincronizando metadata para {acc['nombre']}: {e} | error_id={error_id}")
    else:
        _sync_status["success_count"] += 1


def sync_all_accounts():
    """Sync every account in the CSV in parallel, in a background thread.

    Returns immediately with a message dict (or ``{"error": ...}`` if the
    account catalog could not be loaded). Only one global sync (full or
    metadata-only) runs at a time.
    """
    already_running = {"message": "Sincronización global ya se encuentra en ejecución."}
    if not _sync_start_lock.acquire(blocking=False):
        # Another request is starting a sync right now.
        return already_running
    try:
        if _sync_status["is_running"]:
            return already_running
        accounts, error_response = _prepare_global_sync(
            "sync_all_parse_accounts_failed",
            "No se pudo cargar el catálogo de cuentas",
            "Inicializando...",
        )
        if error_response is not None:
            return error_response
    finally:
        _sync_start_lock.release()

    # Run in a separate thread so the calling (FastAPI) request is not blocked.
    _start_background_thread(lambda: _run_parallel_sync(
        accounts, sync_account, _record_account_result, "Sync global", "Completado",
    ))
    return {"message": "Sincronización global iniciada en segundo plano."}


def sync_all_metadata():
    """Sync only schemas/metadata of every account in parallel, in a background thread.

    Same start/locking semantics as sync_all_accounts.
    """
    already_running = {"message": "Ya hay una sincronización global en ejecución."}
    if not _sync_start_lock.acquire(blocking=False):
        return already_running
    try:
        if _sync_status["is_running"]:
            return already_running
        accounts, error_response = _prepare_global_sync(
            "sync_metadata_parse_accounts_failed",
            "No se pudo cargar el catálogo de cuentas para metadata",
            "Inicializando metadata...",
        )
        if error_response is not None:
            return error_response
    finally:
        _sync_start_lock.release()

    _start_background_thread(lambda: _run_parallel_sync(
        accounts, sync_account_metadata, _record_metadata_result, "Sync metadata global", "Metadata completada",
    ))
    return {"message": "Sincronización de metadata iniciada en segundo plano."}