diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java index 6dccf789d1c0..456d6ecb1420 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/metrics/AbstractMetrics.java @@ -337,6 +337,12 @@ public PinotMeter getMeteredTableValue(final String tableName, final M meter) { return PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS); } + public PinotMeter getMeteredValue(final M meter) { + final PinotMetricName metricName = + PinotMetricUtils.makePinotMetricName(_clazz, _metricPrefix + meter.getMeterName()); + return PinotMetricUtils.makePinotMeter(_metricsRegistry, metricName, meter.getUnit(), TimeUnit.SECONDS); + } + private String getTableFullMeterName(final String tableName, final M meter) { String meterName = meter.getMeterName(); return _metricPrefix + getTableName(tableName) + "." + meterName; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java index bc5d16e915dd..c866ce26543a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java @@ -16,7 +16,6 @@ * specific language governing permissions and limitations * under the License. */ - package org.apache.pinot.core.data.manager.realtime; import com.google.common.annotations.VisibleForTesting; @@ -31,19 +30,26 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.metrics.ServerGauge; +import org.apache.pinot.common.metrics.ServerMeter; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConsumerFactory; import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; import org.apache.pinot.spi.stream.StreamMetadataProvider; +import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * This class is responsible for creating realtime consumption rate limiters. The rate limit, specified in - * StreamConfig of table config, is for the entire topic. The effective rate limit for each partition is simply the - * specified rate limit divided by the partition count. + * This class is responsible for creating realtime consumption rate limiters. + * It contains one rate limiter for the entire server and multiple table partition level rate limiters. + * Server rate limiter is used to throttle the overall consumption rate of the server and configured via + * cluster or server config. + * For table partition level rate limiter, the rate limit value specified in StreamConfig of table config, is for the + * entire topic. The effective rate limit for each partition is simply the specified rate limit divided by the + * partition count. * This class leverages a cache for storing partition count for different topics as retrieving partition count from * stream is a bit expensive and also the same count will be used of all partition consumers of the same topic. */ @@ -51,6 +57,10 @@ public class RealtimeConsumptionRateManager { private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeConsumptionRateManager.class); private static final int CACHE_ENTRY_EXPIRATION_TIME_IN_MINUTES = 10; + private static final String SERVER_CONSUMPTION_RATE_METRIC_KEY_NAME = + ServerMeter.REALTIME_ROWS_CONSUMED.getMeterName(); + private ConsumptionRateLimiter _serverRateLimiter = NOOP_RATE_LIMITER; + // stream config object is required for fetching the partition count from the stream private final LoadingCache _streamConfigToTopicPartitionCountMap; private volatile boolean _isThrottlingAllowed = false; @@ -73,9 +83,28 @@ public void enableThrottling() { _isThrottlingAllowed = true; } + public ConsumptionRateLimiter createServerRateLimiter(PinotConfiguration serverConfig, ServerMetrics serverMetrics) { + double serverRateLimit = + serverConfig.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT, + CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT); + if (serverRateLimit <= 0) { + LOGGER.warn("Invalid server consumption rate limit: {}, throttling is disabled", serverRateLimit); + _serverRateLimiter = NOOP_RATE_LIMITER; + } else { + LOGGER.info("A server consumption rate limiter is set up with rate limit: {}", serverRateLimit); + MetricEmitter metricEmitter = new MetricEmitter(serverMetrics, SERVER_CONSUMPTION_RATE_METRIC_KEY_NAME); + _serverRateLimiter = new RateLimiterImpl(serverRateLimit, metricEmitter); + } + return _serverRateLimiter; + } + + public ConsumptionRateLimiter getServerRateLimiter() { + return _serverRateLimiter; + } + public ConsumptionRateLimiter createRateLimiter(StreamConfig streamConfig, String tableName, ServerMetrics serverMetrics, String metricKeyName) { - if (!streamConfig.getTopicConsumptionRateLimit().isPresent()) { + if (streamConfig.getTopicConsumptionRateLimit().isEmpty()) { return NOOP_RATE_LIMITER; } int partitionCount; diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index 282f3c72a104..d64e85fadae5 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -298,7 +298,8 @@ public void deleteSegmentFile() { private final boolean _isOffHeap; private final boolean _nullHandlingEnabled; private final SegmentCommitterFactory _segmentCommitterFactory; - private final ConsumptionRateLimiter _rateLimiter; + private final ConsumptionRateLimiter _partitionRateLimiter; + private final ConsumptionRateLimiter _serverRateLimiter; private final StreamPartitionMsgOffset _latestStreamOffsetAtStartupTime; private final CompletionMode _segmentCompletionMode; @@ -516,7 +517,8 @@ protected boolean consumeLoop() */ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePipeSleepTimeMillis) { int messageCount = messagesAndOffsets.getMessageCount(); - _rateLimiter.throttle(messageCount); + _partitionRateLimiter.throttle(messageCount); + _serverRateLimiter.throttle(messageCount); PinotMeter realtimeRowsConsumedMeter = null; PinotMeter realtimeRowsDroppedMeter = null; @@ -605,6 +607,7 @@ private boolean processStreamEvents(MessageBatch messagesAndOffsets, long idlePi realtimeRowsConsumedMeter = _serverMetrics.addMeteredTableValue(_clientId, ServerMeter.REALTIME_ROWS_CONSUMED, 1, realtimeRowsConsumedMeter); + _serverMetrics.addMeteredGlobalValue(ServerMeter.REALTIME_ROWS_CONSUMED, 1L); } catch (Exception e) { _numRowsErrored++; String errorMessage = String.format("Caught exception while indexing the record: %s", transformedRow); @@ -1395,8 +1398,9 @@ public RealtimeSegmentDataManager(SegmentZKMetadata segmentZKMetadata, TableConf _memoryManager = new DirectMemoryManager(_segmentNameStr, _serverMetrics); } - _rateLimiter = RealtimeConsumptionRateManager.getInstance() + _partitionRateLimiter = RealtimeConsumptionRateManager.getInstance() .createRateLimiter(_streamConfig, _tableNameWithType, _serverMetrics, _clientId); + _serverRateLimiter = RealtimeConsumptionRateManager.getInstance().getServerRateLimiter(); List sortedColumns = indexLoadingConfig.getSortedColumns(); String sortedColumn; diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java index 21c58f9afa2e..325066fa1622 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManagerTest.java @@ -27,14 +27,12 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.spi.env.PinotConfiguration; import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.CommonConstants; import org.testng.annotations.Test; import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.*; -import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.ConsumptionRateLimiter; -import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.MetricEmitter; -import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.NOOP_RATE_LIMITER; -import static org.apache.pinot.core.data.manager.realtime.RealtimeConsumptionRateManager.RateLimiterImpl; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -51,6 +49,10 @@ public class RealtimeConsumptionRateManagerTest { private static final StreamConfig STREAM_CONFIG_A = mock(StreamConfig.class); private static final StreamConfig STREAM_CONFIG_B = mock(StreamConfig.class); private static final StreamConfig STREAM_CONFIG_C = mock(StreamConfig.class); + private static final PinotConfiguration SERVER_CONFIG_1 = mock(PinotConfiguration.class); + private static final PinotConfiguration SERVER_CONFIG_2 = mock(PinotConfiguration.class); + private static final PinotConfiguration SERVER_CONFIG_3 = mock(PinotConfiguration.class); + private static final PinotConfiguration SERVER_CONFIG_4 = mock(PinotConfiguration.class); private static RealtimeConsumptionRateManager _consumptionRateManager; static { @@ -65,6 +67,15 @@ public class RealtimeConsumptionRateManagerTest { when(STREAM_CONFIG_B.getTopicConsumptionRateLimit()).thenReturn(Optional.of(RATE_LIMIT_FOR_ENTIRE_TOPIC)); when(STREAM_CONFIG_C.getTopicConsumptionRateLimit()).thenReturn(Optional.empty()); _consumptionRateManager = new RealtimeConsumptionRateManager(cache); + + when(SERVER_CONFIG_1.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT, + CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(5.0); + when(SERVER_CONFIG_2.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT, + CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(2.5); + when(SERVER_CONFIG_3.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT, + CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(0.0); + when(SERVER_CONFIG_4.getProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT, + CommonConstants.Server.DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT)).thenReturn(-1.0); } @Test @@ -83,7 +94,27 @@ public void testCreateRateLimiter() { } @Test - public void testBuildCache() throws Exception { + public void testCreateServerRateLimiter() { + // Server config 1 + ConsumptionRateLimiter rateLimiter = _consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_1, null); + assertEquals(5.0, ((RateLimiterImpl) rateLimiter).getRate(), DELTA); + + // Server config 2 + rateLimiter = _consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_2, null); + assertEquals(2.5, ((RateLimiterImpl) rateLimiter).getRate(), DELTA); + + // Server config 3 + rateLimiter = _consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_3, null); + assertEquals(rateLimiter, NOOP_RATE_LIMITER); + + // Server config 4 + rateLimiter = _consumptionRateManager.createServerRateLimiter(SERVER_CONFIG_4, null); + assertEquals(rateLimiter, NOOP_RATE_LIMITER); + } + + @Test + public void testBuildCache() + throws Exception { PartitionCountFetcher partitionCountFetcher = mock(PartitionCountFetcher.class); LoadingCache cache = buildCache(partitionCountFetcher, 500, TimeUnit.MILLISECONDS); when(partitionCountFetcher.fetch(STREAM_CONFIG_A)).thenReturn(10); @@ -150,21 +181,21 @@ public void testMetricEmitter() { now = Clock.fixed(Instant.parse("2022-08-10T12:01:05Z"), ZoneOffset.UTC).instant(); int sumOfMsgsInPrevMinute = sum(numMsgs); int expectedRatio = calcExpectedRatio(rateLimitInMinutes, sumOfMsgsInPrevMinute); - numMsgs = new int[] {35}; + numMsgs = new int[]{35}; assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now), expectedRatio); // 3rd minute now = Clock.fixed(Instant.parse("2022-08-10T12:02:25Z"), ZoneOffset.UTC).instant(); sumOfMsgsInPrevMinute = sum(numMsgs); expectedRatio = calcExpectedRatio(rateLimitInMinutes, sumOfMsgsInPrevMinute); - numMsgs = new int[] {0}; + numMsgs = new int[]{0}; assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now), expectedRatio); // 4th minute now = Clock.fixed(Instant.parse("2022-08-10T12:03:15Z"), ZoneOffset.UTC).instant(); sumOfMsgsInPrevMinute = sum(numMsgs); expectedRatio = calcExpectedRatio(rateLimitInMinutes, sumOfMsgsInPrevMinute); - numMsgs = new int[] {10, 20}; + numMsgs = new int[]{10, 20}; assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now), expectedRatio); now = Clock.fixed(Instant.parse("2022-08-10T12:03:20Z"), ZoneOffset.UTC).instant(); assertEquals(metricEmitter.emitMetric(numMsgs[1], rateLimit, now), expectedRatio); @@ -173,7 +204,7 @@ public void testMetricEmitter() { now = Clock.fixed(Instant.parse("2022-08-10T12:04:30Z"), ZoneOffset.UTC).instant(); sumOfMsgsInPrevMinute = sum(numMsgs); expectedRatio = calcExpectedRatio(rateLimitInMinutes, sumOfMsgsInPrevMinute); - numMsgs = new int[] {5}; + numMsgs = new int[]{5}; assertEquals(metricEmitter.emitMetric(numMsgs[0], rateLimit, now), expectedRatio); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeConsumptionRateLimiterClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeConsumptionRateLimiterClusterIntegrationTest.java new file mode 100644 index 000000000000..ca1253a0f60f --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeConsumptionRateLimiterClusterIntegrationTest.java @@ -0,0 +1,298 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import java.io.File; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Random; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.common.metrics.ServerMeter; +import org.apache.pinot.common.metrics.ServerMetrics; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.config.table.ingestion.IngestionConfig; +import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.metrics.PinotMeter; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.ReadMode; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.util.TestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + + +public class RealtimeConsumptionRateLimiterClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest { + private static final Logger LOGGER = + LoggerFactory.getLogger(RealtimeConsumptionRateLimiterClusterIntegrationTest.class); + + private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test"; + private static final long RANDOM_SEED = System.currentTimeMillis(); + private static final Random RANDOM = new Random(RANDOM_SEED); + private static final double SERVER_RATE_LIMIT = 100; + + private final boolean _isDirectAlloc = RANDOM.nextBoolean(); + private final boolean _isConsumerDirConfigured = RANDOM.nextBoolean(); + private final boolean _enableLeadControllerResource = RANDOM.nextBoolean(); + private List _avroFiles; + + @Override + protected String getLoadMode() { + return ReadMode.mmap.name(); + } + + @Override + public void startController() + throws Exception { + super.startController(); + enableResourceConfigForLeadControllerResource(_enableLeadControllerResource); + } + + @Override + protected void overrideServerConf(PinotConfiguration configuration) { + configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_ALLOCATION, true); + configuration.setProperty(CommonConstants.Server.CONFIG_OF_REALTIME_OFFHEAP_DIRECT_ALLOCATION, _isDirectAlloc); + if (_isConsumerDirConfigured) { + configuration.setProperty(CommonConstants.Server.CONFIG_OF_CONSUMER_DIR, CONSUMER_DIRECTORY); + } + configuration.setProperty(CommonConstants.Server.CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT, SERVER_RATE_LIMIT); + } + + @Override + protected IngestionConfig getIngestionConfig() { + IngestionConfig ingestionConfig = new IngestionConfig(); + ingestionConfig.setStreamIngestionConfig( + new StreamIngestionConfig(Collections.singletonList(getStreamConfigMap()))); + return ingestionConfig; + } + + @Override + protected long getCountStarResult() { + // all the data that was ingested from Kafka also got uploaded via the controller's upload endpoint + return super.getCountStarResult() * 2; + } + + @BeforeClass + @Override + public void setUp() + throws Exception { + // Remove the consumer directory + FileUtils.deleteQuietly(new File(CONSUMER_DIRECTORY)); + + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir); + + // Start the Pinot cluster + startZk(); + startController(); + startBroker(); + startServer(); + + // Start Kafka + startKafka(); + + // Unpack the Avro files + _avroFiles = unpackAvroData(_tempDir); + + // Push data into Kafka + pushAvroIntoKafka(_avroFiles); + } + + @AfterClass + @Override + public void tearDown() + throws Exception { + FileUtils.deleteDirectory(new File(CONSUMER_DIRECTORY)); + stopServer(); + stopBroker(); + stopController(); + stopKafka(); + stopZk(); + FileUtils.deleteDirectory(_tempDir); + } + + @Test + public void testOneTableRateLimit() + throws Exception { + String tableName = getTableName(); + try { + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + long startTime = System.currentTimeMillis(); + TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0)); + addTableConfig(tableConfig); + for (int i = 0; i < 60; i++) { + if (!isTableLoaded(tableName)) { + Thread.sleep(1000L); + } else { + break; + } + } + PinotMeter realtimeRowConsumedMeter = ServerMetrics.get().getMeteredValue(ServerMeter.REALTIME_ROWS_CONSUMED); + long startCount = getCurrentCountStarResult(tableName); + for (int i = 1; i <= 10; i++) { + Thread.sleep(1000L); + long currentCount = getCurrentCountStarResult(tableName); + double currentRate = (currentCount - startCount) / (double) (System.currentTimeMillis() - startTime) * 1000; + LOGGER.info("Second = " + i + ", realtimeRowConsumedMeter = " + realtimeRowConsumedMeter.oneMinuteRate() + + ", currentCount = " + currentCount + ", currentRate = " + currentRate); + Assert.assertTrue(realtimeRowConsumedMeter.oneMinuteRate() < SERVER_RATE_LIMIT, + "Rate should be less than " + SERVER_RATE_LIMIT); + Assert.assertTrue(currentRate < SERVER_RATE_LIMIT * 1.5, // Put some leeway for the rate calculation + "Rate should be less than " + SERVER_RATE_LIMIT); + } + } finally { + dropRealtimeTable(tableName); + waitForTableDataManagerRemoved(TableNameBuilder.REALTIME.tableNameWithType(tableName)); + } + } + + @Test + public void testTwoTableRateLimit() + throws Exception { + String tableName1 = "testTable1"; + String tableName2 = "testTable2"; + + try { + // Create and upload the schema and table config + Schema schema1 = createSchema(); + schema1.setSchemaName("testTable1"); + addSchema(schema1); + Schema schema2 = createSchema(); + schema2.setSchemaName("testTable2"); + addSchema(schema2); + long startTime = System.currentTimeMillis(); + + TableConfig tableConfig1 = createRealtimeTableConfig(tableName1); + addTableConfig(tableConfig1); + TableConfig tableConfig2 = createRealtimeTableConfig(tableName2); + addTableConfig(tableConfig2); + for (int i = 0; i < 60; i++) { + if (!isTableLoaded(tableName1) || !isTableLoaded(tableName2)) { + Thread.sleep(1000L); + } else { + break; + } + } + + PinotMeter serverRowConsumedMeter = ServerMetrics.get().getMeteredValue(ServerMeter.REALTIME_ROWS_CONSUMED); + long startCount1 = getCurrentCountStarResult(tableName1); + long startCount2 = getCurrentCountStarResult(tableName2); + for (int i = 1; i <= 10; i++) { + Thread.sleep(1000L); + long currentCount1 = getCurrentCountStarResult(tableName1); + long currentCount2 = getCurrentCountStarResult(tableName2); + long currentServerCount = currentCount1 + currentCount2; + long currentTimeMillis = System.currentTimeMillis(); + double currentRate1 = (currentCount1 - startCount1) / (double) (currentTimeMillis - startTime) * 1000; + double currentRate2 = (currentCount2 - startCount2) / (double) (currentTimeMillis - startTime) * 1000; + double currentServerRate = currentRate1 + currentRate2; + LOGGER.info("Second = " + i + ", serverRowConsumedMeter = " + serverRowConsumedMeter.oneMinuteRate() + + ", currentCount1 = " + currentCount1 + ", currentRate1 = " + currentRate1 + + ", currentCount2 = " + currentCount2 + ", currentRate2 = " + currentRate2 + + ", currentServerCount = " + currentServerCount + ", currentServerRate = " + currentServerRate + ); + + Assert.assertTrue(serverRowConsumedMeter.oneMinuteRate() < SERVER_RATE_LIMIT, + "Rate should be less than " + SERVER_RATE_LIMIT + ", serverOneMinuteRate = " + serverRowConsumedMeter + .oneMinuteRate()); + Assert.assertTrue(currentServerRate < SERVER_RATE_LIMIT * 1.5, + // Put some leeway for the rate calculation + "Whole table ingestion rate should be less than " + SERVER_RATE_LIMIT + ", currentRate1 = " + currentRate1 + + ", currentRate2 = " + currentRate2 + ", currentServerRate = " + currentServerRate); + } + } finally { + dropRealtimeTable(tableName1); + waitForTableDataManagerRemoved(TableNameBuilder.REALTIME.tableNameWithType(tableName1)); + dropRealtimeTable(tableName2); + waitForTableDataManagerRemoved(TableNameBuilder.REALTIME.tableNameWithType(tableName2)); + } + } + + protected TableConfig createRealtimeTableConfig() { + return createRealtimeTableConfig(getTableName()); + } + + protected TableConfig createRealtimeTableConfig(String tableName) { + return new TableConfigBuilder(TableType.REALTIME).setTableName(tableName) + .setTimeColumnName(getTimeColumnName()).setSortedColumn(getSortedColumn()) + .setInvertedIndexColumns(getInvertedIndexColumns()).setNoDictionaryColumns(getNoDictionaryColumns()) + .setRangeIndexColumns(getRangeIndexColumns()).setBloomFilterColumns(getBloomFilterColumns()) + .setFieldConfigList(getFieldConfigs()).setNumReplicas(getNumReplicas()).setSegmentVersion(getSegmentVersion()) + .setLoadMode(getLoadMode()).setTaskConfig(getTaskConfig()).setBrokerTenant(getBrokerTenant()) + .setServerTenant(getServerTenant()).setIngestionConfig(getIngestionConfig()).setQueryConfig(getQueryConfig()) + .setStreamConfigs(getStreamConfigs()).setNullHandlingEnabled(getNullHandlingEnabled()).build(); + } + + private boolean isTableLoaded(String tableName) { + try { + return getCurrentCountStarResult(tableName) > 0; + } catch (Exception e) { + return false; + } + } + + @Override + protected Map getStreamConfigs() { + return null; + } + + @Test(enabled = false) + public void testDictionaryBasedQueries(boolean useMultiStageQueryEngine) { + // Do nothing + } + + @Test(enabled = false) + public void testGeneratedQueries(boolean useMultiStageQueryEngine) { + // Do nothing + } + + @Test(enabled = false) + public void testHardcodedQueries(boolean useMultiStageQueryEngine) { + // Do nothing + } + + @Test(enabled = false) + public void testInstanceShutdown() { + // Do nothing + } + + @Test(enabled = false) + public void testQueriesFromQueryFile(boolean useMultiStageQueryEngine) { + // Do nothing + } + + @Test(enabled = false) + public void testQueryExceptions(boolean useMultiStageQueryEngine) { + // Do nothing + } + + @Test(enabled = false) + public void testHardcodedServerPartitionedSqlQueries() { + // Do nothing + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 2b771707d291..10d9e6bec1e8 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -205,7 +205,6 @@ public void init(PinotConfiguration serverConf) ThreadResourceUsageProvider.setThreadMemoryMeasurementEnabled( _serverConf.getProperty(Server.CONFIG_OF_ENABLE_THREAD_ALLOCATED_BYTES_MEASUREMENT, Server.DEFAULT_THREAD_ALLOCATED_BYTES_MEASUREMENT)); - // Set data table version send to broker. int dataTableVersion = _serverConf.getProperty(Server.CONFIG_OF_CURRENT_DATA_TABLE_VERSION, DataTableBuilderFactory.DEFAULT_VERSION); @@ -573,6 +572,10 @@ public void start() ServerConf serverConf = new ServerConf(_serverConf); _serverInstance = new ServerInstance(serverConf, _helixManager, accessControlFactory); ServerMetrics serverMetrics = _serverInstance.getServerMetrics(); + + // Enable Server level realtime ingestion rate limier + RealtimeConsumptionRateManager.getInstance().createServerRateLimiter(_serverConf, serverMetrics); + InstanceDataManager instanceDataManager = _serverInstance.getInstanceDataManager(); instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> _isServerReadyToServeQueries); // initialize the thread accountant for query killing diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 1ba40c9003a2..190de271514d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -619,6 +619,11 @@ public static class Server { // This is also the default in the case a user misconfigures this by setting to <= 0. public static final int DEFAULT_STARTUP_REALTIME_MIN_FRESHNESS_MS = 10000; + // Config for realtime consumption message rate limit + public static final String CONFIG_OF_SERVER_CONSUMPTION_RATE_LIMIT = "pinot.server.consumption.rate.limit"; + // Default to 0.0 (no limit) + public static final double DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT = 0.0; + public static final String DEFAULT_READ_MODE = "mmap"; // Whether to reload consuming segment on scheme update public static final boolean DEFAULT_RELOAD_CONSUMING_SEGMENT = true;