use aho_corasick::AhoCorasick;
use anyhow::Result;
use crossbeam_channel::{bounded, Receiver, Sender};
use crossbeam_utils::thread;
use itertools::Itertools;
use memmap::Mmap;
use std::convert::TryInto;
use std::fs::File;
use std::str::from_utf8;

/// Number of worker threads parsing chunks; also used as the channel capacity.
const NUM_WORKERS: usize = 32;
/// Minimum distance (in bytes) the reader advances before looking for a page boundary.
const CHUNK_SIZE: usize = 10 * 1024 * 1024;
/// Path of the XML dump that is memory-mapped and scanned.
const DUMP_FILE: &str = "/home/naitik/Downloads/enwiki-20210101-pages-articles-multistream.xml";
/// When set, the reader stops once the next chunk would start past the first 1,000,000 bytes.
const DEBUG: bool = true;
/// Opening delimiter of one page element in the dump.
const PAGE_OPEN: &str = "<page>";
/// Closing delimiter of one page element in the dump.
const PAGE_CLOSE: &str = "</page>";

/// Returns the text of the first descendant element of `doc` named `tag`.
///
/// # Panics
///
/// Panics if no such element exists or if the element has no text.
fn tag_text<'a>(doc: &'a roxmltree::Document, tag: &'static str) -> &'a str {
    let node = doc
        .descendants()
        .find(|n| n.has_tag_name(tag))
        .expect("tag to be found");
    node.text().expect("tag to have text")
}

/// Worker loop: receives `(start, end)` byte ranges into `data`, parses every
/// page element found inside each range and prints its id and title.
///
/// Returns when the channel is disconnected and drained.
///
/// # Errors
///
/// Fails if a page is not valid UTF-8, is not well-formed XML, or its `id`
/// text does not parse as a `u64`.
///
/// # Panics
///
/// Panics if opening and closing delimiters do not strictly alternate inside a
/// chunk, or if a page lacks an `id` or `title` element with text.
fn process_chunk(data: &Mmap, receiver: &Receiver<(usize, usize)>) -> Result<()> {
    // Pattern 0 is the opening delimiter, pattern 1 the closing one.
    let ac = AhoCorasick::new_auto_configured(&[PAGE_OPEN, PAGE_CLOSE]);
    for (start, end) in receiver {
        let chunk = &data[start..end];
        // Consecutive matches are consumed pairwise as (open, close).
        for (open, close) in ac.find_iter(chunk).tuples() {
            assert_eq!(open.pattern(), 0);
            assert_eq!(close.pattern(), 1);
            let text = from_utf8(&chunk[open.start()..close.end()])?;
            let doc = roxmltree::Document::parse(text)?;
            let id: u64 = tag_text(&doc, "id").parse()?;
            let title = tag_text(&doc, "title");
            println!("id: {}, title: {}", id, title);
        }
    }
    Ok(())
}

/// Splits `data` into chunks that end right after a closing page delimiter and
/// sends each chunk's `(start, end)` byte range over `sender`.
///
/// `len` is the number of bytes of `data` to consider. Dropping `sender` on
/// return is what lets the receiving side finish.
///
/// # Errors
///
/// Fails if the channel is disconnected while sending.
///
/// # Panics
///
/// Panics if `data` contains no opening page delimiter at all.
fn read_dump(data: &Mmap, len: usize, sender: Sender<(usize, usize)>) -> Result<()> {
    let open = AhoCorasick::new_auto_configured(&[PAGE_OPEN]);
    let mut start = open
        .find(data.as_ref())
        .expect("to find start of page")
        .start();

    let close = AhoCorasick::new_auto_configured(&[PAGE_CLOSE]);
    loop {
        if DEBUG && start > 1_000_000 {
            break;
        }
        if start >= len {
            break;
        }

        // Jump ahead by one chunk (clamped to the input), then extend to the
        // end of the next closing delimiter so no page is cut in half.
        let tentative = (start + CHUNK_SIZE).min(len);
        let end = match close.find(&data[tentative..]) {
            Some(m) => tentative + m.end(),
            // No closing delimiter after the tentative end: fall back to the
            // last one at or after `start`; if there is none, nothing is left.
            None => match twoway::rfind_bytes(&data[start..], PAGE_CLOSE.as_bytes()) {
                Some(offset) => start + offset + PAGE_CLOSE.len(),
                None => break,
            },
        };

        sender.send((start, end))?;
        start = end;
    }
    Ok(())
}

/// Memory-maps the dump, then runs one reader thread that produces chunk
/// ranges and `NUM_WORKERS` threads that parse them.
fn main() -> Result<()> {
    let file = File::open(DUMP_FILE)?;
    let len: usize = file.metadata()?.len().try_into()?;
    // SAFETY: the mapping is only ever read. This assumes no other process
    // truncates or rewrites the file while it is mapped, which the program
    // cannot enforce.
    let data = unsafe { Mmap::map(&file)? };

    let (sender, receiver) = bounded(NUM_WORKERS);
    thread::scope(|s| {
        s.spawn(|_| read_dump(&data, len, sender).unwrap());
        for _ in 0..NUM_WORKERS {
            // Each worker owns its receiver so the channel disconnects once
            // every worker has exited; otherwise the reader would block in
            // `send` forever if all workers failed.
            let worker_rx = receiver.clone();
            let data = &data;
            s.spawn(move |_| process_chunk(data, &worker_rx).unwrap());
        }
        drop(receiver);
    })
    .unwrap();
    Ok(())
}