From abbac5131db745b84b6bcccb3fc16d334fb04e1b Mon Sep 17 00:00:00 2001 From: Dave Maughan Date: Thu, 7 Jul 2022 13:07:17 +0100 Subject: [PATCH] [feature][client] PIP-184: Topic specific consumer priorityLevel Resolves #16481 --- .../pulsar/client/api/ConsumerBuilder.java | 16 +++++ .../client/api/TopicConsumerBuilder.java | 47 +++++++++++++++ .../client/impl/ConsumerBuilderImpl.java | 17 ++++++ .../pulsar/client/impl/ConsumerImpl.java | 5 +- .../client/impl/TopicConsumerBuilderImpl.java | 43 +++++++++++++ .../impl/conf/ConsumerConfigurationData.java | 16 +++++ .../conf/TopicConsumerConfigurationData.java | 48 +++++++++++++++ .../client/impl/ConsumerBuilderImplTest.java | 31 ++++++++++ .../pulsar/client/impl/ConsumerImplTest.java | 28 +++++++-- .../impl/TopicConsumerBuilderImplTest.java | 55 +++++++++++++++++ .../conf/ConsumerConfigurationDataTest.java | 48 +++++++++++++++ .../TopicConsumerConfigurationDataTest.java | 60 +++++++++++++++++++ 12 files changed, 409 insertions(+), 5 deletions(-) create mode 100644 pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java create mode 100644 pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java create mode 100644 pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java index e303e55538ea58..e20246aeaf1bf0 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ConsumerBuilder.java @@ -813,4 +813,20 @@ public interface ConsumerBuilder extends Cloneable { * @param enabled whether to enable AutoScaledReceiverQueueSize. */ ConsumerBuilder autoScaledReceiverQueueSizeEnabled(boolean enabled); + + /** + * Configure topic specific options to override those set at the {@link ConsumerBuilder} level. + * + * @param topicNameOrPattern a topic name or a regular expression to match a topic name + * @return a {@link TopicConsumerBuilder} instance + */ + TopicConsumerBuilder topicConfiguration(String topicNameOrPattern); + + /** + * Configure topic specific options to override those set at the {@link ConsumerBuilder} level. + * + * @param topicNameOrPattern a topic name or a regular expression to match a topic name + * @param builderConsumer a consumer to allow the configuration of the {@link TopicConsumerBuilder} instance + */ + ConsumerBuilder topicConfiguration(String topicNameOrPattern, java.util.function.Consumer> builderConsumer); } diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java new file mode 100644 index 00000000000000..5096f52577625e --- /dev/null +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TopicConsumerBuilder.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +/** + * {@link TopicConsumerBuilder} is used to configure topic specific options to override those set at the + * {@link ConsumerBuilder} level. + * + * @see ConsumerBuilder#topicConfiguration(String) + * + * @param the type of the value in the {@link ConsumerBuilder} + */ +public interface TopicConsumerBuilder { + /** + * Configure the priority level of this topic. + * + * @see ConsumerBuilder#priorityLevel(int) + * + * @param priorityLevel the priority of this topic + * @return the {@link TopicConsumerBuilder} instance + */ + TopicConsumerBuilder priorityLevel(int priorityLevel); + + /** + * Complete the configuration of the topic specific options and return control back to the + * {@link ConsumerBuilder} instance. + * + * @return the {@link ConsumerBuilder} instance + */ + ConsumerBuilder build(); +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java index 616ed86abb8a2e..98ccab210f0040 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerBuilderImpl.java @@ -52,8 +52,10 @@ import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.api.TopicConsumerBuilder; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; import org.apache.pulsar.client.util.RetryMessageUtil; import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.partition.PartitionedTopicMetadata; @@ -537,4 +539,19 @@ public ConsumerBuilder autoScaledReceiverQueueSizeEnabled(boolean enabled) { conf.setAutoScaledReceiverQueueSizeEnabled(enabled); return this; } + + @Override + public TopicConsumerBuilder topicConfiguration(String topicNameOrPattern) { + checkArgument(StringUtils.isNotBlank(topicNameOrPattern), "topicNameOrPattern cannot be blank"); + TopicConsumerConfigurationData topicConf = TopicConsumerConfigurationData.of(topicNameOrPattern, conf); + conf.getTopicConfigurations().add(topicConf); + return new TopicConsumerBuilderImpl<>(this, topicConf); + } + + @Override + public ConsumerBuilder topicConfiguration(String topicNameOrPattern, + java.util.function.Consumer> builderConsumer) { + builderConsumer.accept(topicConfiguration(topicNameOrPattern)); + return this; + } } diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index 16eb49b1af790e..fefbc660650af9 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -61,6 +61,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.function.Function; import java.util.stream.Collectors; +import lombok.AccessLevel; +import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.ConsumerCryptoFailureAction; @@ -146,6 +148,7 @@ public class ConsumerImpl extends ConsumerBase implements ConnectionHandle private final NegativeAcksTracker negativeAcksTracker; protected final ConsumerStatsRecorder stats; + @Getter(AccessLevel.PACKAGE) private final int priorityLevel; private final SubscriptionMode subscriptionMode; private volatile BatchMessageIdImpl startMessageId; @@ -266,7 +269,7 @@ protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurat this.partitionIndex = partitionIndex; this.hasParentConsumer = hasParentConsumer; this.parentConsumerHasListener = parentConsumerHasListener; - this.priorityLevel = conf.getPriorityLevel(); + this.priorityLevel = conf.getMatchingTopicConfiguration(topic).getPriorityLevel(); this.readCompacted = conf.isReadCompacted(); this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition(); this.negativeAcksTracker = new NegativeAcksTracker(this, conf); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java new file mode 100644 index 00000000000000..33f91366584ad3 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImpl.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static com.google.common.base.Preconditions.checkArgument; +import lombok.RequiredArgsConstructor; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.api.TopicConsumerBuilder; +import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; + +@RequiredArgsConstructor +class TopicConsumerBuilderImpl implements TopicConsumerBuilder { + private final ConsumerBuilder consumerBuilder; + private final TopicConsumerConfigurationData topicConf; + + @Override + public TopicConsumerBuilder priorityLevel(int priorityLevel) { + checkArgument(priorityLevel >= 0, "priorityLevel needs to be >= 0"); + topicConf.setPriorityLevel(priorityLevel); + return this; + } + + @Override + public ConsumerBuilder build() { + return consumerBuilder; + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 6c22d143a6f06d..92a439ae09398d 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -22,6 +22,8 @@ import com.fasterxml.jackson.annotation.JsonIgnore; import com.google.common.collect.Sets; import java.io.Serializable; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.SortedMap; @@ -164,6 +166,20 @@ public int getMaxPendingChuckedMessage() { private boolean autoScaledReceiverQueueSizeEnabled = false; + private List topicConfigurations = new ArrayList<>(); + + public TopicConsumerConfigurationData getMatchingTopicConfiguration(String topicName) { + return topicConfigurations.stream() + .filter(topicConf -> topicConf.matchesTopicName(topicName)) + .findFirst() + .orElseGet(() -> TopicConsumerConfigurationData.of(topicName, this)); + } + + public void setTopicConfigurations(List topicConfigurations) { + checkArgument(topicConfigurations != null, "topicConfigurations should not be null."); + this.topicConfigurations = topicConfigurations; + } + public void setAutoUpdatePartitionsIntervalSeconds(int interval, TimeUnit timeUnit) { checkArgument(interval > 0, "interval needs to be > 0"); this.autoUpdatePartitionsIntervalSeconds = timeUnit.toSeconds(interval); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java new file mode 100644 index 00000000000000..2037cec6a2b226 --- /dev/null +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationData.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.conf; + +import java.io.Serializable; +import java.util.regex.Pattern; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TopicConsumerConfigurationData implements Serializable { + private static final long serialVersionUID = 1L; + + private Pattern topicsPattern; + private int priorityLevel; + + public boolean matchesTopicName(String topicName) { + return topicsPattern.matcher(topicName).matches(); + } + + public static TopicConsumerConfigurationData of(String topicNameOrPattern, ConsumerConfigurationData conf) { + return of(topicNameOrPattern, conf.getPriorityLevel()); + } + + public static TopicConsumerConfigurationData of(String topicNameOrPattern, int priorityLevel) { + Pattern topicsPattern = Pattern.compile(topicNameOrPattern); + return new TopicConsumerConfigurationData(topicsPattern, priorityLevel); + } +} diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index ff60caca2c19a4..672eba746a576d 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -18,10 +18,13 @@ */ package org.apache.pulsar.client.impl; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.testng.Assert.assertNotNull; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -31,13 +34,16 @@ import java.util.regex.Pattern; import org.apache.pulsar.client.api.BatchReceivePolicy; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.ConsumerBuilder; import org.apache.pulsar.client.api.DeadLetterPolicy; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.apache.pulsar.client.api.SubscriptionMode; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; import org.testng.annotations.Test; /** @@ -338,4 +344,29 @@ public void testStartPaused() { consumerBuilderImpl.startPaused(true); verify(consumerBuilderImpl.getConf()).setStartPaused(true); } + + @Test + public void testTopicConsumerBuilder() { + List topicConsumerConfigurationDataList = new ArrayList<>(); + when(consumerBuilderImpl.getConf().getTopicConfigurations()).thenReturn(topicConsumerConfigurationDataList); + + ConsumerBuilder consumerBuilder = consumerBuilderImpl.topicConfiguration("foo").priorityLevel(1).build(); + + assertThat(consumerBuilder).isSameAs(consumerBuilderImpl); + assertThat(topicConsumerConfigurationDataList).hasSize(1); + TopicConsumerConfigurationData topicConsumerConfigurationData = topicConsumerConfigurationDataList.get(0); + assertThat(topicConsumerConfigurationData.getTopicsPattern().pattern()).isEqualTo("foo"); + assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1); + } + + @DataProvider(name = "nullOrBlankTopicPatterns") + public Object[] nullOrBlankTopicPatterns() { + return new Object[]{" ", "", null}; + } + + @Test(dataProvider = "nullOrBlankTopicPatterns") + public void testTopicConsumerBuilderBlankPattern(String topicNameOrPattern) { + assertThatIllegalArgumentException() + .isThrownBy(() -> consumerBuilderImpl.topicConfiguration(topicNameOrPattern)); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java index fcea9d490707db..9f95e4ffdbb67f 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.client.impl; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.Mockito.any; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; @@ -26,6 +27,8 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import java.util.Collections; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.ExecutorService; @@ -39,6 +42,7 @@ import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; import org.apache.pulsar.client.util.ExecutorProvider; import org.awaitility.Awaitility; import org.testng.Assert; @@ -47,23 +51,28 @@ import org.testng.annotations.Test; public class ConsumerImplTest { + private final String topic = "non-persistent://tenant/ns1/my-topic"; private ExecutorProvider executorProvider; private ExecutorService internalExecutor; private ConsumerImpl consumer; - private ConsumerConfigurationData consumerConf; + private ConsumerConfigurationData consumerConf; @BeforeMethod(alwaysRun = true) public void setUp() { + consumerConf = new ConsumerConfigurationData<>(); + createConsumer(consumerConf); + } + + private void createConsumer(ConsumerConfigurationData consumerConf) { executorProvider = new ExecutorProvider(1, "ConsumerImplTest"); internalExecutor = Executors.newSingleThreadScheduledExecutor(); - consumerConf = new ConsumerConfigurationData<>(); + PulsarClientImpl client = ClientTestFixtures.createPulsarClientMock(executorProvider, internalExecutor); ClientConfigurationData clientConf = client.getConfiguration(); clientConf.setOperationTimeoutMs(100); clientConf.setStatsIntervalSeconds(0); - CompletableFuture> subscribeFuture = new CompletableFuture<>(); - String topic = "non-persistent://tenant/ns1/my-topic"; + CompletableFuture> subscribeFuture = new CompletableFuture<>(); consumerConf.setSubscriptionName("test-sub"); consumer = ConsumerImpl.newConsumerImpl(client, topic, consumerConf, @@ -239,4 +248,15 @@ public void testMaxReceiverQueueSize() { Assert.assertEquals(consumer.getCurrentReceiverQueueSize(), size + 100); Assert.assertEquals(consumer.getAvailablePermits(), permits + 100); } + + @Test + public void testTopicPriorityLevel() { + ConsumerConfigurationData consumerConf = new ConsumerConfigurationData<>(); + consumerConf.getTopicConfigurations().add( + TopicConsumerConfigurationData.of(topic, 1)); + + createConsumer(consumerConf); + + assertThat(consumer.getPriorityLevel()).isEqualTo(1); + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java new file mode 100644 index 00000000000000..cfc2380cebe347 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/TopicConsumerBuilderImplTest.java @@ -0,0 +1,55 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl; + +import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import org.apache.pulsar.client.api.ConsumerBuilder; +import org.apache.pulsar.client.impl.conf.TopicConsumerConfigurationData; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +public class TopicConsumerBuilderImplTest { + private TopicConsumerConfigurationData topicConsumerConfigurationData; + private TopicConsumerBuilderImpl topicConsumerBuilderImpl; + + @SuppressWarnings("unchecked") + @BeforeMethod(alwaysRun = true) + public void setup() { + ConsumerBuilder consumerBuilder = mock(ConsumerBuilder.class); + topicConsumerConfigurationData = mock(TopicConsumerConfigurationData.class); + topicConsumerBuilderImpl = new TopicConsumerBuilderImpl<>(consumerBuilder, topicConsumerConfigurationData); + } + + @Test + public void testInvalidPriorityLevel() { + assertThatIllegalArgumentException() + .isThrownBy(() -> topicConsumerBuilderImpl.priorityLevel(-1)); + verify(topicConsumerConfigurationData, never()).setPriorityLevel(anyInt()); + } + + @Test + public void testValidPriorityLevel() { + topicConsumerBuilderImpl.priorityLevel(0); + verify(topicConsumerConfigurationData).setPriorityLevel(0); + } +} \ No newline at end of file diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java new file mode 100644 index 00000000000000..04f7ed4cc0d075 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationDataTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.conf; + +import static org.assertj.core.api.Assertions.assertThat; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class ConsumerConfigurationDataTest { + @DataProvider(name = "topicConf") + public Object[][] topicConf() { + return new Object[][] { + new Object[] {"foo", "^foo$", 2}, + new Object[] {"bar", "bar", 1} + }; + } + + @Test(dataProvider = "topicConf") + public void testTopicConsumerConfigurationData(String topicName, String expectedTopicPattern, int expectedPriority) { + ConsumerConfigurationData consumerConfigurationData = new ConsumerConfigurationData<>(); + consumerConfigurationData.setPriorityLevel(1); + + consumerConfigurationData.getTopicConfigurations() + .add(TopicConsumerConfigurationData.of("^foo$", 2)); + + TopicConsumerConfigurationData topicConsumerConfigurationData = + consumerConfigurationData.getMatchingTopicConfiguration(topicName); + + assertThat(topicConsumerConfigurationData.getTopicsPattern().pattern()).isEqualTo(expectedTopicPattern); + assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(expectedPriority); + } +} \ No newline at end of file diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java new file mode 100644 index 00000000000000..7a0f87a25d0380 --- /dev/null +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/TopicConsumerConfigurationDataTest.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.conf; + +import static org.assertj.core.api.Assertions.assertThat; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class TopicConsumerConfigurationDataTest { + @Test + public void test() { + TopicConsumerConfigurationData topicConsumerConfigurationData = TopicConsumerConfigurationData + .of("foo", 1); + + assertThat(topicConsumerConfigurationData.getTopicsPattern().pattern()).isEqualTo("foo"); + assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1); + } + + @Test + public void test2() { + ConsumerConfigurationData consumerConfigurationData = new ConsumerConfigurationData<>(); + consumerConfigurationData.setPriorityLevel(1); + TopicConsumerConfigurationData topicConsumerConfigurationData = TopicConsumerConfigurationData + .of("foo", consumerConfigurationData); + + assertThat(topicConsumerConfigurationData.getTopicsPattern().pattern()).isEqualTo("foo"); + assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(1); + } + + @DataProvider(name = "topicNameMatch") + public Object[][] topicNameMatch() { + return new Object[][] { + new Object[] {"foo", true}, + new Object[] {"bar", false} + }; + } + + @Test(dataProvider = "topicNameMatch") + public void testTopicNameMatch(String topicName, boolean expectedMatch) { + TopicConsumerConfigurationData topicConsumerConfigurationData = TopicConsumerConfigurationData + .of("^foo$", 1); + assertThat(topicConsumerConfigurationData.matchesTopicName(topicName)).isEqualTo(expectedMatch); + } +} \ No newline at end of file