1845 lines
74 KiB
Python
1845 lines
74 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Administra Workflows de GHL de forma híbrida (intercepción de tokens + llamadas API + automatización DOM)."""
|
|
|
|
import argparse
|
|
import atexit
|
|
import json as _json_top
|
|
import os
|
|
import queue
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
from concurrent.futures import ThreadPoolExecutor
|
|
|
|
# Directory two levels above this file (the parent of the folder that holds this script).
# It is put first on sys.path before the imports that follow; presumably those modules
# (db, error_logging, ...) live there -- confirm against the repository layout.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)
|
|
|
|
import db
|
|
import error_logging
|
|
import script_audit
|
|
from script_logger import RunLogger
|
|
|
|
from paths import SESSION_FILE, SCREENSHOTS_DIR # noqa: E402
|
|
# If the session file is older than this many hours, the user is warned before time is
# spent launching the browser.
SESSION_MAX_AGE_HOURS = 24
|
|
|
|
# Browser persistence mode:
# - Default: launch() + storage_state=generated/browser/session.json. The storage_state is
#   refreshed on close to capture cookies issued by GHL during the session.
# - If the GHL_BROWSER_PROFILE_DIR environment variable is set, a full persistent Chrome
#   profile (cache, IndexedDB, HttpOnly cookies) in that directory is used instead. This is
#   closer to a real browser, but prevents running scripts in parallel (the profile is
#   locked by one process at a time).
# An unset, empty or whitespace-only variable yields None (default mode).
PERSISTENT_PROFILE_DIR = os.environ.get("GHL_BROWSER_PROFILE_DIR", "").strip() or None
|
|
|
|
|
|
def _open_browser(playwright_p, headless=True):
    """Open a ready-to-use page in whichever persistence mode is configured.

    Args:
        playwright_p: A started Playwright handle (the object yielded by
            ``sync_playwright()``).
        headless: Whether Chromium is launched without a visible window.

    Returns:
        A ``(browser, context, page)`` tuple. When ``PERSISTENT_PROFILE_DIR`` is
        set, ``browser`` is ``None``: the persistent context stands for the whole
        session and is shut down with ``context.close()``. Otherwise a regular
        browser is launched and its context is seeded from ``SESSION_FILE``.
    """
    chromium = playwright_p.chromium

    if not PERSISTENT_PROFILE_DIR:
        # Default mode: throwaway browser, cookies restored from the session file.
        browser = chromium.launch(headless=headless)
        context = browser.new_context(storage_state=SESSION_FILE)
        return browser, context, context.new_page()

    # Persistent mode: the profile directory must exist before Chromium opens it.
    os.makedirs(PERSISTENT_PROFILE_DIR, exist_ok=True)
    print(f"[INFO] Modo perfil persistente: {PERSISTENT_PROFILE_DIR}")
    context = chromium.launch_persistent_context(
        PERSISTENT_PROFILE_DIR,
        headless=headless,
        viewport={"width": 1280, "height": 800},
    )
    # Reuse the page the persistent context may already have opened.
    open_pages = context.pages
    page = open_pages[0] if open_pages else context.new_page()
    return None, context, page
|
|
|
|
|
|
def _close_and_save(browser, context):
    """Shut the session down, refreshing ``SESSION_FILE`` first when possible.

    In the default mode the context's storage state (cookies, including any
    token rotated during the session) is written to ``SESSION_FILE`` and the
    browser is closed. In persistent-profile mode (``browser is None``) nothing
    is written to ``SESSION_FILE``; only the context is closed.

    Every Playwright call here is best-effort: failures are swallowed because
    they normally mean the session is already being torn down. The matching
    ``browser``/``context`` entries of ``_INTERRUPT_STATE`` are always cleared.
    """

    def _quietly(action):
        # Run a teardown step, turning any failure into a falsy result.
        try:
            return action()
        except Exception:
            return None

    # storage_state must only be requested from a live browser: once the
    # sync_playwright event loop has started closing, the call misbehaves.
    alive = False
    if browser is not None:
        alive = _quietly(lambda: browser.is_connected())

    try:
        if browser is None:
            if context is not None:
                # Persistent mode: closing the context is what ends the session.
                _quietly(lambda: context.close())
        else:
            if alive and context is not None:
                _quietly(lambda: context.storage_state(path=SESSION_FILE))
            _quietly(lambda: browser.close())
    finally:
        # Drop the handles from the interruption state so the emergency handler
        # does not try to close them again.
        for key, handle in (("browser", browser), ("context", context)):
            if _INTERRUPT_STATE.get(key) is handle:
                _INTERRUPT_STATE[key] = None
|
|
|
|
def _launch_browser(playwright_p, headless=True):
    """Launch only the browser, without creating a context for it.

    Args:
        playwright_p: A started Playwright handle.
        headless: Whether Chromium is launched without a visible window.

    Returns:
        ``(browser, None)`` in the default mode, so callers can create their own
        contexts from the browser. ``(None, context)`` when
        ``PERSISTENT_PROFILE_DIR`` is set, because ``launch_persistent_context``
        hands back a context directly.
    """
    chromium = playwright_p.chromium

    if not PERSISTENT_PROFILE_DIR:
        return chromium.launch(headless=headless), None

    os.makedirs(PERSISTENT_PROFILE_DIR, exist_ok=True)
    print(f"[INFO] Modo perfil persistente: {PERSISTENT_PROFILE_DIR}")
    launch_options = {
        "headless": headless,
        "viewport": {"width": 1280, "height": 800},
    }
    persistent_context = chromium.launch_persistent_context(
        PERSISTENT_PROFILE_DIR, **launch_options
    )
    return None, persistent_context
|
|
|
|
|
|
def _new_worker_context(browser):
    """Create a context seeded from ``SESSION_FILE`` plus a fresh page in it.

    Returns:
        A ``(context, page)`` tuple built from the given ``browser``.
    """
    worker_context = browser.new_context(storage_state=SESSION_FILE)
    return worker_context, worker_context.new_page()
|
|
|
|
|
|
# Per-worker state dicts, kept so the interruption handler can close the context of every
# parallel worker. All access to the list goes through _WORKER_STATES_LOCK.
_WORKER_STATES = []
_WORKER_STATES_LOCK = threading.Lock()
|
|
|
|
|
|
def _register_worker_state(state):
    """Add a worker's state dict to the shared registry while holding its lock."""
    _WORKER_STATES_LOCK.acquire()
    try:
        _WORKER_STATES.append(state)
    finally:
        _WORKER_STATES_LOCK.release()
|
|
|
|
|
|
def _unregister_worker_state(state):
    """Remove a worker's state dict from the shared registry.

    Removing a state that is not registered is a silent no-op.
    """
    with _WORKER_STATES_LOCK:
        if state in _WORKER_STATES:
            _WORKER_STATES.remove(state)
|
|
|
|
|
|
def _run_parallel_bulk(
    items,
    action_fn,
    *,
    workers,
    run_id,
    action_label,
    per_action_pause,
    headless=True,
    resync_after=True,
):
    """Process ``items`` with up to five worker threads pulling from one queue.

    Each worker starts its own ``sync_playwright`` instance, browser, context
    and page: the Playwright sync API cannot share a Browser across threads.
    With a persistent profile the run is forced down to a single worker,
    executed on the calling thread.

    Args:
        items: Iterable of dicts; each needs ``location_id`` and ``workflow_id``
            (or ``id``), optionally ``name``. Incomplete items are skipped.
        action_fn: Called as ``action_fn(page, item, prefix, run_id)``. It should
            return a dict with at least ``"status"`` set to ``"success"``,
            ``"failed"`` or ``"skipped"`` (optionally ``"reason"``). A non-dict
            return is treated as a truthy/falsy success flag, and a dict without
            ``"status"`` is recorded as failed. If it raises
            ``SessionExpiredError`` every worker stops after its current item.
        workers: Requested worker count; clamped to the range 1..5.
        run_id: Run identifier passed to the logger, ``action_fn`` and metrics.
        action_label: Name of the action, used in logs and metrics.
        per_action_pause: Seconds to wait after each processed item (0 disables).
        headless: Whether browsers are launched without a visible window.
        resync_after: When true, ``_resync_local_workflows`` is called for the
            item's location after each successful item (errors ignored).

    Returns:
        ``(results, failed, success, skipped)``: the list of per-item result
        dicts followed by the three counters.
    """
    from playwright.sync_api import sync_playwright

    # Guard: a persistent profile cannot back several simultaneous contexts on the same dir.
    effective_workers = 1 if PERSISTENT_PROFILE_DIR else max(1, min(int(workers or 1), 5))
    if PERSISTENT_PROFILE_DIR and workers and int(workers) > 1:
        print("[INFO] Perfil persistente activo → forzando workers=1.", flush=True)

    # Sort by location_id so consecutive queue entries belong to the same location.
    items_sorted = sorted(items, key=lambda it: (it.get("location_id") or "", it.get("workflow_id") or ""))

    work_q = queue.Queue()
    for it in items_sorted:
        work_q.put(it)
    total = work_q.qsize()

    results = []
    results_lock = threading.Lock()
    cancel_event = threading.Event()
    counter = {"done": 0}
    counter_lock = threading.Lock()
    # Every worker refreshes SESSION_FILE when it finishes; serialize those writes so
    # workers that finish together never write the same file at once.
    session_save_lock = threading.Lock()

    print(f"[METRICS] start action={action_label} workers={effective_workers} items={total} pause={per_action_pause}s", flush=True)
    t0 = time.monotonic()

    # Structured per-run logger.
    rlog = RunLogger(run_id, os.path.basename(__file__))
    rlog.info("bulk_start", action=action_label, workers=effective_workers, items=total,
              per_action_pause=per_action_pause)

    def _worker(worker_idx):
        """Drain the queue with this worker's own sync_playwright + browser + context.

        The Playwright sync API does NOT allow sharing a Browser between threads
        (one greenlet event loop per thread -> 'cannot switch to a different thread').
        """
        pw = sync_playwright().start()
        try:
            if PERSISTENT_PROFILE_DIR:
                os.makedirs(PERSISTENT_PROFILE_DIR, exist_ok=True)
                context = pw.chromium.launch_persistent_context(
                    PERSISTENT_PROFILE_DIR, headless=headless,
                    viewport={"width": 1280, "height": 800},
                )
                browser = None
                page = context.pages[0] if context.pages else context.new_page()
            else:
                browser = pw.chromium.launch(headless=headless)
                context = browser.new_context(storage_state=SESSION_FILE)
                page = context.new_page()
        except Exception as launch_err:
            print(f"[ERROR] worker {worker_idx} no pudo lanzar browser: {launch_err}", flush=True)
            try:
                pw.stop()
            except Exception:
                pass
            return

        state = {"context": context, "page": page, "worker_idx": worker_idx, "location_id": None, "workflow_id": None}
        _register_worker_state(state)
        try:
            while not cancel_event.is_set():
                try:
                    item = work_q.get_nowait()
                except queue.Empty:
                    return
                wf_id = item.get("workflow_id") or item.get("id")
                loc_id = item.get("location_id")
                name = item.get("name") or wf_id or "?"
                # The counter is bumped when an item is picked up; it only numbers the log prefix.
                with counter_lock:
                    counter["done"] += 1
                    idx = counter["done"]
                short_loc = (loc_id or "")[:6]
                prefix = f"[BULK {idx}/{total} w{worker_idx} {short_loc}]"
                state["location_id"] = loc_id
                state["workflow_id"] = wf_id

                if not wf_id or not loc_id:
                    print(f"{prefix} SKIP — falta workflow_id o location_id: {item}", flush=True)
                    with results_lock:
                        results.append({"workflow_id": wf_id, "location_id": loc_id, "name": name,
                                        "status": "skipped", "reason": "datos incompletos"})
                    print(f"{prefix} RESULT: skipped", flush=True)
                    continue

                print(f"{prefix} === '{name}' ({wf_id}) en {loc_id} ===", flush=True)
                item_t0 = time.monotonic()
                rlog.info("item_start", worker_id=worker_idx, location_id=loc_id,
                          workflow_id=wf_id, name=name)

                try:
                    result = action_fn(page, item, prefix, run_id)
                    if not isinstance(result, dict):
                        result = {"workflow_id": wf_id, "location_id": loc_id, "name": name,
                                  "status": "success" if result else "failed"}
                    result.setdefault("workflow_id", wf_id)
                    result.setdefault("location_id", loc_id)
                    result.setdefault("name", name)
                    if "status" not in result:
                        # action_fn broke its contract; record a failure instead of letting a
                        # KeyError kill this worker and, later, the final summary.
                        result["status"] = "failed"
                        result.setdefault("reason", "action_fn no devolvió 'status'")
                except SessionExpiredError as se:
                    print(f"{prefix} SESIÓN EXPIRADA: {se}", flush=True)
                    with results_lock:
                        results.append({"workflow_id": wf_id, "location_id": loc_id, "name": name,
                                        "status": "skipped", "reason": "sesión expirada"})
                    rlog.error("session_expired", worker_id=worker_idx, location_id=loc_id,
                               workflow_id=wf_id, message=str(se))
                    # Tell every other worker to stop after its current item.
                    cancel_event.set()
                    print(f"{prefix} RESULT: skipped", flush=True)
                    break
                except Exception as ie:
                    err_id = error_logging.log_error(
                        "playwright_parallel_bulk_item_failed", ie,
                        {"action": action_label, "location_id": loc_id, "workflow_id": wf_id,
                         "run_id": run_id, "worker_id": worker_idx},
                    )
                    print(f"{prefix} EXCEPCIÓN: {ie} | error_id={err_id}", flush=True)
                    result = {"workflow_id": wf_id, "location_id": loc_id, "name": name,
                              "status": "failed", "error_id": err_id}
                    rlog.error("item_failed", worker_id=worker_idx, location_id=loc_id,
                               workflow_id=wf_id, error_id=err_id, message=str(ie)[:500])

                with results_lock:
                    results.append(result)
                print(f"{prefix} RESULT: {result['status']}", flush=True)
                rlog.info("item_done", worker_id=worker_idx, location_id=loc_id,
                          workflow_id=wf_id, status=result.get("status"),
                          change_id=result.get("change_id"), error_id=result.get("error_id"),
                          duration_ms=int((time.monotonic() - item_t0) * 1000))

                if resync_after and result.get("status") == "success":
                    try:
                        _resync_local_workflows(loc_id)
                    except Exception:
                        pass

                if per_action_pause and per_action_pause > 0:
                    # Sleep in short steps so a cancel is noticed quickly.
                    waited = 0.0
                    step = 0.25
                    while waited < per_action_pause and not cancel_event.is_set():
                        time.sleep(step)
                        waited += step
        finally:
            _unregister_worker_state(state)
            # Best-effort refresh of SESSION_FILE. Every worker does this, so the file ends
            # up holding the state of whichever worker finishes last.
            if not PERSISTENT_PROFILE_DIR and browser is not None:
                try:
                    with session_save_lock:
                        context.storage_state(path=SESSION_FILE)
                except Exception:
                    pass
            try:
                context.close()
            except Exception:
                pass
            if browser is not None:
                try:
                    browser.close()
                except Exception:
                    pass
            try:
                pw.stop()
            except Exception:
                pass

    try:
        if effective_workers == 1:
            # Single worker: run inline on the calling thread.
            _worker(1)
        else:
            with ThreadPoolExecutor(max_workers=effective_workers) as ex:
                futures = [ex.submit(_worker, i + 1) for i in range(effective_workers)]
                for f in futures:
                    try:
                        f.result()
                    except Exception as fe:
                        print(f"[ERROR] worker terminó con excepción: {fe}", flush=True)
    except Exception as e:
        err_id = error_logging.log_error(
            "playwright_parallel_bulk_failed", e,
            {"action": action_label, "items_count": total, "workers": effective_workers,
             "run_id": run_id},
        )
        print(f"ERROR fatal en bulk paralelo: {e} | error_id={err_id}", flush=True)

    # Account for items nobody picked up. That happens after a session-expired cancel, but
    # also when workers could not launch a browser or died early; report the real cause.
    leftover_reason = (
        "sesión expirada — bulk abortado"
        if cancel_event.is_set()
        else "no procesado — ningún worker pudo continuar"
    )
    while True:
        try:
            it = work_q.get_nowait()
        except queue.Empty:
            break
        with results_lock:
            results.append({
                "workflow_id": it.get("workflow_id") or it.get("id"),
                "location_id": it.get("location_id"),
                "name": it.get("name") or "?",
                "status": "skipped",
                "reason": leftover_reason,
            })

    dt = time.monotonic() - t0
    success = sum(1 for r in results if r.get("status") == "success")
    skipped = sum(1 for r in results if r.get("status") == "skipped")
    failed = sum(1 for r in results if r.get("status") == "failed")
    rate = (total / dt) if dt > 0 else 0.0
    print(f"[METRICS] end action={action_label} dt={dt:.1f}s rate={rate:.2f}/s success={success} skipped={skipped} failed={failed}", flush=True)
    rlog.info("bulk_end", action=action_label, duration_s=round(dt, 2),
              success=success, skipped=skipped, failed=failed, rate=round(rate, 3))
    rlog.close()

    # Append the run metrics to a JSONL log (best-effort).
    try:
        from paths import LOGS_DIR
        os.makedirs(LOGS_DIR, exist_ok=True)
        metrics_path = os.path.join(LOGS_DIR, "parallel_runs.jsonl")
        with open(metrics_path, "a", encoding="utf-8") as mf:
            mf.write(_json_top.dumps({
                "ts": datetime_now_str(),
                "run_id": run_id,
                "script": os.path.basename(sys.argv[0]) if sys.argv else "manager",
                "action": action_label,
                "workers": effective_workers,
                "items": total,
                "duration_s": round(dt, 2),
                "success": success,
                "skipped": skipped,
                "failed": failed,
            }, ensure_ascii=False) + "\n")
    except Exception:
        pass

    return results, failed, success, skipped
|
|
|
|
|
|
def datetime_now_str():
    """Return the current time from ``datetime.now()`` as ``YYYY-MM-DD HH:MM:SS``."""
    import datetime as _datetime_module

    moment = _datetime_module.datetime.now()
    return f"{moment:%Y-%m-%d %H:%M:%S}"
|
|
|
|
|
|
# Global state that lets the signal/atexit handler close the browser and report what was
# in flight when the process was interrupted.
_INTERRUPT_STATE = {
    "browser": None,  # Browser of the running session (None in persistent-profile mode).
    "context": None,  # BrowserContext of the running session.
    "playwright_ctx": None,  # NOTE(review): not read or written in the visible code.
    "location_id": None,  # Location of the workflow being modified.
    "workflow_id": None,  # Workflow being modified.
    "run_id": None,  # Audit run to mark as failed on interruption.
    "target_active": None,  # Desired state: True = published, False = draft.
    "action": None,  # Action in progress, e.g. "toggle-status".
    "shutting_down": False,  # Set once cleanup has started, to make it run only once.
}
|
|
|
|
|
|
def _emergency_cleanup(signum=None, _frame=None):
    """Close the browser and report state when the process is being interrupted.

    Installed both as a signal handler and as an ``atexit`` hook.

    Args:
        signum: Signal number when invoked as a signal handler; ``None`` when
            invoked through ``atexit``.
        _frame: Stack frame supplied by the ``signal`` module (unused).

    Steps, each of them best-effort:
      1. Close the contexts of registered parallel workers.
      2. If the main browser is still connected, refresh ``SESSION_FILE``
         (default mode only) and close the browser.
      3. For an interrupted ``toggle-status``, query the API and print whether
         the change was applied.
      4. Mark the audit run (if any) as failed.

    Raises:
        SystemExit: Only when triggered by a signal; exit code 130 for SIGINT
            and 143 for any other signal.
    """
    if _INTERRUPT_STATE["shutting_down"]:
        return

    br = _INTERRUPT_STATE.get("browser")
    # A browser that is already closed (normal path) leaves nothing to clean up.
    browser_alive = False
    if br is not None:
        try:
            browser_alive = br.is_connected()
        except Exception:
            browser_alive = False

    # Reached through atexit with no live browser: exit silently.
    if signum is None and not browser_alive:
        return

    _INTERRUPT_STATE["shutting_down"] = True

    sig_name = "atexit" if signum is None else f"signal {signum}"
    print(f"\n[INTERRUPCIÓN] Proceso terminado por {sig_name}. Cerrando navegador...", flush=True)

    # Close the contexts of parallel workers (if any) before the browser.
    try:
        # A signal handler runs on the main thread, which may itself be holding this
        # non-reentrant lock; bound the wait so the handler can never deadlock.
        got_lock = _WORKER_STATES_LOCK.acquire(timeout=1.0)
        try:
            worker_snapshots = list(_WORKER_STATES)
        finally:
            if got_lock:
                _WORKER_STATES_LOCK.release()
        closed_count = 0
        for worker_state in worker_snapshots:
            wctx = worker_state.get("context")
            if wctx is None:
                continue
            try:
                wctx.close()
                closed_count += 1
            except Exception:
                pass
        # Report only the contexts that really closed; close() can fail here.
        if closed_count:
            print(f"[INTERRUPCIÓN] Cerrados {closed_count} context(s) de workers.", flush=True)
    except Exception:
        pass

    # Close the browser if it is still alive, refreshing storage_state first when possible.
    if browser_alive:
        ctx = _INTERRUPT_STATE.get("context")
        if ctx is not None and br is not None and not PERSISTENT_PROFILE_DIR:
            try:
                ctx.storage_state(path=SESSION_FILE)
                print(f"[INTERRUPCIÓN] Cookies refrescadas en {SESSION_FILE}.", flush=True)
            except Exception:
                pass
        try:
            br.close()
            print("[INTERRUPCIÓN] Navegador cerrado limpiamente.", flush=True)
        except Exception:
            pass

    # Report the real state by querying the API (only for toggle-status, the most critical mutation).
    loc_id = _INTERRUPT_STATE.get("location_id")
    wf_id = _INTERRUPT_STATE.get("workflow_id")
    action = _INTERRUPT_STATE.get("action")
    target_active = _INTERRUPT_STATE.get("target_active")
    if loc_id and wf_id and action == "toggle-status":
        try:
            import sync_engine
            tokens = sync_engine.get_tokens_map()
            tok = tokens.get(loc_id)
            if tok:
                wfs = sync_engine.ghl_client.get_workflows(tok, loc_id)
                actual = next((w for w in wfs if w.get("id") == wf_id), None)
                if actual:
                    api_status = (actual.get("status") or "").lower()
                    actual_active = api_status in ("active", "published")
                    if target_active is None:
                        print(f"[INTERRUPCIÓN] Estado actual en API: '{api_status}'.", flush=True)
                    elif actual_active == target_active:
                        print(f"[INTERRUPCIÓN] El cambio SÍ se aplicó pese a la interrupción (API: '{api_status}').", flush=True)
                    else:
                        print(f"[INTERRUPCIÓN] El cambio NO se aplicó (API: '{api_status}'). Reintenta cuando estés listo.", flush=True)
        except Exception as ce:
            print(f"[INTERRUPCIÓN] No se pudo consultar API para reportar estado: {ce}", flush=True)

    # Mark the audit run as interrupted.
    run_id = _INTERRUPT_STATE.get("run_id")
    if run_id:
        try:
            script_audit.update_run_status(run_id, "failed", f"Proceso interrumpido ({sig_name})")
        except Exception:
            pass

    # When triggered by a signal, exit with the conventional code (130 = SIGINT, 143 = SIGTERM).
    if signum is not None:
        sys.exit(130 if signum == signal.SIGINT else 143)
|
|
|
|
|
|
def _install_signal_handlers():
    """Route SIGINT, SIGTERM and (where it exists) SIGBREAK to ``_emergency_cleanup``.

    The same function is also registered with ``atexit``. A failure to install
    any individual handler is ignored.
    """
    # SIGBREAK only exists on Windows (sent by Ctrl+Break).
    for signal_name in ("SIGINT", "SIGTERM", "SIGBREAK"):
        signal_number = getattr(signal, signal_name, None)
        if signal_number is None:
            continue
        try:
            signal.signal(signal_number, _emergency_cleanup)
        except Exception:
            pass
    atexit.register(_emergency_cleanup)
|
|
|
|
|
|
def ensure_playwright_browsers():
    """Make sure Playwright's Chromium binary is available, installing it if needed.

    Returns:
        ``True`` when the binary already exists or ``playwright install
        chromium`` succeeded; ``False`` when the ``playwright`` package cannot be
        imported or the installation failed.
    """
    try:
        from playwright.sync_api import sync_playwright
    except Exception as imp_err:
        print(f"ERROR: El paquete 'playwright' no está instalado: {imp_err}")
        print("Solución: python -m pip install playwright")
        return False

    chromium_ready = False
    try:
        with sync_playwright() as p:
            exe_path = p.chromium.executable_path
            chromium_ready = bool(exe_path) and os.path.exists(exe_path)
    except Exception:
        # Any failure while probing is taken to mean the binaries are missing; the
        # installation below is expected to repair that.
        chromium_ready = False
    if chromium_ready:
        return True

    print("[INFO] Binarios de Chromium no encontrados. Instalando con 'playwright install chromium'...")
    try:
        install_cmd = [sys.executable, "-m", "playwright", "install", "chromium"]
        completed = subprocess.run(install_cmd, capture_output=True, text=True, timeout=600)
        if completed.returncode == 0:
            print("[INFO] Binarios de Chromium instalados correctamente.")
            return True
        print(f"ERROR: 'playwright install chromium' falló (código {completed.returncode}).")
        if completed.stderr:
            print(completed.stderr.strip())
        return False
    except Exception as inst_err:
        print(f"ERROR ejecutando 'playwright install chromium': {inst_err}")
        return False
|
|
|
|
|
|
def session_file_status():
    """Report whether the session file exists and how old it is.

    Returns:
        ``(True, age_in_hours)`` when ``SESSION_FILE`` can be stat'ed, where the
        age is measured from its modification time; ``(False, None)`` otherwise.
    """
    # Ask for the mtime directly instead of checking existence first: the file
    # can disappear between the two calls, which used to raise.
    try:
        modified_at = os.path.getmtime(SESSION_FILE)
    except OSError:
        return False, None
    age_hours = (time.time() - modified_at) / 3600.0
    return True, age_hours
|
|
|
|
SCREENSHOT_DIR = SCREENSHOTS_DIR # backward-compatible alias
|
|
|
|
|
|
def _save_debug_screenshot(page, label: str) -> str:
    """Take a full-page diagnostic screenshot and return the path chosen for it.

    The file is named ``<label>_<YYYYmmdd_HHMMSS>.png`` inside
    ``SCREENSHOT_DIR``. Capturing is best-effort: the path is returned even
    when the screenshot itself could not be taken.
    """
    os.makedirs(SCREENSHOT_DIR, exist_ok=True)
    stamp = time.strftime("%Y%m%d_%H%M%S")
    target = os.path.join(SCREENSHOT_DIR, "{}_{}.png".format(label, stamp))
    try:
        page.screenshot(path=target, full_page=True)
        print(f"[DEBUG] Captura guardada: {target}")
    except Exception:
        pass
    return target
|
|
|
|
# CSS selectors used to drive the workflow builder UI.
# Status toggle (Naive UI switch) or its rail.
_SWITCH_SEL = ".n-switch, .n-switch__rail"
# Any button that signals the builder modal is showing: the acknowledge buttons plus 'Deshabilitar'.
_MODAL_SEL = "button:has-text('Entendido'), button:has-text('entendido'), button:has-text('Got it'), button:has-text('Deshabilitar')"
# Only the acknowledge buttons, i.e. the ones that are clicked to dismiss the modal.
_MODAL_CLOSE_SEL = "button:has-text('Entendido'), button:has-text('entendido'), button:has-text('Got it')"
# Save button, matched by its English or Spanish caption.
_SAVE_SEL = "button:has-text('Save'), button:has-text('Guardar'), button:has-text('save'), button:has-text('guardar')"
|
|
|
|
def _find_builder_frame(page, timeout_sec=35):
    """Locate the frame that hosts the workflow builder.

    Scans every frame of ``page`` (main frame plus iframes) for the status
    switch or for the AI Builder modal. The builder loads inside an iframe, so
    the selectors do not match on the main frame.

    Args:
        page: Playwright page already navigated to the workflow URL.
        timeout_sec: Number of scan passes; one second is slept between passes.

    Returns:
        ``(frame, "switch")`` or ``(frame, "modal")`` when found;
        ``(None, "login_redirect")`` when ``_any_frame_at_login`` reports a
        login page (expired session); ``(None, "timeout")`` when every pass
        came up empty.
    """
    for attempt in range(timeout_sec):
        if _any_frame_at_login(page):
            return None, "login_redirect"

        for frame in page.frames:
            try:
                if frame.locator(_SWITCH_SEL).count() > 0:
                    return frame, "switch"
                if frame.locator(_MODAL_SEL).count() > 0:
                    return frame, "modal"
            except Exception:
                # The frame may have been detached or be navigating; try the next one.
                continue

        if attempt % 5 == 4:
            # Progress diagnostics every fifth pass. Each lookup is guarded on its own:
            # a page in the middle of a navigation can make url/title raise, and a
            # failed debug print must never abort the scan.
            try:
                frames_now = list(page.frames)
            except Exception:
                frames_now = []
            frame_urls = []
            for f in frames_now:
                try:
                    frame_urls.append(f.url)
                except Exception:
                    frame_urls.append("(unknown)")
            try:
                current_url = page.url
            except Exception:
                current_url = "(unknown)"
            try:
                current_title = page.title()
            except Exception:
                current_title = "(unknown)"
            print(f"[DEBUG] URL: {current_url} | Título: {current_title} | Frames: {len(frames_now)} | Intento {attempt+1}/{timeout_sec}")
            print(f"[DEBUG] URLs de frames: {frame_urls}")

        time.sleep(1)

    return None, "timeout"
