#if 0 // self-compiling code: just 'chmod +x' this file and run it directly!
c++ -std=c++11 -Wall -Wextra -pedantic -Werror -g -O3 -march=native $0 || exit 1
exec ./a.out $*
#endif
// SPDX-FileCopyrightText: 2023 Martin J. Fiedler
// SPDX-License-Identifier: MIT

// C++11 implementation for the One Billion Row Challenge
// (https://github.com/gunnarmorling/1brc)
//
// The input file must be named "measurements.txt".
//
// Techniques used here:
// - memory mapping:   Instead of read()ing chunks of the input file, it's
//                     mmap()ed in its entirety.
// - multithreading:   Split the file into as many (approximately) equal-sized
//                     chunks as there are threads, process them separately,
//                     and merge the measurements at the end.
// - integer values:   As it turns out, strtod() / atof() are *very* slow!
//                     Since the values are only using (at most) one decimal
//                     digit anyway, a fast custom parser for those can be used,
//                     and all math can be integer.
// - custom hash map:  std::unordered_map is also slow, so a custom hash map
//                     implementation is used. It's using a simple
//                     rotate-and-XOR or rotate-and-add hash function, and can
//                     use a configurable number of steps of quadratic probing
//                     before falling back to a linked list for each bucket.
// - custom allocator: To avoid implicit locks, each thread gets its own heap
//                     for run-time allocations (such as the key strings of
//                     the hash map, and the extra objects in the linked list)
//                     with a simple bump allocator.
// Everything else is standard C++11 fare.

#include <cassert>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>

#include <algorithm>
#include <array>
#include <chrono>
#include <functional>
#include <new>
#include <string>
#include <thread>
#include <vector>

// CONFIGURATION SECTION.
int threadCount = 8;    // default thread count (can be set on command line too)

// hash map configuration
constexpr int hashBuckets = 2048;   // number of buckets in the hash map
constexpr int probeLimit = 1;       // number of steps of quadratic probing
                                    // before falling back to a linked list
typedef uint16_t hash_t;            // data type to use for the hash
constexpr int rotateAmount = 13;    // bits to rotate left after each step
#define HASH_OP ^                   // operation to add the current byte: + or ^

// derived parameters
constexpr hash_t hashMask = hash_t(hashBuckets - 1);
static_assert((hashBuckets & (hashBuckets - 1)) == 0, "hashBuckets must be a power of two");
static_assert(uint64_t(hashBuckets - 1) <= uint64_t(hash_t(~hash_t(0))),
              "hashBuckets must not exceed the value range of hash_t");

///////////////////////////////////////////////////////////////////////////////
// memory-mapped input helper class
//
// After construction, `data` points to the read-only contents of the file and
// `size` holds its length in bytes. On any failure (including an empty file,
// which cannot be mapped), `data` stays nullptr.

#ifdef _WIN32  // WARNING: Win32 support hasn't been tested as thoroughly!

#define WIN32_LEAN_AND_MEAN
#define NOMINMAX
#include <windows.h>

class MemoryMappedInputFile {
    HANDLE hFile = INVALID_HANDLE_VALUE;
    HANDLE hMap = NULL;
public:
    void* data = nullptr;
    size_t size = 0;

    explicit inline MemoryMappedInputFile(const char* filename) {
        hFile = CreateFileA(filename, GENERIC_READ, FILE_SHARE_READ, nullptr, OPEN_EXISTING, 0, nullptr);
        if (hFile == INVALID_HANDLE_VALUE) { return; }
        DWORD sizeH = 0;
        DWORD sizeL = GetFileSize(hFile, &sizeH);
        if ((sizeL == INVALID_FILE_SIZE) && (GetLastError() != NO_ERROR)) { return; }
        // assemble in 64 bits: shifting a 32-bit size_t by 32 would be undefined
        uint64_t fullSize = (uint64_t(sizeH) << 32) | uint64_t(sizeL);
        if (fullSize > uint64_t(SIZE_MAX)) { return; }  // can't map on this platform
        size = size_t(fullSize);
        hMap = CreateFileMappingA(hFile, nullptr, PAGE_READONLY, sizeH, sizeL, nullptr);
        if (hMap == NULL) { return; }
        data = MapViewOfFile(hMap, FILE_MAP_READ, 0, 0, size);
    }

    inline ~MemoryMappedInputFile() {
        if (data) { UnmapViewOfFile(data); }
        if (hMap != NULL) { CloseHandle(hMap); }
        if (hFile != INVALID_HANDLE_VALUE) { CloseHandle(hFile); }
    }

    // owns OS handles -> must not be copied (would close/unmap twice)
    MemoryMappedInputFile(const MemoryMappedInputFile&) = delete;
    MemoryMappedInputFile& operator=(const MemoryMappedInputFile&) = delete;
};

#else

#include <sys/types.h>
#include <sys/stat.h>
#include <sys/mman.h>
#include <fcntl.h>
#include <unistd.h>

class MemoryMappedInputFile {
    int fd = -1;
public:
    void* data = nullptr;
    size_t size = 0;

    explicit inline MemoryMappedInputFile(const char* filename) {
        fd = open(filename, O_RDONLY);
        if (fd < 0) { return; }
        off_t fileSize = lseek(fd, 0, SEEK_END);
        if (fileSize <= 0) { return; }  // error, or empty file (mmap rejects length 0)
        size = size_t(fileSize);
        void* mapping = mmap(nullptr, size, PROT_READ, MAP_SHARED, fd, 0);
        // mmap() signals failure with MAP_FAILED, *not* with a null pointer
        if (mapping == MAP_FAILED) { return; }
        data = mapping;
    }

    inline ~MemoryMappedInputFile() {
        if (data) { munmap(data, size); }
        if (fd >= 0) { close(fd); }
    }

    // owns a descriptor and a mapping -> must not be copied
    MemoryMappedInputFile(const MemoryMappedInputFile&) = delete;
    MemoryMappedInputFile& operator=(const MemoryMappedInputFile&) = delete;
};

#endif

///////////////////////////////////////////////////////////////////////////////
// simple bump allocator heap (so we don't need to use malloc() much at runtime)
//
// Memory is carved out of malloc()ed pages in 16-byte aligned steps and is
// only released all at once when the Heap is destroyed.

class Heap {
    struct Page {
        Page* prev;       // previously filled page (for cleanup)
        size_t offset;    // first free byte in data[]
        uint8_t data[1];  // actually pageSize bytes (over-allocated in newPage())
    };
    size_t pageSize;
    Page* currentPage = nullptr;
    void newPage();
public:
    explicit inline Heap(size_t pageSize_=1048576) : pageSize(pageSize_) { newPage(); }

    // owns its pages -> copying would lead to a double free()
    Heap(const Heap&) = delete;
    Heap& operator=(const Heap&) = delete;

    // allocate `size` bytes (must not exceed the page size)
    void* alloc(size_t size);

    // copy a NUL-terminated string into the heap
    inline char* addString(const char* str) {
        size_t size = strlen(str) + 1;
        void* res = alloc(size);
        memcpy(res, static_cast<const void*>(str), size);
        return static_cast<char*>(res);
    }

    // default-construct a T inside the heap (its destructor is never run)
    template <typename T> inline T* addObject() {
        T* obj = static_cast<T*>(alloc(sizeof(T)));
        new(obj) T;
        return obj;
    }

    ~Heap();
};

void Heap::newPage() {
    Page* p = static_cast<Page*>(malloc(pageSize + sizeof(Page)));
    if (!p) {
        // don't rely on assert() alone: it vanishes in NDEBUG builds
        fprintf(stderr, "ERROR: out of memory\n");
        abort();
    }
    p->prev = currentPage;
    p->offset = 0;
    currentPage = p;
}

void* Heap::alloc(size_t size) {
    assert(size <= pageSize);
    if ((currentPage->offset + size) > pageSize) { newPage(); }
    void* res = static_cast<void*>(&currentPage->data[currentPage->offset]);
    // keep every allocation 16-byte aligned
    currentPage->offset = (currentPage->offset + size + 15) & ~size_t(15);
    return res;
}

Heap::~Heap() {
    while (currentPage) {
        Page* prev = currentPage->prev;
        free(static_cast<void*>(currentPage));
        currentPage = prev;
    }
}

///////////////////////////////////////////////////////////////////////////////
// single statistics slot

