Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Select an option

  • Save thomasnield/55a7169cccd7117641b68d149803a89e to your computer and use it in GitHub Desktop.

Select an option

Save thomasnield/55a7169cccd7117641b68d149803a89e to your computer and use it in GitHub Desktop.
RxJava - Replayed Observable Invalidation
import io.reactivex.Observable
import java.time.LocalTime
import java.util.concurrent.TimeUnit
fun main(args: Array<String>) {
val source = Observable.interval(5, TimeUnit.SECONDS)
.startWith(0)
.map {
Observable.fromCallable { LocalTime.now() }
.replay(1).autoConnect()
}.replay(1)
.autoConnect()
.take(1)
.flatMap { it }
//initiate and cache first value
source.subscribe(::println)
//sleep 3 seconds, cache persists
Thread.sleep(3000)
source.subscribe(::println)
//sleep another 3 seconds, cache clears and new value caches
Thread.sleep(3000)
source.subscribe(::println)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment