Skip to content

Commit

Permalink
Adding pinot client connection config to allow skip fail on broker re…
Browse files Browse the repository at this point in the history
…sponse exception (apache#7816)
  • Loading branch information
xiangfu0 authored and kriti-sc committed Dec 12, 2021
1 parent 7f13774 commit 63590df
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.client;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand All @@ -31,19 +32,32 @@
* A connection to Pinot, normally created through calls to the {@link ConnectionFactory}.
*/
public class Connection {
public static final String FAIL_ON_EXCEPTIONS = "failOnExceptions";
private static final Logger LOGGER = LoggerFactory.getLogger(Connection.class);

private final PinotClientTransport _transport;
private final BrokerSelector _brokerSelector;
private final boolean _failOnExceptions;

Connection(List<String> brokerList, PinotClientTransport transport) {
LOGGER.info("Creating connection to broker list {}", brokerList);
_brokerSelector = new SimpleBrokerSelector(brokerList);
_transport = transport;
this(new Properties(), new SimpleBrokerSelector(brokerList), transport);
}

Connection(Properties properties, List<String> brokerList, PinotClientTransport transport) {
this(properties, new SimpleBrokerSelector(brokerList), transport);
LOGGER.info("Created connection to broker list {}", brokerList);
}

Connection(BrokerSelector brokerSelector, PinotClientTransport transport) {
this(new Properties(), brokerSelector, transport);
}

Connection(Properties properties, BrokerSelector brokerSelector, PinotClientTransport transport) {
_brokerSelector = brokerSelector;
_transport = transport;

// Default fail Pinot query if response contains any exception.
_failOnExceptions = Boolean.parseBoolean(properties.getProperty(FAIL_ON_EXCEPTIONS, "TRUE"));
}

/**
Expand Down Expand Up @@ -123,7 +137,7 @@ public ResultSetGroup execute(String tableName, Request request)
"Could not find broker to query for table: " + (tableName == null ? "null" : tableName));
}
BrokerResponse response = _transport.executeQuery(brokerHostPort, request);
if (response.hasExceptions()) {
if (response.hasExceptions() && _failOnExceptions) {
throw new PinotClientException("Query had processing exceptions: \n" + response.getExceptions());
}
return new ResultSetGroup(response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,31 @@ public static Connection fromZookeeper(String zkUrl, PinotClientTransport transp
}
}

/**
* Creates a connection to a Pinot cluster, given its Zookeeper URL
*
* @param properties The Pinot connection properties
* @param zkUrl The URL to the Zookeeper cluster, must include the cluster name e.g host:port/chroot/pinot-cluster
* @param transport pinot transport
* @return A connection that connects to the brokers in the given Helix cluster
*/
public static Connection fromZookeeper(Properties properties, String zkUrl, PinotClientTransport transport) {
try {
return fromZookeeper(properties, new DynamicBrokerSelector(zkUrl), transport);
} catch (Exception e) {
throw new PinotClientException(e);
}
}

@VisibleForTesting
static Connection fromZookeeper(DynamicBrokerSelector dynamicBrokerSelector, PinotClientTransport transport) {
return new Connection(dynamicBrokerSelector, transport);
return fromZookeeper(new Properties(), dynamicBrokerSelector, transport);
}

@VisibleForTesting
static Connection fromZookeeper(Properties properties, DynamicBrokerSelector dynamicBrokerSelector,
PinotClientTransport transport) {
return new Connection(properties, dynamicBrokerSelector, transport);
}

/**
Expand All @@ -81,7 +103,7 @@ public static Connection fromProperties(Properties properties) {
* @return A connection that connects to the brokers specified in the properties
*/
public static Connection fromProperties(Properties properties, PinotClientTransport transport) {
return new Connection(Arrays.asList(properties.getProperty("brokerList").split(",")), transport);
return new Connection(properties, Arrays.asList(properties.getProperty("brokerList").split(",")), transport);
}

/**
Expand All @@ -105,6 +127,19 @@ public static Connection fromHostList(List<String> brokers, PinotClientTransport
return new Connection(brokers, transport);
}

/**
* Creates a connection which sends queries randomly between the specified brokers.
*
* @param properties The Pinot connection properties
* @param brokers The list of brokers to send queries to
* @param transport pinot transport
* @return A connection to the set of brokers specified
*/
public static Connection fromHostList(Properties properties, List<String> brokers,
PinotClientTransport transport) {
return new Connection(properties, brokers, transport);
}

private static PinotClientTransport getDefault() {
if (_defaultTransport == null) {
_defaultTransport = new JsonAsyncHttpPinotClientTransportFactory().buildTransport();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.JsonNode;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Nullable;


/**
Expand All @@ -29,6 +30,7 @@
public class ResultSetGroup {
private final List<ResultSet> _resultSets;
private final ExecutionStats _executionStats;
private final List<PinotClientException> _exceptions;

ResultSetGroup(BrokerResponse brokerResponse) {
_resultSets = new ArrayList<>();
Expand All @@ -53,8 +55,19 @@ public class ResultSetGroup {
}
}
}

_executionStats = brokerResponse.getExecutionStats();
_exceptions = getPinotClientExceptions(brokerResponse.getExceptions());
}

private static List<PinotClientException> getPinotClientExceptions(
@Nullable JsonNode exceptionsJson) {
List<PinotClientException> exceptions = new ArrayList<>();
if (exceptionsJson != null && exceptionsJson.isArray()) {
for (int i = 0; i < exceptionsJson.size(); i++) {
exceptions.add(new PinotClientException(exceptionsJson.get(i).toPrettyString()));
}
}
return exceptions;
}

/**
Expand All @@ -81,6 +94,10 @@ public ExecutionStats getExecutionStats() {
return _executionStats;
}

public List<PinotClientException> getExceptions() {
return _exceptions;
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
Expand All @@ -89,6 +106,11 @@ public String toString() {
sb.append("\n");
}
sb.append(_executionStats.toString());
sb.append("\n");
for (PinotClientException exception : _exceptions) {
sb.append(exception);
sb.append("\n");
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.InputStream;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.Future;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -116,12 +117,31 @@ public void testDeserializeExceptionResultSet() {
}
}

@Test
public void testDeserializeExceptionResultSetSkipFail() {
try {
final ResultSetGroup resultSet = getResultSetSkipError("exception.json");
Assert.assertTrue(resultSet.getExceptions().size() > 0);
} catch (PinotClientException e) {
Assert.fail("Execute should have thrown an exception");
}
}

private ResultSetGroup getResultSet(String resourceName) {
_dummyJsonTransport._resource = resourceName;
Connection connection = ConnectionFactory.fromHostList(Collections.singletonList("dummy"), _dummyJsonTransport);
return connection.execute("dummy");
}

private ResultSetGroup getResultSetSkipError(String resourceName) {
_dummyJsonTransport._resource = resourceName;
Properties props = new Properties();
props.setProperty("failOnExceptions", "false");
Connection connection =
ConnectionFactory.fromHostList(props, Collections.singletonList("dummy"), _dummyJsonTransport);
return connection.execute("dummy");
}

static class DummyJsonTransport implements PinotClientTransport {
public String _resource;

Expand Down

0 comments on commit 63590df

Please sign in to comment.