Files
2026-05-30 14:31:19 -06:00

752 lines
30 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Test harness E2E para los workflows n8n de sincronización de contactos.
Cada escenario:
1) Setup: crea contactos test en GHL (sucursal PINOTEPA + Marca) con tag
`revisar` y nombre `Test <scenario>_<ts>`. Registra todos los ids creados.
2) Disparo: POST al webhook real del workflow correspondiente.
3) Espera (GHL no es instantáneo).
4) Asserts: verifica intra_brand_duplicates, conteos, CF en ambos lados.
5) Cleanup: borra todos los contactos registrados (Marca + sucursal).
Uso:
python scripts/n8n_e2e_test.py --scenario 1.1
python scripts/n8n_e2e_test.py --scenario all-phase1
python scripts/n8n_e2e_test.py --scenario all
"""
import argparse
import datetime
import json
import os
import sys
import time
import traceback
import urllib.error
import urllib.request
import warnings
# Suppress the urllib3 "doesn't match a supported version" warning.
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")

# Put the scripts directory and its parent at the front of sys.path
# (presumably so `sync_engine` resolves regardless of the working directory).
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(SCRIPTS_DIR)
for _import_dir in (ROOT_DIR, SCRIPTS_DIR):
    if _import_dir not in sys.path:
        sys.path.insert(0, _import_dir)
del _import_dir

import sync_engine  # noqa: E402

# GHL location ids.
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
PINOTEPA_LOCATION_ID = "7H91g95hhLKwIUqSk0Rg"

# Custom field that links a brand (Marca) contact to its branch contact.
CF_ID_BRAND = "E6lI9ykWhqpj7Pmi7Qd3"  # field_id of id_contacto_sucursal in the Marca location
CF_KEY = "contact.id_contacto_sucursal"

# n8n workflow ids and the webhook URLs that trigger them.
WORKFLOW_SUC_TO_MARCA = "x4DqZ5FtSc43tdzB"
WORKFLOW_MARCA_TO_SUC = "4UMRwxJdHFfOGHBp"
WEBHOOK_URL_SUC_TO_MARCA = "https://workflows.consultoriae3.com/webhook/56c4c1a9-0271-48e5-8915-e25e18666dce"
WEBHOOK_URL_MARCA_TO_SUC = "https://workflows.consultoriae3.com/webhook/8d16dc55-616a-4532-8334-8ac81e7d3ae1"

# Timing knobs, all in seconds.
WAIT_AFTER_WEBHOOK = 12  # GHL + n8n latency adds up.
ASSERT_RETRY_TIMEOUT = 25
ASSERT_RETRY_INTERVAL = 3

# Short alias for the GHL client exposed by sync_engine.
gc = sync_engine.ghl_client
def log(msg):
    """Write ``msg`` to stdout prefixed with the current ``[HH:MM:SS]`` time.

    If stdout cannot encode the line, the unencodable characters are
    replaced (using stdout's own encoding, or UTF-8 when it reports none)
    and the line is written again.
    """
    stamp = datetime.datetime.now().strftime("%H:%M:%S")
    line = f"[{stamp}] {msg}"
    out = sys.stdout
    try:
        out.write(line + "\n")
        out.flush()
    except UnicodeEncodeError:
        codec = out.encoding or "utf-8"
        safe_line = line.encode(codec, errors="replace").decode(codec)
        out.write(safe_line + "\n")
        out.flush()
# ---------------------------------------------------------------------------
# Tokens
# ---------------------------------------------------------------------------
# Lazily filled map of location_id -> account row (see get_token).
_accounts_cache = None


def get_token(location_id):
    """Return the API token configured for ``location_id``.

    The account rows come from ``sync_engine.parse_accounts_csv()``; they are
    loaded on the first call and cached in ``_accounts_cache`` keyed by their
    ``location_id``.

    Raises:
        RuntimeError: if no account row exists for ``location_id``.
    """
    global _accounts_cache
    if _accounts_cache is None:
        by_location = {}
        for account in sync_engine.parse_accounts_csv():
            by_location[account["location_id"]] = account
        _accounts_cache = by_location
    account = _accounts_cache.get(location_id)
    if account:
        return account["token"]
    raise RuntimeError(f"No hay token para location {location_id}")
def resolve_cf_id_in_location(location_id, field_key=CF_KEY):
    """Return the id of the contact custom field whose ``fieldKey`` is ``field_key``.

    The custom fields are listed through the location's ``customFields``
    endpoint (the original author notes the id varies per branch). Returns
    ``None`` when no field with that key is found.
    """
    token = get_token(location_id)
    response = gc._request(
        "GET",
        f"/locations/{location_id}/customFields",
        token,
        params={"model": "contact"},
    )
    fields = response.get("customFields") or []
    matching_ids = (field.get("id") for field in fields if field.get("fieldKey") == field_key)
    return next(matching_ids, None)
# ---------------------------------------------------------------------------
# GHL helpers
# ---------------------------------------------------------------------------
def create_contact(location_id, payload):
    """POST ``/contacts/`` in ``location_id`` and return the new contact id.

    ``payload`` is the request body; ``locationId`` is added in front of it
    (a ``locationId`` key already present in ``payload`` wins).

    Raises:
        RuntimeError: if the response carries no contact id.
    """
    token = get_token(location_id)
    body = {"locationId": location_id}
    body.update(payload)
    response = gc._request("POST", "/contacts/", token, json=body)
    contact_id = (response.get("contact") or {}).get("id") or response.get("id")
    if contact_id:
        return contact_id
    raise RuntimeError(f"Crear contacto sin id: {response}")
def update_contact(location_id, contact_id, payload):
    """PUT ``payload`` to ``/contacts/<contact_id>`` with the location's token; return the response."""
    auth_token = get_token(location_id)
    endpoint = f"/contacts/{contact_id}"
    return gc._request("PUT", endpoint, auth_token, json=payload)
def delete_contact_safe(location_id, contact_id):
    """Delete a contact, treating "not found" as a non-error.

    Returns ``True`` when the delete call succeeded. Returns ``False`` when
    ``contact_id`` is empty or when the raised error's message contains
    ``404`` or ``not found`` (case-insensitive). Any other error is re-raised.
    """
    if not contact_id:
        return False
    token = get_token(location_id)
    try:
        gc.delete_contact(token, contact_id, location_id)
    except Exception as exc:
        detail = str(exc)
        already_gone = "404" in detail or "not found" in detail.lower()
        if not already_gone:
            raise
        return False
    return True
def get_contact(location_id, contact_id):
    """GET ``/contacts/<contact_id>`` and return the ``contact`` entry of the response (or None)."""
    auth_token = get_token(location_id)
    response = gc._request("GET", f"/contacts/{contact_id}", auth_token)
    return response.get("contact")
def cf_value_of(contact, field_id):
    """Return the value of custom field ``field_id`` on ``contact``, or None.

    ``contact`` is a contact dict (or None) whose ``customFields`` entries
    identify the field by ``id`` or ``fieldId``. The value is read from the
    first of ``value``, ``fieldValue``, ``fieldValueString`` that holds
    something other than ``None`` or an empty string, so legitimate falsy
    values such as ``0`` or ``False`` are returned instead of being dropped.

    An empty ``field_id`` always yields ``None``: otherwise ``None`` would
    "match" every entry that simply lacks an ``id``/``fieldId`` key.
    """
    if not field_id:
        return None
    for entry in (contact or {}).get("customFields") or []:
        if field_id not in (entry.get("id"), entry.get("fieldId")):
            continue
        for key in ("value", "fieldValue", "fieldValueString"):
            candidate = entry.get(key)
            if candidate is not None and candidate != "":
                return candidate
        # Only the first matching entry is considered.
        return None
    return None
# ---------------------------------------------------------------------------
# Webhook
# ---------------------------------------------------------------------------
def fire_webhook(webhook_url, payload):
    """POST ``payload`` as JSON to ``webhook_url``.

    Returns ``(status, body_text)`` where ``body_text`` is the response body
    decoded as UTF-8 (invalid bytes replaced) and cut to 500 characters.
    HTTP error responses are returned the same way instead of raising; other
    failures from ``urlopen`` propagate.
    """
    encoded = json.dumps(payload).encode("utf-8")
    request = urllib.request.Request(
        webhook_url,
        data=encoded,
        method="POST",
        headers={"Content-Type": "application/json"},
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as response:
            status, raw = response.status, response.read()
    except urllib.error.HTTPError as err:
        status, raw = err.code, err.read()
    return status, raw.decode("utf-8", errors="replace")[:500]
def build_webhook_payload_suc_to_marca(branch_contact, sucursal_name="PINOTEPA"):
    """Build the branch -> Marca webhook body from a branch contact dict.

    Per the original author, this mimics the flat body GHL sends when it
    fires a webhook on contact creation, and n8n wraps it in ``$json.body``
    on receipt. Missing text fields become ``""``, ``country`` defaults to
    ``"MX"`` and tags are joined with commas.
    """
    def text(key):
        # Missing or falsy contact fields are sent as empty strings.
        return branch_contact.get(key) or ""

    first_name = text("firstName")
    last_name = text("lastName")
    contact_id = branch_contact["id"]
    joined_tags = ",".join(branch_contact.get("tags") or [])
    return {
        "contact_id": contact_id,
        "first_name": first_name,
        "last_name": last_name,
        "full_name": f"{first_name} {last_name}".strip(),
        "email": text("email"),
        "phone": text("phone"),
        "country": branch_contact.get("country") or "MX",
        "date_created": text("dateAdded"),
        "tags": joined_tags,
        "Sucursal": sucursal_name,
        "TIENDA": sucursal_name,
        "ID Contacto Sucursal": contact_id,
        "Fuente de Posible cliente": "test_harness",
        "Marca del Vehiculo": "",
        "Version del Vehiculo": "",
        "Año del Vehículo": "",
        "¿Qué modalidad prefieres?": "",
        "Acepta los terminos para tu cotización": "",
        "Archivos Adicionales": "",
        "CANAL DE ORIGEN": "test",
        "Check Comunicaciones de Marketing": "",
        "Descripción": "",
        "Fuente de Prospecto": "test",
        "ID Contacto Monte Providencia": "",
        "Información Adicional": "",
        "Presupuesto": "",
        "[Correo_Canalizados]": "",
        "[Correo_Empresa]": "",
        "[Dirección de la Empresa]": "",
        "[Nombre de la Empresa]": "",
        "[Número de Teléfono para Atender]": "",
        "[Número de WhatsApp para Atender]": "",
        "¿Cuándo necesitas el dinero?": "",
        "contact_type": "lead",
        "customData": {},
        "full_address": "",
        "location": {"id": PINOTEPA_LOCATION_ID, "name": "85957 - MP - PINOTEPA"},
        "triggerData": {},
        "workflow": {"id": WORKFLOW_SUC_TO_MARCA, "name": "test_harness"},
    }
def build_webhook_payload_marca_to_suc(brand_contact, sucursal_name="PINOTEPA"):
    """Build the Marca -> branch webhook body from a brand contact dict.

    Flat payload (per the original author, n8n wraps it on receipt). Missing
    text fields become ``""``, ``country`` defaults to ``"MX"`` and tags are
    joined with commas.
    """
    def text(key):
        # Missing or falsy contact fields are sent as empty strings.
        return brand_contact.get(key) or ""

    first_name = text("firstName")
    last_name = text("lastName")
    joined_tags = ",".join(brand_contact.get("tags") or [])
    return {
        "contact_id": brand_contact["id"],
        "first_name": first_name,
        "last_name": last_name,
        "full_name": f"{first_name} {last_name}".strip(),
        "email": text("email"),
        "phone": text("phone"),
        "country": brand_contact.get("country") or "MX",
        "date_created": text("dateAdded"),
        "tags": joined_tags,
        "Sucursal": sucursal_name,
        "TIENDA": sucursal_name,
        "Fuente de Posible cliente": "test_harness",
        "customData": {},
        "location": {"id": BRAND_LOCATION_ID, "name": "Monte Providencia"},
        "workflow": {"id": WORKFLOW_MARCA_TO_SUC, "name": "test_harness"},
    }
# ---------------------------------------------------------------------------
# Cleanup tracker
# ---------------------------------------------------------------------------
class Cleanup:
    """Remembers the contacts a scenario created so they can be deleted afterwards."""

    def __init__(self):
        # Each entry is a (location_id, contact_id, note) tuple.
        self.contacts = []

    def register(self, location_id, contact_id, note=""):
        """Record a contact for later deletion; empty contact ids are ignored."""
        if not contact_id:
            return
        self.contacts.append((location_id, contact_id, note))

    def run(self):
        """Delete every registered contact and return True if none of the deletes failed."""
        log(f"[cleanup] borrando {len(self.contacts)} contactos test...")
        deleted = 0
        errors = 0
        # Delete in reverse registration order in case a cascading delete of a
        # parent invalidates others (should not happen; purely defensive).
        for location_id, contact_id, note in self.contacts[::-1]:
            try:
                if delete_contact_safe(location_id, contact_id):
                    deleted += 1
                    log(f" [del] {location_id}/{contact_id} ({note})")
                else:
                    log(f" [skip-404] {location_id}/{contact_id} ({note})")
            except Exception as exc:
                errors += 1
                log(f" [FAIL] {location_id}/{contact_id}: {exc}")
        log(f"[cleanup] {deleted} borrados, {errors} errores.")
        return errors == 0
# ---------------------------------------------------------------------------
# Test scenarios
# ---------------------------------------------------------------------------
def _ts_suffix():
    """Return the current local time formatted as ``YYMMDD_HHMMSS``."""
    now = datetime.datetime.now()
    return f"{now:%y%m%d_%H%M%S}"
def _new_contact_payload(scenario_id, *, phone=None, email=None, cf_value=None, branch_cf_id=None):
    """Build the base request body for a test contact.

    The contact is named ``Test<scenario id without dots>`` /
    ``Harness_<timestamp>`` and tagged ``revisar`` and ``qa-test``.
    ``phone`` and ``email`` are included only when truthy; the custom field
    is included only when both ``cf_value`` and ``branch_cf_id`` are truthy.
    """
    stamp = _ts_suffix()
    payload = {
        "firstName": "Test" + scenario_id.replace(".", ""),
        "lastName": "Harness_" + stamp,
        "tags": ["revisar", "qa-test"],
    }
    for key, value in (("phone", phone), ("email", email)):
        if value:
            payload[key] = value
    if cf_value and branch_cf_id:
        custom_field = {"id": branch_cf_id, "key": CF_KEY, "field_value": cf_value}
        payload["customFields"] = [custom_field]
    return payload
def assert_(condition, msg):
    """Raise ``AssertionError(msg)`` unless ``condition`` is truthy."""
    if condition:
        return
    raise AssertionError(msg)
def assert_eventually(predicate, msg, timeout=None, interval=None):
    """Poll ``predicate`` until it returns a truthy value, and return that value.

    Args:
        predicate: zero-argument callable, re-evaluated on every attempt.
        msg: message for the AssertionError raised on timeout.
        timeout: seconds to keep polling; ``None`` means
            ``ASSERT_RETRY_TIMEOUT``. An explicit ``0`` is honoured and
            results in a single attempt.
        interval: seconds to sleep between attempts; ``None`` means
            ``ASSERT_RETRY_INTERVAL``.

    Raises:
        AssertionError: if no attempt returned a truthy value in time. The
            message includes the repr of the last value the predicate returned.
    """
    # Test for None rather than using `or`, so an explicit 0 is respected.
    if timeout is None:
        timeout = ASSERT_RETRY_TIMEOUT
    if interval is None:
        interval = ASSERT_RETRY_INTERVAL
    # The monotonic clock is immune to wall-clock adjustments during the wait.
    deadline = time.monotonic() + timeout
    while True:
        # The predicate always runs at least once, and once more at the deadline.
        last_result = predicate()
        if last_result:
            return last_result
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        # Never sleep past the deadline (and never pass a negative duration).
        time.sleep(max(0.0, min(interval, remaining)))
    raise AssertionError(f"{msg} (last={last_result!r})")
def _find_brand_with_cf(value):
    """Search the Marca location for contacts whose id_contacto_sucursal CF equals ``value``.

    Returns the ``contacts`` list of the search response (page limit 5).
    If the request fails, a warning is logged and ``[]`` is returned.
    """
    token = get_token(BRAND_LOCATION_ID)
    cf_filter = {"field": f"customFields.{CF_ID_BRAND}", "operator": "eq", "value": value}
    search_body = {
        "locationId": BRAND_LOCATION_ID,
        "pageLimit": 5,
        "filters": [{"group": "AND", "filters": [cf_filter]}],
    }
    try:
        response = gc._request("POST", "/contacts/search", token, json=search_body)
        return response.get("contacts") or []
    except Exception as exc:
        log(f" [warn] search por CF falló: {exc}")
        return []
def wait_for_brand_cf_indexed(branch_id, expected_count=1, timeout=25, interval=2):
    """Poll the Marca CF search until it returns at least ``expected_count`` matches.

    Per the original author, GHL can take several seconds to index a new
    custom field value for filter searches. Returns ``True`` once enough
    matches are found, ``False`` if ``timeout`` seconds elapse first.
    """
    started = time.time()
    while True:
        if not (time.time() - started < timeout):
            return False
        found = _find_brand_with_cf(branch_id)
        if len(found) >= expected_count:
            return True
        time.sleep(interval)
def wait_for_brand_query_indexed(query, expected_count=1, timeout=25, interval=2):
    """Poll the Marca free-text search until ``query`` returns enough contacts.

    Per the original author, this is the search the workflow uses for
    phone/email/name matching. Returns ``True`` once at least
    ``expected_count`` contacts are found, ``False`` if ``timeout`` seconds
    elapse first.

    A failed search attempt does not abort the wait, but it is logged as a
    warning (as ``_find_brand_with_cf`` does) rather than silently swallowed,
    so a persistent API error is visible instead of looking like slow indexing.
    """
    token = get_token(BRAND_LOCATION_ID)
    search_body = {"locationId": BRAND_LOCATION_ID, "pageLimit": 5, "query": query}
    start = time.time()
    while time.time() - start < timeout:
        try:
            response = gc._request("POST", "/contacts/search", token, json=search_body)
            if len(response.get("contacts") or []) >= expected_count:
                return True
        except Exception as exc:
            # Best effort: keep polling, but leave a trace of the failure.
            log(f" [warn] search por query falló: {exc}")
        time.sleep(interval)
    return False
# ---- Setup helpers ----
def setup_branch_contact(scenario_id, *, phone=None, email=None, with_cf=True, location_id=PINOTEPA_LOCATION_ID):
    """Create a test contact in a branch location and return its id.

    When ``with_cf`` is true and the location's custom field id can be
    resolved, the contact's CF is then set to the contact's own id
    (self-reference; the original author notes this mirrors the fill_* scripts).
    """
    branch_cf_id = None
    if with_cf:
        branch_cf_id = resolve_cf_id_in_location(location_id)
    body = _new_contact_payload(scenario_id, phone=phone, email=email)
    contact_id = create_contact(location_id, body)
    # branch_cf_id stays None unless with_cf was requested and the field exists.
    if branch_cf_id:
        self_reference = {"id": branch_cf_id, "key": CF_KEY, "field_value": contact_id}
        update_contact(location_id, contact_id, {"customFields": [self_reference]})
    return contact_id
def setup_brand_contact_with_cf(scenario_id, branch_contact_id, *, phone=None, email=None):
    """Create a Marca contact whose CF points at ``branch_contact_id``; return its id."""
    body = _new_contact_payload(
        scenario_id,
        phone=phone,
        email=email,
        cf_value=branch_contact_id,
        branch_cf_id=CF_ID_BRAND,
    )
    brand_contact_id = create_contact(BRAND_LOCATION_ID, body)
    return brand_contact_id
# ---- Escenarios Fase 1 (Sucursal → Marca) ----
def scenario_1_1(cleanup):
    """Happy path: branch contact with CF plus a Marca contact whose CF points to it.

    Expected: the workflow matches by CF and updates, so the CF search still
    returns exactly the original Marca contact.
    """
    log("== Scenario 1.1: happy path con CF ==")
    phone, email = "+5219991110011", "test11@e3.local"
    branch_id = setup_branch_contact("1.1", phone=phone, email=email)
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.1 branch")
    log(f" branch contact: {branch_id}")
    brand_id = setup_brand_contact_with_cf("1.1", branch_id, phone=phone, email=email)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "1.1 brand")
    log(f" brand contact: {brand_id} (CF -> {branch_id})")
    log(" esperando indexación del CF en Marca...")
    assert_(wait_for_brand_cf_indexed(branch_id), "Brand CF no se indexó a tiempo")

    # Fire the webhook with the branch contact as GHL currently returns it.
    branch_contact = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    webhook_body = build_webhook_payload_suc_to_marca(branch_contact)
    status, response_text = fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, webhook_body)
    log(f" webhook status: {status} body: {response_text[:100]}")
    time.sleep(WAIT_AFTER_WEBHOOK)

    # Verify that NO second Marca contact with CF == branch_id was created.
    def only_original_brand():
        found = _find_brand_with_cf(branch_id)
        if len(found) == 1 and found[0]["id"] == brand_id:
            return found
        return None

    assert_eventually(only_original_brand, "esperaba 1 match por CF apuntando al brand original")
    log(" [OK] match único por CF; no duplicado.")
def scenario_1_2(cleanup):
    """Original bug: branch contact WITHOUT phone or email, Marca contact with the correct CF."""
    log("== Scenario 1.2: phone+email vacíos (bug original) ==")
    branch_id = setup_branch_contact("1.2", phone=None, email=None)
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.2 branch")
    brand_id = setup_brand_contact_with_cf("1.2", branch_id, phone=None, email=None)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "1.2 brand")
    log(" esperando indexación CF...")
    assert_(wait_for_brand_cf_indexed(branch_id), "Brand CF no se indexó a tiempo")

    branch_contact = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    webhook_body = build_webhook_payload_suc_to_marca(branch_contact)
    fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, webhook_body)
    time.sleep(WAIT_AFTER_WEBHOOK)

    def single_match():
        # Exactly one Marca contact may carry this CF value.
        return len(_find_brand_with_cf(branch_id)) == 1

    assert_eventually(single_match, "esperaba 1 match (sin duplicado)")
    log(" [OK] sin duplicado.")
def scenario_1_3(cleanup):
    """Branch contact with empty CF but a phone: the workflow should fall back to phone matching.

    Passes when, after the webhook, the pre-existing Marca contact's CF holds
    the branch contact id. On failure, every *other* Marca contact found
    with that CF value (i.e. created by the workflow) is registered for
    cleanup, not just the first one, so no test contact is left behind.
    """
    log("== Scenario 1.3: CF vacío en sucursal, fallback a phone ==")
    phone, email = "+5219991110013", "test13@e3.local"
    branch_id = setup_branch_contact("1.3", phone=phone, email=email, with_cf=False)
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.3 branch")
    # Marca contact WITHOUT CF but with the same phone: must match via the cascade.
    payload = _new_contact_payload("1.3", phone=phone, email=email)
    brand_id = create_contact(BRAND_LOCATION_ID, payload)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "1.3 brand")
    log(" esperando indexación de phone en Marca...")
    assert_(wait_for_brand_query_indexed(phone), "phone no indexado a tiempo")

    branch_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, build_webhook_payload_suc_to_marca(branch_obj))
    time.sleep(WAIT_AFTER_WEBHOOK)

    # After the run, the workflow is expected to have written branch_id into
    # the CF of the existing Marca contact.
    updated_brand = get_contact(BRAND_LOCATION_ID, brand_id)
    cf_now = cf_value_of(updated_brand, CF_ID_BRAND)
    if cf_now == branch_id:
        log(f" [OK] cascada phone funcionó + CF inyectado = {cf_now}")
        return

    log(f" [WARN] CF post-update = {cf_now!r} (esperaba {branch_id}). Cascada quizá creó nuevo.")
    # Anything else carrying this CF value was created by the workflow;
    # brand_id itself is already registered, so skip it.
    for match in _find_brand_with_cf(branch_id):
        match_id = match.get("id")
        if match_id != brand_id:
            cleanup.register(BRAND_LOCATION_ID, match_id, "1.3 brand (creado por workflow)")
    raise AssertionError("CF no quedó poblado tras cascada")
