#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Test harness E2E para los workflows n8n de sincronización de contactos.

Each scenario:
  1) Setup: creates test contacts in GHL (PINOTEPA branch + Brand) tagged
     `revisar`, named `Test<scenario>` / `Harness_<timestamp>`. Every id
     created is recorded.
  2) Trigger: POST to the real webhook of the corresponding workflow.
  3) Wait (GHL is not instantaneous).
  4) Asserts: checks for duplicates inside the Brand location, counts, and
     the custom field (CF) on both sides.
  5) Cleanup: deletes every recorded contact (Brand + branch).

Usage:
    python scripts/n8n_e2e_test.py --scenario 1.1
    python scripts/n8n_e2e_test.py --scenario all-phase1
    python scripts/n8n_e2e_test.py --scenario all
"""
import argparse
import datetime
import json
import os
import sys
import time
import traceback
import urllib.error
import urllib.request
import warnings

warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")

# Make both the repository root and the scripts directory importable so that
# `sync_engine` resolves regardless of the current working directory.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
if SCRIPTS_DIR not in sys.path:
    sys.path.insert(0, SCRIPTS_DIR)

import sync_engine  # noqa: E402

BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
PINOTEPA_LOCATION_ID = "7H91g95hhLKwIUqSk0Rg"
CF_ID_BRAND = "E6lI9ykWhqpj7Pmi7Qd3"  # field_id of id_contacto_sucursal in the Brand location
CF_KEY = "contact.id_contacto_sucursal"

WORKFLOW_SUC_TO_MARCA = "x4DqZ5FtSc43tdzB"
WORKFLOW_MARCA_TO_SUC = "4UMRwxJdHFfOGHBp"
WEBHOOK_URL_SUC_TO_MARCA = "https://workflows.consultoriae3.com/webhook/56c4c1a9-0271-48e5-8915-e25e18666dce"
WEBHOOK_URL_MARCA_TO_SUC = "https://workflows.consultoriae3.com/webhook/8d16dc55-616a-4532-8334-8ac81e7d3ae1"

WAIT_AFTER_WEBHOOK = 12  # seconds; GHL + n8n add latency.
ASSERT_RETRY_TIMEOUT = 25
ASSERT_RETRY_INTERVAL = 3

gc = sync_engine.ghl_client


def log(msg):
    """Write `msg` to stdout prefixed with an HH:MM:SS timestamp.

    If stdout cannot encode the line, it is re-encoded with replacement
    characters instead of raising.
    """
    ts = datetime.datetime.now().strftime("%H:%M:%S")
    line = f"[{ts}] {msg}"
    try:
        sys.stdout.write(line + "\n")
        sys.stdout.flush()
    except UnicodeEncodeError:
        enc = sys.stdout.encoding or "utf-8"
        sys.stdout.write(line.encode(enc, errors="replace").decode(enc) + "\n")
        sys.stdout.flush()


# ---------------------------------------------------------------------------
# Tokens
# ---------------------------------------------------------------------------
_accounts_cache = None


def get_token(location_id):
    """Return the API token for `location_id`.

    The accounts returned by `sync_engine.parse_accounts_csv()` are loaded
    once and cached at module level, keyed by their `location_id`.

    Raises:
        RuntimeError: if no account exists for the given location.
    """
    global _accounts_cache
    if _accounts_cache is None:
        _accounts_cache = {a["location_id"]: a for a in sync_engine.parse_accounts_csv()}
    acc = _accounts_cache.get(location_id)
    if not acc:
        raise RuntimeError(f"No hay token para location {location_id}")
    return acc["token"]


def resolve_cf_id_in_location(location_id, field_key=CF_KEY):
    """Return the field id of the custom field `field_key` in a location.

    The id differs per branch, so it is looked up through the location's
    customFields endpoint. Returns None when no field has that key.
    """
    token = get_token(location_id)
    data = gc._request("GET", f"/locations/{location_id}/customFields", token,
                       params={"model": "contact"})
    for cf in (data.get("customFields") or []):
        if cf.get("fieldKey") == field_key:
            return cf.get("id")
    return None


# ---------------------------------------------------------------------------
# GHL helpers
# ---------------------------------------------------------------------------
def create_contact(location_id, payload):
    """POST /contacts/ with `payload` as the body and return the new contact id.

    `locationId` is added to the body automatically.

    Raises:
        RuntimeError: if the response carries no contact id.
    """
    token = get_token(location_id)
    full = {"locationId": location_id, **payload}
    res = gc._request("POST", "/contacts/", token, json=full)
    cid = (res.get("contact") or {}).get("id") or res.get("id")
    if not cid:
        raise RuntimeError(f"Crear contacto sin id: {res}")
    return cid


def update_contact(location_id, contact_id, payload):
    """PUT `payload` onto an existing contact and return the API response."""
    token = get_token(location_id)
    return gc._request("PUT", f"/contacts/{contact_id}", token, json=payload)


def delete_contact_safe(location_id, contact_id):
    """Delete a contact, treating "not found" as a silent no-op.

    Returns True when the contact was deleted, False when `contact_id` is
    empty or the error message indicates a 404 / not found. Any other
    exception is re-raised.
    """
    if not contact_id:
        return False
    token = get_token(location_id)
    try:
        gc.delete_contact(token, contact_id, location_id)
        return True
    except Exception as e:
        # The client exposes no typed error here, so the message text is inspected.
        msg = str(e)
        if "404" in msg or "not found" in msg.lower():
            return False
        raise


def get_contact(location_id, contact_id):
    """GET a contact and return the `contact` object of the response (may be None)."""
    token = get_token(location_id)
    return gc._request("GET", f"/contacts/{contact_id}", token).get("contact")


def cf_value_of(contact, field_id):
    """Return the value of the custom field `field_id` on `contact`, or None.

    A field matches on either its `id` or `fieldId` key; the value is taken
    from the first truthy one of `value`, `fieldValue`, `fieldValueString`.
    """
    # Without this guard a None field_id would "match" any custom field that
    # simply lacks the `id`/`fieldId` key (None == None) and return its value.
    if not field_id:
        return None
    for cf in (contact or {}).get("customFields") or []:
        if cf.get("id") == field_id or cf.get("fieldId") == field_id:
            return cf.get("value") or cf.get("fieldValue") or cf.get("fieldValueString")
    return None


# ---------------------------------------------------------------------------
# Webhook
# ---------------------------------------------------------------------------
def fire_webhook(webhook_url, payload):
    """POST `payload` as JSON to `webhook_url`.

    Returns:
        (status, body) where body is the response text truncated to 500
        characters. HTTP error responses are returned the same way rather
        than raised; other network errors propagate.
    """
    data = json.dumps(payload).encode("utf-8")
    req = urllib.request.Request(webhook_url, method="POST", data=data,
                                 headers={"Content-Type": "application/json"})
    try:
        with urllib.request.urlopen(req, timeout=30) as r:
            return r.status, r.read().decode("utf-8", errors="replace")[:500]
    except urllib.error.HTTPError as e:
        return e.code, e.read().decode("utf-8", errors="replace")[:500]


def build_webhook_payload_suc_to_marca(branch_contact, sucursal_name="PINOTEPA"):
    """Build the flat body for the branch -> Brand webhook from a branch contact.

    Imitates the flat body GHL sends when a contact-created webhook fires.
    Per the original author's note, n8n wraps this in `$json.body` on receipt.
    """
    full_name = f"{branch_contact.get('firstName') or ''} {branch_contact.get('lastName') or ''}".strip()
    return {
        "contact_id": branch_contact["id"],
        "first_name": branch_contact.get("firstName") or "",
        "last_name": branch_contact.get("lastName") or "",
        "full_name": full_name,
        "email": branch_contact.get("email") or "",
        "phone": branch_contact.get("phone") or "",
        "country": branch_contact.get("country") or "MX",
        "date_created": branch_contact.get("dateAdded") or "",
        "tags": ",".join(branch_contact.get("tags") or []),
        "Sucursal": sucursal_name,
        "TIENDA": sucursal_name,
        "ID Contacto Sucursal": branch_contact["id"],
        "Fuente de Posible cliente": "test_harness",
        "Marca del Vehiculo": "",
        "Version del Vehiculo": "",
        "Año del Vehículo": "",
        "¿Qué modalidad prefieres?": "",
        "Acepta los terminos para tu cotización": "",
        "Archivos Adicionales": "",
        "CANAL DE ORIGEN": "test",
        "Check Comunicaciones de Marketing": "",
        "Descripción": "",
        "Fuente de Prospecto": "test",
        "ID Contacto Monte Providencia": "",
        "Información Adicional": "",
        "Presupuesto": "",
        "[Correo_Canalizados]": "",
        "[Correo_Empresa]": "",
        "[Dirección de la Empresa]": "",
        "[Nombre de la Empresa]": "",
        "[Número de Teléfono para Atender]": "",
        "[Número de WhatsApp para Atender]": "",
        "¿Cuándo necesitas el dinero?": "",
        "contact_type": "lead",
        "customData": {},
        "full_address": "",
        "location": {"id": PINOTEPA_LOCATION_ID, "name": "85957 - MP - PINOTEPA"},
        "triggerData": {},
        "workflow": {"id": WORKFLOW_SUC_TO_MARCA, "name": "test_harness"},
    }


def build_webhook_payload_marca_to_suc(brand_contact, sucursal_name="PINOTEPA"):
    """Build the flat body for the Brand -> branch webhook from a Brand contact.

    Flat payload; per the original author's note, n8n wraps it on receipt.
    """
    full_name = f"{brand_contact.get('firstName') or ''} {brand_contact.get('lastName') or ''}".strip()
    return {
        "contact_id": brand_contact["id"],
        "first_name": brand_contact.get("firstName") or "",
        "last_name": brand_contact.get("lastName") or "",
        "full_name": full_name,
        "email": brand_contact.get("email") or "",
        "phone": brand_contact.get("phone") or "",
        "country": brand_contact.get("country") or "MX",
        "date_created": brand_contact.get("dateAdded") or "",
        "tags": ",".join(brand_contact.get("tags") or []),
        "Sucursal": sucursal_name,
        "TIENDA": sucursal_name,
        "Fuente de Posible cliente": "test_harness",
        "customData": {},
        "location": {"id": BRAND_LOCATION_ID, "name": "Monte Providencia"},
        "workflow": {"id": WORKFLOW_MARCA_TO_SUC, "name": "test_harness"},
    }


# ---------------------------------------------------------------------------
# Cleanup tracker
# ---------------------------------------------------------------------------
class Cleanup:
    """Collects the contacts a scenario creates so they can be deleted afterwards."""

    def __init__(self):
        self.contacts = []  # list of (location_id, contact_id, note)

    def register(self, location_id, contact_id, note=""):
        """Record a contact for deletion; empty contact ids are ignored."""
        if contact_id:
            self.contacts.append((location_id, contact_id, note))

    def run(self):
        """Delete every registered contact and return True when none failed.

        Contacts that are already gone are logged as skipped, not as failures.
        """
        log(f"[cleanup] borrando {len(self.contacts)} contactos test...")
        ok, fail = 0, 0
        # Delete in reverse order in case a cascading delete of a parent
        # invalidates others (should not happen; purely defensive).
        for loc, cid, note in reversed(self.contacts):
            try:
                if delete_contact_safe(loc, cid):
                    ok += 1
                    log(f" [del] {loc}/{cid} ({note})")
                else:
                    log(f" [skip-404] {loc}/{cid} ({note})")
            except Exception as e:
                fail += 1
                log(f" [FAIL] {loc}/{cid}: {e}")
        log(f"[cleanup] {ok} borrados, {fail} errores.")
        return fail == 0


# ---------------------------------------------------------------------------
# Test scenarios
# ---------------------------------------------------------------------------
def _ts_suffix():
    """Return the current local time as `yymmdd_HHMMSS`."""
    return datetime.datetime.now().strftime("%y%m%d_%H%M%S")


def _new_contact_payload(scenario_id, *, phone=None, email=None, cf_value=None, branch_cf_id=None):
    """Build the base body for a test contact.

    `phone` and `email` are included only when truthy. The custom field is
    included only when both `cf_value` and `branch_cf_id` are truthy.
    """
    ts = _ts_suffix()
    body = {
        "firstName": f"Test{scenario_id.replace('.', '')}",
        "lastName": f"Harness_{ts}",
        "tags": ["revisar", "qa-test"],
    }
    if phone:
        body["phone"] = phone
    if email:
        body["email"] = email
    if cf_value and branch_cf_id:
        body["customFields"] = [{"id": branch_cf_id, "key": CF_KEY, "field_value": cf_value}]
    return body


def assert_(condition, msg):
    """Raise AssertionError(msg) when `condition` is falsy (not stripped by -O)."""
    if not condition:
        raise AssertionError(msg)


def assert_eventually(predicate, msg, timeout=None, interval=None):
    """Poll `predicate` until it returns a truthy value and return that value.

    Args:
        predicate: zero-argument callable, polled repeatedly.
        msg: message for the AssertionError raised on timeout.
        timeout: seconds to keep polling; defaults to ASSERT_RETRY_TIMEOUT.
        interval: seconds between polls; defaults to ASSERT_RETRY_INTERVAL.

    Raises:
        AssertionError: if no poll returned a truthy value within `timeout`.
    """
    # `is None` (not `or`) so that an explicit 0 is honoured rather than
    # silently replaced by the default.
    if timeout is None:
        timeout = ASSERT_RETRY_TIMEOUT
    if interval is None:
        interval = ASSERT_RETRY_INTERVAL
    # Monotonic clock: wall-clock adjustments cannot shorten or extend the wait.
    deadline = time.monotonic() + timeout
    while True:
        # The predicate always runs at least once, even with a zero timeout.
        last_result = predicate()
        if last_result:
            return last_result
        if time.monotonic() >= deadline:
            break
        time.sleep(interval)
    raise AssertionError(f"{msg} (last={last_result!r})")


def _find_brand_with_cf(value):
    """Search the Brand location for contacts whose id_contacto_sucursal CF equals `value`.

    Returns the list of matching contacts; on any request error a warning is
    logged and an empty list is returned.
    """
    token = get_token(BRAND_LOCATION_ID)
    try:
        r = gc._request("POST", "/contacts/search", token, json={
            "locationId": BRAND_LOCATION_ID,
            "pageLimit": 5,
            "filters": [{"group": "AND", "filters": [
                {"field": f"customFields.{CF_ID_BRAND}", "operator": "eq", "value": value}
            ]}]
        })
        return r.get("contacts") or []
    except Exception as e:
        log(f" [warn] search por CF falló: {e}")
        return []


def wait_for_brand_cf_indexed(branch_id, expected_count=1, timeout=25, interval=2):
    """Poll the CF search until it returns at least `expected_count` matches.

    GHL can take several seconds to index a new CF for filter searches.
    Returns True once the count is reached, False on timeout.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if len(_find_brand_with_cf(branch_id)) >= expected_count:
            return True
        time.sleep(interval)
    return False


