Files
MP-Manager/scripts/sync_contacts_branch_to_brand.py
2026-05-30 14:31:19 -06:00

954 lines
38 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Sync branch contacts into the main brand account.
Dry-run is the default. Use --apply to write changes in GHL.
"""
import argparse
import csv
import json
import os
import re
import sys
import threading
import unicodedata
from concurrent.futures import ThreadPoolExecutor, as_completed
# Directory two levels above this file; put on sys.path so the project-local
# modules imported right below resolve when this file is run as a script.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)
import script_audit # noqa: E402
import sync_engine # noqa: E402
from common import FIELD_ALIASES, match_contacts # noqa: E402
# Passed as `threshold` to common.match_contacts when a unique phone match
# has to be confirmed by name (see find_brand_contact).
MATCH_THRESHOLD = 0.80
# Location id of the main brand ("Marca") account: the sync target, and the
# one account that is never treated as a branch.
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
# Default CSV path read by load_branch_verifier.
VERIFIER_CSV = os.path.join(ROOT_DIR, "Monte Providencia - Verificador de sucursales y correos - Sucursales.csv")
# Standard contact fields copied one-to-one from branch to brand.
SCALAR_STANDARD_FIELDS = [
    "firstName", "lastName", "name",
    "email", "phone",
    "address1", "city", "state", "country", "postalCode",
    "dateOfBirth", "companyName", "website", "timezone",
    "source", "type", "dnd",
    # NOTE: assignedTo is NOT synced across locations because user ids are
    # per-location in GHL. Assigning a branch user in the brand returns 400.
]
# Subset of SCALAR_STANDARD_FIELDS compared and written as booleans.
BOOLEAN_STANDARD_FIELDS = {"dnd"}
# Standard fields holding lists; these are merged rather than replaced.
LIST_STANDARD_FIELDS = ["tags"]
# NOTE: additionalEmails is NOT synced because GHL returns it as a list of
# {validEmailDate, email} objects, not as a flat list of strings.
# Keys probed, in this order, by extract_custom_value to find a custom
# field's value.
CUSTOM_VALUE_KEYS = (
    "value", "fieldValueString", "fieldValueDate",
    "fieldValueNumber", "fieldValueArray",
    "fieldValueOptions", "fieldValueFile",
)
# Substrings that make is_digital_source classify a contact source as digital.
DIGITAL_SOURCE_KEYWORDS = (
    "facebook", "instagram", "form", "tiktok", "google",
    "youtube", "whatsapp", "ads", "lead", "digital",
    "messenger", "linkedin", "api", "landing",
)
def clean(value):
    """Return ``value`` as a whitespace-stripped string; falsy values give ""."""
    text = str(value) if value else ""
    return text.strip()
def normalize_phone(phone):
    """Reduce a phone value to digits only, keeping at most the last ten.

    Numbers with fewer than ten digits are returned in full; falsy input
    yields an empty string.
    """
    raw = str(phone or "")
    only_digits = re.sub(r"\D+", "", raw)
    if len(only_digits) < 10:
        return only_digits
    return only_digits[-10:]
def normalize_email(email):
    """Return the email stripped (via ``clean``) and lower-cased."""
    cleaned = clean(email)
    return cleaned.lower()
def remove_accents(text):
    """Lower-case ``text`` and replace Spanish accented letters with plain ones.

    Only the letters listed in the table below are replaced; every other
    character is simply lower-cased. Falsy input yields "".
    """
    if not text:
        return ""
    plain_for = {
        "á": "a", "é": "e", "í": "i", "ó": "o", "ú": "u", "ü": "u", "ñ": "n",
    }
    pieces = []
    for char in str(text):
        # Lower-casing first lets one lowercase table serve both cases.
        lowered = char.lower()
        pieces.append(plain_for.get(lowered, lowered))
    return "".join(pieces)
def contact_name(contact):
    """Return the contact's display name.

    Uses ``contactName``, then ``name``; when both are empty, falls back to
    "firstName lastName". The result is stripped via ``clean``.
    """
    for key in ("contactName", "name"):
        candidate = contact.get(key)
        if candidate:
            return clean(candidate)
    first = contact.get("firstName") or ""
    last = contact.get("lastName") or ""
    return clean(f"{first} {last}")
def normalize_name(contact):
    """Return the contact's name in the form used as a matching key.

    The name is accent-stripped and lower-cased, punctuation becomes spaces
    and runs of whitespace collapse to a single space.
    """
    unaccented = remove_accents(contact_name(contact))
    without_punctuation = re.sub(r"[^\w\s]", " ", unaccented)
    tokens = without_punctuation.split()
    return " ".join(tokens)
def get_contact_id(contact):
    """Return the contact's ``id``, or its ``contactId`` when ``id`` is empty."""
    primary = contact.get("id")
    if primary:
        return primary
    return contact.get("contactId")
def norm_field_name(name):
    """Normalize a field name for comparison.

    The name is stripped and lower-cased, NFKD-decomposed, combining marks
    are removed and internal whitespace is collapsed to single spaces.
    """
    lowered = str(name or "").strip().lower()
    decomposed = unicodedata.normalize("NFKD", lowered)
    base_chars = [ch for ch in decomposed if not unicodedata.combining(ch)]
    return " ".join("".join(base_chars).split())
def build_alias_lookup():
    """Invert FIELD_ALIASES into ``{normalized variant: canonical key}``.

    If two canonicals share a normalized variant, the one iterated last wins.
    """
    return {
        norm_field_name(variant): canonical
        for canonical, variants in FIELD_ALIASES.items()
        for variant in variants
    }


# Built once at import time; consulted by the brand-field resolution helpers.
ALIAS_LOOKUP = build_alias_lookup()
def build_brand_field_indexes(brand_schema):
    """Index the brand schema (``{field name: field id}``) two ways.

    Returns:
        ``(by_norm, by_canonical)``. ``by_norm`` maps a normalized field
        name to ``(name, field_id)``. ``by_canonical`` maps an alias
        canonical key to the first ``(name, field_id)`` whose normalized
        name is a known alias.
    """
    by_norm = {}
    by_canonical = {}
    for field_name, field_id in brand_schema.items():
        normalized = norm_field_name(field_name)
        by_norm[normalized] = (field_name, field_id)
        canonical = ALIAS_LOOKUP.get(normalized)
        if canonical:
            # First field seen for a canonical key wins.
            by_canonical.setdefault(canonical, (field_name, field_id))
    return by_norm, by_canonical
