500 lines
19 KiB
Python
500 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Genera y guarda la sesión persistente de Bucefalo CRM.
|
|
|
|
Modos:
|
|
(default) Si hay credenciales en .env (BUCEFALO_LOGIN_EMAIL/PASSWORD + EMAIL_IMAP_*),
|
|
hace login automático: llena el form, pide código MFA por correo, lo lee
|
|
del IMAP y lo pega. Sin intervención humana.
|
|
Si faltan credenciales, abre el navegador y espera a que el usuario complete
|
|
login + MFA, autodetectando cuando termina (mismo comportamiento previo).
|
|
--manual Conserva el comportamiento original: espera un ENTER en la terminal.
|
|
--no-auto Fuerza el modo manual con detección automática (no usar credenciales aunque
|
|
estén configuradas).
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
from datetime import datetime, timezone
|
|
|
|
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
if ROOT_DIR not in sys.path:
|
|
sys.path.insert(0, ROOT_DIR)
|
|
|
|
from paths import SCREENSHOTS_DIR as SCREENSHOT_DIR, SESSION_FILE, BROWSER_PROFILE_DEFAULT # noqa: E402
|
|
|
|
|
|
def _save_debug_screenshot(page, label):
    """Write a timestamped full-page screenshot and return its path.

    Best effort: any failure (directory creation, the screenshot call itself)
    is reported on stdout and ``None`` is returned instead of raising.
    """
    try:
        os.makedirs(SCREENSHOT_DIR, exist_ok=True)
        stamp = time.strftime("%Y%m%d_%H%M%S")
        file_name = f"autologin_{label}_{stamp}.png"
        target = os.path.join(SCREENSHOT_DIR, file_name)
        page.screenshot(path=target, full_page=True)
        print(f"[DEBUG] Captura guardada: {target}")
        return target
    except Exception as exc:
        print(f"[DEBUG] No se pudo guardar screenshot '{label}': {exc}")
        return None
|
|
|
|
# Load variables from the project's .env file when python-dotenv is available.
# Best effort: any failure here (package not installed, error while loading)
# is ignored, and the settings are then read from the process environment only.
try:
    from dotenv import load_dotenv
    load_dotenv(os.path.join(ROOT_DIR, ".env"))
except Exception:
    pass
|
|
|
|
# URL the browser is sent to before any login attempt.
LOGIN_URL = "https://crm.bucefalocrm.io/"
# Maximum time (s) to wait for the user to finish logging in when not in auto mode.
AUTO_LOGIN_TIMEOUT_SEC = 300
# Optional persistent browser profile directory taken from GHL_BROWSER_PROFILE_DIR;
# None when the variable is unset or blank.
PERSISTENT_PROFILE_DIR = os.environ.get("GHL_BROWSER_PROFILE_DIR", "").strip() or None
|
|
|
|
|
|
def ensure_playwright_browsers():
    """Make sure the Chromium binaries used by Playwright are available.

    Returns:
        True when Chromium is already installed or was installed successfully;
        False when the ``playwright`` package cannot be imported or the
        ``playwright install chromium`` command fails, times out, or cannot
        be started.
    """
    try:
        from playwright.sync_api import sync_playwright
    except Exception as imp_err:
        print(f"ERROR: 'playwright' no está instalado: {imp_err}")
        return False

    # Probe for an existing Chromium executable. A failing probe is not fatal:
    # we simply fall through to the install step, but we say why.
    try:
        with sync_playwright() as p:
            exe = p.chromium.executable_path
            if exe and os.path.exists(exe):
                return True
    except Exception as probe_err:
        print(f"[INFO] No se pudo verificar Chromium ({probe_err}); se intentará instalar.")

    print("[INFO] Instalando binarios de Chromium (playwright install chromium)...")
    try:
        result = subprocess.run(
            [sys.executable, "-m", "playwright", "install", "chromium"],
            capture_output=True, text=True, timeout=600,
        )
    except (subprocess.TimeoutExpired, OSError) as run_err:
        # Without this, a hung download or an unlaunchable interpreter would
        # crash the caller instead of producing the documented False result.
        print(f"ERROR: Falló 'playwright install chromium': {run_err}")
        return False
    if result.returncode != 0:
        print(result.stderr.strip())
        return False
    return True
