Palimpsest: Determinism Verification Harness
//! Determinism verification harness.
//!
//! Crawls a SimulatedWeb twice with the same seed and asserts
//! bit-identical results. Any divergence is a Law 1 violation.

use std::path::Path;
use std::sync::Arc;

use chrono::Utc;
use palimpsest_core::hash::ContentHash;
use palimpsest_core::time::CaptureInstant;
use palimpsest_core::types::CrawlContextId;
use palimpsest_fetch::links::extract_links;
use palimpsest_frontier::policy::PolitenessPolicy;
use palimpsest_frontier::scheduler::Frontier;
use palimpsest_index::entry::IndexEntry;
use palimpsest_index::memory::InMemoryIndex;
use palimpsest_index::query::IndexQuery;
use palimpsest_storage::blob::BlobStore;
use palimpsest_storage::memory::InMemoryBlobStore;
use tracing::info;
use url::Url;

use crate::fetcher::SimulatedFetcher;
use crate::web::SimulatedWeb;

/// Result of a determinism verification run.
#[derive(Debug)]
pub struct VerificationResult {
    /// URLs fetched in this run (in order).
    pub urls: Vec<String>,
    /// Content hashes of all stored blobs (sorted).
    pub blob_hashes: Vec<String>,
    /// Index entries (sorted by URL + timestamp).
    pub index_entries: Vec<(String, String)>, // (url, content_hash)
    /// Total pages fetched.
    pub pages_fetched: usize,
    /// Total errors.
    pub errors: usize,
}

/// Run a single crawl against a SimulatedWeb and collect results.
pub async fn run_simulated_crawl(
    web: Arc<SimulatedWeb>,
    seeds: &[Url],
    max_depth: u32,
    max_urls: usize,
) -> VerificationResult {
    let seed = web.seed();
    let fetcher = SimulatedFetcher::new(Arc::clone(&web));
    let mut frontier = Frontier::new(seed, PolitenessPolicy::no_delay());
    let storage = InMemoryBlobStore::new();
    let mut index = InMemoryIndex::new();
    let mut logical_clock: u64 = 0;
    let mut urls_fetched = Vec::new();
    let mut errors = 0;

    // Seed the frontier.
    for seed_url in seeds {
        frontier.push_seed(seed_url.clone());
    }

    // Crawl loop (sequential for determinism verification).
    let mut total_fetched = 0;
    let now = chrono::DateTime::from_timestamp(1714000000, 0).unwrap_or_else(Utc::now);
    while total_fetched < max_urls {
        let Some(entry) = frontier.pop(now) else {
            break;
        };
        if entry.depth > max_depth {
            continue;
        }
        logical_clock += 1;
        let capture_instant = CaptureInstant::new(now, logical_clock);
        match fetcher.fetch(&entry.url, capture_instant) {
            Ok(result) => {
                total_fetched += 1;
                urls_fetched.push(entry.url.to_string());
                // Store.
                if let Ok(hash) = storage.put(result.response_record.payload.clone()).await {
                    index.insert(IndexEntry::new(
                        entry.url.clone(),
                        capture_instant,
                        hash,
                        CrawlContextId(1),
                    ));
                }
                // Extract links from HTML only.
                if result.is_html() && entry.depth < max_depth {
                    let body = String::from_utf8_lossy(&result.response_record.payload);
                    let discovered = extract_links(&body, &entry.url);
                    for link in discovered {
                        let parent_hash = ContentHash::of(entry.url.as_str().as_bytes());
                        frontier.push_discovered(link, entry.depth + 1, parent_hash);
                    }
                }
            }
            Err(_) => {
                errors += 1;
            }
        }
    }

    // Collect results in deterministic order.
    let all_entries = index.query(&IndexQuery::default());
    let index_entries: Vec<(String, String)> = all_entries
        .iter()
        .map(|e| (e.url.to_string(), e.content_hash.as_hex()))
        .collect();

    // Collect blob hashes.
    let mut blob_hashes: Vec<String> = index_entries.iter().map(|(_, h)| h.clone()).collect();
    blob_hashes.sort();
    blob_hashes.dedup();

    VerificationResult {
        urls: urls_fetched,
        blob_hashes,
        index_entries,
        pages_fetched: total_fetched,
        errors,
    }
}

/// Verify determinism: crawl twice with the same seed, assert identical results.
///
/// This is the core proof that the system satisfies Law 1.
/// Any divergence between run_a and run_b is a determinism violation.
pub async fn verify_determinism(
    web_factory: impl Fn() -> SimulatedWeb,
    seeds: &[Url],
    max_depth: u32,
    max_urls: usize,
) -> Result<VerificationResult, String> {
    let web_a = Arc::new(web_factory());
    let web_b = Arc::new(web_factory());

    info!("determinism verification: starting run A");
    let result_a = run_simulated_crawl(web_a, seeds, max_depth, max_urls).await;

    info!("determinism verification: starting run B");
    let result_b = run_simulated_crawl(web_b, seeds, max_depth, max_urls).await;

    // Compare results.
    if result_a.urls != result_b.urls {
        return Err(format!(
            "URL order diverged: run A fetched {} URLs, run B fetched {}.\n\
             First difference at index {}",
            result_a.urls.len(),
            result_b.urls.len(),
            result_a
                .urls
                .iter()
                .zip(result_b.urls.iter())
                .position(|(a, b)| a != b)
                .unwrap_or(result_a.urls.len().min(result_b.urls.len())),
        ));
    }
    if result_a.blob_hashes != result_b.blob_hashes {
        return Err(format!(
            "Blob hashes diverged: {} vs {} unique blobs",
            result_a.blob_hashes.len(),
            result_b.blob_hashes.len(),
        ));
    }
    if result_a.index_entries != result_b.index_entries {
        return Err("Index entries diverged between runs".into());
    }

    info!(
        pages = result_a.pages_fetched,
        blobs = result_a.blob_hashes.len(),
        "determinism verification: PASSED — identical results"
    );
    Ok(result_a)
}
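
// Usage sketch (not part of the harness): how a test might call `verify_determinism`.
// `SimulatedWeb::with_seed(u64)` and the seed URL below are placeholders, not confirmed
// API; substitute whatever constructor `SimulatedWeb` actually exposes, and note that
// `#[tokio::test]` assumes tokio is available as a dev-dependency.
//
// #[tokio::test]
// async fn two_runs_with_same_seed_are_identical() {
//     let seeds = vec![Url::parse("https://example.sim/").expect("valid seed URL")];
//     let result = verify_determinism(|| SimulatedWeb::with_seed(42), &seeds, 3, 100)
//         .await
//         .expect("Law 1: identical seeds must yield bit-identical crawls");
//     assert!(result.pages_fetched > 0);
// }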

/// Verify that crawl resumption preserves determinism.
///
/// Crawl N pages, save frontier, load frontier, crawl M more.
/// Compare against a continuous crawl of N+M pages.
/// If they produce the same final index, resumption is deterministic.
pub async fn verify_resumption_determinism(
    web_factory: impl Fn() -> SimulatedWeb,
    seeds: &[Url],
    max_depth: u32,
    first_batch: usize,
    second_batch: usize,
    frontier_path: &Path,
) -> Result<(), String> {
    // Run A: continuous crawl of first_batch + second_batch.
    let web_a = Arc::new(web_factory());
    let result_continuous =
        run_simulated_crawl(web_a, seeds, max_depth, first_batch + second_batch).await;

    // Run B: split crawl — first_batch, save, load, second_batch.
    let web_b = Arc::new(web_factory());
    let seed = web_b.seed();

    // Phase 1: crawl first_batch, save frontier.
    let fetcher = SimulatedFetcher::new(Arc::clone(&web_b));
    let mut frontier = Frontier::new(seed, PolitenessPolicy::no_delay());
    let storage = InMemoryBlobStore::new();
    let mut index = InMemoryIndex::new();
    let mut logical_clock: u64 = 0;
    for seed_url in seeds {
        frontier.push_seed(seed_url.clone());
    }
    let now = chrono::DateTime::from_timestamp(1714000000, 0).unwrap_or_else(Utc::now);
    let mut total_fetched = 0;
    while total_fetched < first_batch {
        let Some(entry) = frontier.pop(now) else {
            break;
        };
        if entry.depth > max_depth {
            continue;
        }
        logical_clock += 1;
        let capture_instant = CaptureInstant::new(now, logical_clock);
        if let Ok(result) = fetcher.fetch(&entry.url, capture_instant) {
            total_fetched += 1;
            if let Ok(hash) = storage.put(result.response_record.payload.clone()).await {
                index.insert(IndexEntry::new(
                    entry.url.clone(),
                    capture_instant,
                    hash,
                    CrawlContextId(1),
                ));
            }
            if result.is_html() && entry.depth < max_depth {
                let body = String::from_utf8_lossy(&result.response_record.payload);
                for link in extract_links(&body, &entry.url) {
                    let parent_hash = ContentHash::of(entry.url.as_str().as_bytes());
                    frontier.push_discovered(link, entry.depth + 1, parent_hash);
                }
            }
        }
    }

    // Save frontier.
    frontier
        .save(frontier_path)
        .map_err(|e| format!("frontier save failed: {e}"))?;

    // Phase 2: load frontier, crawl second_batch more.
    let mut frontier2 = Frontier::new(seed, PolitenessPolicy::no_delay());
    frontier2
        .load(frontier_path)
        .map_err(|e| format!("frontier load failed: {e}"))?;
    let mut phase2_fetched = 0;
    while phase2_fetched < second_batch {
        let Some(entry) = frontier2.pop(now) else {
            break;
        };
        if entry.depth > max_depth {
            continue;
        }
        logical_clock += 1;
        let capture_instant = CaptureInstant::new(now, logical_clock);
        if let Ok(result) = fetcher.fetch(&entry.url, capture_instant) {
            phase2_fetched += 1;
            if let Ok(hash) = storage.put(result.response_record.payload.clone()).await {
                index.insert(IndexEntry::new(
                    entry.url.clone(),
                    capture_instant,
                    hash,
                    CrawlContextId(1),
                ));
            }
            if result.is_html() && entry.depth < max_depth {
                let body = String::from_utf8_lossy(&result.response_record.payload);
                for link in extract_links(&body, &entry.url) {
                    let parent_hash = ContentHash::of(entry.url.as_str().as_bytes());
                    frontier2.push_discovered(link, entry.depth + 1, parent_hash);
                }
            }
        }
    }

    // Compare: split crawl should produce same URLs as continuous crawl.
    let split_entries = index.query(&IndexQuery::default());
    let split_urls: Vec<String> = split_entries.iter().map(|e| e.url.to_string()).collect();
    if result_continuous.urls.len() != split_urls.len() {
        return Err(format!(
            "resumption diverged: continuous fetched {}, split fetched {}",
            result_continuous.urls.len(),
            split_urls.len(),
        ));
    }

    // Compare blob sets (order may differ due to split timing).
    let mut continuous_hashes: Vec<String> = result_continuous.blob_hashes.clone();
    let mut split_hashes: Vec<String> = split_entries
        .iter()
        .map(|e| e.content_hash.as_hex())
        .collect();
    continuous_hashes.sort();
    continuous_hashes.dedup();
    split_hashes.sort();
    split_hashes.dedup();
    if continuous_hashes != split_hashes {
        return Err(format!(
            "blob hashes diverged: continuous {} unique, split {} unique",
            continuous_hashes.len(),
            split_hashes.len(),
        ));
    }

    info!(
        continuous = result_continuous.pages_fetched,
        split = total_fetched + phase2_fetched,
        "resumption determinism: PASSED — same content"
    );
    Ok(())
}