#!/usr/bin/env python3
"""
GHL Analytics - Parallel branch analysis for Monte Providencia.

Enhanced with custom field validation.

Usage:
    python3 ghl_branch_analysis.py [--location LOC_ID] [--brief] [--full]

Analyzes per branch:
  - Contact source distribution (SUCURSAL, Formulario, FACEBOOK, etc.)
  - Canal de Origen (contact & opportunity) — validates against expected values
  - Fuente de Prospecto (contact & opportunity) — validates against expected values
  - Contact / Opportunity counts and status breakdown

Diagnostics (HTTP failures, possibly truncated result pages) are written to
stderr so that the report on stdout stays clean.
"""

import json
import argparse
import os
import sys
from collections import Counter
from concurrent.futures import ThreadPoolExecutor, as_completed

import requests

# Set project root in sys.path so the sibling `sync_engine` module resolves.
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT_DIR not in sys.path:
    sys.path.insert(0, ROOT_DIR)

import sync_engine

# ─── Valid values for custom fields ───────────────────────────────
VALID_CONTACT_CANAL = {"FACEBOOK", "SUCURSAL", "FORMULARIO"}
VALID_CONTACT_FUENTE = {"SUCURSAL", "LEAD DIGITAL"}
VALID_OPP_CANAL = {"Facebook", "Sucursal", "Formulario"}
VALID_OPP_FUENTE = {"SUCURSAL", "LEAD DIGITAL"}

# Credentials are dynamically loaded from sync_engine
# (Bucéfalo - Mesa de control - API Tokens - MP.csv)

# ─── Field name candidates in different locations ─────────────────
OPP_CANAL_NAMES = ["CANAL DE ORIGEN", "Canal de Origen de la Oportunidad", "Canal de Origen"]
OPP_FUENTE_NAMES = ["Fuente de Prospecto", "Fuente del Prospecto", "FUENTE DE PROSPECTO"]

# ─── API / script constants ───────────────────────────────────────
API_BASE = "https://services.leadconnectorhq.com"
REQUEST_TIMEOUT = 30          # seconds, applied to every HTTP call
CONTACT_PAGE_LIMIT = 500      # `pageLimit` sent to /contacts/search
OPP_PAGE_LIMIT = 100          # `limit` sent to /opportunities/search
# Location id that is treated as the "main" account and kept out of the
# all-branches run (it is still reachable through --location).
MAIN_LOCATION_ID = "GbKkBpCmKu2QmloKFHy3"


# ─── API Helpers ──────────────────────────────────────────────────

def _warn(message):
    """Write a diagnostic line to stderr (never to the stdout report)."""
    print(f"[warn] {message}", file=sys.stderr)


def _api_headers(token, version, with_body=False):
    """Build the request headers; `with_body` adds the JSON Content-Type."""
    headers = {
        'Accept': 'application/json',
        'Authorization': f'Bearer {token}',
        'Version': version,
    }
    if with_body:
        headers['Content-Type'] = 'application/json'
    return headers


def _fetch_json(send, url, headers, ok_statuses, what, **request_kwargs):
    """
    Perform one HTTP call and return the decoded JSON object.

    `send` is `requests.get` or `requests.post`. Returns None (after a
    warning on stderr) on a transport error, a status code outside
    `ok_statuses`, an undecodable body, or a body that is not a JSON object.
    `what` is a short human-readable label used only in the warnings.
    """
    try:
        res = send(url, headers=headers, timeout=REQUEST_TIMEOUT, **request_kwargs)
        if res.status_code not in ok_statuses:
            _warn(f"{what}: HTTP {res.status_code}")
            return None
        data = res.json()
    except (requests.RequestException, ValueError) as exc:
        # ValueError covers JSON decoding errors raised by res.json().
        _warn(f"{what}: {type(exc).__name__}: {exc}")
        return None
    if not isinstance(data, dict):
        _warn(f"{what}: unexpected response payload ({type(data).__name__})")
        return None
    return data


def _warn_if_truncated(items, limit, what):
    """Warn when a full page came back, since only one page is ever requested."""
    if len(items) >= limit:
        _warn(f"{what}: received {len(items)} records (page limit {limit}); "
              f"results may be truncated")


def get_object_schema(loc_id, token, obj_key) -> dict:
    """
    GET /objects/{key}?locationId= -> {fieldName: fieldId}

    Returns an empty dict when the request fails or the payload carries an
    'error' key. Field entries without both 'name' and 'id' are skipped.

    The previous extra GET on the /objects/ catalog was dropped: its response
    was never read, so it only added latency and another way to fail.
    """
    data = _fetch_json(
        requests.get,
        f"{API_BASE}/objects/{obj_key}",
        _api_headers(token, '2021-04-15'),
        (200,),
        f"schema '{obj_key}' for {loc_id}",
        params={"locationId": loc_id},
    )
    if data is None or 'error' in data:
        return {}
    return {
        field['name']: field['id']
        for field in (data.get('fields') or [])
        if isinstance(field, dict) and 'name' in field and 'id' in field
    }


def get_contacts(loc_id, token) -> list:
    """
    POST /contacts/search → list of contacts with customFields
    (no additionalFields needed).

    Only a single page of up to CONTACT_PAGE_LIMIT records is requested; a
    warning is emitted when the page comes back full. Returns [] on failure.
    NOTE(review): no pagination is performed — confirm the API's paging
    contract before relying on counts for locations above the page limit.
    """
    what = f"contacts for {loc_id}"
    data = _fetch_json(
        requests.post,
        f"{API_BASE}/contacts/search",
        _api_headers(token, '2021-07-28', with_body=True),
        (200, 201),
        what,
        json={"locationId": loc_id, "pageLimit": CONTACT_PAGE_LIMIT},
    )
    if data is None:
        return []
    contacts = data.get('contacts') or []
    _warn_if_truncated(contacts, CONTACT_PAGE_LIMIT, what)
    return contacts


def get_opportunities(loc_id, token) -> list:
    """
    POST /opportunities/search → list of opportunities with customFields.

    Only a single page of up to OPP_PAGE_LIMIT records is requested (not
    "all" opportunities); a warning is emitted when the page comes back
    full. Returns [] on failure.
    NOTE(review): no pagination is performed — confirm the API's paging
    contract before relying on counts for locations above the page limit.
    """
    what = f"opportunities for {loc_id}"
    data = _fetch_json(
        requests.post,
        f"{API_BASE}/opportunities/search",
        _api_headers(token, '2021-07-28', with_body=True),
        (200, 201),
        what,
        json={"locationId": loc_id, "limit": OPP_PAGE_LIMIT},
    )
    if data is None:
        return []
    opportunities = data.get('opportunities') or []
    _warn_if_truncated(opportunities, OPP_PAGE_LIMIT, what)
    return opportunities


def get_field_value(obj, field_id):
    """
    Extract a custom field value from a contact or opportunity.

    Looks through obj['customFields'] for the entry whose 'id' equals
    `field_id` and returns its 'fieldValueString', falling back to 'value'.
    Contacts use 'value', opportunities use 'fieldValueString' (or 'value').
    Falsy values (missing, empty string, 0, empty list) are reported as None.
    """
    for cf in obj.get('customFields') or []:
        # Skip malformed entries instead of failing the whole record.
        if isinstance(cf, dict) and cf.get('id') == field_id:
            return cf.get('fieldValueString') or cf.get('value') or None
    return None


def find_field_id(schema, names):
    """Try multiple field names in schema, return (name, id) or (None, None)."""
    for candidate in names:
        field_id = schema.get(candidate)
        if field_id:
            return candidate, field_id
    return None, None


# ─── Analysis ─────────────────────────────────────────────────────

def _tally_field(records, field_id, valid_values) -> dict:
    """
    Classify one custom field across `records`.

    Returns {'dist': {value: count}, 'valid': n, 'invalid': n, 'empty': n}.
    When `field_id` is falsy (field absent from the schema) nothing is
    inspected and every counter stays at zero.
    """
    dist = Counter()
    valid = invalid = empty = 0
    if field_id:
        for record in records:
            val = get_field_value(record, field_id)
            if not val:
                empty += 1
                continue
            # Non-string values (e.g. a list from a multi-select field) are
            # unhashable or never match; stringify so they count as invalid
            # instead of raising TypeError.
            key = val if isinstance(val, str) else str(val)
            dist[key] += 1
            if key in valid_values:
                valid += 1
            else:
                invalid += 1
    return {'dist': dict(dist), 'valid': valid, 'invalid': invalid, 'empty': empty}


def analyze_location(loc_id, token, name, brief=False):
    """
    Full analysis of a single location:
      - Contacts: count, source distribution, Canal de Origen, Fuente de Prospecto
      - Opportunities: count, status, CANAL DE ORIGEN, Fuente de Prospecto
      - Validation: flag invalid values

    Returns a dict. With brief=True it holds flat counters where each
    '*_invalid' key is invalid + empty. Otherwise it holds the detailed
    breakdown ('cCanal', 'cFuente', 'oCanal', 'oFuente', 'sourceDist',
    'byStatus', 'schemas', ...). Any exception is converted into
    {'name', 'locId', 'error'} so one failing branch never aborts the run.
    """
    try:
        # ── Schemas ──
        contact_schema = get_object_schema(loc_id, token, "contact")
        opp_schema = get_object_schema(loc_id, token, "opportunity")

        c_canal_id = contact_schema.get("Canal de Origen")
        c_fuente_id = contact_schema.get("Fuente de Prospecto")
        # Opportunity field names vary between locations; only the id is needed.
        _, o_canal_id = find_field_id(opp_schema, OPP_CANAL_NAMES)
        _, o_fuente_id = find_field_id(opp_schema, OPP_FUENTE_NAMES)

        # ── Data ──
        contacts = get_contacts(loc_id, token)
        opportunities = get_opportunities(loc_id, token)

        # ── Contact analysis ──
        total_contacts = len(contacts)
        source_dist = dict(Counter(c.get('source') or '(none)' for c in contacts))
        c_canal = _tally_field(contacts, c_canal_id, VALID_CONTACT_CANAL)
        c_fuente = _tally_field(contacts, c_fuente_id, VALID_CONTACT_FUENTE)

        # ── Opportunity analysis ──
        total_opps = len(opportunities)
        opp_by_status = dict(Counter(o.get('status', 'unknown') for o in opportunities))
        o_canal = _tally_field(opportunities, o_canal_id, VALID_OPP_CANAL)
        o_fuente = _tally_field(opportunities, o_fuente_id, VALID_OPP_FUENTE)

        # Contacts with / without opportunities (embedded in contact data)
        embedded_counts = [len(c.get('opportunities') or []) for c in contacts]
        contacts_with_opps = sum(1 for n in embedded_counts if n)
        embedded_opps = sum(embedded_counts)

        if brief:
            return {
                'name': name,
                'contacts': total_contacts,
                'opportunities': total_opps,
                'open': opp_by_status.get('open', 0),
                'won': opp_by_status.get('won', 0),
                'lost': opp_by_status.get('lost', 0),
                'abandoned': opp_by_status.get('abandoned', 0),
                'c_canal_invalid': c_canal['invalid'] + c_canal['empty'],
                'c_fuente_invalid': c_fuente['invalid'] + c_fuente['empty'],
                'o_canal_invalid': o_canal['invalid'] + o_canal['empty'],
                'o_fuente_invalid': o_fuente['invalid'] + o_fuente['empty'],
            }

        return {
            'name': name,
            'locId': loc_id,
            'contacts': total_contacts,
            'contactsWithOpps': contacts_with_opps,
            'noOpps': total_contacts - contacts_with_opps,
            'embeddedOpps': embedded_opps,
            'opportunities': total_opps,
            'byStatus': opp_by_status,
            'sourceDist': source_dist,
            'cCanal': c_canal,
            'cFuente': c_fuente,
            'oCanal': o_canal,
            'oFuente': o_fuente,
            'schemas': {
                'c_canal_id': c_canal_id,
                'c_fuente_id': c_fuente_id,
                'o_canal_id': o_canal_id,
                'o_fuente_id': o_fuente_id,
            }
        }
    except Exception as e:
        # Worker-thread boundary: report the failure as data, never raise.
        return {'name': name, 'locId': loc_id, 'error': str(e)}


# ─── Output Formatters ────────────────────────────────────────────

