Skip to content

Commit

Permalink
Adding configs for zk client timeout (#9975)
Browse files Browse the repository at this point in the history
* Adding configs for zk client timeout

* Revert "[multistage][testing] Add test for null handling, round func and some other cases (#9950)"

This reverts commit 6a5f58b.
  • Loading branch information
xiangfu0 authored Dec 13, 2022
1 parent 6a5f58b commit 7f643b1
Show file tree
Hide file tree
Showing 10 changed files with 54 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.services.ServiceRole;
import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -37,7 +38,6 @@ private ServiceStartableUtils() {
private static final String CLUSTER_CONFIG_ZK_PATH_TEMPLATE = "/%s/CONFIGS/CLUSTER/%s";
private static final String PINOT_ALL_CONFIG_KEY_PREFIX = "pinot.all.";
private static final String PINOT_INSTANCE_CONFIG_KEY_PREFIX_TEMPLATE = "pinot.%s.";
private static final int ZK_TIMEOUT_MS = 30_000;

/**
* Applies the ZK cluster config to the given instance config if it does not already exist.
Expand All @@ -47,10 +47,19 @@ private ServiceStartableUtils() {
*/
public static void applyClusterConfig(PinotConfiguration instanceConfig, String zkAddress, String clusterName,
ServiceRole serviceRole) {

ZkClient zkClient = new ZkClient.Builder().setZkSerializer(new ZNRecordSerializer()).setZkServer(zkAddress)
.setConnectionTimeout(ZK_TIMEOUT_MS).build();
zkClient.waitUntilConnected(ZK_TIMEOUT_MS, TimeUnit.MILLISECONDS);
int zkClientSessionConfig =
instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
int zkClientConnectionTimeoutMs =
instanceConfig.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
ZkClient zkClient = new ZkClient.Builder()
.setZkSerializer(new ZNRecordSerializer())
.setZkServer(zkAddress)
.setConnectionTimeout(zkClientConnectionTimeoutMs)
.setSessionTimeout(zkClientSessionConfig)
.build();
zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);

try {
ZNRecord clusterConfigZNRecord =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ private void setUpPinotController() {

// Set up Pinot cluster in Helix if needed
HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL, _isUpdateStateModel, _enableBatchMessageMode,
_config.getLeadControllerResourceRebalanceStrategy());
_config);

// Start all components
initPinotFSFactory();
Expand Down Expand Up @@ -422,7 +422,8 @@ private void setUpPinotController() {

if (_config.getHLCTablesAllowed()) {
LOGGER.info("Realtime tables with High Level consumers will be supported");
_realtimeSegmentsManager = new PinotRealtimeSegmentManager(_helixResourceManager, _leadControllerManager);
_realtimeSegmentsManager =
new PinotRealtimeSegmentManager(_helixResourceManager, _leadControllerManager, _config);
_realtimeSegmentsManager.start(_controllerMetrics);
} else {
LOGGER.info("Realtime tables with High Level consumers will NOT be supported");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.pinot.common.utils.SegmentName;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.helix.HelixHelper;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
Expand Down Expand Up @@ -81,11 +82,13 @@ public class PinotRealtimeSegmentManager implements HelixPropertyListener, IZkCh
private ZkClient _zkClient;
private ControllerMetrics _controllerMetrics;
private final LeadControllerManager _leadControllerManager;
private final ControllerConf _controllerConf;

public PinotRealtimeSegmentManager(PinotHelixResourceManager pinotManager,
LeadControllerManager leadControllerManager) {
LeadControllerManager leadControllerManager, ControllerConf controllerConf) {
_pinotHelixResourceManager = pinotManager;
_leadControllerManager = leadControllerManager;
_controllerConf = controllerConf;
String clusterName = _pinotHelixResourceManager.getHelixClusterName();
_propertyStorePath = PropertyPathBuilder.propertyStore(clusterName);
_tableConfigPath = _propertyStorePath + TABLE_CONFIG;
Expand All @@ -96,9 +99,19 @@ public void start(ControllerMetrics controllerMetrics) {

LOGGER.info("Starting realtime segments manager, adding a listener on the property store table configs path.");
String zkUrl = _pinotHelixResourceManager.getHelixZkURL();
_zkClient = new ZkClient(zkUrl, ZkClient.DEFAULT_SESSION_TIMEOUT, ZkClient.DEFAULT_CONNECTION_TIMEOUT);
int zkClientSessionTimeoutMs =
_controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
int zkClientConnectionTimeoutMs =
_controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
_zkClient = new ZkClient.Builder()
.setZkServer(zkUrl)
.setSessionTimeout(zkClientSessionTimeoutMs)
.setConnectionTimeout(zkClientConnectionTimeoutMs)
.build();
_zkClient.setZkSerializer(new ZNRecordSerializer());
_zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
_zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);

// Subscribe to any data/child changes to property
_zkClient.subscribeChildChanges(_tableConfigPath, this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.apache.pinot.common.utils.helix.LeadControllerUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator;
import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator;
import org.apache.pinot.spi.utils.CommonConstants;
Expand Down Expand Up @@ -92,11 +93,22 @@ private static void setupHelixClusterIfNeeded(String helixClusterName, String zk
}

public static void setupPinotCluster(String helixClusterName, String zkPath, boolean isUpdateStateModel,
boolean enableBatchMessageMode, String leadControllerResourceRebalanceStrategy) {
boolean enableBatchMessageMode, ControllerConf controllerConf) {
ZkClient zkClient = null;
int zkClientSessionConfig =
controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG,
CommonConstants.Helix.ZkClient.DEFAULT_SESSION_TIMEOUT_MS);
int zkClientConnectionTimeoutMs =
controllerConf.getProperty(CommonConstants.Helix.ZkClient.ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG,
CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS);
try {
zkClient = new ZkClient.Builder().setZkServer(zkPath).setZkSerializer(new ZNRecordSerializer()).build();
zkClient.waitUntilConnected(ZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
zkClient = new ZkClient.Builder()
.setZkServer(zkPath)
.setSessionTimeout(zkClientSessionConfig)
.setConnectionTimeout(zkClientConnectionTimeoutMs)
.setZkSerializer(new ZNRecordSerializer())
.build();
zkClient.waitUntilConnected(zkClientConnectionTimeoutMs, TimeUnit.MILLISECONDS);
HelixAdmin helixAdmin = new ZKHelixAdmin(zkClient);
HelixDataAccessor helixDataAccessor =
new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<>(zkClient));
Expand All @@ -113,7 +125,7 @@ public static void setupPinotCluster(String helixClusterName, String zkPath, boo

// Add lead controller resource if needed
createLeadControllerResourceIfNeeded(helixClusterName, helixAdmin, configAccessor, enableBatchMessageMode,
leadControllerResourceRebalanceStrategy);
controllerConf.getLeadControllerResourceRebalanceStrategy());
} finally {
if (zkClient != null) {
zkClient.close();
Expand Down
20 changes: 0 additions & 20 deletions pinot-query-runtime/src/test/resources/queries/Cast.json

This file was deleted.

50 changes: 0 additions & 50 deletions pinot-query-runtime/src/test/resources/queries/MathFuncs.json
Original file line number Diff line number Diff line change
Expand Up @@ -535,55 +535,5 @@
"sql": "SELECT longCol / 1e20 FROM {numTbl}"
}
]
},
"round": {
"tables": {
"numTbl": {
"schema": [
{"name": "intCol", "type": "INT"},
{"name": "longCol", "type": "LONG"},
{"name": "doubleCol", "type": "DOUBLE"},
{"name": "floatCol", "type": "FLOAT"}
],
"inputs": [
[0, 3, 0.123, 3.2],
[123, 321, 4.242, 3.03],
[-456, -2, 1.134, 7.722],
[123, -456, 3.634, 9.12]
]
}
},
"queries": [
{
"description": "test round on integer columns",
"ignored": true,
"comment": "we round the number up somehow",
"sql": "SELECT round(intCol, 2) FROM {numTbl}"
},
{
"description": "test round on long columns",
"ignored": true,
"comment": "we round the number up somehow",
"sql": "SELECT round(longCol, 2) FROM {numTbl}"
},
{
"description": "test round on double columns",
"ignored": true,
"comment": "double is rounded to 0",
"sql": "SELECT round(doubleCol, 2) FROM {numTbl}"
},
{
"description": "test ceil on float columns",
"ignored": true,
"comment": "float is rounded to 0",
"sql": "SELECT round(floatCol, 2) FROM {numTbl}"
},
{
"ignored": true,
"comment": "Caught exception while initializing transform function: round",
"description": "test round on literal",
"sql": "SELECT round(2.0, 0) FROM {numTbl}"
}
]
}
}
40 changes: 0 additions & 40 deletions pinot-query-runtime/src/test/resources/queries/NullHanlding.json

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@
},
"queries": [
{ "sql": "SELECT * FROM {tbl} WHERE intCol > 5" },
{ "sql": "SELECT * FROM {tbl} WHERE strCol = 'foo'" },
{ "sql": "SELECT * FROM {tbl} WHERE strCol IN ('foo', 'bar')" },
{ "sql": "SELECT * FROM {tbl} WHERE intCol IN (196883, 42)" },
{ "sql": "SELECT * FROM {tbl} WHERE intCol IN (111, 222)" },
{ "sql": "SELECT * FROM {tbl} WHERE intCol NOT IN (196883, 42) AND strCol IN ('alice')" },
{ "sql": "SELECT * FROM {tbl} WHERE strCol IN (SELECT strCol FROM {tbl} WHERE intCol > 100)" },
{ "sql": "SELECT * FROM {tbl} WHERE intCol < (SELECT SUM(intCol) FROM {tbl} AS b WHERE strCol BETWEEN 'bar' AND 'foo')" },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,13 @@ public static class BrokerResourceStateModel {
}

public static class ZkClient {
public static final long DEFAULT_CONNECT_TIMEOUT_SEC = 60L;
public static final int DEFAULT_CONNECT_TIMEOUT_MS = 60_000;
public static final int DEFAULT_SESSION_TIMEOUT_MS = 30_000;
// Retry interval and count for ZK operations where we would rather fail than get an empty (wrong) result back
public static final int RETRY_INTERVAL_MS = 50;
public static final int RETRY_COUNT = 2;
public static final String ZK_CLIENT_CONNECTION_TIMEOUT_MS_CONFIG = "zk.client.connection.timeout.ms";
public static final String ZK_CLIENT_SESSION_TIMEOUT_MS_CONFIG = "zk.client.session.timeout.ms";
}

public static class DataSource {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public boolean execute()
ZkClient zkClient = new ZkClient(_zkAddress);
zkClient.setZkSerializer(new ZNRecordStreamingSerializer());
LOGGER.info("Connecting to Zookeeper at: {}", _zkAddress);
zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS);
zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(zkClient);
ZKHelixDataAccessor zkHelixDataAccessor = new ZKHelixDataAccessor(_clusterName, baseDataAccessor);
PropertyKey property = zkHelixDataAccessor.keyBuilder().liveInstances();
Expand Down

0 comments on commit 7f643b1

Please sign in to comment.