|
|
|
|
|
|
def _has_auto_credentials():
    """Tell whether every variable needed for automatic login + MFA is set.

    A variable counts as set only if its value is non-empty after stripping
    whitespace.
    """
    required_vars = (
        "BUCEFALO_LOGIN_EMAIL",
        "BUCEFALO_LOGIN_PASSWORD",
        "EMAIL_IMAP_HOST",
        "EMAIL_IMAP_USER",
        "EMAIL_IMAP_PASSWORD",
    )
    for var_name in required_vars:
        raw_value = os.environ.get(var_name) or ""
        if not raw_value.strip():
            return False
    return True
|
|
|
|
|
|
def _looks_like_login_page(page):
    """Tell whether the page currently shows part of the login flow.

    The page itself and each entry of ``page.frames`` are searched for a
    password field or for the sign-in / verify-code texts (Spanish and English).
    """
    login_markers = (
        'input[type="password"]',
        'text=/Inicia.*sesi[oó]n.*cuenta/i',
        'text=/Sign in to your account/i',
        'text=/Verificar el C[oó]digo/i',
        'text=/Verify.*Code/i',
    )

    def marker_shown(scope, marker):
        # A marker that cannot be looked up at all is treated as absent.
        try:
            candidate = scope.locator(marker).first
            if candidate.count() == 0:
                return False
        except Exception:
            return False
        # The element exists: if the visibility probe itself fails, count it
        # as a login indicator rather than ignoring it.
        try:
            return bool(candidate.is_visible(timeout=200))
        except Exception:
            return True

    scopes = [page, *page.frames]
    return any(
        marker_shown(scope, marker)
        for scope in scopes
        for marker in login_markers
    )
|
|
|
|
|
|
def _wait_for_login_completion(page, timeout_sec=AUTO_LOGIN_TIMEOUT_SEC):
    """Poll the page until the login flow is over.

    Login counts as complete when, on two consecutive polls (2 s apart):
      1) the URL contains neither "login" nor "/auth", and
      2) the DOM no longer shows the login form (the app is a SPA and does
         not always change the URL).

    Args:
        page: Playwright page being observed.
        timeout_sec: Maximum number of seconds to keep polling.

    Returns:
        True once the login is detected as complete; False on timeout or when
        the page has been closed.
    """
    print(f"[INFO] Esperando hasta {timeout_sec} s a que el login se complete…")
    # Monotonic clock: the deadline must not move if the wall clock is adjusted.
    deadline = time.monotonic() + timeout_sec
    last_url = ""
    # Stabilisation: require consecutive polls confirming we left the login.
    not_login_streak = 0
    while time.monotonic() < deadline:
        # On a closed page every DOM probe fails and is swallowed by
        # _looks_like_login_page, which would look exactly like "no login
        # form" and be reported as success. Bail out explicitly instead.
        try:
            page_closed = bool(page.is_closed())
        except Exception:
            page_closed = True
        if page_closed:
            print("[ERROR] La ventana del navegador se cerró antes de completar el login.")
            return False

        try:
            current = page.url
        except Exception:
            current = ""
        if current and current != last_url:
            print(f"[INFO] URL actual: {current}")
            last_url = current

        lowered = (current or "").lower()
        # An unreadable URL cannot confirm that we left the login flow.
        url_in_login = not lowered or "login" in lowered or "/auth" in lowered

        if not url_in_login and not _looks_like_login_page(page):
            not_login_streak += 1
            if not_login_streak >= 2:
                print("[INFO] Login completado (URL fuera del login + DOM sin form de credenciales).")
                return True
        else:
            not_login_streak = 0
        time.sleep(2)
    return False
