import com.basebeta.utility.logging.kprint import kotlinx.atomicfu.atomic import kotlinx.coroutines.* import kotlinx.coroutines.channels.BroadcastChannel import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.* /** * Only can use with coroutines 1.3.6-native-mt. See alternative implementation at the bottom for 1.3.5 and under. * * More idiomatic implementation that can land once next native-mt version is published to maven * Relevant fix is here https://github.com/Kotlin/kotlinx.coroutines/commit/87b36106f8b0c649cd435dd02143d9899ed5c75c * See last comment in https://github.com/Kotlin/kotlinx.coroutines/issues/1831 for more on the issue */ class StateMachine( val scope: CoroutineScope, private val initialState: T, private val sideEffects: List<(Flow, () -> T) -> Flow>, private val reducer: suspend (accumulator: T, value: R) -> T ) { private val _viewState: ConflatedBroadcastChannel = ConflatedBroadcastChannel(initialState) val viewState = _viewState as Flow private val inputActions: Channel = Channel() init { scope.launch { val lastState = atomic(initialState) //use atomicfu for atomics val multicaster = inputActions.multicast(scope) val flowList = sideEffects.map { sideEffect -> sideEffect(multicaster.asFlow(), { lastState.value }) }.run { toMutableList().apply { add(multicaster.asFlow()) } } flowList.merge().onEach { kprint("result $it") } .onCompletion { inputActions.cancel() } .scan(lastState.value, reducer) .distinctUntilChanged() .collect { outputState -> lastState.value = outputState _viewState.send(outputState) } } } fun dispatchAction(action: R) = scope.launch { kprint("Received input action: $action") inputActions.send(action) } } fun Channel.multicast(scope: CoroutineScope): BroadcastChannel { val channel = this return scope.broadcast { for (x in channel) { send(x) } }.also { it.invokeOnClose { channel.cancel() } } } /* * Stable implementation for coroutines native-mt-1.3.5 and under */ class StateMachine( val scope: CoroutineScope, private val initialState: T, private val sideEffects: List<(Flow, () -> T) -> Flow>, private val reducer: suspend (accumulator: T, value: R) -> T ) { val _viewState: ConflatedBroadcastChannel = ConflatedBroadcastChannel(initialState) val viewState: Flow = _viewState as Flow private var isInitialized = atomic(false) private val inputActions: BroadcastChannel = BroadcastChannel(Channel.BUFFERED) init { scope.launch { val lastState = StateWrapper(initialState) val flowList = sideEffects.map { sideEffect -> sideEffect(inputActions.asFlow(), { lastState.state }) }.run { toMutableList().apply { add(inputActions.asFlow()) } } flowList.onBindMerge { isInitialized.value = true } .onEach { kprint("result: $it") } .onCompletion { inputActions.cancel() } .scan(lastState.state, reducer) .distinctUntilChanged() .collect { outputState -> kprint("state emitted: $outputState") lastState.state = outputState viewState.send(outputState) } } } fun dispatchAction(action: R) = scope.launch { kprint("Received input action $action") while (!isInitialized.value) { yield() } inputActions.send(action) } } /** * Ensures that every down stream flow is bound to the upstream */ fun List>.onBindMerge(onBind: () -> Unit): Flow { var boundFlows = atomic(0) return channelFlow { forEach { flow -> launch { flow.onStart { if (boundFlows.incrementAndGet() == size) onBind() }.collect { send(it) } } } } }