Files
2026-05-30 14:31:19 -06:00

1049 lines
36 KiB
Python

import sqlite3
import os
import json
from datetime import datetime
from paths import DB_PATH
def get_db_connection():
    """Open a new SQLite connection to the application database.

    The connection is opened on ``DB_PATH`` with a 30 second lock timeout,
    returns ``sqlite3.Row`` rows (addressable by column name) and has
    foreign-key enforcement switched on.

    Returns:
        sqlite3.Connection: an open connection; the caller must close it.
    """
    connection = sqlite3.connect(DB_PATH, timeout=30.0)
    # Enable foreign-key enforcement for this connection.
    connection.execute("PRAGMA foreign_keys = ON")
    connection.row_factory = sqlite3.Row
    return connection
def init_db() -> None:
    """Create every table and index used by this module, if missing.

    Runs on a fresh connection. Tables and indexes are created with
    ``IF NOT EXISTS`` and later-added columns go through ``ensure_column``,
    so the function can be run repeatedly against an existing database.
    Statement order matters: each ``ensure_column`` call must run after the
    ``CREATE TABLE`` of the table it alters.

    Raises:
        Exception: any error is re-raised after rolling back the connection.
    """
    conn = get_db_connection()
    try:
        # accounts table
        conn.execute("""
            CREATE TABLE IF NOT EXISTS accounts (
                location_id TEXT PRIMARY KEY,
                nombre TEXT NOT NULL,
                type TEXT NOT NULL, -- 'brand' o 'branch'
                whatsapp_widget TEXT,
                company_owner TEXT NOT NULL DEFAULT 'Monte Providencia'
            )
        """)
        # contacts table
        conn.execute("""
            CREATE TABLE IF NOT EXISTS contacts (
                id TEXT NOT NULL,
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                first_name TEXT,
                last_name TEXT,
                email TEXT,
                phone TEXT,
                tags TEXT, -- JSON array de tags
                custom_fields_json TEXT, -- JSON array de custom fields
                date_added TEXT,
                updated_at TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (id, location_id)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_contacts_location ON contacts(location_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_contacts_email ON contacts(email)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_contacts_phone ON contacts(phone)")
        # pipelines table
        conn.execute("""
            CREATE TABLE IF NOT EXISTS pipelines (
                id TEXT NOT NULL,
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                name TEXT NOT NULL,
                stages_json TEXT NOT NULL, -- JSON array con stages [{id, name, order}]
                date_added TEXT,
                date_updated TEXT,
                raw_json TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (id, location_id)
            )
        """)
        # Additive migration for databases whose pipelines table predates
        # these columns.
        ensure_column(conn, "pipelines", "date_added", "TEXT")
        ensure_column(conn, "pipelines", "date_updated", "TEXT")
        ensure_column(conn, "pipelines", "raw_json", "TEXT")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_pipelines_location ON pipelines(location_id)")
        # opportunities table
        conn.execute("""
            CREATE TABLE IF NOT EXISTS opportunities (
                id TEXT NOT NULL,
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                name TEXT,
                status TEXT, -- 'open', 'won', 'lost', 'abandoned'
                pipeline_id TEXT,
                pipeline_stage_id TEXT,
                monetary_value REAL,
                contact_id TEXT,
                date_added TEXT,
                custom_fields_json TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (id, location_id)
            )
        """)
        ensure_column(conn, "opportunities", "custom_fields_json", "TEXT")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_opps_location ON opportunities(location_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_opps_pipeline ON opportunities(pipeline_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_opps_contact ON opportunities(contact_id)")
        # object_schemas table: local cache of fields per object/location.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS object_schemas (
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                object_key TEXT NOT NULL,
                field_id TEXT NOT NULL,
                field_name TEXT NOT NULL,
                field_key TEXT,
                field_type TEXT,
                raw_json TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (location_id, object_key, field_id)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_object_schemas_location ON object_schemas(location_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_object_schemas_object ON object_schemas(object_key)")
        # sync_log table
        conn.execute("""
            CREATE TABLE IF NOT EXISTS sync_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                location_id TEXT REFERENCES accounts(location_id) ON DELETE CASCADE,
                status TEXT NOT NULL, -- 'running', 'success', 'failed'
                contacts_synced INTEGER DEFAULT 0,
                opps_synced INTEGER DEFAULT 0,
                error_message TEXT,
                started_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                finished_at TEXT
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_sync_log_location ON sync_log(location_id)")
        # error_log table: persistent technical diagnostics of runtime errors.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS error_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                error_id TEXT NOT NULL UNIQUE,
                event TEXT NOT NULL,
                exception_type TEXT,
                exception_message TEXT,
                context_json TEXT NOT NULL,
                created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_error_log_created ON error_log(created_at)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_error_log_event ON error_log(event)")
        # workflows table
        conn.execute("""
            CREATE TABLE IF NOT EXISTS workflows (
                id TEXT NOT NULL,
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                name TEXT NOT NULL,
                status TEXT NOT NULL, -- 'active', 'inactive', 'draft'
                trigger TEXT,
                created_at TEXT,
                updated_at TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (id, location_id)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_workflows_location ON workflows(location_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_workflows_status ON workflows(status)")
        ensure_column(conn, "workflows", "updated_at", "TEXT")
        # Additive migration: GHL's native 'source' field (web user, integration, etc.).
        # Persisted for the Sucursal-vs-Form audit and to tell contact origins apart.
        ensure_column(conn, "contacts", "source", "TEXT")
        # forms table (catalogue of forms per location)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS forms (
                id TEXT NOT NULL,
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                name TEXT NOT NULL,
                raw_json TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (id, location_id)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_forms_location ON forms(location_id)")
        # form_submissions table (snapshots of form submissions)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS form_submissions (
                id TEXT NOT NULL,
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                form_id TEXT NOT NULL,
                contact_id TEXT,
                name TEXT,
                email TEXT,
                phone TEXT,
                sucursal_value TEXT, -- valor literal del campo Sucursal extraído de others
                created_at TEXT,
                others_json TEXT, -- objeto others completo del submission
                raw_json TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (id, location_id)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_form_submissions_location ON form_submissions(location_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_form_submissions_form ON form_submissions(form_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_form_submissions_contact ON form_submissions(contact_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_form_submissions_created ON form_submissions(created_at)")
        conn.commit()
    except Exception as e:
        conn.rollback()
        raise e
    finally:
        conn.close()
def ensure_column(conn, table_name, column_name, column_definition):
    """Add ``column_name`` to ``table_name`` unless the table already has it.

    Additive migration helper: reads the table layout with
    ``PRAGMA table_info`` and issues ``ALTER TABLE ... ADD COLUMN`` only when
    no existing column carries that name. The identifiers are interpolated
    straight into the SQL text, so they must come from trusted code.

    Args:
        conn: open SQLite connection to run the statements on.
        table_name: table to inspect and, if needed, alter.
        column_name: name of the column that must exist.
        column_definition: SQL type/constraints used when adding the column.
    """
    table_info = conn.execute(f"PRAGMA table_info({table_name})").fetchall()
    for info_row in table_info:
        # Position 1 of every table_info row holds the column name.
        if info_row[1] == column_name:
            return
    conn.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_definition}")
# --- OPERACIONES DE ACCOUNTS ---
def save_accounts(accounts_list):
    """Insert or update accounts (branches and brand) in one transaction.

    Args:
        accounts_list: iterable of dicts. ``location_id``, ``nombre`` and
            ``type`` are required keys; ``whatsapp_widget`` is optional and
            ``company_owner`` falls back to ``'Monte Providencia'``.

    Raises:
        KeyError: when an entry lacks a required key.
        Exception: any database error, after the transaction is rolled back.
    """
    upsert_sql = """
        INSERT INTO accounts (location_id, nombre, type, whatsapp_widget, company_owner)
        VALUES (?, ?, ?, ?, ?)
        ON CONFLICT(location_id) DO UPDATE SET
            nombre = excluded.nombre,
            type = excluded.type,
            whatsapp_widget = excluded.whatsapp_widget,
            company_owner = excluded.company_owner
    """
    conn = get_db_connection()
    try:
        # The connection context manager commits on success and rolls back
        # when the body raises.
        with conn:
            db = conn.cursor()
            for account in accounts_list:
                owner = account.get('company_owner', 'Monte Providencia')
                widget = account.get('whatsapp_widget')
                db.execute(
                    upsert_sql,
                    (account['location_id'], account['nombre'], account['type'], widget, owner),
                )
    finally:
        conn.close()
def get_accounts():
    """Return every account as a dict, ordered by type (descending) then name."""
    conn = get_db_connection()
    try:
        cursor = conn.execute("SELECT * FROM accounts ORDER BY type DESC, nombre ASC")
        return list(map(dict, cursor.fetchall()))
    finally:
        conn.close()
def get_account(location_id):
    """Return the account with the given ``location_id`` as a dict, or None."""
    conn = get_db_connection()
    try:
        found = conn.execute(
            "SELECT * FROM accounts WHERE location_id = ?", (location_id,)
        ).fetchone()
        if not found:
            return None
        return dict(found)
    finally:
        conn.close()
# --- OPERACIONES DE SINCRONIZACIÓN ---
def save_contacts(location_id, contacts_list):
    """Replace the cached contacts of ``location_id`` with ``contacts_list``.

    Delete-then-insert inside a single transaction. Entries without an ``id``
    are dropped and only the first entry per id is kept; ``INSERT OR REPLACE``
    is an additional guard against integrity errors.

    Args:
        location_id: account the contacts belong to.
        contacts_list: iterable of contact dicts keyed ``id``, ``firstName``,
            ``lastName``, ``email``, ``phone``, ``tags``, ``customFields``,
            ``dateAdded``, ``updatedAt`` and ``source``.

    Raises:
        Exception: any error, after the transaction is rolled back.
    """
    def as_json_text(record, key):
        # Lists are serialised to JSON; any other value is stored untouched
        # and a missing key becomes the literal '[]'.
        value = record.get(key, '[]')
        return json.dumps(value) if isinstance(value, list) else value

    conn = get_db_connection()
    try:
        with conn:
            db = conn.cursor()
            db.execute("BEGIN")
            # Drop the previous snapshot for this location.
            db.execute("DELETE FROM contacts WHERE location_id = ?", (location_id,))
            # De-duplicate the batch in memory, first occurrence wins.
            first_by_id = {}
            for contact in contacts_list:
                contact_id = contact.get('id')
                if contact_id:
                    first_by_id.setdefault(contact_id, contact)
            for contact in first_by_id.values():
                db.execute("""
                    INSERT OR REPLACE INTO contacts (id, location_id, first_name, last_name, email, phone, tags, custom_fields_json, date_added, updated_at, source)
                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """, (
                    contact['id'],
                    location_id,
                    contact.get('firstName'),
                    contact.get('lastName'),
                    contact.get('email'),
                    contact.get('phone'),
                    as_json_text(contact, 'tags'),
                    as_json_text(contact, 'customFields'),
                    contact.get('dateAdded'),
                    contact.get('updatedAt'),
                    contact.get('source'),
                ))
    finally:
        conn.close()
def save_pipelines(location_id, pipelines_list):
    """Replace the cached pipelines of ``location_id`` with ``pipelines_list``.

    Delete-then-insert inside one transaction. Entries without an ``id`` are
    skipped and only the first entry per id is stored. The whole pipeline
    dict is also kept as JSON in ``raw_json``.

    Args:
        location_id: account the pipelines belong to.
        pipelines_list: iterable of pipeline dicts; ``id`` and ``name`` are
            read directly, ``stages`` and the date keys are optional.

    Raises:
        KeyError: when a kept pipeline has no ``name``.
        Exception: any other error, after the transaction is rolled back.
    """
    insert_sql = """
        INSERT OR REPLACE INTO pipelines (id, location_id, name, stages_json, date_added, date_updated, raw_json)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """
    conn = get_db_connection()
    try:
        with conn:
            db = conn.cursor()
            db.execute("BEGIN")
            db.execute("DELETE FROM pipelines WHERE location_id = ?", (location_id,))
            kept = {}
            for pipeline in pipelines_list:
                pipeline_id = pipeline.get('id')
                if pipeline_id:
                    kept.setdefault(pipeline_id, pipeline)
            for pipeline in kept.values():
                stages = pipeline.get('stages', '[]')
                if isinstance(stages, list):
                    stages = json.dumps(stages)
                # The API payload may use any of several spellings for dates.
                added = pipeline.get('dateAdded') or pipeline.get('createdAt') or pipeline.get('date_added')
                updated = pipeline.get('dateUpdated') or pipeline.get('updatedAt') or pipeline.get('date_updated')
                db.execute(insert_sql, (
                    pipeline['id'],
                    location_id,
                    pipeline['name'],
                    stages,
                    added,
                    updated,
                    json.dumps(pipeline, ensure_ascii=False, default=str),
                ))
    finally:
        conn.close()
def save_opportunities(location_id, opps_list):
    """Replace the cached opportunities of ``location_id`` with ``opps_list``.

    Delete-then-insert inside one transaction. Entries without an ``id`` are
    skipped and only the first entry per id is stored.

    Args:
        location_id: account the opportunities belong to.
        opps_list: iterable of opportunity dicts keyed ``id``, ``name``,
            ``status``, ``pipelineId``, ``pipelineStageId``, ``monetaryValue``,
            ``contactId``, ``dateAdded`` and ``customFields``.

    Raises:
        Exception: any error, after the transaction is rolled back.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("BEGIN")
        cursor.execute("DELETE FROM opportunities WHERE location_id = ?", (location_id,))
        seen = set()
        unique_opps = []
        for o in opps_list:
            oid = o.get('id')
            if oid and oid not in seen:
                seen.add(oid)
                unique_opps.append(o)
        for o in unique_opps:
            custom_fields = json.dumps(o.get('customFields', [])) if isinstance(o.get('customFields'), list) else o.get('customFields', '[]')
            # A payload may carry "status": None; `or` covers both the missing
            # key and the explicit null, which would otherwise crash .lower()
            # and roll back the whole refresh.
            status = (o.get('status') or 'open').lower()
            cursor.execute("""
                INSERT OR REPLACE INTO opportunities (id, location_id, name, status, pipeline_id, pipeline_stage_id, monetary_value, contact_id, date_added, custom_fields_json)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
            """, (
                o['id'],
                location_id,
                o.get('name'),
                status,
                o.get('pipelineId'),
                o.get('pipelineStageId'),
                o.get('monetaryValue', 0.0),
                o.get('contactId'),
                o.get('dateAdded'),
                custom_fields
            ))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
def save_single_opportunity(location_id, o):
    """Insert or replace a single opportunity row in SQLite.

    Args:
        location_id: account the opportunity belongs to.
        o: opportunity dict; ``id`` is required, the remaining keys
            (``name``, ``status``, ``pipelineId``, ``pipelineStageId``,
            ``monetaryValue``, ``contactId``, ``dateAdded``,
            ``customFields``) are optional.

    Raises:
        KeyError: when ``o`` has no ``id``.
        Exception: any other error, after the transaction is rolled back.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("BEGIN")
        custom_fields = json.dumps(o.get('customFields', [])) if isinstance(o.get('customFields'), list) else o.get('customFields', '[]')
        # `or` covers both a missing key and an explicit "status": None,
        # which would otherwise raise AttributeError on .lower().
        status = (o.get('status') or 'open').lower()
        cursor.execute("""
            INSERT OR REPLACE INTO opportunities (id, location_id, name, status, pipeline_id, pipeline_stage_id, monetary_value, contact_id, date_added, custom_fields_json)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            o['id'],
            location_id,
            o.get('name'),
            status,
            o.get('pipelineId'),
            o.get('pipelineStageId'),
            o.get('monetaryValue', 0.0),
            o.get('contactId'),
            o.get('dateAdded'),
            custom_fields
        ))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
def save_single_contact(location_id, c):
    """Insert or replace a single contact row in SQLite.

    Writes the same columns as ``save_contacts``, including ``source``.
    ``INSERT OR REPLACE`` rewrites the whole row, so any column left out of
    the statement would be reset to NULL on every save.

    Args:
        location_id: account the contact belongs to.
        c: contact dict; ``id`` is required, ``firstName``, ``lastName``,
            ``email``, ``phone``, ``tags``, ``customFields``, ``dateAdded``,
            ``updatedAt`` and ``source`` are optional.

    Raises:
        KeyError: when ``c`` has no ``id``.
        Exception: any other error, after the transaction is rolled back.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("BEGIN")
        tags = json.dumps(c.get('tags', [])) if isinstance(c.get('tags'), list) else c.get('tags', '[]')
        custom_fields = json.dumps(c.get('customFields', [])) if isinstance(c.get('customFields'), list) else c.get('customFields', '[]')
        cursor.execute("""
            INSERT OR REPLACE INTO contacts (id, location_id, first_name, last_name, email, phone, tags, custom_fields_json, date_added, updated_at, source)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            c['id'],
            location_id,
            c.get('firstName'),
            c.get('lastName'),
            c.get('email'),
            c.get('phone'),
            tags,
            custom_fields,
            c.get('dateAdded'),
            c.get('updatedAt'),
            c.get('source')
        ))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
def save_object_schema(location_id, object_key, fields):
    """Replace the cached field schema of one object in one location.

    Deletes the rows stored for ``(location_id, object_key)`` and inserts the
    given fields in the same transaction. A field is skipped when it has no
    id, no name, or repeats an id already stored in this call.

    Args:
        location_id: account the schema belongs to.
        object_key: object whose fields are being cached.
        fields: iterable of field dicts, or None/empty to just clear the cache.

    Raises:
        Exception: any error, after the transaction is rolled back.
    """
    insert_sql = """
        INSERT OR REPLACE INTO object_schemas
        (location_id, object_key, field_id, field_name, field_key, field_type, raw_json)
        VALUES (?, ?, ?, ?, ?, ?, ?)
    """
    conn = get_db_connection()
    try:
        with conn:
            db = conn.cursor()
            db.execute("BEGIN")
            db.execute(
                "DELETE FROM object_schemas WHERE location_id = ? AND object_key = ?",
                (location_id, object_key)
            )
            stored_ids = set()
            for entry in fields or []:
                # Payloads may use either naming convention for each attribute.
                entry_id = entry.get("id") or entry.get("fieldId")
                entry_name = entry.get("name") or entry.get("label")
                usable = entry_id and entry_name and entry_id not in stored_ids
                if usable:
                    stored_ids.add(entry_id)
                    db.execute(insert_sql, (
                        location_id,
                        object_key,
                        entry_id,
                        entry_name,
                        entry.get("key") or entry.get("fieldKey"),
                        entry.get("dataType") or entry.get("type") or entry.get("fieldType"),
                        json.dumps(entry, ensure_ascii=False, default=str),
                    ))
    finally:
        conn.close()
