diff --git a/pulsar-client/src/main/java/zipkin2/reporter/pulsar/PulsarSender.java b/pulsar-client/src/main/java/zipkin2/reporter/pulsar/PulsarSender.java index 579a9e79..2a65b61c 100644 --- a/pulsar-client/src/main/java/zipkin2/reporter/pulsar/PulsarSender.java +++ b/pulsar-client/src/main/java/zipkin2/reporter/pulsar/PulsarSender.java @@ -245,28 +245,27 @@ public Builder messageProps(Map messagePropsMap) { void sender(byte[] message) { if (closeCalled) throw new ClosedSenderException(); - sendMessage(message); + try { + get().newMessage() + .value(message) + .loadConf(messageProps) + .sendAsync(); + } catch (Exception e) { + cleanup(); + throw new RuntimeException("Pulsar producer send message failed." + e.getMessage(), e); + } } - void sendMessage(byte[] message) { + Producer get() { if (client == null) { synchronized (this) { if (client == null) { client = createClient(); producer = createProducer(client); - - try { - producer.newMessage() - .value(message) - .loadConf(messageProps) - .sendAsync(); - } catch (Exception e) { - cleanup(); - throw new RuntimeException("Pulsar producer send failed." + e.getMessage(), e); - } } } } + return producer; } Producer createProducer(PulsarClient client) { diff --git a/pulsar-client/src/test/java/zipkin2/reporter/pulsar/ITPulsarSender.java b/pulsar-client/src/test/java/zipkin2/reporter/pulsar/ITPulsarSender.java index 8fe4dc7a..1e09b8e1 100644 --- a/pulsar-client/src/test/java/zipkin2/reporter/pulsar/ITPulsarSender.java +++ b/pulsar-client/src/test/java/zipkin2/reporter/pulsar/ITPulsarSender.java @@ -5,6 +5,7 @@ package zipkin2.reporter.pulsar; import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.SubscriptionInitialPosition; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Tag; @@ -97,6 +98,28 @@ class ITPulsarSender { } } + @Test void send_multiple_JSON_messages() throws Exception { + try (PulsarSender sender = pulsar.newSenderBuilder(testName) + .encoding(Encoding.JSON) + .build()) { + int size = 10; + for (int i = 0; i < size; i++) { + send(sender, CLIENT_SPAN); + } + for (int i = 0; i < size; i++) { + assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))) + .hasSize(1).containsExactly(CLIENT_SPAN); + } + + send(sender, CLIENT_SPAN); + send(sender, CLIENT_SPAN, CLIENT_SPAN); + assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))) + .hasSize(1).containsExactly(CLIENT_SPAN); + assertThat(SpanBytesDecoder.JSON_V2.decodeList(readMessage(sender))) + .hasSize(2).containsExactly(CLIENT_SPAN, CLIENT_SPAN); + } + } + @Test void illegalToSendWhenClosed() throws IOException { try (PulsarSender sender = pulsar.newSenderBuilder(testName).build()) { sender.close(); @@ -144,27 +167,34 @@ byte[] readMessage(PulsarSender sender) throws Exception { final CountDownLatch countDown = new CountDownLatch(1); final AtomicReference result = new AtomicReference<>(); - try (Consumer ignored = sender.client.newConsumer() + Consumer consumer = null; + Message message = null; + try { + consumer = sender.client.newConsumer() .topic(sender.topic) .subscriptionName("zipkin-subscription") .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .messageListener((consumer, message) -> { - try { - result.set(message.getData()); - countDown.countDown(); - consumer.acknowledge(message); - } catch (Exception e) { - consumer.negativeAcknowledge(message); - } - }).subscribe()) { - - assertThat(countDown.await(10, TimeUnit.SECONDS)) - .withFailMessage("Timed out waiting to read message.") - .isTrue(); - assertThat(result) - .withFailMessage("Message data is null in Pulsar consumer.") - .isNotNull(); - return result.get(); + .subscribe(); + message = consumer.receive(10, TimeUnit.SECONDS); + result.set(message.getData()); + countDown.countDown(); + consumer.acknowledge(message); + } catch (Exception e) { + if (consumer != null) { + consumer.negativeAcknowledge(message); + } + } finally { + if (consumer != null) { + consumer.close(); + } } + + assertThat(countDown.await(10, TimeUnit.SECONDS)) + .withFailMessage("Timed out waiting to read message.") + .isTrue(); + assertThat(result) + .withFailMessage("Message data is null in Pulsar consumer.") + .isNotNull(); + return result.get(); } }