#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Read-only audit for Monte Providencia branch verifier mappings.

Loads the branch verifier CSV and the API tokens CSV from ``ROOT_DIR``,
optionally the ``accounts`` table of the SQLite database at ``DB_PATH``,
and prints a report of findings grouped by severity (ERROR / WARN / INFO).
The script only reads its inputs; token values are never printed.
"""

import argparse
import csv
import os
import re
import sqlite3
import sys
import unicodedata
from collections import Counter, defaultdict

# Make the parent of this script's directory importable so ``paths`` resolves.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)

from paths import DB_PATH  # noqa: E402

VERIFIER_FILENAME = "Monte Providencia - Verificador de sucursales y correos - Sucursales.csv"
TOKENS_FILENAME = "Bucéfalo - Mesa de control - API Tokens - MP.csv"
# Rows pointing at this location are exempt from several per-row checks below.
MAIN_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
SC_RE = re.compile(r"^\d{4,5}\s*-\s*MP\s*-\s*.+$", re.IGNORECASE)


def load_csv(filename):
    """Read ``filename`` (relative to ``ROOT_DIR``) into a list of row dicts.

    The file is decoded as UTF-8 with an optional BOM.

    Raises:
        FileNotFoundError: if the file does not exist.
    """
    path = os.path.join(ROOT_DIR, filename)
    if not os.path.exists(path):
        raise FileNotFoundError(f"No existe el archivo requerido: {path}")
    with open(path, mode="r", encoding="utf-8-sig", newline="") as fh:
        return list(csv.DictReader(fh))


def clean(value):
    """Return ``value`` as a stripped string; falsy values become ``""``."""
    return str(value or "").strip()


def norm_text(value):
    """Return ``value`` without accents, upper-cased, whitespace collapsed."""
    value = unicodedata.normalize("NFKD", clean(value))
    # After NFKD decomposition, accents are separate combining characters.
    value = "".join(ch for ch in value if not unicodedata.combining(ch))
    return " ".join(value.upper().split())


def norm_sc_name(value):
    """Normalize an SC name: drop a leading ``#### - MP - `` prefix and periods."""
    value = norm_text(value)
    value = re.sub(r"^\d{4,5}\s*-\s*MP\s*-\s*", "", value)
    return value.replace(".", "")


def is_empty_marker(value):
    """Return True when ``value`` is blank or the ``-`` placeholder."""
    return clean(value) in {"", "-"}


def add_issue(issues, severity, code, message):
    """Append a finding dict (``severity``, ``code``, ``message``) to ``issues``."""
    issues.append({"severity": severity, "code": code, "message": message})


def load_db_accounts():
    """Load the ``accounts`` table keyed by ``location_id``.

    Returns:
        ``None`` when the database file at ``DB_PATH`` does not exist;
        otherwise a dict mapping ``location_id`` to a dict with the columns
        ``location_id``, ``nombre`` and ``type``.
    """
    db_path = DB_PATH
    if not os.path.exists(db_path):
        return None
    conn = sqlite3.connect(db_path)
    conn.row_factory = sqlite3.Row
    try:
        return {
            row["location_id"]: dict(row)
            for row in conn.execute("SELECT location_id, nombre, type FROM accounts")
        }
    finally:
        conn.close()


