Android

关注公众号 jb51net

关闭
首页 > 软件编程 > Android > Kotlin 协程Channel使用

Kotlin 协程之Channel的概念和基本使用详解

作者:XeonYu

文章介绍协程在复杂场景中使用Channel进行数据传递与控制,涵盖创建参数、缓冲策略、操作方式及异常处理,适用于持续数据流、多协程协作等,需注意容量配置和状态管理,本文给大家介绍Kotlin协程之Channel的概念和基本使用,感兴趣的朋友一起看看吧

前言

在 专栏 之前的文章中,我们已经知道了协程的启动、挂起、取消、异常以及常用的协程作用域等基础应用。
这些基础应用适合的场景是一次性任务,执行完就结束了的场景。

launch / async 适合的场景

而对于一些相对复杂的场景,例如:持续的数据流、需要在不同的协程之间传递数据、需要顺序或背压控制等场景,基础的 launch / async
就不够用了。

例如:

类似这种持续的、需要顺序控制、或者多个协程配合执行的场景,就需要用到 Channel 了。

Channel 的概念和基本使用

概念

顾名思义,Channel 有管道、通道的意思。Channel 跟 Java 中的 BlockingQueue 很相似,区别在于 Channel 是挂起的,不是阻塞的。

Channel 的核心特点就是能够在不同的协程之间进行数据传递,并且能够控制数据传递的顺序。
使用起来很简单,基本就分为以下几步:

  1. 创建 Channel
  2. 通过 channel.send 发送数据
  3. 通过 channel.receive 接收数据

整体的概念也比较简单形象,就是一根管道,一个口子发送数据,一个口子接收数据。

Channel 的创建

先来看下 Channel 的源码,可以看到会根据传入的参数选择不同的实现。

public fun <E> Channel(
    capacity: Int = RENDEZVOUS,
    onBufferOverflow: BufferOverflow = BufferOverflow.SUSPEND,
    onUndeliveredElement: ((E) -> Unit)? = null
): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> {
            if (onBufferOverflow == BufferOverflow.SUSPEND)
                BufferedChannel(RENDEZVOUS, onUndeliveredElement) // an efficient implementation of rendezvous channel
            else
                ConflatedBufferedChannel(
                    1,
                    onBufferOverflow,
                    onUndeliveredElement
                ) // support buffer overflow with buffered channel
        }
        CONFLATED -> {
            require(onBufferOverflow == BufferOverflow.SUSPEND) {
                "CONFLATED capacity cannot be used with non-default onBufferOverflow"
            }
            ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement)
        }
        UNLIMITED -> BufferedChannel(
            UNLIMITED,
            onUndeliveredElement
        ) // ignores onBufferOverflow: it has buffer, but it never overflows
        BUFFERED -> { // uses default capacity with SUSPEND
            if (onBufferOverflow == BufferOverflow.SUSPEND) BufferedChannel(
                CHANNEL_DEFAULT_CAPACITY,
                onUndeliveredElement
            )
            else ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement)
        }
        else -> {
            if (onBufferOverflow === BufferOverflow.SUSPEND) BufferedChannel(capacity, onUndeliveredElement)
            else ConflatedBufferedChannel(capacity, onBufferOverflow, onUndeliveredElement)
        }
    }

参数概览

参数类型默认值描述
capacityIntRENDEZVOUS通道容量,决定缓冲区大小和行为模式
onBufferOverflowBufferOverflowSUSPEND缓冲区溢出时的处理策略
onUndeliveredElement((E) -> Unit)?null元素未能送达时的回调函数

capacity(容量配置)

capacity 参数决定了 Channel 的缓冲行为和容量大小:

onBufferOverflow(溢出策略)

当缓冲区满时的处理策略:

onUndeliveredElement(未送达回调)

当元素无法送达时的清理回调函数:

参数组合效果