def resolve_brand_field(branch_field_name, brand_schema, brand_norm, brand_by_canonical):
    """Find the brand field that corresponds to a branch field name.

    Lookup order: exact name, then normalized name, then alias canonical.

    Returns:
        ``(brand_name, brand_field_id)``, or ``(None, None)`` when the
        branch field has no counterpart in the brand.
    """
    if branch_field_name in brand_schema:
        return branch_field_name, brand_schema[branch_field_name]

    normalized = norm_field_name(branch_field_name)
    if normalized in brand_norm:
        return brand_norm[normalized]

    canonical = ALIAS_LOOKUP.get(normalized)
    if not canonical or canonical not in brand_by_canonical:
        return None, None
    return brand_by_canonical[canonical]
def extract_custom_value(field):
    """Return the first non-empty value stored in a custom field dict.

    Keys are probed in CUSTOM_VALUE_KEYS order. ``None``, blank strings and
    empty lists/dicts count as empty. Returns ``None`` when nothing usable
    is found.
    """
    def _is_blank(candidate):
        if candidate is None:
            return True
        if isinstance(candidate, str):
            return not candidate.strip()
        if isinstance(candidate, (list, dict)):
            return not candidate
        return False

    for key in CUSTOM_VALUE_KEYS:
        if key in field and not _is_blank(field[key]):
            return field[key]
    return None
def list_value(value):
    """Coerce ``value`` into a list of non-empty, stripped strings.

    Anything that is not a ``list`` yields an empty list.
    """
    if not isinstance(value, list):
        return []
    stripped = (str(item).strip() for item in value)
    return [text for text in stripped if text]
def merge_unique(existing, incoming):
    """Concatenate two iterables, dropping duplicates and keeping order.

    Strings are compared case-insensitively; the first spelling seen wins.
    """
    merged = []
    seen_keys = set()
    # Both inputs are materialized up front, before any item is processed.
    for source in (list(existing), list(incoming)):
        for item in source:
            dedupe_key = item.lower() if isinstance(item, str) else item
            if dedupe_key not in seen_keys:
                seen_keys.add(dedupe_key)
                merged.append(item)
    return merged
def values_equal(a, b):
    """Compare two field values for sync purposes.

    Two lists are equal when their stringified items match regardless of
    order; anything else is compared after ``clean``.
    """
    both_lists = isinstance(a, list) and isinstance(b, list)
    if not both_lists:
        return clean(a) == clean(b)
    left = sorted(str(item) for item in a)
    right = sorted(str(item) for item in b)
    return left == right
def should_copy_custom_value(source_value, target_value, overwrite):
    """Decide whether a branch custom value should be written to the brand.

    Never copies an empty source or a value equal to the target. Otherwise
    copies when ``overwrite`` is set or the target is empty.
    """
    def _is_empty(value):
        return (
            value is None
            or (isinstance(value, str) and not value.strip())
            or (isinstance(value, list) and not value)
        )

    if _is_empty(source_value):
        return False
    if values_equal(source_value, target_value):
        return False
    return bool(overwrite) or _is_empty(target_value)
def custom_fields_by_id(contact):
    """Map custom field id to value for the fields that hold a value.

    Non-dict entries, entries without ``id``/``fieldId`` and entries whose
    value is empty are skipped.
    """
    values = {}
    for entry in contact.get("customFields", []) or []:
        if not isinstance(entry, dict):
            continue
        field_id = entry.get("id") or entry.get("fieldId")
        value = extract_custom_value(entry) if field_id else None
        if value is not None:
            values[field_id] = value
    return values
def is_digital_source(source_value):
    """Return True when the source text contains any DIGITAL_SOURCE_KEYWORDS entry."""
    if not source_value:
        return False
    lowered = str(source_value).lower()
    for keyword in DIGITAL_SOURCE_KEYWORDS:
        if keyword in lowered:
            return True
    return False
def extract_tienda_label(account_nombre):
    """Derive the upper-cased store label from an account name.

    The name is split on " - "; everything from the third segment onward is
    the label. Returns ``None`` when there are fewer than three segments or
    the label is blank.
    """
    if not account_nombre:
        return None
    # maxsplit=2 leaves the third segment holding the rest of the name.
    segments = str(account_nombre).split(" - ", 2)
    if len(segments) < 3:
        return None
    label = segments[2].strip()
    return label.upper() or None
def load_branch_verifier(path=VERIFIER_CSV):
    """Load the branch verifier CSV.

    Returns:
        ``(mapping, conflicts)``. ``mapping`` is
        ``{location_id: {"tienda", "sucursal", "sc"}}``. When several rows
        share a location id, the first row is kept; rows whose TIENDA
        differs from it are collected in ``conflicts[location_id]``, a list
        that starts with the kept entry.

    A missing file yields two empty dicts. A read or parse error prints a
    warning and returns whatever was loaded up to that point.
    """
    mapping, conflicts = {}, {}
    if not os.path.exists(path):
        return mapping, conflicts
    try:
        with open(path, encoding="utf-8-sig", newline="") as handle:
            for row in csv.DictReader(handle):
                location = clean(row.get("ID LOCATION BUCEFALO"))
                if not location:
                    continue
                tienda = clean(row.get("TIENDA"))
                sucursal = clean(row.get("SUCURSAL"))
                entry = {
                    "tienda": tienda.upper() or None,
                    # "-" is treated as "no sucursal" in the sheet.
                    "sucursal": sucursal if sucursal not in ("", "-") else None,
                    "sc": clean(row.get("SC BUCEFALO")),
                }
                known = mapping.get(location)
                if known is None:
                    mapping[location] = entry
                elif known["tienda"] != entry["tienda"]:
                    conflicts.setdefault(location, [known]).append(entry)
    except Exception as exc:
        print(f"WARN no se pudo cargar verificador de sucursales: {exc}")
    return mapping, conflicts
