Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7815] Use createChannelAsync for async invoke rpc #7816

Merged
merged 4 commits into from
Feb 20, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -612,25 +612,33 @@ private void updateChannelLastResponseTime(final String addr) {
}
}

private Channel getAndCreateChannel(final String addr) throws InterruptedException {
private ChannelFuture getAndCreateChannelAsync(final String addr) throws InterruptedException {
if (null == addr) {
return getAndCreateNameserverChannel();
return getAndCreateNameserverChannelAsync();
}

ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里会不会建出多个 channel ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

想了下这里应该不会泄漏

return cw.getChannel();
return cw.getChannelFuture();
}

return this.createChannel(addr);
return this.createChannelAsync(addr);
}

private Channel getAndCreateChannel(final String addr) throws InterruptedException {
ChannelFuture channelFuture = getAndCreateChannelAsync(addr);
if (channelFuture == null) {
return null;
}
return getAndCreateChannelAsync(addr).awaitUninterruptibly().channel();
}

private Channel getAndCreateNameserverChannel() throws InterruptedException {
private ChannelFuture getAndCreateNameserverChannelAsync() throws InterruptedException {
String addr = this.namesrvAddrChoosed.get();
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
return cw.getChannelFuture();
}
}

Expand All @@ -641,25 +649,19 @@ private Channel getAndCreateNameserverChannel() throws InterruptedException {
if (addr != null) {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
return cw.getChannelFuture();
}
}

if (addrList != null && !addrList.isEmpty()) {
for (int i = 0; i < addrList.size(); i++) {
int index = this.namesrvIndex.incrementAndGet();
index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);

this.namesrvAddrChoosed.set(newAddr);
LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
Channel channelNew = this.createChannel(newAddr);
if (channelNew != null) {
return channelNew;
}
}
throw new RemotingConnectException(addrList.toString());
int index = this.namesrvIndex.incrementAndGet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, the original semantics have been changed. Consider this situation: the NameServer address is set as '127.0.0.1:9876;${wrong_ip}:9876'.

In the original implementation, 'getAndCreateNameServerChannel' would always retrieve the correct channel with '127.0.0.1:9876'.
However, in the new implementation, 'ChannelFuture' may be null if '${wrong_ip}:9876' is chosen.

index = Math.abs(index);
index = index % addrList.size();
String newAddr = addrList.get(index);

