use std::fmt;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};

use datafusion::common::config::ConfigOptions;
use datafusion::common::{DataFusionError, Result as DataFusionResult};
use datafusion::execution::SessionStateBuilder;
use datafusion::logical_expr::LogicalPlan;
use datafusion::optimizer::analyzer::AnalyzerRule;
use datafusion::prelude::{SessionConfig, SessionContext};
use datafusion_tracing::RuleInstrumentationOptions;
use tracing::field::{Field, Visit};
use tracing::{Id, Instrument as _, Subscriber};
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::registry::SpanRef;
use tracing_subscriber::{Layer, prelude::*};

/// One span lifecycle notification recorded by the [`SpanEvents`] layer.
#[derive(Clone, Debug)]
struct SpanEvent {
    /// Lifecycle stage; the layer in this file records `"new"` and `"close"`.
    kind: &'static str,
    /// Span name as resolved by the layer at the time the event was recorded.
    name: String,
    /// Name of the parent span when one was found; `"close"` events always carry `None`.
    parent: Option<String>,
}

/// Shared, clonable log of [`SpanEvent`]s.
///
/// Clones share one underlying vector through the `Arc`, so a clone handed to a
/// subscriber and the handle kept by the caller observe the same events.
#[derive(Clone, Default)]
struct SpanEvents(Arc<Mutex<Vec<SpanEvent>>>);

impl SpanEvents {
    /// Returns an independent copy of every event recorded so far.
    ///
    /// # Panics
    ///
    /// Panics if the mutex guarding the log is poisoned.
    fn snapshot(&self) -> Vec<SpanEvent> {
        self.0.lock().expect("span events lock").clone()
    }
}

/// Resolved span name, stored in a span's extensions so that later layer
/// callbacks can read it back instead of re-deriving it.
struct CapturedSpanName(String);

#[derive(Default)] |
|
struct SpanNameVisitor { |
|
name: Option<String>, |
|
} |
|
|
|
impl Visit for SpanNameVisitor { |
|
fn record_str(&mut self, field: &Field, value: &str) { |
|
if field.name() == "otel.name" { |
|
self.name = Some(value.to_string()); |
|
} |
|
} |
|
|
|
fn record_debug(&mut self, field: &Field, value: &dyn fmt::Debug) { |
|
if field.name() == "otel.name" { |
|
self.name = Some(format!("{value:?}")); |
|
} |
|
} |
|
} |
|
|
|
impl<S> Layer<S> for SpanEvents |
|
where |
|
S: Subscriber + for<'span> LookupSpan<'span>, |
|
{ |
|
fn on_new_span(&self, attrs: &tracing::span::Attributes<'_>, id: &Id, ctx: Context<'_, S>) { |
|
let mut visitor = SpanNameVisitor::default(); |
|
attrs.record(&mut visitor); |
|
let name = visitor |
|
.name |
|
.unwrap_or_else(|| attrs.metadata().name().to_string()); |
|
|
|
if let Some(span) = ctx.span(id) { |
|
span.extensions_mut().insert(CapturedSpanName(name.clone())); |
|
} |
|
|
|
let parent = ctx.span(id).and_then(|span| { |
|
span.parent().map(|parent| { |
|
parent |
|
.extensions() |
|
.get::<CapturedSpanName>() |
|
.map(|name| name.0.clone()) |
|
.unwrap_or_else(|| parent.metadata().name().to_string()) |
|
}) |
|
}); |
|
|
|
self.0.lock().expect("span events lock").push(SpanEvent { |
|
kind: "new", |
|
name, |
|
parent, |
|
}); |
|
} |
|
|
|
fn on_close(&self, id: Id, ctx: Context<'_, S>) { |
|
if let Some(span) = ctx.span(&id) { |
|
let name = span |
|
.extensions() |
|
.get::<CapturedSpanName>() |
|
.map(|name| name.0.clone()) |
|
.unwrap_or_else(|| span.metadata().name().to_string()); |
|
|
|
self.0.lock().expect("span events lock").push(SpanEvent { |
|
kind: "close", |
|
name, |
|
parent: None, |
|
}); |
|
} |
|
} |
|
} |
|
|
|
/// Analyzer rule state: a one-shot flag consumed by the first `analyze` call.
struct FailOnceAnalyzer {
    /// `true` while the next `analyze` call should return an error.
    fail_next: AtomicBool,
}

impl FailOnceAnalyzer {
    /// Creates a rule whose failure flag is set.
    fn new() -> Self {
        Self {
            fail_next: AtomicBool::new(true),
        }
    }
}

impl fmt::Debug for FailOnceAnalyzer {
    /// Prints only the type name; the `fail_next` flag is not included.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.debug_struct("FailOnceAnalyzer").finish()
    }
}

impl AnalyzerRule for FailOnceAnalyzer { |
|
fn analyze(&self, plan: LogicalPlan, _config: &ConfigOptions) -> DataFusionResult<LogicalPlan> { |
|
if self.fail_next.swap(false, Ordering::SeqCst) { |
|
return Err(DataFusionError::Internal( |
|
"intentional analyzer failure".to_string(), |
|
)); |
|
} |
|
|
|
Ok(plan) |
|
} |
|
|
|
fn name(&self) -> &str { |
|
"fail_once_analyzer" |
|
} |
|
} |
|
|
|
fn context_with_rule_phase_tracing() -> SessionContext { |
|
let session_state = SessionStateBuilder::new() |
|
.with_config(SessionConfig::new()) |
|
.with_default_features() |
|
.with_analyzer_rule(Arc::new(FailOnceAnalyzer::new())) |
|
.build(); |
|
|
|
let session_state = datafusion_tracing::instrument_rules_with_trace_spans!( |
|
target: "repro", |
|
options: RuleInstrumentationOptions::phase_only(), |
|
state: session_state |
|
); |
|
|
|
SessionContext::new_with_state(session_state) |
|
} |
|
|
|
async fn execute_query(ctx: &SessionContext) -> DataFusionResult<()> { |
|
let df = ctx.sql("SELECT 1").await?; |
|
df.collect().await?; |
|
Ok(()) |
|
} |
|
|
|
#[tokio::main(flavor = "current_thread")] |
|
async fn main() -> DataFusionResult<()> { |
|
let events = SpanEvents::default(); |
|
let subscriber = tracing_subscriber::registry() |
|
.with(tracing_subscriber::filter::LevelFilter::TRACE) |
|
.with(events.clone()); |
|
let _guard = tracing::subscriber::set_default(subscriber); |
|
|
|
let ctx = context_with_rule_phase_tracing(); |
|
|
|
let failed = execute_query(&ctx) |
|
.instrument(tracing::info_span!("failed_query")) |
|
.await; |
|
assert!(failed.is_err(), "first query should fail in analyzer"); |
|
|
|
let after_failure = events.snapshot(); |
|
let open_after_failure = count(&after_failure, "new", "analyze_logical_plan"); |
|
let close_after_failure = count(&after_failure, "close", "analyze_logical_plan"); |
|
|
|
let succeeded = execute_query(&ctx) |
|
.instrument(tracing::info_span!("successful_query")) |
|
.await; |
|
assert!(succeeded.is_ok(), "second query should pass"); |
|
|
|
let after_success = events.snapshot(); |
|
let close_after_success = count(&after_success, "close", "analyze_logical_plan"); |
|
let successful_query_parent = after_success |
|
.iter() |
|
.find(|event| event.kind == "new" && event.name == "successful_query") |
|
.and_then(|event| event.parent.as_deref()); |
|
|
|
println!("after failed query:"); |
|
println!(" analyze_logical_plan opened: {open_after_failure}"); |
|
println!(" analyze_logical_plan closed: {close_after_failure}"); |
|
println!("after successful query:"); |
|
println!(" analyze_logical_plan closed: {close_after_success}"); |
|
println!(" successful_query parent: {successful_query_parent:?}"); |
|
|
|
assert_eq!(open_after_failure, 1); |
|
assert_eq!(close_after_failure, 0); |
|
assert_eq!(close_after_success, 2); |
|
assert_eq!(successful_query_parent, Some("analyze_logical_plan")); |
|
|
|
Ok(()) |
|
} |
|
|
|
fn count(events: &[SpanEvent], kind: &'static str, name: &str) -> usize { |
|
events |
|
.iter() |
|
.filter(|event| event.kind == kind && event.name == name) |
|
.count() |
|
} |