def validate_verifier_against_accounts(verifier_map, accounts):
    """Return the location ids whose verifier "sc" disagrees with the account name.

    Both names are compared after ``norm_field_name``. Accounts missing from
    the verifier, or with either name empty, are not reported.
    """
    mismatched = set()
    for account in accounts:
        location_id = account["location_id"]
        info = verifier_map.get(location_id)
        if info:
            actual = norm_field_name(account.get("nombre"))
            declared = norm_field_name(info.get("sc"))
            if declared and actual and declared != actual:
                mismatched.add(location_id)
    return mismatched
def resolve_branch_labels(branch_account, verifier_map):
    """Return ``(tienda, sucursal)`` for a branch account.

    Values come from the verifier when present; ``tienda`` falls back to
    the label parsed from the account name, and ``sucursal`` falls back to
    ``tienda``.
    """
    location_id = branch_account.get("location_id")
    info = {}
    if verifier_map:
        info = verifier_map.get(location_id) or {}
    tienda = info.get("tienda") or extract_tienda_label(branch_account.get("nombre"))
    sucursal = info.get("sucursal") or tienda
    return tienda, sucursal
def brand_field_id_by_alias(brand_by_canonical, alias):
    """Return the brand field id registered for an alias key, or ``None``."""
    match = brand_by_canonical.get(alias)
    if not match:
        return None
    return match[1]
def plan_bi_autofill(branch_contact, brand_contact, branch_account,
                     brand_by_canonical, existing_custom_payload, verifier_map):
    """Plan automatic values for the store/branch/origin custom fields.

    A field is filled only when the brand schema has it (by alias), it is
    not already in ``existing_custom_payload``, and the brand contact has no
    value for it. Origin channel and lead source are set to "SUCURSAL" when
    the branch source is not digital and the contact either carries a
    "sucursal" tag or has no source.

    Returns:
        ``(additions, changes)``: custom field payload items, and matching
        ``(field_id, label, old_value, new_value)`` tuples.
    """
    taken_ids = {item["id"] for item in existing_custom_payload}
    brand_values = custom_fields_by_id(brand_contact or {})

    # Gather candidates in priority order; they are filtered further down.
    candidates = []
    tienda, sucursal = resolve_branch_labels(branch_account, verifier_map)
    if tienda:
        candidates.append(("tienda", tienda, "TIENDA (auto)"))
    if sucursal:
        candidates.append(("sucursal", sucursal, "Sucursal (auto)"))

    source_value = branch_contact.get("source") or ""
    tag_names = {str(tag).lower() for tag in (branch_contact.get("tags") or []) if tag}
    came_from_branch = "sucursal" in tag_names or not source_value
    if not is_digital_source(source_value) and came_from_branch:
        candidates.append(("canal_origen", "SUCURSAL", "Canal de Origen (auto)"))
        candidates.append(("fuente_prospecto", "SUCURSAL", "Fuente de Prospecto (auto)"))

    additions = []
    changes = []
    for alias_key, value, label in candidates:
        field_id = brand_field_id_by_alias(brand_by_canonical, alias_key)
        if not field_id or field_id in taken_ids:
            continue
        current = brand_values.get(field_id)
        if current is not None and clean(current):
            # The brand already holds a value; autofill never overwrites.
            continue
        additions.append({"id": field_id, "value": value})
        changes.append((field_id, label, current, value))
        taken_ids.add(field_id)
    return additions, changes
def has_corrupt_encoding(name):
    """Return True when ``name`` contains U+FFFD, the Unicode replacement character.

    U+FFFD is what decoders substitute for bytes they cannot decode, so its
    presence in a field name signals that the name was damaged by a bad
    encoding round-trip.

    Args:
        name: Field name to inspect; any falsy value returns False.
    """
    if not name:
        return False
    # Spell the marker as an escape. An empty-string needle is contained in
    # every string and would flag every field as corrupt.
    return "\ufffd" in str(name)
def detect_corrupt_schema_fields(schema):
    """Return the sorted schema field names flagged by ``has_corrupt_encoding``."""
    flagged = [name for name in schema if has_corrupt_encoding(name)]
    flagged.sort()
    return flagged
def matches_field_filter(field_filter, field_filter_norm, name):
    """Tell whether ``name`` passes the optional field filter.

    With no filter everything passes. Otherwise the name must be in
    ``field_filter`` exactly, or in ``field_filter_norm`` after
    normalization.
    """
    if not field_filter or name in field_filter:
        return True
    return norm_field_name(name) in field_filter_norm
def build_match_index(contacts):
    """Build the phone/email/name lookup index over ``contacts``.

    Each bucket maps a normalized key to the list of contacts sharing it.
    """
    index = {bucket: {} for bucket in ("phone", "email", "name")}
    for entry in contacts:
        index_add_contact(index, entry)
    return index
def index_add_contact(index, contact):
    """Register ``contact`` in ``index`` under each non-empty normalized key."""
    keyed_values = (
        ("phone", normalize_phone(contact.get("phone"))),
        ("email", normalize_email(contact.get("email"))),
        ("name", normalize_name(contact)),
    )
    for bucket, key in keyed_values:
        if key:
            index[bucket].setdefault(key, []).append(contact)
