Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Recreating connections on transient failures #7090

Merged
merged 31 commits into from
Jan 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
378fb1d
Adding EventHubConnectionProcessor.
conniey Dec 1, 2019
b36d9f6
Adding tests.
conniey Dec 2, 2019
8c9bfbd
Removing unneeded local variable assignment.
conniey Dec 2, 2019
66e9fca
Add tests for fetching new connection.
conniey Dec 2, 2019
e410ce1
Replace ConcurrentLinkedDequeue with ArrayList.
conniey Dec 30, 2019
64ba300
Add tests to ensure on error conditions.
conniey Dec 30, 2019
721d33e
Require not null for EventConnection subscribe methodsˆ.
conniey Jan 2, 2020
67cfe0f
Remove circuar dependency on EventHubConnection from EventHubConnecti…
conniey Jan 2, 2020
0214e1a
Removing EventHubConnection.
conniey Jan 3, 2020
a3a0030
Update AmqpConnection interface to create send and receive links.
conniey Jan 3, 2020
aaa3626
Fixing errors in ConnectionProcessor and update ReactorAmqpConnection…
conniey Jan 3, 2020
603833f
Update clients to use ConnectionProcessor.
conniey Jan 3, 2020
29b3a99
Update tests.
conniey Jan 3, 2020
89f5e19
Fix tests for consumers.
conniey Jan 3, 2020
abc2a70
Fixing EHubClientBuilder to repeat the mono, so we have a potentially…
conniey Jan 3, 2020
aec04f6
Add test to verify that on non-transient errors, the connection is no…
conniey Jan 3, 2020
7a173c0
Adding log messages.
conniey Jan 3, 2020
d53234e
Fix header.
conniey Jan 3, 2020
30d4ec3
Fix checkstyle.
conniey Jan 4, 2020
ad9be4a
Remove logging for info.
conniey Jan 6, 2020
2a8dd64
Fix Event Hub Consumer tests.
conniey Jan 7, 2020
d44bfa6
Using parallel scheduler.
conniey Jan 7, 2020
1073763
Delay retrying EventHubConnection creation on a transient error.
conniey Jan 7, 2020
2f74cac
Update to .toMillis()
conniey Jan 7, 2020
526f996
Setting retry options.
conniey Jan 7, 2020
ca903dc
Complete subscriber.
conniey Jan 7, 2020
079dbdb
Consolidate request from upstream for new connection.
conniey Jan 7, 2020
38fe3a5
Fix unit tests.
conniey Jan 7, 2020
27614c5
Not requesting new connection unless needed.
conniey Jan 7, 2020
cd404de
Fixing tests.
conniey Jan 7, 2020
7dff159
Fixing checkstyle issues.
conniey Jan 7, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public int getMaxRetries() {
* @return The amount of time to delay before retrying the associated operation; if {@code null}, then the operation
* is no longer eligible to be retried.
*/
public Duration calculateRetryDelay(Exception lastException, int retryCount) {
public Duration calculateRetryDelay(Throwable lastException, int retryCount) {
if (retryOptions.getDelay() == Duration.ZERO
|| retryOptions.getMaxDelay() == Duration.ZERO
|| retryCount > retryOptions.getMaxRetries()) {
Expand Down Expand Up @@ -138,7 +138,7 @@ public boolean equals(Object obj) {
* @param exception An exception that was observed for the operation to be retried.
* @return true if the exception is a retriable exception, otherwise false.
*/
private static boolean isRetriableException(Exception exception) {
private static boolean isRetriableException(Throwable exception) {
return (exception instanceof AmqpException) && ((AmqpException) exception).isTransient();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.azure.core.amqp.implementation.MessageSerializer;
import com.azure.core.amqp.implementation.TracerProvider;
import com.azure.core.util.logging.ClientLogger;
import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubManagementNode;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand All @@ -25,15 +26,16 @@
class EventHubAsyncClient implements Closeable {
private final ClientLogger logger = new ClientLogger(EventHubAsyncClient.class);
private final MessageSerializer messageSerializer;
private final EventHubConnection connection;
private final EventHubConnectionProcessor connectionProcessor;
private final boolean isSharedConnection;
private final TracerProvider tracerProvider;

EventHubAsyncClient(EventHubConnection connection, TracerProvider tracerProvider,
EventHubAsyncClient(EventHubConnectionProcessor connectionProcessor, TracerProvider tracerProvider,
MessageSerializer messageSerializer, boolean isSharedConnection) {
this.tracerProvider = Objects.requireNonNull(tracerProvider, "'tracerProvider' cannot be null.");
this.messageSerializer = Objects.requireNonNull(messageSerializer, "'messageSerializer' cannot be null.");
this.connection = Objects.requireNonNull(connection, "'connection' cannot be null.");
this.connectionProcessor = Objects.requireNonNull(connectionProcessor,
"'connectionProcessor' cannot be null.");
this.isSharedConnection = isSharedConnection;
}

Expand All @@ -43,7 +45,7 @@ class EventHubAsyncClient implements Closeable {
* @return The fully qualified namespace of this Event Hub.
*/
String getFullyQualifiedNamespace() {
return connection.getFullyQualifiedNamespace();
return connectionProcessor.getFullyQualifiedNamespace();
}

/**
Expand All @@ -52,7 +54,7 @@ String getFullyQualifiedNamespace() {
* @return The Event Hub name this client interacts with.
*/
String getEventHubName() {
return connection.getEventHubName();
return connectionProcessor.getEventHubName();
}

/**
Expand All @@ -61,7 +63,9 @@ String getEventHubName() {
* @return The set of information for the Event Hub that this client is associated with.
*/
Mono<EventHubProperties> getProperties() {
return connection.getManagementNode().flatMap(EventHubManagementNode::getEventHubProperties);
return connectionProcessor
.flatMap(connection -> connection.getManagementNode())
.flatMap(EventHubManagementNode::getEventHubProperties);
}

/**
Expand All @@ -81,7 +85,9 @@ Flux<String> getPartitionIds() {
* @return The set of information for the requested partition under the Event Hub this client is associated with.
*/
Mono<PartitionProperties> getPartitionProperties(String partitionId) {
return connection.getManagementNode().flatMap(node -> node.getPartitionProperties(partitionId));
return connectionProcessor
.flatMap(connection -> connection.getManagementNode())
.flatMap(node -> node.getPartitionProperties(partitionId));
}

/**
Expand All @@ -91,8 +97,9 @@ Mono<PartitionProperties> getPartitionProperties(String partitionId) {
* @return A new {@link EventHubProducerAsyncClient}.
*/
EventHubProducerAsyncClient createProducer() {
return new EventHubProducerAsyncClient(connection.getFullyQualifiedNamespace(), getEventHubName(), connection,
connection.getRetryOptions(), tracerProvider, messageSerializer, isSharedConnection);
return new EventHubProducerAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), getEventHubName(),
connectionProcessor, connectionProcessor.getRetryOptions(), tracerProvider, messageSerializer,
isSharedConnection);
}

/**
Expand All @@ -115,8 +122,8 @@ EventHubConsumerAsyncClient createConsumer(String consumerGroup, int prefetchCou
new IllegalArgumentException("'consumerGroup' cannot be an empty string."));
}

return new EventHubConsumerAsyncClient(connection.getFullyQualifiedNamespace(), getEventHubName(),
connection, messageSerializer, consumerGroup, prefetchCount, isSharedConnection);
return new EventHubConsumerAsyncClient(connectionProcessor.getFullyQualifiedNamespace(), getEventHubName(),
connectionProcessor, messageSerializer, consumerGroup, prefetchCount, isSharedConnection);
}

/**
Expand All @@ -126,6 +133,6 @@ EventHubConsumerAsyncClient createConsumer(String consumerGroup, int prefetchCou
*/
@Override
public void close() {
connection.close();
connectionProcessor.dispose();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
import com.azure.core.util.tracing.Tracer;
import com.azure.messaging.eventhubs.implementation.ClientConstants;
import com.azure.messaging.eventhubs.implementation.EventHubAmqpConnection;
import com.azure.messaging.eventhubs.implementation.EventHubConnectionProcessor;
import com.azure.messaging.eventhubs.implementation.EventHubReactorAmqpConnection;
import com.azure.messaging.eventhubs.implementation.EventHubSharedKeyCredential;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
Expand Down Expand Up @@ -125,7 +127,7 @@ public class EventHubClientBuilder {
private String fullyQualifiedNamespace;
private String eventHubName;
private String consumerGroup;
private EventHubConnection eventHubConnection;
private EventHubConnectionProcessor eventHubConnectionProcessor;
private int prefetchCount;
private boolean isSharedConnection;

Expand Down Expand Up @@ -464,17 +466,17 @@ EventHubAsyncClient buildAsyncClient() {

final MessageSerializer messageSerializer = new EventHubMessageSerializer();

if (isSharedConnection && eventHubConnection == null) {
eventHubConnection = buildConnection(messageSerializer);
if (isSharedConnection && eventHubConnectionProcessor == null) {
eventHubConnectionProcessor = buildConnectionProcessor(messageSerializer);
}

final EventHubConnection connection = isSharedConnection
? eventHubConnection
: buildConnection(messageSerializer);
final EventHubConnectionProcessor processor = isSharedConnection
? eventHubConnectionProcessor
: buildConnectionProcessor(messageSerializer);

final TracerProvider tracerProvider = new TracerProvider(ServiceLoader.load(Tracer.class));

return new EventHubAsyncClient(connection, tracerProvider, messageSerializer, isSharedConnection);
return new EventHubAsyncClient(processor, tracerProvider, messageSerializer, isSharedConnection);
}

/**
Expand Down Expand Up @@ -508,25 +510,28 @@ EventHubClient buildClient() {
return new EventHubClient(client, retryOptions);
}

private EventHubConnection buildConnection(MessageSerializer messageSerializer) {
private EventHubConnectionProcessor buildConnectionProcessor(MessageSerializer messageSerializer) {
final ConnectionOptions connectionOptions = getConnectionOptions();
final TokenManagerProvider tokenManagerProvider = new AzureTokenManagerProvider(
connectionOptions.getAuthorizationType(), connectionOptions.getFullyQualifiedNamespace(),
ClientConstants.AZURE_ACTIVE_DIRECTORY_SCOPE);
final ReactorProvider provider = new ReactorProvider();
final ReactorHandlerProvider handlerProvider = new ReactorHandlerProvider(provider);

Map<String, String> properties = CoreUtils.getProperties(EVENTHUBS_PROPERTIES_FILE);
String product = properties.getOrDefault(NAME_KEY, UNKNOWN);
String clientVersion = properties.getOrDefault(VERSION_KEY, UNKNOWN);
final Map<String, String> properties = CoreUtils.getProperties(EVENTHUBS_PROPERTIES_FILE);
final String product = properties.getOrDefault(NAME_KEY, UNKNOWN);
final String clientVersion = properties.getOrDefault(VERSION_KEY, UNKNOWN);

final Mono<EventHubAmqpConnection> connectionMono = Mono.fromCallable(() -> {
final Flux<EventHubAmqpConnection> connectionFlux = Mono.fromCallable(() -> {
final String connectionId = StringUtil.getRandomString("MF");
return new EventHubReactorAmqpConnection(connectionId, connectionOptions, provider, handlerProvider,
tokenManagerProvider, messageSerializer, product, clientVersion);
});

return new EventHubConnection(connectionMono, connectionOptions);
return (EventHubAmqpConnection) new EventHubReactorAmqpConnection(connectionId, connectionOptions, provider,
handlerProvider, tokenManagerProvider, messageSerializer, product, clientVersion);
}).repeat();

return connectionFlux.subscribeWith(new EventHubConnectionProcessor(
connectionOptions.getFullyQualifiedNamespace(), connectionOptions.getEntityPath(),
connectionOptions.getRetry()));
}

private ConnectionOptions getConnectionOptions() {
Expand Down

This file was deleted.

Loading