Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update minion task metadata ZNode path #8959

Merged
merged 7 commits into from
Jun 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import org.apache.helix.ZNRecord;
import org.apache.helix.model.ExternalView;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.pinot.broker.util.FakePropertyStore;
import org.apache.pinot.common.lineage.LineageEntry;
import org.apache.pinot.common.lineage.LineageEntryState;
import org.apache.pinot.common.lineage.SegmentLineage;
import org.apache.pinot.common.lineage.SegmentLineageAccessHelper;
import org.apache.pinot.common.lineage.SegmentLineageUtils;
import org.apache.pinot.common.utils.helix.FakePropertyStore;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,7 @@ private ZKMetadataProvider() {
private static final String PROPERTYSTORE_MINION_TASK_METADATA_PREFIX = "/MINION_TASK_METADATA";

public static void setUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String username, ZNRecord znRecord) {
propertyStore
.set(constructPropertyStorePathForUserConfig(username), znRecord, AccessOption.PERSISTENT);
propertyStore.set(constructPropertyStorePathForUserConfig(username), znRecord, AccessOption.PERSISTENT);
}

public static void setRealtimeTableConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String realtimeTableName,
Expand Down Expand Up @@ -130,7 +129,13 @@ public static String constructPropertyStorePathForSegmentLineage(String tableNam
return StringUtil.join("/", PROPERTYSTORE_SEGMENT_LINEAGE, tableNameWithType);
}

public static String constructPropertyStorePathForMinionTaskMetadata(String taskType, String tableNameWithType) {
public static String constructPropertyStorePathForMinionTaskMetadata(String tableNameWithType, String taskType) {
return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, tableNameWithType, taskType);
}

@Deprecated
public static String constructPropertyStorePathForMinionTaskMetadataDeprecated(String taskType,
String tableNameWithType) {
return StringUtil.join("/", PROPERTYSTORE_MINION_TASK_METADATA_PREFIX, taskType, tableNameWithType);
}

Expand All @@ -156,8 +161,7 @@ public static void removeResourceConfigFromPropertyStore(ZkHelixPropertyStore<ZN
}
}