def find_brand_contact(branch_contact, brand_index):
    """Look up the brand contact matching a branch contact.

    Keys are tried in order: phone, email, name. A unique phone hit is
    accepted only when ``common.match_contacts`` (threshold MATCH_THRESHOLD)
    rates the pair "strong" or "medium"; otherwise the candidate is kept as
    a collision and the search moves on to email and name. This keeps two
    different people who share a number (e.g. a couple) from being merged.

    Returns:
        ``(match, label, ambiguous, collision)``:
        - a match: ``(contact, label, False, None)``;
        - some key had several candidates and nothing matched:
          ``(None, label_of_last_ambiguous_key, True, None)``;
        - only an unconfirmed phone hit:
          ``(None, "telefono", False, candidate)``;
        - nothing found: ``(None, None, False, None)``.
    """
    lookups = (
        ("phone", normalize_phone(branch_contact.get("phone")), "telefono"),
        ("email", normalize_email(branch_contact.get("email")), "email"),
        ("name", normalize_name(branch_contact), "nombre"),
    )
    ambiguous_label = None
    collision_candidate = None
    for bucket, lookup_value, label in lookups:
        if not lookup_value:
            continue
        candidates = brand_index[bucket].get(lookup_value, [])
        if len(candidates) > 1:
            ambiguous_label = label
            continue
        if not candidates:
            continue
        only = candidates[0]
        if bucket != "phone":
            return only, label, False, None
        # A phone hit alone is not enough: the names must agree too.
        verdict = match_contacts(branch_contact, only, threshold=MATCH_THRESHOLD)
        if verdict["level"] in ("strong", "medium"):
            return only, label, False, None
        collision_candidate = only

    if ambiguous_label:
        return None, ambiguous_label, True, None
    if collision_candidate is not None:
        return None, "telefono", False, collision_candidate
    return None, None, False, None
def parse_field_filter(raw_fields):
    """Parse a comma-separated field list into a set of cleaned names.

    Returns ``None`` when the input is empty or contains only blanks.
    """
    if not raw_fields:
        return None
    names = set()
    for piece in raw_fields.split(","):
        cleaned = clean(piece)
        if cleaned:
            names.add(cleaned)
    return names if names else None
def should_copy_value(source_value, target_value, overwrite):
    """Decide whether a standard scalar field should be written to the brand.

    Never copies a ``None`` or blank-string source, or a value whose cleaned
    form equals the target's. Otherwise copies when ``overwrite`` is set or
    the target is empty.
    """
    if source_value is None:
        return False
    if isinstance(source_value, str) and not source_value.strip():
        return False
    incoming, current = clean(source_value), clean(target_value)
    if incoming == current:
        return False
    return True if overwrite else not current
def select_accounts(args, accounts):
    """Pick the branch accounts to process from the CLI arguments.

    ``--location`` selects that one branch (never the brand account).
    ``--all`` selects every non-brand account whose name lacks "demo".

    Raises:
        SystemExit: when the requested location is not a branch, or when
            neither ``--location`` nor ``--all`` was given.
    """
    def _is_branch(account):
        return account["location_id"] != BRAND_LOCATION_ID

    if args.location:
        chosen = [
            account for account in accounts
            if account["location_id"] == args.location and _is_branch(account)
        ]
        if not chosen:
            raise SystemExit(f"Location {args.location} no existe como sucursal en el CSV de tokens")
        return chosen
    if args.all:
        return [
            account for account in accounts
            if _is_branch(account) and "demo" not in account["nombre"].lower()
        ]
    raise SystemExit("Especifica --location <id> o --all")
def record_planned_change(args, location_id, object_id, field_id, field_name, old_value, new_value):
    """Record one planned contact field change under the run's audit id.

    Returns whatever ``script_audit.record_change`` returns; callers use it
    as the change id.
    """
    run_id = args.run_id
    return script_audit.record_change(
        run_id, location_id, "contact", object_id,
        field_id, field_name, old_value, new_value,
    )
def plan_contact_payload(branch_contact, brand_contact, branch_id_to_name, brand_schema, brand_norm, brand_by_canonical, branch_account, args):
    """Plan the update that brings a matched brand contact in line with its branch contact.

    Standard scalar fields are copied per ``should_copy_value``; boolean
    fields are written only when they differ and the brand value is unset or
    ``args.overwrite`` is on; list fields are merged, never replaced; custom
    fields are mapped to brand field ids and copied per
    ``should_copy_custom_value``. With ``args.auto_bi`` the autofill values
    are appended as well.

    Returns:
        ``(payload, changes, dropped)``: the update payload, a list of
        ``(field_id, field_name, old_value, new_value)`` tuples, and the
        branch custom field names that have no brand counterpart.
    """
    wanted = args.field_filter
    wanted_norm = args.field_filter_norm
    overwrite = args.overwrite
    target = brand_contact if brand_contact else {}

    payload = {}
    changes = []
    dropped = []

    # Standard scalar and boolean fields.
    for field_name in SCALAR_STANDARD_FIELDS:
        if not matches_field_filter(wanted, wanted_norm, field_name):
            continue
        branch_raw = branch_contact.get(field_name)
        brand_raw = target.get(field_name)
        if field_name not in BOOLEAN_STANDARD_FIELDS:
            new_text = clean(branch_raw)
            old_text = clean(brand_raw)
            if should_copy_value(new_text, old_text, overwrite):
                payload[field_name] = new_text
                changes.append((field_name, field_name, old_text, new_text))
            continue
        if branch_raw is None:
            continue
        flag = bool(branch_raw)
        if flag == bool(brand_raw):
            continue
        brand_has_value = brand_raw is not None and brand_raw != ""
        if brand_has_value and not overwrite:
            continue
        payload[field_name] = flag
        changes.append((field_name, field_name, brand_raw, flag))

    # List fields: union of brand and branch values, brand entries first.
    for field_name in LIST_STANDARD_FIELDS:
        if not matches_field_filter(wanted, wanted_norm, field_name):
            continue
        incoming = list_value(branch_contact.get(field_name))
        current = list_value(target.get(field_name))
        if not incoming:
            continue
        combined = merge_unique(current, incoming)
        if set(map(str, combined)) != set(map(str, current)):
            payload[field_name] = combined
            changes.append((field_name, field_name, current, combined))

    # Custom fields, translated from branch field ids to brand field ids.
    brand_values = custom_fields_by_id(target)
    custom_payload = []
    handled_ids = set()
    for entry in branch_contact.get("customFields", []) or []:
        if not isinstance(entry, dict):
            continue
        branch_field_name = branch_id_to_name.get(entry.get("id") or entry.get("fieldId"))
        if not branch_field_name:
            continue
        if not matches_field_filter(wanted, wanted_norm, branch_field_name):
            continue
        new_value = extract_custom_value(entry)
        if new_value is None:
            continue
        brand_name, brand_field_id = resolve_brand_field(
            branch_field_name, brand_schema, brand_norm, brand_by_canonical
        )
        if not brand_field_id:
            dropped.append(branch_field_name)
            continue
        if brand_field_id in handled_ids:
            # Another branch field already resolved to this brand field.
            continue
        handled_ids.add(brand_field_id)
        old_value = brand_values.get(brand_field_id)
        if should_copy_custom_value(new_value, old_value, overwrite):
            custom_payload.append({"id": brand_field_id, "value": new_value})
            changes.append((brand_field_id, brand_name, old_value, new_value))

    if args.auto_bi:
        bi_additions, bi_changes = plan_bi_autofill(
            branch_contact, brand_contact, branch_account, brand_by_canonical, custom_payload,
            args.verifier_map,
        )
        custom_payload.extend(bi_additions)
        changes.extend(bi_changes)

    if custom_payload:
        payload["customFields"] = custom_payload
    return payload, changes, dropped
def create_payload(branch_contact, branch_id_to_name, brand_schema, brand_norm, brand_by_canonical, branch_account, args):
    """Build the payload that creates a branch contact in the brand account.

    Copies every non-empty standard field that passes the field filter,
    maps custom fields to brand field ids, and appends the autofill values
    when ``args.auto_bi`` is on.

    Returns:
        ``(payload, dropped)``: the create payload (always carrying
        ``locationId``) and the branch custom field names that have no
        brand counterpart.
    """
    wanted = args.field_filter
    wanted_norm = args.field_filter_norm
    payload = {"locationId": BRAND_LOCATION_ID}
    dropped = []

    for field_name in SCALAR_STANDARD_FIELDS:
        if not matches_field_filter(wanted, wanted_norm, field_name):
            continue
        raw = branch_contact.get(field_name)
        if field_name in BOOLEAN_STANDARD_FIELDS:
            # Booleans are sent whenever the branch has an explicit value.
            if raw is not None:
                payload[field_name] = bool(raw)
            continue
        text = clean(raw)
        if text:
            payload[field_name] = text

    for field_name in LIST_STANDARD_FIELDS:
        if not matches_field_filter(wanted, wanted_norm, field_name):
            continue
        items = list_value(branch_contact.get(field_name))
        if items:
            payload[field_name] = items

    custom_payload = []
    handled_ids = set()
    for entry in branch_contact.get("customFields", []) or []:
        if not isinstance(entry, dict):
            continue
        branch_field_name = branch_id_to_name.get(entry.get("id") or entry.get("fieldId"))
        if not branch_field_name:
            continue
        if not matches_field_filter(wanted, wanted_norm, branch_field_name):
            continue
        new_value = extract_custom_value(entry)
        if new_value is None:
            continue
        _, brand_field_id = resolve_brand_field(
            branch_field_name, brand_schema, brand_norm, brand_by_canonical
        )
        if not brand_field_id:
            dropped.append(branch_field_name)
            continue
        if brand_field_id in handled_ids:
            # Another branch field already resolved to this brand field.
            continue
        handled_ids.add(brand_field_id)
        custom_payload.append({"id": brand_field_id, "value": new_value})

    if args.auto_bi:
        bi_additions, _ = plan_bi_autofill(
            branch_contact, None, branch_account, brand_by_canonical, custom_payload,
            args.verifier_map,
        )
        custom_payload.extend(bi_additions)

    if custom_payload:
        payload["customFields"] = custom_payload
    return payload, dropped
def apply_update(brand_token, brand_contact_id, payload):
    """Send ``payload`` as an update of the given brand contact via the GHL client."""
    client = sync_engine.ghl_client
    return client.update_contact(brand_token, brand_contact_id, payload)
def apply_create(brand_token, payload):
    """Create a contact in the brand account from ``payload`` via the GHL client."""
    client = sync_engine.ghl_client
    return client.create_contact(brand_token, payload)
def process_branch(branch_account, brand_state, args):
    """Sync every contact of one branch into the brand account.

    For each branch contact the brand index is searched (phone, email,
    name). Ambiguous matches and unconfirmed phone collisions are skipped
    and counted. Unmatched contacts are created unless ``args.no_create``;
    matched contacts are updated unless ``args.no_update``. Every planned
    change is recorded through ``script_audit``; nothing is written to GHL
    unless ``args.apply`` is set.

    Args:
        branch_account: Account dict with ``location_id``, ``token`` and
            ``nombre``.
        brand_state: Shared dict with ``account``, ``schema``,
            ``schema_norm``, ``schema_by_canonical``, ``contacts``,
            ``index`` and ``lock``. ``index`` and ``contacts`` are shared
            across workers and are only touched while holding ``lock``.
        args: Parsed CLI namespace from ``parse_args``.

    Returns:
        Dict of per-branch counters (contacts, matched, created, updated,
        planned_creates, planned_updates, ambiguous,
        phone_collisions_unresolved, unchanged, errors,
        dropped_unmapped_fields).
    """
    location_id = branch_account["location_id"]
    branch_token = branch_account["token"]
    brand_token = brand_state["account"]["token"]
    dry_run = not args.apply
    stats = {
        "contacts": 0,
        "matched": 0,
        "created": 0,
        "updated": 0,
        "planned_creates": 0,
        "planned_updates": 0,
        "ambiguous": 0,
        "phone_collisions_unresolved": 0,
        "unchanged": 0,
        "errors": 0,
        "dropped_unmapped_fields": 0,
    }
    dropped_field_counter = {}
    print(f"\n=== {branch_account['nombre']} ({location_id}) ===")
    branch_schema = sync_engine.ghl_client.get_object_schema(branch_token, location_id, "contact")
    # The schema maps name -> id; custom field entries on contacts carry ids.
    branch_id_to_name = {field_id: name for name, field_id in branch_schema.items()}
    brand_schema = brand_state["schema"]
    brand_norm = brand_state["schema_norm"]
    brand_by_canonical = brand_state["schema_by_canonical"]
    branch_contacts = sync_engine.ghl_client.get_all_contacts(branch_token, location_id)
    if args.limit:
        branch_contacts = branch_contacts[:args.limit]
    print(f"Contactos de sucursal a revisar: {len(branch_contacts)}")

    # Report duplicates inside the branch itself (informational only).
    seen_phones = {}
    seen_emails = {}
    for c in branch_contacts:
        p = normalize_phone(c.get("phone"))
        if p:
            seen_phones.setdefault(p, []).append(get_contact_id(c))
        e = normalize_email(c.get("email"))
        if e:
            seen_emails.setdefault(e, []).append(get_contact_id(c))
    dup_phones = {k: v for k, v in seen_phones.items() if len(v) > 1}
    dup_emails = {k: v for k, v in seen_emails.items() if len(v) > 1}
    if dup_phones or dup_emails:
        print(f"WARN [{location_id}] duplicados internos detectados: {len(dup_phones)} por telefono, {len(dup_emails)} por email")
        for p, ids in list(dup_phones.items())[:3]:
            print(f" phone {p}: {ids}")
        for e, ids in list(dup_emails.items())[:3]:
            print(f" email {e}: {ids}")

    brand_lock = brand_state["lock"]
    for branch_contact in branch_contacts:
        if not script_audit.wait_if_paused_or_stopped(args.run_id):
            break
        stats["contacts"] += 1
        branch_contact_id = get_contact_id(branch_contact)
        label = contact_name(branch_contact) or branch_contact_id or "Sin nombre"
        with brand_lock:
            brand_contact, match_method, ambiguous, collision = find_brand_contact(branch_contact, brand_state["index"])
        if ambiguous:
            stats["ambiguous"] += 1
            print(f"SKIP ambiguo por {match_method}: {label}")
            continue
        if collision is not None:
            # Unique phone match but the name diverges (e.g. a couple sharing
            # a number). Do NOT create or update: it is probably someone else.
            stats["phone_collisions_unresolved"] += 1
            colliding_id = get_contact_id(collision)
            print(f"SKIP colision telefono con Marca {colliding_id}: {label}")
            if args.run_id:
                try:
                    script_audit.record_change(
                        args.run_id, BRAND_LOCATION_ID, "contact",
                        branch_contact_id or f"branch:{label}",
                        "", "skipped_phone_collision", None,
                        {
                            "branch_contact_id": branch_contact_id,
                            "branch_location_id": location_id,
                            "colliding_brand_contact_id": colliding_id,
                            "phone": branch_contact.get("phone"),
                        },
                    )
                except Exception as audit_exc:
                    # Best effort: a failed audit note must not stop the sync.
                    print(f" WARN no se pudo registrar la colision: {audit_exc}")
            continue

        if not brand_contact:
            # ---- No match: create the contact in the brand ----
            if args.no_create:
                stats["unchanged"] += 1
                continue
            payload, create_dropped = create_payload(
                branch_contact, branch_id_to_name, brand_schema, brand_norm, brand_by_canonical, branch_account, args
            )
            for fname in create_dropped:
                dropped_field_counter[fname] = dropped_field_counter.get(fname, 0) + 1
                stats["dropped_unmapped_fields"] += 1
            source_ref = branch_contact_id or label
            change_id = record_planned_change(
                args, BRAND_LOCATION_ID, f"branch:{source_ref}", "__create__", "__create__", None, payload
            )
            if dry_run:
                stats["planned_creates"] += 1
                print(f"PLAN crear en Marca: {label}")
                continue
            # Re-check right before creating: another worker may have added
            # the same person since the first lookup.
            with brand_lock:
                recheck_contact, _, _, _ = find_brand_contact(branch_contact, brand_state["index"])
            if recheck_contact:
                script_audit.mark_change(change_id, "applied", "skipped: created by another worker")
                stats["unchanged"] += 1
                print(f"SKIP doble-create evitado: {label}")
                continue
            try:
                response = apply_create(brand_token, payload)
                # Accept both {"contact": {...}} and a bare contact dict; any
                # other shape is treated as "no usable contact returned".
                created = response.get("contact", response) if isinstance(response, dict) else {}
                if not isinstance(created, dict):
                    created = {}
                created_id = created.get("id")
                if created_id:
                    # The index is read under the lock by every worker, so
                    # it must be written under the same lock.
                    with brand_lock:
                        brand_state["contacts"].append(created)
                        index_add_contact(brand_state["index"], created)
                script_audit.mark_change(change_id, "applied")
                stats["created"] += 1
                print(f"OK creado en Marca: {label} -> {created_id or '(sin id en respuesta)'}")
            except Exception as exc:
                script_audit.mark_change(change_id, "failed", str(exc))
                stats["errors"] += 1
                print(f"ERROR crear {label}: {exc}")
            continue

        # ---- Match found: update the brand contact ----
        stats["matched"] += 1
        if args.no_update:
            stats["unchanged"] += 1
            continue
        # Plan against a snapshot so a concurrent update of the same brand
        # contact cannot change the data halfway through planning.
        with brand_lock:
            brand_snapshot = dict(brand_contact)
            brand_snapshot["customFields"] = [dict(f) for f in (brand_contact.get("customFields") or []) if isinstance(f, dict)]
        payload, changes, plan_dropped = plan_contact_payload(
            branch_contact, brand_snapshot, branch_id_to_name,
            brand_schema, brand_norm, brand_by_canonical, branch_account, args,
        )
        for fname in plan_dropped:
            dropped_field_counter[fname] = dropped_field_counter.get(fname, 0) + 1
            stats["dropped_unmapped_fields"] += 1
        if not changes:
            stats["unchanged"] += 1
            continue
        brand_contact_id = get_contact_id(brand_contact)
        change_ids = []
        change_labels = []
        for field_id, field_name, old_value, new_value in changes:
            change_labels.append(field_name)
            cid = record_planned_change(
                args, BRAND_LOCATION_ID, brand_contact_id, field_id, field_name, old_value, new_value
            )
            if cid is not None:
                change_ids.append(cid)
        if dry_run:
            stats["planned_updates"] += 1
            print(f"PLAN actualizar Marca ({match_method}) {label}: {', '.join(change_labels)}")
            continue
        with brand_lock:
            try:
                apply_update(brand_token, brand_contact_id, payload)
                stats["updated"] += 1
                # Mirror the written values into the cached brand contact so
                # later branches plan against the new state.
                for key, value in payload.items():
                    if key == "customFields":
                        existing = {
                            f.get("id") or f.get("fieldId"): f
                            for f in brand_contact.get("customFields", []) or []
                            if isinstance(f, dict)
                        }
                        for cf in value:
                            existing[cf["id"]] = {"id": cf["id"], "value": cf["value"]}
                        brand_contact["customFields"] = list(existing.values())
                    else:
                        brand_contact[key] = value
                for cid in change_ids:
                    script_audit.mark_change(cid, "applied")
                print(f"OK actualizado Marca ({match_method}) {label}: {len(changes)} cambio(s)")
            except Exception as exc:
                for cid in change_ids:
                    script_audit.mark_change(cid, "failed", str(exc))
                stats["errors"] += 1
                print(f"ERROR actualizar {label}: {exc}")

    if dropped_field_counter:
        top_dropped = sorted(dropped_field_counter.items(), key=lambda kv: kv[1], reverse=True)[:10]
        summary = ", ".join(f"{name}({count})" for name, count in top_dropped)
        print(f"WARN [{location_id}] custom fields sin contraparte en Marca: {summary}")
    print(f"Stats {location_id}: {stats}")
    return stats
def add_stats(total, current):
    """Add the counters in ``current`` onto ``total``, in place."""
    for key in current:
        total.setdefault(key, 0)
        total[key] += current[key]
def parse_args():
    """Parse the command line and attach the derived settings.

    Besides the raw flags, the returned namespace carries
    ``field_filter`` / ``field_filter_norm`` (from ``--fields``),
    ``auto_bi`` (the inverse of ``--no-auto-bi``) and
    ``verifier_map`` / ``verifier_conflicts`` (from ``load_branch_verifier``).

    Raises:
        SystemExit: when ``--limit`` or ``--workers`` is below 1.
    """
    cli = argparse.ArgumentParser(
        description="Sincroniza contactos de sucursales hacia la Marca Principal. Dry-run por defecto."
    )
    cli.add_argument("--location", help="Location ID de una sucursal")
    cli.add_argument(
        "--all", action="store_true",
        help="Procesa todas las sucursales no demo del CSV",
    )
    cli.add_argument(
        "--apply", action="store_true",
        help="Aplica POST/PUT reales en GHL. Sin este flag solo simula",
    )
    cli.add_argument(
        "--overwrite", action="store_true",
        help="Sobrescribe valores existentes en Marca cuando difieren",
    )
    cli.add_argument(
        "--no-create", action="store_true",
        help="No crea contactos faltantes en Marca",
    )
    cli.add_argument(
        "--no-update", action="store_true",
        help="No actualiza contactos existentes en Marca",
    )
    cli.add_argument(
        "--fields",
        help="Campos a sincronizar separados por coma. Default: campos BI/vehiculo + nombre/email/telefono",
    )
    cli.add_argument(
        "--limit", type=int,
        help="Limite de contactos por sucursal para esta corrida (escape hatch para debugging; sin default procesa todo)",
    )
    cli.add_argument(
        "--workers", type=int, default=1,
        help="Paralelismo para procesar sucursales. Default: 1",
    )
    cli.add_argument("--run-id", help="Audit run ID suministrado por dashboard")
    cli.add_argument(
        "--yes", action="store_true",
        help="Salta la confirmacion interactiva. Requerido para --apply --all en entornos no TTY (dashboard).",
    )
    cli.add_argument(
        "--no-auto-bi", action="store_true",
        help="Desactiva el auto-poblado de TIENDA/Sucursal/Canal/Fuente al sincronizar. Por defecto se rellenan cuando estan vacios en Marca.",
    )
    args = cli.parse_args()

    if args.limit is not None and args.limit < 1:
        raise SystemExit("--limit debe ser mayor a 0")
    if args.workers < 1:
        raise SystemExit("--workers debe ser mayor a 0")

    args.field_filter = parse_field_filter(args.fields)
    if args.field_filter:
        args.field_filter_norm = {norm_field_name(name) for name in args.field_filter}
    else:
        args.field_filter_norm = None
    args.auto_bi = not args.no_auto_bi
    args.verifier_map, args.verifier_conflicts = load_branch_verifier()
    return args
def main():
    """Entry point: sync the selected branches into the brand account.

    Steps: parse arguments, load the accounts, confirm ``--apply --all``
    (interactively, or via ``--yes`` when stdin is not a TTY), register the
    audit run, load the brand schema and contacts, process each branch
    (in a thread pool when ``--workers`` > 1), then print totals and a
    health summary.

    Raises:
        SystemExit: when the brand account is missing, the confirmation is
            refused or impossible, or (exit code 1) any branch reported
            errors.
    """
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
    args = parse_args()
    accounts = sync_engine.parse_accounts_csv()
    brand_account = next((a for a in accounts if a["location_id"] == BRAND_LOCATION_ID), None)
    if not brand_account:
        raise SystemExit("No se encontro la cuenta de Marca Principal en el CSV de tokens")
    branch_accounts = select_accounts(args, accounts)

    # Writing to every branch is the most destructive mode: require explicit
    # confirmation, and refuse to guess when nobody can be asked.
    if args.apply and args.all and not args.yes:
        if sys.stdin.isatty():
            print(f"\nVas a aplicar cambios reales en {len(branch_accounts)} sucursales contra GHL.")
            response = input("Escribe CONFIRMO para continuar: ").strip()
            if response != "CONFIRMO":
                raise SystemExit("Cancelado por el usuario.")
        else:
            raise SystemExit(
                "--apply --all requiere --yes en entornos no TTY (dashboard/subprocess). Abortando por seguridad."
            )

    if args.run_id:
        script_audit.create_run(
            args.run_id,
            os.path.basename(__file__),
            arguments=" ".join(sys.argv[1:]),
            locations=[a["location_id"] for a in branch_accounts],
            execution_mode="parallel" if args.workers > 1 else "sequential",
        )

    print("=" * 78)
    print("SYNC CONTACTS SUCURSAL -> MARCA")
    print("=" * 78)
    print(f"Modo: {'APPLY (POST/PUT real en GHL)' if args.apply else 'DRY-RUN (sin cambios)'}")
    print(f"Sucursales: {len(branch_accounts)} | Workers: {min(args.workers, len(branch_accounts))}")
    print(f"Update: {not args.no_update} | Create: {not args.no_create} | Overwrite: {args.overwrite}")
    if args.field_filter:
        print(f"Campos filtrados: {', '.join(sorted(args.field_filter))}")
    else:
        print("Campos custom: match por nombre exacto, normalizado (mayus/acentos) y por aliases del manual.")
        print(f"Estandar copiados: {', '.join(SCALAR_STANDARD_FIELDS)}")
        print(f"Listas unidas (no overwrite): {', '.join(LIST_STANDARD_FIELDS)}")

    print("\nCargando contactos live de Marca Principal...")
    brand_schema = sync_engine.ghl_client.get_object_schema(brand_account["token"], BRAND_LOCATION_ID, "contact")
    brand_norm, brand_by_canonical = build_brand_field_indexes(brand_schema)
    corrupt = detect_corrupt_schema_fields(brand_schema)
    if corrupt:
        print(f"WARN Marca tiene {len(corrupt)} campo(s) con encoding corrupto en el schema:")
        for name in corrupt:
            print(f" - {repr(name)} (id: {brand_schema[name]})")
        print(" Estos campos se sincronizan pero conviene renombrarlos en GHL.")

    if args.verifier_map:
        if args.verifier_conflicts:
            print(f"WARN verificador con {len(args.verifier_conflicts)} location_id(s) duplicados con TIENDA contradictorias (data quality del CSV):")
            for loc, entries in args.verifier_conflicts.items():
                tiendas = [e.get("tienda") for e in entries]
                print(f" - {loc}: {tiendas}")
        # Drop verifier rows that disagree with the real account name, so
        # those branches fall back to labels parsed from the name.
        invalid = validate_verifier_against_accounts(args.verifier_map, branch_accounts)
        for loc in invalid:
            args.verifier_map.pop(loc, None)
        missing_in_verifier = [a for a in branch_accounts if a["location_id"] not in args.verifier_map]
        if missing_in_verifier:
            print(f"WARN {len(missing_in_verifier)} sucursal(es) usaran fallback al nombre (sin verificador o SC BUCEFALO no coincide):")
            for a in missing_in_verifier:
                print(f" - {a['location_id']} {a['nombre']}")
    else:
        print("WARN no se cargo el verificador de sucursales. TIENDA/Sucursal se derivaran del nombre.")

    brand_contacts = sync_engine.ghl_client.get_all_contacts(brand_account["token"], BRAND_LOCATION_ID)
    brand_state = {
        "account": brand_account,
        "schema": brand_schema,
        "schema_norm": brand_norm,
        "schema_by_canonical": brand_by_canonical,
        "contacts": brand_contacts,
        "index": build_match_index(brand_contacts),
        "lock": threading.Lock(),
    }
    # No cap is applied to the brand contact list, so only the count is
    # reported; the old message referenced an undefined `brand_max`.
    print(f"Contactos de Marca cargados: {len(brand_contacts)}")

    total = {}
    if args.workers > 1 and len(branch_accounts) > 1:
        with ThreadPoolExecutor(max_workers=min(args.workers, len(branch_accounts))) as executor:
            futures = {executor.submit(process_branch, account, brand_state, args): account for account in branch_accounts}
            for future in as_completed(futures):
                add_stats(total, future.result())
    else:
        for account in branch_accounts:
            add_stats(total, process_branch(account, brand_state, args))

    print("\n" + "=" * 78)
    print(f"TOTAL: {json.dumps(total, ensure_ascii=False)}")
    print("=" * 78)
    print("HEALTH SUMMARY")
    print(f" Sucursales procesadas: {len(branch_accounts)}")
    if args.verifier_conflicts:
        print(f" Conflictos en verificador: {len(args.verifier_conflicts)} location_id(s)")
    missing = sum(1 for a in branch_accounts if a["location_id"] not in args.verifier_map)
    if missing:
        print(f" Sucursales con fallback (sin verificador): {missing}")
    if total.get("ambiguous"):
        print(f" Contactos no sincronizados por ambiguedad: {total['ambiguous']}")
    if total.get("phone_collisions_unresolved"):
        print(f" Colisiones telefono sin match por nombre (revision manual): {total['phone_collisions_unresolved']}")
    if total.get("dropped_unmapped_fields"):
        print(f" Campos custom descartados sin contraparte en Marca: {total['dropped_unmapped_fields']}")
    if total.get("errors"):
        print(f" Errores: {total['errors']}")
    coverage_pct = 0
    if total.get("contacts"):
        synced = total.get("matched", 0) + total.get("planned_creates", 0) + total.get("created", 0)
        coverage_pct = round(100.0 * synced / total["contacts"], 2)
    print(f" Cobertura de sincronizacion: {coverage_pct}% ({total.get('matched', 0)} matched + {total.get('planned_creates', 0) + total.get('created', 0)} crear)")
    if not args.apply:
        print("\nDry-run terminado. Usa --apply para escribir cambios en GHL.")
    if args.run_id:
        final_status = "failed" if total.get("errors") else "success"
        script_audit.update_run_status(args.run_id, final_status)
    if total.get("errors"):
        raise SystemExit(1)


if __name__ == "__main__":
    main()