def scenario_1_4(cleanup):
    """New branch contact with no Marca counterpart: the workflow should create one with the CF set.

    Passes when exactly one Marca contact with CF == branch id appears.
    Whatever the outcome, Marca contacts found with that CF value are
    registered for cleanup, so a failed run (e.g. duplicates) leaves nothing behind.
    """
    log("== Scenario 1.4: nuevo, crea en Marca ==")
    branch_id = setup_branch_contact("1.4", phone="+5219991110014", email="test14@e3.local")
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.4 branch")
    branch_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, build_webhook_payload_suc_to_marca(branch_obj))
    time.sleep(WAIT_AFTER_WEBHOOK)

    def exactly_one_created():
        # One search per attempt: the list that is counted is the list returned.
        found = _find_brand_with_cf(branch_id)
        return found if len(found) == 1 else None

    try:
        matches = assert_eventually(
            exactly_one_created,
            "esperaba que se creara 1 contacto en Marca con CF",
        )
    except AssertionError:
        # The CF value is this run's branch id, so every match is a test
        # contact created by the workflow.
        for stray in _find_brand_with_cf(branch_id):
            cleanup.register(BRAND_LOCATION_ID, stray.get("id"), "1.4 brand (creado por workflow)")
        raise
    brand_new = matches[0]
    cleanup.register(BRAND_LOCATION_ID, brand_new["id"], "1.4 brand (creado por workflow)")
    log(f" [OK] creado en Marca: {brand_new['id']} CF={branch_id}")
