Files
MP-Manager/scripts/ghl_branch_analysis.py
2026-05-30 14:31:19 -06:00

477 lines
20 KiB
Python

#!/usr/bin/env python3
"""
GHL Analytics - Parallel branch analysis for Monte Providencia
Enhanced with custom field validation.
Usage: python3 ghl_branch_analysis.py [--location LOC_ID] [--brief] [--full]
Analyzes per branch:
- Contact source distribution (SUCURSAL, Formulario, FACEBOOK, etc.)
- Canal de Origen (contact & opportunity) — validates against expected values
- Fuente de Prospecto (contact & opportunity) — validates against expected values
- Contact / Opportunity counts and status breakdown
"""
import json
import argparse
import os
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
# Put the directory two levels above this file at the front of sys.path
# (only once), ahead of the project-local import that follows.
ROOT_DIR = os.path.dirname(
    os.path.dirname(os.path.abspath(__file__))
)
if sys.path.count(ROOT_DIR) == 0:
    sys.path.insert(0, ROOT_DIR)
import sync_engine
# ─── Valid values for custom fields ───────────────────────────────
# Membership is tested with `in`, so matching is exact and case-sensitive.
# Contact-level sets use upper-case labels; the opportunity "canal" set uses
# title-case labels.
VALID_CONTACT_CANAL = {
    "FACEBOOK",
    "SUCURSAL",
    "FORMULARIO",
}
VALID_CONTACT_FUENTE = {
    "SUCURSAL",
    "LEAD DIGITAL",
}
VALID_OPP_CANAL = {
    "Facebook",
    "Sucursal",
    "Formulario",
}
VALID_OPP_FUENTE = {
    "SUCURSAL",
    "LEAD DIGITAL",
}
# Credentials are dynamically loaded from sync_engine (Bucéfalo - Mesa de control - API Tokens - MP.csv)
# ─── Field name candidates in different locations ─────────────────
# Candidates are tried in list order; the first one present in a schema wins.
OPP_CANAL_NAMES = [
    "CANAL DE ORIGEN",
    "Canal de Origen de la Oportunidad",
    "Canal de Origen",
]
OPP_FUENTE_NAMES = [
    "Fuente de Prospecto",
    "Fuente del Prospecto",
    "FUENTE DE PROSPECTO",
]
# ─── API Helpers ──────────────────────────────────────────────────
def get_object_schema(loc_id, token, obj_key):
    """Return the field schema of a GHL object as ``{fieldName: fieldId}``.

    Sends ``GET /objects/`` and then ``GET /objects/{obj_key}``, both with
    ``locationId`` as a query parameter; only the second response is parsed.

    Args:
        loc_id: Location id sent as the ``locationId`` query parameter.
        token: Bearer token placed in the ``Authorization`` header.
        obj_key: Object key appended to the ``/objects/`` URL.

    Returns:
        dict mapping each entry's ``name`` to its ``id``, taken from the
        ``fields`` list of the response. Entries lacking either key are
        skipped. Best-effort: a request failure, a non-200 status, an
        ``error`` payload or an unexpected body yields ``{}`` (with a warning
        on stderr) instead of raising.
    """
    tag = f"[get_object_schema] {loc_id}/{obj_key}"
    try:
        headers = {
            'Accept': 'application/json',
            'Authorization': f'Bearer {token}',
            'Version': '2021-04-15',
        }
        params = {"locationId": loc_id}
        catalog_url = "https://services.leadconnectorhq.com/objects/"
        # NOTE(review): the response of this catalog request is not used
        # below; presumably a warm-up or permission probe — confirm whether
        # it can be dropped to save one round trip per call.
        requests.get(catalog_url, headers=headers, params=params, timeout=30)
        res = requests.get(f"{catalog_url}{obj_key}", headers=headers,
                           params=params, timeout=30)
        if res.status_code != 200:
            print(f"{tag}: HTTP {res.status_code}", file=sys.stderr)
            return {}
        data = res.json()
        if not isinstance(data, dict) or 'error' in data:
            print(f"{tag}: error payload or unexpected body", file=sys.stderr)
            return {}
        schema = {}
        for field in data.get('fields') or []:
            # One malformed entry must not throw away the rest of the schema.
            if isinstance(field, dict) and 'name' in field and 'id' in field:
                schema[field['name']] = field['id']
        return schema
    except Exception as exc:
        # Deliberately broad: this helper never raises, but the failure is
        # reported so it is not mistaken for an object without fields.
        print(f"{tag}: {exc}", file=sys.stderr)
        return {}
def get_contacts(loc_id, token):
    """Fetch contacts of a location via ``POST /contacts/search``.

    Args:
        loc_id: Location id sent as ``locationId`` in the JSON body.
        token: Bearer token placed in the ``Authorization`` header.

    Returns:
        The ``contacts`` list of the response (``[]`` when absent or empty).
        Best-effort: a request failure, a status other than 200/201 or an
        unexpected body yields ``[]`` (with a warning on stderr) instead of
        raising.

    NOTE(review): a single request with ``pageLimit: 500`` is sent and no
    further pages are requested, so a location with more contacts than one
    page holds is presumably under-counted — confirm against the API's
    pagination rules.
    """
    tag = f"[get_contacts] {loc_id}"
    try:
        headers = {
            'Accept': 'application/json',
            'Authorization': f'Bearer {token}',
            'Version': '2021-07-28',
            'Content-Type': 'application/json',
        }
        url = "https://services.leadconnectorhq.com/contacts/search"
        body = {"locationId": loc_id, "pageLimit": 500}
        res = requests.post(url, headers=headers, json=body, timeout=30)
        if res.status_code not in (200, 201):
            print(f"{tag}: HTTP {res.status_code}", file=sys.stderr)
            return []
        data = res.json()
        return data.get('contacts') or []
    except Exception as exc:
        # Deliberately broad: never raise, but report the failure so it is
        # not mistaken for a location without contacts.
        print(f"{tag}: {exc}", file=sys.stderr)
        return []
def get_opportunities(loc_id, token):
    """Fetch opportunities of a location via ``POST /opportunities/search``.

    Args:
        loc_id: Location id sent as ``locationId`` in the JSON body.
        token: Bearer token placed in the ``Authorization`` header.

    Returns:
        The ``opportunities`` list of the response (``[]`` when absent or
        empty). Best-effort: a request failure, a status other than 200/201
        or an unexpected body yields ``[]`` (with a warning on stderr)
        instead of raising.

    NOTE(review): a single request with ``limit: 100`` is sent and no further
    pages are requested, so this is presumably NOT every opportunity of a
    large location — confirm against the API's pagination rules.
    """
    tag = f"[get_opportunities] {loc_id}"
    try:
        headers = {
            'Accept': 'application/json',
            'Authorization': f'Bearer {token}',
            'Version': '2021-07-28',
            'Content-Type': 'application/json',
        }
        url = "https://services.leadconnectorhq.com/opportunities/search"
        body = {"locationId": loc_id, "limit": 100}
        res = requests.post(url, headers=headers, json=body, timeout=30)
        if res.status_code not in (200, 201):
            print(f"{tag}: HTTP {res.status_code}", file=sys.stderr)
            return []
        data = res.json()
        return data.get('opportunities') or []
    except Exception as exc:
        # Deliberately broad: never raise, but report the failure so it is
        # not mistaken for a location without opportunities.
        print(f"{tag}: {exc}", file=sys.stderr)
        return []
def get_field_value(obj, field_id):
    """Look up one custom field on a contact or opportunity dict.

    The first entry of ``obj['customFields']`` whose ``id`` equals
    *field_id* decides the result: its ``fieldValueString`` is preferred,
    then its ``value``. A missing field, or a falsy value in that entry,
    gives ``None``.
    """
    entries = obj.get('customFields') or []
    hit = next((entry for entry in entries if entry.get('id') == field_id), None)
    if hit is None:
        return None
    return hit.get('fieldValueString') or hit.get('value') or None
def find_field_id(schema, names):
    """Resolve the first candidate field name that exists in *schema*.

    Args:
        schema: Mapping of field name to field id.
        names: Candidate field names, in priority order.

    Returns:
        ``(name, id)`` of the match, or ``(None, None)`` when nothing matches.
        Exact names are tried first, in candidate order. Only if none of them
        resolves to a truthy id is a second pass made that ignores letter
        case and surrounding whitespace; in that case the returned name is
        the schema's own spelling.
    """
    # Pass 1: exact names, in candidate order.
    for name in names:
        fid = schema.get(name)
        if fid:
            return name, fid

    # Pass 2: tolerate labels that differ only in case or padding, which the
    # exact lookup above would miss.
    normalized = {}
    for key, fid in schema.items():
        if isinstance(key, str) and fid:
            normalized.setdefault(key.strip().casefold(), (key, fid))
    for name in names:
        if not isinstance(name, str):
            continue
        match = normalized.get(name.strip().casefold())
        if match:
            return match
    return None, None
# ─── Analysis ─────────────────────────────────────────────────────
def _tally_field(records, field_id, valid_values):
    """Classify one custom field across *records* as valid / invalid / empty.

    Returns ``{'dist': {value: count}, 'valid': n, 'invalid': n, 'empty': n}``.
    When *field_id* is falsy nothing is counted and every counter stays at 0.
    """
    tally = {'dist': {}, 'valid': 0, 'invalid': 0, 'empty': 0}
    if not field_id:
        return tally
    dist = tally['dist']
    for record in records:
        value = get_field_value(record, field_id)
        if not value:
            tally['empty'] += 1
            continue
        if isinstance(value, (list, dict)):
            # Unhashable values cannot be tested against a set or used as a
            # dict key; stringify them so they are reported as invalid
            # instead of failing the whole location.
            value = str(value)
        bucket = 'valid' if value in valid_values else 'invalid'
        tally[bucket] += 1
        dist[value] = dist.get(value, 0) + 1
    return tally


def analyze_location(loc_id, token, name, brief=False):
    """Analyze the contacts and opportunities of a single location.

    Fetches both object schemas, the contacts and the opportunities, then
    validates "Canal de Origen" and "Fuente de Prospecto" on each side
    against the module-level ``VALID_*`` sets.

    Args:
        loc_id: Location id passed to every API helper.
        token: API token passed to every API helper.
        name: Display name copied into the result.
        brief: When true, return only the flat counters used by the summary
            table.

    Returns:
        * brief: ``name``, ``contacts``, ``opportunities``, ``open``, ``won``,
          ``lost``, ``abandoned`` and four ``*_invalid`` counters, each being
          invalid + empty.
        * full: counts, ``byStatus``, ``sourceDist``, one tally dict per
          validated field (``cCanal``, ``cFuente``, ``oCanal``, ``oFuente``)
          and the resolved field ids under ``schemas``.
        * on any exception: ``{'name', 'locId', 'error'}``.

    NOTE(review): when a field id cannot be resolved from the schema, its
    counters stay at 0, so the branch looks clean in the brief table. The
    resolved ids are exposed under ``schemas`` in the full result.
    """
    try:
        # ── Schemas ──
        contact_schema = get_object_schema(loc_id, token, "contact")
        opp_schema = get_object_schema(loc_id, token, "opportunity")
        c_canal_id = contact_schema.get("Canal de Origen")
        c_fuente_id = contact_schema.get("Fuente de Prospecto")
        _, o_canal_id = find_field_id(opp_schema, OPP_CANAL_NAMES)
        _, o_fuente_id = find_field_id(opp_schema, OPP_FUENTE_NAMES)

        # ── Data ──
        contacts = get_contacts(loc_id, token)
        opportunities = get_opportunities(loc_id, token)
        total_contacts = len(contacts)
        total_opps = len(opportunities)

        # ── Field validation ──
        c_canal = _tally_field(contacts, c_canal_id, VALID_CONTACT_CANAL)
        c_fuente = _tally_field(contacts, c_fuente_id, VALID_CONTACT_FUENTE)
        o_canal = _tally_field(opportunities, o_canal_id, VALID_OPP_CANAL)
        o_fuente = _tally_field(opportunities, o_fuente_id, VALID_OPP_FUENTE)

        # ── Opportunity status breakdown ──
        opp_by_status = {}
        for opp in opportunities:
            # `or` also covers a status that is present but None/empty.
            status = opp.get('status') or 'unknown'
            opp_by_status[status] = opp_by_status.get(status, 0) + 1

        if brief:
            return {
                'name': name,
                'contacts': total_contacts,
                'opportunities': total_opps,
                'open': opp_by_status.get('open', 0),
                'won': opp_by_status.get('won', 0),
                'lost': opp_by_status.get('lost', 0),
                'abandoned': opp_by_status.get('abandoned', 0),
                'c_canal_invalid': c_canal['invalid'] + c_canal['empty'],
                'c_fuente_invalid': c_fuente['invalid'] + c_fuente['empty'],
                'o_canal_invalid': o_canal['invalid'] + o_canal['empty'],
                'o_fuente_invalid': o_fuente['invalid'] + o_fuente['empty'],
            }

        # ── Full report only: source distribution and embedded opps ──
        source_dist = {}
        contacts_with_opps = 0
        embedded_opps = 0
        for contact in contacts:
            src = contact.get('source') or '(none)'
            source_dist[src] = source_dist.get(src, 0) + 1
            # Opportunities embedded in the contact payload itself.
            embedded = contact.get('opportunities') or []
            if embedded:
                contacts_with_opps += 1
                embedded_opps += len(embedded)

        return {
            'name': name,
            'locId': loc_id,
            'contacts': total_contacts,
            'contactsWithOpps': contacts_with_opps,
            'noOpps': total_contacts - contacts_with_opps,
            'embeddedOpps': embedded_opps,
            'opportunities': total_opps,
            'byStatus': opp_by_status,
            'sourceDist': source_dist,
            'cCanal': c_canal,
            'cFuente': c_fuente,
            'oCanal': o_canal,
            'oFuente': o_fuente,
            'schemas': {
                'c_canal_id': c_canal_id,
                'c_fuente_id': c_fuente_id,
                'o_canal_id': o_canal_id,
                'o_fuente_id': o_fuente_id,
            },
        }
    except Exception as exc:
        # Boundary for the worker threads: report the failure as data so one
        # bad location does not abort the whole run.
        return {'name': name, 'locId': loc_id, 'error': str(exc)}
