diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java index 5dda56c6d5..7d60c99f9b 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java @@ -28,6 +28,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.helix.AccessOption; import org.apache.helix.BaseDataAccessor; import org.apache.helix.BucketDataAccessor; @@ -157,12 +158,13 @@ public void testMultipleWrites() throws Exception { */ @Test(dependsOnMethods = "testMultipleWrites") public void testCompressedBucketRead() throws IOException { - _bucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record)); - HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(PATH, HelixProperty.class); + String path = PATH + "_" + TestHelper.getTestMethodName(); + _bucketDataAccessor.compressedBucketWrite(path, new HelixProperty(record)); + HelixProperty readRecord = _bucketDataAccessor.compressedBucketRead(path, HelixProperty.class); Assert.assertEquals(readRecord.getRecord().getSimpleField(NAME_KEY), NAME_KEY); Assert.assertEquals(readRecord.getRecord().getListField(NAME_KEY), LIST_FIELD); Assert.assertEquals(readRecord.getRecord().getMapField(NAME_KEY), MAP_FIELD); - _bucketDataAccessor.compressedBucketDelete(PATH); + _bucketDataAccessor.compressedBucketDelete(path); } /** @@ -194,29 +196,35 @@ public void testLargeWriteAndRead() throws IOException { * Test to ensure bucket GC still occurs in high frequency write scenarios. */ @Test(dependsOnMethods = "testLargeWriteAndRead") - public void testGCScheduler() throws IOException, InterruptedException { + public void testGCCompletesUnderHighFrequency() throws Exception { + String path = PATH + "_" + TestHelper.getTestMethodName(); long gcTTL = 1000; // GC schedule for 1 second after write ZkBucketDataAccessor fastGCBucketDataAccessor = new ZkBucketDataAccessor(_zkClient, 50 * 1024, gcTTL); - int writeCount = 10; - for (int i = 0; i < writeCount; i++) { - Thread.sleep(gcTTL / 2); - Assert.assertTrue(fastGCBucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record))); - } - List children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT); - // remove from list if name cant be parsed into long (aka not a version count node) - children.removeIf(name -> { - try { - Long.parseLong(name); - return false; - } catch (NumberFormatException e) { - return true; + AtomicInteger writeCount = new AtomicInteger(0); + // Below verifier continuously writes to the same path and then checks if the # of children is less than the # of + // times we have written. This will only be true once the GC has cleaned up old versions, which will occur once the + // GC time for the first write has passed. + Assert.assertTrue(TestHelper.verify(() -> { + Assert.assertTrue(fastGCBucketDataAccessor.compressedBucketWrite(path, new HelixProperty(record))); + Thread.sleep(gcTTL/4); + List children = _zkBaseDataAccessor.getChildNames(path, AccessOption.PERSISTENT); + // remove from list if name cant be parsed into long (aka not a version count node) + children.removeIf(name -> { + try { + Long.parseLong(name); + return false; + } catch (NumberFormatException e) { + return true; + } + }); + boolean result = children.size() < writeCount.incrementAndGet(); + if (!result) { + System.out.println("Expecting stale versions to have been cleaned up. Write count was: " + writeCount + + ", children were: " + children); } - }); - - Assert.assertTrue(children.size() < writeCount, - "Expecting stale versions to cleaned up. Children were: " + children); - System.out.print("Children after GC: " + children); + return result; + }, TestHelper.WAIT_DURATION)); } private HelixProperty createLargeHelixProperty(String name, int numEntries) {