diff --git a/sdk/eventhub/event-hubs/src/eventProcessor.ts b/sdk/eventhub/event-hubs/src/eventProcessor.ts index 4f1f5d227fcb..ca0de998f710 100644 --- a/sdk/eventhub/event-hubs/src/eventProcessor.ts +++ b/sdk/eventhub/event-hubs/src/eventProcessor.ts @@ -16,6 +16,10 @@ import { cancellableDelay } from "./util/cancellableDelay"; * Reason for closing a PartitionProcessor. */ export enum CloseReason { + /** + * The PartitionProcessor was shutdown due to some internal or service exception. + */ + EventHubException = "EventHubException", /** * Ownership of the partition was lost or transitioned to a new processor instance. */ @@ -23,11 +27,7 @@ export enum CloseReason { /** * The EventProcessor was shutdown. */ - Shutdown = "Shutdown", - /** - * The PartitionProcessor was shutdown for an unknown reason. - */ - Unknown = "Unknown" + Shutdown = "Shutdown" } export interface PartitionProcessor { diff --git a/sdk/eventhub/event-hubs/src/partitionPump.ts b/sdk/eventhub/event-hubs/src/partitionPump.ts index 759875a957a4..9ffbebb84d33 100644 --- a/sdk/eventhub/event-hubs/src/partitionPump.ts +++ b/sdk/eventhub/event-hubs/src/partitionPump.ts @@ -8,6 +8,7 @@ import { EventHubClient } from "./eventHubClient"; import { EventPosition } from "./eventPosition"; import { EventHubConsumer } from "./receiver"; import { AbortController } from "@azure/abort-controller"; +import { MessagingError } from "@azure/core-amqp"; export class PartitionPump { private _partitionContext: PartitionContext; @@ -50,34 +51,51 @@ export class PartitionPump { } private async _receiveEvents(partitionId: string): Promise { - try { - this._receiver = await this._eventHubClient.createConsumer( - this._partitionContext.consumerGroupName, - partitionId, - this._processorOptions.initialEventPosition || EventPosition.earliest() - ); + this._receiver = this._eventHubClient.createConsumer( + this._partitionContext.consumerGroupName, + partitionId, + this._processorOptions.initialEventPosition || EventPosition.earliest() + ); - while (this._isReceiving) { + while (this._isReceiving) { + try { const receivedEvents = await this._receiver.receiveBatch( this._processorOptions.maxBatchSize || 1, this._processorOptions.maxWaitTimeInSeconds, this._abortController.signal ); + // avoid calling user's processEvents handler if the pump was stopped while receiving events if (!this._isReceiving) { return; } await this._partitionProcessor.processEvents(receivedEvents); - } - } catch (err) { - this._isReceiving = false; - try { - if (this._receiver) { - await this._receiver.close(); - } - await this._partitionProcessor.processError(err); - log.error("An error occurred while receiving events.", err); } catch (err) { - log.error("An error occurred while closing the receiver", err); + // check if this pump is still receiving + // it may not be if the EventProcessor was stopped during processEvents + if (!this._isReceiving) { + // no longer receiving, so close was called from somewhere else + return; + } + + // forward error to user's processError and swallow errors they may throw + try { + await this._partitionProcessor.processError(err); + } catch (err) { + log.error("An error was thrown by user's processError method: ", err); + } + + // close the partition processor if a non-retryable error was encountered + if (typeof err !== "object" || !(err as MessagingError).retryable) { + try { + // this will close the pump and will break us out of the while loop + return await this.stop(CloseReason.EventHubException); + } catch (err) { + log.error( + `An error occurred while closing the receiver with reason ${CloseReason.EventHubException}: `, + err + ); + } + } } } } diff --git a/sdk/eventhub/event-hubs/src/pumpManager.ts b/sdk/eventhub/event-hubs/src/pumpManager.ts index 8b430fa45acc..cdcc9a7ebe10 100644 --- a/sdk/eventhub/event-hubs/src/pumpManager.ts +++ b/sdk/eventhub/event-hubs/src/pumpManager.ts @@ -68,7 +68,7 @@ export class PumpManager { log.pumpManager( `[${this._eventProcessorName}] [${partitionId}] The existing pump is not running.` ); - await this.removePump(partitionId, CloseReason.Unknown); + await this.removePump(partitionId, CloseReason.EventHubException); } log.pumpManager(`[${this._eventProcessorName}] [${partitionId}] Creating a new pump.`);