Skip to content

Commit

Permalink
async
Browse files Browse the repository at this point in the history
  • Loading branch information
drpmma committed Feb 4, 2024
1 parent 27ffbf3 commit f8ea63b
Showing 1 changed file with 101 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -612,25 +612,29 @@ private void updateChannelLastResponseTime(final String addr) {
}
}

private Channel getAndCreateChannel(final String addr) throws InterruptedException {
private ChannelFuture getAndCreateChannelAsync(final String addr) throws InterruptedException {
if (null == addr) {
return getAndCreateNameserverChannel();
return getAndCreateNameserverChannelAsync();
}

ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
return cw.getChannel().newSucceededFuture();
}

return this.createChannel(addr);
return this.createChannelAsync(addr);
}

private Channel getAndCreateChannel(final String addr) throws InterruptedException {
return getAndCreateChannelAsync(addr).awaitUninterruptibly().channel();
}

private Channel getAndCreateNameserverChannel() throws InterruptedException {
private ChannelFuture getAndCreateNameserverChannelAsync() throws InterruptedException {
String addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
return cw.getChannel().newSucceededFuture();
}
}

Expand All @@ -641,25 +645,19 @@ private Channel getAndCreateNameserverChannel() throws InterruptedException {
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
return cw.getChannel().newSucceededFuture();
}
}

if (addrList != null && !addrList.isEmpty()) {
for (int i = 0; i < addrList.size(); i++) {
int index = this.namesrvIndex.incrementAndGet();
index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);

this.namesrvAddrChoosed.set(newAddr);
LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
Channel channelNew = this.createChannel(newAddr);
if (channelNew != null) {
return channelNew;
}
}
throw new RemotingConnectException(addrList.toString());
int index = this.namesrvIndex.incrementAndGet();
index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);

this.namesrvAddrChoosed.set(newAddr);
LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
return this.createChannelAsync(newAddr);
}
} catch (Exception e) {
LOGGER.error("getAndCreateNameserverChannel: create name server channel exception", e);
Expand All @@ -673,20 +671,19 @@ private Channel getAndCreateNameserverChannel() throws InterruptedException {
return null;
}

private Channel createChannel(final String addr) throws InterruptedException {
private ChannelFuture createChannelAsync(final String addr) throws InterruptedException {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
return cw.getChannel().newSucceededFuture();
}

if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {

if (cw.isOK()) {
return cw.getChannel();
return cw.getChannel().newSucceededFuture();
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
} else {
Expand All @@ -705,6 +702,7 @@ private Channel createChannel(final String addr) throws InterruptedException {
cw = new ChannelWrapper(addr, channelFuture);
this.channelTables.put(addr, cw);
this.channelWrapperTables.put(channelFuture.channel(), cw);
return channelFuture;
}
} catch (Exception e) {
LOGGER.error("createChannel: create channel exception", e);
Expand All @@ -715,26 +713,6 @@ private Channel createChannel(final String addr) throws InterruptedException {
LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}

if (cw != null) {
return waitChannelFuture(addr, cw);
}

return null;
}

private Channel waitChannelFuture(String addr, ChannelWrapper cw) {
ChannelFuture channelFuture = cw.getChannelFuture();
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
LOGGER.warn("createChannel: connect remote host[{}] failed, {}", addr, channelFuture.toString());
}
} else {
LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
}
return null;
}

Expand All @@ -743,56 +721,74 @@ public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout");
final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr);
channelFuture.addListener(future -> {
if (future.isSuccess()) {
Channel channel = channelFuture.channel();
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
invokeCallback.operationFail(new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout"));
}
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr));
} else {
this.closeChannel(addr, channel);
invokeCallback.operationFail(new RemotingConnectException(addr));
}
} else {
invokeCallback.operationFail(new RemotingConnectException(addr));
}
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr));
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
});
}

@Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
final Channel channel = this.getAndCreateChannel(addr);
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
try {
doBeforeRpcHooks(channelRemoteAddr, request);
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
LOGGER.warn("invokeOneway: send request exception, so close the channel[{}]", channelRemoteAddr);
this.closeChannel(addr, channel);
throw e;
final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr);
channelFuture.addListener(future -> {
if (future.isSuccess()) {
Channel channel = channelFuture.channel();
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
doBeforeRpcHooks(channelRemoteAddr, request);
this.invokeOnewayImpl(channel, request, timeoutMillis);
} else {
this.closeChannel(addr, channel);
}
}
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
}
});
}

@Override
public CompletableFuture<RemotingCommand> invoke(String addr, RemotingCommand request,
long timeoutMillis) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
return invokeImpl(channel, request, timeoutMillis).whenComplete((v, t) -> {
if (t == null) {
updateChannelLastResponseTime(addr);
final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr);
channelFuture.addListener(f -> {
if (f.isSuccess()) {
Channel channel = channelFuture.channel();
if (channel != null && channel.isActive()) {
invokeImpl(channel, request, timeoutMillis).whenComplete((v, t) -> {
if (t == null) {
updateChannelLastResponseTime(addr);
}
}).thenApply(ResponseFuture::getResponseCommand).whenComplete((v, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
future.complete(v);
}
});
} else {
this.closeChannel(addr, channel);
future.completeExceptionally(new RemotingConnectException(addr));
}
}).thenApply(ResponseFuture::getResponseCommand);
} else {
this.closeChannel(addr, channel);
future.completeExceptionally(new RemotingConnectException(addr));
}
} else {
future.completeExceptionally(new RemotingConnectException(addr));
}
});
} catch (Throwable t) {
future.completeExceptionally(t);
}
Expand Down Expand Up @@ -820,18 +816,36 @@ public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final
});
if (channelWrapper != null) {
if (nettyClientConfig.isEnableTransparentRetry()) {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
retryRequest.setBody(request.getBody());
Channel retryChannel;
if (channelWrapper.isOK()) {
retryChannel = channelWrapper.getChannel();
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
Channel retryChannel = channelWrapper.getChannel();
if (retryChannel != null && channel != retryChannel) {
return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration);
}
} else {
retryChannel = waitChannelFuture(channelWrapper.getChannelAddress(), channelWrapper);
}
if (retryChannel != null && channel != retryChannel) {
return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration);
CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
ChannelFuture channelFuture = channelWrapper.getChannelFuture();
channelFuture.addListener(f -> {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
if (f.isSuccess()) {
Channel retryChannel0 = channelFuture.channel();
if (retryChannel0 != null && channel != retryChannel0) {
super.invokeImpl(retryChannel0, retryRequest, timeoutMillis - duration).whenComplete((v, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
future.complete(v);
}
});
}
} else {
future.completeExceptionally(new RemotingConnectException(channelWrapper.channelAddress));
}
});
return future;
}
}
}
Expand Down

0 comments on commit f8ea63b

Please sign in to comment.