From 109f04251f824405e88702eee1053a3663f1b742 Mon Sep 17 00:00:00 2001 From: Zixuan Liu Date: Fri, 20 Dec 2024 20:58:03 +0800 Subject: [PATCH] [improve][client] Make replicateSubscriptionState nullable (#23757) Signed-off-by: Zixuan Liu (cherry picked from commit 3fce3097c76a9c8cb64cf3d8d87f6e050e6cb3a5) --- .../pulsar/broker/service/ServerCnx.java | 4 +- .../broker/service/SubscriptionOption.java | 2 +- .../nonpersistent/NonPersistentTopic.java | 4 +- .../persistent/PersistentSubscription.java | 19 +++- .../service/persistent/PersistentTopic.java | 19 ++-- .../client/api/ReplicateSubscriptionTest.java | 96 +++++++++++++++++++ .../pulsar/client/impl/ConsumerImpl.java | 2 +- .../impl/conf/ConsumerConfigurationData.java | 14 ++- .../client/impl/ConsumerBuilderImplTest.java | 36 ++++++- .../pulsar/common/protocol/Commands.java | 8 +- 10 files changed, 176 insertions(+), 28 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java index ca91981963bde..8be363171dfab 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnx.java @@ -1229,8 +1229,8 @@ protected void handleSubscribe(final CommandSubscribe subscribe) { ? subscribe.getStartMessageRollbackDurationSec() : -1; final SchemaData schema = subscribe.hasSchema() ? getSchema(subscribe.getSchema()) : null; - final boolean isReplicated = subscribe.hasReplicateSubscriptionState() - && subscribe.isReplicateSubscriptionState(); + final Boolean isReplicated = + subscribe.hasReplicateSubscriptionState() ? subscribe.isReplicateSubscriptionState() : null; final boolean forceTopicCreation = subscribe.isForceTopicCreation(); final KeySharedMeta keySharedMeta = subscribe.hasKeySharedMeta() ? new KeySharedMeta().copyFrom(subscribe.getKeySharedMeta()) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java index af56d023616b4..328e7618f8cd8 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/SubscriptionOption.java @@ -46,7 +46,7 @@ public class SubscriptionOption { private boolean readCompacted; private CommandSubscribe.InitialPosition initialPosition; private long startMessageRollbackDurationSec; - private boolean replicatedSubscriptionStateArg; + private Boolean replicatedSubscriptionStateArg; private KeySharedMeta keySharedMeta; private Optional> subscriptionProperties; private long consumerEpoch; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java index 9d2f3588dc995..95df15956a47b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java @@ -267,7 +267,7 @@ public CompletableFuture subscribe(SubscriptionOption option) { return internalSubscribe(option.getCnx(), option.getSubscriptionName(), option.getConsumerId(), option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(), - option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(), + option.getStartMessageRollbackDurationSec(), option.getReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(null), option.getSchemaType()); } @@ -290,7 +290,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St String consumerName, MessageId startMessageId, Map metadata, boolean readCompacted, long resetStartMessageBackInSec, - boolean replicateSubscriptionState, + Boolean replicateSubscriptionState, KeySharedMeta keySharedMeta, Map subscriptionProperties, SchemaType schemaType) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java index fde6a0716eb30..4ae289d7bae15 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentSubscription.java @@ -135,9 +135,11 @@ public class PersistentSubscription extends AbstractSubscription { private volatile Map subscriptionProperties; private volatile CompletableFuture fenceFuture; private volatile CompletableFuture inProgressResetCursorFuture; + private volatile Boolean replicatedControlled; - static Map getBaseCursorProperties(boolean isReplicated) { - return isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES; + static Map getBaseCursorProperties(Boolean isReplicated) { + return isReplicated != null && isReplicated ? REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES : + NON_REPLICATED_SUBSCRIPTION_CURSOR_PROPERTIES; } static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) { @@ -145,19 +147,21 @@ static boolean isCursorFromReplicatedSubscription(ManagedCursor cursor) { } public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, - boolean replicated) { + Boolean replicated) { this(topic, subscriptionName, cursor, replicated, Collections.emptyMap()); } public PersistentSubscription(PersistentTopic topic, String subscriptionName, ManagedCursor cursor, - boolean replicated, Map subscriptionProperties) { + Boolean replicated, Map subscriptionProperties) { this.topic = topic; this.cursor = cursor; this.topicName = topic.getName(); this.subName = subscriptionName; this.fullName = MoreObjects.toStringHelper(this).add("topic", topicName).add("name", subName).toString(); this.expiryMonitor = new PersistentMessageExpiryMonitor(topic, subscriptionName, cursor, this); - this.setReplicated(replicated); + if (replicated != null) { + this.setReplicated(replicated); + } this.subscriptionProperties = MapUtils.isEmpty(subscriptionProperties) ? Collections.emptyMap() : Collections.unmodifiableMap(subscriptionProperties); if (topic.getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled() @@ -196,6 +200,7 @@ public boolean isReplicated() { } public boolean setReplicated(boolean replicated) { + replicatedControlled = replicated; ServiceConfiguration config = topic.getBrokerService().getPulsar().getConfig(); if (!replicated || !config.isEnableReplicatedSubscriptions()) { @@ -1515,4 +1520,8 @@ public PositionInPendingAckStats checkPositionInPendingAckState(PositionImpl pos private static final Logger log = LoggerFactory.getLogger(PersistentSubscription.class); + @VisibleForTesting + public Boolean getReplicatedControlled() { + return replicatedControlled; + } } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 20bdbe4effc1c..af3e021a97cbc 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -535,7 +535,7 @@ private void createPersistentSubscriptions() { } else { final String subscriptionName = Codec.decode(cursor.getName()); subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor, - PersistentSubscription.isCursorFromReplicatedSubscription(cursor), + PersistentSubscription.isCursorFromReplicatedSubscription(cursor) ? true : null, cursor.getCursorProperties())); // subscription-cursor gets activated by default: deactivate as there is no active subscription // right now @@ -606,7 +606,7 @@ public CompletableFuture unloadSubscription(@Nonnull String subName) { } private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor, - boolean replicated, Map subscriptionProperties) { + Boolean replicated, Map subscriptionProperties) { requireNonNull(topicCompactionService); if (isCompactionSubscription(subscriptionName) && topicCompactionService instanceof PulsarTopicCompactionService pulsarTopicCompactionService) { @@ -931,7 +931,7 @@ public CompletableFuture subscribe(SubscriptionOption option) { option.getSubType(), option.getPriorityLevel(), option.getConsumerName(), option.isDurable(), option.getStartMessageId(), option.getMetadata(), option.isReadCompacted(), option.getInitialPosition(), option.getStartMessageRollbackDurationSec(), - option.isReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), + option.getReplicatedSubscriptionStateArg(), option.getKeySharedMeta(), option.getSubscriptionProperties().orElse(Collections.emptyMap()), option.getConsumerEpoch(), option.getSchemaType()); } @@ -943,7 +943,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St Map metadata, boolean readCompacted, InitialPosition initialPosition, long startMessageRollbackDurationSec, - boolean replicatedSubscriptionStateArg, + Boolean replicatedSubscriptionStateArg, KeySharedMeta keySharedMeta, Map subscriptionProperties, long consumerEpoch, @@ -954,12 +954,9 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St } return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> { - boolean replicatedSubscriptionState = replicatedSubscriptionStateArg; - - if (replicatedSubscriptionState + if (replicatedSubscriptionStateArg != null && replicatedSubscriptionStateArg && !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) { log.warn("[{}] Replicated Subscription is disabled by broker.", getName()); - replicatedSubscriptionState = false; } if (subType == SubType.Key_Shared @@ -1028,7 +1025,7 @@ private CompletableFuture internalSubscribe(final TransportCnx cnx, St CompletableFuture subscriptionFuture = isDurable ? getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, - replicatedSubscriptionState, subscriptionProperties) + replicatedSubscriptionStateArg, subscriptionProperties) : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec, readCompacted, subscriptionProperties); @@ -1125,7 +1122,7 @@ public CompletableFuture subscribe(final TransportCnx cnx, String subs private CompletableFuture getDurableSubscription(String subscriptionName, InitialPosition initialPosition, long startMessageRollbackDurationSec, - boolean replicated, + Boolean replicated, Map subscriptionProperties) { CompletableFuture subscriptionFuture = new CompletableFuture<>(); if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) { @@ -1156,7 +1153,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) { return; } } - if (replicated && !subscription.isReplicated()) { + if (replicated != null && replicated && !subscription.isReplicated()) { // Flip the subscription state subscription.setReplicated(replicated); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java new file mode 100644 index 0000000000000..327081bf1b9c8 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ReplicateSubscriptionTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import lombok.Cleanup; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.Topic; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.client.impl.ConsumerBuilderImpl; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +public class ReplicateSubscriptionTest extends ProducerConsumerBase { + + @BeforeClass + @Override + protected void setup() throws Exception { + super.internalSetup(); + super.producerBaseSetup(); + } + + @AfterClass(alwaysRun = true) + @Override + protected void cleanup() throws Exception { + super.internalCleanup(); + } + + @Override + protected void doInitConf() throws Exception { + super.doInitConf(); + } + + @DataProvider + public Object[] replicateSubscriptionState() { + return new Object[]{ + Boolean.TRUE, + Boolean.FALSE, + null + }; + } + + @Test(dataProvider = "replicateSubscriptionState") + public void testReplicateSubscriptionState(Boolean replicateSubscriptionState) + throws Exception { + String topic = "persistent://my-property/my-ns/" + System.nanoTime(); + String subName = "sub-" + System.nanoTime(); + ConsumerBuilder consumerBuilder = pulsarClient.newConsumer(Schema.STRING) + .topic(topic) + .subscriptionName(subName); + if (replicateSubscriptionState != null) { + consumerBuilder.replicateSubscriptionState(replicateSubscriptionState); + } + ConsumerBuilderImpl consumerBuilderImpl = (ConsumerBuilderImpl) consumerBuilder; + assertEquals(consumerBuilderImpl.getConf().getReplicateSubscriptionState(), replicateSubscriptionState); + @Cleanup + Consumer ignored = consumerBuilder.subscribe(); + CompletableFuture> topicIfExists = pulsar.getBrokerService().getTopicIfExists(topic); + assertThat(topicIfExists) + .succeedsWithin(3, TimeUnit.SECONDS) + .matches(optionalTopic -> { + assertTrue(optionalTopic.isPresent()); + Topic topicRef = optionalTopic.get(); + Subscription subscription = topicRef.getSubscription(subName); + assertNotNull(subscription); + assertTrue(subscription instanceof PersistentSubscription); + PersistentSubscription persistentSubscription = (PersistentSubscription) subscription; + assertEquals(persistentSubscription.getReplicatedControlled(), replicateSubscriptionState); + return true; + }); + } +} diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java index df5d2f7184ff8..8292fc35755dc 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java @@ -899,7 +899,7 @@ public CompletableFuture connectionOpened(final ClientCnx cnx) { synchronized (this) { ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(), priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted, - conf.isReplicateSubscriptionState(), + conf.getReplicateSubscriptionState(), InitialPosition.valueOf(subscriptionInitialPosition.getValue()), startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(), // Use the current epoch to subscribe. diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java index 3ae0e977d13c4..80a8594024075 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ConsumerConfigurationData.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkArgument; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Sets; import io.swagger.annotations.ApiModelProperty; import java.io.Serializable; @@ -378,7 +379,8 @@ public int getMaxPendingChuckedMessage() { value = "If `replicateSubscriptionState` is enabled, a subscription state is replicated to geo-replicated" + " clusters." ) - private boolean replicateSubscriptionState = false; + @JsonProperty(access = JsonProperty.Access.READ_WRITE) + private Boolean replicateSubscriptionState; private boolean resetIncludeHead = false; @@ -434,4 +436,14 @@ public ConsumerConfigurationData clone() { throw new RuntimeException("Failed to clone ConsumerConfigurationData"); } } + + /** + * Backward compatibility with the old `replicateSubscriptionState` field. + * @deprecated Using {@link #getReplicateSubscriptionState()} instead. + */ + @JsonIgnore + @Deprecated + public boolean isReplicateSubscriptionState() { + return replicateSubscriptionState != null && replicateSubscriptionState; + } } diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java index e4b7b4d1ec85e..c103712d40055 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerBuilderImplTest.java @@ -504,7 +504,7 @@ public void testLoadConf() throws Exception { assertTrue(configurationData.isRetryEnable()); assertFalse(configurationData.isAutoUpdatePartitions()); assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 2); - assertTrue(configurationData.isReplicateSubscriptionState()); + assertEquals(configurationData.getReplicateSubscriptionState(), Boolean.TRUE); assertTrue(configurationData.isResetIncludeHead()); assertTrue(configurationData.isBatchIndexAckEnabled()); assertTrue(configurationData.isAckReceiptEnabled()); @@ -564,7 +564,7 @@ public void testLoadConfNotModified() { assertFalse(configurationData.isRetryEnable()); assertTrue(configurationData.isAutoUpdatePartitions()); assertEquals(configurationData.getAutoUpdatePartitionsIntervalSeconds(), 60); - assertFalse(configurationData.isReplicateSubscriptionState()); + assertNull(configurationData.getReplicateSubscriptionState()); assertFalse(configurationData.isResetIncludeHead()); assertFalse(configurationData.isBatchIndexAckEnabled()); assertFalse(configurationData.isAckReceiptEnabled()); @@ -584,6 +584,38 @@ public void testLoadConfNotModified() { assertNull(configurationData.getPayloadProcessor()); } + @Test + public void testReplicateSubscriptionState() { + ConsumerBuilderImpl consumerBuilder = createConsumerBuilder(); + assertNull(consumerBuilder.getConf().getReplicateSubscriptionState()); + + consumerBuilder.replicateSubscriptionState(true); + assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.TRUE); + + consumerBuilder.replicateSubscriptionState(false); + assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.FALSE); + + Map conf = new HashMap<>(); + consumerBuilder = createConsumerBuilder(); + consumerBuilder.loadConf(conf); + assertNull(consumerBuilder.getConf().getReplicateSubscriptionState()); + + conf.put("replicateSubscriptionState", true); + consumerBuilder = createConsumerBuilder(); + consumerBuilder.loadConf(conf); + assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.TRUE); + + conf.put("replicateSubscriptionState", false); + consumerBuilder = createConsumerBuilder(); + consumerBuilder.loadConf(conf); + assertEquals(consumerBuilder.getConf().getReplicateSubscriptionState(), Boolean.FALSE); + + conf.put("replicateSubscriptionState", null); + consumerBuilder = createConsumerBuilder(); + consumerBuilder.loadConf(conf); + assertNull(consumerBuilder.getConf().getReplicateSubscriptionState()); + } + private ConsumerBuilderImpl createConsumerBuilder() { ConsumerBuilderImpl consumerBuilder = new ConsumerBuilderImpl<>(null, Schema.BYTES); Map properties = new HashMap<>(); diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java index 21604e2c944c9..2147aa8a8714b 100644 --- a/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java +++ b/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java @@ -583,7 +583,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map metadata, boolean readCompacted, boolean isReplicated, + Map metadata, boolean readCompacted, Boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist) { return newSubscribe(topic, subscription, consumerId, requestId, subType, priorityLevel, consumerName, @@ -594,7 +594,7 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu public static ByteBuf newSubscribe(String topic, String subscription, long consumerId, long requestId, SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageIdData startMessageId, - Map metadata, boolean readCompacted, boolean isReplicated, + Map metadata, boolean readCompacted, Boolean isReplicated, InitialPosition subscriptionInitialPosition, long startMessageRollbackDurationInSec, SchemaInfo schemaInfo, boolean createTopicIfDoesNotExist, KeySharedPolicy keySharedPolicy, Map subscriptionProperties, long consumerEpoch) { @@ -610,9 +610,11 @@ public static ByteBuf newSubscribe(String topic, String subscription, long consu .setDurable(isDurable) .setReadCompacted(readCompacted) .setInitialPosition(subscriptionInitialPosition) - .setReplicateSubscriptionState(isReplicated) .setForceTopicCreation(createTopicIfDoesNotExist) .setConsumerEpoch(consumerEpoch); + if (isReplicated != null) { + subscribe.setReplicateSubscriptionState(isReplicated); + } if (subscriptionProperties != null && !subscriptionProperties.isEmpty()) { List keyValues = new ArrayList<>();