From e4e1116156d44d5e7a52ad8fb51a57d5e5755710 Mon Sep 17 00:00:00 2001 From: Mickael Maison Date: Wed, 22 May 2024 18:47:31 +0200 Subject: [PATCH] MINOR: Move Throttler to storage module (#16023) Reviewers: Chia-Ping Tsai --- .../src/main/scala/kafka/log/LogCleaner.scala | 10 +- .../main/scala/kafka/utils/Throttler.scala | 111 ----------------- .../scala/unit/kafka/log/LogCleanerTest.scala | 5 +- .../scala/unit/kafka/log/UnifiedLogTest.scala | 5 +- .../unit/kafka/utils/ThrottlerTest.scala | 108 ---------------- gradle/spotbugs-exclude.xml | 8 -- .../storage/internals/utils/Throttler.java | 102 ++++++++++++++++ .../internals/utils/ThrottlerTest.java | 115 ++++++++++++++++++ 8 files changed, 226 insertions(+), 238 deletions(-) delete mode 100644 core/src/main/scala/kafka/utils/Throttler.scala delete mode 100755 core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala create mode 100644 storage/src/main/java/org/apache/kafka/storage/internals/utils/Throttler.java create mode 100644 storage/src/test/java/org/apache/kafka/storage/internals/utils/ThrottlerTest.java diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala index 45aee545f82ce..66afce2a1e542 100644 --- a/core/src/main/scala/kafka/log/LogCleaner.scala +++ b/core/src/main/scala/kafka/log/LogCleaner.scala @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit import kafka.common._ import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName} import kafka.server.{BrokerReconfigurable, KafkaConfig} -import kafka.utils._ +import kafka.utils.{Logging, Pool} import org.apache.kafka.common.{KafkaException, TopicPartition} import org.apache.kafka.common.config.ConfigException import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException} @@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time} import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.ShutdownableThread import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, TransactionIndex} +import org.apache.kafka.storage.internals.utils.Throttler import scala.jdk.CollectionConverters._ import scala.collection.mutable.ListBuffer @@ -109,12 +110,7 @@ class LogCleaner(initialConfig: CleanerConfig, private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel) /* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */ - private[log] val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond, - checkIntervalMs = 300, - throttleDown = true, - "cleaner-io", - "bytes", - time = time) + private[log] val throttler = new Throttler(config.maxIoBytesPerSecond, 300, "cleaner-io", "bytes", time) private[log] val cleaners = mutable.ArrayBuffer[CleanerThread]() diff --git a/core/src/main/scala/kafka/utils/Throttler.scala b/core/src/main/scala/kafka/utils/Throttler.scala deleted file mode 100644 index 286343cd4449f..0000000000000 --- a/core/src/main/scala/kafka/utils/Throttler.scala +++ /dev/null @@ -1,111 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import org.apache.kafka.common.utils.Time -import org.apache.kafka.server.metrics.KafkaMetricsGroup - -import java.util.concurrent.TimeUnit -import java.util.Random - -import scala.math._ - -/** - * A class to measure and throttle the rate of some process. The throttler takes a desired rate-per-second - * (the units of the process don't matter, it could be bytes or a count of some other thing), and will sleep for - * an appropriate amount of time when maybeThrottle() is called to attain the desired rate. - * - * @param desiredRatePerSec: The rate we want to hit in units/sec - * @param checkIntervalMs: The interval at which to check our rate - * @param throttleDown: Does throttling increase or decrease our rate? - * @param time: The time implementation to use - */ -@threadsafe -class Throttler(@volatile var desiredRatePerSec: Double, - checkIntervalMs: Long = 100L, - throttleDown: Boolean = true, - metricName: String = "throttler", - units: String = "entries", - time: Time = Time.SYSTEM) extends Logging { - - private val metricsGroup = new KafkaMetricsGroup(this.getClass) - - private val lock = new Object - private val meter = metricsGroup.newMeter(metricName, units, TimeUnit.SECONDS) - private val checkIntervalNs = TimeUnit.MILLISECONDS.toNanos(checkIntervalMs) - private var periodStartNs: Long = time.nanoseconds - private var observedSoFar: Double = 0.0 - - def maybeThrottle(observed: Double): Unit = { - val msPerSec = TimeUnit.SECONDS.toMillis(1) - val nsPerSec = TimeUnit.SECONDS.toNanos(1) - val currentDesiredRatePerSec = desiredRatePerSec - - meter.mark(observed.toLong) - lock synchronized { - observedSoFar += observed - val now = time.nanoseconds - val elapsedNs = now - periodStartNs - // if we have completed an interval AND we have observed something, maybe - // we should take a little nap - if (elapsedNs > checkIntervalNs && observedSoFar > 0) { - val rateInSecs = (observedSoFar * nsPerSec) / elapsedNs - val needAdjustment = !(throttleDown ^ (rateInSecs > currentDesiredRatePerSec)) - if (needAdjustment) { - // solve for the amount of time to sleep to make us hit the desired rate - val desiredRateMs = currentDesiredRatePerSec / msPerSec.toDouble - val elapsedMs = TimeUnit.NANOSECONDS.toMillis(elapsedNs) - val sleepTime = round(observedSoFar / desiredRateMs - elapsedMs) - if (sleepTime > 0) { - trace("Natural rate is %f per second but desired rate is %f, sleeping for %d ms to compensate.".format(rateInSecs, currentDesiredRatePerSec, sleepTime)) - time.sleep(sleepTime) - } - } - periodStartNs = time.nanoseconds() - observedSoFar = 0 - } - } - } - - def updateDesiredRatePerSec(updatedDesiredRatePerSec: Double): Unit = { - desiredRatePerSec = updatedDesiredRatePerSec - } -} - -object Throttler { - - def main(args: Array[String]): Unit = { - val rand = new Random() - val throttler = new Throttler(100000, 100, true, time = Time.SYSTEM) - val interval = 30000 - var start = System.currentTimeMillis - var total = 0 - while (true) { - val value = rand.nextInt(1000) - Thread.sleep(1) - throttler.maybeThrottle(value) - total += value - val now = System.currentTimeMillis - if (now - start >= interval) { - println(total / (interval/1000.0)) - start = now - total = 0 - } - } - } -} diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala index 7e72a4852cafe..99b1e35e4eed9 100644 --- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala @@ -19,7 +19,7 @@ package kafka.log import kafka.common._ import kafka.server.{BrokerTopicStats, KafkaConfig} -import kafka.utils._ +import kafka.utils.{CoreUtils, Logging, Pool, TestUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.TopicConfig @@ -30,6 +30,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfigs import org.apache.kafka.server.metrics.KafkaMetricsGroup import org.apache.kafka.server.util.MockTime import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig} +import org.apache.kafka.storage.internals.utils.Throttler import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, Test} import org.mockito.ArgumentMatchers @@ -59,7 +60,7 @@ class LogCleanerTest extends Logging { logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT) val logConfig = new LogConfig(logProps) val time = new MockTime() - val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time) + val throttler = new Throttler(Double.MaxValue, Long.MaxValue, "throttler", "entries", time) val tombstoneRetentionMs = 86400000 val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1 val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false) diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala index 77b63bf89ac03..2394bc2d028aa 100755 --- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala +++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala @@ -20,7 +20,7 @@ package kafka.log import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException} import kafka.log.remote.RemoteLogManager import kafka.server.{BrokerTopicStats, KafkaConfig} -import kafka.utils._ +import kafka.utils.TestUtils import org.apache.kafka.common.compress.Compression import org.apache.kafka.common.config.TopicConfig import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid} @@ -39,6 +39,7 @@ import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler} import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile} import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard} +import org.apache.kafka.storage.internals.utils.Throttler import org.junit.jupiter.api.Assertions._ import org.junit.jupiter.api.{AfterEach, BeforeEach, Test} import org.junit.jupiter.params.ParameterizedTest @@ -1046,7 +1047,7 @@ class UnifiedLogTest { ioBufferSize = 64 * 1024, maxIoBufferSize = 64 * 1024, dupBufferLoadFactor = 0.75, - throttler = new Throttler(Double.MaxValue, Long.MaxValue, false, time = mockTime), + throttler = new Throttler(Double.MaxValue, Long.MaxValue, "throttler", "entries", mockTime), time = mockTime, checkDone = _ => {}) diff --git a/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala b/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala deleted file mode 100755 index a34e2ea12cf5d..0000000000000 --- a/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import org.apache.kafka.server.util.MockTime -import org.junit.jupiter.api.Test -import org.junit.jupiter.api.Assertions.{assertTrue, assertEquals} - - -class ThrottlerTest { - @Test - def testThrottleDesiredRate(): Unit = { - val throttleCheckIntervalMs = 100 - val desiredCountPerSec = 1000.0 - val desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0 - - val mockTime = new MockTime() - val throttler = new Throttler(desiredRatePerSec = desiredCountPerSec, - checkIntervalMs = throttleCheckIntervalMs, - time = mockTime) - - // Observe desiredCountPerInterval at t1 - val t1 = mockTime.milliseconds() - throttler.maybeThrottle(desiredCountPerInterval) - assertEquals(t1, mockTime.milliseconds()) - - // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1, - mockTime.sleep(throttleCheckIntervalMs + 1) - throttler.maybeThrottle(desiredCountPerInterval) - val t2 = mockTime.milliseconds() - assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs) - - // Observe desiredCountPerInterval at t2 - throttler.maybeThrottle(desiredCountPerInterval) - assertEquals(t2, mockTime.milliseconds()) - - // Observe desiredCountPerInterval at t2 + throttleCheckIntervalMs + 1 - mockTime.sleep(throttleCheckIntervalMs + 1) - throttler.maybeThrottle(desiredCountPerInterval) - val t3 = mockTime.milliseconds() - assertTrue(t3 >= t2 + 2 * throttleCheckIntervalMs) - - val elapsedTimeMs = t3 - t1 - val actualCountPerSec = 4 * desiredCountPerInterval * 1000 / elapsedTimeMs - assertTrue(actualCountPerSec <= desiredCountPerSec) - } - - @Test - def testUpdateThrottleDesiredRate(): Unit = { - val throttleCheckIntervalMs = 100 - val desiredCountPerSec = 1000.0 - val desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0 - val updatedDesiredCountPerSec = 1500.0 - val updatedDesiredCountPerInterval = updatedDesiredCountPerSec * throttleCheckIntervalMs / 1000.0 - - val mockTime = new MockTime() - val throttler = new Throttler(desiredRatePerSec = desiredCountPerSec, - checkIntervalMs = throttleCheckIntervalMs, - time = mockTime) - - // Observe desiredCountPerInterval at t1 - val t1 = mockTime.milliseconds() - throttler.maybeThrottle(desiredCountPerInterval) - assertEquals(t1, mockTime.milliseconds()) - - // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1, - mockTime.sleep(throttleCheckIntervalMs + 1) - throttler.maybeThrottle(desiredCountPerInterval) - val t2 = mockTime.milliseconds() - assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs) - - val elapsedTimeMs = t2 - t1 - val actualCountPerSec = 2 * desiredCountPerInterval * 1000 / elapsedTimeMs - assertTrue(actualCountPerSec <= desiredCountPerSec) - - // Update ThrottleDesiredRate - throttler.updateDesiredRatePerSec(updatedDesiredCountPerSec) - - // Observe updatedDesiredCountPerInterval at t2 - throttler.maybeThrottle(updatedDesiredCountPerInterval) - assertEquals(t2, mockTime.milliseconds()) - - // Observe updatedDesiredCountPerInterval at t2 + throttleCheckIntervalMs + 1 - mockTime.sleep(throttleCheckIntervalMs + 1) - throttler.maybeThrottle(updatedDesiredCountPerInterval) - val t3 = mockTime.milliseconds() - assertTrue(t3 >= t2 + 2 * throttleCheckIntervalMs) - - val updatedElapsedTimeMs = t3 - t2 - val updatedActualCountPerSec = 2 * updatedDesiredCountPerInterval * 1000 / updatedElapsedTimeMs - assertTrue(updatedActualCountPerSec <= updatedDesiredCountPerSec) - } -} diff --git a/gradle/spotbugs-exclude.xml b/gradle/spotbugs-exclude.xml index 8fff85846c32b..7c4b96f077d4c 100644 --- a/gradle/spotbugs-exclude.xml +++ b/gradle/spotbugs-exclude.xml @@ -490,14 +490,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read - - - - - - - - diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/utils/Throttler.java b/storage/src/main/java/org/apache/kafka/storage/internals/utils/Throttler.java new file mode 100644 index 0000000000000..6622f2f57de35 --- /dev/null +++ b/storage/src/main/java/org/apache/kafka/storage/internals/utils/Throttler.java @@ -0,0 +1,102 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.utils; + +import com.yammer.metrics.core.Meter; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.metrics.KafkaMetricsGroup; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.TimeUnit; + +/** + * A class to measure and throttle the rate of some process. + */ +public class Throttler { + + private static final Logger LOG = LoggerFactory.getLogger(Throttler.class); + private static final long MS_PER_SEC = TimeUnit.SECONDS.toMillis(1); + private static final long NS_PER_SEC = TimeUnit.SECONDS.toNanos(1); + + private final Object lock = new Object(); + private final long checkIntervalNs; + private final Meter meter; + private final Time time; + + private volatile double desiredRatePerSec; + private long periodStartNs; + private double observedSoFar; + + /** + * The throttler takes a desired rate-per-second (the units of the process don't matter, it could be bytes + * or a count of some other thing), and will sleep for an appropriate amount of time when maybeThrottle() + * is called to attain the desired rate. + * + * @param desiredRatePerSec The rate we want to hit in units/sec + * @param checkIntervalMs The interval at which to check our rate + * @param metricName The name of the metric + * @param units The name of the unit + * @param time The time implementation to use + */ + public Throttler(double desiredRatePerSec, + long checkIntervalMs, + String metricName, + String units, + Time time) { + this.desiredRatePerSec = desiredRatePerSec; + this.checkIntervalNs = TimeUnit.MILLISECONDS.toNanos(checkIntervalMs); + this.meter = new KafkaMetricsGroup(Throttler.class).newMeter(metricName, units, TimeUnit.SECONDS); + this.time = time; + this.periodStartNs = time.nanoseconds(); + } + + public void updateDesiredRatePerSec(double updatedDesiredRatePerSec) { + desiredRatePerSec = updatedDesiredRatePerSec; + } + + public double desiredRatePerSec() { + return desiredRatePerSec; + } + + public void maybeThrottle(double observed) { + double currentDesiredRatePerSec = desiredRatePerSec; + meter.mark((long) observed); + synchronized (lock) { + observedSoFar += observed; + long now = time.nanoseconds(); + long elapsedNs = now - periodStartNs; + // if we have completed an interval AND we have observed something, maybe + // we should take a little nap + if (elapsedNs > checkIntervalNs && observedSoFar > 0) { + double rateInSecs = (observedSoFar * NS_PER_SEC) / elapsedNs; + if (rateInSecs > currentDesiredRatePerSec) { + // solve for the amount of time to sleep to make us hit the desired rate + double desiredRateMs = currentDesiredRatePerSec / MS_PER_SEC; + long elapsedMs = TimeUnit.NANOSECONDS.toMillis(elapsedNs); + long sleepTime = Math.round(observedSoFar / desiredRateMs - elapsedMs); + if (sleepTime > 0) { + LOG.trace("Natural rate is {} per second but desired rate is {}, sleeping for {} ms to compensate.", rateInSecs, currentDesiredRatePerSec, sleepTime); + time.sleep(sleepTime); + } + } + periodStartNs = time.nanoseconds(); + observedSoFar = 0.0; + } + } + } +} diff --git a/storage/src/test/java/org/apache/kafka/storage/internals/utils/ThrottlerTest.java b/storage/src/test/java/org/apache/kafka/storage/internals/utils/ThrottlerTest.java new file mode 100644 index 0000000000000..6a42cb7d06ca4 --- /dev/null +++ b/storage/src/test/java/org/apache/kafka/storage/internals/utils/ThrottlerTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.storage.internals.utils; + +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.server.util.MockTime; +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.assertEquals; + + +class ThrottlerTest { + + @Test + public void testThrottleDesiredRate() { + long throttleCheckIntervalMs = 100L; + double desiredCountPerSec = 1000.0; + double desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0; + + Time mockTime = new MockTime(); + Throttler throttler = new Throttler(desiredCountPerSec, + throttleCheckIntervalMs, + "throttler", + "entries", + mockTime); + + // Observe desiredCountPerInterval at t1 + long t1 = mockTime.milliseconds(); + throttler.maybeThrottle(desiredCountPerInterval); + assertEquals(t1, mockTime.milliseconds()); + + // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1, + mockTime.sleep(throttleCheckIntervalMs + 1); + throttler.maybeThrottle(desiredCountPerInterval); + long t2 = mockTime.milliseconds(); + assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs); + + // Observe desiredCountPerInterval at t2 + throttler.maybeThrottle(desiredCountPerInterval); + assertEquals(t2, mockTime.milliseconds()); + + // Observe desiredCountPerInterval at t2 + throttleCheckIntervalMs + 1 + mockTime.sleep(throttleCheckIntervalMs + 1); + throttler.maybeThrottle(desiredCountPerInterval); + long t3 = mockTime.milliseconds(); + assertTrue(t3 >= t2 + 2 * throttleCheckIntervalMs); + + long elapsedTimeMs = t3 - t1; + double actualCountPerSec = 4 * desiredCountPerInterval * 1000 / elapsedTimeMs; + assertTrue(actualCountPerSec <= desiredCountPerSec); + } + + @Test + public void testUpdateThrottleDesiredRate() { + long throttleCheckIntervalMs = 100L; + double desiredCountPerSec = 1000.0; + double desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0; + double updatedDesiredCountPerSec = 1500.0; + double updatedDesiredCountPerInterval = updatedDesiredCountPerSec * throttleCheckIntervalMs / 1000.0; + + Time mockTime = new MockTime(); + Throttler throttler = new Throttler(desiredCountPerSec, + throttleCheckIntervalMs, + "throttler", + "entries", + mockTime); + + // Observe desiredCountPerInterval at t1 + long t1 = mockTime.milliseconds(); + throttler.maybeThrottle(desiredCountPerInterval); + assertEquals(t1, mockTime.milliseconds()); + + // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1, + mockTime.sleep(throttleCheckIntervalMs + 1); + throttler.maybeThrottle(desiredCountPerInterval); + long t2 = mockTime.milliseconds(); + assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs); + + long elapsedTimeMs = t2 - t1; + double actualCountPerSec = 2 * desiredCountPerInterval * 1000 / elapsedTimeMs; + assertTrue(actualCountPerSec <= desiredCountPerSec); + + // Update ThrottleDesiredRate + throttler.updateDesiredRatePerSec(updatedDesiredCountPerSec); + + // Observe updatedDesiredCountPerInterval at t2 + throttler.maybeThrottle(updatedDesiredCountPerInterval); + assertEquals(t2, mockTime.milliseconds()); + + // Observe updatedDesiredCountPerInterval at t2 + throttleCheckIntervalMs + 1 + mockTime.sleep(throttleCheckIntervalMs + 1); + throttler.maybeThrottle(updatedDesiredCountPerInterval); + long t3 = mockTime.milliseconds(); + assertTrue(t3 >= t2 + 2 * throttleCheckIntervalMs); + + long updatedElapsedTimeMs = t3 - t2; + double updatedActualCountPerSec = 2 * updatedDesiredCountPerInterval * 1000 / updatedElapsedTimeMs; + assertTrue(updatedActualCountPerSec <= updatedDesiredCountPerSec); + } +}