Skip to content

Commit

Permalink
[Event Hubs] updates PartitionPump to continue receiving events for r…
Browse files Browse the repository at this point in the history
…etryable errors (#4565)
  • Loading branch information
chradek authored Jul 31, 2019
1 parent 334d712 commit 9f94d25
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 23 deletions.
10 changes: 5 additions & 5 deletions sdk/eventhub/event-hubs/src/eventProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@ import { cancellableDelay } from "./util/cancellableDelay";
* Reason for closing a PartitionProcessor.
*/
export enum CloseReason {
/**
* The PartitionProcessor was shutdown due to some internal or service exception.
*/
EventHubException = "EventHubException",
/**
* Ownership of the partition was lost or transitioned to a new processor instance.
*/
OwnershipLost = "OwnershipLost",
/**
* The EventProcessor was shutdown.
*/
Shutdown = "Shutdown",
/**
* The PartitionProcessor was shutdown for an unknown reason.
*/
Unknown = "Unknown"
Shutdown = "Shutdown"
}

export interface PartitionProcessor {
Expand Down
52 changes: 35 additions & 17 deletions sdk/eventhub/event-hubs/src/partitionPump.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import { EventHubClient } from "./eventHubClient";
import { EventPosition } from "./eventPosition";
import { EventHubConsumer } from "./receiver";
import { AbortController } from "@azure/abort-controller";
import { MessagingError } from "@azure/core-amqp";

export class PartitionPump {
private _partitionContext: PartitionContext;
Expand Down Expand Up @@ -50,34 +51,51 @@ export class PartitionPump {
}

private async _receiveEvents(partitionId: string): Promise<void> {
try {
this._receiver = await this._eventHubClient.createConsumer(
this._partitionContext.consumerGroupName,
partitionId,
this._processorOptions.initialEventPosition || EventPosition.earliest()
);
this._receiver = this._eventHubClient.createConsumer(
this._partitionContext.consumerGroupName,
partitionId,
this._processorOptions.initialEventPosition || EventPosition.earliest()
);

while (this._isReceiving) {
while (this._isReceiving) {
try {
const receivedEvents = await this._receiver.receiveBatch(
this._processorOptions.maxBatchSize || 1,
this._processorOptions.maxWaitTimeInSeconds,
this._abortController.signal
);
// avoid calling user's processEvents handler if the pump was stopped while receiving events
if (!this._isReceiving) {
return;
}
await this._partitionProcessor.processEvents(receivedEvents);
}
} catch (err) {
this._isReceiving = false;
try {
if (this._receiver) {
await this._receiver.close();
}
await this._partitionProcessor.processError(err);
log.error("An error occurred while receiving events.", err);
} catch (err) {
log.error("An error occurred while closing the receiver", err);
// check if this pump is still receiving
// it may not be if the EventProcessor was stopped during processEvents
if (!this._isReceiving) {
// no longer receiving, so close was called from somewhere else
return;
}

// forward error to user's processError and swallow errors they may throw
try {
await this._partitionProcessor.processError(err);
} catch (err) {
log.error("An error was thrown by user's processError method: ", err);
}

// close the partition processor if a non-retryable error was encountered
if (typeof err !== "object" || !(err as MessagingError).retryable) {
try {
// this will close the pump and will break us out of the while loop
return await this.stop(CloseReason.EventHubException);
} catch (err) {
log.error(
`An error occurred while closing the receiver with reason ${CloseReason.EventHubException}: `,
err
);
}
}
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion sdk/eventhub/event-hubs/src/pumpManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ export class PumpManager {
log.pumpManager(
`[${this._eventProcessorName}] [${partitionId}] The existing pump is not running.`
);
await this.removePump(partitionId, CloseReason.Unknown);
await this.removePump(partitionId, CloseReason.EventHubException);
}

log.pumpManager(`[${this._eventProcessorName}] [${partitionId}] Creating a new pump.`);
Expand Down

0 comments on commit 9f94d25

Please sign in to comment.