#!/usr/bin/env python3 # -*- coding: utf-8 -*- """Super script de origen "Sucursal" para contactos creados por un usuario. Unifica la lógica que estaba dispersa (fix_web_user_branch_contacts.py, tag_canal_origen_workflow.py, sync_contact_sucursal_to_opportunity.py, fill_sucursal_tienda_from_location.py) en una sola herramienta auditada que SOLO opera sobre sucursales (excluye Marca y demos). Criterio de identificación: ``contact.createdBy.source in {"WEB_USER", "MOBILE_USER"}`` — los contactos replicados desde Marca llegan por integración (source = INTEGRATION); los creados manualmente en la sucursal por un empleado quedan como WEB_USER (UI web) o MOBILE_USER (app móvil). ``createdBy`` SOLO viene en el GET individual del contacto: el listado paginado ``GET /contacts/`` lo omite. Fase 1 (dry-run, modo por defecto): - Muestra por sucursal la distribución real de ``createdBy.source``. - Imprime el plan de cambios por contacto/oportunidad sin escribir nada. Fase 2 (--apply --run-id), orden estricto contacto -> oportunidad: 1. Contacto: tag único ``sucursal`` (quita formulario / facebook-ads), Canal de Origen = SUCURSAL. Si le falta Sucursal / TIENDA, se completan desde el Verificador CSV (valor determinístico por location_id). 2. TODAS las oportunidades del contacto: Canal de Origen de la Oportunidad = Sucursal, y se propaga Sucursal / TIENDA del contacto. NO toca "Fuente de Prospecto" (contiene valores de negocio como ALIANZA / PROSPECCIÓN que no deben sobrescribirse). Todo cambio queda en script_audit (reversible por run_id desde el dashboard). """ import argparse import os import sys from collections import Counter ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) if ROOT_DIR not in sys.path: sys.path.insert(0, ROOT_DIR) import script_audit # noqa: E402 import sync_engine # noqa: E402 # get_object_schema_fields para opps import common # noqa: E402 # FIELD_ALIASES + normalize_name from tag_canal_origen_workflow import ( # noqa: E402 MAIN_LOCATION_ID, contact_display_name, get_all_contacts, get_all_opportunities_for_contact, get_custom_field_value, get_opportunity, get_schemas, ghl_request, load_locations, resolve_opp_field_ids, safe_add_contact_tag, safe_remove_contact_tag, safe_update_contact_field, safe_update_opportunity_field, ) from fill_sucursal_tienda_from_location import load_verifier_map # noqa: E402 SCRIPT_NAME = "fix_branch_user_origin.py" # --- Constantes de negocio (alineadas a los scripts existentes, no inventar) --- # Fuentes de creación "por un usuario" dentro de la sucursal (no integración): # WEB_USER -> CRM web UI # MOBILE_USER -> app móvil del CRM USER_SOURCES = {"WEB_USER", "MOBILE_USER"} SUCURSAL_TAG = "sucursal" ORIGIN_TAGS_TO_REMOVE = ["formulario", "facebook-ads"] CONTACT_CANAL_VALUE = "SUCURSAL" OPP_CANAL_FIELD = "Canal de Origen de la Oportunidad" OPP_CANAL_VALUE = "Sucursal" # Demos: convención del repo (IDs hardcodeados + nombre que contiene "demo"). DEMO_LOCATION_IDS = {"Vf7qQl3L9vakJ8hDtQ8e", "Z64WQKORPVwXb5mn68Ef"} # --------------------------------------------------------------------------- # # Selección de cuentas (solo sucursales productivas) # --------------------------------------------------------------------------- # def is_demo_account(account): name = (account.get("nombre") or "").lower() return account["location_id"] in DEMO_LOCATION_IDS or "demo" in name def select_branch_locations(args): branches = [ acc for acc in load_locations(include_main=False) if acc["location_id"] != MAIN_LOCATION_ID and not is_demo_account(acc) ] if args.location: if args.location == MAIN_LOCATION_ID: raise SystemExit("Este script NO aplica a la cuenta de Marca.") if args.location in DEMO_LOCATION_IDS: raise SystemExit("Este script NO aplica a cuentas demo.") matches = [acc for acc in branches if acc["location_id"] == args.location] if not matches: raise SystemExit(f"Location {args.location} no es una sucursal productiva del CSV.") return matches if args.all: return branches raise SystemExit("Especifica --location o --all. Sin --apply corre en dry-run.") # --------------------------------------------------------------------------- # # Resolución de campos por alias (tolerante a mayúsculas/variantes) # --------------------------------------------------------------------------- # def resolve_schema_field_id(schema_dict, alias_key, fallback_name): """schema_dict es {name: id} (de get_schemas). Resuelve por FIELD_ALIASES.""" candidates = common.FIELD_ALIASES.get(alias_key, [fallback_name]) norm_to_id = {common.normalize_name(name): fid for name, fid in (schema_dict or {}).items()} for cand in candidates: fid = norm_to_id.get(common.normalize_name(cand)) if fid: return fid return None def resolve_opp_field_ids_by_alias(opp_fields_list, alias_key, fallback_name): """Devuelve TODOS los IDs de opp que matcheen el alias (maneja duplicados).""" candidates = common.FIELD_ALIASES.get(alias_key, [fallback_name]) target_norm = {common.normalize_name(name) for name in candidates} return [ field["id"] for field in (opp_fields_list or []) if field.get("name") and field.get("id") and common.normalize_name(field["name"]) in target_norm ] def get_contact_full(contact_id, token): """Trae el contacto individual. ``createdBy`` SOLO viene aquí: el listado paginado ``GET /contacts/`` lo omite (siempre llega None).""" data = ghl_request("GET", f"/contacts/{contact_id}", token) inner = data.get("contact") return inner if isinstance(inner, dict) else data def is_user_created_contact(contact): """True si lo creó un empleado en la sucursal (web o app móvil).""" created_by = contact.get("createdBy") or {} return created_by.get("source") in USER_SOURCES # --------------------------------------------------------------------------- # # Procesamiento por sucursal # --------------------------------------------------------------------------- # def process_location(account, *, dry_run, run_id, verifier_map): location_id = account["location_id"] token = account["token"] name = account["nombre"] local = { "user_detected": 0, "sin_sucursal": 0, "contacts_corrected": 0, "opps_corrected": 0, "skipped_locations": 0, "errors": 0, } print(f"\n{'=' * 70}\n{name} ({location_id})\n{'=' * 70}") # Valores autoritativos de Sucursal / TIENDA para esta location (Verificador). entry = (verifier_map or {}).get(location_id) or {} verif_sucursal = (entry.get("sucursal") or "").strip() verif_tienda = (entry.get("tienda") or "").strip() if entry: print(f" Verificador -> Sucursal: '{verif_sucursal}' | TIENDA: '{verif_tienda}'") else: print(" AVISO: location no está en el Verificador; no se podrán completar Sucursal/TIENDA faltantes.") schemas = get_schemas(location_id, token, "contact", "opportunity") contact_schema = schemas["contact"] opp_fields_list = sync_engine.ghl_client.get_object_schema_fields(token, location_id, "opportunity") contact_canal_id = resolve_schema_field_id(contact_schema, "canal_origen", "Canal de Origen") contact_sucursal_id = resolve_schema_field_id(contact_schema, "sucursal", "Sucursal") contact_tienda_id = resolve_schema_field_id(contact_schema, "tienda", "TIENDA") if not contact_canal_id: print(" SKIP: falta el campo de contacto 'Canal de Origen'") local["skipped_locations"] = 1 return local opp_sucursal_ids = resolve_opp_field_ids_by_alias(opp_fields_list, "sucursal", "Sucursal") opp_tienda_ids = resolve_opp_field_ids_by_alias(opp_fields_list, "tienda", "TIENDA") opp_canal_ids = resolve_opp_field_ids(opp_fields_list, OPP_CANAL_FIELD) contacts = get_all_contacts(location_id, token) print(f" Contactos totales: {len(contacts)}") # --- Fase 1: createdBy.source SOLO viene en el GET individual; el listado # paginado lo omite. Hacemos un GET por contacto para clasificar fielmente. --- dist = Counter() user_contacts = [] for summary in contacts: if not script_audit.wait_if_paused_or_stopped(run_id): print("\n Detención segura solicitada durante el escaneo.") break full = get_contact_full(summary["id"], token) src = (full.get("createdBy") or {}).get("source") dist[src or "(vacío)"] += 1 if is_user_created_contact(full): user_contacts.append(full) print(" Distribución createdBy.source (GET individual):") for src, cnt in dist.most_common(): marker = " <-- objetivo (creado por usuario)" if src in USER_SOURCES else "" print(f" {src}: {cnt}{marker}") local["user_detected"] = len(user_contacts) print(f" Creados por usuario (WEB_USER + MOBILE_USER) a corregir: {local['user_detected']}") # --- Fase 2: corrección contacto -> oportunidades (sobre los full ya leídos) --- for contact in user_contacts: if not script_audit.wait_if_paused_or_stopped(run_id): print("\n Detención segura solicitada. Saliendo antes del siguiente contacto.") break cid = contact["id"] display = contact_display_name(contact) contact_touched = False # Tag de origen único: dejar 'sucursal', quitar formulario / facebook-ads. if safe_add_contact_tag(run_id, location_id, contact, SUCURSAL_TAG, token, dry_run): contact_touched = True print(f" [contacto] {display} TAG+ {SUCURSAL_TAG}") for tag in ORIGIN_TAGS_TO_REMOVE: if safe_remove_contact_tag(run_id, location_id, contact, tag, token, dry_run): contact_touched = True print(f" [contacto] {display} TAG- {tag}") # Canal de Origen del contacto = SUCURSAL (NO se toca Fuente de Prospecto). if safe_update_contact_field(run_id, location_id, contact, contact_canal_id, "Canal de Origen", CONTACT_CANAL_VALUE, token, dry_run): contact_touched = True print(f" [contacto] {display} Canal de Origen -> {CONTACT_CANAL_VALUE}") # Sucursal / TIENDA del contacto: usar el valor existente; si falta, # completarlo desde el Verificador (la sucursal es determinística). target_sucursal = (get_custom_field_value(contact, contact_sucursal_id) or "").strip() if contact_sucursal_id else "" target_tienda = (get_custom_field_value(contact, contact_tienda_id) or "").strip() if contact_tienda_id else "" if not target_sucursal and verif_sucursal and contact_sucursal_id: if safe_update_contact_field(run_id, location_id, contact, contact_sucursal_id, "Sucursal", verif_sucursal, token, dry_run): contact_touched = True print(f" [contacto] {display} Sucursal -> {verif_sucursal} (Verificador)") target_sucursal = verif_sucursal if not target_tienda and verif_tienda and contact_tienda_id: if safe_update_contact_field(run_id, location_id, contact, contact_tienda_id, "TIENDA", verif_tienda, token, dry_run): contact_touched = True print(f" [contacto] {display} TIENDA -> {verif_tienda} (Verificador)") target_tienda = verif_tienda if not target_sucursal: local["sin_sucursal"] += 1 print(f" [contacto] {display} sin Sucursal y sin dato en Verificador: no se propaga a opps") if contact_touched: local["contacts_corrected"] += 1 # ---------- OPORTUNIDADES DESPUÉS (todas) ---------- for opp_summary in get_all_opportunities_for_contact(location_id, cid, token): opp_id = opp_summary.get("id") if not opp_id: continue opportunity = get_opportunity(location_id, opp_id, token) or opp_summary opp_touched = False # Canal de Origen de la Oportunidad = Sucursal. for field_id in opp_canal_ids: if safe_update_opportunity_field(run_id, location_id, opp_id, opportunity, field_id, OPP_CANAL_FIELD, OPP_CANAL_VALUE, token, dry_run): opp_touched = True # Propaga Sucursal / TIENDA del contacto a la oportunidad. if target_sucursal: for field_id in opp_sucursal_ids: if safe_update_opportunity_field(run_id, location_id, opp_id, opportunity, field_id, "Sucursal", target_sucursal, token, dry_run): opp_touched = True if target_tienda: for field_id in opp_tienda_ids: if safe_update_opportunity_field(run_id, location_id, opp_id, opportunity, field_id, "TIENDA", target_tienda, token, dry_run): opp_touched = True if opp_touched: local["opps_corrected"] += 1 print(f" [opp] {opp_id} | {opportunity.get('name') or display} -> Sucursal") print(f" -> {name}: usuario={local['user_detected']} (sin Sucursal={local['sin_sucursal']}), " f"contactos corregidos={local['contacts_corrected']}, " f"opps corregidas={local['opps_corrected']}") return local # --------------------------------------------------------------------------- # # CLI # --------------------------------------------------------------------------- # def main(): if hasattr(sys.stdout, "reconfigure"): sys.stdout.reconfigure(encoding="utf-8") parser = argparse.ArgumentParser( description="Corrige Canal de Origen/Sucursal/tags de contactos creados por usuario " "(WEB_USER + MOBILE_USER) en sucursales. Dry-run por defecto." ) parser.add_argument("--location", help="Location ID de una sucursal específica") parser.add_argument("--all", action="store_true", help="Procesa todas las sucursales productivas (excluye Marca y demos)") parser.add_argument("--apply", action="store_true", help="Aplica los cambios en el CRM. Sin este flag corre en dry-run.") parser.add_argument("--run-id", help="ID de auditoría (lo suministra el dashboard; " "en CLI se crea uno si se aplica)") args = parser.parse_args() dry_run = not args.apply run_id = args.run_id accounts = select_branch_locations(args) verifier_map = load_verifier_map() print("=" * 70) print("SUPER SCRIPT - ORIGEN SUCURSAL (contactos creados por usuario)") print("=" * 70) print(f"Modo: {'DRY-RUN (sin cambios)' if dry_run else 'APPLY (escribe en el CRM)'}") print(f"Criterio: createdBy.source in {sorted(USER_SOURCES)}") print(f"Sucursales objetivo: {len(accounts)} (Marca y demos excluidas)") # Run de auditoría idempotente (no-op si el dashboard ya lo creó). Solo al aplicar. if not dry_run and run_id: script_audit.create_run(run_id, SCRIPT_NAME, arguments=" ".join(sys.argv[1:]), locations=[acc["location_id"] for acc in accounts]) grand = {"user_detected": 0, "sin_sucursal": 0, "contacts_corrected": 0, "opps_corrected": 0, "skipped_locations": 0, "errors": 0} for account in accounts: try: local = process_location(account, dry_run=dry_run, run_id=run_id, verifier_map=verifier_map) for key in grand: grand[key] += local.get(key, 0) except Exception as exc: # noqa: BLE001 - continuar con las demás sucursales grand["errors"] += 1 print(f"\nERROR en {account['nombre']}: {exc}") print("\n" + "=" * 70) print(f"RESUMEN: creados por usuario={grand['user_detected']} " f"(sin Sucursal={grand['sin_sucursal']}), " f"contactos corregidos={grand['contacts_corrected']}, " f"opps corregidas={grand['opps_corrected']}, " f"locations omitidas={grand['skipped_locations']}, errores={grand['errors']}") if dry_run: print("Dry-run terminado. Revisa el plan y vuelve a correr con --apply --run-id para aplicar.") if not dry_run and run_id: script_audit.update_run_status( run_id, "failed" if grand["errors"] else "success", f"{grand['errors']} errores" if grand["errors"] else None, ) if grand["errors"]: raise SystemExit(1) if __name__ == "__main__": main()