Files
MP-Manager/script_logger.py
2026-05-30 14:31:19 -06:00

186 lines
6.5 KiB
Python

"""Logger estructurado por-run para scripts (incluido Playwright paralelo).
Escribe una línea JSON por evento a generated/logs/script_runs/{run_id}.jsonl.
Thread-safe: cada instancia serializa appends con un Lock interno. Pensado para
que los bulks paralelos puedan emitir eventos desde N workers sin race.
Complementa (no reemplaza):
- print() a stdout: que SSE tail-ea en vivo para el dashboard.
- error_logging.log_error: errores técnicos con sanitización + rotación.
- script_audit: estado de mutaciones (planned/applied/failed) en SQLite.
Para forensics post-mortem hay 4 fuentes de verdad:
1. script_runs[run_id] + script_change_log → ¿qué se intentó/aplicó?
2. generated/logs/script_runs/{run_id}.jsonl → cronología de eventos
3. generated/logs/errors.jsonl filtrado por run_id → excepciones
4. generated/browser/screenshots/* → estado visual al fallar
"""
import json
import os
import sys
import threading
import time
from datetime import datetime
from paths import LOGS_DIR
# Directory that holds one ``{run_id}.jsonl`` event log per script run;
# RunLogger appends to these files and read_run_log reads them back.
SCRIPT_RUNS_DIR = os.path.join(LOGS_DIR, "script_runs")
def _now_iso():
    """Return the current local time as ``YYYY-MM-DD HH:MM:SS.mmm``.

    The value is a naive (no timezone) wall-clock timestamp with a space
    between date and time, truncated (not rounded) to milliseconds.
    """
    return datetime.now().isoformat(sep=" ", timespec="milliseconds")
class RunLogger:
    """Structured event emitter for one specific script run.

    Every event is written as a single JSON object on its own line, appended
    to ``SCRIPT_RUNS_DIR/{run_id}.jsonl``. Writes are serialized with an
    internal lock so several worker threads can share one instance.

    Typical use (inside a parallel bulk):
        rlog = RunLogger(run_id, "ghl_browser_workflow_manager.py")
        rlog.info("bulk_start", workers=4, items=12)
        # ... in each worker:
        rlog.info("item_start", worker_id=2, location_id=loc, workflow_id=wf)
        rlog.info("item_done", worker_id=2, location_id=loc, workflow_id=wf,
                  change_id=cid, status="applied", duration_ms=42100)
        rlog.error("item_failed", worker_id=2, location_id=loc, workflow_id=wf,
                   error_id=eid, message=str(exc))
        rlog.close()

    If ``run_id`` is ``None``/``""`` the logger runs in no-op mode (nothing is
    written), which keeps scripts working when they run without ``--run-id``.

    Logging is best-effort: failures to open, serialize or write are swallowed
    so that a logging problem never aborts the run being logged.
    """

    # Caps applied to payload values so one event cannot bloat the log.
    _MAX_STR_LEN = 2000
    _MAX_REPR_LEN = 500

    def __init__(self, run_id, script_name):
        """Open (or create) the run's JSONL file in append mode.

        Args:
            run_id: Identifier of the run; a falsy value selects no-op mode.
            script_name: Name stored in every event; defaults to "unknown".
        """
        self.run_id = run_id or ""
        self.script_name = script_name or "unknown"
        self._lock = threading.Lock()
        self._path = None
        self._fp = None
        self._closed = False
        self._enabled = bool(self.run_id)
        if not self._enabled:
            return
        try:
            os.makedirs(SCRIPT_RUNS_DIR, exist_ok=True)
            self._path = os.path.join(SCRIPT_RUNS_DIR, f"{self.run_id}.jsonl")
            # Append mode with line buffering so each event is visible
            # immediately to anyone running `tail -f` on the file.
            self._fp = open(self._path, "a", encoding="utf-8", buffering=1)
        except Exception:
            # Never fail because of logging: degrade to no-op instead.
            self._enabled = False
            self._fp = None

    @property
    def path(self):
        """Path of the JSONL file, or ``None`` when no path was computed.

        Note that the path stays set if it was computed but opening the file
        failed, in which case nothing is written to it.
        """
        return self._path

    @classmethod
    def _sanitize(cls, value):
        """Return a JSON-friendly version of one payload value; never raises.

        ``None`` and numbers/booleans pass through, strings are truncated to
        ``_MAX_STR_LEN`` characters, other values are kept when they can be
        serialized with ``default=str`` and otherwise replaced by a truncated
        ``repr``. If even ``repr`` fails, a placeholder naming the type is
        used so a misbehaving object cannot break the caller.
        """
        if value is None or isinstance(value, (bool, int, float)):
            return value
        if isinstance(value, str):
            return value[:cls._MAX_STR_LEN]
        try:
            # Probe only: the real serialization happens once in _write.
            json.dumps(value, default=str)
            return value
        except Exception:
            pass
        try:
            return repr(value)[:cls._MAX_REPR_LEN]
        except Exception:
            return f"<unrepresentable {type(value).__name__}>"

    def _write(self, payload):
        """Serialize ``payload`` and append it as one line; errors are swallowed."""
        if not self._enabled or self._closed or self._fp is None:
            return
        try:
            line = json.dumps(payload, ensure_ascii=False, default=str)
        except Exception:
            # Fall back to a marker record so the failure itself is visible.
            try:
                line = json.dumps({"ts": _now_iso(), "level": "error",
                                   "event": "logger_serialize_failed",
                                   "raw_repr": repr(payload)[:500]})
            except Exception:
                return
        with self._lock:
            # Re-check under the lock: close() may have run in another thread.
            if self._closed or self._fp is None:
                return
            try:
                self._fp.write(line + "\n")
                self._fp.flush()
            except Exception:
                # Do not re-raise: an IO error must not take down a running bulk.
                pass

    def event(self, level, event, **payload):
        """Emit one event with the given level, name and extra fields.

        Args:
            level: Severity label; falsy means "info". It is converted with
                ``str()`` and lower-cased, so non-string levels are accepted.
            event: Event name stored under the "event" key.
            **payload: Extra fields, each passed through ``_sanitize``.
                NOTE(review): keys named "ts", "run_id" or "script" overwrite
                the corresponding envelope field.
        """
        # Skip all work in no-op/closed state so a disabled logger really is
        # free of side effects (and of failures from odd payload objects).
        if not self._enabled or self._closed:
            return
        body = {
            "ts": _now_iso(),
            "level": str(level or "info").lower(),
            "run_id": self.run_id,
            "script": self.script_name,
            "event": event,
        }
        for key, value in payload.items():
            body[key] = self._sanitize(value)
        self._write(body)

    def info(self, event, **payload):
        """Emit ``event`` at level "info"."""
        self.event("info", event, **payload)

    def warn(self, event, **payload):
        """Emit ``event`` at level "warn"."""
        self.event("warn", event, **payload)

    def error(self, event, **payload):
        """Emit ``event`` at level "error"."""
        self.event("error", event, **payload)

    def close(self):
        """Write a final "logger_closed" event and close the file.

        Safe to call more than once; later calls do nothing.
        """
        if not self._enabled or self._closed:
            return
        try:
            self.event("info", "logger_closed", pid=os.getpid())
        finally:
            with self._lock:
                self._closed = True
                if self._fp is not None:
                    try:
                        self._fp.close()
                    except Exception:
                        pass
                    self._fp = None

    # Context-manager support: ``with RunLogger(...) as rlog: ...``
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        # Returning False lets any exception from the with-body propagate.
        return False
