diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 39a77326bdee2..b43840f597903 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -516,6 +516,9 @@ + + + @@ -530,6 +533,8 @@ + + diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java index 7f1fe2841a365..a5abeff40ce17 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/MirrorRestServer.java @@ -22,7 +22,9 @@ import org.apache.kafka.connect.runtime.rest.RestClient; import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.RestServerConfig; -import org.apache.kafka.connect.runtime.rest.resources.ConnectResource; +import org.glassfish.hk2.api.TypeLiteral; +import org.glassfish.hk2.utilities.binding.AbstractBinder; +import org.glassfish.jersey.server.ResourceConfig; import java.util.Arrays; import java.util.Collection; @@ -45,15 +47,28 @@ public void initializeInternalResources(Map herders) { } @Override - protected Collection regularResources() { + protected Collection> regularResources() { return Arrays.asList( - new InternalMirrorResource(herders, restClient) + InternalMirrorResource.class ); } @Override - protected Collection adminResources() { + protected Collection> adminResources() { return Collections.emptyList(); } + @Override + protected void configureRegularResources(ResourceConfig resourceConfig) { + resourceConfig.register(new Binder()); + } + + private class Binder extends AbstractBinder { + @Override + protected void configure() { + bind(herders).to(new TypeLiteral>() { }); + bind(restClient).to(RestClient.class); + } + } + } diff --git a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java index 8b5150f56acba..5c46bd9c6c519 100644 --- a/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java +++ b/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/rest/resources/InternalMirrorResource.java @@ -19,10 +19,12 @@ import org.apache.kafka.connect.mirror.SourceAndTarget; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.RestRequestTimeout; import org.apache.kafka.connect.runtime.rest.resources.InternalClusterResource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Inject; import javax.ws.rs.NotFoundException; import javax.ws.rs.Path; import javax.ws.rs.core.Context; @@ -39,8 +41,13 @@ public class InternalMirrorResource extends InternalClusterResource { private final Map herders; - public InternalMirrorResource(Map herders, RestClient restClient) { - super(restClient); + @Inject + public InternalMirrorResource( + Map herders, + RestClient restClient, + RestRequestTimeout requestTimeout + ) { + super(restClient, requestTimeout); this.herders = herders; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 3a586ea6186f8..3024e6b3d0316 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -66,10 +66,10 @@ import org.apache.kafka.connect.runtime.isolation.LoaderSwap; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; +import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffset; import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; import org.apache.kafka.connect.runtime.rest.entities.Message; -import org.apache.kafka.connect.runtime.rest.resources.ConnectResource; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; @@ -718,7 +718,7 @@ public KafkaFuture fenceZombies(String connName, int numTasks, Map { if (error == null) log.debug("Finished fencing out {} task producers for source connector {}", numTasks, connName); @@ -1195,7 +1195,7 @@ void sinkConnectorOffsets(String connName, Connector connector, Map { if (error != null) { @@ -1299,7 +1299,7 @@ void modifySinkConnectorOffsets(String connName, Connector connector, Map, Map> offsets, ClassLoader connectorLoader, Callback cb) { executor.submit(plugins.withClassLoader(connectorLoader, () -> { try { - Timer timer = time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS)); + Timer timer = time.timer(Duration.ofMillis(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS)); boolean isReset = offsets == null; SinkConnectorConfig sinkConnectorConfig = new SinkConnectorConfig(plugins, connectorConfig); Class sinkConnectorClass = connector.getClass(); @@ -1530,7 +1530,7 @@ void modifySourceConnectorOffsets(String connName, Connector connector, Map cb) { executor.submit(plugins.withClassLoader(connectorLoader, () -> { try { - Timer timer = time.timer(Duration.ofMillis(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS)); + Timer timer = time.timer(Duration.ofMillis(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS)); // This reads to the end of the offsets topic and can be a potentially time-consuming operation offsetStore.start(); updateTimerAndCheckExpiry(timer, "Timed out while trying to read to the end of the offsets topic prior to modifying " + diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java index 6cef19c22b8cc..3adbc0f14ec36 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/ConnectRestServer.java @@ -17,12 +17,12 @@ package org.apache.kafka.connect.runtime.rest; import org.apache.kafka.connect.runtime.Herder; -import org.apache.kafka.connect.runtime.rest.resources.ConnectResource; import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource; import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource; import org.apache.kafka.connect.runtime.rest.resources.InternalConnectResource; import org.apache.kafka.connect.runtime.rest.resources.LoggingResource; import org.apache.kafka.connect.runtime.rest.resources.RootResource; +import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.glassfish.jersey.server.ResourceConfig; import java.util.Arrays; @@ -45,25 +45,40 @@ public void initializeResources(Herder herder) { } @Override - protected Collection regularResources() { + protected Collection> regularResources() { return Arrays.asList( - new RootResource(herder), - new ConnectorsResource(herder, config, restClient), - new InternalConnectResource(herder, restClient), - new ConnectorPluginsResource(herder) + RootResource.class, + ConnectorsResource.class, + InternalConnectResource.class, + ConnectorPluginsResource.class ); } @Override - protected Collection adminResources() { + protected Collection> adminResources() { return Arrays.asList( - new LoggingResource(herder) + LoggingResource.class ); } @Override protected void configureRegularResources(ResourceConfig resourceConfig) { registerRestExtensions(herder, resourceConfig); + resourceConfig.register(new Binder()); + } + + private class Binder extends AbstractBinder { + @Override + protected void configure() { + bind(herder).to(Herder.class); + bind(restClient).to(RestClient.class); + bind(config).to(RestServerConfig.class); + } + } + + @Override + protected void configureAdminResources(ResourceConfig resourceConfig) { + resourceConfig.register(new Binder()); } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java index 18c969098b751..bd57dc2c80a1c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/HerderRequestHandler.java @@ -41,18 +41,11 @@ public class HerderRequestHandler { private final RestClient restClient; - private volatile long requestTimeoutMs; + private final RestRequestTimeout requestTimeout; - public HerderRequestHandler(RestClient restClient, long requestTimeoutMs) { + public HerderRequestHandler(RestClient restClient, RestRequestTimeout requestTimeout) { this.restClient = restClient; - this.requestTimeoutMs = requestTimeoutMs; - } - - public void requestTimeoutMs(long requestTimeoutMs) { - if (requestTimeoutMs < 1) { - throw new IllegalArgumentException("REST request timeout must be positive"); - } - this.requestTimeoutMs = requestTimeoutMs; + this.requestTimeout = requestTimeout; } /** @@ -64,7 +57,7 @@ public void requestTimeoutMs(long requestTimeoutMs) { */ public T completeRequest(FutureCallback cb) throws Throwable { try { - return cb.get(requestTimeoutMs, TimeUnit.MILLISECONDS); + return cb.get(requestTimeout.timeoutMs(), TimeUnit.MILLISECONDS); } catch (ExecutionException e) { throw e.getCause(); } catch (StagedTimeoutException e) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestRequestTimeout.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestRequestTimeout.java new file mode 100644 index 0000000000000..d2ce28cc472ae --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestRequestTimeout.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.runtime.rest; + +public interface RestRequestTimeout { + + /** + * @return the current timeout that should be used for REST requests, in milliseconds + */ + long timeoutMs(); + +} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java index 9a5bc6f5a5885..f078b24420e51 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java @@ -27,7 +27,6 @@ import org.apache.kafka.connect.runtime.health.ConnectClusterDetailsImpl; import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl; import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper; -import org.apache.kafka.connect.runtime.rest.resources.ConnectResource; import org.apache.kafka.connect.runtime.rest.util.SSLUtils; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.CustomRequestLog; @@ -43,6 +42,8 @@ import org.eclipse.jetty.servlets.CrossOriginFilter; import org.eclipse.jetty.servlets.HeaderFilter; import org.eclipse.jetty.util.ssl.SslContextFactory; +import org.glassfish.hk2.utilities.Binder; +import org.glassfish.hk2.utilities.binding.AbstractBinder; import org.glassfish.jersey.server.ResourceConfig; import org.glassfish.jersey.server.ServerProperties; import org.glassfish.jersey.servlet.ServletContainer; @@ -59,6 +60,7 @@ import java.util.EnumSet; import java.util.List; import java.util.Locale; +import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -66,6 +68,13 @@ * Embedded server for the REST API that provides the control plane for Kafka Connect workers. */ public abstract class RestServer { + + // TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full + // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but + // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases, + // but currently a worker simply leaving the group can take this long as well. + public static final long DEFAULT_REST_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(90); + private static final Logger log = LoggerFactory.getLogger(RestServer.class); // Used to distinguish between Admin connectors and regular REST API connectors when binding admin handlers @@ -80,8 +89,8 @@ public abstract class RestServer { protected final RestServerConfig config; private final ContextHandlerCollection handlers; private final Server jettyServer; + private final RequestTimeout requestTimeout; - private Collection resources; private List connectRestExtensions = Collections.emptyList(); /** @@ -95,6 +104,7 @@ protected RestServer(RestServerConfig config) { jettyServer = new Server(); handlers = new ContextHandlerCollection(); + requestTimeout = new RequestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS); createConnectors(listeners, adminListeners); } @@ -207,44 +217,31 @@ public void initializeServer() { protected final void initializeResources() { log.info("Initializing REST resources"); - resources = new ArrayList<>(); - - ResourceConfig resourceConfig = new ResourceConfig(); - resourceConfig.register(new JacksonJsonProvider()); - Collection regularResources = regularResources(); + ResourceConfig resourceConfig = newResourceConfig(); + Collection> regularResources = regularResources(); regularResources.forEach(resourceConfig::register); - resources.addAll(regularResources); - - resourceConfig.register(ConnectExceptionMapper.class); - resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true); - configureRegularResources(resourceConfig); List adminListeners = config.adminListeners(); ResourceConfig adminResourceConfig; - if (adminListeners == null) { - log.info("Adding admin resources to main listener"); - adminResourceConfig = resourceConfig; - Collection adminResources = adminResources(); - resources.addAll(adminResources); - adminResources.forEach(adminResourceConfig::register); - configureAdminResources(adminResourceConfig); - } else if (adminListeners.size() > 0) { - // TODO: we need to check if these listeners are same as 'listeners' - // TODO: the following code assumes that they are different - log.info("Adding admin resources to admin listener"); - adminResourceConfig = new ResourceConfig(); - adminResourceConfig.register(new JacksonJsonProvider()); - Collection adminResources = adminResources(); - resources.addAll(adminResources); - adminResources.forEach(adminResourceConfig::register); - adminResourceConfig.register(ConnectExceptionMapper.class); - configureAdminResources(adminResourceConfig); - } else { + if (adminListeners != null && adminListeners.isEmpty()) { log.info("Skipping adding admin resources"); // set up adminResource but add no handlers to it adminResourceConfig = resourceConfig; + } else { + if (adminListeners == null) { + log.info("Adding admin resources to main listener"); + adminResourceConfig = resourceConfig; + } else { + // TODO: we need to check if these listeners are same as 'listeners' + // TODO: the following code assumes that they are different + log.info("Adding admin resources to admin listener"); + adminResourceConfig = newResourceConfig(); + } + Collection> adminResources = adminResources(); + adminResources.forEach(adminResourceConfig::register); + configureAdminResources(adminResourceConfig); } ServletContainer servletContainer = new ServletContainer(resourceConfig); @@ -302,17 +299,26 @@ protected final void initializeResources() { log.info("REST resources initialized; server is started and ready to handle requests"); } + private ResourceConfig newResourceConfig() { + ResourceConfig result = new ResourceConfig(); + result.register(new JacksonJsonProvider()); + result.register(requestTimeout.binder()); + result.register(ConnectExceptionMapper.class); + result.property(ServerProperties.WADL_FEATURE_DISABLE, true); + return result; + } + /** - * @return the {@link ConnectResource resources} that should be registered with the + * @return the resources that should be registered with the * standard (i.e., non-admin) listener for this server; may be empty, but not null */ - protected abstract Collection regularResources(); + protected abstract Collection> regularResources(); /** - * @return the {@link ConnectResource resources} that should be registered with the + * @return the resources that should be registered with the * admin listener for this server; may be empty, but not null */ - protected abstract Collection adminResources(); + protected abstract Collection> adminResources(); /** * Pluggable hook to customize the regular (i.e., non-admin) resources on this server @@ -438,7 +444,7 @@ public URI adminUrl() { // For testing only public void requestTimeout(long requestTimeoutMs) { - this.resources.forEach(resource -> resource.requestTimeout(requestTimeoutMs)); + this.requestTimeout.timeoutMs(requestTimeoutMs); } String determineAdvertisedProtocol() { @@ -488,7 +494,7 @@ protected final void registerRestExtensions(Herder herder, ResourceConfig resour config.restExtensions(), config, ConnectRestExtension.class); - long herderRequestTimeoutMs = ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS; + long herderRequestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS; Integer rebalanceTimeoutMs = config.rebalanceTimeoutMs(); @@ -520,4 +526,35 @@ protected void configureHttpResponseHeaderFilter(ServletContextHandler context, headerFilterHolder.setInitParameter("headerConfig", headerConfig); context.addFilter(headerFilterHolder, "/*", EnumSet.of(DispatcherType.REQUEST)); } + + private static class RequestTimeout implements RestRequestTimeout { + + private final RequestBinder binder; + private volatile long timeoutMs; + + public RequestTimeout(long initialTimeoutMs) { + this.timeoutMs = initialTimeoutMs; + this.binder = new RequestBinder(); + } + + @Override + public long timeoutMs() { + return timeoutMs; + } + + public void timeoutMs(long timeoutMs) { + this.timeoutMs = timeoutMs; + } + + public Binder binder() { + return binder; + } + + private class RequestBinder extends AbstractBinder { + @Override + protected void configure() { + bind(RequestTimeout.this).to(RestRequestTimeout.class); + } + } + } } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java deleted file mode 100644 index 49d61a727a955..0000000000000 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectResource.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.connect.runtime.rest.resources; - -import java.util.concurrent.TimeUnit; - -/** - * This interface defines shared logic for all Connect REST resources. - */ -public interface ConnectResource { - - // TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full - // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but - // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases, - // but currently a worker simply leaving the group can take this long as well. - long DEFAULT_REST_REQUEST_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(90); - - /** - * Set how long the resource will await the completion of each request before returning a 500 error. - * If the resource does not perform any operations that can be expected to block under reasonable - * circumstances, this can be implemented as a no-op. - * @param requestTimeoutMs the new timeout in milliseconds; must be positive - */ - void requestTimeout(long requestTimeoutMs); - -} diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 3c6cce98a0547..7537a6fe1a731 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -22,6 +22,7 @@ import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.isolation.PluginDesc; import org.apache.kafka.connect.runtime.isolation.PluginType; +import org.apache.kafka.connect.runtime.rest.RestRequestTimeout; import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos; import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo; import org.apache.kafka.connect.runtime.rest.entities.PluginInfo; @@ -30,6 +31,7 @@ import org.apache.kafka.connect.util.Stage; import org.apache.kafka.connect.util.StagedTimeoutException; +import javax.inject.Inject; import javax.ws.rs.BadRequestException; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; @@ -56,17 +58,18 @@ @Path("/connector-plugins") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class ConnectorPluginsResource implements ConnectResource { +public class ConnectorPluginsResource { private static final String ALIAS_SUFFIX = "Connector"; private final Herder herder; private final Set connectorPlugins; - private long requestTimeoutMs; + private final RestRequestTimeout requestTimeout; - public ConnectorPluginsResource(Herder herder) { + @Inject + public ConnectorPluginsResource(Herder herder, RestRequestTimeout requestTimeout) { this.herder = herder; + this.requestTimeout = requestTimeout; this.connectorPlugins = new LinkedHashSet<>(); - this.requestTimeoutMs = DEFAULT_REST_REQUEST_TIMEOUT_MS; // TODO: improve once plugins are allowed to be added/removed during runtime. addConnectorPlugins(herder.plugins().sinkConnectors()); @@ -83,11 +86,6 @@ private void addConnectorPlugins(Collection> plugins) { .forEach(connectorPlugins::add); } - @Override - public void requestTimeout(long requestTimeoutMs) { - this.requestTimeoutMs = requestTimeoutMs; - } - @PUT @Path("/{pluginName}/config/validate") @Operation(summary = "Validate the provided configuration against the configuration definition for the specified pluginName") @@ -109,7 +107,7 @@ public ConfigInfos validateConfigs( herder.validateConnectorConfig(connectorConfig, validationCallback, false); try { - return validationCallback.get(requestTimeoutMs, TimeUnit.MILLISECONDS); + return validationCallback.get(requestTimeout.timeoutMs(), TimeUnit.MILLISECONDS); } catch (StagedTimeoutException e) { Stage stage = e.stage(); String message; @@ -136,7 +134,6 @@ public ConfigInfos validateConfigs( } @GET - @Path("/") @Operation(summary = "List all connector plugins installed") public List listConnectorPlugins( @DefaultValue("true") @QueryParam("connectorsOnly") @Parameter(description = "Whether to list only connectors instead of all plugins") boolean connectorsOnly diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index 75e510ef9ad5a..3b3a969a97d4a 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -25,6 +25,7 @@ import org.apache.kafka.connect.runtime.RestartRequest; import org.apache.kafka.connect.runtime.rest.HerderRequestHandler; import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.RestRequestTimeout; import org.apache.kafka.connect.runtime.rest.RestServerConfig; import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; @@ -39,6 +40,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.inject.Inject; import javax.servlet.ServletContext; import javax.ws.rs.BadRequestException; import javax.ws.rs.Consumes; @@ -70,7 +72,7 @@ @Path("/connectors") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class ConnectorsResource implements ConnectResource { +public class ConnectorsResource { private static final Logger log = LoggerFactory.getLogger(ConnectorsResource.class); private final Herder herder; @@ -80,20 +82,20 @@ public class ConnectorsResource implements ConnectResource { private final boolean isTopicTrackingDisabled; private final boolean isTopicTrackingResetDisabled; - public ConnectorsResource(Herder herder, RestServerConfig config, RestClient restClient) { + @Inject + public ConnectorsResource( + Herder herder, + RestServerConfig config, + RestClient restClient, + RestRequestTimeout requestTimeout + ) { this.herder = herder; - this.requestHandler = new HerderRequestHandler(restClient, DEFAULT_REST_REQUEST_TIMEOUT_MS); + this.requestHandler = new HerderRequestHandler(restClient, requestTimeout); this.isTopicTrackingDisabled = !config.topicTrackingEnabled(); this.isTopicTrackingResetDisabled = !config.topicTrackingResetEnabled(); } - @Override - public void requestTimeout(long requestTimeoutMs) { - requestHandler.requestTimeoutMs(requestTimeoutMs); - } - @GET - @Path("/") @Operation(summary = "List all active connectors") public Response listConnectors( final @Context UriInfo uriInfo, @@ -131,7 +133,6 @@ public Response listConnectors( } @POST - @Path("/") @Operation(summary = "Create a new connector") public Response createConnector(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, final @Context HttpHeaders headers, diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java index c7bef991b4149..5ee1d232a2908 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalClusterResource.java @@ -24,6 +24,7 @@ import org.apache.kafka.connect.runtime.rest.HerderRequestHandler; import org.apache.kafka.connect.runtime.rest.InternalRequestSignature; import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.RestRequestTimeout; import org.apache.kafka.connect.util.FutureCallback; import javax.ws.rs.POST; @@ -45,7 +46,7 @@ * requests that originate from a user and are forwarded from one worker to another. */ @Produces(MediaType.APPLICATION_JSON) -public abstract class InternalClusterResource implements ConnectResource { +public abstract class InternalClusterResource { private static final TypeReference>> TASK_CONFIGS_TYPE = new TypeReference>>() { }; @@ -56,13 +57,8 @@ public abstract class InternalClusterResource implements ConnectResource { @Context UriInfo uriInfo; - protected InternalClusterResource(RestClient restClient) { - this.requestHandler = new HerderRequestHandler(restClient, DEFAULT_REST_REQUEST_TIMEOUT_MS); - } - - @Override - public void requestTimeout(long requestTimeoutMs) { - requestHandler.requestTimeoutMs(requestTimeoutMs); + protected InternalClusterResource(RestClient restClient, RestRequestTimeout requestTimeout) { + this.requestHandler = new HerderRequestHandler(restClient, requestTimeout); } /** diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java index c4987150dfb02..228c7cd67baf6 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResource.java @@ -18,7 +18,9 @@ import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.RestRequestTimeout; +import javax.inject.Inject; import javax.ws.rs.Path; @Path("/connectors") @@ -26,8 +28,9 @@ public class InternalConnectResource extends InternalClusterResource { private final Herder herder; - public InternalConnectResource(Herder herder, RestClient restClient) { - super(restClient); + @Inject + public InternalConnectResource(Herder herder, RestClient restClient, RestRequestTimeout requestTimeout) { + super(restClient, requestTimeout); this.herder = herder; } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java index b215fe72adb49..bbf5bfa0d55ad 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java @@ -25,6 +25,7 @@ import org.apache.log4j.Level; import org.slf4j.LoggerFactory; +import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.DefaultValue; import javax.ws.rs.GET; @@ -46,7 +47,7 @@ @Path("/admin/loggers") @Produces(MediaType.APPLICATION_JSON) @Consumes(MediaType.APPLICATION_JSON) -public class LoggingResource implements ConnectResource { +public class LoggingResource { private static final org.slf4j.Logger log = LoggerFactory.getLogger(LoggingResource.class); @@ -55,22 +56,17 @@ public class LoggingResource implements ConnectResource { private final Herder herder; + @Inject public LoggingResource(Herder herder) { this.herder = herder; } - @Override - public void requestTimeout(long requestTimeoutMs) { - // No-op - } - /** * List the current loggers that have their levels explicitly set and their log levels. * * @return a list of current loggers and their levels. */ @GET - @Path("/") @Operation(summary = "List the current loggers that have their levels explicitly set and their log levels") public Response listLoggers() { return Response.ok(herder.allLoggerLevels()).build(); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java index fe09e26903924..b112c59b82049 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java @@ -20,6 +20,7 @@ import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; +import javax.inject.Inject; import javax.ws.rs.GET; import javax.ws.rs.Path; import javax.ws.rs.Produces; @@ -27,21 +28,16 @@ @Path("/") @Produces(MediaType.APPLICATION_JSON) -public class RootResource implements ConnectResource { +public class RootResource { private final Herder herder; + @Inject public RootResource(Herder herder) { this.herder = herder; } - @Override - public void requestTimeout(long requestTimeoutMs) { - // No-op - } - @GET - @Path("/") @Operation(summary = "Get details about this Connect worker and the id of the Kafka cluster it is connected to") public ServerInfo serverInfo() { return new ServerInfo(herder.kafkaClusterId()); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java index 03ace3b570600..6655e5a01ca88 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java @@ -27,7 +27,6 @@ import org.apache.kafka.connect.connector.Task; import org.apache.kafka.connect.runtime.Worker; import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; -import org.apache.kafka.connect.runtime.rest.resources.ConnectResource; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; @@ -63,6 +62,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG; import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG; import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG; +import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -379,7 +379,7 @@ private void assertRequestTimesOut(String requestDescription, ThrowingRunnable r ); } // Reset the REST request timeout so that other requests aren't impacted - connect.requestTimeout(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS); + connect.requestTimeout(DEFAULT_REST_REQUEST_TIMEOUT_MS); } private static class Block { diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java index 6e49120343f9f..ec4d256c6e6ea 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java @@ -63,7 +63,7 @@ import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG; import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG; -import static org.apache.kafka.connect.runtime.rest.resources.ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS; +import static org.apache.kafka.connect.runtime.rest.RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; import static org.apache.kafka.connect.util.clusters.ConnectAssertions.CONNECTOR_SETUP_DURATION_MS; import static org.apache.kafka.test.TestUtils.waitForCondition; import static org.hamcrest.CoreMatchers.containsString; diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 6a34fad6e9383..a03378b13803b 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -61,10 +61,10 @@ import org.apache.kafka.connect.runtime.isolation.PluginClassLoader; import org.apache.kafka.connect.runtime.isolation.Plugins; import org.apache.kafka.connect.runtime.isolation.Plugins.ClassLoaderUsage; +import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets; import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo; import org.apache.kafka.connect.runtime.rest.entities.Message; -import org.apache.kafka.connect.runtime.rest.resources.ConnectResource; import org.apache.kafka.connect.runtime.standalone.StandaloneConfig; import org.apache.kafka.connect.sink.SinkConnector; import org.apache.kafka.connect.sink.SinkRecord; @@ -2417,7 +2417,7 @@ public void testResetOffsetsSinkConnector() throws Exception { // Expect the call to Admin::deleteConsumerGroups to have a timeout value equal to the overall timeout value of DEFAULT_REST_REQUEST_TIMEOUT_MS // minus the delay introduced in the call to Admin::listConsumerGroupOffsets (2000 ms) and the delay introduced in the call to // SinkConnector::alterOffsets (3000 ms) - assertEquals((int) ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS - 2000L - 3000L, + assertEquals((int) RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS - 2000L - 3000L, deleteConsumerGroupsOptionsArgumentCaptor.getValue().timeoutMs().intValue()); verify(admin, timeout(1000)).close(); verifyKafkaClusterId(); @@ -2469,7 +2469,7 @@ public void testModifySourceConnectorOffsetsTimeout() throws Exception { when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); when(sourceConnector.alterOffsets(eq(connectorProps), anyMap())).thenAnswer(invocation -> { - time.sleep(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000); + time.sleep(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000); return true; }); ConnectorOffsetBackingStore offsetStore = mock(ConnectorOffsetBackingStore.class); @@ -2507,7 +2507,7 @@ public void testModifyOffsetsSinkConnectorTimeout() throws Exception { when(plugins.withClassLoader(any(ClassLoader.class), any(Runnable.class))).thenAnswer(AdditionalAnswers.returnsSecondArg()); when(sinkConnector.alterOffsets(eq(connectorProps), anyMap())).thenAnswer(invocation -> { - time.sleep(ConnectResource.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000); + time.sleep(RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS + 1000); return true; }); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java index f2978678bbfff..1e06a77645313 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/ConnectRestServerTest.java @@ -40,6 +40,7 @@ import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.Mock; import org.mockito.junit.MockitoJUnitRunner; import org.slf4j.LoggerFactory; @@ -62,13 +63,13 @@ import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.doReturn; -import static org.mockito.Mockito.mock; @RunWith(MockitoJUnitRunner.StrictStubs.class) public class ConnectRestServerTest { - private Herder herder; - private Plugins plugins; + @Mock private RestClient restClient; + @Mock private Herder herder; + @Mock private Plugins plugins; private ConnectRestServer server; private CloseableHttpClient httpClient; private Collection responses = new ArrayList<>(); @@ -77,8 +78,6 @@ public class ConnectRestServerTest { @Before public void setUp() { - herder = mock(Herder.class); - plugins = mock(Plugins.class); httpClient = HttpClients.createMinimal(); } @@ -117,7 +116,7 @@ public void testAdvertisedUri() { Map configMap = new HashMap<>(baseServerProps()); configMap.put(RestServerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443"); - server = new ConnectRestServer(null, null, configMap); + server = new ConnectRestServer(null, restClient, configMap); Assert.assertEquals("http://localhost:8080/", server.advertisedUrl().toString()); server.stop(); @@ -126,7 +125,7 @@ public void testAdvertisedUri() { configMap.put(RestServerConfig.LISTENERS_CONFIG, "http://localhost:8080,https://localhost:8443"); configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "https"); - server = new ConnectRestServer(null, null, configMap); + server = new ConnectRestServer(null, restClient, configMap); Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString()); server.stop(); @@ -134,7 +133,7 @@ public void testAdvertisedUri() { configMap = new HashMap<>(baseServerProps()); configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://localhost:8443"); - server = new ConnectRestServer(null, null, configMap); + server = new ConnectRestServer(null, restClient, configMap); Assert.assertEquals("https://localhost:8443/", server.advertisedUrl().toString()); server.stop(); @@ -145,7 +144,7 @@ public void testAdvertisedUri() { configMap.put(RestServerConfig.REST_ADVERTISED_HOST_NAME_CONFIG, "somehost"); configMap.put(RestServerConfig.REST_ADVERTISED_PORT_CONFIG, "10000"); - server = new ConnectRestServer(null, null, configMap); + server = new ConnectRestServer(null, restClient, configMap); Assert.assertEquals("http://somehost:10000/", server.advertisedUrl().toString()); server.stop(); @@ -154,7 +153,7 @@ public void testAdvertisedUri() { configMap.put(RestServerConfig.LISTENERS_CONFIG, "https://encrypted-localhost:42069,http://plaintext-localhost:4761"); configMap.put(RestServerConfig.REST_ADVERTISED_LISTENER_CONFIG, "http"); - server = new ConnectRestServer(null, null, configMap); + server = new ConnectRestServer(null, restClient, configMap); Assert.assertEquals("http://plaintext-localhost:4761/", server.advertisedUrl().toString()); server.stop(); } @@ -167,7 +166,7 @@ public void testOptionsDoesNotIncludeWadlOutput() throws IOException { doReturn(plugins).when(herder).plugins(); expectEmptyRestExtensions(); - server = new ConnectRestServer(null, null, configMap); + server = new ConnectRestServer(null, restClient, configMap); server.initializeServer(); server.initializeResources(herder); @@ -194,7 +193,7 @@ public void checkCORSRequest(String corsDomain, String origin, String expectedHe expectEmptyRestExtensions(); doReturn(Arrays.asList("a", "b")).when(herder).connectors(); - server = new ConnectRestServer(null, null, configMap); + server = new ConnectRestServer(null, restClient, configMap); server.initializeServer(); server.initializeResources(herder); URI serverUrl = server.advertisedUrl(); @@ -237,7 +236,7 @@ public void testStandaloneConfig() throws IOException { expectEmptyRestExtensions(); doReturn(Arrays.asList("a", "b")).when(herder).connectors(); - server = new ConnectRestServer(null, null, configMap); + server = new ConnectRestServer(null, restClient, configMap); server.initializeServer(); server.initializeResources(herder); HttpRequest request = new HttpGet("/connectors"); @@ -260,7 +259,7 @@ public void testLoggerEndpointWithDefaults() throws IOException { doReturn(Collections.emptyList()).when(herder).setWorkerLoggerLevel(logger, loggingLevel); doReturn(Collections.singletonMap(logger, new LoggerLevel(loggingLevel, lastModified))).when(herder).allLoggerLevels(); - server = new ConnectRestServer(null, null, configMap); + server = new ConnectRestServer(null, restClient, configMap); server.initializeServer(); server.initializeResources(herder); @@ -295,7 +294,7 @@ public void testIndependentAdminEndpoint() throws IOException { LoggerFactory.getLogger("a.b.c.p.Y"); LoggerFactory.getLogger("a.b.c.p.Z"); - server = new ConnectRestServer(null, null, configMap); + server = new ConnectRestServer(null, restClient, configMap); server.initializeServer(); server.initializeResources(herder); @@ -317,7 +316,7 @@ public void testDisableAdminEndpoint() throws IOException { doReturn(plugins).when(herder).plugins(); expectEmptyRestExtensions(); - server = new ConnectRestServer(null, null, configMap); + server = new ConnectRestServer(null, restClient, configMap); server.initializeServer(); server.initializeResources(herder); @@ -336,7 +335,7 @@ public void testRequestLogs() throws IOException, InterruptedException { doReturn(plugins).when(herder).plugins(); expectEmptyRestExtensions(); - server = new ConnectRestServer(null, null, configMap); + server = new ConnectRestServer(null, restClient, configMap); server.initializeServer(); server.initializeResources(herder); @@ -382,7 +381,7 @@ private void checkCustomizedHttpResponseHeaders(String headerConfig, Map RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; + connectorPluginsResource = new ConnectorPluginsResource(herder, requestTimeout); } @Test diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java index aed081cf4d6f4..3395690b7ad04 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java @@ -28,6 +28,8 @@ import org.apache.kafka.connect.runtime.distributed.NotLeaderException; import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException; import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.RestRequestTimeout; +import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.runtime.rest.RestServerConfig; import org.apache.kafka.connect.runtime.rest.entities.ActiveTopicsInfo; import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; @@ -146,6 +148,8 @@ public class ConnectorsResourceTest { private static final Set CONNECTOR2_ACTIVE_TOPICS = new HashSet<>( Arrays.asList("foo_topic", "baz_topic")); + private static final RestRequestTimeout REQUEST_TIMEOUT = () -> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS; + @Mock private Herder herder; private ConnectorsResource connectorsResource; @@ -159,7 +163,7 @@ public class ConnectorsResourceTest { public void setUp() throws NoSuchMethodException { when(serverConfig.topicTrackingEnabled()).thenReturn(true); when(serverConfig.topicTrackingResetEnabled()).thenReturn(true); - connectorsResource = new ConnectorsResource(herder, serverConfig, restClient); + connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT); forward = mock(UriInfo.class); MultivaluedMap queryParams = new MultivaluedHashMap<>(); queryParams.putSingle("forward", "true"); @@ -742,7 +746,7 @@ public void testRestartTaskOwnerRedirect() throws Throwable { public void testConnectorActiveTopicsWithTopicTrackingDisabled() { when(serverConfig.topicTrackingEnabled()).thenReturn(false); when(serverConfig.topicTrackingResetEnabled()).thenReturn(false); - connectorsResource = new ConnectorsResource(herder, serverConfig, restClient); + connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT); Exception e = assertThrows(ConnectRestException.class, () -> connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME)); @@ -754,7 +758,7 @@ public void testResetConnectorActiveTopicsWithTopicTrackingDisabled() { when(serverConfig.topicTrackingEnabled()).thenReturn(false); when(serverConfig.topicTrackingResetEnabled()).thenReturn(true); HttpHeaders headers = mock(HttpHeaders.class); - connectorsResource = new ConnectorsResource(herder, serverConfig, restClient); + connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT); Exception e = assertThrows(ConnectRestException.class, () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers)); @@ -766,7 +770,7 @@ public void testResetConnectorActiveTopicsWithTopicTrackingEnabled() { when(serverConfig.topicTrackingEnabled()).thenReturn(true); when(serverConfig.topicTrackingResetEnabled()).thenReturn(false); HttpHeaders headers = mock(HttpHeaders.class); - connectorsResource = new ConnectorsResource(herder, serverConfig, restClient); + connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT); Exception e = assertThrows(ConnectRestException.class, () -> connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers)); @@ -779,7 +783,7 @@ public void testConnectorActiveTopics() { when(serverConfig.topicTrackingResetEnabled()).thenReturn(true); when(herder.connectorActiveTopics(CONNECTOR_NAME)) .thenReturn(new ActiveTopicsInfo(CONNECTOR_NAME, CONNECTOR_ACTIVE_TOPICS)); - connectorsResource = new ConnectorsResource(herder, serverConfig, restClient); + connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT); Response response = connectorsResource.getConnectorActiveTopics(CONNECTOR_NAME); assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); @@ -792,7 +796,7 @@ public void testConnectorActiveTopics() { @Test public void testResetConnectorActiveTopics() { HttpHeaders headers = mock(HttpHeaders.class); - connectorsResource = new ConnectorsResource(herder, serverConfig, restClient); + connectorsResource = new ConnectorsResource(herder, serverConfig, restClient, REQUEST_TIMEOUT); Response response = connectorsResource.resetConnectorActiveTopics(CONNECTOR_NAME, headers); verify(herder).resetConnectorActiveTopics(CONNECTOR_NAME); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java index 5bf33bf530061..0dff57fb59367 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/InternalConnectResourceTest.java @@ -21,6 +21,7 @@ import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.InternalRequestSignature; import org.apache.kafka.connect.runtime.rest.RestClient; +import org.apache.kafka.connect.runtime.rest.RestServer; import org.apache.kafka.connect.util.Callback; import org.junit.Before; import org.junit.Test; @@ -74,7 +75,7 @@ public class InternalConnectResourceTest { @Before public void setup() { - internalResource = new InternalConnectResource(herder, restClient); + internalResource = new InternalConnectResource(herder, restClient, () -> RestServer.DEFAULT_REST_REQUEST_TIMEOUT_MS); internalResource.uriInfo = uriInfo; }