Skip to content

Commit

Permalink
Fix connection, link and timer object leaks after EntityClose & fix i…
Browse files Browse the repository at this point in the history
…naccurate timer issues (#88)

* Fix connection, link and timer object leaks after EntityClose

* fixes stuck behaviors in Sender & Receiver
* MsgReceiver should update the pendingReceive queue before enqueing work to ReactorDispatcher
* fix receiver when prefetch queue is full
* Fix Sender & Receiver dormant links after close
* move messagesender timer out-of reactorDispatcher queue
* handle scenarios where an underlying linkOpen is pending and user invokes receiver.close()
* fix the close paths for send - for dormant links
* Move link open close timers out of ReactorDispatcher
* Clear pendingSends & Receives in case of CloseTimeout
* Don't open amqpconnection if MsgFactory is in Closed() state

* Fix sender stuck issue due to a missing not(!) in if check

* Fix dead links registered for ConnectionHandler errors

* add manual test which can be used to test intermittent connection scanarios effectively

* minor refactor

* don't proactively throw sendlink error on token renewal failure

* nit fixes

* code refactor

* refactoring messagesender

* Update minor release

* minor refactor
  • Loading branch information
SreeramGarlapati authored and sjkwak committed Apr 5, 2017
1 parent 768046b commit 9d76641
Show file tree
Hide file tree
Showing 15 changed files with 543 additions and 329 deletions.
2 changes: 1 addition & 1 deletion ConsumingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>0.13.0</version>
<version>0.13.1</version>
</dependency>
```

Expand Down
2 changes: 1 addition & 1 deletion PublishingEvents.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ following dependency declaration inside of your Maven project file:
<dependency>
<groupId>com.microsoft.azure</groupId>
<artifactId>azure-eventhubs-clients</artifactId>
<version>0.13.0</version>
<version>0.13.1</version>
</dependency>
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,20 @@

import java.time.Duration;
import java.util.Locale;
import java.util.concurrent.ScheduledFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

public class ActiveClientTokenManager {

private static final Logger TRACE_LOGGER = Logger.getLogger(ClientConstants.SERVICEBUS_CLIENT_TRACE);

public final Runnable sendTokenTask;
public final ClientEntity clientEntity;
public final Duration tokenRefreshInterval;

private ScheduledFuture timer;

private final Object timerLock;
private final Runnable sendTokenTask;
private final ClientEntity clientEntity;
private final Duration tokenRefreshInterval;

public ActiveClientTokenManager(
final ClientEntity clientEntity,
Expand All @@ -25,8 +29,18 @@ public ActiveClientTokenManager(
this.sendTokenTask = sendTokenAsync;
this.clientEntity = clientEntity;
this.tokenRefreshInterval = tokenRefreshInterval;
this.timerLock = new Object();

synchronized (this.timerLock) {
this.timer = Timer.schedule(new TimerCallback(), tokenRefreshInterval, TimerType.OneTimeRun);
}
}

public void cancel() {

Timer.schedule(new TimerCallback(), tokenRefreshInterval, TimerType.OneTimeRun);
synchronized (this.timerLock) {
this.timer.cancel(false);
}
}

private class TimerCallback implements Runnable {
Expand All @@ -38,7 +52,9 @@ public void run() {

sendTokenTask.run();

Timer.schedule(new TimerCallback(), tokenRefreshInterval, TimerType.OneTimeRun);
synchronized (timerLock) {
timer = Timer.schedule(new TimerCallback(), tokenRefreshInterval, TimerType.OneTimeRun);
}
}
else {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

import java.util.Map;
import java.util.HashMap;
import java.util.function.Consumer;
import java.util.function.BiConsumer;

import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
Expand Down Expand Up @@ -107,10 +107,13 @@ public void run(IOperationResult<RequestResponseChannel, Exception> operationCal
CBSChannel.this.sessionProvider.getSession(
"cbs-session",
null,
new Consumer<ErrorCondition>() {
new BiConsumer<ErrorCondition, Exception>() {
@Override
public void accept(ErrorCondition error) {
operationCallback.onError(new AmqpException(error));
public void accept(ErrorCondition error, Exception exception) {
if (error != null)
operationCallback.onError(new AmqpException(error));
else if (exception != null)
operationCallback.onError(exception);
}
}));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ private ClientConstants() { }
public final static String DEFAULT_RETRY = "Default";

public final static String PRODUCT_NAME = "MSJavaClient";
public final static String CURRENT_JAVACLIENT_VERSION = "0.13.0";
public final static String CURRENT_JAVACLIENT_VERSION = "0.13.1";

public static final String PLATFORM_INFO = getPlatformInfo();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
*/
package com.microsoft.azure.servicebus;

import java.util.function.BiConsumer;
import java.util.function.Consumer;

import org.apache.qpid.proton.amqp.transport.ErrorCondition;
Expand All @@ -14,5 +15,5 @@ interface ISessionProvider
Session getSession(
final String path,
final Consumer<Session> onSessionOpen,
final Consumer<ErrorCondition> onSessionOpenError);
final BiConsumer<ErrorCondition, Exception> onSessionOpenError);
}
Loading

0 comments on commit 9d76641

Please sign in to comment.