Last active
December 16, 2015 19:29
-
-
Save p4checo/5485036 to your computer and use it in GitHub Desktop.
Revisions
-
p4checo revised this gist
Apr 29, 2013 . 1 changed file with 0 additions and 3 deletions.
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -2,9 +2,6 @@ // main.cpp // TripleBufferBenchMark // #include "TripleBuffer.hxx" #include "TripleBufferQueue.hxx" -
p4checo created this gist
Apr 29, 2013.
There are no files selected for viewing
//============================================================================
// Name        : TripleBuffer.hxx
// Author      : André Pacheco Neves
// Version     : 1.0 (27/01/13)
// Copyright   : Copyright (c) 2013, André Pacheco Neves
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//     * Redistributions of source code must retain the above copyright
//       notice, this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//     * Neither the name of the <organization> nor the
//       names of its contributors may be used to endorse or promote products
//       derived from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
// DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
// Description : Template class for a TripleBuffer as a concurrency mechanism, using atomic operations
// Credits     : http://remis-thoughts.blogspot.pt/2012/01/triple-buffering-as-concurrency_30.html
//============================================================================

#ifndef TRIPLEBUFFER_HXX_
#define TRIPLEBUFFER_HXX_

#include <atomic>
#include <cstdint>   // std::uint_fast8_t (previously only available transitively)

// NOTE(review): a header-scope using-directive leaks `std` into every includer.
// It is kept for backward compatibility only, because existing includers may
// rely on it for unqualified std names. Nothing in this header depends on it.
using namespace std;

/// Three-slot exchange buffer: a writer fills the "dirty" slot and publishes
/// it, a reader picks up the most recently published slot as its "snap".
/// All slot bookkeeping lives in one atomic byte, so neither side blocks.
///
/// NOTE(review): presumably meant for exactly one writer thread and one
/// reader thread (per the credited article); the class does not enforce it.
template <typename T>
class TripleBuffer
{
public:
    /// Fills all three slots with a value-initialized T.
    TripleBuffer();
    /// Fills all three slots with a copy of `init`.
    TripleBuffer(const T& init);

    // non-copyable behavior
    TripleBuffer(const TripleBuffer&) = delete;
    TripleBuffer& operator=(const TripleBuffer&) = delete;

    T snap() const;            // get the current snap to read
    void write(const T newT);  // write a new value into the dirty slot
    bool newSnap();            // swap to the latest value, if any; false when nothing new
    void flipWriter();         // flip writer positions dirty / clean and mark "new write"

    T readLast();              // wrapper to read the last available element (newSnap + snap)
    void update(T newT);       // wrapper to update with a new element (write + flipWriter)

private:
    // Pure bit helpers; static because they never touch instance state.
    static bool isNewWrite(std::uint_fast8_t bits);                               // is the newWrite bit set?
    static std::uint_fast8_t swapSnapWithClean(std::uint_fast8_t bits);           // swap Snap and Clean indexes (clears newWrite)
    static std::uint_fast8_t newWriteSwapCleanWithDirty(std::uint_fast8_t bits);  // set newWrite and swap Clean and Dirty indexes

    // 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
    // newWrite   = (flags & 0x40)
    // dirtyIndex = (flags & 0x30) >> 4
    // cleanIndex = (flags & 0xC) >> 2
    // snapIndex  = (flags & 0x3)
    std::atomic<std::uint_fast8_t> flags;

    T buffer[3];
};

// include implementation in header since it is a template

template <typename T>
TripleBuffer<T>::TripleBuffer() : TripleBuffer(T())
{
}

template <typename T>
TripleBuffer<T>::TripleBuffer(const T& init)
    : flags(0x6)  // initially dirty = 0, clean = 1 and snap = 2
{
    for (T& slot : buffer)
        slot = init;
}

// Memory ordering used below: every load of `flags` is an acquire and every
// successful CAS is acq_rel. The original used memory_order_consume, which
// the standard discourages and which mainstream implementations already treat
// as acquire, together with release-only CAS. The acquire half is needed so
// that the other thread's accesses to a slot, made before it released
// `flags`, happen-before our accesses to the slot we just took over.

template <typename T>
T TripleBuffer<T>::snap() const
{
    return buffer[flags.load(std::memory_order_acquire) & 0x3];  // read snap index
}

template <typename T>
void TripleBuffer<T>::write(const T newT)
{
    buffer[(flags.load(std::memory_order_acquire) & 0x30) >> 4] = newT;  // write into dirty index
}

