"""SQLite persistence layer: schema bootstrap, sync writers and read queries.

Every public function opens its own short-lived connection through
``get_db_connection`` and closes it before returning. Importing this module
runs ``init_db()`` (see the bottom of the file).
"""
import sqlite3
import os
import json
from contextlib import closing, contextmanager
from datetime import datetime

from paths import DB_PATH


def get_db_connection():
    """Open a new connection to the SQLite database at ``DB_PATH``.

    Rows are returned as ``sqlite3.Row`` and foreign-key enforcement is
    switched on for the connection. The caller owns the connection and is
    responsible for closing it.
    """
    conn = sqlite3.connect(DB_PATH, timeout=30.0)
    conn.row_factory = sqlite3.Row
    # Foreign keys are enabled explicitly on every connection.
    conn.execute("PRAGMA foreign_keys = ON")
    return conn


@contextmanager
def _transaction():
    """Yield a connection; commit on success, roll back on error, always close."""
    conn = get_db_connection()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        conn.close()


def _json_list_text(record, key):
    """Return ``record[key]`` as stored text: lists are JSON-encoded.

    A missing key yields ``'[]'``; any other non-list value (including an
    explicit ``None``) is passed through unchanged.
    """
    value = record.get(key, '[]')
    return json.dumps(value) if isinstance(value, list) else value


def _dedupe_by_id(items):
    """Return the items that have a truthy ``'id'``, keeping the first of each id."""
    seen = set()
    unique = []
    for item in items:
        item_id = item.get('id')
        if item_id and item_id not in seen:
            seen.add(item_id)
            unique.append(item)
    return unique


def init_db():
    """Create all tables and indexes if missing and apply additive migrations.

    Safe to call repeatedly: every statement is ``IF NOT EXISTS`` or guarded
    by ``ensure_column``. Any exception is re-raised after a rollback.
    """
    with _transaction() as conn:
        # Table: accounts
        conn.execute("""
            CREATE TABLE IF NOT EXISTS accounts (
                location_id TEXT PRIMARY KEY,
                nombre TEXT NOT NULL,
                type TEXT NOT NULL, -- 'brand' o 'branch'
                whatsapp_widget TEXT,
                company_owner TEXT NOT NULL DEFAULT 'Monte Providencia'
            )
        """)

        # Table: contacts
        conn.execute("""
            CREATE TABLE IF NOT EXISTS contacts (
                id TEXT NOT NULL,
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                first_name TEXT,
                last_name TEXT,
                email TEXT,
                phone TEXT,
                tags TEXT, -- JSON array de tags
                custom_fields_json TEXT, -- JSON array de custom fields
                date_added TEXT,
                updated_at TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (id, location_id)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_contacts_location ON contacts(location_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_contacts_email ON contacts(email)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_contacts_phone ON contacts(phone)")

        # Table: pipelines
        conn.execute("""
            CREATE TABLE IF NOT EXISTS pipelines (
                id TEXT NOT NULL,
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                name TEXT NOT NULL,
                stages_json TEXT NOT NULL, -- JSON array con stages [{id, name, order}]
                date_added TEXT,
                date_updated TEXT,
                raw_json TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (id, location_id)
            )
        """)
        ensure_column(conn, "pipelines", "date_added", "TEXT")
        ensure_column(conn, "pipelines", "date_updated", "TEXT")
        ensure_column(conn, "pipelines", "raw_json", "TEXT")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_pipelines_location ON pipelines(location_id)")

        # Table: opportunities
        conn.execute("""
            CREATE TABLE IF NOT EXISTS opportunities (
                id TEXT NOT NULL,
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                name TEXT,
                status TEXT, -- 'open', 'won', 'lost', 'abandoned'
                pipeline_id TEXT,
                pipeline_stage_id TEXT,
                monetary_value REAL,
                contact_id TEXT,
                date_added TEXT,
                custom_fields_json TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (id, location_id)
            )
        """)
        ensure_column(conn, "opportunities", "custom_fields_json", "TEXT")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_opps_location ON opportunities(location_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_opps_pipeline ON opportunities(pipeline_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_opps_contact ON opportunities(contact_id)")

        # Table: object_schemas -- local cache of fields per object/location.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS object_schemas (
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                object_key TEXT NOT NULL,
                field_id TEXT NOT NULL,
                field_name TEXT NOT NULL,
                field_key TEXT,
                field_type TEXT,
                raw_json TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (location_id, object_key, field_id)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_object_schemas_location ON object_schemas(location_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_object_schemas_object ON object_schemas(object_key)")

        # Table: sync_log
        conn.execute("""
            CREATE TABLE IF NOT EXISTS sync_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                location_id TEXT REFERENCES accounts(location_id) ON DELETE CASCADE,
                status TEXT NOT NULL, -- 'running', 'success', 'failed'
                contacts_synced INTEGER DEFAULT 0,
                opps_synced INTEGER DEFAULT 0,
                error_message TEXT,
                started_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                finished_at TEXT
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_sync_log_location ON sync_log(location_id)")

        # Table: error_log -- persistent technical diagnostics of runtime errors.
        conn.execute("""
            CREATE TABLE IF NOT EXISTS error_log (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                error_id TEXT NOT NULL UNIQUE,
                event TEXT NOT NULL,
                exception_type TEXT,
                exception_message TEXT,
                context_json TEXT NOT NULL,
                created_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime'))
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_error_log_created ON error_log(created_at)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_error_log_event ON error_log(event)")

        # Table: workflows
        conn.execute("""
            CREATE TABLE IF NOT EXISTS workflows (
                id TEXT NOT NULL,
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                name TEXT NOT NULL,
                status TEXT NOT NULL, -- 'active', 'inactive', 'draft'
                trigger TEXT,
                created_at TEXT,
                updated_at TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (id, location_id)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_workflows_location ON workflows(location_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_workflows_status ON workflows(status)")
        ensure_column(conn, "workflows", "updated_at", "TEXT")

        # Additive migration: GHL's native 'source' field (web user, integration, etc.).
        # Persisted for the Sucursal-vs-Form audit and to tell contact origins apart.
        ensure_column(conn, "contacts", "source", "TEXT")

        # Table: forms (catalogue of forms per location)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS forms (
                id TEXT NOT NULL,
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                name TEXT NOT NULL,
                raw_json TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (id, location_id)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_forms_location ON forms(location_id)")

        # Table: form_submissions (snapshots of form submissions)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS form_submissions (
                id TEXT NOT NULL,
                location_id TEXT NOT NULL REFERENCES accounts(location_id) ON DELETE CASCADE,
                form_id TEXT NOT NULL,
                contact_id TEXT,
                name TEXT,
                email TEXT,
                phone TEXT,
                sucursal_value TEXT, -- valor literal del campo Sucursal extraído de others
                created_at TEXT,
                others_json TEXT, -- objeto others completo del submission
                raw_json TEXT,
                synced_at TEXT NOT NULL DEFAULT (datetime('now', 'localtime')),
                PRIMARY KEY (id, location_id)
            )
        """)
        conn.execute("CREATE INDEX IF NOT EXISTS idx_form_submissions_location ON form_submissions(location_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_form_submissions_form ON form_submissions(form_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_form_submissions_contact ON form_submissions(contact_id)")
        conn.execute("CREATE INDEX IF NOT EXISTS idx_form_submissions_created ON form_submissions(created_at)")


def ensure_column(conn, table_name, column_name, column_definition):
    """Add ``column_name`` to ``table_name`` unless the table already has it.

    NOTE: the identifiers are interpolated into the SQL text (SQLite cannot
    bind identifiers as parameters), so only pass trusted, hard-coded names.
    """
    columns = conn.execute(f"PRAGMA table_info({table_name})").fetchall()
    # Index 1 of each PRAGMA table_info row is the column name.
    if column_name not in {column[1] for column in columns}:
        conn.execute(f"ALTER TABLE {table_name} ADD COLUMN {column_name} {column_definition}")


# --- ACCOUNT OPERATIONS ---

def save_accounts(accounts_list):
    """Insert or update accounts (branches and brand), keyed by ``location_id``.

    accounts_list: list of dicts with keys ``location_id``, ``nombre``,
    ``type`` (required) and ``whatsapp_widget``, ``company_owner`` (optional).
    The whole list is written in one transaction.
    """
    with _transaction() as conn:
        cursor = conn.cursor()
        for acc in accounts_list:
            cursor.execute("""
                INSERT INTO accounts (location_id, nombre, type, whatsapp_widget, company_owner)
                VALUES (?, ?, ?, ?, ?)
                ON CONFLICT(location_id) DO UPDATE SET
                    nombre = excluded.nombre,
                    type = excluded.type,
                    whatsapp_widget = excluded.whatsapp_widget,
                    company_owner = excluded.company_owner
            """, (
                acc['location_id'],
                acc['nombre'],
                acc['type'],
                acc.get('whatsapp_widget'),
                acc.get('company_owner', 'Monte Providencia'),
            ))


def get_accounts():
    """Return every account as a dict, ordered by type (descending) then name."""
    with closing(get_db_connection()) as conn:
        rows = conn.execute("SELECT * FROM accounts ORDER BY type DESC, nombre ASC").fetchall()
        return [dict(row) for row in rows]


def get_account(location_id):
    """Return the account with this ``location_id`` as a dict, or ``None``."""
    with closing(get_db_connection()) as conn:
        row = conn.execute("SELECT * FROM accounts WHERE location_id = ?", (location_id,)).fetchone()
        return dict(row) if row else None


# --- SYNC OPERATIONS ---

def _insert_contact(cursor, location_id, c):
    """INSERT OR REPLACE one contact row built from the raw contact dict ``c``."""
    cursor.execute("""
        INSERT OR REPLACE INTO contacts
        (id, location_id, first_name, last_name, email, phone, tags, custom_fields_json, date_added, updated_at, source)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        c['id'],
        location_id,
        c.get('firstName'),
        c.get('lastName'),
        c.get('email'),
        c.get('phone'),
        _json_list_text(c, 'tags'),
        _json_list_text(c, 'customFields'),
        c.get('dateAdded'),
        c.get('updatedAt'),
        c.get('source'),
    ))


def _insert_opportunity(cursor, location_id, o):
    """INSERT OR REPLACE one opportunity row built from the raw dict ``o``."""
    cursor.execute("""
        INSERT OR REPLACE INTO opportunities
        (id, location_id, name, status, pipeline_id, pipeline_stage_id, monetary_value, contact_id, date_added, custom_fields_json)
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
    """, (
        o['id'],
        location_id,
        o.get('name'),
        # `or` also covers an explicit None status, which .get()'s default would not.
        (o.get('status') or 'open').lower(),
        o.get('pipelineId'),
        o.get('pipelineStageId'),
        o.get('monetaryValue', 0.0),
        o.get('contactId'),
        o.get('dateAdded'),
        _json_list_text(o, 'customFields'),
    ))


def save_contacts(location_id, contacts_list):
    """Replace all contacts of a location (DELETE + INSERT in one transaction).

    The batch is de-duplicated in memory by ``id`` (entries without an id are
    dropped) and written with INSERT OR REPLACE to avoid IntegrityErrors.
    """
    with _transaction() as conn:
        cursor = conn.cursor()
        cursor.execute("BEGIN")
        # Drop the previous snapshot for this location.
        cursor.execute("DELETE FROM contacts WHERE location_id = ?", (location_id,))
        for contact in _dedupe_by_id(contacts_list):
            _insert_contact(cursor, location_id, contact)


def save_pipelines(location_id, pipelines_list):
    """Replace all pipelines of a location in one transaction.

    Entries are de-duplicated by ``id``; each must carry a ``name`` key
    (``KeyError`` otherwise, which rolls the transaction back).
    """
    with _transaction() as conn:
        cursor = conn.cursor()
        cursor.execute("BEGIN")
        cursor.execute("DELETE FROM pipelines WHERE location_id = ?", (location_id,))
        for p in _dedupe_by_id(pipelines_list):
            cursor.execute("""
                INSERT OR REPLACE INTO pipelines
                (id, location_id, name, stages_json, date_added, date_updated, raw_json)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                p['id'],
                location_id,
                p['name'],
                _json_list_text(p, 'stages'),
                # Accept the alternative key spellings for the timestamps.
                p.get('dateAdded') or p.get('createdAt') or p.get('date_added'),
                p.get('dateUpdated') or p.get('updatedAt') or p.get('date_updated'),
                json.dumps(p, ensure_ascii=False, default=str),
            ))


def save_opportunities(location_id, opps_list):
    """Replace all opportunities of a location in one transaction.

    Entries are de-duplicated by ``id``; status is stored lower-cased and
    defaults to ``'open'`` when missing or empty.
    """
    with _transaction() as conn:
        cursor = conn.cursor()
        cursor.execute("BEGIN")
        cursor.execute("DELETE FROM opportunities WHERE location_id = ?", (location_id,))
        for opp in _dedupe_by_id(opps_list):
            _insert_opportunity(cursor, location_id, opp)


def save_single_opportunity(location_id, o):
    """Insert or replace a single opportunity (INSERT OR REPLACE on id + location)."""
    with _transaction() as conn:
        cursor = conn.cursor()
        cursor.execute("BEGIN")
        _insert_opportunity(cursor, location_id, o)


def save_single_contact(location_id, c):
    """Insert or replace a single contact (INSERT OR REPLACE on id + location).

    REPLACE rewrites the whole row, so ``source`` is written here too;
    leaving it out would reset an existing contact's ``source`` to NULL.
    """
    with _transaction() as conn:
        cursor = conn.cursor()
        cursor.execute("BEGIN")
        _insert_contact(cursor, location_id, c)


def save_object_schema(location_id, object_key, fields):
    """Replace the cached field list for one ``(location_id, object_key)``.

    Fields lacking an id (``id``/``fieldId``) or a name (``name``/``label``),
    and repeated ids, are skipped. ``fields`` may be ``None``.
    """
    with _transaction() as conn:
        cursor = conn.cursor()
        cursor.execute("BEGIN")
        cursor.execute(
            "DELETE FROM object_schemas WHERE location_id = ? AND object_key = ?",
            (location_id, object_key)
        )
        seen = set()
        for field in fields or []:
            field_id = field.get("id") or field.get("fieldId")
            field_name = field.get("name") or field.get("label")
            if not field_id or not field_name or field_id in seen:
                continue
            seen.add(field_id)
            cursor.execute("""
                INSERT OR REPLACE INTO object_schemas
                (location_id, object_key, field_id, field_name, field_key, field_type, raw_json)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                location_id,
                object_key,
                field_id,
                field_name,
                field.get("key") or field.get("fieldKey"),
                field.get("dataType") or field.get("type") or field.get("fieldType"),
                json.dumps(field, ensure_ascii=False, default=str),
            ))


def get_object_schemas(location_id=None, object_key=None):
    """Return cached schema rows as dicts, optionally filtered by location and/or object."""
    with closing(get_db_connection()) as conn:
        sql = "SELECT * FROM object_schemas WHERE 1=1"
        params = []
        if location_id:
            sql += " AND location_id = ?"
            params.append(location_id)
        if object_key:
            sql += " AND object_key = ?"
            params.append(object_key)
        sql += " ORDER BY location_id, object_key, field_name"
        return [dict(row) for row in conn.execute(sql, params).fetchall()]


# --- SYNC LOGS ---

def create_sync_log(location_id):
    """Insert a ``'running'`` sync_log row for the location and return its id."""
    with _transaction() as conn:
        cursor = conn.cursor()
        cursor.execute("""
            INSERT INTO sync_log (location_id, status)
            VALUES (?, 'running')
        """, (location_id,))
        return cursor.lastrowid


def update_sync_log(log_id, status, contacts_synced, opps_synced, error_message=None):
    """Record the outcome of a sync run and stamp ``finished_at`` with local time."""
    with _transaction() as conn:
        conn.cursor().execute("""
            UPDATE sync_log
            SET status = ?, contacts_synced = ?, opps_synced = ?, error_message = ?,
                finished_at = datetime('now', 'localtime')
            WHERE id = ?
        """, (status, contacts_synced, opps_synced, error_message, log_id))


def get_sync_logs(limit=20):
    """Return the latest ``limit`` sync logs (newest first) with ``account_name`` joined in."""
    with closing(get_db_connection()) as conn:
        rows = conn.execute("""
            SELECT s.*, a.nombre as account_name
            FROM sync_log s
            LEFT JOIN accounts a ON s.location_id = a.location_id
            ORDER BY s.started_at DESC
            LIMIT ?
        """, (limit,)).fetchall()
        return [dict(row) for row in rows]


# --- ERROR LOGS ---

def insert_error_log(error_id, event, exception_type=None, exception_message=None, context=None):
    """Persist one error record; a repeated ``error_id`` is silently ignored.

    ``context`` is stored as JSON; values that are not JSON-serialisable are
    stringified (``default=str``).
    """
    with _transaction() as conn:
        conn.execute("""
            INSERT OR IGNORE INTO error_log
            (error_id, event, exception_type, exception_message, context_json)
            VALUES (?, ?, ?, ?, ?)
        """, (
            error_id,
            event,
            exception_type,
            exception_message,
            json.dumps(context or {}, ensure_ascii=False, default=str),
        ))


def get_error_logs(limit=50):
    """Return the latest ``limit`` error logs, newest first.

    Each dict carries a parsed ``context`` key in place of ``context_json``.
    """
    with closing(get_db_connection()) as conn:
        rows = conn.execute("""
            SELECT *
            FROM error_log
            ORDER BY created_at DESC, id DESC
            LIMIT ?
        """, (limit,)).fetchall()
        results = []
        for row in rows:
            item = dict(row)
            item["context"] = json.loads(item.pop("context_json") or "{}")
            results.append(item)
        return results


# --- QUERIES AND METRICS ---

def get_account_metrics(location_id):
    """Return summary metrics for one location.

    Keys: ``contacts_count``, ``contacts_without_opp_count``, ``opps_count``,
    ``opps_value``, ``opps_by_status`` (open/won/lost/abandoned, each with
    ``count`` and ``value``), ``pipelines_count`` and ``last_sync`` (dict or
    ``None``).
    """
    with closing(get_db_connection()) as conn:
        # Contact count
        contact_count = conn.execute(
            "SELECT COUNT(*) FROM contacts WHERE location_id = ?", (location_id,)
        ).fetchone()[0]

        # Contacts with no associated opportunity in this same location.
        # Used to flag discrepancies in the branch master view.
        contacts_without_opp_count = conn.execute("""
            SELECT COUNT(*) FROM contacts c
            WHERE c.location_id = ?
              AND NOT EXISTS (
                  SELECT 1 FROM opportunities o
                  WHERE o.location_id = c.location_id AND o.contact_id = c.id
              )
        """, (location_id,)).fetchone()[0]

        # Opportunity count and monetary value per status
        opps_rows = conn.execute("""
            SELECT status, COUNT(*) as count, SUM(monetary_value) as total_val
            FROM opportunities
            WHERE location_id = ?
            GROUP BY status
        """, (location_id,)).fetchall()

        opps_by_status = {
            "open": {"count": 0, "value": 0.0},
            "won": {"count": 0, "value": 0.0},
            "lost": {"count": 0, "value": 0.0},
            "abandoned": {"count": 0, "value": 0.0}
        }
        total_opps = 0
        total_value = 0.0
        for row in opps_rows:
            stat = row['status'].lower() if row['status'] else 'open'
            count = row['count']
            value = row['total_val'] or 0.0
            bucket = opps_by_status.get(stat)
            if bucket is not None:
                # Accumulate: GROUP BY is case-sensitive, so several rows
                # (e.g. 'Open', 'open', NULL) can map to the same bucket.
                bucket["count"] += count
                bucket["value"] += value
            # Totals include every row, even statuses outside the four buckets.
            total_opps += count
            total_value += value

        # Pipeline count
        pipeline_count = conn.execute(
            "SELECT COUNT(*) FROM pipelines WHERE location_id = ?", (location_id,)
        ).fetchone()[0]

        # Most recent sync log
        last_sync = conn.execute("""
            SELECT * FROM sync_log
            WHERE location_id = ?
            ORDER BY started_at DESC LIMIT 1
        """, (location_id,)).fetchone()

        return {
            "contacts_count": contact_count,
            "contacts_without_opp_count": contacts_without_opp_count,
            "opps_count": total_opps,
            "opps_value": total_value,
            "opps_by_status": opps_by_status,
            "pipelines_count": pipeline_count,
            "last_sync": dict(last_sync) if last_sync else None
        }


def get_global_metrics():
    """Return totals across all locations plus running / failed-today sync counts."""
    with closing(get_db_connection()) as conn:
        contact_count = conn.execute("SELECT COUNT(*) FROM contacts").fetchone()[0]
        opps_count = conn.execute("SELECT COUNT(*) FROM opportunities").fetchone()[0]
        opps_value = conn.execute("SELECT SUM(monetary_value) FROM opportunities").fetchone()[0] or 0.0
        branch_count = conn.execute("SELECT COUNT(*) FROM accounts WHERE type = 'branch'").fetchone()[0]

        # Syncs currently running / failed today
        running_syncs = conn.execute(
            "SELECT COUNT(*) FROM sync_log WHERE status = 'running'"
        ).fetchone()[0]
        failed_syncs_today = conn.execute("""
            SELECT COUNT(*) FROM sync_log
            WHERE status = 'failed' AND date(started_at) = date('now', 'localtime')
        """).fetchone()[0]

        return {
            "total_contacts": contact_count,
            "total_opportunities": opps_count,
            "total_value": opps_value,
            "branch_count": branch_count,
            "running_syncs": running_syncs,
            "failed_syncs_today": failed_syncs_today
        }


# --- CONTACT AND OPPORTUNITY QUERIES ---

def _contact_row_to_dict(row):
    """Convert a contacts row to a dict with parsed ``tags`` and ``custom_fields``."""
    d = dict(row)
    d['tags'] = json.loads(d['tags']) if d['tags'] else []
    d['custom_fields'] = json.loads(d['custom_fields_json']) if d['custom_fields_json'] else []
    return d


def get_contacts(location_id, search_query=None, limit=100, offset=0, without_opp=False):
    """Return a page of contacts for a location, newest ``date_added`` first.

    ``search_query`` is matched as a substring (LIKE) against first name, last
    name, email and phone. ``without_opp=True`` keeps only contacts that have
    no opportunity in the same location.
    """
    with closing(get_db_connection()) as conn:
        sql = "SELECT * FROM contacts WHERE location_id = ?"
        params = [location_id]
        if search_query:
            sql += " AND (first_name LIKE ? OR last_name LIKE ? OR email LIKE ? OR phone LIKE ?)"
            q = f"%{search_query}%"
            params.extend([q, q, q, q])

        # "No opportunity" filter: only contacts with no opp in this location.
        if without_opp:
            sql += (
                " AND NOT EXISTS ("
                "   SELECT 1 FROM opportunities o"
                "   WHERE o.location_id = contacts.location_id AND o.contact_id = contacts.id"
                " )"
            )

        sql += " ORDER BY date_added DESC LIMIT ? OFFSET ?"
        params.extend([limit, offset])

        rows = conn.execute(sql, params).fetchall()
        return [_contact_row_to_dict(r) for r in rows]


def get_contact_by_id(location_id, contact_id):
    """Return one contact with parsed tags and custom_fields, or ``None`` if absent."""
    with closing(get_db_connection()) as conn:
        row = conn.execute(
            "SELECT * FROM contacts WHERE location_id = ? AND id = ?",
            (location_id, contact_id),
        ).fetchone()
        return _contact_row_to_dict(row) if row else None


def contact_has_opportunity(location_id, contact_id):
    """Return True if the contact has at least one cached opportunity in that location."""
    with closing(get_db_connection()) as conn:
        row = conn.execute(
            "SELECT 1 FROM opportunities WHERE location_id = ? AND contact_id = ? LIMIT 1",
            (location_id, contact_id),
        ).fetchone()
        return bool(row)


def get_pipelines(location_id):
    """Return the pipelines of a location, each with a parsed ``stages`` list added."""
    with closing(get_db_connection()) as conn:
        rows = conn.execute("SELECT * FROM pipelines WHERE location_id = ?", (location_id,)).fetchall()
        pipelines = []
        for r in rows:
            d = dict(r)
            d['stages'] = json.loads(d['stages_json']) if d['stages_json'] else []
            pipelines.append(d)
        return pipelines