|
|
|
|
|
|
# Selector tables used by _perform_auto_login. Each table is tried in order
# and the first usable match wins.
_LOGIN_SUBMIT_SELECTORS = (
    'button:has-text("Iniciar sesión")',
    'button:has-text("Iniciar")',
    'button:has-text("Sign in")',
    'button:has-text("Log in")',
    'button[type="submit"]',
)

# The "Email" 2FA method may be rendered as a button, radio, card, etc.
_EMAIL_METHOD_SELECTORS = (
    'button:has-text("Email")',
    'button:has-text("Correo electrónico")',
    'button:has-text("Correo")',
    'button:has-text("Send to Email")',
    'button:has-text("Enviar al correo")',
    'button:has-text("Por correo")',
    'div[role="button"]:has-text("Email")',
    'div[role="button"]:has-text("Correo")',
    'label:has-text("Email")',
    'label:has-text("Correo electrónico")',
    '[data-test-id*="email"]',
    '[data-testid*="email"]',
    # Clickable cards with an icon.
    '.tap-option-card:has-text("Email")',
    '.method-card:has-text("Email")',
    '[class*="option"]:has-text("Email")',
    '[class*="method"]:has-text("Email")',
    # Radios + label.
    'input[type="radio"][value*="email" i]',
    'input[type="radio"][value*="mail" i]',
)

_SEND_CODE_SELECTORS = (
    'button:has-text("Send code")', 'button:has-text("Send Code")',
    'button:has-text("Enviar código")', 'button:has-text("Enviar")',
    'button:has-text("Continuar")', 'button:has-text("Continue")',
)

_OTP_DIGIT_INPUT_SELECTORS = (
    'input[maxlength="1"]',  # broadest: any one-character input
    'input[inputmode="numeric"][maxlength="1"]',
    'input[autocomplete="one-time-code"][maxlength="1"]',
)

_OTP_SINGLE_INPUT_SELECTORS = (
    'input[autocomplete="one-time-code"]:not([maxlength="1"])',
    'input[name="code"]',
    'input[name="otp"]',
    'input[name="verification-code"]',
    'input[type="text"][maxlength="6"]',
    'input[type="number"][maxlength="6"]',
    'input[type="tel"][maxlength="6"]',
    'input[placeholder*="código" i]',
    'input[placeholder*="code" i]',
    'input[placeholder*="OTP" i]',
    'input[aria-label*="code" i]',
    'input[aria-label*="código" i]',
)

_OTP_SUBMIT_SELECTORS = (
    'button:has-text("Verificar")', 'button:has-text("Verify")',
    'button:has-text("Verify Code")', 'button:has-text("Verificar código")',
    'button:has-text("Continuar")', 'button:has-text("Continue")',
    'button:has-text("Confirmar")', 'button:has-text("Confirm")',
    'button:has-text("Submit")', 'button:has-text("Enviar")',
    'button[type="submit"]',
)


def _click_first_match(page, selectors, visible_timeout_ms=None, click_timeout_ms=None):
    """Click the first selector that matches; return that selector or None.

    When ``visible_timeout_ms`` is given, the match must also pass an
    ``is_visible`` check. A selector whose lookup or click raises is skipped.
    """
    for sel in selectors:
        try:
            loc = page.locator(sel).first
            if loc.count() == 0:
                continue
            if visible_timeout_ms is not None and not loc.is_visible(timeout=visible_timeout_ms):
                continue
            loc.click(timeout=click_timeout_ms)
            return sel
        except Exception:
            continue
    return None


def _find_otp_input(page, wait_sec=15):
    """Wait up to ``wait_sec`` seconds for the OTP field(s) to appear.

    Returns ``(locator, kind)`` where kind is ``'digits'`` (six or more visible
    one-character inputs; the locator is the first of them) or ``'single'``
    (one input for the whole code), or ``(None, None)`` if nothing was found.
    """
    deadline = time.monotonic() + wait_sec
    while time.monotonic() < deadline:
        # Preference 1: six (or more) visible one-digit inputs.
        for sel in _OTP_DIGIT_INPUT_SELECTORS:
            try:
                visible_inputs = []
                for candidate in page.locator(sel).all():
                    try:
                        if candidate.is_visible(timeout=200):
                            visible_inputs.append(candidate)
                    except Exception:
                        continue
                if len(visible_inputs) >= 6:
                    print(f"[AUTO] Detectados {len(visible_inputs)} inputs de 1 dígito ({sel}).")
                    return visible_inputs[0], 'digits'
            except Exception:
                continue
        # Preference 2: a single input holding the whole code.
        for sel in _OTP_SINGLE_INPUT_SELECTORS:
            try:
                loc = page.locator(sel).first
                if loc.count() > 0 and loc.is_visible(timeout=300):
                    print(f"[AUTO] Detectado input único del código: {sel}")
                    return loc, 'single'
            except Exception:
                continue
        time.sleep(1)
    return None, None


def _first_visible_input(page):
    """Return the first visible ``<input>`` on the page, or None."""
    try:
        for candidate in page.locator('input').all():
            try:
                if candidate.is_visible(timeout=200):
                    return candidate
            except Exception:
                continue
    except Exception:
        pass
    return None


