import requests import time import threading from urllib.parse import urlparse, parse_qs import error_logging BASE_URL = "https://services.leadconnectorhq.com" API_VERSION = "2021-07-28" class GHLClient: def __init__(self): self._last_request_times = {} # token -> float self._lock = threading.Lock() self._local = threading.local() def _wait_for_rate_limit(self, token): """ Implementa un rate limiting estricto de 110ms entre peticiones por token (Location/Recurso), ajustado al límite de ráfaga de GHL V2 (100 peticiones por cada 10 segundos por ubicación). """ with self._lock: now = time.time() last_time = self._last_request_times.get(token, 0) elapsed = now - last_time if elapsed < 0.110: sleep_time = 0.110 - elapsed time.sleep(sleep_time) self._last_request_times[token] = time.time() def _request(self, method, endpoint, token, params=None, json=None, attempts=3, version=None): url = f"{BASE_URL}{endpoint}" if not endpoint.startswith("http") else endpoint headers = { "Authorization": f"Bearer {token}", "Version": version or API_VERSION, "Accept": "application/json", "Content-Type": "application/json" } # Inicializar sesión TCP persistente por hilo para reutilización de sockets/SSL keep-alive if not hasattr(self._local, "session"): self._local.session = requests.Session() self._wait_for_rate_limit(token) last_response = None for attempt in range(attempts): try: response = self._local.session.request(method, url, headers=headers, params=params, json=json, timeout=15) last_response = response # 429 Rate Limit - Backoff Lineal de 5 segundos if response.status_code == 429: sleep_sec = 5 + attempt time.sleep(sleep_sec) continue # 5xx Server Error - Backoff Exponencial if response.status_code >= 500: sleep_sec = 2 * (2 ** attempt) time.sleep(sleep_sec) continue # 401 Unauthorized - Error Inmediato if response.status_code == 401: error = Exception(f"GHL HTTP 401: Token inválido o no autorizado") self._log_request_error(error, method, url, params, json, response, attempt + 1) raise error # Devolver JSON if response.status_code in (200, 201): return response.json() elif response.status_code == 204: return {"success": True} else: try: response.raise_for_status() except requests.exceptions.RequestException as e: self._log_request_error(e, method, url, params, json, response, attempt + 1) raise except requests.exceptions.RequestException as e: response = getattr(e, "response", None) if response is not None and 400 <= response.status_code < 500 and response.status_code != 429: raise e if attempt == attempts - 1: self._log_request_error(e, method, url, params, json, last_response, attempt + 1) raise e time.sleep(1 * (attempt + 1)) error = Exception(f"Fallaron todos los reintentos para la petición {method} {url}") self._log_request_error(error, method, url, params, json, last_response, attempts) raise error def _log_request_error(self, exc, method, url, params, json_body, response, attempt): context = { "ghl_method": method, "ghl_url": url, "params": params, "json_body": json_body, "attempt": attempt, } if response is not None: context.update({ "ghl_status_code": response.status_code, "ghl_response_body": response.text[:4000], }) error_logging.log_error("ghl_request_failed", exc, context) # --- SERVICIOS DE CONTACTOS --- def get_contacts(self, token, location_id, limit=100, start_after=None, start_after_id=None): """ Obtiene una página de contactos usando la API de GHL. """ params = {"locationId": location_id, "limit": min(limit, 100)} if start_after: params["startAfter"] = start_after if start_after_id: params["startAfterId"] = start_after_id return self._request("GET", "/contacts/", token, params=params) def get_all_contacts(self, token, location_id): """ Pagina todos los contactos de una location hasta agotar la API. Sin tope artificial: el único corte es la paginación nativa de GHL (batch vacío, batch menor al limit, total alcanzado o cursor agotado). """ contacts = [] start_after = None start_after_id = None limit = 100 while True: data = self.get_contacts(token, location_id, limit=limit, start_after=start_after, start_after_id=start_after_id) batch = data.get("contacts", []) if not batch: break contacts.extend(batch) meta = data.get("meta", {}) total_reported = meta.get("total", 0) or 0 if len(batch) < limit or (total_reported > 0 and len(contacts) >= total_reported): break next_page = meta.get("nextPage") if next_page == "" or next_page is None: break next_page_url = meta.get("nextPageUrl") if not next_page_url: break parsed = urlparse(next_page_url) queries = parse_qs(parsed.query) cursors = queries.get("startAfter") cursor_ids = queries.get("startAfterId") start_after = cursors[0] if cursors else None start_after_id = cursor_ids[0] if cursor_ids else None if not start_after and not start_after_id: break return contacts def create_contact(self, token, contact_data): return self._request("POST", "/contacts/", token, json=contact_data) def update_contact(self, token, contact_id, contact_data): return self._request("PUT", f"/contacts/{contact_id}", token, json=contact_data) def delete_contact(self, token, contact_id, location_id): return self._request("DELETE", f"/contacts/{contact_id}", token, params={"locationId": location_id}) def add_contact_tag(self, token, contact_id, tags): # tags es una lista de strings return self._request("POST", f"/contacts/{contact_id}/tags", token, json={"tags": tags}) def delete_contact_tag(self, token, contact_id, tag_name, location_id): return self._request("DELETE", f"/contacts/{contact_id}/tags/{tag_name}", token, params={"locationId": location_id}) # --- SERVICIOS DE OPORTUNIDADES --- def search_opportunities(self, token, location_id, limit=100, page=1, pipeline_id=None): """ Obtiene oportunidades usando el endpoint POST /opportunities/search (camelCase!). Acepta pipeline_id opcional para filtrar por un pipeline puntual. El filtro por pipeline se envia como query param snake_case (regla de GHL). """ body = { "locationId": location_id, "limit": min(limit, 100), "page": page } params = {} if pipeline_id: params["pipeline_id"] = pipeline_id return self._request("POST", "/opportunities/search", token, params=params or None, json=body) def get_all_opportunities(self, token, location_id): """ Pagina todas las oportunidades de una location hasta agotar la API. Sin tope artificial: el único corte es la paginación nativa de GHL (batch vacío, batch menor al limit, o total alcanzado). """ opportunities = [] page = 1 limit = 100 while True: try: data = self.search_opportunities(token, location_id, limit=limit, page=page) except Exception as e: error_logging.log_error("ghl_get_all_opportunities_page_failed", e, { "location_id": location_id, "page": page, "accumulated_opps": len(opportunities) }) break batch = data.get("opportunities", []) if not batch: break opportunities.extend(batch) total_reported = data.get("total", 0) or 0 if len(batch) < limit or (total_reported > 0 and len(opportunities) >= total_reported): break page += 1 return opportunities def create_opportunity(self, token, opp_data): return self._request("POST", "/opportunities/", token, json=opp_data) def get_opportunity(self, token, opp_id): """GET /opportunities/{id} → trae la opp completa con su contactId, pipelineId, pipelineStageId, status, customFields, etc. Usar tras una mutación para refrescar el cache local con la verdad de Bucéfalo.""" return self._request("GET", f"/opportunities/{opp_id}", token) def update_opportunity(self, token, opp_id, opp_data): return self._request("PUT", f"/opportunities/{opp_id}", token, json=opp_data) def update_opportunity_status(self, token, opp_id, status): return self._request("PUT", f"/opportunities/{opp_id}/status", token, json={"status": status}) def delete_opportunity(self, token, opp_id, location_id): return self._request("DELETE", f"/opportunities/{opp_id}", token, params={"locationId": location_id}) # --- SERVICIOS DE PIPELINES --- def get_pipelines(self, token, location_id): """ Obtiene pipelines de oportunidades usando GET /opportunities/pipelines. """ try: data = self._request("GET", "/opportunities/pipelines", token, params={"locationId": location_id}) return data.get("pipelines", []) except Exception as e: error_id = error_logging.log_error("ghl_get_opportunity_pipelines_failed", e, {"location_id": location_id}) print(f"Error fetching opportunity pipelines for {location_id}: {e} | error_id={error_id}") try: data = self._request("GET", "/pipelines/", token, params={"locationId": location_id}) return data.get("pipelines", []) except Exception as e: # Capturar error y devolver vacío para activar el fallback sintético existente. error_id = error_logging.log_error("ghl_get_pipelines_failed", e, {"location_id": location_id}) print(f"Error fetching legacy pipelines for {location_id}: {e} | error_id={error_id}") return [] # --- SERVICIOS DE WORKFLOWS --- def get_workflows(self, token, location_id): """ Obtiene workflows usando GET /workflows/ con locationId en query params. """ try: data = self._request("GET", "/workflows/", token, params={"locationId": location_id}) return data.get("workflows", []) except Exception as e: error_id = error_logging.log_error("ghl_get_workflows_failed", e, {"location_id": location_id}) print(f"Error fetching workflows for {location_id}: {e} | error_id={error_id}") return [] # --- SERVICIOS DE CUSTOM FIELDS --- def get_custom_fields(self, token, location_id): return self._request("GET", f"/locations/{location_id}/customFields", token) # --- SERVICIOS DE LOCATIONS --- def get_location(self, token, location_id): """ Obtiene metadata de una location usando GET /locations/{locationId}. """ return self._request("GET", f"/locations/{location_id}", token) # --- SERVICIOS DE ESQUEMAS DINÁMICOS (OBJECTS) --- def get_objects_catalog(self, token, location_id): """ Obtiene el catálogo de objetos de GHL para inicializar y autorizar la consulta de esquemas. """ try: return self._request("GET", "/objects/", token, params={"locationId": location_id}, version="2021-04-15") except Exception: return {} def get_object_schema(self, token, location_id, object_key): """ Obtiene el esquema completo (campos estándar y personalizados) de un objeto (contact o opportunity). """ try: fields = self.get_object_schema_fields(token, location_id, object_key) return {field["name"]: field["id"] for field in fields if field.get("name") and field.get("id")} except Exception as e: print(f"Error al obtener esquema del objeto {object_key} para {location_id}: {e}") return {} def get_object_schema_fields(self, token, location_id, object_key): """ Obtiene la lista cruda de campos de un objeto para cachearla en SQLite. """ try: self.get_objects_catalog(token, location_id) data = self._request("GET", f"/objects/{object_key}", token, params={"locationId": location_id}, version="2021-04-15") return data.get("fields", []) except Exception as e: print(f"Error al obtener campos del objeto {object_key} para {location_id}: {e}") return [] # --- SERVICIOS DE FORMS (FORMULARIOS Y SUBMISSIONS) --- def get_forms(self, token, location_id): """ Lista los formularios de una location. GET /forms/?locationId=... Devuelve: lista de {id, locationId, name} """ try: data = self._request("GET", "/forms/", token, params={"locationId": location_id}) return data.get("forms", []) if isinstance(data, dict) else [] except Exception as e: print(f"Error al obtener formularios de {location_id}: {e}") return [] def get_form_submissions_page(self, token, location_id, form_id=None, page=1, limit=100, start_at=None, end_at=None): """ Una página de submissions. GHL acepta page y limit (max 100), y opcional startAt / endAt (formato 'YYYY-MM-DD' o ISO 8601; unix timestamp da 422). NOTA: sin startAt/endAt la API parece capar el resultado a ~1 mes hacia atras. Para histórico completo hay que paginar por ventanas de fecha. Devuelve el dict crudo {submissions, meta, traceId}. """ params = {"locationId": location_id, "page": page, "limit": limit} if form_id: params["formId"] = form_id if start_at: params["startAt"] = start_at if end_at: params["endAt"] = end_at return self._request("GET", "/forms/submissions", token, params=params) def get_all_form_submissions(self, token, location_id, form_id=None, page_size=100, max_submissions=None, start_at=None, end_at=None, progress_callback=None): """ Itera todas las páginas de submissions hasta agotar `meta.nextPage` o alcanzar `max_submissions`. Deduplica por id. Protección anti-loop: corta si una página se repite o si una página viene vacía. start_at / end_at: opcionales para filtrar por createdAt (YYYY-MM-DD o ISO). progress_callback(page_num, page_count, total_so_far): opcional. """ results = [] seen_ids = set() page = 1 last_page_hash = None empty_streak = 0 while True: data = self.get_form_submissions_page(token, location_id, form_id=form_id, page=page, limit=page_size, start_at=start_at, end_at=end_at) if not isinstance(data, dict): break subs = data.get("submissions") or [] if not subs: empty_streak += 1 if empty_streak >= 2: break else: empty_streak = 0 # Anti-loop: detecta misma página dos veces seguidas page_signature = tuple(s.get("id") for s in subs) if page_signature and page_signature == last_page_hash: break last_page_hash = page_signature new_in_page = 0 for s in subs: sid = s.get("id") if not sid or sid in seen_ids: continue seen_ids.add(sid) results.append(s) new_in_page += 1 if max_submissions and len(results) >= max_submissions: break if progress_callback: try: progress_callback(page, len(subs), len(results)) except Exception: pass if max_submissions and len(results) >= max_submissions: break meta = data.get("meta") or {} next_page = meta.get("nextPage") if not next_page: break # Anti-loop dura: next_page no debe regresar al actual o anterior try: next_page_int = int(next_page) except (TypeError, ValueError): break if next_page_int <= page: break page = next_page_int return results