Skip to content

Instantly share code, notes, and snippets.

@puemos
Last active May 6, 2018 07:53
Show Gist options
  • Select an option

  • Save puemos/f68f3edc78bcc248af9e7c01def3f0d9 to your computer and use it in GitHub Desktop.

Select an option

Save puemos/f68f3edc78bcc248af9e7c01def3f0d9 to your computer and use it in GitHub Desktop.

Revisions

  1. puemos revised this gist May 6, 2018. 1 changed file with 7 additions and 6 deletions.
    13 changes: 7 additions & 6 deletions create-concurrent-task-queue.js
    Original file line number Diff line number Diff line change
    @@ -9,28 +9,29 @@ import { all, call, fork, put, take } from "redux-saga/effects";
    */
    function* createConcurrentTaskQueue(handler, workersCount = 1) {

    // create a channel to queue incoming action
    // a channel to queue incoming action
    const queueChannel = yield call(channel, buffers.expanding());

    // watch for actions and delegate them as tasks to the workers
    function* watcher() {
    // create a channel to queue incoming tasks
    // a channel to queue incoming tasks
    const workersChannel = yield call(channel, buffers.expanding());

    // create n worker 'threads'
    yield all(Array(workersCount).fill(fork(worker, workersChannel)));

    // wait for a task and once it arrive assign it to one of the workers
    // wait for a tasks
    while (true) {
    // incoming task
    const { payload } = yield take(queueChannel);
    // assign the task to one of the workers
    yield put(workersChannel, payload);
    }
    }

    // worker that waits for a task
    // a single worker
    function* worker(chan) {
    while (true) {
    // take an task
    // incoming task
    const payload = yield take(chan);
    // handle it with the given handler arg
    yield handler(payload);
  2. puemos revised this gist May 6, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion create-concurrent-task-queue.js
    Original file line number Diff line number Diff line change
    @@ -39,7 +39,7 @@ function* createConcurrentTaskQueue(handler, workersCount = 1) {

    return {
    watcher,
    addTaskChannel,
    queueChannel,
    };
    }

  3. puemos revised this gist May 6, 2018. 1 changed file with 1 addition and 6 deletions.
    7 changes: 1 addition & 6 deletions create-concurrent-task-queue.js
    Original file line number Diff line number Diff line change
    @@ -37,14 +37,9 @@ function* createConcurrentTaskQueue(handler, workersCount = 1) {
    }
    }

    // add a task to the queue
    function* queue(action){
    yield put(addTaskChannel, action)
    }

    return {
    watcher,
    queue,
    addTaskChannel,
    };
    }

  4. puemos revised this gist May 6, 2018. 1 changed file with 5 additions and 1 deletion.
    6 changes: 5 additions & 1 deletion create-concurrent-task-queue.js
    Original file line number Diff line number Diff line change
    @@ -20,20 +20,24 @@ function* createConcurrentTaskQueue(handler, workersCount = 1) {
    // create n worker 'threads'
    yield all(Array(workersCount).fill(fork(worker, workersChannel)));

    // wait for a task and once it arrive assign it to one of the workers
    while (true) {
    const { payload } = yield take(queueChannel);
    yield put(workersChannel, payload);
    }
    }

    // worker that wait for a task
    // worker that waits for a task
    function* worker(chan) {
    while (true) {
    // take an task
    const payload = yield take(chan);
    // handle it with the given handler arg
    yield handler(payload);
    }
    }

    // add a task to the queue
    function* queue(action){
    yield put(addTaskChannel, action)
    }
  5. puemos renamed this gist May 6, 2018. 1 changed file with 0 additions and 0 deletions.
  6. puemos revised this gist May 6, 2018. 1 changed file with 0 additions and 11 deletions.
    11 changes: 0 additions & 11 deletions msgs.js
    Original file line number Diff line number Diff line change
    @@ -1,11 +0,0 @@
    function* handler(payload){
    ...
    }

    function* bulkMsgs(action){
    const { msgs } = action.payload
    const concurrentTaskQueue = yield createConcurrentTaskQueue(handler, 3)
    for (msg in msgs){
    concurrentTaskQueue.queue({ type: 'SEND', payload: msg })
    }
    }
  7. puemos revised this gist May 2, 2018. 1 changed file with 11 additions and 0 deletions.
    11 changes: 11 additions & 0 deletions msgs.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,11 @@
    function* handler(payload){
    ...
    }

    function* bulkMsgs(action){
    const { msgs } = action.payload
    const concurrentTaskQueue = yield createConcurrentTaskQueue(handler, 3)
    for (msg in msgs){
    concurrentTaskQueue.queue({ type: 'SEND', payload: msg })
    }
    }
  8. puemos revised this gist May 2, 2018. 1 changed file with 2 additions and 0 deletions.
    2 changes: 2 additions & 0 deletions createConcurrentTaskQueue.js
    Original file line number Diff line number Diff line change
    @@ -12,6 +12,7 @@ function* createConcurrentTaskQueue(handler, workersCount = 1) {
    // create a channel to queue incoming action
    const queueChannel = yield call(channel, buffers.expanding());

    // watch for actions and delegate them as tasks to the workers
    function* watcher() {
    // create a channel to queue incoming tasks
    const workersChannel = yield call(channel, buffers.expanding());
    @@ -25,6 +26,7 @@ function* createConcurrentTaskQueue(handler, workersCount = 1) {
    }
    }

    // worker that wait for a task
    function* worker(chan) {
    while (true) {
    const payload = yield take(chan);
  9. puemos revised this gist May 2, 2018. 1 changed file with 3 additions and 3 deletions.
    6 changes: 3 additions & 3 deletions createConcurrentTaskQueue.js
    Original file line number Diff line number Diff line change
    @@ -5,9 +5,9 @@ import { all, call, fork, put, take } from "redux-saga/effects";
    * creates a queue
    *
    * @param {GeneratorFunction} [handler] request handler
    * @param {number} [concurrent=1] number of workers
    * @param {number} [workersCount=1] number of workers
    */
    function* createConcurrentTaskQueue(handler, concurrent = 1) {
    function* createConcurrentTaskQueue(handler, workersCount = 1) {

    // create a channel to queue incoming action
    const queueChannel = yield call(channel, buffers.expanding());
    @@ -17,7 +17,7 @@ function* createConcurrentTaskQueue(handler, concurrent = 1) {
    const workersChannel = yield call(channel, buffers.expanding());

    // create n worker 'threads'
    yield all(Array(concurrent).fill(fork(worker, workersChannel)));
    yield all(Array(workersCount).fill(fork(worker, workersChannel)));

    while (true) {
    const { payload } = yield take(queueChannel);
  10. puemos renamed this gist May 2, 2018. 1 changed file with 0 additions and 0 deletions.
    File renamed without changes.
  11. puemos revised this gist May 2, 2018. 1 changed file with 12 additions and 12 deletions.
    24 changes: 12 additions & 12 deletions queue1.js
    Original file line number Diff line number Diff line change
    @@ -7,21 +7,21 @@ import { all, call, fork, put, take } from "redux-saga/effects";
    * @param {GeneratorFunction} [handler] request handler
    * @param {number} [concurrent=1] number of workers
    */
    function* ConcurrentTaskQueue(handler, concurrent = 1) {
    function* createConcurrentTaskQueue(handler, concurrent = 1) {

    // create a channel to queue incoming action
    const queueChannel = yield call(channel, buffers.expanding());

    function* watcher() {
    try {
    // create a channel to queue incoming requests
    const workersChannel = yield call(channel, buffers.expanding());
    // create a channel to queue incoming tasks
    const workersChannel = yield call(channel, buffers.expanding());

    // create n worker 'threads'
    yield all(Array(concurrent).fill(fork(worker, workersChannel)));
    // create n worker 'threads'
    yield all(Array(concurrent).fill(fork(worker, workersChannel)));

    while (true) {
    const { payload } = yield take(queueChannel);
    yield put(workersChannel, payload);
    }
    } finally {
    while (true) {
    const { payload } = yield take(queueChannel);
    yield put(workersChannel, payload);
    }
    }

    @@ -42,4 +42,4 @@ function* ConcurrentTaskQueue(handler, concurrent = 1) {
    };
    }

    export default ConcurrentTaskQueue;
    export default createConcurrentTaskQueue;
  12. puemos revised this gist May 2, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion queue1.js
    Original file line number Diff line number Diff line change
    @@ -42,4 +42,4 @@ function* ConcurrentTaskQueue(handler, concurrent = 1) {
    };
    }

    export default createQueue;
    export default ConcurrentTaskQueue;
  13. puemos revised this gist May 2, 2018. 1 changed file with 14 additions and 10 deletions.
    24 changes: 14 additions & 10 deletions queue1.js
    Original file line number Diff line number Diff line change
    @@ -8,33 +8,37 @@ import { all, call, fork, put, take } from "redux-saga/effects";
    * @param {number} [concurrent=1] number of workers
    */
    function* ConcurrentTaskQueue(handler, concurrent = 1) {
    const addTaskChannel = yield call(channel, buffers.expanding());
    function* watchRequests() {
    const queueChannel = yield call(channel, buffers.expanding());
    function* watcher() {
    try {
    // create a channel to queue incoming requests
    const runChannel = yield call(channel, buffers.expanding());
    const workersChannel = yield call(channel, buffers.expanding());

    // create n worker 'threads'
    yield all(Array(concurrent).fill(fork(handleRequest, runChannel)));
    yield all(Array(concurrent).fill(fork(worker, workersChannel)));

    while (true) {
    const { payload } = yield take(addTaskChannel);
    yield put(runChannel, payload);
    const { payload } = yield take(queueChannel);
    yield put(workersChannel, payload);
    }
    } finally {
    }
    }

    function* handleRequest(chan) {
    function* worker(chan) {
    while (true) {
    const payload = yield take(chan);
    yield handler(payload);
    }
    }


    function* queue(action){
    yield put(addTaskChannel, action)
    }

    return {
    watcher: watchRequests,
    addTaskChannel
    watcher,
    queue,
    };
    }

  14. puemos revised this gist May 2, 2018. 1 changed file with 1 addition and 1 deletion.
    2 changes: 1 addition & 1 deletion queue1.js
    Original file line number Diff line number Diff line change
    @@ -7,7 +7,7 @@ import { all, call, fork, put, take } from "redux-saga/effects";
    * @param {GeneratorFunction} [handler] request handler
    * @param {number} [concurrent=1] number of workers
    */
    function* createQueue(handler, concurrent = 1) {
    function* ConcurrentTaskQueue(handler, concurrent = 1) {
    const addTaskChannel = yield call(channel, buffers.expanding());
    function* watchRequests() {
    try {
  15. puemos revised this gist May 2, 2018. 1 changed file with 3 additions and 3 deletions.
    6 changes: 3 additions & 3 deletions queue1.js
    Original file line number Diff line number Diff line change
    @@ -4,10 +4,10 @@ import { all, call, fork, put, take } from "redux-saga/effects";
    /**
    * creates a queue
    *
    * @param {GeneratorFunction} [handle=() => {}] request handler
    * @param {GeneratorFunction} [handler] request handler
    * @param {number} [concurrent=1] number of workers
    */
    function* createQueue(handle = () => {}, concurrent = 1) {
    function* createQueue(handler, concurrent = 1) {
    const addTaskChannel = yield call(channel, buffers.expanding());
    function* watchRequests() {
    try {
    @@ -28,7 +28,7 @@ function* createQueue(handle = () => {}, concurrent = 1) {
    function* handleRequest(chan) {
    while (true) {
    const payload = yield take(chan);
    yield handle(payload);
    yield handler(payload);
    }
    }

  16. puemos created this gist May 2, 2018.
    41 changes: 41 additions & 0 deletions queue1.js
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,41 @@
    import { buffers, channel } from "redux-saga";
    import { all, call, fork, put, take } from "redux-saga/effects";

    /**
    * creates a queue
    *
    * @param {GeneratorFunction} [handle=() => {}] request handler
    * @param {number} [concurrent=1] number of workers
    */
    function* createQueue(handle = () => {}, concurrent = 1) {
    const addTaskChannel = yield call(channel, buffers.expanding());
    function* watchRequests() {
    try {
    // create a channel to queue incoming requests
    const runChannel = yield call(channel, buffers.expanding());

    // create n worker 'threads'
    yield all(Array(concurrent).fill(fork(handleRequest, runChannel)));

    while (true) {
    const { payload } = yield take(addTaskChannel);
    yield put(runChannel, payload);
    }
    } finally {
    }
    }

    function* handleRequest(chan) {
    while (true) {
    const payload = yield take(chan);
    yield handle(payload);
    }
    }

    return {
    watcher: watchRequests,
    addTaskChannel
    };
    }

    export default createQueue;