1062 lines
47 KiB
Python
1062 lines
47 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""sync_missing_opps_to_brand.py
|
|
|
|
Sincroniza al CRM de Marca las oportunidades que existen en sucursal y NO tienen
|
|
replica en Marca (bucket "opportunities_in_branch_not_in_brand" del audit
|
|
audit_brand_vs_branches_totals).
|
|
|
|
Flujo por cada opp de sucursal sin replica en Marca:
|
|
|
|
1. Carga el contacto de sucursal asociado a la opp.
|
|
2. Busca contacto en Marca en este orden:
|
|
a) telefono normalizado (ultimos 10 digitos)
|
|
b) email (lowercase)
|
|
c) nombre completo (normalizado: sin acentos, lowercase, espacios colapsados)
|
|
3. Si no existe contacto en Marca, lo crea con todos los datos basicos y los
|
|
custom fields mapeados por nombre desde el schema de la sucursal al de Marca.
|
|
4. Espera 10 segundos si se creo contacto nuevo, por si hay una automatizacion
|
|
en Marca que dispara la creacion de la opp automaticamente.
|
|
5. Vuelve a buscar oportunidades del contacto en Marca (refresca via API GHL).
|
|
6. Si existe opp en Marca: la actualiza con los datos de la opp de sucursal
|
|
(custom fields mapeados por nombre).
|
|
7. Si no existe opp: crea opp en Marca con todos los datos de la sucursal.
|
|
|
|
Modos:
|
|
- dry-run (default): no escribe nada en GHL, solo planea.
|
|
- --apply: ejecuta las escrituras y registra cada cambio en script_audit
|
|
(rollback disponible desde el dashboard).
|
|
|
|
Uso CLI:
|
|
python scripts/sync_missing_opps_to_brand.py
|
|
python scripts/sync_missing_opps_to_brand.py --apply --yes
|
|
python scripts/sync_missing_opps_to_brand.py --apply --run-id <ID>
|
|
python scripts/sync_missing_opps_to_brand.py --only-opp <opp_id>
|
|
python scripts/sync_missing_opps_to_brand.py --json
|
|
|
|
Para uso programatico (desde el endpoint /api/comparativa/sync-missing-opps):
|
|
from scripts.sync_missing_opps_to_brand import run_sync
|
|
result = run_sync(dry_run=True)
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import sys
|
|
import time
|
|
import unicodedata
|
|
from datetime import datetime
|
|
|
|
# Make this script's directory and its parent importable so the project-local
# modules imported right below resolve when the file is executed directly.
SCRIPTS_DIR = os.path.dirname(os.path.abspath(__file__))
ROOT_DIR = os.path.dirname(SCRIPTS_DIR)
# Order matters: ROOT_DIR is prepended first, so SCRIPTS_DIR (prepended second)
# ends up ahead of it on sys.path when both are new.
for _import_dir in (ROOT_DIR, SCRIPTS_DIR):
    if _import_dir not in sys.path:
        sys.path.insert(0, _import_dir)
del _import_dir
|
|
|
|
import script_audit # noqa: E402
|
|
import sync_engine # noqa: E402
|
|
from audit_brand_vs_branches_totals import run_audit # noqa: E402
|
|
from common import ( # noqa: E402
|
|
match_contacts,
|
|
normalize_email,
|
|
normalize_phone,
|
|
normalize_text,
|
|
)
|
|
|
|
from paths import DB_PATH
|
|
# GHL location id of the Marca (brand) account; every brand-side query and
# payload in this file is scoped to it.
BRAND_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"
# Default threshold forwarded to common.match_contacts by find_brand_contact.
MATCH_THRESHOLD = 0.80
SCRIPT_NAME = os.path.basename(__file__)

# Seconds to wait after creating a contact in Marca, in case a Marca
# automation creates the opportunity on its own. Overridable through the
# SYNC_MISSING_OPPS_WAIT_SECS environment variable.
CONTACT_TO_OPP_WAIT_SECS = int(os.environ.get("SYNC_MISSING_OPPS_WAIT_SECS", "10"))
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Utilidades
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def safe_print(*args, **kwargs):
    """Write ``args`` to ``sys.stdout`` without failing on unencodable text.

    Only the ``sep`` (default ``" "``) and ``end`` (default ``"\\n"``) keyword
    arguments are honoured; any other keyword is ignored. If the first write
    raises ``UnicodeEncodeError`` the message is re-encoded with
    ``errors="replace"`` for the stream's encoding and written again.
    """
    separator = kwargs.get("sep", " ")
    terminator = kwargs.get("end", "\n")
    message = separator.join(map(str, args))
    # Streams without a declared encoding are treated as UTF-8.
    stream_encoding = sys.stdout.encoding or "utf-8"
    try:
        sys.stdout.write(message + terminator)
        sys.stdout.flush()
    except UnicodeEncodeError:
        # Round-trip through the stream encoding so unsupported characters
        # become replacement markers instead of aborting the run.
        degraded = message.encode(stream_encoding, errors="replace").decode(stream_encoding)
        sys.stdout.write(degraded + terminator)
        sys.stdout.flush()
|
|
|
|
|
|
def strip_accents(value):
    """Return ``str(value)`` without combining marks; falsy input yields ``""``.

    The text is decomposed with Unicode NFD and every character in category
    ``Mn`` (non-spacing mark) is dropped.
    """
    if value:
        decomposed = unicodedata.normalize("NFD", str(value))
        kept = [ch for ch in decomposed if unicodedata.category(ch) != "Mn"]
        return "".join(kept)
    return ""
|
|
|
|
|
|
def normalize_name(first, last):
    """Return the normalized full name used as a matching key.

    Falsy parts are treated as empty strings, the two parts are joined with a
    single space and the result is passed to ``common.normalize_text`` so the
    key is built the same way as in the other cross-location scripts.
    NOTE(review): the exact normalization (original note: accent-free,
    lowercase, collapsed whitespace) lives in ``common`` — confirm there.
    """
    first_part = first or ""
    last_part = last or ""
    return normalize_text(f"{first_part} {last_part}")
|
|
|
|
|
|
def parse_custom_fields(raw):
    """Coerce a stored custom-fields value into a list.

    Falsy input gives ``[]``; a list is returned as-is (same object); anything
    else is decoded with ``json.loads`` and returned only when the decoded
    value is a list. Undecodable input or a non-list result gives ``[]``.
    """
    if not raw:
        return []
    if isinstance(raw, list):
        return raw
    try:
        decoded = json.loads(raw)
    except Exception:
        # Best effort: malformed or non-string input means "no custom fields".
        return []
    if isinstance(decoded, list):
        return decoded
    return []
|
|
|
|
|
|
def get_cf_value(cf):
    """Return the first usable value stored in a custom-field dict, or None.

    Keys are probed in the order ``value``, ``fieldValue``,
    ``fieldValueString`` (per the original note, GHL uses ``value`` for
    contacts and the ``fieldValue*`` keys for opportunities). ``None`` and the
    empty string count as "no value"; other falsy values such as ``0`` or
    ``False`` are returned.
    """
    candidates = (cf.get(key) for key in ("value", "fieldValue", "fieldValueString"))
    return next(
        (found for found in candidates if found is not None and found != ""),
        None,
    )
|
|
|
|
|
|
def custom_fields_by_name(record_or_json, id_to_name):
    """Turn a record's custom-field list into ``{field_name: value}``.

    Args:
        record_or_json: record dict. The list is read from
            ``custom_fields_json`` and, only when that key is missing or
            None, from ``customFields`` or ``custom_fields``. A non-dict
            argument yields an empty result.
        id_to_name: mapping of field id to field name for the record's schema.

    Returns:
        Dict keyed by field name. Entries whose id (``id`` or ``fieldId``) is
        not in ``id_to_name``, or that carry no usable value, are left out.
    """
    raw = None
    if isinstance(record_or_json, dict):
        raw = record_or_json.get("custom_fields_json")
        if raw is None:
            raw = record_or_json.get("customFields") or record_or_json.get("custom_fields")

    # Only string payloads need decoding; anything else is used as given.
    if isinstance(raw, str):
        entries = parse_custom_fields(raw)
    else:
        entries = raw or []

    resolved = {}
    for entry in entries:
        field_id = entry.get("id") or entry.get("fieldId")
        field_name = id_to_name.get(field_id)
        if field_name:
            value = get_cf_value(entry)
            if value is not None and value != "":
                resolved[field_name] = value
    return resolved
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Carga
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def load_schemas_id_to_name(conn, location_id, object_key):
    """Return ``{field_id: field_name}`` from ``object_schemas``.

    Rows are filtered by ``location_id`` and ``object_key``. ``conn`` must
    produce rows addressable by column name (e.g. ``sqlite3.Row``).
    """
    cursor = conn.execute(
        "SELECT field_id, field_name FROM object_schemas WHERE location_id=? AND object_key=?",
        (location_id, object_key),
    )
    mapping = {}
    for row in cursor.fetchall():
        mapping[row["field_id"]] = row["field_name"]
    return mapping
|
|
|
|
|
|
def load_schemas_name_to_id(conn, location_id, object_key):
    """Return ``{field_name: field_id}`` from ``object_schemas``.

    Rows are filtered by ``location_id`` and ``object_key``. When several
    fields share a name, the first row returned by the query wins (the
    original note says this mirrors ``SchemaResolver.get_field_id``).
    """
    cursor = conn.execute(
        "SELECT field_id, field_name FROM object_schemas WHERE location_id=? AND object_key=?",
        (location_id, object_key),
    )
    mapping = {}
    for row in cursor.fetchall():
        field_name = row["field_name"]
        # Keep the first id seen for a duplicated name.
        if field_name not in mapping:
            mapping[field_name] = row["field_id"]
    return mapping
|
|
|
|
|
|
def load_brand_contacts(conn):
    """Return every Marca contact in the local snapshot as a list of dicts.

    Each dict carries ``id``, ``first_name``, ``last_name``, ``phone`` and
    ``email`` for rows whose ``location_id`` is ``BRAND_LOCATION_ID``.
    """
    cursor = conn.execute(
        "SELECT id, first_name, last_name, phone, email FROM contacts WHERE location_id=?",
        (BRAND_LOCATION_ID,),
    )
    return list(map(dict, cursor.fetchall()))
|
|
|
|
|
|
def load_brand_opps_by_contact(conn):
    """Group the Marca opportunities of the local snapshot by contact.

    Returns a ``defaultdict(list)`` mapping ``contact_id`` to a list of row
    dicts. ``custom_fields_json`` is part of each row so that callers can
    match a snapshot opportunity against a branch opportunity id through its
    custom fields without a live CRM call.
    """
    from collections import defaultdict

    query = (
        "SELECT id, contact_id, name, status, pipeline_id, pipeline_stage_id, "
        "monetary_value, custom_fields_json "
        "FROM opportunities WHERE location_id=?"
    )
    grouped = defaultdict(list)
    for row in conn.execute(query, (BRAND_LOCATION_ID,)).fetchall():
        grouped[row["contact_id"]].append(dict(row))
    return grouped
|
|
|
|
|
|
def index_brand_contacts(contacts):
    """Build lookup indexes over brand contacts.

    Returns:
        ``(by_phone, by_email, by_name)`` dicts keyed by the normalized phone,
        normalized email and normalized full name. On duplicate keys the
        first contact seen is kept.

    The name index only holds contacts that have neither a phone nor an
    email. A name match can therefore only connect to a target that also
    lacks strong identifiers; a contact with a phone or an email is never
    reachable by name alone.
    """
    by_phone, by_email, by_name = {}, {}, {}
    for contact in contacts:
        phone_key = normalize_phone(contact.get("phone"))
        email_key = normalize_email(contact.get("email"))
        name_key = normalize_name(contact.get("first_name"), contact.get("last_name"))
        if phone_key and phone_key not in by_phone:
            by_phone[phone_key] = contact
        if email_key and email_key not in by_email:
            by_email[email_key] = contact
        # Name-only candidates: no phone and no email.
        if name_key and not (phone_key or email_key) and name_key not in by_name:
            by_name[name_key] = contact
    return by_phone, by_email, by_name
|
|
|
|
|
|
def load_branch_contact(conn, location_id, contact_id):
    """Return the snapshot contact row as a dict, or None when absent.

    The row is looked up in ``contacts`` by ``location_id`` and ``id``.
    """
    found = conn.execute(
        "SELECT * FROM contacts WHERE location_id=? AND id=?",
        (location_id, contact_id),
    ).fetchone()
    if not found:
        return None
    return dict(found)
|
|
|
|
|
|
def load_branch_opp(conn, location_id, opp_id):
    """Return the snapshot opportunity row as a dict, or None when absent.

    The row is looked up in ``opportunities`` by ``location_id`` and ``id``.
    """
    found = conn.execute(
        "SELECT * FROM opportunities WHERE location_id=? AND id=?",
        (location_id, opp_id),
    ).fetchone()
    if not found:
        return None
    return dict(found)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Upsert SQLite tras mutaciones GHL (mantener snapshot local sincronizado)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def _now_str():
    """Return the current local (naive) time as ``YYYY-MM-DD HH:MM:SS``."""
    current = datetime.now()
    return current.strftime("%Y-%m-%d %H:%M:%S")
|
|
|
|
|
|
def upsert_brand_contact_in_db(conn, brand_contact_id, payload, tags=None):
    """Backward-compatible wrapper: upsert a contact under the Marca location.

    Forwards to ``upsert_contact_in_db`` with ``BRAND_LOCATION_ID`` as the
    location and returns whatever that call returns.
    """
    return upsert_contact_in_db(
        conn,
        brand_contact_id,
        payload,
        BRAND_LOCATION_ID,
        tags=tags,
    )
|
|
|
|
|
|
def upsert_contact_in_db(conn, contact_id, payload, location_id, tags=None):
    """Insert or replace a contact in the local SQLite snapshot.

    Intended to run right after the contact was created in GHL so the local
    snapshot stays in step with the CRM.

    Args:
        conn: SQLite connection; the change is committed before returning.
        contact_id: id of the contact (primary key value written to ``id``).
        payload: the camelCase dict sent to GHL (``firstName``, ``lastName``,
            ``email``, ``phone``, ``tags``, ``customFields``). Custom fields
            are stored as JSON in ``custom_fields_json``.
        location_id: location the row belongs to (Marca or a branch).
        tags: optional tag list; falls back to ``payload["tags"]``, then ``[]``.

    Note:
        ``INSERT OR REPLACE`` rewrites the whole row, so columns not listed in
        the statement are reset for an already existing contact.
    """
    custom_fields = payload.get("customFields") or []
    tags_json = json.dumps(tags or payload.get("tags") or [], ensure_ascii=False)
    cf_json = json.dumps(custom_fields, ensure_ascii=False)
    # Read the clock once so date_added, updated_at and synced_at always agree
    # (three separate reads could straddle a second boundary).
    now = _now_str()
    conn.execute(
        """
        INSERT OR REPLACE INTO contacts
        (id, location_id, first_name, last_name, email, phone, tags, custom_fields_json, date_added, updated_at, synced_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            contact_id,
            location_id,
            payload.get("firstName"),
            payload.get("lastName"),
            payload.get("email"),
            payload.get("phone"),
            tags_json,
            cf_json,
            now,
            now,
            now,
        ),
    )
    conn.commit()
|
|
|
|
|
|
def upsert_brand_opp_in_db(conn, brand_opp_id, payload, contact_id, status_override=None):
    """Insert or replace a Marca opportunity in the local SQLite snapshot.

    Args:
        conn: SQLite connection; the change is committed before returning.
        brand_opp_id: id of the opportunity in Marca.
        payload: fields sent to GHL. It may be a PUT body (no locationId,
            contactId or status) or a full POST body.
        contact_id: owning contact; passed separately because a PUT body does
            not carry it.
        status_override: status to store when it was changed through the
            dedicated status endpoint; takes precedence over the payload.

    When a row already exists, only the fields present (truthy) in ``payload``
    replace the stored ones; ``monetaryValue`` is taken whenever the key is
    present, even if its value is falsy. A new row defaults to status
    ``"open"`` and the current time as ``date_added``.
    """
    custom_fields = payload.get("customFields") or []
    cf_json = json.dumps(custom_fields, ensure_ascii=False)

    # Current snapshot row, used as the fallback for fields the payload omits.
    previous = conn.execute(
        "SELECT * FROM opportunities WHERE location_id=? AND id=?",
        (BRAND_LOCATION_ID, brand_opp_id),
    ).fetchone()

    row = {"id": brand_opp_id, "location_id": BRAND_LOCATION_ID}
    if previous:
        previous = dict(previous)
        if "monetaryValue" in payload:
            monetary_value = payload.get("monetaryValue")
        else:
            monetary_value = previous.get("monetary_value")
        row["name"] = payload.get("name") or previous.get("name")
        row["status"] = status_override or payload.get("status") or previous.get("status")
        row["pipeline_id"] = payload.get("pipelineId") or previous.get("pipeline_id")
        row["pipeline_stage_id"] = payload.get("pipelineStageId") or previous.get("pipeline_stage_id")
        row["monetary_value"] = monetary_value
        row["contact_id"] = contact_id or previous.get("contact_id")
        row["date_added"] = previous.get("date_added")
        row["custom_fields_json"] = cf_json if custom_fields else previous.get("custom_fields_json")
    else:
        row["name"] = payload.get("name")
        row["status"] = status_override or payload.get("status") or "open"
        row["pipeline_id"] = payload.get("pipelineId")
        row["pipeline_stage_id"] = payload.get("pipelineStageId")
        row["monetary_value"] = payload.get("monetaryValue")
        row["contact_id"] = contact_id
        row["date_added"] = _now_str()
        row["custom_fields_json"] = cf_json

    conn.execute(
        """
        INSERT OR REPLACE INTO opportunities
        (id, location_id, name, status, pipeline_id, pipeline_stage_id, monetary_value, contact_id, date_added, custom_fields_json, synced_at)
        VALUES (:id, :location_id, :name, :status, :pipeline_id, :pipeline_stage_id, :monetary_value, :contact_id, :date_added, :custom_fields_json, datetime('now', 'localtime'))
        """,
        row,
    )
    conn.commit()
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Live GHL helpers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def fetch_brand_opps_for_contact_live(brand_token, contact_id):
    """Return the live Marca opportunities that belong to ``contact_id``.

    Critical finding recorded by the original author: GHL ignores the
    ``contactId`` filter on ``POST /opportunities/search`` and returns every
    opportunity of the location. The search is therefore paginated without
    that filter and the results are filtered locally by their real
    ``contactId``; without the local filter the caller overwrote unrelated
    opportunities.

    Pagination stops on an empty page, on a short page (fewer than 100 items)
    or after 100 pages (10k opportunities). Beyond that cap a match may be
    missed; this risk is accepted.

    Args:
        brand_token: API token for the Marca location.
        contact_id: Marca contact id to match against each opportunity.

    Returns:
        List of opportunity dicts. The function is best effort and never
        raises: if a request fails, a warning is written to stderr and the
        matches collected from the pages already read are returned, so a
        late-page failure does not hide opportunities that were already found.
    """
    page_size = 100
    max_pages = 100  # defensive hard cap: 100 pages * 100 items = 10k opps
    matched = []
    try:
        for page in range(1, max_pages + 1):
            body = {"locationId": BRAND_LOCATION_ID, "limit": page_size, "page": page}
            data = sync_engine.ghl_client._request(
                "POST", "/opportunities/search", brand_token,
                json=body,
            )
            batch = data.get("opportunities", []) if isinstance(data, dict) else []
            if not batch:
                break
            # Local filter by the real contactId (the API-side filter is ignored).
            matched.extend(opp for opp in batch if opp.get("contactId") == contact_id)
            if len(batch) < page_size:
                break
    except Exception as exc:
        # Keep the best-effort contract, but surface the failure and keep the
        # matches gathered so far instead of silently reporting "no opps".
        print(
            f"WARN: busqueda live de opps en Marca interrumpida (contacto {contact_id}): {exc}",
            file=sys.stderr,
        )
    return matched
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Matching de contacto en Marca (telefono -> email -> nombre)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def find_brand_contact(branch_contact, idx_phone, idx_email, idx_name, threshold=MATCH_THRESHOLD):
    """Look up the Marca contact that corresponds to a branch contact.

    Strategies, in order:
      1. Normalized phone. The phone hit is accepted only when
         ``common.match_contacts`` rates the pair ``"strong"`` or
         ``"medium"`` for ``threshold``. Otherwise the lookup stops and the
         hit is reported as a collision. This avoids treating two people who
         share a number (e.g. a couple) as the same contact.
      2. Exact normalized email, with no name check.
      3. Normalized full name, only when the branch contact has neither a
         phone nor an email. ``idx_name`` is expected to contain only
         contacts without phone and email (see ``index_brand_contacts``).

    Returns:
        ``(match, strategy, collision)``:
          match: the matching contact dict, or None.
          strategy: ``'phone+name'``, ``'email'``, ``'name'`` or None.
          collision: the contact with the same phone but a name that did not
            match, or None. When set, ``match`` is None and the caller must
            report the collision and must not merge.
    """
    phone_key = normalize_phone(branch_contact.get("phone"))
    if phone_key and phone_key in idx_phone:
        same_phone = idx_phone[phone_key]
        verdict = match_contacts(branch_contact, same_phone, threshold=threshold)
        name_confirmed = verdict["level"] in ("strong", "medium")
        if not name_confirmed:
            # Same phone, different name: unresolved collision, no email fallback.
            return None, None, same_phone
        return same_phone, "phone+name", None

    email_key = normalize_email(branch_contact.get("email"))
    if email_key and email_key in idx_email:
        return idx_email[email_key], "email", None

    if not (phone_key or email_key):
        name_key = normalize_name(branch_contact.get("first_name"), branch_contact.get("last_name"))
        if name_key and name_key in idx_name:
            return idx_name[name_key], "name", None

    return None, None, None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Construccion de payloads
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def build_brand_contact_payload(branch_contact, branch_contact_schema_id_to_name, brand_contact_schema_name_to_id):
    """Backward-compatible wrapper: contact payload targeted at Marca.

    Forwards to ``build_contact_payload`` with ``BRAND_LOCATION_ID`` as the
    target location.
    """
    return build_contact_payload(
        src_contact=branch_contact,
        src_schema_id_to_name=branch_contact_schema_id_to_name,
        dst_schema_name_to_id=brand_contact_schema_name_to_id,
        target_location_id=BRAND_LOCATION_ID,
    )
|
|
|
|
|
|
def build_contact_payload(src_contact, src_schema_id_to_name, dst_schema_name_to_id, target_location_id):
    """Build the body for ``POST /contacts/`` from a snapshot contact row.

    Custom fields are translated by field name from the source schema to the
    destination schema; fields whose name does not exist in the destination
    are dropped. This works in either direction (branch -> Marca or
    Marca -> branch).

    Args:
        src_contact: snapshot contact dict (snake_case columns).
        src_schema_id_to_name: ``{field_id: field_name}`` of the source.
        dst_schema_name_to_id: ``{field_name: field_id}`` of the destination.
        target_location_id: location id written to ``locationId``.

    Returns:
        Payload dict. ``email``, ``phone``, ``customFields``, ``tags`` and
        ``source`` are included only when they have a value.
    """
    source_fields = custom_fields_by_name(src_contact, src_schema_id_to_name)
    translated = (
        (dst_schema_name_to_id.get(field_name), field_value)
        for field_name, field_value in source_fields.items()
    )
    mapped_cfs = [
        {"id": target_id, "value": field_value}
        for target_id, field_value in translated
        if target_id
    ]

    payload = {
        "locationId": target_location_id,
        "firstName": src_contact.get("first_name") or "",
        "lastName": src_contact.get("last_name") or "",
    }
    for optional_key in ("email", "phone"):
        if src_contact.get(optional_key):
            payload[optional_key] = src_contact[optional_key]
    if mapped_cfs:
        payload["customFields"] = mapped_cfs

    tags_raw = src_contact.get("tags")
    if tags_raw:
        # Tags may be stored as a JSON string or already be a list; an
        # undecodable value just means "no tags" (best effort).
        try:
            tags = json.loads(tags_raw) if isinstance(tags_raw, str) else tags_raw
        except Exception:
            tags = None
        if isinstance(tags, list) and tags:
            payload["tags"] = tags

    if src_contact.get("source"):
        payload["source"] = src_contact["source"]

    return payload
|
|
|
|
|
|
def build_brand_opp_payload(branch_opp, brand_contact_id, branch_opp_schema_id_to_name, brand_opp_schema_name_to_id, brand_pipeline_id, brand_stage_id, *, for_update=False):
    """Build the body for ``POST``/``PUT /opportunities/`` in Marca.

    GHL is strict about the fields accepted per verb (per the original note):
      - ``POST /opportunities/`` accepts locationId, status, contactId, name...
      - ``PUT /opportunities/X`` rejects locationId, contactId and status;
        the status is changed through ``PUT /opportunities/{id}/status``.

    Args:
        branch_opp: snapshot row of the branch opportunity.
        brand_contact_id: Marca contact that will own the opportunity.
        branch_opp_schema_id_to_name: ``{field_id: field_name}`` of the branch.
        brand_opp_schema_name_to_id: ``{field_name: field_id}`` of Marca.
        brand_pipeline_id: Marca pipeline id, or a falsy value to omit it.
        brand_stage_id: Marca stage id, or a falsy value to omit it.
        for_update: True builds a PUT body (no locationId/status/contactId).

    Returns:
        Payload dict; custom fields are mapped by name and those without a
        counterpart in Marca are dropped.
    """
    branch_fields = custom_fields_by_name(branch_opp, branch_opp_schema_id_to_name)
    translated = (
        (brand_opp_schema_name_to_id.get(field_name), field_value)
        for field_name, field_value in branch_fields.items()
    )
    mapped_cfs = [
        {"id": target_id, "value": field_value}
        for target_id, field_value in translated
        if target_id
    ]

    opp_name = branch_opp.get("name") or "Sin nombre"
    if for_update:
        payload = {"name": opp_name}
    else:
        # Creation-only fields: accepted by POST, rejected by PUT.
        payload = {
            "name": opp_name,
            "locationId": BRAND_LOCATION_ID,
            "status": branch_opp.get("status") or "open",
            "contactId": brand_contact_id,
        }

    if brand_pipeline_id:
        payload["pipelineId"] = brand_pipeline_id
    if brand_stage_id:
        payload["pipelineStageId"] = brand_stage_id
    if branch_opp.get("monetary_value") is not None:
        payload["monetaryValue"] = branch_opp["monetary_value"]
    if mapped_cfs:
        payload["customFields"] = mapped_cfs
    return payload
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Resolucion de pipeline/stage en Marca
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def resolve_brand_pipeline_and_stage(conn, branch_loc_id, branch_pipeline_id, branch_stage_id):
    """Map a branch pipeline/stage to its Marca counterpart by name.

    Names are compared accent-free, lowercased and stripped.

    Returns:
        ``(brand_pipeline_id, brand_stage_id)``.
        ``(None, None)`` when the branch pipeline is not in the snapshot, or
        when no Marca pipeline has the same name and Marca does not have
        exactly one pipeline (a single Marca pipeline is used as fallback).
        The stage falls back to the first stage of the chosen Marca pipeline
        and is None only when that pipeline has no stages.
    """
    pipeline_sql = "SELECT id, name, stages_json FROM pipelines WHERE location_id=?"
    branch_rows = conn.execute(pipeline_sql, (branch_loc_id,)).fetchall()
    brand_rows = conn.execute(pipeline_sql, (BRAND_LOCATION_ID,)).fetchall()

    def _norm(text):
        # Comparison key shared by pipeline and stage names.
        return strip_accents(text or "").lower().strip()

    # Locate the branch pipeline the opportunity lives in.
    source_pipe = None
    for row in branch_rows:
        if row["id"] == branch_pipeline_id:
            source_pipe = dict(row)
            break
    if not source_pipe:
        return None, None

    source_pipe_name = _norm(source_pipe["name"])
    source_stage_name = ""
    for stage in json.loads(source_pipe["stages_json"] or "[]"):
        if stage.get("id") == branch_stage_id:
            if stage:
                source_stage_name = _norm(stage.get("name"))
            break

    # Marca pipeline with the same name, else the only Marca pipeline.
    brand_pipe = next(
        (dict(row) for row in brand_rows if _norm(row["name"]) == source_pipe_name),
        None,
    )
    if not brand_pipe:
        if len(brand_rows) != 1:
            return None, None
        brand_pipe = dict(brand_rows[0])

    # Marca stage with the same name, else the first stage of that pipeline.
    brand_stages = json.loads(brand_pipe["stages_json"] or "[]")
    brand_stage = None
    if source_stage_name:
        brand_stage = next(
            (stage for stage in brand_stages if _norm(stage.get("name")) == source_stage_name),
            None,
        )
    if not brand_stage and brand_stages:
        brand_stage = brand_stages[0]

    brand_stage_id = brand_stage.get("id") if brand_stage else None
    return brand_pipe["id"], brand_stage_id
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Procesamiento
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def run_sync(opp_ids=None, dry_run=True, log=None, run_id=None):
|
|
"""Ejecuta la sincronizacion. Devuelve dict serializable.
|
|
|
|
Args:
|
|
opp_ids: lista opcional de opp_ids (sucursal) a procesar. None = todos los
|
|
del bucket missing_opps_in_brand.
|
|
dry_run: True (default) = no escribe en GHL.
|
|
log: funcion opcional log(line) para streaming.
|
|
run_id: id de script_audit para registrar cambios cuando apply.
|
|
"""
|
|
if log is None:
|
|
log = safe_print
|
|
|
|
if not os.path.exists(DB_PATH):
|
|
raise FileNotFoundError(f"No existe {DB_PATH}. Corre una sincronizacion global primero.")
|
|
|
|
log(f"[{datetime.now().strftime('%H:%M:%S')}] === sync_missing_opps_to_brand ===")
|
|
log(f"Modo: {'DRY-RUN (no escribe)' if dry_run else 'APPLY (escribe en GHL)'}")
|
|
log(f"Espera tras crear contacto: {CONTACT_TO_OPP_WAIT_SECS}s")
|
|
|
|
# 1) Recuperar el bucket desde el audit (sin limite).
|
|
log("Calculando bucket missing_opps_in_brand desde audit...")
|
|
audit_data = run_audit(limit_missing=None)
|
|
missing = audit_data["missing"]["opportunities_in_branch_not_in_brand"]
|
|
targets = missing["items"]
|
|
if opp_ids:
|
|
wanted = set(opp_ids)
|
|
targets = [t for t in targets if t["id"] in wanted]
|
|
log(f"Opps candidatas: {len(targets)} (total en bucket: {missing['total']})")
|
|
|
|
if not targets:
|
|
return {
|
|
"dry_run": dry_run,
|
|
"summary": {
|
|
"candidates": 0,
|
|
"contacts_created": 0,
|
|
"opps_created": 0,
|
|
"opps_updated": 0,
|
|
"skipped": 0,
|
|
"errors": 0,
|
|
},
|
|
"items": [],
|
|
}
|
|
|
|
# 2) Resolver tokens y schemas.
|
|
tokens = sync_engine.get_tokens_map()
|
|
brand_token = tokens.get(BRAND_LOCATION_ID)
|
|
if not brand_token:
|
|
raise RuntimeError("No se encontro token para la cuenta de Marca en el CSV de tokens.")
|
|
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
|
|
try:
|
|
brand_contact_schema_name_to_id = load_schemas_name_to_id(conn, BRAND_LOCATION_ID, "contact")
|
|
brand_opp_schema_name_to_id = load_schemas_name_to_id(conn, BRAND_LOCATION_ID, "opportunity")
|
|
|
|
if not brand_contact_schema_name_to_id:
|
|
log("ADVERTENCIA: schema de contactos de Marca vacio. Corre sync de metadata.")
|
|
if not brand_opp_schema_name_to_id:
|
|
log("ADVERTENCIA: schema de opps de Marca vacio. Corre sync de metadata.")
|
|
|
|
# 3) Cargar indice de contactos Marca.
|
|
brand_contacts = load_brand_contacts(conn)
|
|
idx_phone, idx_email, idx_name = index_brand_contacts(brand_contacts)
|
|
brand_opps_local_by_contact = load_brand_opps_by_contact(conn)
|
|
log(f"Indice Marca: {len(brand_contacts)} contactos, {sum(len(v) for v in brand_opps_local_by_contact.values())} opps cacheadas.")
|
|
|
|
# 4) Procesar cada opp.
|
|
results = []
|
|
summary = {
|
|
"candidates": len(targets),
|
|
"contacts_created": 0,
|
|
"opps_created": 0,
|
|
"opps_updated": 0,
|
|
"skipped": 0,
|
|
"errors": 0,
|
|
}
|
|
|
|
for idx, target in enumerate(targets, 1):
|
|
opp_id = target["id"]
|
|
branch_loc_id = target["branch_location_id"]
|
|
branch_name = target.get("branch_name", "")
|
|
log(f"\n[{idx}/{len(targets)}] Opp {opp_id} | sucursal: {branch_name}")
|
|
|
|
item = {
|
|
"opp_id": opp_id,
|
|
"branch_location_id": branch_loc_id,
|
|
"branch_name": branch_name,
|
|
"opp_name": target.get("name", ""),
|
|
"actions": [],
|
|
"status": "pending",
|
|
"error": None,
|
|
}
|
|
|
|
try:
|
|
# Cargar opp y contacto sucursal.
|
|
branch_opp = load_branch_opp(conn, branch_loc_id, opp_id)
|
|
if not branch_opp:
|
|
raise RuntimeError(f"No se encontro la opp {opp_id} en SQLite para {branch_loc_id}")
|
|
branch_contact_id = branch_opp.get("contact_id")
|
|
if not branch_contact_id:
|
|
raise RuntimeError(f"Opp {opp_id} no tiene contact_id en SQLite (huerfana)")
|
|
branch_contact = load_branch_contact(conn, branch_loc_id, branch_contact_id)
|
|
if not branch_contact:
|
|
raise RuntimeError(f"No se encontro el contacto {branch_contact_id} en SQLite")
|
|
|
|
item["branch_contact"] = {
|
|
"id": branch_contact["id"],
|
|
"name": f"{branch_contact.get('first_name') or ''} {branch_contact.get('last_name') or ''}".strip(),
|
|
"phone": branch_contact.get("phone"),
|
|
"email": branch_contact.get("email"),
|
|
}
|
|
|
|
branch_contact_schema_id_to_name = load_schemas_id_to_name(conn, branch_loc_id, "contact")
|
|
branch_opp_schema_id_to_name = load_schemas_id_to_name(conn, branch_loc_id, "opportunity")
|
|
|
|
# Buscar contacto en Marca.
|
|
match, strategy, collision = find_brand_contact(branch_contact, idx_phone, idx_email, idx_name)
|
|
brand_contact_id = match["id"] if match else None
|
|
contact_was_created = False
|
|
|
|
if collision and not match:
|
|
# Mismo teléfono que un contacto de Marca pero el nombre
|
|
# diverge. NO mergear: el contacto destino probablemente
|
|
# es una persona distinta (caso pareja con mismo número).
|
|
log(
|
|
f" COLISION TELEFONO sin match por nombre: "
|
|
f"brand_contact_id={collision.get('id')} "
|
|
f"({(collision.get('first_name') or '') + ' ' + (collision.get('last_name') or '')!r}). "
|
|
"Plan: crear contacto NUEVO en Marca."
|
|
)
|
|
item["actions"].append({
|
|
"action": "phone_collision_unresolved",
|
|
"colliding_brand_contact_id": collision.get("id"),
|
|
"colliding_brand_name": f"{collision.get('first_name') or ''} {collision.get('last_name') or ''}".strip(),
|
|
})
|
|
summary.setdefault("phone_collisions_unresolved", 0)
|
|
summary["phone_collisions_unresolved"] += 1
|
|
if run_id:
|
|
try:
|
|
script_audit.record_change(
|
|
run_id, BRAND_LOCATION_ID, "contact", branch_contact.get("id") or "",
|
|
"", "skipped_phone_collision", None,
|
|
{
|
|
"branch_contact_id": branch_contact.get("id"),
|
|
"branch_location_id": branch_loc_id,
|
|
"colliding_brand_contact_id": collision.get("id"),
|
|
"phone": branch_contact.get("phone"),
|
|
},
|
|
)
|
|
except Exception as audit_exc:
|
|
log(f" WARN: no se pudo registrar la colision en script_audit: {audit_exc}")
|
|
|
|
if match:
|
|
log(f" Contacto en Marca encontrado por {strategy}: {brand_contact_id}")
|
|
item["actions"].append({"action": "match_existing_contact", "strategy": strategy, "brand_contact_id": brand_contact_id})
|
|
else:
|
|
log(f" No hay contacto en Marca. Plan: CREAR con datos de sucursal.")
|
|
payload = build_brand_contact_payload(
|
|
branch_contact,
|
|
branch_contact_schema_id_to_name,
|
|
brand_contact_schema_name_to_id,
|
|
)
|
|
item["actions"].append({"action": "create_contact", "payload_preview": _preview_payload(payload)})
|
|
|
|
if not dry_run:
|
|
res = sync_engine.ghl_client.create_contact(brand_token, payload)
|
|
brand_contact_id = (res.get("contact") or {}).get("id") or res.get("id")
|
|
if not brand_contact_id:
|
|
raise RuntimeError(f"GHL no devolvio id de contacto creado. Respuesta: {res}")
|
|
contact_was_created = True
|
|
summary["contacts_created"] += 1
|
|
log(f" Contacto creado en Marca: {brand_contact_id}")
|
|
item["actions"][-1]["result"] = {"brand_contact_id": brand_contact_id}
|
|
|
|
# Replicar en SQLite local para mantener snapshot sincronizado.
|
|
try:
|
|
upsert_brand_contact_in_db(conn, brand_contact_id, payload)
|
|
except Exception as db_exc:
|
|
log(f" WARN: no se pudo upsert contacto en SQLite: {db_exc}")
|
|
|
|
# Indexar el nuevo contacto en memoria.
|
|
new_c = {
|
|
"id": brand_contact_id,
|
|
"first_name": branch_contact.get("first_name"),
|
|
"last_name": branch_contact.get("last_name"),
|
|
"phone": branch_contact.get("phone"),
|
|
"email": branch_contact.get("email"),
|
|
}
|
|
p = normalize_phone(new_c.get("phone"))
|
|
e = normalize_email(new_c.get("email"))
|
|
n = normalize_name(new_c.get("first_name"), new_c.get("last_name"))
|
|
if p: idx_phone.setdefault(p, new_c)
|
|
if e: idx_email.setdefault(e, new_c)
|
|
if n: idx_name.setdefault(n, new_c)
|
|
|
|
if run_id:
|
|
cid = script_audit.record_change(
|
|
run_id, BRAND_LOCATION_ID, "contact", brand_contact_id,
|
|
"", "created", None,
|
|
{"phone": new_c["phone"], "email": new_c["email"], "name": f"{new_c['first_name']} {new_c['last_name']}".strip()},
|
|
)
|
|
if cid:
|
|
script_audit.mark_change(cid, "applied")
|
|
else:
|
|
# dry-run: simular id para el resto del flujo
|
|
brand_contact_id = f"<NEW_CONTACT_FOR_{branch_contact_id}>"
|
|
summary["contacts_created"] += 1
|
|
|
|
# Espera de 10s si se creo contacto nuevo y estamos en apply.
|
|
if contact_was_created and not dry_run:
|
|
log(f" Esperando {CONTACT_TO_OPP_WAIT_SECS}s por automatizaciones de Marca...")
|
|
time.sleep(CONTACT_TO_OPP_WAIT_SECS)
|
|
|
|
# Buscar opp existente en Marca para este contacto.
|
|
#
|
|
# Estrategia:
|
|
# 1. Si el contacto YA existia en Marca (match en snapshot SQLite),
|
|
# buscar la opp en el snapshot — es la verdad cacheada y evita
|
|
# una llamada extra a GHL. Match por nombre exacto, sino la
|
|
# primera open, sino la primera.
|
|
# 2. Si no encontramos en snapshot Y estamos en apply, hacer live
|
|
# lookup como fallback (puede pasar si el snapshot esta stale).
|
|
# 3. Si el contacto se acaba de crear en este run, SIEMPRE live
|
|
# lookup tras los 10s (porque puede haberse creado por automatizacion).
|
|
existing_brand_opp = None
|
|
target_name = (branch_opp.get("name") or "").strip().lower()
|
|
|
|
branch_opp_id = branch_opp.get("id")
|
|
|
|
def _has_link_to_branch(bo):
|
|
"""True si esta opp de Marca tiene un custom field cuyo valor
|
|
== branch_opp_id. Cubre ambos formatos: snapshot SQLite
|
|
(custom_fields_json string) y live API (customFields list)."""
|
|
if not branch_opp_id:
|
|
return False
|
|
target = str(branch_opp_id)
|
|
# live API: customFields como lista de dicts
|
|
for cf in bo.get("customFields") or []:
|
|
v = cf.get("value") or cf.get("fieldValue") or cf.get("fieldValueString")
|
|
if v and str(v) == target:
|
|
return True
|
|
# snapshot SQLite: custom_fields_json como string JSON
|
|
raw = bo.get("custom_fields_json")
|
|
if raw:
|
|
try:
|
|
cfs = json.loads(raw) if isinstance(raw, str) else raw
|
|
except Exception:
|
|
cfs = None
|
|
if isinstance(cfs, list):
|
|
for cf in cfs:
|
|
v = cf.get("value") or cf.get("fieldValue") or cf.get("fieldValueString")
|
|
if v and str(v) == target:
|
|
return True
|
|
return False
|
|
|
|
def _pick_existing(opps_list):
    """Pick the Brand opp that corresponds to the branch opp being synced.

    Reads ``target_name`` from the enclosing scope and relies on
    ``_has_link_to_branch`` for link detection. Selection order:

    1. The first opp for which ``_has_link_to_branch`` is true. This is the
       deterministic match and avoids overwriting the wrong opp when the
       contact has several opps.
    2. The first opp whose trimmed, lower-cased name equals ``target_name``.
       Skipped when ``target_name`` is empty, so a nameless branch opp does
       not "exactly match" a nameless Brand opp.
    3. The first opp whose status is ``open`` (case-insensitive).
    4. The first opp in the list.

    Returns ``None`` when ``opps_list`` is empty or ``None``.
    """
    if not opps_list:
        return None

    # Primary match: explicit link back to the branch opp.
    for bo in opps_list:
        if _has_link_to_branch(bo):
            return bo

    # Fallback 1: exact name, only meaningful when there is a name to compare.
    if target_name:
        for bo in opps_list:
            if (bo.get("name") or "").strip().lower() == target_name:
                return bo

    # Fallback 2: first open opp, otherwise simply the first one.
    for bo in opps_list:
        if (bo.get("status") or "").lower() == "open":
            return bo
    return opps_list[0]
|
|
|
|
if not contact_was_created:
|
|
# Caso 1: contacto existente. Snapshot primero.
|
|
cached = brand_opps_local_by_contact.get(brand_contact_id, [])
|
|
existing_brand_opp = _pick_existing(cached)
|
|
if existing_brand_opp:
|
|
log(f" Match opp en snapshot SQLite: {existing_brand_opp.get('id')}")
|
|
elif not dry_run:
|
|
# Snapshot vacio: fallback a live (puede ser snapshot stale).
|
|
live_opps = fetch_brand_opps_for_contact_live(brand_token, brand_contact_id)
|
|
log(f" Snapshot vacio; live encontro {len(live_opps)} opps para este contacto.")
|
|
existing_brand_opp = _pick_existing(live_opps)
|
|
else:
|
|
# Caso 2: contacto recien creado. Solo live tras la espera.
|
|
if not dry_run:
|
|
live_opps = fetch_brand_opps_for_contact_live(brand_token, brand_contact_id)
|
|
log(f" Tras espera: live encontro {len(live_opps)} opps para el contacto nuevo.")
|
|
existing_brand_opp = _pick_existing(live_opps)
|
|
# En dry-run, no podemos saber lo que una automatizacion crearia.
|
|
|
|
# Resolver pipeline/stage en Marca.
|
|
brand_pipeline_id, brand_stage_id = resolve_brand_pipeline_and_stage(
|
|
conn, branch_loc_id, branch_opp.get("pipeline_id"), branch_opp.get("pipeline_stage_id"),
|
|
)
|
|
|
|
if existing_brand_opp:
|
|
log(f" Opp ya existe en Marca: {existing_brand_opp.get('id')} (status: {existing_brand_opp.get('status')}). Plan: ACTUALIZAR.")
|
|
payload = build_brand_opp_payload(
|
|
branch_opp, brand_contact_id,
|
|
branch_opp_schema_id_to_name, brand_opp_schema_name_to_id,
|
|
brand_pipeline_id, brand_stage_id,
|
|
for_update=True,
|
|
)
|
|
target_status = (branch_opp.get("status") or "open")
|
|
current_status = (existing_brand_opp.get("status") or "open")
|
|
status_changes = target_status != current_status
|
|
item["actions"].append({
|
|
"action": "update_opp",
|
|
"brand_opp_id": existing_brand_opp.get("id"),
|
|
"payload_preview": _preview_payload(payload),
|
|
"status_change": (current_status, target_status) if status_changes else None,
|
|
})
|
|
if not dry_run:
|
|
sync_engine.ghl_client.update_opportunity(brand_token, existing_brand_opp["id"], payload)
|
|
if status_changes:
|
|
try:
|
|
sync_engine.ghl_client.update_opportunity_status(brand_token, existing_brand_opp["id"], target_status)
|
|
log(f" Status actualizado: {current_status} -> {target_status}")
|
|
except Exception as se:
|
|
log(f" WARN: no se pudo actualizar status: {se}")
|
|
summary["opps_updated"] += 1
|
|
log(f" Opp actualizada en Marca: {existing_brand_opp['id']}")
|
|
|
|
# Replicar en SQLite local (camino rápido, con datos del payload).
|
|
try:
|
|
upsert_brand_opp_in_db(
|
|
conn, existing_brand_opp["id"], payload,
|
|
contact_id=brand_contact_id,
|
|
status_override=target_status if status_changes else None,
|
|
)
|
|
except Exception as db_exc:
|
|
log(f" WARN: no se pudo upsert opp en SQLite: {db_exc}")
|
|
# Refresh autoritativo: GET /opportunities/{id} y save_single
|
|
# garantiza que SQLite quede 1:1 con Bucéfalo (CFs, status real,
|
|
# cualquier campo derivado que GHL haya seteado tras el PUT).
|
|
ref = sync_engine.refresh_opportunity_in_db(
|
|
brand_token, existing_brand_opp["id"], BRAND_LOCATION_ID,
|
|
)
|
|
if not ref.get("ok"):
|
|
log(f" WARN: refresh_opportunity_in_db fallo: {ref.get('error')}")
|
|
summary.setdefault("local_refresh_errors", 0)
|
|
summary["local_refresh_errors"] += 1
|
|
|
|
if run_id:
|
|
cid = script_audit.record_change(
|
|
run_id, BRAND_LOCATION_ID, "opportunity", existing_brand_opp["id"],
|
|
"", "updated", existing_brand_opp, payload,
|
|
)
|
|
if cid:
|
|
script_audit.mark_change(cid, "applied")
|
|
else:
|
|
summary["opps_updated"] += 1
|
|
item["status"] = "updated"
|
|
else:
|
|
log(f" No hay opp en Marca para este contacto. Plan: CREAR.")
|
|
payload = build_brand_opp_payload(
|
|
branch_opp, brand_contact_id,
|
|
branch_opp_schema_id_to_name, brand_opp_schema_name_to_id,
|
|
brand_pipeline_id, brand_stage_id,
|
|
)
|
|
item["actions"].append({"action": "create_opp", "payload_preview": _preview_payload(payload)})
|
|
if not dry_run:
|
|
if not brand_pipeline_id or not brand_stage_id:
|
|
raise RuntimeError(
|
|
f"No se pudo resolver pipeline/stage en Marca para opp {opp_id} "
|
|
f"(branch pipeline {branch_opp.get('pipeline_id')})"
|
|
)
|
|
res = sync_engine.ghl_client.create_opportunity(brand_token, payload)
|
|
new_opp_id = (res.get("opportunity") or {}).get("id") or res.get("id")
|
|
if not new_opp_id:
|
|
raise RuntimeError(f"GHL no devolvio id de opp creada. Respuesta: {res}")
|
|
summary["opps_created"] += 1
|
|
log(f" Opp creada en Marca: {new_opp_id}")
|
|
item["actions"][-1]["result"] = {"brand_opp_id": new_opp_id}
|
|
|
|
# Replicar en SQLite local (camino rápido).
|
|
try:
|
|
upsert_brand_opp_in_db(conn, new_opp_id, payload, contact_id=brand_contact_id)
|
|
except Exception as db_exc:
|
|
log(f" WARN: no se pudo upsert opp en SQLite: {db_exc}")
|
|
# Refresh autoritativo desde Bucéfalo.
|
|
ref = sync_engine.refresh_opportunity_in_db(
|
|
brand_token, new_opp_id, BRAND_LOCATION_ID,
|
|
)
|
|
if not ref.get("ok"):
|
|
log(f" WARN: refresh_opportunity_in_db fallo: {ref.get('error')}")
|
|
summary.setdefault("local_refresh_errors", 0)
|
|
summary["local_refresh_errors"] += 1
|
|
|
|
if run_id:
|
|
cid = script_audit.record_change(
|
|
run_id, BRAND_LOCATION_ID, "opportunity", new_opp_id,
|
|
"", "created", None, payload,
|
|
)
|
|
if cid:
|
|
script_audit.mark_change(cid, "applied")
|
|
else:
|
|
summary["opps_created"] += 1
|
|
item["status"] = "created"
|
|
|
|
results.append(item)
|
|
|
|
except Exception as e:
|
|
summary["errors"] += 1
|
|
item["status"] = "error"
|
|
item["error"] = str(e)
|
|
log(f" ERROR: {e}")
|
|
results.append(item)
|
|
|
|
log(f"\n=== RESUMEN ===")
|
|
log(f" Candidatas : {summary['candidates']}")
|
|
log(f" Contactos {'a crear' if dry_run else 'creados'} : {summary['contacts_created']}")
|
|
log(f" Opps {'a crear' if dry_run else 'creadas'} : {summary['opps_created']}")
|
|
log(f" Opps {'a actualizar' if dry_run else 'actualizadas'}: {summary['opps_updated']}")
|
|
if summary.get("phone_collisions_unresolved"):
|
|
log(f" Colisiones de telefono sin match (revision manual): {summary['phone_collisions_unresolved']}")
|
|
log(f" Errores : {summary['errors']}")
|
|
|
|
return {
|
|
"dry_run": dry_run,
|
|
"summary": summary,
|
|
"items": results,
|
|
}
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def _preview_payload(payload):
|
|
"""Resumen del payload para no inundar el output (los CFs pueden ser muchos)."""
|
|
cf_count = len(payload.get("customFields", []))
|
|
p = {k: v for k, v in payload.items() if k != "customFields"}
|
|
if cf_count:
|
|
p["customFields_count"] = cf_count
|
|
return p
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def main():
    """CLI entry point: parse flags, confirm before writing, then run the sync.

    Without ``--apply`` the run is a dry-run. With ``--apply`` and without
    ``--yes`` an interactive confirmation is required; any answer other than
    y / yes / s / si / sí cancels the run and exits with status 0.

    Exit codes set here: 2 when ``run_sync`` raises ``FileNotFoundError``,
    3 when it raises ``RuntimeError``. With ``--json`` the result is printed
    as JSON once the run finishes.
    """
    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter)
    parser.add_argument("--apply", action="store_true", help="Ejecuta las escrituras en GHL. Por default es dry-run.")
    parser.add_argument("--yes", action="store_true", help="Skip confirmacion interactiva (uso headless desde dashboard).")
    parser.add_argument("--only-opp", action="append", default=[], help="Procesa solo el opp_id dado (puede repetirse).")
    parser.add_argument("--run-id", type=str, default=None, help="Id de script_audit para registrar cambios. Solo aplica con --apply.")
    parser.add_argument("--json", action="store_true", help="Imprime el resultado como JSON al final.")
    args = parser.parse_args()

    dry_run = not args.apply
    if not dry_run and not args.yes:
        # Interactive confirmation (CLI only, not headless).
        confirm = input("Esto escribira en GHL. Continuar? (y/N): ").strip().lower()
        # "sí" was previously stored as mojibake, so the accented answer
        # was rejected; the literal is now the real character.
        if confirm not in ("y", "yes", "s", "si", "sí"):
            print("Cancelado.")
            sys.exit(0)

    try:
        result = run_sync(
            opp_ids=args.only_opp or None,
            dry_run=dry_run,
            log=safe_print,
            run_id=args.run_id,
        )
    except FileNotFoundError as e:
        safe_print(f"ERROR: {e}")
        sys.exit(2)
    except RuntimeError as e:
        safe_print(f"ERROR: {e}")
        sys.exit(3)

    if args.json:
        # default=str stringifies any value json cannot serialize natively.
        safe_print(json.dumps(result, ensure_ascii=False, indent=2, default=str))


if __name__ == "__main__":
    main()
|