this.namesrvAddrChoosed.set(newAddr);
LOGGER.info("new name server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}", addr, newAddr, namesrvIndex);
return this.createChannelAsync(newAddr);
}
} catch (Exception e) {
LOGGER.error("getAndCreateNameserverChannel: create name server channel exception", e);
Expand All @@ -673,39 +675,23 @@ private Channel getAndCreateNameserverChannel() throws InterruptedException {
return null;
}

private Channel createChannel(final String addr) throws InterruptedException {
private ChannelFuture createChannelAsync(final String addr) throws InterruptedException {
ChannelWrapper cw = this.channelTables.get(addr);
if (cw != null && cw.isOK()) {
return cw.getChannel();
return cw.getChannelFuture();
}

if (this.lockChannelTables.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
try {
boolean createNewConnection;
cw = this.channelTables.get(addr);
if (cw != null) {

if (cw.isOK()) {
return cw.getChannel();
} else if (!cw.getChannelFuture().isDone()) {
createNewConnection = false;
if (cw.isOK() || !cw.getChannelFuture().isDone()) {
return cw.getChannelFuture();
} else {
this.channelTables.remove(addr);
createNewConnection = true;
}
} else {
createNewConnection = true;
}

if (createNewConnection) {
String[] hostAndPort = getHostAndPort(addr);
ChannelFuture channelFuture = fetchBootstrap(addr)
.connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
cw = new ChannelWrapper(addr, channelFuture);
this.channelTables.put(addr, cw);
this.channelWrapperTables.put(channelFuture.channel(), cw);
}
return createChannel(addr).getChannelFuture();
} catch (Exception e) {
LOGGER.error("createChannel: create channel exception", e);
} finally {
Expand All @@ -715,84 +701,104 @@ private Channel createChannel(final String addr) throws InterruptedException {
LOGGER.warn("createChannel: try to lock channel table, but timeout, {}ms", LOCK_TIMEOUT_MILLIS);
}

if (cw != null) {
return waitChannelFuture(addr, cw);
}

return null;
}

private Channel waitChannelFuture(String addr, ChannelWrapper cw) {
ChannelFuture channelFuture = cw.getChannelFuture();
if (channelFuture.awaitUninterruptibly(this.nettyClientConfig.getConnectTimeoutMillis())) {
if (cw.isOK()) {
LOGGER.info("createChannel: connect remote host[{}] success, {}", addr, channelFuture.toString());
return cw.getChannel();
} else {
LOGGER.warn("createChannel: connect remote host[{}] failed, {}", addr, channelFuture.toString());
}
} else {
LOGGER.warn("createChannel: connect remote host[{}] timeout {}ms, {}", addr, this.nettyClientConfig.getConnectTimeoutMillis(),
channelFuture.toString());
}
return null;
private ChannelWrapper createChannel(String addr) {
String[] hostAndPort = getHostAndPort(addr);
ChannelFuture channelFuture = fetchBootstrap(addr)
.connect(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
LOGGER.info("createChannel: begin to connect remote host[{}] asynchronously", addr);
ChannelWrapper cw = new ChannelWrapper(addr, channelFuture);
this.channelTables.put(addr, cw);
this.channelWrapperTables.put(channelFuture.channel(), cw);
return cw;
}

@Override
public void invokeAsync(String addr, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)
throws InterruptedException, RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException,
RemotingSendRequestException {
long beginStartTime = System.currentTimeMillis();
final Channel channel = this.getAndCreateChannel(addr);
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
throw new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout");
}
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr));
} else {
this.closeChannel(addr, channel);
throw new RemotingConnectException(addr);
final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr);
if (channelFuture == null) {
invokeCallback.operationFail(new RemotingConnectException(addr));
return;
}
channelFuture.addListener(future -> {
if (future.isSuccess()) {
Channel channel = channelFuture.channel();
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
long costTime = System.currentTimeMillis() - beginStartTime;
if (timeoutMillis < costTime) {
invokeCallback.operationFail(new RemotingTooMuchRequestException("invokeAsync call the addr[" + channelRemoteAddr + "] timeout"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add real cost time to the Exception mesage.

}
this.invokeAsyncImpl(channel, request, timeoutMillis - costTime, new InvokeCallbackWrapper(invokeCallback, addr));
} else {
this.closeChannel(addr, channel);
invokeCallback.operationFail(new RemotingConnectException(addr));
}
} else {
invokeCallback.operationFail(new RemotingConnectException(addr));
}
});
}

@Override
public void invokeOneway(String addr, RemotingCommand request, long timeoutMillis) throws InterruptedException,
RemotingConnectException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {
final Channel channel = this.getAndCreateChannel(addr);
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
try {
doBeforeRpcHooks(channelRemoteAddr, request);
this.invokeOnewayImpl(channel, request, timeoutMillis);
} catch (RemotingSendRequestException e) {
LOGGER.warn("invokeOneway: send request exception, so close the channel[{}]", channelRemoteAddr);
this.closeChannel(addr, channel);
throw e;
}
} else {
this.closeChannel(addr, channel);
final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr);
if (channelFuture == null) {
throw new RemotingConnectException(addr);
}
channelFuture.addListener(future -> {
if (future.isSuccess()) {
Channel channel = channelFuture.channel();
String channelRemoteAddr = RemotingHelper.parseChannelRemoteAddr(channel);
if (channel != null && channel.isActive()) {
doBeforeRpcHooks(channelRemoteAddr, request);
this.invokeOnewayImpl(channel, request, timeoutMillis);
} else {
this.closeChannel(addr, channel);
}
}
});
}

@Override
public CompletableFuture<RemotingCommand> invoke(String addr, RemotingCommand request,
long timeoutMillis) {
CompletableFuture<RemotingCommand> future = new CompletableFuture<>();
try {
final Channel channel = this.getAndCreateChannel(addr);
if (channel != null && channel.isActive()) {
return invokeImpl(channel, request, timeoutMillis).whenComplete((v, t) -> {
if (t == null) {
updateChannelLastResponseTime(addr);
}
}).thenApply(ResponseFuture::getResponseCommand);
} else {
this.closeChannel(addr, channel);
final ChannelFuture channelFuture = this.getAndCreateChannelAsync(addr);
if (channelFuture == null) {
future.completeExceptionally(new RemotingConnectException(addr));
return future;
}
channelFuture.addListener(f -> {
if (f.isSuccess()) {
Channel channel = channelFuture.channel();
if (channel != null && channel.isActive()) {
invokeImpl(channel, request, timeoutMillis).whenComplete((v, t) -> {
if (t == null) {
updateChannelLastResponseTime(addr);
}
}).thenApply(ResponseFuture::getResponseCommand).whenComplete((v, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
future.complete(v);
}
});
} else {
this.closeChannel(addr, channel);
future.completeExceptionally(new RemotingConnectException(addr));
}
} else {
future.completeExceptionally(new RemotingConnectException(addr));
}
});
} catch (Throwable t) {
future.completeExceptionally(t);
}
Expand Down Expand Up @@ -820,18 +826,37 @@ public CompletableFuture<ResponseFuture> invokeImpl(final Channel channel, final
});
if (channelWrapper != null) {
if (nettyClientConfig.isEnableTransparentRetry()) {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
RemotingCommand retryRequest = RemotingCommand.createRequestCommand(request.getCode(), request.readCustomHeader());
retryRequest.setBody(request.getBody());
Channel retryChannel;
if (channelWrapper.isOK()) {
retryChannel = channelWrapper.getChannel();
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
Channel retryChannel = channelWrapper.getChannel();
if (retryChannel != null && channel != retryChannel) {
return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration);
}
} else {
retryChannel = waitChannelFuture(channelWrapper.getChannelAddress(), channelWrapper);
}
if (retryChannel != null && channel != retryChannel) {
return super.invokeImpl(retryChannel, retryRequest, timeoutMillis - duration);
CompletableFuture<ResponseFuture> future = new CompletableFuture<>();
ChannelFuture channelFuture = channelWrapper.getChannelFuture();
channelFuture.addListener(f -> {
long duration = stopwatch.elapsed(TimeUnit.MILLISECONDS);
stopwatch.stop();
if (f.isSuccess()) {
Channel retryChannel0 = channelFuture.channel();
if (retryChannel0 != null && channel != retryChannel0) {
super.invokeImpl(retryChannel0, retryRequest, timeoutMillis - duration).whenComplete((v, t) -> {
if (t != null) {
future.completeExceptionally(t);
} else {
future.complete(v);
}
});
}
} else {
future.completeExceptionally(new RemotingConnectException(channelWrapper.channelAddress));
}
});
return future;
}
}
}
Expand Down
Loading