Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
GrantPSpencer committed Aug 6, 2024
1 parent e16fb13 commit d72bbea
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,9 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
Expand Down Expand Up @@ -81,7 +79,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
private final ZkSerializer _zkSerializer;
private final RealmAwareZkClient _zkClient;
private final ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
private final Map<String, Queue<ScheduledFuture>> _gcTaskFutureQueueMap = new HashMap<>();
private final Map<String, ScheduledFuture> _gcTaskFutureMap = new ConcurrentHashMap<>();
private boolean _usesExternalZkClient = false;

/**
Expand Down Expand Up @@ -241,7 +239,7 @@ public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath,
}

// 5. Update the timer for GC
scheduleStaleVersionGC(rootPath, version);
scheduleStaleVersionGC(rootPath);
return true;
}

Expand All @@ -257,7 +255,7 @@ public void compressedBucketDelete(String path) {
throw new HelixException(String.format("Failed to delete the bucket data! Path: %s", path));
}
synchronized (this) {
_gcTaskFutureQueueMap.remove(path);
_gcTaskFutureMap.remove(path);
}
}

Expand All @@ -270,13 +268,7 @@ public void disconnect() {

private HelixProperty compressedBucketRead(String path) {
// 1. Get the version to read
byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY,
null, AccessOption.PERSISTENT);
if (binaryVersionToRead == null) {
throw new ZkNoNodeException(
String.format("Last successful write ZNode does not exist for path: %s", path));
}
String versionToRead = new String(binaryVersionToRead);
String versionToRead = getLastSuccessfulWriteVersion(path);

// 2. Get the metadata map
byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead + "/" + METADATA_KEY,
Expand Down Expand Up @@ -356,36 +348,38 @@ public void finalize() {
close();
}

private synchronized void scheduleStaleVersionGC(String rootPath, long currentVersion) {
// Create empty queue for new path
if (!_gcTaskFutureQueueMap.containsKey(rootPath)) {
_gcTaskFutureQueueMap.put(rootPath, new LinkedList<>());
private synchronized void scheduleStaleVersionGC(String rootPath) {
// If GC already scheduled, return early
if (_gcTaskFutureMap.containsKey(rootPath)) {
return;
}
// Schedule GC task
_gcTaskFutureQueueMap.get(rootPath).add(GC_THREAD.schedule(() -> {
_gcTaskFutureMap.put(rootPath, GC_THREAD.schedule(() -> {
try {
deleteStaleVersions(rootPath, currentVersion);
_gcTaskFutureMap.remove(rootPath);
deleteStaleVersions(rootPath);
} catch (Exception ex) {
LOG.error("Failed to delete the stale versions.", ex);
}
}, _versionTTLms, TimeUnit.MILLISECONDS)
);
}
}, _versionTTLms, TimeUnit.MILLISECONDS));
}

/**
* Deletes all stale versions.
* @param rootPath
* @param currentVersion
*/
private void deleteStaleVersions(String rootPath, long currentVersion) {
private void deleteStaleVersions(String rootPath) {
// Get most recent write version
String currentVersionStr = getLastSuccessfulWriteVersion(rootPath);

// Get all children names under path
List<String> children = _zkBaseDataAccessor.getChildNames(rootPath, AccessOption.PERSISTENT);
if (children == null || children.isEmpty()) {
// The whole path has been deleted so return immediately
return;
}
List<String> pathsToDelete =
getPathsToDelete(rootPath, filterChildrenNames(children, currentVersion));
getPathsToDelete(rootPath, filterChildrenNames(children, Long.parseLong(currentVersionStr)));
for (String pathToDelete : pathsToDelete) {
// TODO: Should be batch delete but it doesn't work. It's okay since this runs async
_zkBaseDataAccessor.remove(pathToDelete, AccessOption.PERSISTENT);
Expand Down Expand Up @@ -433,4 +427,14 @@ private List<String> getPathsToDelete(String path, List<String> staleVersions) {
staleVersions.forEach(ver -> pathsToDelete.add(path + "/" + ver));
return pathsToDelete;
}

private String getLastSuccessfulWriteVersion(String path) {
byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY,
null, AccessOption.PERSISTENT);
if (binaryVersionToRead == null) {
throw new ZkNoNodeException(
String.format("Last successful write ZNode does not exist for path: %s", path));
}
return new String(binaryVersionToRead);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public void testGCScheduler() throws IOException, InterruptedException {

Assert.assertTrue(children.size() < writeCount,
"Expecting stale versions to cleaned up. Children were: " + children);
System.out.print("Children after GC: " + children);
}

private HelixProperty createLargeHelixProperty(String name, int numEntries) {
Expand Down

0 comments on commit d72bbea

Please sign in to comment.