개요
지금 watchOS를 통해 healthCare Connection API의 데이터를 callback을 통해 비동기로 받아오는 샘플 코드가 있었다. callbackFlow가 뭐지(?) 라고 요리조리 뜯어보는 과정에서 kotlin의 channel이라는 개념이 나왔고, MVI 아키텍처를 적용하는 과정에서도 SideEffect를 처리할 때 Channel을 썼던 코드를 본 기억이 있다. 그래서 먼저 channel 부터 알아야 할 필요성을 느끼게 되었고, 이렇게 정리하게 되었다.
1. Channel이란?
Channel은 코루틴 간의 데이터 스트림을 공유할 수 있도록 하는 파이프의 역할을 한다. 또한 데이터를 생산하는측과 소비하는 측 간의 데이터 흐름을 관리해 생산자 - 소비자 패턴을 쉽게 구현할 수 있도록 한다.
Channel은 Java의 BlockingQueue와 매우 비슷하다.
BlockingQueue?
자바의 java.util.concurrent 패키지에서 제공하는 인터페이스로, 여러 스레드가 동시에 안전하게 데이터를 주고 받을 수 있게 설계된 큐이다.
어떻게 안전하게 주고받게 하냐?
큐에 데이터를 추가하거나 제거할 때 특정 조건을 만족할 때까지 스레드를 대기시키는 기능을 제공한다.
주요 메서드
put(E e) -> 큐에 데이터가 추가될 공간이 생길 때까지 대기한 후 데이터를 추가한다.
take() -> 큐에 데이터가 존재할때까지 대기한 후 데이터를 꺼내 반환한다.
Channel과의 차이점이 있다면, put(), take()를 설명한 것처럼 BlockingQueue는 스레드가 차단 상태로 바뀌지만, Channel은 Suspend로 일시 중지 상태로 비동기적으로 작동한다.
Channel 기본 사용법
send(), receive() = for 반복문, close()
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
fun main() = runBlocking {
// 채널 생성
val channel = Channel<Int>()
// 데이터 생산자
launch {
for (x in 1..5) {
println("Sent $x")
channel.send(x) // 데이터를 채널에 전송
}
channel.close() // 더 이상 데이터가 없음을 알림
}
// 데이터 소비자
launch {
for (y in channel) { // 채널이 닫힐 때까지 데이터를 받음
println("Received $y")
}
}
// Sent: 1
// Received 1
// Received 2
// Sent: 2
// Sent: 3
// Received 3
// Sent: 4
// Received 4
// Sent: 5
// Received 5
}
send() -> 데이터를 채널에 전송
close() -> 더 이상 보낼 데이터가 없을 때 채널을 닫는다.
receive() -> 데이터를 채널로부터 받는다. 이때 for 반복문을 통해 채널의 데이터 전체를 받아올 수 있다!
이때, send() 및 receive()의 경우 suspend 함수이기 때문에 코루틴 안에서 선언해줘야한다.
produce(), consumeEach()
val channel2 = produce<Int> {
for(i in 1..10){
channel.send(i)
}
}
channel2.consumeEach {
println("Receive $it")
}
채널은 위에 언급했듯이 생산자 - 소비자 패턴으로 더 쉽게 구현할 수 있도록 하는 확장함수가 있다. 바로 produce와 consumeEach이다.
이 뿐만 아니라 consumeAsFlow()를 통해 Flow 형식으로 받을 수 있는데, Channel이 Hot Stream이기 때문에 소비자를 기다리지 않고 한 번에 데이터를 방출한다. 그렇기에 Flow의 장점인 filter, map과 같이 가공해야 할때 사용하면 유용하다!
2. Channel의 특징 (Channel을 왜 쓸까?)
1) 단일 소비자 패턴
-> Channel은 기본적으로 단일 소비자가 데이터를 처리하도록 설계되어 있다. 그래서 이벤트를 단 하나의 소비자가 처리해야 하는 경우에 적합하다.
* 참고
이 부분이 SharedFlow와의 차이라고 생각한다. SharedFlow의 경우에는 이벤트가 여러 소비자에게 동시에 전달되는 브로드캐스트 상황에 적합하다.
2) 백프레셔 처리
-> 백프레셔 (= 생산자가 소비자보다 빠르게 데이터를 발행하는 상황)에 있어 Channel은 여러 버퍼링 전략을 통해 해결할 수 있도록 한다.
3. 채널의 타입
Channel은 4가지 타입을 바탕으로 위에 언급한 백프레셔를 처리하는(= 버퍼링하는) 전략을 제공한다. 하나씩 확인해보자
1) Rendezvous
-> 버퍼가 없는 채널로 소비자가 준비될 때까지 생산자가 보내는 것을 중단했다가, 준비되면 그때 보낸다.
fun main() = runBlocking {
val channel = Channel<Int>(Channel.RENDEZVOUS)
// 생산자 코루틴
launch {
for (x in 1..3) {
delay(50)
println("생산자 : $x")
channel.send(x) // 소비자가 준비될 때까지 대기
}
}
// 소비자 코루틴
launch {
for (y in 1..3) {
delay(100)
val receive = channel.receive()
println("소비자 : $receive")
}
}
return@runBlocking
}
// 생산자 : 1
// 소비자 : 1
// 생산자 : 2
// 소비자 : 2
// 생산자 : 3
// 소비자 : 3
2) BUFFERED
-> 버퍼를 통해 소비자가 데이터를 처리하기 전에 생산자는 데이터를 추가로 보낼 수가 있다.
fun main() = runBlocking {
val channel = Channel<Int>(Channel.BUFFERED)
// 생산자 코루틴
launch {
for (x in 1..5) {
delay(50)
println("생산자 : $x")
channel.send(x) // 소비자가 준비될 때까지 대기
}
}
// 소비자 코루틴
launch {
for (y in 1..5) {
delay(100)
val receive = channel.receive()
println("소비자 : $receive")
}
}
return@runBlocking
}
// 생산자 : 1
// 소비자 : 1
// 생산자 : 2
// 생산자 : 3
// 소비자 : 2
// 생산자 : 4
// 생산자 : 5
// 소비자 : 3
// 소비자 : 4
// 소비자 : 5
여기서 val channel = Channel<Int>(2) 와 같이 적게 되면, 버퍼의 크기를 정할 수 있다.
3) UNLIMITED
-> BUFFER와 동일한 기능을 가지고 있지만, 차이점은 버퍼 용량에 제한이 없다는 것이다. 그래서 send()할 때 일시 중단할 일이 없지만, receive()시 버퍼가 비어있다면 일시 중단된다.
fun main() = runBlocking {
val channel = Channel<Int>(Channel.UNLIMITED)
// 생산자 코루틴
launch {
for (x in 1..5) {
delay(20)
println("생산자 : $x")
channel.send(x) // 소비자가 준비될 때까지 대기
}
}
// 소비자 코루틴
launch {
for (y in 1..5) {
delay(100)
val receive = channel.receive()
println("소비자 : $receive")
}
}
return@runBlocking
}
// 생산자 : 1
// 생산자 : 2
// 생산자 : 3
// 생산자 : 4
// 소비자 : 1
// 생산자 : 5
// 소비자 : 2
// 소비자 : 3
// 소비자 : 4
// 소비자 : 5
4) CONFLATED
-> 버퍼가 1인 Channel을 유지하고, 최신의 데이터만 유지된다! 즉 소비자가 데이터를 받기 전에 생산자가 데이터를 보낸다면, 이전에 보냈던 데이터를 덮어써 결국 최신의 데이터만 소비자는 받게된다.
package com.example.toy.flow
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
val channel = Channel<Int>(Channel.CONFLATED)
// 생산자 코루틴
launch {
for (x in 1..5) {
delay(20)
println("생산자 : $x")
channel.send(x) // 소비자가 준비될 때까지 대기
}
}
// 소비자 코루틴
launch {
for (y in 1..5) {
delay(100)
val receive = channel.receive()
println("소비자 : $receive")
}
}
return@runBlocking
}
// 생산자 : 1
// 생산자 : 2
// 생산자 : 3
// 생산자 : 4
// 소비자 : 4
// 생산자 : 5
// 소비자 : 5
4. BufferOverflow 정책
위에서 채널에 들어가는 매개변수로 capacity가 있었고, 그 아래에 onBufferOverflow가 있었다.
얘는 Channel의 버퍼가 꽉 찼을 때 데이터를 어떻게 처리할지 결정하는 것! 총 3가지가 있는데 이름이 직관적이여서 알기가 쉽다.
SUSPEND
-> 버퍼가 가득 찬 경우 생산자가 데이터를 보내는 것을 중지한다! 즉, 소비자가 데이터를 소비할 때까지 생산자는 일시 중단 상태가 된다.
DROP_OLDEST
-> 버퍼가 가득 찬 경우 가장 오래된 데이터를 삭제하고 새로운 데이터를 버퍼에 추가한다.
DROP_LATEST
-> 버퍼가 가득 찬 경우 새로 들어오는 데이터를 삭제한다. 즉, 기존의 버퍼 데이터는 계속 유지하고 새로운 데이터는 무시하는 방식이다.
정리하자면, Channel은 코루틴 간에 데이터를 주고 받기 위해 만들어졌고, 생산자 - 소비자의 패턴! 버퍼라는 것을 통해 백프레셔를 해결하도록 해준다. 그리고 SharedFlow와의 차이는 단일 대상으 한다는 점.
'Language > kotlin' 카테고리의 다른 글
[Kotlin] 접근 제한자(Visibility Modifier) (0) | 2025.02.13 |
---|---|
[Kotlin] Enum 클래스 (0) | 2025.02.05 |
[Kotlin] StateFlow 및 SharedFlow (2) | 2024.10.07 |
[Kotlin] Flow - 소개 및 연산자 (8) | 2024.09.29 |
[Kotlin] 고차 함수와 람다 표현식 (1) | 2024.08.10 |