Skip to content

Commit

Permalink
MINOR: Move Throttler to storage module (apache#16023)
Browse files Browse the repository at this point in the history
Reviewers: Chia-Ping Tsai <[email protected]>
  • Loading branch information
mimaison authored and gongxuanzhang committed Jun 12, 2024
1 parent f83cdfa commit 3a72cb4
Show file tree
Hide file tree
Showing 8 changed files with 226 additions and 238 deletions.
10 changes: 3 additions & 7 deletions core/src/main/scala/kafka/log/LogCleaner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.concurrent.TimeUnit
import kafka.common._
import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
import kafka.server.{BrokerReconfigurable, KafkaConfig}
import kafka.utils._
import kafka.utils.{Logging, Pool}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.errors.{CorruptRecordException, KafkaStorageException}
Expand All @@ -35,6 +35,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time}
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, TransactionIndex}
import org.apache.kafka.storage.internals.utils.Throttler

import scala.jdk.CollectionConverters._
import scala.collection.mutable.ListBuffer
Expand Down Expand Up @@ -109,12 +110,7 @@ class LogCleaner(initialConfig: CleanerConfig,
private[log] val cleanerManager = new LogCleanerManager(logDirs, logs, logDirFailureChannel)

/* a throttle used to limit the I/O of all the cleaner threads to a user-specified maximum rate */
private[log] val throttler = new Throttler(desiredRatePerSec = config.maxIoBytesPerSecond,
checkIntervalMs = 300,
throttleDown = true,
"cleaner-io",
"bytes",
time = time)
private[log] val throttler = new Throttler(config.maxIoBytesPerSecond, 300, "cleaner-io", "bytes", time)

private[log] val cleaners = mutable.ArrayBuffer[CleanerThread]()

Expand Down
111 changes: 0 additions & 111 deletions core/src/main/scala/kafka/utils/Throttler.scala

This file was deleted.

5 changes: 3 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package kafka.log

import kafka.common._
import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.utils._
import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
Expand All @@ -30,6 +30,7 @@ import org.apache.kafka.coordinator.transaction.TransactionLogConfigs
import org.apache.kafka.server.metrics.KafkaMetricsGroup
import org.apache.kafka.server.util.MockTime
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
import org.apache.kafka.storage.internals.utils.Throttler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
import org.mockito.ArgumentMatchers
Expand Down Expand Up @@ -59,7 +60,7 @@ class LogCleanerTest extends Logging {
logProps.put(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT)
val logConfig = new LogConfig(logProps)
val time = new MockTime()
val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
val throttler = new Throttler(Double.MaxValue, Long.MaxValue, "throttler", "entries", time)
val tombstoneRetentionMs = 86400000
val largeTimestamp = Long.MaxValue - tombstoneRetentionMs - 1
val producerStateManagerConfig = new ProducerStateManagerConfig(TransactionLogConfigs.PRODUCER_ID_EXPIRATION_MS_DEFAULT, false)
Expand Down
5 changes: 3 additions & 2 deletions core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ package kafka.log
import kafka.common.{OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
import kafka.log.remote.RemoteLogManager
import kafka.server.{BrokerTopicStats, KafkaConfig}
import kafka.utils._
import kafka.utils.TestUtils
import org.apache.kafka.common.compress.Compression
import org.apache.kafka.common.config.TopicConfig
import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
Expand All @@ -39,6 +39,7 @@ import org.apache.kafka.server.util.{KafkaScheduler, MockTime, Scheduler}
import org.apache.kafka.storage.internals.checkpoint.{LeaderEpochCheckpointFile, PartitionMetadataFile}
import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache
import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, EpochEntry, FetchIsolation, LogConfig, LogFileUtils, LogOffsetMetadata, LogOffsetSnapshot, LogOffsetsListener, LogSegment, LogSegments, LogStartOffsetIncrementReason, ProducerStateManager, ProducerStateManagerConfig, RecordValidationException, VerificationGuard}
import org.apache.kafka.storage.internals.utils.Throttler
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.junit.jupiter.params.ParameterizedTest
Expand Down Expand Up @@ -1046,7 +1047,7 @@ class UnifiedLogTest {
ioBufferSize = 64 * 1024,
maxIoBufferSize = 64 * 1024,
dupBufferLoadFactor = 0.75,
throttler = new Throttler(Double.MaxValue, Long.MaxValue, false, time = mockTime),
throttler = new Throttler(Double.MaxValue, Long.MaxValue, "throttler", "entries", mockTime),
time = mockTime,
checkDone = _ => {})

Expand Down
108 changes: 0 additions & 108 deletions core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala

This file was deleted.

8 changes: 0 additions & 8 deletions gradle/spotbugs-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -490,14 +490,6 @@ For a detailed description of spotbugs bug categories, see https://spotbugs.read
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH"/>
</Match>

<Match>
<!-- Suppress a warning about an intentional infinite loop. -->
<Package name="kafka.utils"/>
<Source name="Throttler.scala"/>
<Method name="main"/>
<Bug pattern="IL_INFINITE_LOOP"/>
</Match>

<Match>
<!-- Suppress a spurious warning about calling notify without modifying
other state under the monitor. -->
Expand Down
Loading

0 comments on commit 3a72cb4

Please sign in to comment.