@@ -458,6 +458,15 @@ void dispatch() {
458
458
boolean skipFinal = false ;
459
459
try {
460
460
for (;;) {
461
+ /*
462
+ * We need to read terminalEvent before checking the queue for emptyness because
463
+ * all enqueue happens before setting the terminal event.
464
+ * If it were the other way around, when the emission is paused between
465
+ * checking isEmpty and checking terminalEvent, some other thread might
466
+ * have produced elements and set the terminalEvent and we'd quit emitting
467
+ * prematurely.
468
+ */
469
+ Object term = terminalEvent ;
461
470
/*
462
471
* See if the queue is empty; since we need this information multiple
463
472
* times later on, we read it one.
@@ -468,7 +477,7 @@ void dispatch() {
468
477
// if the queue is empty and the terminal event was received, quit
469
478
// and don't bother restoring emitting to false: no further activity is
470
479
// possible at this point
471
- if (checkTerminated (terminalEvent , empty )) {
480
+ if (checkTerminated (term , empty )) {
472
481
skipFinal = true ;
473
482
return ;
474
483
}
@@ -508,10 +517,11 @@ void dispatch() {
508
517
// it may happen everyone has unsubscribed between here and producers.get()
509
518
// or we have no subscribers at all to begin with
510
519
if (len == unsubscribed ) {
520
+ term = terminalEvent ;
511
521
// so let's consume a value from the queue
512
522
Object v = queue .poll ();
513
523
// or terminate if there was a terminal event and the queue is empty
514
- if (checkTerminated (terminalEvent , v == null )) {
524
+ if (checkTerminated (term , v == null )) {
515
525
skipFinal = true ;
516
526
return ;
517
527
}
@@ -748,4 +758,4 @@ public void unsubscribe() {
748
758
}
749
759
}
750
760
}
751
- }
761
+ }
0 commit comments