template <typename T>
bool TripleBuffer<T>::newSnap()
{
    std::uint_fast8_t flagsNow = flags.load(std::memory_order_acquire);
    do {
        if (!isNewWrite(flagsNow))  // nothing new, no need to swap
            return false;
        // On failure compare_exchange_weak refreshes flagsNow, so the
        // newWrite check above is re-evaluated against the current value.
    } while (!flags.compare_exchange_weak(flagsNow, swapSnapWithClean(flagsNow),
                                          std::memory_order_acq_rel,
                                          std::memory_order_acquire));
    return true;
}

template <typename T>
void TripleBuffer<T>::flipWriter()
{
    std::uint_fast8_t flagsNow = flags.load(std::memory_order_acquire);
    while (!flags.compare_exchange_weak(flagsNow, newWriteSwapCleanWithDirty(flagsNow),
                                        std::memory_order_acq_rel,
                                        std::memory_order_acquire)) {
        // flagsNow was refreshed by the failed CAS; retry with the new value.
    }
}

template <typename T>
T TripleBuffer<T>::readLast()
{
    newSnap();      // get most recent value
    return snap();  // return it
}

template <typename T>
void TripleBuffer<T>::update(T newT)
{
    write(newT);   // write new value
    flipWriter();  // change dirty/clean buffer positions for the next update
}

template <typename T>
bool TripleBuffer<T>::isNewWrite(std::uint_fast8_t bits)
{
    // check if the newWrite bit is 1
    return (bits & 0x40) != 0;
}

template <typename T>
std::uint_fast8_t TripleBuffer<T>::swapSnapWithClean(std::uint_fast8_t bits)
{
    // swap snap with clean; bit 0x40 is masked out, which clears newWrite
    return static_cast<std::uint_fast8_t>((bits & 0x30) | ((bits & 0x3) << 2) | ((bits & 0xC) >> 2));
}

template <typename T>
std::uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(std::uint_fast8_t bits)
{
    // set newWrite bit to 1 and swap clean with dirty
    return static_cast<std::uint_fast8_t>(0x40 | ((bits & 0xC) << 2) | ((bits & 0x30) >> 2) | (bits & 0x3));
}

#endif /* TRIPLEBUFFER_HXX_ */

//
//  TripleBufferQueue.hxx
//  TripleBufferBenchmark
//

#ifndef TRIPLEBUFFERQUEUE_HXX_
#define TRIPLEBUFFERQUEUE_HXX_

#include <atomic>
#include <cstddef>   // std::size_t

/// Three-slot ring of `void*` with atomic head/tail indexes.
/// One slot always stays free, so at most two items are queued.
class TripleBufferQueue
{
public:
    TripleBufferQueue() : head(0), tail(0) {}

    /// Enqueues `value`.
    /// @return true when a free slot was used; false when the ring was full,
    ///         in which case (with three slots) the most recently pushed item
    ///         is overwritten and `head` keeps its value.
    /// NOTE(review): that overwrite is not synchronized with a concurrent
    /// pop() that may be reading the same slot; confirm this is acceptable.
    bool push(void *value)
    {
        bool no_drop = true;
        std::size_t slot = head.load(std::memory_order_relaxed);
        std::size_t newHead = next(slot);
        if (newHead == tail.load(std::memory_order_acquire)) {
            slot = next(newHead);
            newHead = next(slot);
            no_drop = false;
        }
        // Fill the slot BEFORE the release store that publishes it. The
        // original stored head first, letting a consumer observe the new head
        // and read the slot before the pointer was written.
        buffer[slot] = value;
        head.store(newHead, std::memory_order_release);
        return no_drop;
    }

    /// Dequeues the oldest item into `*value`.
    /// @return false (leaving `*value` untouched) when the queue is empty.
    bool pop(void **value)
    {
        const std::size_t t = tail.load(std::memory_order_relaxed);
        if (t == head.load(std::memory_order_acquire))
            return false;
        // Index with the value already loaded; the original indexed with the
        // atomic itself, costing a second, sequentially consistent load.
        *value = buffer[t];
        tail.store(next(t), std::memory_order_release);
        return true;
    }

private:
    static constexpr std::size_t kSlotCount = 3;

    void *buffer[kSlotCount];
    std::atomic<std::size_t> head, tail;

    /// Index following `current`, wrapping around the ring.
    static std::size_t next(std::size_t current)
    {
        return (current + 1) % kSlotCount;
    }
};

#endif /* TRIPLEBUFFERQUEUE_HXX_ */
Learn more about bidirectional Unicode characters
Original file line number  Diff line number  Diff line change
@@ -0,0 +1,54 @@
Example run:
Producer at 120Hz
Consumer at 60Hz

TripleBufferQueue (20 runs, 10 seconds each)
  #   Produced   Consumed    +-Delta      Total
  1        496        494          2        990
  2        589        588          1       1177
  3        599        597          2       1196
  4        597        595          2       1192
  5        599        598          1       1197
  6        592        590          2       1182
  7        590        588          2       1178
  8        595        593          2       1188
  9        598        596          2       1194
 10        582        580          2       1162
 11        600        599          1       1199
 12        573        572          1       1145
 13        599        597          2       1196
 14        590        588          2       1178
 15        600        599          1       1199
 16        599        598          1       1197
 17        600        599          1       1199
 18        593        592          1       1185
 19        600        598          2       1198
 20        600        599          1       1199
---   --------   --------   --------   --------
Avg     589.55     588.00       1.55    1177.55

TripleBuffer (20 runs, 10 seconds each)
  #   Produced   Consumed    +-Delta      Total
  1       1200        600        600       1800
  2       1200        599        601       1799
  3       1200        599        601       1799
  4       1200        599        601       1799
  5       1200        599        601       1799
  6       1200        599        601       1799
  7       1200        600        600       1800
  8       1200        600        600       1800
  9       1200        600        600       1800
 10       1200        600        600       1800
 11       1199        600        599       1799
 12       1200        599        601       1799
 13       1200        599        601       1799
 14       1200        600        600       1800
 15       1200        600        600       1800
 16       1200        599        601       1799
 17       1200        600        600       1800
 18       1199        600        599       1799
 19       1200        599        601       1799
 20       1200        599        601       1799
---   --------   --------   --------   --------
Avg    1199.90     599.50     600.40    1799.40