def audit_verifier_rows(rows, issues):
    """Audit the verifier CSV rows on their own and append findings to ``issues``.

    Checks performed:
      * location IDs repeated across rows (``MAIN_LOCATION_ID`` is exempt);
      * TIENDA names repeated after normalization;
      * per row: missing TIENDA / location / SC name, SC name format,
        TIENDA not contained in the SC name, and empty or malformed emails.

    Row numbers in messages start at 2 because row 1 of the CSV is the header.
    """
    # Group rows once instead of rescanning ``rows`` for every duplicated key.
    rows_by_location = defaultdict(list)
    rows_by_tienda = defaultdict(list)
    for row in rows:
        loc_id = clean(row.get("ID LOCATION BUCEFALO"))
        if loc_id:
            rows_by_location[loc_id].append(row)
        tienda_key = norm_text(row.get("TIENDA"))
        if tienda_key:
            rows_by_tienda[tienda_key].append(row)

    for loc_id in sorted(rows_by_location):
        matching = rows_by_location[loc_id]
        count = len(matching)
        if count > 1 and loc_id != MAIN_LOCATION_ID:
            tienda_list = [clean(row.get("TIENDA")) for row in matching]
            add_issue(
                issues,
                "WARN",
                "VERIFIER_DUP_LOCATION",
                f"Location duplicada en verificador: {loc_id} aparece {count} veces ({', '.join(tienda_list)}).",
            )

    for tienda in sorted(rows_by_tienda):
        matching = rows_by_tienda[tienda]
        count = len(matching)
        if count > 1:
            locs = sorted({clean(row.get("ID LOCATION BUCEFALO")) or "(sin location)" for row in matching})
            types = sorted({clean(row.get("TIPO DE TIENDA")) or "(sin tipo)" for row in matching})
            severity = "WARN" if "NO DIGITAL" in types else "INFO"
            add_issue(
                issues,
                severity,
                "VERIFIER_DUP_TIENDA",
                f"TIENDA duplicada: {tienda} aparece {count} veces; locations={', '.join(locs)}; tipos={', '.join(types)}.",
            )

    for index, row in enumerate(rows, start=2):
        tienda = clean(row.get("TIENDA"))
        loc_id = clean(row.get("ID LOCATION BUCEFALO"))
        sc_name = clean(row.get("SC BUCEFALO"))
        is_main = loc_id == MAIN_LOCATION_ID
        row_label = f"fila {index} / TIENDA={tienda or '(vacia)'}"

        if not tienda:
            add_issue(issues, "ERROR", "VERIFIER_MISSING_TIENDA", f"{row_label}: falta TIENDA.")
        if not loc_id:
            add_issue(issues, "ERROR", "VERIFIER_MISSING_LOCATION", f"{row_label}: falta ID LOCATION BUCEFALO.")
        if not sc_name:
            add_issue(issues, "ERROR", "VERIFIER_MISSING_SC", f"{row_label}: falta SC BUCEFALO.")
        elif not is_main and not SC_RE.match(sc_name):
            add_issue(issues, "WARN", "VERIFIER_BAD_SC_FORMAT", f"{row_label}: SC BUCEFALO no sigue formato #### - MP - TIENDA: {sc_name}.")

        if not is_main and tienda and sc_name:
            # norm_sc_name() strips periods from the SC name, so the TIENDA key
            # must drop them too; otherwise any store name containing "."
            # (e.g. "Av. Juarez") is always reported as a mismatch.
            tienda_key = norm_text(tienda).replace(".", "")
            # An empty key would be "contained" in any string, so treat it as
            # a mismatch explicitly.
            if not tienda_key or tienda_key not in norm_sc_name(sc_name):
                add_issue(issues, "INFO", "VERIFIER_SC_TIENDA_MISMATCH", f"{row_label}: TIENDA no aparece claramente en SC BUCEFALO ({sc_name}).")

        for field in ("CORREO TIENDA", "CORREO DM", "CORREO RDO"):
            email = clean(row.get(field))
            if is_empty_marker(email):
                if not is_main:
                    add_issue(issues, "WARN", "VERIFIER_EMPTY_EMAIL", f"{row_label}: {field} vacio.")
            elif not EMAIL_RE.match(email):
                add_issue(issues, "WARN", "VERIFIER_BAD_EMAIL", f"{row_label}: {field} no parece email valido: {email}.")


def audit_tokens_rows(rows, issues):
    """Audit the tokens CSV rows on their own and append findings to ``issues``.

    Reports duplicated ``Location_ID`` values and, per row, missing
    ``Location_ID``, ``Nombre`` or ``API_token``. Rows with neither a
    location nor a token are reported once as INFO and skipped. Token values
    are never included in messages.
    """
    location_ids = [clean(row.get("Location_ID")) for row in rows if clean(row.get("Location_ID"))]
    for loc_id, count in sorted(Counter(location_ids).items()):
        if count > 1:
            names = [clean(row.get("Nombre")) for row in rows if clean(row.get("Location_ID")) == loc_id]
            add_issue(issues, "WARN", "TOKENS_DUP_LOCATION", f"Location duplicada en CSV de tokens: {loc_id} aparece {count} veces ({', '.join(names)}).")

    for index, row in enumerate(rows, start=2):
        loc_id = clean(row.get("Location_ID"))
        name = clean(row.get("Nombre"))
        token = clean(row.get("API_token"))
        row_label = f"fila {index} / Nombre={name or '(vacio)'}"

        if not loc_id and not token:
            add_issue(issues, "INFO", "TOKENS_SKIPPED_ADMIN_ROW", f"{row_label}: fila sin Location_ID ni API_token; se interpreta como administrativa/no sincronizable.")
            continue
        if not loc_id:
            add_issue(issues, "WARN", "TOKENS_MISSING_LOCATION", f"{row_label}: tiene API_token pero falta Location_ID; la sync actual la omite.")
        if not name:
            add_issue(issues, "WARN", "TOKENS_MISSING_NAME", f"{row_label}: falta Nombre.")
        if not token:
            add_issue(issues, "WARN", "TOKENS_MISSING_TOKEN", f"{row_label}: falta API_token; la sync actual la omite.")


def audit_cross_sources(verifier_rows, token_rows, db_accounts, issues):
    """Cross-check location IDs between verifier, tokens and SQLite accounts.

    Args:
        verifier_rows: row dicts from the verifier CSV.
        token_rows: row dicts from the tokens CSV.
        db_accounts: mapping of location ID to account dict, or ``None`` when
            the database is unavailable (the SQLite checks are then skipped
            and a single INFO finding is recorded).
        issues: list that findings are appended to.
    """
    verifier_by_id = defaultdict(list)
    for row in verifier_rows:
        loc_id = clean(row.get("ID LOCATION BUCEFALO"))
        if loc_id:
            verifier_by_id[loc_id].append(row)

    token_by_id = defaultdict(list)
    for row in token_rows:
        loc_id = clean(row.get("Location_ID"))
        if loc_id:
            token_by_id[loc_id].append(row)

    verifier_ids = set(verifier_by_id)
    token_ids = set(token_by_id)

    for loc_id in sorted(token_ids - verifier_ids):
        names = ", ".join(clean(row.get("Nombre")) for row in token_by_id[loc_id])
        # Token rows whose name contains "DEMO" are downgraded to INFO.
        severity = "INFO" if "DEMO" in norm_text(names) else "WARN"
        add_issue(issues, severity, "CROSS_TOKEN_NOT_VERIFIER", f"Location en tokens pero no en verificador: {loc_id} ({names}).")

    for loc_id in sorted(verifier_ids - token_ids):
        tiendas = ", ".join(clean(row.get("TIENDA")) for row in verifier_by_id[loc_id])
        add_issue(issues, "ERROR", "CROSS_VERIFIER_NOT_TOKENS", f"Location en verificador pero no en tokens: {loc_id} ({tiendas}).")

    for loc_id in sorted(verifier_ids & token_ids):
        verifier_names = {
            norm_sc_name(row.get("SC BUCEFALO"))
            for row in verifier_by_id[loc_id]
            if clean(row.get("SC BUCEFALO"))
        }
        token_names = {
            norm_sc_name(row.get("Nombre"))
            for row in token_by_id[loc_id]
            if clean(row.get("Nombre"))
        }
        # Only flag when both sides have names and none of them coincide.
        if verifier_names and token_names and verifier_names.isdisjoint(token_names):
            add_issue(
                issues,
                "INFO",
                "CROSS_NAME_MISMATCH",
                f"Nombre distinto entre verificador y tokens para {loc_id}: verificador={sorted(verifier_names)}; tokens={sorted(token_names)}.",
            )

    if db_accounts is None:
        add_issue(issues, "INFO", "DB_NOT_FOUND", "SQLite no existe; se omite cruce contra accounts.")
        return

    db_ids = set(db_accounts)
    for loc_id in sorted(token_ids - db_ids):
        names = ", ".join(clean(row.get("Nombre")) for row in token_by_id[loc_id])
        add_issue(issues, "WARN", "DB_TOKEN_NOT_SYNCED", f"Location en tokens pero no en SQLite accounts: {loc_id} ({names}).")
    for loc_id in sorted(db_ids - token_ids):
        account = db_accounts[loc_id]
        add_issue(issues, "WARN", "DB_ACCOUNT_NOT_TOKENS", f"Location en SQLite accounts pero no en tokens: {loc_id} ({account.get('nombre')}).")


