Fix serial execution start and reset. #164

Merged
1 commit, merged May 21, 2022
41 changes: 26 additions & 15 deletions src/main/java/com/oltpbenchmark/ThreadBench.java
@@ -15,7 +15,6 @@
*
*/


package com.oltpbenchmark;

import com.oltpbenchmark.LatencyRecord.Sample;
@@ -33,23 +32,24 @@
public class ThreadBench implements Thread.UncaughtExceptionHandler {
private static final Logger LOG = LoggerFactory.getLogger(ThreadBench.class);


private final BenchmarkState testState;
private final List<? extends Worker<? extends BenchmarkModule>> workers;
private final ArrayList<Thread> workerThreads;
private final List<WorkloadConfiguration> workConfs;
private final ArrayList<LatencyRecord.Sample> samples = new ArrayList<>();
private final int intervalMonitor;

private ThreadBench(List<? extends Worker<? extends BenchmarkModule>> workers, List<WorkloadConfiguration> workConfs, int intervalMonitoring) {
private ThreadBench(List<? extends Worker<? extends BenchmarkModule>> workers,
List<WorkloadConfiguration> workConfs, int intervalMonitoring) {
this.workers = workers;
this.workConfs = workConfs;
this.workerThreads = new ArrayList<>(workers.size());
this.intervalMonitor = intervalMonitoring;
this.testState = new BenchmarkState(workers.size() + 1);
}

public static Results runRateLimitedBenchmark(List<Worker<? extends BenchmarkModule>> workers, List<WorkloadConfiguration> workConfs, int intervalMonitoring) {
public static Results runRateLimitedBenchmark(List<Worker<? extends BenchmarkModule>> workers,
List<WorkloadConfiguration> workConfs, int intervalMonitoring) {
ThreadBench bench = new ThreadBench(workers, workConfs, intervalMonitoring);
return bench.runRateLimitedMultiPhase();
}
@@ -110,7 +110,6 @@ private Results runRateLimitedMultiPhase() {
}

this.createWorkerThreads();
testState.blockForStart();

// long measureStart = start;

@@ -132,6 +131,15 @@ private Results runRateLimitedMultiPhase() {
}
}

// Change testState to cold query if execution is serial, since we don't
// have a warm-up phase for serial execution but execute a cold and a
// measured query in sequence.
if (phase != null && phase.isLatencyRun()) {
synchronized (testState) {
testState.startColdQuery();
}
}

long intervalNs = getInterval(lowestRate, phase.getArrival());

long nextInterval = start + intervalNs;
@@ -148,6 +156,9 @@ private Results runRateLimitedMultiPhase() {
new MonitorThread(this.intervalMonitor).start();
}

// Allow workers to start work.
testState.blockForStart();

// Main Loop
while (true) {
// posting new work... and resetting the queue in case we have new
@@ -181,15 +192,13 @@ private Results runRateLimitedMultiPhase() {
diff = nextInterval - now;
}


boolean phaseComplete = false;
if (phase != null) {
if (phase.isLatencyRun())
// Latency runs (serial run through each query) have their own
// state to mark completion
{
phaseComplete = testState.getState()
== State.LATENCY_COMPLETE;
phaseComplete = testState.getState() == State.LATENCY_COMPLETE;
} else {
phaseComplete = testState.getState() == State.MEASURE
&& (start + delta <= now);
@@ -221,7 +230,11 @@ private Results runRateLimitedMultiPhase() {
measureEnd = now;
LOG.info("{} :: Waiting for all terminals to finish ..", StringUtil.bold("TERMINATE"));
} else if (phase != null) {
phase.resetSerial();
// Reset serial execution parameters.
if (phase.isLatencyRun()) {
phase.resetSerial();
testState.startColdQuery();
}
LOG.info(phase.currentPhaseString());
if (phase.getRate() < lowestRate) {
lowestRate = phase.getRate();
@@ -248,8 +261,7 @@ private Results runRateLimitedMultiPhase() {
do {
intervalNs += getInterval(lowestRate, phase.getArrival());
nextToAdd++;
}
while ((-diff) > intervalNs && !lastEntry);
} while ((-diff) > intervalNs && !lastEntry);
nextInterval += intervalNs;
}

@@ -344,7 +356,6 @@ private long getInterval(int lowestRate, Phase.Arrival arrival) {
@Override
public void uncaughtException(Thread t, Throwable e) {


// HERE WE HANDLE THE CASE IN WHICH ONE OF OUR WORKER THREADS DIED
LOG.error(e.getMessage(), e);
System.exit(-1);
@@ -367,7 +378,7 @@ public static final class TimeBucketIterable implements Iterable<DistributionSta
/**
* @param samples
* @param windowSizeSeconds
* @param transactionType Allows to filter transactions by type
* @param transactionType Allows to filter transactions by type
*/
public TimeBucketIterable(Iterable<Sample> samples, int windowSizeSeconds, TransactionType transactionType) {
this.samples = samples;
@@ -396,7 +407,8 @@ private static final class TimeBucketIterator implements Iterator<DistributionSt
* @param windowSizeSeconds
* @param txType Allows to filter transactions by type
*/
public TimeBucketIterator(Iterator<LatencyRecord.Sample> samples, int windowSizeSeconds, TransactionType txType) {
public TimeBucketIterator(Iterator<LatencyRecord.Sample> samples, int windowSizeSeconds,
TransactionType txType) {
this.samples = samples;
this.windowSizeSeconds = windowSizeSeconds;
this.txType = txType;
@@ -413,7 +425,6 @@ public TimeBucketIterator(Iterator<LatencyRecord.Sample> samples, int windowSize

private void calculateNext() {


// Collect all samples in the time window
ArrayList<Integer> latencies = new ArrayList<>();
long endNanoseconds = nextStartNanosecond + (windowSizeSeconds * 1000000000L);
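For reference, here is a minimal, self-contained sketch of the ordering this patch enforces for serial (latency) runs. The classes and method bodies below are simplified stand-ins, not the real BenchmarkState and Phase implementations: the point is that the cold-query state is set before the workers are released, and that both the serial execution cursor and the cold-query state are reset when a latency-run phase repeats.

```java
// Illustrative sketch only; SimpleState and Phase are hypothetical stand-ins
// for the benchmark's state classes, not the actual implementations.
public class SerialStartSketch {

    enum State { WARMUP, COLD_QUERY, MEASURE, LATENCY_COMPLETE }

    static class SimpleState {
        private State state = State.WARMUP;

        synchronized void startColdQuery() { state = State.COLD_QUERY; }

        synchronized State getState() { return state; }

        void blockForStart() {
            // In the real benchmark this is a barrier that releases the worker
            // threads; here it only marks the point at which they would start.
            System.out.println("workers released while in state " + getState());
        }
    }

    static class Phase {
        private final boolean latencyRun;

        Phase(boolean latencyRun) { this.latencyRun = latencyRun; }

        boolean isLatencyRun() { return latencyRun; }

        void resetSerial() { System.out.println("serial execution cursor reset"); }
    }

    public static void main(String[] args) {
        SimpleState testState = new SimpleState();
        Phase phase = new Phase(true);

        // 1. Serial runs have no warm-up phase, so switch to the cold-query
        //    state *before* the workers are allowed to start.
        if (phase.isLatencyRun()) {
            synchronized (testState) {
                testState.startColdQuery();
            }
        }

        // 2. Only then release the workers.
        testState.blockForStart();

        // 3. When a latency-run phase completes and is rerun, reset both the
        //    serial execution cursor and the cold-query state.
        if (phase.isLatencyRun()) {
            phase.resetSerial();
            testState.startColdQuery();
        }
    }
}
```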