95 lines
3.5 KiB
Python
95 lines
3.5 KiB
Python
#!/usr/bin/env python3
|
|
"""Audit script compliance with MP custom-field safety rules."""
|
|
|
|
import argparse
|
|
import os
|
|
import re
|
|
|
|
ROOT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
|
SCRIPTS_DIR = os.path.join(ROOT_DIR, "scripts")
|
|
|
|
SKIP_FILES = {"common.py", "audit_script_compliance.py"}
|
|
|
|
|
|
def read_text(path):
|
|
with open(path, "r", encoding="utf-8", errors="replace") as fh:
|
|
return fh.read()
|
|
|
|
|
|
def line_number(text, index):
|
|
return text.count("\n", 0, index) + 1
|
|
|
|
|
|
def line_at(text, index):
|
|
start = text.rfind("\n", 0, index) + 1
|
|
end = text.find("\n", index)
|
|
if end == -1:
|
|
end = len(text)
|
|
return text[start:end]
|
|
|
|
|
|
def find_issues(path):
|
|
text = read_text(path)
|
|
issues = []
|
|
uses_custom_fields = bool(re.search(r"customFields|custom_fields_json|custom_fields|field_id|fieldId", text))
|
|
mutates_custom_fields = bool(re.search(r"update_contact|create_contact|update_opportunity|create_opportunity|\"PUT\"|\"POST\"", text)) and "customFields" in text
|
|
schema_calls = bool(re.search(r"get_object_schema|get_schemas|/objects/", text))
|
|
|
|
if uses_custom_fields and not schema_calls:
|
|
issues.append((1, "Lee o compara custom fields sin resolver schema dinamico en el script."))
|
|
|
|
for match in re.finditer(r"[furbFURB]?[\"']https://services\.leadconnectorhq\.com/objects/[^\"']+|[furbFURB]?[\"']/objects/\{[^}]+\}|[furbFURB]?[\"']/objects/\w+", text):
|
|
prefix = text[max(0, match.start() - 600):match.start()]
|
|
if "/objects/" not in prefix and "get_objects_catalog" not in prefix:
|
|
issues.append((line_number(text, match.start()), "Posible llamada a schema de objeto sin consulta previa al catalogo /objects/."))
|
|
|
|
for match in re.finditer(r"customFields\s*[:=]\s*\{", text):
|
|
issues.append((line_number(text, match.start()), "customFields parece construirse como diccionario; GHL espera lista de {id, value}."))
|
|
|
|
for match in re.finditer(r"\{\s*[\"']id[\"']\s*:\s*[\"'][A-Za-z0-9_-]{8,}[\"']", text):
|
|
if line_at(text, match.start()).strip().startswith("#"):
|
|
continue
|
|
issues.append((line_number(text, match.start()), "Posible ID hardcodeado en payload de custom field."))
|
|
|
|
if mutates_custom_fields and "--dry-run" not in text and "--apply" not in text:
|
|
issues.append((1, "Script mutador de custom fields sin --dry-run o --apply evidente."))
|
|
|
|
return sorted(set(issues))
|
|
|
|
|
|
def audit_scripts():
|
|
results = []
|
|
for filename in sorted(os.listdir(SCRIPTS_DIR)):
|
|
if not filename.endswith(".py") or filename in SKIP_FILES:
|
|
continue
|
|
path = os.path.join(SCRIPTS_DIR, filename)
|
|
if not os.path.isfile(path):
|
|
continue
|
|
issues = find_issues(path)
|
|
results.append((filename, issues))
|
|
return results
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(description="Audita cumplimiento tecnico de scripts MP.")
|
|
parser.add_argument("--fail-on-issues", action="store_true", help="Retorna codigo 1 si encuentra hallazgos.")
|
|
args = parser.parse_args()
|
|
|
|
results = audit_scripts()
|
|
issue_count = sum(len(issues) for _, issues in results)
|
|
print("=== AUDITORIA DE CUMPLIMIENTO DE SCRIPTS ===")
|
|
print(f"Scripts auditados: {len(results)}")
|
|
print(f"Hallazgos: {issue_count}\n")
|
|
|
|
for filename, issues in results:
|
|
status = "OK" if not issues else "REVISAR"
|
|
print(f"[{status}] {filename}")
|
|
for line, message in issues:
|
|
print(f" L{line}: {message}")
|
|
if issue_count and args.fail_on_issues:
|
|
raise SystemExit(1)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|