capacityonBufferOverflow行为适用场景
RENDEZVOUSSUSPEND无缓冲,同步通信严格的生产者-消费者同步
BUFFEREDSUSPEND有限缓冲,满时挂起一般的异步处理,默认的缓冲数量是 64
UNLIMITEDSUSPEND缓冲长度为 Int.MAX_VALUE高吞吐量场景(生产上不建议使用,有内存方面的风险)
CONFLATEDDROP_OLDEST无缓冲,只保留最新值状态更新、实时数据
自定义大小SUSPEND固定大小,满时挂起批量处理、批量任务
自定义大小DROP_OLDEST固定大小,丢弃旧数据获取最近 N 个元素
自定义大小DROP_LATEST固定大小,拒绝新数据保护重要历史数据

Capacity

RENDEZVOUS(会合模式)

特点:

使用示例:

suspend fun demonstrateRendezvousChannel() {
    // 创建 RENDEZVOUS Channel(默认容量为 0),默认什么都不传就是 rendezvous 模式,Channel<String>()
    val rendezvousChannel = Channel<String>(Channel.RENDEZVOUS)
    // 启动发送者协程
    val senderJob = GlobalScope.launch {
        println("[发送者] 准备发送消息...")
        rendezvousChannel.send("Hello from RENDEZVOUS!")
        println("[发送者] 消息已发送")
        rendezvousChannel.send("Second message")
        println("[发送者] 第二条消息已发送")
        rendezvousChannel.close()
    }
    // 启动接收者协程
    val receiverJob = GlobalScope.launch {
        delay(1000) // 延迟1秒,发送者会等待接收者准备好
        println("[接收者] 开始接收消息...")
        for (message in rendezvousChannel) {
            println("[接收者] 收到消息: $message")
            delay(500) // 模拟处理时间
        }
        println("[接收者] Channel已关闭")
    }
    // 等待所有协程完成
    joinAll(senderJob, receiverJob)
}

执行结果

CONFLATED(只留最新值)

特点:

源码分析:

CONFLATED -> {
    require(onBufferOverflow == BufferOverflow.SUSPEND) {
        "CONFLATED capacity cannot be used with non-default onBufferOverflow"
    }
    ConflatedBufferedChannel(1, BufferOverflow.DROP_OLDEST, onUndeliveredElement)
}

使用示例:

suspend fun demonstrateConflatedChannel() {
    // 创建 CONFLATED Channel,相当于:Channel<String>(1, BufferOverflow.DROP_OLDEST)
    val conflatedChannel = Channel<String>(Channel.CONFLATED)
    // 快速发送多个消息
    val senderJob = GlobalScope.launch {
        repeat(5) { i ->
            val message = "Update-$i"
            conflatedChannel.send(message)
            println("[发送者] 发送更新: $message")
            delay(100) // 短暂延迟
        }
        conflatedChannel.close()
    }
    // 慢速接收者
    val receiverJob = GlobalScope.launch {
        delay(1000) // 延迟1秒,让发送者发送完所有消息
        println("[接收者] 开始接收(只会收到最新的值)...")
        for (message in conflatedChannel) {
            println("[接收者] 收到: $message")
        }
    }
    joinAll(senderJob, receiverJob)
}

UNLIMITED(无限容量)

特点:

suspend fun demonstrateUnlimitedChannel() {
    val unlimitedChannel = Channel<String>(Channel.UNLIMITED)
    val senderJob = GlobalScope.launch {
        repeat(10) { i ->
            val message = "Message-$i"
            unlimitedChannel.send(message)
            println("[发送者] 立即发送: $message")
        }
        unlimitedChannel.close()
        println("[发送者] 所有消息已发送,Channel已关闭")
    }
    val receiverJob = GlobalScope.launch {
        delay(1000) // 延迟1秒开始接收
        println("[接收者] 开始慢速接收...")
        for (message in unlimitedChannel) {
            println("[接收者] 处理: $message")
            delay(300) // 模拟处理时间
        }
    }
    joinAll(senderJob, receiverJob)
}

BUFFERED(有限容量)

特点:

源码分析:

BUFFERED -> {
    if (onBufferOverflow == BufferOverflow.SUSPEND)
        BufferedChannel(CHANNEL_DEFAULT_CAPACITY, onUndeliveredElement)
    else
        ConflatedBufferedChannel(1, onBufferOverflow, onUndeliveredElement)
}

使用示例:

suspend fun demonstrateBufferedDefaultChannel() {
    // 创建 BUFFERED Channel(默认容量为 64)
    val bufferedChannel = Channel<String>(Channel.BUFFERED)
    val senderJob = GlobalScope.launch {
        repeat(100) { i ->
            bufferedChannel.send("Message-$i")
            println("[发送者] 发送 Message-$i")
        }
        bufferedChannel.close()
    }
    val receiverJob = GlobalScope.launch {
        delay(1000) // 延迟接收
        for (message in bufferedChannel) {
            println("[接收者] 收到: $message")
            delay(50)
        }
    }
    joinAll(senderJob, receiverJob)
}

与下面自定义容量效果类似。

自定义容量

特点:

源码分析:

else -> {
    if (onBufferOverflow === BufferOverflow.SUSPEND)
        BufferedChannel(capacity, onUndeliveredElement)
    else
        ConflatedBufferedChannel(capacity, onBufferOverflow, onUndeliveredElement)
}

使用示例:

suspend fun demonstrateBufferedChannel() {
    // 创建容量为3的缓冲Channel
    val bufferedChannel = Channel<Int>(capacity = 3)
    // 启动发送者协程
    val senderJob = GlobalScope.launch {
        repeat(5) { i ->
            println("[发送者] 发送数字: $i")
            bufferedChannel.send(i)
            println("[发送者] 数字 $i 已发送")
        }
        bufferedChannel.close()
        println("[发送者] Channel已关闭")
    }
    // 启动接收者协程,延迟接收以观察缓冲效果
    val receiverJob = GlobalScope.launch {
        delay(2000) // 延迟2秒开始接收
        println("[接收者] 开始接收数字...")
        for (number in bufferedChannel) {
            println("[接收者] 收到数字: $number")
            delay(800) // 模拟慢速处理
        }
    }
    joinAll(senderJob, receiverJob)
}

可以看到,因为默认的溢出策略是 SUSPEND,所以当缓冲区满了时,发送者会被挂起,直到接收者处理完一个元素,才会继续发送。

BufferOverflow 策略详解

当 Channel 的缓冲区满时,BufferOverflow 参数决定了如何处理新的发送请求:

SUSPEND(默认策略)

suspend fun demonstrateBasicOperations() {
    //容量为 2,溢出策略为SUSPEND
    val channel = Channel<String>(capacity = 2, onBufferOverflow = BufferOverflow.SUSPEND)
    //发送的速度快
    val job1 = GlobalScope.launch {
        repeat(5) {
            channel.send("Message-$it")
            println("[发送者] 发送 Message-$it")
        }
        channel.close()
    }
    val job2 = GlobalScope.launch {
        //除了用 channel.recrive 外,也可以直接 用 for 循环接收数据
        for (message in channel) {
            //接收的速度慢
            delay(1000)
            println("[接收者] 接收到: $message")
        }
    }
    joinAll(job1, job2)
}

DROP_LATEST

suspend fun demonstrateBasicOperations() {
    val channel = Channel<String>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_LATEST)
    val job1 = GlobalScope.launch {
        repeat(5) {
            channel.send("Message-$it")
            println("[发送者] 发送 Message-$it")
        }
        channel.close()
    }
    val job2 = GlobalScope.launch {
        for (message in channel) {
            delay(1000)
            println("[接收者] 接收到: $message")
        }
    }
    joinAll(job1, job2)
}

可以看到,当缓冲区满时,会把新数据丢弃掉,因此,接收端只接收到了旧数据。

DROP_OLDEST

suspend fun demonstrateBasicOperations() {
    val channel = Channel<String>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST)
    val job1 = GlobalScope.launch {
        repeat(5) {
            channel.send("Message-$it")
            println("[发送者] 发送 Message-$it")
        }
        channel.close()
    }
    val job2 = GlobalScope.launch {
        for (message in channel) {
            delay(1000)
            println("[接收者] 接收到: $message")
        }
    }
    joinAll(job1, job2)
}

需要注意的是,当缓冲区满了之后,1 和 2 被丢弃了,3 和 4 被放进去了。从这里可以看出,丢弃数据时,并不是把最早的旧数据丢掉,这里跟内部的实现有关。

onUndeliveredElement 回调

当元素无法送达时(如 Channel 被取消或关闭),会调用此回调函数

suspend fun demonstrateBasicOperations() {
    val channel = Channel<String>(capacity = 2, onBufferOverflow = BufferOverflow.DROP_OLDEST) {
        println("[Channel] 缓冲区已满,无法放到缓冲区,值:${it}")
    }
    // 演示基本的send和receive操作
    val job1 = GlobalScope.launch {
        repeat(5) {
            channel.send("Message-$it")
            println("[发送者] 发送 Message-$it")
        }
        channel.close()
    }
    val job2 = GlobalScope.launch {
        for (message in channel) {
            delay(1000)
            println("[接收者] 接收到: $message")
        }
    }
    joinAll(job1, job2)
}

Channel 操作方式

Channel 提供了两种操作方式:阻塞操作和非阻塞操作。

阻塞操作(send/receive)

send()receive() 方法都是挂起方法,它们会阻塞当前协程,直到完成操作。

非阻塞操作(trySend/tryReceive)

trySend()tryReceive() 是 Channel 提供的非阻塞操作 API。与阻塞版本不同,这些方法会立即返回结果,不会挂起当前协程,也不会抛出异常。

操作对比

操作类型阻塞版本非阻塞版本行为差异
发送send()trySend()send() 会挂起直到有空间;trySend() 立即返回结果
接收receive()tryReceive()receive() 会挂起直到有数据;tryReceive() 立即返回结果

返回值类型

ChannelResult 是一个密封类,通过密封类中的成员 isSuccessgetOrNull() 可以判断操作是否成功。

大部分场景下,send / receive + 合理的 Channel 配置就能解决问题,trySend/tryReceive 更多的是想达到如下效果:

runBlocking {
    val channel = Channel<Int>(2)
    val sendJob = launch {
        repeat(5) {
            delay(100)
            val sendResult = channel.trySend(it)
            sendResult.onSuccess {
                println("发送成功")
            }.onFailure {
                println("发送失败")
            }.onClosed {
                println("通道已关闭")
            }
        }
    }
    val receiveJob = launch {
        for (i in channel) {
            delay(300)
            println("接收到数据:${i}")
        }
    }
    joinAll(sendJob, receiveJob)
}

Channel 状态管理

Channel 在其生命周期中会经历以下几个关键状态:

API

Close(关闭操作)

示例:

    suspend fun demonstrateChannelClose() {
    val channel = Channel<String>(1)
    val producer = GlobalScope.launch {
        try {
            for (i in 1..5) {
                val message = "Message $i"
                println("准备发送: $message")
                channel.send(message)
                println("成功发送: $message")
                delay(100)
            }
        } catch (e: ClosedSendChannelException) {
            println("生产者: Channel已关闭,无法发送数据 - ${e.message}")
        }
    }
    val consumer = GlobalScope.launch {
        try {
            for (message in channel) {
                println("接收到: $message")
                delay(200)
            }
            println("消费者: Channel已关闭,退出接收循环")
        } catch (e: Exception) {
            println("消费者异常: ${e.message}")
        }
    }
    delay(300) // 模拟让一些数据能够被接收到
    // 检查Channel状态
    println("关闭前状态:")
    println("  isClosedForSend: ${channel.isClosedForSend}")
    println("  isClosedForReceive: ${channel.isClosedForReceive}")
    // 关闭Channel
    println("\n正在关闭Channel...")
    channel.close()
    // 检查关闭后的状态
    println("关闭后状态:")
    println("  isClosedForSend: ${channel.isClosedForSend}")
    println("  isClosedForReceive: ${channel.isClosedForReceive}")
    // 等待协程完成
    producer.join()
    consumer.join()
    println("最终状态:")
    println("  isClosedForSend: ${channel.isClosedForSend}")
    println("  isClosedForReceive: ${channel.isClosedForReceive}")
}