|
|
|
|
|
|
def _verify_status_via_api(location_id, workflow_id, target_active, max_attempts=6, base_wait_sec=3):
|
|
"""Verifica contra la API de GHL que el status del workflow coincida con el deseado.
|
|
|
|
Hace `max_attempts` pasadas con backoff lineal (base_wait_sec, 2*base, 3*base, ...).
|
|
GHL puede tardar 7-20 s en propagar un cambio del builder a su API, así que reintentos
|
|
son necesarios. Devuelve True si en algún intento el estado coincide, False si caduca.
|
|
"""
|
|
try:
|
|
import sync_engine
|
|
except Exception as imp_err:
|
|
print(f"[ADVERTENCIA] No se pudo importar sync_engine: {imp_err}")
|
|
return False
|
|
|
|
tokens = sync_engine.get_tokens_map()
|
|
token = tokens.get(location_id)
|
|
if not token:
|
|
print("[ADVERTENCIA] No hay token en el CSV — se omite validación API.")
|
|
return False
|
|
|
|
for attempt in range(1, max_attempts + 1):
|
|
wait_sec = base_wait_sec * attempt
|
|
print(f"[INFO] Validación API intento {attempt}/{max_attempts} (espera {wait_sec}s antes de consultar)...")
|
|
time.sleep(wait_sec)
|
|
try:
|
|
wfs = sync_engine.ghl_client.get_workflows(token, location_id)
|
|
except Exception as ge:
|
|
print(f"[DEBUG] Error consultando API: {ge}")
|
|
continue
|
|
|
|
actual = next((w for w in wfs if w.get("id") == workflow_id), None)
|
|
if actual is None:
|
|
print(f"[DEBUG] El workflow {workflow_id} aún no aparece en la API.")
|
|
continue
|
|
|
|
actual_status = (actual.get("status") or "").lower()
|
|
actual_is_active = actual_status in ("active", "published")
|
|
print(f"[INFO] Estado en API: '{actual_status}' (active={actual_is_active}) — esperado: active={target_active}")
|
|
|
|
if actual_is_active == target_active:
|
|
print(f"[ÉXITO] Cambio confirmado en API tras {attempt} intento(s).")
|
|
return True
|
|
|
|
print(f"[ADVERTENCIA] La API no reflejó el cambio tras {max_attempts} intentos.")
|
|
return False
|
|
|
|
|
|
def _perform_toggle_on_page(page, location_id, workflow_id, current_status):
    """Flip the Publish/Draft switch of a workflow on an already-open ``page``.

    The browser is neither opened nor closed here, so the function serves both
    the single-workflow flow and bulk flows that reuse one page for many
    workflows.

    Args:
        page: Playwright page to drive.
        location_id: Location that owns the workflow.
        workflow_id: Workflow to toggle.
        current_status: Status before the toggle. ``"active"``/``"published"``
            (compared case-insensitively) means the target is draft; anything
            else means the target is published.

    Returns:
        ``True`` when the switch was already in the target state (no API check
        is made in that case) or when the change was confirmed through the API.
        ``False`` when the builder, switch or save button could not be used, or
        the API never reflected the change.

    Raises:
        SessionExpiredError: When the builder lookup reports a login redirect.
    """
    # Normalize before comparing: the rest of the file lower-cases statuses, and a value
    # such as "Published" must not be mistaken for an inactive workflow.
    normalized_status = str(current_status or "").strip().lower()
    target_active = normalized_status not in ("active", "published")
    target_url = f"https://crm.bucefalocrm.io/location/{location_id}/workflow/{workflow_id}"
    print(f"[INFO] Navegando directamente a: {target_url}")
    page.goto(target_url, wait_until="domcontentloaded", timeout=30000)

    print("[INFO] Buscando frame del workflow builder (puede estar en un iframe)...")
    builder_frame, found_element = _find_builder_frame(page)

    if found_element == "login_redirect":
        print(f"[ERROR] Redirección a login: {page.url}")
        _save_debug_screenshot(page, "login_redirect")
        raise SessionExpiredError("Bucéfalo redirigió al login durante toggle")

    if found_element == "timeout":
        print(f"[ERROR] Tiempo agotado. URL: {page.url} | Título: {page.title()}")
        _save_debug_screenshot(page, "timeout_no_switch")
        return False

    print(f"[INFO] Builder encontrado en frame: {builder_frame.url[:80]} | Elemento: {found_element}")

    if found_element == "modal":
        print("[INFO] Cerrando modal 'AI Builder habilitado' con botón 'Entendido'...")
        try:
            builder_frame.locator(_MODAL_CLOSE_SEL).first.click()
            print("[INFO] Modal cerrado. Esperando switch...")
            # Poll up to 12 s for the switch; the for/else fires only if it never shows up.
            for _ in range(12):
                if builder_frame.locator(_SWITCH_SEL).count() > 0:
                    break
                time.sleep(1)
            else:
                print("[ERROR] Switch no apareció tras cerrar el modal.")
                _save_debug_screenshot(page, "no_switch_after_modal")
                return False
        except Exception as me:
            print(f"[ADVERTENCIA] No se pudo cerrar el modal: {me}")

    print("[INFO] Localizando interruptor de estado (Naive UI)...")
    all_sw = builder_frame.locator(".n-switch").all()
    print(f"[DEBUG] Total de switches en el frame: {len(all_sw)}")

    # Choose the switch whose surrounding text mentions publish/draft: the text of its
    # fourth ancestor is inspected for one of the keywords.
    publish_switch = None
    role_switches = builder_frame.get_by_role("switch").all()
    for candidate in role_switches:
        try:
            ancestor_text = candidate.locator("xpath=ancestor::*[4]").first.inner_text(timeout=1000)
            if any(kw in ancestor_text.lower() for kw in ("publicar", "borrador", "publish", "draft")):
                publish_switch = candidate
                break
        except Exception:
            continue

    if publish_switch is None:
        # Fallback: last role=switch element, or else the first .n-switch in the frame.
        publish_switch = role_switches[-1] if role_switches else builder_frame.locator(".n-switch").first

    switch = publish_switch

    print("[INFO] Esperando a que el iframe del builder termine de cargar...")
    try:
        builder_frame.wait_for_load_state("networkidle", timeout=25000)
    except Exception:
        pass
    time.sleep(3)

    cls = switch.get_attribute("class") or ""
    aria_checked = switch.get_attribute("aria-checked")
    is_active = (aria_checked == "true") or ("n-switch--active" in cls)
    print(f"[INFO] Interruptor (estabilizado): {'Publicado' if is_active else 'Borrador'} (aria-checked={aria_checked})")
    print(f"[INFO] Estado deseado: {'Publicado' if target_active else 'Borrador'}")

    if is_active == target_active:
        print("[INFO] El workflow ya está en el estado deseado.")
        return True

    def _close_all_modals():
        # Dismiss any acknowledge-modal in the builder frame or the main frame.
        closed_any = False
        for frame_candidate in [builder_frame, page.main_frame]:
            try:
                if frame_candidate.locator(_MODAL_SEL).count() > 0:
                    frame_candidate.locator(_MODAL_CLOSE_SEL).first.click()
                    closed_any = True
                    time.sleep(0.7)
            except Exception:
                continue
        return closed_any

    # Modals can stack; keep dismissing (at most 3 rounds) until none is found.
    for _ in range(3):
        if not _close_all_modals():
            break

    now_active = is_active
    for attempt in range(1, 4):
        print(f"[INFO] Click en interruptor (intento {attempt}/3)...")
        try:
            switch.click()
        except Exception as ce:
            print(f"[DEBUG] Excepción al clickear: {ce}")
        time.sleep(2)
        # The click itself may pop up a modal; dismiss it before reading the state.
        _close_all_modals()
        time.sleep(0.5)
        new_checked = switch.get_attribute("aria-checked")
        new_cls = switch.get_attribute("class") or ""
        now_active = (new_checked == "true") or ("n-switch--active" in new_cls)
        print(f"[INFO] Estado del switch tras intento {attempt}: {'Publicado' if now_active else 'Borrador'}")
        if now_active == target_active:
            break

    if now_active != target_active:
        print("[ERROR] El switch no cambió al estado deseado después de 3 intentos.")
        _save_debug_screenshot(page, "switch_no_change")
        return False

    print("[INFO] Localizando botón Guardar/Save...")
    _SAVE_ID_SEL = "#cmp-header__btn--save-workflow"
    save_button = None
    # Prefer the id selector over the caption, and the builder frame over the page.
    for loc in [
        builder_frame.locator(_SAVE_ID_SEL),
        builder_frame.locator(_SAVE_SEL),
        page.locator(_SAVE_ID_SEL),
        page.locator(_SAVE_SEL),
    ]:
        if loc.count() > 0:
            save_button = loc.first
            break

    if save_button is None:
        print("[ERROR] No se encontró el botón de Guardar.")
        _save_debug_screenshot(page, "no_save_button")
        return False

    btn_text = ""
    try:
        btn_text = save_button.inner_text(timeout=2000).strip().lower()
    except Exception:
        pass
    print(f"[DEBUG] Texto del botón antes de guardar: '{btn_text}'")

    if any(saved_kw in btn_text for saved_kw in ("guardado", "saved")):
        print("[ÉXITO] El cambio se guardó automáticamente (botón muestra 'Guardado').")
        # The API validation below still runs, for consistency.
    else:
        print("[INFO] Haciendo click en 'Guardar'...")
        save_button.click()
        print("[INFO] Esperando confirmación de guardado (hasta 15 s)...")
        saved_confirmed = False
        for _ in range(15):
            try:
                txt_now = save_button.inner_text(timeout=1000).strip().lower()
                if any(kw in txt_now for kw in ("guardado", "saved")):
                    saved_confirmed = True
                    break
            except Exception:
                pass
            time.sleep(1)
        if saved_confirmed:
            print("[ÉXITO] Workflow guardado: el botón ahora muestra 'Guardado'/'Saved'.")
        else:
            print("[ADVERTENCIA] No se vio el texto 'Guardado' tras 15 s — validaremos contra API.")
            _save_debug_screenshot(page, "save_not_confirmed")

    time.sleep(3)
    api_confirmed = _verify_status_via_api(location_id, workflow_id, target_active, max_attempts=6, base_wait_sec=3)

    if not api_confirmed:
        print("[INFO] Intentando recargar la página y reverificar (workaround bug visual)...")
        try:
            page.reload(wait_until="domcontentloaded", timeout=30000)
            time.sleep(5)
            api_confirmed = _verify_status_via_api(location_id, workflow_id, target_active, max_attempts=4, base_wait_sec=4)
        except Exception as reload_err:
            print(f"[DEBUG] No se pudo recargar: {reload_err}")

    if not api_confirmed:
        print("[ERROR] El cambio no se reflejó en la API de GHL tras múltiples reintentos.")
        _save_debug_screenshot(page, "api_not_confirmed")
        return False

    return True
|
|
|
|
|
|
def toggle_via_playwright_dom(location_id, workflow_id, current_status, run_id=None):
    """Single-workflow flow: open a browser, toggle the status, verify, close.

    Args:
        location_id: Location that owns the workflow.
        workflow_id: Workflow to toggle.
        current_status: Status before the toggle. ``"active"``/``"published"``
            (compared case-insensitively) means the target is draft; anything
            else means the target is published.
        run_id: Optional audit run id, recorded in the interruption state and in
            the error log.

    Returns:
        The result of ``_perform_toggle_on_page``, or ``False`` when any
        exception was raised along the way (it is logged through
        ``error_logging.log_error``).
    """
    from playwright.sync_api import sync_playwright
    print(f"[INFO] Iniciando automatización DOM para {workflow_id}...")

    # Normalize before comparing so a value such as "Published" is not taken for an
    # inactive workflow; the rest of the file lower-cases statuses as well.
    normalized_status = str(current_status or "").strip().lower()
    target_active = normalized_status not in ("active", "published")
    # Let the interruption handler know what is in flight.
    _INTERRUPT_STATE.update({
        "location_id": location_id,
        "workflow_id": workflow_id,
        "run_id": run_id,
        "target_active": target_active,
        "action": "toggle-status",
    })

    try:
        with sync_playwright() as p:
            print("[INFO] Lanzando navegador en segundo plano (headless)...")
            browser, context, page = _open_browser(p)
            _INTERRUPT_STATE["browser"] = browser
            _INTERRUPT_STATE["context"] = context
            try:
                # Pass the normalized status so the page-level routine derives the same target.
                ok = _perform_toggle_on_page(page, location_id, workflow_id, normalized_status)
            finally:
                _close_and_save(browser, context)
            return ok
    except Exception as e:
        error_id = error_logging.log_error("playwright_dom_toggle_failed", e, {"location_id": location_id, "workflow_id": workflow_id, "run_id": run_id, "action": "toggle-status"})
        print(f"ERROR en la automatización DOM de Playwright: {str(e)} | error_id={error_id}")
        return False
|
|
|
|
|
|
def _perform_toggle_item(page, item, prefix, run_id, target_active):
    """Process one workflow of a bulk toggle and return its result dict.

    The change is recorded in ``script_audit`` (planned, then applied or
    failed) when ``run_id`` is given. Any exception raised by the toggle is
    re-raised after the audit entry has been marked as failed;
    ``SessionExpiredError`` is tagged ``session_expired`` so the caller can
    abort the whole bulk.

    Args:
        page: Browser page to operate on.
        item: Dict describing the workflow (``workflow_id``/``id``,
            ``location_id``, ``current_status``/``status``, ``name``).
        prefix: Text prepended to every log line of this item.
        run_id: Audit run id, or a falsy value to skip auditing.
        target_active: True to publish, False to move to draft.

    Returns:
        Dict with ``workflow_id``, ``location_id``, ``name``, ``status``
        (``skipped``/``success``/``failed``) plus ``reason`` or ``change_id``.
    """
    workflow_id = item.get("workflow_id") or item.get("id")
    location = item.get("location_id")
    status_now = (item.get("current_status") or item.get("status") or "").lower()
    label = item.get("name") or workflow_id or "?"
    audit_id = None

    def _outcome(status, **extra):
        # Build the result dict shared by every exit path.
        record = {"workflow_id": workflow_id, "location_id": location, "name": label, "status": status}
        record.update(extra)
        return record

    def _close_audit(state, *detail):
        # Best effort: an audit failure must never mask the toggle result.
        if not audit_id:
            return
        try:
            script_audit.mark_change(audit_id, state, *detail)
        except Exception:
            pass

    # Nothing to do when the workflow is already in the requested state.
    if (status_now in ("active", "published")) == target_active:
        state_word = "publicado" if target_active else "borrador"
        print(f"{prefix} SKIP — ya está en {state_word} (status='{status_now}').", flush=True)
        return _outcome("skipped", reason=f"ya en {state_word}")

    wanted_status = "published" if target_active else "draft"
    if run_id:
        try:
            audit_id = script_audit.record_change(
                run_id, location, "workflow", workflow_id,
                field_id="status", field_name="status",
                old_value=status_now or None, new_value=wanted_status,
            )
        except Exception as ae:
            print(f"{prefix} [WARN] No se pudo registrar change planned: {ae}", flush=True)

    try:
        toggled = _perform_toggle_on_page(page, location, workflow_id, status_now)
    except SessionExpiredError:
        _close_audit("failed", "session_expired")
        raise
    except Exception as ie:
        _close_audit("failed", str(ie)[:500])
        raise

    if toggled:
        _close_audit("applied")
    else:
        _close_audit("failed", "mutacion_no_confirmada_por_api")

    return _outcome("success" if toggled else "failed", change_id=audit_id)
|
|
|
|
|
|
def bulk_toggle_via_playwright_dom(items, target_active, run_id=None, workers=1, per_action_pause=3.0):
    """Move a list of workflows to Draft or Published and print a summary.

    The heavy lifting is delegated to ``_run_parallel_bulk``, which receives
    ``_perform_toggle_item`` bound to ``target_active`` as the per-item action.

    Args:
        items: List of dicts ``{workflow_id, location_id, current_status, name}``.
        target_active: True publishes the workflows, False moves them to draft.
        run_id: Optional audit run id.
        workers: Number of workers forwarded to ``_run_parallel_bulk``.
        per_action_pause: Pause (seconds) forwarded to ``_run_parallel_bulk``.

    Returns:
        True when no item failed, False otherwise.
    """
    if target_active:
        target_label, action_label, action_key = "Publicado", "BULK-PUBLISH", "bulk-publish"
    else:
        target_label, action_label, action_key = "Borrador", "BULK-DRAFT", "bulk-draft"

    print(
        f"\n[{action_label}] Iniciando procesamiento de {len(items)} workflows "
        f"(target: {target_label}) con {workers} worker(s)..."
    )

    _INTERRUPT_STATE.update({
        "action": action_key,
        "run_id": run_id,
        "target_active": target_active,
    })

    results, failed, success, skipped = _run_parallel_bulk(
        items,
        lambda page, item, prefix, rid: _perform_toggle_item(page, item, prefix, rid, target_active),
        workers=workers,
        run_id=run_id,
        action_label=action_key,
        per_action_pause=per_action_pause,
        resync_after=True,
    )

    print(f"\n=== RESUMEN {action_label} ===")
    print(f"Total: {len(items)} | Éxitos: {success} | Ya en {target_label.lower()}: {skipped} | Fallos: {failed}")

    status_symbols = {"success": "OK", "skipped": "--", "failed": "XX"}
    for entry in results:
        symbol = status_symbols.get(entry["status"], "??")
        reason = entry.get("reason")
        suffix = f" ({reason})" if reason else ""
        # Ids are shortened only to keep the summary lines compact.
        wf_short = (entry.get("workflow_id") or "")[:8]
        loc_short = (entry.get("location_id") or "")[:12]
        print(f"  [{symbol}] {entry.get('name')} ({wf_short}) en {loc_short}{suffix}")

    return failed == 0
|
|
|
|
|
|
# Backwards-compatible alias kept for callers that still use the old name.
def bulk_draft_via_playwright_dom(items, run_id=None, workers=1, per_action_pause=3.0):
    """Move ``items`` to draft; thin alias of ``bulk_toggle_via_playwright_dom``.

    Args:
        items: List of workflow dicts, as accepted by the bulk toggle.
        run_id: Optional audit run id.
        workers: Worker count forwarded to the bulk toggle (its own default).
        per_action_pause: Per-item pause in seconds forwarded to the bulk
            toggle (its own default).

    Returns:
        True when no item failed, False otherwise.
    """
    return bulk_toggle_via_playwright_dom(
        items,
        target_active=False,
        run_id=run_id,
        workers=workers,
        per_action_pause=per_action_pause,
    )
|
|
|
|
|
|
# Entries of the row's three-dots menu, in Spanish and English.
_DELETE_MENU_TEXTS = [
    "Eliminar flujo de trabajo",
    "Delete workflow",
    "Delete Workflow",
]
_RENAME_MENU_TEXTS = [
    "Renombrar flujo de trabajo",
    "Rename workflow",
    "Rename Workflow",
]
# Exact word the delete modal asks the user to type to confirm the deletion.
_DELETE_CONFIRM_TOKENS = ["Borrar", "Delete"]

# CSS selector list matching the buttons used to dismiss blocking modals.
# NOTE(review): Playwright documents :has-text() as a case-insensitive
# substring match, so "OK" may also match unrelated buttons whose label merely
# contains "ok" — confirm this is acceptable.
_DISMISS_BUTTON_SELECTOR = ", ".join(
    f'button:has-text("{label}")'
    for label in ("Entendido", "entendido", "Got it", "Got It", "OK", "Aceptar")
)

# Candidate selectors for the rows of the workflows table, tried in order.
_LIST_ROW_SELECTORS = [
    "tbody tr",
    ".n-data-table-tr",
    "[class*='workflow-row']",
    "[class*='workflows-list'] tr",
    "table tr:not(:has(th))",
]
|
|
|
|
|
|
def _dismiss_blocking_modals(scope):
    """Click the first visible dismiss button of a blocking modal, if any.

    Args:
        scope: Page or frame to search in.

    Returns:
        True when a button was clicked, False otherwise. Every error is
        swallowed on purpose: dismissing modals is best effort.
    """
    try:
        button = scope.locator(_DISMISS_BUTTON_SELECTOR).first
        if button.count() <= 0:
            return False
        if not button.is_visible(timeout=500):
            return False
        print("[INFO] Cerrando modal bloqueante...")
        button.click()
        time.sleep(1)
        return True
    except Exception:
        return False
|
|
|
|
|
|
def _scope_has_table(scope):
    """Return the first row selector that matches something in ``scope``.

    Args:
        scope: Page or frame to probe.

    Returns:
        The matching entry of ``_LIST_ROW_SELECTORS`` (a truthy string), or
        None when no selector matches. Selectors that raise are skipped.
    """
    for selector in _LIST_ROW_SELECTORS:
        try:
            has_rows = scope.locator(selector).count() > 0
        except Exception:
            has_rows = False
        if has_rows:
            return selector
    return None
|
|
|
|
|
|
class SessionExpiredError(Exception):
    """Raised when the login screen is detected instead of the expected page.

    Once this happens every later interaction in the same session is expected
    to fail, so callers use it to abort.
    """
|
|
|
|
|
|
def _any_frame_at_login(page):
    """Return True when the main page or any iframe is showing the login screen.

    Two checks are made because, per the original note, the app is a SPA that
    may render its login component inside the same iframe without changing the
    URL when a request is rejected:

    1. URL check: the page URL or any frame URL contains ``login`` or ``/auth``.
    2. DOM check: a password input or a "sign in" text is present and visible
       in the page or in any frame.

    Args:
        page: Browser page; its ``url``, ``frames`` and ``locator`` are used.

    Returns:
        True if any login signal is found, False otherwise.
    """
    def _is_login_url(raw):
        lowered = raw.lower()
        return "login" in lowered or "/auth" in lowered

    # Check 1: URLs (covers real HTTP redirects to a login route).
    try:
        if _is_login_url(page.url):
            return True
    except Exception:
        pass
    for frame in page.frames:
        try:
            if _is_login_url(frame.url or ""):
                return True
        except Exception:
            continue

    # Check 2: DOM (covers a login form rendered under an unchanged URL).
    dom_markers = (
        'input[type="password"]',
        'text=/Inicia.*sesi[oó]n.*cuenta/i',
        'text=/Sign in to your account/i',
    )
    for target in [page, *page.frames]:
        for marker in dom_markers:
            try:
                hits = target.locator(marker)
                if hits.count() <= 0:
                    continue
                element = hits.first
                try:
                    shown = element.is_visible(timeout=300)
                except Exception:
                    # If visibility cannot be determined, a match alone is
                    # treated as a login signal.
                    return True
                if shown:
                    return True
            except Exception:
                continue
    return False
|
|
|
|
|
|
def _find_workflows_frame(page, timeout_sec=40):
    """Poll the page and its frames until one of them shows the workflows table.

    Args:
        page: Browser page to scan.
        timeout_sec: Maximum time to keep polling, in seconds.

    Returns:
        The scope (page or frame) that contains the table, or None when the
        timeout elapses first.

    Raises:
        SessionExpiredError: If a login screen is detected while polling.
    """
    give_up_at = time.time() + timeout_sec
    while time.time() < give_up_at:
        if _any_frame_at_login(page):
            frame_urls = [frame.url for frame in page.frames]
            print(f"[ERROR] Redirección a login detectada. URL principal: {page.url} | Frames: {frame_urls}")
            raise SessionExpiredError("Bucéfalo redirigió al login")

        # Main page first, then every iframe.
        for candidate in [page, *page.frames]:
            # Dismiss a modal (if any) before looking for rows in this scope.
            _dismiss_blocking_modals(candidate)
            selector = _scope_has_table(candidate)
            if not selector:
                continue

            if candidate is page:
                where = "main"
            else:
                where = f"frame {getattr(candidate, 'url', '')[:80]}"
            row_count = candidate.locator(selector).count()
            print(f"[DEBUG] Tabla en {where} (selector: {selector}, filas: {row_count}).")
            time.sleep(3)  # give the UI time to bind its listeners
            # A modal may have reappeared after the render; dismiss it again.
            _dismiss_blocking_modals(candidate)
            return candidate

        time.sleep(1)
    return None
|
|
|
|
|
|
def _open_workflows_list(page, location_id, timeout_ms=45000):
    """Navigate to the workflows list of a location and locate its table.

    Args:
        page: Browser page used for the navigation.
        location_id: Location whose workflows list is opened.
        timeout_ms: Navigation timeout passed to ``page.goto``.

    Returns:
        The scope (page or frame) holding the table, or None if the table
        did not load in time (a debug screenshot is saved in that case).

    Raises:
        SessionExpiredError: If a login screen is detected.
    """
    list_url = (
        f"https://crm.bucefalocrm.io/v2/location/{location_id}"
        "/automation/workflows?listTab=all&tab=recent&filter=all"
    )
    print(f"[INFO] Navegando al listado: {list_url}")
    page.goto(list_url, wait_until="domcontentloaded", timeout=timeout_ms)

    # Short wait so an iframe has time to start redirecting if the session is dead.
    time.sleep(2)

    if _any_frame_at_login(page):
        print(f"[ERROR] Sesión expirada: algún frame está en login. URL principal: {page.url}")
        raise SessionExpiredError("Bucéfalo redirigió al login")

    print("[INFO] Buscando frame de la tabla de workflows (puede estar en iframe)...")
    table_scope = _find_workflows_frame(page)
    if table_scope is not None:
        return table_scope

    print("[ERROR] La tabla de workflows no cargó dentro del tiempo esperado.")
    _save_debug_screenshot(page, "list_load_timeout")
    return None
