Skip to content

Commit

Permalink
Pass Pinot Connection properties from JDBC driver (apache#7822)
Browse files Browse the repository at this point in the history
  • Loading branch information
xiangfu0 authored and kriti-sc committed Dec 12, 2021
1 parent b7fd066 commit c255c94
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Properties;
import org.apache.pinot.client.base.AbstractBaseConnection;
import org.apache.pinot.client.controller.PinotControllerTransport;
import org.apache.pinot.client.controller.response.ControllerTenantBrokerResponse;
Expand All @@ -40,11 +41,20 @@ public class PinotConnection extends AbstractBaseConnection {
private PinotControllerTransport _controllerTransport;

PinotConnection(String controllerURL, PinotClientTransport transport, String tenant) {
this(controllerURL, transport, tenant, null);
this(new Properties(), controllerURL, transport, tenant, null);
}

PinotConnection(Properties properties, String controllerURL, PinotClientTransport transport, String tenant) {
this(properties, controllerURL, transport, tenant, null);
}

PinotConnection(String controllerURL, PinotClientTransport transport, String tenant,
PinotControllerTransport controllerTransport) {
this(new Properties(), controllerURL, transport, tenant, controllerTransport);
}

PinotConnection(Properties properties, String controllerURL, PinotClientTransport transport, String tenant,
PinotControllerTransport controllerTransport) {
_closed = false;
_controllerURL = controllerURL;
if (controllerTransport == null) {
Expand All @@ -53,7 +63,7 @@ public class PinotConnection extends AbstractBaseConnection {
_controllerTransport = controllerTransport;
}
List<String> brokers = getBrokerList(controllerURL, tenant);
_session = new org.apache.pinot.client.Connection(brokers, transport);
_session = new org.apache.pinot.client.Connection(properties, brokers, transport);
}

public org.apache.pinot.client.Connection getSession() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public Connection connect(String url, Properties info)

Map<String, String> headers =
info.entrySet().stream().filter(entry -> entry.getKey().toString().startsWith(INFO_HEADERS + ".")).map(
entry -> Pair
.of(entry.getKey().toString().substring(INFO_HEADERS.length() + 1), entry.getValue().toString()))
entry -> Pair
.of(entry.getKey().toString().substring(INFO_HEADERS.length() + 1), entry.getValue().toString()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
if (!headers.isEmpty()) {
factory.setHeaders(headers);
Expand All @@ -68,7 +68,7 @@ public Connection connect(String url, Properties info)
PinotClientTransport pinotClientTransport = factory.buildTransport();
String controllerUrl = DriverUtils.getControllerFromURL(url);
String tenant = info.getProperty(INFO_TENANT, DEFAULT_TENANT);
return new PinotConnection(controllerUrl, pinotClientTransport, tenant);
return new PinotConnection(info, controllerUrl, pinotClientTransport, tenant);
} catch (Exception e) {
throw new SQLException(String.format("Failed to connect to url : %s", url), e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.sql.PreparedStatement;
import java.sql.Time;
import java.sql.Timestamp;
import java.util.Properties;
import org.apache.commons.codec.binary.Hex;
import org.apache.pinot.client.utils.DateTimeUtils;
import org.testng.Assert;
Expand All @@ -41,7 +42,8 @@ public class PinotPreparedStatementTest {
public void testSetAndClearValues()
throws Exception {
PinotConnection connection =
new PinotConnection("dummy", _dummyPinotClientTransport, "dummy", _dummyPinotControllerTransport);
new PinotConnection(new Properties(), "dummy", _dummyPinotClientTransport, "dummy",
_dummyPinotControllerTransport);
PreparedStatement preparedStatement = connection.prepareStatement(QUERY);

preparedStatement.setString(1, "foo");
Expand Down

0 comments on commit c255c94

Please sign in to comment.