Cancel(取消操作)

cancel() 方法用于强制取消 Channel,它会:

suspend fun demonstrateChannelCancel() {
    val channel = Channel<String>(capacity = 5) {
        println("消息未被接收:${it}")
    }
    val producer = GlobalScope.launch {
        try {
            for (i in 1..8) {
                val message = "Message $i"
                println("尝试发送: $message")
                channel.send(message)
                println("成功发送: $message")
                delay(100)
            }
        } catch (e: CancellationException) {
            println("生产者: Channel被取消 - ${e.message}")
        }
    }
    val consumer = GlobalScope.launch {
        try {
            for (message in channel) {
                println("接收到: $message")
                delay(300)
            }
        } catch (e: CancellationException) {
            println("消费者: 协程被取消 - ${e.message}")
        }
    }
    delay(400) // 让一些操作执行
    println("\n取消前状态:")
    println("  isClosedForSend: ${channel.isClosedForSend}")
    println("  isClosedForReceive: ${channel.isClosedForReceive}")
    // 取消Channel
    println("\n正在取消Channel...")
    channel.cancel(CancellationException("主动取消Channel"))
    println("取消后状态:")
    println("  isClosedForSend: ${channel.isClosedForSend}")
    println("  isClosedForReceive: ${channel.isClosedForReceive}")
    // 等待协程完成
    producer.join()
    consumer.join()
}

Channel 异常处理

在使用 Channel 的过程中,会遇到各种异常情况。主要包括以下几种类型:

ClosedSendChannelException

触发条件:

示例:

suspend fun demonstrateClosedSendException() {
    val channel = Channel<String>()
    // 关闭 Channel
    channel.close()
    try {
        // 尝试在已关闭的 Channel 上发送数据
        channel.send("This will throw exception")
    } catch (e: ClosedSendChannelException) {
        println("捕获异常: ${e.message}")
        println("异常类型: ${e::class.simpleName}")
    }
}

ClosedReceiveChannelException

触发条件:

示例:

suspend fun demonstrateClosedReceiveException() {
    val channel = Channel<String>()
    // 关闭 Channel
    channel.close()
    try {
        // 尝试从已关闭且空的 Channel 接收数据
        val message = channel.receive()
        println("收到消息: $message")
    } catch (e: ClosedReceiveChannelException) {
        println("捕获异常: ${e.message}")
        println("异常类型: ${e::class.simpleName}")
    }
}

CancellationException

触发条件:

示例:

suspend fun demonstrateCancellationException() {
    val channel = Channel<String>()
    val job = GlobalScope.launch {
        try {
            // 这个操作会被取消
            channel.send("This will be cancelled")
        } catch (e: CancellationException) {
            println("发送操作被取消: ${e.message}")
            throw e // 重新抛出 CancellationException
        }
    }
    delay(100)
    // 取消 Channel
    channel.cancel(CancellationException("手动取消 Channel"))
    try {
        job.join()
    } catch (e: CancellationException) {
        println("协程被取消: ${e.message}")
    }
}

异常与状态关系

Channel 状态send() 行为receive() 行为trySend() 行为tryReceive() 行为
活跃状态正常发送或挂起正常接收或挂起返回成功/失败结果返回成功/失败结果
发送端关闭抛出 ClosedSendChannelException正常接收缓冲区数据返回失败结果正常返回结果
接收端关闭抛出 ClosedSendChannelException抛出 ClosedReceiveChannelException返回失败结果返回失败结果
已取消抛出 CancellationException抛出 CancellationException返回失败结果返回失败结果

