271 lines
9.0 KiB
Python
271 lines
9.0 KiB
Python
#!/usr/bin/env python3
|
|
"""Correct branch contacts created manually by WEB_USER to sucursal origin."""
|
|
|
|
import argparse
|
|
from urllib.parse import quote
|
|
|
|
from tag_canal_origen_workflow import (
|
|
contact_display_name,
|
|
get_all_contacts,
|
|
get_custom_field_value,
|
|
get_schemas,
|
|
get_opportunity,
|
|
ghl_request,
|
|
load_locations,
|
|
safe_update_contact_field,
|
|
safe_update_opportunity_field,
|
|
script_audit,
|
|
)
|
|
|
|
# --- Tags -------------------------------------------------------------------
# Tag added to every WEB_USER contact handled by process_location().
SUCURSAL_TAG = "sucursal"
# Tags stripped from those same contacts (matched case-insensitively).
DIGITAL_TAGS = ["formulario", "facebook-ads"]

# --- Contact detection ------------------------------------------------------
# Value of contact["createdBy"]["source"] that is_web_user_contact() looks for.
WEB_USER_SOURCE = "WEB_USER"

# --- Values written to contact custom fields --------------------------------
# Written to the "Canal de Origen" contact field.
CONTACT_CANAL_VALUE = "SUCURSAL"
# Written to the "Fuente de Prospecto" contact field.
CONTACT_FUENTE_VALUE = "SUCURSAL"

# --- Values written to opportunity custom fields ----------------------------
# Written to "Canal de Origen de la Oportunidad" (note the different casing
# from the contact-level value).
OPPORTUNITY_CANAL_VALUE = "Sucursal"
# Written to "Fuente de Prospecto" and, conditionally, to
# "Fuente del cliente potencial".
OPPORTUNITY_FUENTE_VALUE = "SUCURSAL"
# "Fuente del cliente potencial" is only overwritten when it currently holds
# exactly this value.
LEAD_DIGITAL_VALUE = "LEAD DIGITAL"
|
|
|
|
|
|
def select_branch_locations(args):
    """Resolve which branch accounts the current invocation should process.

    Args:
        args: Parsed CLI namespace; only ``args.location`` and ``args.all``
            are read.

    Returns:
        The accounts whose ``location_id`` equals ``args.location`` when a
        location was given; otherwise every account returned by
        ``load_locations(include_main=False)`` when ``args.all`` is set.

    Raises:
        SystemExit: If the requested location is not among the loaded
            branches, or if neither ``--location`` nor ``--all`` was given.
    """
    available = load_locations(include_main=False)

    # An explicit --location takes precedence over --all.
    if not args.location:
        if args.all:
            return available
        raise SystemExit("Specify --location <branch-location-id> or --all. Use --dry-run to preview without changes.")

    selected = []
    for entry in available:
        if entry["location_id"] == args.location:
            selected.append(entry)

    if selected:
        return selected
    raise SystemExit(f"Location {args.location} is not a branch location in the CSV")
|
|
|
|
|
|
def is_web_user_contact(contact):
    """Report whether ``contact["createdBy"]["source"]`` is WEB_USER_SOURCE.

    A missing or falsy ``createdBy`` entry is treated as having no source.
    """
    creator = contact.get("createdBy")
    origin = creator.get("source") if creator else None
    return origin == WEB_USER_SOURCE
|
|
|
|
|
|
def normalized_tags(contact):
    """Build a case-insensitive lookup of the contact's tags.

    Returns:
        A dict mapping each tag's lower-cased string form to its string form
        as stored on the contact. A missing or ``None`` ``tags`` entry yields
        an empty dict. When two tags differ only by case, the later one's
        spelling is kept.
    """
    lookup = {}
    for raw in contact.get("tags", []) or []:
        spelling = str(raw)
        lookup[spelling.lower()] = spelling
    return lookup
|
|
|
|
|
|
def update_contact_tags(contact, tags):
    """Overwrite the in-memory ``tags`` entry of *contact* with *tags*.

    This only mutates the local contact mapping; no API request is made here.
    """
    contact["tags"] = tags
