diff --git a/plugins/ingestion-kafka/build.gradle b/plugins/ingestion-kafka/build.gradle index c941fd1ba5e49..e81d75c5c5160 100644 --- a/plugins/ingestion-kafka/build.gradle +++ b/plugins/ingestion-kafka/build.gradle @@ -28,7 +28,9 @@ dependencies { // kafka api "org.slf4j:slf4j-api:${versions.slf4j}" api "org.apache.kafka:kafka-clients:${versions.kafka}" - api "org.xerial.snappy:snappy-java:${versions.snappy}" + + // dependencies of kafka-clients + runtimeOnly "org.xerial.snappy:snappy-java:${versions.snappy}" // test testImplementation "com.github.docker-java:docker-java-api:${versions.docker}" diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java index 8e3ac2e9694eb..e7d8e36acb302 100644 --- a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/IngestFromKafkaIT.java @@ -8,7 +8,7 @@ package org.opensearch.plugin.kafka; -import com.carrotsearch.randomizedtesting.annotations.ThreadLeakScope; +import com.carrotsearch.randomizedtesting.annotations.ThreadLeakFilters; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -45,12 +45,7 @@ /** * Integration test for Kafka ingestion */ - -// This test uses a Kafka test container which schedules a watcher daemon thread for monitoring hanging tests. -// The watcher thread sometimes is not stopped on time at the end of test execution resulting in thread leak check -// errors after which they are attempted to be stopped. Since these threads are outside the scope of this test and have -// no good way to stop them, we disable the checks using ThreadLeakScope.Scope.NONE -@ThreadLeakScope(ThreadLeakScope.Scope.NONE) +@ThreadLeakFilters(filters = TestContainerWatchdogThreadLeakFilter.class) public class IngestFromKafkaIT extends OpenSearchIntegTestCase { static final String topicName = "test"; diff --git a/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerWatchdogThreadLeakFilter.java b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerWatchdogThreadLeakFilter.java new file mode 100644 index 0000000000000..50b88c6233a46 --- /dev/null +++ b/plugins/ingestion-kafka/src/internalClusterTest/java/org/opensearch/plugin/kafka/TestContainerWatchdogThreadLeakFilter.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.kafka; + +import com.carrotsearch.randomizedtesting.ThreadFilter; + +/** + * The {@link org.testcontainers.images.TimeLimitedLoggedPullImageResultCallback} instance used by test containers, + * for example {@link org.testcontainers.containers.KafkaContainer} creates a watcher daemon thread which is never + * stopped. This filter excludes that thread from the thread leak detection logic. + */ +public final class TestContainerWatchdogThreadLeakFilter implements ThreadFilter { + @Override + public boolean reject(Thread t) { + return t.getName().startsWith("testcontainers-pull-watchdog-"); + } +}