Files
MP-Manager/scripts/baserow_client.py
2026-05-30 14:31:19 -06:00

232 lines
9.7 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Cliente reusable para la API REST de Baserow (instancia self-hosted).
Patrón análogo a scripts/n8n_workflow_lib.py: autenticación, lectura, mutaciones
con dry-run, y backup previo. Las credenciales viven en el bloque `#Baserow` de
`n8n/n8n credencials.txt` (host/mail/password) — NUNCA se imprimen.
API (Baserow):
POST /api/user/token-auth/ {username,password} -> {access_token (JWT), ...}
Header: Authorization: JWT <access_token>
GET /api/applications/
GET /api/database/fields/table/{table}/
GET /api/database/rows/table/{table}/?user_field_names=true&size=200&page=N
PATCH/POST /api/database/rows/table/{table}/[{row_id}/]?user_field_names=true
Ejemplo:
from scripts.baserow_client import BaserowClient
c = BaserowClient.from_credentials()
rows = c.list_rows(750)
c.backup_table(750, label="verificador")
c.update_row(750, row_id, {"SC BUCEFALO": "85974 - MP - Eugenia"}, dry_run=True)
"""
import datetime
import json
import os
import re
import sys
import warnings
warnings.filterwarnings("ignore", message=r"urllib3 .* doesn't match a supported version!")
import requests # noqa: E402
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
from paths import MIGRATIONS_DIR # noqa: E402
CRED_PATH = os.path.join(ROOT_DIR, "n8n", "n8n credencials.txt")
class BaserowError(RuntimeError):
pass
def load_baserow_credentials(path=None):
"""Lee el bloque `#Baserow` de n8n/n8n credencials.txt -> (host, mail, password)."""
path = path or CRED_PATH
with open(path, "r", encoding="utf-8") as fh:
text = fh.read()
if "#Baserow" not in text:
raise BaserowError(f"No se encontró el bloque #Baserow en {path}")
blk = text.split("#Baserow", 1)[1]
host = re.search(r"url/host:\s*(\S+)", blk)
mail = re.search(r"mail:\s*(\S+)", blk)
pw = re.search(r"password:\s*(\S+)", blk)
if not (host and mail and pw):
raise BaserowError("Credenciales Baserow incompletas (url/host, mail, password).")
return host.group(1).rstrip("/"), mail.group(1), pw.group(1)
class BaserowClient:
def __init__(self, host, token, timeout=45):
self.host = host.rstrip("/")
self._token = token
self.timeout = timeout
self.headers = {"Authorization": f"JWT {token}", "Content-Type": "application/json"}
@classmethod
def from_credentials(cls, path=None, timeout=45):
host, mail, pw = load_baserow_credentials(path)
resp = requests.post(
f"{host}/api/user/token-auth/",
json={"username": mail, "password": pw},
timeout=timeout,
)
if resp.status_code != 200:
raise BaserowError(f"token-auth falló: HTTP {resp.status_code}")
data = resp.json()
token = data.get("access_token") or data.get("token")
if not token:
raise BaserowError("token-auth no devolvió access_token/token.")
return cls(host, token, timeout=timeout)
# ---- Lectura ----
def list_applications(self):
r = requests.get(f"{self.host}/api/applications/", headers=self.headers, timeout=self.timeout)
r.raise_for_status()
return r.json()
def list_fields(self, table):
r = requests.get(f"{self.host}/api/database/fields/table/{table}/",
headers=self.headers, timeout=self.timeout)
r.raise_for_status()
return r.json()
def list_rows(self, table, user_field_names=True):
"""Devuelve TODAS las filas (paginado)."""
rows = []
page = 1
while True:
params = {"size": 200, "page": page}
if user_field_names:
params["user_field_names"] = "true"
r = requests.get(f"{self.host}/api/database/rows/table/{table}/",
headers=self.headers, params=params, timeout=self.timeout)
r.raise_for_status()
data = r.json()
batch = data.get("results", [])
rows.extend(batch)
if not data.get("next") or not batch:
break
page += 1
return rows
def backup_table(self, table, label="table"):
rows = self.list_rows(table)
os.makedirs(MIGRATIONS_DIR, exist_ok=True)
ts = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
safe = re.sub(r"[^A-Za-z0-9_-]", "_", label)
path = os.path.join(MIGRATIONS_DIR, f"baserow_{safe}_{table}_{ts}.json")
with open(path, "w", encoding="utf-8") as fh:
json.dump(rows, fh, ensure_ascii=False, indent=2)
return path
# ---- Mutaciones (dry-run por defecto) ----
def update_row(self, table, row_id, fields, *, dry_run=True):
"""PATCH una fila (fields por nombre de columna). dry_run no llama API."""
action = {"op": "update", "table": table, "row_id": row_id, "fields": fields}
if dry_run:
return {"dry_run": True, **action}
r = requests.patch(
f"{self.host}/api/database/rows/table/{table}/{row_id}/",
headers=self.headers, params={"user_field_names": "true"},
json=fields, timeout=self.timeout,
)
if r.status_code not in (200, 201):
raise BaserowError(f"PATCH fila {row_id} tabla {table}: HTTP {r.status_code} {r.text[:300]}")
return r.json()
def create_row(self, table, fields, *, dry_run=True):
"""POST una fila nueva. dry_run no llama API."""
action = {"op": "create", "table": table, "fields": fields}
if dry_run:
return {"dry_run": True, **action}
r = requests.post(
f"{self.host}/api/database/rows/table/{table}/",
headers=self.headers, params={"user_field_names": "true"},
json=fields, timeout=self.timeout,
)
if r.status_code not in (200, 201):
raise BaserowError(f"POST fila tabla {table}: HTTP {r.status_code} {r.text[:300]}")
return r.json()
def delete_row(self, table, row_id, *, dry_run=True):
"""DELETE una fila. dry_run no llama API. Reversible vía backup_table()."""
action = {"op": "delete", "table": table, "row_id": row_id}
if dry_run:
return {"dry_run": True, **action}
r = requests.delete(
f"{self.host}/api/database/rows/table/{table}/{row_id}/",
headers=self.headers, timeout=self.timeout,
)
if r.status_code not in (200, 204):
raise BaserowError(f"DELETE fila {row_id} tabla {table}: HTTP {r.status_code} {r.text[:300]}")
return {"deleted": row_id}
# ---- DDL: tablas y campos (schema) ----
def list_tables(self, database_id):
r = requests.get(f"{self.host}/api/database/tables/database/{database_id}/",
headers=self.headers, timeout=self.timeout)
r.raise_for_status()
return r.json()
def create_table(self, database_id, name, *, dry_run=True):
"""Crea una tabla en una database. Baserow la crea con campos default
(Name primario + Notes + Active). dry_run no llama API."""
action = {"op": "create_table", "database_id": database_id, "name": name}
if dry_run:
return {"dry_run": True, **action}
r = requests.post(f"{self.host}/api/database/tables/database/{database_id}/",
headers=self.headers, json={"name": name}, timeout=self.timeout)
if r.status_code not in (200, 201):
raise BaserowError(f"POST tabla en db {database_id}: HTTP {r.status_code} {r.text[:300]}")
return r.json()
def create_field(self, table_id, name, field_type="text", *, dry_run=True, **extra):
"""Crea un campo en una tabla. field_type: 'text','long_text','boolean','date'..."""
body = {"name": name, "type": field_type, **extra}
action = {"op": "create_field", "table_id": table_id, **body}
if dry_run:
return {"dry_run": True, **action}
r = requests.post(f"{self.host}/api/database/fields/table/{table_id}/",
headers=self.headers, json=body, timeout=self.timeout)
if r.status_code not in (200, 201):
raise BaserowError(f"POST campo {name!r} tabla {table_id}: HTTP {r.status_code} {r.text[:300]}")
return r.json()
def update_field(self, field_id, fields, *, dry_run=True):
"""PATCH un campo (renombrar, cambiar tipo). fields ej {'name':..,'type':'text'}."""
action = {"op": "update_field", "field_id": field_id, "fields": fields}
if dry_run:
return {"dry_run": True, **action}
r = requests.patch(f"{self.host}/api/database/fields/{field_id}/",
headers=self.headers, json=fields, timeout=self.timeout)
if r.status_code not in (200, 201):
raise BaserowError(f"PATCH campo {field_id}: HTTP {r.status_code} {r.text[:300]}")
return r.json()
def delete_field(self, field_id, *, dry_run=True):
action = {"op": "delete_field", "field_id": field_id}
if dry_run:
return {"dry_run": True, **action}
r = requests.delete(f"{self.host}/api/database/fields/{field_id}/",
headers=self.headers, timeout=self.timeout)
if r.status_code not in (200, 204):
raise BaserowError(f"DELETE campo {field_id}: HTTP {r.status_code} {r.text[:300]}")
return {"deleted_field": field_id}
if __name__ == "__main__":
# Smoke test read-only: autentica y lista DB/tablas (sin imprimir secretos).
c = BaserowClient.from_credentials()
print("Auth OK. Aplicaciones:")
for app in c.list_applications():
print(f" DB id={app.get('id')} name={app.get('name')!r} tablas={len(app.get('tables') or [])}")