From 63590df9215262839ed5511db9c9b084510cfd2e Mon Sep 17 00:00:00 2001 From: Xiang Fu Date: Tue, 23 Nov 2021 12:15:04 -0800 Subject: [PATCH] Adding pinot client connection config to allow skip fail on broker response exception (#7816) --- .../org/apache/pinot/client/Connection.java | 22 +++++++++-- .../pinot/client/ConnectionFactory.java | 39 ++++++++++++++++++- .../apache/pinot/client/ResultSetGroup.java | 24 +++++++++++- .../pinot/client/ResultSetGroupTest.java | 20 ++++++++++ 4 files changed, 98 insertions(+), 7 deletions(-) diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java index dac739f047ad..8033c9abe654 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/Connection.java @@ -19,6 +19,7 @@ package org.apache.pinot.client; import java.util.List; +import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -31,19 +32,32 @@ * A connection to Pinot, normally created through calls to the {@link ConnectionFactory}. */ public class Connection { + public static final String FAIL_ON_EXCEPTIONS = "failOnExceptions"; private static final Logger LOGGER = LoggerFactory.getLogger(Connection.class); + private final PinotClientTransport _transport; private final BrokerSelector _brokerSelector; + private final boolean _failOnExceptions; Connection(List brokerList, PinotClientTransport transport) { - LOGGER.info("Creating connection to broker list {}", brokerList); - _brokerSelector = new SimpleBrokerSelector(brokerList); - _transport = transport; + this(new Properties(), new SimpleBrokerSelector(brokerList), transport); + } + + Connection(Properties properties, List brokerList, PinotClientTransport transport) { + this(properties, new SimpleBrokerSelector(brokerList), transport); + LOGGER.info("Created connection to broker list {}", brokerList); } Connection(BrokerSelector brokerSelector, PinotClientTransport transport) { + this(new Properties(), brokerSelector, transport); + } + + Connection(Properties properties, BrokerSelector brokerSelector, PinotClientTransport transport) { _brokerSelector = brokerSelector; _transport = transport; + + // Default fail Pinot query if response contains any exception. + _failOnExceptions = Boolean.parseBoolean(properties.getProperty(FAIL_ON_EXCEPTIONS, "TRUE")); } /** @@ -123,7 +137,7 @@ public ResultSetGroup execute(String tableName, Request request) "Could not find broker to query for table: " + (tableName == null ? "null" : tableName)); } BrokerResponse response = _transport.executeQuery(brokerHostPort, request); - if (response.hasExceptions()) { + if (response.hasExceptions() && _failOnExceptions) { throw new PinotClientException("Query had processing exceptions: \n" + response.getExceptions()); } return new ResultSetGroup(response); diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java index 1f9af747e0d9..a0814915551c 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ConnectionFactory.java @@ -58,9 +58,31 @@ public static Connection fromZookeeper(String zkUrl, PinotClientTransport transp } } + /** + * Creates a connection to a Pinot cluster, given its Zookeeper URL + * + * @param properties The Pinot connection properties + * @param zkUrl The URL to the Zookeeper cluster, must include the cluster name e.g host:port/chroot/pinot-cluster + * @param transport pinot transport + * @return A connection that connects to the brokers in the given Helix cluster + */ + public static Connection fromZookeeper(Properties properties, String zkUrl, PinotClientTransport transport) { + try { + return fromZookeeper(properties, new DynamicBrokerSelector(zkUrl), transport); + } catch (Exception e) { + throw new PinotClientException(e); + } + } + @VisibleForTesting static Connection fromZookeeper(DynamicBrokerSelector dynamicBrokerSelector, PinotClientTransport transport) { - return new Connection(dynamicBrokerSelector, transport); + return fromZookeeper(new Properties(), dynamicBrokerSelector, transport); + } + + @VisibleForTesting + static Connection fromZookeeper(Properties properties, DynamicBrokerSelector dynamicBrokerSelector, + PinotClientTransport transport) { + return new Connection(properties, dynamicBrokerSelector, transport); } /** @@ -81,7 +103,7 @@ public static Connection fromProperties(Properties properties) { * @return A connection that connects to the brokers specified in the properties */ public static Connection fromProperties(Properties properties, PinotClientTransport transport) { - return new Connection(Arrays.asList(properties.getProperty("brokerList").split(",")), transport); + return new Connection(properties, Arrays.asList(properties.getProperty("brokerList").split(",")), transport); } /** @@ -105,6 +127,19 @@ public static Connection fromHostList(List brokers, PinotClientTransport return new Connection(brokers, transport); } + /** + * Creates a connection which sends queries randomly between the specified brokers. + * + * @param properties The Pinot connection properties + * @param brokers The list of brokers to send queries to + * @param transport pinot transport + * @return A connection to the set of brokers specified + */ + public static Connection fromHostList(Properties properties, List brokers, + PinotClientTransport transport) { + return new Connection(properties, brokers, transport); + } + private static PinotClientTransport getDefault() { if (_defaultTransport == null) { _defaultTransport = new JsonAsyncHttpPinotClientTransportFactory().buildTransport(); diff --git a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultSetGroup.java b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultSetGroup.java index 67c40038b5b4..e7e05e7fc417 100644 --- a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultSetGroup.java +++ b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/ResultSetGroup.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.JsonNode; import java.util.ArrayList; import java.util.List; +import javax.annotation.Nullable; /** @@ -29,6 +30,7 @@ public class ResultSetGroup { private final List _resultSets; private final ExecutionStats _executionStats; + private final List _exceptions; ResultSetGroup(BrokerResponse brokerResponse) { _resultSets = new ArrayList<>(); @@ -53,8 +55,19 @@ public class ResultSetGroup { } } } - _executionStats = brokerResponse.getExecutionStats(); + _exceptions = getPinotClientExceptions(brokerResponse.getExceptions()); + } + + private static List getPinotClientExceptions( + @Nullable JsonNode exceptionsJson) { + List exceptions = new ArrayList<>(); + if (exceptionsJson != null && exceptionsJson.isArray()) { + for (int i = 0; i < exceptionsJson.size(); i++) { + exceptions.add(new PinotClientException(exceptionsJson.get(i).toPrettyString())); + } + } + return exceptions; } /** @@ -81,6 +94,10 @@ public ExecutionStats getExecutionStats() { return _executionStats; } + public List getExceptions() { + return _exceptions; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -89,6 +106,11 @@ public String toString() { sb.append("\n"); } sb.append(_executionStats.toString()); + sb.append("\n"); + for (PinotClientException exception : _exceptions) { + sb.append(exception); + sb.append("\n"); + } return sb.toString(); } } diff --git a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultSetGroupTest.java b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultSetGroupTest.java index 77b337c3f2c5..e9bee6379765 100644 --- a/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultSetGroupTest.java +++ b/pinot-clients/pinot-java-client/src/test/java/org/apache/pinot/client/ResultSetGroupTest.java @@ -21,6 +21,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import java.io.InputStream; import java.util.Collections; +import java.util.Properties; import java.util.concurrent.Future; import org.testng.Assert; import org.testng.annotations.Test; @@ -116,12 +117,31 @@ public void testDeserializeExceptionResultSet() { } } + @Test + public void testDeserializeExceptionResultSetSkipFail() { + try { + final ResultSetGroup resultSet = getResultSetSkipError("exception.json"); + Assert.assertTrue(resultSet.getExceptions().size() > 0); + } catch (PinotClientException e) { + Assert.fail("Execute should have thrown an exception"); + } + } + private ResultSetGroup getResultSet(String resourceName) { _dummyJsonTransport._resource = resourceName; Connection connection = ConnectionFactory.fromHostList(Collections.singletonList("dummy"), _dummyJsonTransport); return connection.execute("dummy"); } + private ResultSetGroup getResultSetSkipError(String resourceName) { + _dummyJsonTransport._resource = resourceName; + Properties props = new Properties(); + props.setProperty("failOnExceptions", "false"); + Connection connection = + ConnectionFactory.fromHostList(props, Collections.singletonList("dummy"), _dummyJsonTransport); + return connection.execute("dummy"); + } + static class DummyJsonTransport implements PinotClientTransport { public String _resource;