def _perform_auto_login(page):
    """Run the unattended login: credentials, email 2FA, OTP read over IMAP.

    Steps: fill email + password, submit, pick the "Email" 2FA method when a
    method selector is shown, optionally click a "send code" button, wait for
    the OTP via ``email_otp_reader``, type it, submit it, and wait for the
    login to complete. Debug screenshots are saved along the way.

    Args:
        page: Playwright page already showing the login form.

    Returns:
        True if the login completed, False if any step failed.
    """
    from email_otp_reader import wait_for_bucefalo_otp, OtpReaderError

    email = os.environ.get("BUCEFALO_LOGIN_EMAIL", "").strip()
    password = os.environ.get("BUCEFALO_LOGIN_PASSWORD", "")

    print("[AUTO] Llenando email + contraseña...")
    try:
        page.locator('input[type="email"], input[name="email"]').first.fill(email, timeout=15000)
        page.locator('input[type="password"], input[name="password"]').first.fill(password)
    except Exception as e:
        print(f"[AUTO] No se encontraron los campos del login: {e}")
        return False

    # Mark the moment BEFORE anything can trigger the OTP email. The site
    # sometimes skips the method selector and sends the code right after the
    # credentials are submitted; a timestamp taken after the submit (and the
    # wait that follows it) could be later than that email and make the
    # reader skip it.
    request_ts = datetime.now(timezone.utc)

    print("[AUTO] Click en 'Iniciar sesión'...")
    submitted = _click_first_match(page, _LOGIN_SUBMIT_SELECTORS, click_timeout_ms=5000) is not None
    if not submitted:
        # Fallback: press Enter (focus is expected to still be on the password field).
        try:
            page.keyboard.press("Enter")
            submitted = True
        except Exception:
            pass
    if not submitted:
        print("[AUTO] No pude enviar el formulario de login.")
        return False

    # Wait for the 2FA method selector and click "Email".
    # NOTE: the site sometimes skips this step if only one method is configured.
    print("[AUTO] Esperando selector de método 2FA o pantalla de código...")
    time.sleep(4)  # the modal is shown only after the credentials are processed
    _save_debug_screenshot(page, "post_login")

    method_sel = None
    method_deadline = time.monotonic() + 15
    while method_sel is None and time.monotonic() < method_deadline:
        method_sel = _click_first_match(
            page, _EMAIL_METHOD_SELECTORS, visible_timeout_ms=300, click_timeout_ms=2000,
        )
        if method_sel is None:
            time.sleep(1)

    if method_sel is not None:
        print(f"[AUTO] Método 2FA seleccionado vía: {method_sel}")
    else:
        print("[AUTO] No apareció el selector de método 2FA — asumo que Bucéfalo envió el código directo.")
        _save_debug_screenshot(page, "no_method_selector")

    # Optional "Send code" button after choosing the method.
    time.sleep(1.5)
    extra_sel = _click_first_match(page, _SEND_CODE_SELECTORS, visible_timeout_ms=500)
    if extra_sel is not None:
        print(f"[AUTO] Click adicional: {extra_sel}")
        time.sleep(1)

    _save_debug_screenshot(page, "before_otp_input")

    # Wait for the OTP email over IMAP.
    print("[AUTO] Esperando código OTP por IMAP...")
    try:
        code = wait_for_bucefalo_otp(since=request_ts, debug=True)
    except OtpReaderError as e:
        print(f"[AUTO] No se pudo leer el OTP: {e}")
        _save_debug_screenshot(page, "otp_read_failed")
        return False

    # NOTE(review): this prints the one-time code in clear text to stdout.
    print(f"[AUTO] Código obtenido: {code}. Buscando dónde tipearlo...")

    target_input, target_kind = _find_otp_input(page, wait_sec=15)
    _save_debug_screenshot(page, "otp_input_search")

    if target_input is None:
        # Last resort: the first visible input on the page.
        target_input = _first_visible_input(page)
        if target_input is not None:
            target_kind = 'fallback_first_visible'
            print("[AUTO] Fallback: usando primer input visible.")

    code_filled = False
    if target_input is not None:
        try:
            # Click to make sure the first/only input has focus.
            target_input.click(timeout=3000)
            time.sleep(0.3)
            # Type character by character with a small delay so the front-end
            # sees every keydown and auto-advances across the six digit boxes.
            page.keyboard.type(code, delay=80)
            code_filled = True
            print(f"[AUTO] Código tipeado vía keyboard.type (modo: {target_kind}).")
        except Exception as e:
            print(f"[AUTO] Falló tipear el código: {e}")

    if not code_filled:
        print("[AUTO] No pude tipear el código OTP en ningún input.")
        _save_debug_screenshot(page, "otp_input_not_found")
        return False

    # Give the form time to process each character before a possible auto-submit.
    time.sleep(1.5)
    _save_debug_screenshot(page, "code_typed")

    # Click "Verify"/"Continue"; some flows auto-submit after the last digit.
    submit_sel = _click_first_match(page, _OTP_SUBMIT_SELECTORS, visible_timeout_ms=500)
    if submit_sel is not None:
        print(f"[AUTO] Click submit: {submit_sel}")
    else:
        print("[AUTO] No encontré botón de submit explícito; espero auto-submit.")

    # Wait until the dashboard is reached.
    ok = _wait_for_login_completion(page, timeout_sec=60)
    _save_debug_screenshot(page, "final_state")
    if not ok:
        print(f"[AUTO] No se completó el login. URL actual: {page.url}")
    return ok
