Enable Consistent Data Push for Standalone Segment Push Job Runners #9295
Partial Review
@@ -81,10 +89,15 @@ protected List<String> getBloomFilterColumns() {
    return null;
  }

  @BeforeMethod
  public void setUpTest()
I think that this is not actually necessary. We can call TestUtils.ensureDirectoriesExistAndEmpty() in setUp() and clean those directories up in cleanUp(). For the 2 different tests, we can use different table names.
Calling TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir) in setUp is insufficient, as there is another test case, testUploadAndQuery, that leaves data in the tmp directories.
Using two table names would also require more code for now, since BaseClusterIntegrationTest seems to be written in a way that assumes one table/config per subclass, e.g. getTableName, createOfflineTableConfig.
Hence, I prefer to keep the test structure as is with the before and after method annotations, unless this is an anti-pattern. Another way is to create a separate test class altogether, but I combined the two here to avoid slowing the build down with too many tests.
@@ -217,10 +358,15 @@ public Boolean apply(@Nullable Void aVoid) {
    }, 100L, 300_000, "Failed to load " + countStarResult + " documents", true);
  }

  @AfterMethod
This is also not necessary if we use different table names.
Codecov Report
@@             Coverage Diff              @@
##             master    #9295      +/-   ##
============================================
- Coverage     69.80%   69.76%   -0.04%
+ Complexity     4777     4703      -74
============================================
  Files          1875     1878       +3
  Lines         99860    99930      +70
  Branches      15194    15192       -2
============================================
+ Hits          69706    69718      +12
- Misses        25231    25285      +54
- Partials       4923     4927       +4
String responseString = response.getResponse();
JsonNode responseJsonNode = JsonUtils.stringToJsonNode(responseString);
Iterator<JsonNode> responseElements = responseJsonNode.elements();
while (responseElements.hasNext()) {
Can we encapsulate the logic of fetching the segment names into a method, or is there any existing logic doing that right now?
Could you clarify the input parameters and the ask for the method we are talking about? I'll be glad to extract that out if it does not already exist.
Ohh I see, I'll look into the possibility of reusing the getSelectedSegments API that you previously brought in. That may be used when we support APPEND tables anyway! Thx.
AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(spec.getAuthToken());
LOGGER.info("Start replace segment URIs: " + segmentsUris);

int attempts = 1;
Can we use a number larger than 1 for the retry attempts? What's the reasoning behind making only a single attempt?
for (URI uri : uriToLineageEntryIdMap.keySet()) {
  String segmentLineageEntryId = uriToLineageEntryIdMap.get(uri);
  try {
    FILE_UPLOAD_DOWNLOAD_CLIENT.endReplaceSegments(
Can we add the retry here as well, to stay consistent with the startReplaceSegments call?
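For reference, a minimal sketch of the kind of retry wrapper being asked for, using only the JDK. BackoffRetry and its attempt method are hypothetical stand-ins, not Pinot's RetryPolicies API; the three numeric parameters are assumed to mean max attempts, initial delay in milliseconds, and delay scale factor.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper (not Pinot's RetryPolicy): retries an operation with exponential backoff. */
final class BackoffRetry {
  private BackoffRetry() {
  }

  /**
   * Runs the operation until it returns true or maxAttempts is exhausted.
   * Returns the number of attempts used; throws IllegalStateException if every attempt fails.
   */
  static int attempt(BooleanSupplier operation, int maxAttempts, long initialDelayMs, double scaleFactor) {
    long delayMs = initialDelayMs;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      if (operation.getAsBoolean()) {
        return attempt;
      }
      if (attempt < maxAttempts) {
        try {
          Thread.sleep(delayMs);
        } catch (InterruptedException e) {
          // Restore the interrupt flag and stop retrying.
          Thread.currentThread().interrupt();
          throw new IllegalStateException("Interrupted while waiting to retry", e);
        }
        // Grow the wait between attempts geometrically.
        delayMs = (long) (delayMs * scaleFactor);
      }
    }
    throw new IllegalStateException("Operation failed after " + maxAttempts + " attempts");
  }
}
```

Wrapping both the start and end replace calls in the same helper would keep their failure handling identical, which is the consistency point raised in the comment.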
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentPushUtils.class);
private static final FileUploadDownloadClient FILE_UPLOAD_DOWNLOAD_CLIENT = new FileUploadDownloadClient();
private static final RetryPolicy DEFAULT_RETRY_POLICY = RetryPolicies.exponentialBackoffRetryPolicy(5, 10_000L, 2.0);
lgtm
tableTypeToFilter, excludeReplacedSegments);
RequestBuilder requestBuilder = RequestBuilder.get(uri).setVersion(HttpVersion.HTTP_1_1);
HttpClient.setTimeout(requestBuilder, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS);
RetryPolicies.exponentialBackoffRetryPolicy(5, 10_000L, 2.0).attempt(() -> {
Same for this..
  }
  public void uploadSegments(Map<String, String> segmentsUriToTarPathMap)
      throws AttemptsExceededException, RetriableOperationException {
    SegmentPushUtils.pushSegments(_spec, _outputDirFS, new ArrayList<>(segmentsUriToTarPathMap.values()));
@snleee Do you think this might cause some performance loss when there are many segments to push? If so, we might need to change pushSegments to take in Collection<String> as opposed to List<String>.
For copying the elements from map.values() to a List?
In our case, the worst-case scenario should be tens of thousands of segments for a table, and I think it's OK to keep it this way. I don't think that it's a performance-critical path.
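To make the trade-off concrete, here is a small sketch of the two signatures being compared. PushSignatureSketch and both methods are hypothetical stand-ins that only count the paths they receive; they are not the real SegmentPushUtils methods.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-ins for the push helpers, used only to compare the two parameter types. */
final class PushSignatureSketch {
  private PushSignatureSketch() {
  }

  // List-based signature: a caller holding a Map has to copy values() into a new list first.
  static int pushFromList(List<String> tarPaths) {
    return tarPaths.size();
  }

  // Collection-based signature: accepts the map's values() view directly, with no copy.
  static int pushFromCollection(Collection<String> tarPaths) {
    return tarPaths.size();
  }

  // Shows both call styles side by side for the same map.
  static boolean sameResult(Map<String, String> segmentsUriToTarPathMap) {
    int viaCopy = pushFromList(new ArrayList<>(segmentsUriToTarPathMap.values())); // O(n) copy
    int viaView = pushFromCollection(segmentsUriToTarPathMap.values()); // no copy
    return viaCopy == viaView;
  }
}
```

The copy is a single linear pass over the values, so for tens of thousands of entries it is small next to the network cost of uploading the segments, which is consistent with keeping the List signature.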
  }
  public void uploadSegments(Map<String, String> segmentsUriToTarPathMap)
      throws AttemptsExceededException, RetriableOperationException {
    SegmentPushUtils.sendSegmentUris(_spec, new ArrayList<>(segmentsUriToTarPathMap.keySet()));
same here
Looks good to me otherwise! Thank you for addressing all the comments.
   * tarPaths (values), or both may be used depending on upload mode.
   */
  public List<String> getSegmentsToReplace(Map<String, String> segmentsUriToTarPathMap) {
    return SegmentPushUtils.getSegmentNames(segmentsUriToTarPathMap.values());
Let's put the logic here instead of putting it in a Util class. We usually extract logic to a Util class when the same code needs to be called in multiple places, to avoid duplicate code. In this case, we only need to use this function here.
This is a great feature! Can you please also help update the Pinot documentation for this new feature? https://github.com/pinot-contrib/pinot-docs
Description:
This PR addresses #9268 for the segment push job runners under the standalone execution framework: SegmentMetadataPushJobRunner, SegmentTarPushJobRunner, and SegmentUriPushJobRunner.
This is accomplished by introducing a new class, ConsistentDataPushUtils, which contains APIs and helpers that the *PushJobRunner classes call to invoke the consistent push protocol.
Since the code for all of the *PushJobRunner classes overlaps heavily, this PR also takes the opportunity to refactor and extract the common logic into the BaseSegmentPushJobRunner abstract class.
To enable consistent data push, this PR also introduces a new boolean config in the table config under TableConfig->IngestionConfig->BatchIngestionConfig->consistentDataPush.
Users can enable consistent data push by setting the consistentDataPush config to true before invoking ingestion jobs.
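A sketch of what that setting might look like in a table config. The JSON key names ingestionConfig and batchIngestionConfig are assumed from the class path given above, and all other table config fields are omitted.

```json
{
  "ingestionConfig": {
    "batchIngestionConfig": {
      "consistentDataPush": true
    }
  }
}
```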
Testing Done:
Added new test testUploadAndQueryWithConsistentPush in SegmentUploadIntegrationTest, which:
1. Runs SegmentMetadataPushJobRunner with consistent push enabled: [] -> [v1 segments].
2. Checks that the segment lineage entry is in the expected, completed state.
3. Checks that count star queries return the expected results.
4. Runs SegmentTarPushJobRunner with consistent push enabled: [v1 segments] -> [v2 segments].
5. Checks again that the segment lineage entry is in the expected, completed state.
6. Checks again that count star queries return the expected results (that is, the original set of segments was successfully bulk replaced).
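The flow exercised by this test can be illustrated with a toy in-memory model. This is only a sketch under stated assumptions: ConsistentPushSketch, its methods, and the two-state enum are hypothetical and simplified, not the controller's real API or lineage implementation. It models the idea that segments being added stay hidden until the lineage entry completes, and replaced segments are hidden afterwards.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of a lineage-based segment replace; all names here are hypothetical. */
final class ConsistentPushSketch {
  enum State { IN_PROGRESS, COMPLETED }

  private static final class LineageEntry {
    final List<String> segmentsFrom;
    final List<String> segmentsTo;
    State state = State.IN_PROGRESS;

    LineageEntry(List<String> segmentsFrom, List<String> segmentsTo) {
      this.segmentsFrom = segmentsFrom;
      this.segmentsTo = segmentsTo;
    }
  }

  private final Map<String, LineageEntry> lineage = new LinkedHashMap<>();
  private final Set<String> uploaded = new LinkedHashSet<>();

  // Registers the intended swap before any segment is uploaded.
  String startReplaceSegments(List<String> segmentsFrom, List<String> segmentsTo) {
    String entryId = "entry-" + lineage.size();
    lineage.put(entryId, new LineageEntry(segmentsFrom, segmentsTo));
    return entryId;
  }

  void uploadSegment(String segmentName) {
    uploaded.add(segmentName);
  }

  // Marks the swap as done; from this point the new segments become visible.
  void endReplaceSegments(String entryId) {
    lineage.get(entryId).state = State.COMPLETED;
  }

  State stateOf(String entryId) {
    return lineage.get(entryId).state;
  }

  // In-progress entries hide their "to" segments; completed entries hide their "from" segments.
  List<String> queryableSegments() {
    Set<String> visible = new LinkedHashSet<>(uploaded);
    for (LineageEntry entry : lineage.values()) {
      visible.removeAll(entry.state == State.COMPLETED ? entry.segmentsFrom : entry.segmentsTo);
    }
    return new ArrayList<>(visible);
  }
}
```

In this model, a query issued between the start and end calls sees only the old set of segments, so readers never observe a mix of v1 and v2 data.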