@vmrob
Last active March 29, 2021 23:40
Go channels in C++
#include <atomic>
#include <cstddef>
#include <future>
#include <iostream>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <thread>
#include <utility>
#include <vector>

// Minimal thread-safe FIFO: every operation takes the same mutex.
template <typename T>
class concurrent_queue {
private:
    std::queue<T> _queue;
    std::mutex _mutex;

public:
    bool empty() {
        std::lock_guard<std::mutex> l(_mutex);
        return _queue.empty();
    }

    size_t size() {
        std::lock_guard<std::mutex> l(_mutex);
        return _queue.size();
    }

    // Non-blocking pop: returns false if the queue is empty.
    bool try_pop(T& out) {
        std::lock_guard<std::mutex> l(_mutex);
        if (_queue.empty()) {
            return false;
        }
        out = std::move(_queue.front());
        _queue.pop();
        return true;
    }

    void push(T val) {
        std::lock_guard<std::mutex> l(_mutex);
        _queue.push(std::move(val));
    }
};

// Thrown when sending on a closed channel or receiving from a closed, empty one.
class channel_closed : public std::runtime_error {
public:
    using std::runtime_error::runtime_error;
};

// Unbounded channel: senders never block, receivers spin until a value
// arrives or the channel is drained and empty.
template <typename T>
class buffered_channel {
private:
    concurrent_queue<T> _queue;
    std::atomic_bool _open;

public:
    buffered_channel() {
        _open.store(true);
    }

    ~buffered_channel() {
        drain();
    }

    // Closes the channel to new sends; queued values can still be received.
    void drain() {
        _open.store(false);
    }

    // True while the channel accepts sends or still holds queued values.
    bool is_open() {
        return _open || !_queue.empty();
    }

    void operator<<(T val) {
        if (!_open) {
            throw channel_closed("send attempt while closed");
        }
        _queue.push(std::move(val));
    }

    T& operator>>(T& rhs) {
        while (is_open()) {
            if (_queue.try_pop(rhs)) {
                return rhs;
            }
            std::this_thread::yield();
        }
        throw channel_closed("receive attempt while closed");
    }
};

int main() {
    buffered_channel<size_t> input;
    buffered_channel<size_t> output;
    std::vector<std::thread> producers;
    std::vector<std::thread> consumers;

    for (int i = 0; i < 2; ++i) {
        // Each producer sends 1000 ones.
        producers.emplace_back([&] {
            for (size_t n = 0; n < 1000; ++n) {
                input << 1;
            }
        });
        // Each consumer sums what it receives and reports its subtotal.
        consumers.emplace_back([&] {
            size_t total = 0;
            size_t next = 0;
            try {
                while (input.is_open()) {
                    input >> next;
                    total += next;
                }
            } catch (const channel_closed&) {
                // The other consumer took the last value after drain();
                // nothing is left to receive.
            }
            output << total;
        });
    }

    for (auto&& t : producers) {
        t.join();
    }
    input.drain();

    for (auto&& t : consumers) {
        t.join();
    }
    output.drain();

    // Only this thread reads output now, so is_open() cannot race here.
    size_t total = 0;
    size_t next = 0;
    while (output.is_open()) {
        output >> next;
        total += next;
    }
    std::cout << "total: " << total << std::endl;
    return 0;
}
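
The receive loop above polls with std::this_thread::yield() until a value shows up. As a rough sketch of an alternative, a receiver can sleep on a std::condition_variable instead. Everything named here (`blocking_channel`, `send`, `recv`, `close`, `sum_with_blocking_channel`) is hypothetical and not part of the gist; it only mirrors the two-producer, two-consumer shape of main().

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical variant of the gist's channel that blocks instead of spinning.
template <typename T>
class blocking_channel {
    std::queue<T> _queue;
    std::mutex _mutex;
    std::condition_variable _ready;
    bool _open = true;

public:
    // Returns false (instead of throwing) if the channel is already closed.
    bool send(T val) {
        {
            std::lock_guard<std::mutex> l(_mutex);
            if (!_open) return false;
            _queue.push(std::move(val));
        }
        _ready.notify_one();
        return true;
    }

    // Stops new sends and wakes every waiting receiver.
    void close() {
        {
            std::lock_guard<std::mutex> l(_mutex);
            _open = false;
        }
        _ready.notify_all();
    }

    // Sleeps until a value is queued or the channel is closed.
    // Returns false once the channel is closed and fully drained.
    bool recv(T& out) {
        std::unique_lock<std::mutex> l(_mutex);
        _ready.wait(l, [this] { return !_open || !_queue.empty(); });
        if (_queue.empty()) return false;
        out = std::move(_queue.front());
        _queue.pop();
        return true;
    }
};

// Two producers send 1000 ones each; two consumers add up what they receive.
std::size_t sum_with_blocking_channel() {
    blocking_channel<std::size_t> input;
    std::vector<std::size_t> partial(2, 0);  // one slot per consumer, no sharing
    std::vector<std::thread> producers;
    std::vector<std::thread> consumers;

    for (int w = 0; w < 2; ++w) {
        producers.emplace_back([&input] {
            for (std::size_t n = 0; n < 1000; ++n) input.send(1);
        });
        consumers.emplace_back([&input, &partial, w] {
            std::size_t next = 0;
            while (input.recv(next)) partial[w] += next;
        });
    }

    for (auto& t : producers) t.join();
    input.close();
    for (auto& t : consumers) t.join();

    std::size_t total = partial[0] + partial[1];
    assert(total == 2000);  // 2 producers * 1000 values of 1
    return total;
}
```

Because the emptiness check and the pop happen under one lock inside recv(), a consumer cannot lose a race between is_open() and the receive, so no exception is needed to end the loop. Calling sum_with_blocking_channel() from main() and printing the result is enough to try it.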
@swr1bm86

Hi, I just ran into

libc++abi.dylib: terminating with uncaught exception of type std::__1::system_error: mutex lock failed: Invalid argument

when I run the following code:

#include <iostream>
#include <string>
#include <functional>
#include <thread>
#include <chrono>

using namespace std;
using namespace std::chrono;

void subscribe() {
    buffered_channel<int> chan;
    thread t([&] () {
        int a;
        while (chan.recv(a)) {
            cout << "callback!!!" << a << endl;
        }
    });
    t.detach();
    chan.send(1);
}

int main(int argc, char** argv) {
    subscribe();
    this_thread::sleep_for(seconds(1));
    return 0;
}

But if I run the body of subscribe() directly inside main(), everything works, which is very confusing. Any idea what causes this? Many thanks in advance.

@keraba

keraba commented Aug 21, 2020

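The reason for your crash is that 'chan' is a local variable, so it is destroyed as soon as the main thread leaves 'subscribe()'. The detached thread captured it by reference and is left holding a dangling reference; touching it afterwards is undefined behavior, which in your run shows up as a lock attempt on the already-destroyed mutex. By moving the code to 'main()', the channel lives until the end of the program.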
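
One possible way to keep the detached thread, sketched under assumptions: hold the channel in a std::shared_ptr and capture the pointer by value, so the thread owns a reference of its own. The `mini_channel` class below is a hypothetical stand-in (the gist as posted exposes operator<< and operator>>, not the send/recv used in the question), and `subscribe_and_sum` is an illustrative name.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>

// Hypothetical stand-in channel with the send/recv names used in the question.
template <typename T>
class mini_channel {
    std::queue<T> _queue;
    std::mutex _mutex;
    bool _open = true;

public:
    void send(T val) {
        std::lock_guard<std::mutex> l(_mutex);
        _queue.push(std::move(val));
    }

    void close() {
        std::lock_guard<std::mutex> l(_mutex);
        _open = false;
    }

    // Polls like the gist's operator>>; returns false once closed and drained.
    bool recv(T& out) {
        for (;;) {
            {
                std::lock_guard<std::mutex> l(_mutex);
                if (!_queue.empty()) {
                    out = std::move(_queue.front());
                    _queue.pop();
                    return true;
                }
                if (!_open) return false;
            }
            std::this_thread::yield();
        }
    }
};

int subscribe_and_sum() {
    // Shared ownership: the channel outlives this function if the thread
    // is still using it.
    auto chan = std::make_shared<mini_channel<int>>();

    std::promise<int> done;
    std::future<int> result = done.get_future();

    // Capture the shared_ptr by value, not the channel by reference.
    std::thread t([chan, p = std::move(done)]() mutable {
        int a = 0;
        int sum = 0;
        while (chan->recv(a)) sum += a;
        p.set_value(sum);
    });
    t.detach();

    chan->send(1);
    chan->send(2);
    chan->send(3);
    chan->close();

    int total = result.get();  // wait for the detached thread's answer
    assert(total == 6);        // 1 + 2 + 3
    return total;
}
```

The promise/future pair is only there so the caller can observe the result; the lifetime fix itself is the by-value capture of `chan`. Joining the thread before returning from subscribe() would be another option if detaching is not required.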

@oldbane

oldbane commented Mar 5, 2021

Nice.
