Files
MP-Manager/script_audit.py
2026-05-30 14:31:19 -06:00

347 lines
12 KiB
Python

import json
import os
import sqlite3
import time
from datetime import datetime
import requests
from paths import DB_PATH
# Base URL that ghl_put_custom_field() prefixes to every API endpoint path.
BASE_URL = "https://services.leadconnectorhq.com"
# Value sent in the "Version" request header by ghl_put_custom_field().
API_VERSION = "2021-07-28"
# Process-wide flag: set to True once init_audit_db() has created the schema,
# so later calls return immediately.
_AUDIT_DB_INITIALIZED = False
def get_conn():
    """Open a new SQLite connection to the audit database at ``DB_PATH``.

    The connection returns rows as ``sqlite3.Row`` (addressable by column
    name), enforces foreign keys, and waits up to 30 seconds on a locked
    database (both the Python-level ``timeout`` and the ``busy_timeout``
    pragma are set).

    Returns:
        An open ``sqlite3.Connection``. The caller is responsible for
        closing it.

    Raises:
        sqlite3.Error: if the database cannot be opened or configured. The
            half-configured connection is closed before the error propagates.
    """
    conn = sqlite3.connect(DB_PATH, timeout=30.0)
    try:
        conn.row_factory = sqlite3.Row
        conn.execute("PRAGMA foreign_keys = ON")
        conn.execute("PRAGMA busy_timeout = 30000")
    except Exception:
        # Do not leak the handle when configuration fails: the caller never
        # receives the connection, so nobody else could close it.
        conn.close()
        raise
    return conn
def init_audit_db():
    """Create the audit tables and indexes if they do not exist yet.

    Creates ``script_runs``, ``script_change_log`` (plus two indexes) and
    ``script_run_control``. The work is done at most once per process: after
    a successful commit the module flag ``_AUDIT_DB_INITIALIZED`` is set and
    later calls return immediately.
    """
    global _AUDIT_DB_INITIALIZED
    if _AUDIT_DB_INITIALIZED:
        return

    tuning_pragmas = (
        "PRAGMA journal_mode=WAL",
        "PRAGMA synchronous=NORMAL",
    )
    # Executed in this exact order; both child tables reference script_runs.
    schema_statements = (
        """
CREATE TABLE IF NOT EXISTS script_runs (
id TEXT PRIMARY KEY,
script_name TEXT NOT NULL,
arguments TEXT,
locations_json TEXT,
execution_mode TEXT NOT NULL DEFAULT 'sequential',
status TEXT NOT NULL DEFAULT 'running',
started_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
finished_at TEXT,
error_message TEXT
)
""",
        """
CREATE TABLE IF NOT EXISTS script_change_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
run_id TEXT NOT NULL,
location_id TEXT NOT NULL,
object_type TEXT NOT NULL,
object_id TEXT NOT NULL,
field_id TEXT NOT NULL,
field_name TEXT,
old_value_json TEXT,
new_value_json TEXT,
status TEXT NOT NULL DEFAULT 'planned',
planned_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
applied_at TEXT,
rolled_back_at TEXT,
error_message TEXT,
FOREIGN KEY(run_id) REFERENCES script_runs(id) ON DELETE CASCADE
)
""",
        "CREATE INDEX IF NOT EXISTS idx_change_log_run ON script_change_log(run_id)",
        "CREATE INDEX IF NOT EXISTS idx_change_log_object ON script_change_log(object_type, object_id)",
        """
CREATE TABLE IF NOT EXISTS script_run_control (
run_id TEXT PRIMARY KEY,
pause_requested INTEGER NOT NULL DEFAULT 0,
stop_requested INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
FOREIGN KEY(run_id) REFERENCES script_runs(id) ON DELETE CASCADE
)
""",
    )

    db = get_conn()
    try:
        try:
            for pragma in tuning_pragmas:
                db.execute(pragma)
        except sqlite3.DatabaseError:
            # Best-effort tuning only: schema creation proceeds even when the
            # journal/synchronous settings cannot be applied.
            pass
        for statement in schema_statements:
            db.execute(statement)
        db.commit()
        _AUDIT_DB_INITIALIZED = True
    finally:
        db.close()