def read_run_log(run_id, limit=None):
    """Return the parsed events stored in a run's JSONL log.

    Args:
        run_id: Identifier of the run; a falsy value yields an empty list.
        limit: When a positive number, only the last ``limit`` events are
            returned. ``None``, 0 or a negative value returns every event.

    Returns:
        A list with one entry per non-blank line, in file order. A line that
        is not valid JSON is returned as
        ``{"event": "unparseable", "raw": <first 200 characters>}``.
        An empty list is returned when the log file does not exist.
    """
    # Local import keeps this function self-contained with respect to the
    # file's import block.
    from collections import deque

    if not run_id:
        return []
    path = os.path.join(SCRIPT_RUNS_DIR, f"{run_id}.jsonl")
    if not os.path.exists(path):
        return []
    # A bounded deque keeps only the tail while streaming, so asking for the
    # last few events of a large log does not hold the whole file in memory.
    tail_size = limit if (limit and limit > 0) else None
    events = deque(maxlen=tail_size)
    try:
        handle = open(path, encoding="utf-8")
    except FileNotFoundError:
        # The file can disappear between the existence check and the open.
        return []
    with handle:
        for raw_line in handle:
            text = raw_line.strip()
            if not text:
                continue
            try:
                events.append(json.loads(text))
            except Exception:
                events.append({"event": "unparseable", "raw": text[:200]})
    return list(events)
if __name__ == "__main__":
    # Minimal, non-destructive self-test: write three sample events to a
    # fresh run log, close it, then report the path and the read-back count.
    selftest_run = f"selftest_{datetime.now():%Y%m%d_%H%M%S}"
    sample_events = (
        ("info", "bulk_start", {"workers": 2, "items": 3}),
        ("info", "item_done", {"worker_id": 1, "location_id": "DEMO",
                               "change_id": 42, "status": "applied",
                               "duration_ms": 1234}),
        ("error", "item_failed", {"worker_id": 2, "location_id": "DEMO",
                                  "error_id": "abc-123",
                                  "message": "ejemplo"}),
    )
    demo_logger = RunLogger(selftest_run, "script_logger.py")
    for severity, event_name, fields in sample_events:
        demo_logger.event(severity, event_name, **fields)
    demo_logger.close()
    print(f"OK: events written to {demo_logger.path}")
    events_back = read_run_log(selftest_run)
    print(f"events read back: {len(events_back)}")