|
|
|
|
|
|
def _resolve_workflow_name(location_id, workflow_id):
    """Look up a workflow's name in the local database.

    Args:
        location_id: Location passed to ``db.get_workflows``.
        workflow_id: Id of the workflow to find among the returned rows.

    Returns:
        The ``name`` of the first row whose ``id`` matches, or None when
        there is no match or the lookup raised (the error is only printed).
    """
    try:
        return next(
            (row.get("name") for row in db.get_workflows(location_id) if row.get("id") == workflow_id),
            None,
        )
    except Exception as e:
        print(f"[DEBUG] No se pudo resolver nombre del workflow desde SQLite: {e}")
        return None
|
|
|
|
|
|
def _find_workflow_row(scope, workflow_id, workflow_name=None):
    """Return a locator for the table row of a workflow, or None.

    Lookup order:

    1. Selectors built from ``workflow_id`` (links, ``data-id``, row text).
    2. If ``workflow_name`` is given: a row containing an element whose text
       is exactly the name, then a row whose text merely contains it.

    The name is passed through the ``has``/``has_text`` locator options
    instead of being interpolated into a selector string, so names containing
    quotes cannot break the selector. The exact match is tried first so that
    a name which is a substring of another workflow's name does not select
    the wrong row.

    When nothing matches, a short diagnostic of the first rows is printed.

    Args:
        scope: Page or frame that holds the table.
        workflow_id: Id of the workflow to locate.
        workflow_name: Optional workflow name used as a fallback.

    Returns:
        The first matching locator, or None.
    """
    id_selectors = (
        f'tr:has(a[href*="{workflow_id}"])',
        f'tr:has([href*="{workflow_id}"])',
        f'[data-id="{workflow_id}"]',
        f'tr:has-text("{workflow_id}")',
    )
    for sel in id_selectors:
        try:
            loc = scope.locator(sel).first
            if loc.count() > 0:
                print(f"[DEBUG] Fila localizada con selector: {sel}")
                return loc
        except Exception:
            continue

    if workflow_name:
        name_lookups = (
            # Exact text first: avoids picking "Copy of X" when looking for "X".
            lambda: scope.locator("tr", has=scope.get_by_text(workflow_name, exact=True)),
            # Substring fallback, same semantics as the former :has-text().
            lambda: scope.locator("tr", has_text=workflow_name),
        )
        for build in name_lookups:
            try:
                loc = build().first
                if loc.count() > 0:
                    print(f"[DEBUG] Fila localizada por nombre: '{workflow_name}'")
                    return loc
            except Exception:
                continue

    # Diagnostics: dump a few rows so the selectors can be iterated on.
    try:
        rows = scope.locator("tbody tr").all()
        print(f"[DEBUG] Diagnóstico: {len(rows)} filas en la tabla. Ni ID ni nombre matchearon.")
        for idx, r in enumerate(rows[:5]):
            try:
                txt = r.inner_text(timeout=500).replace("\n", " | ")[:120]
                hrefs = []
                for a in r.locator("a").all()[:3]:
                    try:
                        href = a.get_attribute("href")
                        if href:
                            hrefs.append(href[:80])
                    except Exception:
                        pass
                print(f"[DEBUG]   row[{idx}] text='{txt}' hrefs={hrefs}")
            except Exception:
                continue
    except Exception:
        pass
    return None
|
|
|
|
|
|
def _open_row_menu(scope, row, workflow_id):
    """Click the three-dots menu button of a table row.

    A list of known selectors is tried inside ``row`` first; if none
    matches, the last ``button`` of the row is clicked as a fallback.

    Args:
        scope: Page or frame (not used by this function).
        row: Locator of the row whose menu must be opened.
        workflow_id: Workflow id, only used in the error message.

    Returns:
        True when a button was clicked, False otherwise.
    """
    # Ellipsis-style buttons, matched by aria-label, icon class or own class.
    known_menu_buttons = (
        'button[aria-label*="More"]',
        'button[aria-label*="more"]',
        'button:has(.fa-ellipsis-vertical)',
        'button:has(.fa-ellipsis)',
        'button:has(i[class*="ellipsis"])',
        '[class*="more-options"]',
        '[class*="three-dots"]',
        'button[class*="ellipsis"]',
    )
    for selector in known_menu_buttons:
        try:
            matches = row.locator(selector)
            if matches.count() <= 0:
                continue
            print(f"[DEBUG] Botón menú con selector: {selector}")
            matches.first.click()
            return True
        except Exception:
            continue

    # Fallback: assume the last button of the row is the menu.
    try:
        row_buttons = row.locator("button").all()
        if row_buttons:
            print(f"[DEBUG] Fallback: click en el último botón de la fila ({len(row_buttons)} botones).")
            row_buttons[-1].click()
            return True
    except Exception:
        pass

    print(f"[ERROR] No se encontró el botón ••• en la fila del workflow {workflow_id}.")
    return False
|
|
|
|
|
|
def _click_menu_item(scope, texts, timeout_sec=10):
    """Click the first element whose text contains any of ``texts``.

    The scope is polled (every 0.5 s) until one of the texts is found or
    ``timeout_sec`` elapses. Texts are tried in the given order on each pass.

    Args:
        scope: Page or frame to search in.
        texts: Candidate texts; matching is non-exact.
        timeout_sec: Maximum time to keep polling, in seconds.

    Returns:
        True when an element was clicked, False on timeout.
    """
    stop_at = time.time() + timeout_sec
    while time.time() < stop_at:
        for label in texts:
            try:
                entry = scope.get_by_text(label, exact=False).first
                if entry.count() <= 0:
                    continue
                entry.click()
                print(f"[INFO] Click en item: '{label}'.")
                return True
            except Exception:
                continue
        time.sleep(0.5)

    print(f"[ERROR] No apareció ningún item del menú entre: {texts}")
    return False
|
|
|
|
|
|
def _verify_deletion_via_api(location_id, workflow_id, max_attempts=4, base_wait_sec=3):
    """Confirm through the API that a workflow no longer exists.

    Each attempt first sleeps ``base_wait_sec * attempt`` seconds and then
    lists the workflows of the location; the deletion is confirmed as soon as
    ``workflow_id`` is absent from the list. Attempts whose API call raises
    are skipped.

    Args:
        location_id: Location whose token and workflow list are used.
        workflow_id: Id that must no longer be listed.
        max_attempts: Number of API checks before giving up.
        base_wait_sec: Base wait, multiplied by the attempt number.

    Returns:
        True when the workflow is gone. False when it is still listed after
        all attempts, when ``sync_engine`` cannot be imported, or when there
        is no token for the location.
    """
    try:
        import sync_engine
    except Exception:
        return False

    token = sync_engine.get_tokens_map().get(location_id)
    if not token:
        print("[ADVERTENCIA] No hay token en el CSV — se omite validación API.")
        return False

    for attempt in range(1, max_attempts + 1):
        delay = base_wait_sec * attempt
        print(f"[INFO] Validación API intento {attempt}/{max_attempts} (espera {delay}s)...")
        time.sleep(delay)

        try:
            listed = sync_engine.ghl_client.get_workflows(token, location_id)
        except Exception as ge:
            print(f"[DEBUG] Error consultando API: {ge}")
            continue

        if all(wf.get("id") != workflow_id for wf in listed):
            print(f"[ÉXITO] Cambio confirmado en API tras {attempt} intento(s).")
            return True
        print(f"[DEBUG] El workflow aún aparece en API (intento {attempt}).")

    print(f"[ADVERTENCIA] La API sigue mostrando el workflow tras {max_attempts} intentos.")
    return False
|
|
|
|
|
|
def _perform_delete_on_page(page, location_id, workflow_id, workflow_name=None):
    """Delete one workflow through the UI on an already-open ``page``.

    Steps: open the workflows list, find the row, open its menu, click the
    delete entry, type the confirmation word in the modal, click the final
    button and validate the deletion against the API. Each step that fails
    saves a debug screenshot and returns False. The browser is neither opened
    nor closed here.

    Args:
        page: Browser page to operate on.
        location_id: Location that owns the workflow.
        workflow_id: Workflow to delete.
        workflow_name: Optional name used as a fallback to locate the row.

    Returns:
        The result of ``_verify_deletion_via_api``, or False if a UI step failed.

    Raises:
        SessionExpiredError: Propagated from ``_open_workflows_list``.
    """
    def _abort(shot_tag, message=None):
        # Shared failure exit: optional message, then a debug screenshot.
        if message is not None:
            print(message)
        _save_debug_screenshot(page, shot_tag)
        return False

    scope = _open_workflows_list(page, location_id)
    if scope is None:
        return _abort("delete_list_failed")

    row = _find_workflow_row(scope, workflow_id, workflow_name)
    if row is None:
        return _abort("delete_no_row", f"[ERROR] No se encontró la fila del workflow {workflow_id} en la tabla.")

    # Hovering is best effort.
    try:
        row.hover()
    except Exception:
        pass

    if not _open_row_menu(scope, row, workflow_id):
        return _abort("delete_no_menu_btn")

    print("[INFO] Esperando menú flotante con la opción 'Eliminar flujo de trabajo'...")
    if not _click_menu_item(scope, _DELETE_MENU_TEXTS, timeout_sec=12):
        return _abort("delete_no_menu_item")

    print("[INFO] Esperando modal de confirmación...")
    time.sleep(1.5)

    # Prefer the input whose placeholder is one of the confirmation words.
    confirm_input = confirm_token = None
    for token in _DELETE_CONFIRM_TOKENS:
        try:
            by_placeholder = scope.locator(f'input[placeholder="{token}"]')
            if by_placeholder.count() > 0:
                confirm_input, confirm_token = by_placeholder.first, token
                break
        except Exception:
            continue

    # Otherwise take the first input of any dialog and use the first word.
    if confirm_input is None:
        try:
            dialog_inputs = scope.locator('div[role="dialog"] input, .modal input, .n-modal input').all()
            if dialog_inputs:
                confirm_input, confirm_token = dialog_inputs[0], _DELETE_CONFIRM_TOKENS[0]
        except Exception:
            pass

    if confirm_input is None:
        return _abort("delete_no_confirm_input", "[ERROR] No se encontró el input de confirmación del modal.")

    print(f"[INFO] Escribiendo '{confirm_token}' en el input de confirmación...")
    confirm_input.fill(confirm_token)
    time.sleep(0.5)

    print("[INFO] Click en el botón final de confirmación...")
    clicked = False
    for token in _DELETE_CONFIRM_TOKENS:
        try:
            final_button = scope.locator(
                f'div[role="dialog"] button:has-text("{token}"), .n-modal button:has-text("{token}")'
            ).last
            if final_button.count() > 0:
                final_button.click()
                clicked = True
                break
        except Exception:
            continue

    if not clicked:
        # Fallback: click the last button of the dialog.
        try:
            dialog_buttons = scope.locator('div[role="dialog"] button, .n-modal button').all()
            if dialog_buttons:
                dialog_buttons[-1].click()
                clicked = True
        except Exception:
            pass

    if not clicked:
        return _abort("delete_no_confirm_btn", "[ERROR] No se pudo hacer click en el botón final de borrado.")

    print("[INFO] Borrado disparado. Esperando 5 s a que GHL procese...")
    time.sleep(5)

    # The table check is informative only; the API result is what is returned.
    try:
        row_still_listed = _find_workflow_row(scope, workflow_id) is not None
    except Exception:
        row_still_listed = False
    if row_still_listed:
        print("[INFO] La fila aún aparece en la tabla; validando contra API por si GHL aún procesa...")
    else:
        print("[INFO] La fila ya no aparece en la tabla — validando contra API...")

    # API validation: the workflow must no longer be listed.
    return _verify_deletion_via_api(location_id, workflow_id)
|
|
|
|
|
|
def delete_via_playwright_dom(location_id, workflow_id, run_id=None):
    """Single-workflow case: open a browser, delete the workflow, close the browser.

    The workflow name is resolved from the local database first so the row
    can also be located by name. The deletion itself is delegated to
    ``_perform_delete_on_page``.

    Args:
        location_id: Location that owns the workflow.
        workflow_id: Workflow to delete.
        run_id: Optional audit run id, stored in the interrupt state and
            attached to any logged error.

    Returns:
        The result of ``_perform_delete_on_page``, or False when anything
        raised (the exception is logged through ``error_logging``).
    """
    from playwright.sync_api import sync_playwright

    print(f"[INFO] Iniciando borrado DOM para workflow {workflow_id} en {location_id}...")

    known_name = _resolve_workflow_name(location_id, workflow_id)
    if known_name:
        print(f"[INFO] Nombre del workflow (SQLite): '{known_name}'")

    _INTERRUPT_STATE.update(
        location_id=location_id,
        workflow_id=workflow_id,
        run_id=run_id,
        target_active=None,
        action="delete",
    )

    try:
        with sync_playwright() as playwright:
            browser, context, page = _open_browser(playwright)
            _INTERRUPT_STATE["browser"] = browser
            _INTERRUPT_STATE["context"] = context
            try:
                deleted = _perform_delete_on_page(page, location_id, workflow_id, known_name)
            finally:
                # Always close the browser, even when the deletion raised.
                _close_and_save(browser, context)
            return deleted
    except Exception as e:
        error_context = {
            "location_id": location_id,
            "workflow_id": workflow_id,
            "run_id": run_id,
            "action": "delete",
        }
        error_id = error_logging.log_error("playwright_dom_delete_failed", e, error_context)
        print(f"ERROR en delete DOM: {str(e)} | error_id={error_id}")
        return False
|
|
|
|
|
|
def _perform_delete_item(page, item, prefix, run_id):
    """Process one workflow of a bulk delete and return its result dict.

    Items without a workflow id or a location id are rejected up front:
    deleting is destructive, and looking a row up with placeholder values
    could match (and delete) an unrelated workflow.

    The change is recorded in ``script_audit`` (planned, then applied or
    failed) when ``run_id`` is given. Any exception raised by the deletion is
    re-raised after the audit entry has been marked as failed;
    ``SessionExpiredError`` is tagged ``session_expired``.

    Args:
        page: Browser page to operate on.
        item: Dict describing the workflow (``workflow_id``/``id``,
            ``location_id``, optional ``name``).
        prefix: Text prepended to every log line of this item.
        run_id: Audit run id, or a falsy value to skip auditing.

    Returns:
        Dict with ``workflow_id``, ``location_id``, ``name``, ``status``
        (``success``/``failed``) plus ``change_id``, or ``reason`` for
        rejected items.
    """
    wf_id = item.get("workflow_id") or item.get("id")
    loc_id = item.get("location_id")

    if not wf_id or not loc_id:
        print(f"{prefix} [ERROR] Item sin workflow_id o location_id; se omite el borrado.", flush=True)
        return {"workflow_id": wf_id, "location_id": loc_id,
                "name": item.get("name") or wf_id or "?",
                "status": "failed", "reason": "faltan workflow_id/location_id"}

    # Only a real name may be used to locate the row; the display name falls
    # back to the id purely for logs, audit and the result dict.
    real_name = item.get("name") or _resolve_workflow_name(loc_id, wf_id)
    name = real_name or wf_id

    change_id = None
    if run_id:
        try:
            change_id = script_audit.record_change(
                run_id, loc_id, "workflow", wf_id,
                field_id="_workflow", field_name="deleted",
                old_value=name, new_value=None,
            )
        except Exception as ae:
            print(f"{prefix} [WARN] No se pudo registrar change planned: {ae}", flush=True)

    def _close_audit(state, *detail):
        # Best effort: an audit failure must never mask the delete result.
        if not change_id:
            return
        try:
            script_audit.mark_change(change_id, state, *detail)
        except Exception:
            pass

    try:
        ok = _perform_delete_on_page(page, loc_id, wf_id, real_name)
    except SessionExpiredError:
        _close_audit("failed", "session_expired")
        raise
    except Exception as ie:
        _close_audit("failed", str(ie)[:500])
        raise

    if ok:
        _close_audit("applied")
    else:
        _close_audit("failed", "delete_no_confirmado_por_api")

    return {"workflow_id": wf_id, "location_id": loc_id, "name": name,
            "status": "success" if ok else "failed", "change_id": change_id}
|
|
|
|
|
|
def bulk_delete_via_playwright_dom(items, run_id=None, workers=1, per_action_pause=3.0):
    """Delete a list of workflows and print a summary.

    The heavy lifting is delegated to ``_run_parallel_bulk`` with
    ``_perform_delete_item`` as the per-item action.

    Args:
        items: List of workflow dicts (see ``_perform_delete_item``).
        run_id: Optional audit run id.
        workers: Number of workers forwarded to ``_run_parallel_bulk``.
        per_action_pause: Pause (seconds) forwarded to ``_run_parallel_bulk``.

    Returns:
        True when no item failed, False otherwise.
    """
    print(f"\n[BULK-DELETE] Iniciando procesamiento de {len(items)} workflows con {workers} worker(s)...")

    _INTERRUPT_STATE.update(action="bulk-delete", run_id=run_id, target_active=None)

    results, failed, success, skipped = _run_parallel_bulk(
        items,
        _perform_delete_item,
        workers=workers,
        run_id=run_id,
        action_label="bulk-delete",
        per_action_pause=per_action_pause,
        resync_after=True,
    )

    print("\n=== RESUMEN BULK-DELETE ===")
    print(f"Total: {len(items)} | Éxitos: {success} | Omitidos: {skipped} | Fallos: {failed}")

    status_symbols = {"success": "OK", "skipped": "--", "failed": "XX"}
    for entry in results:
        symbol = status_symbols.get(entry["status"], "??")
        reason = entry.get("reason")
        suffix = f" ({reason})" if reason else ""
        # Ids are shortened only to keep the summary lines compact.
        wf_short = (entry.get("workflow_id") or "")[:8]
        loc_short = (entry.get("location_id") or "")[:12]
        print(f"  [{symbol}] {entry.get('name')} ({wf_short}) en {loc_short}{suffix}")

    return failed == 0
|
|
|
|
|
|
def rename_via_playwright_dom(location_id, workflow_id, new_name, run_id=None):
    """Rename a workflow, wrapping the real work with audit bookkeeping.

    When ``run_id`` is given, a planned change is recorded in
    ``script_audit`` and later marked as applied or failed. The rename
    itself is done by ``_rename_via_playwright_dom_impl``.

    Args:
        location_id: Location that owns the workflow.
        workflow_id: Workflow to rename.
        new_name: Name to set.
        run_id: Audit run id, or a falsy value to skip auditing.

    Returns:
        The result of ``_rename_via_playwright_dom_impl``.

    Raises:
        Exception: Anything raised by the implementation is re-raised after
            the audit entry has been marked as failed.
    """
    audit_id = None
    if run_id:
        try:
            previous_name = _resolve_workflow_name(location_id, workflow_id) or workflow_id
            audit_id = script_audit.record_change(
                run_id, location_id, "workflow", workflow_id,
                field_id="name", field_name="name",
                old_value=previous_name, new_value=new_name,
            )
        except Exception as ae:
            print(f"[WARN] No se pudo registrar change planned para rename: {ae}")

    def _close_audit(state, *detail):
        # Best effort: an audit failure must never mask the rename result.
        if not audit_id:
            return
        try:
            script_audit.mark_change(audit_id, state, *detail)
        except Exception:
            pass

    try:
        renamed = _rename_via_playwright_dom_impl(location_id, workflow_id, new_name, run_id)
    except Exception:
        _close_audit("failed", "excepcion_no_manejada")
        raise

    if renamed:
        _close_audit("applied")
    else:
        _close_audit("failed", "rename_no_completado")
    return renamed
|
|
|
|
|
|
def _rename_via_playwright_dom_impl(location_id, workflow_id, new_name, run_id=None):
    """Rename a workflow by driving the workflows list UI.

    Steps: open the list, find the row, open its menu, click the rename
    entry, fill the modal input with ``new_name`` and click the final button.
    Each step that fails saves a debug screenshot and returns False.

    The browser is closed through ``_close_and_save`` in a ``finally`` block,
    so it is also closed when a step raises (for example on an expired
    session), as the toggle and delete wrappers do.

    NOTE(review): success is reported after a fixed wait, without checking
    the new name against the API — confirm whether a validation is wanted.

    Args:
        location_id: Location that owns the workflow.
        workflow_id: Workflow to rename.
        new_name: Name to set.
        run_id: Optional audit run id, stored in the interrupt state and
            attached to any logged error.

    Returns:
        True when the final button was clicked, False when a step failed or
        anything raised (the exception is logged through ``error_logging``).
    """
    from playwright.sync_api import sync_playwright

    print(f"[INFO] Iniciando rename DOM para workflow {workflow_id} → '{new_name}'...")
    workflow_name = _resolve_workflow_name(location_id, workflow_id)
    if workflow_name:
        print(f"[INFO] Nombre actual (SQLite): '{workflow_name}'")

    _INTERRUPT_STATE.update({
        "location_id": location_id,
        "workflow_id": workflow_id,
        "run_id": run_id,
        "target_active": None,
        "action": "rename",
    })

    try:
        with sync_playwright() as p:
            browser, context, page = _open_browser(p)
            _INTERRUPT_STATE["browser"] = browser
            _INTERRUPT_STATE["context"] = context
            try:
                scope = _open_workflows_list(page, location_id)
                if scope is None:
                    _save_debug_screenshot(page, "rename_list_failed")
                    return False

                row = _find_workflow_row(scope, workflow_id, workflow_name)
                if row is None:
                    print(f"[ERROR] No se encontró la fila del workflow {workflow_id}.")
                    _save_debug_screenshot(page, "rename_no_row")
                    return False

                # Hovering is best effort.
                try:
                    row.hover()
                except Exception:
                    pass

                if not _open_row_menu(scope, row, workflow_id):
                    _save_debug_screenshot(page, "rename_no_menu_btn")
                    return False

                print("[INFO] Buscando opción 'Renombrar flujo de trabajo' en el menú...")
                if not _click_menu_item(scope, _RENAME_MENU_TEXTS, timeout_sec=12):
                    _save_debug_screenshot(page, "rename_no_menu_item")
                    return False

                print("[INFO] Esperando modal de rename...")
                time.sleep(1.5)

                # The first input of any dialog is taken as the name field.
                rename_input = None
                try:
                    modal_inputs = scope.locator('div[role="dialog"] input, .modal input, .n-modal input').all()
                    if modal_inputs:
                        rename_input = modal_inputs[0]
                except Exception:
                    pass

                if rename_input is None:
                    print("[ERROR] No se encontró el input del modal de rename.")
                    _save_debug_screenshot(page, "rename_no_input")
                    return False

                rename_input.fill(new_name)
                time.sleep(0.5)

                confirm_clicked = False
                for txt in ("Guardar", "Save", "Renombrar", "Rename", "Actualizar", "Update"):
                    try:
                        btn = scope.locator(
                            f'div[role="dialog"] button:has-text("{txt}"), .n-modal button:has-text("{txt}")'
                        ).last
                        if btn.count() > 0:
                            btn.click()
                            confirm_clicked = True
                            break
                    except Exception:
                        continue

                if not confirm_clicked:
                    # Fallback: click the last button of the dialog.
                    try:
                        dlg_buttons = scope.locator('div[role="dialog"] button, .n-modal button').all()
                        if dlg_buttons:
                            dlg_buttons[-1].click()
                            confirm_clicked = True
                    except Exception:
                        pass

                if not confirm_clicked:
                    print("[ERROR] No se pudo hacer click en el botón final del modal.")
                    _save_debug_screenshot(page, "rename_no_confirm_btn")
                    return False

                print("[INFO] Rename disparado. Esperando 6 s a que GHL procese...")
                time.sleep(6)
                print("[ÉXITO] Workflow renombrado.")
                return True
            finally:
                # Single close point for every exit path, including exceptions.
                _close_and_save(browser, context)

    except Exception as e:
        error_id = error_logging.log_error(
            "playwright_dom_rename_failed",
            e,
            {"location_id": location_id, "workflow_id": workflow_id, "run_id": run_id, "action": "rename"},
        )
        print(f"ERROR en rename DOM: {str(e)} | error_id={error_id}")
        return False
