Skip to content

Instantly share code, notes, and snippets.

@linsea
Created December 23, 2019 01:37
Show Gist options
  • Select an option

  • Save linsea/9e6e329b359ea66e973e8a535e630c75 to your computer and use it in GitHub Desktop.

Select an option

Save linsea/9e6e329b359ea66e973e8a535e630c75 to your computer and use it in GitHub Desktop.

Revisions

  1. linsea created this gist Dec 23, 2019.
    268 changes: 268 additions & 0 deletions Kotlin 通道.md
    Original file line number Diff line number Diff line change
    @@ -0,0 +1,268 @@
    ## Kotlin 通道

    `Deferred` 值提供了一种在协程之间传递单个值的方便手段, 而 `Channel` 提供一种传递一系列值(即 Stream 流)的方法.

    ### Channel 基础
    `Channel` 概念上与Java的 `BlockingQueue` 很相似, 但有一个重要的不同点, `BlockingQueue``put``take` 方法是阻塞的, 而`Channel`对应的 `send``receive` 方法是可挂起.
    ```
    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
    // this might be heavy CPU-consuming computation or async logic, we'll just send five squares
    for (x in 1..5) channel.send(x * x)
    }
    // here we print five received integers:
    repeat(5) { println(channel.receive()) }
    println("Done!")
    }
    ```
    ```
    1
    4
    9
    16
    25
    Done!
    ```

    ### 关闭与迭代 Channel
    不像队列, 通道关闭意味着不会再有数据到来, 接收端可以使用 `for` 循环来接收通道的数据. 概念上, `close` 方法像是发送了一个关闭标记, 然后循环将停止.
    ```
    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    fun main() = runBlocking {
    val channel = Channel<Int>()
    launch {
    for (x in 1..5) channel.send(x * x)
    channel.close() // we're done sending
    }
    // here we print received values using `for` loop (until the channel is closed)
    for (y in channel) println(y)
    println("Done!")
    }
    ```

    ### 构建通道生产者
    生产者-消费者模式很常见, 使用 `produce` 协程构建器可以轻松地实现生产者. 消费端可以使用 `consumeEach` 扩展函数 , 它可以取代 `for` 循环迭代.
    ```
    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce { //注意返回的类型
    for (x in 1..5) send(x * x)
    }
    // 注意: produceSquares() 是定义在 CoroutineScope 上的扩展方法
    fun main() = runBlocking {
    val squares = produceSquares()
    squares.consumeEach { println(it) }
    println("Done!")
    }
    ```

    ### 管道 (Pipelines)
    管道是一种模式, 它有一个生产者协程, 可能生产出无限的一系列的值. 另外有一个或多个消费协程, 消费值, 然后也可能生产出另外一些值.
    ```
    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    fun main() = runBlocking {
    val numbers = produceNumbers() // produces integers from 1 and on
    val squares = square(numbers) // squares integers
    for (i in 1..5) println(squares.receive()) // print first five
    println("Done!") // we are done
    coroutineContext.cancelChildren() // cancel children coroutines
    }
    //生产者
    fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // infinite stream of integers starting from 1
    }
    //消费者
    fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x)
    }
    ```
    注意生产者和消费者都是定义在 `CoroutineScope` 上的扩展函数, 这样我们可以依赖结构化并发保证取消或结束时没有泄漏.

    ### 扇出 (Fan-out) - 一对多生产消费模式
    多个消费者协程可能消费同一个管道, 如下:
    ```
    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    fun main() = runBlocking<Unit> {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
    }
    fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1 // start from 1
    while (true) {
    send(x++) // produce next
    delay(100) // wait 0.1s
    }
    }
    fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
    println("Processor #$id received $msg")
    }
    }
    ```
    ```
    Processor #2 received 1
    Processor #4 received 2
    Processor #0 received 3
    Processor #1 received 4
    Processor #3 received 5
    Processor #2 received 6
    Processor #4 received 7
    Processor #0 received 8
    Processor #1 received 9
    Processor #3 received 10
    ```
    注意: 取消生产者协程会关闭管道, 最终终止在其上迭代的消费者协程. `launchProcessor` 方法内, 在 Channel 迭代的 `for` 循环与前面介绍的 `consumeEach` 不同, 即使在多协程模式下, 它也是完全安全的, 如果一个消费者协程失败了, 其他的仍然可以正确迭代Channel, 而使用 `consumeEach` 则会**在异常结束时会取消 Channel**, 这样会影响其他消费协程.

    ### 扇入 (Fan-in) - 多对一生产消费模式
    有时多个生产协程会往一个管道中生产数据, 而只有一个消费协程.
    ```
    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    fun main() = runBlocking {
    val channel = Channel<String>()
    launch { sendString(channel, "foo", 200L) }
    launch { sendString(channel, "BAR!", 500L) }
    repeat(6) { // receive first six
    println(channel.receive())
    }
    println("ddd")
    coroutineContext.cancelChildren() // cancel all children to let main finish
    }
    suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
    while (true) {
    delay(time)
    channel.send(s)
    }
    }
    ```
    ```
    foo
    foo
    BAR!
    foo
    foo
    foo
    ```

    ### 缓冲 Channel

    Channel 默认是不带缓存区的, 如果发送方先执行, 则需要挂起等待接收方消费后才会发送下一个数据, 反之亦然. Channel 的工厂方法以及 `produce` builder 方法有一个可选的 `capacity` 参数可以指定缓冲区的大小, 它使生产者在挂起等待前可以最多发送多少个数据, 与 `BlockingQueue` 的缓冲区大小参数类似, 如果 `BlockingQueue` 的缓冲区满了, 则发送操作会阻塞.
    ```
    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    fun main() = runBlocking<Unit> {
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
    repeat(10) {
    println("Sending $it") // print before sending each element
    channel.send(it) // will suspend when buffer is full
    }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine
    }
    ```
    ```
    Sending 0
    Sending 1
    Sending 2
    Sending 3
    Sending 4
    ```
    前4个元素发送到管道了, 但第5个只是打印出日志, 并没有发送成功, 而是挂起了.

    ### Channel 是公平的
    Channel 按调用的顺序来决定谁先发送或接收数据, 即 FIFO 算法.
    ```
    data class Ball(var hits: Int)
    fun main() = runBlocking {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
    }
    suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
    ball.hits++
    println("$name $ball")
    delay(300) // wait a bit
    // 这里发送数据, 但即使在for循环内马上接收数据, 但另一个协程已经在他处等待, 优先接收了.
    table.send(ball)
    }
    }
    ```
    ```
    ping Ball(hits=1)
    pong Ball(hits=2)
    ping Ball(hits=3)
    pong Ball(hits=4)
    ```
    虽然 Channel 是公平的, 但因为执行器(`executor`)的原因, 可能看出来不公平.

    ### 时钟 Channel
    有时需要一个信号发生器, 每隔一定的周期或者延迟发送一个信号, 以便做一些同步动作或者重复性的工作. `Ticker` Channel 就是这种信号生产器通道. 它可以使用 `ticker` 工厂方法创建. `Ticker` Channel只产生`Unit`类型的数值, 并且有两种产生信号的时机模式, 固定周期(`FIXED_PERIOD`)或者固定延迟(`FIXED_DELAY`), 固定延迟以消费者接收到数据时开始计时.
    ```
    import kotlinx.coroutines.*
    import kotlinx.coroutines.channels.*
    fun main() = runBlocking<Unit> {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel
    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Initial element is available immediately: $nextElement") // initial delay hasn't passed yet
    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements has 100ms delay
    println("Next element is not ready in 50 ms: $nextElement")
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 100 ms: $nextElement")
    // Emulate large consumption delays
    println("Consumer pauses for 150ms")
    delay(150)
    // Next element is available immediately
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    println("Next element is available immediately after large consumer delay: $nextElement")
    // Note that the pause between `receive` calls is taken into account and next element arrives faster
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
    tickerChannel.cancel() // indicate that no more elements are needed
    }
    ```
    ```
    Initial element is available immediately: kotlin.Unit
    Next element is not ready in 50 ms: null
    Next element is ready in 100 ms: kotlin.Unit
    Consumer pauses for 150ms
    Next element is available immediately after large consumer delay: kotlin.Unit
    Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
    ```
    `Ticker` 通道可以觉察可能暂停的消费者, 并且当消费者消费太慢时, 停止生产, 自我调整以维护一个固定的生产数据的频率.