Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][broker] Key_Shared subscription: Reject consumers with incompatible policy #23449

Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,19 @@ public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
} else {
if (consumer.subType() != dispatcher.getType()) {
return FutureUtil.failedFuture(new SubscriptionBusyException("Subscription is of different type"));
} else if (dispatcher.getType() == SubType.Key_Shared) {
KeySharedMeta dispatcherKsm = dispatcher.getConsumers().get(0).getKeySharedMeta();
KeySharedMeta consumerKsm = consumer.getKeySharedMeta();
if (dispatcherKsm.getKeySharedMode() != consumerKsm.getKeySharedMode()) {
return FutureUtil.failedFuture(
new SubscriptionBusyException("Subscription is of different key_shared mode"));
}
if (dispatcherKsm.isAllowOutOfOrderDelivery() != consumerKsm.isAllowOutOfOrderDelivery()) {
return FutureUtil.failedFuture(
new SubscriptionBusyException(dispatcherKsm.isAllowOutOfOrderDelivery()
? "Subscription allows out of order delivery" :
"Subscription does not allow out of order delivery"));
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,19 @@ private CompletableFuture<Void> addConsumerInternal(Consumer consumer) {
if (consumer.subType() != dispatcher.getType()) {
return FutureUtil.failedFuture(
new SubscriptionBusyException("Subscription is of different type"));
} else if (dispatcher.getType() == SubType.Key_Shared) {
KeySharedMeta dispatcherKsm = dispatcher.getConsumers().get(0).getKeySharedMeta();
KeySharedMeta consumerKsm = consumer.getKeySharedMeta();
if (dispatcherKsm.getKeySharedMode() != consumerKsm.getKeySharedMode()) {
return FutureUtil.failedFuture(
new SubscriptionBusyException("Subscription is of different key_shared mode"));
}
if (dispatcherKsm.isAllowOutOfOrderDelivery() != consumerKsm.isAllowOutOfOrderDelivery()) {
return FutureUtil.failedFuture(
new SubscriptionBusyException(dispatcherKsm.isAllowOutOfOrderDelivery()
? "Subscription allows out of order delivery" :
"Subscription does not allow out of order delivery"));
lhotari marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;

import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.testng.annotations.DataProvider;

public abstract class SubscriptionTestBase {
lhotari marked this conversation as resolved.
Show resolved Hide resolved

protected final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
protected final String subName = "subscriptionName";

@DataProvider(name = "incompatibleKeySharedPolicies")
public Object[][] incompatibleKeySharedPolicies() {
KeySharedMeta ksmSticky = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY);
ksmSticky.addHashRange().setStart(0).setEnd(2);

KeySharedMeta ksmStickyAllowOutOfOrder = new KeySharedMeta().setKeySharedMode(KeySharedMode.STICKY)
.setAllowOutOfOrderDelivery(true);
ksmStickyAllowOutOfOrder.addHashRange().setStart(3).setEnd(5);

KeySharedMeta ksmAutoSplit = new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT);
KeySharedMeta ksmAutoSplitAllowOutOfOrder = new KeySharedMeta().setKeySharedMode(KeySharedMode.AUTO_SPLIT)
.setAllowOutOfOrderDelivery(true);

String errorMessageDifferentMode = "Subscription is of different key_shared mode";
String errorMessageOutOfOrderNotAllowed = "Subscription does not allow out of order delivery";
String errorMessageOutOfOrderAllowed = "Subscription allows out of order delivery";

return new Object[][] {
{ ksmAutoSplit, ksmSticky, errorMessageDifferentMode },
{ ksmAutoSplit, ksmStickyAllowOutOfOrder, errorMessageDifferentMode },
{ ksmAutoSplit, ksmAutoSplitAllowOutOfOrder, errorMessageOutOfOrderNotAllowed },

{ ksmAutoSplitAllowOutOfOrder, ksmSticky, errorMessageDifferentMode },
{ ksmAutoSplitAllowOutOfOrder, ksmStickyAllowOutOfOrder, errorMessageDifferentMode },
{ ksmAutoSplitAllowOutOfOrder, ksmAutoSplit, errorMessageOutOfOrderAllowed },

{ ksmSticky, ksmStickyAllowOutOfOrder, errorMessageOutOfOrderNotAllowed },
{ ksmSticky, ksmAutoSplit, errorMessageDifferentMode },
{ ksmSticky, ksmAutoSplitAllowOutOfOrder, errorMessageDifferentMode },

{ ksmStickyAllowOutOfOrder, ksmSticky, errorMessageOutOfOrderAllowed },
{ ksmStickyAllowOutOfOrder, ksmAutoSplit, errorMessageDifferentMode },
{ ksmStickyAllowOutOfOrder, ksmAutoSplitAllowOutOfOrder, errorMessageDifferentMode }
};
}

protected Consumer createKeySharedMockConsumer(String name, KeySharedMeta ksm) {
Consumer consumer = BrokerTestUtil.createMockConsumer(name);
doReturn(CommandSubscribe.SubType.Key_Shared).when(consumer).subType();
doReturn(ksm).when(consumer).getKeySharedMeta();
doReturn(mock(PendingAcksMap.class)).when(consumer).getPendingAcks();
return consumer;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.nonpersistent;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.SubscriptionTestBase;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class NonPersistentSubscriptionTest extends SubscriptionTestBase {

@Test(dataProvider = "incompatibleKeySharedPolicies")
public void testIncompatibleKeySharedPoliciesNotAllowed(KeySharedMeta consumer1Ksm, KeySharedMeta consumer2Ksm,
String expectedErrorMessage) throws Exception {
PulsarTestContext context = PulsarTestContext.builderForNonStartableContext().build();
lhotari marked this conversation as resolved.
Show resolved Hide resolved
NonPersistentTopic topic = new NonPersistentTopic(successTopicName, context.getBrokerService());
NonPersistentSubscription sub = new NonPersistentSubscription(topic, subName, Map.of());

// two consumers with incompatible key_shared policies
Consumer keySharedConsumerMock1 = createKeySharedMockConsumer("consumer-1", consumer1Ksm);
Consumer keySharedConsumerMock2 = createKeySharedMockConsumer("consumer-2", consumer2Ksm);

// first consumer defines key_shared mode of subscription and whether out of order delivery is allowed
sub.addConsumer(keySharedConsumerMock1).get(5, TimeUnit.SECONDS);

try {
// add second consumer with incompatible key_shared policy
sub.addConsumer(keySharedConsumerMock2).get(5, TimeUnit.SECONDS);
fail(SubscriptionBusyException.class.getSimpleName() + " not thrown");
} catch (Exception e) {
// subscription throws exception when consumer with incompatible key_shared policy is added
Throwable cause = e.getCause();
assertTrue(cause instanceof SubscriptionBusyException);
assertEquals(cause.getMessage(), expectedErrorMessage);
}

context.close();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import io.netty.channel.EventLoopGroup;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -35,7 +34,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
Expand All @@ -46,7 +45,9 @@
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.commons.lang3.tuple.MutablePair;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.SubscriptionTestBase;
import org.apache.pulsar.broker.testcontext.PulsarTestContext;
import org.apache.pulsar.broker.transaction.buffer.impl.InMemTransactionBufferProvider;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
Expand All @@ -56,18 +57,17 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.TxnAction;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.transaction.common.exception.TransactionConflictException;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

@Test(groups = "broker")
public class PersistentSubscriptionTest {
public class PersistentSubscriptionTest extends SubscriptionTestBase {

private PulsarTestContext pulsarTestContext;
private ManagedLedger ledgerMock;
Expand All @@ -77,17 +77,9 @@ public class PersistentSubscriptionTest {
private Consumer consumerMock;
private ManagedLedgerConfig managedLedgerConfigMock;

final String successTopicName = "persistent://prop/use/ns-abc/successTopic";
final String subName = "subscriptionName";

final TxnID txnID1 = new TxnID(1,1);
final TxnID txnID2 = new TxnID(1,2);

private static final Logger log = LoggerFactory.getLogger(PersistentTopicTest.class);

private OrderedExecutor executor;
private EventLoopGroup eventLoopGroup;

@BeforeMethod
public void setup() throws Exception {
pulsarTestContext = PulsarTestContext.builderForNonStartableContext()
Expand Down Expand Up @@ -235,6 +227,34 @@ public void testAcknowledgeUpdateCursorLastActive() throws Exception {
assertTrue(persistentSubscription.cursor.getLastActive() > beforeAcknowledgeTimestamp);
}

@Test(dataProvider = "incompatibleKeySharedPolicies")
public void testIncompatibleKeySharedPoliciesNotAllowed(KeySharedMeta consumer1Ksm, KeySharedMeta consumer2Ksm,
String expectedErrorMessage) throws Exception {
PulsarTestContext context = PulsarTestContext.builderForNonStartableContext().build();
PersistentTopic topic = new PersistentTopic(successTopicName, ledgerMock, context.getBrokerService());
PersistentSubscription sub = new PersistentSubscription(topic, subName, cursorMock, false);

// two consumers with incompatible key_shared policies
Consumer keySharedConsumerMock1 = createKeySharedMockConsumer("consumer-1", consumer1Ksm);
Consumer keySharedConsumerMock2 = createKeySharedMockConsumer("consumer-2", consumer2Ksm);

// first consumer defines key_shared mode of subscription and whether out of order delivery is allowed
sub.addConsumer(keySharedConsumerMock1).get(5, TimeUnit.SECONDS);

try {
// add second consumer with incompatible key_shared policy
sub.addConsumer(keySharedConsumerMock2).get(5, TimeUnit.SECONDS);
fail(SubscriptionBusyException.class.getSimpleName() + " not thrown");
} catch (Exception e) {
// subscription throws exception when consumer with incompatible key_shared policy is added
Throwable cause = e.getCause();
assertTrue(cause instanceof SubscriptionBusyException);
assertEquals(cause.getMessage(), expectedErrorMessage);
}

context.close();
}

public static class CustomTransactionPendingAckStoreProvider implements TransactionPendingAckStoreProvider {
@Override
public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscription subscription) {
Expand Down
Loading