struct StatsSlot {
    const char* key = nullptr;  // key ("city name"); if NULL, slot is unused
    StatsSlot* next = nullptr;  // linked list of overflow slots for this bucket
    // using stats as fixed-point values with a precision of 1/10th,
    // because libc's strtod() is too slow to be useful
    int16_t vMin = 32767;
    int16_t vMax = -32768;
    int count = 0;
    int64_t sum = 0;
    // note: on typical 64-bit platforms, this structure is exactly 32 bytes

    inline StatsSlot() {}
    void mergeFrom(const StatsSlot& source);
    inline void mergeTo(StatsSlot& dest) const { dest.mergeFrom(*this); }
};

// statistics generator class, representing a single worker thread
class StatsGenerator {
    std::array<StatsSlot, hashBuckets> map;  // main hash map
    Heap heap;                     // keys and additional linked list slots go here
    std::thread* thread = nullptr; // thread (non-NULL while running)
    static void threadFunc(StatsGenerator* self);

    // main key lookup function; always returns a valid slot, either a newly-
    // created one, or an existing one
    StatsSlot* get(const char* key);

public:
    char* pos = nullptr;  // start position (inside of run(): current position)
    char* end = nullptr;  // one past end position (should end with a newline)

    inline StatsGenerator() {}
    // never destroy a generator while its thread still runs (std::terminate)
    inline ~StatsGenerator() { joinThread(); }
    // owns a thread and a heap -> not copyable
    StatsGenerator(const StatsGenerator&) = delete;
    StatsGenerator& operator=(const StatsGenerator&) = delete;

    void run();         // process until pos == end
    void runThread();   // spawn a new thread that executes run()
    void joinThread();  // wait for thread to finish

    void mergeFrom(StatsGenerator& source);
    void mergeTo(StatsGenerator& dest) { dest.mergeFrom(*this); }

    void dump(FILE* out);

    // run a callback function on all used statistics slots
    void visit(std::function<void(const StatsSlot&)> visitor) const;
};

StatsSlot* StatsGenerator::get(const char* key) {
    // compute rotate-and-ADD/XOR hash
    hash_t h = 0;
    for (const char* keyPos = key; *keyPos; ++keyPos) {
        h = hash_t((h << rotateAmount) | (h >> (sizeof(h) * 8 - rotateAmount)));
        h = hash_t(h HASH_OP hash_t(uint8_t(*keyPos)));
    }

    // look up (and, possibly, create) a slot using a hash map with quadratic probing
    StatsSlot* slot = &map[h & hashMask];
    for (int probe = 0;;) {
        if (!slot->key) {
            // empty slot -> claim it
            slot->key = heap.addString(key);
            assert(slot->next == nullptr);
            assert(slot->count == 0);
            return slot;
        } else if (!strcmp(slot->key, key)) {
            // correct slot found
            return slot;
        } else if (slot->next) {
            // other slot -> follow the linked list
            slot = slot->next;
        } else if (probe < probeLimit) {
            // end of list -> try quadratic probing
            h += (++probe);
            slot = &map[h & hashMask];
        } else {
            // still no free slot found -> add one to the last examined slot's list
            slot->next = heap.addObject<StatsSlot>();
            slot = slot->next;
            slot->key = heap.addString(key);
            return slot;
        }
    }
}

void StatsGenerator::visit(std::function<void(const StatsSlot&)> visitor) const {
    // range-for instead of a hash_t counter: a `h <= hashMask` loop would never
    // terminate if hashBuckets covered the full range of hash_t
    for (const StatsSlot& bucket : map) {
        for (const StatsSlot* slot = &bucket; slot; slot = slot->next) {
            if (slot->key) { visitor(*slot); }
        }
    }
}

void StatsGenerator::run() {
    // The 1BRC rules limit station names to 100 bytes; the buffer leaves some
    // headroom, and longer names are truncated instead of overflowing the stack.
    constexpr size_t MaxKeyLen = 128;  // including the terminating NUL
    char key[MaxKeyLen];
    while (pos < end) {
        // parse the key (everything up to the ';')
        size_t keyLen = 0;
        while ((pos < end) && (*pos != ';')) {
            if (keyLen < (MaxKeyLen - 1)) { key[keyLen++] = *pos; }
            ++pos;
        }
        if (pos >= end) { break; }  // incomplete last line without ';' -> ignore
        ++pos;                      // skip the ';'
        key[keyLen] = '\0';

        // parse the value (fixed-point, one decimal digit); never read past `end`
        bool negative = (pos < end) && (*pos == '-');
        if (negative) { ++pos; }
        int16_t value = 0;
        bool haveDot = false;
        while (pos < end) {
            char c = *pos++;
            if (c == '\n') { break; }
            if ((c >= '0') && (c <= '9')) { value = int16_t(value * 10 + (c - '0')); }
            if (c == '.') { haveDot = true; }
        }
        if (!haveDot) { value = int16_t(value * 10); }
        if (negative) { value = int16_t(-value); }

        // enter stats
        StatsSlot* slot = get(key);
        if (value < slot->vMin) { slot->vMin = value; }
        if (value > slot->vMax) { slot->vMax = value; }
        slot->sum += int64_t(value);
        slot->count++;
    }
}

void StatsGenerator::threadFunc(StatsGenerator* self) {
    self->run();
}

void StatsGenerator::runThread() {
    assert(!thread);
    thread = new std::thread(StatsGenerator::threadFunc, this);
}

void StatsGenerator::joinThread() {
    if (thread) {
        thread->join();
        delete thread;
        thread = nullptr;
    }
}

void StatsSlot::mergeFrom(const StatsSlot& source) {
    if (source.vMin < vMin) { vMin = source.vMin; }
    if (source.vMax > vMax) { vMax = source.vMax; }
    sum += source.sum;
    count += source.count;
}

void StatsGenerator::mergeFrom(StatsGenerator& source) {
    source.joinThread();
    source.visit([&] (const StatsSlot& slot) {
        get(slot.key)->mergeFrom(slot);
    });
}

void StatsGenerator::dump(FILE* out) {
    std::vector<std::string> keys;
    visit([&] (const StatsSlot& slot) { keys.emplace_back(slot.key); });
    std::sort(keys.begin(), keys.end());
    // emit the opening brace unconditionally, so an empty result is "{}"
    fprintf(out, "{");
    const char* separator = "";
    for (const auto& key : keys) {
        const StatsSlot* slot = get(key.c_str());
        fprintf(out, "%s%s=%.1f/%.1f/%.1f", separator, key.c_str(),
            double(slot->vMin) * 0.1,
            double(slot->sum) / double(slot->count) * 0.1,
            double(slot->vMax) * 0.1);
        separator = ", ";
    }
    fprintf(out, "}\n");
}

///////////////////////////////////////////////////////////////////////////////

int main(int argc, char* argv[]) {
    if (argc > 1) {
        // override thread count; special setting: 0 = don't use threads at all
        threadCount = std::max(atoi(argv[1]), 0);
    }
    auto tStart = std::chrono::steady_clock::now();

    MemoryMappedInputFile infile("measurements.txt");
    if (!infile.data) {
        fprintf(stderr, "ERROR: failed to load input file\n");
        return 1;
    }

    std::vector<StatsGenerator> threads(static_cast<size_t>(threadCount));
    char* const startPos = static_cast<char*>(infile.data);
    char* const endPos = startPos + infile.size;
    char* splitPos = startPos;
    for (int i = 0; i < threadCount; ++i) {
        threads[i].pos = splitPos;
        // Nominal split position, but never before this chunk's start: with
        // tiny files or long lines, chunks could otherwise overlap.
        splitPos = std::max(splitPos, startPos + infile.size * size_t(i + 1) / size_t(threadCount));
        // Always split at a line boundary. Never look before the first byte
        // or run past the end of the file.
        while ((splitPos > startPos) && (splitPos < endPos) && (splitPos[-1] != '\n')) {
            ++splitPos;
        }
        threads[i].end = splitPos;
        threads[i].runThread();
    }

    StatsGenerator result;
    if (threadCount < 1) {
        // run true single-threaded mode
        result.pos = startPos;
        result.end = endPos;
        result.run();
    }
    for (auto& thread : threads) {
        result.mergeFrom(thread);
    }
    result.dump(stdout);

    auto tEnd = std::chrono::steady_clock::now();
    fprintf(stderr, "took %.3f seconds with %d threads\n",
        std::chrono::duration_cast<std::chrono::duration<double>>(tEnd - tStart).count(),
        threadCount);
    return 0;
}