99 lines
4.0 KiB
Python
99 lines
4.0 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Chequeo agendado (diario) del origen de sucursal.
|
|
|
|
Corre `fix_branch_user_origin.py` en DRY-RUN (no escribe nada en el CRM), guarda
|
|
el log fechado y, si detecta contactos/opps pendientes de corregir, deja una
|
|
alerta en generated/runtime/origen_check_alert.json para que el owner los aplique
|
|
manualmente desde el dashboard (respeta el protocolo dry-run -> confirmación).
|
|
|
|
Cubre lo que el workflow n8n [2004] NO puede en tiempo real:
|
|
- Canal de Origen de la Oportunidad en opps nuevas (las crea el workflow GHL).
|
|
- Sucursales ausentes del Verificador Baserow 750 (p.ej. Eugenia renombrada),
|
|
donde el [2004] corta el flujo antes de escribir.
|
|
|
|
Uso:
|
|
python scripts/scheduled_origen_check.py # dry-run de TODAS las sucursales
|
|
python scripts/scheduled_origen_check.py --location <id> # prueba en una sucursal
|
|
"""
|
|
|
|
import datetime
|
|
import json
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
|
|
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
if ROOT_DIR not in sys.path:
|
|
sys.path.insert(0, ROOT_DIR)
|
|
|
|
from paths import LOGS_DIR, RUNTIME_DIR # noqa: E402
|
|
|
|
TARGET_SCRIPT = os.path.join(ROOT_DIR, "scripts", "fix_branch_user_origin.py")
|
|
ALERT_PATH = os.path.join(RUNTIME_DIR, "origen_check_alert.json")
|
|
SUMMARY_RE = re.compile(
|
|
r"creados por usuario=(\d+).*?contactos corregidos=(\d+),\s*opps corregidas=(\d+),"
|
|
r"\s*locations omitidas=(\d+),\s*errores=(\d+)"
|
|
)
|
|
|
|
|
|
def main():
|
|
passthrough = sys.argv[1:] or ["--all"] # default: todas las sucursales
|
|
os.makedirs(LOGS_DIR, exist_ok=True)
|
|
os.makedirs(RUNTIME_DIR, exist_ok=True)
|
|
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
log_path = os.path.join(LOGS_DIR, f"origen_check_{ts}.log")
|
|
|
|
# DRY-RUN explícito: NUNCA pasamos --apply.
|
|
cmd = [sys.executable, TARGET_SCRIPT] + [a for a in passthrough if a != "--apply"]
|
|
proc = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8", errors="replace")
|
|
out = (proc.stdout or "") + (("\n[STDERR]\n" + proc.stderr) if proc.stderr else "")
|
|
with open(log_path, "w", encoding="utf-8") as fh:
|
|
fh.write(out)
|
|
|
|
m = SUMMARY_RE.search(out)
|
|
if not m:
|
|
# No se pudo parsear el resumen: deja alerta de fallo para revisar el log.
|
|
alert = {
|
|
"timestamp": ts,
|
|
"status": "parse_error",
|
|
"log": log_path,
|
|
"return_code": proc.returncode,
|
|
"mensaje": "No se pudo leer el RESUMEN del dry-run; revisar el log.",
|
|
}
|
|
with open(ALERT_PATH, "w", encoding="utf-8") as fh:
|
|
json.dump(alert, fh, ensure_ascii=False, indent=2)
|
|
print(f"[origen-check] sin resumen parseable (rc={proc.returncode}). Log: {log_path}")
|
|
raise SystemExit(1)
|
|
|
|
detectados, contactos, opps, omitidas, errores = (int(g) for g in m.groups())
|
|
pendientes = contactos + opps
|
|
print(f"[origen-check] {ts} dry-run: creados_por_usuario={detectados} "
|
|
f"contactos_pendientes={contactos} opps_pendientes={opps} errores={errores}")
|
|
print(f"[origen-check] log: {log_path}")
|
|
|
|
if pendientes > 0 or errores > 0:
|
|
alert = {
|
|
"timestamp": ts,
|
|
"status": "pendientes" if pendientes > 0 else "errores",
|
|
"contactos_pendientes": contactos,
|
|
"opps_pendientes": opps,
|
|
"errores": errores,
|
|
"log": log_path,
|
|
"aplicar_con": "python scripts/fix_branch_user_origin.py --all --apply --run-id <uuid>",
|
|
"nota": "Revisa el log y aplica desde el dashboard tras confirmar (protocolo dry-run).",
|
|
}
|
|
with open(ALERT_PATH, "w", encoding="utf-8") as fh:
|
|
json.dump(alert, fh, ensure_ascii=False, indent=2)
|
|
print(f"[origen-check] ALERTA: {pendientes} pendientes -> {ALERT_PATH}")
|
|
else:
|
|
# Sin pendientes: limpiar alerta previa si existe.
|
|
if os.path.exists(ALERT_PATH):
|
|
os.remove(ALERT_PATH)
|
|
print("[origen-check] sin pendientes. Todo al día.")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|