Skip to content

Instantly share code, notes, and snippets.

@semmel
Last active November 11, 2025 14:10
Show Gist options
  • Select an option

  • Save semmel/0edd6d46ce06d5f32db4f811e0a14a07 to your computer and use it in GitHub Desktop.

Select an option

Save semmel/0edd6d46ce06d5f32db4f811e0a14a07 to your computer and use it in GitHub Desktop.
Parallelism in Promise (Task) Queues
import * as M from '@most/core';
import { newDefaultScheduler } from '@most/scheduler';
import { pipe } from 'fp-ts/function';
import * as T from 'fp-ts/Task';
import * as A from 'fp-ts/Array';
import { o } from 'ramda';
import {last as lastM } from 'most-last';
import * as Rx from 'rxjs';
const taskReducer = <A>(acc: Promise<A[]>, ta: T.Task<A>) =>
acc.then(as => ta().then((a:A) => [...as, a]));
const taskSettledReducer = <A>(acc: Promise<PromiseSettledResult<A>[]>, ta: T.Task<A>) =>
acc.then(as => ta()
.then((a:A) => [...as, {status: 'fulfilled', value: a} as PromiseFulfilledResult<A>])
.catch(e => [...as, {status: 'rejected', reason: e} as PromiseRejectedResult])
);
const xs = [[ 1, 200 ], [ 2, 500 ], [ 3, 200 ], [ 4, 500 ], [ 5, 200 ], [ 6, 500 ]];
const POOL_SIZE = 2;
const squareTask = ([x, dt]: number[]) => (): Promise<number> => new Promise(res => setTimeout(res, dt, x * x));
const square = ([x, dt]: number[]): Promise<number> => new Promise(res => setTimeout(res, dt, x * x));
// baseline: all sequentially
console.time("all seq");
await pipe(xs,
A.map(squareTask),
A.reduce(Promise.resolve([]), taskReducer)
);
console.timeEnd("all seq");
// baseline: all in parallel
console.time("all parallel");
await pipe(xs,
A.map(square),
ps => Promise.all(ps)
);
console.timeEnd("all parallel");
// sequence of parallel chunks
console.time("seq of par chunks");
await pipe(xs,
A.map(squareTask),
A.chunksOf(POOL_SIZE),
A.map(A.sequence(T.ApplicativePar)),
A.sequence(T.ApplicativeSeq),
T.map(A.flatten)
)();
console.timeEnd("seq of par chunks");
// Collect the outcome //
const tasks: T.Task<number>[] = [
() => Promise.reject("oops!"),
squareTask([9, 500]),
squareTask([99, 500])
];
// all parallel
await pipe(tasks,
A.map(task => task()),
Promise.allSettled.bind(Promise)
);
// all sequential
await pipe(tasks,
A.reduce(Promise.resolve([]), taskSettledReducer<number>)
);
const most3tierStream = pipe(
M.periodic(0),
M.withItems(xs),
M.mergeMapConcurrently(o(M.fromPromise, square), POOL_SIZE),
M.scan((acc: number[], squareNumber: number) => [...acc, squareNumber], []),
lastM,
M.tap(squares => {
console.timeEnd("most pool");
console.log(squares); })
);
console.time("most pool");
M.runEffects(most3tierStream, newDefaultScheduler());
console.time("Rx pool");
await Rx.lastValueFrom(Rx.from(xs).pipe(
Rx.mergeMap(o(Rx.from, square), POOL_SIZE),
Rx.reduce((acc: number[], squareNumber: number) => [...acc, squareNumber], [])
));
console.timeEnd("Rx pool");

Run async computations with custom parallelism

Contestants

  • native Promise vs.
  • Task A ≡ () -> Promise A from fp-ts vs.
  • p-queue library
  • Observables e.g. most stream library

Scenario

E.g. six tasks with durations 2, 5, 2, 5, 2, and 5.

sequential: T = ∑t = 21
|->|---->|->|---->|->|---->|->|---->|

all in parallel: T = max(t) = 5
|>
|--->
|>
|--->
|>
|--->

batches of 2 in parallel: T = 5 x 3 = 15
|>
|--->
     |>
     |--->
          |>
          |--->
           
parallel pool of 2 workers: T = 12
|>
|--->
  |>
    |--->
     |>
       |--->

Setup

// work computation takes a number to be squared together with the duration
const square = ([x, dt]):Promise<number> => new Promise(res => setTimeout(res, dt, x * x));
const squareT = ([x, dt]) => (): Promise<number> => new Promise(res => setTimeout(res, dt, x * x));

// execute 12 square operations with durations
const xs = [
	[ 1, 0 ], [ 2, 50 ], [ 3, 250 ], 
	[ 4, 0 ], [ 5, 50 ],  [ 6, 250 ],
	[ 7, 0 ], [ 8, 50 ], [ 9, 250 ], 
	[ 10, 0 ], [ 11, 50 ], [ 12, 250 ]
];
// added durations: 1200ms 

Libs Used

import * as A from 'fp-ts/Array';
import * as T from 'fp-ts/Task';
import { flow, pipe } from 'fp-ts/function'

Native Promise

// some helper functions
const allPromises = <T>(promises: Promise<T>[]): Promise<T[]> => 
  Promise.all(promises);
const mapP = <A, B>(fa2b: (a:A) => B) => (pa: Promise<A>) => pa.then(fa2b);
const taskReducer = <A>(acc: Promise<A[]>, ta: T.Task<A>) => 
    acc.then(as => ta().then(a => A.append(a)(as)));

// all sequentially
await pipe(xs,
  A.map(squareT),
  A.reduce(Promise.resolve([]), taskReducer)
); // [ 1, 4, 9, … ] ~> 1.2 secs = ∑ durations

// sequence of parallel chunks
await pipe(xs,
  A.chunksOf(3),
  A.map(xs => () => pipe(
    xs,
    A.map(square),
    allPromises
  )),
  A.reduce(Promise.resolve([]), taskReducer),
  mapP(A.flatten)
); // [1, 4, 9, …] ~> 1sec = 4 ⨉ 250

// all in parallel
await pipe(xs,
  A.map(square),
  allPromises
); // [1, 4, 9, …] ~> 250ms = max(durations)

Using fp-ts with Promises aka fp-ts/Task

// sequentially
await pipe(xs,
  A.map(squareT),
  A.sequence(T.ApplicativeSeq)
)();

// sequence of parallel chunks
await pipe(xs,
  A.map(squareT),
  A.chunksOf(3),
  A.map(A.sequence(T.ApplicativePar)),
  A.sequence(T.ApplicativeSeq),
  T.map(A.flatten)
)(); // [1, 4, 9, …] ~> 1sec

// all in parallel
await pipe(xs,
  A.map(squareT),
  A.sequence(T.ApplicativePar)
)();

Using the p-queue library

const singleTrackQueue = new PQueue({concurrency: 1});
const threeTrackQueue = new PQueue({concurrency: 3});
const infiniteWidthTrackQueue = new PQueue();

// sequentially
await f.pipe(xs, 
  A.map(squareT), 
  A.map(t => singleTrackQueue.add(t)), 
  allPromises
); // ~> 1.5 secs

// with parallelity
await f.pipe(xls, 
  A.map(squareT), 
  A.map(t => threeTrackQueue.add(t)), 
  allPromises
); // [1, 4, ..., 144] ~> 2 secs

// all in parallel
await f.pipe(xs, 
  A.map(squareT), 
  A.map(t => infiniteWidthTrackQueue.add(t)), 
  allPromises
); // ~> 0.5 secs
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment