datafusion-tracing-drop-repro

Minimal reproducer for datafusion-tracing issue #27: "Execution plan takes a long time to drop".

The bug

When datafusion-tracing is used together with an OpenTelemetry subscriber backed by a SimpleSpanProcessor, dropping an Arc<dyn ExecutionPlan> can block the calling thread for several seconds, pinning the CPU.

The reporter observed:

2025-10-25T02:19:11Z  dropping plan now, strong_count: 1, weak_count: 0
2025-10-25T02:19:20Z  background thread drop completed

~8 seconds to drop a plan that has no remaining references.

Root cause

datafusion-tracing wraps every physical plan node in an InstrumentedExec. Each InstrumentedExec creates one OTel span. When the root plan is dropped, all node spans are closed in sequence.

SimpleSpanProcessor::on_end drives the exporter synchronously via futures_executor::block_on, so every span close blocks the calling thread for the full OTLP round-trip:

plan_drop_time ≈ N_nodes × OTLP_round_trip_latency

With a query plan that has 40 nodes and a collector responding in 200 ms (e.g. Jaeger under load, Logfire, a remote backend):

40 nodes × 200 ms = 8 seconds

This matches exactly what the reporter saw.
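
To observe the same stall in your own code, time the last drop of the plan directly. A minimal sketch, assuming `ctx` is a SessionContext built with the datafusion-tracing optimizer rule (the helper name and the bare-bones error handling are illustrative, not from the issue):

use datafusion::prelude::SessionContext;
use std::time::Instant;

async fn time_plan_drop(ctx: &SessionContext, sql: &str) -> datafusion::error::Result<()> {
    let plan = ctx.sql(sql).await?.create_physical_plan().await?;
    // ... execute the plan and drain its record-batch streams here ...
    let t = Instant::now();
    drop(plan); // last strong reference; with SimpleSpanProcessor this blocks ~N_nodes × latency
    println!("plan drop took {:?}", t.elapsed());
    Ok(())
}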

Fix

Replace SimpleSpanProcessor with BatchSpanProcessor.

The batch processor sends spans to a channel (non-blocking) and exports them on a dedicated background OS thread. drop() returns immediately regardless of exporter latency.
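
In code, the fix is confined to the tracer provider setup. A before/after sketch, assuming `exporter` is whatever span exporter is already in use (e.g. the OTLP gRPC exporter); only one of the two providers would be built in practice:

use opentelemetry_sdk::trace::{BatchSpanProcessor, SdkTracerProvider, SimpleSpanProcessor};

// Before: every span close drives a full export synchronously on the dropping thread.
let provider = SdkTracerProvider::builder()
    .with_span_processor(SimpleSpanProcessor::new(exporter))
    .build();

// After: span closes only enqueue; a dedicated background thread exports in batches.
let provider = SdkTracerProvider::builder()
    .with_span_processor(BatchSpanProcessor::builder(exporter).build())
    .build();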

Reproducer

The program runs three steps:

Step          What it shows
1 — count     How many OTel spans one query produces (= number of plan nodes)
2 — problem   SimpleSpanProcessor: drop() blocks for N × latency
3 — fix       BatchSpanProcessor: drop() returns instantly

SIMULATED_LATENCY (default 50 ms/span) simulates a real OTLP backend. Scale it up to reproduce the reported 8–10 s delay.
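
For example, with the 6-node plan built here, raising the constant in main.rs to roughly 1.5 s per span puts the drop stall in the reported range (the exact value is just an illustration):

const SIMULATED_LATENCY: Duration = Duration::from_millis(1_500); // 6 nodes × 1.5 s ≈ 9 s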

Run

cargo run

To test with a real Jaeger instance (step 2b):

docker run --rm -p 4317:4317 -p 16686:16686 \
  cr.jaegertracing.io/jaegertracing/jaeger:2.14.0

cargo run -- --otlp

Output (50 ms simulated latency, 6-node plan)

=== Step 1: count spans produced by one query ===
  query plan nodes wrapped by InstrumentedExec: 6
  → dropping the plan closes 6 OTel spans
  → with SimpleSpanProcessor + 50ms/span latency: expected drop time = 6 × 50ms = 300ms

=== Step 2: problem — SimpleSpanProcessor (synchronous export) ===
  drop() blocked for: 328ms  (exported 6 spans × 50ms)

=== Step 3: fix — BatchSpanProcessor (async export) ===
  drop() returned in:  456µs  ← instant, spans queued
  background flush:     55ms  (exported 6 spans)

Step 2 matches the prediction from step 1. Step 3 shows drop() is 700× faster with BatchSpanProcessor; the same export work happens off the hot path.

Query used

SELECT sum(numeric_value) FROM my_table
WHERE filter_column_1 = 'filter_value_1'
  AND filter_column_2 = 'filter_value_2'

This matches the aggregate + filter shape described in the issue. A more complex query (joins, CTEs, repartitions) would produce more plan nodes and a proportionally longer drop time with SimpleSpanProcessor.
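
To see how many nodes (and therefore spans) a given query produces before worrying about drop time, print the physical plan. A sketch reusing the reproducer's context (the helper name print_plan is illustrative):

use datafusion::physical_plan::displayable;
use datafusion::prelude::SessionContext;

// Each line of the indented tree is one ExecutionPlan node; with the tracing
// rule installed, each node becomes an InstrumentedExec and hence one span.
async fn print_plan(ctx: &SessionContext, sql: &str) -> datafusion::error::Result<()> {
    let plan = ctx.sql(sql).await?.create_physical_plan().await?;
    println!("{}", displayable(plan.as_ref()).indent(true));
    Ok(())
}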

[package]
name = "datafusion-tracing-drop-repro"
version = "0.1.0"
edition = "2024"
[dependencies]
datafusion = "53"
datafusion-tracing = "53.0.0"
futures = "0.3"
opentelemetry = { version = "0.31", features = ["trace"] }
opentelemetry-otlp = { version = "0.31", features = ["grpc-tonic"] }
opentelemetry_sdk = { version = "0.31", features = ["rt-tokio", "rt-tokio-current-thread", "testing"] }
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-opentelemetry = { version = "0.32" }
tracing-subscriber = { version = "0.3", features = ["registry"] }
/// Repro for https://github.com/datafusion-contrib/datafusion-tracing/issues/27
///
/// "Execution plan takes a long time to drop"
///
/// When datafusion-tracing is used with a real OTel subscriber and a
/// SimpleSpanProcessor, dropping an Arc<dyn ExecutionPlan> can take seconds.
///
/// Root cause
/// ----------
/// Each InstrumentedExec node creates one OTel span. When the plan is dropped,
/// every node's span is closed. SimpleSpanProcessor::on_end() drives the
/// exporter synchronously via futures_executor::block_on, so:
///
/// plan_drop_time ≈ N_nodes × OTLP_round_trip_latency
///
/// With a moderate OTLP latency (e.g. 200 ms to Jaeger under load) and a
/// query plan with 40 nodes, that is 8 seconds — exactly as reported.
///
/// Fix
/// ---
/// Replace SimpleSpanProcessor with BatchSpanProcessor. The batch processor
/// queues spans in a channel (non-blocking) and exports them on a background
/// thread. drop() returns immediately regardless of exporter latency.
///
/// This program shows all three steps:
/// Step 1 — count: how many spans does one query create?
/// Step 2 — problem: slow drop with SimpleSpanProcessor
/// Step 3 — fix: instant drop with BatchSpanProcessor
///
/// Usage:
///   cargo run             runs steps 1-3 (self-contained)
///   cargo run -- --otlp   also runs step 2b against a real Jaeger on localhost:4317
use datafusion::arrow::array::{Float64Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::execution::SessionStateBuilder;
use datafusion::prelude::*;
use datafusion_tracing::{InstrumentationOptions, instrument_with_info_spans};
use futures::StreamExt;
use opentelemetry::KeyValue;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::Resource;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::trace::{
    BatchSpanProcessor, InMemorySpanExporter, SdkTracerProvider, Sampler,
    SimpleSpanProcessor, SpanData,
};
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};
use tracing_opentelemetry::OpenTelemetryLayer;
use tracing_subscriber::prelude::*;
const QUERY: &str = "SELECT sum(numeric_value) FROM my_table \
    WHERE filter_column_1 = 'filter_value_1' \
    AND filter_column_2 = 'filter_value_2'";
const N_ROWS: usize = 100_000;
// ── simulated exporter latency ───────────────────────────────────────────────
// 50 ms per span simulates a moderate OTLP round-trip (e.g. Jaeger under load).
// Scale up to reproduce the reporter's 8–10 s delay.
const SIMULATED_LATENCY: Duration = Duration::from_millis(50);
// ── data helpers ─────────────────────────────────────────────────────────────
fn make_batch() -> RecordBatch {
    let schema = Arc::new(Schema::new(vec![
        Field::new("numeric_value", DataType::Float64, false),
        Field::new("filter_column_1", DataType::Utf8, false),
        Field::new("filter_column_2", DataType::Utf8, false),
    ]));
    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Float64Array::from(vec![1.0f64; N_ROWS])),
            Arc::new(StringArray::from(vec!["filter_value_1"; N_ROWS])),
            Arc::new(StringArray::from(vec!["filter_value_2"; N_ROWS])),
        ],
    )
    .unwrap()
}

async fn make_ctx() -> SessionContext {
    let instrument_rule = instrument_with_info_spans!(
        options: InstrumentationOptions {
            record_metrics: true,
            ..Default::default()
        }
    );
    let state = SessionStateBuilder::new()
        .with_default_features()
        .with_physical_optimizer_rule(instrument_rule)
        .build();
    let ctx = SessionContext::new_with_state(state);
    ctx.register_batch("my_table", make_batch()).unwrap();
    ctx
}
// ── SlowExporter ─────────────────────────────────────────────────────────────
// Simulates a slow OTLP backend (network I/O, collector under load, etc.).
// std::thread::sleep is intentional: the export future must block the calling
// thread so that SimpleSpanProcessor's block_on() feels the full latency.
#[derive(Debug)]
struct SlowExporter {
    latency: Duration,
    exported: Arc<AtomicUsize>,
}

impl SlowExporter {
    fn new(latency: Duration) -> (Self, Arc<AtomicUsize>) {
        let counter = Arc::new(AtomicUsize::new(0));
        (Self { latency, exported: counter.clone() }, counter)
    }
}

impl opentelemetry_sdk::trace::SpanExporter for SlowExporter {
    async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult {
        std::thread::sleep(self.latency); // simulate round-trip to OTLP backend
        self.exported.fetch_add(batch.len(), Ordering::Relaxed);
        Ok(())
    }
}
// ── OTel provider helpers ─────────────────────────────────────────────────────
fn resource() -> Resource {
    Resource::builder()
        .with_attribute(KeyValue::new("service.name", "datafusion-drop-repro"))
        .build()
}

fn install_subscriber(provider: &SdkTracerProvider) -> impl Drop {
    let tracer = provider.tracer("repro");
    let layer = OpenTelemetryLayer::new(tracer)
        .with_filter(tracing::level_filters::LevelFilter::INFO);
    tracing::subscriber::set_default(tracing_subscriber::registry().with(layer))
}
// ── execute + clone + timed drop ─────────────────────────────────────────────
async fn execute_and_time_drop(ctx: &SessionContext) -> Duration {
    let plan = ctx.sql(QUERY).await.unwrap().create_physical_plan().await.unwrap();
    let plan_clone = plan.clone(); // mirrors the reporter's usage
    let task_ctx = ctx.task_ctx();
    for part in 0..plan.properties().partitioning.partition_count() {
        let mut stream = plan.execute(part, task_ctx.clone()).unwrap();
        while let Some(b) = stream.next().await {
            b.unwrap();
        }
    }
    drop(plan); // plan_clone is now the sole owner (strong_count=1)
    let t = Instant::now();
    drop(plan_clone); // ← this is what the reporter observes as slow
    t.elapsed()
}
// ── main ─────────────────────────────────────────────────────────────────────
#[tokio::main(flavor = "current_thread")]
async fn main() {
    let use_otlp = std::env::args().any(|a| a == "--otlp");

    // ── Step 1: count spans ───────────────────────────────────────────────────
    // Use InMemorySpanExporter + SimpleSpanProcessor to count how many OTel
    // spans a single query produces. These spans are all closed when the plan
    // is dropped — one per InstrumentedExec node.
    println!("=== Step 1: count spans produced by one query ===");
    let exporter = InMemorySpanExporter::default();
    let provider = SdkTracerProvider::builder()
        .with_span_processor(SimpleSpanProcessor::new(exporter.clone()))
        .with_resource(resource())
        .with_sampler(Sampler::AlwaysOn)
        .build();
    {
        let _sub = install_subscriber(&provider);
        let ctx = make_ctx().await;
        let before = exporter.get_finished_spans().unwrap().len();
        execute_and_time_drop(&ctx).await;
        let n_spans = exporter.get_finished_spans().unwrap().len() - before;
        println!("  query plan nodes wrapped by InstrumentedExec: {n_spans}");
        println!("  → dropping the plan closes {n_spans} OTel spans");
        println!(
            "  → with SimpleSpanProcessor + {SIMULATED_LATENCY:?}/span latency: \
             expected drop time = {n_spans} × {SIMULATED_LATENCY:?} = {:?}",
            SIMULATED_LATENCY * n_spans as u32,
        );
    }
    provider.shutdown().ok();

    // ── Step 2: problem — SimpleSpanProcessor ────────────────────────────────
    // SimpleSpanProcessor::on_end() blocks the thread per span via block_on().
    // Plan drop time ≈ N_spans × OTLP_latency.
    println!("\n=== Step 2: problem — SimpleSpanProcessor (synchronous export) ===");
    let (slow_exporter, counter) = SlowExporter::new(SIMULATED_LATENCY);
    let provider = SdkTracerProvider::builder()
        .with_span_processor(SimpleSpanProcessor::new(slow_exporter))
        .with_resource(resource())
        .with_sampler(Sampler::AlwaysOn)
        .build();
    {
        let _sub = install_subscriber(&provider);
        let ctx = make_ctx().await;
        let drop_time = execute_and_time_drop(&ctx).await;
        let n = counter.load(Ordering::Relaxed);
        println!("  drop() blocked for: {drop_time:?}  (exported {n} spans × {SIMULATED_LATENCY:?})");
    }
    provider.shutdown().ok();

    // ── Step 3: fix — BatchSpanProcessor ─────────────────────────────────────
    // BatchSpanProcessor::on_end() sends to a channel (non-blocking).
    // A background OS thread handles the actual export.
    // Plan drop is instant; export cost is paid later, off the hot path.
    println!("\n=== Step 3: fix — BatchSpanProcessor (async export) ===");
    let (slow_exporter, counter) = SlowExporter::new(SIMULATED_LATENCY);
    let provider = SdkTracerProvider::builder()
        .with_span_processor(BatchSpanProcessor::builder(slow_exporter).build())
        .with_resource(resource())
        .with_sampler(Sampler::AlwaysOn)
        .build();
    {
        let _sub = install_subscriber(&provider);
        let ctx = make_ctx().await;
        let drop_time = execute_and_time_drop(&ctx).await;
        println!("  drop() returned in:  {drop_time:?}  ← instant, spans queued");
    }

    // Flush remaining spans (on a blocking thread to avoid current_thread deadlock)
    let flush_start = Instant::now();
    tokio::task::spawn_blocking(move || provider.shutdown().ok())
        .await
        .ok();
    let n = counter.load(Ordering::Relaxed);
    println!("  background flush:    {:?}  (exported {n} spans)", flush_start.elapsed());

    // ── Optional: real OTLP (cargo run -- --otlp) ────────────────────────────
    if use_otlp {
        use opentelemetry_otlp::WithExportConfig;

        println!("\n=== Step 2b: real OTLP via gRPC (SimpleSpanProcessor) ===");
        println!("  (requires Jaeger: docker run --rm -p 4317:4317 -p 16686:16686 \\\n\
                  \t\t\t cr.jaegertracing.io/jaegertracing/jaeger:2.14.0)");
        let exporter = opentelemetry_otlp::SpanExporter::builder()
            .with_tonic()
            .with_endpoint("http://localhost:4317")
            .with_timeout(Duration::from_secs(10))
            .build()
            .expect("OTLP exporter — is Jaeger running on :4317?");
        let provider = SdkTracerProvider::builder()
            .with_span_processor(SimpleSpanProcessor::new(exporter))
            .with_resource(resource())
            .with_sampler(Sampler::AlwaysOn)
            .build();
        {
            let _sub = install_subscriber(&provider);
            let ctx = make_ctx().await;
            let drop_time = execute_and_time_drop(&ctx).await;
            println!("  drop() blocked for: {drop_time:?}");
        }
        tokio::task::spawn_blocking(move || provider.shutdown().ok())
            .await
            .ok();
    }
}