|
|
|
|
|
|
def _resync_local_workflows(location_id):
    """Refresh the local workflows table of a location from the API.

    Does nothing when there is no token for the location. Any error is
    reported with a warning and otherwise ignored.

    Args:
        location_id: Location whose workflows are fetched and saved.
    """
    try:
        import sync_engine

        token = sync_engine.get_tokens_map().get(location_id)
        if not token:
            return
        fresh_workflows = sync_engine.ghl_client.get_workflows(token, location_id)
        db.save_workflows(location_id, fresh_workflows)
        print(f"[INFO] SQLite actualizado. {len(fresh_workflows)} workflows.")
    except Exception as se:
        print(f"[ADVERTENCIA] No se pudo re-sincronizar SQLite local: {se}")
|
|
|
|
|
|
def main():
|
|
_install_signal_handlers()
|
|
parser = argparse.ArgumentParser(description="Gestor de workflows en Bucéfalo mediante automatización DOM con Playwright.")
|
|
parser.add_argument("--action", required=True, choices=["toggle-status", "rename", "delete", "bulk-draft", "bulk-publish", "bulk-delete"])
|
|
parser.add_argument("--location", help="Location ID de la subcuenta (obligatorio para acciones individuales)")
|
|
parser.add_argument("--workflow-id", help="ID del workflow (obligatorio para acciones individuales)")
|
|
parser.add_argument("--new-name", help="Nuevo nombre (rename)")
|
|
parser.add_argument("--current-status", help="Estado actual del workflow (toggle-status)")
|
|
parser.add_argument("--run-id", help="ID de ejecución de auditoría")
|
|
parser.add_argument("--batch-file", help="Ruta a archivo JSON con lista de items para acciones bulk-*")
|
|
parser.add_argument("--workers", type=int, default=1,
|
|
help="Workers paralelos para acciones bulk-* (1-5, default 1). Ignorado en perfil persistente.")
|
|
parser.add_argument("--action-pause", type=float, default=3.0, dest="action_pause",
|
|
help="Pausa (s) por worker entre items en bulks. Default 3.0.")
|
|
args = parser.parse_args()
|
|
|
|
# Defensa contra perfil persistente: lock del profile dir impide múltiples contexts.
|
|
if PERSISTENT_PROFILE_DIR and args.workers > 1:
|
|
print("[INFO] Perfil persistente activo → forzando --workers=1.")
|
|
args.workers = 1
|
|
args.workers = max(1, min(int(args.workers or 1), 5))
|
|
|
|
# Validación de argumentos según la acción.
|
|
is_bulk = args.action.startswith("bulk-")
|
|
if is_bulk:
|
|
if not args.batch_file:
|
|
print("ERROR: --batch-file es obligatorio para acciones bulk-*.")
|
|
sys.exit(1)
|
|
else:
|
|
if not args.location or not args.workflow_id:
|
|
print("ERROR: --location y --workflow-id son obligatorios para acciones individuales.")
|
|
sys.exit(1)
|
|
|
|
if args.run_id:
|
|
# Bootstrap idempotente: si el dashboard ya creó el run, INSERT OR IGNORE
|
|
# respeta lo previo; si vino de CLI, esto crea la fila para que los
|
|
# record_change posteriores no queden huérfanos.
|
|
try:
|
|
script_audit.create_run(
|
|
args.run_id,
|
|
script_name=os.path.basename(__file__),
|
|
arguments=" ".join(sys.argv[1:]),
|
|
locations=[args.location] if args.location else [],
|
|
execution_mode="parallel" if args.workers > 1 else "sequential",
|
|
)
|
|
except Exception as ae:
|
|
print(f"[WARN] No se pudo crear/actualizar run en script_audit: {ae}")
|
|
script_audit.update_run_status(args.run_id, "running")
|
|
|
|
print("=== CONTROL DE SCRIPTS: ghl_browser_workflow_manager.py ===")
|
|
print(f"Modo: Playwright Híbrido Avanzado | Acción: {args.action}")
|
|
if is_bulk:
|
|
print(f"Batch file: {args.batch_file}")
|
|
else:
|
|
print(f"Subcuenta: {args.location} | Workflow: {args.workflow_id}")
|
|
if args.current_status:
|
|
print(f"Estado Actual: {args.current_status}")
|
|
print("----------------------------------------------------------------------")
|
|
|
|
session_exists, session_age_hours = session_file_status()
|
|
if not session_exists:
|
|
print(f"ERROR: Falta el archivo de sesión: {SESSION_FILE}")
|
|
print("Corre el script 'ghl_browser_session_generator.py' primero.")
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "failed", f"Falta el archivo de sesion: {SESSION_FILE}")
|
|
sys.exit(1)
|
|
|
|
if session_age_hours is not None:
|
|
print(f"[INFO] Sesión Bucéfalo: {session_age_hours:.1f} h de antigüedad ({SESSION_FILE}).")
|
|
if session_age_hours > SESSION_MAX_AGE_HOURS:
|
|
print(f"[ADVERTENCIA] La sesión tiene más de {SESSION_MAX_AGE_HOURS} h — probablemente expiró.")
|
|
print("[ADVERTENCIA] Si la acción falla, renueva con: python scripts/ghl_browser_session_generator.py")
|
|
|
|
if not ensure_playwright_browsers():
|
|
print("ERROR: No hay binarios de Chromium disponibles para Playwright.")
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "failed", "Faltan binarios de Playwright (chromium)")
|
|
sys.exit(1)
|
|
|
|
# 0. Acciones bulk-*: leen lista del JSON y procesan todo con un solo browser.
|
|
if args.action in ("bulk-draft", "bulk-publish", "bulk-delete"):
|
|
import json as _json
|
|
try:
|
|
with open(args.batch_file, encoding="utf-8") as f:
|
|
items = _json.load(f)
|
|
except Exception as je:
|
|
print(f"ERROR leyendo --batch-file: {je}")
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "failed", f"No se pudo leer batch-file: {je}")
|
|
sys.exit(1)
|
|
if not isinstance(items, list) or not items:
|
|
print("ERROR: el batch-file debe contener una lista no vacía de workflows.")
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "failed", "batch-file vacío o inválido")
|
|
sys.exit(1)
|
|
if args.action == "bulk-delete":
|
|
ok_all = bulk_delete_via_playwright_dom(
|
|
items, run_id=args.run_id, workers=args.workers, per_action_pause=args.action_pause,
|
|
)
|
|
else:
|
|
target_active = (args.action == "bulk-publish")
|
|
ok_all = bulk_toggle_via_playwright_dom(
|
|
items, target_active=target_active, run_id=args.run_id,
|
|
workers=args.workers, per_action_pause=args.action_pause,
|
|
)
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "success" if ok_all else "failed",
|
|
None if ok_all else f"Algunos items de {args.action} fallaron")
|
|
sys.exit(0 if ok_all else 1)
|
|
|
|
# 1. Si es toggle-status, ejecutamos directamente la automatización DOM sobre la página del constructor del workflow.
|
|
# Esto es 100% robusto y garantiza el guardado correcto a nivel de backend y UI en GHL.
|
|
if args.action == "toggle-status":
|
|
if not ensure_playwright_browsers():
|
|
print("ERROR: No hay binarios de Chromium disponibles para Playwright.")
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "failed", "Faltan binarios de Playwright (chromium)")
|
|
sys.exit(1)
|
|
success = toggle_via_playwright_dom(args.location, args.workflow_id, args.current_status, args.run_id)
|
|
if success:
|
|
# Sincronizar SQLite local inmediatamente
|
|
print("[INFO] Re-sincronizando base de datos local SQLite...")
|
|
try:
|
|
import sync_engine
|
|
tokens = sync_engine.get_tokens_map()
|
|
token = tokens.get(args.location)
|
|
if token:
|
|
workflows = sync_engine.ghl_client.get_workflows(token, args.location)
|
|
db.save_workflows(args.location, workflows)
|
|
print(f"[INFO] SQLite actualizado. {len(workflows)} workflows disponibles.")
|
|
except Exception as se:
|
|
print(f"[ADVERTENCIA] No se pudo re-sincronizar SQLite local de forma automática: {se}")
|
|
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "success")
|
|
sys.exit(0)
|
|
else:
|
|
print("[ERROR] Falló la automatización DOM para cambiar el estado.")
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "failed", "Falló la automatización DOM de cambio de estado")
|
|
sys.exit(1)
|
|
|
|
# 2. Para rename/delete usamos automatización DOM pura — el flujo de intercepción de tokens
|
|
# dejó de funcionar porque GHL ya no expone los endpoints internos por esa vía.
|
|
if args.action == "delete":
|
|
ok = delete_via_playwright_dom(args.location, args.workflow_id, args.run_id)
|
|
elif args.action == "rename":
|
|
if not args.new_name:
|
|
print("ERROR: --new-name es obligatorio para rename.")
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "failed", "Falta --new-name")
|
|
sys.exit(1)
|
|
ok = rename_via_playwright_dom(args.location, args.workflow_id, args.new_name, args.run_id)
|
|
else:
|
|
print(f"ERROR: acción desconocida '{args.action}'.")
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "failed", f"Acción desconocida: {args.action}")
|
|
sys.exit(1)
|
|
|
|
if ok:
|
|
print("[INFO] Re-sincronizando SQLite local...")
|
|
_resync_local_workflows(args.location)
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "success")
|
|
sys.exit(0)
|
|
else:
|
|
if args.run_id:
|
|
script_audit.update_run_status(args.run_id, "failed", f"Falló automatización DOM ({args.action})")
|
|
sys.exit(1)
|
|
|
|
# Script entry point: run the CLI only when this file is executed directly,
# so importing the module (e.g. from another tool) has no side effects.
if __name__ == "__main__":
    main()
|