Files
MP-Manager/scripts/backfill_opp_sucursal_link.py
2026-05-30 14:31:19 -06:00

706 lines
25 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""backfill_opp_sucursal_link.py
Backfill del custom field de oportunidad "ID Oportunidad Sucursal"
(fieldKey: opportunity.id_oportunidad_sucursal) en la cuenta de MARCA.
Cada opp de Marca debe llevar en este CF el id nativo de la opp de Sucursal de la
que proviene. Esto da idempotencia al workflow n8n de sincronizacion: futuras
ejecuciones del webhook hacen UPDATE en lugar de duplicar.
Estrategia (conservadora, 1-a-1 estricta):
1. Para cada opp Marca con el CF vacio o con valor invalido (len != 20 chars
alfanumericos), resolvemos su contacto Marca.
2. Buscamos el contacto sucursal correspondiente con `common.match_contacts`
(strong/medium: phone+nombre o phone+email+nombre). Tie-break por
dateAdded mas antiguo cuando varias sucursales matchean.
3. Cargamos las opps de esa sucursal asociadas al contacto match.
4. Emparejamos opps Marca <-> opps sucursal para el mismo contacto:
Pasada 1: clave (nombre normalizado, monto). Si la cardinalidad permite
emparejamiento 1-a-1 sin ambigüedad => MATCH.
Pasada 2: clave (nombre normalizado). Idem.
Cualquier ambiguedad => status "review_ambiguous" (no se toca).
5. Cuando M (Marca) != N (sucursal) opps por contacto => "review_count_mismatch".
Solo escribe en Marca. Las sucursales no se tocan. Idempotente: opps Marca con
CF valido se saltan; opps con valor invalido (len != 20) se reprocesan.
Modos:
- dry-run (default): no escribe nada en GHL.
- --apply --run-id <uuid>: aplica, registra cada cambio en script_audit.
Uso CLI:
python scripts/backfill_opp_sucursal_link.py
python scripts/backfill_opp_sucursal_link.py --apply --run-id <uuid>
python scripts/backfill_opp_sucursal_link.py --only-opp <marca_opp_id>
python scripts/backfill_opp_sucursal_link.py --export-unmatched
Uso programatico (desde main.py):
from scripts import backfill_opp_sucursal_link as bf
result = bf.run_match(opp_ids=[...], dry_run=False, run_id=...)
"""
import argparse
import csv
import datetime
import json
import os
import sqlite3
import sys
import uuid
import warnings
from collections import Counter, defaultdict
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPTS_DIR not in sys.path:
sys.path.insert(0, SCRIPTS_DIR)
import sync_engine # noqa: E402
import script_audit # noqa: E402
from paths import DB_PATH, MIGRATIONS_DIR, EXPORTS_DIR # noqa: E402
from common import match_contacts, normalize_phone, normalize_email # noqa: E402
from audit_brand_vs_branches_totals import ( # noqa: E402
BRAND_LOCATION_ID,
OPP_ID_PATTERN,
OPP_LINK_FIELD_KEY,
resolve_opp_link_field_id,
extract_opp_link_value,
)
# Location ids that the branch contact/opportunity queries in this file exclude.
# NOTE(review): presumably demo/sandbox accounts, judging by the name - confirm.
DEMO_LOCATION_IDS = {"Vf7qQl3L9vakJ8hDtQ8e", "Z64WQKORPVwXb5mn68Ef"}
# Field name passed to script_audit.record_change when a change is recorded.
CF_NAME = "id_oportunidad_sucursal"
# Short alias for the client object exposed by sync_engine; used to update opportunities.
gc = sync_engine.ghl_client
def safe_print(*args, **kwargs):
    """Write the positional arguments to stdout as one space-separated line.

    If stdout raises ``UnicodeEncodeError``, the text is re-encoded with the
    stream's encoding (``utf-8`` when the stream reports none) using
    ``errors="replace"`` and written again. Keyword arguments are accepted
    but ignored.
    """
    def _emit(payload):
        sys.stdout.write(payload + "\n")
        sys.stdout.flush()

    message = " ".join(map(str, args))
    try:
        _emit(message)
    except UnicodeEncodeError:
        codec = sys.stdout.encoding or "utf-8"
        _emit(message.encode(codec, errors="replace").decode(codec))
def norm_name(s):
    """Return *s* lowercased, trimmed and with whitespace runs collapsed to one space.

    Falsy input (``None``, ``""``, ``0``) yields the empty string.
    """
    text = str(s or "")
    tokens = text.lower().split()
    return " ".join(tokens)
def _money_key(value):
# Normaliza el monto para comparacion exacta: trata None y 0 igual y evita
# falsos negativos por float vs int (1000 vs 1000.0).
try:
return round(float(value or 0), 2)
except (TypeError, ValueError):
return 0.0
def row_to_contact(row):
    """Convert a contact row (indexable by column name) into a contact dict.

    The resulting dict uses the key names passed on to ``match_contacts``:
    snake_case columns ``first_name``, ``last_name`` and ``date_added`` become
    ``firstName``, ``lastName`` and ``dateAdded``.
    """
    # (output key, source column) pairs, in the order the keys are emitted.
    field_map = (
        ("id", "id"),
        ("location_id", "location_id"),
        ("phone", "phone"),
        ("email", "email"),
        ("firstName", "first_name"),
        ("lastName", "last_name"),
        ("dateAdded", "date_added"),
    )
    return {out_key: row[column] for out_key, column in field_map}
def pick_oldest(candidates):
    """Return the candidate with the earliest ``dateAdded`` value.

    ``dateAdded`` is parsed as an ISO-8601 string (a trailing ``Z`` is read as
    ``+00:00``). Candidates whose date is missing or cannot be parsed sort
    last. On ties the first candidate in input order is returned.
    """
    def _epoch(contact):
        raw = contact.get("dateAdded") or ""
        try:
            parsed = datetime.datetime.fromisoformat(raw.replace("Z", "+00:00"))
            return parsed.timestamp()
        except Exception:
            # Unparseable or non-string dates are pushed to the end.
            return float("inf")

    return min(candidates, key=_epoch)
def load_brand_contacts(conn):
    """Fetch every contact row whose location is ``BRAND_LOCATION_ID``.

    Returns the rows as produced by ``conn.execute(...).fetchall()``.
    """
    query = (
        "SELECT id, location_id, first_name, last_name, phone, email, date_added "
        "FROM contacts WHERE location_id=?"
    )
    cursor = conn.execute(query, (BRAND_LOCATION_ID,))
    return cursor.fetchall()
def load_branch_contacts(conn):
    """Fetch every contact row outside the brand location and the demo locations.

    Returns the rows as produced by ``conn.execute(...).fetchall()``.
    """
    demo_ids = tuple(DEMO_LOCATION_IDS)
    # One "?" placeholder per excluded demo location.
    marks = ",".join("?" * len(demo_ids))
    query = (
        "SELECT id, location_id, first_name, last_name, phone, email, date_added "
        f"FROM contacts WHERE location_id != ? AND location_id NOT IN ({marks})"
    )
    return conn.execute(query, (BRAND_LOCATION_ID, *demo_ids)).fetchall()
def load_brand_opps(conn):
    """Fetch every opportunity of ``BRAND_LOCATION_ID`` as a list of plain dicts."""
    query = (
        "SELECT id, location_id, contact_id, status, name, pipeline_id, "
        "monetary_value, custom_fields_json "
        "FROM opportunities WHERE location_id=?"
    )
    rows = conn.execute(query, (BRAND_LOCATION_ID,)).fetchall()
    return list(map(dict, rows))
def load_branch_opps(conn):
    """Fetch opportunities outside the brand and demo locations as plain dicts."""
    demo_ids = tuple(DEMO_LOCATION_IDS)
    # One "?" placeholder per excluded demo location.
    marks = ",".join("?" * len(demo_ids))
    query = (
        "SELECT id, location_id, contact_id, status, name, pipeline_id, "
        "monetary_value, custom_fields_json "
        "FROM opportunities "
        f"WHERE location_id != ? AND location_id NOT IN ({marks})"
    )
    rows = conn.execute(query, (BRAND_LOCATION_ID, *demo_ids)).fetchall()
    return list(map(dict, rows))
def index_branches(branch_contact_rows):
    """Index branch contacts by normalized phone and by normalized email.

    Returns ``(by_phone, by_email)``: two ``defaultdict(list)`` objects mapping
    the normalized key to the contact dicts (see ``row_to_contact``) that carry
    it. Contacts whose normalized phone/email is empty are left out of the
    corresponding index.
    """
    by_phone = defaultdict(list)
    by_email = defaultdict(list)
    for contact in map(row_to_contact, branch_contact_rows):
        phone_key = normalize_phone(contact.get("phone") or "")
        email_key = normalize_email(contact.get("email") or "")
        if phone_key:
            by_phone[phone_key].append(contact)
        if email_key:
            by_email[email_key].append(contact)
    return by_phone, by_email
def classify_cf_value(value):
    """Classify the current value of the link custom field.

    Returns ``"vacio"`` for a falsy value, ``"longitud_invalida"`` when the
    value does not match ``OPP_ID_PATTERN`` (per the module docstring: 20
    alphanumeric characters), and ``None`` when the value is acceptable.
    """
    if not value:
        return "vacio"
    if OPP_ID_PATTERN.match(str(value)):
        return None
    return "longitud_invalida"
def match_branch_contact(brand_contact, by_phone, by_email):
    """Find the branch contact that corresponds to *brand_contact*.

    Candidates are looked up by normalized phone and email in the given
    indexes and then scored with ``match_contacts``; only ``strong`` and
    ``medium`` results count as matches.

    Returns a dict whose ``status`` is one of:
      - ``"no_data"``: the brand contact has neither phone nor email.
      - ``"no_branch_contact"``: no candidate, or no candidate matched.
      - ``"phone_collision"``: no match, but some candidates reported
        ``phone_collision_unresolved`` (returned under ``candidates``).
      - ``"matched"``: ``branch_contact`` holds the chosen contact and
        ``all_matches`` every matching candidate. When the matches span more
        than one contact id, the oldest by ``dateAdded`` is chosen.
    """
    phone_key = normalize_phone(brand_contact.get("phone") or "")
    email_key = normalize_email(brand_contact.get("email") or "")
    if not (phone_key or email_key):
        return {"status": "no_data"}

    # Phone hits first, then email hits that were not already collected.
    pool = list(by_phone.get(phone_key, [])) if phone_key else []
    if email_key:
        for contact in by_email.get(email_key, []):
            if contact not in pool:
                pool.append(contact)
    if not pool:
        return {"status": "no_branch_contact"}

    accepted = []
    unresolved = []
    for contact in pool:
        verdict = match_contacts(brand_contact, contact)
        if verdict["level"] in ("strong", "medium"):
            accepted.append(contact)
        elif "phone_collision_unresolved" in verdict["reasons"]:
            unresolved.append(contact)

    if accepted:
        distinct_ids = {contact["id"] for contact in accepted}
        if len(distinct_ids) == 1:
            chosen = accepted[0]
        else:
            chosen = pick_oldest(accepted)
        return {"status": "matched", "branch_contact": chosen, "all_matches": accepted}
    if unresolved:
        return {"status": "phone_collision", "candidates": unresolved}
    return {"status": "no_branch_contact"}
def try_pairing_1to1(brand_opps, branch_opps, key_fn):
    """Pair brand opps with branch opps through *key_fn*, strictly one-to-one.

    Returns a list of ``(brand_opp, branch_opp, key)`` tuples, in brand order,
    when every brand opp pairs with exactly one branch opp. Returns ``None``
    when the two sides differ in size, their key multisets differ, or any key
    is shared by more than one element (ambiguous).
    """
    if len(brand_opps) != len(branch_opps):
        return None

    brand_keys = list(map(key_fn, brand_opps))
    branch_keys = list(map(key_fn, branch_opps))
    brand_tally = Counter(brand_keys)
    if brand_tally != Counter(branch_keys):
        return None
    # Fewer distinct keys than elements means some key is duplicated.
    if len(brand_tally) != len(brand_keys):
        return None

    branch_lookup = dict(zip(branch_keys, branch_opps))
    return [
        (opp, branch_lookup[key], key)
        for opp, key in zip(brand_opps, brand_keys)
    ]
def plan_for_contact_group(brand_opps_grp, branch_opps_grp, branch_location):
    """Build the plans for the opps of one contact.

    Returns a list of ``{"marca_opp_id": ..., "plan": {...}}`` dicts, one per
    brand opp. ``plan["status"]`` is:
      - ``branch_contact_no_opps`` when the branch side has no opps;
      - ``review_count_mismatch`` when both sides differ in size;
      - ``match_unique`` when there is one opp per side (``single_opp``) or a
        clean one-to-one pairing exists by name+amount (``name_amount``) or,
        failing that, by name alone (``name_only``);
      - ``review_ambiguous`` otherwise.
    """
    n_brand = len(brand_opps_grp)
    n_branch = len(branch_opps_grp)

    def same_for_all(plan_fields):
        # Every brand opp of the group gets its own copy of the same plan.
        return [
            {"marca_opp_id": opp["id"], "plan": dict(plan_fields)}
            for opp in brand_opps_grp
        ]

    def matched(brand_opp, branch_opp, how):
        return {"marca_opp_id": brand_opp["id"], "plan": {
            "status": "match_unique",
            "branch_opp_id": branch_opp["id"],
            "branch_location": branch_location,
            "matched_by": how,
        }}

    if n_branch == 0:
        return same_for_all({
            "status": "branch_contact_no_opps",
            "branch_location": branch_location,
        })
    if n_brand != n_branch:
        return same_for_all({
            "status": "review_count_mismatch",
            "branch_location": branch_location,
            "M": n_brand, "N": n_branch,
        })
    if n_brand == 1:
        # Sizes are equal at this point, so this is the one-opp-per-side case.
        return [matched(brand_opps_grp[0], branch_opps_grp[0], "single_opp")]

    # Equal sizes > 1: first pass by name+amount, second pass by name only.
    attempts = (
        ("name_amount",
         lambda o: (norm_name(o.get("name")), _money_key(o.get("monetary_value")))),
        ("name_only",
         lambda o: norm_name(o.get("name"))),
    )
    for strategy, key_fn in attempts:
        paired = try_pairing_1to1(brand_opps_grp, branch_opps_grp, key_fn)
        if paired is not None:
            return [matched(bo, mo, strategy) for bo, mo, _key in paired]

    return same_for_all({
        "status": "review_ambiguous",
        "branch_location": branch_location,
        "M": n_brand, "N": n_branch,
    })
def _plan_without_branch_match(opp, contact_id, status):
    """Build the plan entry for a Marca opp that has no branch opp proposal."""
    return {
        "marca_opp_id": opp["id"],
        "marca_opp_name": opp.get("name"),
        "marca_opp_status": opp.get("status"),
        "marca_opp_amount": opp.get("monetary_value"),
        "marca_contact_id": contact_id,
        "current_value": opp.get("_current_cf"),
        "current_reason": opp.get("_reason"),
        "plan": {"status": status},
    }


def build_plan(conn, opp_filter=None, log=None):
    """Compute, without writing anything, the link plan for Marca opportunities.

    Args:
        conn: open DB connection whose rows are indexable by column name.
        opp_filter: optional container of Marca opp ids; when given, only
            those opps are considered as targets.
        log: optional ``log(line)`` callable; defaults to ``safe_print``.

    Returns:
        ``(brand_field_id, plans, skipped_already_ok)`` where ``plans`` has one
        dict per target opp (Marca opp info plus a ``plan`` dict with a
        ``status``) and ``skipped_already_ok`` counts the filtered opps whose
        custom field already holds a valid value.

    Raises:
        SystemExit: when the link custom field id cannot be resolved for Marca.
    """
    if log is None:
        log = safe_print
    brand_field_id = resolve_opp_link_field_id(conn, BRAND_LOCATION_ID)
    if not brand_field_id:
        raise SystemExit(
            "No se encontro el field_id de 'ID Oportunidad Sucursal' en Marca. "
            "Sincroniza el schema o crea el campo primero."
        )
    log(f"CF id_oportunidad_sucursal en Marca: field_id={brand_field_id}")

    # 1. Pick the Marca opps to process. Branch opp ids already referenced by a
    #    valid value are collected from ALL Marca opps (not only the filtered
    #    ones) so they are never proposed a second time further down.
    targets = []
    skipped_already_ok = 0
    already_linked = set()
    for opp in load_brand_opps(conn):
        current = extract_opp_link_value(opp.get("custom_fields_json"), brand_field_id)
        reason = classify_cf_value(current)
        if reason is None:
            already_linked.add(str(current))
        if opp_filter is not None and opp["id"] not in opp_filter:
            continue
        if reason is None:
            skipped_already_ok += 1
            continue
        opp["_current_cf"] = current
        opp["_reason"] = reason
        targets.append(opp)
    log(f"Opps Marca con CF a poblar: {len(targets)} (saltadas ya validas: {skipped_already_ok})")
    if not targets:
        # Report the real skip count; a hard-coded 0 here made fully
        # idempotent reruns show "0 already valid" in the summary.
        return brand_field_id, [], skipped_already_ok

    # 2. Index contacts.
    brand_contacts_by_id = {
        row["id"]: row_to_contact(row) for row in load_brand_contacts(conn)
    }
    by_phone, by_email = index_branches(load_branch_contacts(conn))

    # 3. Group branch opps by (location_id, contact_id).
    branch_opps_by_loc_cid = defaultdict(list)
    for opp in load_branch_opps(conn):
        branch_opps_by_loc_cid[(opp["location_id"], opp["contact_id"])].append(opp)

    # 4. Group target opps by Marca contact; opps without contact_id are
    #    planned right away as "no_brand_contact".
    plans = []
    targets_by_brand_cid = defaultdict(list)
    for opp in targets:
        cid = opp.get("contact_id")
        if cid:
            targets_by_brand_cid[cid].append(opp)
        else:
            plans.append(_plan_without_branch_match(opp, None, "no_brand_contact"))

    for brand_cid, brand_opps_grp in targets_by_brand_cid.items():
        brand_contact = brand_contacts_by_id.get(brand_cid)
        if not brand_contact:
            plans.extend(
                _plan_without_branch_match(bo, brand_cid, "no_brand_contact")
                for bo in brand_opps_grp
            )
            continue

        m = match_branch_contact(brand_contact, by_phone, by_email)
        if m["status"] != "matched":
            plans.extend(
                _plan_without_branch_match(bo, brand_cid, m["status"])
                for bo in brand_opps_grp
            )
            continue

        branch_contact = m["branch_contact"]
        branch_loc = branch_contact["location_id"]
        branch_opps_grp = branch_opps_by_loc_cid.get((branch_loc, branch_contact["id"]), [])
        contact_plans = plan_for_contact_group(brand_opps_grp, branch_opps_grp, branch_loc)

        # Enrich every plan with the info of its Marca opp.
        by_brand_oid = {bo["id"]: bo for bo in brand_opps_grp}
        for cp in contact_plans:
            bo = by_brand_oid[cp["marca_opp_id"]]
            cp.update({
                "marca_opp_name": bo.get("name"),
                "marca_opp_status": bo.get("status"),
                "marca_opp_amount": bo.get("monetary_value"),
                "marca_contact_id": brand_cid,
                "branch_contact_id": branch_contact["id"],
                "current_value": bo.get("_current_cf"),
                "current_reason": bo.get("_reason"),
            })
            plan = cp["plan"]
            # Strict 1-to-1: a branch opp that another Marca opp already
            # references must not be written into a second Marca opp. Such
            # proposals are demoted to manual review instead of applied.
            if (plan["status"] == "match_unique"
                    and str(plan["branch_opp_id"]) in already_linked):
                plan["status"] = "review_ambiguous"
                plan["review_reason"] = "branch_opp_already_linked"
            plans.append(cp)
    return brand_field_id, plans, skipped_already_ok
def render_summary(plans, log):
    """Log the number of plans per status, in a fixed status order.

    Statuses with no plans are omitted, and so are statuses that are not in
    the fixed list below.
    """
    tally = Counter(item["plan"]["status"] for item in plans)
    log("\n=== Resumen del plan ===")
    ordered_statuses = (
        "match_unique", "review_ambiguous", "review_count_mismatch",
        "branch_contact_no_opps", "no_branch_contact",
        "phone_collision", "no_data", "no_brand_contact",
    )
    for status in ordered_statuses:
        total = tally.get(status)
        if total:
            log(f" {status:28s}: {total}")
def render_examples(plans, log, n=4):
    """Log up to *n* example plans for each status, in a fixed status order.

    ``match_unique`` examples show the proposed branch opp; plans that carry
    ``M``/``N`` counts show them. When a status has more than *n* plans, a
    trailing line reports how many were not shown.
    """
    grouped = defaultdict(list)
    for item in plans:
        grouped[item["plan"]["status"]].append(item)

    ordered_statuses = (
        "match_unique", "review_ambiguous", "review_count_mismatch",
        "branch_contact_no_opps", "no_branch_contact",
        "phone_collision", "no_data", "no_brand_contact",
    )
    for status in ordered_statuses:
        bucket = grouped.get(status, [])
        if not bucket:
            continue
        log(f"\n [{status}] ejemplos:")
        for item in bucket[:n]:
            plan = item["plan"]
            label = item.get("marca_opp_name") or "(sin nombre)"
            parts = [
                f" - opp={item['marca_opp_id']} name={label!r} "
                f"status={item.get('marca_opp_status')}"
            ]
            if status == "match_unique":
                parts.append(
                    f" -> branch_opp={plan['branch_opp_id']} "
                    f"({plan['branch_location']}, {plan['matched_by']})"
                )
            elif "M" in plan:
                parts.append(f" M={plan['M']} N={plan['N']}")
            log("".join(parts))
        remaining = len(bucket) - n
        if remaining > 0:
            log(f" ... y {remaining} mas")
def snapshot_plans(plans, run_id, dry_run):
    """Write the plans to a timestamped JSON file under ``MIGRATIONS_DIR``.

    The directory is created if missing. The file name carries the local
    time with second resolution; the payload records ``run_id``, ``dry_run``,
    a UTC timestamp, the plan count and the plans themselves. Values that
    ``json`` cannot serialize are written via ``str``.

    Returns the path of the file written.
    """
    os.makedirs(MIGRATIONS_DIR, exist_ok=True)
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    path = os.path.join(MIGRATIONS_DIR, f"backfill_brand_opp_link_{stamp}.json")
    with open(path, "w", encoding="utf-8") as fh:
        payload = {
            "run_id": run_id,
            "dry_run": dry_run,
            "timestamp_utc": datetime.datetime.now(datetime.timezone.utc).isoformat(),
            "count": len(plans),
            "plans": plans,
        }
        json.dump(payload, fh, ensure_ascii=False, indent=2, default=str)
    return path
def export_unmatched_csv(plans, log):
    """Export every plan that is not ``match_unique`` to a CSV in ``EXPORTS_DIR``.

    The directory is created if missing and the file name carries the local
    time with second resolution. Falsy cell values (including ``0``) are
    written as empty strings. The path is logged and returned.
    """
    os.makedirs(EXPORTS_DIR, exist_ok=True)
    stamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
    path = os.path.join(EXPORTS_DIR, f"brand_opps_unmatched_{stamp}.csv")
    header = [
        "status", "marca_opp_id", "marca_opp_name", "marca_opp_status",
        "marca_opp_amount", "marca_contact_id", "branch_location",
        "M", "N", "current_value", "current_reason",
    ]

    def cell(value):
        return value or ""

    with open(path, "w", encoding="utf-8", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        for item in plans:
            plan = item["plan"]
            status = plan["status"]
            if status == "match_unique":
                continue
            writer.writerow([
                status,
                item["marca_opp_id"],
                cell(item.get("marca_opp_name")),
                cell(item.get("marca_opp_status")),
                cell(item.get("marca_opp_amount")),
                cell(item.get("marca_contact_id")),
                cell(plan.get("branch_location")),
                cell(plan.get("M")),
                cell(plan.get("N")),
                cell(item.get("current_value")),
                cell(item.get("current_reason")),
            ])
    log(f" Export CSV (unmatched/review): {path}")
    return path
def apply_match(plan_item, brand_field_id, brand_token, run_id, log):
    """Write the proposed branch opp id into the Marca opp's link custom field.

    Args:
        plan_item: plan dict as produced by ``build_plan``.
        brand_field_id: id of the link custom field in Marca.
        brand_token: token passed to the client for the Marca account.
        run_id: when truthy, the change is recorded in ``script_audit`` before
            the write and marked afterwards.
        log: ``log(line)`` callable.

    Returns:
        ``{"applied": False, "reason": status}`` when the plan is not
        ``match_unique``; ``{"applied": False, "error": msg}`` when the update
        call raised; ``{"applied": True}`` when the update succeeded (with an
        extra ``"audit_error"`` key if the audit row could not be marked).
    """
    plan = plan_item["plan"]
    if plan["status"] != "match_unique":
        return {"applied": False, "reason": plan["status"]}

    marca_opp_id = plan_item["marca_opp_id"]
    branch_opp_id = plan["branch_opp_id"]

    change_id = None
    if run_id:
        change_id = script_audit.record_change(
            run_id, BRAND_LOCATION_ID, "opportunity",
            marca_opp_id, brand_field_id, CF_NAME,
            {"value": plan_item.get("current_value")}, {"value": branch_opp_id})

    # Only the remote write is guarded here: an exception from the audit
    # bookkeeping below must not be reported as a failed write.
    try:
        gc.update_opportunity(brand_token, marca_opp_id, {
            "customFields": [{
                "id": brand_field_id,
                "key": OPP_LINK_FIELD_KEY,
                "field_value": branch_opp_id,
            }]
        })
    except Exception as exc:
        if change_id:
            script_audit.mark_change(change_id, "failed", str(exc))
        log(f" x Error en {marca_opp_id}: {exc}")
        return {"applied": False, "error": str(exc)}

    result = {"applied": True}
    if change_id:
        try:
            script_audit.mark_change(change_id, "applied")
        except Exception as exc:
            # The opp WAS updated; surface the audit problem without
            # pretending that the write failed.
            log(f" ! {marca_opp_id} actualizada, pero no se pudo marcar el cambio en script_audit: {exc}")
            result["audit_error"] = str(exc)
    return result
def run_match(*, location_ids=None, opp_ids=None, dry_run=True,
              run_id=None, log=None, export_unmatched=False):
    """Match Marca opps with branch opps and fill in the link custom field.

    Args:
        location_ids: kept for backward compatibility. The script only ever
            works on ``BRAND_LOCATION_ID``; if the value is not ``None`` and
            contains any other id, ``RuntimeError`` is raised.
        opp_ids: optional iterable of Marca opp ids to process. A falsy value
            (``None`` or an empty collection) selects every Marca opp whose
            custom field is empty or invalid.
        dry_run: ``True`` (default) computes and reports the plan only.
        run_id: identifier for ``script_audit``; when falsy, no audit records
            are written.
        log: optional ``log(line)`` callable; defaults to ``safe_print``.
        export_unmatched: when true, also writes the review/unmatched CSV.

    Returns:
        JSON-serializable dict with ``dry_run``, ``summary``, ``items``,
        ``snapshot``, ``errors`` and ``run_id``.

    Raises:
        RuntimeError: on foreign ``location_ids`` or when the accounts CSV
            has no entry for Marca.
    """
    if log is None:
        log = safe_print

    # Guard: this script ONLY operates on Marca.
    if location_ids is not None and any(
            loc != BRAND_LOCATION_ID for loc in location_ids):
        raise RuntimeError(
            "backfill_opp_sucursal_link solo opera sobre Marca; "
            f"recibido location_ids={location_ids}"
        )

    opp_filter = set(opp_ids) if opp_ids else None

    brand = None
    for account in sync_engine.parse_accounts_csv():
        if account["location_id"] == BRAND_LOCATION_ID:
            brand = account
            break
    if not brand:
        raise RuntimeError("Sin token de Marca en el CSV.")
    brand_token = brand["token"]

    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    try:
        brand_field_id, plans, skipped_already_ok = build_plan(
            conn, opp_filter=opp_filter, log=log)
    finally:
        conn.close()

    render_summary(plans, log)
    render_examples(plans, log)
    snap_path = snapshot_plans(plans, run_id, dry_run=dry_run)
    log(f"\nSnapshot: {snap_path}")
    if export_unmatched:
        export_unmatched_csv(plans, log)

    status_counts = Counter(p["plan"]["status"] for p in plans)
    summary = {
        "plans_total": len(plans),
        "skipped_already_ok": skipped_already_ok,
    }
    for status in ("match_unique", "review_ambiguous", "review_count_mismatch",
                   "branch_contact_no_opps", "no_branch_contact",
                   "phone_collision", "no_data", "no_brand_contact"):
        summary[status] = status_counts[status]
    summary["applied"] = 0
    summary["errors"] = 0
    errors_global = []

    def outcome(is_dry_run):
        return {
            "dry_run": is_dry_run,
            "summary": summary,
            "items": plans,
            "snapshot": snap_path,
            "errors": errors_global,
            "run_id": run_id,
        }

    if dry_run:
        log("\nDRY-RUN: no se escribio en GHL. Para aplicar: --apply --run-id <uuid>")
        return outcome(True)

    matchables = [p for p in plans if p["plan"]["status"] == "match_unique"]
    log(f"\nAplicando {len(matchables)} matches en Marca (run_id={run_id})...")
    if run_id:
        script_audit.create_run(
            run_id, "backfill_opp_sucursal_link.py",
            arguments=f"matches:{len(matchables)} apply",
            locations=[BRAND_LOCATION_ID],
        )

    for item in matchables:
        # Honor pause/stop requests registered for this run before each write.
        if run_id and not script_audit.wait_if_paused_or_stopped(run_id):
            log("Detencion solicitada. Saliendo.")
            break
        result = apply_match(item, brand_field_id, brand_token, run_id, log)
        if result["applied"]:
            summary["applied"] += 1
            continue
        summary["errors"] += 1
        errors_global.append({
            "marca_opp_id": item["marca_opp_id"],
            "error": result.get("error") or result.get("reason"),
        })

    if run_id:
        failed = summary["errors"] != 0
        script_audit.update_run_status(
            run_id,
            "failed" if failed else "completed",
            f"errors={summary['errors']}" if failed else None,
        )

    log(f"\nResumen: applied={summary['applied']} errors={summary['errors']} "
        f"match_unique={summary['match_unique']} review={summary['review_ambiguous']+summary['review_count_mismatch']}")
    return outcome(False)
def main():
    """CLI entry point: parse the arguments and delegate to ``run_match``.

    Without ``--apply`` the run is a dry-run. With ``--apply`` and no
    ``--run-id``, a UUID4 run id is generated and printed.
    """
    cli = argparse.ArgumentParser(description=__doc__.splitlines()[0])
    cli.add_argument("--apply", action="store_true", help="Aplica. Sin esto: dry-run.")
    cli.add_argument("--run-id", help="ID para script_audit.")
    cli.add_argument("--only-opp", help="Filtrar a un solo opp_id Marca.")
    cli.add_argument("--export-unmatched", action="store_true",
                     help="Exporta CSV con los plans que no son match_unique.")
    options = cli.parse_args()

    # Switch stdout to UTF-8 where the stream supports reconfiguration.
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")

    run_id = options.run_id
    if options.apply and not run_id:
        run_id = str(uuid.uuid4())
        safe_print(f"[info] run_id autogenerado: {run_id}")

    only_opp = options.only_opp
    run_match(
        opp_ids=[only_opp] if only_opp else None,
        dry_run=not options.apply,
        run_id=run_id,
        log=safe_print,
        export_unmatched=options.export_unmatched,
    )


if __name__ == "__main__":
    main()