def get_object_schemas(location_id=None, object_key=None):
    """Return cached schema fields as dicts, optionally filtered.

    Args:
        location_id: when truthy, restrict to this location.
        object_key: when truthy, restrict to this object.

    Returns:
        list[dict]: rows ordered by location, object and field name.
    """
    fragments = ["SELECT * FROM object_schemas WHERE 1=1"]
    bindings = []
    for clause, value in (
        (" AND location_id = ?", location_id),
        (" AND object_key = ?", object_key),
    ):
        if value:
            fragments.append(clause)
            bindings.append(value)
    fragments.append(" ORDER BY location_id, object_key, field_name")
    conn = get_db_connection()
    try:
        found = conn.execute("".join(fragments), bindings).fetchall()
        return [dict(entry) for entry in found]
    finally:
        conn.close()
# --- LOGS DE SINCRONIZACIÓN ---
def create_sync_log(location_id):
    """Insert a sync_log row in 'running' state and return its id.

    Args:
        location_id: account the synchronisation run is for.

    Returns:
        int: primary key of the new sync_log row.
    """
    conn = get_db_connection()
    try:
        inserted = conn.execute("""
            INSERT INTO sync_log (location_id, status)
            VALUES (?, 'running')
        """, (location_id,))
        new_id = inserted.lastrowid
        conn.commit()
        return new_id
    finally:
        conn.close()
def update_sync_log(log_id, status, contacts_synced, opps_synced, error_message=None):
    """Record the outcome of a sync run and stamp its finish time.

    Args:
        log_id: id of the sync_log row to update.
        status: new status value for the row.
        contacts_synced: number of contacts synchronised.
        opps_synced: number of opportunities synchronised.
        error_message: optional failure description.
    """
    outcome = (status, contacts_synced, opps_synced, error_message, log_id)
    conn = get_db_connection()
    try:
        conn.execute("""
            UPDATE sync_log
            SET status = ?,
                contacts_synced = ?,
                opps_synced = ?,
                error_message = ?,
                finished_at = datetime('now', 'localtime')
            WHERE id = ?
        """, outcome)
        conn.commit()
    finally:
        conn.close()
def get_sync_logs(limit=20):
    """Return the most recent sync_log rows with their account name.

    Args:
        limit: maximum number of rows to return.

    Returns:
        list[dict]: rows ordered by start time, newest first; ``account_name``
        is None when the log has no matching account.
    """
    query = """
        SELECT s.*, a.nombre as account_name
        FROM sync_log s
        LEFT JOIN accounts a ON s.location_id = a.location_id
        ORDER BY s.started_at DESC
        LIMIT ?
    """
    conn = get_db_connection()
    try:
        return list(map(dict, conn.execute(query, (limit,)).fetchall()))
    finally:
        conn.close()
# --- LOGS DE ERRORES ---
def insert_error_log(error_id, event, exception_type=None, exception_message=None, context=None):
    """Persist one error record; a repeated ``error_id`` is silently ignored.

    Args:
        error_id: unique identifier of the error occurrence.
        event: name of the event that failed.
        exception_type: optional exception class name.
        exception_message: optional exception text.
        context: optional mapping stored as JSON; values that JSON cannot
            encode are converted with ``str``.
    """
    context_text = json.dumps(context or {}, ensure_ascii=False, default=str)
    record = (error_id, event, exception_type, exception_message, context_text)
    conn = get_db_connection()
    try:
        conn.execute("""
            INSERT OR IGNORE INTO error_log
            (error_id, event, exception_type, exception_message, context_json)
            VALUES (?, ?, ?, ?, ?)
        """, record)
        conn.commit()
    finally:
        conn.close()
def get_error_logs(limit=50):
    """Return the newest error_log rows with their context decoded.

    Args:
        limit: maximum number of rows to return.

    Returns:
        list[dict]: one dict per row, newest first, where the stored
        ``context_json`` column is replaced by a parsed ``context`` entry.
    """
    conn = get_db_connection()
    try:
        cursor = conn.execute("""
            SELECT *
            FROM error_log
            ORDER BY created_at DESC, id DESC
            LIMIT ?
        """, (limit,))
        entries = [dict(record) for record in cursor.fetchall()]
        for entry in entries:
            raw_context = entry.pop("context_json")
            entry["context"] = json.loads(raw_context or "{}")
        return entries
    finally:
        conn.close()
# --- CONSULTAS Y MÉTRICAS ---
def get_account_metrics(location_id):
    """Collect the dashboard metrics of one location from the local cache.

    Args:
        location_id: account to compute the metrics for.

    Returns:
        dict: contact count, contacts without any opportunity, opportunity
        count and monetary value (overall and per status), pipeline count,
        and the most recent sync_log row as a dict (or None).
    """
    conn = get_db_connection()
    try:
        # Contact count
        contact_count = conn.execute(
            "SELECT COUNT(*) FROM contacts WHERE location_id = ?", (location_id,)
        ).fetchone()[0]
        # Count of contacts with no associated opportunity in this same location.
        # Used to flag discrepancies in the branch master view.
        contacts_without_opp_count = conn.execute("""
            SELECT COUNT(*) FROM contacts c
            WHERE c.location_id = ?
            AND NOT EXISTS (
                SELECT 1 FROM opportunities o
                WHERE o.location_id = c.location_id AND o.contact_id = c.id
            )
        """, (location_id,)).fetchone()[0]
        # Opportunity count and monetary value per status
        opps_rows = conn.execute("""
            SELECT status, COUNT(*) as count, SUM(monetary_value) as total_val
            FROM opportunities
            WHERE location_id = ?
            GROUP BY status
        """, (location_id,)).fetchall()
        opps_by_status = {
            "open": {"count": 0, "value": 0.0},
            "won": {"count": 0, "value": 0.0},
            "lost": {"count": 0, "value": 0.0},
            "abandoned": {"count": 0, "value": 0.0}
        }
        total_opps = 0
        total_value = 0.0
        for row in opps_rows:
            # A NULL status is reported under 'open'.
            stat = row['status'].lower() if row['status'] else 'open'
            if stat in opps_by_status:
                opps_by_status[stat] = {
                    "count": row['count'],
                    "value": row['total_val'] or 0.0
                }
            # NOTE(review): indentation was lost in the reviewed copy; the
            # totals are assumed to accumulate for every status row, including
            # statuses outside the four known keys — confirm against the repo.
            total_opps += row['count']
            total_value += row['total_val'] or 0.0
        # Pipeline count
        pipeline_count = conn.execute(
            "SELECT COUNT(*) FROM pipelines WHERE location_id = ?", (location_id,)
        ).fetchone()[0]
        # Latest sync log
        last_sync = conn.execute("""
            SELECT * FROM sync_log
            WHERE location_id = ?
            ORDER BY started_at DESC
            LIMIT 1
        """, (location_id,)).fetchone()
        return {
            "contacts_count": contact_count,
            "contacts_without_opp_count": contacts_without_opp_count,
            "opps_count": total_opps,
            "opps_value": total_value,
            "opps_by_status": opps_by_status,
            "pipelines_count": pipeline_count,
            "last_sync": dict(last_sync) if last_sync else None
        }
    finally:
        conn.close()