def wait_for_brand_query_indexed(query, expected_count=1, timeout=25, interval=2):
    """Poll the Brand free-text search until `query` yields `expected_count` contacts.

    This is the search the workflow uses for phone/email/name. Request errors
    are logged and retried. Returns True on success, False on timeout.
    """
    token = get_token(BRAND_LOCATION_ID)
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            r = gc._request("POST", "/contacts/search", token, json={
                "locationId": BRAND_LOCATION_ID,
                "pageLimit": 5,
                "query": query
            })
            if len(r.get("contacts") or []) >= expected_count:
                return True
        except Exception as e:
            # Best-effort polling: keep retrying, but surface the cause so a
            # persistent failure is not mistaken for slow indexing.
            log(f" [warn] search por query falló: {e}")
        time.sleep(interval)
    return False


# ---- Setup helpers ----
def setup_branch_contact(scenario_id, *, phone=None, email=None, with_cf=True,
                         location_id=PINOTEPA_LOCATION_ID):
    """Create a branch contact and, when `with_cf` is True, populate its CF.

    Returns the id of the created contact.
    """
    branch_cf_id = resolve_cf_id_in_location(location_id) if with_cf else None
    payload = _new_contact_payload(scenario_id, phone=phone, email=email)
    cid = create_contact(location_id, payload)
    if with_cf and branch_cf_id:
        # Populate the CF with the contact's own id (self-reference); the id
        # is only known after creation, hence the separate update.
        update_contact(location_id, cid, {
            "customFields": [{"id": branch_cf_id, "key": CF_KEY, "field_value": cid}]
        })
    return cid


