import json import os import sqlite3 import time from datetime import datetime import requests from paths import DB_PATH BASE_URL = "https://services.leadconnectorhq.com" API_VERSION = "2021-07-28" _AUDIT_DB_INITIALIZED = False def get_conn(): conn = sqlite3.connect(DB_PATH, timeout=30.0) conn.row_factory = sqlite3.Row conn.execute("PRAGMA foreign_keys = ON") conn.execute("PRAGMA busy_timeout = 30000") return conn def init_audit_db(): global _AUDIT_DB_INITIALIZED if _AUDIT_DB_INITIALIZED: return conn = get_conn() try: try: conn.execute("PRAGMA journal_mode=WAL") conn.execute("PRAGMA synchronous=NORMAL") except sqlite3.DatabaseError: pass conn.execute(""" CREATE TABLE IF NOT EXISTS script_runs ( id TEXT PRIMARY KEY, script_name TEXT NOT NULL, arguments TEXT, locations_json TEXT, execution_mode TEXT NOT NULL DEFAULT 'sequential', status TEXT NOT NULL DEFAULT 'running', started_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')), finished_at TEXT, error_message TEXT ) """) conn.execute(""" CREATE TABLE IF NOT EXISTS script_change_log ( id INTEGER PRIMARY KEY AUTOINCREMENT, run_id TEXT NOT NULL, location_id TEXT NOT NULL, object_type TEXT NOT NULL, object_id TEXT NOT NULL, field_id TEXT NOT NULL, field_name TEXT, old_value_json TEXT, new_value_json TEXT, status TEXT NOT NULL DEFAULT 'planned', planned_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')), applied_at TEXT, rolled_back_at TEXT, error_message TEXT, FOREIGN KEY(run_id) REFERENCES script_runs(id) ON DELETE CASCADE ) """) conn.execute("CREATE INDEX IF NOT EXISTS idx_change_log_run ON script_change_log(run_id)") conn.execute("CREATE INDEX IF NOT EXISTS idx_change_log_object ON script_change_log(object_type, object_id)") conn.execute(""" CREATE TABLE IF NOT EXISTS script_run_control ( run_id TEXT PRIMARY KEY, pause_requested INTEGER NOT NULL DEFAULT 0, stop_requested INTEGER NOT NULL DEFAULT 0, updated_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')), FOREIGN KEY(run_id) REFERENCES script_runs(id) ON DELETE CASCADE ) """) conn.commit() _AUDIT_DB_INITIALIZED = True finally: conn.close() def create_run(run_id, script_name, arguments="", locations=None, execution_mode="sequential"): init_audit_db() conn = get_conn() try: conn.execute(""" INSERT OR IGNORE INTO script_runs (id, script_name, arguments, locations_json, execution_mode, status) VALUES (?, ?, ?, ?, ?, 'running') """, (run_id, script_name, arguments or "", json.dumps(locations or []), execution_mode)) conn.execute(""" INSERT OR IGNORE INTO script_run_control (run_id, pause_requested, stop_requested) VALUES (?, 0, 0) """, (run_id,)) conn.commit() finally: conn.close() def update_run_status(run_id, status, error_message=None): init_audit_db() conn = get_conn() try: finished = datetime.now().strftime("%Y-%m-%d %H:%M:%S") if status in {"success", "failed", "stopped", "rolled_back"} else None conn.execute(""" UPDATE script_runs SET status = ?, error_message = COALESCE(?, error_message), finished_at = COALESCE(?, finished_at) WHERE id = ? """, (status, error_message, finished, run_id)) conn.commit() finally: conn.close() def set_control(run_id, *, pause=None, stop=None): init_audit_db() conn = get_conn() try: current = conn.execute("SELECT * FROM script_run_control WHERE run_id = ?", (run_id,)).fetchone() pause_value = current["pause_requested"] if current and pause is None else int(bool(pause)) stop_value = current["stop_requested"] if current and stop is None else int(bool(stop)) conn.execute(""" INSERT INTO script_run_control (run_id, pause_requested, stop_requested, updated_at) VALUES (?, ?, ?, datetime('now', 'localtime')) ON CONFLICT(run_id) DO UPDATE SET pause_requested = excluded.pause_requested, stop_requested = excluded.stop_requested, updated_at = excluded.updated_at """, (run_id, pause_value, stop_value)) conn.commit() finally: conn.close() def get_control(run_id): init_audit_db() conn = get_conn() try: row = conn.execute("SELECT * FROM script_run_control WHERE run_id = ?", (run_id,)).fetchone() return dict(row) if row else {"pause_requested": 0, "stop_requested": 0} finally: conn.close() def wait_if_paused_or_stopped(run_id): if not run_id: return True while True: control = get_control(run_id) if control.get("stop_requested"): update_run_status(run_id, "stopped") return False if not control.get("pause_requested"): return True update_run_status(run_id, "paused") time.sleep(1) def json_value(value): return json.dumps(value, ensure_ascii=False) def record_change(run_id, location_id, object_type, object_id, field_id, field_name, old_value, new_value): if not run_id: return None init_audit_db() conn = get_conn() try: cur = conn.execute(""" INSERT INTO script_change_log (run_id, location_id, object_type, object_id, field_id, field_name, old_value_json, new_value_json, status) VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'planned') """, (run_id, location_id, object_type, object_id, field_id, field_name, json_value(old_value), json_value(new_value))) conn.commit() return cur.lastrowid finally: conn.close() def mark_change(change_id, status, error_message=None): if not change_id: return init_audit_db() conn = get_conn() try: applied_at = "datetime('now', 'localtime')" if status == "applied" else "applied_at" rolled_back_at = "datetime('now', 'localtime')" if status == "rolled_back" else "rolled_back_at" conn.execute(f""" UPDATE script_change_log SET status = ?, error_message = ?, applied_at = {applied_at}, rolled_back_at = {rolled_back_at} WHERE id = ? """, (status, error_message, change_id)) conn.commit() finally: conn.close() def list_runs(limit=20): init_audit_db() conn = get_conn() try: rows = conn.execute(""" SELECT r.*, COUNT(c.id) AS change_count, SUM(CASE WHEN c.status = 'applied' THEN 1 ELSE 0 END) AS applied_count, SUM(CASE WHEN c.status = 'rolled_back' THEN 1 ELSE 0 END) AS rolled_back_count FROM script_runs r LEFT JOIN script_change_log c ON c.run_id = r.id GROUP BY r.id ORDER BY r.started_at DESC LIMIT ? """, (limit,)).fetchall() return [dict(row) for row in rows] finally: conn.close() def list_changes(run_id): init_audit_db() conn = get_conn() try: rows = conn.execute("SELECT * FROM script_change_log WHERE run_id = ? ORDER BY id DESC", (run_id,)).fetchall() return [dict(row) for row in rows] finally: conn.close() def get_run_summary(run_id, error_limit=10): init_audit_db() conn = get_conn() try: run = conn.execute("SELECT * FROM script_runs WHERE id = ?", (run_id,)).fetchone() status_rows = conn.execute(""" SELECT status, COUNT(*) AS count FROM script_change_log WHERE run_id = ? GROUP BY status ORDER BY status """, (run_id,)).fetchall() object_rows = conn.execute(""" SELECT object_type, COUNT(*) AS count FROM script_change_log WHERE run_id = ? GROUP BY object_type ORDER BY object_type """, (run_id,)).fetchall() field_rows = conn.execute(""" SELECT COALESCE(field_name, '(sin nombre)') AS field_name, COUNT(*) AS count FROM script_change_log WHERE run_id = ? GROUP BY COALESCE(field_name, '(sin nombre)') ORDER BY count DESC, field_name """, (run_id,)).fetchall() failed_rows = conn.execute(""" SELECT location_id, object_type, object_id, field_name, error_message FROM script_change_log WHERE run_id = ? AND status = 'failed' ORDER BY id DESC LIMIT ? """, (run_id, error_limit)).fetchall() by_status = {row["status"]: row["count"] for row in status_rows} return { "run": dict(run) if run else None, "total_changes": sum(by_status.values()), "by_status": by_status, "by_object_type": {row["object_type"]: row["count"] for row in object_rows}, "by_field_name": {row["field_name"]: row["count"] for row in field_rows}, "failed_changes": [dict(row) for row in failed_rows], } finally: conn.close() def decode_json(value): if value is None: return None return json.loads(value) def resolve_custom_field_id(token, location_id, object_type, field_name): if not field_name: raise ValueError("Rollback requires field_name to resolve the dynamic custom field ID") import sync_engine object_key = "contact" if object_type == "contact" else "opportunity" schema = sync_engine.ghl_client.get_object_schema(token, location_id, object_key) field_id = schema.get(field_name) if not field_id: raise ValueError(f"Field '{field_name}' not found in {object_key} schema for {location_id}") return field_id def ghl_put_custom_field(token, object_type, object_id, field_id, value): endpoint = "/contacts/" if object_type == "contact" else "/opportunities/" url = f"{BASE_URL}{endpoint}{object_id}" headers = { "Accept": "application/json", "Authorization": f"Bearer {token}", "Version": API_VERSION, "Content-Type": "application/json", } response = requests.put(url, headers=headers, json={"customFields": [{"id": field_id, "value": value}]}, timeout=45) response.raise_for_status() def rollback_run(run_id): import sync_engine init_audit_db() tokens = sync_engine.get_tokens_map() changes = list_changes(run_id) targets = [c for c in changes if c["status"] == "applied"] update_run_status(run_id, "rollback_running") rolled_back = 0 failed = 0 for change in targets: token = tokens.get(change["location_id"]) if not token: mark_change(change["id"], "failed", "No token for rollback location") failed += 1 continue try: old_value = decode_json(change["old_value_json"]) field_id = resolve_custom_field_id( token, change["location_id"], change["object_type"], change["field_name"], ) ghl_put_custom_field(token, change["object_type"], change["object_id"], field_id, old_value) mark_change(change["id"], "rolled_back") rolled_back += 1 time.sleep(0.6) except Exception as exc: mark_change(change["id"], "failed", str(exc)) failed += 1 final_status = "rolled_back" if failed == 0 else "failed" update_run_status(run_id, final_status, None if failed == 0 else f"Rollback failures: {failed}") return {"run_id": run_id, "rolled_back": rolled_back, "failed": failed} init_audit_db()