Skip to content

Commit

Permalink
Add support for Auth in controller requests in java query client (#9230)
Browse files Browse the repository at this point in the history
* Add support for auth in controller requests

* Use a different user agent for broker cache requests for better tracking

* Replace pinot-core with pinot-common

Co-authored-by: Kartik Khare <[email protected]>
  • Loading branch information
KKcorps and Kartik Khare authored Sep 7, 2022
1 parent 77c1f69 commit 15e9398
Show file tree
Hide file tree
Showing 9 changed files with 202 additions and 33 deletions.
5 changes: 5 additions & 0 deletions pinot-clients/pinot-java-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@
<artifactId>testng</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.pinot</groupId>
<artifactId>pinot-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,28 @@

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.core.type.TypeReference;
import io.netty.handler.ssl.ClientAuth;
import io.netty.handler.ssl.JdkSslContext;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.pinot.client.utils.ConnectionUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Dsl;
import org.asynchttpclient.Response;

Expand Down Expand Up @@ -70,20 +77,60 @@ public void setPort(Integer port) {
private static final TypeReference<Map<String, List<BrokerInstance>>> RESPONSE_TYPE_REF =
new TypeReference<Map<String, List<BrokerInstance>>>() {
};
private static final String DEFAULT_CONTROLLER_READ_TIMEOUT_MS = "60000";
private static final String DEFAULT_CONTROLLER_CONNECT_TIMEOUT_MS = "2000";
private static final String DEFAULT_CONTROLLER_HANDSHAKE_TIMEOUT_MS = "2000";
private static final String DEFAULT_CONTROLLER_TLS_V10_ENABLED = "false";
private static final String SCHEME = "scheme";

private final Random _random = new Random();
private final AsyncHttpClient _client;
private final String _address;
private final Map<String, String> _headers;
private final Properties _properties;
private volatile BrokerData _brokerData;

public BrokerCache(String scheme, String controllerHost, int controllerPort) {
_client = Dsl.asyncHttpClient();
public BrokerCache(Properties properties, String controllerUrl) {
String scheme = properties.getProperty(SCHEME, CommonConstants.HTTP_PROTOCOL);
DefaultAsyncHttpClientConfig.Builder builder = Dsl.config();
if (scheme.contentEquals(CommonConstants.HTTPS_PROTOCOL)) {
SSLContext sslContext = ConnectionUtils.getSSLContextFromProperties(properties);
builder.setSslContext(new JdkSslContext(sslContext, true, ClientAuth.OPTIONAL));
}

int readTimeoutMs = Integer.parseInt(properties.getProperty("controllerReadTimeoutMs",
DEFAULT_CONTROLLER_READ_TIMEOUT_MS));
int connectTimeoutMs = Integer.parseInt(properties.getProperty("controllerConnectTimeoutMs",
DEFAULT_CONTROLLER_CONNECT_TIMEOUT_MS));
int handshakeTimeoutMs = Integer.parseInt(properties.getProperty("controllerHandshakeTimeoutMs",
DEFAULT_CONTROLLER_HANDSHAKE_TIMEOUT_MS));
boolean tlsV10Enabled = Boolean.parseBoolean(properties.getProperty("controllerTlsV10Enabled",
DEFAULT_CONTROLLER_TLS_V10_ENABLED))
|| Boolean.parseBoolean(System.getProperties().getProperty("controller.tlsV10Enabled",
DEFAULT_CONTROLLER_TLS_V10_ENABLED));

TlsProtocols tlsProtocols = TlsProtocols.defaultProtocols(tlsV10Enabled);
builder.setReadTimeout(readTimeoutMs)
.setConnectTimeout(connectTimeoutMs)
.setHandshakeTimeout(handshakeTimeoutMs)
.setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua_broker_cache"))
.setEnabledProtocols(tlsProtocols.getEnabledProtocols().toArray(new String[0]));

_client = Dsl.asyncHttpClient(builder.build());
ControllerRequestURLBuilder controllerRequestURLBuilder =
ControllerRequestURLBuilder.baseUrl(scheme + "://" + controllerHost + ":" + controllerPort);
ControllerRequestURLBuilder.baseUrl(scheme + "://" + controllerUrl);
_address = controllerRequestURLBuilder.forLiveBrokerTablesGet();
_headers = ConnectionUtils.getHeadersFromProperties(properties);
_properties = properties;
}

private Map<String, List<BrokerInstance>> getTableToBrokersData() throws Exception {
BoundRequestBuilder getRequest = _client.prepareGet(_address);

if (_headers != null) {
_headers.forEach((k, v) -> getRequest.addHeader(k, v));
}

Future<Response> responseFuture = getRequest.addHeader("accept", "application/json").execute();
Response response = responseFuture.get();
String responseBody = response.getResponseBody(StandardCharsets.UTF_8);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pinot.client;

import java.util.List;
import java.util.Properties;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand All @@ -29,17 +30,22 @@
* Maintains broker cache this is updated periodically
*/
public class BrokerCacheUpdaterPeriodic implements UpdatableBrokerCache {
public static final String BROKER_UPDATE_FREQUENCY_MILLIS = "brokerUpdateFrequencyInMillis";
public static final String DEFAULT_BROKER_UPDATE_FREQUENCY_MILLIS = "defaultBrokerUpdateFrequencyInMillis";

private final BrokerCache _brokerCache;
private final ScheduledExecutorService _scheduledExecutorService;
private final long _brokerUpdateFreqInMillis;
private final Properties _properties;

private static final Logger LOGGER = LoggerFactory.getLogger(BrokerCacheUpdaterPeriodic.class);

public BrokerCacheUpdaterPeriodic(String scheme, String controllerHost,
int controllerPort, long brokerUpdateFreqInMillis) {
_brokerCache = new BrokerCache(scheme, controllerHost, controllerPort);
public BrokerCacheUpdaterPeriodic(Properties properties, String controllerUrl) {
_properties = properties;
_brokerCache = new BrokerCache(properties, controllerUrl);
_scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
_brokerUpdateFreqInMillis = brokerUpdateFreqInMillis;
_brokerUpdateFreqInMillis = Long.parseLong(
properties.getProperty(BROKER_UPDATE_FREQUENCY_MILLIS, DEFAULT_BROKER_UPDATE_FREQUENCY_MILLIS));
}

public void init() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,36 +59,58 @@ public static Connection fromZookeeper(String zkUrl, PinotClientTransport transp
}

/**
* Creates a connection to Pinot cluster, given its Controller URL
*
* @param scheme controller URL scheme
* @param controllerHost controller host
* @param controllerPort controller port
* @return A connection that connects to brokers as per the given controller
*/
@Deprecated
public static Connection fromController(String scheme, String controllerHost, int controllerPort) {
return fromController(scheme, controllerHost, controllerPort, 1000);
return fromController(new Properties(), scheme, controllerHost, controllerPort);
}

/**
*
* @param scheme controller URL scheme
* @param controllerHost controller host
* @param controllerPort controller port
* @param brokerUpdateFreqInMillis frequency of broker data refresh using controller APIs
* @return A connection that connects to brokers as per the given controller
*/
public static Connection fromController(String scheme, String controllerHost, int controllerPort,
long brokerUpdateFreqInMillis) {
@Deprecated
public static Connection fromController(Properties properties, String scheme,
String controllerHost, int controllerPort) {
try {
return new Connection(new Properties(),
new ControllerBasedBrokerSelector(scheme, controllerHost, controllerPort, brokerUpdateFreqInMillis),
return new Connection(properties,
new ControllerBasedBrokerSelector(scheme, controllerHost, controllerPort, properties),
getDefault());
} catch (Exception e) {
throw new PinotClientException(e);
}
}

/**
* @param controllerUrl url host:port of the controller
* @return A connection that connects to brokers as per the given controller
*/
public static Connection fromController(String controllerUrl) {
return fromController(new Properties(), controllerUrl);
}

/**
* @param properties
* @param controllerUrl url host:port of the controller
* @return A connection that connects to brokers as per the given controller
*/
public static Connection fromController(Properties properties, String controllerUrl) {
try {
return new Connection(properties,
new ControllerBasedBrokerSelector(properties, controllerUrl),
getDefault(properties));
} catch (Exception e) {
throw new PinotClientException(e);
}
}
/**
* Creates a connection to a Pinot cluster, given its Zookeeper URL
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,40 @@
package org.apache.pinot.client;

import java.util.List;
import java.util.Properties;


/**
* Maintains broker cache using controller APIs
*/
public class ControllerBasedBrokerSelector implements BrokerSelector {
private static final String SCHEME = "scheme";

private final UpdatableBrokerCache _brokerCache;
private final Properties _properties;

public ControllerBasedBrokerSelector(String scheme, String controllerHost, int controllerPort,
long brokerUpdateFreqInMillis)
public ControllerBasedBrokerSelector(String scheme, String controllerHost, int controllerPort)
throws Exception {
_brokerCache = new BrokerCacheUpdaterPeriodic(scheme, controllerHost, controllerPort, brokerUpdateFreqInMillis);
this(scheme, controllerHost, controllerPort, new Properties());
}

public ControllerBasedBrokerSelector(String scheme, String controllerHost, int controllerPort, Properties properties)
throws Exception {
_properties = properties;
String controllerUrl = controllerHost + ":" + controllerPort;
_properties.setProperty(SCHEME, scheme);
_brokerCache = new BrokerCacheUpdaterPeriodic(_properties, controllerUrl);
_brokerCache.init();
}

public ControllerBasedBrokerSelector(Properties properties, String controllerUrl)
throws Exception {
_properties = properties;
_brokerCache = new BrokerCacheUpdaterPeriodic(properties, controllerUrl);
_brokerCache.init();
}


@Override
public String selectBroker(String table) {
return _brokerCache.getBroker(table);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import javax.net.ssl.SSLContext;
import org.apache.pinot.client.utils.ConnectionUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.asynchttpclient.AsyncHttpClient;
Expand Down Expand Up @@ -79,7 +79,7 @@ public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String sch
builder.setReadTimeout(connectionTimeouts.getReadTimeoutMs())
.setConnectTimeout(connectionTimeouts.getConnectTimeoutMs())
.setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs())
.setUserAgent(getUserAgentVersionFromClassPath())
.setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua"))
.setEnabledProtocols(tlsProtocols.getEnabledProtocols().toArray(new String[0]));
_httpClient = Dsl.asyncHttpClient(builder.build());
}
Expand All @@ -98,24 +98,11 @@ public JsonAsyncHttpPinotClientTransport(Map<String, String> headers, String sch
builder.setReadTimeout(connectionTimeouts.getReadTimeoutMs())
.setConnectTimeout(connectionTimeouts.getConnectTimeoutMs())
.setHandshakeTimeout(connectionTimeouts.getHandshakeTimeoutMs())
.setUserAgent(getUserAgentVersionFromClassPath())
.setUserAgent(ConnectionUtils.getUserAgentVersionFromClassPath("ua"))
.setEnabledProtocols(tlsProtocols.getEnabledProtocols().toArray(new String[0]));
_httpClient = Dsl.asyncHttpClient(builder.build());
}

private String getUserAgentVersionFromClassPath() {
Properties userAgentProperties = new Properties();
try {
userAgentProperties.load(JsonAsyncHttpPinotClientTransport.class.getClassLoader()
.getResourceAsStream("version.properties"));
} catch (IOException e) {
LOGGER.warn("Unable to set user agent version");
}
return userAgentProperties.getProperty("ua", "pinot-java");
}



@Override
public BrokerResponse executeQuery(String brokerAddress, String query)
throws PinotClientException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.Properties;
import javax.net.ssl.SSLContext;
import org.apache.pinot.client.utils.ConnectionUtils;
import org.apache.pinot.spi.utils.CommonConstants;


Expand Down Expand Up @@ -75,6 +76,18 @@ public void setSslContext(SSLContext sslContext) {
}

public JsonAsyncHttpPinotClientTransportFactory withConnectionProperties(Properties properties) {
if (_headers == null || _headers.isEmpty()) {
_headers = ConnectionUtils.getHeadersFromProperties(properties);
}

if (_scheme == null) {
_scheme = properties.getProperty("scheme", CommonConstants.HTTP_PROTOCOL);
}

if (_sslContext == null && _scheme.contentEquals(CommonConstants.HTTPS_PROTOCOL)) {
_sslContext = ConnectionUtils.getSSLContextFromProperties(properties);
}

_readTimeoutMs = Integer.parseInt(properties.getProperty("brokerReadTimeoutMs",
DEFAULT_BROKER_READ_TIMEOUT_MS));
_connectTimeoutMs = Integer.parseInt(properties.getProperty("brokerConnectTimeoutMs",
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.client.utils;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
import javax.net.ssl.SSLContext;
import org.apache.commons.configuration.MapConfiguration;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.config.TlsConfig;
import org.apache.pinot.common.utils.TlsUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


public class ConnectionUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(ConnectionUtils.class);

public static final String INFO_HEADERS = "headers";
public static final String PINOT_JAVA_TLS_PREFIX = "pinot.java_client.tls";

private ConnectionUtils() {
}

public static Map<String, String> getHeadersFromProperties(Properties info) {
return info.entrySet().stream().filter(entry -> entry.getKey().toString().startsWith(INFO_HEADERS + ".")).map(
entry -> Pair.of(entry.getKey().toString().substring(INFO_HEADERS.length() + 1),
entry.getValue().toString()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
}

public static SSLContext getSSLContextFromProperties(Properties properties) {
TlsConfig tlsConfig = TlsUtils.extractTlsConfig(
new PinotConfiguration(new MapConfiguration(properties)), PINOT_JAVA_TLS_PREFIX);
TlsUtils.installDefaultSSLSocketFactory(tlsConfig);
return TlsUtils.getSslContext();
}


public static String getUserAgentVersionFromClassPath(String userAgentKey) {
Properties userAgentProperties = new Properties();
try {
userAgentProperties.load(ConnectionUtils.class.getClassLoader()
.getResourceAsStream("version.properties"));
} catch (IOException e) {
LOGGER.warn("Unable to set user agent version");
}
return userAgentProperties.getProperty(userAgentKey, "pinot-java");
}
}
Loading

0 comments on commit 15e9398

Please sign in to comment.