Files
2026-05-30 14:31:19 -06:00

206 lines
7.7 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""CLI read-only para inspeccionar runs de script_audit + logs estructurados.
Útil cuando el dashboard no está disponible o cuando quieres diagnosticar
rápido un run específico sin hacer click. Combina 3 fuentes:
1. script_audit.script_runs / script_change_log (SQLite) → estado declarativo
2. generated/logs/script_runs/{run_id}.jsonl → cronología por evento
3. generated/logs/errors.jsonl filtrado por run_id → excepciones técnicas
Uso:
python scripts/audit_run.py <run_id> # resumen completo del run
python scripts/audit_run.py --list 20 # últimos 20 runs
python scripts/audit_run.py <run_id> --json # output crudo para pipes
python scripts/audit_run.py <run_id> --events 50 # últimos N eventos del JSONL
"""
import argparse
import json
import os
import sys
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
sys.path.insert(0, ROOT_DIR)
try:
sys.stdout.reconfigure(encoding="utf-8")
except Exception:
pass
import script_audit # noqa: E402
from paths import LOGS_DIR # noqa: E402
from script_logger import read_run_log # noqa: E402
ERRORS_PATH = os.path.join(LOGS_DIR, "errors.jsonl")
def _read_errors_for_run(run_id, errors_path=None):
    """Collect the entries of the JSONL error log that belong to *run_id*.

    The current log file and its rotated siblings (``<path>.1`` through
    ``<path>.5``) are scanned in that order. A missing or unreadable file is
    skipped on its own, so rotated files are still read when the current
    file does not exist (e.g. right after a rotation).

    Args:
        run_id: Run identifier to look for. A falsy value yields ``[]``.
        errors_path: Base path of the error log. Defaults to the
            module-level ``ERRORS_PATH``.

    Returns:
        A list with the decoded JSON objects whose ``context.run_id`` equals
        *run_id*, in the order they were read.
    """
    if not run_id:
        return []

    base_path = ERRORS_PATH if errors_path is None else errors_path
    candidates = [base_path] + [f"{base_path}.{i}" for i in range(1, 6)]
    needle = str(run_id)

    matches = []
    for path in candidates:
        try:
            # errors="replace": one undecodable byte must not hide the rest
            # of the file; the damaged line simply fails to parse below.
            with open(path, encoding="utf-8", errors="replace") as fh:
                for raw in fh:
                    line = raw.strip()
                    # Cheap substring pre-filter before paying for json.loads.
                    if not line or needle not in line:
                        continue
                    try:
                        entry = json.loads(line)
                    except (ValueError, RecursionError):
                        continue
                    # Valid JSON that is not an object (list, string, ...)
                    # cannot carry a context; skip only that line.
                    if not isinstance(entry, dict):
                        continue
                    ctx = entry.get("context")
                    if isinstance(ctx, dict) and ctx.get("run_id") == run_id:
                        matches.append(entry)
        except OSError:
            # Missing or unreadable file: best effort, try the next one.
            continue
    return matches
def cmd_list(limit):
    """Print a fixed-width table of the runs returned by ``script_audit.list_runs``.

    Args:
        limit: Passed through as ``limit=`` to ``script_audit.list_runs``.

    Prints a placeholder line and returns when no runs are reported. Missing
    or null fields are rendered as an empty string (or ``0`` for the counters).
    """
    rows = script_audit.list_runs(limit=limit)
    if not rows:
        print("(sin runs registrados)")
        return

    header = f"{'ID':36} {'STATUS':12} {'CHANGES':>8} {'APPLIED':>8} {'STARTED':19} SCRIPT"
    print(f"=== Últimos {len(rows)} runs ===")
    print(header)
    print("-" * 120)

    for row in rows:
        # One formatted cell per column, in the same order as the header.
        cells = (
            format((row.get("id") or "")[:36], "36"),
            format(row.get("status") or "", "12"),
            format(row.get("change_count") or 0, ">8"),
            format(row.get("applied_count") or 0, ">8"),
            format(row.get("started_at") or "", "19"),
            format(row.get("script_name") or "", ""),
        )
        print(" ".join(cells))
def _print_missing_run(run_id, events, errors):
    """Report that *run_id* has no audit row, plus any JSONL traces found."""
    print(f"[NO-RUN] No existe el run '{run_id}' en script_audit.")
    # A run may still have left JSONL logs even without an audit record.
    if events:
        print(f" - encontré {len(events)} evento(s) en logs/script_runs/{run_id}.jsonl")
    if errors:
        print(f" - encontré {len(errors)} error(es) en errors.jsonl con run_id={run_id}")
    if not events and not errors:
        print(" - tampoco hay logs JSONL ni errores asociados.")


def _print_run_overview(run_id, run, summary):
    """Print the run header fields followed by the aggregate change counters."""
    print(f"=== RUN {run_id} ===")
    print(f" script : {run.get('script_name')}")
    print(f" status : {run.get('status')}")
    print(f" started_at : {run.get('started_at')}")
    print(f" finished_at : {run.get('finished_at') or '(en curso)'}")
    print(f" execution_mode: {run.get('execution_mode')}")
    if run.get('error_message'):
        print(f" error_message : {run.get('error_message')}")
    if run.get('arguments'):
        print(f" arguments : {run.get('arguments')}")

    # locations_json is decoded best-effort; an undecodable value is simply
    # not shown instead of aborting the whole report.
    try:
        locations = json.loads(run.get('locations_json') or '[]')
    except (TypeError, ValueError):
        locations = []
    if locations:
        print(f" locations : {locations}")

    print()
    print(f" total_changes : {summary.get('total_changes', 0)}")
    by_status = summary.get('by_status') or {}
    if by_status:
        print(f" by_status : {by_status}")
    by_object = summary.get('by_object_type') or {}
    if by_object:
        print(f" by_object : {by_object}")


def _print_changes(changes):
    """Print one table row per recorded change, numbered from 1."""
    print()
    print(f"--- Cambios ({len(changes)}) ---")
    print(f" {'#':>3} {'STATUS':12} {'OBJ':10} {'LOC':>14} {'OBJECT_ID':>36} FIELD")
    for i, c in enumerate(changes, 1):
        status = c.get('status') or ''
        obj_type = (c.get('object_type') or '')[:10]
        loc = (c.get('location_id') or '')[:14]
        oid = (c.get('object_id') or '')[:36]
        field = c.get('field_name') or c.get('field_id') or ''
        err = c.get('error_message') or ''
        print(f" {i:>3} {status:12} {obj_type:10} {loc:>14} {oid:>36} {field}{(' | err: ' + err) if err else ''}")


def _print_failed_changes(failed):
    """Print the failed changes listed in the run summary."""
    print()
    print(f"--- Failed changes (top {len(failed)}) ---")
    for item in failed:
        print(f" loc={item.get('location_id')} obj={item.get('object_type')}:{(item.get('object_id') or '')[:12]} field={item.get('field_name')} err={item.get('error_message')}")


def _print_events(events):
    """Print one line per structured-log event with a few selected extras."""
    print()
    print(f"--- Últimos {len(events)} eventos del log estructurado ---")
    for ev in events:
        # str(... or ''): a null or non-string 'ts' must not crash the slice.
        ts = str(ev.get('ts') or '')[:23]
        lvl = (ev.get('level') or '').upper()[:5]
        evname = ev.get('event') or ''
        wid = ev.get('worker_id', '')
        loc = (ev.get('location_id') or '')[:8]
        wf = (ev.get('workflow_id') or '')[:8]
        extras = [
            f"{k}={ev[k]}"
            for k in ("status", "anomalies", "duration_ms", "change_id", "error_id", "message")
            if ev.get(k) is not None
        ]
        extras_str = " ".join(extras)[:200]
        print(f" {ts} {lvl:5} {evname:18} w={wid} loc={loc} wf={wf} {extras_str}")


def _print_errors(errors):
    """Print at most the first 20 technical errors; the header shows the total."""
    print()
    print(f"--- Errores técnicos en errors.jsonl ({len(errors)}) ---")
    for e in errors[:20]:
        # Same null-safe handling as the event timestamps.
        stamp = str(e.get('timestamp') or '')[:19]
        print(f" {stamp} id={e.get('error_id')} event={e.get('event')}")
        exc = e.get('exception') or {}
        if exc.get('type') or exc.get('message'):
            print(f" {exc.get('type', '')}: {(exc.get('message') or '')[:200]}")


def cmd_show(run_id, as_json=False, events_limit=20):
    """Print everything known about one run: audit data, log events, errors.

    Args:
        run_id: Identifier of the run to inspect.
        as_json: When true, dump summary, changes, events and errors as a
            single JSON document and print nothing else.
        events_limit: Passed as ``limit=`` to ``read_run_log``.

    When the audit summary has no ``run`` entry, only a short notice is
    printed, mentioning whether JSONL events or errors exist for the id.
    """
    summary = script_audit.get_run_summary(run_id)
    changes = script_audit.list_changes(run_id)
    events = read_run_log(run_id, limit=events_limit)
    errors = _read_errors_for_run(run_id)

    if as_json:
        print(json.dumps({
            "summary": summary,
            "changes": changes,
            "events": events,
            "errors": errors,
        }, ensure_ascii=False, indent=2, default=str))
        return

    run = summary.get("run") if summary else None
    if not run:
        _print_missing_run(run_id, events, errors)
        return

    _print_run_overview(run_id, run, summary)
    if changes:
        _print_changes(changes)
    failed = summary.get('failed_changes') or []
    if failed:
        _print_failed_changes(failed)
    if events:
        _print_events(events)
    if errors:
        _print_errors(errors)
def main():
    """Parse the command line and dispatch to ``cmd_list`` or ``cmd_show``.

    ``--list N`` takes precedence over a positional ``run_id``. Exits through
    ``parser.error`` (status 2) when N is below 1, or when neither ``--list``
    nor a ``run_id`` was given.
    """
    parser = argparse.ArgumentParser(description="Inspector read-only de runs de script_audit + logs.")
    parser.add_argument("run_id", nargs="?", help="ID del run a inspeccionar. Omitir con --list.")
    parser.add_argument("--list", type=int, metavar="N", help="Listar últimos N runs y salir.")
    parser.add_argument("--json", action="store_true", help="Salida en JSON crudo (para pipes).")
    parser.add_argument("--events", type=int, default=20, help="Cuántos eventos del log mostrar (default 20).")
    args = parser.parse_args()

    # --list is an int option, so test against None instead of truthiness:
    # "--list 0" used to be treated as "flag not given" and produced a
    # misleading "missing run_id" error.
    if args.list is not None:
        if args.list < 1:
            parser.error(f"--list requiere N >= 1 (recibido: {args.list}).")
        cmd_list(args.list)
        return

    if not args.run_id:
        parser.error("Debes pasar <run_id> o --list N.")
    cmd_show(args.run_id, as_json=args.json, events_limit=args.events)


if __name__ == "__main__":
    main()