Skip to content

Instantly share code, notes, and snippets.

@puemos
Last active May 6, 2018 07:53
Show Gist options
  • Select an option

  • Save puemos/f68f3edc78bcc248af9e7c01def3f0d9 to your computer and use it in GitHub Desktop.

Select an option

Save puemos/f68f3edc78bcc248af9e7c01def3f0d9 to your computer and use it in GitHub Desktop.
import { buffers, channel } from "redux-saga";
import { all, call, fork, put, take } from "redux-saga/effects";
/**
 * Creates a concurrent task queue built on redux-saga channels.
 *
 * Actions passed to `queue` are buffered on an internal channel. While
 * `watcher` is running it takes those actions one by one and forwards each
 * action's `payload` to a pool of `workersCount` workers. Every worker handles
 * a single payload at a time by calling `handler(payload)`.
 *
 * This is itself a generator: run it from a saga, e.g.
 * `const q = yield call(createConcurrentTaskQueue, handler, 3)`, then
 * `yield fork(q.watcher)` and `yield call(q.queue, action)`.
 *
 * @param {GeneratorFunction} handler request handler, invoked with a task payload
 * @param {number} [workersCount=1] number of workers
 * @returns {{watcher: GeneratorFunction, queue: GeneratorFunction}} sagas that
 *   drain the queue (`watcher`) and enqueue an action (`queue`)
 */
function* createConcurrentTaskQueue(handler, workersCount = 1) {
  // channel that buffers incoming actions until the watcher picks them up
  const queueChannel = yield call(channel, buffers.expanding());

  // worker: waits for a task, handles it, then waits for the next one
  function* worker(chan) {
    while (true) {
      const payload = yield take(chan);
      // `call` keeps the effect declarative and blocks this worker until the
      // handler has finished, so a worker never runs two tasks at once
      yield call(handler, payload);
    }
  }

  // watcher: takes queued actions and delegates their payloads to the workers
  function* watcher() {
    // channel that hands tasks over to the workers
    const workersChannel = yield call(channel, buffers.expanding());

    // create n worker 'threads', each described by its own fork effect
    yield all(
      Array.from({ length: workersCount }, () => fork(worker, workersChannel)),
    );

    while (true) {
      const { payload } = yield take(queueChannel);
      yield put(workersChannel, payload);
    }
  }

  // queue: enqueue an action (`{ type, payload }`) for the watcher to pick up
  function* queue(action) {
    yield put(queueChannel, action);
  }

  return {
    watcher,
    queue,
  };
}
export default createConcurrentTaskQueue;
/**
 * Example request handler: processes a single queued task.
 *
 * @param {*} payload the payload of one queued action
 */
function* handler(payload) {
  // Placeholder body: put the per-task work here (for example, yield the
  // effects needed to send `payload`). Left empty in this example.
}
/**
 * Example saga: sends a batch of messages through a concurrent task queue
 * with 3 workers.
 *
 * Creates the queue, forks its watcher so queued tasks are actually drained,
 * then enqueues one `SEND` action per message.
 *
 * @param {{payload: {msgs: Array}}} action action carrying the messages to send
 */
function* bulkMsgs(action) {
  const { msgs } = action.payload;
  const concurrentTaskQueue = yield call(createConcurrentTaskQueue, handler, 3);

  // without a running watcher nothing ever takes tasks off the queue
  yield fork(concurrentTaskQueue.watcher);

  // iterate the messages themselves (not their indices); a missing list is
  // treated as empty
  for (const msg of msgs || []) {
    // `queue` is a generator, so it has to be run through an effect
    yield call(concurrentTaskQueue.queue, { type: 'SEND', payload: msg });
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment