Skip to content

Instantly share code, notes, and snippets.

@vmrob
Last active March 29, 2021 23:40
Show Gist options
  • Select an option

  • Save vmrob/e4fde208302ae8979b57 to your computer and use it in GitHub Desktop.

Select an option

Save vmrob/e4fde208302ae8979b57 to your computer and use it in GitHub Desktop.

Revisions

  1. vmrob revised this gist Oct 2, 2015. 1 changed file with 11 additions and 17 deletions.
    28 changes: 11 additions & 17 deletions channel.cpp
    Original file line number Diff line number Diff line change
    @@ -68,21 +68,21 @@ class buffered_channel {
    return _open || !_queue.empty();
    }

    void operator<<(T lhs) {
    void send(T val) {
    if (!_open) {
    throw channel_closed("send attempt while closed");
    }
    _queue.push(std::move(lhs));
    _queue.push(std::move(val));
    }

    T& operator>>(T& rhs) {
    bool recv(T& val) {
    while (is_open()) {
    if (_queue.try_pop(rhs)) {
    return rhs;
    if (_queue.try_pop(val)) {
    return true;
    }
    std::this_thread::yield();
    }
    throw channel_closed("receive attempt while closed");
    return false;
    }
    };

    @@ -97,24 +97,19 @@ int main() {
    for (int i = 0; i < 2; ++i) {
    producers.emplace_back([&]{
    for (size_t i = 0; i < 1000; ++i) {
    input << 1;
    input.send(1);
    }
    });

    consumers.emplace_back([&]{
    size_t total = 0;
    size_t next = 0;

    try {
    while (input.is_open()) {
    input >> next;
    total += next;
    }
    } catch (channel_closed) {
    // nop
    while (input.recv(next)) {
    total += next;
    }

    output << total;
    output.send(total);
    });
    }

    @@ -133,8 +128,7 @@ int main() {
    size_t total = 0;
    size_t next = 0;

    while (output.is_open()) {
    output >> next;
    while (output.recv(next)) {
    total += next;
    }

  2. vmrob created this gist Oct 2, 2015.
    144 changes: 144 additions & 0 deletions channel.cpp
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,144 @@
    #include <future>
    #include <iostream>
    #include <thread>
    #include <queue>

    template <typename T>
    class concurrent_queue {
    private:
    std::queue<T> _queue;
    std::mutex _mutex;

    public:
    bool empty() {
    std::lock_guard<std::mutex> l(_mutex);
    return _queue.empty();
    }

    size_t size() {
    std::lock_guard<std::mutex> l(_mutex);
    return _queue.size();
    }

    bool try_pop(T& out) {
    std::lock_guard<std::mutex> l(_mutex);

    if (_queue.empty()) {
    return false;
    }

    out = std::move(_queue.front());

    _queue.pop();

    return true;
    }

    void push(T val) {
    std::lock_guard<std::mutex> l(_mutex);
    _queue.push(std::move(val));
    }
    };

    class channel_closed : public std::runtime_error {
    using std::runtime_error::runtime_error;
    };

    template <typename T>
    class buffered_channel {
    private:
    concurrent_queue<T> _queue;

    std::atomic_bool _open;

    public:
    buffered_channel() {
    _open.store(true);
    }

    ~buffered_channel() {
    drain();
    }

    void drain() {
    _open.store(false);
    }

    bool is_open() {
    return _open || !_queue.empty();
    }

    void operator<<(T lhs) {
    if (!_open) {
    throw channel_closed("send attempt while closed");
    }
    _queue.push(std::move(lhs));
    }

    T& operator>>(T& rhs) {
    while (is_open()) {
    if (_queue.try_pop(rhs)) {
    return rhs;
    }
    std::this_thread::yield();
    }
    throw channel_closed("receive attempt while closed");
    }
    };

    int main() {

    buffered_channel<size_t> input;
    buffered_channel<size_t> output;

    std::vector<std::thread> producers;
    std::vector<std::thread> consumers;

    for (int i = 0; i < 2; ++i) {
    producers.emplace_back([&]{
    for (size_t i = 0; i < 1000; ++i) {
    input << 1;
    }
    });

    consumers.emplace_back([&]{
    size_t total = 0;
    size_t next = 0;

    try {
    while (input.is_open()) {
    input >> next;
    total += next;
    }
    } catch (channel_closed) {
    // nop
    }

    output << total;
    });
    }

    for (auto&& t : producers) {
    t.join();
    }

    input.drain();

    for (auto&& t : consumers) {
    t.join();
    }

    output.drain();

    size_t total = 0;
    size_t next = 0;

    while (output.is_open()) {
    output >> next;
    total += next;
    }

    std::cout << "total: " << total << std::endl;

    return 0;
    }