package com.wizbii.wizbiiandroid.services import rx.Observable import rx.subjects.PublishSubject /** * POC of a reactive data store. */ abstract class ReactiveStore { /** * Local version of the data. */ private var value: T? = null /** * Bus for notifying new versions of the data to all subscribers. */ private var bus = PublishSubject.create() /** * Observable for fetching the remote version of the data. */ protected abstract fun getRemoteData(): Observable /** * Updates the remote data and emits the updated value. */ protected abstract fun updateRemoteData(parameters: Any): Observable /** * Infinite stream of value updates. */ fun values(): Observable { return Observable.create { if (value != null) { it.onNext(value) bus.subscribe(it) } else { bus.subscribe(it) getRemoteData().subscribe( { onNewValue(it) }, { // TODO wrap errors into values without stopping the observable bus.onError(it) bus = PublishSubject.create() } ) } } } private fun onNewValue(it: T) { value = it bus.onNext(it) } fun updateValue(parameters: Any): Observable { return updateRemoteData(parameters).doOnNext { onNewValue(it) } } }