|
/// Repro for https://github.com/datafusion-contrib/datafusion-tracing/issues/27 |
|
/// |
|
/// "Execution plan takes a long time to drop" |
|
/// |
|
/// When datafusion-tracing is used with a real OTel subscriber and a |
|
/// SimpleSpanProcessor, dropping an Arc<dyn ExecutionPlan> can take seconds. |
|
/// |
|
/// Root cause |
|
/// ---------- |
|
/// Each InstrumentedExec node creates one OTel span. When the plan is dropped, |
|
/// every node's span is closed. SimpleSpanProcessor::on_end() drives the |
|
/// exporter synchronously via futures_executor::block_on, so: |
|
/// |
|
/// plan_drop_time ≈ N_nodes × OTLP_round_trip_latency |
|
/// |
|
/// With a moderate OTLP latency (e.g. 200 ms to Jaeger under load) and a |
|
/// query plan with 40 nodes, that is 8 seconds — exactly as reported. |
|
/// |
|
/// Fix |
|
/// --- |
|
/// Replace SimpleSpanProcessor with BatchSpanProcessor. The batch processor |
|
/// queues spans in a channel (non-blocking) and exports them on a background |
|
/// thread. drop() returns immediately regardless of exporter latency. |
|
/// |
|
/// This program shows all three steps: |
|
/// Step 1 — count: how many spans does one query create? |
|
/// Step 2 — problem: slow drop with SimpleSpanProcessor |
|
/// Step 3 — fix: instant drop with BatchSpanProcessor |
|
/// |
|
/// Usage: |
|
/// cargo run runs steps 1-3 (self-contained) |
|
/// cargo run -- --otlp runs steps 1-3, then step 2b (SimpleSpanProcessor) against a real Jaeger on localhost:4317
|
use datafusion::arrow::array::{Float64Array, StringArray}; |
|
use datafusion::arrow::datatypes::{DataType, Field, Schema}; |
|
use datafusion::arrow::record_batch::RecordBatch; |
|
use datafusion::execution::SessionStateBuilder; |
|
use datafusion::prelude::*; |
|
use datafusion_tracing::{InstrumentationOptions, instrument_with_info_spans}; |
|
use futures::StreamExt; |
|
use opentelemetry::KeyValue; |
|
use opentelemetry::trace::TracerProvider as _; |
|
use opentelemetry_sdk::Resource; |
|
use opentelemetry_sdk::error::OTelSdkResult; |
|
use opentelemetry_sdk::trace::{ |
|
BatchSpanProcessor, InMemorySpanExporter, SdkTracerProvider, Sampler, |
|
SimpleSpanProcessor, SpanData, |
|
}; |
|
use std::sync::Arc; |
|
use std::sync::atomic::{AtomicUsize, Ordering}; |
|
use std::time::{Duration, Instant}; |
|
use tracing_opentelemetry::OpenTelemetryLayer; |
|
use tracing_subscriber::prelude::*; |
|
|
|
/// SQL statement executed by `execute_and_time_drop`: a `sum` over `my_table`
/// restricted by two equality filters. Every row produced by `make_batch`
/// carries exactly these filter values.
const QUERY: &str = "SELECT sum(numeric_value) FROM my_table \
WHERE filter_column_1 = 'filter_value_1' \
AND filter_column_2 = 'filter_value_2'";

/// Number of rows in each column of the batch built by `make_batch`.
const N_ROWS: usize = 100_000;
|
|
|
// ── simulated exporter latency ─────────────────────────────────────────────── |
|
/// Delay that `SlowExporter` sleeps for on every `export` call.
///
/// 50 ms per span simulates a moderate OTLP round-trip (e.g. Jaeger under load).
/// Scale up to reproduce the reporter's 8–10 s delay.
const SIMULATED_LATENCY: Duration = Duration::from_millis(50);
|
|
|
// ── data helpers ───────────────────────────────────────────────────────────── |
|
|
|
fn make_batch() -> RecordBatch { |
|
let schema = Arc::new(Schema::new(vec![ |
|
Field::new("numeric_value", DataType::Float64, false), |
|
Field::new("filter_column_1", DataType::Utf8, false), |
|
Field::new("filter_column_2", DataType::Utf8, false), |
|
])); |
|
RecordBatch::try_new( |
|
schema, |
|
vec![ |
|
Arc::new(Float64Array::from(vec![1.0f64; N_ROWS])), |
|
Arc::new(StringArray::from(vec!["filter_value_1"; N_ROWS])), |
|
Arc::new(StringArray::from(vec!["filter_value_2"; N_ROWS])), |
|
], |
|
) |
|
.unwrap() |
|
} |
|
|
|
async fn make_ctx() -> SessionContext { |
|
let instrument_rule = instrument_with_info_spans!( |
|
options: InstrumentationOptions { |
|
record_metrics: true, |
|
..Default::default() |
|
} |
|
); |
|
let state = SessionStateBuilder::new() |
|
.with_default_features() |
|
.with_physical_optimizer_rule(instrument_rule) |
|
.build(); |
|
let ctx = SessionContext::new_with_state(state); |
|
ctx.register_batch("my_table", make_batch()).unwrap(); |
|
ctx |
|
} |
|
|
|
// ── SlowExporter ───────────────────────────────────────────────────────────── |
|
// Simulates a slow OTLP backend (network I/O, collector under load, etc.). |
|
// std::thread::sleep is intentional: the export future must block the calling |
|
// thread so that SimpleSpanProcessor's block_on() feels the full latency. |
|
|
|
/// Span exporter stand-in that pauses for a fixed time on every export and
/// counts how many spans it has been handed.
#[derive(Debug)]
struct SlowExporter {
    /// How long each export call sleeps.
    latency: Duration,
    /// Running total of exported spans; shared with the creator of the exporter.
    exported: Arc<AtomicUsize>,
}

