752 lines
30 KiB
Python
752 lines
30 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Test harness E2E para los workflows n8n de sincronización de contactos.
|
|
|
|
Cada escenario:
|
|
1) Setup: crea contactos test en GHL (sucursal PINOTEPA + Marca) con tag
|
|
`revisar` y nombre `Test <scenario>_<ts>`. Registra todos los ids creados.
|
|
2) Disparo: POST al webhook real del workflow correspondiente.
|
|
3) Espera (GHL no es instantáneo).
|
|
4) Asserts: verifica intra_brand_duplicates, conteos, CF en ambos lados.
|
|
5) Cleanup: borra todos los contactos registrados (Marca + sucursal).
|
|
|
|
Uso:
|
|
python scripts/n8n_e2e_test.py --scenario 1.1
|
|
python scripts/n8n_e2e_test.py --scenario all-phase1
|
|
python scripts/n8n_e2e_test.py --scenario all
|
|
"""
|
|
|
|
import argparse
|
|
import datetime
|
|
import json
|
|
import os
|
|
import sys
|
|
import time
|
|
import traceback
|
|
import urllib.error
|
|
import urllib.request
|
|
import warnings
|
|
|
|
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")
|
|
|
|
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
if ROOT_DIR not in sys.path:
|
|
sys.path.insert(0, ROOT_DIR)
|
|
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
|
|
if SCRIPTS_DIR not in sys.path:
|
|
sys.path.insert(0, SCRIPTS_DIR)
|
|
|
|
import sync_engine # noqa: E402
|
|
|
|
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
|
|
PINOTEPA_LOCATION_ID = "7H91g95hhLKwIUqSk0Rg"
|
|
CF_ID_BRAND = "E6lI9ykWhqpj7Pmi7Qd3" # field_id de id_contacto_sucursal en Marca
|
|
CF_KEY = "contact.id_contacto_sucursal"
|
|
|
|
WORKFLOW_SUC_TO_MARCA = "x4DqZ5FtSc43tdzB"
|
|
WORKFLOW_MARCA_TO_SUC = "4UMRwxJdHFfOGHBp"
|
|
|
|
WEBHOOK_URL_SUC_TO_MARCA = "https://workflows.consultoriae3.com/webhook/56c4c1a9-0271-48e5-8915-e25e18666dce"
|
|
WEBHOOK_URL_MARCA_TO_SUC = "https://workflows.consultoriae3.com/webhook/8d16dc55-616a-4532-8334-8ac81e7d3ae1"
|
|
|
|
WAIT_AFTER_WEBHOOK = 12 # segundos. GHL + n8n suman latencia.
|
|
ASSERT_RETRY_TIMEOUT = 25
|
|
ASSERT_RETRY_INTERVAL = 3
|
|
|
|
gc = sync_engine.ghl_client
|
|
|
|
|
|
def log(msg):
|
|
ts = datetime.datetime.now().strftime("%H:%M:%S")
|
|
line = f"[{ts}] {msg}"
|
|
try:
|
|
sys.stdout.write(line + "\n")
|
|
sys.stdout.flush()
|
|
except UnicodeEncodeError:
|
|
enc = sys.stdout.encoding or "utf-8"
|
|
sys.stdout.write(line.encode(enc, errors="replace").decode(enc) + "\n")
|
|
sys.stdout.flush()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tokens
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_accounts_cache = None
|
|
|
|
def get_token(location_id):
|
|
global _accounts_cache
|
|
if _accounts_cache is None:
|
|
_accounts_cache = {a["location_id"]: a for a in sync_engine.parse_accounts_csv()}
|
|
acc = _accounts_cache.get(location_id)
|
|
if not acc:
|
|
raise RuntimeError(f"No hay token para location {location_id}")
|
|
return acc["token"]
|
|
|
|
|
|
def resolve_cf_id_in_location(location_id, field_key=CF_KEY):
|
|
"""Devuelve field_id del CF en cualquier location (varía por sucursal)."""
|
|
token = get_token(location_id)
|
|
data = gc._request("GET", f"/locations/{location_id}/customFields", token,
|
|
params={"model": "contact"})
|
|
for cf in (data.get("customFields") or []):
|
|
if cf.get("fieldKey") == field_key:
|
|
return cf.get("id")
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# GHL helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def create_contact(location_id, payload):
|
|
"""POST /contacts. payload es el body. Devuelve contact_id."""
|
|
token = get_token(location_id)
|
|
full = {"locationId": location_id, **payload}
|
|
res = gc._request("POST", "/contacts/", token, json=full)
|
|
cid = (res.get("contact") or {}).get("id") or res.get("id")
|
|
if not cid:
|
|
raise RuntimeError(f"Crear contacto sin id: {res}")
|
|
return cid
|
|
|
|
|
|
def update_contact(location_id, contact_id, payload):
|
|
token = get_token(location_id)
|
|
return gc._request("PUT", f"/contacts/{contact_id}", token, json=payload)
|
|
|
|
|
|
def delete_contact_safe(location_id, contact_id):
|
|
"""DELETE con 404 silencioso."""
|
|
if not contact_id:
|
|
return False
|
|
token = get_token(location_id)
|
|
try:
|
|
gc.delete_contact(token, contact_id, location_id)
|
|
return True
|
|
except Exception as e:
|
|
msg = str(e)
|
|
if "404" in msg or "not found" in msg.lower():
|
|
return False
|
|
raise
|
|
|
|
|
|
def get_contact(location_id, contact_id):
|
|
token = get_token(location_id)
|
|
return gc._request("GET", f"/contacts/{contact_id}", token).get("contact")
|
|
|
|
|
|
def cf_value_of(contact, field_id):
|
|
for cf in (contact or {}).get("customFields") or []:
|
|
if cf.get("id") == field_id or cf.get("fieldId") == field_id:
|
|
return cf.get("value") or cf.get("fieldValue") or cf.get("fieldValueString")
|
|
return None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Webhook
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def fire_webhook(webhook_url, payload):
|
|
"""POST al webhook URL. Retorna (status, response_body_text)."""
|
|
data = json.dumps(payload).encode("utf-8")
|
|
req = urllib.request.Request(webhook_url, method="POST", data=data,
|
|
headers={"Content-Type": "application/json"})
|
|
try:
|
|
with urllib.request.urlopen(req, timeout=30) as r:
|
|
return r.status, r.read().decode("utf-8", errors="replace")[:500]
|
|
except urllib.error.HTTPError as e:
|
|
return e.code, e.read().decode("utf-8", errors="replace")[:500]
|
|
|
|
|
|
def build_webhook_payload_suc_to_marca(branch_contact, sucursal_name="PINOTEPA"):
|
|
"""Imita el body plano que GHL envía cuando dispara un webhook al crear contacto.
|
|
n8n envolverá automáticamente esto en `$json.body` cuando lo reciba."""
|
|
full_name = f"{branch_contact.get('firstName') or ''} {branch_contact.get('lastName') or ''}".strip()
|
|
return {
|
|
"contact_id": branch_contact["id"],
|
|
"first_name": branch_contact.get("firstName") or "",
|
|
"last_name": branch_contact.get("lastName") or "",
|
|
"full_name": full_name,
|
|
"email": branch_contact.get("email") or "",
|
|
"phone": branch_contact.get("phone") or "",
|
|
"country": branch_contact.get("country") or "MX",
|
|
"date_created": branch_contact.get("dateAdded") or "",
|
|
"tags": ",".join(branch_contact.get("tags") or []),
|
|
"Sucursal": sucursal_name,
|
|
"TIENDA": sucursal_name,
|
|
"ID Contacto Sucursal": branch_contact["id"],
|
|
"Fuente de Posible cliente": "test_harness",
|
|
"Marca del Vehiculo": "",
|
|
"Version del Vehiculo": "",
|
|
"Año del Vehículo": "",
|
|
"¿Qué modalidad prefieres?": "",
|
|
"Acepta los terminos para tu cotización": "",
|
|
"Archivos Adicionales": "",
|
|
"CANAL DE ORIGEN": "test",
|
|
"Check Comunicaciones de Marketing": "",
|
|
"Descripción": "",
|
|
"Fuente de Prospecto": "test",
|
|
"ID Contacto Monte Providencia": "",
|
|
"Información Adicional": "",
|
|
"Presupuesto": "",
|
|
"[Correo_Canalizados]": "",
|
|
"[Correo_Empresa]": "",
|
|
"[Dirección de la Empresa]": "",
|
|
"[Nombre de la Empresa]": "",
|
|
"[Número de Teléfono para Atender]": "",
|
|
"[Número de WhatsApp para Atender]": "",
|
|
"¿Cuándo necesitas el dinero?": "",
|
|
"contact_type": "lead",
|
|
"customData": {},
|
|
"full_address": "",
|
|
"location": {"id": PINOTEPA_LOCATION_ID, "name": "85957 - MP - PINOTEPA"},
|
|
"triggerData": {},
|
|
"workflow": {"id": WORKFLOW_SUC_TO_MARCA, "name": "test_harness"},
|
|
}
|
|
|
|
|
|
def build_webhook_payload_marca_to_suc(brand_contact, sucursal_name="PINOTEPA"):
|
|
"""Para el otro sentido. Payload plano (n8n envuelve)."""
|
|
full_name = f"{brand_contact.get('firstName') or ''} {brand_contact.get('lastName') or ''}".strip()
|
|
return {
|
|
"contact_id": brand_contact["id"],
|
|
"first_name": brand_contact.get("firstName") or "",
|
|
"last_name": brand_contact.get("lastName") or "",
|
|
"full_name": full_name,
|
|
"email": brand_contact.get("email") or "",
|
|
"phone": brand_contact.get("phone") or "",
|
|
"country": brand_contact.get("country") or "MX",
|
|
"date_created": brand_contact.get("dateAdded") or "",
|
|
"tags": ",".join(brand_contact.get("tags") or []),
|
|
"Sucursal": sucursal_name,
|
|
"TIENDA": sucursal_name,
|
|
"Fuente de Posible cliente": "test_harness",
|
|
"customData": {},
|
|
"location": {"id": BRAND_LOCATION_ID, "name": "Monte Providencia"},
|
|
"workflow": {"id": WORKFLOW_MARCA_TO_SUC, "name": "test_harness"},
|
|
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Cleanup tracker
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class Cleanup:
|
|
def __init__(self):
|
|
self.contacts = [] # list of (location_id, contact_id, note)
|
|
|
|
def register(self, location_id, contact_id, note=""):
|
|
if contact_id:
|
|
self.contacts.append((location_id, contact_id, note))
|
|
|
|
def run(self):
|
|
log(f"[cleanup] borrando {len(self.contacts)} contactos test...")
|
|
ok, fail = 0, 0
|
|
# Borramos en orden inverso por si el delete en cascade de un nodo padre
|
|
# invalida otros (no debería pasar, pero defensivo).
|
|
for loc, cid, note in reversed(self.contacts):
|
|
try:
|
|
if delete_contact_safe(loc, cid):
|
|
ok += 1
|
|
log(f" [del] {loc}/{cid} ({note})")
|
|
else:
|
|
log(f" [skip-404] {loc}/{cid} ({note})")
|
|
except Exception as e:
|
|
fail += 1
|
|
log(f" [FAIL] {loc}/{cid}: {e}")
|
|
log(f"[cleanup] {ok} borrados, {fail} errores.")
|
|
return fail == 0
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test scenarios
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _ts_suffix():
|
|
return datetime.datetime.now().strftime("%y%m%d_%H%M%S")
|
|
|
|
|
|
def _new_contact_payload(scenario_id, *, phone=None, email=None, cf_value=None, branch_cf_id=None):
|
|
"""Body base para un contacto test."""
|
|
ts = _ts_suffix()
|
|
body = {
|
|
"firstName": f"Test{scenario_id.replace('.', '')}",
|
|
"lastName": f"Harness_{ts}",
|
|
"tags": ["revisar", "qa-test"],
|
|
}
|
|
if phone:
|
|
body["phone"] = phone
|
|
if email:
|
|
body["email"] = email
|
|
if cf_value and branch_cf_id:
|
|
body["customFields"] = [{"id": branch_cf_id, "key": CF_KEY, "field_value": cf_value}]
|
|
return body
|
|
|
|
|
|
def assert_(condition, msg):
|
|
if not condition:
|
|
raise AssertionError(msg)
|
|
|
|
|
|
def assert_eventually(predicate, msg, timeout=None, interval=None):
|
|
"""Asserts que un predicate eventualmente devuelve truthy. Si no, lanza
|
|
AssertionError tras `timeout` segundos."""
|
|
import time as _t
|
|
timeout = timeout or ASSERT_RETRY_TIMEOUT
|
|
interval = interval or ASSERT_RETRY_INTERVAL
|
|
start = _t.time()
|
|
last_result = None
|
|
while _t.time() - start < timeout:
|
|
last_result = predicate()
|
|
if last_result:
|
|
return last_result
|
|
_t.sleep(interval)
|
|
raise AssertionError(f"{msg} (last={last_result!r})")
|
|
|
|
|
|
def _find_brand_with_cf(value):
|
|
"""Busca contacto en Marca por el CF id_contacto_sucursal=value."""
|
|
token = get_token(BRAND_LOCATION_ID)
|
|
try:
|
|
r = gc._request("POST", "/contacts/search", token, json={
|
|
"locationId": BRAND_LOCATION_ID,
|
|
"pageLimit": 5,
|
|
"filters": [{"group": "AND", "filters": [
|
|
{"field": f"customFields.{CF_ID_BRAND}", "operator": "eq", "value": value}
|
|
]}]
|
|
})
|
|
return r.get("contacts") or []
|
|
except Exception as e:
|
|
log(f" [warn] search por CF falló: {e}")
|
|
return []
|
|
|
|
|
|
def wait_for_brand_cf_indexed(branch_id, expected_count=1, timeout=25, interval=2):
|
|
"""GHL puede tardar varios segundos en indexar un nuevo CF en el filter search.
|
|
Hace polling hasta que el search por CF devuelve `expected_count` matches.
|
|
Retorna True si converge, False si timeout."""
|
|
import time as _t
|
|
start = _t.time()
|
|
while _t.time() - start < timeout:
|
|
n = len(_find_brand_with_cf(branch_id))
|
|
if n >= expected_count:
|
|
return True
|
|
_t.sleep(interval)
|
|
return False
|
|
|
|
|
|
def wait_for_brand_query_indexed(query, expected_count=1, timeout=25, interval=2):
|
|
"""Polling por search query (que el workflow usa para phone/email/nombre)."""
|
|
import time as _t
|
|
token = get_token(BRAND_LOCATION_ID)
|
|
start = _t.time()
|
|
while _t.time() - start < timeout:
|
|
try:
|
|
r = gc._request("POST", "/contacts/search", token, json={
|
|
"locationId": BRAND_LOCATION_ID, "pageLimit": 5, "query": query
|
|
})
|
|
n = len(r.get("contacts") or [])
|
|
if n >= expected_count:
|
|
return True
|
|
except Exception:
|
|
pass
|
|
_t.sleep(interval)
|
|
return False
|
|
|
|
|
|
# ---- Setup helpers ----
|
|
|
|
def setup_branch_contact(scenario_id, *, phone=None, email=None, with_cf=True, location_id=PINOTEPA_LOCATION_ID):
|
|
"""Crea un contacto en sucursal con CF poblado (si with_cf=True)."""
|
|
branch_cf_id = resolve_cf_id_in_location(location_id) if with_cf else None
|
|
payload = _new_contact_payload(scenario_id, phone=phone, email=email)
|
|
cid = create_contact(location_id, payload)
|
|
if with_cf and branch_cf_id:
|
|
# Poblar el CF con su propio id (autoreferencia, igual que el fill_*)
|
|
update_contact(location_id, cid, {
|
|
"customFields": [{"id": branch_cf_id, "key": CF_KEY, "field_value": cid}]
|
|
})
|
|
return cid
|
|
|
|
|
|
def setup_brand_contact_with_cf(scenario_id, branch_contact_id, *, phone=None, email=None):
|
|
"""Crea contacto en Marca con CF apuntando a branch_contact_id."""
|
|
payload = _new_contact_payload(scenario_id, phone=phone, email=email,
|
|
cf_value=branch_contact_id, branch_cf_id=CF_ID_BRAND)
|
|
return create_contact(BRAND_LOCATION_ID, payload)
|
|
|
|
|
|
# ---- Escenarios Fase 1 (Sucursal → Marca) ----
|
|
|
|
def scenario_1_1(cleanup):
|
|
"""Happy: sucursal con CF + Marca con CF apuntando. Esperado: match por CF, UPDATE."""
|
|
log("== Scenario 1.1: happy path con CF ==")
|
|
branch_id = setup_branch_contact("1.1", phone="+5219991110011", email="test11@e3.local")
|
|
cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.1 branch")
|
|
log(f" branch contact: {branch_id}")
|
|
|
|
brand_id = setup_brand_contact_with_cf("1.1", branch_id, phone="+5219991110011", email="test11@e3.local")
|
|
cleanup.register(BRAND_LOCATION_ID, brand_id, "1.1 brand")
|
|
log(f" brand contact: {brand_id} (CF -> {branch_id})")
|
|
log(f" esperando indexación del CF en Marca...")
|
|
assert_(wait_for_brand_cf_indexed(branch_id), "Brand CF no se indexó a tiempo")
|
|
|
|
# Disparar webhook
|
|
branch_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
|
|
payload = build_webhook_payload_suc_to_marca(branch_obj)
|
|
s, body = fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, payload)
|
|
log(f" webhook status: {s} body: {body[:100]}")
|
|
time.sleep(WAIT_AFTER_WEBHOOK)
|
|
|
|
# Verificar: que NO se creó un segundo contacto en Marca con CF=branch_id
|
|
def check_unique():
|
|
matches = _find_brand_with_cf(branch_id)
|
|
return matches if len(matches) == 1 and matches[0]["id"] == brand_id else None
|
|
assert_eventually(check_unique, "esperaba 1 match por CF apuntando al brand original")
|
|
log(" [OK] match único por CF; no duplicado.")
|
|
|
|
|
|
def scenario_1_2(cleanup):
|
|
"""Bug original: sucursal SIN phone ni email, Marca con CF correcto."""
|
|
log("== Scenario 1.2: phone+email vacíos (bug original) ==")
|
|
branch_id = setup_branch_contact("1.2", phone=None, email=None)
|
|
cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.2 branch")
|
|
|
|
brand_id = setup_brand_contact_with_cf("1.2", branch_id, phone=None, email=None)
|
|
cleanup.register(BRAND_LOCATION_ID, brand_id, "1.2 brand")
|
|
log(f" esperando indexación CF...")
|
|
assert_(wait_for_brand_cf_indexed(branch_id), "Brand CF no se indexó a tiempo")
|
|
|
|
branch_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
|
|
payload = build_webhook_payload_suc_to_marca(branch_obj)
|
|
fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, payload)
|
|
time.sleep(WAIT_AFTER_WEBHOOK)
|
|
|
|
assert_eventually(lambda: len(_find_brand_with_cf(branch_id)) == 1,
|
|
"esperaba 1 match (sin duplicado)")
|
|
log(" [OK] sin duplicado.")
|
|
|
|
|
|
def scenario_1_3(cleanup):
|
|
"""CF vacío en sucursal pero phone presente → cascada phone."""
|
|
log("== Scenario 1.3: CF vacío en sucursal, fallback a phone ==")
|
|
branch_id = setup_branch_contact("1.3", phone="+5219991110013", email="test13@e3.local", with_cf=False)
|
|
cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.3 branch")
|
|
|
|
# Marca SIN CF pero con mismo phone — debe matchear por cascada
|
|
payload = _new_contact_payload("1.3", phone="+5219991110013", email="test13@e3.local")
|
|
brand_id = create_contact(BRAND_LOCATION_ID, payload)
|
|
cleanup.register(BRAND_LOCATION_ID, brand_id, "1.3 brand")
|
|
log(f" esperando indexación de phone en Marca...")
|
|
assert_(wait_for_brand_query_indexed("+5219991110013"), "phone no indexado a tiempo")
|
|
|
|
branch_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
|
|
p = build_webhook_payload_suc_to_marca(branch_obj)
|
|
fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, p)
|
|
time.sleep(WAIT_AFTER_WEBHOOK)
|
|
|
|
# Tras la corrida con Fase 1, esperaríamos que el workflow haya poblado el CF
|
|
# en el Marca existente con el branch_id (vía la inyección defensive).
|
|
upd = get_contact(BRAND_LOCATION_ID, brand_id)
|
|
cf_now = cf_value_of(upd, CF_ID_BRAND)
|
|
if cf_now == branch_id:
|
|
log(f" [OK] cascada phone funcionó + CF inyectado = {cf_now}")
|
|
else:
|
|
log(f" [WARN] CF post-update = {cf_now!r} (esperaba {branch_id}). Cascada quizá creó nuevo.")
|
|
# Si creó duplicado: el find por CF debería seguir vacío
|
|
matches = _find_brand_with_cf(branch_id)
|
|
if matches:
|
|
cleanup.register(BRAND_LOCATION_ID, matches[0]["id"], "1.3 brand (creado por workflow)")
|
|
assert_(False, "CF no quedó poblado tras cascada")
|
|
|
|
|
|
def scenario_1_4(cleanup):
|
|
"""Contacto nuevo en sucursal, no existe en Marca → crea + CF poblado."""
|
|
log("== Scenario 1.4: nuevo, crea en Marca ==")
|
|
branch_id = setup_branch_contact("1.4", phone="+5219991110014", email="test14@e3.local")
|
|
cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.4 branch")
|
|
|
|
branch_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
|
|
p = build_webhook_payload_suc_to_marca(branch_obj)
|
|
fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, p)
|
|
time.sleep(WAIT_AFTER_WEBHOOK)
|
|
|
|
matches = assert_eventually(
|
|
lambda: _find_brand_with_cf(branch_id) if len(_find_brand_with_cf(branch_id)) == 1 else None,
|
|
"esperaba que se creara 1 contacto en Marca con CF"
|
|
)
|
|
brand_new = matches[0]
|
|
cleanup.register(BRAND_LOCATION_ID, brand_new["id"], "1.4 brand (creado por workflow)")
|
|
log(f" [OK] creado en Marca: {brand_new['id']} CF={branch_id}")
|
|
|
|
|
|
def scenario_1_5(cleanup):
|
|
"""Sucursal con CF + 2 contactos Marca con mismo phone. CF gana."""
|
|
log("== Scenario 1.5: CF gana sobre multi-phone ==")
|
|
branch_id = setup_branch_contact("1.5", phone="+5219991110015", email="test15a@e3.local")
|
|
cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.5 branch")
|
|
|
|
# 2 Marca con mismo phone, solo 1 con CF correcto
|
|
brand_correct = setup_brand_contact_with_cf("1.5a", branch_id, phone="+5219991110015", email="test15a@e3.local")
|
|
cleanup.register(BRAND_LOCATION_ID, brand_correct, "1.5 brand correct")
|
|
log(f" esperando indexación CF...")
|
|
assert_(wait_for_brand_cf_indexed(branch_id), "Brand CF no se indexó a tiempo")
|
|
|
|
payload_decoy = _new_contact_payload("1.5b", phone="+5219991110015", email="test15b@e3.local")
|
|
# Sin CF
|
|
try:
|
|
brand_decoy = create_contact(BRAND_LOCATION_ID, payload_decoy)
|
|
cleanup.register(BRAND_LOCATION_ID, brand_decoy, "1.5 brand decoy")
|
|
except Exception as e:
|
|
# GHL puede rechazar duplicate phone sin allowDuplicateContact; en ese caso saltamos el caso
|
|
log(f" [skip] GHL rechazó duplicate phone: {e}")
|
|
return
|
|
|
|
branch_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
|
|
p = build_webhook_payload_suc_to_marca(branch_obj)
|
|
fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, p)
|
|
time.sleep(WAIT_AFTER_WEBHOOK)
|
|
|
|
def check_correct():
|
|
m = _find_brand_with_cf(branch_id)
|
|
return m if len(m) == 1 and m[0]["id"] == brand_correct else None
|
|
assert_eventually(check_correct, "CF debería apuntar al brand_correct (no al decoy)")
|
|
log(" [OK] CF prevaleció sobre cascada phone.")
|
|
|
|
|
|
# ---- Escenarios Fase 2 (Marca → Sucursal) ----
|
|
|
|
def scenario_2_1(cleanup):
|
|
log("== Scenario 2.1: Marca con CF, sucursal existing ==")
|
|
branch_id = setup_branch_contact("2.1", phone="+5219991110021", email="test21@e3.local")
|
|
cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "2.1 branch")
|
|
brand_id = setup_brand_contact_with_cf("2.1", branch_id, phone="+5219991110021", email="test21@e3.local")
|
|
cleanup.register(BRAND_LOCATION_ID, brand_id, "2.1 brand")
|
|
|
|
brand_obj = get_contact(BRAND_LOCATION_ID, brand_id)
|
|
p = build_webhook_payload_marca_to_suc(brand_obj)
|
|
fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p)
|
|
time.sleep(WAIT_AFTER_WEBHOOK)
|
|
|
|
# Verificar GET directo + que no se creó otro en sucursal (audit por CF en Marca).
|
|
suc_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
|
|
assert_(suc_obj and suc_obj["id"] == branch_id, "contacto sucursal preexistente desapareció")
|
|
assert_eventually(lambda: len(_find_brand_with_cf(branch_id)) == 1,
|
|
"esperaba 1 marca con CF apuntando al sucursal")
|
|
log(" [OK] sin duplicado en sucursal.")
|
|
|
|
|
|
def scenario_2_2(cleanup):
|
|
log("== Scenario 2.2: Marca con CF + sucursal sin phone/email ==")
|
|
branch_id = setup_branch_contact("2.2", phone=None, email=None)
|
|
cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "2.2 branch")
|
|
brand_id = setup_brand_contact_with_cf("2.2", branch_id, phone=None, email=None)
|
|
cleanup.register(BRAND_LOCATION_ID, brand_id, "2.2 brand")
|
|
|
|
brand_obj = get_contact(BRAND_LOCATION_ID, brand_id)
|
|
p = build_webhook_payload_marca_to_suc(brand_obj)
|
|
fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p)
|
|
time.sleep(WAIT_AFTER_WEBHOOK)
|
|
|
|
# Verificar GET directo + que el CF en Marca sigue apuntando solo al brand_id original.
|
|
existing = get_contact(PINOTEPA_LOCATION_ID, branch_id)
|
|
assert_(existing is not None and existing.get("id") == branch_id,
|
|
"el contacto sucursal preexistente fue eliminado/perdido")
|
|
def check_one_marca():
|
|
m = _find_brand_with_cf(branch_id)
|
|
return m if len(m) == 1 and m[0]["id"] == brand_id else None
|
|
assert_eventually(check_one_marca, "esperaba 1 marca con CF apuntando al sucursal")
|
|
log(" [OK] sin duplicado pese a phone+email vacíos.")
|
|
|
|
|
|
def scenario_2_3(cleanup):
|
|
log("== Scenario 2.3: Marca con CF vacío + sucursal existing → cascada ==")
|
|
branch_id = setup_branch_contact("2.3", phone="+5219991110023", email="test23@e3.local")
|
|
cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "2.3 branch")
|
|
|
|
# Marca SIN CF
|
|
payload = _new_contact_payload("2.3", phone="+5219991110023", email="test23@e3.local")
|
|
brand_id = create_contact(BRAND_LOCATION_ID, payload)
|
|
cleanup.register(BRAND_LOCATION_ID, brand_id, "2.3 brand")
|
|
|
|
brand_obj = get_contact(BRAND_LOCATION_ID, brand_id)
|
|
p = build_webhook_payload_marca_to_suc(brand_obj)
|
|
fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p)
|
|
time.sleep(WAIT_AFTER_WEBHOOK)
|
|
|
|
suc_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
|
|
assert_(suc_obj and suc_obj["id"] == branch_id, "contacto sucursal desapareció")
|
|
# Eventualmente el CF en sucursal debería tener su propio id (autoref) tras UPDATE
|
|
log(" [OK] cascada funcionó.")
|
|
|
|
|
|
def scenario_2_4(cleanup):
|
|
log("== Scenario 2.4: Marca con CF apuntando a id inexistente (stale) ==")
|
|
branch_id = setup_branch_contact("2.4", phone="+5219991110024", email="test24@e3.local")
|
|
cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "2.4 branch")
|
|
fake_id = "STALE_" + os.urandom(7).hex()
|
|
payload = _new_contact_payload("2.4", phone="+5219991110024", email="test24@e3.local",
|
|
cf_value=fake_id, branch_cf_id=CF_ID_BRAND)
|
|
brand_id = create_contact(BRAND_LOCATION_ID, payload)
|
|
cleanup.register(BRAND_LOCATION_ID, brand_id, "2.4 brand")
|
|
|
|
brand_obj = get_contact(BRAND_LOCATION_ID, brand_id)
|
|
p = build_webhook_payload_marca_to_suc(brand_obj)
|
|
fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p)
|
|
time.sleep(WAIT_AFTER_WEBHOOK)
|
|
|
|
suc_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
|
|
assert_(suc_obj and suc_obj["id"] == branch_id, "contacto sucursal desapareció")
|
|
log(" [OK] cascada manejó CF stale.")
|
|
|
|
|
|
def scenario_2_5(cleanup):
|
|
log("== Scenario 2.5: Marca nuevo, crea en sucursal + autoenlace bidireccional ==")
|
|
payload = _new_contact_payload("2.5", phone="+5219991110025", email="test25@e3.local")
|
|
brand_id = create_contact(BRAND_LOCATION_ID, payload)
|
|
cleanup.register(BRAND_LOCATION_ID, brand_id, "2.5 brand")
|
|
|
|
brand_obj = get_contact(BRAND_LOCATION_ID, brand_id)
|
|
p = build_webhook_payload_marca_to_suc(brand_obj)
|
|
fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p)
|
|
time.sleep(WAIT_AFTER_WEBHOOK + 5) # autoenlace tarda más (3 PUTs)
|
|
|
|
# Buscar en sucursal por phone (con retry para indexación)
|
|
suc_token = get_token(PINOTEPA_LOCATION_ID)
|
|
def query_branch():
|
|
r = gc._request("POST", "/contacts/search", suc_token, json={
|
|
"locationId": PINOTEPA_LOCATION_ID, "pageLimit": 10, "query": "+5219991110025"
|
|
})
|
|
c = r.get("contacts") or []
|
|
return c if len(c) == 1 else None
|
|
contacts = assert_eventually(query_branch, "esperaba 1 contacto sucursal creado")
|
|
new_branch_id = contacts[0]["id"]
|
|
cleanup.register(PINOTEPA_LOCATION_ID, new_branch_id, "2.5 branch (creado por workflow)")
|
|
log(f" creado sucursal: {new_branch_id}")
|
|
|
|
# Verificar CF en ambos lados (Fase 3 incluida)
|
|
branch_cf_id = resolve_cf_id_in_location(PINOTEPA_LOCATION_ID)
|
|
new_branch_obj = get_contact(PINOTEPA_LOCATION_ID, new_branch_id)
|
|
suc_cf_val = cf_value_of(new_branch_obj, branch_cf_id)
|
|
updated_brand = get_contact(BRAND_LOCATION_ID, brand_id)
|
|
brand_cf_val = cf_value_of(updated_brand, CF_ID_BRAND)
|
|
log(f" CF en sucursal: {suc_cf_val!r} (esperado: {new_branch_id})")
|
|
log(f" CF en marca: {brand_cf_val!r} (esperado: {new_branch_id})")
|
|
assert_(suc_cf_val == new_branch_id, "CF en sucursal no es autorref")
|
|
assert_(brand_cf_val == new_branch_id, "CF en Marca no apunta a sucursal nueva")
|
|
log(" [OK] autoenlace bidireccional funcionando.")
|
|
|
|
|
|
def scenario_3_2(cleanup):
|
|
"""Re-disparar webhook tras 2.5 - autoreparación."""
|
|
log("== Scenario 3.2: re-disparo, autoreparación funciona ==")
|
|
# Hacemos el 2.5 primero, luego re-disparamos.
|
|
payload = _new_contact_payload("3.2", phone="+5219991110032", email="test32@e3.local")
|
|
brand_id = create_contact(BRAND_LOCATION_ID, payload)
|
|
cleanup.register(BRAND_LOCATION_ID, brand_id, "3.2 brand")
|
|
|
|
brand_obj = get_contact(BRAND_LOCATION_ID, brand_id)
|
|
p = build_webhook_payload_marca_to_suc(brand_obj)
|
|
fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p)
|
|
time.sleep(WAIT_AFTER_WEBHOOK + 5)
|
|
|
|
suc_token = get_token(PINOTEPA_LOCATION_ID)
|
|
def q1():
|
|
r = gc._request("POST", "/contacts/search", suc_token, json={
|
|
"locationId": PINOTEPA_LOCATION_ID, "pageLimit": 10, "query": "+5219991110032"
|
|
})
|
|
c = r.get("contacts") or []
|
|
return c if len(c) == 1 else None
|
|
contacts = assert_eventually(q1, "esperaba 1 en sucursal tras CREATE")
|
|
new_branch_id = contacts[0]["id"]
|
|
cleanup.register(PINOTEPA_LOCATION_ID, new_branch_id, "3.2 branch (creado)")
|
|
|
|
# Segundo disparo del MISMO webhook con el mismo brand_id
|
|
log(" segundo disparo (auto-reparación)...")
|
|
brand_obj_2 = get_contact(BRAND_LOCATION_ID, brand_id) # ya tiene CF poblado
|
|
p2 = build_webhook_payload_marca_to_suc(brand_obj_2)
|
|
fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p2)
|
|
time.sleep(WAIT_AFTER_WEBHOOK)
|
|
|
|
r2 = gc._request("POST", "/contacts/search", suc_token, json={
|
|
"locationId": PINOTEPA_LOCATION_ID, "pageLimit": 10, "query": "+5219991110032"
|
|
})
|
|
n2 = len(r2.get("contacts") or [])
|
|
assert_(n2 == 1, f"segundo disparo creó duplicado en sucursal: ahora hay {n2}")
|
|
log(" [OK] segundo disparo solo hizo UPDATE (match directo por CF).")
|
|
|
|
|
|
# ---- Scenario registry ----
|
|
|
|
SCENARIOS = {
|
|
"1.1": scenario_1_1,
|
|
"1.2": scenario_1_2,
|
|
"1.3": scenario_1_3,
|
|
"1.4": scenario_1_4,
|
|
"1.5": scenario_1_5,
|
|
"2.1": scenario_2_1,
|
|
"2.2": scenario_2_2,
|
|
"2.3": scenario_2_3,
|
|
"2.4": scenario_2_4,
|
|
"2.5": scenario_2_5,
|
|
"3.2": scenario_3_2,
|
|
}
|
|
|
|
GROUPS = {
|
|
"all-phase1": ["1.1", "1.2", "1.3", "1.4", "1.5"],
|
|
"all-phase2": ["2.1", "2.2", "2.3", "2.4", "2.5"],
|
|
"all-phase3": ["3.2"],
|
|
"all": ["1.1", "1.2", "1.3", "1.4", "1.5",
|
|
"2.1", "2.2", "2.3", "2.4", "2.5", "3.2"],
|
|
}
|
|
|
|
|
|
def run_scenarios(names):
|
|
results = []
|
|
for name in names:
|
|
fn = SCENARIOS.get(name)
|
|
if not fn:
|
|
log(f"[skip] scenario desconocido: {name}")
|
|
continue
|
|
cleanup = Cleanup()
|
|
try:
|
|
fn(cleanup)
|
|
results.append((name, "PASS", None))
|
|
except AssertionError as e:
|
|
results.append((name, "FAIL", str(e)))
|
|
log(f" [FAIL] {e}")
|
|
except Exception as e:
|
|
results.append((name, "ERROR", f"{type(e).__name__}: {e}"))
|
|
log(f" [ERROR] {type(e).__name__}: {e}")
|
|
traceback.print_exc()
|
|
finally:
|
|
cleanup.run()
|
|
return results
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
|
|
parser.add_argument("--scenario", required=True,
|
|
help="Nombre del scenario (ej 1.2) o grupo (all-phase1, all-phase2, all-phase3, all).")
|
|
args = parser.parse_args()
|
|
|
|
if hasattr(sys.stdout, "reconfigure"):
|
|
sys.stdout.reconfigure(encoding="utf-8")
|
|
|
|
names = GROUPS.get(args.scenario, [args.scenario])
|
|
log(f"Ejecutando: {names}")
|
|
results = run_scenarios(names)
|
|
|
|
log("\n=== RESULTADOS ===")
|
|
for name, status, err in results:
|
|
log(f" {name}: {status}" + (f" -- {err}" if err else ""))
|
|
|
|
n_pass = sum(1 for _, s, _ in results if s == "PASS")
|
|
n_total = len(results)
|
|
log(f"\n{n_pass}/{n_total} passed")
|
|
sys.exit(0 if n_pass == n_total else 1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|