Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7330] Add goaway and reconnection mechanism #7331

Merged
merged 8 commits into from
Oct 10, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,12 @@ public class NettyClientConfig {
private boolean disableCallbackExecutor = false;
private boolean disableNettyWorkerGroup = false;

// Interval, in seconds, that must have elapsed since a channel wrapper's previous reconnect
// before ChannelWrapper#reconnect() will start another one (the first reconnect is always allowed).
private long maxReconnectIntervalTimeSeconds = 60;

// When true, a response carrying ResponseCode.GO_AWAY makes the client reconnect the channel it arrived on.
private boolean enableReconnectForGoAway = true;

// When true (and reconnect-on-GO_AWAY is enabled), a request answered with GO_AWAY is re-sent
// on the replacement channel instead of surfacing the GO_AWAY response to the caller.
private boolean enableTransparentRetry = true;

/**
 * Reports the current value of the {@code clientCloseSocketIfTimeout} setting.
 *
 * @return the stored flag, unchanged
 */
public boolean isClientCloseSocketIfTimeout() {
    return this.clientCloseSocketIfTimeout;
}
Expand Down Expand Up @@ -181,6 +187,30 @@ public void setDisableNettyWorkerGroup(boolean disableNettyWorkerGroup) {
this.disableNettyWorkerGroup = disableNettyWorkerGroup;
}

/**
 * Returns the configured reconnect interval.
 *
 * @return the interval in seconds
 */
public long getMaxReconnectIntervalTimeSeconds() {
    return this.maxReconnectIntervalTimeSeconds;
}

/**
 * Replaces the configured reconnect interval.
 *
 * @param maxReconnectIntervalTimeSeconds new interval in seconds; stored as given, no validation is performed
 */
public void setMaxReconnectIntervalTimeSeconds(long maxReconnectIntervalTimeSeconds) {
this.maxReconnectIntervalTimeSeconds = maxReconnectIntervalTimeSeconds;
}

/**
 * Tells whether the client should reconnect a channel after receiving a GO_AWAY response on it.
 *
 * @return the stored flag
 */
public boolean isEnableReconnectForGoAway() {
    return this.enableReconnectForGoAway;
}

/**
 * Turns reconnect-on-GO_AWAY on or off.
 *
 * @param enableReconnectForGoAway new value of the flag
 */
public void setEnableReconnectForGoAway(boolean enableReconnectForGoAway) {
this.enableReconnectForGoAway = enableReconnectForGoAway;
}

/**
 * Tells whether a request answered with GO_AWAY may be re-sent automatically on the replacement channel.
 *
 * @return the stored flag
 */
public boolean isEnableTransparentRetry() {
    return this.enableTransparentRetry;
}

/**
 * Turns transparent retry after GO_AWAY on or off.
 *
 * @param enableTransparentRetry new value of the flag
 */
public void setEnableTransparentRetry(boolean enableTransparentRetry) {
this.enableTransparentRetry = enableTransparentRetry;
}

/**
 * Returns the SOCKS proxy configuration string exactly as stored.
 *
 * @return the current {@code socksProxyConfig} value
 */
public String getSocksProxyConfig() {
    return this.socksProxyConfig;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,11 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.rocketmq.common.AbortProcessException;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ServiceThread;
import org.apache.rocketmq.common.UtilAll;
Expand All @@ -60,6 +62,7 @@
import org.apache.rocketmq.remoting.metrics.RemotingMetricsManager;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RemotingSysResponseCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;

import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_IS_LONG_POLLING;
import static org.apache.rocketmq.remoting.metrics.RemotingMetricsConstant.LABEL_REQUEST_CODE;
Expand Down Expand Up @@ -120,6 +123,8 @@ public abstract class NettyRemotingAbstract {
*/
protected List<RPCHook> rpcHooks = new ArrayList<>();

// While true, processRequestCommand answers requests from peers whose version is newer than V5_1_3
// with ResponseCode.GO_AWAY instead of processing them. (Where the flag is flipped is outside this view.)
protected AtomicBoolean isShuttingDown = new AtomicBoolean(false);

// Runs NettyLogger.initNettyLogger() once, when this class is initialized.
static {
NettyLogger.initNettyLogger();
}
Expand Down Expand Up @@ -264,6 +269,16 @@ public void processRequestCommand(final ChannelHandlerContext ctx, final Remotin

Runnable run = buildProcessRequestHandler(ctx, cmd, pair, opaque);

if (isShuttingDown.get()) {
if (cmd.getVersion() > MQVersion.Version.V5_1_3.ordinal()) {
drpmma marked this conversation as resolved.
Show resolved Hide resolved
final RemotingCommand response = RemotingCommand.createResponseCommand(ResponseCode.GO_AWAY,
"please go away");
response.setOpaque(opaque);
writeResponse(ctx.channel(), cmd, response);
return;
}
}

if (pair.getObject1().rejectRequest()) {
final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
"[REJECTREQUEST]system busy, start flow control for a while");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import com.google.common.base.Stopwatch;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
Expand Down Expand Up @@ -48,6 +49,7 @@
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.cert.CertificateException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand All @@ -57,6 +59,7 @@
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
Expand All @@ -66,6 +69,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.ThreadFactoryImpl;
Expand All @@ -82,6 +86,7 @@
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.proxy.SocksProxyConfig;

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
Expand All @@ -97,6 +102,7 @@ public class NettyRemotingClient extends NettyRemotingAbstract implements Remoti
private final Map<String /* cidr */, SocksProxyConfig /* proxy */> proxyMap = new HashMap<>();
private final ConcurrentHashMap<String /* cidr */, Bootstrap> bootstrapMap = new ConcurrentHashMap<>();
private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<>();
// Index from a Channel to the ChannelWrapper that owns it: filled in createChannel, given an extra
// entry for the new channel when a wrapper reconnects, and consulted by closeChannel to decide
// whether the address entry in channelTables may be dropped.
private final ConcurrentMap<Channel, ChannelWrapper> channelWrapperTables = new ConcurrentHashMap<>();

private final HashedWheelTimer timer = new HashedWheelTimer(r -> new Thread(r, "ClientHouseKeepingService"));

Expand Down Expand Up @@ -356,9 +362,10 @@ public void shutdown() {
this.timer.stop();

for (String addr : this.channelTables.keySet()) {
this.closeChannel(addr, this.channelTables.get(addr).getChannel());
this.channelTables.get(addr).close();
}

this.channelWrapperTables.clear();
this.channelTables.clear();

this.eventLoopGroupWorker.shutdownGracefully();
Expand Down Expand Up @@ -416,7 +423,10 @@ public void closeChannel(final String addr, final Channel channel) {
}

if (removeItemFromTable) {
this.channelTables.remove(addrRemote);
ChannelWrapper channelWrapper = this.channelWrapperTables.remove(channel);
if (channelWrapper != null && channelWrapper.tryClose(channel)) {
this.channelTables.remove(addrRemote);
}
LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
}

Expand Down Expand Up @@ -463,7 +473,10 @@ public void closeChannel(final Channel channel) {
}

if (removeItemFromTable) {
this.channelTables.remove(addrRemote);
ChannelWrapper channelWrapper = this.channelWrapperTables.remove(channel);
if (channelWrapper != null && channelWrapper.tryClose(channel)) {
this.channelTables.remove(addrRemote);
}
LOGGER.info("closeChannel: the channel[{}] was removed from channel table", addrRemote);
RemotingHelper.closeChannel(channel);
}
Expand Down Expand Up @@ -511,7 +524,7 @@ public void updateNameServerAddressList(List<String> addrs) {
if (addr.contains(namesrvAddr)) {
ChannelWrapper channelWrapper = this.channelTables.get(addr);
if (channelWrapper != null) {
closeChannel(channelWrapper.getChannel());
channelWrapper.close();
}
}
}
Expand Down Expand Up @@ -689,8 +702,9 @@ private Channel createChannel(final String addr) throws InterruptedException {
ChannelFuture channelFuture = fetchBootstrap(addr)
.connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(channelFuture);
cw = new ChannelWrapper(addr, channelFuture);
this.channelTables.put(addr, cw);
this.channelWrapperTables.put(channelFuture.channel(), cw);
}
} catch (Exception e) {
LOGGER.error("createChannel: create channel exception", e);
Expand Down Expand Up @@ -758,6 +772,64 @@ public void invokeOneway(String addr, RemotingCommand request, long timeoutMilli
}
}

/**
 * Sends {@code request} to {@code addr} asynchronously and exposes the response as a future.
 *
 * <p>If no active channel to {@code addr} is available, the channel is closed and the returned
 * future fails with {@link RemotingConnectException}. Anything thrown while setting up the call is
 * likewise delivered through the returned future rather than thrown to the caller.
 *
 * @param addr          remote address to send to
 * @param request       command to send
 * @param timeoutMillis time budget for the call, in milliseconds
 * @return a future completed with the response command, or completed exceptionally on failure
 */
@Override
public CompletableFuture<RemotingCommand> invoke(String addr, RemotingCommand request,
    long timeoutMillis) {
    final CompletableFuture<RemotingCommand> failure = new CompletableFuture<>();
    try {
        final Channel ch = this.getAndCreateChannel(addr);
        final boolean usable = ch != null && ch.isActive();
        if (!usable) {
            this.closeChannel(addr, ch);
            failure.completeExceptionally(new RemotingConnectException(addr));
            return failure;
        }
        final CompletableFuture<ResponseFuture> pending = invokeImpl(ch, request, timeoutMillis);
        // Only a successful round trip counts as "the channel answered".
        final CompletableFuture<ResponseFuture> tracked = pending.whenComplete((result, error) -> {
            if (error == null) {
                updateChannelLastResponseTime(addr);
            }
        });
        return tracked.thenApply(ResponseFuture::getResponseCommand);
    } catch (Throwable t) {
        failure.completeExceptionally(t);
    }
    return failure;
}

/**
 * Sends {@code request} on {@code channel} and, when the peer answers with
 * {@link ResponseCode#GO_AWAY}, optionally reconnects and transparently retries.
 *
 * <p>Flow when the response code is GO_AWAY and reconnect-on-GO_AWAY is enabled:
 * <ol>
 *   <li>the wrapper registered for {@code channel} is asked to reconnect; on success the wrapper
 *       is additionally registered under its new channel;</li>
 *   <li>if transparent retry is enabled and the wrapper now points at a different channel, a copy
 *       of the request (code, custom header, body and ext fields) is sent on that channel with
 *       whatever is left of the original time budget.</li>
 * </ol>
 * In every other case the original response future is returned unchanged.
 *
 * @param channel       channel to send on
 * @param request       command to send
 * @param timeoutMillis time budget for the whole call, including a possible retry, in milliseconds
 * @return a future yielding the response future of the first attempt or, after a retry, of the retry
 */
@Override
public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final RemotingCommand request,
    final long timeoutMillis) {
    Stopwatch stopwatch = Stopwatch.createStarted();
    return super.invokeImpl(channel, request, timeoutMillis).thenCompose(responseFuture -> {
        RemotingCommand response = responseFuture.getResponseCommand();
        if (response.getCode() != ResponseCode.GO_AWAY || !nettyClientConfig.isEnableReconnectForGoAway()) {
            return CompletableFuture.completedFuture(responseFuture);
        }

        // NOTE(review): the remapping function below writes another key of the same map;
        // ConcurrentHashMap's contract discourages that — confirm this cannot hit a recursive update.
        ChannelWrapper channelWrapper = channelWrapperTables.computeIfPresent(channel, (channel0, channelWrapper0) -> {
            try {
                if (channelWrapper0.reconnect()) {
                    LOGGER.info("Receive go away from channel {}, recreate the channel", channel0);
                    channelWrapperTables.put(channelWrapper0.getChannel(), channelWrapper0);
                }
            } catch (Throwable t) {
                LOGGER.error("Channel {} reconnect error", channelWrapper0, t);
            }
            return channelWrapper0;
        });

        if (channelWrapper != null && nettyClientConfig.isEnableTransparentRetry()) {
            long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
            stopwatch.stop();
            RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
            // The retry must be the same request: carry over the payload and the extension
            // fields, which createRequestCommand does not copy from the original command.
            retryRequest.setBody(request.getBody());
            retryRequest.setExtFields(request.getExtFields());
            Channel retryChannel = channelWrapper.getChannel();
            // Retry only if the wrapper really moved to a new channel; otherwise we would just
            // hit the endpoint that told us to go away.
            if (channel != retryChannel) {
                return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration);
            }
        }
        return CompletableFuture.completedFuture(responseFuture);
    });
}

@Override
public void registerProcessor(int requestCode, NettyRequestProcessor processor, ExecutorService executor) {
ExecutorService executorThis = executor;
Expand Down Expand Up @@ -877,30 +949,41 @@ public void run() {
}
}

static class ChannelWrapper {
private final ChannelFuture channelFuture;
class ChannelWrapper {
private final ReentrantReadWriteLock lock;
private ChannelFuture channelFuture;
// only affected by sync or async request, oneway is not included.
private ChannelFuture channelToClose;
private long lastResponseTime;
private volatile long lastReconnectTimestamp = 0L;
private final String channelAddress;

public ChannelWrapper(ChannelFuture channelFuture) {
public ChannelWrapper(String address, ChannelFuture channelFuture) {
this.lock = new ReentrantReadWriteLock();
this.channelFuture = channelFuture;
this.lastResponseTime = System.currentTimeMillis();
this.channelAddress = address;
}

public boolean isOK() {
return this.channelFuture.channel() != null && this.channelFuture.channel().isActive();
return getChannel() != null && getChannel().isActive();
}

public boolean isWritable() {
return this.channelFuture.channel().isWritable();
return getChannel().isWritable();
}

private Channel getChannel() {
return this.channelFuture.channel();
return getChannelFuture().channel();
}

public ChannelFuture getChannelFuture() {
return channelFuture;
lock.readLock().lock();
try {
return this.channelFuture;
} finally {
lock.readLock().unlock();
}
}

public long getLastResponseTime() {
Expand All @@ -910,6 +993,52 @@ public long getLastResponseTime() {
/**
 * Stamps this wrapper with the current wall-clock time as the moment of its latest response.
 */
public void updateLastResponseTime() {
    final long now = System.currentTimeMillis();
    this.lastResponseTime = now;
}

/**
 * Tries to swap the wrapped connection for a freshly initiated one to the same address.
 *
 * <p>Nothing happens (and {@code false} is returned) when the write lock cannot be taken
 * immediately, or when a previous reconnect happened within the configured interval. Otherwise the
 * current future is parked in {@code channelToClose}, a new connect is started, and the reconnect
 * timestamp is refreshed.
 *
 * @return {@code true} if a new connection attempt was started, {@code false} otherwise
 */
public boolean reconnect() {
    if (!lock.writeLock().tryLock()) {
        return false;
    }
    try {
        final boolean neverReconnected = lastReconnectTimestamp == 0L;
        if (!neverReconnected) {
            final long sinceLast = System.currentTimeMillis() - lastReconnectTimestamp;
            final long requiredGap =
                Duration.ofSeconds(nettyClientConfig.getMaxReconnectIntervalTimeSeconds()).toMillis();
            if (sinceLast <= requiredGap) {
                return false;
            }
        }
        // Keep the old future around so close() can still shut the superseded channel down.
        channelToClose = channelFuture;
        final String[] hostAndPort = getHostAndPort(channelAddress);
        channelFuture = fetchBootstrap(channelAddress)
            .connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
        lastReconnectTimestamp = System.currentTimeMillis();
        return true;
    } finally {
        lock.writeLock().unlock();
    }
}

/**
 * Checks whether {@code channel} is the channel this wrapper currently points at.
 *
 * <p>Despite the name, nothing is closed here; callers use the answer to decide whether the
 * wrapper's address entry may be removed.
 *
 * @param channel the channel being closed
 * @return {@code true} if the wrapper's current channel equals {@code channel}, {@code false}
 *         otherwise (including when the wrapper holds no channel future)
 */
public boolean tryClose(Channel channel) {
    // Acquire outside the try block: if lock() itself fails, the finally clause must not
    // attempt to release a lock that was never taken.
    lock.readLock().lock();
    try {
        return channelFuture != null && channelFuture.channel().equals(channel);
    } finally {
        lock.readLock().unlock();
    }
}

/**
 * Closes every channel this wrapper still references: the current one and, if a reconnect has
 * happened, the superseded one kept in {@code channelToClose}.
 *
 * <p>Runs under the wrapper's write lock; each channel is closed through the enclosing client's
 * {@code closeChannel(Channel)}.
 */
public void close() {
    // Acquire outside the try block: if lock() itself fails, the finally clause must not
    // attempt to release a lock that was never taken.
    lock.writeLock().lock();
    try {
        if (channelFuture != null) {
            closeChannel(channelFuture.channel());
        }
        if (channelToClose != null) {
            closeChannel(channelToClose.channel());
        }
    } finally {
        lock.writeLock().unlock();
    }
}
}

class InvokeCallbackWrapper implements InvokeCallback {
Expand Down
Loading