Skip to content

Commit

Permalink
Release 0.12.0 (Azure#76)
Browse files Browse the repository at this point in the history
* Optimization: flow the Receiver runtime info as part of EventData (Azure#65)

* Optimization: flow the Receiver runtime info as part of EventData - Partition receiver runtime metrics aka EndOfStream info
* move cursor to 0.12.0-snapshot
* receiverruntimemetrics on PartitionReceiver & eph + junits

* improve cit coverage

* Fix issue 58: deliver empty iterable to onEvents on timeout

* fix: eventData deserialization error, when the payload size is >1000bytes (Azure#71)

* EventData deserialization fails when the payload size is >1000bytes
* move to readFully api (suggestion from @CodingCat)

* releaseChecklist: fix javadoc & update version to 0.12.0 (Azure#73)

* eph test fix - absorb all events sent by RuntimeInfotest to not effect other tests

* runtimeinfo: refactoring for CR (Azure#80)

* runtimeinfo: refactoring for CR

* minor refactor

* update eph test

* fix exception contract for link-detach errors - this should be a transient error (Azure#78)
  • Loading branch information
SreeramGarlapati authored and sjkwak committed Mar 16, 2017
1 parent 88a06c9 commit 180bc11
Show file tree
Hide file tree
Showing 29 changed files with 1,136 additions and 89 deletions.
2 changes: 1 addition & 1 deletion ConsumingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion PublishingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>0.11.0</version>
<version>0.12.0</version>
</dependency>
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
Expand All @@ -15,6 +16,7 @@
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.PartitionReceiveHandler;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.ReceiverOptions;
import com.microsoft.azure.servicebus.ReceiverDisconnectedException;
import com.microsoft.azure.servicebus.ServiceBusException;

Expand Down Expand Up @@ -96,19 +98,21 @@ private void openClients() throws ServiceBusException, IOException, InterruptedE
this.eventHubClient = (EventHubClient) this.internalOperationFuture.get();
this.internalOperationFuture = null;

// Create new receiver and set options
// Create new receiver and set options
ReceiverOptions options = new ReceiverOptions();
options.setReceiverRuntimeMetricEnabled(this.host.getEventProcessorOptions().getReceiverRuntimeMetricEnabled());
Object startAt = this.partitionContext.getInitialOffset();
long epoch = this.lease.getEpoch();
this.host.logWithHostAndPartition(Level.FINER, this.partitionContext, "Opening EH receiver with epoch " + epoch + " at location " + startAt);
if (startAt instanceof String)
{
this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(),
(String)startAt, epoch);
(String)startAt, epoch, options);
}
else if (startAt instanceof Instant)
{
this.internalOperationFuture = this.eventHubClient.createEpochReceiver(this.partitionContext.getConsumerGroupName(), this.partitionContext.getPartitionId(),
(Instant)startAt, epoch);
(Instant)startAt, epoch, options);
}
else
{
Expand Down Expand Up @@ -191,13 +195,28 @@ private class InternalReceiveHandler extends PartitionReceiveHandler
@Override
public void onReceive(Iterable<EventData> events)
{
// This method is called on the thread that the Java EH client uses to run the pump.
// There is one pump per EventHubClient. Since each PartitionPump creates a new EventHubClient,
// using that thread to call onEvents does no harm. Even if onEvents is slow, the pump will
// get control back each time onEvents returns, and be able to receive a new batch of messages
// with which to make the next onEvents call. The pump gains nothing by running faster than onEvents.

EventHubPartitionPump.this.onEvents(events);
if (EventHubPartitionPump.this.host.getEventProcessorOptions().getReceiverRuntimeMetricEnabled())
{
EventHubPartitionPump.this.partitionContext.setRuntimeInformation(EventHubPartitionPump.this.partitionReceiver.getRuntimeInformation());
}

// This method is called on the thread that the Java EH client uses to run the pump.
// There is one pump per EventHubClient. Since each PartitionPump creates a new EventHubClient,
// using that thread to call onEvents does no harm. Even if onEvents is slow, the pump will
// get control back each time onEvents returns, and be able to receive a new batch of messages
// with which to make the next onEvents call. The pump gains nothing by running faster than onEvents.

// The underlying client returns null if there are no events, but the contract for IEventProcessor
// is different and is expecting an empty iterable if there are no events (and invoke processor after
// receive timeout is turned on).

Iterable<EventData> effectiveEvents = events;
if (effectiveEvents == null)
{
effectiveEvents = new ArrayList<EventData>();
}

EventHubPartitionPump.this.onEvents(effectiveEvents);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public final class EventProcessorOptions
{
private Consumer<ExceptionReceivedEventArgs> exceptionNotificationHandler = null;
private Boolean invokeProcessorAfterReceiveTimeout = false;
private boolean receiverRuntimeMetricEnabled = false;
private int maxBatchSize = 10;
private int prefetchCount = 300;
private Duration receiveTimeOut = Duration.ofMinutes(1);
Expand All @@ -30,6 +31,7 @@ public final class EventProcessorOptions
* PrefetchCount: 300
* InitialOffsetProvider: uses the last offset checkpointed, or START_OF_STREAM
* InvokeProcessorAfterReceiveTimeout: false
* ReceiverRuntimeMetricEnabled: false
* </pre>
*
* @return an EventProcessorOptions instance with all options set to the default values
Expand Down Expand Up @@ -146,8 +148,8 @@ public void setInitialOffsetProvider(Function<String, Object> initialOffsetProvi
}

/***
* Returns whether the EventProcessorHost will call IEventProcessor.onEvents(null) when a receive
* timeout occurs (true) or not (false).
* Returns whether the EventProcessorHost will call IEventProcessor.onEvents() with an empty iterable
* when a receive timeout occurs (true) or not (false).
*
* Defaults to false.
*
Expand All @@ -159,8 +161,8 @@ public Boolean getInvokeProcessorAfterReceiveTimeout()
}

/**
* Changes whether the EventProcessorHost will call IEventProcessor.onEvents(null) when a receive
* timeout occurs (true) or not (false).
* Changes whether the EventProcessorHost will call IEventProcessor.onEvents() with an empty iterable
* when a receive timeout occurs (true) or not (false).
*
* The default is false (no call).
*
Expand All @@ -170,6 +172,32 @@ public void setInvokeProcessorAfterReceiveTimeout(Boolean invokeProcessorAfterRe
{
this.invokeProcessorAfterReceiveTimeout = invokeProcessorAfterReceiveTimeout;
}

/**
* Knob to enable/disable runtime metric of the receiver. If this is set to true,
* the first parameter {@link com.microsoft.azure.eventprocessorhost.PartitionContext#runtimeInformation} of
* {@link IEventProcessor#onEvents(com.microsoft.azure.eventprocessorhost.PartitionContext, java.lang.Iterable)} will be populated.
* <p>
* Enabling this knob will add 3 additional properties to all raw AMQP messages received.
* @return the {@link boolean} indicating, whether, the runtime metric of the receiver was enabled
*/
public boolean getReceiverRuntimeMetricEnabled()
{
return this.receiverRuntimeMetricEnabled;
}

/**
* Knob to enable/disable runtime metric of the receiver. If this is set to true,
* the first parameter {@link com.microsoft.azure.eventprocessorhost.PartitionContext#runtimeInformation} of
* {@link IEventProcessor#onEvents(com.microsoft.azure.eventprocessorhost.PartitionContext, java.lang.Iterable)} will be populated.
* <p>
* Enabling this knob will add 3 additional properties to all raw AMQP messages received.
* @param value the {@link boolean} to indicate, whether, the runtime metric of the receiver should be enabled
*/
public void setReceiverRuntimeMetricEnabled(boolean value)
{
this.receiverRuntimeMetricEnabled = value;
}

void notifyOfException(String hostname, Exception exception, String action)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,14 @@ public interface IEventProcessor
/**
* Called by the processor host when a batch of events has arrived.
*
* This is where the real work of the event processor is done.
* This is where the real work of the event processor is done. It is normally called when one
* or more events have arrived. If the EventProcessorHost instance was set up with an EventProcessorOptions
* on which setInvokeProcessorAfterReceiveTimeout(true) has been called, then if a receive times out,
* it will be called with an empty iterable. By default this option is false and receive timeouts do not
* cause a call to this method.
*
* @param context Information about the partition.
* @param messages The events to be processed.
* @param messages The events to be processed. May be empty.
* @throws Exception
*/
public void onEvents(PartitionContext context, Iterable<EventData> messages) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,19 @@

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.PartitionReceiver;
import com.microsoft.azure.eventhubs.ReceiverRuntimeInformation;

public class PartitionContext
{
private final EventProcessorHost host;
private final EventProcessorHost host;
private final String partitionId;
private final String eventHubPath;
private final String consumerGroupName;

private Lease lease;
private String offset = PartitionReceiver.START_OF_STREAM;
private long sequenceNumber = 0;;
private long sequenceNumber = 0;
private ReceiverRuntimeInformation runtimeInformation;

private Object offsetSynchronizer;

Expand All @@ -32,6 +34,7 @@ public class PartitionContext
this.partitionId = partitionId;
this.eventHubPath = eventHubPath;
this.consumerGroupName = consumerGroupName;
this.runtimeInformation = new ReceiverRuntimeInformation(partitionId);

this.offsetSynchronizer = new Object();
}
Expand All @@ -50,6 +53,16 @@ public String getOwner()
{
return this.lease.getOwner();
}

public ReceiverRuntimeInformation getRuntimeInformation()
{
return this.runtimeInformation;
}

void setRuntimeInformation(ReceiverRuntimeInformation value)
{
this.runtimeInformation = value;
}

Lease getLease()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,17 @@ public class PerTestSettings
EventProcessorOptions inOptions; // can be null
boolean inDoCheckpoint;
boolean inEntityDoesNotExist; // Prevents test code from doing certain checks that would fail on nonexistence before reaching product code.
boolean inTelltaleOnTimeout; // Generates an empty telltale string, which causes PrefabEventProcessor to trigger telltale on timeout.
boolean inHasSenders;

PerTestSettings(String testName)
{
this.inTestName = testName;
this.inOptions = EventProcessorOptions.getDefaultOptions();
this.inDoCheckpoint = false;
this.inEntityDoesNotExist = false;
this.inTelltaleOnTimeout = false;
this.inHasSenders = true;

this.inoutEPHConstructorArgs = new EPHConstructorArgs();
}
Expand Down Expand Up @@ -52,6 +56,7 @@ class EPHConstructorArgs
static final int CHECKPOINT_MANAGER_OVERRIDE = 0x0400;
static final int LEASE_MANAGER_OVERRIDE = 0x0800;
static final int EXPLICIT_MANAGER = CHECKPOINT_MANAGER_OVERRIDE | LEASE_MANAGER_OVERRIDE;
static final int TELLTALE_ON_TIMEOUT = 0x1000;

private int flags;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ public class PrefabEventProcessor implements IEventProcessor
private boolean doCheckpoint;
private boolean doMarker;
private boolean logEveryMessage;
private boolean telltaleOnTimeout;

private int eventCount = 0;

Expand All @@ -26,6 +27,7 @@ public class PrefabEventProcessor implements IEventProcessor
this.doCheckpoint = doCheckpoint;
this.doMarker = doMarker;
this.logEveryMessage = logEveryMessage;
this.telltaleOnTimeout = telltale.isEmpty();
}

@Override
Expand All @@ -43,6 +45,9 @@ public void onEvents(PartitionContext context, Iterable<EventData> messages) thr
{
int batchSize = 0;
EventData lastEvent = null;
if (messages != null && messages.iterator().hasNext())
this.factory.setOnEventsContext(context);

for (EventData event : messages)
{
this.eventCount++;
Expand All @@ -61,6 +66,19 @@ public void onEvents(PartitionContext context, Iterable<EventData> messages) thr
}
lastEvent = event;
}
if (batchSize == 0)
{
if (this.telltaleOnTimeout)
{
TestUtilities.log("P" + context.getPartitionId() + " got expected timeout");
this.factory.setTelltaleFound(context.getPartitionId());
}
else
{
TestUtilities.log("P" + context.getPartitionId() + " got UNEXPECTED timeout");
this.factory.putError("P" + context.getPartitionId() + " got UNEXPECTED timeout");
}
}
this.factory.addBatch(batchSize);
if (doCheckpoint)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ public class PrefabProcessorFactory implements IEventProcessorFactory<IEventProc
private ArrayList<String> errors = new ArrayList<String>();
private HashMap<String, Boolean> foundTelltale = new HashMap<String, Boolean>();
private int eventsReceivedCount = 0;
private PartitionContext partitionContextOnEvents;

PrefabProcessorFactory(String telltale, boolean doCheckpoint, boolean doMarker)
{
Expand Down Expand Up @@ -72,6 +73,16 @@ int getEventsReceivedCount()
{
return this.eventsReceivedCount;
}

PartitionContext getOnEventsContext()
{
return this.partitionContextOnEvents;
}

void setOnEventsContext(PartitionContext value)
{
this.partitionContextOnEvents = value;
}

@Override
public IEventProcessor createEventProcessor(PartitionContext context) throws Exception
Expand Down
Loading

0 comments on commit 180bc11

Please sign in to comment.