169 lines
6.4 KiB
Python
169 lines
6.4 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Backfill de la tabla de mapeo Baserow id_opp_sucursal -> id_opp_marca.
|
|
|
|
Contexto: el workflow n8n "Sincronizar Oportunidad - Nodos Nuevos"
|
|
(Cfgwp0bOtDW8zuKW) decide UPDATE-vs-CREATE buscando la opp existente en Marca
|
|
SOLO entre las opps del contacto resuelto ese run. Cuando la identidad del
|
|
contacto en Marca es ambigua (mismo telefono / nombre variante), resuelve un
|
|
contacto distinto al de la opp original, no la encuentra y hace CREATE ->
|
|
replica duplicada (causa del descuadre positivo Marca > sucursales).
|
|
|
|
El fix: una tabla Baserow que mapea de forma GLOBAL el id de la opp de sucursal
|
|
al id de la opp de Marca. El workflow consultara esa tabla antes de decidir; si
|
|
existe el mapeo -> UPDATE de esa opp (independiente del contacto). Este script
|
|
puebla la tabla con el estado ACTUAL: cada opp de Marca con "ID Oportunidad
|
|
Sucursal" valido => fila {id_opp_sucursal, id_opp_marca}.
|
|
|
|
Idempotente: upsert por id_opp_sucursal (crea si falta, actualiza si cambio el
|
|
id_opp_marca). Dry-run por defecto.
|
|
|
|
Uso:
|
|
python scripts/backfill_baserow_opp_mapping.py --table-id <ID> # dry-run
|
|
python scripts/backfill_baserow_opp_mapping.py --table-id <ID> --apply
|
|
python scripts/backfill_baserow_opp_mapping.py --table-id <ID> --json
|
|
"""
|
|
|
|
import argparse
|
|
import datetime
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import sys
|
|
|
|
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
if ROOT_DIR not in sys.path:
|
|
sys.path.insert(0, ROOT_DIR)
|
|
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
if SCRIPTS_DIR not in sys.path:
|
|
sys.path.insert(0, SCRIPTS_DIR)
|
|
|
|
from paths import DB_PATH # noqa: E402
|
|
from baserow_client import BaserowClient # noqa: E402
|
|
from audit_brand_vs_branches_totals import ( # noqa: E402
|
|
resolve_opp_link_field_id, extract_opp_link_value, OPP_ID_PATTERN, BRAND_LOCATION_ID,
|
|
)
|
|
|
|
# Nombres de columna esperados en la tabla de mapeo (user_field_names).
|
|
COL_SUC = "id_opp_sucursal"
|
|
COL_MARCA = "id_opp_marca"
|
|
COL_LOC = "location_id_sucursal"
|
|
COL_UPDATED = "updated_at"
|
|
|
|
|
|
def safe_print(*a):
|
|
text = " ".join(str(x) for x in a)
|
|
try:
|
|
sys.stdout.write(text + "\n"); sys.stdout.flush()
|
|
except UnicodeEncodeError:
|
|
enc = sys.stdout.encoding or "utf-8"
|
|
sys.stdout.write(text.encode(enc, errors="replace").decode(enc) + "\n"); sys.stdout.flush()
|
|
|
|
|
|
def load_brand_mappings():
|
|
"""Lee de SQLite las opps de Marca y devuelve [{id_opp_sucursal, id_opp_marca, location_id_sucursal}]."""
|
|
conn = sqlite3.connect(DB_PATH); conn.row_factory = sqlite3.Row
|
|
try:
|
|
fid = resolve_opp_link_field_id(conn, BRAND_LOCATION_ID)
|
|
# location de origen: la opp de sucursal con ese id nativo
|
|
branch_loc_by_oppid = {
|
|
r["id"]: r["location_id"]
|
|
for r in conn.execute(
|
|
"SELECT id, location_id FROM opportunities WHERE location_id != ?",
|
|
(BRAND_LOCATION_ID,),
|
|
)
|
|
}
|
|
out = []
|
|
for r in conn.execute(
|
|
"SELECT id, custom_fields_json FROM opportunities WHERE location_id = ?",
|
|
(BRAND_LOCATION_ID,),
|
|
):
|
|
v = extract_opp_link_value(r["custom_fields_json"], fid)
|
|
if v and OPP_ID_PATTERN.match(str(v)):
|
|
out.append({
|
|
COL_SUC: str(v),
|
|
COL_MARCA: r["id"],
|
|
COL_LOC: branch_loc_by_oppid.get(str(v), ""),
|
|
})
|
|
return out
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def run(table_id, apply=False, log=safe_print):
|
|
cli = BaserowClient.from_credentials()
|
|
mappings = load_brand_mappings()
|
|
log(f"Opps de Marca con link valido: {len(mappings)}")
|
|
|
|
existing = cli.list_rows(table_id)
|
|
by_suc = {}
|
|
dup_keys = 0
|
|
for row in existing:
|
|
k = str(row.get(COL_SUC) or "").strip()
|
|
if not k:
|
|
continue
|
|
if k in by_suc:
|
|
dup_keys += 1
|
|
by_suc.setdefault(k, row)
|
|
log(f"Filas existentes en la tabla {table_id}: {len(existing)} (claves duplicadas: {dup_keys})")
|
|
|
|
ts = datetime.datetime.now(datetime.timezone.utc).isoformat()
|
|
to_create, to_update, ok = [], [], 0
|
|
for m in mappings:
|
|
cur = by_suc.get(m[COL_SUC])
|
|
if cur is None:
|
|
to_create.append(m)
|
|
elif str(cur.get(COL_MARCA) or "") != m[COL_MARCA]:
|
|
to_update.append({"row_id": cur["id"], **m})
|
|
else:
|
|
ok += 1
|
|
|
|
log(f"Plan: crear={len(to_create)} actualizar={len(to_update)} ya_ok={ok}")
|
|
|
|
summary = {"total_mappings": len(mappings), "existing_rows": len(existing),
|
|
"to_create": len(to_create), "to_update": len(to_update),
|
|
"already_ok": ok, "created": 0, "updated": 0, "errors": 0,
|
|
"duplicate_keys_in_table": dup_keys}
|
|
|
|
if not apply:
|
|
log("DRY-RUN. Para aplicar: --apply")
|
|
return {"summary": summary, "to_create": to_create, "to_update": to_update}
|
|
|
|
# Backup de la tabla antes de mutar.
|
|
bpath = cli.backup_table(table_id, label="opp_mapping_prebackfill")
|
|
log(f"Backup: {bpath}")
|
|
|
|
for m in to_create:
|
|
try:
|
|
cli.create_row(table_id, {**m, COL_UPDATED: ts}, dry_run=False)
|
|
summary["created"] += 1
|
|
except Exception as e:
|
|
summary["errors"] += 1; log(f" ERROR create {m[COL_SUC]}: {e}")
|
|
for u in to_update:
|
|
rid = u.pop("row_id")
|
|
try:
|
|
cli.update_row(table_id, rid, {**u, COL_UPDATED: ts}, dry_run=False)
|
|
summary["updated"] += 1
|
|
except Exception as e:
|
|
summary["errors"] += 1; log(f" ERROR update {u[COL_SUC]}: {e}")
|
|
|
|
log(f"Resultado: created={summary['created']} updated={summary['updated']} errors={summary['errors']}")
|
|
return {"summary": summary}
|
|
|
|
|
|
def main():
|
|
p = argparse.ArgumentParser(description="Backfill tabla Baserow de mapeo opp sucursal->marca.")
|
|
p.add_argument("--table-id", type=int, required=True, help="ID de la tabla de mapeo en Baserow.")
|
|
p.add_argument("--apply", action="store_true", help="Aplica (sin esto, dry-run).")
|
|
p.add_argument("--json", action="store_true", help="Salida JSON.")
|
|
args = p.parse_args()
|
|
if hasattr(sys.stdout, "reconfigure"):
|
|
sys.stdout.reconfigure(encoding="utf-8")
|
|
res = run(args.table_id, apply=args.apply, log=(lambda *a: None) if args.json else safe_print)
|
|
if args.json:
|
|
print(json.dumps(res, ensure_ascii=False, indent=2, default=str))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|