686 lines
27 KiB
Python
686 lines
27 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""find_cross_branch_duplicates.py
|
|
|
|
Detecta contactos que aparecen en 2 o mas sucursales distintas (anomalia
|
|
cross-branch). El criterio primario es coincidencia por telefono normalizado;
|
|
el secundario es coincidencia por email normalizado. Despues de armar los
|
|
grupos sospechosos, hace doble check contra la cuenta de Marca principal para
|
|
indicar cuantas copias del mismo contacto existen alli.
|
|
|
|
Lectura 100% read-only desde mp_manager.sqlite. Requiere una sincronizacion
|
|
previa desde el dashboard.
|
|
|
|
Uso:
|
|
python scripts/find_cross_branch_duplicates.py
|
|
python scripts/find_cross_branch_duplicates.py --xlsx duplicados.xlsx
|
|
python scripts/find_cross_branch_duplicates.py --json duplicados.json --top 50
|
|
python scripts/find_cross_branch_duplicates.py --match phone
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import sys
|
|
from collections import defaultdict
|
|
from datetime import datetime
|
|
|
|
from openpyxl import Workbook
|
|
from openpyxl.styles import Font, PatternFill
|
|
from openpyxl.utils import get_column_letter
|
|
|
|
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
if ROOT_DIR not in sys.path:
|
|
sys.path.insert(0, ROOT_DIR)
|
|
|
|
from common import ( # noqa: E402
|
|
DB_PATH,
|
|
REPORT_DUPLICADOS,
|
|
match_contacts,
|
|
normalize_email,
|
|
normalize_phone,
|
|
)
|
|
|
|
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
|
|
MATCH_THRESHOLD = 0.80
|
|
|
|
|
|
def resolve_export_path(user_path, default_basename, extension):
|
|
"""Si el usuario pasa solo un nombre, lo guarda en REPORT_DUPLICADOS con timestamp.
|
|
Si pasa una ruta absoluta o relativa con carpeta, respeta esa ruta."""
|
|
if user_path and (os.path.isabs(user_path) or os.sep in user_path or "/" in user_path):
|
|
return user_path, False
|
|
os.makedirs(REPORT_DUPLICADOS, exist_ok=True)
|
|
ts = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
if user_path:
|
|
base, ext = os.path.splitext(os.path.basename(user_path))
|
|
ext = ext or extension
|
|
filename = f"{base}_{ts}{ext}"
|
|
else:
|
|
filename = f"{default_basename}_{ts}{extension}"
|
|
return os.path.join(REPORT_DUPLICADOS, filename), True
|
|
|
|
|
|
def safe_print(*args, **kwargs):
|
|
sep = kwargs.get("sep", " ")
|
|
end = kwargs.get("end", "\n")
|
|
text = sep.join(str(a) for a in args)
|
|
encoding = sys.stdout.encoding or "utf-8"
|
|
try:
|
|
sys.stdout.write(text + end)
|
|
sys.stdout.flush()
|
|
except UnicodeEncodeError:
|
|
sys.stdout.write(text.encode(encoding, errors="replace").decode(encoding) + end)
|
|
sys.stdout.flush()
|
|
|
|
|
|
def display_name(row):
|
|
first = (row["first_name"] or "").strip()
|
|
last = (row["last_name"] or "").strip()
|
|
full = (first + " " + last).strip()
|
|
return full or "(sin nombre)"
|
|
|
|
|
|
class UnionFind:
|
|
def __init__(self):
|
|
self.parent = {}
|
|
|
|
def find(self, key):
|
|
self.parent.setdefault(key, key)
|
|
while self.parent[key] != key:
|
|
self.parent[key] = self.parent[self.parent[key]]
|
|
key = self.parent[key]
|
|
return key
|
|
|
|
def union(self, a, b):
|
|
ra, rb = self.find(a), self.find(b)
|
|
if ra != rb:
|
|
self.parent[rb] = ra
|
|
|
|
|
|
def load_contacts():
|
|
if not os.path.exists(DB_PATH):
|
|
raise SystemExit(
|
|
f"No se encontro la base local en {DB_PATH}. "
|
|
"Ejecuta una sincronizacion desde el dashboard antes de correr este script."
|
|
)
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
try:
|
|
accounts = {
|
|
row["location_id"]: dict(row)
|
|
for row in conn.execute(
|
|
"SELECT location_id, nombre, type FROM accounts"
|
|
).fetchall()
|
|
}
|
|
contacts = conn.execute(
|
|
"SELECT id, location_id, first_name, last_name, email, phone "
|
|
"FROM contacts"
|
|
).fetchall()
|
|
return accounts, [dict(row) for row in contacts]
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def index_brand(brand_contacts):
|
|
by_phone = defaultdict(list)
|
|
by_email = defaultdict(list)
|
|
for contact in brand_contacts:
|
|
phone = normalize_phone(contact["phone"])
|
|
if phone:
|
|
by_phone[phone].append(contact)
|
|
email = normalize_email(contact["email"])
|
|
if email:
|
|
by_email[email].append(contact)
|
|
return by_phone, by_email
|
|
|
|
|
|
def build_groups(branch_contacts, match_modes, threshold=MATCH_THRESHOLD):
|
|
"""Agrupa contactos cross-branch que parecen ser la misma persona.
|
|
|
|
Cambio respecto a la versión anterior: el matching por teléfono ahora
|
|
requiere también coincidencia de nombre (vía match_contacts en common.py).
|
|
Si dos contactos comparten teléfono pero el nombre diverge (caso pareja
|
|
con mismo número), se reportan en `unmatched_phone_collisions` para
|
|
revisión manual en vez de unirse silenciosamente.
|
|
|
|
Devuelve (groups, unmatched_phone_collisions).
|
|
"""
|
|
uf = UnionFind()
|
|
phone_index = defaultdict(list)
|
|
email_index = defaultdict(list)
|
|
|
|
for idx, contact in enumerate(branch_contacts):
|
|
if "phone" in match_modes:
|
|
phone = normalize_phone(contact["phone"])
|
|
if phone:
|
|
phone_index[phone].append(idx)
|
|
if "email" in match_modes:
|
|
email = normalize_email(contact["email"])
|
|
if email:
|
|
email_index[email].append(idx)
|
|
|
|
unmatched_phone_collisions = []
|
|
seen_collisions = set()
|
|
# Pares (idx_a, idx_b) que comparten phone pero divergen en nombre. NO
|
|
# deben unirse luego vía email (suele indicar datos confundidos por la
|
|
# integración, no la misma persona).
|
|
collision_pairs = set()
|
|
|
|
# Phone: validar cada par con match_contacts antes de unir.
|
|
for phone, indices in phone_index.items():
|
|
if len(indices) < 2:
|
|
continue
|
|
for i in range(len(indices)):
|
|
for j in range(i + 1, len(indices)):
|
|
idx_a, idx_b = indices[i], indices[j]
|
|
a, b = branch_contacts[idx_a], branch_contacts[idx_b]
|
|
# Saltar pares dentro de la misma location (mismo contacto local).
|
|
if a["location_id"] == b["location_id"]:
|
|
continue
|
|
result = match_contacts(a, b, threshold=threshold)
|
|
if result["level"] != "none":
|
|
uf.union(idx_a, idx_b)
|
|
elif "phone_collision_unresolved" in result["reasons"]:
|
|
pair_key = tuple(sorted((a["id"], b["id"])))
|
|
collision_pairs.add(tuple(sorted((idx_a, idx_b))))
|
|
if pair_key in seen_collisions:
|
|
continue
|
|
seen_collisions.add(pair_key)
|
|
unmatched_phone_collisions.append({
|
|
"phone": phone,
|
|
"a": a,
|
|
"b": b,
|
|
"name_score": result["name_score"],
|
|
})
|
|
|
|
# Email: compartir email exacto es señal fuerte (raro entre personas
|
|
# distintas). Se preserva el comportamiento previo de unir, EXCEPTO
|
|
# cuando el par ya fue marcado como colisión por phone — en ese caso el
|
|
# nombre divergente invalida el grupo aunque coincidan email y teléfono.
|
|
for indices in email_index.values():
|
|
if len(indices) < 2:
|
|
continue
|
|
for i in range(len(indices)):
|
|
for j in range(i + 1, len(indices)):
|
|
pair = tuple(sorted((indices[i], indices[j])))
|
|
if pair in collision_pairs:
|
|
continue
|
|
uf.union(indices[i], indices[j])
|
|
|
|
components = defaultdict(list)
|
|
for idx in range(len(branch_contacts)):
|
|
if idx in uf.parent:
|
|
components[uf.find(idx)].append(idx)
|
|
|
|
groups = []
|
|
for member_indices in components.values():
|
|
if len(member_indices) < 2:
|
|
continue
|
|
members = [branch_contacts[i] for i in member_indices]
|
|
locations = {m["location_id"] for m in members}
|
|
if len(locations) < 2:
|
|
continue
|
|
phones = sorted({normalize_phone(m["phone"]) for m in members if normalize_phone(m["phone"])})
|
|
emails = sorted({normalize_email(m["email"]) for m in members if normalize_email(m["email"])})
|
|
match_reasons = []
|
|
if phones and "phone" in match_modes:
|
|
match_reasons.append("telefono")
|
|
if emails and "email" in match_modes:
|
|
match_reasons.append("email")
|
|
groups.append({
|
|
"members": members,
|
|
"locations": locations,
|
|
"phones": phones,
|
|
"emails": emails,
|
|
"match_reasons": match_reasons,
|
|
})
|
|
|
|
groups.sort(
|
|
key=lambda g: (len(g["locations"]), len(g["members"])),
|
|
reverse=True,
|
|
)
|
|
return groups, unmatched_phone_collisions
|
|
|
|
|
|
def brand_check(group, brand_by_phone, brand_by_email):
|
|
seen_ids = set()
|
|
matches = []
|
|
for phone in group["phones"]:
|
|
for contact in brand_by_phone.get(phone, []):
|
|
if contact["id"] not in seen_ids:
|
|
seen_ids.add(contact["id"])
|
|
matches.append(contact)
|
|
for email in group["emails"]:
|
|
for contact in brand_by_email.get(email, []):
|
|
if contact["id"] not in seen_ids:
|
|
seen_ids.add(contact["id"])
|
|
matches.append(contact)
|
|
return matches
|
|
|
|
|
|
def print_group(idx, group, brand_matches, accounts):
|
|
locations = group["locations"]
|
|
safe_print(
|
|
f"\n[{idx}] {len(locations)} sucursales | {len(group['members'])} contactos "
|
|
f"| coincidencia por: {', '.join(group['match_reasons'])}"
|
|
)
|
|
if group["phones"]:
|
|
safe_print(f" Telefono(s): {', '.join(group['phones'])}")
|
|
if group["emails"]:
|
|
safe_print(f" Email(s): {', '.join(group['emails'])}")
|
|
for member in group["members"]:
|
|
loc_name = accounts.get(member["location_id"], {}).get("nombre") or member["location_id"]
|
|
safe_print(
|
|
f" - {loc_name} ({member['location_id']})\n"
|
|
f" contact_id: {member['id']}\n"
|
|
f" nombre: {display_name(member)}\n"
|
|
f" telefono: {member['phone'] or '(vacio)'}\n"
|
|
f" email: {member['email'] or '(vacio)'}"
|
|
)
|
|
if brand_matches is None:
|
|
return
|
|
if not brand_matches:
|
|
safe_print(" Doble check en Marca: NO existe contacto coincidente (sospechoso).")
|
|
return
|
|
label = "Marca" if len(brand_matches) == 1 else f"Marca (multiples: {len(brand_matches)})"
|
|
safe_print(f" Doble check en {label}:")
|
|
for contact in brand_matches:
|
|
safe_print(
|
|
f" - contact_id: {contact['id']} | nombre: {display_name(contact)} "
|
|
f"| telefono: {contact['phone'] or '(vacio)'} | email: {contact['email'] or '(vacio)'}"
|
|
)
|
|
|
|
|
|
def export_xlsx(path, groups, brand_results, accounts, collisions=None):
|
|
wb = Workbook()
|
|
ws = wb.active
|
|
ws.title = "Duplicados"
|
|
|
|
headers = [
|
|
"grupo", "sucursales", "miembros", "match", "phone_keys", "email_keys",
|
|
"location_id", "location_nombre", "contact_id", "nombre", "telefono", "email",
|
|
"es_marca", "marca_matches",
|
|
]
|
|
ws.append(headers)
|
|
|
|
header_font = Font(bold=True, color="FFFFFF")
|
|
header_fill = PatternFill("solid", fgColor="305496")
|
|
brand_fill = PatternFill("solid", fgColor="FFF2CC")
|
|
for col_idx, _ in enumerate(headers, start=1):
|
|
cell = ws.cell(row=1, column=col_idx)
|
|
cell.font = header_font
|
|
cell.fill = header_fill
|
|
|
|
for idx, (group, brand_matches) in enumerate(zip(groups, brand_results), start=1):
|
|
rows = []
|
|
for member in group["members"]:
|
|
rows.append((
|
|
[
|
|
idx,
|
|
len(group["locations"]),
|
|
len(group["members"]),
|
|
"+".join(group["match_reasons"]),
|
|
";".join(group["phones"]),
|
|
";".join(group["emails"]),
|
|
member["location_id"],
|
|
accounts.get(member["location_id"], {}).get("nombre") or "",
|
|
member["id"],
|
|
display_name(member),
|
|
member["phone"] or "",
|
|
member["email"] or "",
|
|
"no",
|
|
len(brand_matches) if brand_matches is not None else "",
|
|
],
|
|
False,
|
|
))
|
|
if brand_matches:
|
|
for contact in brand_matches:
|
|
rows.append((
|
|
[
|
|
idx,
|
|
len(group["locations"]),
|
|
len(group["members"]),
|
|
"+".join(group["match_reasons"]),
|
|
";".join(group["phones"]),
|
|
";".join(group["emails"]),
|
|
contact["location_id"],
|
|
accounts.get(contact["location_id"], {}).get("nombre") or "",
|
|
contact["id"],
|
|
display_name(contact),
|
|
contact["phone"] or "",
|
|
contact["email"] or "",
|
|
"si",
|
|
len(brand_matches),
|
|
],
|
|
True,
|
|
))
|
|
for row_data, is_brand in rows:
|
|
ws.append(row_data)
|
|
if is_brand:
|
|
for col_idx in range(1, len(headers) + 1):
|
|
ws.cell(row=ws.max_row, column=col_idx).fill = brand_fill
|
|
|
|
widths = [7, 12, 10, 14, 30, 30, 24, 28, 24, 28, 18, 32, 9, 14]
|
|
for col_idx, width in enumerate(widths, start=1):
|
|
ws.column_dimensions[get_column_letter(col_idx)].width = width
|
|
ws.freeze_panes = "A2"
|
|
ws.auto_filter.ref = ws.dimensions
|
|
|
|
# Segunda hoja: colisiones de teléfono que no matchean por nombre.
|
|
# Estos pares comparten teléfono normalizado pero el nombre diverge
|
|
# (caso "pareja con mismo número"). NO se tratan como duplicados.
|
|
if collisions:
|
|
ws2 = wb.create_sheet("colisiones_phone_sin_match")
|
|
collision_headers = [
|
|
"telefono", "name_score",
|
|
"location_id_a", "location_nombre_a", "contact_id_a",
|
|
"nombre_a", "email_a",
|
|
"location_id_b", "location_nombre_b", "contact_id_b",
|
|
"nombre_b", "email_b",
|
|
]
|
|
ws2.append(collision_headers)
|
|
for col_idx in range(1, len(collision_headers) + 1):
|
|
cell = ws2.cell(row=1, column=col_idx)
|
|
cell.font = header_font
|
|
cell.fill = header_fill
|
|
for item in collisions:
|
|
a, b = item["a"], item["b"]
|
|
ws2.append([
|
|
item["phone"],
|
|
round(float(item["name_score"]), 3),
|
|
a["location_id"],
|
|
accounts.get(a["location_id"], {}).get("nombre") or "",
|
|
a["id"],
|
|
display_name(a),
|
|
a["email"] or "",
|
|
b["location_id"],
|
|
accounts.get(b["location_id"], {}).get("nombre") or "",
|
|
b["id"],
|
|
display_name(b),
|
|
b["email"] or "",
|
|
])
|
|
widths2 = [14, 11, 24, 28, 24, 28, 32, 24, 28, 24, 28, 32]
|
|
for col_idx, width in enumerate(widths2, start=1):
|
|
ws2.column_dimensions[get_column_letter(col_idx)].width = width
|
|
ws2.freeze_panes = "A2"
|
|
ws2.auto_filter.ref = ws2.dimensions
|
|
|
|
wb.save(path)
|
|
|
|
|
|
def export_json(path, groups, brand_results, accounts, collisions=None):
|
|
grupos = []
|
|
for group, brand_matches in zip(groups, brand_results):
|
|
grupos.append({
|
|
"sucursales_count": len(group["locations"]),
|
|
"contactos_count": len(group["members"]),
|
|
"match_reasons": group["match_reasons"],
|
|
"phone_keys": group["phones"],
|
|
"email_keys": group["emails"],
|
|
"branch_members": [
|
|
{
|
|
"location_id": m["location_id"],
|
|
"location_nombre": accounts.get(m["location_id"], {}).get("nombre") or "",
|
|
"contact_id": m["id"],
|
|
"nombre": display_name(m),
|
|
"telefono": m["phone"] or "",
|
|
"email": m["email"] or "",
|
|
}
|
|
for m in group["members"]
|
|
],
|
|
"brand_matches": [
|
|
{
|
|
"contact_id": c["id"],
|
|
"nombre": display_name(c),
|
|
"telefono": c["phone"] or "",
|
|
"email": c["email"] or "",
|
|
}
|
|
for c in (brand_matches or [])
|
|
] if brand_matches is not None else None,
|
|
})
|
|
payload = {
|
|
"grupos": grupos,
|
|
"colisiones_phone_sin_match": [
|
|
{
|
|
"telefono": item["phone"],
|
|
"name_score": round(float(item["name_score"]), 3),
|
|
"a": {
|
|
"location_id": item["a"]["location_id"],
|
|
"location_nombre": accounts.get(item["a"]["location_id"], {}).get("nombre") or "",
|
|
"contact_id": item["a"]["id"],
|
|
"nombre": display_name(item["a"]),
|
|
"email": item["a"]["email"] or "",
|
|
},
|
|
"b": {
|
|
"location_id": item["b"]["location_id"],
|
|
"location_nombre": accounts.get(item["b"]["location_id"], {}).get("nombre") or "",
|
|
"contact_id": item["b"]["id"],
|
|
"nombre": display_name(item["b"]),
|
|
"email": item["b"]["email"] or "",
|
|
},
|
|
}
|
|
for item in (collisions or [])
|
|
],
|
|
}
|
|
with open(path, "w", encoding="utf-8") as fh:
|
|
json.dump(payload, fh, ensure_ascii=False, indent=2)
|
|
|
|
|
|
def print_overview(groups, brand_results, accounts, branch_contacts, brand_contacts, match_modes, no_brand_check, phone_collisions=None):
|
|
total_groups = len(groups)
|
|
branch_total = len(branch_contacts)
|
|
brand_total = len(brand_contacts)
|
|
|
|
duplicated_contacts = sum(len(g["members"]) for g in groups)
|
|
locations_in_dupes = set()
|
|
by_branch_dup_contacts = defaultdict(int)
|
|
by_branch_dup_groups = defaultdict(set)
|
|
locations_size_distribution = defaultdict(int)
|
|
contacts_size_distribution = defaultdict(int)
|
|
only_phone = only_email = both = 0
|
|
|
|
for g_idx, group in enumerate(groups):
|
|
locations_size_distribution[len(group["locations"])] += 1
|
|
contacts_size_distribution[len(group["members"])] += 1
|
|
reasons = set(group["match_reasons"])
|
|
if reasons == {"telefono"}:
|
|
only_phone += 1
|
|
elif reasons == {"email"}:
|
|
only_email += 1
|
|
elif "telefono" in reasons and "email" in reasons:
|
|
both += 1
|
|
for member in group["members"]:
|
|
loc = member["location_id"]
|
|
locations_in_dupes.add(loc)
|
|
by_branch_dup_contacts[loc] += 1
|
|
by_branch_dup_groups[loc].add(g_idx)
|
|
|
|
brand_copies_total = 0
|
|
sin_marca = una_en_marca = multi_marca = 0
|
|
if not no_brand_check:
|
|
for r in brand_results:
|
|
if r is None:
|
|
continue
|
|
brand_copies_total += len(r)
|
|
if not r:
|
|
sin_marca += 1
|
|
elif len(r) == 1:
|
|
una_en_marca += 1
|
|
else:
|
|
multi_marca += 1
|
|
|
|
safe_print("\n" + "=" * 78)
|
|
safe_print("OVERVIEW FINAL")
|
|
safe_print("=" * 78)
|
|
safe_print(f"Sucursales auditadas: {len({c['location_id'] for c in branch_contacts})}")
|
|
safe_print(f"Sucursales involucradas en duplicados: {len(locations_in_dupes)}")
|
|
safe_print(f"Contactos totales en sucursales: {branch_total}")
|
|
safe_print(f"Contactos totales en Marca: {brand_total}")
|
|
safe_print(f"Criterios de match aplicados: {', '.join(sorted(match_modes))}")
|
|
safe_print("")
|
|
safe_print(f"Grupos de duplicados detectados: {total_groups}")
|
|
safe_print(f"Contactos duplicados (en sucursales): {duplicated_contacts}")
|
|
if branch_total:
|
|
pct = (duplicated_contacts / branch_total) * 100
|
|
safe_print(f"% de contactos en sucursales duplicados: {pct:.2f}%")
|
|
|
|
safe_print("")
|
|
safe_print("Distribucion por motivo de coincidencia:")
|
|
safe_print(f" - solo por telefono: {only_phone}")
|
|
safe_print(f" - solo por email: {only_email}")
|
|
safe_print(f" - por telefono y email (ambos): {both}")
|
|
|
|
if locations_size_distribution:
|
|
safe_print("")
|
|
safe_print("Distribucion por # de sucursales involucradas:")
|
|
for size in sorted(locations_size_distribution):
|
|
safe_print(f" - {size} sucursales: {locations_size_distribution[size]} grupos")
|
|
|
|
if contacts_size_distribution:
|
|
safe_print("")
|
|
safe_print("Distribucion por # de contactos por grupo:")
|
|
for size in sorted(contacts_size_distribution):
|
|
safe_print(f" - {size} contactos: {contacts_size_distribution[size]} grupos")
|
|
|
|
if not no_brand_check:
|
|
safe_print("")
|
|
safe_print("Doble check contra Marca:")
|
|
safe_print(f" - grupos sin contraparte en Marca: {sin_marca}")
|
|
safe_print(f" - grupos con 1 contacto en Marca: {una_en_marca}")
|
|
safe_print(f" - grupos con multiples copias en Marca: {multi_marca}")
|
|
safe_print(f" - contactos sumados en Marca (anomalias): {brand_copies_total}")
|
|
|
|
if by_branch_dup_contacts:
|
|
ranking = sorted(
|
|
by_branch_dup_contacts.items(),
|
|
key=lambda kv: (kv[1], len(by_branch_dup_groups[kv[0]])),
|
|
reverse=True,
|
|
)
|
|
safe_print("")
|
|
safe_print("Sucursales con mas contactos en duplicados (top 10):")
|
|
for loc_id, count in ranking[:10]:
|
|
nombre = accounts.get(loc_id, {}).get("nombre") or loc_id
|
|
grupos = len(by_branch_dup_groups[loc_id])
|
|
safe_print(f" - {nombre} ({loc_id}): {count} contactos en {grupos} grupos")
|
|
|
|
if phone_collisions:
|
|
safe_print("")
|
|
safe_print("Colisiones de telefono sin match por nombre (revisión manual):")
|
|
safe_print(f" - pares detectados: {len(phone_collisions)}")
|
|
safe_print(" Ver hoja 'colisiones_phone_sin_match' del XLSX para detalle.")
|
|
|
|
|
|
def parse_args():
|
|
parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
|
|
parser.add_argument(
|
|
"--match", default="phone,email",
|
|
help="Criterios de match: 'phone', 'email' o 'phone,email' (default).",
|
|
)
|
|
parser.add_argument(
|
|
"--no-brand-check", action="store_true",
|
|
help="Omite la verificacion cruzada contra la cuenta de Marca.",
|
|
)
|
|
parser.add_argument(
|
|
"--top", type=int, default=30,
|
|
help="Cantidad de grupos a imprimir en consola (default 30). Usa 0 para imprimir todos.",
|
|
)
|
|
parser.add_argument(
|
|
"--xlsx", dest="xlsx_path", nargs="?", const="",
|
|
help="Exporta Excel (.xlsx). Sin argumento guarda en exports/ con timestamp; con ruta absoluta usa esa ruta.",
|
|
)
|
|
parser.add_argument(
|
|
"--json", dest="json_path", nargs="?", const="",
|
|
help="Exporta JSON. Sin argumento guarda en exports/ con timestamp; con ruta absoluta usa esa ruta.",
|
|
)
|
|
return parser.parse_args()
|
|
|
|
|
|
def main():
|
|
if hasattr(sys.stdout, "reconfigure"):
|
|
sys.stdout.reconfigure(encoding="utf-8")
|
|
|
|
args = parse_args()
|
|
match_modes = {m.strip().lower() for m in args.match.split(",") if m.strip()}
|
|
invalid = match_modes - {"phone", "email"}
|
|
if invalid or not match_modes:
|
|
raise SystemExit(f"Criterios de --match invalidos: {invalid or 'vacio'}")
|
|
|
|
accounts, contacts = load_contacts()
|
|
branch_contacts = [c for c in contacts if c["location_id"] != BRAND_LOCATION_ID]
|
|
brand_contacts = [c for c in contacts if c["location_id"] == BRAND_LOCATION_ID]
|
|
branch_locations = {c["location_id"] for c in branch_contacts}
|
|
|
|
safe_print("=" * 78)
|
|
safe_print("AUDIT: CONTACTOS DUPLICADOS ENTRE SUCURSALES")
|
|
safe_print("=" * 78)
|
|
safe_print(f"Sucursales con contactos: {len(branch_locations)}")
|
|
safe_print(f"Contactos en sucursales: {len(branch_contacts)}")
|
|
safe_print(f"Contactos en Marca: {len(brand_contacts)}")
|
|
safe_print(f"Criterios de match: {', '.join(sorted(match_modes))}")
|
|
|
|
sin_phone = sum(1 for c in branch_contacts if not normalize_phone(c["phone"]))
|
|
sin_email = sum(1 for c in branch_contacts if not normalize_email(c["email"]))
|
|
safe_print(f"Sin telefono normalizable: {sin_phone}")
|
|
safe_print(f"Sin email normalizable: {sin_email}")
|
|
|
|
groups, phone_collisions = build_groups(branch_contacts, match_modes)
|
|
safe_print(f"\nGrupos duplicados detectados (>=2 sucursales): {len(groups)}")
|
|
if "phone" in match_modes:
|
|
safe_print(
|
|
f"Colisiones de telefono sin match (revisión manual): {len(phone_collisions)}"
|
|
)
|
|
|
|
brand_by_phone, brand_by_email = ({}, {})
|
|
if not args.no_brand_check:
|
|
brand_by_phone, brand_by_email = index_brand(brand_contacts)
|
|
|
|
brand_results = []
|
|
for group in groups:
|
|
if args.no_brand_check:
|
|
brand_results.append(None)
|
|
else:
|
|
brand_results.append(brand_check(group, brand_by_phone, brand_by_email))
|
|
|
|
# Bucket counts para resumen.
|
|
sin_marca = sum(1 for r in brand_results if r is not None and not r)
|
|
una_en_marca = sum(1 for r in brand_results if r is not None and len(r) == 1)
|
|
multi_marca = sum(1 for r in brand_results if r is not None and len(r) > 1)
|
|
if not args.no_brand_check:
|
|
safe_print(f" - sin contraparte en Marca: {sin_marca}")
|
|
safe_print(f" - con 1 contacto en Marca: {una_en_marca}")
|
|
safe_print(f" - con multiples copias en Marca: {multi_marca}")
|
|
|
|
limit = len(groups) if args.top <= 0 else min(args.top, len(groups))
|
|
if limit:
|
|
safe_print("\n" + "-" * 78)
|
|
safe_print(f"DETALLE (top {limit} grupos por # sucursales involucradas)")
|
|
safe_print("-" * 78)
|
|
for i in range(limit):
|
|
print_group(i + 1, groups[i], brand_results[i], accounts)
|
|
|
|
if args.xlsx_path is not None:
|
|
xlsx_path, in_exports = resolve_export_path(args.xlsx_path, "duplicados_sucursales", ".xlsx")
|
|
export_xlsx(xlsx_path, groups, brand_results, accounts, phone_collisions)
|
|
safe_print(f"\nExcel exportado en: {xlsx_path}")
|
|
if in_exports:
|
|
safe_print(f"[DOWNLOAD] /api/exports/{os.path.basename(xlsx_path)}")
|
|
|
|
if args.json_path is not None:
|
|
json_path, in_exports = resolve_export_path(args.json_path, "duplicados_sucursales", ".json")
|
|
export_json(json_path, groups, brand_results, accounts, phone_collisions)
|
|
safe_print(f"JSON exportado en: {json_path}")
|
|
if in_exports:
|
|
safe_print(f"[DOWNLOAD] /api/exports/{os.path.basename(json_path)}")
|
|
|
|
print_overview(
|
|
groups, brand_results, accounts,
|
|
branch_contacts, brand_contacts, match_modes, args.no_brand_check,
|
|
phone_collisions,
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|