Files
MP-Manager/scripts/sync_missing_opps_to_brand.py
2026-05-30 14:31:19 -06:00

1062 lines
47 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""sync_missing_opps_to_brand.py
Sincroniza al CRM de Marca las oportunidades que existen en sucursal y NO tienen
replica en Marca (bucket "opportunities_in_branch_not_in_brand" del audit
audit_brand_vs_branches_totals).
Flujo por cada opp de sucursal sin replica en Marca:
1. Carga el contacto de sucursal asociado a la opp.
2. Busca contacto en Marca en este orden:
a) telefono normalizado (ultimos 10 digitos)
b) email (lowercase)
c) nombre completo (normalizado: sin acentos, lowercase, espacios colapsados)
3. Si no existe contacto en Marca, lo crea con todos los datos basicos y los
custom fields mapeados por nombre desde el schema de la sucursal al de Marca.
4. Espera 10 segundos si se creo contacto nuevo, por si hay una automatizacion
en Marca que dispara la creacion de la opp automaticamente.
5. Vuelve a buscar oportunidades del contacto en Marca (refresca via API GHL).
6. Si existe opp en Marca: la actualiza con los datos de la opp de sucursal
(custom fields mapeados por nombre).
7. Si no existe opp: crea opp en Marca con todos los datos de la sucursal.
Modos:
- dry-run (default): no escribe nada en GHL, solo planea.
- --apply: ejecuta las escrituras y registra cada cambio en script_audit
(rollback disponible desde el dashboard).
Uso CLI:
python scripts/sync_missing_opps_to_brand.py
python scripts/sync_missing_opps_to_brand.py --apply --yes
python scripts/sync_missing_opps_to_brand.py --apply --run-id <ID>
python scripts/sync_missing_opps_to_brand.py --only-opp <opp_id>
python scripts/sync_missing_opps_to_brand.py --json
Para uso programatico (desde el endpoint /api/comparativa/sync-missing-opps):
from scripts.sync_missing_opps_to_brand import run_sync
result = run_sync(dry_run=True)
"""
import argparse
import json
import os
import sqlite3
import sys
import time
import unicodedata
from datetime import datetime
# Make the project root (the parent of the directory holding this file) and
# this file's own directory importable, so the sibling/project modules
# imported below resolve both when the file is run directly
# (`python scripts/sync_missing_opps_to_brand.py`) and when it is imported.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPTS_DIR not in sys.path:
    # Inserted at position 0 after ROOT_DIR, so when both are added here
    # SCRIPTS_DIR is searched first.
    sys.path.insert(0, SCRIPTS_DIR)
import script_audit # noqa: E402
import sync_engine # noqa: E402
from audit_brand_vs_branches_totals import run_audit # noqa: E402
from common import ( # noqa: E402
match_contacts,
normalize_email,
normalize_phone,
normalize_text,
)
from paths import DB_PATH
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
MATCH_THRESHOLD = 0.80
SCRIPT_NAME = os.path.basename(__file__)
# Tiempo de espera tras crear contacto en Marca, por si automatizaciones de
# Marca crean la opp automaticamente. Configurable via env var.
CONTACT_TO_OPP_WAIT_SECS = int(os.getenv("SYNC_MISSING_OPPS_WAIT_SECS", "10"))
# ---------------------------------------------------------------------------
# Utilidades
# ---------------------------------------------------------------------------
def safe_print(*args, **kwargs):
    """Print ``args`` to ``sys.stdout`` without crashing on unencodable text.

    Mirrors ``print`` for the ``sep`` (default one space) and ``end`` (default
    newline) keywords; any other keyword argument is ignored. If the write
    raises ``UnicodeEncodeError``, the joined text (not ``end``) is re-encoded
    with the stream's encoding ("utf-8" when the stream reports none) using
    ``errors="replace"`` and written again. The stream is flushed after a
    successful write.
    """
    separator = kwargs.get("sep", " ")
    terminator = kwargs.get("end", "\n")
    message = separator.join(map(str, args))
    stream = sys.stdout
    fallback_encoding = stream.encoding or "utf-8"
    try:
        stream.write(message + terminator)
    except UnicodeEncodeError:
        sanitized = message.encode(fallback_encoding, errors="replace").decode(fallback_encoding)
        stream.write(sanitized + terminator)
    stream.flush()
def strip_accents(value):
    """Return ``value`` as text with its combining accent marks removed.

    Falsy input (None, "", 0, ...) yields "". Other values are converted with
    ``str``, NFD-decomposed, and every character in Unicode category "Mn"
    (non-spacing mark) is dropped. The result is not recomposed.
    """
    if not value:
        return ""
    decomposed = unicodedata.normalize("NFD", str(value))
    return "".join(filter(lambda ch: unicodedata.category(ch) != "Mn", decomposed))
def normalize_name(first, last):
    """Build the normalized full-name key used for contact matching.

    ``first`` and ``last`` may be None (treated as ""). They are joined with
    a single space and handed to ``common.normalize_text``, so the key is
    produced the same way as in the other cross-location scripts. The
    original description of that normalization is "no accents, lowercase,
    collapsed spaces"; the exact rules live in ``common.normalize_text``.
    """
    full_name = "{} {}".format(first or "", last or "")
    return normalize_text(full_name)
def parse_custom_fields(raw):
    """Coerce a stored custom-fields value into a list.

    - A non-empty list is returned as-is (same object).
    - Falsy input (None, "", [] ...) yields a new empty list.
    - Anything else is decoded with ``json.loads``; the result is returned
      only if it is a list. Decode failures or non-list results yield [].
    """
    if raw and isinstance(raw, list):
        return raw
    decoded = None
    if raw:
        try:
            decoded = json.loads(raw)
        except Exception:
            decoded = None
    return decoded if isinstance(decoded, list) else []
def get_cf_value(cf):
    """Return the first non-empty value of a custom-field entry, or None.

    Keys are tried in order "value", "fieldValue", "fieldValueString"
    (per the original note: GHL uses "value" on contacts and
    "fieldValue"/"fieldValueString" on opportunities). A value counts as
    empty only if it is None or "".
    """
    candidates = (cf.get(key) for key in ("value", "fieldValue", "fieldValueString"))
    return next((v for v in candidates if v is not None and v != ""), None)
def custom_fields_by_name(record_or_json, id_to_name):
    """Convert a record's custom fields into ``{field_name: value}``.

    Args:
        record_or_json: dict-like record. If it has ``custom_fields_json`` set
            to a non-None value (SQLite snapshot rows), that is used;
            otherwise ``customFields`` or ``custom_fields`` (API/payload
            shapes). The value may be JSON text (decoded with
            ``parse_custom_fields``) or an already-decoded list/tuple. Any
            other input yields {}.
        id_to_name: ``{field_id: field_name}`` for the record's location
            (e.g. from ``load_schemas_id_to_name``).

    Returns:
        dict mapping field name to the value chosen by ``get_cf_value``.
        Entries are skipped when they are not dicts (malformed data), when
        their id (``id`` or ``fieldId``) is unknown to ``id_to_name``, or when
        they carry no non-empty value.
    """
    if not isinstance(record_or_json, dict):
        return {}
    raw = record_or_json.get("custom_fields_json")
    if raw is None:
        raw = record_or_json.get("customFields") or record_or_json.get("custom_fields")
    if isinstance(raw, str):
        cfs = parse_custom_fields(raw)
    elif isinstance(raw, (list, tuple)):
        cfs = raw
    else:
        # None, dicts or other unexpected shapes carry no usable entries.
        cfs = []

    out = {}
    for cf in cfs:
        # Malformed snapshot rows may contain non-dict entries; skip them
        # instead of failing the whole record on cf.get(...).
        if not isinstance(cf, dict):
            continue
        fname = id_to_name.get(cf.get("id") or cf.get("fieldId"))
        if not fname:
            continue
        val = get_cf_value(cf)  # already excludes None and ""
        if val is not None:
            out[fname] = val
    return out
# ---------------------------------------------------------------------------
# Carga
# ---------------------------------------------------------------------------
def load_schemas_id_to_name(conn, location_id, object_key):
    """Return ``{field_id: field_name}`` from ``object_schemas`` for one location/object.

    ``object_key`` is the object type, e.g. "contact" or "opportunity".
    ``conn`` must have a mapping row factory (``sqlite3.Row``) because
    columns are read by name. If a field_id appears more than once, the row
    returned last wins.
    """
    cursor = conn.execute(
        "SELECT field_id, field_name FROM object_schemas WHERE location_id=? AND object_key=?",
        (location_id, object_key),
    )
    mapping = {}
    for row in cursor.fetchall():
        mapping[row["field_id"]] = row["field_name"]
    return mapping
def load_schemas_name_to_id(conn, location_id, object_key):
    """Return ``{field_name: field_id}`` from ``object_schemas`` for one location/object.

    When several fields share a name, the first row returned wins (the
    original note says this mirrors ``SchemaResolver.get_field_id``).
    ``conn`` must have a mapping row factory (``sqlite3.Row``).
    """
    rows = conn.execute(
        "SELECT field_id, field_name FROM object_schemas WHERE location_id=? AND object_key=?",
        (location_id, object_key),
    ).fetchall()
    name_to_id = {}
    for row in rows:
        name = row["field_name"]
        if name in name_to_id:
            continue
        name_to_id[name] = row["field_id"]
    return name_to_id
def load_brand_contacts(conn):
    """Load every brand-location contact from the SQLite snapshot.

    Returns a list of plain dicts with keys id, first_name, last_name, phone
    and email, one per ``contacts`` row whose location_id is
    BRAND_LOCATION_ID. Requires a mapping row factory (``sqlite3.Row``).
    """
    cursor = conn.execute(
        "SELECT id, first_name, last_name, phone, email FROM contacts WHERE location_id=?",
        (BRAND_LOCATION_ID,),
    )
    return list(map(dict, cursor.fetchall()))
def load_brand_opps_by_contact(conn):
    """Group the brand location's snapshot opportunities by contact id.

    Returns a ``defaultdict(list)`` mapping contact_id to a list of opp dicts
    (id, contact_id, name, status, pipeline_id, pipeline_stage_id,
    monetary_value, custom_fields_json). ``custom_fields_json`` is included
    so that opp selection in run_sync can match on a custom field holding the
    branch opp id ("ID Oportunidad Sucursal") without a live CRM call.
    """
    from collections import defaultdict

    query = (
        "SELECT id, contact_id, name, status, pipeline_id, pipeline_stage_id, "
        "monetary_value, custom_fields_json "
        "FROM opportunities WHERE location_id=?"
    )
    grouped = defaultdict(list)
    for row in conn.execute(query, (BRAND_LOCATION_ID,)).fetchall():
        record = dict(row)
        grouped[record["contact_id"]].append(record)
    return grouped
def index_brand_contacts(contacts):
    """Build lookup indexes over brand contacts.

    Returns ``(by_phone, by_email, by_name)``. Each maps a normalized key to
    the FIRST contact in ``contacts`` that produced it:

    - by_phone: key from ``common.normalize_phone``.
    - by_email: key from ``common.normalize_email``.
    - by_name: key from ``normalize_name``, but only for contacts that have
      neither a normalized phone nor a normalized email. A name-based match
      can therefore only land on a brand contact that, like the source
      contact, has no strong identifier.
    """
    by_phone = {}
    by_email = {}
    by_name = {}
    for contact in contacts:
        phone_key = normalize_phone(contact.get("phone"))
        email_key = normalize_email(contact.get("email"))
        name_key = normalize_name(contact.get("first_name"), contact.get("last_name"))
        if phone_key and phone_key not in by_phone:
            by_phone[phone_key] = contact
        if email_key and email_key not in by_email:
            by_email[email_key] = contact
        has_strong_identifier = bool(phone_key) or bool(email_key)
        if name_key and not has_strong_identifier and name_key not in by_name:
            by_name[name_key] = contact
    return by_phone, by_email, by_name
def load_branch_contact(conn, location_id, contact_id):
    """Fetch one contact row (all columns) from the snapshot as a dict.

    Returns None when no contact with ``contact_id`` exists in
    ``location_id``. Requires a mapping row factory (``sqlite3.Row``).
    """
    cursor = conn.execute(
        "SELECT * FROM contacts WHERE location_id=? AND id=?",
        (location_id, contact_id),
    )
    found = cursor.fetchone()
    if found is None:
        return None
    return dict(found)
def load_branch_opp(conn, location_id, opp_id):
    """Fetch one opportunity row (all columns) from the snapshot as a dict.

    Returns None when no opportunity with ``opp_id`` exists in
    ``location_id``. Requires a mapping row factory (``sqlite3.Row``).
    """
    params = (location_id, opp_id)
    found = conn.execute("SELECT * FROM opportunities WHERE location_id=? AND id=?", params).fetchone()
    return None if found is None else dict(found)
# ---------------------------------------------------------------------------
# Upsert SQLite tras mutaciones GHL (mantener snapshot local sincronizado)
# ---------------------------------------------------------------------------
def _now_str():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def upsert_brand_contact_in_db(conn, brand_contact_id, payload, tags=None):
    """Backward-compatible wrapper: upsert a contact into the brand location's snapshot.

    Equivalent to ``upsert_contact_in_db`` with ``location_id=BRAND_LOCATION_ID``.
    """
    return upsert_contact_in_db(
        conn,
        brand_contact_id,
        payload,
        location_id=BRAND_LOCATION_ID,
        tags=tags,
    )
def upsert_contact_in_db(conn, contact_id, payload, location_id, tags=None):
    """Insert or replace a contact in the local SQLite snapshot after creating it in GHL.

    Args:
        conn: open sqlite3 connection; this function commits before returning.
        contact_id: GHL id of the created contact.
        payload: dict sent to GHL ``create_contact`` (camelCase keys
            firstName, lastName, email, phone, tags, customFields). Custom
            fields arrive as ``[{id, value}]`` and are stored verbatim as JSON
            in ``custom_fields_json``.
        location_id: snapshot location the row is stored under (brand or
            branch).
        tags: optional explicit tag list; falls back to ``payload["tags"]``,
            then to ``[]``.

    Uses ``INSERT OR REPLACE``: a conflicting existing row is deleted and
    re-inserted, so columns not listed here are reset to their defaults for
    that row.
    """
    custom_fields = payload.get("customFields") or []
    tags_json = json.dumps(tags or payload.get("tags") or [], ensure_ascii=False)
    cf_json = json.dumps(custom_fields, ensure_ascii=False)
    # One timestamp for the whole write: date_added, updated_at and synced_at
    # describe the same event, and three separate now() calls could straddle
    # a second boundary and disagree.
    now = _now_str()
    conn.execute(
        """
        INSERT OR REPLACE INTO contacts
        (id, location_id, first_name, last_name, email, phone, tags, custom_fields_json, date_added, updated_at, synced_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            contact_id,
            location_id,
            payload.get("firstName"),
            payload.get("lastName"),
            payload.get("email"),
            payload.get("phone"),
            tags_json,
            cf_json,
            now,
            now,
            now,
        ),
    )
    conn.commit()
def _merge_custom_fields_json(existing_raw, new_cfs):
    """Merge custom-field entries by field id and return the result as JSON text.

    Args:
        existing_raw: stored ``custom_fields_json`` value (JSON text, list or
            None); decoded with ``parse_custom_fields`` (unparseable -> []).
        new_cfs: custom-field entries just sent to GHL (``[{"id", "value"}]``).

    Stored entries whose id (``id`` or ``fieldId``) also appears in
    ``new_cfs`` are dropped in favour of the new entry. All other stored
    entries are kept in their original order, and the new entries are
    appended after them.
    """
    def _cf_id(cf):
        if not isinstance(cf, dict):
            return None
        return cf.get("id") or cf.get("fieldId")

    new_ids = {_cf_id(cf) for cf in new_cfs}
    new_ids.discard(None)
    kept = [cf for cf in parse_custom_fields(existing_raw) if _cf_id(cf) not in new_ids]
    return json.dumps(kept + list(new_cfs), ensure_ascii=False)


def upsert_brand_opp_in_db(conn, brand_opp_id, payload, contact_id, status_override=None):
    """Insert or replace a brand opportunity in the local SQLite snapshot after a GHL write.

    Args:
        conn: open sqlite3 connection with a mapping row factory
            (``sqlite3.Row``); this function commits before returning.
        brand_opp_id: GHL id of the brand opportunity.
        payload: body sent to GHL. A PUT body may lack locationId, contactId
            and status; a POST body carries everything.
        contact_id: brand contact id, passed separately because the PUT body
            does not include it. Falsy keeps the stored contact id.
        status_override: status to record when it was changed through the
            dedicated status endpoint; takes precedence over payload["status"].

    For an existing row, only fields present in ``payload`` replace stored
    values. Custom fields are merged by field id instead of replacing the
    whole stored list. The update payload only carries the custom fields
    mapped from the branch, and overwriting the list would drop every other
    custom field from the snapshot. NOTE(review): this assumes GHL's update
    only touches the custom fields it receives; confirm against the API.
    For a new row, status defaults to "open" and date_added is now.
    """
    new_cfs = payload.get("customFields") or []
    existing = conn.execute(
        "SELECT * FROM opportunities WHERE location_id=? AND id=?",
        (BRAND_LOCATION_ID, brand_opp_id),
    ).fetchone()
    if existing:
        existing = dict(existing)
        if new_cfs:
            cf_json = _merge_custom_fields_json(existing.get("custom_fields_json"), new_cfs)
        else:
            cf_json = existing.get("custom_fields_json")
        merged = {
            "id": brand_opp_id,
            "location_id": BRAND_LOCATION_ID,
            "name": payload.get("name") or existing.get("name"),
            "status": status_override or payload.get("status") or existing.get("status"),
            "pipeline_id": payload.get("pipelineId") or existing.get("pipeline_id"),
            "pipeline_stage_id": payload.get("pipelineStageId") or existing.get("pipeline_stage_id"),
            # Membership test (not truthiness) so an explicit 0/None is stored.
            "monetary_value": payload.get("monetaryValue") if "monetaryValue" in payload else existing.get("monetary_value"),
            "contact_id": contact_id or existing.get("contact_id"),
            "date_added": existing.get("date_added"),
            "custom_fields_json": cf_json,
        }
    else:
        merged = {
            "id": brand_opp_id,
            "location_id": BRAND_LOCATION_ID,
            "name": payload.get("name"),
            "status": status_override or payload.get("status") or "open",
            "pipeline_id": payload.get("pipelineId"),
            "pipeline_stage_id": payload.get("pipelineStageId"),
            "monetary_value": payload.get("monetaryValue"),
            "contact_id": contact_id,
            "date_added": _now_str(),
            "custom_fields_json": json.dumps(new_cfs, ensure_ascii=False),
        }
    conn.execute(
        """
        INSERT OR REPLACE INTO opportunities
        (id, location_id, name, status, pipeline_id, pipeline_stage_id, monetary_value, contact_id, date_added, custom_fields_json, synced_at)
        VALUES (:id, :location_id, :name, :status, :pipeline_id, :pipeline_stage_id, :monetary_value, :contact_id, :date_added, :custom_fields_json, datetime('now', 'localtime'))
        """,
        merged,
    )
    conn.commit()
# ---------------------------------------------------------------------------
# Live GHL helpers
# ---------------------------------------------------------------------------
def fetch_brand_opps_for_contact_live(brand_token, contact_id, *, page_size=100, max_pages=100):
    """Return the brand opportunities whose contactId equals ``contact_id``, read live from GHL.

    Per the original finding, GHL ignores the contactId filter on
    POST /opportunities/search and returns every opportunity of the location.
    This function therefore pages through the brand's opportunities and
    filters in Python. Without that local filter, opp selection in run_sync
    would overwrite unrelated opportunities. All pages are scanned (no early
    exit), because one contact can have several opportunities spread across
    pages.

    Args:
        brand_token: API token for the brand location.
        contact_id: brand contact id to filter on.
        page_size: opportunities requested per page (default 100).
        max_pages: defensive cap on pages fetched (default 100, i.e. up to
            10k opportunities). Opportunities beyond the cap are not
            inspected, an accepted risk.

    Returns:
        list of raw opportunity dicts from the API (possibly empty).

    Raises:
        RuntimeError: if an API request fails. Previously failures were
            swallowed and ``[]`` returned. The caller read that as "contact
            has no opportunity" and created a duplicate. The call sites in
            run_sync sit inside its per-opp ``try`` block, so the failure is
            reported for that opp instead.
    """
    matched = []
    for page in range(1, max_pages + 1):
        body = {"locationId": BRAND_LOCATION_ID, "limit": page_size, "page": page}
        try:
            data = sync_engine.ghl_client._request(
                "POST", "/opportunities/search", brand_token,
                json=body,
            )
        except Exception as exc:
            raise RuntimeError(
                f"Live lookup of brand opps failed on page {page} for contact {contact_id}: {exc}"
            ) from exc
        batch = data.get("opportunities", []) if isinstance(data, dict) else []
        if not batch:
            break
        # Filter locally by the real contactId (see docstring).
        matched.extend(
            opp for opp in batch
            if isinstance(opp, dict) and opp.get("contactId") == contact_id
        )
        if len(batch) < page_size:
            break  # short page: no more results
    return matched
# ---------------------------------------------------------------------------
# Matching de contacto en Marca (telefono -> email -> nombre)
# ---------------------------------------------------------------------------
def find_brand_contact(branch_contact, idx_phone, idx_email, idx_name, threshold=MATCH_THRESHOLD):
    """Look up the brand contact that corresponds to ``branch_contact``.

    Strategies, in order:

    1. Phone (key from ``common.normalize_phone``). A phone hit alone is not
       enough: ``common.match_contacts`` must also rate the pair "strong" or
       "medium" at ``threshold``. If the phone matches but the names do not,
       the brand contact is returned as a collision and no other strategy is
       tried. The original rationale is that two people sharing one number
       (e.g. a couple) must not be merged.
    2. Email (key from ``common.normalize_email``), exact, with no name check.
    3. Normalized full name, only when the branch contact has neither a
       phone key nor an email key. ``idx_name`` is built by
       ``index_brand_contacts`` to hold only brand contacts that also lack
       both.

    Returns:
        ``(match, strategy, collision)``. ``match`` is the brand contact dict
        or None. ``strategy`` is "phone+name", "email", "name" or None.
        ``collision`` is the same-phone brand contact whose name diverged
        (then ``match`` is None and the caller must not merge), else None.
    """
    phone_key = normalize_phone(branch_contact.get("phone"))
    if phone_key and phone_key in idx_phone:
        same_phone = idx_phone[phone_key]
        verdict = match_contacts(branch_contact, same_phone, threshold=threshold)
        names_agree = verdict["level"] in ("strong", "medium")
        return (same_phone, "phone+name", None) if names_agree else (None, None, same_phone)

    email_key = normalize_email(branch_contact.get("email"))
    if email_key and email_key in idx_email:
        return idx_email[email_key], "email", None

    has_strong_identifier = bool(phone_key) or bool(email_key)
    if has_strong_identifier:
        return None, None, None

    name_key = normalize_name(branch_contact.get("first_name"), branch_contact.get("last_name"))
    if name_key and name_key in idx_name:
        return idx_name[name_key], "name", None
    return None, None, None
# ---------------------------------------------------------------------------
# Construccion de payloads
# ---------------------------------------------------------------------------
def build_brand_contact_payload(branch_contact, branch_contact_schema_id_to_name, brand_contact_schema_name_to_id):
    """Backward-compatible wrapper: contact payload for the brand location.

    Equivalent to ``build_contact_payload`` with
    ``target_location_id=BRAND_LOCATION_ID``.
    """
    return build_contact_payload(
        src_contact=branch_contact,
        src_schema_id_to_name=branch_contact_schema_id_to_name,
        dst_schema_name_to_id=brand_contact_schema_name_to_id,
        target_location_id=BRAND_LOCATION_ID,
    )
def build_contact_payload(src_contact, src_schema_id_to_name, dst_schema_name_to_id, target_location_id):
    """Build the body for GHL POST /contacts/ from a snapshot contact row.

    Custom fields are translated by field NAME from the source schema to the
    destination schema, so this works in either direction (branch->brand or
    brand->branch). Fields with no namesake in the destination are omitted.

    Included keys: locationId, firstName/lastName (None -> ""), and, only
    when present/non-empty, email, phone, customFields, tags and source.
    ``tags`` may be a JSON string or an already-decoded list; it is used only
    if it decodes to a non-empty list, and undecodable values are ignored.
    """
    values_by_name = custom_fields_by_name(src_contact, src_schema_id_to_name)
    resolved = ((dst_schema_name_to_id.get(name), value) for name, value in values_by_name.items())
    mapped_cfs = [{"id": field_id, "value": value} for field_id, value in resolved if field_id]

    payload = {
        "locationId": target_location_id,
        "firstName": src_contact.get("first_name") or "",
        "lastName": src_contact.get("last_name") or "",
    }
    for key in ("email", "phone"):
        if src_contact.get(key):
            payload[key] = src_contact[key]
    if mapped_cfs:
        payload["customFields"] = mapped_cfs

    raw_tags = src_contact.get("tags")
    decoded_tags = None
    if raw_tags:
        if isinstance(raw_tags, str):
            try:
                decoded_tags = json.loads(raw_tags)
            except Exception:
                decoded_tags = None
        else:
            decoded_tags = raw_tags
    if isinstance(decoded_tags, list) and decoded_tags:
        payload["tags"] = decoded_tags

    source = src_contact.get("source")
    if source:
        payload["source"] = source
    return payload
def build_brand_opp_payload(branch_opp, brand_contact_id, branch_opp_schema_id_to_name, brand_opp_schema_name_to_id, brand_pipeline_id, brand_stage_id, *, for_update=False):
    """Build the body for GHL POST/PUT /opportunities/ in the brand location.

    Per the original notes, GHL is strict about the fields each verb accepts.
    POST /opportunities/ accepts locationId, status, contactId, name, etc.
    PUT /opportunities/{id} rejects locationId, contactId and status; the
    status is changed through PUT /opportunities/{id}/status instead. With
    ``for_update=True`` those three keys are therefore left out, and
    ``brand_contact_id`` is unused.

    Always sets ``name`` (fallback "Sin nombre"). pipelineId and
    pipelineStageId are added when truthy, monetaryValue when the branch
    value is not None, and customFields (mapped by field name from the branch
    schema to the brand schema) when any field maps.
    """
    values_by_name = custom_fields_by_name(branch_opp, branch_opp_schema_id_to_name)
    resolved = ((brand_opp_schema_name_to_id.get(name), value) for name, value in values_by_name.items())
    mapped_cfs = [{"id": field_id, "value": value} for field_id, value in resolved if field_id]

    payload = {"name": branch_opp.get("name") or "Sin nombre"}
    create_only = {} if for_update else {
        "locationId": BRAND_LOCATION_ID,
        "status": branch_opp.get("status") or "open",
        "contactId": brand_contact_id,
    }
    payload.update(create_only)

    if brand_pipeline_id:
        payload["pipelineId"] = brand_pipeline_id
    if brand_stage_id:
        payload["pipelineStageId"] = brand_stage_id
    amount = branch_opp.get("monetary_value")
    if amount is not None:
        payload["monetaryValue"] = amount
    if mapped_cfs:
        payload["customFields"] = mapped_cfs
    return payload
# ---------------------------------------------------------------------------
# Resolucion de pipeline/stage en Marca
# ---------------------------------------------------------------------------
def resolve_brand_pipeline_and_stage(conn, branch_loc_id, branch_pipeline_id, branch_stage_id):
    """Map a branch pipeline/stage to the brand pipeline/stage with the same name.

    Names are compared after ``strip_accents``, lowercasing and trimming.
    Pipelines come from the ``pipelines`` table. ``stages_json`` is decoded
    with ``json.loads``, so invalid JSON raises.

    Returns ``(brand_pipeline_id, brand_stage_id)``:

    - ``(None, None)`` if ``branch_pipeline_id`` is not among the branch's
      pipelines, or if no brand pipeline has the same name, unless the brand
      has exactly one pipeline, in which case that one is used.
    - The stage is the brand stage with the same name. If the branch stage is
      unknown or unnamed, or has no namesake, the pipeline's first stage is
      used. The stage is None if that pipeline has no stages.
    """
    def canon(text):
        # Accent-insensitive, case-insensitive, trimmed comparison key.
        return strip_accents(text or "").lower().strip()

    pipeline_sql = "SELECT id, name, stages_json FROM pipelines WHERE location_id=?"
    branch_pipes = conn.execute(pipeline_sql, (branch_loc_id,)).fetchall()
    brand_pipes = conn.execute(pipeline_sql, (BRAND_LOCATION_ID,)).fetchall()

    branch_pipe = None
    for row in branch_pipes:
        if row["id"] == branch_pipeline_id:
            branch_pipe = dict(row)
            break
    if branch_pipe is None:
        return None, None

    wanted_pipe = canon(branch_pipe["name"])
    wanted_stage = ""
    for stage in json.loads(branch_pipe["stages_json"] or "[]"):
        if stage.get("id") == branch_stage_id:
            wanted_stage = canon(stage.get("name"))
            break

    target_pipe = next((dict(bp) for bp in brand_pipes if canon(bp["name"]) == wanted_pipe), None)
    if target_pipe is None:
        if len(brand_pipes) != 1:
            return None, None
        # Fallback: the brand has a single pipeline, use it.
        target_pipe = dict(brand_pipes[0])

    brand_stage_list = json.loads(target_pipe["stages_json"] or "[]")
    target_stage = None
    if wanted_stage:
        target_stage = next(
            (bs for bs in brand_stage_list if canon(bs.get("name")) == wanted_stage),
            None,
        )
    if not target_stage and brand_stage_list:
        # Fallback: first stage of the brand pipeline.
        target_stage = brand_stage_list[0]
    stage_id = target_stage.get("id") if target_stage else None
    return target_pipe["id"], stage_id
# ---------------------------------------------------------------------------
# Procesamiento
# ---------------------------------------------------------------------------
def run_sync(opp_ids=None, dry_run=True, log=None, run_id=None):
"""Ejecuta la sincronizacion. Devuelve dict serializable.
Args:
opp_ids: lista opcional de opp_ids (sucursal) a procesar. None = todos los
del bucket missing_opps_in_brand.
dry_run: True (default) = no escribe en GHL.
log: funcion opcional log(line) para streaming.
run_id: id de script_audit para registrar cambios cuando apply.
"""
if log is None:
log = safe_print
if not os.path.exists(DB_PATH):
raise FileNotFoundError(f"No existe {DB_PATH}. Corre una sincronizacion global primero.")
log(f"[{datetime.now().strftime('%H:%M:%S')}] === sync_missing_opps_to_brand ===")
log(f"Modo: {'DRY-RUN (no escribe)' if dry_run else 'APPLY (escribe en GHL)'}")
log(f"Espera tras crear contacto: {CONTACT_TO_OPP_WAIT_SECS}s")
# 1) Recuperar el bucket desde el audit (sin limite).
log("Calculando bucket missing_opps_in_brand desde audit...")
audit_data = run_audit(limit_missing=None)
missing = audit_data["missing"]["opportunities_in_branch_not_in_brand"]
targets = missing["items"]
if opp_ids:
wanted = set(opp_ids)
targets = [t for t in targets if t["id"] in wanted]
log(f"Opps candidatas: {len(targets)} (total en bucket: {missing['total']})")
if not targets:
return {
"dry_run": dry_run,
"summary": {
"candidates": 0,
"contacts_created": 0,
"opps_created": 0,
"opps_updated": 0,
"skipped": 0,
"errors": 0,
},
"items": [],
}
# 2) Resolver tokens y schemas.
tokens = sync_engine.get_tokens_map()
brand_token = tokens.get(BRAND_LOCATION_ID)
if not brand_token:
raise RuntimeError("No se encontro token para la cuenta de Marca en el CSV de tokens.")
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
try:
brand_contact_schema_name_to_id = load_schemas_name_to_id(conn, BRAND_LOCATION_ID, "contact")
brand_opp_schema_name_to_id = load_schemas_name_to_id(conn, BRAND_LOCATION_ID, "opportunity")
if not brand_contact_schema_name_to_id:
log("ADVERTENCIA: schema de contactos de Marca vacio. Corre sync de metadata.")
if not brand_opp_schema_name_to_id:
log("ADVERTENCIA: schema de opps de Marca vacio. Corre sync de metadata.")
# 3) Cargar indice de contactos Marca.
brand_contacts = load_brand_contacts(conn)
idx_phone, idx_email, idx_name = index_brand_contacts(brand_contacts)
brand_opps_local_by_contact = load_brand_opps_by_contact(conn)
log(f"Indice Marca: {len(brand_contacts)} contactos, {sum(len(v) for v in brand_opps_local_by_contact.values())} opps cacheadas.")
# 4) Procesar cada opp.
results = []
summary = {
"candidates": len(targets),
"contacts_created": 0,
"opps_created": 0,
"opps_updated": 0,
"skipped": 0,
"errors": 0,
}
for idx, target in enumerate(targets, 1):
opp_id = target["id"]
branch_loc_id = target["branch_location_id"]
branch_name = target.get("branch_name", "")
log(f"\n[{idx}/{len(targets)}] Opp {opp_id} | sucursal: {branch_name}")
item = {
"opp_id": opp_id,
"branch_location_id": branch_loc_id,
"branch_name": branch_name,
"opp_name": target.get("name", ""),
"actions": [],
"status": "pending",
"error": None,
}
try:
# Cargar opp y contacto sucursal.
branch_opp = load_branch_opp(conn, branch_loc_id, opp_id)
if not branch_opp:
raise RuntimeError(f"No se encontro la opp {opp_id} en SQLite para {branch_loc_id}")
branch_contact_id = branch_opp.get("contact_id")
if not branch_contact_id:
raise RuntimeError(f"Opp {opp_id} no tiene contact_id en SQLite (huerfana)")
branch_contact = load_branch_contact(conn, branch_loc_id, branch_contact_id)
if not branch_contact:
raise RuntimeError(f"No se encontro el contacto {branch_contact_id} en SQLite")
item["branch_contact"] = {
"id": branch_contact["id"],
"name": f"{branch_contact.get('first_name') or ''} {branch_contact.get('last_name') or ''}".strip(),
"phone": branch_contact.get("phone"),
"email": branch_contact.get("email"),
}
branch_contact_schema_id_to_name = load_schemas_id_to_name(conn, branch_loc_id, "contact")
branch_opp_schema_id_to_name = load_schemas_id_to_name(conn, branch_loc_id, "opportunity")
# Buscar contacto en Marca.
match, strategy, collision = find_brand_contact(branch_contact, idx_phone, idx_email, idx_name)
brand_contact_id = match["id"] if match else None
contact_was_created = False
if collision and not match:
# Mismo teléfono que un contacto de Marca pero el nombre
# diverge. NO mergear: el contacto destino probablemente
# es una persona distinta (caso pareja con mismo número).
log(
f" COLISION TELEFONO sin match por nombre: "
f"brand_contact_id={collision.get('id')} "
f"({(collision.get('first_name') or '') + ' ' + (collision.get('last_name') or '')!r}). "
"Plan: crear contacto NUEVO en Marca."
)
item["actions"].append({
"action": "phone_collision_unresolved",
"colliding_brand_contact_id": collision.get("id"),
"colliding_brand_name": f"{collision.get('first_name') or ''} {collision.get('last_name') or ''}".strip(),
})
summary.setdefault("phone_collisions_unresolved", 0)
summary["phone_collisions_unresolved"] += 1
if run_id:
try:
script_audit.record_change(
run_id, BRAND_LOCATION_ID, "contact", branch_contact.get("id") or "",
"", "skipped_phone_collision", None,
{
"branch_contact_id": branch_contact.get("id"),
"branch_location_id": branch_loc_id,
"colliding_brand_contact_id": collision.get("id"),
"phone": branch_contact.get("phone"),
},
)
except Exception as audit_exc:
log(f" WARN: no se pudo registrar la colision en script_audit: {audit_exc}")
if match:
log(f" Contacto en Marca encontrado por {strategy}: {brand_contact_id}")
item["actions"].append({"action": "match_existing_contact", "strategy": strategy, "brand_contact_id": brand_contact_id})
else:
log(f" No hay contacto en Marca. Plan: CREAR con datos de sucursal.")
payload = build_brand_contact_payload(
branch_contact,
branch_contact_schema_id_to_name,
brand_contact_schema_name_to_id,
)
item["actions"].append({"action": "create_contact", "payload_preview": _preview_payload(payload)})
if not dry_run:
res = sync_engine.ghl_client.create_contact(brand_token, payload)
brand_contact_id = (res.get("contact") or {}).get("id") or res.get("id")
if not brand_contact_id:
raise RuntimeError(f"GHL no devolvio id de contacto creado. Respuesta: {res}")
contact_was_created = True
summary["contacts_created"] += 1
log(f" Contacto creado en Marca: {brand_contact_id}")
item["actions"][-1]["result"] = {"brand_contact_id": brand_contact_id}
# Replicar en SQLite local para mantener snapshot sincronizado.
try:
upsert_brand_contact_in_db(conn, brand_contact_id, payload)
except Exception as db_exc:
log(f" WARN: no se pudo upsert contacto en SQLite: {db_exc}")
# Indexar el nuevo contacto en memoria.
new_c = {
"id": brand_contact_id,
"first_name": branch_contact.get("first_name"),
"last_name": branch_contact.get("last_name"),
"phone": branch_contact.get("phone"),
"email": branch_contact.get("email"),
}
p = normalize_phone(new_c.get("phone"))
e = normalize_email(new_c.get("email"))
n = normalize_name(new_c.get("first_name"), new_c.get("last_name"))
if p: idx_phone.setdefault(p, new_c)
if e: idx_email.setdefault(e, new_c)
if n: idx_name.setdefault(n, new_c)
if run_id:
cid = script_audit.record_change(
run_id, BRAND_LOCATION_ID, "contact", brand_contact_id,
"", "created", None,
{"phone": new_c["phone"], "email": new_c["email"], "name": f"{new_c['first_name']} {new_c['last_name']}".strip()},
)
if cid:
script_audit.mark_change(cid, "applied")
else:
# dry-run: simular id para el resto del flujo
brand_contact_id = f"<NEW_CONTACT_FOR_{branch_contact_id}>"
summary["contacts_created"] += 1
# Espera de 10s si se creo contacto nuevo y estamos en apply.
if contact_was_created and not dry_run:
log(f" Esperando {CONTACT_TO_OPP_WAIT_SECS}s por automatizaciones de Marca...")
time.sleep(CONTACT_TO_OPP_WAIT_SECS)
# Buscar opp existente en Marca para este contacto.
#
# Estrategia:
# 1. Si el contacto YA existia en Marca (match en snapshot SQLite),
# buscar la opp en el snapshot — es la verdad cacheada y evita
# una llamada extra a GHL. Match por nombre exacto, sino la
# primera open, sino la primera.
# 2. Si no encontramos en snapshot Y estamos en apply, hacer live
# lookup como fallback (puede pasar si el snapshot esta stale).
# 3. Si el contacto se acaba de crear en este run, SIEMPRE live
# lookup tras los 10s (porque puede haberse creado por automatizacion).
existing_brand_opp = None
target_name = (branch_opp.get("name") or "").strip().lower()
branch_opp_id = branch_opp.get("id")
def _has_link_to_branch(bo):
"""True si esta opp de Marca tiene un custom field cuyo valor
== branch_opp_id. Cubre ambos formatos: snapshot SQLite
(custom_fields_json string) y live API (customFields list)."""
if not branch_opp_id:
return False
target = str(branch_opp_id)
# live API: customFields como lista de dicts
for cf in bo.get("customFields") or []:
v = cf.get("value") or cf.get("fieldValue") or cf.get("fieldValueString")
if v and str(v) == target:
return True
# snapshot SQLite: custom_fields_json como string JSON
raw = bo.get("custom_fields_json")
if raw:
try:
cfs = json.loads(raw) if isinstance(raw, str) else raw
except Exception:
cfs = None
if isinstance(cfs, list):
for cf in cfs:
v = cf.get("value") or cf.get("fieldValue") or cf.get("fieldValueString")
if v and str(v) == target:
return True
return False
def _pick_existing(opps_list):
if not opps_list:
return None
# Match PRINCIPAL (determinístico): opp de Marca cuyo CF
# "ID Oportunidad Sucursal" == id nativo de la opp de sucursal.
# Alinea esta selección con el match del workflow n8n y del
# audit de la Comparativa. Evita pisar la opp equivocada en
# casos multi-empeño.
for bo in opps_list:
if _has_link_to_branch(bo):
return bo
# Respaldo: por nombre exacto, luego primera open, luego primera.
for bo in opps_list:
if (bo.get("name") or "").strip().lower() == target_name:
return bo
opens = [bo for bo in opps_list if (bo.get("status") or "").lower() == "open"]
return opens[0] if opens else opps_list[0]
if not contact_was_created:
# Caso 1: contacto existente. Snapshot primero.
cached = brand_opps_local_by_contact.get(brand_contact_id, [])
existing_brand_opp = _pick_existing(cached)
if existing_brand_opp:
log(f" Match opp en snapshot SQLite: {existing_brand_opp.get('id')}")
elif not dry_run:
# Snapshot vacio: fallback a live (puede ser snapshot stale).
live_opps = fetch_brand_opps_for_contact_live(brand_token, brand_contact_id)
log(f" Snapshot vacio; live encontro {len(live_opps)} opps para este contacto.")
existing_brand_opp = _pick_existing(live_opps)
else:
# Caso 2: contacto recien creado. Solo live tras la espera.
if not dry_run:
live_opps = fetch_brand_opps_for_contact_live(brand_token, brand_contact_id)
log(f" Tras espera: live encontro {len(live_opps)} opps para el contacto nuevo.")
existing_brand_opp = _pick_existing(live_opps)
# En dry-run, no podemos saber lo que una automatizacion crearia.
# Resolver pipeline/stage en Marca.
brand_pipeline_id, brand_stage_id = resolve_brand_pipeline_and_stage(
conn, branch_loc_id, branch_opp.get("pipeline_id"), branch_opp.get("pipeline_stage_id"),
)
if existing_brand_opp:
log(f" Opp ya existe en Marca: {existing_brand_opp.get('id')} (status: {existing_brand_opp.get('status')}). Plan: ACTUALIZAR.")
payload = build_brand_opp_payload(
branch_opp, brand_contact_id,
branch_opp_schema_id_to_name, brand_opp_schema_name_to_id,
brand_pipeline_id, brand_stage_id,
for_update=True,
)
target_status = (branch_opp.get("status") or "open")
current_status = (existing_brand_opp.get("status") or "open")
status_changes = target_status != current_status
item["actions"].append({
"action": "update_opp",
"brand_opp_id": existing_brand_opp.get("id"),
"payload_preview": _preview_payload(payload),
"status_change": (current_status, target_status) if status_changes else None,
})
if not dry_run:
sync_engine.ghl_client.update_opportunity(brand_token, existing_brand_opp["id"], payload)
if status_changes:
try:
sync_engine.ghl_client.update_opportunity_status(brand_token, existing_brand_opp["id"], target_status)
log(f" Status actualizado: {current_status} -> {target_status}")
except Exception as se:
log(f" WARN: no se pudo actualizar status: {se}")
summary["opps_updated"] += 1
log(f" Opp actualizada en Marca: {existing_brand_opp['id']}")
# Replicar en SQLite local (camino rápido, con datos del payload).
try:
upsert_brand_opp_in_db(
conn, existing_brand_opp["id"], payload,
contact_id=brand_contact_id,
status_override=target_status if status_changes else None,
)
except Exception as db_exc:
log(f" WARN: no se pudo upsert opp en SQLite: {db_exc}")
# Refresh autoritativo: GET /opportunities/{id} y save_single
# garantiza que SQLite quede 1:1 con Bucéfalo (CFs, status real,
# cualquier campo derivado que GHL haya seteado tras el PUT).
ref = sync_engine.refresh_opportunity_in_db(
brand_token, existing_brand_opp["id"], BRAND_LOCATION_ID,
)
if not ref.get("ok"):
log(f" WARN: refresh_opportunity_in_db fallo: {ref.get('error')}")
summary.setdefault("local_refresh_errors", 0)
summary["local_refresh_errors"] += 1
if run_id:
cid = script_audit.record_change(
run_id, BRAND_LOCATION_ID, "opportunity", existing_brand_opp["id"],
"", "updated", existing_brand_opp, payload,
)
if cid:
script_audit.mark_change(cid, "applied")
else:
summary["opps_updated"] += 1
item["status"] = "updated"
else:
log(f" No hay opp en Marca para este contacto. Plan: CREAR.")
payload = build_brand_opp_payload(
branch_opp, brand_contact_id,
branch_opp_schema_id_to_name, brand_opp_schema_name_to_id,
brand_pipeline_id, brand_stage_id,
)
item["actions"].append({"action": "create_opp", "payload_preview": _preview_payload(payload)})
if not dry_run:
if not brand_pipeline_id or not brand_stage_id:
raise RuntimeError(
f"No se pudo resolver pipeline/stage en Marca para opp {opp_id} "
f"(branch pipeline {branch_opp.get('pipeline_id')})"
)
res = sync_engine.ghl_client.create_opportunity(brand_token, payload)
new_opp_id = (res.get("opportunity") or {}).get("id") or res.get("id")
if not new_opp_id:
raise RuntimeError(f"GHL no devolvio id de opp creada. Respuesta: {res}")
summary["opps_created"] += 1
log(f" Opp creada en Marca: {new_opp_id}")
item["actions"][-1]["result"] = {"brand_opp_id": new_opp_id}
# Replicar en SQLite local (camino rápido).
try:
upsert_brand_opp_in_db(conn, new_opp_id, payload, contact_id=brand_contact_id)
except Exception as db_exc:
log(f" WARN: no se pudo upsert opp en SQLite: {db_exc}")
# Refresh autoritativo desde Bucéfalo.
ref = sync_engine.refresh_opportunity_in_db(
brand_token, new_opp_id, BRAND_LOCATION_ID,
)
if not ref.get("ok"):
log(f" WARN: refresh_opportunity_in_db fallo: {ref.get('error')}")
summary.setdefault("local_refresh_errors", 0)
summary["local_refresh_errors"] += 1
if run_id:
cid = script_audit.record_change(
run_id, BRAND_LOCATION_ID, "opportunity", new_opp_id,
"", "created", None, payload,
)
if cid:
script_audit.mark_change(cid, "applied")
else:
summary["opps_created"] += 1
item["status"] = "created"
results.append(item)
except Exception as e:
summary["errors"] += 1
item["status"] = "error"
item["error"] = str(e)
log(f" ERROR: {e}")
results.append(item)
log(f"\n=== RESUMEN ===")
log(f" Candidatas : {summary['candidates']}")
log(f" Contactos {'a crear' if dry_run else 'creados'} : {summary['contacts_created']}")
log(f" Opps {'a crear' if dry_run else 'creadas'} : {summary['opps_created']}")
log(f" Opps {'a actualizar' if dry_run else 'actualizadas'}: {summary['opps_updated']}")
if summary.get("phone_collisions_unresolved"):
log(f" Colisiones de telefono sin match (revision manual): {summary['phone_collisions_unresolved']}")
log(f" Errores : {summary['errors']}")
return {
"dry_run": dry_run,
"summary": summary,
"items": results,
}
finally:
conn.close()
def _preview_payload(payload):
"""Resumen del payload para no inundar el output (los CFs pueden ser muchos)."""
cf_count = len(payload.get("customFields", []))
p = {k: v for k, v in payload.items() if k != "customFields"}
if cf_count:
p["customFields_count"] = cf_count
return p
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
    """CLI entry point: parse arguments, confirm writes if needed, run the sync.

    Without ``--apply`` the run is a dry-run. With ``--apply`` and without
    ``--yes``, the user is asked for interactive confirmation first. With
    ``--json``, the full result dict returned by ``run_sync`` is printed at
    the end.

    Exit codes (via ``sys.exit``):
        0 -- normal completion, or the confirmation was declined or unavailable.
        2 -- ``run_sync`` raised ``FileNotFoundError``.
        3 -- ``run_sync`` raised ``RuntimeError``.
    Any other exception propagates unchanged.
    """
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--apply", action="store_true", help="Ejecuta las escrituras en GHL. Por default es dry-run.")
    parser.add_argument("--yes", action="store_true", help="Skip confirmacion interactiva (uso headless desde dashboard).")
    parser.add_argument("--only-opp", action="append", default=[], help="Procesa solo el opp_id dado (puede repetirse).")
    parser.add_argument("--run-id", type=str, default=None, help="Id de script_audit para registrar cambios. Solo aplica con --apply.")
    parser.add_argument("--json", action="store_true", help="Imprime el resultado como JSON al final.")
    args = parser.parse_args()
    dry_run = not args.apply
    if not dry_run and not args.yes:
        # Interactive confirmation (CLI only; headless callers pass --yes).
        try:
            confirm = input("Esto escribira en GHL. Continuar? (y/N): ").strip().lower()
        except (EOFError, KeyboardInterrupt):
            # No usable stdin (e.g. launched headless without --yes) or Ctrl-C:
            # treat it as "no" instead of crashing with a traceback.
            print()
            confirm = ""
        if confirm not in ("y", "yes", "s", "si", "sí"):
            print("Cancelado.")
            sys.exit(0)
    try:
        result = run_sync(
            opp_ids=args.only_opp or None,
            dry_run=dry_run,
            log=safe_print,
            run_id=args.run_id,
        )
    except FileNotFoundError as e:
        safe_print(f"ERROR: {e}")
        sys.exit(2)
    except RuntimeError as e:
        safe_print(f"ERROR: {e}")
        sys.exit(3)
    if args.json:
        safe_print(json.dumps(result, ensure_ascii=False, indent=2, default=str))
if __name__ == "__main__":
    # Run the CLI only when executed as a script. When imported
    # programmatically (e.g. `from scripts.sync_missing_opps_to_brand import
    # run_sync`), nothing runs at import time.
    main()