Files
MP-Manager/scripts/email_otp_reader.py
2026-05-30 14:31:19 -06:00

203 lines
7.6 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Lee códigos OTP de Bucéfalo (Login security code) desde IMAP.
Bucéfalo manda el código 2FA desde `noreply@donotreply.acct-mgmt.com` con un cuerpo
tipo: `Your one time login code for logging into crm.bucefalocrm.io is 929711.`
Uso:
from scripts.email_otp_reader import wait_for_bucefalo_otp
code = wait_for_bucefalo_otp(since=datetime.now()) # bloquea hasta verlo o timeout
Configuración: variables de entorno (carga `.env` si existe).
EMAIL_IMAP_HOST, EMAIL_IMAP_PORT, EMAIL_IMAP_USER, EMAIL_IMAP_PASSWORD,
EMAIL_IMAP_FOLDER (default INBOX), OTP_TIMEOUT_SECONDS, OTP_POLL_INTERVAL_SECONDS.
"""
import os
import re
import sys
import time
from datetime import datetime, timedelta, timezone
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
# Cargar .env si existe (silencioso si falta python-dotenv).
try:
from dotenv import load_dotenv
load_dotenv(os.path.join(ROOT_DIR, ".env"))
except Exception:
pass
# Remitente esperado del correo de 2FA de Bucéfalo / GHL.
EXPECTED_SENDERS = (
"noreply@donotreply.acct-mgmt.com",
"donotreply.acct-mgmt.com",
)
# Patrones para extraer el código. El correo dice "is 929711." pero por robustez
# probamos varios formatos.
CODE_PATTERNS = (
re.compile(r"\bis\s+(\d{6})\b", re.IGNORECASE), # "is 929711"
re.compile(r"\bc[oó]digo[^\d]{0,20}(\d{6})\b", re.IGNORECASE), # español
re.compile(r"\bcode[^\d]{0,20}(\d{6})\b", re.IGNORECASE), # inglés
re.compile(r"(?<!\d)(\d{6})(?!\d)"), # cualquier grupo aislado de 6 dígitos (último recurso)
)
class OtpReaderError(Exception):
pass
def _config():
"""Lee variables de entorno y normaliza."""
host = (os.environ.get("EMAIL_IMAP_HOST") or "").strip()
port = int(os.environ.get("EMAIL_IMAP_PORT") or "993")
user = (os.environ.get("EMAIL_IMAP_USER") or "").strip()
pwd = os.environ.get("EMAIL_IMAP_PASSWORD") or ""
folder = (os.environ.get("EMAIL_IMAP_FOLDER") or "INBOX").strip()
timeout = int(os.environ.get("OTP_TIMEOUT_SECONDS") or "120")
poll = int(os.environ.get("OTP_POLL_INTERVAL_SECONDS") or "3")
if not host or not user or not pwd:
raise OtpReaderError(
"Faltan variables IMAP en .env: EMAIL_IMAP_HOST / EMAIL_IMAP_USER / EMAIL_IMAP_PASSWORD"
)
return host, port, user, pwd, folder, timeout, poll
def _extract_code(text):
"""Devuelve el primer código de 6 dígitos que matchee algún patrón, o None."""
if not text:
return None
for pat in CODE_PATTERNS:
m = pat.search(text)
if m:
return m.group(1)
return None
def wait_for_bucefalo_otp(since=None, timeout_sec=None, poll_sec=None, debug=False):
"""Espera el próximo OTP de Bucéfalo y devuelve el código (string de 6 dígitos).
Args:
since: datetime con timezone; ignora correos anteriores a esta fecha.
Si es None, usa "ahora - 60s" para tolerar relojes desfasados.
timeout_sec: override del timeout (default desde .env).
poll_sec: override del intervalo (default desde .env).
debug: imprime info de los correos vistos por consola.
Returns:
str: código de 6 dígitos.
Raises:
OtpReaderError: si caduca o falla la conexión.
"""
try:
from imap_tools import MailBox, AND
except ImportError as e:
raise OtpReaderError(
"Falta la librería imap-tools. Instálala con: pip install imap-tools"
) from e
host, port, user, pwd, folder, default_timeout, default_poll = _config()
if since is None:
since = datetime.now(timezone.utc) - timedelta(seconds=60)
if since.tzinfo is None:
# Asumimos UTC si no viene con tz.
since = since.replace(tzinfo=timezone.utc)
# Margen para tolerar skew entre el reloj local y el del servidor IMAP/SMTP
# (vimos correos que tardan 20-30s y a veces el server stamp viene desfasado).
effective_since = since - timedelta(seconds=45)
since = effective_since
timeout_sec = timeout_sec if timeout_sec is not None else default_timeout
poll_sec = poll_sec if poll_sec is not None else default_poll
deadline = time.time() + timeout_sec
seen_uids = set()
if debug:
print(f"[OTP] Conectando a IMAP {host}:{port} como {user}...")
try:
with MailBox(host, port=port).login(user, pwd, initial_folder=folder) as mailbox:
if debug:
print(f"[OTP] Conectado. Esperando correo de {EXPECTED_SENDERS[0]} (timeout {timeout_sec}s)...")
while time.time() < deadline:
# imap-tools acepta filtros AND; buscamos correos desde la fecha indicada.
# IMAP `SINCE` ignora la hora, solo mira fecha → la usamos como filtro burdo
# y luego filtramos por timestamp real en Python.
since_date = since.date()
try:
candidates = list(mailbox.fetch(
AND(date_gte=since_date),
mark_seen=False,
reverse=True, # más nuevo primero
limit=10,
))
except Exception as fe:
if debug:
print(f"[OTP] Error fetch: {fe}")
candidates = []
for msg in candidates:
if msg.uid in seen_uids:
continue
seen_uids.add(msg.uid)
sender = (msg.from_ or "").lower()
if not any(s in sender for s in EXPECTED_SENDERS):
if debug:
print(f"[OTP] skip (otro remitente): {sender}")
continue
# Filtro por timestamp real (no solo fecha).
msg_dt = msg.date
if msg_dt is None:
if debug:
print(f"[OTP] skip (sin fecha): uid={msg.uid}")
continue
if msg_dt.tzinfo is None:
msg_dt = msg_dt.replace(tzinfo=timezone.utc)
if msg_dt < since:
if debug:
print(f"[OTP] skip (anterior a since={since.isoformat()}): {msg_dt.isoformat()}")
continue
body = (msg.text or "") + " " + (msg.subject or "")
code = _extract_code(body)
if code:
if debug:
print(f"[OTP] ¡Código encontrado! {code} (de mensaje {msg_dt.isoformat()})")
return code
if debug:
print(f"[OTP] matcheó remitente pero no extrajo código de: {body[:160]}")
time.sleep(poll_sec)
except OtpReaderError:
raise
except Exception as e:
raise OtpReaderError(f"Error IMAP ({type(e).__name__}): {e}") from e
raise OtpReaderError(f"Tiempo agotado ({timeout_sec}s) esperando el código OTP de Bucéfalo.")
def main():
"""CLI de prueba: imprime el próximo código OTP que llegue."""
print("=== email_otp_reader: prueba CLI ===")
print("Esperando el próximo correo OTP de Bucéfalo (provócalo intentando loggearte ahora)...")
try:
code = wait_for_bucefalo_otp(debug=True)
print(f"\n[ÉXITO] Código recibido: {code}")
sys.exit(0)
except OtpReaderError as e:
print(f"\n[ERROR] {e}")
sys.exit(1)
if __name__ == "__main__":
main()