Created
December 23, 2019 01:37
-
-
Save linsea/9e6e329b359ea66e973e8a535e630c75 to your computer and use it in GitHub Desktop.
Revisions
-
linsea created this gist
Dec 23, 2019 .There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters. Learn more about bidirectional Unicode charactersOriginal file line number Diff line number Diff line change @@ -0,0 +1,268 @@ ## Kotlin 通道 `Deferred` 值提供了一种在协程之间传递单个值的方便手段, 而 `Channel` 提供一种传递一系列值(即 Stream 流)的方法. ### Channel 基础 `Channel` 概念上与Java的 `BlockingQueue` 很相似, 但有一个重要的不同点, `BlockingQueue` 的 `put` 和 `take` 方法是阻塞的, 而`Channel`对应的 `send` 和 `receive` 方法是可挂起. ``` import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { val channel = Channel<Int>() launch { // this might be heavy CPU-consuming computation or async logic, we'll just send five squares for (x in 1..5) channel.send(x * x) } // here we print five received integers: repeat(5) { println(channel.receive()) } println("Done!") } ``` ``` 1 4 9 16 25 Done! ``` ### 关闭与迭代 Channel 不像队列, 通道关闭意味着不会再有数据到来, 接收端可以使用 `for` 循环来接收通道的数据. 概念上, `close` 方法像是发送了一个关闭标记, 然后循环将停止. ``` import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { val channel = Channel<Int>() launch { for (x in 1..5) channel.send(x * x) channel.close() // we're done sending } // here we print received values using `for` loop (until the channel is closed) for (y in channel) println(y) println("Done!") } ``` ### 构建通道生产者 生产者-消费者模式很常见, 使用 `produce` 协程构建器可以轻松地实现生产者. 消费端可以使用 `consumeEach` 扩展函数 , 它可以取代 `for` 循环迭代. ``` import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce { //注意返回的类型 for (x in 1..5) send(x * x) } // 注意: produceSquares() 是定义在 CoroutineScope 上的扩展方法 fun main() = runBlocking { val squares = produceSquares() squares.consumeEach { println(it) } println("Done!") } ``` ### 管道 (Pipelines) 管道是一种模式, 它有一个生产者协程, 可能生产出无限的一系列的值. 另外有一个或多个消费协程, 消费值, 然后也可能生产出另外一些值. ``` import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { val numbers = produceNumbers() // produces integers from 1 and on val squares = square(numbers) // squares integers for (i in 1..5) println(squares.receive()) // print first five println("Done!") // we are done coroutineContext.cancelChildren() // cancel children coroutines } //生产者 fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 while (true) send(x++) // infinite stream of integers starting from 1 } //消费者 fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce { for (x in numbers) send(x * x) } ``` 注意生产者和消费者都是定义在 `CoroutineScope` 上的扩展函数, 这样我们可以依赖结构化并发保证取消或结束时没有泄漏. ### 扇出 (Fan-out) - 一对多生产消费模式 多个消费者协程可能消费同一个管道, 如下: ``` import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking<Unit> { val producer = produceNumbers() repeat(5) { launchProcessor(it, producer) } delay(950) producer.cancel() // cancel producer coroutine and thus kill them all } fun CoroutineScope.produceNumbers() = produce<Int> { var x = 1 // start from 1 while (true) { send(x++) // produce next delay(100) // wait 0.1s } } fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch { for (msg in channel) { println("Processor #$id received $msg") } } ``` ``` Processor #2 received 1 Processor #4 received 2 Processor #0 received 3 Processor #1 received 4 Processor #3 received 5 Processor #2 received 6 Processor #4 received 7 Processor #0 received 8 Processor #1 received 9 Processor #3 received 10 ``` 注意: 取消生产者协程会关闭管道, 最终终止在其上迭代的消费者协程. `launchProcessor` 方法内, 在 Channel 迭代的 `for` 循环与前面介绍的 `consumeEach` 不同, 即使在多协程模式下, 它也是完全安全的, 如果一个消费者协程失败了, 其他的仍然可以正确迭代Channel, 而使用 `consumeEach` 则会**在异常结束时会取消 Channel**, 这样会影响其他消费协程. ### 扇入 (Fan-in) - 多对一生产消费模式 有时多个生产协程会往一个管道中生产数据, 而只有一个消费协程. ``` import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { val channel = Channel<String>() launch { sendString(channel, "foo", 200L) } launch { sendString(channel, "BAR!", 500L) } repeat(6) { // receive first six println(channel.receive()) } println("ddd") coroutineContext.cancelChildren() // cancel all children to let main finish } suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) { while (true) { delay(time) channel.send(s) } } ``` ``` foo foo BAR! foo foo foo ``` ### 缓冲 Channel Channel 默认是不带缓存区的, 如果发送方先执行, 则需要挂起等待接收方消费后才会发送下一个数据, 反之亦然. Channel 的工厂方法以及 `produce` builder 方法有一个可选的 `capacity` 参数可以指定缓冲区的大小, 它使生产者在挂起等待前可以最多发送多少个数据, 与 `BlockingQueue` 的缓冲区大小参数类似, 如果 `BlockingQueue` 的缓冲区满了, 则发送操作会阻塞. ``` import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking<Unit> { val channel = Channel<Int>(4) // create buffered channel val sender = launch { // launch sender coroutine repeat(10) { println("Sending $it") // print before sending each element channel.send(it) // will suspend when buffer is full } } // don't receive anything... just wait.... delay(1000) sender.cancel() // cancel sender coroutine } ``` ``` Sending 0 Sending 1 Sending 2 Sending 3 Sending 4 ``` 前4个元素发送到管道了, 但第5个只是打印出日志, 并没有发送成功, 而是挂起了. ### Channel 是公平的 Channel 按调用的顺序来决定谁先发送或接收数据, 即 FIFO 算法. ``` data class Ball(var hits: Int) fun main() = runBlocking { val table = Channel<Ball>() // a shared table launch { player("ping", table) } launch { player("pong", table) } table.send(Ball(0)) // serve the ball delay(1000) // delay 1 second coroutineContext.cancelChildren() // game over, cancel them } suspend fun player(name: String, table: Channel<Ball>) { for (ball in table) { // receive the ball in a loop ball.hits++ println("$name $ball") delay(300) // wait a bit // 这里发送数据, 但即使在for循环内马上接收数据, 但另一个协程已经在他处等待, 优先接收了. table.send(ball) } } ``` ``` ping Ball(hits=1) pong Ball(hits=2) ping Ball(hits=3) pong Ball(hits=4) ``` 虽然 Channel 是公平的, 但因为执行器(`executor`)的原因, 可能看出来不公平. ### 时钟 Channel 有时需要一个信号发生器, 每隔一定的周期或者延迟发送一个信号, 以便做一些同步动作或者重复性的工作. `Ticker` Channel 就是这种信号生产器通道. 它可以使用 `ticker` 工厂方法创建. `Ticker` Channel只产生`Unit`类型的数值, 并且有两种产生信号的时机模式, 固定周期(`FIXED_PERIOD`)或者固定延迟(`FIXED_DELAY`), 固定延迟以消费者接收到数据时开始计时. ``` import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking<Unit> { val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay println("Next element is not ready in 50 ms: $nextElement") nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } println("Next element is ready in 100 ms: $nextElement") // Emulate large consumption delays println("Consumer pauses for 150ms") delay(150) // Next element is available immediately nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Next element is available immediately after large consumer delay: $nextElement") // Note that the pause between `receive` calls is taken into account and next element arrives faster nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement") tickerChannel.cancel() // indicate that no more elements are needed } ``` ``` Initial element is available immediately: kotlin.Unit Next element is not ready in 50 ms: null Next element is ready in 100 ms: kotlin.Unit Consumer pauses for 150ms Next element is available immediately after large consumer delay: kotlin.Unit Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit ``` `Ticker` 通道可以觉察可能暂停的消费者, 并且当消费者消费太慢时, 停止生产, 自我调整以维护一个固定的生产数据的频率.