diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
index ebb400387f73..ac938fb73bb6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/ZKMetadataProvider.java
@@ -219,6 +219,12 @@ public static boolean setSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> proper
     return setSegmentZKMetadata(propertyStore, tableNameWithType, segmentZKMetadata, -1);
   }
 
+  public static boolean removeSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType,
+      String segmentName) {
+    return propertyStore.remove(constructPropertyStorePathForSegment(tableNameWithType, segmentName),
+        AccessOption.PERSISTENT);
+  }
+
   @Nullable
   public static ZNRecord getZnRecord(ZkHelixPropertyStore<ZNRecord> propertyStore, String path) {
     Stat stat = new Stat();
@@ -238,7 +244,6 @@ public static SegmentZKMetadata getSegmentZKMetadata(ZkHelixPropertyStore
     propertyStore, String username) {
     ZNRecord znRecord =
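For context, a rough sketch of how the new helper can be invoked (illustrative only: the property-store handle and the table/segment names below are assumptions, not part of the patch; the actual caller added in this PR is PinotHelixResourceManager, further down in the diff):

    // Illustrative sketch, not part of the patch.
    // removeSegmentZKMetadata deletes the persistent segment-metadata ZNode resolved by
    // constructPropertyStorePathForSegment(...) and returns whether the removal succeeded.
    ZkHelixPropertyStore<ZNRecord> propertyStore = helixManager.getHelixPropertyStore();  // assumed handle
    boolean removed =
        ZKMetadataProvider.removeSegmentZKMetadata(propertyStore, "myTable_OFFLINE", "myTable_OFFLINE_0");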

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
index 56f478595ab0..3d33a358f33c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java
@@ -24,6 +24,7 @@
 import javax.annotation.Nullable;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
@@ -67,10 +68,20 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
       long segmentSizeInBytes, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers)
       throws Exception {
     String segmentName = segmentMetadata.getName();
-    ZNRecord existingSegmentMetadataZNRecord =
-        _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
     boolean refreshOnly =
         Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));
+
+    ZNRecord existingSegmentMetadataZNRecord =
+        _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
+    if (existingSegmentMetadataZNRecord != null && shouldProcessAsNewSegment(tableNameWithType, segmentName,
+        existingSegmentMetadataZNRecord, enableParallelPushProtection)) {
+      LOGGER.warn("Removing segment ZK metadata (recovering from previous upload failure) for table: {}, segment: {}",
+          tableNameWithType, segmentName);
+      Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType, segmentName),
+          "Failed to remove segment ZK metadata for table: %s, segment: %s", tableNameWithType, segmentName);
+      existingSegmentMetadataZNRecord = null;
+    }
+
     if (existingSegmentMetadataZNRecord == null) {
       // Add a new segment
       if (refreshOnly) {
@@ -99,6 +110,44 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
     }
   }
 
+  /**
+   * Returns {@code true} when the segment should be processed as new segment.
+   * <p>When segment ZK metadata exists, check if segment exists in the ideal state. If the previous upload failed after
+   * segment ZK metadata is created but before assigning the segment to the ideal state, we want to remove the existing
+   * segment ZK metadata and treat it as a new segment.
+   */
+  private boolean shouldProcessAsNewSegment(String tableNameWithType, String segmentName,
+      ZNRecord existingSegmentMetadataZNRecord, boolean enableParallelPushProtection) {
+    IdealState idealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
+    if (idealState.getInstanceStateMap(segmentName) != null) {
+      return false;
+    }
+    // Segment does not exist in the ideal state
+    if (enableParallelPushProtection) {
+      // Check segment upload start time when parallel push protection is enabled in case the segment is being uploaded
+      long segmentUploadStartTime = new SegmentZKMetadata(existingSegmentMetadataZNRecord).getSegmentUploadStartTime();
+      if (segmentUploadStartTime > 0) {
+        handleParallelPush(tableNameWithType, segmentName, segmentUploadStartTime);
+      }
+    }
+    return true;
+  }
+
+  private void handleParallelPush(String tableNameWithType, String segmentName, long segmentUploadStartTime) {
+    assert segmentUploadStartTime > 0;
+    if (System.currentTimeMillis() - segmentUploadStartTime > _controllerConf.getSegmentUploadTimeoutInMillis()) {
+      // Last segment upload does not finish properly, replace the segment
+      LOGGER.error("Segment: {} of table: {} was not properly uploaded, replacing it", segmentName, tableNameWithType);
+      _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED, 1L);
+    } else {
+      // Another segment upload is in progress
+      throw new ControllerApplicationException(LOGGER,
+          String.format("Another segment upload is in progress for segment: %s of table: %s, retry later", segmentName,
+              tableNameWithType), Response.Status.CONFLICT);
+    }
+  }
+
   private void processExistingSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
       FileUploadType uploadType, ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI,
       File segmentFile, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr,
@@ -117,17 +166,7 @@ private void processExistingSegment(String tableNameWithType, SegmentMetadata se
       // When segment upload start time is larger than 0, that means another upload is in progress
       long segmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime();
       if (segmentUploadStartTime > 0) {
-        if (System.currentTimeMillis() - segmentUploadStartTime > _controllerConf.getSegmentUploadTimeoutInMillis()) {
-          // Last segment upload does not finish properly, replace the segment
-          LOGGER.error("Segment: {} of table: {} was not properly uploaded, replacing it", segmentName,
-              tableNameWithType);
-          _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED, 1L);
-        } else {
-          // Another segment upload is in progress
-          throw new ControllerApplicationException(LOGGER,
-              String.format("Another segment upload is in progress for segment: %s of table: %s, retry later",
-                  segmentName, tableNameWithType), Response.Status.CONFLICT);
-        }
+        handleParallelPush(tableNameWithType, segmentName, segmentUploadStartTime);
       }
 
       // Lock the segment by setting the upload start time in ZK
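The state the new ZKOperator logic recovers from is "segment ZK metadata exists, but the segment was never assigned to the ideal state". A condensed sketch of that check, using the same Pinot/Helix calls as the diff above (the surrounding wiring and variable names are assumed, not part of the patch):

    // Sketch only: the condition completeSegmentOperations now detects before treating an upload as new.
    ZNRecord metadata = pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
    IdealState idealState = pinotHelixResourceManager.getTableIdealState(tableNameWithType);
    boolean danglingMetadata =
        metadata != null && idealState != null && idealState.getInstanceStateMap(segmentName) == null;
    // When danglingMetadata is true (and parallel push protection does not report an upload still in
    // progress via the upload start time), the stale ZK metadata is removed and the upload proceeds
    // as a brand-new segment instead of a refresh.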
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index e03343eae0ec..df4df3c5ee59 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2247,6 +2247,10 @@ public boolean updateZkMetadata(String tableNameWithType, SegmentZKMetadata segm
     return ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata);
   }
 
+  public boolean removeSegmentZKMetadata(String tableNameWithType, String segmentName) {
+    return ZKMetadataProvider.removeSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);
+  }
+
   /**
    * Delete the table on servers by sending table deletion message
    */
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
index 9fd9b5dbcdfa..570a263ac8fc 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java
@@ -180,6 +180,9 @@ protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableN
     }
     segmentsToDelete.removeAll(propStoreFailedSegs);
 
+    // TODO: If removing segments from deep store fails (e.g. controller crashes, deep store unavailable), these
+    // segments will become orphans and not easy to track because their ZK metadata are already deleted.
+    // Consider removing segments from deep store before cleaning up the ZK metadata.
     removeSegmentsFromStore(tableName, segmentsToDelete, deletedSegmentsRetentionMs);
   }
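The TODO points at an ordering problem: ZK metadata is deleted first, so a later deep-store failure leaves files that nothing references. One possible shape of the reordering it suggests, with a hypothetical variant of removeSegmentsFromStore that reports which segments were actually deleted (sketch only, not part of this patch; the _propertyStore handle is assumed):

    // Hypothetical sketch of the ordering suggested by the TODO.
    // Deep-store deletion happens first; ZK metadata is removed only for segments whose files are gone,
    // so a crash or deep-store outage leaves discoverable ZK metadata instead of untracked orphan files.
    List<String> deletedFromStore = removeSegmentsFromStore(tableName, segmentsToDelete, deletedSegmentsRetentionMs);
    for (String segmentName : deletedFromStore) {
      ZKMetadataProvider.removeSegmentZKMetadata(_propertyStore, tableName, segmentName);
    }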
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
index d5566e24a94c..e54f62b81c64 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java
@@ -27,6 +27,7 @@
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import org.apache.commons.io.FileUtils;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
@@ -220,7 +221,6 @@ public void testCompleteSegmentOperations()
     zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
         "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders);
-
     SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
     assertNotNull(segmentZKMetadata);
     assertEquals(segmentZKMetadata.getCrc(), 12345L);
@@ -233,6 +233,30 @@
     assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
     assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
 
+    // Test if the same segment can be uploaded when the previous upload failed after segment ZK metadata is created but
+    // before segment is assigned to the ideal state
+    // Manually remove the segment from the ideal state
+    IdealState idealState = _resourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
+    assertNotNull(idealState);
+    idealState.getRecord().getMapFields().remove(SEGMENT_NAME);
+    _resourceManager.getHelixAdmin()
+        .setResourceIdealState(_resourceManager.getHelixClusterName(), OFFLINE_TABLE_NAME, idealState);
+    // The segment should be uploaded as a new segment (push time should change, and refresh time shouldn't be set)
+    zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
+        "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders);
+    segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
+    assertNotNull(segmentZKMetadata);
+    assertEquals(segmentZKMetadata.getCrc(), 12345L);
+    assertEquals(segmentZKMetadata.getCreationTime(), 123L);
+    assertTrue(segmentZKMetadata.getPushTime() > pushTime);
+    pushTime = segmentZKMetadata.getPushTime();
+    assertTrue(pushTime > 0);
+    assertEquals(segmentZKMetadata.getRefreshTime(), Long.MIN_VALUE);
+    assertEquals(segmentZKMetadata.getDownloadUrl(), "downloadUrl");
+    assertEquals(segmentZKMetadata.getCrypterName(), "crypter");
+    assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
+    assertEquals(segmentZKMetadata.getSizeInBytes(), 10);
+
     // Upload the same segment with allowRefresh = false. Validate that an exception is thrown.
     try {
       zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,