Skip to content

Instantly share code, notes, and snippets.

@blakelee
Last active September 7, 2023 06:59
Show Gist options
  • Select an option

  • Save blakelee/c0a87240810221d4e8b5401512d548b1 to your computer and use it in GitHub Desktop.

Select an option

Save blakelee/c0a87240810221d4e8b5401512d548b1 to your computer and use it in GitHub Desktop.
A utility class for managing a queue of suspended tasks in an ordered manner with an optional "idle" callback can be invoked when the queue becomes empty.
/**
* A utility class for managing a queue of suspended tasks (represented by Deferred objects) within
* a given CoroutineScope. Tasks are executed in an ordered manner, and an "idle" callback can be
* invoked when the queue becomes empty.
*
* @param coroutineScope The CoroutineScope in which tasks will be executed.
* @param R The type of result produced by the tasks.
*/
class DeferredQueue<R>(private val coroutineScope: CoroutineScope) {
// The queue for holding Deferred tasks to be executed.
private val queue = Channel<Deferred<R>>(Channel.RENDEZVOUS)
// An optional callback to be invoked when the queue is empty.
private var onIdle: (suspend () -> Unit) = {}
init {
coroutineScope.launch {
while (true) {
select {
queue.onReceive { element ->
element.join()
}
// When there are no items in the queue, invoke the onIdle callback if set then wait for an item in the queue
// to continue
onTimeout(1) {
onIdle()
queue.receive().join()
}
}
}
}
}
/**
* Set an "idle" callback function to be invoked when the queue becomes empty.
* This can be used to perform additional actions when the queue is idle.
*
* @param block A suspending lambda representing the callback action.
* @return This DeferredQueue instance to allow for method chaining.
*/
fun onIdle(block: suspend () -> Unit): DeferredQueue<R> = also { onIdle = block }
/**
* Add a task to the queue to be executed. The task is represented by a suspending lambda.
*
* @param block A suspending lambda representing the task to be executed.
* @return The result of the task once it completes.
*/
suspend fun add(block: suspend CoroutineScope.() -> R): R {
val deferred = coroutineScope.async(start = CoroutineStart.LAZY) {
block()
}
queue.send(deferred)
return deferred.await()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment