Contestants
- native Promise vs.
- Task A ≡ () -> Promise A from fp-ts vs.
- p-queue library
- Observables e.g. most stream library
E.g. six tasks with durations 2, 5, 2, 5, 2, and 5.
sequential: T = ∑t = 21
|->|---->|->|---->|->|---->|
all in parallel: T = max(t) = 5
|>
|--->
|>
|--->
|>
|--->
batches of 2 in parallel: T = 5 x 3 = 15
|>
|--->
     |>
     |--->
          |>
          |--->
parallel pool of 2 workers: T = 12
|>
|--->
  |>
    |--->
     |>
       |--->
/** A number to be squared, paired with the simulated duration of the computation in milliseconds. */
type TimedInput = readonly [x: number, dt: number];

/**
 * Lazy work computation: returns a task (a zero-argument function) that, once
 * invoked, resolves with `x * x` after `dt` milliseconds. No timer is started
 * until the returned function is called.
 */
const squareT = ([x, dt]: TimedInput) => (): Promise<number> =>
  new Promise<number>((resolve) => {
    // setTimeout forwards its extra argument to the callback, so the promise resolves with x * x.
    setTimeout(resolve, dt, x * x);
  });

/**
 * Eager work computation: takes a number to be squared together with the
 * duration and starts the timer immediately.
 */
const square = (input: TimedInput): Promise<number> => squareT(input)();

// execute 12 square operations with durations
const xs: TimedInput[] = [
  [1, 0], [2, 50], [3, 250],
  [4, 0], [5, 50], [6, 250],
  [7, 0], [8, 50], [9, 250],
  [10, 0], [11, 50], [12, 250],
];
// added durations: 1200ms
import * as A from 'fp-ts/Array';
import * as T from 'fp-ts/Task';
import { flow, pipe } from 'fp-ts/function';
// some helper functions
/**
 * Unary wrapper around `Promise.all`, usable as the last stage of a `pipe`:
 * turns an array of promises into one promise of the array of their results.
 */
const allPromises = <T>(promises: Promise<T>[]): Promise<T[]> => {
  const combined = Promise.all(promises);
  return combined;
};
/**
 * Curried `map` for promises: given a function from A to B, returns a function
 * that applies it to the value a promise resolves with.
 */
const mapP =
  <A, B>(fa2b: (a: A) => B) =>
  (pa: Promise<A>) => {
    const mapped = pa.then((value) => fa2b(value));
    return mapped;
  };
/**
 * Reducer step that chains tasks one after another: waits for the results
 * accumulated so far, only then invokes the next task, and resolves with the
 * accumulated results plus the new one appended.
 */
const taskReducer = async <A>(acc: Promise<A[]>, ta: T.Task<A>) => {
  const collected = await acc;
  // The task is invoked only after the accumulator has resolved.
  const next = await ta();
  return A.append(next)(collected);
};
// all sequentially: every input becomes a lazy task, and the reducer invokes
// each task only after the previous result has arrived
const lazySquares = pipe(xs, A.map(squareT));
await pipe(
  lazySquares,
  A.reduce(Promise.resolve([]), taskReducer)
); // [ 1, 4, 9, … ] ~> 1.2 secs = ∑ durations
// sequence of parallel chunks: each chunk of 3 becomes one lazy task that
// starts its squares together; the chunk tasks themselves run one after another
const chunkTasks = pipe(
  xs,
  A.chunksOf(3),
  A.map((chunk) => () => allPromises(chunk.map(square)))
);
await pipe(
  chunkTasks,
  A.reduce(Promise.resolve([]), taskReducer),
  mapP(A.flatten) // per-chunk result arrays -> one flat array
); // [1, 4, 9, …] ~> 1sec = 4 ⨉ 250
// all in parallel: every square is started right away, then all are awaited together
const eagerSquares = pipe(xs, A.map(square));
await allPromises(eagerSquares); // [1, 4, 9, …] ~> 250ms = max(durations)
// sequentially
// Build one combined Task with the sequential applicative, then invoke it.
const sequentialTask = pipe(
  xs,
  A.map(squareT),
  A.sequence(T.ApplicativeSeq)
);
await sequentialTask();
// sequence of parallel chunks: tasks are grouped in threes, each group is
// combined with the parallel applicative, and the groups are combined with
// the sequential applicative
const chunkedTask = pipe(
  xs,
  A.map(squareT),
  A.chunksOf(3),
  A.map(A.sequence(T.ApplicativePar)),
  A.sequence(T.ApplicativeSeq),
  T.map(A.flatten) // per-chunk result arrays -> one flat array
);
await chunkedTask(); // [1, 4, 9, …] ~> 1sec
// all in parallel: one combined Task built with the parallel applicative
const parallelTask = pipe(
  xs,
  A.map(squareT),
  A.sequence(T.ApplicativePar)
);
await parallelTask();

// p-queue contestants: one queue per concurrency setting.
// NOTE(review): no import of PQueue is visible in this part of the file — confirm it is in scope.
const singleTrackQueue = new PQueue({ concurrency: 1 });
const threeTrackQueue = new PQueue({ concurrency: 3 });
// constructed without options, i.e. no explicit concurrency limit is configured
const infiniteWidthTrackQueue = new PQueue();
// sequentially: every task is added to the queue with concurrency 1, and the
// promises returned by add() are awaited together.
// Uses the directly imported `pipe`; no namespace `f` is imported in this file.
await pipe(
  xs,
  A.map(squareT),
  A.map((t) => singleTrackQueue.add(t)),
  allPromises
); // ~> 1.2 secs = ∑ durations (one task at a time)
// with parallelity: every task is added to the queue with concurrency 3, and
// the promises returned by add() are awaited together.
// Uses the directly imported `pipe`; no namespace `f` is imported in this file.
await pipe(
  xs, // was `xls`, a name that is not defined anywhere
  A.map(squareT),
  A.map((t) => threeTrackQueue.add(t)),
  allPromises
); // [1, 4, ..., 144] ~> 0.55 secs (ideal schedule for 3 workers with the durations in xs)
// all in parallel: every task is added to the queue created without a
// concurrency option, and the promises returned by add() are awaited together.
// Uses the directly imported `pipe`; no namespace `f` is imported in this file.
await pipe(
  xs,
  A.map(squareT),
  A.map((t) => infiniteWidthTrackQueue.add(t)),
  allPromises
); // ~> 250ms = max(durations)