|
|
|
|
|
|
def main():
    """Command-line entry point: log in to the CRM and persist the session.

    Modes:
      * default: automatic login when all credentials are configured,
        otherwise a visible browser with automatic detection of a completed
        manual login;
      * ``--no-auto``: never use the credentials (manual login, auto-detected);
      * ``--manual``: manual login confirmed by pressing ENTER in the terminal.

    The result is either the persistent profile directory (when
    ``PERSISTENT_PROFILE_DIR`` is set) or a storage-state file at
    ``SESSION_FILE``. Exits with status 1 if the Playwright browsers cannot be
    prepared and with status 2 if the login fails or times out.
    """
    parser = argparse.ArgumentParser(description="Genera la sesión persistente de Bucéfalo CRM.")
    parser.add_argument("--manual", action="store_true",
                        help="Espera ENTER en la terminal en vez de detectar login automáticamente.")
    parser.add_argument("--no-auto", action="store_true",
                        help="No intentar auto-login aunque haya credenciales en .env.")
    args = parser.parse_args()

    print("=== CONTROL DE SCRIPTS: ghl_browser_session_generator.py ===")
    has_credentials = _has_auto_credentials()
    # Both flags ask for a human-driven login. Without the --manual check the
    # flag was silently ignored whenever credentials were configured (headless
    # auto-login ran and ENTER was never awaited).
    auto_credentials = has_credentials and not (args.no_auto or args.manual)
    if auto_credentials:
        print("Descripción: AUTO-LOGIN con credenciales de .env (incluye 2FA por correo vía IMAP).")
    else:
        print("Descripción: Abre un navegador visible para que inicies sesión manualmente.")
        if not has_credentials and not args.no_auto:
            print("[INFO] Tip: si rellenas .env con BUCEFALO_LOGIN_* y EMAIL_IMAP_*, el login se hace solo.")
    print("----------------------------------------------------------------------")

    if not ensure_playwright_browsers():
        print("ERROR: No se pudieron preparar los binarios de Playwright.")
        sys.exit(1)

    from playwright.sync_api import sync_playwright

    # Auto mode needs no human interaction, so it can run headless.
    # Manual modes need a visible browser for the user to complete the login.
    headless = auto_credentials

    with sync_playwright() as p:
        print(f"[INFO] Lanzando navegador Chromium (headless={headless})...")
        if PERSISTENT_PROFILE_DIR:
            print(f"[INFO] Modo perfil persistente: {PERSISTENT_PROFILE_DIR}")
            os.makedirs(PERSISTENT_PROFILE_DIR, exist_ok=True)
            context = p.chromium.launch_persistent_context(
                PERSISTENT_PROFILE_DIR,
                headless=headless,
                viewport={"width": 1280, "height": 800},
            )
            page = context.pages[0] if context.pages else context.new_page()
            browser = None
        else:
            browser = p.chromium.launch(headless=headless)
            context = browser.new_context(viewport={"width": 1280, "height": 800})
            page = context.new_page()

        try:
            page.goto(LOGIN_URL)

            if auto_credentials:
                print("[AUTO] Iniciando flujo de auto-login...")
                if not _perform_auto_login(page):
                    print("[ERROR] Auto-login falló. Se cierra sin guardar la sesión.")
                    sys.exit(2)
                print("[AUTO] Login completado.")
            else:
                print("\n[ATENCIÓN] Inicia sesión y resuelve el MFA en la ventana del navegador.")
                if args.manual:
                    print("[ATENCIÓN] Modo manual: cuando veas el dashboard, regresa a esta terminal.")
                    input("\nPresiona ENTER aquí en la terminal cuando estés listo para guardar la sesión...")
                else:
                    if not _wait_for_login_completion(page):
                        print("[ERROR] Tiempo agotado esperando el login. Se cierra sin guardar.")
                        sys.exit(2)
                    print("[INFO] Login detectado. Esperando 2 s extra para estabilizar cookies...")
                    time.sleep(2)

            if not PERSISTENT_PROFILE_DIR:
                # Make sure the destination folder exists before writing the file.
                session_dir = os.path.dirname(SESSION_FILE)
                if session_dir:
                    os.makedirs(session_dir, exist_ok=True)
                context.storage_state(path=SESSION_FILE)
                print(f"\n[ÉXITO] Sesión guardada en: {SESSION_FILE}")
        finally:
            # Single shutdown point, also reached on sys.exit() and on errors.
            # With a persistent profile there is no separate browser object,
            # so the context itself is closed.
            if browser is not None:
                browser.close()
            else:
                context.close()

        if PERSISTENT_PROFILE_DIR:
            print(f"\n[ÉXITO] Perfil guardado en: {PERSISTENT_PROFILE_DIR}")
|
|
|
|
|
|
# Run the CLI only when the file is executed as a script, not when imported.
if __name__ == "__main__":
    main()
|