def print_summary(verifier_rows, token_rows, db_accounts, issues, show_details):
    """Print the report: input sizes, per-severity totals, then the findings.

    Findings are listed under ERROR, WARN and INFO in that order. Unless
    ``show_details`` is true, at most 30 findings are printed per severity.
    """
    counts = Counter(issue["severity"] for issue in issues)
    print("=== AUDITORIA VERIFICADOR DE SUCURSALES MP ===")
    print("Modo: solo lectura; no modifica CSV, SQLite ni GHL.")
    print("Tokens: no se imprimen ni se validan contra GHL.")
    print("-" * 70)
    print(f"Filas verificador: {len(verifier_rows)}")
    print(f"Filas CSV tokens: {len(token_rows)}")
    print(f"Accounts SQLite: {len(db_accounts) if db_accounts is not None else 'no disponible'}")
    print("-" * 70)
    print(f"ERROR: {counts.get('ERROR', 0)} | WARN: {counts.get('WARN', 0)} | INFO: {counts.get('INFO', 0)}")
    if not issues:
        print("Sin hallazgos.")
        return

    print("-" * 70)
    grouped = defaultdict(list)
    for issue in issues:
        grouped[issue["severity"]].append(issue)

    for severity in ("ERROR", "WARN", "INFO"):
        items = grouped.get(severity, [])
        if not items:
            continue
        print(f"\n[{severity}] {len(items)} hallazgos")
        limit = len(items) if show_details else min(len(items), 30)
        for issue in items[:limit]:
            print(f"- {issue['code']}: {issue['message']}")
        if limit < len(items):
            print(f"- ... {len(items) - limit} hallazgos adicionales. Usa --details para ver todos.")


def main():
    """Run the audit from the command line.

    Returns:
        1 when an input could not be loaded or any ERROR finding exists,
        0 otherwise.
    """
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")

    parser = argparse.ArgumentParser(description="Audita el verificador de sucursales MP contra CSV de tokens y SQLite.")
    parser.add_argument("--details", action="store_true", help="Muestra todos los hallazgos; por defecto limita cada severidad a 30.")
    args = parser.parse_args()

    # Top-level boundary: any load failure is reported and ends the run.
    try:
        verifier_rows = load_csv(VERIFIER_FILENAME)
        token_rows = load_csv(TOKENS_FILENAME)
        db_accounts = load_db_accounts()
    except Exception as exc:
        print(f"ERROR: {exc}")
        return 1

    issues = []
    audit_verifier_rows(verifier_rows, issues)
    audit_tokens_rows(token_rows, issues)
    audit_cross_sources(verifier_rows, token_rows, db_accounts, issues)

    # Order by severity first, then by code and message for stable output.
    issues.sort(key=lambda item: ({"ERROR": 0, "WARN": 1, "INFO": 2}.get(item["severity"], 9), item["code"], item["message"]))
    print_summary(verifier_rows, token_rows, db_accounts, issues, args.details)
    return 1 if any(issue["severity"] == "ERROR" for issue in issues) else 0


if __name__ == "__main__":
    sys.exit(main())