OperatorAny unsubscribe issue #1935

Closed
myinsiders opened this issue Dec 7, 2014 · 3 comments

@myinsiders

OperatorAny (used by Observable.isEmpty, for example) unsubscribes from its source Observable as soon as it sees an item, so that the source does not have to generate items that are not needed.

Unfortunately, it also seems to unsubscribe the downstream Observables that subscribe to it, which can cut them off before they finish.
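
To make the suspected mechanism concrete, here is a minimal plain-Scala sketch with no Rx dependency. It is a toy model, not the actual OperatorAny source: `Subscription`, `Outcome`, `run`, and the `shareWithDownstream` flag are all hypothetical names invented for illustration. It assumes the symptom comes from the short-circuiting operator cancelling a subscription that is shared with its downstream, rather than one that belongs to the upstream source only.

```scala
object ShortCircuitSketch {
  // Toy stand-in for an Rx subscription: just a cancellation flag.
  final class Subscription {
    @volatile private var cancelled = false
    def unsubscribe(): Unit = { cancelled = true }
    def isUnsubscribed: Boolean = cancelled
  }

  // What a single run observed.
  final case class Outcome(empty: Boolean, generated: Int, downstreamCancelled: Boolean)

  // Hypothetical isEmpty-style operator over a synchronous source.
  // shareWithDownstream = true  : upstream and downstream use the same Subscription
  // shareWithDownstream = false : the upstream gets a Subscription of its own
  def run(items: Seq[Int], shareWithDownstream: Boolean): Outcome = {
    val downstream = new Subscription
    val upstream = if (shareWithDownstream) downstream else new Subscription
    var generated = 0
    val it = items.iterator
    // The source stops producing once its subscription is cancelled.
    while (it.hasNext && !upstream.isUnsubscribed) {
      it.next()
      generated += 1
      upstream.unsubscribe() // short-circuit: one item is enough to answer isEmpty
    }
    Outcome(generated == 0, generated, downstream.isUnsubscribed)
  }

  def main(args: Array[String]): Unit = {
    println(run(1 to 5, shareWithDownstream = true))
    println(run(1 to 5, shareWithDownstream = false))
  }
}
```

In both variants the source stops after one item. Only the shared variant reports the downstream as cancelled, which in this model corresponds to a later asynchronous stage being cut off.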

Here is an example, this code:

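import rx.lang.scala.Observable
import rx.lang.scala.schedulers.IOScheduler

object OperatorAnyIssue extends App {

  // Prints the current thread name, a label, and a value in aligned columns.
  def debug(message: String, value: Any): Unit = printf("%30s %-14s %s\n", "[" + Thread.currentThread().getName + "]", message, value)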

  def sleepThenGenerateInverse(n: Boolean)() = {
    debug("Generating", !n)
    Thread.sleep(800)
    debug("Sending", !n)
    n
  }

  Observable.from(1 to 5)
    .doOnNext(debug("Generated", _))
    .doOnUnsubscribe(debug("Unsubscribed generator", ""))
    .isEmpty // to fix change this line to: .toSeq.map(_.isEmpty)
    .doOnNext(debug("Is empty", _))
    .flatMap(i => Observable.defer { Observable.just(sleepThenGenerateInverse(i)) }.subscribeOn(IOScheduler()))
    .doOnUnsubscribe(debug("Unsubscribed", ""))
    .subscribe(debug("Received", _))

  Thread.sleep(10000)
}

outputs the following. Note that the sleepThenGenerateInverse function is cut off halfway through: "Generating" is logged, but "Sending" and "Received" never appear:

                        [main] Generated      1
                        [main] Is empty       false
                        [main] Unsubscribed   
   [RxCachedThreadScheduler-1] Generating     true
                        [main] Unsubscribed generator 

Changing the .isEmpty line to .toSeq.map(_.isEmpty) results in the following, which is the expected behaviour:

                        [main] Generated      1
                        [main] Generated      2
                        [main] Generated      3
                        [main] Generated      4
                        [main] Generated      5
                        [main] Is empty       false
   [RxCachedThreadScheduler-1] Generating     true
   [RxCachedThreadScheduler-1] Sending        true
   [RxCachedThreadScheduler-1] Received       true
   [RxCachedThreadScheduler-1] Unsubscribed   
   [RxCachedThreadScheduler-1] Unsubscribed generator 


@akarnokd
Member

akarnokd commented Dec 8, 2014

Hi, and thanks for the report. If you can, try the fix in PR #1938 and see if it works for you as well.

@myinsiders
Author

With the PR I get:

                        [main] Generated      1
                        [main] Is empty       false
                        [main] Unsubscribed generator 
   [RxCachedThreadScheduler-1] Generating     true
   [RxCachedThreadScheduler-1] Sending        true
   [RxCachedThreadScheduler-1] Received       false
   [RxCachedThreadScheduler-1] Unsubscribed   

This is correct. It would be great if this could be included in 1.0.3.

@akarnokd
Member

The fix should be in 1.0.4+. Thanks for reporting.
