성장기록지
Kotlin Data Stream (Sequence, Hot, Cold Stream) 본문
Data Stream
- 데이터의 순차적인 흐름을 의미
- 동기적으로 동작하는 data Stream의 대표적 예시로 Sequence가 있다.
- 비동기적으로 동작하는 data Stream의 대표적 예시로 Flow가 있다.
- Flow는 다음 글에 자세히 다루겠다.
Sequence(List와 비교)
- List는 모든 값을 한번에 생성하고, 반환한다.
- 하지만 Sequence는 순차적으로 데이터를 처리 후 반환한다
이렇게 간단하게 설명한다고 이해가 쉽지는 않으니 조금 더 자세히 설명해보겠다.
Sequence의 목적
- sequence의 주요 목적은 컬렉션 각 요소를 지연 처리하는 것이다.
- 주로 하나의 스레드에서 순차적으로 요소를 처리하는데 초점을 맞추고 있다.
- 순차적인 처리에서 병렬 처리는 결함 요소가 될 수 있기에 효율적인 단일 스레드 처리를 목적으로 한다.
- 따라서 sequence는 element-by-element order로 연산을 처리한다.
element-by-element order vs step-by-step order
- element-by-elemnt order는 요소 각각의 처리를 완료하고 다음 단계로 넘어가는 것을 의미한다.
- List와 같은 Iterable은 step-by-step order로 모든 요소에 단계에 처리하는 연산을 적용한 뒤 다음 단계로 넘어간다.
- 코드를 예시로 한번 보겠다.
// iterable
listOf(1, 2, 3, 4)
.filter { println("[ITEM]: $it"); it % 2 == 0 }
.map { println("[EVEN ITEM]: $it"); it * 11 }
.forEach { println("[MAPPED ITEM]: $it") }
// sequence
listOf(1, 2, 3, 4).asSequence()
.filter { println("[ITEM]: $it"); it % 2 == 0 }
.map { println("[EVEN ITEM]: $it"); it * 11 }
.forEach { println("[MAPPED ITEM]: $it") }
로직 상으로 동일하지만, 결과를 확인하면 차이가 있음을 알 수 있다.
iterable (List)의 경우 모든 아이템이 각 스텝을 실행한 뒤 다음 스텝으로 넘어갔지만,
sequence의 경우 아이템이 스텝의 조건을 충족할 때까지 수행한 뒤 다음 아이템 실행으로 넘어가는 것을 확인할 수 있다.
이를 도식화하여 보면 다음과 같다.
sequence는 아이템별로 실행되기에 short circuit을 통해 연산 성능을 높일 수 있는 것이다!
데이터 스트림의 생산자와 소비자
- 데이터 스트림에는 생산자와 소비자가 있다.
- 생산자는 데이터를 방출하여 소비자에게 지속적으로 데이터를 전달한다.
- sequence에서는 forEach와 같은 최종연산자가 소비자라고 볼 수 있다.
데이터 스트림의 Hot, Cold Stream
- Hot Stream은 데이터 소비와 무관하게 작업을 수행한다.
- 구독하는 소비자가 있든 없든 데이터를 생산한다.
- 따라서 데이터 요청 시 최신 데이터를 수신할 수 있다.
- 반면 Cold Stream은 소비자가 구독할 때 데이터를 생산하고, 각각 독립적인 데이터를 제공한다.
둘의 특징을 예시를 들어보며 이해해 보도록 하겠다.
Cold Stream
cold Stream은 아래와 같은 3가지 특징이 있다.
- 데이터가 내부에서 생성된다.
- 소비자가 소비를 시작할 때 데이터를 생산한다.
- Cold Stream은 하나의 생산자에 하나의 소비자만 존재한다. (UniCast)
Cold Stream인 Flow를 통해 하나씩 알아보겠다.
데이터가 내부에서 생성된다.
flow builder인 flow, flowOf, asFlow로 데이터가 내부에서 생성되는지 확인해보았다.
아래처럼 전부 내부에서 생성되는 것을 확인할 수 있었다.
Cold Stream은 소비자가 소비를 시작할 때 데이터를 생산한다.
- flow는 소비를 시작하는 함수인 종단연산자(collect, fold, reduce, first등)가 호출되지 않으면 데이터를 생산하지 않는다.
- 물론 중간연산자(map, onEach, filter 등)도 종단연산자가 호출되어야 실행된다.
- 위에 설명한 Sequence도 해당된다.
val coldStream = flow<Int> {
emit(1)
emit(2)
}
// 이 시점에 Cold Stream의 데이터가 생산된다.
coldStream.collect {
println(it)
}
Cold Stream은 하나의 생산자에 하나의 소비자만 존재한다. (UniCast)
val coldStream = flow<Int> {
emit(1)
emit(2)
}
coldStream.collect {
println(it) // 1 2 수신
}
coldStream.collect {
println(it) // 1 2 수신
}
- flow를 여러 곳에서 collect할 수 있다.
- 하지만 collect를 할때마다 flow의 block이 새롭게 실행되며 이것은 이전 구독과는 독립적이다.
Hot Stream
Hot Stream의 특징은 다음과 같다.
- 데이터가 외부에서 생성된다.
- 소비자가 소비를 신경쓰지 않고 데이터를 생산한다.
- 하나의 생산자에 다수의 소비자가 존재한다.
Cold Stream인 StateFlow를 통해 하나씩 알아보겠다.
데이터가 외부에서 생성된다.
stateIn은 flow를 StateFlow로 만들어주는 역할을 합니다.
StateFlow는 이처럼 내부에서 미리 데이터를 만드는 게 아니라
flow와 같이 외부의 데이터를 전달 받아 관리해준다.
val stateCount = flow {
for (i in 1 ..100) {
delay(1000L)
emit(i)
}
}.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000L),
initialValue = 99
)
아래는 Viewmodel의 UiState 예시인데, value를 통해 외부에서 데이터를 넣어주기도 한다.
private val _boardLocalUiState = MutableStateFlow<BoardLocalUiState<List<Board.Item>>>(BoardLocalUiState.Loading)
val boardLocalUiState = _boardLocalUiState.asStateFlow()
fun requestBoardLocalItem() = viewModelScope.launch {
_boardLocalUiState.value = BoardLocalUiState.Loading // 1. 상태를 "로딩"으로 설정
runCatching {
requestBoardItemsFind() // 2. 데이터를 요청
}.onSuccess { items ->
_boardLocalUiState.value = items?.let {
BoardLocalUiState.Success(it) // 3. 성공시 데이터를 담아 Success 상태로 업데이트
} ?: BoardLocalUiState.Error("items is Null or Empty") // 4. Null or Empty 처리
}.onFailure {
_boardLocalUiState.value = BoardLocalUiState.Error("requestBoardItemsFind Fail") // 5. 실패시 Error 상태로 업데이트
}
}
소비자의 소비를 신경쓰지않고 생산한다.
이 점은 hot Stream인 데이터 스트림마다 조금씩 다르지만, 우선 StateFlow로 설명하도록 하겠다.
StateFlow는 소비자가 없을 때에도 생산하는것은 아니고,
소비자가 구독을 시작할 때 값을 만들기 시작하고 다른 소비자가 구독할 때 데이터가 계속 발행되고 있기 때문에,
A, B 소비자가 구독한 시간 차이만큼 옛날 데이터를 받지 못하는것이다.
(SharedFlow는 구독하지 않아도 생산한다.)
Viewmodel에 다음과 같은 StateFlow가 있다고 하자 .
val stateCount = flow {
for (i in 1 ..100) {
delay(1000L)
emit(i)
}
}.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000L),
initialValue = 99
)
그다음 fragment에서 다음과 같이 5.5초 대기 후 collect한다면,
구독을 시작한 시점부터 1부터 출력이 된다.
CoroutineScope(Dispatchers.IO).launch {
delay(5500L)
boardViewModel.stateCount.collect {
Log.d("statecount1", it.toString())
}
}
CoroutineScope(Dispatchers.IO).launch {
delay(5500L)
boardViewModel.stateCount.collect {
Log.d("statecount2", it.toString())
}
}
flow의 상세 설명과 마찬가지로 다음 글을 통해서 다른 hot stream들과 비교해보도록 하겠다.
하나의 생산자에 다수의 소비자가 구독할 수 있다.
위의 fragment 코드에서 아래 코루틴스코프의 delay만을 제거해보겠다.
(viewmodel 코드는 같음)
CoroutineScope(Dispatchers.IO).launch {
delay(5500L)
boardViewModel.stateCount.collect {
Log.d("statecount1", it.toString())
}
}
CoroutineScope(Dispatchers.IO).launch {
boardViewModel.stateCount.collect {
Log.d("statecount2", it.toString())
}
}
이대로 실행을 시킨다면 statecount1의 경우 초기값인 99부터 , 1 , 2, 3, ~100의 값을 다 받을 수 있지만
5.5초 대기를 한 statecount1의 경우 5~100의 값을 받을 수 있습니다.
즉 stateFlow는 어떤 소비자든 구독을 시작하면 데이터를 발행하고,
flow 와다르게 데이터를 처음부터 만드는 것이 아닌 최신 데이터를 방출한다.
따라서 각기 다른 코루틴에 존재하는 수신자가 하나의 데이터 stream을 소비하고 있는것이고,
하나의 생산자에 다수의 소비자가 구독할 수 있다고 할 수 있다.
참고 자료
https://woo-chang.tistory.com/85
https://medium.com/@apfhdznzl/flow%EC%99%80-channel-cold-stream%EA%B3%BC-hot-stream-c42c64cf4996
https://www.youtube.com/watch?v=NCLmN26zki0
https://yang-droid.tistory.com/59
'안드로이드 > 안드로이드 지식' 카테고리의 다른 글
Kotlin Flow란? (0) | 2025.01.24 |
---|---|
Compose Structure 정리하기 (compiler, Runtime, UI) (0) | 2025.01.22 |
Compose Stability 자세히 알아보기! (0) | 2025.01.21 |
Compose Stability란? (0) | 2025.01.20 |
안드로이드 MVVM 아키텍쳐란? (0) | 2025.01.13 |