Skip to content

Commit

Permalink
Merge #3529 into 3.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
simonbasle committed Nov 4, 2022
2 parents ddc159a + 9fe3241 commit 2a83001
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 150 deletions.
1 change: 1 addition & 0 deletions reactor-core/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ blockHoundTest {
jcstress {
mode = 'quick' //quick, default, tough
jcstressDependency 'org.openjdk.jcstress:jcstress-core:0.15'
heapPerFork = 512
}

// inherit basic test task + common configuration in root
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import org.openjdk.jcstress.annotations.Actor;
import org.openjdk.jcstress.annotations.Arbiter;
Expand All @@ -29,10 +30,10 @@
import org.openjdk.jcstress.infra.results.IIZ_Result;
import org.openjdk.jcstress.infra.results.Z_Result;

public abstract class SchedulersStressTest {
public abstract class BasicSchedulersStressTest {

private static void restart(Scheduler scheduler) {
scheduler.disposeGracefully().block(Duration.ofMillis(100));
scheduler.disposeGracefully().block(Duration.ofMillis(500));
// TODO: in 3.6.x: remove restart capability and this validation
scheduler.start();
}
Expand Down Expand Up @@ -112,43 +113,14 @@ public void arbiter(Z_Result r) {
}
}

@JCStressTest
@Outcome(id = {"true"}, expect = Expect.ACCEPTABLE, desc = "Task scheduled after racing restart")
@State
public static class BoundedElasticSchedulerStartDisposeStressTest {

private final BoundedElasticScheduler scheduler =
new BoundedElasticScheduler(1, 1, Thread::new, 5);
{
scheduler.init();
}

@Actor
public void restart1() {
restart(scheduler);
}

@Actor
public void restart2() {
restart(scheduler);
}

@Arbiter
public void arbiter(Z_Result r) {
// At this stage, at least one actor called scheduler.start(),
// so we should be able to execute a task.
r.r1 = canScheduleTask(scheduler);
scheduler.dispose();
}
}

@JCStressTest
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
desc = "Scheduler in consistent state upon concurrent dispose and " +
"eventually disposed.")
@State
public static class SingleSchedulerDisposeGracefullyStressTest {

private final CountDownLatch latch = new CountDownLatch(2);
private final SingleScheduler scheduler = new SingleScheduler(Thread::new);

{
Expand All @@ -157,23 +129,34 @@ public static class SingleSchedulerDisposeGracefullyStressTest {

@Actor
public void disposeGracefully1(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().doFinally(sig -> latch.countDown()).subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
}

@Actor
public void disposeGracefully2(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().doFinally(sig -> latch.countDown()).subscribe();
r.r2 = scheduler.state.initialResource.hashCode();
}

@Arbiter
public void arbiter(IIZ_Result r) {
try {
latch.await(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
// Validate both disposals left the Scheduler in consistent state,
// assuming the await process coordinates on the resources as identified
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
if (consistentState) {
//when that condition is true, we erase the r1/r2 state. that should greatly limit
//the output of "interesting acceptable state" in the dump should and error occur
r.r1 = r.r2 = 0;
}
}
}

Expand All @@ -184,6 +167,7 @@ public void arbiter(IIZ_Result r) {
@State
public static class ParallelSchedulerDisposeGracefullyStressTest {

private final CountDownLatch latch = new CountDownLatch(2);
private final ParallelScheduler scheduler =
new ParallelScheduler(10, Thread::new);

Expand All @@ -193,59 +177,34 @@ public static class ParallelSchedulerDisposeGracefullyStressTest {

@Actor
public void disposeGracefully1(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().doFinally(sig -> latch.countDown()).subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
}

@Actor
public void disposeGracefully2(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
r.r2 = scheduler.state.initialResource.hashCode();
}

@Arbiter
public void arbiter(IIZ_Result r) {
// Validate both disposals left the Scheduler in consistent state,
// assuming the await process coordinates on the resources as identified
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
}
}

@JCStressTest
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
desc = "Scheduler in consistent state upon concurrent dispose and " +
"eventually disposed.")
@State
public static class BoundedElasticSchedulerDisposeGracefullyStressTest {

private final BoundedElasticScheduler scheduler =
new BoundedElasticScheduler(4, 4, Thread::new, 5);

{
scheduler.init();
}

@Actor
public void disposeGracefully1(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
}

@Actor
public void disposeGracefully2(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
scheduler.disposeGracefully().doFinally(sig -> latch.countDown()).subscribe();
r.r2 = scheduler.state.initialResource.hashCode();
}

@Arbiter
public void arbiter(IIZ_Result r) {
try {
latch.await(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
// Validate both disposals left the Scheduler in consistent state,
// assuming the await process coordinates on the resources as identified
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
if (consistentState) {
//when that condition is true, we erase the r1/r2 state. that should greatly limit
//the output of "interesting acceptable state" in the dump should and error occur
r.r1 = r.r2 = 0;
}
}
}

Expand All @@ -264,8 +223,15 @@ public static class SingleSchedulerDisposeGracefullyAndDisposeStressTest {

@Actor
public void disposeGracefully(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
final CountDownLatch latch = new CountDownLatch(1);
scheduler.disposeGracefully().doFinally(sig -> latch.countDown()).subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
try {
latch.await(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Actor
Expand All @@ -281,6 +247,11 @@ public void arbiter(IIZ_Result r) {
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
if (consistentState) {
//when that condition is true, we erase the r1/r2 state. that should greatly limit
//the output of "interesting acceptable state" in the dump should and error occur
r.r1 = r.r2 = 0;
}
}
}

Expand All @@ -300,45 +271,15 @@ public static class ParallelSchedulerDisposeGracefullyAndDisposeStressTest {

@Actor
public void disposeGracefully(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
}

@Actor
public void dispose(IIZ_Result r) {
scheduler.dispose();
r.r2 = scheduler.state.initialResource.hashCode();
}

@Arbiter
public void arbiter(IIZ_Result r) {
// Validate both disposals left the Scheduler in consistent state,
// assuming the await process coordinates on the resources as identified
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
}
}

@JCStressTest
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
desc = "Scheduler in consistent state upon concurrent dispose and " +
"disposeGracefully, eventually disposed.")
@State
public static class BoundedElasticSchedulerDisposeGracefullyAndDisposeStressTest {


private final BoundedElasticScheduler scheduler =
new BoundedElasticScheduler(4, 4, Thread::new, 5);

{
scheduler.init();
}

@Actor
public void disposeGracefully(IIZ_Result r) {
scheduler.disposeGracefully().subscribe();
final CountDownLatch latch = new CountDownLatch(1);
scheduler.disposeGracefully().doFinally(sig -> latch.countDown()).subscribe();
r.r1 = scheduler.state.initialResource.hashCode();
try {
latch.await(5, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

@Actor
Expand All @@ -354,6 +295,11 @@ public void arbiter(IIZ_Result r) {
// by r.r1 and r.r2, which should be equal.
boolean consistentState = r.r1 == r.r2;
r.r3 = consistentState && scheduler.isDisposed();
if (consistentState) {
//when that condition is true, we erase the r1/r2 state. that should greatly limit
//the output of "interesting acceptable state" in the dump should and error occur
r.r1 = r.r2 = 0;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,7 @@ public void start() {
}

@Override
public boolean await(BoundedServices boundedServices, long timeout, TimeUnit timeUnit)
throws InterruptedException {
public boolean await(BoundedServices boundedServices, long timeout, TimeUnit timeUnit) throws InterruptedException {
if (!boundedServices.evictor.awaitTermination(timeout, timeUnit)) {
return false;
}
Expand Down Expand Up @@ -512,7 +511,7 @@ private BoundedServices() {
this.clock = parent.clock;
this.idleQueue = new ConcurrentLinkedDeque<>();
this.busyStates = ALL_IDLE;
this.evictor = Executors.newScheduledThreadPool(1, EVICTOR_FACTORY);
this.evictor = Executors.newSingleThreadScheduledExecutor(EVICTOR_FACTORY);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ public boolean isDisposed() {

@Override
public boolean await(ScheduledExecutorService resource, long timeout, TimeUnit timeUnit)
throws InterruptedException {
throws InterruptedException {
return resource.awaitTermination(timeout, timeUnit);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,15 @@ public void start() {
}
}

@Override
public boolean await(ScheduledExecutorService[] resource, long timeout, TimeUnit timeUnit) throws InterruptedException {
for (ScheduledExecutorService executor : resource) {
if (!executor.awaitTermination(timeout, timeUnit)) {
return false;
}
}
return true;
}
@Override
public boolean await(ScheduledExecutorService[] resource, long timeout, TimeUnit timeUnit) throws InterruptedException {
for (ScheduledExecutorService executor : resource) {
if (!executor.awaitTermination(timeout, timeUnit)) {
return false;
}
}
return true;
}

@Override
public void dispose() {
Expand Down
Loading

0 comments on commit 2a83001

Please sign in to comment.