impl SlowExporter {
    /// Creates an exporter with the given per-export `latency`.
    ///
    /// Returns the exporter together with a second handle to its span
    /// counter (initially zero), so the caller can still read the count after
    /// the exporter itself has been moved elsewhere.
    fn new(latency: Duration) -> (Self, Arc<AtomicUsize>) {
        let exported = Arc::new(AtomicUsize::new(0));
        let handle = Arc::clone(&exported);
        (Self { latency, exported }, handle)
    }
}
|
|
|
impl opentelemetry_sdk::trace::SpanExporter for SlowExporter { |
|
async fn export(&self, batch: Vec<SpanData>) -> OTelSdkResult { |
|
std::thread::sleep(self.latency); // simulate round-trip to OTLP backend |
|
self.exported.fetch_add(batch.len(), Ordering::Relaxed); |
|
Ok(()) |
|
} |
|
} |
|
|
|
// ── OTel provider helpers ───────────────────────────────────────────────────── |
|
|
|
fn resource() -> Resource { |
|
Resource::builder() |
|
.with_attribute(KeyValue::new("service.name", "datafusion-drop-repro")) |
|
.build() |
|
} |
|
|
|
fn install_subscriber(provider: &SdkTracerProvider) -> impl Drop { |
|
let tracer = provider.tracer("repro"); |
|
let layer = OpenTelemetryLayer::new(tracer) |
|
.with_filter(tracing::level_filters::LevelFilter::INFO); |
|
tracing::subscriber::set_default(tracing_subscriber::registry().with(layer)) |
|
} |
|
|
|
// ── execute + clone + timed drop ───────────────────────────────────────────── |
|
|
|
async fn execute_and_time_drop(ctx: &SessionContext) -> Duration { |
|
let plan = ctx.sql(QUERY).await.unwrap().create_physical_plan().await.unwrap(); |
|
let plan_clone = plan.clone(); // mirrors the reporter's usage |
|
|
|
let task_ctx = ctx.task_ctx(); |
|
for part in 0..plan.properties().partitioning.partition_count() { |
|
let mut stream = plan.execute(part, task_ctx.clone()).unwrap(); |
|
while let Some(b) = stream.next().await { b.unwrap(); } |
|
} |
|
|
|
drop(plan); // plan_clone is now the sole owner (strong_count=1) |
|
|
|
let t = Instant::now(); |
|
drop(plan_clone); // ← this is what the reporter observes as slow |
|
t.elapsed() |
|
} |
|
|
|
// ── main ───────────────────────────────────────────────────────────────────── |
|
|
|
#[tokio::main(flavor = "current_thread")] |
|
async fn main() { |
|
let use_otlp = std::env::args().any(|a| a == "--otlp"); |
|
|
|
// ── Step 1: count spans ─────────────────────────────────────────────────── |
|
// Use InMemorySpanExporter + SimpleSpanProcessor to count how many OTel |
|
// spans a single query produces. These spans are all closed when the plan |
|
// is dropped — one per InstrumentedExec node. |
|
println!("=== Step 1: count spans produced by one query ==="); |
|
let exporter = InMemorySpanExporter::default(); |
|
let provider = SdkTracerProvider::builder() |
|
.with_span_processor(SimpleSpanProcessor::new(exporter.clone())) |
|
.with_resource(resource()) |
|
.with_sampler(Sampler::AlwaysOn) |
|
.build(); |
|
{ |
|
let _sub = install_subscriber(&provider); |
|
let ctx = make_ctx().await; |
|
|
|
let before = exporter.get_finished_spans().unwrap().len(); |
|
execute_and_time_drop(&ctx).await; |
|
let n_spans = exporter.get_finished_spans().unwrap().len() - before; |
|
|
|
println!(" query plan nodes wrapped by InstrumentedExec: {n_spans}"); |
|
println!( |
|
" → dropping the plan closes {n_spans} OTel spans", |
|
); |
|
println!( |
|
" → with SimpleSpanProcessor + {SIMULATED_LATENCY:?}/span latency: \ |
|
expected drop time = {n_spans} × {SIMULATED_LATENCY:?} = {:?}", |
|
SIMULATED_LATENCY * n_spans as u32, |
|
); |
|
} |
|
provider.shutdown().ok(); |
|
|
|
// ── Step 2: problem — SimpleSpanProcessor ──────────────────────────────── |
|
// SimpleSpanProcessor::on_end() blocks the thread per span via block_on(). |
|
// Plan drop time ≈ N_spans × OTLP_latency. |
|
println!("\n=== Step 2: problem — SimpleSpanProcessor (synchronous export) ==="); |
|
let (slow_exporter, counter) = SlowExporter::new(SIMULATED_LATENCY); |
|
let provider = SdkTracerProvider::builder() |
|
.with_span_processor(SimpleSpanProcessor::new(slow_exporter)) |
|
.with_resource(resource()) |
|
.with_sampler(Sampler::AlwaysOn) |
|
.build(); |
|
{ |
|
let _sub = install_subscriber(&provider); |
|
let ctx = make_ctx().await; |
|
let drop_time = execute_and_time_drop(&ctx).await; |
|
let n = counter.load(Ordering::Relaxed); |
|
println!(" drop() blocked for: {drop_time:?} (exported {n} spans × {SIMULATED_LATENCY:?})"); |
|
} |
|
provider.shutdown().ok(); |
|
|
|
// ── Step 3: fix — BatchSpanProcessor ───────────────────────────────────── |
|
// BatchSpanProcessor::on_end() sends to a channel (non-blocking). |
|
// A background OS thread handles the actual export. |
|
// Plan drop is instant; export cost is paid later, off the hot path. |
|
println!("\n=== Step 3: fix — BatchSpanProcessor (async export) ==="); |
|
let (slow_exporter, counter) = SlowExporter::new(SIMULATED_LATENCY); |
|
let provider = SdkTracerProvider::builder() |
|
.with_span_processor(BatchSpanProcessor::builder(slow_exporter).build()) |
|
.with_resource(resource()) |
|
.with_sampler(Sampler::AlwaysOn) |
|
.build(); |
|
{ |
|
let _sub = install_subscriber(&provider); |
|
let ctx = make_ctx().await; |
|
let drop_time = execute_and_time_drop(&ctx).await; |
|
println!(" drop() returned in: {drop_time:?} ← instant, spans queued"); |
|
} |
|
// Flush remaining spans (on a blocking thread to avoid current_thread deadlock) |
|
let flush_start = Instant::now(); |
|
tokio::task::spawn_blocking(move || provider.shutdown().ok()) |
|
.await |
|
.ok(); |
|
let n = counter.load(Ordering::Relaxed); |
|
println!(" background flush: {:?} (exported {n} spans)", flush_start.elapsed()); |
|
|
|
// ── Optional: real OTLP (cargo run -- --otlp) ──────────────────────────── |
|
if use_otlp { |
|
use opentelemetry_otlp::WithExportConfig; |
|
println!("\n=== Step 2b: real OTLP via gRPC (SimpleSpanProcessor) ==="); |
|
println!(" (requires Jaeger: docker run --rm -p 4317:4317 -p 16686:16686 \\\n\ |
|
\t\t\t cr.jaegertracing.io/jaegertracing/jaeger:2.14.0)"); |
|
let exporter = opentelemetry_otlp::SpanExporter::builder() |
|
.with_tonic() |
|
.with_endpoint("http://localhost:4317") |
|
.with_timeout(Duration::from_secs(10)) |
|
.build() |
|
.expect("OTLP exporter — is Jaeger running on :4317?"); |
|
let provider = SdkTracerProvider::builder() |
|
.with_span_processor(SimpleSpanProcessor::new(exporter)) |
|
.with_resource(resource()) |
|
.with_sampler(Sampler::AlwaysOn) |
|
.build(); |
|
{ |
|
let _sub = install_subscriber(&provider); |
|
let ctx = make_ctx().await; |
|
let drop_time = execute_and_time_drop(&ctx).await; |
|
println!(" drop() blocked for: {drop_time:?}"); |
|
} |
|
tokio::task::spawn_blocking(move || provider.shutdown().ok()) |
|
.await |
|
.ok(); |
|
} |
|
} |