def create_run(run_id, script_name, arguments="", locations=None, execution_mode="sequential"):
    """Register a script run and its pause/stop control row.

    Both inserts use ``INSERT OR IGNORE``, so calling this again with an
    existing ``run_id`` leaves the stored rows untouched.

    Args:
        run_id: Primary key of the run.
        script_name: Name of the script being executed.
        arguments: Argument string; a falsy value is stored as ``""``.
        locations: Locations the run targets; stored as a JSON array
            (``[]`` when falsy).
        execution_mode: Stored verbatim in ``script_runs.execution_mode``.
    """
    init_audit_db()
    db = get_conn()
    try:
        run_row = (
            run_id,
            script_name,
            arguments or "",
            json.dumps(locations or []),
            execution_mode,
        )
        db.execute("""
INSERT OR IGNORE INTO script_runs (id, script_name, arguments, locations_json, execution_mode, status)
VALUES (?, ?, ?, ?, ?, 'running')
""", run_row)
        db.execute("""
INSERT OR IGNORE INTO script_run_control (run_id, pause_requested, stop_requested)
VALUES (?, 0, 0)
""", (run_id,))
        db.commit()
    finally:
        db.close()
def update_run_status(run_id, status, error_message=None):
    """Set the status of a run, stamping ``finished_at`` for final states.

    ``finished_at`` is set to the current local time only when ``status`` is
    one of ``success``, ``failed``, ``stopped`` or ``rolled_back``; otherwise
    the stored value is kept. Likewise, ``error_message`` is only overwritten
    when a non-``None`` message is supplied.

    Args:
        run_id: Run to update.
        status: New status string.
        error_message: Optional message to store alongside the status.
    """
    init_audit_db()
    db = get_conn()
    try:
        terminal_statuses = {"success", "failed", "stopped", "rolled_back"}
        if status in terminal_statuses:
            finished_stamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        else:
            finished_stamp = None
        params = (status, error_message, finished_stamp, run_id)
        db.execute("""
UPDATE script_runs
SET status = ?, error_message = COALESCE(?, error_message), finished_at = COALESCE(?, finished_at)
WHERE id = ?
""", params)
        db.commit()
    finally:
        db.close()
def set_control(run_id, *, pause=None, stop=None):
    """Upsert the pause/stop request flags for a run.

    A flag passed as ``None`` keeps its stored value when a control row
    already exists; with no existing row it is stored as 0. Any other value
    is coerced to 0 or 1.

    Args:
        run_id: Run whose control row is written.
        pause: New pause request, or ``None`` to leave it as is.
        stop: New stop request, or ``None`` to leave it as is.
    """
    init_audit_db()
    db = get_conn()
    try:
        existing = db.execute("SELECT * FROM script_run_control WHERE run_id = ?", (run_id,)).fetchone()

        if existing and pause is None:
            pause_flag = existing["pause_requested"]
        else:
            pause_flag = int(bool(pause))

        if existing and stop is None:
            stop_flag = existing["stop_requested"]
        else:
            stop_flag = int(bool(stop))

        db.execute("""
INSERT INTO script_run_control (run_id, pause_requested, stop_requested, updated_at)
VALUES (?, ?, ?, datetime('now', 'localtime'))
ON CONFLICT(run_id) DO UPDATE SET
pause_requested = excluded.pause_requested,
stop_requested = excluded.stop_requested,
updated_at = excluded.updated_at
""", (run_id, pause_flag, stop_flag))
        db.commit()
    finally:
        db.close()
def get_control(run_id):
    """Return the control row of a run as a plain dict.

    Returns:
        All columns of the ``script_run_control`` row, or
        ``{"pause_requested": 0, "stop_requested": 0}`` when the run has no
        control row.
    """
    init_audit_db()
    db = get_conn()
    try:
        cursor = db.execute("SELECT * FROM script_run_control WHERE run_id = ?", (run_id,))
        found = cursor.fetchone()
        if not found:
            return {"pause_requested": 0, "stop_requested": 0}
        return dict(found)
    finally:
        db.close()
def wait_if_paused_or_stopped(run_id):
    """Block while a run is paused; report whether it may continue.

    Polls the control row once per second. A stop request always wins over a
    pause request.

    Args:
        run_id: Run to check. A falsy value disables the check entirely.

    Returns:
        ``True`` when the caller may continue working, ``False`` when a stop
        was requested (the run is then marked ``stopped``).
    """
    if not run_id:
        return True

    marked_paused = False
    while True:
        control = get_control(run_id)
        if control.get("stop_requested"):
            update_run_status(run_id, "stopped")
            return False
        if not control.get("pause_requested"):
            if marked_paused:
                # This call put the run into 'paused'; put it back so a
                # resumed run does not keep showing as paused.
                update_run_status(run_id, "running")
            return True
        if not marked_paused:
            # Write the status once per pause episode rather than on every
            # poll; repeating the identical UPDATE each second adds nothing.
            update_run_status(run_id, "paused")
            marked_paused = True
        time.sleep(1)
def json_value(value):
    """Serialize ``value`` to a JSON string, leaving non-ASCII characters unescaped."""
    encoded = json.dumps(value, ensure_ascii=False)
    return encoded
def record_change(run_id, location_id, object_type, object_id, field_id, field_name, old_value, new_value):
    """Log a planned field change for a run.

    The old and new values are stored JSON-encoded and the row starts with
    status ``planned``.

    Returns:
        The id of the inserted ``script_change_log`` row, or ``None`` when
        ``run_id`` is falsy (nothing is recorded in that case).
    """
    if not run_id:
        return None
    init_audit_db()
    db = get_conn()
    try:
        row_values = (
            run_id,
            location_id,
            object_type,
            object_id,
            field_id,
            field_name,
            json_value(old_value),
            json_value(new_value),
        )
        cursor = db.execute("""
INSERT INTO script_change_log
(run_id, location_id, object_type, object_id, field_id, field_name, old_value_json, new_value_json, status)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'planned')
""", row_values)
        db.commit()
        return cursor.lastrowid
    finally:
        db.close()
def mark_change(change_id, status, error_message=None):
    """Update the status of one logged change.

    ``applied_at`` is stamped with the current local time when ``status`` is
    ``applied`` and ``rolled_back_at`` when it is ``rolled_back``; otherwise
    each timestamp keeps its stored value. ``error_message`` is always
    overwritten, including with ``None``.

    Args:
        change_id: Row id in ``script_change_log``; a falsy id is a no-op.
        status: New status string.
        error_message: Message to store (``None`` clears it).
    """
    if not change_id:
        return
    init_audit_db()
    db = get_conn()
    try:
        # Each fragment is a fixed SQL expression chosen here, never caller
        # input. By default a column is assigned to itself (left unchanged).
        applied_at = "applied_at"
        rolled_back_at = "rolled_back_at"
        if status == "applied":
            applied_at = "datetime('now', 'localtime')"
        if status == "rolled_back":
            rolled_back_at = "datetime('now', 'localtime')"
        db.execute(f"""
UPDATE script_change_log
SET status = ?, error_message = ?, applied_at = {applied_at}, rolled_back_at = {rolled_back_at}
WHERE id = ?
""", (status, error_message, change_id))
        db.commit()
    finally:
        db.close()
