Last active
September 7, 2023 06:59
-
-
Save blakelee/c0a87240810221d4e8b5401512d548b1 to your computer and use it in GitHub Desktop.
A utility class for managing a queue of suspended tasks in an ordered manner, with an optional "idle" callback that is invoked when the queue becomes empty.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * A utility class for managing a queue of suspended tasks (represented by [Deferred] objects)
 * within a given [CoroutineScope]. Tasks are executed one at a time, in the order they are handed
 * to the queue, and an "idle" callback can be invoked when the queue becomes empty.
 *
 * Each task is wrapped in a lazily started [Deferred] and passed to a single consumer coroutine
 * over a rendezvous [Channel]; the consumer joins each task before accepting the next one.
 *
 * When the consumer coroutine completes (for example because [coroutineScope] is cancelled or the
 * idle callback throws), the channel is cancelled so that pending and later [add] calls fail with
 * an exception instead of suspending forever waiting for a receiver that no longer exists.
 *
 * NOTE(review): the idle callback property is a plain `var` without synchronization; confirm that
 * [onIdle] is only called from a context where that is acceptable.
 *
 * @param coroutineScope The CoroutineScope in which tasks will be executed.
 * @param R The type of result produced by the tasks.
 */
class DeferredQueue<R>(private val coroutineScope: CoroutineScope) {
    // Rendezvous channel holding the tasks to execute: `send` only completes once the consumer
    // has actually taken the task.
    private val queue = Channel<Deferred<R>>(Channel.RENDEZVOUS)

    // Callback invoked when the queue is considered empty; a no-op until replaced via onIdle().
    private var onIdle: (suspend () -> Unit) = {}

    init {
        coroutineScope.launch {
            while (true) {
                select {
                    // A task is available: joining it starts the lazy coroutine and waits for
                    // it to finish before the next task is accepted.
                    queue.onReceive { element ->
                        element.join()
                    }
                    // No task arrived within 1 ms: treat the queue as idle, invoke the callback,
                    // then wait (without a timeout) for the next task and run it.
                    onTimeout(1) {
                        onIdle()
                        queue.receive().join()
                    }
                }
            }
        }.invokeOnCompletion {
            // The consumer is gone (scope cancelled or the loop failed). Cancel the channel so
            // callers suspended in, or later calling, `queue.send` are resumed with an exception
            // rather than hanging forever.
            queue.cancel()
        }
    }

    /**
     * Set an "idle" callback function to be invoked when the queue becomes empty.
     * This can be used to perform additional actions when the queue is idle.
     *
     * @param block A suspending lambda representing the callback action.
     * @return This DeferredQueue instance to allow for method chaining.
     */
    fun onIdle(block: suspend () -> Unit): DeferredQueue<R> = also { onIdle = block }

    /**
     * Add a task to the queue to be executed. The task is represented by a suspending lambda.
     *
     * Suspends until the consumer has accepted the task and the task has completed. If handing
     * the task to the consumer fails (the caller is cancelled while waiting, or the queue's
     * consumer has stopped), the task is cancelled and the failure is rethrown.
     *
     * @param block A suspending lambda representing the task to be executed.
     * @return The result of the task once it completes.
     */
    suspend fun add(block: suspend CoroutineScope.() -> R): R {
        // LAZY start: the task must not run until the consumer reaches it.
        val deferred = coroutineScope.async(start = CoroutineStart.LAZY, block = block)
        try {
            queue.send(deferred)
        } catch (e: Throwable) {
            // A lazy coroutine that is never started would remain a child of coroutineScope in
            // the New state and keep the scope's job from completing, so cancel it explicitly.
            deferred.cancel()
            throw e
        }
        return deferred.await()
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment