diff --git a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java index 7e1fad62477..96086c7a255 100644 --- a/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java +++ b/client/src/test/java/org/apache/rocketmq/client/producer/DefaultMQProducerTest.java @@ -28,7 +28,9 @@ import org.apache.rocketmq.client.impl.factory.MQClientInstance; import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; import org.apache.rocketmq.client.impl.producer.TopicPublishInfo; +import org.apache.rocketmq.client.latency.MQFaultStrategy; import org.apache.rocketmq.common.UtilAll; +import org.apache.rocketmq.common.compression.CompressionType; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.message.MessageQueue; @@ -49,6 +51,7 @@ import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -82,15 +85,14 @@ public class DefaultMQProducerTest { private MQClientInstance mQClientFactory = MQClientManager.getInstance().getOrCreateMQClientInstance(new ClientConfig()); @Mock private MQClientAPIImpl mQClientAPIImpl; - @Mock - private NettyRemotingClient nettyRemotingClient; private DefaultMQProducer producer; private Message message; private Message zeroMsg; private Message bigMessage; - private String topic = "FooBar"; - private String producerGroupPrefix = "FooBar_PID"; + private final String topic = "FooBar"; + private final String producerGroupPrefix = "FooBar_PID"; + private final long defaultTimeout = 3000L; @Before public void init() throws Exception { @@ -196,7 +198,7 @@ public void onException(Throwable e) { countDownLatch.countDown(); } }); - countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS); } @Test @@ -240,7 +242,7 @@ public MessageQueue select(List mqs, Message msg, Object arg) { //this message is send success producer.send(message, sendCallback, 1000); - countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS); assertThat(cc.get()).isEqualTo(5); // off enableBackpressureForAsyncMode @@ -253,7 +255,7 @@ public MessageQueue select(List mqs, Message msg, Object arg) { //this message is send success producer.send(message, sendCallback, 1000); - countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS); assertThat(cc.get()).isEqualTo(10); } @@ -301,7 +303,7 @@ public MessageQueue select(List mqs, Message msg, Object arg) { // this message is send failed producer.send(msgs, new MessageQueue(), sendCallback, 1000); - countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS); assertThat(cc.get()).isEqualTo(1); // off enableBackpressureForAsyncMode @@ -312,7 +314,7 @@ public MessageQueue select(List mqs, Message msg, Object arg) { // this message is send failed producer.send(msgs, new MessageQueue(), sendCallback, 1000); - countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS); assertThat(cc.get()).isEqualTo(2); } @@ -333,7 +335,7 @@ public void onSuccess(SendResult sendResult) { public void onException(Throwable e) { } }); - countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS); } @Test @@ -472,7 +474,7 @@ public void onException(Throwable e) { future.setSendRequestOk(true); future.getRequestCallback().onSuccess(responseMsg); } - countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS); } @Test @@ -509,7 +511,7 @@ public MessageQueue select(List mqs, Message msg, Object arg) { future.getRequestCallback().onException(e); } } - countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS); assertThat(cc.get()).isEqualTo(1); } @@ -533,7 +535,7 @@ public void onException(Throwable e) { } }); - countDownLatch.await(3000L, TimeUnit.MILLISECONDS); + countDownLatch.await(defaultTimeout, TimeUnit.MILLISECONDS); producer.setAutoBatch(false); } @@ -662,4 +664,171 @@ public void assertCreateDefaultMQProducer() { assertTrue(producer5.isEnableTrace()); assertEquals("custom_trace_topic", producer5.getTraceTopic()); } + + @Test + public void assertSend() throws MQBrokerException, RemotingException, InterruptedException, MQClientException, NoSuchFieldException, IllegalAccessException { + setDefaultMQProducerImpl(); + setOtherParam(); + SendResult send = producer.send(message, defaultTimeout); + assertNull(send); + Collection msgs = Collections.singletonList(message); + send = producer.send(msgs); + assertNull(send); + send = producer.send(msgs, defaultTimeout); + assertNull(send); + } + + @Test + public void assertSendOneway() throws RemotingException, InterruptedException, MQClientException, NoSuchFieldException, IllegalAccessException { + setDefaultMQProducerImpl(); + producer.sendOneway(message); + MessageQueue mq = mock(MessageQueue.class); + producer.sendOneway(message, mq); + MessageQueueSelector selector = mock(MessageQueueSelector.class); + producer.sendOneway(message, selector, 1); + } + + @Test + public void assertSendByQueue() throws MQBrokerException, RemotingException, InterruptedException, MQClientException, NoSuchFieldException, IllegalAccessException { + setDefaultMQProducerImpl(); + MessageQueue mq = mock(MessageQueue.class); + SendResult send = producer.send(message, mq); + assertNull(send); + send = producer.send(message, mq, defaultTimeout); + assertNull(send); + Collection msgs = Collections.singletonList(message); + send = producer.send(msgs, mq); + assertNull(send); + send = producer.send(msgs, mq, defaultTimeout); + assertNull(send); + } + + @Test + public void assertSendByQueueSelector() throws MQBrokerException, RemotingException, InterruptedException, MQClientException, NoSuchFieldException, IllegalAccessException { + setDefaultMQProducerImpl(); + MessageQueueSelector selector = mock(MessageQueueSelector.class); + SendResult send = producer.send(message, selector, 1); + assertNull(send); + send = producer.send(message, selector, 1, defaultTimeout); + assertNull(send); + } + + @Test + public void assertRequest() throws MQBrokerException, RemotingException, InterruptedException, MQClientException, NoSuchFieldException, IllegalAccessException, RequestTimeoutException { + setDefaultMQProducerImpl(); + MessageQueueSelector selector = mock(MessageQueueSelector.class); + Message replyNsg = producer.request(message, selector, 1, defaultTimeout); + assertNull(replyNsg); + RequestCallback requestCallback = mock(RequestCallback.class); + producer.request(message, selector, 1, requestCallback, defaultTimeout); + MessageQueue mq = mock(MessageQueue.class); + producer.request(message, mq, defaultTimeout); + producer.request(message, mq, requestCallback, defaultTimeout); + } + + @Test(expected = RuntimeException.class) + public void assertSendMessageInTransaction() throws MQClientException { + TransactionSendResult result = producer.sendMessageInTransaction(message, 1); + assertNull(result); + } + + @Test + public void assertSearchOffset() throws MQClientException, NoSuchFieldException, IllegalAccessException { + setDefaultMQProducerImpl(); + MessageQueue mq = mock(MessageQueue.class); + long result = producer.searchOffset(mq, System.currentTimeMillis()); + assertEquals(0L, result); + } + + @Test + public void assertBatchMaxDelayMs() throws NoSuchFieldException, IllegalAccessException { + setProduceAccumulator(true); + assertEquals(0, producer.getBatchMaxDelayMs()); + setProduceAccumulator(false); + assertEquals(10, producer.getBatchMaxDelayMs()); + producer.batchMaxDelayMs(1000); + assertEquals(1000, producer.getBatchMaxDelayMs()); + } + + @Test + public void assertBatchMaxBytes() throws NoSuchFieldException, IllegalAccessException { + setProduceAccumulator(true); + assertEquals(0L, producer.getBatchMaxBytes()); + setProduceAccumulator(false); + assertEquals(32 * 1024L, producer.getBatchMaxBytes()); + producer.batchMaxBytes(64 * 1024L); + assertEquals(64 * 1024L, producer.getBatchMaxBytes()); + } + + @Test + public void assertTotalBatchMaxBytes() throws NoSuchFieldException, IllegalAccessException { + setProduceAccumulator(true); + assertEquals(0L, producer.getTotalBatchMaxBytes()); + } + + @Test + public void assertGetRetryResponseCodes() { + assertNotNull(producer.getRetryResponseCodes()); + assertEquals(7, producer.getRetryResponseCodes().size()); + } + + @Test + public void assertIsSendLatencyFaultEnable() { + assertFalse(producer.isSendLatencyFaultEnable()); + } + + @Test + public void assertGetLatencyMax() { + assertNotNull(producer.getLatencyMax()); + } + + @Test + public void assertGetNotAvailableDuration() { + assertNotNull(producer.getNotAvailableDuration()); + } + + @Test + public void assertIsRetryAnotherBrokerWhenNotStoreOK() { + assertFalse(producer.isRetryAnotherBrokerWhenNotStoreOK()); + } + + private void setOtherParam() { + producer.setCreateTopicKey("createTopicKey"); + producer.setRetryAnotherBrokerWhenNotStoreOK(false); + producer.setDefaultTopicQueueNums(6); + producer.setRetryTimesWhenSendFailed(1); + producer.setSendMessageWithVIPChannel(false); + producer.setNotAvailableDuration(new long[1]); + producer.setLatencyMax(new long[1]); + producer.setSendLatencyFaultEnable(false); + producer.setRetryTimesWhenSendAsyncFailed(1); + producer.setTopics(Collections.singletonList(topic)); + producer.setStartDetectorEnable(false); + producer.setCompressLevel(5); + producer.setCompressType(CompressionType.LZ4); + producer.addRetryResponseCode(0); + ExecutorService executorService = mock(ExecutorService.class); + producer.setAsyncSenderExecutor(executorService); + } + + private void setProduceAccumulator(final boolean isDefault) throws NoSuchFieldException, IllegalAccessException { + ProduceAccumulator accumulator = null; + if (!isDefault) { + accumulator = new ProduceAccumulator("instanceName"); + } + setField(producer, "produceAccumulator", accumulator); + } + + private void setDefaultMQProducerImpl() throws NoSuchFieldException, IllegalAccessException { + DefaultMQProducerImpl producerImpl = mock(DefaultMQProducerImpl.class); + setField(producer, "defaultMQProducerImpl", producerImpl); + when(producerImpl.getMqFaultStrategy()).thenReturn(mock(MQFaultStrategy.class)); + } + + private void setField(final Object target, final String fieldName, final Object newValue) throws NoSuchFieldException, IllegalAccessException { + Class clazz = target.getClass(); + Field field = clazz.getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, newValue); + } }