def scenario_1_5(cleanup):
    """Branch contact with CF plus two Marca contacts sharing its phone: the CF match must win.

    If creating the second (decoy) Marca contact fails, the scenario is
    skipped and returns normally.
    """
    log("== Scenario 1.5: CF gana sobre multi-phone ==")
    phone = "+5219991110015"
    branch_id = setup_branch_contact("1.5", phone=phone, email="test15a@e3.local")
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.5 branch")
    # Two Marca contacts with the same phone; only one carries the correct CF.
    brand_correct = setup_brand_contact_with_cf("1.5a", branch_id, phone=phone, email="test15a@e3.local")
    cleanup.register(BRAND_LOCATION_ID, brand_correct, "1.5 brand correct")
    log(" esperando indexación CF...")
    assert_(wait_for_brand_cf_indexed(branch_id), "Brand CF no se indexó a tiempo")

    # The decoy has no CF.
    decoy_body = _new_contact_payload("1.5b", phone=phone, email="test15b@e3.local")
    try:
        brand_decoy = create_contact(BRAND_LOCATION_ID, decoy_body)
    except Exception as exc:
        # GHL may reject a duplicate phone without allowDuplicateContact; skip the case then.
        log(f" [skip] GHL rechazó duplicate phone: {exc}")
        return
    else:
        cleanup.register(BRAND_LOCATION_ID, brand_decoy, "1.5 brand decoy")

    branch_contact = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    webhook_body = build_webhook_payload_suc_to_marca(branch_contact)
    fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, webhook_body)
    time.sleep(WAIT_AFTER_WEBHOOK)

    def cf_points_to_correct():
        found = _find_brand_with_cf(branch_id)
        if len(found) == 1 and found[0]["id"] == brand_correct:
            return found
        return None

    assert_eventually(cf_points_to_correct, "CF debería apuntar al brand_correct (no al decoy)")
    log(" [OK] CF prevaleció sobre cascada phone.")
