import { buffers, channel } from "redux-saga";
import { all, call, fork, put, take } from "redux-saga/effects";

/**
 * Creates a task queue whose tasks are processed by a fixed pool of workers.
 *
 * This function is itself a saga: it yields a `call` effect to create the
 * inbound channel, so it must be run by the saga middleware, e.g.
 * `const { watcher, queue } = yield call(ConcurrentTaskQueue, handler, 3);`
 *
 * @param {Function} handler request handler, invoked as `handler(payload)`
 *   for every queued task via a `call` effect
 * @param {number} [concurrent=1] number of workers
 * @returns {{ watcher: GeneratorFunction, queue: GeneratorFunction }}
 *   `watcher` is the long-running saga that starts the workers and
 *   dispatches tasks to them; `queue(action)` is a saga that enqueues
 *   `action`. Only `action.payload` is forwarded to `handler`.
 */
function* ConcurrentTaskQueue(handler, concurrent = 1) {
  // Inbound channel: `queue` writes actions here, `watcher` drains it.
  const queueChannel = yield call(channel, buffers.expanding());

  /**
   * Worker loop: takes one payload at a time from `chan` and waits for
   * `handler` to finish before taking the next one.
   */
  function* worker(chan) {
    while (true) {
      const payload = yield take(chan);
      // `call` keeps the effect declarative instead of invoking the
      // handler directly.
      yield call(handler, payload);
    }
  }

  /**
   * Starts `concurrent` workers and forwards every queued action's
   * payload to them.
   */
  function* watcher() {
    // Channel shared by all workers; each forwarded payload is taken by
    // exactly one of them.
    const workersChannel = yield call(channel, buffers.expanding());

    // Create n worker 'threads'. Array.from builds a distinct fork effect
    // per worker instead of reusing one effect object n times.
    yield all(
      Array.from({ length: concurrent }, () => fork(worker, workersChannel))
    );

    while (true) {
      const { payload } = yield take(queueChannel);
      yield put(workersChannel, payload);
    }
  }

  /**
   * Enqueues `action` for processing.
   */
  function* queue(action) {
    // Must target the channel `watcher` takes from.
    yield put(queueChannel, action);
  }

  return {
    watcher,
    queue,
  };
}

export default ConcurrentTaskQueue;