Skip to content

Commit

Permalink
[Proxy-Server] Support SNI routing to support various proxy-server in…
Browse files Browse the repository at this point in the history
… pulsar
  • Loading branch information
rdhabalia committed Mar 19, 2020
1 parent e938a35 commit db8d58b
Show file tree
Hide file tree
Showing 12 changed files with 344 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.bookkeeper.util.ZkUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.PulsarServerException;
Expand Down Expand Up @@ -762,7 +763,11 @@ public PulsarClient getReplicationClient(String cluster) {
clientBuilder.serviceUrl(
isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl());
}

if (data.getProxyProtocol() != null && StringUtils.isNotBlank(data.getProxyServiceUrl())) {
clientBuilder.proxyServiceUrl(data.getProxyServiceUrl(), data.getProxyProtocol());
log.info("Configuring proxy-url {} with protocol {}", data.getProxyServiceUrl(),
data.getProxyProtocol());
}
// Share all the IO threads across broker and client connections
ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData();
return new PulsarClientImpl(conf, workerGroup);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.net.UnknownHostException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.Test;

public class ProxyProtocolTest extends TlsProducerConsumerBase {
private static final Logger log = LoggerFactory.getLogger(ProxyProtocolTest.class);

@Test
public void testSniProxyProtocol() throws Exception {

// Client should try to connect to proxy and pass broker-url as SNI header
String proxyUrl = pulsar.getBrokerServiceUrlTls();
String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
String topicName = "persistent://my-property/use/my-ns/my-topic1";

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
.proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);

PulsarClient pulsarClient = clientBuilder.build();

// should be able to create producer successfully
pulsarClient.newProducer().topic(topicName).create();
}

@Test
public void testSniProxyProtocolWithInvalidProxyUrl() throws Exception {

// Client should try to connect to proxy and pass broker-url as SNI header
String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
String proxyHost = "invalid-url";
String proxyUrl = "pulsar+ssl://" + proxyHost + ":5555";
String topicName = "persistent://my-property/use/my-ns/my-topic1";

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
.tlsTrustCertsFilePath(TLS_TRUST_CERT_FILE_PATH).enableTls(true).allowTlsInsecureConnection(false)
.proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);
Map<String, String> authParams = new HashMap<>();
authParams.put("tlsCertFile", TLS_CLIENT_CERT_FILE_PATH);
authParams.put("tlsKeyFile", TLS_CLIENT_KEY_FILE_PATH);
clientBuilder.authentication(AuthenticationTls.class.getName(), authParams);

PulsarClient pulsarClient = clientBuilder.build();

try {
pulsarClient.newProducer().topic(topicName).create();
fail("should have failed due to invalid url");
} catch (PulsarClientException e) {
assertTrue(e.getMessage().contains(proxyHost));
}
}

