diff --git a/src/main/java/com/oltpbenchmark/WorkloadState.java b/src/main/java/com/oltpbenchmark/WorkloadState.java index 865a9b4e0..de12ec8ce 100644 --- a/src/main/java/com/oltpbenchmark/WorkloadState.java +++ b/src/main/java/com/oltpbenchmark/WorkloadState.java @@ -59,32 +59,29 @@ public WorkloadState(BenchmarkState benchmarkState, List works, int num_t * Add a request to do work. */ public void addToQueue(int amount, boolean resetQueues) { + int workAdded = 0; + synchronized (this) { if (resetQueues) { workQueue.clear(); } - // Only use the work queue if the phase is enabled and rate limited. if (currentPhase == null || currentPhase.isDisabled() || !currentPhase.isRateLimited() || currentPhase.isSerial()) { return; - } else { - // Add the specified number of procedures to the end of the queue. - for (int i = 0; i < amount; ++i) { - workQueue.add(new SubmittedProcedure(currentPhase.chooseTransaction())); - } } - - // Can't keep up with current rate? Remove the oldest transactions - // (from the front of the queue). - while (workQueue.size() > RATE_QUEUE_LIMIT) { - workQueue.remove(); + + // Add the specified number of procedures to the end of the queue. + // If we can't keep up with current rate, truncate transactions + for (int i = 0; i < amount && workQueue.size() <= RATE_QUEUE_LIMIT; ++i) { + workQueue.add(new SubmittedProcedure(currentPhase.chooseTransaction())); + workAdded++; } // Wake up sleeping workers to deal with the new work. - int numToWake = Math.min(amount, workersWaiting); - for (int i = 0; i < numToWake; ++i) { + int numToWake = Math.min(workAdded, workersWaiting); + while (numToWake-- > 0) { this.notify(); } }