ENG-857: Replacing Celery with Async Workers — Internal Briefing
Branch: feature/ENG-857-no-celery (agentic-backend + terraform)
Status: Deployed to HMG, ready for load testing
- App: https://github.com/Faction-V/agentic-backend/pull/410
- Terraform: https://github.com/Faction-V/gofigure_terraform/pull/630
We're replacing our Celery worker infrastructure with a lightweight asyncio-based worker. Same Redis, same queue semantics, but roughly 5x the concurrent workflow capacity at under a quarter of the resources.
Celery was designed for CPU-bound task queues. Our workflow execution is I/O-bound (waiting on AI API responses, database queries, S3 uploads). Running one workflow per Celery worker process wastes resources — each pod sits idle while waiting for HTTP responses.
Before (Celery):
- 18 Celery pods x 1 workflow each = 18 concurrent workflows
- 18 pods x 0.5 CPU = 9 CPU total
Each Celery worker is a separate OS process with its own memory footprint, running a single workflow at a time. Most of that time is spent waiting on network I/O.
After (async worker):
- 2 async worker pods x 50 workflows each = 100 concurrent workflows
- 2 pods x 1 CPU = 2 CPU total
Each async worker runs 50 concurrent workflows as asyncio tasks in a single process. While one workflow waits for an API response, others make progress. A semaphore caps concurrency to bound memory usage.
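To make the model concrete, here is a minimal sketch of that loop (queue name, Redis wiring, and function names are illustrative, not the actual worker.py):

```python
import asyncio
import json

import redis.asyncio as redis  # redis-py's asyncio client

MAX_CONCURRENT = 50  # the per-pod cap described above

async def worker_loop() -> None:
    r = redis.Redis()  # connection settings are deployment-specific
    sem = asyncio.Semaphore(MAX_CONCURRENT)

    async def run(raw: bytes) -> None:
        try:
            await execute_workflow(json.loads(raw))
        finally:
            sem.release()

    while True:
        await sem.acquire()  # block here while 50 workflows are in flight
        _, raw = await r.brpop("workflow_jobs")  # illustrative queue name
        # NB: a real worker also keeps task references for graceful shutdown.
        asyncio.create_task(run(raw))

async def execute_workflow(job: dict) -> None:
    # Stand-in for the real executor; its awaits on AI APIs, the DB,
    # and S3 are exactly where the other 49 tasks make progress.
    ...
```

Acquiring the semaphore before the BRPOP (rather than only around execution) means a saturated pod stops pulling jobs off the queue entirely, leaving them for the other pod or for KEDA to scale against.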
```mermaid
flowchart TB
subgraph Before["BEFORE: Celery Architecture"]
direction TB
subgraph API1["FastAPI API"]
A1["Run Workflow Request"]
end
subgraph Celery["Celery Infrastructure"]
B1[("Redis Broker")]
C1["Celery Worker 1<br>Single workflow per process"]
D1["Celery Worker 2<br>Single workflow per process"]
E1["Celery Worker N<br>Single workflow per process"]
F1["Flower UI<br>Monitoring"]
G1["Celery Exporter<br>Prometheus metrics"]
end
subgraph Mon1["Monitoring"]
H1["Prometheus"]
I1["Grafana"]
end
A1 -- "celery.delay" --> B1
B1 --> C1 & D1 & E1
F1 -. inspect .-> B1
G1 -. scrape celery .-> B1
H1 -- scrape --> F1 & G1
I1 -- query --> H1
end
subgraph After["AFTER: Async Worker Architecture"]
direction TB
subgraph API2["FastAPI API"]
A2["Run Workflow Request"]
end
subgraph Worker["Async Worker Process"]
B2[("Redis List<br>BRPOP")]
C2["Worker Pod 1<br>aiohttp + asyncio<br>50 concurrent workflows"]
D2["Worker Pod 2<br>aiohttp + asyncio<br>50 concurrent workflows"]
E2["/metrics endpoint<br>/healthz endpoint"]
end
subgraph Scaling["KEDA Autoscaler"]
K["Redis List Length Trigger"]
end
subgraph Mon2["Monitoring"]
H2["Prometheus<br>Kubernetes SD"]
I2["Grafana"]
end
A2 -- "LPUSH JSON" --> B2
B2 -- BRPOP --> C2 & D2
C2 --- E2
D2 --- E2
K -. LLEN .-> B2
K -. scale .-> C2 & D2
H2 -- "scrape :8080" --> E2
I2 -- query --> H2
end
```
| Component | Before | After |
|---|---|---|
| Task queue | `celery.delay()` | `redis.lpush()` JSON job |
| Worker process | Celery worker (1 task/process) | asyncio loop (50 tasks/process) |
| Concurrency | OS process per workflow | `asyncio.Task` + `Semaphore` |
| Health check | `celery inspect ping` | HTTP `GET /healthz` |
| Metrics | celery-exporter + Flower | Built-in `/metrics` (prometheus_client) |
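The health-check swap is small enough to show inline; a sketch assuming an aiohttp server on the worker (handler name and port are inferred from the diagram, not the actual code):

```python
from aiohttp import web

async def healthz(request: web.Request) -> web.Response:
    # Replaces `celery inspect ping`: returns 200 as long as the
    # event loop is alive and serving requests.
    return web.Response(text="ok")

app = web.Application()
app.add_routes([web.get("/healthz", healthz)])
web.run_app(app, port=8080)  # in the worker this would share the loop via web.AppRunner
```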
Key files:
- `src/services/worker/worker.py` — The async worker (replaces the Celery worker)
- `src/services/worker/queue.py` — Job enqueue (replaces `celery.delay()`)
- `src/services/worker/executor.py` — Workflow execution (extracted from the Celery task)
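For the enqueue side, a minimal sketch of what queue.py's replacement for `celery.delay()` could look like (the job schema and queue name are assumptions for illustration):

```python
import json
import uuid

import redis.asyncio as redis

async def enqueue_workflow(r: redis.Redis, workflow_id: str, inputs: dict) -> str:
    """Push a JSON job onto the Redis list the worker BRPOPs from."""
    job = {
        "job_id": str(uuid.uuid4()),  # assumed schema, for illustration
        "workflow_id": workflow_id,
        "inputs": inputs,
    }
    await r.lpush("workflow_jobs", json.dumps(job))
    return job["job_id"]
```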
| Component | Before | After |
|---|---|---|
| Worker deployment | `celery-worker` (18 replicas) | `workflow-worker` (2 replicas) |
| Monitoring sidecars | Flower + celery-exporter | None (built into worker) |
| KEDA trigger | Prometheus query (celery metrics) | Native Redis `LLEN` |
| Resources per pod | 0.5 CPU / 1.5Gi | 1 CPU / 3Gi |
| Total resources | 9 CPU / 27Gi (18 pods) | 2 CPU / 6Gi (2 pods) |
Removed:
- Celery app configuration and all Celery dependencies
- Flower deployment and service
- celery-exporter deployment and service
- `flower` and `celery[redis]` from `pyproject.toml`
We replaced the Celery monitoring stack with built-in Prometheus metrics:
| Metric | Type | What it measures |
|---|---|---|
| `workflow_jobs_active` | Gauge | Currently executing workflows |
| `workflow_jobs_queued` | Gauge | Jobs waiting in Redis queue |
| `workflow_jobs_total{status}` | Counter | Total jobs by outcome (success/failure/cancelled) |
| `workflow_job_duration_seconds` | Histogram | Execution time distribution |
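Declaring these with prometheus_client takes a few lines; a sketch (the port matches the scrape target in the diagram):

```python
from prometheus_client import Counter, Gauge, Histogram, start_http_server

JOBS_ACTIVE = Gauge("workflow_jobs_active", "Currently executing workflows")
JOBS_QUEUED = Gauge("workflow_jobs_queued", "Jobs waiting in Redis queue")
JOBS_TOTAL = Counter("workflow_jobs_total", "Total jobs by outcome", ["status"])
JOB_DURATION = Histogram("workflow_job_duration_seconds", "Execution time distribution")

start_http_server(8080)  # serves /metrics in a background thread
```

Each job execution can then be wrapped in `JOBS_ACTIVE.track_inprogress()` and `JOB_DURATION.time()`, with `JOBS_TOTAL.labels(status="success").inc()` on completion.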
Grafana dashboard with 10 panels: active workflows, queue depth, success rate, throughput, duration percentiles, per-pod breakdown, and queue stall alerting.
Built-in benchmark mode for demonstrating concurrency without consuming AI tokens:
```bash
# In the terraform dir:
just load-test-start 200   # Enable benchmark mode, push 200 synthetic jobs
just load-test-status      # Check queue depth, active jobs, pod count
just load-test-stop        # Drain queue, restore normal mode
```

Benchmark mode (`WORKER_BENCHMARK_MODE=1`) replaces real workflow execution with random 2-30s sleeps, exercising the full queue/metrics/scaling pipeline.
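Inside the worker, the benchmark branch is a simple guard around the executor; a sketch (function names assumed):

```python
import asyncio
import os
import random

BENCHMARK_MODE = os.environ.get("WORKER_BENCHMARK_MODE") == "1"

async def execute_job(job: dict) -> None:
    if BENCHMARK_MODE:
        # A random 2-30s sleep stands in for real execution, so the
        # queue/metrics/scaling pipeline is exercised without AI tokens.
        await asyncio.sleep(random.uniform(2, 30))
        return
    await run_real_workflow(job)

async def run_real_workflow(job: dict) -> None:
    ...  # the actual executor (extracted from the old Celery task)
```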
Graceful shutdown keeps the same 5-minute grace period as Celery:
- SIGTERM received → worker stops accepting new jobs
- In-flight workflows continue executing
- `asyncio.wait(pending, timeout=300)` — 5 min to finish
- Kubernetes `terminationGracePeriodSeconds=300` matches
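Put together, that is a signal handler plus one bounded wait; a sketch (the task bookkeeping is assumed):

```python
import asyncio
import signal

async def run_until_sigterm(tasks: set[asyncio.Task]) -> None:
    stop = asyncio.Event()
    # Kubernetes sends SIGTERM on pod deletion; setting the event is the
    # cue for the BRPOP loop to stop accepting new jobs.
    asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, stop.set)

    await stop.wait()
    pending = {t for t in tasks if not t.done()}
    if pending:
        # In-flight workflows get up to 5 minutes, matching the pod's
        # terminationGracePeriodSeconds=300.
        await asyncio.wait(pending, timeout=300)
```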
Next steps:
- Load test on HMG to validate concurrency
- Monitor Grafana dashboard during real workflow runs
- Roll out to staging environments
- Update production workspaces