Palimpsest: Determinism Verification Harness

//! Determinism verification harness.
//!
//! Crawls a SimulatedWeb twice with the same seed and asserts
//! bit-identical results. Any divergence is a Law 1 violation.

use std::path::Path;
use std::sync::Arc;
use chrono::Utc;
use palimpsest_core::hash::ContentHash;
use palimpsest_core::time::CaptureInstant;
use palimpsest_core::types::CrawlContextId;
use palimpsest_fetch::links::extract_links;
use palimpsest_frontier::policy::PolitenessPolicy;
use palimpsest_frontier::scheduler::Frontier;
use palimpsest_index::entry::IndexEntry;
use palimpsest_index::memory::InMemoryIndex;
use palimpsest_index::query::IndexQuery;
use palimpsest_storage::blob::BlobStore;
use palimpsest_storage::memory::InMemoryBlobStore;
use tracing::info;
use url::Url;
use crate::fetcher::SimulatedFetcher;
use crate::web::SimulatedWeb;

/// Result of a determinism verification run.
#[derive(Debug)]
pub struct VerificationResult {
    /// URLs fetched in this run (in order).
    pub urls: Vec<String>,
    /// Content hashes of all stored blobs (sorted and deduplicated).
    pub blob_hashes: Vec<String>,
    /// Index entries as (url, content_hash) pairs, sorted by URL + timestamp.
    pub index_entries: Vec<(String, String)>,
    /// Total pages fetched.
    pub pages_fetched: usize,
    /// Total fetch errors.
    pub errors: usize,
}

/// Run a single crawl against a SimulatedWeb and collect results.
pub async fn run_simulated_crawl(
    web: Arc<SimulatedWeb>,
    seeds: &[Url],
    max_depth: u32,
    max_urls: usize,
) -> VerificationResult {
    let seed = web.seed();
    let fetcher = SimulatedFetcher::new(Arc::clone(&web));
    let mut frontier = Frontier::new(seed, PolitenessPolicy::no_delay());
    let storage = InMemoryBlobStore::new();
    let mut index = InMemoryIndex::new();
    let mut logical_clock: u64 = 0;
    let mut urls_fetched = Vec::new();
    let mut errors = 0;

    // Seed the frontier.
    for seed_url in seeds {
        frontier.push_seed(seed_url.clone());
    }

    // Crawl loop (sequential for determinism verification).
    let mut total_fetched = 0;
    let now = chrono::DateTime::from_timestamp(1714000000, 0).unwrap_or_else(Utc::now);
    while total_fetched < max_urls {
        let Some(entry) = frontier.pop(now) else {
            break;
        };
        if entry.depth > max_depth {
            continue;
        }
        logical_clock += 1;
        let capture_instant = CaptureInstant::new(now, logical_clock);
        match fetcher.fetch(&entry.url, capture_instant) {
            Ok(result) => {
                total_fetched += 1;
                urls_fetched.push(entry.url.to_string());
                // Store.
                if let Ok(hash) = storage.put(result.response_record.payload.clone()).await {
                    index.insert(IndexEntry::new(
                        entry.url.clone(),
                        capture_instant,
                        hash,
                        CrawlContextId(1),
                    ));
                }
                // Extract links from HTML only.
                if result.is_html() && entry.depth < max_depth {
                    let body = String::from_utf8_lossy(&result.response_record.payload);
                    let discovered = extract_links(&body, &entry.url);
                    for link in discovered {
                        let parent_hash = ContentHash::of(entry.url.as_str().as_bytes());
                        frontier.push_discovered(link, entry.depth + 1, parent_hash);
                    }
                }
            }
            Err(_) => {
                errors += 1;
            }
        }
    }

    // Collect results in deterministic order.
    let all_entries = index.query(&IndexQuery::default());
    let index_entries: Vec<(String, String)> = all_entries
        .iter()
        .map(|e| (e.url.to_string(), e.content_hash.as_hex()))
        .collect();

    // Collect blob hashes.
    let mut blob_hashes: Vec<String> = index_entries.iter().map(|(_, h)| h.clone()).collect();
    blob_hashes.sort();
    blob_hashes.dedup();

    VerificationResult {
        urls: urls_fetched,
        blob_hashes,
        index_entries,
        pages_fetched: total_fetched,
        errors,
    }
}
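
/// Illustration only, not part of the harness: a minimal sketch of driving a
/// single run and inspecting its result. The `web` argument is assumed to be a
/// `SimulatedWeb` already built by the caller; this file does not show the
/// constructor, so none is invented here. The example URL is a placeholder.
#[allow(dead_code)]
async fn example_single_run(web: Arc<SimulatedWeb>) -> usize {
    let seeds = [Url::parse("https://example.test/").expect("static URL parses")];
    let result = run_simulated_crawl(web, &seeds, 2, 50).await;
    // The fetch order is itself part of the determinism contract, so snapshotting
    // `result.urls` is a useful first step when debugging a divergence.
    info!(
        urls = result.urls.len(),
        errors = result.errors,
        "example run complete"
    );
    result.pages_fetched
}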

/// Verify determinism: crawl twice with the same seed, assert identical results.
///
/// This is the core proof that the system satisfies Law 1.
/// Any divergence between run_a and run_b is a determinism violation.
pub async fn verify_determinism(
    web_factory: impl Fn() -> SimulatedWeb,
    seeds: &[Url],
    max_depth: u32,
    max_urls: usize,
) -> Result<VerificationResult, String> {
    let web_a = Arc::new(web_factory());
    let web_b = Arc::new(web_factory());

    info!("determinism verification: starting run A");
    let result_a = run_simulated_crawl(web_a, seeds, max_depth, max_urls).await;
    info!("determinism verification: starting run B");
    let result_b = run_simulated_crawl(web_b, seeds, max_depth, max_urls).await;

    // Compare results.
    if result_a.urls != result_b.urls {
        return Err(format!(
            "URL order diverged: run A fetched {} URLs, run B fetched {}.\n\
             First difference at index {}",
            result_a.urls.len(),
            result_b.urls.len(),
            result_a
                .urls
                .iter()
                .zip(result_b.urls.iter())
                .position(|(a, b)| a != b)
                .unwrap_or(result_a.urls.len().min(result_b.urls.len())),
        ));
    }
    if result_a.blob_hashes != result_b.blob_hashes {
        return Err(format!(
            "Blob hashes diverged: {} vs {} unique blobs",
            result_a.blob_hashes.len(),
            result_b.blob_hashes.len(),
        ));
    }
    if result_a.index_entries != result_b.index_entries {
        return Err("Index entries diverged between runs".into());
    }

    info!(
        pages = result_a.pages_fetched,
        blobs = result_a.blob_hashes.len(),
        "determinism verification: PASSED — identical results"
    );
    Ok(result_a)
}
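
/// Illustration only: how a test might assert Law 1 end to end. The `build_web`
/// closure stands in for whatever `SimulatedWeb` factory the surrounding crate
/// provides; the harness only requires that it be deterministic for a fixed seed.
/// The depth and URL limits below are arbitrary example values.
#[allow(dead_code)]
async fn example_verify_determinism(
    build_web: impl Fn() -> SimulatedWeb,
) -> Result<VerificationResult, String> {
    let seeds = [Url::parse("https://example.test/").expect("static URL parses")];
    // Two independently constructed webs, same seed, must yield identical results.
    verify_determinism(build_web, &seeds, 3, 100).await
}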

/// Verify that crawl resumption preserves determinism.
///
/// Crawl N pages, save frontier, load frontier, crawl M more.
/// Compare against a continuous crawl of N+M pages.
/// If they produce the same final index, resumption is deterministic.
pub async fn verify_resumption_determinism(
    web_factory: impl Fn() -> SimulatedWeb,
    seeds: &[Url],
    max_depth: u32,
    first_batch: usize,
    second_batch: usize,
    frontier_path: &Path,
) -> Result<(), String> {
    // Run A: continuous crawl of first_batch + second_batch.
    let web_a = Arc::new(web_factory());
    let result_continuous =
        run_simulated_crawl(web_a, seeds, max_depth, first_batch + second_batch).await;

    // Run B: split crawl — first_batch, save, load, second_batch.
    let web_b = Arc::new(web_factory());
    let seed = web_b.seed();

    // Phase 1: crawl first_batch, save frontier.
    let fetcher = SimulatedFetcher::new(Arc::clone(&web_b));
    let mut frontier = Frontier::new(seed, PolitenessPolicy::no_delay());
    let storage = InMemoryBlobStore::new();
    let mut index = InMemoryIndex::new();
    let mut logical_clock: u64 = 0;
    for seed_url in seeds {
        frontier.push_seed(seed_url.clone());
    }
    let now = chrono::DateTime::from_timestamp(1714000000, 0).unwrap_or_else(Utc::now);
    let mut total_fetched = 0;
    while total_fetched < first_batch {
        let Some(entry) = frontier.pop(now) else {
            break;
        };
        if entry.depth > max_depth {
            continue;
        }
        logical_clock += 1;
        let capture_instant = CaptureInstant::new(now, logical_clock);
        if let Ok(result) = fetcher.fetch(&entry.url, capture_instant) {
            total_fetched += 1;
            if let Ok(hash) = storage.put(result.response_record.payload.clone()).await {
                index.insert(IndexEntry::new(
                    entry.url.clone(),
                    capture_instant,
                    hash,
                    CrawlContextId(1),
                ));
            }
            if result.is_html() && entry.depth < max_depth {
                let body = String::from_utf8_lossy(&result.response_record.payload);
                for link in extract_links(&body, &entry.url) {
                    let parent_hash = ContentHash::of(entry.url.as_str().as_bytes());
                    frontier.push_discovered(link, entry.depth + 1, parent_hash);
                }
            }
        }
    }

    // Save frontier.
    frontier
        .save(frontier_path)
        .map_err(|e| format!("frontier save failed: {e}"))?;

    // Phase 2: load frontier, crawl second_batch more.
    let mut frontier2 = Frontier::new(seed, PolitenessPolicy::no_delay());
    frontier2
        .load(frontier_path)
        .map_err(|e| format!("frontier load failed: {e}"))?;
    let mut phase2_fetched = 0;
    while phase2_fetched < second_batch {
        let Some(entry) = frontier2.pop(now) else {
            break;
        };
        if entry.depth > max_depth {
            continue;
        }
        logical_clock += 1;
        let capture_instant = CaptureInstant::new(now, logical_clock);
        if let Ok(result) = fetcher.fetch(&entry.url, capture_instant) {
            phase2_fetched += 1;
            if let Ok(hash) = storage.put(result.response_record.payload.clone()).await {
                index.insert(IndexEntry::new(
                    entry.url.clone(),
                    capture_instant,
                    hash,
                    CrawlContextId(1),
                ));
            }
            if result.is_html() && entry.depth < max_depth {
                let body = String::from_utf8_lossy(&result.response_record.payload);
                for link in extract_links(&body, &entry.url) {
                    let parent_hash = ContentHash::of(entry.url.as_str().as_bytes());
                    frontier2.push_discovered(link, entry.depth + 1, parent_hash);
                }
            }
        }
    }

    // Compare: split crawl should produce same URLs as continuous crawl.
    let split_entries = index.query(&IndexQuery::default());
    let split_urls: Vec<String> = split_entries.iter().map(|e| e.url.to_string()).collect();
    if result_continuous.urls.len() != split_urls.len() {
        return Err(format!(
            "resumption diverged: continuous fetched {}, split fetched {}",
            result_continuous.urls.len(),
            split_urls.len(),
        ));
    }

    // Compare blob sets (order may differ due to split timing).
    let mut continuous_hashes: Vec<String> = result_continuous.blob_hashes.clone();
    let mut split_hashes: Vec<String> = split_entries
        .iter()
        .map(|e| e.content_hash.as_hex())
        .collect();
    continuous_hashes.sort();
    continuous_hashes.dedup();
    split_hashes.sort();
    split_hashes.dedup();
    if continuous_hashes != split_hashes {
        return Err(format!(
            "blob hashes diverged: continuous {} unique, split {} unique",
            continuous_hashes.len(),
            split_hashes.len(),
        ));
    }

    info!(
        continuous = result_continuous.pages_fetched,
        split = total_fetched + phase2_fetched,
        "resumption determinism: PASSED — same content"
    );
    Ok(())
}
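
/// Illustration only: the resumption check wired into a test-style helper. The
/// snapshot path is a placeholder; a real test would typically point this at a
/// temporary directory (e.g. via the `tempfile` crate, which is an assumption
/// here rather than a dependency this file declares).
#[allow(dead_code)]
async fn example_verify_resumption(
    build_web: impl Fn() -> SimulatedWeb,
) -> Result<(), String> {
    let seeds = [Url::parse("https://example.test/").expect("static URL parses")];
    let snapshot = Path::new("/tmp/palimpsest-frontier.snapshot");
    // Crawl 10 pages, snapshot the frontier, resume for 10 more, and compare
    // against a continuous 20-page crawl of an identically seeded web.
    verify_resumption_determinism(build_web, &seeds, 3, 10, 10, snapshot).await
}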