def setup_brand_contact_with_cf(scenario_id, branch_contact_id, *, phone=None, email=None):
    """Create a Brand contact whose CF points at `branch_contact_id`; return its id."""
    payload = _new_contact_payload(scenario_id, phone=phone, email=email,
                                   cf_value=branch_contact_id, branch_cf_id=CF_ID_BRAND)
    return create_contact(BRAND_LOCATION_ID, payload)


# ---- Phase 1 scenarios (branch -> Brand) ----
def scenario_1_1(cleanup):
    """Happy path: branch with CF + Brand with CF pointing at it. Expected: CF match, UPDATE."""
    log("== Scenario 1.1: happy path con CF ==")
    branch_id = setup_branch_contact("1.1", phone="+5219991110011", email="test11@e3.local")
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.1 branch")
    log(f" branch contact: {branch_id}")

    brand_id = setup_brand_contact_with_cf("1.1", branch_id, phone="+5219991110011",
                                           email="test11@e3.local")
    cleanup.register(BRAND_LOCATION_ID, brand_id, "1.1 brand")
    log(f" brand contact: {brand_id} (CF -> {branch_id})")
    log(" esperando indexación del CF en Marca...")
    assert_(wait_for_brand_cf_indexed(branch_id), "Brand CF no se indexó a tiempo")

    # Fire the webhook.
    branch_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    payload = build_webhook_payload_suc_to_marca(branch_obj)
    s, body = fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, payload)
    log(f" webhook status: {s} body: {body[:100]}")
    time.sleep(WAIT_AFTER_WEBHOOK)

    # Verify that NO second Brand contact with CF=branch_id was created.
    def check_unique():
        matches = _find_brand_with_cf(branch_id)
        return matches if len(matches) == 1 and matches[0]["id"] == brand_id else None

    assert_eventually(check_unique, "esperaba 1 match por CF apuntando al brand original")
    log(" [OK] match único por CF; no duplicado.")


