From a18cd91e31272bce464d04bdc53e7247fad17c32 Mon Sep 17 00:00:00 2001 From: quisacrc <97376530+quisacrc@users.noreply.github.com> Date: Mon, 20 Jun 2022 04:30:12 +0200 Subject: [PATCH 1/3] Update WorkloadState.java --- src/main/java/com/oltpbenchmark/WorkloadState.java | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main/java/com/oltpbenchmark/WorkloadState.java b/src/main/java/com/oltpbenchmark/WorkloadState.java index 865a9b4e0..581d995e3 100644 --- a/src/main/java/com/oltpbenchmark/WorkloadState.java +++ b/src/main/java/com/oltpbenchmark/WorkloadState.java @@ -71,11 +71,17 @@ public void addToQueue(int amount, boolean resetQueues) { return; } else { // Add the specified number of procedures to the end of the queue. - for (int i = 0; i < amount; ++i) { + for (int i = 0; i < amount && workQueue.size() <= RATE_QUEUE_LIMIT; ++i) { workQueue.add(new SubmittedProcedure(currentPhase.chooseTransaction())); } } + // The following approach has 2 issues. + // 1. If the rate is very high, it unnecessarily enqueues and dequeues for the excessive amount + // every time the work is added to the queue. + // 2. If the rate is very high, the memory overflow occurs. Even if we increase the Java heap size, + // if the rate is also raised, the memory error will occur again. + // We can elminitate this memory error entirely with a simple fix of the algorithm. // Can't keep up with current rate? Remove the oldest transactions // (from the front of the queue). while (workQueue.size() > RATE_QUEUE_LIMIT) { From bb3be8b277a58bb074b3d6971bcdb0692314bfb1 Mon Sep 17 00:00:00 2001 From: quisacrc <97376530+quisacrc@users.noreply.github.com> Date: Mon, 20 Jun 2022 16:20:49 +0200 Subject: [PATCH 2/3] Update WorkloadState.java --- src/main/java/com/oltpbenchmark/WorkloadState.java | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/main/java/com/oltpbenchmark/WorkloadState.java b/src/main/java/com/oltpbenchmark/WorkloadState.java index 581d995e3..8c9917ce3 100644 --- a/src/main/java/com/oltpbenchmark/WorkloadState.java +++ b/src/main/java/com/oltpbenchmark/WorkloadState.java @@ -71,23 +71,12 @@ public void addToQueue(int amount, boolean resetQueues) { return; } else { // Add the specified number of procedures to the end of the queue. + // If we can't keep up with current rate, truncate transactions for (int i = 0; i < amount && workQueue.size() <= RATE_QUEUE_LIMIT; ++i) { workQueue.add(new SubmittedProcedure(currentPhase.chooseTransaction())); } } - // The following approach has 2 issues. - // 1. If the rate is very high, it unnecessarily enqueues and dequeues for the excessive amount - // every time the work is added to the queue. - // 2. If the rate is very high, the memory overflow occurs. Even if we increase the Java heap size, - // if the rate is also raised, the memory error will occur again. - // We can elminitate this memory error entirely with a simple fix of the algorithm. - // Can't keep up with current rate? Remove the oldest transactions - // (from the front of the queue). - while (workQueue.size() > RATE_QUEUE_LIMIT) { - workQueue.remove(); - } - // Wake up sleeping workers to deal with the new work. int numToWake = Math.min(amount, workersWaiting); for (int i = 0; i < numToWake; ++i) { From 7bd50d7b26a253ee5c0ec49f67b1c5bdb6d55d46 Mon Sep 17 00:00:00 2001 From: Andy Pavlo Date: Mon, 20 Jun 2022 13:55:37 -0400 Subject: [PATCH 3/3] Refactor WorkloadState.addToQueue to only notify threads for the amount of work we've added. --- .../java/com/oltpbenchmark/WorkloadState.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/oltpbenchmark/WorkloadState.java b/src/main/java/com/oltpbenchmark/WorkloadState.java index 8c9917ce3..de12ec8ce 100644 --- a/src/main/java/com/oltpbenchmark/WorkloadState.java +++ b/src/main/java/com/oltpbenchmark/WorkloadState.java @@ -59,27 +59,29 @@ public WorkloadState(BenchmarkState benchmarkState, List works, int num_t * Add a request to do work. */ public void addToQueue(int amount, boolean resetQueues) { + int workAdded = 0; + synchronized (this) { if (resetQueues) { workQueue.clear(); } - // Only use the work queue if the phase is enabled and rate limited. if (currentPhase == null || currentPhase.isDisabled() || !currentPhase.isRateLimited() || currentPhase.isSerial()) { return; - } else { - // Add the specified number of procedures to the end of the queue. - // If we can't keep up with current rate, truncate transactions - for (int i = 0; i < amount && workQueue.size() <= RATE_QUEUE_LIMIT; ++i) { - workQueue.add(new SubmittedProcedure(currentPhase.chooseTransaction())); - } + } + + // Add the specified number of procedures to the end of the queue. + // If we can't keep up with current rate, truncate transactions + for (int i = 0; i < amount && workQueue.size() <= RATE_QUEUE_LIMIT; ++i) { + workQueue.add(new SubmittedProcedure(currentPhase.chooseTransaction())); + workAdded++; } // Wake up sleeping workers to deal with the new work. - int numToWake = Math.min(amount, workersWaiting); - for (int i = 0; i < numToWake; ++i) { + int numToWake = Math.min(workAdded, workersWaiting); + while (numToWake-- > 0) { this.notify(); } }