347 lines
12 KiB
Python
347 lines
12 KiB
Python
import json
|
|
import os
|
|
import sqlite3
|
|
import time
|
|
from datetime import datetime
|
|
|
|
import requests
|
|
|
|
from paths import DB_PATH
|
|
|
|
# Root URL prepended to the endpoint paths built in ghl_put_custom_field().
BASE_URL = "https://services.leadconnectorhq.com"
|
|
# Value sent in the "Version" request header by ghl_put_custom_field().
API_VERSION = "2021-07-28"
|
|
# Flipped to True by init_audit_db() after the schema has been committed, so
# later calls in the same process return without touching the database.
_AUDIT_DB_INITIALIZED = False
|
|
|
|
|
|
def get_conn():
    """Open a new SQLite connection to ``DB_PATH`` configured for this module.

    The connection waits up to 30 seconds on a locked database (through both
    the ``sqlite3.connect`` timeout and ``PRAGMA busy_timeout``), returns rows
    as ``sqlite3.Row`` objects, and has foreign-key enforcement switched on.

    Returns:
        An open ``sqlite3.Connection``. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH, timeout=30.0)
    connection.row_factory = sqlite3.Row
    for pragma in ("PRAGMA foreign_keys = ON", "PRAGMA busy_timeout = 30000"):
        connection.execute(pragma)
    return connection
|
|
|
|
|
|
def init_audit_db():
    """Create the audit tables and indexes if they are missing.

    Creates ``script_runs``, ``script_change_log`` (plus its two indexes) and
    ``script_run_control``. Every statement uses ``IF NOT EXISTS``, and the
    module-level ``_AUDIT_DB_INITIALIZED`` flag short-circuits repeated calls
    once one call has committed successfully.
    """
    global _AUDIT_DB_INITIALIZED
    if _AUDIT_DB_INITIALIZED:
        return

    # Executed in this order: the two child tables reference script_runs.
    schema_statements = (
        """
        CREATE TABLE IF NOT EXISTS script_runs (
            id TEXT PRIMARY KEY,
            script_name TEXT NOT NULL,
            arguments TEXT,
            locations_json TEXT,
            execution_mode TEXT NOT NULL DEFAULT 'sequential',
            status TEXT NOT NULL DEFAULT 'running',
            started_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
            finished_at TEXT,
            error_message TEXT
        )
        """,
        """
        CREATE TABLE IF NOT EXISTS script_change_log (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            run_id TEXT NOT NULL,
            location_id TEXT NOT NULL,
            object_type TEXT NOT NULL,
            object_id TEXT NOT NULL,
            field_id TEXT NOT NULL,
            field_name TEXT,
            old_value_json TEXT,
            new_value_json TEXT,
            status TEXT NOT NULL DEFAULT 'planned',
            planned_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
            applied_at TEXT,
            rolled_back_at TEXT,
            error_message TEXT,
            FOREIGN KEY(run_id) REFERENCES script_runs(id) ON DELETE CASCADE
        )
        """,
        "CREATE INDEX IF NOT EXISTS idx_change_log_run ON script_change_log(run_id)",
        "CREATE INDEX IF NOT EXISTS idx_change_log_object ON script_change_log(object_type, object_id)",
        """
        CREATE TABLE IF NOT EXISTS script_run_control (
            run_id TEXT PRIMARY KEY,
            pause_requested INTEGER NOT NULL DEFAULT 0,
            stop_requested INTEGER NOT NULL DEFAULT 0,
            updated_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
            FOREIGN KEY(run_id) REFERENCES script_runs(id) ON DELETE CASCADE
        )
        """,
    )

    db = get_conn()
    try:
        # Journal/sync tuning is best effort: a DatabaseError here is ignored
        # and schema creation proceeds with whatever mode the database has.
        try:
            db.execute("PRAGMA journal_mode=WAL")
            db.execute("PRAGMA synchronous=NORMAL")
        except sqlite3.DatabaseError:
            pass
        for statement in schema_statements:
            db.execute(statement)
        db.commit()
        # Only mark as initialized after the commit succeeded.
        _AUDIT_DB_INITIALIZED = True
    finally:
        db.close()
|
|
|
|
|
|
def create_run(run_id, script_name, arguments="", locations=None, execution_mode="sequential"):
    """Register a script run and its control row.

    Both inserts use ``INSERT OR IGNORE``, so calling this again with an
    existing ``run_id`` leaves the stored rows untouched.

    Args:
        run_id: Primary key of the run.
        script_name: Name stored in ``script_runs.script_name``.
        arguments: Argument string; falsy values are stored as ``""``.
        locations: Stored JSON-encoded in ``locations_json``; falsy values
            are stored as ``[]``.
        execution_mode: Stored in ``script_runs.execution_mode``.
    """
    init_audit_db()
    db = get_conn()
    try:
        run_row = (
            run_id,
            script_name,
            arguments or "",
            json.dumps(locations or []),
            execution_mode,
        )
        db.execute(
            """
            INSERT OR IGNORE INTO script_runs (id, script_name, arguments, locations_json, execution_mode, status)
            VALUES (?, ?, ?, ?, ?, 'running')
            """,
            run_row,
        )
        # The control row starts with neither pause nor stop requested.
        db.execute(
            """
            INSERT OR IGNORE INTO script_run_control (run_id, pause_requested, stop_requested)
            VALUES (?, 0, 0)
            """,
            (run_id,),
        )
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
def update_run_status(run_id, status, error_message=None):
    """Set the status of a run, optionally recording an error message.

    ``finished_at`` is stamped with the current local time when ``status`` is
    one of ``success``, ``failed``, ``stopped`` or ``rolled_back``; for any
    other status the stored ``finished_at`` is kept. A ``None``
    ``error_message`` likewise keeps the stored message (``COALESCE``).

    Args:
        run_id: Run to update.
        status: New status string.
        error_message: Message to store, or ``None`` to keep the current one.
    """
    init_audit_db()
    db = get_conn()
    try:
        finished_at = None
        if status in {"success", "failed", "stopped", "rolled_back"}:
            finished_at = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        db.execute(
            """
            UPDATE script_runs
            SET status = ?, error_message = COALESCE(?, error_message), finished_at = COALESCE(?, finished_at)
            WHERE id = ?
            """,
            (status, error_message, finished_at, run_id),
        )
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
def set_control(run_id, *, pause=None, stop=None):
    """Update the pause/stop request flags of a run.

    A flag passed as ``None`` keeps its stored value when a control row
    already exists; otherwise the flag is stored as ``int(bool(value))``
    (so ``None`` without an existing row becomes ``0``). The row is created
    if missing and ``updated_at`` is refreshed either way.

    Args:
        run_id: Run whose control row is written.
        pause: New pause flag, or ``None`` to leave it unchanged.
        stop: New stop flag, or ``None`` to leave it unchanged.
    """
    init_audit_db()
    db = get_conn()
    try:
        existing = db.execute(
            "SELECT * FROM script_run_control WHERE run_id = ?", (run_id,)
        ).fetchone()

        def _resolve(column, requested):
            # Keep the stored flag only when there is a row and no new value.
            if existing and requested is None:
                return existing[column]
            return int(bool(requested))

        pause_flag = _resolve("pause_requested", pause)
        stop_flag = _resolve("stop_requested", stop)
        db.execute(
            """
            INSERT INTO script_run_control (run_id, pause_requested, stop_requested, updated_at)
            VALUES (?, ?, ?, datetime('now', 'localtime'))
            ON CONFLICT(run_id) DO UPDATE SET
                pause_requested = excluded.pause_requested,
                stop_requested = excluded.stop_requested,
                updated_at = excluded.updated_at
            """,
            (run_id, pause_flag, stop_flag),
        )
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
def get_control(run_id):
    """Return the control row of a run as a dict.

    Args:
        run_id: Run to look up.

    Returns:
        The ``script_run_control`` row as a dict, or
        ``{"pause_requested": 0, "stop_requested": 0}`` when no row exists.
    """
    init_audit_db()
    db = get_conn()
    try:
        cursor = db.execute(
            "SELECT * FROM script_run_control WHERE run_id = ?", (run_id,)
        )
        record = cursor.fetchone()
        if record:
            return dict(record)
        return {"pause_requested": 0, "stop_requested": 0}
    finally:
        db.close()
|
|
|
|
|
|
def wait_if_paused_or_stopped(run_id):
    """Block while the run is paused and report whether it may continue.

    Polls the run's control row once per second. While a pause is requested
    the run is marked ``paused`` (written once, not on every poll). When the
    pause is lifted the status is restored to ``running`` before returning,
    so a resumed run is no longer left showing ``paused``. A stop request
    marks the run ``stopped``.

    Args:
        run_id: Run to check. A falsy value disables the check entirely.

    Returns:
        ``True`` if the caller should continue working, ``False`` if a stop
        was requested.
    """
    if not run_id:
        return True

    marked_paused = False
    while True:
        control = get_control(run_id)
        if control.get("stop_requested"):
            update_run_status(run_id, "stopped")
            return False
        if not control.get("pause_requested"):
            if marked_paused:
                # We moved the run to "paused" ourselves; put it back to the
                # status create_run() gives an active run.
                update_run_status(run_id, "running")
            return True
        if not marked_paused:
            update_run_status(run_id, "paused")
            marked_paused = True
        time.sleep(1)
|
|
|
|
|
|
def json_value(value):
    """Serialize *value* to a JSON string, leaving non-ASCII characters unescaped."""
    encoded = json.dumps(value, ensure_ascii=False)
    return encoded
|
|
|
|
|
|
def record_change(run_id, location_id, object_type, object_id, field_id, field_name, old_value, new_value):
    """Insert a ``planned`` entry into the change log.

    Args:
        run_id: Owning run. A falsy value disables logging.
        location_id: Location the changed object belongs to.
        object_type: Type label stored with the change.
        object_id: Identifier of the changed object.
        field_id: Identifier of the changed field.
        field_name: Name of the changed field.
        old_value: Previous value; stored JSON-encoded.
        new_value: New value; stored JSON-encoded.

    Returns:
        The row id of the inserted change, or ``None`` when ``run_id`` is falsy.
    """
    if not run_id:
        return None
    init_audit_db()
    db = get_conn()
    try:
        change_row = (
            run_id,
            location_id,
            object_type,
            object_id,
            field_id,
            field_name,
            json_value(old_value),
            json_value(new_value),
        )
        cursor = db.execute(
            """
            INSERT INTO script_change_log
            (run_id, location_id, object_type, object_id, field_id, field_name, old_value_json, new_value_json, status)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, 'planned')
            """,
            change_row,
        )
        db.commit()
        return cursor.lastrowid
    finally:
        db.close()
|
|
|
|
|
|
def mark_change(change_id, status, error_message=None):
    """Set the status of a change-log entry.

    ``applied_at`` is stamped when ``status`` is ``applied`` and
    ``rolled_back_at`` when it is ``rolled_back``; otherwise each timestamp
    keeps its stored value. ``error_message`` is always overwritten, so
    passing ``None`` clears a previously stored message.

    Args:
        change_id: Row id of the change. A falsy value makes this a no-op.
        status: New status string.
        error_message: Message to store (``None`` clears it).
    """
    if not change_id:
        return
    init_audit_db()
    db = get_conn()
    try:
        # Only fixed SQL fragments chosen here are interpolated; every
        # caller-supplied value is still bound through placeholders.
        if status == "applied":
            applied_at = "datetime('now', 'localtime')"
        else:
            applied_at = "applied_at"
        if status == "rolled_back":
            rolled_back_at = "datetime('now', 'localtime')"
        else:
            rolled_back_at = "rolled_back_at"
        db.execute(
            f"""
            UPDATE script_change_log
            SET status = ?, error_message = ?, applied_at = {applied_at}, rolled_back_at = {rolled_back_at}
            WHERE id = ?
            """,
            (status, error_message, change_id),
        )
        db.commit()
    finally:
        db.close()
|
|
|
|
|
|
def list_runs(limit=20):
    """Return the most recently started runs with per-run change counters.

    Args:
        limit: Maximum number of runs to return.

    Returns:
        A list of dicts, newest ``started_at`` first. Each dict holds the
        ``script_runs`` columns plus ``change_count``, ``applied_count`` and
        ``rolled_back_count``.
    """
    init_audit_db()
    db = get_conn()
    try:
        cursor = db.execute(
            """
            SELECT r.*,
                   COUNT(c.id) AS change_count,
                   SUM(CASE WHEN c.status = 'applied' THEN 1 ELSE 0 END) AS applied_count,
                   SUM(CASE WHEN c.status = 'rolled_back' THEN 1 ELSE 0 END) AS rolled_back_count
            FROM script_runs r
            LEFT JOIN script_change_log c ON c.run_id = r.id
            GROUP BY r.id
            ORDER BY r.started_at DESC
            LIMIT ?
            """,
            (limit,),
        )
        return list(map(dict, cursor.fetchall()))
    finally:
        db.close()
|
|
|
|
|
|
def list_changes(run_id):
    """Return every change-log entry of a run, highest id first.

    Args:
        run_id: Run whose changes are listed.

    Returns:
        A list of dicts, one per ``script_change_log`` row.
    """
    init_audit_db()
    db = get_conn()
    try:
        cursor = db.execute(
            "SELECT * FROM script_change_log WHERE run_id = ? ORDER BY id DESC",
            (run_id,),
        )
        return list(map(dict, cursor.fetchall()))
    finally:
        db.close()
|
|
|
|
|
|
def get_run_summary(run_id, error_limit=10):
    """Build an aggregate summary of a run and its logged changes.

    Args:
        run_id: Run to summarize.
        error_limit: Maximum number of failed changes to include.

    Returns:
        A dict with these keys:

        * ``run``: the ``script_runs`` row as a dict, or ``None`` if missing.
        * ``total_changes``: sum of the per-status counts.
        * ``by_status``: change count per status.
        * ``by_object_type``: change count per object type.
        * ``by_field_name``: change count per field name; rows without a
          name are grouped under ``'(sin nombre)'``.
        * ``failed_changes``: up to ``error_limit`` failed changes, highest
          id first.
    """
    init_audit_db()
    db = get_conn()
    try:
        def _fetch(sql, params):
            return db.execute(sql, params).fetchall()

        run_row = db.execute(
            "SELECT * FROM script_runs WHERE id = ?", (run_id,)
        ).fetchone()
        status_counts = _fetch(
            """
            SELECT status, COUNT(*) AS count
            FROM script_change_log
            WHERE run_id = ?
            GROUP BY status
            ORDER BY status
            """,
            (run_id,),
        )
        object_counts = _fetch(
            """
            SELECT object_type, COUNT(*) AS count
            FROM script_change_log
            WHERE run_id = ?
            GROUP BY object_type
            ORDER BY object_type
            """,
            (run_id,),
        )
        field_counts = _fetch(
            """
            SELECT COALESCE(field_name, '(sin nombre)') AS field_name, COUNT(*) AS count
            FROM script_change_log
            WHERE run_id = ?
            GROUP BY COALESCE(field_name, '(sin nombre)')
            ORDER BY count DESC, field_name
            """,
            (run_id,),
        )
        failures = _fetch(
            """
            SELECT location_id, object_type, object_id, field_name, error_message
            FROM script_change_log
            WHERE run_id = ? AND status = 'failed'
            ORDER BY id DESC
            LIMIT ?
            """,
            (run_id, error_limit),
        )

        by_status = {}
        for entry in status_counts:
            by_status[entry["status"]] = entry["count"]

        summary = {
            "run": dict(run_row) if run_row else None,
            "total_changes": sum(by_status.values()),
            "by_status": by_status,
            "by_object_type": {entry["object_type"]: entry["count"] for entry in object_counts},
            "by_field_name": {entry["field_name"]: entry["count"] for entry in field_counts},
            "failed_changes": [dict(entry) for entry in failures],
        }
        return summary
    finally:
        db.close()
|
|
|
|
|
|
def decode_json(value):
    """Parse a JSON string, passing ``None`` through unchanged.

    Args:
        value: JSON text, or ``None``.

    Returns:
        The decoded Python object, or ``None`` when ``value`` is ``None``.
    """
    return None if value is None else json.loads(value)
|
|
|
|
|
|
def resolve_custom_field_id(token, location_id, object_type, field_name):
    """Look up the custom field ID for ``field_name`` in a location's schema.

    Args:
        token: Token passed through to the schema lookup.
        location_id: Location whose schema is queried.
        object_type: ``"contact"`` selects the contact schema; any other
            value selects the opportunity schema.
        field_name: Name of the field to resolve.

    Returns:
        The field ID the schema maps ``field_name`` to.

    Raises:
        ValueError: If ``field_name`` is empty, or the schema has no
            (truthy) entry for it.
    """
    if not field_name:
        raise ValueError("Rollback requires field_name to resolve the dynamic custom field ID")
    import sync_engine

    if object_type == "contact":
        object_key = "contact"
    else:
        object_key = "opportunity"
    schema = sync_engine.ghl_client.get_object_schema(token, location_id, object_key)
    resolved_id = schema.get(field_name)
    if resolved_id:
        return resolved_id
    raise ValueError(f"Field '{field_name}' not found in {object_key} schema for {location_id}")
|
|
|
|
|
|
def ghl_put_custom_field(token, object_type, object_id, field_id, value):
    """PUT a single custom field value onto a contact or opportunity.

    Args:
        token: Bearer token for the ``Authorization`` header.
        object_type: ``"contact"`` targets ``/contacts/``; any other value
            targets ``/opportunities/``.
        object_id: Identifier appended to the endpoint path.
        field_id: Custom field ID to write.
        value: Value to store in the field.

    Raises:
        requests.HTTPError: If the response status indicates an error
            (via ``raise_for_status``).
    """
    resource = "/contacts/" if object_type == "contact" else "/opportunities/"
    request_headers = {
        "Accept": "application/json",
        "Authorization": f"Bearer {token}",
        "Version": API_VERSION,
        "Content-Type": "application/json",
    }
    payload = {"customFields": [{"id": field_id, "value": value}]}
    response = requests.put(
        f"{BASE_URL}{resource}{object_id}",
        headers=request_headers,
        json=payload,
        timeout=45,
    )
    response.raise_for_status()
|
|
|
|
|
|
def rollback_run(run_id):
    """Revert every ``applied`` change of a run to its recorded old value.

    Changes are processed in the order ``list_changes`` returns them (highest
    id first). For each one the stored ``old_value_json`` is decoded, the
    current field ID is resolved from ``field_name`` and the old value is
    written back. Resolved field IDs are remembered per
    ``(location_id, object_type, field_name)`` for the duration of the call,
    so changes that share a field do not repeat the schema lookup.

    The run is marked ``rollback_running`` while working, and finally
    ``rolled_back`` if nothing failed or ``failed`` (with a failure count in
    the error message) otherwise.

    Args:
        run_id: Run to roll back.

    Returns:
        ``{"run_id": run_id, "rolled_back": <int>, "failed": <int>}``.
    """
    import sync_engine

    init_audit_db()
    tokens = sync_engine.get_tokens_map()
    changes = list_changes(run_id)
    targets = [c for c in changes if c["status"] == "applied"]
    update_run_status(run_id, "rollback_running")
    rolled_back = 0
    failed = 0
    # (location_id, object_type, field_name) -> resolved field ID.
    # NOTE(review): the schema lookup is presumably a remote call — confirm.
    field_id_cache = {}

    for change in targets:
        location_id = change["location_id"]
        token = tokens.get(location_id)
        if not token:
            mark_change(change["id"], "failed", "No token for rollback location")
            failed += 1
            continue
        try:
            old_value = decode_json(change["old_value_json"])
            cache_key = (location_id, change["object_type"], change["field_name"])
            field_id = field_id_cache.get(cache_key)
            if field_id is None:
                # Failed resolutions raise and are therefore never cached.
                field_id = resolve_custom_field_id(
                    token,
                    location_id,
                    change["object_type"],
                    change["field_name"],
                )
                field_id_cache[cache_key] = field_id
            ghl_put_custom_field(token, change["object_type"], change["object_id"], field_id, old_value)
            mark_change(change["id"], "rolled_back")
            rolled_back += 1
            # Pause between successful writes to space out API requests.
            time.sleep(0.6)
        except Exception as exc:
            # Record the failure on the change and keep going with the rest.
            # NOTE(review): this replaces status "applied" with "failed", so a
            # later rollback_run() will not retry this change.
            mark_change(change["id"], "failed", str(exc))
            failed += 1

    final_status = "rolled_back" if failed == 0 else "failed"
    update_run_status(run_id, final_status, None if failed == 0 else f"Rollback failures: {failed}")
    return {"run_id": run_id, "rolled_back": rolled_back, "failed": failed}
|
|
|
|
|
|
# Import-time side effect: make sure the audit schema exists as soon as this
# module is loaded.
init_audit_db()
|