Skip to content

Commit

Permalink
Allow moveToFinalLocation in METADATA push based on config (#8823)
Browse files Browse the repository at this point in the history
METADATA push didn't allow the option of moveSegmentToFinalLocation. This meant that if someone had generated segments in a location that was not the deep store, there was no way to move those segments into the deep store without manual scripting.
  • Loading branch information
npawar authored Jun 15, 2022
1 parent 28452e0 commit 378bdec
Show file tree
Hide file tree
Showing 8 changed files with 472 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ public static class CustomHeaders {
public static final String UPLOAD_TYPE = "UPLOAD_TYPE";
public static final String REFRESH_ONLY = "REFRESH_ONLY";
public static final String DOWNLOAD_URI = "DOWNLOAD_URI";

/**
* This header is only used for METADATA push. When set to "true", it allows the controller to copy the
* segment to the deep store if the segment was not placed in the deep store to begin with.
*/
public static final String COPY_SEGMENT_TO_DEEP_STORE = "COPY_SEGMENT_TO_DEEP_STORE";
public static final String SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER = "Pinot-SegmentZKMetadataCustomMapModifier";
public static final String CRYPTER = "CRYPTER";
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ public Response downloadSegment(
}

private SuccessResponse uploadSegment(@Nullable String tableName, TableType tableType,
@Nullable FormDataMultiPart multiPart, boolean moveSegmentToFinalLocation, boolean enableParallelPushProtection,
@Nullable FormDataMultiPart multiPart, boolean copySegmentToFinalLocation, boolean enableParallelPushProtection,
boolean allowRefresh, HttpHeaders headers, Request request) {
if (StringUtils.isNotEmpty(tableName)) {
TableType tableTypeFromTableName = TableNameBuilder.getTableTypeFromTableName(tableName);
Expand All @@ -213,13 +213,15 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
extractHttpHeader(headers, CommonConstants.Controller.TABLE_NAME_HTTP_HEADER);

String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE);
String downloadURI = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
String sourceDownloadURIStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI);
String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER);
String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR);

File tempEncryptedFile = null;
File tempDecryptedFile = null;
File tempSegmentDir = null;
// The downloadUri for putting into segment zk metadata
String segmentDownloadURIStr = sourceDownloadURIStr;
try {
ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance();
String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID();
Expand All @@ -238,20 +240,22 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
"Segment file (as multipart/form-data) is required for SEGMENT upload mode",
Response.Status.BAD_REQUEST);
}
if (!moveSegmentToFinalLocation && StringUtils.isEmpty(downloadURI)) {
if (!copySegmentToFinalLocation && StringUtils.isEmpty(sourceDownloadURIStr)) {
throw new ControllerApplicationException(LOGGER,
"Download URI is required if segment should not be copied to the deep store",
"Source download URI is required in header field 'DOWNLOAD_URI' if segment should not be copied to "
+ "the deep store",
Response.Status.BAD_REQUEST);
}
createSegmentFileFromMultipart(multiPart, destFile);
segmentSizeInBytes = destFile.length();
break;
case URI:
if (StringUtils.isEmpty(downloadURI)) {
throw new ControllerApplicationException(LOGGER, "Download URI is required for URI upload mode",
if (StringUtils.isEmpty(sourceDownloadURIStr)) {
throw new ControllerApplicationException(LOGGER,
"Source download URI is required in header field 'DOWNLOAD_URI' for URI upload mode",
Response.Status.BAD_REQUEST);
}
downloadSegmentFileFromURI(downloadURI, destFile, tableName);
downloadSegmentFileFromURI(sourceDownloadURIStr, destFile, tableName);
segmentSizeInBytes = destFile.length();
break;
case METADATA:
Expand All @@ -260,14 +264,19 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl
"Segment metadata file (as multipart/form-data) is required for METADATA upload mode",
Response.Status.BAD_REQUEST);
}
if (StringUtils.isEmpty(downloadURI)) {
throw new ControllerApplicationException(LOGGER, "Download URI is required for METADATA upload mode",
if (StringUtils.isEmpty(sourceDownloadURIStr)) {
throw new ControllerApplicationException(LOGGER,
"Source download URI is required in header field 'DOWNLOAD_URI' for METADATA upload mode",
Response.Status.BAD_REQUEST);
}
moveSegmentToFinalLocation = false;
// override copySegmentToFinalLocation if override provided in headers:COPY_SEGMENT_TO_DEEP_STORE
// else set to false for backward compatibility
String copySegmentToDeepStore =
extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE);
copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore);
createSegmentFileFromMultipart(multiPart, destFile);
try {
URI segmentURI = new URI(downloadURI);
URI segmentURI = new URI(sourceDownloadURIStr);
PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme());
segmentSizeInBytes = pinotFS.length(segmentURI);
} catch (Exception e) {
Expand Down Expand Up @@ -332,24 +341,25 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl

// Update download URI if controller is responsible for moving the segment to the deep store
URI finalSegmentLocationURI = null;
if (moveSegmentToFinalLocation) {
if (copySegmentToFinalLocation) {
URI dataDirURI = provider.getDataDirURI();
String dataDirPath = dataDirURI.toString();
String encodedSegmentName = URIUtils.encode(segmentName);
String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName);
if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) {
downloadURI = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName);
segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName);
} else {
downloadURI = finalSegmentLocationPath;
segmentDownloadURIStr = finalSegmentLocationPath;
}
finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath);
}
LOGGER.info("Using download URI: {} for segment: {} of table: {} (move segment: {})", downloadURI, segmentFile,
tableNameWithType, moveSegmentToFinalLocation);
LOGGER.info("Using segment download URI: {} for segment: {} of table: {} (move segment: {})",
segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation);

ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics);
zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, finalSegmentLocationURI, segmentFile,
downloadURI, crypterName, segmentSizeInBytes, enableParallelPushProtection, allowRefresh, headers);
zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI,
segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes,
enableParallelPushProtection, allowRefresh, headers);

return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType);
} catch (WebApplicationException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.api.upload;

import com.google.common.base.Preconditions;
import java.io.File;
import java.net.URI;
import javax.annotation.Nullable;
Expand All @@ -29,6 +30,7 @@
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.FileUploadDownloadClient.FileUploadType;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.api.exception.ControllerApplicationException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
Expand Down Expand Up @@ -60,7 +62,8 @@ public ZKOperator(PinotHelixResourceManager pinotHelixResourceManager, Controlle
}

public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata,
@Nullable URI finalSegmentLocationURI, File segmentFile, String downloadUrl, @Nullable String crypterName,
FileUploadType uploadType, @Nullable URI finalSegmentLocationURI, File segmentFile,
@Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, @Nullable String crypterName,
long segmentSizeInBytes, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers)
throws Exception {
String segmentName = segmentMetadata.getName();
Expand All @@ -76,8 +79,9 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
Response.Status.GONE);
}
LOGGER.info("Adding new segment: {} to table: {}", segmentName, tableNameWithType);
processNewSegment(tableNameWithType, segmentMetadata, finalSegmentLocationURI, segmentFile, downloadUrl,
crypterName, segmentSizeInBytes, enableParallelPushProtection, headers);
processNewSegment(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, segmentFile,
sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, enableParallelPushProtection,
headers);
} else {
// Refresh an existing segment
if (!allowRefresh) {
Expand All @@ -89,16 +93,16 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata
tableNameWithType), Response.Status.CONFLICT);
}
LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType);
processExistingSegment(tableNameWithType, segmentMetadata, existingSegmentMetadataZNRecord,
finalSegmentLocationURI, segmentFile, downloadUrl, crypterName, segmentSizeInBytes,
enableParallelPushProtection, headers);
processExistingSegment(tableNameWithType, segmentMetadata, uploadType, existingSegmentMetadataZNRecord,
finalSegmentLocationURI, segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName,
segmentSizeInBytes, enableParallelPushProtection, headers);
}
}

private void processExistingSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI, File segmentFile,
String downloadUrl, @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection,
HttpHeaders headers)
FileUploadType uploadType, ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI,
File segmentFile, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr,
@Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers)
throws Exception {
String segmentName = segmentMetadata.getName();
int expectedVersion = existingSegmentMetadataZNRecord.getVersion();
Expand Down Expand Up @@ -179,8 +183,7 @@ private void processExistingSegment(String tableNameWithType, SegmentMetadata se
"New segment crc {} is different than the existing segment crc {}. Updating ZK metadata and refreshing "
+ "segment {}", newCrc, existingCrc, segmentName);
if (finalSegmentLocationURI != null) {
moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI);
LOGGER.info("Moved segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr,
finalSegmentLocationURI);
}

Expand All @@ -191,12 +194,12 @@ private void processExistingSegment(String tableNameWithType, SegmentMetadata se
if (customMapModifier == null) {
// If no modifier is provided, use the custom map from the segment metadata
segmentZKMetadata.setCustomMap(null);
ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl,
crypterName, segmentSizeInBytes);
ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata,
segmentDownloadURIStr, crypterName, segmentSizeInBytes);
} else {
// If modifier is provided, first set the custom map from the segment metadata, then apply the modifier
ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, downloadUrl,
crypterName, segmentSizeInBytes);
ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata,
segmentDownloadURIStr, crypterName, segmentSizeInBytes);
segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap()));
}
if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) {
Expand Down Expand Up @@ -237,16 +240,17 @@ private void checkCRC(HttpHeaders headers, String tableNameWithType, String segm
}
}

private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata,
@Nullable URI finalSegmentLocationURI, File segmentFile, String downloadUrl, @Nullable String crypterName,
long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers)
private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, FileUploadType uploadType,
@Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable String sourceDownloadURIStr,
String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes,
boolean enableParallelPushProtection, HttpHeaders headers)
throws Exception {
String segmentName = segmentMetadata.getName();
SegmentZKMetadata newSegmentZKMetadata;
try {
newSegmentZKMetadata =
ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, downloadUrl, crypterName,
segmentSizeInBytes);
ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, segmentDownloadURIStr,
crypterName, segmentSizeInBytes);
} catch (IllegalArgumentException e) {
throw new ControllerApplicationException(LOGGER,
String.format("Got invalid segment metadata when adding segment: %s for table: %s, reason: %s", segmentName,
Expand Down Expand Up @@ -274,8 +278,7 @@ private void processNewSegment(String tableNameWithType, SegmentMetadata segment

if (finalSegmentLocationURI != null) {
try {
moveSegmentToPermanentDirectory(segmentFile, finalSegmentLocationURI);
LOGGER.info("Moved segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr,
finalSegmentLocationURI);
} catch (Exception e) {
// Cleanup the Zk entry and the segment from the permanent directory if it exists.
Expand Down Expand Up @@ -310,9 +313,39 @@ private void processNewSegment(String tableNameWithType, SegmentMetadata segment
}
}

private void moveSegmentToPermanentDirectory(File segmentFile, URI finalSegmentLocationURI)
/**
 * Copies a segment to its final location, choosing the copy source from the upload type.
 *
 * <p>For METADATA push the local {@code segmentFile} only contains metadata, so the segment is copied from
 * {@code sourceDownloadURIStr}. For every other upload type the local {@code segmentFile} holds the complete
 * segment and is copied as-is.
 *
 * @param tableNameWithType table name with type, used for logging only
 * @param segmentName segment name, used for logging only
 * @param uploadType upload type of the current request
 * @param segmentFile local segment file (metadata only for METADATA push)
 * @param sourceDownloadURIStr source download URI string; parsed as a URI only for METADATA push
 * @param finalSegmentLocationURI destination URI of the segment
 * @throws Exception if the source URI cannot be parsed or the underlying copy fails
 */
private void copySegmentToDeepStore(String tableNameWithType, String segmentName, FileUploadType uploadType,
    File segmentFile, String sourceDownloadURIStr, URI finalSegmentLocationURI)
    throws Exception {
  boolean metadataPush = FileUploadType.METADATA == uploadType;
  if (!metadataPush) {
    // The complete segment is available locally: push the local file to the final location.
    copyFromSegmentFileToDeepStore(segmentFile, finalSegmentLocationURI);
  } else {
    // Only the metadata is available locally: copy the segment from its source download URI instead.
    copyFromSegmentURIToDeepStore(new URI(sourceDownloadURIStr), finalSegmentLocationURI);
  }
  // Reached only when the selected copy did not throw.
  LOGGER.info("Copied segment: {} of table: {} to final location: {}", segmentName, tableNameWithType,
      finalSegmentLocationURI);
}

/**
 * Copies the local segment file to the final segment location, using the file system that
 * {@code PinotFSFactory} returns for the destination URI's scheme.
 *
 * @param segmentFile local file to copy
 * @param finalSegmentLocationURI destination URI of the segment
 * @throws Exception if the underlying copy fails
 */
private void copyFromSegmentFileToDeepStore(File segmentFile, URI finalSegmentLocationURI)
    throws Exception {
  String localSegmentPath = segmentFile.getAbsolutePath();
  LOGGER.info("Copying segment from: {} to: {}", localSegmentPath, finalSegmentLocationURI);
  String destinationScheme = finalSegmentLocationURI.getScheme();
  PinotFSFactory.create(destinationScheme).copyFromLocalFile(segmentFile, finalSegmentLocationURI);
}

/**
 * Copies a segment from its source download URI to the final segment location.
 *
 * <p>The copy is skipped when both URIs are equal. Otherwise both URIs must use the same scheme, because the
 * copy is delegated to the single file system that {@code PinotFSFactory} returns for the destination scheme.
 *
 * @param sourceDownloadURI URI the segment currently lives at
 * @param finalSegmentLocationURI destination URI of the segment
 * @throws IllegalStateException if the source URI has no scheme or the two schemes differ
 * @throws Exception if the underlying copy fails
 */
private void copyFromSegmentURIToDeepStore(URI sourceDownloadURI, URI finalSegmentLocationURI)
    throws Exception {
  if (sourceDownloadURI.equals(finalSegmentLocationURI)) {
    LOGGER.info("Skip copying segment as sourceDownloadURI: {} is the same as finalSegmentLocationURI",
        sourceDownloadURI);
    return;
  }
  String sourceScheme = sourceDownloadURI.getScheme();
  String finalScheme = finalSegmentLocationURI.getScheme();
  // URI.getScheme() returns null for scheme-less URIs, so guard against it instead of failing with an NPE.
  // Schemes are compared case-insensitively, matching the other scheme checks in the upload path.
  Preconditions.checkState(sourceScheme != null && sourceScheme.equalsIgnoreCase(finalScheme),
      "Cannot copy segment from: %s to: %s, source scheme: %s must match final location scheme: %s",
      sourceDownloadURI, finalSegmentLocationURI, sourceScheme, finalScheme);
  LOGGER.info("Copying segment from: {} to: {}", sourceDownloadURI, finalSegmentLocationURI);
  PinotFSFactory.create(finalScheme).copy(sourceDownloadURI, finalSegmentLocationURI);
}
}
Loading

0 comments on commit 378bdec

Please sign in to comment.