Skip to content

Commit

Permalink
Add enableReconnectForGoAway for NettyClientConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
drpmma committed Oct 8, 2023
1 parent 5a16134 commit 2f5a95c
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class NettyClientConfig {

private long maxReconnectIntervalTimeSeconds = 60;

private boolean enableReconnectForGoAway = true;

private boolean enableTransparentRetry = true;

public boolean isClientCloseSocketIfTimeout() {
Expand Down Expand Up @@ -193,6 +195,14 @@ public void setMaxReconnectIntervalTimeSeconds(long maxReconnectIntervalTimeSeco
this.maxReconnectIntervalTimeSeconds = maxReconnectIntervalTimeSeconds;
}

public boolean isEnableReconnectForGoAway() {
return enableReconnectForGoAway;
}

public void setEnableReconnectForGoAway(boolean enableReconnectForGoAway) {
this.enableReconnectForGoAway = enableReconnectForGoAway;
}

public boolean isEnableTransparentRetry() {
return enableTransparentRetry;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,25 +801,27 @@ public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final
return super.invokeImpl(channel, request, timeoutMillis).thenCompose(responseFuture -> {
RemotingCommand response = responseFuture.getResponseCommand();
if (response.getCode() == ResponseCode.GO_AWAY) {
ChannelWrapper channelWrapper = channelWrapperTables.computeIfPresent(channel, (channel0, channelWrapper0) -> {
try {
if (channelWrapper0.reconnect()) {
LOGGER.info("Receive go away from channel {}, recreate the channel", channel0);
channelWrapperTables.put(channelWrapper0.getChannel(), channelWrapper0);
if (nettyClientConfig.isEnableReconnectForGoAway()) {
ChannelWrapper channelWrapper = channelWrapperTables.computeIfPresent(channel, (channel0, channelWrapper0) -> {
try {
if (channelWrapper0.reconnect()) {
LOGGER.info("Receive go away from channel {}, recreate the channel", channel0);
channelWrapperTables.put(channelWrapper0.getChannel(), channelWrapper0);
}
} catch (Throwable t) {
LOGGER.error("Channel {} reconnect error", channelWrapper0, t);
}
} catch (Throwable t) {
LOGGER.error("Channel {} reconnect error", channelWrapper0, t);
}
return channelWrapper0;
});
if (channelWrapper != null) {
if (nettyClientConfig.isEnableTransparentRetry()) {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
Channel retryChannel = channelWrapper.getChannel();
if (channel != retryChannel) {
return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration);
return channelWrapper0;
});
if (channelWrapper != null) {
if (nettyClientConfig.isEnableTransparentRetry()) {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
Channel retryChannel = channelWrapper.getChannel();
if (channel != retryChannel) {
return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration);
}
}
}
}
Expand Down

0 comments on commit 2f5a95c

Please sign in to comment.