Skip to content

Instantly share code, notes, and snippets.

@ethDreamer
Created March 18, 2026 21:57
Show Gist options
  • Select an option

  • Save ethDreamer/22703aca6dfdfaa565fd855ef2d585e4 to your computer and use it in GitHub Desktop.

Select an option

Save ethDreamer/22703aca6dfdfaa565fd855ef2d585e4 to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Analyze block propagation/dissemination time as a function of block size.
Uses the "spread" approach to avoid clock skew issues:
- First observer defines the reference point (earliest propagation_slot_start_diff)
- Spread to p50/p90/p95 shows network dissemination time
- Clock skew affects all measurements equally, so differences are preserved
"""
import duckdb
import polars as pl
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
import numpy as np
# Configuration
# Inclusive date range of daily parquet files to analyze (one file per day).
START_DATE = datetime(2026, 2, 1)
END_DATE = datetime(2026, 2, 28)
# Ethereum network whose public Xatu dataset is queried.
NETWORK = "mainnet"
# Parquet file URL patterns
# Base URLs for the ethPandaOps Xatu tables: per-observer beacon-API block
# gossip events, and canonical-chain block metadata (roots, byte sizes).
BLOCK_EVENTS_BASE = f"https://data.ethpandaops.io/xatu/{NETWORK}/databases/default/beacon_api_eth_v1_events_block/"
CANONICAL_BLOCKS_BASE = f"https://data.ethpandaops.io/xatu/{NETWORK}/databases/default/canonical_beacon_block/"
def generate_parquet_urls(base_url, start_date, end_date):
    """Return one daily parquet URL per date from start_date to end_date inclusive.

    URLs follow the Xatu layout {base}{year}/{month}/{day}.parquet with
    non-zero-padded month/day components. An empty list is returned when
    end_date precedes start_date.
    """
    total_days = (end_date - start_date).days + 1
    urls = []
    for offset in range(max(total_days, 0)):
        current = start_date + timedelta(days=offset)
        urls.append(f"{base_url}{current.year}/{current.month}/{current.day}.parquet")
    return urls
def analyze_propagation():
    """Main analysis function.

    Reads per-observer block gossip events and canonical block metadata for
    the configured date range directly from remote parquet files via DuckDB,
    computes per-block propagation spread statistics (keyed by block_root,
    not slot), and then runs a sanity check on which observers see blocks
    first to detect systematic clock skew.

    Returns:
        polars.DataFrame: one row per unique block_root with size columns
        and spread columns measured against three reference points
        (first observer, second-fastest observer, p10).
    """
    print(f"Analyzing block propagation from {START_DATE} to {END_DATE}")
    print(f"Network: {NETWORK}\n")
    # Generate URL lists
    block_event_urls = generate_parquet_urls(BLOCK_EVENTS_BASE, START_DATE, END_DATE)
    canonical_block_urls = generate_parquet_urls(CANONICAL_BLOCKS_BASE, START_DATE, END_DATE)
    print(f"Querying {len(block_event_urls)} days of data...")
    # Connect to DuckDB (in-memory database; parquet files are fetched over HTTP)
    db = duckdb.connect()
    # Query: Calculate propagation spread per BLOCK (not slot!) with size info
    # CRITICAL FIX: Group by block_root, not slot, to avoid mixing competing blocks
    # CRITICAL FIX 2: Compute second_fastest separately to avoid Cartesian product
    # NOTE: the Python URL lists are interpolated into the SQL text; DuckDB's
    # read_parquet accepts the resulting ['...', '...'] list literal.
    query = f"""
WITH block_observations AS (
SELECT
block as block_root,
slot,
propagation_slot_start_diff as propagation_ms,
meta_client_name
FROM read_parquet({block_event_urls})
),
canonical_blocks AS (
SELECT
block_root,
slot,
block_total_bytes,
block_total_bytes_compressed,
execution_payload_transactions_total_bytes,
execution_payload_transactions_total_bytes_compressed
FROM read_parquet({canonical_block_urls})
),
-- Compute second-fastest observation per block (in separate CTE to avoid duplication)
ranked AS (
SELECT
block_root,
propagation_ms,
ROW_NUMBER() OVER (PARTITION BY block_root ORDER BY propagation_ms ASC) AS rn
FROM block_observations
),
second_fastest AS (
SELECT
block_root,
MAX(CASE WHEN rn = 2 THEN propagation_ms END) AS second_fastest_ms
FROM ranked
GROUP BY block_root
),
-- Calculate stats per block_root (no join duplication here)
propagation_stats AS (
SELECT
block_root,
MIN(slot) as slot, -- All observations should have same slot for a given block_root
COUNT(*) as observer_count,
MIN(propagation_ms) as first_seen_ms,
PERCENTILE_CONT(0.01) WITHIN GROUP (ORDER BY propagation_ms) as p01_ms,
PERCENTILE_CONT(0.10) WITHIN GROUP (ORDER BY propagation_ms) as p10_ms,
PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY propagation_ms) as p50_ms,
PERCENTILE_CONT(0.90) WITHIN GROUP (ORDER BY propagation_ms) as p90_ms,
PERCENTILE_CONT(0.95) WITHIN GROUP (ORDER BY propagation_ms) as p95_ms,
MAX(propagation_ms) as last_seen_ms
FROM block_observations
GROUP BY block_root
HAVING COUNT(*) >= 5 -- Filter blocks seen by at least 5 observers
)
SELECT
p.block_root,
p.slot,
b.block_total_bytes,
b.block_total_bytes_compressed,
b.execution_payload_transactions_total_bytes,
b.execution_payload_transactions_total_bytes_compressed,
p.observer_count,
p.first_seen_ms,
s.second_fastest_ms,
p.p01_ms,
p.p10_ms,
p.p50_ms,
p.p90_ms,
p.p95_ms,
p.last_seen_ms,
-- Calculate spread using different reference points
-- MIN (first observer - most affected by clock skew)
p.p10_ms - p.first_seen_ms as spread_min_to_p10_ms,
p.p50_ms - p.first_seen_ms as spread_min_to_p50_ms,
p.p90_ms - p.first_seen_ms as spread_min_to_p90_ms,
p.p95_ms - p.first_seen_ms as spread_min_to_p95_ms,
-- Second fastest (more robust)
p.p10_ms - s.second_fastest_ms as spread_2nd_to_p10_ms,
p.p50_ms - s.second_fastest_ms as spread_2nd_to_p50_ms,
p.p90_ms - s.second_fastest_ms as spread_2nd_to_p90_ms,
p.p95_ms - s.second_fastest_ms as spread_2nd_to_p95_ms,
-- p10 as reference (even more robust for small sample sizes)
p.p50_ms - p.p10_ms as spread_p10_to_p50_ms,
p.p90_ms - p.p10_ms as spread_p10_to_p90_ms,
p.p95_ms - p.p10_ms as spread_p10_to_p95_ms
FROM propagation_stats p
LEFT JOIN second_fastest s ON p.block_root = s.block_root
JOIN canonical_blocks b ON p.block_root = b.block_root
WHERE b.block_total_bytes IS NOT NULL
ORDER BY p.slot
"""
    print("Executing query (this may take a minute)...")
    # .pl() converts the DuckDB relation result straight into a polars DataFrame
    df = db.sql(query).pl()
    print(f"\nFound {len(df)} UNIQUE BLOCKS (by block_root) with complete data")
    print(f"Observer count range: {df['observer_count'].min()} - {df['observer_count'].max()}")
    print(f"Block size range: {df['block_total_bytes'].min()/1024:.1f}KB - {df['block_total_bytes'].max()/1024:.1f}KB")
    # Check for duplicate slots (multiple competing blocks)
    unique_slots = df['slot'].n_unique()
    print(f"Unique slots: {unique_slots}")
    if len(df) > unique_slots:
        print(f"⚠️ Found {len(df) - unique_slots} slots with multiple blocks (reorgs/competing blocks)")
    # Now analyze which nodes are first most often (to check for clock skew)
    print("\n" + "="*60)
    print("CLOCK SKEW CHECK: Analyzing which nodes see blocks first")
    print("="*60)
    first_observer_query = f"""
WITH ranked_observations AS (
SELECT
block as block_root,
meta_client_name,
propagation_slot_start_diff,
ROW_NUMBER() OVER (PARTITION BY block ORDER BY propagation_slot_start_diff ASC) as rank
FROM read_parquet({block_event_urls})
)
SELECT
meta_client_name,
COUNT(*) as times_first,
AVG(propagation_slot_start_diff) as avg_first_time_ms
FROM ranked_observations
WHERE rank = 1
GROUP BY meta_client_name
ORDER BY times_first DESC
"""
    print("Querying to find which nodes are first most often...")
    first_observers = db.sql(first_observer_query).pl()
    total_blocks = first_observers['times_first'].sum()
    print(f"\nTop 10 nodes that see blocks first (out of {total_blocks} blocks):")
    print("-" * 60)
    for i, row in enumerate(first_observers.head(10).iter_rows(named=True), 1):
        pct = 100 * row['times_first'] / total_blocks
        # meta_client_name may be bytes depending on the parquet schema — decode defensively
        client_name = row['meta_client_name'].decode('utf-8') if isinstance(row['meta_client_name'], bytes) else row['meta_client_name']
        print(f"{i:2d}. {client_name:30s} {row['times_first']:6d} times ({pct:5.2f}%) - avg {row['avg_first_time_ms']:.0f}ms")
    # Check if first observer is concentrated; a single dominant node suggests
    # its clock runs fast rather than it being genuinely well-connected.
    top_node_pct = 100 * first_observers['times_first'][0] / total_blocks
    if top_node_pct > 50:
        print(f"\n⚠️ WARNING: Top node sees {top_node_pct:.1f}% of blocks first - possible clock skew!")
    elif top_node_pct > 20:
        print(f"\n⚠️ CAUTION: Top node sees {top_node_pct:.1f}% of blocks first - may indicate some clock skew")
    else:
        print(f"\n✓ Healthy distribution: Top node sees {top_node_pct:.1f}% of blocks first")
        print(" This suggests geographic diversity rather than systematic clock skew")
    return df
def plot_results(df):
    """Create visualizations.

    Builds a 2x2 figure from the per-block DataFrame: two hexbin density
    plots of p10→p90 spread vs total block size and vs execution payload
    size (each with a linear trend line), a binned median analysis, and a
    histogram of spreads. Saves to block_propagation_vs_size.png and shows
    the figure. Rows above the p99 spread are dropped before plotting.

    Args:
        df: polars DataFrame returned by analyze_propagation().
    """
    # Convert bytes to KB for easier reading
    df = df.with_columns([
        (pl.col('block_total_bytes') / 1024).alias('block_size_kb'),
        (pl.col('execution_payload_transactions_total_bytes') / 1024).alias('exec_payload_size_kb')
    ])
    # Filter extreme outliers (keep p99 to remove the worst outliers)
    # Use p10-based spread (most robust for small sample sizes)
    p99_threshold = df['spread_p10_to_p90_ms'].quantile(0.99)
    print(f"\nFiltering outliers: keeping propagation times <= {p99_threshold:.0f}ms (p99)")
    df_filtered = df.filter(pl.col('spread_p10_to_p90_ms') <= p99_threshold)
    print(f"Removed {len(df) - len(df_filtered)} outliers ({100*(len(df)-len(df_filtered))/len(df):.1f}%)")
    print(f"Using {len(df_filtered)} blocks for visualization\n")
    df = df_filtered
    # Create figure with subplots
    fig, axes = plt.subplots(2, 2, figsize=(16, 12))
    # Plot 1: Spread to p90 vs Total Block Size (hexbin for density)
    ax = axes[0, 0]
    hexbin = ax.hexbin(df['block_size_kb'].to_numpy(), df['spread_p10_to_p90_ms'].to_numpy(),
                       gridsize=30, cmap='YlOrRd', mincnt=1)
    ax.set_xlabel('Block Size (KB)')
    ax.set_ylabel('Propagation Spread to p90 (ms, p10 reference)')
    ax.set_title('Block Dissemination Time (p10→p90) vs Total Block Size')
    ax.grid(True, alpha=0.3)
    plt.colorbar(hexbin, ax=ax, label='Count')
    # Add trend line (degree-1 least-squares fit)
    z = np.polyfit(df['block_size_kb'].to_numpy(), df['spread_p10_to_p90_ms'].to_numpy(), 1)
    p = np.poly1d(z)
    x_trend = np.linspace(df['block_size_kb'].min(), df['block_size_kb'].max(), 100)
    ax.plot(x_trend, p(x_trend), "blue", linewidth=2, label=f'Trend: y={z[0]:.2f}x+{z[1]:.2f}')
    ax.legend()
    # Plot 2: Spread to p90 vs Execution Payload Size (hexbin for density)
    ax = axes[0, 1]
    hexbin = ax.hexbin(df['exec_payload_size_kb'].to_numpy(), df['spread_p10_to_p90_ms'].to_numpy(),
                       gridsize=30, cmap='YlOrRd', mincnt=1)
    ax.set_xlabel('Execution Payload Transactions Size (KB)')
    ax.set_ylabel('Propagation Spread to p90 (ms, p10 reference)')
    ax.set_title('Block Dissemination Time (p10→p90) vs Execution Payload Size')
    ax.grid(True, alpha=0.3)
    plt.colorbar(hexbin, ax=ax, label='Count')
    # Add trend line
    z = np.polyfit(df['exec_payload_size_kb'].to_numpy(), df['spread_p10_to_p90_ms'].to_numpy(), 1)
    p = np.poly1d(z)
    x_trend = np.linspace(df['exec_payload_size_kb'].min(), df['exec_payload_size_kb'].max(), 100)
    ax.plot(x_trend, p(x_trend), "blue", linewidth=2, label=f'Trend: y={z[0]:.2f}x+{z[1]:.2f}')
    ax.legend()
    # Plot 3: Binned analysis - Multiple percentiles vs Block Size
    # Use custom bins focusing on the relevant size range
    ax = axes[1, 0]
    # pandas is used here for pd.cut interval binning (polars lacks an equivalent)
    df_np = df.to_pandas()
    # Custom bins: more granular in the region we care about (50-800 KB)
    bins = [0, 100, 200, 300, 400, 500, 600, 800, 1000, 1500, 3000]
    df_np['size_bin'] = pd.cut(df_np['block_size_kb'], bins=bins)
    # Calculate median for each bin and percentile (using p10 reference)
    bin_centers = []
    bin_labels = []
    p50_medians = []
    p90_medians = []
    counts = []
    for bin_label in df_np['size_bin'].cat.categories:
        bin_data = df_np[df_np['size_bin'] == bin_label]
        if len(bin_data) > 20: # Only include bins with enough data
            bin_center = (bin_label.left + bin_label.right) / 2
            bin_centers.append(bin_center)
            bin_labels.append(f'{int(bin_label.left)}-{int(bin_label.right)}')
            p50_medians.append(bin_data['spread_p10_to_p50_ms'].median())
            p90_medians.append(bin_data['spread_p10_to_p90_ms'].median())
            counts.append(len(bin_data))
    ax.plot(bin_centers, p50_medians, 'o-', label='p10→p50 (median per bin)', color='blue', linewidth=2, markersize=8)
    ax.plot(bin_centers, p90_medians, 'o-', label='p10→p90 (median per bin)', color='orange', linewidth=2, markersize=8)
    # Annotate bins with counts
    for i, (x, y, count) in enumerate(zip(bin_centers, p90_medians, counts)):
        ax.annotate(f'n={count}', xy=(x, y), xytext=(0, 5),
                    textcoords='offset points', ha='center', fontsize=7, alpha=0.7)
    ax.set_xlabel('Block Size (KB)')
    ax.set_ylabel('Propagation Spread (ms, p10 reference)')
    ax.set_title('Binned Analysis: Dissemination Time vs Block Size')
    ax.legend()
    ax.grid(True, alpha=0.3)
    # Plot 4: Distribution of propagation times
    ax = axes[1, 1]
    ax.hist(df['spread_p10_to_p90_ms'], bins=50, alpha=0.7, edgecolor='black')
    ax.set_xlabel('Propagation Spread p10→p90 (ms)')
    ax.set_ylabel('Frequency')
    ax.set_title('Distribution of Block Dissemination Times (p10→p90)')
    ax.axvline(df['spread_p10_to_p90_ms'].median(), color='red', linestyle='--',
               label=f'Median: {df["spread_p10_to_p90_ms"].median():.0f}ms')
    ax.legend()
    ax.grid(True, alpha=0.3)
    plt.tight_layout()
    plt.savefig('block_propagation_vs_size.png', dpi=300, bbox_inches='tight')
    print("\nPlot saved to: block_propagation_vs_size.png")
    plt.show()
def print_summary_stats(df):
    """Print summary statistics.

    Reports block-size distribution, p10→p90 spreads under the three
    reference points, size/spread correlations, a quartile-based effective
    dissemination bandwidth estimate, and per-quartile propagation medians.
    Console-only; does not modify or return df.

    Args:
        df: polars DataFrame returned by analyze_propagation().
    """
    print("\n" + "="*60)
    print("SUMMARY STATISTICS")
    print("="*60)
    print("\nBlock Size Distribution:")
    print(f" Mean: {df['block_total_bytes'].mean()/1024:.1f} KB")
    print(f" Median: {df['block_total_bytes'].median()/1024:.1f} KB")
    print(f" Min: {df['block_total_bytes'].min()/1024:.1f} KB")
    print(f" Max: {df['block_total_bytes'].max()/1024:.1f} KB")
    print("\nPropagation Spread to p90 - Comparing Reference Points:")
    print(f" Using MIN (first observer):")
    print(f" Mean: {df['spread_min_to_p90_ms'].mean():.1f} ms")
    print(f" Median: {df['spread_min_to_p90_ms'].median():.1f} ms")
    print(f" p95: {df['spread_min_to_p90_ms'].quantile(0.95):.1f} ms")
    print(f"\n Using 2nd-fastest observer:")
    print(f" Mean: {df['spread_2nd_to_p90_ms'].mean():.1f} ms")
    print(f" Median: {df['spread_2nd_to_p90_ms'].median():.1f} ms")
    print(f" p95: {df['spread_2nd_to_p90_ms'].quantile(0.95):.1f} ms")
    print(f"\n Using p10 reference (RECOMMENDED for n=8-20):")
    print(f" Mean: {df['spread_p10_to_p90_ms'].mean():.1f} ms")
    print(f" Median: {df['spread_p10_to_p90_ms'].median():.1f} ms")
    print(f" p95: {df['spread_p10_to_p90_ms'].quantile(0.95):.1f} ms")
    # Check mean/median ratio for sanity (large ratio implies a long right tail)
    ratio = df['spread_p10_to_p90_ms'].mean() / df['spread_p10_to_p90_ms'].median()
    print(f"\n Mean/Median ratio: {ratio:.2f}")
    if ratio > 3:
        print(f" ⚠️ High ratio suggests long tail or remaining outliers")
    print("\nCorrelation Analysis (using p10→p90 spread):")
    # Pearson correlation via numpy's 2x2 correlation matrix; [0,1] is the off-diagonal
    corr_total = np.corrcoef(df['block_total_bytes'].to_numpy(),
                             df['spread_p10_to_p90_ms'].to_numpy())[0,1]
    corr_exec = np.corrcoef(df['execution_payload_transactions_total_bytes'].to_numpy(),
                            df['spread_p10_to_p90_ms'].to_numpy())[0,1]
    print(f" Block size vs propagation spread: {corr_total:.3f}")
    print(f" Exec payload size vs propagation spread: {corr_exec:.3f}")
    if abs(corr_total) < 0.1:
        print(f"\n ⚠️ Very weak correlation ({corr_total:.3f})")
        print(f" This means: high variance, but quartile analysis may still show monotone trend")
    # Bandwidth analysis using quartiles (more robust to variance)
    print("\n" + "="*60)
    print("EFFECTIVE DISSEMINATION BANDWIDTH ANALYSIS (p10→p90)")
    print("="*60)
    # Use quartile medians for more robust estimate
    df_sorted = df.sort('block_total_bytes')
    # Get Q1 and Q4 medians
    q1_end = int(len(df_sorted) * 0.25)
    q4_start = int(len(df_sorted) * 0.75)
    q1_data = df_sorted[:q1_end]
    q4_data = df_sorted[q4_start:]
    q1_size_kb = q1_data['block_total_bytes'].median() / 1024
    q1_time_ms = q1_data['spread_p10_to_p90_ms'].median()
    q4_size_kb = q4_data['block_total_bytes'].median() / 1024
    q4_time_ms = q4_data['spread_p10_to_p90_ms'].median()
    # Calculate effective bandwidth from Q1 to Q4
    delta_size_kb = q4_size_kb - q1_size_kb
    delta_time_ms = q4_time_ms - q1_time_ms
    print(f"\nQuartile-based Analysis (Q1 vs Q4):")
    print(f" Q1 median: {q1_size_kb:.1f} KB → {q1_time_ms:.1f} ms dissemination")
    print(f" Q4 median: {q4_size_kb:.1f} KB → {q4_time_ms:.1f} ms dissemination")
    print(f" Delta: {delta_size_kb:.1f} KB takes {delta_time_ms:.1f} ms longer")
    if delta_time_ms > 0:
        bandwidth_kb_per_s = delta_size_kb / (delta_time_ms / 1000) # KB/s
        bandwidth_mbps = bandwidth_kb_per_s * 8 / 1000 # Mbps
        print(f"\n Effective dissemination bandwidth: {bandwidth_kb_per_s:.1f} KB/s = {bandwidth_mbps:.2f} Mbps")
        print(f" Time per KB: {delta_time_ms/delta_size_kb:.2f} ms/KB")
        # Estimate fixed overhead (extrapolate back to 0 KB)
        estimated_overhead = q1_time_ms - (q1_size_kb * (delta_time_ms / delta_size_kb))
        print(f"\n Estimated fixed overhead: {estimated_overhead:.0f} ms")
        print(f" (gossip protocol, validation, processing)")
        # Check if extrapolation is reasonable
        print(f"\n Extrapolation Warning:")
        print(f" This slope is based on data from {q1_size_kb:.0f} KB to {q4_size_kb:.0f} KB")
        print(f" Extrapolating to 2200 KB ({2200/q4_size_kb:.1f}x beyond Q4 median) is risky")
        # Practical implications within observed range
        print(f"\n Predictions (within/near observed range):")
        for test_size_kb in [128, 256, 512, 1024]:
            predicted_time = estimated_overhead + (test_size_kb * (delta_time_ms / delta_size_kb))
            beyond_q4 = test_size_kb > q4_size_kb * 1.2
            warning = " ⚠️ extrapolation" if beyond_q4 else ""
            print(f" {test_size_kb:4d} KB block → ~{predicted_time:.0f} ms to reach p90{warning}")
    else:
        print("\n ❌ Unable to calculate bandwidth - dissemination time doesn't increase with size")
        print(" This suggests network topology and fixed overheads dominate over transfer time")
    # Also note the high variance
    print(f"\n Variance Note:")
    print(f" Correlation coefficient: {corr_total:.3f}")
    if abs(corr_total) < 0.1:
        print(f" The size effect is WEAK relative to other factors")
        print(f" Network conditions, topology, node diversity dominate")
    # Binned analysis (using p10→p90 spread)
    print("\n" + "="*60)
    print("PROPAGATION BY BLOCK SIZE QUARTILES (p10→p90)")
    print("="*60)
    df_sorted = df.sort('block_total_bytes')
    quartiles = [0, 0.25, 0.5, 0.75, 1.0]
    print(f"\n{'Quartile':<12} {'Size Range':<20} {'Median p10→p90':<18} {'Slowdown vs Q1'}")
    print("-" * 70)
    q1_prop_median = None
    for i in range(len(quartiles)-1):
        start_idx = int(len(df_sorted) * quartiles[i])
        end_idx = int(len(df_sorted) * quartiles[i+1])
        subset = df_sorted[start_idx:end_idx]
        size_min = subset['block_total_bytes'].min() / 1024
        size_max = subset['block_total_bytes'].max() / 1024
        size_median = subset['block_total_bytes'].median() / 1024
        prop_median = subset['spread_p10_to_p90_ms'].median()
        if i == 0:
            # First quartile is the baseline for the slowdown column
            q1_prop_median = prop_median
            slowdown_str = "baseline"
        else:
            slowdown_pct = 100 * (prop_median - q1_prop_median) / q1_prop_median
            slowdown_str = f"+{slowdown_pct:.1f}%"
        print(f"Q{i+1:<11} {size_min:>6.0f}-{size_max:<6.0f} KB (med:{size_median:>5.0f}) {prop_median:>7.1f} ms {slowdown_str}")
    # Also show extreme bins
    print(f"\n Bottom 5%: {df_sorted[:int(len(df_sorted)*0.05)]['spread_p10_to_p90_ms'].median():.1f} ms median")
    print(f" Top 5%: {df_sorted[int(len(df_sorted)*0.95):]['spread_p10_to_p90_ms'].median():.1f} ms median")
# Script entry point: query, plot, summarize, then persist the per-block data.
if __name__ == "__main__":
    # Run analysis
    df = analyze_propagation()
    # Generate plots
    plot_results(df)
    # Print summary
    print_summary_stats(df)
    # Optionally save data (convert block_root to string for CSV compatibility)
    df_to_save = df.with_columns(
        pl.col('block_root').cast(pl.Utf8).alias('block_root')
    )
    df_to_save.write_csv('block_propagation_data.csv')
    print("\nData saved to: block_propagation_data.csv")
duckdb>=0.9.0
polars>=0.19.0
pandas>=2.0.0
matplotlib>=3.7.0
numpy>=1.24.0
pyarrow>=10.0.0
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment