[7338] Allow Reloading Segments with Multiple Threads #7893
Working on tests for this.
Codecov Report
@@ Coverage Diff @@
## master #7893 +/- ##
============================================
- Coverage 71.34% 71.31% -0.03%
- Complexity 4087 4194 +107
============================================
Files 1587 1595 +8
Lines 82071 82568 +497
Branches 12267 12320 +53
============================================
+ Hits 58553 58885 +332
- Misses 19550 19708 +158
- Partials 3968 3975 +7
CompletableFuture.runAsync(() -> {
  try {
    reloadSegment(tableNameWithType, segmentMetadata, tableConfig, schema, forceDownload);
  } catch (Exception e) {
    String segmentName = segmentMetadata.getName();
    LOGGER.error("Caught exception while reloading segment: {} in table: {}", segmentName, tableNameWithType, e);
    failedSegments.add(segmentName);
    sampleException.set(e);
  } finally {
    latch.countDown();
  }
}, workers);
I think you can make better use of the CompletableFuture API here:
CompletableFuture.allOf(segmentsMetadata.stream()
    .map(segmentMetadata -> CompletableFuture.runAsync(() -> {... reload the segment ...}, workers))
    .toArray(CompletableFuture[]::new))
    .get(timeout, TimeUnit.MILLISECONDS);
Which would eliminate the latch and make the code easier to reason about.
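For reference, a minimal self-contained sketch of the suggested pattern. The class name, the string-based segment list, and the stand-in `reloadSegment` are hypothetical and exist only for illustration; the real method takes table config, schema, and other arguments.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ParallelReloadSketch {
  // Hypothetical stand-in for the real reload; fails for names starting with "bad".
  static void reloadSegment(String segmentName) {
    if (segmentName.startsWith("bad")) {
      throw new IllegalStateException("Cannot reload " + segmentName);
    }
  }

  // Submits one task per segment, waits for all of them (bounded by timeoutMs),
  // and returns the names of the segments that failed to reload.
  static List<String> reloadAll(List<String> segmentNames, ExecutorService workers, long timeoutMs)
      throws Exception {
    Queue<String> failedSegments = new ConcurrentLinkedQueue<>();
    CompletableFuture.allOf(segmentNames.stream()
        .map(segmentName -> CompletableFuture.runAsync(() -> {
          try {
            reloadSegment(segmentName);
          } catch (Exception e) {
            failedSegments.add(segmentName);
          }
        }, workers))
        .toArray(CompletableFuture[]::new))
        .get(timeoutMs, TimeUnit.MILLISECONDS);
    return new ArrayList<>(failedSegments);
  }

  public static void main(String[] args) throws Exception {
    ExecutorService workers = Executors.newFixedThreadPool(4);
    try {
      System.out.println(reloadAll(Arrays.asList("seg_0", "bad_1", "seg_2"), workers, 5_000L));
    } finally {
      workers.shutdown();
    }
  }
}
```

Because each task catches its own exception, `allOf(...).get(...)` only throws on timeout or interruption, so no latch or manual countdown is needed.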
This is excellent, thanks! I definitely need to revisit CompletableFuture's API.
@@ -49,6 +50,13 @@ public SegmentReloadMessage(@Nonnull String tableNameWithType, @Nullable String
    znRecord.setBooleanField(FORCE_DOWNLOAD_KEY, forceDownload);
  }

  public SegmentReloadMessage(@Nonnull String tableNameWithType, @Nullable String segmentName, boolean forceDownload,
      int parallelism) {
I know this is a draft PR; I'm just curious about the approach. How does the user specify the parallelism parameter? Can we use a semaphore like we do in refresh (which is also delivered to the server via a Helix message)?
@suddendust if this is a draft PR, can you please add WIP in the title? thanks
@siddharthteotia After some discussion, we concluded that there is no point in adding yet another parameter to control the parallelism. Instead, reusing the existing _refreshThreadsSemaphore makes sense, since reload and refresh are so similar. For each available permit, we reload a segment asynchronously.
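A rough sketch of how a shared semaphore can bound the number of concurrent reloads. All names here are hypothetical, the reload is simulated with a short sleep, and `acquireUninterruptibly()` is used only to keep the example short (the PR's `acquireSema` throws `InterruptedException`). The method returns the peak number of tasks that ran at once, which never exceeds the permit count.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

class SemaphoreBoundedReload {
  // Simulated reload work (assumption: stands in for the real segment reload).
  static void simulateReload() {
    try {
      Thread.sleep(10);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  // Runs segmentCount tasks on the executor; at most `permits` of them do work at any moment.
  static int reloadWithPermits(int segmentCount, int permits, ExecutorService workers) throws Exception {
    Semaphore refreshThreadSemaphore = new Semaphore(permits);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    CompletableFuture<?>[] futures = IntStream.range(0, segmentCount)
        .mapToObj(i -> CompletableFuture.runAsync(() -> {
          refreshThreadSemaphore.acquireUninterruptibly();
          try {
            // Record how many tasks currently hold a permit.
            peak.accumulateAndGet(running.incrementAndGet(), Math::max);
            simulateReload();
          } finally {
            running.decrementAndGet();
            refreshThreadSemaphore.release();
          }
        }, workers))
        .toArray(CompletableFuture[]::new);
    CompletableFuture.allOf(futures).get(30, TimeUnit.SECONDS);
    return peak.get();
  }

  public static void main(String[] args) throws Exception {
    ExecutorService workers = Executors.newFixedThreadPool(8);
    try {
      System.out.println("Peak concurrency: " + reloadWithPermits(8, 2, workers));
    } finally {
      workers.shutdown();
    }
  }
}
```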
Yes, sounds good.
@Jackie-Jiang Requesting review, thanks!
try {
  acquireSema("ALL", refreshThreadSemaphore);
Put the segment name:
- acquireSema("ALL", refreshThreadSemaphore);
+ acquireSema(segmentsMetadata.getName(), refreshThreadSemaphore);
try {
  acquireSema("ALL", refreshThreadSemaphore);
acquireSema() should be outside of the try block.
Yes, that was a miss. Thanks!
    }
  }
}, workers)).toArray(CompletableFuture[]::new)).get();
Do you need to shut down the executor service after it is done?
It should be GCed after ~60s (the TTL of idle cached threads). However, it's a good idea to shut it down explicitly. Addressed.
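A small sketch of the explicit shutdown, assuming the executor is a cached thread pool as the reply implies (threads created by `Executors.newCachedThreadPool()` are terminated after 60 seconds of idleness). The class and method names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ExecutorShutdownSketch {
  // Runs all tasks on a short-lived pool and shuts the pool down even if waiting fails.
  // Returns true if the pool terminated within the grace period.
  static boolean runAndShutDown(List<Runnable> tasks) throws InterruptedException {
    ExecutorService workers = Executors.newCachedThreadPool();
    try {
      CompletableFuture.allOf(tasks.stream()
          .map(task -> CompletableFuture.runAsync(task, workers))
          .toArray(CompletableFuture[]::new))
          .join();
    } finally {
      // shutdown() stops accepting new tasks; already-submitted tasks still complete.
      workers.shutdown();
    }
    return workers.awaitTermination(5, TimeUnit.SECONDS);
  }
}
```

Shutting down in a `finally` block releases the worker threads immediately instead of relying on the idle timeout.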
  LOGGER.info("Reloaded all segments in table: {}", tableNameWithType);
}

private void acquireSema(String context, Semaphore refreshThreadSemaphore)
The first argument is segmentName.
(optional) This part is duplicated. One way to avoid duplicate code is to add a SegmentRefreshSemaphore class to wrap this logic.
Created a wrapper class.
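As an illustration of what such a wrapper might look like, here is a minimal sketch. It is not the class added in this PR: the class name, method names, and logging are assumptions chosen for the example.

```java
import java.util.concurrent.Semaphore;
import java.util.logging.Logger;

// Hypothetical wrapper that keeps the acquire/release logic and its logging in one place,
// so that both the refresh path and the reload path can share it.
class RefreshSemaphoreSketch {
  private static final Logger LOGGER = Logger.getLogger(RefreshSemaphoreSketch.class.getName());

  private final Semaphore _semaphore;

  RefreshSemaphoreSketch(int permits) {
    _semaphore = new Semaphore(permits, true);
  }

  // Blocks until a permit is available; segmentName is used only for logging.
  void acquireSema(String segmentName) throws InterruptedException {
    LOGGER.fine("Waiting for a permit to process segment: " + segmentName);
    _semaphore.acquire();
    LOGGER.fine("Acquired a permit to process segment: " + segmentName);
  }

  void releaseSema() {
    _semaphore.release();
  }

  int availablePermits() {
    return _semaphore.availablePermits();
  }
}
```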
LGTM with minor comments
} catch (Exception e) {
  LOGGER.error("Caught exception while reloading segment: {} in table: {}", segmentName, tableNameWithType, e);
  failedSegments.add(segmentName);
  sampleException.set(e);
This catch is duplicated. We only need one try/catch/finally here.
The outer try-catch handles any InterruptedException thrown while trying to acquire a permit (acquireSema throws an InterruptedException). If we put a finally in that block, it would try to release a permit in this case, which would be wrong. So we need another try-catch inside the outer try to execute the finally.
I see. In that case we should remove this catch block (keep the try/finally) and let the outer catch handle the exception.
Yes, that one is redundant. Addressed.
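The structure agreed on above can be sketched as follows. The `Reloader` interface and all names are hypothetical. The permit is acquired before the inner try, the inner try/finally releases it, and a single outer catch handles both an interrupted acquire and a failed reload.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Semaphore;

class ReloadTaskSketch {
  // Hypothetical callback standing in for the real segment reload.
  interface Reloader {
    void reload(String segmentName) throws Exception;
  }

  // Returns true if the segment was reloaded, false if it failed or the acquire was interrupted.
  static boolean reloadOne(String segmentName, Semaphore semaphore, Reloader reloader,
      Queue<String> failedSegments) {
    try {
      // Outside the inner try: if acquire() throws, no permit is held and none is released.
      semaphore.acquire();
      try {
        reloader.reload(segmentName);
      } finally {
        // Reached only after a successful acquire, so the release is always balanced.
        semaphore.release();
      }
      return true;
    } catch (Exception e) {
      if (e instanceof InterruptedException) {
        // Preserve the interrupt status for callers further up the stack.
        Thread.currentThread().interrupt();
      }
      failedSegments.add(segmentName);
      return false;
    }
  }

  public static void main(String[] args) {
    Queue<String> failed = new ConcurrentLinkedQueue<>();
    Semaphore semaphore = new Semaphore(1);
    reloadOne("seg_0", semaphore, name -> { throw new IllegalStateException("boom"); }, failed);
    System.out.println("Failed: " + failed + ", permits: " + semaphore.availablePermits());
  }
}
```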
pinot-core/src/main/java/org/apache/pinot/core/util/SegmentRefreshSemaphore.java
…reshSemaphore.java Co-authored-by: Xiaotian (Jackie) Jiang <[email protected]>
Description
This change allows users to reload segments of a table in parallel. More description here.
Upgrade Notes
Does this PR prevent a zero down-time upgrade? (Assume upgrade order: Controller, Broker, Server, Minion)
Does this PR fix a zero-downtime upgrade introduced earlier?
Does this PR otherwise need attention when creating release notes? Things to consider: