Skip to content

Instantly share code, notes, and snippets.

@mrchaofan
Last active March 7, 2023 08:13
Show Gist options
  • Select an option

  • Save mrchaofan/d5770bf504c825d35d5382cf178d5e86 to your computer and use it in GitHub Desktop.

Select an option

Save mrchaofan/d5770bf504c825d35d5382cf178d5e86 to your computer and use it in GitHub Desktop.
type Callback = (err?: Error) => void
type Task = (callback: Callback) => void
interface ITaskProcessorOptions {
maxConcurrency: number
}
export default class TaskProcessor {
private pendingTaskCount = 0
private queue: Array<{ task: Task, callback: Callback }> = []
private emptyCallbacks: Callback[] = []
constructor (private readonly options: ITaskProcessorOptions) {
}
public addTask (task: Task, callback: Callback): void {
if (this.pendingTaskCount < this.options.maxConcurrency) {
this.runTask(task, (er) => {
callback(er)
this.runTaskIfNotEmpty()
})
return
}
this.queue.push({
task,
callback,
})
}
public clear (): void {
this.queue = []
}
public waitForEmpty (callback: Callback): void {
if (this.queue.length === 0 && this.pendingTaskCount === 0) {
callback()
return
}
this.emptyCallbacks.push(callback)
}
private callEmptyCallbacks (): void {
const emptyCallbacks = this.emptyCallbacks
this.emptyCallbacks = []
emptyCallbacks.forEach(callback => {
callback()
})
}
private runTaskIfNotEmpty (): void {
if (this.queue.length === 0 && this.pendingTaskCount === 0) {
this.callEmptyCallbacks()
}
while (this.queue.length > 0 && this.pendingTaskCount < this.options.maxConcurrency) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const front = this.queue.shift()!
this.runTask(front.task, (er) => {
front.callback(er)
this.runTaskIfNotEmpty()
})
}
}
private runTask (task: Task, callback: Callback): void {
++this.pendingTaskCount
task((er) => {
--this.pendingTaskCount
callback(er)
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment