#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Administra Workflows de GHL de forma híbrida (intercepción de tokens + llamadas API + automatización DOM).""" import argparse import atexit import json as _json_top import os import queue import signal import subprocess import sys import threading import time from concurrent.futures import ThreadPoolExecutor ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT_DIR not in sys.path: sys.path.insert(0, ROOT_DIR) import db import error_logging import script_audit from script_logger import RunLogger from paths import SESSION_FILE, SCREENSHOTS_DIR # noqa: E402 # Si la sesión tiene más de este tiempo, advertimos al usuario antes de gastar tiempo en el navegador. SESSION_MAX_AGE_HOURS = 24 # Modo de persistencia del navegador: # - Por defecto: launch() + storage_state=generated/browser/session.json. El storage_state se # refresca al cerrar para capturar cookies emitidas por GHL durante la sesión. # - Si la variable de entorno GHL_BROWSER_PROFILE_DIR está definida, usamos un perfil de # Chrome persistente completo (cache, IndexedDB, cookies HttpOnly) en ese directorio. Más # parecido a un navegador real, pero impide correr scripts en paralelo (el perfil queda # bloqueado por un proceso a la vez). PERSISTENT_PROFILE_DIR = os.environ.get("GHL_BROWSER_PROFILE_DIR", "").strip() or None def _open_browser(playwright_p, headless=True): """Crea el browser/context según el modo (shared storage_state vs persistent profile). Devuelve (browser, context, page). En modo persistent, browser es None — el `context` representa toda la sesión y se cierra con `context.close()`. """ if PERSISTENT_PROFILE_DIR: os.makedirs(PERSISTENT_PROFILE_DIR, exist_ok=True) print(f"[INFO] Modo perfil persistente: {PERSISTENT_PROFILE_DIR}") context = playwright_p.chromium.launch_persistent_context( PERSISTENT_PROFILE_DIR, headless=headless, viewport={"width": 1280, "height": 800}, ) page = context.pages[0] if context.pages else context.new_page() return None, context, page browser = playwright_p.chromium.launch(headless=headless) context = browser.new_context(storage_state=SESSION_FILE) page = context.new_page() return browser, context, page def _close_and_save(browser, context): """Cierra el browser y guarda el storage_state actualizado en SESSION_FILE. Esto refresca las cookies (incluyendo cualquier token rotado por GHL durante la sesión), así próximas ejecuciones empiezan con cookies frescas y la sesión dura más. En modo persistente no se guarda SESSION_FILE — el perfil se persiste solo. """ # Verificar que el browser está vivo antes de llamar storage_state. Si el event loop de # sync_playwright ya empezó a cerrar (p.ej. salimos del `with` o nos interrumpieron), # storage_state devuelve una coroutine huérfana y dispara RuntimeWarning / RecursionError. is_alive = False if browser is not None: try: is_alive = browser.is_connected() except Exception: is_alive = False try: if is_alive and context is not None and browser is not None: try: context.storage_state(path=SESSION_FILE) except Exception: pass # silencioso — si falla aquí es porque ya está cerrando if browser is not None: try: browser.close() except Exception: pass elif context is not None: # Modo persistent: cerrar el context guarda el perfil al disco. try: context.close() except Exception: pass finally: # Limpiar las referencias en el estado de interrupción. if _INTERRUPT_STATE.get("browser") is browser: _INTERRUPT_STATE["browser"] = None if _INTERRUPT_STATE.get("context") is context: _INTERRUPT_STATE["context"] = None def _launch_browser(playwright_p, headless=True): """Lanza solo el browser (sin context). Sirve para topología paralela: 1 browser → N contexts. En modo persistente devuelve (None, context) ya que `launch_persistent_context` no separa browser/context — y en ese caso no es paralelizable (lock del perfil). """ if PERSISTENT_PROFILE_DIR: os.makedirs(PERSISTENT_PROFILE_DIR, exist_ok=True) print(f"[INFO] Modo perfil persistente: {PERSISTENT_PROFILE_DIR}") context = playwright_p.chromium.launch_persistent_context( PERSISTENT_PROFILE_DIR, headless=headless, viewport={"width": 1280, "height": 800}, ) return None, context browser = playwright_p.chromium.launch(headless=headless) return browser, None def _new_worker_context(browser): """Crea un nuevo BrowserContext+Page a partir del browser compartido, reusando SESSION_FILE.""" context = browser.new_context(storage_state=SESSION_FILE) page = context.new_page() return context, page # Estado por-worker para que el handler de interrupción pueda cerrar todos los contexts en paralelo. _WORKER_STATES = [] _WORKER_STATES_LOCK = threading.Lock() def _register_worker_state(state): with _WORKER_STATES_LOCK: _WORKER_STATES.append(state) def _unregister_worker_state(state): with _WORKER_STATES_LOCK: try: _WORKER_STATES.remove(state) except ValueError: pass def _run_parallel_bulk( items, action_fn, *, workers, run_id, action_label, per_action_pause, headless=True, resync_after=True, ): """Procesa `items` con N workers compartiendo un único browser. `action_fn(page, item, prefix, run_id)` debe devolver un dict con al menos: {"workflow_id", "location_id", "name", "status": "success"|"failed"|"skipped", "reason"?} Si lanza `SessionExpiredError`, el bulk aborta a todos los workers. """ from playwright.sync_api import sync_playwright # Defensa: el perfil persistente no admite múltiples contexts simultáneos sobre el mismo dir. effective_workers = 1 if PERSISTENT_PROFILE_DIR else max(1, min(int(workers or 1), 5)) if PERSISTENT_PROFILE_DIR and workers and int(workers) > 1: print("[INFO] Perfil persistente activo → forzando workers=1.", flush=True) # Agrupar por location_id para que cada worker procese sucursales contiguas (menos saltos). items_sorted = sorted(items, key=lambda it: (it.get("location_id") or "", it.get("workflow_id") or "")) work_q = queue.Queue() for it in items_sorted: work_q.put(it) total = work_q.qsize() results = [] results_lock = threading.Lock() cancel_event = threading.Event() counter = {"done": 0} counter_lock = threading.Lock() print(f"[METRICS] start action={action_label} workers={effective_workers} items={total} pause={per_action_pause}s", flush=True) t0 = time.monotonic() # Logger estructurado por-run (no-op si run_id es None). rlog = RunLogger(run_id, os.path.basename(__file__)) rlog.info("bulk_start", action=action_label, workers=effective_workers, items=total, per_action_pause=per_action_pause) def _worker(worker_idx): """Cada worker abre su propio sync_playwright + browser + context. Playwright sync API NO permite compartir un Browser entre threads (greenlet event loop por thread → 'cannot switch to a different thread'). """ pw = sync_playwright().start() owns_pw = True try: if PERSISTENT_PROFILE_DIR: os.makedirs(PERSISTENT_PROFILE_DIR, exist_ok=True) context = pw.chromium.launch_persistent_context( PERSISTENT_PROFILE_DIR, headless=headless, viewport={"width": 1280, "height": 800}, ) browser = None page = context.pages[0] if context.pages else context.new_page() else: browser = pw.chromium.launch(headless=headless) context = browser.new_context(storage_state=SESSION_FILE) page = context.new_page() except Exception as launch_err: print(f"[ERROR] worker {worker_idx} no pudo lanzar browser: {launch_err}", flush=True) try: pw.stop() except Exception: pass return state = {"context": context, "page": page, "worker_idx": worker_idx, "location_id": None, "workflow_id": None} _register_worker_state(state) try: while not cancel_event.is_set(): try: item = work_q.get_nowait() except queue.Empty: return wf_id = item.get("workflow_id") or item.get("id") loc_id = item.get("location_id") name = item.get("name") or wf_id or "?" with counter_lock: counter["done"] += 1 idx = counter["done"] short_loc = (loc_id or "")[:6] prefix = f"[BULK {idx}/{total} w{worker_idx} {short_loc}]" state["location_id"] = loc_id state["workflow_id"] = wf_id if not wf_id or not loc_id: print(f"{prefix} SKIP — falta workflow_id o location_id: {item}", flush=True) with results_lock: results.append({"workflow_id": wf_id, "location_id": loc_id, "name": name, "status": "skipped", "reason": "datos incompletos"}) print(f"{prefix} RESULT: skipped", flush=True) continue print(f"{prefix} === '{name}' ({wf_id}) en {loc_id} ===", flush=True) item_t0 = time.monotonic() rlog.info("item_start", worker_id=worker_idx, location_id=loc_id, workflow_id=wf_id, name=name) try: result = action_fn(page, item, prefix, run_id) if not isinstance(result, dict): result = {"workflow_id": wf_id, "location_id": loc_id, "name": name, "status": "success" if result else "failed"} result.setdefault("workflow_id", wf_id) result.setdefault("location_id", loc_id) result.setdefault("name", name) except SessionExpiredError as se: print(f"{prefix} SESIÓN EXPIRADA: {se}", flush=True) with results_lock: results.append({"workflow_id": wf_id, "location_id": loc_id, "name": name, "status": "skipped", "reason": "sesión expirada"}) rlog.error("session_expired", worker_id=worker_idx, location_id=loc_id, workflow_id=wf_id, message=str(se)) cancel_event.set() print(f"{prefix} RESULT: skipped", flush=True) break except Exception as ie: err_id = error_logging.log_error( "playwright_parallel_bulk_item_failed", ie, {"action": action_label, "location_id": loc_id, "workflow_id": wf_id, "run_id": run_id, "worker_id": worker_idx}, ) print(f"{prefix} EXCEPCIÓN: {ie} | error_id={err_id}", flush=True) result = {"workflow_id": wf_id, "location_id": loc_id, "name": name, "status": "failed", "error_id": err_id} rlog.error("item_failed", worker_id=worker_idx, location_id=loc_id, workflow_id=wf_id, error_id=err_id, message=str(ie)[:500]) with results_lock: results.append(result) print(f"{prefix} RESULT: {result['status']}", flush=True) rlog.info("item_done", worker_id=worker_idx, location_id=loc_id, workflow_id=wf_id, status=result.get("status"), change_id=result.get("change_id"), error_id=result.get("error_id"), duration_ms=int((time.monotonic() - item_t0) * 1000)) if resync_after and result.get("status") == "success": try: _resync_local_workflows(loc_id) except Exception: pass if per_action_pause and per_action_pause > 0: # Romper la espera si llega un cancel para reaccionar rápido. waited = 0.0 step = 0.25 while waited < per_action_pause and not cancel_event.is_set(): time.sleep(step) waited += step finally: _unregister_worker_state(state) # Persistir storage_state del PRIMER worker que termina limpio (best-effort). if not PERSISTENT_PROFILE_DIR and browser is not None: try: context.storage_state(path=SESSION_FILE) except Exception: pass try: context.close() except Exception: pass if browser is not None: try: browser.close() except Exception: pass if owns_pw: try: pw.stop() except Exception: pass try: if effective_workers == 1: _worker(1) else: with ThreadPoolExecutor(max_workers=effective_workers) as ex: futures = [ex.submit(_worker, i + 1) for i in range(effective_workers)] for f in futures: try: f.result() except Exception as fe: print(f"[ERROR] worker terminó con excepción: {fe}", flush=True) except Exception as e: err_id = error_logging.log_error( "playwright_parallel_bulk_failed", e, {"action": action_label, "items_count": total, "workers": effective_workers, "run_id": run_id}, ) print(f"ERROR fatal en bulk paralelo: {e} | error_id={err_id}", flush=True) # Rellenar items que quedaron en la queue tras un cancel (session expired). while True: try: it = work_q.get_nowait() except queue.Empty: break with results_lock: results.append({ "workflow_id": it.get("workflow_id") or it.get("id"), "location_id": it.get("location_id"), "name": it.get("name") or "?", "status": "skipped", "reason": "sesión expirada — bulk abortado", }) dt = time.monotonic() - t0 success = sum(1 for r in results if r["status"] == "success") skipped = sum(1 for r in results if r["status"] == "skipped") failed = sum(1 for r in results if r["status"] == "failed") rate = (total / dt) if dt > 0 else 0.0 print(f"[METRICS] end action={action_label} dt={dt:.1f}s rate={rate:.2f}/s success={success} skipped={skipped} failed={failed}", flush=True) rlog.info("bulk_end", action=action_label, duration_s=round(dt, 2), success=success, skipped=skipped, failed=failed, rate=round(rate, 3)) rlog.close() # Persistir métricas a logs (best-effort). try: from paths import LOGS_DIR os.makedirs(LOGS_DIR, exist_ok=True) metrics_path = os.path.join(LOGS_DIR, "parallel_runs.jsonl") with open(metrics_path, "a", encoding="utf-8") as mf: mf.write(_json_top.dumps({ "ts": datetime_now_str(), "run_id": run_id, "script": os.path.basename(sys.argv[0]) if sys.argv else "manager", "action": action_label, "workers": effective_workers, "items": total, "duration_s": round(dt, 2), "success": success, "skipped": skipped, "failed": failed, }, ensure_ascii=False) + "\n") except Exception: pass return results, failed, success, skipped def datetime_now_str(): from datetime import datetime as _dt return _dt.now().strftime("%Y-%m-%d %H:%M:%S") # Estado global para que los handlers de señales puedan cerrar el browser y reportar. _INTERRUPT_STATE = { "browser": None, "context": None, "playwright_ctx": None, "location_id": None, "workflow_id": None, "run_id": None, "target_active": None, "action": None, "shutting_down": False, } def _emergency_cleanup(signum=None, _frame=None): """Cierra el browser y reporta estado si el proceso es interrumpido (Ctrl+C, server restart, etc.).""" if _INTERRUPT_STATE["shutting_down"]: return br = _INTERRUPT_STATE.get("browser") # Detectar si el browser ya está cerrado (ruta normal). En ese caso no hay nada que limpiar. browser_alive = False if br is not None: try: browser_alive = br.is_connected() except Exception: browser_alive = False # Si venimos por atexit y el browser ya no vive y no hay run_id pendiente, salir silencioso. if signum is None and not browser_alive: return _INTERRUPT_STATE["shutting_down"] = True sig_name = "atexit" if signum is None else f"signal {signum}" print(f"\n[INTERRUPCIÓN] Proceso terminado por {sig_name}. Cerrando navegador...", flush=True) # Cerrar contexts de workers paralelos (si los hay) antes que el browser. try: with _WORKER_STATES_LOCK: worker_snapshots = list(_WORKER_STATES) for st in worker_snapshots: wctx = st.get("context") if wctx is None: continue try: wctx.close() except Exception: pass if worker_snapshots: print(f"[INTERRUPCIÓN] Cerrados {len(worker_snapshots)} context(s) de workers.", flush=True) except Exception: pass # Cerrar el browser si todavía vive. Primero refrescar storage_state si hay context. if browser_alive: ctx = _INTERRUPT_STATE.get("context") if ctx is not None and br is not None and not PERSISTENT_PROFILE_DIR: try: ctx.storage_state(path=SESSION_FILE) print(f"[INTERRUPCIÓN] Cookies refrescadas en {SESSION_FILE}.", flush=True) except Exception: pass try: br.close() print("[INTERRUPCIÓN] Navegador cerrado limpiamente.", flush=True) except Exception: pass # Intentar reportar el estado real consultando la API (sólo si era toggle-status — mutaciones más críticas). loc_id = _INTERRUPT_STATE.get("location_id") wf_id = _INTERRUPT_STATE.get("workflow_id") action = _INTERRUPT_STATE.get("action") target_active = _INTERRUPT_STATE.get("target_active") if loc_id and wf_id and action == "toggle-status": try: import sync_engine tokens = sync_engine.get_tokens_map() tok = tokens.get(loc_id) if tok: wfs = sync_engine.ghl_client.get_workflows(tok, loc_id) actual = next((w for w in wfs if w.get("id") == wf_id), None) if actual: st = (actual.get("status") or "").lower() actual_active = st in ("active", "published") if target_active is None: print(f"[INTERRUPCIÓN] Estado actual en API: '{st}'.", flush=True) elif actual_active == target_active: print(f"[INTERRUPCIÓN] El cambio SÍ se aplicó pese a la interrupción (API: '{st}').", flush=True) else: print(f"[INTERRUPCIÓN] El cambio NO se aplicó (API: '{st}'). Reintenta cuando estés listo.", flush=True) except Exception as ce: print(f"[INTERRUPCIÓN] No se pudo consultar API para reportar estado: {ce}", flush=True) # Marcar la auditoría como interrumpida. run_id = _INTERRUPT_STATE.get("run_id") if run_id: try: script_audit.update_run_status(run_id, "failed", f"Proceso interrumpido ({sig_name})") except Exception: pass # Si fue por señal, salir con código convencional (130 = SIGINT, 143 = SIGTERM). if signum is not None: sys.exit(130 if signum in (signal.SIGINT,) else 143) def _install_signal_handlers(): """Instala handlers para SIGINT, SIGTERM y (en Windows) SIGBREAK.""" try: signal.signal(signal.SIGINT, _emergency_cleanup) except Exception: pass try: signal.signal(signal.SIGTERM, _emergency_cleanup) except Exception: pass # En Windows, Ctrl+Break envía SIGBREAK. if hasattr(signal, "SIGBREAK"): try: signal.signal(signal.SIGBREAK, _emergency_cleanup) except Exception: pass atexit.register(_emergency_cleanup) def ensure_playwright_browsers(): """Verifica que los binarios de Chromium de Playwright existan; si no, los instala. Devuelve True si están listos para usar, False si no se pudo dejarlos disponibles. """ try: from playwright.sync_api import sync_playwright except Exception as imp_err: print(f"ERROR: El paquete 'playwright' no está instalado: {imp_err}") print("Solución: python -m pip install playwright") return False try: with sync_playwright() as p: exe_path = p.chromium.executable_path if exe_path and os.path.exists(exe_path): return True except Exception: # Si executable_path lanza, asumimos que faltan binarios y dejamos que la instalación los repare. pass print("[INFO] Binarios de Chromium no encontrados. Instalando con 'playwright install chromium'...") try: result = subprocess.run( [sys.executable, "-m", "playwright", "install", "chromium"], capture_output=True, text=True, timeout=600, ) if result.returncode != 0: print(f"ERROR: 'playwright install chromium' falló (código {result.returncode}).") if result.stderr: print(result.stderr.strip()) return False print("[INFO] Binarios de Chromium instalados correctamente.") return True except Exception as inst_err: print(f"ERROR ejecutando 'playwright install chromium': {inst_err}") return False def session_file_status(): """Devuelve (existe, edad_en_horas o None) del archivo de sesión.""" if not os.path.exists(SESSION_FILE): return False, None age_hours = (time.time() - os.path.getmtime(SESSION_FILE)) / 3600.0 return True, age_hours SCREENSHOT_DIR = SCREENSHOTS_DIR # alias retrocompatible def _save_debug_screenshot(page, label: str) -> str: """Guarda una captura de pantalla de diagnóstico y devuelve la ruta.""" os.makedirs(SCREENSHOT_DIR, exist_ok=True) ts = time.strftime("%Y%m%d_%H%M%S") path = os.path.join(SCREENSHOT_DIR, f"{label}_{ts}.png") try: page.screenshot(path=path, full_page=True) print(f"[DEBUG] Captura guardada: {path}") except Exception: pass return path _SWITCH_SEL = ".n-switch, .n-switch__rail" _MODAL_SEL = "button:has-text('Entendido'), button:has-text('entendido'), button:has-text('Got it'), button:has-text('Deshabilitar')" _MODAL_CLOSE_SEL = "button:has-text('Entendido'), button:has-text('entendido'), button:has-text('Got it')" _SAVE_SEL = "button:has-text('Save'), button:has-text('Guardar'), button:has-text('save'), button:has-text('guardar')" def _find_builder_frame(page, timeout_sec=35): """ Escanea todos los frames (principal + iframes) buscando el switch de estado o el modal del AI Builder. El workflow builder de GHL carga dentro de un iframe, por lo que los selectores no funcionan sobre el frame principal. Devuelve (frame, "switch"|"modal"|"timeout"|"login_redirect"). Detecta login redirect tanto en page como en iframes (sesión expirada). """ for attempt in range(timeout_sec): if _any_frame_at_login(page): return None, "login_redirect" for frame in page.frames: try: if frame.locator(_SWITCH_SEL).count() > 0: return frame, "switch" if frame.locator(_MODAL_SEL).count() > 0: return frame, "modal" except Exception: continue if attempt % 5 == 4: frame_urls = [f.url for f in page.frames] try: current_url = page.url except Exception: current_url = "(unknown)" print(f"[DEBUG] URL: {current_url} | Título: {page.title()} | Frames: {len(page.frames)} | Intento {attempt+1}/{timeout_sec}") print(f"[DEBUG] URLs de frames: {frame_urls}") time.sleep(1) return None, "timeout" def _verify_status_via_api(location_id, workflow_id, target_active, max_attempts=6, base_wait_sec=3): """Verifica contra la API de GHL que el status del workflow coincida con el deseado. Hace `max_attempts` pasadas con backoff lineal (base_wait_sec, 2*base, 3*base, ...). GHL puede tardar 7-20 s en propagar un cambio del builder a su API, así que reintentos son necesarios. Devuelve True si en algún intento el estado coincide, False si caduca. """ try: import sync_engine except Exception as imp_err: print(f"[ADVERTENCIA] No se pudo importar sync_engine: {imp_err}") return False tokens = sync_engine.get_tokens_map() token = tokens.get(location_id) if not token: print("[ADVERTENCIA] No hay token en el CSV — se omite validación API.") return False for attempt in range(1, max_attempts + 1): wait_sec = base_wait_sec * attempt print(f"[INFO] Validación API intento {attempt}/{max_attempts} (espera {wait_sec}s antes de consultar)...") time.sleep(wait_sec) try: wfs = sync_engine.ghl_client.get_workflows(token, location_id) except Exception as ge: print(f"[DEBUG] Error consultando API: {ge}") continue actual = next((w for w in wfs if w.get("id") == workflow_id), None) if actual is None: print(f"[DEBUG] El workflow {workflow_id} aún no aparece en la API.") continue actual_status = (actual.get("status") or "").lower() actual_is_active = actual_status in ("active", "published") print(f"[INFO] Estado en API: '{actual_status}' (active={actual_is_active}) — esperado: active={target_active}") if actual_is_active == target_active: print(f"[ÉXITO] Cambio confirmado en API tras {attempt} intento(s).") return True print(f"[ADVERTENCIA] La API no reflejó el cambio tras {max_attempts} intentos.") return False def _perform_toggle_on_page(page, location_id, workflow_id, current_status): """Hace el toggle Publicar/Borrador en una `page` ya creada (no abre/cierra browser). Devuelve True si el cambio quedó confirmado en API, False si algo falló. Esta función se invoca tanto desde toggle_via_playwright_dom (caso individual) como desde bulk_draft_via_playwright_dom (caso lote, reusa un solo browser para N workflows). """ target_active = current_status not in ("active", "published") target_url = f"https://crm.bucefalocrm.io/location/{location_id}/workflow/{workflow_id}" print(f"[INFO] Navegando directamente a: {target_url}") page.goto(target_url, wait_until="domcontentloaded", timeout=30000) print("[INFO] Buscando frame del workflow builder (puede estar en un iframe)...") builder_frame, found_element = _find_builder_frame(page) if found_element == "login_redirect": print(f"[ERROR] Redirección a login: {page.url}") _save_debug_screenshot(page, "login_redirect") raise SessionExpiredError("Bucéfalo redirigió al login durante toggle") if found_element == "timeout": print(f"[ERROR] Tiempo agotado. URL: {page.url} | Título: {page.title()}") _save_debug_screenshot(page, "timeout_no_switch") return False print(f"[INFO] Builder encontrado en frame: {builder_frame.url[:80]} | Elemento: {found_element}") if found_element == "modal": print("[INFO] Cerrando modal 'AI Builder habilitado' con botón 'Entendido'...") try: builder_frame.locator(_MODAL_CLOSE_SEL).first.click() print("[INFO] Modal cerrado. Esperando switch...") for _ in range(12): if builder_frame.locator(_SWITCH_SEL).count() > 0: break time.sleep(1) else: print("[ERROR] Switch no apareció tras cerrar el modal.") _save_debug_screenshot(page, "no_switch_after_modal") return False except Exception as me: print(f"[ADVERTENCIA] No se pudo cerrar el modal: {me}") print("[INFO] Localizando interruptor de estado (Naive UI)...") all_sw = builder_frame.locator(".n-switch").all() print(f"[DEBUG] Total de switches en el frame: {len(all_sw)}") publish_switch = None for candidate in builder_frame.get_by_role("switch").all(): try: ancestor_text = candidate.locator("xpath=ancestor::*[4]").first.inner_text(timeout=1000) if any(kw in ancestor_text.lower() for kw in ("publicar", "borrador", "publish", "draft")): publish_switch = candidate break except Exception: continue if publish_switch is None: role_switches = builder_frame.get_by_role("switch").all() publish_switch = role_switches[-1] if role_switches else builder_frame.locator(".n-switch").first switch = publish_switch print("[INFO] Esperando a que el iframe del builder termine de cargar...") try: builder_frame.wait_for_load_state("networkidle", timeout=25000) except Exception: pass time.sleep(3) cls = switch.get_attribute("class") or "" aria_checked = switch.get_attribute("aria-checked") is_active = (aria_checked == "true") or ("n-switch--active" in cls) print(f"[INFO] Interruptor (estabilizado): {'Publicado' if is_active else 'Borrador'} (aria-checked={aria_checked})") print(f"[INFO] Estado deseado: {'Publicado' if target_active else 'Borrador'}") if is_active == target_active: print("[INFO] El workflow ya está en el estado deseado.") return True def _close_all_modals(): closed_any = False for frame_candidate in [builder_frame, page.main_frame]: try: if frame_candidate.locator(_MODAL_SEL).count() > 0: frame_candidate.locator(_MODAL_CLOSE_SEL).first.click() closed_any = True time.sleep(0.7) except Exception: continue return closed_any for _ in range(3): if not _close_all_modals(): break now_active = is_active for attempt in range(1, 4): print(f"[INFO] Click en interruptor (intento {attempt}/3)...") try: switch.click() except Exception as ce: print(f"[DEBUG] Excepción al clickear: {ce}") time.sleep(2) _close_all_modals() time.sleep(0.5) new_checked = switch.get_attribute("aria-checked") new_cls = switch.get_attribute("class") or "" now_active = (new_checked == "true") or ("n-switch--active" in new_cls) print(f"[INFO] Estado del switch tras intento {attempt}: {'Publicado' if now_active else 'Borrador'}") if now_active == target_active: break if now_active != target_active: print("[ERROR] El switch no cambió al estado deseado después de 3 intentos.") _save_debug_screenshot(page, "switch_no_change") return False print("[INFO] Localizando botón Guardar/Save...") _SAVE_ID_SEL = "#cmp-header__btn--save-workflow" save_button = None for loc in [ builder_frame.locator(_SAVE_ID_SEL), builder_frame.locator(_SAVE_SEL), page.locator(_SAVE_ID_SEL), page.locator(_SAVE_SEL), ]: if loc.count() > 0: save_button = loc.first break if save_button is None: print("[ERROR] No se encontró el botón de Guardar.") _save_debug_screenshot(page, "no_save_button") return False btn_text = "" try: btn_text = save_button.inner_text(timeout=2000).strip().lower() except Exception: pass print(f"[DEBUG] Texto del botón antes de guardar: '{btn_text}'") if any(saved_kw in btn_text for saved_kw in ("guardado", "saved")): print("[ÉXITO] El cambio se guardó automáticamente (botón muestra 'Guardado').") # Aun así validamos contra API por consistencia. else: print("[INFO] Haciendo click en 'Guardar'...") save_button.click() print("[INFO] Esperando confirmación de guardado (hasta 15 s)...") saved_confirmed = False for _ in range(15): try: txt_now = save_button.inner_text(timeout=1000).strip().lower() if any(kw in txt_now for kw in ("guardado", "saved")): saved_confirmed = True break except Exception: pass time.sleep(1) if saved_confirmed: print("[ÉXITO] Workflow guardado: el botón ahora muestra 'Guardado'/'Saved'.") else: print("[ADVERTENCIA] No se vio el texto 'Guardado' tras 15 s — validaremos contra API.") _save_debug_screenshot(page, "save_not_confirmed") time.sleep(3) api_confirmed = _verify_status_via_api(location_id, workflow_id, target_active, max_attempts=6, base_wait_sec=3) if not api_confirmed: print("[INFO] Intentando recargar la página y reverificar (workaround bug visual)...") try: page.reload(wait_until="domcontentloaded", timeout=30000) time.sleep(5) api_confirmed = _verify_status_via_api(location_id, workflow_id, target_active, max_attempts=4, base_wait_sec=4) except Exception as re: print(f"[DEBUG] No se pudo recargar: {re}") if not api_confirmed: print("[ERROR] El cambio no se reflejó en la API de GHL tras múltiples reintentos.") _save_debug_screenshot(page, "api_not_confirmed") return False return True def toggle_via_playwright_dom(location_id, workflow_id, current_status, run_id=None): """Caso individual: abre browser, hace toggle, valida contra API y cierra.""" from playwright.sync_api import sync_playwright print(f"[INFO] Iniciando automatización DOM para {workflow_id}...") target_active = current_status not in ("active", "published") _INTERRUPT_STATE.update({ "location_id": location_id, "workflow_id": workflow_id, "run_id": run_id, "target_active": target_active, "action": "toggle-status", }) try: with sync_playwright() as p: print("[INFO] Lanzando navegador en segundo plano (headless)...") browser, context, page = _open_browser(p) _INTERRUPT_STATE["browser"] = browser _INTERRUPT_STATE["context"] = context try: ok = _perform_toggle_on_page(page, location_id, workflow_id, current_status) finally: _close_and_save(browser, context) return ok except Exception as e: error_id = error_logging.log_error("playwright_dom_toggle_failed", e, {"location_id": location_id, "workflow_id": workflow_id, "run_id": run_id, "action": "toggle-status"}) print(f"ERROR en la automatización DOM de Playwright: {str(e)} | error_id={error_id}") return False def _perform_toggle_item(page, item, prefix, run_id, target_active): """Procesa un solo workflow del bulk de toggle. Devuelve dict-resultado. Registra el cambio en script_audit (planned→applied/failed). Propaga SessionExpiredError marcando el change como failed antes (para abortar el bulk). """ wf_id = item.get("workflow_id") or item.get("id") loc_id = item.get("location_id") curr = (item.get("current_status") or item.get("status") or "").lower() name = item.get("name") or wf_id or "?" currently_active = curr in ("active", "published") target_value = "published" if target_active else "draft" if currently_active == target_active: already_label = "publicado" if target_active else "borrador" print(f"{prefix} SKIP — ya está en {already_label} (status='{curr}').", flush=True) return {"workflow_id": wf_id, "location_id": loc_id, "name": name, "status": "skipped", "reason": f"ya en {already_label}"} change_id = None if run_id: try: change_id = script_audit.record_change( run_id, loc_id, "workflow", wf_id, field_id="status", field_name="status", old_value=curr or None, new_value=target_value, ) except Exception as ae: print(f"{prefix} [WARN] No se pudo registrar change planned: {ae}", flush=True) try: ok = _perform_toggle_on_page(page, loc_id, wf_id, curr) except SessionExpiredError: if change_id: try: script_audit.mark_change(change_id, "failed", "session_expired") except Exception: pass raise except Exception as ie: if change_id: try: script_audit.mark_change(change_id, "failed", str(ie)[:500]) except Exception: pass raise if change_id: try: if ok: script_audit.mark_change(change_id, "applied") else: script_audit.mark_change(change_id, "failed", "mutacion_no_confirmada_por_api") except Exception: pass return {"workflow_id": wf_id, "location_id": loc_id, "name": name, "status": "success" if ok else "failed", "change_id": change_id} def bulk_toggle_via_playwright_dom(items, target_active, run_id=None, workers=1, per_action_pause=3.0): """Pone una lista de workflows en Borrador (target_active=False) o Publicado (target_active=True). Reusa un único browser entre items. Con workers>1 se reparte sobre N contexts paralelos. `items`: lista de dicts {workflow_id, location_id, current_status, name}. Devuelve True si todos los que necesitaban cambio se procesaron OK; False si hubo fallos. """ target_label = "Publicado" if target_active else "Borrador" action_label = "BULK-PUBLISH" if target_active else "BULK-DRAFT" print(f"\n[{action_label}] Iniciando procesamiento de {len(items)} workflows (target: {target_label}) con {workers} worker(s)...") _INTERRUPT_STATE.update({ "action": "bulk-publish" if target_active else "bulk-draft", "run_id": run_id, "target_active": target_active, }) def _action(page, item, prefix, rid): return _perform_toggle_item(page, item, prefix, rid, target_active) results, failed, success, skipped = _run_parallel_bulk( items, _action, workers=workers, run_id=run_id, action_label=action_label.lower(), per_action_pause=per_action_pause, resync_after=True, ) print(f"\n=== RESUMEN {action_label} ===") print(f"Total: {len(items)} | Éxitos: {success} | Ya en {target_label.lower()}: {skipped} | Fallos: {failed}") for r in results: symbol = {"success": "OK", "skipped": "--", "failed": "XX"}.get(r["status"], "??") suffix = f" ({r.get('reason')})" if r.get("reason") else "" wf_short = (r.get("workflow_id") or "")[:8] loc_short = (r.get("location_id") or "")[:12] print(f" [{symbol}] {r.get('name')} ({wf_short}) en {loc_short}{suffix}") return failed == 0 # Compat: mantener alias por si alguien lo invoca por el nombre viejo. def bulk_draft_via_playwright_dom(items, run_id=None): return bulk_toggle_via_playwright_dom(items, target_active=False, run_id=run_id) # Frases del menú de 3 puntos en español e inglés (Bucéfalo soporta ambos). _DELETE_MENU_TEXTS = ["Eliminar flujo de trabajo", "Delete workflow", "Delete Workflow"] _RENAME_MENU_TEXTS = ["Renombrar flujo de trabajo", "Rename workflow", "Rename Workflow"] # Texto exacto que el modal pide escribir para confirmar borrado. _DELETE_CONFIRM_TOKENS = ["Borrar", "Delete"] _DISMISS_BUTTON_SELECTOR = ( 'button:has-text("Entendido"), button:has-text("entendido"), ' 'button:has-text("Got it"), button:has-text("Got It"), ' 'button:has-text("OK"), button:has-text("Aceptar")' ) _LIST_ROW_SELECTORS = [ "tbody tr", ".n-data-table-tr", "[class*='workflow-row']", "[class*='workflows-list'] tr", "table tr:not(:has(th))", ] def _dismiss_blocking_modals(scope): """Cierra modales de bienvenida tipo 'AI Builder habilitado'. `scope` = page o frame.""" try: btn = scope.locator(_DISMISS_BUTTON_SELECTOR).first if btn.count() > 0: try: if btn.is_visible(timeout=500): print("[INFO] Cerrando modal bloqueante...") btn.click() time.sleep(1) return True except Exception: pass except Exception: pass return False def _scope_has_table(scope): """True si el scope (page o frame) ya muestra la tabla de workflows.""" for sel in _LIST_ROW_SELECTORS: try: if scope.locator(sel).count() > 0: return sel except Exception: continue return None class SessionExpiredError(Exception): """Bucéfalo redirigió al login → toda interacción posterior va a fallar.""" pass def _any_frame_at_login(page): """True si la página principal o cualquier iframe está mostrando la pantalla de login. Importante: Bucéfalo es un SPA, así que cuando una request da 401 el cliente NO redirige a /login sino que renderiza el componente de login DENTRO del mismo iframe sin cambiar la URL. Por eso revisamos también el DOM (no solo la URL). """ # Check 1: URL del frame (cubre redirects HTTP reales tipo /login). try: if "login" in page.url.lower() or "/auth" in page.url.lower(): return True except Exception: pass for frame in page.frames: try: url = (frame.url or "").lower() if "login" in url or "/auth" in url: return True except Exception: continue # Check 2: DOM del frame (cubre el caso SPA donde el iframe muestra login pero la URL # sigue siendo /automation/workflows). Buscar input de password + texto de login. login_dom_signals = ( 'input[type="password"]', 'text=/Inicia.*sesi[oó]n.*cuenta/i', 'text=/Sign in to your account/i', ) for scope in [page] + list(page.frames): for sig in login_dom_signals: try: if scope.locator(sig).count() > 0: # Verificar que sea VISIBLE (no esté oculto en algún div fuera de pantalla). el = scope.locator(sig).first try: if el.is_visible(timeout=300): return True except Exception: # Si is_visible falla, asumir que está visible si hay match. return True except Exception: continue return False def _find_workflows_frame(page, timeout_sec=40): """Escanea todos los frames buscando aquel que contenga la tabla de workflows. Devuelve el scope (frame o page) donde operar, o None si caduca. Lanza SessionExpiredError si detecta que algún frame se fue al login.""" deadline = time.time() + timeout_sec while time.time() < deadline: if _any_frame_at_login(page): urls = [f.url for f in page.frames] print(f"[ERROR] Redirección a login detectada. URL principal: {page.url} | Frames: {urls}") raise SessionExpiredError("Bucéfalo redirigió al login") # Probar primero la página principal y luego cada iframe. scopes = [page] + list(page.frames) for scope in scopes: # Cerrar modal si existe en este scope antes de medir filas. _dismiss_blocking_modals(scope) sel = _scope_has_table(scope) if sel: label = "main" if scope is page else f"frame {getattr(scope, 'url', '')[:80]}" print(f"[DEBUG] Tabla en {label} (selector: {sel}, filas: {scope.locator(sel).count()}).") time.sleep(3) # binding de listeners # Cerrar un modal más si reapareció tras render. _dismiss_blocking_modals(scope) return scope time.sleep(1) return None def _open_workflows_list(page, location_id, timeout_ms=45000): """Navega al listado de workflows. Devuelve el scope (page o frame) o None. Puede lanzar SessionExpiredError si Bucéfalo redirige al login.""" url = ( f"https://crm.bucefalocrm.io/v2/location/{location_id}" "/automation/workflows?listTab=all&tab=recent&filter=all" ) print(f"[INFO] Navegando al listado: {url}") page.goto(url, wait_until="domcontentloaded", timeout=timeout_ms) # Espera breve para que el iframe inicie su redirección si la sesión está muerta. time.sleep(2) if _any_frame_at_login(page): print(f"[ERROR] Sesión expirada: algún frame está en login. URL principal: {page.url}") raise SessionExpiredError("Bucéfalo redirigió al login") print("[INFO] Buscando frame de la tabla de workflows (puede estar en iframe)...") scope = _find_workflows_frame(page) if scope is None: print("[ERROR] La tabla de workflows no cargó dentro del tiempo esperado.") _save_debug_screenshot(page, "list_load_timeout") return None return scope def _resolve_workflow_name(location_id, workflow_id): """Busca en SQLite el nombre del workflow para localizarlo por texto en la UI.""" try: rows = db.get_workflows(location_id) for w in rows: if w.get("id") == workflow_id: return w.get("name") except Exception as e: print(f"[DEBUG] No se pudo resolver nombre del workflow desde SQLite: {e}") return None def _find_workflow_row(scope, workflow_id, workflow_name=None): """Devuelve el locator de la fila que contiene el workflow_id, o None. `scope` puede ser una Page o un Frame. Si el ID no aparece en el DOM, intenta por nombre.""" candidates = [ f'tr:has(a[href*="{workflow_id}"])', f'tr:has([href*="{workflow_id}"])', f'[data-id="{workflow_id}"]', f'tr:has-text("{workflow_id}")', ] for sel in candidates: try: loc = scope.locator(sel).first if loc.count() > 0: print(f"[DEBUG] Fila localizada con selector: {sel}") return loc except Exception: continue if workflow_name: try: loc = scope.locator(f'tr:has-text("{workflow_name}")').first if loc.count() > 0: print(f"[DEBUG] Fila localizada por nombre: '{workflow_name}'") return loc except Exception: pass # Diagnóstico: imprimir info de las filas presentes para iterar selectores. try: rows = scope.locator("tbody tr").all() print(f"[DEBUG] Diagnóstico: {len(rows)} filas en la tabla. Ni ID ni nombre matchearon.") for idx, r in enumerate(rows[:5]): try: txt = r.inner_text(timeout=500).replace("\n", " | ")[:120] hrefs = [] for a in r.locator("a").all()[:3]: try: href = a.get_attribute("href") if href: hrefs.append(href[:80]) except Exception: pass print(f"[DEBUG] row[{idx}] text='{txt}' hrefs={hrefs}") except Exception: continue except Exception: pass return None def _open_row_menu(scope, row, workflow_id): """Hace click en el botón de 3 puntos (•••) de la fila.""" # GHL/Naive usa botones con icono de ellipsis o role=button al final de la fila. menu_selectors = [ 'button[aria-label*="More"]', 'button[aria-label*="more"]', 'button:has(.fa-ellipsis-vertical)', 'button:has(.fa-ellipsis)', 'button:has(i[class*="ellipsis"])', '[class*="more-options"]', '[class*="three-dots"]', 'button[class*="ellipsis"]', ] for sel in menu_selectors: try: btn = row.locator(sel) if btn.count() > 0: print(f"[DEBUG] Botón menú con selector: {sel}") btn.first.click() return True except Exception: continue # Fallback: el último botón de la fila suele ser el menú. try: buttons = row.locator("button").all() if buttons: print(f"[DEBUG] Fallback: click en el último botón de la fila ({len(buttons)} botones).") buttons[-1].click() return True except Exception: pass print(f"[ERROR] No se encontró el botón ••• en la fila del workflow {workflow_id}.") return False def _click_menu_item(scope, texts, timeout_sec=10): """Click en el primer item del menú flotante que coincida con cualquiera de los textos. `scope` puede ser Page o Frame.""" deadline = time.time() + timeout_sec while time.time() < deadline: for txt in texts: try: item = scope.get_by_text(txt, exact=False).first if item.count() > 0: item.click() print(f"[INFO] Click en item: '{txt}'.") return True except Exception: continue time.sleep(0.5) print(f"[ERROR] No apareció ningún item del menú entre: {texts}") return False def _verify_deletion_via_api(location_id, workflow_id, max_attempts=4, base_wait_sec=3): """Confirma que el workflow ya no existe en la API de GHL. Devuelve True si fue eliminado.""" try: import sync_engine except Exception: return False tokens = sync_engine.get_tokens_map() token = tokens.get(location_id) if not token: print("[ADVERTENCIA] No hay token en el CSV — se omite validación API.") return False for attempt in range(1, max_attempts + 1): wait_sec = base_wait_sec * attempt print(f"[INFO] Validación API intento {attempt}/{max_attempts} (espera {wait_sec}s)...") time.sleep(wait_sec) try: wfs = sync_engine.ghl_client.get_workflows(token, location_id) except Exception as ge: print(f"[DEBUG] Error consultando API: {ge}") continue exists = any(w.get("id") == workflow_id for w in wfs) if not exists: print(f"[ÉXITO] Cambio confirmado en API tras {attempt} intento(s).") return True print(f"[DEBUG] El workflow aún aparece en API (intento {attempt}).") print(f"[ADVERTENCIA] La API sigue mostrando el workflow tras {max_attempts} intentos.") return False def _perform_delete_on_page(page, location_id, workflow_id, workflow_name=None): """Borra un workflow en una `page` ya creada (no abre/cierra browser). Devuelve True si OK.""" scope = _open_workflows_list(page, location_id) if scope is None: _save_debug_screenshot(page, "delete_list_failed") return False row = _find_workflow_row(scope, workflow_id, workflow_name) if row is None: print(f"[ERROR] No se encontró la fila del workflow {workflow_id} en la tabla.") _save_debug_screenshot(page, "delete_no_row") return False try: row.hover() except Exception: pass if not _open_row_menu(scope, row, workflow_id): _save_debug_screenshot(page, "delete_no_menu_btn") return False print("[INFO] Esperando menú flotante con la opción 'Eliminar flujo de trabajo'...") if not _click_menu_item(scope, _DELETE_MENU_TEXTS, timeout_sec=12): _save_debug_screenshot(page, "delete_no_menu_item") return False print("[INFO] Esperando modal de confirmación...") time.sleep(1.5) confirm_input = None confirm_token = None for tok in _DELETE_CONFIRM_TOKENS: try: candidate = scope.locator(f'input[placeholder="{tok}"]') if candidate.count() > 0: confirm_input = candidate.first confirm_token = tok break except Exception: continue if confirm_input is None: try: modal_inputs = scope.locator('div[role="dialog"] input, .modal input, .n-modal input').all() if modal_inputs: confirm_input = modal_inputs[0] confirm_token = _DELETE_CONFIRM_TOKENS[0] except Exception: pass if confirm_input is None: print("[ERROR] No se encontró el input de confirmación del modal.") _save_debug_screenshot(page, "delete_no_confirm_input") return False print(f"[INFO] Escribiendo '{confirm_token}' en el input de confirmación...") confirm_input.fill(confirm_token) time.sleep(0.5) print("[INFO] Click en el botón final de confirmación...") confirm_clicked = False for tok in _DELETE_CONFIRM_TOKENS: try: btn = scope.locator(f'div[role="dialog"] button:has-text("{tok}"), .n-modal button:has-text("{tok}")').last if btn.count() > 0: btn.click() confirm_clicked = True break except Exception: continue if not confirm_clicked: try: dlg_buttons = scope.locator('div[role="dialog"] button, .n-modal button').all() if dlg_buttons: dlg_buttons[-1].click() confirm_clicked = True except Exception: pass if not confirm_clicked: print("[ERROR] No se pudo hacer click en el botón final de borrado.") _save_debug_screenshot(page, "delete_no_confirm_btn") return False print("[INFO] Borrado disparado. Esperando 5 s a que GHL procese...") time.sleep(5) try: still_there = _find_workflow_row(scope, workflow_id) is not None except Exception: still_there = False if not still_there: print("[INFO] La fila ya no aparece en la tabla — validando contra API...") else: print("[INFO] La fila aún aparece en la tabla; validando contra API por si GHL aún procesa...") # Validación API: el workflow no debe aparecer en GET /workflows/. return _verify_deletion_via_api(location_id, workflow_id) def delete_via_playwright_dom(location_id, workflow_id, run_id=None): """Caso individual: abre browser, borra workflow, cierra. Valida contra API.""" from playwright.sync_api import sync_playwright print(f"[INFO] Iniciando borrado DOM para workflow {workflow_id} en {location_id}...") workflow_name = _resolve_workflow_name(location_id, workflow_id) if workflow_name: print(f"[INFO] Nombre del workflow (SQLite): '{workflow_name}'") _INTERRUPT_STATE.update({ "location_id": location_id, "workflow_id": workflow_id, "run_id": run_id, "target_active": None, "action": "delete", }) try: with sync_playwright() as p: browser, context, page = _open_browser(p) _INTERRUPT_STATE["browser"] = browser _INTERRUPT_STATE["context"] = context try: ok = _perform_delete_on_page(page, location_id, workflow_id, workflow_name) finally: _close_and_save(browser, context) return ok except Exception as e: error_id = error_logging.log_error("playwright_dom_delete_failed", e, {"location_id": location_id, "workflow_id": workflow_id, "run_id": run_id, "action": "delete"}) print(f"ERROR en delete DOM: {str(e)} | error_id={error_id}") return False def _perform_delete_item(page, item, prefix, run_id): """Procesa un solo workflow del bulk de delete. Devuelve dict-resultado. Registra el cambio en script_audit (planned→applied/failed). Propaga SessionExpiredError marcando el change como failed antes. """ wf_id = item.get("workflow_id") or item.get("id") loc_id = item.get("location_id") name = item.get("name") or _resolve_workflow_name(loc_id, wf_id) or wf_id or "?" change_id = None if run_id: try: change_id = script_audit.record_change( run_id, loc_id, "workflow", wf_id, field_id="_workflow", field_name="deleted", old_value=name, new_value=None, ) except Exception as ae: print(f"{prefix} [WARN] No se pudo registrar change planned: {ae}", flush=True) try: ok = _perform_delete_on_page(page, loc_id, wf_id, name) except SessionExpiredError: if change_id: try: script_audit.mark_change(change_id, "failed", "session_expired") except Exception: pass raise except Exception as ie: if change_id: try: script_audit.mark_change(change_id, "failed", str(ie)[:500]) except Exception: pass raise if change_id: try: if ok: script_audit.mark_change(change_id, "applied") else: script_audit.mark_change(change_id, "failed", "delete_no_confirmado_por_api") except Exception: pass return {"workflow_id": wf_id, "location_id": loc_id, "name": name, "status": "success" if ok else "failed", "change_id": change_id} def bulk_delete_via_playwright_dom(items, run_id=None, workers=1, per_action_pause=3.0): """Elimina una lista de workflows reusando un único browser entre items. Con workers>1, reparte sobre N contexts paralelos compartiendo el browser. """ print(f"\n[BULK-DELETE] Iniciando procesamiento de {len(items)} workflows con {workers} worker(s)...") _INTERRUPT_STATE.update({ "action": "bulk-delete", "run_id": run_id, "target_active": None, }) results, failed, success, skipped = _run_parallel_bulk( items, _perform_delete_item, workers=workers, run_id=run_id, action_label="bulk-delete", per_action_pause=per_action_pause, resync_after=True, ) print(f"\n=== RESUMEN BULK-DELETE ===") print(f"Total: {len(items)} | Éxitos: {success} | Omitidos: {skipped} | Fallos: {failed}") for r in results: symbol = {"success": "OK", "skipped": "--", "failed": "XX"}.get(r["status"], "??") suffix = f" ({r.get('reason')})" if r.get("reason") else "" wf_short = (r.get("workflow_id") or "")[:8] loc_short = (r.get("location_id") or "")[:12] print(f" [{symbol}] {r.get('name')} ({wf_short}) en {loc_short}{suffix}") return failed == 0 def rename_via_playwright_dom(location_id, workflow_id, new_name, run_id=None): """Wrapper que envuelve la lógica real con auditoría granular en script_audit.""" change_id = None if run_id: try: old_name = _resolve_workflow_name(location_id, workflow_id) or workflow_id change_id = script_audit.record_change( run_id, location_id, "workflow", workflow_id, field_id="name", field_name="name", old_value=old_name, new_value=new_name, ) except Exception as ae: print(f"[WARN] No se pudo registrar change planned para rename: {ae}") try: ok = _rename_via_playwright_dom_impl(location_id, workflow_id, new_name, run_id) except Exception: if change_id: try: script_audit.mark_change(change_id, "failed", "excepcion_no_manejada") except Exception: pass raise if change_id: try: if ok: script_audit.mark_change(change_id, "applied") else: script_audit.mark_change(change_id, "failed", "rename_no_completado") except Exception: pass return ok def _rename_via_playwright_dom_impl(location_id, workflow_id, new_name, run_id=None): """Renombra un workflow usando automatización DOM en la UI de Bucéfalo.""" from playwright.sync_api import sync_playwright print(f"[INFO] Iniciando rename DOM para workflow {workflow_id} → '{new_name}'...") workflow_name = _resolve_workflow_name(location_id, workflow_id) if workflow_name: print(f"[INFO] Nombre actual (SQLite): '{workflow_name}'") _INTERRUPT_STATE.update({ "location_id": location_id, "workflow_id": workflow_id, "run_id": run_id, "target_active": None, "action": "rename", }) try: with sync_playwright() as p: browser, context, page = _open_browser(p) _INTERRUPT_STATE["browser"] = browser _INTERRUPT_STATE["context"] = context scope = _open_workflows_list(page, location_id) if scope is None: _save_debug_screenshot(page, "rename_list_failed") _close_and_save(browser, context) return False row = _find_workflow_row(scope, workflow_id, workflow_name) if row is None: print(f"[ERROR] No se encontró la fila del workflow {workflow_id}.") _save_debug_screenshot(page, "rename_no_row") _close_and_save(browser, context) return False try: row.hover() except Exception: pass if not _open_row_menu(scope, row, workflow_id): _save_debug_screenshot(page, "rename_no_menu_btn") _close_and_save(browser, context) return False print("[INFO] Buscando opción 'Renombrar flujo de trabajo' en el menú...") if not _click_menu_item(scope, _RENAME_MENU_TEXTS, timeout_sec=12): _save_debug_screenshot(page, "rename_no_menu_item") _close_and_save(browser, context) return False print("[INFO] Esperando modal de rename...") time.sleep(1.5) rename_input = None try: modal_inputs = scope.locator('div[role="dialog"] input, .modal input, .n-modal input').all() if modal_inputs: rename_input = modal_inputs[0] except Exception: pass if rename_input is None: print("[ERROR] No se encontró el input del modal de rename.") _save_debug_screenshot(page, "rename_no_input") _close_and_save(browser, context) return False rename_input.fill(new_name) time.sleep(0.5) confirm_clicked = False for txt in ("Guardar", "Save", "Renombrar", "Rename", "Actualizar", "Update"): try: btn = scope.locator(f'div[role="dialog"] button:has-text("{txt}"), .n-modal button:has-text("{txt}")').last if btn.count() > 0: btn.click() confirm_clicked = True break except Exception: continue if not confirm_clicked: try: dlg_buttons = scope.locator('div[role="dialog"] button, .n-modal button').all() if dlg_buttons: dlg_buttons[-1].click() confirm_clicked = True except Exception: pass if not confirm_clicked: print("[ERROR] No se pudo hacer click en el botón final del modal.") _save_debug_screenshot(page, "rename_no_confirm_btn") _close_and_save(browser, context) return False print("[INFO] Rename disparado. Esperando 6 s a que GHL procese...") time.sleep(6) print("[ÉXITO] Workflow renombrado.") _close_and_save(browser, context) return True except Exception as e: error_id = error_logging.log_error("playwright_dom_rename_failed", e, {"location_id": location_id, "workflow_id": workflow_id, "run_id": run_id, "action": "rename"}) print(f"ERROR en rename DOM: {str(e)} | error_id={error_id}") return False def _resync_local_workflows(location_id): """Re-sincroniza la tabla local de SQLite tras una mutación exitosa.""" try: import sync_engine tokens = sync_engine.get_tokens_map() token = tokens.get(location_id) if token: workflows = sync_engine.ghl_client.get_workflows(token, location_id) db.save_workflows(location_id, workflows) print(f"[INFO] SQLite actualizado. {len(workflows)} workflows.") except Exception as se: print(f"[ADVERTENCIA] No se pudo re-sincronizar SQLite local: {se}") def main(): _install_signal_handlers() parser = argparse.ArgumentParser(description="Gestor de workflows en Bucéfalo mediante automatización DOM con Playwright.") parser.add_argument("--action", required=True, choices=["toggle-status", "rename", "delete", "bulk-draft", "bulk-publish", "bulk-delete"]) parser.add_argument("--location", help="Location ID de la subcuenta (obligatorio para acciones individuales)") parser.add_argument("--workflow-id", help="ID del workflow (obligatorio para acciones individuales)") parser.add_argument("--new-name", help="Nuevo nombre (rename)") parser.add_argument("--current-status", help="Estado actual del workflow (toggle-status)") parser.add_argument("--run-id", help="ID de ejecución de auditoría") parser.add_argument("--batch-file", help="Ruta a archivo JSON con lista de items para acciones bulk-*") parser.add_argument("--workers", type=int, default=1, help="Workers paralelos para acciones bulk-* (1-5, default 1). Ignorado en perfil persistente.") parser.add_argument("--action-pause", type=float, default=3.0, dest="action_pause", help="Pausa (s) por worker entre items en bulks. Default 3.0.") args = parser.parse_args() # Defensa contra perfil persistente: lock del profile dir impide múltiples contexts. if PERSISTENT_PROFILE_DIR and args.workers > 1: print("[INFO] Perfil persistente activo → forzando --workers=1.") args.workers = 1 args.workers = max(1, min(int(args.workers or 1), 5)) # Validación de argumentos según la acción. is_bulk = args.action.startswith("bulk-") if is_bulk: if not args.batch_file: print("ERROR: --batch-file es obligatorio para acciones bulk-*.") sys.exit(1) else: if not args.location or not args.workflow_id: print("ERROR: --location y --workflow-id son obligatorios para acciones individuales.") sys.exit(1) if args.run_id: # Bootstrap idempotente: si el dashboard ya creó el run, INSERT OR IGNORE # respeta lo previo; si vino de CLI, esto crea la fila para que los # record_change posteriores no queden huérfanos. try: script_audit.create_run( args.run_id, script_name=os.path.basename(__file__), arguments=" ".join(sys.argv[1:]), locations=[args.location] if args.location else [], execution_mode="parallel" if args.workers > 1 else "sequential", ) except Exception as ae: print(f"[WARN] No se pudo crear/actualizar run en script_audit: {ae}") script_audit.update_run_status(args.run_id, "running") print("=== CONTROL DE SCRIPTS: ghl_browser_workflow_manager.py ===") print(f"Modo: Playwright Híbrido Avanzado | Acción: {args.action}") if is_bulk: print(f"Batch file: {args.batch_file}") else: print(f"Subcuenta: {args.location} | Workflow: {args.workflow_id}") if args.current_status: print(f"Estado Actual: {args.current_status}") print("----------------------------------------------------------------------") session_exists, session_age_hours = session_file_status() if not session_exists: print(f"ERROR: Falta el archivo de sesión: {SESSION_FILE}") print("Corre el script 'ghl_browser_session_generator.py' primero.") if args.run_id: script_audit.update_run_status(args.run_id, "failed", f"Falta el archivo de sesion: {SESSION_FILE}") sys.exit(1) if session_age_hours is not None: print(f"[INFO] Sesión Bucéfalo: {session_age_hours:.1f} h de antigüedad ({SESSION_FILE}).") if session_age_hours > SESSION_MAX_AGE_HOURS: print(f"[ADVERTENCIA] La sesión tiene más de {SESSION_MAX_AGE_HOURS} h — probablemente expiró.") print("[ADVERTENCIA] Si la acción falla, renueva con: python scripts/ghl_browser_session_generator.py") if not ensure_playwright_browsers(): print("ERROR: No hay binarios de Chromium disponibles para Playwright.") if args.run_id: script_audit.update_run_status(args.run_id, "failed", "Faltan binarios de Playwright (chromium)") sys.exit(1) # 0. Acciones bulk-*: leen lista del JSON y procesan todo con un solo browser. if args.action in ("bulk-draft", "bulk-publish", "bulk-delete"): import json as _json try: with open(args.batch_file, encoding="utf-8") as f: items = _json.load(f) except Exception as je: print(f"ERROR leyendo --batch-file: {je}") if args.run_id: script_audit.update_run_status(args.run_id, "failed", f"No se pudo leer batch-file: {je}") sys.exit(1) if not isinstance(items, list) or not items: print("ERROR: el batch-file debe contener una lista no vacía de workflows.") if args.run_id: script_audit.update_run_status(args.run_id, "failed", "batch-file vacío o inválido") sys.exit(1) if args.action == "bulk-delete": ok_all = bulk_delete_via_playwright_dom( items, run_id=args.run_id, workers=args.workers, per_action_pause=args.action_pause, ) else: target_active = (args.action == "bulk-publish") ok_all = bulk_toggle_via_playwright_dom( items, target_active=target_active, run_id=args.run_id, workers=args.workers, per_action_pause=args.action_pause, ) if args.run_id: script_audit.update_run_status(args.run_id, "success" if ok_all else "failed", None if ok_all else f"Algunos items de {args.action} fallaron") sys.exit(0 if ok_all else 1) # 1. Si es toggle-status, ejecutamos directamente la automatización DOM sobre la página del constructor del workflow. # Esto es 100% robusto y garantiza el guardado correcto a nivel de backend y UI en GHL. if args.action == "toggle-status": if not ensure_playwright_browsers(): print("ERROR: No hay binarios de Chromium disponibles para Playwright.") if args.run_id: script_audit.update_run_status(args.run_id, "failed", "Faltan binarios de Playwright (chromium)") sys.exit(1) success = toggle_via_playwright_dom(args.location, args.workflow_id, args.current_status, args.run_id) if success: # Sincronizar SQLite local inmediatamente print("[INFO] Re-sincronizando base de datos local SQLite...") try: import sync_engine tokens = sync_engine.get_tokens_map() token = tokens.get(args.location) if token: workflows = sync_engine.ghl_client.get_workflows(token, args.location) db.save_workflows(args.location, workflows) print(f"[INFO] SQLite actualizado. {len(workflows)} workflows disponibles.") except Exception as se: print(f"[ADVERTENCIA] No se pudo re-sincronizar SQLite local de forma automática: {se}") if args.run_id: script_audit.update_run_status(args.run_id, "success") sys.exit(0) else: print("[ERROR] Falló la automatización DOM para cambiar el estado.") if args.run_id: script_audit.update_run_status(args.run_id, "failed", "Falló la automatización DOM de cambio de estado") sys.exit(1) # 2. Para rename/delete usamos automatización DOM pura — el flujo de intercepción de tokens # dejó de funcionar porque GHL ya no expone los endpoints internos por esa vía. if args.action == "delete": ok = delete_via_playwright_dom(args.location, args.workflow_id, args.run_id) elif args.action == "rename": if not args.new_name: print("ERROR: --new-name es obligatorio para rename.") if args.run_id: script_audit.update_run_status(args.run_id, "failed", "Falta --new-name") sys.exit(1) ok = rename_via_playwright_dom(args.location, args.workflow_id, args.new_name, args.run_id) else: print(f"ERROR: acción desconocida '{args.action}'.") if args.run_id: script_audit.update_run_status(args.run_id, "failed", f"Acción desconocida: {args.action}") sys.exit(1) if ok: print("[INFO] Re-sincronizando SQLite local...") _resync_local_workflows(args.location) if args.run_id: script_audit.update_run_status(args.run_id, "success") sys.exit(0) else: if args.run_id: script_audit.update_run_status(args.run_id, "failed", f"Falló automatización DOM ({args.action})") sys.exit(1) if __name__ == "__main__": main()