Skip to content

Instantly share code, notes, and snippets.

@p4checo
Last active December 16, 2015 19:29
Show Gist options
  • Select an option

  • Save p4checo/5485036 to your computer and use it in GitHub Desktop.

Select an option

Save p4checo/5485036 to your computer and use it in GitHub Desktop.

Revisions

  1. p4checo revised this gist Apr 29, 2013. 1 changed file with 0 additions and 3 deletions.
    3 changes: 0 additions & 3 deletions main.cpp
    Original file line number Diff line number Diff line change
    @@ -2,9 +2,6 @@
    // main.cpp
    // TripleBufferBenchMark
    //
    // Created by André Pacheco Neves on 29/04/13.
    // Copyright (c) 2013 VisionArea. All rights reserved.
    //

    #include "TripleBuffer.hxx"
    #include "TripleBufferQueue.hxx"
  2. p4checo created this gist Apr 29, 2013.
    175 changes: 175 additions & 0 deletions TripleBuffer.hxx
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,175 @@
    //============================================================================
    // Name : TripleBuffer.hxx
    // Author : André Pacheco Neves
    // Version : 1.0 (27/01/13)
    // Copyright : Copyright (c) 2013, André Pacheco Neves
    // All rights reserved.
    //
    // Redistribution and use in source and binary forms, with or without
    // modification, are permitted provided that the following conditions are met:
    // * Redistributions of source code must retain the above copyright
    // notice, this list of conditions and the following disclaimer.
    // * Redistributions in binary form must reproduce the above copyright
    // notice, this list of conditions and the following disclaimer in the
    // documentation and/or other materials provided with the distribution.
    // * Neither the name of the <organization> nor the
    // names of its contributors may be used to endorse or promote products
    // derived from this software without specific prior written permission.
    //
    // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
    // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
    // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
    // DISCLAIMED. IN NO EVENT SHALL <COPYRIGHT HOLDER> BE LIABLE FOR ANY
    // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
    // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
    // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
    // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    // Description : Template class for a TripleBuffer as a concurrency mechanism, using atomic operations
    // Credits : http://remis-thoughts.blogspot.pt/2012/01/triple-buffering-as-concurrency_30.html
    //============================================================================

    #ifndef TRIPLEBUFFER_HXX_
    #define TRIPLEBUFFER_HXX_

    #include <atomic>

    using namespace std;

    // Triple buffer: three slots of T plus one atomic byte that records which
    // slot currently plays which role (dirty = being written, clean = last
    // completed write, snap = being read) and whether a completed write is
    // waiting to be picked up.
    //
    // NOTE(review): the index juggling looks designed for exactly one writer
    // thread (write / flipWriter / update) and one reader thread
    // (newSnap / snap / readLast) - confirm before using it with more threads.
    template <typename T>
    class TripleBuffer
    {

    public:

        // Both constructors fill all three slots and set the roles to
        // dirty = 0, clean = 1, snap = 2.
        // (Declared with the injected class name: a template-id such as
        // TripleBuffer<T>() is no longer a valid constructor declarator in C++20.)
        TripleBuffer();
        TripleBuffer(const T& init);

        // non-copyable behavior
        TripleBuffer(const TripleBuffer&) = delete;
        TripleBuffer& operator=(const TripleBuffer&) = delete;

        T snap() const;             // copy of the slot currently designated as snap
        void write(const T newT);   // store newT into the slot currently designated as dirty
        bool newSnap();             // swap snap/clean if a new write is pending; false when nothing is new
        void flipWriter();          // mark a new write and swap the clean/dirty roles

        T readLast();               // wrapper to read the last available element (newSnap + snap)
        void update(T newT);        // wrapper to update with a new element (write + flipWriter)

    private:

        // Pure bit helpers on a flags value; they never touch object state,
        // hence static.
        static bool isNewWrite(uint_fast8_t flags);                         // check if the newWrite bit is 1
        static uint_fast8_t swapSnapWithClean(uint_fast8_t flags);          // swap Snap and Clean indexes
        static uint_fast8_t newWriteSwapCleanWithDirty(uint_fast8_t flags); // set newWrite to 1 and swap Clean and Dirty indexes

        // 8 bit flags are (unused) (new write) (2x dirty) (2x clean) (2x snap)
        // newWrite   = (flags & 0x40)
        // dirtyIndex = (flags & 0x30) >> 4
        // cleanIndex = (flags & 0xC) >> 2
        // snapIndex  = (flags & 0x3)
        mutable std::atomic_uint_fast8_t flags;

        T buffer[3];
    };

    // include implementation in header since it is a template

    // Default constructor: every slot receives a copy of one value-initialised T.
    template <typename T>
    TripleBuffer<T>::TripleBuffer(){

        T initial = T();

        for (T& slot : buffer)
            slot = initial;

        // 0x6 is binary 00 01 10: dirty = 0, clean = 1, snap = 2, newWrite bit clear
        flags.store(0x6, std::memory_order_relaxed);
    }

    // Constructor taking an initial value: every slot receives a copy of init.
    template <typename T>
    TripleBuffer<T>::TripleBuffer(const T& init){

        for (T& slot : buffer)
            slot = init;

        // 0x6 is binary 00 01 10: dirty = 0, clean = 1, snap = 2, newWrite bit clear
        flags.store(0x6, std::memory_order_relaxed);
    }

    // Returns a copy of the slot whose index is stored in the two low bits
    // (the snap index) of the flags byte.
    template <typename T>
    T TripleBuffer<T>::snap() const{

        const auto snapIndex = flags.load(std::memory_order_consume) & 0x3;
        return buffer[snapIndex];
    }

    // Stores newT into the slot whose index is held in bits 4-5 (the dirty
    // index) of the flags byte. The flags themselves are not modified here.
    template <typename T>
    void TripleBuffer<T>::write(const T newT){

        const auto dirtyIndex = (flags.load(std::memory_order_consume) & 0x30) >> 4;
        buffer[dirtyIndex] = newT;
    }

    // Makes the most recently completed write the current snap.
    //
    // Returns false (leaving the flags untouched) when the newWrite bit is
    // not set, i.e. nothing was published since the last successful call.
    // Otherwise atomically swaps the snap and clean indexes, clears the
    // newWrite bit (see swapSnapWithClean) and returns true.
    template <typename T>
    bool TripleBuffer<T>::newSnap(){

        // Acquire instead of the discouraged consume ordering: the slot
        // contents written before the flags were published must be visible
        // to the reads that follow.
        uint_fast8_t expected = flags.load(std::memory_order_acquire);
        uint_fast8_t desired;
        do {
            if( !isNewWrite(expected) ) // nothing new, no need to swap
                return false;
            desired = swapSnapWithClean(expected);
            // On failure compare_exchange_weak stores the current value into
            // `expected`, so no explicit reload is needed before retrying.
        } while(!flags.compare_exchange_weak(expected,
                                             desired,
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire));

        return true;
    }

    // Publishes the slot that was just written: atomically sets the newWrite
    // bit and swaps the clean and dirty indexes, retrying until the update
    // of the flags byte succeeds.
    template <typename T>
    void TripleBuffer<T>::flipWriter(){

        uint_fast8_t expected = flags.load(std::memory_order_acquire);
        uint_fast8_t desired;
        do {
            desired = newWriteSwapCleanWithDirty(expected);
            // On failure compare_exchange_weak stores the current value into
            // `expected`, so no explicit reload is needed before retrying.
            // acq_rel on success: release publishes the preceding slot write,
            // acquire orders the next write after whatever flags value was seen.
        } while(!flags.compare_exchange_weak(expected,
                                             desired,
                                             std::memory_order_acq_rel,
                                             std::memory_order_acquire));
    }

    // Convenience reader: first switch to the newest published slot (if there
    // is one), then hand back a copy of whatever the snap slot holds.
    template <typename T>
    T TripleBuffer<T>::readLast(){
        // The result only says whether anything new arrived; the caller gets
        // the snap value either way.
        static_cast<void>(this->newSnap());
        return this->snap();
    }

    // Convenience writer: store the value in the dirty slot, then flip the
    // clean/dirty roles so the value becomes the latest completed write.
    template <typename T>
    void TripleBuffer<T>::update(T newT){
        this->write(newT);
        this->flipWriter();
    }

    // True when bit 0x40 (the newWrite bit) is set in the given flags value.
    template <typename T>
    bool TripleBuffer<T>::isNewWrite(uint_fast8_t flags){
        const uint_fast8_t newWriteBit = 0x40;
        return (flags & newWriteBit) == newWriteBit;
    }

    // Returns the flags value with the snap and clean indexes exchanged.
    // The dirty index is carried over unchanged; the newWrite bit (0x40) is
    // not carried over, so it is clear in the result.
    template <typename T>
    uint_fast8_t TripleBuffer<T>::swapSnapWithClean(uint_fast8_t flags){
        const uint_fast8_t dirtyInPlace = flags & 0x30;        // bits 4-5 stay put
        const uint_fast8_t snapAsClean  = (flags & 0x3) << 2;  // bits 0-1 move up to 2-3
        const uint_fast8_t cleanAsSnap  = (flags & 0xC) >> 2;  // bits 2-3 move down to 0-1
        return dirtyInPlace | snapAsClean | cleanAsSnap;
    }

    // Returns the flags value with the newWrite bit set and the clean and
    // dirty indexes exchanged. The snap index is carried over unchanged.
    template <typename T>
    uint_fast8_t TripleBuffer<T>::newWriteSwapCleanWithDirty(uint_fast8_t flags){
        const uint_fast8_t newWriteBit  = 0x40;
        const uint_fast8_t cleanAsDirty = (flags & 0xC) << 2;   // bits 2-3 move up to 4-5
        const uint_fast8_t dirtyAsClean = (flags & 0x30) >> 2;  // bits 4-5 move down to 2-3
        const uint_fast8_t snapInPlace  = flags & 0x3;          // bits 0-1 stay put
        return newWriteBit | cleanAsDirty | dirtyAsClean | snapInPlace;
    }

    #endif /* TRIPLEBUFFER_HXX_ */
    48 changes: 48 additions & 0 deletions TripleBufferQueue.hxx
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,48 @@
    //
    // TripleBufferQueue.hxx
    // TripleBufferBenchmark
    //

    #include <atomic>

    // Three-slot ring buffer of opaque pointers, indexed by two atomic
    // counters: `head` (next slot to write) and `tail` (next slot to read).
    // One slot is always kept free, so at most two entries are queued.
    //
    // NOTE(review): push() only writes head and pop() only writes tail, which
    // suggests one pushing thread and one popping thread - confirm with callers.
    class TripleBufferQueue
    {
    public:
        TripleBufferQueue() : buffer(), head(0), tail(0) {}

        // Enqueues `value`.
        // Returns true when a free slot was used. When the queue is full the
        // value is written over the slot at next(tail) - the most recently
        // queued entry - and false is returned to report the dropped entry.
        // NOTE(review): in the full case that slot is already visible to the
        // popping side, so the overwrite can overlap with a concurrent pop();
        // this is inherent to the original design and left unchanged.
        bool push(void *value)
        {
            bool no_drop = true;
            size_t c = head.load(m1);
            size_t n = next(c);
            if (n == tail.load(m2))
            {
                c = next(n);
                n = next(c);
                no_drop = false;
            }
            // Fill the slot first, then publish it: the release store on head
            // is what makes the slot contents visible to pop()'s acquire load.
            buffer[c] = value;
            head.store(n, m3);
            return no_drop;
        }

        // Dequeues the oldest entry into *value.
        // Returns false (leaving *value untouched) when the queue is empty.
        bool pop(void **value)
        {
            const size_t t = tail.load(m1);
            if (t == head.load(m2))
                return false;
            *value = buffer[t];      // use the index already loaded, not a second atomic read
            tail.store(next(t), m3); // hand the slot back to the pushing side
            return true;
        }
    private:
        static const std::memory_order m1 = std::memory_order_relaxed; // own index
        static const std::memory_order m2 = std::memory_order_acquire; // other side's index
        static const std::memory_order m3 = std::memory_order_release; // publish own index

        const static size_t size = 3;
        void * buffer[size];
        std::atomic<size_t> head, tail;

        // Index following `current`, wrapping around at `size`.
        static size_t next(size_t current) { return (current + 1) % size; }
    };
    54 changes: 54 additions & 0 deletions example_run
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,54 @@
    Example run:

    Producer at 120Hz
    Consumer at 60Hz

    TripleBufferQueue (20 runs, 10 seconds each)
    # Produced Consumed +-Delta Total
    1 496 494 2 990
    2 589 588 1 1177
    3 599 597 2 1196
    4 597 595 2 1192
    5 599 598 1 1197
    6 592 590 2 1182
    7 590 588 2 1178
    8 595 593 2 1188
    9 598 596 2 1194
    10 582 580 2 1162
    11 600 599 1 1199
    12 573 572 1 1145
    13 599 597 2 1196
    14 590 588 2 1178
    15 600 599 1 1199
    16 599 598 1 1197
    17 600 599 1 1199
    18 593 592 1 1185
    19 600 598 2 1198
    20 600 599 1 1199
    --- -------- -------- -------- --------
    Avg 589.55 588.00 1.55 1177.55

    TripleBuffer (20 runs, 10 seconds each)
    # Produced Consumed +-Delta Total
    1 1200 600 600 1800
    2 1200 599 601 1799
    3 1200 599 601 1799
    4 1200 599 601 1799
    5 1200 599 601 1799
    6 1200 599 601 1799
    7 1200 600 600 1800
    8 1200 600 600 1800
    9 1200 600 600 1800
    10 1200 600 600 1800
    11 1199 600 599 1799
    12 1200 599 601 1799
    13 1200 599 601 1799
    14 1200 600 600 1800
    15 1200 600 600 1800
    16 1200 599 601 1799
    17 1200 600 600 1800
    18 1199 600 599 1799
    19 1200 599 601 1799
    20 1200 599 601 1799
    --- -------- -------- -------- --------
    Avg 1199.90 599.50 600.40 1799.40
    298 changes: 298 additions & 0 deletions main.cpp
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,298 @@
    //
    // main.cpp
    // TripleBufferBenchMark
    //
    // Created by André Pacheco Neves on 29/04/13.
    // Copyright (c) 2013 VisionArea. All rights reserved.
    //

    #include "TripleBuffer.hxx"
    #include "TripleBufferQueue.hxx"

    #include <thread>
    #include <iostream>
    #include <vector>
    #include <cmath>

    using Clock = std::chrono::high_resolution_clock;
    using nanoseconds = std::chrono::nanoseconds;

    // Also used as the index into the per-run counter arrays.
    enum ThreadType{
        PRODUCER = 0,
        CONSUMER
    };

    //--------------------------
    // Configuration

    static const int RUN_SECONDS = 1;   // duration of a single run
    static const int NUM_RUNS = 20;     // runs per benchmarked structure

    // true: workers loop as fast as they can; false: workers tick at the rates below
    static const bool FREE_LOOP_MODE = false;

    // long long so that RUN_SECONDS * NANOS_IN_SEC cannot overflow where long is 32 bits
    static const long long NANOS_IN_SEC = 1000000000LL;

    // Tick periods are rounded to the nearest nanosecond with integer
    // arithmetic. The previous float computation could not represent
    // 1e9 / 120 (it rounds to 8333333.5f), which yielded 8333334.
    static const int PRODUCER_RATE = 120; // number of producer writes per second
    static const long PRODUCER_TICK_PERIOD =
        static_cast<long>((NANOS_IN_SEC + PRODUCER_RATE / 2) / PRODUCER_RATE); // nanoseconds

    static const int CONSUMER_RATE = 60; // number of consumer reads per second
    static const long CONSUMER_TICK_PERIOD =
        static_cast<long>((NANOS_IN_SEC + CONSUMER_RATE / 2) / CONSUMER_RATE); // nanoseconds

    //---------------------------
    // TripleBufferQueue

    // Everything one TripleBufferQueue benchmark run shares between main()
    // and its two worker threads.
    struct ArgTBQ{
        TripleBufferQueue tbq;      // queue under test
        std::atomic<bool> run;      // workers loop while this is true
        int counter[2];             // indexed by ThreadType
        std::thread *consumer;      // created in testTripleBufferQueue, joined and deleted in main
        std::thread *producer;      // created in testTripleBufferQueue, joined and deleted in main
    };

    // Producer worker for the TripleBufferQueue benchmark.
    // Pushes the address of a local sequence counter until a->run turns
    // false; counter[PRODUCER] is incremented only for pushes that report
    // no drop. In free-loop mode it pushes back to back, otherwise once per
    // elapsed PRODUCER_TICK_PERIOD.
    void tbqProducer(ArgTBQ* a){

        int sequence = 0;

        if( FREE_LOOP_MODE ){
            for (; a->run; sequence++){
                if(a->tbq.push(&sequence))
                    a->counter[PRODUCER]++;
            }
            return;
        }

        Clock::time_point lastSample = Clock::now();
        double pendingNanos = 0;   // elapsed time not yet turned into ticks

        while (a->run) {
            const Clock::time_point now = Clock::now();
            const double elapsed = std::chrono::duration_cast<nanoseconds>(now - lastSample).count();

            lastSample = now;
            pendingNanos += elapsed;

            // One push per whole tick period that has accumulated.
            for (; pendingNanos >= PRODUCER_TICK_PERIOD; pendingNanos -= PRODUCER_TICK_PERIOD){
                if(a->tbq.push(&sequence))
                    a->counter[PRODUCER]++;
                sequence++;
            }
        }
    }

    // Consumer worker for the TripleBufferQueue benchmark.
    // Pops from the queue until a->run turns false and counts every
    // successful pop in counter[CONSUMER]. In free-loop mode it pops back to
    // back, otherwise once per elapsed CONSUMER_TICK_PERIOD.
    // The popped pointer is never dereferenced; only the success flag matters.
    void tbqConsumer(ArgTBQ* a){

        if( FREE_LOOP_MODE ){
            while(a->run){

                // Pop into a real void*: casting the address of an int* to
                // void** would make pop() write through a mismatched pointer type.
                void* item = nullptr;
                if(a->tbq.pop(&item))
                    a->counter[CONSUMER]++;
            }
        }
        else {

            Clock::time_point currentTime = Clock::now();
            double accumulator = 0;   // elapsed nanoseconds not yet turned into ticks

            while (a->run) {
                Clock::time_point newTime = Clock::now();

                double frameTime = std::chrono::duration_cast<nanoseconds>(newTime - currentTime).count();

                currentTime = newTime;
                accumulator += frameTime;

                // One pop attempt per whole tick period that has accumulated.
                while (accumulator >= CONSUMER_TICK_PERIOD){
                    void* item = nullptr;
                    if(a->tbq.pop(&item))
                        a->counter[CONSUMER]++;

                    accumulator -= CONSUMER_TICK_PERIOD;
                }
            }
        }
    }

    // Resets the counters, raises the run flag and launches the producer and
    // consumer threads for one TripleBufferQueue run. The caller owns both
    // thread objects and has to join and delete them.
    void testTripleBufferQueue(ArgTBQ *args){

        // All shared state is prepared before the first thread exists.
        args->counter[CONSUMER] = 0;
        args->counter[PRODUCER] = 0;
        args->run = true;

        args->producer = new std::thread(tbqProducer, args);
        args->consumer = new std::thread(tbqConsumer, args);
    }

    //---------------------------
    // TripleBuffer

    // Everything one TripleBuffer benchmark run shares between main() and
    // its two worker threads.
    typedef struct ArgTB{
        TripleBuffer<int> tb;       // buffer under test
        std::atomic<bool> run;      // workers loop while this is true
        // Plain ints (as in ArgTBQ): each counter is incremented by exactly
        // one worker and read by main() only after that worker was joined,
        // so volatile adds nothing - and ++ on a volatile is deprecated in C++20.
        int counter[2];             // indexed by ThreadType
        std::thread *consumer;      // created in testTripleBuffer, joined and deleted in main
        std::thread *producer;      // created in testTripleBuffer, joined and deleted in main
    } ArgTB;

    // Producer worker for the TripleBuffer benchmark.
    // Publishes an increasing integer through update() until a->run turns
    // false and counts every update in counter[PRODUCER]. In free-loop mode
    // it updates back to back, otherwise once per elapsed PRODUCER_TICK_PERIOD.
    void tbProducer(ArgTB* a){

        int sequence = 0;

        if( FREE_LOOP_MODE ){
            for (; a->run; sequence++){
                a->tb.update(sequence);
                a->counter[PRODUCER]++;
            }
            return;
        }

        Clock::time_point lastSample = Clock::now();
        double pendingNanos = 0;   // elapsed time not yet turned into ticks

        while (a->run) {
            const Clock::time_point now = Clock::now();
            const double elapsed = std::chrono::duration_cast<nanoseconds>(now - lastSample).count();

            lastSample = now;
            pendingNanos += elapsed;

            // One update per whole tick period that has accumulated.
            for (; pendingNanos >= PRODUCER_TICK_PERIOD; pendingNanos -= PRODUCER_TICK_PERIOD){
                a->tb.update(sequence);
                a->counter[PRODUCER]++;
                sequence++;
            }
        }
    }

    // Consumer worker for the TripleBuffer benchmark.
    // Calls readLast() until a->run turns false and counts every call in
    // counter[CONSUMER]; the value read is discarded. In free-loop mode it
    // reads back to back, otherwise once per elapsed CONSUMER_TICK_PERIOD.
    void tbConsumer(ArgTB* a){

        if( FREE_LOOP_MODE ){
            while(a->run){
                a->tb.readLast();
                a->counter[CONSUMER]++;
            }
            return;
        }

        Clock::time_point lastSample = Clock::now();
        double pendingNanos = 0;   // elapsed time not yet turned into ticks

        while (a->run) {
            const Clock::time_point now = Clock::now();
            const double elapsed = std::chrono::duration_cast<nanoseconds>(now - lastSample).count();

            lastSample = now;
            pendingNanos += elapsed;

            // One read per whole tick period that has accumulated.
            for (; pendingNanos >= CONSUMER_TICK_PERIOD; pendingNanos -= CONSUMER_TICK_PERIOD){
                a->tb.readLast();
                a->counter[CONSUMER]++;
            }
        }
    }

    // Resets the counters, raises the run flag and launches the producer and
    // consumer threads for one TripleBuffer run. The caller owns both thread
    // objects and has to join and delete them.
    void testTripleBuffer(ArgTB *args){

        // All shared state is prepared before the first thread exists.
        args->counter[CONSUMER] = 0;
        args->counter[PRODUCER] = 0;
        args->run = true;

        args->producer = new std::thread(tbProducer, args);
        args->consumer = new std::thread(tbConsumer, args);
    }

    //---------------------------
    // Benchmark

    int main(int argc, const char * argv[])
    {
    long sumPrd=0, sumCon=0;

    // -----------------------

    std::vector<ArgTBQ> argsTBQ(NUM_RUNS);

    printf(" TripleBufferQueue (%d runs, %d seconds each)\n", NUM_RUNS, RUN_SECONDS);
    printf(" # Produced Consumed +-Delta Total\n");

    for (int i=0; i<NUM_RUNS; i++){

    testTripleBufferQueue(&argsTBQ[i]);

    std::this_thread::sleep_for(chrono::nanoseconds(RUN_SECONDS * NANOS_IN_SEC));

    argsTBQ[i].run = false;

    argsTBQ[i].producer->join();
    argsTBQ[i].consumer->join();

    delete argsTBQ[i].producer;
    delete argsTBQ[i].consumer;

    int prd = argsTBQ[i].counter[PRODUCER];
    int con = argsTBQ[i].counter[CONSUMER];

    sumPrd += prd;
    sumCon += con;

    printf("%3d %10d %10d %10d %10d\n", (i+1), prd, con, prd - con, prd + con);
    }
    printf("--- -------- -------- -------- --------\n");
    printf("Avg %10.2f %10.2f %10.2f %10.2f\n\n", (float)sumPrd/NUM_RUNS, (float)sumCon/NUM_RUNS, (float)(sumPrd - sumCon)/NUM_RUNS,(float)(sumPrd + sumCon)/NUM_RUNS);

    // ------------------------

    std::vector<ArgTB> argsTB(NUM_RUNS);

    printf(" TripleBuffer (%d runs, %d seconds each)\n", NUM_RUNS, RUN_SECONDS);
    printf(" # Produced Consumed +-Delta Total\n");

    sumPrd=0;
    sumCon=0;

    for (int i=0; i<NUM_RUNS; i++){

    testTripleBuffer(&argsTB[i]);

    std::this_thread::sleep_for(chrono::nanoseconds(RUN_SECONDS * NANOS_IN_SEC));

    argsTB[i].run = false;

    argsTB[i].producer->join();
    argsTB[i].consumer->join();

    delete argsTB[i].producer;
    delete argsTB[i].consumer;

    int prd = argsTB[i].counter[PRODUCER];
    int con = argsTB[i].counter[CONSUMER];

    sumPrd += prd;
    sumCon += con;

    printf("%3d %10d %10d %10d %10d\n", (i+1), prd, con, prd - con, prd + con);
    }
    printf("--- -------- -------- -------- --------\n");
    printf("Avg %10.2f %10.2f %10.2f %10.2f\n\n", (float)sumPrd/NUM_RUNS, (float)sumCon/NUM_RUNS, (float)(sumPrd - sumCon)/NUM_RUNS,(float)(sumPrd + sumCon)/NUM_RUNS);

    return 0;
    }