#include #include #include #include #include #include #include #include std::mutex mtx; std::condition_variable condvar; std::atomic wip; // workers in progress std::queue queue; void producer(int n, std::mt19937 &generator) { for (int i = 0; i != n; ++i) { auto sleep_time = 100 + generator() % 1000; { std::lock_guard lg(mtx); queue.push(i); std::cout << "[tid=" << std::this_thread::get_id() << "] pushing " << i << " add sleep for " << sleep_time << "ms" << std::endl; } condvar.notify_all(); // sleep std::this_thread::sleep_for(std::chrono::milliseconds(sleep_time)); } --wip; condvar.notify_all(); } void consumer() { while (true) { std::unique_lock ul(mtx); condvar.wait(ul, [] { return !wip || !queue.empty(); }); // spurious wakeup protection, i.e: //while (wip || queue.empty()) // condvar.wait(ul); while (!queue.empty()) { std::cout << "[tid=" << std::this_thread::get_id() << "] consuming " << queue.front() << std::endl; queue.pop(); } if (wip == 0) break; } } int main() { const int max_tasks = 10; // how many int do we need from the each producer? // Random generator std::mt19937 generator((unsigned int)std::chrono::system_clock::now().time_since_epoch().count()); std::vector threads(std::thread::hardware_concurrency()); // threads == vCPUs // start all std::transform(threads.begin(), threads.end(), threads.begin(), [&generator](auto *thr) { ++wip; return new std::thread(producer, max_tasks, std::ref(generator)); }); std::thread cons(consumer); // join all std::for_each(threads.begin(), threads.end(), [](auto *thr) { thr->join(); delete thr; }); cons.join(); std::cout << "Completed!" << std::endl; }