From 79d1e5210ea3e30b203f7241180eea1628e9296e Mon Sep 17 00:00:00 2001 From: Jiwei Guo Date: Fri, 18 Nov 2022 12:09:15 +0800 Subject: [PATCH] [improve][admin] PulsarAdminBuilderImpl overrides timeout properties passed through config map (#17375) --- .../broker/admin/AdminApiTlsAuthTest.java | 2 +- .../internal/PulsarAdminBuilderImpl.java | 25 +++------ .../admin/internal/PulsarAdminImpl.java | 54 +++++-------------- .../internal/PulsarAdminBuilderImplTest.java | 22 ++++++++ .../pulsar/admin/cli/PulsarAdminToolTest.java | 15 +----- .../impl/conf/ClientConfigurationData.java | 12 +++++ 6 files changed, 55 insertions(+), 75 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java index 8e9540919b8ee..90a21d78bcd61 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTlsAuthTest.java @@ -480,7 +480,7 @@ public void testCertRefreshForPulsarAdmin() throws Exception { .allowTlsInsecureConnection(false) .enableTlsHostnameVerification(false) .serviceHttpUrl(brokerUrlTls.toString()) - .autoCertRefreshTime(1, TimeUnit.SECONDS) + .autoCertRefreshTime(autoCertRefreshTimeSec, TimeUnit.SECONDS) .authentication("org.apache.pulsar.client.impl.auth.AuthenticationTls", String.format("tlsCertFile:%s,tlsKeyFile:%s", getTLSFile(adminUser + ".cert"), keyFile)) diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java index 5c51ea07add48..98d1c95dc5ca1 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java @@ -33,21 +33,12 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { protected ClientConfigurationData conf; - private int connectTimeout = PulsarAdminImpl.DEFAULT_CONNECT_TIMEOUT_SECONDS; - private int readTimeout = PulsarAdminImpl.DEFAULT_READ_TIMEOUT_SECONDS; - private int requestTimeout = PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS; - private int autoCertRefreshTime = PulsarAdminImpl.DEFAULT_CERT_REFRESH_SECONDS; - private TimeUnit connectTimeoutUnit = TimeUnit.SECONDS; - private TimeUnit readTimeoutUnit = TimeUnit.SECONDS; - private TimeUnit requestTimeoutUnit = TimeUnit.SECONDS; - private TimeUnit autoCertRefreshTimeUnit = TimeUnit.SECONDS; + private ClassLoader clientBuilderClassLoader = null; @Override public PulsarAdmin build() throws PulsarClientException { - return new PulsarAdminImpl(conf.getServiceUrl(), conf, connectTimeout, connectTimeoutUnit, readTimeout, - readTimeoutUnit, requestTimeout, requestTimeoutUnit, autoCertRefreshTime, - autoCertRefreshTimeUnit, clientBuilderClassLoader); + return new PulsarAdminImpl(conf.getServiceUrl(), conf, clientBuilderClassLoader); } public PulsarAdminBuilderImpl() { @@ -187,29 +178,25 @@ public PulsarAdminBuilder tlsProtocols(Set tlsProtocols) { @Override public PulsarAdminBuilder connectionTimeout(int connectionTimeout, TimeUnit connectionTimeoutUnit) { - this.connectTimeout = connectionTimeout; - this.connectTimeoutUnit = connectionTimeoutUnit; + this.conf.setConnectionTimeoutMs((int) connectionTimeoutUnit.toMillis(connectionTimeout)); return this; } @Override public PulsarAdminBuilder readTimeout(int readTimeout, TimeUnit readTimeoutUnit) { - this.readTimeout = readTimeout; - this.readTimeoutUnit = readTimeoutUnit; + this.conf.setReadTimeoutMs((int) readTimeoutUnit.toMillis(readTimeout)); return this; } @Override public PulsarAdminBuilder requestTimeout(int requestTimeout, TimeUnit requestTimeoutUnit) { - this.requestTimeout = requestTimeout; - this.requestTimeoutUnit = requestTimeoutUnit; + this.conf.setRequestTimeoutMs((int) requestTimeoutUnit.toMillis(requestTimeout)); return this; } @Override public PulsarAdminBuilder autoCertRefreshTime(int autoCertRefreshTime, TimeUnit autoCertRefreshTimeUnit) { - this.autoCertRefreshTime = autoCertRefreshTime; - this.autoCertRefreshTimeUnit = autoCertRefreshTimeUnit; + this.conf.setAutoCertRefreshSeconds((int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime)); return this; } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java index e3ab4b0a063e6..5c04596da7554 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java @@ -73,10 +73,7 @@ public class PulsarAdminImpl implements PulsarAdmin { private static final Logger LOG = LoggerFactory.getLogger(PulsarAdmin.class); - public static final int DEFAULT_CONNECT_TIMEOUT_SECONDS = 60; - public static final int DEFAULT_READ_TIMEOUT_SECONDS = 60; public static final int DEFAULT_REQUEST_TIMEOUT_SECONDS = 300; - public static final int DEFAULT_CERT_REFRESH_SECONDS = 300; private final Clusters clusters; private final Brokers brokers; @@ -106,38 +103,11 @@ public class PulsarAdminImpl implements PulsarAdmin { private final Transactions transactions; protected final WebTarget root; protected final Authentication auth; - private final int connectTimeout; - private final TimeUnit connectTimeoutUnit; - private final int readTimeout; - private final TimeUnit readTimeoutUnit; - private final int requestTimeout; - private final TimeUnit requestTimeoutUnit; - - public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigData) throws PulsarClientException { - this(serviceUrl, clientConfigData, DEFAULT_CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS, - DEFAULT_READ_TIMEOUT_SECONDS, TimeUnit.SECONDS, DEFAULT_REQUEST_TIMEOUT_SECONDS, TimeUnit.SECONDS, - DEFAULT_CERT_REFRESH_SECONDS, TimeUnit.SECONDS, null); - } - - public PulsarAdminImpl(String serviceUrl, - ClientConfigurationData clientConfigData, - int connectTimeout, - TimeUnit connectTimeoutUnit, - int readTimeout, - TimeUnit readTimeoutUnit, - int requestTimeout, - TimeUnit requestTimeoutUnit, - int autoCertRefreshTime, - TimeUnit autoCertRefreshTimeUnit, - ClassLoader clientBuilderClassLoader) throws PulsarClientException { + + public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigData, + ClassLoader clientBuilderClassLoader) throws PulsarClientException { checkArgument(StringUtils.isNotBlank(serviceUrl), "Service URL needs to be specified"); - this.connectTimeout = connectTimeout; - this.connectTimeoutUnit = connectTimeoutUnit; - this.readTimeout = readTimeout; - this.readTimeoutUnit = readTimeoutUnit; - this.requestTimeout = requestTimeout; - this.requestTimeoutUnit = requestTimeoutUnit; this.clientConfigData = clientConfigData; this.auth = clientConfigData != null ? clientConfigData.getAuthentication() : new AuthenticationDisabled(); LOG.debug("created: serviceUrl={}, authMethodName={}", serviceUrl, @@ -152,7 +122,7 @@ public PulsarAdminImpl(String serviceUrl, } AsyncHttpConnectorProvider asyncConnectorProvider = new AsyncHttpConnectorProvider(clientConfigData, - (int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime)); + clientConfigData.getAutoCertRefreshSeconds()); ClientConfig httpConfig = new ClientConfig(); httpConfig.property(ClientProperties.FOLLOW_REDIRECTS, true); @@ -168,8 +138,8 @@ public PulsarAdminImpl(String serviceUrl, ClientBuilder clientBuilder = ClientBuilder.newBuilder() .withConfig(httpConfig) - .connectTimeout(this.connectTimeout, this.connectTimeoutUnit) - .readTimeout(this.readTimeout, this.readTimeoutUnit) + .connectTimeout(this.clientConfigData.getConnectionTimeoutMs(), TimeUnit.MILLISECONDS) + .readTimeout(this.clientConfigData.getReadTimeoutMs(), TimeUnit.MILLISECONDS) .register(JacksonConfigurator.class).register(JacksonFeature.class); boolean useTls = clientConfigData.getServiceUrl().startsWith("https://"); @@ -181,12 +151,12 @@ public PulsarAdminImpl(String serviceUrl, root = client.target(serviceUri.selectOne()); this.asyncHttpConnector = asyncConnectorProvider.getConnector( - Math.toIntExact(connectTimeoutUnit.toMillis(this.connectTimeout)), - Math.toIntExact(readTimeoutUnit.toMillis(this.readTimeout)), - Math.toIntExact(requestTimeoutUnit.toMillis(this.requestTimeout)), - (int) autoCertRefreshTimeUnit.toSeconds(autoCertRefreshTime)); + Math.toIntExact(clientConfigData.getConnectionTimeoutMs()), + Math.toIntExact(clientConfigData.getReadTimeoutMs()), + Math.toIntExact(clientConfigData.getRequestTimeoutMs()), + clientConfigData.getAutoCertRefreshSeconds()); - long readTimeoutMs = readTimeoutUnit.toMillis(this.readTimeout); + long readTimeoutMs = clientConfigData.getReadTimeoutMs(); this.clusters = new ClustersImpl(root, auth, readTimeoutMs); this.brokers = new BrokersImpl(root, auth, readTimeoutMs); this.brokerStats = new BrokerStatsImpl(root, auth, readTimeoutMs); @@ -228,7 +198,7 @@ public PulsarAdminImpl(String serviceUrl, */ @Deprecated public PulsarAdminImpl(URL serviceUrl, Authentication auth) throws PulsarClientException { - this(serviceUrl.toString(), getConfigData(auth)); + this(serviceUrl.toString(), getConfigData(auth), null); } private static ClientConfigurationData getConfigData(Authentication auth) { diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java index e85e554498dff..54a53236033c9 100644 --- a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java +++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java @@ -21,8 +21,13 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.fail; import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminBuilder; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.testng.Assert; import org.testng.annotations.Test; +import java.util.HashMap; +import java.util.Map; public class PulsarAdminBuilderImplTest { @@ -35,4 +40,21 @@ public void testAdminBuilderWithServiceUrlNotSet() throws PulsarClientException assertEquals("Service URL needs to be specified", exception.getMessage()); } } + + @Test + public void testGetPropertiesFromConf() throws Exception { + Map config = new HashMap<>(); + config.put("serviceUrl", "pulsar://localhost:6650"); + config.put("requestTimeoutMs", 10); + config.put("autoCertRefreshSeconds", 20); + config.put("connectionTimeoutMs", 30); + config.put("readTimeoutMs", 40); + PulsarAdminBuilder adminBuilder = PulsarAdmin.builder().loadConf(config); + PulsarAdminImpl admin = (PulsarAdminImpl) adminBuilder.build(); + ClientConfigurationData clientConfigData = admin.getClientConfigData(); + Assert.assertEquals(clientConfigData.getRequestTimeoutMs(), 10); + Assert.assertEquals(clientConfigData.getAutoCertRefreshSeconds(), 20); + Assert.assertEquals(clientConfigData.getConnectionTimeoutMs(), 30); + Assert.assertEquals(clientConfigData.getReadTimeoutMs(), 40); + } } diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java index 7b81c2cf4496e..8741af7c2672b 100644 --- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java +++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java @@ -37,7 +37,6 @@ import java.io.ByteArrayOutputStream; import java.io.File; import java.io.PrintStream; -import java.lang.reflect.Field; import java.net.URL; import java.net.URLClassLoader; import java.nio.charset.StandardCharsets; @@ -2182,19 +2181,9 @@ public void requestTimeout() throws Exception { //Ok } + ClientConfigurationData conf = ((PulsarAdminImpl)tool.getPulsarAdminSupplier().get()).getClientConfigData(); - final PulsarAdmin admin = tool.getPulsarAdminSupplier().get(); - Field requestTimeoutField = - PulsarAdminImpl.class.getDeclaredField("requestTimeout"); - requestTimeoutField.setAccessible(true); - int requestTimeout = (int) requestTimeoutField.get(admin); - - Field requestTimeoutUnitField = - PulsarAdminImpl.class.getDeclaredField("requestTimeoutUnit"); - requestTimeoutUnitField.setAccessible(true); - TimeUnit requestTimeoutUnit = (TimeUnit) requestTimeoutUnitField.get(admin); - assertEquals(1, requestTimeout); - assertEquals(TimeUnit.SECONDS, requestTimeoutUnit); + assertEquals(1000, conf.getRequestTimeoutMs()); } @Test diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index b66edf7792a16..338b9196a6651 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -219,6 +219,18 @@ public class ClientConfigurationData implements Serializable, Cloneable { ) private int requestTimeoutMs = 60000; + @ApiModelProperty( + name = "readTimeoutMs", + value = "Maximum read time of a request." + ) + private int readTimeoutMs = 60000; + + @ApiModelProperty( + name = "autoCertRefreshSeconds", + value = "Seconds of auto refreshing certificate." + ) + private int autoCertRefreshSeconds = 300; + @ApiModelProperty( name = "initialBackoffIntervalNanos", value = "Initial backoff interval (in nanosecond)."