class SafeZip { companion object { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) fun zip(single1: Single, single2: Single, zipper: (T, U) -> R): Single { val errorHandler = SingleErrorHandler() return Single.zip( single1.toErrorSafeSingle(errorHandler), single2.toErrorSafeSingle(errorHandler), BiFunction { t, u -> zipper(t, u) }) } private fun Single.toErrorSafeSingle(errorHandler: SingleErrorHandler) = ErrorSafeSingle(this, errorHandler) } } private class SingleErrorHandler { var disposed = false fun setDisposed(error: Throwable): Single { disposed = true return Single.error(error) } } private class ErrorSafeSingle( private val delegate: Single, private val errorHandler: SingleErrorHandler ) : Single() { override fun subscribeActual(observer: SingleObserver) = delegate.onErrorResumeNext { errorHandler.setDisposed(it) } .subscribe(ErrorSafeSingleObserver(observer, errorHandler)) } private class ErrorSafeSingleObserver( private val source: SingleObserver, private val errorHandler: SingleErrorHandler ) : SingleObserver { override fun onSuccess(t: T) = source.onSuccess(t) override fun onSubscribe(d: Disposable) = source.onSubscribe(d) override fun onError(e: Throwable) = synchronized(errorHandler.disposed) { if (!errorHandler.disposed) source.onError(e) } }