Skip to content

Instantly share code, notes, and snippets.

@antonmry
Created May 5, 2026 20:59
Show Gist options
  • Select an option

  • Save antonmry/c596f9c47dfae32e25435ebd5de6bca1 to your computer and use it in GitHub Desktop.

Select an option

Save antonmry/c596f9c47dfae32e25435ebd5de6bca1 to your computer and use it in GitHub Desktop.
datafusion-tracing rule phase span repro

datafusion-tracing rule phase span repro

This is a minimal reproduction of a datafusion-tracing bug: rule-phase spans stay open when a DataFusion analyzer rule returns an error before the closing sentinel rule runs.

Run:

cargo run

Expected output (the program asserts these values, which demonstrate the bug):

after failed query:
  analyze_logical_plan opened: 1
  analyze_logical_plan closed: 0
after successful query:
  analyze_logical_plan closed: 2
  successful_query parent: Some("analyze_logical_plan")

The key symptom is that the successful_query span is parented under the stale analyze_logical_plan span left open by the previous failed query.

[package]
name = "datafusion-tracing-rule-repro"
version = "0.1.0"
edition = "2024"
[dependencies]
datafusion = "53"
datafusion-tracing = "53.0.0"
tokio = { version = "1", features = ["full"] }
tracing = "0.1"
tracing-subscriber = "0.3"
use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use datafusion::common::config::ConfigOptions;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::LogicalPlan;
use datafusion::optimizer::analyzer::AnalyzerRule;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_tracing::RuleInstrumentationOptions;
use tracing::field::{Field, Visit};
use tracing::{Id, Instrument as _, Subscriber};
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::{Layer, prelude::*};
/// One span lifecycle notification captured by the recording layer.
#[derive(Clone, Debug)]
struct SpanEvent {
    /// Lifecycle stage: `"new"` when the span is created, `"close"` when it closes.
    kind: &'static str,
    /// Span label (its `otel.name` field if present, otherwise its metadata name).
    name: String,
    /// Label of the parent span; only populated for `"new"` events.
    parent: Option<String>,
}

/// Shared, cloneable buffer of [`SpanEvent`]s; clones append to the same list.
#[derive(Clone, Default)]
struct SpanEvents(Arc<Mutex<Vec<SpanEvent>>>);

impl SpanEvents {
    /// Returns a copy of every event recorded so far, in recording order.
    ///
    /// # Panics
    /// Panics if the mutex was poisoned by a panic while it was held.
    fn snapshot(&self) -> Vec<SpanEvent> {
        let recorded = self.0.lock().expect("span events lock");
        recorded.to_vec()
    }
}
/// Span extension holding the label chosen for a span when it was created:
/// its `otel.name` field if recorded at creation, otherwise its metadata name.
/// Stored so later callbacks (closing, child-span parent lookup) can reuse it.
struct CapturedSpanName(String);
/// Field visitor that extracts a span's `otel.name` value, if any.
#[derive(Default)]
struct SpanNameVisitor {
    /// The last `otel.name` value seen, or `None` if the field was not recorded.
    name: Option<String>,
}

impl SpanNameVisitor {
    /// Field that carries the OpenTelemetry-facing span name.
    const OTEL_NAME: &'static str = "otel.name";

    /// Whether `field` is the `otel.name` field.
    fn matches(field: &Field) -> bool {
        field.name() == Self::OTEL_NAME
    }
}

impl Visit for SpanNameVisitor {
    /// Captures string values verbatim. Overriding this keeps `&str` values from
    /// going through `record_debug`, which would wrap them in quotes.
    fn record_str(&mut self, field: &Field, value: &str) {
        if Self::matches(field) {
            self.name = Some(value.to_owned());
        }
    }

    /// Captures any other value type using its `Debug` rendering.
    fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) {
        if Self::matches(field) {
            self.name = Some(format!("{value:?}"));
        }
    }
}
/// Records a `"new"` event for every span creation and a `"close"` event for
/// every span close into the shared buffer.
///
/// A span is labelled with its `otel.name` field when present, otherwise with
/// its metadata name. The label is stored as a [`CapturedSpanName`] extension so
/// that close events and children's parent lookups report the same label.
impl<S> Layer<S> for SpanEvents
where
    S: Subscriber + for<'span> LookupSpan<'span>,
{
    fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) {
        let mut visitor = SpanNameVisitor::default();
        attrs.record(&mut visitor);
        let name = match visitor.name {
            Some(otel_name) => otel_name,
            None => attrs.metadata().name().to_string(),
        };

        let mut parent = None;
        if let Some(span) = ctx.span(id) {
            // Remember the resolved label for on_close and for this span's children.
            span.extensions_mut().insert(CapturedSpanName(name.clone()));

            // Report the parent the registry linked this span to, using its stored
            // label when available.
            if let Some(parent_span) = span.parent() {
                let stored = parent_span
                    .extensions()
                    .get::<CapturedSpanName>()
                    .map(|label| label.0.clone());
                parent =
                    Some(stored.unwrap_or_else(|| parent_span.metadata().name().to_string()));
            }
        }

        let event = SpanEvent {
            kind: "new",
            name,
            parent,
        };
        self.0.lock().expect("span events lock").push(event);
    }

    fn on_close(&self, id: Id, ctx: Context<'_, S>) {
        let Some(span) = ctx.span(&id) else {
            // The span is not visible to this layer; nothing to record.
            return;
        };
        let stored = span
            .extensions()
            .get::<CapturedSpanName>()
            .map(|label| label.0.clone());
        let name = stored.unwrap_or_else(|| span.metadata().name().to_string());

        let event = SpanEvent {
            kind: "close",
            name,
            parent: None,
        };
        self.0.lock().expect("span events lock").push(event);
    }
}
/// Analyzer rule that errors while its `fail_next` flag is set and passes plans
/// through otherwise.
struct FailOnceAnalyzer {
    /// Set until the first `analyze` call clears it.
    fail_next: AtomicBool,
}

impl FailOnceAnalyzer {
    /// Creates a rule armed to fail on its first `analyze` call.
    fn new() -> Self {
        let fail_next = AtomicBool::new(true);
        Self { fail_next }
    }
}

impl fmt::Debug for FailOnceAnalyzer {
    /// Writes only the type name; the internal flag is not shown.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str("FailOnceAnalyzer")
    }
}
impl AnalyzerRule for FailOnceAnalyzer {
    /// Returns an internal error if the `fail_next` flag was set, and clears it.
    /// Otherwise returns `plan` unchanged.
    fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> DataFusionResult<LogicalPlan> {
        // `swap(false)` disarms the rule and yields the previous state atomically,
        // so at most one call can observe `true`.
        let armed = self.fail_next.swap(false, Ordering::SeqCst);
        if !armed {
            return Ok(plan);
        }
        Err(DataFusionError::Internal(String::from(
            "intentional analyzer failure",
        )))
    }

    /// Rule name reported to DataFusion.
    fn name(&self) -> &str {
        "fail_once_analyzer"
    }
}
/// Builds a session with default features plus [`FailOnceAnalyzer`], with
/// datafusion-tracing rule instrumentation in phase-only mode (target `"repro"`).
fn context_with_rule_phase_tracing() -> SessionContext {
    let failing_rule = Arc::new(FailOnceAnalyzer::new());
    let base_state = SessionStateBuilder::new()
        .with_config(SessionConfig::new())
        .with_default_features()
        .with_analyzer_rule(failing_rule)
        .build();

    let traced_state = datafusion_tracing::instrument_rules_with_trace_spans!(
        target: "repro",
        options: RuleInstrumentationOptions::phase_only(),
        state: base_state
    );
    SessionContext::new_with_state(traced_state)
}
/// Plans and fully executes `SELECT 1`, discarding the resulting batches.
///
/// # Errors
/// Propagates any planning or execution error from DataFusion.
async fn execute_query(ctx: &SessionContext) -> DataFusionResult<()> {
    ctx.sql("SELECT 1").await?.collect().await?;
    Ok(())
}
/// Runs the repro: one failing query, then one succeeding query on the same
/// session, checking how `analyze_logical_plan` spans were opened, closed, and
/// used as parents.
///
/// The final assertions pin the buggy behavior being reported. The phase span
/// opened by the failed query is still open when that query returns, so the next
/// query's span is parented under it.
#[tokio::main(flavor = "current_thread")]
async fn main() -> DataFusionResult<()> {
    /// Name of the rule-phase span whose lifetime is under test.
    const PHASE_SPAN: &str = "analyze_logical_plan";

    // Route every span on this thread through the recording layer.
    let events = SpanEvents::default();
    let _guard = tracing::subscriber::set_default(
        tracing_subscriber::registry()
            .with(tracing_subscriber::filter::LevelFilter::TRACE)
            .with(events.clone()),
    );
    let ctx = context_with_rule_phase_tracing();

    // Query 1: FailOnceAnalyzer rejects the plan, so the analyzer phase ends in an error.
    let failed = execute_query(&ctx)
        .instrument(tracing::info_span!("failed_query"))
        .await;
    assert!(failed.is_err(), "first query should fail in analyzer");
    let after_failure = events.snapshot();
    let (open_after_failure, close_after_failure) = (
        count(&after_failure, "new", PHASE_SPAN),
        count(&after_failure, "close", PHASE_SPAN),
    );

    // Query 2: the analyzer passes; its root span reveals any leaked parent.
    let succeeded = execute_query(&ctx)
        .instrument(tracing::info_span!("successful_query"))
        .await;
    assert!(succeeded.is_ok(), "second query should pass");
    let after_success = events.snapshot();
    let close_after_success = count(&after_success, "close", PHASE_SPAN);
    let successful_query_parent = after_success
        .iter()
        .filter(|event| event.kind == "new")
        .find(|event| event.name == "successful_query")
        .and_then(|event| event.parent.as_deref());

    println!("after failed query:");
    println!(" analyze_logical_plan opened: {open_after_failure}");
    println!(" analyze_logical_plan closed: {close_after_failure}");
    println!("after successful query:");
    println!(" analyze_logical_plan closed: {close_after_success}");
    println!(" successful_query parent: {successful_query_parent:?}");

    // These values demonstrate the bug, not the desired behavior.
    assert_eq!(open_after_failure, 1);
    assert_eq!(close_after_failure, 0);
    assert_eq!(close_after_success, 2);
    assert_eq!(successful_query_parent, Some(PHASE_SPAN));
    Ok(())
}
/// Counts the recorded events whose `kind` and span `name` both match.
///
/// The recording layer produces `kind` values `"new"` and `"close"`. Any `&str`
/// is accepted here, so callers are not forced to pass a `'static` string.
fn count(events: &[SpanEvent], kind: &str, name: &str) -> usize {
    events
        .iter()
        .filter(|event| event.kind == kind && event.name == name)
        .count()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment