Skip to content

Commit

Permalink
Fix serial execution start and reset. (cmu-db#164)
Browse files Browse the repository at this point in the history
  • Loading branch information
anjagruenheid authored May 21, 2022
1 parent 29936f3 commit 23498ac
Showing 1 changed file with 26 additions and 15 deletions.
41 changes: 26 additions & 15 deletions src/main/java/com/oltpbenchmark/ThreadBench.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
*
*/


package com.oltpbenchmark;

import com.oltpbenchmark.LatencyRecord.Sample;
Expand All @@ -33,23 +32,24 @@
public class ThreadBench implements Thread.UncaughtExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(ThreadBench.class);


private final BenchmarkState testState;
private final List<? extends Worker<? extends BenchmarkModule>> workers;
private final ArrayList<Thread> workerThreads;
private final List<WorkloadConfiguration> workConfs;
private final ArrayList<LatencyRecord.Sample> samples = new ArrayList<>();
private final int intervalMonitor;

private ThreadBench(List<? extends Worker<? extends BenchmarkModule>> workers, List<WorkloadConfiguration> workConfs, int intervalMonitoring) {
private ThreadBench(List<? extends Worker<? extends BenchmarkModule>> workers,
List<WorkloadConfiguration> workConfs, int intervalMonitoring) {
this.workers = workers;
this.workConfs = workConfs;
this.workerThreads = new ArrayList<>(workers.size());
this.intervalMonitor = intervalMonitoring;
this.testState = new BenchmarkState(workers.size() + 1);
}

public static Results runRateLimitedBenchmark(List<Worker<? extends BenchmarkModule>> workers, List<WorkloadConfiguration> workConfs, int intervalMonitoring) {
public static Results runRateLimitedBenchmark(List<Worker<? extends BenchmarkModule>> workers,
List<WorkloadConfiguration> workConfs, int intervalMonitoring) {
ThreadBench bench = new ThreadBench(workers, workConfs, intervalMonitoring);
return bench.runRateLimitedMultiPhase();
}
Expand Down Expand Up @@ -110,7 +110,6 @@ private Results runRateLimitedMultiPhase() {
}

this.createWorkerThreads();
testState.blockForStart();

// long measureStart = start;

Expand All @@ -132,6 +131,15 @@ private Results runRateLimitedMultiPhase() {
}
}

// Change testState to cold query if execution is serial, since we don't
// have a warm-up phase for serial execution but execute a cold and a
// measured query in sequence.
if (phase != null && phase.isLatencyRun()) {
synchronized (testState) {
testState.startColdQuery();
}
}

long intervalNs = getInterval(lowestRate, phase.getArrival());

long nextInterval = start + intervalNs;
Expand All @@ -148,6 +156,9 @@ private Results runRateLimitedMultiPhase() {
new MonitorThread(this.intervalMonitor).start();
}

// Allow workers to start work.
testState.blockForStart();

// Main Loop
while (true) {
// posting new work... and resetting the queue in case we have new
Expand Down Expand Up @@ -181,15 +192,13 @@ private Results runRateLimitedMultiPhase() {
diff = nextInterval - now;
}


boolean phaseComplete = false;
if (phase != null) {
if (phase.isLatencyRun())
// Latency runs (serial run through each query) have their own
// state to mark completion
{
phaseComplete = testState.getState()
== State.LATENCY_COMPLETE;
phaseComplete = testState.getState() == State.LATENCY_COMPLETE;
} else {
phaseComplete = testState.getState() == State.MEASURE
&& (start + delta <= now);
Expand Down Expand Up @@ -221,7 +230,11 @@ private Results runRateLimitedMultiPhase() {
measureEnd = now;
LOG.info("{} :: Waiting for all terminals to finish ..", StringUtil.bold("TERMINATE"));
} else if (phase != null) {
phase.resetSerial();
// Reset serial execution parameters.
if (phase.isLatencyRun()) {
phase.resetSerial();
testState.startColdQuery();
}
LOG.info(phase.currentPhaseString());
if (phase.getRate() < lowestRate) {
lowestRate = phase.getRate();
Expand All @@ -248,8 +261,7 @@ private Results runRateLimitedMultiPhase() {
do {
intervalNs += getInterval(lowestRate, phase.getArrival());
nextToAdd++;
}
while ((-diff) > intervalNs && !lastEntry);
} while ((-diff) > intervalNs && !lastEntry);
nextInterval += intervalNs;
}

Expand Down Expand Up @@ -344,7 +356,6 @@ private long getInterval(int lowestRate, Phase.Arrival arrival) {
@Override
public void uncaughtException(Thread t, Throwable e) {


// HERE WE HANDLE THE CASE IN WHICH ONE OF OUR WOKERTHREADS DIED
LOG.error(e.getMessage(), e);
System.exit(-1);
Expand All @@ -367,7 +378,7 @@ public static final class TimeBucketIterable implements Iterable<DistributionSta
/**
* @param samples
* @param windowSizeSeconds
* @param transactionType Allows to filter transactions by type
* @param transactionType Allows to filter transactions by type
*/
public TimeBucketIterable(Iterable<Sample> samples, int windowSizeSeconds, TransactionType transactionType) {
this.samples = samples;
Expand Down Expand Up @@ -396,7 +407,8 @@ private static final class TimeBucketIterator implements Iterator<DistributionSt
* @param windowSizeSeconds
* @param txType Allows to filter transactions by type
*/
public TimeBucketIterator(Iterator<LatencyRecord.Sample> samples, int windowSizeSeconds, TransactionType txType) {
public TimeBucketIterator(Iterator<LatencyRecord.Sample> samples, int windowSizeSeconds,
TransactionType txType) {
this.samples = samples;
this.windowSizeSeconds = windowSizeSeconds;
this.txType = txType;
Expand All @@ -413,7 +425,6 @@ public TimeBucketIterator(Iterator<LatencyRecord.Sample> samples, int windowSize

private void calculateNext() {


// Collect all samples in the time window
ArrayList<Integer> latencies = new ArrayList<>();
long endNanoseconds = nextStartNanosecond + (windowSizeSeconds * 1000000000L);
Expand Down

0 comments on commit 23498ac

Please sign in to comment.