异常处理技巧

使用非阻塞操作避免异常

非阻塞操作不会抛出异常,而是返回结果对象:

suspend fun safeChannelOperations() {
    val channel = Channel<String>()
    // 安全的发送操作
    val sendResult = channel.trySend("Safe message")
    when {
        sendResult.isSuccess -> println("发送成功")
        sendResult.isFailure -> println("发送失败: ${sendResult.exceptionOrNull()}")
        sendResult.isClosed -> println("Channel 已关闭")
    }
    // 安全的接收操作
    val receiveResult = channel.tryReceive()
    when {
        receiveResult.isSuccess -> println("接收到: ${receiveResult.getOrNull()}")
        receiveResult.isFailure -> println("接收失败: ${receiveResult.exceptionOrNull()}")
        receiveResult.isClosed -> println("Channel 已关闭")
    }
}

健壮的异常处理

suspend fun robustChannelUsage() {
    val channel = Channel<String>()
    val producer = GlobalScope.launch {
        try {
            repeat(5) { i ->
                if (channel.isClosedForSend) {
                    println("Channel 已关闭,停止发送")
                    break
                }
                channel.send("Message $i")
                delay(100)
            }
        } catch (e: ClosedSendChannelException) {
            println("生产者: Channel 已关闭")
        } catch (e: CancellationException) {
            println("生产者: 操作被取消")
            throw e // 重新抛出取消异常
        } finally {
            println("生产者: 清理资源")
        }
    }
    val consumer = GlobalScope.launch {
        try {
            while (!channel.isClosedForReceive) {
                try {
                    val message = channel.receive()
                    println("消费者: 收到 $message")
                } catch (e: ClosedReceiveChannelException) {
                    println("消费者: Channel 已关闭且无更多数据")
                    break
                }
                delay(200)
            }
        } catch (e: CancellationException) {
            println("消费者: 操作被取消")
            throw e
        } finally {
            println("消费者: 清理资源")
        }
    }
    delay(1000)
    channel.close()
    joinAll(producer, consumer)
}

总结

Channel 关键概念对比

特性RENDEZVOUSCONFLATEDBUFFEREDUNLIMITED自定义容量
容量0164Int.MAX_VALUE指定值
缓冲行为无缓冲,同步只保留最新值有限缓冲无限缓冲有限缓冲
发送阻塞缓冲满时缓冲满时
适用场景严格同步状态更新一般异步高吞吐量批量处理
内存风险中等可控

溢出策略对比

策略行为性能特点适用场景
SUSPEND挂起发送操作提供背压控制确保数据完整性
DROP_OLDEST丢弃旧元素发送不阻塞实时数据流
DROP_LATEST丢弃新元素发送不阻塞保护历史数据

操作方式

操作类型阻塞版本非阻塞版本异常处理返回值
发送send()trySend()抛出异常ChannelResult<Unit>
接收receive()tryReceive()抛出异常ChannelResult<T>
特点会挂起协程立即返回需要 try-catch通过结果对象判断

Channel 状态生命周期

状态描述send()receive()检查方法
活跃正常工作状态✅ 正常✅ 正常-
发送关闭调用 close() 后❌ 异常✅ 可接收缓冲区数据isClosedForSend
接收关闭缓冲区清空后❌ 异常❌ 异常isClosedForReceive
已取消调用 cancel() 后❌ 异常❌ 异常-

总体来说,Channel 是一种非常强大的协程通信机制,它可以帮助我们在协程之间进行安全、高效的通信。在使用 Channel时,我们需要注意异常处理、缓冲区容量、溢出策略等问题。

感谢阅读,如果对你有帮助请三连(点赞、收藏、加关注)支持。有任何疑问或建议,欢迎在评论区留言讨论。如需转载,请注明出处:喻志强的博客

到此这篇关于Kotlin 协程之Channel的概念和基本使用详解的文章就介绍到这了,更多相关Kotlin 协程Channel使用内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

您可能感兴趣的文章:
阅读全文