public static void removeUserConfigFromPropertyStore(ZkHelixPropertyStore<ZNRecord> propertyStore,
String username) {
public static void removeUserConfigFromPropertyStore(ZkHelixPropertyStore<ZNRecord> propertyStore, String username) {
String propertyStorePath = constructPropertyStorePathForUserConfig(username);
if (propertyStore.exists(propertyStorePath, AccessOption.PERSISTENT)) {
propertyStore.remove(propertyStorePath, AccessOption.PERSISTENT);
Expand Down Expand Up @@ -224,8 +228,8 @@ public static SegmentZKMetadata getSegmentZKMetadata(ZkHelixPropertyStore<ZNReco

@Nullable
public static UserConfig getUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore, String username) {
ZNRecord znRecord = propertyStore
.get(constructPropertyStorePathForUserConfig(username), null, AccessOption.PERSISTENT);
ZNRecord znRecord =
propertyStore.get(constructPropertyStorePathForUserConfig(username), null, AccessOption.PERSISTENT);
if (znRecord == null) {
return null;
}
Expand All @@ -240,14 +244,13 @@ public static UserConfig getUserConfig(ZkHelixPropertyStore<ZNRecord> propertySt

@Nullable
public static List<UserConfig> getAllUserConfig(ZkHelixPropertyStore<ZNRecord> propertyStore) {
List<ZNRecord> znRecordss = propertyStore
.getChildren(PROPERTYSTORE_USER_CONFIGS_PREFIX, null, AccessOption.PERSISTENT);
List<ZNRecord> znRecordss =
propertyStore.getChildren(PROPERTYSTORE_USER_CONFIGS_PREFIX, null, AccessOption.PERSISTENT);

try {
return Optional.ofNullable(znRecordss)
.orElseGet(() -> {
return new ArrayList<>();
}).stream().map(AccessControlUserConfigUtils::fromZNRecord).collect(Collectors.toList());
return Optional.ofNullable(znRecordss).orElseGet(() -> {
return new ArrayList<>();
}).stream().map(AccessControlUserConfigUtils::fromZNRecord).collect(Collectors.toList());
} catch (Exception e) {
LOGGER.error("Caught exception while getting user list configuration", e);
return null;
Expand All @@ -256,8 +259,7 @@ public static List<UserConfig> getAllUserConfig(ZkHelixPropertyStore<ZNRecord> p

@Nullable
public static List<String> getAllUserName(ZkHelixPropertyStore<ZNRecord> propertyStore) {
return propertyStore
.getChildNames(PROPERTYSTORE_USER_CONFIGS_PREFIX, AccessOption.PERSISTENT);
return propertyStore.getChildNames(PROPERTYSTORE_USER_CONFIGS_PREFIX, AccessOption.PERSISTENT);
}

@Nullable
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
* Base abstract class for minion task metadata.
*
* This metadata gets serialized and stored in zookeeper under the path:
* MINION_TASK_METADATA/${taskName}/${tableNameWithType}
* MINION_TASK_METADATA/${tableNameWithType}/${taskName}
*/
public abstract class BaseTaskMetadata {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* The <code>watermarkMap</code> denotes the time (exclusive) upto which tasks have been executed for the bucket
* granularity.
*
* This gets serialized and stored in zookeeper under the path MINION_TASK_METADATA/MergeRollupTask/tableNameWithType
* This gets serialized and stored in zookeeper under the path MINION_TASK_METADATA/${tableNameWithType}/MergeRollupTask
*/
public class MergeRollupTaskMetadata extends BaseTaskMetadata {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,24 @@ private MinionTaskMetadataUtils() {
}

/**
* Fetches the ZNRecord for the given minion task and tableName, from MINION_TASK_METADATA/taskName/tableNameWthType
* Fetches the ZNRecord for the given minion task and tableName. Fetch from the new path
* MINION_TASK_METADATA/${tableNameWthType}/{taskType} if it exists; otherwise, fetch from the old path
* MINION_TASK_METADATA/${taskType}/${tableNameWthType}.
*/
@Nullable
public static ZNRecord fetchTaskMetadata(HelixPropertyStore<ZNRecord> propertyStore, String taskType,
String tableNameWithType) {
String path = ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType, tableNameWithType);
String newPath = ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(tableNameWithType, taskType);
if (propertyStore.exists(newPath, AccessOption.PERSISTENT)) {
return fetchTaskMetadata(propertyStore, newPath);
} else {
return fetchTaskMetadata(propertyStore,
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadataDeprecated(taskType, tableNameWithType));
}
}

@Nullable
private static ZNRecord fetchTaskMetadata(HelixPropertyStore<ZNRecord> propertyStore, String path) {
Stat stat = new Stat();
ZNRecord znRecord = propertyStore.get(path, stat, AccessOption.PERSISTENT);
if (znRecord != null) {
Expand All @@ -51,29 +63,49 @@ public static ZNRecord fetchTaskMetadata(HelixPropertyStore<ZNRecord> propertySt
}

/**
* Deletes the ZNRecord for the given minion task and tableName, from MINION_TASK_METADATA/taskName/tableNameWthType
* Deletes the ZNRecord for the given minion task and tableName, from both the new path
* MINION_TASK_METADATA/${tableNameWthType}/${taskType} and the old path
* MINION_TASK_METADATA/${taskType}/${tableNameWthType}.
*/
public static void deleteTaskMetadata(HelixPropertyStore<ZNRecord> propertyStore, String taskType,
String tableNameWithType) {
String path = ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType, tableNameWithType);
if (!propertyStore.remove(path, AccessOption.PERSISTENT)) {
String newPath = ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(tableNameWithType, taskType);
String oldPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadataDeprecated(taskType, tableNameWithType);
boolean newPathDeleted = propertyStore.remove(newPath, AccessOption.PERSISTENT);
boolean oldPathDeleted = propertyStore.remove(oldPath, AccessOption.PERSISTENT);
if (!newPathDeleted || !oldPathDeleted) {
throw new ZkException("Failed to delete task metadata: " + taskType + ", " + tableNameWithType);
}
}

/**
* Generic method for persisting {@link BaseTaskMetadata} to MINION_TASK_METADATA. The metadata will be saved in the
* ZNode under the path: /MINION_TASK_METADATA/${taskType}/${tableNameWithType}
* Generic method for persisting {@link BaseTaskMetadata} to MINION_TASK_METADATA. The metadata will
* be saved in the ZNode under the new path /MINION_TASK_METADATA/${tableNameWithType}/${taskType} if
* the old path already exists; otherwise, it will be saved in the ZNode under the old path
* /MINION_TASK_METADATA/${taskType}/${tableNameWithType}.
*
* Will fail if expectedVersion does not match.
* Set expectedVersion -1 to override version check.
*/
public static void persistTaskMetadata(HelixPropertyStore<ZNRecord> propertyStore, String taskType,
BaseTaskMetadata taskMetadata, int expectedVersion) {
String path = ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskType,
String newPath =
ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadata(taskMetadata.getTableNameWithType(),
taskType);
String oldPath = ZKMetadataProvider.constructPropertyStorePathForMinionTaskMetadataDeprecated(taskType,
taskMetadata.getTableNameWithType());
if (!propertyStore
.set(path, taskMetadata.toZNRecord(), expectedVersion, AccessOption.PERSISTENT)) {
if (propertyStore.exists(newPath, AccessOption.PERSISTENT) || !propertyStore.exists(oldPath,
AccessOption.PERSISTENT)) {
persistTaskMetadata(newPath, propertyStore, taskType, taskMetadata, expectedVersion);
} else {
persistTaskMetadata(oldPath, propertyStore, taskType, taskMetadata, expectedVersion);
}
}

private static void persistTaskMetadata(String path, HelixPropertyStore<ZNRecord> propertyStore, String taskType,
BaseTaskMetadata taskMetadata, int expectedVersion) {
if (!propertyStore.set(path, taskMetadata.toZNRecord(), expectedVersion, AccessOption.PERSISTENT)) {
throw new ZkException(
"Failed to persist minion metadata for task: " + taskType + " and metadata: " + taskMetadata);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* The <code>watermarkMs</code> denotes the time (exclusive) upto which tasks have been executed.
*
* This gets serialized and stored in zookeeper under the path
* MINION_TASK_METADATA/RealtimeToOfflineSegmentsTask/tableNameWithType
* MINION_TASK_METADATA/${tableNameWithType}/RealtimeToOfflineSegmentsTask
*
* PinotTaskGenerator:
* The <code>watermarkMs</code>> is used by the <code>RealtimeToOfflineSegmentsTaskGenerator</code>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.broker.util;
package org.apache.pinot.common.utils.helix;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -40,6 +40,11 @@ public ZNRecord get(String path, Stat stat, int options) {
return _contents.get(path);
}

@Override
public boolean exists(String path, int options) {
return _contents.containsKey(path);
}

@Override
public void subscribeDataChanges(String path, IZkDataListener listener) {
_listener = listener;
Expand All @@ -65,6 +70,12 @@ public boolean set(String path, ZNRecord stat, int options) {
}
}

@Override
public boolean remove(String path, int options) {
_contents.remove(path);
return true;
}

public void setContents(String path, ZNRecord contents)
throws Exception {
_contents.put(path, contents);
Expand Down
Loading