Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Flaky Test - testGCScheduler in TestZkBucketDataAcessor #3003

Merged
merged 1 commit into from
Feb 18, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.BucketDataAccessor;
Expand Down Expand Up @@ -157,12 +158,13 @@ public void testMultipleWrites() throws Exception {
*/
@Test(dependsOnMethods = "testMultipleWrites")
public void testCompressedBucketRead() throws IOException {
_bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record));
HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(PATH, HelixProperty.class);
String path = PATH + "_" + TestHelper.getTestMethodName();
_bucketDataAccessor.compressedBucketWrite(path, new HelixProperty(record));
HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(path, HelixProperty.class);
Assert.assertEquals(readRecord.getRecord().getSimpleField(NAME_KEY), NAME_KEY);
Assert.assertEquals(readRecord.getRecord().getListField(NAME_KEY), LIST_FIELD);
Assert.assertEquals(readRecord.getRecord().getMapField(NAME_KEY), MAP_FIELD);
_bucketDataAccessor.compressedBucketDelete(PATH);
_bucketDataAccessor.compressedBucketDelete(path);
}

/**
Expand Down Expand Up @@ -194,29 +196,35 @@ public void testLargeWriteAndRead() throws IOException {
* Test to ensure bucket GC still occurs in high frequency write scenarios.
*/
@Test(dependsOnMethods = "testLargeWriteAndRead")
public void testGCScheduler() throws IOException, InterruptedException {
public void testGCCompletesUnderHighFrequency() throws Exception {
String path = PATH + "_" + TestHelper.getTestMethodName();
long gcTTL = 1000; // GC schedule for 1 second after write
ZkBucketDataAccessor fastGCBucketDataAccessor = new ZkBucketDataAccessor(_zkClient, 50 * 1024, gcTTL);

int writeCount = 10;
for (int i = 0; i < writeCount; i++) {
Thread.sleep(gcTTL / 2);
Assert.assertTrue(fastGCBucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record)));
}
List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
// remove from list if name cant be parsed into long (aka not a version count node)
children.removeIf(name -> {
try {
Long.parseLong(name);
return false;
} catch (NumberFormatException e) {
return true;
AtomicInteger writeCount = new AtomicInteger(0);
// Below verifier continuously writes to the same path and then checks if the # of children is less than the # of
// times we have written. This will only be true once the GC has cleaned up old versions, which will occur once the
// GC time for the first write has passed.
Assert.assertTrue(TestHelper.verify(() -> {
Assert.assertTrue(fastGCBucketDataAccessor.compressedBucketWrite(path, new HelixProperty(record)));
Thread.sleep(gcTTL/4);
List<String> children = _zkBaseDataAccessor.getChildNames(path, AccessOption.PERSISTENT);
// remove from list if name cant be parsed into long (aka not a version count node)
children.removeIf(name -> {
try {
Long.parseLong(name);
return false;
} catch (NumberFormatException e) {
return true;
}
});
boolean result = children.size() < writeCount.incrementAndGet();
if (!result) {
System.out.println("Expecting stale versions to have been cleaned up. Write count was: " + writeCount +
", children were: " + children);
}
});

Assert.assertTrue(children.size() < writeCount,
"Expecting stale versions to cleaned up. Children were: " + children);
System.out.print("Children after GC: " + children);
return result;
}, TestHelper.WAIT_DURATION));
}

private HelixProperty createLargeHelixProperty(String name, int numEntries) {
Expand Down