def print_brief(results):
    """
    Print summary table with validation columns.

    Rows that carry an 'error' key (a failed branch analysis) are printed as
    an error line and left out of the totals instead of raising KeyError.
    """
    totals = {'contacts': 0, 'opportunities': 0, 'open': 0, 'won': 0, 'lost': 0,
              'abandoned': 0, 'c_canal_invalid': 0, 'c_fuente_invalid': 0,
              'o_canal_invalid': 0, 'o_fuente_invalid': 0}

    print("\n" + "=" * 120)
    print("GHL ANALYTICS - MONTE PROVIDENCIA — FIELD VALIDATION")
    print("=" * 120)

    header = (f"\n{'Sucursal':<20} {'Cont':>5} {'Opps':>5} {'Abrt':>5} {'Gand':>5} "
              f"{'Diff':>5} {'❌C.Cnl':>7} {'❌C.Fte':>7} {'❌O.Cnl':>7} {'❌O.Fte':>7}")
    print(header)
    print("-" * 120)

    for r in sorted(results, key=lambda x: x.get('name', '')):
        if 'error' in r:
            print(f"{r['name']:<20} ❌ ERROR - {r['error']}")
            continue

        diff = r.get('opportunities', 0) - r.get('contacts', 0)
        diff_str = f"+{diff}" if diff > 0 else str(diff)
        cci = r.get('c_canal_invalid', 0)
        cfi = r.get('c_fuente_invalid', 0)
        oci = r.get('o_canal_invalid', 0)
        ofi = r.get('o_fuente_invalid', 0)

        # Highlight branches with issues
        flag = " ⚠️" if (cci + cfi + oci + ofi) > 0 else ""

        print(f"{r['name']:<20} {r['contacts']:>5} {r['opportunities']:>5} "
              f"{r.get('open', 0):>5} {r.get('won', 0):>5} {diff_str:>5} "
              f"{cci:>7} {cfi:>7} {oci:>7} {ofi:>7}{flag}")

        for k in totals:
            totals[k] += r.get(k, 0)

    print("-" * 120)
    total_diff = totals['opportunities'] - totals['contacts']
    diff_str = f"+{total_diff}" if total_diff > 0 else str(total_diff)
    print(f"{'TOTAL':<20} {totals['contacts']:>5} {totals['opportunities']:>5} "
          f"{totals['open']:>5} {totals['won']:>5} {diff_str:>5} "
          f"{totals['c_canal_invalid']:>7} {totals['c_fuente_invalid']:>7} "
          f"{totals['o_canal_invalid']:>7} {totals['o_fuente_invalid']:>7}")

    print("\n ❌C.Cnl = Contactos con Canal de Origen inválido/vacío")
    print(" ❌C.Fte = Contactos con Fuente de Prospecto inválido/vacío")
    print(" ❌O.Cnl = Oportunidades con CANAL DE ORIGEN inválido/vacío")
    print(" ❌O.Fte = Oportunidades con Fuente de Prospecto inválido/vacío")
    print(f"\n Status: Abiertos={totals['open']} | Ganados={totals['won']} | "
          f"Perdidos={totals['lost']} | Abandonados={totals['abandoned']}")


def _print_field_section(icon, label, stats, valid_values):
    """Print one validation section: counters first, then values by frequency."""
    if not stats:
        return
    print(f"\n {icon} {label} (válidos: {', '.join(sorted(valid_values))}):")
    print(f" ✅ Válidos: {stats.get('valid', 0)} | ❌ Inválidos: {stats.get('invalid', 0)} "
          f"| ⚪ Vacíos: {stats.get('empty', 0)}")
    for v, c in sorted(stats.get('dist', {}).items(), key=lambda x: -x[1]):
        valid_mark = "✅" if v in valid_values else "❌"
        print(f" {valid_mark} {v:<20} {c:>4}")


def print_full(results):
    """
    Print full detailed report per branch.

    Expects the non-brief dicts produced by analyze_location; branches that
    failed (dicts with an 'error' key) are listed and excluded from totals.
    """
    print("\n" + "=" * 80)
    print("GHL ANALYTICS - MONTE PROVIDENCIA — FULL REPORT")
    print("=" * 80)

    totals = {'contacts': 0, 'opps': 0, 'open': 0, 'won': 0, 'lost': 0, 'abandoned': 0}

    for r in sorted(results, key=lambda x: x.get('name', '')):
        if 'error' in r:
            print(f"\n❌ {r['name']}: ERROR - {r['error']}")
            continue

        s = r.get('byStatus', {})
        src = r.get('sourceDist', {})

        print(f"\n{'─' * 80}")
        print(f"📊 {r['name']} ({r['locId']})")
        print(f"{'─' * 80}")

        # Counts
        print(f" Contactos: {r['contacts']} | Con opps: {r['contactsWithOpps']} "
              f"| Sin opps: {r['noOpps']}")
        print(f" Oportunidades: {r['opportunities']} "
              f"(embebidas en contacts: {r.get('embeddedOpps', '?')})")
        print(f" Status: Abiertos={s.get('open', 0)} | Ganados={s.get('won', 0)} | "
              f"Perdidos={s.get('lost', 0)} | Abandonados={s.get('abandoned', 0)}")

        # Source
        if src:
            print("\n 📋 Contact Source:")
            for v, c in sorted(src.items(), key=lambda x: -x[1]):
                bar = "█" * min(c, 50)  # cap the bar so large counts stay on one line
                print(f" {v:<15} {c:>4} {bar}")

        # Custom field validation sections (contact first, then opportunity)
        _print_field_section("🔵", "Contacto — Canal de Origen",
                             r.get('cCanal', {}), VALID_CONTACT_CANAL)
        _print_field_section("🟢", "Contacto — Fuente de Prospecto",
                             r.get('cFuente', {}), VALID_CONTACT_FUENTE)
        _print_field_section("🔵", "Oportunidad — CANAL DE ORIGEN",
                             r.get('oCanal', {}), VALID_OPP_CANAL)
        _print_field_section("🟢", "Oportunidad — Fuente de Prospecto",
                             r.get('oFuente', {}), VALID_OPP_FUENTE)

        # Totals
        totals['contacts'] += r['contacts']
        totals['opps'] += r['opportunities']
        totals['open'] += s.get('open', 0)
        totals['won'] += s.get('won', 0)
        totals['lost'] += s.get('lost', 0)
        totals['abandoned'] += s.get('abandoned', 0)

    print(f"\n{'=' * 80}")
    print(f"TOTALES: {totals['contacts']} contactos | {totals['opps']} oportunidades")
    print(f"Abiertos: {totals['open']} | Ganados: {totals['won']} | "
          f"Perdidos: {totals['lost']} | Abandonados: {totals['abandoned']}")


# ─── Main ─────────────────────────────────────────────────────────

def main():
    """Parse the command line, load credentials and print the chosen report."""
    # Configure stdout to use UTF-8 encoding to prevent UnicodeEncodeError on Windows
    if sys.platform == "win32":
        try:
            sys.stdout.reconfigure(encoding='utf-8')
        except AttributeError:
            pass

    parser = argparse.ArgumentParser(
        description='GHL Analytics - Branch analysis with field validation')
    parser.add_argument('--location', help='Specific location ID to analyze')
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument('--brief', action='store_true',
                      help='Brief output (table with validation flags)')
    # --full is advertised in the usage text; it selects the default report,
    # so it only needs to be accepted rather than rejected by argparse.
    mode.add_argument('--full', action='store_true',
                      help='Full detailed report per branch (default)')
    args = parser.parse_args()
    render = print_brief if args.brief else print_full

    # Load dynamic credentials from sync_engine
    try:
        accounts = sync_engine.parse_accounts_csv()
    except Exception as e:
        print(f"Error loading credentials from CSV: {e}")
        sys.exit(1)

    main_acc = next((acc for acc in accounts if acc["location_id"] == MAIN_LOCATION_ID), None)
    branches = [acc for acc in accounts if acc["location_id"] != MAIN_LOCATION_ID]
    all_branches = branches + ([main_acc] if main_acc else [])

    if args.location:
        loc = next((acc for acc in all_branches if acc["location_id"] == args.location), None)
        if loc is None:
            print(f"Location {args.location} not found")
            sys.exit(1)
        render([analyze_location(loc["location_id"], loc["token"], loc["nombre"],
                                 brief=args.brief)])
        return

    # All locations in parallel (the main account is intentionally excluded)
    if not branches:
        print("No branch locations found in CSV to analyze.")
        sys.exit(0)

    print(" 🔍 Analyzing all branches in parallel (10 workers)...")
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [
            executor.submit(analyze_location, acc["location_id"], acc["token"],
                            acc["nombre"], args.brief)
            for acc in branches
        ]
        # analyze_location never raises, so result() is safe to call directly.
        results = [future.result() for future in as_completed(futures)]

    render(results)


if __name__ == "__main__":
    main()