Files
2026-05-30 14:31:19 -06:00

445 lines
18 KiB
Python

import requests
import time
import threading
from urllib.parse import urlparse, parse_qs
import error_logging
BASE_URL = "https://services.leadconnectorhq.com"
API_VERSION = "2021-07-28"
class GHLClient:
def __init__(self):
self._last_request_times = {} # token -> float
self._lock = threading.Lock()
self._local = threading.local()
def _wait_for_rate_limit(self, token):
"""
Implementa un rate limiting estricto de 110ms entre peticiones por token (Location/Recurso),
ajustado al límite de ráfaga de GHL V2 (100 peticiones por cada 10 segundos por ubicación).
"""
with self._lock:
now = time.time()
last_time = self._last_request_times.get(token, 0)
elapsed = now - last_time
if elapsed < 0.110:
sleep_time = 0.110 - elapsed
time.sleep(sleep_time)
self._last_request_times[token] = time.time()
def _request(self, method, endpoint, token, params=None, json=None, attempts=3, version=None):
url = f"{BASE_URL}{endpoint}" if not endpoint.startswith("http") else endpoint
headers = {
"Authorization": f"Bearer {token}",
"Version": version or API_VERSION,
"Accept": "application/json",
"Content-Type": "application/json"
}
# Inicializar sesión TCP persistente por hilo para reutilización de sockets/SSL keep-alive
if not hasattr(self._local, "session"):
self._local.session = requests.Session()
self._wait_for_rate_limit(token)
last_response = None
for attempt in range(attempts):
try:
response = self._local.session.request(method, url, headers=headers, params=params, json=json, timeout=15)
last_response = response
# 429 Rate Limit - Backoff Lineal de 5 segundos
if response.status_code == 429:
sleep_sec = 5 + attempt
time.sleep(sleep_sec)
continue
# 5xx Server Error - Backoff Exponencial
if response.status_code >= 500:
sleep_sec = 2 * (2 ** attempt)
time.sleep(sleep_sec)
continue
# 401 Unauthorized - Error Inmediato
if response.status_code == 401:
error = Exception(f"GHL HTTP 401: Token inválido o no autorizado")
self._log_request_error(error, method, url, params, json, response, attempt + 1)
raise error
# Devolver JSON
if response.status_code in (200, 201):
return response.json()
elif response.status_code == 204:
return {"success": True}
else:
try:
response.raise_for_status()
except requests.exceptions.RequestException as e:
self._log_request_error(e, method, url, params, json, response, attempt + 1)
raise
except requests.exceptions.RequestException as e:
response = getattr(e, "response", None)
if response is not None and 400 <= response.status_code < 500 and response.status_code != 429:
raise e
if attempt == attempts - 1:
self._log_request_error(e, method, url, params, json, last_response, attempt + 1)
raise e
time.sleep(1 * (attempt + 1))
error = Exception(f"Fallaron todos los reintentos para la petición {method} {url}")
self._log_request_error(error, method, url, params, json, last_response, attempts)
raise error
def _log_request_error(self, exc, method, url, params, json_body, response, attempt):
context = {
"ghl_method": method,
"ghl_url": url,
"params": params,
"json_body": json_body,
"attempt": attempt,
}
if response is not None:
context.update({
"ghl_status_code": response.status_code,
"ghl_response_body": response.text[:4000],
})
error_logging.log_error("ghl_request_failed", exc, context)
# --- SERVICIOS DE CONTACTOS ---
def get_contacts(self, token, location_id, limit=100, start_after=None, start_after_id=None):
"""
Obtiene una página de contactos usando la API de GHL.
"""
params = {"locationId": location_id, "limit": min(limit, 100)}
if start_after:
params["startAfter"] = start_after
if start_after_id:
params["startAfterId"] = start_after_id
return self._request("GET", "/contacts/", token, params=params)
def get_all_contacts(self, token, location_id):
"""
Pagina todos los contactos de una location hasta agotar la API.
Sin tope artificial: el único corte es la paginación nativa de GHL
(batch vacío, batch menor al limit, total alcanzado o cursor agotado).
"""
contacts = []
start_after = None
start_after_id = None
limit = 100
while True:
data = self.get_contacts(token, location_id, limit=limit, start_after=start_after, start_after_id=start_after_id)
batch = data.get("contacts", [])
if not batch:
break
contacts.extend(batch)
meta = data.get("meta", {})
total_reported = meta.get("total", 0) or 0
if len(batch) < limit or (total_reported > 0 and len(contacts) >= total_reported):
break
next_page = meta.get("nextPage")
if next_page == "" or next_page is None:
break
next_page_url = meta.get("nextPageUrl")
if not next_page_url:
break
parsed = urlparse(next_page_url)
queries = parse_qs(parsed.query)
cursors = queries.get("startAfter")
cursor_ids = queries.get("startAfterId")
start_after = cursors[0] if cursors else None
start_after_id = cursor_ids[0] if cursor_ids else None
if not start_after and not start_after_id:
break
return contacts
def create_contact(self, token, contact_data):
return self._request("POST", "/contacts/", token, json=contact_data)
def update_contact(self, token, contact_id, contact_data):
return self._request("PUT", f"/contacts/{contact_id}", token, json=contact_data)
def delete_contact(self, token, contact_id, location_id):
return self._request("DELETE", f"/contacts/{contact_id}", token, params={"locationId": location_id})
def add_contact_tag(self, token, contact_id, tags):
# tags es una lista de strings
return self._request("POST", f"/contacts/{contact_id}/tags", token, json={"tags": tags})
def delete_contact_tag(self, token, contact_id, tag_name, location_id):
return self._request("DELETE", f"/contacts/{contact_id}/tags/{tag_name}", token, params={"locationId": location_id})
# --- SERVICIOS DE OPORTUNIDADES ---
def search_opportunities(self, token, location_id, limit=100, page=1, pipeline_id=None):
"""
Obtiene oportunidades usando el endpoint POST /opportunities/search (camelCase!).
Acepta pipeline_id opcional para filtrar por un pipeline puntual.
El filtro por pipeline se envia como query param snake_case (regla de GHL).
"""
body = {
"locationId": location_id,
"limit": min(limit, 100),
"page": page
}
params = {}
if pipeline_id:
params["pipeline_id"] = pipeline_id
return self._request("POST", "/opportunities/search", token, params=params or None, json=body)
def get_all_opportunities(self, token, location_id):
"""
Pagina todas las oportunidades de una location hasta agotar la API.
Sin tope artificial: el único corte es la paginación nativa de GHL
(batch vacío, batch menor al limit, o total alcanzado).
"""
opportunities = []
page = 1
limit = 100
while True:
try:
data = self.search_opportunities(token, location_id, limit=limit, page=page)
except Exception as e:
error_logging.log_error("ghl_get_all_opportunities_page_failed", e, {
"location_id": location_id,
"page": page,
"accumulated_opps": len(opportunities)
})
break
batch = data.get("opportunities", [])
if not batch:
break
opportunities.extend(batch)
total_reported = data.get("total", 0) or 0
if len(batch) < limit or (total_reported > 0 and len(opportunities) >= total_reported):
break
page += 1
return opportunities
def create_opportunity(self, token, opp_data):
return self._request("POST", "/opportunities/", token, json=opp_data)
def get_opportunity(self, token, opp_id):
"""GET /opportunities/{id} → trae la opp completa con su contactId,
pipelineId, pipelineStageId, status, customFields, etc. Usar tras una
mutación para refrescar el cache local con la verdad de Bucéfalo."""
return self._request("GET", f"/opportunities/{opp_id}", token)
def update_opportunity(self, token, opp_id, opp_data):
return self._request("PUT", f"/opportunities/{opp_id}", token, json=opp_data)
def update_opportunity_status(self, token, opp_id, status):
return self._request("PUT", f"/opportunities/{opp_id}/status", token, json={"status": status})
def delete_opportunity(self, token, opp_id, location_id):
return self._request("DELETE", f"/opportunities/{opp_id}", token, params={"locationId": location_id})
# --- SERVICIOS DE PIPELINES ---
def get_pipelines(self, token, location_id):
"""
Obtiene pipelines de oportunidades usando GET /opportunities/pipelines.
"""
try:
data = self._request("GET", "/opportunities/pipelines", token, params={"locationId": location_id})
return data.get("pipelines", [])
except Exception as e:
error_id = error_logging.log_error("ghl_get_opportunity_pipelines_failed", e, {"location_id": location_id})
print(f"Error fetching opportunity pipelines for {location_id}: {e} | error_id={error_id}")
try:
data = self._request("GET", "/pipelines/", token, params={"locationId": location_id})
return data.get("pipelines", [])
except Exception as e:
# Capturar error y devolver vacío para activar el fallback sintético existente.
error_id = error_logging.log_error("ghl_get_pipelines_failed", e, {"location_id": location_id})
print(f"Error fetching legacy pipelines for {location_id}: {e} | error_id={error_id}")
return []
# --- SERVICIOS DE WORKFLOWS ---
def get_workflows(self, token, location_id):
"""
Obtiene workflows usando GET /workflows/ con locationId en query params.
"""
try:
data = self._request("GET", "/workflows/", token, params={"locationId": location_id})
return data.get("workflows", [])
except Exception as e:
error_id = error_logging.log_error("ghl_get_workflows_failed", e, {"location_id": location_id})
print(f"Error fetching workflows for {location_id}: {e} | error_id={error_id}")
return []
# --- SERVICIOS DE CUSTOM FIELDS ---
def get_custom_fields(self, token, location_id):
return self._request("GET", f"/locations/{location_id}/customFields", token)
# --- SERVICIOS DE LOCATIONS ---
def get_location(self, token, location_id):
"""
Obtiene metadata de una location usando GET /locations/{locationId}.
"""
return self._request("GET", f"/locations/{location_id}", token)
# --- SERVICIOS DE ESQUEMAS DINÁMICOS (OBJECTS) ---
def get_objects_catalog(self, token, location_id):
"""
Obtiene el catálogo de objetos de GHL para inicializar y autorizar la consulta de esquemas.
"""
try:
return self._request("GET", "/objects/", token, params={"locationId": location_id}, version="2021-04-15")
except Exception:
return {}
def get_object_schema(self, token, location_id, object_key):
"""
Obtiene el esquema completo (campos estándar y personalizados) de un objeto (contact o opportunity).
"""
try:
fields = self.get_object_schema_fields(token, location_id, object_key)
return {field["name"]: field["id"] for field in fields if field.get("name") and field.get("id")}
except Exception as e:
print(f"Error al obtener esquema del objeto {object_key} para {location_id}: {e}")
return {}
def get_object_schema_fields(self, token, location_id, object_key):
"""
Obtiene la lista cruda de campos de un objeto para cachearla en SQLite.
"""
try:
self.get_objects_catalog(token, location_id)
data = self._request("GET", f"/objects/{object_key}", token, params={"locationId": location_id}, version="2021-04-15")
return data.get("fields", [])
except Exception as e:
print(f"Error al obtener campos del objeto {object_key} para {location_id}: {e}")
return []
# --- SERVICIOS DE FORMS (FORMULARIOS Y SUBMISSIONS) ---
def get_forms(self, token, location_id):
"""
Lista los formularios de una location.
GET /forms/?locationId=...
Devuelve: lista de {id, locationId, name}
"""
try:
data = self._request("GET", "/forms/", token, params={"locationId": location_id})
return data.get("forms", []) if isinstance(data, dict) else []
except Exception as e:
print(f"Error al obtener formularios de {location_id}: {e}")
return []
def get_form_submissions_page(self, token, location_id, form_id=None, page=1, limit=100,
start_at=None, end_at=None):
"""
Una página de submissions. GHL acepta page y limit (max 100), y opcional
startAt / endAt (formato 'YYYY-MM-DD' o ISO 8601; unix timestamp da 422).
NOTA: sin startAt/endAt la API parece capar el resultado a ~1 mes hacia
atras. Para histórico completo hay que paginar por ventanas de fecha.
Devuelve el dict crudo {submissions, meta, traceId}.
"""
params = {"locationId": location_id, "page": page, "limit": limit}
if form_id:
params["formId"] = form_id
if start_at:
params["startAt"] = start_at
if end_at:
params["endAt"] = end_at
return self._request("GET", "/forms/submissions", token, params=params)
def get_all_form_submissions(self, token, location_id, form_id=None,
page_size=100, max_submissions=None,
start_at=None, end_at=None,
progress_callback=None):
"""
Itera todas las páginas de submissions hasta agotar `meta.nextPage` o
alcanzar `max_submissions`. Deduplica por id. Protección anti-loop:
corta si una página se repite o si una página viene vacía.
start_at / end_at: opcionales para filtrar por createdAt (YYYY-MM-DD o ISO).
progress_callback(page_num, page_count, total_so_far): opcional.
"""
results = []
seen_ids = set()
page = 1
last_page_hash = None
empty_streak = 0
while True:
data = self.get_form_submissions_page(token, location_id, form_id=form_id,
page=page, limit=page_size,
start_at=start_at, end_at=end_at)
if not isinstance(data, dict):
break
subs = data.get("submissions") or []
if not subs:
empty_streak += 1
if empty_streak >= 2:
break
else:
empty_streak = 0
# Anti-loop: detecta misma página dos veces seguidas
page_signature = tuple(s.get("id") for s in subs)
if page_signature and page_signature == last_page_hash:
break
last_page_hash = page_signature
new_in_page = 0
for s in subs:
sid = s.get("id")
if not sid or sid in seen_ids:
continue
seen_ids.add(sid)
results.append(s)
new_in_page += 1
if max_submissions and len(results) >= max_submissions:
break
if progress_callback:
try:
progress_callback(page, len(subs), len(results))
except Exception:
pass
if max_submissions and len(results) >= max_submissions:
break
meta = data.get("meta") or {}
next_page = meta.get("nextPage")
if not next_page:
break
# Anti-loop dura: next_page no debe regresar al actual o anterior
try:
next_page_int = int(next_page)
except (TypeError, ValueError):
break
if next_page_int <= page:
break
page = next_page_int
return results