#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Auditoría READ-ONLY del Verificador (Baserow tabla 750) y la Mesa de control (tabla 749) contra la lista OFICIAL de cuentas (n8n/cuentas_oficiales.csv). El workflow n8n [2004] resuelve la sucursal encadenando: webhook location.name -> 749.Nombre (7235) -> 750."SC BUCEFALO" (7247) Para que el match funcione, AMBOS (749.Nombre y 750.SC BUCEFALO) deben ser idénticos al nombre oficial de la cuenta (lo que GHL manda en location.name). Fuente de verdad: - Nombre canónico: `n8n/cuentas_oficiales.csv` (name <-> location_id). - SUCURSAL / TIENDA: Verificador CSV local (load_verifier_map), donde exista. Reporta (cruce por location_id, excluye Marca y DEMO): - fix_749 : 749.Nombre != oficial (rompe el 1er match) - mismatch_750 : 750."SC BUCEFALO" != oficial (rompe el 2do match) - sucursal_vacia / tienda_vacia : con subclase csv_tiene (automatizable) o sin_fuente (Erandi) - ausente_750 : oficial sin fila en 750 - sin_749 : oficial sin fila en 749 - filas_no_oficiales : filas de 750 cuyo location_id no está en la lista oficial (no se tocan) Uso: python scripts/audit_baserow_verificador.py """ import csv import json import os import sys ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT_DIR not in sys.path: sys.path.insert(0, ROOT_DIR) from paths import REPORTS_DIR # noqa: E402 from baserow_client import BaserowClient # noqa: E402 from fill_sucursal_tienda_from_location import load_verifier_map # noqa: E402 TABLE_CUENTAS = 749 TABLE_VERIFICADOR = 750 BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3" OFICIAL_CSV = os.path.join(ROOT_DIR, "n8n", "cuentas_oficiales.csv") F_749_NOMBRE = "Nombre" F_749_LOC = "Location_ID" F_SC_BUCEFALO = "SC BUCEFALO" F_SUCURSAL = "SUCURSAL" F_TIENDA = "TIENDA" F_ID_LOC = "ID LOCATION BUCEFALO" def clean(v): return str(v or "").strip() def load_official_accounts(path=None): """{location_id: name} de la lista oficial; excluye Marca y cuentas DEMO.""" path = path or OFICIAL_CSV out = {} with open(path, encoding="utf-8-sig", newline="") as fh: for r in csv.DictReader(fh): loc = clean(r.get("location_id")) name = clean(r.get("name")) if not loc or loc == BRAND_LOCATION_ID or "demo" in name.lower(): continue out[loc] = name return out def audit(client=None, oficial=None, verifier_map=None): """Devuelve el dict de discrepancias. Reutilizable por el corrector.""" client = client or BaserowClient.from_credentials() oficial = oficial if oficial is not None else load_official_accounts() verifier_map = verifier_map if verifier_map is not None else load_verifier_map() rows749 = client.list_rows(TABLE_CUENTAS) rows750 = client.list_rows(TABLE_VERIFICADOR) n749 = {clean(r.get(F_749_LOC)): r for r in rows749 if clean(r.get(F_749_LOC))} n750 = {clean(r.get(F_ID_LOC)): r for r in rows750 if clean(r.get(F_ID_LOC))} res = { "totals": {"oficiales": len(oficial), "filas_749": len(rows749), "filas_750": len(rows750)}, "fix_749": [], "mismatch_750": [], "sucursal_vacia": [], "tienda_vacia": [], "ausente_750": [], "sin_749": [], "filas_no_oficiales": [], } for loc, name in oficial.items(): row749 = n749.get(loc) if not row749: res["sin_749"].append({"location_id": loc, "oficial": name}) elif clean(row749.get(F_749_NOMBRE)) != name: res["fix_749"].append({"location_id": loc, "row_id": row749.get("id"), "actual": clean(row749.get(F_749_NOMBRE)), "oficial": name}) row750 = n750.get(loc) csv_entry = verifier_map.get(loc) or {} csv_suc = clean(csv_entry.get("sucursal")) csv_tie = clean(csv_entry.get("tienda")) if not row750: res["ausente_750"].append({"location_id": loc, "oficial": name, "csv_sucursal": csv_suc, "csv_tienda": csv_tie}) continue if clean(row750.get(F_SC_BUCEFALO)) != name: res["mismatch_750"].append({"location_id": loc, "row_id": row750.get("id"), "actual": clean(row750.get(F_SC_BUCEFALO)), "oficial": name}) if not clean(row750.get(F_SUCURSAL)): res["sucursal_vacia"].append({"location_id": loc, "row_id": row750.get("id"), "oficial": name, "csv_sucursal": csv_suc, "fuente": "csv_tiene" if csv_suc else "sin_fuente"}) if not clean(row750.get(F_TIENDA)): res["tienda_vacia"].append({"location_id": loc, "row_id": row750.get("id"), "oficial": name, "csv_tienda": csv_tie, "fuente": "csv_tiene" if csv_tie else "sin_fuente"}) for loc, row in n750.items(): if loc != BRAND_LOCATION_ID and loc not in oficial: res["filas_no_oficiales"].append({"location_id": loc, "row_id": row.get("id"), "sc_bucefalo": clean(row.get(F_SC_BUCEFALO))}) return res def _sec(title, items, fmt): print(f"\n=== {title}: {len(items)} ===") for it in items: print(" " + fmt(it)) def main(): if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8") res = audit() print("=" * 72) print("AUDITORÍA BASEROW (749 + 750) vs lista OFICIAL de cuentas") print("=" * 72) t = res["totals"] print(f"Oficiales={t['oficiales']} | filas 749={t['filas_749']} | filas 750={t['filas_750']}") _sec("749.Nombre != oficial (corregir 749) [rompe 1er match]", res["fix_749"], lambda x: f"{x['actual']!r} -> {x['oficial']!r} [{x['location_id']}]") _sec("750.SC_BUCEFALO != oficial (corregir 750) [rompe 2do match]", res["mismatch_750"], lambda x: f"{x['actual']!r} -> {x['oficial']!r} (row {x['row_id']})") _sec("SUCURSAL vacía", res["sucursal_vacia"], lambda x: f"{x['oficial']!r} fuente={x['fuente']} csv={x['csv_sucursal']!r}") _sec("TIENDA vacía", res["tienda_vacia"], lambda x: f"{x['oficial']!r} fuente={x['fuente']} csv={x['csv_tienda']!r}") _sec("Oficiales ausentes en 750 (crear)", res["ausente_750"], lambda x: f"{x['oficial']!r} csv_suc={x['csv_sucursal']!r} csv_tie={x['csv_tienda']!r} [{x['location_id']}]") _sec("Oficiales sin fila en 749", res["sin_749"], lambda x: f"{x['oficial']!r} [{x['location_id']}]") _sec("Filas 750 NO oficiales (se ignoran)", res["filas_no_oficiales"], lambda x: f"{x['sc_bucefalo']!r} [{x['location_id']}]") os.makedirs(REPORTS_DIR, exist_ok=True) out = os.path.join(REPORTS_DIR, "audit_baserow_verificador.json") with open(out, "w", encoding="utf-8") as fh: json.dump(res, fh, ensure_ascii=False, indent=2) print(f"\nReporte JSON -> {out}") if __name__ == "__main__": main()