Skip to content

Commit

Permalink
add auth token for segment replace rest APIs (#8146)
Browse files Browse the repository at this point in the history
Add auth token param for segment replace rest APIs, as done for the other rest APIs in FileUploadDownloadClient
  • Loading branch information
klsince authored Feb 7, 2022
1 parent df1c268 commit 08f0f32
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -436,17 +436,24 @@ private static HttpUriRequest getSendSegmentJsonRequest(URI uri, String jsonStri
return requestBuilder.build();
}

private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String jsonRequestBody, int socketTimeoutMs) {
private static HttpUriRequest getStartReplaceSegmentsRequest(URI uri, String jsonRequestBody, int socketTimeoutMs,
@Nullable String authToken) {
RequestBuilder requestBuilder =
RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1).setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE)
.setEntity(new StringEntity(jsonRequestBody, ContentType.APPLICATION_JSON));
if (StringUtils.isNotBlank(authToken)) {
requestBuilder.addHeader("Authorization", authToken);
}
setTimeout(requestBuilder, socketTimeoutMs);
return requestBuilder.build();
}

private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs) {
private static HttpUriRequest getEndReplaceSegmentsRequest(URI uri, int socketTimeoutMs, @Nullable String authToken) {
RequestBuilder requestBuilder = RequestBuilder.post(uri).setVersion(HttpVersion.HTTP_1_1)
.setHeader(HttpHeaders.CONTENT_TYPE, JSON_CONTENT_TYPE);
if (StringUtils.isNotBlank(authToken)) {
requestBuilder.addHeader("Authorization", authToken);
}
setTimeout(requestBuilder, socketTimeoutMs);
return requestBuilder.build();
}
Expand Down Expand Up @@ -1023,28 +1030,31 @@ public SimpleHttpResponse sendSegmentJson(URI uri, String jsonString)
*
* @param uri URI
* @param startReplaceSegmentsRequest request
* @param authToken auth token
* @return Response
* @throws IOException
* @throws HttpErrorStatusException
*/
public SimpleHttpResponse startReplaceSegments(URI uri, StartReplaceSegmentsRequest startReplaceSegmentsRequest)
public SimpleHttpResponse startReplaceSegments(URI uri, StartReplaceSegmentsRequest startReplaceSegmentsRequest,
@Nullable String authToken)
throws IOException, HttpErrorStatusException {
return sendRequest(getStartReplaceSegmentsRequest(uri, JsonUtils.objectToString(startReplaceSegmentsRequest),
DEFAULT_SOCKET_TIMEOUT_MS));
DEFAULT_SOCKET_TIMEOUT_MS, authToken));
}

/**
* End replace segments with default settings.
*
* @param uri URI
* @oaram socketTimeoutMs Socket timeout in milliseconds
* @param authToken auth token
* @return Response
* @throws IOException
* @throws HttpErrorStatusException
*/
public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs)
public SimpleHttpResponse endReplaceSegments(URI uri, int socketTimeoutMs, @Nullable String authToken)
throws IOException, HttpErrorStatusException {
return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs));
return sendRequest(getEndReplaceSegmentsRequest(uri, socketTimeoutMs, authToken));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig
List<String> segmentsTo =
segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
lineageEntryId = SegmentConversionUtils.startSegmentReplace(tableNameWithType, uploadURL,
new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo), authToken);
}

// Upload the tarred segments
Expand Down Expand Up @@ -213,9 +213,8 @@ public List<SegmentConversionResult> executeTask(PinotTaskConfig pinotTaskConfig

// Update the segment lineage to indicate that the segment replacement is done.
if (replaceSegmentsEnabled) {
SegmentConversionUtils
.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
_minionConf.getEndReplaceSegmentsTimeoutMs());
SegmentConversionUtils.endSegmentReplace(tableNameWithType, uploadURL, lineageEntryId,
_minionConf.getEndReplaceSegmentsTimeoutMs(), authToken);
}

String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.URI;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.apache.http.Header;
import org.apache.http.HttpHeaders;
Expand Down Expand Up @@ -122,15 +123,16 @@ public static void uploadSegment(Map<String, String> configs, List<Header> httpH
}

public static String startSegmentReplace(String tableNameWithType, String uploadURL,
StartReplaceSegmentsRequest startReplaceSegmentsRequest)
StartReplaceSegmentsRequest startReplaceSegmentsRequest, @Nullable String authToken)
throws Exception {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
SSLContext sslContext = MinionContext.getInstance().getSSLContext();
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
URI uri =
FileUploadDownloadClient.getStartReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), true);
SimpleHttpResponse response = fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest);
SimpleHttpResponse response =
fileUploadDownloadClient.startReplaceSegments(uri, startReplaceSegmentsRequest, authToken);
String responseString = response.getResponse();
LOGGER.info(
"Got response {}: {} while sending start replace segment request for table: {}, uploadURL: {}, request: {}",
Expand All @@ -140,15 +142,15 @@ public static String startSegmentReplace(String tableNameWithType, String upload
}

public static void endSegmentReplace(String tableNameWithType, String uploadURL, String segmentLineageEntryId,
int socketTimeoutMs)
int socketTimeoutMs, @Nullable String authToken)
throws Exception {
String rawTableName = TableNameBuilder.extractRawTableName(tableNameWithType);
TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
SSLContext sslContext = MinionContext.getInstance().getSSLContext();
try (FileUploadDownloadClient fileUploadDownloadClient = new FileUploadDownloadClient(sslContext)) {
URI uri = FileUploadDownloadClient
.getEndReplaceSegmentsURI(new URI(uploadURL), rawTableName, tableType.name(), segmentLineageEntryId);
SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs);
SimpleHttpResponse response = fileUploadDownloadClient.endReplaceSegments(uri, socketTimeoutMs, authToken);
LOGGER.info("Got response {}: {} while sending end replace segment request for table: {}, uploadURL: {}",
response.getStatusCode(), response.getResponse(), tableNameWithType, uploadURL);
}
Expand Down

0 comments on commit 08f0f32

Please sign in to comment.