def get_opportunities(location_id, pipeline_id=None):
    """Return a location's opportunities joined with basic contact fields.

    Optionally restricted to one ``pipeline_id``. Contact columns are NULL
    when the referenced contact is not cached (LEFT JOIN).
    """
    with closing(get_db_connection()) as conn:
        sql = (
            "SELECT o.*, c.first_name, c.last_name, c.email, c.phone "
            "FROM opportunities o "
            "LEFT JOIN contacts c ON o.contact_id = c.id AND o.location_id = c.location_id "
            "WHERE o.location_id = ?"
        )
        params = [location_id]
        if pipeline_id:
            sql += " AND o.pipeline_id = ?"
            params.append(pipeline_id)
        rows = conn.execute(sql, params).fetchall()
        return [dict(r) for r in rows]


def save_workflows(location_id, workflows_list):
    """Replace the workflows of a location in one transaction.

    Entries without an ``id`` and repeated ids are skipped, matching the other
    ``save_*`` writers. A missing/None name is stored as ``''`` and a
    missing/None status as ``'draft'`` so the NOT NULL columns never abort
    the whole batch.
    """
    with _transaction() as conn:
        cursor = conn.cursor()
        cursor.execute("BEGIN")
        # Drop the previous workflows for this location.
        cursor.execute("DELETE FROM workflows WHERE location_id = ?", (location_id,))
        for wf in _dedupe_by_id(workflows_list):
            cursor.execute("""
                INSERT INTO workflows
                (id, location_id, name, status, trigger, created_at, updated_at)
                VALUES (?, ?, ?, ?, ?, ?, ?)
            """, (
                wf['id'],
                location_id,
                wf.get('name') or "",
                wf.get('status') or 'draft',
                wf.get('trigger'),
                wf.get('createdAt'),
                wf.get('updatedAt'),
            ))


def get_workflows(location_id=None):
    """Return all workflows, or those of one location, with ``account_name`` joined in.

    Uses an inner join, so workflows whose account row is missing are omitted.
    """
    with closing(get_db_connection()) as conn:
        if location_id:
            rows = conn.execute("""
                SELECT w.*, a.nombre as account_name
                FROM workflows w
                JOIN accounts a ON w.location_id = a.location_id
                WHERE w.location_id = ?
                ORDER BY w.name ASC
            """, (location_id,)).fetchall()
        else:
            rows = conn.execute("""
                SELECT w.*, a.nombre as account_name
                FROM workflows w
                JOIN accounts a ON w.location_id = a.location_id
                ORDER BY a.nombre ASC, w.name ASC
            """, ()).fetchall()
        return [dict(row) for row in rows]


# --- FORMS / FORM SUBMISSIONS OPERATIONS ---

def save_forms(location_id, forms_list):
    """Replace, per location, the forms catalogued from GHL.

    forms_list: ``[{id, name, locationId, ...}]``. Entries without an ``id``
    and repeated ids are skipped; a missing name is stored as ``''``.
    """
    with _transaction() as conn:
        cur = conn.cursor()
        cur.execute("BEGIN")
        cur.execute("DELETE FROM forms WHERE location_id = ?", (location_id,))
        for f in _dedupe_by_id(forms_list):
            cur.execute("""
                INSERT INTO forms (id, location_id, name, raw_json)
                VALUES (?, ?, ?, ?)
            """, (
                f["id"],
                location_id,
                f.get("name") or "",
                json.dumps(f, ensure_ascii=False),
            ))


def _extract_sucursal_from_others(others, field_ids):
    """Try each field_id in order until one holds a non-empty value.

    Lists are joined with ``", "`` (``None`` items dropped); any other value
    is stringified. Returns the stripped string, or ``None`` if nothing is
    populated. ``field_ids`` may be a single str (backward compatible) or a
    list of strs.
    """
    if not others or not field_ids:
        return None
    if isinstance(field_ids, str):
        field_ids = [field_ids]
    if not isinstance(others, dict):
        return None
    for fid in field_ids:
        if not fid:
            continue
        raw = others.get(fid)
        if raw is None:
            continue
        if isinstance(raw, list):
            val = ", ".join(str(x) for x in raw if x is not None).strip()
        else:
            val = str(raw).strip()
        if val:
            return val
    return None


def upsert_form_submissions(location_id, submissions, sucursal_field_id=None):
    """Upsert (INSERT OR REPLACE) form submissions and return how many were written.

    History is never deleted: the GHL endpoint does not guarantee temporal
    order, so submissions are accumulated.

    submissions: list of raw dicts from the GHL endpoint; entries without an
        ``id`` are skipped.
    sucursal_field_id: a field_id (str) or a list of field_ids in priority
        order, used to pull the Sucursal value out of ``submission['others']``.
        A list tolerates historical form versions that used a different
        field_id.
    """
    if not submissions:
        return 0
    with _transaction() as conn:
        cur = conn.cursor()
        cur.execute("BEGIN")
        n = 0
        for s in submissions:
            sid = s.get("id")
            if not sid:
                continue
            others = s.get("others") or {}
            sucursal_value = _extract_sucursal_from_others(others, sucursal_field_id)
            cur.execute("""
                INSERT OR REPLACE INTO form_submissions
                (id, location_id, form_id, contact_id, name, email, phone,
                 sucursal_value, created_at, others_json, raw_json, synced_at)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, datetime('now', 'localtime'))
            """, (
                sid,
                location_id,
                s.get("formId"),
                s.get("contactId"),
                s.get("name"),
                s.get("email"),
                # Prefer the phone inside `others`, falling back to the top level.
                (others.get("phone") if isinstance(others, dict) else None) or s.get("phone"),
                sucursal_value,
                s.get("createdAt"),
                json.dumps(others, ensure_ascii=False) if others else None,
                json.dumps(s, ensure_ascii=False),
            ))
            n += 1
        return n


def reextract_sucursal_values(location_id, sucursal_field_ids):
    """Recompute ``sucursal_value`` for the stored submissions of a location.

    Uses the candidate field_ids in priority order. Useful when a new
    historical field_id is discovered: rows are re-processed from the stored
    ``others_json`` without calling GHL again. Rows whose ``others_json``
    cannot be parsed are left untouched.

    Returns ``(updated, total)``: how many rows ended up populated and how
    many rows exist in total.
    """
    with _transaction() as conn:
        rows = conn.execute(
            "SELECT id, others_json FROM form_submissions WHERE location_id = ?",
            (location_id,)
        ).fetchall()
        updated = 0
        cur = conn.cursor()
        cur.execute("BEGIN")
        for r in rows:
            try:
                others = json.loads(r["others_json"] or "{}")
            except (TypeError, ValueError):
                # Best effort: skip rows with corrupt JSON.
                continue
            val = _extract_sucursal_from_others(others, sucursal_field_ids)
            cur.execute(
                "UPDATE form_submissions SET sucursal_value = ? WHERE id = ? AND location_id = ?",
                (val, r["id"], location_id),
            )
            if val:
                updated += 1
        return updated, len(rows)


def get_forms(location_id=None):
    """Return forms as dicts, for one location (by name) or all (by location, name)."""
    with closing(get_db_connection()) as conn:
        if location_id:
            rows = conn.execute(
                "SELECT * FROM forms WHERE location_id = ? ORDER BY name",
                (location_id,)
            ).fetchall()
        else:
            rows = conn.execute("SELECT * FROM forms ORDER BY location_id, name").fetchall()
        return [dict(r) for r in rows]


def get_form_submissions(location_id=None, form_id=None, with_sucursal_only=False):
    """Return form submissions as dicts, newest ``created_at`` first.

    Optional filters: ``location_id``, ``form_id`` and
    ``with_sucursal_only`` (keep only rows with a non-blank sucursal_value).
    """
    with closing(get_db_connection()) as conn:
        sql = "SELECT * FROM form_submissions WHERE 1=1"
        params = []
        if location_id:
            sql += " AND location_id = ?"
            params.append(location_id)
        if form_id:
            sql += " AND form_id = ?"
            params.append(form_id)
        if with_sucursal_only:
            sql += " AND sucursal_value IS NOT NULL AND TRIM(sucursal_value) != ''"
        sql += " ORDER BY created_at DESC"
        rows = conn.execute(sql, params).fetchall()
        return [dict(r) for r in rows]


def count_form_submissions(location_id=None):
    """Return the number of stored form submissions, optionally for one location."""
    with closing(get_db_connection()) as conn:
        if location_id:
            row = conn.execute(
                "SELECT COUNT(*) AS n FROM form_submissions WHERE location_id = ?",
                (location_id,)
            ).fetchone()
        else:
            row = conn.execute("SELECT COUNT(*) AS n FROM form_submissions").fetchone()
        return row["n"] if row else 0


# Initialise the database on import.
init_db()