Skip to content

Commit

Permalink
Support server level consumption throttle (apache#12292)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 authored Jan 24, 2024
1 parent 91ffcc7 commit f43664d
Show file tree
Hide file tree
Showing 7 changed files with 394 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,12 @@ public PinotMeter getMeteredTableValue(final String tableName, final M meter) {
return PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
}

public PinotMeter getMeteredValue(final M meter) {
final PinotMetricName metricName =
PinotMetricUtils.makePinotMetricName(_clazz, _metricPrefix + meter.getMeterName());
return PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS);
}

private String getTableFullMeterName(final String tableName, final M meter) {
String meterName = meter.getMeterName();
return _metricPrefix + getTableName(tableName) + "." + meterName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.pinot.core.data.manager.realtime;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -31,26 +30,37 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMeter;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


/**
* This class is responsible for creating realtime consumption rate limiters. The rate limit, specified in
* StreamConfig of table config, is for the entire topic. The effective rate limit for each partition is simply the
* specified rate limit divided by the partition count.
* This class is responsible for creating realtime consumption rate limiters.
* It contains one rate limiter for the entire server and multiple table partition level rate limiters.
* Server rate limiter is used to throttle the overall consumption rate of the server and configured via
* cluster or server config.
* For table partition level rate limiter, the rate limit value specified in StreamConfig of table config, is for the
* entire topic. The effective rate limit for each partition is simply the specified rate limit divided by the
* partition count.
* This class leverages a cache for storing partition count for different topics as retrieving partition count from
* stream is a bit expensive and also the same count will be used of all partition consumers of the same topic.
*/
public class RealtimeConsumptionRateManager {
private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeConsumptionRateManager.class);
private static final int CACHE_ENTRY_EXPIRATION_TIME_IN_MINUTES = 10;

private static final String SERVER_CONSUMPTION_RATE_METRIC_KEY_NAME =
ServerMeter.REALTIME_ROWS_CONSUMED.getMeterName();
private ConsumptionRateLimiter _serverRateLimiter = NOOP_RATE_LIMITER;

// stream config object is required for fetching the partition count from the stream
private final LoadingCache<StreamConfig, Integer> _streamConfigToTopicPartitionCountMap;
private volatile boolean _isThrottlingAllowed = false;
Expand All @@ -73,9 +83,28 @@ public void enableThrottling() {
_isThrottlingAllowed = true;
}

public ConsumptionRateLimiter createServerRateLimiter(PinotConfiguration serverConfig, ServerMetrics serverMetrics) {
double serverRateLimit =
serverConfig.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT);
if (serverRateLimit <= 0) {
LOGGER.warn("Invalid server consumption rate limit: {}, throttling is disabled", serverRateLimit);
_serverRateLimiter = NOOP_RATE_LIMITER;
} else {
LOGGER.info("A server consumption rate limiter is set up with rate limit: {}", serverRateLimit);
MetricEmitter metricEmitter = new MetricEmitter(serverMetrics, SERVER_CONSUMPTION_RATE_METRIC_KEY_NAME);
_serverRateLimiter = new RateLimiterImpl(serverRateLimit, metricEmitter);
}
return _serverRateLimiter;
}

public ConsumptionRateLimiter getServerRateLimiter() {
return _serverRateLimiter;
}

public ConsumptionRateLimiter createRateLimiter(StreamConfig streamConfig, String tableName,
ServerMetrics serverMetrics, String metricKeyName) {
if (!streamConfig.getTopicConsumptionRateLimit().isPresent()) {
if (streamConfig.getTopicConsumptionRateLimit().isEmpty()) {
return NOOP_RATE_LIMITER;
}
int partitionCount;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,8 @@ public void deleteSegmentFile() {
private final boolean _isOffHeap;
private final boolean _nullHandlingEnabled;
private final SegmentCommitterFactory _segmentCommitterFactory;
private final ConsumptionRateLimiter _rateLimiter;
private final ConsumptionRateLimiter _partitionRateLimiter;
private final ConsumptionRateLimiter _serverRateLimiter;

private final StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime;
private final CompletionMode _segmentCompletionMode;
Expand Down Expand Up @@ -516,7 +517,8 @@ protected boolean consumeLoop()
*/
private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeSleepTimeMillis) {
int messageCount = messagesAndOffsets.getMessageCount();
_rateLimiter.throttle(messageCount);
_partitionRateLimiter.throttle(messageCount);
_serverRateLimiter.throttle(messageCount);

PinotMeter realtimeRowsConsumedMeter = null;
PinotMeter realtimeRowsDroppedMeter = null;
Expand Down Expand Up @@ -605,6 +607,7 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi
realtimeRowsConsumedMeter =
_serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_CONSUMED, 1,
realtimeRowsConsumedMeter);
_serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L);
} catch (Exception e) {
_numRowsErrored++;
String errorMessage = String.format("Caught exception while indexing the record: %s", transformedRow);
Expand Down Expand Up @@ -1395,8 +1398,9 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf
_memoryManager = new DirectMemoryManager(_segmentNameStr, _serverMetrics);
}

_rateLimiter = RealtimeConsumptionRateManager.getInstance()
_partitionRateLimiter = RealtimeConsumptionRateManager.getInstance()
.createRateLimiter(_streamConfig, _tableNameWithType, _serverMetrics, _clientId);
_serverRateLimiter = RealtimeConsumptionRateManager.getInstance().getServerRateLimiter();

List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
String sortedColumn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.CommonConstants;
import org.testng.annotations.Test;

import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.*;
import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter;
import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.MetricEmitter;
import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.NOOP_RATE_LIMITER;
import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.RateLimiterImpl;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
Expand All @@ -51,6 +49,10 @@ public class RealtimeConsumptionRateManagerTest {
private static final StreamConfig STREAM_CONFIG_A = mock(StreamConfig.class);
private static final StreamConfig STREAM_CONFIG_B = mock(StreamConfig.class);
private static final StreamConfig STREAM_CONFIG_C = mock(StreamConfig.class);
private static final PinotConfiguration SERVER_CONFIG_1 = mock(PinotConfiguration.class);
private static final PinotConfiguration SERVER_CONFIG_2 = mock(PinotConfiguration.class);
private static final PinotConfiguration SERVER_CONFIG_3 = mock(PinotConfiguration.class);
private static final PinotConfiguration SERVER_CONFIG_4 = mock(PinotConfiguration.class);
private static RealtimeConsumptionRateManager _consumptionRateManager;

static {
Expand All @@ -65,6 +67,15 @@ public class RealtimeConsumptionRateManagerTest {
when(STREAM_CONFIG_B.getTopicConsumptionRateLimit()).thenReturn(Optional.of(RATE_LIMIT_FOR_ENTIRE_TOPIC));
when(STREAM_CONFIG_C.getTopicConsumptionRateLimit()).thenReturn(Optional.empty());
_consumptionRateManager = new RealtimeConsumptionRateManager(cache);

when(SERVER_CONFIG_1.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0);
when(SERVER_CONFIG_2.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(2.5);
when(SERVER_CONFIG_3.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(0.0);
when(SERVER_CONFIG_4.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT,
CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(-1.0);
}

@Test
Expand All @@ -83,7 +94,27 @@ public void testCreateRateLimiter() {
}

@Test
public void testBuildCache() throws Exception {
public void testCreateServerRateLimiter() {
// Server config 1
ConsumptionRateLimiter rateLimiter = _consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_1, null);
assertEquals(5.0, ((RateLimiterImpl) rateLimiter).getRate(), DELTA);

// Server config 2
rateLimiter = _consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_2, null);
assertEquals(2.5, ((RateLimiterImpl) rateLimiter).getRate(), DELTA);

// Server config 3
rateLimiter = _consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_3, null);
assertEquals(rateLimiter, NOOP_RATE_LIMITER);

// Server config 4
rateLimiter = _consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_4, null);
assertEquals(rateLimiter, NOOP_RATE_LIMITER);
}

@Test
public void testBuildCache()
throws Exception {
PartitionCountFetcher partitionCountFetcher = mock(PartitionCountFetcher.class);
LoadingCache<StreamConfig, Integer> cache = buildCache(partitionCountFetcher, 500, TimeUnit.MILLISECONDS);
when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(10);
Expand Down Expand Up @@ -150,21 +181,21 @@ public void testMetricEmitter() {
now = Clock.fixed(Instant.parse("2022-08-10T12:01:05Z"), ZoneOffset.UTC).instant();
int sumOfMsgsInPrevMinute = sum(numMsgs);
int expectedRatio = calcExpectedRatio(rateLimitInMinutes, sumOfMsgsInPrevMinute);
numMsgs = new int[] {35};
numMsgs = new int[]{35};
assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now), expectedRatio);

// 3rd minute
now = Clock.fixed(Instant.parse("2022-08-10T12:02:25Z"), ZoneOffset.UTC).instant();
sumOfMsgsInPrevMinute = sum(numMsgs);
expectedRatio = calcExpectedRatio(rateLimitInMinutes, sumOfMsgsInPrevMinute);
numMsgs = new int[] {0};
numMsgs = new int[]{0};
assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now), expectedRatio);

// 4th minute
now = Clock.fixed(Instant.parse("2022-08-10T12:03:15Z"), ZoneOffset.UTC).instant();
sumOfMsgsInPrevMinute = sum(numMsgs);
expectedRatio = calcExpectedRatio(rateLimitInMinutes, sumOfMsgsInPrevMinute);
numMsgs = new int[] {10, 20};
numMsgs = new int[]{10, 20};
assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now), expectedRatio);
now = Clock.fixed(Instant.parse("2022-08-10T12:03:20Z"), ZoneOffset.UTC).instant();
assertEquals(metricEmitter.emitMetric(numMsgs[1], rateLimit, now), expectedRatio);
Expand All @@ -173,7 +204,7 @@ public void testMetricEmitter() {
now = Clock.fixed(Instant.parse("2022-08-10T12:04:30Z"), ZoneOffset.UTC).instant();
sumOfMsgsInPrevMinute = sum(numMsgs);
expectedRatio = calcExpectedRatio(rateLimitInMinutes, sumOfMsgsInPrevMinute);
numMsgs = new int[] {5};
numMsgs = new int[]{5};
assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now), expectedRatio);
}

Expand Down
Loading

0 comments on commit f43664d

Please sign in to comment.