# ---- Escenarios Fase 2 (Marca → Sucursal) ----
def scenario_2_1(cleanup):
    """Marca contact with CF pointing at an existing branch contact; no branch duplicate expected."""
    log("== Scenario 2.1: Marca con CF, sucursal existing ==")
    phone, email = "+5219991110021", "test21@e3.local"
    branch_id = setup_branch_contact("2.1", phone=phone, email=email)
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "2.1 branch")
    brand_id = setup_brand_contact_with_cf("2.1", branch_id, phone=phone, email=email)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "2.1 brand")

    brand_contact = get_contact(BRAND_LOCATION_ID, brand_id)
    webhook_body = build_webhook_payload_marca_to_suc(brand_contact)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, webhook_body)
    time.sleep(WAIT_AFTER_WEBHOOK)

    # Check with a direct GET, then audit through the CF search in Marca.
    branch_after = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    assert_(branch_after and branch_after["id"] == branch_id, "contacto sucursal preexistente desapareció")

    def single_brand_match():
        return len(_find_brand_with_cf(branch_id)) == 1

    assert_eventually(single_brand_match, "esperaba 1 marca con CF apuntando al sucursal")
    log(" [OK] sin duplicado en sucursal.")
def scenario_2_2(cleanup):
    """Marca contact with CF, branch contact without phone or email; no duplicate expected."""
    log("== Scenario 2.2: Marca con CF + sucursal sin phone/email ==")
    branch_id = setup_branch_contact("2.2", phone=None, email=None)
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "2.2 branch")
    brand_id = setup_brand_contact_with_cf("2.2", branch_id, phone=None, email=None)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "2.2 brand")

    brand_contact = get_contact(BRAND_LOCATION_ID, brand_id)
    webhook_body = build_webhook_payload_marca_to_suc(brand_contact)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, webhook_body)
    time.sleep(WAIT_AFTER_WEBHOOK)

    # Direct GET, then check that only the original brand contact carries the CF.
    branch_after = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    still_there = branch_after is not None and branch_after.get("id") == branch_id
    assert_(still_there, "el contacto sucursal preexistente fue eliminado/perdido")

    def only_original_brand():
        found = _find_brand_with_cf(branch_id)
        if len(found) == 1 and found[0]["id"] == brand_id:
            return found
        return None

    assert_eventually(only_original_brand, "esperaba 1 marca con CF apuntando al sucursal")
    log(" [OK] sin duplicado pese a phone+email vacíos.")
def scenario_2_3(cleanup):
    """Marca contact with empty CF and an existing branch contact sharing phone/email.

    Only checks that the pre-existing branch contact is still retrievable
    after the webhook.
    """
    log("== Scenario 2.3: Marca con CF vacío + sucursal existing → cascada ==")
    phone, email = "+5219991110023", "test23@e3.local"
    branch_id = setup_branch_contact("2.3", phone=phone, email=email)
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "2.3 branch")
    # Marca contact WITHOUT CF.
    brand_body = _new_contact_payload("2.3", phone=phone, email=email)
    brand_id = create_contact(BRAND_LOCATION_ID, brand_body)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "2.3 brand")

    brand_contact = get_contact(BRAND_LOCATION_ID, brand_id)
    webhook_body = build_webhook_payload_marca_to_suc(brand_contact)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, webhook_body)
    time.sleep(WAIT_AFTER_WEBHOOK)

    branch_after = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    assert_(branch_after and branch_after["id"] == branch_id, "contacto sucursal desapareció")
    # Eventually the branch CF should hold its own id (self-reference) after
    # the UPDATE; this is not asserted here.
    log(" [OK] cascada funcionó.")
def scenario_2_4(cleanup):
    """Marca contact whose CF points at a non-existent (stale) branch id.

    Only checks that the pre-existing branch contact is still retrievable
    after the webhook.
    """
    log("== Scenario 2.4: Marca con CF apuntando a id inexistente (stale) ==")
    phone, email = "+5219991110024", "test24@e3.local"
    branch_id = setup_branch_contact("2.4", phone=phone, email=email)
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "2.4 branch")
    # Random id that does not correspond to any contact created here.
    stale_id = "STALE_" + os.urandom(7).hex()
    brand_body = _new_contact_payload(
        "2.4", phone=phone, email=email, cf_value=stale_id, branch_cf_id=CF_ID_BRAND
    )
    brand_id = create_contact(BRAND_LOCATION_ID, brand_body)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "2.4 brand")

    brand_contact = get_contact(BRAND_LOCATION_ID, brand_id)
    webhook_body = build_webhook_payload_marca_to_suc(brand_contact)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, webhook_body)
    time.sleep(WAIT_AFTER_WEBHOOK)

    branch_after = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    assert_(branch_after and branch_after["id"] == branch_id, "contacto sucursal desapareció")
    log(" [OK] cascada manejó CF stale.")
def scenario_2_5(cleanup):
    """New Marca contact: the workflow should create the branch contact and link both sides.

    Passes when exactly one branch contact is found for the test phone, its
    CF holds its own id, and the Marca contact's CF holds that same id.
    """
    log("== Scenario 2.5: Marca nuevo, crea en sucursal + autoenlace bidireccional ==")
    phone = "+5219991110025"
    brand_body = _new_contact_payload("2.5", phone=phone, email="test25@e3.local")
    brand_id = create_contact(BRAND_LOCATION_ID, brand_body)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "2.5 brand")

    brand_contact = get_contact(BRAND_LOCATION_ID, brand_id)
    webhook_body = build_webhook_payload_marca_to_suc(brand_contact)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, webhook_body)
    time.sleep(WAIT_AFTER_WEBHOOK + 5)  # the self-link takes longer (3 PUTs)

    # Look the new contact up in the branch by phone, retrying while it indexes.
    suc_token = get_token(PINOTEPA_LOCATION_ID)

    def one_branch_contact():
        response = gc._request("POST", "/contacts/search", suc_token, json={
            "locationId": PINOTEPA_LOCATION_ID, "pageLimit": 10, "query": "+5219991110025"
        })
        found = response.get("contacts") or []
        if len(found) == 1:
            return found
        return None

    created = assert_eventually(one_branch_contact, "esperaba 1 contacto sucursal creado")
    new_branch_id = created[0]["id"]
    cleanup.register(PINOTEPA_LOCATION_ID, new_branch_id, "2.5 branch (creado por workflow)")
    log(f" creado sucursal: {new_branch_id}")

    # Check the CF on both sides (includes Phase 3).
    branch_cf_id = resolve_cf_id_in_location(PINOTEPA_LOCATION_ID)
    suc_cf_val = cf_value_of(get_contact(PINOTEPA_LOCATION_ID, new_branch_id), branch_cf_id)
    brand_cf_val = cf_value_of(get_contact(BRAND_LOCATION_ID, brand_id), CF_ID_BRAND)
    log(f" CF en sucursal: {suc_cf_val!r} (esperado: {new_branch_id})")
    log(f" CF en marca: {brand_cf_val!r} (esperado: {new_branch_id})")
    assert_(suc_cf_val == new_branch_id, "CF en sucursal no es autorref")
    assert_(brand_cf_val == new_branch_id, "CF en Marca no apunta a sucursal nueva")
    log(" [OK] autoenlace bidireccional funcionando.")
def scenario_3_2(cleanup):
    """Fire the Marca -> branch webhook twice for the same contact; the second must not duplicate.

    The first fire is expected to create the branch contact (as in scenario
    2.5). The second fire must leave exactly one branch contact for the test
    phone. Extra contacts found after the second fire are registered for
    cleanup before asserting, so a detected duplicate is not left behind.
    """
    log("== Scenario 3.2: re-disparo, autoreparación funciona ==")
    phone = "+5219991110032"
    payload = _new_contact_payload("3.2", phone=phone, email="test32@e3.local")
    brand_id = create_contact(BRAND_LOCATION_ID, payload)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "3.2 brand")
    brand_obj = get_contact(BRAND_LOCATION_ID, brand_id)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, build_webhook_payload_marca_to_suc(brand_obj))
    time.sleep(WAIT_AFTER_WEBHOOK + 5)

    suc_token = get_token(PINOTEPA_LOCATION_ID)

    def search_branch():
        # Branch contacts matching the test phone (free-text search).
        response = gc._request("POST", "/contacts/search", suc_token, json={
            "locationId": PINOTEPA_LOCATION_ID, "pageLimit": 10, "query": phone
        })
        return response.get("contacts") or []

    def exactly_one():
        found = search_branch()
        return found if len(found) == 1 else None

    contacts = assert_eventually(exactly_one, "esperaba 1 en sucursal tras CREATE")
    new_branch_id = contacts[0]["id"]
    cleanup.register(PINOTEPA_LOCATION_ID, new_branch_id, "3.2 branch (creado)")

    # Second fire of the SAME webhook for the same brand contact.
    log(" segundo disparo (auto-reparación)...")
    # Presumably the CF is populated by now -- not verified here.
    brand_obj_2 = get_contact(BRAND_LOCATION_ID, brand_id)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, build_webhook_payload_marca_to_suc(brand_obj_2))
    time.sleep(WAIT_AFTER_WEBHOOK)

    found_after = search_branch()
    # Anything beyond the contact created by the first fire is a duplicate
    # produced by the second one; make sure cleanup removes it too.
    for contact in found_after:
        contact_id = contact.get("id")
        if contact_id != new_branch_id:
            cleanup.register(PINOTEPA_LOCATION_ID, contact_id, "3.2 branch (duplicado)")
    n2 = len(found_after)
    assert_(n2 == 1, f"segundo disparo creó duplicado en sucursal: ahora hay {n2}")
    log(" [OK] segundo disparo solo hizo UPDATE (match directo por CF).")
# ---- Scenario registry ----
# Scenario id -> scenario function. Insertion order is the execution order
# used by the groups below.
SCENARIOS = {
    "1.1": scenario_1_1,
    "1.2": scenario_1_2,
    "1.3": scenario_1_3,
    "1.4": scenario_1_4,
    "1.5": scenario_1_5,
    "2.1": scenario_2_1,
    "2.2": scenario_2_2,
    "2.3": scenario_2_3,
    "2.4": scenario_2_4,
    "2.5": scenario_2_5,
    "3.2": scenario_3_2,
}

# Group name -> ordered scenario ids, selected by the id's phase prefix.
GROUPS = {
    "all-phase1": [sid for sid in SCENARIOS if sid.startswith("1.")],
    "all-phase2": [sid for sid in SCENARIOS if sid.startswith("2.")],
    "all-phase3": [sid for sid in SCENARIOS if sid.startswith("3.")],
    "all": list(SCENARIOS),
}
def run_scenarios(names):
    """Run the named scenarios in order and return ``[(name, status, error), ...]``.

    ``status`` is ``"PASS"``, ``"FAIL"`` (an AssertionError was raised),
    ``"ERROR"`` (any other exception; the traceback is printed) or ``"SKIP"``
    (the name is not in ``SCENARIOS``). ``error`` is ``None`` for passes.

    Unknown names are reported in the results rather than dropped, so a
    mistyped scenario name cannot produce an empty, seemingly successful run.
    Each scenario gets its own ``Cleanup``, which always runs afterwards.
    """
    results = []
    for name in names:
        fn = SCENARIOS.get(name)
        if not fn:
            log(f"[skip] scenario desconocido: {name}")
            results.append((name, "SKIP", "scenario desconocido"))
            continue
        cleanup = Cleanup()
        try:
            fn(cleanup)
            results.append((name, "PASS", None))
        except AssertionError as e:
            results.append((name, "FAIL", str(e)))
            log(f" [FAIL] {e}")
        except Exception as e:
            results.append((name, "ERROR", f"{type(e).__name__}: {e}"))
            log(f" [ERROR] {type(e).__name__}: {e}")
            traceback.print_exc()
        finally:
            # Delete whatever the scenario registered, even after a failure.
            cleanup.run()
    return results
def main():
    """CLI entry point: run the requested scenario or group and exit with a status code.

    ``--scenario`` takes a scenario id or a group name from ``GROUPS``. The
    process exits 0 only when at least one scenario ran and all of them
    passed; an empty result set (e.g. a mistyped name) exits 1.
    """
    # __doc__ is None under `python -OO`; fall back to no description.
    doc_lines = (__doc__ or "").strip().splitlines()
    parser = argparse.ArgumentParser(description=doc_lines[0] if doc_lines else None)
    parser.add_argument("--scenario", required=True,
                        help="Nombre del scenario (ej 1.2) o grupo (all-phase1, all-phase2, all-phase3, all).")
    args = parser.parse_args()
    # Force UTF-8 output where the stream supports reconfiguration.
    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")
    # A name that is not a group is treated as a single scenario id.
    names = GROUPS.get(args.scenario, [args.scenario])
    log(f"Ejecutando: {names}")
    results = run_scenarios(names)
    log("\n=== RESULTADOS ===")
    for name, status, err in results:
        log(f" {name}: {status}" + (f" -- {err}" if err else ""))
    n_pass = sum(1 for _, s, _ in results if s == "PASS")
    n_total = len(results)
    log(f"\n{n_pass}/{n_total} passed")
    # "0/0 passed" is not a success: nothing was actually verified.
    sys.exit(0 if n_total and n_pass == n_total else 1)


if __name__ == "__main__":
    main()