This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode characters
Original file line number  Diff line number  Diff line change
@@ -0,0 +1,298 @@
//
//  main.cpp
//  TripleBufferBenchMark
//
//  Created by André Pacheco Neves on 29/04/13.
//  Copyright (c) 2013 VisionArea. All rights reserved.
// #include "TripleBuffer.hxx" #include "TripleBufferQueue.hxx" #include <thread> #include <iostream> #include <vector> #include <cmath> typedef std::chrono::high_resolution_clock Clock; typedef std::chrono::nanoseconds nanoseconds; enum ThreadType{ PRODUCER = 0, CONSUMER }; //-------------------------- // Configuration static const int RUN_SECONDS = 1; static const int NUM_RUNS = 20; static const bool FREE_LOOP_MODE = false; static const long NANOS_IN_SEC = 1000000000L; static const int PRODUCER_RATE = 120; // number of producer writes per second static const long PRODUCER_TICK_PERIOD = roundf(NANOS_IN_SEC / (float) PRODUCER_RATE); // nanoseconds static const int CONSUMER_RATE = 60; // number of consumer reads per second static const long CONSUMER_TICK_PERIOD = roundf(NANOS_IN_SEC / (float) CONSUMER_RATE); // nanoseconds //--------------------------- // TripleBufferQueue typedef struct ArgTBQ{ TripleBufferQueue tbq; std::atomic<bool> run; int counter[2]; std::thread *consumer; std::thread *producer; } ArgTBQ; void tbqProducer(ArgTBQ* a){ if( FREE_LOOP_MODE ){ int i=0; while(a->run){ if(a->tbq.push(&i)) a->counter[PRODUCER]++; i++; } } else { Clock::time_point currentTime = Clock::now(); double accumulator = 0; int i=0; while (a->run) { Clock::time_point newTime = Clock::now(); double frameTime = std::chrono::duration_cast<nanoseconds>(newTime - currentTime).count(); currentTime = newTime; accumulator += frameTime; while (accumulator >= PRODUCER_TICK_PERIOD){ if(a->tbq.push(&i)) a->counter[PRODUCER]++; i++; accumulator -= PRODUCER_TICK_PERIOD; } } } } void tbqConsumer(ArgTBQ* a){ if( FREE_LOOP_MODE ){ while(a->run){ int* j; if(a->tbq.pop((void**)&j)) a->counter[CONSUMER]++; } } else { Clock::time_point currentTime = Clock::now(); double accumulator = 0; while (a->run) { Clock::time_point newTime = Clock::now(); double frameTime = std::chrono::duration_cast<nanoseconds>(newTime - currentTime).count(); currentTime = newTime; accumulator += frameTime; while 
(accumulator >= CONSUMER_TICK_PERIOD){ int* j; if(a->tbq.pop((void**)&j)) a->counter[CONSUMER]++; accumulator -= CONSUMER_TICK_PERIOD; } } } } void testTripleBufferQueue(ArgTBQ *args){ args->run = true; args->counter[PRODUCER] = 0; args->counter[CONSUMER] = 0; args->producer = new std::thread(tbqProducer, args); args->consumer = new std::thread(tbqConsumer, args); } //--------------------------- // TripleBuffer typedef struct ArgTB{ TripleBuffer<int> tb; std::atomic<bool> run; volatile int counter[2]; std::thread *consumer; std::thread *producer; } ArgTB; void tbProducer(ArgTB* a){ if( FREE_LOOP_MODE ){ int i=0; while(a->run){ a->tb.update(i); a->counter[PRODUCER]++; i++; } } else { Clock::time_point currentTime = Clock::now(); double accumulator = 0; int i=0; while (a->run) { Clock::time_point newTime = Clock::now(); double frameTime = std::chrono::duration_cast<nanoseconds>(newTime - currentTime).count(); currentTime = newTime; accumulator += frameTime; while (accumulator >= PRODUCER_TICK_PERIOD){ a->tb.update(i); a->counter[PRODUCER]++; i++; accumulator -= PRODUCER_TICK_PERIOD; } } } } void tbConsumer(ArgTB* a){ if( FREE_LOOP_MODE ){ while(a->run){ a->tb.readLast(); a->counter[CONSUMER]++; } } else { Clock::time_point currentTime = Clock::now(); double accumulator = 0; while (a->run) { Clock::time_point newTime = Clock::now(); double frameTime = std::chrono::duration_cast<nanoseconds>(newTime - currentTime).count(); currentTime = newTime; accumulator += frameTime; while (accumulator >= CONSUMER_TICK_PERIOD){ a->tb.readLast(); a->counter[CONSUMER]++; accumulator -= CONSUMER_TICK_PERIOD; } } } } void testTripleBuffer(ArgTB *args){ args->run = true; args->counter[PRODUCER] = 0; args->counter[CONSUMER] = 0; args->producer = new std::thread(tbProducer, args); args->consumer = new std::thread(tbConsumer, args); } //--------------------------- // Benchmark int main(int argc, const char * argv[]) { long sumPrd=0, sumCon=0; // ----------------------- std::vector<ArgTBQ> 
argsTBQ(NUM_RUNS); printf(" TripleBufferQueue (%d runs, %d seconds each)\n", NUM_RUNS, RUN_SECONDS); printf(" # Produced Consumed +-Delta Total\n"); for (int i=0; i<NUM_RUNS; i++){ testTripleBufferQueue(&argsTBQ[i]); std::this_thread::sleep_for(chrono::nanoseconds(RUN_SECONDS * NANOS_IN_SEC)); argsTBQ[i].run = false; argsTBQ[i].producer->join(); argsTBQ[i].consumer->join(); delete argsTBQ[i].producer; delete argsTBQ[i].consumer; int prd = argsTBQ[i].counter[PRODUCER]; int con = argsTBQ[i].counter[CONSUMER]; sumPrd += prd; sumCon += con; printf("%3d %10d %10d %10d %10d\n", (i+1), prd, con, prd - con, prd + con); } printf("--- -------- -------- -------- --------\n"); printf("Avg %10.2f %10.2f %10.2f %10.2f\n\n", (float)sumPrd/NUM_RUNS, (float)sumCon/NUM_RUNS, (float)(sumPrd - sumCon)/NUM_RUNS,(float)(sumPrd + sumCon)/NUM_RUNS); // ------------------------ std::vector<ArgTB> argsTB(NUM_RUNS); printf(" TripleBuffer (%d runs, %d seconds each)\n", NUM_RUNS, RUN_SECONDS); printf(" # Produced Consumed +-Delta Total\n"); sumPrd=0; sumCon=0; for (int i=0; i<NUM_RUNS; i++){ testTripleBuffer(&argsTB[i]); std::this_thread::sleep_for(chrono::nanoseconds(RUN_SECONDS * NANOS_IN_SEC)); argsTB[i].run = false; argsTB[i].producer->join(); argsTB[i].consumer->join(); delete argsTB[i].producer; delete argsTB[i].consumer; int prd = argsTB[i].counter[PRODUCER]; int con = argsTB[i].counter[CONSUMER]; sumPrd += prd; sumCon += con; printf("%3d %10d %10d %10d %10d\n", (i+1), prd, con, prd - con, prd + con); } printf("--- -------- -------- -------- --------\n"); printf("Avg %10.2f %10.2f %10.2f %10.2f\n\n", (float)sumPrd/NUM_RUNS, (float)sumCon/NUM_RUNS, (float)(sumPrd - sumCon)/NUM_RUNS,(float)(sumPrd + sumCon)/NUM_RUNS); return 0; }