Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TTL support for compressedBucketWrite fn #2930

Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,16 @@ public boolean update(String path, DataUpdater<T> updater, int options) {
* sync update
*/
public AccessResult doUpdate(String path, DataUpdater<T> updater, int options) {
return doUpdate(path, updater, options, ZkClient.TTL_NOT_SET);
}

/**
* sync update with ttl
*
* ttl is only used when creating new znode, hence if znode is already created with a ttl, further
* update operations will not update the znode ttl even if ttl is provided in the options
*/
AccessResult doUpdate(String path, DataUpdater<T> updater, int options, long ttl) {
AccessResult result = new AccessResult();
CreateMode mode = AccessOption.getMode(options);
if (mode == null) {
Expand Down Expand Up @@ -452,7 +462,7 @@ public AccessResult doUpdate(String path, DataUpdater<T> updater, int options) {
T newData = updater.update(null);
RetCode rc;
if (newData != null) {
AccessResult res = doCreate(path, newData, options);
AccessResult res = doCreate(path, newData, options, ttl);
result._pathCreated.addAll(res._pathCreated);
rc = res._retCode;
} else {
Expand Down Expand Up @@ -979,11 +989,29 @@ public boolean[] setChildren(List<String> paths, List<T> records, int options) {
return set(paths, records, null, null, options);
}

/**
* async setChildren with TTL
*/
public boolean[] setChildren(List<String> paths, List<T> records, int options, long ttl) {
return set(paths, records, null, null, options, ttl);
}

/**
* async set, give up on error other than NoNode
*/
boolean[] set(List<String> paths, List<T> records, List<List<String>> pathsCreated,
List<Stat> stats, int options) {
return set(paths, records, pathsCreated, stats, options, ZkClient.TTL_NOT_SET);
}

/**
* async set with ttl, give up on error other than NoNode
*
* ttl is only used when creating new znode, hence if znode is already created with a ttl, further
* set operations will not update the znode ttl even if ttl is provided in the options
*/
boolean[] set(List<String> paths, List<T> records, List<List<String>> pathsCreated,
List<Stat> stats, int options, long ttl) {
if (paths == null || paths.size() == 0) {
return new boolean[0];
}
Expand Down Expand Up @@ -1051,7 +1079,7 @@ boolean[] set(List<String> paths, List<T> records, List<List<String>> pathsCreat
// if failOnNoNode, try create
if (failOnNoNode) {
boolean[] needCreate = Arrays.copyOf(needSet, needSet.length);
createCbList = create(paths, records, needCreate, pathsCreated, options);
createCbList = create(paths, records, needCreate, pathsCreated, options, ttl);
for (int i = 0; i < createCbList.length; i++) {
ZkAsyncCallbacks.CreateCallbackHandler createCb = createCbList[i];
if (createCb == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.util.GZipCompressionUtil;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.ZkClient;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
Expand Down Expand Up @@ -76,6 +77,8 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {

private final int _bucketSize;
private final long _versionTTLms;
private long _znodeTTLms;
private int _accessOption;
private final ZkSerializer _zkSerializer;
private final RealmAwareZkClient _zkClient;
private final ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
Expand All @@ -89,25 +92,31 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
* @param versionTTLms in ms
*/
public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTLms) {
this(createRealmAwareZkClient(zkAddr), bucketSize, versionTTLms, false);
this(createRealmAwareZkClient(zkAddr), bucketSize, versionTTLms, false, ZkClient.TTL_NOT_SET);
}

public ZkBucketDataAccessor(RealmAwareZkClient zkClient) {
this(zkClient, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL, true);
this(zkClient, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL, true, ZkClient.TTL_NOT_SET);
}

public ZkBucketDataAccessor(RealmAwareZkClient zkClient, int bucketSize, long versionTTLms) {
this(zkClient, bucketSize, versionTTLms, true);
this(zkClient, bucketSize, versionTTLms, true, ZkClient.TTL_NOT_SET);
}

public ZkBucketDataAccessor(RealmAwareZkClient zkClient, int bucketSize, long versionTTLms, long znodeTTLms) {
this(zkClient, bucketSize, versionTTLms, true, znodeTTLms);
}

private ZkBucketDataAccessor(RealmAwareZkClient zkClient, int bucketSize, long versionTTLms,
boolean usesExternalZkClient) {
boolean usesExternalZkClient, long znodeTTLms) {
_zkClient = zkClient;
_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
_zkSerializer = new ZNRecordJacksonSerializer();
_bucketSize = bucketSize;
_versionTTLms = versionTTLms;
_usesExternalZkClient = usesExternalZkClient;
_znodeTTLms = znodeTTLms;
_accessOption = getAccessOption(znodeTTLms);
}

/**
Expand Down Expand Up @@ -156,10 +165,9 @@ public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath,
lastWriteVersion++;
return String.valueOf(lastWriteVersion).getBytes();
};

// 1. Increment lastWriteVersion using DataUpdater
ZkBaseDataAccessor.AccessResult result = _zkBaseDataAccessor.doUpdate(
rootPath + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, AccessOption.PERSISTENT);
Copy link
Author

@jacoblukose jacoblukose Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note(myself): based on how pr discussion concludes, we might need to use explicit create for first time and then update calls from second time onwards.

rootPath + "/" + LAST_WRITE_KEY, lastWriteVersionUpdater, _accessOption, _znodeTTLms);
if (result._retCode != ZkBaseDataAccessor.RetCode.OK) {
throw new HelixException(
String.format("Failed to write the write version at path: %s!", rootPath));
Expand Down Expand Up @@ -206,7 +214,7 @@ public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath,
buckets.add(binaryMetadata);

// Do an async set to ZK
boolean[] success = _zkBaseDataAccessor.setChildren(paths, buckets, AccessOption.PERSISTENT);
boolean[] success = _zkBaseDataAccessor.setChildren(paths, buckets, _accessOption, _znodeTTLms);
// Exception and fail the write if any failed
for (boolean s : success) {
if (!s) {
Expand All @@ -233,7 +241,7 @@ public <T extends HelixProperty> boolean compressedBucketWrite(String rootPath,
}
};
if (!_zkBaseDataAccessor.update(rootPath + "/" + LAST_SUCCESSFUL_WRITE_KEY,
Copy link
Author

@jacoblukose jacoblukose Nov 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note(myself): I think this update() calls also need to be changed to either create if first time and then update for later updates. Or move to doUpdate fn with ttl support, based on how the PR discussion concludes.

lastSuccessfulWriteVersionUpdater, AccessOption.PERSISTENT)) {
lastSuccessfulWriteVersionUpdater, _accessOption)) {
throw new HelixException(String
.format("Failed to write the last successful write metadata at path: %s!", rootPath));
}
Expand All @@ -251,7 +259,7 @@ public <T extends HelixProperty> HelixProperty compressedBucketRead(String path,

@Override
public void compressedBucketDelete(String path) {
if (!_zkBaseDataAccessor.remove(path, AccessOption.PERSISTENT)) {
if (!_zkBaseDataAccessor.remove(path, _accessOption)) {
throw new HelixException(String.format("Failed to delete the bucket data! Path: %s", path));
}
synchronized (this) {
Expand All @@ -272,7 +280,7 @@ private HelixProperty compressedBucketRead(String path) {

// 2. Get the metadata map
byte[] binaryMetadata = _zkBaseDataAccessor.get(path + "/" + versionToRead + "/" + METADATA_KEY,
null, AccessOption.PERSISTENT);
null, _accessOption);
if (binaryMetadata == null) {
throw new ZkNoNodeException(
String.format("Metadata ZNode does not exist for path: %s", path));
Expand Down Expand Up @@ -309,7 +317,7 @@ private HelixProperty compressedBucketRead(String path) {
}

// Async get
List<byte[]> buckets = _zkBaseDataAccessor.get(paths, null, AccessOption.PERSISTENT, true);
List<byte[]> buckets = _zkBaseDataAccessor.get(paths, null, _accessOption, true);

// Combine buckets into one byte array
int copyPtr = 0;
Expand Down Expand Up @@ -348,6 +356,14 @@ public void finalize() {
close();
}

static int getAccessOption(long znodeTTLms) {
if(znodeTTLms > 0) {
return AccessOption.PERSISTENT_WITH_TTL;
} else {
return AccessOption.PERSISTENT;
}
}

private synchronized void scheduleStaleVersionGC(String rootPath) {
// If GC already scheduled, return early
if (_gcTaskFutureMap.containsKey(rootPath)) {
Expand All @@ -373,7 +389,7 @@ private void deleteStaleVersions(String rootPath) {
String currentVersionStr = getLastSuccessfulWriteVersion(rootPath);

// Get all children names under path
List<String> children = _zkBaseDataAccessor.getChildNames(rootPath, AccessOption.PERSISTENT);
List<String> children = _zkBaseDataAccessor.getChildNames(rootPath, _accessOption);
if (children == null || children.isEmpty()) {
// The whole path has been deleted so return immediately
return;
Expand All @@ -382,7 +398,7 @@ private void deleteStaleVersions(String rootPath) {
getPathsToDelete(rootPath, filterChildrenNames(children, Long.parseLong(currentVersionStr)));
for (String pathToDelete : pathsToDelete) {
// TODO: Should be batch delete but it doesn't work. It's okay since this runs async
_zkBaseDataAccessor.remove(pathToDelete, AccessOption.PERSISTENT);
_zkBaseDataAccessor.remove(pathToDelete, _accessOption);
}
}

Expand Down Expand Up @@ -430,7 +446,7 @@ private List<String> getPathsToDelete(String path, List<String> staleVersions) {

private String getLastSuccessfulWriteVersion(String path) {
byte[] binaryVersionToRead = _zkBaseDataAccessor.get(path + "/" + LAST_SUCCESSFUL_WRITE_KEY,
null, AccessOption.PERSISTENT);
null, _accessOption);
if (binaryVersionToRead == null) {
throw new ZkNoNodeException(
String.format("Last successful write ZNode does not exist for path: %s", path));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,52 @@ record = new ZNRecord("msg_1");
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

/**
* Test to ensure async setChildren fn with ttl configured works as expected. Test verifies general setChildren functionality
* and also verifies if the znodes created are configured with correct TTL.
*/
@Test
public void testAsyncSetChildrenWithTTL() {
// Step 1: Enable extended types in Zookeeper for TTL support
System.setProperty("zookeeper.extendedTypesEnabled", "true");
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String testName = className + "_" + methodName;

System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));

String root = _rootPath;
long ttl = 4000L;
List<ZNRecord> records = new ArrayList<>();
List<String> paths = new ArrayList<>();
ZkBaseDataAccessor<ZNRecord> accessor = Mockito.spy(new ZkBaseDataAccessor<ZNRecord>(_gZkClient));

// Step 2: Create 10 ZNRecord objects and corresponding paths
for (int i = 0; i < 10; i++) {
String msgId = "msg_" + i;
// Example path: /TestZkBaseDataAccessor/INSTANCES/host_1/MESSAGES/msg_id
paths.add(PropertyPathBuilder.instanceMessage(root, "host_1", msgId));
ZNRecord newRecord = new ZNRecord(msgId);
newRecord.setSimpleField("key1", "value1");
records.add(newRecord);
}

// Step 3: Set the 10 ZNRecord objects with TTL
boolean[] success = accessor.setChildren(paths, records, AccessOption.PERSISTENT_WITH_TTL, ttl);
for (int i = 0; i < 10; i++) {
String msgId = "msg_" + i;
Assert.assertTrue(success[i], "Should succeed in set " + msgId);
}

// Step 4: Verify if all 5 subpaths to be created are configured with TTL
Mockito.verify(accessor, Mockito.times(5)).create(Mockito.any(),
Mockito.any(), Mockito.any(), Mockito.any(), Mockito.eq(AccessOption.PERSISTENT_WITH_TTL), Mockito.eq(ttl));

// Step 5: Clear the extended types property
System.clearProperty("zookeeper.extendedTypesEnabled");
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

@Test
public void testSyncDoSet() {
String className = TestHelper.getTestClassName();
Expand Down Expand Up @@ -420,6 +466,58 @@ public ZNRecord update(ZNRecord currentData) {
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

/**
* Test to ensure sync doUpdate fn with ttl configured works as expected. Test verifies general doUpdate functionality
* and also verifies if znode created are configured with correct TTL.
*/
@Test
public void testSyncDoUpdateWithTTL() {
// Step 1: Enable extended types in Zookeeper for TTL support
System.setProperty("zookeeper.extendedTypesEnabled", "true");
String className = TestHelper.getTestClassName();
String methodName = TestHelper.getTestMethodName();
String testName = className + "_" + methodName;

System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));

long ttl = 4000L;
String path = String.format("/%s/%s", _rootPath, "msg_0");
ZNRecord record = new ZNRecord("msg_0");
ZkBaseDataAccessor<ZNRecord> accessor = Mockito.spy(new ZkBaseDataAccessor<ZNRecord>(_gZkClient));

// Step 2: Attempt to update without TTL (should fail)
AccessResult result = accessor.doUpdate(path, new ZNRecordUpdater(record), AccessOption.PERSISTENT_WITH_TTL);
// Fails as ttl is not provided when AccessOption.PERSISTENT_WITH_TTL is used
Assert.assertEquals(result._retCode, RetCode.ERROR);

// Step 3: Update with TTL
result = accessor.doUpdate(path, new ZNRecordUpdater(record), AccessOption.PERSISTENT_WITH_TTL, ttl);
Assert.assertEquals(result._retCode, RetCode.OK);
ZNRecord getRecord = _gZkClient.readData(path);
Assert.assertNotNull(getRecord);
Assert.assertEquals(getRecord.getId(), "msg_0");
// Step 4: Verify if the znode created is configured with TTL
Mockito.verify(accessor, Mockito.times(1)).doCreate(Mockito.anyString(), Mockito.any(ZNRecord.class), Mockito.eq(AccessOption.PERSISTENT_WITH_TTL), Mockito.eq(ttl));

Mockito.reset(accessor);

// Step 5: Update the record with a simple field and verify
record.setSimpleField("key0", "value0");
result = accessor.doUpdate(path, new ZNRecordUpdater(record), AccessOption.PERSISTENT_WITH_TTL, ttl);
Assert.assertEquals(result._retCode, RetCode.OK);
getRecord = _gZkClient.readData(path);
Assert.assertNotNull(getRecord);
Assert.assertEquals(getRecord.getSimpleFields().size(), 1);
Assert.assertNotNull(getRecord.getSimpleField("key0"));
Assert.assertEquals(getRecord.getSimpleField("key0"), "value0");
// Step 6: Verify if the znode created is configured with TTL
Mockito.verify(accessor, Mockito.times(0)).doCreate(Mockito.anyString(), Mockito.any(ZNRecord.class), Mockito.eq(AccessOption.PERSISTENT_WITH_TTL), Mockito.eq(ttl));

// Step 7: Clear the extended types property
System.clearProperty("zookeeper.extendedTypesEnabled");
System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
}

@Test
public void testSyncMultiSet() {
String className = TestHelper.getTestClassName();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -219,6 +220,27 @@ public void testGCScheduler() throws IOException, InterruptedException {
System.out.print("Children after GC: " + children);
}

/**
* Test to ensure that the correct AccessOption is returned based on the znodeTTLms value.
*/
@Test
public void testGetAccessOption() {
long ttl = 1000L; // Example of a positive TTL
int result = ZkBucketDataAccessor.getAccessOption(ttl);
Assert.assertEquals(AccessOption.PERSISTENT_WITH_TTL, result,
"Expected PERSISTENT_WITH_TTL for positive znodeTTLms");

ttl = 0L; // Example of a zero TTL
result = ZkBucketDataAccessor.getAccessOption(ttl);
Assert.assertEquals(AccessOption.PERSISTENT, result,
"Expected PERSISTENT for zero znodeTTLms");

ttl = -100L; // Example of a negative TTL
result = ZkBucketDataAccessor.getAccessOption(ttl);
Assert.assertEquals(AccessOption.PERSISTENT, result,
"Expected PERSISTENT for negative znodeTTLms");
}

private HelixProperty createLargeHelixProperty(String name, int numEntries) {
HelixProperty property = new HelixProperty(name);
for (int i = 0; i < numEntries; i++) {
Expand Down