Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

adding optional client payload #9465

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,21 @@ public static Connection fromZookeeper(String zkUrl, PinotClientTransport transp
}
}

/**
* Creates a connection to a Pinot cluster, given its Zookeeper URL and properties.
*
* @param properties connection properties
* @param zkUrl zookeeper URL.
* @return A connection that connects to the brokers in the given Helix cluster
*/
public static Connection fromZookeeper(Properties properties, String zkUrl) {
try {
return fromZookeeper(properties, new DynamicBrokerSelector(zkUrl), getDefault(properties));
} catch (Exception e) {
throw new PinotClientException(e);
}
}

/**
*
* @param scheme controller URL scheme
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.client.utils.ConnectionUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
Expand All @@ -51,25 +52,29 @@
public class JsonAsyncHttpPinotClientTransport implements PinotClientTransport {
private static final Logger LOGGER = LoggerFactory.getLogger(JsonAsyncHttpPinotClientTransport.class);
private static final ObjectReader OBJECT_READER = JsonUtils.DEFAULT_READER;
private static final String DEFAULT_EXTRA_QUERY_OPTION_STRING = "groupByMode=sql;responseFormat=sql";

private final Map<String, String> _headers;
private final String _scheme;

private final int _brokerReadTimeout;
private final AsyncHttpClient _httpClient;
private final String _extraOptionStr;

public JsonAsyncHttpPinotClientTransport() {
_brokerReadTimeout = 60000;
_headers = new HashMap<>();
_scheme = CommonConstants.HTTP_PROTOCOL;
_extraOptionStr = DEFAULT_EXTRA_QUERY_OPTION_STRING;
_httpClient = Dsl.asyncHttpClient();
}

public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String scheme,
public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String scheme, String extraOptionString,
@Nullable SSLContext sslContext, ConnectionTimeouts connectionTimeouts, TlsProtocols tlsProtocols) {
_brokerReadTimeout = connectionTimeouts.getReadTimeoutMs();
_headers = headers;
_scheme = scheme;
_extraOptionStr = StringUtils.isEmpty(extraOptionString) ? DEFAULT_EXTRA_QUERY_OPTION_STRING : extraOptionString;

Builder builder = Dsl.config();
if (sslContext != null) {
Expand All @@ -84,11 +89,12 @@ public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String sch
_httpClient = Dsl.asyncHttpClient(builder.build());
}

public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String scheme,
public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String scheme, String extraOptionStr,
@Nullable SslContext sslContext, ConnectionTimeouts connectionTimeouts, TlsProtocols tlsProtocols) {
_brokerReadTimeout = connectionTimeouts.getReadTimeoutMs();
_headers = headers;
_scheme = scheme;
_extraOptionStr = StringUtils.isEmpty(extraOptionStr) ? DEFAULT_EXTRA_QUERY_OPTION_STRING : extraOptionStr;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't we ned to concat the DEFAULT with what user passed in to keep behavior same ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah I thought about it and wasn't sure if this is a good idea since these 2 options have been in the deprecation process for a while. but good call out

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ohh yeah. Let's probably ignore then


Builder builder = Dsl.config();
if (sslContext != null) {
Expand Down Expand Up @@ -118,7 +124,7 @@ public Future<BrokerResponse> executeQueryAsync(String brokerAddress, String que
try {
ObjectNode json = JsonNodeFactory.instance.objectNode();
json.put("sql", query);
json.put("queryOptions", "groupByMode=sql;responseFormat=sql");
json.put("queryOptions", _extraOptionStr);

String url = _scheme + "://" + brokerAddress + "/query/sql";
BoundRequestBuilder requestBuilder = _httpClient.preparePost(url);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,15 @@ public class JsonAsyncHttpPinotClientTransportFactory implements PinotClientTran
private int _readTimeoutMs = Integer.parseInt(DEFAULT_BROKER_READ_TIMEOUT_MS);
private int _connectTimeoutMs = Integer.parseInt(DEFAULT_BROKER_READ_TIMEOUT_MS);
private int _handshakeTimeoutMs = Integer.parseInt(DEFAULT_BROKER_HANDSHAKE_TIMEOUT_MS);
private String _extraOptionString;

@Override
public PinotClientTransport buildTransport() {
ConnectionTimeouts connectionTimeouts = ConnectionTimeouts.create(_readTimeoutMs, _connectTimeoutMs,
_handshakeTimeoutMs);
TlsProtocols tlsProtocols = TlsProtocols.defaultProtocols(_tlsV10Enabled);
return new JsonAsyncHttpPinotClientTransport(_headers, _scheme, _sslContext, connectionTimeouts, tlsProtocols);
return new JsonAsyncHttpPinotClientTransport(_headers, _scheme, _extraOptionString, _sslContext, connectionTimeouts,
tlsProtocols);
}
public Map<String, String> getHeaders() {
return _headers;
Expand Down Expand Up @@ -98,6 +100,8 @@ public JsonAsyncHttpPinotClientTransportFactory withConnectionProperties(Propert
DEFAULT_BROKER_TLS_V10_ENABLED))
|| Boolean.parseBoolean(System.getProperties().getProperty("broker.tlsV10Enabled",
DEFAULT_BROKER_TLS_V10_ENABLED));

_extraOptionString = properties.getProperty("queryOptions", "");
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ private void testHardcodedQueriesCommon()
query = "SELECT MAX(ArrDelay) + MAX(AirTime) FROM mytable";
testQuery(query);
query = "SELECT MAX(ArrDelay) - MAX(AirTime), DaysSinceEpoch FROM mytable GROUP BY DaysSinceEpoch ORDER BY MAX"
+ "(ArrDelay) - MIN(AirTime) DESC";
+ "(ArrDelay) - MIN(AirTime) DESC, DaysSinceEpoch";
testQuery(query);
query = "SELECT DaysSinceEpoch, MAX(ArrDelay) * 2 - MAX(AirTime) - 3 FROM mytable GROUP BY DaysSinceEpoch ORDER BY "
+ "MAX(ArrDelay) - MIN(AirTime) DESC";
+ "MAX(ArrDelay) - MIN(AirTime) DESC, DaysSinceEpoch";
testQuery(query);

// Having
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,10 @@
import com.google.common.collect.ImmutableMap;
import java.io.File;
import java.util.List;
import java.util.Properties;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.client.Connection;
import org.apache.pinot.client.ConnectionFactory;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.query.service.QueryConfig;
import org.apache.pinot.spi.config.table.TableConfig;
Expand Down Expand Up @@ -95,6 +98,16 @@ public void testGeneratedQueries()
super.testGeneratedQueries(false, true);
}

@Override
protected Connection getPinotConnection() {
Properties properties = new Properties();
properties.put("queryOptions", "useMultistageEngine=true");
if (_pinotConnection == null) {
_pinotConnection = ConnectionFactory.fromZookeeper(properties, getZkUrl() + "/" + getHelixClusterName());
}
return _pinotConnection;
}

@Override
protected void overrideBrokerConf(PinotConfiguration brokerConf) {
brokerConf.setProperty(CommonConstants.Helix.CONFIG_OF_MULTI_STAGE_ENGINE_ENABLED, true);
Expand Down