def scenario_1_2(cleanup):
    """Original bug: branch contact WITHOUT phone or email, Brand with the correct CF."""
    log("== Scenario 1.2: phone+email vacíos (bug original) ==")
    branch_id = setup_branch_contact("1.2", phone=None, email=None)
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.2 branch")
    brand_id = setup_brand_contact_with_cf("1.2", branch_id, phone=None, email=None)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "1.2 brand")
    log(" esperando indexación CF...")
    assert_(wait_for_brand_cf_indexed(branch_id), "Brand CF no se indexó a tiempo")

    branch_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    payload = build_webhook_payload_suc_to_marca(branch_obj)
    fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, payload)
    time.sleep(WAIT_AFTER_WEBHOOK)

    assert_eventually(lambda: len(_find_brand_with_cf(branch_id)) == 1,
                      "esperaba 1 match (sin duplicado)")
    log(" [OK] sin duplicado.")


def scenario_1_3(cleanup):
    """Empty CF on the branch but phone present -> phone cascade."""
    log("== Scenario 1.3: CF vacío en sucursal, fallback a phone ==")
    branch_id = setup_branch_contact("1.3", phone="+5219991110013", email="test13@e3.local",
                                     with_cf=False)
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.3 branch")

    # Brand contact WITHOUT CF but with the same phone: must match via the cascade.
    payload = _new_contact_payload("1.3", phone="+5219991110013", email="test13@e3.local")
    brand_id = create_contact(BRAND_LOCATION_ID, payload)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "1.3 brand")
    log(" esperando indexación de phone en Marca...")
    assert_(wait_for_brand_query_indexed("+5219991110013"), "phone no indexado a tiempo")

    branch_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    p = build_webhook_payload_suc_to_marca(branch_obj)
    fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, p)
    time.sleep(WAIT_AFTER_WEBHOOK)

    # After the Phase 1 run the workflow is expected to have populated the CF
    # on the existing Brand contact with branch_id (defensive injection).
    upd = get_contact(BRAND_LOCATION_ID, brand_id)
    cf_now = cf_value_of(upd, CF_ID_BRAND)
    if cf_now == branch_id:
        log(f" [OK] cascada phone funcionó + CF inyectado = {cf_now}")
        return

    log(f" [WARN] CF post-update = {cf_now!r} (esperaba {branch_id}). Cascada quizá creó nuevo.")
    # Presumably the workflow created new Brand contact(s) carrying the CF
    # instead of updating; register every one of them so none is left behind.
    for extra in _find_brand_with_cf(branch_id):
        cleanup.register(BRAND_LOCATION_ID, extra["id"], "1.3 brand (creado por workflow)")
    raise AssertionError("CF no quedó poblado tras cascada")


def scenario_1_4(cleanup):
    """New branch contact that does not exist in Brand -> created with CF populated."""
    log("== Scenario 1.4: nuevo, crea en Marca ==")
    branch_id = setup_branch_contact("1.4", phone="+5219991110014", email="test14@e3.local")
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.4 branch")

    branch_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    p = build_webhook_payload_suc_to_marca(branch_obj)
    fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, p)
    time.sleep(WAIT_AFTER_WEBHOOK)

    def exactly_one_match():
        # One search per poll: the count check and the returned list come
        # from the same response.
        found = _find_brand_with_cf(branch_id)
        return found if len(found) == 1 else None

    matches = assert_eventually(exactly_one_match,
                                "esperaba que se creara 1 contacto en Marca con CF")
    brand_new = matches[0]
    cleanup.register(BRAND_LOCATION_ID, brand_new["id"], "1.4 brand (creado por workflow)")
    log(f" [OK] creado en Marca: {brand_new['id']} CF={branch_id}")


def scenario_1_5(cleanup):
    """Branch with CF + 2 Brand contacts sharing the same phone. The CF wins.

    If GHL rejects the duplicate-phone decoy, the scenario returns early
    (and is therefore reported as passed by the runner).
    """
    log("== Scenario 1.5: CF gana sobre multi-phone ==")
    branch_id = setup_branch_contact("1.5", phone="+5219991110015", email="test15a@e3.local")
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "1.5 branch")

    # Two Brand contacts with the same phone, only one with the correct CF.
    brand_correct = setup_brand_contact_with_cf("1.5a", branch_id, phone="+5219991110015",
                                                email="test15a@e3.local")
    cleanup.register(BRAND_LOCATION_ID, brand_correct, "1.5 brand correct")
    log(" esperando indexación CF...")
    assert_(wait_for_brand_cf_indexed(branch_id), "Brand CF no se indexó a tiempo")

    # Decoy: no CF.
    payload_decoy = _new_contact_payload("1.5b", phone="+5219991110015", email="test15b@e3.local")
    try:
        brand_decoy = create_contact(BRAND_LOCATION_ID, payload_decoy)
        cleanup.register(BRAND_LOCATION_ID, brand_decoy, "1.5 brand decoy")
    except Exception as e:
        # GHL may reject a duplicate phone without allowDuplicateContact;
        # in that case the scenario is skipped.
        log(f" [skip] GHL rechazó duplicate phone: {e}")
        return

    branch_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    p = build_webhook_payload_suc_to_marca(branch_obj)
    fire_webhook(WEBHOOK_URL_SUC_TO_MARCA, p)
    time.sleep(WAIT_AFTER_WEBHOOK)

    def check_correct():
        m = _find_brand_with_cf(branch_id)
        return m if len(m) == 1 and m[0]["id"] == brand_correct else None

    assert_eventually(check_correct, "CF debería apuntar al brand_correct (no al decoy)")
    log(" [OK] CF prevaleció sobre cascada phone.")


# ---- Phase 2 scenarios (Brand -> branch) ----
def scenario_2_1(cleanup):
    """Brand contact with CF pointing at an existing branch contact: no duplicate expected."""
    log("== Scenario 2.1: Marca con CF, sucursal existing ==")
    branch_id = setup_branch_contact("2.1", phone="+5219991110021", email="test21@e3.local")
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "2.1 branch")
    brand_id = setup_brand_contact_with_cf("2.1", branch_id, phone="+5219991110021",
                                           email="test21@e3.local")
    cleanup.register(BRAND_LOCATION_ID, brand_id, "2.1 brand")

    brand_obj = get_contact(BRAND_LOCATION_ID, brand_id)
    p = build_webhook_payload_marca_to_suc(brand_obj)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p)
    time.sleep(WAIT_AFTER_WEBHOOK)

    # Direct GET, plus an audit through the CF search in Brand.
    suc_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    assert_(suc_obj and suc_obj["id"] == branch_id, "contacto sucursal preexistente desapareció")
    assert_eventually(lambda: len(_find_brand_with_cf(branch_id)) == 1,
                      "esperaba 1 marca con CF apuntando al sucursal")
    log(" [OK] sin duplicado en sucursal.")


def scenario_2_2(cleanup):
    """Brand contact with CF + branch contact without phone/email: no duplicate expected."""
    log("== Scenario 2.2: Marca con CF + sucursal sin phone/email ==")
    branch_id = setup_branch_contact("2.2", phone=None, email=None)
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "2.2 branch")
    brand_id = setup_brand_contact_with_cf("2.2", branch_id, phone=None, email=None)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "2.2 brand")

    brand_obj = get_contact(BRAND_LOCATION_ID, brand_id)
    p = build_webhook_payload_marca_to_suc(brand_obj)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p)
    time.sleep(WAIT_AFTER_WEBHOOK)

    # Direct GET, plus a check that the only Brand contact whose CF points at
    # the branch contact is still the original brand_id.
    existing = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    assert_(existing is not None and existing.get("id") == branch_id,
            "el contacto sucursal preexistente fue eliminado/perdido")

    def check_one_marca():
        m = _find_brand_with_cf(branch_id)
        return m if len(m) == 1 and m[0]["id"] == brand_id else None

    assert_eventually(check_one_marca, "esperaba 1 marca con CF apuntando al sucursal")
    log(" [OK] sin duplicado pese a phone+email vacíos.")


def scenario_2_3(cleanup):
    """Brand contact with empty CF + existing branch contact -> cascade match."""
    log("== Scenario 2.3: Marca con CF vacío + sucursal existing → cascada ==")
    branch_id = setup_branch_contact("2.3", phone="+5219991110023", email="test23@e3.local")
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "2.3 branch")

    # Brand contact WITHOUT CF.
    payload = _new_contact_payload("2.3", phone="+5219991110023", email="test23@e3.local")
    brand_id = create_contact(BRAND_LOCATION_ID, payload)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "2.3 brand")

    brand_obj = get_contact(BRAND_LOCATION_ID, brand_id)
    p = build_webhook_payload_marca_to_suc(brand_obj)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p)
    time.sleep(WAIT_AFTER_WEBHOOK)

    suc_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    assert_(suc_obj and suc_obj["id"] == branch_id, "contacto sucursal desapareció")
    # Eventually the branch CF should hold its own id (self-reference) after
    # the UPDATE; that is not asserted here.
    log(" [OK] cascada funcionó.")