# ─── Output Formatters ────────────────────────────────────────────
def print_brief(results):
    """Print the summary table: one row per branch plus a TOTAL row.

    Args:
        results: Iterable of result dicts. A normal row carries ``name`` and
            the counters ``contacts``, ``opportunities``, ``open``, ``won``,
            ``lost``, ``abandoned`` and the four ``*_invalid`` keys; missing
            counters are treated as 0. A row carrying an ``error`` key is
            printed as an error line and left out of the totals, instead of
            aborting the report with ``KeyError``.

    Rows are sorted by ``name``. "Diff" is opportunities minus contacts.
    """
    count_keys = ('contacts', 'opportunities', 'open', 'won', 'lost',
                  'abandoned', 'c_canal_invalid', 'c_fuente_invalid',
                  'o_canal_invalid', 'o_fuente_invalid')
    totals = dict.fromkeys(count_keys, 0)

    print("\n" + "=" * 120)
    print("GHL ANALYTICS - MONTE PROVIDENCIA — FIELD VALIDATION")
    print("=" * 120)
    header = (f"\n{'Sucursal':<20} {'Cont':>5} {'Opps':>5} {'Abrt':>5} {'Gand':>5} "
              f"{'Diff':>5} {'❌C.Cnl':>7} {'❌C.Fte':>7} {'❌O.Cnl':>7} {'❌O.Fte':>7}")
    print(header)
    print("-" * 120)

    for r in sorted(results, key=lambda x: x.get('name', '')):
        if 'error' in r:
            # Failed locations have no counters; show the failure in place.
            print(f"{r.get('name', ''):<20} ERROR - {r['error']}")
            continue
        contacts = r.get('contacts', 0)
        opps = r.get('opportunities', 0)
        diff = opps - contacts
        diff_str = f"+{diff}" if diff > 0 else str(diff)
        cci = r.get('c_canal_invalid', 0)
        cfi = r.get('c_fuente_invalid', 0)
        oci = r.get('o_canal_invalid', 0)
        ofi = r.get('o_fuente_invalid', 0)
        # Highlight branches with issues
        flag = " ⚠️" if (cci + cfi + oci + ofi) > 0 else ""
        print(f"{r.get('name', ''):<20} {contacts:>5} {opps:>5} "
              f"{r.get('open', 0):>5} {r.get('won', 0):>5} {diff_str:>5} "
              f"{cci:>7} {cfi:>7} {oci:>7} {ofi:>7}{flag}")
        for k in totals:
            totals[k] += r.get(k, 0)

    print("-" * 120)
    total_diff = totals['opportunities'] - totals['contacts']
    diff_str = f"+{total_diff}" if total_diff > 0 else str(total_diff)
    print(f"{'TOTAL':<20} {totals['contacts']:>5} {totals['opportunities']:>5} "
          f"{totals['open']:>5} {totals['won']:>5} {diff_str:>5} "
          f"{totals['c_canal_invalid']:>7} {totals['c_fuente_invalid']:>7} "
          f"{totals['o_canal_invalid']:>7} {totals['o_fuente_invalid']:>7}")
    print("\n ❌C.Cnl = Contactos con Canal de Origen inválido/vacío")
    print(" ❌C.Fte = Contactos con Fuente de Prospecto inválido/vacío")
    print(" ❌O.Cnl = Oportunidades con CANAL DE ORIGEN inválido/vacío")
    print(" ❌O.Fte = Oportunidades con Fuente de Prospecto inválido/vacío")
    print(f"\n Status: Abiertos={totals['open']} | Ganados={totals['won']} | "
          f"Perdidos={totals['lost']} | Abandonados={totals['abandoned']}")
def _print_field_stats(heading, stats, valid_values):
    """Print one validated-field section: counters, then the value histogram."""
    print(f"\n {heading} (válidos: {', '.join(sorted(valid_values))}):")
    print(f" ✅ Válidos: {stats.get('valid', 0)} | ❌ Inválidos: {stats.get('invalid', 0)} | ⚪ Vacíos: {stats.get('empty', 0)}")
    for value, count in sorted(stats.get('dist', {}).items(), key=lambda kv: -kv[1]):
        # Mark each observed value so invalid ones stand out in the list.
        valid_mark = "✅" if value in valid_values else "❌"
        print(f" {valid_mark} {value:<20} {count:>4}")


def print_full(results):
    """Print the detailed per-branch report followed by grand totals.

    Args:
        results: Iterable of result dicts. A row with an ``error`` key is
            printed as a single error line and left out of the totals. Other
            rows must carry ``name``, ``locId``, ``contacts``,
            ``contactsWithOpps``, ``noOpps`` and ``opportunities``; the
            ``byStatus``, ``sourceDist`` and per-field tally dicts are
            optional and their sections are skipped when empty.

    Rows are sorted by ``name``.
    """
    rule = '─' * 80
    print("\n" + "=" * 80)
    print("GHL ANALYTICS - MONTE PROVIDENCIA — FULL REPORT")
    print("=" * 80)
    totals = {'contacts': 0, 'opps': 0, 'open': 0, 'won': 0, 'lost': 0, 'abandoned': 0}

    for r in sorted(results, key=lambda x: x.get('name', '')):
        if 'error' in r:
            print(f"\n{r['name']}: ERROR - {r['error']}")
            continue
        s = r.get('byStatus', {})
        src = r.get('sourceDist', {})
        ccan = r.get('cCanal', {})
        cfue = r.get('cFuente', {})
        ocan = r.get('oCanal', {})
        ofue = r.get('oFuente', {})

        print(f"\n{rule}")
        print(f"📊 {r['name']} ({r['locId']})")
        print(f"{rule}")

        # Counts
        print(f" Contactos: {r['contacts']} | Con opps: {r['contactsWithOpps']} | Sin opps: {r['noOpps']}")
        print(f" Oportunidades: {r['opportunities']} (embebidas en contacts: {r.get('embeddedOpps', '?')})")
        print(f" Status: Abiertos={s.get('open', 0)} | Ganados={s.get('won', 0)} | "
              f"Perdidos={s.get('lost', 0)} | Abandonados={s.get('abandoned', 0)}")

        # Source histogram; the bar is capped at 50 cells.
        if src:
            print("\n 📋 Contact Source:")
            for v, c in sorted(src.items(), key=lambda x: -x[1]):
                bar = "█" * min(c, 50)
                print(f" {v:<15} {c:>4} {bar}")

        # The VALID_* sets are only looked up when the section has data.
        if ccan:
            _print_field_stats("🔵 Contacto — Canal de Origen", ccan, VALID_CONTACT_CANAL)
        if cfue:
            _print_field_stats("🟢 Contacto — Fuente de Prospecto", cfue, VALID_CONTACT_FUENTE)
        if ocan:
            _print_field_stats("🔵 Oportunidad — CANAL DE ORIGEN", ocan, VALID_OPP_CANAL)
        if ofue:
            _print_field_stats("🟢 Oportunidad — Fuente de Prospecto", ofue, VALID_OPP_FUENTE)

        # Totals
        totals['contacts'] += r['contacts']
        totals['opps'] += r['opportunities']
        totals['open'] += s.get('open', 0)
        totals['won'] += s.get('won', 0)
        totals['lost'] += s.get('lost', 0)
        totals['abandoned'] += s.get('abandoned', 0)

    print(f"\n{'=' * 80}")
    print(f"TOTALES: {totals['contacts']} contactos | {totals['opps']} oportunidades")
    print(f"Abiertos: {totals['open']} | Ganados: {totals['won']} | Perdidos: {totals['lost']} | Abandonados: {totals['abandoned']}")
# ─── Main ─────────────────────────────────────────────────────────
def build_parser():
    """Build the command-line parser for this script.

    ``--full`` selects the detailed report, which is also what runs when no
    mode flag is given; it exists so the documented ``--full`` invocation is
    accepted instead of being rejected as an unknown argument. ``--brief``
    and ``--full`` are mutually exclusive.
    """
    parser = argparse.ArgumentParser(description='GHL Analytics - Branch analysis with field validation')
    parser.add_argument('--location', help='Specific location ID to analyze')
    mode = parser.add_mutually_exclusive_group()
    mode.add_argument('--brief', action='store_true', help='Brief output (table with validation flags)')
    mode.add_argument('--full', action='store_true', help='Full detailed report per branch (default)')
    return parser


def main(argv=None):
    """Run the analysis for one location or for every branch in parallel.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read
            ``sys.argv``.

    Accounts come from ``sync_engine.parse_accounts_csv()``; each one is read
    through its ``location_id``, ``token`` and ``nombre`` keys. Exits with
    status 1 when the accounts cannot be loaded and with status 0 when there
    are no branches to analyze.
    """
    # Configure stdout to use UTF-8 encoding to prevent UnicodeEncodeError on Windows
    if sys.platform == "win32":
        try:
            sys.stdout.reconfigure(encoding='utf-8')
        except AttributeError:
            # The current stdout object has no reconfigure(); leave it as is.
            pass

    args = build_parser().parse_args(argv)
    report = print_brief if args.brief else print_full

    # Load dynamic credentials from sync_engine
    try:
        accounts = sync_engine.parse_accounts_csv()
    except Exception as e:
        print(f"Error loading credentials from CSV: {e}")
        sys.exit(1)

    # The account with this id is kept apart from the branches: it can be
    # selected with --location but is not part of the all-branches run.
    main_location_id = "GbKkBpCmKu2QmloKFHy3"
    main_acc = next((acc for acc in accounts if acc["location_id"] == main_location_id), None)
    branches = [acc for acc in accounts if acc["location_id"] != main_location_id]
    all_branches = branches + ([main_acc] if main_acc else [])

    if args.location:
        loc = next((acc for acc in all_branches if acc["location_id"] == args.location), None)
        if loc is None:
            print(f"Location {args.location} not found")
            return
        report([analyze_location(loc["location_id"], loc["token"], loc["nombre"], brief=args.brief)])
        return

    # All locations in parallel
    if not branches:
        print("No branch locations found in CSV to analyze.")
        sys.exit(0)
    print(" 🔍 Analyzing all branches in parallel (10 workers)...")
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {
            executor.submit(analyze_location, acc["location_id"], acc["token"], acc["nombre"], args.brief): acc
            for acc in branches
        }
        # Completion order does not matter: both reports sort rows by name.
        results = [future.result() for future in as_completed(futures)]
    report(results)


if __name__ == "__main__":
    main()