From 254bd5e44f4a9ea3cb9035cbbc5746ae49c2bd55 Mon Sep 17 00:00:00 2001 From: Anja Gruenheid Date: Fri, 20 May 2022 13:13:40 -0700 Subject: [PATCH] Fix serial execution start and reset. --- .../java/com/oltpbenchmark/ThreadBench.java | 41 ++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/oltpbenchmark/ThreadBench.java b/src/main/java/com/oltpbenchmark/ThreadBench.java index 6a3b6ca1d..2c143a65d 100644 --- a/src/main/java/com/oltpbenchmark/ThreadBench.java +++ b/src/main/java/com/oltpbenchmark/ThreadBench.java @@ -15,7 +15,6 @@ * */ - package com.oltpbenchmark; import com.oltpbenchmark.LatencyRecord.Sample; @@ -33,7 +32,6 @@ public class ThreadBench implements Thread.UncaughtExceptionHandler { private static final Logger LOG = LoggerFactory.getLogger(ThreadBench.class); - private final BenchmarkState testState; private final List> workers; private final ArrayList workerThreads; @@ -41,7 +39,8 @@ public class ThreadBench implements Thread.UncaughtExceptionHandler { private final ArrayList samples = new ArrayList<>(); private final int intervalMonitor; - private ThreadBench(List> workers, List workConfs, int intervalMonitoring) { + private ThreadBench(List> workers, + List workConfs, int intervalMonitoring) { this.workers = workers; this.workConfs = workConfs; this.workerThreads = new ArrayList<>(workers.size()); @@ -49,7 +48,8 @@ private ThreadBench(List> workers, L this.testState = new BenchmarkState(workers.size() + 1); } - public static Results runRateLimitedBenchmark(List> workers, List workConfs, int intervalMonitoring) { + public static Results runRateLimitedBenchmark(List> workers, + List workConfs, int intervalMonitoring) { ThreadBench bench = new ThreadBench(workers, workConfs, intervalMonitoring); return bench.runRateLimitedMultiPhase(); } @@ -110,7 +110,6 @@ private Results runRateLimitedMultiPhase() { } this.createWorkerThreads(); - testState.blockForStart(); // long measureStart = start; @@ -132,6 +131,15 @@ private Results runRateLimitedMultiPhase() { } } + // Change testState to cold query if execution is serial, since we don't + // have a warm-up phase for serial execution but execute a cold and a + // measured query in sequence. + if (phase != null && phase.isLatencyRun()) { + synchronized (testState) { + testState.startColdQuery(); + } + } + long intervalNs = getInterval(lowestRate, phase.getArrival()); long nextInterval = start + intervalNs; @@ -148,6 +156,9 @@ private Results runRateLimitedMultiPhase() { new MonitorThread(this.intervalMonitor).start(); } + // Allow workers to start work. + testState.blockForStart(); + // Main Loop while (true) { // posting new work... and resetting the queue in case we have new @@ -181,15 +192,13 @@ private Results runRateLimitedMultiPhase() { diff = nextInterval - now; } - boolean phaseComplete = false; if (phase != null) { if (phase.isLatencyRun()) // Latency runs (serial run through each query) have their own // state to mark completion { - phaseComplete = testState.getState() - == State.LATENCY_COMPLETE; + phaseComplete = testState.getState() == State.LATENCY_COMPLETE; } else { phaseComplete = testState.getState() == State.MEASURE && (start + delta <= now); @@ -221,7 +230,11 @@ private Results runRateLimitedMultiPhase() { measureEnd = now; LOG.info("{} :: Waiting for all terminals to finish ..", StringUtil.bold("TERMINATE")); } else if (phase != null) { - phase.resetSerial(); + // Reset serial execution parameters. + if (phase.isLatencyRun()) { + phase.resetSerial(); + testState.startColdQuery(); + } LOG.info(phase.currentPhaseString()); if (phase.getRate() < lowestRate) { lowestRate = phase.getRate(); @@ -248,8 +261,7 @@ private Results runRateLimitedMultiPhase() { do { intervalNs += getInterval(lowestRate, phase.getArrival()); nextToAdd++; - } - while ((-diff) > intervalNs && !lastEntry); + } while ((-diff) > intervalNs && !lastEntry); nextInterval += intervalNs; } @@ -344,7 +356,6 @@ private long getInterval(int lowestRate, Phase.Arrival arrival) { @Override public void uncaughtException(Thread t, Throwable e) { - // HERE WE HANDLE THE CASE IN WHICH ONE OF OUR WOKERTHREADS DIED LOG.error(e.getMessage(), e); System.exit(-1); @@ -367,7 +378,7 @@ public static final class TimeBucketIterable implements Iterable samples, int windowSizeSeconds, TransactionType transactionType) { this.samples = samples; @@ -396,7 +407,8 @@ private static final class TimeBucketIterator implements Iterator samples, int windowSizeSeconds, TransactionType txType) { + public TimeBucketIterator(Iterator samples, int windowSizeSeconds, + TransactionType txType) { this.samples = samples; this.windowSizeSeconds = windowSizeSeconds; this.txType = txType; @@ -413,7 +425,6 @@ public TimeBucketIterator(Iterator samples, int windowSize private void calculateNext() { - // Collect all samples in the time window ArrayList latencies = new ArrayList<>(); long endNanoseconds = nextStartNanosecond + (windowSizeSeconds * 1000000000L);