Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Program using interval and take does not terminate #388

Closed
samuelgruetter opened this issue Sep 17, 2013 · 3 comments
Closed

Program using interval and take does not terminate #388

samuelgruetter opened this issue Sep 17, 2013 · 3 comments

Comments

@samuelgruetter
Copy link
Contributor

When I run the following snippet:

static Action1<Long> onNextFunc(final String who) {
    return new Action1<Long>() {
        public void call(Long x) {
            System.out.println(who + " got " + x);
        }            
    };
}
static Action1<Throwable> onErrorFunc(final String who) {
    return new Action1<Throwable>() {
        public void call(Throwable t) {
            t.printStackTrace();
        }            
    };
}    
static Action0 onCompleteFunc(final String who) {
    return new Action0() {
        public void call() {
            System.out.println(who + " complete");
        }            
    };
}
public static void main(String[] args) {
    Observable<Long> oneNumberPerSecond = Observable.interval(1, TimeUnit.SECONDS).take(5);
    oneNumberPerSecond.subscribe(onNextFunc("subscriber 1"), onErrorFunc("subscriber 1"), onCompleteFunc("subscriber 1"));
}

then I get (as expected) the following output:

subscriber 1 got 0
subscriber 1 got 1
subscriber 1 got 2
subscriber 1 got 3
subscriber 1 got 4
subscriber 1 complete

However, the program does not terminate, and that's unexpected.

I ran this test using rxjava-core-0.13.2-SNAPSHOT.jar built from commit 00d7c3b (Sat Sep 14 08:37:09 2013 -0700).

Due to this problem, some tests in the RxScalaDemo that I'm working on do not terminate, which is annoying.

@benjchristensen
Copy link
Member

Just replicated it ... taking a look to see if I can find the cause.

@benjchristensen
Copy link
Member

The OperationInterval class was creating a new ExecutorService on every invocation, and that is never shut down, and launches non-daemon threads.

Schedulers.executor(Executors.newSingleThreadScheduledExecutor())

Changing that, the demo code now doesn't even run as the app quits immediately as it's all async (as it should).

I revised the example to use BlockingObservable for the demo (this and unit tests are where I generally use BlockingObservable) and it runs, then exits nicely.

    public static void main(String[] args) {
        Observable<Long> oneNumberPerSecond = Observable.interval(1, TimeUnit.SECONDS).take(5);
        oneNumberPerSecond.toBlockingObservable().forEach(onNextFunc("subscriber 1"));
    }

I'll submit a fix shortly.

@samuelgruetter
Copy link
Contributor Author

Thanks, now it behaves as I'd expect, but the example I'd like to get to work still doesn't because of problems with groupBy #289 (comment) ...

jihoonson pushed a commit to jihoonson/RxJava that referenced this issue Mar 6, 2020
ReactiveX#388)

* refactor circuit breaker aspect to remove API type and add completable future support

* Add Rxjava 2 support to Sprung aspect of circuit breaker

* fix the gradle build

* tmp fix the gradle build dependency issue

* make the circuit breaker aspect smart ! :) to know which logic need to be done based into the runtime situation

* code cleanup and adding needed java doc

* review comments

* make it public for the extension aspects

* review comments and spring config generalization

* review comments

* execption handling properly for the circuit breaker aspects

* javadoc update
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants