Replacing Celery with Async Workers (ENG-857)

Branch: `feature/ENG-857-no-celery` (agentic-backend + terraform)
Status: Deployed to HMG, ready for load testing

Pull Requests

TL;DR

We're replacing our Celery worker infrastructure with a lightweight asyncio-based worker. Same Redis, same queue semantics, but roughly 5x the concurrent workflows (18 → 100) on less than a quarter of the resources (9 CPU / 27Gi → 2 CPU / 6Gi).

Why?

Celery was designed for CPU-bound task queues. Our workflow execution is I/O-bound (waiting on AI API responses, database queries, S3 uploads). Running one workflow per Celery worker process wastes resources — each pod sits idle while waiting for HTTP responses.

The Problem

```
18 Celery pods x 1 workflow each = 18 concurrent workflows
18 pods x 0.5 CPU = 9 CPU total
```

Each Celery worker is a separate OS process with its own memory footprint, running a single workflow at a time. Most of that time is spent waiting on network I/O.

The Solution

```
2 async worker pods x 50 workflows each = 100 concurrent workflows
2 pods x 1 CPU = 2 CPU total
```

Each async worker runs 50 concurrent workflows as asyncio tasks in a single process. While one workflow waits for an API response, others make progress. A semaphore caps concurrency to bound memory usage.
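Conceptually, the worker is a single BRPOP loop feeding a semaphore-bounded set of asyncio tasks. A minimal sketch, assuming redis-py's asyncio client and illustrative names (`QUEUE_KEY`, `execute_workflow`); the real implementation is `src/services/worker/worker.py`:

```python
import asyncio
import json

import redis.asyncio as redis  # redis-py >= 4.2 ships an asyncio client

QUEUE_KEY = "workflow:jobs"  # illustrative; the real key lives in queue.py
MAX_CONCURRENCY = 50         # workflows per worker process

async def execute_workflow(job: dict) -> None:
    """Stand-in for the real executor (src/services/worker/executor.py)."""
    await asyncio.sleep(1)

async def run_job(job: dict, sem: asyncio.Semaphore) -> None:
    try:
        await execute_workflow(job)  # I/O-bound: AI APIs, DB, S3
    finally:
        sem.release()

async def main() -> None:
    r = redis.Redis.from_url("redis://localhost:6379")
    sem = asyncio.Semaphore(MAX_CONCURRENCY)
    stop = asyncio.Event()  # set by the SIGTERM handler (see Graceful Shutdown)
    tasks: set[asyncio.Task] = set()

    while not stop.is_set():
        # Take a concurrency slot *before* pulling a job, so jobs beyond
        # the cap stay in Redis where KEDA's LLEN trigger can see them.
        await sem.acquire()
        item = await r.brpop(QUEUE_KEY, timeout=5)  # None on timeout
        if item is None:
            sem.release()
            continue
        _, raw = item
        task = asyncio.create_task(run_job(json.loads(raw), sem))
        tasks.add(task)
        task.add_done_callback(tasks.discard)

if __name__ == "__main__":
    asyncio.run(main())
```

One deliberate detail: acquiring the slot before the pop keeps excess jobs in Redis rather than in-process, so queue depth stays an honest autoscaling signal.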

Architecture

Before and After

The side-by-side Mermaid chart below contrasts the Celery architecture with the async worker architecture:

```mermaid
flowchart TB
 subgraph Before["BEFORE: Celery Architecture"]
   direction TB
   subgraph API1["FastAPI API"]
        A1["Run Workflow Request"]
   end
   subgraph Celery["Celery Infrastructure"]
        B1[("Redis Broker")]
        C1["Celery Worker 1<br>Single workflow per process"]
        D1["Celery Worker 2<br>Single workflow per process"]
        E1["Celery Worker N<br>Single workflow per process"]
        F1["Flower UI<br>Monitoring"]
        G1["Celery Exporter<br>Prometheus metrics"]
   end
   subgraph Mon1["Monitoring"]
        H1["Prometheus"]
        I1["Grafana"]
   end
    A1 -- "celery.delay" --> B1
    B1 --> C1 & D1 & E1
    F1 -. inspect .-> B1
    G1 -. scrape celery .-> B1
    H1 -- scrape --> F1 & G1
    I1 -- query --> H1
 end

 subgraph After["AFTER: Async Worker Architecture"]
   direction TB
   subgraph API2["FastAPI API"]
        A2["Run Workflow Request"]
   end
   subgraph Worker["Async Worker Process"]
        B2[("Redis List<br>BRPOP")]
        C2["Worker Pod 1<br>aiohttp + asyncio<br>50 concurrent workflows"]
        D2["Worker Pod 2<br>aiohttp + asyncio<br>50 concurrent workflows"]
        E2["/metrics endpoint<br>/healthz endpoint"]
   end
   subgraph Scaling["KEDA Autoscaler"]
        K["Redis List Length Trigger"]
   end
   subgraph Mon2["Monitoring"]
        H2["Prometheus<br>Kubernetes SD"]
        I2["Grafana"]
   end
    A2 -- "LPUSH JSON" --> B2
    B2 -- BRPOP --> C2 & D2
    C2 --- E2
    D2 --- E2
    K -. LLEN .-> B2
    K -. scale .-> C2 & D2
    H2 -- "scrape :8080" --> E2
    I2 -- query --> H2
 end

```

What Changed

Application Code (agentic-backend)

| Component | Before | After |
|---|---|---|
| Task queue | `celery.delay()` | `redis.lpush()` JSON job |
| Worker process | Celery worker (1 task/process) | asyncio loop (50 tasks/process) |
| Concurrency | OS process per workflow | `asyncio.Task` + `Semaphore` |
| Health check | `celery inspect ping` | HTTP `GET /healthz` |
| Metrics | celery-exporter + Flower | Built-in `/metrics` (`prometheus_client`) |

Key files:

  • `src/services/worker/worker.py` — The async worker (replaces the Celery worker; sketched above)
  • `src/services/worker/queue.py` — Job enqueue (replaces `celery.delay()`; sketched below)
  • `src/services/worker/executor.py` — Workflow execution (extracted from the Celery task)
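On the enqueue side, `celery.delay()` reduces to an LPUSH of a JSON payload. A sketch with an assumed job shape; the actual schema lives in `src/services/worker/queue.py`:

```python
import json
import uuid

import redis.asyncio as redis

QUEUE_KEY = "workflow:jobs"  # must match the worker's BRPOP key

async def enqueue_workflow(r: redis.Redis, workflow_id: str, params: dict) -> str:
    """Replaces celery.delay(): push a self-describing JSON job onto a Redis list."""
    job_id = str(uuid.uuid4())
    payload = {"job_id": job_id, "workflow_id": workflow_id, "params": params}
    await r.lpush(QUEUE_KEY, json.dumps(payload))
    return job_id
```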

Infrastructure (terraform)

| Component | Before | After |
|---|---|---|
| Worker deployment | `celery-worker` (18 replicas) | `workflow-worker` (2 replicas) |
| Monitoring sidecars | Flower + celery-exporter | None (built into worker) |
| KEDA trigger | Prometheus query (Celery metrics) | Native Redis `LLEN` |
| Resources per pod | 0.5 CPU / 1.5Gi | 1 CPU / 3Gi |
| Total resources | 9 CPU / 27Gi (18 pods) | 2 CPU / 6Gi (2 pods) |

Removed

  • Celery app configuration and all Celery dependencies
  • Flower deployment and service
  • celery-exporter deployment and service
  • `flower` and `celery[redis]` from `pyproject.toml`

Observability

We replaced the Celery monitoring stack with built-in Prometheus metrics:

| Metric | Type | What it measures |
|---|---|---|
| `workflow_jobs_active` | Gauge | Currently executing workflows |
| `workflow_jobs_queued` | Gauge | Jobs waiting in the Redis queue |
| `workflow_jobs_total{status}` | Counter | Total jobs by outcome (success/failure/cancelled) |
| `workflow_job_duration_seconds` | Histogram | Execution time distribution |
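These map directly onto `prometheus_client` primitives, served from the same process over aiohttp. A sketch of the definitions and endpoints (handler names are illustrative; the `:8080` port comes from the architecture diagram):

```python
from aiohttp import web
from prometheus_client import (
    CONTENT_TYPE_LATEST, Counter, Gauge, Histogram, generate_latest,
)

JOBS_ACTIVE = Gauge("workflow_jobs_active", "Currently executing workflows")
JOBS_QUEUED = Gauge("workflow_jobs_queued", "Jobs waiting in the Redis queue")
JOBS_TOTAL = Counter("workflow_jobs_total", "Total jobs by outcome", ["status"])
JOB_DURATION = Histogram("workflow_job_duration_seconds", "Workflow execution time")

async def metrics(request: web.Request) -> web.Response:
    # Prometheus scrapes this endpoint via Kubernetes service discovery.
    return web.Response(
        body=generate_latest(), headers={"Content-Type": CONTENT_TYPE_LATEST}
    )

async def healthz(request: web.Request) -> web.Response:
    return web.Response(text="ok")  # liveness: replaces `celery inspect ping`

app = web.Application()
app.add_routes([web.get("/metrics", metrics), web.get("/healthz", healthz)])
# web.run_app(app, port=8080)  # in the worker, runs alongside the job loop
```

Around each job the worker would then call `JOBS_ACTIVE.inc()`/`.dec()`, observe `JOB_DURATION`, and count the outcome via `JOBS_TOTAL.labels(status=...).inc()`.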

Grafana dashboard with 10 panels: active workflows, queue depth, success rate, throughput, duration percentiles, per-pod breakdown, and queue stall alerting.

HMG Grafana dashboard

[Screenshot: workflow-worker Grafana dashboard on HMG, captured 2026-03-21]

Load Testing

Built-in benchmark mode for demonstrating concurrency without consuming AI tokens:

```bash
# In the terraform dir:
just load-test-start 200    # Enable benchmark mode, push 200 synthetic jobs
just load-test-status       # Check queue depth, active jobs, pod count
just load-test-stop         # Drain queue, restore normal mode
```

Benchmark mode (`WORKER_BENCHMARK_MODE=1`) replaces real workflow execution with random 2-30s sleeps, exercising the full queue/metrics/scaling pipeline.
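In code, that branch could look like this sketch; only the env var and the 2-30s range come from the paragraph above, the rest is illustrative:

```python
import asyncio
import os
import random

BENCHMARK_MODE = os.environ.get("WORKER_BENCHMARK_MODE") == "1"

async def run_real_workflow(job: dict) -> None:
    ...  # the real execution path extracted from the old Celery task

async def execute_workflow(job: dict) -> None:
    if BENCHMARK_MODE:
        # Synthetic job: same queue/metrics/scaling path, zero AI tokens.
        await asyncio.sleep(random.uniform(2, 30))
        return
    await run_real_workflow(job)
```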

Graceful Shutdown

Same 5-minute grace period as Celery:

  1. SIGTERM received → worker stops accepting new jobs
  2. In-flight workflows continue executing
  3. `asyncio.wait(pending, timeout=300)` gives them 5 minutes to finish
  4. Kubernetes `terminationGracePeriodSeconds=300` matches the application-side timeout
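Wired to the loop sketch from The Solution, the whole sequence fits in a few lines; `install_signal_handlers` is an illustrative name, and the step numbers in the comments map to the list above:

```python
import asyncio
import signal

async def shutdown(tasks: set[asyncio.Task], stop: asyncio.Event) -> None:
    stop.set()  # step 1: main loop stops pulling new jobs off the queue
    if tasks:
        # Steps 2-3: let in-flight workflows finish, up to the same
        # 5-minute budget that terminationGracePeriodSeconds enforces.
        done, pending = await asyncio.wait(tasks, timeout=300)
        for task in pending:
            task.cancel()  # anything still running at the deadline is cancelled

def install_signal_handlers(tasks: set[asyncio.Task], stop: asyncio.Event) -> None:
    # Must be called from inside the running event loop (e.g. early in main()).
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(
        signal.SIGTERM,
        lambda: asyncio.create_task(shutdown(tasks, stop)),
    )
```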

Next Steps

  • Load test on HMG to validate concurrency
  • Monitor Grafana dashboard during real workflow runs
  • Roll out to staging environments
  • Update production workspaces