|
|
|
|
|
|
def add_contact_tag(contact_id, tag, token):
    """Send a POST to ``/contacts/<contact_id>/tags`` adding a single tag.

    The response of ``ghl_request`` is discarded; any exception it raises
    propagates to the caller.
    """
    endpoint = f"/contacts/{contact_id}/tags"
    payload = {"tags": [tag]}
    ghl_request("POST", endpoint, token, json_body=payload)
|
|
|
|
|
|
def remove_contact_tag(location_id, contact_id, tag, token):
    """Send a DELETE to ``/contacts/<contact_id>/tags/<tag>`` for one tag.

    The tag is percent-encoded with no characters treated as safe, so a "/"
    inside a tag cannot break the URL path. ``location_id`` travels as the
    ``locationId`` query parameter. Exceptions from ``ghl_request`` propagate.
    """
    endpoint = "/contacts/{}/tags/{}".format(contact_id, quote(tag, safe=""))
    query = {"locationId": location_id}
    ghl_request("DELETE", endpoint, token, params=query)
|
|
|
|
|
|
def safe_add_contact_tag(run_id, location_id, contact, tag, token, dry_run=False):
    """Add *tag* to *contact* with an audit record, unless already present.

    Presence is checked case-insensitively. The change is recorded through
    ``script_audit.record_change`` before anything is sent. In dry-run mode
    only the in-memory contact is updated; otherwise the tag is posted, the
    audit entry is marked ``"applied"`` and the in-memory contact is updated.

    Returns:
        ``False`` when the contact already has the tag, ``True`` otherwise.

    Raises:
        Exception: Whatever the live update raised, after the audit entry
            has been marked ``"failed"`` with the error text.
    """
    present = normalized_tags(contact)
    if tag.lower() in present:
        return False

    before = list(contact.get("tags", []) or [])
    after = [*before, tag]
    audit_id = script_audit.record_change(
        run_id,
        location_id,
        "contact",
        contact["id"],
        f"tag:{tag}",
        "tags",
        before,
        after,
    )

    if not dry_run:
        try:
            add_contact_tag(contact["id"], tag, token)
            script_audit.mark_change(audit_id, "applied")
            update_contact_tags(contact, after)
        except Exception as error:
            script_audit.mark_change(audit_id, "failed", str(error))
            raise
        return True

    # Dry run: mirror the change locally so later steps see the new tag list.
    update_contact_tags(contact, after)
    return True
|
|
|
|
|
|
def safe_remove_contact_tag(run_id, location_id, contact, tag, token, dry_run=False):
    """Remove *tag* from *contact* with an audit record, if it is present.

    The tag is matched case-insensitively, but the DELETE request uses the
    spelling actually stored on the contact. The change is recorded through
    ``script_audit.record_change`` before anything is sent. In dry-run mode
    only the in-memory contact is updated.

    Returns:
        ``False`` when the contact does not carry the tag, ``True`` otherwise.

    Raises:
        Exception: Whatever the live update raised, after the audit entry
            has been marked ``"failed"`` with the error text.
    """
    present = normalized_tags(contact)
    wanted = tag.lower()
    stored_spelling = present.get(wanted)
    if not stored_spelling:
        return False

    before = list(contact.get("tags", []) or [])
    after = []
    for item in before:
        if str(item).lower() != wanted:
            after.append(item)

    audit_id = script_audit.record_change(
        run_id,
        location_id,
        "contact",
        contact["id"],
        f"tag:{tag}",
        "tags",
        before,
        after,
    )

    if not dry_run:
        try:
            remove_contact_tag(location_id, contact["id"], stored_spelling, token)
            script_audit.mark_change(audit_id, "applied")
            update_contact_tags(contact, after)
        except Exception as error:
            script_audit.mark_change(audit_id, "failed", str(error))
            raise
        return True

    # Dry run: mirror the change locally so later steps see the new tag list.
    update_contact_tags(contact, after)
    return True
|
|
|
|
|
|
def get_contact_opportunities(location_id, contact_id, token):
    """Fetch the opportunities linked to a contact, best-effort.

    A single POST to ``/opportunities/search`` is issued with ``limit`` 100;
    no further pages are requested here.

    Returns:
        The ``opportunities`` list from the response, or ``[]`` when that key
        is missing/empty or when the request raised (a warning is printed in
        that case instead of propagating the error).
    """
    search_body = {"locationId": location_id, "contactId": contact_id, "limit": 100}
    try:
        response = ghl_request("POST", "/opportunities/search", token, json_body=search_body)
    except Exception as error:
        # Deliberately non-fatal: the caller treats this as "no opportunities".
        print(f" WARN no se pudieron consultar oportunidades: {error}")
        return []

    found = response.get("opportunities", [])
    return found if found else []
|
|
|
|
|
|
def oldest_opportunity(opportunities):
    """Return the opportunity with the earliest ``dateAdded`` value.

    Args:
        opportunities: Sequence of opportunity dicts (may be empty or None).

    Returns:
        The entry whose ``dateAdded`` compares lowest, or ``None`` when there
        is nothing to choose from. Entries with a missing or falsy
        ``dateAdded`` are ranked last. On ties the earliest entry in the
        input wins.
    """
    if not opportunities:
        return None

    # Sentinel that compares after any real timestamp string, so undated
    # opportunities are only picked when no dated one exists.
    undated = "9999-12-31T23:59:59Z"

    # min() makes one pass and returns the first minimal item, which is the
    # same element a stable sort would place at index 0.
    return min(opportunities, key=lambda opp: opp.get("dateAdded") or undated)
|
|
|
|
|
|
def update_oldest_opportunity(run_id, location_id, contact, opportunity_schema, token, dry_run=False):
    """Point the contact's oldest opportunity at the sucursal origin values.

    The oldest opportunity is picked from the contact's search results, then
    re-fetched via ``get_opportunity`` (falling back to the search result if
    that returns nothing). Fields absent from *opportunity_schema* are
    skipped. "Fuente del cliente potencial" is only overwritten when its
    current value equals ``LEAD_DIGITAL_VALUE``.

    Args:
        opportunity_schema: Mapping of custom-field name to field id.

    Returns:
        ``True`` if ``safe_update_opportunity_field`` reported a change for
        at least one field; ``False`` otherwise, including when the contact
        has no opportunities.
    """
    candidates = get_contact_opportunities(location_id, contact["id"], token)
    first = oldest_opportunity(candidates)
    if not first:
        return False

    opportunity_id = first.get("id")
    opportunity = get_opportunity(location_id, opportunity_id, token) or first

    # (field name, value to write, only-when-currently-LEAD-DIGITAL?)
    plan = (
        ("Canal de Origen de la Oportunidad", OPPORTUNITY_CANAL_VALUE, False),
        ("Fuente de Prospecto", OPPORTUNITY_FUENTE_VALUE, False),
        ("Fuente del cliente potencial", OPPORTUNITY_FUENTE_VALUE, True),
    )

    changed = False
    for label, new_value, lead_digital_only in plan:
        field_id = opportunity_schema.get(label)
        if field_id:
            existing = get_custom_field_value(opportunity, field_id)
            blocked = lead_digital_only and existing != LEAD_DIGITAL_VALUE
            if not blocked and safe_update_opportunity_field(
                run_id,
                location_id,
                opportunity_id,
                opportunity,
                field_id,
                label,
                new_value,
                token,
                dry_run,
            ):
                changed = True

    return changed
|
|
|
|
|
|
def process_location(account, dry_run=False, run_id=None):
    """Correct every WEB_USER-created contact of one branch location.

    For each such contact: add the sucursal tag, remove the digital tags and
    set the "Canal de Origen" / "Fuente de Prospecto" contact fields. When
    any of those steps reports a change, the contact's oldest opportunity is
    updated as well. The loop stops early if
    ``script_audit.wait_if_paused_or_stopped`` returns a falsy value.

    Args:
        account: Mapping with ``location_id``, ``token`` and ``nombre`` keys.
        dry_run: Forwarded to every ``safe_*`` helper.
        run_id: Audit run identifier forwarded to ``script_audit`` and the
            ``safe_*`` helpers.

    Returns:
        Number of contacts for which at least one change was reported, or 0
        when the location lacks either required contact custom field.
    """
    location_id, token, name = account["location_id"], account["token"], account["nombre"]

    schemas = get_schemas(location_id, token, "contact", "opportunity")
    contact_schema, opportunity_schema = schemas["contact"], schemas["opportunity"]
    contact_canal_id = contact_schema.get("Canal de Origen")
    contact_fuente_id = contact_schema.get("Fuente de Prospecto")

    if not (contact_canal_id and contact_fuente_id):
        print(
            f"\n{name} - missing required contact fields "
            f"(Canal: {bool(contact_canal_id)}, Fuente: {bool(contact_fuente_id)})"
        )
        return 0

    # (field id, field name, value to write) for the contact-level fields.
    contact_field_plan = (
        (contact_canal_id, "Canal de Origen", CONTACT_CANAL_VALUE),
        (contact_fuente_id, "Fuente de Prospecto", CONTACT_FUENTE_VALUE),
    )

    web_user_contacts = corrected_contacts = 0
    updated_opportunities = skipped_opportunities = 0

    for contact in get_all_contacts(location_id, token):
        if not script_audit.wait_if_paused_or_stopped(run_id):
            print("\nDetención segura solicitada. Saliendo antes del siguiente contacto.")
            break
        if not is_web_user_contact(contact):
            continue

        web_user_contacts += 1

        # Every step must run regardless of the others' outcome, so results
        # are collected eagerly instead of being chained with `or`.
        outcomes = [safe_add_contact_tag(run_id, location_id, contact, SUCURSAL_TAG, token, dry_run)]
        for digital_tag in DIGITAL_TAGS:
            outcomes.append(
                safe_remove_contact_tag(run_id, location_id, contact, digital_tag, token, dry_run)
            )
        for field_id, label, value in contact_field_plan:
            outcomes.append(
                safe_update_contact_field(
                    run_id,
                    location_id,
                    contact,
                    field_id,
                    label,
                    value,
                    token,
                    dry_run,
                )
            )

        if not any(outcomes):
            continue

        corrected_contacts += 1
        print(f" OK contacto {contact_display_name(contact)} | WEB_USER -> sucursal")

        if update_oldest_opportunity(run_id, location_id, contact, opportunity_schema, token, dry_run):
            updated_opportunities += 1
            print(" OK oportunidad mas antigua actualizada")
        else:
            skipped_opportunities += 1

    print(
        f"\n{name}: {web_user_contacts} WEB_USER detectados, "
        f"{corrected_contacts} contactos corregidos, "
        f"{updated_opportunities} oportunidades actualizadas, "
        f"{skipped_opportunities} sin cambio de oportunidad"
    )
    return corrected_contacts
|
|
|
|
|
|
def main():
    """CLI entry point: parse options and process the selected branches.

    Each selected location is processed independently; an exception raised
    while processing one location is printed and the run continues with the
    next one. A grand total of corrected contacts is printed at the end.
    """
    parser = argparse.ArgumentParser(description="Correct WEB_USER branch contacts to sucursal origin")
    parser.add_argument("--location", help="Specific branch location ID to process")
    for flag, text in (
        ("--all", "Process all branch locations from the CSV"),
        ("--dry-run", "Preview changes without writing to GHL"),
    ):
        parser.add_argument(flag, action="store_true", help=text)
    parser.add_argument("--run-id", help="Audit run ID supplied by the dashboard")
    args = parser.parse_args()

    rule = "=" * 60
    print("\n" + rule)
    print("GHL FIX - WEB_USER BRANCH CONTACTS -> SUCURSAL")
    print(rule)
    if args.dry_run:
        print("DRY RUN - no changes will be made\n")

    total = 0
    for account in select_branch_locations(args):
        try:
            corrected = process_location(account, args.dry_run, args.run_id)
        except Exception as error:
            # One failing location must not abort the remaining ones.
            print(f"\nERROR {account['nombre']}: {error}")
        else:
            total += corrected

    print(f"\n{rule}")
    print(f"TOTAL: {total} contactos WEB_USER corregidos")


if __name__ == "__main__":
    main()
|