diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 45aee545f82ce..66afce2a1e542 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
import kafka.server.{BrokerReconfigurable, KafkaConfig}
-import kafka.utils._
+import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
@@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, TransactionIndex}
+import org.apache.kafka.storage.internals.utils.Throttler
import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
@@ -109,12 +110,7 @@ class LogCleaner(initialConfig: CleanerConfig,
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel)
/* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
- private[log] val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
- checkIntervalMs = 300,
- throttleDown = true,
- "cleaner-io",
- "bytes",
- time = time)
+ private[log] val throttler = new Throttler(config.maxIoBytesPerSecond, 300, "cleaner-io", "bytes", time)
private[log] val cleaners = mutable.ArrayBuffer[CleanerThread]()
diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala
deleted file mode 100644
index 286343cd4449f..0000000000000
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import org.apache.kafka.common.utils.Time
-import org.apache.kafka.server.metrics.KafkaMetricsGroup
-
-import java.util.concurrent.TimeUnit
-import java.util.Random
-
-import scala.math._
-
-/**
- * A class to measure and throttle the rate of some process. The throttler takes a desired rate-per-second
- * (the units of the process don't matter, it could be bytes or a count of some other thing), and will sleep for
- * an appropriate amount of time when maybeThrottle() is called to attain the desired rate.
- *
- * @param desiredRatePerSec: The rate we want to hit in units/sec
- * @param checkIntervalMs: The interval at which to check our rate
- * @param throttleDown: Does throttling increase or decrease our rate?
- * @param time: The time implementation to use
- */
-@threadsafe
-class Throttler(@volatile var desiredRatePerSec: Double,
- checkIntervalMs: Long = 100L,
- throttleDown: Boolean = true,
- metricName: String = "throttler",
- units: String = "entries",
- time: Time = Time.SYSTEM) extends Logging {
-
- private val metricsGroup = new KafkaMetricsGroup(this.getClass)
-
- private val lock = new Object
- private val meter = metricsGroup.newMeter(metricName, units, TimeUnit.SECONDS)
- private val checkIntervalNs = TimeUnit.MILLISECONDS.toNanos(checkIntervalMs)
- private var periodStartNs: Long = time.nanoseconds
- private var observedSoFar: Double = 0.0
-
- def maybeThrottle(observed: Double): Unit = {
- val msPerSec = TimeUnit.SECONDS.toMillis(1)
- val nsPerSec = TimeUnit.SECONDS.toNanos(1)
- val currentDesiredRatePerSec = desiredRatePerSec
-
- meter.mark(observed.toLong)
- lock synchronized {
- observedSoFar += observed
- val now = time.nanoseconds
- val elapsedNs = now - periodStartNs
- // if we have completed an interval AND we have observed something, maybe
- // we should take a little nap
- if (elapsedNs > checkIntervalNs && observedSoFar > 0) {
- val rateInSecs = (observedSoFar * nsPerSec) / elapsedNs
- val needAdjustment = !(throttleDown ^ (rateInSecs > currentDesiredRatePerSec))
- if (needAdjustment) {
- // solve for the amount of time to sleep to make us hit the desired rate
- val desiredRateMs = currentDesiredRatePerSec / msPerSec.toDouble
- val elapsedMs = TimeUnit.NANOSECONDS.toMillis(elapsedNs)
- val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs)
- if (sleepTime > 0) {
- trace("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, currentDesiredRatePerSec, sleepTime))
- time.sleep(sleepTime)
- }
- }
- periodStartNs = time.nanoseconds()
- observedSoFar = 0
- }
- }
- }
-
- def updateDesiredRatePerSec(updatedDesiredRatePerSec: Double): Unit = {
- desiredRatePerSec = updatedDesiredRatePerSec
- }
-}
-
-object Throttler {
-
- def main(args: Array[String]): Unit = {
- val rand = new Random()
- val throttler = new Throttler(100000, 100, true, time = Time.SYSTEM)
- val interval = 30000
- var start = System.currentTimeMillis
- var total = 0
- while (true) {
- val value = rand.nextInt(1000)
- Thread.sleep(1)
- throttler.maybeThrottle(value)
- total += value
- val now = System.currentTimeMillis
- if (now - start >= interval) {
- println(total / (interval/1000.0))
- start = now
- total = 0
- }
- }
- }
-}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 7e72a4852cafe..99b1e35e4eed9 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -19,7 +19,7 @@ package kafka.log
import kafka.common._
import kafka.server.{BrokerTopicStats, KafkaConfig}
-import kafka.utils._
+import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
@@ -30,6 +30,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.utils.Throttler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers
@@ -59,7 +60,7 @@ class LogCleanerTest extends Logging {
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
val logConfig = new LogConfig(logProps)
val time = new MockTime()
- val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
+ val throttler = new Throttler(Double.MaxValue, Long.MaxValue, "throttler", "entries", time)
val tombstoneRetentionMs = 86400000
val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 77b63bf89ac03..2394bc2d028aa 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -20,7 +20,7 @@ package kafka.log
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
import kafka.server.{BrokerTopicStats, KafkaConfig}
-import kafka.utils._
+import kafka.utils.TestUtils
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
@@ -39,6 +39,7 @@ import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
+import org.apache.kafka.storage.internals.utils.Throttler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
@@ -1046,7 +1047,7 @@ class UnifiedLogTest {
ioBufferSize = 64 * 1024,
maxIoBufferSize = 64 * 1024,
dupBufferLoadFactor = 0.75,
- throttler = new Throttler(Double.MaxValue, Long.MaxValue, false, time = mockTime),
+ throttler = new Throttler(Double.MaxValue, Long.MaxValue, "throttler", "entries", mockTime),
time = mockTime,
checkDone = _ => {})
diff --git a/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala b/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala
deleted file mode 100755
index a34e2ea12cf5d..0000000000000
--- a/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.utils
-
-import org.apache.kafka.server.util.MockTime
-import org.junit.jupiter.api.Test
-import org.junit.jupiter.api.Assertions.{assertTrue, assertEquals}
-
-
-class ThrottlerTest {
- @Test
- def testThrottleDesiredRate(): Unit = {
- val throttleCheckIntervalMs = 100
- val desiredCountPerSec = 1000.0
- val desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0
-
- val mockTime = new MockTime()
- val throttler = new Throttler(desiredRatePerSec = desiredCountPerSec,
- checkIntervalMs = throttleCheckIntervalMs,
- time = mockTime)
-
- // Observe desiredCountPerInterval at t1
- val t1 = mockTime.milliseconds()
- throttler.maybeThrottle(desiredCountPerInterval)
- assertEquals(t1, mockTime.milliseconds())
-
- // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1,
- mockTime.sleep(throttleCheckIntervalMs + 1)
- throttler.maybeThrottle(desiredCountPerInterval)
- val t2 = mockTime.milliseconds()
- assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs)
-
- // Observe desiredCountPerInterval at t2
- throttler.maybeThrottle(desiredCountPerInterval)
- assertEquals(t2, mockTime.milliseconds())
-
- // Observe desiredCountPerInterval at t2 + throttleCheckIntervalMs + 1
- mockTime.sleep(throttleCheckIntervalMs + 1)
- throttler.maybeThrottle(desiredCountPerInterval)
- val t3 = mockTime.milliseconds()
- assertTrue(t3 >= t2 + 2 * throttleCheckIntervalMs)
-
- val elapsedTimeMs = t3 - t1
- val actualCountPerSec = 4 * desiredCountPerInterval * 1000 / elapsedTimeMs
- assertTrue(actualCountPerSec <= desiredCountPerSec)
- }
-
- @Test
- def testUpdateThrottleDesiredRate(): Unit = {
- val throttleCheckIntervalMs = 100
- val desiredCountPerSec = 1000.0
- val desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0
- val updatedDesiredCountPerSec = 1500.0
- val updatedDesiredCountPerInterval = updatedDesiredCountPerSec * throttleCheckIntervalMs / 1000.0
-
- val mockTime = new MockTime()
- val throttler = new Throttler(desiredRatePerSec = desiredCountPerSec,
- checkIntervalMs = throttleCheckIntervalMs,
- time = mockTime)
-
- // Observe desiredCountPerInterval at t1
- val t1 = mockTime.milliseconds()
- throttler.maybeThrottle(desiredCountPerInterval)
- assertEquals(t1, mockTime.milliseconds())
-
- // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1,
- mockTime.sleep(throttleCheckIntervalMs + 1)
- throttler.maybeThrottle(desiredCountPerInterval)
- val t2 = mockTime.milliseconds()
- assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs)
-
- val elapsedTimeMs = t2 - t1
- val actualCountPerSec = 2 * desiredCountPerInterval * 1000 / elapsedTimeMs
- assertTrue(actualCountPerSec <= desiredCountPerSec)
-
- // Update ThrottleDesiredRate
- throttler.updateDesiredRatePerSec(updatedDesiredCountPerSec)
-
- // Observe updatedDesiredCountPerInterval at t2
- throttler.maybeThrottle(updatedDesiredCountPerInterval)
- assertEquals(t2, mockTime.milliseconds())
-
- // Observe updatedDesiredCountPerInterval at t2 + throttleCheckIntervalMs + 1
- mockTime.sleep(throttleCheckIntervalMs + 1)
- throttler.maybeThrottle(updatedDesiredCountPerInterval)
- val t3 = mockTime.milliseconds()
- assertTrue(t3 >= t2 + 2 * throttleCheckIntervalMs)
-
- val updatedElapsedTimeMs = t3 - t2
- val updatedActualCountPerSec = 2 * updatedDesiredCountPerInterval * 1000 / updatedElapsedTimeMs
- assertTrue(updatedActualCountPerSec <= updatedDesiredCountPerSec)
- }
-}
diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml
index 8fff85846c32b..7c4b96f077d4c 100644
--- a/gradle/spotbugs-exclude.xml
+++ b/gradle/spotbugs-exclude.xml
@@ -490,14 +490,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
-
-
-
-
-
-
-
-
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/utils/Throttler.java b/storage/src/main/java/org/apache/kafka/storage/internals/utils/Throttler.java
new file mode 100644
index 0000000000000..6622f2f57de35
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/utils/Throttler.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.utils;
+
+import com.yammer.metrics.core.Meter;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.metrics.KafkaMetricsGroup;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A class to measure and throttle the rate of some process.
+ */
+public class Throttler {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Throttler.class);
+ private static final long MS_PER_SEC = TimeUnit.SECONDS.toMillis(1);
+ private static final long NS_PER_SEC = TimeUnit.SECONDS.toNanos(1);
+
+ private final Object lock = new Object();
+ private final long checkIntervalNs;
+ private final Meter meter;
+ private final Time time;
+
+ private volatile double desiredRatePerSec;
+ private long periodStartNs;
+ private double observedSoFar;
+
+ /**
+ * The throttler takes a desired rate-per-second (the units of the process don't matter, it could be bytes
+ * or a count of some other thing), and will sleep for an appropriate amount of time when maybeThrottle()
+ * is called to attain the desired rate.
+ *
+ * @param desiredRatePerSec The rate we want to hit in units/sec
+ * @param checkIntervalMs The interval at which to check our rate
+ * @param metricName The name of the metric
+ * @param units The name of the unit
+ * @param time The time implementation to use
+ */
+ public Throttler(double desiredRatePerSec,
+ long checkIntervalMs,
+ String metricName,
+ String units,
+ Time time) {
+ this.desiredRatePerSec = desiredRatePerSec;
+ this.checkIntervalNs = TimeUnit.MILLISECONDS.toNanos(checkIntervalMs);
+ this.meter = new KafkaMetricsGroup(Throttler.class).newMeter(metricName, units, TimeUnit.SECONDS);
+ this.time = time;
+ this.periodStartNs = time.nanoseconds();
+ }
+
+ public void updateDesiredRatePerSec(double updatedDesiredRatePerSec) {
+ desiredRatePerSec = updatedDesiredRatePerSec;
+ }
+
+ public double desiredRatePerSec() {
+ return desiredRatePerSec;
+ }
+
+ public void maybeThrottle(double observed) {
+ double currentDesiredRatePerSec = desiredRatePerSec;
+ meter.mark((long) observed);
+ synchronized (lock) {
+ observedSoFar += observed;
+ long now = time.nanoseconds();
+ long elapsedNs = now - periodStartNs;
+ // if we have completed an interval AND we have observed something, maybe
+ // we should take a little nap
+ if (elapsedNs > checkIntervalNs && observedSoFar > 0) {
+ double rateInSecs = (observedSoFar * NS_PER_SEC) / elapsedNs;
+ if (rateInSecs > currentDesiredRatePerSec) {
+ // solve for the amount of time to sleep to make us hit the desired rate
+ double desiredRateMs = currentDesiredRatePerSec / MS_PER_SEC;
+ long elapsedMs = TimeUnit.NANOSECONDS.toMillis(elapsedNs);
+ long sleepTime = Math.round(observedSoFar / desiredRateMs - elapsedMs);
+ if (sleepTime > 0) {
+ LOG.trace("Natural rate is {} per second but desired rate is {}, sleeping for {} ms to compensate.", rateInSecs, currentDesiredRatePerSec, sleepTime);
+ time.sleep(sleepTime);
+ }
+ }
+ periodStartNs = time.nanoseconds();
+ observedSoFar = 0.0;
+ }
+ }
+ }
+}
diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/utils/ThrottlerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/utils/ThrottlerTest.java
new file mode 100644
index 0000000000000..6a42cb7d06ca4
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/storage/internals/utils/ThrottlerTest.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.utils;
+
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.util.MockTime;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+class ThrottlerTest {
+
+ @Test
+ public void testThrottleDesiredRate() {
+ long throttleCheckIntervalMs = 100L;
+ double desiredCountPerSec = 1000.0;
+ double desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0;
+
+ Time mockTime = new MockTime();
+ Throttler throttler = new Throttler(desiredCountPerSec,
+ throttleCheckIntervalMs,
+ "throttler",
+ "entries",
+ mockTime);
+
+ // Observe desiredCountPerInterval at t1
+ long t1 = mockTime.milliseconds();
+ throttler.maybeThrottle(desiredCountPerInterval);
+ assertEquals(t1, mockTime.milliseconds());
+
+ // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1,
+ mockTime.sleep(throttleCheckIntervalMs + 1);
+ throttler.maybeThrottle(desiredCountPerInterval);
+ long t2 = mockTime.milliseconds();
+ assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs);
+
+ // Observe desiredCountPerInterval at t2
+ throttler.maybeThrottle(desiredCountPerInterval);
+ assertEquals(t2, mockTime.milliseconds());
+
+ // Observe desiredCountPerInterval at t2 + throttleCheckIntervalMs + 1
+ mockTime.sleep(throttleCheckIntervalMs + 1);
+ throttler.maybeThrottle(desiredCountPerInterval);
+ long t3 = mockTime.milliseconds();
+ assertTrue(t3 >= t2 + 2 * throttleCheckIntervalMs);
+
+ long elapsedTimeMs = t3 - t1;
+ double actualCountPerSec = 4 * desiredCountPerInterval * 1000 / elapsedTimeMs;
+ assertTrue(actualCountPerSec <= desiredCountPerSec);
+ }
+
+ @Test
+ public void testUpdateThrottleDesiredRate() {
+ long throttleCheckIntervalMs = 100L;
+ double desiredCountPerSec = 1000.0;
+ double desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0;
+ double updatedDesiredCountPerSec = 1500.0;
+ double updatedDesiredCountPerInterval = updatedDesiredCountPerSec * throttleCheckIntervalMs / 1000.0;
+
+ Time mockTime = new MockTime();
+ Throttler throttler = new Throttler(desiredCountPerSec,
+ throttleCheckIntervalMs,
+ "throttler",
+ "entries",
+ mockTime);
+
+ // Observe desiredCountPerInterval at t1
+ long t1 = mockTime.milliseconds();
+ throttler.maybeThrottle(desiredCountPerInterval);
+ assertEquals(t1, mockTime.milliseconds());
+
+ // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1,
+ mockTime.sleep(throttleCheckIntervalMs + 1);
+ throttler.maybeThrottle(desiredCountPerInterval);
+ long t2 = mockTime.milliseconds();
+ assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs);
+
+ long elapsedTimeMs = t2 - t1;
+ double actualCountPerSec = 2 * desiredCountPerInterval * 1000 / elapsedTimeMs;
+ assertTrue(actualCountPerSec <= desiredCountPerSec);
+
+ // Update ThrottleDesiredRate
+ throttler.updateDesiredRatePerSec(updatedDesiredCountPerSec);
+
+ // Observe updatedDesiredCountPerInterval at t2
+ throttler.maybeThrottle(updatedDesiredCountPerInterval);
+ assertEquals(t2, mockTime.milliseconds());
+
+ // Observe updatedDesiredCountPerInterval at t2 + throttleCheckIntervalMs + 1
+ mockTime.sleep(throttleCheckIntervalMs + 1);
+ throttler.maybeThrottle(updatedDesiredCountPerInterval);
+ long t3 = mockTime.milliseconds();
+ assertTrue(t3 >= t2 + 2 * throttleCheckIntervalMs);
+
+ long updatedElapsedTimeMs = t3 - t2;
+ double updatedActualCountPerSec = 2 * updatedDesiredCountPerInterval * 1000 / updatedElapsedTimeMs;
+ assertTrue(updatedActualCountPerSec <= updatedDesiredCountPerSec);
+ }
+}