From 5113f97dfcc9b48e909cd790c11d85b972e77acf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Grant=20Pal=C3=A1u=20Spencer?= Date: Fri, 16 Aug 2024 14:35:06 -0700 Subject: [PATCH] ZkBucketDataAccessor always complete scheduled GC (#2873) ZkBucketDataAccessor always complete scheduled GC --- .../manager/zk/ZkBucketDataAccessor.java | 54 +++++++++++-------- .../manager/zk/TestZkBucketDataAccessor.java | 39 ++++++++++++-- 2 files changed, 66 insertions(+), 27 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java index a99ca99aa5..bb82dbe836 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java @@ -23,9 +23,9 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; -import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -79,7 +79,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable { private final ZkSerializer _zkSerializer; private final RealmAwareZkClient _zkClient; private final ZkBaseDataAccessor _zkBaseDataAccessor; - private final Map _gcTaskFutureMap = new HashMap<>(); + private final Map _gcTaskFutureMap = new ConcurrentHashMap<>(); private boolean _usesExternalZkClient = false; /** @@ -239,7 +239,7 @@ public boolean compressedBucketWrite(String rootPath, } // 5. Update the timer for GC - updateGCTimer(rootPath, version); + scheduleStaleVersionGC(rootPath); return true; } @@ -268,13 +268,7 @@ public void disconnect() { private HelixProperty compressedBucketRead(String path) { // 1. Get the version to read - byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY, - null, AccessOption.PERSISTENT); - if (binaryVersionToRead == null) { - throw new ZkNoNodeException( - String.format("Last successful write ZNode does not exist for path: %s", path)); - } - String versionToRead = new String(binaryVersionToRead); + String versionToRead = getLastSuccessfulWriteVersion(path); // 2. Get the metadata map byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead + "/" + METADATA_KEY, @@ -354,26 +348,30 @@ public void finalize() { close(); } - private synchronized void updateGCTimer(String rootPath, long currentVersion) { + private synchronized void scheduleStaleVersionGC(String rootPath) { + // If GC already scheduled, return early if (_gcTaskFutureMap.containsKey(rootPath)) { - _gcTaskFutureMap.remove(rootPath).cancel(false); + return; } - // Schedule the gc task with TTL + // Schedule GC task _gcTaskFutureMap.put(rootPath, GC_THREAD.schedule(() -> { - try { - deleteStaleVersions(rootPath, currentVersion); - } catch (Exception ex) { - LOG.error("Failed to delete the stale versions.", ex); - } - }, _versionTTLms, TimeUnit.MILLISECONDS)); - } + try { + _gcTaskFutureMap.remove(rootPath); + deleteStaleVersions(rootPath); + } catch (Exception ex) { + LOG.error("Failed to delete the stale versions.", ex); + } + }, _versionTTLms, TimeUnit.MILLISECONDS)); + } /** * Deletes all stale versions. * @param rootPath - * @param currentVersion */ - private void deleteStaleVersions(String rootPath, long currentVersion) { + private void deleteStaleVersions(String rootPath) { + // Get most recent write version + String currentVersionStr = getLastSuccessfulWriteVersion(rootPath); + // Get all children names under path List children = _zkBaseDataAccessor.getChildNames(rootPath, AccessOption.PERSISTENT); if (children == null || children.isEmpty()) { @@ -381,7 +379,7 @@ private void deleteStaleVersions(String rootPath, long currentVersion) { return; } List pathsToDelete = - getPathsToDelete(rootPath, filterChildrenNames(children, currentVersion)); + getPathsToDelete(rootPath, filterChildrenNames(children, Long.parseLong(currentVersionStr))); for (String pathToDelete : pathsToDelete) { // TODO: Should be batch delete but it doesn't work. It's okay since this runs async _zkBaseDataAccessor.remove(pathToDelete, AccessOption.PERSISTENT); @@ -429,4 +427,14 @@ private List getPathsToDelete(String path, List staleVersions) { staleVersions.forEach(ver -> pathsToDelete.add(path + "/" + ver)); return pathsToDelete; } + + private String getLastSuccessfulWriteVersion(String path) { + byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY, + null, AccessOption.PERSISTENT); + if (binaryVersionToRead == null) { + throw new ZkNoNodeException( + String.format("Last successful write ZNode does not exist for path: %s", path)); + } + return new String(binaryVersionToRead); + } } diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java index 8d6b83fc04..7e495e922c 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBucketDataAccessor.java @@ -58,15 +58,17 @@ public class TestZkBucketDataAccessor extends ZkTestBase { private final ZNRecord record = new ZNRecord(NAME_KEY); + private HelixZkClient _zkClient; private BucketDataAccessor _bucketDataAccessor; private BaseDataAccessor _zkBaseDataAccessor; + private BucketDataAccessor _fastGCBucketDataAccessor; @BeforeClass public void beforeClass() { // Initialize ZK accessors for testing - HelixZkClient zkClient = DedicatedZkClientFactory.getInstance() + _zkClient = DedicatedZkClientFactory.getInstance() .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR)); - zkClient.setZkSerializer(new ZkSerializer() { + _zkClient.setZkSerializer(new ZkSerializer() { @Override public byte[] serialize(Object data) throws ZkMarshallingError { if (data instanceof byte[]) { @@ -80,8 +82,8 @@ public Object deserialize(byte[] data) throws ZkMarshallingError { return data; } }); - _zkBaseDataAccessor = new ZkBaseDataAccessor<>(zkClient); - _bucketDataAccessor = new ZkBucketDataAccessor(zkClient, 50 * 1024, VERSION_TTL_MS); + _zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient); + _bucketDataAccessor = new ZkBucketDataAccessor(_zkClient, 50 * 1024, VERSION_TTL_MS); // Fill in some data for the record record.setSimpleField(NAME_KEY, NAME_KEY); @@ -193,6 +195,35 @@ public void testLargeWriteAndRead() throws IOException { Assert.assertEquals(readRecord, property); } + /** + * Test to ensure bucket GC still occurs in high frequency write scenarios. + */ + @Test(dependsOnMethods = "testLargeWriteAndRead") + public void testGCScheduler() throws IOException, InterruptedException { + long gcTTL = 1000; // GC schedule for 1 second after write + ZkBucketDataAccessor fastGCBucketDataAccessor = new ZkBucketDataAccessor(_zkClient, 50 * 1024, gcTTL); + + int writeCount = 10; + for (int i = 0; i < writeCount; i++) { + Thread.sleep(gcTTL / 2); + Assert.assertTrue(fastGCBucketDataAccessor.compressedBucketWrite(PATH, new HelixProperty(record))); + } + List children = _zkBaseDataAccessor.getChildNames(PATH, AccessOption.PERSISTENT); + // remove from list if name cant be parsed into long (aka not a version count node) + children.removeIf(name -> { + try { + Long.parseLong(name); + return false; + } catch (NumberFormatException e) { + return true; + } + }); + + Assert.assertTrue(children.size() < writeCount, + "Expecting stale versions to cleaned up. Children were: " + children); + System.out.print("Children after GC: " + children); + } + private HelixProperty createLargeHelixProperty(String name, int numEntries) { HelixProperty property = new HelixProperty(name); for (int i = 0; i < numEntries; i++) {