#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Reusable client for the Baserow REST API (self-hosted instance).

Follows the same pattern as scripts/n8n_workflow_lib.py: authentication,
reads, mutations with dry-run, and a backup taken beforehand.

Credentials live in the `#Baserow` block of `n8n/n8n credencials.txt`
(host/mail/password) and are NEVER printed.

API (Baserow):
  POST /api/user/token-auth/ {username,password} -> {access_token (JWT), ...}
  Header: Authorization: JWT <access_token>
  GET  /api/applications/
  GET  /api/database/fields/table/{table}/
  GET  /api/database/rows/table/{table}/?user_field_names=true&size=200&page=N
  PATCH/POST /api/database/rows/table/{table}/[{row_id}/]?user_field_names=true

Example:
    from scripts.baserow_client import BaserowClient
    c = BaserowClient.from_credentials()
    rows = c.list_rows(750)
    c.backup_table(750, label="verificador")
    c.update_row(750, row_id, {"SC BUCEFALO": "85974 - MP - Eugenia"}, dry_run=True)
"""
import datetime
import json
import os
import re
import sys
import warnings

# Registered before `requests` is imported so the filter is already active
# while that import runs.
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")
import requests  # noqa: E402

# Two directory levels above this file; put on sys.path so the project-level
# `paths` module imported right below can be resolved.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)
from paths import MIGRATIONS_DIR  # noqa: E402

CRED_PATH = os.path.join(ROOT_DIR, "n8n", "n8n credencials.txt")

# A line that opens another `#Name` block in the credentials file.
# NOTE(review): assumes block headers are written `#Name` (no space after the
# hash), like `#Baserow` itself — confirm against the real file layout.
_NEXT_BLOCK_RE = re.compile(r"^[ \t]*#\S", re.MULTILINE)


class BaserowError(RuntimeError):
    """Raised for credential-parsing, authentication and mutation failures."""


def load_baserow_credentials(path=None):
    """Read the `#Baserow` block of the credentials file.

    Args:
        path: Credentials file to read; defaults to ``CRED_PATH``.

    Returns:
        A ``(host, mail, password)`` tuple; ``host`` has trailing slashes
        stripped.

    Raises:
        BaserowError: If the `#Baserow` marker is missing, or if any of
            `url/host`, `mail`, `password` is absent from that block.
        OSError: If the file cannot be opened.
    """
    path = path or CRED_PATH
    with open(path, "r", encoding="utf-8") as fh:
        text = fh.read()
    if "#Baserow" not in text:
        raise BaserowError(f"No se encontró el bloque #Baserow en {path}")
    blk = text.split("#Baserow", 1)[1]

    # Stop at the next `#Header` line. Without this, a field missing from the
    # Baserow block would be silently filled in from a later service's block
    # and that foreign mail/password would be posted to the Baserow host.
    # The remainder of the `#Baserow` header line itself is never treated as
    # a new header.
    header_end = blk.find("\n")
    if header_end != -1:
        next_header = _NEXT_BLOCK_RE.search(blk, header_end + 1)
        if next_header:
            blk = blk[: next_header.start()]

    host = re.search(r"url/host:\s*(\S+)", blk)
    mail = re.search(r"mail:\s*(\S+)", blk)
    pw = re.search(r"password:\s*(\S+)", blk)
    if not (host and mail and pw):
        raise BaserowError("Credenciales Baserow incompletas (url/host, mail, password).")
    return host.group(1).rstrip("/"), mail.group(1), pw.group(1)


class BaserowClient:
    """Thin wrapper over the Baserow REST endpoints used by this project.

    Every mutating method takes a keyword-only ``dry_run`` flag that defaults
    to ``True``; in that mode no HTTP request is made and a dict describing
    the intended action (plus ``"dry_run": True``) is returned instead.
    """

    def __init__(self, host, token, timeout=45):
        """Store connection settings and build the auth headers.

        Args:
            host: Base URL of the Baserow instance (trailing slashes removed).
            token: JWT sent as ``Authorization: JWT <token>``.
            timeout: Timeout passed to every ``requests`` call.
        """
        self.host = host.rstrip("/")
        self._token = token
        self.timeout = timeout
        self.headers = {"Authorization": f"JWT {token}", "Content-Type": "application/json"}

    @classmethod
    def from_credentials(cls, path=None, timeout=45):
        """Authenticate with the credentials file and return a ready client.

        Args:
            path: Credentials file; forwarded to ``load_baserow_credentials``.
            timeout: Timeout for the auth call and for the resulting client.

        Raises:
            BaserowError: If credentials are missing/incomplete, the auth
                endpoint answers with a status other than 200, or the response
                carries neither ``access_token`` nor ``token``.
        """
        host, mail, pw = load_baserow_credentials(path)
        resp = requests.post(
            f"{host}/api/user/token-auth/",
            json={"username": mail, "password": pw},
            timeout=timeout,
        )
        if resp.status_code != 200:
            # The response body is deliberately left out of the message.
            raise BaserowError(f"token-auth falló: HTTP {resp.status_code}")
        data = resp.json()
        token = data.get("access_token") or data.get("token")
        if not token:
            raise BaserowError("token-auth no devolvió access_token/token.")
        return cls(host, token, timeout=timeout)

    @staticmethod
    def _ensure_status(resp, ok_codes, what):
        """Raise BaserowError unless ``resp.status_code`` is in ``ok_codes``.

        ``what`` is the operation label that prefixes the error message; at
        most the first 300 characters of the response body are appended.
        """
        if resp.status_code not in ok_codes:
            raise BaserowError(f"{what}: HTTP {resp.status_code} {resp.text[:300]}")

    # ---- Reads ----
    def list_applications(self):
        """GET /api/applications/ and return the decoded JSON.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        r = requests.get(f"{self.host}/api/applications/", headers=self.headers, timeout=self.timeout)
        r.raise_for_status()
        return r.json()

    def list_fields(self, table):
        """GET the field definitions of ``table`` and return the decoded JSON.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        r = requests.get(
            f"{self.host}/api/database/fields/table/{table}/",
            headers=self.headers,
            timeout=self.timeout,
        )
        r.raise_for_status()
        return r.json()

    def list_rows(self, table, user_field_names=True):
        """Return ALL rows of ``table``, following pagination.

        Pages of 200 rows are requested starting at page 1 until the response
        has no ``next`` link or an empty ``results`` list.

        Args:
            table: Table id.
            user_field_names: When true, ``user_field_names=true`` is sent
                with each request.

        Raises:
            requests.HTTPError: On a 4xx/5xx response for any page.
        """
        rows = []
        page = 1
        while True:
            params = {"size": 200, "page": page}
            if user_field_names:
                params["user_field_names"] = "true"
            r = requests.get(
                f"{self.host}/api/database/rows/table/{table}/",
                headers=self.headers,
                params=params,
                timeout=self.timeout,
            )
            r.raise_for_status()
            data = r.json()
            batch = data.get("results", [])
            rows.extend(batch)
            if not data.get("next") or not batch:
                break
            page += 1
        return rows

    def backup_table(self, table, label="table"):
        """Dump every row of ``table`` to a timestamped JSON file.

        The file is written under ``MIGRATIONS_DIR`` (created if needed) as
        ``baserow_<label>_<table>_<YYYYmmdd_HHMMSS>.json``; characters of
        ``label`` outside ``[A-Za-z0-9_-]`` are replaced with ``_``.

        Returns:
            The path of the file written.
        """
        rows = self.list_rows(table)
        os.makedirs(MIGRATIONS_DIR, exist_ok=True)
        ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
        safe = re.sub(r"[^A-Za-z0-9_-]", "_", label)
        path = os.path.join(MIGRATIONS_DIR, f"baserow_{safe}_{table}_{ts}.json")
        with open(path, "w", encoding="utf-8") as fh:
            json.dump(rows, fh, ensure_ascii=False, indent=2)
        return path

    # ---- Mutations (dry-run by default) ----
    def update_row(self, table, row_id, fields, *, dry_run=True):
        """PATCH one row (``fields`` keyed by column name).

        With ``dry_run`` no API call is made and the planned action is
        returned.

        Raises:
            BaserowError: If the response status is not 200/201.
        """
        action = {"op": "update", "table": table, "row_id": row_id, "fields": fields}
        if dry_run:
            return {"dry_run": True, **action}
        r = requests.patch(
            f"{self.host}/api/database/rows/table/{table}/{row_id}/",
            headers=self.headers,
            params={"user_field_names": "true"},
            json=fields,
            timeout=self.timeout,
        )
        self._ensure_status(r, (200, 201), f"PATCH fila {row_id} tabla {table}")
        return r.json()

    def create_row(self, table, fields, *, dry_run=True):
        """POST a new row. With ``dry_run`` no API call is made.

        Raises:
            BaserowError: If the response status is not 200/201.
        """
        action = {"op": "create", "table": table, "fields": fields}
        if dry_run:
            return {"dry_run": True, **action}
        r = requests.post(
            f"{self.host}/api/database/rows/table/{table}/",
            headers=self.headers,
            params={"user_field_names": "true"},
            json=fields,
            timeout=self.timeout,
        )
        self._ensure_status(r, (200, 201), f"POST fila tabla {table}")
        return r.json()

    def delete_row(self, table, row_id, *, dry_run=True):
        """DELETE one row. With ``dry_run`` no API call is made.

        Reversible via ``backup_table()``.

        Returns:
            ``{"deleted": row_id}`` after a real deletion.

        Raises:
            BaserowError: If the response status is not 200/204.
        """
        action = {"op": "delete", "table": table, "row_id": row_id}
        if dry_run:
            return {"dry_run": True, **action}
        r = requests.delete(
            f"{self.host}/api/database/rows/table/{table}/{row_id}/",
            headers=self.headers,
            timeout=self.timeout,
        )
        self._ensure_status(r, (200, 204), f"DELETE fila {row_id} tabla {table}")
        return {"deleted": row_id}

    # ---- DDL: tables and fields (schema) ----
    def list_tables(self, database_id):
        """GET the tables of a database and return the decoded JSON.

        Raises:
            requests.HTTPError: On a 4xx/5xx response.
        """
        r = requests.get(
            f"{self.host}/api/database/tables/database/{database_id}/",
            headers=self.headers,
            timeout=self.timeout,
        )
        r.raise_for_status()
        return r.json()

    def create_table(self, database_id, name, *, dry_run=True):
        """Create a table in a database. With ``dry_run`` no API call is made.

        Baserow creates it with default fields (primary Name + Notes + Active).

        Raises:
            BaserowError: If the response status is not 200/201.
        """
        action = {"op": "create_table", "database_id": database_id, "name": name}
        if dry_run:
            return {"dry_run": True, **action}
        r = requests.post(
            f"{self.host}/api/database/tables/database/{database_id}/",
            headers=self.headers,
            json={"name": name},
            timeout=self.timeout,
        )
        self._ensure_status(r, (200, 201), f"POST tabla en db {database_id}")
        return r.json()

    def create_field(self, table_id, name, field_type="text", *, dry_run=True, **extra):
        """Create a field in a table.

        Args:
            table_id: Target table id.
            name: Field name.
            field_type: e.g. 'text', 'long_text', 'boolean', 'date'...
            dry_run: When true, no API call is made.
            **extra: Additional keys merged into the request body.

        Raises:
            BaserowError: If the response status is not 200/201.
        """
        body = {"name": name, "type": field_type, **extra}
        action = {"op": "create_field", "table_id": table_id, **body}
        if dry_run:
            return {"dry_run": True, **action}
        r = requests.post(
            f"{self.host}/api/database/fields/table/{table_id}/",
            headers=self.headers,
            json=body,
            timeout=self.timeout,
        )
        self._ensure_status(r, (200, 201), f"POST campo {name!r} tabla {table_id}")
        return r.json()

    def update_field(self, field_id, fields, *, dry_run=True):
        """PATCH a field (rename, change type).

        ``fields`` is the request body, e.g. ``{'name': .., 'type': 'text'}``.
        With ``dry_run`` no API call is made.

        Raises:
            BaserowError: If the response status is not 200/201.
        """
        action = {"op": "update_field", "field_id": field_id, "fields": fields}
        if dry_run:
            return {"dry_run": True, **action}
        r = requests.patch(
            f"{self.host}/api/database/fields/{field_id}/",
            headers=self.headers,
            json=fields,
            timeout=self.timeout,
        )
        self._ensure_status(r, (200, 201), f"PATCH campo {field_id}")
        return r.json()

    def delete_field(self, field_id, *, dry_run=True):
        """DELETE a field. With ``dry_run`` no API call is made.

        Returns:
            ``{"deleted_field": field_id}`` after a real deletion.

        Raises:
            BaserowError: If the response status is not 200/204.
        """
        action = {"op": "delete_field", "field_id": field_id}
        if dry_run:
            return {"dry_run": True, **action}
        r = requests.delete(
            f"{self.host}/api/database/fields/{field_id}/",
            headers=self.headers,
            timeout=self.timeout,
        )
        self._ensure_status(r, (200, 204), f"DELETE campo {field_id}")
        return {"deleted_field": field_id}


if __name__ == "__main__":
    # Read-only smoke test: authenticate and list DBs/tables (no secrets printed).
    c = BaserowClient.from_credentials()
    print("Auth OK. Aplicaciones:")
    for app in c.list_applications():
        print(f"  DB id={app.get('id')} name={app.get('name')!r} tablas={len(app.get('tables') or [])}")