Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add retention period to deleted segment files and allow table level o… #8176

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,8 @@ public synchronized PinotResourceManagerResponse deleteSegments(String tableName
Preconditions.checkArgument(TableNameBuilder.isTableResource(tableNameWithType),
"Table name: %s is not a valid table name with type suffix", tableNameWithType);
HelixHelper.removeSegmentsFromIdealState(_helixZkManager, tableNameWithType, segmentNames);
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames);
TableConfig tableConfig = ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
_segmentDeletionManager.deleteSegments(tableNameWithType, segmentNames, tableConfig);
return PinotResourceManagerResponse.success("Segment " + segmentNames + " deleted");
} catch (final Exception e) {
LOGGER.error("Caught exception while deleting segment: {} from table: {}", segmentNames, tableNameWithType, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,21 @@

import java.io.IOException;
import java.net.URI;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixAdmin;
import org.apache.helix.ZNRecord;
Expand All @@ -40,10 +44,12 @@
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.URIUtils;
import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.filesystem.PinotFSFactory;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -53,22 +59,34 @@ public class SegmentDeletionManager {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentDeletionManager.class);
private static final long MAX_DELETION_DELAY_SECONDS = 300L; // Maximum of 5 minutes back-off to retry the deletion
private static final long DEFAULT_DELETION_DELAY_SECONDS = 2L;

// Retention date format will be written as suffix to deleted segments under `Deleted_Segments` folder. for example:
// `Deleted_Segments/myTable/myTable_mySegment_0__RETENTION_UNTIL__20220202_120000` to indicate that this segment
// file will be permanently deleted after Feb 2nd 2022 12PM.
private static final String DELETED_SEGMENTS = "Deleted_Segments";
private static final String RETENTION_UNTIL_SEPARATOR = "__RETENTION_UNTIL__";
private static final String RETENTION_DATE_FORMAT_STR = "yyyyMMdd_HHmmss";
private static final SimpleDateFormat RETENTION_DATE_FORMAT;

static {
RETENTION_DATE_FORMAT = new SimpleDateFormat(RETENTION_DATE_FORMAT_STR, Locale.getDefault());
RETENTION_DATE_FORMAT.setTimeZone(TimeZone.getDefault());
}

private final ScheduledExecutorService _executorService;
private final String _dataDir;
private final String _helixClusterName;
private final HelixAdmin _helixAdmin;
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final int _defaultDeletedSegmentsRetentionInDays;
private final long _defaultDeletedSegmentsRetentionMs;

public SegmentDeletionManager(String dataDir, HelixAdmin helixAdmin, String helixClusterName,
ZkHelixPropertyStore<ZNRecord> propertyStore, int deletedSegmentsRetentionInDays) {
_dataDir = dataDir;
_helixAdmin = helixAdmin;
_helixClusterName = helixClusterName;
_propertyStore = propertyStore;
_defaultDeletedSegmentsRetentionInDays = deletedSegmentsRetentionInDays;
_defaultDeletedSegmentsRetentionMs = TimeUnit.DAYS.toMillis(deletedSegmentsRetentionInDays);

_executorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
@Override
Expand All @@ -84,22 +102,29 @@ public void stop() {
_executorService.shutdownNow();
}

public void deleteSegments(final String tableName, final Collection<String> segmentIds) {
deleteSegmentsWithDelay(tableName, segmentIds, DEFAULT_DELETION_DELAY_SECONDS);
public void deleteSegments(String tableName, Collection<String> segmentIds) {
deleteSegments(tableName, segmentIds, null);
}

protected void deleteSegmentsWithDelay(final String tableName, final Collection<String> segmentIds,
final long deletionDelaySeconds) {
public void deleteSegments(String tableName, Collection<String> segmentIds,
TableConfig tableConfig) {
long deletedSegmentsRetentionMs = getRetentionMsFromTableConfig(tableConfig);
deleteSegmentsWithDelay(tableName, segmentIds, deletedSegmentsRetentionMs, DEFAULT_DELETION_DELAY_SECONDS);
}

protected void deleteSegmentsWithDelay(String tableName, Collection<String> segmentIds,
long deletedSegmentsRetentionMs, long deletionDelaySeconds) {
_executorService.schedule(new Runnable() {
@Override
public void run() {
deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds, deletionDelaySeconds);
deleteSegmentFromPropertyStoreAndLocal(tableName, segmentIds, deletedSegmentsRetentionMs,
deletionDelaySeconds);
}
}, deletionDelaySeconds, TimeUnit.SECONDS);
}

protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableName, Collection<String> segmentIds,
long deletionDelay) {
long deletedSegmentsRetentionMs, long deletionDelay) {
// Check if segment got removed from ExternalView or IdealState
ExternalView externalView = _helixAdmin.getResourceExternalView(_helixClusterName, tableName);
IdealState idealState = _helixAdmin.getResourceIdealState(_helixClusterName, tableName);
Expand Down Expand Up @@ -151,7 +176,7 @@ protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableN
}
segmentsToDelete.removeAll(propStoreFailedSegs);

removeSegmentsFromStore(tableName, segmentsToDelete);
removeSegmentsFromStore(tableName, segmentsToDelete, deletedSegmentsRetentionMs);
}

LOGGER.info("Deleted {} segments from table {}:{}", segmentsToDelete.size(), tableName,
Expand All @@ -160,18 +185,29 @@ protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableN
if (!segmentsToRetryLater.isEmpty()) {
long effectiveDeletionDelay = Math.min(deletionDelay * 2, MAX_DELETION_DELAY_SECONDS);
LOGGER.info("Postponing deletion of {} segments from table {}", segmentsToRetryLater.size(), tableName);
deleteSegmentsWithDelay(tableName, segmentsToRetryLater, effectiveDeletionDelay);
deleteSegmentsWithDelay(tableName, segmentsToRetryLater, deletedSegmentsRetentionMs, effectiveDeletionDelay);
return;
}
}

public void removeSegmentsFromStore(String tableNameWithType, List<String> segments) {
removeSegmentsFromStore(tableNameWithType, segments, _defaultDeletedSegmentsRetentionMs, true);
}

public void removeSegmentsFromStore(String tableNameWithType, List<String> segments,
long deletedSegmentsRetentionMs) {
removeSegmentsFromStore(tableNameWithType, segments, deletedSegmentsRetentionMs, false);
}

public void removeSegmentsFromStore(String tableNameWithType, List<String> segments,
long deletedSegmentsRetentionMs, boolean usedDefaultClusterRetention) {
for (String segment : segments) {
removeSegmentFromStore(tableNameWithType, segment);
removeSegmentFromStore(tableNameWithType, segment, deletedSegmentsRetentionMs, usedDefaultClusterRetention);
}
}

protected void removeSegmentFromStore(String tableNameWithType, String segmentId) {
protected void removeSegmentFromStore(String tableNameWithType, String segmentId,
long deletedSegmentsRetentionMs, boolean usedDefaultClusterRetention) {
// Ignore HLC segments as they are not stored in Pinot FS
if (SegmentName.isHighLevelConsumerSegmentName(segmentId)) {
return;
Expand All @@ -180,7 +216,7 @@ protected void removeSegmentFromStore(String tableNameWithType, String segmentId
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
URI fileToDeleteURI = URIUtils.getUri(_dataDir, rawTableName, URIUtils.encode(segmentId));
PinotFS pinotFS = PinotFSFactory.create(fileToDeleteURI.getScheme());
if (_defaultDeletedSegmentsRetentionInDays <= 0) {
if (deletedSegmentsRetentionMs <= 0) {
// delete the segment file instantly if retention is set to zero
try {
if (pinotFS.delete(fileToDeleteURI, true)) {
Expand All @@ -193,8 +229,9 @@ protected void removeSegmentFromStore(String tableNameWithType, String segmentId
}
} else {
// move the segment file to deleted segments first and let retention manager handler the deletion
URI deletedSegmentMoveDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName,
URIUtils.encode(segmentId));
String deletedFileName = usedDefaultClusterRetention ? URIUtils.encode(segmentId)
: getDeletedSegmentFileName(URIUtils.encode(segmentId), deletedSegmentsRetentionMs);
URI deletedSegmentMoveDestURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS, rawTableName, deletedFileName);
try {
if (pinotFS.exists(fileToDeleteURI)) {
// Overwrites the file if it already exists in the target directory.
Expand Down Expand Up @@ -223,9 +260,8 @@ protected void removeSegmentFromStore(String tableNameWithType, String segmentId

/**
* Removes aged deleted segments from the deleted directory
* @param retentionInDays: retention for deleted segments in days
*/
public void removeAgedDeletedSegments(int retentionInDays) {
public void removeAgedDeletedSegments() {
if (_dataDir != null) {
URI deletedDirURI = URIUtils.getUri(_dataDir, DELETED_SEGMENTS);
PinotFS pinotFS = PinotFSFactory.create(deletedDirURI.getScheme());
Expand Down Expand Up @@ -254,8 +290,8 @@ public void removeAgedDeletedSegments(int retentionInDays) {
int numFilesDeleted = 0;
for (String targetFile : targetFiles) {
URI targetURI = URIUtils.getUri(targetFile);
Date dateToDelete = DateTime.now().minusDays(retentionInDays).toDate();
if (pinotFS.lastModified(targetURI) < dateToDelete.getTime()) {
long deletionTimeMs = getDeletionTimeMsFromFile(targetFile, pinotFS.lastModified(targetURI));
if (System.currentTimeMillis() >= deletionTimeMs) {
if (!pinotFS.delete(targetURI, true)) {
LOGGER.warn("Cannot remove file {} from deleted directory.", targetURI.toString());
} else {
Expand All @@ -278,4 +314,38 @@ public void removeAgedDeletedSegments(int retentionInDays) {
LOGGER.info("dataDir is not configured, won't delete any expired segments from deleted directory.");
}
}

private String getDeletedSegmentFileName(String fileName, long deletedSegmentsRetentionMs) {
return fileName + RETENTION_UNTIL_SEPARATOR + RETENTION_DATE_FORMAT.format(new Date(
System.currentTimeMillis() + deletedSegmentsRetentionMs));
}

private long getDeletionTimeMsFromFile(String targetFile, long lastModifiedTime) {
String[] split = StringUtils.splitByWholeSeparator(targetFile, RETENTION_UNTIL_SEPARATOR);
if (split.length == 2) {
try {
return RETENTION_DATE_FORMAT.parse(split[1]).getTime();
} catch (Exception e) {
LOGGER.warn("No retention suffix found for file: {}", targetFile);
}
}
LOGGER.info("Fallback to using default cluster retention config: {} ms", _defaultDeletedSegmentsRetentionMs);
return lastModifiedTime + _defaultDeletedSegmentsRetentionMs;
}

private long getRetentionMsFromTableConfig(TableConfig tableConfig) {
long retentionMs = _defaultDeletedSegmentsRetentionMs;
if (tableConfig != null) {
SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
if (!StringUtils.isEmpty(validationConfig.getDeletedSegmentsRetentionPeriod())) {
try {
retentionMs = TimeUtils.convertPeriodToMillis(validationConfig.getDeletedSegmentsRetentionPeriod());
} catch (Exception e) {
LOGGER.warn("Unable to parse deleted segment retention config for table {}, using to default: {} ms",
tableConfig.getTableName(), _defaultDeletedSegmentsRetentionMs, e);
}
}
}
return retentionMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,13 @@ public class RetentionManager extends ControllerPeriodicTask<Void> {

private static final Logger LOGGER = LoggerFactory.getLogger(RetentionManager.class);

private final int _deletedSegmentsRetentionInDays;

public RetentionManager(PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager, ControllerConf config, ControllerMetrics controllerMetrics) {
super("RetentionManager", config.getRetentionControllerFrequencyInSeconds(),
config.getRetentionManagerInitialDelayInSeconds(), pinotHelixResourceManager, leadControllerManager,
controllerMetrics);
_deletedSegmentsRetentionInDays = config.getDeletedSegmentsRetentionInDays();

LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}, deletedSegmentsRetentionInDays: {}",
getIntervalInSeconds(), _deletedSegmentsRetentionInDays);
LOGGER.info("Starting RetentionManager with runFrequencyInSeconds: {}", getIntervalInSeconds());
}

@Override
Expand All @@ -97,8 +93,8 @@ protected void processTable(String tableNameWithType) {

@Override
protected void postprocess() {
LOGGER.info("Removing aged (more than {} days) deleted segments for all tables", _deletedSegmentsRetentionInDays);
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments(_deletedSegmentsRetentionInDays);
LOGGER.info("Removing aged deleted segments for all tables");
_pinotHelixResourceManager.getSegmentDeletionManager().removeAgedDeletedSegments();
}

private void manageRetentionForTable(TableConfig tableConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixAdmin;
import org.apache.helix.ZNRecord;
import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.PinotMetricUtils;
Expand All @@ -41,6 +44,7 @@
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.zookeeper.data.Stat;
import org.joda.time.Duration;
import org.joda.time.Interval;
import org.mockito.ArgumentMatchers;
Expand All @@ -49,7 +53,6 @@
import org.testng.Assert;
import org.testng.annotations.Test;

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.*;

Expand Down Expand Up @@ -103,7 +106,7 @@ private void testDifferentTimeUnits(long pastTimeStamp, TimeUnit timeUnit, long
SegmentDeletionManager deletionManager = pinotHelixResourceManager.getSegmentDeletionManager();

// Verify that the removeAgedDeletedSegments() method in deletion manager is actually called.
verify(deletionManager, times(1)).removeAgedDeletedSegments(anyInt());
verify(deletionManager, times(1)).removeAgedDeletedSegments();

// Verify that the deleteSegments method is actually called.
verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList());
Expand Down Expand Up @@ -177,7 +180,7 @@ public Void answer(InvocationOnMock invocationOnMock)
throws Throwable {
return null;
}
}).when(deletionManager).removeAgedDeletedSegments(anyInt());
}).when(deletionManager).removeAgedDeletedSegments();
when(resourceManager.getSegmentDeletionManager()).thenReturn(deletionManager);

// If and when PinotHelixResourceManager.deleteSegments() is invoked, make sure that the segments deleted
Expand All @@ -197,6 +200,15 @@ public Object answer(InvocationOnMock invocationOnMock)
return null;
}
}).when(resourceManager).deleteSegments(anyString(), ArgumentMatchers.anyList());

// fake segment lineage.
SegmentLineage segmentLineage = new SegmentLineage(REALTIME_TABLE_NAME);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to mock a segment lineage? Is this change related?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch not anymore since we fake the segment deletion manager. forgot to clean up


ZkHelixPropertyStore<ZNRecord> mockPropertyStore = mock(ZkHelixPropertyStore.class);
when(mockPropertyStore.get(anyString(), any(Stat.class), anyInt())).thenReturn(segmentLineage.toZNRecord());
when(mockPropertyStore.set(anyString(), any(ZNRecord.class), anyInt(), anyInt())).thenReturn(true);

when(resourceManager.getPropertyStore()).thenReturn(mockPropertyStore);
}

// This test makes sure that we clean up the segments marked OFFLINE in realtime for more than 7 days
Expand Down Expand Up @@ -229,7 +241,7 @@ public void testRealtimeLLCCleanup()
SegmentDeletionManager deletionManager = pinotHelixResourceManager.getSegmentDeletionManager();

// Verify that the removeAgedDeletedSegments() method in deletion manager is actually called.
verify(deletionManager, times(1)).removeAgedDeletedSegments(anyInt());
verify(deletionManager, times(1)).removeAgedDeletedSegments();

// Verify that the deleteSegments method is actually called.
verify(pinotHelixResourceManager, times(1)).deleteSegments(anyString(), anyList());
Expand Down Expand Up @@ -296,6 +308,15 @@ private PinotHelixResourceManager setupSegmentMetadata(TableConfig tableConfig,
when(helixAdmin.getResourceIdealState(HELIX_CLUSTER_NAME, REALTIME_TABLE_NAME)).thenReturn(idealState);
when(pinotHelixResourceManager.getHelixAdmin()).thenReturn(helixAdmin);

// fake segment lineage.
SegmentLineage segmentLineage = new SegmentLineage(REALTIME_TABLE_NAME);

ZkHelixPropertyStore<ZNRecord> mockPropertyStore = mock(ZkHelixPropertyStore.class);
when(mockPropertyStore.get(anyString(), any(Stat.class), anyInt())).thenReturn(segmentLineage.toZNRecord());
when(mockPropertyStore.set(anyString(), any(ZNRecord.class), anyInt(), anyInt())).thenReturn(true);

when(pinotHelixResourceManager.getPropertyStore()).thenReturn(mockPropertyStore);

return pinotHelixResourceManager;
}

Expand Down
Loading