Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add MaybeK wrapper for RxJava Maybe #862

Merged
merged 10 commits into from
May 27, 2018
22 changes: 11 additions & 11 deletions infographic/arrow-infographic.txt
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,7 @@
[<typeclasses>Each]<-[<instances>Each Instances|ListKEachInstance|OptionEachInstance|EitherEachInstance|TryEachInstance|SequenceKEachInstance|NonEmptyListEachInstance|MapKEachInstance]
[<typeclasses>Monoid]<-[<instances>Monoid Instances|OptionMonoidInstance|ConstMonoidInstance|Tuple2MonoidInstance|ListKMonoidInstance|SequenceKMonoidInstance|MapKMonoidInstance|SetKMonoidInstance|SortedMapKMonoidInstance|IOMonoidInstance]
[<typeclasses>Semigroup]<-[<instances>Semigroup Instances|ConstSemigroupInstance|OptionSemigroupInstance|SetKSemigroupInstance|NonEmptyListSemigroupInstance|SequenceKSemigroupInstance|ListKSemigroupInstance|MapKSemigroupInstance|SortedMapKSemigroupInstance|IOMonoidInstance|IOSemigroupInstance]
[<typeclasses>Traverse]<-[<instances>Traverse Instances|OptionTraverseInstance|ConstTraverseInstance|TryTraverseInstance|IdTraverseInstance|Tuple2TraverseInstance|EitherTraverseInstance|ListKTraverseInstance|ValidatedTraverseInstance|OptionTTraverseInstance|MapKTraverseInstance|NonEmptyListTraverseInstance|CoproductTraverseInstance|SequenceKTraverseInstance|SortedMapKTraverseInstance|IorTraverseInstance|FlowableKTraverseInstance|ObservableKTraverseInstance]
[<typeclasses>Foldable]<-[<instances>Foldable Instances|IdFoldableInstance|EitherFoldableInstance|Tuple2FoldableInstance|OptionFoldableInstance|TryFoldableInstance|ConstFoldableInstance|SortedMapKFoldableInstance|SequenceKFoldableInstance|NonEmptyListFoldableInstance|CoproductFoldableInstance|IorFoldableInstance|SetKFoldableInstance|OptionTFoldableInstance|ListKFoldableInstance|ValidatedFoldableInstance|MapKFoldableInstance|ObservableKFoldableInstance|FlowableKFoldableInstance]
[<typeclasses>MonadDefer]<-[<instances>MonadDefer Instances|IOMonadDeferInstance|DeferredKMonadDeferInstance|SingleKMonadDeferInstance|FlowableKMonadDeferInstance|ObservableKMonadDeferInstance]
[<typeclasses>Async]<-[<instances>Async Instances|IOAsyncInstance|DeferredKAsyncInstance|FlowableKAsyncInstance|ObservableKAsyncInstance|SingleKAsyncInstance]
[<typeclasses>Effect]<-[<instances>Effect Instances|IOEffectInstance|DeferredKEffectInstance|ObservableKEffectInstance|FlowableKEffectInstance|SingleKEffectInstance]
[<typeclasses>ApplicativeError]<-[<instances>ApplicativeError Instances|EitherApplicativeErrorInstance|TryApplicativeErrorInstance|OptionApplicativeErrorInstance|ValidatedApplicativeErrorInstance|StateTApplicativeErrorInstance|KleisliApplicativeErrorInstance|IOApplicativeErrorInstance|DeferredKApplicativeErrorInstance|ObservableKApplicativeErrorInstance|SingleKApplicativeErrorInstance|FlowableKApplicativeErrorInstance]
[<typeclasses>MonadError]<-[<instances>MonadError Instances|EitherMonadErrorInstance|TryMonadErrorInstance|OptionMonadErrorInstance|KleisliMonadErrorInstance|StateTMonadErrorInstance|IOMonadErrorInstance|DeferredKMonadErrorInstance|SingleKMonadErrorInstance|FlowableKMonadErrorInstance|ObservableKMonadErrorInstance]
[<typeclasses>Comonad]<-[<instances>Comonad Instances|Tuple2ComonadInstance|Function0ComonadInstance|IdComonadInstance|EvalComonadInstance|NonEmptyListComonadInstance|CoproductComonadInstance|CofreeComonadInstance]
[<typeclasses>Functor]<-[<instances>Functor Instances|Function1FunctorInstance|Tuple2FunctorInstance|ConstFunctorInstance|Function0FunctorInstance|EitherFunctorInstance|IdFunctorInstance|OptionFunctorInstance|EvalFunctorInstance|TryFunctorInstance|NonEmptyListFunctorInstance|IorFunctorInstance|CoproductFunctorInstance|KleisliFunctorInstance|SequenceKFunctorInstance|MapKFunctorInstance|StateTFunctorInstance|SortedMapKFunctorInstance|WriterTFunctorInstance|ListKFunctorInstance|OptionTFunctorInstance|ValidatedFunctorInstance|IOFunctorInstance|CofreeFunctorInstance|FreeApplicativeFunctorInstance|CoyonedaFunctorInstance|FreeFunctorInstance|YonedaFunctorInstance|DeferredKFunctorInstance|SingleKFunctorInstance|FlowableKFunctorInstance|ObservableKFunctorInstance]
[<typeclasses>Applicative]<-[<instances>Applicative Instances|Tuple2ApplicativeInstance|Function1ApplicativeInstance|EvalApplicativeInstance|EitherApplicativeInstance|OptionApplicativeInstance|IdApplicativeInstance|TryApplicativeInstance|ConstApplicativeInstance|Function0ApplicativeInstance|IorApplicativeInstance|StateTApplicativeInstance|OptionTApplicativeInstance|ListKApplicativeInstance|WriterTApplicativeInstance|SequenceKApplicativeInstance|NonEmptyListApplicativeInstance|ValidatedApplicativeInstance|KleisliApplicativeInstance|IOApplicativeInstance|FreeApplicativeInstance|FreeApplicativeApplicativeInstance|DeferredKApplicativeInstance|SingleKApplicativeInstance|FlowableKApplicativeInstance|ObservableKApplicativeInstance]
[<typeclasses>Monad]<-[<instances>Monad Instances|TryMonadInstance|IdMonadInstance|OptionMonadInstance|EvalMonadInstance|EitherMonadInstance|Tuple2MonadInstance|Function0MonadInstance|Function1MonadInstance|KleisliMonadInstance|OptionTMonadInstance|WriterTMonadInstance|ListKMonadInstance|StateTMonadInstance|IorMonadInstance|NonEmptyListMonadInstance|SequenceKMonadInstance|IOMonadInstance|FreeMonadInstance|DeferredKMonadInstance|ObservableKMonadInstance|FlowableKMonadInstance|SingleKMonadInstance]
[<typeclasses>MonadState]<-[<instances>MonadState Instances|StateTMonadStateInstance]
[<typeclasses>MonadReader]<-[<instances>MonadReader Instances|KleisliMonadReaderInstance|Function1MonadReaderInstance]
[<typeclasses>MonadCombine]<-[<instances>MonadCombine Instances|StateTMonadCombineInstance|ListKMonadCombineInstance]
Expand All @@ -66,4 +56,14 @@
[<typeclasses>MonadWriter]<-[<instances>MonadWriter Instances|WriterTMonadWriterInstance]
[<typeclasses>Recursive]<-[<instances>Recursive Instances|MuRecursiveInstance|NuRecursiveInstance|FixRecursiveInstance]
[<typeclasses>Birecursive]<-[<instances>Birecursive Instances|FixBirecursiveInstance|NuBirecursiveInstance|MuBirecursiveInstance]
[<typeclasses>Corecursive]<-[<instances>Corecursive Instances|FixCorecursiveInstance|NuCorecursiveInstance|MuCorecursiveInstance]
[<typeclasses>Corecursive]<-[<instances>Corecursive Instances|FixCorecursiveInstance|NuCorecursiveInstance|MuCorecursiveInstance]
[<typeclasses>Traverse]<-[<instances>Traverse Instances|OptionTraverseInstance|ConstTraverseInstance|TryTraverseInstance|IdTraverseInstance|Tuple2TraverseInstance|EitherTraverseInstance|ListKTraverseInstance|ValidatedTraverseInstance|OptionTTraverseInstance|MapKTraverseInstance|NonEmptyListTraverseInstance|CoproductTraverseInstance|SequenceKTraverseInstance|SortedMapKTraverseInstance|IorTraverseInstance|FlowableKTraverseInstance|ObservableKTraverseInstance]
[<typeclasses>Foldable]<-[<instances>Foldable Instances|IdFoldableInstance|EitherFoldableInstance|Tuple2FoldableInstance|OptionFoldableInstance|TryFoldableInstance|ConstFoldableInstance|SortedMapKFoldableInstance|SequenceKFoldableInstance|NonEmptyListFoldableInstance|CoproductFoldableInstance|IorFoldableInstance|SetKFoldableInstance|OptionTFoldableInstance|ListKFoldableInstance|ValidatedFoldableInstance|MapKFoldableInstance|ObservableKFoldableInstance|FlowableKFoldableInstance|MaybeKFoldableInstance]
[<typeclasses>MonadDefer]<-[<instances>MonadDefer Instances|IOMonadDeferInstance|DeferredKMonadDeferInstance|SingleKMonadDeferInstance|FlowableKMonadDeferInstance|ObservableKMonadDeferInstance|MaybeKMonadDeferInstance]
[<typeclasses>Async]<-[<instances>Async Instances|IOAsyncInstance|DeferredKAsyncInstance|FlowableKAsyncInstance|ObservableKAsyncInstance|SingleKAsyncInstance|MaybeKAsyncInstance]
[<typeclasses>Effect]<-[<instances>Effect Instances|IOEffectInstance|DeferredKEffectInstance|ObservableKEffectInstance|FlowableKEffectInstance|SingleKEffectInstance|MaybeKEffectInstance]
[<typeclasses>ApplicativeError]<-[<instances>ApplicativeError Instances|EitherApplicativeErrorInstance|TryApplicativeErrorInstance|OptionApplicativeErrorInstance|ValidatedApplicativeErrorInstance|StateTApplicativeErrorInstance|KleisliApplicativeErrorInstance|IOApplicativeErrorInstance|DeferredKApplicativeErrorInstance|ObservableKApplicativeErrorInstance|SingleKApplicativeErrorInstance|FlowableKApplicativeErrorInstance|MaybeKApplicativeErrorInstance]
[<typeclasses>MonadError]<-[<instances>MonadError Instances|EitherMonadErrorInstance|TryMonadErrorInstance|OptionMonadErrorInstance|KleisliMonadErrorInstance|StateTMonadErrorInstance|IOMonadErrorInstance|DeferredKMonadErrorInstance|SingleKMonadErrorInstance|FlowableKMonadErrorInstance|ObservableKMonadErrorInstance|MaybeKMonadErrorInstance]
[<typeclasses>Functor]<-[<instances>Functor Instances|Function1FunctorInstance|Tuple2FunctorInstance|ConstFunctorInstance|Function0FunctorInstance|EitherFunctorInstance|IdFunctorInstance|OptionFunctorInstance|EvalFunctorInstance|TryFunctorInstance|NonEmptyListFunctorInstance|IorFunctorInstance|CoproductFunctorInstance|KleisliFunctorInstance|SequenceKFunctorInstance|MapKFunctorInstance|StateTFunctorInstance|SortedMapKFunctorInstance|WriterTFunctorInstance|ListKFunctorInstance|OptionTFunctorInstance|ValidatedFunctorInstance|IOFunctorInstance|CofreeFunctorInstance|FreeApplicativeFunctorInstance|CoyonedaFunctorInstance|FreeFunctorInstance|YonedaFunctorInstance|DeferredKFunctorInstance|SingleKFunctorInstance|FlowableKFunctorInstance|ObservableKFunctorInstance|MaybeKFunctorInstance]
[<typeclasses>Applicative]<-[<instances>Applicative Instances|Tuple2ApplicativeInstance|Function1ApplicativeInstance|EvalApplicativeInstance|EitherApplicativeInstance|OptionApplicativeInstance|IdApplicativeInstance|TryApplicativeInstance|ConstApplicativeInstance|Function0ApplicativeInstance|IorApplicativeInstance|StateTApplicativeInstance|OptionTApplicativeInstance|ListKApplicativeInstance|WriterTApplicativeInstance|SequenceKApplicativeInstance|NonEmptyListApplicativeInstance|ValidatedApplicativeInstance|KleisliApplicativeInstance|IOApplicativeInstance|FreeApplicativeInstance|FreeApplicativeApplicativeInstance|DeferredKApplicativeInstance|SingleKApplicativeInstance|FlowableKApplicativeInstance|ObservableKApplicativeInstance|MaybeKApplicativeInstance]
[<typeclasses>Monad]<-[<instances>Monad Instances|TryMonadInstance|IdMonadInstance|OptionMonadInstance|EvalMonadInstance|EitherMonadInstance|Tuple2MonadInstance|Function0MonadInstance|Function1MonadInstance|KleisliMonadInstance|OptionTMonadInstance|WriterTMonadInstance|ListKMonadInstance|StateTMonadInstance|IorMonadInstance|NonEmptyListMonadInstance|SequenceKMonadInstance|IOMonadInstance|FreeMonadInstance|DeferredKMonadInstance|ObservableKMonadInstance|FlowableKMonadInstance|SingleKMonadInstance|MaybeKMonadInstance]
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package arrow.effects

import arrow.core.*
import arrow.effects.typeclasses.Proc
import arrow.higherkind
import io.reactivex.Maybe
import io.reactivex.MaybeEmitter

fun <A> Maybe<A>.k(): MaybeK<A> = MaybeK(this)

fun <A> MaybeKOf<A>.value(): Maybe<A> = this.fix().maybe

@higherkind
data class MaybeK<A>(val maybe: Maybe<A>) : MaybeKOf<A>, MaybeKKindedJ<A> {

fun <B> map(f: (A) -> B): MaybeK<B> =
maybe.map(f).k()

fun <B> ap(fa: MaybeKOf<(A) -> B>): MaybeK<B> =
flatMap { a -> fa.fix().map { ff -> ff(a) } }

fun <B> flatMap(f: (A) -> MaybeKOf<B>): MaybeK<B> =
maybe.flatMap { f(it).fix().maybe }.k()

fun <C> fold(ifEmpty: () -> C, ifSome: (A) -> C): C {
return if (this.maybe.isEmpty.blockingGet()) {
ifEmpty()
} else {
ifSome(this.maybe.blockingGet())
}
}

fun <B> foldLeft(b: B, f: (B, A) -> B): B =
when (maybe) {
maybe.isEmpty -> b
else -> f(b, maybe.blockingGet())
}

fun <B> foldRight(lb: Eval<B>, f: (A, Eval<B>) -> Eval<B>): Eval<B> =
when (maybe) {
maybe.isEmpty -> lb
else -> f(maybe.blockingGet(), lb)
}

fun isEmpty(): Boolean = maybe.isEmpty.blockingGet()

fun nonEmpty(): Boolean = !isEmpty()

fun exists(predicate: Predicate<A>): Boolean = fold({ false }, { a -> predicate(a) })

fun forall(p: Predicate<A>): Boolean = fold({ true }, p)

fun handleErrorWith(function: (Throwable) -> MaybeK<A>): MaybeK<A> =
maybe.onErrorResumeNext { t: Throwable -> function(t).maybe }.k()

fun runAsync(cb: (Either<Throwable, A>) -> MaybeKOf<Unit>): MaybeK<Unit> =
maybe.flatMap { cb(Right(it)).value() }.onErrorResumeNext(io.reactivex.functions.Function { cb(Left(it)).value() }).k()

companion object {
fun <A> just(a: A): MaybeK<A> =
Maybe.just(a).k()

fun <A> raiseError(t: Throwable): MaybeK<A> =
Maybe.error<A>(t).k()

operator fun <A> invoke(fa: () -> A): MaybeK<A> =
defer { just(fa()) }

fun <A> defer(fa: () -> MaybeKOf<A>): MaybeK<A> =
Maybe.defer { fa().value() }.k()

fun <A> async(fa: Proc<A>): MaybeK<A> =
Maybe.create({ emitter: MaybeEmitter<A> ->
fa { either: Either<Throwable, A> ->
either.fold({
emitter.onError(it)
}, {
emitter.onSuccess(it)
})

}
}).k()

tailrec fun <A, B> tailRecM(a: A, f: (A) -> MaybeKOf<Either<A, B>>): MaybeK<B> {
val either = f(a).fix().value().blockingGet()
return when (either) {
is Either.Left -> tailRecM(either.a, f)
is Either.Right -> Maybe.just(either.b).k()
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
package arrow.effects

import arrow.Kind
import arrow.core.Either
import arrow.core.Eval
import arrow.effects.typeclasses.Async
import arrow.effects.typeclasses.Effect
import arrow.effects.typeclasses.MonadDefer
import arrow.effects.typeclasses.Proc
import arrow.instance
import arrow.typeclasses.*

@instance(MaybeK::class)
interface MaybeKFunctorInstance : Functor<ForMaybeK> {
override fun <A, B> Kind<ForMaybeK, A>.map(f: (A) -> B): MaybeK<B> =
fix().map(f)
}

@instance(MaybeK::class)
interface MaybeKApplicativeInstance : Applicative<ForMaybeK> {
override fun <A, B> MaybeKOf<A>.ap(ff: MaybeKOf<(A) -> B>): MaybeK<B> =
fix().ap(ff)

override fun <A, B> Kind<ForMaybeK, A>.map(f: (A) -> B): MaybeK<B> =
fix().map(f)

override fun <A> just(a: A): MaybeK<A> =
MaybeK.just(a)
}

@instance(MaybeK::class)
interface MaybeKMonadInstance : Monad<ForMaybeK> {
override fun <A, B> MaybeKOf<A>.ap(ff: MaybeKOf<(A) -> B>): MaybeK<B> =
fix().ap(ff)

override fun <A, B> MaybeKOf<A>.flatMap(f: (A) -> Kind<ForMaybeK, B>): MaybeK<B> =
fix().flatMap(f)

override fun <A, B> MaybeKOf<A>.map(f: (A) -> B): MaybeK<B> =
fix().map(f)

override fun <A, B> tailRecM(a: A, f: kotlin.Function1<A, MaybeKOf<arrow.core.Either<A, B>>>): MaybeK<B> =
MaybeK.tailRecM(a, f)

override fun <A> just(a: A): MaybeK<A> =
MaybeK.just(a)
}

@instance(MaybeK::class)
interface MaybeKFoldableInstance : Foldable<ForMaybeK> {

override fun <A, B> Kind<ForMaybeK, A>.foldLeft(b: B, f: (B, A) -> B): B =
fix().foldLeft(b, f)

override fun <A, B> Kind<ForMaybeK, A>.foldRight(lb: Eval<B>, f: (A, Eval<B>) -> Eval<B>): Eval<B> =
fix().foldRight(lb, f)

override fun <A> Kind<ForMaybeK, A>.isEmpty(): Boolean =
fix().isEmpty()

override fun <A> Kind<ForMaybeK, A>.exists(p: (A) -> Boolean): Boolean =
fix().exists(p)

override fun <A> MaybeKOf<A>.forAll(p: (A) -> Boolean): Boolean =
fix().forall(p)

override fun <A> Kind<ForMaybeK, A>.nonEmpty(): Boolean =
fix().nonEmpty()
}

@instance(MaybeK::class)
interface MaybeKApplicativeErrorInstance :
MaybeKApplicativeInstance,
ApplicativeError<ForMaybeK, Throwable> {
override fun <A> raiseError(e: Throwable): MaybeK<A> =
MaybeK.raiseError(e)

override fun <A> MaybeKOf<A>.handleErrorWith(f: (Throwable) -> MaybeKOf<A>): MaybeK<A> =
fix().handleErrorWith { f(it).fix() }
}

@instance(MaybeK::class)
interface MaybeKMonadErrorInstance :
MaybeKMonadInstance,
MonadError<ForMaybeK, Throwable> {
override fun <A> raiseError(e: Throwable): MaybeK<A> =
MaybeK.raiseError(e)

override fun <A> MaybeKOf<A>.handleErrorWith(f: (Throwable) -> MaybeKOf<A>): MaybeK<A> =
fix().handleErrorWith { f(it).fix() }
}

@instance(MaybeK::class)
interface MaybeKMonadDeferInstance :
MaybeKMonadErrorInstance,
MonadDefer<ForMaybeK> {
override fun <A> defer(fa: () -> MaybeKOf<A>): MaybeK<A> =
MaybeK.defer(fa)
}

@instance(MaybeK::class)
interface MaybeKAsyncInstance :
MaybeKMonadDeferInstance,
Async<ForMaybeK> {
override fun <A> async(fa: Proc<A>): MaybeK<A> =
MaybeK.async(fa)
}

@instance(MaybeK::class)
interface MaybeKEffectInstance :
MaybeKAsyncInstance,
Effect<ForMaybeK> {
override fun <A> MaybeKOf<A>.runAsync(cb: (Either<Throwable, A>) -> MaybeKOf<Unit>): MaybeK<Unit> =
fix().runAsync(cb)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package arrow.effects

import arrow.test.UnitSpec
import arrow.test.laws.*
import arrow.typeclasses.Eq
import arrow.typeclasses.bindingCatch
import io.kotlintest.KTestJUnitRunner
import io.kotlintest.matchers.shouldNotBe
import io.reactivex.Maybe
import io.reactivex.observers.TestObserver
import io.reactivex.schedulers.Schedulers
import org.junit.runner.RunWith
import java.util.concurrent.TimeUnit

@RunWith(KTestJUnitRunner::class)
class MaybeKTests : UnitSpec() {

fun <T> EQ(): Eq<MaybeKOf<T>> = object : Eq<MaybeKOf<T>> {
override fun MaybeKOf<T>.eqv(b: MaybeKOf<T>): Boolean =
try {
this.value().blockingGet() == b.value().blockingGet()
} catch (throwable: Throwable) {
val errA = try {
this.value().blockingGet()
throw IllegalArgumentException()
} catch (err: Throwable) {
err
}
val errB = try {
b.value().blockingGet()
throw IllegalStateException()
} catch (err: Throwable) {
err
}
errA == errB
}

}

init {
testLaws(
FunctorLaws.laws(MaybeK.functor(), { MaybeK.just(it) }, EQ()),
ApplicativeLaws.laws(MaybeK.applicative(), EQ()),
MonadLaws.laws(MaybeK.monad(), EQ()),
FoldableLaws.laws(MaybeK.foldable(), { MaybeK.just(it) }, Eq.any()),
MonadErrorLaws.laws(MaybeK.monadError(), EQ(), EQ(), EQ()),
ApplicativeErrorLaws.laws(MaybeK.applicativeError(), EQ(), EQ(), EQ()),
MonadSuspendLaws.laws(MaybeK.monadDefer(), EQ(), EQ(), EQ()),
AsyncLaws.laws(MaybeK.async(), EQ(), EQ(), EQ()),
AsyncLaws.laws(MaybeK.effect(), EQ(), EQ(), EQ())
)

"Multi-thread Maybes finish correctly" {
val value: Maybe<Long> = MaybeK.monadError().bindingCatch {
val a = Maybe.timer(2, TimeUnit.SECONDS).k().bind()
a
}.value()

val test: TestObserver<Long> = value.test()
test.awaitDone(5, TimeUnit.SECONDS)
test.assertTerminated().assertComplete().assertNoErrors().assertValue(0)
}

"Multi-thread Maybes should run on their required threads" {
val originalThread: Thread = Thread.currentThread()
var threadRef: Thread? = null

val value: Maybe<Long> = MaybeK.monadError().bindingCatch {
val a = Maybe.timer(2, TimeUnit.SECONDS, Schedulers.newThread()).k().bind()
threadRef = Thread.currentThread()
val b = Maybe.just(a).observeOn(Schedulers.newThread()).k().bind()
b
}.value()

val test: TestObserver<Long> = value.test()
val lastThread: Thread = test.awaitDone(5, TimeUnit.SECONDS).lastThread()
val nextThread = (threadRef?.name ?: "")

nextThread shouldNotBe originalThread.name
lastThread.name shouldNotBe originalThread.name
lastThread.name shouldNotBe nextThread
}

"Maybe dispose forces binding to cancel without completing too" {
val value: Maybe<Long> = MaybeK.monadError().bindingCatch {
val a = Maybe.timer(3, TimeUnit.SECONDS).k().bind()
a
}.value()

val test: TestObserver<Long> = value.doOnSubscribe { subscription ->
Maybe.timer(1, TimeUnit.SECONDS).subscribe { a ->
subscription.dispose()
}
}.test()

test.awaitTerminalEvent(5, TimeUnit.SECONDS)
test.assertNotTerminated().assertNotComplete().assertNoErrors().assertNoValues()
}
}
}