From 180bc11fe1e9f15a69c0d97d694d4dff2bc7cc5e Mon Sep 17 00:00:00 2001 From: Sreeram Garlapati Date: Thu, 16 Mar 2017 10:28:33 -0700 Subject: [PATCH] Release 0.12.0 (#76) * Optimization: flow the Receiver runtime info as part of EventData (#65) * Optimization: flow the Receiver runtime info as part of EventData - Partition receiver runtime metrics aka EndOfStream info * move cursor to 0.12.0-snapshot * receiverruntimemetrics on PartitionReceiver & eph + junits * improve cit coverage * Fix issue 58: deliver empty iterable to onEvents on timeout * fix: eventData deserialization error, when the payload size is >1000bytes (#71) * EventData deserialization fails when the payload size is >1000bytes * move to readFully api (suggestion from @CodingCat) * releaseChecklist: fix javadoc & update version to 0.12.0 (#73) * eph test fix - absorb all events sent by RuntimeInfotest to not effect other tests * runtimeinfo: refactoring for CR (#80) * runtimeinfo: refactoring for CR * minor refactor * update eph test * fix exception contract for link-detach errors - this should be a transient error (#78) --- ConsumingEvents.md | 2 +- PublishingEvents.md | 2 +- .../EventHubPartitionPump.java | 39 +- .../EventProcessorOptions.java | 36 +- .../eventprocessorhost/IEventProcessor.java | 8 +- .../eventprocessorhost/PartitionContext.java | 17 +- .../eventprocessorhost/PerTestSettings.java | 5 + .../PrefabEventProcessor.java | 18 + .../PrefabProcessorFactory.java | 11 + .../RealEventHubUtilities.java | 32 +- .../azure/eventprocessorhost/SmokeTest.java | 50 ++- .../azure/eventprocessorhost/TestBase.java | 21 +- .../microsoft/azure/eventhubs/EventData.java | 15 +- .../azure/eventhubs/EventDataUtil.java | 40 +- .../azure/eventhubs/EventHubClient.java | 414 +++++++++++++++++- .../azure/eventhubs/PartitionReceiver.java | 67 ++- .../azure/eventhubs/ReceiverOptions.java | 37 ++ .../eventhubs/ReceiverRuntimeInformation.java | 80 ++++ .../azure/servicebus/ClientConstants.java | 6 +- .../azure/servicebus/ExceptionUtil.java | 4 +- .../servicebus/IReceiverSettingsProvider.java | 2 + .../azure/servicebus/MessageReceiver.java | 6 +- .../microsoft/azure/servicebus/PassByRef.java | 18 + .../servicebus/QuotaExceededException.java | 4 + .../azure/servicebus/amqp/AmqpConstants.java | 9 +- .../eventhubs/eventdata/EventDataTest.java | 173 +++++++- .../sendrecv/ReceiverRuntimeMetricsTest.java | 105 +++++ pom.xml | 2 +- readme.md | 2 +- 29 files changed, 1136 insertions(+), 89 deletions(-) create mode 100644 azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java create mode 100644 azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverRuntimeInformation.java create mode 100644 azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/PassByRef.java create mode 100644 azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverRuntimeMetricsTest.java diff --git a/ConsumingEvents.md b/ConsumingEvents.md index 741102eb7..fb48dafa4 100644 --- a/ConsumingEvents.md +++ b/ConsumingEvents.md @@ -30,7 +30,7 @@ following dependency declaration inside of your Maven project file: com.microsoft.azure azure-eventhubs-clients - 0.11.0 + 0.12.0 ``` diff --git a/PublishingEvents.md b/PublishingEvents.md index 87a599b34..f2022cf97 100644 --- a/PublishingEvents.md +++ b/PublishingEvents.md @@ -12,7 +12,7 @@ following dependency declaration inside of your Maven project file: com.microsoft.azure azure-eventhubs-clients - 0.11.0 + 0.12.0 ``` diff --git a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventHubPartitionPump.java b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventHubPartitionPump.java index 920d02628..4c762418f 100644 --- a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventHubPartitionPump.java +++ b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventHubPartitionPump.java @@ -7,6 +7,7 @@ import java.io.IOException; import java.time.Instant; +import java.util.ArrayList; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.logging.Level; @@ -15,6 +16,7 @@ import com.microsoft.azure.eventhubs.EventHubClient; import com.microsoft.azure.eventhubs.PartitionReceiveHandler; import com.microsoft.azure.eventhubs.PartitionReceiver; +import com.microsoft.azure.eventhubs.ReceiverOptions; import com.microsoft.azure.servicebus.ReceiverDisconnectedException; import com.microsoft.azure.servicebus.ServiceBusException; @@ -96,19 +98,21 @@ private void openClients() throws ServiceBusException, IOException, InterruptedE this.eventHubClient = (EventHubClient) this.internalOperationFuture.get(); this.internalOperationFuture = null; - // Create new receiver and set options + // Create new receiver and set options + ReceiverOptions options = new ReceiverOptions(); + options.setReceiverRuntimeMetricEnabled(this.host.getEventProcessorOptions().getReceiverRuntimeMetricEnabled()); Object startAt = this.partitionContext.getInitialOffset(); long epoch = this.lease.getEpoch(); this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Opening EH receiver with epoch " + epoch + " at location " + startAt); if (startAt instanceof String) { this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(), - (String)startAt, epoch); + (String)startAt, epoch, options); } else if (startAt instanceof Instant) { this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(), - (Instant)startAt, epoch); + (Instant)startAt, epoch, options); } else { @@ -191,13 +195,28 @@ private class InternalReceiveHandler extends PartitionReceiveHandler @Override public void onReceive(Iterable events) { - // This method is called on the thread that the Java EH client uses to run the pump. - // There is one pump per EventHubClient. Since each PartitionPump creates a new EventHubClient, - // using that thread to call onEvents does no harm. Even if onEvents is slow, the pump will - // get control back each time onEvents returns, and be able to receive a new batch of messages - // with which to make the next onEvents call. The pump gains nothing by running faster than onEvents. - - EventHubPartitionPump.this.onEvents(events); + if (EventHubPartitionPump.this.host.getEventProcessorOptions().getReceiverRuntimeMetricEnabled()) + { + EventHubPartitionPump.this.partitionContext.setRuntimeInformation(EventHubPartitionPump.this.partitionReceiver.getRuntimeInformation()); + } + + // This method is called on the thread that the Java EH client uses to run the pump. + // There is one pump per EventHubClient. Since each PartitionPump creates a new EventHubClient, + // using that thread to call onEvents does no harm. Even if onEvents is slow, the pump will + // get control back each time onEvents returns, and be able to receive a new batch of messages + // with which to make the next onEvents call. The pump gains nothing by running faster than onEvents. + + // The underlying client returns null if there are no events, but the contract for IEventProcessor + // is different and is expecting an empty iterable if there are no events (and invoke processor after + // receive timeout is turned on). + + Iterable effectiveEvents = events; + if (effectiveEvents == null) + { + effectiveEvents = new ArrayList(); + } + + EventHubPartitionPump.this.onEvents(effectiveEvents); } @Override diff --git a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java index 347f32d2d..cb8800e1a 100644 --- a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java +++ b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/EventProcessorOptions.java @@ -15,6 +15,7 @@ public final class EventProcessorOptions { private Consumer exceptionNotificationHandler = null; private Boolean invokeProcessorAfterReceiveTimeout = false; + private boolean receiverRuntimeMetricEnabled = false; private int maxBatchSize = 10; private int prefetchCount = 300; private Duration receiveTimeOut = Duration.ofMinutes(1); @@ -30,6 +31,7 @@ public final class EventProcessorOptions * PrefetchCount: 300 * InitialOffsetProvider: uses the last offset checkpointed, or START_OF_STREAM * InvokeProcessorAfterReceiveTimeout: false + * ReceiverRuntimeMetricEnabled: false * * * @return an EventProcessorOptions instance with all options set to the default values @@ -146,8 +148,8 @@ public void setInitialOffsetProvider(Function initialOffsetProvi } /*** - * Returns whether the EventProcessorHost will call IEventProcessor.onEvents(null) when a receive - * timeout occurs (true) or not (false). + * Returns whether the EventProcessorHost will call IEventProcessor.onEvents() with an empty iterable + * when a receive timeout occurs (true) or not (false). * * Defaults to false. * @@ -159,8 +161,8 @@ public Boolean getInvokeProcessorAfterReceiveTimeout() } /** - * Changes whether the EventProcessorHost will call IEventProcessor.onEvents(null) when a receive - * timeout occurs (true) or not (false). + * Changes whether the EventProcessorHost will call IEventProcessor.onEvents() with an empty iterable + * when a receive timeout occurs (true) or not (false). * * The default is false (no call). * @@ -170,6 +172,32 @@ public void setInvokeProcessorAfterReceiveTimeout(Boolean invokeProcessorAfterRe { this.invokeProcessorAfterReceiveTimeout = invokeProcessorAfterReceiveTimeout; } + + /** + * Knob to enable/disable runtime metric of the receiver. If this is set to true, + * the first parameter {@link com.microsoft.azure.eventprocessorhost.PartitionContext#runtimeInformation} of + * {@link IEventProcessor#onEvents(com.microsoft.azure.eventprocessorhost.PartitionContext, java.lang.Iterable)} will be populated. + *

+ * Enabling this knob will add 3 additional properties to all raw AMQP messages received. + * @return the {@link boolean} indicating, whether, the runtime metric of the receiver was enabled + */ + public boolean getReceiverRuntimeMetricEnabled() + { + return this.receiverRuntimeMetricEnabled; + } + + /** + * Knob to enable/disable runtime metric of the receiver. If this is set to true, + * the first parameter {@link com.microsoft.azure.eventprocessorhost.PartitionContext#runtimeInformation} of + * {@link IEventProcessor#onEvents(com.microsoft.azure.eventprocessorhost.PartitionContext, java.lang.Iterable)} will be populated. + *

+ * Enabling this knob will add 3 additional properties to all raw AMQP messages received. + * @param value the {@link boolean} to indicate, whether, the runtime metric of the receiver should be enabled + */ + public void setReceiverRuntimeMetricEnabled(boolean value) + { + this.receiverRuntimeMetricEnabled = value; + } void notifyOfException(String hostname, Exception exception, String action) { diff --git a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/IEventProcessor.java b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/IEventProcessor.java index b42ad4301..c561df4bc 100644 --- a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/IEventProcessor.java +++ b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/IEventProcessor.java @@ -47,10 +47,14 @@ public interface IEventProcessor /** * Called by the processor host when a batch of events has arrived. * - * This is where the real work of the event processor is done. + * This is where the real work of the event processor is done. It is normally called when one + * or more events have arrived. If the EventProcessorHost instance was set up with an EventProcessorOptions + * on which setInvokeProcessorAfterReceiveTimeout(true) has been called, then if a receive times out, + * it will be called with an empty iterable. By default this option is false and receive timeouts do not + * cause a call to this method. * * @param context Information about the partition. - * @param messages The events to be processed. + * @param messages The events to be processed. May be empty. * @throws Exception */ public void onEvents(PartitionContext context, Iterable messages) throws Exception; diff --git a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionContext.java b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionContext.java index e56bf7d68..fe3ccf58b 100644 --- a/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionContext.java +++ b/azure-eventhubs-eph/src/main/java/com/microsoft/azure/eventprocessorhost/PartitionContext.java @@ -12,17 +12,19 @@ import com.microsoft.azure.eventhubs.EventData; import com.microsoft.azure.eventhubs.PartitionReceiver; +import com.microsoft.azure.eventhubs.ReceiverRuntimeInformation; public class PartitionContext { - private final EventProcessorHost host; + private final EventProcessorHost host; private final String partitionId; private final String eventHubPath; private final String consumerGroupName; private Lease lease; private String offset = PartitionReceiver.START_OF_STREAM; - private long sequenceNumber = 0;; + private long sequenceNumber = 0; + private ReceiverRuntimeInformation runtimeInformation; private Object offsetSynchronizer; @@ -32,6 +34,7 @@ public class PartitionContext this.partitionId = partitionId; this.eventHubPath = eventHubPath; this.consumerGroupName = consumerGroupName; + this.runtimeInformation = new ReceiverRuntimeInformation(partitionId); this.offsetSynchronizer = new Object(); } @@ -50,6 +53,16 @@ public String getOwner() { return this.lease.getOwner(); } + + public ReceiverRuntimeInformation getRuntimeInformation() + { + return this.runtimeInformation; + } + + void setRuntimeInformation(ReceiverRuntimeInformation value) + { + this.runtimeInformation = value; + } Lease getLease() { diff --git a/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/PerTestSettings.java b/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/PerTestSettings.java index 8ec7619e1..75463ec69 100644 --- a/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/PerTestSettings.java +++ b/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/PerTestSettings.java @@ -10,6 +10,8 @@ public class PerTestSettings EventProcessorOptions inOptions; // can be null boolean inDoCheckpoint; boolean inEntityDoesNotExist; // Prevents test code from doing certain checks that would fail on nonexistence before reaching product code. + boolean inTelltaleOnTimeout; // Generates an empty telltale string, which causes PrefabEventProcessor to trigger telltale on timeout. + boolean inHasSenders; PerTestSettings(String testName) { @@ -17,6 +19,8 @@ public class PerTestSettings this.inOptions = EventProcessorOptions.getDefaultOptions(); this.inDoCheckpoint = false; this.inEntityDoesNotExist = false; + this.inTelltaleOnTimeout = false; + this.inHasSenders = true; this.inoutEPHConstructorArgs = new EPHConstructorArgs(); } @@ -52,6 +56,7 @@ class EPHConstructorArgs static final int CHECKPOINT_MANAGER_OVERRIDE = 0x0400; static final int LEASE_MANAGER_OVERRIDE = 0x0800; static final int EXPLICIT_MANAGER = CHECKPOINT_MANAGER_OVERRIDE | LEASE_MANAGER_OVERRIDE; + static final int TELLTALE_ON_TIMEOUT = 0x1000; private int flags; diff --git a/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/PrefabEventProcessor.java b/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/PrefabEventProcessor.java index 4795c051a..4b679f714 100644 --- a/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/PrefabEventProcessor.java +++ b/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/PrefabEventProcessor.java @@ -16,6 +16,7 @@ public class PrefabEventProcessor implements IEventProcessor private boolean doCheckpoint; private boolean doMarker; private boolean logEveryMessage; + private boolean telltaleOnTimeout; private int eventCount = 0; @@ -26,6 +27,7 @@ public class PrefabEventProcessor implements IEventProcessor this.doCheckpoint = doCheckpoint; this.doMarker = doMarker; this.logEveryMessage = logEveryMessage; + this.telltaleOnTimeout = telltale.isEmpty(); } @Override @@ -43,6 +45,9 @@ public void onEvents(PartitionContext context, Iterable messages) thr { int batchSize = 0; EventData lastEvent = null; + if (messages != null && messages.iterator().hasNext()) + this.factory.setOnEventsContext(context); + for (EventData event : messages) { this.eventCount++; @@ -61,6 +66,19 @@ public void onEvents(PartitionContext context, Iterable messages) thr } lastEvent = event; } + if (batchSize == 0) + { + if (this.telltaleOnTimeout) + { + TestUtilities.log("P" + context.getPartitionId() + " got expected timeout"); + this.factory.setTelltaleFound(context.getPartitionId()); + } + else + { + TestUtilities.log("P" + context.getPartitionId() + " got UNEXPECTED timeout"); + this.factory.putError("P" + context.getPartitionId() + " got UNEXPECTED timeout"); + } + } this.factory.addBatch(batchSize); if (doCheckpoint) { diff --git a/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/PrefabProcessorFactory.java b/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/PrefabProcessorFactory.java index 0321faa95..6fa0cb725 100644 --- a/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/PrefabProcessorFactory.java +++ b/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/PrefabProcessorFactory.java @@ -18,6 +18,7 @@ public class PrefabProcessorFactory implements IEventProcessorFactory errors = new ArrayList(); private HashMap foundTelltale = new HashMap(); private int eventsReceivedCount = 0; + private PartitionContext partitionContextOnEvents; PrefabProcessorFactory(String telltale, boolean doCheckpoint, boolean doMarker) { @@ -72,6 +73,16 @@ int getEventsReceivedCount() { return this.eventsReceivedCount; } + + PartitionContext getOnEventsContext() + { + return this.partitionContextOnEvents; + } + + void setOnEventsContext(PartitionContext value) + { + this.partitionContextOnEvents = value; + } @Override public IEventProcessor createEventProcessor(PartitionContext context) throws Exception diff --git a/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/RealEventHubUtilities.java b/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/RealEventHubUtilities.java index 36fb7ba62..60c65db75 100644 --- a/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/RealEventHubUtilities.java +++ b/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/RealEventHubUtilities.java @@ -44,8 +44,8 @@ class RealEventHubUtilities private ConnectionStringBuilder hubConnectionString = null; private String hubName = null; private String consumerGroup = EventHubClient.DEFAULT_CONSUMER_GROUP_NAME; - private EventHubClient client; - private ArrayList partitionIds = null; + private EventHubClient client = null; + private ArrayList cachedPartitionIds = null; private HashMap partitionSenders = new HashMap(); static int QUERY_ENTITY_FOR_PARTITIONS = -1; @@ -55,6 +55,16 @@ class RealEventHubUtilities } ArrayList setup(int fakePartitions) throws ServiceBusException, IOException + { + ArrayList partitionIds = setupWithoutSenders(fakePartitions); + + // EventHubClient is source of all senders + this.client = EventHubClient.createFromConnectionStringSync(this.hubConnectionString.toString()); + + return partitionIds; + } + + ArrayList setupWithoutSenders(int fakePartitions) throws ServiceBusException, IOException { // Get the connection string from the environment ehCacheCheck(); @@ -80,10 +90,7 @@ ArrayList setup(int fakePartitions) throws ServiceBusException, IOExcept partitionIds.add(Integer.toString(i)); } } - - // EventHubClient is source of all senders - this.client = EventHubClient.createFromConnectionStringSync(this.hubConnectionString.toString()); - + return partitionIds; } @@ -93,7 +100,10 @@ void shutdown() throws ServiceBusException { sender.closeSync(); } - this.client.closeSync(); + if (this.client != null) + { + this.client.closeSync(); + } } ConnectionStringBuilder getConnectionString() @@ -154,9 +164,9 @@ void sendToPartition(String partitionId, String body) throws IllegalArgumentExce ArrayList getPartitionIdsForTest() throws IllegalEntityException { - if (this.partitionIds == null) + if (this.cachedPartitionIds == null) { - this.partitionIds = new ArrayList(); + this.cachedPartitionIds = new ArrayList(); ehCacheCheck(); try @@ -193,7 +203,7 @@ ArrayList getPartitionIdsForTest() throws IllegalEntityException for (int partitionIndex = 0; partitionIndex < partitionIdsNodes.getLength(); partitionIndex++) { - this.partitionIds.add(partitionIdsNodes.item(partitionIndex).getTextContent()); + this.cachedPartitionIds.add(partitionIdsNodes.item(partitionIndex).getTextContent()); } } catch(XPathExpressionException|ParserConfigurationException|IOException|InvalidKeyException|NoSuchAlgorithmException|URISyntaxException|SAXException exception) @@ -203,7 +213,7 @@ ArrayList getPartitionIdsForTest() throws IllegalEntityException } } - return this.partitionIds; + return this.cachedPartitionIds; } } diff --git a/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/SmokeTest.java b/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/SmokeTest.java index cb4317d27..60e588e39 100644 --- a/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/SmokeTest.java +++ b/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/SmokeTest.java @@ -5,14 +5,13 @@ package com.microsoft.azure.eventprocessorhost; -import static org.junit.Assert.*; - import java.time.Instant; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ThreadPoolExecutor; +import org.junit.Assert; import org.junit.Test; import com.microsoft.azure.eventhubs.EventData; @@ -33,6 +32,23 @@ public void SendRecv1MsgTest() throws Exception testFinish(settings, SmokeTest.ANY_NONZERO_COUNT); } + + @Test + public void ReceiverRuntimeMetricsTest() throws Exception + { + PerTestSettings settings = new PerTestSettings("ReceiverRuntimeMetrics"); + settings.inOptions.setReceiverRuntimeMetricEnabled(true); + settings = testSetup(settings); + + settings.outUtils.sendToAny(settings.outTelltale); + waitForTelltale(settings); + + // correctness of runtimeInfo is already tested in javaclient - this is only testing for presence of non-default value + Assert.assertTrue(settings.outProcessorFactory.getOnEventsContext().getRuntimeInformation() != null); + Assert.assertTrue(settings.outProcessorFactory.getOnEventsContext().getRuntimeInformation().getLastSequenceNumber() > 0); + + testFinish(settings, SmokeTest.ANY_NONZERO_COUNT); + } @Test public void receiveFromNowTest() throws Exception @@ -100,6 +116,36 @@ private PerTestSettings receiveFromCheckpointIteration(int iteration, int expect return settings; } + @Test + public void receiveInvokeOnTimeout() throws Exception + { + PerTestSettings settings = new PerTestSettings("receiveInvokeOnTimeout"); + settings.inOptions.setInvokeProcessorAfterReceiveTimeout(true); + settings.inTelltaleOnTimeout = true; + settings.inHasSenders = false; + settings = testSetup(settings); + + waitForTelltale(settings, "0"); + + testFinish(settings, SmokeTest.SKIP_COUNT_CHECK); + } + + @Test + public void receiveNotInvokeOnTimeout() throws Exception + { + PerTestSettings settings = new PerTestSettings("receiveNotInvokeOnTimeout"); + settings = testSetup(settings); + + // Receive timeout is one minute. If event processor is invoked on timeout, it will + // record an error that will fail the case on shutdown. + Thread.sleep(120 * 1000); + + settings.outUtils.sendToAny(settings.outTelltale); + waitForTelltale(settings); + + testFinish(settings, SmokeTest.ANY_NONZERO_COUNT); + } + @Test public void receiveAllPartitionsTest() throws Exception { diff --git a/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/TestBase.java b/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/TestBase.java index 3cb6c1edf..ddb29afb1 100644 --- a/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/TestBase.java +++ b/azure-eventhubs-eph/src/test/java/com/microsoft/azure/eventprocessorhost/TestBase.java @@ -23,7 +23,14 @@ PerTestSettings testSetup(PerTestSettings settings) throws Exception settings.inoutEPHConstructorArgs.getHostName() : settings.getTestName() + "-1"; settings.outUtils = new RealEventHubUtilities(); - settings.outPartitionIds = settings.outUtils.setup(settings.inEntityDoesNotExist ? 8 : RealEventHubUtilities.QUERY_ENTITY_FOR_PARTITIONS); + if (settings.inHasSenders) + { + settings.outPartitionIds = settings.outUtils.setup(settings.inEntityDoesNotExist ? 8 : RealEventHubUtilities.QUERY_ENTITY_FOR_PARTITIONS); + } + else + { + settings.outPartitionIds = settings.outUtils.setupWithoutSenders(settings.inEntityDoesNotExist ? 8 : RealEventHubUtilities.QUERY_ENTITY_FOR_PARTITIONS); + } ConnectionStringBuilder environmentCSB = settings.outUtils.getConnectionString(); String effectiveEntityPath = settings.inoutEPHConstructorArgs.isFlagSet(PerTestSettings.EPHConstructorArgs.EH_PATH_OVERRIDE) ? @@ -51,7 +58,14 @@ PerTestSettings testSetup(PerTestSettings settings) throws Exception ExecutorService effectiveExecutor = settings.inoutEPHConstructorArgs.isFlagSet(PerTestSettings.EPHConstructorArgs.EXECUTOR_OVERRIDE) ? settings.inoutEPHConstructorArgs.getExecutor() : null; - settings.outTelltale = settings.getTestName() + "-telltale-" + EventProcessorHost.safeCreateUUID(); + if (settings.inTelltaleOnTimeout) + { + settings.outTelltale = ""; + } + else + { + settings.outTelltale = settings.getTestName() + "-telltale-" + EventProcessorHost.safeCreateUUID(); + } settings.outGeneralErrorHandler = new PrefabGeneralErrorHandler(); settings.outProcessorFactory = new PrefabProcessorFactory(settings.outTelltale, settings.inDoCheckpoint, true, true); @@ -121,6 +135,7 @@ void waitForTelltale(PerTestSettings settings, String partitionId) throws Interr } + final static int SKIP_COUNT_CHECK = -3; // expectedMessages could be anything, don't check it at all final static int NO_CHECKS = -2; // do no checks at all, used for tests which are expected fail in startup final static int ANY_NONZERO_COUNT = -1; // if expectedMessages is -1, just check for > 0 void testFinish(PerTestSettings settings, int expectedMessages) throws InterruptedException, ExecutionException, ServiceBusException @@ -137,7 +152,7 @@ void testFinish(PerTestSettings settings, int expectedMessages) throws Interrupt { assertTrue("no messages received", settings.outProcessorFactory.getEventsReceivedCount() > 0); } - else + else if (expectedMessages != SKIP_COUNT_CHECK) { assertEquals("wrong number of messages received", expectedMessages, settings.outProcessorFactory.getEventsReceivedCount()); } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java index c0382d7a5..b74715878 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventData.java @@ -32,6 +32,7 @@ public class EventData implements Serializable { private static final long serialVersionUID = -5631628195600014255L; + private static final int BODY_DATA_NULL = -1; transient private Binary bodyData; @@ -322,8 +323,9 @@ private void writeObject(ObjectOutputStream out) throws IOException { out.defaultWriteObject(); - out.writeInt(this.bodyData.getLength()); - out.write(this.bodyData.getArray(), this.bodyData.getArrayOffset(), this.bodyData.getLength()); + out.writeInt(this.bodyData == null ? BODY_DATA_NULL : this.bodyData.getLength()); + if (this.bodyData != null) + out.write(this.bodyData.getArray(), this.bodyData.getArrayOffset(), this.bodyData.getLength()); } private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException @@ -331,9 +333,12 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE in.defaultReadObject(); final int length = in.readInt(); - final byte[] data = new byte[length]; - in.read(data, 0, length); - this.bodyData = new Binary(data, 0, length); + if (length != BODY_DATA_NULL) { + + final byte[] data = new byte[length]; + in.readFully(data, 0, length); + this.bodyData = new Binary(data, 0, length); + } } public static class SystemProperties extends HashMap diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventDataUtil.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventDataUtil.java index e99665b7d..c2209fcf9 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventDataUtil.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventDataUtil.java @@ -14,12 +14,13 @@ import org.apache.qpid.proton.message.Message; import com.microsoft.azure.servicebus.amqp.AmqpConstants; +import com.microsoft.azure.servicebus.PassByRef; /* * Internal utility class for EventData */ -final class EventDataUtil -{ +final class EventDataUtil { + @SuppressWarnings("serial") static final Set RESERVED_SYSTEM_PROPERTIES = Collections.unmodifiableSet(new HashSet() {{ @@ -32,31 +33,30 @@ final class EventDataUtil private EventDataUtil(){} - static LinkedList toEventDataCollection(final Collection messages) - { - if (messages == null) - { + static LinkedList toEventDataCollection(final Collection messages, final PassByRef lastMessageRef) { + + if (messages == null) { return null; } - // TODO: no-copy solution - LinkedList events = new LinkedList(); - for(Message message : messages) - { + LinkedList events = new LinkedList<>(); + for (Message message : messages) { + events.add(new EventData(message)); - } + + if (lastMessageRef != null) + lastMessageRef.set(message); + } return events; } - static Iterable toAmqpMessages(final Iterable eventDatas, final String partitionKey) - { - final LinkedList messages = new LinkedList(); - eventDatas.forEach(new Consumer() - { + static Iterable toAmqpMessages(final Iterable eventDatas, final String partitionKey) { + + final LinkedList messages = new LinkedList<>(); + eventDatas.forEach(new Consumer() { @Override - public void accept(EventData eventData) - { + public void accept(EventData eventData) { Message amqpMessage = partitionKey == null ? eventData.toAmqpMessage() : eventData.toAmqpMessage(partitionKey); messages.add(amqpMessage); } @@ -65,8 +65,8 @@ public void accept(EventData eventData) return messages; } - static Iterable toAmqpMessages(final Iterable eventDatas) - { + static Iterable toAmqpMessages(final Iterable eventDatas) { + return EventDataUtil.toAmqpMessages(eventDatas, null); } } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java index caac4202d..45780d130 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/EventHubClient.java @@ -626,7 +626,7 @@ public final PartitionReceiver createReceiverSync(final String consumerGroupName * @param consumerGroupName the consumer group name that this receiver should be grouped under. * @param partitionId the partition Id that the receiver belongs to. All data received will be from this partition only. * @param startingOffset the offset to start receiving the events from. To receive from start of the stream use: {@link PartitionReceiver#START_OF_STREAM} - * @return a CompletableFuture that would result in a PartitionReceiver isntance when it is completed. + * @return a CompletableFuture that would result in a PartitionReceiver instance when it is completed. * @throws ServiceBusException if Service Bus service encountered problems during the operation. * @see PartitionReceiver */ @@ -694,7 +694,7 @@ public final PartitionReceiver createReceiverSync(final String consumerGroupName public final CompletableFuture createReceiver(final String consumerGroupName, final String partitionId, final String startingOffset, boolean offsetInclusive) throws ServiceBusException { - return PartitionReceiver.create(this.underlyingFactory, this.eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, null, PartitionReceiver.NULL_EPOCH, false); + return this.createReceiver(consumerGroupName, partitionId, startingOffset, offsetInclusive, null); } /** @@ -753,9 +753,198 @@ public final PartitionReceiver createReceiverSync(final String consumerGroupName public final CompletableFuture createReceiver(final String consumerGroupName, final String partitionId, final Instant dateTime) throws ServiceBusException { - return PartitionReceiver.create(this.underlyingFactory, this.eventHubName, consumerGroupName, partitionId, null, false, dateTime, PartitionReceiver.NULL_EPOCH, false); + return this.createReceiver(consumerGroupName, partitionId, dateTime, null); + } + + /** + * Synchronous version of {@link #createReceiver(String, String, String)}. + * @param consumerGroupName the consumer group name that this receiver should be grouped under. + * @param partitionId the partition Id that the receiver belongs to. All data received will be from this partition only. + * @param startingOffset the offset to start receiving the events from. To receive from start of the stream use: {@link PartitionReceiver#START_OF_STREAM} + * @param receiverOptions the set of options to enable on the event hubs receiver + * @return PartitionReceiver instance which can be used for receiving {@link EventData}. + * @throws ServiceBusException if Service Bus service encountered problems during the operation. + */ + public final PartitionReceiver createReceiverSync(final String consumerGroupName, final String partitionId, final String startingOffset, final ReceiverOptions receiverOptions) + throws ServiceBusException + { + try + { + return this.createReceiver(consumerGroupName, partitionId, startingOffset, receiverOptions).get(); + } + catch (InterruptedException|ExecutionException exception) + { + if (exception instanceof InterruptedException) + { + // Re-assert the thread's interrupted status + Thread.currentThread().interrupt(); + } + + Throwable throwable = exception.getCause(); + if (throwable != null) + { + if (throwable instanceof RuntimeException) + { + throw (RuntimeException)throwable; + } + + if (throwable instanceof ServiceBusException) + { + throw (ServiceBusException)throwable; + } + + throw new ServiceBusException(true, throwable); + } + } + + return null; + } + + /** + * The receiver is created for a specific EventHub partition from the specific consumer group. + * + *

NOTE: There can be a maximum number of receivers that can run in parallel per ConsumerGroup per Partition. + * The limit is enforced by the Event Hub service - current limit is 5 receivers in parallel. Having multiple receivers + * reading from offsets that are far apart on the same consumer group / partition combo will have significant performance Impact. + * + * @param consumerGroupName the consumer group name that this receiver should be grouped under. + * @param partitionId the partition Id that the receiver belongs to. All data received will be from this partition only. + * @param startingOffset the offset to start receiving the events from. To receive from start of the stream use: {@link PartitionReceiver#START_OF_STREAM} + * @param receiverOptions the set of options to enable on the event hubs receiver + * @return a CompletableFuture that would result in a PartitionReceiver instance when it is completed. + * @throws ServiceBusException if Service Bus service encountered problems during the operation. + * @see PartitionReceiver + */ + public final CompletableFuture createReceiver(final String consumerGroupName, final String partitionId, final String startingOffset, final ReceiverOptions receiverOptions) + throws ServiceBusException + { + return this.createReceiver(consumerGroupName, partitionId, startingOffset, false, receiverOptions); + } + + /** + * Synchronous version of {@link #createReceiver(String, String, String, boolean)}. + * @param consumerGroupName the consumer group name that this receiver should be grouped under. + * @param partitionId the partition Id that the receiver belongs to. All data received will be from this partition only. + * @param startingOffset the offset to start receiving the events from. To receive from start of the stream use: {@link PartitionReceiver#START_OF_STREAM} + * @param offsetInclusive if set to true, the startingOffset is treated as an inclusive offset - meaning the first event returned is the one that has the starting offset. Normally first event returned is the event after the starting offset. + * @param receiverOptions the set of options to enable on the event hubs receiver + * @return PartitionReceiver instance which can be used for receiving {@link EventData}. + * @throws ServiceBusException if Service Bus service encountered problems during the operation. + */ + public final PartitionReceiver createReceiverSync(final String consumerGroupName, final String partitionId, final String startingOffset, boolean offsetInclusive, final ReceiverOptions receiverOptions) + throws ServiceBusException + { + try + { + return this.createReceiver(consumerGroupName, partitionId, startingOffset, offsetInclusive, receiverOptions).get(); + } + catch (InterruptedException|ExecutionException exception) + { + if (exception instanceof InterruptedException) + { + // Re-assert the thread's interrupted status + Thread.currentThread().interrupt(); + } + + Throwable throwable = exception.getCause(); + if (throwable != null) + { + if (throwable instanceof RuntimeException) + { + throw (RuntimeException)throwable; + } + + if (throwable instanceof ServiceBusException) + { + throw (ServiceBusException)throwable; + } + + throw new ServiceBusException(true, throwable); + } + } + + return null; + } + + /** + * Create the EventHub receiver with given partition id and start receiving from the specified starting offset. + * The receiver is created for a specific EventHub Partition from the specific consumer group. + * @param consumerGroupName the consumer group name that this receiver should be grouped under. + * @param partitionId the partition Id that the receiver belongs to. All data received will be from this partition only. + * @param startingOffset the offset to start receiving the events from. To receive from start of the stream use: {@link PartitionReceiver#START_OF_STREAM} + * @param offsetInclusive if set to true, the startingOffset is treated as an inclusive offset - meaning the first event returned is the one that has the starting offset. Normally first event returned is the event after the starting offset. + * @param receiverOptions the set of options to enable on the event hubs receiver + * @return a CompletableFuture that would result in a PartitionReceiver instance when it is completed. + * @throws ServiceBusException if Service Bus service encountered problems during the operation. + * @see PartitionReceiver + */ + public final CompletableFuture createReceiver(final String consumerGroupName, final String partitionId, final String startingOffset, boolean offsetInclusive, final ReceiverOptions receiverOptions) + throws ServiceBusException + { + return PartitionReceiver.create(this.underlyingFactory, this.eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, null, PartitionReceiver.NULL_EPOCH, false, receiverOptions); + } + + /** + * Synchronous version of {@link #createReceiver(String, String, Instant)}. + * @param consumerGroupName the consumer group name that this receiver should be grouped under. + * @param partitionId the partition Id that the receiver belongs to. All data received will be from this partition only. + * @param dateTime the date time instant that receive operations will start receive events from. Events received will have {@link EventData.SystemProperties#getEnqueuedTime()} later than this Instant. + * @param receiverOptions the set of options to enable on the event hubs receiver + * @return PartitionReceiver instance which can be used for receiving {@link EventData}. + * @throws ServiceBusException if Service Bus service encountered problems during the operation. + */ + public final PartitionReceiver createReceiverSync(final String consumerGroupName, final String partitionId, final Instant dateTime, final ReceiverOptions receiverOptions) + throws ServiceBusException + { + try + { + return this.createReceiver(consumerGroupName, partitionId, dateTime, receiverOptions).get(); + } + catch (InterruptedException|ExecutionException exception) + { + if (exception instanceof InterruptedException) + { + // Re-assert the thread's interrupted status + Thread.currentThread().interrupt(); + } + + Throwable throwable = exception.getCause(); + if (throwable != null) + { + if (throwable instanceof RuntimeException) + { + throw (RuntimeException)throwable; + } + + if (throwable instanceof ServiceBusException) + { + throw (ServiceBusException)throwable; + } + + throw new ServiceBusException(true, throwable); + } + } + + return null; } + /** + * Create the EventHub receiver with given partition id and start receiving from the specified starting offset. + * The receiver is created for a specific EventHub Partition from the specific consumer group. + * @param consumerGroupName the consumer group name that this receiver should be grouped under. + * @param partitionId the partition Id that the receiver belongs to. All data received will be from this partition only. + * @param dateTime the date time instant that receive operations will start receive events from. Events received will have {@link EventData.SystemProperties#getEnqueuedTime()} later than this Instant. + * @param receiverOptions the set of options to enable on the event hubs receiver + * @return a CompletableFuture that would result in a PartitionReceiver when it is completed. + * @throws ServiceBusException if Service Bus service encountered problems during the operation. + * @see PartitionReceiver + */ + public final CompletableFuture createReceiver(final String consumerGroupName, final String partitionId, final Instant dateTime, final ReceiverOptions receiverOptions) + throws ServiceBusException + { + return PartitionReceiver.create(this.underlyingFactory, this.eventHubName, consumerGroupName, partitionId, null, false, dateTime, PartitionReceiver.NULL_EPOCH, false, receiverOptions); + } + /** * Synchronous version of {@link #createEpochReceiver(String, String, String, long)}. * @param consumerGroupName the consumer group name that this receiver should be grouped under. @@ -893,7 +1082,7 @@ public final PartitionReceiver createEpochReceiverSync(final String consumerGrou public final CompletableFuture createEpochReceiver(final String consumerGroupName, final String partitionId, final String startingOffset, boolean offsetInclusive, final long epoch) throws ServiceBusException { - return PartitionReceiver.create(this.underlyingFactory, this.eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, null, epoch, true); + return this.createEpochReceiver(consumerGroupName, partitionId, startingOffset, offsetInclusive, epoch, null); } /** @@ -962,7 +1151,222 @@ public final PartitionReceiver createEpochReceiverSync(final String consumerGrou public final CompletableFuture createEpochReceiver(final String consumerGroupName, final String partitionId, final Instant dateTime, final long epoch) throws ServiceBusException { - return PartitionReceiver.create(this.underlyingFactory, this.eventHubName, consumerGroupName, partitionId, null, false, dateTime, epoch, true); + return this.createEpochReceiver(consumerGroupName, partitionId, dateTime, epoch, null); + } + + /** + * Synchronous version of {@link #createEpochReceiver(String, String, String, long)}. + * @param consumerGroupName the consumer group name that this receiver should be grouped under. + * @param partitionId the partition Id that the receiver belongs to. All data received will be from this partition only. + * @param startingOffset the offset to start receiving the events from. To receive from start of the stream use: {@link PartitionReceiver#START_OF_STREAM} + * @param epoch an unique identifier (epoch value) that the service uses, to enforce partition/lease ownership. + * @param receiverOptions the set of options to enable on the event hubs receiver + * @return PartitionReceiver instance which can be used for receiving {@link EventData}. + * @throws ServiceBusException if Service Bus service encountered problems during the operation. + */ + public final PartitionReceiver createEpochReceiverSync(final String consumerGroupName, final String partitionId, final String startingOffset, final long epoch, final ReceiverOptions receiverOptions) + throws ServiceBusException + { + try + { + return this.createEpochReceiver(consumerGroupName, partitionId, startingOffset, epoch, receiverOptions).get(); + } + catch (InterruptedException|ExecutionException exception) + { + if (exception instanceof InterruptedException) + { + // Re-assert the thread's interrupted status + Thread.currentThread().interrupt(); + } + + Throwable throwable = exception.getCause(); + if (throwable != null) + { + if (throwable instanceof RuntimeException) + { + throw (RuntimeException)throwable; + } + + if (throwable instanceof ServiceBusException) + { + throw (ServiceBusException)throwable; + } + + throw new ServiceBusException(true, throwable); + } + } + + return null; + } + + /** + * Create a Epoch based EventHub receiver with given partition id and start receiving from the beginning of the partition stream. + * The receiver is created for a specific EventHub Partition from the specific consumer group. + *

+ * It is important to pay attention to the following when creating epoch based receiver: + *

    + *
  • Ownership enforcement - Once you created an epoch based receiver, you cannot create a non-epoch receiver to the same consumerGroup-Partition combo until all receivers to the combo are closed. + *
  • Ownership stealing - If a receiver with higher epoch value is created for a consumerGroup-Partition combo, any older epoch receiver to that combo will be force closed. + *
  • Any receiver closed due to lost of ownership to a consumerGroup-Partition combo will get ReceiverDisconnectedException for all operations from that receiver. + *
+ * @param consumerGroupName the consumer group name that this receiver should be grouped under. + * @param partitionId the partition Id that the receiver belongs to. All data received will be from this partition only. + * @param startingOffset the offset to start receiving the events from. To receive from start of the stream use: {@link PartitionReceiver#START_OF_STREAM} + * @param epoch an unique identifier (epoch value) that the service uses, to enforce partition/lease ownership. + * @param receiverOptions the set of options to enable on the event hubs receiver + * @return a CompletableFuture that would result in a PartitionReceiver when it is completed. + * @throws ServiceBusException if Service Bus service encountered problems during the operation. + * @see PartitionReceiver + * @see ReceiverDisconnectedException + */ + public final CompletableFuture createEpochReceiver(final String consumerGroupName, final String partitionId, final String startingOffset, final long epoch, final ReceiverOptions receiverOptions) + throws ServiceBusException + { + return this.createEpochReceiver(consumerGroupName, partitionId, startingOffset, false, epoch, receiverOptions); + } + + /** + * Synchronous version of {@link #createEpochReceiver(String, String, String, boolean, long)}. + * @param consumerGroupName the consumer group name that this receiver should be grouped under. + * @param partitionId the partition Id that the receiver belongs to. All data received will be from this partition only. + * @param startingOffset the offset to start receiving the events from. To receive from start of the stream use: {@link PartitionReceiver#START_OF_STREAM} + * @param offsetInclusive if set to true, the startingOffset is treated as an inclusive offset - meaning the first event returned is the one that has the starting offset. Normally first event returned is the event after the starting offset. + * @param epoch an unique identifier (epoch value) that the service uses, to enforce partition/lease ownership. + * @param receiverOptions the set of options to enable on the event hubs receiver + * @return PartitionReceiver instance which can be used for receiving {@link EventData}. + * @throws ServiceBusException if Service Bus service encountered problems during the operation. + */ + public final PartitionReceiver createEpochReceiverSync(final String consumerGroupName, final String partitionId, final String startingOffset, boolean offsetInclusive, final long epoch, final ReceiverOptions receiverOptions) + throws ServiceBusException + { + try + { + return this.createEpochReceiver(consumerGroupName, partitionId, startingOffset, offsetInclusive, epoch, receiverOptions).get(); + } + catch (InterruptedException|ExecutionException exception) + { + if (exception instanceof InterruptedException) + { + // Re-assert the thread's interrupted status + Thread.currentThread().interrupt(); + } + + Throwable throwable = exception.getCause(); + if (throwable != null) + { + if (throwable instanceof RuntimeException) + { + throw (RuntimeException)throwable; + } + + if (throwable instanceof ServiceBusException) + { + throw (ServiceBusException)throwable; + } + + throw new ServiceBusException(true, throwable); + } + } + + return null; + } + + /** + * Create a Epoch based EventHub receiver with given partition id and start receiving from the beginning of the partition stream. + * The receiver is created for a specific EventHub Partition from the specific consumer group. + *

+ * It is important to pay attention to the following when creating epoch based receiver: + *

    + *
  • Ownership enforcement - Once you created an epoch based receiver, you cannot create a non-epoch receiver to the same consumerGroup-Partition combo until all receivers to the combo are closed. + *
  • Ownership stealing - If a receiver with higher epoch value is created for a consumerGroup-Partition combo, any older epoch receiver to that combo will be force closed. + *
  • Any receiver closed due to lost of ownership to a consumerGroup-Partition combo will get ReceiverDisconnectedException for all operations from that receiver. + *
+ * @param consumerGroupName the consumer group name that this receiver should be grouped under. + * @param partitionId the partition Id that the receiver belongs to. All data received will be from this partition only. + * @param startingOffset the offset to start receiving the events from. To receive from start of the stream use: {@link PartitionReceiver#START_OF_STREAM} + * @param offsetInclusive if set to true, the startingOffset is treated as an inclusive offset - meaning the first event returned is the one that has the starting offset. Normally first event returned is the event after the starting offset. + * @param epoch an unique identifier (epoch value) that the service uses, to enforce partition/lease ownership. + * @param receiverOptions the set of options to enable on the event hubs receiver + * @return a CompletableFuture that would result in a PartitionReceiver when it is completed. + * @throws ServiceBusException if Service Bus service encountered problems during the operation. + * @see PartitionReceiver + * @see ReceiverDisconnectedException + */ + public final CompletableFuture createEpochReceiver(final String consumerGroupName, final String partitionId, final String startingOffset, boolean offsetInclusive, final long epoch, final ReceiverOptions receiverOptions) + throws ServiceBusException + { + return PartitionReceiver.create(this.underlyingFactory, this.eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, null, epoch, true, receiverOptions); + } + + /** + * Synchronous version of {@link #createEpochReceiver(String, String, Instant, long)}. + * @param consumerGroupName the consumer group name that this receiver should be grouped under. + * @param partitionId the partition Id that the receiver belongs to. All data received will be from this partition only. + * @param dateTime the date time instant that receive operations will start receive events from. Events received will have {@link EventData.SystemProperties#getEnqueuedTime()} later than this Instant. + * @param epoch an unique identifier (epoch value) that the service uses, to enforce partition/lease ownership. + * @param receiverOptions the set of options to enable on the event hubs receiver + * @return PartitionReceiver instance which can be used for receiving {@link EventData}. + * @throws ServiceBusException if Service Bus service encountered problems during the operation. + */ + public final PartitionReceiver createEpochReceiverSync(final String consumerGroupName, final String partitionId, final Instant dateTime, final long epoch, final ReceiverOptions receiverOptions) + throws ServiceBusException + { + try + { + return this.createEpochReceiver(consumerGroupName, partitionId, dateTime, epoch, receiverOptions).get(); + } + catch (InterruptedException|ExecutionException exception) + { + if (exception instanceof InterruptedException) + { + // Re-assert the thread's interrupted status + Thread.currentThread().interrupt(); + } + + Throwable throwable = exception.getCause(); + if (throwable != null) + { + if (throwable instanceof RuntimeException) + { + throw (RuntimeException)throwable; + } + + if (throwable instanceof ServiceBusException) + { + throw (ServiceBusException)throwable; + } + + throw new ServiceBusException(true, throwable); + } + } + + return null; + } + + /** + * Create a Epoch based EventHub receiver with given partition id and start receiving from the beginning of the partition stream. + * The receiver is created for a specific EventHub Partition from the specific consumer group. + *

+ * It is important to pay attention to the following when creating epoch based receiver: + *

    + *
  • Ownership enforcement - Once you created an epoch based receiver, you cannot create a non-epoch receiver to the same consumerGroup-Partition combo until all receivers to the combo are closed. + *
  • Ownership stealing - If a receiver with higher epoch value is created for a consumerGroup-Partition combo, any older epoch receiver to that combo will be force closed. + *
  • Any receiver closed due to lost of ownership to a consumerGroup-Partition combo will get ReceiverDisconnectedException for all operations from that receiver. + *
+ * @param consumerGroupName the consumer group name that this receiver should be grouped under. + * @param partitionId the partition Id that the receiver belongs to. All data received will be from this partition only. + * @param dateTime the date time instant that receive operations will start receive events from. Events received will have {@link EventData.SystemProperties#getEnqueuedTime()} later than this Instant. + * @param epoch a unique identifier (epoch value) that the service uses, to enforce partition/lease ownership. + * @param receiverOptions the set of options to enable on the event hubs receiver + * @return a CompletableFuture that would result in a PartitionReceiver when it is completed. + * @throws ServiceBusException if Service Bus service encountered problems during the operation. + * @see PartitionReceiver + * @see ReceiverDisconnectedException + */ + public final CompletableFuture createEpochReceiver(final String consumerGroupName, final String partitionId, final Instant dateTime, final long epoch, final ReceiverOptions receiverOptions) + throws ServiceBusException + { + return PartitionReceiver.create(this.underlyingFactory, this.eventHubName, consumerGroupName, partitionId, null, false, dateTime, epoch, true, receiverOptions); } @Override diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java index 6053e1ef5..39ed6a36c 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/PartitionReceiver.java @@ -8,6 +8,7 @@ import java.time.Instant; import java.util.Collection; import java.util.Collections; +import java.util.Date; import java.util.Locale; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -17,18 +18,20 @@ import java.util.logging.Level; import java.util.logging.Logger; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnknownDescribedType; import org.apache.qpid.proton.message.Message; +import com.microsoft.azure.servicebus.amqp.AmqpConstants; import com.microsoft.azure.servicebus.ClientConstants; import com.microsoft.azure.servicebus.ClientEntity; import com.microsoft.azure.servicebus.IReceiverSettingsProvider; import com.microsoft.azure.servicebus.MessageReceiver; import com.microsoft.azure.servicebus.MessagingFactory; +import com.microsoft.azure.servicebus.PassByRef; import com.microsoft.azure.servicebus.ServiceBusException; import com.microsoft.azure.servicebus.StringUtil; -import com.microsoft.azure.servicebus.amqp.AmqpConstants; /** * This is a logical representation of receiving from a EventHub partition. @@ -69,6 +72,8 @@ public final class PartitionReceiver extends ClientEntity implements IReceiverSe private Long epoch; private boolean isEpochReceiver; private ReceivePump receivePump; + private ReceiverOptions receiverOptions; + private ReceiverRuntimeInformation runtimeInformation; private PartitionReceiver(MessagingFactory factory, final String eventHubName, @@ -78,7 +83,8 @@ private PartitionReceiver(MessagingFactory factory, final boolean offsetInclusive, final Instant dateTime, final Long epoch, - final boolean isEpochReceiver) + final boolean isEpochReceiver, + final ReceiverOptions receiverOptions) throws ServiceBusException { super(null, null); @@ -93,9 +99,13 @@ private PartitionReceiver(MessagingFactory factory, this.epoch = epoch; this.isEpochReceiver = isEpochReceiver; this.receiveHandlerLock = new Object(); + this.receiverOptions = receiverOptions; + + if (this.receiverOptions != null && this.receiverOptions.getReceiverRuntimeMetricEnabled()) + this.runtimeInformation = new ReceiverRuntimeInformation(partitionId); } - static CompletableFuture create(MessagingFactory factory, + static CompletableFuture create(MessagingFactory factory, final String eventHubName, final String consumerGroupName, final String partitionId, @@ -103,7 +113,8 @@ static CompletableFuture create(MessagingFactory factory, final boolean offsetInclusive, final Instant dateTime, final long epoch, - final boolean isEpochReceiver) + final boolean isEpochReceiver, + final ReceiverOptions receiverOptions) throws ServiceBusException { if (epoch < NULL_EPOCH) @@ -116,7 +127,7 @@ static CompletableFuture create(MessagingFactory factory, throw new IllegalArgumentException("specify valid string for argument - 'consumerGroupName'"); } - final PartitionReceiver receiver = new PartitionReceiver(factory, eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, dateTime, epoch, isEpochReceiver); + final PartitionReceiver receiver = new PartitionReceiver(factory, eventHubName, consumerGroupName, partitionId, startingOffset, offsetInclusive, dateTime, epoch, isEpochReceiver, receiverOptions); return receiver.createInternalReceiver().thenApplyAsync(new Function() { public PartitionReceiver apply(Void a) @@ -207,6 +218,17 @@ public final long getEpoch() { return this.epoch; } + + /** + * Gets the temporal {@link ReceiverRuntimeInformation} for this EventHub partition. + * In general, this information is a representation of, where this {@link PartitionReceiver}'s end of stream is, + * at the time {@link ReceiverRuntimeInformation#getRetrievalTime()}. + * @return receiver runtime information + */ + public final ReceiverRuntimeInformation getRuntimeInformation() { + + return this.runtimeInformation; + } /** * Synchronous version of {@link #receive}. @@ -288,7 +310,26 @@ public CompletableFuture> receive(final int maxEventCount) @Override public Iterable apply(Collection amqpMessages) { - return EventDataUtil.toEventDataCollection(amqpMessages); + PassByRef lastMessageRef = null; + if (PartitionReceiver.this.receiverOptions != null && PartitionReceiver.this.receiverOptions.getReceiverRuntimeMetricEnabled()) + lastMessageRef = new PassByRef<>(); + + Iterable events = EventDataUtil.toEventDataCollection(amqpMessages, lastMessageRef); + + if (lastMessageRef != null && lastMessageRef.get() != null) { + + DeliveryAnnotations deliveryAnnotations = lastMessageRef.get().getDeliveryAnnotations(); + if (deliveryAnnotations != null && deliveryAnnotations.getValue() != null) { + + Map deliveryAnnotationsMap = deliveryAnnotations.getValue(); + PartitionReceiver.this.runtimeInformation.setRuntimeInformation( + (long) deliveryAnnotationsMap.get(ClientConstants.LAST_ENQUEUED_SEQUENCE_NUMBER), + ((Date) deliveryAnnotationsMap.get(ClientConstants.LAST_ENQUEUED_TIME_UTC)).toInstant(), + (String) deliveryAnnotationsMap.get(ClientConstants.LAST_ENQUEUED_OFFSET)); + } + } + + return events; } }); } @@ -443,8 +484,16 @@ public Map getFilter(final Message lastReceivedMes } @Override - public Map getProperties() - { - return this.isEpochReceiver ? Collections.singletonMap(AmqpConstants.EPOCH, (Object) this.epoch) : null; + public Map getProperties() { + + return this.isEpochReceiver ? Collections.singletonMap(AmqpConstants.EPOCH, (Object) this.epoch) : null; } + + @Override + public Symbol[] getDesiredCapabilities() { + + return this.receiverOptions != null && this.receiverOptions.getReceiverRuntimeMetricEnabled() + ? new Symbol[] { AmqpConstants.ENABLE_RECEIVER_RUNTIME_METRIC_NAME } + : null; + } } \ No newline at end of file diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java new file mode 100644 index 000000000..c9e14dfb9 --- /dev/null +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverOptions.java @@ -0,0 +1,37 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.eventhubs; + +/** + * Represents various optional behaviors which can be turned on or off during the creation of a {@link PartitionReceiver}. + */ +public final class ReceiverOptions { + + private boolean receiverRuntimeMetricEnabled; + + /** + * Knob to enable/disable runtime metric of the receiver. If this is set to true and is passed to {@link EventHubClient#createReceiver}, + * after the first {@link PartitionReceiver#receive(int)} call, {@link PartitionReceiver#getRuntimeInformation()} is populated. + *

+ * Enabling this knob will add 3 additional properties to all {@link EventData}'s received on the {@link EventHubClient#createReceiver}. + * @return the {@link boolean} indicating, whether, the runtime metric of the receiver was enabled + */ + public boolean getReceiverRuntimeMetricEnabled() { + + return this.receiverRuntimeMetricEnabled; + } + + /** + * Knob to enable/disable runtime metric of the receiver. If this is set to true and is passed to {@link EventHubClient#createReceiver}, + * after the first {@link PartitionReceiver#receive(int)} call, {@link PartitionReceiver#getRuntimeInformation()} is populated. + *

+ * Enabling this knob will add 3 additional properties to all {@link EventData}'s received on the {@link EventHubClient#createReceiver}. + * @param value the {@link boolean} to indicate, whether, the runtime metric of the receiver should be enabled + */ + public void setReceiverRuntimeMetricEnabled(boolean value) { + + this.receiverRuntimeMetricEnabled = value; + } +} diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverRuntimeInformation.java b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverRuntimeInformation.java new file mode 100644 index 000000000..eb3e63193 --- /dev/null +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/eventhubs/ReceiverRuntimeInformation.java @@ -0,0 +1,80 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.eventhubs; + +import java.time.Instant; + +/** + * Represents the temporal receiver runtime information for a {@link PartitionReceiver}. + * Current received {@link EventData} and {@link ReceiverRuntimeInformation} can be used to find approximate value of pending events (which are not processed yet). + */ +public final class ReceiverRuntimeInformation { + + private final String partitionId; + + private long lastSequenceNumber; + private Instant lastEnqueuedTime; + private String lastEnqueuedOffset; + private Instant retrievalTime; + + public ReceiverRuntimeInformation(final String partitionId) { + + this.partitionId = partitionId; + } + + /** + * Get PartitionId of the {@link PartitionReceiver} for which the {@link ReceiverRuntimeInformation} is returned. + * @return Partition Identifier + */ + public String getPartitionId() { + + return this.partitionId; + } + + /** + * Get sequence number of the {@link EventData}, that is written at the end of the Partition Stream. + * @return last sequence number + */ + public long getLastSequenceNumber() { + + return this.lastSequenceNumber; + } + + /** + * Get enqueued time of the {@link EventData}, that is written at the end of the Partition Stream. + * @return last enqueued time + */ + public Instant getLastEnqueuedTime() { + + return this.lastEnqueuedTime; + } + + /** + * Get offset of the {@link EventData}, that is written at the end of the Partition Stream. + * @return last enqueued offset + */ + public String getLastEnqueuedOffset() { + + return this.lastEnqueuedOffset; + } + + /** + * Get the timestamp at which this {@link ReceiverRuntimeInformation} was constructed. + * @return retrieval time + */ + public Instant getRetrievalTime() { + + return this.retrievalTime; + } + + void setRuntimeInformation(final long sequenceNumber, final Instant enqueuedTime, final String offset) { + + this.lastSequenceNumber = sequenceNumber; + this.lastEnqueuedTime = enqueuedTime; + this.lastEnqueuedOffset = offset; + + this.retrievalTime = Instant.now(); + } +} diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java index 83cc67081..357ce287c 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ClientConstants.java @@ -52,7 +52,7 @@ private ClientConstants() { } public final static String DEFAULT_RETRY = "Default"; public final static String PRODUCT_NAME = "MSJavaClient"; - public final static String CURRENT_JAVACLIENT_VERSION = "0.11.0"; + public final static String CURRENT_JAVACLIENT_VERSION = "0.12.0"; public static final String PLATFORM_INFO = getPlatformInfo(); @@ -85,6 +85,10 @@ private ClientConstants() { } public static final String MANAGEMENT_STATUS_DESCRIPTION_KEY = "status-description"; public static final String MANAGEMENT_RESPONSE_ERROR_CONDITION = "error-condition"; + public static final Symbol LAST_ENQUEUED_SEQUENCE_NUMBER = Symbol.valueOf(MANAGEMENT_RESULT_LAST_ENQUEUED_SEQUENCE_NUMBER); + public static final Symbol LAST_ENQUEUED_OFFSET = Symbol.valueOf(MANAGEMENT_RESULT_LAST_ENQUEUED_OFFSET); + public static final Symbol LAST_ENQUEUED_TIME_UTC = Symbol.valueOf(MANAGEMENT_RESULT_LAST_ENQUEUED_TIME_UTC); + public static final String AMQP_PUT_TOKEN_FAILED_ERROR = "Put token failed. status-code: %s, status-description: %s"; public static final String TOKEN_AUDIENCE_FORMAT = "amqp://%s/%s"; diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ExceptionUtil.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ExceptionUtil.java index 565a451bf..8b3891ce2 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ExceptionUtil.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/ExceptionUtil.java @@ -84,11 +84,11 @@ else if (errorCondition.getCondition() == ClientConstants.STORE_LOCK_LOST_ERROR) } else if (errorCondition.getCondition() == AmqpErrorCode.AmqpLinkDetachForced) { - return new ServiceBusException(false, new AmqpException(errorCondition)); + return new ServiceBusException(true, new AmqpException(errorCondition)); } else if (errorCondition.getCondition() == AmqpErrorCode.ResourceLimitExceeded) { - return new ServiceBusException(false, new AmqpException(errorCondition)); + return new QuotaExceededException(new AmqpException(errorCondition)); } return new ServiceBusException(ClientConstants.DEFAULT_IS_TRANSIENT, errorCondition.getDescription()); diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/IReceiverSettingsProvider.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/IReceiverSettingsProvider.java index 5b4e232d6..421e831bb 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/IReceiverSettingsProvider.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/IReceiverSettingsProvider.java @@ -15,4 +15,6 @@ public interface IReceiverSettingsProvider public Map getFilter(final Message lastReceivedMessage); public Map getProperties(); + + public Symbol[] getDesiredCapabilities(); } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java index 4598aca02..3a479cf69 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/MessageReceiver.java @@ -520,11 +520,15 @@ public void accept(Session session) // use explicit settlement via dispositions (not pre-settled) receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); receiver.setReceiverSettleMode(ReceiverSettleMode.SECOND); - + final Map linkProperties = MessageReceiver.this.settingsProvider.getProperties(); if (linkProperties != null) receiver.setProperties(linkProperties); + final Symbol[] desiredCapabilities = MessageReceiver.this.settingsProvider.getDesiredCapabilities(); + if (desiredCapabilities != null) + receiver.setDesiredCapabilities(desiredCapabilities); + final ReceiveLinkHandler handler = new ReceiveLinkHandler(MessageReceiver.this); BaseHandler.setHandler(receiver, handler); MessageReceiver.this.underlyingFactory.registerForConnectionError(receiver); diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/PassByRef.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/PassByRef.java new file mode 100644 index 000000000..ef39d1f68 --- /dev/null +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/PassByRef.java @@ -0,0 +1,18 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.servicebus; + +public final class PassByRef { + + T t; + + public T get() { + return this.t; + } + + public void set(final T t) { + this.t = t; + } +} diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/QuotaExceededException.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/QuotaExceededException.java index aafa5db80..d1f648707 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/QuotaExceededException.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/QuotaExceededException.java @@ -10,4 +10,8 @@ public QuotaExceededException(String message) { super(false, message); } + public QuotaExceededException(Throwable cause) { + super(false, cause); + } + } diff --git a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java index 2d65299e3..87b49f3e7 100644 --- a/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java +++ b/azure-eventhubs/src/main/java/com/microsoft/azure/servicebus/amqp/AmqpConstants.java @@ -10,13 +10,12 @@ import org.apache.qpid.proton.amqp.Symbol; -public final class AmqpConstants -{ +public final class AmqpConstants { + private AmqpConstants() { } @SuppressWarnings("serial") - public static final Set RESERVED_PROPERTY_NAMES = Collections.unmodifiableSet(new HashSet() - {{ + public static final Set RESERVED_PROPERTY_NAMES = Collections.unmodifiableSet(new HashSet() {{ add(AMQP_PROPERTY_MESSAGE_ID); add(AMQP_PROPERTY_USER_ID); add(AMQP_PROPERTY_TO); @@ -71,4 +70,6 @@ private AmqpConstants() { } public static final String AMQP_PROPERTY_GROUP_ID = "group-id"; public static final String AMQP_PROPERTY_GROUP_SEQUENCE = "group-sequence"; public static final String AMQP_PROPERTY_REPLY_TO_GROUP_ID = "reply-to-group-id"; + + public static final Symbol ENABLE_RECEIVER_RUNTIME_METRIC_NAME = Symbol.valueOf(VENDOR + ":enable-receiver-runtime-metric"); } diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/eventdata/EventDataTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/eventdata/EventDataTest.java index 189612a7f..52d579bf9 100644 --- a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/eventdata/EventDataTest.java +++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/eventdata/EventDataTest.java @@ -18,8 +18,6 @@ public class EventDataTest { - final String payload = "testmessage1"; // even number of chars - @Test (expected = IllegalArgumentException.class) public void eventDataByteArrayNotNull() { @@ -39,11 +37,19 @@ public void eventDataByteArrayNotNullConstructor2() { new EventData(null, 0, 0); } + + @Test + public void eventDataEmptyByteArray() throws IOException, ClassNotFoundException + { + byte[] byteArray = new byte[0]; + EventData deSerializedEvent = serializeAndDeserialize(new EventData(byteArray)); + Assert.assertEquals(deSerializedEvent.getBodyLength(), 0); + Assert.assertTrue(deSerializedEvent.getBody() != null); + } @Test public void eventDataSerializationTest() throws IOException, ClassNotFoundException - { - + { final EventData withSimpleByteArray = new EventData(payload.getBytes()); EventData deSerializedEvent = serializeAndDeserialize(withSimpleByteArray); Assert.assertTrue(payload.equals(new String(deSerializedEvent.getBody()))); @@ -95,4 +101,163 @@ private EventData serializeAndDeserialize(final EventData input) throws IOExcept return deSerializedEvent; } + + final String payload = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + + "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"; // even number of chars } diff --git a/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverRuntimeMetricsTest.java b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverRuntimeMetricsTest.java new file mode 100644 index 000000000..f6db633cb --- /dev/null +++ b/azure-eventhubs/src/test/java/com/microsoft/azure/eventhubs/sendrecv/ReceiverRuntimeMetricsTest.java @@ -0,0 +1,105 @@ +/* + * Copyright (c) Microsoft. All rights reserved. + * Licensed under the MIT license. See LICENSE file in the project root for full license information. + */ +package com.microsoft.azure.eventhubs.sendrecv; + +import java.time.Instant; +import java.util.HashSet; +import java.util.LinkedList; + +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.microsoft.azure.eventhubs.EventData; +import com.microsoft.azure.eventhubs.EventHubClient; +import com.microsoft.azure.eventhubs.PartitionReceiver; +import com.microsoft.azure.eventhubs.ReceiverOptions; +import com.microsoft.azure.eventhubs.lib.ApiTestBase; +import com.microsoft.azure.eventhubs.lib.TestBase; +import com.microsoft.azure.eventhubs.lib.TestContext; +import com.microsoft.azure.servicebus.ConnectionStringBuilder; +import com.microsoft.azure.servicebus.ServiceBusException; + +public class ReceiverRuntimeMetricsTest extends ApiTestBase { + + static final String cgName = TestContext.getConsumerGroupName(); + static final String partitionId = "0"; + static final Instant beforeTestStart = Instant.now(); + static final int sentEvents = 25; + + static EventHubClient ehClient; + + static PartitionReceiver receiverWithOptions = null; + static PartitionReceiver receiverWithoutOptions = null; + static PartitionReceiver receiverWithOptionsDisabled = null; + + @BeforeClass + public static void initializeEventHub() throws Exception { + + final ConnectionStringBuilder connectionString = TestContext.getConnectionString(); + ehClient = EventHubClient.createFromConnectionStringSync(connectionString.toString()); + + ReceiverOptions options = new ReceiverOptions(); + options.setReceiverRuntimeMetricEnabled(true); + + ReceiverOptions optionsWithMetricsDisabled = new ReceiverOptions(); + optionsWithMetricsDisabled.setReceiverRuntimeMetricEnabled(false); + + receiverWithOptions = ehClient.createReceiverSync(cgName, partitionId, Instant.now(), options); + receiverWithoutOptions = ehClient.createReceiverSync(cgName, partitionId, Instant.EPOCH); + receiverWithOptionsDisabled = ehClient.createReceiverSync(cgName, partitionId, Instant.EPOCH, optionsWithMetricsDisabled); + + TestBase.pushEventsToPartition(ehClient, partitionId, sentEvents).get(); + } + + @Test() + public void testRuntimeMetricsReturnedWhenEnabled() throws ServiceBusException { + + LinkedList receivedEventsWithOptions = new LinkedList<>(); + while (receivedEventsWithOptions.size() < sentEvents) + for (EventData eData: receiverWithOptions.receiveSync(sentEvents)) + receivedEventsWithOptions.add(eData); + + HashSet offsets = new HashSet<>(); + for (EventData eData: receivedEventsWithOptions) + offsets.add(eData.getSystemProperties().getOffset()); + + Assert.assertTrue(receiverWithOptions.getRuntimeInformation() != null); + Assert.assertTrue(receiverWithOptions.getRuntimeInformation().getLastEnqueuedTime().isAfter(beforeTestStart)); + Assert.assertTrue(offsets.contains(receiverWithOptions.getRuntimeInformation().getLastEnqueuedOffset())); + Assert.assertTrue(receiverWithOptions.getRuntimeInformation().getLastSequenceNumber() >= receivedEventsWithOptions.iterator().next().getSystemProperties().getSequenceNumber()); + } + + @Test() + public void testRuntimeMetricsWhenDisabled() throws ServiceBusException { + + receiverWithOptionsDisabled.receiveSync(10); + Assert.assertTrue(receiverWithOptionsDisabled.getRuntimeInformation() == null); + } + + @Test() + public void testRuntimeMetricsDefaultDisabled() throws ServiceBusException { + + receiverWithoutOptions.receiveSync(10); + Assert.assertTrue(receiverWithoutOptions.getRuntimeInformation() == null); + } + + @AfterClass() + public static void cleanup() throws ServiceBusException { + + if (receiverWithOptions != null) + receiverWithOptions.closeSync(); + + if (receiverWithoutOptions != null) + receiverWithoutOptions.closeSync(); + + if (receiverWithOptionsDisabled != null) + receiverWithOptionsDisabled.closeSync(); + + if (ehClient != null) + ehClient.closeSync(); + } +} diff --git a/pom.xml b/pom.xml index 5f0b7aad8..f64a517d6 100644 --- a/pom.xml +++ b/pom.xml @@ -14,7 +14,7 @@ 0.16.0 4.12 - 0.11.0 + 0.12.0 diff --git a/readme.md b/readme.md index 70f21eb44..1222d806b 100644 --- a/readme.md +++ b/readme.md @@ -147,7 +147,7 @@ the required version of Apache Qpid Proton-J, and the crytography library BCPKIX com.microsoft.azure azure-eventhubs - 0.11.0 + 0.12.0 ```