Files
MP-Manager/scripts/audit_branch_verifier.py
2026-05-30 14:31:19 -06:00

260 lines
11 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Read-only audit for Monte Providencia branch verifier mappings."""
import argparse
import csv
import os
import re
import sqlite3
import sys
import unicodedata
from collections import Counter, defaultdict
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
from paths import DB_PATH # noqa: E402
VERIFIER_FILENAME = "Monte Providencia - Verificador de sucursales y correos - Sucursales.csv"
TOKENS_FILENAME = "Bucéfalo - Mesa de control - API Tokens - MP.csv"
MAIN_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
SC_RE = re.compile(r"^\d{4,5}\s*-\s*MP\s*-\s*.+$", re.IGNORECASE)
def load_csv(filename):
path = os.path.join(ROOT_DIR, filename)
if not os.path.exists(path):
raise FileNotFoundError(f"No existe el archivo requerido: {path}")
with open(path, mode="r", encoding="utf-8-sig", newline="") as fh:
return list(csv.DictReader(fh))
def clean(value):
return str(value or "").strip()
def norm_text(value):
value = unicodedata.normalize("NFKD", clean(value))
value = "".join(ch for ch in value if not unicodedata.combining(ch))
return " ".join(value.upper().split())
def norm_sc_name(value):
value = norm_text(value)
value = re.sub(r"^\d{4,5}\s*-\s*MP\s*-\s*", "", value)
return value.replace(".", "")
def is_empty_marker(value):
return clean(value) in {"", "-"}
def add_issue(issues, severity, code, message):
issues.append({"severity": severity, "code": code, "message": message})
def load_db_accounts():
db_path = DB_PATH
if not os.path.exists(db_path):
return None
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
return {row["location_id"]: dict(row) for row in conn.execute("SELECT location_id, nombre, type FROM accounts")}
finally:
conn.close()
def audit_verifier_rows(rows, issues):
location_ids = [clean(row.get("ID LOCATION BUCEFALO")) for row in rows if clean(row.get("ID LOCATION BUCEFALO"))]
tiendas = [norm_text(row.get("TIENDA")) for row in rows if norm_text(row.get("TIENDA"))]
for loc_id, count in sorted(Counter(location_ids).items()):
if count > 1 and loc_id != MAIN_LOCATION_ID:
tienda_list = [clean(row.get("TIENDA")) for row in rows if clean(row.get("ID LOCATION BUCEFALO")) == loc_id]
add_issue(
issues,
"WARN",
"VERIFIER_DUP_LOCATION",
f"Location duplicada en verificador: {loc_id} aparece {count} veces ({', '.join(tienda_list)}).",
)
for tienda, count in sorted(Counter(tiendas).items()):
if count > 1:
matching = [row for row in rows if norm_text(row.get("TIENDA")) == tienda]
locs = sorted({clean(row.get("ID LOCATION BUCEFALO")) or "(sin location)" for row in matching})
types = sorted({clean(row.get("TIPO DE TIENDA")) or "(sin tipo)" for row in matching})
severity = "WARN" if "NO DIGITAL" in types else "INFO"
add_issue(
issues,
severity,
"VERIFIER_DUP_TIENDA",
f"TIENDA duplicada: {tienda} aparece {count} veces; locations={', '.join(locs)}; tipos={', '.join(types)}.",
)
for index, row in enumerate(rows, start=2):
tienda = clean(row.get("TIENDA"))
loc_id = clean(row.get("ID LOCATION BUCEFALO"))
sc_name = clean(row.get("SC BUCEFALO"))
row_label = f"fila {index} / TIENDA={tienda or '(vacia)'}"
if not tienda:
add_issue(issues, "ERROR", "VERIFIER_MISSING_TIENDA", f"{row_label}: falta TIENDA.")
if not loc_id:
add_issue(issues, "ERROR", "VERIFIER_MISSING_LOCATION", f"{row_label}: falta ID LOCATION BUCEFALO.")
if not sc_name:
add_issue(issues, "ERROR", "VERIFIER_MISSING_SC", f"{row_label}: falta SC BUCEFALO.")
elif loc_id != MAIN_LOCATION_ID and not SC_RE.match(sc_name):
add_issue(issues, "WARN", "VERIFIER_BAD_SC_FORMAT", f"{row_label}: SC BUCEFALO no sigue formato #### - MP - TIENDA: {sc_name}.")
if loc_id != MAIN_LOCATION_ID and tienda and sc_name and norm_text(tienda) not in norm_sc_name(sc_name):
add_issue(issues, "INFO", "VERIFIER_SC_TIENDA_MISMATCH", f"{row_label}: TIENDA no aparece claramente en SC BUCEFALO ({sc_name}).")
for field in ("CORREO TIENDA", "CORREO DM", "CORREO RDO"):
email = clean(row.get(field))
if is_empty_marker(email):
if loc_id != MAIN_LOCATION_ID:
add_issue(issues, "WARN", "VERIFIER_EMPTY_EMAIL", f"{row_label}: {field} vacio.")
elif not EMAIL_RE.match(email):
add_issue(issues, "WARN", "VERIFIER_BAD_EMAIL", f"{row_label}: {field} no parece email valido: {email}.")
def audit_tokens_rows(rows, issues):
location_ids = [clean(row.get("Location_ID")) for row in rows if clean(row.get("Location_ID"))]
for loc_id, count in sorted(Counter(location_ids).items()):
if count > 1:
names = [clean(row.get("Nombre")) for row in rows if clean(row.get("Location_ID")) == loc_id]
add_issue(issues, "WARN", "TOKENS_DUP_LOCATION", f"Location duplicada en CSV de tokens: {loc_id} aparece {count} veces ({', '.join(names)}).")
for index, row in enumerate(rows, start=2):
loc_id = clean(row.get("Location_ID"))
name = clean(row.get("Nombre"))
token = clean(row.get("API_token"))
row_label = f"fila {index} / Nombre={name or '(vacio)'}"
if not loc_id and not token:
add_issue(issues, "INFO", "TOKENS_SKIPPED_ADMIN_ROW", f"{row_label}: fila sin Location_ID ni API_token; se interpreta como administrativa/no sincronizable.")
continue
if not loc_id:
add_issue(issues, "WARN", "TOKENS_MISSING_LOCATION", f"{row_label}: tiene API_token pero falta Location_ID; la sync actual la omite.")
if not name:
add_issue(issues, "WARN", "TOKENS_MISSING_NAME", f"{row_label}: falta Nombre.")
if not token:
add_issue(issues, "WARN", "TOKENS_MISSING_TOKEN", f"{row_label}: falta API_token; la sync actual la omite.")
def audit_cross_sources(verifier_rows, token_rows, db_accounts, issues):
verifier_by_id = defaultdict(list)
for row in verifier_rows:
loc_id = clean(row.get("ID LOCATION BUCEFALO"))
if loc_id:
verifier_by_id[loc_id].append(row)
token_by_id = defaultdict(list)
for row in token_rows:
loc_id = clean(row.get("Location_ID"))
if loc_id:
token_by_id[loc_id].append(row)
verifier_ids = set(verifier_by_id)
token_ids = set(token_by_id)
for loc_id in sorted(token_ids - verifier_ids):
names = ", ".join(clean(row.get("Nombre")) for row in token_by_id[loc_id])
severity = "INFO" if "DEMO" in norm_text(names) else "WARN"
add_issue(issues, severity, "CROSS_TOKEN_NOT_VERIFIER", f"Location en tokens pero no en verificador: {loc_id} ({names}).")
for loc_id in sorted(verifier_ids - token_ids):
tiendas = ", ".join(clean(row.get("TIENDA")) for row in verifier_by_id[loc_id])
add_issue(issues, "ERROR", "CROSS_VERIFIER_NOT_TOKENS", f"Location en verificador pero no en tokens: {loc_id} ({tiendas}).")
for loc_id in sorted(verifier_ids & token_ids):
verifier_names = {norm_sc_name(row.get("SC BUCEFALO")) for row in verifier_by_id[loc_id] if clean(row.get("SC BUCEFALO"))}
token_names = {norm_sc_name(row.get("Nombre")) for row in token_by_id[loc_id] if clean(row.get("Nombre"))}
if verifier_names and token_names and verifier_names.isdisjoint(token_names):
add_issue(
issues,
"INFO",
"CROSS_NAME_MISMATCH",
f"Nombre distinto entre verificador y tokens para {loc_id}: verificador={sorted(verifier_names)}; tokens={sorted(token_names)}.",
)
if db_accounts is None:
add_issue(issues, "INFO", "DB_NOT_FOUND", "SQLite no existe; se omite cruce contra accounts.")
return
db_ids = set(db_accounts)
for loc_id in sorted(token_ids - db_ids):
names = ", ".join(clean(row.get("Nombre")) for row in token_by_id[loc_id])
add_issue(issues, "WARN", "DB_TOKEN_NOT_SYNCED", f"Location en tokens pero no en SQLite accounts: {loc_id} ({names}).")
for loc_id in sorted(db_ids - token_ids):
account = db_accounts[loc_id]
add_issue(issues, "WARN", "DB_ACCOUNT_NOT_TOKENS", f"Location en SQLite accounts pero no en tokens: {loc_id} ({account.get('nombre')}).")
def print_summary(verifier_rows, token_rows, db_accounts, issues, show_details):
counts = Counter(issue["severity"] for issue in issues)
print("=== AUDITORIA VERIFICADOR DE SUCURSALES MP ===")
print("Modo: solo lectura; no modifica CSV, SQLite ni GHL.")
print("Tokens: no se imprimen ni se validan contra GHL.")
print("-" * 70)
print(f"Filas verificador: {len(verifier_rows)}")
print(f"Filas CSV tokens: {len(token_rows)}")
print(f"Accounts SQLite: {len(db_accounts) if db_accounts is not None else 'no disponible'}")
print("-" * 70)
print(f"ERROR: {counts.get('ERROR', 0)} | WARN: {counts.get('WARN', 0)} | INFO: {counts.get('INFO', 0)}")
if not issues:
print("Sin hallazgos.")
return
print("-" * 70)
grouped = defaultdict(list)
for issue in issues:
grouped[issue["severity"]].append(issue)
for severity in ("ERROR", "WARN", "INFO"):
items = grouped.get(severity, [])
if not items:
continue
print(f"\n[{severity}] {len(items)} hallazgos")
limit = len(items) if show_details else min(len(items), 30)
for issue in items[:limit]:
print(f"- {issue['code']}: {issue['message']}")
if limit < len(items):
print(f"- ... {len(items) - limit} hallazgos adicionales. Usa --details para ver todos.")
def main():
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8")
parser = argparse.ArgumentParser(description="Audita el verificador de sucursales MP contra CSV de tokens y SQLite.")
parser.add_argument("--details", action="store_true", help="Muestra todos los hallazgos; por defecto limita cada severidad a 30.")
args = parser.parse_args()
try:
verifier_rows = load_csv(VERIFIER_FILENAME)
token_rows = load_csv(TOKENS_FILENAME)
db_accounts = load_db_accounts()
except Exception as exc:
print(f"ERROR: {exc}")
return 1
issues = []
audit_verifier_rows(verifier_rows, issues)
audit_tokens_rows(token_rows, issues)
audit_cross_sources(verifier_rows, token_rows, db_accounts, issues)
issues.sort(key=lambda item: ({"ERROR": 0, "WARN": 1, "INFO": 2}.get(item["severity"], 9), item["code"], item["message"]))
print_summary(verifier_rows, token_rows, db_accounts, issues, args.details)
return 1 if any(issue["severity"] == "ERROR" for issue in issues) else 0
if __name__ == "__main__":
sys.exit(main())