def get_global_metrics():
    """Return totals across every location plus today's sync health.

    Returns:
        dict: total contacts, total opportunities, summed opportunity value
        (0.0 when there is none), number of 'branch' accounts, number of sync
        runs still marked 'running', and number of runs that failed today
        (local time).
    """
    conn = get_db_connection()

    def scalar(sql):
        # First column of the first row of a single-value query.
        return conn.execute(sql).fetchone()[0]

    try:
        totals = {
            "total_contacts": scalar("SELECT COUNT(*) FROM contacts"),
            "total_opportunities": scalar("SELECT COUNT(*) FROM opportunities"),
            "total_value": scalar("SELECT SUM(monetary_value) FROM opportunities") or 0.0,
            "branch_count": scalar("SELECT COUNT(*) FROM accounts WHERE type = 'branch'"),
        }
        # Active syncs / syncs that failed today
        totals["running_syncs"] = scalar(
            "SELECT COUNT(*) FROM sync_log WHERE status = 'running'"
        )
        totals["failed_syncs_today"] = scalar("""
            SELECT COUNT(*) FROM sync_log
            WHERE status = 'failed' AND date(started_at) = date('now', 'localtime')
        """)
        return totals
    finally:
        conn.close()
# --- CONSULTAS DE CONTACTOS Y OPORTUNIDADES ---
def get_contacts(location_id, search_query=None, limit=100, offset=0, without_opp=False):
    """Return a page of a location's contacts, newest first.

    Args:
        location_id: account whose contacts are listed.
        search_query: optional substring matched (SQL ``LIKE``) against first
            name, last name, email and phone.
        limit: page size.
        offset: number of rows to skip.
        without_opp: when true, keep only contacts that have no opportunity
            in this same location.

    Returns:
        list[dict]: contact rows with two extra keys, ``tags`` and
        ``custom_fields``, holding the decoded JSON columns (empty list when
        the column is empty).
    """
    fragments = ["SELECT * FROM contacts WHERE location_id = ?"]
    bindings = [location_id]
    if search_query:
        fragments.append(" AND (first_name LIKE ? OR last_name LIKE ? OR email LIKE ? OR phone LIKE ?)")
        bindings += [f"%{search_query}%"] * 4
    if without_opp:
        # "No opportunity" filter: contacts lacking any opp in this location.
        fragments.append(
            " AND NOT EXISTS ("
            " SELECT 1 FROM opportunities o"
            " WHERE o.location_id = contacts.location_id AND o.contact_id = contacts.id"
            " )"
        )
    fragments.append(" ORDER BY date_added DESC LIMIT ? OFFSET ?")
    bindings += [limit, offset]

    conn = get_db_connection()
    try:
        page = []
        for record in conn.execute("".join(fragments), bindings).fetchall():
            contact = dict(record)
            raw_tags = contact['tags']
            raw_fields = contact['custom_fields_json']
            contact['tags'] = json.loads(raw_tags) if raw_tags else []
            contact['custom_fields'] = json.loads(raw_fields) if raw_fields else []
            page.append(contact)
        return page
    finally:
        conn.close()
def get_contact_by_id(location_id, contact_id):
    """Return one cached contact with ``tags`` and ``custom_fields`` decoded.

    Returns:
        dict | None: the contact row plus the parsed ``tags`` and
        ``custom_fields`` lists, or None when no such contact is stored.
    """
    conn = get_db_connection()
    try:
        found = conn.execute(
            "SELECT * FROM contacts WHERE location_id = ? AND id = ?",
            (location_id, contact_id),
        ).fetchone()
        if found:
            contact = dict(found)
            raw_tags = contact['tags']
            raw_fields = contact['custom_fields_json']
            contact['tags'] = json.loads(raw_tags) if raw_tags else []
            contact['custom_fields'] = json.loads(raw_fields) if raw_fields else []
            return contact
        return None
    finally:
        conn.close()
def contact_has_opportunity(location_id, contact_id):
    """Tell whether the contact has at least one cached opportunity in the location."""
    probe_sql = "SELECT 1 FROM opportunities WHERE location_id = ? AND contact_id = ? LIMIT 1"
    conn = get_db_connection()
    try:
        hit = conn.execute(probe_sql, (location_id, contact_id)).fetchone()
        return True if hit else False
    finally:
        conn.close()
def get_pipelines(location_id):
    """Return the cached pipelines of a location with their stages decoded.

    Returns:
        list[dict]: pipeline rows, each with an extra ``stages`` key holding
        the parsed ``stages_json`` (empty list when the column is empty).
    """
    conn = get_db_connection()
    try:
        cursor = conn.execute("SELECT * FROM pipelines WHERE location_id = ?", (location_id,))
        result = [dict(record) for record in cursor.fetchall()]
        for pipeline in result:
            raw_stages = pipeline['stages_json']
            pipeline['stages'] = json.loads(raw_stages) if raw_stages else []
        return result
    finally:
        conn.close()
def get_opportunities(location_id, pipeline_id=None):
    """Return a location's opportunities joined with basic contact data.

    Args:
        location_id: account whose opportunities are listed.
        pipeline_id: when truthy, restrict to this pipeline.

    Returns:
        list[dict]: opportunity rows plus the contact's ``first_name``,
        ``last_name``, ``email`` and ``phone`` (None when no contact matches).
    """
    query = (
        "SELECT o.*, c.first_name, c.last_name, c.email, c.phone"
        " FROM opportunities o"
        " LEFT JOIN contacts c ON o.contact_id = c.id AND o.location_id = c.location_id"
        " WHERE o.location_id = ?"
    )
    bindings = [location_id]
    if pipeline_id:
        query = query + " AND o.pipeline_id = ?"
        bindings.append(pipeline_id)
    conn = get_db_connection()
    try:
        return list(map(dict, conn.execute(query, bindings).fetchall()))
    finally:
        conn.close()
def save_workflows(location_id, workflows_list):
    """Replace the cached workflows of ``location_id`` with ``workflows_list``.

    Delete-then-insert inside one transaction, following the same rules as
    the other bulk savers in this module: entries without an ``id`` are
    skipped and only the first entry per id is stored, so one bad or repeated
    item no longer aborts the whole refresh with an IntegrityError. A missing
    or null ``name`` is stored as ``''`` and a missing or null ``status`` as
    ``'draft'``, because both columns are NOT NULL.

    Args:
        location_id: account the workflows belong to.
        workflows_list: iterable of workflow dicts keyed ``id``, ``name``,
            ``status``, ``trigger``, ``createdAt`` and ``updatedAt``.

    Raises:
        Exception: any error, after the transaction is rolled back.
    """
    conn = get_db_connection()
    try:
        cursor = conn.cursor()
        cursor.execute("BEGIN")
        # Drop the previous snapshot for this location.
        cursor.execute("DELETE FROM workflows WHERE location_id = ?", (location_id,))
        seen = set()
        for wf in workflows_list:
            wf_id = wf.get('id')
            if not wf_id or wf_id in seen:
                continue
            seen.add(wf_id)
            cursor.execute("""
                INSERT OR REPLACE INTO workflows (id, location_id, name, status, trigger, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                wf_id,
                location_id,
                wf.get('name') or '',
                wf.get('status') or 'draft',
                wf.get('trigger'),
                wf.get('createdAt'),
                wf.get('updatedAt')
            ))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
def get_workflows(location_id=None):
    """Return cached workflows with their account name.

    Args:
        location_id: when truthy, only that location's workflows (ordered by
            workflow name); otherwise every workflow (ordered by account name,
            then workflow name).

    Returns:
        list[dict]: workflow rows plus ``account_name``. Workflows whose
        account row is missing are left out by the inner join.
    """
    if location_id:
        query = """
            SELECT w.*, a.nombre as account_name
            FROM workflows w
            JOIN accounts a ON w.location_id = a.location_id
            WHERE w.location_id = ?
            ORDER BY w.name ASC
        """
        bindings = (location_id,)
    else:
        query = """
            SELECT w.*, a.nombre as account_name
            FROM workflows w
            JOIN accounts a ON w.location_id = a.location_id
            ORDER BY a.nombre ASC, w.name ASC
        """
        bindings = ()
    conn = get_db_connection()
    try:
        return list(map(dict, conn.execute(query, bindings).fetchall()))
    finally:
        conn.close()
# --- OPERACIONES DE FORMS / FORM SUBMISSIONS ---
def save_forms(location_id, forms_list):
    """Replace the cached form catalogue of ``location_id``.

    Delete-then-insert inside one transaction. Entries without an ``id`` are
    skipped and only the first entry per id is stored, so a form repeated in
    the batch no longer raises an IntegrityError that rolls back the whole
    refresh.

    Args:
        location_id: account the forms belong to.
        forms_list: iterable of form dicts (``id``, ``name``, ...); the whole
            dict is also stored as JSON in ``raw_json``.

    Raises:
        Exception: any error, after the transaction is rolled back.
    """
    conn = get_db_connection()
    try:
        cur = conn.cursor()
        cur.execute("BEGIN")
        cur.execute("DELETE FROM forms WHERE location_id = ?", (location_id,))
        seen = set()
        for f in forms_list:
            fid = f.get("id")
            if not fid or fid in seen:
                continue
            seen.add(fid)
            cur.execute("""
                INSERT INTO forms (id, location_id, name, raw_json)
                VALUES (?, ?, ?, ?)
            """, (
                fid,
                location_id,
                # name is NOT NULL in the schema, so a missing name becomes ''.
                f.get("name") or "",
                json.dumps(f, ensure_ascii=False),
            ))
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()
def _extract_sucursal_from_others(others, field_ids):
    """Return the first non-blank value found in ``others`` for ``field_ids``.

    The field ids are tried in order. List values are joined with ``", "``
    (skipping None items); any other value is converted with ``str``. The
    result is stripped, and blank results make the search continue.

    Args:
        others: the submission's ``others`` mapping; anything that is not a
            dict yields None.
        field_ids: a single field id (str, kept for backward compatibility)
            or an iterable of field ids in priority order.

    Returns:
        str | None: the cleaned value, or None when nothing usable is found.
    """
    if not (others and field_ids):
        return None
    candidates = [field_ids] if isinstance(field_ids, str) else field_ids
    for field_id in candidates:
        if not (field_id and isinstance(others, dict)):
            continue
        found = others.get(field_id)
        if found is None:
            continue
        if isinstance(found, list):
            text = ", ".join(str(item) for item in found if item is not None)
        else:
            text = str(found)
        text = text.strip()
        if text:
            return text
    return None
def upsert_form_submissions(location_id, submissions, sucursal_field_id=None):
    """Insert or replace form submissions without deleting stored history.

    Nothing is deleted first: the GHL endpoint does not guarantee temporal
    order, so submissions have to accumulate across syncs.

    Args:
        location_id: account the submissions belong to.
        submissions: list of raw submission dicts from the GHL endpoint;
            entries without an ``id`` are skipped.
        sucursal_field_id: a field id (str) or a priority-ordered list of
            field ids used to extract the Sucursal value from
            ``submission['others']``. A list tolerates older form versions
            that used a different field id.

    Returns:
        int: number of submissions written (0 for an empty input).

    Raises:
        Exception: any error, after the transaction is rolled back.
    """
    if not submissions:
        return 0
    upsert_sql = """
        INSERT OR REPLACE INTO form_submissions
        (id, location_id, form_id, contact_id, name, email, phone,
        sucursal_value, created_at, others_json, raw_json, synced_at)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now', 'localtime'))
    """
    conn = get_db_connection()
    try:
        written = 0
        with conn:
            db = conn.cursor()
            db.execute("BEGIN")
            for submission in submissions:
                submission_id = submission.get("id")
                if not submission_id:
                    continue
                others = submission.get("others") or {}
                branch_value = _extract_sucursal_from_others(others, sucursal_field_id)
                # Prefer the phone nested in `others`, then the top-level one.
                nested_phone = others.get("phone") if isinstance(others, dict) else None
                phone = nested_phone or submission.get("phone")
                others_text = json.dumps(others, ensure_ascii=False) if others else None
                db.execute(upsert_sql, (
                    submission_id,
                    location_id,
                    submission.get("formId"),
                    submission.get("contactId"),
                    submission.get("name"),
                    submission.get("email"),
                    phone,
                    branch_value,
                    submission.get("createdAt"),
                    others_text,
                    json.dumps(submission, ensure_ascii=False),
                ))
                written += 1
        return written
    finally:
        conn.close()
def reextract_sucursal_values(location_id, sucursal_field_ids):
    """Recompute ``sucursal_value`` for the stored submissions of a location.

    Walks the saved form_submissions and rewrites ``sucursal_value`` from
    each row's ``others_json`` using the candidate field ids in priority
    order. Useful when a new historical field id is discovered: the data is
    re-processed locally without calling GHL again. Rows whose
    ``others_json`` cannot be parsed are left untouched.

    Args:
        location_id: account whose submissions are re-processed.
        sucursal_field_ids: field id or priority-ordered list of field ids.

    Returns:
        tuple[int, int]: ``(updated, total)`` — rows that ended up with a
        value, and rows stored for the location.

    Raises:
        Exception: any error, after the transaction is rolled back.
    """
    conn = get_db_connection()
    try:
        stored = conn.execute(
            "SELECT id, others_json FROM form_submissions WHERE location_id = ?",
            (location_id,)
        ).fetchall()
        populated = 0
        with conn:
            db = conn.cursor()
            db.execute("BEGIN")
            for submission in stored:
                try:
                    others = json.loads(submission["others_json"] or "{}")
                except Exception:
                    continue
                value = _extract_sucursal_from_others(others, sucursal_field_ids)
                db.execute(
                    "UPDATE form_submissions SET sucursal_value = ? WHERE id = ? AND location_id = ?",
                    (value, submission["id"], location_id),
                )
                populated += 1 if value else 0
        return populated, len(stored)
    finally:
        conn.close()
def get_forms(location_id=None):
    """Return cached forms as dicts.

    Args:
        location_id: when truthy, only that location's forms ordered by name;
            otherwise every form ordered by location and name.
    """
    if location_id:
        query = "SELECT * FROM forms WHERE location_id = ? ORDER BY name"
        bindings = (location_id,)
    else:
        query = "SELECT * FROM forms ORDER BY location_id, name"
        bindings = ()
    conn = get_db_connection()
    try:
        return list(map(dict, conn.execute(query, bindings).fetchall()))
    finally:
        conn.close()
def get_form_submissions(location_id=None, form_id=None, with_sucursal_only=False):
    """Return stored form submissions as dicts, newest first.

    Args:
        location_id: when truthy, restrict to this location.
        form_id: when truthy, restrict to this form.
        with_sucursal_only: when true, keep only rows whose
            ``sucursal_value`` is neither NULL nor blank.
    """
    fragments = ["SELECT * FROM form_submissions WHERE 1=1"]
    bindings = []
    for clause, value in (
        (" AND location_id = ?", location_id),
        (" AND form_id = ?", form_id),
    ):
        if value:
            fragments.append(clause)
            bindings.append(value)
    if with_sucursal_only:
        fragments.append(" AND sucursal_value IS NOT NULL AND TRIM(sucursal_value) != ''")
    fragments.append(" ORDER BY created_at DESC")
    conn = get_db_connection()
    try:
        found = conn.execute("".join(fragments), bindings).fetchall()
        return [dict(entry) for entry in found]
    finally:
        conn.close()
def count_form_submissions(location_id=None):
    """Return how many form submissions are stored.

    Args:
        location_id: when truthy, count only that location's submissions.
    """
    if location_id:
        query = "SELECT COUNT(*) AS n FROM form_submissions WHERE location_id = ?"
        bindings = (location_id,)
    else:
        query = "SELECT COUNT(*) AS n FROM form_submissions"
        bindings = ()
    conn = get_db_connection()
    try:
        result = conn.execute(query, bindings).fetchone()
        if not result:
            return 0
        return result["n"]
    finally:
        conn.close()
# Initialize the database on import: importing this module opens DB_PATH
# and creates any missing tables and indexes as a side effect.
init_db()