Skip to content

Instantly share code, notes, and snippets.

@antaalt
Last active July 12, 2020 15:26
Show Gist options
  • Select an option

  • Save antaalt/82542580223c099eeced6797149e6ba9 to your computer and use it in GitHub Desktop.

Select an option

Save antaalt/82542580223c099eeced6797149e6ba9 to your computer and use it in GitHub Desktop.
Worker class to easily implement parallel tasks.
#pragma once
#include <queue>
#include <thread>
#include <mutex>
#include <atomic>
#include <condition_variable>
template <typename T>
class Worker {
public:
Worker() : m_running(false) {}
virtual ~Worker() {
stop();
wait();
}
// Start the worker
void start() {
if (m_running)
return;
m_running = true;
m_thread = std::thread(&Thread<T>::loop, this);
}
// Signal the worker to stop
void stop() {
if (!m_running)
return;
m_running = false;
std::unique_lock<std::mutex> lock(m_mutexQueue);
std::queue<T>().swap(m_queue);
lock.unlock();
m_condition.notify_one();
}
// Wait for worker to stop
void wait() {
if (m_thread.joinable())
m_thread.join();
}
// Empty the queue
void reset() {
std::lock_guard<std::mutex> lock(m_mutexQueue);
std::queue<T>().swap(m_queue);
}
// Add an element to the queue
void add(T &&data) {
std::unique_lock<std::mutex> lock(m_mutexQueue);
m_queue.push(std::move(data));
lock.unlock();
m_condition.notify_one();
}
void add(const T &data) {
std::unique_lock<std::mutex> lock(m_mutexQueue);
m_queue.push(data);
lock.unlock();
m_condition.notify_one();
}
private:
void loop() {
initialize();
while (condition())
{
std::unique_lock<std::mutex> lock(m_mutexQueue);
while (m_queue.empty() && m_running)
m_condition.wait(lock);
if (!m_queue.empty())
{
T front = m_queue.front();
m_queue.pop();
lock.unlock();
task(front);
}
}
destroy();
}
protected:
// Task to run on each loop
virtual void task(const T &data) = 0;
// Condition to keep the loop running
// Default loop only stop if queue is empty & m_running is false
virtual bool condition() {
std::lock_guard<std::mutex> lock(m_mutexQueue);
return !m_queue.empty() || m_running;
}
// Initialize the thread data
virtual void initialize() {}
// Destroy the thread data
virtual void destroy() {}
private:
std::atomic<bool> m_running;
std::thread m_thread;
std::queue<T> m_queue;
std::mutex m_mutexQueue;
std::condition_variable m_condition;
};
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment