diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java index 9f151913067c..d326d77d87b2 100644 --- a/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java +++ b/remoting/src/main/java/org/apache/rocketmq/remoting/netty/NettyRemotingClient.java @@ -612,25 +612,29 @@ private void updateChannelLastResponseTime(final String addr) { } } - private Channel getAndCreateChannel(final String addr) throws InterruptedException { + private ChannelFuture getAndCreateChannelAsync(final String addr) throws InterruptedException { if (null == addr) { - return getAndCreateNameserverChannel(); + return getAndCreateNameserverChannelAsync(); } ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { - return cw.getChannel(); + return cw.getChannel().newSucceededFuture(); } - return this.createChannel(addr); + return this.createChannelAsync(addr); + } + + private Channel getAndCreateChannel(final String addr) throws InterruptedException { + return getAndCreateChannelAsync(addr).awaitUninterruptibly().channel(); } - private Channel getAndCreateNameserverChannel() throws InterruptedException { + private ChannelFuture getAndCreateNameserverChannelAsync() throws InterruptedException { String addr = this.namesrvAddrChoosed.get(); if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { - return cw.getChannel(); + return cw.getChannel().newSucceededFuture(); } } @@ -641,25 +645,19 @@ private Channel getAndCreateNameserverChannel() throws InterruptedException { if (addr != null) { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { - return cw.getChannel(); + return cw.getChannel().newSucceededFuture(); } } if (addrList != null && !addrList.isEmpty()) { - for (int i = 0; i < addrList.size(); i++) { - int index = this.namesrvIndex.incrementAndGet(); - index = Math.abs(index); - index = index % addrList.size(); - String newAddr = addrList.get(index); - - this.namesrvAddrChoosed.set(newAddr); - LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex); - Channel channelNew = this.createChannel(newAddr); - if (channelNew != null) { - return channelNew; - } - } - throw new RemotingConnectException(addrList.toString()); + int index = this.namesrvIndex.incrementAndGet(); + index = Math.abs(index); + index = index % addrList.size(); + String newAddr = addrList.get(index); + + this.namesrvAddrChoosed.set(newAddr); + LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex); + return this.createChannelAsync(newAddr); } } catch (Exception e) { LOGGER.error("getAndCreateNameserverChannel: create name server channel exception", e); @@ -673,10 +671,10 @@ private Channel getAndCreateNameserverChannel() throws InterruptedException { return null; } - private Channel createChannel(final String addr) throws InterruptedException { + private ChannelFuture createChannelAsync(final String addr) throws InterruptedException { ChannelWrapper cw = this.channelTables.get(addr); if (cw != null && cw.isOK()) { - return cw.getChannel(); + return cw.getChannel().newSucceededFuture(); } if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) { @@ -684,9 +682,8 @@ private Channel createChannel(final String addr) throws InterruptedException { boolean createNewConnection; cw = this.channelTables.get(addr); if (cw != null) { - if (cw.isOK()) { - return cw.getChannel(); + return cw.getChannel().newSucceededFuture(); } else if (!cw.getChannelFuture().isDone()) { createNewConnection = false; } else { @@ -705,6 +702,7 @@ private Channel createChannel(final String addr) throws InterruptedException { cw = new ChannelWrapper(addr, channelFuture); this.channelTables.put(addr, cw); this.channelWrapperTables.put(channelFuture.channel(), cw); + return channelFuture; } } catch (Exception e) { LOGGER.error("createChannel: create channel exception", e); @@ -715,26 +713,6 @@ private Channel createChannel(final String addr) throws InterruptedException { LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS); } - if (cw != null) { - return waitChannelFuture(addr, cw); - } - - return null; - } - - private Channel waitChannelFuture(String addr, ChannelWrapper cw) { - ChannelFuture channelFuture = cw.getChannelFuture(); - if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) { - if (cw.isOK()) { - LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString()); - return cw.getChannel(); - } else { - LOGGER.warn("createChannel: connect remote host[{}] failed, {}", addr, channelFuture.toString()); - } - } else { - LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(), - channelFuture.toString()); - } return null; } @@ -743,38 +721,43 @@ public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { long beginStartTime = System.currentTimeMillis(); - final Channel channel = this.getAndCreateChannel(addr); - String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); - if (channel != null && channel.isActive()) { - long costTime = System.currentTimeMillis() - beginStartTime; - if (timeoutMillis < costTime) { - throw new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout"); + final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr); + channelFuture.addListener(future -> { + if (future.isSuccess()) { + Channel channel = channelFuture.channel(); + String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); + if (channel != null && channel.isActive()) { + long costTime = System.currentTimeMillis() - beginStartTime; + if (timeoutMillis < costTime) { + invokeCallback.operationFail(new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout")); + } + this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr)); + } else { + this.closeChannel(addr, channel); + invokeCallback.operationFail(new RemotingConnectException(addr)); + } + } else { + invokeCallback.operationFail(new RemotingConnectException(addr)); } - this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr)); - } else { - this.closeChannel(addr, channel); - throw new RemotingConnectException(addr); - } + }); } @Override public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { - final Channel channel = this.getAndCreateChannel(addr); - String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); - if (channel != null && channel.isActive()) { - try { - doBeforeRpcHooks(channelRemoteAddr, request); - this.invokeOnewayImpl(channel, request, timeoutMillis); - } catch (RemotingSendRequestException e) { - LOGGER.warn("invokeOneway: send request exception, so close the channel[{}]", channelRemoteAddr); - this.closeChannel(addr, channel); - throw e; + final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr); + channelFuture.addListener(future -> { + if (future.isSuccess()) { + Channel channel = channelFuture.channel(); + String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel); + if (channel != null && channel.isActive()) { + doBeforeRpcHooks(channelRemoteAddr, request); + this.invokeOnewayImpl(channel, request, timeoutMillis); + } else { + this.closeChannel(addr, channel); + } } - } else { - this.closeChannel(addr, channel); - throw new RemotingConnectException(addr); - } + }); } @Override @@ -782,17 +765,30 @@ public CompletableFuture invoke(String addr, RemotingCommand re long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); try { - final Channel channel = this.getAndCreateChannel(addr); - if (channel != null && channel.isActive()) { - return invokeImpl(channel, request, timeoutMillis).whenComplete((v, t) -> { - if (t == null) { - updateChannelLastResponseTime(addr); + final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr); + channelFuture.addListener(f -> { + if (f.isSuccess()) { + Channel channel = channelFuture.channel(); + if (channel != null && channel.isActive()) { + invokeImpl(channel, request, timeoutMillis).whenComplete((v, t) -> { + if (t == null) { + updateChannelLastResponseTime(addr); + } + }).thenApply(ResponseFuture::getResponseCommand).whenComplete((v, t) -> { + if (t != null) { + future.completeExceptionally(t); + } else { + future.complete(v); + } + }); + } else { + this.closeChannel(addr, channel); + future.completeExceptionally(new RemotingConnectException(addr)); } - }).thenApply(ResponseFuture::getResponseCommand); - } else { - this.closeChannel(addr, channel); - future.completeExceptionally(new RemotingConnectException(addr)); - } + } else { + future.completeExceptionally(new RemotingConnectException(addr)); + } + }); } catch (Throwable t) { future.completeExceptionally(t); } @@ -820,18 +816,36 @@ public CompletableFuture invokeImpl(final Channel channel, final }); if (channelWrapper != null) { if (nettyClientConfig.isEnableTransparentRetry()) { - long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS); stopwatch.stop(); RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader()); retryRequest.setBody(request.getBody()); - Channel retryChannel; if (channelWrapper.isOK()) { - retryChannel = channelWrapper.getChannel(); + long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS); + Channel retryChannel = channelWrapper.getChannel(); + if (retryChannel != null && channel != retryChannel) { + return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration); + } } else { - retryChannel = waitChannelFuture(channelWrapper.getChannelAddress(), channelWrapper); - } - if (retryChannel != null && channel != retryChannel) { - return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration); + CompletableFuture future = new CompletableFuture<>(); + ChannelFuture channelFuture = channelWrapper.getChannelFuture(); + channelFuture.addListener(f -> { + long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS); + if (f.isSuccess()) { + Channel retryChannel0 = channelFuture.channel(); + if (retryChannel0 != null && channel != retryChannel0) { + super.invokeImpl(retryChannel0, retryRequest, timeoutMillis - duration).whenComplete((v, t) -> { + if (t != null) { + future.completeExceptionally(t); + } else { + future.complete(v); + } + }); + } + } else { + future.completeExceptionally(new RemotingConnectException(channelWrapper.channelAddress)); + } + }); + return future; } } }