#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Genera y guarda la sesión persistente de Bucefalo CRM. Modos: (default) Si hay credenciales en .env (BUCEFALO_LOGIN_EMAIL/PASSWORD + EMAIL_IMAP_*), hace login automático: llena el form, pide código MFA por correo, lo lee del IMAP y lo pega. Sin intervención humana. Si faltan credenciales, abre el navegador y espera a que el usuario complete login + MFA, autodetectando cuando termina (mismo comportamiento previo). --manual Conserva el comportamiento original: espera un ENTER en la terminal. --no-auto Fuerza el modo manual con detección automática (no usar credenciales aunque estén configuradas). """ import argparse import os import subprocess import sys import time from datetime import datetime, timezone ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT_DIR not in sys.path: sys.path.insert(0, ROOT_DIR) from paths import SCREENSHOTS_DIR as SCREENSHOT_DIR, SESSION_FILE, BROWSER_PROFILE_DEFAULT # noqa: E402 def _save_debug_screenshot(page, label): """Guarda una captura de pantalla con timestamp y devuelve la ruta.""" try: os.makedirs(SCREENSHOT_DIR, exist_ok=True) ts = time.strftime("%Y%m%d_%H%M%S") path = os.path.join(SCREENSHOT_DIR, f"autologin_{label}_{ts}.png") page.screenshot(path=path, full_page=True) print(f"[DEBUG] Captura guardada: {path}") return path except Exception as e: print(f"[DEBUG] No se pudo guardar screenshot '{label}': {e}") return None # Cargar .env si existe. try: from dotenv import load_dotenv load_dotenv(os.path.join(ROOT_DIR, ".env")) except Exception: pass LOGIN_URL = "https://crm.bucefalocrm.io/" # Tiempo máximo (s) que esperamos a que el usuario termine de loggear en modo no-auto. AUTO_LOGIN_TIMEOUT_SEC = 300 PERSISTENT_PROFILE_DIR = os.environ.get("GHL_BROWSER_PROFILE_DIR", "").strip() or None def ensure_playwright_browsers(): """Si faltan los binarios de Chromium, los instala.""" try: from playwright.sync_api import sync_playwright except Exception as imp_err: print(f"ERROR: 'playwright' no está instalado: {imp_err}") return False try: with sync_playwright() as p: exe = p.chromium.executable_path if exe and os.path.exists(exe): return True except Exception: pass print("[INFO] Instalando binarios de Chromium (playwright install chromium)...") result = subprocess.run( [sys.executable, "-m", "playwright", "install", "chromium"], capture_output=True, text=True, timeout=600, ) if result.returncode != 0: print(result.stderr.strip()) return False return True def _has_auto_credentials(): """True si están todas las variables necesarias para login + MFA auto.""" needed = ( "BUCEFALO_LOGIN_EMAIL", "BUCEFALO_LOGIN_PASSWORD", "EMAIL_IMAP_HOST", "EMAIL_IMAP_USER", "EMAIL_IMAP_PASSWORD", ) return all((os.environ.get(k) or "").strip() for k in needed) def _looks_like_login_page(page): """True si la página visible es del flujo de login (form de credenciales o de código OTP).""" # Indicadores del login en el DOM principal y frames. signals = [ 'input[type="password"]', 'text=/Inicia.*sesi[oó]n.*cuenta/i', 'text=/Sign in to your account/i', 'text=/Verificar el C[oó]digo/i', 'text=/Verify.*Code/i', ] for scope in [page] + list(page.frames): for sig in signals: try: loc = scope.locator(sig).first if loc.count() > 0: try: if loc.is_visible(timeout=200): return True except Exception: return True except Exception: continue return False def _wait_for_login_completion(page, timeout_sec=AUTO_LOGIN_TIMEOUT_SEC): """Espera a que el login se complete. Lo detecta por: 1) URL fuera de /login o /auth, y 2) DOM ya no muestra el form de login (Bucéfalo es SPA y a veces no cambia la URL). """ print(f"[INFO] Esperando hasta {timeout_sec} s a que el login se complete…") deadline = time.time() + timeout_sec last_url = "" # Estabilización: necesitamos N reads consecutivos confirmando que ya no estamos en login. not_login_streak = 0 while time.time() < deadline: try: current = page.url except Exception: current = "" if current and current != last_url: print(f"[INFO] URL actual: {current}") last_url = current url_in_login = "login" in (current or "").lower() or "/auth" in (current or "").lower() dom_in_login = _looks_like_login_page(page) if not url_in_login and not dom_in_login: not_login_streak += 1 if not_login_streak >= 2: print("[INFO] Login completado (URL fuera del login + DOM sin form de credenciales).") return True else: not_login_streak = 0 time.sleep(2) return False def _perform_auto_login(page): """Llena email + password + selecciona MFA por email + pega el código del IMAP. Retorna True si llegó al dashboard, False si algo falló. """ from email_otp_reader import wait_for_bucefalo_otp, OtpReaderError email = os.environ.get("BUCEFALO_LOGIN_EMAIL", "").strip() password = os.environ.get("BUCEFALO_LOGIN_PASSWORD", "") print("[AUTO] Llenando email + contraseña...") try: # GHL usa input[type=email] y input[type=password]. Selectores robustos. page.locator('input[type="email"], input[name="email"]').first.fill(email, timeout=15000) page.locator('input[type="password"], input[name="password"]').first.fill(password) except Exception as e: print(f"[AUTO] No se encontraron los campos del login: {e}") return False # Botón "Iniciar sesión" / "Sign in" print("[AUTO] Click en 'Iniciar sesión'...") submitted = False for sel in [ 'button:has-text("Iniciar sesión")', 'button:has-text("Iniciar")', 'button:has-text("Sign in")', 'button:has-text("Log in")', 'button[type="submit"]', ]: try: btn = page.locator(sel).first if btn.count() > 0: btn.click(timeout=5000) submitted = True break except Exception: continue if not submitted: # Fallback: presionar Enter en el campo password. try: page.keyboard.press("Enter") submitted = True except Exception: pass if not submitted: print("[AUTO] No pude enviar el formulario de login.") return False # Esperar el modal de selección de método 2FA y hacer click en "Email". # NOTA: a veces Bucéfalo salta este paso si solo hay un método configurado. print("[AUTO] Esperando selector de método 2FA o pantalla de código...") time.sleep(4) # GHL muestra el modal tras procesar credenciales _save_debug_screenshot(page, "post_login") # Marcar el momento ANTES de pedir el código — el correo OTP debe ser posterior a esto. request_ts = datetime.now(timezone.utc) # Selectores ampliados para el método "Email" del 2FA. # GHL puede usar botones, radios, tarjetas, etc. email_method_selectors = [ 'button:has-text("Email")', 'button:has-text("Correo electrónico")', 'button:has-text("Correo")', 'button:has-text("Send to Email")', 'button:has-text("Enviar al correo")', 'button:has-text("Por correo")', 'div[role="button"]:has-text("Email")', 'div[role="button"]:has-text("Correo")', 'label:has-text("Email")', 'label:has-text("Correo electrónico")', '[data-test-id*="email"]', '[data-testid*="email"]', # Tarjetas clickeables con icono '.tap-option-card:has-text("Email")', '.method-card:has-text("Email")', '[class*="option"]:has-text("Email")', '[class*="method"]:has-text("Email")', # Radios + label 'input[type="radio"][value*="email" i]', 'input[type="radio"][value*="mail" i]', ] method_clicked = False method_deadline = time.time() + 15 while time.time() < method_deadline and not method_clicked: for sel in email_method_selectors: try: loc = page.locator(sel).first if loc.count() > 0 and loc.is_visible(timeout=300): loc.click(timeout=2000) print(f"[AUTO] Método 2FA seleccionado vía: {sel}") method_clicked = True break except Exception: continue if not method_clicked: time.sleep(1) if not method_clicked: print("[AUTO] No apareció el selector de método 2FA — asumo que Bucéfalo envió el código directo.") _save_debug_screenshot(page, "no_method_selector") # Botón opcional "Send code" / "Enviar código" después de elegir método. time.sleep(1.5) for sel in [ 'button:has-text("Send code")', 'button:has-text("Send Code")', 'button:has-text("Enviar código")', 'button:has-text("Enviar")', 'button:has-text("Continuar")', 'button:has-text("Continue")', ]: try: btn = page.locator(sel).first if btn.count() > 0 and btn.is_visible(timeout=500): btn.click() print(f"[AUTO] Click adicional: {sel}") time.sleep(1) break except Exception: continue _save_debug_screenshot(page, "before_otp_input") # Esperar el correo OTP por IMAP. print("[AUTO] Esperando código OTP por IMAP...") try: code = wait_for_bucefalo_otp(since=request_ts, debug=True) except OtpReaderError as e: print(f"[AUTO] No se pudo leer el OTP: {e}") _save_debug_screenshot(page, "otp_read_failed") return False print(f"[AUTO] Código obtenido: {code}. Buscando dónde tipearlo...") # Esperar hasta 15 s a que aparezca algún input del código. # Probamos múltiples estrategias en orden de preferencia. digit_inputs_selectors = [ 'input[maxlength="1"]', # más amplio: cualquier input de 1 caracter 'input[inputmode="numeric"][maxlength="1"]', 'input[autocomplete="one-time-code"][maxlength="1"]', ] single_input_selectors = [ 'input[autocomplete="one-time-code"]:not([maxlength="1"])', 'input[name="code"]', 'input[name="otp"]', 'input[name="verification-code"]', 'input[type="text"][maxlength="6"]', 'input[type="number"][maxlength="6"]', 'input[type="tel"][maxlength="6"]', 'input[placeholder*="código" i]', 'input[placeholder*="code" i]', 'input[placeholder*="OTP" i]', 'input[aria-label*="code" i]', 'input[aria-label*="código" i]', ] input_wait_deadline = time.time() + 15 target_input = None # locator del input donde hacer click (foco) target_kind = None # 'digits' | 'single' while time.time() < input_wait_deadline: # Preferencia 1: detectar 6 (o más) inputs de 1 dígito. for sel in digit_inputs_selectors: try: inputs = page.locator(sel).all() # Filtrar a los visibles. visible_inputs = [] for ip in inputs: try: if ip.is_visible(timeout=200): visible_inputs.append(ip) except Exception: continue if len(visible_inputs) >= 6: target_input = visible_inputs[0] target_kind = 'digits' print(f"[AUTO] Detectados {len(visible_inputs)} inputs de 1 dígito ({sel}).") break except Exception: continue if target_kind: break # Preferencia 2: input único. for sel in single_input_selectors: try: loc = page.locator(sel).first if loc.count() > 0 and loc.is_visible(timeout=300): target_input = loc target_kind = 'single' print(f"[AUTO] Detectado input único del código: {sel}") break except Exception: continue if target_kind: break time.sleep(1) _save_debug_screenshot(page, "otp_input_search") code_filled = False if target_input is None: # Último recurso: el primer input visible. try: all_inputs = page.locator('input').all() for ip in all_inputs: try: if ip.is_visible(timeout=200): target_input = ip target_kind = 'fallback_first_visible' print("[AUTO] Fallback: usando primer input visible.") break except Exception: continue except Exception: pass if target_input is not None: try: # Hacer click para asegurar foco en el primer input/único. target_input.click(timeout=3000) time.sleep(0.3) # Tipear caracter por caracter con un pequeño delay para que Vue/React # capture cada keydown y auto-avance al siguiente input en el caso de los 6 dígitos. page.keyboard.type(code, delay=80) code_filled = True print(f"[AUTO] Código tipeado vía keyboard.type (modo: {target_kind}).") except Exception as e: print(f"[AUTO] Falló tipear el código: {e}") if not code_filled: print("[AUTO] No pude tipear el código OTP en ningún input.") _save_debug_screenshot(page, "otp_input_not_found") return False # Pequeña espera para que el form procese cada caracter antes del posible auto-submit. time.sleep(1.5) _save_debug_screenshot(page, "code_typed") # Click en "Verificar"/"Continue"; algunos flujos autoenvían tras el último dígito. submit_clicked = False for sel in [ 'button:has-text("Verificar")', 'button:has-text("Verify")', 'button:has-text("Verify Code")', 'button:has-text("Verificar código")', 'button:has-text("Continuar")', 'button:has-text("Continue")', 'button:has-text("Confirmar")', 'button:has-text("Confirm")', 'button:has-text("Submit")', 'button:has-text("Enviar")', 'button[type="submit"]', ]: try: btn = page.locator(sel).first if btn.count() > 0 and btn.is_visible(timeout=500): btn.click() print(f"[AUTO] Click submit: {sel}") submit_clicked = True break except Exception: continue if not submit_clicked: # El form puede auto-submit tras 6to dígito (algunos OTPs lo hacen). Esperar. print("[AUTO] No encontré botón de submit explícito; espero auto-submit.") # Esperar a que llegue al dashboard. ok = _wait_for_login_completion(page, timeout_sec=60) _save_debug_screenshot(page, "final_state") if not ok: print(f"[AUTO] No se completó el login. URL actual: {page.url}") return ok def main(): parser = argparse.ArgumentParser(description="Genera la sesión persistente de Bucéfalo CRM.") parser.add_argument("--manual", action="store_true", help="Espera ENTER en la terminal en vez de detectar login automáticamente.") parser.add_argument("--no-auto", action="store_true", help="No intentar auto-login aunque haya credenciales en .env.") args = parser.parse_args() print("=== CONTROL DE SCRIPTS: ghl_browser_session_generator.py ===") auto_credentials = _has_auto_credentials() and not args.no_auto if auto_credentials: print("Descripción: AUTO-LOGIN con credenciales de .env (incluye 2FA por correo vía IMAP).") else: print("Descripción: Abre un navegador visible para que inicies sesión manualmente.") if not _has_auto_credentials() and not args.no_auto: print("[INFO] Tip: si rellenas .env con BUCEFALO_LOGIN_* y EMAIL_IMAP_*, el login se hace solo.") print("----------------------------------------------------------------------") if not ensure_playwright_browsers(): print("ERROR: No se pudieron preparar los binarios de Playwright.") sys.exit(1) from playwright.sync_api import sync_playwright # En modo auto, podemos correr headless (no necesita interacción humana). # En modo manual, navegador visible para que el usuario complete el login. headless = auto_credentials with sync_playwright() as p: print(f"[INFO] Lanzando navegador Chromium (headless={headless})...") if PERSISTENT_PROFILE_DIR: print(f"[INFO] Modo perfil persistente: {PERSISTENT_PROFILE_DIR}") os.makedirs(PERSISTENT_PROFILE_DIR, exist_ok=True) context = p.chromium.launch_persistent_context( PERSISTENT_PROFILE_DIR, headless=headless, viewport={"width": 1280, "height": 800}, ) page = context.pages[0] if context.pages else context.new_page() browser = None else: browser = p.chromium.launch(headless=headless) context = browser.new_context(viewport={"width": 1280, "height": 800}) page = context.new_page() page.goto(LOGIN_URL) if auto_credentials: print("[AUTO] Iniciando flujo de auto-login...") ok = _perform_auto_login(page) if not ok: print("[ERROR] Auto-login falló. Se cierra sin guardar la sesión.") if browser is not None: browser.close() else: context.close() sys.exit(2) print("[AUTO] Login completado.") else: print("\n[ATENCIÓN] Inicia sesión y resuelve el MFA en la ventana del navegador.") if args.manual: print("[ATENCIÓN] Modo manual: cuando veas el dashboard, regresa a esta terminal.") input("\nPresiona ENTER aquí en la terminal cuando estés listo para guardar la sesión...") else: ok = _wait_for_login_completion(page) if not ok: print("[ERROR] Tiempo agotado esperando el login. Se cierra sin guardar.") if browser is not None: browser.close() else: context.close() sys.exit(2) print("[INFO] Login detectado. Esperando 2 s extra para estabilizar cookies...") time.sleep(2) if PERSISTENT_PROFILE_DIR: context.close() print(f"\n[ÉXITO] Perfil guardado en: {PERSISTENT_PROFILE_DIR}") else: context.storage_state(path=SESSION_FILE) print(f"\n[ÉXITO] Sesión guardada en: {SESSION_FILE}") browser.close() if __name__ == "__main__": main()