Skip to content

Commit

Permalink
fix flaky test
Browse files Browse the repository at this point in the history
  • Loading branch information
GrantPSpencer committed Feb 6, 2025
1 parent 33a28e7 commit f0b33ce
Showing 1 changed file with 30 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.BucketDataAccessor;
Expand Down Expand Up @@ -157,12 +158,13 @@ public void testMultipleWrites() throws Exception {
*/
@Test(dependsOnMethods = "testMultipleWrites")
public void testCompressedBucketRead() throws IOException {
_bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record));
HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(PATH, HelixProperty.class);
String path = PATH + "_" + TestHelper.getTestMethodName();
_bucketDataAccessor.compressedBucketWrite(path, new HelixProperty(record));
HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(path, HelixProperty.class);
Assert.assertEquals(readRecord.getRecord().getSimpleField(NAME_KEY), NAME_KEY);
Assert.assertEquals(readRecord.getRecord().getListField(NAME_KEY), LIST_FIELD);
Assert.assertEquals(readRecord.getRecord().getMapField(NAME_KEY), MAP_FIELD);
_bucketDataAccessor.compressedBucketDelete(PATH);
_bucketDataAccessor.compressedBucketDelete(path);
}

/**
Expand Down Expand Up @@ -194,29 +196,35 @@ public void testLargeWriteAndRead() throws IOException {
* Test to ensure bucket GC still occurs in high frequency write scenarios.
*/
@Test(dependsOnMethods = "testLargeWriteAndRead")
public void testGCScheduler() throws IOException, InterruptedException {
public void testGCCompletesUnderHighFrequency() throws Exception {
String path = PATH + "_" + TestHelper.getTestMethodName();
long gcTTL = 1000; // GC schedule for 1 second after write
ZkBucketDataAccessor fastGCBucketDataAccessor = new ZkBucketDataAccessor(_zkClient, 50 * 1024, gcTTL);

int writeCount = 10;
for (int i = 0; i < writeCount; i++) {
Thread.sleep(gcTTL / 2);
Assert.assertTrue(fastGCBucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record)));
}
List<String> children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT);
// remove from list if name cant be parsed into long (aka not a version count node)
children.removeIf(name -> {
try {
Long.parseLong(name);
return false;
} catch (NumberFormatException e) {
return true;
AtomicInteger writeCount = new AtomicInteger(0);
// Below verifier continuously writes to the same path and then checks if the # of children is less than the # of
// times we have written. This will only be true once the GC has cleaned up old versions, which will occur once the
// GC time for the first write has passed.
Assert.assertTrue(TestHelper.verify(() -> {
Assert.assertTrue(fastGCBucketDataAccessor.compressedBucketWrite(path, new HelixProperty(record)));
Thread.sleep(gcTTL/4);
List<String> children = _zkBaseDataAccessor.getChildNames(path, AccessOption.PERSISTENT);
// remove from list if name cant be parsed into long (aka not a version count node)
children.removeIf(name -> {
try {
Long.parseLong(name);
return false;
} catch (NumberFormatException e) {
return true;
}
});
boolean result = children.size() < writeCount.incrementAndGet();
if (!result) {
System.out.println("Expecting stale versions to have been cleaned up. Write count was: " + writeCount +
", children were: " + children);
}
});

Assert.assertTrue(children.size() < writeCount,
"Expecting stale versions to cleaned up. Children were: " + children);
System.out.print("Children after GC: " + children);
return result;
}, TestHelper.WAIT_DURATION));
}

private HelixProperty createLargeHelixProperty(String name, int numEntries) {
Expand Down

0 comments on commit f0b33ce

Please sign in to comment.