Skip to content

Instantly share code, notes, and snippets.

@panchicore
Created May 11, 2026 22:52
Show Gist options
  • Select an option

  • Save panchicore/2b01e2059c3eba02521db1cc69a03ff1 to your computer and use it in GitHub Desktop.

Select an option

Save panchicore/2b01e2059c3eba02521db1cc69a03ff1 to your computer and use it in GitHub Desktop.
Profile rename-validate pipeline — find slow analyzer steps (used for PR #3103)
#!/usr/bin/env python3
"""
Profile the rename-validate pipeline to find which analyzer step is slow.
Usage:
cd src/backend && python ../../scripts/profile_rename_validate.py \
--org-cuid cmf7c8pup000o0faz59yxbc62 \
--resource-type model \
--field-key associatedcrid
Requires: Flask app context (reads from the same DB as the running API).
"""
import argparse
import time
from contextlib import contextmanager
@contextmanager
def timer(label: str):
start = time.perf_counter()
yield
elapsed = time.perf_counter() - start
print(f" {elapsed:8.3f}s {label}")
def profile_validate(org_cuid: str, resource_type: str, field_key: str):
from api import app
with app.app_context():
from db.organization import Organization
org = Organization.get_by_cuid(org_cuid)
if not org:
print(f"ERROR: org {org_cuid} not found")
return
print(f"Org: {org.name} (id={org.id})")
print(f"Field: {resource_type}/{field_key}")
print()
# --- 1. Key validation ---
print("=== Key Validation ===")
with timer("CustomFieldKeyValidator.validate_rename_request"):
from custom_fields.validators.key_validator import CustomFieldKeyValidator
validator = CustomFieldKeyValidator(org, resource_type)
errors = validator.validate_rename_request(field_key, "___profile_test___")
if errors:
print(f" (validation errors: {errors})")
print()
# --- 2. Usage analysis (the slow part) ---
print("=== Usage Analysis (per-analyzer breakdown) ===")
from custom_fields.analyzers.usage_analyzer import FieldUsageAnalyzer
analyzer = FieldUsageAnalyzer(org, resource_type)
steps = [
("_analyze_schemas", field_key),
("_analyze_instances", field_key),
("_analyze_workflows", field_key),
("_analyze_permissions", field_key),
("_analyze_formulas", field_key),
("_analyze_page_layouts", field_key),
("_analyze_dashboards", field_key),
("_analyze_workflow_details", field_key),
("_analyze_workflow_executions", field_key),
("_analyze_saved_view_filters", field_key),
]
results = {}
total_start = time.perf_counter()
for method_name, arg in steps:
method = getattr(analyzer, method_name)
start = time.perf_counter()
result = method(arg)
elapsed = time.perf_counter() - start
results[method_name] = (elapsed, result)
print(f" {elapsed:8.3f}s {method_name} → {result}")
start = time.perf_counter()
ft_count, ft_details = analyzer._get_finding_type_config_data(field_key)
elapsed = time.perf_counter() - start
results["_get_finding_type_config_data"] = (elapsed, ft_count)
print(f" {elapsed:8.3f}s _get_finding_type_config_data → {ft_count}")
total_elapsed = time.perf_counter() - total_start
print()
print(f" Total analysis: {total_elapsed:.3f}s")
# --- 3. Rank by time ---
print()
print("=== Ranking (slowest first) ===")
ranked = sorted(results.items(), key=lambda x: x[1][0], reverse=True)
for method_name, (elapsed, result) in ranked:
pct = (elapsed / total_elapsed * 100) if total_elapsed > 0 else 0
bar = "█" * int(pct / 2)
print(f" {elapsed:8.3f}s ({pct:5.1f}%) {bar} {method_name} → {result}")
# --- 4. Row counts for context ---
print()
print("=== Row counts (for context) ===")
from db import db
from sqlalchemy import text
db.session.rollback()
queries = {
"workflows_execution_steps (org)": f"""
SELECT COUNT(*) FROM workflows_execution_steps wes
JOIN workflow_executions we ON wes.execution_id = we.id
JOIN workflows w ON we.workflow_id = w.id
WHERE w.organization_id = {org.id}
""",
"workflows_execution_steps (total)": "SELECT COUNT(*) FROM workflows_execution_steps",
"inventory_models (org)": f"SELECT COUNT(*) FROM inventory_models WHERE organization_id = {org.id}",
"saved_views (org)": f"SELECT COUNT(*) FROM saved_views WHERE organization_id = {org.id}",
"dashboard_visualizations (org)": f"SELECT COUNT(*) FROM dashboard_visualizations WHERE organization_id = {org.id}",
"role_permissions (org)": f"SELECT COUNT(*) FROM role_permissions WHERE organization_id = {org.id}",
}
for label, query in queries.items():
try:
count = db.session.execute(text(query)).scalar()
print(f" {count:>10,} {label}")
except Exception as e:
print(f" {'ERROR':>10} {label}: {e}")
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Profile rename-validate pipeline")
parser.add_argument("--org-cuid", required=True)
parser.add_argument("--resource-type", default="model")
parser.add_argument("--field-key", required=True)
args = parser.parse_args()
profile_validate(args.org_cuid, args.resource_type, args.field_key)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment