Skip to content

Commit

Permalink
Compress idealstate according to estimated size (#10766)
Browse files Browse the repository at this point in the history
* Compress idealstate according to estimated size

Currently we compress idealstate based on the number of segments
in a table. Added an alfgorithm to estimate the size of the idealstate
znode while deciding to enable compression.

Size threshold configurable as per zookeeper installation requirements.

* Addressed PR comments
  • Loading branch information
mcvsubbu authored May 16, 2023
1 parent 27eb960 commit 579082c
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -78,6 +79,11 @@ private HelixHelper() {

public static final String BROKER_RESOURCE = CommonConstants.Helix.BROKER_RESOURCE_INSTANCE;

private static int _minNumCharsInISToTurnOnCompression = -1;

public static synchronized void setMinNumCharsInISToTurnOnCompression(int minNumChars) {
_minNumCharsInISToTurnOnCompression = minNumChars;
}
public static IdealState cloneIdealState(IdealState idealState) {
return new IdealState(
(ZNRecord) ZN_RECORD_SERIALIZER.deserialize(ZN_RECORD_SERIALIZER.serialize(idealState.getRecord())));
Expand Down Expand Up @@ -127,7 +133,7 @@ public Boolean call() {
updatedIdealState.setNumPartitions(numPartitions);

// If the ideal state is large enough, enable compression
boolean enableCompression = numPartitions > NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION;
boolean enableCompression = shouldCompress(updatedIdealState);
if (enableCompression) {
updatedZNRecord.setBooleanField(ENABLE_COMPRESSIONS_KEY, true);
} else {
Expand Down Expand Up @@ -163,6 +169,34 @@ public Boolean call() {
return true;
}
}

private boolean shouldCompress(IdealState is) {
if (is.getNumPartitions() > NUM_PARTITIONS_THRESHOLD_TO_ENABLE_COMPRESSION) {
return true;
}

// Find the number of characters in one partition in idealstate, and extrapolate
// to estimate the number of characters.
// We could serialize the znode to determine the exact size, but that would mean serializing every
// idealstate znode twice. We avoid some extra GC by estimating the size instead. Such estimations
// should be good for most installations that have similar segment and instance names.
Iterator<String> it = is.getPartitionSet().iterator();
if (it.hasNext()) {
String partitionName = it.next();
int numChars = partitionName.length();
Map<String, String> stateMap = is.getInstanceStateMap(partitionName);
for (Map.Entry<String, String> entry : stateMap.entrySet()) {
numChars += entry.getKey().length();
numChars += entry.getValue().length();
}
numChars *= is.getNumPartitions();
if (_minNumCharsInISToTurnOnCompression > 0
&& numChars > _minNumCharsInISToTurnOnCompression) {
return true;
}
}
return false;
}
});
return idealStateWrapper._idealState;
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ public void init(PinotConfiguration pinotConfiguration)
ServiceStartableUtils.applyClusterConfig(_config, _helixZkURL, _helixClusterName, ServiceRole.CONTROLLER);

setupHelixSystemProperties();
HelixHelper.setMinNumCharsInISToTurnOnCompression(_config.getMinNumCharsInISToTurnOnCompression());
_listenerConfigs = ListenerConfigUtil.buildControllerConfigs(_config);
_controllerMode = _config.getControllerMode();
inferHostnameIfNeeded(_config);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public class ControllerConf extends PinotConfiguration {
public static final String ZK_STR = "controller.zk.str";
// boolean: Update the statemodel on boot?
public static final String UPDATE_SEGMENT_STATE_MODEL = "controller.update_segment_state_model";
public static final String MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION = "controller.min_is_size_for_compression";
public static final String HELIX_CLUSTER_NAME = "controller.helix.cluster.name";
public static final String CLUSTER_TENANT_ISOLATION_ENABLE = "cluster.tenant.isolation.enable";
public static final String CONSOLE_WEBAPP_ROOT_PATH = "controller.query.console";
Expand Down Expand Up @@ -292,6 +293,7 @@ private static long getRandomInitialDelayInSeconds() {
private static final String DEFAULT_LINEAGE_MANAGER =
"org.apache.pinot.controller.helix.core.lineage.DefaultLineageManager";
private static final long DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS = 600_000L; // 10 minutes
private static final int DEFAULT_MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION = -1;
private static final int DEFAULT_REALTIME_SEGMENT_METADATA_COMMIT_NUMLOCKS = 64;
private static final boolean DEFAULT_ENABLE_STORAGE_QUOTA_CHECK = true;
private static final boolean DEFAULT_ENABLE_BATCH_MESSAGE_MODE = false;
Expand Down Expand Up @@ -392,6 +394,10 @@ public void setUpdateSegmentStateModel(String updateStateModel) {
setProperty(UPDATE_SEGMENT_STATE_MODEL, updateStateModel);
}

public void setMinISSizeForCompression(int minSize) {
setProperty(MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION, minSize);
}

public void setZkStr(String zkStr) {
setProperty(CommonConstants.Helix.CONFIG_OF_ZOOKEEPR_SERVER, zkStr);
}
Expand Down Expand Up @@ -859,6 +865,10 @@ public long getSegmentUploadTimeoutInMillis() {
return getProperty(SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS, DEFAULT_SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS);
}

public int getMinNumCharsInISToTurnOnCompression() {
return getProperty(MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION, DEFAULT_MIN_NUM_CHARS_IN_IS_TO_TURN_ON_COMPRESSION);
}

public void setSegmentUploadTimeoutInMillis(long segmentUploadTimeoutInMillis) {
setProperty(SEGMENT_UPLOAD_TIMEOUT_IN_MILLIS, segmentUploadTimeoutInMillis);
}
Expand Down

0 comments on commit 579082c

Please sign in to comment.