Skip to content

Commit

Permalink
[fix][client] Make DeadLetterPolicy & KeySharedPolicy serializable (#…
Browse files Browse the repository at this point in the history
…23718)

Co-authored-by: anurag.reddy <[email protected]>
  • Loading branch information
AnuragReddy2000 and anurag.reddy authored Dec 24, 2024
1 parent 1967a93 commit 14129e3
Show file tree
Hide file tree
Showing 7 changed files with 66 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -991,8 +991,8 @@ public void testBatchMessageDispatchingAccordingToPermits() throws Exception {

int numMsgs = 1000;
int batchMessages = 10;
final String topicName = "persistent://prop/ns-abc/testRetrieveSequenceIdSpecify-" + UUID.randomUUID();
final String subscriptionName = "sub-1";
final String topicName = "persistent://prop/ns-abc/testBatchMessageDispatchingAccordingToPermits-" + UUID.randomUUID();
final String subscriptionName = "bmdap-sub-1";

ConsumerImpl<byte[]> consumer1 = (ConsumerImpl<byte[]>) pulsarClient.newConsumer().topic(topicName)
.subscriptionName(subscriptionName).receiverQueueSize(10).subscriptionType(SubscriptionType.Shared)
Expand All @@ -1017,6 +1017,7 @@ public void testBatchMessageDispatchingAccordingToPermits() throws Exception {

producer.close();
consumer1.close();
consumer2.close();
}

@Test(dataProvider="testSubTypeAndEnableBatch")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import java.io.Serializable;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
Expand All @@ -36,7 +37,8 @@
@AllArgsConstructor
@InterfaceAudience.Public
@InterfaceStability.Stable
public class DeadLetterPolicy {
public class DeadLetterPolicy implements Serializable {
private static final long serialVersionUID = 1L;

/**
* Maximum number of times that a message will be redelivered before being sent to the dead letter queue.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
Expand All @@ -29,7 +30,7 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class KeySharedPolicy {
public abstract class KeySharedPolicy implements Serializable {

protected KeySharedMode keySharedMode;

Expand Down Expand Up @@ -82,6 +83,7 @@ public int getHashRangeTotal() {
* for message, the cursor will rewind.
*/
public static class KeySharedPolicySticky extends KeySharedPolicy {
private static final long serialVersionUID = 1L;

protected final List<Range> ranges;

Expand Down Expand Up @@ -129,6 +131,7 @@ public List<Range> getRanges() {
* Auto split hash range key shared policy.
*/
public static class KeySharedPolicyAutoSplit extends KeySharedPolicy {
private static final long serialVersionUID = 1L;

KeySharedPolicyAutoSplit() {
this.keySharedMode = KeySharedMode.AUTO_SPLIT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import java.io.Serializable;
import java.util.Objects;
import org.apache.pulsar.common.classification.InterfaceAudience;
import org.apache.pulsar.common.classification.InterfaceStability;
Expand All @@ -27,7 +28,8 @@
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class Range implements Comparable<Range> {
public class Range implements Comparable<Range>, Serializable {
private static final long serialVersionUID = 1L;

private final int start;
private final int end;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ public int getMaxPendingChuckedMessage() {
+ "When specifying the dead letter policy while not specifying `ackTimeoutMillis`, you can set the"
+ " ack timeout to 30000 millisecond."
)
private transient DeadLetterPolicy deadLetterPolicy;
private DeadLetterPolicy deadLetterPolicy;

private boolean retryEnable = false;

Expand Down Expand Up @@ -388,7 +388,7 @@ public int getMaxPendingChuckedMessage() {
private boolean resetIncludeHead = false;

@JsonIgnore
private transient KeySharedPolicy keySharedPolicy;
private KeySharedPolicy keySharedPolicy;

private boolean batchIndexAckEnabled = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ public class ReaderConfigurationData<T> implements Serializable, Cloneable {
)
private boolean resetIncludeHead = false;

private transient List<Range> keyHashRanges;
private List<Range> keyHashRanges;

private boolean poolMessages = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,18 @@
package org.apache.pulsar.client.impl.conf;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.regex.Pattern;

import lombok.Cleanup;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.SubscriptionType;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

Expand All @@ -45,4 +56,43 @@ public void testTopicConsumerConfigurationData(String topicName, int expectedPri

assertThat(topicConsumerConfigurationData.getPriorityLevel()).isEqualTo(expectedPriority);
}

@Test
public void testSerializable() throws Exception {
ConsumerConfigurationData<String> consumerConfigurationData = new ConsumerConfigurationData<>();
consumerConfigurationData.setPriorityLevel(1);
consumerConfigurationData.setSubscriptionName("my-sub");
consumerConfigurationData.setSubscriptionType(SubscriptionType.Shared);
consumerConfigurationData.setReceiverQueueSize(100);
consumerConfigurationData.setAckTimeoutMillis(1000);
consumerConfigurationData.setTopicNames(Collections.singleton("my-topic"));

DeadLetterPolicy deadLetterPolicy = DeadLetterPolicy.builder()
.maxRedeliverCount(10)
.retryLetterTopic("retry-topic")
.deadLetterTopic("dead-topic")
.build();
consumerConfigurationData.setDeadLetterPolicy(deadLetterPolicy);

@Cleanup
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@Cleanup
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(consumerConfigurationData);
byte[] serialized = bos.toByteArray();

// Deserialize
@Cleanup
ByteArrayInputStream bis = new ByteArrayInputStream(serialized);
@Cleanup
ObjectInputStream ois = new ObjectInputStream(bis);
Object object = ois.readObject();

Assert.assertEquals(object.getClass(), ConsumerConfigurationData.class);
Assert.assertEquals(object, consumerConfigurationData);

DeadLetterPolicy deserialisedDeadLetterPolicy = ((ConsumerConfigurationData<?>) object).getDeadLetterPolicy();
Assert.assertNotNull(deserialisedDeadLetterPolicy);
Assert.assertEquals(deserialisedDeadLetterPolicy, deadLetterPolicy);
}
}

0 comments on commit 14129e3

Please sign in to comment.