From d6571b85a3c6b59ee90b7cd5e4105a8f456b46c0 Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Tue, 25 Jan 2022 10:39:24 -0800 Subject: [PATCH 1/2] adding isInstantDelete API for segment deletion --- .../PinotSegmentRestletResource.java | 26 ++++-- .../helix/core/PinotHelixResourceManager.java | 10 ++- .../helix/core/SegmentDeletionManager.java | 81 ++++++++++++------- .../core/util/SegmentDeletionManagerTest.java | 6 +- 4 files changed, 82 insertions(+), 41 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index 7078e6032e6d..2e9458c4da2a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -571,12 +571,15 @@ public SuccessResponse reloadAllSegmentsDeprecated2( @ApiOperation(value = "Delete a segment", notes = "Delete a segment") public SuccessResponse deleteSegment( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, - @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName) { + @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName, + @ApiParam(value = "If instant delete without retention") @QueryParam("isInstantDelete") @DefaultValue("false") + String isInstantDelete) { + boolean isInstantDeletion = !isInstantDelete.equalsIgnoreCase("false"); segmentName = URIUtils.decode(segmentName); TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE; String tableNameWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); - deleteSegmentsInternal(tableNameWithType, Collections.singletonList(segmentName)); + deleteSegmentsInternal(tableNameWithType, Collections.singletonList(segmentName), isInstantDeletion); return new SuccessResponse("Segment deleted"); } @@ -587,14 +590,18 @@ public SuccessResponse deleteSegment( @ApiOperation(value = "Delete all segments", notes = "Delete all segments") public SuccessResponse deleteAllSegments( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, - @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr) { + @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr, + @ApiParam(value = "If instant delete without retention") @QueryParam("isInstantDelete") @DefaultValue("false") + String isInstantDelete) { + boolean isInstantDeletion = !isInstantDelete.equalsIgnoreCase("false"); TableType tableType = Constants.validateTableType(tableTypeStr); if (tableType == null) { throw new ControllerApplicationException(LOGGER, "Table type must not be null", Status.BAD_REQUEST); } String tableNameWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); - deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false)); + deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false), + isInstantDeletion); return new SuccessResponse("All segments of table " + tableNameWithType + " deleted"); } @@ -607,7 +614,9 @@ public SuccessResponse deleteAllSegments( notes = "Delete the segments in the JSON array payload") public SuccessResponse deleteSegments( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, - List segments) { + @ApiParam(value = "If instant delete without retention") @QueryParam("isInstantDelete") @DefaultValue("false") + String isInstantDelete, List segments) { + boolean isInstantDeletion = !isInstantDelete.equalsIgnoreCase("false"); int numSegments = segments.size(); if (numSegments == 0) { throw new ControllerApplicationException(LOGGER, "Segments must be provided", Status.BAD_REQUEST); @@ -622,7 +631,7 @@ public SuccessResponse deleteSegments( TableType tableType = isRealtimeSegment ? TableType.REALTIME : TableType.OFFLINE; String tableNameWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); - deleteSegmentsInternal(tableNameWithType, segments); + deleteSegmentsInternal(tableNameWithType, segments, isInstantDeletion); if (numSegments <= 5) { return new SuccessResponse("Deleted segments: " + segments + " from table: " + tableNameWithType); } else { @@ -630,8 +639,9 @@ public SuccessResponse deleteSegments( } } - private void deleteSegmentsInternal(String tableNameWithType, List segments) { - PinotResourceManagerResponse response = _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments); + private void deleteSegmentsInternal(String tableNameWithType, List segments, boolean isInstantDeletion) { + PinotResourceManagerResponse response = _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments, + isInstantDeletion); if (!response.isSuccessful()) { throw new ControllerApplicationException(LOGGER, "Failed to delete segments from table: " + tableNameWithType + ", error message: " + response.getMessage(), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index ad2919296da9..4402c3178452 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -686,20 +686,26 @@ public List getSegmentsZKMetadata(String tableNameWithType) { return ZKMetadataProvider.getSegmentsZKMetadata(_propertyStore, tableNameWithType); } + public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List segmentNames) { + return deleteSegments(tableNameWithType, segmentNames, false); + } + /** * Delete a list of segments from ideal state and remove them from the local storage. * * @param tableNameWithType Table name with type suffix * @param segmentNames List of names of segment to be deleted + * @param isInstanceDelete Indicate if the deleted segments will be put in retention before deletion. * @return Request response */ - public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List segmentNames) { + public synchronized PinotResourceManagerResponse deleteSegments(String tableNameWithType, List segmentNames, + boolean isInstanceDelete) { try { LOGGER.info("Trying to delete segments: {} from table: {} ", segmentNames, tableNameWithType); Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType), "Table name: %s is not a valid table name with type suffix", tableNameWithType); HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames); - _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames); + _segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, isInstanceDelete); return PinotResourceManagerResponse.success("Segment " + segmentNames + " deleted"); } catch (final Exception e) { LOGGER.error("Caught exception while deleting segment: {} from table: {}", segmentNames, tableNameWithType, e); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java index 9044f286e071..a042faf31216 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java @@ -83,21 +83,25 @@ public void stop() { } public void deleteSegments(final String tableName, final Collection segmentIds) { - deleteSegmentsWithDelay(tableName, segmentIds, DEFAULT_DELETION_DELAY_SECONDS); + deleteSegments(tableName, segmentIds, false); + } + + public void deleteSegments(final String tableName, final Collection segmentIds, boolean isInstantDeletion) { + deleteSegmentsWithDelay(tableName, segmentIds, isInstantDeletion, DEFAULT_DELETION_DELAY_SECONDS); } protected void deleteSegmentsWithDelay(final String tableName, final Collection segmentIds, - final long deletionDelaySeconds) { + final boolean isInstantDeletion, final long deletionDelaySeconds) { _executorService.schedule(new Runnable() { @Override public void run() { - deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds, deletionDelaySeconds); + deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds, isInstantDeletion, deletionDelaySeconds); } }, deletionDelaySeconds, TimeUnit.SECONDS); } protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableName, Collection segmentIds, - long deletionDelay) { + boolean isInstantDeletion, long deletionDelay) { // Check if segment got removed from ExternalView or IdealState ExternalView externalView = _helixAdmin.getResourceExternalView(_helixClusterName, tableName); IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName); @@ -149,7 +153,7 @@ protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableN } segmentsToDelete.removeAll(propStoreFailedSegs); - removeSegmentsFromStore(tableName, segmentsToDelete); + removeSegmentsFromStore(tableName, segmentsToDelete, isInstantDeletion); } LOGGER.info("Deleted {} segments from table {}:{}", segmentsToDelete.size(), tableName, @@ -158,47 +162,68 @@ protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableN if (!segmentsToRetryLater.isEmpty()) { long effectiveDeletionDelay = Math.min(deletionDelay * 2, MAX_DELETION_DELAY_SECONDS); LOGGER.info("Postponing deletion of {} segments from table {}", segmentsToRetryLater.size(), tableName); - deleteSegmentsWithDelay(tableName, segmentsToRetryLater, effectiveDeletionDelay); + deleteSegmentsWithDelay(tableName, segmentsToRetryLater, isInstantDeletion, effectiveDeletionDelay); return; } } public void removeSegmentsFromStore(String tableNameWithType, List segments) { + removeSegmentsFromStore(tableNameWithType, segments, false); + } + + public void removeSegmentsFromStore(String tableNameWithType, List segments, boolean isInstantDeletion) { for (String segment : segments) { - removeSegmentFromStore(tableNameWithType, segment); + removeSegmentFromStore(tableNameWithType, segment, isInstantDeletion); } } - protected void removeSegmentFromStore(String tableNameWithType, String segmentId) { + protected void removeSegmentFromStore(String tableNameWithType, String segmentId, boolean isInstantDeletion) { // Ignore HLC segments as they are not stored in Pinot FS if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) { return; } if (_dataDir != null) { - String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); - URI fileToMoveURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId)); - URI deletedSegmentDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, URIUtils.encode(segmentId)); - PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme()); + if (isInstantDeletion) { + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + URI fileToDeleteURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId)); + PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme()); - try { - if (pinotFS.exists(fileToMoveURI)) { - // Overwrites the file if it already exists in the target directory. - if (pinotFS.move(fileToMoveURI, deletedSegmentDestURI, true)) { - // Updates last modified. - // Touch is needed here so that removeAgedDeletedSegments() works correctly. - pinotFS.touch(deletedSegmentDestURI); - LOGGER.info("Moved segment {} from {} to {}", segmentId, fileToMoveURI.toString(), - deletedSegmentDestURI.toString()); + try { + if (pinotFS.delete(fileToDeleteURI, false)) { + LOGGER.info("Deleted segment {} from {}", segmentId, fileToDeleteURI.toString()); } else { - LOGGER.warn("Failed to move segment {} from {} to {}", segmentId, fileToMoveURI.toString(), - deletedSegmentDestURI.toString()); + LOGGER.warn("Failed to delete segment {} from {}", segmentId, fileToDeleteURI.toString()); } - } else { - LOGGER.warn("Failed to find local segment file for segment {}", fileToMoveURI.toString()); + } catch (IOException e) { + LOGGER.warn("Could not delete segment {} from {}", segmentId, fileToDeleteURI.toString(), e); + } + } else { + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + URI fileToMoveURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId)); + URI deletedSegmentDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, + URIUtils.encode(segmentId)); + PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme()); + + try { + if (pinotFS.exists(fileToMoveURI)) { + // Overwrites the file if it already exists in the target directory. + if (pinotFS.move(fileToMoveURI, deletedSegmentDestURI, true)) { + // Updates last modified. + // Touch is needed here so that removeAgedDeletedSegments() works correctly. + pinotFS.touch(deletedSegmentDestURI); + LOGGER.info("Moved segment {} from {} to {}", segmentId, fileToMoveURI.toString(), + deletedSegmentDestURI.toString()); + } else { + LOGGER.warn("Failed to move segment {} from {} to {}", segmentId, fileToMoveURI.toString(), + deletedSegmentDestURI.toString()); + } + } else { + LOGGER.warn("Failed to find local segment file for segment {}", fileToMoveURI.toString()); + } + } catch (IOException e) { + LOGGER.warn("Could not move segment {} from {} to {}", segmentId, fileToMoveURI.toString(), + deletedSegmentDestURI.toString(), e); } - } catch (IOException e) { - LOGGER.warn("Could not move segment {} from {} to {}", segmentId, fileToMoveURI.toString(), - deletedSegmentDestURI.toString(), e); } } else { LOGGER.info("dataDir is not configured, won't delete segment {} from disk", segmentId); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java index 1c21a985479d..9ec41691cd12 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/util/SegmentDeletionManagerTest.java @@ -285,17 +285,17 @@ public static class FakeDeletionManager extends SegmentDeletionManager { } public void deleteSegmentsFromPropertyStoreAndLocal(String tableName, Collection segments) { - super.deleteSegmentFromPropertyStoreAndLocal(tableName, segments, 0L); + super.deleteSegmentFromPropertyStoreAndLocal(tableName, segments, false, 0L); } @Override - protected void removeSegmentFromStore(String tableName, String segmentId) { + protected void removeSegmentFromStore(String tableName, String segmentId, boolean isInstantDelete) { _segmentsRemovedFromStore.add(segmentId); } @Override protected void deleteSegmentsWithDelay(final String tableName, final Collection segmentIds, - final long deletionDelaySeconds) { + final boolean isInstantDeletion, final long deletionDelaySeconds) { _segmentsToRetry.addAll(segmentIds); } } From fc6d981d4bd84b147df33fb3b9aad44ad5866a7f Mon Sep 17 00:00:00 2001 From: Rong Rong Date: Tue, 25 Jan 2022 12:36:18 -0800 Subject: [PATCH 2/2] address diff comments --- .../PinotSegmentRestletResource.java | 28 ++++++------ .../helix/core/SegmentDeletionManager.java | 43 +++++++++---------- 2 files changed, 34 insertions(+), 37 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java index 2e9458c4da2a..cae981b575de 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java @@ -572,14 +572,14 @@ public SuccessResponse reloadAllSegmentsDeprecated2( public SuccessResponse deleteSegment( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, @ApiParam(value = "Name of the segment", required = true) @PathParam("segmentName") @Encoded String segmentName, - @ApiParam(value = "If instant delete without retention") @QueryParam("isInstantDelete") @DefaultValue("false") - String isInstantDelete) { - boolean isInstantDeletion = !isInstantDelete.equalsIgnoreCase("false"); + @ApiParam(value = "Whether to delete the segment instantly or move to deleted_segment prefix and let " + + "RetentionManager handle the actual file deletion") @QueryParam("instantDelete") @DefaultValue("false") + boolean isInstantDelete) { segmentName = URIUtils.decode(segmentName); TableType tableType = SegmentName.isRealtimeSegmentName(segmentName) ? TableType.REALTIME : TableType.OFFLINE; String tableNameWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); - deleteSegmentsInternal(tableNameWithType, Collections.singletonList(segmentName), isInstantDeletion); + deleteSegmentsInternal(tableNameWithType, Collections.singletonList(segmentName), isInstantDelete); return new SuccessResponse("Segment deleted"); } @@ -591,9 +591,9 @@ public SuccessResponse deleteSegment( public SuccessResponse deleteAllSegments( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, @ApiParam(value = "OFFLINE|REALTIME", required = true) @QueryParam("type") String tableTypeStr, - @ApiParam(value = "If instant delete without retention") @QueryParam("isInstantDelete") @DefaultValue("false") - String isInstantDelete) { - boolean isInstantDeletion = !isInstantDelete.equalsIgnoreCase("false"); + @ApiParam(value = "Whether to delete the segment instantly or move to deleted_segment prefix and let " + + "RetentionManager handle the actual file deletion") @QueryParam("instantDelete") @DefaultValue("false") + boolean isInstantDelete) { TableType tableType = Constants.validateTableType(tableTypeStr); if (tableType == null) { throw new ControllerApplicationException(LOGGER, "Table type must not be null", Status.BAD_REQUEST); @@ -601,7 +601,7 @@ public SuccessResponse deleteAllSegments( String tableNameWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); deleteSegmentsInternal(tableNameWithType, _pinotHelixResourceManager.getSegmentsFor(tableNameWithType, false), - isInstantDeletion); + isInstantDelete); return new SuccessResponse("All segments of table " + tableNameWithType + " deleted"); } @@ -614,9 +614,9 @@ public SuccessResponse deleteAllSegments( notes = "Delete the segments in the JSON array payload") public SuccessResponse deleteSegments( @ApiParam(value = "Name of the table", required = true) @PathParam("tableName") String tableName, - @ApiParam(value = "If instant delete without retention") @QueryParam("isInstantDelete") @DefaultValue("false") - String isInstantDelete, List segments) { - boolean isInstantDeletion = !isInstantDelete.equalsIgnoreCase("false"); + @ApiParam(value = "Whether to delete the segment instantly or move to deleted_segment prefix and let " + + "RetentionManager handle the actual file deletion") @QueryParam("instantDelete") @DefaultValue("false") + boolean isInstantDelete, List segments) { int numSegments = segments.size(); if (numSegments == 0) { throw new ControllerApplicationException(LOGGER, "Segments must be provided", Status.BAD_REQUEST); @@ -631,7 +631,7 @@ public SuccessResponse deleteSegments( TableType tableType = isRealtimeSegment ? TableType.REALTIME : TableType.OFFLINE; String tableNameWithType = ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager, tableName, tableType, LOGGER).get(0); - deleteSegmentsInternal(tableNameWithType, segments, isInstantDeletion); + deleteSegmentsInternal(tableNameWithType, segments, isInstantDelete); if (numSegments <= 5) { return new SuccessResponse("Deleted segments: " + segments + " from table: " + tableNameWithType); } else { @@ -639,9 +639,9 @@ public SuccessResponse deleteSegments( } } - private void deleteSegmentsInternal(String tableNameWithType, List segments, boolean isInstantDeletion) { + private void deleteSegmentsInternal(String tableNameWithType, List segments, boolean isInstantDelete) { PinotResourceManagerResponse response = _pinotHelixResourceManager.deleteSegments(tableNameWithType, segments, - isInstantDeletion); + isInstantDelete); if (!response.isSuccessful()) { throw new ControllerApplicationException(LOGGER, "Failed to delete segments from table: " + tableNameWithType + ", error message: " + response.getMessage(), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java index a042faf31216..2aed8d55a2b5 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/SegmentDeletionManager.java @@ -82,16 +82,16 @@ public void stop() { _executorService.shutdownNow(); } - public void deleteSegments(final String tableName, final Collection segmentIds) { + public void deleteSegments(String tableName, Collection segmentIds) { deleteSegments(tableName, segmentIds, false); } - public void deleteSegments(final String tableName, final Collection segmentIds, boolean isInstantDeletion) { + public void deleteSegments(String tableName, Collection segmentIds, boolean isInstantDeletion) { deleteSegmentsWithDelay(tableName, segmentIds, isInstantDeletion, DEFAULT_DELETION_DELAY_SECONDS); } - protected void deleteSegmentsWithDelay(final String tableName, final Collection segmentIds, - final boolean isInstantDeletion, final long deletionDelaySeconds) { + protected void deleteSegmentsWithDelay(String tableName, Collection segmentIds, + boolean isInstantDeletion, long deletionDelaySeconds) { _executorService.schedule(new Runnable() { @Override public void run() { @@ -183,11 +183,11 @@ protected void removeSegmentFromStore(String tableNameWithType, String segmentId return; } if (_dataDir != null) { + String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); + URI fileToDeleteURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId)); + PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme()); if (isInstantDeletion) { - String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); - URI fileToDeleteURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId)); - PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme()); - + // delete the segment file directly try { if (pinotFS.delete(fileToDeleteURI, false)) { LOGGER.info("Deleted segment {} from {}", segmentId, fileToDeleteURI.toString()); @@ -198,31 +198,28 @@ protected void removeSegmentFromStore(String tableNameWithType, String segmentId LOGGER.warn("Could not delete segment {} from {}", segmentId, fileToDeleteURI.toString(), e); } } else { - String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType); - URI fileToMoveURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId)); - URI deletedSegmentDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, + // move the segment file to deleted segments first and let retention manager handler the deletion + URI deletedSegmentMoveDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, URIUtils.encode(segmentId)); - PinotFS pinotFS = PinotFSFactory.create(fileToMoveURI.getScheme()); - try { - if (pinotFS.exists(fileToMoveURI)) { + if (pinotFS.exists(fileToDeleteURI)) { // Overwrites the file if it already exists in the target directory. - if (pinotFS.move(fileToMoveURI, deletedSegmentDestURI, true)) { + if (pinotFS.move(fileToDeleteURI, deletedSegmentMoveDestURI, true)) { // Updates last modified. // Touch is needed here so that removeAgedDeletedSegments() works correctly. - pinotFS.touch(deletedSegmentDestURI); - LOGGER.info("Moved segment {} from {} to {}", segmentId, fileToMoveURI.toString(), - deletedSegmentDestURI.toString()); + pinotFS.touch(deletedSegmentMoveDestURI); + LOGGER.info("Moved segment {} from {} to {}", segmentId, fileToDeleteURI.toString(), + deletedSegmentMoveDestURI.toString()); } else { - LOGGER.warn("Failed to move segment {} from {} to {}", segmentId, fileToMoveURI.toString(), - deletedSegmentDestURI.toString()); + LOGGER.warn("Failed to move segment {} from {} to {}", segmentId, fileToDeleteURI.toString(), + deletedSegmentMoveDestURI.toString()); } } else { - LOGGER.warn("Failed to find local segment file for segment {}", fileToMoveURI.toString()); + LOGGER.warn("Failed to find local segment file for segment {}", fileToDeleteURI.toString()); } } catch (IOException e) { - LOGGER.warn("Could not move segment {} from {} to {}", segmentId, fileToMoveURI.toString(), - deletedSegmentDestURI.toString(), e); + LOGGER.warn("Could not move segment {} from {} to {}", segmentId, fileToDeleteURI.toString(), + deletedSegmentMoveDestURI.toString(), e); } } } else {