Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix re-uploading segment when the previous upload failed #9631

Merged
merged 1 commit into from
Oct 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,12 @@ public static boolean setSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> proper
return setSegmentZKMetadata(propertyStore, tableNameWithType, segmentZKMetadata, -1);
}

/**
 * Removes the segment ZK metadata node for the given segment from the property store.
 *
 * @param propertyStore Helix property store backing the segment metadata
 * @param tableNameWithType table name with type suffix (e.g. {@code myTable_OFFLINE})
 * @param segmentName name of the segment whose metadata should be removed
 * @return {@code true} if the removal succeeded, {@code false} otherwise
 */
public static boolean removeSegmentZKMetadata(ZkHelixPropertyStore<ZNRecord> propertyStore, String tableNameWithType,
    String segmentName) {
  String segmentMetadataPath = constructPropertyStorePathForSegment(tableNameWithType, segmentName);
  return propertyStore.remove(segmentMetadataPath, AccessOption.PERSISTENT);
}

@Nullable
public static ZNRecord getZnRecord(ZkHelixPropertyStore<ZNRecord> propertyStore, String path) {
Stat stat = new Stat();
Expand All @@ -238,7 +244,6 @@ public static SegmentZKMetadata getSegmentZKMetadata(ZkHelixPropertyStore<ZNReco
AccessOption.PERSISTENT);
return znRecord != null ? new SegmentZKMetadata(znRecord) : null;
}

@Nullable
public static UserConfig getUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String username) {
ZNRecord znRecord =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import javax.annotation.Nullable;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.helix.model.IdealState;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
Expand Down Expand Up @@ -67,10 +68,20 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
long segmentSizeInBytes, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers)
throws Exception {
String segmentName = segmentMetadata.getName();
ZNRecord existingSegmentMetadataZNRecord =
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
boolean refreshOnly =
Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY));

ZNRecord existingSegmentMetadataZNRecord =
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName);
if (existingSegmentMetadataZNRecord != null && shouldProcessAsNewSegment(tableNameWithType, segmentName,
existingSegmentMetadataZNRecord, enableParallelPushProtection)) {
LOGGER.warn("Removing segment ZK metadata (recovering from previous upload failure) for table: {}, segment: {}",
tableNameWithType, segmentName);
Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType, segmentName),
"Failed to remove segment ZK metadata for table: %s, segment: %s", tableNameWithType, segmentName);
existingSegmentMetadataZNRecord = null;
}

if (existingSegmentMetadataZNRecord == null) {
// Add a new segment
if (refreshOnly) {
Expand Down Expand Up @@ -99,6 +110,44 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
}
}

/**
 * Returns {@code true} when the segment should be processed as new segment.
 * <p>When segment ZK metadata exists, check if segment exists in the ideal state. If the previous upload failed after
 * segment ZK metadata is created but before assigning the segment to the ideal state, we want to remove the existing
 * segment ZK metadata and treat it as a new segment.
 */
private boolean shouldProcessAsNewSegment(String tableNameWithType, String segmentName,
    ZNRecord existingSegmentMetadataZNRecord, boolean enableParallelPushProtection) {
  IdealState tableIdealState = _pinotHelixResourceManager.getTableIdealState(tableNameWithType);
  Preconditions.checkState(tableIdealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
  boolean segmentInIdealState = tableIdealState.getInstanceStateMap(segmentName) != null;
  if (segmentInIdealState) {
    // Segment is fully registered; it must be handled as an existing segment.
    return false;
  }
  // Segment has ZK metadata but is missing from the ideal state, i.e. a previous upload failed part-way.
  if (enableParallelPushProtection) {
    // With parallel push protection on, a positive upload start time means another upload may still be in flight;
    // delegate to handleParallelPush() to either take over a timed-out upload or reject this one.
    long uploadStartTime = new SegmentZKMetadata(existingSegmentMetadataZNRecord).getSegmentUploadStartTime();
    if (uploadStartTime > 0) {
      handleParallelPush(tableNameWithType, segmentName, uploadStartTime);
    }
  }
  return true;
}

/**
 * Handles a concurrent segment push detected via a positive upload start time in the segment ZK metadata.
 * <p>If the in-flight upload has exceeded the configured upload timeout it is considered dead and this push is
 * allowed to replace it (a timeout meter is bumped); otherwise the current request is rejected with
 * {@link Response.Status#CONFLICT}.
 *
 * @param tableNameWithType table name with type suffix
 * @param segmentName name of the segment being pushed
 * @param segmentUploadStartTime upload start time from ZK metadata; must be positive
 * @throws ControllerApplicationException when another upload is still within the timeout window
 */
private void handleParallelPush(String tableNameWithType, String segmentName, long segmentUploadStartTime) {
  assert segmentUploadStartTime > 0;
  long elapsedTimeMs = System.currentTimeMillis() - segmentUploadStartTime;
  if (elapsedTimeMs <= _controllerConf.getSegmentUploadTimeoutInMillis()) {
    // Another segment upload is in progress
    throw new ControllerApplicationException(LOGGER,
        String.format("Another segment upload is in progress for segment: %s of table: %s, retry later", segmentName,
            tableNameWithType), Response.Status.CONFLICT);
  }
  // Last segment upload does not finish properly, replace the segment
  LOGGER.error("Segment: {} of table: {} was not properly uploaded, replacing it", segmentName, tableNameWithType);
  _controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED, 1L);
}

private void processExistingSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
FileUploadType uploadType, ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI,
File segmentFile, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr,
Expand All @@ -117,17 +166,7 @@ private void processExistingSegment(String tableNameWithType, SegmentMetadata se
// When segment upload start time is larger than 0, that means another upload is in progress
long segmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime();
if (segmentUploadStartTime > 0) {
if (System.currentTimeMillis() - segmentUploadStartTime > _controllerConf.getSegmentUploadTimeoutInMillis()) {
// Last segment upload does not finish properly, replace the segment
LOGGER.error("Segment: {} of table: {} was not properly uploaded, replacing it", segmentName,
tableNameWithType);
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.NUMBER_SEGMENT_UPLOAD_TIMEOUT_EXCEEDED, 1L);
} else {
// Another segment upload is in progress
throw new ControllerApplicationException(LOGGER,
String.format("Another segment upload is in progress for segment: %s of table: %s, retry later",
segmentName, tableNameWithType), Response.Status.CONFLICT);
}
handleParallelPush(tableNameWithType, segmentName, segmentUploadStartTime);
}

// Lock the segment by setting the upload start time in ZK
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2247,6 +2247,10 @@ public boolean updateZkMetadata(String tableNameWithType, SegmentZKMetadata segm
return ZKMetadataProvider.setSegmentZKMetadata(_propertyStore, tableNameWithType, segmentZKMetadata);
}

/**
 * Removes the segment ZK metadata for the given segment by delegating to
 * {@code ZKMetadataProvider.removeSegmentZKMetadata} with this manager's property store.
 *
 * @param tableNameWithType table name with type suffix
 * @param segmentName name of the segment whose ZK metadata should be removed
 * @return {@code true} if the removal succeeded, {@code false} otherwise
 */
public boolean removeSegmentZKMetadata(String tableNameWithType, String segmentName) {
  return ZKMetadataProvider.removeSegmentZKMetadata(_propertyStore, tableNameWithType, segmentName);
}

/**
* Delete the table on servers by sending table deletion message
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,9 @@ protected synchronized void deleteSegmentFromPropertyStoreAndLocal(String tableN
}
segmentsToDelete.removeAll(propStoreFailedSegs);

// TODO: If removing segments from deep store fails (e.g. controller crashes, deep store unavailable), these
// segments will become orphans and not easy to track because their ZK metadata are already deleted.
// Consider removing segments from deep store before cleaning up the ZK metadata.
removeSegmentsFromStore(tableName, segmentsToDelete, deletedSegmentsRetentionMs);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.Response;
import org.apache.commons.io.FileUtils;
import org.apache.helix.model.IdealState;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
Expand Down Expand Up @@ -220,7 +221,6 @@ public void testCompleteSegmentOperations()

zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
"downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders);

SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
assertNotNull(segmentZKMetadata);
assertEquals(segmentZKMetadata.getCrc(), 12345L);
Expand All @@ -233,6 +233,30 @@ public void testCompleteSegmentOperations()
assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
assertEquals(segmentZKMetadata.getSizeInBytes(), 10);

// Test if the same segment can be uploaded when the previous upload failed after segment ZK metadata is created but
// before segment is assigned to the ideal state
// Manually remove the segment from the ideal state
IdealState idealState = _resourceManager.getTableIdealState(OFFLINE_TABLE_NAME);
assertNotNull(idealState);
idealState.getRecord().getMapFields().remove(SEGMENT_NAME);
_resourceManager.getHelixAdmin()
.setResourceIdealState(_resourceManager.getHelixClusterName(), OFFLINE_TABLE_NAME, idealState);
// The segment should be uploaded as a new segment (push time should change, and refresh time shouldn't be set)
zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
"downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders);
segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME);
assertNotNull(segmentZKMetadata);
assertEquals(segmentZKMetadata.getCrc(), 12345L);
assertEquals(segmentZKMetadata.getCreationTime(), 123L);
assertTrue(segmentZKMetadata.getPushTime() > pushTime);
pushTime = segmentZKMetadata.getPushTime();
assertTrue(pushTime > 0);
assertEquals(segmentZKMetadata.getRefreshTime(), Long.MIN_VALUE);
assertEquals(segmentZKMetadata.getDownloadUrl(), "downloadUrl");
assertEquals(segmentZKMetadata.getCrypterName(), "crypter");
assertEquals(segmentZKMetadata.getSegmentUploadStartTime(), -1);
assertEquals(segmentZKMetadata.getSizeInBytes(), 10);

// Upload the same segment with allowRefresh = false. Validate that an exception is thrown.
try {
zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null,
Expand Down