Skip to content

Instantly share code, notes, and snippets.

@zigforge
Forked from yonathan06/WorkerQueue.ts
Created December 9, 2024 14:58
Show Gist options
  • Select an option

  • Save zigforge/fd2776622090b6dd3e58f3564c7af86e to your computer and use it in GitHub Desktop.

Select an option

Save zigforge/fd2776622090b6dd3e58f3564c7af86e to your computer and use it in GitHub Desktop.
WorkerQueue.ts (54 lines lightweight typescript worker queue)
interface QueueItem<T, R> {
task: T;
resolve: (result: R) => void;
reject: (error: Error) => void;
}
type Worker<T, R> = (task: T) => Promise<R>;
export class WorkerQueue<T, R> {
queue: QueueItem<T, R>[];
worker: Worker<T, R>;
numOfWorkers: number;
runningWorkersCount: number;
constructor(worker: Worker<T, R>, numOfWorkers = 1) {
this.queue = [];
this.worker = worker;
this.numOfWorkers = numOfWorkers;
this.runningWorkersCount = 0;
}
push(item: T) {
const { promise, resolve, reject } = Promise.withResolvers<R>();
this.queue.push({
task: item,
resolve,
reject,
});
this.callWorkers();
return promise;
}
private callWorkers() {
for (let i = 0; i < this.numOfWorkers - this.runningWorkersCount; i++) {
this.callNextTask();
}
}
private async callNextTask() {
const item = this.queue.shift();
if (!item) {
return;
}
this.runningWorkersCount++;
try {
const result = await this.worker(item.task);
item.resolve(result);
} catch (e) {
item.reject(e as Error);
}
this.runningWorkersCount--;
this.callWorkers();
}
}
// EXAMPLE
const workerQueue = new WorkerQueue(async (num: number) => console.log(num), 10);
const promises: Promise<number> = []
for (let i = 0; i < 100; i++) {
promises.push(workerQueue.push(i));
}
await Promise.all(promises);
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment