From e49842b5e9b4a385732ddd1c6a81c99620dd9f5b Mon Sep 17 00:00:00 2001 From: "Xiaotian (Jackie) Jiang" Date: Fri, 21 Oct 2022 23:35:00 -0700 Subject: [PATCH] Support reloading consuming segment using force commit --- .../realtime/RealtimeTableDataManager.java | 2 +- .../tests/BaseClusterIntegrationTest.java | 2 +- ...ggregateMetricsClusterIntegrationTest.java | 6 -- ...> BaseRealtimeClusterIntegrationTest.java} | 2 +- ...ontrollerPeriodicTasksIntegrationTest.java | 5 -- .../tests/DedupIntegrationTest.java | 5 -- ...ceKafkaRealtimeClusterIntegrationTest.java | 7 +- ...merHLCRealtimeClusterIntegrationTest.java} | 2 +- .../HLCRealtimeClusterIntegrationTest.java | 30 ++++++++ .../tests/HybridClusterIntegrationTest.java | 32 +++------ ...DecoderRealtimeClusterIntegrationTest.java | 7 +- .../LLCRealtimeClusterIntegrationTest.java | 31 ++++---- ...loadLLCRealtimeClusterIntegrationTest.java | 7 +- ...eSegmentsMinionClusterIntegrationTest.java | 7 +- .../SegmentCompletionIntegrationTest.java | 5 -- ...tionLLCRealtimeClusterIntegrationTest.java | 5 -- ...IndicesRealtimeClusterIntegrationTest.java | 6 -- .../integration/tests/TlsIntegrationTest.java | 5 -- ...sertTableSegmentUploadIntegrationTest.java | 5 -- .../tests/UrlAuthRealtimeIntegrationTest.java | 5 -- .../data/manager/SegmentDataManager.java | 6 +- .../mutable/MutableSegmentImpl.java | 45 +++--------- .../VirtualColumnProviderFactory.java | 8 +-- .../helix/HelixInstanceDataManager.java | 56 ++++++--------- .../helix/HelixInstanceDataManagerTest.java | 70 ------------------- 25 files changed, 94 insertions(+), 267 deletions(-) rename pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/{RealtimeClusterIntegrationTest.java => BaseRealtimeClusterIntegrationTest.java} (98%) rename pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/{FlakyConsumerRealtimeClusterIntegrationTest.java => FlakyConsumerHLCRealtimeClusterIntegrationTest.java} (97%) create mode 100644 pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HLCRealtimeClusterIntegrationTest.java delete mode 100644 pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java index c8eb0a86c125..d780daaed1bf 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java @@ -381,7 +381,7 @@ public void addSegment(ImmutableSegment immutableSegment) { } // TODO: Change dedup handling to handle segment replacement - if (isDedupEnabled()) { + if (isDedupEnabled() && immutableSegment instanceof ImmutableSegmentImpl) { buildDedupMeta((ImmutableSegmentImpl) immutableSegment); } super.addSegment(immutableSegment); diff --git a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java index b8761200be5b..b133a275514b 100644 --- a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java +++ b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java @@ -133,7 +133,7 @@ protected long getCountStarResult() { } protected boolean useLlc() { - return false; + return true; } protected boolean useKafkaTransaction() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java index 2faa62c27804..9814e36b7d72 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/AggregateMetricsClusterIntegrationTest.java @@ -83,12 +83,6 @@ public void setUp() waitForAllDocsLoaded(600_000L); } - @Override - protected boolean useLlc() { - // NOTE: Aggregate metrics is only available with LLC. - return true; - } - @Override protected void waitForAllDocsLoaded(long timeoutMs) { // NOTE: For aggregate metrics, we need to test the aggregation result instead of the document count because diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java similarity index 98% rename from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java rename to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java index f8791b5b394c..a8cccc73b708 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseRealtimeClusterIntegrationTest.java @@ -37,7 +37,7 @@ /** * Integration test that creates a Kafka broker, creates a Pinot cluster that consumes from Kafka and queries Pinot. */ -public class RealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSet { +public abstract class BaseRealtimeClusterIntegrationTest extends BaseClusterIntegrationTestSet { @BeforeClass public void setUp() diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java index 64a894b52859..46d486becc03 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ControllerPeriodicTasksIntegrationTest.java @@ -79,11 +79,6 @@ protected String getTableName() { return _currentTable; } - @Override - protected boolean useLlc() { - return true; - } - @Override protected int getNumReplicas() { return NUM_REPLICAS; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java index d133c5c02b2e..3cb6003356c8 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DedupIntegrationTest.java @@ -96,11 +96,6 @@ protected String getAvroTarFileName() { return "dedupIngestionTestData.tar.gz"; } - @Override - protected boolean useLlc() { - return true; - } - @Override protected String getPartitionColumn() { return "id"; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java index c90093595dfc..c899c0799855 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExactlyOnceKafkaRealtimeClusterIntegrationTest.java @@ -25,12 +25,7 @@ import org.apache.pinot.spi.utils.ReadMode; -public class ExactlyOnceKafkaRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest { - - @Override - protected boolean useLlc() { - return true; - } +public class ExactlyOnceKafkaRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest { @Override protected boolean useKafkaTransaction() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerHLCRealtimeClusterIntegrationTest.java similarity index 97% rename from pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java rename to pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerHLCRealtimeClusterIntegrationTest.java index b05244f9c3fe..e8318ab189eb 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/FlakyConsumerHLCRealtimeClusterIntegrationTest.java @@ -33,7 +33,7 @@ /** * Integration test that simulates a flaky Kafka consumer. */ -public class FlakyConsumerRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest { +public class FlakyConsumerHLCRealtimeClusterIntegrationTest extends HLCRealtimeClusterIntegrationTest { @Override protected String getStreamConsumerFactoryClassName() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HLCRealtimeClusterIntegrationTest.java new file mode 100644 index 000000000000..0f5744367711 --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HLCRealtimeClusterIntegrationTest.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +/** + * Integration test for high-level Kafka consumer. + */ +public class HLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest { + + @Override + protected boolean useLlc() { + return false; + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java index 87963c8e845b..422672c9d9d3 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java @@ -135,13 +135,12 @@ public void testSegmentMetadataApi() segmentMetadataFromDirectEndpoint.get("segment.total.docs")); } - // TODO: This test fails when using `llc` consumer mode. Needs investigation @Test public void testSegmentListApi() throws Exception { { - String jsonOutputStr = sendGetRequest( - _controllerRequestURLBuilder.forSegmentListAPI(getTableName(), TableType.OFFLINE.toString())); + String jsonOutputStr = + sendGetRequest(_controllerRequestURLBuilder.forSegmentListAPI(getTableName(), TableType.OFFLINE.toString())); JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr); // There should be one element in the array JsonNode element = array.get(0); @@ -149,37 +148,26 @@ public void testSegmentListApi() Assert.assertEquals(segments.size(), 8); } { - String jsonOutputStr = sendGetRequest( - _controllerRequestURLBuilder.forSegmentListAPI(getTableName(), TableType.REALTIME.toString())); + String jsonOutputStr = + sendGetRequest(_controllerRequestURLBuilder.forSegmentListAPI(getTableName(), TableType.REALTIME.toString())); JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr); // There should be one element in the array JsonNode element = array.get(0); JsonNode segments = element.get("REALTIME"); - Assert.assertEquals(segments.size(), 3); + Assert.assertEquals(segments.size(), 24); } { String jsonOutputStr = sendGetRequest(_controllerRequestURLBuilder.forSegmentListAPI(getTableName())); JsonNode array = JsonUtils.stringToJsonNode(jsonOutputStr); - // there should be 2 elements in the array now. - int realtimeIndex = 0; - int offlineIndex = 1; - JsonNode element = array.get(realtimeIndex); - if (!element.has("REALTIME")) { - realtimeIndex = 1; - offlineIndex = 0; - } - JsonNode offlineElement = array.get(offlineIndex); - JsonNode realtimeElement = array.get(realtimeIndex); - - JsonNode realtimeSegments = realtimeElement.get("REALTIME"); - Assert.assertEquals(realtimeSegments.size(), 3); - - JsonNode offlineSegments = offlineElement.get("OFFLINE"); + JsonNode offlineSegments = array.get(0).get("OFFLINE"); Assert.assertEquals(offlineSegments.size(), 8); + JsonNode realtimeSegments = array.get(1).get("REALTIME"); + Assert.assertEquals(realtimeSegments.size(), 24); } } - @Test + // NOTE: Reload consuming segment will force commit it, so run this test after segment list api test + @Test(dependsOnMethods = "testSegmentListApi") public void testReload() throws Exception { super.testReload(true); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java index 2e021aacde91..93de24252aa4 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest.java @@ -68,7 +68,7 @@ * TODO: Add separate module-level tests and remove the randomness of this test */ public class KafkaConfluentSchemaRegistryAvroMessageDecoderRealtimeClusterIntegrationTest - extends RealtimeClusterIntegrationTest { + extends BaseRealtimeClusterIntegrationTest { private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test"; private static final String TEST_UPDATED_INVERTED_INDEX_QUERY = "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305"; @@ -185,11 +185,6 @@ protected boolean injectTombstones() { return true; } - @Override - protected boolean useLlc() { - return true; - } - @Override protected String getLoadMode() { return ReadMode.mmap.name(); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java index c0272f89544d..5c589bd9cb32 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java @@ -55,10 +55,10 @@ /** - * Integration test that extends RealtimeClusterIntegrationTest but uses low-level Kafka consumer. + * Integration test for low-level Kafka consumer. * TODO: Add separate module-level tests and remove the randomness of this test */ -public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest { +public class LLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest { private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test"; private static final String TEST_UPDATED_INVERTED_INDEX_QUERY = "SELECT COUNT(*) FROM mytable WHERE DivActualElapsedTime = 305"; @@ -77,11 +77,6 @@ protected boolean injectTombstones() { return true; } - @Override - protected boolean useLlc() { - return true; - } - @Override protected String getLoadMode() { return ReadMode.mmap.name(); @@ -155,17 +150,17 @@ private void uploadSegmentsToController(String tableName, File tarDir, boolean o if (changeCrc) { changeCrcInSegmentZKMetadata(tableName, segmentTarFile.toString()); } - assertEquals(fileUploadDownloadClient - .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName, - TableType.REALTIME).getStatusCode(), HttpStatus.SC_OK); + assertEquals( + fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, + tableName, TableType.REALTIME).getStatusCode(), HttpStatus.SC_OK); } else { // Upload segments in parallel ExecutorService executorService = Executors.newFixedThreadPool(numSegments); List> futures = new ArrayList<>(numSegments); for (File segmentTarFile : segmentTarFiles) { - futures.add(executorService.submit(() -> fileUploadDownloadClient - .uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), segmentTarFile, tableName, - TableType.REALTIME).getStatusCode())); + futures.add(executorService.submit( + () -> fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI, segmentTarFile.getName(), + segmentTarFile, tableName, TableType.REALTIME).getStatusCode())); } executorService.shutdown(); for (Future future : futures) { @@ -253,10 +248,12 @@ public void testInvertedIndexTriggering() JsonNode queryResponse1 = postQuery(TEST_UPDATED_INVERTED_INDEX_QUERY); // Total docs should not change during reload assertEquals(queryResponse1.get("totalDocs").asLong(), numTotalDocs); - assertEquals(queryResponse1.get("numConsumingSegmentsQueried").asLong(), 2); - assertTrue(queryResponse1.get("minConsumingFreshnessTimeMs").asLong() > _startTime); - assertTrue(queryResponse1.get("minConsumingFreshnessTimeMs").asLong() < System.currentTimeMillis()); - return queryResponse1.get("numEntriesScannedInFilter").asLong() == 0; + + long numConsumingSegmentsQueried = queryResponse1.get("numConsumingSegmentsQueried").asLong(); + long minConsumingFreshnessTimeMs = queryResponse1.get("minConsumingFreshnessTimeMs").asLong(); + long numEntriesScannedInFilter = queryResponse1.get("numEntriesScannedInFilter").asLong(); + return numConsumingSegmentsQueried == 2 && minConsumingFreshnessTimeMs > _startTime + && minConsumingFreshnessTimeMs < System.currentTimeMillis() && numEntriesScannedInFilter == 0; } catch (Exception e) { throw new RuntimeException(e); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java index 77a5fd959c8e..964a854f98ef 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/PeerDownloadLLCRealtimeClusterIntegrationTest.java @@ -70,7 +70,7 @@ * (1) All the segments on all servers are in either ONLINE or CONSUMING states * (2) For segments failed during deep store upload, the corresponding segment download url string is empty in Zk. */ -public class PeerDownloadLLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegrationTest { +public class PeerDownloadLLCRealtimeClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest { private static final Logger LOGGER = LoggerFactory.getLogger(PeerDownloadLLCRealtimeClusterIntegrationTest.class); private static final String CONSUMER_DIRECTORY = "/tmp/consumer-test"; @@ -151,11 +151,6 @@ public void startController() enableResourceConfigForLeadControllerResource(_enableLeadControllerResource); } - @Override - protected boolean useLlc() { - return true; - } - @Nullable @Override protected String getLoadMode() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java index 447171e3aa29..218ab7e01a2c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/RealtimeToOfflineSegmentsMinionClusterIntegrationTest.java @@ -51,7 +51,7 @@ * With every task run, a new segment is created in the offline table for 1 day. Watermark also keeps progressing * accordingly. */ -public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends RealtimeClusterIntegrationTest { +public class RealtimeToOfflineSegmentsMinionClusterIntegrationTest extends BaseRealtimeClusterIntegrationTest { private PinotHelixTaskResourceManager _helixTaskResourceManager; private PinotTaskManager _taskManager; @@ -67,11 +67,6 @@ protected TableTaskConfig getTaskConfig() { Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, new HashMap<>())); } - @Override - protected boolean useLlc() { - return true; - } - @BeforeClass public void setUp() throws Exception { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java index eea299254517..b7af297517ee 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentCompletionIntegrationTest.java @@ -61,11 +61,6 @@ public class SegmentCompletionIntegrationTest extends BaseClusterIntegrationTest private HelixManager _serverHelixManager; private String _currentSegment; - @Override - protected boolean useLlc() { - return true; - } - @Override protected int getNumKafkaPartitions() { return NUM_KAFKA_PARTITIONS; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java index 8f5199e6a80b..561cd67bc694 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentPartitionLLCRealtimeClusterIntegrationTest.java @@ -110,11 +110,6 @@ protected long getCountStarResult() { return _countStarResult; } - @Override - protected boolean useLlc() { - return true; - } - @Nullable @Override protected String getPartitionColumn() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TextIndicesRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TextIndicesRealtimeClusterIntegrationTest.java index 7af80d036371..3ad8eda2cad1 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TextIndicesRealtimeClusterIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TextIndicesRealtimeClusterIntegrationTest.java @@ -72,12 +72,6 @@ public String getTimeColumnName() { return TIME_COLUMN_NAME; } - // TODO: Support Lucene index on HLC consuming segments - @Override - protected boolean useLlc() { - return true; - } - @Nullable @Override protected String getSortedColumn() { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java index 7d21ee7c48de..177b442a9c1c 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TlsIntegrationTest.java @@ -242,11 +242,6 @@ protected TableTaskConfig getTaskConfig() { return new TableTaskConfig(Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, prop)); } - @Override - protected boolean useLlc() { - return true; - } - @Override public void addSchema(Schema schema) throws IOException { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java index 534c716b588c..e7966e3608b9 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UpsertTableSegmentUploadIntegrationTest.java @@ -129,11 +129,6 @@ protected String getAvroTarFileName() { return "upsert_test.tar.gz"; } - @Override - protected boolean useLlc() { - return true; - } - @Override protected String getPartitionColumn() { return PRIMARY_KEY_COL; diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java index bc8764e9f655..e8389b377f59 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/UrlAuthRealtimeIntegrationTest.java @@ -152,11 +152,6 @@ protected TableTaskConfig getTaskConfig() { Collections.singletonMap(MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE, properties)); } - @Override - protected boolean useLlc() { - return true; - } - @Override public void addSchema(Schema schema) throws IOException { diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java index 6835ab0dce34..46933a835636 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/SegmentDataManager.java @@ -26,17 +26,13 @@ * Base segment data manager to maintain reference count for the segment. */ public abstract class SegmentDataManager { + private final long _loadTimeMs = System.currentTimeMillis(); private int _referenceCount = 1; - private long _loadTimeMs = System.currentTimeMillis(); public long getLoadTimeMs() { return _loadTimeMs; } - public void setLoadTimeMs(long loadTimeMs) { - _loadTimeMs = loadTimeMs; - } - @VisibleForTesting public synchronized int getReferenceCount() { return _referenceCount; diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java index 65212465242e..7b83864cd110 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImpl.java @@ -19,7 +19,6 @@ package org.apache.pinot.segment.local.indexsegment.mutable; import com.google.common.base.Preconditions; -import com.google.common.collect.Sets; import it.unimi.dsi.fastutil.ints.IntArrays; import java.io.Closeable; import java.io.File; @@ -161,10 +160,6 @@ public class MutableSegmentImpl implements MutableSegment { private volatile long _latestIngestionTimeMs = Long.MIN_VALUE; private RealtimeLuceneIndexRefreshState.RealtimeLuceneReaders _realtimeLuceneReaders; - // If the table schema is changed before the consuming segment is committed, newly added columns would appear in - // _newlyAddedColumnsFieldMap. - private final Map _newlyAddedColumnsFieldMap = new ConcurrentHashMap(); - private final Map _newlyAddedPhysicalColumnsFieldMap = new ConcurrentHashMap(); private final UpsertConfig.Mode _upsertMode; private final String _upsertComparisonColumn; @@ -482,19 +477,6 @@ public long getMaxTime() { return Long.MIN_VALUE; } - public void addExtraColumns(Schema newSchema) { - for (String columnName : newSchema.getColumnNames()) { - if (!_schema.getColumnNames().contains(columnName)) { - FieldSpec fieldSpec = newSchema.getFieldSpecFor(columnName); - _newlyAddedColumnsFieldMap.put(columnName, fieldSpec); - if (!fieldSpec.isVirtualColumn()) { - _newlyAddedPhysicalColumnsFieldMap.put(columnName, fieldSpec); - } - } - } - _logger.info("Newly added columns: " + _newlyAddedColumnsFieldMap); - } - @Override public boolean index(GenericRow row, @Nullable RowMetadata rowMetadata) throws IOException { @@ -951,40 +933,33 @@ public SegmentMetadata getSegmentMetadata() { @Override public Set getColumnNames() { - // Return all column names, virtual and physical. - return Sets.union(_schema.getColumnNames(), _newlyAddedColumnsFieldMap.keySet()); + return _schema.getColumnNames(); } @Override public Set getPhysicalColumnNames() { HashSet physicalColumnNames = new HashSet<>(); - for (FieldSpec fieldSpec : _physicalFieldSpecs) { physicalColumnNames.add(fieldSpec.getName()); } - // We should include newly added columns in the physical columns - return Sets.union(physicalColumnNames, _newlyAddedPhysicalColumnsFieldMap.keySet()); + return physicalColumnNames; } @Override public DataSource getDataSource(String column) { - FieldSpec fieldSpec = _schema.getFieldSpecFor(column); - if (fieldSpec == null || fieldSpec.isVirtualColumn()) { - // Column is either added during ingestion, or was initiated with a virtual column provider - if (fieldSpec == null) { - // If the column was added during ingestion, we will construct the column provider based on its fieldSpec to - // provide values - fieldSpec = _newlyAddedColumnsFieldMap.get(column); - Preconditions.checkNotNull(fieldSpec, - "FieldSpec for " + column + " should not be null. " + "Potentially invalid column name specified."); - } + IndexContainer indexContainer = _indexContainerMap.get(column); + if (indexContainer != null) { + // Physical column + return indexContainer.toDataSource(); + } else { + // Virtual column + FieldSpec fieldSpec = _schema.getFieldSpecFor(column); + Preconditions.checkState(fieldSpec != null && fieldSpec.isVirtualColumn(), "Failed to find column: %s", column); // TODO: Refactor virtual column provider to directly generate data source VirtualColumnContext virtualColumnContext = new VirtualColumnContext(fieldSpec, _numDocsIndexed); VirtualColumnProvider virtualColumnProvider = VirtualColumnProviderFactory.buildProvider(virtualColumnContext); return new ImmutableDataSource(virtualColumnProvider.buildMetadata(virtualColumnContext), virtualColumnProvider.buildColumnIndexContainer(virtualColumnContext)); - } else { - return _indexContainerMap.get(column).toDataSource(); } } diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProviderFactory.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProviderFactory.java index 4ddeee2fc594..e1605a048c47 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProviderFactory.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/virtualcolumn/VirtualColumnProviderFactory.java @@ -37,13 +37,7 @@ private VirtualColumnProviderFactory() { public static VirtualColumnProvider buildProvider(VirtualColumnContext virtualColumnContext) { String virtualColumnProvider = virtualColumnContext.getFieldSpec().getVirtualColumnProvider(); try { - // Use the preset virtualColumnProvider if available - if (virtualColumnProvider != null && !virtualColumnProvider - .equals(DefaultNullValueVirtualColumnProvider.class.getName())) { - return PluginManager.get().createInstance(virtualColumnProvider); - } - // Create the columnProvider that returns default null values based on the virtualColumnContext - return new DefaultNullValueVirtualColumnProvider(); + return PluginManager.get().createInstance(virtualColumnProvider); } catch (Exception e) { throw new IllegalStateException("Caught exception while creating instance of: " + virtualColumnProvider, e); } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java index 49c932d911b2..fe373c0fbe80 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java @@ -18,7 +18,6 @@ */ package org.apache.pinot.server.starter.helix; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.cache.CacheBuilder; import com.google.common.cache.CacheLoader; @@ -58,11 +57,8 @@ import org.apache.pinot.segment.local.data.manager.SegmentDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManager; import org.apache.pinot.segment.local.data.manager.TableDataManagerConfig; -import org.apache.pinot.segment.local.indexsegment.mutable.MutableSegmentImpl; import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig; import org.apache.pinot.segment.local.utils.SegmentLocks; -import org.apache.pinot.segment.spi.ImmutableSegment; -import org.apache.pinot.segment.spi.IndexSegment; import org.apache.pinot.segment.spi.SegmentMetadata; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoader; import org.apache.pinot.segment.spi.loader.SegmentDirectoryLoaderContext; @@ -246,8 +242,8 @@ public void deleteSegment(String tableNameWithType, String segmentName) } // We might clean up further more with the specific segment loader. But note that tableDataManager object or // even the TableConfig might not be present any more at this point. - SegmentDirectoryLoader segmentLoader = SegmentDirectoryLoaderRegistry - .getSegmentDirectoryLoader(_instanceDataManagerConfig.getSegmentDirectoryLoader()); + SegmentDirectoryLoader segmentLoader = SegmentDirectoryLoaderRegistry.getSegmentDirectoryLoader( + _instanceDataManagerConfig.getSegmentDirectoryLoader()); if (segmentLoader != null) { LOGGER.info("Deleting segment: {} further with segment loader: {}", segmentName, _instanceDataManagerConfig.getSegmentDirectoryLoader()); @@ -373,16 +369,28 @@ private void reloadSegmentWithMetadata(String tableNameWithType, SegmentMetadata File indexDir = segmentMetadata.getIndexDir(); if (indexDir == null) { + // Use force commit to reload consuming segment SegmentDataManager segmentDataManager = tableDataManager.acquireSegment(segmentName); if (segmentDataManager == null) { + LOGGER.warn("Failed to find segment data manager for table: {}, segment: {}, skipping reloading segment", + tableNameWithType, segmentName); return; } try { - if (reloadMutableSegment(tableNameWithType, segmentName, segmentDataManager, schema)) { - // A mutable segment has been found and reloaded. - segmentDataManager.setLoadTimeMs(System.currentTimeMillis()); + if (!_instanceDataManagerConfig.shouldReloadConsumingSegment()) { + LOGGER.warn("Skip reloading consuming segment: {} in table: {} as configured", segmentName, + tableNameWithType); return; } + // TODO: Support force committing HLC consuming segment + if (!(segmentDataManager instanceof LLRealtimeSegmentDataManager)) { + LOGGER.warn("Cannot reload non-LLC consuming segment: {} in table: {}", segmentName, tableNameWithType); + return; + } + LOGGER.info("Reloading (force committing) LLC consuming segment: {} in table: {}", segmentName, + tableNameWithType); + ((LLRealtimeSegmentDataManager) segmentDataManager).forceCommit(); + return; } finally { tableDataManager.releaseSegment(segmentDataManager); } @@ -406,30 +414,6 @@ private void reloadSegmentWithMetadata(String tableNameWithType, SegmentMetadata } } - /** - * Try to reload a mutable segment. - * @return true if the segment is mutable and loaded; false if the segment is immutable. - */ - @VisibleForTesting - boolean reloadMutableSegment(String tableNameWithType, String segmentName, SegmentDataManager segmentDataManager, - @Nullable Schema schema) { - IndexSegment segment = segmentDataManager.getSegment(); - if (segment instanceof ImmutableSegment) { - LOGGER.info("Found an immutable segment: {} in table: {}", segmentName, tableNameWithType); - return false; - } - // Found a mutable/consuming segment from REALTIME table. - if (!_instanceDataManagerConfig.shouldReloadConsumingSegment()) { - LOGGER.info("Skip reloading REALTIME consuming segment: {} in table: {}", segmentName, tableNameWithType); - return true; - } - LOGGER.info("Reloading REALTIME consuming segment: {} in table: {}", segmentName, tableNameWithType); - Preconditions.checkState(schema != null, "Failed to find schema for table: {}", tableNameWithType); - MutableSegmentImpl mutableSegment = (MutableSegmentImpl) segment; - mutableSegment.addExtraColumns(schema); - return true; - } - @Override public void addOrReplaceSegment(String tableNameWithType, String segmentName) throws Exception { @@ -541,9 +525,9 @@ public SegmentUploader getSegmentUploader() { @Override public void forceCommit(String tableNameWithType, Set segmentNames) { - Preconditions.checkArgument(TableNameBuilder.isRealtimeTableResource(tableNameWithType), String - .format("Force commit is only supported for segments of realtime tables - table name: %s segment names: %s", - tableNameWithType, segmentNames)); + Preconditions.checkArgument(TableNameBuilder.isRealtimeTableResource(tableNameWithType), String.format( + "Force commit is only supported for segments of realtime tables - table name: %s segment names: %s", + tableNameWithType, segmentNames)); TableDataManager tableDataManager = _tableDataManagerMap.get(tableNameWithType); if (tableDataManager != null) { segmentNames.forEach(segName -> { diff --git a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java b/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java deleted file mode 100644 index a8809bf81fed..000000000000 --- a/pinot-server/src/test/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManagerTest.java +++ /dev/null @@ -1,70 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.pinot.server.starter.helix; - -import java.io.File; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.commons.io.FileUtils; -import org.apache.helix.HelixManager; -import org.apache.pinot.common.metrics.ServerMetrics; -import org.apache.pinot.segment.local.data.manager.SegmentDataManager; -import org.apache.pinot.segment.spi.ImmutableSegment; -import org.apache.pinot.segment.spi.MutableSegment; -import org.apache.pinot.spi.env.PinotConfiguration; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.Test; - -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; - - -public class HelixInstanceDataManagerTest { - private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "HelixInstanceDataManagerTest"); - - @AfterMethod - public void tearDown() - throws Exception { - FileUtils.deleteDirectory(TEMP_DIR); - } - - @Test - public void testReloadMutableSegment() - throws ConfigurationException { - // Provides required configs to init the instance data manager. - PinotConfiguration config = new PinotConfiguration(); - config.setProperty("id", "id01"); - config.setProperty("dataDir", TEMP_DIR.getAbsolutePath()); - config.setProperty("segmentTarDir", TEMP_DIR.getAbsolutePath()); - config.setProperty("readMode", "mmap"); - config.setProperty("reload.consumingSegment", "false"); - - HelixInstanceDataManager mgr = new HelixInstanceDataManager(); - mgr.init(config, mock(HelixManager.class), mock(ServerMetrics.class)); - - SegmentDataManager segMgr = mock(SegmentDataManager.class); - - when(segMgr.getSegment()).thenReturn(mock(ImmutableSegment.class)); - assertFalse(mgr.reloadMutableSegment("table01", "seg01", segMgr, null)); - - when(segMgr.getSegment()).thenReturn(mock(MutableSegment.class)); - assertTrue(mgr.reloadMutableSegment("table01", "seg01", segMgr, null)); - } -}