def scenario_2_4(cleanup):
    """Brand contact whose CF points at a non-existent (stale) id."""
    log("== Scenario 2.4: Marca con CF apuntando a id inexistente (stale) ==")
    branch_id = setup_branch_contact("2.4", phone="+5219991110024", email="test24@e3.local")
    cleanup.register(PINOTEPA_LOCATION_ID, branch_id, "2.4 branch")

    fake_id = "STALE_" + os.urandom(7).hex()
    payload = _new_contact_payload("2.4", phone="+5219991110024", email="test24@e3.local",
                                   cf_value=fake_id, branch_cf_id=CF_ID_BRAND)
    brand_id = create_contact(BRAND_LOCATION_ID, payload)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "2.4 brand")

    brand_obj = get_contact(BRAND_LOCATION_ID, brand_id)
    p = build_webhook_payload_marca_to_suc(brand_obj)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p)
    time.sleep(WAIT_AFTER_WEBHOOK)

    suc_obj = get_contact(PINOTEPA_LOCATION_ID, branch_id)
    assert_(suc_obj and suc_obj["id"] == branch_id, "contacto sucursal desapareció")
    log(" [OK] cascada manejó CF stale.")


def scenario_2_5(cleanup):
    """New Brand contact: created in the branch, with the CF linked on both sides."""
    log("== Scenario 2.5: Marca nuevo, crea en sucursal + autoenlace bidireccional ==")
    payload = _new_contact_payload("2.5", phone="+5219991110025", email="test25@e3.local")
    brand_id = create_contact(BRAND_LOCATION_ID, payload)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "2.5 brand")

    brand_obj = get_contact(BRAND_LOCATION_ID, brand_id)
    p = build_webhook_payload_marca_to_suc(brand_obj)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p)
    time.sleep(WAIT_AFTER_WEBHOOK + 5)  # the auto-link takes longer (3 PUTs)

    # Look the contact up in the branch by phone (retrying while it gets indexed).
    suc_token = get_token(PINOTEPA_LOCATION_ID)

    def query_branch():
        r = gc._request("POST", "/contacts/search", suc_token, json={
            "locationId": PINOTEPA_LOCATION_ID,
            "pageLimit": 10,
            "query": "+5219991110025"
        })
        c = r.get("contacts") or []
        return c if len(c) == 1 else None

    contacts = assert_eventually(query_branch, "esperaba 1 contacto sucursal creado")
    new_branch_id = contacts[0]["id"]
    cleanup.register(PINOTEPA_LOCATION_ID, new_branch_id, "2.5 branch (creado por workflow)")
    log(f" creado sucursal: {new_branch_id}")

    # Verify the CF on both sides (Phase 3 included).
    branch_cf_id = resolve_cf_id_in_location(PINOTEPA_LOCATION_ID)
    new_branch_obj = get_contact(PINOTEPA_LOCATION_ID, new_branch_id)
    suc_cf_val = cf_value_of(new_branch_obj, branch_cf_id)
    updated_brand = get_contact(BRAND_LOCATION_ID, brand_id)
    brand_cf_val = cf_value_of(updated_brand, CF_ID_BRAND)
    log(f" CF en sucursal: {suc_cf_val!r} (esperado: {new_branch_id})")
    log(f" CF en marca: {brand_cf_val!r} (esperado: {new_branch_id})")
    assert_(suc_cf_val == new_branch_id, "CF en sucursal no es autorref")
    assert_(brand_cf_val == new_branch_id, "CF en Marca no apunta a sucursal nueva")
    log(" [OK] autoenlace bidireccional funcionando.")


def scenario_3_2(cleanup):
    """Re-fire the webhook after a 2.5-style create: the second run must not duplicate."""
    log("== Scenario 3.2: re-disparo, autoreparación funciona ==")
    # Run the 2.5 flow first, then fire the webhook again.
    payload = _new_contact_payload("3.2", phone="+5219991110032", email="test32@e3.local")
    brand_id = create_contact(BRAND_LOCATION_ID, payload)
    cleanup.register(BRAND_LOCATION_ID, brand_id, "3.2 brand")

    brand_obj = get_contact(BRAND_LOCATION_ID, brand_id)
    p = build_webhook_payload_marca_to_suc(brand_obj)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p)
    time.sleep(WAIT_AFTER_WEBHOOK + 5)

    suc_token = get_token(PINOTEPA_LOCATION_ID)

    def q1():
        r = gc._request("POST", "/contacts/search", suc_token, json={
            "locationId": PINOTEPA_LOCATION_ID,
            "pageLimit": 10,
            "query": "+5219991110032"
        })
        c = r.get("contacts") or []
        return c if len(c) == 1 else None

    contacts = assert_eventually(q1, "esperaba 1 en sucursal tras CREATE")
    new_branch_id = contacts[0]["id"]
    cleanup.register(PINOTEPA_LOCATION_ID, new_branch_id, "3.2 branch (creado)")

    # Second firing of the SAME webhook for the same brand_id.
    log(" segundo disparo (auto-reparación)...")
    # Re-read the Brand contact; the first run is expected to have populated its CF.
    brand_obj_2 = get_contact(BRAND_LOCATION_ID, brand_id)
    p2 = build_webhook_payload_marca_to_suc(brand_obj_2)
    fire_webhook(WEBHOOK_URL_MARCA_TO_SUC, p2)
    time.sleep(WAIT_AFTER_WEBHOOK)

    r2 = gc._request("POST", "/contacts/search", suc_token, json={
        "locationId": PINOTEPA_LOCATION_ID,
        "pageLimit": 10,
        "query": "+5219991110032"
    })
    n2 = len(r2.get("contacts") or [])
    assert_(n2 == 1, f"segundo disparo creó duplicado en sucursal: ahora hay {n2}")
    log(" [OK] segundo disparo solo hizo UPDATE (match directo por CF).")


# ---- Scenario registry ----
SCENARIOS = {
    "1.1": scenario_1_1,
    "1.2": scenario_1_2,
    "1.3": scenario_1_3,
    "1.4": scenario_1_4,
    "1.5": scenario_1_5,
    "2.1": scenario_2_1,
    "2.2": scenario_2_2,
    "2.3": scenario_2_3,
    "2.4": scenario_2_4,
    "2.5": scenario_2_5,
    "3.2": scenario_3_2,
}

GROUPS = {
    "all-phase1": ["1.1", "1.2", "1.3", "1.4", "1.5"],
    "all-phase2": ["2.1", "2.2", "2.3", "2.4", "2.5"],
    "all-phase3": ["3.2"],
    "all": ["1.1", "1.2", "1.3", "1.4", "1.5", "2.1", "2.2", "2.3", "2.4", "2.5", "3.2"],
}


def run_scenarios(names):
    """Run the named scenarios in order, always cleaning up after each one.

    Unknown names are logged and skipped (they produce no result entry).

    Returns:
        A list of (name, status, error) tuples where status is "PASS",
        "FAIL" (assertion failed) or "ERROR" (any other exception).
    """
    results = []
    for name in names:
        fn = SCENARIOS.get(name)
        if not fn:
            log(f"[skip] scenario desconocido: {name}")
            continue
        cleanup = Cleanup()
        try:
            fn(cleanup)
            results.append((name, "PASS", None))
        except AssertionError as e:
            results.append((name, "FAIL", str(e)))
            log(f" [FAIL] {e}")
        except Exception as e:
            results.append((name, "ERROR", f"{type(e).__name__}: {e}"))
            log(f" [ERROR] {type(e).__name__}: {e}")
            traceback.print_exc()
        finally:
            cleanup.run()
    return results


def main():
    """CLI entry point: run the requested scenario or group and exit 0 only if all passed."""
    # The first line of the module docstring doubles as the CLI description.
    parser = argparse.ArgumentParser(description=__doc__.splitlines()[0])
    parser.add_argument("--scenario", required=True,
                        help="Nombre del scenario (ej 1.2) o grupo (all-phase1, all-phase2, all-phase3, all).")
    args = parser.parse_args()

    if hasattr(sys.stdout, "reconfigure"):
        sys.stdout.reconfigure(encoding="utf-8")

    names = GROUPS.get(args.scenario, [args.scenario])
    log(f"Ejecutando: {names}")
    results = run_scenarios(names)

    log("\n=== RESULTADOS ===")
    for name, status, err in results:
        log(f" {name}: {status}" + (f" -- {err}" if err else ""))
    n_pass = sum(1 for _, s, _ in results if s == "PASS")
    n_total = len(results)
    log(f"\n{n_pass}/{n_total} passed")
    # An empty result list (e.g. an unknown scenario name) must not count as
    # success: 0/0 would otherwise exit with status 0.
    sys.exit(0 if results and n_pass == n_total else 1)


if __name__ == "__main__":
    main()