def list_runs(limit=20):
    """Return the most recently started runs with their change counters.

    Args:
        limit: Maximum number of runs to return.

    Returns:
        A list of dicts, newest ``started_at`` first. Each holds every
        ``script_runs`` column plus ``change_count``, ``applied_count`` and
        ``rolled_back_count``.
    """
    init_audit_db()
    db = get_conn()
    try:
        cursor = db.execute("""
SELECT r.*,
COUNT(c.id) AS change_count,
SUM(CASE WHEN c.status = 'applied' THEN 1 ELSE 0 END) AS applied_count,
SUM(CASE WHEN c.status = 'rolled_back' THEN 1 ELSE 0 END) AS rolled_back_count
FROM script_runs r
LEFT JOIN script_change_log c ON c.run_id = r.id
GROUP BY r.id
ORDER BY r.started_at DESC
LIMIT ?
""", (limit,))
        records = cursor.fetchall()
        return [dict(record) for record in records]
    finally:
        db.close()
def list_changes(run_id):
    """Return every logged change of a run as dicts, highest id first."""
    init_audit_db()
    db = get_conn()
    try:
        cursor = db.execute("SELECT * FROM script_change_log WHERE run_id = ? ORDER BY id DESC", (run_id,))
        return [dict(record) for record in cursor.fetchall()]
    finally:
        db.close()
def get_run_summary(run_id, error_limit=10):
    """Build an aggregate report of the changes logged for one run.

    Args:
        run_id: Run to summarize.
        error_limit: Maximum number of failed changes to include.

    Returns:
        A dict with keys ``run`` (the run row, or ``None`` if unknown),
        ``total_changes``, ``by_status``, ``by_object_type``,
        ``by_field_name`` (count maps) and ``failed_changes`` (the
        highest-id failed rows, up to ``error_limit``).
    """
    init_audit_db()
    db = get_conn()
    try:
        run_row = db.execute("SELECT * FROM script_runs WHERE id = ?", (run_id,)).fetchone()
        status_counts = db.execute("""
SELECT status, COUNT(*) AS count
FROM script_change_log
WHERE run_id = ?
GROUP BY status
ORDER BY status
""", (run_id,)).fetchall()
        object_counts = db.execute("""
SELECT object_type, COUNT(*) AS count
FROM script_change_log
WHERE run_id = ?
GROUP BY object_type
ORDER BY object_type
""", (run_id,)).fetchall()
        field_counts = db.execute("""
SELECT COALESCE(field_name, '(sin nombre)') AS field_name, COUNT(*) AS count
FROM script_change_log
WHERE run_id = ?
GROUP BY COALESCE(field_name, '(sin nombre)')
ORDER BY count DESC, field_name
""", (run_id,)).fetchall()
        failures = db.execute("""
SELECT location_id, object_type, object_id, field_name, error_message
FROM script_change_log
WHERE run_id = ? AND status = 'failed'
ORDER BY id DESC
LIMIT ?
""", (run_id, error_limit)).fetchall()

        by_status = {}
        for entry in status_counts:
            by_status[entry["status"]] = entry["count"]

        by_object_type = {}
        for entry in object_counts:
            by_object_type[entry["object_type"]] = entry["count"]

        by_field_name = {}
        for entry in field_counts:
            by_field_name[entry["field_name"]] = entry["count"]

        summary = {
            "run": dict(run_row) if run_row else None,
            "total_changes": sum(by_status.values()),
            "by_status": by_status,
            "by_object_type": by_object_type,
            "by_field_name": by_field_name,
            "failed_changes": [dict(entry) for entry in failures],
        }
        return summary
    finally:
        db.close()
def decode_json(value):
    """Parse a stored JSON string; ``None`` (no stored value) passes through as ``None``."""
    return None if value is None else json.loads(value)
def resolve_custom_field_id(token, location_id, object_type, field_name):
    """Look up the current custom field id for a field name in a location.

    Any ``object_type`` other than ``"contact"`` is looked up in the
    ``opportunity`` schema.

    Args:
        token: API token passed to the schema lookup.
        location_id: Location whose schema is queried.
        object_type: ``"contact"`` or anything else (treated as opportunity).
        field_name: Name of the custom field to resolve.

    Returns:
        The field id mapped to ``field_name`` in the fetched schema.

    Raises:
        ValueError: if ``field_name`` is falsy, or the schema has no truthy
            entry for it.
    """
    if not field_name:
        raise ValueError("Rollback requires field_name to resolve the dynamic custom field ID")
    # Imported only once the argument check has passed.
    import sync_engine

    if object_type == "contact":
        object_key = "contact"
    else:
        object_key = "opportunity"
    schema = sync_engine.ghl_client.get_object_schema(token, location_id, object_key)
    resolved_id = schema.get(field_name)
    if resolved_id:
        return resolved_id
    raise ValueError(f"Field '{field_name}' not found in {object_key} schema for {location_id}")
def ghl_put_custom_field(token, object_type, object_id, field_id, value):
    """Write one custom field value to a contact or opportunity via HTTP PUT.

    Any ``object_type`` other than ``"contact"`` is sent to the
    opportunities endpoint.

    Args:
        token: Bearer token for the request.
        object_type: ``"contact"`` or anything else (treated as opportunity).
        object_id: Id appended to the endpoint path.
        field_id: Custom field id to set.
        value: Value to store in the field.

    Raises:
        requests.HTTPError: if the API answers with an error status
            (raised by ``raise_for_status``).
    """
    if object_type == "contact":
        endpoint = "/contacts/"
    else:
        endpoint = "/opportunities/"
    url = f"{BASE_URL}{endpoint}{object_id}"
    payload = {"customFields": [{"id": field_id, "value": value}]}
    headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {token}",
        "Version": API_VERSION,
        "Content-Type": "application/json",
    }
    response = requests.put(url, headers=headers, json=payload, timeout=45)
    response.raise_for_status()
def rollback_run(run_id):
    """Revert every applied change of a run by writing back the old values.

    Changes are processed highest id first (the order ``list_changes``
    returns), so later edits are undone first. Each change ends up marked
    ``rolled_back`` or ``failed``; one failure does not stop the others.
    The run itself ends as ``rolled_back`` when nothing failed, otherwise as
    ``failed`` with the failure count in its error message.

    NOTE(review): a change whose rollback fails is marked ``failed`` and is
    no longer ``applied``, so a second ``rollback_run`` will not retry it.
    Confirm this is intended.

    Args:
        run_id: Run whose applied changes are reverted.

    Returns:
        ``{"run_id": ..., "rolled_back": <count>, "failed": <count>}``.
    """
    import sync_engine

    init_audit_db()
    tokens = sync_engine.get_tokens_map()
    targets = [c for c in list_changes(run_id) if c["status"] == "applied"]
    update_run_status(run_id, "rollback_running")

    # Resolved field ids keyed by (location, object type, field name), so
    # each distinct field is looked up once instead of once per change.
    field_id_cache = {}
    rolled_back = 0
    failed = 0
    for change in targets:
        location_id = change["location_id"]
        token = tokens.get(location_id)
        if not token:
            mark_change(change["id"], "failed", "No token for rollback location")
            failed += 1
            continue
        try:
            old_value = decode_json(change["old_value_json"])
            cache_key = (location_id, change["object_type"], change["field_name"])
            field_id = field_id_cache.get(cache_key)
            if field_id is None:
                field_id = resolve_custom_field_id(
                    token,
                    location_id,
                    change["object_type"],
                    change["field_name"],
                )
                # Only successful lookups are cached; a failure raises above
                # and is retried for the next change with the same key.
                field_id_cache[cache_key] = field_id
            ghl_put_custom_field(token, change["object_type"], change["object_id"], field_id, old_value)
            mark_change(change["id"], "rolled_back")
            rolled_back += 1
            # Fixed pause after each successful write before the next change.
            time.sleep(0.6)
        except Exception as exc:
            # Per-change boundary: record the failure and keep going.
            mark_change(change["id"], "failed", str(exc))
            failed += 1

    final_status = "rolled_back" if failed == 0 else "failed"
    update_run_status(run_id, final_status, None if failed == 0 else f"Rollback failures: {failed}")
    return {"run_id": run_id, "rolled_back": rolled_back, "failed": failed}
# Import-time side effect: make sure the audit tables exist as soon as this
# module is loaded.
init_audit_db()