@Test
public void testSniProxyProtocolWithoutTls() throws Exception {
// Client should try to connect to proxy and pass broker-url as SNI header
String proxyUrl = pulsar.getBrokerServiceUrl();
String brokerServiceUrl = "pulsar+ssl://1.1.1.1:6651";
String topicName = "persistent://my-property/use/my-ns/my-topic1";

ClientBuilder clientBuilder = PulsarClient.builder().serviceUrl(brokerServiceUrl)
.proxyServiceUrl(proxyUrl, ProxyProtocol.SNI).operationTimeout(1000, TimeUnit.MILLISECONDS);

PulsarClient pulsarClient = clientBuilder.build();

try {
pulsarClient.newProducer().topic(topicName).create();
fail("should have failed due to invalid url");
} catch (PulsarClientException e) {
// Ok
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -387,4 +387,14 @@ ClientBuilder authentication(String authPluginClassName, Map<String, String> aut
* @return the client builder instance
*/
ClientBuilder clock(Clock clock);

/**
* Proxy-service url when client would like to connect to broker via proxy. Client can choose type of proxy-routing
* using {@link ProxyProtocol}.
*
* @param proxyServiceUrl proxy service url
* @param proxyProtocol protocol to decide type of proxy routing eg: SNI-routing
* @return
*/
ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.api;

/**
* Protcol type to determine type of proxy routing when client connects to proxy using
* {@link ClientBuilder::proxyServiceUrl}.
*/
public enum ProxyProtocol {
/**
* Follows SNI-routing
* https://docs.trafficserver.apache.org/en/latest/admin-guide/layer-4-routing.en.html#sni-routing.
**/
SNI
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,22 @@ public PulsarClientException(Throwable t) {
super(t);
}

/**
* Constructs an {@code PulsarClientException} with the specified cause.
*
*@param msg
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
*
* @param t
* The cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A null value is permitted,
* and indicates that the cause is nonexistent or unknown.)
*/
public PulsarClientException(String msg, Throwable t) {
super(msg, t);
}

/**
* Invalid Service URL exception thrown by Pulsar client.
*/
Expand All @@ -66,6 +82,21 @@ public static class InvalidServiceURL extends PulsarClientException {
public InvalidServiceURL(Throwable t) {
super(t);
}

/**
* Constructs an {@code InvalidServiceURL} with the specified cause.
*
*@param msg
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
* @param t
* The cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A null value is permitted,
* and indicates that the cause is nonexistent or unknown.)
*/
public InvalidServiceURL(String msg, Throwable t) {
super(msg, t);
}
}

/**
Expand Down Expand Up @@ -94,6 +125,21 @@ public InvalidConfigurationException(String msg) {
public InvalidConfigurationException(Throwable t) {
super(t);
}

/**
* Constructs an {@code InvalidConfigurationException} with the specified cause.
*
*@param msg
* The detail message (which is saved for later retrieval
* by the {@link #getMessage()} method)
* @param t
* The cause (which is saved for later retrieval by the
* {@link #getCause()} method). (A null value is permitted,
* and indicates that the cause is nonexistent or unknown.)
*/
public InvalidConfigurationException(String msg, Throwable t) {
super(msg, t);
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.FailureDomain;

Expand Down Expand Up @@ -69,10 +70,17 @@ private class Create extends CliCommand {
@Parameter(names = "--broker-url-secure", description = "broker-service-url for secure connection", required = false)
private String brokerServiceUrlTls;

@Parameter(names = "--proxy-url", description = "Proxy-service url when client would like to connect to broker via proxy.", required = false)
private String proxyServiceUrl;

@Parameter(names = "--proxy-protocol", description = "protocol to decide type of proxy routing eg: SNI", required = false)
private ProxyProtocol proxyProtocol;

void run() throws PulsarAdminException {
String cluster = getOneArgument(params);
admin.clusters().createCluster(cluster,
new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls));
new ClusterData(serviceUrl, serviceUrlTls, brokerServiceUrl, brokerServiceUrlTls, proxyServiceUrl,
proxyProtocol));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;

Expand All @@ -43,6 +44,12 @@ public class PulsarClientTool {
@Parameter(names = { "--url" }, description = "Broker URL to which to connect.")
String serviceURL = null;

@Parameter(names = { "--proxy-url" }, description = "Proxy-server URL to which to connect.")
String proxyServiceURL = null;

@Parameter(names = { "--proxy-protocol" }, description = "Proxy protocol to select type of routing at proxy.")
ProxyProtocol proxyProtocol = null;

@Parameter(names = { "--auth-plugin" }, description = "Authentication plugin class name.")
String authPluginClassName = null;

Expand Down Expand Up @@ -99,6 +106,13 @@ private void updateConfig() throws UnsupportedAuthenticationException {
clientBuilder.allowTlsInsecureConnection(this.tlsAllowInsecureConnection);
clientBuilder.tlsTrustCertsFilePath(this.tlsTrustCertsFilePath);
clientBuilder.serviceUrl(serviceURL);
if (StringUtils.isNotBlank(proxyServiceURL)) {
if (proxyProtocol == null) {
System.out.println("proxy-protocol must be provided with proxy-url");
System.exit(-1);
}
clientBuilder.proxyServiceUrl(proxyServiceURL, proxyProtocol);
}
this.produceCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
this.consumeCommand.updateConfig(clientBuilder, authentication, this.serviceURL);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
Expand Down Expand Up @@ -230,4 +231,14 @@ public ClientBuilder clock(Clock clock) {
conf.setClock(clock);
return this;
}

@Override
public ClientBuilder proxyServiceUrl(String proxyServiceUrl, ProxyProtocol proxyProtocol) {
if (StringUtils.isNotBlank(proxyServiceUrl) && proxyProtocol == null) {
throw new IllegalArgumentException("proxyProtocol must be present with proxyServiceUrl");
}
conf.setProxyServiceUrl(proxyServiceUrl);
conf.setProxyProtocol(proxyProtocol);
return this;
}
}
Loading

0 comments on commit db8d58b

Please sign in to comment.