Skip to content

Commit

Permalink
Fix bug when send multi messages in pulsar sender (#277)
Browse files Browse the repository at this point in the history
- Fix bug when send multi messages in pulsar sender
- Add corresponding test
  • Loading branch information
CodePrometheus authored Feb 13, 2025
1 parent 80676b9 commit 9927591
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -245,28 +245,27 @@ public Builder messageProps(Map<String, Object> messagePropsMap) {

void sender(byte[] message) {
if (closeCalled) throw new ClosedSenderException();
sendMessage(message);
try {
get().newMessage()
.value(message)
.loadConf(messageProps)
.sendAsync();
} catch (Exception e) {
cleanup();
throw new RuntimeException("Pulsar producer send message failed." + e.getMessage(), e);
}
}

void sendMessage(byte[] message) {
Producer<byte[]> get() {
if (client == null) {
synchronized (this) {
if (client == null) {
client = createClient();
producer = createProducer(client);

try {
producer.newMessage()
.value(message)
.loadConf(messageProps)
.sendAsync();
} catch (Exception e) {
cleanup();
throw new RuntimeException("Pulsar producer send failed." + e.getMessage(), e);
}
}
}
}
return producer;
}

Producer<byte[]> createProducer(PulsarClient client) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package zipkin2.reporter.pulsar;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
Expand Down Expand Up @@ -97,6 +98,28 @@ class ITPulsarSender {
}
}

@Test void send_multiple_JSON_messages() throws Exception {
try (PulsarSender sender = pulsar.newSenderBuilder(testName)
.encoding(Encoding.JSON)
.build()) {
int size = 10;
for (int i = 0; i < size; i++) {
send(sender, CLIENT_SPAN);
}
for (int i = 0; i < size; i++) {
assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender)))
.hasSize(1).containsExactly(CLIENT_SPAN);
}

send(sender, CLIENT_SPAN);
send(sender, CLIENT_SPAN, CLIENT_SPAN);
assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender)))
.hasSize(1).containsExactly(CLIENT_SPAN);
assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender)))
.hasSize(2).containsExactly(CLIENT_SPAN, CLIENT_SPAN);
}
}

@Test void illegalToSendWhenClosed() throws IOException {
try (PulsarSender sender = pulsar.newSenderBuilder(testName).build()) {
sender.close();
Expand Down Expand Up @@ -144,27 +167,34 @@ byte[] readMessage(PulsarSender sender) throws Exception {
final CountDownLatch countDown = new CountDownLatch(1);
final AtomicReference<byte[]> result = new AtomicReference<>();

try (Consumer<byte[]> ignored = sender.client.newConsumer()
Consumer<byte[]> consumer = null;
Message<byte[]> message = null;
try {
consumer = sender.client.newConsumer()
.topic(sender.topic)
.subscriptionName("zipkin-subscription")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.messageListener((consumer, message) -> {
try {
result.set(message.getData());
countDown.countDown();
consumer.acknowledge(message);
} catch (Exception e) {
consumer.negativeAcknowledge(message);
}
}).subscribe()) {

assertThat(countDown.await(10, TimeUnit.SECONDS))
.withFailMessage("Timed out waiting to read message.")
.isTrue();
assertThat(result)
.withFailMessage("Message data is null in Pulsar consumer.")
.isNotNull();
return result.get();
.subscribe();
message = consumer.receive(10, TimeUnit.SECONDS);
result.set(message.getData());
countDown.countDown();
consumer.acknowledge(message);
} catch (Exception e) {
if (consumer != null) {
consumer.negativeAcknowledge(message);
}
} finally {
if (consumer != null) {
consumer.close();
}
}

assertThat(countDown.await(10, TimeUnit.SECONDS))
.withFailMessage("Timed out waiting to read message.")
.isTrue();
assertThat(result)
.withFailMessage("Message data is null in Pulsar consumer.")
.isNotNull();
return result.get();
}
}

0 comments on commit 9927591

Please sign in to comment.